背景
LLMClientPool 多 endpoint 模式当前的并发/QPS 控制是每 endpoint 独立的:
EndpointConfig.concurrency_limit / max_qps 定义每个 endpoint 的上限
- 顶层
concurrency_limit / max_qps 只是"每个未独立配置的 endpoint 的默认回退值"
- 进入
chat_completions 入口时没有 pool 级 semaphore 或 token bucket,只是先轮询选 endpoint、再进入该 endpoint client 的独立 semaphore
代码位置: `flexllm/clients/pool.py` 的 `chat_completions`(L603+) 和 `_init_multi_mode`(L474+)
问题
这导致无法表达"总并发/总 QPS 硬上限"的语义。
举例: 3 个 endpoint,每个 `concurrency_limit=30`,实际峰值并发能到 90。使用者若想把该模型在全链路上卡到"总并发 ≤ 50",目前没有办法做到(除非手动把每个 endpoint 的并发都调小,但那样又丧失了"某个 endpoint 吃不消单独限速"的灵活性)。
对于 API 配额按"总 QPS"计费(例如总 QPS=100,但底层可以均摊到多个 endpoint 做路由)的场景,这个问题更明显。
期望行为
给 `LLMClientPool` 加两个可选参数:
```python
LLMClientPool(
endpoints=[...],
concurrency_limit=20, # 每 endpoint 默认(已有,不变)
max_qps=None, # 每 endpoint 默认(已有,不变)
total_concurrency_limit=50, # 新增: 跨所有 endpoint 的全局并发硬上限
total_max_qps=100, # 新增: 跨所有 endpoint 的全局 QPS 硬上限
)
```
语义: 总 cap 是硬上限,per-endpoint 是软上限,两者同时生效(哪个先触发就卡在哪)。等价于"每 endpoint 自己不能更快,全局也不能更多"。
单 endpoint 模式下这两个参数可以忽略或直接等同于 `concurrency_limit` / `max_qps`。
建议实现
在 `_init_multi_mode` 里初始化:
```python
self._pool_sem = asyncio.Semaphore(total_concurrency_limit) if total_concurrency_limit else None
self._pool_qps_bucket = TokenBucket(total_max_qps) if total_max_qps else None
```
在 `chat_completions`(及 streaming / batch 入口)最外层:
```python
async def _guarded():
if self._pool_qps_bucket:
await self._pool_qps_bucket.acquire()
# 原有的轮询 + fallback + endpoint-level semaphore 逻辑
if self._pool_sem:
async with self._pool_sem:
return await _guarded()
else:
return await _guarded()
```
需要同时覆盖的入口:
- `chat_completions`
- `chat_completions_batch`
- `chat_completions_stream`
- 其他走 pool 调度的方法
兼容性
- 不传 `total_concurrency_limit` / `total_max_qps` 时行为完全不变
- 不影响单 endpoint 模式
- 对 `ProviderRouter` 的轮询 / fallback 逻辑无影响(全局 cap 在路由之前就卡住了)
设计类比
这和 Kubernetes 的 ResourceQuota (namespace 级总量) + LimitRange (单 Pod 上限) 是同构的,属于"双层 cap"的标配设计:
| 层级 |
参数 |
作用 |
| Pool (全局) |
`total_concurrency_limit` / `total_max_qps` |
硬上限,配额约束 |
| Endpoint (单点) |
`concurrency_limit` / `max_qps` |
软上限,单点保护 |
场景
- 第三方 LLM 服务按总 QPS 计费,需要严格控制总调用速率
- 多 endpoint 是为了"故障转移/负载均衡",而不是为了"把并发叠加上去"
- 数据处理/批量推理场景中,下游业务系统有总并发上限,上游不能超发
背景
LLMClientPool多 endpoint 模式当前的并发/QPS 控制是每 endpoint 独立的:EndpointConfig.concurrency_limit/max_qps定义每个 endpoint 的上限concurrency_limit/max_qps只是"每个未独立配置的 endpoint 的默认回退值"chat_completions入口时没有 pool 级 semaphore 或 token bucket,只是先轮询选 endpoint、再进入该 endpoint client 的独立 semaphore代码位置: `flexllm/clients/pool.py` 的 `chat_completions`(L603+) 和 `_init_multi_mode`(L474+)
问题
这导致无法表达"总并发/总 QPS 硬上限"的语义。
举例: 3 个 endpoint,每个 `concurrency_limit=30`,实际峰值并发能到 90。使用者若想把该模型在全链路上卡到"总并发 ≤ 50",目前没有办法做到(除非手动把每个 endpoint 的并发都调小,但那样又丧失了"某个 endpoint 吃不消单独限速"的灵活性)。
对于 API 配额按"总 QPS"计费(例如总 QPS=100,但底层可以均摊到多个 endpoint 做路由)的场景,这个问题更明显。
期望行为
给 `LLMClientPool` 加两个可选参数:
```python
LLMClientPool(
endpoints=[...],
concurrency_limit=20, # 每 endpoint 默认(已有,不变)
max_qps=None, # 每 endpoint 默认(已有,不变)
total_concurrency_limit=50, # 新增: 跨所有 endpoint 的全局并发硬上限
total_max_qps=100, # 新增: 跨所有 endpoint 的全局 QPS 硬上限
)
```
语义: 总 cap 是硬上限,per-endpoint 是软上限,两者同时生效(哪个先触发就卡在哪)。等价于"每 endpoint 自己不能更快,全局也不能更多"。
单 endpoint 模式下这两个参数可以忽略或直接等同于 `concurrency_limit` / `max_qps`。
建议实现
在 `_init_multi_mode` 里初始化:
```python
self._pool_sem = asyncio.Semaphore(total_concurrency_limit) if total_concurrency_limit else None
self._pool_qps_bucket = TokenBucket(total_max_qps) if total_max_qps else None
```
在 `chat_completions`(及 streaming / batch 入口)最外层:
```python
async def _guarded():
if self._pool_qps_bucket:
await self._pool_qps_bucket.acquire()
# 原有的轮询 + fallback + endpoint-level semaphore 逻辑
if self._pool_sem:
async with self._pool_sem:
return await _guarded()
else:
return await _guarded()
```
需要同时覆盖的入口:
兼容性
设计类比
这和 Kubernetes 的 ResourceQuota (namespace 级总量) + LimitRange (单 Pod 上限) 是同构的,属于"双层 cap"的标配设计:
场景