python-3.x  python-asyncio  zeromq  pyzmq  python-3.8

How to execute a ZeroMQ PUSH/PULL archetype in an asynchronous way?


I want to open a PULL socket on a port and receive the messages that other sockets push to that port. The PULL socket should listen asynchronously and, whenever a message arrives, simply print it to the console. To feed it, I have written a Push class with a method that sends a message to the PULL port.

My code is as follows:

import random
import zmq
import time
import sys
import string
import asyncio
import zmq.asyncio

class Push:
    """Blocking ZeroMQ PUSH socket that emits random four-letter messages.

    Args:
        port: TCP port of the PULL endpoint to connect to.
        addr: Host name or IP address of the PULL endpoint.
    """

    def __init__(self, port, addr='localhost'):
        self.port = port
        self.addr = addr

        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.PUSH)
        # connect() (not bind()): the PULL side owns the endpoint.
        self.socket.connect(f'tcp://{self.addr}:{self.port}')

    def send(self):
        """Send one random four-letter ASCII message and echo it to stdout."""
        message = ''.join(
            random.choice(string.ascii_letters) for _ in range(4)
        )
        self.socket.send(message.encode('utf-8'))
        print(f'sending: {message}')

class Pull:
    """Asyncio-aware ZeroMQ PULL socket bound to a TCP endpoint.

    Args:
        port: TCP port to bind.
        addr: Interface to bind on; ``'*'`` binds every interface.
    """

    def __init__(self, port, addr='*'):
        self.port = port
        self.addr = addr

        # The context must be the asyncio flavour: a socket from a plain
        # zmq.Context() has a blocking recv() that returns bytes, which
        # stalls the event loop and cannot be awaited.
        self.ctx = zmq.asyncio.Context()
        self.socket = self.ctx.socket(zmq.PULL)
        self.socket.bind(f'tcp://{self.addr}:{self.port}')

    async def listen(self, listener):
        """Receive messages forever, handing each one to *listener*.

        Args:
            listener: Callable invoked with every received message (bytes).
        """
        while True:
            message = await self.socket.recv()
            listener(message)


if __name__ == '__main__':
    async def send(push):
        """Push one random message every five seconds, forever."""
        while True:
            await asyncio.sleep(5)
            print('Sending...')
            push.send()

    async def main():
        """Run the PULL listener and the periodic sender concurrently."""
        # Build the sockets inside the running loop so that any
        # asyncio-aware socket attaches to the loop asyncio.run() created.
        push = Push('55501')
        pull = Pull('55501')
        # asyncio.run() accepts exactly one coroutine; gather() is what
        # schedules several of them side by side.
        await asyncio.gather(pull.listen(print), send(push))

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('exiting...')
        sys.exit()

The above code does not run: execution stops at the listen method.


Solution

  • #ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENT TASKS
    #https://github.com/MrYsLab/pymata-express/
    import asyncio
    import zmq
    import json
    import zmq.asyncio as zmq_asyncio
    from pymata_express.pymata_express import PymataExpress
    
    
    class ConcurrentTasks:
        """Answer ZeroMQ REP requests with sonar readings from a board.

        A REP socket is bound to ``tcp://*:5558``. Every incoming request
        triggers one ``sonar_read`` on the board and the JSON-encoded result
        is sent back as the reply.

        Constructing an instance runs the board's event loop until
        ``async_init_and_run`` finishes; because that coroutine awaits the
        endless ``readsonar`` task, the constructor normally does not return.

        Args:
            board: Board object providing ``get_event_loop``,
                ``set_pin_mode_sonar`` and ``sonar_read``.
        """

        def __init__(self, board):
            self.loop = board.get_event_loop()
            self.board = board

            # NOTE(review): this blocking context is not used anywhere in the
            # class; kept only in case external code reads the attribute.
            self.ctxsync = zmq.Context()
            # asyncio-aware context so recv()/send() on the socket are awaitable.
            self.context = zmq_asyncio.Context()
            self.rep = self.context.socket(zmq.REP)
            self.rep.bind("tcp://*:5558")

            self.trigger_pin = 53
            self.echo_pin = 51

            # Use the loop obtained from the board rather than a module-level
            # global, so the class also works when imported elsewhere.
            self.loop.run_until_complete(self.async_init_and_run())

        async def pingsonar(self):
            """Return the board's current sonar reading for the trigger pin."""
            return await self.board.sonar_read(self.trigger_pin)

        async def readsonar(self):
            """Serve requests forever: one sonar reading per received message."""
            while True:
                # The request payload is ignored; its arrival is the trigger.
                await self.rep.recv()
                # Await the coroutine directly; handing bare coroutines to
                # asyncio.wait() is deprecated and rejected by Python 3.11+.
                reading = await self.pingsonar()
                await self.rep.send(json.dumps(reading).encode())
                # Brief pause kept from the original; the awaits above already
                # yield to the loop, so it is probably not required.
                await asyncio.sleep(1 / 1000)

        async def async_init_and_run(self):
            """Configure the sonar pins, then run the request-serving task."""
            await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

            readsonar = asyncio.create_task(self.readsonar())
            await readsonar

            # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
    
    
    if __name__ == "__main__":
        # Script entry point: create the board, serve requests, and shut the
        # board down cleanly when interrupted.
        loop = asyncio.get_event_loop()
        my_board = PymataExpress()
        try:
            # The constructor itself runs the event loop, so this call only
            # returns when that run finishes or raises.
            ConcurrentTasks(my_board)
        except (KeyboardInterrupt, RuntimeError):
            # NOTE(review): RuntimeError is handled the same way as Ctrl-C;
            # presumably the board raises it on connection problems - confirm.
            loop.run_until_complete(my_board.shutdown())
            print('goodbye')
        finally:
            # Always release the loop, whether or not shutdown ran.
            loop.close()