I'm trying to get nested JSON values into a PySpark DataFrame. I have easily solved this using pandas, but now I'm trying to get it working with just PySpark functions.
print(response)
{'ResponseMetadata': {'RequestId': 'PGMCTZNAPV677CWE', 'HostId': '/8qweqweEfpdegFSNU/hfqweqweqweSHtM=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '/8yacqweqwe/hfjuSwKXDv3qweqweqweHtM=', 'x-amz-request-id': 'PqweqweqweE', 'date': 'Fri, 09 Sep 2022 09:25:04 GMT', 'x-amz-bucket-region': 'eu-central-1', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Contents': [{'Key': 'qweqweIntraday.csv', 'LastModified': datetime.datetime(2022, 7, 12, 8, 32, 10, tzinfo=tzutc()), 'ETag': '"qweqweqwe4"', 'Size': 1165, 'StorageClass': 'STANDARD'}], 'Name': 'test-bucket', 'Prefix': '', 'MaxKeys': 1000, 'EncodingType': 'url', 'KeyCount': 1}
With pandas I can parse this input into a dataframe with the following code:
import pandas as pd

object_df = pd.DataFrame()
for elem in response:
    if 'Contents' in elem:
        object_df = pd.json_normalize(response['Contents'])
print(object_df)
                            Key              LastModified  \
0  202207110000_qweIntraday.csv 2022-07-12 08:32:10+00:00   

                 ETag  Size StorageClass
0  "fqweqweqwee0cb4"  1165     STANDARD
(There are sometimes multiple "Contents" entries, so I have to loop over the response and collect them, roughly as sketched below.)
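In that case I just concatenate the normalized frames, using the same loop and substring check as above:

frames = [pd.json_normalize(response[key]) for key in response if 'Contents' in key]
object_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()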
This was my attempt to replicate this with a Spark DataFrame and sc.parallelize:
object_df = spark.sparkContext.emptyRDD()
for elem in response:
    if 'Contents' in elem:
        rddjson = spark.read.json(sc.parallelize([response['Contents']]))
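(My current suspicion is that spark.read.json wants an RDD of JSON strings rather than Python dicts, so an untested variant I have in mind looks like the sketch below; json.dumps(..., default=str) is my guess for coping with the datetime in LastModified.)

import json

for elem in response:
    if 'Contents' in elem:
        # Serialize each record to a JSON string first; default=str because
        # LastModified is a datetime object that json.dumps can't encode by itself.
        json_strings = [json.dumps(rec, default=str) for rec in response['Contents']]
        object_df = spark.read.json(sc.parallelize(json_strings))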
Also tried:
sqlc = SQLContext(sc)
rddjson = sc.parallelize([response['Contents']])
df = sqlc.read.option("multiline", "true").json(rddjson)
df.show()
+--------------------+
| _corrupt_record|
+--------------------+
|[{'Key': '2/3c6a6...|
+--------------------+
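If I read this right, the corrupt record is just the Python repr of the list of dicts (single quotes, datetime.datetime(...)), which is not valid JSON. An alternative I'm considering is to skip the JSON reader entirely and build the DataFrame with an explicit schema; the field names and types below are my assumptions based on the printed response:

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Assumed schema for the entries in response['Contents'].
contents_schema = StructType([
    StructField('Key', StringType(), True),
    StructField('LastModified', TimestampType(), True),
    StructField('ETag', StringType(), True),
    StructField('Size', LongType(), True),
    StructField('StorageClass', StringType(), True),
])

df = spark.createDataFrame(response['Contents'], schema=contents_schema)
df.show(truncate=False)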
This is not working. I have already seen some related posts saying that I can use explode, as in this example (stackoverflow answer), instead of json_normalize, but I'm having trouble replicating the example.
Any suggestion on how I can solve this with pyspark or pyspark.sql (and without adding additional libraries) would be very welcome.
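For reference, this is roughly how I imagine the explode approach from the linked answer would apply here (untested sketch; json.dumps with default=str is again my workaround for the datetime in LastModified):

import json
from pyspark.sql import functions as F

# Load the whole response as a single JSON document, then explode the
# Contents array into one row per S3 object.
response_json = json.dumps(response, default=str)
raw_df = spark.read.json(sc.parallelize([response_json]))
object_df = raw_df.select(F.explode('Contents').alias('c')).select('c.*')
object_df.show(truncate=False)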