"""Minimal OCPP 1.6 central system example.

Starts a WebSocket server on port 9000 that accepts charge points speaking
the ``ocpp1.6`` subprotocol and answers BootNotification and
StartTransaction requests.
"""

import asyncio
import itertools
import logging
import sys
from datetime import datetime, timezone

# Check for ``websockets`` right after the stdlib imports so the friendly
# installation hint is shown before any other third-party import can fail.
try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    sys.exit(1)

from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16 import call_result
from ocpp.v16.datatypes import IdTagInfo  # noqa: F401  (kept in module scope)
from ocpp.v16.enums import Action, AuthorizationStatus, RegistrationStatus

logging.basicConfig(level=logging.INFO)

# Source of transaction ids handed out by ``on_start_transaction``. Starting
# at 1234 keeps the first id identical to the previously hard-coded value
# while making every later transaction id distinct.
_transaction_ids = itertools.count(1234)


class ChargePoint(cp):
    """Server-side representation of one connected charge point."""

    @on(Action.boot_notification)
    def on_boot_notification(
        self, charge_point_vendor: str, charge_point_model: str, **kwargs
    ):
        """Accept a BootNotification request.

        Replies with the current UTC time in ISO 8601 format, an interval of
        10 and ``RegistrationStatus.accepted``. The vendor and model are not
        inspected.
        """
        return call_result.BootNotification(
            current_time=datetime.now(timezone.utc).isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    @on(Action.start_transaction)
    def on_start_transaction(
        self,
        connector_id: int,
        id_tag: str,
        meter_start: int,
        timestamp: str,
        **kwargs,
    ):
        """Accept a StartTransaction request.

        Prints the received ``id_tag`` and replies with a fresh transaction
        id and an ``AuthorizationStatus.accepted`` id-tag status. No actual
        authorization check is performed.
        """
        print(f"🔌 StartTransaction received from CP with ID tag: {id_tag}")
        return call_result.StartTransaction(
            transaction_id=next(_transaction_ids),
            id_tag_info={"status": AuthorizationStatus.accepted},
        )


async def on_connect(websocket):
    """For every new charge point that connects, create a ChargePoint
    instance and start listening for messages.

    The connection is closed immediately when the client did not send a
    ``Sec-WebSocket-Protocol`` header or when no subprotocol was agreed on.
    """
    try:
        requested_protocols = websocket.request.headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.error("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        # In the websockets lib if no subprotocols are supported by the
        # client and the server, it proceeds without a subprotocol,
        # so we have to manually close the connection.
        # NOTE(review): confirm that the connection object of the installed
        # websockets version exposes `available_subprotocols`.
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    # The request path (without slashes at either end) is used as the
    # charge point id.
    charge_point_id = websocket.request.path.strip("/")
    # Named `charge_point` rather than `cp` to avoid shadowing the imported
    # base-class alias.
    charge_point = ChargePoint(charge_point_id, websocket)

    await charge_point.start()


async def main():
    """Start the WebSocket server on 0.0.0.0:9000 and run until it closes."""
    server = await websockets.serve(
        on_connect, "0.0.0.0", 9000, subprotocols=["ocpp1.6"]
    )

    logging.info("Server Started listening to new connections...")
    await server.wait_closed()


if __name__ == "__main__":
    # asyncio.run() requires Python >= 3.7.
    asyncio.run(main())