
Introduction

I have a vector, entities, containing 44 million names. I want to split it into 4 parts and process each part in parallel. The class Freebase has a function loadData() that splits the vector and calls multiThread on each part to do the processing.

  • loadEntities() reads a text file containing the names. I left its implementation out of the class because it isn't important here.
  • loadData() splits the vector entities, which was initialized in the constructor, into 4 parts and adds one thread per part to the vector<thread> threads as follows:

threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));

  • multiThread is the function where I process the files.
  • i and i + right are the indices that the for loop in multiThread uses to loop through entities.
  • returnValues is a helper called by multiThread; it calls an external function.

Problem

cout <<"Entity " << entities[i] << endl; is showing the following results:

  • Entity m.0rzf6wv (ok)
  • Entity m.0rzf70 (ok)
  • Entity m.068s4h9 m.0n_k8bz (WRONG)
  • Entity Entity m.068s5_1 (WRONG)

The last 2 outputs are wrong. The output should be:

  • "Entity name", not "Entity Entity name" or "Entity name name"

This is causing a segmentation fault when the input is being sent to function returnValues. How can I solve it?


Source Code

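#ifndef FREEBASE_H
#define FREEBASE_H

#include <string>
#include <utility>
#include <vector>

class Freebase
{
 public:
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
    void loadData();
 private:
   std::string _serverURL;
   std::string _entities;
   std::string _xmlFile;
   std::string _tfidfDatabase;  // initialized in the constructor
   // worker run by each thread over entities[start, end)
   void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
   // runs a SPARQL query and returns the parsed rows
   std::vector<std::pair<std::string, std::string>> returnValues(const std::string &);
   // reads the entity names from a text file (implementation omitted)
   std::vector<std::string> loadEntities();
   //private data members
   std::vector<std::string> entities;
};

#endif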

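#include "Freebase.h"
#include "queries/SparqlQuery.h"

#include <algorithm>   // remove
#include <cmath>       // round
#include <functional>  // ref
#include <iostream>    // cout
#include <sstream>     // istringstream
#include <thread>      // thread
#include <boost/algorithm/string.hpp>  // boost::split, boost::is_any_of

using namespace std;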

Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
  entities = loadEntities();
}

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     data.push_back(make_pair(entities[i], desc));
  }
}


void Freebase::loadData()
{
  vector<pair<string, string>> data;
  vector<thread> threads;
  int Size = entities.size();
  //split database into 4 parts
  int p = 4;
  int right = round((double)Size / (double)p);
  int left = Size % p;
  float totalduration = 0;
  
  vector<pair<int, int>> coordinates;
  int counter = 0;
  for(int i = 0; i < Size; i += right)
  {

      if(i < Size - right)
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
      }
      else
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
      }
      
  }//end outer for
  
   for(auto &t : threads)
   {
      t.join();
   }
   
}


vector<pair<string, string>>  Freebase::returnValues(const string & query)
{
  vector<pair<string, string>> data;
  SparqlQuery sparql(query, _serverURL);
  string result = sparql.retrieveInformations();
  istringstream str(result);
  string line;
  //skip first line
  getline(str,line);
  while(getline(str, line))
  {
    vector<string> values;
    line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
    
    boost::split(values, line, boost::is_any_of("\t"));
    if(values.size() == 2)
    {
      pair<string,string> fact = make_pair(values[0], values[1]);
      data.push_back(fact);
    }
    else
    {
      data.push_back(make_pair(line, ""));
    }
  }
  
  return data;
}//end function
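
A side note on the splitting step in loadData(), unrelated to the crash itself: stepping by round(Size / p) does not always yield exactly p ranges. For example, Size = 9 gives right = 2 and start indices 0, 2, 4, 6, 8, so five threads are started, and Size = 1 gives right = 0, so the loop never advances. The sketch below shows one way to compute the ranges up front. The helper name splitRanges is hypothetical and not part of the original class.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: splits [0, size) into at most `parts` contiguous
// half-open ranges [begin, end). The first (size % parts) ranges get one
// extra element, so range lengths differ by at most one.
std::vector<std::pair<std::size_t, std::size_t>>
splitRanges(std::size_t size, std::size_t parts)
{
    assert(parts > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    const std::size_t base = size / parts;
    const std::size_t extra = size % parts;
    std::size_t begin = 0;
    for (std::size_t k = 0; k < parts && begin < size; ++k)
    {
        const std::size_t len = base + (k < extra ? 1 : 0);
        ranges.emplace_back(begin, begin + len);
        begin += len;
    }
    return ranges;
}
```

Each returned pair can then be passed as the start and end arguments of one worker thread.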
Hani Goc
  • I'll check this link @ArnonZilca . I still wonder if i am passing a wrong input to returnValues – Hani Goc Jun 24 '15 at 08:21
  • It seems like you're calling `data.push_back` from within your threads, and altering the vector unsafely from several threads - correct me if I'm wrong. I think that's the cause of your segfault problem. – ArnonZ Jun 24 '15 at 08:24
  • yes correct. That's what I am doing. I want to actually be able to push the names and results that I get from `returnValues` into `data` – Hani Goc Jun 24 '15 at 08:27
  • why do you think that output to `cout` impacts the operation of `returnValues` ? Not synchronizing output will cause messed up output, but that's the only effect it should have. Your issue is more likely to be another synchronization issue (for a different shared resource). – Sander De Dycker Jun 24 '15 at 08:28
  • Then I think you are better off with either creating 4 result vectors and merging them after join, or writing **safely** (using a mutex) to the result vector from each thread - I'm guessing using several vectors will be quicker for a small number of threads. – ArnonZ Jun 24 '15 at 08:33
  • @ArnonZilca: Besides the number of threads, the number of elements in the result vectors will also play a role here, right? – Timo Jun 24 '15 at 08:54
  • Indeed. The more elements, the more you'll pay for mutex overhead. Reading a little more about thread safety and vectors, I found out that you can write safely to the same vector from multiple threads as long as you are writing to different indices (check [this](http://stackoverflow.com/questions/2951361/can-multiple-threads-access-a-vector-at-different-places) out). This solution will save you both the mutexes (of solution 1) and the vector merging at the end (of solution 2). – ArnonZ Jun 24 '15 at 09:03
  • @ArnonZilca @Valdo mentions there: **The only situation you have to worry about is when new elements are added, which is impossible in your case.** So if the vector is already initialized, as `entities` is, there should be no problem. The problem arises when I add elements to `data`. Is that right? – Hani Goc Jun 24 '15 at 09:13
  • You're calling `push_back`, which adds elements anyway - if you updated existing elements instead, I think you should be ok. – ArnonZ Jun 24 '15 at 09:26
  • But updating elements would require you to know in advance how many elements there are **exactly**, **always**. – Timo Jun 24 '15 at 09:30
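
A minimal sketch of the index-based approach discussed in the comments above. It assumes exactly one result per entity, so the result count is known before any thread starts, which addresses Timo's caveat. The names slotDescription, fillRange, and processIntoSlots are hypothetical, and slotDescription is only a stand-in for the real returnValues query.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using Result = std::pair<std::string, std::string>;

// Stand-in for the per-entity work (the real code calls returnValues()).
std::string slotDescription(const std::string& entity)
{
    return "description of " + entity;
}

// Each thread assigns only to the slots in its own range [begin, end).
void fillRange(const std::vector<std::string>& entities,
               std::vector<Result>& data,
               std::size_t begin, std::size_t end)
{
    for (std::size_t i = begin; i < end; ++i)
        data[i] = Result(entities[i], slotDescription(entities[i]));
}

std::vector<Result> processIntoSlots(const std::vector<std::string>& entities,
                                     std::size_t parts)
{
    assert(parts > 0);
    // Size the vector before any thread starts; it is never resized afterwards.
    std::vector<Result> data(entities.size());
    std::vector<std::thread> threads;
    const std::size_t chunk = (entities.size() + parts - 1) / parts;  // ceiling
    for (std::size_t begin = 0; begin < entities.size(); begin += chunk)
    {
        const std::size_t end = std::min(begin + chunk, entities.size());
        threads.emplace_back(fillRange, std::cref(entities), std::ref(data),
                             begin, end);
    }
    for (auto& t : threads)
        t.join();
    return data;
}
```

No mutex is needed here because no two threads touch the same element and nothing calls push_back while the threads run.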

1 Answer

EDIT: Arnon Zilca is correct in his comments. You are writing to a single vector from multiple threads (in Freebase::multiThread()), which is a recipe for disaster: concurrent push_back calls on the same vector are a data race, and any one of them may reallocate the storage while another thread is still using it. You can use a mutex as described below to protect the push_back operation.

For more on the thread safety of containers, see "Is std::vector or boost::vector thread safe?".

So:

mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
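
The same protection can also be written with std::lock_guard, which releases the mutex automatically when it goes out of scope, even if push_back throws. This is only a sketch: dataMutex, appendResult, appendConcurrently, and the "m.example" placeholder are names made up for illustration and do not appear in the original code.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using Result = std::pair<std::string, std::string>;

std::mutex dataMutex;  // hypothetical name; guards the shared result vector only

void appendResult(std::vector<Result>& data,
                  const std::string& entity, const std::string& desc)
{
    // Locks here; unlocks in the guard's destructor on every exit path.
    std::lock_guard<std::mutex> lock(dataMutex);
    data.push_back(std::make_pair(entity, desc));
}

// Small driver: `threadCount` threads each append `perThread` results
// to one shared vector. Returns the final element count.
std::size_t appendConcurrently(std::size_t threadCount, std::size_t perThread)
{
    assert(threadCount > 0);
    std::vector<Result> data;
    std::vector<std::thread> threads;
    for (std::size_t t = 0; t < threadCount; ++t)
    {
        threads.emplace_back([&data, perThread] {
            for (std::size_t i = 0; i < perThread; ++i)
                appendResult(data, "m.example", "desc");  // placeholder values
        });
    }
    for (auto& th : threads)
        th.join();
    return data.size();
}
```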

Another option is to use the same strategy as you do in returnValues: create a local vector in multiThread and only push its contents to the data vector when the thread is done processing.

So:

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  vector<pair<string,string>> threadResults;
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     threadResults.push_back(make_pair(entities[i], desc));
  }
  mtx.lock();
  data.insert(data.end(), threadResults.begin(), threadResults.end());
  mtx.unlock();
}

Note: I would suggest using a different mutex than the one you use for cout. The overall result vector data is a different resource than cout, so threads that want to use cout should not have to wait for another thread to finish with data.
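
If you would rather not lock around data at all, one variation (along the lines of ArnonZ's suggestion in the comments on the question) is to let each thread return its own vector and merge the parts after every thread has finished. The sketch below uses std::async for this. The names lookupDescription, processRange, and processAndMerge are hypothetical, and lookupDescription merely stands in for the returnValues query.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

using Result = std::pair<std::string, std::string>;

// Stand-in for the real per-entity query.
std::string lookupDescription(const std::string& entity)
{
    return "description of " + entity;
}

// Runs on a worker thread and touches only its own local vector.
std::vector<Result> processRange(const std::vector<std::string>& entities,
                                 std::size_t begin, std::size_t end)
{
    std::vector<Result> local;
    local.reserve(end - begin);
    for (std::size_t i = begin; i < end; ++i)
        local.emplace_back(entities[i], lookupDescription(entities[i]));
    return local;
}

std::vector<Result> processAndMerge(const std::vector<std::string>& entities,
                                    std::size_t parts)
{
    assert(parts > 0);
    std::vector<std::future<std::vector<Result>>> futures;
    const std::size_t chunk = (entities.size() + parts - 1) / parts;  // ceiling
    for (std::size_t begin = 0; begin < entities.size(); begin += chunk)
    {
        const std::size_t end = std::min(begin + chunk, entities.size());
        futures.push_back(std::async(std::launch::async, processRange,
                                     std::cref(entities), begin, end));
    }
    // Merging happens on the calling thread only, so no mutex is involved.
    std::vector<Result> data;
    data.reserve(entities.size());
    for (auto& f : futures)
    {
        std::vector<Result> part = f.get();  // waits for that worker
        data.insert(data.end(),
                    std::make_move_iterator(part.begin()),
                    std::make_move_iterator(part.end()));
    }
    return data;
}
```

Because the futures are collected in range order, the merged vector keeps the same order as entities.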

/EDIT

You could use a mutex around

cout <<"Entity " << entities[i] << endl;

That would prevent multiple threads using cout at "the same time". That way you can be sure that an entire message is printed by a thread before another thread gets to print a message. Note that this will impact your performance since threads will have to wait for the mutex to become available before they are allowed to print.

Note: Protecting cout will only clean up your output on the stream. It will not influence the behavior of the rest of the code; see above for that.

See http://www.cplusplus.com/reference/mutex/mutex/lock/ for an example.

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}
Timo
  • concerning the function `returnValues` that I call in multiThread. Should it also be surrounded by mutex lock and unlock? – Hani Goc Jun 24 '15 at 08:20
  • No, returnValues appears to use local variables only, which should be thread safe. As for the input, it is a const local variable, so the function only has read access, which should not be a problem either. – Timo Jun 24 '15 at 08:35
  • I have a question concerning "std::mutex mtx;" should It be a private member of the class? – Hani Goc Jun 24 '15 at 09:20
  • For `cout` it should be a global variable, since `cout` is not a class member. For the `data` vector it can be a private class member of `Freebase`, since each instance of `Freebase` will have its own `data`. – Timo Jun 24 '15 at 09:25
  • Was coming to that, just hit enter too soon ;) – Timo Jun 24 '15 at 09:28
  • i have a question concerning passing shared_ptr as parameters. i'll post it if it's ok – Hani Goc Jun 24 '15 at 12:59
  • I think you'd better make a new question for that. – Timo Jun 24 '15 at 19:55