《源码解读系列》bRPC bthread

ToC

捕获续体 (Continuation)

setjmp/longjmp

这两个函数来自于 C 标准库 <setjmp.h>,用于实现非本地跳转(non-local goto)的函数,可用于跨越函数调用栈直接跳转,常用于错误恢复、状态机等场景。类似函数之间 goto,调试难度可想而知,工程项目中一般不使用。

基本用法如下:

// jmp.c
#include <setjmp.h>
#include <stdio.h>

jmp_buf env;

void foo(void) {
    printf("Inside foo, before longjmp\n");
    longjmp(env, 1); // 跳回 setjmp 调用处,返回值为 1
                     // 注意:longjmp 之后的代码不会执行!
    printf("This line is NOT printed.\n");
}

int main(void) {
    int r = setjmp(env); // 第一次调用:返回 0
    if (r == 0) {
        printf("First call to setjmp, r=%d\n", r);
        foo();
    } else {
        printf("Returned via longjmp, r=%d\n", r); // 此处 r==1
    }
    return 0;
}

输出:

First call to setjmp, r=0
Inside foo, before longjmp
Returned via longjmp, r=1

实现方法

setjmp / longjmp 的实现本质是手动保存和恢复 CPU 的执行上下文(主要是寄存器状态和栈指针),从而实现“跳过正常函数返回路径”的控制流转移。它不依赖操作系统或语言运行时,而是利用硬件和 ABI 细节,由编译器和 C 库协同完成。

以典型 x86-64 / System V ABI 为例:

jmp_buf env 是一个结构体(或数组),用于保存跳转所需的关键寄存器值,典型内容包括:

typedef struct {
    unsigned long rbx; // callee-saved reg
    unsigned long rbp; // callee-saved reg
    unsigned long r12; // callee-saved reg
    unsigned long r13; // callee-saved reg
    unsigned long r14; // callee-saved reg
    unsigned long r15; // callee-saved reg
    unsigned long rsp; // 当前栈顶指针, callee-saved reg
    unsigned long rip; // PC,setjmp 返回后下一条指令的地址,用于 longjmp 跳转回来
} jmp_buf[1];

伪代码

setjmp:
    mov [env+0], rbx   ; 保存 callee-saved 寄存器
    mov [env+8], rbp
    mov [env+16], r12
    ...
    mov [env+48], rsp  ; 保存栈指针
    lea rax, [rip+?]   ; 取“setjmp 返回后”的指令地址
    mov [env+56], rax  ; 存为 rip
    xor rax, rax       ; 返回 0 (xor rax, rax 比 mov rax, 0 更快)
    ret

伪代码

longjmp:
    mov rbx, [env+0]
    mov rbp, [env+8]
    mov r12, [env+16]
    ...
    mov rsp, [env+48]  ; 恢复栈指针!栈回退到 setjmp 时的状态
    mov rax, rsi       ; rsi = val
    test rax, rax
    jnz 1f             ; val != 0,跳转到 1:
    inc rax            ; val == 0 → 改为 1
1:
    mov rdx, [env+56]  ; 取保存的 rip
    jmp rdx            ; 直接跳过去!

使用 setjmp(env) 捕获的续体通常是一次性的,使用这个原语实现对称协程比较繁琐复杂。 What is the difference between asymmetric and symmetric coroutines?

协程类型

call/cc

搞点函数式编程:First-Class Continuations / Continuation-Passing Style

call/cccall-with-current-continuation 的缩写,是 Scheme 等支持一级续体(first-class continuations)的语言中一个强大而独特的控制流原语。它允许你捕获“当前的计算上下文”(即“续体” continuation),并将其当作一个普通的一级值(函数)进行传递和调用——从而实现非局部跳转、异常处理、生成器、协程等高级控制结构。

(call/cc proc)

1. 查找 list 中符合 (pred x) 的元素 x,找到立即返回:

