import asyncio
import inspect
import logging
from dataclasses import asdict
from datetime import datetime
from pprint import pprint

try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    import sys

    sys.exit(1)

# Helpers used by CustomCP._handle_call.
# NOTE(review): these are assumed to live in `ocpp.charge_point`, the module
# whose `_handle_call` is overridden below -- confirm against the installed
# ocpp version.
from ocpp.charge_point import (
    _raise_key_error,
    camel_to_snake_case,
    remove_nones,
    snake_to_camel_case,
)
from ocpp.routing import on, after
from ocpp.v16 import ChargePoint as cp
from ocpp.v16 import call_result
from ocpp.v16.datatypes import IdTagInfo
from ocpp.v16.enums import Action, RegistrationStatus, AuthorizationStatus
from ocpp.messages import Call, MessageType, unpack, validate_payload

logging.basicConfig(level=logging.INFO)

# Logger used by CustomCP._handle_call to report handler failures.
LOGGER = logging.getLogger(__name__)


class CustomCP(cp):
    """Charge point base class with a customisable ``_handle_call``."""

    def __init__(self, id, connection, response_timeout=30):
        super().__init__(id, connection, response_timeout)

    async def _handle_call(self, msg):
        """Execute all hooks installed for the Action of the message.

        First the '_on_action' hook is executed and its response is returned
        to the client. If there is no '_on_action' hook for the Action in the
        message a CallError with a NotImplementedError is returned. If the
        Action is not supported by the OCPP version a NotSupportedError is
        returned.

        Next the '_after_action' hook is executed.

        Returns:
            ``None`` when no handler is found or the '_on_action' hook raised
            (a CallError has then already been sent). Otherwise the call
            result message, or -- because ``response`` is rebound -- the
            return value of the '_after_action' hook when one is installed.
        """
        try:
            handlers = self.route_map[msg.action]
        except KeyError:
            _raise_key_error(msg.action, self._ocpp_version)
            return

        if not handlers.get("_skip_schema_validation", False):
            validate_payload(msg, self._ocpp_version)

        # OCPP uses camelCase for the keys in the payload. It's more pythonic
        # to use snake_case for keyword arguments. Therefore the keys must be
        # 'translated'. Some examples:
        #
        # * chargePointVendor becomes charge_point_vendor
        # * firmwareVersion becomes firmware_version
        if msg.payload is None:
            msg.payload = {}
        snake_case_payload = camel_to_snake_case(msg.payload)

        try:
            handler = handlers["_on_action"]
        except KeyError:
            _raise_key_error(msg.action, self._ocpp_version)
            # Mirror the first lookup: never continue without a handler.
            return

        try:
            response = handler(**snake_case_payload)
            if inspect.isawaitable(response):
                response = await response
        except Exception as e:
            # Report the failure to the peer as a CallError instead of
            # letting the exception escape.
            LOGGER.exception("Error while handling request '%s'", msg)
            response = msg.create_call_error(e).to_json()
            await self._send(response)
            return

        temp_response_payload = asdict(response)

        # Remove nones ensures that we strip out optional arguments
        # which were not set and have a default value of None
        response_payload = remove_nones(temp_response_payload)

        # The response payload must be 'translated' from snake_case to
        # camelCase. So:
        #
        # * charge_point_vendor becomes chargePointVendor
        # * firmware_version becomes firmwareVersion
        camel_case_payload = snake_to_camel_case(response_payload)

        response = msg.create_call_result(camel_case_payload)

        if not handlers.get("_skip_schema_validation", False):
            validate_payload(response, self._ocpp_version)

        await self._send(response.to_json())

        try:
            handler = handlers["_after_action"]
            # Create task to avoid blocking when making a call inside the
            # after handler
            response = handler(**snake_case_payload)
            if inspect.isawaitable(response):
                asyncio.ensure_future(response)
        except KeyError:
            # '_on_after' hooks are not required. Therefore ignore exception
            # when no '_on_after' hook is installed.
            pass

        return response


class ChargePoint(cp):
    """Central-system side handlers for a connected OCPP 1.6 charge point.

    Each handler is registered with ``@on`` for one Action and returns the
    matching ``call_result`` payload.
    """

    @on(Action.BootNotification)
    def on_boot_notification(
        self, charge_point_vendor: str, charge_point_model: str, **kwargs
    ):
        """Accept the charge point and tell it to send heartbeats every 10 s.

        Any additional payload fields arrive in ``kwargs`` and are printed.
        """
        pprint(kwargs)
        return call_result.BootNotificationPayload(
            current_time=datetime.utcnow().isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    @on(Action.Authorize)
    def on_authorization(self, id_tag):
        """Accept every ``id_tag`` unconditionally."""
        idti = IdTagInfo(status=AuthorizationStatus.accepted)
        return call_result.AuthorizePayload(id_tag_info=idti)

    @on(Action.StatusNotification)
    def on_status_notification(self, id_tag=None, **kwargs):
        """Acknowledge a status notification with an empty payload.

        Handlers are invoked with the snake_case payload keys as keyword
        arguments, so this one must accept whatever fields the charge point
        sends (presumably ``connector_id``, ``error_code``, ``status`` and
        optional extras -- verify against the OCPP 1.6 schema). They are
        collected in ``kwargs`` and ignored.

        Args:
            id_tag: Unused; kept only so existing callers that pass it keep
                working.
            **kwargs: The notification payload fields.
        """
        return call_result.StatusNotificationPayload()

    @on(Action.Heartbeat, skip_schema_validation=True)
    def on_heartbeat(self, **kwargs):
        """Reply to a heartbeat with the current UTC time."""
        # receives empty payload from CP
        print("--HeartBeat--")
        print(kwargs)
        return call_result.HeartbeatPayload(
            current_time=datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        )


async def on_connect(websocket, path):
    """For every new charge point that connects, create a ChargePoint
    instance and start listening for messages.

    The connection is closed instead when the client requested no
    subprotocol, or when none of the requested subprotocols was accepted.

    Args:
        websocket: The server-side connection of the charge point.
        path: Request path; with surrounding slashes stripped it is used as
            the charge point id.
    """
    try:
        requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.error("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        # In the websockets lib if no subprotocols are supported by the
        # client and the server, it proceeds without a subprotocol,
        # so we have to manually close the connection.
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    charge_point_id = path.strip("/")
    # Named `charge_point` so the module-level alias `cp` (the base class)
    # is not shadowed inside this function.
    charge_point = ChargePoint(charge_point_id, websocket)

    await charge_point.start()


async def main():
    """Serve OCPP 1.6 over websockets on 0.0.0.0:9000 until the server closes."""
    server = await websockets.serve(
        on_connect, "0.0.0.0", 9000, subprotocols=["ocpp1.6"]
    )
    logging.info("Server Started listening to new connections...")
    await server.wait_closed()


if __name__ == "__main__":
    # asyncio.run() is used when running this example with Python >= 3.7v
    asyncio.run(main())