I state that I am a beginner with this type of development, so if there are some stupid errors I apologize in advance. I created an Express.js server to use as base for my service, here the settings and the code:
{
"name": "nodejstrial",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"dev": "nodemon src/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@kafkajs/confluent-schema-registry": "^3.3.0",
"body-parser": "^1.20.1",
"ejs-mate": "^4.0.0",
"events": "^3.3.0",
"express": "^4.18.2",
"kafka-node": "^5.0.0",
"kafkajs": "^2.2.2",
"morgan": "^1.10.0",
"node-fetch": "^3.3.0",
"socket.io": "^4.5.3",
"tiny-json-http": "^7.4.2"
},
"devDependencies": {
"nodemon": "^2.0.20"
}
}
const express = require('express');
const engine = require('ejs-mate');
const path = require('path');
const {consumeMessage}= require('./consumer')
const port = 3000;
// Initializiation
const app = express();
//Settings
app.engine('ejs',engine);
app.set('view engine','ejs');
app.set('views',path.join(__dirname,'views'));
app.set('src',__dirname);
app.use(express.json())
//Routes
app.use(require('./routes/'));
// static-files
app.use(express.static(path.join(__dirname,'static')))
// Starting server
app.listen(port,() =>{
console.log('APP started at http://localhost:'+port);
consumeMessage()
})
The directory structure is the following:
.
├── src
│ ├── routes
│ │ └── index.js
│ ├ static
│ │ ├── css
│ │ ├── favicon.ico
│ │ ├── images
│ │ └── js
│ │ └── main.js
│ └── views
│ │ └── index.ejs
│ │
│ └── consumer.js
│ └── index.js
Practically the program listen to a Kafka broker using KafkaJS, when a message arrive it will generate a server-sent event that should trigger the creation of a marker in the map. This is the code of the consumer.js:
const { Kafka } = require('kafkajs')
const router =require('./routes/index')
const fetchP = import('node-fetch').then(mod => mod.default)
const fetch = (...args) => fetchP.then(fn => fn(...args))
var bodyParser = require('body-parser')
var jsonParser = bodyParser.json()
var url_marker = 'http://localhost:3000/topic/bleLocalization'
const username = ''
const password = ''
const brokers = ['BROKER_LINK']
const clientId = 'mapConsumer'
const groupId = 'mapApplication'
const topic = 'bleLocalization'
let body={data:'start'};
const sasl = username && password ? { username, password, mechanism: 'plain' } : null
const ssl = !!sasl
const kafka = new Kafka({ clientId, brokers /*ssl sasl*/ })
const consumer = kafka.consumer({groupId:groupId })
router.get('/topic/bleLocalization',jsonParser, function (req,res){
console.log(body)
res.set({
'Cache-Control': 'no-cache',
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive'
});
res.flushHeaders();
res.write('data: '+body+'\n\n');
})
const consumeMessage = async () => {
await consumer.disconnect();
await consumer.connect()
await consumer.subscribe({topic:topic,fromBeginning: true})
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log('received message')
try {
body = JSON.stringify(JSON.parse(message.value))
try{
const response = await fetch(url_marker, {
method: 'get',
headers: { 'Cache-Control': 'no-cache',
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive'}
});
}catch (e) {
console.log(e)
}
} catch (e) {
await console.error('unable to handle incoming message', e);
}
},
})
// await consumer.disconnect()
}
module.exports.consumeMessage = consumeMessage
To do this in the consumer at the begin I initialize a variable
let body={data:'start'};
That everytime a message arrives it is overwrited. I think that there is a better approach but for the moment this workaroud works. The code till here is able to generate events, the problem arise in the interaction with the static javascript main.js. Each events will bring data that I will use to print proper stuffs on leaflet map
const url= 'http://localhost:3000/topic/bleLocalization';
var source1 = new EventSource(url);
//Attach a listener to the responses triggered by the source
source1.onmessage= function(e) {
console.log("MESSAGE")
//Snippet of code launched each time an event arrive
// MSG FORMAT [{'floor':int,'x':int,'y':int,'id':String},...]
let array = JSON.parse(e.data);
// DO STUFF WITH ARRAY
};
For sake of completeness I put also the code of the index.ejs
<html>
<head>
<meta charset="utf-8">
<!-- LEAFLET -->
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.3/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet@1.9.3/dist/leaflet.js"></script>
<!-- END LEAFLET -->
<link rel="apple-touch-icon" sizes="57x57" href="/favicon.ico/apple-icon-57x57.png">
<meta name="theme-color" content="#ffffff">
<title>Live Map</title>
<!-- <link rel ="stylesheet" href="/css/main.css" -->
</head>
<body>
<h1>Live Map</h1>
<!-- LEAFLET -->
<script src="https://unpkg.com/leaflet@1.9.3/dist/leaflet.js">
</script>
<div id="map-template" class="leaflet-container leaflet-touch leaflet-fade-anim leaflet-grab leaflet-touch-drag leaflet-touch-zoom __web-inspector-hide-shortcut__"
style = "width:1700px; height:750px;"></div>
<script src="/socket.io/socket.io.js"></script>
<script type="module" src="/js/geoFences.js"></script>
<script type="module" src="/js/main.js"></script>
<!-- END LEAFLET -->
</body>
</html>
EDIT
The idea is to have similar result tha I have with a Flask server:
WEB_HOST = '0.0.0.0'
PORT = 5000
def get_kafka_client():
return KafkaClient(hosts='BROKER')
app = Flask(__name__)
@app.route('/')
def index():
return(render_template('index.html'))
#Consumer API
@app.route('/topic/<topicname>')
def get_messages(topicname):
client = get_kafka_client()
def events():
for item in client.topics[topicname].get_simple_consumer():
yield 'data:{0}\n\n'.format(item.value.decode())
return Response(events(), mimetype="text/event-stream")