0

I should mention that I am a beginner with this type of development, so if there are some silly errors I apologize in advance. I created an Express.js server to use as a base for my service; here are the settings and the code:

{
  "name": "nodejstrial",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "dev": "nodemon src/index.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@kafkajs/confluent-schema-registry": "^3.3.0",
    "body-parser": "^1.20.1",
    "ejs-mate": "^4.0.0",
    "events": "^3.3.0",
    "express": "^4.18.2",
    "kafka-node": "^5.0.0",
    "kafkajs": "^2.2.2",
    "morgan": "^1.10.0",
    "node-fetch": "^3.3.0",
    "socket.io": "^4.5.3",
    "tiny-json-http": "^7.4.2"
  },
  "devDependencies": {
    "nodemon": "^2.0.20"
  }

}

// Express entry point: configures the EJS view engine, JSON body
// parsing, routes and static assets, then starts the HTTP server and
// kicks off the Kafka consumer once the server is listening.
const express = require('express');
const engine = require('ejs-mate');
const path = require('path');
const { consumeMessage } = require('./consumer');

const port = 3000;

// Initialization
const app = express();

// View-engine settings
app.engine('ejs', engine);
app.set('view engine', 'ejs');
app.set('views', path.join(__dirname, 'views'));
app.set('src', __dirname);

// JSON body parsing
app.use(express.json());

// Application routes
app.use(require('./routes/'));

// Static assets (css, js, images, favicon)
app.use(express.static(path.join(__dirname, 'static')));

// Start the server, then begin consuming Kafka messages.
app.listen(port, () => {
  console.log(`APP started at http://localhost:${port}`);
  consumeMessage();
});

The directory structure is the following:

.
├── src
│   ├── routes
│   │     └── index.js
│   ├ static
│   │     ├──  css
│   │     ├── favicon.ico
│   │     ├── images
│   │     └── js
│   │         └── main.js
│   └── views
│   │     └── index.ejs
│   │     
│   └── consumer.js
│   └── index.js

In practice, the program listens to a Kafka broker using KafkaJS; when a message arrives, it generates a server-sent event that should trigger the creation of a marker on the map. This is the code of consumer.js:

// Kafka consumer wiring: broker/client configuration plus the shared
// state used by the SSE route below.
const { Kafka } = require('kafkajs');
const router = require('./routes/index');

// node-fetch v3 is ESM-only, so load it lazily via dynamic import and
// wrap it in a promise-resolving shim.
const fetchP = import('node-fetch').then((mod) => mod.default);
const fetch = (...args) => fetchP.then((fn) => fn(...args));

const bodyParser = require('body-parser');
const jsonParser = bodyParser.json();

const url_marker = 'http://localhost:3000/topic/bleLocalization';

// Broker credentials/location (placeholders).
const username = '';
const password = '';
const brokers = ['BROKER_LINK'];

const clientId = 'mapConsumer';
const groupId = 'mapApplication';
const topic = 'bleLocalization';

// Holds the most recently received Kafka message; overwritten on every
// delivery and read by the SSE route handler.
let body = { data: 'start' };

// SASL/SSL are only enabled when credentials are provided.
const sasl = username && password ? { username, password, mechanism: 'plain' } : null;
const ssl = !!sasl;

const kafka = new Kafka({ clientId, brokers /*ssl sasl*/ });

const consumer = kafka.consumer({ groupId });

// SSE endpoint: replies with the latest Kafka message as a single
// server-sent event. NOTE(review): the response is never ended and no
// further events are pushed on this connection; the browser's
// EventSource will reconnect, effectively polling the latest value.
router.get('/topic/bleLocalization', jsonParser, function (req, res) {
    console.log(body);
    res.set({
        'Cache-Control': 'no-cache',
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive'
    });
    res.flushHeaders();
    // `body` starts out as a plain object and is replaced with a JSON
    // string once a message arrives; serialize defensively so the
    // initial event carries valid JSON instead of "[object Object]".
    const payload = typeof body === 'string' ? body : JSON.stringify(body);
    res.write('data: ' + payload + '\n\n');
});


