I am trying to deploy a trained Faiss index to PySpark and run a distributed search. The whole process includes:
- Pre-process
- Load the Faiss index (~15 GB) and run the Faiss search
- Post-process and write to HDFS
I set CPUs per task to 10 (`spark.task.cpus=10`) so the search can use multiple threads. However, steps 1 and 3 can only utilize 1 CPU per task. To utilize all CPUs, I want to set `spark.task.cpus=1` before steps 1 and 3. I tried the `set` method of `RuntimeConfig`, but it seems to make the program hang. Any advice on how to change the config at runtime, or on how to optimize this problem some other way?
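For reference, this is roughly what that attempt looked like (simplified; `spark` is the active `SparkSession`, and `preprocess_fn`/`search_fn` stand in for the real functions):

```python
# Simplified reconstruction of the attempt: flip spark.task.cpus between
# stages at runtime via the SparkSession's RuntimeConfig.
spark.conf.set("spark.task.cpus", "1")   # hoped this would apply to pre-processing
pre = data.map(preprocess_fn)

spark.conf.set("spark.task.cpus", "10")  # hoped this would apply to the Faiss search
results = pre.mapPartitions(search_fn)
```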
Code example:
```python
import faiss
import numpy as np

def load_and_search(x, model_path):
    # Load the index inside the partition and run a multi-threaded search.
    faiss_idx = faiss.read_index(model_path)
    q_vec = np.concatenate(list(x))  # stack the partition's query vectors
    _, idx_array = faiss_idx.search(q_vec, k=10)
    return iter(idx_array)

data = sc.textFile(input_path)

# Step 1: preprocess, only uses one CPU per task
data = data.map(lambda x: x)

# Step 2: load the Faiss index and search, uses multiple CPUs per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

# Step 3: postprocess and write, one CPU per task
data.map(lambda x: x).saveAsTextFile(result_path)
```
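One direction I am looking at (not yet verified) is Spark's stage-level scheduling (Spark 3.1+), which attaches a `ResourceProfile` to an RDD so that only the stage computing it requests more CPUs per task, instead of changing `spark.task.cpus` globally. As I understand it, this needs a cluster manager that supports it (e.g. YARN/Kubernetes with dynamic allocation), and since all the transformations above are narrow and would fuse into a single stage, the search needs shuffle boundaries around it to get its own stage. A minimal sketch (`num_partitions` is a placeholder):

```python
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

# Request 10 CPUs per task only for the stage that runs the Faiss search.
search_reqs = TaskResourceRequests().cpus(10)
search_profile = ResourceProfileBuilder().require(search_reqs).build

data = sc.textFile(input_path)
pre = data.map(lambda x: x)  # preprocess runs at the default 1 CPU per task

# repartition() introduces shuffle boundaries so the search stage is
# separate from the pre- and post-processing stages.
searched = (
    pre.repartition(num_partitions)
       .mapPartitions(lambda x: load_and_search(x, model_path))
       .withResources(search_profile)
)

searched.repartition(num_partitions).map(lambda x: x).saveAsTextFile(result_path)
```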