I think there is an easier way to do this:
import pyspark.sql.functions as f
from pyspark import Row
from pyspark.shell import spark
from pyspark.sql import DataFrame
# Two sample rows, each carrying one JSON document as a plain string.
df: DataFrame = spark.createDataFrame([
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}'),
    Row(json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}'),
])

# DDL-formatted schema describing the JSON documents above.
schema = 'STRUCT<`address`: STRUCT<`city`: STRING, `houseNumber`: BIGINT, `line1`: STRING>, `name`: STRING>'

# Parse the JSON string into a struct column named 'obj'.
parsed = f.from_json('json_column', schema)
df = df.withColumn('obj', parsed)

# (path inside the parsed struct, name of the resulting flat column)
projection = (
    ('obj.address.line1', 'line_1'),
    ('obj.address.houseNumber', 'house_number'),
    ('obj.address.city', 'city'),
    ('obj.name', 'name'),
)
df = df.select(*[f.col(path).alias(alias) for path, alias in projection])
df.show(truncate=False)
Output:
+-----------+------------+-----------+-----+
|line_1 |house_number|city |name |
+-----------+------------+-----------+-----+
|Test street|123 |New York |Test1|
|Test street|456 |Los Angeles|Test2|
+-----------+------------+-----------+-----+
UPDATE (Generic function)
import pyspark.sql.functions as f
from pyspark import Row
from pyspark.shell import spark
from pyspark.sql import DataFrame
def get_schema(dataframe: DataFrame, column: str):
    """Infer the JSON schema of a string column from one sample value.

    The first row in which ``column`` is not null is fetched and its value
    is passed to ``schema_of_json`` as a literal. Only that single row is
    inspected, so documents with a different shape elsewhere in the column
    do not influence the result.

    Args:
        dataframe: DataFrame that holds the JSON string column.
        column: Name of the column whose JSON schema should be inferred.

    Returns:
        The column expression produced by ``f.schema_of_json``; it is used
        as the schema argument of ``f.from_json``.

    Raises:
        ValueError: If ``column`` has no non-null value to sample from.
    """
    row = dataframe.where(f.col(column).isNotNull()).select(column).first()
    if row is None:
        # first() returns None when no row survives the filter; fail with a
        # clear message instead of an AttributeError on ``None.asDict()``.
        raise ValueError(
            "Cannot infer a JSON schema: column '{}' has no non-null values".format(column)
        )
    return f.schema_of_json(f.lit(row.asDict()[column]))
def flatten(dataframe, column):
    """Expand the struct column ``column`` into flat top-level columns.

    Each pass replaces every matching struct column with one column per
    field, named ``<parent>_<field>``. Passes repeat until no matching
    struct column is left, so nested structs are unrolled level by level.
    Columns that are not expanded keep their relative order and come first;
    the expanded fields are appended after them.

    Args:
        dataframe: DataFrame containing the struct column.
        column: Name of the struct column to flatten.

    Returns:
        A DataFrame in which ``column`` and the structs nested inside it
        have been replaced by flat columns.

    Note:
        Only ``column`` itself and columns named ``<column>_...`` are
        considered. A pre-existing struct column that already follows that
        naming pattern is flattened as well.
    """
    # Adapted from https://stackoverflow.com/a/49532496/6080276 answer
    # Descendants produced by earlier passes are always named
    # '<column>_<field>', so match on that exact prefix. A bare
    # startswith(column) would also catch unrelated columns such as
    # '<column>2'.
    prefix = column + '_'
    while True:
        nested_cols = [name for name, dtype in dataframe.dtypes
                       if (name == column or name.startswith(prefix))
                       and dtype.startswith('struct')]
        if not nested_cols:
            return dataframe
        flat_cols = [name for name in dataframe.columns if name not in nested_cols]
        dataframe = dataframe.select(
            flat_cols
            + [f.col(nc + '.' + c).alias(nc + '_' + c)
               for nc in nested_cols
               for c in dataframe.select(nc + '.*').columns])
def extract_json(dataframe, column_name):
    """Parse a JSON string column and spread its fields over flat columns.

    The schema is obtained with ``get_schema``, the column is replaced in
    place by the parsed struct, and the struct is then unrolled with
    ``flatten``.

    Args:
        dataframe: DataFrame holding the JSON string column.
        column_name: Name of the JSON string column to extract.

    Returns:
        The DataFrame returned by ``flatten`` for the parsed column.
    """
    inferred = get_schema(dataframe, column_name)
    parsed = f.from_json(column_name, inferred).alias(column_name)
    with_struct = dataframe.withColumn(column_name, parsed)
    return flatten(with_struct, column_name)
# Sample data: two columns, each holding a JSON document as a string.
df: DataFrame = spark.createDataFrame([
    Row(
        json_column='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
        another_json='{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"}',
    ),
    Row(
        json_column='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
        another_json='{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}',
    ),
])
df.show(truncate=False)

# Extract one JSON column at a time, printing the result after each step.
for json_col in ('json_column', 'another_json'):
    df = extract_json(dataframe=df, column_name=json_col)
    df.show(truncate=False)
First output (dataframe):
+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|json_column |another_json |
+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"} |{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"} |
|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|
+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
Second output (json_column extraction):
+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
|another_json |json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|
+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
|{"address": {"line1": "Test street","houseNumber": 123,"city": "New York"}, "name": "Test1"} |Test1 |New York |123 |Test street |
|{"address": {"line1": "Test street","houseNumber": 456,"city": "Los Angeles"}, "name": "Test2"}|Test2 |Los Angeles |456 |Test street |
+-----------------------------------------------------------------------------------------------+----------------+------------------------+-------------------------------+-------------------------+
Third output (another_json extraction):
+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
|json_column_name|json_column_address_city|json_column_address_houseNumber|json_column_address_line1|another_json_name|another_json_address_city|another_json_address_houseNumber|another_json_address_line1|
+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+
|Test1 |New York |123 |Test street |Test1 |New York |123 |Test street |
|Test2 |Los Angeles |456 |Test street |Test2 |Los Angeles |456 |Test street |
+----------------+------------------------+-------------------------------+-------------------------+-----------------+-------------------------+--------------------------------+--------------------------+