I am running a Delta Live Tables (DLT) job that builds a Bronze table and then a Silver table, for two separate source tables. In the end I have two separate Gold tables, which I want to merge into one table. I know how to do this in SQL, but every time I run the job with a SQL cell in a Databricks notebook, it gives me this error:
Magic commands (e.g. %py, %sql and %run) are not supported with the exception of
%pip within a Python notebook. Cells containing magic commands are ignored.
Unsupported magic commands were found in the following notebooks
I would do it in PySpark, but as far as I can tell it does not have create-table functionality.
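(Aside: outside of a DLT pipeline, SQL DDL can be issued from a Python cell without the `%sql` magic by calling `spark.sql()`. A minimal sketch, where `gold_a`, `gold_b`, and `gold_combined` are placeholder table names, not names from my pipeline:)

```python
# Sketch: run SQL from a Python notebook cell via the active SparkSession.
# Table names and join column here are placeholders, not real pipeline names.
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_combined AS
    SELECT a.CustomerID, a.UnitPrice, a.Quantity
    FROM gold_a AS a
    JOIN gold_b AS b ON a.CustomerID = b.CustomerID
""")
```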
Here is my code for making the Bronze table:

import dlt
from pyspark.sql import functions as F
from datetime import datetime

@dlt.table(name="Bronze_or",
    comment="New online retail sales data incrementally ingested from cloud object storage landing zone",
    table_properties={
        "quality": "bronze",
        "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"  # reducing the threshold to 2 instead of 5
    }
)
def Bronze_or():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/**Path to raw csv data**/")
    )
Then I create a view:

expect_list = {"not_null_pk": "id IS NOT NULL", "QuantityNotNeg": "Quantity >= 0"}

@dlt.view(name="Bronze_or_clean_v",
    comment="Cleansed bronze retail sales view (i.e. what will become Silver)")
@dlt.expect_all(expect_list)
def Bronze_or_clean_v():
    return (
        dlt.read_stream("Bronze_or")
            .withColumn("inputFileName", F.input_file_name())
            .withColumn("LoadDate", F.lit(datetime.now()))
    )
Finally, I create the Silver table:

dlt.create_target_table(
    name="Silver_or",
    comment="Clean, merged retail sales data",
    table_properties={
        "quality": "silver",
        "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"
    }
)
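(A target declared with `create_target_table` is typically populated with `dlt.apply_changes`. A sketch of that pairing, where the key column `"id"` and sequencing column `"LoadDate"` are assumptions taken from the expectations and view above, not confirmed details of my pipeline:)

```python
# Sketch: populate the Silver target from the cleansed Bronze view.
# "id" as the key and "LoadDate" as the sequencing column are assumptions;
# substitute the real primary-key and ordering columns.
dlt.apply_changes(
    target="Silver_or",
    source="Bronze_or_clean_v",
    keys=["id"],
    sequence_by="LoadDate"
)
```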
Lastly, I build the Gold table (columns are referenced by plain name; the `LIVE.` prefix only applies in SQL, not in the DataFrame API):

@dlt.table(name="Gold_or")
def Gold_or():
    return (
        dlt.read("Silver_or")
            .sort(F.desc("CustomerID"))
            .select("UnitPrice", "Quantity", "CustomerID")
    )
I run this twice for two different CSV files, so in the end I have two separate Gold tables, but I want to combine them into one with selected columns.
FYI: both tables share a foreign key.
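(For what it's worth, I imagine the combined table could be declared as another table in the same Python pipeline, joining on the shared foreign key. A sketch, where `Gold_or_a`, `Gold_or_b`, and the join column `CustomerID` are placeholder names for the two gold tables and their shared key:)

```python
# Sketch: a combined gold table declared inside the same DLT Python pipeline.
# "Gold_or_a" / "Gold_or_b" and "CustomerID" are placeholder names for the
# two gold tables and the shared foreign key.
@dlt.table(name="Gold_combined")
def Gold_combined():
    a = dlt.read("Gold_or_a")
    b = dlt.read("Gold_or_b")
    return (
        a.join(b, on="CustomerID", how="inner")
         .select("CustomerID", a["UnitPrice"], a["Quantity"])
    )
```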