I'm reading a CSV file that logs transactions; here is a small, reproducible sample of the resulting DataFrame:
l = [("Garfield", "Lasagna", "2021-08-01"),
("Calvin", "Tiger Plushie", "2021-08-02"),
("Calvin", "Lasagna", "2021-08-03")]
transactions = spark.createDataFrame(l, ["name", "product", "date"])
transactions.show()
+--------+-------------+----------+
| name| product| date|
+--------+-------------+----------+
|Garfield| Lasagna|2021-08-01|
| Calvin|Tiger Plushie|2021-08-02|
| Calvin| Lasagna|2021-08-03|
+--------+-------------+----------+
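In the real job the DataFrame comes from a CSV read rather than a literal list; roughly like this, where the path and options are only placeholders:
# Placeholder path and options -- the actual CSV location is not shown here.
transactions = (spark.read
    .option("header", True)
    .csv("/path/to/transactions.csv")
    .select("name", "product", "date"))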
I want to store these values in a SQL database, in a star schema, with tables created like this:
CREATE TABLE [dbo].[test_customer]
(
[id] INT IDENTITY PRIMARY KEY, -- Primary Key column
[name] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_product]
(
[id] INT IDENTITY PRIMARY KEY, -- Primary Key column
[product] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_transaction]
(
[id] INT IDENTITY PRIMARY KEY, -- Primary Key column
[fk_customer] INT NOT NULL,
[fk_product] INT NOT NULL,
[date] DATE
);
I can write the customer and product info to my database with:
JDBCURL = "jdbc:sqlserver://{}:1433;database={};user={};password={}"
customers = transactions.select("name").drop_duplicates()
customers.write.jdbc(JDBCURL, table="dbo.test_customer", mode="append")
products = transactions.select("product").drop_duplicates()
products.write.jdbc(JDBCURL, table="dbo.test_product", mode="append")
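For completeness, the placeholders in JDBCURL are filled in with str.format before the writes; the server, database, and credentials below are made up:
# Made-up connection details; the real values are substituted into JDBCURL.
url = JDBCURL.format("myserver", "mydb", "myuser", "mypassword")
props = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
customers.write.jdbc(url, table="dbo.test_customer", mode="append", properties=props)
products.write.jdbc(url, table="dbo.test_product", mode="append", properties=props)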
But to store the transaction details, I need to fetch the auto-generated IDs back from the database and join them with my in-memory DataFrame:
sql_customers = spark.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_customer").load()
sql_products = spark.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_product").load()
transactions_to_sql = (transactions.join(sql_customers, on="name")
                       .drop("name")
                       .withColumnRenamed("id", "fk_customer")
                       .join(sql_products, on="product")
                       .drop("product")
                       .withColumnRenamed("id", "fk_product"))
transactions_to_sql.write.jdbc(JDBCURL, table="dbo.test_transaction", mode="append")
This doesn't sound very efficient once I have millions of customers and millions of products.
How can I handle auto-generated identifiers for my customers and products, and foreign keys between my tables?