We are integrating Eclipse Ditto into a digital twin platform, but we have encountered a problem while testing and we don't really know how to fix it.
For context, the goal is for 593 twins (Ditto Things) to receive the results of a simulation. The idea is to be able to perform several simulation runs simultaneously, with each simulation run sending 593 messages to a Kafka topic. For example, 6 runs produce 3558 messages in the topic.
In Eclipse Ditto we have created 593 things, one for each element returned by the simulation. We have set up a source connection to Kafka in Eclipse Ditto, which applies a JavaScript mapping to each message. This mapping produces a merge command that updates the features of the twin corresponding to the message, together with a feature that indicates which simulation run the new data belongs to.
In addition, we have a target connection to MQTT. Over this connection we send all the events of every Ditto thing in order to store them in a database. No mapping is applied on this connection; the events are sent directly in Ditto Protocol.
The problem is the following. Initially we were using Eclipse Ditto version 2.3 and, although all messages were received over MQTT, it was incredibly slow: processing 6 runs took approximately 20 minutes. We know that this is not a problem with the simulation, because all messages are available in the Kafka topic within just a few seconds. We decided to upgrade to the latest version (currently 3.2), which seems to be much faster: according to the metrics and logs, this version consumes and maps all the messages in the Kafka topic, so it seems that the things are updated. The problem is that not all update events are sent over the target connection. For example, fewer than 1000 out of 3558 are sent over MQTT. According to the logs, the messages are being dropped as a result of a backpressure strategy. Following this page, we have changed some environment variables, but we still get the same problem. They are listed below, in the Helm values.yaml that we use to install Eclipse Ditto. When a single run is sent it works correctly; it starts to fail when several runs are sent simultaneously. We have also tried using Kafka for the target connection, but the same thing happens. Likewise, we have installed it on two different computers with enough resources to run it easily, but the result has been the same on both.
Here are the connections we have established, their logs and metrics, along with the Helm values.yaml.
Source connection:
{
"name": "connection-for-simulation",
"connectionType": "kafka",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://KAFKAIP",
"specificConfig": {
"bootstrapServers": "KAFKAIP",
"saslMechanism": "plain"
},
"sources": [
{
"addresses": [
"riego"
],
"consumerCount": 1,
"qos": 1,
"authorizationContext": [
"nginx:ditto"
],
"headerMapping": {
"id": "{{ entity:id }}",
"namespace": "{{ entity:namespace }}",
"connection": "{{ connection:id }}"
},
"payloadMapping": [
"javascript"
]
}
],
"mappingDefinitions": {
"javascript": {
"mappingEngine": "JavaScript",
"options": {
"incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) { const jsonData = JSON.parse(textPayload); const degree = jsonData[0]; const data = jsonData[1]; var id = 'irrigation_simulation:' + data.Id; headers = Object.assign(headers, { 'Content-Type': 'application/merge-patch+json' }); var features = { idSimulationRun: { properties: { value: 'degree_' + degree } } }; Object.keys(data).forEach((key) => { if (key !== 'Id') { obj = {}; obj[key] = { properties: { value: data[key] } }; Object.assign(features, obj); } }); const thing = { attributes: { _parents: 'pivot:irrigation_simulation' }, features: features }; return Ditto.buildDittoProtocolMsg( 'pivot', id, 'things', 'twin', 'commands', 'merge', '/', headers, thing ); }"
}
}
}
}
Target connection:
{
"name": "mqtt-connection",
"connectionType": "mqtt-5",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://MQTTIP",
"sources": [],
"targets": [
{
"address": "opentwins/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}",
"qos": 1,
"topics": [
"_/_/things/twin/events?extraFields=thingId,attributes/_parents,features/idSimulationRun/properties/value",
"_/_/things/live/messages",
"_/_/things/live/commands"
],
"authorizationContext": [
"nginx:ditto"
]
}
]
}
Logs source connection: https://pastebin.com/zMKetKtL
Metrics source connection:
{
"type": "connectivity.responses:retrieveConnectionMetrics",
"status": 200,
"connectionId": "59ef4006-9b18-40f4-8d45-8117920e5d11",
"connectionMetrics": {
"inbound": {
"consumed": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.125Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"enforced": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.508Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"throttled": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 2,
"PT24H": 2,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
}
}
},
"outbound": {
"dispatched": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.507Z"
},
"failure": {
"PT1M": 0,
"PT1H": 904,
"PT24H": 904,
"lastMessageAt": "2023-04-17T12:54:03.509Z"
}
},
"filtered": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 2654,
"PT24H": 2654,
"lastMessageAt": "2023-04-17T12:54:03.883Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 2654,
"PT24H": 2654,
"lastMessageAt": "2023-04-17T12:54:03.883Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"published": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
}
}
},
"containsFailures": true,
"sourceMetrics": {
"addressMetrics": {
"riego": {
"consumed": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.125Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"enforced": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.508Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"throttled": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 2,
"PT24H": 2,
"lastMessageAt": "2023-04-17T12:54:03.461Z"
}
}
}
}
},
"targetMetrics": {
"addressMetrics": {
"_responses": {
"dispatched": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.507Z"
},
"failure": {
"PT1M": 0,
"PT1H": 904,
"PT24H": 904,
"lastMessageAt": "2023-04-17T12:54:03.509Z"
}
},
"filtered": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 2654,
"PT24H": 2654,
"lastMessageAt": "2023-04-17T12:54:03.883Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 2654,
"PT24H": 2654,
"lastMessageAt": "2023-04-17T12:54:03.883Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"published": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
}
}
}
}
}
Logs target connection: https://pastebin.com/VSApKH0g
Metrics target connection:
{
"type": "connectivity.responses:retrieveConnectionMetrics",
"status": 200,
"connectionId": "bf7da601-b483-4069-b721-d2e9a456388e",
"connectionMetrics": {
"inbound": null,
"outbound": {
"dispatched": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.480Z"
},
"failure": {
"PT1M": 0,
"PT1H": 2948,
"PT24H": 2948,
"lastMessageAt": "2023-04-17T12:54:03.484Z"
}
},
"filtered": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.480Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 610,
"PT24H": 610,
"lastMessageAt": "2023-04-17T12:54:05.199Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"published": {
"success": {
"PT1M": 0,
"PT1H": 610,
"PT24H": 610,
"lastMessageAt": "2023-04-17T12:54:05.250Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
}
}
},
"containsFailures": true,
"sourceMetrics": {
"addressMetrics": null
},
"targetMetrics": {
"addressMetrics": {
"opentwins2/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}": {
"dispatched": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.480Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"filtered": {
"success": {
"PT1M": 0,
"PT1H": 3558,
"PT24H": 3558,
"lastMessageAt": "2023-04-17T12:54:03.480Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 610,
"PT24H": 610,
"lastMessageAt": "2023-04-17T12:54:05.199Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"published": {
"success": {
"PT1M": 0,
"PT1H": 610,
"PT24H": 610,
"lastMessageAt": "2023-04-17T12:54:05.250Z"
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
}
},
"_responses": {
"dispatched": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 2948,
"PT24H": 2948,
"lastMessageAt": "2023-04-17T12:54:03.484Z"
}
},
"filtered": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"mapped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"dropped": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"published": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
},
"acknowledged": {
"success": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
},
"failure": {
"PT1M": 0,
"PT1H": 0,
"PT24H": 0,
"lastMessageAt": null
}
}
}
}
}
}
values.yaml
connectivity:
extraEnv:
- name: MQTT_CONSUMER_THROTTLING_ENABLED
value: "false"
- name: MQTT_CONSUMER_THROTTLING_LIMIT
value: "100000"
- name: KAFKA_CONSUMER_THROTTLING_ENABLED
value: "false"
- name: KAFKA_CONSUMER_THROTTLING_LIMIT
value: "100000"
- name: CONNECTIVITY_MQTT_MAX_QUEUE_SIZE
value: "100000"
- name: CONNECTIVITY_KAFKA_MAX_QUEUE_SIZE
value: "100000"
resources:
requests:
cpu: 200m
limits:
memory: "2Gi"
gateway:
resources:
requests:
cpu: 200m
limits:
memory: "768Mi"
nginx:
service:
type: NodePort
nodePort: 30525
resources:
requests:
cpu: 50m
limits:
cpu: 150m
memory: "32Mi"
policies:
resources:
requests:
cpu: 200m
limits:
memory: "768Mi"
swaggerui:
enabled: false
things:
resources:
requests:
cpu: 200m
limits:
memory: "768Mi"
thingsSearch:
resources:
requests:
cpu: 200m
limits:
memory: "768Mi"
mongodb:
enabled: false
Thanks in advance!