add README
This commit is contained in:
parent
9145377125
commit
d780f2ba6c
61
README.md
Normal file
61
README.md
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
# ufwban
|
||||||
|
|
||||||
|
A little CLI tool (wrapping **ufw**) that read Nginx access logs and block ip based on simple rules.
|
||||||
|
**THIS IS NOT** a replacement of **fail2ban**, i just do it for fun and to have a simple tool to configure quickly banning undesired IP.
|
||||||
|
|
||||||
|
This a **RUDE** IP deny. It will ban the IP on all machine ports with no ban time, so be careful !
|
||||||
|
|
||||||
|
## Requirements
|
||||||
|
* ufw
|
||||||
|
* Nginx
|
||||||
|
* Python >= 3.10
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
Create a `conf.json` next to the script. Use the [conf.json.example](./conf.json.example) for sample.
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"rules": {
|
||||||
|
"codes": [],
|
||||||
|
"contents": [],
|
||||||
|
"agents": []
|
||||||
|
},
|
||||||
|
"whitelist": []
|
||||||
|
}
|
||||||
|
```
|
||||||
|
* **codes**: List of unauthorized HTTP codes
|
||||||
|
* **contents**: string parts that are not not allowed in the request URL (ex: /x00, .json, .php, .env.local, etc...)
|
||||||
|
* **agents** string parts that are not not allowed in user-agent (ex: bot)
|
||||||
|
* **whitelist**: List of IP to whitelist (ex: 192.168.1.1)
|
||||||
|
|
||||||
|
## Run
|
||||||
|
```bash
|
||||||
|
usage: ufwban [-h] [--dry-run] [--refresh] [--reload] [--live]
|
||||||
|
|
||||||
|
Ban ip from Nginx access logs based on simple rules.
|
||||||
|
|
||||||
|
options:
|
||||||
|
-h, --help show this help message and exit
|
||||||
|
--dry-run
|
||||||
|
--refresh Drop all the deny ip in the UFW table and return
|
||||||
|
--reload Reload the UFW firewall
|
||||||
|
--live Read inputs from stdin
|
||||||
|
```
|
||||||
|
|
||||||
|
* Batch mode:
|
||||||
|
```bash
|
||||||
|
# read all access.log*, parse Nginx log and ban ip
|
||||||
|
python ufwban.py
|
||||||
|
|
||||||
|
# you can launch a --dry-run mode to see which ip is going to be denied
|
||||||
|
python ufwban.py --dry-run
|
||||||
|
|
||||||
|
# drop all "DENY IN" ufw rules (be careful)
|
||||||
|
python ufwban.py --refresh
|
||||||
|
```
|
||||||
|
|
||||||
|
* Live mode:
|
||||||
|
```bash
|
||||||
|
# Read and parse Nginx logs on each new entry and ban ip
|
||||||
|
tail -f /var/log/nginx/access.log | python ufwban.py
|
||||||
|
```
|
||||||
16
ufwban.py
16
ufwban.py
@ -22,8 +22,11 @@ UFW_CONF = "conf.json"
|
|||||||
|
|
||||||
|
|
||||||
class UFW:
|
class UFW:
|
||||||
|
"""Wraps ufw binary commands"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def drop_all():
|
def drop_all():
|
||||||
|
"""Remove all "DENY IN" rules."""
|
||||||
logging.info("dropping all deny rules...")
|
logging.info("dropping all deny rules...")
|
||||||
while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips):
|
while (ufw_deny_ips := UFW.list_deny()) and len(ufw_deny_ips):
|
||||||
for ip, id_ in ufw_deny_ips.items():
|
for ip, id_ in ufw_deny_ips.items():
|
||||||
@ -33,6 +36,7 @@ class UFW:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def reload():
|
def reload():
|
||||||
|
"""Force ufw reload."""
|
||||||
logging.info("reloading ufw...")
|
logging.info("reloading ufw...")
|
||||||
process = subprocess.run(["ufw", "reload"], capture_output=True)
|
process = subprocess.run(["ufw", "reload"], capture_output=True)
|
||||||
|
|
||||||
@ -41,6 +45,7 @@ class UFW:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def delete_deny_ip(id_: str):
|
def delete_deny_ip(id_: str):
|
||||||
|
"""Delete a rule based on its id."""
|
||||||
logging.info(f"cmd running: ufw delete {id_}")
|
logging.info(f"cmd running: ufw delete {id_}")
|
||||||
process = subprocess.run(
|
process = subprocess.run(
|
||||||
["ufw", "delete", id_], input=b"y\n", capture_output=True
|
["ufw", "delete", id_], input=b"y\n", capture_output=True
|
||||||
@ -53,6 +58,7 @@ class UFW:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ban_ip(ip: str):
|
def ban_ip(ip: str):
|
||||||
|
"""Deny an ip for any ports on the machine."""
|
||||||
logging.info(f"cmd running: ufw deny from {ip}")
|
logging.info(f"cmd running: ufw deny from {ip}")
|
||||||
process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True)
|
process = subprocess.run(["ufw", "deny", "from", ip], capture_output=True)
|
||||||
|
|
||||||
@ -61,6 +67,10 @@ class UFW:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_deny() -> dict[str, str]:
|
def list_deny() -> dict[str, str]:
|
||||||
|
"""
|
||||||
|
List all the denied ip.
|
||||||
|
Return a dict mapping the ip with its ufw id.
|
||||||
|
"""
|
||||||
ips = {}
|
ips = {}
|
||||||
cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True)
|
cp = subprocess.run(["ufw", "status", "numbered"], capture_output=True)
|
||||||
if cp.returncode != 0:
|
if cp.returncode != 0:
|
||||||
@ -258,13 +268,13 @@ def main(refresh: bool = False, reload: bool = False, dry_run: bool = False):
|
|||||||
logs = parse_nginx_logs()
|
logs = parse_nginx_logs()
|
||||||
logs_to_deny = get_logs_to_deny(logs, rules)
|
logs_to_deny = get_logs_to_deny(logs, rules)
|
||||||
|
|
||||||
if args.refresh and not args.dry_run:
|
if refresh and not dry_run:
|
||||||
UFW.drop_all()
|
UFW.drop_all()
|
||||||
return
|
return
|
||||||
|
|
||||||
for ip, log in logs_to_deny.items():
|
for ip, log in logs_to_deny.items():
|
||||||
print(f"> banning log: {log}")
|
print(f"> banning log: {log}")
|
||||||
if not args.dry_run:
|
if not dry_run:
|
||||||
UFW.ban_ip(ip)
|
UFW.ban_ip(ip)
|
||||||
|
|
||||||
if reload:
|
if reload:
|
||||||
@ -337,7 +347,7 @@ if __name__ == "__main__":
|
|||||||
exit_code = 1
|
exit_code = 1
|
||||||
logging.fatal(f"unexpected error occurred, err={e}")
|
logging.fatal(f"unexpected error occurred, err={e}")
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.warning("ok, you just kill me..., bye")
|
logging.warning("ok, you just kill me... Bye !")
|
||||||
|
|
||||||
logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s")
|
logging.info(f"ufwban done in elapsed time: {time.perf_counter() - start:.2f}s")
|
||||||
exit(exit_code)
|
exit(exit_code)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user