如何实现一个多读多写的线程安全的无锁队列

描述

在ZMQ无锁队列的原理与实现一文中,我们已经知道了ypipe可以实现一线程写一线程读的无锁队列,那么其劣势就很明显了,无法适应多写多读的场景,因为其在读的时候没有对r指针加锁,在写的时候没有对w指针加锁。那么如何实现一个多读多写的线程安全的无锁队列呢?

  1. 互斥锁:mutexqueue(太简单不介绍了)
  2. 互斥锁+条件变量:blockqueue(太简单不介绍了)
  3. 内存屏障:lockfreequeue(SimpleLockFreeQueue.h 暂时未写文章介绍)
  4. 双重CAS原子操作:arraylockfreequeue(本文)

封装

2. ArrayLockFreeQueue的类接⼝和变量

该程序使用 gcc 内置的__sync_bool_compare_and_swap,但重新做了宏定义封装。

#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

所谓循环数组,就是RingBuffer

封装

#define QUEUE_INT unsigned long
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16

template
class ArrayLockFreeQueue {
public:

ArrayLockFreeQueue();

virtual ~ArrayLockFreeQueue();

QUEUE_INT size();

bool enqueue(const ELEM_T &a_data);// ⼊队列

bool dequeue(ELEM_T &a_data);// 出队列

private:

ELEM_T m_thequeue[Q_SIZE];

volatile QUEUE_INT m_count;// 队列内有多少元素
volatile QUEUE_INT m_writeIndex;//新元素⼊列时存放位置在数组中的下标

volatile QUEUE_INT m_readIndex;//下⼀个出列元素在数组中的下标

volatile QUEUE_INT m_maximumReadIndex;最后⼀个已经完成⼊列操作的元素在数组中的下标

//取余
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

2.1 变量介绍

  • m_count:队列的元素个数
  • m_writeIndex:新元素⼊列时存放位置在数组中的下标
  • m_readIndex:下⼀个出列元素在数组中的下标
  • m_maximumReadIndex:最后⼀个已经完成⼊列操作的元素在数组中的下标。如果它的值跟m_writeIndex不⼀致,表明有写请求尚未完成。这意味着,有写请求成功申请了空间但数据还没完全写进队列。所以如果有线程要读取,必须要等到写线程将数据完全写⼊到队列之后。

必须指明的是使⽤3种不同的下标都是必须的,因为队列允许任意数量的⽣产者和消费者围绕着它⼯作。已经存在⼀种基于循环数组的⽆锁队列,使得唯⼀的⽣产者和唯⼀的消费者可以良好的⼯作。它的实现相当简洁⾮常值得阅读。

2.2 函数介绍

2.2.1 取余函数QUEUE_INT countToIndex(QUEUE_INT a_count);

这个函数非常有用,因为我们实现的是循环队列,所以一定要对数组长度取余。

template
inline QUEUE_INT ArrayLockFreeQueue::countToIndex(QUEUE_INT a_count) {
return (a_count % Q_SIZE); // 取余的时候
},>

队列已满判断如下

// (m_writeIndex + 1) %/Q_SIZE == m_readIndex
countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)

队列为空判断如下

//m_readIndex == m_maximumReadIndex
countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)

相关视频推荐

C++无锁队列的设计与实现

高并发场景下,三种锁方案:互斥锁,自旋锁,原子操作的优缺点

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

封装

2.2.2 文字举例

2.2.2.1 入队函数bool enqueue(const ELEM_T &a_data);

下面以文字举例说明函数,后续再图示举例。

假设现在有两个线程都进入了enqueue这个函数,m_writeIndex,m_readIndex和m_maximumReadIndex都为0。

  • 线程1在第一个while和CAS处:
currentWriteIndex = 0
currentReadIndex = 0
CAS(0,0,1)----> m_writeIndex = 1
m_maximumReadIndex = 0
  • 线程2在第一个while和CAS处:
currentWriteIndex = 0
currentReadIndex = 0
CAS(1,0,1)----> m_writeIndex = m_writeIndex false

while再次循环

currentWriteIndex = 1
currentReadIndex = 0
CAS(1,1,2)----> m_writeIndex = 2
m_maximumReadIndex = 0
  • 线程2在第二个while和CAS处:
CAS(0,1,2) -----> m_maximumReadIndex = m_maximumReadIndex false
yidld让出CPU
此时线程1执行↓
  • 线程1在第二个while和CAS处:
CAS(0,0,1) ------>m_maximumReadIndex = 1
执行结束,此时线程2恢复执行
  • 线程2在第二个while和CAS处:
