I have non-unique key-value pairs that I created by mapping over an RDD[Array[String]]:

val kvPairs = myRdd.map(line => (line(0), line(1)))

This produces data of format:

1, A
1, A
1, B
2, C

I would like to group the values by key and count how often each value occurs under that key, like so:

1, {(A, 2), (B, 1)}
2, {(C, 1)}

I have tried several approaches, but the closest I can get is something like this:

kvPairs.sortByKey().countByValue()

This gives

1, (A, 2)
1, (B, 1)
2, (C, 1)

Also,

kvPairs.groupByKey().sortByKey()

groups the values under each key, but it still isn't quite there because the values are not counted:

1, {(A, A, B)}
2, {(C)}

I tried combining the two together:

kvPairs.countByValue().groupByKey().sortByKey()

But this returns an error:

error: value groupByKey is not a member of scala.collection.Map[(String, String),Long]

Brian

1 Answer

Just count pairs directly and group (if you have to) afterwards:

kvPairs.map((_, 1L))
  .reduceByKey(_ + _)
  .map{ case ((k, v), cnt) => (k, (v, cnt)) }
  .groupByKey
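
To make the intermediate shapes easier to follow, here is a rough local analogue of the same steps using plain Scala collections instead of an RDD. It is only a sketch: `GroupCountsSketch`, `pairCounts` and `grouped` are names made up for illustration, the input is the sample data from the question, and `groupBy` stands in for `reduceByKey` and `groupByKey`.

```scala
object GroupCountsSketch {
  // Sample (key, value) pairs from the question.
  val kvPairs: Seq[(String, String)] =
    Seq(("1", "A"), ("1", "A"), ("1", "B"), ("2", "C"))

  // Count each distinct (key, value) pair.
  // Local stand-in for map((_, 1L)).reduceByKey(_ + _).
  val pairCounts: Map[(String, String), Long] =
    kvPairs
      .map(pair => (pair, 1L))
      .groupBy(_._1)
      .map { case (pair, ones) => (pair, ones.map(_._2).sum) }

  // Re-key by the original key and collect the (value, count) pairs.
  // Local stand-in for the final map { ... }.groupByKey.
  val grouped: Map[String, Set[(String, Long)]] =
    pairCounts.toSeq
      .map { case ((k, v), cnt) => (k, (v, cnt)) }
      .groupBy(_._1)
      .map { case (k, rows) => (k, rows.map(_._2).toSet) }

  def main(args: Array[String]): Unit =
    grouped.toSeq.sortBy(_._1).foreach { case (k, vs) => println(s"$k -> $vs") }
}
```

The point of counting first is that the grouping step only has to move one record per distinct (key, value) pair rather than every original row.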

If you want to groupByKey after reducing, you may want to use a custom partitioner that considers only the first element of the key. See "RDD split and do aggregation on new RDDs" for an example implementation.
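
As a rough idea of what such a partitioner could look like (this is a sketch, not the implementation from the linked question): `FirstElementPartitioner`, `PartitionedCounts` and `countsByKey` are hypothetical names, and the code assumes the composite key is a `(key, value)` tuple as above. Because every pair sharing the same first element lands in the same partition after `reduceByKey`, the grouping can then be done locally inside each partition without a second shuffle.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Hypothetical helper: partitions composite (k, v) keys by k alone.
class FirstElementPartitioner(partitions: Int) extends Partitioner {
  private val byFirst = new HashPartitioner(partitions)

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case (first, _) => byFirst.getPartition(first) // ignore the second element
    case other      => byFirst.getPartition(other) // fall back to plain hashing
  }

  // Spark compares partitioners to decide whether data is already co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case p: FirstElementPartitioner => p.numPartitions == numPartitions
    case _                          => false
  }

  override def hashCode: Int = numPartitions
}

object PartitionedCounts {
  // Assumption: kvPairs has the same shape as in the question.
  def countsByKey(
      kvPairs: RDD[(String, String)],
      partitions: Int
  ): RDD[(String, Vector[(String, Long)])] =
    kvPairs
      .map(pair => (pair, 1L))
      // Count per (k, v), but place records by k only.
      .reduceByKey(new FirstElementPartitioner(partitions), _ + _)
      // All counts for a given k are now in one partition, so group locally.
      .mapPartitions { iter =>
        iter.toVector
          .groupBy { case ((k, _), _) => k }
          .iterator
          .map { case (k, rows) => (k, rows.map { case ((_, v), cnt) => (v, cnt) }) }
      }
}
```

Note that the local grouping materializes each partition in memory, so this only makes sense when the number of distinct (key, value) pairs per partition is manageable.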

zero323