I want to parallelize the for-loop iteration below using OpenMP threads or a similar technique in Python. idxes holds 1024 indices; each iteration just picks an index i, does an array access at self._storage[i], and unpacks the result stored in data.
Is there an OpenMP-style technique in Python that I can use to speed up this operation?
Code:
def _encode_sample(self, idxes):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
I want the for loop to be split into chunks and executed in parallel by multiple workers, roughly like this:
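(A minimal sketch of what I mean, using a concurrent.futures thread pool as the closest standard-library analogue to OpenMP threads. The helper name _encode_chunk is made up, and because of Python's GIL plain threads may not actually speed up this loop, which is why the Ray version below uses separate worker processes.)

from concurrent.futures import ThreadPoolExecutor
import numpy as np

def _encode_chunk(storage, chunk):
    # Same body as the original loop, restricted to one chunk of indices.
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in chunk:
        obs_t, action, reward, obs_tp1, done = storage[i]
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return obses_t, actions, rewards, obses_tp1, dones

def _encode_sample_parallel(storage, idxes, n_workers=4):
    # Split the indices into n_workers chunks and encode them concurrently.
    chunks = np.array_split(list(idxes), n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        outputs = list(pool.map(lambda c: _encode_chunk(storage, c), chunks))
    # Stitch the per-chunk results back together in order.
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for a, b, c, d, e in outputs:
        obses_t.extend(a)
        actions.extend(b)
        rewards.extend(c)
        obses_tp1.extend(d)
        dones.extend(e)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)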
Code after using Ray (answer from @cade):
import numpy as np
import random
import ray
import psutil

num_cpus = psutil.cpu_count(logical=False)
# Let Ray use all physical cores so the remote tasks below can actually run in parallel.
ray.init(num_cpus=num_cpus)
class ReplayBuffer(object):
    def __init__(self, size):
        """Create Prioritized Replay buffer.

        Parameters
        ----------
        size: int
            Max number of transitions to store in the buffer. When the buffer
            overflows the old memories are dropped.
        """
        self._storage = []
        self._maxsize = int(size)
        self._next_idx = 0

    def __len__(self):
        return len(self._storage)

    def clear(self):
        self._storage = []
        self._next_idx = 0

    def add(self, obs_t, action, reward, obs_tp1, done):
        data = (obs_t, action, reward, obs_tp1, done)
        if self._next_idx >= len(self._storage):
            self._storage.append(data)
        else:
            self._storage[self._next_idx] = data
        self._next_idx = (self._next_idx + 1) % self._maxsize
    def _encode_sample(self, idxes):
        n = 256
        # Split the indices into chunks of n using a list comprehension;
        # each chunk is encoded in its own Ray task.
        split_idxes = [idxes[i * n:(i + 1) * n] for i in range((len(idxes) + n - 1) // n)]
        futures = []
        for subrange in split_idxes:
            futures.append(_encode_sample_helper.remote(self._storage, subrange))
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        outputs = ray.get(futures)
        for a, b, c, d, e in outputs:
            obses_t.extend(a)
            actions.extend(b)
            rewards.extend(c)
            obses_tp1.extend(d)
            dones.extend(e)
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
    def make_index(self, batch_size):
        return [random.randint(0, len(self._storage) - 1) for _ in range(batch_size)]

    def make_latest_index(self, batch_size):
        idx = [(self._next_idx - 1 - i) % self._maxsize for i in range(batch_size)]
        np.random.shuffle(idx)
        return idx

    def sample_index(self, idxes):
        return self._encode_sample(idxes)

    def sample(self, batch_size):
        """Sample a batch of experiences.

        Parameters
        ----------
        batch_size: int
            How many transitions to sample.

        Returns
        -------
        obs_batch: np.array
            batch of observations
        act_batch: np.array
            batch of actions executed given obs_batch
        rew_batch: np.array
            rewards received as results of executing act_batch
        next_obs_batch: np.array
            next set of observations seen after executing act_batch
        done_mask: np.array
            done_mask[i] = 1 if executing act_batch[i] resulted in
            the end of an episode and 0 otherwise.
        """
        if batch_size > 0:
            idxes = self.make_index(batch_size)
        else:
            idxes = range(0, len(self._storage))
        return self._encode_sample(idxes)

    def collect(self):
        return self.sample(-1)
@ray.remote(num_cpus=1)
def _encode_sample_helper(_storage, subrange):
    # Runs in a separate Ray worker process; encodes one chunk of indices.
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in subrange:
        data = _storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
I am answering with Ray, since you allowed for "OpenMP threads or similar".
Ray makes parallelizing Python really easy. You could do what you want with the following (assuming you have four cores on your machine):
import numpy as np
import ray

ray.init()  # call this once, when your program starts

def _encode_sample(self, idxes):
    # Split the indices into four chunks and encode each chunk in its own Ray task.
    split_idxes = np.array_split(idxes, 4)
    futures = []
    for subrange in split_idxes:
        futures.append(_encode_sample_helper.remote(self._storage, subrange))
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    outputs = ray.get(futures)
    for a, b, c, d, e in outputs:
        obses_t.extend(a)
        actions.extend(b)
        rewards.extend(c)
        obses_tp1.extend(d)
        dones.extend(e)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
@ray.remote(num_cpus=1)
def _encode_sample_helper(storage, subrange):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in subrange:
        data = storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
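For completeness, here is roughly how the snippet above would be used (a hypothetical driver; it assumes _encode_sample is a method of a ReplayBuffer-like class with a _storage list, as in your question). One thing to keep in mind: passing self._storage into every .remote() call serializes the buffer each time, so for a large buffer it can be worth calling ray.put(self._storage) once and passing the resulting object ref to the tasks instead.

import ray

ray.init(num_cpus=4)              # start Ray once, at the beginning of the program

buffer = ReplayBuffer(int(1e6))   # hypothetical instance of the class from the question
# ... fill the buffer with buffer.add(obs_t, action, reward, obs_tp1, done) ...

idxes = buffer.make_index(1024)   # 1024 random indices
obses_t, actions, rewards, obses_tp1, dones = buffer._encode_sample(idxes)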