This concise article expounds on the implementation of a multi-producer, multi-consumer, wait-free, fixed-size FIFO queue.
To the best of my knowledge, I invented this algorithm in the summer of 2016, based on my notes. However, it is evident that the concept is quite obvious and has probably been invented by someone else before or after that date. It has not been patented, although I have not conducted a patent search.
The provided code is intended only as a snippet that illustrates the idea behind the algorithm.
One potential way to make it faster is to replace % (remainder) with a bitwise mask, but this requires that QUEUE_SIZE is a power of two (2^n).
Obviously, the algorithm in this article has a maximum number of
elements that can be delivered via this queue, because the sequence
counters only ever increase. A real-life implementation, however, uses
unsigned long, which is usually a DWORD or 64 bits wide (2^64 values),
and that makes such a limitation negligible at any real-life throughput.
/*
 * Number of slots in each queue.  The queue needs at least one empty
 * element to avoid an infinite loop (presumably the reason for the "+ 1").
 */
#define QUEUE_SIZE (123 + 1)
/* One slot of the queue. */
struct queue_item {
/*
 * Slot sequence number.  queue_op() compares it with the head/tail
 * sequence to decide whether the slot may be written or read.
 */
unsigned long qi_seq;
/* Payload: the value stored by enqueue and returned by dequeue. */
int qi_idx;
};
/* Fixed-size FIFO queue of int values. */
struct queue {
/* Slots, indexed by sequence number modulo QUEUE_SIZE. */
struct queue_item q_item[QUEUE_SIZE];
/* Sequence counter advanced by queue_enqueue(). */
unsigned long q_head_seq;
/* Sequence counter advanced by queue_dequeue(). */
unsigned long q_tail_seq;
};
...
/*
 * Reset / initialize both queues: head and tail sequences start at 0,
 * and slot i starts with qi_seq == i, which is the value queue_op()
 * expects for the first enqueue into that slot.  Payloads are set to -1.
 */
q_in.q_head_seq = q_in.q_tail_seq = 0;
q_out.q_head_seq = q_out.q_tail_seq = 0;
for (i = 0; i < QUEUE_SIZE; i++) {
	q_in.q_item[i].qi_idx = q_out.q_item[i].qi_idx = -1;
	q_in.q_item[i].qi_seq = q_out.q_item[i].qi_seq = i;
}
...
/*
 * Common implementation of enqueue and dequeue.
 *
 * items   - slot array, indexed by sequence number modulo QUEUE_SIZE
 * seq     - shared sequence counter to advance (queue_enqueue() passes
 *           the head counter, queue_dequeue() the tail counter)
 * dequeue - 1 to dequeue, 0 to enqueue; it is also added to the
 *           sequence value the slot is expected to hold
 * idx     - value to store when enqueueing; overwritten with the slot
 *           payload when dequeueing
 *
 * Returns 0 after a successful enqueue, the slot's qi_idx after a
 * successful dequeue, or -1 when the slot is not ready for the
 * requested operation.
 *
 * NOTE(review): -1 is also the payload that initialized slots hold, so
 * callers presumably only enqueue non-negative values - confirm.
 * NOTE(review): *seq and qi_seq are read and written with plain
 * (non-atomic) accesses; whether that is sufficient depends on the
 * ordering guarantees of atomic_cas_ulong() and of the target platform
 * - confirm before reusing this snippet as is.
 */
static inline int
queue_op(struct queue_item *items, unsigned long *seq, int dequeue, int idx) {
long diff;
unsigned long s;
struct queue_item *item;
for (;;) {
/* snapshot the counter; the CAS below succeeds only if it is unchanged */
s = *seq;
item = &items[(int)(s % QUEUE_SIZE)];
/*
 * The slot is ready for enqueue when qi_seq == s and ready for
 * dequeue when qi_seq == s + 1.
 */
diff = item->qi_seq - (s + dequeue);
if (diff == 0) {
/*
 * item can be locked: claim sequence s by advancing the counter.
 * atomic_cas_ulong() is declared outside this snippet; the test
 * below treats its return value as the previous value of *seq.
 */
if (atomic_cas_ulong(seq, s, s + 1) != s)
continue;
/* it is locked */
if (dequeue) {
idx = item->qi_idx;
/* hand the slot to the enqueue that will claim sequence s + QUEUE_SIZE */
item->qi_seq = s + QUEUE_SIZE;
return (idx);
} else {
item->qi_idx = idx;
/* qi_seq becomes s + 1, the value a dequeue of sequence s waits for */
item->qi_seq++;
return (0);
}
}
if (diff < 0) {
/*
 * slot is not ready: the queue is full (enqueue) or empty
 * (dequeue), or a concurrent operation has claimed this slot
 * but has not updated qi_seq yet
 */
return (-1);
}
/* diff > 0 - means another thread moved sequence */
}
}
/*
 * Take the next value from the queue.
 *
 * Returns the dequeued value, or -1 when queue_op() reports that no
 * slot is ready to be read.
 */
int
queue_dequeue(struct queue *q)
{
	struct queue_item *slots = q->q_item;
	unsigned long *tail = &q->q_tail_seq;

	return (queue_op(slots, tail, 1, -1));
}
/*
 * Append idx to the queue.
 *
 * Returns 0 on success, or -1 when queue_op() reports that no slot is
 * ready to be written.
 */
int
queue_enqueue(struct queue *q, int idx)
{
	struct queue_item *slots = q->q_item;
	unsigned long *head = &q->q_head_seq;

	return (queue_op(slots, head, 0, idx));
}