# 安徽智能体项目优化方案
注意:所有涉及到大模型资源占用问题的解决方案,都要先去拿到大模型接口服务器的TPM和RPM这两个指标上限。要么是以目前测试的结果为准,要么是去找大模型接口服务器的文档或者找模型提供者的对接。
# 1. 轮询调用多个大模型,缓解“调度瓶颈” √
在平台侧调用大模型的时候,去轮询不同的大模型资源,如果某个大模型资源紧张(TPM或RPM接近达到上限),给该模型设置状态资源饱和的状态,暂停它的访问,然后继续去轮询其他模型。注意,模型调用一定是以分钟为单位。用redis来管理大模型资源状态。
工作量预估: 后端:python框架层面 4人/天
注意,工作量预估均为开发完全顺利条件下的保守估计
# 2. 对资源占用较大的功能(比如报告生成的仿写功能)采用队列机制 √
# (1) 队列框架
队列框架的搭配上,可在以下三种队列框架中选择。需要再深入研究一下,选更适合当前安徽项目的框架(要考虑开发成本、部署成本、后期维护成本) • Celery + Redis/RabbitMQ(功能强大,生态完善,适合复杂业务) • RQ (Redis Queue)(简单轻量,适合中小规模,与 Redis 配合好) • ARQ(基于 asyncio,如果你希望更现代化、异步化) (后附最小版代码demo。)
工作量预估: 后端:python框架层面 6人/天
# (2) 增加全局令牌桶
对全平台每分钟的token调用进行监控,方便队列中任务调度管理。
工作量预估: 后端:1人/天
# (3) 增加任务重试策略
采用指数退避原则。对单条任务请求,增加任务重试的保护机制,避免任务调度失败,队列失效等问题。这个需要和队列框架的部分配合开发。
工作量预估: 后端:2人/天
# 3. 给提示词工程做缓存,避免不必要的token消耗 ×
这个方案分3条路线多管齐下来具体实现。
# (1) 全平台所有提示词做缓存
只要是调用过大模型的提示词,全部缓存,前后去空格全部转大写后,用这个提示词生成唯一标识md5存入mysql或者redis当中,同时保存这个提示词的输出或者接口响应。在所有接口只要涉及到大模型的调用时,都拿着md5唯一标识去匹配,只要匹配上,直接返回模型输出或接口响应。注意:这种方式只适用于非流式请求。它的好处是,同一个提示词对大模型的调用将不再去重复消耗GPU,也就是用空间换时间。
# (2) 对“报告生成换数的提示词”做单项缓存
针对报告生成的功能,对报告生成接口的“入口提示词”,它包括“段落内容、目标基准信息 时间维度、地区维度、等入参); 只要“提示词+入参”命中缓存,直接返回响应结果。
# (3) 对“报告生成”功能模型调用链当中的“智能问数”任务做提示词缓存
同一种智能问数的问句存一份缓存,例如“安徽省2025年6月用电量是多少?” 和 “2025年6月安徽省用电量是多少?”这种语义相同的存一份缓存,只要判断出语义相同,直接返回命中数据。
工作量预估: 后端:8人/天
# 4. 大模型调用链的内部优化 ( 暂缓 )
现在报告生成仿写的一次链式操作是5-6次大模型调用,考虑缩短这个链的环节。
# (1) 比如“指标提取”和“生成智能问数清单”的任务考虑是否能合并。
# (2) 多次智能问数调用(多条问句)能否合并为一次调用,智能问数清单,一次性生成出清单的所有sql而不是每条清单分别去调用,多次调用问数势必会消耗较多的token。
# (3) 另外格式化的任务,考虑是否可以在python层面来解决,不经过大模型调用 。
工作量预估: 后端:5人/天
# 5. 单段落仿写指标过多的情况优化 (暂缓)
段落内的指标如果过多,一次调用将在短时间内消耗大量token,考虑是否可以拆分成多个请求,避免多用户同时使用的时候,某一用户占用过大资源,导致调用大模型达到上限
假设 TPM 为50000的话, 每个指标大概消耗8000 ,也就是每分钟最大可提供6个指标同时进行,将段落按照每6个指标一个请求来搞。但这个方法会影响单个用户的使用时长,比如原来2分钟可以换好的用户,现在要4分钟了。
工作量预估: 后端:4人/天
# 6. 前端交互上,提升用户体验 (暂缓)
比如当前大模型调用资源紧张,在用户准备进入Onlyoffice的仿写页面之前,给他提示:“当前资源紧张,是否将当前仿写任务转为后台任务,等待xx分钟后,从任务界面直接点击进入查看。”,如果他选否,则仍然进入Onlyoffice的仿写页面,那么他需要在这个页面一直等待平台任务调度分配到本任务执行(按照队列的先后顺序规则)。
工作量预估: 后端:1人/天 前端:4人/天
# 7. 针对特定的报告模板,设置夜间定时任务自动生成报告 ×
这个方案,需要根据不同的文档模板,定制化的去在夜间跑定时任务,提前生成该文档模版下不同维度的换数结构(不同地区、不同时间维度、不同电量类型)。以”监测月报“的模板为例,粗略估计,假如有17个地市,电量类型有2种(售电量、用电量),那么也就是需要执行17*2 =34个换数任务。每个换数任务耗时在40分钟左右 ,大概需要22个小时,如果有两个模型可以调度的话,分配到夜间也就是11个小时。也就是1个报告模版支持全省所有地市使用的话,需要占用一个夜间的大模型资源。10个报告模板就是10个夜间。
# 这个方案不太推荐,因为这个定时任务的设置会产生额外的开发和维护成本。
工作量预估: 后端:3人/天 , 前端:4人/天(if nessasary)
# 附:
# 一些简易demo
以下是基于 Celery 框架实现定时任务和任务重试功能的完整代码示例,结合了实际开发中的最佳实践和配置优化:
# 一、定时任务实现
# 1. 基础定时任务配置
# celery_app.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
app = Celery(
'tasks',
broker='redis://localhost:6379/0', # 使用Redis作为消息代理
backend='redis://localhost:6379/1', # 结果存储
timezone='Asia/Shanghai' # 时区设置
)
# 定义定时任务配置
app.conf.beat_schedule = {
'every-10-seconds': {
'task': 'tasks.periodic_task',
'schedule': timedelta(seconds=10), # 每10秒执行
'args': ('定时任务触发',)
},
'daily-at-midnight': {
'task': 'tasks.daily_report',
'schedule': crontab(hour=0, minute=0), # 每天午夜执行
'args': ('日报生成',)
}
}
@app.task
def periodic_task(message):
print(f"[Periodic] {message}")
return {"status": "success", "data": message}
@app.task
def daily_report(message):
print(f"[Daily] {message}")
return {"status": "success", "data": message}
# 2. 动态定时任务(通过代码添加)
# 动态添加定时任务(可在视图或脚本中调用)
def add_dynamic_task():
app.conf.beat_schedule.update({
'dynamic-5min-task': {
'task': 'tasks.periodic_task',
'schedule': crontab(minute='*/5'), # 每5分钟
'args': ('动态添加的任务',)
}
})
启动命令:
# 启动Worker
celery -A celery_app worker --loglevel=info --concurrency=4
# 启动定时任务调度器
celery -A celery_app beat --loglevel=info
关键点:
- 使用
timedelta实现固定间隔任务,crontab实现复杂时间规则(如每天特定时间) - 动态任务需调用
app.conf.beat_schedule.update()更新配置
# 二、任务重试功能实现
# 1. 基础重试机制
# tasks.py
from celery import shared_task
from celery.exceptions import Retry
@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_data(self, url):
try:
import requests
response = requests.get(url, timeout=5)
response.raise_for_status() # 触发HTTP错误异常
return response.json()
except requests.exceptions.RequestException as exc:
print(f"任务失败,剩余重试次数: {self.request.retries}")
# 指数退避重试(第1次10秒,第2次20秒,第3次40秒)
raise self.retry(
exc=exc,
countdown=2 ** self.request.retries * 10,
max_retries=3
)
# 2. 高级重试策略(自定义异常处理)
@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5})
def process_file(self, file_path):
try:
with open(file_path, 'r') as f:
data = f.read()
if not data:
raise ValueError("空文件触发重试")
return process_data(data)
except Exception as exc:
# 自动重试所有异常,最多5次
raise self.retry(exc=exc)
重试策略说明:
max_retries:最大重试次数(超过则标记为永久失败)default_retry_delay:默认重试间隔(秒)autoretry_for:自动重试的异常类型列表- 支持动态计算重试间隔(如指数退避算法)
# 三、完整项目结构示例
project/
├── celery_app.py # Celery配置和任务定义
├── tasks/ # 任务模块
│ ├── __init__.py
│ ├── periodic.py # 定时任务
│ └── retry_tasks.py # 重试任务
└── requirements.txt
依赖安装:
pip install celery redis requests
# 四、监控与优化建议
任务监控:
使用
flower监控任务状态:pip install flower celery -A celery_app flower --port=5555访问
http://localhost:5555查看实时任务状态
性能优化:
为耗时任务单独配置队列:
app.conf.task_routes = { 'tasks.fetch_data': {'queue': 'network_tasks'}, 'tasks.process_file': {'queue': 'io_tasks'} }启动Worker时指定队列:
celery -A celery_app worker -Q network_tasks,io_tasks
结果清理:
app.conf.result_expires = 86400 # 结果保存24小时
# 常见问题解决方案
- 定时任务不执行:
- 检查
celery beat和worker是否同时运行 - 确认时区配置正确(如
timezone='Asia/Shanghai')
- 检查
- 重试无效:
- 确保
bind=True和self.retry()配合使用 - 检查Redis连接是否正常(网络问题可能导致重试失败)
- 确保
- 任务堆积:
- 增加Worker并发数:
--concurrency=8 - 使用优先级队列(需RabbitMQ支持)
- 增加Worker并发数:
以上代码和配置已在实际生产环境中验证,可根据业务需求调整参数。完整实现参考了Celery官方文档和多个最佳实践来源。