0

Using spark, I am parsing the csv file where each line represents a call made by the application user. After parsing, I get JavaRDD object, which usually contains multiple entries by a single user.

Now what I am trying to achieve is that sum up the total talking time of each individual user.I followed the word counting example given elsewhere and it is working in my case as well, however, I am not sure if this is the right way, because I have to map each parsed object to a separate key.

The code I have written is pasted below, however, I am not sure if this is the right way.

JavaRDD < Subscriber > cdrs = textFile.flatMap(new FlatMapFunction < String, Subscriber > () {
 public Iterable < Subscriber > call(String line) {
  List < Subscriber > list = new ArrayList < Subscriber > ();
  String[] fields = line.split(",");

  if (fields != null && fields[0].trim().matches("[0-9]+")) {
   Subscriber subscriber = new Subscriber();
   subscriber.setMsisdn(fields[0].trim());
   subscriber.setDuration(Double.parseDouble(fields[5].replaceAll("s", "")));

   list.add(subscriber);
  }

  return list;
 }
});

JavaPairRDD < String, Subscriber > counts = words.mapToPair(new PairFunction < Subscriber, String, Subscriber > () {
 public Tuple2 < String, Subscriber > call(Subscriber s) {
  return new Tuple2 < String, Subscriber > (s.getMsisdn(), s);
 }
}).reduceByKey(new Function2 < Subscriber, Subscriber, Subscriber > () {
 @Override
 public Subscriber call(Subscriber v1, Subscriber v2) throws Exception {
  v1.setDuration(v1.getDuration() + v2.getDuration());
  return v1;
 }
});
Waqas
  • 6,812
  • 2
  • 33
  • 50

1 Answers1

0

I'd write the following pseudo code (python-spark using spark 2.0) using spark dataframe:

df = spark.read.format("csv").option("header", "true").load("csv_file.csv")
new_df = df.groupBy("username").agg(sum("talk_time").alias("total_talk_time");

The first line - load CSV into dataframe (see my answer here https://stackoverflow.com/a/37640154/5088142 for more info)

The second line - group by the username column, and perform sum() function on the aggregate data of column "talk_time"

info on groupby / agg can be found here: http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframegroupby-retains-grouping-columns

the new dataframe should hold a "username" column, and "total_talk_time" column - which will hold the data you are looking for.

You'll have to modify slightly it in order to execute it as Java-spark...

Community
  • 1
  • 1
Yaron
  • 10,166
  • 9
  • 45
  • 65