Skip to content
31 changes: 23 additions & 8 deletions src/django_idom/websocket_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
from dataclasses import dataclass
from threading import Thread
from typing import Any, Awaitable, Callable, Optional
from urllib.parse import parse_qsl

Expand All @@ -18,6 +19,11 @@
_logger = logging.getLogger(__name__)


def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()


@dataclass
class WebsocketConnection:
scope: dict
Expand All @@ -42,17 +48,26 @@ async def connect(self) -> None:
elif user is None:
_logger.warning("IDOM websocket is missing AuthMiddlewareStack!")

self._idom_dispatcher_future = asyncio.ensure_future(self._run_dispatch_loop())
self._recv_queue_loop = asyncio.new_event_loop()
t = Thread(
target=start_background_loop, args=[self._recv_queue_loop], daemon=True
)
t.start()
asyncio.run_coroutine_threadsafe(
self._run_dispatch_loop(), self._recv_queue_loop
)

async def disconnect(self, code: int) -> None:
if self._idom_dispatcher_future.done():
await self._idom_dispatcher_future
else:
self._idom_dispatcher_future.cancel()
self._recv_queue_loop.stop()
await super().disconnect(code)

async def receive_json(self, content: Any, **kwargs: Any) -> None:
await self._idom_recv_queue.put(LayoutEvent(**content))
asyncio.run_coroutine_threadsafe(
self._recv_queue_put(content, **kwargs), self._recv_queue_loop
)

async def _recv_queue_put(self, content: Any, **kwargs: Any):
await self._recv_queue.put(LayoutEvent(**content))

async def _run_dispatch_loop(self):
view_id = self.scope["url_route"]["kwargs"]["view_id"]
Expand All @@ -78,12 +93,12 @@ async def _run_dispatch_loop(self):
)
return

self._idom_recv_queue = recv_queue = asyncio.Queue()
self._recv_queue = asyncio.Queue()
try:
await dispatch_single_view(
Layout(component_instance),
self.send_json,
recv_queue.get,
self._recv_queue.get,
)
except Exception:
await self.close()
Expand Down