
I need to migrate a Java program to Apache Spark. The current program heavily relies on java.util.concurrent and runs on a single machine. Since the initialization of a worker (Callable) is expensive, the workers are reused again and again - i.e. a worker reinserts itself into the pool once it terminates and has returned its result.

More precisely:

  • The current implementation works on small data sets, in the range of 10E06 entries / a few GB.
  • The data contains entries that can be processed independently. That is, one could fire up one worker per task and submit it to the Java thread pool.
  • However, setting up a worker for processing an entry involves loading more data, building graphs, etc. - altogether some GB of memory AND CPU time in the range of minutes.
  • Some data can indeed be shared among the workers, e.g. some look-up tables, but does not need to be. Other data is private to the worker and thus not shared. The worker may change this data while processing an entry and only later reset it in a fast manner, e.g. caches specific to the entry currently processed. Thus, the worker can reinsert itself into the pool and start working on the next entry without going through the expensive initialization.
  • Runtime per worker and entry is in the range of seconds.
  • The workers hand back their results via an ExecutorCompletionService, i.e. the results are later retrieved by calling pool.take().get() in a central part of the program (a compressed sketch of this pattern follows the list).
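In short, the current pattern looks roughly like the following compressed sketch (all class and variable names are placeholders; the real initialization and processing are of course far more involved):

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HeavyWorkerSketch {

    // Expensive to construct, cheap to reset, processes one entry per call().
    static class HeavyWorker implements Callable<String> {
        private final ExecutorCompletionService<String> pool;
        private final Queue<String> entries;
        private String entry;

        HeavyWorker(ExecutorCompletionService<String> pool, Queue<String> entries) {
            this.pool = pool;
            this.entries = entries;
            // ... expensive one-time setup: load data, build graphs (GBs, minutes of CPU) ...
        }

        void assign(String entry) {
            this.entry = entry;
        }

        @Override
        public String call() {
            String result = "processed:" + entry;   // seconds of work per entry
            // ... cheap reset of per-entry caches would happen here ...
            String next = entries.poll();
            if (next != null) {                      // worker reinserts itself into the pool
                assign(next);
                pool.submit(this);
            }
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<String> pool = new ExecutorCompletionService<>(executor);

        Queue<String> entries = new ConcurrentLinkedQueue<>();
        int total = 100;
        for (int i = 0; i < total; i++) {
            entries.add("entry-" + i);
        }

        // Expensive initialization happens once per worker, not once per entry.
        for (int i = 0; i < 4; i++) {
            HeavyWorker worker = new HeavyWorker(pool, entries);
            worker.assign(entries.poll());
            pool.submit(worker);
        }

        // Central part of the program: collect one result per entry as it completes.
        for (int i = 0; i < total; i++) {
            String result = pool.take().get();
        }
        executor.shutdown();
    }
}
```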

While getting to know Apache Spark, I find that most examples just use standard transformations and actions. I also find examples that add their own functions to the DAG by extending the API. Still, those examples all stick to simple, lightweight calculations and come without initialization cost.

I now wonder what the best approach is to design a Spark application that reuses some kind of "heavy worker". The executors seem to be the only persistent entities that could possibly hold a pool of such workers. However, being new to the world of Spark, I most likely miss some point...

Edit 2016-10-07:

Found an answer that points to a (possible) solution using Functions. So the question is, can I (see the sketch after the list):

  1. split my data into partitions according to the number of executors,
  2. have each executor get exactly one partition to work on,
  3. have my Function (called setup in the linked solution) create a thread pool and reuse the workers, and
  4. have a separate combine function merge the results later?
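Something along these lines is what I have in mind - only a sketch against the Spark 2.x Java API, with HeavyWorker and the partition count standing in for the real setup:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkHeavyWorkerSketch {

    // Placeholder for the expensive-to-build, cheap-to-reuse worker.
    static class HeavyWorker {
        HeavyWorker() {
            // ... expensive one-time setup: load data, build graphs ...
        }
        String process(String entry) {
            // ... seconds of work per entry, then a cheap reset of per-entry caches ...
            return "processed:" + entry;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("heavy-worker-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {

            List<String> entries = new ArrayList<>();
            for (int i = 0; i < 1_000_000; i++) {
                entries.add("entry-" + i);
            }

            // Steps 1+2: number of partitions chosen to match the executors (assumption).
            int numPartitions = 8;
            JavaRDD<String> rdd = sc.parallelize(entries, numPartitions);

            // Step 3: the expensive setup runs once per partition,
            // the worker is then reused for every entry in that partition.
            JavaRDD<String> results = rdd.mapPartitions((Iterator<String> it) -> {
                HeavyWorker worker = new HeavyWorker();
                List<String> out = new ArrayList<>();
                while (it.hasNext()) {
                    out.add(worker.process(it.next()));
                }
                return out.iterator();
            });

            // Step 4: combine / retrieve the results on the driver.
            List<String> combined = results.collect();
        }
    }
}
```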
  • Can you be a bit more specific on what your current Java implementation actually does? Spark's main selling point is RDDs and the operations defined on those in the API. It sounds to me like you're trying to 'abuse' Spark to scale out an application that probably depends on shared memory etc to multiple nodes. If your application does data crunching, have you tried doing it using the Spark API? – LiMuBei Oct 07 '16 at 08:45
  • @LiMuBei Added a more detailed description of the current java implementation. I am still evaluating if that design is an 'abuse' or not. From the [Spark API](http://spark.apache.org/docs/latest/api/java/index.html) namely .java and .java.function I do see that I can add my own functions. However, I get hardly any control over how they are executed. Probably, that's the whole point of Spark - hiding that messy stuff. – Andreas Oct 07 '16 at 09:30
  • Well, sounds to me like it should be possible to do the same thing using the Spark API without too much extension. If you use the Scala API you can use Java libraries without much hassle. `mapPartitions` allows you to iterate over entries in a partition and is supposed to be used for things that have expensive setup like database connections etc. – LiMuBei Oct 07 '16 at 09:43

1 Answer


Your current architecture is a monolithic, multi-threaded one with shared state between the threads. Given that the size of your dataset is relatively modest for modern hardware, you can parallelize it quite easily with Spark, where you replace the threads with executors on the cluster's nodes.

From your question I understand that your two main concerns are whether Spark can handle complex parallel computations and how to share the necessary bits of state in a distributed environment.

Complicated business logic: Regarding the first part, you can run arbitrarily complicated business logic in the Spark Executors, which are the equivalent of the worker threads in your current architecture.

This blog post from Cloudera explains the concept well, along with other important aspects of the execution model:

http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

One aspect you will need to pay attention to, though, is the configuration of your Spark job, in order to avoid timeouts due to executors taking too long to finish, which is to be expected for an application with complicated business logic like yours.

Refer to the official Spark configuration page for more details, and more specifically to the section on execution behavior:

http://spark.apache.org/docs/latest/configuration.html#execution-behavior
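For illustration only, these are timeout-related settings that are typically raised for jobs with long-running tasks (the values below are placeholders, not recommendations):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("long-running-tasks")
                // how long the driver tolerates not hearing back from an executor
                .set("spark.network.timeout", "600s")
                // how often executors send heartbeats; keep well below spark.network.timeout
                .set("spark.executor.heartbeatInterval", "60s");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job definition goes here ...
        sc.stop();
    }
}
```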

Shared state: In Spark you can share complicated data structures such as graphs and application configuration among the nodes. One approach which works well is broadcast variables, where a read-only copy of the shared state is shipped to every node once. Below are some very nice explanations of the concept:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html

http://g-chi.github.io/2015/10/21/Spark-why-use-broadcast-variables/

This will shave latency off your application, while ensuring data locality.
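A minimal sketch of the broadcast approach in the Java API (the look-up table content is a dummy):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("broadcast-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {

            // Shared, read-only look-up tables (dummy content).
            Map<String, String> lookupTable = new HashMap<>();
            lookupTable.put("entry-1", "metadata-1");
            lookupTable.put("entry-2", "metadata-2");

            // Shipped to every node once and cached there, instead of once per task.
            Broadcast<Map<String, String>> lookup = sc.broadcast(lookupTable);

            JavaRDD<String> entries = sc.parallelize(Arrays.asList("entry-1", "entry-2", "entry-3"));
            JavaRDD<String> enriched = entries.map(e ->
                    e + ":" + lookup.value().getOrDefault(e, "unknown"));

            List<String> results = enriched.collect();
        }
    }
}
```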

The processing of your data can be performed on a per-partition basis (more here: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html), with the results aggregated on the driver or with the use of accumulators (more here: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-accumulators.html). In case the resulting data is complicated, the partition approach may work better and also gives you more fine-grained control over your application's execution.
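A rough sketch of both result paths, using the Spark 2.x accumulator API and placeholder data; accumulators suit simple metrics, while the complex per-entry results come back through a normal action such as collect():

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class ResultCollectionSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("result-collection-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {

            JavaRDD<String> entries = sc.parallelize(Arrays.asList("a", "b", "c"), 3);

            // Accumulator: good for simple metrics aggregated across all tasks.
            // (Updates made inside transformations may be re-applied on task retries.)
            LongAccumulator processed = sc.sc().longAccumulator("processed entries");

            JavaRDD<String> results = entries.map(e -> {
                processed.add(1);           // side-channel counter
                return "processed:" + e;    // the actual, possibly complex, result
            });

            // Driver aggregation: the complex per-entry results come back via an action.
            List<String> onDriver = results.collect();
            long processedCount = processed.value();  // read on the driver after the action
        }
    }
}
```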

Regarding the hardware resource requirements, it seems that your application needs a few gigabytes for the shared state, which will need to stay in memory, and additionally a few more gigabytes for the data on every node. You can set the persistence level to MEMORY_AND_DISK in order to ensure that you won't run out of memory; more details at

http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
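For illustration, a small sketch of persisting an RDD with MEMORY_AND_DISK, so that partitions which do not fit in memory spill to local disk instead of being recomputed (the RDD content is a dummy):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class PersistenceSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("persistence-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {

            // Stands in for data that is expensive to load or recompute.
            JavaRDD<String> expensiveToRebuild = sc
                    .parallelize(Arrays.asList("entry-1", "entry-2", "entry-3"))
                    .map(e -> "initialized:" + e);

            // Partitions that do not fit in memory spill to local disk.
            expensiveToRebuild.persist(StorageLevel.MEMORY_AND_DISK());

            expensiveToRebuild.count();    // first action materializes and caches the RDD
            expensiveToRebuild.collect();  // later actions reuse the persisted partitions
        }
    }
}
```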