I am trying to create an OrderBook agent that can be shared across all ranks. I can share the agent using mpi4py send and receive operations. But based on the documentation here I am assuming self.context.synchronize(restore_orderbook)
will create ghost agents in each rank but the program is showing KeyError: 0
. The agent created in rank 0 is not being copied to other ranks as ghost even after using request_agents
function.
Expected output:
Orderbook is created on rank 0 and ghost copies are made on rank 1,2,3 using the OrderBook created in rank 0. Print orders as per the if else logic present in the step
function.
test2.py
from typing import Dict, Tuple
from mpi4py import MPI
import numpy as np
from dataclasses import dataclass
from repast4py import core, random, space, schedule, logging, parameters
from repast4py import context as ctx
import repast4py
from queue import Queue
import pandas as pd
from datetime import datetime, timedelta
import random as rnd
order_count = 1000
items = np.arange(1, 101)
quantities = np.arange(1, 11)
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 1, 3)
item_values = np.random.choice(items, size=order_count)
qty_values = np.random.choice(quantities, size=order_count)
time_deltas = np.random.randint(0, int((end_date - start_date).total_seconds()), size=order_count)
date_values = [start_date + timedelta(seconds=int(delta)) for delta in time_deltas]
data = {
'item': item_values,
'qty': qty_values,
'order_datetime': date_values
}
df = pd.DataFrame(data).sort_values(by=['order_datetime'], ascending=True).reset_index(drop=True)
df['order_id'] = np.arange(1, order_count+1)
df['order_datetime'] = pd.to_datetime(df['order_datetime'])
df['tick'] = df.apply(lambda x: 1 + int((x['order_datetime'] - df['order_datetime'].min()).total_seconds()/173), axis=1)
df.to_csv('test_data.csv', index=False)
class OrderBook(core.Agent):
TYPE = 0
def __init__(self, a_id: int, rank: int):
super().__init__(id=a_id, type=OrderBook.TYPE, rank=rank)
self.df = list(pd.read_csv('test_data.csv').to_records())
# self.df['order_datetime'] = pd.to_datetime(self.df['order_datetime'])
def save(self) -> Tuple:
return (self.uid, self.df)
def get_order(self, tick):
return rnd.choice(self.df)
def update(self, data):
self.df = data
orderbook_cache = {}
def restore_orderbook(orderbook_data: Tuple):
uid = orderbook_data[0]
if uid[1] == OrderBook.TYPE:
if uid in orderbook_cache:
ob = orderbook_cache[uid]
else:
ob = OrderBook(uid[0], uid[1], uid[2])
orderbook_cache[uid] = ob
ob.df = orderbook_data[1]
return ob
def create_agent(agent_data):
uid = agent_data[0]
book = OrderBook(uid[0], uid[2])
book.df = agent_data[1]
return book
class Model:
def __init__(self, comm: MPI.Intracomm, params: Dict):
self.context = ctx.SharedContext(comm)
self.rank = comm.Get_rank()
if self.rank == 0:
book = OrderBook(1, 0)
self.context.add(book)
requests = []
print(book.uid)
else:
requests = [((1,0,0), 0)]
self.context.request_agents(requests, create_agent)
self.runner = schedule.init_schedule_runner(comm)
self.runner.schedule_repeating_event(1, 1, self.step)
self.runner.schedule_stop(params['stop.at'])
self.runner.schedule_end_event(self.at_end)
def step(self):
tick = self.runner.schedule.tick
self.context.synchronize(restore_orderbook)
if self.rank == 0 and tick < 25:
for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)
elif self.rank == 1 and 25 <= tick < 50:
for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)
elif self.rank == 2 and 50 <= tick < 75:
for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)
elif self.rank == 3 and 75 <= tick < 100:
for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)
def at_end(self):
print('simulation complete')
def start(self):
self.runner.execute()
def run(params: Dict):
global model
model = Model(MPI.COMM_WORLD, params)
model.start()
if __name__ == "__main__":
parser = parameters.create_args_parser()
args = parser.parse_args()
params = parameters.init_params(args.parameters_file, args.parameters)
run(params)
test.yaml
random.seed: 42
stop.at: 100
orders.count: 1000
command to run
mpirun -n 4 python test2.py test.yaml
I think the ghost agents are not getting created with self.context.request_agents(requests, create_agent)
. Is this usage right?
Error:
Traceback (most recent call last):
File "test2.py", line 136, in <module>
run(params)
File "test2.py", line 129, in run
model.start()
File "test2.py", line 124, in start
self.runner.execute()
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 584, in execute
self.schedule.execute()
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 406, in execute
self.executing_group.execute(self.queue)
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 243, in execute
interrupted = self.execute_evts(self.prioritized, queue)
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 223, in execute_evts
evt()
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 60, in __call__
self.evt()
File "test2.py", line 108, in step
for b in self.context.agents(OrderBook.TYPE):
File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/context.py", line 358, in agents
return self._agents_by_type[agent_type].values().__iter__()
KeyError: 0
Edit: Working solution
I replaced
for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)
with
b = self.context.ghost_agent((1,0,0))
order = b.get_order(tick)
print(self.rank, order)
The problem here is that all the ranks must call request_agents
. In your code rank 0 is not calling it and so the code hangs waiting for rank 0 to complete the call. In MPI, this is called a collective operation. This is also mentioned in the docs for request_agents
"This is a collective operation and all ranks must call it, regardless of whether agents are being requested by that rank. The requested agents will be automatically added as ghosts to this rank"
This is easy to overlook though. When there's a hang in an MPI program it's usually something like this.
I tried the code with this update,
if self.rank == 0:
book = OrderBook(1, 0)
self.context.add(book)
requests = []
# print(book.uid)
else:
requests = [((1,0,0), 0)]
self.context.request_agents(requests, create_agent)
and the request_agents
call works. However, there's an issue in the create_agent call. The OrderBook constructor only takes 3 arguments but create_agent has 4.
Lastly, I'm not sure how well the pandas data frame will pickle and transfer across processes. That might work, but if you do get errors maybe try to pass the data frame as a list of lists or something like that.