
I have a large nested JSON document for each year (say 2018, 2017), which has data aggregated by month (Jan-Dec) and by day (1-31).

{
 "2018" : {
    "Jan": {
        "1": {
            "u": 1,
            "n": 2
        },
        "2": {
            "u": 4,
            "n": 7
        }
    },
    "Feb": {
        "1": {
            "u": 3,
            "n": 2
        },
        "4": {
            "u": 4,
            "n": 5
        }
    }
 }
}

I have used the AWS Glue Relationalize.apply function to convert the above hierarchical data into a flat structure:

dfc = Relationalize.apply(frame = datasource0, staging_path = my_temp_bucket, name = my_ref_relationalize_table, transformation_ctx = "dfc")
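(Note: Relationalize.apply returns a DynamicFrameCollection, so I select the flattened root frame from it first; taking the first key is just a sketch, since the name Glue assigns can vary.)

datasource0 = dfc.select(list(dfc.keys())[0])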

This gives me a table with a column for each JSON element, as below:

| 2018.Jan.1.u | 2018.Jan.1.n | 2018.Jan.2.u | 2018.Jan.2.n | 2018.Feb.1.u | 2018.Feb.1.n | 2018.Feb.4.u | 2018.Feb.4.n | 
| 1            |      2       |      4       |      7       |      3       |      2       |      4       |      5       | 

As you can see, there will be a lot of columns in the table, one set for each day of each month. I want to simplify the table by converting the columns into rows, to get the table below:

| year | month | dd | u | n | 
| 2018 | Jan   | 1  | 1 | 2 | 
| 2018 | Jan   | 2  | 4 | 7 |  
| 2018 | Feb   | 1  | 3 | 2 |  
| 2018 | Feb   | 4  | 4 | 5 |

With my searching I could not find the right answer. Is there a solution in AWS Glue/PySpark, or any other way, to accomplish an unpivot and get a row-based table from the column-based table? Can it be done in Athena?

bizready
    Possible duplicate of [Unpivot in spark-sql/pyspark](https://stackoverflow.com/questions/42465568/unpivot-in-spark-sql-pyspark) – pault Jan 04 '19 at 02:27
  • I did see the link that @pault posted, and it is for fixed/known columns; I guess it is a good start. Would there be any solution for a dynamic set of columns, possibly? – bizready Jan 04 '19 at 03:17
  • Try flattening the JSON – pissall Jan 04 '19 at 05:41
  • The first table in the description is the result of flattening the JSON; it will still create lots of columns, in my way of thinking, and I think it still requires an unpivot – bizready Jan 04 '19 at 12:44

2 Answers


I implemented a solution similar to the snippet below:

dataFrame = datasource0.toDF()

rowsByKey = {}  ## (year, month, dd) -> {'u': ..., 'n': ...}
for row in dataFrame.rdd.toLocalIterator():
    for colName in dataFrame.schema.names:
        value = row[colName]
        ## column names look like '2018.Jan.1.u': year.month.day.metric
        year, month, dd, metric = colName.split('.')
        rowsByKey.setdefault((year, month, dd), {})[metric] = value

tableDataArray = []  ## to hold rows of [year, month, dd, u, n]
for (year, month, dd), metrics in rowsByKey.items():
    tableDataArray.append([year, month, dd, metrics.get('u'), metrics.get('n')])

from pyspark.sql import Row

unpivotDF = None
for rowDataArray in tableDataArray:
    newRowDF = sc.parallelize([Row(year=rowDataArray[0], month=rowDataArray[1], dd=rowDataArray[2], u=rowDataArray[3], n=rowDataArray[4])]).toDF()
    if unpivotDF is None:
        unpivotDF = newRowDF
    else:
        unpivotDF = unpivotDF.union(newRowDF)

datasource0 = datasource0.fromDF(unpivotDF, glueContext, "datasource0")
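
Since tableDataArray already holds plain Python lists, the per-row union above can also be avoided; a minimal sketch, assuming the same [year, month, dd, u, n] layout, builds the whole DataFrame in one call:

## one createDataFrame call instead of a union per row
unpivotDF = spark.createDataFrame(tableDataArray, ['year', 'month', 'dd', 'u', 'n'])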

In the above, newRowDF can also be created as below if data types have to be enforced (I completed the elided field list with the five target columns; month stays a string since it holds names like 'Jan'):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

## schema completed for the five target columns; IntegerType for u/n is an assumption
columns = [StructField('year', StringType(), True), StructField('month', StringType(), True),
           StructField('dd', StringType(), True), StructField('u', IntegerType(), True), StructField('n', IntegerType(), True)]
schema = StructType(columns)
unpivotDF = sqlContext.createDataFrame(sc.emptyRDD(), schema)
for rowDataArray in tableDataArray:
    newRowDF = spark.createDataFrame([rowDataArray], schema)  ## wrap in a list: one row
    unpivotDF = unpivotDF.union(newRowDF)
bizready

Here are the steps to unpivot your dataset using AWS Glue with PySpark.

  1. We need to add an additional import statement to the existing boilerplate import statements:

from pyspark.sql.functions import expr
  2. If our data is in a DynamicFrame, we need to convert it to a Spark DataFrame, for example:

    df_customer_sales = dyf_customer_sales.toDF()

(image: the example dataset to unpivot)

  3. Use the stack function to unpivot our dataset, based on how many columns we want to unpivot (a sketch for generating this expression dynamically follows the example):

unpivotExpr = "stack(4, 'january', january, 'febuary', febuary,  'march', march, 'april', april) as (month, total_sales)" 
unPivotDF = df_customer_sales.select('item_type', expr(unpivotExpr))
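
For the dynamic set of columns raised in the question's comments, the stack() expression can also be generated rather than hand-written; a sketch, assuming every column except item_type should be unpivoted (backticks quote column names that contain dots, like the asker's '2018.Jan.1.u'):

## hypothetical: derive the stack() arguments from whatever columns are present
month_cols = [c for c in df_customer_sales.columns if c != 'item_type']
pairs = ", ".join("'{0}', `{0}`".format(c) for c in month_cols)
unpivotExpr = "stack({}, {}) as (month, total_sales)".format(len(month_cols), pairs)
unPivotDF = df_customer_sales.select('item_type', expr(unpivotExpr))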

So, using an example dataset, our dataframe now looks like this: (image: the unpivoted result, one row per item_type and month)

If my explanation is not clear, I made a YouTube tutorial walkthrough of the solution: https://youtu.be/Nf78KMhNc3M
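
If the result has to go back through Glue writers, the unpivoted DataFrame can be converted back to a DynamicFrame; a minimal sketch, mirroring the fromDF call in the first answer:

from awsglue.dynamicframe import DynamicFrame

## convert the unpivoted Spark DataFrame back into a Glue DynamicFrame
dyf_unpivoted = DynamicFrame.fromDF(unPivotDF, glueContext, "dyf_unpivoted")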

Geocoder