捕获续体 (Continuation)
setjmp/longjmp
这两个函数来自于 C 标准库 <setjmp.h>,用于实现非本地跳转(non-local goto)的函数,可用于跨越函数调用栈直接跳转,常用于错误恢复、状态机等场景。类似函数之间 goto,调试难度可想而知,工程项目中一般不使用。
基本用法如下:
// jmp.c
#include <setjmp.h>
#include <stdio.h>
jmp_buf env;
void foo(void) {
printf("Inside foo, before longjmp\n");
longjmp(env, 1); // 跳回 setjmp 调用处,返回值为 1
// 注意:longjmp 之后的代码不会执行!
printf("This line is NOT printed.\n");
}
int main(void) {
int r = setjmp(env); // 第一次调用:返回 0
if (r == 0) {
printf("First call to setjmp, r=%d\n", r);
foo();
} else {
printf("Returned via longjmp, r=%d\n", r); // 此处 r==1
}
return 0;
}
输出:
First call to setjmp, r=0
Inside foo, before longjmp
Returned via longjmp, r=1
实现方法
setjmp / longjmp 的实现本质是手动保存和恢复 CPU 的执行上下文(主要是寄存器状态和栈指针),从而实现“跳过正常函数返回路径”的控制流转移。它不依赖操作系统或语言运行时,而是利用硬件和 ABI 细节,由编译器和 C 库协同完成。
以典型 x86-64 / System V ABI 为例:
jmp_buf env 是一个结构体(或数组),用于保存跳转所需的关键寄存器值,典型内容包括:
typedef struct {
unsigned long rbx; // callee-saved reg
unsigned long rbp; // callee-saved reg
unsigned long r12; // callee-saved reg
unsigned long r13; // callee-saved reg
unsigned long r14; // callee-saved reg
unsigned long r15; // callee-saved reg
unsigned long rsp; // 当前栈顶指针, callee-saved reg
unsigned long rip; // PC,setjmp 返回后下一条指令的地址,用于 longjmp 跳转回来
} jmp_buf[1];
setjmp(env)用内联汇编(或编译器内置函数__builtin_setjmp)将上述寄存器值写入env。记录rip为setjmp返回后下一条指令的地址(即if(r==0)处)。首次返回0(通过rax)。
伪代码
setjmp:
mov [env+0], rbx ; 保存 callee-saved 寄存器
mov [env+8], rbp
mov [env+16], r12
...
mov [env+48], rsp ; 保存栈指针
lea rax, [rip+?] ; 取“setjmp 返回后”的指令地址
mov [env+56], rax ; 存为 rip
xor rax, rax ; 返回 0 (xor rax, rax 比 mov rax, 0 更快)
ret
longjmp(env, val)从env中恢复所有保存的寄存器(包括rsp,rbp,rip)。将val(若为0则替换为1)放入rax作为“返回值”。跳转到env.rip处执行 → 即当初setjmp返回后的那条指令。
伪代码
longjmp:
mov rbx, [env+0]
mov rbp, [env+8]
mov r12, [env+16]
...
mov rsp, [env+48] ; 恢复栈指针!栈回退到 setjmp 时的状态
mov rax, rsi ; rsi = val
test rax, rax
jnz 1f ; val != 0,跳转到 1:
inc rax ; val == 0 → 改为 1
1:
mov rdx, [env+56] ; 取保存的 rip
jmp rdx ; 直接跳过去!
使用
setjmp(env)捕获的续体通常是一次性的,使用这个原语实现对称协程比较繁琐复杂。 What is the difference between asymmetric and symmetric coroutines?
协程类型
-
非对称协程(Asymmetric / Semi-coroutines)
- 提供两个控制转移操作: i. 调用(invoke/resume):由调用者启动或恢复协程; ii. 挂起(suspend/yield):由协程内部主动挂起,并返回控制权给其直接调用者(caller)。
- 具有栈式层级关系:协程从属于其调用者,类似函数调用 —— 调用者 → 协程 → 返回调用者。
- 典型用途:生成器(generators)、迭代器(iterators)等“值序列生产者”。
-
对称协程(Symmetric coroutines)
- 仅提供一个控制转移原语:直接跳转(transfer)到任意指定的其他协程。
- 所有协程地位平等,无固定调用层级;控制流可在任意协程间自由切换。
- 典型用途:用户态线程、协作式多任务调度,其中调度器直接在多个协程间切换。
call/cc
搞点函数式编程:First-Class Continuations / Continuation-Passing Style
call/cc 是 call-with-current-continuation 的缩写,是 Scheme 等支持一级续体(first-class continuations)的语言中一个强大而独特的控制流原语。它允许你捕获“当前的计算上下文”(即“续体” continuation),并将其当作一个普通的一级值(函数)进行传递和调用——从而实现非局部跳转、异常处理、生成器、协程等高级控制结构。
(call/cc proc)
-
continuation = “程序接下来要做的事”
- 例如:
(display (+ 1 2))中,(+ 1 2)的续体是(lambda (v) (display v)) - 即:计算完
(+ 1 2)后,把结果v传给display
- 例如:
-
call/cc会捕获当前 continuation(记作k),并将k作为参数传给proc,立马执行(proc k)。 -
k本身是一个函数:调用(k val)会立即跳出当前上下文,返回val到call/cc被调用的位置,并放弃后续计算。 -
若
proc正常返回(未调用k),则call/cc返回proc的返回值。
1. 查找 list 中符合 (pred x) 的元素 x,找到立即返回:
(define (find-first pred lst)
(call/cc
(lambda (return)
(for-each (lambda (x)
(when (pred x)
(return x))) ; 找到即返回,退出循环
lst)
#f))) ; 未找到
2. 双协程交换打印:
(define one
(lambda (go)
(display 1)
(set! go (call/cc go))
(display 2)
(set! go (call/cc go))
(display 3)
(set! go (call/cc go))))
(define two
(lambda (go)
(display #\a)
(set! go (call/cc go))
(display #\b)
(set! go (call/cc go))
(display #\c)
(set! go (call/cc go))))
; (one two) -> 1a2b3c
; (two one) -> a1b2c3
到此为止,读者的工作不是写括号语言,再学下去脑细胞不够用了。
bthread_make_fcontext / bthread_jump_fcontext
这是用于 bthread 的跳转的两个关键原语。
typedef void* bthread_fcontext_t;
// 从当前上下文跳转(jump)到目标上下文 nfc,同时可选地保存当前上下文到 *ofc。
// 这是无返回的跳转(jump),因上下文保存,后续可通过另一次 jump 返回
// Args:
// - ofc: 输出参数,是个指向 bthread 栈指针的指针,当前上下文(%rsp)将被保存到 *ofc。
// 后续可通过 bthread_jump_fcontext 跳回此处。
// - nfc: 目标上下文,是指向 bthread 栈的指针,将要跳转到的执行点(即目标协程的寄存器状
// 态+栈指针等)。通常由 bthread_make_fcontext 创建(首次进入)或前一次 jump 保
// 存而来(恢复)。
// - vp: 传递给目标上下文的“参数”。当目标上下文是首次通过 make_fcontext 创建的函数时,
// vp 会作为其唯一参数传入;若目标是已挂起的上下文,则 vp 会作为 jump_fcontext 的
// 返回值出现在目标上下文的视角中。
// - preserve_fpu: 是否保存/恢复浮点寄存器(FPU/SSE/AVX 状态)。默认 false(出于性能
// 考虑,多数协程不涉及浮点密集计算)。设为 true 可保证浮点状态精确恢
// 复,但开销更大。
// Return:
// - 当该上下文被“跳回”时(即别人 jump 到它),本函数“返回”,返回值即为对方调用
// jump_fcontext 时传入的 vp 参数。
intptr_t bthread_jump_fcontext(bthread_fcontext_t* ofc,
bthread_fcontext_t nfc,
intptr_t vp,
bool preserve_fpu = false);
// 在指定栈空间上“构造”一个可跳转的上下文,使其下次被 jump_fcontext 激活时,从函数 fn
// 开始执行。
// 这是协程的“入口点构建器”,仅用于首次创建一个可运行的上下文。
// Args:
// - sp: 栈内存的高地址(top)。注意:x86/x86_64 栈向下增长,所以 sp 应指向分配的栈空间
// 的最高地址(即 stack_base + stack_size)。
// - size: 栈空间的字节大小。用于内部栈数据对齐(如 16-byte 对齐),但不负责分配内存
// —— 用户需自行 malloc/mmap 一块足够大的内存,并保证 sp 正确指向栈顶。
// - fn: 协程入口函数。当该上下文首次被 jump_fcontext 激活时,将从此函数开始执行,且 fn
// 的唯一参数即为 jump_fcontext 中传入的 vp。函数不应返回(返回会导致未定义行为),
// 通常以 jump 到其他上下文结束。
bthread_fcontext_t bthread_make_fcontext(void* sp, size_t size, void (*fn)(intptr_t));
使用方式如下:
// 协程 A 中:
intptr_t r = bthread_jump_fcontext(&ctx_a, ctx_b, 123);
// 后续某刻被 ctx_b 跳回时,r = 456(见下面 B 的调用)
// 协程 B 中:
bthread_jump_fcontext(&ctx_b, ctx_a, 456); // 跳回 A,传 456
// fn 不能是普通 C 函数直接返回 —— 因为返回后无合法返回地址(上下文是伪造的)。通常做法是:
void coroutine_main(intptr_t arg) {
// do work...
// 最后 jump 到调度器
bthread_jump_fcontext(¤t_ctx, scheduler_ctx, 0);
// 若无处可跳,可 exit() 或 abort()
}
典型 x86-64 / System V ABI 实现如下(AT&T):
context.cpp
#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm(
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
" pushq %rbp \n" // 按照 System V ABI,保存 callee-saved 寄存器 (%rbp, %rbx, %r12–%r15)
" pushq %rbx \n"
" pushq %r15 \n"
" pushq %r14 \n"
" pushq %r13 \n"
" pushq %r12 \n"
" leaq -0x8(%rsp),%rsp\n" // 为后续 stmxcsr(4 字节 MXCSR)+ fnstcw(2 字节 x87 控制字)预留空间(按 8 字节对齐)
" cmp $0,%rcx\n" // %rcx 为是否保存 FPU 状态的标志,来自 preserve_fpu
" je 1f\n" // 不保存,跳转到 1:
" stmxcsr (%rsp)\n" // 保存 SSE 浮点控制状态
" fnstcw 0x4(%rsp)\n" // 保存 x87 浮点控制字
"1:\n"
" movq %rsp,(%rdi)\n" // 保存当前栈指针到 ofc,ofc 是一个 void** 双重指针
" movq %rsi,%rsp\n" // 切换到目标上下文栈 nfc, 从此刻起,栈切换完成,后续所有 pop 都从目标栈中取值
" cmp $0,%rcx\n" // 是否恢复 FPU 状态的标志,来自 preserve_fpu
" je 2f\n" // 不恢复,跳转到 2:
" ldmxcsr (%rsp)\n" // 恢复 SSE 浮点控制状态
" fldcw 0x4(%rsp)\n" // 恢复 x87 浮点控制字
"2:\n"
" leaq 0x8(%rsp),%rsp\n" // 跳过 FPU 空间(8 字节),指向上次调用 jump 时 push 的 r12
" popq %r12 \n" // 恢复 6 个 callee-saved 寄存器
" popq %r13 \n"
" popq %r14 \n"
" popq %r15 \n"
" popq %rbx \n"
" popq %rbp \n"
" popq %r8\n" // 弹出返回地址到 r8, 此时 %rsp 指向目标栈上保存的“返回地址”(即该协程上次调用 jump 时 push 的 %rip,但此处用 pop %r8 而非 ret!)
" movq %rdx,%rax\n" // vp → %rax(作为返回值)
" movq %rdx,%rdi\n" // 若目标地址是一段 C 函数入口(如首次进入协程),传给目标函数的第 1 个参数(按 ABI)
" jmp *%r8\n" // 跳转到此前保存的 %rip(即目标协程挂起的位置)
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n" // 记录函数大小,供调试/链接使用
".section .note.GNU-stack,\"\",%progbits\n" // 标记该目标文件不需要可执行栈
".previous\n"
);
#endif
#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm(
".text\n"
".globl bthread_make_fcontext\n"
".type bthread_make_fcontext,@function\n"
".align 16\n"
"bthread_make_fcontext:\n"
" movq %rdi,%rax\n" // %rdi = sp,存入 %rax(栈底高地址,后续返回的 %rax 表示 bthread_fcontext_t)
" andq $-16,%rax\n" // 向下对齐到 16 字节, ABI 要求函数调用前 %rsp 是 16 字节对齐。为保证首次进入 fn 时栈对齐合法,初始 %rsp 必须是 16 字节对齐。
" leaq -0x48(%rax),%rax\n" // 预留上下文保存空间 72 字节
" movq %rdx, 0x38(%rax)\n" // %rdx = fn, 写入 %rax + 56
" stmxcsr (%rax)\n" // MXCSR → %rax + 0
" fnstcw 0x4(%rax)\n" // FPU CW → %rax + 4
" leaq finish(%rip),%rcx\n" // x86-64 位置无关代码(PIC)标准写法,计算 finish 相对于当前 %rip 的偏移,得到绝对地址,写入 %rcx
" movq %rcx, 0x40(%rax)\n" // %rcx 写入 %rax + 64
" ret\n" // 返回 %rax,即 bthread_fcontext_t
"finish:\n" // finsish 地址放到栈顶,保护用的,PC 跳到这个地方就会停止进程
" xorq %rdi,%rdi\n" // 清零 %rdi -> _exit(0)
" call _exit@PLT\n"
" hlt\n" // 如果 _exit 出问题,没退出,陷入停机
".size bthread_make_fcontext,.-bthread_make_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n"
);
#endif
Bthread 栈结构
分析上面的汇编,得知 bthread 在被挂起时会将所有 callee-saved reg 放到栈上
由此可得,初始化时一个 bthread 栈结构如下:
High Address
┌──────────────────────────────────┐ ← void* sp(分配 Stack 内存的最⾼地址)
│ ... (unused stack) │
├──────────────────────────────────┤ ← sp_top (对⻬ 16 字节)
│ finish: │ ← finish: 绝对地址
│ void (*fn)(intptr_t) (8B) │ ← coroutine_main ⼊⼝函数指针
│ %rip (8B) │ ← callee-saved reg
│ %rbp (8B) │
│ %rbx (8B) │
│ %r15 (8B) │
│ %r14 (8B) │
│ %r13 (8B) │
│ %r12 (8B) │
├──────────────────────────────────┤
│ PAD (2B)FPU_CW (2B) │ MXCSR (4B) │ ← FPU 状态字
├──────────────────────────────────┤ ← bthread_fcontext_t* ctx
│ ... (unused stack) │
│ ... (unused stack) │
...
Low Address
一个嵌套很多层函数的 bthread 被挂起时栈的结构如下:
High Address
┌──────────────────────────────────┐ ← void* sp(分配 Stack 内存的最⾼地址)
│ ... (unused stack) │
├──────────────────────────────────┤ ← sp_top (对⻬ 16 字节)
│ finish: │ ← finish: 绝对地址
...
│ fn stacks ... │
│ fn stacks ... │
...
│ %rip (8B) │ ← call bthread_jump_fcontext ⾃动时 push 的 %rip,指向当前 bthread 的续体(挂起时下⼀条指令)
│ %rbp (8B) │
│ %rbx (8B) │
│ %r15 (8B) │
│ %r14 (8B) │
│ %r13 (8B) │
│ %r12 (8B) │
├──────────────────────────────────┤
│ PAD (2B)FPU_CW (2B) │ MXCSR (4B) │ ← FPU 状态字
├──────────────────────────────────┤ ← bthread_fcontext_t* ctx
│ ... (unused stack) │
│ ... (unused stack) │
...
Low Address
恢复 bthread(或者初始化运行),从 bthread_fcontext_t* ctx 指向的位置开始,向上恢复寄存器现场,然后 jmp 到之前保存的 %rip(或主函数 fn 上),顺着之前的栈继续执行。
组件结构
Stack 组件
StackStorage 存储 Stack 内存的结构,定义如下:
struct StackStorage {
unsigned stacksize;
unsigned guardsize;
void* bottom;
unsigned valgrind_stack_id;
// Clears all members.
void zeroize() {
stacksize = 0;
guardsize = 0;
bottom = NULL;
valgrind_stack_id = 0;
}
};
// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);
分配方式如下:
- 对齐:
stacksize按页(通常是 4KB)向上对齐,最小为2 * PAGESIZE。 guardsize_in <= 0,即无 guard page- 分配方式:
malloc(stacksize) - 设置
bottom:s->bottom = (char*)mem + stacksize;
- 分配方式:
guardsize_in > 0,有 guard page,默认guard_page_size = 4096- 对齐
guardsize:按页向上对齐,最小为PAGESIZE。 - 分配方式:
mmap一大块内存:memsize = stacksize + guardsize。 - 将内存低地址部分(大小为
guardsize)设为PROT_NONE(读写执行均禁),便于排查 stack overflow。
- 对齐
ContextualStack 将几个相关的组件包装起来,表示一个可上下文切换的栈对象,拥有栈空间所有权。
enum StackType {
STACK_TYPE_MAIN = 0,
STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD, // 特殊 stack,bthread stack 分配失败了,task 直接在 pthread 里运行
STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};
struct ContextualStack {
virtual ~ContextualStack() = default;
bthread_fcontext_t context; // 指向栈顶的指针,用于上下文切换
StackType stacktype;
StackStorage storage;
};
// Get a stack in the `type' and run `entry' at the first time that the
// stack is jumped.
ContextualStack* get_stack(StackType type, void (*entry)(intptr_t));
// Recycle a stack. NULL does nothing.
void return_stack(ContextualStack*);
// Jump from stack `from' to stack `to'. `from' must be the stack of callsite
// (to save contexts before jumping)
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
get_stack 根据不同的 StackType 可以构建不同大小的 Stack,每个 StackType 都有一个对象池用于减少频繁创建销毁的开销(除了 STACK_TYPE_MAIN 和 STACK_TYPE_PTHREAD),默认情况下栈大小:
FLAGS_stack_size_small:32KBFLAGS_stack_size_normal:1MBFLAGS_stack_size_large:8MB
STACK_TYPE_MAIN 分配规则不太一样:仅分配一个 ContextualStack 对象,不会分配栈空间,作为标记使用。
STACK_TYPE_PTHREAD 则不会分配对象,返回 NULL 指针,也可作为标记使用。
TaskGroup 组件
调度操作的核心组件,每个 TaskGroup 绑定一个 pthread,它是 thread local 的,全局有个 TaskControl 结构管理所有的 TaskGroup。
每个 TaskGroup 有两个队列:
| 队列 | 作用 | 访问方式 | 特点 |
|---|---|---|---|
rq_ (run queue) | 本地待运行 bthread | 无锁(仅本线程访问) | WorkStealingQueue,FIFO/LIFO(取决于 BTHREAD_FAIR_WSQ) |
remote_rq_ | 其他线程提交的任务 | 带锁(bthread::Mutex) | BoundedQueue,FIFO |
每个 TaskGroup 有一个 TaskMeta 指针 _cur_meta,指向了当前运行的 Task,它包含上文中提到的 ContextualStack,用户的函数(若有),以及一些控制字段。TaskMeta 对象由一个资源池管理,可用 bthread_id 的后 32 位进行索引。
TaskGroup 由 TaskControl::create_group(bthread_tag_t tag) 创建,构造过程如下:
TaskGroup::rq_初始化为大小runqueue_capacity(默认为FLAGS_task_group_runqueue_capacity=4096)。TaskGroup::remote_rq_初始化为大小runqueue_capacity / 2。- 初始化一个 MainStack(不分配实际栈空间,作为特殊的 idle task),并将
TaskGroup::_cur_meta指向该任务。 - 根据
tag分组选择一个ParkingLot并关联至该TaskGroup:- 选取方式为
_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]。 - 其中
PARKING_LOT_NUM=4,FLAGS_task_group_ntags默认为 1。 ParkingLot由futex和state构成,用于等待指定state满足。
- 选取方式为
- 将该
TaskGroup按tag分组存入TaskControl::_tagged_ngroup。
TaskGroup 销毁时从 TaskControl::_tagged_ngroup 摘除,不过默认 FLAGS_task_group_delete_delay=1s 后才析构 TaskGroup 对象,因为 steal_task 并不持有保护 _tagged_ngroup 的锁,避免 steal_task 和 TaskGroup 销毁 race。
TaskControl 全局唯一,内部有一个按 tag 分片的 WorkStealingQueue,名为 _priority_queues,但事实上是个 FIFO,跨 TaskGroup 偷取时会尝试先从这个 queue 中偷取,比从其他 TaskGroup 的 rq_ 和 remote_rq_ 偷取有更高的优先级。
它通过 get_or_new_task_control() 获取,它的构造过程如下:
- 初始化
concurrency = FLAGS_bthread_min_concurrency(> 0)或者FLAGS_bthread_concurrency。 - 初始化每个
tag分片的WorkStealingQueue。 - 初始化 Global TimerThread,是个
pthread,用于sleep等 timer 信号唤醒。 - 创建
concurrency个pthreadworker,每个worker都会创建一个本地TaskGroup。 a. 按照concurrencyround-robin 给每个tag分配worker。 - 等待一个
worker启动,防止choose_one_group()返回NULL。
TaskControl::worker_thread 入口:
- 运行
g_worker_startfn()和g_tagged_worker_startfn(tag)用户注册的回调函数。 a. 这是 Unstable API。 - 创建 Worker 的
TaskGroup(见上文TaskGroup创建过程)。 - 执行
TaskGroup::run_main_task,run_main_task退出时worker终止,销毁worker。 a. 循环逻辑为:
bthread_t tid;
while (wait_task(&tid)) {
sched_to(this, tid); // 跳转到 tid 执行
DCHECK_EQ(_cur_meta->stack, _main_stack); // 当前不运行任何 bthread,应当是 _main_stack
if (_cur_meta->tid != _main_tid) {
// 但是 tid 不是 _main_tid,说明 sched_to 修改了,
// 但却没有跳转成功,直接在 pthread 中执行 bthread 主函数
// 并且不用处理 remained 回调(见后续内容)
task_runner(1/*skip remained*/);
}
}
TaskGroup::wait_task 实现:
bool TaskGroup::wait_task(bthread_t* tid) {
do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
// 保存了 parking state,读取上一次的 pl_state 即可
if (_last_pl_state.stopped()) {
return false;
}
// 等待 state 发生变化,变化了说明有 task 准备好了,可以偷了
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
// 拿到了一个 task,返回
return true;
}
#else
// 没有保存上一个 state,读取最新的 state
const ParkingLot::State st = _pl->get_state();
if (st.stopped()) {
return false;
}
if (steal_task(tid)) {
// 拿到了一个 task,返回
return true;
}
// 没偷成功,等待 state 发生变化,位置和上面不一样,因为 state 是最新的
_pl->wait(st);
#endif
} while (true);
}
TaskGroup::steal_task 实现:
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
// 从 _remote_rq 里拿到了一个 task,返回
return true;
}
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
// 保存当前 state,下一次调用 wait_task 时则是上一次 state
_last_pl_state = _pl->get_state();
#endif
// 否则从其他 worker 那里偷一个过来
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();
// 先从对应 tag 的 FIFO queue 里尝试偷一个,这里的偷取可能有 FP
if (_priority_queues[tag].steal(tid)) {
return true;
}
...
// 获取 tag 所有的 group,从它们的 _rq 或者 _remote_rq 里偷一个
bool stolen = false;
size_t s = *seed;
auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = groups[s % ngroup]; // g is possibly NULL because of concurrent_destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s; // 每尝试一次偷取,seed 会加 offset
return stolen;
}
TaskGroup::sched_to 实现:
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid); // 从 pool 里拿到了 task_meta
if (next_meta->stack == NULL) { // stack 未初始化
// 分配 stack,实现见上文,task_runner 作为 bthread 的入口函数
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stk 指针为空,说明是 STACKTYPE_PTHREAD,或者 OOM 了,分配不了新的
// stack。对于后者,强行把这个 task 改成 PTHREAD 模式,会在上文直接
// 在 pthread 里执行这个 task
// 同时把 meta 里的 stack 指针重置为 _main_stack,作为标记
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
sched_to(pg, next_meta);
}
// 这并非一个普通的函数,在执行这个函数时会跳到其他栈上执行其他栈的函数,当这个函数跳转
// 回来的时候,运行它的线程可能已经换了,或者 jump_stack 后永远不会再回来了
// (bthread 执行完毕之后被回收了,不会 jump 回去了)
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
TaskMeta* const cur_meta = g->_cur_meta;
// Switch to the task
if (__builtin_expect(next_meta != cur_meta, 1)) {
g->_cur_meta = next_meta; // 设置当前的 task 为 next
// 切换 tls,指向下一个 task 的 local storage
cur_meta->local_storage = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, next_meta->local_storage);
if (cur_meta->stack != NULL) {
// cur_meta->stack != NULL 说明不是 STACKTYPE_PTHREAD
if (next_meta->stack != cur_meta->stack) {
// stack 不一样,说明没复用栈 or 不是 _main_stack,跳转过去使用其他栈
jump_stack(cur_meta->stack, next_meta->stack);
// 跳回来了,group 可能变了(被其他 worker 偷走了),从内存里读取最新的 tls group
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
}
} else {
LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
}
// 执行一些回调,通常是之前的一些 task 被挂起了,
// 一些任务需要推迟到下一次上下文切换时执行,
// sched_to 的时候执行它的回调
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
// 同样,group 可能变了(回调内部有 sched_to),从内存里读取最新的 tls group
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
*pg = g;
}
TaskGroup::task_runner 实现
// bthread 的入口函数
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;
if (!skip_remained) {
// 如果不 skip(新建了一个 bthread 并执行),执行之前注册的回调
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
// 同样,group 可能变了
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
}
do {
// Meta and identifier of the task is persistent in this run.
TaskMeta* const m = g->_cur_meta;
// 执行用户的主函数,只会抓 bthread_exit() 抛出的 ExitException
void* thread_return;
try {
// 当下面这个函数返回,说明用户 bthread 已经执行完毕
// 注意执行用户 bthread 时可能会切到其他的 bthread,
// 并非只运行这一个 bthread,m->fn 是最外面的栈,当它
// 弹出时,说明这个 bthread 已经结束了
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
... // Thread local storage 相关
// 执行完 m->fn 后 group 可能变了
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
{
// 确保 version > 0,防止 tid 为 0,0 不合法
BAIDU_SCOPED_LOCK(m->version_lock);
if (0 == ++*m->version_butex) {
++*m->version_butex;
}
}
butex_wake_except(m->version_butex, 0); // 唤醒,相关逻辑见后面 Butex
g->set_remained(_release_last_context, m); // 注册回调 _release_last_context,回收资源
ending_sched(&g);
} while (g->_cur_meta->tid != g->_main_tid);
// 正常作为 bthread 入口函数执行不会到这里,有点费解,不过 bthread 入口函数不能弹出,
// 只有在 TaskGroup::run_main_task 直接里执行 task_runner(见上文)才会到这里,
// 可以退出,执行其他的 task
// Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
// tasks to run, quit for more tasks.
}
// 这个回调用于回收 bthread 的栈,回收 TaskMeta 对象
void TaskGroup::_release_last_context(void* arg) {
TaskMeta* m = static_cast<TaskMeta*>(arg);
if (m->stack_type() != STACK_TYPE_PTHREAD) {
// 放回 Stack 对象池内
return_stack(m->release_stack()/*may be NULL*/);
} else {
// it's _main_stack, don't return.
m->set_stack(NULL);
}
// 回收 TaskMeta 对象
return_resource(get_slot(m->tid));
}
TaskGroup::ending_sched 实现
void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
// to 2.9%
// LIFO 不公平调度,但是 cache 友好
// XXX:这里是安全的吗?不会和其他 _rq->push 冲突吗?
const bool popped = g->_rq.pop(&next_tid);
#else
// FIFO 公平调度,steal 是并发安全的
const bool popped = g->_rq.steal(&next_tid);
#endif
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}
TaskMeta* const cur_meta = g->_cur_meta; // 准备结束的 bthread meta
TaskMeta* next_meta = address_meta(next_tid); // 下一个 task
if (next_meta->stack == NULL) { // 下一个 task 没初始化,或者是 pthread stack
if (next_meta->stack_type() == cur_meta->stack_type()) {
// 复用当前准备结束的 bthread 的 stack
// 如果是 pthread stask,_main_stack 是空的,复用是个空操作
// Reuse the stack of the current ending task.
// also works with pthread_task scheduling to pthread_task, the
// transfered stack is just _main_stack.
next_meta->set_stack(cur_meta->release_stack());
} else {
// 和上面 sched_to 类似逻辑,分配 task
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack(g->_main_stack);
}
}
}
// 切到下一个 task
sched_to(pg, next_meta);
}
自此,worker 的后台调度逻辑形成了一个闭环,这里总结一下: 下文中 TaskGroup、worker 指代同一个概念,即运行 Task 的主体。
- 每个
TaskGroup对象有两个队列,rq_和remote_rq_,默认情况下均是 FIFO 队列;每个TaskGroup有一个ParkingLot结构,ParkingLot由futex和state构成,用于等待指定state满足;每个TaskGroup有一个TaskMeta指针_cur_meta,表示当前运行的 Task。 - 每个 worker 一个
pthread线程、绑定一个 ThreadLocal 的TaskGroup对象、运行主函数run_main_task,我们分以下四种情况分析,可以覆盖上文代码中所有路径:- 情况 1:当前没有可以运行的 bthread,所有的队列均为空:
wait_task将阻塞在等待ParkingLot状态变化上,所有的 worker 均处于阻塞状态。
- 情况 2:在情况 1 背景下,某个队列里出现了一个可以运行的 Task(空 for 循环 100 次):
- 某个
TaskGroupor 某些TaskGroup的ParkingLot状态变化,被唤醒,最终有一个TaskGroup执行steal_task时得到了这个 Task,其余TaskGroup继续阻塞等待ParkingLot状态发生变化,这个TaskGroup将sched_to到这个 Task 上执行任务。 - 由于这个 Task 首次运行,先分配栈,
TaskGroup的_cur_meta更新为这个 Task,由于这个 Task 的栈和之前_main_task的栈不一样,所以调用jump_stack跳转到这个 Task 的栈上,开始执行栈上的第一个函数task_runner。 task_runner开始执行用户的函数,空 for 循环 100 次,退出回到task_runner,没有唤醒其他 task,注册回收的回调,执行ending_sched。- 由于没有新的 Task,
ending_sched得到下一个 Task 为_main_task,切换过去,回到之前jump_stack之后继续执行,执行之前的回收 Task 回调,回收栈空间和TaskMeta对象,随后回到 worker 的主函数run_main_task继续执行。 - worker 陷入下一轮
wait_task。
- 某个
- 情况 3:在情况 2 背景下,
ending_sched阶段出现了一个新的 Task,并且这个 Task 和之前的 Task 是一种栈类型:ending_sched复用原来的 Task 的栈,将原来的栈指针设置为空(回收回调中则不会回收栈,只会回收TaskMeta对象),进入sched_to。sched_to更新当前TaskGroup的_cur_meta为新的 Task,由于之前的cur_meta的 stack 被新的 Task 复用了,为空,没有执行跳转逻辑,接着执行回收 Task 回调,只清理之前的TaskMeta对象,随后回到task_runner当中。task_runner判断while (g->_cur_meta->tid != g->_main_tid);成立,继续下一轮循环,执行新的 Task 中用户的函数。- 接着逻辑就和上面类似,直到
ending_sched切回_main_task,Task 被回收,回到 worker 主函数陷入下一轮wait_task。
- 情况 4:在情况 2 背景下,给新的 Task 分配栈失败了(OOM):
- 这个 Task 的 stack 改为
_main_stack,进入sched_to,更新当前_cur_meta为该 Task,由于之前的cur_meta也是_main_stack,和新的 Task stack 一样,不进行跳转。 sched_to返回,回到 worker 主函数run_main_task继续执行;判断if (_cur_meta->tid != _main_tid);成立,在 woker 主函数中直接运行task_runner(由于没有新建 bthread 栈,所以不执行 remained 回调)。task_runner执行完用户的函数,注册回收函数,执行ending_sched,和之前一样,新的 task 为_main_task,和当前的 task 栈一样,不进行跳转,执行回收函数不会回收_main_stack,仅回收TaskMeta,随后回到task_runner当中。task_runner判断while (g->_cur_meta->tid != g->_main_tid);不成立,当前 task 为_main_task,退出循环,函数返回,由于task_runner是直接在 worker 中调用的,这里函数返回是安全的,回到 worker 主函数陷入下一轮wait_task。
- 这个 Task 的 stack 改为
- 情况 1:当前没有可以运行的 bthread,所有的队列均为空:
TaskGroup 除了上文的调度实现,还有一些 Task Dispatch/Suspend 逻辑,用户可以使用 bthread_start_urgent 或者 bthread_start_background Dispatch 一个 Task,使用 bthread_usleep 或者 bthread_yield Suspend 一个 Task。
例如 bthread_start_urgent 和 bthread_start_background 的实现逻辑大概如下:
- 获取当前 Thread Local 的
TaskGroup,如果TaskGroup存在,且tag匹配,则调用TaskGroup::start_foreground/TaskGroup::start_background<false>。 a.<false>表示没有跨线程。 - 否则获取全局的
TaskControl,找到tag匹配的TaskGroup调用TaskGroup::start_background<true>。 a.<true>表示跨线程。 b. 额外的,如果attr & BTHREAD_NOSIGNAL,则选择一个全局特殊的tls_task_group_nosignalTaskGroup调用start_background,并且调用完使用bthread_flush()(unstable API)驱动唤醒。这种模式适合 batch dispatch bthread。
start_foreground 和 start_background 的区别在于前者会直接 sched_to 新的 Task,并且要求只能作用于本地的 TaskGroup,不能跨线程;后者为常用的 API,会将 Task 插入到队列末尾,只看后者实现:
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void* (*fn)(void*),
void* __restrict arg) {
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource(&slot);
...
m->fn = fn;
m->arg = arg;
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
...
if (REMOTE) {
ready_to_run_remote(m, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}
void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) {
_remote_rq._mutex.lock(); // _remote_rq 访问需要上锁
while (!_remote_rq.push_locked(meta->tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex); // _remote_rq is full
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
// 累积过去未 signal 的次数
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_rq._mutex.unlock();
_control->signal_task(1 + additional_signal, _tag);
}
}
void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) {
push_rq(meta->tid);
if (nosignal) {
++_num_nosignal;
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_control->signal_task(1 + additional_signal, _tag);
}
}
它们都统一走到了 TaskControl::signal_task 下,这个函数的实现如下:
void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
// TODO(gejun):当前算法并不能保证创建足够多的线程以满足调用方的需求;
// 但另一方面,依据当前实现,也存在大量无谓的唤醒
// 对并发度进行限制,是在调度性能与任务及时性之间取得的良好平衡。
if (num_task > 2) {
num_task = 2; // Magic!
}
auto& pl = tag_pl(tag); // 拿到这个 tag 下的所有 parking-lot,默认一共 4 个
// 新建 Group 的时候是随机取得,这里保证公平性也随机选一个开始
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= pl[start_index].signal(1);
if (num_task > 0) {
// 继续唤醒更多 worker
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= pl[start_index].signal(1);
}
}
if (num_task > 0 && FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// 仍然不够,但发现当初以 FLAGS_bthread_min_concurrency 初始化的 worker 数量,
// 准备加更多的 worker
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1, tag);
}
}
}
另一个常用 API bthread_usleep 的实现逻辑如下:
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
if (0 == timeout_us) {
yield(pg);
return 0;
}
TaskGroup* g = *pg;
// We have to schedule timer after we switched to next bthread otherwise
// the timer may wake up(jump to) current still-running context.
// sched_to 一个 running 的 task 是非法的,必须保证先把当前任务挂起,让
// 下一个任务注册 sleep_event
SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
g->set_remained(_add_sleep_event, &e);
// 在 rq_ 中选择下一个任务/或者 steal 一个切换过去,下一个任务从 sched_to 中的
// jump_stack 处恢复 or 开始执行 task_runner(0),这两个函数都会执行上面注册
// 的回调,添加 sleep_event
sched(pg);
g = *pg; // 从其他 woker 那恢复,pg 可能变了
...
return 0;
}
// yield 就很简单了,挂起当前 Task,下一个 Task 会把当前 Task 放到 rq_ 里,
// 并唤醒 workers 去拿
void TaskGroup::yield(TaskGroup** pg) {
TaskGroup* g = *pg;
ReadyToRunArgs args = {g->tag(), g->_cur_meta, false};
g->set_remained(ready_to_run_in_worker, &args);
sched(pg);
}
void TaskGroup::_add_sleep_event(void* void_args) {
// Must copy SleepArgs. After calling TimerThread::schedule(), previous
// thread may be stolen by a worker immediately and the on-stack SleepArgs
// will be gone.
SleepArgs e = *static_cast<SleepArgs*>(void_args);
TaskGroup* g = e.group;
TimerThread::TaskId sleep_id;
// 放到之前提到的全局 timer pthread 里调度,唤醒后执行回调
// ready_to_run_from_timer_thread
sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, void_args,
butil::microseconds_from_now(e.timeout_us));
... // interrupt 处理
}
static void ready_to_run_from_timer_thread(void* arg) {
CHECK(tls_task_group == NULL); // 当前是在 timer pthread 里,没有 tls task group
const SleepArgs* e = static_cast<const SleepArgs*>(arg);
auto g = e->group;
auto tag = g->tag();
// 找到对应 tag 的 group,推到 remote_rq 里
g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta);
}
ExecutionQueue
ExecutionQueue 提供了一个 FIFO 的执行队列,会按顺序提交 task 到 bthread 中执行。
默认情况下内部使用 bthread_start_background(可以支持在没有 TLS TaskGroup 环境中提交 task),启动 _execute_tasks,迭代 task node 链表。
代码写的有点乱,看不太懂,链表是无锁的,提交 task 过程仅涉及原子操作。支持 high_priority task 插队。
不太一样的是用户的 task hanlder 函数拿到的是一个 task 迭代器,而非某个 task,这对 cache locality 比较友好,也方便用户攒 batch 处理:
int demo_execute(void* meta, TaskIterator<T>& iter) {
if (iter.is_queue_stopped()) {
// destroy meta and related resources
return 0;
}
for (; iter; ++iter) {
// do_something(*iter)
// or do_something(iter->a_member_of_T)
}
return 0;
}
Bthread 提供这个组件带有一个执行器角色,使用更底层的原语,比如在 channel 上包装一层,下面的 golang 也能实现一样的逻辑:
// worker goroutine(执行器)
go func(){
for task := range queue {
doSomething(task)
}
}()
// submit tasks
queue <- task1
queue <- task2
而 bthread 并没有提供 channel(bounded channel)这一种原语,文档里阐述了部分原因(读者认为这限制了一些自由,多个 channel 可以多路复用共享一个执行器,ExecutionQueue 这种绑定方式就无法实现了):
Q:bthread 会有 Channel 吗? 不会。channel 代表的是两点间的关系,而很多现实问题是多点的,这个时候使用 channel 最自然的解决方案就是:有一个角色负责操作某件事情或某个资源,其他线程都通过 channel 向这个角色发号施令。如果我们在程序中设置 N 个角色,让它们各司其职,那么程序就能分类有序地运转下去。所以使用 channel 的潜台词就是把程序划分为不同的角色。channel 固然直观,但是有代价:额外的上下文切换。做成任何事情都得等到被调用处被调度,处理,回复,调用处才能继续。这个再怎么优化,再怎么尊重 cache locality,也是有明显开销的。另外一个现实是:用 channel 的代码也不好写。由于业务一致性的限制,一些资源往往被绑定在一起,所以一个角色很可能身兼数职,但它做一件事情时便无法做另一件事情,而事情又有优先级。各种打断、跳出、继续形成的最终代码异常复杂。 我们需要的往往是 buffered channel,扮演的是队列和有序执行的作用,bthread 提供了 ExecutionQueue,可以完成这个目的。
并发原语
Butex / bthread::Mutex / bthread::Condvar
Butex 相关组件为 bthread 提供了类似 futex 的并发原语,它可以在阻塞的时候挂起当前 bthread,而非阻塞 pthread worker。Butex 这个组件仅供 brpc 内部使用,外面使用 bthread::Mutex,它提供 pthread_mutex 类似的接口。
Butex 的结构如下,字段 value 的指针会作为这个对象的 handle 在各个地方被使用,而非 Butex 指针:
struct Butex {
butil::atomic<int> value;
butil::LinkedList<ButexWaiter> waiters;
FastPthreadMutex waiter_lock;
};
Butex 的 API 也和 futex 一一对应,内部如果操作的是 pthread 则 fallback 到 futex API:
// futex_op:
// FUTEX_WAIT 如果 *uaddr == val,则挂起当前线程,直到被 FUTEX_WAKE 唤醒
// FUTEX_WAKE 唤醒最多 val 个在 uaddr 上等待的线程
// FUTEX_WAIT_BITSET 更灵活的等待(可指定唤醒掩码)
// FUTEX_WAKE_BITSET 按位掩码唤醒特定等待者
// FUTEX_LOCK_PI / FUTEX_UNLOCK_PI 支持优先级继承(Priority Inheritance)的互斥锁
// FUTEX_CMP_REQUEUE 条件性地将等待者从一个 futex 队列移到另一个(用于实现条件变量)
int futex(int* uaddr, int futex_op, int val, const struct timespec* timeout, /* or: uint32_t val2 */ int* uaddr2, int val3);
// butex:
int butex_wake(void* butex, bool nosignal = false);
int butex_wake_n(void* butex, size_t n, bool nosignal = false);
int butex_wake_all(void* butex, bool nosignal = false);
int butex_wake_except(void* butex, bthread_t excluded_bthread);
// Wake up at most 1 thread waiting on |butex1|, let all other threads wait
// on |butex2| instead.
// Returns # of threads woken up.
int butex_requeue(void* butex1, void* butex2);
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime, // Different from FUTEX_WAIT, butex_wait uses absolute time.
bool prepend = false); // If |prepend| is true, queue the bthread at the head of the queue,
和 brpc 很多组件一样,Butex 对象也有个对象池负责缓冲分配和回收,不过这个对象池还有其他作用,这个我们后面会谈到,先看下这三个关键操作 wake, wait, requeue 是如何实现的:
int butex_wake(void* arg, bool nosignal) {
//.. 上锁 Butex::waiter_lock,拿到第一个 waiters: front
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
// 如果有 sleep 任务,则取消调度 sleep,说明 wait->wake 很快
unsleep_if_necessary(bbw, get_global_timer_thread());
// 拿到对应 tag 的 task_group,会随机选中
TaskGroup* g = get_task_group(bbw->control, bbw->tag);
if (g == tls_task_group) {
if (!nosignal) {
// cur_meta 和 next_meta 替换执行,比起唤醒,这种方式调度 next_meta
// 更快
TaskGroup::exchange(&g, next_meta);
} else {
// 提交到本地 rq 里,不进行唤醒 worker
g->ready_to_run(next_meta, true);
}
} else {
// 提交到 remote_rq_ 里
// nosignal = nosignal && tls_task_group ?->tag() == g->tag()
// 跨 tag wakeup 会 signal
g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag()));
}
return 1;
}
// butex_wake_n, butex_wake_all, butex_wake_except 类似,细节上就不展开了
int butex_requeue(void* arg, void* arg2) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);
ButexWaiter* front = NULL;
{
// lck1, lck2 构建,延迟上锁
std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
// 这里比较 lck1 lck2 内存地址,按顺序上锁,避免死锁
butil::double_lock(lck1, lck2);
if (b->waiters.empty()) {
return 0;
}
// 唤醒 front
front = b->waiters.head()->value();
front->RemoveFromList();
front->container.store(NULL, butil::memory_order_relaxed);
// requeue 剩余的
while (!b->waiters.empty()) {
ButexWaiter* bw = b->waiters.head()->value();
bw->RemoveFromList();
m->waiters.Append(bw);
bw->container.store(m, butil::memory_order_relaxed);
}
}
//... 和上面 wake front 一样
}
Wakeup 的实现都比较 naive,wait 的实现就要考虑非常多的 corner case 了,特别是支持 interrupt 和 pthread fallback 的代码片段,总体来说,它符合以下模型:
- 检查
val->load() == expected_value,如果不等于设置errno = EAGAIN,返回-1,调用方需要重新检查。 - TLS TaskGroup 切换到下一个 task。
- 将之前的任务注册进
Butex::waiters,如果有timeout,则schedule一个timertaskwakeup。
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
// Sometimes we may take actions immediately after unmatched butex,
// this fence makes sure that we see changes before changing butex.
butil::atomic_thread_fence(butil::memory_order_acquire);
return -1;
}
TaskGroup* g = tls_task_group;
//...
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
bbw.tid = g->current_tid();
bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task();
bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value;
bbw.initial_butex = b;
bbw.control = g->control();
bbw.abstime = abstime;
bbw.tag = g->tag();
//...
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
WaitForButexArgs args{&bbw, prepend};
// 下一个 task 会执行回调 wait_for_butex 当前的 task
g->set_remained(wait_for_butex, &args);
// 切到下一个 task 执行
TaskGroup::sched(&g);
// 切回来的时候说明条件满足了,检查一下
// erase_from_butex_and_wakeup(called by TimerThread) is possibly still
// running and using bbw. The chance is small, just spin until it's done.
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0, 30/*nops before sched_yield*/);
// If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
// Spin until current_waiter != NULL.
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL,
30/*nops before sched_yield*/);
bool is_interrupted = false;
if (bbw.task_meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
bbw.task_meta->interrupted = false;
is_interrupted = true;
}
// If timed out as well as value unmatched, return ETIMEDOUT.
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT;
return -1;
} else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK;
return -1;
} else if (is_interrupted) {
errno = EINTR;
return -1;
}
return 0;
}
void wait_for_butex(void* arg) {
auto args = static_cast<WaitForButexArgs*>(arg);
ButexBthreadWaiter* const bw = args->bw;
Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
// and removed by TimerThread, in which case we should stop queueing.
//
// Visibility of waiter_state:
// [bthread] [TimerThread]
// waiter_state = TIMED
// tt_lock{ add task}
// tt_lock{ get task}
// waiter_lock{ waiter_state=TIMEDOUT}
// waiter_lock{ use waiter_state}
// tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
// sequenced by two locks, both threads are guaranteed to see the correct
// value.
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
!bw->task_meta->interrupted) {
if (args->prepend) {
b->waiters.Prepend(bw);
} else {
b->waiters.Append(bw);
}
bw->container.store(b, butil::memory_order_relaxed);
if (bw->abstime != NULL) {
bw->sleep_id = get_global_timer_thread()->schedule(
erase_from_butex_and_wakeup, bw, *bw->abstime);
if (!bw->sleep_id) { // TimerThread stopped.
errno = ESTOP;
erase_from_butex_and_wakeup(bw);
}
}
return;
}
}
// 看不懂,但 wait 结束了,可以恢复当前的 current_task 了
// b->container is NULL which makes erase_from_butex_and_wakeup() and
// TaskGroup::interrupt() no-op, there's no race between following code and
// the two functions. The on-stack ButexBthreadWaiter is safe to use and
// bw->waiter_state will not change again.
// unsleep_if_necessary(bw, get_global_timer_thread());
tls_task_group->ready_to_run(bw->task_meta);
// FIXME: jump back to original thread is buggy.
//// Value unmatched or waiter is already woken up by TimerThread, jump
//// back to original bthread.
// TaskGroup* g = tls_task_group;
// ReadyToRunArgs args = {g->current_tid(), false};
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
//// 2: Don't run remained because we're already in a remained function
//// otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}
上面我们埋下了一个伏笔:Butex 对象也有个对象池负责缓冲分配和回收,这个对象池可以用于解决 butex_wake() 与 butex_destroy() 之间的竞态条件,考虑下面的情况:
class Event {
public:
void wait() {
_mutex.lock();
if (!_done) {
_cond.wait(&_mutex);
}
_mutex.unlock();
}
void signal() {
_mutex.lock();
if (!_done) {
_done = true;
_cond.signal();
}
_mutex.unlock(); /*1*/
}
private:
bool _done = false;
Mutex _mutex;
Condition _cond;
};
// Thread 1:
void foo() {
Event event;
pass_to_thread2(&event); // 将 event 传递给 Thread2
event.wait();
} // 函数返回,event 被销毁
// Thread 2:
event.signal();
Thread1 将一个带有状态的条件变量(Event)传递给 Thread2,并等待其被触发,这通常意味着关联任务已完成,Thread1 可安全释放相关资源(包括互斥锁和条件变量)。该逻辑在语义上是正确的。
但存在一个微妙的竞态条件:
在 signal() 中的 /*1*/ 处解锁操作,其底层实现通常包含两个步骤:
locked->store(0); // 原子地释放锁
butex_wake(locked); // 唤醒等待者
在 store(0) 执行后,锁已被释放,此时 wait() 线程可能立即获取锁、完成剩余逻辑、退出 foo() 函数,并销毁 Event 对象(包括其内部的 butex)。
随后执行的 butex_wake(locked) 就会访问已释放的内存,导致程序崩溃。
有两种方法可以解决:
- 引用计数法:在
store(0)前增加引用计数,在butex_wake()后减少引用。 缺点:几乎所有使用butex_wake()的地方都需插入引用管理逻辑,极易出错。 - 永不释放 butex(采用 ObjectPool):内存不归还给系统,而是保留在池中复用。
副作用:
butex_wake()可能唤醒一个无关的、复用了相同内存地址的新 butex,导致虚假唤醒(spurious wakeup)。 但根据观察,上述竞态本身极为罕见,因此额外的虚假唤醒在实践中是可接受的。
Butex 和 Futex 一样,是底层的 API,应用程序通常会使用这个 API 的各种包装,比如 mutex, condvar。
bthread_mutex_t 提供和 pthread_mutex_t 对等的 API,基于 futex 的 API 实现一个 naive mutex 不难,参考下面的算法即可:
- 使用一个整型变量(如
int state)表示锁状态:0:未锁定1:已锁定2:已锁定且有线程在等待(用于避免不必要的futex_wait)
- 加锁流程:
a. 尝试原子地将
state从0改为1(CAS)。 b. 若失败(说明已被占用),则进入慢路径: - 将state设为2(表示有等待者)。 - 调用futex(&state, FUTEX_WAIT, 2, NULL, NULL, 0)阻塞。 c. 循环重试。 - 解锁流程:
a. 将
state从1或2改回0。 b. 如果原值是2(说明有等待者),调用futex(&state, FUTEX_WAKE, 1, NULL, NULL, 0)唤醒一个等待线程。
此外 bthread 提供了一个 contented profiler,可以分析锁竞争,bthread_mutex 对 butex 的 val 是按如下结构解析的:
struct MutexInternal {
butil::static_atomic<unsigned char> locked; // locked->exchange(1)==0 try_lock 上锁成功
butil::static_atomic<unsigned char> contended; // contended==1 表示有竞争
unsigned short padding;
};
const MutexInternal MUTEX_CONTENDED_RAW = {{1}, {1}, 0};
const MutexInternal MUTEX_LOCKED_RAW = {{1}, {0}, 0};
// “常量不能放入只读段”这一理由在现代 C++ 中基本不成立。可能是出于历史原因或对某些嵌入式/旧编译器的顾虑。
// 在 C++ 中,不同编译单元(translation units)之间的全局/静态变量的初始化顺序是未定义的。
// 如果一个全局 const 变量依赖于另一个编译单元中的全局变量,就可能在对方尚未初始化时被使用,导致未定义行为。
#define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)
struct bthread_mutex_t {
uint32_t* butex; // uint32_t represented by MutexInternal
bthread_contention_site_t csite; // int64_t duration_ns; + size_t sampling_range; 用于锁竞争分析
bool enable_csite;
mutex_owner_t owner; // debug 使用,可用于检测 double lock
};
mutex_lock_contended_impl 的实现如下:
inline int mutex_lock_contended_impl(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
BTHREAD_MUTEX_CHECK_OWNER;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
// g == NULL:如果不自旋,只能阻塞整个 pthread,代价高,说不定锁马上就能释放,转一会
// g != NULL && g->rq_size() == 0:既然没别的事可做,不如自旋一下,可能立刻拿到锁继续执行,延迟更低。
for (int i = 0; i < MAX_SPIN_ITER; ++i) {
cpu_relax();
}
}
bool queue_lifo = false;
bool first_wait = true;
auto whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
errno != EWOULDBLOCK && errno != EINTR /*note*/) {
// EWOULDBLOCK 和 EINTR 通常会被忽略,不会返回给用户
return errno;
}
if (first_wait && 0 == errno) {
// 返回 EWOULDBLOCK 和 EINTR 说明根本没有 wait
first_wait = false;
}
if (!first_wait) {
// 通常情况下,bthread 会以 FIFO(先进先出)顺序排队。但在竞争互斥锁所有权时,
// 一个被唤醒的 bthread 很可能会输给新到达的 bthread。这是因为新到达的 bthread
// 已经在 CPU 上运行,而且数量可能很多。在这种情况下,为了保证公平性、避免饥饿,
// 被唤醒的 bthread 会被插入到等待队列的头部。
queue_lifo = true;
}
}
BTHREAD_MUTEX_SET_OWNER;
return 0;
}
Bthread 中的 Condvar 实现使用到了 butex_requeue 这个 API。
struct CondInternal {
butil::atomic<bthread_mutex_t*> m; // cv 关联的那把锁,初始化时是 NULL
butil::atomic<int>* seq; // notify 时会往上递增,进行唤醒
};
bthread_cond_wait 的实现如下,十分简单:
int bthread_cond_wait(bthread_cond_t* __restrict c,
bthread_mutex_t* __restrict m) {
bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
// 一定要在上锁的时候读取 expected_seq,否则其他 bthread 修改条件+notify 时
// 可能会丢失唤醒,释放锁后读取到的 seq 可能是 notify 写入的新值
const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
if (ic->m.load(butil::memory_order_relaxed) != m) {
// bind m to c
bthread_mutex_t* expected_m = NULL;
if (!ic->m.compare_exchange_strong(expected_m, m, butil::memory_order_relaxed)) {
return EINVAL;
}
}
// 这里释放锁
bthread_mutex_unlock(m);
int rc1 = 0;
if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR /*note*/) {
rc1 = errno;
}
const int rc2 = bthread_mutex_lock_contended(m);
return (rc2 ? rc2 : rc1);
}
bthread_cond_broadcast 利用了 butex_requeue API,唤醒一个 bthread 拿到锁,剩余的 bthread 移动到 mutex 上的 butex,当这个 bthread 处理完/陷入等待释放锁的时候,会唤醒剩余所有的绑定在这个 butex 上的 bthread,可以缓解惊群的问题,至少让一个 bthread 无阻碍的完成工作。
int bthread_cond_broadcast(bthread_cond_t* c) {
bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
butil::atomic<int>* const saved_seq = ic->seq;
if (!m) {
return 0;
}
void* const saved_butex = m->butex;
// Wakeup one thread and requeue the rest on the mutex.
ic->seq->fetch_add(1, butil::memory_order_release);
bthread::butex_requeue(saved_seq, saved_butex);
return 0;
}
此外,bthread 还提供了信号量和读写锁的实现,不过使用的频率要比上面这两个原语要低,读写锁在一定程度上使用 DoublyBufferedData 更为高效,就不展开讨论了。
BThread-Local Storage
bthread_local 提供的一种 bthread 局部存储机制,类似 pthread_key_t 的 API,如下所示:
extern int bthread_key_create(bthread_key_t* key, void (*destructor)(void* data));
extern int bthread_key_delete(bthread_key_t key);
// 注意:如果当前线程不是由 brpc 服务器创建的,并且生命周期非常短(仅执行少量任务后立即退出),
// 应避免使用 bthread-local 存储。原因是在首次调用 bthread_setspecific 时,
// bthread-local 总会分配一个 keytable;该开销在长生命周期线程中可忽略不计,
// 但在短生命周期线程中则较为明显。brpc 服务器中的线程是特例,
// 因为它们会从服务器内的 bthread_keytable_pool_t 中复用 keytable。
extern int bthread_setspecific(bthread_key_t key, void* data);
extern void* bthread_getspecific(bthread_key_t key);
其核心是一个叫 KeyTable 结构,两层 Array 表。
- 第一级(KeyTable):包含
KEY_1STLEVEL_SIZE个指针,每个指向一个SubKeyTable。 - 第二级(SubKeyTable):每个包含
KEY_2NDLEVEL_SIZE个实际存储槽(Data 结构,含 ptr 和 version)。
这种设计避免了为每个 key 分配独立内存的开销,同时在 key 稀疏使用时节省空间(只按需分配 SubKeyTable)。
在 sched_to 中,切换 bthread 之前会切换 tls_bls,这个在上文已经介绍,在 task_runner 中,用户 bthread 结束时会清理 tls_bls:
void TaskGroup::task_runner(intptr_t skip_remained) {
...
do {
..
void* thread_return;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
// 用户程序已经结束
// 清理 TLS(线程局部存储)变量,必须在修改 version_butex 之前完成,
// 否则,另一个刚刚 join 该 bthread 的 bthread 可能无法观察到 TLS 变量析构所产生的副作用。
LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
KeyTable* kt = tls_bls_ptr->keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
// After deletion: tls may be set during deletion.
tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
tls_bls_ptr->keytable = NULL;
m->local_storage.keytable = NULL; // optional
}
...
butex_wake_except(m->version_butex, 0);
} while (g->_cur_meta->tid != g->_main_tid);
...
}
后记
bRPC 是一个通用的 C++ RPC 框架,它的目标并非将某个 ad hoc 的 RPC 需求优化到极致,如果你愿意,完全可以用 pthread 手动管理 epoll 的 fd,对每个请求的执行路径做到极致掌控。brpc 真正解决的是在复杂、多变、高负载的实际生产场景中,如何让 RPC 通信整体表现稳定、高效且可预测,从而让 RPC 层本身不太会成为系统性能的瓶颈。
从今天回看,brpc 已是一个近十年历史的项目,但其技术理念依然先进,尤其是 bthread 相关的代码,经过长期打磨已臻成熟。它称得上是真正意义上的“工业级” RPC 框架,不仅稳定高效,还切实支撑起大量实际业务,甚至可以说养活了不少人。
近年来,work-stealing 模型与 run-to-completion 模型常被拿来比较。这两种调度模型并无优劣之分,关键在于是否契合具体的系统负载特征。在未明确负载模式、请求粒度、并发规模等上下文之前,泛泛而谈哪种模型更优,往往缺乏实际意义。bRPC 使用 work-stealing 模型,并在较新的版本支持 tag group 功能,这使得 work-stealing 边界可控,足够生产使用。
类似地,polling 与 event-driven / completion-driven 模型之间的对比也应如此看待。polling 在高吞吐、低延迟场景中更高效,它并非总是有效,而且它更费电,一点也不环保,部署方案也变得复杂