CAS(1,1,2) ------>m_maximumReadIndex = 2
执行结束
template
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data) {
QUEUE_INT currentWriteIndex; // 获取写指针的位置
QUEUE_INT currentReadIndex;
// 1. 获取可写入的位置
do {
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex)) {
return false; // 队列已经满了
}
// 目的是为了获取一个能写入的位置
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// 获取写入位置后 currentWriteIndex 是一个临时变量,保存我们写入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把数据更新到对应的位置

// 2. 更新可读的位置,按着m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。
}
//printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%dn",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
AtomicAdd(&m_count, 1);

return true;
},>

2.2.2.2 出队函数bool dequeue(ELEM_T &a_data);

下面以文字举例说明函数,后续再图示举例。

假设现在有两个线程都进入了dequeue这个函数,currentReadIndex为0,currentMaximumReadIndex为2。

  • 线程1执行
currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(0,0,1) ----> m_readIndex = 1
  • 线程2执行
currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(1,0,1) ----> m_readIndex = m_readIndex false

while再循环

currentReadIndex = 1
currentMaximumReadIndex = 2
data = m_thequeue[1]
CAS(1,1,2) ----> m_readIndex = 2

如果没有新数据写入,再次读取数据,则currentReadIndex(2)==currentMaximumReadIndex(2)相等,return false,没有数据可读。

template
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data) {
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;

do {
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;

if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 从临时位置读取的

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
//printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%dn",m_readIndex,currentReadIndex,m_maximumReadIndex);
AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1
return true;
}
} while (true);

assert(0);
// Add this return statement to avoid compiler warnings
return false;
},>

2.2.3 图示举例

2.2.3.1 入队函数bool enqueue(const ELEM_T &a_data);

下面的图我就不分目录了,直接一口气说明完。

以下插图展示了对队列执⾏操作时各下标是如何变化的。如果⼀个位置被标记为X,标识这个位置⾥存放了数据。空⽩表示位置是空的。对于下图的情况,队列中存放了两个元素。WriteIndex指示的位置是新元素将会被插⼊的位置。ReadIndex指向的位置中的元素将会在下⼀次pop操作中被弹出。

封装

当⽣产者准备将数据插⼊到队列中,它⾸先通过增加WriteIndex的值来申请空间。MaximumReadIndex指向最后⼀个存放有效数据的位置(也就是实际的队列尾)。

封装

⼀旦空间的申请完成,⽣产者就可以将数据拷⻉到刚刚申请到的位置中。完成之后增加MaximumReadIndex使得它与WriteIndex的⼀致。

封装

现在队列中有3个元素,接着⼜有⼀个⽣产者尝试向队列中插⼊元素。

封装

在第⼀个⽣产者完成数据拷⻉之前,⼜有另外⼀个⽣产者申请了⼀个新的空间准备拷⻉数据。现在有两个⽣产者同时向队列插⼊数据。

封装

现在⽣产者开始拷⻉数据,在完成拷⻉之后,对MaximumReadIndex的递增操作必须严格遵循⼀个顺序:第⼀个⽣产者线程⾸先递增MaximumReadIndex,接着才轮到第⼆个⽣产者。这个顺序必须被严格遵守的原因是,我们必须保证数据被完全拷⻉到队列之后才允许消费者线程将其出列。让出cpu的⽬的也是为了让排在最前⾯的⽣产者完成m_maximumReadIndex的更新

while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。
}

封装

第⼀个⽣产者完成了数据拷⻉,并对MaximumReadIndex完成了递增,现在第⼆个⽣产者可以递增MaximumReadIndex了。

封装

第⼆个⽣产者完成了对MaximumReadIndex的递增,现在队列中有5个元素。

template
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data) {
QUEUE_INT currentWriteIndex; // 获取写指针的位置
QUEUE_INT currentReadIndex;
// 1. 获取可写入的位置
do {
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex)) {
return false; // 队列已经满了
}
// 目的是为了获取一个能写入的位置
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// 获取写入位置后 currentWriteIndex 是一个临时变量,保存我们写入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把数据更新到对应的位置

// 2. 更新可读的位置,按着m_maximumReadIndex+1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。
}
// printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%dn",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
AtomicAdd(&m_count, 1);

return true;
},>

2.2.3.2 出队函数bool dequeue(ELEM_T &a_data);

以下插图展示了元素出列的时候各种下标是如何变化的,队列中初始有2个元素。WriteIndex指示的位置是新元素将会被插⼊的位置。ReadIndex指向的位置中的元素将会在下⼀次pop操作中被弹出。

封装

