
I have an implementation of a lock-free queue, which I believe to be correct (or at least data-race-free):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <optional>
#include <thread>

struct Job {
  int id;
  int data;
};

class JobQueue {
  using stdmo = std::memory_order;

  struct Node {
    std::atomic<Node *> next = QUEUE_END;
    Job job;
  };

  // Sentinels: QUEUE_END terminates the queue's list; STACK_END terminates
  // the free-list stack. The reinterpret_cast yields a distinct non-null
  // sentinel without doing pointer arithmetic on a null pointer.
  static inline Node *const QUEUE_END = nullptr;
  static inline Node *const STACK_END = reinterpret_cast<Node *>(1);

  // A pointer tagged with a generation counter, compare-exchanged as a unit
  // so that a recycled node cannot cause the ABA problem.
  struct GenNodePtr {
    Node *node;
    std::uintptr_t gen;
  };

  // One (presumed) cache line per atomic, to avoid false sharing.
  alignas(64) std::atomic<Node *> jobs_back;
  alignas(64) std::atomic<GenNodePtr> jobs_front;
  alignas(64) std::atomic<GenNodePtr> stack_top;

 public:
  // The queue starts holding a single dummy node; the free-list stack starts
  // empty.
  JobQueue()
      : jobs_back{new Node{}},
        jobs_front{GenNodePtr{jobs_back.load(stdmo::relaxed), 1}},
        stack_top{GenNodePtr{STACK_END, 1}} {}

  ~JobQueue() {
    // No concurrent access can happen during destruction, so relaxed loads
    // and plain traversal suffice.
    Node *cur_queue = jobs_front.load(stdmo::relaxed).node;
    while (cur_queue != QUEUE_END) {
      Node *next = cur_queue->next.load(stdmo::relaxed);
      delete cur_queue;
      cur_queue = next;
    }

    Node *cur_stack = stack_top.load(stdmo::relaxed).node;
    while (cur_stack != STACK_END) {
      Node *next = cur_stack->next.load(stdmo::relaxed);
      delete cur_stack;
      cur_stack = next;
    }
  }

  // Pop a recycled node off the internal stack, or heap-allocate if the
  // stack is empty. Every successful pop bumps the generation counter.
  Node *allocate_node() {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      if (cur_stack.node == STACK_END) {
        return new Node{};
      }
      Node *cur_stack_next = cur_stack.node->next.load(stdmo::relaxed);
      GenNodePtr new_stack{cur_stack_next, cur_stack.gen + 1};
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel)) {
        return cur_stack.node;
      }
    }
  }

  // Push a retired node onto the internal stack so it can be reused.
  void deallocate_node(Node *node) {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      node->next.store(cur_stack.node, stdmo::relaxed);
      GenNodePtr new_stack{node, cur_stack.gen + 1};
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel)) {
        break;
      }
    }
  }

 public:
  void enqueue(Job job) {
    // Swing jobs_back to a fresh dummy; this atomically claims the old
    // dummy, which is then filled in and published via the release store to
    // its `next` pointer.
    Node *new_node = allocate_node();
    new_node->next.store(QUEUE_END, stdmo::relaxed);
    Node *old_dummy = jobs_back.exchange(new_node, stdmo::acq_rel);
    old_dummy->job = job;
    old_dummy->next.store(new_node, stdmo::release);
  }

  std::optional<Job> try_dequeue() {
    GenNodePtr old_front = jobs_front.load(stdmo::relaxed);
    while (true) {
      // The front node stays a dummy until a producer publishes its `next`
      // pointer with a release store; pair that with an acquire load here.
      Node *old_front_next = old_front.node->next.load(stdmo::acquire);
      if (old_front_next == QUEUE_END) {
        return std::nullopt;
      }

      GenNodePtr new_front{old_front_next, old_front.gen + 1};
      if (jobs_front.compare_exchange_weak(old_front, new_front,
                                           stdmo::relaxed)) {
        break;
      }
    }

    // This thread now owns old_front.node: read the job, then recycle the
    // node via the internal stack.
    Job job = old_front.node->job;
    deallocate_node(old_front.node);

    return job;
  }
};

int main() {
  JobQueue queue;

  std::atomic<int> i = 0;

  std::thread consumer{[&queue, &i]() {
    // producer enqueues 1
    while (i.load(std::memory_order_relaxed) != 1) {}
    std::atomic_thread_fence(std::memory_order_acq_rel);

    std::cout << queue.try_dequeue().value_or(Job{-1, -1}).data
              << std::endl;

    std::atomic_thread_fence(std::memory_order_acq_rel);
    i.store(2, std::memory_order_relaxed);

    // producer enqueues 2 and 3
  }};

  std::thread producer{[&queue, &i]() {
    queue.enqueue(Job{1, 1});

    std::atomic_thread_fence(std::memory_order_acq_rel);
    i.store(1, std::memory_order_relaxed);

    // consumer consumes here

    while (i.load(std::memory_order_relaxed) != 2) {}
    std::atomic_thread_fence(std::memory_order_acq_rel);

    queue.enqueue(Job{2, 2});
    queue.enqueue(Job{3, 3});
  }};

  producer.join();
  consumer.join();

  return 0;
}

The queue is a singly linked list with separate front and back pointers. A dummy node decouples producers from consumers, and a generation counter combined with node recycling (through an internal free-list stack) is meant to prevent both the ABA problem and a use-after-free in try_dequeue.
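For context, here is the ABA hazard the generation counter guards against, shown in isolation: in a plain Treiber stack, a pop's compare-exchange can succeed against a stale head when the same node address is freed and re-pushed between the load and the CAS, installing a dangling next pointer. This is a minimal, self-contained sketch of the tagged-pointer countermeasure (my own simplified types, not the queue above):

    #include <atomic>
    #include <cstdint>

    struct Node {
      std::atomic<Node *> next{nullptr};
    };

    // Tagging the head pointer with a generation counter means a CAS fails if
    // the stack was popped and re-pushed in between, even when the same node
    // address reappears at the top (the classic ABA scenario).
    struct TaggedPtr {
      Node *node;
      std::uintptr_t gen;
    };

    std::atomic<TaggedPtr> top{TaggedPtr{nullptr, 0}};

    Node *pop() {
      TaggedPtr cur = top.load(std::memory_order_acquire);
      while (cur.node != nullptr) {
        Node *next = cur.node->next.load(std::memory_order_relaxed);
        // Without `gen`, this CAS could succeed against a node that was
        // popped, freed, and re-pushed at the same address in the meantime;
        // the generation bump makes such a stale CAS fail instead.
        if (top.compare_exchange_weak(cur, TaggedPtr{next, cur.gen + 1},
                                      std::memory_order_acq_rel)) {
          return cur.node;
        }
        // On failure, `cur` has been reloaded with the current top.
      }
      return nullptr;
    }

    int main() {
      Node n;
      top.store(TaggedPtr{&n, 1}, std::memory_order_release);
      return pop() == &n ? 0 : 1;
    }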

Running this under TSan (compiled with Clang 13.0.1, Linux x64), I get the following report:

WARNING: ThreadSanitizer: data race (pid=223081)
  Write of size 8 at 0x7b0400000008 by thread T2:
    #0 JobQueue::enqueue(Job) .../bug4.cpp:85 (bug4.tsan+0xe3e53)
    #1 operator() .../bug4.cpp:142 (bug4.tsan+0xe39ee)
    ...

  Previous read of size 8 at 0x7b0400000008 by thread T1:
    #0 JobQueue::try_dequeue() .../bug4.cpp:104 (bug4.tsan+0xe3c07)
    #1 operator() .../bug4.cpp:121 (bug4.tsan+0xe381c)
    ...

Run on Godbolt (note: because of how Godbolt runs the program, TSan doesn't show line number information)

This race is between this previous read in try_dequeue, called from the consumer thread:

    Job job = old_front.node->job;

and this later write in enqueue, which happens during the producer thread's third call to enqueue:

    old_dummy->job = job;

I believe this race to be impossible, because the producer thread should synchronise with the consumer thread via the acquire-release compare-exchanges on stack_top in allocate_node and deallocate_node.
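To spell the chain out: the consumer's read of job is sequenced before its release CAS on stack_top in deallocate_node; when the producer's acquire CAS in allocate_node pops that same node, it synchronises with that release, so the producer's later old_dummy->job = job should happen after the consumer's read. Below is a stripped-down model of that handoff pattern, read-then-release against acquire-then-write (toy names of my own, not the queue's code):

    #include <atomic>
    #include <iostream>
    #include <thread>

    // Models try_dequeue / third enqueue: `slot` stands in for stack_top;
    // `payload` stands in for the recycled node's `job` field.
    int payload = 1;
    std::atomic<int> slot{0};

    int main() {
      std::thread consumer{[] {
        int seen = payload;  // like `Job job = old_front.node->job;`
        int expected = 0;
        // Release CAS, like deallocate_node pushing the node onto the stack.
        while (!slot.compare_exchange_weak(expected, 1,
                                           std::memory_order_acq_rel)) {
          expected = 0;
        }
        std::cout << seen << '\n';
      }};
      std::thread producer{[] {
        int expected = 1;
        // Acquire CAS, like allocate_node popping the same node back off.
        while (!slot.compare_exchange_weak(expected, 2,
                                           std::memory_order_acq_rel)) {
          expected = 1;
        }
        payload = 2;  // like `old_dummy->job = job;` — ordered after the read
      }};
      consumer.join();
      producer.join();
    }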

Now, the weird thing is that making GenNodePtr alignas(32) makes TSan's race report disappear.

Run on Godbolt
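One concrete difference the alignment makes, which may or may not be relevant to question 2 (this is a guess on my part, not a verified explanation): alignas(32) also pads sizeof(GenNodePtr) from 16 to 32 bytes, which can push std::atomic<GenNodePtr> from a lock-free 16-byte CAS (cmpxchg16b on x64) to the runtime's locking fallback, and the two paths might be instrumented differently by TSan. Which case applies can be checked directly (linking libatomic may be needed):

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    struct Node;  // only pointers to Node are needed here

    struct GenNodePtr {
      Node *node;
      std::uintptr_t gen;
    };

    // The over-aligned variant from the second Godbolt link.
    struct alignas(32) GenNodePtr32 {
      Node *node;
      std::uintptr_t gen;
    };

    int main() {
      std::atomic<GenNodePtr> a16{};
      std::atomic<GenNodePtr32> a32{};
      // If a16 is lock-free but a32 is not, the alignas(32) version goes
      // through a library-provided lock rather than a 16-byte CAS.
      std::cout << std::boolalpha
                << "16-byte lock-free: " << a16.is_lock_free() << '\n'
                << "32-byte lock-free: " << a32.is_lock_free() << '\n';
    }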

Questions:

  1. Is this race actually possible?
  2. Why does increasing the alignment of GenNodePtr make TSan no longer register a race?
  • OT: Those `alignas(64)` are there to avoid false sharing? If so, you might want to look at the following question: https://stackoverflow.com/questions/29199779/false-sharing-and-128-byte-alignment-padding – Daniel Langr Feb 23 '22 at 08:41
  • @DanielLangr: Yeah, they are. That's cool, thanks for that! – angelsl Feb 23 '22 at 09:04
