
I am running a DLT (Delta Live Tables) job that builds a Bronze table > Silver table > Gold table chain for two separate datasets, so in the end I have two separate Gold tables, which I want to merge into one table. I know how to do it in SQL, but every time I run the job with a SQL cell in a Databricks notebook, it gives me this error:

Magic commands (e.g. %py, %sql and %run) are not supported with the exception of
%pip within a Python notebook. Cells containing magic commands are ignored.
Unsupported magic commands were found in the following notebooks

I would do it in PySpark, but it does not have create table functionality.

Here is my code for making the bronze table

import dlt

@dlt.table(
  name="Bronze_or",
  comment="New online retail sales data incrementally ingested from cloud object storage landing zone",
  table_properties={
    "quality": "bronze",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"  # reducing the threshold to 2 instead of 5
  }
)
def Bronze_or():
  # Auto Loader ("cloudFiles") reads new CSV files incrementally as a stream
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .load("/**Path to raw csv data**/")
  )

Then I create a view

from datetime import datetime
from pyspark.sql import functions as F

# Expectation name -> SQL constraint checked on every row
expect_list = {"not_null_pk": "id IS NOT NULL", "QuantityNotNeg": "Quantity >= 0"}

@dlt.view(name="Bronze_or_clean_v",
  comment="Cleansed bronze retail sales view (i.e. what will become Silver)")
# @dlt.expect("EMPLOYEE_ID", "EMPLOYEE_ID IS NOT NULL")
@dlt.expect_all(expect_list)
# @dlt.expect("valid_address", "address IS NOT NULL")
# @dlt.expect_or_drop("valid_operation", "operation IS NOT NULL")
def Bronze_or_clean_v():
  return (
    dlt.read_stream("Bronze_or")
      .withColumn('inputFileName', F.input_file_name())
      # datetime.now() is a Python value, fixed at the moment this function runs
      .withColumn('LoadDate', F.lit(datetime.now()))
  )

Finally, I create the silver table

dlt.create_target_table(name="Silver_or",
  comment="Clean, merged retail sales data",
  table_properties={
    "quality": "silver",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"
  }
)

Lastly, I build the gold table

from pyspark.sql.functions import desc

@dlt.table(name="Gold_or")
def Gold_or():
  return (
    dlt.read("Silver_or")
      # .filter(expr("current_page_title == 'Apache_Spark'"))
      # .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("LIVE.Silver_or.CustomerID"))
      .select("LIVE.Silver_or.UnitPrice", "LIVE.Silver_or.Quantity", "LIVE.Silver_or.CustomerID")
      # .limit(10)
  )

I run this twice for two different CSV files, so in the end I have two separate Gold tables, but I want to combine them into one with select columns.

FYI: Both tables share a foreign key.

1 Answer


Have you considered writing it in a SQL notebook? That way you can get rid of the %sql magic commands and write SQL directly.

See the screenshot for how to switch: [screenshot]

Axel R.
  • I tried that while running this query: `DROP TABLE Gold_data; CREATE TABLE IF NOT EXISTS Gold_data AS SELECT * FROM gold_or LEFT JOIN gold_rc ON gold_or.CustomerID=gold_rc.customer_id;` But then it gave me this error in DLT: `org.apache.spark.sql.AnalysisException: Unable to process statement of type: 'SetCatalogAndNamespace'. DLT currently only accepts 'CREATE TEMPORARY LIVE VIEW', 'CREATE OR REFRESH LIVE TABLE', 'APPLY CHANGES INTO', and 'SET' statements.` – Anton Kopti Jul 22 '22 at 16:13
  • 1
    I have exactly the same issue, have you solved yours? – emily.mi Jan 30 '23 at 09:20
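
For what it's worth, the merge may not need a SQL cell at all: a function decorated with `@dlt.table` that returns a joined DataFrame is materialized as a table by the pipeline, so the join can stay in the same Python notebook. The sketch below is untested against this pipeline and rests on assumptions: the second gold table is called `Gold_rc` with a `customer_id` column (names taken from the query in the comment above), `Gold_data` is the target name, and `join_gold` and `define_gold_data` are hypothetical helper names chosen only for illustration.

```python
def join_gold(gold_or, gold_rc):
    """Left-join the two gold DataFrames on the shared customer key.

    Column names are assumptions taken from the SQL query in the comments.
    """
    return gold_or.join(
        gold_rc,
        on=gold_or["CustomerID"] == gold_rc["customer_id"],
        how="left",
    )


def define_gold_data(dlt):
    """Register the combined table with the pipeline.

    `dlt` is the module imported in the pipeline notebook; passing it in
    keeps this sketch importable outside a Delta Live Tables pipeline.
    """

    @dlt.table(name="Gold_data", comment="Gold_or left-joined with Gold_rc")
    def Gold_data():
        # dlt.read performs a complete (batch) read of a dataset
        # defined in the same pipeline
        return join_gold(dlt.read("Gold_or"), dlt.read("Gold_rc"))

    return Gold_data


# In the pipeline notebook (hypothetical usage):
#   import dlt
#   define_gold_data(dlt)
```

To keep only select columns, a `.select(...)` on the result of `join_gold` should be enough. If the SQL route is preferred instead, the error message quoted above suggests the statement would have to take the `CREATE OR REFRESH LIVE TABLE ... AS SELECT ...` form rather than `DROP TABLE` / `CREATE TABLE IF NOT EXISTS`.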