0

I am using boost library to develop a asynchronous udp communication. A data received at the receiver side is being precessed by another thread. Then my problem is when I read the received data in another thread rather than the receiver thread it self it gives a modified data or updated data which is not the data that is supposed to be. My code is working on unsigned character buffer array at sender side and receiver side. The reason is I need consider unsigned character buffer as a packet of data e.g buffer[2] = Engine_start_ID

  /* global buffer to store the incomming data 
  unsigned char received_buffer[200];
  /*
     global buffer accessed by another thread
      which contains copy the received_buffer
  */
   unsigned char read_hmi_buffer[200];
   boost::mutex hmi_buffer_copy_mutex;


       void udpComm::start_async_receive() {
             udp_socket.async_receive_from(
             boost::asio::buffer(received_buffer, max_length), remote_endpoint,
             boost::bind(&udpComm::handle_receive_from, this,
             boost::asio::placeholders::error,
             boost::asio::placeholders::bytes_transferred));
      }
   /* the data received is stored in the unsigned char received_buffer data buffer*/

     void udpComm::handle_receive_from(const boost::system::error_code& error,
        size_t bytes_recvd) {
        if (!error && bytes_recvd > 0) {
            received_bytes = bytes_recvd;
                    hmi_buffer_copy_mutex.lock();
             memcpy(&read_hmi_buffer[0], &received_buffer[0], received_bytes);
                     hmi_buffer_copy_mutex.unlock();

                     /*data received here is correct  'cus i printed in the console
                      checked it
                     */
                    cout<<(int)read_hmi_buffer[2]<<endl;
       }

             start_async_receive();

       }
       /*  io_service  is running in a thread 
         */
       void udpComm::run_io_service() {
       udp_io_service.run();
        usleep(1000000);
         }

The above code is the asynchronous udp communication running a thread

/* My second thread function is */

        void  thread_write_to_datalink()
         {    hmi_buffer_copy_mutex.lock();
           /* here is my problem begins*/
              cout<<(int)read_hmi_buffer[2]<<endl;
             hmi_buffer_copy_mutex.unlock();

           /* all data are already changed */

              serial.write_to_serial(read_hmi_buffer, 6);

         }
         /* threads from my main function
          are as below */


           int main() {
                 receive_from_hmi.start_async_receive();
              boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
                &receive_from_hmi);
              boost::thread thread_serial(&thread_write_to_datalink);   
              thread_serial.join();
              thread_receive_from_hmi.join();
                 return 0;
        } 

/* The Serial_manager class contains functions for writting and reading from serial port*/

  #include <iostream>
            #include <boost/thread.hpp>
            #include <boost/asio.hpp>
            #include <boost/date_time/posix_time/posix_time.hpp>


            using namespace boost::asio;

            class Serial_manager {
            public:

                Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name);
                void open_serial_port();
                void write_to_serial(void *data, int size);
                size_t read_from_serial(void *data, int size);
                void handle_serial_exception(std::exception &ex);
                virtual ~Serial_manager();
                void setDeviceName(char* deviceName);

            protected:
                io_service &port_io_service;
                serial_port datalink_serial_port;
                bool serial_port_open;
                char *device_name;

            };

            void Serial_manager::setDeviceName(char* deviceName) {
                device_name = deviceName;
            }
            Serial_manager::Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name):
                port_io_service(serial_io_service),
                datalink_serial_port(serial_io_service) {
                device_name = dev_name;
                serial_port_open = false;
                open_serial_port();
            }

            void Serial_manager::open_serial_port() {
                bool temp_port_status = false;
                bool serial_port_msg_printed = false;
                do {
                    try {
                        datalink_serial_port.open(device_name);
                        temp_port_status = true;
                    } catch (std::exception &ex) {
                        if (!serial_port_msg_printed) {
                            std::cout << "Exception-check the serial port device "
                                    << ex.what() << std::endl;
                            serial_port_msg_printed = true;
                        }
                        datalink_serial_port.close();
                        temp_port_status = false;
                    }

                } while (!temp_port_status);
                serial_port_open = temp_port_status;
                std::cout <<std::endl <<"serial port device opened successfully"<<std::endl;
                datalink_serial_port.set_option(serial_port_base::baud_rate(115200));
                datalink_serial_port.set_option(
                        serial_port_base::flow_control(
                                serial_port_base::flow_control::none));
                datalink_serial_port.set_option(
                        serial_port_base::parity(serial_port_base::parity::none));
                datalink_serial_port.set_option(
                        serial_port_base::stop_bits(serial_port_base::stop_bits::one));
                datalink_serial_port.set_option(serial_port_base::character_size(8));

            }

            void Serial_manager::write_to_serial(void *data, int size) {
                boost::asio::write(datalink_serial_port, boost::asio::buffer(data, size));
            }

            size_t Serial_manager::read_from_serial(void *data, int size) {
                return boost::asio::read(datalink_serial_port, boost::asio::buffer(data, size));
            }
            void Serial_manager::handle_serial_exception(std::exception& ex) {
                std::cout << "Exception-- " << ex.what() << std::endl;
                std::cout << "Cannot access data-link, check the serial connection"
                        << std::endl;
                datalink_serial_port.close();
                open_serial_port();
            }

            Serial_manager::~Serial_manager() {
                // TODO Auto-generated destructor stub
            }

