
Ray object store running out of memory when processing out-of-core data. How can I configure an external object store, such as an S3 bucket?


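import ray
import numpy as np

ray.init()

@ray.remote
def f():
  # Each call returns a float64 array of 10,000,000 elements (~80 MB).
  return np.zeros(10000000)

results = []
for i in range(100):
  print(i)
  # The arrays returned by ray.get stay backed by the object store,
  # and keeping them in `results` keeps them in use.
  results += ray.get([f.remote() for _ in range(50)])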

Normally, when the object store fills up, it begins evicting objects that are not in use, in least-recently-used (LRU) order. However, all of these objects are numpy arrays held in the results list, so they are all still in use. Moreover, the memory those numpy arrays live in is actually in the object store, so they take up space there. The object store can't evict them until they go out of scope.
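The eviction rule described above can be modeled with a toy, stdlib-only store. This is a hypothetical sketch (ToyObjectStore, put, get and release are made-up names, not Ray's API, and sizes are plain integers): objects live in an LRU-ordered dictionary, and only objects with no outstanding references are eligible for eviction, so a store full of referenced objects simply cannot make room.

```python
from collections import OrderedDict


class ToyObjectStore:
    """Hypothetical LRU store that evicts only objects nobody references."""

    def __init__(self, capacity):
        self.capacity = capacity      # total size the store may hold
        self.used = 0
        self.objects = OrderedDict()  # obj_id -> size, least recently used first
        self.refcount = {}            # obj_id -> number of live references

    def put(self, obj_id, size):
        # Walk from least to most recently used, evicting unreferenced objects
        # until the new object fits.
        for victim in list(self.objects):
            if self.used + size <= self.capacity:
                break
            if self.refcount[victim] == 0:
                self.used -= self.objects.pop(victim)
                del self.refcount[victim]
        if self.used + size > self.capacity:
            raise MemoryError(f"cannot fit {obj_id}: all remaining objects are in use")
        self.objects[obj_id] = size
        self.refcount[obj_id] = 1     # the caller holds one reference
        self.used += size

    def get(self, obj_id):
        # Accessing an object makes it the most recently used.
        self.objects.move_to_end(obj_id)
        return self.objects[obj_id]

    def release(self, obj_id):
        # The caller dropped its reference (e.g. the variable went out of scope).
        self.refcount[obj_id] -= 1


store = ToyObjectStore(capacity=100)
store.put("a", 40)
store.put("b", 40)
store.release("a")            # "a" is no longer in use
store.put("c", 40)            # evicts "a" to make room
print(sorted(store.objects))  # ['b', 'c']
try:
    store.put("d", 40)        # "b" and "c" are still in use, so nothing can be evicted
except MemoryError as err:
    print(err)
```

The failing put at the end mirrors the question's script: every array is still referenced from the results list, so eviction has nothing to remove.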

Question: How can I specify an external object store, such as Redis, so that I don't exceed the memory of a single machine? I don't want to use /dev/shm or /tmp as the object store, because only limited memory is available there and it fills up quickly.


Solution

  • As of Ray 1.2.0, object spilling is supported for out-of-core data processing. From Ray 1.3 onward (which will be released in three weeks), this feature will be turned on by default.

    https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html

    However, your example won't work with this feature. Here is why.

    There are two things you need to know.

    1. When you call a Ray task (f.remote()) or ray.put, it returns an object reference. Try:
    ref = f.remote()
    print(ref)
    
    2. When you call ray.get on this reference, the Python variable accesses the memory directly. (In Ray, if the object is 100 KB or larger, this memory is shared memory managed by Ray's distributed object store, called the plasma store.) So:
    obj = ray.get(ref)  # obj now points directly at the shared memory.
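A loose stdlib analogy for case 2 (a plain bytearray and memoryview stand in for Ray's shared memory; this is not how Ray is implemented): once a zero-copy view of a buffer exists, the owner can no longer move or resize that buffer until the view is released. This is roughly why an object that a Python variable obtained through ray.get still points at cannot simply be moved out of memory.

```python
# Owner-side buffer, standing in for an object in the shared-memory store (analogy only).
buf = bytearray(8)

# A zero-copy view, standing in for the array returned by ray.get.
view = memoryview(buf)

try:
    buf.extend(bytes(8))  # try to reallocate the buffer while a view exists
except BufferError as err:
    print("cannot move the buffer:", err)

view[0] = 42              # writes go straight to the underlying bytes, no copy
print(buf[0])             # 42

view.release()            # the "variable" lets go of the memory
buf.extend(bytes(8))      # now the owner is free to reallocate
print(len(buf))           # 16
```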
    

    Currently, object spilling supports spilling to disk in case 1, but not in case 2 (case 2 is much trickier to support, as you can imagine).
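To make the case 1 / case 2 distinction concrete, here is a hypothetical stdlib-only model. SpillingStore, put, get, unpin and spill are invented names, and pickle files in a temporary directory stand in for a spill directory; Ray's real mechanism differs in its details. An object that is referenced only through its ref can be written to disk, dropped from memory, and restored on the next get, while an object currently pinned by a get cannot be spilled.

```python
import os
import pickle
import tempfile


class SpillingStore:
    """Hypothetical store that spills unpinned objects to a directory."""

    def __init__(self, spill_dir):
        self.spill_dir = spill_dir
        self.in_memory = {}   # ref -> value held in "memory"
        self.spilled = {}     # ref -> path of the pickle file on disk
        self.pinned = set()   # refs whose values a caller is using directly
        self._next = 0

    def put(self, value):
        ref = self._next      # case 1: the caller only holds a reference
        self._next += 1
        self.in_memory[ref] = value
        return ref

    def get(self, ref):
        if ref in self.spilled:   # restore a spilled object transparently
            path = self.spilled.pop(ref)
            with open(path, "rb") as f:
                self.in_memory[ref] = pickle.load(f)
            os.remove(path)
        self.pinned.add(ref)      # case 2: the caller now uses the value directly
        return self.in_memory[ref]

    def unpin(self, ref):
        self.pinned.discard(ref)

    def spill(self):
        """Move every unpinned in-memory object to disk; return how many moved."""
        moved = 0
        for ref in [r for r in self.in_memory if r not in self.pinned]:
            path = os.path.join(self.spill_dir, f"obj-{ref}.pkl")
            with open(path, "wb") as f:
                pickle.dump(self.in_memory.pop(ref), f)
            self.spilled[ref] = path
            moved += 1
        return moved


with tempfile.TemporaryDirectory() as d:
    store = SpillingStore(d)
    a = store.put(list(range(5)))
    b = store.put("big object")
    value_b = store.get(b)   # b is now pinned, like obj = ray.get(ref)
    print(store.spill())     # 1 (only a can be spilled)
    print(store.get(a))      # [0, 1, 2, 3, 4], restored from disk
```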

    So there are two solutions here:

    1. Use a file-system directory for your plasma store. For example, start Ray with:
    ray.init(_plasma_directory="/tmp")
    

    This uses the /tmp folder as the plasma store (meaning Ray objects are stored in the /tmp file system). Note that you may see performance degradation with this option.

    2. Use object spilling with backpressure. Instead of fetching all the Ray objects at once with ray.get, use ray.wait.
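    import json

    import numpy as np
    import ray

    # Note: You don't need to specify this if you use the latest master.
    ray.init(
        _system_config={
            "automatic_object_spilling_enabled": True,
            "object_spilling_config": json.dumps(
                {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
            )
        },
    )

    @ray.remote
    def f():
        return np.zeros(10000000)

    # Keep only object references; these can be spilled to disk when memory runs low.
    result_refs = []
    for i in range(100):
        print(i)
        result_refs += [f.remote() for _ in range(50)]

    # Fetch one ready result at a time instead of calling ray.get on the whole list.
    while result_refs:
        [ready], result_refs = ray.wait(result_refs)
        result = ray.get(ready)
        # Process result here; once it is no longer referenced, its memory can be reclaimed.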
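The loop above still submits all 5,000 tasks before fetching anything and relies on spilling to hold their results. A common refinement is to cap the number of results in flight, so that new work is submitted only as earlier results are consumed. The sketch below shows that pattern with the standard library instead of Ray: concurrent.futures.wait(..., return_when=FIRST_COMPLETED) plays the role of ray.wait, and MAX_IN_FLIGHT, make_chunk and process are hypothetical stand-ins for your own limit, f.remote() and per-result processing.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

MAX_IN_FLIGHT = 8   # hypothetical cap on results held at once
NUM_TASKS = 100


def make_chunk(i):
    # Stand-in for f(): produce a sizeable result.
    return [i] * 1000


def process(result):
    # Stand-in for your processing; keep only a small summary of each result.
    return sum(result)


def run_with_backpressure():
    summaries = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = set()
        for i in range(NUM_TASKS):
            if len(pending) >= MAX_IN_FLIGHT:
                # Block until at least one result is ready before submitting more.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                summaries.extend(process(fut.result()) for fut in done)
            pending.add(pool.submit(make_chunk, i))
        # Drain whatever is still running.
        done, _ = wait(pending)
        summaries.extend(process(fut.result()) for fut in done)
    return summaries


summaries = run_with_backpressure()
print(len(summaries))  # 100
```

With Ray, the same shape would call ray.wait on the pending refs whenever their count reaches the cap, so that at most MAX_IN_FLIGHT large results exist at any moment.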