
I have some data below:

b   3
c   1
a   1
b   2
b   1
a   2

I want to repartition the data into 3 partitions by the first column and save each partition as a file. The result should look like this (no sorting needed):

//file: part-00000
a   1
a   2

//file: part-00001
b   3
b   2
b   1

//file: part-00002
c   1

I tried calling the repartition function, but it doesn't achieve this.

How to do it? Thank you very much!

Guo
  • Spark supports Hash and Range partitioners. The Range partitioner may suffice to an extent, but if your requirement is not fulfilled by either, you need to write a custom Partitioner. But before thinking about that, can you explain the reasons/benefits for partitioning the way you have defined? – Sumit Jan 18 '16 at 08:39
  • Thank you for your response! I know how to define custom partitioner, but I don't know how to use it when I call _repartition function_ and I don't know where custom partitioner can be used correctly. Could you tell me how to use custom partitioner? My custom partitioner is:`class MyPartitioner(val partitions: Int = 1) extends Partitioner{ def numPartitions = partitions def getPartition(key: Any): Int = { val s = key.toString if(s.equals("a")) 0 else if(s.equals("b")) 1 % partitions else if(s.equals("c")) 2 % partitions else 0 } }` – Guo Jan 18 '16 at 09:16
  • Please see if the answer works for you. – Sumit Jan 18 '16 at 09:46
  • @Guo; You may like this question too: http://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce – Ravindra babu Jan 18 '16 at 10:42

3 Answers


Custom Partitioners can be used only with RDDs of Key/Value type, i.e. via PairRDDFunctions.partitionBy(partitioner: Partitioner). For more information, refer to the PairRDDFunctions API.
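For instance, the MyPartitioner from the comments above can be passed to partitionBy, and the partitioned RDD written out with saveAsTextFile; a sketch (the output path "output" is just an example):

```scala
import org.apache.spark.Partitioner

// Partitioner from the question's comments: "a" -> 0, "b" -> 1, "c" -> 2
class MyPartitioner(val partitions: Int = 3) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key.toString match {
    case "a" => 0
    case "b" => 1 % partitions
    case "c" => 2 % partitions
    case _   => 0
  }
}

val rdd = sc.parallelize(Seq(("b", 3), ("c", 1), ("a", 1), ("b", 2), ("b", 1), ("a", 2)))
rdd.partitionBy(new MyPartitioner(3))  // works because this is a pair RDD
   .map { case (k, v) => s"$k\t$v" }   // back to plain text lines
   .saveAsTextFile("output")           // one part-0000N file per partition
```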

Sumit

You need to call the partitionBy function to partition the data with your custom partitioner. I can recommend reading the "Data Partitioning" section of this online book: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Glennie Helles Sindholt

To complement Sumit's answer: implement your own org.apache.spark.Partitioner. For example:

class AlphabetPartitioner extends Partitioner {

  // One partition per letter of the alphabet
  override def numPartitions: Int = 26

  override def getPartition(key: Any): Int = {
    // Map 'a' -> 0, 'b' -> 1, ..., 'z' -> 25
    (key.asInstanceOf[Char] - 'a') % numPartitions
  }
}

Example code for PairRDDFunctions.partitionBy(partitioner: Partitioner):

val data = Array(('b', 3), ('c', 1), ('a', 1), ('b', 2), ('b', 1), ('a', 2))
val distData = sc.parallelize(data, 1)
  .partitionBy(new AlphabetPartitioner)
  .map(u => u._1 + "," + u._2)
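To actually produce the per-partition files the question asks for, distData can then be written out with saveAsTextFile (the directory name below is only an example); each partition becomes one part-NNNNN file:

```scala
// Writes one part file per partition under the "output" directory
distData.saveAsTextFile("output")
```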
Shawn Guo