import argparse import json import logging import os import subprocess import sys import time from dataclasses import dataclass from datetime import datetime as dt from logging.handlers import RotatingFileHandler stdout_handler = logging.StreamHandler(stream=sys.stdout) rotate_handler = RotatingFileHandler("ufwban.log", maxBytes=2 * 1024 * 1024) logging.basicConfig( format="[%(levelname)s] %(asctime)s - %(message)s", level=logging.INFO, handlers=(stdout_handler, rotate_handler), ) NGINX_DENY_CONF = "/etc/nginx/conf.d/blocked-ips.conf" NGINX_ACCESS_LOGS_DIR = "/var/log/nginx" UFW_CONF = "conf.json" class UFW: """Wraps ufw binary commands""" @staticmethod def drop_all(): """Remove all "DENY IN" rules.""" logging.info("dropping all deny rules...") while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips): for ip, id_ in ufw_deny_ips.items(): logging.info(f"delete rule: id_: {id_} for {ip}") UFW.delete_deny_ip(id_) break @staticmethod def reload(): """Force ufw reload.""" logging.info("reloading ufw...") process = subprocess.run(["ufw", "reload"], capture_output=True) if process.returncode != 0: raise Exception(f"unable to reload ufw, err={process.stderr!r}") @staticmethod def delete_deny_ip(id_: str): """Delete a rule based on its id.""" logging.info(f"cmd running: ufw delete {id_}") process = subprocess.run( ["ufw", "delete", id_], input=b"y\n", capture_output=True ) if process.returncode != 0: raise Exception( f"unable to delete deny rule for id: {id_}, err={process.stderr!r}" ) @staticmethod def ban_ip(ip: str): """Deny an ip for any ports on the machine.""" logging.info(f"cmd running: ufw deny from {ip}") process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True) if process.returncode != 0: raise Exception(f"unable to deny for ip: {ip}, err={process.stderr!r}") @staticmethod def list_deny() -> dict[str, str]: """ List all the denied ip. Return a dict mapping the ip with its ufw id. """ ips = {} cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True) if cp.returncode != 0: raise Exception(f"unable to get ufw rules, err={cp.stderr!r}") idx = 0 for rule in cp.stdout.decode().split("\n"): # cut header if idx <= 3: idx += 1 continue if "Anywhere" in rule and "DENY IN" in rule: id_ = "" ip = "" feed_id = False feed_ip = False for c in rule: if c == "[": feed_id = True continue if c == "]": feed_id = False continue if not feed_id and id_ != "" and c >= "0" and c <= "9": feed_ip = True if feed_id and c != " ": id_ += c if feed_ip and c != " ": ip += c ips[ip] = id_ return ips @dataclass(frozen=True, slots=True) class Rules: http_codes: list[str] contents: list[str] user_agents: list[str] whitelist: list[str] @classmethod def from_conf(cls) -> "Rules": try: with open(UFW_CONF, "r") as f: conf = json.load(f) except Exception as e: raise Exception(f"unable to read {UFW_CONF}, err={e}") try: return Rules( conf["rules"]["codes"], conf["rules"]["contents"], conf["rules"]["agents"], conf["whitelist"], ) except Exception as e: raise Exception(f"unable to parse conf {UFW_CONF}, err={e}") def code_allowed(self, code: int) -> bool: return code not in self.http_codes def content_allowed(self, content: str) -> bool: for c in self.contents: if c in content: return False return True def user_agent_allowed(self, user_agent: str) -> bool: for u in self.user_agents: if u in user_agent: return False return True def is_whitelist(self, ip: str) -> bool: return ip in self.whitelist @dataclass(frozen=True, slots=True) class NginxLog: ip: str date: dt request: str code: int source: str user_agent: str def __repr__(self): return f"""{self.date} | {self.ip} code={self.code} request={self.request} source={self.source} user-agent={self.user_agent}""" @classmethod def from_raw(cls, raw: str) -> "NginxLog": raw = raw.replace("\n", "") content = [] buf = "" is_str = False for c in raw: if c == '"': if is_str: is_str = False content.append(buf) buf = "" continue is_str = True continue if c == " " and not is_str: content.append(buf) buf = "" continue buf += c content = [c for c in content if c not in ("")] try: source = content[8] except IndexError: logging.warning( "unable to parse source from raw: %s", raw.replace("\n", "") ) source = "" try: user_agent = content[9] except IndexError: logging.warning("unable to parse user-agent from raw: %s", raw) user_agent = "" return NginxLog( content[0], dt.strptime(content[3], "[%d/%b/%Y:%H:%M:%S"), content[5], int(content[6]), source, user_agent, ) def parse_nginx_logs() -> list[NginxLog]: logs: list[NginxLog] = [] files = [ os.path.abspath(os.path.join(NGINX_ACCESS_LOGS_DIR, f)) for f in os.listdir(NGINX_ACCESS_LOGS_DIR) if all((f.startswith("access"), not f.endswith(".gz"))) ] for file in files: with open(file, "r") as f: while line := f.readline(): logs.append(NginxLog.from_raw(line)) return logs def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]: filter_logs: dict[str, NginxLog] = {} for log in logs: if rules.is_whitelist(log.ip): continue if filter_logs.get(log.ip) is not None: continue if not rules.code_allowed(log.code): filter_logs[log.ip] = log continue if not rules.content_allowed(log.request.lower()): filter_logs[log.ip] = log continue if not rules.user_agent_allowed(log.user_agent.lower()): filter_logs[log.ip] = log continue return filter_logs def get_nginx_denied_ips() -> list[str]: """Retrieve existing Nginx denied ips""" denied_ips = [] try: with open(NGINX_DENY_CONF, "r") as f: while line := f.readline(): parts = line.split(" ") if len(parts) != 2: logging.error(f"unable to parse line: {line} in {NGINX_DENY_CONF}") continue if parts[0].lower() != "deny": logging.warning(f"not a deny rule: {line} in {NGINX_DENY_CONF}") continue denied_ips.append(parts[1].removesuffix("\n")[:-1]) except FileNotFoundError: logging.warning(f"{NGINX_DENY_CONF} does not exist") return denied_ips def gen_nginx_conf(dry_run: bool = False): """Generate an Nginx conf with a list a IP to deny""" rules = Rules.from_conf() logs = parse_nginx_logs() ips_to_deny = [ip for ip in get_logs_to_deny(logs, rules).keys()] ips_denied = get_nginx_denied_ips() ips = [ ip for ip in set(ips_to_deny) | set(ips_denied) if not rules.is_whitelist(ip) ] if not len(ips): logging.info("no ip to deny") return with open(f"{NGINX_DENY_CONF}.tmp", "w") as f: for ip in ips: f.write(f"deny {ip};\n") f.write("allow all;\n") if not dry_run: os.rename(f"{NGINX_DENY_CONF}.tmp", f"{NGINX_DENY_CONF}") logging.info(f"deny Nginx conf installed: {NGINX_DENY_CONF}") def main(refresh: bool = False, reload: bool = False, dry_run: bool = False): rules = Rules.from_conf() logs = parse_nginx_logs() logs_to_deny = get_logs_to_deny(logs, rules) if refresh and not dry_run: UFW.drop_all() return for ip, log in logs_to_deny.items(): print(f"> banning log: {log}") if not dry_run: UFW.ban_ip(ip) if reload: UFW.reload() logging.info(f"{len(logs_to_deny)} ip banned") def live(dry_run: bool = False): rules = Rules.from_conf() for line in sys.stdin: try: log = NginxLog.from_raw(line) except Exception as e: logging.error(f"unable to parse Nginx log: {line}, err={e}") continue if rules.is_whitelist(log.ip): continue if not rules.code_allowed(log.code): logging.info(f"banning log (http code not allowed): {log}") if not dry_run: UFW.ban_ip(log.ip) continue if not rules.content_allowed(log.request.lower()): logging.info(f"banning log (contents not allowed): {log}") if not dry_run: UFW.ban_ip(log.ip) continue if not rules.user_agent_allowed(log.user_agent.lower()): logging.info(f"banning log (user agent not allowed): {log}") if not dry_run: UFW.ban_ip(log.ip) continue if __name__ == "__main__": parser = argparse.ArgumentParser( "ufwban", description="Ban ip from Nginx access logs based on simple rules." ) parser.add_argument("--dry-run", action="store_true", default=False) parser.add_argument( "--refresh", action="store_true", default=False, help="Drop all the deny ip in the UFW table and return", ) parser.add_argument( "--reload", action="store_true", default=False, help="Reload the UFW firewall" ) parser.add_argument( "--to-nginx", action="store_true", default=False, help="Generate an Nginx deny configuration", ) parser.add_argument( "--live", action="store_true", default=False, help="Read inputs from stdin" ) args = parser.parse_args() logging.info("collecting and denying ip from Nginx access logs...") start = time.perf_counter() exit_code = 0 try: if args.to_nginx: gen_nginx_conf(args.dry_run) elif args.live: live(args.dry_run) else: main(args.refresh, args.reload, args.dry_run) except Exception as e: exit_code = 1 logging.fatal(f"unexpected error occurred, err={e}") except KeyboardInterrupt: logging.warning("ok, you just kill me... Bye !") logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s") exit(exit_code)