从0开始复现nano-vllm「engine/sequence」

 

engine/sequence.py

单个请求进来以后被封存成Sequence对象,这个函数是定义相关Sequence对象的

from copy import copy
from enum import Enum, auto
from itertools import count

from my-nano-vllm.sampling_params import SamplingParams


class SequenceStatus(Enum):
    WAITING = auto()
    RUNNING = auto()
    FINISHED = auto()


class Sequence:
    block_size = 256
    counter = count()

    def __init__(self, token_ids : list[int], sampling_params = SamplingParams()):
        self.seq_id = next(Sequence.counter)
        self.status = SequenceStatus.WAITING
        self.token_ids = copy(token_ids)
        self.last_token = self.token_ids[-1]
        self.num_tokens = len(self.token_ids)
        self.num_prompt_tokens = len(self.token_ids)
        self.num_cached_tokens = 0
        self.block_table = []
        self.temperature = sampling_params.temperature
        self.max_tokens = sampling_params.max_tokens
        self.ignore_eos = sampling_params.ignore_eos

    def __len__(self):
        return self.num_tokens
    
    def __getitem__(self, key):
        return self.token_ids[key]
    
    @property
    def is_finished(self):
        return self.status == SequenceStatus.FINISHED
    
    @property
    def num_completion_tokens(self):
        return self.num_tokens - self.num_prompt_tokens
    
    @property
    def promopt_token_ids(self):
        return self.token_ids[:self.num_prompt_tokens]
    
    @property
    def completion_token_ids(self):
        return self.token_ids[self.num_prompt_tokens:]
    
    @property
    def num_cached_blocks(self):
        return self.num_cached_tokens // self.block_size
    
    @property
    def num_blocks(self):
        return (self.num_tokens + self.block_size - 1) // self.block_size
    
    @property
    def last_block_num_tokens(self):
        return self.num_tokens - (self.num_blocks - 1) * self.block_size
    
    def block(self, i):
        assert 0 <= i <= self.num_blocks
        return self.token_ids[i * self.block_size : (i + 1) * self.block_size]
    
    def append_token(self, token_id : int):
        self.token_ids.append(token_id)
        self.num_tokens += 1
        self.last_token = token_id

    def __getstate__(self):
        return (self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table, 
                self.token_ids if self.num_completion_tokens == 0 else self.last_token)
    
    def __setstate__(self, state):
        self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table = state[:-1]
        if self.num_completion_tokens == 0:
            self.token_ids = state[-1]
        else:
            self.last_token = state[-1]

SequenceStatus是三种序列的三种生命周期状态,

  • WAITING等待
  • RUNNING运行中
  • FINISHED结束

Sequence序列,比较关键的一个类,用来描述一个请求进来以后被封装的状态

初始化

def __init__(self, token_ids : list[int], sampling_params = SamplingParams()):
    self.seq_id = next(Sequence.counter)
    self.status = SequenceStatus.WAITING
    self.token_ids = copy(token_ids)
    self.last_token = self.token_ids[-1]
    self.num_tokens = len(self.token_ids)
    self.num_prompt_tokens = len(self.token_ids)
    self.num_cached_tokens = 0
    self.block_table = []
    self.temperature = sampling_params.temperature
    self.max_tokens = sampling_params.max_tokens
    self.ignore_eos = sampling_params.ignore_eos
  • seq_id,用于标识每一个序列的id,唯一标识
  • status,用于标识序列当前的状态
  • token_ids,存输入和生成的所有token
  • last_token,存最后一个token,用于生成下一个
  • num_tokens,当前所有token总数,每生成一个token会实时更新数量
  • num_prompt_tokens,输入的token总数,不会变化
  • num_cached_tokens,已经进行kv cache缓存的token数量
  • block_table,核心属性,这是一个物理块ID的列表,记录了该序列的KV Cache存储在GPU的哪些显存块里
  • temperaturemax_tokensignore_eos上述介绍过了,不过多赘述
