RCU

本文主要讲QSBR风格的RCU(即:Quiescent State Based Read Copy Update)。

先来看一段代码

int *g_data;

void reader(void)
{
        int cnt = 0;

        for (int i = 0; i < 10000; ++i)
                cnt += *g_data;
}

void writer(void)
{
        int *p = NULL;

        for (int i = 10; i < 20; ++i) {
                p = calloc(1, sizeof(*p));
                *p = i;

                free(g_data);
                g_data = p;
        }
}

假设g_data已经被初始化,在多线程环境中,多个线程执行reader函数,一个线程执行writer函数。那么,读线程就可能使用已经被释放的g_data而导致段错误。

这时,我们想到最简单的解决办法就是使用一把互斥锁,把g_data保护起来。这样确实能解决问题,但同时导致多个线程的锁竞争。根据前面的假设,我们可以把锁换成读写锁。这样确实比互斥锁要好一点,但并没有减轻读线程的开销。我们还可以想到做一个延时删除,类似垃圾回收,那我们可以等所有的读线程都不在使用旧的g_data的时候,就可以把g_data个删除掉了,这样,我们只需要在读线程使用完g_data之后就通知写线程本读线程已经读完了,然后写线程把正在使用g_data的读线程计数减一,当计数变成零时就可以删掉g_data了。这样,读线程几乎没有开销,而写线程可能需要一段时间的等待。

上面延时删除的想法就是本文的主题:RCU。

具体实现上,我们使用TLS的支持(其实cache line对齐的数组也行),基本思想是:

下面我们按照上述的流程用伪代码实现以下。

reg_online

我们使用一个全局的链表reg_list用来串联所有读线程的TLS节点,比如

struct reg_node {
        bool is_reg;
        uint64_t version;
        struct reg_node *next;
        struct reg_node *prev;
};

static pthread_mutex_t g_mtx;
static struct reg_node registry_list;

static __thread struct reg_node reader_reg;

void reg_online(void)
{
        assert(!reader_reg.is_reg);
        pthread_mutex_lock(&g_mtx);
        list_add(&reg_list, &reader_reg);
        reader_reg.is_reg = true;
        pthread_mutex_unlock(&g_mtx);
}

read_quiescent_state

从上面的流程上来看,read_lockread_unlock其实并不重要,重要的是读线程在操作完成后通知写线程,因此read_lockread_unlock的存在仅仅是在形式上和互斥锁保持一致。

下面,我们来实现报告临界区操作完成的函数

static uint64_t gp_verion;

void read_quiescent_state(void)
{
        atomic_set(&reader_reg.version,  atomic_load(&gp_verion));
}

这个函数实现很简单,仅仅是把全局的版本号赋值给本地的reader,这样,在写线程,就可以通过遍历reg_list来发现读线程是否已经完成临界区操作。这里gp_version中的gp指的是Grace Period即一个时间段,这个时间段把新来的访问和旧得访问隔开,在这之前进入临界区的读线程都读到的数据,之后的读线程都读到数据。

reg_offline

简单的把本地变量从reg_list删除即可

void reg_offline(void)
{
        assert(reader_reg.is_reg);
        pthread_mutex_lock(&g_mtx);
        list_remove(&reader_reg);
        bzero(&reader_reg, sizeof(reader_reg));
        pthread_mutex_unlock(&g_mtx);
}

atonic_exchange

写线程完成新数据初始化后,原子的替换掉旧的数据,这既是gp的开始

void atomic_exchange(void *dst, void *src)
{
        asm volatile(
                "mov rax, [rsi];"
                "xchg rax, [rdi];"
                "mov [rsi], rax;"
        );
}

需要注意的是,这里的参数必须是和void *等长的,比如

int x = 1;
int y = 2;
atomic_exhange(&x, &y);

是错误的,因为sizeof(int) != sizeof(void *)

synchronize_reader

这里需要两个操作,一个是增加gp_version,另一个是遍历reg_list等所有的reader_reg的版本等于前面设置的gp_version

static pthread_mutex_t g_sync;

void synchronize_reader(void)
{
        struct reg_node done;

        pthread_mutex_lock(&g_sync);
        atomic_set(&gp_version, gp_version + 1);

        for (;;) {
                for_each(node, &reg_list) {
                        if (atomic_load(&node->version) == gp_version)
                                move_node(node, &done); // remove from reg_list and put into done
                }
                if (list_empty(&reg_list))
                        break;
        }
        // put back to reg_list
        list_splice(&done, &reg_list);
        
        pthread_mutex_unlock(&g_sync);
}

增加gp_version的版本,既是gp的结束。然后写线程一直等注册了的读线程追上新的版本,函数执行结束后,就可以安全的释放旧的资源了。


整个流程基本上就是这样,这时我们可以把最初的例子修改下:

void reader(void)
{
        int cnt = 0;

        reg_online();

        for (int i = 0; i < 10000; ++i) {
                read_lock();
                cnt += *g_data;
                read_unlock();

                // report state
                read_quiescent_state();
        }

        reg_offline();
}

void writer(void)
{
        int *p = NULL;

        for (int i = 10; i < 20; ++i) {
                p = calloc(1, sizeof(*p));
                *p = i;

                // begin GP
                atomic_exchange(g_data, p);

                // end GP and wait readers
                synchronize_reader();

                free(p);
        }
}

这时,读线程仅有一个原子操作的开销,当然如果可以放开要求的话,可以隔一段时间调用一次read_quiescent_state。对于写线程,从实现来看,如果reader没有及时报告状态的话,这里将会有大量的原子操作,一个可行的优化是,在本地循环检测n次后,如果还有reader没有就绪,就使用诸如poll的调用,等待reader来唤醒,这样可以减少CPU的使用。另一个可行的优化是,使用独立的线程用于回收工作,写线程仅仅是向这个线程注册回调。

另外,如果要完整实现的话,需要特别注意:
1,避免编译器的重排
2,避免处理器的重排

如果需要对多个资源做RCU,那么可以考虑使用一个RCU的结构,作为参数传给RCU接口,这个结构中多线程共享的成员可以对齐cache line避免false sharing。

references