Ei kuvausta

server.py 8.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import argparse
  2. import asyncio
  3. import logging
  4. from datetime import datetime, timezone
  5. from functools import partial
  6. from random import randint
  7. from typing import Optional
  8. import ocpp.v201
  9. import websockets
  10. from ocpp.routing import on
  11. from ocpp.v201.enums import (
  12. Action,
  13. AuthorizationStatusEnumType,
  14. ClearCacheStatusEnumType,
  15. GenericDeviceModelStatusEnumType,
  16. RegistrationStatusEnumType,
  17. ReportBaseEnumType,
  18. TransactionEventEnumType,
  19. )
  20. from websockets import ConnectionClosed
  21. from timer import Timer
  22. # Setting up the logging configuration to display debug level messages.
  23. logging.basicConfig(level=logging.DEBUG)
  24. ChargePoints = set()
  25. class ChargePoint(ocpp.v201.ChargePoint):
  26. _command_timer: Optional[Timer]
  27. def __init__(self, connection):
  28. super().__init__(connection.path.strip("/"), connection)
  29. self._command_timer = None
  30. # Message handlers to receive OCPP messages.
  31. @on(Action.boot_notification)
  32. async def on_boot_notification(self, charging_station, reason, **kwargs):
  33. logging.info("Received %s", Action.boot_notification)
  34. # Create and return a BootNotification response with the current time,
  35. # an interval of 60 seconds, and an accepted status.
  36. return ocpp.v201.call_result.BootNotification(
  37. current_time=datetime.now(timezone.utc).isoformat(),
  38. interval=60,
  39. status=RegistrationStatusEnumType.accepted,
  40. )
  41. @on(Action.heartbeat)
  42. async def on_heartbeat(self, **kwargs):
  43. logging.info("Received %s", Action.heartbeat)
  44. return ocpp.v201.call_result.Heartbeat(
  45. current_time=datetime.now(timezone.utc).isoformat()
  46. )
  47. @on(Action.status_notification)
  48. async def on_status_notification(
  49. self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
  50. ):
  51. logging.info("Received %s", Action.status_notification)
  52. return ocpp.v201.call_result.StatusNotification()
  53. @on(Action.authorize)
  54. async def on_authorize(self, id_token, **kwargs):
  55. logging.info("Received %s", Action.authorize)
  56. return ocpp.v201.call_result.Authorize(
  57. id_token_info={"status": AuthorizationStatusEnumType.accepted}
  58. )
  59. @on(Action.transaction_event)
  60. async def on_transaction_event(
  61. self,
  62. event_type: TransactionEventEnumType,
  63. timestamp,
  64. trigger_reason,
  65. seq_no: int,
  66. transaction_info,
  67. **kwargs,
  68. ):
  69. match event_type:
  70. case TransactionEventEnumType.started:
  71. logging.info("Received %s Started", Action.transaction_event)
  72. return ocpp.v201.call_result.TransactionEvent(
  73. id_token_info={"status": AuthorizationStatusEnumType.accepted}
  74. )
  75. case TransactionEventEnumType.updated:
  76. logging.info("Received %s Updated", Action.transaction_event)
  77. return ocpp.v201.call_result.TransactionEvent(total_cost=10)
  78. case TransactionEventEnumType.ended:
  79. logging.info("Received %s Ended", Action.transaction_event)
  80. return ocpp.v201.call_result.TransactionEvent()
  81. @on(Action.meter_values)
  82. async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
  83. logging.info("Received %s", Action.meter_values)
  84. return ocpp.v201.call_result.MeterValues()
  85. # Request handlers to emit OCPP messages.
  86. async def _send_clear_cache(self):
  87. request = ocpp.v201.call.ClearCache()
  88. response = await self.call(request)
  89. if response.status == ClearCacheStatusEnumType.accepted:
  90. logging.info("%s successful", Action.clear_cache)
  91. else:
  92. logging.info("%s failed", Action.clear_cache)
  93. async def _send_get_base_report(self):
  94. request = ocpp.v201.call.GetBaseReport(
  95. request_id=randint(1, 100), # noqa: S311
  96. report_base=ReportBaseEnumType.full_inventory,
  97. )
  98. response = await self.call(request)
  99. if response.status == GenericDeviceModelStatusEnumType.accepted:
  100. logging.info("%s successful", Action.get_base_report)
  101. else:
  102. logging.info("%s failed", Action.get_base_report)
  103. async def _send_command(self, command_name: Action):
  104. logging.debug("Sending OCPP command %s", command_name)
  105. match command_name:
  106. case Action.clear_cache:
  107. await self._send_clear_cache()
  108. case Action.get_base_report:
  109. await self._send_get_base_report()
  110. case _:
  111. logging.info(f"Not supported command {command_name}")
  112. async def send_command(
  113. self, command_name: Action, delay: Optional[float], period: Optional[float]
  114. ):
  115. try:
  116. if delay and not self._command_timer:
  117. self._command_timer = Timer(
  118. delay,
  119. False,
  120. self._send_command,
  121. [command_name],
  122. )
  123. if period and not self._command_timer:
  124. self._command_timer = Timer(
  125. period,
  126. True,
  127. self._send_command,
  128. [command_name],
  129. )
  130. except ConnectionClosed:
  131. self.handle_connection_closed()
  132. def handle_connection_closed(self):
  133. logging.info("ChargePoint %s closed connection", self.id)
  134. if self._command_timer:
  135. self._command_timer.cancel()
  136. ChargePoints.remove(self)
  137. logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
  138. # Function to handle new WebSocket connections.
  139. async def on_connect(
  140. websocket,
  141. command_name: Optional[Action],
  142. delay: Optional[float],
  143. period: Optional[float],
  144. ):
  145. """For every new charge point that connects, create a ChargePoint instance and start
  146. listening for messages.
  147. """
  148. try:
  149. requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
  150. except KeyError:
  151. logging.info("Client hasn't requested any Subprotocol. Closing Connection")
  152. return await websocket.close()
  153. if websocket.subprotocol:
  154. logging.info("Protocols Matched: %s", websocket.subprotocol)
  155. else:
  156. logging.warning(
  157. "Protocols Mismatched | Expected Subprotocols: %s,"
  158. " but client supports %s | Closing connection",
  159. websocket.available_subprotocols,
  160. requested_protocols,
  161. )
  162. return await websocket.close()
  163. cp = ChargePoint(websocket)
  164. if command_name:
  165. await cp.send_command(command_name, delay, period)
  166. ChargePoints.add(cp)
  167. try:
  168. await cp.start()
  169. except ConnectionClosed:
  170. cp.handle_connection_closed()
  171. def check_positive_number(value: Optional[float]):
  172. try:
  173. value = float(value)
  174. except ValueError:
  175. raise argparse.ArgumentTypeError("must be a number") from None
  176. if value <= 0:
  177. raise argparse.ArgumentTypeError("must be a positive number")
  178. return value
  179. # Main function to start the WebSocket server.
  180. async def main():
  181. parser = argparse.ArgumentParser(description="OCPP2 Server")
  182. parser.add_argument("-c", "--command", type=Action, help="command name")
  183. group = parser.add_mutually_exclusive_group()
  184. group.add_argument(
  185. "-d",
  186. "--delay",
  187. type=check_positive_number,
  188. help="delay in seconds",
  189. )
  190. group.add_argument(
  191. "-p",
  192. "--period",
  193. type=check_positive_number,
  194. help="period in seconds",
  195. )
  196. group.required = parser.parse_known_args()[0].command is not None
  197. args = parser.parse_args()
  198. # Create the WebSocket server and specify the handler for new connections.
  199. server = await websockets.serve(
  200. partial(
  201. on_connect, command_name=args.command, delay=args.delay, period=args.period
  202. ),
  203. "127.0.0.1", # Listen on loopback.
  204. 9000, # Port number.
  205. subprotocols=["ocpp2.0", "ocpp2.0.1"], # Specify OCPP 2.0.1 subprotocols.
  206. )
  207. logging.info("WebSocket Server Started")
  208. # Wait for the server to close (runs indefinitely).
  209. await server.wait_closed()
  210. # Entry point of the script.
  211. if __name__ == "__main__":
  212. # Run the main function to start the server.
  213. asyncio.run(main())