0

I am trying to move web-worker logic into a React custom hook, but I noticed that memory usage gradually increases. After some research, I found that a good practice for transferring large data between a web worker and the main thread is to use transferable objects. I tried to add transferable objects, but every time I get one of the following errors:

// postMessage(arrayBuffer , '/', [arrayBuffer]) error:
Uncaught TypeError: Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope': Overload resolution failed.
// postMessage(arrayBuffer, [arrayBuffer]) error:
Uncaught DOMException: Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope': Value at index 0 does not have a transferable type.

Any ideas where the problem is and how I can solve it? Alternative solutions or other web-worker improvements are also welcome.

Web worker's main job:

  • connect to a mqtt client
  • subscribe to topics
  • listen for changes on every topic, store all values in an object, and every 1 second send the stored topic data object to the main thread (note that the data is large)

custom hook main job:

  • create a web-worker,
  • in every onmessage event, update redux store
// react custom hook code

import React, { useEffect, useRef } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { setMqttData } from 'store-actions';

/**
 * Runs the MQTT web worker for as long as the calling component is mounted
 * and forwards every snapshot the worker posts into the redux store.
 *
 * - On mount: creates the worker and asks it to connect to the broker.
 * - On every `topics` change: sends the new topic list to the worker.
 * - On unmount: terminates the worker.
 *
 * @returns Always `null`; the hook is used only for its side effects.
 */
const useMqttService = () => {
  const dispatch = useDispatch();
  const topics = useSelector(state => state.topics);

  const workerRef = useRef<Worker>();

  useEffect(() => {
    // Keep a local handle so the cleanup below always terminates the worker
    // created by THIS effect run, whatever the ref points at by then.
    const worker = new Worker(new URL('../mqttWorker.worker.js', import.meta.url));
    workerRef.current = worker;

    // Attach the listener before sending anything so no reply can be missed.
    worker.onmessage = (event: MessageEvent): void => {
      // The payload is JSON text, delivered either as a plain string or as a
      // transferred ArrayBuffer holding its UTF-8 bytes.
      const json =
        typeof event.data === 'string'
          ? event.data
          : new TextDecoder('utf-8').decode(event.data);
      dispatch(setMqttData(JSON.parse(json)));
    };

    // The worker's message handler listens for 'CONNECT_MQTT'; the previous
    // 'CONNECT' was ignored, so the MQTT client was never created.
    worker.postMessage({ type: 'CONNECT_MQTT', host: 'ws://path ...' });

    return () => {
      worker.terminate();
      // Do not leave a terminated worker behind for the topics effect.
      workerRef.current = undefined;
    };
  }, [dispatch]);

  useEffect(() => {
    if (workerRef.current) {
      workerRef.current.postMessage({ type: 'TOPICS_CHANGED', topics });
    }
  }, [topics]);

  return null;
};
// web-worker, mqttWorker.worker.js file code
import mqtt from 'mqtt';

/**
 * State and behaviour of the MQTT web worker.
 *
 * Holds a single MQTT client, the list of subscribed topics and the latest
 * value received for each topic. While a client exists and at least one topic
 * is set, the collected data is posted to the main thread once per second as
 * a JSON string.
 */
export default class WorkerState {
  constructor() {
    this.client = null; // MQTT client, created on the first successful tryConnect()
    this.topics = []; // topics requested by the main thread
    this.data = {}; // latest value per topic, plus `mqttStatus`
    this.shareDataTimeoutId = null; // id of the pending share timer, if any
  }

  /**
   * Creates the MQTT client for `host` and wires its event handlers.
   * Does nothing when `host` is empty or a client already exists, so the
   * handlers are registered exactly once per client.
   *
   * @param host Broker URL passed to `mqtt.connect`.
   */
  tryConnect(host) {
    if (!host || this.client) {
      return;
    }

    const client = mqtt.connect(host, {});
    this.client = client;

    client.on('connect', () => {
      this.data.mqttStatus = 'connected';
      // Must go through `this`: a bare trySubscribe() is a ReferenceError.
      this.trySubscribe();
    });

    client.on('message', (topic, message) => {
      let value;
      try {
        value = JSON.parse(message.toString());
      } catch (err) {
        // One malformed payload must not throw out of the event handler.
        console.warn(`Ignoring non-JSON MQTT payload on topic "${topic}"`, err);
        return;
      }
      this.data = { ...this.data, [topic]: value };
    });
  }

  /**
   * Subscribes the client to the current topics (QoS 0) and, once the
   * subscription is acknowledged without error, starts the share loop.
   * No-op when there are no topics or no client yet.
   */
  trySubscribe() {
    if (this.topics.length === 0) {
      return;
    }

    this.client?.subscribe(this.topics, { qos: 0 }, err => {
      if (err) {
        console.error('MQTT subscribe failed', err);
        return;
      }
      this.tryShareData();
    });
  }