I think my area of problem is about thread synchronization and notification and I will be happy if you help me. You should not worry about the sender it is works perfectly as I already checked it the data is received at the receiver thread. I hope you understand my question.

Edit: Here is the modification.My whole idea here is to develop a simulation for the Manual flight control so according my design i have client application that sends commands through udp communication. At the receiver side intended to use 3 threads. one thread receives input from sticks i.e void start_hotas() the second thread is a thread that receives commands from sender(client): void udpComm::run_io_service() and 3rd is the void thread_write_to_datalink().

              /* a thread that listens for input from sticks*/
            void start_hotas() {
        Hotas_manager hotasobj;
        __s16 event_value; /* value */
        __u8 event_number; /* axis/button number */
        while (1) {
            hotasobj.readData_from_hotas();


            event_number = hotasobj.getJoystickEvent().number;
            event_value = hotasobj.getJoystickEvent().value;
            if (hotasobj.isAxisPressed()) {
                if (event_number == 0) {
                    aileron = (float) event_value / 32767;

                } else if (event_number == 1) {
                    elevator = -(float) event_value / 32767;

                } else if (event_number == 2) {
                    rudder = (float) event_value / 32767;

                } else if (event_number == 3) {
                    brake_left = (float) (32767 - event_value) /    65534;


                } else if (event_number == 4) {


                } else if (event_number == 6) {


                } else if (event_number == 10) {


                } else if (event_number == 11) {

                } else if (event_number == 12) {



                }
            } else if (hotasobj.isButtonPressed()) {



            }
            usleep(1000);

        }

    }


            /*
     * Hotas.h
     *
     *  Created on: Jan 31, 2013
     *      Author: metec
     */

    #define JOY_DEV  "/dev/input/js0"
    #include <iostream>
    #include <boost/thread.hpp>
    #include <boost/asio.hpp>
    #include <boost/date_time/posix_time/posix_time.hpp>
    #include <linux/joystick.h>

    bool message_printed = false;
    bool message2_printed = false;

    class Hotas_manager {
    public:
        Hotas_manager();
        virtual ~Hotas_manager();
        void open_hotas_device();

        /*
         *
         * read from hotas input
         * used to the updated event data and status of the joystick from the
         * the file.
         *
         */
        void readData_from_hotas();

        js_event getJoystickEvent() {
            return joystick_event;
        }

        int getNumOfAxis() {
            return num_of_axis;
        }

        int getNumOfButtons() {
            return num_of_buttons;
        }

        bool isAxisPressed() {
            return axis_pressed;
        }

        bool isButtonPressed() {
            return button_pressed;
        }

        int* getAxis() {
            return axis;
        }

        char* getButton() {
            return button;
        }

    private:
        int fd;
        js_event joystick_event;
        bool hotas_connected;
        int num_of_axis;
        int num_of_buttons;
        int version;
        char devName[80];

        /*
         * the the variables below indicates
         * the state of the joystick.
         */
        int axis[30];
        char button[30];
        bool button_pressed;
        bool axis_pressed;
    };

    Hotas_manager::Hotas_manager() {
        // TODO Auto-generated constructor stub
        hotas_connected = false;
        open_hotas_device();
        std::cout << "joystick device detected" << std::endl;

    }

    Hotas_manager::~Hotas_manager() {
        // TODO Auto-generated destructor stub

    }

    void Hotas_manager::open_hotas_device() {
        bool file_open_error_printed = false;
        while (!hotas_connected) {
            if ((fd = open(JOY_DEV, O_RDONLY)) > 0) {
                ioctl(fd, JSIOCGAXES, num_of_axis);
                ioctl(fd, JSIOCGBUTTONS, num_of_buttons);
                ioctl(fd, JSIOCGVERSION, version);
                ioctl(fd, JSIOCGNAME(80), devName);
                /*
                 * NON BLOCKING MODE
                 */
                ioctl(fd, F_SETFL, O_NONBLOCK);
                hotas_connected = true;
            } else {
                if (!file_open_error_printed) {
                    std::cout << "hotas device not detected. check "
                            "whether it is "
                            "plugged" << std::endl;
                    file_open_error_printed = true;
                }
                close(fd);
                hotas_connected = false;
            }
        }

    }

    void Hotas_manager::readData_from_hotas() {
        int result;
        result = read(fd, &joystick_event, sizeof(struct js_event));
        if (result > 0) {
            switch (joystick_event.type & ~JS_EVENT_INIT) {
            case JS_EVENT_AXIS:
                axis[joystick_event.number] = joystick_event.value;
                axis_pressed = true;
                button_pressed = false;
                break;
            case JS_EVENT_BUTTON:
                button[joystick_event.number] = joystick_event.value;
                button_pressed = true;
                axis_pressed = false;
                break;
            }
            message2_printed = false;
            message_printed = false;

        } else {
            if (!message_printed) {
                std::cout << "problem in reading the stick file" << std::endl;
                message_printed = true;
            }
            hotas_connected = false;
            open_hotas_device();
            if (!message2_printed) {
                std::cout << "stick re-connected" << std::endl;
                message2_printed = true;
            }

        }

    }



