#!/usr/bin/env python
"""WebSocket server relaying Redis pub/sub messages to Django users.

Clients authenticate by sending a django-sesame token as their first
message.  Messages published on the ``events`` channel are forwarded only to
connections whose user may view or change the event's content type; messages
published on the ``flash_sale`` channel are forwarded to every connection.
"""

import asyncio
import json
import logging
from pprint import pprint

import django
import websockets
# redis-py's asyncio client is used in place of the standalone ``aioredis``
# package; the alias keeps the familiar name.
from redis import asyncio as aioredis

# Django has to be configured before any model module is imported.
django.setup()

from django.contrib.contenttypes.models import ContentType  # noqa: E402
from sesame.utils import get_user  # noqa: E402

logger = logging.getLogger(__name__)

REDIS_URL = "redis://redis:6379/1"
EVENTS_CHANNEL = "events"
FLASH_SALE_CHANNEL = "flash_sale"

# Maps each authenticated websocket to {"content_type_ids": set of IDs}.
CONNECTIONS = {}


def get_content_types(user):
    """Return the set of IDs of content types visible by user.

    A content type counts as visible when the user holds either its
    ``view`` or its ``change`` permission.
    """
    # This does only three database queries because Django caches
    # all permissions on the first call to user.has_perm(...).
    return {
        ct.id
        for ct in ContentType.objects.all()
        if user.has_perm(f"{ct.app_label}.view_{ct.model}")
        or user.has_perm(f"{ct.app_label}.change_{ct.model}")
    }


async def handler(websocket):
    """Authenticate user and register connection in CONNECTIONS.

    The first message received is treated as a sesame token.  If it does not
    resolve to a user the connection is closed with code 1011; otherwise the
    connection stays registered until the peer disconnects.
    """
    sesame = await websocket.recv()
    # get_user and get_content_types hit the database synchronously, so they
    # run in a worker thread to keep the event loop responsive.
    user = await asyncio.to_thread(get_user, sesame)
    if user is None:
        await websocket.close(1011, "authentication failed")
        return

    ct_ids = await asyncio.to_thread(get_content_types, user)
    CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
    pprint("Connections")
    pprint(CONNECTIONS)
    try:
        await websocket.wait_closed()
    finally:
        del CONNECTIONS[websocket]


def _event_recipients(payload):
    """Return the connections permitted to receive the event in payload.

    Returns None when payload is not a JSON object carrying a usable
    "content_type_id", so the caller can drop the message.
    """
    try:
        content_type_id = json.loads(payload)["content_type_id"]
        return [
            websocket
            for websocket, connection in CONNECTIONS.items()
            if content_type_id in connection["content_type_ids"]
        ]
    except (ValueError, KeyError, TypeError):
        # ValueError: invalid JSON.  KeyError: key missing.  TypeError: the
        # JSON value is not an object, or the ID is unhashable.
        return None


async def process_events():
    """Listen to events in Redis and process them.

    Runs until the pub/sub stream ends or the task is cancelled.  A message
    that cannot be decoded or parsed is logged and skipped rather than being
    allowed to stop the loop (and with it the whole server).
    """
    async with aioredis.from_url(REDIS_URL) as redis, redis.pubsub() as pubsub:
        await pubsub.subscribe(EVENTS_CHANNEL, FLASH_SALE_CHANNEL)
        async for message in pubsub.listen():
            pprint("process events")
            pprint(message)
            # Skip subscribe/unsubscribe confirmations.
            if message["type"] != "message":
                continue

            channel = message["channel"].decode()
            try:
                payload = message["data"].decode()
            except UnicodeDecodeError:
                logger.warning("dropping undecodable message on %r", channel)
                continue

            if channel == EVENTS_CHANNEL:
                # Broadcast event to all users who have permissions to see it.
                recipients = _event_recipients(payload)
                if recipients is None:
                    logger.warning("dropping malformed event: %r", payload)
                    continue
                pprint("event send ..")
                websockets.broadcast(recipients, payload)
            elif channel == FLASH_SALE_CHANNEL:
                # Flash sales go to everyone; the payload is forwarded as-is
                # and therefore does not need to be valid JSON.
                pprint("flash sale send >> ..")
                recipients = list(CONNECTIONS)
                pprint("test msg >> ..")
                websockets.broadcast(recipients, payload)


async def main():
    """Serve websocket connections while relaying Redis messages."""
    async with websockets.serve(handler, "0.0.0.0", 8888):
        await process_events()  # runs forever


if __name__ == "__main__":
    asyncio.run(main(), debug=True)