
How do I write a method that checks whether an agent is done with a task in osBrain?


I have a question about how to write a proper function that checks whether an agent is done with its task in osBrain. I have three agents: the Transport agent, the Node agent and the Coordinator agent. The Coordinator agent's main task is to synchronize the actions of the other agents. The Coordinator agent binds a SYNC_PUB socket, and the Node and Transport agents subscribe (SUB) to it. My initial implementation hangs after the first time step/iteration. Am I implementing the status_checker method wrongly?

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'


class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        # time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        count = 0
        while len(self.status_list) < 2:
            count += 1
            time.sleep(1)
            return
        self.status_list.clear()


    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})


if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(1)

It prints the following and then hangs:

(NetworkAgent): {'time_step': 0, 'iteration': 1}

(RMOAgent): {'time_step': 0, 'iteration': 1}


Solution

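  • Yeah, your status_checker() method is broken: whenever fewer than 2 replies have arrived, the return inside the while loop exits the method after a single one-second sleep, without waiting for the remaining replies and without clearing status_list. I am guessing you want that method to block until 2 messages are in status_list (one from the node agent and another one from the transport agent).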

    So you may be looking for something like:

    def status_checker(self):
        # Block until both the node and the transport agent have reported back
        while len(self.status_list) < 2:
            time.sleep(1)
        # Reset the list for the next time step
        self.status_list.clear()
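The blocking wait itself works as long as something else keeps filling the list while it polls. Below is a small, library-free model of that (an assumption for illustration, not osBrain code): plain threads play the node and transport agents, a module-level list plays status_list, and report() and the poll_interval parameter are hypothetical. The lock is needed because the list is shared between threads.

```python
import threading
import time

# Toy model, not osBrain: threads stand in for the node and transport agents,
# and this shared list stands in for the coordinator's status_list.
status_list = []
lock = threading.Lock()


def report(name, delay):
    """Simulate an agent finishing its work and reporting back."""
    time.sleep(delay)
    with lock:
        status_list.append(name)


def status_checker(expected=2, poll_interval=0.05):
    """Block until `expected` reports have arrived, then reset the list."""
    while True:
        with lock:
            if len(status_list) >= expected:
                received = list(status_list)
                status_list.clear()
                return received
        time.sleep(poll_interval)


if __name__ == '__main__':
    workers = [threading.Thread(target=report, args=(name, delay))
               for name, delay in (('transport', 0.1), ('node', 0.2))]
    for worker in workers:
        worker.start()
    print(status_checker())  # returns once both reports are in (after ~0.2 s)
    for worker in workers:
        worker.join()
```

The key point is that the reports are appended by other threads; the waiting code never has to handle them itself.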
    

    However, when you call that method from the proxy:

    synchronizer_coordinator_agent.status_checker()
    

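    The coordinator is blocked executing that call, so it will not process any other incoming messages, including the status replies it is waiting for: status_list never grows and the loop never ends. A quick and dirty workaround is to use an unsafe call, which runs concurrently with the agent's main loop (hence "unsafe": it is not thread-safe):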

    synchronizer_coordinator_agent.unsafe.status_checker()
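To see why the regular (safe) call hangs, here is a simplified, library-free model of the situation. It is an assumption about the general pattern, not osBrain's actual internals: one loop takes messages from a single inbox in arrival order, so a call that busy-waits for replies holds up the very replies it is waiting for. The class name, the 'call'/'reply' message kinds and the max_polls cap are all hypothetical; the cap only exists so the demo terminates instead of hanging.

```python
from collections import deque


class SingleLoopAgent:
    """Toy model of an agent whose one main loop handles every message in order.

    Not osBrain code: 'call' messages stand in for proxy method calls and
    'reply' messages for the status requests sent by the other agents.
    """

    def __init__(self):
        self.inbox = deque()
        self.status_list = []

    def status_handler(self, message):
        self.status_list.append(message)

    def status_checker(self, max_polls=1000):
        # Same busy-wait as the blocking version, but capped so the demo ends.
        for _ in range(max_polls):
            if len(self.status_list) >= 2:
                return True
        return False  # the replies are still queued behind this very call

    def run(self):
        """Process queued messages one at a time; return the results of calls."""
        results = []
        while self.inbox:
            kind, payload = self.inbox.popleft()
            if kind == 'call':
                results.append(getattr(self, payload)())
            else:
                self.status_handler(payload)
        return results


agent = SingleLoopAgent()
agent.inbox.extend([
    ('call', 'status_checker'),    # the proxy call arrives first...
    ('reply', 'transport done'),   # ...so these wait until it returns
    ('reply', 'node done'),
])
print(agent.run())          # [False]: the check never saw a reply
print(agent.status_list)    # ['transport done', 'node done'], handled afterwards
```

If the replies were already in the inbox before the call, the same check would succeed; the problem is purely the ordering inside one loop.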
    

    The main issue I see here is the way you are driving that status checker from __main__. You should move the synchronization steps into the coordinator itself, so that your main could look like this:

    if __name__ == "__main__":
    
        ns = run_nameserver()
    
        coordinator = run_agent(...)
        coordinator.init_environment()
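        coordinator.start_iterations()  # new coordinator method: publish the first step and return
    
        while not coordinator.finished():  # new coordinator method: True once all steps are done
            time.sleep(0.5)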
    
        ns.shutdown()
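A minimal, library-free sketch of what moving the steps into the coordinator could look like. start_iterations() and finished() are the names used in the main above; _next_step(), publish(), simulate() and the reply-counting scheme are hypothetical. In a real osBrain agent, publish() would presumably be the self.send(SYNCHRONIZER_CHANNEL_1, ...) call from first_synchronization() and status_handler() the handler bound to the SYNC_PUB alias; simulate() only stands in for the Node and Transport agents so the sketch runs on its own.

```python
from itertools import product


class Coordinator:
    """Library-free model of a coordinator that drives its own time steps."""

    def __init__(self, n_agents, iterations, time_steps):
        self.n_agents = n_agents
        # Same order as the nested loops in the question: iteration outer, time_step inner.
        self.schedule = iter(product(iterations, time_steps))
        self.pending = 0
        self.done = False
        self.published = []  # messages that would go out on the channel

    def start_iterations(self):
        self._next_step()

    def _next_step(self):
        step = next(self.schedule, None)
        if step is None:
            self.done = True
            return
        iteration, time_step = step
        self.pending = self.n_agents
        self.publish({'time_step': time_step, 'iteration': iteration})

    def publish(self, message):
        self.published.append(message)

    def status_handler(self, message):
        # Count replies instead of busy-waiting; advance once all have arrived.
        self.pending -= 1
        if self.pending == 0:
            self._next_step()

    def finished(self):
        return self.done


def simulate(coordinator, agent_names):
    """Play the subscribers: every agent answers each broadcast with 'is_done'."""
    coordinator.start_iterations()
    answered = 0
    while answered < len(coordinator.published):
        for name in agent_names:
            coordinator.status_handler((name, 'is_done'))
        answered += 1
    return coordinator.finished()


coordinator = Coordinator(n_agents=2, iterations=range(1, 2), time_steps=range(0, 90, 30))
print(simulate(coordinator, ['transport', 'node']))  # True
print(coordinator.published)
```

Because the coordinator never blocks, its loop stays free to handle the replies, and the main script only has to poll finished().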
    

    Not really related to this, but note that range(1, 2) results in just one iteration (iteration 1), which may be intentional. If you want 2 iterations you can use range(2) (or range(1, 3) to keep counting from 1).
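A quick check of what each range produces (plain Python; range() stops before its end value):

```python
print(list(range(1, 2)))        # [1]          -> a single iteration
print(list(range(2)))           # [0, 1]       -> two iterations, counting from 0
print(list(range(1, 3)))        # [1, 2]       -> two iterations, counting from 1
print(list(range(0, 90, 30)))   # [0, 30, 60]  -> the three time steps in the question
```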