
I have the following data, where I need to group by key and count per key to monitor metrics. I can use groupBy and count for each group, but that involves a shuffle. Can we do this without the shuffle?

ID, TempID, PermanantID
-----------------------
xxx, abcd, 12345
xxx, efg, 1345
xxx, ijk, 1534
xxx, lmn, 13455
xxx, null, 12345
xxx, axg, null
yyy, abcd, 12345
yyy, efg, 1345
yyy, ijk, 1534
zzz, lmn, 13455
zzz, abc, null

The output should be:

ID   Count1  Count2
-------------------
XXX  5       5
YYY  3       3
ZZZ  2       1

I can do this with groupBy and count:

dataframe.groupBy("ID").agg(count(col("TempID")).as("Count1"), count(col("PermanantID")).as("Count2"))

Can we do this using mapPartitions?


2 Answers


The question, whilst understandable, is flawed.

mapPartitions cannot be used directly on a DataFrame, only on an RDD or a Dataset.

Moreover, what about the partitioning and shuffling required before invoking mapPartitions? Without it the results will be incorrect, and the question gives no guarantee about how the data is initially partitioned.

Hence, I would rely on the groupBy approach as postulated. It's an illusion to think that no shuffling is required in an application; the goal is to reduce shuffling, not to eliminate it entirely.
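
For reference, a minimal sketch of that groupBy approach (assuming a Scala DataFrame named dataframe with the three columns from the question; count ignores nulls, which is what produces Count1 and Count2 in the expected output):

import org.apache.spark.sql.functions.{col, count}

// count() skips nulls, so these two aggregates match Count1 / Count2 above
val counts = dataframe
  .groupBy(col("ID"))
  .agg(
    count(col("TempID")).as("Count1"),
    count(col("PermanantID")).as("Count2"))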

thebluephantom
  • I want to get these stats for each partition and send them to an external system, where I have the option to aggregate. Yes, the goal is to reduce the shuffle, but from what I've read about mapPartitions, the shuffle is avoided because you compute or aggregate on each executor, which holds one or more partitions. It's Spark Structured Streaming, so the data comes from Kafka on the fly in micro-batches, and I can see a dataframe.mapPartitions function. Why can't we do it on DataFrames? – user2837600 Oct 01 '19 at 08:49
  • These are extra questions. Post new questions. Spark does not cater for df with mapP. Btw, I know how it works. – thebluephantom Oct 01 '19 at 08:55
  • OK, my question remains the same. If I convert it to a Dataset, I will be able to do mapPartitions, but is it possible to do grouping and aggregation on the iterator provided by mapPartitions? – user2837600 Oct 01 '19 at 09:17
  • Yes, but you have to partition correctly first. All the xxx need to be in the same partition. – thebluephantom Oct 01 '19 at 09:41
  • So you cannot avoid shuffle. – thebluephantom Oct 01 '19 at 09:42
  • But you will make life difficult, I do not advise it. – thebluephantom Oct 01 '19 at 09:46
  • As said, I am not considering data for xxx IDs that sits in other partitions. Each partition computes for its own data and sends it to an external system which can aggregate. I'm looking for a code snippet, whether or not all the xxx rows are in the same partition; I don't care about the other partitions for the time being. – user2837600 Oct 01 '19 at 09:51
  • Interesting approach. I work as an architect; I would not approve. You are interested in a combine/reduce intermediate result... – thebluephantom Oct 01 '19 at 10:23
  • https://stackoverflow.com/questions/46690575/spark-aggregate-on-multiple-columns-within-partition-without-shuffle – thebluephantom Oct 01 '19 at 10:33
  • https://stackoverflow.com/questions/50291201/doing-reducebykey-on-each-partition-of-rdd-separately-without-aggregating-result – thebluephantom Oct 01 '19 at 10:44
  • https://stackoverflow.com/questions/31082066/is-there-a-way-to-rewrite-spark-rdd-distinct-to-use-mappartitions-instead-of-dis – thebluephantom Oct 01 '19 at 10:47
  • Gives you an idea – thebluephantom Oct 01 '19 at 10:47
  • I used https://stackoverflow.com/questions/46690575/spark-aggregate-on-multiple-columns-within-partition-without-shuffle but it still does not group fully; the iterator aggregates only if the values are next to each other. I want to use groupBy inside that iterator. Thanks for your link; it does not fully solve the problem, but it solves part of it. – user2837600 Oct 04 '19 at 09:06
  • But it is a pretty tall order when you think about it. – thebluephantom Oct 04 '19 at 09:17

Old question, but I feel it was left somewhat unanswered. Answering the implicit question in the comments: it looks like the OP wants to aggregate per partition first, then by group (to avoid the shuffle at all costs), so the output will (purposefully) not look like the example output given in the question.

Good idea or not aside, this seems like it would achieve the aggregation without a shuffle:

import org.apache.spark.sql.functions.{col, count, spark_partition_id}

dataframe
  .withColumn("partition_id", spark_partition_id())
  .groupBy(col("partition_id"), col("ID"))
  .agg(
    count(col("TempID")).as("Count1"),
    count(col("PermanantID")).as("Count2"))
  .drop(col("partition_id"))
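
If the goal from the comments is really to aggregate inside mapPartitions (partial counts per partition, no shuffle at all), a rough sketch could look like the one below. It assumes the same three columns; the helper name perPartitionCounts is made up for illustration, and each output row is only a partial count for one partition, so the external system mentioned in the comments would still have to sum the partials per ID.

import org.apache.spark.sql.{DataFrame, Dataset, Row}

// Partial (ID, Count1, Count2) per partition; no shuffle, but the counts only
// cover the rows that happen to sit in that partition.
def perPartitionCounts(df: DataFrame): Dataset[(String, Long, Long)] = {
  import df.sparkSession.implicits._
  df.mapPartitions { rows: Iterator[Row] =>
    rows
      .map(r => (r.getAs[String]("ID"),
                 if (r.isNullAt(r.fieldIndex("TempID"))) 0L else 1L,
                 if (r.isNullAt(r.fieldIndex("PermanantID"))) 0L else 1L))
      .toSeq
      .groupBy(_._1) // plain Scala groupBy on the in-memory partition contents
      .map { case (id, xs) => (id, xs.map(_._2).sum, xs.map(_._3).sum) }
      .iterator
  }
}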
Akhil Nair