
I am currently trying to do some aggregation on the Services column. I would like to group all the similar services, sum their values, and, if possible, flatten the result into a single row.

Input:

+------------------+--------------------+
|         cid      |            Services|
+------------------+--------------------+
|845124826013182686|     [112931, serv1]|
|845124826013182686|     [146936, serv1]|
|845124826013182686|      [32718, serv2]|
|845124826013182686|      [28839, serv2]|
|845124826013182686|       [8710, serv2]|
|845124826013182686|    [2093140, serv3]|
+------------------+--------------------+

Desired output:

+------------------+--------------------+------------------+--------------------+
|               cid|               serv1|             serv2|               serv3|
+------------------+--------------------+------------------+--------------------+
|845124826013182686|              259867|             70267|             2093140|
+------------------+--------------------+------------------+--------------------+

Below is the code I currently have:

from pyspark.sql import SparkSession, functions

spark = SparkSession.builder.appName("Service Aggregation").getOrCreate()

pathToFile = '/path/to/jsonfile'
df = spark.read.json(pathToFile)

# nodes.services is an array of arrays, so explode twice:
# first to one row per node's service list, then to one row per service
df2 = df.select('cid', functions.explode_outer(df.nodes.services))
finaldataFrame = df2.select('cid', functions.explode_outer(df2.col).alias('Services'))
finaldataFrame.show()

I am quite new to PySpark and have been looking at resources and trying to create a UDF to apply to that column, but the map function in PySpark only works for RDDs, not DataFrames, and I am unsure how to move forward to get the desired output.

Any suggestions or help would be much appreciated.

Result of printSchema:

root
 |-- clusterId: string (nullable = true)
 |-- col: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cpuCoreInSeconds: long (nullable = true)
 |    |    |-- name: string (nullable = true)
1 Answer

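To make the steps below reproducible, here is a minimal sketch that hard-codes the example data; in your case df is the exploded DataFrame produced by the code in the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Service Aggregation").getOrCreate()

# Hard-coded stand-in for the exploded DataFrame from the question;
# the array elements are strings here, hence the cast below
df = spark.createDataFrame(
    [
        ("845124826013182686", ["112931", "serv1"]),
        ("845124826013182686", ["146936", "serv1"]),
        ("845124826013182686", ["32718", "serv2"]),
        ("845124826013182686", ["28839", "serv2"]),
        ("845124826013182686", ["8710", "serv2"]),
        ("845124826013182686", ["2093140", "serv3"]),
    ],
    ["cid", "Services"],
)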

First, extract the service and the value from the Services column by position. Note this assumes that the value is always in position 0 and the service is always in position 1 (as shown in your example).

import pyspark.sql.functions as f

# getItem(i) pulls element i out of the Services array
df2 = df.select(
    'cid',
    f.col("Services").getItem(0).cast('integer').alias('value'),
    f.col("Services").getItem(1).alias('service')
)

df2.show()
#+------------------+-------+-------+
#|               cid|  value|service|
#+------------------+-------+-------+
#|845124826013182686| 112931|  serv1|
#|845124826013182686| 146936|  serv1|
#|845124826013182686|  32718|  serv2|
#|845124826013182686|  28839|  serv2|
#|845124826013182686|   8710|  serv2|
#|845124826013182686|2093140|  serv3|
#+------------------+-------+-------+

Note that I cast the value to integer, but it may already be an integer depending on how your schema is defined.

Once the data is in this format, it's easy to pivot() it. Group by the cid column, pivot the service column, and aggregate by summing the value column:

df2.groupBy('cid').pivot('service').sum("value").show()
#+------------------+------+-----+-------+
#|               cid| serv1|serv2|  serv3|
#+------------------+------+-----+-------+
#|845124826013182686|259867|70267|2093140|
#+------------------+------+-----+-------+
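As an aside, pivot() also accepts an explicit list of values. If you know the service names up front, passing them skips the extra job Spark otherwise runs to compute the distinct values (the list here is just the one from your example):

df2.groupBy('cid').pivot('service', ['serv1', 'serv2', 'serv3']).sum('value').show()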

Update

Based on the schema you provided, you will have to get the value and service by name, rather than by position:

df2 = df.select(
    'cid',
    # Services is a struct here, so getItem takes the field name
    f.col("Services").getItem("cpuCoreInSeconds").alias('value'),
    f.col("Services").getItem("name").alias('service')
)

The rest is the same. Also, no need to cast to integer as cpuCoreInSeconds is already a long.
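
For completeness, here is the whole pipeline as one sketch. It assumes the column names from your question's code (cid, nodes.services) and the struct fields from your printSchema; adjust cid vs. clusterId to whatever your JSON actually contains:

import pyspark.sql.functions as f

# nodes.services is an array of arrays of structs, so explode twice
exploded = df.select('cid', f.explode_outer(df.nodes.services))
df2 = exploded.select('cid', f.explode_outer(exploded.col).alias('Services'))

# extract the struct fields by name, then pivot to one row per cid
result = (
    df2.select(
        'cid',
        f.col('Services').getItem('cpuCoreInSeconds').alias('value'),
        f.col('Services').getItem('name').alias('service')
    )
    .groupBy('cid')
    .pivot('service')
    .sum('value')
)
result.show()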

  • Keep getting an Error on .getItem when I try to pull the particular item out of service. But thank you so much this gives me a path forward, been stuck on this problem for the past day! `pyspark.sql.utils.AnalysisException: u"Field name should be String Literal, but it's 0;"` – Rukgo Jun 15 '18 at 20:18
  • @Rukgo please [edit] your question and include the output of `df.printSchema()` -- it's likely that your `Services` column is a Struct – pault Jun 15 '18 at 20:27
  • I have added the printSchema, you are correct it is a Struct – Rukgo Jun 15 '18 at 20:30
  • Was able to get the desired output. Instead of having to get the index of the column was able to just call it because it was Structured. Truly appreciate the help, been stuck on this for a while! – Rukgo Jun 15 '18 at 21:18
  • @Rukgo yes, you'd have to get the fields by name rather than by position. I'll post an update. – pault Jun 15 '18 at 21:18