Aucune description

authentication.py 2.7KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. #!/usr/bin/env python
  2. import asyncio
  3. import json
  4. #import aioredis
  5. from redis import asyncio as aioredis
  6. import django
  7. import websockets
  8. from pprint import pprint
  9. django.setup()
  10. from django.contrib.contenttypes.models import ContentType
  11. from sesame.utils import get_user
  12. CONNECTIONS = {}
  13. def get_content_types(user):
  14. """Return the set of IDs of content types visible by user."""
  15. # This does only three database queries because Django caches
  16. # all permissions on the first call to user.has_perm(...).
  17. return {
  18. ct.id
  19. for ct in ContentType.objects.all()
  20. if user.has_perm(f"{ct.app_label}.view_{ct.model}")
  21. or user.has_perm(f"{ct.app_label}.change_{ct.model}")
  22. }
  23. async def handler(websocket):
  24. """Authenticate user and register connection in CONNECTIONS."""
  25. sesame = await websocket.recv()
  26. user = await asyncio.to_thread(get_user, sesame)
  27. if user is None:
  28. await websocket.close(1011, "authentication failed")
  29. return
  30. ct_ids = await asyncio.to_thread(get_content_types, user)
  31. CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
  32. pprint("Connections")
  33. pprint(CONNECTIONS)
  34. try:
  35. await websocket.wait_closed()
  36. finally:
  37. del CONNECTIONS[websocket]
  38. async def process_events():
  39. """Listen to events in Redis and process them."""
  40. redis = aioredis.from_url("redis://redis:6379/1")
  41. pubsub = redis.pubsub()
  42. await pubsub.subscribe("events", "flash_sale")
  43. async for message in pubsub.listen():
  44. pprint("process events")
  45. pprint(message)
  46. if message["type"] != "message":
  47. continue
  48. payload = message["data"].decode()
  49. # Broadcast event to all users who have permissions to see it.
  50. event = json.loads(payload)
  51. #pprint("current channel")
  52. #pprint(str(message['channel']))
  53. ch = message["channel"].decode()
  54. if ch == "events":
  55. pprint("event send ..")
  56. recipients = (
  57. websocket
  58. for websocket, connection in CONNECTIONS.items()
  59. if event["content_type_id"] in connection["content_type_ids"]
  60. )
  61. websockets.broadcast(recipients, payload)
  62. if ch == "flash_sale":
  63. pprint("flash sale send >> ..")
  64. recipients = (
  65. websocket
  66. for websocket, connection in CONNECTIONS.items()
  67. )
  68. pprint("test msg >> ..")
  69. websockets.broadcast(recipients, payload)
  70. async def main():
  71. async with websockets.serve(handler, "0.0.0.0", 8888):
  72. await process_events() # runs forever
  73. #process_flashsale()
  74. if __name__ == "__main__":
  75. asyncio.run(main(), debug=True)