(define (find-first pred lst)
  (call/cc
    (lambda (return)
      (for-each (lambda (x)
                  (when (pred x)
                    (return x))) ; 找到即返回,退出循环
                lst)
      #f))) ; 未找到

2. 双协程交换打印:

(define one
  (lambda (go)
    (display 1)
    (set! go (call/cc go))
    (display 2)
    (set! go (call/cc go))
    (display 3)
    (set! go (call/cc go))))

(define two
  (lambda (go)
    (display #\a)
    (set! go (call/cc go))
    (display #\b)
    (set! go (call/cc go))
    (display #\c)
    (set! go (call/cc go))))

; (one two) -> 1a2b3c
; (two one) -> a1b2c3

到此为止,读者的工作不是写括号语言,再学下去脑细胞不够用了。

bthread_make_fcontext / bthread_jump_fcontext

这是用于 bthread 的跳转的两个关键原语。

typedef void* bthread_fcontext_t;

// 从当前上下文跳转(jump)到目标上下文 nfc,同时可选地保存当前上下文到 *ofc。
// 这是无返回的跳转(jump),因上下文保存,后续可通过另一次 jump 返回
// Args:
// - ofc: 输出参数,是个指向 bthread 栈指针的指针,当前上下文(%rsp)将被保存到 *ofc。
//        后续可通过 bthread_jump_fcontext 跳回此处。
// - nfc: 目标上下文,是指向 bthread 栈的指针,将要跳转到的执行点(即目标协程的寄存器状
//        态+栈指针等)。通常由 bthread_make_fcontext 创建(首次进入)或前一次 jump 保
//        存而来(恢复)。
// - vp: 传递给目标上下文的“参数”。当目标上下文是首次通过 make_fcontext 创建的函数时,
//       vp 会作为其唯一参数传入;若目标是已挂起的上下文,则 vp 会作为 jump_fcontext 的
//       返回值出现在目标上下文的视角中。
// - preserve_fpu: 是否保存/恢复浮点寄存器(FPU/SSE/AVX 状态)。默认 false(出于性能
//                 考虑,多数协程不涉及浮点密集计算)。设为 true 可保证浮点状态精确恢
//                 复,但开销更大。
// Return:
// - 当该上下文被“跳回”时(即别人 jump 到它),本函数“返回”,返回值即为对方调用
//   jump_fcontext 时传入的 vp 参数。
intptr_t bthread_jump_fcontext(bthread_fcontext_t* ofc,
                               bthread_fcontext_t nfc,
                               intptr_t vp,
                               bool preserve_fpu = false);

// 在指定栈空间上“构造”一个可跳转的上下文,使其下次被 jump_fcontext 激活时,从函数 fn
// 开始执行。
// 这是协程的“入口点构建器”,仅用于首次创建一个可运行的上下文。
// Args:
// - sp: 栈内存的高地址(top)。注意:x86/x86_64 栈向下增长,所以 sp 应指向分配的栈空间
//       的最高地址(即 stack_base + stack_size)。
// - size: 栈空间的字节大小。用于内部栈数据对齐(如 16-byte 对齐),但不负责分配内存
//         —— 用户需自行 malloc/mmap 一块足够大的内存,并保证 sp 正确指向栈顶。
// - fn: 协程入口函数。当该上下文首次被 jump_fcontext 激活时,将从此函数开始执行,且 fn
//       的唯一参数即为 jump_fcontext 中传入的 vp。函数不应返回(返回会导致未定义行为),
//       通常以 jump 到其他上下文结束。
bthread_fcontext_t bthread_make_fcontext(void* sp, size_t size, void (*fn)(intptr_t));

使用方式如下:

// 协程 A 中:
intptr_t r = bthread_jump_fcontext(&ctx_a, ctx_b, 123);
// 后续某刻被 ctx_b 跳回时,r = 456(见下面 B 的调用)

// 协程 B 中:
bthread_jump_fcontext(&ctx_b, ctx_a, 456); // 跳回 A,传 456

// fn 不能是普通 C 函数直接返回 —— 因为返回后无合法返回地址(上下文是伪造的)。通常做法是:
void coroutine_main(intptr_t arg) {
    // do work...
    // 最后 jump 到调度器
    bthread_jump_fcontext(&current_ctx, scheduler_ctx, 0);
    // 若无处可跳,可 exit() 或 abort()
}

典型 x86-64 / System V ABI 实现如下(AT&T):

context.cpp

#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm(
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
"  pushq %rbp \n" // 按照 System V ABI,保存 callee-saved 寄存器 (%rbp, %rbx, %r12–%r15"  pushq %rbx \n"
"  pushq %r15 \n"
"  pushq %r14 \n"
"  pushq %r13 \n"
"  pushq %r12 \n"
"  leaq -0x8(%rsp),%rsp\n" // 为后续 stmxcsr4 字节 MXCSR)+ fnstcw2 字节 x87 控制字)预留空间(按 8 字节对齐)
"  cmp $0,%rcx\n"           // %rcx 为是否保存 FPU 状态的标志,来自 preserve_fpu
"  je  1f\n"                // 不保存,跳转到 1:
"  stmxcsr (%rsp)\n"        // 保存 SSE 浮点控制状态
"  fnstcw  0x4(%rsp)\n"     // 保存 x87 浮点控制字
"1:\n"
"  movq %rsp,(%rdi)\n"      // 保存当前栈指针到 ofc,ofc 是一个 void** 双重指针
"  movq %rsi,%rsp\n"        // 切换到目标上下文栈 nfc, 从此刻起,栈切换完成,后续所有 pop 都从目标栈中取值
"  cmp $0,%rcx\n"           // 是否恢复 FPU 状态的标志,来自 preserve_fpu
"  je  2f\n"                // 不恢复,跳转到 2:
"  ldmxcsr (%rsp)\n"        // 恢复 SSE 浮点控制状态
"  fldcw  0x4(%rsp)\n"      // 恢复 x87 浮点控制字
"2:\n"
"  leaq  0x8(%rsp),%rsp\n"  // 跳过 FPU 空间(8 字节),指向上次调用 jump 时 push r12
"  popq %r12 \n"            // 恢复 6 个 callee-saved 寄存器
"  popq %r13 \n"
"  popq %r14 \n"
"  popq %r15 \n"
"  popq %rbx \n"
"  popq %rbp \n"
"  popq %r8\n"              // 弹出返回地址到 r8, 此时 %rsp 指向目标栈上保存的“返回地址”(即该协程上次调用 jump 时 push 的 %rip,但此处用 pop %r8 而非 ret!)
"  movq %rdx,%rax\n"        // vp → %rax(作为返回值)
"  movq %rdx,%rdi\n"        // 若目标地址是一段 C 函数入口(如首次进入协程),传给目标函数的第 1 个参数(按 ABI)
"  jmp *%r8\n"              // 跳转到此前保存的 %rip(即目标协程挂起的位置)
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n" // 记录函数大小,供调试/链接使用
".section .note.GNU-stack,\"\",%progbits\n"             // 标记该目标文件不需要可执行栈
".previous\n"
);
#endif

#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm(
".text\n"
".globl bthread_make_fcontext\n"
".type bthread_make_fcontext,@function\n"
".align 16\n"
"bthread_make_fcontext:\n"
"  movq %rdi,%rax\n"        // %rdi = sp,存入 %rax(栈底高地址,后续返回的 %rax 表示 bthread_fcontext_t)
"  andq $-16,%rax\n"        // 向下对齐到 16 字节, ABI 要求函数调用前 %rsp 16 字节对齐。为保证首次进入 fn 时栈对齐合法,初始 %rsp 必须是 16 字节对齐。
"  leaq -0x48(%rax),%rax\n" // 预留上下文保存空间 72 字节
"  movq %rdx, 0x38(%rax)\n" // %rdx = fn, 写入 %rax + 56
"  stmxcsr (%rax)\n"        // MXCSR → %rax + 0
"  fnstcw  0x4(%rax)\n"     // FPU CW → %rax + 4
"  leaq  finish(%rip),%rcx\n" // x86-64 位置无关代码(PIC)标准写法,计算 finish 相对于当前 %rip 的偏移,得到绝对地址,写入 %rcx
"  movq %rcx, 0x40(%rax)\n" // %rcx 写入 %rax + 64
"  ret\n"                   // 返回 %rax,即 bthread_fcontext_t
"finish:\n"                 // finsish 地址放到栈顶,保护用的,PC 跳到这个地方就会停止进程
"  xorq %rdi,%rdi\n"        // 清零 %rdi -> _exit(0)
"  call _exit@PLT\n"
"  hlt\n"                   // 如果 _exit 出问题,没退出,陷入停机
".size bthread_make_fcontext,.-bthread_make_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n"
);
#endif

Bthread 栈结构

分析上面的汇编,得知 bthread 在被挂起时会将所有 callee-saved reg 放到栈上

由此可得,初始化时一个 bthread 栈结构如下:

            High Address
┌──────────────────────────────────┐  ← void* sp(分配 Stack 内存的最⾼地址)
│ ... (unused stack)               │
├──────────────────────────────────┤  ← sp_top (对⻬ 16 字节)
│        finish:                   │  ← finish: 绝对地址
│        void (*fn)(intptr_t) (8B) │  ← coroutine_main ⼊⼝函数指针
│        %rip (8B)                 │  ← callee-saved reg
│        %rbp (8B)                 │
│        %rbx (8B)                 │
│        %r15 (8B)                 │
│        %r14 (8B)                 │
│        %r13 (8B)                 │
│        %r12 (8B)                 │
├──────────────────────────────────┤
│ PAD (2B)FPU_CW (2B) │ MXCSR (4B) │  ← FPU 状态字
├──────────────────────────────────┤  ← bthread_fcontext_t* ctx
│ ... (unused stack)               │
│ ... (unused stack)               │
...
            Low Address

一个嵌套很多层函数的 bthread 被挂起时栈的结构如下:

            High Address
┌──────────────────────────────────┐  ← void* sp(分配 Stack 内存的最⾼地址)
│ ... (unused stack)               │
├──────────────────────────────────┤  ← sp_top (对⻬ 16 字节)
│        finish:                   │  ← finish: 绝对地址
...
│        fn stacks  ...            │
│        fn stacks  ...            │
...
│        %rip (8B)                 │  ← call bthread_jump_fcontext ⾃动时 push 的 %rip,指向当前 bthread 的续体(挂起时下⼀条指令)
│        %rbp (8B)                 │
│        %rbx (8B)                 │
│        %r15 (8B)                 │
│        %r14 (8B)                 │
│        %r13 (8B)                 │
│        %r12 (8B)                 │
├──────────────────────────────────┤
│ PAD (2B)FPU_CW (2B) │ MXCSR (4B) │  ← FPU 状态字
├──────────────────────────────────┤  ← bthread_fcontext_t* ctx
│ ... (unused stack)               │
│ ... (unused stack)               │
...
            Low Address

恢复 bthread(或者初始化运行),从 bthread_fcontext_t* ctx 指向的位置开始,向上恢复寄存器现场,然后 jmp 到之前保存的 %rip(或主函数 fn 上),顺着之前的栈继续执行。

组件结构

Stack 组件

StackStorage 存储 Stack 内存的结构,定义如下:

struct StackStorage {
    unsigned stacksize;
    unsigned guardsize;
    void* bottom;
    unsigned valgrind_stack_id;

    // Clears all members.
    void zeroize() {
        stacksize = 0;
        guardsize = 0;
        bottom = NULL;
        valgrind_stack_id = 0;
    }
};

// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);

分配方式如下:

ContextualStack 将几个相关的组件包装起来,表示一个可上下文切换的栈对象,拥有栈空间所有权。

enum StackType {
    STACK_TYPE_MAIN = 0,
    STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD, // 特殊 stack,bthread stack 分配失败了,task 直接在 pthread 里运行
    STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
    STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
    STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};

struct ContextualStack {
    virtual ~ContextualStack() = default;
    bthread_fcontext_t context; // 指向栈顶的指针,用于上下文切换
    StackType stacktype;
    StackStorage storage;
};

// Get a stack in the `type' and run `entry' at the first time that the
// stack is jumped.
ContextualStack* get_stack(StackType type, void (*entry)(intptr_t));
// Recycle a stack. NULL does nothing.
void return_stack(ContextualStack*);

// Jump from stack `from' to stack `to'. `from' must be the stack of callsite
// (to save contexts before jumping)
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}

get_stack 根据不同的 StackType 可以构建不同大小的 Stack,每个 StackType 都有一个对象池用于减少频繁创建销毁的开销(除了 STACK_TYPE_MAINSTACK_TYPE_PTHREAD),默认情况下栈大小:

STACK_TYPE_MAIN 分配规则不太一样:仅分配一个 ContextualStack 对象,不会分配栈空间,作为标记使用。

STACK_TYPE_PTHREAD 则不会分配对象,返回 NULL 指针,也可作为标记使用。

TaskGroup 组件

调度操作的核心组件,每个 TaskGroup 绑定一个 pthread,它是 thread local 的,全局有个 TaskControl 结构管理所有的 TaskGroup

每个 TaskGroup 有两个队列:

队列作用访问方式特点
rq_ (run queue)本地待运行 bthread无锁(仅本线程访问)WorkStealingQueue,FIFO/LIFO(取决于 BTHREAD_FAIR_WSQ
remote_rq_其他线程提交的任务带锁(bthread::MutexBoundedQueue,FIFO

每个 TaskGroup 有一个 TaskMeta 指针 _cur_meta,指向了当前运行的 Task,它包含上文中提到的 ContextualStack,用户的函数(若有),以及一些控制字段。TaskMeta 对象由一个资源池管理,可用 bthread_id 的后 32 位进行索引。

TaskGroupTaskControl::create_group(bthread_tag_t tag) 创建,构造过程如下:

  1. TaskGroup::rq_ 初始化为大小 runqueue_capacity(默认为 FLAGS_task_group_runqueue_capacity=4096)。
  2. TaskGroup::remote_rq_ 初始化为大小 runqueue_capacity / 2
  3. 初始化一个 MainStack(不分配实际栈空间,作为特殊的 idle task),并将 TaskGroup::_cur_meta 指向该任务。
  4. 根据 tag 分组选择一个 ParkingLot 并关联至该 TaskGroup
    • 选取方式为 _pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]
    • 其中 PARKING_LOT_NUM=4FLAGS_task_group_ntags 默认为 1。
    • ParkingLotfutexstate 构成,用于等待指定 state 满足。
  5. 将该 TaskGrouptag 分组存入 TaskControl::_tagged_ngroup

TaskGroup 销毁时从 TaskControl::_tagged_ngroup 摘除,不过默认 FLAGS_task_group_delete_delay=1s 后才析构 TaskGroup 对象,因为 steal_task 并不持有保护 _tagged_ngroup 的锁,避免 steal_taskTaskGroup 销毁 race。

TaskControl 全局唯一,内部有一个按 tag 分片的 WorkStealingQueue,名为 _priority_queues,但事实上是个 FIFO,跨 TaskGroup 偷取时会尝试先从这个 queue 中偷取,比从其他 TaskGrouprq_remote_rq_ 偷取有更高的优先级。

它通过 get_or_new_task_control() 获取,它的构造过程如下:

  1. 初始化 concurrency = FLAGS_bthread_min_concurrency(> 0)或者 FLAGS_bthread_concurrency
  2. 初始化每个 tag 分片的 WorkStealingQueue
  3. 初始化 Global TimerThread,是个 pthread,用于 sleep 等 timer 信号唤醒。
  4. 创建 concurrencypthread worker,每个 worker 都会创建一个本地 TaskGroup。 a. 按照 concurrency round-robin 给每个 tag 分配 worker
  5. 等待一个 worker 启动,防止 choose_one_group() 返回 NULL

TaskControl::worker_thread 入口:

  1. 运行 g_worker_startfn()g_tagged_worker_startfn(tag) 用户注册的回调函数。 a. 这是 Unstable API。
  2. 创建 Worker 的 TaskGroup(见上文 TaskGroup 创建过程)。
  3. 执行 TaskGroup::run_main_taskrun_main_task 退出时 worker 终止,销毁 worker。 a. 循环逻辑为:
bthread_t tid;
while (wait_task(&tid)) {
    sched_to(this, tid); // 跳转到 tid 执行
    DCHECK_EQ(_cur_meta->stack, _main_stack); // 当前不运行任何 bthread,应当是 _main_stack

    if (_cur_meta->tid != _main_tid) {
        // 但是 tid 不是 _main_tid,说明 sched_to 修改了,
        // 但却没有跳转成功,直接在 pthread 中执行 bthread 主函数
        // 并且不用处理 remained 回调(见后续内容)
        task_runner(1/*skip remained*/);
    }
}

TaskGroup::wait_task 实现:

bool TaskGroup::wait_task(bthread_t* tid) {
    do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        // 保存了 parking state,读取上一次的 pl_state 即可
        if (_last_pl_state.stopped()) {
            return false;
        }
        // 等待 state 发生变化,变化了说明有 task 准备好了,可以偷了
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            // 拿到了一个 task,返回
            return true;
        }
#else
        // 没有保存上一个 state,读取最新的 state
        const ParkingLot::State st = _pl->get_state();
        if (st.stopped()) {
            return false;
        }
        if (steal_task(tid)) {
            // 拿到了一个 task,返回
            return true;
        }
        // 没偷成功,等待 state 发生变化,位置和上面不一样,因为 state 是最新的
        _pl->wait(st);
#endif
    } while (true);
}

TaskGroup::steal_task 实现:

bool steal_task(bthread_t* tid) {
    if (_remote_rq.pop(tid)) {
        // 从 _remote_rq 里拿到了一个 task,返回
        return true;
    }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
    // 保存当前 state,下一次调用 wait_task 时则是上一次 state
    _last_pl_state = _pl->get_state();
#endif
    // 否则从其他 worker 那里偷一个过来
    return _control->steal_task(tid, &_steal_seed, _steal_offset);
}

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    auto tag = tls_task_group->tag();
    // 先从对应 tag 的 FIFO queue 里尝试偷一个,这里的偷取可能有 FP
    if (_priority_queues[tag].steal(tid)) {
        return true;
    }
    ...
    // 获取 tag 所有的 group,从它们的 _rq 或者 _remote_rq 里偷一个
    bool stolen = false;
    size_t s = *seed;
    auto& groups = tag_group(tag);
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = groups[s % ngroup]; // g is possibly NULL because of concurrent_destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s; // 每尝试一次偷取,seed 会加 offset
    return stolen;
}

TaskGroup::sched_to 实现:

inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid); // 从 pool 里拿到了 task_meta
    if (next_meta->stack == NULL) { // stack 未初始化
        // 分配 stack,实现见上文,task_runner 作为 bthread 的入口函数
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stk 指针为空,说明是 STACKTYPE_PTHREAD,或者 OOM 了,分配不了新的
            // stack。对于后者,强行把这个 task 改成 PTHREAD 模式,会在上文直接
            // 在 pthread 里执行这个 task
            // 同时把 meta 里的 stack 指针重置为 _main_stack,作为标记
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    sched_to(pg, next_meta);
}