I updated the main function to run 3 threads .




    int main() {


                boost::asio::io_service receive_from_hmi_io;
                udpComm receive_from_hmi(receive_from_hmi_io, 6012);
                receive_from_hmi.setRemoteEndpoint("127.0.0.1", 6011);
                receive_from_hmi.start_async_receive();
                boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
                        &receive_from_hmi);
                boost::thread thread_serial(&thread_write_to_datalink);
                boost::thread thread_hotas(&start_hotas);
                thread_hotas.join();
                thread_serial.join();
                thread_receive_from_hmi.join();
                return 0;
                 }

The void thread_write_to_datalink() also writes the data come from the hotas_manager(joysticks).

            void thread_write_to_datalink() {

                /*
                 * boost serial  communication
                 */
                boost::asio::io_service serial_port_io;
                Serial_manager serial(serial_port_io, (char*) "/dev/ttyUSB0");   


                cout << "aileron  " << "throttle   " << "elevator   " << endl;
                while (1) { 

                        // commands from udp communication      

                      serial.write_to_serial(read_hmi_buffer, 6);

                    // data come from joystick inputs

                    //cout << aileron<<"    "<<throttle<<"    "<<elevator<< endl;
                    memcpy(&buffer_manual_flightcontrol[4], &aileron, 4);
                    memcpy(&buffer_manual_flightcontrol[8], &throttle, 4);
                    memcpy(&buffer_manual_flightcontrol[12], &elevator, 4);

                    unsigned char temp;


                    try {
                        serial.write_to_serial(buffer_manual_flightcontrol, 32);
                        //serial.write_to_serial(buffer_manual_flightcontrol, 32);
                    } catch (std::exception& exp) {
                        serial.handle_serial_exception(exp);
                    }

                    try {
                        serial.write_to_serial(buffer_payloadcontrol, 20);
                    } catch (std::exception& exp) {
                        serial.handle_serial_exception(exp);
                    }

                    usleep(100000);

                }

            }

