2

I am Reading data from S3 using Spark Streaming, and I want to update stream data into Amazon Redshift. Data with same primary key exist then that row should be updated and new rows should be inserted. Can someone please suggest the right approach to do it considering the performance?

val ssc = new StreamingContext(sc, Duration(30000))
val lines = ssc.textFileStream("s3://<path-to-data>/YYYY/MM/DD/HH")

lines.foreachRDD(
    x => {
        val normalizedRDD = processRDD(x)
        val df = spark.createDataset(normalizedRDD)
        //TODO: How to Update/Upsert data in Redshift?
    }

)
  • Prashant, unless you are only needing a few updates, this is not the right approach as the performance will be very bad. The right pattern is spark->s3->redshift->upsert. see https://github.com/databricks/spark-redshift as one option – Jon Scott Feb 13 '19 at 08:17
  • Hi @JonScott can you please explain "spark->s3->redshift->upsert". I am just trying to understand how to upsert data into Redshift table using Spark. – Prashant Tatan Feb 13 '19 at 10:40
  • that is the flow of data - from spark to s3, then from s3 to redshift using copy command, then if needed use that data you uploaded to redshift to upsert into your target table. – Jon Scott Feb 13 '19 at 11:45
  • @JonScott Whenever you update data in RedShift it creates duplicate rows, so what I am asking here is how to update the data in Redshift using Spark. I already know how to write data into the Redshift, but not how to update. I am interested in writing data in Redshift through Spark only as I already have data in S3 and I want to perform some operations before writing into the Redshift. – Prashant Tatan Feb 13 '19 at 12:27
  • You can of course de-dupe in redshift. You also can pre-process the data in spark then follow my process above. also consider redshift spectrum as a solution. – Jon Scott Feb 13 '19 at 14:20
  • As Jon said, the most efficient way to load data into Redshift is using the COPY statement from files in S3. Writing anything other than small amounts of data via a database connection will perform very poorly by comparison. If you need to transform the data, maybe read the S3 files, transform them using Spark and write them back to S3 in the desired format and then use COPY to load them into Redshift. – Nathan Griffiths Feb 14 '19 at 08:41
  • I'm also wondering where you've heard that "Whenever you update data in RedShift it creates duplicate rows" - this is definitely not the case. – Nathan Griffiths Feb 14 '19 at 08:42
  • @NathanGriffiths: Say I have two columns in the Redshift table 'test' called 'id' (primary key) and 'value'. If I execute two queries as follows: 1) Insert into test (id, value) values ('a',10). 2) Insert into test (id, value) values ('a',20). It is creating two rows into the table instead of updating value to 20. Because of this, I am asking for a solution to update delta data from Spark into the Redshift. – Prashant Tatan Feb 15 '19 at 06:52
  • I am new contributor to the Stackoverflow, pardon me if I miss any details in my question or representation of question and please feel free to give any suggestion :) – Prashant Tatan Feb 15 '19 at 06:55
  • Hi Prashant, that isn't a behaviour specific to Redshift, that is how relational databases and SQL work. If you use an INSERT statement then you will always add new rows, never update them. To update a row (or rows) you need to use an UPDATE statement. I can't advise on how to do this with Spark but this tutorial shows a generic example: https://www.khanacademy.org/computing/computer-programming/sql/modifying-databases-with-sql/pt/changing-rows-with-update-and-delete – Nathan Griffiths Feb 16 '19 at 02:36
  • @NathanGriffiths: Yes Nathan that is what I am asking how to update data in Redshift using Spark, I have tried df.write command with SaveMode.Append but it is adding the duplicate row. – Prashant Tatan Feb 18 '19 at 08:31
  • Hi Prashant, looking at the answers to similar questions on SO it sounds like it's not possible to *update* a table from a Spark dataframe e.g. https://stackoverflow.com/questions/34643200/spark-dataframes-upsert-to-postgres-table and https://stackoverflow.com/questions/35151528/spark-dataframe-execute-update-statement – Nathan Griffiths Feb 18 '19 at 23:44
  • @NathanGriffiths: Thanks that is what I am looking for. – Prashant Tatan Feb 19 '19 at 06:22

0 Answers0