init repository

This commit is contained in:
root 2025-05-25 13:01:19 +00:00
commit 9145377125
6 changed files with 411 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*.log
venv
conf.json

18
Makefile Normal file
View File

@ -0,0 +1,18 @@
ROOT_DIR := $(dir $(realpath $(lastword $(MAKEFILE_LIST))))
PYTHON := $(ROOT_DIR)venv/bin/python
install:
python3 -m venv venv
$(PYTHON) -m pip install -r requirements-dev.txt
lint:
$(PYTHON) -m ruff check --fix
format:
$(PYTHON) -m ruff format
check-type:
$(PYTHON) -m mypy *.py
check: format lint check-type

14
conf.json.example Normal file
View File

@ -0,0 +1,14 @@
{
"rules": {
"codes": [
444
],
"contents": [
"\\x", ".env", "php", ".git", ".js"
],
"agents": [
"bot"
]
},
"whitelist": []
}

29
pyproject.toml Normal file
View File

@ -0,0 +1,29 @@
[project]
name = "ngxden"
description = "Generate a deny Nginx conf based on Nginx access logs"
authors = [
{name = "rmanach", email = "manach.r@msn.com"},
]
requires-python = ">= 3.10"
[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []
[tool.ruff]
exclude = [
"venv",
]
line-length = 88
target-version = "py311"
[tool.ruff.lint.mccabe]
max-complexity = 10
[tool.mypy]
exclude = [
"venv",
]
ignore_missing_imports = true

2
requirements-dev.txt Normal file
View File

@ -0,0 +1,2 @@
mypy==1.10.0
ruff==0.4.6

343
ufwban.py Normal file
View File

@ -0,0 +1,343 @@
import argparse
import json
import logging
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime as dt
from logging.handlers import RotatingFileHandler
stdout_handler = logging.StreamHandler(stream=sys.stdout)
rotate_handler = RotatingFileHandler("ufwban.log", maxBytes=2 * 1024 * 1024)
logging.basicConfig(
format="[%(levelname)s] %(asctime)s - %(message)s",
level=logging.INFO,
handlers=(stdout_handler, rotate_handler),
)
NGINX_ACCESS_LOGS_DIR = "/var/log/nginx"
UFW_CONF = "conf.json"
class UFW:
@staticmethod
def drop_all():
logging.info("dropping all deny rules...")
while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips):
for ip, id_ in ufw_deny_ips.items():
logging.info(f"delete rule: id_: {id_} for {ip}")
UFW.delete_deny_ip(id_)
break
@staticmethod
def reload():
logging.info("reloading ufw...")
process = subprocess.run(["ufw", "reload"], capture_output=True)
if process.returncode != 0:
raise Exception(f"unable to reload ufw, err={process.stderr!r}")
@staticmethod
def delete_deny_ip(id_: str):
logging.info(f"cmd running: ufw delete {id_}")
process = subprocess.run(
["ufw", "delete", id_], input=b"y\n", capture_output=True
)
if process.returncode != 0:
raise Exception(
f"unable to delete deny rule for id: {id_}, err={process.stderr!r}"
)
@staticmethod
def ban_ip(ip: str):
logging.info(f"cmd running: ufw deny from {ip}")
process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True)
if process.returncode != 0:
raise Exception(f"unable to deny for ip: {ip}, err={process.stderr!r}")
@staticmethod
def list_deny() -> dict[str, str]:
ips = {}
cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True)
if cp.returncode != 0:
raise Exception(f"unable to get ufw rules, err={cp.stderr!r}")
idx = 0
for rule in cp.stdout.decode().split("\n"):
# cut header
if idx <= 3:
idx += 1
continue
if "Anywhere" in rule and "DENY IN" in rule:
id_ = ""
ip = ""
feed_id = False
feed_ip = False
for c in rule:
if c == "[":
feed_id = True
continue
if c == "]":
feed_id = False
continue
if not feed_id and id_ != "" and c >= "0" and c <= "9":
feed_ip = True
if feed_id and c != " ":
id_ += c
if feed_ip and c != " ":
ip += c
ips[ip] = id_
return ips
@dataclass(frozen=True, slots=True)
class Rules:
http_codes: list[str]
contents: list[str]
user_agents: list[str]
whitelist: list[str]
@classmethod
def from_conf(cls) -> "Rules":
try:
with open(UFW_CONF, "r") as f:
conf = json.load(f)
except Exception as e:
raise Exception(f"unable to read {UFW_CONF}, err={e}")
try:
return Rules(
conf["rules"]["codes"],
conf["rules"]["contents"],
conf["rules"]["agents"],
conf["whitelist"],
)
except Exception as e:
raise Exception(f"unable to parse conf {UFW_CONF}, err={e}")
def code_allowed(self, code: int) -> bool:
return code not in self.http_codes
def content_allowed(self, content: str) -> bool:
for c in self.contents:
if c in content:
return False
return True
def user_agent_allowed(self, user_agent: str) -> bool:
for u in self.user_agents:
if u in user_agent:
return False
return True
def is_whitelist(self, ip: str) -> bool:
return ip in self.whitelist
@dataclass(frozen=True, slots=True)
class NginxLog:
ip: str
date: dt
request: str
code: int
source: str
user_agent: str
def __repr__(self):
return f"""{self.date} | {self.ip}
code={self.code}
request={self.request}
source={self.source}
user-agent={self.user_agent}"""
@classmethod
def from_raw(cls, raw: str) -> "NginxLog":
raw = raw.replace("\n", "")
content = []
buf = ""
is_str = False
for c in raw:
if c == '"':
if is_str:
is_str = False
content.append(buf)
buf = ""
continue
is_str = True
continue
if c == " " and not is_str:
content.append(buf)
buf = ""
continue
buf += c
content = [c for c in content if c not in ("")]
try:
source = content[8]
except IndexError:
logging.warning(
"unable to parse source from raw: %s", raw.replace("\n", "")
)
source = ""
try:
user_agent = content[9]
except IndexError:
logging.warning("unable to parse user-agent from raw: %s", raw)
user_agent = ""
return NginxLog(
content[0],
dt.strptime(content[3], "[%d/%B/%Y:%H:%M:%S"),
content[5],
int(content[6]),
source,
user_agent,
)
def parse_nginx_logs() -> list[NginxLog]:
logs: list[NginxLog] = []
files = [
os.path.abspath(os.path.join(NGINX_ACCESS_LOGS_DIR, f))
for f in os.listdir(NGINX_ACCESS_LOGS_DIR)
if all((f.startswith("access"), not f.endswith(".gz")))
]
for file in files:
with open(file, "r") as f:
while line := f.readline():
logs.append(NginxLog.from_raw(line))
return logs
def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]:
filter_logs: dict[str, NginxLog] = {}
for log in logs:
if rules.is_whitelist(log.ip):
continue
if filter_logs.get(log.ip) is not None:
continue
if not rules.code_allowed(log.code):
filter_logs[log.ip] = log
continue
if not rules.content_allowed(log.request.lower()):
filter_logs[log.ip] = log
continue
if not rules.user_agent_allowed(log.user_agent.lower()):
filter_logs[log.ip] = log
continue
return filter_logs
def main(refresh: bool = False, reload: bool = False, dry_run: bool = False):
rules = Rules.from_conf()
logs = parse_nginx_logs()
logs_to_deny = get_logs_to_deny(logs, rules)
if args.refresh and not args.dry_run:
UFW.drop_all()
return
for ip, log in logs_to_deny.items():
print(f"> banning log: {log}")
if not args.dry_run:
UFW.ban_ip(ip)
if reload:
UFW.reload()
logging.info(f"{len(logs_to_deny)} ip banned")
def live(dry_run: bool = False):
rules = Rules.from_conf()
for line in sys.stdin:
try:
log = NginxLog.from_raw(line)
except Exception as e:
logging.error(f"unable to parse Nginx log: {line}, err={e}")
continue
if rules.is_whitelist(log.ip):
continue
if not rules.code_allowed(log.code):
logging.info(f"banning log (http code not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if not rules.content_allowed(log.request.lower()):
logging.info(f"banning log (contents not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if not rules.user_agent_allowed(log.user_agent.lower()):
logging.info(f"banning log (user agent not allowed): {log}")
if not dry_run:
UFW.ban_ip(log.ip)
continue
if __name__ == "__main__":
parser = argparse.ArgumentParser(
"ufwban", description="Ban ip from Nginx access logs based on simple rules."
)
parser.add_argument("--dry-run", action="store_true", default=False)
parser.add_argument(
"--refresh",
action="store_true",
default=False,
help="Drop all the deny ip in the UFW table and return",
)
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload the UFW firewall"
)
parser.add_argument(
"--live", action="store_true", default=False, help="Read inputs from stdin"
)
args = parser.parse_args()
logging.info("collecting and denying ip from Nginx access logs...")
start = time.perf_counter()
exit_code = 0
try:
if args.live:
live(args.dry_run)
else:
main(args.refresh, args.reload, args.dry_run)
except Exception as e:
exit_code = 1
logging.fatal(f"unexpected error occurred, err={e}")
except KeyboardInterrupt:
logging.warning("ok, you just kill me..., bye")
logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s")
exit(exit_code)