I have a problem I don't seem to be able to solve myself. I have a Process1 that calculates data in a while-loop. This process has to execute as fast as possible. I need the data calculated in Process1 for later analysis, and writing it to a file is too slow.
I have never worked with IPC, but I thought it would be a good way to store the data from Process1 in memory and access it from another process, Process2 (a separate program), that is not time-critical and writes the data to a file.
I have created a little test program (to learn about IPC) so that:
- Process1 will run even if Process2 is not accessible - it will then skip IPC and just execute
- When Process2 is running, it waits for Process1, fetches data once Process1 starts up, and later writes it to disk.
- Process2 will only fetch a fixed amount of data (maxRunTime); in the code below, 10 samples.
The programs I have created so far are terribly slow: when sending messages over IPC, it runs 6 times slower. Currently I only pass three floats at each "TimeStep", but this could be 100, and RunTime could be 10,000.
To do: I would be pleased if someone could guide me in the right direction. The code below works, though that might be luck, as it is not pretty.
I need to find a solution that is as fast as possible, but it doesn't have to be real-time. As I'm not a professional programmer, I also need to limit the complexity, since I need to understand what I'm doing.
Hope someone can help.
Code:
- Using Boost 1.59 and MSVC 11.0 (x86)
- Two separate programs - ConsoleApps
Process1:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
bool InitCreateMsgQ()
{
bool initOK = false;
//Create a msgQ for parsing data
try
{
message_queue::remove("msgQData");
//Create a message_queue.
message_queue mqData
(open_or_create //create q
,"msgQData" //name
,1000000 //max message number
,sizeof(float) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
//Create State
try
{
message_queue::remove("msgState");
//Create a message_queue.
message_queue mqState
(open_or_create //create q
,"msgState" //name
,1 //max message number
,sizeof(int) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
return initOK;
}
bool SetState(int state)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
timeout = !mqState.timed_send(&state, sizeof(int), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgState");
timeout = true;
}
return timeout;
}
bool SetData(float data)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
timeout = !mqData.timed_send(&data, sizeof(float), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
//mqData.send(&data, sizeof(float), 0);
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgQData");
timeout = true;
}
return timeout;
}
// Process1 entry point: creates the queues, checks whether an observer is
// consuming the state queue, then runs 1000 iterations. While state is 1,
// each iteration sends three random values over IPC; otherwise it only
// counts and prints.
int main()
{
    const int kIterations = 1000;      // just for testing
    const int kValuesPerStep = 3;
    const int kObserverProbeState = 2; // dummy value for the second state message

    // State 1 = queues are ready and data is sent; state 0 = IPC is skipped.
    int state = InitCreateMsgQ() ? 1 : 0;

    // The state queue holds a single message, so the second send can only
    // succeed if the first one has been taken out of the queue. A timeout on
    // either send drops us to state 0.
    if (SetState(state))
    {
        state = 0;
    }
    if (SetState(kObserverProbeState))
    {
        state = 0;
    }

    time_t startTime;
    time_t endTime;
    time(&startTime);

    for (int runTime = 1; runTime <= kIterations; ++runTime)
    {
        if (state == 1)
        {
            cout << "Try SEND DATA" << endl;
            for (int i = 0; i < kValuesPerStep; ++i)
            {
                const float x = static_cast<float>(rand() % 100);
                // A failed send turns IPC off for all following iterations.
                if (SetData(x))
                {
                    state = 0;
                }
            }
        }
        cout << "runTime: " << runTime << " state: " << state << endl;
    }

    message_queue::remove("msgQData");
    message_queue::remove("msgState");
    cout << "done - state: " << state << endl;

    time(&endTime);
    const double elapsedSeconds = difftime(endTime, startTime);
    printf("Elasped time is %.2lf seconds.", elapsedSeconds);
    getchar();
}
Process2:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
// Output stream for the captured samples (debug log). main() opens it on
// "IPCWriteTest.txt" and closes it again around each write.
ofstream debugOut;
int getState()
{
int state = 0;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){state = 0;}
return state;
}
float getData()
{
float Data = -123456;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
//Receive the data
//mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){Data = -123456;}
return Data;
}
// Process2 (observer) entry point.
//
// Polls the state queue until state 1 is received, then reads maxRunTime
// time steps of valuesPerStep floats each from the data queue, appends them
// to "IPCWriteTest.txt", and goes back to polling.
//
// Fix compared with the previous version: every received float now gets its
// own slot. Before, all three values of a time step were written to
// DataArray[runTime], so only the last one survived and the file showed it
// three times.
int main()
{
    const int maxRunTime = 10;   // time steps fetched per capture
    const int valuesPerStep = 3; // floats read per time step
    int state = 0;

    // One slot per received float, laid out step after step:
    // index = runTime * valuesPerStep + i.
    std::vector<float> samples(maxRunTime * valuesPerStep);

    // Start with an empty log file; captures are appended to it later.
    debugOut.open("IPCWriteTest.txt", std::ios::trunc);
    debugOut.close();

    while (true)
    {
        switch (state)
        {
        case 0:
            // Data not ready: keep polling until state 1 is received.
            if (getState() == 1)
            {
                state = 1;
                cout << "State: 1" << endl;
            }
            break;

        case 1:
            // Capture phase. getData() returns -123456 when nothing arrived
            // in time, so that value can show up in the output.
            for (int runTime = 0; runTime < maxRunTime; runTime++)
            {
                cout << "runTime: " << runTime << " Data: ";
                for (int i = 0; i < valuesPerStep; i++)
                {
                    const float value = getData();
                    cout << value << " ";
                    samples[runTime * valuesPerStep + i] = value;
                }
                cout << endl;
            }

            // Write phase: done after the capture so that file I/O does not
            // slow down reading from the queue.
            debugOut.open("IPCWriteTest.txt", std::ios::app);
            for (int runTime = 0; runTime < maxRunTime; runTime++)
            {
                debugOut << "runTime: " << runTime << " Data: ";
                for (int i = 0; i < valuesPerStep; i++)
                {
                    debugOut << samples[runTime * valuesPerStep + i] << " ";
                }
                debugOut << endl;
            }
            debugOut.close();

            state = 0;
            break;

        default:
            break;
        }
    }

    // NOTE(review): not reachable at the moment because the loop above never
    // exits; kept for when an exit condition is added.
    std::cout << "done" << endl;
    getchar();
}