0

Assume I have a data frame like this, where json_column is StringType():

json_column
{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}
{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}

I want to extract all the fields of this json into separate columns like this:

| line_1      | house_number | city        | name  |
|-------------|--------------|-------------|-------|
| Test street | 123          | New York    | Test1 |
| Test street | 456          | Los Angeles | Test2 |
Robert Kossendey
  • 6,733
  • 2
  • 12
  • 42

4 Answers

1

I think there is an easier way to do it:

import pyspark.sql.functions as f
from pyspark import Row
from pyspark.shell import spark
from pyspark.sql import DataFrame

sample_json = [
    '{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
    '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
]
df: DataFrame = spark.createDataFrame([Row(json_column=doc) for doc in sample_json])

# DDL-formatted schema of the JSON documents stored in json_column.
schema = 'STRUCT<`address`: STRUCT<`city`: STRING, `houseNumber`: BIGINT, `line1`: STRING>, `name`: STRING>'
df = df.withColumn('obj', f.from_json('json_column', schema))

# Path inside the parsed struct -> name of the output column (in output order).
output_columns = {
    'obj.address.line1': 'line_1',
    'obj.address.houseNumber': 'house_number',
    'obj.address.city': 'city',
    'obj.name': 'name',
}
df = df.select([f.col(path).alias(alias) for path, alias in output_columns.items()])
df.show(truncate=False)

Output:

+-----------+------------+-----------+-----+
|line_1     |house_number|city       |name |
+-----------+------------+-----------+-----+
|Test street|123         |New York   |Test1|
|Test street|456         |Los Angeles|Test2|
+-----------+------------+-----------+-----+

UPDATE (Generic function)

import pyspark.sql.functions as f
from pyspark import Row
from pyspark.shell import spark
from pyspark.sql import DataFrame


def get_schema(dataframe: DataFrame, column: str):
    """Infer a JSON schema for ``column`` from one of its non-null values.

    Only a single row (the first non-null one Spark returns) is inspected, so
    keys that are absent from that row will be missing from the schema.

    :param dataframe: DataFrame containing the JSON strings.
    :param column: name of the string column holding JSON documents.
    :return: the ``schema_of_json`` Column expression, usable as the schema
        argument of ``from_json``.
    :raises ValueError: if ``column`` contains no non-null values.
    """
    row = dataframe.where(f.col(column).isNotNull()).select(column).first()
    if row is None:
        # first() returns None on an empty result; fail with a clear message
        # instead of an AttributeError on None.
        raise ValueError(
            f"cannot infer a JSON schema: column {column!r} has no non-null values"
        )
    # Positional access: the output name of select(column) can differ from
    # ``column`` itself (e.g. select("a.b") yields a column named "b").
    return f.schema_of_json(f.lit(row[0]))


def _quote_identifier(name):
    """Wrap a column/field name in backticks so Spark treats dots in it literally."""
    return "`" + name.replace("`", "``") + "`"


def flatten(dataframe, column):
    """Expand the struct column ``column`` (recursively) into top-level columns.

    A struct column ``X`` with fields ``a`` and ``b`` is replaced by the
    columns ``X_a`` and ``X_b``. This repeats on the produced columns until
    none of them is a struct. Columns that were not produced from ``column``
    are never touched, even if their names share its prefix. In each round
    the newly produced columns are appended after the remaining columns.

    :param dataframe: the DataFrame to flatten.
    :param column: name of the struct column to flatten. If it does not exist
        or is not a struct, the DataFrame is returned unchanged.
    :return: the flattened DataFrame.
    """
    # Adapted from https://stackoverflow.com/a/49532496/6080276 answer
    # Track the columns that originate from ``column`` instead of matching by
    # prefix, so unrelated structs such as "json_column2" are left alone.
    pending = [column]
    while True:
        dtypes = dict(dataframe.dtypes)
        nested_cols = [name for name in pending
                       if dtypes.get(name, '').startswith('struct')]
        if not nested_cols:
            break

        flat_cols = [_quote_identifier(name) for name in dataframe.columns
                     if name not in nested_cols]
        new_cols = []
        pending = []
        for nc in nested_cols:
            for field_name in dataframe.schema[nc].dataType.fieldNames():
                alias = nc + '_' + field_name
                # getField avoids misparsing JSON keys that contain dots.
                new_cols.append(
                    f.col(_quote_identifier(nc)).getField(field_name).alias(alias))
                pending.append(alias)
        dataframe = dataframe.select(flat_cols + new_cols)
    return dataframe


def extract_json(dataframe, column_name):
    """Replace the JSON string column ``column_name`` by flattened columns.

    The schema is inferred with ``get_schema``. The column is then parsed in
    place with ``from_json``, and ``flatten`` expands the resulting struct into
    ``<column_name>_<field>`` columns.
    """
    parsed = f.from_json(column_name, get_schema(dataframe, column_name))
    return flatten(dataframe.withColumn(column_name, parsed), column_name)


sample_json = [
    '{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
    '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
]
# Both columns hold the same documents, so the two extractions can be compared.
df: DataFrame = spark.createDataFrame(
    [Row(json_column=doc, another_json=doc) for doc in sample_json]
)

df.show(truncate=False)

# Extract one JSON column at a time, showing the frame after each step.
for json_column_name in ('json_column', 'another_json'):
    df = extract_json(dataframe=df, column_name=json_column_name)
    df.show(truncate=False)

First output (dataframe):

+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|json_column                                                                                    |another_json                                                                                   |
+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |
|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|
+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+

Second output (json_column extraction):

+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
|another_json                                                                                   |json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|
+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
|{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |Test1           |New York                |123                            |Test street              |
|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|Test2           |Los Angeles             |456                            |Test street              |
+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+

Third output (another_json extraction):

+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
|json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|another_json_name|another_json_address_city|another_json_address_houseNumber|another_json_address_line1|
+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
|Test1           |New York                |123                            |Test street              |Test1            |New York                 |123                             |Test street               |
|Test2           |Los Angeles             |456                            |Test street              |Test2            |Los Angeles              |456                             |Test street               |
+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
Kafels
  • 3,864
  • 1
  • 15
  • 32
0

I came up with this class:

"""This module provides methods and classes for extracting jsons out
of data frames and adding them as columns"""
from typing import List, Tuple, Union

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StructType, StringType, DataType


# pylint: disable=too-few-public-methods
class CfJsonStructField:
    """
    Describe one field of a JSON document and the column it is extracted into.

    It acts as a wrapper around the spark.sql class StructField. Besides the
    field name and Spark type, it records the name the extracted column should
    get and, optionally, the fields of a nested JSON object.

    :param str old_column_name: name (key) of the field inside the JSON document.
    :param str new_column_name: name of the extracted column; ``None`` (the
        default) keeps ``old_column_name``.
    :param DataType column_type: Spark type used to parse the field,
        ``StringType()`` by default.
    :param list nested_json_struct_fields: left ``None`` if the field has a
        simple data type. If the field is a nested JSON object, a list of
        CfJsonStructField objects describing its fields is passed.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        old_column_name: str,
        new_column_name: str = None,
        column_type=StringType(),
        nested_json_struct_fields=None,
    ):
        self.old_column_name = old_column_name
        self.new_column_name = (
            old_column_name if new_column_name is None else new_column_name
        )
        self.column_type = column_type
        self.nested_json_struct_fields = nested_json_struct_fields

    def __repr__(self):
        """Return a string representation written in the same shape as the
        ``Union[Tuple, str]`` field description, e.g.
        ``('houseNumber', 'house_number', IntegerType())``."""
        new_col_str = ""
        col_type_str = ""
        nested_fields_str = ""
        if self.old_column_name != self.new_column_name:
            new_col_str = f", '{self.new_column_name}'"
        if not isinstance(self.column_type, StringType):
            type_str = repr(self.column_type)
            # Depending on the PySpark version, simple types render as
            # "IntegerType" or already as "IntegerType()"; parameterized types
            # (e.g. DecimalType) render with their arguments. Only add the call
            # parentheses when they are missing, to avoid "IntegerType()()".
            if not type_str.endswith(")"):
                type_str += "()"
            col_type_str = f", {type_str}"
        if self.nested_json_struct_fields:
            nested_fields_str = f", {self.nested_json_struct_fields}"
        return f"('{self.old_column_name}'{new_col_str}{col_type_str}{nested_fields_str})"

    def construct_spark_struct_field(self):
        """Create a spark.sql StructField named after the JSON field
        (``old_column_name``) with this field's ``column_type``."""
        return StructField(
            # Per default we make every column nullable. May be subject to future change
            name=self.old_column_name, dataType=self.column_type, nullable=True
        )


def construct_spark_struct_type(schema: List[CfJsonStructField]) -> StructType:
    """Build the spark.sql StructType for one level of a JSON document.

    :param schema: the field descriptions, in the order the StructFields
        should appear.
    :return: a StructType holding one StructField per entry of ``schema``.
    """
    return StructType([field.construct_spark_struct_field() for field in schema])


def extract_json_from_column(
    data_frame: DataFrame, column: str, schema: List[Union[Tuple, str]]
) -> DataFrame:
    """Extract the JSON document stored in ``column`` into separate columns.

    The source column is removed and one column per described field is
    appended to the data frame. Fields described with a nested list are
    parsed with the default StringType and then extracted recursively from
    the resulting column.

    :param data_frame: data frame containing the JSON strings.
    :param column: name of the column holding the JSON strings.
    :param schema: list of field descriptions. Each entry is either a field
        name (str) or a tuple ``(field_name[, new_column_name][, spec])``,
        where ``spec`` is a Spark DataType or, for a nested JSON object, a
        list of further field descriptions.
    :return: the data frame with the extracted columns.
    :raises ValueError: if a tuple description is empty or has too many
        elements.
    """

    def _col_desc_to_cf_json_struct_field(col_desc: Union[Tuple, str]) -> CfJsonStructField:
        """Convert one field description into a CfJsonStructField."""
        if isinstance(col_desc, str):
            return CfJsonStructField(col_desc)
        if not col_desc:
            raise ValueError("Empty field description")
        old_column_name, *rest = col_desc
        new_column_name = None
        # The new column name is optional: a string in second position is a
        # rename; anything else is already the type / nested specification.
        if rest and isinstance(rest[0], str):
            new_column_name = rest.pop(0)
        if len(rest) > 1:
            raise ValueError(
                f"Invalid field description {col_desc!r}: too many elements"
            )
        if not rest:
            return CfJsonStructField(old_column_name, new_column_name)
        spec = rest[0]
        if isinstance(spec, DataType):
            return CfJsonStructField(old_column_name, new_column_name, spec)
        return CfJsonStructField(
            old_column_name,
            new_column_name,
            nested_json_struct_fields=[_col_desc_to_cf_json_struct_field(t) for t in spec],
        )

    def _temp_column_name(existing) -> str:
        """Return a column name that clashes with none of ``existing``."""
        taken = set(existing)
        name = "__cf_json_data"
        while name in taken:
            name = "_" + name
        return name

    def _extract(
        data_frame: DataFrame, column: str, schema: List[CfJsonStructField]
    ) -> DataFrame:
        """Parse ``column`` with ``schema``, replace it by one column per
        field and recurse into nested fields."""
        # A unique helper name, so an existing column (or JSON key) called
        # "data" is neither overwritten nor dropped.
        parsed_column = _temp_column_name(data_frame.columns)
        data_frame = (
            data_frame.withColumn(
                parsed_column, from_json(column, construct_spark_struct_type(schema))
            )
            # Drop the source before adding the fields so that a field renamed
            # to the source column's name is kept.
            .drop(column)
        )
        # Select each field under its new name directly instead of renaming
        # afterwards, which would also rename unrelated columns of that name.
        data_frame = data_frame.select(
            "*",
            *[
                col(parsed_column)
                .getField(field.old_column_name)
                .alias(field.new_column_name)
                for field in schema
            ],
        ).drop(parsed_column)

        for field in schema:
            if field.nested_json_struct_fields is not None:
                data_frame = _extract(
                    data_frame,
                    column=field.new_column_name,
                    schema=field.nested_json_struct_fields,
                )
        return data_frame

    return _extract(
        data_frame,
        column,
        [_col_desc_to_cf_json_struct_field(t) for t in schema]
    )

You can now call extract_json_from_column like this:

from pyspark.sql.types import IntegerType

# ``data_frame`` is your DataFrame whose ``json_column`` holds the JSON strings.
has_json_extracted = extract_json_from_column(
    data_frame=data_frame,
    column='json_column',
    schema=[
        'name',
        ('address', [
            ('line1', 'line_1'),
            ('houseNumber', 'house_number', IntegerType()),
            'city',
        ]),
    ],
)

The tuples are built like this:

  1. the field name in the JSON
  2. either the name the new column should get (can be left out if it equals the field name), or a list of field descriptions if the field is a nested JSON object
  3. the Spark DataType (default is StringType())
Robert Kossendey
  • 6,733
  • 2
  • 12
  • 42
0

Converting the DataFrame to an RDD of JSON strings and reading it back with spark.read.json is the simplest way.

tl;dr

from pyspark.sql import Row
from pyspark.sql.functions import col

# ``spark`` is the active SparkSession (predefined in the pyspark shell).
df=spark.createDataFrame([
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}')
])
# Pick the JSON string out of each row explicitly; flattening the whole Row
# would also push the values of any other columns into the RDD.
new_rdd = df.rdd.map(lambda row: row.json_column)
new_df = spark.read.json(new_rdd)
new_df = new_df.select(col("address.*"),col("name"))

Explanation

>>> from pyspark.sql.functions import col
>>> df=spark.createDataFrame([
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}')
])
>>> df.show(truncate=False)
+-----------------------------------------------------------------------------------------------+
|json_column                                                                                    |
+-----------------------------------------------------------------------------------------------+
|{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}   |
|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|
+-----------------------------------------------------------------------------------------------+

>>> df.printSchema()
root
 |-- json_column: string (nullable = true)

>>> new_rdd = df.rdd.flatMap(lambda x:x)
>>> new_rdd.take(2)
['{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}', '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}']

>>> new_df = spark.read.json(new_rdd)

>>> new_df.show(truncate=False)
+-------------------------------+-----+
|address                        |name |
+-------------------------------+-----+
|[New York, 123, Test street]   |Test1|
|[Los Angeles, 456, Test street]|Test2|
+-------------------------------+-----+

>>> new_df.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- houseNumber: long (nullable = true)
 |    |-- line1: string (nullable = true)
 |-- name: string (nullable = true)

>>> new_df = new_df.select(col("address.*"),col("name"))
>>> new_df.show()
+-----------+-----------+-----------+-----+
|       city|houseNumber|      line1| name|
+-----------+-----------+-----------+-----+
|   New York|        123|Test street|Test1|
|Los Angeles|        456|Test street|Test2|
+-----------+-----------+-----------+-----+

Sandeep Singh
  • 432
  • 6
  • 17
0

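There is a dedicated function for this: json_tuple(). In this case it is a bit more complicated because the JSON is nested, but it can still be used by applying it twice. Note that json_tuple returns every value as a string, so house_number ends up as a string column.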

import pyspark.sql.functions as F
from pyspark import Row
from pyspark.shell import spark

json_documents = [
    '{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
    '{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
]
df = spark.createDataFrame([Row(json_column=doc) for doc in json_documents])

# json_tuple is applied twice because the document is nested: the first pass
# yields the raw "address" document (c0) and "name" (c1), the second pass
# splits the address document; the generator outputs are named via alias.
top_level = df.select(F.json_tuple(F.col("json_column"), "address", "name"))
df = top_level.select(
    F.json_tuple(F.col("c0"), "line1", "houseNumber", "city").alias("line_1", "house_number", "city"),
    F.col("c1").alias("name"),
)
df.show(2, False)

Result:

+-----------+------------+-----------+-----+
|line_1     |house_number|city       |name |
+-----------+------------+-----------+-----+
|Test street|123         |New York   |Test1|
|Test street|456         |Los Angeles|Test2|
+-----------+------------+-----------+-----+
l_po
  • 61
  • 5