I tried setting up multiple sub-processes and using PyTorch to train a separate model on a separate dataset within each sub-process. Here is my code (no CUDA/GPU involved yet):
##################################################################################
# This part of the code has nothing to do with the error; it is included for completeness.
import torch
from torch import nn
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import fetch_california_housing

class CADataset(torch.utils.data.Dataset):
    '''
    Prepare the California housing dataset for regression
    '''
    def __init__(self, X, y, scale_data=True):
        if not torch.is_tensor(X) and not torch.is_tensor(y):
            # Apply scaling if necessary
            if scale_data:
                X = StandardScaler().fit_transform(X)
            self.X = torch.from_numpy(X)
            self.y = torch.from_numpy(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        return self.X[i], self.y[i]

class MLP(nn.Module):
    '''
    Multilayer Perceptron for regression.
    '''
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(8, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 1)
        )

    def forward(self, x):
        '''
        Forward pass
        '''
        return self.layers(x)

def mlp_demo(branchID: int):
    # In this toy example every branch loads the same data; in my real application they do not.
    housing = fetch_california_housing()
    print('in branch {}'.format(branchID))
    print(housing.data.shape)
    print(housing.target.shape)

    # Prepare the California housing dataset
    dataset = CADataset(housing.data, housing.target)
    trainloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=4)

    # Initialize the MLP
    mlp = MLP()

    # Define the loss function and optimizer
    loss_function = nn.L1Loss()
    optimizer = torch.optim.Adam(mlp.parameters(), lr=1e-4)

    # Run the training loop
    for epoch in range(0, 5):  # 5 epochs at maximum
        print(f'Starting epoch {epoch + 1}')
        current_loss = 0.0

        # Iterate over the DataLoader for training data
        for i, data in enumerate(trainloader, 0):
            # Get and prepare inputs
            inputs, targets = data
            inputs, targets = inputs.float(), targets.float()
            targets = targets.reshape((targets.shape[0], 1))

            # Zero the gradients
            optimizer.zero_grad()

            # Perform forward pass
            outputs = mlp(inputs)

            # Compute loss
            loss = loss_function(outputs, targets)

            # Perform backward pass
            loss.backward()

            # Perform optimization
            optimizer.step()

            # Print the average loss over the last 20 mini-batches
            current_loss += loss.item()
            if i % 20 == 0:
                print('Loss after mini-batch %5d: %.3f' %
                      (i + 1, current_loss / 20))
                current_loss = 0.0

    # Process is complete.
    print('Training process has finished.')
##################################################################################
# Above code has nothing to do with the error; it is included for completeness.

from torch.multiprocessing import Pool, set_start_method

if __name__ == '__main__':
    # Set a fixed random seed
    torch.manual_seed(42)
    try:
        set_start_method('spawn')
    except RuntimeError:
        pass
    with Pool() as pool:
        pool.map(mlp_demo, range(3))
I learnt to import the set_start_method
function from here, but I still got the following error:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib64/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.9/multiprocessing/pool.py", line 48, in mapstar
return list(map(*args))
File "/home/wangyu/code/test_cuda/demo.py", line 72, in mlp_demo
for i, data in enumerate(trainloader, 0):
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 441, in __iter__
return self._get_iterator()
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 388, in _get_iterator
return _MultiProcessingDataLoaderIter(self)
File "/usr/local/lib64/python3.9/site-packages/torch/utils/data/dataloader.py", line 1042, in __init__
w.start()
File "/usr/lib64/python3.9/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/wangyu/code/test_cuda/demo.py", line 110, in <module>
pool.map(mlp_demo, range(3))
File "/usr/lib64/python3.9/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib64/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
AssertionError: daemonic processes are not allowed to have children
In my real application, I have several datasets, and their trainings are independent. I know I could run multiple instances of python3 my_script.py --dataset=<my_ds>, but since their pre-processing steps are correlated and the training results need to be aggregated, I really want it to be done within one Python script (and one Python instance).
Is there any way that I can fix the daemon error?
You can replace:
from torch.multiprocessing import Pool, set_start_method
with:
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method
to avoid starting your (pool) processes as daemons and to allow them to create sub-processes. multiprocessing.Pool (which torch.multiprocessing re-exports) always marks its workers as daemonic, and, as the assertion in your traceback says, daemonic processes are not allowed to have children; that is exactly what your DataLoader tries to create with num_workers=4. The workers of a ProcessPoolExecutor are not daemonic, so they are free to spawn the loader's worker processes.
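Here is a minimal sketch of the fixed entry point, assuming the rest of your script (mlp_demo and the imports above it) stays unchanged. Collecting the results into a list is just an illustration of how per-branch return values could flow back for the aggregation you mention; your mlp_demo currently returns None:

from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

if __name__ == '__main__':
    torch.manual_seed(42)
    try:
        # The start method can only be set once per process.
        set_start_method('spawn')
    except RuntimeError:
        pass
    with Pool() as pool:
        # ProcessPoolExecutor.map returns a lazy iterator; list() blocks until
        # all three branches have finished and gathers their return values.
        results = list(pool.map(mlp_demo, range(3)))

Exiting the with block shuts the pool down cleanly, and because the workers are non-daemonic, each branch can start its own DataLoader worker processes without hitting the assertion.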