C++ Concurrent Queues

ISO/IEC JTC1 SC22 WG21 N3353 = 12-0043 - 2012-01-14

Lawrence Crowl, crowl@google.com, Lawrence@Crowl.org

Introduction
Conceptual Interface
    Basic Operations
    Closed Queues
    Empty Queues
    Queue Names
    Iterators
Binary Interfaces
    Virtual Interfaces
    Queue Wrapper
A Concrete Buffer Queue
Managed Indirection
Synopsis

Introduction

The existing deque in the standard library is an inherently sequential data structure. Its reference-returning element access operations cannot synchronize access to those elements with other queue operations.

Concurrent pushes and pops on queues requires a different interface to the queues.

Conceptual Interface

We provide basic queue operations, and then extend those operations to cover other important issues.

Basic Operations

The essential solution to the problem is to shift to a value-based operations, rather than reference-based operations.

The waiting operations are:

void queue::push(const Element&);

Push the Element onto the queue. If the operation cannot be completed immediately, wait until it can be completed.

Element queue::pop();

Pop the Element from the queue. If the operation cannot be completed immediately, wait until it can be completed.

Of course, there are operations to attempt a push or pop, and execute that operation only if it can be completed immediately.

queue_op_status queue::try_push(const Element&);

Try to immediately push the Element onto the queue. If successful, return queue_op_status::success. If the queue was full, return queue_op_status::full.

queue_op_status queue::try_pop(Element&);

Try to immediately pop the Element from the queue. If successful, return queue_op_status::success. If the queue was empty, return queue_op_status::empty.

We intend to extend this interface to cover movable element types as well.

Closed Queues

Threads using a queue for communication need some mechanism to signal when the queue is no longer needed. Rather than require an out-of-band signal, we chose to directly support such a signal in the queue itself, which considerably simplified coding.

To achieve this signal, a thread may close a queue. Once closed, no new elements may be pushed onto the queue. Push operations on a closed queue will either return queue_op_status::closed (when they have a status return) or throw queue_op_status::closed (when they do not). Elements already on the queue may be popped off. When an queue is empty and closed, pop operations will either return queue_op_status::closed (when they have a status return) or throw queue_op_status::closed (when they do not).

The operations are as follows.

void queue::close();

Close the queue.

bool queue::is_closed();

Return true iff the queue is closed.

queue_op_status queue::wait_push(const Element&);

Push the Element onto the queue. If successful, return queue_op_status::success. If the queue was closed, return queue_op_status::closed. Otherwise, wait for either of the above to become true.

queue_op_status queue::wait_pop(Element&);

Try to immediately pop the Element from the queue. If successful, return queue_op_status::success. If the queue was closed, return queue_op_status::closed. Otherwise, wait for either of the above to become true.

Empty Queues

It is sometimes desirable to know if a queue is empty.

bool queue::is_empty();

Return true iff the queue is empty.

This operation is useful only during intervals when the queue is known to not be subject to pushes and pops from other threads. Its primary use case is assertions on the state of the queue at the end if its lifetime.

Queue Names

It is sometimes desirable for queues to be able to identify themselves. This feature is particularly helpful for run-time diagnotics, particularly when 'ends' become dynamically passed around between threads. See Managed Indirection below.

const char* queue::name();

Return the name string provided as a parameter to queue construction.

Iterators

In order to enable the use of existing algorithms with concurrent queues, they need to support iterators. Output iterators will push to a queue and input iterators will pop from a queue. Stronger forms of iterators are in general not possible with concurrent queues.

Binary Interfaces

The standard library is template based, but it is often desirable to have a binary interface that shields the client from the concrete queue implementation. We achieve this capability with type erasure.

Virtual Interfaces

We provide classes that collectively provide indirect access to concurrent queues. First, a template class queue_front provides push access to a queue. Second, a template class queue_back provides pop access to a queue. Third, a template class queue_base, derived from both queue_front and queue_back, provides full access to a queue. Operations on these classes are virtual.

Queue Wrapper

To obtain a queue_base from an non-virtual concurrent queue, use the queue_wrap function, which takes a pointer to the non-virtual queue and returns a pointer to an instance of the template class queue_wrapper, which is derived from queue_base. In addition, queue_wrapper takes a parameter that specifies whether or not ownership of the queue is to transfer to the queue_base object or not.

A Concrete Buffer Queue

