
I'm not a pro with Spark, so I'm asking for help.

I migrated a DynamoDB table into S3 using the built-in export service. It saves files in *.json format. Below is an example of a row (each row's data is a dict nested under the key "Item"):

    {
    "Item": {
        "accept_languages":       {
            "M": {
                "en":    {"N": "0.9"},
                "en-US": {"N": "1"}
            }
        },
        "accept_mimetypes":       {
            "M": {
                "*/*":        {"N": "0.8"},
                "image/*":    {"N": "1"},
                "image/apng": {"N": "1"},
                "image/webp": {"N": "1"}
            }
        },
        "id":                     {"S": "5cddbd53b870c2619f1083ed"},
        "ip":                     {"S": "11.11.111.11"},
        "landing_page__type":     {"S": "PageMain"},
        "location__city":         {"S": "Scituate"},
        "location__country":      {"S": "United States"},
        "location__country_code": {"S": "US"},
        "location__region":       {"S": "MA"},
        "location__zip":          {"S": "02066"},
        "origin_url":             {"S": "https://www.bing.com/"},
        "session":                {"S": "b4d58fd18"},
        "source":                 {"S": "bing"},
        "user_agent__browser":    {"S": "Chrome"},
        "user_device":            {"S": "t"}
    }
}

As you can see, each row's data is nested. I want to produce a *.csv file from it. Any recommendations on how to parse it? I currently have a UDF (a custom function) that transforms a single DynamoDB dict into its plain form. How, for example, can I extract the data from each row and apply that function to it?
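
For reference, my UDF is roughly like the sketch below (simplified: it only handles the S, N and M attribute types that appear in the example above):

    def unwrap(value):
        # Unwrap one DynamoDB attribute value, e.g. {"S": "bing"} -> "bing".
        if "S" in value:
            return value["S"]
        if "N" in value:
            return float(value["N"])
        if "M" in value:
            # maps nest further attribute values, so recurse
            return {k: unwrap(v) for k, v in value["M"].items()}
        raise ValueError(f"unsupported attribute value: {value}")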

Thanks

4d61726b

1 Answer


The idea (adapted from this answer) is to recursively collect all column names into a list and then use that list in a select statement:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# the example JSON is pretty-printed across several lines, hence multiLine
df = spark.read.option("multiLine", "true").json(<filename>)

def flatten(schema, prefix=None):
    # Recursively walk the schema and yield a column expression for every leaf field
    for field in schema.fields:
        if prefix is None:
            colName = field.name
        else:
            colName = prefix + "." + field.name
        if isinstance(field.dataType, T.StructType):
            # nested struct: descend and keep building the dotted path
            yield from flatten(field.dataType, colName)
        else:
            # leaf field: select it, replacing dots so the alias is a flat name
            yield F.col(colName).alias(colName.replace(".", "_"))

df.select(list(flatten(df.schema))).show()

Output:

+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|Item_accept_languages_M_en_N|Item_accept_languages_M_en-US_N|Item_accept_mimetypes_M_*/*_N|Item_accept_mimetypes_M_image/*_N|Item_accept_mimetypes_M_image/apng_N|Item_accept_mimetypes_M_image/webp_N|           Item_id_S|   Item_ip_S|Item_landing_page__type_S|Item_location__city_S|Item_location__country_S|Item_location__country_code_S|Item_location__region_S|Item_location__zip_S|   Item_origin_url_S|Item_session_S|Item_source_S|Item_user_agent__browser_S|Item_user_device_S|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|                         0.9|                              1|                          0.8|                                1|                                   1|                                   1|5cddbd53b870c2619...|11.11.111.11|                 PageMain|             Scituate|           United States|                           US|                     MA|               02066|https://www.bing....|     b4d58fd18|         bing|                    Chrome|                 t|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+

This DataFrame can then be saved as a flat CSV file.
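
For example, to write the flattened result (a minimal sketch; the output path is a placeholder):

flat_df = df.select(list(flatten(df.schema)))
flat_df.write.option("header", "true").mode("overwrite").csv("<output_dir>")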

werner
  • I believe the `.option("multiLine", "true")` is incorrect and will only read the first line in each of the files created by AWS – Jake Greene Oct 29 '21 at 14:43
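
If the export files are indeed line-delimited JSON (one object per line), Spark's JSON reader expects that format by default, so the option can simply be dropped (a sketch; the path is a placeholder):

df = spark.read.json("<export_path>")  # JSON Lines input: no multiLine option needed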