程序在运行时,内存实际的访问顺序和程序代码编写的访问顺序不一定一致,基于这个前提,Memory barrier 就有存在的必要了。

Memory barrier 能够保证其之前的内存访问操作先于其后的完成。Memory barrier的主要应用场景如下:

  1. 实现同步原语(synchronization primitives)
  2. 实现无锁数据结构(lock-free data structures)
  3. 驱动程序

本文内容转载自:Name5566:理解 Memory barrier

1. Memory barrier 简介

程序在运行时,内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问出现的理由是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:

  1. 编译时,编译器优化导致内存乱序访问(指令重排)
  2. 运行时,多个CPU之间的交互引起内存乱序访问

Memory barrier 能够让 CPU 或编译器在内存访问上有序。一个 Memory barrier 之前的内存访问操作必定先于其之后的完成。Memory barrier 包括两类:

  1. 编译器 barrier
  2. CPU Memory barrier

通常情况下,编译器和 CPU 引起的内存乱序访问不会带来什么问题,但在一些特殊情况下,程序逻辑的正确性依赖于内存访问的顺序,此时内存乱序访问会带来逻辑上的错误,例如:

1
2
3
4
5
6
7
// thread 1
while (!ok);
do(x);

// thread 2
x = 42;
ok = 1;

此段代码中,ok 初始化为 0,线程 1 等待 ok 被设置为 1 后执行 do 函数。假如说,线程 2 对内存的写操作乱序执行,也就是 x 赋值后于 ok 赋值完成,那么 do 函数接受的实参就很可能不是 42。

1.1 编译时内存乱序访问

编译器对代码做出优化时,可能改变指令的执行顺序(例如 gcc 下 O2 或 O3 都可能会改变指令的执行顺序):

1
2
3
4
5
6
7
// test.cpp
int x, y, r;
void f()
{
x = r;
y = 1;
}

编译器优化的结果可能导致 y = 1 在 x = r 之前执行完成。首先直接编译此源文件:
g++ -S test.cpp

得到相关的汇编代码如下:

1
2
3
movl    r(%rip), %eax
movl %eax, x(%rip)
movl $1, y(%rip)

这里可以看到,x = r 和 y = 1 并没有乱序。现使用优化选项 O2(或 O3)编译上面的代码(g++ -O2 -S test.cpp),生成汇编代码如下:

1
2
3
movl    r(%rip), %eax
movl $1, y(%rip)
movl %eax, x(%rip)

我们可以清楚的看到,经过编译器优化之后 ,movl $1, y(%rip) 先于 movl %eax, x(%rip) 执行。避免编译时内存乱序访问的办法就是使用编译器 barrier(又叫优化 barrier)。Linux 内核提供函数 barrier() 用于让编译器保证其之前的内存访问先于其之后的完成。内核实现 barrier()如下(X86-64 架构):

1
#define barrier() __asm__ __volatile__("" ::: "memory")

现在把此编译器 barrier 加入代码中:

1
2
3
4
5
6
7
int x, y, r;
void f()
{
x = r;
__asm__ __volatile__("" ::: "memory");
y = 1;
}

这样就避免了编译器优化带来的内存乱序访问的问题了。本例中,我们还可以使用 volatile 这个关键字来避免编译时内存乱序访问(而无法避免后面要说的运行时内存乱序访问)。volatile 关键字能够避免相关的变量在内存访问上乱序访问,这里可以修改 x 和 y 的定义来解决问题:

1
2
3
4
5
6
7
volatile int x, y;
int r;
void f()
{
x = r;
y = 1;
}

现加上了 volatile 关键字,这使得 x 相对于 y、y 相对于 x 在内存访问上有序。在 Linux 内核中,提供了一个宏 ACCESS_ONCE 来避免编译器对于连续的 ACCESS_ONCE实例进行指令重排。其实 ACCESS_ONCE 实现源码如下:

1
#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

此代码只是将变量 x 转换为 volatile 的而已。现在我们就有了第三个修改方案:

1
2
3
4
5
6
int x, y, r;
void f()
{
ACCESS_ONCE(x) = r;
ACCESS_ONCE(y) = 1;
}

到此基本上就阐述完了编译时内存乱序访问的问题。下面开始介绍运行时内存乱序访问。

1.2 运行时内存乱序访问

