
I have a Vagrant image with Spark Notebook, Spark, Accumulo 1.6, and Hadoop all running. From the notebook, I can manually create a Scanner and pull test data from a table I created using one of the Accumulo examples:

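import java.util.Map.Entry
import scala.collection.JavaConversions._ // lets the for loop iterate over the Java Scanner
import org.apache.accumulo.core.client.{Connector, Instance, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Range, Value} // Range here is Accumulo's, not Scala's
import org.apache.accumulo.core.security.Authorizations

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

// Restrict the scan to a small row range
scanner.setRange(new Range("row_0000000000", "row_0000000010"))

for (entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}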

This gives the first ten rows of table data.

When I try to create the RDD thusly:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(), 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value]
  )

I get an RDD returned to me that I can't do much with due to the following error:

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
    at org.apache.spark.rdd.RDD.count(RDD.scala:927)

This totally makes sense in light of the fact that I haven't specified any parameters as to which table to connect with, what the auths are, etc.

So my question is: What do I need to do from here to get those first ten rows of table data into my RDD?

Update 1: Still doesn't work, but I did discover a few things. It turns out there are two nearly identical packages,

org.apache.accumulo.core.client.mapreduce

and

org.apache.accumulo.core.client.mapred

Both have nearly identical members, except that some of the method signatures differ. I'm not sure why both exist, as there's no deprecation notice that I could see. I attempted to implement Sietse's answer with no joy. Below is what I did, and the responses:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf, 
                                     "root", 
                                     new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf, auths)

AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf, 
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value], 
    1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62

rdd2.first

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
    at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69)
    at ...

Edit 2

Re: Holden's answer, still no joy:

    AbstractInputFormat.setConnectorInfo(jobConf, 
                                         "root", 
                                         new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf, auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf, "batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf, 
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
      classOf[org.apache.accumulo.core.data.Key], 
      classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58

Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58

rddX.first

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
    at ...

edit 3 -- progress!

I was able to figure out why the 'Input info has not been set' error was occurring. The eagle-eyed among you will no doubt see that the following code is missing a closing ')':

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password") 

As I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. What I forgot was that the notebook does what spark-shell does when you leave off a closing ')': it waits forever for you to add it. So the error was the result of the 'setConnectorInfo' method never getting executed.

Unfortunately, I'm still unable to get the Accumulo table data into an RDD that's usable to me. When I execute

rddX.count

I get back

res15: Long = 10000

which is the correct response: there are 10,000 rows of data in the table I pointed to. However, when I try to grab the first element of data thusly:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key

Any thoughts on where to go from here?

edit 4 -- success!

The accepted answer plus its comments are 90% of the way there, except that the Accumulo Key/Value objects need to be converted into something serializable. I got this working by invoking the .toString() method on both. I'll try to post complete working code soon in case anyone else runs into the same issue.
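For anyone landing here before that gets posted, the pieces above can be put together roughly as follows. This is a sketch only, not tested against a live cluster: it reuses the calls from edit 2 (with the paren closed), the instance name, ZooKeeper host, credentials, and table from the top of the question, and the ClientConfiguration setup from Holden's answer. The setRanges call and the AccumuloRange import alias are my additions to limit the scan to the rows of interest, and sparkContext is assumed to be the context the notebook provides.

```scala
import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, InputFormatBase}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Value, Range => AccumuloRange}
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf()

// Call each static configurator on the class that declares it; calling
// them through AccumuloInputFormat gave a 'not a member of' error here.
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password"))
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations("exampleVis"))
AbstractInputFormat.setZooKeeperInstance(
  jobConf,
  new ClientConfiguration().withInstance("accumulo").withZkHosts("localhost:2181"))
InputFormatBase.setInputTableName(jobConf, "batchtest1")

// Assumption: restrict the scan to the same row range used with the Scanner.
InputFormatBase.setRanges(
  jobConf,
  java.util.Collections.singletonList[AccumuloRange](
    new AccumuloRange("row_0000000000", "row_0000000010")))

// Convert Key and Value to Strings on the executors, so that nothing
// non-serializable has to travel back to the driver.
val rows = sparkContext.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[Key],
  classOf[Value]).map { case (key, value) => (key.toString, value.toString) }

rows.take(10).foreach(println)
```

The important part is the map before take/first: the "not serializable result" error comes from shipping raw Key objects to the driver, so the conversion has to happen inside the RDD rather than after collecting. If you need individual fields instead of the whole string, pulling them out in the same map (for example key.getRow.toString) should work the same way.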

snerd
  • Hello David, just a quick question (because I don't know much about Accumulo yet ^^): have you already tried this kind of thing in the spark-shell? That way I'll know whether or not it's a spark-notebook issue :-D. If it's an Accumulo thing, I can check with @lossyrob, who used Accumulo with Spark in Geotrellis. – Andy Petrella Mar 25 '15 at 12:49
  • @andypetrella I haven't tried this in spark-shell because, I think, spark-notebook simply passes my commands along to Spark and returns whatever it gets back (you would know better than I on that). I will say that when I attempt to follow the instructions in the Accumulo docs, section 9.1.2, I get a "java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING" error for "Job job = new Job(getConf())" or an "I don't know what getConf() is" message, depending on how I set things up. – snerd Mar 25 '15 at 17:14
  • I see here http://pastebin.com/ti7Qz19m that this person is following the method specified in the Accumulo docs, but I can't get any traction from it. – snerd Mar 25 '15 at 17:17
  • Indeed, based on @Sietse's answer and your pastebin ref, we could say that lines #173 and onwards converge with this static (and weird) method usage. – Andy Petrella Mar 25 '15 at 19:16
  • I can give you these pointers at least, so you could try to move forward based on what you see there. In Geotrellis they use Accumulo intensively with Spark. Here is where: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/io/accumulo/AccumuloInstance.scala. The full package is worth reading to see how to interact with it or define new things. I'm too much of a noob in Accumulo so far to really help further. Sorry, I'll learn from your discoveries :-D – Andy Petrella Mar 26 '15 at 12:57
  • I know it is a bit late, but thank you for this guideline. With newer Spark (2.x), you have to create a SparkSession and then set the parameters from above on its context. Regarding serialization: I created the SparkSession from a SparkConf object that uses KryoSerializer, something like this: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); and it worked flawlessly. – Sekula1991 Jul 18 '17 at 14:09
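The Spark 2.x setup described in the last comment might look something like the sketch below. The app name and master URL are placeholders, and registering Key and Value with Kryo is an optional extra that the comment does not mention. Only the spark.serializer setting comes from the comment itself.

```scala
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder app name and master; adjust for your own deployment.
val conf = new SparkConf()
  .setAppName("accumulo-rdd-sketch")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional assumption: register the Accumulo classes with Kryo.
  .registerKryoClasses(Array(classOf[Key], classOf[Value]))

val spark = SparkSession.builder().config(conf).getOrCreate()

// The RDD calls from the question then go through this context.
val sc = spark.sparkContext
```

With Kryo as the serializer, results containing Key and Value reportedly no longer need the toString conversion, though converting to plain types in a map is still the more conservative option.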

2 Answers


Generally, with custom Hadoop InputFormats, the information is specified using a JobConf. As @Sietse pointed out, there are some static methods on the AccumuloInputFormat that you can use to configure the JobConf. In this case, I think what you would want to do is:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)

Note: After digging into the code, it seems the "is configured" property is set based in part on the class on which the method is called (which makes sense, to avoid conflicts with other packages), so when the concrete class later reads it back, it fails to find the flag. The solution is to not use the abstract classes (see https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127 for the implementation details). If you can't call this method on the concrete implementation from spark-notebook, using spark-shell or a regularly built application is probably the easiest solution.

Holden
  • Also, as per your update which seems to be pretty close to this, you still need to specify the table name (e.g. I think your code is missing the setInputTableName call). – Holden Mar 25 '15 at 21:41
  • do i need to do anything to setup the clientConfig obj prior to passing it to setZookeeperInstance? – snerd Mar 25 '15 at 21:44
  • also - fyi - you can't call setScanAuthorizations on AccumuloInputFormat from spark-notebook as it'll give you a 'not a member of' error – snerd Mar 25 '15 at 21:56
  • @DavidDaedalus The packages that you mention in your update org.apache.accumulo.core.client.mapreduce seem to be the package names for version(s) <= 1.4 and org.apache.accumulo.core.client.mapred for versions >= 1.5. Accumulo 1.4 does not have the setScanAuthorizations method, while 1.5 onwards does. Try importing from the correct package and see if it works. – Sietse Mar 25 '15 at 22:26
  • i already am importing org.apache.accumulo.core.client.mapred._ . moreover, both are listed in the 1.6 api docs: https://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapreduce/package-summary.html https://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/package-summary.html – snerd Mar 25 '15 at 22:30
  • I think the issue might be that setScanAuthorizations is actually inherited by AccumuloInputFormat from AbstractInputFormat – snerd Mar 25 '15 at 22:31
  • @DavidDaedalus I've updated the example to include constructing the clientConfig. As for setScanAuthorizations, looking at the javadoc, it seems that it should be defined on AccumuloInputFormat (it's inherited from AbstractInputFormat). – Holden Mar 25 '15 at 22:33
  • @Holden w00t - I'll give this a try. If by 'defined on' you mean AccumuloInputFormat.setScanAuthorizations, then yes, it 'should' be. However, recall that I'm doing this through spark-notebook, and when I try to use the above method call I get an 'I don't know what you're talking about with this setScanAuthorizations business' error. Beanshell (a Java REPL / scripting package) is the same way: it has no awareness of inherited methods. The only way to get at them is to explicitly call them from the class in which they are defined. – snerd Mar 25 '15 at 22:46
  • blerg - no joy with the updated code. t/y, btw, for sticking with me on this one! – snerd Mar 25 '15 at 22:54
  • What's the latest exception you run into? – Holden Mar 25 '15 at 22:55
  • same thing: java.io.IOException: Input info has not been set, when I attempt operation: rddX.first – snerd Mar 25 '15 at 22:56
  • Digging into this code, it looks like you're going to be stuck, since it sets a configuration property based on the implementing class (see https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ), so you really need to use the concrete class rather than calling the methods on the abstract classes. Please try with the spark-shell and let me know if this works. – Holden Mar 25 '15 at 23:03
  • blerg - running into problems from the shell with the auth token. This is starting to look like a many-beer problem. Let me go take care of that and I'll come back with the results of running this either directly from spark-shell or from within a stand-alone Scala program submitted via spark-submit. – snerd Mar 26 '15 at 00:42
  • Update 3 posted: it turns out your method was correct so long as I directly reference the method in question AND don't forget to close my parens :-p Unfortunately it blows up now for a different reason – snerd Mar 26 '15 at 18:31

It looks like those parameters have to be set through static methods: http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html. So try setting the non-optional parameters and run again. It should work.

Sietse