ModelRunner
定义了一个名为 ModelRunner 的核心类,用于在 nanovllm 框架下高效地加载和运行 Qwen3 大语言模型,其主要作用是构建一个支持多 GPU 张量并行(Tensor Parallelism)的高性能推理引擎。
该类首先负责初始化分布式计算环境,通过 PyTorch 的 NCCL 后端建立进程组,利用共享内存(SharedMemory)和事件(Event)机制在不同 GPU 进程(Rank)之间实现指令同步与数据通信,确保主进程(Rank 0)能协调所有工作进程协同执行模型推理。在内存管理方面,它实现了显存的精细化控制,通过 allocate_kv_cache 方法根据剩余显存自动计算并预分配巨大的 Key-Value 缓存(KV Cache),并将这些物理显存块映射到模型的各个层中,配合 PagedAttention 机制(体现在 prepare_block_tables 中)来高效管理长序列生成的显存占用。
在推理执行层面,代码区分了“预填充”(Prefill)和“解码”(Decode)两个阶段,prepare_prefill 和 prepare_decode 方法分别负责将逻辑上的序列数据转换为模型所需的扁平化张量、位置编码以及块表映射。为了极致的推理速度,特别是在小 Batch 大小的解码阶段,该类引入了 CUDA Graph 技术,通过 capture_cudagraph 方法预先录制不同 Batch Size 下的计算图,并在 run_model 中根据情况选择直接回放(Replay)计算图以消除 CPU 调度开销,或者在预填充阶段使用传统的 Eager 模式执行。最后,run 方法将上述所有步骤串联,完成从输入准备、模型前向传播到采样(Sampler)输出 Token 的完整闭环。
def __init__(self, config: Config, rank: int, event: Event | list[Event]):
self.config = config
hf_config = config.hf_config
self.block_size = config.kvcache_block_size
self.enforce_eager = config.enforce_eager
self.world_size = config.tensor_parallel_size
self.rank = rank
self.event = event
dist.init_process_group("nccl", "tcp://localhost:2333", world_size=self.world_size, rank=rank)
torch.cuda.set_device(rank)
default_dtype = torch.get_default_dtype()
torch.set_default_dtype(hf_config.torch_dtype)
torch.set_default_device("cuda")
self.model = Qwen3ForCausalLM(hf_config)
load_model(self.model, config.model)
self.sampler = Sampler()
self.warmup_model()
self.allocate_kv_cache()
if not self.enforce_eager:
self.capture_cudagraph()
torch.set_default_device("cpu")
torch.set_default_dtype(default_dtype)
if self.world_size > 1:
if rank == 0:
self.shm = SharedMemory(name="nanovllm", create=True, size=2**20)
dist.barrier()
else:
dist.barrier()
self.shm = SharedMemory(name="nanovllm")
self.loop()
对于每个GPU都要初始化一个ModelRunner,参数是全局的config参数、该GPU分配到的rank,以及event
self.config = config
hf_config = config.hf_config
self.block_size = config.kvcache_block_size # 一个block块的大小
self.enforce_eager = config.enforce_eager
self.world_size = config.tensor_parallel_size # 张量并行的数量
self.rank = rank # 定义rank,即当前ModelRunner是第几个GPU创建的
self.event = event
这一段是初始化的代码,作用更多的是方便后面使用里面的参数
dist.init_process_group("nccl", "tcp://localhost:2333", world_size=self.world_size, rank=rank)
torch.cuda.set_device(rank)
这两行代码是分布式训练/推理的启动仪式。它们负责建立多 GPU 之间的通信连接,并将当前的 Python 进程绑定到指定的 GPU 硬件上。
-
dist.init_process_group("nccl", "tcp://localhost:2333", world_size=self.world_size, rank=rank)初始化分布式进程组,指定通信后端为NCCL,是 NVIDIA 专门为 GPU 设计的高性能通信库,握手地址为
tcp://localhost:2333,其中tcp表示使用TCP/IP进行初次连接,localhost表示单机多卡,2333是端口号,world_size是总进程数,即总GPU数。 -
torch.cuda.set_device(rank),绑定当前进程到特定的 GPU
default_dtype = torch.get_default_dtype()
torch.set_default_dtype(hf_config.torch_dtype)
torch.set_default_device("cuda")
这三行代码通过修改 PyTorch 的全局上下文设置,为即将进行的模型加载和初始化构建了一个临时的“自动化”环境。先保存一下当前 PyTorch 默认的浮点数类型,通常是float32,这是一种保护现场的机制,以便在模型加载结束后能将环境恢复原状;紧接着,它将全局默认的数据类型更改为模型配置中指定的格式(hf_config.torch_dtype),这通常是 float16 或 bfloat16,目的是让后续创建的所有模型参数张量自动采用半精度格式,从而大幅减少显存占用并加速计算;最后,它将默认的张量创建设备设置为 GPU (cuda),这意味着接下来的代码在初始化张量时,如果没有显式指定设备,都会直接在显存中分配内存,避免了先在 CPU 创建再搬运到 GPU 的冗余开销。
self.model = Qwen3ForCausalLM(hf_config)
load_model(self.model, config.model)
self.sampler = Sampler()
这三行代码按顺序完成了大模型推理引擎的核心构建过程,即“搭建骨架”、“注入灵魂”和“准备决策”。首先,代码利用 hf_config 中的结构参数(如层数、维度等)实例化了 Qwen3ForCausalLM 对象,这相当于在显存中构建了神经网络的物理架构和张量占位符,但此时模型内部还没有具体的训练数据;紧接着,load_model 函数根据配置文件中的路径,将磁盘上的预训练权重(weights)读取并填充到刚才创建的模型结构中,正式赋予模型处理语言的能力;最后,代码初始化了一个 Sampler(采样器)实例,该组件独立于繁重的模型计算之外,专门负责在模型输出预测概率(Logits)后,根据温度等参数从候选列表中“挑选”出最终的下一个 Token,从而完成从数学概率到具体文本生成的最后一步。
warmup_model()
self.warmup_model()
def warmup_model(self):
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
max_num_batched_tokens, max_model_len = self.config.max_num_batched_tokens, self.config.max_model_len
num_seqs = min(max_num_batched_tokens // max_model_len, self.config.max_num_seqs)
seqs = [Sequence([0] * max_model_len) for _ in range(num_seqs)]
self.run(seqs, True)
torch.cuda.empty_cache()
warmup_model 方法的核心目的是通过模拟一次极高负载的推理过程,来预热 GPU 并清理显存碎片,为后续正式的显存分配做准备。
torch.cuda.empty_cache()和torch.cuda.reset_peak_memory_stats()彻底清空当前未使用的显存缓存并重置统计数据,确保后续测量的显存峰值是准确的净值- 接着取出最大批处理的总 Token 数
max_num_batched_tokens和模型支持的最大序列长度max_model_len,通过这两个数据计算出来最大并发序列数num_seqs - 造出一个极限数据,即
num_seqs * max_model_len的极限批序列数据,扔到run()函数里面进行prefill,通过这种极限测试来,迫使 PyTorch 动态分配并初始化所有必要的内部缓冲区和 CUDA 上下文 - 最后执行
empty_cache(),释放掉刚才预热产生的大量临时张量,此时留下的显存峰值统计数据(peak memory stats)将反映模型运行时的真实最大开销,为后续allocate_kv_cache方法精确计算剩余可用显存提供了关键依据。
allocate_kv_cache()
self.allocate_kv_cache()
def allocate_kv_cache(self):
config = self.config
hf_config = config.hf_config
free, total = torch.cuda.mem_get_info()
used = total - free
peak = torch.cuda.memory_stats()["allocated_bytes.all.peak"]
current = torch.cuda.memory_stats()["allocated_bytes.all.current"]
num_kv_heads = hf_config.num_key_value_heads // self.world_size
head_dim = getattr(hf_config, "head_dim", hf_config.hidden_size // hf_config.num_attention_heads)
block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * num_kv_heads * head_dim * hf_config.torch_dtype.itemsize
config.num_kvcache_blocks = int(total * config.gpu_memory_utilization - used - peak + current) // block_bytes
assert config.num_kvcache_blocks > 0
self.kv_cache = torch.empty(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, num_kv_heads, head_dim)
layer_id = 0
for module in self.model.modules():
if hasattr(module, "k_cache") and hasattr(module, "v_cache"):
module.k_cache = self.kv_cache[0, layer_id]
module.v_cache = self.kv_cache[1, layer_id]
layer_id += 1
这段代码是实现大模型高效推理的关键步骤,主要作用是一次性计算并预分配 GPU 显存中的 Key-Value (KV) Cache,从而避免推理过程中频繁申请显存带来的性能开销和碎片化问题。
-
我们先通过
torch.cuda.mem_get_info()获取当前GPU的空闲内存和总内存,可以计算出来使用了多少内存 -
通过
torch.cuda.memory_stats()["allocated_bytes.all.peak"]可以获得刚刚压力测试过程中的显存占用的峰值,通过torch.cuda.memory_stats()["allocated_bytes.all.current"]可以获得当前显存大小 -
为了计算需要申请的block的数量,我们需要先计算可以用来分配给block的显存总量以及单个block的显存占用,二者做除法,即可得到可申请的block 的数量
-
我们用
total * config.gpu_memory_utilization - used - (peak - current),作为系统内可以分配给 block 的显存的总数,后面减去(peak-current)是为了预留动态显存空间,防止模型在推理过程中因为显存爆满而崩溃 -
而单个block的内存占用的计算方法是
2 * hf_config.num_hidden_layers * self.block_size * num_kv_heads * head_dim * hf_config.torch_dtype.itemsize- 2是因为kv cache需要保存k和v的值,
hf_config.num_hidden_layers是模型的层数,每个层都有注意力机制,都需要存kv cache block_size是每个block块的大小num_kv_heads是注意力头的大小,由于进行了张量并行,注意力头会被切分并分配到不同的显卡上并行计算,所以要通过hf_config.num_key_value_heads // self.world_size来计算得到单个卡上的注意力头的数量head_dim是每个头的维度hf_config.torch_dtype.itemsize是每一个数值在内存中实际占用的字节数
- 2是因为kv cache需要保存k和v的值,
-
有了block的数量以后,我们就可以通过
torch.empty()来申请kv cache的空间。kv_cache表面上看是一个num_kvcache_blocks * block_size的矩阵,但其实他是一个六维矩阵,形状是(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, num_kv_heads, head_dim),顺序和上面计算单个block的内存占用的计算方法是一样的,只是在第三维插入了num_kvcache_blocks,第一维是KV的分片,第二维是模型的层数,block的数量,第三维是block的数量,第四维是block的大小,第五维是head的数量,第六维是每个head的维度 -
然后,我们通过遍历整个模型树,利用
hasattr检查哪些层(通常是每一层的 Self-Attention 模块)定义了k_cache和v_cache属性。一旦定位到目标层,它就会根据当前层的索引layer_id,从那块巨大的六维张量中提取出对应的切片——即self.kv_cache[0, layer_id]分配给键缓存,self.kv_cache[1, layer_id]分配给值缓存。这种操作本质上是 Python 对象的指针引用赋值,并没有发生实际的数据拷贝,因此执行速度极快。通过这种方式,原本分散在模型各层的注意力计算逻辑现在都指向了同一块预先管理好的显存池,确保了在推理过程中,模型可以直接在这些预定义的显存位置上读写 KV 缓存数据。
-
torch.set_default_device("cpu")
torch.set_default_dtype(default_dtype)
恢复环境PyTorch 默认的浮点数类型,以及重设设备为cpu
capture_cudagraph()
if not self.enforce_eager:
self.capture_cudagraph()
@torch.inference_mode()
def capture_cudagraph(self):
config = self.config
hf_config = config.hf_config
max_bs = min(self.config.max_num_seqs, 512)
max_num_blocks = (config.max_model_len + self.block_size - 1) // self.block_size
input_ids = torch.zeros(max_bs, dtype=torch.int64)
positions = torch.zeros(max_bs, dtype=torch.int64)
slot_mapping = torch.zeros(max_bs, dtype=torch.int32)
context_lens = torch.zeros(max_bs, dtype=torch.int32)
block_tables = torch.zeros(max_bs, max_num_blocks, dtype=torch.int32)
outputs = torch.zeros(max_bs, hf_config.hidden_size)
self.graph_bs = [1, 2, 4, 8] + list(range(16, max_bs + 1, 16))
self.graphs = {}
self.graph_pool = None
for bs in reversed(self.graph_bs):
graph = torch.cuda.CUDAGraph()
set_context(False, slot_mapping=slot_mapping[:bs], context_lens=context_lens[:bs], block_tables=block_tables[:bs])
outputs[:bs] = self.model(input_ids[:bs], positions[:bs]) # warmup
with torch.cuda.graph(graph, self.graph_pool):
outputs[:bs] = self.model(input_ids[:bs], positions[:bs]) # capture
if self.graph_pool is None:
self.graph_pool = graph.pool()
self.graphs[bs] = graph
torch.cuda.synchronize()
reset_context()
self.graph_vars = dict(
input_ids=input_ids,
positions=positions,
slot_mapping=slot_mapping,
context_lens=context_lens,
block_tables=block_tables,
outputs=outputs,
)
为什么需要 CUDA Graph?
在 LLM 推理等小算子、高频次的场景中,CPU 逐个调度任务的开销往往比 GPU 实际计算的时间还要长,导致 GPU 大量空闲等待;CUDA Graph 通过将一系列 GPU 操作“录制”为静态图,在执行时只需一次 CPU 指令即可驱动整个计算流程,从而彻底消除 CPU 调度瓶颈,填满 GPU 流水线,显著降低推理延迟。
下面先初始化一些变量
@torch.inference_mode() # 禁用梯度计算,节省显存并加速
def capture_cudagraph(self):
config = self.config
hf_config = config.hf_config
# 设定最大 Batch Size,为了安全起见限制在 512 以内
max_bs = min(self.config.max_num_seqs, 512)
# 计算单个序列 KV Cache 所需的最大 block 数量
max_num_blocks = (config.max_model_len + self.block_size - 1) // self.block_size
接下来是关键的一步:CUDA Graph 要求输入和输出的内存地址是固定的。因此,代码预先分配了一组全零的张量(Tensors)作为“静态缓冲区”。
# 分配静态输入/输出 Tensor,它们将驻留在 GPU 上
input_ids = torch.zeros(max_bs, dtype=torch.int64)
positions = torch.zeros(max_bs, dtype=torch.int64)
slot_mapping = torch.zeros(max_bs, dtype=torch.int32)
context_lens = torch.zeros(max_bs, dtype=torch.int32)
block_tables = torch.zeros(max_bs, max_num_blocks, dtype=torch.int32)
outputs = torch.zeros(max_bs, hf_config.hidden_size)
torch.zeros(),PyTorch 会在内存中申请一块空间,并明确地将所有位置都填充为 0。
以后在推理时,我们不能直接传新 Tensor 给模型,而是必须把数据 copy_ 到这些 input_ids 等静态 Tensor 中,然后重放 Graph。
假设 max_bs 是最大批次大小,max_num_blocks 是为每个序列分配的最大内存块数量。
为什么
input_ids和positions是int64?而slot_mapping,context_lens,block_tables是int32?
因为PyTorch 的核心层 nn.Embedding 以及很多官方算子,强制要求输入的索引张量必须是 LongTensor (即 int64)。而另外三个不是给 PyTorch 标准层用的,而是传给 vLLM 自定义的 CUDA Kernel(例如 PagedAttention 算子)用的,这些元数据在推理过程中会被高频读取,使用int32相比于int64可以节省一半的显存读取,节省带宽。而且int32大概是2e9,显存再大,物理块的数量也不可能超过 21 亿,context_lens和block_tables也是一样。
| 变量名 | 形状 (Shape) | 维度含义解析 | 作用 |
|---|---|---|---|
input_ids |
(max_bs,) |
输入 Token ID。每一行代表当前 Batch 中每个序列正在处理的那个 Token。 | 输入token |
positions |
(max_bs,) |
位置索引。对应每个 Token 在其原始序列中的绝对位置(用于 Position Embedding)。 | 它决定了 RoPE(旋转位置编码) 的旋转角度。只有知道了位置,模型才能分清“我喜欢你”和“你喜欢我”的区别。 |
slot_mapping |
(max_bs,) |
槽位映射。指示当前 Token 应该存储在物理 KV Cache 内存池中的哪个具体位置。 | 表示每个 token 在 KV Cache 池中的物理存储位置(slot 索引),用于 CUDA kernel 将新计算的 K/V 写入正确的缓存位置。 |
context_lens |
(max_bs,) |
有效长度。记录每个序列到目前为止总共拥有多少个有效的 Token | 它是推理时的“边界守护者”,告诉模型在计算注意力时,应该回溯查看多少长度的 KV Cache。 |
block_tables |
(max_bs, max_num_blocks) |
物理块表。每一行映射了一个序列所占用的所有不连续内存块的编号。 | 显存页表,由于模型在计算 Attention 时,它需要回头看过去所有的历史记录(Key/Value),所以需要一个页表记录每个序列的kv cache存在哪些块里。 |
outputs |
(max_bs, hidden_size) |
隐层输出。保存模型最后一层计算出的向量,准备进行 Logits 映射。 | 输出token |
接下来定义 Batch Size 策略
# 定义需要捕获的 Batch Size 列表
# 小 BS:1, 2, 4, 8 (不仅是为了速度,也是为了精确匹配)
# 大 BS:16, 32, 48... 直到 max_bs (以 16 为步长)
self.graph_bs = [1, 2, 4, 8] + list(range(16, max_bs + 1, 16))
self.graphs = {}
self.graph_pool = None # 用于共享内存池
为什么要分这么多size?
我们不可能为 1 到 512 的每个 batch size 都录制一个图(那样显存会爆炸)。通常的做法是“向上取整”:如果你来了 12 个请求,就用 batch size = 16 的图来跑,多余的槽位填 padding。
接下来为不同 batch size 预先录制 CUDA Graph,从而在后续推理时直接 replay 图,避免 Python 调度和 CUDA kernel launch 的开销
for bs in reversed(self.graph_bs):
graph = torch.cuda.CUDAGraph() # 这是一个录制器,用来记录接下来所有的 CUDA kernel 调用
# 1. 设置上下文 (Set Context)
# 这通常用于 PagedAttention 等自定义算子,告诉它们当前只处理前 bs 个数据
set_context(False, slot_mapping=slot_mapping[:bs], context_lens=context_lens[:bs], block_tables=block_tables[:bs])
# 2. Warmup (预热)
# 在录制前必须先运行一次。这是为了让 PyTorch/CUDA 内部完成 lazy initialization(懒加载),
# 确保所有显存分配和 buffer 初始化都已完成,避免录制到内存分配操作。
outputs[:bs] = self.model(input_ids[:bs], positions[:bs])
# 3. 开启录制 (Capture)
# self.graph_pool 用于在不同的 Graph 之间共享显存,极大节省内存占用。
with torch.cuda.graph(graph, self.graph_pool):
outputs[:bs] = self.model(input_ids[:bs], positions[:bs])
# 4. 内存池管理
# 如果是第一次循环(最大的 Batch Size),获取其内存池,供后续较小的 Graph 复用。
if self.graph_pool is None:
self.graph_pool = graph.pool()
# 5. 保存图并清理
self.graphs[bs] = graph
torch.cuda.synchronize() # 等待 GPU 完成
reset_context() # 清理上下文
为什么要倒序遍历batch size,即从大到小?
因为 CUDA Graph capture 过程中会分配 GPU 内存,如果从大到小录制,可以:先为最大 bs 分配最大需要的内存,后面小 bs 可以复用 graph memory pool,可以减少碎片和额外分配
为什么需要内存池?为什么需要共享的内存池?
在大模型的一趟前向传播中,每一层都会产生大量的中间临时变量。在普通模式下,PyTorch 会在算到这一层时,临时向系统申请一块显存(cudaMalloc)来装这些中间变量,算完再释放。 但是,动态申请显存非常耗时。CUDA Graph 为了追求极致的速度,严禁在运行期间进行任何动态内存分配。 因此,在录制 Graph 之前,系统必须提前一次性申请好一块足够大的连续显存作为“专属草稿纸”。这块提前划定好的草稿纸,就是内存池。有了它,Graph 才能把所有中间变量的地址当成常量“死死地刻在”指令里。
因为 CUDA Graph 会将所需地址永久锁定,如果不共享内存池,录制多个不同 Batch Size 的图会导致每一张图都霸占一份专属的临时显存,最终令显存占用随图的数量成倍爆炸;而引入共享内存池(graph_pool)后,系统只需按照最大尺寸的图申请唯一一块显存,巧妙利用大模型推理任务严格串行执行且中间激活值“朝生夕死”的特性,让所有不同尺寸的图轮流在这张相同的“临时草稿纸”上安全地覆盖擦写,从而在维持硬件级极速调度的同时,将底层的显存开销压缩到了绝对的最低极限。
内存池里究竟放什么东西, 不放什么东西?
在一次推理过程中,我们会产生一些临时的tensor,比如
Q = linear(x)
K = linear(x)
V = linear(x)
attn_out = softmax(QK^T)V
mlp_out = ...
residual = ...
这些中间 tensor 的生命周期只在一次 forward 内,下次 forward 可以覆盖,且不需要跨 step 保存,我们会把他放在memory pool中。
而对于KV cache、input_ids、slot_mapping、position_ids、block_tables、模型参数等,是不会存在graph memory pool的
我们的GPU显存可以抽象成这样:
┌────────────────────────────┐
│ 模型权重 (长期存在) │
├────────────────────────────┤
│ KV Cache (长期存在) │
├────────────────────────────┤
│ slot_mapping 等输入张量 │
├────────────────────────────┤
│ Graph Pool (长期存在) │ ← 只放中间 tensor
│ Q tensor │
│ K tensor │
│ V tensor │
│ attn_out │
│ mlp_out │
│ ... │
└────────────────────────────┘
graph 为什么要固定地址?
因为“固定地址”是让 GPU 能够“脱离 CPU 独立全速运行”的唯一代价。
CUDA Graph 必须固定显存地址,是为了彻底消除动态内存分配(cudaMalloc)的高昂开销,并实现 GPU 的纯硬件级自主调度。在 Graph 的录制阶段,底层驱动会将所有算子的输入、输出及中间变量的显存指针直接作为常量“硬编码”到预编译的执行蓝图中。只有将这些“数据交互的仓库位置”完全固定死,GPU 在实际重放时才能彻底脱离 CPU 的干预,像自动化流水线一样:前一个算子算完直接塞进固定地址,后一个算子毫无延迟地从该地址读取,从而实现微秒级的无缝衔接与极致推理性能。
graph 什么时候被释放?
Graph 被保存在了一个字典里:self.graphs[bs] = graph。 只要包含这个字典的实例对象ModelRunner还存活,Graph 就一直活着。在像 vLLM 这样的实际工业应用中,这些 Graph 几乎永远不会被主动释放。
所以说,只要服务还在运行,这块显存就一直被死死霸占着吗,即使目前没有任何请求也不会被释放,原因主要是
- 为了数据的绝对安全,CUDA Graph 在录制的时候,已经把显存地址(比如
0x1A2B)刻在了底层的机器指令里。它在运行时是直接往0x1A2B这个地址读写数据的。如果把他释放掉了,且被别的东西申请了,那调用graph的时候会该地址直接写东西影响别的数据,造成数据污染 - 为了机制的速度,一直把地圈着,就省去了申请和释放空间的时间,用空间换时间
为什么可以共享graph pool
-
降序录制带来的“空间包容”
有较小 Batch Size 的图,在物理内存上完全内含于最大的那个图。因为大图能装下,小图就绝对不会发生内存越界。
-
执行机制的“时间互斥”
如果 BS=128 和 BS=64 的图在 GPU 上同时运行,共享内存池瞬间就会发生灾难性的数据踩踏。但大模型推理引擎的设计避免了这一点。
- 对于同一个模型实例(同一个 Worker),推理引擎的调度器(Scheduler)是严格串行处理请求的。
- 在任何一个微秒级的物理时间点上,GPU 的这个 CUDA Stream 中绝对只有一个 Graph 在运行。
-
池内数据的“绝对无状态”
在大模型推理中,需要跨时间步保留的“有状态数据”只有两样:
- 模型权重 (Weights):只读,不在 Pool 里。
- KV Cache:记录用户历史对话,必须保留,所以它们存在由
block_tables动态管理的专门显存区里,绝对不在 Pool 里。
共享 Pool 全是纯粹的中间激活值。比如:
- 第 1 层 Transformer 算完后准备传给第 2 层的隐藏状态矩阵。
- Softmax 计算时的临时分母。
这些数据有一个共同特点:朝生夕死。一旦这一次前向传播(即这个 Graph 的执行)结束,它们就彻底变成了电子垃圾。当下一个 Graph(哪怕是不同 BS 的图)被启动时,直接用新数据覆盖这些垃圾,不仅毫无影响,反而省去了清理内存的时间。
self.graph_vars = dict(
input_ids=input_ids,
positions=positions,
slot_mapping=slot_mapping,
context_lens=context_lens,
block_tables=block_tables,
outputs=outputs,
)
这是一个字典,为了少传一些参数,我们用一个字典来方便传递参数。我们可以把计算图抽象成一个黑盒测试,他知道去哪里读输入数据,把输出数据写到哪里,因为这些地址都在录制的过程中写好了,所以每次推理的时候,你就需要把本次输入的数据复制到对应地址就可以
比如,你拿着用户新输入的 Token(比如 new_input_ids),把它填进那个早就固定好地址的池子里:
# 把新数据复制到静态张量对应的切片中
graph_vars["input_ids"][:bs] = input_ids
graph_vars["positions"][:bs] = positions
# ... 其他变量同理
关键点:这里切片替换就是“原地替换”(In-place)
# 找到对应 Batch Size 的那盘录像带,直接执行!
self.graphs[bs].replay()
此时,CPU 瞬间“下班”(开销降到极低),GPU 的硬件调度器接管一切,闭着眼睛根据写死的地址一顿狂算,把临时草稿全部扔进 graph_pool 里复写。
# 结果已经乖乖躺在静态的 outputs 槽位里了,直接按需要的长度取走
final_logits = self.graph_vars["outputs"][:bs]
exit()
def exit(self):
if self.world_size > 1 :
self.shm.close() # 断开当前GPU进程和系统共享内存的连接
dist.barrier() # 分布式同步路障。它强制要求所有的 GPU 进程都执行完 close() 操作后,才能继续往下走。这是为了防止有的卡还在读写数据,系统就把内存给扬了。
if self.rank == 0:
self.shm.unlink() # 系统共享内存是0号主进程创建的,所以后面也由它来释放这块内存
if not self.enforce_eager: # 如果有计算图,那也要释放相关的内存
def self.graphs, self.graphl pool
torch.cuda.synchronize() # 它强制要求 CPU 停下当前的脚步,乖乖站在原地等待,直到 GPU 把之前交办的所有任务全部彻底执行完毕。
dist.destroy_process_group() # 解散通信局。彻底销毁 PyTorch 底层的分布式进程组(通常是 NCCL 后端)。释放底层的网络端口、PCIe 通信锁和显卡间的 NVLink 资源。
当你的推理服务需要重启、关闭,或者模型运行结束时,系统必须优雅地释放它所霸占的所有系统级资源(包括多卡通信资源、共享内存、以及 CUDA Graph 显存)。如果直接暴力杀进程,可能会导致显存泄漏或僵尸进程。
基于共享内存的轻量、低延迟的分布式执行引擎(主从协作机制)
if self.world_size > 1:
if rank == 0:
self.shm = SharedMemory(name="nanovllm", create=True, size=2**20)
dist.barrier()
else:
dist.barrier()
self.shm = SharedMemory(name="nanovllm")
self.loop()
这段代码的核心目的是在多显卡并行计算的环境下,构建一个基于共享内存的“主从协作机制”,并利用同步屏障确保初始化的安全顺序。
首先,代码通过判断 world_size 是否大于 1 来确认当前是否处于分布式多卡模式。如果是,它会根据当前进程的 rank(身份 ID)将进程严格分为两类:“指挥官”(Rank 0)和“执行者”(Rank > 0)。
对于指挥官 Rank 0 来说,它的首要任务是“铺路”。它调用 SharedMemory(..., create=True) 在系统内存中开辟一块名为 "nanovllm" 的共享区域,这块区域将作为后续 CPU 之间传递控制信号的高速通道。创建完成后,它立刻执行 dist.barrier(),这是一个至关重要的同步路障。Rank 0 停在这里的含义是:“我已经把共享内存建好了,但我必须等所有人都准备好连接它,我才能继续下一步。”
对于执行者 Rank > 0(即其他所有 GPU 进程)来说,它们的逻辑则是先“等待”。它们第一行就执行 dist.barrier(),这保证了它们绝对不会在 Rank 0 完成创建之前尝试去读取内存,从而避免了“文件不存在”的崩溃错误。当所有进程都到达这个路障时,系统确认共享内存已就绪,于是同时放行。此时,执行者们通过 SharedMemory 连接到那块现成的内存区域,随后直接调用 self.loop() 进入一个死循环。这意味着这些从属进程将彻底变成纯粹的计算节点,在这个循环里不断读取指令并执行,永远不会从初始化函数中返回,而 Rank 0 则会从函数返回,继续执行主程序逻辑去接收外部请求。
Evnet
如果用传统的网络请求(比如 HTTP 或 gRPC)来下达计算指令,延迟太高了。为了解决这个问题,我们采用了Event(事件对象),一个由操作系统底层维护的、用于在不同线程或进程之间传递状态信号的“全局布尔值(True/False)。
Event 内部封装了一个状态标志(Flag),初始值默认为 False。所有的操作都是围绕这一个标志展开的:
- event.wait(),当代码执行到这里时,如果内部标志为
False,当前线程/进程会被立刻剥夺执行权,进入休眠(阻塞)状态。直到某个进程调用了event.set(),把内部状态改成True,该进程就被唤醒了 - event.set(),把内部状态强行改成
True,这是一个广播(Broadcast)操作。一旦调用,操作系统会立刻唤醒所有正在因为wait()而休眠的线程/进程,让它们同时跨过wait()的阻拦,继续向下执行。 - event.clear(),将内部标志强行恢复为
False,通常由被唤醒的进程在获取信号后调用,用于将系统恢复到初始拦截状态,以便迎接下一轮的wait()。
1. call(self, method_name, *args):总指挥部的执行入口
这是整个通信框架的入口。无论是主进程还是工作进程,最终执行某个方法都会走到这里。
def call(self, method_name, *args):
# 如果当前是主进程 (rank 0),并且有多个 GPU (world_size > 1)
if self.world_size > 1 and self.rank == 0:
# 主进程自己先不干活,先把命令通过共享内存广播给所有小弟
self.write_shm(method_name, *args)
# 根据字符串 method_name 反射拿到真正的函数对象
method = getattr(self, method_name, None)
# 真正执行这个函数!(主进程和小弟们都会执行到这一步)
return method(*args)
逻辑:当外界让主进程执行(比如)forward(inputs) 时,主进程会先调用 write_shm 告诉所有小弟:“大家准备一起跑 forward 啦!”,然后主进程自己才去执行 forward。
2. write_shm(self, method_name, *args):主进程的“大喇叭”
这个方法只有主进程 (rank == 0) 有权调用。它的作用是把指令和参数打包,塞进共享内存。
def write_shm(self, method_name, *args):
assert self.world_size > 1 and self.rank == 0
# 1. 序列化:把函数名和参数打包成二进制字节流 (bytes)
data = pickle.dumps([method_name, *args])
n = len(data) # 计算这段数据有多长
# 2. 写消息头 (Header):把数据长度 n 转换成 4 个字节,写在共享内存的最前头
self.shm.buf[0:4] = n.to_bytes(4, "little")
# 3. 写消息体 (Body):紧接着长度后面,把真正的数据塞进去
self.shm.buf[4:n+4] = data
# 4. 这里的event是一个列表,存的是所有并行的GPU进程的event,event.set()和event.wait()是相反的,set是告诉他们可以执行了,wait是要一直等待,等到接收到set信号
for event in self.event:
event.set()
底层细节:这里设计了一个非常经典的 [4字节长度] + [实际数据] 的通信协议(TLV 结构的一种变体)。小弟们读取时,必须先读前 4 个字节,才知道后面跟着的数据有多长。
3. loop(self):工作进程的“待机死循环”
这是工作进程 (rank > 0) 启动后唯一在做的事情。它们就像站在流水线旁的工人,一直盯着大喇叭。
def loop(self):
while True: # 死循环,一直挂起待机
# 阻塞等待,直到从共享内存里读到主进程发来的指令
method_name, args = self.read_shm()
# 拿到指令后,立刻调用自己的 call 方法去干活
self.call(method_name, *args)
# 如果主进程发来的是 "exit" 指令,打破死循环,准备关机下班
if method_name == "exit":
break
4. read_shm(self):工作进程的“收音机”
这个方法只有工作进程 (rank > 0) 会调用。负责从共享内存里把数据挖出来。
def read_shm(self):
assert self.world_size > 1 and self.rank > 0
# 1. 挂起等待:程序走到这里会卡住睡眠!直到主进程调用了 event.set() 才会醒来往下走。
self.event.wait()
# 2. 醒来后,先读前 4 个字节,解析出数据的总长度 n
n = int.from_bytes(self.shm.buf[0:4], "little")
# 3. 根据长度 n,把后面的二进制数据挖出来,反序列化成 Python 对象
method_name, *args = pickle.loads(self.shm.buf[4:n+4])
# 4. 阅后即焚:把自己的状态灯重新变回“红灯”,方便下一次继续 wait() 沉睡等待
self.event.clear()
return method_name, args