1

My Kafka producers are distributing the messages into topic partitions based on a given key.

So, in the Spark side I already have the messages that need be processed together in the same partition.

Now, I need to do a groupByKey to have in each partition the values aggregated in a list by the keys, but not need merge the partitions because there is not chance to have a given key in more than one partition.

How could I do this groupByKey only at partition level ?

|topic-partition1| ---> |spark-partition1| -- groupByKey --> |spark-partition1.1| -- mapGroupsWithState --> ...
|topic-partition2| ---> |spark-partition2| -- groupByKey --> |spark-partition2.1| -- mapGroupsWithState --> ...
|topic-partition3| ---> |spark-partition3| -- groupByKey --> |spark-partition3.1| -- mapGroupsWithState --> ...
zero323
  • 322,348
  • 103
  • 959
  • 935
Kleyson Rios
  • 2,597
  • 5
  • 40
  • 65

1 Answers1

1

If you know all events are going to come in a given partition, you can use DataSet.mapPartitions on the dataset:

val dataSet: DataSet[(String, String)] = ???
dataSet.mapPartitions { iter =>
  val res: Map[String, List[(String, String)] =
    iter.toList.groupBy { case (key, _) => key }

  // Do additional processing on res, which is now grouped by each key
  // present in the partition.
}

Otherwise, if you need mapGroupsWithState, there is on way to avoid using groupByKey as you need a KeyValueGroupedDataset[K, V].

If you're concerned with performance, don't be unless you've found this a bottleneck while profiling.

zero323
  • 322,348
  • 103
  • 959
  • 935
Yuval Itzchakov
  • 146,575
  • 32
  • 257
  • 321
  • Yes @Yuval , they are coming in the same partition, but I need to use the mapGroupsWithState function, and I didn't find a way to call the function without calling the groupByKey. – Kleyson Rios Jan 23 '18 at 17:23
  • @KleysonRios That's right, `mapGroupsWithState` is defined on a key valued data set. You can't use it without. – Yuval Itzchakov Jan 23 '18 at 17:24
  • this means that I don't have options to achieve that ? Using the groupByKey.mapGroupsWithState, even shuffling, I'm not getting multiple partitions with data. May you check my follow up answer on this [topic](https://stackoverflow.com/questions/48239970/understanding-spark-structured-streaming-parallelism) where you helped me before ? – Kleyson Rios Jan 23 '18 at 17:31
  • @KleysonRios You cannot use `mapGroupsWithState` without grouping by key first. I'm not sure what you mean by "I'm not getting multiple partitions with data", can you elaborate on that? – Yuval Itzchakov Jan 23 '18 at 17:36
  • Before the groupByKey.mapGroupsWithState I had two partitions, and later I ended up with two partitions (I guess) but all the data was put in the same partition. [In this thread](https://stackoverflow.com/questions/48239970/understanding-spark-structured-streaming-parallelism) I put some images from the Spark UI. Instead of having two tasks running in parallel I had just one. I've read something about the partitioner algorithm but didnt find too much about this subject. – Kleyson Rios Jan 23 '18 at 17:41
  • @KleysonRios That's weird, the default partitioner is a hash partitioner. Are your keys skewed by any chance? What are you using as key? – Yuval Itzchakov Jan 23 '18 at 17:45
  • I tried two different scenarios. One where the keys were generated using ("group-" + UUID.randomUUID()) and another scenario where the messages was assigned randomly to the either "group-1" or "group-2". How can I implement and force a new partitioner to be used during the groupByKey.mapGroupsWithState to make sure that the problem might be/not be the default partitioner ? – Kleyson Rios Jan 23 '18 at 18:23
  • @Kleyson Why do you say that all the data was assigned to a single partition? – Yuval Itzchakov Jan 23 '18 at 19:02
  • [On the first stage](https://i.stack.imgur.com/LStRc.png) we can see two initial partitions, that match the kafka partitions, where one has 25 rows and the other one has 14 rows (39 total). [On the second stage](https://i.stack.imgur.com/ze6sG.png) all the 39 rows was assigned to the task #3. Looks like that two partitions was created, see task #1, but no rows assigned. I am supposing that the task #1 was created because there is time for the GC. – Kleyson Rios Jan 23 '18 at 20:06
  • Kafka producer uses [MurmurHash2 algorithm](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L69) as partitioner. Is this [code the default spark HashPartitioner implementation](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala) ? How can I implement and set a new partitioner ? I could try to use the same Murmur2 algorithm and check if the number of partitions and distribution of the messages would match with the kafka producer. – Kleyson Rios Jan 23 '18 at 20:20
  • Thanks Yuval, I found the error in my code. I was returning a wrong information as the key. – Kleyson Rios Jan 23 '18 at 21:37
  • @KleysonRios Are you able to achieve this functionality with any other framework as similar functionality is what I am looking for – vipin Jun 24 '20 at 07:33
  • @vipin Not working with this for a long time. The project didn't finish. So, unfortunately I dont have any answer for you. – Kleyson Rios Jun 24 '20 at 13:45
  • @vipin You can easily do this with Flink which allows any operator to contain state. – Yuval Itzchakov Jun 24 '20 at 17:10