import ray
import numpy as np



ray.init()

@ray.remote
def f():
  return np.zeros(10000000)

results = []
for i in range(100):
  print(i)
  results += ray.get([f.remote() for _ in range(50)])

Normally, when the object store fills up, it begins evicting objects that are not in use (in least-recently-used order). However, all of the objects here are numpy arrays held in the results list, so they are all still in use. The memory backing those arrays lives in the object store itself, so the store can't evict them until they go out of scope.
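To put rough numbers on the paragraph above, here is a back-of-the-envelope estimate of how much data the loop tries to keep alive. It assumes the default float64 dtype of np.zeros (8 bytes per element); the constant names are only for illustration.

```python
# Rough size estimate for the workload in the question.
ELEMENTS_PER_ARRAY = 10_000_000  # np.zeros(10000000)
BYTES_PER_ELEMENT = 8            # assumption: default float64 dtype
ARRAYS_PER_BATCH = 50            # tasks launched per loop iteration
NUM_BATCHES = 100                # loop iterations

bytes_per_array = ELEMENTS_PER_ARRAY * BYTES_PER_ELEMENT
bytes_per_batch = bytes_per_array * ARRAYS_PER_BATCH
bytes_total = bytes_per_batch * NUM_BATCHES

GB = 10**9  # decimal gigabytes
print(f"one array:      {bytes_per_array / GB:.2f} GB")  # 0.08 GB
print(f"one batch:      {bytes_per_batch / GB:.2f} GB")  # 4.00 GB
print(f"all iterations: {bytes_total / GB:.2f} GB")      # 400.00 GB
```

Since every array stays referenced from results, the object store would have to hold on the order of 400 GB by the end of the loop, which is why eviction alone cannot help here.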

Question: How can I specify an external object store, such as Redis, so that I don't exceed the memory of a single machine? I don't want to use /dev/shm or /tmp as the object store, because only limited memory is available and it fills up quickly.

testgauss321
  • Probably not the best solution, but if you copy the numpy arrays after calling `ray.get` and before appending them to `results`, they won't be in the object store anymore. For example, `results += [x.copy() for x in ray.get([f.remote() for _ in range(50)])]`. – Robert Nishihara Feb 28 '21 at 00:41
  • Of course, the numpy arrays will still be in memory in the Python process, so at some point you will run out of memory. – Robert Nishihara Feb 28 '21 at 00:43
  • Consider reposting on https://discuss.ray.io/. – Robert Nishihara Feb 28 '21 at 00:43

1 Answer


As of Ray 1.2.0, object spilling is supported, which enables out-of-core data processing. From 1.3+ (which will be released in 3 weeks), this feature will be turned on by default.

https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html

But your example won't work with this feature. Let me explain why here.

There are two things you need to know.

  1. When you call a Ray task (f.remote) or ray.put, it returns an object reference. Try
ref = f.remote()
print(ref)
  2. When you run ray.get on this reference, the Python variable accesses the memory directly (in Ray, it will be in shared memory, which is managed by Ray's distributed object store, called the plasma store, if your object size is >= 100KB). So,
obj = ray.get(ref) # Now, obj is pointing to the shared memory directly.

Currently, the object spilling feature supports disk spilling for case 1, but not for case 2 (which is much trickier to support, as you can imagine).

So there are two solutions here:

  1. Use a file directory for your plasma store. For example, start ray with
ray.init(_plasma_directory="/tmp")

This will allow you to use the /tmp folder as the plasma store (meaning Ray objects are stored in the tmp file system). Note that you may see performance degradation when you use this option.

  2. Use object spilling with backpressure. Instead of getting all of the Ray objects at once with ray.get, use ray.wait.
import json

import numpy as np
import ray

# Note: You don't need to specify this if you use the latest master.
ray.init(
    _system_config={
        "automatic_object_spilling_enabled": True,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
        )
    },
)

@ray.remote
def f():
    return np.zeros(10000000)

# Submit all tasks first and keep only the object references.
result_refs = []
for i in range(100):
    print(i)
    result_refs += [f.remote() for _ in range(50)]

# Fetch one finished result at a time instead of all of them at once.
while result_refs:
    [ready], result_refs = ray.wait(result_refs)
    result = ray.get(ready)
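
The ray.wait loop above consumes results one at a time, but it still submits every task up front. The same backpressure idea can also cap how many tasks are outstanding at once. The sketch below is not Ray code: it uses the standard library's concurrent.futures as a stand-in so that it runs without a cluster. Here pool.submit plays the role of f.remote(), wait(..., return_when=FIRST_COMPLETED) the role of ray.wait, and fut.result() the role of ray.get. The names make_block, run_bounded, and max_in_flight are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def make_block(i):
    # Stand-in for the remote task f(); returns a small list instead of a big array.
    return [i] * 1000


def run_bounded(num_tasks, max_in_flight):
    """Run num_tasks jobs while holding at most max_in_flight pending futures."""
    total = 0
    peak = 0
    pending = set()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(num_tasks):
            if len(pending) >= max_in_flight:
                # Block until at least one job finishes, then consume and drop it.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    total += sum(fut.result())
            pending.add(pool.submit(make_block, i))
            peak = max(peak, len(pending))
        # Drain the jobs that are still outstanding.
        for fut in pending:
            total += sum(fut.result())
    return total, peak


total, peak = run_bounded(num_tasks=20, max_in_flight=5)
print(total, peak)
```

Each result is reduced to a number and dropped as soon as it is consumed, so no more than max_in_flight results are ever held at once. With Ray, the analogous loop would call ray.wait on the list of pending references before submitting the next task.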
Nickofthyme
Sang