ufwban/ufwban.py
2025-10-29 12:58:21 +00:00

411 lines
11 KiB
Python

import argparse
import json
import logging
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime as dt
from logging.handlers import RotatingFileHandler
stdout_handler = logging.StreamHandler(stream=sys.stdout)
rotate_handler = RotatingFileHandler("ufwban.log", maxBytes=2 * 1024 * 1024)
logging.basicConfig(
format="[%(levelname)s] %(asctime)s - %(message)s",
level=logging.INFO,
handlers=(stdout_handler, rotate_handler),
)
NGINX_DENY_CONF = "/etc/nginx/conf.d/blocked-ips.conf"
NGINX_ACCESS_LOGS_DIR = "/var/log/nginx"
UFW_CONF = "conf.json"
class UFW:
"""Wraps ufw binary commands"""
@staticmethod
def drop_all():
"""Remove all "DENY IN" rules."""
logging.info("dropping all deny rules...")
while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips):
for ip, id_ in ufw_deny_ips.items():
logging.info(f"delete rule: id_: {id_} for {ip}")
UFW.delete_deny_ip(id_)
break
@staticmethod
def reload():
"""Force ufw reload."""
logging.info("reloading ufw...")
process = subprocess.run(["ufw", "reload"], capture_output=True)
if process.returncode != 0:
raise Exception(f"unable to reload ufw, err={process.stderr!r}")
@staticmethod
def delete_deny_ip(id_: str):
"""Delete a rule based on its id."""
logging.info(f"cmd running: ufw delete {id_}")
process = subprocess.run(
["ufw", "delete", id_], input=b"y\n", capture_output=True
)
if process.returncode != 0:
raise Exception(
f"unable to delete deny rule for id: {id_}, err={process.stderr!r}"
)
@staticmethod
def ban_ip(ip: str):
"""Deny an ip for any ports on the machine."""
logging.info(f"cmd running: ufw deny from {ip}")
process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True)
if process.returncode != 0:
raise Exception(f"unable to deny for ip: {ip}, err={process.stderr!r}")
@staticmethod
def list_deny() -> dict[str, str]:
"""
List all the denied ip.
Return a dict mapping the ip with its ufw id.
"""
ips = {}
cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True)
if cp.returncode != 0:
raise Exception(f"unable to get ufw rules, err={cp.stderr!r}")
idx = 0
for rule in cp.stdout.decode().split("\n"):
# cut header
if idx <= 3:
idx += 1
continue
if "Anywhere" in rule and "DENY IN" in rule:
id_ = ""
ip = ""
feed_id = False
feed_ip = False
for c in rule:
if c == "[":
feed_id = True
continue
if c == "]":
feed_id = False
continue
if not feed_id and id_ != "" and c >= "0" and c <= "9":
feed_ip = True
if feed_id and c != " ":
id_ += c
if feed_ip and c != " ":
ip += c
ips[ip] = id_
return ips
@dataclass(frozen=True, slots=True)
class Rules:
http_codes: list[str]
contents: list[str]
user_agents: list[str]
whitelist: list[str]
@classmethod
def from_conf(cls) -> "Rules":
try:
with open(UFW_CONF, "r") as f:
conf = json.load(f)
except Exception as e:
raise Exception(f"unable to read {UFW_CONF}, err={e}")
try:
return Rules(
conf["rules"]["codes"],
conf["rules"]["contents"],
conf["rules"]["agents"],
conf["whitelist"],
)
except Exception as e:
raise Exception(f"unable to parse conf {UFW_CONF}, err={e}")
def code_allowed(self, code: int) -> bool:
return code not in self.http_codes
def content_allowed(self, content: str) -> bool:
for c in self.contents:
if c in content:
return False
return True
def user_agent_allowed(self, user_agent: str) -> bool:
for u in self.user_agents:
if u in user_agent:
return False
return True
def is_whitelist(self, ip: str) -> bool:
return ip in self.whitelist
@dataclass(frozen=True, slots=True)
class NginxLog:
ip: str
date: dt
request: str
code: int
source: str
user_agent: str
def __repr__(self):
return f"""{self.date} | {self.ip}
code={self.code}
request={self.request}
source={self.source}
user-agent={self.user_agent}"""
@classmethod
def from_raw(cls, raw: str) -> "NginxLog":
raw = raw.replace("\n", "")
content = []
buf = ""
is_str = False
for c in raw:
if c == '"':
if is_str:
is_str = False
content.append(buf)
buf = ""
continue
is_str = True
continue
if c == " " and not is_str:
content.append(buf)
buf = ""
continue
buf += c
content = [c for c in content if c not in ("")]
try:
source = content[8]
except IndexError:
logging.warning(
"unable to parse source from raw: %s", raw.replace("\n", "")
)
source = ""
try:
user_agent = content[9]
except IndexError:
logging.warning("unable to parse user-agent from raw: %s", raw)
user_agent = ""
return NginxLog(
content[0],
dt.strptime(content[3], "[%d/%b/%Y:%H:%M:%S"),
content[5],
int(content[6]),
source,
user_agent,
)
def parse_nginx_logs() -> list[NginxLog]:
logs: list[NginxLog] = []
files = [
os.path.abspath(os.path.join(NGINX_ACCESS_LOGS_DIR, f))
for f in os.listdir(NGINX_ACCESS_LOGS_DIR)
if all((f.startswith("access"), not f.endswith(".gz")))
]
for file in files:
with open(file, "r") as f:
while line := f.readline():
logs.append(NginxLog.from_raw(line))
return logs
def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]:
filter_logs: dict[str, NginxLog] = {}
for log in logs:
if rules.is_whitelist(log.ip):
continue
if filter_logs.get(log.ip) is not None:
continue
if not rules.code_allowed(log.code):
filter_logs[log.ip] = log
continue
if not rules.content_allowed(log.request.lower()):
filter_logs[log.ip] = log
continue
if not rules.user_agent_allowed(log.user_agent.lower()):
filter_logs[log.ip] = log
continue
return filter_logs
def get_nginx_denied_ips() -> list[str]:
"""Retrieve existing Nginx denied ips"""
denied_ips = []
try:
with open(NGINX_DENY_CONF, "r") as f:
while line := f.readline():
parts = line.split(" ")
if len(parts) != 2:
logging.error(f"unable to parse line: {line} in {NGINX_DENY_CONF}")
continue
if parts[0].lower() != "deny":
logging.warning(f"not a deny rule: {line} in {NGINX_DENY_CONF}")
continue
denied_ips.append(parts[1].removesuffix("\n")[:-1])
except FileNotFoundError:
logging.warning(f"{NGINX_DENY_CONF} does not exist")
return denied_ips
def gen_nginx_conf(dry_run: bool = False):
"""Generate an Nginx conf with a list a IP to deny"""
rules = Rules.from_conf()
logs = parse_nginx_logs()
ips_to_deny = [ip for ip in get_logs_to_deny(logs, rules).keys()]
ips_denied = get_nginx_denied_ips()
ips = [
ip for ip in set(ips_to_deny) | set(ips_denied) if not rules.is_whitelist(ip)
]
if not len(ips):
logging.info("no ip to deny")
return
with open(f"{NGINX_DENY_CONF}.tmp", "w") as f:
for ip in ips:
f.write(f"deny {ip};\n")
f.write("allow all;\n")
if not dry_run:
os.rename(f"{NGINX_DENY_CONF}.tmp", f"{NGINX_DENY_CONF}")
logging.info(f"deny Nginx conf installed: {NGINX_DENY_CONF}")
def main(refresh: bool = False, reload: bool = False, dry_run: bool = False):
rules = Rules.from_conf()
logs = parse_nginx_logs()
logs_to_deny = get_logs_to_deny(logs, rules)
if refresh and not dry_run:
UFW.drop_all()
return
for ip, log in logs_to_deny.items():
print(f"> banning log: {log}")
if not dry_run:
UFW.ban_ip(ip)
if reload:
UFW.reload()
logging.info(f"{len(logs_to_deny)} ip banned")
def live(dry_run: bool = False):
rules = Rules.from_conf()
for line in sys.stdin:
try:
log = NginxLog.from_raw(line)
except Exception as e:
logging.error(f"unable to parse Nginx log: {line}, err={e}")
continue
if rules.is_whitelist(log.ip):
continue
if not rules.code_allowed(log.code):
logging.info(f"banning log (http code not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if not rules.content_allowed(log.request.lower()):
logging.info(f"banning log (contents not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if not rules.user_agent_allowed(log.user_agent.lower()):
logging.info(f"banning log (user agent not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if __name__ == "__main__":
parser = argparse.ArgumentParser(
"ufwban", description="Ban ip from Nginx access logs based on simple rules."
)
parser.add_argument("--dry-run", action="store_true", default=False)
parser.add_argument(
"--refresh",
action="store_true",
default=False,
help="Drop all the deny ip in the UFW table and return",
)
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload the UFW firewall"
)
parser.add_argument(
"--to-nginx",
action="store_true",
default=False,
help="Generate an Nginx deny configuration",
)
parser.add_argument(
"--live", action="store_true", default=False, help="Read inputs from stdin"
)
args = parser.parse_args()
logging.info("collecting and denying ip from Nginx access logs...")
start = time.perf_counter()
exit_code = 0
try:
if args.to_nginx:
gen_nginx_conf(args.dry_run)
elif args.live:
live(args.dry_run)
else:
main(args.refresh, args.reload, args.dry_run)
except Exception as e:
exit_code = 1
logging.fatal(f"unexpected error occurred, err={e}")
except KeyboardInterrupt:
logging.warning("ok, you just kill me... Bye !")
logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s")
exit(exit_code)