-1

I have the following sorted vectors:

vector<unsigned> vector1;
vector<unsigned> vector2;
vector<unsigned> vector3;
...
vector<unsigned> vector30000;

I need to perform the intersection of vector1 with the rest of the vectors. i.e. I need to perform the following intersections:

vectori1=intersection of vector1 with vector2;
vectori2=intersection of vector1 with vector3;
vectori3=intersection of vector1 with vector4;
...
vectori30000=intersection of vector1 with vector30000;

Now I need to find out all the non-empty vector's vectori1,vectori2,vectori3,...,vectori30000 and store them in "intersected" vector.

In order to do so I wrote the following serialized code:

int main()
{
    vector<unsigned> vector1;
    vector1.push_back(10); vector1.push_back(20); vector1.push_back(30);
    vector<vector<unsigned> > vecOfVectors;
    vector<unsigned> vector2;
    vector2.push_back(1); vector2.push_back(5); vector2.push_back(8); vector2.push_back(10);
    vecOfVectors.push_back(vector2);
    vector<unsigned> vector3;
    vector3.push_back(3); vector3.push_back(20); vector3.push_back(25);
    vecOfVectors.push_back(vector3);
    vector<unsigned> vector4;
    vector4.push_back(28); vector4.push_back(29); vector4.push_back(39);
    vecOfVectors.push_back(vector4);
    vector<vector<unsigned> > intersected;
    for(vector<vector<unsigned> >::iterator it=vecOfVectors.begin(),l=vecOfVectors.end();it!=l;++it)
    {
        vector<unsigned> intersectedLocal;
        std::set_intersection(vector1.begin(),vector1.end,(*it).begin(),(*it).end(),back_inserter(intersectedLocal));
        if(!intersectedLocal.empty())
            intersected.push_back(intersectedLocal);
    }
}

In order to improve performance I need to parallelize the intersection algorithm. I am not getting how to do the same. My attempt is shown below:

void multThreadIntersect(vector<unsigned>& vector1, vector<vector<unsigned> >::iterator it, int size,int i,vector<vector<unsigned> >& intersected,vector<int>& idIntersected)
{
    if(i<size) 
    {        
        vector<unsigned> intersectedLocal;
        std::set_intersection(vector1.begin(),vector1.end,(*it).begin(),(*it).end(),back_inserter(intersectedLocal));
        it++;
        idIntersected.push_back(i);
        intersected.push_back(intersectedLocal);
        auto future = std::async(std::launch::async,multThreadIntersect, vector1, it, size,intersected,idIntersected);
        future.wait();
        i++;
    }
    else
    {
        return;        
    }
}

int main()
{
    vector<unsigned> vector1;
    vector1.push_back(10); vector1.push_back(20); vector1.push_back(30);
    vector<vector<unsigned> > vecOfVectors;
    vector<unsigned> vector2;
    vector2.push_back(1); vector2.push_back(5); vector2.push_back(8); vector2.push_back(10);
    vecOfVectors.push_back(vector2);
    vector<unsigned> vector3;
    vector3.push_back(3); vector3.push_back(20); vector3.push_back(25);
    vecOfVectors.push_back(vector3);
    vector<unsigned> vector4;
    vector4.push_back(28); vector4.push_back(29); vector4.push_back(39);
    vecOfVectors.push_back(vector4);
    vector<vector<unsigned> >::iterator it=vecOfVectors.begin();
    int size=vecOfVectors.size();
    int i=0;
    vector<vector<unsigned> > intersected;
    vector<int> idIntersected; //contains those i's whose intersection was non-zero
    long unsigned int nThreads = std::thread::hardware_concurrency();
    multThreadIntersect(vector1,it,size,i,intersected,idIntersected);    
    cout<<"id intersected vector:";
    for(vector<int>::iterator it=idIntersected.begin(),l=idIntersected.end();it!=l;++it)
        cout<<" "<<(*it);
    cout<<"\n";
}

The gcc version that I am using is: gcc (GCC) 4.8.2 20140120 (Red Hat 4.8.2-15)

