
I am trying to cast a StringType column to an ArrayType of JSON for a dataframe generated from a CSV.

Using pyspark on Spark 2.

The CSV file I am dealing with is as follows:

date,attribute2,count,attribute3
2017-09-03,'attribute1_value1',2,'[{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]'
2017-09-04,'attribute1_value2',2,'[{"key":"value","key2":20},{"key":"value","key2":25},{"key":"value","key2":27}]'

As shown above, the column "attribute3" holds a literal string, which is technically a list of dictionaries (JSON), each with exactly two keys. (This is the output of the distinct function.)
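For reference, the single quotes around attribute3 mean the file has to be read with ' as the CSV quote character, otherwise the commas inside the JSON split the column. A minimal sketch of how I load it (the path is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# quote="'" keeps the single-quoted JSON (which contains commas) in one column
dataframe = spark.read.csv("path/to/data.csv", header=True, inferSchema=True, quote="'")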

Snippet from printSchema():

attribute3: string (nullable = true)

I am trying to cast "attribute3" to ArrayType as follows:

temp = dataframe.withColumn(
    "attribute3_modified",
    dataframe["attribute3"].cast(ArrayType())
)

This raises:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __init__() takes at least 2 arguments (1 given)

Indeed, ArrayType expects a datatype as an argument. I tried with "json", but it did not work.
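(To illustrate: ArrayType only accepts a real element type, and there is no JSON element type; even with one supplied, my understanding is that a plain cast from string does not parse JSON.)

from pyspark.sql.types import ArrayType, StringType

ArrayType(StringType())  # valid: an ArrayType needs an element type; "json" is not one
# even so, a direct cast from a string column to an array does not parse JSON:
# dataframe["attribute3"].cast(ArrayType(StringType()))  # fails with an AnalysisException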

Desired output: in the end, I need to convert attribute3 to an ArrayType or a plain Python list. (I am trying to avoid the use of eval.)

How do I convert it to an ArrayType, so that I can treat it as a list of JSON objects?

Am I missing anything here?

(The documentation does not address this problem in a straightforward way.)

  • What's your desired output? Please read [how to create good reproducible apache spark dataframe examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) and try to provide us with some sample input/output. – pault Aug 06 '18 at 18:43
  • @pault Updated the question. I am just trying to convert the String to `ArrayType(JSON?)` – malhar Aug 06 '18 at 18:48
  • JSON is not a valid data type for an array in `pyspark`. If you could provide an example of what you desire the final output to look like that would be helpful. There could be different methods to get to that output that have not been considered. – vielkind Aug 06 '18 at 18:51
  • Yup, JSON is not a valid data type. I wanted to convert it to a plain Python list on which I can perform some operations (for example, sum/concatenate values in the JSON, or join it with another dataframe and check equality of attributes). I am trying to avoid using `eval`. – malhar Aug 06 '18 at 18:59
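(As an aside to the last comment: one eval-free route is json.loads inside a UDF. A minimal sketch, where the return schema is an assumption based on the sample rows:)

import json
from pyspark.sql.functions import udf
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

# json.loads parses the string safely; the resulting dicts map onto the struct fields
json_schema = ArrayType(StructType([StructField("key", StringType()),
                                    StructField("key2", IntegerType())]))
parse_json = udf(json.loads, json_schema)
dataframe = dataframe.withColumn("attribute3_parsed", parse_json("attribute3"))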

2 Answers


Use from_json with a schema that matches the actual data in the attribute3 column to convert the JSON to an ArrayType:

Original data frame:

df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: string (nullable = true)

from pyspark.sql.functions import from_json
from pyspark.sql.types import *

Create the schema:

schema = ArrayType(
    StructType([StructField("key", StringType()), 
                StructField("key2", IntegerType())]))

Use from_json:

df = df.withColumn("attribute3", from_json(df.attribute3, schema))

df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- key2: integer (nullable = true)

df.show(1, False)
#+----------+----------+-----+------------------------------------+
#|date      |attribute2|count|attribute3                          |
#+----------+----------+-----+------------------------------------+
#|2017-09-03|attribute1|2    |[[value, 2], [value, 2], [value, 2]]|
#+----------+----------+-----+------------------------------------+
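Once attribute3 is an array of structs, you can work with the parsed values directly. For example, a sketch of summing key2 per row with explode (column names as above):

import pyspark.sql.functions as F

# one output row per element of the array, then aggregate the struct field
exploded = df.select("date", F.explode("attribute3").alias("item"))
exploded.groupBy("date").agg(F.sum("item.key2").alias("key2_total")).show()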
  • Likely a version issue, but I get `java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType` with this code. – pault Aug 06 '18 at 19:10
  • @pault Agree. According to the [docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$), in version `2.1.0` only *StructType* is allowed; *ArrayType* support was added in `2.2.0`. – Psidom Aug 06 '18 at 19:20

The answer by @Psidom does not work for me because I am using Spark 2.1.

In my case, I had to slightly modify your attribute3 string to wrap it in a dictionary:

import pyspark.sql.functions as f
df2 = df.withColumn("attribute3", f.concat(f.lit('{"data": '), "attribute3", f.lit("}")))
df2.select("attribute3").show(truncate=False)
#+--------------------------------------------------------------------------------------+
#|attribute3                                                                            |
#+--------------------------------------------------------------------------------------+
#|{"data": [{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]}|
#+--------------------------------------------------------------------------------------+

Now I can define the schema as follows:

from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

schema = StructType(
    [
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType()),
                        StructField("key2", IntegerType())
                    ]
                )
            )
        )
    ]
)

Now use from_json followed by getItem():

df3 = df2.withColumn("attribute3", f.from_json("attribute3", schema).getItem("data"))
df3.show(truncate=False)
#+----------+----------+-----+---------------------------------+
#|date      |attribute2|count|attribute3                       |
#+----------+----------+-----+---------------------------------+
#|2017-09-03|attribute1|2    |[[value,2], [value,2], [value,2]]|
#+----------+----------+-----+---------------------------------+

And the schema:

df3.printSchema()
# root
# |-- attribute3: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- key: string (nullable = true)
# |    |    |-- key2: integer (nullable = true)
  • This worked splendidly for me. Smart use of the wrapping trick to make it work. I was having the same problem on 2.1. Just to add to your answer, I was able to dynamically let spark determine the schema using `schema = spark.read.json(df2.rdd.map(lambda row: row.attribute3)).schema` – Scratch'N'Purr Feb 21 '19 at 09:48
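Expanding on the inference trick in the comment above, a minimal sketch (assuming a SparkSession named spark and the df2 built in this answer):

import pyspark.sql.functions as f

# let Spark infer the schema of the wrapped JSON from the strings themselves
inferred_schema = spark.read.json(df2.rdd.map(lambda row: row.attribute3)).schema
df3 = df2.withColumn("attribute3", f.from_json("attribute3", inferred_schema).getItem("data"))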