I want to group my dataset by the first part of each string, that is, by "SC Freiburg", "Arsenal" and so on. In addition to the grouping, I need the count of rows in each group.

scala> res61.foreach(println)
SC Freiburg,2014,Germany,7747
Arsenal,2014,Germany,7745
Arsenal,2014,Germany,7750
Arsenal,2014,Germany,7758
Bayern Munich,2014,Germany,7737
Bayern Munich,2014,Germany,7744
Bayern Munich,2014,Germany,7746
Bayern Munich,2014,Germany,7749
Bayern Munich,2014,Germany,7752
Bayern Munich,2014,Germany,7754
Bayern Munich,2014,Germany,7755
Borussia Dortmund,2014,Germany,7739
Borussia Dortmund,2014,Germany,7740
Borussia Dortmund,2014,Germany,7742
Borussia Dortmund,2014,Germany,7743
Borussia Dortmund,2014,Germany,7756
Borussia Mönchengladbach,2014,Germany,7757
Schalke 04,2014,Germany,7741
Schalke 04,2014,Germany,7753
Chelsea,2014,Germany,7751
Hannover 96,2014,Germany,7738
Real Madrid,2014,Germany,7748
Lazio,2014,Germany,7759

HINT: I have to use RDD operations, so please do not suggest DataFrames. I have seen this post: "spark dataset group by and sum", but I do not know how to reproduce it in my example.

This is the result output from my PostgreSQL database:

[image: result]

Lazar Gugleta

3 Answers

RDD has groupBy() and groupByKey() methods for this. For example, to get the count per group you can do:

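val str = """SC Freiburg,2014,Germany,7747
   Arsenal,2014,Germany,7745
   ...
"""
// Trim each line so the indentation inside the string literal does not become part of the key,
// and drop empty lines
val rdd = sc.parallelize(str.split("\n").map(_.trim).filter(_.nonEmpty))
// Key each row by its first field, group, and replace every group with its size
rdd.map(_.split(",")).keyBy(_(0)).groupByKey().map { case (k, v) => (k, v.size) }.collect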
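
As a variation on the snippet above, here is a minimal sketch that counts with reduceByKey instead of groupByKey, so the rows of a group do not have to be gathered just to take their size, and then orders the counts with sortBy. The object name TeamCounts, the helper countByTeam and the local[1] master are assumptions made for illustration; the sample lines are a subset of the data in the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TeamCounts {
  // Hypothetical helper: count lines per team (the first comma-separated field),
  // highest count first.
  def countByTeam(lines: RDD[String]): RDD[(String, Int)] =
    lines
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(line => (line.split(",")(0), 1)) // (team, 1) for every row
      .reduceByKey(_ + _)                   // add up the ones per team
      .sortBy(_._2, ascending = false)      // descending by count

  def main(args: Array[String]): Unit = {
    // local[1] is an assumption so that the sketch runs without a cluster
    val conf = new SparkConf().setMaster("local[1]").setAppName("TeamCounts")
    val sc = new SparkContext(conf)
    try {
      // A subset of the rows shown in the question
      val lines = sc.parallelize(Seq(
        "SC Freiburg,2014,Germany,7747",
        "Arsenal,2014,Germany,7745",
        "Arsenal,2014,Germany,7750",
        "Arsenal,2014,Germany,7758",
        "Schalke 04,2014,Germany,7741",
        "Schalke 04,2014,Germany,7753"
      ))
      countByTeam(lines).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For these six rows the program should print (Arsenal,3), (Schalke 04,2) and (SC Freiburg,1), in that order. Teams with equal counts may come out in any order relative to each other.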
Artem Aliev
  • Okay, that helped a lot. Do you know how I can order by the count? Current result of `res63.foreach(println)`: `(SC Freiburg,1) (Hannover 96,1) (Bayern Munich,7) (Borussia Dortmund,5) (Lazio,1) (Chelsea,1) (Arsenal,3) (Borussia Mönchengladbach,1) (Schalke 04,2) (Real Madrid,1)` – Lazar Gugleta Jun 18 '19 at 17:56
  • sortBy() gives me weird output, I need it to be descending. – Lazar Gugleta Jun 18 '19 at 18:01
  • Make the size the key and use sortByKey(false): `rdd.map(_.split(",")).keyBy(_(0)).groupByKey().map { case (k, v) => (v.size, k) }.sortByKey(false).collect` – Artem Aliev Jun 18 '19 at 18:06
  • I figured it out. I used .sortWith(_._2 > _._2) and it gave me a descending order. – Lazar Gugleta Jun 18 '19 at 18:07
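
The two orderings in these comments run in different places: sortByKey(false) sorts the RDD itself, while sortWith is a Scala collection method and so applies to the Array returned by collect. Below is a minimal sketch of the second, driver-side style, using the RDD action countByValue. The names TeamCountsOnDriver and sortedCounts and the local[1] master are assumptions for illustration, the sample lines are a subset of the question's data, and the approach only makes sense when the number of distinct teams is small enough to fit on the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TeamCountsOnDriver {
  // Hypothetical helper: countByValue() is an RDD action that returns a local Map
  // on the driver, so the sorting afterwards is plain Scala collection code.
  def sortedCounts(lines: RDD[String]): Seq[(String, Long)] =
    lines
      .map(_.split(",")(0))  // keep only the team name
      .countByValue()        // Map(team -> number of rows), held on the driver
      .toSeq
      .sortWith(_._2 > _._2) // descending by count

  def main(args: Array[String]): Unit = {
    // local[1] is an assumption so that the sketch runs without a cluster
    val conf = new SparkConf().setMaster("local[1]").setAppName("TeamCountsOnDriver")
    val sc = new SparkContext(conf)
    try {
      // A subset of the rows shown in the question
      val lines = sc.parallelize(Seq(
        "SC Freiburg,2014,Germany,7747",
        "Arsenal,2014,Germany,7745",
        "Arsenal,2014,Germany,7750",
        "Arsenal,2014,Germany,7758"
      ))
      sortedCounts(lines).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```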

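Assuming "yourrdd" holds the lines you showed earlier as strings, split each line first and then group on the first field. (Calling groupBy(_(0)) directly on an RDD[String] would group by the first character of each line.) Something like the following gives the result:

yourrdd.map(_.split(",")).groupBy(_(0)).map(x => (x._1, x._2.size)).sortBy(x => x._2, ascending = false).collect.foreach(println)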
Amit

In the code below, mycsv.csv is your CSV data saved as a file.

groupByKey(_._1.toLowerCase)

is what you need.


Note: with large data the RDD approach can become a performance bottleneck, because RDDs use Java serialization by default, whereas DataFrames and Datasets use Tungsten as their internal memory format. So prefer the Dataset and DataFrame approaches where you can.

package com.examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}


object DataSetGroupTest {
// Only log errors from the "org" loggers so that the results are easier to read
org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]): Unit = {

 val spark = SparkSession.builder
   .master("local")
   .appName("DataSetGroupTest")
   .getOrCreate()

 import spark.implicits._
 // if you have a file
 val csvData: Dataset[String] = spark.read.text("mycsv.csv").as[String]

 csvData.show(false)
 //csvData.foreach(println(_))
 val words: Dataset[Array[String]] = csvData.map(value => value.split(","))
 println("convert to array")
 val finalwords: Dataset[(String, String, String, String)] = words.map { case Array(f1, f2, f3, f4) => (f1, f2, f3, f4) }
 finalwords.foreach(println(_))
 val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] = finalwords.groupByKey(_._1.toLowerCase)
 val counts: Dataset[(String, Long)] = groupedWords.count().sort($"count(1)".desc)
 counts.show(false)
}
}

Result:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------------------------------------------+
|value                                     |
+------------------------------------------+
|Freiburg,2014,Germany,7747                |
|Arsenal,2014,Germany,7745                 |
|Arsenal,2014,Germany,7750                 |
|Arsenal,2014,Germany,7758                 |
|Bayern Munich,2014,Germany,7737           |
|Bayern Munich,2014,Germany,7744           |
|Bayern Munich,2014,Germany,7746           |
|Bayern Munich,2014,Germany,7749           |
|Bayern Munich,2014,Germany,7752           |
|Bayern Munich,2014,Germany,7754           |
|Bayern Munich,2014,Germany,7755           |
|Borussia Dortmund,2014,Germany,7739       |
|Borussia Dortmund,2014,Germany,7740       |
|Borussia Dortmund,2014,Germany,7742       |
|Borussia Dortmund,2014,Germany,7743       |
|Borussia Dortmund,2014,Germany,7756       |
|Borussia Mönchengladbach,2014,Germany,7757|
|Schalke 04,2014,Germany,7741              |
|Schalke 04,2014,Germany,7753              |
|Chelsea,2014,Germany,7751                 |
+------------------------------------------+
only showing top 20 rows

convert to array
(Freiburg,2014,Germany,7747)
(Arsenal,2014,Germany,7745)
(Arsenal,2014,Germany,7750)
(Arsenal,2014,Germany,7758)
(Bayern Munich,2014,Germany,7737)
(Bayern Munich,2014,Germany,7744)
(Bayern Munich,2014,Germany,7746)
(Bayern Munich,2014,Germany,7749)
(Bayern Munich,2014,Germany,7752)
(Bayern Munich,2014,Germany,7754)
(Bayern Munich,2014,Germany,7755)
(Borussia Dortmund,2014,Germany,7739)
(Borussia Dortmund,2014,Germany,7740)
(Borussia Dortmund,2014,Germany,7742)
(Borussia Dortmund,2014,Germany,7743)
(Borussia Dortmund,2014,Germany,7756)
(Borussia Mönchengladbach,2014,Germany,7757)
(Schalke 04,2014,Germany,7741)
(Schalke 04,2014,Germany,7753)
(Chelsea,2014,Germany,7751)
(Hannover 96,2014,Germany,7738)
(Real Madrid,2014,Germany,7748)
(Lazio,2014,Germany,7759)
+------------------------+--------+
|value                   |count(1)|
+------------------------+--------+
|bayern munich           |7       |
|borussia dortmund       |5       |
|arsenal                 |3       |
|schalke 04              |2       |
|lazio                   |1       |
|hannover 96             |1       |
|chelsea                 |1       |
|real madrid             |1       |
|freiburg                |1       |
|borussia mönchengladbach|1       |
+------------------------+--------+

Ram Ghadiyaram