
I'm trying to get nested JSON values into a PySpark dataframe. I solved this easily using pandas, but now I'm trying to get it working with just PySpark functions.

print(response)
{'ResponseMetadata': {'RequestId': 'PGMCTZNAPV677CWE', 'HostId': '/8qweqweEfpdegFSNU/hfqweqweqweSHtM=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '/8yacqweqwe/hfjuSwKXDv3qweqweqweHtM=', 'x-amz-request-id': 'PqweqweqweE', 'date': 'Fri, 09 Sep 2022 09:25:04 GMT', 'x-amz-bucket-region': 'eu-central-1', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Contents': [{'Key': 'qweqweIntraday.csv', 'LastModified': datetime.datetime(2022, 7, 12, 8, 32, 10, tzinfo=tzutc()), 'ETag': '"qweqweqwe4"', 'Size': 1165, 'StorageClass': 'STANDARD'}], 'Name': 'test-bucket', 'Prefix': '', 'MaxKeys': 1000, 'EncodingType': 'url', 'KeyCount': 1}

With pandas I can parse this input into a dataframe with the following code:

import pandas as pd

object_df = pd.DataFrame()
for elem in response:
    if 'Contents' in elem:
        object_df = pd.json_normalize(response['Contents'])


print(object_df)
                               Key              LastModified  \
0  202207110000_qweIntraday.csv 2022-07-12 08:32:10+00:00   

                                 ETag  Size StorageClass  
0  "fqweqweqwee0cb4"  1165     STANDARD

(there are sometimes multiple objects under "Contents", so I have to iterate over the response).

This was my attempt to replicate this with spark dataframe, and sc.parallelize:

object_df = spark.sparkContext.emptyRDD()
for elem in response:
    if 'Contents' in elem:
        rddjson = spark.read.json(sc.parallelize([response['Contents']]))

Also tried:

sqlc = SQLContext(sc)
rddjson = spark.read.json(sc.parallelize([response['Contents']]))
df = sqlc.read.option("multiline", "true").json(rddjson)

df.show()
+--------------------+
|     _corrupt_record|
+--------------------+
|[{'Key': '2/3c6a6...|
+--------------------+

This is not working. (Judging by the _corrupt_record output, I suspect Spark is calling str() on the Python dicts, so the single quotes and the datetime repr make each record invalid JSON.) I already saw some related posts saying that I can use explode, like in this example (stackoverflow answer), instead of json_normalize, but I'm having trouble replicating the example.
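For what it's worth, the explode pattern itself works for me on a toy example (a minimal sketch; the Row values below are made up to mimic the shape of 'Contents'), I just can't get my response into that shape:

from pyspark.sql import Row
from pyspark.sql import functions as F

# made-up rows mimicking the shape of response['Contents']
toy = spark.createDataFrame([
    Row(Contents=[Row(Key='a.csv', Size=1165),
                  Row(Key='b.csv', Size=2048)])
])

# explode -> one row per array element; 'obj.*' flattens the struct fields
toy.select(F.explode('Contents').alias('obj')).select('obj.*').show()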

Any suggestion on how I can solve this with pyspark or pyspark.sql (and without adding additional libraries) is very welcome.


1 Answer


It looks like the issue is that the data contains a Python datetime object (in the LastModified field), which is not JSON-serializable.

One way around this might be (assuming you're OK with using only the Python standard library):

import json

sc = spark.sparkContext
for elem in response:
    if 'Contents' in elem:
        # default=str makes json.dumps fall back to str() for values it
        # can't serialize natively, so the datetime becomes a plain string
        json_str = json.dumps(response['Contents'], default=str)
        object_df = spark.read.json(sc.parallelize([json_str]))
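
Note that because response['Contents'] serializes to a JSON array, spark.read.json already produces one row per object, so no explode is needed here. As a follow-up sketch (the to_timestamp cast is an assumption on my part; default=str left LastModified as a plain string like '2022-07-12 08:32:10+00:00'):

from pyspark.sql import functions as F

# default=str serialized LastModified as a plain string; cast it back to a
# proper timestamp if needed (older Spark versions may need an explicit format)
object_df = object_df.withColumn('LastModified', F.to_timestamp('LastModified'))

object_df.printSchema()
object_df.show(truncate=False)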