I have a data preprocessing pipeline where I clean data from tens of thousands of tweets. I want to save my dataframe in stages so that I can load these "save points" from later stages in my pipeline. I have read that saving a dataframe in a parquet format is the most 'efficient' writing method in that it is quick, scalable etc. and this is ideal for me as I'm trying to keep scalability in mind for this project.
However, I have ran into a problem where I cannot seem to save fields which contain structs to file. I receive a JSON error json.decoder.JSONDecodeError: Expecting ',' delimiter: ...
when trying to output my dataframe (more details below).
My dataframe is currently in the following format:
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
| id| timestamp| tweet_text| tweet_hashtags|tweet_media| tweet_urls| topic| categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...| []|[[http:url... |guatemalaEarthqua...|[CallToAction-Don...| Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266381928989589505|1536251780|Strong earthquake...| []| []|[[http:url... |guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows
with the following schema for clarity:
root
|-- id: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- tweet_text: string (nullable = true)
|-- tweet_hashtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = false)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_media: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id_str: string (nullable = true)
| | |-- type: string (nullable = false)
| | |-- url: string (nullable = true)
| | |-- media_url: string (nullable = true)
| | |-- media_https: string (nullable = true)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_urls: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- url: string (nullable = false)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- topic: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: string (containsNull = true)
|-- priority: string (nullable = true)
I am trying to save this dataframe in the parquet format with the following line:
df.write.mode('overwrite').save(
path=f'{DATA_DIR}/interim/feature_select.parquet',
format='parquet')
and also with df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite')
.
However, I am presented with the error json.decoder.JSONDecodeError: Expecting ',' delimiter: ...
when trying to save these files:
File "features.py", line 207, in <lambda>
entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
File "features.py", line 194, in _convert_str_to_arr
arr = [json.loads(x) for x in arr]
File "features.py", line 194, in <listcomp>
arr = [json.loads(x) for x in arr]
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)
The line stated in the error code also refers to an earlier UDF
transformation I had made on a number of columns (cols tweet_*
). This works fine when I remove the writer.
I wasn't able to find much on specifying a delimiter for parquet files, is this something that is possible? Or will I have to serialize any data which contains a comma? Or will I even have to take the Spark structs that I've parsed and changed, and convert them back into JSON in order to save the file?