
I use Spark 2.2.0.

How can I feed an Amazon SQS stream into Spark Structured Streaming using PySpark?

This question tries to answer it for the old, non-structured Spark Streaming and for Scala, by creating a custom receiver.
Is something similar possible in PySpark?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()

According to Databricks, the above reader can be used for the S3-SQS file source. However, how might one approach this for SQS alone?

I tried to understand, from AWS-SQS-Receive_Message, how to receive messages. However, it was not clear how to feed the stream directly into Spark Streaming.

OSK
  • If your message is too big for `SQS`, it will be placed in `S3` - something like a BLOB/CLOB in a relational database. I'll look into this topic... – Michał Zaborowski Jan 03 '18 at 15:17
  • @MichałZaborowski the message is not too big, I think. Can I check it in AWS if it is also stored in S3? – OSK Jan 05 '18 at 12:46
  • If it is too long then you have a reference in it. By too long I mean 256 KB - [see SQS limits](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html). The `Spark SQS Receiver` mentioned in the answer to the question you reference is quite poor. The Databricks solution seems much better. But to tell how to deal with it I would need to try it and see the sources. – Michał Zaborowski Jan 05 '18 at 13:29

1 Answer


I know nothing about Amazon SQS, but "feeding" an Amazon SQS stream into Spark Structured Streaming (aka Spark "Streams") using PySpark is not possible, and the same holds for any external messaging system or data source.

It's the other way round in Spark Structured Streaming: it is Spark that pulls the data in at regular intervals (similar to the way the Kafka Consumer API works, where it pulls data rather than being handed it).

In other words, Spark "Streams" is just another consumer of messages from a "queue" in Amazon SQS.

Whenever I'm asked to integrate an external system with Spark "Streams", I start by writing a client for that system using its client/consumer API.
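For SQS that client can be just a few lines. Below is a minimal, hypothetical sketch against the AWS SDK for Java v1 (the aws-java-sdk-sqs artifact); the queue URL is a placeholder and error handling is omitted.

    // Minimal, hypothetical SQS polling client (AWS SDK for Java v1, aws-java-sdk-sqs)
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest

    import scala.collection.JavaConverters._

    object SqsClientDemo extends App {
      val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  // placeholder

      val sqs = AmazonSQSClientBuilder.defaultClient()

      // Long-poll for up to 10 messages at a time
      val request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(10)
        .withWaitTimeSeconds(20)

      sqs.receiveMessage(request).getMessages.asScala.foreach { m =>
        println(s"received: ${m.getBody}")
        // Delete only after successful processing, so unprocessed messages get redelivered
        sqs.deleteMessage(queueUrl, m.getReceiptHandle)
      }
    }

Deleting a message only after it has been processed is what later gives the custom source its at-least-once behaviour.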

Once I have it, the next step is to develop a custom streaming Source for the external system, e.g. Amazon SQS, using the sample client code above.

While developing a custom streaming Source you have to do the following steps:

  1. Write a Scala class that implements the Source trait

  2. Register the Scala class (the custom Source) with Spark SQL using a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file containing the fully-qualified class name, or use the fully-qualified class name directly in format (a skeleton covering both steps is sketched below)
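A minimal skeleton of such a source for Spark 2.2.x might look like the code below. Everything named here (the com.example.sqs package, SqsSourceProvider, the "sqs" short name, the queueUrl option) is hypothetical, the SQS polling and buffering is left as TODOs, and note that a real getBatch has to return a DataFrame that Spark treats as streaming, which the built-in sources construct through an internal API.

    // Hypothetical skeleton of a custom streaming Source for SQS on Spark 2.2.x
    package com.example.sqs

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.streaming.{Offset, Source}
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class SqsSourceProvider extends StreamSourceProvider with DataSourceRegister {

      // The name usable as .format("sqs") once the provider is registered
      override def shortName(): String = "sqs"

      override def sourceSchema(
          sqlContext: SQLContext,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): (String, StructType) =
        (shortName(), schema.getOrElse(SqsSource.DefaultSchema))

      override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source =
        new SqsSource(sqlContext, parameters("queueUrl"))
    }

    object SqsSource {
      val DefaultSchema: StructType = StructType(StructField("body", StringType) :: Nil)
    }

    class SqsSource(sqlContext: SQLContext, queueUrl: String) extends Source {

      override def schema: StructType = SqsSource.DefaultSchema

      // Offset of the last message buffered so far (None = nothing buffered yet)
      override def getOffset: Option[Offset] = {
        // TODO poll SQS (e.g. with the client sketched earlier), buffer the messages,
        // and report the last buffered position (e.g. as a LongOffset)
        None
      }

      // Rows between the two offsets, planned by Spark as one micro-batch.
      // NOTE a real implementation must return a DataFrame with isStreaming = true,
      // which built-in sources build through an internal Spark API.
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        // TODO turn the buffered message bodies into rows matching `schema`
        import sqlContext.implicits._
        sqlContext.sparkSession.emptyDataset[String].toDF("body")
      }

      override def stop(): Unit = {
        // TODO shut down the SQS client
      }
    }

The registration from step 2 would then be a src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file containing the single line com.example.sqs.SqsSourceProvider.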

Having a custom streaming source is a two-part development: developing the source (and optionally registering it with Spark SQL) and using it in a Spark Structured Streaming application (in Python) by means of the format method.
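A sketch of that second part follows, assuming the hypothetical "sqs" source above is packaged in a JAR on the classpath and that spark is the active SparkSession. The identical readStream chain works from PySpark (as in the question's snippet), since only the format name and the options cross the language boundary.

    // Part two: use the (hypothetical) custom source by its short name
    // or by its fully-qualified class name
    val messages = spark.readStream
      .format("sqs")  // or "com.example.sqs.SqsSourceProvider"
      .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
      .load()

    messages.writeStream
      .format("console")
      .start()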

Jacek Laskowski