0

I have created an Arduino application that runs on an Arduino MKR 1010. After connecting to the WiFi, the device initially subscribes to an MQTT topic designated for itself, and through this topic it is informed of the other topics it should subscribe to. The JSON payloads of the messages on those topics are approximately 2 KB in size; the last one I tested was 1.91 KB. The information is processed and represented in the form of coloured lights. As new topics come online or go offline, the device is notified and adjusts its subscriptions accordingly.

Expected Behaviour

The application should operate correctly - subscribing to and unsubscribing from topics as it is notified - and process the payloads of those topics by driving the coloured LEDs. The JSON payloads should be cleared after processing, releasing memory for any subsequent workload.

Actual Behaviour

After working for a period of time (this can vary), the application either stops processing a payload partway through, or stops working altogether and requires a hard reset before the device can be communicated with again.

Below is the code as it stands - I hope this helps. I've commented the code to make it clearer where and why it performs a given function. I have tried changing the DynamicJsonDocument to various sizes, and the current size seems optimal for this size of payload, though I would gladly be advised otherwise.

#include <SPI.h>
#include <WiFiNINA.h>
#include <ArduinoMqttClient.h>
#include <Adafruit_NeoPixel.h>
#include <ArduinoJson.h>
#include <MemoryFree.h>
#include <pgmStrToRAM.h>

// Networking Values
WiFiClient wlan;               // network client that the MQTT client below is built on
int status = WL_IDLE_STATUS;   // starts out idle; not referenced by the code shown here
char ssid[] = "";        //  network SSID (name)
char pass[] = "";    // network password (use for WPA, or use as key for WEP)
int keyIndex = 0;  // not referenced by the code shown here

// Subscription list for MQTT Subscriptions
MqttClient    mqttClient(wlan);
// Fixed table of 10 topic UUIDs currently subscribed to; an empty string marks a free slot.
String subscriptions[10] ;

// Lighting Values
int dataPin = 5; // pin number passed to the NeoPixel driver below
#define NUMPIXELS          30
Adafruit_NeoPixel pixel = Adafruit_NeoPixel(NUMPIXELS, dataPin, NEO_GRB + NEO_KHZ800);
// Pixel indices 0..29; setup() shuffles this table so lights are addressed in a non-sequential order.
int lights[] = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29};

// RGB colour of a single LED, one byte per channel.
struct pixel_config_t {
  uint8_t red;    // red channel, 0-255
  uint8_t green;  // green channel, 0-255
  uint8_t blue;   // blue channel, 0-255
};

// Last colour written to each LED, indexed by pixel number; changePixelColor()
// fades from these values. As a global it starts zero-initialised (all channels 0).
pixel_config_t currentLights[NUMPIXELS];


// Setup - starts the serial port and the LED strip, shuffles the light index
// table so LEDs are addressed in a non-sequential order, fades every LED up to
// white and registers the MQTT message callback.
void setup() {
  Serial.begin(57600);

  pixel.begin();

  const size_t n = sizeof(lights) / sizeof(lights[0]);

  // Fisher-Yates shuffle: position i is swapped with a slot drawn from [i, n).
  // Drawing the partner from [0, n - i) (as this loop used to) still produces a
  // permutation, but a biased one, because entries that were already placed can
  // be moved again.
  // NOTE(review): random() is not seeded anywhere in the code shown, so the
  // resulting order is presumably the same on every boot - confirm if that matters.
  for (size_t i = 0; i < n - 1; i++) {
    const size_t j = i + random(0, n - i);
    const int t = lights[i];
    lights[i] = lights[j];
    lights[j] = t;
  }

  // Initialise lights with white light
  for (int counter = 0; counter < NUMPIXELS; counter++) {
    changePixelColor(lights[counter], 255, 255, 255);
  }

  mqttClient.onMessage(onMqttMessage);
}

// Loop - one pass re-establishes the WiFi link and the MQTT session if either
// has dropped, services the MQTT client once, then waits 3 seconds.
// NOTE(review): poll() only runs once per pass, so anything that depends on it
// waits for the 3 second delay - confirm this fits the broker's timing expectations.
void loop() {
  const bool wifiUp = (WiFi.status() == WL_CONNECTED);
  if (!wifiUp) {
    connectWiFi();
  }

  const bool brokerUp = mqttClient.connected();
  if (!brokerUp) {
    // MQTT client is disconnected, connect
    connectMQTT();
  }

  mqttClient.poll();

  delay(3000);
}

