I have a big table in HBase named UserAction, and it has three column families (song, album, singer). I need to fetch all of the data from the 'song' column family as a JavaRDD object. I tried the code below, but it is not efficient. Is there a better way to do this?

static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD<Rating> count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

                @Override
                public JavaRDD<Rating> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList<Rating> ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
                @Override
                public JavaRDD<Rating> call(JavaRDD<Rating> r1,
                        JavaRDD<Rating> r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}

The song column family schema is:

RowKey = userID, column qualifier = songID, value = rating.
Fatih Yakut

1 Answer

UPDATE: OK, I see your problem now: for some crazy reason you're turning your arrays into RDDs with `return jsc.parallelize(ra);`. Why are you doing that? Why are you creating an RDD of RDDs? Why not leave them as arrays? When you do the reduce you can then concatenate the arrays. An RDD is a Resilient Distributed Dataset - it does not make logical sense to have a distributed dataset of distributed datasets. I'm surprised your job even runs and doesn't crash! Anyway, that's why your job is so slow.

Anyway, in Scala, after your map you would just do a `flatMap(identity)` and that would concatenate all your lists together.
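
Roughly, it would look like this in Scala (just a sketch - I'm assuming `hBaseRDD` is the `RDD[(ImmutableBytesWritable, Result)]` you get back from `newAPIHadoopRDD`, and that `Rating` is Spark MLlib's `Rating`, as mentioned in the comments below):

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

// Build a plain List[Rating] per HBase row, then flatten - no nested RDDs.
val ratings: RDD[Rating] = hBaseRDD
  .map { case (_, result) =>
    val user = Bytes.toString(result.getRow).toInt
    result.rawCells().toList.map { cell =>
      val product = Bytes.toString(CellUtil.cloneQualifier(cell)).toInt
      val value = Bytes.toString(CellUtil.cloneValue(cell)).toDouble
      Rating(user, product, value)
    }
  }
  .flatMap(identity) // concatenates all the per-row lists into one RDD[Rating]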

I don't really understand why you need to do a reduce; maybe that is where you have something inefficient going on. Here is my code to read HBase tables (it's generalised, i.e. it works for any schema). One thing to make sure of when you read the HBase table is that the number of partitions is suitable (usually you want a lot).

// Needed imports (sc is assumed to be an existing SparkContext):
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD

type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}
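
On the partitions point: `newAPIHadoopRDD` gives you roughly one partition per HBase region, so if your table has few regions you can repartition after reading. Something like this (the table name and partition count below are just placeholders):

// Hypothetical usage of readTableAll; repartition if the region count is low.
val rows: RDD[(Array[Byte], CFTimeseriesRow)] =
  readTableAll("UserAction").repartition(128) // 128 is an arbitrary example
println(rows.count()) // force evaluation just to sanity-check the read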

As you can see, I have no need for a reduce in my code. The methods are pretty self-explanatory. I could dig further into your code, but I lack the patience to read Java as it's so epically verbose.

I have some more code specifically for fetching the most recent elements from the row (rather than the entire history). Let me know if you want to see that.

Finally, I recommend you look into using Cassandra instead of HBase, as Datastax is partnering with Databricks.

samthebest
  • At the end of reading the entire table, you have a map for each row. Actually, I have already done this, but I want an RDD that has all of the data of every row as one map, instead of a distinct map for every row. @samthebest – Fatih Yakut Jul 03 '14 at 11:00
  • Sorry, I don't understand what you mean @FatihYakut; by running reduce, you'll end up with one `Rating`, not an RDD with 1 `Rating`. Also, what even is `Rating`? I don't know what the `union` method does. Provide more code, be more specific, clarify the problem and maybe someone can help. – samthebest Jul 03 '14 at 11:09
  • The CFTimeseriesRow type holds all of the data for one row. There are many rows, so there will be many CFTimeseriesRows. I want only one CFTimeseriesRow which contains all of the data. The union operation combines two RDDs and produces one RDD from them, and I use this operation in the reduce to end up with only one RDD. In other words, to have only one CFTimeseriesRow, I combine all of the CFTimeseriesRows in the reduce. By the way, a Rating has a userId, a productId and a rating value for the product. It is used in the MLlib library of Spark. @samthebest – Fatih Yakut Jul 03 '14 at 11:21
  • 1
    So your performing a reduce *on RDDs*!!! That's insane, you really should not be creating an RDD of RDDs. That will be why your job doesn't perform! I'll update my answer to explain further. @FatihYakut – samthebest Jul 03 '14 at 16:07
  • 2
    Spark doesn't support nesting of RDDs; if you write a program with nested RDDs, you may encounter `NullPointerException`s or other problems when attempting to perform actions on the nested RDDs. – Josh Rosen Jul 03 '14 at 17:21