
I just came across the example below on parallelism and have some related questions:

  1. Does setParallelism(5) set the parallelism to 5 only for sum, or for both flatMap and sum?

  2. Is it possible to set a different parallelism for different operators, such as flatMap and sum? For example, parallelism 5 for sum and 10 for flatMap.

  3. As I understand it, keyBy partitions the DataStream into logical streams/partitions based on the keys. Suppose there are 10,000 different key values, so there are 10,000 different partitions. How many threads would handle those 10,000 partitions? Just 5 threads? And what happens if we don't call setParallelism(5)?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Till Rohrmann
YuFeng Shen

2 Answers


Calling setParallelism on an operator changes the parallelism of that specific operator only. Consequently, in your example, only the window operator (the sum) will be executed with a parallelism of 5, and the preceding flatMap operator will run with the default parallelism.

Hence, you can set a different parallelism for each operator, e.g. `.flatMap(new LineSplitter()).setParallelism(10)` together with `.sum(1).setParallelism(5)`. However, be aware that operators with different parallelism cannot be chained and require a rebalance (similar to a shuffle) between them.

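If you want to set the parallelism for all operators, you have to do it via the StreamExecutionEnvironment#setParallelism API call (ExecutionEnvironment#setParallelism in the DataSet API). Operators without an explicit setting then use that value.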
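
As a rough illustration of that rule, here is a toy model in plain Java. It is not Flink code, and the class and method names are hypothetical: each operator uses its own setParallelism value if one was given and falls back to the environment default otherwise. The settings mirror question 2 (10 for flatMap, 5 for sum) plus an assumed environment default of 3.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Toy model (not Flink code): an operator-level setting wins, else the environment default. */
final class ParallelismResolution {

    /** Explicit operator setting if present, otherwise the environment default. */
    static int effectiveParallelism(OptionalInt operatorSetting, int envDefault) {
        return operatorSetting.orElse(envDefault);
    }

    /** Resolves every operator of a hypothetical pipeline, preserving insertion order. */
    static Map<String, Integer> resolve(Map<String, OptionalInt> settings, int envDefault) {
        Map<String, Integer> result = new LinkedHashMap<>();
        settings.forEach((name, setting) -> result.put(name, effectiveParallelism(setting, envDefault)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, OptionalInt> pipeline = new LinkedHashMap<>();
        pipeline.put("flatMap", OptionalInt.of(10));   // like .flatMap(...).setParallelism(10)
        pipeline.put("window-sum", OptionalInt.of(5)); // like .sum(1).setParallelism(5)
        pipeline.put("print", OptionalInt.empty());    // no explicit setting
        // envDefault = 3 plays the role of env.setParallelism(3)
        System.out.println(resolve(pipeline, 3));      // {flatMap=10, window-sum=5, print=3}
    }
}
```

In this model, dropping the explicit value for window-sum would make it fall back to the environment default, which is the situation asked about at the end of question 3.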

The keyBy operation partitions the input stream into as many partitions as there are parallel instances of the downstream operator. This ensures that all elements with the same key end up in the same partition. So in your example, where the parallelism is set to 5, you would end up with 5 partitions, processed by 5 parallel instances. Each partition can hold elements with many different keys.
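
To make the answer to question 3 concrete, the following plain-Java sketch models how 10,000 keys are spread over 5 parallel instances. It assumes Flink's key-group scheme as I understand it: each key is hashed into one of maxParallelism key groups, and key group g goes to subtask g * parallelism / maxParallelism (modeled on Flink's KeyGroupRangeAssignment). It also assumes a max parallelism of 128 and uses a stand-in hash function that is not Flink's, so individual key placements would differ in a real job. The class name and the keys are hypothetical.

```java
import java.util.Arrays;

/**
 * Simplified model of how keyBy spreads keys over the parallel instances of the
 * next operator: key -> key group -> subtask. The hash is a stand-in (NOT Flink's
 * murmur hash), so a real job would place individual keys differently.
 */
final class KeyGroupSketch {

    /** Stand-in bit mixer (hypothetical); the result is non-negative. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff;
    }

    /** Key -> key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Key group -> subtask index in [0, parallelism). */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** How many distinct keys each subtask is responsible for. */
    static int[] keysPerSubtask(int numKeys, int parallelism, int maxParallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < numKeys; i++) {
            String key = "word-" + i; // hypothetical keys
            int group = keyGroupFor(key, maxParallelism);
            counts[subtaskFor(group, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    /** How many key groups each subtask owns. */
    static int[] keyGroupsPerSubtask(int parallelism, int maxParallelism) {
        int[] counts = new int[parallelism];
        for (int g = 0; g < maxParallelism; g++) {
            counts[subtaskFor(g, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10,000 distinct keys, window parallelism 5, assumed max parallelism 128
        System.out.println("key groups per subtask: " + Arrays.toString(keyGroupsPerSubtask(5, 128)));
        System.out.println("keys per subtask:       " + Arrays.toString(keysPerSubtask(10_000, 5, 128)));
    }
}
```

Under these assumptions the 128 key groups split 26/26/25/26/25 across the 5 subtasks. So 5 parallel instances handle all 10,000 keys, roughly 2,000 keys each. Without setParallelism(5), the same split would simply use the default parallelism instead of 5.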

Till Rohrmann
  • If I set the parallelism to 5, I end up with 5 partitions. If I set the max parallelism to 5, how many partitions would there be? Could there be more than 5 partitions? Also, why do we have both setParallelism and setMaxParallelism? When should we use setParallelism and when setMaxParallelism? – YuFeng Shen Jun 08 '17 at 14:36
  • The parallelism defines the number of parallel instances of an operator. The max parallelism defines the maximum parallelism you can scale your job up to. This is important if you take a savepoint and use it to restart your job with a higher parallelism. – Till Rohrmann Jun 08 '17 at 15:42
  • For my example above, suppose I remove the .keyBy(0) but keep .setParallelism(5). The result is that there are still 5 partitions; the difference is that elements with the same key can end up in different partitions. So what is the parallelism of the window operator sum? Does it become 1 even with setParallelism(5)? According to the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html), the parallelism is 1 for non-keyed windows. Or is it still 5, because setParallelism(5) overrides the parallelism of 1 for non-keyed windows? – YuFeng Shen Jun 08 '17 at 16:26
  • For non-keyed windows the parallelism is enforced to be `1` even if you try to set it to something else. The reason is that you cannot distribute an unkeyed window. – Till Rohrmann Jun 12 '17 at 12:34
  • Great explanation @TillRohrmann. I was about to post a question about the difference between `keyBy()` and `setParallelism()`, and what happens if I don't use `keyBy` but still set the `parallelism` in the code, and you made my day. – whatsinthename Dec 25 '21 at 16:03
  • Happy that I could help you @whatsinthename :-) – Till Rohrmann Dec 27 '21 at 14:27
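
The role of max parallelism in rescaling can be sketched in plain Java as well. This model uses hypothetical names and is not Flink code. It assumes that keys are hashed into a fixed number of key groups equal to the max parallelism, and that each subtask owns a contiguous range of them (key group g goes to subtask g * parallelism / maxParallelism, modeled on Flink's KeyGroupRangeAssignment). The value maxParallelism = 128 is also an assumption. Rescaling from 5 to 10 then only moves whole key groups between subtasks. A parallelism above 128 is rejected because there are no more key groups to hand out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch (not Flink code) of why max parallelism bounds rescaling: the number of
 * key groups is fixed, and each subtask owns a contiguous range of them.
 */
final class RescaleSketch {

    /** Key group -> subtask, valid only for 1 <= parallelism <= maxParallelism. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                "invalid parallelism " + parallelism + " for max parallelism " + maxParallelism);
        }
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns {first, last} key group owned by each subtask, in subtask order. */
    static List<int[]> keyGroupRanges(int maxParallelism, int parallelism) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            ranges.add(new int[] {Integer.MAX_VALUE, -1});
        }
        for (int group = 0; group < maxParallelism; group++) {
            int[] r = ranges.get(subtaskFor(group, maxParallelism, parallelism));
            r[0] = Math.min(r[0], group);
            r[1] = Math.max(r[1], group);
        }
        return ranges;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        for (int p : new int[] {5, 10}) {
            StringBuilder line = new StringBuilder("parallelism " + p + ":");
            for (int[] r : keyGroupRanges(maxParallelism, p)) {
                line.append(" [").append(r[0]).append("..").append(r[1]).append("]");
            }
            System.out.println(line);
        }
        // Scaling beyond the max parallelism is impossible: only 128 key groups exist.
        try {
            keyGroupRanges(maxParallelism, 200);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, parallelism 5 yields the ranges [0..25] [26..51] [52..76] [77..102] [103..127]. Parallelism 10 yields [0..12] [13..25] [26..38] [39..51] [52..63] [64..76] [77..89] [90..102] [103..115] [116..127]. Each old range splits cleanly into two new ones, which is what lets state restored from a savepoint be redistributed without rehashing keys.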

Execution Environment Level

As mentioned in the Flink documentation, Flink programs are executed in the context of an execution environment. An execution environment defines a default parallelism for all operators, data sources, and data sinks it executes. The execution environment parallelism can be overridden by explicitly configuring the parallelism of an operator.

The default parallelism of an execution environment can be specified by calling the setParallelism() method. To execute all operators, data sources, and data sinks with a parallelism of 3, set the default parallelism of the execution environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
Ru Chern Chong
Qi Wei