
I have a data preprocessing pipeline in which I clean data from tens of thousands of tweets. I want to save my dataframe in stages so that I can load these "save points" from later stages in my pipeline. I have read that Parquet is an efficient format for writing dataframes (fast and scalable), which is ideal for me as I'm trying to keep scalability in mind for this project.

However, I have run into a problem where I cannot seem to save fields containing structs to file. I receive a JSON error, json.decoder.JSONDecodeError: Expecting ',' delimiter: ..., when trying to write my dataframe (more details below).

My dataframe is currently in the following format:

+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|                id| timestamp|          tweet_text|      tweet_hashtags|tweet_media|          tweet_urls|               topic|          categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|     Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...|         []|[[http:url...       |guatemalaEarthqua...|[CallToAction-Don...|  Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266381928989589505|1536251780|Strong earthquake...|                  []|         []|[[http:url...       |guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows

with the following schema for clarity:

root
 |-- id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- tweet_hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = false)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- type: string (nullable = false)
 |    |    |-- url: string (nullable = true)
 |    |    |-- media_url: string (nullable = true)
 |    |    |-- media_https: string (nullable = true)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: string (nullable = false)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- topic: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- priority: string (nullable = true)

I am trying to save this dataframe in the parquet format with the following line:

df.write.mode('overwrite').save(
    path=f'{DATA_DIR}/interim/feature_select.parquet',
    format='parquet')

and also with df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite').

However, I am presented with the error json.decoder.JSONDecodeError: Expecting ',' delimiter: ... when trying to save these files:

  File "features.py", line 207, in <lambda>
    entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
  File "features.py", line 194, in _convert_str_to_arr
    arr = [json.loads(x) for x in arr]
  File "features.py", line 194, in <listcomp>
    arr = [json.loads(x) for x in arr]
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)

The line referenced in the traceback points to an earlier UDF transformation I applied to a number of columns (the tweet_* cols). That transformation works fine when I remove the writer.

I wasn't able to find much on specifying a delimiter for Parquet files; is this something that is possible? Or will I have to serialize any data that contains a comma? Or will I even have to take the Spark structs that I've parsed and changed, and convert them back into JSON in order to save the file?

Shaido
apgsov

1 Answer


This error is actually not related to Parquet at all. Spark transformations are lazy: they are not executed until an action is triggered (in this case, writing to Parquet). So an error in a transformation will only surface at that point.
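As a rough plain-Python analogy (not Spark itself), map() is also lazy, so a parsing error stays hidden until the results are actually consumed:

```python
import json

# The second string is missing a comma between its two key/value pairs.
rows = ['{"a": 1}', '{"a": 1 "b": 2}']

# Defining the lazy transformation raises nothing, just like a Spark UDF...
parsed = map(json.loads, rows)

# ...the error only surfaces when the results are materialized (the "action").
try:
    results = list(parsed)
except json.JSONDecodeError as err:
    print(err)  # Expecting ',' delimiter: ...
```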

From the error, we can see that the actual problem is the line:

arr = [json.loads(x) for x in arr]

which occurs inside the UDF transformation.
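For illustration, a minimal reproduction of that exact error; the hashtag-style record below is hypothetical, not taken from the actual data:

```python
import json

# A missing comma between "text" and "indices" makes this invalid JSON.
bad = '{"text": "Guatemala" "indices": [72, 81]}'

try:
    json.loads(bad)
except json.JSONDecodeError as err:
    print(err)  # Expecting ',' delimiter: ...
```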

A json.decoder.JSONDecodeError occurs when there is some problem with the JSON itself. Two common causes are that the string is not valid JSON, or that there is a quoting/escaping issue, see here. So:

  1. Confirm that the columns contain valid JSON.
  2. If the strings contain backslashes that are not valid JSON escapes, try doubling them; this can be done with x.replace("\\", r"\\").
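A sketch of the second suggestion, assuming the problem is a stray backslash that is not a valid JSON escape sequence:

```python
import json

# The actual characters here are {"path": "C:\Users"} -- the \U is not a
# valid JSON escape, so json.loads() raises a JSONDecodeError on it.
raw = '{"path": "C:\\Users"}'

# Doubling each backslash turns \U into the valid escape sequence \\U.
fixed = raw.replace("\\", r"\\")

print(json.loads(fixed)["path"])  # C:\Users
```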
Shaido