I am trying to write a data pipeline that reads a .tsv file from Azure Blob Storage and writes the data to a MySQL database. I have a sensor that looks for a file with a given prefix in my storage container, and then a SparkSubmitOperator that actually reads the data and writes it to the database.
The sensor works fine, and writing the data from local storage to MySQL also works. However, I am having quite a bit of trouble reading the data from Blob Storage.
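For reference, the sensor is essentially the stock prefix sensor from the Azure provider, along these lines (names simplified),
wait_for_tsv = WasbPrefixSensor(
    task_id="wait-for-tsv",
    container_name=blob_container,
    prefix=blob_prefix,
    wasb_conn_id="wasb_default",
    dag=dag,
)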
This is the simple Spark job that I am trying to run,
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.azure.account.key.{}.blob.core.windows.net".format(blob_account_name), blob_account_key)
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("WARN")
df_tsv = spark.read.csv(
    "wasb://{}@{}.blob.core.windows.net/{}".format(blob_container, blob_account_name, blob_name),
    sep=r'\t',
    header=True,
)
mysql_url = 'jdbc:mysql://' + mysql_server
df_tsv.write.jdbc(
    url=mysql_url,
    table=mysql_table,
    mode="append",
    properties={"user": mysql_user, "password": mysql_password, "driver": "com.mysql.cj.jdbc.Driver"},
)
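For context, job.py picks its parameters up from sys.argv, matching the application_args the operator passes in below (the blob_* values are defined elsewhere in the script), roughly,
import sys

# positional arguments supplied through the operator's application_args
tsv_file, mysql_server, mysql_user, mysql_password, mysql_table = sys.argv[1:6]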
This is my SparkSubmitOperator,
spark_job = SparkSubmitOperator(
    task_id="my-spark-app",
    application="path/to/my/spark/job.py",  # path to the Spark job, present on both the Airflow and Spark machines
    name="my-spark-app",
    conn_id="spark_default",
    verbose=1,
    conf={"spark.master": spark_master},
    application_args=[tsv_file, mysql_server, mysql_user, mysql_password, mysql_table],
    jars=azure_hadoop_jar + ", " + mysql_driver_jar,
    driver_class_path=azure_hadoop_jar + ", " + mysql_driver_jar,
    dag=dag,
)
I keep getting this error,
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
What exactly am I doing wrong?
I have both mysql-connector-java-8.0.27.jar and hadoop-azure-3.3.1.jar available to my application, and I have given the paths to both in the driver_class_path and jars parameters. Is there something wrong with how I have done that here?
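For concreteness, both variables hold plain absolute paths, so the strings the operator receives look something like this (paths are placeholders),
azure_hadoop_jar = "/opt/spark/jars/hadoop-azure-3.3.1.jar"  # placeholder path
mysql_driver_jar = "/opt/spark/jars/mysql-connector-java-8.0.27.jar"  # placeholder path

# jars and driver_class_path therefore both end up as:
# "/opt/spark/jars/hadoop-azure-3.3.1.jar, /opt/spark/jars/mysql-connector-java-8.0.27.jar"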
I have tried following the suggestions given here (Saving Pyspark Dataframe to Azure Storage), but they have not been helpful.