As of Ray 1.2.0, object spilling is supported, which enables out-of-core data processing. From 1.3+ (which will be released in 3 weeks), this feature will be turned on by default.
https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html
But your example won't work with this feature. Let me explain why here.
There are two things you need to know.
- When you call a Ray task (f.remote) or ray.put, it returns an object reference. Try:
ref = f.remote()
print(ref)
- When you run ray.get on this reference, the Python variable accesses the memory directly (in Ray, this is shared memory managed by Ray's distributed object store, called the plasma store, if your object size is >= 100KB). So,
obj = ray.get(ref) # Now, obj is pointing to the shared memory directly.
Currently, the object spilling feature supports disk spilling for case 1, but not for case 2 (case 2 is much trickier to support, as you can imagine).
So there are two solutions here:
- Use a file directory for your plasma store. For example, start ray with
ray.init(_plasma_directory="/tmp")
This will allow you to use the /tmp folder as the plasma store (meaning Ray objects are stored in the tmp file system). Note that you may see performance degradation when you use this option.
- Use object spilling with backpressure. Instead of getting all of the Ray objects at once using ray.get, use ray.wait.
import json

import numpy as np
import ray

# Note: You don't need to specify this if you use the latest master.
ray.init(
    _system_config={
        "automatic_object_spilling_enabled": True,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
        ),
    },
)


@ray.remote
def f():
    # Each result is about 80 MB (10M float64 values).
    return np.zeros(10000000)


result_refs = []
for i in range(100):
    print(i)
    result_refs += [f.remote() for _ in range(50)]

# Fetch one finished result at a time instead of all of them at once.
while result_refs:
    [ready], result_refs = ray.wait(result_refs)
    result = ray.get(ready)
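The same backpressure idea can also be applied while submitting, so that only a bounded number of results are ever outstanding. The sketch below is not Ray code: it uses the standard library's concurrent.futures as a stand-in, with pool.submit playing the role of f.remote() and wait(..., return_when=FIRST_COMPLETED) playing the role of ray.wait. The names produce, run_with_backpressure and MAX_IN_FLIGHT are hypothetical and only serve the illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

MAX_IN_FLIGHT = 4  # hypothetical cap on submitted-but-unconsumed results


def produce(i):
    """Stand-in for a remote task that returns a large result."""
    return [i] * 1000


def run_with_backpressure(num_tasks, max_in_flight=MAX_IN_FLIGHT):
    """Run num_tasks tasks while holding at most max_in_flight futures."""
    pending = set()
    peak = 0   # largest number of futures held at any time
    total = 0  # total number of elements consumed from all results
    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(num_tasks):
            if len(pending) >= max_in_flight:
                # Block until at least one task finishes, then consume it
                # so its result can be released before submitting more.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    total += len(fut.result())
            pending.add(pool.submit(produce, i))
            peak = max(peak, len(pending))
        # Drain whatever is still outstanding, one completion at a time.
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                total += len(fut.result())
    return total, peak


if __name__ == "__main__":
    total, peak = run_with_backpressure(20)
    print(total, peak)
```

With Ray, the analogous shape would be to call ray.wait inside the submission loop once the list of references reaches the cap, and only then submit the next f.remote().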