
I'm doing a groupBy to calculate a value, but it seems that when I group, I lose all the fields that are not in the aggregation keys:

val aggr = filtered
  .filterNot('site) { s: String => ... }
  .filterNot('date) { s: String => ... }
  .groupBy('id, 'contentHost) { group =>
    group.min('timestamp -> 'min)
    // how do I keep the original fields? (e.g. site, date)
  }

aggr.write(Tsv(...)) // e.g. field "site" won't be here

In Pig, it would be something like this:

aggr = group filtered by concat('id, 'contentHost);

result = foreach aggr {
  generate flatten(filtered), //how to do this in scalding?
           min(filtered.timestamp) as min;
}
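For reference, the Pig semantics above (every original row kept, with the group's minimum attached) can be sketched with plain Scala collections. `Row`, `withGroupMin`, and the field values are hypothetical stand-ins for the fields named in the snippets:

```scala
// Hypothetical record type mirroring the fields used in the question.
case class Row(id: String, contentHost: String, site: String, date: String, timestamp: Long)

// Pig's `generate flatten(filtered), min(filtered.timestamp)` keeps every
// original row and appends that row's group-wide minimum timestamp.
def withGroupMin(rows: Seq[Row]): Seq[(Row, Long)] =
  rows
    .groupBy(r => (r.id, r.contentHost)) // key -> all rows in that group
    .valuesIterator
    .flatMap { group =>
      val min = group.map(_.timestamp).min // minimum over the whole group
      group.map(r => (r, min))             // each row keeps all its fields
    }
    .toSeq
```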
Miguel Ping

1 Answer


I had the same problem with the tuple API and could only solve it by using the typed API.

You can either use Scala tuples or define your own case class outside your job. E.g.:

case class Data(id: String, site: String, date: String, contentHost: String, timestamp: Long)

Then you'd process it like this:

import com.twitter.scalding._

val filtered: TypedPipe[Data] =
  TypedPipe.from(Seq(Data("...", "...", "2014-04-14", "...", 0L))) // example values

filtered
  .filterNot ( data => data.site == "fr" )
  .filterNot ( data => data.date == "2014-02-01" )
  .groupBy (data => (data.id, data.contentHost)) // (String, String) -> Data
  .minBy (_.timestamp) // keeps the whole Data record with the smallest timestamp
  .toTypedPipe
  .write(TypedTsv[((String, String), Data)]("/path/"))
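To see the keep-all-fields behaviour without running a job, the same grouping can be sketched with plain Scala collections. `Event`, `minPerGroup`, and the timestamp field are hypothetical stand-ins for the question's data:

```scala
// Hypothetical record carrying all the fields from the question.
case class Event(id: String, contentHost: String, site: String, date: String, timestamp: Long)

// One record per (id, contentHost) group: the record with the smallest
// timestamp, with every other field intact -- the same per-group shape
// that minBy produces in a typed pipeline.
def minPerGroup(events: Seq[Event]): Map[(String, String), Event] =
  events
    .groupBy(e => (e.id, e.contentHost))
    .map { case (key, group) => key -> group.minBy(_.timestamp) }
```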
Marius Soutier