| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import asyncio
- import logging
- from datetime import datetime, timezone
- from ocpp.v16.enums import Action, RegistrationStatus, AuthorizationStatus
- from ocpp.v16.datatypes import IdTagInfo
- from ocpp.v16 import call_result
- try:
- import websockets
- except ModuleNotFoundError:
- print("This example relies on the 'websockets' package.")
- print("Please install it by running: ")
- print()
- print(" $ pip install websockets")
- import sys
- sys.exit(1)
- from ocpp.routing import on
- from ocpp.v16 import ChargePoint as cp
- from ocpp.v16 import call_result
- from ocpp.v16.enums import Action, RegistrationStatus
- logging.basicConfig(level=logging.INFO)
class ChargePoint(cp):
    """Central-system handler for the messages of one connected charge point.

    Methods decorated with ``@on`` are registered as handlers for the named
    OCPP 1.6 action; each returns the matching ``call_result`` payload.
    """

    # Next transaction id to hand out. It lives on the class (not the
    # instance) so ids stay unique across all connected charge points for
    # the lifetime of this process. It is not persisted across restarts.
    _next_transaction_id = 1

    @on(Action.boot_notification)
    def on_boot_notification(
        self, charge_point_vendor: str, charge_point_model: str, **kwargs
    ):
        """Accept a BootNotification request.

        Args:
            charge_point_vendor: Vendor reported by the charge point (unused).
            charge_point_model: Model reported by the charge point (unused).
            **kwargs: Any further optional request fields (ignored).

        Returns:
            A ``call_result.BootNotification`` carrying the current UTC time
            in ISO 8601 format, ``interval=10`` and an ``accepted`` status.
        """
        return call_result.BootNotification(
            current_time=datetime.now(timezone.utc).isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    @on(Action.start_transaction)
    def on_start_transaction(
        self,
        connector_id: int,
        id_tag: str,
        meter_start: int,
        timestamp: str,
        **kwargs
    ):
        """Accept a StartTransaction request and assign it a fresh id.

        Args:
            connector_id: Connector the transaction starts on (unused).
            id_tag: Identifier presented to start the transaction; logged.
            meter_start: Meter value at transaction start (unused).
            timestamp: Start time reported by the charge point (unused).
            **kwargs: Any further optional request fields (ignored).

        Returns:
            A ``call_result.StartTransaction`` with a process-unique
            ``transaction_id`` and an ``accepted`` authorization status.
        """
        # Handlers are plain (non-async) functions, so this read-and-increment
        # cannot be interleaved with another handler on the same event loop.
        transaction_id = ChargePoint._next_transaction_id
        ChargePoint._next_transaction_id += 1

        logging.info(
            "🔌 StartTransaction received from CP with ID tag: %s", id_tag
        )
        return call_result.StartTransaction(
            transaction_id=transaction_id,
            id_tag_info=IdTagInfo(status=AuthorizationStatus.accepted),
        )
async def on_connect(websocket):
    """Handle one incoming WebSocket connection from a charge point.

    The connection is closed immediately when the client sent no
    ``Sec-WebSocket-Protocol`` header or when no subprotocol was negotiated.
    Otherwise a ``ChargePoint`` is created, using the request path (without
    surrounding slashes) as its id, and its message loop runs until the
    connection is closed.

    Args:
        websocket: The server-side connection object passed in by
            ``websockets.serve``.
    """
    try:
        requested_protocols = websocket.request.headers["Sec-WebSocket-Protocol"]
    except KeyError:
        logging.error("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    if not websocket.subprotocol:
        # In the websockets lib if no subprotocols are supported by the
        # client and the server, it proceeds without a subprotocol,
        # so we have to manually close the connection.
        # NOTE(review): confirm `available_subprotocols` exists on the
        # connection class used together with `websocket.request`.
        logging.warning(
            "Protocols Mismatched | Expected Subprotocols: %s,"
            " but client supports %s | Closing connection",
            websocket.available_subprotocols,
            requested_protocols,
        )
        return await websocket.close()

    logging.info("Protocols Matched: %s", websocket.subprotocol)

    charge_point_id = websocket.request.path.strip("/")
    # Named `charge_point` rather than `cp` so the imported base-class alias
    # `cp` is not shadowed inside this function.
    charge_point = ChargePoint(charge_point_id, websocket)
    try:
        await charge_point.start()
    except websockets.ConnectionClosed:
        # A charge point going away is a normal event, not a handler failure;
        # log it instead of letting the exception escape the handler.
        logging.info("Charge point %s disconnected", charge_point_id)
async def main():
    """Start the WebSocket server for OCPP 1.6 and wait until it is closed.

    The server listens on all interfaces on port 9000 and hands every new
    connection to ``on_connect``.
    """
    listen_host, listen_port = "0.0.0.0", 9000
    server = await websockets.serve(
        on_connect,
        listen_host,
        listen_port,
        subprotocols=["ocpp1.6"],
    )
    logging.info("Server Started listening to new connections...")
    await server.wait_closed()
if __name__ == "__main__":
    # Run the server only when this file is executed as a script.
    # asyncio.run() is available on Python 3.7 and newer.
    asyncio.run(main())
|