I have data/rows of multiple key/value pairs with an unknown number of keys -- some overlapping and some not -- that I would like to create a Spark DataFrame from. My ultimate goal is to write CSV from this DataFrame.

I have flexibility with the input data/rows: most readily they are JSON strings, but they could be converted to another form. The rows vary in their keys, which may or may not overlap:

{"color":"red", "animal":"fish"}
{"color":"green", "animal":"panda"}
{"color":"red", "animal":"panda", "fruit":"watermelon"}
{"animal":"aardvark"}
{"color":"blue", "fruit":"apple"}

Ideally, I would like to create a DataFrame that looks like this from this data:

-----------------------------
color | animal   | fruit
-----------------------------
red   | fish     | null
green | panda    | null
red   | panda    | watermelon
null  | aardvark | null
blue  | null     | apple
-----------------------------

Of note, data/rows without a particular key are null, and all keys from the data/rows are represented as columns.

I feel relatively comfortable with many of the Spark basics, but am having trouble envisioning a process for efficiently taking my RDD/DataFrame with key/value pairs -- but an unknown number of columns and keys -- and creating a DataFrame with those keys as columns.

By efficient, I mean that I would like to avoid, if possible, creating an object that holds all input rows in memory (e.g. a single dictionary).

Again, the final goal is to write CSV, and I'm assuming that creating a DataFrame is a logical step to that end.

Another wrinkle:

Some of the data will be multivalued, something like:

{"color":"pink", "animal":["fish","mustang"]}
{"color":["orange","purple"], "animal":"panda"}

Given a delimiter, e.g. / (chosen to avoid colliding with the , that separates columns), I would like to join these values within a column in the output, e.g.:

------------------------------------
color         | animal       | fruit
------------------------------------
pink          | fish/mustang | null
orange/purple | panda        | null
------------------------------------

Once there is an approach for the primary question, I'm confident I can work this part out, but throwing it out there anyhow as it will be a dimension of the problem.

ghukill
  • Have you tried `df = spark.read.json("myfile.json")`? Seems to work for me on your first example. **Update**: It also works for your second example, but treats all records as strings so you'll have to do some [regex to convert the string representation of the list](https://stackoverflow.com/a/53504558/5858851) to format it in your desired way. – pault Nov 29 '18 at 16:15
  • Thanks @pault for the idea. I was about to say it might not work, as my data is actually coming from a DataFrame in which I'm converting a single column of XML to a JSON string. But that's interesting: I could create an RDD with the JSON lines, write it, and then read it? Or is there another way to simulate the `.json()` method from an RDD, not from reading an external location? – ghukill Nov 29 '18 at 16:30
  • Realizing that `read.json()` might accept an RDD as well, http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read#pyspark.sql.DataFrameReader.json, giving that a go... – ghukill Nov 29 '18 at 16:33

1 Answer

Reading from a file

If your data were stored in a file (suppose it was named myfile.json) like the following:

{"color":"red", "animal":"fish"}
{"color":"green", "animal":"panda"}
{"color":"red", "animal":"panda", "fruit":"watermelon"}
{"animal":"aardvark"}
{"color":"blue", "fruit":"apple"}
{"color":"pink", "animal":["fish","mustang"]}
{"color":["orange","purple"], "animal":"panda"}

You can use pyspark.sql.DataFrameReader.json to read the file as newline-delimited JSON records.

df = spark.read.json("myfile.json")
df.show()
#+------------------+-------------------+----------+
#|            animal|              color|     fruit|
#+------------------+-------------------+----------+
#|              fish|                red|      null|
#|             panda|              green|      null|
#|             panda|                red|watermelon|
#|          aardvark|               null|      null|
#|              null|               blue|     apple|
#|["fish","mustang"]|               pink|      null|
#|             panda|["orange","purple"]|      null|
#+------------------+-------------------+----------+

df.printSchema()
#root
# |-- animal: string (nullable = true)
# |-- color: string (nullable = true)
# |-- fruit: string (nullable = true)
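
Note that every column is inferred as string, including the rows that held JSON arrays; Spark appears to fall back to string when a field's type differs between records. To preview that shape on a handful of records without starting Spark, the key union and null filling can be sketched in plain Python. This only illustrates the resulting table, not how Spark performs inference, and `to_rows` is a hypothetical helper name. It holds all records in memory, so it is only suitable for small samples.

```python
import json

# A few of the sample records from above, as newline-delimited JSON strings.
lines = [
    '{"color":"red", "animal":"fish"}',
    '{"animal":"aardvark"}',
    '{"color":"pink", "animal":["fish","mustang"]}',
]

def to_rows(json_lines):
    """Return (columns, rows): the sorted union of keys, with None for missing keys."""
    records = [json.loads(line) for line in json_lines]
    columns = sorted({key for record in records for key in record})
    rows = []
    for record in records:
        row = []
        for col in columns:
            value = record.get(col)  # None when the key is absent
            if isinstance(value, list):
                # Keep lists as their compact JSON text, like the string column above.
                value = json.dumps(value, separators=(",", ":"))
            row.append(value)
        rows.append(tuple(row))
    return columns, rows

columns, rows = to_rows(lines)
print(columns)  # ['animal', 'color']
for row in rows:
    print(row)
```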

Reading from an RDD

The same method also accepts an RDD of JSON strings:

import json

rdd = sc.parallelize(
    map(
        json.dumps,
        [
            {"color":"red", "animal":"fish"},
            {"color":"green", "animal":"panda"},
            {"color":"red", "animal":"panda", "fruit":"watermelon"},
            {"animal":"aardvark"},
            {"color":"blue", "fruit":"apple"},
            {"color":"pink", "animal":["fish","mustang"]},
            {"color":["orange","purple"], "animal":"panda"}
        ]
    )
)

df = spark.read.json(rdd)

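For the second part, you can use pyspark.sql.functions.regexp_replace to format your multivalued records as desired: first strip the enclosing brackets and the double quotes, then replace the commas with your delimiter.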

from pyspark.sql.functions import regexp_replace

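def format_column(column):
    # Strip the enclosing [ ] and any double quotes, then turn commas into "/".
    # Raw string so the regex backslashes are not treated as Python escapes.
    return regexp_replace(regexp_replace(column, r'(^\[)|(\]$)|(")', ''), ",", "/")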

df.select(*[format_column(c).alias(c) for c in df.columns]).show()

#+------------+-------------+----------+
#|      animal|        color|     fruit|
#+------------+-------------+----------+
#|        fish|          red|      null|
#|       panda|        green|      null|
#|       panda|          red|watermelon|
#|    aardvark|         null|      null|
#|        null|         blue|     apple|
#|fish/mustang|         pink|      null|
#|       panda|orange/purple|      null|
#+------------+-------------+----------+
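
To sanity-check the pattern outside Spark, the same two replacements can be tried on individual strings with Python's `re` module. This is a minimal sketch: `format_value` is a hypothetical helper that mirrors `format_column` for one value, and it assumes the pattern behaves the same in Python's regex engine as in Spark's, which should hold for a pattern this simple.

```python
import re

def format_value(value, delimiter="/"):
    """Plain-Python mirror of format_column for a single string value."""
    if value is None:
        return None  # missing values stay missing
    # Remove a leading "[", a trailing "]", and every double quote.
    stripped = re.sub(r'(^\[)|(\]$)|(")', "", value)
    # Replace the commas that separated the list items with the delimiter.
    return stripped.replace(",", delimiter)

samples = ['["fish","mustang"]', "panda", None, '["orange","purple"]']
print([format_value(s) for s in samples])
# ['fish/mustang', 'panda', None, 'orange/purple']
```

One caveat this makes easy to see: the comma replacement applies to every value, so a single-valued string that itself contains a comma (e.g. "a,b") would also come out as "a/b".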
pault
  • I didn't realize until now that reading RDDs with `spark.read.json` was possible, which broke the rest wide open. Thanks a bunch! Same for the regex syntax for breaking up multivalued (which is how I'm doing that in a non-Spark context). – ghukill Nov 29 '18 at 16:42