Can anyone please explain to me why this C++ code misbehaves? It doesn't actually crash; instead, valgrind complains about invalid memory access.

The idea is simple. The Controler creates a few workers. Every worker gets a functor object which erases this worker once it has done its job. Can anyone suggest a solution?

typedef boost::function<void()> Callback;

struct Worker
{
    void start(Callback onFinnish)
    {
        // Called when finished working
        onFinnish();
    }
};

typedef boost::shared_ptr<Worker> WorkerPtr;

struct Controler
{
    void start()
    {
        for(int i = 0; i < 5; ++i)
        {
            auto workerPtr = boost::make_shared<Worker>();
            workers.insert(workerPtr);
        }

        for(const auto &workerPtr: workers)
        {
            workerPtr->start(
                [this, workerPtr] ()
                {
                    workers.erase(workerPtr);
                    if(workers.size() == 0)
                    {
                        std::cout << "All workers done!" << std::endl;
                    }
                }
            );
        }
    }

    std::set<WorkerPtr> workers;
};

// Somewhere in code
Controler c; c.start();

EDIT AFTER COMMENTS: I rewrote the range-based for loop with explicit iterators like this and now it works:

    for(auto workerIt = workers.begin(); workerIt != workers.end();)
    {
        auto nextWorker = workerIt; ++nextWorker;

        (*workerIt)->start(
            [this, workerIt] ()
            {
                workers.erase(workerIt);
                if(workers.empty())
                {
                    onWorkersDone();
                }
            }
        );

        workerIt = nextWorker;
    }
piotrekg2
  • Can you clarify what `workerPtr->start(...)` does? In particular, does this operation spawn a new thread? Depending on the answer of this question the problem is either a data race (if a thread is spawned) or iterator invalidation (if no thread is spawned). – Dietmar Kühl Aug 23 '14 at 17:20
  • @DietmarKühl: Currently, OP provided an implementation of `workerPtr->start(...)` without threading (maybe by oversimplification of his problem). – Jarod42 Aug 23 '14 at 17:23
  • @Jarod42: er, true. In which case the problem is, indeed, the iterators being invalidated. Maybe time to restore parts of my original answer ;-) Thanks - somehow I didn't realize that `start()` is implemented! – Dietmar Kühl Aug 23 '14 at 17:27
  • @Jarod42: yes, you are right. Somehow I missed the fact that the `start()` function is there and doesn't start a thread. I would guess, however, that the problem was shifted from one to another problem when simplifying the code... – Dietmar Kühl Aug 23 '14 at 17:35

2 Answers

In your range-based for loop, you modify the container while iterating over it, which is undefined behavior (UB).

Iterating over a copy of the set should fix your issue:

for(const auto &workerPtr: std::set<WorkerPtr>(workers)) // create a copy of the set
{
    workerPtr->start(
        [this, workerPtr] ()
        {
            workers.erase(workerPtr);
            if(workers.size() == 0)
            {
                std::cout << "All workers done!" << std::endl;
            }
        }
    );
}

Edit: As noted by Dietmar Kühl, if your `start` method launches a new thread, you will have a data race on the set.
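For reference, here is a minimal, self-contained sketch of the copy-based fix. It assumes the single-threaded `start()` from the question, swaps Boost for the standard `std::function` and `std::shared_ptr`, and adds two things that are not in the original code purely for illustration: a `count` parameter and an `allDone` counter returned from `Controler::start`.

```cpp
#include <functional>
#include <memory>
#include <set>

using Callback = std::function<void()>;

struct Worker {
    // Synchronous stand-in for real work: invokes the callback immediately.
    void start(const Callback& onFinish) { onFinish(); }
};

using WorkerPtr = std::shared_ptr<Worker>;

struct Controler {
    // Returns how many times the "all workers done" branch was reached.
    // (`count` and the return value are illustrative additions.)
    int start(int count) {
        for (int i = 0; i < count; ++i) {
            workers.insert(std::make_shared<Worker>());
        }

        int allDone = 0;
        // Iterate over a snapshot so that erasing from `workers`
        // cannot invalidate the iterators used by this loop.
        const std::set<WorkerPtr> snapshot(workers);
        for (const auto& workerPtr : snapshot) {
            workerPtr->start([this, workerPtr, &allDone]() {
                workers.erase(workerPtr);
                if (workers.empty()) {
                    ++allDone;
                }
            });
        }
        return allDone;
    }

    std::set<WorkerPtr> workers;
};
```

Calling `Controler c; c.start(5);` should leave `c.workers` empty. The price of this approach is one extra copy of the set, plus a lookup by key for every `erase`.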

Jarod42

According to 6.5.4 [stmt.ranged] the range-based for-loop behaves like this piece of code:

{
    auto && __range = workers;
    for (auto __begin = __range.begin(), __end = __range.end(); __begin != __end; ++__begin) {
        const auto& workerPtr = *__begin;
        <body-of-the-loop>
    }
}

Since you happily blow away the object currently being referred to, you actually invalidate the __begin iterator in each iteration of the loop: for associative containers, all iterators and pointers/references to the erased elements are invalidated. Any invalidated iterator is a booby-trap waiting to be disarmed (by assignment or destruction), and it will blow up in undefined behavior when used in any way other than disarming it.
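To illustrate the invalidation rule in isolation, here is a small sketch on a plain `std::set<int>`. The `eraseEvens` helper is a made-up name for this example. Since C++11, `erase` on an associative container returns the iterator following the removed element, so the loop never touches an invalidated iterator.

```cpp
#include <set>

// Removes every even value from the set while iterating over it.
inline void eraseEvens(std::set<int>& values) {
    for (auto it = values.begin(); it != values.end(); ) {
        if (*it % 2 == 0) {
            // Only the iterator to the erased element is invalidated;
            // erase() hands back the next valid one.
            it = values.erase(it);
        } else {
            ++it;
        }
    }
}
```

For example, applying `eraseEvens` to a set holding 1 through 6 leaves 1, 3 and 5.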

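Personally, I would fix the issue by iterating over the set and erasing its elements in a more efficient way anyway: your approach requires searching for the element in each iteration, which is an unnecessary cost. You can just keep an iterator:

for (auto it = workers.begin(), end = workers.end(); it != end; ) {
    // Keep the worker alive while its start() is still running.
    const WorkerPtr worker = *it;
    // Capture it by reference so the callback advances the loop's iterator.
    // This only works if start() invokes the callback before it returns.
    worker->start([this, &it]() {
        it = workers.erase(it); // erase() returns the iterator after the erased element
        if (workers.empty()) {
            std::cout << "All workers done!\n";
        }
    });
}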

Of course, you should also use container.empty() to determine if the container is empty. ... and just don't use std::endl as it doesn't do anybody any good. Use std::flush if you really mean to flush the stream.

Of course, if start() in the posted code is actually a simplification and really starts a thread, the problem becomes a different one! In this case, the erase() statement isn't executed when the function object is being created to start the thread. The original issue, i.e., invalidating the iterators of the range-based for, would only happen if the started thread ran and erase()d the object before the starting thread moved on!

Instead, there is an obvious data race: there are multiple threads erase()ing objects from this->workers without any synchronization at all! You can address the problem by guarding the access to the shared std::set<WorkerPtr> using a suitable mutex. For example, I think this code resolves the problem:

struct Controler
{
    void start()
    {
        for(int i = 0; i < 5; ++i)
        {
            auto workerPtr = boost::make_shared<Worker>();
            this->workers.insert(workerPtr);
        }

        std::lock_guard<std::mutex> kerberos(this->mutex);
        for(const auto &workerPtr: this->workers)
        {
            workerPtr->start(
                [this, workerPtr] ()
                {
                    std::lock_guard<std::mutex> kerberos(this->mutex);
                    this->workers.erase(workerPtr);
                    if(this->workers.empty()) {
                        std::cout << "All workers done!\n";
                    }
                }
            );
        }
    }

    std::set<WorkerPtr> workers;
    std::mutex          mutex;
};

Of course, the problem analysis and resolution assumes that workerPtr->start(...) actually starts a thread to do the work! If the work is executed on the same thread, the iterators from the range-based for are, indeed, invalidated, and adding a mutex as is done in the code above would typically cause a deadlock (locking a std::mutex that the calling thread already owns is undefined behavior).
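Here is a self-contained sketch of the threaded variant, under loudly stated assumptions: `Worker::start` is a hypothetical version that runs the callback on a new `std::thread` and returns that thread so the caller can join it, and the `count` parameter and `allDone` counter are illustrative additions. None of this is in the question's code.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <utility>
#include <vector>

using Callback = std::function<void()>;

struct Worker {
    // Hypothetical threaded start(): runs the callback on a new thread
    // and hands that thread back so the caller can join it.
    std::thread start(Callback onFinish) {
        return std::thread(std::move(onFinish));
    }
};

using WorkerPtr = std::shared_ptr<Worker>;

struct Controler {
    // Returns how many times the "all workers done" branch was reached.
    int start(int count) {
        for (int i = 0; i < count; ++i) {
            workers.insert(std::make_shared<Worker>());
        }

        int allDone = 0;  // only written while holding `mutex`
        std::vector<std::thread> threads;
        {
            // Hold the lock while iterating: callbacks block on the mutex
            // until the loop is finished, so they cannot erase under it.
            std::lock_guard<std::mutex> lock(mutex);
            for (const auto& workerPtr : workers) {
                threads.push_back(workerPtr->start([this, workerPtr, &allDone]() {
                    std::lock_guard<std::mutex> guard(mutex);
                    workers.erase(workerPtr);
                    if (workers.empty()) {
                        ++allDone;
                    }
                }));
            }
        }  // lock released here: the callbacks may now run

        for (auto& t : threads) {
            t.join();
        }
        return allDone;
    }

    std::set<WorkerPtr> workers;
    std::mutex mutex;
};
```

Joining every thread before returning keeps `allDone` and `this` alive for as long as any callback can still run.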

Dietmar Kühl
  • But I really need to use this callback mechanism. Every worker must erase itself once it's done. There is a single thread. A worker either finishes its job immediately or creates another callback which will be called later. – piotrekg2 Aug 23 '14 at 16:53
  • @piotrekg2: actually, I'm convinced that there is _no_ such need! You may be better off redesigning your concurrency system to send suitable messages between actor objects. That is, instead of removing objects from a shared resource, you'd send a suitable message to the object owning the shared resource and have it remove the objects. Although I'm aware of various synchronization primitives, I'm generally not capable of using them correctly, but I manage to get message passing systems right. – Dietmar Kühl Aug 23 '14 at 17:06
  • @DietmarKühl: as the OP's code doesn't use any thread (maybe due to over simplification), OP really modifies container during iteration... so my answer makes sense. – Jarod42 Aug 23 '14 at 17:16
  • Thank you all for your comments. It's true that there was a problem with invalidated iterators. I used raw iterators to solve this issue (increment before erasing). @DietmarKühl I can't redesign the system (don't ask). I just have to stick with it. Generally the system works on a single thread and dispatches control to registered callbacks. If I want to synchronize things I have to create a callback which registers another callback and so on. – piotrekg2 Aug 23 '14 at 17:43
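For what it's worth, the message-passing idea from the comments can be approximated even on a single thread. The sketch below is only one possible reading of that suggestion, not code from either answer: the `finished` queue, `drainFinished()` and the `count` parameter are hypothetical names, and `start()` is the synchronous version from the question. Callbacks only record that a worker is done; the owner of the set removes the workers later, outside any loop over the set.

```cpp
#include <functional>
#include <memory>
#include <set>
#include <vector>

using Callback = std::function<void()>;

struct Worker {
    // Synchronous stand-in for real work: invokes the callback immediately.
    void start(const Callback& onFinish) { onFinish(); }
};

using WorkerPtr = std::shared_ptr<Worker>;

struct Controler {
    // Returns true once every worker has reported completion and been removed.
    bool start(int count) {
        for (int i = 0; i < count; ++i) {
            workers.insert(std::make_shared<Worker>());
        }
        for (const auto& workerPtr : workers) {
            // The callback only queues a "finished" message;
            // it never modifies `workers`, so the loop stays valid.
            workerPtr->start([this, workerPtr]() {
                finished.push_back(workerPtr);
            });
        }
        return drainFinished();
    }

    // Owner-side handling of the queued messages: the set is modified
    // only here, outside any iteration over it.
    bool drainFinished() {
        for (const auto& workerPtr : finished) {
            workers.erase(workerPtr);
        }
        finished.clear();
        return workers.empty();
    }

    std::set<WorkerPtr> workers;
    std::vector<WorkerPtr> finished;
};
```

In a callback-dispatching system, `drainFinished()` could itself be registered as a callback that runs after the workers' callbacks, assuming the dispatcher allows that.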