
I am currently refactoring some code I found in the nvidia hardware encoder for compressing video images. The original question is here: wondering if I can use stl smart pointers for this

Based on the answers and the comments, I have tried to make a thread-safe buffer array. Here it is. Please comment.

#ifndef __BUFFER_ARRAY_H__
#define __BUFFER_ARRAY_H__

#include <vector>
#include <mutex>
#include <thread>

template<class T>
class BufferArray
{
public:
    BufferArray()
        :num_pending_items(0), pending_index(0), available_index(0)
    {}

    // This method is not thread-safe. 
    // Add an item to our buffer list
    // Note we do not take ownership of the incoming pointer.
    void add(T * buffer)
    {
        buffer_array.push_back(buffer);
    }

    // Returns a naked pointer to an available buffer. Should not be
    // deleted by the caller. 
    T * get_available()
    {
        std::lock_guard<std::mutex> lock(buffer_array_mutex);
        if (num_pending_items == buffer_array.size()) {
            return NULL;
        }       
        T * buffer = buffer_array[available_index];
        // Update the indexes.
        available_index = (available_index + 1) % buffer_array.size();
        num_pending_items += 1;
        return buffer;
    }

    T * get_pending()
    {
        std::lock_guard<std::mutex> lock(buffer_array_mutex);
        if (num_pending_items == 0) {
            return NULL;
        }

        T * buffer = buffer_array[pending_index];
        pending_index = (pending_index + 1) % buffer_array.size();
        num_pending_items -= 1;
        return buffer;
    }


private:
    std::vector<T * >                   buffer_array;
    std::mutex                          buffer_array_mutex;
    unsigned int                        num_pending_items;
    unsigned int                        pending_index;
    unsigned int                        available_index;

    // No copy semantics
    BufferArray(const BufferArray &) = delete;
    void operator=(const BufferArray &) = delete;
};

#endif

My question is whether I am breaking any C++ good-practice recommendations here. Also, I am extending the class so that it can be accessed and used by multiple threads, and I was wondering if there is anything that I might have missed.

Luca
  • Your `add()` is not thread safe. It also needs a lock guard. Otherwise, the question is too vague and nebulous. – Sam Varshavchik Sep 27 '16 at 12:22
  • Unrelated to your question, but symbols starting with double underscores, or with an underscore followed by an upper-case letter are reserved in all scopes. See [this old answer](http://stackoverflow.com/a/228797/440558) for more information. – Some programmer dude Sep 27 '16 at 12:23
  • @JoachimPileborg But I have not used anything starting with single or double underscore? You mean the include guard? – Luca Sep 27 '16 at 12:24
  • @SamVarshavchik Thanks for your comment. I will add that as well. Currently, the use case is that it gets filled and then used, but adding the lock guard does not hurt. Thank you for that. – Luca Sep 27 '16 at 12:25
  • @Luca Include guards count in the rule. – NathanOliver Sep 27 '16 at 12:25
  • Your comment says you do not take ownership of the incoming pointer but the function itself **does** take ownership of it. Is `buffer_array` supposed to contain unique pointers or raw pointers? – Galik Sep 27 '16 at 12:26
  • @Galik Sorry! I had mistakenly pasted old code where I did design it with unique_ptr. Fixed now. Thanks for the snipe! – Luca Sep 27 '16 at 12:27
  • Would need to know the use case. The class as it stands smells bad. Can you post some code from the call sites? – Richard Hodges Sep 27 '16 at 12:28
  • @RichardHodges Yes, it will be slightly complicated as the nvidia codec is basically a C-style API. OK, I will do my best to create something to highlight the use case, but it is slightly complicated and not ready on my side yet. Can you tell me what aspects of the code smell bad so far? – Luca Sep 27 '16 at 12:30
  • @Luca 1. There seems to be no indication how large the buffers are (unless this is encoded?) 2. There is a mutex but no means of signalling that the buffer is empty/full. So the consumer (and provider) must poll. Is this what you intended? – Richard Hodges Sep 27 '16 at 12:32
  • @RichardHodges I am guessing the number of buffers is basically given by the length of the vector; querying this is not important in the application, but I guess it could be added. `get_pending` and `get_available` return NULL in case the buffer is full or no items are left to process. But yes, there will be a processing thread dedicated to just processing this buffer and polling it for items that need to be processed. – Luca Sep 27 '16 at 12:35
  • Why not use something like a [lock free queue](http://www.boost.org/doc/libs/1_59_0/doc/html/boost/lockfree/queue.html)? You can push all the jobs into the queue and then worker threads can keep popping them out as they need. If you need to put the job back in when done then the worker thread can just push it back into the back of the queue. – NathanOliver Sep 27 '16 at 12:36
  • @NathanOliver Thanks for that as well. I will look into it. – Luca Sep 27 '16 at 12:37
  • Is the purpose of this structure to wait for some work to be done on each buffer before the consumer then performs some more work on the buffer, knowing that the co-process has finished with it? – Richard Hodges Sep 27 '16 at 12:40
  • So basically there is a thread that takes images or video frames to be compressed, pushes them onto this queue, and calls some nvidia HW compression libraries on them, and there is another thread that takes those same compressed items and writes them to disk. So there is an encode thread and a flush thread, and the idea is that this queue is shared by them. – Luca Sep 27 '16 at 12:43
  • OK, will post a skeleton of how I'd approach it – Richard Hodges Sep 27 '16 at 13:04

1 Answer


I think I'd approach it something like this:

In this test, the "processing" is just multiplying an int by 2. But notice how the processor thread takes pending data off a pending queue, processes it, then pushes available data to the available queue. Then it signals (via the condition variable) that the consumer (in this case, your disk-writer) should look again for available data.

#include <vector>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>
#include <iostream>

namespace notstd {
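    // Tiny convenience helper: returns a std::unique_lock so the caller can
    // unlock and relock it around the long-running processing step.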
    template<class Mutex> auto getlock(Mutex& m)
    {
        return std::unique_lock<Mutex>(m);
    }
}

template<class T>
class ProcessQueue
{
public:
    ProcessQueue()
    {}

    // This method is not thread-safe.
    // Add an item to our buffer list
    // Note we do not take ownership of the incoming pointer.
    // @pre start_processing shall not have been called
    void add(T * buffer)
    {
        pending_.push(buffer);
    }

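    // Spawns the worker thread. It drains pending_, processes each buffer,
    // moves it to available_ and notifies the consumer after every item.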
    void start_processing()
    {
        process_thread_ = std::thread([this] {
            while(not this->pending_.empty())
            {
                auto lock = notstd::getlock(this->mutex_);
                auto buf = this->pending_.front();
                lock.unlock();

                //
                // this is the part that processes the "buffer"

                *buf *= 2;

                //
                // now notify the structure that the processing is done - buffer is available
                //

                lock.lock();
                this->pending_.pop();
                this->available_.push(buf);
                lock.unlock();
                this->change_.notify_one();
            }
        });
    }

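    // Blocks until a processed buffer is available. Once everything pending
    // has been processed and handed out, joins the worker and returns nullptr.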
    T* wait_available()
    {
        auto lock = notstd::getlock(mutex_);
        change_.wait(lock, [this] { return not this->available_.empty() or this->pending_.empty(); });
        if (not available_.empty())
        {
            auto p = available_.front();
            available_.pop();
            return p;
        }

        lock.unlock();
        process_thread_.join();
        return nullptr;
    }

private:
    std::queue<T * >                    pending_;
    std::queue<T * >                    available_;
    std::mutex                          mutex_;
    std::condition_variable             change_;
    std::thread                         process_thread_;

    // No copy semantics - implicit because of the mutex
};

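// Test driver: nine ints are "processed" (doubled) by the worker thread and
// printed by the main thread as each one becomes available.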
int main()
{
    ProcessQueue<int> pq;

    std::vector<int> v = { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    for (auto& i : v) {
        pq.add(std::addressof(i));
    }

    pq.start_processing();

    while (auto p = pq.wait_available())
    {
        std::cout << *p << '\n';
    }
}

expected output:

2
4
6
8
10
12
14
16
18
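
In the test above the processing step is hard-coded to `*buf *= 2`. For the encode/flush use case described in the comments, one possibility (a rough sketch, not something the class above already provides) is to let `start_processing` accept the per-buffer work as a callable; the worker then becomes the encode thread, and the flush thread is whatever loops on `wait_available()` and writes to disk:

// Sketch only: same body as start_processing above, with the hard-coded
// "*buf *= 2" replaced by a caller-supplied callable.
template<class Process>
void start_processing(Process process)
{
    process_thread_ = std::thread([this, process] {
        while (not this->pending_.empty())
        {
            auto lock = notstd::getlock(this->mutex_);
            auto buf = this->pending_.front();
            lock.unlock();

            process(buf);   // e.g. the nvidia HW encode call

            lock.lock();
            this->pending_.pop();
            this->available_.push(buf);
            lock.unlock();
            this->change_.notify_one();
        }
    });
}

// Hypothetical usage (Frame, nvenc_encode and write_to_disk are stand-ins
// for the real buffer type, encode call and disk writer):
//
//   ProcessQueue<Frame> pq;
//   // ... pq.add(...) each frame buffer before starting ...
//   pq.start_processing([](Frame* f) { nvenc_encode(f); });   // encode thread
//   while (auto* f = pq.wait_available())                      // flush thread
//       write_to_disk(f);
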
Richard Hodges