1

I use the CDH hbase-spark connector to read data from HBase inside `mapWithState`, but I can't obtain an `HBaseContext` there, because `HBaseContext` requires a `SparkContext`, and the `SparkContext` is not available inside the `mapWithState` update function (it runs on the executors, not the driver).

Code in `main`:

// Key each incoming JSON event by its "liveId" field, group the batch's events per key,
// and run them through the stateful mapWithState transformation supplied by LiveState2.
// The resulting stream is cached because it is presumably consumed by multiple downstream
// actions — TODO confirm against the rest of main.
// NOTE(review): groupByKey shuffles every value per key; assumes per-liveId volume per
// batch is modest — verify before scaling up.
val snapState = jsonO.map(x => (x.get("liveId").toString, x)).groupByKey().mapWithState(LiveState2.get.state).cache()

Code in `LiveState2`:

class LiveState2 extends StateMg with Serializable {
  val stateLive = List("END", "LIVING", "PAUSE", "WAIT", "NOT_FOUND")
  logger.info(s"meetstatePath===$statePath")
  // StateSpec driving mapWithState: keyed by liveId, with each micro-batch's grouped
  // JSONObjects as input, LiveInfo as the per-key state, and a
  // (liveId, per-state counters, detail rows) tuple as the emitted record.
  // If a previous state snapshot was successfully written to HDFS (signalled by the
  // _SUCCESS marker file), it is reloaded as the initial state; otherwise the spec
  // starts empty. Either way, idle keys time out after spark.dsTimeOut seconds.
  val state: StateSpec[String, scala.Iterable[JSONObject], LiveInfo, (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)])] = ToHdfs.getInstance.isExist(statePath + "/_SUCCESS") match {
    case true =>
      logger.info("hdfs does have meetStat::::")
      // Reload the (liveId, LiveInfo) pairs checkpointed by a previous run.
      // NOTE(review): this runs on the driver at StateSpec construction time, so `sc`
      // must be the driver-side SparkContext — confirm where this class is instantiated.
      val kk = sc.objectFile[(String, LiveInfo)](statePath)
      // Guard against an unreadable/corrupt objectFile: only attach the initial state
      // when the RDD's partitions can actually be inspected without throwing.
      if (!Try(kk.partitions.isEmpty).isFailure) StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong)).initialState(kk) else StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
    case false =>
      logger.info("hdfs does not have meetStat::::")
      // No snapshot available: start with empty state, same timeout policy.
      StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
  }



def updateFun(LiveId: String, one: Option[scala.Iterable[JSONObject]], state: State[LiveInfo]): (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)]) = {
    //    val stateInit = LiveInfo(Array((0, 0, "END"), (0, 0, "LIVING"), (0, 0, "PAUSE"), (0, 0, "WAIT"), (0, 0, "NOT_FOUND")), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 10000, 0.0001))
    val stateInit = LiveInfo(Array((0, 0, stateLive(0)), (0, 0, stateLive(1)), (0, 0, stateLive(2)), (0, 0, stateLive(3)), (0, 0, stateLive(4))), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 100000, 0.0001))
    val uniqueStrMem = ArrayBuffer[String]()
    val liveState = if (!state.exists()) {
      val dbStatisState = new LiveStateReco().getState(LiveId)
      val dbbloomState = new LiveStateBloomfilter().getState(LiveId) //read hbase code

Code in `LiveStateBloomfilter` (this is where the HBase read happens, and where I cannot obtain the `HBaseContext`):

class LiveStateBloomfilter extends LoggingTime with Serializable {

  def getState(liveId: String): Array[String] = {
    val hbaseCon = HbaseConfCDH.hbaseContest    //can not get hbasecontext for it need sparkcontext
    val tablename: String = "live_state"
    val scan = new Scan
    val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(liveId + "*"))
    scan.setFilter(filter)
    scan.setCaching(100)
    val result = ArrayBuffer[String]()
    logger.warn(HbaseConfCDH.sc)
    val resultBroadCast = HbaseConfCDH.sc.broadcast(result)
    logger.warn("state recovery!!!")
    hbaseCon.hbaseRDD(TableName.valueOf(tablename), scan)
Echo
  • 35
  • 1
  • 7

0 Answers