I have a test Kafka cluster with a topic 'topic1'
(with a schema in Schema Registry).
I created a stream from this topic:
CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO', TIMESTAMP='triggertime', TIMESTAMP_FORMAT='yyyyMMddHHmmssX');
Then I created an aggregated table with a CREATE TABLE AS SELECT query (without windowing), restricted to today's data only:
CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
concat(product_type,' - ',region) id
,sum(sale_charge) sum_sale_charge
,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime, 'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyyMMdd')
group by concat(product_type,' - ',region)
EMIT CHANGES;
From the ksqlDB CLI, I run the following and get the expected results:
SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id                    |sum_sale_charge            |CNT  |
+----------------------+---------------------------+-----+
|Pen - London          |90.0                       |45   |
|Book - Paris          |45.0                       |9    |
|Pen - Amsterdam       |26.0                       |13   |
|Keyboard - Oslo       |60.0                       |6    |
|Pen - London          |92.0                       |46   |
Press CTRL-C to interrupt
I can also see topic1_agg_table_JSON in the topic list, with JSON messages inside.
Goal: I want to write a consumer in Node.js that pushes these messages to the browser (client) over WebSockets, so the client can visualize them in real time.
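To make the goal concrete, here is a minimal sketch of what I expect the Node.js side to do with each message once consuming works: keep only the latest row per id and send that snapshot to the browser. It does not touch Kafka or WebSockets at all. The record shape ({ key, value }), the helper name applyRecord, and the value field names SUM_SALE_CHARGE and CNT are my assumptions about what the table's topic contains, not something I have confirmed.

```javascript
// Hypothetical helper: fold records from the table's topic into the latest
// row per key. Assumes the message key holds the id string and the value is
// a JSON string with SUM_SALE_CHARGE and CNT (assumed field names).
function applyRecord(state, record) {
  if (record.key == null) return state;      // skip records without a key
  const id = String(record.key);
  if (record.value == null || record.value === '') {
    state.delete(id);                        // treat an empty value as a delete
    return state;
  }
  const row = JSON.parse(String(record.value));
  state.set(id, { id: id, sum: row.SUM_SALE_CHARGE, cnt: row.CNT });
  return state;
}

// Sample records mirroring the CLI output (later rows replace earlier ones).
const state = new Map();
const sample = [
  { key: 'Pen - London', value: '{"SUM_SALE_CHARGE":90.0,"CNT":45}' },
  { key: 'Book - Paris', value: '{"SUM_SALE_CHARGE":45.0,"CNT":9}' },
  { key: 'Pen - London', value: '{"SUM_SALE_CHARGE":92.0,"CNT":46}' },
];
for (const record of sample) applyRecord(state, record);

// This snapshot is what would be serialized and sent over the WebSocket.
const snapshot = JSON.stringify(Array.from(state.values()));
console.log(snapshot);
```

In the real consumer, applyRecord would be called from the message handler, and the snapshot (or just the changed row) would be pushed to connected clients.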
Already tried: the kafka-node module, with example code taken from https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js. With a plain Kafka topic this code works fine, but if I change the topic to topic1_agg_table_JSON, it throws no error and prints no messages.
Question: What is the proper way to consume data from topic1_agg_table_JSON using Node.js?