Here `mycsv.csv` is your CSV file. Grouping with
`groupByKey(_._1.toLowerCase)`
is what you need.
Note:
With large data, the RDD approach is a performance bottleneck because RDDs use Java serialization by default, whereas DataFrames and Datasets use Tungsten's internal binary memory format. So prefer the Dataset and DataFrame APIs.
package com.examples
import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
/**
 * Reads the comma-separated text file `mycsv.csv`, groups its rows by the lower-cased
 * first field, and prints how many rows fall into each group, largest group first.
 *
 * Uses the typed Dataset API (`groupByKey`) rather than RDDs.
 */
object DataSetGroupTest {
  // Silence Spark's "org.*" loggers below ERROR so only this program's output is visible.
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  /**
   * Entry point.
   *
   * @param args ignored; the input path is fixed to `mycsv.csv`.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("DataSetGroupTest")
      .getOrCreate()
    try {
      import spark.implicits._

      // Each line of the file becomes one String row.
      val csvData: Dataset[String] = spark.read.text("mycsv.csv").as[String]
      csvData.show(false)

      // limit = -1 keeps trailing empty fields, so "a,b,c," still yields four fields.
      val words: Dataset[Array[String]] = csvData.map(value => value.split(",", -1))
      println("convert to array")

      // Keep only rows with exactly four fields. Blank or malformed lines would otherwise
      // raise a MatchError and abort the whole job.
      val finalwords: Dataset[(String, String, String, String)] = words.flatMap {
        case Array(f1, f2, f3, f4) => Seq((f1, f2, f3, f4))
        case _                     => Seq.empty
      }
      finalwords.foreach(println(_))

      // Locale.ROOT makes the case folding independent of the JVM's default locale
      // (e.g. the Turkish dotted/dotless 'i').
      val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] =
        finalwords.groupByKey(_._1.toLowerCase(java.util.Locale.ROOT))

      val grouped: Dataset[(String, Long)] = groupedWords.count()
      // The count column's name (shown as "count(1)") is derived from the aggregate
      // expression rather than fixed by the API, so sort by position instead of by name.
      val counts: Dataset[(String, Long)] = grouped.sort(grouped(grouped.columns(1)).desc)
      counts.show(false)
    } finally {
      // Release the local Spark context even if reading or processing fails.
      spark.stop()
    }
  }
}
Result:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------------------------------------------+
|value |
+------------------------------------------+
|Freiburg,2014,Germany,7747 |
|Arsenal,2014,Germany,7745 |
|Arsenal,2014,Germany,7750 |
|Arsenal,2014,Germany,7758 |
|Bayern Munich,2014,Germany,7737 |
|Bayern Munich,2014,Germany,7744 |
|Bayern Munich,2014,Germany,7746 |
|Bayern Munich,2014,Germany,7749 |
|Bayern Munich,2014,Germany,7752 |
|Bayern Munich,2014,Germany,7754 |
|Bayern Munich,2014,Germany,7755 |
|Borussia Dortmund,2014,Germany,7739 |
|Borussia Dortmund,2014,Germany,7740 |
|Borussia Dortmund,2014,Germany,7742 |
|Borussia Dortmund,2014,Germany,7743 |
|Borussia Dortmund,2014,Germany,7756 |
|Borussia Mönchengladbach,2014,Germany,7757|
|Schalke 04,2014,Germany,7741 |
|Schalke 04,2014,Germany,7753 |
|Chelsea,2014,Germany,7751 |
+------------------------------------------+
only showing top 20 rows
convert to array
(Freiburg,2014,Germany,7747)
(Arsenal,2014,Germany,7745)
(Arsenal,2014,Germany,7750)
(Arsenal,2014,Germany,7758)
(Bayern Munich,2014,Germany,7737)
(Bayern Munich,2014,Germany,7744)
(Bayern Munich,2014,Germany,7746)
(Bayern Munich,2014,Germany,7749)
(Bayern Munich,2014,Germany,7752)
(Bayern Munich,2014,Germany,7754)
(Bayern Munich,2014,Germany,7755)
(Borussia Dortmund,2014,Germany,7739)
(Borussia Dortmund,2014,Germany,7740)
(Borussia Dortmund,2014,Germany,7742)
(Borussia Dortmund,2014,Germany,7743)
(Borussia Dortmund,2014,Germany,7756)
(Borussia Mönchengladbach,2014,Germany,7757)
(Schalke 04,2014,Germany,7741)
(Schalke 04,2014,Germany,7753)
(Chelsea,2014,Germany,7751)
(Hannover 96,2014,Germany,7738)
(Real Madrid,2014,Germany,7748)
(Lazio,2014,Germany,7759)
+------------------------+--------+
|value |count(1)|
+------------------------+--------+
|bayern munich |7 |
|borussia dortmund |5 |
|arsenal |3 |
|schalke 04 |2 |
|lazio |1 |
|hannover 96 |1 |
|chelsea |1 |
|real madrid |1 |
|freiburg |1 |
|borussia mönchengladbach|1 |
+------------------------+--------+