消费者线程拷⻉数组ReadIndex位置的元素,然后尝试⽤CAS操作将ReadIndex加1。如果操作成功消费者成功的将数据出列。因为CAS操作是原⼦的,所以只有唯⼀的线程可以在同⼀时刻更新ReadIndex的值。如果操作失败,读取新的ReadIndex值,以重复以上操作(copy数据,CAS)。

封装

现在⼜有⼀个消费者将元素出列,队列变成空。

封装

现在有⼀个⽣产者正在向队列中添加元素。它已经成功的申请了空间,但尚未完成数据拷⻉。任何其它企图从队列中移除元素的消费者都会发现队列⾮空(因为writeIndex不等于readIndex)。但它不能读取readIndex所指向位置中的数据,因为readIndex与MaximumReadIndex相等(相等break)。直到⽣产者完成数据拷⻉增加MaximumReadIndex的值才能读取这个数据。

封装

当⽣产者完成数据拷⻉,队列的⼤⼩是1,消费者线程可以读取这个数据了。

template
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data) {
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;

do {
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;

if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)]; // 从临时位置读取的

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
//printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%dn",m_readIndex,currentReadIndex,m_maximumReadIndex);
AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1
return true;
}
} while (true);

assert(0);
// Add this return statement to avoid compiler warnings
return false;
},>

2.2.4 计算队列的大小size函数的ABA问题与解决方案

template
QUEUE_INT ArrayLockFreeQueue::size() {
QUEUE_INT currentWriteIndex = m_writeIndex;
QUEUE_INT currentReadIndex = m_readIndex;

if (currentWriteIndex >= currentReadIndex)
return currentWriteIndex - currentReadIndex;
else
return Q_SIZE + currentWriteIndex - currentReadIndex;
},>

下面的场景描述了 size 为何会返回一个不正确的值:

1. 当 currentWriteIndex = m_writeIndex 执行之后,m_writeIndex=3,m_readIndex = 2 那么实际 size 是 1;
2. 之后操作线程被抢占,且在它停止运行的这段时间内,有 2 个元素被插入和从队列中移除。所以 m_writeIndex=5,m_readIndex = 4,而 size 还是 1;
3. 现在被抢占的线程恢复执行,读取 m_readIndex 值,这个时候 currentReadIndex=4,currentWriteIndex=3;
4. currentReadIndex > currentWriteIndex'所以 m_totalSize + currentWriteIndex - currentReadIndex`被返回,这个值意味着队列几乎是满的,而实际上队列几乎是空的。

解决方案:添加一个用于保存队列中元素数量的成员 count.这个成员可以通过 AtomicAdd/AtomicSub 来实现原子的递增和递减。

但需要注意的是这增加了一定开销,因为原子递增,递减操作比较昂贵也很难被编译器优化。如果可以容忍size函数的ABA问题,则可以不用count与AtomicAdd/AtomicSub。使用者可以根据自己的使用场合选择是否承受额外的运行时开销。

2.2.5 与智能指针一起使用,内存无法得到释放

如果你打算用这个队列来存放智能指针对象.需要注意,将一个智能指针存入队列之后,如果它所占用的位置没有被另一个智能指针覆盖,那么它所指向的内存是无法被释放的(因为它的引用计数器无法下降为 0).这对于一个操作频繁的队列来说没有什么问题,但是程序员需要注意的是,一旦队列被填满过一次那么应用程序所占用的内存就不会下降,即使队列被清空.除非自己做改动,每次 pop 手动 delete。

3. 多个⽣产者线程的情况下yielding处理器的必要性

读者可能注意到了enqueue函数中使⽤了sched_yield()来主动的让出处理器,对于⼀个声称⽆锁的算法⽽⾔,这个调⽤看起来有点奇怪。正如⽂章开始的部分解释过的,多线程环境下影响性能的其中⼀个因素就是Cache损坏。⽽产⽣Cache损坏的⼀种情况就是⼀个线程被抢占,操作系统需要保存被抢占线程的上下⽂,然后将被选中作为下⼀个调度线程的上下⽂载⼊。此时Cache中缓存的数据都会失效,因为它是被抢占线程的数据⽽不是新线程的数据。

所以,当此算法调⽤sched_yield()意味着告诉操作系统:“我要把处理器时间让给其它线程,因为我要等待某件事情的发⽣”。⽆锁算法和通过阻塞机制同步的算法的⼀个主要区别在于⽆锁算法不会阻塞在线程同步上,那么为什么在这⾥我们要主动请求操作系统抢占⾃⼰呢?这个问题的答案没那么简单。它与有多少个⽣产者线程在并发的往队列中存放数据有关:每个⽣产者线程所执⾏的CAS操作都必须严格遵循FIFO次序,⼀个⽤于申请空间,另⼀个⽤于通知消费者数据已经写⼊完成可以被读取了。

