We have publication data that we want to tag into various categories. The data is stored in HBase; from there we load it into a Spark RDD and tag the records using Scala code. A sample of the HBase data looks like this:
PubEntity:Abstract timestamp=1476537886382, value=not found
PubEntity:Affiliations timestamp=1476537886382, value=[]
PubEntity:Article_Title timestamp=1476537886382, value=Formate assay in body fluids: application in methanol poisoning.
PubEntity:Author timestamp=1476537886382, value=[{'LastName': 'Makar', 'ForeName': 'A B', 'author_name': 'A B Makar', 'Init
ials': 'AB', 'author_affiliation': 'not found'}, {'LastName': 'McMartin', 'ForeName': 'K E', 'author_name'
: 'K E McMartin', 'Initials': 'KE', 'author_affiliation': 'not found'}, {'LastName': 'Palese', 'ForeName':
'M', 'author_name': 'M Palese', 'Initials': 'M', 'author_affiliation': 'not found'}, {'LastName': 'Tephly
', 'ForeName': 'T R', 'author_name': 'T R Tephly', 'Initials': 'TR', 'author_affiliation': 'not found'}]
PubEntity:Journal_Title timestamp=1476537886382, value=Biochemical medicine
PubEntity:PMID timestamp=1476537886382, value=1
PubRemaining:Countries timestamp=1476537886382, value=[]
PubRemaining:Created_At timestamp=1476537886382, value=170812800.0
PubRemaining:DOI timestamp=1476537886382, value=not found
PubRemaining:Date_Created timestamp=1476537886382, value=19760116
PubRemaining:ISO_Abbreviation timestamp=1476537886382, value=Biochem Med
PubRemaining:ISSN timestamp=1476537886382, value=0006-2944
PubRemaining:Pub_Date timestamp=1476537886382, value=01 Jun, 1975
PubRemaining:Year timestamp=1476537886382, value=1975
We have been able to create an RDD in Spark, as explained in the first answer to How to read from hbase using spark. Here is the code:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setAppName("HBaseRead")
      .setMaster("local[2]")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    // Point the HBase client at the local cluster and the table to scan
    val conf = HBaseConfiguration.create()
    val tableName = "name_of_the_database"
    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Create the table if it does not exist yet
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    // Each record is a (row key, Result) pair
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    println(hBaseRDD.first())
    sc.stop()
  }
}
After running this code chunk in Scala, I got: defined module HBaseRead
Then I ran HBaseRead.main(Array()), which printed the number of records found and the first record as:
(31,keyvalues={1/PubEntity:Abstract/1476537886382/Put/vlen=9/mvcc=0, 1/Entity:Affiliations/1476537886382/Put/vlen=2/mvcc=0, 1/Entity:Article_Title/1476537886382/Put/vlen=64/mvcc=0, 1/Entity:Author/1476537886382/Put/vlen=497/mvcc=0, 1/Entity:Journal_Title/1476537886382/Put/vlen=20/mvcc=0, 1/Entity:PMID/1476537886382/Put/vlen=1/mvcc=0, 1/Remaining:Countries/1476537886382/Put/vlen=2/mvcc=0, 1/Remaining:Created_At/1476537886382/Put/vlen=11/mvcc=0, 1/Remaining:DOI/1476537886382/Put/vlen=9/mvcc=0, 1/Remaining:Date_Created/1476537886382/Put/vlen=8/mvcc=0, 1/Remaining:ISO_Abbreviation/1476537886382/Put/vlen=11/mvcc=0, 1/Remaining:ISSN/1476537886382/Put/vlen=9/mvcc=0, 1/Remaining:Pub_Date/1476537886382/Put/vlen=12/mvcc=0, 1/Remaining:Year/1476537886382/Put/vlen=4/mvcc=0})
In this output you will see entries like vlen=12/mvcc=0. On checking the data, I found that vlen is the byte length of each stored value. I couldn't figure out what mvcc is for. We want the output to show the actual values instead of vlen=4. Further, we want to read these entries, search them for certain words and phrases, and tag each publication accordingly, all in Scala. A sketch of what we are attempting is below.
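This is roughly what we are trying to do (a minimal sketch, assuming the hBaseRDD from above is in scope; the column family and qualifier names come from our schema, but the strValue helper and the keyword lists are made up for illustration):

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.util.Bytes

// Dump every cell of the first record with its decoded value,
// instead of the vlen/mvcc summary that Result.toString prints
hBaseRDD.take(1).foreach { case (_, result) =>
  result.rawCells().foreach { cell =>
    println(Bytes.toString(CellUtil.cloneQualifier(cell)) + " = " +
      Bytes.toString(CellUtil.cloneValue(cell)))
  }
}

// Hypothetical helper: read one column as a String, empty if the cell is missing
def strValue(result: Result, family: String, qualifier: String): String =
  Option(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
    .map(bytes => Bytes.toString(bytes))
    .getOrElse("")

// Decode the columns we care about instead of keeping the raw Result
val articles = hBaseRDD.map { case (_, result) =>
  val pmid = strValue(result, "PubEntity", "PMID")
  val title = strValue(result, "PubEntity", "Article_Title")
  val abstractText = strValue(result, "PubEntity", "Abstract")
  (pmid, title, abstractText)
}

// Hypothetical keyword lists; a record gets a tag when any keyword occurs in it
val keywords = Map(
  "toxicology" -> Seq("poisoning", "toxic"),
  "biochemistry" -> Seq("assay", "enzyme"))

val tagged = articles.map { case (pmid, title, abstractText) =>
  val text = (title + " " + abstractText).toLowerCase
  val tags = keywords.collect {
    case (tag, words) if words.exists(word => text.contains(word)) => tag
  }.toSeq
  (pmid, tags)
}
tagged.collect().foreach(println)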
Any link to a helpful online resource in this regard would be highly appreciated.