// 这并非一个普通的函数,在执行这个函数时会跳到其他栈上执行其他栈的函数,当这个函数跳转
// 回来的时候,运行它的线程可能已经换了,或者 jump_stack 后永远不会再回来了
// (bthread 执行完毕之后被回收了,不会 jump 回去了)
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
    TaskMeta* const cur_meta = g->_cur_meta;

    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta; // 设置当前的 task 为 next
        // 切换 tls,指向下一个 task 的 local storage
        cur_meta->local_storage = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
        BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, next_meta->local_storage);

        if (cur_meta->stack != NULL) {
            // cur_meta->stack != NULL 说明不是 STACKTYPE_PTHREAD
            if (next_meta->stack != cur_meta->stack) {
                // stack 不一样,说明没复用栈 or 不是 _main_stack,跳转过去使用其他栈
                jump_stack(cur_meta->stack, next_meta->stack);
                // 跳回来了,group 可能变了(被其他 worker 偷走了),从内存里读取最新的 tls group
                g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
            }
        }
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    // 执行一些回调,通常是之前的一些 task 被挂起了,
    // 一些任务需要推迟到下一次上下文切换时执行,
    // sched_to 的时候执行它的回调
    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        // 同样,group 可能变了(回调内部有 sched_to),从内存里读取最新的 tls group
        g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
    }
    *pg = g;
}

