2

How can we change the datatype of a nested column in Pyspark? For rxample, how can I change the data type of value from string to int?

Reference:how to change a Dataframe column from String type to Double type in pyspark

{
    "x": "12",
    "y": {
        "p": {
            "name": "abc",
            "value": "10"
        },
        "q": {
            "name": "pqr",
            "value": "20"
        }
    }
}
J.D
  • 45
  • 1
  • 6
  • 1. Does this change need to be persistent, with changes saved to the json file? Or do you need the precision while you are performing an operation? – diek Aug 24 '17 at 00:27
  • @diek Need it white writing to json file – J.D Aug 24 '17 at 00:47

2 Answers2

2

You can read the json data using

from pyspark import SQLContext

sqlContext = SQLContext(sc)
data_df = sqlContext.read.json("data.json", multiLine = True)

data_df.printSchema()

output

root
 |-- x: long (nullable = true)
 |-- y: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)

Now you can access the data from y column as

data_df.select("y.p.name")
data_df.select("y.p.value")

output

abc, 10

Ok, the solution is to add a new nested column with correct schema and drop the column with wrong schema

from pyspark.sql.functions import *
from pyspark.sql import Row

df3 = spark.read.json("data.json", multiLine = True)

# create correct schema from old 
c = df3.schema['y'].jsonValue()
c['name'] = 'z'
c['type']['fields'][0]['type']['fields'][1]['type'] = 'long'
c['type']['fields'][1]['type']['fields'][1]['type'] = 'long'

y_schema = StructType.fromJson(c['type'])

# define a udf to populate the new column. Row are immuatable so you 
# have to build it from start.

def foo(row):
    d = Row.asDict(row)
    y = {}
    y["p"] = {}
    y["p"]["name"] = d["p"]["name"]
    y["p"]["value"] = int(d["p"]["value"])
    y["q"] = {}
    y["q"]["name"] = d["q"]["name"]
    y["q"]["value"] = int(d["p"]["value"])

    return(y)
map_foo = udf(foo, y_schema)

# add the column
df3_new  = df3.withColumn("z", map_foo("y"))

# delete the column
df4 = df3_new.drop("y")


df4.printSchema()

output

root
 |-- x: long (nullable = true)
 |-- z: struct (nullable = true)
 |    |-- p: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)
 |    |-- q: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: long (nullable = true)


df4.show()

output

+---+-------------------+
|  x|                  z|
+---+-------------------+
| 12|[[abc,10],[pqr,10]]|
+---+-------------------+
pauli
  • 4,191
  • 2
  • 25
  • 41
  • @aswinids I have edited the question. Any thoughts on this one? – J.D Aug 23 '17 at 13:50
  • @aswinids : Thanks for helping. Do we have decima/timestamp data type in json schema? – J.D Aug 23 '17 at 19:05
  • @aswinids: If I change the value of 10 to "10" and use type: 'long', I get null – J.D Aug 23 '17 at 22:37
  • @zero323 Do you have any idea ? – J.D Aug 24 '17 at 00:53
  • @J.D It's working completely fine with above json_schema. can you check it again? And yes, I'm reading the json file after converting values to string. – pauli Aug 24 '17 at 03:46
  • I have tried a lot but I still get null on retrieving the values. It looks like the schema isnt able to cast "10" as long and throws null. Is there any other way? – J.D Aug 24 '17 at 12:45
  • The schema shows that the type is long but on retrieving the value, I get NULL. – J.D Aug 24 '17 at 13:53
  • @J.D my mistake, didn't check the output from df.show() . I have edited the solution. It is working now. – pauli Aug 24 '17 at 14:51
  • What if we have a lot of nested columns under 'y'? Isn't this approach a more static one? since we are using indexes in c['type']['fields'][0]['type']['fields'][1]['type'] = 'long to convert. Do we have any way where in we can do it by the column name? – J.D Aug 24 '17 at 15:40
  • You can always write a simple function to create the schema from the old one. You have to supply nested_col_level1, nested_col_level2 and so on to the function. – pauli Aug 25 '17 at 11:21
0

It may seem simple to use arbitrary variable names but this is problematic and contrary to PEP8. And when dealing with numbers, I suggest avoiding the common names used in iterating over such structures... ie, value.

import json

with open('random.json') as json_file:
    data = json.load(json_file)

for k, v in data.items():
    if k == 'y':
        for key, item in v.items():
            item['value'] = float(item['value'])


print(type(data['y']['p']['value']))
print(type(data['y']['q']['value']))
# mac → python3 make_float.py
# <class 'float'>
# <class 'float'>
json_data = json.dumps(data, indent=4, sort_keys=True)
with open('random.json', 'w') as json_file:
    json_file.write(json_data)

out json file

diek
  • 657
  • 7
  • 16
  • The crucial part of this problem is that we have around 60GB of data produced everyday and we need to ensure the scalability and thats why Spark was the way out – J.D Aug 24 '17 at 02:13
  • Of course this would not be able o handle such a massive amount of data. Why did the question you referenced not work? From the documentation they give an example of dealing with this: https://ghostbin.com/paste/wt5y6 – diek Aug 24 '17 at 02:46