commit 91453771254a4038e9725a4276a10708f10ddac9 Author: root Date: Sun May 25 13:01:19 2025 +0000 init repository diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e781761 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.log + +venv + +conf.json diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0179c7e --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +ROOT_DIR := $(dir $(realpath $(lastword $(MAKEFILE_LIST)))) +PYTHON := $(ROOT_DIR)venv/bin/python + +install: + python3 -m venv venv + $(PYTHON) -m pip install -r requirements-dev.txt + +lint: + $(PYTHON) -m ruff check --fix + +format: + $(PYTHON) -m ruff format + +check-type: + $(PYTHON) -m mypy *.py + +check: format lint check-type + diff --git a/conf.json.example b/conf.json.example new file mode 100644 index 0000000..40dfa75 --- /dev/null +++ b/conf.json.example @@ -0,0 +1,14 @@ +{ + "rules": { + "codes": [ + 444 + ], + "contents": [ + "\\x", ".env", "php", ".git", ".js" + ], + "agents": [ + "bot" + ] + }, + "whitelist": [] +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..01bccea --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "ngxden" +description = "Generate a deny Nginx conf based on Nginx access logs" + +authors = [ + {name = "rmanach", email = "manach.r@msn.com"}, +] +requires-python = ">= 3.10" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = [] + +[tool.ruff] +exclude = [ + "venv", +] + +line-length = 88 +target-version = "py311" + +[tool.ruff.lint.mccabe] +max-complexity = 10 + +[tool.mypy] +exclude = [ + "venv", +] +ignore_missing_imports = true diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..c882600 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +mypy==1.10.0 +ruff==0.4.6 \ No newline at end of file diff --git a/ufwban.py b/ufwban.py new file mode 100644 index 0000000..67410cc --- /dev/null +++ b/ufwban.py @@ -0,0 +1,343 @@ +import argparse +import json +import logging +import os +import subprocess +import sys +import time +from dataclasses import dataclass +from datetime import datetime as dt +from logging.handlers import RotatingFileHandler + +stdout_handler = logging.StreamHandler(stream=sys.stdout) +rotate_handler = RotatingFileHandler("ufwban.log", maxBytes=2 * 1024 * 1024) +logging.basicConfig( + format="[%(levelname)s] %(asctime)s - %(message)s", + level=logging.INFO, + handlers=(stdout_handler, rotate_handler), +) + +NGINX_ACCESS_LOGS_DIR = "/var/log/nginx" +UFW_CONF = "conf.json" + + +class UFW: + @staticmethod + def drop_all(): + logging.info("dropping all deny rules...") + while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips): + for ip, id_ in ufw_deny_ips.items(): + logging.info(f"delete rule: id_: {id_} for {ip}") + UFW.delete_deny_ip(id_) + break + + @staticmethod + def reload(): + logging.info("reloading ufw...") + process = subprocess.run(["ufw", "reload"], capture_output=True) + + if process.returncode != 0: + raise Exception(f"unable to reload ufw, err={process.stderr!r}") + + @staticmethod + def delete_deny_ip(id_: str): + logging.info(f"cmd running: ufw delete {id_}") + process = subprocess.run( + ["ufw", "delete", id_], input=b"y\n", capture_output=True + ) + + if process.returncode != 0: + raise Exception( + f"unable to delete deny rule for id: {id_}, err={process.stderr!r}" + ) + + @staticmethod + def ban_ip(ip: str): + logging.info(f"cmd running: ufw deny from {ip}") + process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True) + + if process.returncode != 0: + raise Exception(f"unable to deny for ip: {ip}, err={process.stderr!r}") + + @staticmethod + def list_deny() -> dict[str, str]: + ips = {} + cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True) + if cp.returncode != 0: + raise Exception(f"unable to get ufw rules, err={cp.stderr!r}") + + idx = 0 + for rule in cp.stdout.decode().split("\n"): + # cut header + if idx <= 3: + idx += 1 + continue + + if "Anywhere" in rule and "DENY IN" in rule: + id_ = "" + ip = "" + feed_id = False + feed_ip = False + + for c in rule: + if c == "[": + feed_id = True + continue + + if c == "]": + feed_id = False + continue + + if not feed_id and id_ != "" and c >= "0" and c <= "9": + feed_ip = True + + if feed_id and c != " ": + id_ += c + + if feed_ip and c != " ": + ip += c + + ips[ip] = id_ + return ips + + +@dataclass(frozen=True, slots=True) +class Rules: + http_codes: list[str] + contents: list[str] + user_agents: list[str] + whitelist: list[str] + + @classmethod + def from_conf(cls) -> "Rules": + try: + with open(UFW_CONF, "r") as f: + conf = json.load(f) + except Exception as e: + raise Exception(f"unable to read {UFW_CONF}, err={e}") + + try: + return Rules( + conf["rules"]["codes"], + conf["rules"]["contents"], + conf["rules"]["agents"], + conf["whitelist"], + ) + except Exception as e: + raise Exception(f"unable to parse conf {UFW_CONF}, err={e}") + + def code_allowed(self, code: int) -> bool: + return code not in self.http_codes + + def content_allowed(self, content: str) -> bool: + for c in self.contents: + if c in content: + return False + return True + + def user_agent_allowed(self, user_agent: str) -> bool: + for u in self.user_agents: + if u in user_agent: + return False + return True + + def is_whitelist(self, ip: str) -> bool: + return ip in self.whitelist + + +@dataclass(frozen=True, slots=True) +class NginxLog: + ip: str + date: dt + request: str + code: int + source: str + user_agent: str + + def __repr__(self): + return f"""{self.date} | {self.ip} + code={self.code} + request={self.request} + source={self.source} + user-agent={self.user_agent}""" + + @classmethod + def from_raw(cls, raw: str) -> "NginxLog": + raw = raw.replace("\n", "") + content = [] + buf = "" + is_str = False + + for c in raw: + if c == '"': + if is_str: + is_str = False + content.append(buf) + buf = "" + continue + + is_str = True + continue + + if c == " " and not is_str: + content.append(buf) + buf = "" + continue + buf += c + + content = [c for c in content if c not in ("")] + + try: + source = content[8] + except IndexError: + logging.warning( + "unable to parse source from raw: %s", raw.replace("\n", "") + ) + source = "" + + try: + user_agent = content[9] + except IndexError: + logging.warning("unable to parse user-agent from raw: %s", raw) + user_agent = "" + + return NginxLog( + content[0], + dt.strptime(content[3], "[%d/%B/%Y:%H:%M:%S"), + content[5], + int(content[6]), + source, + user_agent, + ) + + +def parse_nginx_logs() -> list[NginxLog]: + logs: list[NginxLog] = [] + + files = [ + os.path.abspath(os.path.join(NGINX_ACCESS_LOGS_DIR, f)) + for f in os.listdir(NGINX_ACCESS_LOGS_DIR) + if all((f.startswith("access"), not f.endswith(".gz"))) + ] + for file in files: + with open(file, "r") as f: + while line := f.readline(): + logs.append(NginxLog.from_raw(line)) + + return logs + + +def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]: + filter_logs: dict[str, NginxLog] = {} + + for log in logs: + if rules.is_whitelist(log.ip): + continue + + if filter_logs.get(log.ip) is not None: + continue + + if not rules.code_allowed(log.code): + filter_logs[log.ip] = log + continue + + if not rules.content_allowed(log.request.lower()): + filter_logs[log.ip] = log + continue + + if not rules.user_agent_allowed(log.user_agent.lower()): + filter_logs[log.ip] = log + continue + + return filter_logs + + +def main(refresh: bool = False, reload: bool = False, dry_run: bool = False): + rules = Rules.from_conf() + + logs = parse_nginx_logs() + logs_to_deny = get_logs_to_deny(logs, rules) + + if args.refresh and not args.dry_run: + UFW.drop_all() + return + + for ip, log in logs_to_deny.items(): + print(f"> banning log: {log}") + if not args.dry_run: + UFW.ban_ip(ip) + + if reload: + UFW.reload() + logging.info(f"{len(logs_to_deny)} ip banned") + + +def live(dry_run: bool = False): + rules = Rules.from_conf() + + for line in sys.stdin: + try: + log = NginxLog.from_raw(line) + except Exception as e: + logging.error(f"unable to parse Nginx log: {line}, err={e}") + continue + + if rules.is_whitelist(log.ip): + continue + + if not rules.code_allowed(log.code): + logging.info(f"banning log (http code not allowed): {log}") + if not dry_run: + UFW.ban_ip(log.ip) + continue + + if not rules.content_allowed(log.request.lower()): + logging.info(f"banning log (contents not allowed): {log}") + if not dry_run: + UFW.ban_ip(log.ip) + continue + + if not rules.user_agent_allowed(log.user_agent.lower()): + logging.info(f"banning log (user agent not allowed): {log}") + if not dry_run: + UFW.ban_ip(log.ip) + continue + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + "ufwban", description="Ban ip from Nginx access logs based on simple rules." + ) + + parser.add_argument("--dry-run", action="store_true", default=False) + parser.add_argument( + "--refresh", + action="store_true", + default=False, + help="Drop all the deny ip in the UFW table and return", + ) + parser.add_argument( + "--reload", action="store_true", default=False, help="Reload the UFW firewall" + ) + parser.add_argument( + "--live", action="store_true", default=False, help="Read inputs from stdin" + ) + args = parser.parse_args() + + logging.info("collecting and denying ip from Nginx access logs...") + start = time.perf_counter() + + exit_code = 0 + try: + if args.live: + live(args.dry_run) + else: + main(args.refresh, args.reload, args.dry_run) + except Exception as e: + exit_code = 1 + logging.fatal(f"unexpected error occurred, err={e}") + except KeyboardInterrupt: + logging.warning("ok, you just kill me..., bye") + + logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s") + exit(exit_code)