TaskGroup::task_runner 实现

// bthread 的入口函数
void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    // different groups.
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        // 如果不 skip(新建了一个 bthread 并执行),执行之前注册的回调
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            // 同样,group 可能变了
            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
        }
    }
    do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;

        // 执行用户的主函数,只会抓 bthread_exit() 抛出的 ExitException
        void* thread_return;
        try {
            // 当下面这个函数返回,说明用户 bthread 已经执行完毕
            // 注意执行用户 bthread 时可能会切到其他的 bthread,
            // 并非只运行这一个 bthread,m->fn 是最外面的栈,当它
            // 弹出时,说明这个 bthread 已经结束了
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        ... // Thread local storage 相关

        // 执行完 m->fn 后 group 可能变了
        g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);

        {
            // 确保 version > 0,防止 tid 为 0,0 不合法
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0); // 唤醒,相关逻辑见后面 Butex
        g->set_remained(_release_last_context, m); // 注册回调 _release_last_context,回收资源
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);

    // 正常作为 bthread 入口函数执行不会到这里,有点费解,不过 bthread 入口函数不能弹出,
    // 只有在 TaskGroup::run_main_task 直接里执行 task_runner(见上文)才会到这里,
    // 可以退出,执行其他的 task

    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
    // tasks to run, quit for more tasks.
}

// 这个回调用于回收 bthread 的栈,回收 TaskMeta 对象
void TaskGroup::_release_last_context(void* arg) {
    TaskMeta* m = static_cast<TaskMeta*>(arg);
    if (m->stack_type() != STACK_TYPE_PTHREAD) {
        // 放回 Stack 对象池内
        return_stack(m->release_stack()/*may be NULL*/);
    } else {
        // it's _main_stack, don't return.
        m->set_stack(NULL);
    }
    // 回收 TaskMeta 对象
    return_resource(get_slot(m->tid));
}

TaskGroup::ending_sched 实现

void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    // LIFO 不公平调度,但是 cache 友好
    // XXX:这里是安全的吗?不会和其他 _rq->push 冲突吗?
    const bool popped = g->_rq.pop(&next_tid);
