
Could someone help me avoid the RDD conversion here?

```scala
val qksDistribution: Array[((String, Int), Long)] = tripDataset
  .map(i => ((i.getFirstPoint.getQk.substring(0, QK_PARTITION_LEVEL), i.getProviderId), 1L))
  .rdd
  .reduceByKey(_ + _)
  .filter(_._2 > maxCountInPartition / 10)
  .collect
```
jk1
  • You can use [`groupByKey`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) and then pass the reduce function. In any case, why do you want to avoid the `rdd` transformation? – Luis Miguel Mejía Suárez May 21 '20 at 17:43
  • @LuisMiguelMejíaSuárez Because I don't really understand how that works; I expected a lot of object allocations during such a transformation – jk1 May 21 '20 at 18:00
  • Can you provide the schema that the map returns? – Alfilercio May 21 '20 at 18:06
  • Yeah it may instantiate a few things, but I do not think it would be much of a problem. Anyways, the `groupByKey` is what you need. – Luis Miguel Mejía Suárez May 21 '20 at 18:06

1 Answer

```scala
val qksDistribution: Array[((String, Int), Long)] = tripDataset
  .map(i => (i.getFirstPoint.getQk.substring(0, QK_PARTITION_LEVEL), i.getProviderId)) // no need to pair with 1L
  .groupByKey(x => x)  // similar to keyBy
  .count               // counts records per key, yielding a Dataset[((String, Int), Long)]
  .filter(_._2 > maxCountInPartition / 10)
  .collect
```
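For intuition, the `groupByKey(x => x).count.filter(...)` pipeline computes the same thing as this plain-Scala sketch on an in-memory collection (no Spark required; the sample keys and the `maxCountInPartition` value are made up for illustration):

```scala
// Plain-Scala model of the Dataset pipeline: group identical
// (qk, providerId) keys, count occurrences, keep keys whose count
// exceeds the threshold.
val keys: Seq[(String, Int)] =
  Seq(("0123", 1), ("0123", 1), ("0123", 1), ("4567", 2))
val maxCountInPartition = 20L // hypothetical value

val qksDistribution: Array[((String, Int), Long)] = keys
  .groupBy(identity)                           // like .groupByKey(x => x)
  .map { case (k, vs) => (k, vs.size.toLong) } // like .count
  .filter(_._2 > maxCountInPartition / 10)     // keep frequent keys
  .toArray

// Only (("0123", 1), 3L) survives: 3 > 20/10, while 1 does not.
```

On a Dataset the same shape stays distributed end to end, so no `rdd` conversion is needed.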
Alfilercio
  • can you help and suggest how to handle this https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename – BdEngineer May 27 '20 at 06:41