I used rdd.map to extract and decode a JSON payload from a column, like so:

import base64
import json

def process_data(row):
    # Parse the outer JSON, then base64-decode the embedded payload
    encoded_data = json.loads(row["value"])
    base64_bytes = encoded_data["payload"].encode('ascii')
    decoded_data_bytes = base64.b64decode(base64_bytes)
    data = json.loads(decoded_data_bytes.decode('ascii'), strict=False)
    return data, row["file_name"], row["load_time"]

df = df.rdd.map(process_data).toDF()

I got the data column back as a map type, but I want it as a struct. Can I do that?

A row of the data I'm working with looks like this:

{"value": <encoded data>, "file_name": "a", "load_time": 1/1/1}

The encoded data (what's in "value") looks like this:

{"payload": [
  {
    "key_1": {
      "key_2": val_2,
      "key_3": val_3
    }
  },
  {
    "key_1": {
      "key_2": val_2,
      "key_3": val_3
    }
  }
]}

To avoid this problem, I also tried using 'withColumn' to decode and load the JSON, but when I loaded it with this command:

df.withColumn("payload", from_json(col("payload"), json_schema))

Every cell in "payload" returned null (even when I limited myself to only one row).

Why doesn't this kind of load work? Is there a better way?

  • Does [this topic](https://stackoverflow.com/questions/49675860/pyspark-converting-json-string-to-dataframe) help you? – Christophe Feb 17 '22 at 15:28
  • Or [this link](https://sparkbyexamples.com/spark/spark-from_json-convert-json-column-to-struct-map-or-multiple-columns/) – Christophe Feb 17 '22 at 15:30
  • 1
    @Christophe I have a json per row, so I think it won’t work. The second one seems to be the same as I did(or is more to it?) – Avner Huri Feb 17 '22 at 16:15

1 Answer

For the map-to-struct casting part: after asking a colleague, I understood that such a cast can't work, simply because a map type is a plain key-value type without any fixed schema, unlike a struct type. Without that schema information, Spark can't cast a map to a struct.
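
One way to avoid ending up with a map at all is to hand toDF() an explicit schema: when no schema is given, PySpark infers MapType for Python dicts, whereas an explicit StructType makes the data column a proper struct. A minimal sketch, assuming the keys from the sample in the question and string values (the actual types of val_2 and val_3 may differ):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical explicit schema for the decoded data; string types are an
# assumption based on the sample in the question.
data_schema = StructType([
    StructField("payload", ArrayType(StructType([
        StructField("key_1", StructType([
            StructField("key_2", StringType()),
            StructField("key_3", StringType()),
        ])),
    ]))),
])

row_schema = StructType([
    StructField("data", data_schema),
    StructField("file_name", StringType()),
    StructField("load_time", StringType()),
])

# With an explicit schema, toDF() builds struct columns instead of inferring maps
df = df.rdd.map(process_data).toDF(row_schema)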

For the JSON-loading part: I managed to get to the root of the issue by loading the JSON in "FAILFAST" mode:

from pyspark.sql.functions import col, from_json

# Infer the payload schema from the data itself, then parse in FAILFAST mode
json_schema = spark.read.json(df.rdd.map(lambda row: row["payload"])).schema
df = df.withColumn("payload", from_json(col("payload"), json_schema, options={"mode": "FAILFAST"}))
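
(By default, from_json runs in PERMISSIVE mode and silently turns anything it cannot parse into null, which explains the all-null column in the question; FAILFAST raises on the first bad record instead, which is what surfaced the real problem.)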

I got an exception: BadRecordException: java.lang.RuntimeException: Parsing JSON arrays as structs is forbidden.

Since the payload string is a bare JSON array, I wrapped it in another JSON object, like so:

import json

from pyspark.sql.functions import udf

def wrap_data(payload):
    # Wrap the bare JSON array in an object so it can be parsed as a struct
    try:
        payload = json.loads(payload, strict=False)
        payload_as_dictionary = {"payload": payload}
        return json.dumps(payload_as_dictionary)
    except (TypeError, ValueError):
        return None

wrap_data_udf = udf(wrap_data)

And then:

df = df.withColumn("payload", wrap_data_udf("payload"))

After that I was able to load the JSON and work with it.
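
Depending on the Spark version, the wrapping step might be avoidable altogether: since Spark 2.4, from_json also accepts an ArrayType as the top-level schema, so a bare JSON array can be parsed directly. A sketch under that assumption (the element schema here is guessed from the sample in the question):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Assumed element schema, based on the sample payload in the question
element_schema = StructType([
    StructField("key_1", StructType([
        StructField("key_2", StringType()),
        StructField("key_3", StringType()),
    ])),
])

# Parse the bare JSON array directly, no wrapping UDF needed (Spark 2.4+)
df = df.withColumn("payload", from_json(col("payload"), ArrayType(element_schema)))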
