
I have two linked services for two AzureDataLakeStorage-Gen2 set up in my Synapse workspace:

  • one_linked_service_name
  • two_linked_service_name

I need to copy data (and do transformations) from one ADLS to the other ADLS.

I have a list of Parquet directories that I want to copy. Sometimes the code executes smoothly, but then it crashes seemingly at random in the middle of the list. The maddening part is that I cannot reliably reproduce the error.

At first I thought that maybe I had to collect the data in between the steps because I imagined that the lazy execution meant that PySpark didn't retrieve the data before it switched the linkedServiceName, but the error (randomly) persists.

I am out of ideas by now. Any help is appreciated.

Code:

for filepath_source in filepaths:
    spark.conf.set("fs.azure.account.auth.type", "SAS")
    spark.conf.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

    # Read from the source ADLS
    spark.conf.set("spark.storage.synapse.linkedServiceName", one_linked_service_name)
    print("Switched to", spark.conf.get("spark.storage.synapse.linkedServiceName"))
    df = spark.read.option("forwardSparkAzureStorageCredentials", "true").parquet(ZONE_RAW_CONNECTION_STR + "/" + filepath_source)

    _ = df.collect()  # test; did not change the outcome

    # Store into the /refined/ zone
    spark.conf.set("spark.storage.synapse.linkedServiceName", two_linked_service_name)
    print("Switched to", spark.conf.get("spark.storage.synapse.linkedServiceName"))
    df.write \
        .mode("overwrite") \
        .option("forwardSparkAzureStorageCredentials", "true") \
        .option("compression", "snappy") \
        .parquet(ZONE_REFINED_CONNECTION_STR + "/" + filepath_target)

Error:

Caused by: java.nio.file.AccessDeniedException: Operation failed: 
"Server failed to authenticate the request. 
Make sure the value of Authorization header is formed correctly including the signature.", 403, HEAD, 
https://mydatastorename.dfs.core.windows.net/some/path?upn=false&action=getStatus&timeout=90&sv=2020-02-10&ss=bf&srt=sco&se=2022-02-17T17%3A13%3A26Z&sp=rwdl&sig=XXXX
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:199)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getPathStatus(AbfsClient.java:560)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.openFileForRead(AzureBlobFileSystemStore.java:627)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.open(AzureBlobFileSystem.java:196)
... 26 more
Traceback (most recent call last):

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 677, in collect
sock_info = self._jdf.collectToPython()

File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)

File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
edited by CHEEKATLAPRADEEP
asked by Cribber

3 Answers


Why don't you save the dataframe to the DBFS /tmp/data/ folder instead of performing a collect(), and then write it to the other ADLS from there? This should work.
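The staging approach above can be sketched as follows. This is illustrative only: the staging directory and the helper name are hypothetical, and the Spark calls (which need a live Synapse/Databricks session and the asker's variables) are shown as comments.

```python
# Sketch: stage the data on storage the cluster can always reach, then
# switch linked services and write from the staged copy.

STAGING_DIR = "/tmp/data"  # hypothetical staging location


def staging_path(filepath_source: str) -> str:
    """Build a staging path for one source directory (illustrative helper)."""
    return STAGING_DIR + "/" + filepath_source.strip("/")


# With a Spark session, the loop body would look roughly like:
# spark.conf.set("spark.storage.synapse.linkedServiceName", one_linked_service_name)
# df = spark.read.parquet(ZONE_RAW_CONNECTION_STR + "/" + filepath_source)
# df.write.mode("overwrite").parquet(staging_path(filepath_source))
#
# spark.conf.set("spark.storage.synapse.linkedServiceName", two_linked_service_name)
# spark.read.parquet(staging_path(filepath_source)) \
#     .write.mode("overwrite").parquet(ZONE_REFINED_CONNECTION_STR + "/" + filepath_target)
```

The point of the intermediate write is that the source credentials are only needed for the first read; once the data sits in staging, switching the linked service cannot invalidate an in-flight read.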

answered by Frau P
  • This would mean I have 2 physical disk read/writes for every 1 logical read/write operation - I'm hoping there is a better solution... – Cribber Feb 21 '22 at 08:47

OK, this is a total hack, but I have it working. Let me lay out the scenario:

  • Azure Synapse Workspace using a Managed Virtual Network.
  • Azure Synapse Pipeline.
  • source account ADLS Gen 2 secured with endpoints.
  • sink account ADLS Gen 2 secured with endpoints. This happens to be the default ADLS account for the Azure Synapse Workspace.
  • source account ADLS Gen 2 has directory format yyyy/mm/dd with only one file in that directory containing data in jsonl format.
  • sink account ADLS Gen 2 has directory format Year=yyyy/Month=mm/Day=dd. The data from the source has a few transformations and is stored in parquet format.
  • Using a PySpark notebook, and this is critical: YOU MUST HAVE THE NUMBER OF EXECUTORS ON THE NOTEBOOK SET TO 1.

I was getting the same errors you were, but I kept plugging away at it, and this is what I ended up with.

spark.conf.set("spark.storage.synapse.linkedServiceName", psourceLinkedServiceName)
spark.conf.set("fs.azure.account.auth.type", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

df = spark.read.schema(inputSchema).json(psourceFile)

# perform the transformations to flatten this structure
# and create the partition columns for parquet
dfTransform = {df transformations here}

# this is critical: you must do a show()
dfTransform.show()

# change the linked service
spark.conf.set("spark.storage.synapse.linkedServiceName", psinkLinkedServiceName)

# output the parquet result
dfTransform.write.partitionBy("Year", "Month", "Day").mode("append").format("parquet").save(targetFile)

OK, why does this work?

  1. I am only passing in one file name from the source account to process.
  2. I have the spark notebook set to 1 executor.
  3. I am doing a dfTransform.show().

I guessed that the linked-services feature of the Microsoft TokenLibrary was designed to work within the containers of a single storage account, but I noticed that it worked across storage accounts if the file was very small. After repeated experiments I figured out that I could get it to work for a single file of any size as long as I followed the three items above. You need to invoke the show() method so that the data is actually loaded into the dfTransform data frame. After that load, you are safe to change the linked service and write out the data. However, if you have more than one executor, things get out of whack, because the additional executors use the last linked-service change for the source read.

Hope this helps you and good luck.

answered by bmukes

It's been such a long time that I forgot about this question on SO. The issue was that PySpark overwrote the credentials before retrieving the entire data (lazy execution and all that ...).

After a lengthy conversation with MS support, it turns out there is a quite simple solution: do not use (and thus have to overwrite) the same Spark config (spark.storage.synapse.linkedServiceName) for authentication when reading/writing from/to multiple ADLS accounts.

We can use the spark config

fs.azure.sas.{container_name}.{account_name}.dfs.core.windows.net

to store SAS tokens, which are retrieved when reading/writing. Since the config is individualized per account (and even per container), we don't run into problems when "switching" between ADLS accounts, as each has its own config for authentication.

# In Synapse we can use `mssparkutils` to retrieve a SAS token via the linked services.
# Outside of Synapse we have to get the SAS token another way, as
# the package & the linked services are not available.
sas_token_storage_x = mssparkutils.credentials.getConnectionStringOrCreds(
    linked_service_name
)

# set the token for each account individually in its own config
spark.conf.set(
    f"fs.azure.sas.{adls_container}.{adls_account}.dfs.core.windows.net",
    sas_token_storage_x,
)

# read data
path = f"abfss://{adls_container}@{adls_account}.dfs.core.windows.net/my/path/to/file_x.csv"
spark.read.load(
    path,
    format="csv",
    schema=schema,
    encoding=encoding,
    header=header,
    sep=separator,
)
# analogous for parquet etc.
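To complete the round trip from the original question with this approach, the per-container config can be set once for both accounts, so a read from one ADLS and a write to the other never clobber each other's credentials. A minimal sketch, in which the container and account names are placeholders and the Spark/mssparkutils calls (which need a live Synapse session) are shown as comments:

```python
# Hedged sketch: one scoped SAS config per (container, account) pair,
# instead of one shared linkedServiceName config for everything.


def sas_conf_key(container: str, account: str) -> str:
    """Spark config key that scopes a SAS token to a single container."""
    return f"fs.azure.sas.{container}.{account}.dfs.core.windows.net"


def abfss_path(container: str, account: str, relative_path: str) -> str:
    """Build an abfss:// URI for a file or directory in that container."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path.lstrip('/')}"


# With a Synapse session, copying between the two accounts would look roughly like:
# for service, (container, account) in [
#     (one_linked_service_name, ("raw", "sourceaccount")),
#     (two_linked_service_name, ("refined", "sinkaccount")),
# ]:
#     token = mssparkutils.credentials.getConnectionStringOrCreds(service)
#     spark.conf.set(sas_conf_key(container, account), token)
#
# df = spark.read.parquet(abfss_path("raw", "sourceaccount", filepath_source))
# df.write.mode("overwrite").parquet(abfss_path("refined", "sinkaccount", filepath_target))
```

Because each token lives under its own key, nothing is overwritten mid-job, and lazy execution no longer matters.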
answered by Cribber