We provide a concrete concurrent queue in the form of a fixed-size buffer_queue. It provides the conceptual operations above, construction of an empty queue, and construction of a queue from a pair of iterators. Constructors take a parameter specifying the maximum number of elements in the buffer. Constructors may also take a parameter specifying the name of the queue. If the name is not present, it defaults to the empty string.

The buffer_queue deletes the default constructor, the copy constructor, and the copy assignment operator. We feel that their benefit might not justify their potential confusion.

Managed Indirection

Long running servers may have the need to reconfigure the relationship between queues and threads. The ability to pass 'ends' of queues between threads with automatic memory management eases programming.

To this end, we provide four template classes, shared_queue_front, shared_queue_back, unique_queue_front, and unique_queue_back, that are shared_ptr/unique_ptr analogs of queue_front and queue_back. Obtain a pair of these pointers from template functions shared_queue_ends and unique_queue_ends.

Synopsis

The synopsis of the binary/virtual interfaces are as follows.

template <typename Element>
class queue_front;

template <typename Element>
class queue_back;

template <typename Element>
class queue_front_iter
:
    public std::iterator<std::output_iterator_tag, Element,
                         ptrdiff_t, const Element*, const Element&>
{
  public:
    queue_front_iter( queue_front<Element>* q);

    queue_front_iter& operator *();
    queue_front_iter& operator ++();
    queue_front_iter& operator ++(int);
    queue_front_iter& operator =(const Element& value);

    bool operator ==(const queue_front_iter<Element>& y);
    bool operator !=(const queue_front_iter<Element>& y);
};

template <typename Element>
class queue_back_iter
:
    public std::iterator<std::input_iterator_tag, Element,
                         ptrdiff_t, const Element*, const Element&>
{
  public:
    class value
    {
      public:
        value( Element v);
        Element operator *() const;
    };

    queue_back_iter( queue_back<Element>* q);

    const Element& operator *() const;
    const Element* operator ->() const;
    queue_back_iter& operator ++();
    value operator ++(int);

    bool operator ==(const queue_back_iter<Element>& y);
    bool operator !=(const queue_back_iter<Element>& y);
};

enum class queue_op_status
{
    success = 0,
    empty,
    full,
    closed
};

template <typename Element>
class queue_common
{
  public:
    typedef Element& reference;
    typedef const Element& const_reference;
    typedef Element value_type;

    virtual void close() = 0;
    virtual bool is_closed() = 0;
    virtual bool is_empty() = 0;

    virtual const char* name() = 0;

  protected:
    virtual ~queue_common();
};

template <typename Element>
class queue_front
:
    public virtual queue_common<Element>
{
  public:
    typedef queue_front_iter<Element> iterator;
    typedef const queue_front_iter<Element> const_iterator;

    iterator begin();
    iterator end();
    const iterator cbegin();
    const iterator cend();

    virtual void push(const Element& x) = 0;
    virtual queue_op_status try_push(const Element& x) = 0;
    virtual queue_op_status wait_push(const Element& x) = 0;
};

template <typename Element>
class queue_back
:
    public virtual queue_common<Element>
{
  public:
    typedef queue_back_iter<Element> iterator;
    typedef queue_back_iter<Element> const_iterator;

    iterator begin();
    iterator end();
    const iterator cbegin();
    const iterator cend();

    virtual Element pop() = 0;
    virtual queue_op_status try_pop(Element&) = 0;
    virtual queue_op_status wait_pop(Element&) = 0;
};

template <typename Element>
class queue_base
:
    public virtual queue_front<Element>, public queue_back<Element>
{
  public:
    virtual ~queue_base();
};

enum class ownership { transfer, retain };

template <typename Queue>
class queue_wrapper
:
    public virtual queue_base <typename Queue::element_type>
{
  public:
    typedef typename Queue::element_type element_type;

    queue_wrapper(Queue* arg, ownership xfer);
    virtual ~queue_wrapper();
    virtual void close();
    virtual bool is_closed();
    virtual bool is_empty();
    virtual const char* name();
    virtual void push(const element_type& x);
    virtual queue_op_status try_push(const element_type& x);
    virtual queue_op_status wait_push(const element_type& x);
    virtual element_type pop();
    virtual queue_op_status try_pop(element_type& x);
    virtual queue_op_status wait_pop(element_type& x);
};

template <typename Queue>
queue_wrapper <Queue>* queue_wrap(Queue* arg, ownership xfer);