
The Spark documentation on Scheduling Within an Application says:

"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

I could find only a few code examples of this in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?

Jonathan Leffler
Meethu Mathew
  • Ever get an answer here? I'm trying to do the same thing and thinking it's actually impossible until better locking is added to `SparkContext`s. – Mike Sukmanowsky Aug 20 '15 at 13:50
  • @MikeSukmanowsky what do you mean ? This piece of doc does not talk about a specific Spark API, it just seems to work for all of them. The actual code which runs when using any of the APIs is the Scala code, and some interface code for Java and Python. – Dici Sep 18 '15 at 13:25
  • Can you provide the link of where this statement came from? – Jon Apr 25 '17 at 18:56

3 Answers


I was running into the same issue, so I created a tiny self-contained example. It creates multiple threads with Python's threading module and submits multiple Spark jobs simultaneously.

Note that by default, Spark runs jobs in first-in, first-out (FIFO) order: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I change it to FAIR scheduling.

# Prereqs:
# set
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
# spark.scheduler.mode                    FAIR
# in spark-defaults.conf

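import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  # count() is a Spark action, so each call submits its own job.
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  threads = []
  for i in range(4):
    # Submit each job from its own thread so the jobs can run concurrently.
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    threads.append(t)
    print('spark task', i, 'has started')
  # Wait for every job to finish before stopping the context.
  for t in threads:
    t.join()
  sc.stop()


run_multiple_jobs()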

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000
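
The counts above are only printed, and they arrive in whatever order the jobs happen to finish. Below is a minimal sketch of one way to get the results back keyed by input, using the standard library's `concurrent.futures.ThreadPoolExecutor`. The names `run_jobs_in_threads` and `fake_count` are hypothetical; `fake_count` is a plain-Python stand-in so the snippet runs without Spark. With a real SparkContext you would presumably pass a function that calls a Spark action instead.

```python
from concurrent.futures import ThreadPoolExecutor


def run_jobs_in_threads(job, inputs, max_workers=4):
    """Call job(x) for every x on a pool of worker threads.

    Returns a dict mapping each input to its result, in input order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately, so all jobs are queued up front.
        futures = {x: pool.submit(job, x) for x in inputs}
        # result() blocks until that particular job has finished.
        return {x: future.result() for x, future in futures.items()}


def fake_count(i):
    # Hypothetical stand-in for: sc.parallelize(range(i * 10000)).count()
    return len(range(i * 10000))


results = run_jobs_in_threads(fake_count, range(4))
print(results)  # {0: 0, 1: 10000, 2: 20000, 3: 30000}
```

Because each result is looked up by its input, the completion order of the threads no longer matters to the caller.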
Jay Prajapati
sparknoob
  • Any idea if this is the best way to do it? Especially if you are on a cluster. Since the sc remains on master and the master distributes it on worker nodes, I was wondering if this is the best way to do it. – nEO Oct 20 '17 at 05:58
  • There are some nice notes [here](https://www.shanelynn.ie/using-python-threading-for-multiple-results-queue/) on using the threading library, especially for doing things like returning the results of the threaded computations. – Shane Halloran Nov 27 '17 at 00:30
  • Spark 3.1+ users should consider [InheritableThread](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.InheritableThread.html) – Danten Aug 19 '22 at 14:35

I was asking myself the same thing today. The multiprocessing module offers a ThreadPool, which spawns a few threads for you and so runs the jobs in parallel. First define the function, then create the pool, and then map the function over the range you want to iterate.

In my case, I was calculating WSSSE (within set sum of squared errors) values for different numbers of centers (hyperparameter tuning) to get a "good" k-means clustering, just as outlined in the Spark MLlib documentation. Without further explanation, here are some cells from my IPython worksheet:

from pyspark.mllib.clustering import KMeans
import numpy as np

c_points is an RDD of 12-dimensional arrays:

>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([-2,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([ 7, -1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0])]

In the following, for each number of centers i, I compute the WSSSE value and return it as an (i, WSSSE) tuple:

def error(point, clusters):
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    clusters = KMeans.train(c_points, i, maxIterations=20,
        runs=20, initializationMode="random")
    WSSSE = c_points\
        .map(lambda point: error(point, clusters))\
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)

Here starts the interesting part:

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)

Run it:

wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points

gives:

[(1, 195318509740785.66),
 (2, 77539612257334.33),
 (3, 78254073754531.1),
 ...
]
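
On cleaning up the pool once the work is done: a minimal sketch, assuming Python 3, where `ThreadPool` can be used as a context manager so its worker threads are shut down when the block exits. `fake_wssse` is a hypothetical stand-in for `calc_wssse` so the snippet runs without Spark. `map` blocks until every call has returned and gives the results back in input order, regardless of which thread finished first.

```python
from multiprocessing.pool import ThreadPool


def fake_wssse(k):
    # Hypothetical stand-in for calc_wssse(k); returns a (k, value) tuple.
    return (k, 100.0 / k)


# Leaving the with-block shuts the pool down. That is safe here because
# map() has already waited for all results before returning.
with ThreadPool(processes=4) as tpool:
    points = tpool.map(fake_wssse, range(1, 6))

print([k for k, _ in points])  # [1, 2, 3, 4, 5]
```

If the pool is created without a `with` block, calling `tpool.close()` followed by `tpool.join()` after the last `map` achieves a similar cleanup.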
Harald Schilly
  • ... and out of curiosity, I did benchmark it via `%timeit`. the serial execution (with a "pulsating" behavior) took 53.2 secs, while the parallelized approach with 4 threads finished in 16.2secs. So, there is really a difference. More active stages in parallel and always some in the queue. – Harald Schilly Aug 29 '15 at 12:14
  • Doesn't this raise the possibility of Race Conditions? – Jon Apr 25 '17 at 18:55
  • It probably depends on the data that you manipulate with. Since you manage the threading, you have to make sure, that you do not raise Race Conditions yourself. – Minutis Apr 27 '17 at 08:15
  • @HaraldSchilly Dont you have to do `tpool.close()` and `tpool.join()`? – thentangler Mar 12 '21 at 03:09

ThreadPool is convenient, but it can cause unexpected behavior. For example, if all the threads write dataframes to the same location using the overwrite mode, whether the threads overwrite each other's files depends on timing. It is very much "first come, first served".

Normally all threads are evaluated/materialized at about the same time, so the location ends up with the files from all threads (as if they had all used the append mode). But if some threads are delayed, they can certainly overwrite other threads' files.
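
One way to sidestep this timing dependence is to give each thread its own output directory. The sketch below only illustrates the idea: plain file writes under a temporary directory stand in for the dataframe writes, and the names `base_dir`, `write_job`, and the `job=<i>` layout are hypothetical. With Spark, each thread would presumably write its dataframe to its own path instead.

```python
import os
import tempfile
from multiprocessing.pool import ThreadPool

# Hypothetical shared output root; a temp dir keeps the sketch self-contained.
base_dir = tempfile.mkdtemp()


def write_job(i):
    # Each job owns a distinct subdirectory, so overwriting its own output
    # can never delete files that belong to another thread.
    out_dir = os.path.join(base_dir, "job={}".format(i))
    os.makedirs(out_dir, exist_ok=True)
    # Stand-in for a Spark write in overwrite mode targeting out_dir.
    with open(os.path.join(out_dir, "part-0.txt"), "w") as f:
        f.write(str(i * 10000))
    return out_dir


with ThreadPool(processes=4) as pool:
    out_dirs = pool.map(write_job, range(4))

print(sorted(os.listdir(base_dir)))  # ['job=0', 'job=1', 'job=2', 'job=3']
```

The result no longer depends on which thread finishes first, at the cost of having to read from several directories afterwards.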

Zach