  /**
   * Posts the collected data to the main thread as a JSON string and
   * re-schedules itself to run again in one second. Any pending timer is
   * cleared first, so at most one share loop is ever active. The loop stops
   * by itself when there is no client or no topics.
   */
  tryShareData() {
    clearTimeout(this.shareDataTimeoutId);

    if (this.client && this.topics.length > 0) {
      postMessage(JSON.stringify(this.data));

      // Notes on sending a transferable instead of a string:
      //
      // Attempt 1 - postMessage(arrayBuffer, '/', [arrayBuffer]) fails with
      //   "Overload resolution failed": inside a worker postMessage takes
      //   (message, transfer) or (message, options); there is no
      //   targetOrigin argument as on window.postMessage.
      //
      // Attempt 2 - postMessage(arrayBuffer, [arrayBuffer]) fails with
      //   "Value at index 0 does not have a transferable type" when the
      //   value is a Node Buffer (a Uint8Array view). Only the underlying
      //   ArrayBuffer can be listed in the transfer list.

      this.shareDataTimeoutId = setTimeout(() => {
        this.tryShareData();
      }, 1000);
    }
  }

  /**
   * Handles a command object sent by the main thread.
   *
   * @param data `{ type, host?, topics? }`. `type` is 'CONNECT_MQTT' (or the
   *   shorter 'CONNECT') to connect to `host`, or 'TOPICS_CHANGED' to replace
   *   the topic list with `topics` and subscribe. Other types are ignored.
   */
  onmessage = (data) => {
    const { type, host = '', topics = [] } = data;

    // Accept both spellings of the connect command.
    if (type === 'CONNECT_MQTT' || type === 'CONNECT') {
      this.tryConnect(host);
    } else if (type === 'TOPICS_CHANGED') {
      this.topics = topics;
      this.trySubscribe();
    }
  };
}

// One state object lives for the whole lifetime of this worker.
const workerState = new WorkerState();

// Unwrap each incoming MessageEvent and hand only its payload to the state object.
self.onmessage = ({ data }) => {
  workerState.onmessage(data);
};
// transform functions

/**
 * Serialises `obj` to JSON and returns the UTF-8 bytes as an ArrayBuffer.
 *
 * An ArrayBuffer (not a Node Buffer / Uint8Array view) is returned on purpose:
 * only the ArrayBuffer itself may appear in a postMessage transfer list.
 *
 * @param obj Any JSON-serialisable value.
 * @returns ArrayBuffer containing the UTF-8 encoded JSON text.
 */
function objToBuffer(obj) {
  const jsonString = JSON.stringify(obj);
  // encode() yields a Uint8Array; `.buffer` is the ArrayBuffer behind it.
  return new TextEncoder().encode(jsonString).buffer;
}

/**
 * Decodes UTF-8 JSON bytes back into a value.
 *
 * Uses TextDecoder instead of Node's `Buffer`, which is not defined in a
 * browser unless the bundler polyfills it.
 *
 * @param buffer ArrayBuffer or typed-array view holding UTF-8 JSON text. A
 *   plain string or an array of byte values is also accepted.
 * @returns The parsed value.
 * @throws SyntaxError when the decoded text is not valid JSON.
 */
function bufferToObj(buffer) {
  if (typeof buffer === 'string') {
    return JSON.parse(buffer);
  }
  // TextDecoder only accepts ArrayBuffer / views, so wrap plain byte arrays.
  const bytes = Array.isArray(buffer) ? Uint8Array.from(buffer) : buffer;
  const jsonString = new TextDecoder('utf-8').decode(bytes);
  return JSON.parse(jsonString);
}
  • You need to use an ArrayBuffer, not a Buffer, I guess: https://stackoverflow.com/questions/6965107/converting-between-strings-and-arraybuffers – Konrad Jan 02 '23 at 17:41
  • 1
    Thanks a lot! I updated the transform functions and everything works fine; so far, no memory increase! – Manolis Thalassinos Jan 02 '23 at 18:11

1 Answer

0

I updated the transform functions:

/**
 * Turns a JSON-serialisable value into an ArrayBuffer of UTF-8 JSON text,
 * ready to be listed in a postMessage transfer list.
 */
function objToBuffer(obj) {
  const encoder = new TextEncoder();
  // encode() produces a Uint8Array; its backing ArrayBuffer is what we return.
  const { buffer } = encoder.encode(JSON.stringify(obj));
  return buffer;
}

/**
 * Inverse of objToBuffer: reads the ArrayBuffer as UTF-8 text and parses
 * that text as JSON.
 */
function bufferToObj(array_buffer) {
  // A DataView with default offset and length spans the whole buffer.
  const wholeBuffer = new DataView(array_buffer);
  const jsonText = new TextDecoder('utf-8').decode(wholeBuffer);
  return JSON.parse(jsonText);
}

In the web-worker file (inside tryShareData), send the data with:

 // Serialise the current snapshot into an ArrayBuffer.
 const arrayBuffer = objToBuffer(this.data);
 // Listing the buffer in the transfer list moves it to the receiver instead of copying it.
 postMessage(arrayBuffer, [arrayBuffer]);

Finally, in the custom hook's onmessage handler, use:

// event.data is the ArrayBuffer posted by the worker; decode it back into an object before dispatching.
dispatch(setMqttData(bufferToObj(event.data)));