I am joining Parquet data in S3 to a JDBC (Postgres) table, matching a column in the Parquet data against the primary key of the JDBC table. I need only a small fraction of the JDBC table (though still a large number of rows overall: tens or hundreds of thousands), and then I need to partition the data intelligently for use in the executors.

I'm still new to data engineering as a whole and Spark in particular, so pardon (and assume!) my ignorance. I'm less concerned with processing time than with memory usage; I have to fit within the AWS Glue memory limits.

What is a good way to do this?

My existing thoughts:

I could, in theory, construct a SQL query like:

select * from t1 where id = key1 UNION
select * from t1 where id = key2 UNION...

But this seems silly. The question "Selecting multiple rows by ID, is there a faster way than WHERE IN" gives me the idea of writing the keys I want to pull to a temporary table, joining that with the original table, and pulling the result, which seems like the "correct" way to do the above. But this also seems like a common enough problem that there may be a ready-made solution I just haven't found yet.

There's also the possibility of pulling everything between the min and max UUID values, but then it's a question of how many extra rows I'm pulling. Since the UUIDs are, AFAIK, randomly distributed across the space of possible UUID values, I expect that to fetch a lot of extra rows (rows that will be dropped during the join). Still, this might be a useful way to partition the JDBC data.
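To make the range idea concrete, here is a minimal sketch that splits the full 128-bit UUID space into contiguous ranges and emits one SQL predicate per range. The function name `uuid_range_predicates` and the column name `id` are made up for illustration, and it assumes the database orders UUIDs the same way as their big-endian integer value (which I believe holds for the Postgres `uuid` type, but treat that as an assumption).

```python
import uuid
from typing import List


def uuid_range_predicates(column: str, num_partitions: int) -> List[str]:
    """Split the 128-bit UUID space into contiguous, non-overlapping ranges."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    space = 1 << 128
    # Integer boundaries: 0 = b0 < b1 < ... < bN = 2**128
    bounds = [space * i // num_partitions for i in range(num_partitions + 1)]
    predicates = []
    for i in range(num_partitions):
        lower = uuid.UUID(int=bounds[i])
        if i == num_partitions - 1:
            # The last range has no upper bound: 2**128 is not a valid UUID.
            predicates.append(f"{column} >= '{lower}'")
        else:
            upper = uuid.UUID(int=bounds[i + 1])
            predicates.append(f"{column} >= '{lower}' AND {column} < '{upper}'")
    return predicates


predicates = uuid_range_predicates("id", 4)
for p in predicates:
    print(p)
```

A list like this could be handed to the `predicates` argument of `spark.read.jdbc`, which reads one partition per predicate. Note that this only slices the table; it still reads every row, so it addresses partitioning rather than the extra-rows problem.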

It's also still unclear to me how the JDBC data gets to the executors; it may pass (in its entirety) through the driver process.

So, to try to formalize this into questions:

  1. Is there an existing recipe for this usage?
  2. What are features of Spark I should be looking at to accomplish this?
  3. What is the actual Spark data flow for data coming from a JDBC connection?
zero323
Narfanator
  • Out of the box, Spark doesn't provide anything like this. However, if you search through external posts by [Russell Spitzer](https://stackoverflow.com/users/1174164/russs), you can see how a useful pushdown can be achieved in similar cases. – zero323 Oct 04 '18 at 21:06

2 Answers

0

It seems like the best-possible way to do this (as yet) is to write the row IDs you want to fetch to a temporary table on the DB, do the join with the main table, and then read out the result (as described in the linked answer).

Theoretically, this is totally doable in Spark; something like

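// PSEUDOCODE!
df.select("row_id").write.jdbc(<target db>, "ids_to_fetch")
// Run the join inside the database, so only matching rows are materialized
databaseConnection.execute("create table output as select t.* from ids_to_fetch i join target_table t on i.row_id = t.id")
// Read the (much smaller) result back and join it to the original data
df = df.join(
  spark.read.jdbc(<target db>, "output"),
  col("row_id") === col("id")
)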

This is probably the most effective way to do it, because (AFAIK) it will farm both the write of IDs and the read of the join table out to the executors, instead of trying to do much of anything in the driver.
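The database side of that flow can be tried without Spark. The sketch below uses Python's built-in `sqlite3` as a stand-in for Postgres (an assumption made purely so the example is self-contained); the table names `target_table` and `ids_to_fetch` follow the pseudocode above, and the data is invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the Postgres connection
conn.execute("CREATE TABLE target_table (id TEXT PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO target_table VALUES (?, ?)",
    [(f"key{i}", f"row {i}") for i in range(1000)],
)

# 1. Upload only the keys we care about ("missing" has no match on purpose).
wanted = ["key3", "key42", "key999", "missing"]
conn.execute("CREATE TEMP TABLE ids_to_fetch (row_id TEXT PRIMARY KEY)")
conn.executemany("INSERT INTO ids_to_fetch VALUES (?)", [(k,) for k in wanted])

# 2. Let the database do the join, and 3. read back only the matching rows.
rows = conn.execute(
    "SELECT t.id, t.payload FROM ids_to_fetch AS i "
    "JOIN target_table AS t ON t.id = i.row_id ORDER BY t.id"
).fetchall()
conn.close()

print(rows)
```

Only the three matching rows come back, so the client never sees the other 997. The same shape should carry over to Postgres, where the key upload and the result read are the parts Spark would distribute.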

However, right now I can't write a temp table to the target database, so I'm producing a series of SELECT ... WHERE ... IN statements in the driver, and then pulling the results of those.

Narfanator
0

Spark is not designed to do heavy work on the driver, so it is better to avoid that.

For your case, I would recommend loading the data from S3 into a DataFrame first. Persist this DataFrame, as it will be needed later.

Then you can extract the unique values of your keys from the S3 data using a combination of map(row->).distinct()

Then partition those keys so that each partition holds a reasonable number of keys for a single JDBC query. You can persist that result as well, run count() on it, and then repartition() based on the count, for example so that no partition holds more than about 1000 items.
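The arithmetic behind that step is small enough to sketch in plain Python. The helper names `partition_count` and `chunk_keys` are made up for illustration, and the 1000-key cap is just the example figure from above. In Spark, the count would come from count() and the result would be passed to repartition(), which only balances partitions approximately, so treat the cap as a target rather than a guarantee.

```python
import math


def partition_count(num_keys, max_keys_per_partition=1000):
    """Smallest number of partitions that keeps each one at or under the cap."""
    return max(1, math.ceil(num_keys / max_keys_per_partition))


def chunk_keys(keys, max_keys_per_partition=1000):
    """Deduplicate the keys and slice them into capped chunks."""
    unique = sorted(set(keys))
    return [
        unique[i:i + max_keys_per_partition]
        for i in range(0, len(unique), max_keys_per_partition)
    ]


keys = [n % 2500 for n in range(10_000)]  # 2,500 distinct keys, with repeats
chunks = chunk_keys(keys)
sizes = [len(c) for c in chunks]
print(partition_count(len(set(keys))), sizes)
```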

Then, using mapPartitions, compose one query per partition, like 'SELECT * FROM table WHERE key in '.
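As a rough sketch of the query-building part, the function below turns one partition's keys into a parameterized statement. The name `build_in_query` is hypothetical, and the `%s` placeholder style is the one psycopg2 uses; other drivers use `?` or named parameters. Binding the keys as parameters instead of pasting them into the SQL string avoids quoting bugs and injection.

```python
from typing import List, Sequence, Tuple


def build_in_query(table: str, key_column: str, keys: Sequence) -> Tuple[str, List]:
    """Return (sql, params) selecting every row whose key is in `keys`."""
    if not keys:
        # "IN ()" is a syntax error in Postgres, so refuse empty partitions.
        raise ValueError("keys must not be empty")
    placeholders = ", ".join(["%s"] * len(keys))
    # Table and column names cannot be bound as parameters; they must come
    # from trusted code, never from user input.
    sql = f"SELECT * FROM {table} WHERE {key_column} IN ({placeholders})"
    return sql, list(keys)


sql, params = build_in_query("t1", "id", ["a", "b", "c"])
print(sql)
print(params)
```

Inside mapPartitions, each partition would call something like this once with its own keys and then execute the statement over its own database connection.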

Then use Spark's flatMap to perform the actual selects. I do not know of an automatic way to do this with DataFrames, so you will probably need to use JDBC directly to run each select and map the results. You could initialize the Spring framework on each worker machine and use the Spring Data extensions to easily load data from the DB into a list of entities.

Now you have a Dataset with the required data from Postgres in the cluster. You can create a DataFrame from it with toDF(). You will probably need some additional column mapping here, or you can map the data to the Row type in the previous step.

So now you have the two required DataFrames: the persisted one with the initial data from S3, and the other with the data from Postgres. You can join them in the standard way using Dataframe.join.

Note: Do not forget to persist Datasets and DataFrames with .persist() when they are going to be reused. Otherwise, Spark will repeat all the data retrieval steps every time.