
As described in Spark Structured Streaming with Hbase integration, I'm interested in writing data to HBase from the Structured Streaming framework. I've cloned the SHC code from GitHub, extended it with a sink provider, and tried to write records to HBase. But I received the error: "Queries with streaming sources must be executed with writeStream.start()". My Python code is below:

JARS

Spark version: 2.4.0, Scala version: 2.11.8, com.hortonworks:shc-core:1.1.1-2.1-s_2.11, spark-sql-kafka-0-10_2.11

    import io

    import avro.io
    import avro.schema
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    spark = SparkSession \
        .builder \
        .appName("SparkConsumer") \
        .getOrCreate()

    print('read Avro schema from file: {}...'.format(schema_name))
    schema = avro.schema.parse(open(schema_name, 'rb').read())
    reader = avro.io.DatumReader(schema)
    print('the schema is read')

    rows = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', brokers) \
        .option('subscribe', topic) \
        .option('group.id', group_id) \
        .option('maxOffsetsPerTrigger', 1000) \
        .option("startingOffsets", "earliest") \
        .load()
    rows.printSchema()
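
    # The Kafka source delivers 'value' as raw binary; the Avro payload is decoded
    # below into the following struct. Note that this reassigns `schema` (which held
    # the Avro schema above); `reader` has already captured that one.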

    schema = StructType([ \
            StructField('consumer_id', StringType(), False), \
            StructField('audit_system_id', StringType(), False), \
            StructField('object_path', StringType(), True), \
            StructField('object_type', StringType(), False), \
            StructField('what_action', StringType(), False), \
            StructField('when', LongType(), False), \
            StructField('where', StringType(), False), \
            StructField('who', StringType(), True), \
            StructField('workstation', StringType(), True) \
        ])

    def decode_avro(msg):
        bytes_reader = io.BytesIO(bytes(msg))
        decoder = avro.io.BinaryDecoder(bytes_reader)
        data = reader.read(decoder)
        return (\
                data['consumer_id'],\
                data['audit_system_id'],\
                data['object_path'],\
                data['object_type'],\
                data['what_action'],\
                data['when'],\
                data['where'],\
                data['who'],\
                data['workstation']\
               )

    udf_decode_avro = udf(decode_avro, schema)
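
    # decode_avro returns a tuple in the same field order as `schema`, so the UDF
    # produces a struct column; selecting 'change.*' flattens the decoded fields
    # into top-level columns.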

    values = rows.select('value')
    values.printSchema()

    changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
    changes.printSchema()

    change_catalog = '''
    {
        "table":
        {
            "namespace": "uba_input",
            "name": "changes"
        },
        "rowkey": "consumer_id",
        "columns":
        {
            "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
            "audit_system_id": {"cf": "data", "col": "audit_system_id", "type": "string"},
            "object_path": {"cf": "data", "col": "object_path", "type": "string"},
            "object_type": {"cf": "data", "col": "object_type", "type": "string"},
            "what_action": {"cf": "data", "col": "what_action", "type": "string"},
            "when": {"cf": "data", "col": "when", "type": "bigint"},
            "where": {"cf": "data", "col": "where", "type": "string"},
            "who": {"cf": "data", "col": "who", "type": "string"},
            "workstation": {"cf": "data", "col": "workstation", "type": "string"}
        }
    }'''
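
    # SHC catalog convention: the row key column is mapped with cf "rowkey"; the
    # remaining columns go to the "data" column family of uba_input:changes.
    # 'HBase.HBaseSinkProvider' below is the sink class added to the forked SHC
    # jar, and 'hbasecat' is presumably the option key that class reads this
    # catalog from.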

    query = changes \
        .writeStream \
        .outputMode("append") \
        .format('HBase.HBaseSinkProvider')\
        .option('hbasecat', change_catalog) \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .start()

    #        .format('org.apache.spark.sql.execution.datasources.hbase')\
    #    query = changes \
    #        .writeStream \
    #        .format('console') \
    #        .start()

    query.awaitTermination()
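
For reference, a minimal sketch of the foreachBatch alternative (available since Spark 2.4), assuming the stock SHC batch source and the catalog above; each micro-batch arrives as a plain (non-streaming) DataFrame, so the regular batch writer applies (the function name is illustrative):

    def write_to_hbase(batch_df, batch_id):
        # 'newtable' is only needed if the HBase table does not exist yet
        batch_df.write \
            .options(catalog=change_catalog, newtable='5') \
            .format('org.apache.spark.sql.execution.datasources.hbase') \
            .save()

    query = changes \
        .writeStream \
        .outputMode('append') \
        .foreachBatch(write_to_hbase) \
        .option('checkpointLocation', '/tmp/checkpoint') \
        .start()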

EXCEPTION

    org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    LogicalRDD [key#146, value#147, topic#148, partition#149, offset#150L, timestamp#151, timestampType#152], true
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
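
For context, this AnalysisException is raised whenever a plan that contains a streaming source is executed as a batch query (the checkForBatch frames in the trace above). A minimal, hypothetical illustration using the built-in rate source:

    stream_df = spark.readStream.format('rate').load()

    # both of these execute the plan as a batch query and fail with
    # "Queries with streaming sources must be executed with writeStream.start()"
    stream_df.show()
    stream_df.count()

Since the script above only goes through readStream/writeStream, this suggests the batch execution path is being hit inside the custom sink provider rather than in the Python code itself.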