My question is how better can I design to synchronize these 3 threads. If your answer says you do not need to use 3 threads I need you to tell me how.

BenMorel
  • 34,448
  • 50
  • 182
  • 322
Clickmit Wg
  • 523
  • 2
  • 9
  • 25
  • Please use consistent indentation in your code, it currently is difficult to understand. – Sam Miller Mar 10 '13 at 03:41
  • also post the code for `serial.write_to_serial()`, it's not obvious to me why you need multiple threads at all. – Sam Miller Mar 10 '13 at 03:41
  • i made some modifications still the data is not synchronized.could u check it out again. – Clickmit Wg Mar 11 '13 at 06:59
  • In the updated code seems you are opening a `/dev/input/js0` file descriptor then performing a blocking read in `Hotas_manager::readData_from_hotas()`. I suggest using a `posix::stream_descriptor` to handle reading from this device. As I've said previously, this will decouple [threading from concurrency](http://www.boost.org/doc/libs/release/doc/html/boost_asio/overview/core/async.html) and allow you to use a single thread. This way you can concentrate on the correctness of your program first rather than worry about synchronization. – Sam Miller Mar 14 '13 at 14:17

2 Answers2

0

You need to make your access to read_hmi_buffer synchronized.

Therefore you need a mutex (std::mutex, pthread_mutex_t, or the windows equivalent), to lock onto whenever a piece of code read or write in that buffer.

See this question for a few explanations on the concept and links to other tutorials.

Community
  • 1
  • 1
didierc
  • 14,572
  • 3
  • 32
  • 52
  • I use the mutex but it does not solve it. may be i could use seprate – Clickmit Wg Mar 11 '13 at 07:01
  • you did not protect that line: `serial.write_to_serial(read_hmi_buffer, 6);`. – didierc Mar 11 '13 at 07:12
  • also, please don't include the fix proposed by answers in your question updates, it may confuse new people. – didierc Mar 11 '13 at 07:29
  • Thanks for your comment. I think ur solution may not work.the reading in side the mutex is still wrong then what is the difference if i protect the **serial.write_to_serial(read_hmi_buffer, 6);** line if reading is still wrong. I use different buffers and flags below the line then it works fine but it may not be better solution **memcpy(&read_hmi_buffer[0], &received_buffer[0], received_bytes);** `if (read_hmi_buffer[2] == MSG_ID_RQS_START_ENGINE) { engine_started = true; // separate buffer handling for engine start }` – Clickmit Wg Mar 11 '13 at 12:28
0

Let's back up a little bit from multi-threading, your program mixes synchronous and asynchronous operations. You don't need to do this, as it will only cause confusion. You can asynchronously write the buffer read from the UDP socket to the serial port. This can all be achieved with a single thread running the io_service, eliminating any concurrency concerns.

You will need to add buffer management to keep the data read from the socket in scope for the lifetime of the async_write for the serial port, study the async UDP server as an example. Also study the documentation, specifically the requirements for buffer lifetime in async_write

buffers

One or more buffers containing the data to be written. Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

Once you have completed that design, then you can move to more advanced techniques such as a thread pool or multiple io_services.

Community
  • 1
  • 1
Sam Miller
  • 23,808
  • 4
  • 67
  • 87
  • Thanks! your answer is helpful. The reason why i use multi-threading is the **void thread_write_to_datalink()** also writes to serial port the data received from **joystick inputs**. rather than creating two instances of the class **Serial_manager** i want it to be one instance and with this you can write to serial port data from udpComm and joystick class. – Clickmit Wg Mar 13 '13 at 12:33
  • @Clickmit your question as currently worded makes no mention of a joystick or a joystick class. Please edit your question to include representative code such that we can accurately provide answers. – Sam Miller Mar 14 '13 at 01:52