从0开始复现nano-vllm「scheduler.py」

Scheduler

Scheduler是一个推理调度器,,其核心功能是协调序列在等待队列(waiting)和运行队列(running)之间的流转,并根据显存可用性决定是执行新序列的预填充(Prefill)还是现有序列的解码(Decoding)。

在调度逻辑中,系统优先执行预填充阶段,尝试将等待队列中的序列移入运行队列,前提是满足最大并发序列数、最大批处理Token数以及KV Cache显存块充裕这三个条件;若无法进行预填充,则进入解码阶段,调度器会遍历运行队列并为每个序列申请新Token所需的显存槽位,如果此时显存不足,代码执行激进的抢占策略(preempt),即强制将运行队列末尾的序列挂起、释放其占用的全部显存块并放回等待队列头部,从而为当前亟需显存的序列腾出空间。

def __init__(self, config: Config):
    self.max_num_seqs = config.max_num_seqs # 最大并发数量,即可以同时运行的sequence的最大数量
    self.max_num_batched_tokens = config.max_num_batched_tokens # 能接受的最大token数量
    self.eos = config.eos # 输出的停止符
    self.block_manager = BlockManager(config.num_kvcache_blocks, config.kvcache_block_size) # Block管理器
    self.waiting: deque[Sequence] = deque() # waiting队列
    self.running: deque[Sequence] = deque() # runnig队列
    
# 检查模型是否完成生成
def is_finished(self): 
    return not self.waiting and not self.running

# 添加新的seq
def add(self, seq: Sequence):
    self.waiting.append(seq)

 

def preempt(self, seq: Sequence): # 抢占逻辑
    seq.status = SequenceStatus.WAITING # 把被抢占的序列的状态改成waiting
    self.block_manager.deallocate(seq) # 先释放占用的资源
    self.waiting.appendleft(seq) # 再添加到waiting队列,注意是添加到队顶,被抢占的优先级要高于没被抢占的

此函数 preempt 实现了推理引擎中的抢占机制,用于在显存资源紧张(如没有足够的空闲块来继续生成当前所有序列)时,强制“暂停”选定的低优先级序列以避免系统崩溃。

它首先将目标序列的状态标记为“等待中”(WAITING),接着立即调用块管理器释放该序列所占据的所有物理显存块资源,将宝贵的显存归还给系统池。最后,它将这个被暂停的序列插入到等待队列的最前端(队首),这一策略确保了该序列在资源恢复时拥有比新来请求更高的调度优先级,能够第一时间被重新加载和继续生成,从而最大程度地减少被抢占用户的额外等待时间。

# 对于
def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> list[bool]:
    for seq, token_id in zip(seqs, token_ids):
        seq.append_token(token_id) # 把新生成的token扔到seq里面
        # 停止的两个逻辑
        # 1.seq有停止符的逻辑,且当前生成的token正好是停止符
        # 2.生成的token数量到达max_tokens的上限
        if (not seq.ignore_eos and token_id == self.eos) or seq.num_completion_tokens == seq.max_tokens:
            seq.status = SequenceStatus.FINISHED # 更新seq的状态
            self.block_manager.deallocate(seq) # 释放资源
            self.running.remove(seq) # 从runnig队列里移除

这个函数 postprocess 的主要作用是在模型完成一轮推理生成了新的 Token 之后,对当前的序列批次进行状态更新生命周期管理

具体来说,它遍历当前批次中的每一个序列和其对应的生成结果,首先将新生成的 Token 追加到序列的记录中,随后立即检查该序列是否满足终止条件(例如生成了结束符 EOS 或者达到了设定的最大生成长度)。一旦判定某个序列已经完成生成,函数会将其状态标记为“已完成”(FINISHED),并调用块管理器释放该序列占用的所有物理内存块资源,最后将其从活跃运行队列中移除,从而确保推理引擎能及时回收显存资源以服务新的请求。

def schedule(self) -> tuple[list[Sequence], bool]:
    # prefill
    scheduled_seqs = []
    num_seqs = 0
    num_batched_tokens = 0
    while self.waiting and num_seqs < self.max_num_seqs: # 如果waiting队列里有序列,且没到达最大并发上限
        seq = self.waiting[0] # 取队头,这里不用popleft()的原因是,需要先判断一下当前这个能否成功申请
        if num_batched_tokens + len(seq) > self.max_num_batched_tokens or not self.block_manager.can_allocate(seq): # 判断本次推理的token数量是否超标以及是否能申请下来空间
            break
        self.waiting.popleft() # 从waiting队列中扔出去
        self.running.append(seq) # 加入到runnig队列
        num_seqs += 1 # 更新并发数量
        seq.status = SequenceStatus.RUNNING # 更新seq状态
        self.block_manager.allocate(seq) # 申请空间
        num_batched_tokens += len(seq) - seq.num_cached_tokens # 更新cache的token数量,由于存在prefix caching,所以要减去seq.num_cached_tokens
        scheduled_seqs.append(seq) # 放进本次的调度列表里
    if scheduled_seqs:
        return scheduled_seqs, True # True代表本次是prefill

    # decode
    while self.running and num_seqs < self.max_num_seqs: # 如果runnig队列里有东西,且没达到并发上限
        seq = self.running.popleft() # 取队头出来
        while not self.block_manager.can_append(seq): # 一直试能否申请下来所需的空间,如果不能,就需要抢占机制
            if self.running: # 优先释放runnig队列的队尾seq资源
                self.preempt(self.running.pop())
            else : # 如果runnig队列是空的,那只能释放自己的资源并结束循环
                self.preempt(seq)
                break
        else: # 可以申请下来空间的话
            num_seqs += 1 # 更新本次推理的seq的数量
            self.block_manager.may_append(seq) # may_append 正式锁定资源。它确保了当这个 seq 被放入 scheduled_seqs 送去 GPU 计算时,它绝对拥有存储下一个 Token KV Cache 所需的物理显存空间。
            scheduled_seqs.append(seq) # 放进本次的调度列表里
    assert scheduled_seqs
    self.running.extendleft(reversed(scheduled_seqs)) #把seq重新放回双端队列里面,按原顺序
    return scheduled_seqs, False # False代表本次是decode

  • num_batched_tokens += len(seq) - seq.num_cached_tokens由于我们采用了prefix caching的机制,给每个seq申请空间的时候,会走前缀查询,有一些相同前缀的token是可以共享的,我们不需要重新申请空间去存他们的数值,所以计算num_batched_tokens的时候要扣掉这部分共享的token的数量
  • self.block_manager.may_append(seq) 对于这个代码,他的目的是防止OOM

    比如现在有一个空闲的block,序列A检查发现他需要一个新的block来存生成的token,他检查一下发现有一个空闲位置,假设A不申请这个block,序列B检查发现他也需要一个新的block来存新生成的token,他检查一下发现也有一个空闲位置,同样的,B也不申请这个block,实际运行的时候,A和B都放到GPU上跑,都生成了一个新的token,此时需要两个block,但是只有一个blcok,就炸了。

  • self.running.extendleft(reversed(scheduled_seqs))是因为对于每次decode,每个序列都只会生成一个token,所以没到输出停止符或者到输出上限,就需要一直在running 队列里,此时我们需要把序列按顺序重新塞回running队列的头部,就需要这样操作
博客内容均系原创,未经允许严禁转载!
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