#else
    // FIFO 公平调度,steal 是并发安全的
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }
    TaskMeta* const cur_meta = g->_cur_meta; // 准备结束的 bthread meta
    TaskMeta* next_meta = address_meta(next_tid); // 下一个 task

    if (next_meta->stack == NULL) { // 下一个 task 没初始化,或者是 pthread stack
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // 复用当前准备结束的 bthread 的 stack
            // 如果是 pthread stask,_main_stack 是空的,复用是个空操作
            // Reuse the stack of the current ending task.
            // also works with pthread_task scheduling to pthread_task, the
            // transfered stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            // 和上面 sched_to 类似逻辑,分配 task
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    // 切到下一个 task
    sched_to(pg, next_meta);
}

自此,worker 的后台调度逻辑形成了一个闭环,这里总结一下: 下文中 TaskGroup、worker 指代同一个概念,即运行 Task 的主体。

TaskGroup 除了上文的调度实现,还有一些 Task Dispatch/Suspend 逻辑,用户可以使用 bthread_start_urgent 或者 bthread_start_background Dispatch 一个 Task,使用 bthread_usleep 或者 bthread_yield Suspend 一个 Task。

例如 bthread_start_urgentbthread_start_background 的实现逻辑大概如下:

  1. 获取当前 Thread Local 的 TaskGroup,如果 TaskGroup 存在,且 tag 匹配,则调用 TaskGroup::start_foreground / TaskGroup::start_background<false>。 a. <false> 表示没有跨线程。
  2. 否则获取全局的 TaskControl,找到 tag 匹配的 TaskGroup 调用 TaskGroup::start_background<true>。 a. <true> 表示跨线程。 b. 额外的,如果 attr & BTHREAD_NOSIGNAL,则选择一个全局特殊的 tls_task_group_nosignal TaskGroup 调用 start_background,并且调用完使用 bthread_flush()(unstable API)驱动唤醒。这种模式适合 batch dispatch bthread。

start_foregroundstart_background 的区别在于前者会直接 sched_to 新的 Task,并且要求只能作用于本地的 TaskGroup,不能跨线程;后者为常用的 API,会将 Task 插入到队列末尾,只看后者实现:

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void* (*fn)(void*),
                                void* __restrict arg) {
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    ...
    m->fn = fn;
    m->arg = arg;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    ...
    if (REMOTE) {
        ready_to_run_remote(m, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) {
    _remote_rq._mutex.lock(); // _remote_rq 访问需要上锁
    while (!_remote_rq.push_locked(meta->tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex); // _remote_rq is full
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        // 累积过去未 signal 的次数
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal, _tag);
    }
}

void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) {
    push_rq(meta->tid);
    if (nosignal) {
        ++_num_nosignal;
    } else {
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _control->signal_task(1 + additional_signal, _tag);
    }
}

它们都统一走到了 TaskControl::signal_task 下,这个函数的实现如下:

void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
    // TODO(gejun):当前算法并不能保证创建足够多的线程以满足调用方的需求;
    // 但另一方面,依据当前实现,也存在大量无谓的唤醒
    // 对并发度进行限制,是在调度性能与任务及时性之间取得的良好平衡。
    if (num_task > 2) {
        num_task = 2; // Magic!
    }
    auto& pl = tag_pl(tag); // 拿到这个 tag 下的所有 parking-lot,默认一共 4 个
    // 新建 Group 的时候是随机取得,这里保证公平性也随机选一个开始
    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
    num_task -= pl[start_index].signal(1);
    if (num_task > 0) {
        // 继续唤醒更多 worker
        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
            if (++start_index >= PARKING_LOT_NUM) {
                start_index = 0;
            }
            num_task -= pl[start_index].signal(1);
        }
    }
    if (num_task > 0 && FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
        _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
        // 仍然不够,但发现当初以 FLAGS_bthread_min_concurrency 初始化的 worker 数量,
        // 准备加更多的 worker
        BAIDU_SCOPED_LOCK(g_task_control_mutex);
        if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
            add_workers(1, tag);
        }
    }
}

另一个常用 API bthread_usleep 的实现逻辑如下:

int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
    if (0 == timeout_us) {
        yield(pg);
        return 0;
    }
    TaskGroup* g = *pg;
    // We have to schedule timer after we switched to next bthread otherwise
    // the timer may wake up(jump to) current still-running context.
    // sched_to 一个 running 的 task 是非法的,必须保证先把当前任务挂起,让
    // 下一个任务注册 sleep_event
    SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
    g->set_remained(_add_sleep_event, &e);
    // 在 rq_ 中选择下一个任务/或者 steal 一个切换过去,下一个任务从 sched_to 中的
    // jump_stack 处恢复 or 开始执行 task_runner(0),这两个函数都会执行上面注册
    // 的回调,添加 sleep_event
    sched(pg);
    g = *pg; // 从其他 woker 那恢复,pg 可能变了
    ...
    return 0;
}

// yield 就很简单了,挂起当前 Task,下一个 Task 会把当前 Task 放到 rq_ 里,
// 并唤醒 workers 去拿
void TaskGroup::yield(TaskGroup** pg) {
    TaskGroup* g = *pg;
    ReadyToRunArgs args = {g->tag(), g->_cur_meta, false};
    g->set_remained(ready_to_run_in_worker, &args);
    sched(pg);
}

void TaskGroup::_add_sleep_event(void* void_args) {
    // Must copy SleepArgs. After calling TimerThread::schedule(), previous
    // thread may be stolen by a worker immediately and the on-stack SleepArgs
    // will be gone.
    SleepArgs e = *static_cast<SleepArgs*>(void_args);
    TaskGroup* g = e.group;
    TimerThread::TaskId sleep_id;
    // 放到之前提到的全局 timer pthread 里调度,唤醒后执行回调
    // ready_to_run_from_timer_thread
    sleep_id = get_global_timer_thread()->schedule(
        ready_to_run_from_timer_thread, void_args,
        butil::microseconds_from_now(e.timeout_us));
    ... // interrupt 处理
}

static void ready_to_run_from_timer_thread(void* arg) {
    CHECK(tls_task_group == NULL); // 当前是在 timer pthread 里,没有 tls task group
    const SleepArgs* e = static_cast<const SleepArgs*>(arg);
    auto g = e->group;
    auto tag = g->tag();
    // 找到对应 tag 的 group,推到 remote_rq 里
    g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta);
}

ExecutionQueue

ExecutionQueue 提供了一个 FIFO 的执行队列,会按顺序提交 task 到 bthread 中执行。

默认情况下内部使用 bthread_start_background(可以支持在没有 TLS TaskGroup 环境中提交 task),启动 _execute_tasks,迭代 task node 链表。

