
I am trying to run two functions that perform completely independent transformations on a single RDD, in parallel, using PySpark. What are some methods to do this?

from multiprocessing import Process

from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext

def doXTransforms(sampleRDD):
    # (X transforms)
    ...

def doYTransforms(sampleRDD):
    # (Y transforms)
    ...

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    p1 = Process(target=doXTransforms, args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand why it cannot work. But is there an alternative way to make this work? Specifically, are there any Python/Spark-specific solutions?

  • If each of your transformations could use (almost) 100% of the cluster resource, which is usually the case, running them in parallel actually makes it slower. – shuaiyuancn Jun 27 '16 at 09:02

1 Answer


Just use threads and make sure that the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    # Artificial per-record delay so the two jobs visibly overlap.
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


# Give each job half of the default parallelism so both fit on the cluster at once.
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()

Arguably this is not often useful in practice, but otherwise it should work just fine.
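If you also want the results back on the driver, a plain thread pool works just as well. A minimal sketch, reusing the process helper and rdd defined above and the standard-library concurrent.futures:

from concurrent.futures import ThreadPoolExecutor

# Each future wraps a blocking Spark action running in its own driver-side thread.
with ThreadPoolExecutor(max_workers=2) as pool:
    fx = pool.submit(process, rdd, lambda x: x * 2)
    fy = pool.submit(process, rdd, lambda x: x + 1)
    print(fx.result(), fy.result())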

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy.
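For example, a minimal sketch of the relevant configuration (the pool names "x_pool" and "y_pool" are made up for illustration; how reliably local properties propagate from Python threads also depends on the PySpark version):

from pyspark import SparkConf, SparkContext

# spark.scheduler.mode has to be set before the SparkContext is created
# (it can also go into spark-defaults.conf; per-pool weights live in fairscheduler.xml).
conf = SparkConf().setAppName("parallelTransforms").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)

# Inside the thread running the X transforms, route its jobs to a dedicated pool:
sc.setLocalProperty("spark.scheduler.pool", "x_pool")
# ...and inside the thread running the Y transforms:
# sc.setLocalProperty("spark.scheduler.pool", "y_pool")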

You can also try pyspark-asyncactions (disclaimer: the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # importing the package patches RDD with *Async methods
import concurrent.futures

# Each *Async action returns a concurrent.futures.Future for a job running in the background.
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
  • Threads are not truly parallel in Python because of the GIL. So if I use the above method, I will not be able to utilize multiple cores, right? – preitam ojha Jun 27 '16 at 16:54
  • It doesn't matter. The only thing that happens in this code is RPC calls; it doesn't touch the actual computations. You could handle this in a single thread with async calls as well. See also [this answer](http://stackoverflow.com/a/38038346/1560062) and my comments beneath. – zero323 Jun 27 '16 at 17:14
  • This worked, thanks! The only change I had to make was to change yarn.scheduler.capacity.maximum-am-resource-percent from 0.1 to 0.5 in /etc/hadoop/conf/capacity-scheduler.xml. – preitam ojha Jun 27 '16 at 20:34