# RPM与TPM的关系(以硅基流动为例)

硅基流动平台的RPM(Requests Per Minute,每分钟请求数)TPM(Tokens Per Minute,每分钟令牌数)并非简单的乘积模式,而是独立的速率限制指标,分别约束请求频率和令牌消耗。

  • RPM:限制每分钟向API发送的请求次数(如1000 RPM表示每分钟最多发送1000次请求),与请求的复杂度(如输入token数量)无关。

  • TPM:限制每分钟API处理的总token数量(如500万TPM表示每分钟最多处理500万token),与请求次数无关(如1次请求发送10万token,仍占用10万TPM)。

    两者共同作用,确保API调用的合理性和稳定性:即使请求次数未达RPM上限,若总token数超过TPM,仍会被限流。

# 1000次请求/分钟、每请求5万token的场景计算

若每分钟发送1000次请求,每次请求消耗5万token,则:

  • 总token消耗:1000次/分钟 × 5万token/次 = 500万token/分钟(即500万TPM)。
  • RPM占用:1000次/分钟(刚好达到1000 RPM的上限)。

此时,若硅基流动API的TPM上限≥500万RPM上限≥1000,则该场景可通过;若TPM上限低于500万(如100万TPM),则会因token超限触发限流(即使RPM未超)。

TPM,代表着一家企业客户在单位时间内能够使用的token限额;RPM,代表着一家企业客户在单位时间内最多能用多少次大模型。两个指标中有一个达到限额,模型就拒绝提供服务了。

打个比方:大模型API调用就像一个大水池,TPM是限定了每分钟可以打一杯水还是一桶水,RPM是限定了同时可以来10个人还是来100个人打水。无论水价多便宜,哪怕是免费的,如果一分钟只能接一杯水,那也没太大用处。

# 关键结论

  • RPM与TPM是独立限制,不存在“乘积模式”。
  • 总token消耗=请求次数×每次请求的token数,需同时满足RPM和TPM的限制。
  • 实际使用时,需根据API的RPM和TPM上限,合理调整请求频率和token数量(如减少单次请求的token数或降低请求频率),避免触发限流。

如何处理模型API速率限制 https://blog.csdn.net/yjw123456/article/details/142305561

https://free.kike.cc/2xQf ( https://blog.csdn.net/weixin_41961749/article/details/147325848?spm=1001.2101.3001.6650.4&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-4-147325848-blog-142305561.235%5Ev43%5Epc_blog_bottom_relevance_base1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-4-147325848-blog-142305561.235%5Ev43%5Epc_blog_bottom_relevance_base1&utm_relevant_index=9 )

https://free.kike.cc/prq ( https://libin9ioak.blog.csdn.net/article/details/149300917?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-3-149300917-blog-142305561.235%5Ev43%5Epc_blog_bottom_relevance_base1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-3-149300917-blog-142305561.235%5Ev43%5Epc_blog_bottom_relevance_base1&utm_relevant_index=6 )

https://free.kike.cc/GLgY( https://blog.csdn.net/kycg_/article/details/147513675?ops_request_misc=%257B%2522request%255Fid%2522%253A%252272c6a8d6ed88ffb0c532d19c68cc89f8%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=72c6a8d6ed88ffb0c532d19c68cc89f8&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-6-147513675-null-null.nonecase&utm_term=%E5%A4%A7%E6%A8%A1%E5%9E%8B%20%E9%80%9F%E7%8E%87%E9%99%90%E5%88%B6&spm=1018.2226.3001.4450 )

# 3种处理大模型并发调用的队列框架

以下是针对FastAPI+LangChain项目中处理大模型并发调用和资源紧张问题的三种解决方案的代码示例,分别基于Celery、RQ和ARQ实现:


# 1. Celery + Redis/RabbitMQ(功能强大,适合复杂业务)

核心优势:分布式任务队列、任务重试、结果存储、多Worker支持。

# celery_app.py
from celery import Celery
from langchain.llms import OpenAI

celery_app = Celery(
    'worker',
    broker='redis://localhost:6379/0',  # 任务队列
    backend='redis://localhost:6379/1'  # 结果存储
)

