I am trying to write a data pipeline that reads a .tsv file from Azure Blob Storage and writes the data to a MySQL database. I have a sensor that looks for a file with a given prefix within my storage container, and then a SparkSubmitOperator that actually reads the data and writes it to the database.

The sensor works fine and when I write the data from local storage to MySQL, that works fine as well. However, I am having quite a bit of trouble reading the data from Blob Storage.

This is the simple Spark job that I am trying to run:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.azure.account.key.{}.blob.core.windows.net".format(blob_account_name), blob_account_key)
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("WARN")

df_tsv = spark.read.csv("wasb://{}@{}.blob.core.windows.net/{}".format(blob_container, blob_account_name, blob_name), sep=r'\t', header=True)

mysql_url = 'jdbc:mysql://' + mysql_server
df_tsv.write.jdbc(url=mysql_url, table=mysql_table, mode="append", properties={"user": mysql_user, "password": mysql_password, "driver": "com.mysql.cj.jdbc.Driver"})

This is my SparkSubmitOperator:

spark_job = SparkSubmitOperator(
    task_id="my-spark-app",
    application="path/to/my/spark/job.py", # Spark application path created in airflow and spark cluster
    name="my-spark-app",
    conn_id="spark_default",
    verbose=1,
    conf={"spark.master":spark_master},
    application_args=[tsv_file, mysql_server, mysql_user, mysql_password, mysql_table],
    jars=azure_hadoop_jar + ", " + mysql_driver_jar,
    driver_class_path=azure_hadoop_jar + ", " + mysql_driver_jar,
    dag=dag)

I keep getting this error:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found

What exactly am I doing wrong?

I have both mysql-connector-java-8.0.27.jar and hadoop-azure-3.3.1.jar in my application. I have given the path to these in the driver_class_path and jars parameters. Is there something wrong with how I have done that here?
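One thing that may be worth checking is how the two jar paths are joined. As far as I understand, spark-submit's --jars option expects a comma-separated list without spaces, while --driver-class-path is an ordinary Java classpath whose entries are separated by the platform path separator (":" on Linux). The snippet below is only a minimal sketch of building both strings that way. The jar paths and the helper names build_jars_arg and build_driver_class_path are hypothetical, and I have not confirmed that this is what causes the ClassNotFoundException.

```python
import os

def build_jars_arg(jar_paths):
    """Join jar paths with commas and no spaces, the format --jars expects."""
    return ",".join(path.strip() for path in jar_paths)

def build_driver_class_path(jar_paths, sep=os.pathsep):
    """Join jar paths with the classpath separator (':' on Linux, ';' on Windows)."""
    return sep.join(path.strip() for path in jar_paths)

# Hypothetical locations; substitute the real paths on the Spark driver host.
azure_hadoop_jar = "/opt/jars/hadoop-azure-3.3.1.jar"
mysql_driver_jar = "/opt/jars/mysql-connector-java-8.0.27.jar"

jars_arg = build_jars_arg([azure_hadoop_jar, mysql_driver_jar])
# sep=":" is passed explicitly on the assumption that the driver runs on Linux.
driver_cp_arg = build_driver_class_path([azure_hadoop_jar, mysql_driver_jar], sep=":")

print(jars_arg)
print(driver_cp_arg)
```

These two strings would then be passed as jars=jars_arg and driver_class_path=driver_cp_arg in the SparkSubmitOperator, in place of the ", " concatenation used above.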

I have tried following the suggestions given in "Saving Pyspark Dataframe to Azure Storage", but they have not been helpful.

Minura Punchihewa