代码写的有点乱,看不太懂,链表是无锁的,提交 task 过程仅涉及原子操作。支持 high_priority task 插队。

不太一样的是用户的 task hanlder 函数拿到的是一个 task 迭代器,而非某个 task,这对 cache locality 比较友好,也方便用户攒 batch 处理:

int demo_execute(void* meta, TaskIterator<T>& iter) {
    if (iter.is_queue_stopped()) {
        // destroy meta and related resources
        return 0;
    }
    for (; iter; ++iter) {
        // do_something(*iter)
        // or do_something(iter->a_member_of_T)
    }
    return 0;
}

Bthread 提供这个组件带有一个执行器角色,使用更底层的原语,比如在 channel 上包装一层,下面的 golang 也能实现一样的逻辑:

// worker goroutine(执行器)
go func(){
    for task := range queue {
        doSomething(task)
    }
}()

// submit tasks
queue <- task1
queue <- task2

而 bthread 并没有提供 channel(bounded channel)这一种原语,文档里阐述了部分原因(读者认为这限制了一些自由,多个 channel 可以多路复用共享一个执行器,ExecutionQueue 这种绑定方式就无法实现了):

Q:bthread 会有 Channel 吗? 不会。channel 代表的是两点间的关系,而很多现实问题是多点的,这个时候使用 channel 最自然的解决方案就是:有一个角色负责操作某件事情或某个资源,其他线程都通过 channel 向这个角色发号施令。如果我们在程序中设置 N 个角色,让它们各司其职,那么程序就能分类有序地运转下去。所以使用 channel 的潜台词就是把程序划分为不同的角色。channel 固然直观,但是有代价:额外的上下文切换。做成任何事情都得等到被调用处被调度,处理,回复,调用处才能继续。这个再怎么优化,再怎么尊重 cache locality,也是有明显开销的。另外一个现实是:用 channel 的代码也不好写。由于业务一致性的限制,一些资源往往被绑定在一起,所以一个角色很可能身兼数职,但它做一件事情时便无法做另一件事情,而事情又有优先级。各种打断、跳出、继续形成的最终代码异常复杂。 我们需要的往往是 buffered channel,扮演的是队列和有序执行的作用,bthread 提供了 ExecutionQueue,可以完成这个目的。

并发原语

Butex / bthread::Mutex / bthread::Condvar

Butex 相关组件为 bthread 提供了类似 futex 的并发原语,它可以在阻塞的时候挂起当前 bthread,而非阻塞 pthread worker。Butex 这个组件仅供 brpc 内部使用,外面使用 bthread::Mutex,它提供 pthread_mutex 类似的接口。

Butex 的结构如下,字段 value 的指针会作为这个对象的 handle 在各个地方被使用,而非 Butex 指针:

struct Butex {
    butil::atomic<int> value;
    butil::LinkedList<ButexWaiter> waiters;
    FastPthreadMutex waiter_lock;
};

Butex 的 API 也和 futex 一一对应,内部如果操作的是 pthread 则 fallback 到 futex API:

// futex_op:
// FUTEX_WAIT 如果 *uaddr == val,则挂起当前线程,直到被 FUTEX_WAKE 唤醒
// FUTEX_WAKE 唤醒最多 val 个在 uaddr 上等待的线程
// FUTEX_WAIT_BITSET 更灵活的等待(可指定唤醒掩码)
// FUTEX_WAKE_BITSET 按位掩码唤醒特定等待者
// FUTEX_LOCK_PI / FUTEX_UNLOCK_PI 支持优先级继承(Priority Inheritance)的互斥锁
// FUTEX_CMP_REQUEUE 条件性地将等待者从一个 futex 队列移到另一个(用于实现条件变量)
int futex(int* uaddr, int futex_op, int val, const struct timespec* timeout, /* or: uint32_t val2 */ int* uaddr2, int val3);

// butex:
int butex_wake(void* butex, bool nosignal = false);
int butex_wake_n(void* butex, size_t n, bool nosignal = false);
int butex_wake_all(void* butex, bool nosignal = false);
int butex_wake_except(void* butex, bthread_t excluded_bthread);

// Wake up at most 1 thread waiting on |butex1|, let all other threads wait
// on |butex2| instead.
// Returns # of threads woken up.
int butex_requeue(void* butex1, void* butex2);

// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime, // Different from FUTEX_WAIT, butex_wait uses absolute time.
               bool prepend = false); // If |prepend| is true, queue the bthread at the head of the queue,

和 brpc 很多组件一样,Butex 对象也有个对象池负责缓冲分配和回收,不过这个对象池还有其他作用,这个我们后面会谈到,先看下这三个关键操作 wake, wait, requeue 是如何实现的:

int butex_wake(void* arg, bool nosignal) {
    //.. 上锁 Butex::waiter_lock,拿到第一个 waiters: front
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    // 如果有 sleep 任务,则取消调度 sleep,说明 wait->wake 很快
    unsleep_if_necessary(bbw, get_global_timer_thread());
    // 拿到对应 tag 的 task_group,会随机选中
    TaskGroup* g = get_task_group(bbw->control, bbw->tag);
    if (g == tls_task_group) {
        if (!nosignal) {
            // cur_meta 和 next_meta 替换执行,比起唤醒,这种方式调度 next_meta
            // 更快
            TaskGroup::exchange(&g, next_meta);
        } else {
            // 提交到本地 rq 里,不进行唤醒 worker
            g->ready_to_run(next_meta, true);
        }
    } else {
        // 提交到 remote_rq_ 里
        // nosignal = nosignal && tls_task_group ?->tag() == g->tag()
        // 跨 tag wakeup 会 signal
        g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag()));
    }
    return 1;
}
// butex_wake_n, butex_wake_all, butex_wake_except 类似,细节上就不展开了

int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);
    ButexWaiter* front = NULL;
    {
        // lck1, lck2 构建,延迟上锁
        std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        // 这里比较 lck1 lck2 内存地址,按顺序上锁,避免死锁
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }
        // 唤醒 front
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
        // requeue 剩余的
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }
    //... 和上面 wake front 一样
}

