多线程同步是现代程序设计中一个重要问题,想要深入理解这一问题,就不得不深入到处理器与操作系统的具体实现中。但是我水平有限┑( ̄Д  ̄)┍,本文不过于详细地去探讨这一问题,而是以有锁栈与无锁栈作为切入点,主要对代码实现与实验性能上对两者进行一个比较。
有锁栈与无锁栈
有锁栈的实现方式其实比较简单,在线程进入临界区的时候加锁即可,在完成对栈的操作解除对锁的占有。但是加锁解锁的过程对资源的消耗比较大,当线程没有Acquire到锁的时候会发生状态的切换,这在需要高性能计算的场景下或许并不合适。
Compare And Swap(CAS)是一种常见的原子操作,是实现无锁同步的一个方式,其主要思想就是Compare和Swap两个过程。Compare的目的主要是探测内存中的值是否为期待值,如果是则Swap为设定的值。这主要依赖于处理器的实现,比如x86处理器的CMPXCHG
与ARM处理器的LDREX/STREX
。
实现与实验
C++实现
本文主要对比C++标准库实现的栈,会用到一些比较新的C++标准,实验环境为:
- 腾讯云Cloud Studio 1C2G(致谢!)
- gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.2)
- g++ experiment.cpp -o experiment -pthread
为了减少代码冗余,两个栈实现的内部数据结构均为以下代码段。
1
2
3
4
5
6
| template<typename T>
struct Node {
T data;
Node* next;
Node(const T& data) : data(data), next(nullptr) {}
};
|
以下为有锁栈的实现代码的核心部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| void push(const T& data) {
std::unique_lock<std::mutex> lock(mtx);
Node<T>* new_node = new Node<T>(data);
new_node->next = head;
head = new_node;
}
T top() const {
std::unique_lock<std::mutex> lock(mtx);
Node<T>* top_node = head;
if (top_node) {
return top_node->data;
}
throw std::runtime_error("Stack is empty.");
}
void pop() {
std::unique_lock<std::mutex> lock(mtx);
Node<T>* top_node = head;
if (top_node) {
Node<T>* next_node = top_node->next;
head = next_node;
delete top_node;
return;
}
throw std::runtime_error("Stack is empty.");
}
bool is_empty() const noexcept {
std::unique_lock<std::mutex> lock(mtx);
return head == nullptr;
}
|
有锁栈的实现主要依赖互斥锁(mutex & unique_lock)实现,线程在进行每项操作时需要先Acquire锁的所有权,在执行完操作后需要Release锁的所有权。在这里我们不对修改操作的合法性进行检查,而是选择直接对不合法的越界操作抛出异常。有锁栈的实现简单,符合一般的多线程编程逻辑比较直观。在这里使用标准库自带的RAII锁,编程实现非常简单。
以下为无锁栈实现的核心部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| std::atomic<Node<T>*> head;
void push(const T& data) {
Node<T>* new_node = new Node<T>(data);
new_node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(new_node->next, new_node, std::memory_order_release, std::memory_order_relaxed));
}
T top() const {
Node<T>* top_node = head.load(std::memory_order_acquire);
if (top_node) {
return top_node->data;
}
throw std::runtime_error("Stack is empty.");
}
void pop() {
Node<T>* top_node = head.load(std::memory_order_relaxed);
while (top_node) {
Node<T>* next_node = top_node->next;
if (head.compare_exchange_weak(top_node, next_node, std::memory_order_release, std::memory_order_relaxed)) {
delete top_node;
return;
}
}
throw std::runtime_error("Stack is empty.");
}
bool is_empty() const noexcept {
return head.load(std::memory_order_acquire) == nullptr;
}
|
对比有锁栈,无锁栈在实现上困难了不少,这主要牵涉到对CAS的理解上。这一部分的理论部分在前文与参考资料中有比较详细的描述,这里再简单对C++代码实现方面做一个解释:
- head作为栈顶元素的指针,在这里使用的是原子变量类型,这主要是为了能够在一个CPU Instruction中完成对变量的操作,从而避免中间状态的产生,也就保证了操作的原子性。原子操作存在内存序的概念,在这里用到了三种内存序,分别解释如下
- memory_order_relaxed: 松散内存顺序。这个模式保证了读操作不会被重排,但对于写操作没有任何保证。
- memory_order_release:释放内存顺序。保障了读写顺序,通常不适用于load()。
- memory_order_acquire:获取内存顺序。保障了读写顺序。
atomic.load()
的作用是安全地读取一个原子变量,其中参数为内存序(Memory Order)。在Push和Pop操作中选择的load的内存序为memory_order_relaxed,这是因为这里考虑CAS方式乱序地插入与删除元素,即元素本身的顺序并不重要;而在IsEmpty操作中的内存序为memory_order_acquire,这主要是考虑在当前时间点执行IsEmpty的结果的正确性。atomic.compare_exchange_weak()
用于执行原子比较和交换操作,也就是CAS机制的主要实现。其中一个参数为Expected,是Compare过程比较的值;第二个变量为Desire,是Swap过程想要交换为的值。其工作过程为,比较atomic变量与Expected是否相等,如果相等则交换为Desire,否则退化为一个load的过程,将最新的atomic的值加载进Desire。后两个参数则为比较success或failure后的内存序。值得一提的是atomic.compare_exchange_strong()
也值得了解,其主要适用于不需要循环重试的场景,关注可靠性的场景。
这一部分的内容有一些难理解,如果读到这里你有所困惑,建议去查找更多关于CAS机制及C++内存序的相关资料,本文不再详细展开(没错我也讲不明白😔)。
为了进行公平对比,在这里定义实验任务为:在每个线程中执行1,000,000
次入栈操作,如下。
1
2
3
4
5
6
| auto ok = pool.enqueue([&] {
for (int i = 0; i < 1000000; ++i)
stack.push(i);
return true;
});
|
在这里我使用线程池创建了两个线程,从任务创建开始计时,以两个线程的入栈任务结束截止计时,核心代码如下所示。
1
2
3
4
5
6
7
8
9
| auto start = std::chrono::high_resolution_clock::now();
// Assign tasks to thread pool.
if (ok_1.get() && ok_2.get()) {
auto stop = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << duration.count() << std::endl;
}
|
本文所提到的线程池、有锁栈和无锁栈的完整代码实现均可在本人的开源项目MyDSA中找到(如果对你有用,也麻烦点一个小星星呀!)。
实验结果
实验结果如下表(单位为microsecond)。
方法 | 有锁栈 | 无锁栈 |
---|
#1 | 1,003,863 | 399,217 |
#2 | 884,579 | 186,314 |
#3 | 1,013,956 | 427,235 |
Avg | 967,466 | 337,588 |
从实验结果的平均值来看,有锁栈相较于无锁栈快了2.86倍。当然这只是从入栈效率上来进行比较的,如果加上随机出栈与取值操作可能会有差异。
Reference