// On receipt of MQTT message - process either a notification of a new Topic to subscribe to - or process a publication to a subscribed Topic
void onMqttMessage(int messageSize) {
  Serial.println(F("Received Mqtt Message"));
  
  DynamicJsonDocument doc(12000);
  deserializeJson(doc, mqttClient);

  String type = doc["type"];

  // if this is a message notifying of a new Topics that has come online - unsubscribe to those in our list that are no longer on this payload - and subscribe to the new ones
  if(type == "subscription_notification") {
    JsonArray query_uuids = doc["query_uuids"].as<JsonArray>();

    for(JsonVariant currentUuid: query_uuids) {
      String currentUuidAsStr = currentUuid.as<String>();
      if(!isInStringArray(currentUuidAsStr, subscriptions)) {
        subscribeToNewTopic(currentUuidAsStr);
      }
    }
    for(String existingSubscription: subscriptions) {
      if(existingSubscription && existingSubscription != "" && !isInJsonVariantArray(existingSubscription, query_uuids))  {
        unsubscribeToTopic(existingSubscription);
      }
    }
  }
  // Else this is a notification of new colourings of lights - iterate through the list of lights changing the LED to the corresponding number in the payload
  else {
    JsonArray spreadArray = doc["spreadProportions"].as<JsonArray>();
    int lightNumber = 0;
    
    for(JsonObject currentProportion : spreadArray) {
      uint8_t red = currentProportion["red"].as<int>();
      uint8_t green = currentProportion["green"].as<int>();
      uint8_t blue = currentProportion["blue"].as<int>();
      changePixelColor(lights[lightNumber], red, green, blue);
      lightNumber++;
    }

    Serial.println(F("Completed Update."));
  }

  // Clear JSON Document
  doc.clear();
}

// Check if the UUID is in our current list of topics.
//   uuidToQuery  - UUID to look for
//   arrayToQuery - subscription table to search
// Returns true when any slot of the table equals uuidToQuery.
bool isInStringArray(String uuidToQuery, String arrayToQuery[]) {
  // An array parameter decays to a pointer, so sizeof(arrayToQuery) is the size
  // of a pointer, not the number of slots, and the search used to stop after the
  // first few entries. The table this sketch searches is `subscriptions`, so its
  // slot count is used as the bound.
  const size_t slotCount = sizeof(subscriptions) / sizeof(subscriptions[0]);
  for (size_t slot = 0; slot < slotCount; slot++) {
    if (arrayToQuery[slot] == uuidToQuery) {
      return true;
    }
  }
  return false;
}

// Report whether uuidToQuery appears among the elements of the incoming JSON
// array, comparing each element's String form against it.
bool isInJsonVariantArray(String uuidToQuery, JsonArray arrayToQuery) {
  bool found = false;
  for (JsonVariant candidate : arrayToQuery) {
    if (candidate.as<String>() == uuidToQuery) {
      found = true;
      break;
    }
  }
  return found;
}

// Subscribe to "query/<topicname>" and record topicname in the first free slot
// of the subscription list.
// Nothing happens when topicname is empty or "null", when the list has no free
// slot, or when the broker subscription fails (so it can be retried later).
void subscribeToNewTopic(String topicname) {
  if ((!topicname) || topicname == "" || topicname == "null") {
    return;
  }

  // Find the free slot BEFORE subscribing. With a full list the old code ran off
  // the end and wrote to subscriptions[10], corrupting whatever follows the array.
  const int slotCount = sizeof(subscriptions) / sizeof(subscriptions[0]);
  int freeSlot = -1;
  for (int slot = 0; slot < slotCount; slot++) {
    if (subscriptions[slot] == "") {
      freeSlot = slot;
      break;
    }
  }
  if (freeSlot < 0) {
    Serial.println(F("Subscription list full - ignoring new topic"));
    return;
  }

  Serial.println(F("Subscribing to new topic"));
  if (!mqttClient.subscribe("query/" + topicname, 2)) {
    // Not recorded, so the next subscription notification will try again.
    Serial.println(F("Subscribe failed"));
    return;
  }
  subscriptions[freeSlot] = topicname;
}

