#!/usr/bin/env python3
"""
test-firewall-syslog.py — Send FortiGate-style syslog test messages to Wazuh manager port 514/UDP.

Usage:
    python3 scripts/test-firewall-syslog.py [--host HOST] [--port PORT] [--src-ip IP]
                                            [--scenario SCENARIO] [--delay SECONDS] [--repeat N]
    python3 scripts/test-firewall-syslog.py --via-docker   # send from inside container (avoids NAT)

Examples:
    python3 scripts/test-firewall-syslog.py
    python3 scripts/test-firewall-syslog.py --via-docker
    python3 scripts/test-firewall-syslog.py --host 192.168.1.10 --src-ip 172.16.22.253
    python3 scripts/test-firewall-syslog.py --scenario rdp
    python3 scripts/test-firewall-syslog.py --scenario all --delay 0.5
"""

import argparse
import datetime
import socket
import subprocess
import time

# ── Default target ────────────────────────────────────────────────────────────
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 514
DEFAULT_SRC_IP = "172.16.22.253"  # must be in allowed-ips list
WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1"

# Syslog month abbreviations are always English; a fixed table keeps the header
# independent of the process locale (strftime("%b") is locale-sensitive).
_SYSLOG_MONTHS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)


# ── FortiGate syslog scenarios ────────────────────────────────────────────────

