I know that in Spark I can split my computation across multiple partitions. If, say, I can split my input RDD into 1000 partitions and I have 100 machines, Spark will split the computation into 1000 tasks and dynamically allocate them to my 100 machines in some smart way.
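(A quick sketch of what I mean, assuming 'sc' is a JavaSparkContext and the path is just a placeholder: the task count per stage simply follows the partition count.)

// Sketch, assuming 'sc' is a JavaSparkContext: ask for 1000 partitions up front,
// so each stage runs up to 1000 tasks that Spark schedules across the 100 machines.
JavaRDD<String> input = sc.textFile("hdfs://.../input", 1000);  // minPartitions hint
// or, for an RDD that already exists:
JavaRDD<String> reshuffled = input.repartition(1000);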
Now suppose I can initially split my data into only 2 partitions, but I still have 100 machines. Naturally, 98 of my machines will be idle. But as I process each task, I could probably split it into sub-tasks that might potentially be executed on different machines. This can easily be achieved in plain Java with a queue, but I am not sure what the best way to attack it in Apache Spark is.
Consider the following Java pseudo-code:
BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
queue.add(myInitialTask);
...
// On each worker thread:
while (!queue.isEmpty()) {
    Task nextTask = queue.take();  // blocks until a task is available
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.addAll(newTasks);        // freshly split sub-tasks go back onto the shared queue
}
The above Java code will keep all my 100 threads busy, assuming the method 'process_task_and_split_to_sub_tasks()' can split any large task into a number of smaller ones.
Is there a way to achieve the same in Spark, maybe in combination with other tools?
Update: It has been correctly pointed out that one way to attack this is to
- Generate finer-grained keys, and
- Then use a smart Partitioner that assigns those keys to partitions (see the sketch below).
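For reference, a custom Partitioner in Spark's Java API only needs two methods. Here is a minimal sketch (the class name is mine); it just spreads keys by hash, and the "smart" part would be to replace the hash with whatever per-key work estimate is available:

import org.apache.spark.Partitioner;

// Minimal sketch; a smarter version would replace the plain hash below
// with an estimate of the work behind each key.
public class FinerGrainedPartitioner extends Partitioner {
    private final int numPartitions;

    public FinerGrainedPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        // Non-negative partition index derived from the key's hash.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

It would then be plugged in with something like pairRdd.partitionBy(new FinerGrainedPartitioner(100)).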
I guess this is the 'classic' way to attack this problem, but it requires me to be able to correctly estimate the amount of work per key in order to partition properly. What if I don't have a good way to know the amount of work per key in advance? I might end up with very unfortunate partitioning, in which most of my machines stay idle waiting for a few unfortunate ones.
Example: Let's take simplified frequent itemset mining as an example.
Suppose my file contains lines made up of the letters a to j (10 letters); in each line the letters are sorted alphabetically and appear without repetition, e.g. 'abcf'. The task is to find all letter combinations that occur in at least 50% of all lines. E.g. if enough lines match the pattern 'ab.*f', then the output will contain {'a', 'b', 'f', 'ab', 'af', 'bf', 'abf'}.
One way to implement this is to send all lines starting with 'a' to one mapper (machine), all lines starting with 'b' to another, etc. Incidentally, this is how frequent pattern mining is implemented in Spark. Now suppose I have 100 machines (but only 10 letters): 90 of my machines will stay idle.
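In Spark's Java API that keying step would look roughly like this (a sketch, assuming 'lines' is a JavaRDD<String> and Tuple2 is scala.Tuple2):

// Sketch: key every line by its first letter, so grouping routes all 'a...'
// lines to one task, all 'b...' lines to another, and so on
// (at most 10 non-empty groups for 10 letters).
JavaPairRDD<String, Iterable<String>> byFirstLetter =
    lines.mapToPair(line -> new Tuple2<>(line.substring(0, 1), line))
         .groupByKey(10);   // 10 partitions, one per possible first letter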
With the finer-grained keys solution I could generate 10,000 4-letter prefixes and then somehow partition them based on the estimated work per prefix. But my partitioning can be badly wrong: if the majority of the lines start with 'abcd', then all the work will be done by the machine responsible for that prefix (and probably for other prefixes in addition to it), again yielding a situation in which most of my machines stay idle waiting for one unfortunate machine.
The dynamic load-balancing in this case would be something like this: the mapper that has received the lines starting with 'a' might want to further split its lines into those starting with 'ab', 'ac', 'ad', ... and then send them to 10 other machines, which might in turn decide to split their work into even more tasks.
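To make that concrete, here is a hedged driver-side sketch of the kind of loop I mean (this is not something Spark does by itself): the driver keeps its own queue of prefixes, and any prefix whose group turns out too big is split into longer prefixes and pushed back. 'lines', 'threshold' and 'processGroup(...)' are assumed to exist, and lines equal to the prefix itself are glossed over:

// Driver-side sketch; 'lines' is a cached JavaRDD<String>, 'threshold' and
// processGroup(...) are assumed. Oversized prefixes are split further and re-queued.
Deque<String> prefixes = new ArrayDeque<>(
        Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j"));

while (!prefixes.isEmpty()) {
    String prefix = prefixes.poll();
    JavaRDD<String> group = lines.filter(line -> line.startsWith(prefix));
    if (group.count() > threshold) {
        // Too much work for one task: split into finer prefixes
        // (letters are sorted, so only letters after the last one can follow).
        for (char c = (char) (prefix.charAt(prefix.length() - 1) + 1); c <= 'j'; c++) {
            prefixes.push(prefix + c);
        }
    } else {
        processGroup(group);   // assumed per-group mining step
    }
}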
I understand that standard Apache Spark does not offer this out of the box, but I wonder whether there is a way to achieve it nonetheless.
Kafka (i.e. the queue, as above) + Spark Streaming looks promising. Do you think I would be able to achieve this dynamic load-balancing with these tools in a relatively straightforward way? Could you recommend other tools instead?
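To show what I picture on the Kafka side, here is a sketch using the plain kafka-clients producer API; the topic name, broker address and the idea of publishing prefixes as tasks are all mine:

// Sketch: a worker that decides to split its work publishes the new sub-task
// prefixes to a "tasks" topic, so Kafka plays the role of the BlockingQueue
// above and any idle consumer can pick them up.
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");   // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (String subTaskPrefix : newPrefixes) {    // e.g. "ab", "ac", ...
        producer.send(new ProducerRecord<>("tasks", subTaskPrefix));
    }
}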