| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- #!/usr/bin/env python
- import asyncio
- import json
- #import aioredis
- from redis import asyncio as aioredis
- import django
- import websockets
- from pprint import pprint
- django.setup()
- from django.contrib.contenttypes.models import ContentType
- from sesame.utils import get_user
- CONNECTIONS = {}
def get_content_types(user):
    """Collect the IDs of the content types that user may view or change."""
    # This does only three database queries because Django caches
    # all permissions on the first call to user.has_perm(...).
    visible_ids = set()
    for content_type in ContentType.objects.all():
        app_label = content_type.app_label
        model = content_type.model
        # The "change" permission is only consulted when "view" is missing.
        if user.has_perm(f"{app_label}.view_{model}") or user.has_perm(
            f"{app_label}.change_{model}"
        ):
            visible_ids.add(content_type.id)
    return visible_ids
async def handler(websocket):
    """Authenticate the peer, then keep it registered until it disconnects.

    The first frame received is treated as a sesame token. When no user
    matches it, the connection is closed with code 1011. Otherwise the
    websocket is stored in CONNECTIONS together with the set of content
    type IDs the user can see, and removed again once the socket closes.
    """
    token = await websocket.recv()
    # get_user and get_content_types are synchronous, so they are run in a
    # worker thread to keep the event loop responsive.
    authenticated = await asyncio.to_thread(get_user, token)
    if authenticated is None:
        await websocket.close(1011, "authentication failed")
        return

    visible_ids = await asyncio.to_thread(get_content_types, authenticated)
    registration = {"content_type_ids": visible_ids}
    CONNECTIONS[websocket] = registration
    pprint("Connections")
    pprint(CONNECTIONS)
    try:
        await websocket.wait_closed()
    finally:
        # Always unregister, even if waiting was interrupted by an error.
        CONNECTIONS.pop(websocket)
async def process_events():
    """Listen to events in Redis and relay them to connected websockets.

    Subscribes to the ``events`` and ``flash_sale`` channels and loops over
    the incoming messages:

    * ``events`` payloads are parsed as JSON and sent only to the
      connections whose ``content_type_ids`` contain the payload's
      ``content_type_id``.
    * ``flash_sale`` payloads are sent as received to every connection.

    A malformed ``events`` payload is reported and dropped rather than
    terminating the listener, and ``flash_sale`` payloads are never parsed,
    so they do not have to be JSON.
    """
    # The context managers release the subscription and the Redis
    # connection when the loop exits, including on error or cancellation.
    async with aioredis.from_url("redis://redis:6379/1") as redis:
        async with redis.pubsub() as pubsub:
            await pubsub.subscribe("events", "flash_sale")
            async for message in pubsub.listen():
                pprint("process events")
                pprint(message)
                # Skip subscribe/unsubscribe confirmations.
                if message["type"] != "message":
                    continue
                payload = message["data"].decode()
                ch = message["channel"].decode()
                if ch == "events":
                    # Only this channel carries JSON that must be inspected.
                    try:
                        content_type_id = json.loads(payload)["content_type_id"]
                    except (ValueError, KeyError, TypeError):
                        pprint("event dropped: malformed payload")
                        continue
                    pprint("event send ..")
                    # Broadcast event to all users who have permissions
                    # to see it.
                    recipients = [
                        websocket
                        for websocket, connection in CONNECTIONS.items()
                        if content_type_id in connection["content_type_ids"]
                    ]
                    websockets.broadcast(recipients, payload)
                elif ch == "flash_sale":
                    pprint("flash sale send >> ..")
                    # Flash sales go to every registered connection.
                    recipients = list(CONNECTIONS)
                    pprint("test msg >> ..")
                    websockets.broadcast(recipients, payload)
async def main(host="0.0.0.0", port=8888):
    """Serve websocket connections and relay Redis events to them.

    Args:
        host: Address the websocket server binds to. The default,
            "0.0.0.0", accepts connections on all IPv4 interfaces.
        port: TCP port the websocket server listens on.
    """
    async with websockets.serve(handler, host, port):
        # The server stays up for as long as the Redis listener is running.
        await process_events()  # runs forever
if __name__ == "__main__":
    # Script entry point: run the server with asyncio debug mode enabled.
    server_coroutine = main()
    asyncio.run(server_coroutine, debug=True)
|