I am trying to perform I/O operations across multiple directories using Ray, Modin (with the Ray backend) and Python. The file writes stall: memory and disk usage stop changing entirely and the program blocks.
I have a Ray actor set up like this:
import os
os.environ["MODIN_ENGINE"] = "ray" # Modin will use Ray
import ray
import modin.pandas as mpd
from numpy.core import numeric
from tqdm import tqdm
@ray.remote
class DatasetHelper:
    """Ray actor that reads CSV data and a pickled data map through Modin.

    The actor stores the directories it was configured with and exposes its
    dataset and that configuration to remote callers.
    """

    # Class Variables (static) are to be written here

    def __init__(self, data_dir, data_map_dir, nRows, out_dir):
        """Store the configuration and create empty containers.

        Args:
            data_dir: Directory whose files are read by ``_extract_data``.
            data_map_dir: Path handed to ``mpd.read_pickle`` by
                ``_extract_data_maps``.
            nRows: Maximum number of rows read from each CSV file.
            out_dir: Output directory reported by ``get_config``.
        """
        self.data = {}
        self.data_map = {}
        self.dataset = mpd.DataFrame()
        self.timestamp = []
        self.first = True
        self.out_dir = out_dir
        self.data_dir = data_dir
        self.data_map_dir = data_map_dir
        self.nRows = nRows

    @ray.method(num_returns=1)
    def get_dataset(self):
        """Return the Modin DataFrame held in ``self.dataset``."""
        return self.dataset

    @ray.method(num_returns=1)
    def generate_dataset(self):
        """Return a status dictionary containing ``status`` and ``data_dir``."""
        # generates some dataset and returns a dictionary.
        return {'status': 1,
                'data_dir': self.data_dir}

    @ray.method(num_returns=1)
    def get_config(self):
        """Return the configured directories as a plain dictionary."""
        return {
            "data_dir": self.data_dir,
            "data_map_dir": self.data_map_dir,
            "out_dir": self.out_dir
        }

    def _validate_initialization(self):
        """Return ``False`` when any configured value matches its sentinel."""
        # Logic here isnt relevant
        return not (
            self.data_dir == ""
            or self.data_map == ""
            or self.nRows == 42
        )

    def _extract_data(self):
        """Read every file in ``data_dir`` and check its time column.

        Each file is loaded with ``mpd.read_csv`` (no header, at most
        ``nRows`` rows) and stored in ``self.data`` under the part of the
        file name before the first dot. Column 0 of each frame is then parsed
        as datetimes and checked for monotonicity and uniqueness. A message
        is printed for every file that fails; nothing is raised.
        """
        print('Reading data ...')
        for each in os.listdir(self.data_dir):
            self.data[each.split('.')[0]] = mpd.read_csv(
                os.path.join(self.data_dir, each),
                header=None,
                nrows=self.nRows,
            )
        print('Data read successfully ...')
        print('Validating times for monotonicity and uniqueness ... ')
        for each in tqdm(self.data):
            # Parse the time column once and reuse it for both checks.
            times = mpd.to_datetime(self.data[each][0])
            # ``is_monotonic`` is deprecated in pandas 1.5 and removed in 2.0;
            # ``is_monotonic_increasing`` is the equivalent property.
            if not (times.is_monotonic_increasing and times.is_unique):
                print('Validation failed for uuid: {}'.format(each))

    def _extract_data_maps(self):
        """Load the pickled data map from ``data_map_dir`` into ``self.data_map``."""
        # NOTE: unpickling executes arbitrary code; only load trusted files.
        self.data_map = mpd.read_pickle(self.data_map_dir)
        print('Data-Map unpickled successfully ...')
The main logic is structured as shown below:
from functools import cached_property
import os
import threading
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from DatasetHelper import DatasetHelper
import gc
import json
import logging
from multiprocessing import Process
import asyncio
import ray
# Start Ray with custom scratch and object-spilling directories.
ray.init(
    num_cpus=40,
    num_gpus=1,
    dashboard_port=8265,
    logging_level=logging.DEBUG,
    ignore_reinit_error=True,
    # Custom directory for Ray's temporary files.
    _temp_dir=os.path.join("/project/bhavaraj/Anaheim/ray_tmp"),
    # Custom directory for object spilling.
    _system_config={
        "object_spilling_config": json.dumps({
            "type": "filesystem",
            "params": {
                "directory_path": "/project/bhavaraj/Anaheim/ray_plasma",
            },
        }),
    },
    # Optional limits, currently disabled:
    # Limiting the object memory store used by ray.put()
    # object_store_memory=20000000000,
    # Limiting the memory usage of each worker.
    # _memory = (1024.0 * 3) * 0.5,
)

