0

I have been able to write the JSON file I want to work on to the console. How do I separate the 'value' column into columns matching the fields in the JSON, and then write the result to Delta Lake for SQL queries and MLlib? Thanks.

{"coord": {"lon": -1.15, "lat": 52.95}, "list": [{"main": {"aqi": 2}, "components": {"co": 220.3, "no": 0.26, "no2": 5.14, "o3": 75.1, "so2": 1.54, "pm2_5": 1.8, "pm10": 2.71, "nh3": 2.79}, "dt": 1679418000}, {"main": {"aqi": 2}, "components": {"co": 220.3, "no": 0.07, "no2": 7.45, "o3": 72.24, "so2": 2.18, "pm2_5": 1.9, "pm10": 2.9, "nh3": 3.45}, "dt": 1679421600}]}

Value result image here

OneCricketeer
kaffy

2 Answers

1

I defined an ArrayType-of-struct schema for the JSON value I wanted to explode:

schema = "array<struct<main:struct<aqi:int>, components:struct<co:double, no:double, no2:double ...>>>"

Then I created a DataFrame with:

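from pyspark.sql.functions import col, explode, from_json, get_json_object

df_new = df.select(
    get_json_object(col("value"), "$.coord").alias("coord"),
    # get_json_object returns a JSON string, so parse it into an array before exploding
    explode(from_json(get_json_object(col("value"), "$.list"), schema)).alias("exploded_col"),
    "timestamp",
)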

I then extracted the fields of the exploded column into separate columns with:

df_stream = df_new.select(
    col("exploded_col.main.aqi").alias("aqi"),
    col("exploded_col.components.co").alias("co"),
    col("exploded_col.components.no").alias("no"),
    # ...
)
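To make the target shape concrete, here is a minimal plain-Python sketch (standard library only, not Spark) of what the from_json, explode, and select steps above produce: one flat row per element of "list". The helper name flatten_air_quality and the shortened SAMPLE payload are assumptions made for illustration; only two of the pollutant fields are kept to save space.

```python
import json

# Shortened copy of the payload from the question (two readings, two pollutants).
SAMPLE = (
    '{"coord": {"lon": -1.15, "lat": 52.95}, "list": ['
    '{"main": {"aqi": 2}, "components": {"co": 220.3, "no": 0.26}, "dt": 1679418000}, '
    '{"main": {"aqi": 2}, "components": {"co": 220.3, "no": 0.07}, "dt": 1679421600}]}'
)


def flatten_air_quality(value):
    """Return one flat dict per element of "list" (hypothetical helper)."""
    doc = json.loads(value)  # parse the raw string, as from_json does
    coord = doc["coord"]
    rows = []
    for entry in doc["list"]:  # one output row per array element, like explode()
        row = {
            "lon": coord["lon"],
            "lat": coord["lat"],
            "aqi": entry["main"]["aqi"],
            "dt": entry["dt"],
        }
        row.update(entry["components"])  # one column per pollutant
        rows.append(row)
    return rows


rows = flatten_air_quality(SAMPLE)
for row in rows:
    print(row)
```

Each printed dict corresponds to one row of df_stream, with the coordinates repeated on every row.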
Tyler2P
kaffy
0

Use get_json_object for each field you want, e.g.:

get_json_object(col("value"), "$.coord").alias("coord")

For the list field, you need to explode:

explode(get_json_object(col("value"), "$.list"))
OneCricketeer
  • Thank you so much for your response. I tried this but got a syntax error, "SyntaxError: unexpected EOF while parsing". What could be wrong? Also, do I need to cast value to a string? `df_new = df.select(get_json_object(col("value"), "$.coord").alias("coord"),explode(get_json_object(col("value"), "$.list"),"timestamp")` – kaffy Mar 29 '23 at 14:09
  • Sorry, I found that I was missing a bracket. But when I write the new DataFrame to the console, there are no values in any of the columns. – kaffy Mar 29 '23 at 14:40
  • I don't know what your `df` contains. But if you get no output, it means it didn't select anything... Also, I don't think you'll be able to select one object, one field, and also expand an array to multiple rows all in one query. I recommend debugging each of these functions individually to verify they work, then you can union multiple dataframes later, if needed – OneCricketeer Mar 29 '23 at 14:43
  • 1
    Thank you so much. I was able to select everything at once; I think I just needed the right enclosing brackets for each expression. Your code works! I got this output:
    +--------------------+--------------------+--------------------+
    |               coord|                 col|           timestamp|
    +--------------------+--------------------+--------------------+
    |{"lon":-1.15,"lat...|[{"main":{"aqi":2...|2023-03-29 18:09:...|
    |{"lon":-1.15,"lat...|[{"main":{"aqi":2...|2023-03-29 18:13:...|
    |{"lon":-1.15,"lat...|[{"main":{"aqi":2...|2023-03-29 18:23:...|
    – kaffy Mar 29 '23 at 17:29
  • This is the code: `df_new = df.select(get_json_object(col("value"), "$.coord").alias("coord"),explode(array((get_json_object(col("value"),"$.list").alias("list")))),"timestamp")`. In the output, the 'list' column (shown as 'col') still holds the nested JSON objects 'main' and 'components', each with its own fields, as in the JSON I posted in my question. How do I separate these fields into different columns? Would I use 'explode' again on 'col'? The same goes for the 'coord' column, which holds lon and lat and will probably have the same value in every row. – kaffy Mar 29 '23 at 17:30
  • So, the second param to `get_json_object` is JSONPath, which means you could do something like `$.list[*].components` ... I don't know the exact syntax, but copy your data into https://jsonpath.com/ ; Also, I don't think you need the extra `array()` call. It looks like it is creating a one-element array in each exploded row – OneCricketeer Mar 30 '23 at 17:24
  • Thank you so much for the jsonpath site; it made it easy to see which path to select to get each value. However, when I applied paths such as `$.list[*].components.co` and `$.list[*].main.aqi` to the df with `array()`, I got null values in the columns for the elements of the exploded list. When I removed `array()`, I got the error "cannot resolve 'explode(get_json_object(value, '$.list'))' due to data type mismatch: input to function explode should be an array or map type, not string". As for 'coord', it worked well; I was able to separate 'lon' and 'lat' into columns. – kaffy Apr 01 '23 at 17:02
  • Your error with the list is because the function "_returns json **string** of the extracted json object._"... Looks like you need `explode(from_json(get_json_object(...), schema))` and apply an ArrayType schema - https://stackoverflow.com/questions/55074331/apache-spark-read-json-with-extra-columns – OneCricketeer Apr 02 '23 at 13:02
  • 1
    Thank you so much! I really appreciate your help. I did as you suggested above and it worked perfectly. My solution is in the answer box. – kaffy Apr 05 '23 at 06:21