0

I am implementing a concurrent wrapper as introduced by Herb Sutter presented in his talk "C++ and Beyond 2012".

template <typename T>
class ConcurrentWrapper {

  private:

    std::deque<std::unique_ptr<std::function<void()>>> _tasks;
    std::mutex _mutex;
    std::condition_variable _cond;

    T _object;
    std::thread _worker;
    std::atomic<bool> _done {false};

  public:

    template <typename... ArgsT>
    ConcurrentWrapper(ArgsT&&... args) :
      _object {std::forward<ArgsT>(args)...},
      _worker {
        [&]() {
          typename decltype(_tasks)::value_type task;
          while(!_done) {
            { 
              std::unique_lock<std::mutex> lock(_mutex);
              while(_tasks.empty()) {
                _cond.wait(lock);
              }
              task = std::move(_tasks.front());
              _tasks.pop_front();
            } 
            (*task)();
          }
        }
      } {
    }

    ~ConcurrentWrapper() {

      {
        std::unique_lock<std::mutex> lock(_mutex);
        _tasks.push_back(std::make_unique<std::function<void()>>(
          [&](){_done = true;}
        ));
      }
      _cond.notify_one();
      _worker.join();           
    }

    template <typename F, typename R = std::result_of_t<F(T&)>>
    std::future<R> operator()(F&& f) {

      std::packaged_task<R(T&)> task(std::forward<F>(f)); 
      auto fu = task.get_future();

      {
        std::unique_lock<std::mutex> lock(_mutex);
        _tasks.push_back(std::make_unique<std::function<void()>>(
          [this, task=MoveOnCopy<decltype(task)>(std::move(task))]() { 
            task.object(this->_object); 
          }
        ));
      }
      _cond.notify_one();

      return fu;
    }
};

Basically, the idea is to wrap an object and provide thread-safe access in FIFO order using operation (). However, in some runs (not always happen), the following program hanged:

ConcurrentWrapper<std::vector<int>> results;

results(
  [&](std::vector<T>& data) {
    std::cout << "sorting...\n";
    std::sort(data.begin(), data.end());
    std::cout << "done ...\n";
    EXPECT_EQ(data, golden);
  }   
).get();

However, the program work correctly without explicitly calling get() method.

results(
  [&](std::vector<T>& data) {
    std::cout << "sorting...\n";
    std::sort(data.begin(), data.end());
    std::cout << "done ...\n";
    EXPECT_EQ(data, golden);
  }    
);    // Function correctly without calling get

What could the be problem? Did I implement something wrong? I noticed a posted here saying that "a packaged_task needs to be invoked before you call f.get(), otherwise you program will freeze as the future will never become ready." Is this true? If yes, how can I get this problem solved?

I was compiling the code using -std=c++1z -pthread with G++ 6.1

Community
  • 1
  • 1
Jes
  • 2,614
  • 4
  • 25
  • 45
  • The code looks suspiciosly like a troll (`task.object(this->_object);`, `[&](std::vector& data)` etc.). Please provide a [Minimal, Complete, and Verifiable example](http://stackoverflow.com/help/mcve), – rustyx Sep 26 '16 at 19:38
  • Also have you noticed you invoke tasks outside the lock? (`(*task)();`). And why wrap `std::function` in `unique_ptr`? – rustyx Sep 26 '16 at 19:49

0 Answers0