
I have an array that is populated inside the closure (it contains values there), but outside the loop its size is 0. What causes this behavior?

I need hArr to be accessible outside the loop so that I can do a batch HBase put.

val hArr = new ArrayBuffer[Put]()

rdd.foreach(row => {
  val hConf = HBaseConfiguration.create()
  val hTable = new HTable(hConf, tablename)
  val hRow = new Put(Bytes.toBytes(row._1.toString))
  hRow.add(...)
  hArr += hRow
  println("hArr: " + hArr.toArray.mkString(","))
})

println("hArr.size: " + hArr.size)
sophie

2 Answers


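The problem is that anything referenced inside an RDD closure is serialized and copied to the executors, so each task appends to its own local copy of hArr and the hArr in the driver is never touched. foreach should only be used for side effects such as saving to disk or writing to an external store.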
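To make the copying concrete, here is a minimal sketch in plain Scala with no Spark involved. It assumes that shipping a task behaves roughly like a Java serialization round trip; `Appender`, `ClosureCopyDemo`, and `roundTrip` are hypothetical names made up for this illustration, and `Appender` is only a stand-in for the function passed to foreach.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the function passed to rdd.foreach: it holds a
// reference to a buffer that was created on the "driver" side.
class Appender(val buf: ArrayBuffer[Int]) extends Serializable {
  def apply(x: Int): Unit = buf += x
}

object ClosureCopyDemo {
  // Serialize and deserialize a value, roughly what happens when a task is shipped.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Returns (size of the original buffer, size of the shipped copy).
  def run(): (Int, Int) = {
    val driverBuf = new ArrayBuffer[Int]()
    // The deserialized Appender carries its own copy of driverBuf.
    val shipped = roundTrip(new Appender(driverBuf))
    Seq(1, 2, 3).foreach(x => shipped(x))
    (driverBuf.size, shipped.buf.size)
  }

  def main(args: Array[String]): Unit = {
    val (driverSize, copySize) = run()
    println(s"driver buffer: $driverSize, shipped copy: $copySize")
  }
}
```

Running it prints `driver buffer: 0, shipped copy: 3`: the appends land in the deserialized copy, which mirrors why hArr.size is 0 back in the driver.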

If you want the results in an array, you can map and then collect:

rdd.map(row => {
  // Build the Put on the executor; no HTable is needed just to construct it
  val hRow = new Put(Bytes.toBytes(row._1.toString))
  hRow.add(...)
  hRow
}).collect() // brings every Put back to the driver as an array
Justin Pihony

I have found that many new Spark users are confused about how mapper and reducer functions are run and how they relate to things defined in the driver program. In general, the functions you define and register through map, foreach, reduceByKey, or their many variants are not executed in your driver program. The driver only registers them; Spark runs them remotely, distributed across the cluster. When those functions reference objects you instantiated in the driver program, you have literally created a closure, which compiles fine most of the time. But that is usually not what you intended, and you will usually run into problems at runtime, typically a NotSerializableException or ClassNotFoundException.

You can either do all the output work remotely with foreach() and its variants, or collect the data back to the driver program by calling collect() and write it out from there. Be careful with collect(), though: it pulls all the data from the distributed nodes into your driver program. Only use it when you are absolutely sure your final aggregated data is small.
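For the "do the output remotely" option, a rough sketch using foreachPartition might look like the following. It assumes the older HBase client API that the question already uses (the HTable(conf, name) constructor and put with a list of Puts) and needs Spark and HBase on the classpath. `putInBatches`, `toPut`, and `batchSize` are hypothetical names introduced here; `toPut` stands for whatever builds a Put from a row and must itself be serializable.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

// Hypothetical helper: writes an RDD to HBase in batches, one table handle per partition.
def putInBatches[T](rdd: RDD[T], tablename: String, batchSize: Int = 1000)(toPut: T => Put): Unit =
  rdd.foreachPartition { rows =>
    // Runs on the executor, so the configuration and table are created there
    // and never need to be serialized.
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, tablename)
    try {
      // Group the partition's rows and send each group as one batch put.
      rows.map(toPut).grouped(batchSize).foreach(batch => hTable.put(batch.asJava))
    } finally {
      hTable.close()
    }
  }
```

Nothing is sent back to the driver here, so this avoids both the empty hArr and the memory risk of collect().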

Wesley Miao