As described in Spark Structured Streaming with Hbase integration, I'm interested in writing data to HBase from the Structured Streaming framework. I've cloned the SHC code from GitHub, extended it with a sink provider, and tried to write records to HBase. But I get the error: "Queries with streaming sources must be executed with writeStream.start()". My Python code is below:
JARS
Spark version: 2.4.0, Scala version: 2.11.8, com.hortonworks:shc-core:1.1.1-2.1-s_2.11, spark-sql-kafka-0-10_2.11
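In case the classpath matters, the packages end up on the driver roughly like this (a sketch on my side: the spark-sql-kafka version and the Hortonworks repository URL are assumptions, and when the job is launched with spark-submit the same coordinates go to --packages instead):

import os

# Sketch only: the equivalent of --packages/--repositories on spark-submit.
# Versions and the repo URL are assumptions; this has to run before the
# SparkSession below is created, and it is ignored under spark-submit.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11,'
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 '
    '--repositories http://repo.hortonworks.com/content/groups/public/ '
    'pyspark-shell'
)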
import io

import avro.io
import avro.schema
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession \
    .builder \
    .appName("SparkConsumer") \
    .getOrCreate()
print 'read Avro schema from file: {}...'.format(schema_name)
schema = avro.schema.parse(open(schema_name, 'rb').read())
reader = avro.io.DatumReader(schema)
print 'the schema is read'
rows = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', brokers) \
    .option('subscribe', topic) \
    .option('group.id', group_id) \
    .option('maxOffsetsPerTrigger', 1000) \
    .option("startingOffsets", "earliest") \
    .load()
rows.printSchema()
schema = StructType([
    StructField('consumer_id', StringType(), False),
    StructField('audit_system_id', StringType(), False),
    StructField('object_path', StringType(), True),
    StructField('object_type', StringType(), False),
    StructField('what_action', StringType(), False),
    StructField('when', LongType(), False),
    StructField('where', StringType(), False),
    StructField('who', StringType(), True),
    StructField('workstation', StringType(), True)
])
def decode_avro(msg):
    # Deserialize one Kafka value (Avro-encoded bytes) into a tuple matching `schema`
    bytes_reader = io.BytesIO(bytes(msg))
    decoder = avro.io.BinaryDecoder(bytes_reader)
    data = reader.read(decoder)
    return (
        data['consumer_id'],
        data['audit_system_id'],
        data['object_path'],
        data['object_type'],
        data['what_action'],
        data['when'],
        data['where'],
        data['who'],
        data['workstation']
    )
udf_decode_avro = udf(decode_avro, schema)
values = rows.select('value')
values.printSchema()
changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
changes.printSchema()
change_catalog = '''
{
    "table": {
        "namespace": "uba_input",
        "name": "changes"
    },
    "rowkey": "consumer_id",
    "columns": {
        "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
        "audit_system_id": {"cf": "data", "col": "audit_system_id", "type": "string"},
        "object_path": {"cf": "data", "col": "object_path", "type": "string"},
        "object_type": {"cf": "data", "col": "object_type", "type": "string"},
        "what_action": {"cf": "data", "col": "what_action", "type": "string"},
        "when": {"cf": "data", "col": "when", "type": "bigint"},
        "where": {"cf": "data", "col": "where", "type": "string"},
        "who": {"cf": "data", "col": "who", "type": "string"},
        "workstation": {"cf": "data", "col": "workstation", "type": "string"}
    }
}'''
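# For comparison, SHC's documented *batch* write path consumes the same catalog JSON;
# my HBaseSinkProvider is meant to do the equivalent for each streaming micro-batch.
# (Sketch only, kept commented out: 'some_static_df' is a placeholder for a
#  non-streaming DataFrame, it does not exist in this script.)
# some_static_df.write \
#     .options(catalog=change_catalog, newtable='5') \
#     .format('org.apache.spark.sql.execution.datasources.hbase') \
#     .save()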
query = changes \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', change_catalog) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()
    # .format('org.apache.spark.sql.execution.datasources.hbase') \

# query = changes \
#     .writeStream \
#     .format('console') \
#     .start()
query.awaitTermination()
EXCEPTION
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#146, value#147, topic#148, partition#149, offset#150L, timestamp#151, timestampType#152], true
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)