I'm seeing an issue when creating a Spark streaming table backed by Kafka from the snappy shell.

The exception is: Invalid input 'C', expected dmlOperation, insert, withIdentifier, select or put (line 1, column 1):

Reference: http://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview

Here is my SQL:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

The shell seems to reject the script at the very first character, 'C'. I'm executing the script with the following command:

snappy> run '/scripts/my_test_sensor_script.sql';

Any help is appreciated!

Michael Benjamin
mike w

2 Answers

The documentation is inconsistent with the syntax the shell actually accepts. The correct syntax is:

CREATE STREAM TABLE sensor_data_stream if not exists
(sensor_id string, metric string)
using kafka_stream
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

You will also need to write a row converter for your data.

Sachin Janani
Thank you Sachin! The updated syntax worked. As expected from your comment about writing a row converter, I'm now getting a ClassNotFound exception on io.snappydata.app.streaming.KafkaStreamToRowsConverter. I will search the documentation to work through that issue. – mike w Aug 09 '16 at 15:06

Mike, you need to create your own rowConverter class by implementing the following trait:

trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

and then specify the fully qualified name of that rowConverter class in the DDL. A rowConverter is specific to a schema. 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' is just a placeholder class name, which should be replaced by your own rowConverter class.
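
As a rough illustration, a converter for the (sensor_id string, metric string) schema from the question might look like the sketch below. It assumes each Kafka message arrives as a String of the form "sensor_id,metric"; that message format, the class name SensorRowConverter, and the package used in the usage note are all hypothetical. The trait is repeated only so the snippet compiles on its own; in a real project you would extend the trait that ships with SnappyData.

```scala
import org.apache.spark.sql.Row

// Copy of the trait quoted above, repeated so this sketch is self-contained.
// In a real project, extend the trait provided by SnappyData instead.
trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

// Hypothetical converter for the (sensor_id string, metric string) schema.
// Assumption: each message is a String such as "sensor-1,temperature".
class SensorRowConverter extends StreamToRowsConverter {
  override def toRows(message: Any): Seq[Row] = message match {
    case line: String =>
      // Split into at most two fields: sensor_id and metric.
      line.split(",", 2) match {
        case Array(sensorId, metric) => Seq(Row(sensorId.trim, metric.trim))
        case _                       => Seq.empty // malformed line: emit no rows
      }
    case _ => Seq.empty // unexpected message type: emit no rows
  }
}
```

If this class were compiled into a package such as com.example.streaming (again, a made-up name) and placed on the classpath, the DDL option would read rowConverter 'com.example.streaming.SensorRowConverter'. The column order in each Row has to match the column order declared in the CREATE STREAM TABLE statement.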

Yogesh Mahajan
Thanks Yogesh. I'm attempting to convert a Java object that I send to Kafka. I'm trying to understand how to convert this object using the row converter. Is there any documentation that you would recommend that explains this process? – mike w Aug 11 '16 at 16:48