有锁栈与无锁栈,实现与性能对比

多线程同步是现代程序设计中一个重要问题,想要深入理解这一问题,就不得不深入到处理器与操作系统的具体实现中。但是我水平有限┑( ̄Д  ̄)┍,本文不过于详细地去探讨这一问题,而是以有锁栈与无锁栈作为切入点,主要对代码实现与实验性能上对两者进行一个比较。

有锁栈与无锁栈

有锁栈的实现方式其实比较简单,在线程进入临界区的时候加锁即可,在完成对栈的操作解除对锁的占有。但是加锁解锁的过程对资源的消耗比较大,当线程没有Acquire到锁的时候会发生状态的切换,这在需要高性能计算的场景下或许并不合适。

Compare And Swap(CAS)是一种常见的原子操作,是实现无锁同步的一个方式,其主要思想就是Compare和Swap两个过程。Compare的目的主要是探测内存中的值是否为期待值,如果是则Swap为设定的值。这主要依赖于处理器的实现,比如x86处理器的CMPXCHG与ARM处理器的LDREX/STREX

实现与实验

C++实现

本文主要对比C++标准库实现的栈,会用到一些比较新的C++标准,实验环境为:

  • 腾讯云Cloud Studio 1C2G(致谢!)
  • gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.2)
  • g++ experiment.cpp -o experiment -pthread

为了减少代码冗余,两个栈实现的内部数据结构均为以下代码段。

1
2
3
4
5
6
template<typename T>
struct Node {
    T data;
    Node* next;
    Node(const T& data) : data(data), next(nullptr) {}
};

以下为有锁栈的实现代码的核心部分。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void push(const T& data) {
    std::unique_lock<std::mutex> lock(mtx);
    Node<T>* new_node = new Node<T>(data);
    new_node->next = head;
    head = new_node;
}

T top() const {
    std::unique_lock<std::mutex> lock(mtx);
    Node<T>* top_node = head;
    if (top_node) {
        return top_node->data;
    }
        
    throw std::runtime_error("Stack is empty.");
}

void pop() {
    std::unique_lock<std::mutex> lock(mtx);
    Node<T>* top_node = head;
    if (top_node) {
        Node<T>* next_node = top_node->next;
        head = next_node;
        delete top_node;

        return;
    }
    
    throw std::runtime_error("Stack is empty.");
}

bool is_empty() const noexcept {
    std::unique_lock<std::mutex> lock(mtx);

    return head == nullptr;
}

有锁栈的实现主要依赖互斥锁(mutex & unique_lock)实现,线程在进行每项操作时需要先Acquire锁的所有权,在执行完操作后需要Release锁的所有权。在这里我们不对修改操作的合法性进行检查,而是选择直接对不合法的越界操作抛出异常。有锁栈的实现简单,符合一般的多线程编程逻辑比较直观。在这里使用标准库自带的RAII锁,编程实现非常简单。

以下为无锁栈实现的核心部分。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::atomic<Node<T>*> head;

void push(const T& data) {
    Node<T>* new_node = new Node<T>(data);
    new_node->next = head.load(std::memory_order_relaxed);
    while (!head.compare_exchange_weak(new_node->next, new_node, std::memory_order_release, std::memory_order_relaxed));
}

T top() const {
    Node<T>* top_node = head.load(std::memory_order_acquire);
    if (top_node) {
        return top_node->data;
    }
        
    throw std::runtime_error("Stack is empty.");
}

void pop() {
    Node<T>* top_node = head.load(std::memory_order_relaxed);
    while (top_node) {
        Node<T>* next_node = top_node->next;
        if (head.compare_exchange_weak(top_node, next_node, std::memory_order_release, std::memory_order_relaxed)) {
            delete top_node;
            return;
        }
    }

    throw std::runtime_error("Stack is empty.");
}

bool is_empty() const noexcept {
    return head.load(std::memory_order_acquire) == nullptr;
}

对比有锁栈,无锁栈在实现上困难了不少,这主要牵涉到对CAS的理解上。这一部分的理论部分在前文与参考资料中有比较详细的描述,这里再简单对C++代码实现方面做一个解释:

  • head作为栈顶元素的指针,在这里使用的是原子变量类型,这主要是为了能够在一个CPU Instruction中完成对变量的操作,从而避免中间状态的产生,也就保证了操作的原子性。原子操作存在内存序的概念,在这里用到了三种内存序,分别解释如下
    • memory_order_relaxed: 松散内存顺序。这个模式保证了读操作不会被重排,但对于写操作没有任何保证。
    • memory_order_release:释放内存顺序。保障了读写顺序,通常不适用于load()。
    • memory_order_acquire:获取内存顺序。保障了读写顺序。
  • atomic.load()的作用是安全地读取一个原子变量,其中参数为内存序(Memory Order)。在Push和Pop操作中选择的load的内存序为memory_order_relaxed,这是因为这里考虑CAS方式乱序地插入与删除元素,即元素本身的顺序并不重要;而在IsEmpty操作中的内存序为memory_order_acquire,这主要是考虑在当前时间点执行IsEmpty的结果的正确性。
  • atomic.compare_exchange_weak()用于执行原子比较和交换操作,也就是CAS机制的主要实现。其中一个参数为Expected,是Compare过程比较的值;第二个变量为Desire,是Swap过程想要交换为的值。其工作过程为,比较atomic变量与Expected是否相等,如果相等则交换为Desire,否则退化为一个load的过程,将最新的atomic的值加载进Desire。后两个参数则为比较success或failure后的内存序。值得一提的是atomic.compare_exchange_strong()也值得了解,其主要适用于不需要循环重试的场景,关注可靠性的场景。

这一部分的内容有一些难理解,如果读到这里你有所困惑,建议去查找更多关于CAS机制及C++内存序的相关资料,本文不再详细展开(没错我也讲不明白😔)

为了进行公平对比,在这里定义实验任务为:在每个线程中执行1,000,000次入栈操作,如下。

1
2
3
4
5
6
auto ok = pool.enqueue([&] {
    for (int i = 0; i < 1000000; ++i)
        stack.push(i);

    return true;
});

在这里我使用线程池创建了两个线程,从任务创建开始计时,以两个线程的入栈任务结束截止计时,核心代码如下所示。

1
2
3
4
5
6
7
8
9
auto start = std::chrono::high_resolution_clock::now();

// Assign tasks to thread pool.

if (ok_1.get() && ok_2.get()) {
    auto stop = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
    std::cout << duration.count() << std::endl;
}

本文所提到的线程池、有锁栈和无锁栈的完整代码实现均可在本人的开源项目MyDSA中找到(如果对你有用,也麻烦点一个小星星呀!)。

实验结果

实验结果如下表(单位为microsecond)。

方法有锁栈无锁栈
#11,003,863399,217
#2884,579186,314
#31,013,956427,235
Avg967,466337,588

从实验结果的平均值来看,有锁栈相较于无锁栈快了2.86倍。当然这只是从入栈效率上来进行比较的,如果加上随机出栈与取值操作可能会有差异。


Reference