discord_fail2ban_alert/fail2ban_discord.py

116 lines
3.2 KiB
Python

import requests
import time
import os
import re
import sqlite3
from dotenv import load_dotenv
import ipinfo
# .env laden
load_dotenv()
LOGFILE = "/var/log/fail2ban.log"
WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
TOKEN = os.getenv("IPINFO_TOKEN")
#ipinfo handler
handler = ipinfo.getHandler(TOKEN)
# SQLite-Datenbank
DB_FILE = "bans.db"
def init_db():
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS bans (
ip TEXT PRIMARY KEY,
jail TEXT,
country TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
def is_ip_already_banned(ip):
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
c.execute("SELECT 1 FROM bans WHERE ip = ?", (ip,))
result = c.fetchone()
conn.close()
return result is not None
def save_ban(ip, jail, country):
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
c.execute("INSERT OR IGNORE INTO bans (ip, jail, country) VALUES (?, ?, ?)", (ip, jail, country))
conn.commit()
conn.close()
def get_country(ip):
try:
details = handler.getDetails(ip)
return details.country_name, details.country
except Exception as e:
print(f"[!] Fehler bei Geo-Abfrage: {e}")
return "Unbekannt", ""
def country_code_to_flag(country):
if not country or len(country) != 2:
return "No_Flag" # Discord-kompatibles Fallback
return f":flag_{country.lower()}:"
def send_discord_embed(ip, jail, country_name, country):
flag = country_code_to_flag(country)
embed = {
"title": "🚨 Neue IP gesperrt",
"color": 16711680,
"fields": [
{"name": "📛 Jail", "value": jail, "inline": True},
{"name": "🌍 IP-Adresse", "value": ip, "inline": True},
{"name": f"{flag} Herkunftsland", "value": f"{country_name} (`{country}`)", "inline": True}
],
"footer": {"text": "Fail2Ban Benachrichtigung"}
}
try:
resp = requests.post(WEBHOOK_URL, json={"embeds": [embed]})
if resp.status_code >= 400:
print(f"[!] Discord-Webhook fehlgeschlagen: {resp.status_code} - {resp.text}")
except Exception as e:
print(f"[!] Fehler beim Senden an Discord: {e}")
def tail_f(path):
with open(path, "r") as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if not line:
time.sleep(0.2)
continue
yield line
def monitor_fail2ban():
log_lines = tail_f(LOGFILE)
ban_pattern = re.compile(r"Ban\s+(\d+\.\d+\.\d+\.\d+)")
jail_pattern = re.compile(r"\[(\w+)\]")
current_jail = "Unbekannt"
for line in log_lines:
jail_match = jail_pattern.search(line)
if jail_match:
current_jail = jail_match.group(1)
match = ban_pattern.search(line)
if match:
ip = match.group(1)
if not is_ip_already_banned(ip):
country_name, country_code = get_country(ip)
send_discord_embed(ip, current_jail, country_name, country_code)
save_ban(ip, current_jail, country_name)
if __name__ == "__main__":
init_db()
monitor_fail2ban()