
I have a test Kafka cluster with the topic 'topic1' (with a schema in the Schema Registry).

I created a stream from this topic:

CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO', TIMESTAMP='triggertime', TIMESTAMP_FORMAT='yyyyMMddHHmmssX');

Then I created an aggregated table as a SELECT push query (without windowing), with today's data only:

CREATE TABLE topic1_agg_table_JSON WITH (VALUE_FORMAT='JSON') AS
SELECT
   CONCAT(product_type, ' - ', region) AS id
  ,SUM(sale_charge) AS sum_sale_charge
  ,COUNT(1) AS cnt
FROM topic1_basic_stream
WHERE TIMESTAMPTOSTRING(rowtime, 'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyyMMdd')
GROUP BY CONCAT(product_type, ' - ', region)
EMIT CHANGES;

From the ksqlDB CLI I run the query and see correct results:

SET 'auto.offset.reset'='earliest';
SELECT * FROM topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id                    |sum_sale_charge            |CNT  |
+----------------------+---------------------------+-----+
|Pen - London          |90.0                       |45   |
|Book - Paris          |45.0                       |9    |
|Pen - Amsterdam       |26.0                       |13   |
|Keyboard - Oslo       |60.0                       |6    |
|Pen - London          |92.0                       |46   |

Press CTRL-C to interrupt

I can also see topic1_agg_table_JSON in the topic list, with JSON messages inside.

Target: I want to write a consumer in Node.js that emits these messages (using WebSockets) to the browser (client) and visualizes them in real time on the client side.
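
For reference, the WebSocket side I have in mind looks roughly like this (a minimal sketch only; it assumes the ws package and an arbitrary port 8081):

const WebSocket = require('ws');

// One WebSocket server shared by all browser clients.
const wss = new WebSocket.Server({ port: 8081 });

wss.on('connection', () => {
  console.log('browser client connected');
});

// Send a JSON-serializable payload to every connected client.
function broadcast(payload) {
  const data = JSON.stringify(payload);
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(data);
    }
  });
}

module.exports = { broadcast };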

Already tried: the kafka-node module, with example code taken from https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js. With a plain original Kafka topic this code works fine, but if I change the topic to my topic1_agg_table_JSON, it throws no error and prints no messages.
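
For context, the consumer is essentially the linked example simplified and pointed at my topic (the broker address below is a placeholder for my test cluster, and constructor names may differ between kafka-node versions):

const kafka = require('kafka-node');

// Broker address is a placeholder for my test cluster.
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });

const consumer = new kafka.Consumer(
  client,
  [{ topic: 'topic1_agg_table_JSON', partition: 0 }],
  { autoCommit: true }
);

consumer.on('message', (message) => {
  // message.value is the JSON string written by the CTAS query.
  console.log(message.topic, message.offset, message.value);
});

consumer.on('error', (err) => {
  console.error('consumer error', err);
});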

Question: What is the proper way to consume data from topic1_agg_table_JSON using Node.js?

deeplay

1 Answer


Resolved. The problem was an outdated kafka-node module version. After updating (npm update kafka-node), the app successfully consumes all messages from the topic, though not quite as I expected: the target is to receive only the latest version of every message rather than all of them, but that's another issue.
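
For that follow-up, the direction I'm looking at is a small latest-value cache keyed by the Kafka message key (the table's changelog should be keyed by the GROUP BY expression), so only the newest row per id is pushed to the browser. A rough sketch, reusing the consumer and the broadcast() helper from the sketches in the question:

// Latest state per key; one entry per concatenated id.
const latestByKey = new Map();

consumer.on('message', (message) => {
  const key = message.key && message.key.toString();
  if (!key) return;

  if (message.value == null || message.value === '') {
    // Tombstone / deleted row: drop it from the cache.
    latestByKey.delete(key);
  } else {
    latestByKey.set(key, JSON.parse(message.value));
  }

  // Push only the newest state for this id to the browser clients.
  broadcast({ id: key, row: latestByKey.get(key) || null });
});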

deeplay