
I have a strange problem with my server application. My system is simple: one or more devices and one server application communicate over a network. The protocol uses variable-length binary packets with a fixed header that contains the size of the current packet. Example packet:

unsigned char pct[maxSize] = {};
pct[0] = 0x5a; //preamble
pct[1] = 0xa5; //preamble
pct[2] = 0x07; //packet size
pct[3] = 0x0A; //command
// ... [payload]
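To make the framing concrete, here is a minimal sketch in standard C++17 (no Boost). It builds such a packet and validates the 3-byte header. It assumes, hypothetically, that the size byte counts the whole packet including the header. `encodePacket`, `decodeHeader` and the constants are illustrative names only; they are not part of the author's ProtoCmd/ProtoAnswer classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical constants mirroring the example packet: two preamble bytes,
// then one size byte (headerSize == 3 in the question's code).
constexpr std::uint8_t kPreamble0 = 0x5A;
constexpr std::uint8_t kPreamble1 = 0xA5;
constexpr std::size_t kHeaderSize = 3;

// Build a packet: preamble, size, command, payload.
// ASSUMPTION: the size byte counts the whole packet, header included.
std::vector<std::uint8_t> encodePacket(std::uint8_t command,
                                       const std::vector<std::uint8_t>& payload)
{
    const std::size_t total = kHeaderSize + 1 + payload.size();
    assert(total <= 0xFF && "packet size must fit in one byte");
    std::vector<std::uint8_t> pct;
    pct.reserve(total);
    pct.push_back(kPreamble0);
    pct.push_back(kPreamble1);
    pct.push_back(static_cast<std::uint8_t>(total));
    pct.push_back(command);
    pct.insert(pct.end(), payload.begin(), payload.end());
    return pct;
}

// Check the 3-byte header and return how many bytes remain after it
// (command + payload), or std::nullopt if the header is invalid.
std::optional<std::size_t> decodeHeader(const std::uint8_t* hdr)
{
    if (hdr[0] != kPreamble0 || hdr[1] != kPreamble1)
        return std::nullopt;               // bad preamble
    const std::size_t total = hdr[2];
    if (total <= kHeaderSize)
        return std::nullopt;               // must contain at least a command byte
    return total - kHeaderSize;
}
```

Under this assumption, the example packet (size 0x07) carries a 1-byte command and a 3-byte payload. `decodeHeader` then reports 4 bytes still to read after the header.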

The protocol follows a command/answer pattern: the server sends a command and the device replies with an answer.

I use boost::asio for communication: an io_service run by a thread pool (4 threads) with async read/write operations (code example below). On top of that I run a "query cycle" driven by a 200 ms timer:

  • query the first value from the device
  • get the result, query the second value
  • get the result, start the timer again

This works very well on Boost 1.53 (Debug and Release). But when I switch to Boost 1.54 (especially in Release mode), the magic begins. My server starts successfully, connects to the device and starts the "query cycle". For about 30-60 seconds everything works well (I receive data, and the data is correct), but then I start receiving an asio::error in the last read handler (always in the same place). The error is EOF. After receiving the error, I have to disconnect from the device.

Some googling told me that EOF indicates that the other side (the device, in my case) initiated the disconnect. But according to the device's logic, that cannot be true. Can somebody explain what's going on? Maybe I need to set some socket option or define? I see two possible reasons:

  • my side initiates the disconnect (for some reason I don't know) and the EOF is the answer to that.
  • some socket timeout fires.
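Both hypotheses can be checked against how EOF normally arises. Boost.Asio reports a stream-socket read that completes with no OS error but zero bytes transferred as `boost::asio::error::eof`. At the socket level, a zero-byte read on a stream socket means the peer has shut down its sending side. The sketch below is not Asio code. It uses POSIX `socketpair`, so it runs on Linux/macOS rather than the Windows setup in the question. It shows that EOF on one end is produced by the other end calling `shutdown`. `classifyRead` and `eofDemo` are hypothetical helpers.

```cpp
#include <cassert>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Outcome of one read, using the rule Boost.Asio applies to stream sockets:
// no OS error but 0 bytes transferred (for a non-empty buffer) means EOF.
enum class ReadOutcome { Data, Eof, Error };

ReadOutcome classifyRead(ssize_t n)
{
    if (n > 0) return ReadOutcome::Data;
    if (n == 0) return ReadOutcome::Eof;   // peer shut down its sending side
    return ReadOutcome::Error;             // details are in errno
}

// One end sends two bytes, then shuts down its send direction (what
// SocketWorker::Disconnect does with shutdown_send). Returns what the other
// end observes: 'D' per read that returned data, then 'E' (EOF) or 'X' (error).
std::string eofDemo()
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return "socketpair failed";

    std::string seen;
    if (write(fds[0], "hi", 2) == 2 && shutdown(fds[0], SHUT_WR) == 0) {
        char buf[16];
        for (;;) {
            const ReadOutcome r = classifyRead(recv(fds[1], buf, sizeof buf, 0));
            if (r == ReadOutcome::Data) { seen += 'D'; continue; }
            seen += (r == ReadOutcome::Eof) ? 'E' : 'X';
            break;
        }
    }
    close(fds[0]);
    close(fds[1]);
    return seen;
}
```

The reader sees the data first and only then EOF. This is why a spurious EOF with no peer shutdown points at the local stack rather than at the device.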

My environment:

  • OS: Windows 7/8
  • Compiler: MSVC 2012 Update 3

Sample code of the main "query cycle". It is adapted from the official Boost chat example. All code is simplified to save space :)

  • SocketWorker - low level wrapper for sockets
  • DeviceWorker - class for device communication
  • ERes - internal struct for storing errors
  • ProtoCmd and ProtoAnswer - wrappers for the raw command and answer arrays (analogous to chat_message in the Boost chat example)
  • lw_service_proto namespace - predefined commands and maximum packet sizes

So, code samples. Socket wrapper:

namespace b = boost;
namespace ba = boost::asio;

typedef b::function<void(const ProtoAnswer answ)> DataReceiverType;

class SocketWorker
{
private:
    typedef ba::ip::tcp::socket socketType;
    typedef std::unique_ptr<socketType> socketPtrType;
    socketPtrType devSocket;
    ProtoCmd      sendCmd;
    ProtoAnswer   rcvAnsw; 

    //[other definitions]

public:

//---------------------------------------------------------------------------
ERes Connect(/*[connect settings]*/)
{
    ERes res(LGS_RESULT_ERROR, "Connect to device - Unknown error");

    using namespace boost::asio::ip;
    boost::system::error_code sock_error;

    //try to connect
    devSocket->connect(tcp::endpoint(address::from_string(/*[connect settings ip]*/), /*[connect settings port]*/), sock_error);

    if (sock_error) {
        //[work with error]
        devSocket->close();
    }
    else {
        //[res code ok]
    } 

    return res;
}
//---------------------------------------------------------------------------
ERes Disconnect()
{
    if (devSocket->is_open())
    {
        boost::system::error_code ec;
        devSocket->shutdown(ba::ip::tcp::socket::shutdown_send, ec);
        devSocket->close(ec); //non-throwing overload
    }
    return ERes(LGS_RESULT_OK, "OK");
}

//---------------------------------------------------------------------------
//query any cmd
void QueryCommand(ProtoCmd cmd, DataReceiverType dataClb)
{
    sendCmd = std::move(cmd); //store command in a member: ba::buffer() does not copy, the data must outlive async_write
    if (sendCmd.CommandLength() > 0)
    {
        ba::async_write(*devSocket, ba::buffer(sendCmd.Data(), sendCmd.Length()),
                        b::bind(&SocketWorker::HandleSocketWrite,
                                this, ba::placeholders::error, dataClb));
    }
    else
    {
        cerr << "Send command error: nothing to send" << endl;
    }
}

//---------------------------------------------------------------------------
// boost socket handlers
void HandleSocketWrite(const b::system::error_code& error,
                       DataReceiverType dataClb)
{
    if (error)
    {
        cerr << "Send cmd error: " << error.message() << endl;
        //[send error to other place]
        return;
    }

    //start reading header of answer (lw_service_proto::headerSize == 3 bytes)
    ba::async_read(*devSocket,
                   ba::buffer(rcvAnsw.Data(), lw_service_proto::headerSize),
                   b::bind(&SocketWorker::HandleSockReadHeader, 
                           this, ba::placeholders::error, dataClb)); 
}
//---------------------------------------------------------------------------
//handler for read header
void HandleSockReadHeader(const b::system::error_code& error, DataReceiverType dataClb)
{
    if (error)
    {
        //[error working]
        return;
    }

    //decode header (check preamble and get full packet size) and read answer payload
    if (rcvAnsw.DecodeHeaderAndGetCmdSize())
    {
        ba::async_read(*devSocket,
                       ba::buffer(rcvAnsw.Answer(), rcvAnsw.AnswerLength()),
                       b::bind(&SocketWorker::HandleSockReadBody,
                               this, ba::placeholders::error, dataClb));
    }
    else
    {
        //[invalid header: report the error, otherwise the query cycle stalls silently]
    }
}
//---------------------------------------------------------------------------
//handler for answer payload
void HandleSockReadBody(const b::system::error_code& error, DataReceiverType dataClb)
{
    //if no error - send answer to 'master'
    if (!error){
        if (dataClb)
            dataClb(rcvAnsw);
    }
    else{
        //[error process]

        //here I get EOF (most often in Release mode)
    }
}

};
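The header-then-body pair of `async_read` calls above relies on `async_read` completing only when the requested number of bytes has arrived. An alternative worth comparing is to accumulate whatever arrives and cut packets out of the byte stream yourself. Below is a minimal standard-C++ sketch. It assumes, hypothetically, that the size byte counts the whole packet including the 3-byte header. `PacketFramer` is an illustrative name, not part of the question's code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical framer: accepts bytes in arbitrary chunks (as TCP delivers
// them) and emits complete packets laid out as 0x5A 0xA5 <size> <cmd> ...
class PacketFramer
{
public:
    // Append received bytes; return every packet completed by this chunk.
    std::vector<std::vector<std::uint8_t>> feed(const std::uint8_t* data, std::size_t n)
    {
        buf_.insert(buf_.end(), data, data + n);
        std::vector<std::vector<std::uint8_t>> out;
        for (;;) {
            // Resynchronise: drop bytes until a preamble is at the front.
            while (buf_.size() >= 2 && !(buf_[0] == 0x5A && buf_[1] == 0xA5))
                buf_.pop_front();
            if (buf_.size() < kHeaderSize) break;                  // header incomplete
            const std::size_t total = buf_[2];
            if (total <= kHeaderSize) { buf_.pop_front(); continue; } // bogus size byte
            if (buf_.size() < total) break;                        // body incomplete
            const auto end = buf_.begin() + static_cast<std::ptrdiff_t>(total);
            out.emplace_back(buf_.begin(), end);
            buf_.erase(buf_.begin(), end);
        }
        return out;
    }

private:
    static constexpr std::size_t kHeaderSize = 3;
    std::deque<std::uint8_t> buf_;
};
```

The resynchronisation loop is a design choice the fixed `async_read` chain does not have. There, a bad preamble presumably just makes `DecodeHeaderAndGetCmdSize()` return false.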

Device worker

class DeviceWorker
{
private:
    const static int LW_QUERY_TIME = 200;
    SocketWorker       sockWorker;
    ba::io_service&    timerIOService;
    typedef std::shared_ptr<ba::deadline_timer> TimerPtr;
    TimerPtr        queryTimer;
    bool            queryCycleWorking;

    //[other definitions]
public:

ERes Connect()
{
    ERes intRes = sockWorker.Connect(/*[connect settings here]*/);

    if(intRes != LGS_RESULT_OK) {
        //[set result to error]
    }
    else {
        //[set result to success]

        //start "query cycle"
        StartNewCycleQuery();
    }

    return intRes;
}
//---------------------------------------------------------------------------
ERes Disconnect()
{
    return sockWorker.Disconnect();
}
//---------------------------------------------------------------------------
void StartNewCycleQuery()
{
    queryCycleWorking = true;
    //start timer
    queryTimer = std::make_shared<ba::deadline_timer>(timerIOService, boost::posix_time::milliseconds(LW_QUERY_TIME));
    queryTimer->async_wait(boost::bind(&DeviceWorker::HandleQueryTimer,
                                       this, boost::asio::placeholders::error));
}
//---------------------------------------------------------------------------
void StopCycleQuery()
{
    //kill timer
    if (queryTimer) 
        queryTimer->cancel();

    queryCycleWorking = false;
}
//---------------------------------------------------------------------------
//timer handler
void HandleQueryTimer(const b::system::error_code& error)
{
    if (!error)
    {
        ProtoCmd cmd;    
        //query for first value
        cmd.EncodeCommandCore(lw_service_proto::cmdGetAlarm, 1);
        sockWorker.QueryCommand(cmd, boost::bind(&DeviceWorker::ReceiveAlarmCycle, 
                                this, _1));    
    }
}
//---------------------------------------------------------------------------
//receive first value
void ReceiveAlarmCycle(ProtoAnswer adata)
{
    //check and fix last bytes (remove \r\n from some commands)
    adata.CheckAndFixFooter();

    //[working with answer]

    if (queryCycleWorking)
    { 
        //query for second value
        ProtoCmd cmd;
        cmd.EncodeCommandCore(lw_service_proto::cmdGetEnergyLevel, 1);
        sockWorker.QueryCommand(cmd, b::bind(&DeviceWorker::ReceiveEnergyCycle, 
                                      this, _1));
    }
}
//---------------------------------------------------------------------------
//receive second value
void ReceiveEnergyCycle(ProtoAnswer edata)
{
    //check and fix last bytes (remove \r\n from some commands)
    edata.CheckAndFixFooter();

    //[working with second value]

    //start new "query cycle"
    if (queryCycleWorking)
        StartNewCycleQuery();
}

};
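The cycle above is only correct if exactly one command is outstanding at any time. The second query is issued from the first answer's handler, and the timer is restarted from the second. That ordering can be checked in isolation with a single-threaded simulation. This is a hypothetical sketch in standard C++ without Asio: `FakeDevice`, `QueryCycle`, the FIFO "event loop" and the command codes are stand-ins. It says nothing about which thread runs each handler, which is exactly what changes with a 4-thread pool.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical device: every command gets exactly one answer, delivered later
// by the event loop. Counts commands in flight so the ordering can be checked.
struct FakeDevice {
    std::vector<std::uint8_t> log;   // commands, in the order they were sent
    int inFlight = 0;
    int maxInFlight = 0;
};

// Minimal single-threaded event loop: a FIFO of pending callbacks.
using Task = std::function<void()>;

class QueryCycle {
public:
    QueryCycle(FakeDevice& dev, std::queue<Task>& loop) : dev_(dev), loop_(loop) {}

    // Role of HandleQueryTimer: first query of a cycle. A stopped cycle
    // ignores the tick, like a cancelled deadline_timer.
    void onTimer() {
        if (!working) return;
        query(kCmdGetAlarm, [this] { onAlarm(); });
    }

    int cyclesDone = 0;
    bool working = true;   // role of queryCycleWorking

private:
    // Placeholder command codes; the real ones live in lw_service_proto.
    static constexpr std::uint8_t kCmdGetAlarm = 0x01;
    static constexpr std::uint8_t kCmdGetEnergy = 0x02;

    void query(std::uint8_t cmd, Task onAnswer) {
        dev_.log.push_back(cmd);
        if (++dev_.inFlight > dev_.maxInFlight) dev_.maxInFlight = dev_.inFlight;
        // The answer is delivered later, from the event loop.
        loop_.push([this, onAnswer] { --dev_.inFlight; onAnswer(); });
    }
    // Role of ReceiveAlarmCycle: the second query starts only after the first answer.
    void onAlarm() {
        if (working) query(kCmdGetEnergy, [this] { onEnergy(); });
    }
    // Role of ReceiveEnergyCycle: the cycle ends and the "timer" is restarted.
    void onEnergy() {
        ++cyclesDone;
        if (working) loop_.push([this] { onTimer(); });
    }

    FakeDevice& dev_;
    std::queue<Task>& loop_;
};
```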

Any ideas are welcome :)

Edit: after several tests I see a different picture:

  • this issue reproduces on Boost 1.54 only (both Debug and Release mode; in Release it happens much faster). With Boost 1.53 there is no error (maybe the first time I didn't clean my build properly before rebuilding....)
  • with Boost 1.54 and 1 thread (instead of 4) everything works well

I also spent some time with the debugger and the Boost sources, and came to some conclusions:

  • When I receive EOF, my data has already been fully received.
  • The EOF indicates that there was nothing to transfer in this operation, i.e. the socket result flag is 0 (no error), but the Boost operation result is EOF (bytes transferred == 0).

For now I am forced to switch back to Boost 1.53...

ShaKeSPeaR
  • I admit I didn't go deeply into the problem description... But at a glance, the buffers' lifetimes look suspicious to me. In particular, you send `buffer(cmd.Data(), cmd.Length())`, where `cmd` is a local object, i.e. the buffer obviously doesn't outlive the async operation. Similarly, what is `rcvAnsw`, and where is it defined? – Igor R. Jul 15 '13 at 11:51
  • @IgorR. My bad, sorry :) The command and answer objects are members of SocketWorker, so they live for the whole duration of the async operations. But the local "cmd" is a good point: for some reason I thought the buffer copied the data to send. I'll try saving the command locally... PS: added the local command to the source in the main post – ShaKeSPeaR Jul 15 '13 at 12:42
  • No, the `buffer()` free function doesn't copy and doesn't "own" the underlying buffer; it just adapts it to the `ConstBufferSequence` (or `MutableBufferSequence`) concept. http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.buffer_invalidation – Igor R. Jul 15 '13 at 12:53
  • I assumed the buffer was stored somewhere inside `async_send`, but I re-read the manual and understood that I was wrong :) I corrected this issue in my code; sadly, nothing changed with the main problem... But thank you anyway for this remark :) – ShaKeSPeaR Jul 15 '13 at 13:18
  • how many threads are invoking `io_service::run()`? – Sam Miller Jul 15 '13 at 16:29
  • @SamMiller `io_service::run()` is invoked from a pool of 4 threads – ShaKeSPeaR Jul 16 '13 at 06:37
  • Does the problem reproduce if you invoke `io_service::run()` from a single thread? If so, please include this code in your question. – Sam Miller Jul 16 '13 at 16:56
  • @SamMiller OK, I'll try this tomorrow, thanks. I also did some deeper research into Boost and found some interesting things. I will try to write up the results as a separate answer a bit later. – ShaKeSPeaR Jul 18 '13 at 09:26
  • @SamMiller Tried with one thread: everything works well.... I edited the main post with this info – ShaKeSPeaR Jul 19 '13 at 13:43
  • I think I have a similar [problem](http://stackoverflow.com/questions/18654798/random-eof-in-boost-asio-in-multi-thread) – Arnaud Sep 06 '13 at 13:50
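Igor R.'s point is that `buffer()` is only a view (pointer and size), so the bytes must stay alive until the write completes. The author's fix was to keep the command in the `sendCmd` member. Another common idiom is to hold the bytes in a `shared_ptr` captured by the completion handler. The sketch below imitates that without Asio. `ConstBufferView`, `FakeAsyncStream` and `sendCommand` are hypothetical, and `FakeAsyncStream` reads the memory only when `complete()` is called, as an asynchronous write would.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Non-owning view, analogous to what boost::asio::buffer() returns.
struct ConstBufferView {
    const void* data;
    std::size_t size;
};

// Hypothetical stream: like an async write, it touches the memory later.
class FakeAsyncStream {
public:
    void asyncWrite(ConstBufferView buf, std::function<void()> onDone) {
        pending_ = buf;
        onDone_ = std::move(onDone);
    }
    // Called later by the "event loop": copies the bytes out, then completes.
    void complete() {
        const auto* p = static_cast<const char*>(pending_.data);
        written_.append(p, pending_.size);
        std::function<void()> done = std::move(onDone_);
        onDone_ = nullptr;
        pending_ = {nullptr, 0};
        if (done) done();
    }   // `done`, and everything it captured, is destroyed here
    const std::string& written() const { return written_; }

private:
    ConstBufferView pending_{nullptr, 0};
    std::function<void()> onDone_;
    std::string written_;
};

// The bytes live in a shared_ptr owned by the completion handler, so they
// stay valid after this function returns and until the write completes.
void sendCommand(FakeAsyncStream& stream, std::vector<std::uint8_t> bytes) {
    auto owned = std::make_shared<std::vector<std::uint8_t>>(std::move(bytes));
    stream.asyncWrite({owned->data(), owned->size()},
                      [owned] { (void)owned; /* keeps the bytes alive */ });
}
```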

2 Answers


If your application works fine with a single thread invoking io_service::run() but fails with four threads, you very likely have a race condition. This type of problem is difficult to diagnose. Generally speaking, you should ensure your devSocket has at most one outstanding async_read() and at most one outstanding async_write() operation. Your current implementation of SocketWorker::QueryCommand() unconditionally invokes async_write(), which may violate the ordering assumption documented as follows:

This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.

The classic solution to this problem is to maintain a queue of outgoing messages. If a previous write is outstanding, append the next outgoing message to the queue. When the previous write completes, initiate the async_write() for the next message in the queue. When using multiple threads invoking io_service::run() you may need to use a strand as the linked answer does.
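Here is a minimal sketch of that queue in standard C++, so it can run on its own. `StartWrite` is a hypothetical hook standing in for `ba::async_write`. When several threads call `io_service::run()`, real code would also have to serialize `send()` and the completion handler, for example by dispatching both through the same strand. This single-threaded sketch does not show that.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Guarantees at most one outstanding write. `StartWrite` must begin writing
// `msg` and call `done` exactly once when that write has completed.
class WriteQueue {
public:
    using Done = std::function<void()>;
    using StartWrite = std::function<void(const std::string& msg, Done done)>;

    explicit WriteQueue(StartWrite start) : start_(std::move(start)) {}

    // Never starts a second write while one is in progress; it only queues.
    void send(std::string msg) {
        const bool idle = queue_.empty();
        queue_.push_back(std::move(msg));
        if (idle) writeFront();
    }

private:
    void writeFront() {
        // queue_.front() stays in place until its write completes.
        start_(queue_.front(), [this] {
            queue_.pop_front();
            if (!queue_.empty()) writeFront();   // next message, if any
        });
    }

    StartWrite start_;
    std::deque<std::string> queue_;
};
```

A `std::deque` fits here because `push_back` does not invalidate references to existing elements. The string handed to the in-progress write therefore stays valid while new messages are queued.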

Sam Miller
  • Thanks for the answer. I know about the "one async operation on a socket at a time" problem, and my current design avoids it by controlling the query cycle manually (i.e. `timer end-write cmd-read cmd-....-start timer`). Anyway, I tried rewriting my code with a `strand`, but with no luck: the issue is still there. For now I have been forced to switch back to Boost 1.53. – ShaKeSPeaR Jul 29 '13 at 09:20
  • I also tried rewriting the code to use `async_read_until` with `\r\n` as the delimiter (just to remove my own handlers). But the result is the same: I get `EOF` and `bytes_transferred=0`, even though my buffer already holds the correct packet, i.e. the data was already transferred and the byte count is greater than 0.... – ShaKeSPeaR Jul 29 '13 at 09:42
  • I have just spent the weekend debugging a similar issue with boost 1.54. I couldn't find anything wrong with my code. I installed 1.55 and haven't seen the issue again (it occurred every time I ran a specific test; I've now run the same test over 100 times with boost 1.55 and haven't seen the problem). – regu Jan 06 '14 at 06:03

I had the exact same problem and I am quite sure that this is a bug in boost::asio 1.54.0.

Here is the bug report.

The solution is effectively to get back to 1.53, although there is a patch available for 1.54 in the bug report page.

Arnaud
  • Thanks, I've rolled back to 1.53 already. I hope the patch is included in 1.55 :) – ShaKeSPeaR Sep 07 '13 at 20:41
  • Just looked up the upcoming [boost 1.55](http://www.boost.org/users/history/version_1_55_0.html). It looks like they found and fixed a bug with io_service + a multi-threaded pool on Windows :) – ShaKeSPeaR Oct 21 '13 at 13:54
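A compile-time guard is one option if you want the build to refuse the affected release instead of rediscovering the EOF at runtime. `BOOST_VERSION` (from `<boost/version.hpp>`) encodes the version as major * 100000 + minor * 100 + patch, so 1.54.0 is 105400. This is a sketch, not something from the thread. `isAffectedBoostVersion` is a hypothetical helper, and the check is skipped when Boost is not on the include path (it relies on C++17 `__has_include`). Remove it if you applied the patch from the bug report to 1.54.

```cpp
#include <cassert>

// Pull in BOOST_VERSION only if Boost is available (C++17 __has_include).
#if defined(__has_include)
#  if __has_include(<boost/version.hpp>)
#    include <boost/version.hpp>
#  endif
#endif

// Hypothetical helper: 105400 == Boost 1.54.0, the release discussed above;
// per the comments, 1.55 no longer shows the problem.
constexpr bool isAffectedBoostVersion(long version)
{
    return version == 105400;
}

#if defined(BOOST_VERSION)
static_assert(!isAffectedBoostVersion(BOOST_VERSION),
              "Boost 1.54.0: spurious EOF reported with a multi-threaded io_service; "
              "use 1.53 or 1.55+, or apply the patch from the bug report");
#endif
```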