5

I know that in Spark I can split my computation across multiple partitions. If, say, I split my input RDD into 1000 partitions and I have 100 machines, Spark will split the computation into 1000 tasks and dynamically allocate them to my 100 machines in some smart way.

Now suppose I can initially split my data into only 2 partitions, but I still have 100 machines. Naturally, 98 of my machines will be idle. But as I process each task, I could probably split it into sub-tasks that might potentially be executed on different machines. This is easy to achieve in plain Java with a queue, but I am not sure what the best way to attack it in Apache Spark is.

Consider the following Java pseudo-code:

BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>();
queue.add(myInitialTask);
...
// On each worker thread:
while (!queue.isEmpty()) {
    Task nextTask = queue.take();
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.addAll(newTasks);
}

The above Java code will keep all my 100 threads busy, assuming the method 'process_task_and_split_to_sub_tasks()' can split any large task into a number of smaller ones.

Is there a way to achieve the same in Spark, maybe in combination with other tools?


Update: It has been correctly pointed out that one way to attack this is to

  1. generate finer-grained keys and
  2. use a smart Partitioner that assigns those keys to partitions.

I guess this is the 'classic' way to attack this problem, but it requires me to correctly estimate the amount of work per key in order to partition properly. What if I don't have a good way to know the amount of work per key in advance? I might end up with a very unfortunate partitioning in which most of my machines stay idle waiting for a few unlucky ones.
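
For concreteness, here is a minimal sketch of what such a 'smart' Partitioner would look like in the Java API (the key-to-partition map is hypothetical; building it from good work estimates is exactly the part I don't know how to do):

import java.util.Map;
import org.apache.spark.Partitioner;

// Hypothetical cost-aware partitioner: finer-grained string keys (e.g. 4-letter
// prefixes) are routed according to a precomputed key-to-partition assignment.
public class CostAwarePartitioner extends Partitioner {
    private final int numPartitions;
    private final Map<String, Integer> keyToPartition;  // built from work estimates

    public CostAwarePartitioner(int numPartitions, Map<String, Integer> keyToPartition) {
        this.numPartitions = numPartitions;
        this.keyToPartition = keyToPartition;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        // Fall back to hash partitioning for keys without an estimate.
        Integer p = keyToPartition.get(key);
        return p != null ? p : Math.abs(key.hashCode() % numPartitions);
    }
}

// Usage: keyedRdd.partitionBy(new CostAwarePartitioner(200, estimatedAssignment))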

Example: let's take a simplified version of frequent itemset mining.
Suppose my file contains lines with letters from a to j (10 letters); all letters in each line are sorted alphabetically and appear without repetition, e.g. 'abcf', and the task is to find all letter combinations that occur in at least 50% of all lines. E.g. if many lines match the pattern 'ab.*f', then the output will contain {'a', 'b', 'f', 'ab', 'af', 'bf', 'abf'}.
One way to implement it is to send all lines starting with 'a' to one mapper (machine), all lines starting with 'b' to another, etc. By the way, this is how frequent pattern mining is implemented in Spark. Now suppose I have 100 machines (but only 10 letters). Then 90 of my machines will stay idle.
With the finer-grained-keys solution I could generate 10,000 4-letter prefixes and then somehow partition them based on the estimated work per prefix. But my partitioning can be very wrong: if the majority of the lines start with 'abcd', then all the work will be done by the machine responsible for that prefix (and probably for other prefixes in addition to it), again yielding a situation where most of my machines stay idle waiting for one unfortunate machine.
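
For reference, the keying step itself is easy; it is assigning keys to machines that is hard. A minimal sketch of the keying (prefix length 1 gives the 10-key version, prefix length 4 gives the 10,000-key version; 'lines' is an already loaded JavaRDD<String> and the partition count of 200 is arbitrary):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

int prefixLength = 4;  // 1 => 10 keys, 4 => up to 10,000 keys
JavaPairRDD<String, String> keyed = lines.mapToPair(line ->
        new Tuple2<>(line.substring(0, Math.min(prefixLength, line.length())), line));
// All lines sharing a prefix land in the same partition; how balanced the work is
// depends entirely on how the partitioner spreads those prefixes over the machines.
JavaPairRDD<String, Iterable<String>> grouped = keyed.groupByKey(new HashPartitioner(200));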

The dynamic load balancing in this case would be something like this: the mapper that has received the lines starting with 'a' might want to further split its lines into those starting with 'ab', 'ac', 'ad', ... and then send them to 10 other machines, which might decide to further split their work into more tasks.
I understand that standard Apache Spark does not have an answer to this out of the box, but I wonder if there is a way to achieve it nonetheless.

Kafka (i.e. the queue, as above) + Spark Streaming looks promising. Do you think I will be able to achieve this dynamic load balancing with these tools in a relatively straightforward way? Could you recommend other tools instead?
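
To make the idea concrete, here is a rough sketch of the feedback loop I have in mind (the topic name, the broker address and the 10-way split are all hypothetical placeholders; the streaming job would simply consume the same topic and call this for every task it finds too big):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TaskSplitter {
    // Hypothetical: a worker that finds its prefix-task too big re-enqueues
    // finer-grained sub-tasks on the same Kafka topic the streaming job reads.
    public static void splitTask(String taskPrefix, String topic, String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (char c = 'a'; c <= 'j'; c++) {
                // e.g. the task 'a' becomes the sub-tasks 'aa', 'ab', ..., 'aj'
                producer.send(new ProducerRecord<>(topic, taskPrefix + c));
            }
        }
    }
}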

Alexander
  • If you have 100 machines and 10 letters and processing by 2-letter prefix is acceptable, then you'd partition by 2-letter prefix from the beginning. A map-only task can achieve perfect data distribution. What we worry about is `by` operations, which won't be a problem in the described scenario. Once you compute patterns for a partition, cardinality should be >> 10 in the non-degenerate case. For other problems we have established patterns (salting with multi-step aggregations, for example). To me it sounds like you want Akka-style micromanaging, not Spark. – Alper t. Turker Jan 11 '18 at 13:49
  • I agree, finer-grained partitioning is certainly one way to go. Yet not always. For instance, in certain actual Big Data frequent itemset examples even partitioning by 3-letter prefix is too crude as just a few of the most frequent 3-letter prefixes might generate the majority of the answers. On the other hand, having 100,000 partitions (splitting by 5-letter prefix) does degrade performance. – Alexander Jan 11 '18 at 14:37
  • _having 100,000 partitions (splitting by 5-letter prefix) does degrade performance_ - you seem to have [the wrong idea of how a partitioner works](https://stackoverflow.com/q/31424396/6910411). The number of keys and the number of partitions are independent (the optimal number of partitions depends on, among other factors, the number of unique keys, but there is no fixed relationship). – Alper t. Turker Jan 12 '18 at 16:48
  • Well, Spark documentation recommends 2-3 partitions per CPU core, as each partition means a separate task with a related overhead, see e.g. [Spark documentation](https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism) and [Stack Overflow discussion](https://stackoverflow.com/questions/35800795/number-of-partitions-in-rdd-and-performance-in-spark) on this topic. I have also seen it myself when playing with Spark on a similar task. – Alexander Jan 13 '18 at 09:56
  • And the point is? This is a rule of thumb for managing your hardware. It has nothing to do with the relationship between the number of partitions and the number of unique keys. – Alper t. Turker Jan 13 '18 at 14:28
  • OK, I've got your point: 100,000 keys can yield any number of partitions, depending on how I write my partitioner. Thanks for pointing that out. If my partitioner could smartly partition those 100,000 keys into 200 partitions so that the amount of work in each partition is roughly equal, then OK, the problem is solved. But the actual problem is that in many cases I am unable to properly predict the amount of work in advance, so again I'll end up in a situation where 99 of my machines are idle and I am waiting for the most unlucky one. – Alexander Jan 13 '18 at 14:59
  • Exactly. But the problem actually goes further. Let's assume we have a hypothetical extension which allows you to split tasks. By the time your data actually reaches the executor, it is too late for that. With a typical workload the cost of scanning the input is comparable to the cost of the task (computationally intensive tasks are a different thing and we use different strategies), and the cost of moving data out of the executor (shuffle) is higher than the computation itself. Not to mention that once you see the data, you've already hit at least two important bottlenecks (network receive and disk read/write). – Alper t. Turker Jan 13 '18 at 15:10
  • So any solution would have to work much earlier; possibly once the shuffle is prepared, executors could exchange statistics for better balancing. It could be possible, but complex, and it would affect the core of the project. And since Spark is strongly shifting from general purpose / latency to SQL-like low latency / streaming, with optimizations targeting low-latency batches and streaming, it is not going to happen. – Alper t. Turker Jan 13 '18 at 15:12
  • The typical cost of scanning is not at all comparable to the cost of computation in most frequent itemset / frequent sequence-related tasks. The cost of scanning the dataset is seconds; the cost of the analysis might be days. In my experience with these tasks the cost of shuffle might indeed be large, but only if the number of keys is very large, not because of the data movement (as the data reading time is negligible compared to the data processing time in my case). – Alexander Jan 13 '18 at 15:20
  • You miss the point that "scanning" in the case of a shuffle (otherwise data skew is unlikely) involves serialization, network transfer, deserialization and writing everything to disk. Moving data out of an executor requires building shuffle structures - which is the most common source of problems with the RDD API. And then of course the whole ser - trans - deser - data dump. If tasks are computationally expensive, then all these 3*cores rules are useless, and we prefer smaller tasks with an external shuffle service to improve fault tolerance and recovery time. – Alper t. Turker Jan 13 '18 at 15:26
  • In my case the cost of shuffle is not dominating if I am careful not to create too many keys. My problem is the different cost of processing each key and the difficulty of predicting the work for a given key (i.e. the difficulty of writing a good partitioner). – Alexander Jan 13 '18 at 15:44
  • A good partitioner can be defined dynamically. RangePartitioner is a trivial example, but you can use way more sophisticated tools with different probabilistic data structures. Also please keep in mind that the cost of starting a task and the cost of shuffle are two different things. Wow, this got pretty long... To summarize - Spark is not designed for dynamic workloads. It can be adjusted to handle this in specific cases, especially if you have control over the surrounding architecture, but you can easily find tools which do the same out of the box. – Alper t. Turker Jan 13 '18 at 15:53
  • Thanks. I guess given your knowledge your 'official' stack overflow answer would be beneficial to everyone, especially if you provide an example of the tools you have just mentioned. I thought Spark Streaming might be such a tool. – Alexander Jan 13 '18 at 16:04

3 Answers

3

Now suppose I have 100 machines (but only 10 letters). The mapper that has received the lines starting with 'a' might want to further split its lines - to those starting with 'ab', 'ac', 'ad' etc. and then send them to 10 other machines.

It is just not how Spark works. A "mapper" (task) is mostly ignorant of the distributed context. At this level there is no access to the SparkContext and we no longer have RDDs, just the input as a local Iterator and the code to be executed on it. It can neither start nor create new tasks.

At the same time, your problem definition is artificial. To find frequent patterns you have to aggregate data, therefore you need a shuffle. At that point, records corresponding to a given pattern have to be shuffled to the same machine. Ensuring that the data is properly distributed is the job of the Partitioner, and there is really no place for "splits" here.
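
To illustrate, this is roughly all a task ever sees (a minimal sketch against the Spark 2.x Java API; `rdd` is some existing JavaRDD<String> and the per-record work is a placeholder):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> result = rdd.mapPartitions(it -> {
    // 'it' is a plain local Iterator<String>. There is no SparkContext here,
    // no view of other partitions, and no way to submit new tasks from this code.
    List<String> out = new ArrayList<>();
    while (it.hasNext()) {
        out.add(it.next().toUpperCase());  // stand-in for the real per-record work
    }
    return out.iterator();
});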

  • Why is my problem definition artificial? I think the problem is clear: 99 machines might wait for a single mapper working hard on some single task. I agree that there is no straightforward solution, but it does not mean a solution does not exist. For instance, one possible solution is that a mapper adds the 'ab', 'ac', etc. prefixes as tasks to the input of Spark Streaming. – Alexander Jan 10 '18 at 14:03
2

Spark's own dynamic allocation can, to some limited extent, emulate what you want; however, if you need a detailed, high-performance approach with low-level control, then Spark is not for you. For starters, you won't be able to dynamically split tasks - you can only adjust the overall resources assigned to the application.

You should rather consider a lower-level scheduler and implement your own solution from scratch.
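
For reference, dynamic allocation is purely a configuration matter, and it scales whole executors, not the tasks within a job (the numbers below are arbitrary examples):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")      // external shuffle service is required
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "100");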

  • Thanks for your answer. Could you add a link to docs or another reference so we can follow up on `you won't be able to dynamically split tasks - you can only adjust overall resources assigned to the application` – SherylHohman Jan 07 '18 at 17:38
  • Spark Streaming could probably do it as well. I mean even though Spark does not allow doing it in some straightforward way there still might be some less obvious way to achieve it. – Alexander Jan 07 '18 at 19:47
2

To achieve your requirement, you can just repartition your data from two partitions to whatever number of partitions you want.

See https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#repartition-int-
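
For example (a minimal sketch; assume `sc` is your JavaSparkContext and the input initially comes in as 2 partitions):

import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> lines = sc.textFile("input");     // say this gives 2 partitions
JavaRDD<String> spread = lines.repartition(200);  // full shuffle into 200 partitions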

BTW, Spark Streaming is unrelated to your problem.

Note that the level of parallelism does not depend only on the partitioning of the dataset, but also on your job/algorithm.

petertc
  • Spark Streaming might be relevant in the following way: my current task processor will identify new tasks and then feed them as completely new task input to Spark Streaming (or, for instance, to Kafka from which it will go to Spark Streaming). – Alexander Jan 08 '18 at 15:44