generate nginx deny ips conf
This commit is contained in:
parent
82e2ad8a39
commit
42c3a5e0a0
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "ngxden"
|
||||
description = "Generate a deny Nginx conf based on Nginx access logs"
|
||||
name = "ufwban"
|
||||
description = "A little CLI tool that read Nginx access logs and block ip based on simple rules."
|
||||
|
||||
authors = [
|
||||
{name = "rmanach", email = "manach.r@msn.com"},
|
||||
|
||||
56
ufwban.py
56
ufwban.py
@ -17,6 +17,7 @@ logging.basicConfig(
|
||||
handlers=(stdout_handler, rotate_handler),
|
||||
)
|
||||
|
||||
NGINX_DENY_CONF = "/etc/nginx/conf.d/blocked-ips.conf"
|
||||
NGINX_ACCESS_LOGS_DIR = "/var/log/nginx"
|
||||
UFW_CONF = "conf.json"
|
||||
|
||||
@ -262,6 +263,51 @@ def get_logs_to_deny(logs: list[NginxLog], rules: Rules) -> dict[str, NginxLog]:
|
||||
return filter_logs
|
||||
|
||||
|
||||
def get_nginx_denied_ips() -> list[str]:
|
||||
denied_ips = []
|
||||
|
||||
try:
|
||||
with open(NGINX_DENY_CONF, "r") as f:
|
||||
while line := f.readline():
|
||||
parts = line.split(" ")
|
||||
if len(parts) != 2:
|
||||
logging.error(f"unable to parse line: {line} in {NGINX_DENY_CONF}")
|
||||
continue
|
||||
|
||||
if parts[0].lower() != "deny":
|
||||
logging.warning(f"not a deny rule: {line} in {NGINX_DENY_CONF}")
|
||||
continue
|
||||
|
||||
denied_ips.append(parts[2][:-1])
|
||||
except FileNotFoundError:
|
||||
logging.warning(f"{NGINX_DENY_CONF} does not exist")
|
||||
|
||||
return denied_ips
|
||||
|
||||
|
||||
def gen_nginx_conf(dry_run: bool = False):
|
||||
"""Generate an Nginx conf with a list a IP to deny"""
|
||||
rules = Rules.from_conf()
|
||||
|
||||
logs = parse_nginx_logs()
|
||||
ips_to_deny = [ip for ip in get_logs_to_deny(logs, rules).keys()]
|
||||
ips_denied = get_nginx_denied_ips()
|
||||
|
||||
ips = set(ips_to_deny) | set(ips_denied)
|
||||
if not len(ips):
|
||||
logging.info("no ip to deny")
|
||||
return
|
||||
|
||||
with open(f"{NGINX_DENY_CONF}.tmp", "w") as f:
|
||||
for ip in ips:
|
||||
f.write(f"deny {ip};\n")
|
||||
f.write("allow all;\n")
|
||||
|
||||
if not dry_run:
|
||||
os.rename(f"{NGINX_DENY_CONF}.tmp", f"{NGINX_DENY_CONF}")
|
||||
logging.info(f"deny Nginx conf installed: {NGINX_DENY_CONF}")
|
||||
|
||||
|
||||
def main(refresh: bool = False, reload: bool = False, dry_run: bool = False):
|
||||
rules = Rules.from_conf()
|
||||
|
||||
@ -329,6 +375,12 @@ if __name__ == "__main__":
|
||||
parser.add_argument(
|
||||
"--reload", action="store_true", default=False, help="Reload the UFW firewall"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--to-nginx",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Generate an Nginx deny configuration",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--live", action="store_true", default=False, help="Read inputs from stdin"
|
||||
)
|
||||
@ -339,7 +391,9 @@ if __name__ == "__main__":
|
||||
|
||||
exit_code = 0
|
||||
try:
|
||||
if args.live:
|
||||
if args.to_nginx:
|
||||
gen_nginx_conf()
|
||||
elif args.live:
|
||||
live(args.dry_run)
|
||||
else:
|
||||
main(args.refresh, args.reload, args.dry_run)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user