I am new to HBase and have an issue that I am having trouble finding an answer to on Google.

I am trying to bulk insert data from Hive into HBase using the salted table approach described in: https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
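For context, the salting approach in that post prepends a short prefix derived from the row key so that writes spread across pre-split regions, and it pairs the salt with a custom Spark partitioner. The helpers imported below as Salter.Salts._ (salt and SaltPrefixPartitioner) are not shown in this post; the following is only a hypothetical sketch of what they might look like based on the blog's description, not the actual implementation:

import org.apache.spark.Partitioner

object Salts {

  // prepend "<hash % modulus>:" to the original key so rows spread evenly
  // over the pre-split regions
  def salt(key: String, modulus: Int): String = {
    val prefix = Math.abs(key.hashCode) % modulus
    s"$prefix:$key"
  }

  // send each record to the Spark partition named by its salt prefix,
  // so one partition corresponds to one region
  class SaltPrefixPartitioner(modulus: Int) extends Partitioner {
    override def numPartitions: Int = modulus
    override def getPartition(key: Any): Int =
      key.toString.split(":", 2).head.toInt
  }
}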

The only twist is that I need to insert data with multiple columns. The table in Hive has the following columns: Code, Description, Total_emp, salary

I am trying to insert the exact same columns into HBase. The HBase table is as follows:

'test2', {TABLE_ATTRIBUTES => {METADATA => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy'}}}, {NAME => 'epsg_3857', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '1000000', METADATA => {'NEW_VERSION_BEHAVIOR' => 'false'}}

However, I keep getting the following error when writing the salary column into the HFile:

java.io.IOException: Added a key not lexically larger than previous. Current cell = 0:0:11-1011/epsg_3857:Salary/1557231349613/Put/vlen=6/seqid=0, lastCell = 0:0:11-1011/epsg_3857:Total/1557231349613/Put/vlen=6/seqid=0

I am able to create the HFile if I remove the salary column or move it into a new column family. However, this should not be necessary, as I have read that a single column family can accommodate many columns.

I have tried increasing the block size from the default to 1 MB, but the problem remains.

Below is my test code:

import Salter.Salts._
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2

object SaltedKeyExample2 extends App {
  System.setProperty("HADOOP_USER_NAME", "cloudera")

  val hive_session = SparkSession
    .builder()
    .appName("Salted Key Example 2")
    .master("local[*]")
    .config("spark.submit.deployMode", "client")
    .config("spark.yarn.jars", "hdfs://192.168.30.12:8020/user/cloudera/jars/*.jar")
    //.config("hive.metastore.uris", "thrift://192.168.30.12:9083")
    .enableHiveSupport()
    .getOrCreate()

  import hive_session.sql

  val df_07 = sql("SELECT * from sample_07")
  val df_08 = sql("SELECT * from sample_08")
  df_07.filter(df_07("salary") > 150000).show()
  val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"), df_07.col("description"))
  //val sourceRDD = df_09.rdd
  val sourceRDD = df_07.filter(df_07("salary") > 150000).rdd
  df_09.show()

  // partitioner that routes each record to the partition matching its salt prefix
  val spp = new SaltPrefixPartitioner(modulus = 2)

  // prepend a salt to the row key and carry the four source columns as the value
  val saltedRDD = sourceRDD.flatMap(r => {Seq((salt(r.getString(0), 2), (r.getString(0), r.get(1), r.get(2), r.get(3))))})

  saltedRDD.foreach(x => println(x))

  val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)

  partitionedRDD.foreach(x => println(x))

  // build one KeyValue per column for every row, keyed by the salted row key
  val cells = saltedRDD.sortByKey(true).flatMap(r => {
    val salted_keys = salt(r._1, 2)
    val codes = r._2._1.toString()
    val descriptions = r._2._2.toString()
    val total = r._2._3.toString()
    val salary = r._2._4.toString()
    val colFamily = "epsg_3857"
    val colFamily2 = "epsg_3857_2"
    val colNameCodes = "Code"
    val colNameDesc = "Description"
    val colNameTotal = "Total"
    val colNameSalary = "Salary"

    Seq((new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameCodes.getBytes(), codes.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameDesc.getBytes(), descriptions.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameTotal.getBytes(), total.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameSalary.getBytes(), salary.getBytes())))
  })

  cells.foreach(x => println(x))

  // setup the HBase configuration
  val baseConf = HBaseConfiguration.create(hive_session.sparkContext.hadoopConfiguration)

  // NOTE: job creates a copy of the conf
  val job = Job.getInstance(baseConf, "test2")
  val connection = ConnectionFactory.createConnection(baseConf)
  val table = connection.getTable(TableName.valueOf("test2"))
  val regionLoc = connection.getRegionLocator(table.getName)
  cells.foreach(x => println(x))
  // Major gotcha(!) - see comments that follow
  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLoc)

  val conf = job.getConfiguration // important(!)

  cells.foreach(x => println(x))
  // write HFiles onto HDFS
  cells.saveAsNewAPIHadoopFile(
    "/tmp/test/hfiles",
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    conf)

  println("hello")
}

I expect to be able to insert more than three columns into a single column family in HBase, but at the moment I cannot. I would appreciate any help solving this issue. Thanks.

Kok-Lim Wong
  • https://stackoverflow.com/questions/46325233/spark-issues-in-creating-hfiles-added-a-key-not-lexically-larger-than-previous does that answer your question? – Ben Watson May 07 '19 at 12:50
  • Hi @Ben Watson. Give me some time to see how I can do this. I am generally new to Spark, Scala, etc. Pretty much any big data tech in general. I'll get back to you. – Kok-Lim Wong May 07 '19 at 13:04
  • Fair enough. I'd say that writing directly to HFiles is quite an advanced thing to do, but hopefully the link helps. – Ben Watson May 07 '19 at 13:10
  • Holy cow, it worked! So you have to sort everything in the HFile. Quick question: how do you mark a comment that worked? You know, the green check mark to show that a comment worked? – Kok-Lim Wong May 07 '19 at 13:52
  • I didn't make an answer, so there's no tick mark. I'll mark it as a duplicate of the answer I linked to; that way other people will see it. – Ben Watson May 07 '19 at 14:17
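
For anyone hitting the same error: the fix from the linked question comes down to sorting the cells by the full cell key (row key, then column family, then qualifier) before writing the HFiles, rather than by row key alone. Within one row, "Salary" must be written before "Total" because it sorts first. Below is a minimal sketch of that ordering, reusing saltedRDD, the epsg_3857 family and conf from the code above, and assuming salt returns the salted row key as a String; it also uses the already-salted key directly instead of salting it a second time as the original cells step does:

val sortedCells = saltedRDD
  .flatMap { case (rowKey, (code, desc, total, salary)) =>
    // one entry per cell, keyed by (salted row key, column qualifier)
    Seq(
      ((rowKey, "Code"), code.toString),
      ((rowKey, "Description"), desc.toString),
      ((rowKey, "Salary"), salary.toString),
      ((rowKey, "Total"), total.toString))
  }
  // global sort by (row key, qualifier) - the order HFileOutputFormat2 checks while writing
  .sortByKey()
  .map { case ((rowKey, qualifier), value) =>
    val kv = new KeyValue(
      Bytes.toBytes(rowKey),
      Bytes.toBytes("epsg_3857"),
      Bytes.toBytes(qualifier),
      Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
  }

sortedCells.saveAsNewAPIHadoopFile(
  "/tmp/test/hfiles",
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  conf)

Only the ordering of the emitted cells changes here; the data written is the same as in the original code.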
