This concise article explains the implementation of a multi-producer, multi-consumer, wait-free, fixed-size FIFO queue.

To the best of my knowledge (and based on my notes), I invented this algorithm in the summer of 2016. That said, the concept is fairly obvious and has probably been invented by someone else before or after that date. To my knowledge it has not been patented, although I have not conducted a patent search.

The provided code is intended as a snippet that illustrates the idea of the algorithm.

One potential way to make it faster is to replace % (remainder) with a bitmask, but this requires that QUEUE_SIZE is a power of two (2^n).
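
For illustration, here is a minimal sketch of that variant; the QUEUE_MASK macro and the value 128 are my own choices, not part of the original code. With QUEUE_SIZE raised to a power of two, the slot lookup inside queue_op() no longer needs a division:

/* hypothetical power-of-two variant: QUEUE_SIZE is 2^n,
 * so the modulo can be replaced with a bitwise AND */
#define QUEUE_SIZE                128
#define QUEUE_MASK                (QUEUE_SIZE - 1)

    /* the indexing line in queue_op() then becomes: */
    item = &items[(int)(s & QUEUE_MASK)];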

Obviously, the algorithm has an upper bound on the total number of elements that can ever be delivered through the queue, because the sequence counters are finite. A real-life implementation, however, uses unsigned long, which on 64-bit platforms gives 2^64 possible sequence values; even at a billion operations per second that is more than 500 years' worth, which makes the limitation negligible at any real-life throughput.

/* the queue needs at least one empty element, to avoid an infinite loop */
#define QUEUE_SIZE                (123 + 1)

struct queue_item {
    unsigned long                 qi_seq;    /* per-slot sequence number */
    int                           qi_idx;    /* payload: index of an external element, -1 if unset */
};

struct queue {
    struct queue_item             q_item[QUEUE_SIZE];
    unsigned long                 q_head_seq;    /* sequence of the next enqueue */
    unsigned long                 q_tail_seq;    /* sequence of the next dequeue */
};

...

    /* reset / initialize the queues */
    for (i = 0; i < QUEUE_SIZE; i++) {
        /* qi_seq == i marks slot i as free for the enqueue with sequence i */
        q_in.q_item[i].qi_seq = i;
        q_in.q_item[i].qi_idx = -1;

        q_out.q_item[i].qi_seq = i;
        q_out.q_item[i].qi_idx = -1;
    }

    q_in.q_head_seq = 0;
    q_in.q_tail_seq = 0;
    q_out.q_head_seq = 0;
    q_out.q_tail_seq = 0;

...

/*
 * Common helper for both operations: 'seq' points to the head sequence for
 * an enqueue (dequeue == 0) or to the tail sequence for a dequeue
 * (dequeue == 1); 'idx' is the value to enqueue and is ignored on dequeue.
 */
static inline int
queue_op(struct queue_item *items, unsigned long *seq, int dequeue, int idx)
{
    long diff;
    unsigned long s;
    struct queue_item *item;

    for (;;) {
        s = *seq;
        item = &items[(int)(s % QUEUE_SIZE)];
        /* the slot is ready when qi_seq == s (enqueue) or qi_seq == s + 1 (dequeue) */
        diff = item->qi_seq - (s + dequeue);

        if (diff == 0) {
            /* the slot is ready; try to claim it by advancing the sequence */
            if (atomic_cas_ulong(seq, s, s + 1) != s)
                continue;

            /* the CAS succeeded, so this thread now owns the slot */

            if (dequeue) {
                idx = item->qi_idx;
                /* release the slot to the enqueue that lands here on the next lap */
                item->qi_seq = s + QUEUE_SIZE;
                return (idx);
            } else {
                item->qi_idx = idx;
                /* qi_seq becomes s + 1, making the item visible to consumers */
                item->qi_seq++;
                return (0);
            }
        }

        if (diff < 0) {
            /* the queue is full (enqueue) or empty (dequeue) */
            return (-1);
        }

        /* diff > 0 means another thread has already advanced the sequence; retry */
    }
}
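
The snippet relies on atomic_cas_ulong(), the NetBSD/Solaris-style compare-and-swap that returns the value previously stored at the target address. On toolchains that lack it, an equivalent could be sketched roughly as follows; this shim is my own assumption, based on the GCC/Clang __sync builtin, and is not part of the original code.

/* hypothetical portability shim: emulate atomic_cas_ulong() with the
 * GCC/Clang builtin; returns the value observed at *ptr before the swap */
static inline unsigned long
atomic_cas_ulong(volatile unsigned long *ptr, unsigned long expected,
    unsigned long newval)
{
    return __sync_val_compare_and_swap(ptr, expected, newval);
}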

/* returns the dequeued index, or -1 if the queue is empty */
int
queue_dequeue(struct queue *q)
{
    return queue_op(q->q_item, &q->q_tail_seq, 1, -1);
}

/* returns 0 on success, or -1 if the queue is full */
int
queue_enqueue(struct queue *q, int idx)
{
    return queue_op(q->q_item, &q->q_head_seq, 0, idx);
}
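
The initialization above sets up two queues, q_in and q_out, but the surrounding code that uses them is elided. One plausible arrangement, sketched below purely as an assumption on my part, is a free-list / work-queue pair over an external buffer array: a producer takes a free buffer index from q_in, fills the buffer, and publishes the index to q_out, while a consumer does the reverse. The pool array, POOL_SIZE, and the pool_init()/produce()/consume() helpers are my own names and are not part of the original code.

#include <string.h>

#define POOL_SIZE                 (QUEUE_SIZE - 1)   /* one queue slot stays empty */

static struct queue q_in;                    /* indices of free buffers */
static struct queue q_out;                   /* indices of filled buffers */
static char pool[POOL_SIZE][256];            /* payload storage */

/* after the initialization shown above, every buffer starts out free */
static void
pool_init(void)
{
    int i;

    for (i = 0; i < POOL_SIZE; i++)
        queue_enqueue(&q_in, i);
}

/* producer: grab a free buffer, fill it, publish it */
static int
produce(const char *msg)
{
    int idx = queue_dequeue(&q_in);

    if (idx < 0)
        return (-1);                         /* no free buffers */
    strncpy(pool[idx], msg, sizeof(pool[idx]) - 1);
    return queue_enqueue(&q_out, idx);
}

/* consumer: take a filled buffer, process it, recycle it */
static int
consume(void)
{
    int idx = queue_dequeue(&q_out);

    if (idx < 0)
        return (-1);                         /* nothing to consume */
    /* ... process pool[idx] ... */
    return queue_enqueue(&q_in, idx);
}

With this split, a buffer index is held by exactly one thread between the dequeue that obtained it and the enqueue that passes it on, so the buffer itself needs no additional locking (memory-ordering details aside, as in the rest of the snippet).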