Wakeup 的实现都比较 naive,wait 的实现就要考虑非常多的 corner case 了,特别是支持 interrupt 和 pthread fallback 的代码片段,总体来说,它符合以下模型:

  1. 检查 val->load() == expected_value,如果不等于设置 errno = EAGAIN,返回 -1,调用方需要重新检查。
  2. TLS TaskGroup 切换到下一个 task。
  3. 将之前的任务注册进 Butex::waiters,如果有 timeout,则 schedule 一个 timer task wakeup
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    //...
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_READY;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();
    bbw.abstime = abstime;
    bbw.tag = g->tag();
    //...
    // release fence matches with acquire fence in interrupt_and_consume_waiters
    // in task_group.cpp to guarantee visibility of `interrupted'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    WaitForButexArgs args{&bbw, prepend};
    // 下一个 task 会执行回调 wait_for_butex 当前的 task
    g->set_remained(wait_for_butex, &args);
    // 切到下一个 task 执行
    TaskGroup::sched(&g);
    // 切回来的时候说明条件满足了,检查一下

    // erase_from_butex_and_wakeup(called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0, 30/*nops before sched_yield*/);

    // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);

    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
        errno = EWOULDBLOCK;
        return -1;
    } else if (is_interrupted) {
        errno = EINTR;
        return -1;
    }
    return 0;
}

void wait_for_butex(void* arg) {
    auto args = static_cast<WaitForButexArgs*>(arg);
    ButexBthreadWaiter* const bw = args->bw;
    Butex* const b = bw->initial_butex;
    // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
    //    before they're queued, otherwise the waiter is already timedout
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
    // [bthread]                    [TimerThread]
    //  waiter_state = TIMED
    //  tt_lock{ add task}
    // tt_lock{ get task}
    // waiter_lock{ waiter_state=TIMEDOUT}
    //  waiter_lock{ use waiter_state}
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by two locks, both threads are guaranteed to see the correct
    // value.
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
                   !bw->task_meta->interrupted) {
            if (args->prepend) {
                b->waiters.Prepend(bw);
            } else {
                b->waiters.Append(bw);
            }
            bw->container.store(b, butil::memory_order_relaxed);
            if (bw->abstime != NULL) {
                bw->sleep_id = get_global_timer_thread()->schedule(
                    erase_from_butex_and_wakeup, bw, *bw->abstime);
                if (!bw->sleep_id) { // TimerThread stopped.
                    errno = ESTOP;
                    erase_from_butex_and_wakeup(bw);
                }
            }
            return;
        }
    }
    // 看不懂,但 wait 结束了,可以恢复当前的 current_task 了
    // b->container is NULL which makes erase_from_butex_and_wakeup() and
    // TaskGroup::interrupt() no-op, there's no race between following code and
    // the two functions. The on-stack ButexBthreadWaiter is safe to use and
    // bw->waiter_state will not change again.
    // unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->task_meta);
    // FIXME: jump back to original thread is buggy.
    //// Value unmatched or waiter is already woken up by TimerThread, jump
    //// back to original bthread.
    // TaskGroup* g = tls_task_group;
    // ReadyToRunArgs args = {g->current_tid(), false};
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
    //// 2: Don't run remained because we're already in a remained function
    ////    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

上面我们埋下了一个伏笔:Butex 对象也有个对象池负责缓冲分配和回收,这个对象池可以用于解决 butex_wake()butex_destroy() 之间的竞态条件,考虑下面的情况:

class Event {
public:
    void wait() {
        _mutex.lock();
        if (!_done) {
            _cond.wait(&_mutex);
        }
        _mutex.unlock();
    }
    void signal() {
        _mutex.lock();
        if (!_done) {
            _done = true;
            _cond.signal();
        }
        _mutex.unlock(); /*1*/
    }
private:
    bool _done = false;
    Mutex _mutex;
    Condition _cond;
};

// Thread 1:
void foo() {
    Event event;
    pass_to_thread2(&event); // 将 event 传递给 Thread2
    event.wait();
} // 函数返回,event 被销毁

// Thread 2:
event.signal();

Thread1 将一个带有状态的条件变量(Event)传递给 Thread2,并等待其被触发,这通常意味着关联任务已完成,Thread1 可安全释放相关资源(包括互斥锁和条件变量)。该逻辑在语义上是正确的。

但存在一个微妙的竞态条件: 在 signal() 中的 /*1*/ 处解锁操作,其底层实现通常包含两个步骤:

locked->store(0); // 原子地释放锁
butex_wake(locked); // 唤醒等待者

store(0) 执行后,锁已被释放,此时 wait() 线程可能立即获取锁、完成剩余逻辑、退出 foo() 函数,并销毁 Event 对象(包括其内部的 butex)。 随后执行的 butex_wake(locked) 就会访问已释放的内存,导致程序崩溃。

有两种方法可以解决:

  1. 引用计数法:在 store(0) 前增加引用计数,在 butex_wake() 后减少引用。 缺点:几乎所有使用 butex_wake() 的地方都需插入引用管理逻辑,极易出错。
  2. 永不释放 butex(采用 ObjectPool):内存不归还给系统,而是保留在池中复用。 副作用:butex_wake() 可能唤醒一个无关的、复用了相同内存地址的新 butex,导致虚假唤醒(spurious wakeup)。 但根据观察,上述竞态本身极为罕见,因此额外的虚假唤醒在实践中是可接受的。

Butex 和 Futex 一样,是底层的 API,应用程序通常会使用这个 API 的各种包装,比如 mutex, condvar。

bthread_mutex_t 提供和 pthread_mutex_t 对等的 API,基于 futex 的 API 实现一个 naive mutex 不难,参考下面的算法即可:

此外 bthread 提供了一个 contented profiler,可以分析锁竞争,bthread_mutex 对 butex 的 val 是按如下结构解析的:

struct MutexInternal {
    butil::static_atomic<unsigned char> locked;   // locked->exchange(1)==0 try_lock 上锁成功
    butil::static_atomic<unsigned char> contended; // contended==1 表示有竞争
    unsigned short padding;
};
const MutexInternal MUTEX_CONTENDED_RAW = {{1}, {1}, 0};
const MutexInternal MUTEX_LOCKED_RAW = {{1}, {0}, 0};
// “常量不能放入只读段”这一理由在现代 C++ 中基本不成立。可能是出于历史原因或对某些嵌入式/旧编译器的顾虑。
// 在 C++ 中,不同编译单元(translation units)之间的全局/静态变量的初始化顺序是未定义的。
// 如果一个全局 const 变量依赖于另一个编译单元中的全局变量,就可能在对方尚未初始化时被使用,导致未定义行为。
#define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)

struct bthread_mutex_t {
    uint32_t* butex; // uint32_t represented by MutexInternal
    bthread_contention_site_t csite; // int64_t duration_ns; + size_t sampling_range; 用于锁竞争分析
    bool enable_csite;
    mutex_owner_t owner; // debug 使用,可用于检测 double lock
};

mutex_lock_contended_impl 的实现如下:

