
I realized I may need to add a bit more detail. Imagine I have two columns in a DataFrame: one is an ID, the other is a JSON string.

This can be constructed as follows:

>>> a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]" },
...       {"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
>>> df1 = sqlContext.read.json(sc.parallelize(a1))
>>> df1.show()
+---+--------------------+
|  a|                   b|
+---+--------------------+
|  1|[{"h": 3, "i": 5}...|
|  1|[{"h": 6, "i": 10...|
+---+--------------------+
>>> df1.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)

Note that the JSON column is a StringType. I want to write a function that creates a new column storing the data as a nested table, like below:

root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- h: long (nullable = true)
 |    |    |-- i: long (nullable = true)

I am using Spark 1.6, so I don't have the from_json function to cast the string for me. I have tried this:

>>> from pyspark.sql.functions import get_json_object
>>> df1.withColumn('new', get_json_object(df1.b, '$')).show()
+---+--------------------+--------------------+
|  a|                   b|                 new|
+---+--------------------+--------------------+
|  1|[{"h": 3, "i": 5}...|[{"h":3,"i":5},{"...|
|  1|[{"h": 6, "i": 10...|[{"h":6,"i":10},{...|
+---+--------------------+--------------------+

The issue is that the new column created is still a string. :(
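
(As far as I can tell, get_json_object returns a string by design, so it can't produce the nested type. If I were on a newer Spark (2.x), I believe something like from_json with an explicit schema would do this directly; an untested sketch, with the h/i field names taken from my data:)

>>> from pyspark.sql.functions import from_json
>>> from pyspark.sql.types import ArrayType, StructType, StructField, LongType
>>> # Schema for the JSON array of {"h": ..., "i": ...} objects
>>> schema = ArrayType(StructType([StructField("h", LongType()),
...                                StructField("i", LongType())]))
>>> # from_json only exists from Spark 2.1 on, so it is not an option on 1.6
>>> df1.withColumn('new', from_json(df1.b, schema)).printSchema()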

  • Just a bit more info: this is to transform an XML string field into a nested table. I have parsed the XML into JSON with a map for the specific column and used sqlContext.read.json(rdd), and it worked. However, I don't want to do this; I want to use withColumn on the dataframe and create a new column with these nested values. – Anthony Sun Dec 19 '18 at 10:13
  • Do you want to modify the list "a" so Spark can infer the schema that you need? Or do you want not to change your list "a" and work on modifications applied to rdd or df? – titiro89 Dec 19 '18 at 12:26
  • Possible duplicate of [How to query JSON data column using Spark DataFrames?](https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes) – 10465355 Dec 19 '18 at 13:18

1 Answer


I was able to resolve the issue using a map function. A first attempt:

a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]"},
      {"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
df1 = sqlContext.read.json(sc.parallelize(a1))
rdd = df1.map(lambda x: x.b)
df2 = sqlContext.read.json(rdd)

>>> df2.printSchema()
root
 |-- h: long (nullable = true)
 |-- i: long (nullable = true)

The issue is that I then lose the other columns:

+---+---+
|  h|  i|
+---+---+
|  3|  5|
|  4|  6|
|  6| 10|
|  8| 12|
+---+---+

So I tried the withColumn dataframe function, creating a UDF to explicitly convert the string to JSON. This is where the issue is: withColumn doesn't seem to be able to work with JSON objects.
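
(For what it's worth, a UDF with an explicit return schema might work even on 1.6; an untested sketch, with the h/i field names taken from the data above:)

import json
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, LongType

# Declare the nested type explicitly so Spark knows what the UDF returns
b_schema = ArrayType(StructType([StructField("h", LongType()),
                                 StructField("i", LongType())]))

# Return (h, i) tuples so they line up with the struct fields above
parse_b = udf(lambda s: [(d['h'], d['i']) for d in json.loads(s)], b_schema)

df1.withColumn('new', parse_b(df1.b)).printSchema()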

The alternative I had was to write a function combining the two columns, something like below:

import json

# This is a Python 2.7 workaround: all strings read from the configuration file
# are for some reason converted to unicode. This issue does not appear to impact
# v3.6 and above.
def convert_dict(mydict):
    return {k.encode('ascii', 'ignore'): str(v).encode('ascii', 'ignore') for k, v in mydict.iteritems()}

rdd = df1.map(lambda x: {'a': x.a, 'b': [convert_dict(y) for y in json.loads(x.b)]})
df2 = sqlContext.read.json(rdd)

>>> df2.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- h: string (nullable = true)
 |    |    |-- i: string (nullable = true)
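
Note that h and i come out as strings here because of the str(v) conversion in convert_dict; if the long type matters, leaving the values untouched should let Spark infer them as longs again. Untested variant:

def convert_dict(mydict):
    # Only the keys need the ascii workaround; keeping the values as-is
    # lets Spark infer h and i as longs
    return {k.encode('ascii', 'ignore'): v for k, v in mydict.iteritems()}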