RCU
本文主要讲QSBR风格的RCU(即:Quiescent State Based Read Copy Update)。
先来看一段代码
int *g_data;
void reader(void)
{
int cnt = 0;
for (int i = 0; i < 10000; ++i)
cnt += *g_data;
}
void writer(void)
{
int *p = NULL;
for (int i = 10; i < 20; ++i) {
p = calloc(1, sizeof(*p));
*p = i;
free(g_data);
g_data = p;
}
}
假设g_data
已经被初始化,在多线程环境中,多个线程执行reader
函数,一个线程执行writer
函数。那么,读线程就可能使用已经被释放的g_data
而导致段错误。
这时,我们想到最简单的解决办法就是使用一把互斥锁,把g_data
保护起来。这样确实能解决问题,但同时导致多个线程的锁竞争。根据前面的假设,我们可以把锁换成读写锁。这样确实比互斥锁要好一点,但并没有减轻读线程的开销。我们还可以想到做一个延时删除,类似垃圾回收,那我们可以等所有的读线程都不在使用旧的g_data
的时候,就可以把g_data
个删除掉了,这样,我们只需要在读线程使用完g_data
之后就通知写线程本读线程已经读完了,然后写线程把正在使用g_data
的读线程计数减一,当计数变成零时就可以删掉g_data
了。这样,读线程几乎没有开销,而写线程可能需要一段时间的等待。
上面延时删除的想法就是本文的主题:RCU。
具体实现上,我们使用TLS的支持(其实cache line对齐的数组也行),基本思想是:
- 读线程启动时进行注册 (reg_online)
- 读线程进入临界区时进行lock (read_lock)
- 读线程完成临界区操作时进行unlock (read_unlock)
- 读线程报告临界区操作完成 (read_quiescent_state)
- 读线程退出时取消注册 (reg_offline)
- 写线程进行原子更新 (atomic_exchange)
- 写线程等待所有读线程报告临界区操作完成(synchronize_reader)
- 写线程释放旧的资源
下面我们按照上述的流程用伪代码实现以下。
reg_online
我们使用一个全局的链表reg_list
用来串联所有读线程的TLS节点,比如
struct reg_node {
bool is_reg;
uint64_t version;
struct reg_node *next;
struct reg_node *prev;
};
static pthread_mutex_t g_mtx;
static struct reg_node registry_list;
static __thread struct reg_node reader_reg;
void reg_online(void)
{
assert(!reader_reg.is_reg);
pthread_mutex_lock(&g_mtx);
list_add(®_list, &reader_reg);
reader_reg.is_reg = true;
pthread_mutex_unlock(&g_mtx);
}
read_quiescent_state
从上面的流程上来看,read_lock
和read_unlock
其实并不重要,重要的是读线程在操作完成后通知写线程,因此read_lock
和read_unlock
的存在仅仅是在形式上和互斥锁保持一致。
下面,我们来实现报告临界区操作完成的函数
static uint64_t gp_verion;
void read_quiescent_state(void)
{
atomic_set(&reader_reg.version, atomic_load(&gp_verion));
}
这个函数实现很简单,仅仅是把全局的版本号赋值给本地的reader
,这样,在写线程,就可以通过遍历reg_list
来发现读线程是否已经完成临界区操作。这里gp_version
中的gp
指的是Grace Period
即一个时间段,这个时间段把新来的访问和旧得访问隔开,在这之前进入临界区的读线程都读到旧的数据,之后的读线程都读到新数据。
reg_offline
简单的把本地变量从reg_list
删除即可
void reg_offline(void)
{
assert(reader_reg.is_reg);
pthread_mutex_lock(&g_mtx);
list_remove(&reader_reg);
bzero(&reader_reg, sizeof(reader_reg));
pthread_mutex_unlock(&g_mtx);
}
atonic_exchange
写线程完成新数据初始化后,原子的替换掉旧的数据,这既是gp的开始
void atomic_exchange(void *dst, void *src)
{
asm volatile(
"mov rax, [rsi];"
"xchg rax, [rdi];"
"mov [rsi], rax;"
);
}
需要注意的是,这里的参数必须是和void *
等长的,比如
int x = 1;
int y = 2;
atomic_exhange(&x, &y);
是错误的,因为sizeof(int) != sizeof(void *)
。
synchronize_reader
这里需要两个操作,一个是增加gp_version
,另一个是遍历reg_list
等所有的reader_reg
的版本等于前面设置的gp_version
。
static pthread_mutex_t g_sync;
void synchronize_reader(void)
{
struct reg_node done;
pthread_mutex_lock(&g_sync);
atomic_set(&gp_version, gp_version + 1);
for (;;) {
for_each(node, ®_list) {
if (atomic_load(&node->version) == gp_version)
move_node(node, &done); // remove from reg_list and put into done
}
if (list_empty(®_list))
break;
}
// put back to reg_list
list_splice(&done, ®_list);
pthread_mutex_unlock(&g_sync);
}
增加gp_version
的版本,既是gp
的结束。然后写线程一直等注册了的读线程追上新的版本,函数执行结束后,就可以安全的释放旧的资源了。
整个流程基本上就是这样,这时我们可以把最初的例子修改下:
void reader(void)
{
int cnt = 0;
reg_online();
for (int i = 0; i < 10000; ++i) {
read_lock();
cnt += *g_data;
read_unlock();
// report state
read_quiescent_state();
}
reg_offline();
}
void writer(void)
{
int *p = NULL;
for (int i = 10; i < 20; ++i) {
p = calloc(1, sizeof(*p));
*p = i;
// begin GP
atomic_exchange(g_data, p);
// end GP and wait readers
synchronize_reader();
free(p);
}
}
这时,读线程仅有一个原子操作的开销,当然如果可以放开要求的话,可以隔一段时间调用一次read_quiescent_state
。对于写线程,从实现来看,如果reader没有及时报告状态的话,这里将会有大量的原子操作,一个可行的优化是,在本地循环检测n次后,如果还有reader没有就绪,就使用诸如poll
的调用,等待reader来唤醒,这样可以减少CPU的使用。另一个可行的优化是,使用独立的线程用于回收工作,写线程仅仅是向这个线程注册回调。
另外,如果要完整实现的话,需要特别注意:
1,避免编译器的重排
2,避免处理器的重排
如果需要对多个资源做RCU,那么可以考虑使用一个RCU的结构,作为参数传给RCU接口,这个结构中多线程共享的成员可以对齐cache line避免false sharing。