inline int mutex_lock_contended_impl(bthread_mutex_t* __restrict m,
                                     const struct timespec* __restrict abstime) {
    BTHREAD_MUTEX_CHECK_OWNER;
    TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
    if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
        // g == NULL:如果不自旋,只能阻塞整个 pthread,代价高,说不定锁马上就能释放,转一会
        // g != NULL && g->rq_size() == 0:既然没别的事可做,不如自旋一下,可能立刻拿到锁继续执行,延迟更低。
        for (int i = 0; i < MAX_SPIN_ITER; ++i) {
            cpu_relax();
        }
    }
    bool queue_lifo = false;
    bool first_wait = true;
    auto whole = (butil::atomic<unsigned>*)m->butex;
    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR /*note*/) {
            // EWOULDBLOCK 和 EINTR 通常会被忽略,不会返回给用户
            return errno;
        }
        if (first_wait && 0 == errno) {
            // 返回 EWOULDBLOCK 和 EINTR 说明根本没有 wait
            first_wait = false;
        }
        if (!first_wait) {
            // 通常情况下,bthread 会以 FIFO(先进先出)顺序排队。但在竞争互斥锁所有权时,
            // 一个被唤醒的 bthread 很可能会输给新到达的 bthread。这是因为新到达的 bthread
            // 已经在 CPU 上运行,而且数量可能很多。在这种情况下,为了保证公平性、避免饥饿,
            // 被唤醒的 bthread 会被插入到等待队列的头部。
            queue_lifo = true;
        }
    }
    BTHREAD_MUTEX_SET_OWNER;
    return 0;
}

Bthread 中的 Condvar 实现使用到了 butex_requeue 这个 API。

struct CondInternal {
    butil::atomic<bthread_mutex_t*> m; // cv 关联的那把锁,初始化时是 NULL
    butil::atomic<int>* seq;           // notify 时会往上递增,进行唤醒
};

bthread_cond_wait 的实现如下,十分简单:

int bthread_cond_wait(bthread_cond_t* __restrict c,
                      bthread_mutex_t* __restrict m) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    // 一定要在上锁的时候读取 expected_seq,否则其他 bthread 修改条件+notify 时
    // 可能会丢失唤醒,释放锁后读取到的 seq 可能是 notify 写入的新值
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
        if (!ic->m.compare_exchange_strong(expected_m, m, butil::memory_order_relaxed)) {
            return EINVAL;
        }
    }
    // 这里释放锁
    bthread_mutex_unlock(m);
    int rc1 = 0;
    if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR /*note*/) {
        rc1 = errno;
    }
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
}

bthread_cond_broadcast 利用了 butex_requeue API,唤醒一个 bthread 拿到锁,剩余的 bthread 移动到 mutex 上的 butex,当这个 bthread 处理完/陷入等待释放锁的时候,会唤醒剩余所有的绑定在这个 butex 上的 bthread,可以缓解惊群的问题,至少让一个 bthread 无阻碍的完成工作。

int bthread_cond_broadcast(bthread_cond_t* c) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
    butil::atomic<int>* const saved_seq = ic->seq;
    if (!m) {
        return 0;
    }
    void* const saved_butex = m->butex;
    // Wakeup one thread and requeue the rest on the mutex.
    ic->seq->fetch_add(1, butil::memory_order_release);
    bthread::butex_requeue(saved_seq, saved_butex);
    return 0;
}

此外,bthread 还提供了信号量和读写锁的实现,不过使用的频率要比上面这两个原语要低,读写锁在一定程度上使用 DoublyBufferedData 更为高效,就不展开讨论了。

BThread-Local Storage

bthread_local 提供的一种 bthread 局部存储机制,类似 pthread_key_t 的 API,如下所示:

extern int bthread_key_create(bthread_key_t* key, void (*destructor)(void* data));
extern int bthread_key_delete(bthread_key_t key);
// 注意:如果当前线程不是由 brpc 服务器创建的,并且生命周期非常短(仅执行少量任务后立即退出),
// 应避免使用 bthread-local 存储。原因是在首次调用 bthread_setspecific 时,
// bthread-local 总会分配一个 keytable;该开销在长生命周期线程中可忽略不计,
// 但在短生命周期线程中则较为明显。brpc 服务器中的线程是特例,
// 因为它们会从服务器内的 bthread_keytable_pool_t 中复用 keytable。
extern int bthread_setspecific(bthread_key_t key, void* data);
extern void* bthread_getspecific(bthread_key_t key);

其核心是一个叫 KeyTable 结构,两层 Array 表。

这种设计避免了为每个 key 分配独立内存的开销,同时在 key 稀疏使用时节省空间(只按需分配 SubKeyTable)。

sched_to 中,切换 bthread 之前会切换 tls_bls,这个在上文已经介绍,在 task_runner 中,用户 bthread 结束时会清理 tls_bls

void TaskGroup::task_runner(intptr_t skip_remained) {
    ...
    do {
        ..
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        // 用户程序已经结束

        // 清理 TLS(线程局部存储)变量,必须在修改 version_butex 之前完成,
        // 否则,另一个刚刚 join 该 bthread 的 bthread 可能无法观察到 TLS 变量析构所产生的副作用。
        LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
        KeyTable* kt = tls_bls_ptr->keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
            tls_bls_ptr->keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }
        ...
        butex_wake_except(m->version_butex, 0);
    } while (g->_cur_meta->tid != g->_main_tid);
    ...
}

后记

bRPC 是一个通用的 C++ RPC 框架,它的目标并非将某个 ad hoc 的 RPC 需求优化到极致,如果你愿意,完全可以用 pthread 手动管理 epoll 的 fd,对每个请求的执行路径做到极致掌控。brpc 真正解决的是在复杂、多变、高负载的实际生产场景中,如何让 RPC 通信整体表现稳定、高效且可预测,从而让 RPC 层本身不太会成为系统性能的瓶颈。

从今天回看,brpc 已是一个近十年历史的项目,但其技术理念依然先进,尤其是 bthread 相关的代码,经过长期打磨已臻成熟。它称得上是真正意义上的“工业级” RPC 框架,不仅稳定高效,还切实支撑起大量实际业务,甚至可以说养活了不少人。

近年来,work-stealing 模型与 run-to-completion 模型常被拿来比较。这两种调度模型并无优劣之分,关键在于是否契合具体的系统负载特征。在未明确负载模式、请求粒度、并发规模等上下文之前,泛泛而谈哪种模型更优,往往缺乏实际意义。bRPC 使用 work-stealing 模型,并在较新的版本支持 tag group 功能,这使得 work-stealing 边界可控,足够生产使用。

类似地,polling 与 event-driven / completion-driven 模型之间的对比也应如此看待。polling 在高吞吐、低延迟场景中更高效,它并非总是有效,而且它更费电,一点也不环保,部署方案也变得复杂