# Lock held by cache_dataset() while a dataset is written to disk.
write_lock = threading.Lock()
def cache_dataset(loc):
    """Write the dataset held by a ``DatasetHelper`` actor to a CSV file.

    The actor's configuration is fetched with ``get_config``. The output
    directory is the configured ``out_dir``, or the current working directory
    when ``out_dir`` is ``None``; it is created if missing. The file is named
    ``<HH:MM:SS>_<id>_Cache.csv`` and written while ``write_lock`` is held.

    Args:
        loc: Actor handle exposing ``get_config`` and ``get_dataset``.
    """
    from datetime import datetime

    params = ray.get(loc.get_config.remote())
    out_dir = os.getcwd() if params['out_dir'] is None else params['out_dir']
    # makedirs(exist_ok=True) also creates missing parents and does not fail
    # if another writer creates the directory between a check and the call.
    os.makedirs(out_dir, exist_ok=True)

    dataset_name = datetime.now().strftime("%H:%M:%S") + \
        "_{}_Cache.csv".format(id(out_dir))
    print("Writing to file in {}".format(out_dir))
    print("Acquiring Lock")
    with write_lock:
        print("Lock acquired ...")
        dataset = ray.get(loc.get_dataset.remote())
        dataset.to_csv(os.path.join(out_dir, dataset_name))
    print("Writing to file finished at {}".format(out_dir))
# Input locations per data source: a directory of CSV files and a pickled data map.
R_DATA_DIR: str = '/data/intermediate/R/'
R_DATA_MAP: str = '/data/external/DataMap/R.pkl'
G_DATA_DIR: str = '/data/intermediate/G/'
# Absolute like every other path here; without the leading slash it would
# resolve against the current working directory.
G_DATA_MAP: str = '/data/external/DataMap/G.pkl'
B_DATA_DIR: str = '/data/intermediate/B/'
B_DATA_MAP: str = '/data/external/DataMap/B.pkl'
C_DATA_DIR: str = '/data/intermediate/C/'
C_DATA_MAP: str = '/data/external/DataMap/C.pkl'
Z_DATA_DIR: str = '/data/intermediate/Z/'
Z_DATA_MAP: str = '/data/external/DataMap/Z.pkl'

# Maximum number of rows read from each CSV file.
n = 50000

# One actor per data source, each with its own output directory.
b = DatasetHelper.remote(B_DATA_DIR, B_DATA_MAP, n, "./CB")
r = DatasetHelper.remote(R_DATA_DIR, R_DATA_MAP, n, "./LR")
c = DatasetHelper.remote(C_DATA_DIR, C_DATA_MAP, n, "./CC")
g = DatasetHelper.remote(G_DATA_DIR, G_DATA_MAP, n, "./AG")

# Submit generation on each actor exactly once, then wait for all results.
objs_refs = [
    b.generate_dataset.remote(),
    r.generate_dataset.remote(),
    c.generate_dataset.remote(),
    g.generate_dataset.remote(),
]
generate_outs = ray.get(objs_refs)

print("Printing dataset generation results...")
for each in generate_outs:
    print(each)

# I also tried placing these methods inside the actor but the same issue persists
cache_dataset(b)
cache_dataset(r)
cache_dataset(c)
cache_dataset(g)
I also tried decorating the cache_dataset() function with @ray.remote and calling it as shown below:
# Submit one caching task per actor handle, then block until all have finished.
locs = [b, r, c, g]
cache_refs = [cache_dataset.remote(each) for each in locs]
ray.get(cache_refs)
There are no errors from the file writes, but the program pauses execution.
2021-09-20 08:32:53,024 DEBUG node.py:890 -- Process STDOUT and STDERR is being redirected to /project/bhavaraj/Anaheim/ray_tmp/session_2021-09-20_08-32-53_008570_36561/logs.
2021-09-20 08:32:53,172 DEBUG services.py:652 -- Waiting for redis server at 127.0.0.1:6379 to respond...
2021-09-20 08:32:53,334 DEBUG services.py:652 -- Waiting for redis server at 127.0.0.1:44291 to respond...
2021-09-20 08:32:53,340 DEBUG services.py:1043 -- Starting Redis shard with 10.0 GB max memory.
2021-09-20 08:33:01,212 INFO services.py:1263 -- View the Ray dashboard at http://127.0.0.1:8265
2021-09-20 08:33:01,216 DEBUG node.py:911 -- Process STDOUT and STDERR is being redirected to /project/bhavaraj/Anaheim/ray_tmp/session_2021-09-20_08-32-53_008570_36561/logs.
2021-09-20 08:33:01,221 DEBUG services.py:1788 -- Determine to start the Plasma object store with 76.48 GB memory using /dev/shm.
2021-09-20 08:33:01,314 DEBUG services.py:652 -- Waiting for redis server at 10.2.1.35:6379 to respond...
(pid=36906) Dataset shape: (100340, 41)
(pid=36913) Dataset shape: (150692, 40)
(pid=36902) Dataset shape: (103949, 41)
(pid=36910) Dataset shape: (420269, 41)
Printing dataset generation results... # prints the results correctly
Writing to file in ./CB
Acquiring Lock
Lock acquired ...
Writing to file finished at ./CB
Writing to file in ./LR
Acquiring Lock
Lock acquired ...
2021-09-20 08:43:02,612 DEBUG (unknown file):0 -- gc.collect() freed 115 refs in 0.23721289704553783 seconds
For any future readers:
modin.DataFrame.to_csv()
hangs for reasons I could not determine, but modin.DataFrame.to_pickle()
does not hang with the same logic.
There is also a significant improvement in read/write times when the data is stored as .pkl
files.