I am getting a weird exception when I try to access Cassandra from Hadoop using the ColumnFamilyInputFormat class. In my Hadoop job, this is how I connect to Cassandra, after including cassandra-all.jar version 1.1:

private void setCassandraConfig(Job job) {
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "204.236.1.29");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
    SlicePredicate predicate = new SlicePredicate()
            .setColumn_names(Arrays.asList(ByteBufferUtil.bytes(COLUMN_NAME)));
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
    // this will cause the predicate to be ignored in favor of scanning
    // everything as a wide row
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "204.236.1.29");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");
}

public int run(String[] args) throws Exception {
    // use a smaller page size that doesn't divide the row count evenly to
    // exercise the paging logic better
    ConfigHelper.setRangeBatchSize(getConf(), 99);

    Job processorJob = new Job(getConf(), "dmp_normalizer");
    processorJob.setJarByClass(DmpProcessorRunner.class);
    processorJob.setMapperClass(NormalizerMapper.class);
    processorJob.setReducerClass(SelectorReducer.class);
    processorJob.setOutputKeyClass(Text.class);
    processorJob.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(processorJob, new Path(TEMP_PATH_PREFIX));
    processorJob.setOutputFormatClass(TextOutputFormat.class);
    setCassandraConfig(processorJob);
    ...
}

But when I run Hadoop (on Amazon EMR) I get the exception below. Note that the IP is 127.0.0.1 instead of the IP I want...

Any hint? What could I be doing wrong?

2012-11-22 21:37:34,235 ERROR org.apache.hadoop.security.UserGroupInformation (Thread-6): PriviledgedActionException as:hadoop cause:java.io.IOException: Could not get input splits
2012-11-22 21:37:34,235 INFO org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob (Thread-6): dmp_normalizer got an error while submitting
java.io.IOException: Could not get input splits
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat.getSplits(ColumnFamilyInputFormat.java:178)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1017)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1034)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:952)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:905)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:905)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:336)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:233)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: failed connecting to all endpoints 127.0.0.1
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat.getSplits(ColumnFamilyInputFormat.java:174)
    ... 13 more
Caused by: java.io.IOException: failed connecting to all endpoints 127.0.0.1
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat.getSubSplits(ColumnFamilyInputFormat.java:272)
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat.access$200(ColumnFamilyInputFormat.java:77)
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat$SplitCallable.call(ColumnFamilyInputFormat.java:211)
    at org.apache.cassandra.hadoop.ColumnFamilyInputFormat$SplitCallable.call(ColumnFamilyInputFormat.java:196)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    ... 1 more
2012-11-22 21:37:39,319 INFO com.s1mbi0se.dmp.processor.main.DmpProcessorRunner (main): Process ended

2 Answers

I was able to solve the problem by changing the Cassandra configuration: listen_address needed to be a valid external IP for this to work.

The exception didn't seem related to that at all, so it took me a long time to find the answer. In the end, if you specify 0.0.0.0 in the Cassandra config and try to access it from an external IP, you get this error saying no host was found at 127.0.0.1.
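For reference, a sketch of the relevant cassandra.yaml lines for Cassandra 1.1, reusing the 204.236.1.29 address from the question as an example (substitute your node's real external address). The split calculation connects to the endpoints Cassandra reports for each token range, which are derived from listen_address, so a localhost or unreachable value there surfaces as 127.0.0.1 on the Hadoop side:

# cassandra.yaml -- example values only
listen_address: 204.236.1.29   # must be an address the Hadoop nodes can reach
rpc_address: 204.236.1.29      # Thrift address the job connects to on port 9160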

In my case it was a wrong keyspace name; look carefully at what you pass to the ConfigHelper.setInputColumnFamily method.
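For example, a minimal sketch with hypothetical names; both arguments must match the schema defined in Cassandra exactly, or split calculation will fail:

ConfigHelper.setInputColumnFamily(job.getConfiguration(),
        "my_keyspace",        // hypothetical -- use the exact keyspace name (case-sensitive)
        "my_column_family");  // hypothetical -- use the exact column family name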
