
I have data with the following schema:

sourceip
destinationip
packets sent

I want to compute several aggregate fields from this data and end up with the following schema:

ip 
packets sent as sourceip
packets sent as destination

In the happy days of RDDs I could use aggregate, define a map of {ip -> []}, and count the appearances in a corresponding array location.
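
For readers unfamiliar with that RDD pattern, here is a minimal plain-Python sketch of it (no Spark involved). The sample rows, the `SENT`/`RECEIVED` slot names, and the `aggregate` helper are all made up for illustration, and the sketch assumes that packet counts are summed rather than appearances counted:

```python
from functools import reduce

# Hypothetical sample rows: (sourceip, destinationip, packets sent)
ROWS = [
    ("ip1", "ip2", 5),
    ("ip2", "ip3", 7),
    ("ip2", "ip1", 1),
    ("ip3", "ip2", 3),
]

SENT, RECEIVED = 0, 1  # array slots kept for every ip


def seq_op(acc, row):
    """Fold one row into a partition-local accumulator, mutating it in place."""
    src, dst, packets = row
    acc.setdefault(src, [0, 0])[SENT] += packets
    acc.setdefault(dst, [0, 0])[RECEIVED] += packets
    return acc


def comb_op(left, right):
    """Merge the accumulators of two partitions."""
    for ip, (sent, received) in right.items():
        slot = left.setdefault(ip, [0, 0])
        slot[SENT] += sent
        slot[RECEIVED] += received
    return left


def aggregate(partitions):
    """Mimic the shape of rdd.aggregate: a fresh zero value per partition, then a merge."""
    partials = [reduce(seq_op, part, {}) for part in partitions]
    return reduce(comb_op, partials, {})


# Two pretend partitions of two rows each
totals = aggregate([ROWS[:2], ROWS[2:]])
print(totals)
```

The point of the pattern is that the accumulator is updated in place, which is what the immutable UDAF buffer described below does not allow.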

With Dataset/DataFrame, aggregate is no longer available. A UDAF could be used instead, but in my experience UDAF buffers are immutable, so a map cannot be updated in place: a new instance has to be created on every map update (example + explanation here).

Technically, I could convert the Dataset to an RDD, aggregate, and convert back to a Dataset, but I expect that to degrade performance, since Datasets are better optimized. UDAFs are out of the question because of the copying.

Is there any other way to perform aggregations?

antonpuz

4 Answers


It sounds like you need a standard melt (How to melt Spark DataFrame?) and pivot combination:

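import spark.implicits._  // needed for toDF; assumes a SparkSession named `spark`

val df = Seq(
  ("192.168.1.102", "192.168.1.122", 10),
  ("192.168.1.122", "192.168.1.65", 10),
  ("192.168.1.102", "192.168.1.97", 10)
).toDF("sourceip", "destinationip", "packets sent")

// `melt` here is the helper defined in the linked answer:
// (id columns, value columns, variable column name, value column name)
df.melt(Seq("packets sent"), Seq("sourceip", "destinationip"), "type", "ip")
  .groupBy("ip")
  .pivot("type", Seq("sourceip", "destinationip"))
  .sum("packets sent").na.fill(0).show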

// +-------------+--------+-------------+             
// |           ip|sourceip|destinationip|
// +-------------+--------+-------------+
// | 192.168.1.65|       0|           10|
// |192.168.1.102|      20|            0|
// |192.168.1.122|      10|           10|
// | 192.168.1.97|       0|           10|
// +-------------+--------+-------------+
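
To make the two steps concrete, here is a rough plain-Python sketch of what melt followed by pivot does to the same three rows. It does not use Spark; the `melt` and `pivot_sum` functions below are hypothetical stand-ins written for this illustration, not the Spark helpers:

```python
from collections import defaultdict

# Same sample rows as above: (sourceip, destinationip, packets sent)
ROWS = [
    ("192.168.1.102", "192.168.1.122", 10),
    ("192.168.1.122", "192.168.1.65", 10),
    ("192.168.1.102", "192.168.1.97", 10),
]
TYPES = ("sourceip", "destinationip")


def melt(rows):
    """Turn each wide row into one long (ip, type, packets) record per ip column."""
    for src, dst, packets in rows:
        yield src, "sourceip", packets
        yield dst, "destinationip", packets


def pivot_sum(long_rows, types=TYPES):
    """Group by ip and spread the type values back into columns, summing packets."""
    # Cells with no matching record start at 0, like na.fill(0)
    table = defaultdict(lambda: dict.fromkeys(types, 0))
    for ip, kind, packets in long_rows:
        table[ip][kind] += packets
    return dict(table)


result = pivot_sum(melt(ROWS))
for ip, cols in result.items():
    print(ip, cols)
```

Each input row becomes two long records tagged with the column it came from, and the pivot turns those tags back into the two output columns.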
Alper t. Turker

One way to go about it without any custom aggregation would be to use flatMap (or explode for dataframes) like this:

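import org.apache.spark.sql.functions.sum
import spark.implicits._  // needed for toDS and $"..."; assumes a SparkSession named `spark`

case class Info(ip : String, sent : Int, received : Int)
case class Message(from : String, to : String, p : Int)
val ds = Seq(Message("ip1", "ip2", 5), 
             Message("ip2", "ip3", 7), 
             Message("ip2", "ip1", 1), 
             Message("ip3", "ip2", 3)).toDS()

ds
    // one record for the sender and one for the receiver of each message
    .flatMap(x => Seq(Info(x.from, x.p, 0), Info(x.to, 0, x.p)))
    .groupBy("ip")
    .agg(sum($"sent") as "sent", sum($"received") as "received")
    .show
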
// +---+----+--------+
// | ip|sent|received|
// +---+----+--------+
// |ip2|   8|       8|
// |ip3|   3|       7|
// |ip1|   5|       1|
// +---+----+--------+

As far as the performance is concerned, I am not sure a flatMap is an improvement versus a custom aggregation though.

Oli
  • Thank you very much! This seems like a perfect solution. I have to accept user8371915's solution, not because yours is worse but because flatMap is not available in pyspark (sorry, I should have mentioned it earlier). – antonpuz Jul 10 '18 at 11:04
  • No problem, I am glad you found an answer. In case you are interested, I added another answer with the same logic using only pyspark dataframes. – Oli Jul 10 '18 at 12:39

Here is a pyspark version using explode. It is more verbose but the logic is exactly the same as the flatMap version, only with pure dataframe code.

from pyspark.sql import functions as F

# Assumes an active SparkContext `sc`, as in the pyspark shell.
df = sc\
  .parallelize([("ip1", "ip2", 5), ("ip2", "ip3", 7), ("ip2", "ip1", 1), ("ip3", "ip2", 3)])\
  .toDF(("from", "to", "p"))

# Two structs per row: the sender gets p as "sent", the receiver gets p as "received".
df.select(F.explode(F.array(
      F.struct(F.col("from").alias("ip"),
               F.col("p").alias("sent"),
               F.lit(0).cast("long").alias("received")),
      F.struct(F.col("to").alias("ip"),
               F.lit(0).cast("long").alias("sent"),
               F.col("p").alias("received")))))\
  .groupBy("col.ip")\
  .agg(F.sum(F.col("col.sent")).alias("sent"), F.sum(F.col("col.received")).alias("received"))\
  .show()

# +---+----+--------+
# | ip|sent|received|
# +---+----+--------+
# |ip2|   8|       8|
# |ip3|   3|       7|
# |ip1|   5|       1|
# +---+----+--------+
Oli
  • first, thx for the investment of time, isn't the logic the same as the melt function presented by user8371915? – antonpuz Jul 10 '18 at 12:45
  • Well the approach is very similar because `melt` explodes the dataframe as well. Yet, this approach does not need the additional pivot and would probably be slightly more efficient. – Oli Jul 10 '18 at 13:23
  • Could you elaborate on why it would be more efficient? I guess it's because you manually defined the fields to be aggregated rather than using a general `pivot`. Could you confirm? – antonpuz Jul 10 '18 at 15:19
  • Melt generates records of type (ip, string, int) whereas this one generates records of type (ip, int, int) which would take less space. This could be fixed by replacing the string by an index though. Also, the melt/pivot/fill is slightly more complex (it generates 3 spark stages vs 2 for this one, 2 shuffle phases vs one). On some sample data I generated, the melt/pivot approach takes approx 50% more time. – Oli Jul 11 '18 at 08:14
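
The difference in record shape described in the last comment can be illustrated with a small plain-Python sketch (no Spark; the function names and sample rows are made up for this illustration, and it says nothing about actual Spark timings):

```python
from collections import Counter

# Hypothetical sample rows: (from, to, packets)
ROWS = [("ip1", "ip2", 5), ("ip2", "ip3", 7), ("ip2", "ip1", 1), ("ip3", "ip2", 3)]


def long_records(rows):
    """melt-style output: (ip, type, packets); still needs a pivot on the string tag."""
    return [rec for s, d, p in rows
            for rec in ((s, "sourceip", p), (d, "destinationip", p))]


def wide_records(rows):
    """explode-style output: (ip, sent, received); the column position is the tag."""
    return [rec for s, d, p in rows for rec in ((s, p, 0), (d, 0, p))]


def sum_wide(records):
    """A single group-by-ip pass over the wide records, with no pivot step."""
    sent, received = Counter(), Counter()
    for ip, s, r in records:
        sent[ip] += s
        received[ip] += r
    return {ip: (sent[ip], received[ip]) for ip in sent}


print(long_records(ROWS)[:2])
print(wide_records(ROWS)[:2])
print(sum_wide(wide_records(ROWS)))
```

Both variants emit two records per input row. The wide variant carries two integers instead of a string tag plus one integer, and its totals come out of a single grouping pass.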

Since you didn't mention the context or the exact aggregations, you could do something like the following:

val df = ??? // your dataframe/ dataset

From Spark source:

(Scala-specific) Compute aggregates by specifying a map from column name to aggregate methods. The resulting DataFrame will also contain the grouping columns. The available aggregate methods are avg, max, min, sum, count.

// Selects the age of the oldest employee and the aggregate expense for each department

df
  .groupBy("department")
  .agg(Map(
    "age" -> "max",
    "expense" -> "sum"
  ))
Chitral Verma