説明なし

test-firewall-syslog.py 9.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. #!/usr/bin/env python3
  2. """
  3. test-firewall-syslog.py — Send FortiGate-style syslog test messages to Wazuh manager port 514/UDP.
  4. Usage:
  5. python3 scripts/test-firewall-syslog.py [--host HOST] [--port PORT] [--src-ip IP] [--scenario SCENARIO]
  6. python3 scripts/test-firewall-syslog.py --via-docker # send from inside container (avoids NAT)
  7. Examples:
  8. python3 scripts/test-firewall-syslog.py
  9. python3 scripts/test-firewall-syslog.py --via-docker
  10. python3 scripts/test-firewall-syslog.py --host 192.168.1.10 --src-ip 172.16.22.253
  11. python3 scripts/test-firewall-syslog.py --scenario rdp
  12. python3 scripts/test-firewall-syslog.py --scenario all --delay 0.5
  13. """
  14. import argparse
  15. import socket
  16. import subprocess
  17. import time
  18. import datetime
  19. # ── Default target ────────────────────────────────────────────────────────────
  20. DEFAULT_HOST = "127.0.0.1"
  21. DEFAULT_PORT = 514
  22. DEFAULT_SRC_IP = "172.16.22.253" # must be in allowed-ips list
  23. WAZUH_CONTAINER = "wazuh-single-wazuh.manager-1"
  24. # ── FortiGate syslog scenarios ────────────────────────────────────────────────
  25. def _ts():
  26. now = datetime.datetime.now(datetime.timezone.utc)
  27. return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")
  28. def make_fortigate_log(src_ip, **fields):
  29. """Build a FortiGate v6 syslog line (key=value pairs)."""
  30. date, time_ = _ts()
  31. base = {
  32. "date": date,
  33. "time": time_,
  34. "devname": "FG-TEST-FW",
  35. "devid": "FGT60E0000000001",
  36. "logid": "0000000013",
  37. "type": "traffic",
  38. "subtype": "forward",
  39. "level": "notice",
  40. "vd": "root",
  41. "srcip": src_ip,
  42. "srcport": "54321",
  43. "srcintf": "port1",
  44. "dstip": "10.0.0.1",
  45. "dstport": "80",
  46. "dstintf": "port2",
  47. "policyid": "1",
  48. "proto": "6",
  49. "action": "accept",
  50. "service": "HTTP",
  51. "duration": "1",
  52. "sentbyte": "1024",
  53. "rcvdbyte": "2048",
  54. }
  55. base.update(fields)
  56. parts = []
  57. for k, v in base.items():
  58. if " " in str(v) or "=" in str(v):
  59. parts.append(f'{k}="{v}"')
  60. else:
  61. parts.append(f"{k}={v}")
  62. return " ".join(parts)
  63. SCENARIOS = {
  64. # A2-01: RDP traffic allowed
  65. "rdp": {
  66. "description": "A2-01 — RDP (3389) traffic allowed",
  67. "log": lambda src: make_fortigate_log(src,
  68. logid="0000000013", type="traffic", subtype="forward",
  69. dstport="3389", action="accept", service="RDP",
  70. ),
  71. },
  72. # A2-02: Admin password change
  73. "password_change": {
  74. "description": "A2-02 — Admin password changed",
  75. "log": lambda src: make_fortigate_log(src,
  76. logid="0100032001", type="event", subtype="system",
  77. level="information", action="password-change",
  78. user="admin", msg="Change admin password",
  79. ),
  80. },
  81. # A2-03: New admin account created
  82. "create_admin": {
  83. "description": "A2-03 — New admin account created",
  84. "log": lambda src: make_fortigate_log(src,
  85. logid="0100032002", type="event", subtype="system",
  86. level="information", action="create-admin",
  87. user="newadmin", msg="Create admin account",
  88. ),
  89. },
  90. # A2-04: Alerting disabled via config change
  91. "disable_alert": {
  92. "description": "A2-04 — Alerting disabled (config-change)",
  93. "log": lambda src: make_fortigate_log(src,
  94. logid="0100032003", type="event", subtype="system",
  95. level="warning", action="config-change",
  96. config_value="disable", msg="Email alerting disabled",
  97. ),
  98. },
  99. # A2-05: Config file downloaded
  100. "download_config": {
  101. "description": "A2-05 — Firewall config downloaded",
  102. "log": lambda src: make_fortigate_log(src,
  103. logid="0100032004", type="event", subtype="system",
  104. level="warning", action="download-config",
  105. user="admin", msg="Configuration backup downloaded",
  106. ),
  107. },
  108. # A2-06: Multiple critical IPS signatures
  109. "ips_critical": {
  110. "description": "A2-06 — Multiple critical IPS signatures",
  111. "log": lambda src: make_fortigate_log(src,
  112. logid="0419016384", type="utm", subtype="ips",
  113. level="critical", action="dropped",
  114. attack="Multiple.Critical.Signatures", severity="critical",
  115. srcip="203.0.113.42", dstip="172.16.1.10",
  116. ),
  117. },
  118. # A2-07: TCP port scan
  119. "port_scan": {
  120. "description": "A2-07 — TCP port scan detected",
  121. "log": lambda src: make_fortigate_log(src,
  122. logid="0419016385", type="utm", subtype="anomaly",
  123. level="critical", action="dropped",
  124. attack="TCP.Port.Scan", severity="critical",
  125. srcip="203.0.113.99", dstip="172.16.0.0",
  126. ),
  127. },
  128. # A2-08: IPS IOC-based IP indicator
  129. "ioc_ip": {
  130. "description": "A2-08 — IPS IOC-based IP indicator",
  131. "log": lambda src: make_fortigate_log(src,
  132. logid="0419016386", type="utm", subtype="ips",
  133. level="critical", action="blocked",
  134. ioc_type="ip", ioc_value="198.51.100.42",
  135. srcip="198.51.100.42", dstip="172.16.1.5",
  136. ),
  137. },
  138. # Generic traffic allow
  139. "traffic_allow": {
  140. "description": "Generic — Traffic allowed (HTTP)",
  141. "log": lambda src: make_fortigate_log(src,
  142. dstport="80", action="accept", service="HTTP",
  143. ),
  144. },
  145. # Generic traffic deny
  146. "traffic_deny": {
  147. "description": "Generic — Traffic denied",
  148. "log": lambda src: make_fortigate_log(src,
  149. dstport="22", action="deny", service="SSH",
  150. ),
  151. },
  152. }
  153. def _build_syslog_packet(message, src_ip, facility=16, severity=6):
  154. pri = (facility * 8) + severity
  155. _, time_ = _ts()
  156. month_day = datetime.datetime.now(datetime.timezone.utc).strftime("%b %d").replace(" 0", " ")
  157. return f"<{pri}>{month_day} {time_} {src_ip} {message}"
  158. def send_syslog_udp(host, port, message, src_ip):
  159. """Send directly via UDP socket (source IP will appear as Docker bridge on local test)."""
  160. packet = _build_syslog_packet(message, src_ip)
  161. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  162. try:
  163. sock.sendto(packet.encode("utf-8"), (host, port))
  164. return True
  165. except OSError:
  166. return False
  167. finally:
  168. sock.close()
  169. def send_syslog_via_docker(message, src_ip, port):
  170. """Send UDP from inside the Wazuh container using /dev/udp — source IP is container's own IP."""
  171. packet = _build_syslog_packet(message, src_ip)
  172. py_cmd = (
  173. f"import socket; s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM); "
  174. f"s.sendto({repr(packet.encode())}, ('127.0.0.1',{port})); s.close()"
  175. )
  176. result = subprocess.run(
  177. ["docker", "exec", WAZUH_CONTAINER, "python3", "-c", py_cmd],
  178. capture_output=True, timeout=5,
  179. )
  180. return result.returncode == 0
  181. def run_scenario(name, info, host, port, src_ip, via_docker):
  182. log = info["log"](src_ip)
  183. if via_docker:
  184. ok = send_syslog_via_docker(log, src_ip, port)
  185. else:
  186. ok = send_syslog_udp(host, port, log, src_ip)
  187. status = "✓" if ok else "✗"
  188. print(f" {status} {name:20s} {info['description']}")
  189. return ok
  190. def main():
  191. parser = argparse.ArgumentParser(description="Send FortiGate syslog test messages to Wazuh")
  192. parser.add_argument("--host", default=DEFAULT_HOST, help=f"Wazuh manager host (default: {DEFAULT_HOST})")
  193. parser.add_argument("--port", default=DEFAULT_PORT, type=int, help=f"Syslog UDP port (default: {DEFAULT_PORT})")
  194. parser.add_argument("--src-ip", default=DEFAULT_SRC_IP, help=f"Simulated firewall source IP (default: {DEFAULT_SRC_IP})")
  195. parser.add_argument("--scenario", default="all",
  196. choices=list(SCENARIOS.keys()) + ["all"],
  197. help="Scenario to send (default: all)")
  198. parser.add_argument("--delay", default=0.2, type=float, help="Delay between messages in seconds (default: 0.2)")
  199. parser.add_argument("--repeat", default=1, type=int, help="Number of times to repeat each scenario (default: 1)")
  200. parser.add_argument("--via-docker", action="store_true",
  201. help="Send from inside the Wazuh container (avoids Docker NAT source-IP rewrite)")
  202. args = parser.parse_args()
  203. mode = f"docker exec {WAZUH_CONTAINER}" if args.via_docker else f"{args.host}:{args.port}"
  204. print(f"Wazuh syslog test — mode: {mode} src-ip: {args.src_ip}")
  205. print(f"{'─' * 65}")
  206. to_run = list(SCENARIOS.items()) if args.scenario == "all" else [(args.scenario, SCENARIOS[args.scenario])]
  207. total = ok_count = 0
  208. for _ in range(args.repeat):
  209. for name, info in to_run:
  210. success = run_scenario(name, info, args.host, args.port, args.src_ip, args.via_docker)
  211. total += 1
  212. ok_count += int(success)
  213. if args.delay:
  214. time.sleep(args.delay)
  215. print(f"{'─' * 65}")
  216. print(f"Sent {ok_count}/{total} messages")
  217. print()
  218. print("Verify with:")
  219. print(f" docker exec {WAZUH_CONTAINER} tail -f /var/ossec/logs/archives/archives.log | grep {args.src_ip}")
  220. if __name__ == "__main__":
  221. main()