While writing tests for my chat consumer, I ran into a problem: I am unable to authenticate in tests that use WebsocketCommunicator. I have a custom JwtTokenAuthMiddleware that authenticates socket connections using a token in the request query string because, as far as I know, proper authentication via authorization headers is not possible yet. Can you advise me on this or provide example code? I couldn't find any online. By the way, my chat itself works without problems. The tests should also be fine; I followed the official Django Channels 2.x testing documentation.
--JwtTokenAuthMiddleware--
class JwtTokenAuthMiddleware:
    """Channels middleware that authenticates a connection from a JWT in the query string.

    The value of the first query-string parameter is treated as the raw token
    (for example ``ws/room/1/?t=<token>``). When the token validates,
    ``scope['user']`` is set to the user returned by ``JWTAuthentication``;
    otherwise it is set to ``AnonymousUser()``.
    """

    def __init__(self, inner):
        # The wrapped application / next middleware in the stack.
        self.inner = inner

    def __call__(self, scope):
        """Set ``scope['user']`` and hand the scope to the wrapped application."""
        # Function-scope import so the block does not depend on file-level imports.
        from urllib.parse import parse_qsl

        close_old_connections()
        try:
            # ASGI allows ``query_string`` to be absent or None; treat that as empty.
            query = (scope.get('query_string') or b'').decode()
            # parse_qsl splits on '&' and only on the first '=' of each pair and
            # percent-decodes the value, so extra parameters no longer leak into
            # the token. An empty result raises IndexError (handled below).
            raw_token = parse_qsl(query)[0][1]
            auth = JWTAuthentication()
            validated_token = auth.get_validated_token(raw_token)
            scope['user'] = auth.get_user(validated_token)
        except (IndexError, UnicodeDecodeError, InvalidToken, AuthenticationFailed):
            # No token, undecodable query string, or rejected token: anonymous.
            scope['user'] = AnonymousUser()
        return self.inner(scope)
def JwtTokenAuthMiddlewareStack(inner):
    """Wrap ``inner`` in ``AuthMiddlewareStack`` and then in ``JwtTokenAuthMiddleware``."""
    return JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))
--Example Test--
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
    """Connect as a room's trainer with a token in the query string.

    Asserts the connection is accepted, then passes the first JSON message
    to ``assert_connection`` before disconnecting.
    """
    room = await database_sync_to_async(RoomFactory.create)()
    token = await sync_to_async(get_token_for_user)(room.trainer.user)

    room_url = f'ws/room/{room.id}/'
    communicator = WebsocketCommunicator(application, f'{room_url}?t={token}')

    is_connected, _ = await communicator.connect()
    assert is_connected

    first_message = await communicator.receive_json_from()
    assert_connection(first_message, [], room.max_round_time)

    await communicator.disconnect()
--Traceback of Error--
___________________________________________________________ test_trainer_auth_success ___________________________________________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
> return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f6b98f76510 maxsize=0>
async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> await getter
E concurrent.futures._base.CancelledError
/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
room = await database_sync_to_async(RoomFactory.create)()
trainer = room.trainer
trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
room_url = f'ws/room/{room.id}/'
# trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
> connected, _ = await trainer_communicator.connect()
apps/chat/tests/test_consumers.py:39:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector)
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector) # To reload keywords.
I've been trying to do more or less the same, and it seems there is no easy way to authenticate the user while testing consumers and communicators. There is a GitHub issue about this topic where a few (working!) workarounds are given; maybe you will find it helpful.