
I have a problem I can't seem to solve myself. Process1 calculates data in a while-loop and has to run as fast as possible. I need the data calculated in Process1 for later analysis, and writing it to a file from Process1 is too slow.

I have never worked with IPC, but thought it would be a good way to keep the data from Process1 in memory and access it from Process2, a separate program that is not time-critical and writes the data to file.

I have written a small test program (to learn about IPC) that works like this:

  1. Process1 runs even if Process2 is not available; it then skips IPC and just executes.
  2. Process2, when started, waits for Process1, fetches data once Process1 starts up, and later writes it to disk.
  3. Process2 only fetches a fixed amount of data (maxRunTime), 10 samples in the code below.

The programs I have written are terribly slow: sending messages over IPC makes Process1 6 times slower. Currently I only pass three floats at each "TimeStep", but this could be 100, and RunTime could be 10,000.

To do: I would be pleased if someone could guide me in the right direction. The code below works, but that might be luck, as it is not pretty.

I need a solution that is as fast as possible, but it doesn't have to be real-time. As I'm not a professional programmer, I also need to limit the complexity, since I need to understand what I'm doing.

Hope someone can help.

Code:

  1. Using Boost 1.59 and MSVC 11.0 (x86)
  2. Two separate programs - ConsoleApps

Process1:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 


bool InitCreateMsgQ()
{
    bool initOK = false;
    //Create a msgQ for parsing data
    try
    {
        message_queue::remove("msgQData");
        //Create a message_queue.
        message_queue mqData
        (open_or_create     //create q 
        ,"msgQData"         //name
        ,1000000                //max message number
        ,sizeof(float)      //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
//Create State
    try
    {
        message_queue::remove("msgState");
        //Create a message_queue.
        message_queue mqState
        (open_or_create     //create q 
        ,"msgState"     //name
        ,1                  //max message number
        ,sizeof(int)        //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
    return initOK;
}
bool SetState(int state)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only open q
        ,"msgState"  //name
        );

        timeout = !mqState.timed_send(&state, sizeof(int), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgState");
        timeout = true;
    }
    return timeout;
}
bool SetData(float data)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only open q
        ,"msgQData"  //name
        );

        timeout = !mqData.timed_send(&data, sizeof(float), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
        //mqData.send(&data, sizeof(float), 0);
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgQData");
        timeout = true;
    }
    return timeout;
}

int main ()
{
    time_t start,end;

    int runTime = 0; //just for testing
    int dummyState = 2;
    float x;
    int state = 0;
    if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
    if (SetState(state)){state = 0;}// If timeout to set state go to state 0
    //Do twice to get error if observer is not started
    if (SetState(dummyState)){state = 0;}// Set dummy state for observer
                                         // If timeout to set state go to state 0

    time (&start);
    //Runtime!
    while(runTime<1000)
    {
        switch (state) 
        {
            case 0:
                state = 0;//force next state 0 - should not be needed
                //Do nothing and break loop if monitor tool is not ready                
                break;
            case 1:
                state = 1;
                cout << "Try SEND DATA" << endl;
                for (int i = 0; i < 3; i++)
                {
                    x = rand() % 100;
                    if (SetData(x)){state = 0;}
                }               
                break;
            default:
                break;
        }
        runTime++;
        cout << "runTime: " << runTime <<" state: " << state << endl;
    }

    message_queue::remove("msgQData");
    message_queue::remove("msgState");
    cout << "done - state: " << state << endl;

    time (&end);
    double dif = difftime (end,start);
    printf ("Elapsed time is %.2lf seconds.", dif );

    getchar();
}

Process2:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 

ofstream debugOut;      // Output file for debug    (DEBUG)

int getState()
{
    int state = 0;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only open q
        ,"msgState"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);    
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){state = 0;}

    return state;
}
float getData()
{
    float Data = -123456;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only open q
        ,"msgQData"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        //Receive the data
        //mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
        timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){Data = -123456;}

    return Data;
}

int main ()
{
    int state = 0;
    int maxRunTime = 10;
    float Data;
    float DataArray[100000];

    debugOut.open("IPCWriteTest.txt", std::ios::trunc);
    debugOut.close();

    while(true)
    {
        switch (state) 
        {
            case 0: 
                //Do nothing - data not ready state
                if(getState() == 1)
                {
                    state = 1;
                    cout << "State: 1" <<endl;
                } //If all msQ ok set state 1
                else{state = 0;}
                break;
            case 1:
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    cout << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        Data = getData();
                        cout << Data << "   ";
                        DataArray[runTime]=Data;
                    }   
                    cout << endl;
                }

                debugOut.open("IPCWriteTest.txt", std::ios::app);
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    debugOut << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        debugOut << DataArray[runTime] << " ";

                    }   
                    debugOut << endl;
                }
                debugOut.close();
                state = 0;
                break;
            default:
                break;
        }
    }

    std::cout << "done" << endl;
    getchar();
}
ThomasG
  • Consider using threads rather than processes for this. – Paul R Sep 10 '15 at 09:51
  • How many consuming/producing processes will there be? – sehe Sep 10 '15 at 10:16
  • There will only be one producing and one consuming. The idea is simply to have the process1 running as fast as possible to calculate the data, and then have another process2 to manipulate it, in this case write to file. They have to be two different programs. – ThomasG Sep 10 '15 at 10:18
  • You could look at `spsc_queue` and shared memory: http://stackoverflow.com/questions/22207546/shared-memory-ipc-synchronization-lock-free/22209595#22209595 – sehe Sep 10 '15 at 10:25
  • Why are you using a queue at all? Why not put the data in shared memory and control access with a mutex or semaphore? – Mark Setchell Sep 10 '15 at 10:35
  • use one process only. use thread pool. use async IO. use CRITICAL_SECTION instead of standard synchronization object. the speed will blow your head off – David Haim Sep 10 '15 at 10:51

2 Answers


You are opening the queue for each operation.

You should try opening once and passing a reference to all related code (typically you would store it as a member in a class).

Also, having separate queues is a recipe for slowness. It seems to me you're "abusing" mqState as an interprocess::condition_variable or semaphore.

Translating exceptions to tedious error codes like that is not very productive anyways. You're doing, manually, what exception handling should be doing.

Also, the fact that you trace debug messages to standard output will vastly slow down the program, especially on Windows.
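
One way to keep tracing out of the hot loop, sketched under the assumption that the messages can wait until the loop has finished, is to collect them in memory and write them out once at the end. The function names `run_steps` and `print_trace` and the message format are made up for illustration; they are not part of the original programs.

```cpp
#include <cassert>
#include <iostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the producer loop: instead of writing to
// std::cout on every iteration, append to an in-memory buffer.
std::string run_steps(int steps) {
    assert(steps >= 0);
    std::ostringstream log;
    for (int runTime = 1; runTime <= steps; ++runTime) {
        // ... the time-critical calculation would go here ...
        log << "runTime: " << runTime << '\n';  // '\n' avoids the flush that std::endl forces
    }
    return log.str();
}

// Usage: write the whole trace in one go, after the timed section.
void print_trace(int steps) {
    std::cout << run_steps(steps);
}
```

Whether this helps enough depends on how much is logged; dropping the trace entirely in the timed build is the simpler option when the messages are not needed.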

Notes on the observer

The same points apply here, and also to the debugOut file, which should probably not be reopened continuously.

It's strange to be "hardlooping" in triples. If it's a queue, just pop 1 message at a time. If a message "logically" consists of three floats, send messages containing three floats. Right now I even think this is a bug:

            for (int i = 0; i < 3; i++) {
                data = getData();
                std::cout << data << "   ";
                DataArray[runTime] = data;
            }

It assigns three different values to the same index (runTime)...
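
A minimal sketch of the "one message per time step" idea, assuming a time step really is three floats: the type `Sample` and the helper `collect` are hypothetical names introduced here, not part of the reviewed code. Storing one `Sample` per step also avoids the overwrite described above, because each step gets its own slot.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <vector>

// Hypothetical message type: one time step = three floats.
struct Sample {
    std::array<float, 3> values;
};

// A byte-oriented queue copies raw bytes, so the type must allow that.
static_assert(std::is_trivially_copyable<Sample>::value,
              "Sample must be trivially copyable to be sent as raw bytes");

// Group a flat run of floats into one Sample per time step,
// so no value overwrites another.
std::vector<Sample> collect(const std::vector<float>& flat) {
    assert(flat.size() % 3 == 0);
    std::vector<Sample> samples;
    samples.reserve(flat.size() / 3);
    for (std::size_t i = 0; i < flat.size(); i += 3) {
        samples.push_back(Sample{{flat[i], flat[i + 1], flat[i + 2]}});
    }
    return samples;
}
```

With a type like this, the queue would presumably be created with `sizeof(Sample)` as the maximum message size and each step sent with a single call such as `mqData.send(&sample, sizeof(sample), 0)`, which cuts the number of queue operations by a factor of three.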

Simplified code

Code for the producer after I "reviewed it" (cleaned it up):

Live1 On Coliru

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>

namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

struct QueueLogic {

    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

int main() {
    std::vector<float> pre_calculated;
    std::generate_n(back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });

    auto start = Clock::now();

    try {
        QueueLogic instance;

        for (auto v : pre_calculated)
            instance.SetData(v);

    } catch(std::exception const& e) {
        std::cout << "Exception thrown: " << e.what() << "\n";
        bip::message_queue::remove("msgQData");
        throw;
    }

    auto end = Clock::now();
    std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}

Code for the consumer:

Live1 On Coliru

#include <iostream>
#include <fstream>
#include <iterator>
#include <stdexcept>
#include <vector>

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

struct ObserverLogic {

    bip::message_queue mqData{bip::open_only, "msgQData"};

    float getData() {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
                                  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10))) 
        {
            throw std::runtime_error("timeout in timed_receive");
        }

        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    DataArray.reserve(100000);

    ObserverLogic instance;

    try {
        while (DataArray.size() < 100000) {
            DataArray.push_back(instance.getData());
        }
    } catch (std::exception const &e) {
        std::cout << "Exception caught: " << e.what() << "\n";
    }

    std::cout << "Received " << DataArray.size() << " messages\n";
    std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));

    std::cout << "\n\ndone" << std::endl;
}

Notes

Live1 - shared memory is not allowed on Coliru

sehe
  • Adding more notes as I'm reviewing the code more: https://www.livecoding.tv/sehe/ – sehe Sep 10 '15 at 10:35
  • I've added the code after I reviewed it ([vid of the livestream](https://www.livecoding.tv/video/reviewing-boost-interprocess-message-queue/)). The code performed pretty well. The synchronization was not required. If you feel you require it still, just use a synchronization primitive instead of a full queue. – sehe Sep 10 '15 at 11:20
  • (in case you wanted to do debug output to a file again: [coliru](http://coliru.stacked-crooked.com/a/9b0a0da9dfcb1361)) – sehe Sep 10 '15 at 11:26
  • The code looks very cool; I will try to play with it and have a go. However, the MSVC11 compiler doesn't support all C++11 features, so I will install a newer version to check. I think your code will probably run very slowly in my setup too, because of the producer. I need a producer that writes data to "memory/msgQ" at each time step: calculate data -> save to memory -> calculate new data -> save to memory, and so on. The second issue is that the consumer will never know how much data has been saved, but that is another issue that I can solve :) – ThomasG Sep 12 '15 at 06:15
  • The producer will have "two" functions: one that initialises the memory block, opens the msgQ, or does whatever else is needed; the second will be called x times and writes data to memory at each timestep. Let's say function two is supposed to run 100 times and save 100 variables to memory. If the code for some reason only runs 33 times, that data should still be in memory and accessible by the consumer. The producer writing to memory is where speed is critical; processing time at the consumer is not important. Hope it all makes sense. I will play with your code, I think it looks very neat! – ThomasG Sep 12 '15 at 07:25
  • Code is working as it "should", i have modified your great example to match my needs! Thank you very much! – ThomasG Sep 12 '15 at 14:26
  • @ThomasG Does it run faster? Does it run fast enough? – sehe Sep 12 '15 at 19:01
  • Unfortunately it seems not to be fast enough. I have calculated that it takes around ~1 ms per 100 messages, and I would like to speed it up by a factor of 50 to 100. Would you recommend using shared memory instead? – ThomasG Sep 13 '15 at 13:21
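
The `spsc_queue` suggestion from the comments on the question rests on a single-producer/single-consumer ring buffer. A minimal in-process sketch of that idea, using only the standard library, might look like the following. It is not Boost's implementation; the names `SpscRing` and `first_of_two` are made up for illustration, and placing such a structure in shared memory between two programs is not shown here.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical fixed-capacity ring buffer for exactly one producer and
// one consumer. One slot stays empty to tell "full" from "empty",
// so the usable capacity is N - 1.
template <typename T, std::size_t N>
class SpscRing {
    static_assert(N >= 2, "need at least two slots");
    std::array<T, N> slots_{};
    std::atomic<std::size_t> head_{0};  // next slot to read (advanced by the consumer)
    std::atomic<std::size_t> tail_{0};  // next slot to write (advanced by the producer)

public:
    // Producer side: returns false instead of blocking when the ring is full.
    bool push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        slots_[tail] = value;
        tail_.store(next, std::memory_order_release);  // publish the written slot
        return true;
    }

    // Consumer side: returns false when there is nothing to read.
    bool pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;  // empty
        out = slots_[head];
        head_.store((head + 1) % N, std::memory_order_release);  // hand the slot back
        return true;
    }
};

// Usage sketch: values come out in the order they went in.
inline float first_of_two() {
    SpscRing<float, 4> ring;
    bool ok = ring.push(41.f) && ring.push(67.f);
    assert(ok);
    float out = 0.f;
    ok = ring.pop(out);
    assert(ok);
    return out;  // the value that was pushed first
}
```

The producer never waits on a lock here: a push is one slot write plus one atomic store, which is why this kind of structure is usually suggested when the producer side is the time-critical one.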

Please find my updated code below, compiled using MSVC14.

I only have one issue now: if I close my consumer while the producer is running, the producer stops. I don't know why.

Producer

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>

namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct QueueLogic 
{
    //DataConfig Setup
    bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
    bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };

    bool SetDataConfig(float data) {
        return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }

    //Data Setup
    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};



int main() 
{
    time_t start, end;
    time(&start);

    float noVarsToMonitor = 10.f;
    float minTimeStep = 1.f;// 0.001f;

    std::vector<float> pre_calculated;
    std::vector<float> data_config;

    //Set Vars to monitor
    data_config.push_back(noVarsToMonitor); //Add noVars as first param in vector
    data_config.push_back(minTimeStep); //Add minTimeStep as second param in vector

    //Parse parameters into vector
    std::generate_n(back_inserter(pre_calculated), noVarsToMonitor, [] { return rand() % 100; });

    //Create instance of struct
    QueueLogic instance;

    //Setup data config
    try
    {       
        for (auto v : data_config)
        {
            instance.SetDataConfig(v);
        }
    }
    catch (std::exception const& e)
    {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
    }

    //Get Data
    for (size_t i = 0; i < 1000; i++) //simulate that code will be called 1000 times after data is recalculated
    {
        try
        {

            for (auto v : pre_calculated)
            {
                instance.SetData(v);
            }
            std::cout << "case: " << i << std::endl;
            Sleep(20); //sleep to test code including consumer
        }
        catch (std::exception const& e)
        {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
        }
    }

    time(&end);
    double dif = difftime(end, start);
    printf("Elapsed time is %.2lf seconds.", dif);

    getchar();
}

Consumer:

#include <iostream>
#include <fstream>
#include <vector>
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct ObserverLogic 
{
    //Get Config Data
    bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };

    float getDataConfig()
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }

    //Get Var Data
    bip::message_queue mqData{ bip::open_only, "msgQData" };

    float getData() 
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    int count = 0; 
    float maxMonitorTime = 10.f;
    DataArray.reserve(100000);

    //Fetch this from Producer
    float noVarsToMonitor = 0.f; 
    float minTimeStep = 0.f;
    float maxSimSamples = 0.f;

    while (true)
    {
        try
        {
            ObserverLogic instance;

            //Get Numbers of vars to monitor - used another thread!
            noVarsToMonitor = instance.getDataConfig();
            minTimeStep = instance.getDataConfig();
            maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;

            std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
            std::cout << "minTimeStep: " << minTimeStep << std::endl;
            std::cout << "maxSimSamples: " << maxSimSamples << std::endl;

            std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer

            //Get Var Data below here:
            try
            {
                while (DataArray.size() <= maxSimSamples)
                {
                    float value = instance.getData();
                    DataArray.push_back(value);
                    ofs << value << "; ";
                    count++;

                    if (count>noVarsToMonitor - 1) //Split Vector to match no Vars pr. Timestep
                    {
                        ofs << std::endl;
                        count = 0;
                    }
                }
                std::cout << "Received " << DataArray.size() << " messages\n";
                std::cout << "\n\ndone" << std::endl;
                std::cout << std::endl;
            }
            catch (std::exception const &e)
            {
                std::cout << "Exception caught: " << e.what() << "\n";
            }
        }
        catch (std::exception const &e)
        {
            std::cout << "Exception caught: " << e.what() << "\n";
        }
        std::cout << "Wait 5 seconds to try fetch again" << "\n";
        Sleep(5000); //sleep and wait to run loop again before looking at for msqQ
    }

    getchar();
}

Output to txt:

41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 

The output can then be plotted against the "simulation time" keeping the data in the right column and row.

It might still not be pretty code, but I'm still learning and I appreciate the support I have gotten here on my first post. Please feel free to keep commenting.

ThomasG
  • (this is not an answer) – sehe Sep 13 '15 at 22:23
  • What are you trying to achieve? You've - again - complicated the code with some kind of "out-of-band" configdata passing mechanism. This is not fine. If you really want to just send bursts of `noVarsToMonitor`, why not _send_ that instead of loose samples? This will - likely - improve the speed by a factor of ~ `noVarsToMonitor` too. – sehe Sep 13 '15 at 22:25
  • The code is meant to replicate my real code. My real code runs at a 0.001ms time step; for each timestep it calculates `noVarsToMonitor` variables, which are sent to the consumer. The code then runs one more timestep, calculates `noVarsToMonitor` variables and again passes them to the consumer. My for-loop runs 1000 times to illustrate that I will pass 10 recalculated variables 1000 times. I'm not sure how to explain it, but does that make sense? – ThomasG Sep 14 '15 at 10:48
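
The burst idea from the comments can be sketched as follows, assuming all `noVarsToMonitor` values of one time step are copied into a single message buffer. The helpers `pack_step`, `unpack_step` and `messages_needed` are hypothetical names for illustration and use only the standard library; the actual queue calls are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Producer side: copy all values of one time step into one byte buffer.
std::vector<char> pack_step(const std::vector<float>& values) {
    std::vector<char> bytes(values.size() * sizeof(float));
    if (!bytes.empty()) std::memcpy(bytes.data(), values.data(), bytes.size());
    return bytes;
}

// Consumer side: reverse of pack_step.
std::vector<float> unpack_step(const std::vector<char>& bytes) {
    assert(bytes.size() % sizeof(float) == 0);
    std::vector<float> values(bytes.size() / sizeof(float));
    if (!bytes.empty()) std::memcpy(values.data(), bytes.data(), bytes.size());
    return values;
}

// Number of messages needed for `steps` time steps of `vars` values each,
// when every message carries `perMessage` values.
std::size_t messages_needed(std::size_t steps, std::size_t vars, std::size_t perMessage) {
    assert(perMessage > 0);
    const std::size_t total = steps * vars;
    return (total + perMessage - 1) / perMessage;  // round up
}
```

With the numbers from the producer above (1000 steps of 10 values), sending loose floats takes 10,000 messages while sending one burst per step takes 1,000. The queue would then presumably be created with a maximum message size of `noVarsToMonitor * sizeof(float)` and each buffer passed on with a single `send(bytes.data(), bytes.size(), 0)` call.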