| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import argparse
- import asyncio
- import logging
- from datetime import datetime, timezone
- from functools import partial
- from random import randint
- from typing import Optional
- import ocpp.v201
- import websockets
- from ocpp.routing import on
- from ocpp.v201.enums import (
- Action,
- AuthorizationStatusEnumType,
- ClearCacheStatusEnumType,
- GenericDeviceModelStatusEnumType,
- RegistrationStatusEnumType,
- ReportBaseEnumType,
- TransactionEventEnumType,
- )
- from websockets import ConnectionClosed
- from timer import Timer
# Configure the root logger so that DEBUG-level (and higher) messages are
# emitted.
logging.basicConfig(level=logging.DEBUG)

# Module-wide set of ChargePoint instances: on_connect() adds an instance
# before it starts listening, and ChargePoint.handle_connection_closed()
# takes it out again.
ChargePoints = set()
- class ChargePoint(ocpp.v201.ChargePoint):
- _command_timer: Optional[Timer]
- def __init__(self, connection):
- super().__init__(connection.path.strip("/"), connection)
- self._command_timer = None
- # Message handlers to receive OCPP messages.
- @on(Action.boot_notification)
- async def on_boot_notification(self, charging_station, reason, **kwargs):
- logging.info("Received %s", Action.boot_notification)
- # Create and return a BootNotification response with the current time,
- # an interval of 60 seconds, and an accepted status.
- return ocpp.v201.call_result.BootNotification(
- current_time=datetime.now(timezone.utc).isoformat(),
- interval=60,
- status=RegistrationStatusEnumType.accepted,
- )
- @on(Action.heartbeat)
- async def on_heartbeat(self, **kwargs):
- logging.info("Received %s", Action.heartbeat)
- return ocpp.v201.call_result.Heartbeat(
- current_time=datetime.now(timezone.utc).isoformat()
- )
- @on(Action.status_notification)
- async def on_status_notification(
- self, timestamp, evse_id: int, connector_id: int, connector_status, **kwargs
- ):
- logging.info("Received %s", Action.status_notification)
- return ocpp.v201.call_result.StatusNotification()
- @on(Action.authorize)
- async def on_authorize(self, id_token, **kwargs):
- logging.info("Received %s", Action.authorize)
- return ocpp.v201.call_result.Authorize(
- id_token_info={"status": AuthorizationStatusEnumType.accepted}
- )
- @on(Action.transaction_event)
- async def on_transaction_event(
- self,
- event_type: TransactionEventEnumType,
- timestamp,
- trigger_reason,
- seq_no: int,
- transaction_info,
- **kwargs,
- ):
- match event_type:
- case TransactionEventEnumType.started:
- logging.info("Received %s Started", Action.transaction_event)
- return ocpp.v201.call_result.TransactionEvent(
- id_token_info={"status": AuthorizationStatusEnumType.accepted}
- )
- case TransactionEventEnumType.updated:
- logging.info("Received %s Updated", Action.transaction_event)
- return ocpp.v201.call_result.TransactionEvent(total_cost=10)
- case TransactionEventEnumType.ended:
- logging.info("Received %s Ended", Action.transaction_event)
- return ocpp.v201.call_result.TransactionEvent()
- @on(Action.meter_values)
- async def on_meter_values(self, evse_id: int, meter_value, **kwargs):
- logging.info("Received %s", Action.meter_values)
- return ocpp.v201.call_result.MeterValues()
- # Request handlers to emit OCPP messages.
- async def _send_clear_cache(self):
- request = ocpp.v201.call.ClearCache()
- response = await self.call(request)
- if response.status == ClearCacheStatusEnumType.accepted:
- logging.info("%s successful", Action.clear_cache)
- else:
- logging.info("%s failed", Action.clear_cache)
- async def _send_get_base_report(self):
- request = ocpp.v201.call.GetBaseReport(
- request_id=randint(1, 100), # noqa: S311
- report_base=ReportBaseEnumType.full_inventory,
- )
- response = await self.call(request)
- if response.status == GenericDeviceModelStatusEnumType.accepted:
- logging.info("%s successful", Action.get_base_report)
- else:
- logging.info("%s failed", Action.get_base_report)
- async def _send_command(self, command_name: Action):
- logging.debug("Sending OCPP command %s", command_name)
- match command_name:
- case Action.clear_cache:
- await self._send_clear_cache()
- case Action.get_base_report:
- await self._send_get_base_report()
- case _:
- logging.info(f"Not supported command {command_name}")
- async def send_command(
- self, command_name: Action, delay: Optional[float], period: Optional[float]
- ):
- try:
- if delay and not self._command_timer:
- self._command_timer = Timer(
- delay,
- False,
- self._send_command,
- [command_name],
- )
- if period and not self._command_timer:
- self._command_timer = Timer(
- period,
- True,
- self._send_command,
- [command_name],
- )
- except ConnectionClosed:
- self.handle_connection_closed()
- def handle_connection_closed(self):
- logging.info("ChargePoint %s closed connection", self.id)
- if self._command_timer:
- self._command_timer.cancel()
- ChargePoints.remove(self)
- logging.debug("Connected ChargePoint(s): %d", len(ChargePoints))
# Handler invoked for every new WebSocket connection.
async def on_connect(
    websocket,
    command_name: Optional[Action],
    delay: Optional[float],
    period: Optional[float],
):
    """Validate the subprotocol, then serve the charge point behind *websocket*.

    The connection is closed straight away when the client sent no
    ``Sec-WebSocket-Protocol`` header or when no subprotocol was negotiated.
    Otherwise a ChargePoint is created, the optional command is scheduled,
    the instance is registered in ``ChargePoints`` and its message loop runs
    until the connection closes.
    """
    try:
        client_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.info("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # Guard clause: without a negotiated subprotocol there is nothing to serve.
    if not websocket.subprotocol:
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            client_protocols,
        )
        return await websocket.close()
    logging.info("Protocols Matched: %s", websocket.subprotocol)

    charge_point = ChargePoint(websocket)
    if command_name:
        await charge_point.send_command(command_name, delay, period)
    ChargePoints.add(charge_point)

    try:
        await charge_point.start()
    except ConnectionClosed:
        charge_point.handle_connection_closed()
