I'm working on a project that reads data from an RDBMS over JDBC, and the initial read works. I will be running this regularly, about once a week, so I'm looking for a way to make every read after the first one pull only new or updated records instead of the entire table.
I can do this with a Sqoop incremental import by specifying three parameters (--check-column, --incremental lastmodified/append and --last-value). However, I don't want to use Sqoop for this. Is there a way to replicate the same behavior in Spark with Scala?
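To make the Sqoop comparison concrete, here is a minimal sketch of how the check-column / last-value idea could be expressed as a filtered subquery handed to Spark's JDBC reader. The names IncrementalRead, incrementalTable, jdbcOptions and the column updated_at are hypothetical, and the last value is assumed to be stored somewhere by the previous run (a file, a table) as a ready-made SQL literal. The block only builds strings and an options map, so it does not depend on Spark itself.

```scala
object IncrementalRead {
  // Builds a subquery suitable for the JDBC "dbtable" option.
  // lastValueLiteral is assumed to be a trusted, already-formatted SQL literal
  // (e.g. "'2020-01-01 00:00:00'" or "100"); it is interpolated as-is.
  def incrementalTable(table: String,
                       checkColumn: String,
                       lastValueLiteral: Option[String]): String =
    lastValueLiteral match {
      // Later runs: only rows newer than the value saved by the previous run.
      case Some(v) => s"(select * from $table where $checkColumn > $v) t"
      // First run: nothing saved yet, so read the whole table.
      case None    => s"(select * from $table) t"
    }

  // Collects the connection settings plus the subquery into one options map.
  def jdbcOptions(url: String,
                  user: String,
                  password: String,
                  dbtable: String): Map[String, String] =
    Map("url" -> url, "user" -> user, "password" -> password, "dbtable" -> dbtable)
}
```

With this in place, the read itself would look something like spark.read.format("jdbc").options(IncrementalRead.jdbcOptions(url, user, password, IncrementalRead.incrementalTable("schema.table", "updated_at", lastValue))).load(). After each run, the maximum of the check column in the rows just read would need to be saved so that it can serve as the last value next time. That bookkeeping is what Sqoop does with --last-value, and here it has to be done by hand.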
Secondly, some of the tables have no unique column that can be used as partitionColumn, so I thought of using a row-number function to add a unique column to these tables and then taking the MIN and MAX of that column as lowerBound and upperBound respectively. My challenge now is how to dynamically pass these values into the read statement, like below:
val queryNum = "(select a1.*, row_number() over (order by sales) as row_nums from (select * from schema.table) a1) t"
val df = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("partitionColumn", "row_nums")
  .option("lowerBound", min(row_nums))   // placeholder: MIN of row_nums
  .option("upperBound", max(row_nums))   // placeholder: MAX of row_nums
  .option("numPartitions", some value)
  .option("fetchsize", some value)
  .option("dbtable", queryNum)
  .option("user", user)
  .option("password", password)
  .load()
I know the above code is not right and is probably missing several steps, but I hope it gives a general idea of what I'm trying to achieve.
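One possible shape for the missing steps, as a rough sketch rather than a definitive answer: run one small query first to get the MIN and MAX of the synthetic column, then pass the results as strings into the partitioning options. The names PartitionedRead, numberedTable, fetchBounds and partitionOptions are hypothetical. The bounds are fetched over plain java.sql here, which assumes the JDBC driver is on the classpath; the same query could also be run through Spark's JDBC reader.

```scala
import java.sql.DriverManager

object PartitionedRead {
  // Wraps the table in a subquery that adds a synthetic row number.
  // The parentheses and alias let it be used as the JDBC "dbtable" option.
  def numberedTable(table: String, orderBy: String): String =
    s"(select a1.*, row_number() over (order by $orderBy) as row_nums from $table a1) t"

  // Runs one small query to get MIN and MAX of row_nums.
  // Note: on an empty table both values come back as 0 (SQL NULL read via getLong).
  def fetchBounds(url: String, user: String, password: String,
                  numbered: String): (Long, Long) = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val rs = conn.createStatement()
        .executeQuery(s"select min(row_nums), max(row_nums) from $numbered")
      rs.next()
      (rs.getLong(1), rs.getLong(2))
    } finally conn.close() // closing the connection also releases the statement
  }

  // Partitioning options for the JDBC reader; all values are passed as strings.
  def partitionOptions(numbered: String, bounds: (Long, Long),
                       numPartitions: Int): Map[String, String] =
    Map(
      "dbtable"         -> numbered,
      "partitionColumn" -> "row_nums",
      "lowerBound"      -> bounds._1.toString,
      "upperBound"      -> bounds._2.toString,
      "numPartitions"   -> numPartitions.toString
    )
}
```

The map would then be passed alongside the connection settings, for example spark.read.format("jdbc").option("driver", driver).option("url", url).option("user", user).option("password", password).options(PartitionedRead.partitionOptions(numbered, bounds, 8)).load(), where 8 is an arbitrary partition count. Two caveats are worth keeping in mind. Since row_number() starts at 1, the lower bound is always 1 and the upper bound equals the row count, so a count(*) would work just as well. And because each partition issues its own query, the database recomputes the row numbers every time; if the ordering column has ties, adding a tie-breaking column to the order by helps keep the numbering stable across those queries.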