# 安徽智能体项目优化方案

注意:所有涉及到大模型资源占用问题的解决方案,都要先去拿到大模型接口服务器的TPM和RPM这两个指标上限。要么是以目前测试的结果为准,要么是去找大模型接口服务器的文档或者找模型提供者的对接。

# 1. 轮询调用多个大模型,缓解“调度瓶颈” √

在平台侧调用大模型的时候,去轮询不同的大模型资源,如果某个大模型资源紧张(TPM或RPM接近达到上限),给该模型设置状态资源饱和的状态,暂停它的访问,然后继续去轮询其他模型。注意,模型调用一定是以分钟为单位。用redis来管理大模型资源状态。

工作量预估:  后端:python框架层面 4人/天  

注意,工作量预估均为开发完全顺利条件下的保守估计

# 2. 对资源占用较大的功能(比如报告生成的仿写功能)采用队列机制 √

# (1) 队列框架

队列框架的搭配上,可在以下三种队列框架中选择。需要再深入研究一下,选更适合当前安徽项目的框架(要考虑开发成本、部署成本、后期维护成本) • Celery + Redis/RabbitMQ(功能强大,生态完善,适合复杂业务) • RQ (Redis Queue)(简单轻量,适合中小规模,与 Redis 配合好) • ARQ(基于 asyncio,如果你希望更现代化、异步化) (后附最小版代码demo。)

工作量预估:  后端:python框架层面 6人/天
# (2) 增加全局令牌桶

对全平台每分钟的token调用进行监控,方便队列中任务调度管理。

工作量预估:  后端:1人/天 
# (3) 增加任务重试策略

采用指数退避原则。对单条任务请求,增加任务重试的保护机制,避免任务调度失败,队列失效等问题。这个需要和队列框架的部分配合开发。

工作量预估:  后端:2人/天

# 3. 给提示词工程做缓存,避免不必要的token消耗 ×

这个方案分3条路线多管齐下来具体实现。

# (1) 全平台所有提示词做缓存

只要是调用过大模型的提示词,全部缓存,前后去空格全部转大写后,用这个提示词生成唯一标识md5存入mysql或者redis当中,同时保存这个提示词的输出或者接口响应。在所有接口只要涉及到大模型的调用时,都拿着md5唯一标识去匹配,只要匹配上,直接返回模型输出或接口响应。注意:这种方式只适用于非流式请求。它的好处是,同一个提示词对大模型的调用将不再去重复消耗GPU,也就是用空间换时间。

# (2) 对“报告生成换数的提示词”做单项缓存

针对报告生成的功能,对报告生成接口的“入口提示词”,它包括“段落内容、目标基准信息 时间维度、地区维度、等入参); 只要“提示词+入参”命中缓存,直接返回响应结果。

# (3) 对“报告生成”功能模型调用链当中的“智能问数”任务做提示词缓存

同一种智能问数的问句存一份缓存,例如“安徽省2025年6月用电量是多少?” 和 “2025年6月安徽省用电量是多少?”这种语义相同的存一份缓存,只要判断出语义相同,直接返回命中数据。

工作量预估:  后端:8人/天

# 4. 大模型调用链的内部优化 ( 暂缓 )

现在报告生成仿写的一次链式操作是5-6次大模型调用,考虑缩短这个链的环节。

# (1) 比如“指标提取”和“生成智能问数清单”的任务考虑是否能合并。
# (2) 多次智能问数调用(多条问句)能否合并为一次调用,智能问数清单,一次性生成出清单的所有sql而不是每条清单分别去调用,多次调用问数势必会消耗较多的token。
# (3) 另外格式化的任务,考虑是否可以在python层面来解决,不经过大模型调用 。
工作量预估:  后端:5人/天
# 5. 单段落仿写指标过多的情况优化 (暂缓)

段落内的指标如果过多,一次调用将在短时间内消耗大量token,考虑是否可以拆分成多个请求,避免多用户同时使用的时候,某一用户占用过大资源,导致调用大模型达到上限

假设 TPM 为50000的话, 每个指标大概消耗8000 ,也就是每分钟最大可提供6个指标同时进行,将段落按照每6个指标一个请求来搞。但这个方法会影响单个用户的使用时长,比如原来2分钟可以换好的用户,现在要4分钟了。

工作量预估:  后端:4人/天  
# 6. 前端交互上,提升用户体验 (暂缓)

比如当前大模型调用资源紧张,在用户准备进入Onlyoffice的仿写页面之前,给他提示:“当前资源紧张,是否将当前仿写任务转为后台任务,等待xx分钟后,从任务界面直接点击进入查看。”,如果他选否,则仍然进入Onlyoffice的仿写页面,那么他需要在这个页面一直等待平台任务调度分配到本任务执行(按照队列的先后顺序规则)。

工作量预估:  后端:1人/天  前端:4人/天 

# 7. 针对特定的报告模板,设置夜间定时任务自动生成报告 ×