// Connects the Kafka consumer and processes every message on `topic`:
// the raw value is parsed and re-serialized into the module-level
// `body`, then a GET is fired at our own SSE route.
// NOTE(review): that fetch opens a new event-stream connection per
// message and its response is never read or closed — the browser's
// EventSource should be the only consumer of that route; consider
// removing the fetch entirely.
const consumeMessage = async () => {
  // Reset any previous session before (re)connecting.
  await consumer.disconnect();
  await consumer.connect();
  await consumer.subscribe({ topic: topic, fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log('received message');
      try {
        // Normalize the raw message buffer into a compact JSON string.
        body = JSON.stringify(JSON.parse(message.value));

        try {
          // Poke the SSE endpoint (see NOTE above); the response is
          // intentionally ignored.
          await fetch(url_marker, {
            method: 'get',
            headers: {
              'Cache-Control': 'no-cache',
              'Content-Type': 'text/event-stream',
              'Connection': 'keep-alive'
            }
          });
        } catch (e) {
          console.log(e);
        }
      } catch (e) {
        // console.error is synchronous — no await needed.
        console.error('unable to handle incoming message', e);
      }
    },
  });
  // await consumer.disconnect()
};

module.exports.consumeMessage = consumeMessage;

To do this, at the beginning of the consumer I initialize a variable

let body={data:'start'};

that is overwritten every time a message arrives. I think there is a better approach, but for the moment this workaround works. The code up to here is able to generate events; the problem arises in the interaction with the static JavaScript main.js. Each event carries data that I use to draw the proper markers on a Leaflet map:

  // Subscribe to the server-sent-event stream exposed by the backend.
  const url = 'http://localhost:3000/topic/bleLocalization';
  const source1 = new EventSource(url);

  // Listener fired once per event pushed by the server.
  // MSG FORMAT [{'floor':int,'x':int,'y':int,'id':String},...]
  source1.onmessage = (e) => {
    console.log("MESSAGE");
    const array = JSON.parse(e.data);
    // DO STUFF WITH ARRAY
  };

For the sake of completeness, here is also the code of index.ejs:

<html>
  <head>
    <meta charset="utf-8">

    <!-- LEAFLET: CSS + JS loaded once here (the duplicate script tag
         that was also in <body> has been removed) -->
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.3/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet@1.9.3/dist/leaflet.js"></script>
    <!-- END LEAFLET -->
    <link rel="apple-touch-icon" sizes="57x57" href="/favicon.ico/apple-icon-57x57.png">
    <meta name="theme-color" content="#ffffff">
    <title>Live Map</title>
    <!-- <link rel ="stylesheet" href="/css/main.css" -->
  </head>
  <body>
    <h1>Live Map</h1>

    <!-- Map container for the live view -->
    <div id="map-template" class="leaflet-container leaflet-touch leaflet-fade-anim leaflet-grab leaflet-touch-drag leaflet-touch-zoom __web-inspector-hide-shortcut__"
     style = "width:1700px; height:750px;"></div>
    <!-- NOTE(review): socket.io client is loaded but appears unused by
         main.js — confirm before removing -->
    <script src="/socket.io/socket.io.js"></script>
     <script type="module" src="/js/geoFences.js"></script>
    <script type="module" src="/js/main.js"></script>

  </body>
</html>

EDIT

The idea is to obtain a result similar to what I have with a Flask server:

# Reference Flask implementation of the same SSE streaming endpoint.
WEB_HOST = '0.0.0.0'
PORT = 5000


def get_kafka_client():
    """Create a fresh connection to the Kafka broker."""
    return KafkaClient(hosts='BROKER')


app = Flask(__name__)


@app.route('/')
def index():
    """Serve the live-map page."""
    return render_template('index.html')


# Consumer API: stream every message of <topicname> as server-sent events.
@app.route('/topic/<topicname>')
def get_messages(topicname):
    client = get_kafka_client()

    def events():
        # Block on the simple consumer and yield each message as one
        # SSE frame.
        for item in client.topics[topicname].get_simple_consumer():
            yield 'data:{0}\n\n'.format(item.value.decode())

    return Response(events(), mimetype="text/event-stream")
