一根棍子(epoll)+ 一堆续体 + 线程池 + 定时器 = 一台完整的协程引擎
| # | 零件 | 作用 |
|---|---|---|
| ① | epoll(棍子) | OS 帮你监听所有网络连接,有事件才通知 |
| ② | 续体(状态机) | 每个协程是一小段可暂停的函数,活在堆上 |
| ③ | 定时器队列 | 按时间排序的协程列表,"delay 到了叫醒我" |
| ④ | 线程池 | 几个 OS 线程,负责实际执行协程代码 |
| ⑤ | Dispatcher | 决定哪个协程交给哪个线程池 |
| ⑥ | 作用域 | 父子关系,取消传播 |
// 协程A: 网络请求
launch { val data = httpGet("https://api.example.com/user"); cache(data) }
// 协程B: 定时等待
launch { delay(2000) }
// 协程C: CPU 计算
launch { heavyComputation() }
// 这就是那根棍子!线程在这等,不是轮询
epoll_wait(
epoll_fd, // epoll 实例
events, // 输出:哪些 fd 有事件了
timeout = 到最近定时器的时间差 // ← 关键!不是永久等
);
// timeout 计算:
// 定时器队列最早到期:协程B = 10:00:02.000
// 现在:10:00:00.000
// timeout = 2000ms
// 同时等三件事:
// ① socket fd_A 有数据了 → 立刻醒
// ② 等了 2000ms 还没数据 → 也醒了(定时器到期)
// ③ 新协程被 launch → eventfd 通知 → 醒了
epoll_wait 的 timeout 是动态计算的——每次取值都看定时器队列里最早到期的那个还剩多久。网络事件到来也会提前返回,不是傻等到 timeout 结束。
| 时间 | 线程在干嘛 | epoll | 定时器 | 调度队列 |
|---|---|---|---|---|
| 0ms | 跑协程A:httpGet → 挂起 → fd_A 注册 epoll | {fd_A→A} | 空 | [A,B,C]→[B,C] |
| 0ms | 跑协程B:delay(2000) → 续体B 进定时器 | {fd_A→A} | {(B,2000)} | [C] |
| 0ms | 跑协程C:CPU计算 → 完成 | {fd_A→A} | {(B,2000)} | [] |
| 0-1000ms | 队列空,睡!epoll_wait(timeout=2000) | {fd_A→A} | {(B,2000)} | [] |
| 1000ms | fd_A 有数据,被唤醒!续体A 进队列 | {fd_A→A} | {(B,1000)} | [A] |
| 1000ms | 跑协程A(从 httpGet 后继续):cache(data) → 完成 | {fd_A→A} | {(B,1000)} | [] |
| 1000-2000ms | 队列空,睡!epoll_wait(timeout=1000) | {fd_A→A} | {(B,1000)} | [] |
| 2000ms | timeout 到期,续体B 进队列 | {fd_A→A} | 空 | [B] |
| 2000ms | 跑协程B:println("2秒到了") → 完成 | {fd_A→A} | 空 | [] |
| 2000ms+ | 全空,永久睡。epoll_wait(timeout=∞) 等新协程 | {fd_A→A} | 空 | [] |
整个过程中,只有1 个线程,0 次轮询,0 次 CPU 空转。
# 协程 Runtime 的核心循环(伪代码)
ready_queue = [] # 就绪协程
timer_queue = PriorityQueue() # 定时器队列,按到期时间排序
epoll_fd = epoll_create()
while True:
# 第 1 步:处理就绪协程
while ready_queue:
coroutine = ready_queue.pop(0)
result = coroutine.resume() # 执行协程,直到它挂起
if result == SUSPENDED_ON_FD:
# 协程在等 socket → 注册到 epoll
epoll_ctl(epoll_fd, ADD, coroutine.fd, coroutine)
elif result == SUSPENDED_ON_TIMER:
# 协程在等定时器 → 塞进定时器队列
timer_queue.push(coroutine, wakeup_time)
elif result == COMPLETED:
pass # 结束了,啥也不用干
# 第 2 步:没就绪协程了,调 epoll_wait 等
next_timer = timer_queue.peek() # 最近的定时器
if next_timer is None:
timeout = -1 # 没定时器 → 永久等
else:
timeout = max(0, next_timer.wakeup_time - now())
events = epoll_wait(epoll_fd, timeout=timeout)
# 第 3 步:醒来,三种可能被唤醒的原因
# ① socket 有数据了
for event in events:
ready_queue.append(event.bound_coroutine)
# ② 定时器到了
while timer_queue and timer_queue.peek().wakeup_time <= now():
ready_queue.append(timer_queue.pop())
# ③ epoll_wait 被新 launch 的协程通过 eventfd 唤醒
# (eventfd 也是 epoll 注册的一个 fd)
# 处理同 ①,循环回到第 1 步
# 回到循环开头,继续处理就绪队列
一个协程引擎 = while True 循环:1.拿就绪协程→跑→挂起了注册到 epoll 或定时器队列;2.队列空了→epoll_wait(timeout=最近定时器剩余时间)→线程睡觉;3.醒了→把唤醒的协程塞回就绪队列→回到 1。整个引擎就一个循环,几个线程,一堆堆上的续体对象,一根棍子(epoll)+ 一个闹钟队列,撑起百万并发。