After adding a queue for a work pool, in which i put the jobs and get them with a unique_lock, i get memory leak errors, but i can't find where i am missing to delete. Simple logic: i got a farm, it split the work, give them to threads, threads do the computation and push into the queue the results, then the emitter node split the job if needed and send it to the threads again.
I give you the actual code and i also post the error of -fsanitize=addres, anyway the code is runnable and you can try with your best profiling tool.
#include <iostream>
#include <unistd.h>
#include <typeinfo>
#include <chrono>
#include <thread>
#include <ff/ff.hpp>
#include <ff/pipeline.hpp>
#include <ff/farm.hpp>
#include <mutex>
#include <atomic>
#include <list>
#include <array>
#include <math.h>
#define UNASSIGNED 0
#define N 9
#define ERROR_PAIR std::make_pair(-1, -1)
using namespace std;
using namespace ff;
atomic<bool> solutionFound{false};
mutex mtx;
// Declaration for a tree node
struct Node {
array<unsigned char, N * N> grid;
vector<Node *> child;
};
vector<vector<Node *>> queueWork(0, vector<Node *>(0));
// Utility function to create a new tree node
Node *newNode(const array<unsigned char, N * N> &newGrid) {
Node *temp = new Node;
temp->grid = newGrid;
return temp;
}
void printGrid(const array<unsigned char, N * N> &grid) {
for (int row = 0; row < N; row++) {
if (row == 3 || row == 6) {
cout << "---------------------" << endl;
}
for (int col = 0; col < N; col++) {
if (col == 3 || col == 6) {
cout << "| ";
}
cout << (int)grid[row + col * N] << " ";
}
cout << endl;
}
}
bool canInsert(const int &val, const int &row_, const int &col_,
const array<unsigned char, N * N> &grid) {
// Check column
for (int row = 0; row < N; row++) {
if (grid[row + col_ * N] == val) return false;
}
// check row
for (int col = 0; col < N; col++) {
if (grid[row_ + col * N] == val) return false;
}
// check box
for (int row = 0; row < N; row++) {
for (int col = 0; col < N; col++) {
if (row / 3 == row_ / 3 &&
col / 3 == col_ / 3) { // they are in the same square 3x3
if ((grid[row + col * N] == val)) return false;
}
}
}
return true;
}
// vector<vector<int>> gridTest(9, vector<int>(9,0)); il vettore deve essere
// inizializzato, cosi. n = how many numbers you want to initialize the matrix
// with
void generateMatrix(const int &seed, const int &n,
array<unsigned char, N * N> &grid) {
srand(seed);
int i = 0;
while (i < n) {
int row = rand() % 9;
int col = rand() % 9;
int val = rand() % 9 + 1;
if (grid[row + col * N] == UNASSIGNED &&
canInsert(val, row, col, grid)) {
grid[row + col * N] = val;
i++;
}
}
return;
}
bool isSolution(
const array<unsigned char, N * N> &grid) // check if the sudoku is solved
{
char row_[9][N + 1] = {0};
char column_[9][N + 1] = {0};
char box[3][3][N + 1] = {0};
for (int row = 0; row < N; row++) {
for (int col = 0; col < N; col++) {
// mark the element in row column and box
row_[row][grid[row + col * N]] += 1;
column_[col][grid[row + col * N]] += 1;
box[row / 3][col / 3][grid[row + col * N]] += 1;
// if an element is already
// present in the hashmap
if (box[row / 3][col / 3][grid[row + col * N]] > 1 ||
column_[col][grid[row + col * N]] > 1 ||
row_[row][grid[row + col * N]] > 1)
return false;
}
}
return true;
}
pair<int, int> findCell(const array<unsigned char, N * N> &grid) {
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
if (grid[i + j * N] == UNASSIGNED) {
return make_pair(i, j);
}
}
}
return ERROR_PAIR;
}
void addChoices(list<array<unsigned char, N * N>> &choices, Node &node) {
while (!choices.empty()) {
node.child.push_back(newNode(choices.front()));
choices.pop_front();
}
return;
}
list<array<unsigned char, N * N>> getChoices(
const int &row, const int &col, const array<unsigned char, N * N> &grid) {
list<array<unsigned char, N * N>> choices;
for (int i = 1; i < 10; i++) {
if (canInsert(i, row, col, grid)) {
array<unsigned char, N *N> tmpGrid = grid;
tmpGrid[row + col * N] = i;
choices.push_back(move(tmpGrid));
}
}
return choices;
}
// Compute one step of computation for each node in input, and put all the
// childreen in the task vector.
void solveOneStep(vector<Node *> &nodes, vector<Node *> &tasks) {
// std::this_thread::sleep_for(std::chrono::milliseconds(2000));
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (solutionFound) {
for (Node *&t : nodes) {
delete t;
}
return;
}
for (Node *&n : nodes) {
if (findCell(n->grid) != ERROR_PAIR) {
pair<int, int> freeCell = findCell(n->grid);
list<array<unsigned char, N *N>> choices =
getChoices(freeCell.first, freeCell.second, n->grid);
if (choices.empty()) {
delete n;
continue;
}
addChoices(choices, *n);
for (Node *&n : n->child) { //Store all the children in tasks
tasks.push_back(n);
}
delete n;
continue;
} else if (isSolution(n->grid)) {
if (!solutionFound.load()) {
solutionFound.store(true);
printGrid(n->grid);
cout << "That's the first solution found !" << endl;
}
delete n;
return;
}
}
}
//Start the computation sequentially, until we have enough works to start all the threads togheter
vector<Node *> findChunks(Node *root, const int &nw) {
vector<Node *> tasks;
vector<Node *> nodes;
nodes.push_back(root);
while ((int)tasks.size() < nw && !solutionFound) {
tasks.clear();
solveOneStep(nodes, tasks);
if (tasks.empty()) {
vector<Node *> error;
cout << "errore" << endl;
return error;
}
nodes = tasks;
}
return tasks;
}
//Assign each part of the work to each worker
vector<vector<Node *>> splitChunks(vector<Node *> &tasks, int nw) {
int freeWorker = nw;
vector<vector<Node *>> works(nw, vector<Node *>());
for (int i = 0; i < nw; i++) {
int limit = 0;
i == nw - 1 ? limit = tasks.size()
: limit = ceil(tasks.size() / double(freeWorker));
for (int j = 0; j < limit; j++) {
works[i].push_back(tasks.back());
tasks.pop_back();
}
freeWorker--;
}
return works;
}
vector<Node *> solveTest(vector<Node *> &nodes) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
vector<Node *> results;
if (solutionFound) {
for (Node *&t : nodes) {
delete t;
}
return results;
}
for (Node *&n : nodes) {
if (findCell(n->grid) != ERROR_PAIR) { //There is an empty cell
pair<int, int> freeCell = findCell(n->grid);
list<array<unsigned char, N *N>> choices =
getChoices(freeCell.first, freeCell.second, n->grid);
if (choices.empty()) {
delete n;
continue;
}
addChoices(choices, *n); //Update the tree
for (Node *&child : n->child) {
results.push_back(child);
};
delete n;
continue;
} else if (isSolution(n->grid) && !solutionFound.load()) { //Grid is full, check for a solution
solutionFound = true;
printGrid(n->grid);
cout << "That's the first solution found !" << endl;
delete n;
return results;
} else { //Grid full but it's not a solution
delete n;
continue;
}
}
return results;
}
//Get a work from the queue
vector<Node *> getWork() {
unique_lock<mutex> lck(mtx);
auto tmp = queueWork.back();
queueWork.pop_back();
lck.unlock();
return tmp;
}
//Put a work in the queue
void pushWork(vector<Node *> &work) {
unique_lock<mutex> lck(mtx);
queueWork.push_back(work);
lck.unlock();
return;
}
struct firstThirdStage : ff_node_t<vector<Node *>> {
firstThirdStage(Node *root, const int nw) : root(root), nw(nw) {}
vector<Node *> *svc(vector<Node *> *task) {
if (task == nullptr) {
vector<Node *> tasks = findChunks(root, nw);
if (tasks.empty() && !solutionFound) { //No more moves to do, no solution.
cout << "This sudoku is unsolvable!" << endl;
delete task;
return EOS;
}
vector<vector<Node *>> works = splitChunks(tasks, nw);
for (size_t i = 0; i < works.size(); ++i) {
ff_send_out(new vector<Node *>(works[i]));
}
delete task;
return GO_ON;
}
//cout << threadSus << endl;
if (solutionFound.load()) { //After the first iteration
delete task;
return EOS;
} else {
if (!queueWork.empty()) {
vector<Node *> tmp;
tmp = getWork();
ff_send_out(new vector<Node *>(tmp));
delete task;
return GO_ON;
} else
if (++threadSus == nw) {
cout << "This sudoku is unsolvable!" << endl;
delete task;
return EOS;
}
}
delete task;
return GO_ON;
}
void svc_end() {
cout << "Done !" << endl;
}
Node *root;
const int nw;
int threadSus = 0; //Threads suspended
};
struct secondStage : ff_node_t<vector<Node *>> {
vector<Node *> *svc(vector<Node *> *task) {
vector<Node *> &t = *task;
vector<Node *> res = solveTest(t);
if (!res.empty()) {
pushWork(res);
} else {
for (auto &t : res){
delete t;
}
}
return task;
}
};
int main(int argc, char *argv[]) {
chrono::high_resolution_clock::time_point t1 =
chrono::high_resolution_clock::now();
array<unsigned char, N *N> grid = {
3, 0, 6, 5, 0, 8, 4, 0, 0, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 8, 7,
0, 0, 0, 0, 3, 1, 0, 0, 3, 0, 1, 0, 0, 8, 0, 9, 0, 0, 8, 6, 3,
0, 0, 5, 0, 5, 0, 0, 9, 0, 6, 0, 0, 1, 3, 0, 0, 0, 0, 2, 5, 0,
0, 0, 0, 0, 0, 0, 0, 7, 4, 0, 0, 5, 2, 0, 6, 3, 0, 0};
array<unsigned char, N *N> testGrid2 = {
0, 0, 0, 5, 7, 8, 4, 9, 2, 0, 0, 0, 1, 3, 4, 7, 6, 8, 0, 0, 0,
6, 2, 9, 5, 3, 1, 2, 6, 3, 0, 1, 5, 9, 8, 7, 9, 7, 4, 8, 6, 0,
1, 2, 5, 8, 5, 1, 7, 9, 2, 6, 4, 3, 1, 3, 8, 0, 4, 7, 2, 0, 6,
6, 9, 2, 3, 5, 1, 8, 7, 4, 7, 4, 5, 0, 8, 6, 3, 1, 0};
if (argc < 2) {
std::cerr << "use: " << argv[0] << " nworkers\n";
return -1;
}
array<unsigned char, N *N> testGrid = {0};
generateMatrix(12,20, testGrid);
Node *root = newNode(testGrid);
const size_t nworkers = std::stol(argv[1]);
firstThirdStage firstthird(root, nworkers);
std::vector<std::unique_ptr<ff_node>> W;
for (size_t i = 0; i < nworkers; ++i)
W.push_back(make_unique<secondStage>());
ff_Farm<vector<Node *>> farm(std::move(W), firstthird);
farm.remove_collector(); // needed because the collector is present by
// default in the ff_Farm
farm.wrap_around(); // this call creates feedbacks from Workers to the
// Emitter
// farm.set_scheduling_ondemand(); // optional
ffTime(START_TIME);
if (farm.run_and_wait_end() < 0) {
error("running farm");
return -1;
}
ffTime(STOP_TIME);
std::cout << "Time: " << ffTime(GET_TIME) << "\n";
chrono::high_resolution_clock::time_point t2 =
chrono::high_resolution_clock::now();
chrono::duration<double> time_span =
chrono::duration_cast<chrono::duration<double>>(t2 - t1);
std::cout << "It took me " << time_span.count() << " seconds." << endl;
return (0);
}