在运行时,CPU 虽然会乱序执行指令,但是在单个 CPU上,硬件能够保证程序执行时,所有的内存访问操作看起来像是按程序代码编写的顺序执行的,这时候 Memory barrier 没有必要使用(不考虑编译器优化的情况下)。这里我们了解一下 CPU 乱序执行的行为。在乱序执行时,一个处理器真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。
早期的处理器为有序处理器(In-order processors),有序处理器处理指令通常有以下几步:

  1. 指令获取
  2. 如果指令的输入操作对象(input operands)可用(例如已经在寄存器中了),则将此指令分发到适当的功能单元中。如果一个或者多个操作对象不可用(通常是由于需要从内存中获取),则处理器会等待,直到它们可用
  3. 指令被适当的功能单元执行
  4. 功能单元将结果写回寄存器堆(Register file,一个 CPU 中的一组寄存器)

相比之下,乱序处理器(Out-of-order processors)处理指令通常有以下几步:

  1. 指令获取
  2. 指令被分发到指令队列
  3. 指令在指令队列中等待,直到输入操作对象可用(一旦输入操作对象可用,指令就可以离开队列,即便更早的指令未被执行)
  4. 指令被分配到适当的功能单元并执行
  5. 执行结果被放入队列(而不立即写入寄存器堆)
  6. 只有所有更早请求执行的指令的执行结果被写入寄存器堆后,指令执行的结果才被写入寄存器堆(执行结果重排序,让执行看起来是有序的)

从上面的执行过程可以看出,相比有序执行,乱序执行能够避免等待不可用的操作对象(有序执行的第二步),从而提高了效率。处理器运行的速度比内存快很多,在有序处理器花在等待可用数据的时间里,已经可以处理大量指令了。
现在思考一下乱序处理器处理指令的过程,我们能得到几个结论:

  1. 对于单个 CPU, 指令获取是有序的(通过队列实现)
  2. 对于单个 CPU,指令执行结果也是有序返回寄存器堆的(通过队列实现)

由此可知,在单 CPU 上,当不考虑编译器优化导致乱序时,多线程执行不存在内存乱序访问的问题。我们从内核源码中也可以得到类似的结论:

1
2
3
4
5
#ifdef CONFIG_SMP
#define smp_mb() mb()
#else
#define smp_mb() barrier()
#endif

这里可以看到,如果是 SMP, 则使用 mb,mb 被定义为 CPU Memory barrier(后面会讲到),而非 SMP 时,直接使用编译器 barrier。

多处理器架构一般采用NUMA,这种架构下的内存操作会有巨大的延时问题。为了缓解这些问题,处理器会采取一些优化措施, 而导致程序顺序被破坏。

  • 情景一: 设想某处理器发出一条某内存位置读的指令,恰好这个内存位置在远端内存,而且处理器本地缓存也没有命中。于是,为了等待这个值,处理器需要空转(stall)。这显然是效率的极大浪费,事实上,现代的处理器都有乱序执行引擎, 指令并不是直接被执行,而是放到等待队列里,等待操作数到位后才执行,而这期间处理器优先处理其他指令。也就是出于效率考虑,处理器会重排指令。

  • 情景二: 设想有一个热点全局变量,那么在程序运行一段时间后,很可能很多个处理器的本地缓存都有该变量的一份拷贝。再设想现在有处理器A修改这个全局变量,这个修改会发布一条消息能过网络通知所有其他处理器更新该变量缓存。由于路径的问题,消息不会同时到达所有处理器,那么存在一种可能性,某处理器此时仍观察到旧的值,而采取一些基于该旧值的动作。

有兴趣可以研究Memory Barriers: a Hardware View for Software Hackers一文,其详细的分析了以上过程。

2. Memory barrier的应用场景

Memory barrier 常用场合包括:

  1. 实现同步原语(synchronization primitives)
  2. 实现无锁数据结构(lock-free data structures)
  3. 驱动程序

在应用程序开发中,开发者可能完全不知道 Memory barrier 就可以开发正确的多线程程序,这主要是因为各种同步机制中已经隐含了 Memory barrier(但和实际的 Memory barrier 有细微差别)。但是如果你希望编写诸如无锁数据结构,那么 Memory barrier 还是很有用的。

