411 lines
11 KiB
Python
411 lines
11 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime as dt
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
# Log both to stdout and to a size-capped file in the current working
# directory.
stdout_handler = logging.StreamHandler(stream=sys.stdout)
# RotatingFileHandler never rolls over while backupCount is 0 (its default),
# so at least one backup is required for the 2 MiB cap to take effect.
rotate_handler = RotatingFileHandler(
    "ufwban.log", maxBytes=2 * 1024 * 1024, backupCount=3
)
logging.basicConfig(
    format="[%(levelname)s] %(asctime)s - %(message)s",
    level=logging.INFO,
    handlers=(stdout_handler, rotate_handler),
)

# Nginx configuration file that receives the generated "deny <ip>;" lines.
NGINX_DENY_CONF = "/etc/nginx/conf.d/blocked-ips.conf"
# Directory scanned for Nginx access log files.
NGINX_ACCESS_LOGS_DIR = "/var/log/nginx"
# JSON file holding the ban rules and the whitelist; this is a relative path,
# so it is resolved against the current working directory.
UFW_CONF = "conf.json"
|
|
|
|
|
|
class UFW:
    """Wraps ufw binary commands.

    Every public method shells out to the ``ufw`` executable and raises
    ``Exception`` when the command exits with a non-zero status.
    """

    @staticmethod
    def drop_all():
        """Remove all "DENY IN" rules.

        Rules are deleted one at a time and the rule list is fetched again
        after every deletion, so an id from a previous listing is never
        reused once the rule set has changed.
        """
        logging.info("dropping all deny rules...")
        while ufw_deny_ips := UFW.list_deny():
            ip, id_ = next(iter(ufw_deny_ips.items()))
            logging.info(f"delete rule: id_: {id_} for {ip}")
            UFW.delete_deny_ip(id_)

    @staticmethod
    def reload():
        """Force ufw reload.

        Raises:
            Exception: if ``ufw reload`` exits with a non-zero status.
        """
        logging.info("reloading ufw...")
        process = subprocess.run(["ufw", "reload"], capture_output=True)

        if process.returncode != 0:
            raise Exception(f"unable to reload ufw, err={process.stderr!r}")

    @staticmethod
    def delete_deny_ip(id_: str):
        """Delete a rule based on its id.

        Raises:
            Exception: if ``ufw delete`` exits with a non-zero status.
        """
        logging.info(f"cmd running: ufw delete {id_}")
        # "y\n" is fed on stdin to answer ufw's confirmation prompt.
        process = subprocess.run(
            ["ufw", "delete", id_], input=b"y\n", capture_output=True
        )

        if process.returncode != 0:
            raise Exception(
                f"unable to delete deny rule for id: {id_}, err={process.stderr!r}"
            )

    @staticmethod
    def ban_ip(ip: str):
        """Deny an ip for any ports on the machine.

        Raises:
            Exception: if ``ufw deny`` exits with a non-zero status.
        """
        logging.info(f"cmd running: ufw deny from {ip}")
        process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True)

        if process.returncode != 0:
            raise Exception(f"unable to deny for ip: {ip}, err={process.stderr!r}")

    @staticmethod
    def list_deny() -> dict[str, str]:
        """
        List all the denied ip.

        Return a dict mapping the ip with its ufw id.

        Raises:
            Exception: if ``ufw status numbered`` exits with a non-zero status.
        """
        cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True)
        if cp.returncode != 0:
            raise Exception(f"unable to get ufw rules, err={cp.stderr!r}")

        return UFW._parse_deny_rules(cp.stdout.decode())

    @staticmethod
    def _parse_deny_rules(output: str) -> dict[str, str]:
        """Extract ``{ip: rule id}`` from ``ufw status numbered`` output.

        Only rows of the form ``[ N] Anywhere ... DENY IN <source>`` are
        kept. Header lines, rows for other actions, rows whose destination is
        not ``Anywhere`` and anything after the source column (such as a
        trailing comment) are ignored.
        """
        ips: dict[str, str] = {}

        for line in output.splitlines():
            rule = line.strip()
            # Rule rows start with their bracketed id; headers do not.
            if not rule.startswith("["):
                continue

            raw_id, closing, rest = rule.partition("]")
            id_ = raw_id[1:].strip()
            if not closing or not id_.isdigit():
                continue

            fields = rest.split()
            try:
                action = fields.index("DENY")
            except ValueError:
                continue

            # Need "DENY", "IN" and a source column right after them.
            if fields[action + 1:action + 2] != ["IN"] or len(fields) <= action + 2:
                continue

            # The destination column may carry extras such as "(v6)"; only
            # its first word decides whether the rule applies to any port.
            destination = fields[:action]
            if not destination or destination[0] != "Anywhere":
                continue

            ips[fields[action + 2]] = id_

        return ips
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class Rules:
|
|
http_codes: list[str]
|
|
contents: list[str]
|
|
user_agents: list[str]
|
|
whitelist: list[str]
|
|
|
|
@classmethod
|
|
def from_conf(cls) -> "Rules":
|
|
try:
|
|
with open(UFW_CONF, "r") as f:
|
|
conf = json.load(f)
|
|
except Exception as e:
|
|
raise Exception(f"unable to read {UFW_CONF}, err={e}")
|
|
|
|
try:
|
|
return Rules(
|
|
conf["rules"]["codes"],
|
|
conf["rules"]["contents"],
|
|
conf["rules"]["agents"],
|
|
conf["whitelist"],
|
|
)
|
|
except Exception as e:
|
|
raise Exception(f"unable to parse conf {UFW_CONF}, err={e}")
|
|
|
|
def code_allowed(self, code: int) -> bool:
|
|
return code not in self.http_codes
|
|
|
|
def content_allowed(self, content: str) -> bool:
|
|
for c in self.contents:
|
|
if c in content:
|
|
return False
|
|
return True
|
|
|
|
def user_agent_allowed(self, user_agent: str) -> bool:
|
|
for u in self.user_agents:
|
|
if u in user_agent:
|
|
return False
|
|
return True
|
|
|
|
def is_whitelist(self, ip: str) -> bool:
|
|
return ip in self.whitelist
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class NginxLog:
|
|
ip: str
|
|
date: dt
|
|
request: str
|
|
code: int
|
|
source: str
|
|
user_agent: str
|
|
|
|
def __repr__(self):
|
|
return f"""{self.date} | {self.ip}
|
|
code={self.code}
|
|
request={self.request}
|
|
source={self.source}
|
|
user-agent={self.user_agent}"""
|
|
|
|
@classmethod
|
|
def from_raw(cls, raw: str) -> "NginxLog":
|
|
raw = raw.replace("\n", "")
|
|
content = []
|
|
buf = ""
|
|
is_str = False
|
|
|
|
for c in raw:
|
|
if c == '"':
|
|
if is_str:
|
|
is_str = False
|
|
content.append(buf)
|
|
buf = ""
|
|
continue
|
|
|
|
is_str = True
|
|
continue
|
|
|
|
if c == " " and not is_str:
|
|
content.append(buf)
|
|
buf = ""
|
|
continue
|
|
buf += c
|
|
|
|
content = [c for c in content if c not in ("")]
|
|
|
|
try:
|
|
source = content[8]
|
|
except IndexError:
|
|
logging.warning(
|
|
"unable to parse source from raw: %s", raw.replace("\n", "")
|
|
)
|
|
source = ""
|
|
|
|
try:
|
|
user_agent = content[9]
|
|
except IndexError:
|
|
logging.warning("unable to parse user-agent from raw: %s", raw)
|
|
user_agent = ""
|
|
|
|
return NginxLog(
|
|
content[0],
|
|
dt.strptime(content[3], "[%d/%b/%Y:%H:%M:%S"),
|
|
content[5],
|
|
int(content[6]),
|
|
source,
|
|
user_agent,
|
|
)
|
|
|
|
|
|
def parse_nginx_logs() -> list[NginxLog]:
    """Parse every uncompressed access log found in ``NGINX_ACCESS_LOGS_DIR``.

    Files whose name starts with "access" and does not end with ".gz" are
    read line by line. A line that cannot be parsed is logged and skipped,
    like ``live()`` does, so that a single malformed entry does not discard
    the whole batch.

    Returns:
        The parsed entries, in the order the files and lines were read.
    """
    logs: list[NginxLog] = []

    files = [
        os.path.abspath(os.path.join(NGINX_ACCESS_LOGS_DIR, name))
        for name in os.listdir(NGINX_ACCESS_LOGS_DIR)
        if name.startswith("access") and not name.endswith(".gz")
    ]

    for file in files:
        with open(file, "r") as f:
            for line in f:
                try:
                    logs.append(NginxLog.from_raw(line))
                except (IndexError, ValueError) as e:
                    logging.error(f"unable to parse Nginx log: {line}, err={e}")

    return logs
