
rigtorp/MPMCQueue

A bounded multi-producer multi-consumer concurrent queue written in C++11


Top Related Projects

A fast multi-producer, multi-consumer lock-free concurrent queue for C++11


An open-source C++ library developed and used at Facebook.

Concurrent data structures in C++

C++ lockless queue.


A C++ library of Concurrent Data Structures

Quick Overview

MPMCQueue is a bounded multi-producer multi-consumer lock-free queue for C++. It's designed for high performance in concurrent scenarios, allowing multiple threads to safely enqueue and dequeue elements without the need for locks.

Pros

  • Lock-free implementation for high performance in multi-threaded environments
  • Bounded queue with a fixed capacity, preventing unbounded memory growth
  • Header-only library for easy integration into projects
  • Supports move-only types, increasing flexibility for various use cases

Cons

  • Requires C++11 or later, which may limit compatibility with older codebases
  • Capacity is fixed at construction time and cannot grow at runtime
  • May have higher memory usage compared to non-lock-free alternatives due to per-slot padding for cache line alignment
  • Blocking push and pop operations busy-wait rather than sleep, which can waste CPU under sustained contention

Code Examples

  1. Creating and using the queue:
#include "MPMCQueue.h"
#include <thread>

rigtorp::MPMCQueue<int> queue(1024); // Create a queue with capacity 1024

// Producer thread
std::thread producer([&queue]() {
    for (int i = 0; i < 100; ++i) {
        queue.push(i);
    }
});

// Consumer thread
std::thread consumer([&queue]() {
    int value;
    for (int i = 0; i < 100; ++i) {
        while (!queue.try_pop(value));
        // Process value
    }
});

producer.join();
consumer.join();
  2. Using the queue with move-only types:
#include "MPMCQueue.h"
#include <memory>

rigtorp::MPMCQueue<std::unique_ptr<int>> queue(128);

std::unique_ptr<int> ptr = std::make_unique<int>(42);
queue.push(std::move(ptr));

std::unique_ptr<int> result;
queue.pop(result);
  3. Non-blocking operations:
#include "MPMCQueue.h"

rigtorp::MPMCQueue<int> queue(256);

int value = 10;
if (queue.try_push(value)) {
    // Successfully pushed
}

int result;
if (queue.try_pop(result)) {
    // Successfully popped
}

Getting Started

To use MPMCQueue in your project:

  1. Download the MPMCQueue.h header file from the GitHub repository.
  2. Include the header in your C++ project:
#include "MPMCQueue.h"

// Create a queue with a capacity of 1024 elements
rigtorp::MPMCQueue<int> queue(1024);

// Use the queue in your multi-threaded application
// ...

Ensure your project is compiled with C++11 or later support enabled.
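As a concrete sketch, building a single-file program that uses the header can be as simple as one compiler invocation with C++11 and thread support enabled (the file and target names here are hypothetical):

```shell
# main.cpp is assumed to #include "MPMCQueue.h" from the same directory.
g++ -std=c++11 -pthread -O2 main.cpp -o app
```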

Competitor Comparisons

A fast multi-producer, multi-consumer lock-free concurrent queue for C++11

Pros of concurrentqueue

  • More feature-rich, offering multiple queue types (MPMC, SPSC, MPSC)
  • Generally higher performance, especially for high-contention scenarios
  • Extensive documentation and benchmarks provided

Cons of concurrentqueue

  • More complex implementation, potentially harder to understand and maintain
  • Larger codebase and memory footprint
  • May have higher overhead for simple use cases

Code Comparison

MPMCQueue:

template <typename T>
class MPMCQueue {
public:
  explicit MPMCQueue(const size_t capacity);
  bool try_push(const T &value);
  bool try_pop(T &value);
};

concurrentqueue:

template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue {
public:
  ConcurrentQueue();
  bool enqueue(T const &item);
  bool try_dequeue(T &item);
  size_t size_approx() const;
};

Key Differences

  • MPMCQueue is specifically designed for multi-producer, multi-consumer scenarios, while concurrentqueue offers more queue types
  • concurrentqueue provides additional features such as bulk enqueue/dequeue operations and producer/consumer tokens
  • MPMCQueue has a simpler API, focusing on core push/pop operations
  • concurrentqueue uses more advanced techniques for lock-free implementation, potentially offering better scalability

Both libraries are well-suited for concurrent programming, but concurrentqueue may be preferred for more complex scenarios or when higher performance is crucial, while MPMCQueue might be a better choice for simpler use cases or when a smaller, more focused implementation is desired.


An open-source C++ library developed and used at Facebook.

Pros of Folly

  • Comprehensive C++ library with a wide range of utilities and components
  • Actively maintained by Facebook with frequent updates and improvements
  • Extensive documentation and examples available

Cons of Folly

  • Large codebase with many dependencies, potentially increasing build times
  • May include unnecessary components for projects only needing a queue implementation
  • Steeper learning curve due to the breadth of features

Code Comparison

MPMCQueue:

MPMCQueue<int> q(100);
q.push(42);
int value;
q.pop(value);

Folly:

folly::MPMCQueue<int> q(100);
q.write(42);
int value;
q.read(value);

Summary

MPMCQueue is a focused, single-purpose implementation of a multi-producer multi-consumer queue. It's lightweight and easy to integrate into projects. Folly, on the other hand, is a comprehensive C++ library that includes an MPMC queue implementation along with many other utilities. While Folly offers more features and ongoing support from Facebook, it may be overkill for projects only needing a queue. The choice between the two depends on project requirements and the desire for a minimal vs. feature-rich solution.

Concurrent data structures in C++

Pros of Junction

  • Offers a different set of concurrent data structures, centered on high-performance concurrent hash maps (Linear, Leapfrog, Grampa)
  • Provides more flexibility with customizable memory allocation and reclamation strategies
  • Supports multiple platforms including Windows, Linux, and macOS

Cons of Junction

  • May have higher memory overhead due to its more complex implementation
  • Potentially slower performance for simple use cases compared to MPMCQueue's focused design
  • Less frequently updated, with the last major update being several years ago

Code Comparison

MPMCQueue:

MPMCQueue<int> q(100);
q.push(42);
int value;
q.pop(value);

Junction:

junction::ConcurrentMap_Leapfrog<int, int> map;
map.assign(42, 100);
int value = map.get(42);

Summary

MPMCQueue is a focused, high-performance multi-producer multi-consumer queue implementation, while Junction offers a broader range of concurrent data structures with more customization options. MPMCQueue may be preferable for simpler use cases requiring maximum performance, while Junction provides more flexibility for complex concurrent programming scenarios across multiple platforms.

C++ lockless queue.

Pros of atomic_queue

  • Generally faster performance, especially for high-concurrency scenarios
  • More memory-efficient, using less memory per queue element
  • Offers several queue variants, including both compile-time-sized and runtime-sized bounded implementations

Cons of atomic_queue

  • Less mature project with potentially fewer real-world deployments
  • Documentation is not as comprehensive as MPMCQueue
  • May have less robust error handling and edge case coverage

Code Comparison

MPMCQueue:

template <typename T>
class MPMCQueue {
public:
  explicit MPMCQueue(const size_t capacity);
  bool try_push(const T &value);
  bool try_pop(T &value);
};

atomic_queue:

template <class T, unsigned SIZE>
class AtomicQueue {
public:
  AtomicQueue() = default;
  bool try_push(T element);
  bool try_pop(T &element);
};

Both implementations offer similar core functionality with try_push and try_pop methods. However, atomic_queue uses a template parameter for queue size, while MPMCQueue uses a constructor parameter. This difference reflects atomic_queue's focus on compile-time optimizations.

atomic_queue provides a more diverse set of queue implementations, including compile-time-sized and runtime-sized variants, which may offer better performance in specific use cases. MPMCQueue, on the other hand, focuses on a single, well-tested implementation that may be more suitable for general-purpose use in production environments.


A C++ library of Concurrent Data Structures

Pros of libcds

  • Offers a wider range of concurrent data structures beyond just queues
  • Provides more flexibility with various implementation options for each data structure
  • Supports both C++11 and C++14 standards

Cons of libcds

  • More complex API due to its broader scope and flexibility
  • Potentially higher learning curve for developers new to the library
  • May have slightly higher overhead due to its generalized approach

Code Comparison

MPMCQueue:

MPMCQueue<int> q(100);
q.push(42);
int value;
q.pop(value);

libcds:

cds::container::MSQueue<cds::gc::HP, int> q;
q.enqueue(42);
int value;
q.dequeue(value);

Summary

MPMCQueue is a focused, single-purpose library for multi-producer/multi-consumer queues, while libcds is a comprehensive concurrent data structures library. MPMCQueue may be simpler to use for specific queue implementations, but libcds offers more versatility for various concurrent data structure needs. The choice between them depends on the project requirements and the developer's familiarity with concurrent programming concepts.


README

MPMCQueue.h


A bounded multi-producer multi-consumer concurrent queue written in C++11.

It's battle hardened and used daily in production.

It's been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

Example

#include "MPMCQueue.h"
#include <iostream>
#include <thread>

using namespace rigtorp;

MPMCQueue<int> q(10);
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
q.push(1);
q.push(2);
t1.join();
t2.join();

Usage

  • MPMCQueue<T>(size_t capacity);

    Constructs a new MPMCQueue holding items of type T with capacity capacity.

  • void emplace(Args &&... args);

    Enqueue an item using inplace construction. Blocks if queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using inplace construction. Returns true on success and false if queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if queue is full.

  • void pop(T &v);

    Dequeue an item by copying or moving the item into v. Blocks if queue is empty.

  • bool try_pop(T &v);

    Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.

  • ssize_t size();

    Returns the number of elements in the queue.

    The size can be negative when the queue is empty and there is at least one reader waiting. Since this is a concurrent queue, the size is only a best-effort guess until all reader and writer threads have been joined.

  • bool empty();

    Returns true if the queue is empty.

    Since this is a concurrent queue, this is only a best-effort guess until all reader and writer threads have been joined.

All operations except construction and destruction are thread safe.

Implementation

Memory layout

Enqueue:

  1. Acquire next write ticket from head.
  2. Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the readers we are done writing.

Dequeue:

  1. Acquire next read ticket from tail.
  2. Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the writers we are done reading.
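The two sequences above can be sketched as a minimal, deliberately simplified queue: no cache-line padding, plain busy-waiting, and no try_ variants. This is an illustration of the ticket/turn protocol, not the actual MPMCQueue.h implementation; the class name TurnQueue is hypothetical.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Simplified sketch of the ticket/turn protocol. T must be default-constructible.
template <typename T>
class TurnQueue {
public:
  explicit TurnQueue(size_t capacity)
      : capacity_(capacity), slots_(capacity), head_(0), tail_(0) {}

  void push(const T &v) {
    // 1. Acquire the next write ticket from head.
    const size_t ticket = head_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // 2. Wait for our turn (2 * (ticket / capacity)) to write this slot.
    const size_t turn = 2 * (ticket / capacity_);
    while (slot.turn.load(std::memory_order_acquire) != turn)
      ; // spin
    slot.value = v;
    // 3. Bump turn to inform the readers we are done writing.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

  void pop(T &v) {
    // 1. Acquire the next read ticket from tail.
    const size_t ticket = tail_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // 2. Wait for our turn (2 * (ticket / capacity) + 1) to read this slot.
    const size_t turn = 2 * (ticket / capacity_) + 1;
    while (slot.turn.load(std::memory_order_acquire) != turn)
      ; // spin
    v = slot.value;
    // 3. Bump turn to inform the writers we are done reading.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

private:
  struct Slot {
    std::atomic<size_t> turn{0}; // even: writable, odd: readable
    T value{};
  };
  const size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<size_t> head_;
  std::atomic<size_t> tail_;
};
```

Note how each slot cycles through even turns (writable) and odd turns (readable), so a writer that wraps around the ring must wait until the previous occupant of its slot has been consumed.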


Testing

Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:

  • A single-threaded test verifying that the functionality works as intended, including that element constructors and destructors are invoked correctly.
  • A multi-threaded fuzz test verifying that all elements are enqueued and dequeued correctly under heavy contention.

TODO

  • Add allocator support so that the queue can be used with huge pages and shared memory
  • Add benchmarks and compare to boost::lockfree::queue and others
  • Use C++20 concepts instead of static_assert if available
  • Use std::hardware_destructive_interference_size if available
  • Add API for zero-copy dequeue and batch dequeue operations
  • Add [[nodiscard]] attributes

About

This project was created by Erik Rigtorp <erik@rigtorp.se>.