import ray
import numpy as np

ray.init()

@ray.remote
def f():
    return np.zeros(10000000)

results = []
for i in range(100):
    print(i)
    results += ray.get([f.remote() for _ in range(50)])
Normally, when the object store fills up, it begins evicting objects that are not in use (in least-recently-used order). However, all of the objects here are numpy arrays held in the results list, so they are all still in use. The memory backing those numpy arrays lives in the object store itself, so they keep taking up space there. The object store can't evict them until those objects go out of scope.
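To put numbers on this, here is a back-of-the-envelope sketch of how much memory the loop above pins. It relies on the fact that np.zeros produces float64 values (8 bytes each) by default; the constant and helper names are made up for illustration.

```python
# np.zeros defaults to float64, i.e. 8 bytes per element.
BYTES_PER_FLOAT64 = 8
ELEMENTS_PER_ARRAY = 10_000_000   # np.zeros(10000000) in f()
ARRAYS_PER_BATCH = 50             # tasks launched per loop iteration
NUM_BATCHES = 100                 # range(100)


def gb(num_bytes):
    """Convert a byte count to decimal gigabytes."""
    return num_bytes / 1e9


array_bytes = BYTES_PER_FLOAT64 * ELEMENTS_PER_ARRAY
batch_bytes = array_bytes * ARRAYS_PER_BATCH
total_bytes = batch_bytes * NUM_BATCHES

print(f"one array  : {gb(array_bytes):.2f} GB")
print(f"one batch  : {gb(batch_bytes):.2f} GB")
print(f"all batches: {gb(total_bytes):.2f} GB")
```

Each array is about 0.08 GB, so a single iteration already pins about 4 GB and the full loop would need roughly 400 GB, all of it held alive by the results list.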
Question: How can I specify an external object store, such as Redis, so that I don't exceed the memory of a single machine? I don't want to use /dev/shm or /tmp as the object store, because only limited memory is available there and it fills up quickly.
As of Ray 1.2.0, object spilling is supported, which enables out-of-core data processing. For 1.3+ (which will be released in 3 weeks), this feature will be turned on by default.
https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html
But your example won't work with this feature. Let me explain why here.
There are two things you need to know.
1. When you call a remote function, it returns an object reference, not the value itself. E.g.,
    ref = f.remote()
    print(ref)
2. When you call ray.get on this reference, the Python variable accesses the memory directly (in Ray, this is shared memory managed by Ray's distributed object store, called the plasma store, if your object size is >= 100KB). So,
    obj = ray.get(ref)  # Now, obj is pointing to the shared memory directly.
Currently, the object spilling feature supports disk spilling for case 1, but not for case 2 (as you can imagine, 2 is much trickier to support).
So there are 2 solutions here:
1. Start Ray with ray.init(_plasma_directory="/tmp"). This will allow you to use the /tmp folder as the plasma store (meaning Ray objects are stored in the tmp file system). Note that you may see performance degradation when you use this option.
2. Instead of ray.get, use ray.wait.

import ray
import json
import numpy as np

# Note: You don't need to specify this if you use the latest master.
ray.init(
    _system_config={
        "automatic_object_spilling_enabled": True,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
        )
    },
)

@ray.remote
def f():
    return np.zeros(10000000)

result_refs = []
for i in range(100):
    print(i)
    result_refs += [f.remote() for _ in range(50)]

while result_refs:
    # ray.wait returns one ready ref by default; `result` is rebound on each
    # pass, so the previously fetched array can be released.
    [ready], result_refs = ray.wait(result_refs)
    result = ray.get(ready)