As the title says, I have a question concerning the following scenario (simplified example):

Assume that I have an object of the Generator class below, which continuously updates its dataChunk member (running in the main thread).

class Generator
{
  void generateData();
  uint8_t dataChunk[999];
};

Furthermore, I have an asynchronous acceptor of TCP connections to which 1 to N clients can connect (running in a second thread). The acceptor starts a new thread for each new client connection, in which an object of the Connection class below receives a request message from the client and answers with a fraction of the dataChunk (belonging to the Generator). It then waits for a new request, and so on...

class Connection
{

  void setDataChunk(uint8_t* dataChunk);
  void handleRequest();
  uint8_t* dataChunk;
};

Finally, the actual question: the desired behaviour is that the Generator object generates a new dataChunk and then waits until all 1 to N Connection objects have dealt with their client requests before it generates the next dataChunk.

How do I lock the dataChunk against write access by the Generator object while the Connection objects deal with their requests, given that all Connection objects, in their respective threads, are supposed to have read access at the same time during their request-handling phase?

On the other hand, the Connection objects are supposed to wait for a new dataChunk after dealing with their respective requests, without dropping a new client request.

--> I think a single mutex won't do the trick here.

My first idea was to share a struct between the objects with a semaphore for the Generator and a vector of semaphores for the connections. With these, every object could "understand" the state of the full-system and work accordingly.

What do you think? What is best practice in cases like this?

Thanks in advance!

  • "I think a single mutex won't do the trick here." - thinking in terms of mutual exclusion is usually an anti-pattern in asynchronous code – sehe May 12 '18 at 11:51
  • As far as I understood you can use a simple atomic counter (to hold an amount of data available), decrement it by connection when data was sent to a client and wait on it if it turns to zero. BTW, it's not a good idea to drop mutex before trying it. A lot of programs successfully use locks and scale well. – Michael Nastenko May 12 '18 at 13:23
  • @MichaelNastenko it is a bad idea to use mutex with asynchronous operations, because it doesn't work. Locking it across operations means you might try to unlock on another thread and not locking across makes it useless - a race. – sehe May 12 '18 at 14:12
  • @sehe No, it's not. It will be a bad idea only if you have a lot of contention, but you can't say before you test it. In this case it shouldn't be a problem, since lock time shouldn't be too long. – Michael Nastenko May 13 '18 at 02:24
  • @SterndesSuedens you can try to use read/write lock or lockless queue – Michael Nastenko May 13 '18 at 07:10
  • @MichaelNastenko huh. I was never talking about performance – sehe May 13 '18 at 09:07
  • @sehe Problem in your example not caused by a mutex, but incorrect usage of a shared resource. – Michael Nastenko May 14 '18 at 11:40
  • @MichaelNastenko I didn't give an example (or at least I have no clue what example you refer to). Regardless, let's stop this fruitless discussion. Have a look at https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/overview/core/strands.html - This exists for a reason. Strands, internally, still implement locking BUT they abstract the interaction with the async operation multiplexing in such a way that it will work correctly (and potentially performs better, but that's a tunable and not the core feature). – sehe May 14 '18 at 12:53
  • @sehe I meant your second comment. And about your link: this is only one specific case. It doesn't mean that a mutex (or lock) is a bad idea in general. It is a tool that is quite useful where it's applicable. – Michael Nastenko May 14 '18 at 13:58
  • Nobody is talking about the "general" case. The question is specifically about sharing resources in async operations with Boost Asio – sehe May 14 '18 at 14:00
  • @sehe And mutex will work fine in this case. – Michael Nastenko May 14 '18 at 14:22
  • Come on. Time to show the goods. Talk is cheap. – sehe May 14 '18 at 14:38
  • "The acceptor starts a new thread for each new client-connection,[...]" This *can* work at least some degree, but adds enough overhead that most tend to avoid it in favor of asynchronous I/O carried out by only a few threads (often only one). – Jerry Coffin May 14 '18 at 16:40

2 Answers

1

There are several ways to solve it.

You can use std::shared_mutex.

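void Connection::handleRequest()
{
    // Note: this loop spins, re-taking the shared lock until data is available
    while(true)
    {
        // Shared (read) lock: many connections may hold it at the same time
        std::shared_lock<std::shared_mutex> lock(GeneratorObj.shared_mutex);
        if(GeneratorObj.DataIsAvailable()) // we need to know that data is available
        {
            // Send to client
            break;
        }
    }
}

void Generator::generateData()
{
    // Exclusive (write) lock: blocks until no reader holds the shared lock
    std::unique_lock<std::shared_mutex> lock(GeneratorObj.shared_mutex);

    // Generate data
}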
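For a version that compiles on its own, here is a minimal sketch of the same reader/writer idea using only the standard library (C++17). The names `SharedChunk`, `read_slice` and `run_demo` are made up for illustration and are not part of the question's classes. The reader copies its slice out under the shared lock, on the assumption that the actual send should happen without holding it.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical names throughout; not the asker's actual classes.
struct SharedChunk {
    std::shared_mutex mtx;                 // many readers or one writer
    std::array<std::uint8_t, 999> data{};  // plays the role of dataChunk
    std::uint64_t generation = 0;          // bumped on every rewrite

    // Writer side: exclusive lock while the chunk is being rewritten.
    void generate(std::uint8_t fill) {
        std::unique_lock<std::shared_mutex> lock(mtx);
        data.fill(fill);
        ++generation;
    }

    // Reader side: shared lock, so readers do not block each other.
    // Copies a slice out so the lock is not held while sending.
    std::vector<std::uint8_t> read_slice(std::size_t offset, std::size_t len) {
        std::shared_lock<std::shared_mutex> lock(mtx);
        assert(offset + len <= data.size());
        return std::vector<std::uint8_t>(data.begin() + offset,
                                         data.begin() + offset + len);
    }
};

// Runs `readers` threads concurrently with one writer and returns true
// if no reader ever observed a half-written (mixed) slice.
bool run_demo(int readers, int rounds) {
    SharedChunk chunk;
    std::atomic<bool> consistent{true};
    std::vector<std::thread> pool;
    for (int r = 0; r < readers; ++r) {
        pool.emplace_back([&] {
            for (int i = 0; i < rounds; ++i) {
                auto slice = chunk.read_slice(100, 50);
                for (auto b : slice)
                    if (b != slice.front()) consistent = false;
            }
        });
    }
    for (int i = 1; i <= rounds; ++i)
        chunk.generate(static_cast<std::uint8_t>(i));
    for (auto& t : pool) t.join();
    return consistent;
}
```

Calling `run_demo(4, 200)` from `main` should return `true`. Note that this only gives reader/writer exclusion; by itself it does not make the generator wait until every connection has served its request.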

Or you can use a boost::lockfree::queue, but the data structures would have to be different.

Michael Nastenko
  • How will you hold the lock while sending data to n clients? The sending happens asynchronously. Your sample code implies a pull model for clients. The question implies a push model (n-way broadcast) – sehe May 14 '18 at 15:22
  • Function name `Connection::handleRequest` implies pull model - client made request. Why do you think it is push model? And even so - there is no need to hold the lock, data can be cached by Connection for transfer. – Michael Nastenko May 14 '18 at 23:09
  • I know it can be copied into the task. However, the question asks about locking the generator, which shows that they don't want that. Also, the question explicitly describes the push model in that it's important for all N clients to receive the same chunk before generating the next (_"The desired behaviour is that the Generator object generates a new dataChunk and waits until all 1-N Connection objects have delt with their client requests until it generates a new dataChunk"_). The only leeway that exists is that "their client requests" are pretty loosely described. – sehe May 14 '18 at 23:12
  • In fairness, I'm starting to wonder what OP meant exactly. So, yeah, this might be a fairer suggestion (though then I'd suggest copying rather than locking. No sharing is always better than sharing, not even for perf but for simplicity) – sehe May 14 '18 at 23:14
  • @sehe _No sharing is always better than sharing, not even for perf but for simplicity_ Totally agree on that. – Michael Nastenko May 14 '18 at 23:15
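The "copy rather than lock" idea from the comments above could look roughly like the sketch below. It assumes the generator builds each chunk privately and then publishes it as an immutable snapshot; `SnapshotBox`, `publish` and `snapshot` are hypothetical names, not anything from the question or from Boost.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>

using Chunk = std::array<std::uint8_t, 999>;

// Hypothetical holder: the generator publishes immutable snapshots and
// connections take a shared_ptr, then read without further locking.
class SnapshotBox {
public:
    // Generator side: copy the fresh chunk, then swap the pointer.
    void publish(const Chunk& fresh) {
        auto next = std::make_shared<const Chunk>(fresh);
        std::lock_guard<std::mutex> lock(mtx_);
        current_ = std::move(next);
    }

    // Connection side: the lock only covers copying the pointer.
    std::shared_ptr<const Chunk> snapshot() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return current_;
    }

private:
    mutable std::mutex mtx_;
    // Starts out as an all-zero chunk so snapshot() never returns null.
    std::shared_ptr<const Chunk> current_ = std::make_shared<const Chunk>(Chunk{});
};
```

A connection that called `snapshot()` keeps reading its own chunk even after the generator publishes a newer one, so the generator never has to wait for slow clients. That is a different trade-off from the one the question asks for, where the generator is held back until everyone is done.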
0

How do I lock the dataChunk against write access by the Generator object while the Connection objects deal with their requests, given that all Connection objects, in their respective threads, are supposed to have read access at the same time during their request-handling phase?

I'd make a logical chain of operations that includes the generation.

Here's a sample:

  • it is completely single threaded
  • it accepts an unbounded number of connections and deals with dropped connections
  • it uses a deadline_timer object to signal a barrier while waiting for the send of a chunk to (many) connections to complete. This makes it convenient to put the generateData call in an async call chain.
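Before the full sample, the counting idea behind that barrier can be shown in isolation. This is only a sketch without Boost: `MiniLoop` is a made-up stand-in for an event loop, `Broadcaster` is a hypothetical name, and the last completion handler calls the next round directly, whereas the sample below routes that step through the timer.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for an event loop: handlers are queued and run
// later, one at a time, on a single thread.
struct MiniLoop {
    std::queue<std::function<void()>> handlers;
    void post(std::function<void()> h) { handlers.push(std::move(h)); }
    void run() {
        while (!handlers.empty()) {
            auto h = std::move(handlers.front());
            handlers.pop();
            h();
        }
    }
};

struct Broadcaster {
    MiniLoop& loop;
    std::size_t clients;      // number of connected clients
    int rounds_left;          // stop after this many chunks (demo only)
    std::size_t pending = 0;  // sends still in flight for the current chunk
    int generated = 0;        // chunks generated so far

    void generate_round() {
        assert(pending == 0);  // never regenerate while sends are in flight
        if (rounds_left == 0 || clients == 0) return;
        --rounds_left;
        ++generated;  // stands in for generateData()
        for (std::size_t i = 0; i < clients; ++i) {
            ++pending;
            loop.post([this] { on_write_done(); });  // stands in for async_write
        }
    }

    void on_write_done() {
        assert(pending > 0);
        if (--pending == 0)  // last send finished: the barrier opens
            generate_round();
    }
};
```

With `MiniLoop loop; Broadcaster b{loop, 3, 5};`, calling `b.generate_round()` followed by `loop.run()` generates five chunks, and each new chunk is produced only after all three simulated sends of the previous one have completed. Because everything runs on one thread, the counter needs no lock.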

Live On Coliru

#include <boost/asio.hpp>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <list>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;

using Clock = std::chrono::high_resolution_clock;
using Duration = Clock::duration;
using namespace std::chrono_literals;

struct Generator {
    void generateData();
    uint8_t dataChunk[999];
};

struct Server {
    Server(unsigned short port) : _port(port) {
        _barrier.expires_at(boost::posix_time::neg_infin);

        _acc.set_option(tcp::acceptor::reuse_address());
        accept_loop();
    }

    void generate_loop() {
        assert(n_sending == 0);

        garbage_collect(); // remove dead connections, don't interfere with sending

        if (_socks.empty()) {
            std::clog << "No more connections; pausing Generator\n";
        } else {
            _gen.generateData();
            _barrier.expires_at(boost::posix_time::pos_infin);

            for (auto& s : _socks) {
                ++n_sending;
                ba::async_write(s, ba::buffer(_gen.dataChunk), [this,&s](error_code ec, size_t written) {
                    assert(n_sending);
                    --n_sending; // even if failed, decreases pending operation
                    if (ec) {
                        std::cerr << "Write: " << ec.message() << "\n";
                        s.close();
                    }
                    std::clog << "Written: " << written << ", " << n_sending << " to go\n";

                    if (!n_sending) {
                        // green light to generate next chunk
                        _barrier.expires_at(boost::posix_time::neg_infin);
                    }
                });
            }

            _barrier.async_wait([this](error_code ec) {
                if (ec && ec != ba::error::operation_aborted)
                    std::cerr << "Client activity: " << ec.message() << "\n";
                else generate_loop();
            });
        }
    }

    void accept_loop() {
        _acc.async_accept(_accepting, [this](error_code ec) {
                if (ec) {
                    std::cerr << "Accept fail: " << ec.message() << "\n";
                } else {
                    std::clog << "Accepted: " << _accepting.remote_endpoint() << "\n";
                    _socks.push_back(std::move(_accepting));

                    if (_socks.size() == 1) // first connection?
                        generate_loop();    // start generator

                    accept_loop();
                }
            });
    }

    void run_for(Duration d) {
        _svc.run_for(d);
    }

    void garbage_collect() {
        _socks.remove_if([](tcp::socket& s) { return !s.is_open(); });
    }
  private:
    ba::io_service _svc;
    unsigned short _port;
    tcp::acceptor _acc { _svc, { {}, _port } };
    tcp::socket _accepting {_svc};

    std::list<tcp::socket> _socks;

    Generator _gen;
    size_t n_sending = 0;
    ba::deadline_timer _barrier {_svc};
};

int main() {
    Server s(6767);
    s.run_for(3s); // COLIRU
}

#include <fstream>
// synchronously generate random data chunks
void Generator::generateData() {
    std::ifstream ifs("/dev/urandom", std::ios::binary);
    ifs.read(reinterpret_cast<char*>(dataChunk), sizeof(dataChunk));
    std::clog << "Generated chunk: " << ifs.gcount() << "\n";
}

Prints (for just the 1 client):

Accepted: 127.0.0.1:60870
Generated chunk: 999
Written: 999, 0 to go
Generated chunk: 999
   [... snip ~4000 lines ...]
Written: 999, 0 to go
Generated chunk: 999
Write: Broken pipe
Written: 0, 0 to go
No more connections; pausing Generator
sehe