|
|
|
|
|
|
def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]:
    """Pick, for each IP, the first log entry that breaks a rule.

    Whitelisted IPs are never selected. An entry breaks a rule when its HTTP
    code, its lower-cased request or its lower-cased user agent is rejected
    by ``rules``; the checks run in that order and stop at the first
    rejection.

    Returns:
        A dict mapping each offending IP to the entry that got it selected.
    """
    offenders: dict[str, NginxLog] = {}

    for entry in logs:
        # Nothing to do for trusted IPs or IPs that are already selected.
        if rules.is_whitelist(entry.ip) or entry.ip in offenders:
            continue

        acceptable = (
            rules.code_allowed(entry.code)
            and rules.content_allowed(entry.request.lower())
            and rules.user_agent_allowed(entry.user_agent.lower())
        )
        if not acceptable:
            offenders[entry.ip] = entry

    return offenders
|
|
|
|
|
|
def get_nginx_denied_ips() -> list[str]:
|
|
"""Retrieve existing Nginx denied ips"""
|
|
denied_ips = []
|
|
|
|
try:
|
|
with open(NGINX_DENY_CONF, "r") as f:
|
|
while line := f.readline():
|
|
parts = line.split(" ")
|
|
if len(parts) != 2:
|
|
logging.error(f"unable to parse line: {line} in {NGINX_DENY_CONF}")
|
|
continue
|
|
|
|
if parts[0].lower() != "deny":
|
|
logging.warning(f"not a deny rule: {line} in {NGINX_DENY_CONF}")
|
|
continue
|
|
|
|
denied_ips.append(parts[1].removesuffix("\n")[:-1])
|
|
except FileNotFoundError:
|
|
logging.warning(f"{NGINX_DENY_CONF} does not exist")
|
|
|
|
return denied_ips
|
|
|
|
|
|
def gen_nginx_conf(dry_run: bool = False):
    """Generate an Nginx conf with the list of IPs to deny.

    The result is the union of the IPs selected from the access logs and the
    IPs already denied in ``NGINX_DENY_CONF``, minus the whitelisted ones.
    Nothing is written when that union is empty.

    The rules are first written to ``<NGINX_DENY_CONF>.tmp``, followed by a
    final ``allow all;``. That temporary file is written even when
    ``dry_run`` is set; only the rename over ``NGINX_DENY_CONF`` is skipped.
    """
    rules = Rules.from_conf()

    flagged = set(get_logs_to_deny(parse_nginx_logs(), rules))
    already_denied = set(get_nginx_denied_ips())

    ips = [ip for ip in flagged | already_denied if not rules.is_whitelist(ip)]
    if not ips:
        logging.info("no ip to deny")
        return

    tmp_path = f"{NGINX_DENY_CONF}.tmp"
    with open(tmp_path, "w") as out:
        for ip in ips:
            out.write(f"deny {ip};\n")
        out.write("allow all;\n")

    if dry_run:
        return

    os.rename(tmp_path, f"{NGINX_DENY_CONF}")
    logging.info(f"deny Nginx conf installed: {NGINX_DENY_CONF}")
