
I'm new to Spark and struggling with some concepts.

How does parallelism work when using Spark Structured Streaming with a Kafka source?

Consider the following code snippet:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();   

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
      ...
      /* process each key and its values */
      // for each value:
      //   if the value is valid, save the key/value result to HDFS
      ... 
)

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

//await
query.awaitTermination();

I've read that parallelism is related to the number of data partitions, and that the number of partitions for a Dataset is based on the spark.sql.shuffle.partitions parameter.

  1. For every batch (pull from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions? For example, with spark.sql.shuffle.partitions=5 and Batch1=100 rows, will we end up with 5 partitions of 20 rows each?

  2. Considering the code snippet provided, do we still benefit from Spark parallelism, given the groupByKey followed by the mapGroups/mapGroupsWithState functions?

UPDATE:

Inside gDataset.mapGroupsWithState is where I process each key and its values and store the result in HDFS. So the output sink is only used to print some stats to the console.

zero323
Kleyson Rios

1 Answer


For every batch (pull from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions?

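They will be divided once they reach groupByKey, which is a shuffle boundary. When the data is first retrieved, the number of partitions is equal to the number of Kafka topic partitions. The shuffle then distributes rows by key, so the resulting partitions are not necessarily equal in size.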
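To make the "100 rows, 5 partitions" case concrete, here is a minimal, dependency-free sketch of how a key-based shuffle assigns rows to partitions. It is a simplification, not Spark's implementation: the class and method names are hypothetical, the "camera-N" keys are made up, and `String.hashCode()` stands in for the hash function Spark applies to the grouping key.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a key-based shuffle. Assumption: Spark uses its own
// hash function on the grouping key; String.hashCode() is used here only
// to keep the sketch free of dependencies.
class ShufflePartitionSketch {

    // Target partition for a key: non-negative hash modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many rows land in each partition after the shuffle.
    static int[] countRowsPerPartition(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int shufflePartitions = 5; // stands in for spark.sql.shuffle.partitions

        // 100 rows spread over 3 hypothetical keys.
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            keys.add("camera-" + (i % 3));
        }

        int[] counts = countRowsPerPartition(keys, shufflePartitions);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " rows");
        }
    }
}
```

Because every row with the same key goes to the same partition, 100 rows over 3 distinct keys can fill at most 3 of the 5 partitions, and a batch with a single key keeps only one post-shuffle task busy.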

Considering the code snippet provided, do we still benefit from Spark parallelism, given the groupByKey followed by the mapGroups/mapGroupsWithState functions?

Generally yes, but it also depends on how you set up your Kafka topic. Although it is not visible from the code, Spark internally splits each stage into smaller tasks, one per partition, and distributes them among the available executors in the cluster. If your Kafka topic has only 1 partition, then prior to groupByKey your internal stream will contain a single partition, which won't be parallelized but processed by a single task on one executor. As long as your Kafka partition count is greater than 1, that part of the processing will run in parallel. After the shuffle boundary, Spark repartitions the data into the number of partitions specified by spark.sql.shuffle.partitions.
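The effect of the two partition counts on parallelism can be sketched with some simple arithmetic. This is an illustrative model only: the class and method names are hypothetical, and it assumes one task per partition and one core per task (the default for spark.task.cpus), ignoring skew and empty partitions.

```java
// Rough model of per-stage parallelism. Assumptions: one task per partition,
// one core per task, and evenly sized tasks.
class StageParallelismSketch {

    // Tasks that can run at the same time in a stage.
    static int concurrentTasks(int numPartitions, int availableCores) {
        return Math.min(numPartitions, availableCores);
    }

    // Number of scheduling "waves" needed to run every task of a stage.
    static int waves(int numPartitions, int availableCores) {
        return (numPartitions + availableCores - 1) / availableCores; // ceiling division
    }

    public static void main(String[] args) {
        int kafkaPartitions = 1;   // partitions of the source topic
        int shufflePartitions = 5; // stands in for spark.sql.shuffle.partitions
        int cores = 4;             // hypothetical total executor cores

        System.out.println("read stage, concurrent tasks: "
                + concurrentTasks(kafkaPartitions, cores));
        System.out.println("post-shuffle stage, concurrent tasks: "
                + concurrentTasks(shufflePartitions, cores));
        System.out.println("post-shuffle stage, waves: "
                + waves(shufflePartitions, cores));
    }
}
```

Under these assumptions, a single-partition topic leaves the read stage with one running task no matter how many cores are available, while the post-shuffle stage is bounded by the smaller of the shuffle partition count and the core count.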

Yuval Itzchakov
  • Interesting ... My topic is set up with just a single partition. In the Spark UI, I can see that, regardless of the number of spark.sql.shuffle.partitions/tasks, just a single task is doing all the work, and the others are completely idle. Would it then be correct to say that the number of input partitions (topic partitions) should be greater than the number of spark.sql.shuffle.partitions? – Kleyson Rios Jan 13 '18 at 14:42
  • @KleysonRios The number of topic partitions in Kafka needs to be configured appropriately for your throughput needs. Later, in Spark, if you use `mapGroupsWithState`, it will cause a repartition according to your key anyway. There is no need for one to be greater than the other; you just need to configure Spark according to the throughput you expect and the resources at your disposal. – Yuval Itzchakov Jan 13 '18 at 14:45
  • @YuvalItzchakov It would be great if you could answer the question I recently posted here https://stackoverflow.com/questions/52474516/what-does-spark-sql-shuffle-partitions-exactly-refers-to, because I think I found my answer in this post, but it would be best if you could lay it out using the terms I use in my post. They are more formal Spark terms, and I am sure a lot of people would really benefit from it. For instance, I think people sometimes conflate words, which does not help in understanding everything. What is a shuffle boundary? Shuffling is repartitioning. So saying .... – MaatDeamon Sep 24 '18 at 07:56
  • Saying that repartitioning happens after the shuffle boundary sounds inaccurate to me. A wide transformation is one that includes a shuffle operation and produces partitions that are the result of shuffling data. In my question in the comment, I just did not know what spark.sql.shuffle.partitions was for and how it fit the model. I think the name is not intuitive, but thanks to you I kind of understand it now. – MaatDeamon Sep 24 '18 at 08:00