I have four files (main.py, jobs.zip, libs.zip, and params.yaml) stored in an Azure Storage Account container.

The code below builds a payload and submits a Spark job with it; the payload carries the storage locations of those four files.

from airflow.models import Variable
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook
from azure.synapse.spark.models import SparkBatchJobOptions

hook = AzureSynapseHook(
    azure_synapse_conn_id=self.azure_synapse_conn_id, spark_pool=self.spark_pool
)

payload = SparkBatchJobOptions(
    name=f"{self.job_name}_{self.app_id}",
    file=f"abfss://{Variable.get('ARTIFACT_BUCKET')}@{Variable.get('ARTIFACT_ACCOUNT')}.dfs.core.windows.net/{self.env}/{SPARK_DIR}/main.py",
    arguments=self.job_args,
    python_files=[
        f"abfss://{Variable.get('ARTIFACT_BUCKET')}@{Variable.get('ARTIFACT_ACCOUNT')}.dfs.core.windows.net/{self.env}/{SPARK_DIR}/jobs.zip",
        f"abfss://{Variable.get('ARTIFACT_BUCKET')}@{Variable.get('ARTIFACT_ACCOUNT')}.dfs.core.windows.net/{self.env}/{SPARK_DIR}/libs.zip",
    ],
    files=[
        f"abfss://{Variable.get('ARTIFACT_BUCKET')}@{Variable.get('ARTIFACT_ACCOUNT')}.dfs.core.windows.net/{self.env}/{SPARK_DIR}/params.yaml"
    ],
)

self.log.info("Executing the Synapse spark job.")
response = hook.run_spark_job(payload=payload)

I have verified that the storage locations are correct, but when I run this on Airflow it raises an error around the payload, which I read as it failing to resolve those links:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/azure/core/pipeline/transport/_base.py", line 579, in format_url
    base = self._base_url.format(**kwargs).rstrip("/")
KeyError: 'endpoint'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/dags/operators/spark/__init__.py", line 36, in execute
    return self.executor.execute()
  File "/usr/local/airflow/dags/operators/spark/azure.py", line 60, in execute
    response = hook.run_spark_job(payload=payload)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/microsoft/azure/hooks/synapse.py", line 144, in run_spark_job
    job = self.get_conn().spark_batch.create_spark_batch_job(payload)
  File "/usr/local/lib/python3.9/site-packages/azure/synapse/spark/operations/_spark_batch_operations.py", line 163, in create_spark_batch_job
    request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/core/pipeline/transport/_base.py", line 659, in post
    request = self._request(
  File "/usr/local/lib/python3.9/site-packages/azure/core/pipeline/transport/_base.py", line 535, in _request
    request = HttpRequest(method, self.format_url(url))
  File "/usr/local/lib/python3.9/site-packages/azure/core/pipeline/transport/_base.py", line 582, in format_url
    raise ValueError(err_msg.format(key.args[0]))
ValueError: The value provided for the url part endpoint was incorrect, and resulted in an invalid url

I would also like to know the difference between abfss and wasbs, and where I should upload my files so that the code can resolve them. Maybe I am uploading the files to the wrong place.


1 Answer


The problem is in your connection self.azure_synapse_conn_id: the host (the Synapse workspace URL) is missing or invalid. As the traceback shows, the Synapse Spark client builds its request URL from an {endpoint} placeholder that is filled in from the connection's host, which is why a bad host surfaces as KeyError: 'endpoint' and the "invalid url" ValueError rather than as a storage error; the abfss links in the payload are not the issue here. Here is an example of a valid connection:

import json

from airflow.models import Connection

Connection(
    conn_id="azure_synapse_default",  # must match the azure_synapse_conn_id you pass to the hook
    conn_type="azure_synapse",
    host="https://testsynapse.dev.azuresynapse.net",  # your Synapse workspace URL
    login="clientId",
    password="clientSecret",
    extra=json.dumps(
        {
            "extra__azure_synapse__tenantId": "tenantId",
            "extra__azure_synapse__subscriptionId": "subscriptionId",
        }
    ),
)
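To sanity-check which connection your task actually resolves, you can print the host that Airflow will hand to the hook. A minimal sketch, assuming the conn_id is azure_synapse_default (substitute your own):

from airflow.hooks.base import BaseHook

# Fetch the connection exactly as the hook would and inspect its host field.
conn = BaseHook.get_connection("azure_synapse_default")
print(conn.host)  # should be your workspace URL, e.g. https://<workspace-name>.dev.azuresynapse.net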

For the difference between abfss and wasbs: wasbs:// goes through the legacy WASB driver against the Blob Storage endpoint, while abfss:// goes through the newer ABFS driver against the Data Lake Storage Gen2 endpoint; abfss is the recommended scheme for Synapse, so your file URLs are already in the right form.
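The two schemes also point at different endpoint hostnames, which the URL shape makes visible (the bracketed parts are placeholders, not values from the question):

# Legacy WASB driver, Blob Storage endpoint:
wasbs_url = "wasbs://<container>@<account>.blob.core.windows.net/<path>"

# ABFS driver, ADLS Gen2 endpoint (the form the question already uses):
abfss_url = "abfss://<container>@<account>.dfs.core.windows.net/<path>"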

Hussein Awala