
The author says that the MPMC queue implemented in this article is not lock-free, but a blocking producers/consumers queue.

According to the classification it's MPMC, array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO, blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lockfree in the official meaning, just implemented by means of atomic RMW operations w/o mutexes.

But according to this introduction (http://www.1024cores.net/home/lock-free-algorithms/introduction):

Lock-freedom means that a system as a whole moves forward regardless of anything. Forward progress for each individual thread is not guaranteed (that is, individual threads can starve).

An example of a lockfree algorithm is:

void stack_push(stack* s, node* n)
{
  node* head;
  do
  {
    head = s->head;
    n->next = head;
  }
  while (!atomic_compare_exchange(s->head, head, n));
}

As can be seen, a thread can "whirl" in the cycle theoretically infinitely. But every repeat of the cycle means that some other thread had made forward progress (that is, successfully pushed a node to the stack). A blocked/interrupted/terminated thread can not prevent forward progress of other threads. Consequently, the system as a whole undoubtedly makes forward progress.
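For reference, here is a compilable sketch of the same push loop using std::atomic; the lf_stack/lf_node names are my own and are not from the article:

#include <atomic>

struct lf_node
{
  lf_node* next;
};

struct lf_stack
{
  std::atomic<lf_node*> head{nullptr};
};

// Same idea as the article's stack_push: keep retrying the CAS; every retry
// implies that some other thread's push succeeded in the meantime.
void lf_stack_push(lf_stack* s, lf_node* n)
{
  lf_node* head = s->head.load(std::memory_order_relaxed);
  do
  {
    n->next = head;   // link the new node in front of the current head
  }
  while (!s->head.compare_exchange_weak(head, n,
                                        std::memory_order_release,
                                        std::memory_order_relaxed));
}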

It seems that the MPMC queue above satisfies this definition of lock-freedom, so the claim that it's "not lockfree in the official meaning" confuses me. Am I missing something here?

I'm not familiar with C++; is the C# port version here also a blocking algorithm rather than lock-free?

The code from the article is quoted here:

template<typename T>
class mpmc_bounded_queue
{
public:
  mpmc_bounded_queue(size_t buffer_size)
    : buffer_(new cell_t [buffer_size])
    , buffer_mask_(buffer_size - 1)
  {
    assert((buffer_size >= 2) &&
      ((buffer_size & (buffer_size - 1)) == 0));
    for (size_t i = 0; i != buffer_size; i += 1)
      buffer_[i].sequence_.store(i, std::memory_order_relaxed);
    enqueue_pos_.store(0, std::memory_order_relaxed);
    dequeue_pos_.store(0, std::memory_order_relaxed);
  }

  ~mpmc_bounded_queue()
  {
    delete [] buffer_;
  }

  bool enqueue(T const& data)
  {
    cell_t* cell;
    size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    for (;;)
    {
      cell = &buffer_[pos & buffer_mask_];
      size_t seq =
        cell->sequence_.load(std::memory_order_acquire);
      intptr_t dif = (intptr_t)seq - (intptr_t)pos;
      if (dif == 0)
      {
        if (enqueue_pos_.compare_exchange_weak
            (pos, pos + 1, std::memory_order_relaxed))
          break;
      }
      else if (dif < 0)
        return false;
      else
        pos = enqueue_pos_.load(std::memory_order_relaxed);
    }
    cell->data_ = data;
    cell->sequence_.store(pos + 1, std::memory_order_release);
    return true;
  }

  bool dequeue(T& data)
  {
    cell_t* cell;
    size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
    for (;;)
    {
      cell = &buffer_[pos & buffer_mask_];
      size_t seq =
        cell->sequence_.load(std::memory_order_acquire);
      intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
      if (dif == 0)
      {
        if (dequeue_pos_.compare_exchange_weak
            (pos, pos + 1, std::memory_order_relaxed))
          break;
      }
      else if (dif < 0)
        return false;
      else
        pos = dequeue_pos_.load(std::memory_order_relaxed);
    }
    data = cell->data_;
    cell->sequence_.store
      (pos + buffer_mask_ + 1, std::memory_order_release);
    return true;
  }

private:
  struct cell_t
  {
    std::atomic<size_t>   sequence_;
    T                     data_;
  };

  static size_t const     cacheline_size = 64;
  typedef char            cacheline_pad_t [cacheline_size];

  cacheline_pad_t         pad0_;
  cell_t* const           buffer_;
  size_t const            buffer_mask_;
  cacheline_pad_t         pad1_;
  std::atomic<size_t>     enqueue_pos_;
  cacheline_pad_t         pad2_;
  std::atomic<size_t>     dequeue_pos_;
  cacheline_pad_t         pad3_;

  mpmc_bounded_queue(mpmc_bounded_queue const&);
  void operator = (mpmc_bounded_queue const&);
};
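
A minimal usage sketch of my own (not from the article), assuming the class above sits in a translation unit with the headers it needs (<atomic>, <cassert>, <cstddef>, <cstdint>):

#include <thread>
#include <cstdio>

int main()
{
  mpmc_bounded_queue<int> q(1024);   // capacity must be a power of two >= 2

  std::thread producer([&q]
  {
    for (int i = 0; i != 100000; i += 1)
      while (!q.enqueue(i))
        ;                            // queue full: spin and retry
  });

  std::thread consumer([&q]
  {
    int v;
    for (int i = 0; i != 100000; i += 1)
      while (!q.dequeue(v))
        ;                            // queue empty: spin and retry
  });

  producer.join();
  consumer.join();
  std::printf("done\n");
  return 0;
}

Note that both loops just spin when enqueue()/dequeue() return false; nothing in the queue ever parks a thread on a mutex, which is what prompted my question about the "blocking" classification.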
  • "**blocking** producers and consumers queue" - *Blocking* means the queue is not lock-free. – Tsyvarev Jan 20 '18 at 12:41
  • Yes, I know that blocking means not lock-free, but looking at the code implementation there seems to be no blocking here. (Lock-freedom means that a system as a whole moves forward regardless of anything.) The CAS here ensures this: there is always one producer or one consumer that can move forward. So maybe I should rephrase my question: why is it a blocking queue? @Tsyvarev – Jun Jan 20 '18 at 13:54
  • Producer's `enqueue()` method computes `intptr_t dif = (intptr_t)seq - (intptr_t)pos;` and waits while it is positive. This looks like busy waiting for a concurrent producer. BTW, if you ask about code on Stack Overflow, you need to provide it in the question post, not just link to it (unless it is a very well-known piece of code, which is not true in the given case). – Tsyvarev Jan 20 '18 at 14:30
  • Sorry for not quoting the code in the question; I have updated it. When dif == 0, the thread tries to acquire the enqueue/dequeue position. If one thread acquires it, progress is made; the other concurrent threads fail the CAS and try again. When dif < 0, the queue is full/empty, so the call returns. When dif > 0, another concurrent operation got there first, so we try again. So there is no blocking? – Jun Jan 21 '18 at 05:28
  • As the author says in his lock-free example: "As can be seen, a thread can 'whirl' in the cycle theoretically infinitely. But every repeat of the cycle means that some other thread had made forward progress." That looks the same as the producer waiting while dif is positive. – Jun Jan 21 '18 at 05:30
  • I think this and https://stackoverflow.com/questions/45907210/lock-free-progress-guarantees implement the same basic idea for an MPMC queue. The answers there explain why it's not lock-free in a computer-science sense, even though it *is* lockless (no mutexes) and fast in practice. IIRC, a writer that sleeps or is killed half way through claiming a slot will block reads from getting past that slot, eventually leading to all other readers and writers being blocked. A reader that sleeps at the wrong time can also block the queue (see the sketch after these comments). – Peter Cordes Jan 21 '18 at 08:21
  • @PeterCordes Thanks very much. I think they are the same question, and the answer there makes it very clear. As BeeOnRope says, the contention is inherent in the FIFO queue structure. – Jun Jan 21 '18 at 09:51
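
To make the scenario from Peter Cordes' comment concrete, here is an annotated excerpt of the tail of enqueue() from the code above; the interleaving described in the comments is my own illustration, not something spelled out in the article:

  // Producer A has just won the CAS on enqueue_pos_ and now owns slot `pos`.
  cell->data_ = data;
  // <-- if A is preempted, sleeps, or is killed anywhere between winning the
  //     CAS and the release store below, sequence_ of this cell stays at `pos`.
  cell->sequence_.store(pos + 1, std::memory_order_release);

  // While A is stalled, every consumer that reaches this slot computes
  //   dif = (intptr_t)seq - (intptr_t)(pos + 1) == -1
  // and dequeue() returns false, because dequeue_pos_ cannot advance past the
  // unpublished cell. Once the buffer wraps around, producers hit it as well.
  // No other thread can complete A's publication on its behalf, so the system
  // as a whole stops making progress; that is what disqualifies the queue from
  // the formal lock-free definition, even though it uses no mutexes.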

0 Answers