| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- #!/usr/bin/env python3
- """
- test-firewall-syslog.py — Send FortiGate-style syslog test messages to Wazuh manager port 514/UDP.
- Usage:
- python3 scripts/test-firewall-syslog.py [--host HOST] [--port PORT] [--src-ip IP] [--scenario SCENARIO]
- python3 scripts/test-firewall-syslog.py --via-docker # send from inside container (avoids NAT)
- Examples:
- python3 scripts/test-firewall-syslog.py
- python3 scripts/test-firewall-syslog.py --via-docker
- python3 scripts/test-firewall-syslog.py --host 192.168.1.10 --src-ip 172.16.22.253
- python3 scripts/test-firewall-syslog.py --scenario rdp
- python3 scripts/test-firewall-syslog.py --scenario all --delay 0.5
- """
- import argparse
- import socket
- import subprocess
- import time
- import datetime
- # ── Default target ────────────────────────────────────────────────────────────
- DEFAULT_HOST = "127.0.0.1"
- DEFAULT_PORT = 514
- DEFAULT_SRC_IP = "172.16.22.253" # must be in allowed-ips list
- WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1"
- # ── FortiGate syslog scenarios ────────────────────────────────────────────────
- def _ts():
- now = datetime.datetime.now(datetime.timezone.utc)
- return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")
- def make_fortigate_log(src_ip, **fields):
- """Build a FortiGate v6 syslog line (key=value pairs)."""
- date, time_ = _ts()
- base = {
- "date": date,
- "time": time_,
- "devname": "FG-TEST-FW",
- "devid": "FGT60E0000000001",
- "logid": "0000000013",
- "type": "traffic",
- "subtype": "forward",
- "level": "notice",
- "vd": "root",
- "srcip": src_ip,
- "srcport": "54321",
- "srcintf": "port1",
- "dstip": "10.0.0.1",
- "dstport": "80",
- "dstintf": "port2",
- "policyid": "1",
- "proto": "6",
- "action": "accept",
- "service": "HTTP",
- "duration": "1",
- "sentbyte": "1024",
- "rcvdbyte": "2048",
- }
- base.update(fields)
- parts = []
- for k, v in base.items():
- if " " in str(v) or "=" in str(v):
- parts.append(f'{k}="{v}"')
- else:
- parts.append(f"{k}={v}")
- return " ".join(parts)
- SCENARIOS = {
- # A2-01: RDP traffic allowed
- "rdp": {
- "description": "A2-01 — RDP (3389) traffic allowed",
- "log": lambda src: make_fortigate_log(src,
- logid="0000000013", type="traffic", subtype="forward",
- dstport="3389", action="accept", service="RDP",
- ),
- },
- # A2-02: Admin password change
- "password_change": {
- "description": "A2-02 — Admin password changed",
- "log": lambda src: make_fortigate_log(src,
- logid="0100032001", type="event", subtype="system",
- level="information", action="password-change",
- user="admin", msg="Change admin password",
- ),
- },
- # A2-03: New admin account created
- "create_admin": {
- "description": "A2-03 — New admin account created",
- "log": lambda src: make_fortigate_log(src,
- logid="0100032002", type="event", subtype="system",
- level="information", action="create-admin",
- user="newadmin", msg="Create admin account",
- ),
- },
- # A2-04: Alerting disabled via config change
- "disable_alert": {
- "description": "A2-04 — Alerting disabled (config-change)",
- "log": lambda src: make_fortigate_log(src,
- logid="0100032003", type="event", subtype="system",
- level="warning", action="config-change",
- config_value="disable", msg="Email alerting disabled",
- ),
- },
- # A2-05: Config file downloaded
- "download_config": {
- "description": "A2-05 — Firewall config downloaded",
- "log": lambda src: make_fortigate_log(src,
- logid="0100032004", type="event", subtype="system",
- level="warning", action="download-config",
- user="admin", msg="Configuration backup downloaded",
- ),
- },
- # A2-06: Multiple critical IPS signatures
- "ips_critical": {
- "description": "A2-06 — Multiple critical IPS signatures",
- "log": lambda src: make_fortigate_log(src,
- logid="0419016384", type="utm", subtype="ips",
- level="critical", action="dropped",
- attack="Multiple.Critical.Signatures", severity="critical",
- srcip="203.0.113.42", dstip="172.16.1.10",
- ),
- },
- # A2-07: TCP port scan
- "port_scan": {
- "description": "A2-07 — TCP port scan detected",
- "log": lambda src: make_fortigate_log(src,
- logid="0419016385", type="utm", subtype="anomaly",
- level="critical", action="dropped",
- attack="TCP.Port.Scan", severity="critical",
- srcip="203.0.113.99", dstip="172.16.0.0",
- ),
- },
- # A2-08: IPS IOC-based IP indicator
- "ioc_ip": {
- "description": "A2-08 — IPS IOC-based IP indicator",
- "log": lambda src: make_fortigate_log(src,
- logid="0419016386", type="utm", subtype="ips",
- level="critical", action="blocked",
- ioc_type="ip", ioc_value="198.51.100.42",
- srcip="198.51.100.42", dstip="172.16.1.5",
- ),
- },
- # Generic traffic allow
- "traffic_allow": {
- "description": "Generic — Traffic allowed (HTTP)",
- "log": lambda src: make_fortigate_log(src,
- dstport="80", action="accept", service="HTTP",
- ),
- },
- # Generic traffic deny
- "traffic_deny": {
- "description": "Generic — Traffic denied",
- "log": lambda src: make_fortigate_log(src,
- dstport="22", action="deny", service="SSH",
- ),
- },
- }
- def _build_syslog_packet(message, src_ip, facility=16, severity=6):
- pri = (facility * 8) + severity
- _, time_ = _ts()
- month_day = datetime.datetime.now(datetime.timezone.utc).strftime("%b %d").replace(" 0", " ")
- return f"<{pri}>{month_day} {time_} {src_ip} {message}"
- def send_syslog_udp(host, port, message, src_ip):
- """Send directly via UDP socket (source IP will appear as Docker bridge on local test)."""
- packet = _build_syslog_packet(message, src_ip)
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- sock.sendto(packet.encode("utf-8"), (host, port))
- return True
- except OSError:
- return False
- finally:
- sock.close()
- def send_syslog_via_docker(message, src_ip, port):
- """Send UDP from inside the Wazuh container using /dev/udp — source IP is container's own IP."""
- packet = _build_syslog_packet(message, src_ip)
- py_cmd = (
- f"import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); "
- f"s.sendto({repr(packet.encode())}, ('127.0.0.1',{port})); s.close()"
- )
- result = subprocess.run(
- ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd],
- capture_output=True, timeout=5,
- )
- return result.returncode == 0
- def run_scenario(name, info, host, port, src_ip, via_docker):
- log = info["log"](src_ip)
- if via_docker:
- ok = send_syslog_via_docker(log, src_ip, port)
- else:
- ok = send_syslog_udp(host, port, log, src_ip)
- status = "✓" if ok else "✗"
- print(f" {status} {name:20s} {info['description']}")
- return ok
- def main():
- parser = argparse.ArgumentParser(description="Send FortiGate syslog test messages to Wazuh")
- parser.add_argument("--host", default=DEFAULT_HOST, help=f"Wazuh manager host (default: {DEFAULT_HOST})")
- parser.add_argument("--port", default=DEFAULT_PORT, type=int, help=f"Syslog UDP port (default: {DEFAULT_PORT})")
- parser.add_argument("--src-ip", default=DEFAULT_SRC_IP, help=f"Simulated firewall source IP (default: {DEFAULT_SRC_IP})")
- parser.add_argument("--scenario", default="all",
- choices=list(SCENARIOS.keys()) + ["all"],
- help="Scenario to send (default: all)")
- parser.add_argument("--delay", default=0.2, type=float, help="Delay between messages in seconds (default: 0.2)")
- parser.add_argument("--repeat", default=1, type=int, help="Number of times to repeat each scenario (default: 1)")
- parser.add_argument("--via-docker", action="store_true",
- help="Send from inside the Wazuh container (avoids Docker NAT source-IP rewrite)")
- args = parser.parse_args()
- mode = f"docker exec {WAZUH_CONTAINER}" if args.via_docker else f"{args.host}:{args.port}"
- print(f"Wazuh syslog test — mode: {mode} src-ip: {args.src_ip}")
- print(f"{'─' * 65}")
- to_run = list(SCENARIOS.items()) if args.scenario == "all" else [(args.scenario, SCENARIOS[args.scenario])]
- total = ok_count = 0
- for _ in range(args.repeat):
- for name, info in to_run:
- success = run_scenario(name, info, args.host, args.port, args.src_ip, args.via_docker)
- total += 1
- ok_count += int(success)
- if args.delay:
- time.sleep(args.delay)
- print(f"{'─' * 65}")
- print(f"Sent {ok_count}/{total} messages")
- print()
- print("Verify with:")
- print(f" docker exec {WAZUH_CONTAINER} tail -f /var/ossec/logs/archives/archives.log | grep {args.src_ip}")
- if __name__ == "__main__":
- main()
|