
I have a Spark Structured Streaming app (v2.3.2) which needs to read from a number of Kafka topics, do some relatively simple processing (mainly aggregations and a few joins), and publish the results to a number of other Kafka topics. So multiple streams are processed in the same app.

I was wondering whether it makes a difference from a resource point of view (memory, executors, threads, Kafka listeners, etc.) if I set up just 1 direct readStream which subscribes to multiple topics and then split the streams with selects, vs. 1 readStream per topic.

Something like

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...

vs.

t1df = spark.readStream.format("kafka").option("subscribe", "t1").load()
t2df = spark.readStream.format("kafka").option("subscribe", "t2").load()

Is either one more "efficient" than the other? I could not find any documentation about whether this makes a difference.

Thanks!

jammann

2 Answers


Each action requires a full lineage execution. You're better off separating this into three separate Kafka reads. Otherwise you'll read each topic N times, where N is the number of writes.
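
To make that concrete, here is a minimal PySpark sketch (broker address, output topics, and checkpoint paths are hypothetical) of the shared-source variant: each start() launches an independent streaming query whose lineage reaches all the way back to the Kafka source, so both queries consume all three topics even though each keeps only one.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lineage-demo").getOrCreate()

# A single source subscribed to all three topics.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "t1,t2,t3")
      .load())

# Each query only keeps one topic, but both read all three from Kafka.
q1 = (df.where("topic = 't1'").selectExpr("key", "value")
      .writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "t1_out")
      .option("checkpointLocation", "/checkpoints/q1")
      .start())

q2 = (df.where("topic = 't2'").selectExpr("key", "value")
      .writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "t2_out")
      .option("checkpointLocation", "/checkpoints/q2")
      .start())

spark.streams.awaitAnyTermination()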

I'd really recommend against this, but if you want to put all the topics into the same read, then do this:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()  // cache the micro-batch so each write below doesn't re-read Kafka
  batchDF.filter("topic = 't1'").write.format(...).save(...)  // location 1
  batchDF.filter("topic = 't2'").write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
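
For reference, a PySpark equivalent of the same pattern might look like the sketch below. Note that foreachBatch requires Spark 2.4+; the sink format, output paths, and the name streaming_df (standing in for the multi-topic stream from the question) are assumptions.

def write_batch(batch_df, batch_id):
    # Cache the micro-batch so the writes below don't each re-read Kafka.
    batch_df.persist()
    batch_df.filter("topic = 't1'").write.mode("append").parquet("/out/t1")  # hypothetical sink
    batch_df.filter("topic = 't2'").write.mode("append").parquet("/out/t2")  # hypothetical sink
    batch_df.unpersist()

query = (streaming_df.writeStream
         .foreachBatch(write_batch)
         .option("checkpointLocation", "/checkpoints/multi")  # hypothetical path
         .start())
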
Joe Widen
  • I think that first sentence of yours has greatly improved my intuition about what's happening in Spark Structured Streaming. When looking at it "from the end", it becomes clear that one should never include unnecessary dependencies in an execution's lineage. Thanks for that insight! – jammann May 01 '19 at 19:32
  • Each streaming writer has a checkpoint directory which keeps track of the read offsets in the readers. Kafka doesn't care how many times a message has been read. – Joe Widen Jul 03 '19 at 04:18
  • So per the 'action' comment: if I join the 2 streams together and write to 1 place, then maybe it is okay to read the 2 streams together, i.e. subscribe to multiple topics in one go. Correct? @JoeWiden – soMuchToLearnAndShare Sep 17 '20 at 19:31
  • I am interested to know what other considerations there are, such as the checkpointing locations, possibly. – thebluephantom Mar 04 '21 at 13:30

From a resource (memory and cores) point of view, there will be a difference if you run these as multiple streaming apps (multiple driver-executor sets) on the cluster.

For the first case you mentioned:

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...

This runs as a single app, with one driver and the 2 executors you have provided.

In the second case:

t1df = spark.readStream.format("kafka").option("subscribe", "t1").load()
t2df = spark.readStream.format("kafka").option("subscribe", "t2").load()

You can run these as two different apps: 2 drivers and 2 executors (1 executor each). The second case will require more memory and cores for the extra driver.

Ketan Kumbhar
  • I thought Spark apps can only have a single driver. Did you mean cores? – OneCricketeer May 01 '19 at 04:02
  • Yes, you are right, Spark apps can only have one driver. I meant to say that you can run the second code snippet as two different apps, one for topic t1 and one for t2. – Ketan Kumbhar May 01 '19 at 04:09
  • Thanks for these explanations. I explicitly did not want to split up the streams into 2 separate apps. The setup I have has quite a lot of different topics, but every single one has only a small amount of traffic (a few messages per second at most). Having something like 1 app per topic would cause massive amounts of management overhead on the cluster resources, and each single app would then only do very little work. So I'm really looking at optimizing within 1 single app. – jammann May 01 '19 at 19:37
  • What is the final recommendation then? @jammann - Can you share your solution? – Mata Jun 12 '19 at 08:12
  • I'm running a small number of apps (3-4), each one holding a small number of queries (5-10). Having 1 app per query was too much overhead for me. And having just 1 app with all queries did not work well for me; it tended to run out of memory. – jammann Jun 12 '19 at 12:50
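
For completeness, a minimal sketch of the several-queries-per-app setup jammann describes (topic names, broker address, and checkpoint paths are hypothetical): each topic gets its own independent source/sink pair within one SparkSession, and each query tracks its own offsets via a dedicated checkpoint directory.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("multi-query-app").getOrCreate()

# One independent read/write query per topic, all inside the same app.
queries = []
for topic in ["t1", "t2", "t3"]:  # hypothetical topic names
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", topic)
          .load())
    queries.append(df.selectExpr("key", "value")
                   .writeStream.format("kafka")
                   .option("kafka.bootstrap.servers", "broker:9092")
                   .option("topic", topic + "_out")
                   .option("checkpointLocation", "/checkpoints/" + topic)
                   .start())

# Block until any query terminates (e.g. on failure).
spark.streams.awaitAnyTermination()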