
I have a dataframe that stores a JSON object in one column. I want to process the JSON object to create a new dataframe (different number and type of columns, and each original row will generate n new rows from its JSON object). I wrote the logic below, which appends a dictionary (one per new row) to a list while iterating through the original dataframe.

data = []

def process_row_data(row):
    global data
    for item in row.json_object['obj']:
        # create a dictionary to represent each row of a new dataframe
        parsed_row = {'a': item.a, 'b':item.b, ..... 'zyx':item.zyx}
        data.append(parsed_row)


df.apply(lambda row: process_row_data(row), axis=1)
# create the new dataframe
df_final = pd.DataFrame.from_dict(data)

However, this solution doesn't seem scalable as the number of rows and the size of each parsed_row grow.

Is there a way to write this in a scalable way with PySpark?

    A working example with some dummy-data would be really helpful here. So could you define a dataframe df with some json-data in your example code? – MangoNrFive Oct 28 '22 at 09:43
  • I had a question similar to yours some time ago: a JSON array in a column, and the desire to concatenate it all into another dataframe. Maybe you can find some inspiration here: https://stackoverflow.com/questions/54537616/flatten-dataframe-nested-list-array-with-extra-index-keys-for-time-series – LoneWanderer Oct 30 '22 at 14:03

2 Answers


To create a new dataframe using another dataframe with a column that contains JSON objects, one object per row, you can use different approaches depending on the library you are using. In this answer, we will show you two possible solutions: one using pandas and one using PySpark.

Solution using pandas

Pandas is a popular Python library for data analysis and manipulation. It provides a DataFrame object that can store tabular data in rows and columns. To create a new dataframe using another dataframe with a JSON column, you can use the following steps:

  1. Import pandas and json libraries
  2. Load the original dataframe from a source (e.g. a csv file, a database, etc.)
  3. Define a function that can parse a JSON string and return a dictionary
  4. Apply the function to the JSON column and store the result in a new column
  5. Use the pandas.json_normalize function to convert the dictionary column into a new dataframe
  6. Drop the original JSON column and the intermediate dictionary column, then merge the new dataframe with the original one

Here is an example of Python code that implements these steps:

# Import libraries
import pandas as pd
import json

# Load the original dataframe
df = pd.read_csv('data.csv')

# Define a function to parse JSON strings, returning None for invalid input
def parse_json(x):
  try:
    return json.loads(x)
  except (TypeError, ValueError):
    return None

# Apply the function to the JSON column
df['json_dict'] = df['json_column'].apply(parse_json)

# Convert the dictionary column into a new dataframe
df_json = pd.json_normalize(df['json_dict'])

# Drop the original JSON column and the intermediate dictionary column,
# then merge the flattened fields with the original dataframe
df = df.drop(['json_column', 'json_dict'], axis=1)
df = pd.concat([df, df_json], axis=1)
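
Note that json_normalize as used here yields exactly one output row per input row. If, as in the question, each JSON object holds a list of records under a key (assumed below to be 'obj') and every record should become its own row, a sketch replacing steps 5 and 6 could look like this:

# Each parsed dict is assumed to look like {'obj': [{...}, {...}, ...]};
# explode yields one entry per record, json_normalize flattens them
records = df['json_dict'].dropna().apply(lambda d: d['obj']).explode()
df_final = pd.json_normalize(records.tolist())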

Solution using PySpark

PySpark is a Python interface for Apache Spark, a distributed computing framework for large-scale data processing. It provides a SparkSession object that can create and manipulate Spark DataFrames, which are similar to pandas DataFrames but can handle distributed data across multiple nodes. To create a new dataframe using another dataframe with a JSON column, you can use the following steps:

  1. Import pyspark.sql and pyspark.sql.functions libraries
  2. Create a SparkSession object and load the original dataframe from a source (e.g. a csv file, a database, etc.)
  3. Use the pyspark.sql.functions.from_json function to convert the JSON column into a StructType column
  4. Select the fields from the StructType column as new top-level columns
  5. Drop the original JSON column and keep the new columns

Here is an example of Python code that implements these steps:

# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

# Create a SparkSession object
spark = SparkSession.builder.appName('json_example').getOrCreate()

# Load the original dataframe
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Infer the schema of the JSON column by letting Spark read it
# (note that this triggers an extra pass over the data)
json_schema = spark.read.json(df.rdd.map(lambda row: row.json_column)).schema

# Convert the JSON column into a StructType column
df = df.withColumn('json_struct', from_json(col('json_column'), json_schema))

# Select the fields from the StructType column as new top-level columns
# (a single select avoids calling withColumn once per field)
df = df.select('*', 'json_struct.*')

# Drop the original JSON column and the StructType column
df = df.drop('json_column', 'json_struct')
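
As written, this flattens each JSON object into a single output row. If instead the JSON strings hold an array of records that should each become their own row (the situation in the question), a sketch starting from the dataframe as loaded in step 2 could combine from_json with explode; the key name obj and the field types below are assumptions:

from pyspark.sql.functions import explode

# Assumed shape of the JSON strings: {"obj": [{"a": ..., "b": ...}, ...]}
array_schema = 'obj array<struct<a:int,b:int>>'
df_final = (df
    .withColumn('json_struct', from_json(col('json_column'), array_schema))
    .select(explode('json_struct.obj').alias('item'))
    .select('item.*'))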
Ahmed Mohamed

You have given us the function, but haven't provided input data (please add example input next time). It seems your input data must have been a PySpark dataframe which you later converted to pandas. That is the only way I could make your function work.

Input dataframe:

df = spark.createDataFrame(
    [({"obj":[{"a": 1, "b": 2}, {"a": 5, "b": 6}]},),
     ({"obj":[{"a": 3, "b": 4}, {"a": 7, "b": 8}]},)],
    "json_object struct<obj:array<struct<a:int,b:int>>>")

Using your function and the above dataframe we get this:

df = df.toPandas()

data = []
def process_row_data(row):
    global data
    for item in row.json_object['obj']:
        parsed_row = {'a': item.a, 'b':item.b}
        data.append(parsed_row)

df.apply(lambda row: process_row_data(row), axis=1)
df_final = pd.DataFrame.from_dict(data)

print(df_final)
#    a  b
# 0  1  2
# 1  5  6
# 2  3  4
# 3  7  8

Converting to pandas like this while working in PySpark is a problem, because you leave distributed computing mode and perform the work on a single node, i.e. inefficiently. There are exceptions, like Spark's pandas_udf family. Just keep in mind that it is usually most efficient to work with native Spark tools.
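
For completeness, here is a sketch of one such exception, mapInPandas (one of Spark's pandas-integration APIs), applied to the Spark dataframe defined above (before toPandas). The parsing logic stays in pandas but runs distributed, one batch per iteration. Exactly how Spark hands struct columns to pandas (plain dicts are assumed here) can vary with the Spark/Arrow version:

import pandas as pd

def parse_batches(batches):
    # Each element is a pandas DataFrame holding one batch of Spark rows
    for pdf in batches:
        rows = []
        for json_object in pdf['json_object']:
            for item in json_object['obj']:
                rows.append({'a': item['a'], 'b': item['b']})
        yield pd.DataFrame(rows, columns=['a', 'b'])

df_final = df.mapInPandas(parse_batches, schema='a int, b int')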

Answer in PySpark:

df = df.selectExpr("inline(json_object.obj)")

df.show()
# +---+---+
# |  a|  b|
# +---+---+
# |  1|  2|
# |  5|  6|
# |  3|  4|
# |  7|  8|
# +---+---+

json_object.obj navigates to the obj field of the json_object struct type column.

inline explodes an array of structs into rows, creating output columns without you having to provide names, because the struct field names are used.
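
For reference, a roughly equivalent formulation with the DataFrame API, for those who prefer it over SQL expressions:

from pyspark.sql import functions as F

df = df.select(F.explode('json_object.obj').alias('s')).select('s.*')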

ZygD