def _ts(now=None):
    """Return ``(date, time)`` strings (``YYYY-MM-DD``, ``HH:MM:SS``).

    Args:
        now: datetime to format. Defaults to the current UTC time.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")


def _format_field(key, value):
    """Render one ``key=value`` pair, double-quoting values containing a space or ``=``."""
    text = str(value)
    if " " in text or "=" in text:
        return f'{key}="{text}"'
    return f"{key}={text}"


def make_fortigate_log(src_ip, **fields):
    """Build a FortiGate v6 syslog line (key=value pairs).

    Args:
        src_ip: value used for the default ``srcip`` field.
        **fields: extra fields, or overrides for any default field
            (including ``srcip``, ``date`` and ``time``). New keys are
            appended after the defaults in the order given.

    Returns:
        A single space-separated ``key=value`` string.
    """
    date, time_ = _ts()
    record = {
        "date": date,
        "time": time_,
        "devname": "FG-TEST-FW",
        "devid": "FGT60E0000000001",
        "logid": "0000000013",
        "type": "traffic",
        "subtype": "forward",
        "level": "notice",
        "vd": "root",
        "srcip": src_ip,
        "srcport": "54321",
        "srcintf": "port1",
        "dstip": "10.0.0.1",
        "dstport": "80",
        "dstintf": "port2",
        "policyid": "1",
        "proto": "6",
        "action": "accept",
        "service": "HTTP",
        "duration": "1",
        "sentbyte": "1024",
        "rcvdbyte": "2048",
    }
    record.update(fields)
    return " ".join(_format_field(key, value) for key, value in record.items())


# Each scenario maps a CLI name to a description and a callable that takes the
# simulated firewall source IP and returns the FortiGate log line to send.
SCENARIOS = {
    # A2-01: RDP traffic allowed
    "rdp": {
        "description": "A2-01 — RDP (3389) traffic allowed",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0000000013",
            type="traffic",
            subtype="forward",
            dstport="3389",
            action="accept",
            service="RDP",
        ),
    },
    # A2-02: Admin password change
    "password_change": {
        "description": "A2-02 — Admin password changed",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0100032001",
            type="event",
            subtype="system",
            level="information",
            action="password-change",
            user="admin",
            msg="Change admin password",
        ),
    },
    # A2-03: New admin account created
    "create_admin": {
        "description": "A2-03 — New admin account created",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0100032002",
            type="event",
            subtype="system",
            level="information",
            action="create-admin",
            user="newadmin",
            msg="Create admin account",
        ),
    },
    # A2-04: Alerting disabled via config change
    "disable_alert": {
        "description": "A2-04 — Alerting disabled (config-change)",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0100032003",
            type="event",
            subtype="system",
            level="warning",
            action="config-change",
            config_value="disable",
            msg="Email alerting disabled",
        ),
    },
    # A2-05: Config file downloaded
    "download_config": {
        "description": "A2-05 — Firewall config downloaded",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0100032004",
            type="event",
            subtype="system",
            level="warning",
            action="download-config",
            user="admin",
            msg="Configuration backup downloaded",
        ),
    },
    # A2-06: Multiple critical IPS signatures
    "ips_critical": {
        "description": "A2-06 — Multiple critical IPS signatures",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0419016384",
            type="utm",
            subtype="ips",
            level="critical",
            action="dropped",
            attack="Multiple.Critical.Signatures",
            severity="critical",
            srcip="203.0.113.42",
            dstip="172.16.1.10",
        ),
    },
    # A2-07: TCP port scan
    "port_scan": {
        "description": "A2-07 — TCP port scan detected",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0419016385",
            type="utm",
            subtype="anomaly",
            level="critical",
            action="dropped",
            attack="TCP.Port.Scan",
            severity="critical",
            srcip="203.0.113.99",
            dstip="172.16.0.0",
        ),
    },
    # A2-08: IPS IOC-based IP indicator
    "ioc_ip": {
        "description": "A2-08 — IPS IOC-based IP indicator",
        "log": lambda src: make_fortigate_log(
            src,
            logid="0419016386",
            type="utm",
            subtype="ips",
            level="critical",
            action="blocked",
            ioc_type="ip",
            ioc_value="198.51.100.42",
            srcip="198.51.100.42",
            dstip="172.16.1.5",
        ),
    },
    # Generic traffic allow
    "traffic_allow": {
        "description": "Generic — Traffic allowed (HTTP)",
        "log": lambda src: make_fortigate_log(
            src,
            dstport="80",
            action="accept",
            service="HTTP",
        ),
    },
    # Generic traffic deny
    "traffic_deny": {
        "description": "Generic — Traffic denied",
        "log": lambda src: make_fortigate_log(
            src,
            dstport="22",
            action="deny",
            service="SSH",
        ),
    },
}


def _build_syslog_packet(message, src_ip, facility=16, severity=6, now=None):
    """Wrap *message* in a BSD-syslog (RFC 3164 style) header.

    The result has the form ``<PRI>Mmm dd HH:MM:SS HOST MESSAGE`` where
    ``PRI = facility * 8 + severity`` and *src_ip* is used as the HOST field.

    Args:
        message: log line to carry as the syslog message body.
        src_ip: value placed in the header's hostname position.
        facility: syslog facility number (default 16).
        severity: syslog severity number (default 6).
        now: datetime used for the header timestamp. Defaults to the current
            UTC time.

    Returns:
        The packet as a ``str`` (not yet encoded).
    """
    if now is None:
        # Read the clock once so the date and time parts cannot straddle midnight.
        now = datetime.datetime.now(datetime.timezone.utc)
    pri = (facility * 8) + severity
    # RFC 3164: a day of month below 10 is written as a space followed by the
    # digit ("Jan  5"), so the timestamp is always 15 characters wide.
    timestamp = f"{_SYSLOG_MONTHS[now.month - 1]} {now.day:2d} {now:%H:%M:%S}"
    return f"<{pri}>{timestamp} {src_ip} {message}"


def send_syslog_udp(host, port, message, src_ip):
    """Send directly via UDP socket (source IP will appear as Docker bridge on local test).

    Args:
        host: destination host name or IPv4 address.
        port: destination UDP port.
        message: log line to send.
        src_ip: simulated firewall IP written into the syslog header (the
            real network source address is whatever the OS picks).

    Returns:
        True if the datagram was handed to the OS, False on any ``OSError``
        (including name-resolution failures).
    """
    payload = _build_syslog_packet(message, src_ip).encode("utf-8")
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, (host, port))
    except OSError:
        return False
    return True


def send_syslog_via_docker(message, src_ip, port):
    """Send the UDP datagram from inside the Wazuh container.

    Runs a ``python3 -c`` one-liner through ``docker exec`` in
    ``WAZUH_CONTAINER`` that sends the packet to ``127.0.0.1:<port>`` from
    within the container, so the datagram never crosses the Docker NAT.
    Requires ``docker`` on the host and ``python3`` on the container's PATH.

    Args:
        message: log line to send.
        src_ip: simulated firewall IP written into the syslog header.
        port: UDP port inside the container.

    Returns:
        True if ``docker exec`` exited with status 0; False if it exited
        non-zero, timed out (5 s), or could not be started at all.
    """
    packet = _build_syslog_packet(message, src_ip)
    # repr() of the bytes yields a safe Python literal; int() guarantees the
    # port cannot smuggle extra code into the one-liner.
    py_cmd = (
        "import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); "
        f"s.sendto({packet.encode()!r}, ('127.0.0.1',{int(port)})); s.close()"
    )
    try:
        result = subprocess.run(
            ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd],
            capture_output=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # Missing docker binary or a hung exec is reported as a failed send,
        # mirroring send_syslog_udp, instead of aborting the whole run.
        return False
    return result.returncode == 0


def run_scenario(name, info, host, port, src_ip, via_docker):
    """Send one scenario's log line and print a one-line status.

    Args:
        name: scenario key, used for display only.
        info: scenario entry with ``"description"`` and ``"log"`` (a callable
            taking the source IP and returning the log line).
        host: UDP destination host (ignored when *via_docker* is true).
        port: UDP destination port.
        src_ip: simulated firewall source IP.
        via_docker: send from inside the Wazuh container instead of directly.

    Returns:
        True if the send succeeded, False otherwise.
    """
    log = info["log"](src_ip)
    if via_docker:
        ok = send_syslog_via_docker(log, src_ip, port)
    else:
        ok = send_syslog_udp(host, port, log, src_ip)
    status = "✓" if ok else "✗"
    print(f" {status} {name:20s} {info['description']}")
    return ok


def main(argv=None):
    """Parse command-line options, send the selected scenarios, print a summary.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Send FortiGate syslog test messages to Wazuh")
    parser.add_argument("--host", default=DEFAULT_HOST,
                        help=f"Wazuh manager host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", default=DEFAULT_PORT, type=int,
                        help=f"Syslog UDP port (default: {DEFAULT_PORT})")
    parser.add_argument("--src-ip", default=DEFAULT_SRC_IP,
                        help=f"Simulated firewall source IP (default: {DEFAULT_SRC_IP})")
    parser.add_argument("--scenario", default="all", choices=[*SCENARIOS, "all"],
                        help="Scenario to send (default: all)")
    parser.add_argument("--delay", default=0.2, type=float,
                        help="Delay between messages in seconds (default: 0.2)")
    parser.add_argument("--repeat", default=1, type=int,
                        help="Number of times to repeat each scenario (default: 1)")
    parser.add_argument("--via-docker", action="store_true",
                        help="Send from inside the Wazuh container (avoids Docker NAT source-IP rewrite)")
    args = parser.parse_args(argv)

    mode = f"docker exec {WAZUH_CONTAINER}" if args.via_docker else f"{args.host}:{args.port}"
    print(f"Wazuh syslog test — mode: {mode} src-ip: {args.src_ip}")
    print(f"{'─' * 65}")

    if args.scenario == "all":
        to_run = list(SCENARIOS.items())
    else:
        to_run = [(args.scenario, SCENARIOS[args.scenario])]

    total = ok_count = 0
    for _ in range(args.repeat):
        for name, info in to_run:
            success = run_scenario(name, info, args.host, args.port, args.src_ip, args.via_docker)
            total += 1
            ok_count += int(success)
            # time.sleep() raises ValueError on negative values, so only
            # sleep for a strictly positive delay.
            if args.delay > 0:
                time.sleep(args.delay)

    print(f"{'─' * 65}")
    print(f"Sent {ok_count}/{total} messages")
    print()
    print("Verify with:")
    print(f" docker exec {WAZUH_CONTAINER} tail -f /var/ossec/logs/archives/archives.log | grep {args.src_ip}")


if __name__ == "__main__":
    main()