在 Linux 内核中,除了前面说到的编译器 barrier — barrier()ACCESS_ONCE(),还有 CPU Memory barrier:

  1. 通用 barrier,保证读写操作有序的,mb()smp_mb()
  2. 写操作 barrier,仅保证写操作有序的,wmb()smp_wmb()
  3. 读操作 barrier,仅保证读操作有序的,rmb()smp_rmb()

注意,以上的CPU Memory barrier都隐含了编译器 barrier。这里以smp 开头的 Memory barrier 会在单处理器上直接使用编译器 barrier,而在 SMP 上才使用 CPU Memory barrier。

最后需要注意的是,CPU Memory barrier 中某些类型的 Memory barrier 需要成对使用,否则会出错。例如:一个写操作 barrier 需要和读操作barrier 一起使用(当然,通用 barrier 也是可以的),反之依然。

2.1 Memory barrier 的示例

下面我们通过读内核代码来进一步学习 Memory barrier 的使用。
Linux 内核实现的无锁(只有一个读线程和一个写线程时)环形缓冲区 kfifo 就使用到了 Memory barrier,实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
* A simple kernel FIFO implementation.
*
* Copyright (C) 2004 Stelian Pop <stelian@popies.net>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
*/
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/err.h>
#include <linux/kfifo.h>
#include <linux/log2.h>
/**
* kfifo_init - allocates a new FIFO using a preallocated buffer
* @buffer: the preallocated buffer to be used.
* @size: the size of the internal buffer, this have to be a power of 2.
* @gfp_mask: get_free_pages mask, passed to kmalloc()
* @lock: the lock to be used to protect the fifo buffer
*
* Do NOT pass the kfifo to kfifo_free() after use! Simply free the
* &struct kfifo with kfree().
*/
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
gfp_t gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;
/* size must be a power of 2 */
BUG_ON(!is_power_of_2(size));
fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
EXPORT_SYMBOL(kfifo_init);
/**
* kfifo_alloc - allocates a new FIFO and its internal buffer
* @size: the size of the internal buffer to be allocated.
* @gfp_mask: get_free_pages mask, passed to kmalloc()
* @lock: the lock to be used to protect the fifo buffer
*
* The size will be rounded-up to a power of 2.
*/
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' technique works only in this case.
*/
if (!is_power_of_2(size)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
EXPORT_SYMBOL(kfifo_alloc);
/**
* kfifo_free - frees the FIFO
* @fifo: the fifo to be freed.
*/
void kfifo_free(struct kfifo *fifo)
{
kfree(fifo->buffer);
kfree(fifo);
}
EXPORT_SYMBOL(kfifo_free);
/**
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most @len bytes from the @buffer into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
const unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*/
smp_mb();
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*/
smp_wmb();
fifo->in += len;
return len;
}
EXPORT_SYMBOL(__kfifo_put);
/**
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most @len bytes from the FIFO into the
* @buffer and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*/
smp_rmb();
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*/
smp_mb();
fifo->out += len;
return len;
}
EXPORT_SYMBOL(__kfifo_get);

在上述代码中,我们只需关注Memory barrier 即可。代码中,索引 in 和 out 被两个线程访问。in 和 out 指明了缓冲区中实际数据的边界,由于未使用同步机制,那么如果想保证顺序关系的话,就需要使用到 Memory barrier 了。索引in被一个线程修改,被两个线程读取。 索引out被另一个线程修改,被两个线程读取。__kfifo_put 先通过 in 和 out 来确定可以向缓冲区中写入数据量的多少,这时,out 索引应该先被读取后,才能将用户 buffer 中的数据写入缓冲区,因此这里使用到了 smp_mb(),对应的,__kfifo_get 也使用 smp_mb() 来确保在修改 out 索引之前,缓冲区中数据已经被读取到用户的buffer 中了。对于 in 索引,在 __kfifo_put 中,要保证先向缓冲区写入数据后才修改 in 索引,由于这里只需要保证写入操作有序,故选用写操作 barrier,在 __kfifo_get 中,要保证先读取了 in 索引才开始读取缓冲区中数据,由于这里只需要保证读取操作有序,故选用读操作 barrier。


参考文献:

  1. LINUX KERNEL MEMORY BARRIERS
  2. Memory Barriers: a Hardware View for Software Hackers
  3. the-memory-barriers-in-linux-kernel