
I am trying to deploy a trained Faiss index to PySpark and run a distributed search. The whole process includes:

  1. Pre-process
  2. Load the Faiss index (~15 GB) and run the Faiss search
  3. Post-process and write to HDFS

I set CPUs per task to 10 (spark.task.cpus=10) so the search can run multi-threaded. However, steps 1 and 3 can only use 1 CPU per task, so to use all CPUs I want to set spark.task.cpus=1 before steps 1 and 3. I tried the set method of RuntimeConfig, but it seems to make my program hang. Any advice on how to change the config at runtime, or how else to optimize this?
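
For reference, what I tried looks roughly like this (just a sketch, assuming spark is the active SparkSession):

# sketch of the RuntimeConfig attempt: change spark.task.cpus between stages
spark.conf.set("spark.task.cpus", "1")   # before steps 1 and 3
spark.conf.set("spark.task.cpus", "10")  # before the Faiss search step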

Code example:

import faiss
import numpy as np

def load_and_search(x, model_path):
    # load the ~15 GB index inside the task and search the whole partition at once
    faiss_idx = faiss.read_index(model_path)
    q_vec = np.concatenate(x)
    _, idx_array = faiss_idx.search(q_vec, k=10)
    return idx_array


data = sc.textFile(input_path)

# preprocess, only uses one cpu per task
data = data.map(lambda x: x)

# load faiss index and search, uses multiple cpus per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

# postprocess and write, one cpu per task
data = data.map(lambda x: x).saveAsTextFile(result_path)

2 Answers


Alternative idea: use mapPartitions for steps 1 and 3 as well, and inside each task use a multiprocessing pool to map the items of the partition in parallel. This way you can use all the CPUs assigned to a task without changing the configuration (which I do not know is possible at all).

Pseudocode:

import multiprocessing as mp

def item_mapper(item):
    return ...  # your per-item preprocessing / postprocessing logic

def partition_mapper(partition):
    # run up to 10 worker processes inside this single task
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(item_mapper, partition)

rdd.mapPartitions(partition_mapper)
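
Applied to your pipeline, it could look roughly like this (just a sketch; preprocess_item and postprocess_item stand in for your real per-record logic):

import multiprocessing as mp

def preprocess_item(record):
    return record  # hypothetical per-record preprocessing

def postprocess_item(record):
    return str(record)  # hypothetical per-record postprocessing

def preprocess_partition(partition):
    # step 1: fan the partition out to a local process pool
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(preprocess_item, partition)

def postprocess_partition(partition):
    # step 3: same pattern for postprocessing
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(postprocess_item, partition)

data = sc.textFile(input_path)
data = data.mapPartitions(preprocess_partition)                         # step 1
data = data.mapPartitions(lambda x: load_and_search(x, model_path))     # step 2
data.mapPartitions(postprocess_partition).saveAsTextFile(result_path)   # step 3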

Well, you can change the SparkContext properties in the following way:

conf = sc._conf.setAll([('spark.task.cpus','1')])
sc._conf.getAll()
data = data.map(lambda x: x)

conf = sc._conf.setAll([('spark.task.cpus','10')])
sc._conf.getAll()
# load faiss index and search, used multiple cpus per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

conf = sc._conf.setAll([('spark.task.cpus','1')])
sc._conf.getAll()
# postprocess and write, one cpu per task
data = data.map(lambda x: x).saveAsTextFile(result_path)

getAll() can be removed; it is only there to check the current configuration.
