0

I'm developing an application using spark streaming reading from multiple kafka topics and I would like to know if this solution above is the best way of doing that.

bootstrap_server = '10.108.123.238:9092'

topics = [
'ingest_src_api_iq_BTCUSD_1_json', 'ingest_src_api_iq_BTCUSD_5_json', 'ingest_src_api_iq_BTCUSD_60_json', 'ingest_src_api_iq_BTCUSD_300_json', 'ingest_src_api_iq_BTCUSD_900_json', 'ingest_src_api_iq_BTCUSD_1800_json', 'ingest_src_api_iq_BTCUSD_3600_json', 'ingest_src_api_iq_BTCUSD_14400_json', 'ingest_src_api_iq_BTCUSD_86400_json',
'ingest_src_api_iq_XRPUSD_1_json', 'ingest_src_api_iq_XRPUSD_5_json', 'ingest_src_api_iq_XRPUSD_60_json', 'ingest_src_api_iq_XRPUSD_300_json', 'ingest_src_api_iq_XRPUSD_900_json', 'ingest_src_api_iq_XRPUSD_1800_json', 'ingest_src_api_iq_XRPUSD_3600_json', 'ingest_src_api_iq_XRPUSD_14400_json', 'ingest_src_api_iq_XRPUSD_86400_json',
'ingest_src_api_iq_ETHUSD_1_json', 'ingest_src_api_iq_ETHUSD_5_json', 'ingest_src_api_iq_ETHUSD_60_json', 'ingest_src_api_iq_ETHUSD_300_json', 'ingest_src_api_iq_ETHUSD_900_json', 'ingest_src_api_iq_ETHUSD_1800_json', 'ingest_src_api_iq_ETHUSD_3600_json', 'ingest_src_api_iq_ETHUSD_14400_json', 'ingest_src_api_iq_ETHUSD_86400_json',
'ingest_src_api_iq_OMGUSD_1_json', 'ingest_src_api_iq_OMGUSD_5_json', 'ingest_src_api_iq_OMGUSD_60_json', 'ingest_src_api_iq_OMGUSD_300_json', 'ingest_src_api_iq_OMGUSD_900_json', 'ingest_src_api_iq_OMGUSD_1800_json', 'ingest_src_api_iq_OMGUSD_3600_json', 'ingest_src_api_iq_OMGUSD_14400_json', 'ingest_src_api_iq_OMGUSD_86400_json',
'ingest_src_api_iq_TRXUSD_1_json', 'ingest_src_api_iq_TRXUSD_5_json', 'ingest_src_api_iq_TRXUSD_60_json', 'ingest_src_api_iq_TRXUSD_300_json', 'ingest_src_api_iq_TRXUSD_900_json', 'ingest_src_api_iq_TRXUSD_1800_json', 'ingest_src_api_iq_TRXUSD_3600_json', 'ingest_src_api_iq_TRXUSD_14400_json', 'ingest_src_api_iq_TRXUSD_86400_json',
]

schema = "active_id INT, size INT, at STRING, from STRING, to STRING, id INT, open FLOAT, close FLOAT, min FLOAT, max FLOAT, ask FLOAT, bid FLOAT, volume FLOAT, phase STRING"


for topic in topics:
    df = (spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", bootstrap_server)
                        .option("subscribe", topic)
                        .option("startingOffsets", "latest")
                        .load()
                        .select(from_json(col("value").cast("string"), schema).alias("value")) 
                        )

    df = (df.select(to_json(struct(expr("value.active_id as active_id"), expr("value.size as timeframe"),
                        expr("cast(value.at / 1000000000 as timestamp) as executed_at"), expr("FROM_UNIXTIME(value.from) as candle_from"), 
                        expr("FROM_UNIXTIME(value.to) as candle_to"), expr("value.id as period"), 
                        "value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume")).alias("value"))
                        .writeStream.format("kafka").option("kafka.bootstrap.servers", bootstrap_server)
                        .option("topic", topic.replace('ingest', 'processed'))
                        .option("checkpointLocation", F"./checkpoint_{topic.replace('ingest', 'processed')}/")
                        .start()   
                        )

I'm using a for loop to read all the Kafka topics. Is this be the best option in terms of processing and memory usage ?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • If each action requires a full lineage execution,then you should separate into multiple dataframe. And see if this helps https://stackoverflow.com/questions/55929540/spark-structured-streaming-app-reading-from-multiple-kafka-topics – teedak8s Jul 10 '22 at 05:25

1 Answers1

3

Since the processing logic in your example is the same for all topics, you can replace

.option("subscribe", topic)

with

.option("subscribePattern", "ingest_src_api.*")

Behind the scene, this will create one single (distributed) Kafka consumer group for all partitions of all those topics, followed by one single Spark lineage, as opposed to one consumer group per topic as your code is currently doing.

If new topics are later created that match that regex pattern, they should be automatically detected as well, after some delay: the check is done every metadata.max.age.ms, which is 5min by default, and you can override it with .option("kafka.metadata.max.age.ms", "3000") or so.

Svend
  • 6,352
  • 1
  • 25
  • 38
  • That worked well It created a consumer for all of the topics, but how would I write to separated topics using this option? – Renan Nogueira Jul 10 '22 at 10:47
  • Ah indeed, you can dynamically route the parsing result to different topics with this approach. For what it's worth, I think Kafka Streams would allow to do that, here's a relevant blog: https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/ – Svend Jul 10 '22 at 13:28
  • @RenanNogueira You can filter the consumed dataframe by the topic column – OneCricketeer Jul 10 '22 at 20:35
  • @OneCricketeer I noticed that spark stores in each batch differents topics. I'm trying to figure it out a way to group by the same topic name and send it back to kafka. – Renan Nogueira Jul 11 '22 at 21:46
  • @RenanNogueira It's possible some batches consume zero records from a given topic name, but that doesn't change the logic of `filter('topic == "example"')` – OneCricketeer Jul 11 '22 at 21:49
  • @OneCricketeer I've being doing some tests using the filter, but is there any way that I don't need to use a 'for loop' passing the topic name to the filter and then sending it to its new topic? – Renan Nogueira Jul 12 '22 at 00:26
  • @RenanNogueira I suggest creating a new post explaining the issue you're having now – OneCricketeer Jul 12 '22 at 18:39