I am working on a pipeline that obtains predictions from a machine learning model, and I'm trying to use Ray to speed it up. The inputs can be repetitive, so I'd like the workers of this remote function to share access to a single cache that they can all search and update. Something like the below:
import ray
import numpy as np

@ray.remote
def f(x):
    # create the input lists y1 and y2 from x
    # do work
    # (cached is the shared lookup dict; predictor is the ML model)

    # Look up each y1 item; np.inf marks a cache miss to fill in later.
    unknown_y1 = []
    obtained_y1 = []
    for y in y1:
        key = '|'.join(str(v) for v in y.values())
        if key in cached:
            obtained_y1.append(cached[key])
        else:
            obtained_y1.append(np.inf)
            unknown_y1.append(y)

    # Same lookup for y2.
    unknown_y2 = []
    obtained_y2 = []
    for y in y2:
        key = '|'.join(str(v) for v in y.values())
        if key in cached:
            obtained_y2.append(cached[key])
        else:
            obtained_y2.append(np.inf)
            unknown_y2.append(y)

    # Run the model only on the cache misses.
    known_y1, known_y2 = predictor.predict(unknown_y1, unknown_y2)

    # Fill the y1 misses back in and cache them.
    unknown_index = 0
    for index in range(len(y1)):
        if obtained_y1[index] == np.inf:
            obtained_y1[index] = known_y1[unknown_index]
            key = '|'.join(str(v) for v in y1[index].values())
            if key not in cached:
                cached[key] = obtained_y1[index]
            unknown_index += 1

    # Fill the y2 misses back in and cache them.
    unknown_index = 0
    for index in range(len(y2)):
        if obtained_y2[index] == np.inf:
            obtained_y2[index] = known_y2[unknown_index]
            key = '|'.join(str(v) for v in y2[index].values())
            if key not in cached:
                cached[key] = obtained_y2[index]
            unknown_index += 1
I've tried creating a global dictionary by adding global cached; cached = dict() at the top of my script, but it seems each worker gets its own copy of that variable, so no data is actually shared.
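A stripped-down version of that attempt (the x * 2 is just a stand-in for the real prediction):

import ray

global cached; cached = dict()   # at the top of the script

ray.init()

@ray.remote
def f(x):
    global cached
    if x in cached:
        return cached[x]
    cached[x] = x * 2            # placeholder for the real model call
    return cached[x]

ray.get([f.remote(1) for _ in range(4)])
print(cached)   # {} in the driver: each worker mutated its own private copy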
Previously I was doing this with dogpile.cache.redis, but the region is not serializable since it holds a thread lock. I've also tried creating a dict and putting it in Ray's object store with ray.put(cached), but I think I read somewhere that Ray cannot share dictionaries in memory; objects in the object store are immutable anyway, so the workers could read the dict but not update it.
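The ray.put version was along these lines (a sketch; Ray resolves a top-level ObjectRef argument into its value before calling the function):

import ray

ray.init()

cached = {'a|b': 1.0}
cache_ref = ray.put(cached)   # one-time snapshot in the object store

@ray.remote
def f(key, cached):
    # `cached` arrives as a local deserialized copy of the snapshot;
    # writing to it does not touch the stored object or other workers
    if key not in cached:
        cached[key] = 2.0     # placeholder for the real prediction
    return cached[key]

ray.get(f.remote('c|d', cache_ref))
print(ray.get(cache_ref))     # still {'a|b': 1.0}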
I am currently trying to return the cache from each worker, merge the caches in the driver, and then put the merged dict back into the object store (sketched below). Is there a better way of sharing a dictionary/cache between Ray workers?
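The merge-in-the-driver version looks roughly like this (the batches and the doubling are placeholders for my real inputs and model):

import ray

ray.init()

@ray.remote
def f(xs, cached):
    cached = dict(cached)           # private copy of the snapshot
    out = []
    for x in xs:
        if x not in cached:
            cached[x] = x * 2       # placeholder for the real prediction
        out.append(cached[x])
    return out, cached              # ship this worker's cache back

cached = dict()
for batch in [[1, 2], [2, 3]]:      # inputs repeat across batches
    cache_ref = ray.put(cached)
    results = ray.get([f.remote(batch, cache_ref)])
    for out, worker_cache in results:
        cached.update(worker_cache) # merge in the driver, then re-put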