i'm trying to create hbase table and insert using spark core (spark streaming after). I succeeded to create the table and add data into it, even when i got this problem:
warning: Class org.apache.hadoop.hbase.classification.InterfaceAudience not found - continuing with a stub.
but when i try to count i got an error; may someone help me with the first warning and how i cant add streaming data into this table
my code is bellow:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
val tableName = "ziedspark"
val conf = HBaseConfiguration.create()
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if(!admin.isTableAvailable(tableName)) {
print("Creating GHbase Table Creating GHbase Table Creating GHbase Table Creating GHbase Table ")
val tableDesc = new HTableDescriptor(tableName)
tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
tableDesc.addFamily(new HColumnDescriptor("z2".getBytes()))
admin.createTable(tableDesc)
}else{
print("Table already exists!!")
}
val myTable = new HTable(conf, tableName)
for (i <- 414540 to 414545) {
var p = new Put(Bytes.toBytes(""+i))
p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2016-07-01"))
p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
myTable.put(p)
}
myTable.flushCommits()
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
//error here after creating the table count is not working
val count = hBaseRDD.count()
print("HBase RDD count:" + count)
System.exit(0)