Нема описа

charge_point.py 1.8KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import asyncio
  2. import logging
  3. from datetime import datetime, timezone
  4. try:
  5. import websockets
  6. except ModuleNotFoundError:
  7. print("This example relies on the 'websockets' package.")
  8. print("Please install it by running: ")
  9. print()
  10. print(" $ pip install websockets")
  11. import sys
  12. sys.exit(1)
  13. from ocpp.v16 import ChargePoint as cp
  14. from ocpp.v16 import call
  15. from ocpp.v16.enums import RegistrationStatus, ChargingProfilePurposeType, ChargingRateUnitType
  16. logging.basicConfig(level=logging.INFO)
  17. class ChargePoint(cp):
  18. async def send_boot_notification(self):
  19. request = call.BootNotification(
  20. charge_point_model="Optimus", charge_point_vendor="The Mobility House"
  21. )
  22. response = await self.call(request)
  23. if response.status == RegistrationStatus.accepted:
  24. print("Connected to central system.")
  25. async def send_start_transaction(self):
  26. request = call.StartTransaction(
  27. connector_id=1,
  28. id_tag="ABC123456", # Example RFID/tag ID
  29. meter_start=0,
  30. timestamp=datetime.now(timezone.utc).isoformat()
  31. )
  32. response = await self.call(request)
  33. if response.id_tag_info['status'] == 'Accepted':
  34. print(f"Charging session started! Transaction ID: {response.transaction_id}")
  35. else:
  36. print(f"StartTransaction rejected: {response.id_tag_info['status']}")
  37. async def main():
  38. async with websockets.connect(
  39. "ws://localhost:9000/CP_1", subprotocols=["ocpp1.6"]
  40. ) as ws:
  41. cp = ChargePoint("CP_1", ws)
  42. await asyncio.gather(
  43. cp.start(),
  44. cp.send_boot_notification(),
  45. cp.send_start_transaction(), # Add this line
  46. )
  47. if __name__ == "__main__":
  48. # asyncio.run() is used when running this example with Python >= 3.7v
  49. asyncio.run(main())