
Scenario:
Kafka -> Spark Streaming

Logic in each Spark Streaming microbatch (30 seconds):
Read JSON -> Parse JSON -> Send to Kafka

My streaming job reads from around 1,000 Kafka topics with around 10K Kafka partitions, at a throughput of around 5 million events/s.
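For context, a simplified sketch of the job, using the DStream/KafkaUtils API; the broker address, group id, and topic names are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-json-relay"), Seconds(30)) // 30-second microbatches

// Placeholder consumer settings
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker-1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "json-relay"
)

// ~1000 topics / ~10K Kafka partitions -> ~10K RDD partitions per batch (1:1 mapping)
val topics = Seq("topic-0001", "topic-0002") // placeholder topic list

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // parse each JSON value and forward it to the output topic
    records.foreach { record =>
      // parse(record.value()); producer.send(...)  -- details omitted
    }
  }
}

ssc.start()
ssc.awaitTermination()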

The issue comes from uneven traffic load between Kafka partitions: some partitions have roughly 50 times the throughput of the smaller ones. This results in skewed RDD partitions (KafkaUtils creates a 1:1 mapping from Kafka partitions to Spark partitions) and really hurts overall performance, because in each microbatch most executors end up waiting for the one with the largest load to finish. I can see this in the Spark UI: at some point in each microbatch only a few executors have "ACTIVE" tasks while all the others have finished and are waiting, and the task time distribution shows a MAX of 2.5 minutes but a MEDIAN of only 20 seconds.

Notes:

  1. This is Spark Streaming (DStreams), not Structured Streaming.
  2. I am aware of the post Spark - repartition() vs coalesce(); I'm not asking about the difference between repartition() and coalesce(). The load is consistent, so autoscaling and dynamic allocation are not relevant either.

What I tried:

  1. coalesce() helps a little but does not remove the skew, sometimes even makes it worse, and comes with a higher risk of OOM on the executors.
  2. repartition() does remove the skew, but a full shuffle is simply too expensive at this scale; the penalty is not paid back in per-batch execution time. Increasing the batch duration does not help either, because a longer batch means more data per microbatch and therefore more work to shuffle. (Roughly how I applied both is sketched below.)
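For illustration, a sketch of where I applied them, reusing the stream from the sketch above; numOutputPartitions is a placeholder value:

// numOutputPartitions is a placeholder tuning value
val numOutputPartitions = 2000

val rebalanced = stream.transform { rdd =>
  // extract the JSON payloads first (ConsumerRecord itself is not serializable, so it
  // cannot be shuffled directly), then redistribute them across executors.
  // repartition() shuffles every record (~150M events per 30s batch at 5M events/s);
  // coalesce() avoids the shuffle but only merges partitions, so the skew (and OOM risk) remains.
  rdd.map(_.value()).repartition(numOutputPartitions) // or: .coalesce(numOutputPartitions)
}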

How do I distribute the workload more evenly across Spark executors, so that resources are used more efficiently and performance improves?

  • how is the skewness occurring? what are the sources? can you influence how the feed is put into different partitions? – thebluephantom May 01 '20 at 09:31
  • @thebluephantom the skewness comes from the Kafka topics: the difference in data volume between topics is very large (around 50 times), which results in skewed Spark partitions (because Spark creates one RDD partition for each Kafka partition). – dunlu_98k May 01 '20 at 10:18
  • Yes I know that, but thanks for reminding me. – thebluephantom May 01 '20 at 10:22

2 Answers


I have the same issue. You can try the minPartitions parameter, available from Spark 2.4.7.

A few things are important to highlight:

  • By default, one Kafka partition is mapped to one Spark partition, or a few Kafka partitions are coalesced into one Spark partition.
  • The Kafka DataFrame has start and end offset boundaries per partition.
  • The maxOffsetsPerTrigger option defines the number of messages read from Kafka per trigger.
  • Since Spark 2.4.7 the minPartitions parameter is also supported; it can split one Kafka partition into multiple Spark partitions based on the offset range. By default it makes a best effort to split each Kafka partition (offset range) evenly.

So using minPartitions and maxOffsetsPerTrigger you can pre-calculate a good number of partitions.

.option("minPartitions", partitionsNumberLoadedFromKafkaAdminAPI * splitPartitionFactor)
.option("maxOffsetsPerTrigger", maxEventsPerPartition * partitionsNumber)

maxEventsPerPartition and splitPartitionFactor are defined in config.
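Putting it together, a rough end-to-end sketch, assuming the Structured Streaming Kafka source; the broker address, topic pattern, and all numeric values are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-minPartitions-example").getOrCreate()

// Placeholder values: in practice partitionsNumber comes from the Kafka admin API,
// and the two factors come from application config.
val partitionsNumber: Long      = 10000L // total Kafka partitions
val splitPartitionFactor: Long  = 4L     // split each Kafka partition into ~4 Spark partitions
val maxEventsPerPartition: Long = 20000L // cap on events per Kafka partition per trigger

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092") // placeholder broker
  .option("subscribePattern", "topic-.*")             // placeholder topic pattern
  .option("minPartitions", partitionsNumber * splitPartitionFactor)
  .option("maxOffsetsPerTrigger", maxEventsPerPartition * partitionsNumber)
  .load()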

In my case I sometimes have data spikes and my message sizes can vary a lot, so I have implemented my own streaming Source, which can split Kafka partitions by exact record size and even coalesce a few Kafka partitions onto one Spark partition.

Grigoriev Nick

Actually you have provided your own answer.

Do not have one Streaming Job reading from 1000 topics. Put those with the biggest load into separate Streaming Job(s). Reconfigure, it's that simple. Load balancing, queuing theory.

Stragglers are an issue in Spark in general, although here the straggler takes on a slightly different form.
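A rough sketch of that split, assuming the DStream direct-stream API; the topic names and grouping are hypothetical, and allTopics, ssc and kafkaParams stand for the question's existing setup:

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical grouping: the few high-volume topics get their own application,
// launched as a separate spark-submit, so their long tasks no longer stall the rest.
val heavyTopics = Seq("orders", "clicks")                    // placeholder names
val otherTopics = allTopics.filterNot(heavyTopics.contains)  // allTopics: the full ~1000-topic list

// Each application subscribes only to its own group, selected here via an env var;
// ssc and kafkaParams are the same direct-stream setup as in the question.
val myTopics = if (sys.env.get("TOPIC_GROUP").contains("heavy")) heavyTopics else otherTopics

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](myTopics, kafkaParams))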

thebluephantom
  • It's nice that this answer helps, but it solves a different problem. It will not help if I really need separate topics with different volumes, and I personally have the requirement to merge data from multiple topics that may have very different data volumes. So I think this answer addresses another question. – Grigoriev Nick Jul 03 '21 at 14:40