
I have an RDD of type RDD[String]; as an example, here is a part of it:

1990,1990-07-08
1994,1994-06-18
1994,1994-06-18
1994,1994-06-22
1994,1994-06-22
1994,1994-06-26
1994,1994-06-26
1954,1954-06-20
2002,2002-06-26
1954,1954-06-23
2002,2002-06-29
1954,1954-06-16
2002,2002-06-30
...

result: (1982,52) (2006,64) (1962,32) (1966,32) (1986,52) (2002,64) (1994,52) (1974,38) (1990,52) (2010,64) (1978,38) (1954,26) (2014,64) (1958,35) (1998,64) (1970,32)

I group it nicely, but my problem is the v.size part: I do not know how to calculate that length.

Just to put it in perspective, here are the expected results:

It is not a mistake that 2002 appears twice; ignore that.
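The question does not show the grouping code itself, so here is a plain-Scala sketch of the assumed pipeline (using the collections `groupBy` in place of the RDD's `keyBy`/`groupByKey`; the sample rows are taken from the data above). It shows where `v.size` comes from and why it counts rows rather than days:

```scala
// Plain-Scala mirror of the assumed RDD pipeline (hypothetical: the
// question's actual code is not shown). On an RDD this would be
// keyBy(_(0)).groupByKey() instead of groupBy.
val lines = Seq("1994,1994-06-18", "1994,1994-06-22", "1954,1954-06-20")

val grouped = lines
  .map(_.split(","))                   // each row becomes Array(year, date)
  .groupBy(_(0))                       // key by the year column
  .map { case (k, v) => (k, v.size) }  // v.size counts rows, not a day span
```

Here `grouped` maps "1994" to 2 and "1954" to 1, i.e. the number of matching rows per year, which is not the length in days the question is after.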
Lazar Gugleta

1 Answer


Define a date format:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

and an ordering for LocalDate:

implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

Create a function that receives v and returns MAX(date_of_matching_year) - MIN(date_of_matching_year) = length in days:

def f(v: Iterable[Array[String]]): Int = {
    val parsedDates = v.map(a => LocalDate.parse(a(1), formatter)) // parse the date column
    parsedDates.max.getDayOfYear - parsedDates.min.getDayOfYear    // span in days
}

Then replace v.size with f(v).
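Assembling the pieces above into one runnable sketch (the sample rows are taken from the question's data; `f` is the answer's function, with an explicit lambda so the underscore placeholder does not mis-expand):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

def f(v: Iterable[Array[String]]): Int = {
  val parsedDates = v.map(a => LocalDate.parse(a(1), formatter)) // parse date column
  parsedDates.max.getDayOfYear - parsedDates.min.getDayOfYear   // span in days
}

// Rows for one key after split/group: Array(year, date)
val rows = Seq(
  Array("1994", "1994-06-18"),
  Array("1994", "1994-06-22"),
  Array("1994", "1994-06-26")
)
val span = f(rows) // 8 days between 1994-06-18 and 1994-06-26
```

On the RDD side this plugs in as `.map { case (k, v) => (k, f(v)) }` in place of the `v.size` version.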

Liza Shakury
  • What is the type of v? v: ..? I tried Int, but map is not defined for Int – Lazar Gugleta Jun 19 '19 at 18:32
  • I tried `val evega = concat.map(_.split(",")).keyBy(_(0)).groupByKey().map{case (k, v) => (k, f(v))}` and got `error: type mismatch; found: Iterable[Array[String]] required: Array[String]` – Lazar Gugleta Jun 19 '19 at 18:53
  • And the function is: `def f(v: Array[String]): Int = { val parsedDates = v.map(LocalDate.parse(_, formatter)); parsedDates.max.getDayOfYear - parsedDates.min.getDayOfYear }` – Lazar Gugleta Jun 19 '19 at 18:53
  • I have edited my answer. v is a collection of all the rows that match the same key. Since every row is an array (because of the .split(",")), it is an Iterable[Array[String]]. What I changed in my answer is going through all of the rows (with the flatMap), parsing the date in the second column, and returning it. – Liza Shakury Jun 20 '19 at 08:28
  • Why does this happen? `error: missing parameter type for expanded function ((x$1) => x$1(1))` for `val parsedDates = v.flatMap(LocalDate.parse(_(1), formatter))` – Lazar Gugleta Jun 20 '19 at 08:32
  • I can't reproduce that error on my end. Try using map instead of flatMap (I changed it in my answer). Did it help? – Liza Shakury Jun 20 '19 at 08:49
  • I fixed it, but now I get a org.apache.spark.SparkException: Task not serializable error – Lazar Gugleta Jun 20 '19 at 09:16
  • https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou – Liza Shakury Jun 20 '19 at 13:27
  • https://stackoverflow.com/questions/43592742/spark-scala-task-not-serializable-error?rq=1 – Liza Shakury Jun 20 '19 at 13:27
  • https://stackoverflow.com/questions/36132451/spark-and-not-serializable-datetimeformatter I tried all of those and none work. The error remains: Caused by: java.io.NotSerializableException: java.time.format.DateTimeFormatter – Lazar Gugleta Jun 20 '19 at 14:54
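One possible workaround for that last error, not spelled out in the thread: `yyyy-MM-dd` is exactly the ISO-8601 default pattern, so `LocalDate.parse` can be called without a `DateTimeFormatter` at all, and nothing non-serializable is then captured in the closure Spark ships to the executors. A sketch under that assumption:

```scala
import java.time.LocalDate

implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

// No DateTimeFormatter in scope, so the closure stays serializable;
// LocalDate.parse defaults to ISO_LOCAL_DATE, i.e. yyyy-MM-dd.
def f(v: Iterable[Array[String]]): Int = {
  val parsedDates = v.map(a => LocalDate.parse(a(1)))
  parsedDates.max.getDayOfYear - parsedDates.min.getDayOfYear
}
```

Alternatives would be defining the formatter inside `f`, or marking it `@transient lazy val` on a serializable object, as the linked questions discuss.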