django python-3.x django-channels pytest-django pytest-asyncio

Authentication in Django Channels v2 tests with WebsocketCommunicator


While writing tests for my chat consumer, I ran into a problem: I cannot authenticate in tests that use WebsocketCommunicator. I have a custom JwtTokenAuthMiddleware that authenticates socket connections using a token passed in the request query string because, as far as I know, proper authentication via authorization headers is not possible yet. Can you advise me on this or point me to example code? I couldn't find any online. By the way, the chat itself works without problems. The tests should also be fine, since I followed the guide in the official Django Channels 2.x Testing documentation.

--JwtTokenAuthMiddleware--

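from channels.auth import AuthMiddlewareStack
from django.contrib.auth.models import AnonymousUser
from django.db import close_old_connections
# Assumption: the JWT classes below come from djangorestframework-simplejwt.
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import AuthenticationFailed, InvalidToken


class JwtTokenAuthMiddleware:
    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        # Drop stale database connections before touching the ORM.
        close_old_connections()
        try:
            # The token is expected as the only query parameter, e.g. ?t=<token>.
            raw_token = scope['query_string'].decode().split('=')[1]
            auth = JWTAuthentication()
            validated_token = auth.get_validated_token(raw_token)
            user = auth.get_user(validated_token)
            scope['user'] = user
        except (IndexError, InvalidToken, AuthenticationFailed):
            # Missing or invalid token: fall back to an anonymous user.
            scope['user'] = AnonymousUser()
        return self.inner(scope)


JwtTokenAuthMiddlewareStack = lambda inner: JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))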
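
A side note on the token lookup in the middleware above: `split('=')[1]` takes whatever sits between the first and second `=`, so it returns the wrong value as soon as the URL carries a second parameter. Below is a minimal sketch of a more tolerant lookup that uses only the standard library. `extract_token` and its `param` default of `t` (taken from the `?t=` in the test) are hypothetical names, not part of Channels, and this is not claimed to be the cause of the failing test.

```python
from urllib.parse import parse_qs


def extract_token(scope, param="t"):
    """Return the first value of `param` from an ASGI scope's query string, or None."""
    # ASGI delivers the query string as bytes; a missing key is treated as empty.
    query = scope.get("query_string", b"").decode()
    values = parse_qs(query).get(param)
    return values[0] if values else None


print(extract_token({"query_string": b"t=abc.def.ghi&lang=en"}))
print(extract_token({"query_string": b""}))
```

With this helper the middleware could treat a `None` result the same way it currently treats `IndexError`, i.e. fall back to `AnonymousUser()`.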

--Example Test--

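import pytest
from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator

# application, RoomFactory, get_token_for_user and assert_connection are
# project-specific and imported from the project (not shown).


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
    room = await database_sync_to_async(RoomFactory.create)()
    trainer = room.trainer
    trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
    room_url = f'ws/room/{room.id}/'

    # The token travels in the query string, as the middleware expects.
    trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
    connected, _ = await trainer_communicator.connect()
    assert connected

    trainer_connect_resp = await trainer_communicator.receive_json_from()
    assert_connection(trainer_connect_resp, [], room.max_round_time)
    await trainer_communicator.disconnect()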

--Traceback of Error--

___________________________________________________________ test_trainer_auth_success ___________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f6b98f76510 maxsize=0>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_trainer_auth_success():
        room = await database_sync_to_async(RoomFactory.create)()
        trainer = room.trainer
        trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
        room_url = f'ws/room/{room.id}/'

        # trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
        trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
>       connected, _ = await trainer_communicator.connect()

apps/chat/tests/test_consumers.py:39: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
    response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
    raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
    return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)

/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)  # To reload keywords.
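
Reading the traceback: `connect()` waits up to `timeout` seconds (1 here) for the application's first reply, nothing arrived in time, and the cancelled wait was reported as `TimeoutError`. The sketch below reproduces only that shape with the standard library. `wait_for_reply` is a hypothetical stand-in for `receive_output`, it uses `asyncio.wait_for` rather than asgiref's own timeout helper, and it says nothing about why the consumer stayed silent.

```python
import asyncio


async def wait_for_reply(queue, timeout=1):
    """Wait for one message, like a communicator waiting on its output queue."""
    # asyncio.wait_for cancels the pending get() and raises TimeoutError on expiry.
    return await asyncio.wait_for(queue.get(), timeout)


async def main():
    silent_app_output = asyncio.Queue()  # nothing is ever put here
    try:
        await wait_for_reply(silent_app_output, timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "got a reply"


result = asyncio.run(main())
print(result)
```

So the error itself only says that the application neither accepted nor closed the connection within the timeout; the underlying reason has to be found in the middleware or consumer.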

Solution

  • I've been trying to do more or less the same, and there seems to be no easy way to authenticate the user while testing consumers and communicators. There is a GH issue on this topic where a few (working!) workarounds are given; maybe you will find it helpful.