I have already defined _GLIBCXX_PARALLEL in my program. However, since vector1's intersection with vector2,...,vector30000 are independent of each other. Therefore, I am thinking of parallelly intersecting vector1 with vector2, vector1 with vector3, and vector1 with vector30000

Ami Tavory
  • 74,578
  • 11
  • 141
  • 185
Steg Verner
  • 893
  • 2
  • 12
  • 28
  • The problem with your approach is that you still end up with 29999 intersection that you then have to intersect again. That at least feels slow for me (of course, maybe it isn't). – Baum mit Augen May 24 '15 at 17:21
  • @BaummitAugen Yes I need to perform 29999 intersections, but if I can parallelize the intersections then that I think can be a bit faster – Steg Verner May 24 '15 at 17:23
  • No, the problem I see with this is not that you need to do a lot of intersections for this, but that having done those intersection would not help you much: After your great, parallel intersection, you still need to intersect `vectori2`, ..., `vectori30000` to get to the intersection of all of the original vectors. (Unless I got wrong what you are actually trying to do). – Baum mit Augen May 24 '15 at 17:33
  • @BaummitAugen No I dont need to intersect vectori2,...,vectori30000. I am just storing these vectors in vector >. I really dont need to intersect them again – Steg Verner May 24 '15 at 17:34
  • All right, then I'm afraid I did not understand your question. – Baum mit Augen May 24 '15 at 17:35
  • @BaummitAugen Perhaps the serialized code can help u in understanding as to what I am doing. I am sorry but the parallelized code is messed up a bit..and I am not getting as to how can I correct the same. – Steg Verner May 24 '15 at 17:37
  • So, you have an attempt. What went wrong with your attempt? – Yakk - Adam Nevraumont May 24 '15 at 17:47
  • @Yakk I am not able to find out how to control the number of threads spawned. Ideally it should be equal to std::thread::hardware_concurrency(); but I am not getting where and how should I set the same in my code – Steg Verner May 24 '15 at 17:49

2 Answers2

2

You can use OpenMP to parallel it:

omp_set_num_threads(2); // here set number of threads, if not set it defaults to number of the cores in the machine

#pragma omp parallel for
for (int x = 0; x < vecOfVectors.size(); ++x)
{
    vector<unsigned> intersectedLocal;
    std::set_intersection(vector1.begin(), vector1.end(), vecOfVectors[x].begin(), vecOfVectors[x].end(), back_inserter(intersectedLocal));
    if (!intersectedLocal.empty())
    {
        #pragma omp critical // execute one thread at a time
        intersected.push_back(intersectedLocal);
    }
}

To enable OpenMP add to the linker: -fopenmp

doqtor
  • 8,414
  • 2
  • 20
  • 36
  • Why are the number of threads set as 2. Also how can I use openmp with gcc4.8.2...do I need to install it.. – Steg Verner May 24 '15 at 18:11
  • @StegVerner I updated my answer. You can set number of threads but if you don't then it will be set to number of cores on your machine by default. No need to install anything, just enable openmp in the linker as I wrote in my answer. – doqtor May 24 '15 at 18:15
  • Do I need to add -lgomp also as library to be used – Steg Verner May 24 '15 at 18:23
  • @StegVerner No, you don't need to add -lgomp, -fopenmp is sufficient. – doqtor May 24 '15 at 18:29
  • Sorry but I just found it, I have an additional condition -- when all the values present in vector1 have been found in vecOfVectors[] (without further iterating over it) then I want to exit out of the outer for loop, is it possible to include the above logic using openmp – Steg Verner May 24 '15 at 18:41
  • @StegVerner I'm not sure I understand what you mean ... As far as I'm aware you cannot use `break` statement within the parallel loop. To discontinue processing you can add `if(..) continue;` so that it won't do anything below, there may be something else you can use. There is a lot you can do with openmp, you can share variables between threads etc. I recommend reading OpenMP documentation or finding a good tutorial. I also removed mutex from my example as there is openmp equivalent `#pragma omp critical` as that's openmp we are using here. – doqtor May 24 '15 at 19:42
1

There are a number of libraries/tools that implement thread pools. Microsoft's ppl, Intel's tbb, OpenMP (in descending order of ease of use from my experience). Use one of them if you have it available.


Here is a mass intersection function.

It uses async. It can transparently be converted to use a thread pool by replacing the call to async, and passing the thread pool in, or calling global functions backed by a thread pool (depending on the library you are using):

std::vector<std::vector<unsigned>>
mass_intersect(
  std::vector<unsigned> filter,
  std::initializer_list< std::reference_wrapper<std::vector<unsigned> const> > targets
) {
  std::vector< std::future< std::vector<unsigned>>> working;
  working.reserve(targets.size());
  for (auto const& rhs:targets) {
    working.push_back( std::async( std::launch::async, [&rhs,&filter]{
      std::vector<unsigned> result;
      // do filter on rhs.get() and filter into result
      return result;
    });
  }
  // convert the above futures into a return value:
  std::vector< std::vector<unsigned>> retval;
  retval.reserve(working.size());
  for (auto&& r_f:working) {
    auto r = r_f.get(); // block
    if (r.empty()) continue;
    retval.push_back(std::move(r));
  }
  return retval;
}

I left the actual intersection code unfinished.

It takes its vectors as reference wraps of a const vector. You can construct this with:

{ std::ref( v1 ), std::ref( v2 ), std::ref(v3) }

but really, all it needs is an iterable collection of source vectors.


here is a mass intersection function

It uses async. It can transparently be converted to use a thread pool by replacing the call to async, and passing the thread pool in:

std::vector<std::vector<unsigned>>
mass_intersect(
  std::vector<unsigned> filter,
  std::initializer_list< std::reference_wrapper<std::vector<unsigned> const> > targets
) {
  std::vector< std::future< std::vector<unsigned>>> working;
  working.reserve(targets.size());
  for (auto const& rhs:targets) {
    working.push_back( std::async( std::launch::async, [&rhs,&filter]{
      std::vector<unsigned> result;
      // do filter on rhs.get() and filter into result
      return result;
    });
  }
  // convert the above futures into a return value:
  std::vector< std::vector<unsigned>> retval;
  retval.reserve(working.size());
  for (auto&& r_f:working) {
    auto r = r_f.get(); // block
    if (r.empty()) continue;
    retval.push_back(std::move(r));
  }
  return retval;
}

I left the actual intersection code unfinished.

It takes its vectors as reference wraps of a const vector. You can construct this with:

{ std::ref( v1 ), std::ref( v2 ), std::ref(v3) }

but really, all it needs is an iterable collection of source vectors.

Now, most existing thread pools don't interact seemlessly with std::future and C++11 synchronization primitives (among other reasons, because they predate it). So some adaptation will have to be done, unless you write your own thread pool.


If you want to write a thread pool, here is a brief sketch A thread pool can be written with a condition variable, a mutex (or shared_timed_mutex), and a vector of promised_tasks.

It maintains a collection of threads, which pop tasks off your promised_task queue.

The interface looks a lot like async when you add a task. A sketch of an interface:

struct thread_pool {
  template<class F>
  std::future<std::result_of_t<std::decay_t<F>()>> queue(F&&f);

  std::future<void> abort(); // empties queue, kills all threads

  size_t thread_count() const;
  void add_threads(size_t n=1);

  ~thread_pool();
  thread_pool();
  thread_pool(thread_pool&&)=delete;
  thread_pool& operator=(thread_pool&&)=delete;
private:
  // stuff
};

I wrote a sketch of an implementation in another stack overflow answer recently. The above only supports adding worker threads 1 at a time: the above interface lets you enqueue as many as you want at once. Changing that is easy. It also only supports void return type -- again, should be easy to fix.

Yakk - Adam Nevraumont
  • 262,606
  • 27
  • 330
  • 524
  • Thanks a lot for the help. But I am really a novice at parallel programming and I am not getting as to how can I replace async, and passing the thread pool in. I really apologize for being novice but can you please help me a bit. – Steg Verner May 24 '15 at 18:02
  • @StegVerner If you have problems passing a (reference to) an object to a function, and calling a method, then you need to step back and learn more basic C++ before you start working with concurrency. – Yakk - Adam Nevraumont May 24 '15 at 19:06