
I have the following situation with my PySpark:

In my driver program (driver.py), I call a function from another file (prod.py):

latest_prods = prod.featurize_prods()

Driver code:

from pyspark import SparkContext

from Featurize import Featurize
from LatestProd import LatestProd
from Oldprod import Oldprod

sc = SparkContext()

if __name__ == '__main__':
    print('Into main')

# build the latest-products RDD
featurize_latest = Featurize('param1', 'param2', sc)
latest_prod = LatestProd(featurize_latest)
latest_prods = latest_prod.featurize_prods()

# build the old-products RDD
featurize_old = Featurize('param3', 'param3', sc)
old_prod = Oldprod(featurize_old)
old_prods = old_prod.featurize_oldprods()

total_prods = sc.union([latest_prods, old_prods])

Then I do some reduceByKey code here... that generates total_prods_processed.

Finally I call:

total_prods_processed.saveAsTextFile(...)

I would like to generate latest_prods and old_prods in parallel. Both are created in the same SparkContext. Is it possible to do that? If not, how can I achieve that functionality?

Is this something that Spark does automatically? I am not seeing this behavior when I run the code, so please let me know if it is a configuration option.


2 Answers


After searching on the internet, I think your problem can be addressed with threads. It is as simple as creating two threads for your old_prods and latest_prods work.

Check this post for a simplified example. Since Spark is thread-safe, you gain the parallelism without sacrificing anything.
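
A rough sketch of what that could look like, reusing latest_prod, old_prod and sc from the question's driver code (transformations are lazy, so each thread has to trigger an action, e.g. cache() followed by count(), for its job to actually run inside that thread):

import threading

results = {}

def run(name, make_rdd):
    rdd = make_rdd()
    rdd.cache()
    rdd.count()  # action that forces the job to execute inside this thread
    results[name] = rdd

threads = [
    threading.Thread(target=run, args=('latest', latest_prod.featurize_prods)),
    threading.Thread(target=run, args=('old', old_prod.featurize_oldprods)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

total_prods = sc.union([results['latest'], results['old']])

Both jobs are submitted to the same SparkContext; whether they actually overlap on the cluster depends on free resources and on the scheduler (spark.scheduler.mode=FAIR shares executors between concurrent jobs more evenly than the default FIFO).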

– alxiaaaa

The short answer is no, you can't schedule operations on two distinct RDDs at the same time in the same SparkContext. However, there are some workarounds: you could process them in two distinct SparkContexts on the same cluster and call saveAsTextFile, then read both in another job to perform the union (this is not recommended by the documentation). If you want to try this method, it is discussed here using spark-jobserver, since Spark doesn't support multiple contexts by default: https://github.com/spark-jobserver/spark-jobserver/issues/147
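
A rough sketch of the follow-up job this workaround implies (the paths are placeholders; also note that saveAsTextFile writes plain strings, so the records would have to be parsed back into key/value pairs here, whereas saveAsPickleFile / sc.pickleFile would avoid that step):

from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext()
    latest_prods = sc.textFile('hdfs:///tmp/latest_prods')  # written by the first job (placeholder path)
    old_prods = sc.textFile('hdfs:///tmp/old_prods')         # written by the second job (placeholder path)
    total_prods = latest_prods.union(old_prods)
    # parse the lines back into (key, value) pairs, reduceByKey, etc.
    total_prods.saveAsTextFile('hdfs:///tmp/total_prods')    # placeholder path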

However, given the operations you perform, there would be no reason to process both at the same time, since you need the full results to perform the union; Spark will split those operations into two different stages that will be executed one after the other.

– Paul K.
  • Each of them takes around 8-10 minutes. If I can process them at the same time, then I can get to the union code in 8-10 minutes rather than ~20 minutes (done in a serial manner). – user3803714 Nov 27 '15 at 19:26
  • Spark tries to optimize the use of CPU and memory, so computing more operations at the same time would make them slower. If you want the processing to be faster, you would rather scale your cluster (upgrade the worker machines to a higher number of CPUs or add more workers). – Paul K. Nov 27 '15 at 19:30
  • Also, a key reason is to avoid disk I/O. I want to avoid calling saveAsTextFile multiple times, since I have around 10 such RDDs that need to be processed first and then unioned; here, for simplicity, I have just shown 2. Each of these RDDs, when processed, generates a large dataset, and having to write them and then read them again is not efficient. My cluster has 20 nodes with a few hundred cores. – user3803714 Nov 27 '15 at 19:40
  • 1
    It is not true. You can schedule and execute multiple jobs at the same time using a single context. Usually not the best idea though. – zero323 Nov 27 '15 at 19:43
  • Having multiple large RDDs would be a sufficient reason to write to disk since you can't cache everything in your clusters memory. Sometimes you can't avoid disk I/O because you have limited ram available on your cluster compared to the dataset size. – Paul K. Nov 27 '15 at 19:44
  • @zero323, you can use thread pools, but Spark already computes tasks simultaneously; there would be no tangible reason to do this for this kind of processing. If you want to look at it, however, there is an example here: http://stackoverflow.com/questions/30214474/how-to-run-multiple-jobs-in-one-sparkcontext-from-separate-threads-in-pyspark – Paul K. Nov 27 '15 at 19:48
  • The sum of the memory of all the worker nodes is much larger than the dataset, so I am very much inclined toward avoiding costly disk I/O if at all possible. – user3803714 Nov 27 '15 at 19:49
  • It seems clear that Spark would process the same number of tasks at the same time on the cluster, since it is already using the maximum of the resources you have. So I think you have the best possible option already, but I could be wrong. You can still search for other bottlenecks in your Spark program to make it faster. – Paul K. Nov 27 '15 at 19:55
  • @PaulK I will argue there are scenarios where parallel or synchronous job submitting is useful. Executing multiple small (relative to the amount of resources) actions at the same time. – zero323 Nov 27 '15 at 20:05
  • @zero323, any pointers to how I can do it for my application? – user3803714 Nov 27 '15 at 20:15
  • @user3803714 Sure, but you'll have to be more precise about what exactly is going on in your code. – zero323 Nov 27 '15 at 22:31
  • In essence, I want to have a driver program where I create the SparkContext and then fan out and generate 10 RDDs concurrently on my cluster, and then there is a union operation that depends on all those RDDs being computed. I can sort of trivially do that by running 10 independent pyspark scripts in 10 contexts, where each script at the end writes to a file, and then have another script that reads these 10 files and calls union. The drawback is more disk I/O. I want to use the same context, create them in parallel and then call union in my driver program. Recommended way of doing this? – user3803714 Nov 28 '15 at 00:59
  • If all you need is to process and union, then @PaulK. is right - it is already done in parallel. If you think that resources are not utilized optimally, you can reduce parallelism on the RDD level. – zero323 Nov 28 '15 at 04:12
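
For reference, a sketch of the fan-out pattern discussed in these comments: the ~10 featurized RDDs are materialized concurrently from driver threads in a single SparkContext and then combined with one union, so no intermediate saveAsTextFile is needed. The featurizers list, the featurize_prods() method name, the reduce function and the output path are stand-ins for the real code:

from multiprocessing.pool import ThreadPool

def materialize(featurizer):
    rdd = featurizer.featurize_prods()  # assumed to return a pair RDD
    rdd.cache()
    rdd.count()                         # action that forces this job to run now
    return rdd

pool = ThreadPool(len(featurizers))     # one driver thread per RDD
rdds = pool.map(materialize, featurizers)

total_prods = sc.union(rdds)
total_prods_processed = total_prods.reduceByKey(lambda a, b: a + b)  # placeholder combiner
total_prods_processed.saveAsTextFile('hdfs:///tmp/total_prods')      # placeholder path

Running with spark.scheduler.mode=FAIR lets the concurrent jobs share executors more evenly; with the default FIFO scheduler, later jobs only use whatever capacity the earlier ones leave free.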