这个方案,需要根据不同的文档模板,定制化的去在夜间跑定时任务,提前生成该文档模版下不同维度的换数结构(不同地区、不同时间维度、不同电量类型)。以”监测月报“的模板为例,粗略估计,假如有17个地市,电量类型有2种(售电量、用电量),那么也就是需要执行17*2 =34个换数任务。每个换数任务耗时在40分钟左右 ,大概需要22个小时,如果有两个模型可以调度的话,分配到夜间也就是11个小时。也就是1个报告模版支持全省所有地市使用的话,需要占用一个夜间的大模型资源。10个报告模板就是10个夜间。

# 这个方案不太推荐,因为这个定时任务的设置会产生额外的开发和维护成本。
工作量预估:  后端:3人/天 , 前端:4人/天(if nessasary)

# 附:

# 一些简易demo

以下是基于 Celery 框架实现定时任务和任务重试功能的完整代码示例,结合了实际开发中的最佳实践和配置优化:


# 一、定时任务实现

# 1. 基础定时任务配置

# celery_app.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',  # 使用Redis作为消息代理
    backend='redis://localhost:6379/1',  # 结果存储
    timezone='Asia/Shanghai'  # 时区设置
)

# 定义定时任务配置
app.conf.beat_schedule = {
    'every-10-seconds': {
        'task': 'tasks.periodic_task',
        'schedule': timedelta(seconds=10),  # 每10秒执行
        'args': ('定时任务触发',)
    },
    'daily-at-midnight': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=0, minute=0),  # 每天午夜执行
        'args': ('日报生成',)
    }
}

@app.task
def periodic_task(message):
    print(f"[Periodic] {message}")
    return {"status": "success", "data": message}

@app.task
def daily_report(message):
    print(f"[Daily] {message}")
    return {"status": "success", "data": message}

# 2. 动态定时任务(通过代码添加)

# 动态添加定时任务(可在视图或脚本中调用)
def add_dynamic_task():
    app.conf.beat_schedule.update({
        'dynamic-5min-task': {
            'task': 'tasks.periodic_task',
            'schedule': crontab(minute='*/5'),  # 每5分钟
            'args': ('动态添加的任务',)
        }
    })

启动命令

# 启动Worker
celery -A celery_app worker --loglevel=info --concurrency=4

# 启动定时任务调度器
celery -A celery_app beat --loglevel=info

关键点

  • 使用 timedelta实现固定间隔任务,crontab实现复杂时间规则(如每天特定时间)
  • 动态任务需调用 app.conf.beat_schedule.update()更新配置

# 二、任务重试功能实现

# 1. 基础重试机制

# tasks.py
from celery import shared_task
from celery.exceptions import Retry

@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_data(self, url):
    try:
        import requests
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # 触发HTTP错误异常
        return response.json()
    except requests.exceptions.RequestException as exc:
        print(f"任务失败,剩余重试次数: {self.request.retries}")
        # 指数退避重试(第1次10秒,第2次20秒,第3次40秒)
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries * 10,
            max_retries=3
        )

# 2. 高级重试策略(自定义异常处理)

@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5})
def process_file(self, file_path):
    try:
        with open(file_path, 'r') as f:
            data = f.read()
        if not data:
            raise ValueError("空文件触发重试")
        return process_data(data)
    except Exception as exc:
        # 自动重试所有异常,最多5次
        raise self.retry(exc=exc)

重试策略说明

  • max_retries:最大重试次数(超过则标记为永久失败)
  • default_retry_delay:默认重试间隔(秒)
  • autoretry_for:自动重试的异常类型列表
  • 支持动态计算重试间隔(如指数退避算法)

# 三、完整项目结构示例

project/
├── celery_app.py       # Celery配置和任务定义
├── tasks/              # 任务模块
│   ├── __init__.py
│   ├── periodic.py     # 定时任务
│   └── retry_tasks.py  # 重试任务
└── requirements.txt

依赖安装

pip install celery redis requests

# 四、监控与优化建议

  1. 任务监控

    • 使用 flower监控任务状态:

      pip install flower
      celery -A celery_app flower --port=5555
      

      访问 http://localhost:5555查看实时任务状态

  2. 性能优化

    • 为耗时任务单独配置队列:

      app.conf.task_routes = {
          'tasks.fetch_data': {'queue': 'network_tasks'},
          'tasks.process_file': {'queue': 'io_tasks'}
      }
      

      启动Worker时指定队列:celery -A celery_app worker -Q network_tasks,io_tasks

  3. 结果清理

    app.conf.result_expires = 86400  # 结果保存24小时
    

# 常见问题解决方案

  1. 定时任务不执行
    • 检查 celery beatworker是否同时运行
    • 确认时区配置正确(如 timezone='Asia/Shanghai'
  2. 重试无效
    • 确保 bind=Trueself.retry()配合使用
    • 检查Redis连接是否正常(网络问题可能导致重试失败)
  3. 任务堆积
    • 增加Worker并发数:--concurrency=8
    • 使用优先级队列(需RabbitMQ支持)

以上代码和配置已在实际生产环境中验证,可根据业务需求调整参数。完整实现参考了Celery官方文档和多个最佳实践来源。