|
|
|
|
|
|
def main(refresh: bool = False, reload: bool = False, dry_run: bool = False):
    """Ban with ufw every IP selected from the Nginx access logs.

    Args:
        refresh: When set (and ``dry_run`` is not), drop every existing deny
            rule and return without banning anything.
        reload: Reload ufw once the IPs have been processed. The reload is
            performed regardless of ``dry_run``.
        dry_run: Print what would be banned without calling ``ufw deny``.
    """
    rules = Rules.from_conf()
    logs_to_deny = get_logs_to_deny(parse_nginx_logs(), rules)

    if refresh and not dry_run:
        UFW.drop_all()
        return

    apply_bans = not dry_run
    for ip, log in logs_to_deny.items():
        print(f"> banning log: {log}")
        if apply_bans:
            UFW.ban_ip(ip)

    if reload:
        UFW.reload()
    logging.info(f"{len(logs_to_deny)} ip banned")
|
|
|
|
|
|
def live(dry_run: bool = False):
    """Ban offending IPs from access log lines read on stdin.

    Each line is parsed on its own; a line that cannot be parsed is logged
    and skipped. Whitelisted IPs are ignored. The HTTP code, the lower-cased
    request and the lower-cased user agent are checked in that order, and
    the first rejected one triggers the ban.

    Args:
        dry_run: Log what would be banned without calling ``ufw deny``.
    """
    rules = Rules.from_conf()

    for line in sys.stdin:
        try:
            log = NginxLog.from_raw(line)
        except Exception as e:
            logging.error(f"unable to parse Nginx log: {line}, err={e}")
            continue

        if rules.is_whitelist(log.ip):
            continue

        # Find the first rule the entry breaks, if any.
        if not rules.code_allowed(log.code):
            reason = "http code not allowed"
        elif not rules.content_allowed(log.request.lower()):
            reason = "contents not allowed"
        elif not rules.user_agent_allowed(log.user_agent.lower()):
            reason = "user agent not allowed"
        else:
            continue

        logging.info(f"banning log ({reason}): {log}")
        if not dry_run:
            UFW.ban_ip(log.ip)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: pick a mode from the flags, run it, and
    # report the elapsed time. The exit status is 1 when the selected mode
    # raised an exception, 0 otherwise (including on Ctrl-C).
    parser = argparse.ArgumentParser(
        "ufwban", description="Ban ip from Nginx access logs based on simple rules."
    )

    parser.add_argument("--dry-run", action="store_true", default=False)
    parser.add_argument(
        "--refresh",
        action="store_true",
        default=False,
        help="Drop all the deny ip in the UFW table and return",
    )
    parser.add_argument(
        "--reload", action="store_true", default=False, help="Reload the UFW firewall"
    )
    parser.add_argument(
        "--to-nginx",
        action="store_true",
        default=False,
        help="Generate an Nginx deny configuration",
    )
    parser.add_argument(
        "--live", action="store_true", default=False, help="Read inputs from stdin"
    )
    args = parser.parse_args()

    logging.info("collecting and denying ip from Nginx access logs...")
    start = time.perf_counter()

    exit_code = 0
    try:
        # --to-nginx takes precedence over --live, which takes precedence
        # over the default ufw batch mode.
        if args.to_nginx:
            gen_nginx_conf(args.dry_run)
        elif args.live:
            live(args.dry_run)
        else:
            main(args.refresh, args.reload, args.dry_run)
    except Exception as e:
        exit_code = 1
        logging.critical(f"unexpected error occurred, err={e}")
    except KeyboardInterrupt:
        logging.warning("ok, you just kill me... Bye !")

    logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s")
    # sys.exit is always available, whereas the bare exit() builtin is only
    # injected by the site module.
    sys.exit(exit_code)
|