Code
I have an algorithm that runs on several threads. When the threads reach a specific step, I want the main thread to know about it. I'm working on Xubuntu 18.04.
The main application follows this scheme:
vector<thread> threads;
vector<bool> flags;
bool finish = false;
cout << "Create" << endl;
create_threads(threads, flags, finish);
cout << "Finish" << endl;
finish = true;
cout << "Destroy" << endl;
destroy_threads(threads);
cout << "End" << endl;
return 0;
Each thread loops until finish is true (the main thread sets it from outside the threads).
After creating the threads, I check whether they have reached a specific step. Each thread signals this by assigning flags[thread_number] = true:
void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
auto command = [&finish, &flags](unsigned thread_number)
{
while (!finish)
{
flags[thread_number] = true;
}
};
for (unsigned i = 0; i < threads_count; ++i)
{
flags.push_back(false);
threads.push_back(move(thread{command, i}));
}
// BUG: Freeze is here
wait_threads(flags);
// BUG: Freeze is here
}
The check is done by a function that simply waits until all elements of the flags vector are true. Before the threads run, all elements are false, and only the threads modify them.
void wait_threads(vector<bool>& flags)
{
while (true)
{
if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
{
return;
}
}
}
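If the only goal is for the main thread to learn that every worker has reached the step, a mutex plus std::condition_variable countdown avoids spinning over a shared vector altogether. This is a swapped-in technique, not the approach used in the question. The class StartLatch and the function start_workers_and_count are hypothetical names for this sketch, and each worker is assumed to report exactly once. C++20 ships the same idea as std::latch in <latch>, but a hand-rolled version works with older compilers.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: a one-shot countdown latch. Each worker calls
// arrive() once; the main thread sleeps in wait() instead of spinning.
class StartLatch
{
public:
    explicit StartLatch(unsigned count) : remaining_(count) {}

    void arrive()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (remaining_ > 0 && --remaining_ == 0)
        {
            cv_.notify_all(); // the last worker wakes the waiting thread
        }
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return remaining_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    unsigned remaining_;
};

// Hypothetical usage sketch: returns how many workers the main thread
// saw reach the step once wait() has returned.
unsigned start_workers_and_count(unsigned workers_count)
{
    StartLatch latch(workers_count);
    std::vector<unsigned> reached(workers_count, 0u); // one slot per worker
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < workers_count; ++i)
    {
        workers.emplace_back([&latch, &reached, i] {
            reached[i] = 1u; // ... work up to the step of interest ...
            latch.arrive();  // report that this worker got there
        });
    }
    latch.wait(); // the mutex makes each worker's earlier writes visible here
    unsigned seen = 0;
    for (unsigned r : reached)
    {
        seen += r;
    }
    for (auto& w : workers)
    {
        w.join();
    }
    return seen;
}
```

The ordering comes from the mutex locked in both arrive() and wait(). Because of it, the main thread sees everything a worker wrote before calling arrive() without needing any atomics.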
The complete example follows and can be compiled with clang++ sync.cpp -lpthread -o sync
#include <algorithm>
#include <thread>
#include <vector>
#include <iostream>
#include <utility>
using namespace std;
const unsigned threads_count = 2;
/**
* Wait for threads to start.
* Every thread has access to the `flags` vector.
* After starting, each thread assigns `true`
* to its own element of the vector.
*
* However, with the `-O2` and `-O3` optimization flags
* the function `wait_threads` freezes,
* as if the elements never change from `false` to `true`.
*/
void wait_threads(vector<bool>& flags)
{
while (true)
{
if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
{
return;
}
}
}
/**
* Create threads.
* After the launch, each thread assigns `true` to the corresponding `flag` cell.
* Also, the threads watch the `finish` variable and stop when it becomes `true`.
*/
void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
auto command = [&finish, &flags](unsigned thread_number)
{
while (!finish)
{
flags[thread_number] = true;
}
};
for (unsigned i = 0; i < threads_count; ++i)
{
flags.push_back(false);
threads.push_back(move(thread{command, i}));
}
// BUG: Freeze is here
wait_threads(flags);
// BUG: Freeze is here
}
/**
* Wait until all threads finish.
*/
void destroy_threads(vector<thread>& threads)
{
for (auto& el : threads)
{
el.join();
}
}
int main()
{
vector<thread> threads;
vector<bool> flags;
bool finish = false;
cout << "Create" << endl;
create_threads(threads, flags, finish);
cout << "Finish" << endl;
finish = true;
cout << "Destroy" << endl;
destroy_threads(threads);
cout << "End" << endl;
return 0;
}
Issue
When I compile the code without optimization, it runs fine. If I use -O2, -O3 or -Ofast, it freezes in the wait_threads call.
It looks as if the compiler sees that the main thread fills the flags vector with false and keeps using that value. This happens even with const unsigned threads_count = 1;
I tried vector< atomic<unsigned> >, but then I found the How to declare a vector of atomic in C++ question and concluded that this is tricky and not the right way to solve the problem.
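A minimal sketch of the vector-of-atomics approach, assuming C++11 or later, follows. The vector is sized once in its constructor and never grown. This sidesteps the copy/move requirement that push_back runs into. finish also becomes an std::atomic<bool>, because the plain bool in the question is shared between threads in the same way. The function name start_threads_and_wait is hypothetical; it simply folds create_threads, wait_threads and destroy_threads into one place.

```cpp
#include <algorithm>
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical sketch: same start-up check as create_threads/wait_threads,
// but every variable shared between threads is a std::atomic.
bool start_threads_and_wait(unsigned threads_count)
{
    // Sized once and never resized: std::atomic is neither copyable nor
    // movable, so push_back would not compile, but this constructor does.
    // The elements are value-initialized, i.e. they start out false.
    std::vector<std::atomic<bool>> flags(threads_count);
    std::atomic<bool> finish{false};
    std::vector<std::thread> threads;

    for (unsigned i = 0; i < threads_count; ++i)
    {
        threads.emplace_back([&flags, &finish, i] {
            while (!finish.load())
            {
                flags[i].store(true); // atomic store, visible to the main thread
            }
        });
    }

    // Busy-wait, as in the original, until every thread has reported in.
    auto is_set = [](const std::atomic<bool>& flag) { return flag.load(); };
    while (!std::all_of(flags.begin(), flags.end(), is_set))
    {
    }

    finish.store(true);
    for (auto& t : threads)
    {
        t.join();
    }
    return std::all_of(flags.begin(), flags.end(), is_set);
}
```

Every shared access here is an atomic load or store, so there is no data race. As a result, the compiler may no longer assume that the flags never change inside the waiting loop. The loop is still a busy-wait, just like the original.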
Appendix (The real code)
The checks in the real code are more complicated. Here is the real waiting function:
void wait_threads_change(
ULONG beginning,
vector<ULONG>* last_changes,
vector<ULONG>* last_checks
)
{
while (*std::min_element(last_checks->begin(), last_checks->end()) <= beginning)
{
// cout << flush; // WORKAROUND: This helps to avoid freeze
}
while (
(*std::min_element(last_checks->begin(), last_checks->end())
<= *std::max_element(last_changes->begin(), last_changes->end()))
)
{
}
}
and the real thread creation:
void run_csp_threads(
struct ConstraintGraph *graph,
vector<thread>* threads,
vector<ULONG>* last_changes,
vector<ULONG>* last_checks,
BOOL& finish
)
{
const ULONG THREADS = 7;
auto command = ([graph, THREADS, last_changes, last_checks, &finish](ULONG thread_number)
{
while (!finish)
{
if (csp_solution_iteration(graph, THREADS, thread_number))
{
do
{
(*last_changes)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
} while (csp_solution_iteration(graph, THREADS, thread_number));
}
else
{
(*last_checks)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
}
});
ULONG beginning = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
for (ULONG thread_number = 0; thread_number < THREADS; ++thread_number)
{
last_changes->push_back(beginning);
last_checks->push_back(beginning);
threads->push_back(move(thread{command, thread_number}));
}
wait_threads_change(beginning, last_changes, last_checks);
}
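The same fixed-size-atomics idea might carry over to the real waiting function. This sketch assumes ULONG is unsigned long, since its definition is not shown. It introduces a hypothetical Stamps alias and hypothetical helpers min_stamp and max_stamp. With this layout, the vectors would have to be created with THREADS elements up front instead of with push_back. The worker threads would then write with (*last_checks)[thread_number].store(...).

```cpp
#include <algorithm>
#include <atomic>
#include <vector>

// Assumption: ULONG is unsigned long; the question does not show its definition.
using ULONG = unsigned long;
// Hypothetical alias: fixed-size vector of atomic timestamps (never push_back).
using Stamps = std::vector<std::atomic<ULONG>>;

// Hypothetical helper: smallest stamp, each element read with an atomic load.
// Callers must pass a non-empty vector (one element per thread).
ULONG min_stamp(const Stamps& stamps)
{
    ULONG result = stamps.front().load();
    for (const auto& s : stamps)
    {
        result = std::min(result, s.load());
    }
    return result;
}

// Hypothetical helper: largest stamp, each element read with an atomic load.
ULONG max_stamp(const Stamps& stamps)
{
    ULONG result = stamps.front().load();
    for (const auto& s : stamps)
    {
        result = std::max(result, s.load());
    }
    return result;
}

// Same two waiting phases as wait_threads_change, but on atomics.
void wait_threads_change(ULONG beginning, const Stamps& last_changes, const Stamps& last_checks)
{
    // Phase 1: every thread has finished at least one check after `beginning`.
    while (min_stamp(last_checks) <= beginning)
    {
    }
    // Phase 2: every thread's latest check is newer than the newest change.
    while (min_stamp(last_checks) <= max_stamp(last_changes))
    {
    }
}
```

Each element is loaded separately, so the minimum and maximum are not a consistent snapshot across all threads. That matches the behavior of the original code.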
For this question I simplified the code as much as I could, so that I can at least learn how to solve one of the problems.