
The code below reads from HBase, converts each row to a JSON structure, and then converts that to a SchemaRDD. The problem is that I am using a List to store the JSON strings and then passing it to a JavaRDD, so for about 100 GB of data the master will be loaded with all the data in memory. What is the right way to load the data from HBase, perform the manipulation, and then convert it to a JavaRDD?

package hbase_reader;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>(); // every row's JSON ends up in this driver-side list

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList); // the whole list is shipped from the driver to the executors

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
– madan ram

4 Answers


A basic example of reading HBase data using Spark (Scala); you can also write this in Java (a Java sketch follows the Scala code):

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HColumnDescriptor, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("cf1")) // a table needs at least one column family; "cf1" is a placeholder
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}
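
Since the answer notes that this can also be written in Java, here is a minimal Java sketch of the same newAPIHadoopRDD-based read. It assumes the same local setup and table name ("table1") as the Scala code; adjust the ZooKeeper quorum and table name for your cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseReadJava {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost");   // placeholder quorum
    conf.set(TableInputFormat.INPUT_TABLE, "table1");  // same table name as the Scala example

    // each HBase region becomes one Spark partition; nothing is collected on the driver
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
        jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
            ImmutableBytesWritable.class, Result.class);

    System.out.println("Number of Records found : " + hBaseRDD.count());
    jsc.stop();
  }
}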

UPDATED - 2016

As of Spark 1.0.x+, you can also use the Spark-HBase Connector:

Maven Dependency to Include:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> <!-- version can be changed as per your Spark version; I am using Spark 1.6.x -->
</dependency>

And below is a sample code snippet for the same:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
    val sc = new SparkContext(sparkConf)

    // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd .count())
}

UPDATED - 2017

As of Spark 1.6.x+, you can also use the SHC Connector (Hortonworks or HDP users):

Maven Dependency to Include:

<dependency>
  <groupId>com.hortonworks</groupId>
  <artifactId>shc</artifactId>
  <version>1.0.0-2.0-s_2.11</version> <!-- version depends on the Spark version and is supported up to Spark 2.x -->
</dependency>

The main advantage of using this connector is that it offers flexibility in the schema definition and doesn't need hard-coded parameters the way nerdammer/spark-hbase-connector does. Also remember that it supports Spark 2.x, so this connector is quite flexible and provides end-to-end support through issues and PRs.

See the repository below for the latest README and samples:

Hortonworks Spark HBase Connector

You can also convert these RDDs to DataFrames and run SQL over them, or map these Datasets or DataFrames to user-defined Java POJOs or case classes (see the sketch below). It works brilliantly.
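
For illustration, a minimal Java sketch of that last point, assuming you already have a Dataset<Row> named df that was loaded through one of the connectors above; the Document POJO and the docID/title column names are hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DocumentMappingExample {

  // hypothetical POJO matching two string columns of the HBase-backed DataFrame
  public static class Document implements java.io.Serializable {
    private String docID;
    private String title;
    public String getDocID() { return docID; }
    public void setDocID(String docID) { this.docID = docID; }
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
  }

  public static void sqlAndMap(SparkSession spark, Dataset<Row> df) {
    // run plain SQL over the HBase-backed DataFrame
    df.createOrReplaceTempView("documents");
    Dataset<Row> filtered =
        spark.sql("SELECT docID, title FROM documents WHERE title IS NOT NULL");

    // or map the rows onto a typed Dataset of the POJO above
    Dataset<Document> docs = filtered.as(Encoders.bean(Document.class));
    docs.show(5);
  }
}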

Please comment below if you need anything else.

– Murtaza Kanchwala
  • This is good info, but how to access the libraries in the spark-shell REPL ... With Spark 1.5.2 and HBase 1.1.2.2.3.4.0-3485, what packages are supplied to the --packages flag? Ex: I use `spark-shell --packages $SPARK_PKGS with export SPARK_PKGS=$(cat << END | xargs echo | sed 's/ /,/g' org.apache.hadoop:hadoop-aws:2.7.1 com.amazonaws:aws-java-sdk-s3:1.10.30 com.databricks:spark-csv_2.10:1.3.0 com.databricks:spark-avro_2.10:2.0.1 ??? END )` – codeaperature Jan 06 '16 at 01:48
  • I think with spark-shell you can also mention --jars; with that you must include your HBase libraries. – Murtaza Kanchwala Jan 07 '16 at 09:58
  • How can we do this in Python using PySpark? – Def_Os Jul 19 '16 at 04:36
  • I tried using the connector, and got stuck as i needed filters. Now contemplating whether i should extend the connector and build them, or use something else. Suggestions ? – Raghav Mar 06 '17 at 21:41
  • @Raghav I would recommend to look on the following as well : https://github.com/hortonworks-spark/shc – Murtaza Kanchwala Mar 07 '17 at 06:01
  • @Raghav I've updated the answer to make it visible for others also. Kindly let me know if you need more things on the same. – Murtaza Kanchwala Mar 08 '17 at 07:04
  • What about [hbase-spark](https://github.com/apache/hbase/tree/master/hbase-spark) ? – Mahesha999 Apr 27 '18 at 12:17
  • @Mahesha999 : I've never tried it, but you may give it a shot. Its native support so my guess is it'll be definitely good. – Murtaza Kanchwala May 04 '18 at 06:40
  • It worked... the repo has commits more recent than nerdammer's connector... also the official HBase book talks about it, so I suppose this is the official solution. Point is, I want to know whether the other (MapR, Cloudera and Hortonworks) connectors are more advanced than this official one and, if yes, whether I can use them with Apache HBase? – Mahesha999 May 04 '18 at 07:08
  • @Mahesha999 You have all the options listed before you. It totally depends on your deployment environment. And looking into the hbase-spark I guess Cloudera is supporting it from a long-time so it can be a good option. – Murtaza Kanchwala May 04 '18 at 08:53
  • @MurtazaKanchwala: I am using the same SHC provided by hortonworks. But my HBase read performance is very random. I mean to say, to read 100,000 records it sometimes take 16 minutes and 30 minutes. Could you pls help me on that regard. I can send more details on this, if required. – Dasarathy D R Oct 15 '18 at 01:51
  • @DasarathyDR The internal components of the connector are getting updated. I would suggest writing custom code in case you want to take control. – Murtaza Kanchwala Oct 29 '18 at 05:48

I prefer to read from HBase and do the JSON manipulation all in Spark.
Spark provides the JavaSparkContext.newAPIHadoopRDD function to read data from Hadoop storage, including HBase. You will have to provide the HBase configuration, table name, and scan in the configuration parameter, along with the table input format class and its key and value classes.

You can use the TableInputFormat class and its job parameters to provide the table name and scan configuration.

example:

conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data =
    jsc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

Then you can do the JSON manipulation in Spark, as sketched below. Since Spark can recompute partitions when memory is full, it only loads the data needed for the part being recomputed (correct me if I'm wrong), so you don't have to worry about the data size.
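
To tie this back to the question, a hedged Java sketch of that idea follows: the per-row JSON building moves into a map over the HBase RDD so it runs on the executors instead of the driver. Here data is the JavaPairRDD from the snippet above, jsql is the JavaSQLContext from the question, and buildJson is a hypothetical helper that would hold the question's family/qualifier loops:

// needs: org.apache.spark.api.java.function.Function, scala.Tuple2,
// org.apache.spark.sql.api.java.JavaSchemaRDD, org.apache.hadoop.hbase.util.Bytes

JavaRDD<String> jsonRDD = data.map(
    new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
      @Override
      public String call(Tuple2<ImmutableBytesWritable, Result> tuple) {
        Result rowResult = tuple._2();
        String rowKey = Bytes.toString(rowResult.getRow());
        // buildJson is a hypothetical helper containing the same
        // family/qualifier loops the question uses to build the JSON string
        return buildJson(rowKey, rowResult);
      }
    });

// same API as in the question, but the JSON strings now stay distributed
// in the cluster instead of being collected into a driver-side List
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));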

– Averman

Since the question is not new, there are a few other alternatives by now.

I do not know much about the first project, but it looks like it does not support Spark 2.x. However, it has rich support at the RDD level for Spark 1.6.x.

Spark-on-HBase, on the other hand, has branches for Spark 2.0 and upcoming Spark 2.1. This project is very promising since it is focused on Dataset/DataFrame APIs. Under the hood, it implements the standard Spark Datasource API and leverages the Spark Catalyst engine for query optimization. The developers claim here that it is capable of partition pruning, column pruning, predicate pushdown and achieving data locality.

A simple example, which uses the com.hortonworks:shc:1.0.0-2.0-s_2.11 artifact from this repo and Spark 2.0.2, is presented next:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

case class Record(col0: Int, col1: Int, col2: Boolean)

val spark = SparkSession
  .builder()
  .appName("Spark HBase Example")
  .master("local[4]")
  .getOrCreate()

def catalog =
  s"""{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
      |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
      |}
      |}""".stripMargin

val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))

// write
spark
  .createDataFrame(artificialData)
  .write
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .option(HBaseTableCatalog.newTable, "5")
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

// read
val df = spark
  .read
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()

– Anton Okolnychyi
  • I followed the white rabbit a bit regarding Spark-on-HBase, and it appears that the original developer has long left Hortonworks, and with it, apparently killed integration of these features into HBase-trunk. – Rick Moritz Jan 18 '17 at 09:27
  • In particular: https://issues.apache.org/jira/browse/HBASE-14789 has one open subtask, https://issues.apache.org/jira/browse/HBASE-15335 , which is essentially finished, but hasn't been updated since middle of 2016. Currently, the provided patches no longer apply, so even though the remaining work may not be massive, there's some maintenance to be done. See also https://reviews.apache.org/r/47547/ – Rick Moritz Jan 18 '17 at 09:34

Just to add a comment on how to add a scan (a fuller sketch follows the snippet below):

TableInputFormat has the following attributes:

  1. SCAN_ROW_START
  2. SCAN_ROW_STOP

conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
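
As a fuller illustration, here is a hedged Java sketch that combines these scan attributes with the newAPIHadoopRDD read from the other answers; the table name, row keys, and column family are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ScanRangeRead {
  public static JavaPairRDD<ImmutableBytesWritable, Result> readRange(JavaSparkContext jsc) {
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "tablename");       // placeholder table
    // restrict the scan to a row-key range so only part of the table is read
    conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");  // placeholder keys
    conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
    // optionally limit the scan to a single column family as well
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1");      // placeholder family

    return jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);
  }
}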
– Zhang Kan
  • Other scan parameters can be found in the `Fields` section of https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html. – Ben Watson Mar 19 '18 at 16:42