如果我们的应⽤程序只有唯⼀的⽣产者操作这个队列,sche_yield()将永远没有机会被调⽤,第2个CAS操作永远不会失败。因为在⼀个⽣产者的情况下没有⼈能破坏⽣产者执⾏这两个CAS操作的FIFO顺序。

⽽当多于⼀个⽣产者线程往队列中存放数据的时候,问题就出现了。概括来说,⼀个⽣产者通过第1个CAS操作申请空间,然后将数据写⼊到申请到的空间中,然后执⾏第2个CAS操作通知消费者数据准备完毕可供读取了。这第2个CAS操作必须遵循FIFO顺序,也就是说,如果A线程第⾸先执⾏完第⼀个CAS操作,那么它也要第1个执⾏完第2个CAS操作,如果A线程在执⾏完第⼀个CAS操作之后停⽌,然后B线程执⾏完第1个CAS操作,那么B线程将⽆法完成第2个CAS操作,因为它要等待A先完成第2个CAS操作。⽽这就是问题产⽣的根源。让我们考虑如下场景,3个消费者线程和1个消费者线程:

  • 线程1,2,3按顺序调⽤第1个CAS操作申请了空间。那么它们完成第2个CAS操作的顺序也应该与这个顺序⼀致,1,2,3。
  • 线程2⾸先尝试执⾏第2个CAS,但它会失败,因为线程1还没完成它的第2此CAS操作呢。同样对于线程3也是⼀样的。
  • 线程2和3将会不断的调⽤它们的第2个CAS操作,直到线程1完成它的第2个CAS操作为⽌。
  • 线程1最终完成了它的第2个CAS,现在线程3必须等线程2先完成它的第2个CAS。
  • 线程2也完成了,最终线程3也完成。

在上⾯的场景中,⽣产者可能会在第2个CAS操作上⾃旋⼀段时间,⽤于等待先于它执⾏第1个CAS操作的线程完成它的第2次CAS操作。在⼀个物理处理器数量⼤于操作队列线程数量的系统上,这不会有太严重的问题:因为每个线程都可以分配在⾃⼰的处理器上执⾏,它们最终都会很快完成各⾃的第2次CAS操作。虽然算法导致线程处理忙等状态,但这正是我们所期望的,因为这使得操作更快的完成。也就是说在这种情况下我们是不需要sche_yield()的,它完全可以从代码中删除。

但是,在⼀个物理处理器数量少于线程数量的系统上,sche_yield()就变得⾄关重要了。让我们再次考查上⾯3个线程的场景,当线程3准备向队列中插⼊数据:如果线程1在执⾏完第1个CAS操作,在执⾏第2个CAS操作之前被抢占,那么线程2,3就会⼀直在它们的第2个CAS操作上忙等(它们忙等,不让出处理器,线程1也就没机会执⾏,它们就只能继续忙等,也就是说,如果不适用 sched_yield,一直自旋,那么可能多个线程同时阻塞在第二个 CAS 那儿),直到线程1重新被唤醒,完成它的第2个CAS操作。这就是需要sche_yield()的场合了,操作系统应该避免让线程2,3处于忙等状态。它们应该尽快的让出处理器让线程1执⾏,使得线程1可以把它的第2个CAS操作完成。这样线程2和3才能继续完成它们的操作。

4. 循环数组无锁队列的性能测试

4.1 性能测试

互斥锁队列 vs 互斥锁+条件变量队列 vs 内存屏障链表 vs RingBuffer CAS 实现。

4写1读:性能中等

封装

4写4读:性能中上

封装

1写4读:性能最好

封装

7写7读:性能比互斥锁队列还差

封装

4.2 分析

虽然没有分析第三个内存屏障链表的代码,但是我们不难看出互斥锁+条件变量 与 内存屏障链表 的性能差别不大。为什么呢?链表的方式需要不断的申请和释放元素。当然,用内存池可以适当改善这个影响,但是内存池在分配内存与释放内存的时候也会涉及到线程间的数据竞争,所以用链表的方式性能相对提升不多。

随着生产者数量的增加,无锁队列的效率迅速下降。因为在多个生产者的情况下,第 2 个 CAS 将对性能产生影响。我们通过测试得出循环数组的⽆锁队列在1写4读的场景下性能提升是最高的,因为只有一个生产者,那么第二个CAS不会有yield的情况出现。由此我们可以得出一个结论,在一写多读的场景,我们可以优先使用循环数组的⽆锁队列,比如下图的场景。

封装

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分