def __len__(self):
    return self.num_tokens

def __getitem__(self, key):
    return self.token_ids[key]

两个魔术方法

__len__可以获取当前总token数

__getitem__可以获取指定下标的token

@property
def is_finished(self):
    return self.status == SequenceStatus.FINISHED

@property
def num_completion_tokens(self):
    return self.num_tokens - self.num_prompt_tokens

@property
def promopt_token_ids(self):
    return self.token_ids[:self.num_prompt_tokens]

@property
def completion_token_ids(self):
    return self.token_ids[self.num_prompt_tokens:]

@property
def num_cached_blocks(self):
    return self.num_cached_tokens // self.block_size

@property
def num_blocks(self):
    return (self.num_tokens + self.block_size - 1) // self.block_size

@property
def last_block_num_tokens(self):
    return self.num_tokens - (self.num_blocks - 1) * self.block_size

属性计算方法,用@property装饰器,提供了实时计算的状态,不需要手动更新变量

  • is_finished,序列是否生成完成
  • num_completion_tokens,生成的token的数量,用num_tokens减去num_prompt_tokens
  • prompt_token_ids,获取prompt的token列表,使用python的切片语法
  • completion_token_ids,获取生成的所有token列表,使用python的切片语法
  • num_cached_blocks,计算目前已经有多少个完整的块已经写入缓存(下取整)
  • num_blocks,计算目前缓存所有token需要的KV cache 块的数量(上取整)
  • last_blcok_num_tokens,计算最后一个块里目前填了多少token(如果满了,下次就要申请新的块)
    • 这里的计算方法是用self.num_tokens - (self.num_blocks - 1) * self.block_size
    • 思考一下为什么不用取余?self.num_tokens % self.block_size
      • 为了避免0带来的歧义,0%256 = 256 % 256 = 0,这两者都是0,而前者不需要申请新块,后者却需要,其次速度上取余数也会慢一点
def block(self, i):
    assert 0 <= i <= self.num_blocks
    return self.token_ids[i * self.block_size : (i + 1) * self.block_size]

def append_token(self, token_id : int):
    self.token_ids.append(token_id)
    self.num_tokens += 1
    self.last_token = token_id

功能函数

  • block(i)根据索引获取第i个块对应的token子列表
  • append_token(token_id),当模型预测出一个新词时调用,同步更新token_idsnum_tokenslast_token
def __getstate__(self):
    return (self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table, 
            self.token_ids if self.num_completion_tokens == 0 else self.last_token)

def __setstate__(self, state):
    self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table = state[:-1]
    if self.num_completion_tokens == 0:
        self.token_ids = state[-1]
    else:
        self.last_token = state[-1]

这两个魔术方法构成了一套针对分布式推理场景优化的“数据打包与还原”机制

  • getstate,智能压缩打包,只带走必须带走的东西
    • num_tokensnum_promot_tokensnum_cached_tokensblock_table这四个是序列的元数据,提及很小但至关重要,需要传输
    • 如果模型还在Prefill阶段,也就是没生成token,那就需要打包完整的token_ids
    • 否则,仅需要打包最后一个token即可,因为推理是逐个token生成的,前面的token计算的KV Cache已经存好了,只需要最后一个token即可计算出下一个token,所以我们不需要传输整个列表
  • setstate,按需恢复,这个函数在接收端被调用,负责把收到的“元组”还原回对象属性。

 

Q: 为什么不把num_tokens做成一个属性计算方法,每次需要他的时候直接求len(self.token_ids)

A:为了解决分布式计算中列表丢失的问题,就是上述的getstatesetstate,会导致分布式计算的时候不会永远存储token_ids,所以也就没法利用len函数求长度,我们仅通过维护一个int 变量即可以达到目的,还可以节省传输带宽,何乐而不为呢?而且,访问一个属性的速度肯定会比调用一个@property的方法要快

 

博客内容均系原创,未经允许严禁转载!
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