2

I am working on code that uses an ExecutorService to parallelize tasks (think machine learning computations run over a small dataset again and again). My goal is to execute some code as fast as possible, many times, and store the results somewhere (total executions will be on the order of 100M runs at least).

The logic looks something like this (it's a simplified example):

dbconn = new dbconn() // This is reused by all threads
for a in listOfSize1000:
   for b in listOfSize10:
      for c in listOfSize2:
         taskcompletionexecutorservice.submit(new runner(a, b, c, dbconn))

At the end, taskcompletionexecutorservice.take() is called and I store the Result from each "Future" in a DB. But this approach stops scaling after a certain point.
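For reference, the submit/take pattern described above can be sketched roughly as follows. This is a minimal plain-Java sketch under assumptions, not the original code: `compute` is a hypothetical stand-in for the real runner, the pool size and list sizes are made up, and results are collected into a list instead of being written to a DB.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionSketch {
    // Hypothetical stand-in for the real runner logic.
    static int compute(int a, int b, int c) {
        return a * 100 + b * 10 + c;
    }

    // Submits one task per (a, b, c) combination, then drains results as they finish.
    static List<Integer> runAll(int sizeA, int sizeB, int sizeC) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
        List<Integer> results = new ArrayList<>();
        try {
            int submitted = 0;
            for (int a = 0; a < sizeA; a++) {
                for (int b = 0; b < sizeB; b++) {
                    for (int c = 0; c < sizeC; c++) {
                        final int fa = a, fb = b, fc = c; // lambdas need effectively final values
                        service.submit(() -> compute(fa, fb, fc));
                        submitted++;
                    }
                }
            }
            for (int i = 0; i < submitted; i++) {
                // take() blocks until some task has completed; get() returns its result.
                results.add(service.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(5, 3, 2).size()); // prints 30
    }
}
```

Results arrive in completion order, not submission order, so anything that depends on ordering has to carry its own (a, b, c) key.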

So this is what I am doing right now in Spark (it is a brutal hack, but I am looking for suggestions on how best to structure this):

sparkContext.parallelize(listOfSize1000).filter(a -> {
   dbconn = new dbconn() // Cannot init it outside parallelize since it's not serializable
   for b in listOfSize10:
      for c in listOfSize2:
         Result r = new runner(a, b, c, dbconn)
         dbconn.store(r)

   return true // It serves no purpose.
}).count();

This approach looks inefficient to me since it's not truly parallelizing on the smallest unit of work, although the job runs all right. Also, count is not really doing anything for me; I added it only to trigger the execution. It was inspired by the pi estimation example here: http://spark.apache.org/examples.html

So, any suggestions on how I can better structure my Spark runner so that I use the Spark executors efficiently?

zengr

2 Answers

1

There are a few things we can do to make this code more Spark-like. The first is that you are using a filter and a count but not really using the results of either. The function foreach is probably closer to what you want.

That being said, you are creating a DB connection to store the results, and we can handle this in a few ways. If the DB is really what you want to use for storage, you could use foreachPartition, or mapPartitionsWithIndex followed by a count() (which I know is a bit ugly, but foreachWith is deprecated as of 1.0.0), to create only one connection per partition. You could also just do a simple map and then save your results to one of the many supported output formats (e.g. saveAsSequenceFile).
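The one-connection-per-partition idea can be illustrated without a cluster. The following is a plain-Java simulation, not Spark code: `FakeConn`, `processPartition` and the chunking in `run` are all hypothetical names made up for the sketch. In a real job, the body of `processPartition` is roughly what would be passed to foreachPartition, and Spark would do the splitting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PerPartitionSketch {
    // Hypothetical stand-in for the real DB connection; counts how many get opened.
    static class FakeConn {
        static int opened = 0;
        final List<String> stored = new ArrayList<>();
        FakeConn() { opened++; }
        void store(String result) { stored.add(result); }
    }

    // Handles one partition: a single connection is shared by all of its elements.
    static int processPartition(Iterator<Integer> partition) {
        FakeConn conn = new FakeConn();
        while (partition.hasNext()) {
            int a = partition.next();
            for (int b = 0; b < 10; b++) {
                for (int c = 0; c < 2; c++) {
                    conn.store(a + ":" + b + ":" + c); // placeholder for running and storing a result
                }
            }
        }
        return conn.stored.size();
    }

    // Splits 0..n-1 into contiguous chunks and processes each chunk as one partition.
    static int run(int n, int numPartitions) {
        FakeConn.opened = 0;
        int total = 0;
        int chunk = (n + numPartitions - 1) / numPartitions;
        for (int start = 0; start < n; start += chunk) {
            List<Integer> part = new ArrayList<>();
            for (int i = start; i < Math.min(n, start + chunk); i++) {
                part.add(i);
            }
            total += processPartition(part.iterator());
        }
        return total;
    }

    public static void main(String[] args) {
        int stored = run(1000, 8);
        System.out.println(stored + " results, " + FakeConn.opened + " connections");
    }
}
```

With 1000 elements in 8 partitions this stores 20000 results through 8 connections, instead of the 1000 connections opened by the per-element version in the question.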

Holden
  • Thanks Holden! I am passing `dbconn` to `runner` too where my actual logic hits db and memoizes some data in memory (in a jvm). It has some legacy code and I will need to do some substantial refactoring to remove that dependency and create an RDD rather than hitting db from runner. – zengr Aug 28 '15 at 20:05
  • Ok, then doing the `mapPartitionsWithIndex` to allow you to re-use dbconn for all of the elements in each partition is likely the best approach. – Holden Aug 28 '15 at 20:21
  • Yup, just tried it. `mapPartitionsWithIndex` says it's missing a return statement (needs an RDD iterator). But I don't really have anything to return; how can I get around it? – zengr Aug 28 '15 at 20:22
  • Ah yes, simply return an empty iterator. – Holden Aug 28 '15 at 20:23
  • Not sure if I am missing out on something basic here: http://markdownshare.com/view/c52a7ffb-27ef-44aa-978f-f4822ec876a3 – zengr Aug 28 '15 at 20:53
  • Ah your return iterator needs to have a type (you've encountered a weird corner case), just make sure you have some type information with the iterator. – Holden Aug 28 '15 at 21:33
  • Hi Holden, what do you mean by doing a `count()` after `foreachPartition`? `foreachPartition` is the main loop that runs my actual logic; how can I do a count (or map) after that? – zengr Sep 03 '15 at 23:45
  • I mean you can either do `mapPartitions` along with a count OR do `foreachPartition`. – Holden Sep 03 '15 at 23:46
  • Got it. The rewrite to use `foreachPartition` is behaving abnormally for me. The executors die after a while, probably because of memory and IO bottlenecks. I think I will end up refactoring this code to play well with Spark. The job was never meant to deal with millions of runs. – zengr Sep 03 '15 at 23:50
  • Posted an extension of this question: http://stackoverflow.com/questions/32512392/efficiently-load-data-from-mongo-and-postgres-inside-spark-for-lookup – zengr Sep 10 '15 at 22:06
1

You may try another approach to parallelize it even better, though at a price. The code is in Scala, but there is a cartesian method for Python as well. For simplicity, my lists contain integers.

val rdd1000 = sc.parallelize(list1000)
val rdd10 = sc.parallelize(list10)
val rdd2 = sc.parallelize(list2)

rdd1000.cartesian(rdd10).cartesian(rdd2)
    .foreachPartition((tuples: Iterator[Tuple2[Tuple2[Int, Int], Int]]) => {
        val dbconn = ... // opened once per partition
        for (tuple <- tuples) {
            val a = tuple._1._1
            val b = tuple._1._2
            val c = tuple._2

            val r = new Result(a, b, c, dbconn)
            dbconn.store(r)
        }
    })

Filter in your case is a transformation, which is lazy: Spark doesn't evaluate it at the call. Processing starts only when an action is called. Count is an action, and it is what starts the actual processing in your example. foreachPartition is also an action, so Spark starts it right away. foreachPartition is needed here because it lets you open the connection once for the whole partition of data.
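The lazy-transformation versus action distinction has a close analogue in Java streams, which makes it easy to try locally. To be clear, this sketch uses `java.util.stream`, not the Spark API, and the class and method names are invented for illustration: `filter` plays the role of a lazy transformation and `count` the role of an action.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazySketch {
    // Returns {predicate calls before count, predicate calls after count, count result}.
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> data = Arrays.asList(1, 2, 3, 4);

        // Building the pipeline does not run the predicate yet.
        Stream<Integer> pipeline = data.stream().filter(x -> {
            calls.incrementAndGet();
            return true; // like the question's filter, it keeps everything
        });
        int before = calls.get();

        // The terminal operation is what actually drives the elements through filter.
        long n = pipeline.count();
        int after = calls.get();

        return new int[] {before, after, (int) n};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " calls before, " + r[1] + " calls after, count = " + r[2]);
    }
}
```

The predicate runs zero times before `count()` and four times after it, which mirrors why the question's filter did nothing until count was added.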

The likely downside of cartesian is that it might imply a shuffle over the cluster, so if you have complex objects, that might hurt performance. That might happen if you're going to read the data from external sources. If you're going to use parallelize, that's fine.

One more thing to be aware of: depending on the size of your cluster, Spark might put pretty high pressure on the database you use.

evgenii
  • In this case, will the `dbconn` init happen for every single run? – zengr Aug 28 '15 at 21:30
  • One downside is cartesian is an expensive operation (it results in a shuffle). – Holden Aug 28 '15 at 21:34
  • @Holden, yeah, it might. I mentioned that. On the other hand, I printed debugString on the resulting RDD and it shows there are no shuffles. I assume it's because everything is local, although I specified the Spark master as local[8]. So I'd say this approach is good, as it uses the standard API and it's always possible to look at explain. – evgenii Aug 28 '15 at 21:38
  • @zengr, the dbconn init happens just once for the whole Iterator. – evgenii Aug 28 '15 at 21:39
  • Alright, it works, but I am curious now: how is creating an RDD for all lists different from looping over list10, list2 etc.? Does Spark handle task distribution more efficiently? How is this impl different from @Holden's implementation in terms of performance? – zengr Aug 28 '15 at 23:10
  • @zengr I'd say the data is spread more evenly across the cluster. The code looks cleaner and simpler. That's an API of the system; it makes sense to use it and rely on the efficiency of the internals. It also looks like looping inside the closure is basically the same issue you started with. On the other hand, it might lead to a shuffle (subject to investigation) and thus might hurt performance. There is a counterintuitive idea, backed by some research, that disk and network performance are almost irrelevant as opposed to CPU: http://radar.oreilly.com/2015/04/investigating-sparks-performance.html – evgenii Aug 28 '15 at 23:31
  • Thanks. My initial objective was: the code I need to run will take 3 yrs to complete if I run it on a single JVM on the biggest machine on AWS (36 cores, ~40-50 threads). So I need to parallelize the computation over multiple nodes, with even more tasks per node. I need to complete that processing in less than 24 hrs. – zengr Aug 28 '15 at 23:34
  • Also, currently I am doing `repartition(list1000.size())` on my RDD so that it tries to maximize the parallel runs. But that again looks like a hack to me. – zengr Aug 28 '15 at 23:52
  • @zengr, repartition is the standard way to get better parallelism – evgenii Aug 29 '15 at 12:12
  • I don't see efficient utilization of the Spark cluster; only 10% of the allocated CPU is actually being used in both approaches. I am trying out a combination of ExecutorService with RDD/partitions. Will keep you guys posted. – zengr Aug 31 '15 at 23:19
  • Posted an extension of this question: http://stackoverflow.com/questions/32512392/efficiently-load-data-from-mongo-and-postgres-inside-spark-for-lookup – zengr Sep 10 '15 at 22:06
  • Also, the `cartesian` resulted in a total of 256 partitions for me, which tried to create 256 DB connections in one go. Didn't work. – zengr Sep 10 '15 at 22:07
  • @zengr, the simplest (and debatable) approach might be to sleep for a random number of millis to stagger the DB connections, although it might not scale linearly. – evgenii Sep 11 '15 at 21:02