def check_positive_number(value: Optional[float]):
    """Convert *value* to a strictly positive, finite float.

    Intended as an argparse ``type=`` callable.

    Returns:
        The converted float.

    Raises:
        argparse.ArgumentTypeError: if *value* cannot be converted to a
            number (including ``None``), or if the number is zero, negative,
            NaN or infinite.
    """
    try:
        value = float(value)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric objects.
        raise argparse.ArgumentTypeError("must be a number") from None
    # NaN fails every comparison and infinity fails the upper bound, so both
    # are rejected together with zero and negative values.
    if not 0 < value < float("inf"):
        raise argparse.ArgumentTypeError("must be a positive number")
    return value
# Program body: parse the command line, then run the WebSocket server.
async def main():
    """Parse the CLI options and serve OCPP connections on 127.0.0.1:9000.

    ``--delay`` and ``--period`` are mutually exclusive; one of them becomes
    mandatory as soon as ``--command`` is supplied. The coroutine returns
    only once the server has been closed.
    """
    cli = argparse.ArgumentParser(description="OCPP2 Server")
    cli.add_argument("-c", "--command", type=Action, help="command name")

    schedule = cli.add_mutually_exclusive_group()
    for short_flag, long_flag, description in (
        ("-d", "--delay", "delay in seconds"),
        ("-p", "--period", "period in seconds"),
    ):
        schedule.add_argument(
            short_flag,
            long_flag,
            type=check_positive_number,
            help=description,
        )

    # A first, lenient parse reveals whether a command was requested; only
    # then is a delay or period required by the real parse below.
    preliminary, _ = cli.parse_known_args()
    schedule.required = preliminary.command is not None
    args = cli.parse_args()

    # Every new connection is handled by on_connect with the CLI options bound.
    connection_handler = partial(
        on_connect, command_name=args.command, delay=args.delay, period=args.period
    )
    server = await websockets.serve(
        connection_handler,
        "127.0.0.1",  # Listen on loopback.
        9000,  # Port number.
        subprotocols=["ocpp2.0", "ocpp2.0.1"],  # Specify OCPP 2.0.1 subprotocols.
    )
    logging.info("WebSocket Server Started")

    # Block until the server is closed (runs indefinitely).
    await server.wait_closed()


# Script entry point.
if __name__ == "__main__":
    # Start the event loop and run the server.
    asyncio.run(main())
|