
Problem Statement: On upgrading the Databricks runtime version, duplicate column(s) throw an error while creating the dataframe. On the lower runtime the dataframe was created and, since the duplicate column was not required downstream, it was simply excluded in the select.

File Location: JSON files stored on ADLS Gen2 (Azure). Cluster Mode: Standard

Code: We read it in Azure Databricks as below.

intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

The JSON file is nested, and within one of the nested structures the `tags` field occurs as a duplicate column (pictured below). After reading it into a dataframe, we pick the required columns; we do not require the duplicate `tags` anyway.

Previously we were running on Databricks runtime 7.3 LTS (Spark 3.0.1), where the dataframe was created including the duplicate columns, but since we did not use them further, it didn't hurt.

However, we are now upgrading to runtime 9.1 LTS (Spark 3.1.2), and it throws an error about duplicate columns while creating the dataframe itself. Error message: Found duplicate column(s) in the data schema: `tags`

[Screenshot: duplicate column `tags` in the JSON file. The dataframe was created successfully on runtime 7.3 LTS (Spark 3.0.1).]

Conclusion: I've tried selecting columns as soon as the dataframe is read, but with no success. I have a hunch that, since the upgraded Databricks runtimes are more Delta-table-oriented by default (Delta tables do not support duplicate columns), there might be a property we would have to turn off in order to skip this check throughout the notebook, or just while reading into the dataframe.

Although this exact error occurred on JSON, I believe it might occur for other file formats like CSV if they have duplicate columns.

The file is quite nested, and defining a schema for all required columns is not very practical, as it is tedious and error-prone in case more columns are required in the future (this would be a secondary solution). The file is generated by a vendor using an automated process, and all files are expected to remain in the same format as the historical files already delivered.
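For reference, the same behaviour can be reproduced with a tiny made-up record (not the actual vendor file; the file name below is just for illustration):

# Minimal reproduction: a single record with a duplicate "tags" key.
dup_json = '{"id": 1, "tags": "test1", "tags": "test2"}'
dbutils.fs.put("/tmp/dup_test.json", dup_json, True)

# Works on 7.3 LTS (Spark 3.0.1); on 9.1 LTS (Spark 3.1.2) this fails with
# AnalysisException: Found duplicate column(s) in the data schema: `tags`
df = spark.read.json("/tmp/dup_test.json")
df.printSchema()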

Complete error on runtime 9.1LTS(Spark3.1.2):

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

Edit: see the comment on defining the schema beforehand.

Nirupam Nishant

3 Answers


There have been several good suggestions, which may be helpful case by case.

As pointed out by @ScootCork, defining the schema beforehand helps, as Spark does not have to create the schema on its own. However, my file was quite huge and heavily nested, which made defining the schema manually cumbersome.

Finally I did use a schema, but found a workaround so that I did not have to create it manually. Even with duplicate columns, I was able to create the dataframe on the 7.3 LTS runtime, as stated in the original question. Hence I read one file on this runtime and wrote it back to ADLS Gen2 (you can store it anywhere). This is a one-time activity; you can then read this file back every time you run your code (multiline does not need to be true when reading it back), get its schema using .schema, and use this schema to read the new json files. Since Spark does not have to infer the schema on its own, it does not throw an error for the duplicate columns.

Note that the duplicate column still exists and you WILL get an ambiguity error if you try to use it. However, this method is quite useful if manually defining the schema is not practical due to the sheer size and complexity of the json structure, and if the duplicated columns are of no use. Described below:

One-time activity on the 7.3 LTS runtime

# A few columns were coming in duplicated in the raw file, e.g. languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused an error while creating the dataframe.
# However, we are able to read it on Databricks Runtime 7.3 LTS. Hence used this runtime to read one file and write it to ADLS as a ONE-TIME activity.
# For all further runs, this file can be read with multiline false; its schema is then used while reading the other new files (which in this case need multiline true). This way Spark does not have to create the schema on its own and hence does not throw the error even on higher runtime versions.
# Used a historical file initially delivered, which had a lot of records due to historical data. This ensures we cover all possibilities.
# Can be created again using a 7.3 LTS runtime cluster if this schema file is deleted.

dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile) # Can take any file to create the sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)

Now use this written file for all future runs even on higher runtimes.

# Read the sample which was created using the 7.3 LTS runtime.
# multiline does NOT have to be true for this.
# Get its schema and use it to read new files, even on a higher runtime, without the error that was caused by the duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema

# Read new json files using this schema via `.schema()`. Works on higher runtimes as well since Spark now does not have to create the schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")
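As a variant (not part of the original workaround): instead of persisting a sample data file, the inferred schema itself could be stored once as a JSON string and rebuilt later with StructType.fromJson, so no inference is ever needed on the new runtime. A minimal sketch, assuming a Databricks notebook (dbutils available) and a hypothetical storage path pathSchemaTxt:

import json
from pyspark.sql.types import StructType

# One-time, on the 7.3 LTS cluster: dump the inferred schema to a JSON string
# and keep it in a small text file (pathSchemaTxt is a hypothetical location).
schema_str = dfOldRuntime.schema.json()
dbutils.fs.put(pathSchemaTxt, schema_str, True)

# Every later run, on any runtime: rebuild the StructType and read new files with it.
# Assumes the schema string fits within the byte limit passed to head().
schema_str = dbutils.fs.head(pathSchemaTxt, 1024 * 1024)
schema = StructType.fromJson(json.loads(schema_str))
intermediate_df = spark.read.option("multiline", "true").schema(schema).json(f"{json_path}")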
Nirupam Nishant

Please use json.loads to convert the JSON to a dictionary while handling the duplicated keys:

import json

#test json
test_json = """[
   {"id": 1,
   "tags": "test1",
   "tags": "test1"},
  {"id": 2,
   "tags": "test2",
   "tags": "test2",
   "tags": "test3"}]
"""

# function to handle duplicate keys: rename each repeated key by appending a counter
def value_resolver(pairs):
    d = {}
    i = 1
    for k, v in pairs:
        if k in d:
            # key already seen in this object: store under e.g. tags1, tags2, ...
            d[k + str(i)] = v
            i += 1
        else:
            d[k] = v
    return d

#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]

#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()


+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+
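For the actual files on ADLS rather than an inline string, one way to apply the same resolver is sketched below (assumptions: each file holds a JSON array of records, as in test_json above, the files are small enough to collect on the driver, and value_resolver is defined as above):

import json

# Read each JSON file as a (path, content) pair.
raw = spark.sparkContext.wholeTextFiles(f"{path}/IN-109418_Part_1.json")

# Parse every file with the duplicate-key resolver and build one dataframe.
records = []
for _, content in raw.collect():
    records.extend(json.loads(content, object_pairs_hook=value_resolver))

df = spark.createDataFrame(records)
df.show()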
Hubert Dudek

There is currently no option for this in the Spark documentation. There also seem to be differing opinions/standards on the validity of JSONs with duplicate keys and how to treat them (SO discussion).

Supplying the schema without the duplicate key field results in a successful load. It takes the value of the last key in the json.

The schema depends on your source file.

test.json

{
    "id": 1,
    "tags": "test1",
    "tags": "test2"
}

python

from pyspark.sql.types import *

schema = StructType([
    StructField('id', LongType(), True),
    StructField('tags', StringType(), True)
])

df = spark.read.schema(schema).json("test.json", multiLine=True)

df.show()

+---+-----+
| id| tags|
+---+-----+
|  1|test2|
+---+-----+

Ran locally on pyspark 3.1.1
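The same approach extends to nested files; below is a minimal sketch with a made-up nested layout and file name (the real schema would mirror the source file), where `tags` sits under an attributes struct and is declared only once:

from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

# Hypothetical nested layout for illustration only; `tags` is declared once,
# mirroring the flat example above.
nested_schema = StructType([
    StructField('id', LongType(), True),
    StructField('attributes', StructType([
        StructField('tags', ArrayType(StringType()), True)
    ]), True)
])

df = spark.read.schema(nested_schema).json("nested_test.json", multiLine=True)
df.show(truncate=False)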

ScootCork