@celery_app.task
def process_llm_request(prompt: str, model_name: str = "text-davinci-003"):
    llm = OpenAI(model_name=model_name)
    return llm(prompt)  # 同步调用,实际项目建议异步方法

# main.py (FastAPI)
from fastapi import FastAPI
from celery_app import process_llm_request

app = FastAPI()

@app.post("/generate")
async def generate_text(prompt: str):
    task = process_llm_request.delay(prompt)  # 任务入队
    return {"task_id": task.id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    from celery.result import AsyncResult
    result = AsyncResult(task_id)
    return {"status": result.status, "result": result.result}

启动Worker

celery -A celery_app worker --loglevel=info --concurrency=4

特点

  • 使用不同Redis库隔离任务队列和结果存储
  • 支持优先级队列和超时控制(通过soft_time_limit参数)

# 2. RQ (Redis Queue)(轻量级,适合中小规模)

核心优势:简单易用,与Redis深度集成。

# tasks.py
from langchain.llms import OpenAI
from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

def llm_task(prompt: str):
    llm = OpenAI()
    return llm(prompt)

# main.py (FastAPI)
from fastapi import FastAPI
from tasks import q, llm_task
from rq.job import Job

app = FastAPI()

@app.post("/generate")
async def generate_text(prompt: str):
    job = q.enqueue(llm_task, prompt)
    return {"job_id": job.id}

@app.get("/result/{job_id}")
async def get_result(job_id: str):
    job = Job.fetch(job_id, connection=q.connection)
    return {"status": job.get_status(), "result": job.result}

启动Worker

rq worker --with-scheduler

特点

  • 无需额外配置,直接使用Redis作为消息代理
  • 适合快速原型开发,但缺乏Celery的高级功能如定时任务

# 3. ARQ(基于asyncio的现代化方案)

核心优势:原生异步支持,与FastAPI的async/await完美契合。

# tasks.py
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from langchain.llms import OpenAI

async def llm_task(ctx, prompt: str):
    llm = OpenAI()
    return llm(prompt)  # 实际项目应使用LangChain的异步接口

# Worker配置
class WorkerSettings:
    functions = [llm_task]
    redis_settings = RedisSettings(host="localhost")

# main.py (FastAPI)
from fastapi import FastAPI
from arq import create_pool

app = FastAPI()

@app.post("/generate")
async def generate_text(prompt: str):
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job("llm_task", prompt)
    return {"job_id": job.job_id}

@app.get("/result/{job_id}")
async def get_result(job_id: str):
    redis = await create_pool(RedisSettings())
    result = await redis.get_job_result(job_id)
    return {"result": result}

启动Worker

arq tasks.WorkerSettings

特点

  • 全异步架构,适合高并发IO密集型场景
  • 支持定时任务(通过cron_jobs配置)

# 方案对比总结

方案 适用场景 异步支持 分布式 学习曲线
Celery 复杂业务、生产环境 部分
RQ 中小项目、快速开发
ARQ 高并发、现代化技术栈

推荐选择

  • 需要高级功能(如任务重试、监控)→ Celery
  • 追求简单轻量 → RQ
  • 全异步架构 → ARQ

所有方案均需确保Redis服务正常运行,并根据实际需求调整并发参数(如Celery的--concurrency或ARQ的Worker数量)。

flowchart TD
    A[MySQL 配置加载流程] --> B["主入口点: /etc/my.cnf"]
    B --> C["包含 !includedir /etc/mysql/conf.d/"]
    B --> D["包含 !includedir /etc/mysql/mysql.conf.d/"]
    C --> E["加载 /etc/mysql/conf.d/*.cnf<br>(按文件名字母顺序)"]
    D --> F["加载 /etc/mysql/mysql.conf.d/*.cnf<br>(按文件名字母顺序)"]
    E --> G[后加载的配置覆盖先加载的配置]
    F --> G
    G --> H[最终生效的配置]
class TeeStdout:

    def __init__(self, logger_func, original_stdout, cache_list=None):
        self.logger_func = logger_func
        self.original_stdout = original_stdout
        self.cache_list = cache_list if cache_list is not None else []