
I am looking for a way to trigger my Databricks notebook once to process a Kinesis stream, using the following pattern:

    import org.apache.spark.sql.streaming.Trigger

    // Load your Streaming DataFrame
    val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")

    // Perform transformations and then write…
    sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")

It looks like that's not possible with AWS Kinesis, and that's what the Databricks documentation suggests as well. My question is: what else can we do to achieve that?

2 Answers


As you mentioned in the question, Trigger.Once isn't supported for Kinesis.

But you can achieve what you need by bringing Kinesis Data Firehose into the picture: Firehose will write the data from Kinesis into an S3 bucket (you can select the format you need, such as Parquet or ORC, or just leave it as JSON). You can then point the streaming job at that bucket and use Trigger.Once for it, as it's a normal file-based streaming source (for efficiency it's better to use Auto Loader, which is available on Databricks). Also, to keep costs under control, you can set up a retention policy on your S3 destination to remove or archive files after some period of time, like a week or a month.
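A minimal sketch of the Databricks side of that pipeline, assuming Firehose already delivers JSON files to a hypothetical bucket `s3://my-firehose-bucket/events/` (the bucket, schema, checkpoint, and output paths are all placeholders, and this needs a Databricks runtime since the `cloudFiles` source is Auto Loader):

```python
# Hypothetical paths and schema; requires a Databricks cluster, because
# Auto Loader's "cloudFiles" source is Databricks-specific.
from pyspark.sql.types import StructType, StructField, StringType, LongType

my_schema = StructType([
    StructField("id", StringType()),
    StructField("ts", LongType()),
])

# Read the files Firehose dropped into S3 with Auto Loader...
sdf = (spark.readStream
       .format("cloudFiles")
       .option("cloudFiles.format", "json")
       .schema(my_schema)
       .load("s3://my-firehose-bucket/events/"))

# ...and process the backlog once, then stop. The checkpoint records which
# files were already ingested, so the next scheduled run resumes from there.
(sdf.writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/checkpoints/events")
    .format("delta")
    .start("/out/path"))
```

`trigger(once=True)` is the PySpark equivalent of Scala's `Trigger.Once`, so this job can be scheduled on a job cluster and the cluster shut down between runs.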

Alex Ott
  • Is this the best / only solution to have "Trigger once" for cost-saving purposes? We don't want our cluster running 24x7 and would like to schedule the job to run on a job cluster. Does it also mean Azure Event Hubs is a better option than AWS Kinesis, since Event Hubs supports the Trigger.Once option? How about killing the job and starting it next time with the "at_timestamp" option? What are other people doing? Are they leaving the cluster up and running if Kinesis is their only source and they are not using Firehose to push it to an S3 bucket? – InTheWorldOfCodingApplications Apr 03 '21 at 15:14
  • it's so many questions at once :-) `at_timestamp` won't help - it's used only at first run, after that everything is handled via checkpoint. But yes, stopping the stream and start it again later may work (something like this answer: https://stackoverflow.com/questions/66777031/how-to-configure-backpreasure-in-spark-3-structure-stream-kafka-files-source-wit/66777533#66777533) – Alex Ott Apr 03 '21 at 17:13
  • What other people are doing depends on their requirements. People who need real-time run clusters 24x7; I've seen people who don't need that use Firehose. Event Hubs has its own limitations; I found by experimentation that for Event Hubs the Kafka interface works much better than the native protocol – Alex Ott Apr 03 '21 at 17:15
  • 1
    Thanks Alex. Marking it as answered. Need to look into Auto Loader Tigger.Once though. – InTheWorldOfCodingApplications Apr 03 '21 at 17:17
  • Looked into Auto Loader as suggested but am having the following issue: https://stackoverflow.com/questions/67050262/delta-table-transactional-guarantees-when-loading-using-autoloader-from-aws-s3-t. It's giving me an error regarding Delta table transactional guarantees – InTheWorldOfCodingApplications Apr 11 '21 at 21:25

A workaround is to stop the query after X micro-batches, without a trigger. That guarantees a fixed maximum number of rows per run. The only issue is that if you have millions of rows waiting in the queue, you have no guarantee of processing all of them.

In Scala you can add an event listener; in Python, count the number of micro-batches:

from time import sleep

s = sdf.writeStream.format("delta").start("/out/path")

# By default spark.sql.streaming.numRecentProgressUpdates=100, so recentProgress
# keeps up to the last 100 progress updates. Stop after 10 micro-batches.
# maxRecordsPerFetch is 10,000 by default, so we consume at most
# 10 x 10,000 = 100,000 messages per run.
while len(s.recentProgress) < 10:
    print("Batches #: " + str(len(s.recentProgress)))
    sleep(10)
s.stop()

You can add more advanced logic that counts the number of messages processed per batch and stops when the queue is empty (the throughput should drop once the backlog is consumed, as you'll only receive the "real-time" flow, not the history).
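That "stop when the queue is drained" check can be sketched in plain Python. This is a hypothetical helper, assuming PySpark's `recentProgress` entries, which expose `numInputRows` per micro-batch:

```python
from time import sleep

def queue_drained(recent_progress, idle_batches=3):
    """Return True once the last `idle_batches` micro-batches each processed
    0 input rows, i.e. the backlog is consumed and only the live trickle remains."""
    if len(recent_progress) < idle_batches:
        return False
    return all(p["numInputRows"] == 0 for p in recent_progress[-idle_batches:])

def run_until_drained(query, poll_seconds=10):
    # `query` is a running StreamingQuery; recentProgress is a list of dicts.
    while not queue_drained(query.recentProgress):
        sleep(poll_seconds)
    query.stop()
```

Requiring several consecutive empty micro-batches (rather than just one) avoids stopping on a momentary lull while Kinesis is still delivering the backlog.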

Quentin