
I'm reading a CSV file that logs some transactions, like this:

l = [("Garfield", "Lasagna", "2021-08-01"),
     ("Calvin", "Tiger Plushie", "2021-08-02"),
     ("Calvin", "Lasagna", "2021-08-03")]
transactions = spark.createDataFrame(l, ["name", "product", "date"])

transactions.show()

+--------+-------------+----------+
|    name|      product|      date|
+--------+-------------+----------+
|Garfield|      Lasagna|2021-08-01|
|  Calvin|Tiger Plushie|2021-08-02|
|  Calvin|      Lasagna|2021-08-03|
+--------+-------------+----------+
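(The createDataFrame above is just to make the question reproducible; in reality the frame comes from a plain CSV read along these lines, where the path and options are placeholders for my setup.)

transactions = (spark.read
                .option("header", "true")
                .csv("/path/to/transactions.csv")   # placeholder path
                .select("name", "product", "date"))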

I want to store these values in a SQL database, in a star schema, with tables created like this:

CREATE TABLE [dbo].[test_customer]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [name] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_product]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [product] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_transaction]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [fk_customer] INT NOT NULL,
    [fk_product] INT NOT NULL,
    [date] DATE
);

I can write the customer and product information to my database with:

JDBCURL = "jdbc:sqlserver://{}:1433;database={};user={};password={}"

customers = transactions.select("name").drop_duplicates()    
customers.write.jdbc(JDBCURL, table="dbo.test_customer", mode="append")

products = transactions.select("product").drop_duplicates()    
products.write.jdbc(JDBCURL, table="dbo.test_product", mode="append")
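For reference, the same writes also work with the credentials passed as a separate properties dict instead of being baked into the URL (a sketch; the placeholder values and the driver class are assumptions about my setup):

connection_properties = {
    "user": "<user>",          # placeholder
    "password": "<password>",  # placeholder
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",  # assumed available on the classpath
}

customers.write.jdbc(url="jdbc:sqlserver://<host>:1433;database=<db>",
                     table="dbo.test_customer",
                     mode="append",
                     properties=connection_properties)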

But if I want to store the transaction details, I need to fetch the data back from my database and join it to my in-memory DataFrame:

sql_customers = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_customer").load()
sql_products = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_product").load()

transactions_to_sql = (transactions.join(sql_customers, on="name")
                       .drop("name")
                       .withColumnRenamed("id", "fk_customer")
                       .join(sql_products, on="product")
                       .drop("product")
                       .withColumnRenamed("id", "fk_product")
                       )

transactions_to_sql.write.jdbc(JDBCURL, table="dbo.test_transaction", mode="append")
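For clarity, after the two joins, drops and renames, the frame being appended ends up with exactly the columns the fact table expects (the id values themselves come from the database identity columns, so there is nothing more to show here):

# Only the fact-table columns should remain at this point.
assert set(transactions_to_sql.columns) == {"fk_customer", "fk_product", "date"}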

This doesn't sound very efficient if (when) I have millions of customers and millions of products.
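The only mitigation I can think of so far is pushing a filter down to the JDBC source, so that only the dimension rows relevant to the current batch are read back, e.g. by passing a subquery as dbtable (a rough sketch; it assumes the batch's distinct names are few enough to collect to the driver and inline into an IN list):

# Distinct customer names of the current batch (assumed to be small).
names = [row["name"] for row in transactions.select("name").distinct().collect()]
in_list = ", ".join("'{}'".format(n.replace("'", "''")) for n in names)

# Let SQL Server do the filtering by wrapping it in a subquery.
sql_customers = (sqlContext.read.format("jdbc")
                 .options(url=JDBCURL,
                          dbtable="(SELECT id, name FROM dbo.test_customer "
                                  "WHERE name IN ({})) AS c".format(in_list))
                 .load())

But that still costs a round trip and a join per dimension table, so I'm not sure it's the right direction.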

How can I handle auto-generated identifiers for my customers and products, and foreign keys between my tables?

Be Chiller Too
  • What would happen if one of the keys in the Spark dataset already exists in the SQL database, e.g. if there was already an entry for `Garfield` in the `test_customer` table before the Spark logic is executed? – werner Aug 29 '21 at 20:26
  • That can't happen; let's say I have a database filled with all my customers and products, and every day I'm only saving transactions. – Be Chiller Too Aug 30 '21 at 07:23
  • I think you will have to read the primary keys from the database back to Spark (like you already mentioned in the question) or you have to create the IDs within Spark as part of the datasets. You could have a look [here](https://stackoverflow.com/q/54573300/2129801): a similar question, but unfortunately no accepted answer. – werner Aug 30 '21 at 18:11
  • Thanks for your comment, I'll keep searching – Be Chiller Too Aug 31 '21 at 07:53
  • Unless you're Amazon scale, I don't think you have any cause for worry, and even if it becomes bothersome, you can chunk this operation. Since you process transactions as a daily batch, you can consider increasing the cadence to process it every hour or so. – Oluwafemi Sule Sep 03 '21 at 09:22
  • My example includes daily operations, but in reality I have a pipeline running every hour that processes events. – Be Chiller Too Sep 03 '21 at 09:28

0 Answers