Ray: is it possible to use 'ray.put' on a kwargs argument?


I have a function that accepts kwargs, and I would like to use it to process requests in parallel. Is there a simple way to handle kwargs without iterating over them and 'put'-ting each one into the shared object store?

Something like this:

import ray
ray.init(ignore_reinit_error=True)

def get_data_multip(code=['AB', 'CD', 'ED'], **kwargs):
    kwargs_ref = ray.put(**kwargs)  # fails: ray.put stores a single object
    results = []
    for s in code:
        func_ref = ray.remote(get_data)
        res = func_ref.remote(code = s,**kwargs)
        results.append(res)
    
    rets = ray.get(results)
    print(rets)

def get_data(code, st_dt = None, end_dt = None):
    print(f'st_dt:{st_dt}, end_dt:{end_dt}')
    return [1,2,3,4]

get_data_multip(code = ['AB', 'CD', 'ED'], st_dt = '01/01/2020', end_dt = '12/31/2020')

The above fails on the 'put' line with the error:

put() got an unexpected keyword argument 'st_dt'
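The error comes from Python rather than from Ray: ray.put takes the object to store as its single positional argument, so ray.put(**kwargs) is expanded into ray.put(st_dt=..., end_dt=...) before Ray ever sees it. The mechanism can be reproduced without Ray using a hypothetical stand-in, put_stub, which (like ray.put) accepts exactly one value:

```python
def put_stub(value):
    """Hypothetical stand-in for ray.put: accepts exactly one object."""
    return value


kwargs = {"st_dt": "01/01/2020", "end_dt": "12/31/2020"}

# Passing the dict itself works: the whole dict is one object.
stored = put_stub(kwargs)

# Unpacking it with ** turns every key into a keyword argument,
# i.e. put_stub(st_dt=..., end_dt=...), which the signature rejects.
error_message = None
try:
    put_stub(**kwargs)
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

So ray.put(kwargs), without the stars, is a valid call; the remaining question is how the task receives the stored dict.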

UPDATE

I thought I had a solution here, but this simply passes the dictionary as the second positional parameter (st_dt), not as keyword arguments.

def get_data_multip(code=['AB', 'CD', 'ED'], **kwargs):
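    # Store the whole kwargs dict as one object (vars()['kwargs'] is just kwargs)
    kwargs_ref = ray.put(kwargs)

    results = []
    for s in code:
        func = ray.remote(get_data)
        # kwargs_ref is passed positionally, so get_data receives the dict as st_dt
        res = func.remote(s, kwargs_ref)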
        results.append(res)
    
    rets = ray.get(results)
    print(rets)

Solution

  • kwargs is a dictionary, so instead of passing all of it to put, you can put each of its values separately and build a new dictionary that maps the same keys to the resulting references. When you then call func_ref.remote(code=s, **kwargs_ref), Python unpacks an ordinary dictionary into keyword arguments, and Ray resolves each reference to its stored value before get_data runs, so st_dt and end_dt arrive as plain strings.

    import ray
    
    ray.init(ignore_reinit_error=True)
    
    
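    def get_data_multip(code=('AB', 'CD', 'ED'), **kwargs):
        # Put each keyword value in the object store separately, keeping the keys.
        kwargs_ref = {key: ray.put(value) for key, value in kwargs.items()}
        # Wrap get_data as a remote function once, not on every iteration.
        func_ref = ray.remote(get_data)
        results = []
        for s in code:
            # **kwargs_ref passes each ObjectRef as a keyword argument; Ray
            # resolves top-level ObjectRef arguments before get_data runs.
            res = func_ref.remote(code=s, **kwargs_ref)
            results.append(res)
    
        rets = ray.get(results)
        print(rets)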
    
    
    def get_data(code, st_dt=None, end_dt=None):
        print(f'st_dt:{st_dt}, end_dt:{end_dt}')
        return [1, 2, 3, 4]
    
    
    get_data_multip(code=['AB', 'CD', 'ED'], st_dt='01/01/2020', end_dt='12/31/2020')