engine/sequence.py
单个请求进来以后被封存成Sequence对象,这个函数是定义相关Sequence对象的
from copy import copy
from enum import Enum, auto
from itertools import count
from my-nano-vllm.sampling_params import SamplingParams
class SequenceStatus(Enum):
WAITING = auto()
RUNNING = auto()
FINISHED = auto()
class Sequence:
block_size = 256
counter = count()
def __init__(self, token_ids : list[int], sampling_params = SamplingParams()):
self.seq_id = next(Sequence.counter)
self.status = SequenceStatus.WAITING
self.token_ids = copy(token_ids)
self.last_token = self.token_ids[-1]
self.num_tokens = len(self.token_ids)
self.num_prompt_tokens = len(self.token_ids)
self.num_cached_tokens = 0
self.block_table = []
self.temperature = sampling_params.temperature
self.max_tokens = sampling_params.max_tokens
self.ignore_eos = sampling_params.ignore_eos
def __len__(self):
return self.num_tokens
def __getitem__(self, key):
return self.token_ids[key]
@property
def is_finished(self):
return self.status == SequenceStatus.FINISHED
@property
def num_completion_tokens(self):
return self.num_tokens - self.num_prompt_tokens
@property
def promopt_token_ids(self):
return self.token_ids[:self.num_prompt_tokens]
@property
def completion_token_ids(self):
return self.token_ids[self.num_prompt_tokens:]
@property
def num_cached_blocks(self):
return self.num_cached_tokens // self.block_size
@property
def num_blocks(self):
return (self.num_tokens + self.block_size - 1) // self.block_size
@property
def last_block_num_tokens(self):
return self.num_tokens - (self.num_blocks - 1) * self.block_size
def block(self, i):
assert 0 <= i <= self.num_blocks
return self.token_ids[i * self.block_size : (i + 1) * self.block_size]
def append_token(self, token_id : int):
self.token_ids.append(token_id)
self.num_tokens += 1
self.last_token = token_id
def __getstate__(self):
return (self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table,
self.token_ids if self.num_completion_tokens == 0 else self.last_token)
def __setstate__(self, state):
self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table = state[:-1]
if self.num_completion_tokens == 0:
self.token_ids = state[-1]
else:
self.last_token = state[-1]
SequenceStatus是三种序列的三种生命周期状态,
WAITING等待RUNNING运行中FINISHED结束
Sequence序列,比较关键的一个类,用来描述一个请求进来以后被封装的状态
初始化
def __init__(self, token_ids : list[int], sampling_params = SamplingParams()):
self.seq_id = next(Sequence.counter)
self.status = SequenceStatus.WAITING
self.token_ids = copy(token_ids)
self.last_token = self.token_ids[-1]
self.num_tokens = len(self.token_ids)
self.num_prompt_tokens = len(self.token_ids)
self.num_cached_tokens = 0
self.block_table = []
self.temperature = sampling_params.temperature
self.max_tokens = sampling_params.max_tokens
self.ignore_eos = sampling_params.ignore_eos
seq_id,用于标识每一个序列的id,唯一标识status,用于标识序列当前的状态token_ids,存输入和生成的所有tokenlast_token,存最后一个token,用于生成下一个num_tokens,当前所有token总数,每生成一个token会实时更新数量num_prompt_tokens,输入的token总数,不会变化num_cached_tokens,已经进行kv cache缓存的token数量block_table,核心属性,这是一个物理块ID的列表,记录了该序列的KV Cache存储在GPU的哪些显存块里temperature和max_tokens和ignore_eos上述介绍过了,不过多赘述
def __len__(self):
return self.num_tokens
def __getitem__(self, key):
return self.token_ids[key]
两个魔术方法
__len__可以获取当前总token数
__getitem__可以获取指定下标的token
@property
def is_finished(self):
return self.status == SequenceStatus.FINISHED
@property
def num_completion_tokens(self):
return self.num_tokens - self.num_prompt_tokens
@property
def promopt_token_ids(self):
return self.token_ids[:self.num_prompt_tokens]
@property
def completion_token_ids(self):
return self.token_ids[self.num_prompt_tokens:]
@property
def num_cached_blocks(self):
return self.num_cached_tokens // self.block_size
@property
def num_blocks(self):
return (self.num_tokens + self.block_size - 1) // self.block_size
@property
def last_block_num_tokens(self):
return self.num_tokens - (self.num_blocks - 1) * self.block_size
属性计算方法,用@property装饰器,提供了实时计算的状态,不需要手动更新变量
is_finished,序列是否生成完成num_completion_tokens,生成的token的数量,用num_tokens减去num_prompt_tokensprompt_token_ids,获取prompt的token列表,使用python的切片语法completion_token_ids,获取生成的所有token列表,使用python的切片语法num_cached_blocks,计算目前已经有多少个完整的块已经写入缓存(下取整)num_blocks,计算目前缓存所有token需要的KV cache 块的数量(上取整)last_blcok_num_tokens,计算最后一个块里目前填了多少token(如果满了,下次就要申请新的块)- 这里的计算方法是用
self.num_tokens - (self.num_blocks - 1) * self.block_size - 思考一下为什么不用取余?
self.num_tokens % self.block_size- 为了避免0带来的歧义,0%256 = 256 % 256 = 0,这两者都是0,而前者不需要申请新块,后者却需要,其次速度上取余数也会慢一点
- 这里的计算方法是用
def block(self, i):
assert 0 <= i <= self.num_blocks
return self.token_ids[i * self.block_size : (i + 1) * self.block_size]
def append_token(self, token_id : int):
self.token_ids.append(token_id)
self.num_tokens += 1
self.last_token = token_id
功能函数
block(i)根据索引获取第i个块对应的token子列表append_token(token_id),当模型预测出一个新词时调用,同步更新token_ids、num_tokens、last_token
def __getstate__(self):
return (self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table,
self.token_ids if self.num_completion_tokens == 0 else self.last_token)
def __setstate__(self, state):
self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, self.block_table = state[:-1]
if self.num_completion_tokens == 0:
self.token_ids = state[-1]
else:
self.last_token = state[-1]
这两个魔术方法构成了一套针对分布式推理场景优化的“数据打包与还原”机制
getstate,智能压缩打包,只带走必须带走的东西num_tokens、num_promot_tokens、num_cached_tokens、block_table这四个是序列的元数据,提及很小但至关重要,需要传输- 如果模型还在Prefill阶段,也就是没生成token,那就需要打包完整的
token_ids - 否则,仅需要打包最后一个token即可,因为推理是逐个token生成的,前面的token计算的KV Cache已经存好了,只需要最后一个token即可计算出下一个token,所以我们不需要传输整个列表
setstate,按需恢复,这个函数在接收端被调用,负责把收到的“元组”还原回对象属性。
Q: 为什么不把
num_tokens做成一个属性计算方法,每次需要他的时候直接求len(self.token_ids)
A:为了解决分布式计算中列表丢失的问题,就是上述的getstate和setstate,会导致分布式计算的时候不会永远存储token_ids,所以也就没法利用len函数求长度,我们仅通过维护一个int 变量即可以达到目的,还可以节省传输带宽,何乐而不为呢?而且,访问一个属性的速度肯定会比调用一个@property的方法要快