Rikissssss
  • 17
  • 5
  • This seems backwards. 1) Kafka is setting `body` asynchonously, so your HTML page will never "get" the latest message. 2) `await fetch` in the consumer is not necessary; you should be using server-sent-events (SSE) there, not fetch API – OneCricketeer Feb 10 '23 at 20:50
  • I correct the first point but still same behaviour. I don't get what does you mean with "you should be using server-sent-events (SSE) there, not fetch API ". Sorry It is the first time I manage these stuffs. – Rikissssss Feb 13 '23 at 08:12
  • Compare your code to answers here - https://stackoverflow.com/q/34657222/2308683 . Within the consumer, you are using `const response = await fetch`, but this sends data **to the server**, not leaflet frontend since that is not an HTTP server that accepts a GET method – OneCricketeer Feb 13 '23 at 16:40
  • I suggest you create a [mcve] that simply logs the kafka messages in the frontend, and remove all the leaflef/map code for now. – OneCricketeer Feb 13 '23 at 16:50
  • I try to follow your advices, in the meanwhile I edited the post to show you what I want to achieve basically. This is done with a simple Flask server but I want to translate it in an Express.js server. – Rikissssss Feb 14 '23 at 11:56
  • Thanks, but what do you mean "overridden"? What is "DO STUFF" actually doing? Still regarding Kafka, As mentioned, `Response(events(), mimetype="text/event-stream")` is not the same as `await fetch`. I see you've added `socket.io/socket.io.js`, but you're never using this? – OneCricketeer Feb 14 '23 at 15:08
  • Honestly, I think it'd be better for you to start over with the existing Confluent Kafka REST Proxy (or ksqlDB) rather than run Express or Flask, to do the same. There are edge cases here around offset management and rebalancing that are complicated, and you're not handling at all. Alternatively, dumping data into a database rather than streaming directly from Kafka would obviously be more persistent (rather than lose all map entries when you refresh your browser) – OneCricketeer Feb 14 '23 at 15:14
  • To answer the two comments. The idea of the app is to consume only the last message on Kafka queue, because this part of the app is built to show in real time the data on the map(I instist to use Express.JS because this map should be integrated on a larger system based on this framework). Doing stuff means receive the event, parse the event and print the data on the map as marker. My idea is to reach the same result that has Response(events(), mimetype="text/event-stream") in Flask on Express.js, but as I said I'm new to this so I am I little lost. I added socket.io to try another approach. – Rikissssss Feb 15 '23 at 09:00
  • So the sum up, for this app I don't want persistent data, but only the last data to consume,parse and show on the fly. – Rikissssss Feb 15 '23 at 09:01
  • Okay sure. 1) You don't need expressjs, any http backend will work, such as Flask, Spring, etc 2) When I asked what "do stuff" is going, your question currently does nothing to show those parsed events. Socket.io is just one alternative, but you need to clarify more what/where your code isn't working – OneCricketeer Feb 15 '23 at 15:27
  • 1
    Actually to make it work in Express.js I modified only the route of GET rewriting in this way: `res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Connection', 'keep-alive'); res.flushHeaders(); let interValID = setInterval(() => { res.write('data: '+body+'\n\n'); clearInterval(interValID); res.end(); // terminates SSE session return;}, 5000);});` – Rikissssss Feb 16 '23 at 10:39
  • I understood my error, but now I will try to do this in Angular.js, following your advice approaching with an HTTP backend. Thanks – Rikissssss Feb 16 '23 at 10:40
  • The frontend framework doesn't matter, either. You'll still eventually end up with EventSource instance used somewhere, which requires any HTTP backend to send it events... Also, you shouldn't need `'data:` and two line breaks. Just send JSON payload and flush out the response writer, and like I said before, `body` won't be every Kafka message – OneCricketeer Feb 16 '23 at 14:11

1 Answers1

0

For getting data from KafkaJS, to the response, you should not be using global variables across async functions.

Pass the response object into your consumer. For example,

const consumer = kafka.consumer({ groupId })

router.get('/topic/bleLocalization',jsonParser,  async (req,res) => {
    res.set({
        'Cache-Control': 'no-cache',
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive'
    });
    res.flushHeaders();
    // ideally, you would extract the topic name from the path 
    await consumeMessage('bleLocalization', res);
})


async function consumeMessage(topic, res) {
  await consumer.disconnect();
  await consumer.connect()
  await consumer.subscribe({topic,fromBeginning: true})
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log('received message');
       try {

           const body = JSON.parse(message.value);
           res.write(JSON.stringify({'data' : body}) + '\n');

Keep in mind that this will run forever (until the consumer disconnects and throws an error)

Then, in the frontend code, at least do console.log(array) in your message listener to actually see the message data (but it should be an object, not an "array", since you have an object with one key, data). Otherwise, only write body if the data in the topic is a JSON array.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245