djangoredisdjango-channelsconsumerasgi

Django Channels send message to another channel layer


let me simplify my question in one sentence: in one consumer, how can I access to another consumer to send message?


basically, I have two consumers in seprated apps:

  1. note consumer in note app to manage system and crud notifications.
  2. chat consumer in chat app to manage social messaging
application = ProtocolTypeRouter({
  "http": django_asgi_app,
  "websocket": AllowedHostsOriginValidator(
        SessionMiddlewareStack(
            AuthMiddlewareStack(
                URLRouter(
                    [
                        path('ws/chat/<int:pk>/', ChatConsumer.as_asgi()),
                        path('ws/note/<int:pk>/', NoteConsumer.as_asgi()),
                    ]
                )
            )
        )
    ),
})

I am implementing online status function. it works like this:

note ws connection establishes in every page, as long as you are on our site, you have a notification ws to receive notification. this correspond to the idea of online, therefore, we can save online record to True in db on note ws connection, and False when disconnect, to indicate online status.

class NoteConsumer(AsyncJsonWebsocketConsumer):
    @database_sync_to_async
    def noteOnline(self):
        user = User.objects.get(pk=int(self.scope['user'].pk))
        user.chatProfile.online = True
        user.save()
    
    @database_sync_to_async
    def noteOffline(self):
        user = User.objects.get(pk=int(self.scope['user'].pk))
        user.chatProfile.online = False
        user.save()

    async def connect(self):
        # ...
        await self.noteOnline()

    async def disconnect(self):
        # ...
        await self.noteOffline()

in chat friends view, user may want to know if the target he/she is talking to logged off or not, without ws implementation, the information will not be updated immediately, they need to refresh, I don't like that. therefore, I want to add a messaging method in chat consumer:

class ChatConsumer(AsyncJsonWebsocketConsumer):
    async def online_status(self, event):
        # print(f"{event['user']} online to {self.room_name}")
        await self.send(text_data=json.dumps({
            'type': 'online', 
            'message': event['message'], 
            'user': event['user'], 
        }))

which enable us to send online status notification to chat socket, in chat.consumers

class NoteConsumer(AsyncJsonWebsocketConsumer):
    # ...

    # try to get layer from ChatConsumer, in my mind,
    # get_channel_layer takes first arg as unique 
    # identifier of the socket layer and will find 
    # corresponding one for me, but seems not working 
    @database_sync_to_async
    def noteOnline(self):
        user = User.objects.get(pk=int(self.scope['user'].pk))
        user.chatProfile.online = True
        user.save()
        for cp in user.chatProfile.getRecentContact():
            # print(f'{user} connect to {self.room_name}')
            # async_to_sync(get_channel_layer)().group_send(
            get_channel_layer().group_send(
                'chat_{}'.format(cp.pk), 
                {
                    'type': 'online_status', 
                    'message': 'online', 
                    'user': user.pk, 
                }
            )

    @database_sync_to_async
    def noteOffline(self):
        user = User.objects.get(pk=int(self.scope['user'].pk))
        user.chatProfile.online = False
        user.save()
        for cp in user.chatProfile.getRecentContact():
            get_channel_layer().group_send(
                'chat_{}'.format(cp.pk), 
                {
                    'type': 'online_status', 
                    'message': 'offline', 
                    'user': user.pk, 
                }
            )

thank you so much for reading this long, the code above needs a little more explanation:

chat socket architecture: let's say you are user1, and you have a friend user2, user1 uses ws1 to receive message, and connects to user2 to send message.

chat profile iteration gets me the pks of who I talks to recently, and send message to their receiving socket


I doubt anything went wrong, but it seems that online_status is never called. I wonder why this happens and will be very grateful for your help.


Solution

  • I walked around with channel_layer_alias, but it did not work, it seems that my previous mindset was wrong: as long as you have room name, you can access to the layer anywhere with get_channel_layer

    it is weird you can use get_channel_layer outside consumer, but not in another consumer, the channel layer can only be accessed within the consumer, which after I modified the code to this, it worked:

    class ChatConsumer(AsyncJsonWebsocketConsumer):
        @staticmethod
        def send_online(sender_pk, receiver_pk):
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                'chat_{}'.format(receiver_pk), 
                {
                    'type': 'online_status', 
                    'message': 'online', 
                    'user': sender_pk, 
                }
            )
    
        @staticmethod
        def send_offline(sender_pk, receiver_pk):
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                'chat_{}'.format(receiver_pk), 
                {
                    'type': 'online_status', 
                    'message': 'offline', 
                    'user': sender_pk, 
                }
            )
    
    class NoteConsumer(AsyncJsonWebsocketConsumer):
        @database_sync_to_async
        def noteOnline(self):
            from chat.consumers import ChatConsumer
    
            user = User.objects.get(pk=int(self.scope['user'].pk))
            user.chatProfile.online = True
            user.save()
            for cp in user.chatProfile.getRecentContact():
                ChatConsumer.send_online(user.pk, cp.pk)
                
    
        @database_sync_to_async
        def noteOffline(self):
            from chat.consumers import ChatConsumer
    
            user = User.objects.get(pk=int(self.scope['user'].pk))
            user.chatProfile.online = False
            user.save()
            for cp in user.chatProfile.getRecentContact():
                ChatConsumer.send_offline(user.pk, cp.pk)
    

    I will leave this post here just in case some one run into this occasion and need a hand, if you have a better option, I am very willing to accept it as answer.