// Unsubscribe from "query/<topicname>" and free the first slot of the
// subscription list that holds topicname (if any).
void unsubscribeToTopic(String topicname) {
  Serial.println(F("Unsubscribing to old topic"));
  mqttClient.unsubscribe("query/" + topicname);

  // sizeof(subscriptions) alone is the array's size in BYTES; divide by the
  // element size to get the slot count, otherwise the search reads far past the
  // end of the array whenever topicname is not present.
  const size_t slotCount = sizeof(subscriptions) / sizeof(subscriptions[0]);
  for (size_t slot = 0; slot < slotCount; slot++) {
    if (subscriptions[slot] == topicname) {
      subscriptions[slot] = "";
      break;
    }
  }
}

// Change the pixel color gradually from the current color
void changePixelColor(int pixelNum, uint8_t r, uint8_t g, uint8_t b ) {
  //int pixelNum = *(int*) pvParameters;

  uint8_t startR = currentLights[pixelNum].red;
  uint8_t startG = currentLights[pixelNum].green;
  uint8_t startB = currentLights[pixelNum].blue;

  while ((startR != r) || (startG != g) || (startB != b)){  // while the curr color is not yet the target color
    if (startR < r) startR++; else if (startR > r) startR--;  // increment or decrement the old color values
    if (startG < g) startG++; else if (startG > g) startG--;
    if (startB < b) startB++; else if (startB > b) startB--;
    uint32_t color = pixel.gamma32(pixel.Color(startR, startG, startB));
    pixel.setPixelColor(pixelNum, color);  
    pixel.show();
    currentLights[pixelNum].red = startR;
    currentLights[pixelNum].green = startG;
    currentLights[pixelNum].blue = startB;

    delay(3);  // add a delay if its too fast
  }
}

// Connect to the WiFi - keeps calling WiFi.begin() until it reports a
// connection, pausing 5 seconds after every failed attempt.
void connectWiFi() {
  Serial.print("Attempting to connect to WiFi");
  for (;;) {
    const bool joined = (WiFi.begin("", "") == WL_CONNECTED);
    if (joined) {
      break;
    }
    // failed, retry
    delay(5000);
  }
  Serial.println("Connected.");
}

// Connect to the MQTT Server - subscribing to the base Topic associated with this device
void connectMQTT() {
  mqttClient.setId("divnr-29715938");
  mqttClient.setUsernamePassword("", "");
  mqttClient.setKeepAliveInterval(5000);

  while (!mqttClient.connect("", 1883)) {
    // failed, retry
    Serial.print(".");
    delay(2000);
  }
  
  Serial.println(F("Reconnected"));
  // subscribe to a topic
  mqttClient.subscribe("device/7e887232-9cf1-453c-b352-647e9ccce11a", 2);
}
Adam Bell
  • 23
  • 4
  • How fast are the MQTT messages coming in? the 'onMqttMessage()' function is interrupt driven, and each new message is going to take up memory and stack space....if that runs out or goes over, you will be locking up. It might be interesting to see a count of subs & unsubs too....maybe a certain number is reached and it locks up? – JD Allen Apr 05 '21 at 15:19
  • I use the older PubSub library, which also locks up from time to time with no discernible trigger, but I also used a watchdog timer on my Adafruit Feather boards which gets around the issue.....but then again, I only use them as sensors, not displays....I just use RPis and small HDMI screens for that purpose. – JD Allen Apr 05 '21 at 15:22
  • Messages are coming in infrequently - around one new message per hour telling the device to subscribe/unsubscribe, and a new input from one of those topics arriving a few times an hour. I would use RPis, except I think they would be overkill for the simple function this has to fulfil. With such a small number of requests, it appears to be falling down either a) in the middle of processing a large request - which is unlikely given the size - or b) when two requests are attempted at the same time, and this moment needing some level of asynchronicity causes it to fall over. – Adam Bell Apr 05 '21 at 16:17

0 Answers0