# 流式调用问题的根因分析

之前调用北科的个别流式接口,一直误认为是假流式,实则不是。它用的协议是Streamable HTTP ,一种比SSE更易用的流式请求(基于http),是MCP协会最新官方推荐的流式调用协议。

# 1. 概念解释

Streamable HTTP 是一种基于标准 HTTP 协议、支持流式传输按需动态升级的通信机制,它允许数据以分块形式传输,并支持无状态服务器运行。它并非一个全新的协议标准,而更像是一种结合了传统 HTTP POST 和 Server-Sent Events (SSE) 优势的设计模式或应用层实践。

它与 SSE (Server-Sent Events) 的主要区别如下表所示:

特性维度 Streamable HTTP SSE (Server-Sent Events)
通信方向 双向 (客户端通过POST发,服务器通过SSE推) 单向 (仅服务器到客户端)
协议性质 HTTP协议的一种使用方式,无严格标准格式 有特定格式和事件机制的专门协议
端点设计 统一端点 (如 /message),所有操作通过此端点 需要两个端点 (如 /sse用于连接,/messages用于发送请求)
连接管理 灵活,可短连接,也可动态升级为长连接 始终保持长连接
服务器状态 支持无状态,会话ID通过Header传递,适合云原生 需要有状态,服务器需维护连接和会话信息
数据格式 灵活,由应用层定义 (如JSON、Protobuf) 固定文本格式 (UTF-8编码,特定事件字段)
重连机制 通常需应用层自行实现 内置自动重连机制
基础设施兼容性 兼容性好,易于与CDN、API网关、负载均衡器配合 可能受代理或防火墙策略影响,长连接维护成本高
浏览器支持 依赖标准HTTP,支持广泛 现代浏览器普遍支持(IE/Edge旧版本除外)
MCP协议中的角色 官方推荐的下一代传输机制,逐步替代SSE+HTTP 传统方式,计划被淘汰

# 🧠 核心工作原理简述

  1. Streamable HTTP:所有通信(包括客户端请求和服务器响应)都通过一个统一的端点(如 /message)进行。客户端通过 HTTP POST 请求(短连接)发送数据,并在请求头中携带 Mcp-Session-Id来标识会话。服务器根据请求内容或头信息,动态决定是否将本次 HTTP 响应升级为类似 SSE 的流式传输(长连接),用于持续推送数据(如进度更新)。
  2. SSE:客户端通过创建 EventSource对象向一个专门的端点(如 /sse)发起 HTTP GET 请求,服务器接受并保持此长连接,并按照特定的文本格式(如 data: ...)持续向客户端推送事件流。客户端如需向服务器发送数据,则需额外发起一个 HTTP POST 请求到另一个端点(如 /messages)。

# ⚖️ 如何选择?

  • 选择 Streamable HTTP 当
    • 你希望架构更灵活、更现代,追求更好的可扩展性和无状态设计,尤其是在云原生、Serverless(如 AWS Lambda)或高并发分布式环境中。
    • 你需要更好的基础设施兼容性,希望无缝集成现有的 CDN、API 网关和负载均衡器。
    • 你正在基于 MCP 协议进行开发,它已成为官方推荐的传输方式。
  • 选择 SSE 当
    • 你的应用场景非常简单,只需要服务器向客户端单向推送信息(如实时通知、行情更新、新闻推送)。
    • 你希望快速实现原型,并且看重浏览器提供的开箱即用的自动重连功能
    • 项目对旧版浏览器兼容性没有要求,或者可以使用 Polyfill。

# 🔮 概念小结

简单来说,Streamable HTTP 是在借鉴和融合了 SSE 特性后,对传统 HTTP 通信模式的一种增强和优化。它提供了更大的灵活性、更好的可扩展性以及更现代化的架构支持,特别是在 MCP 等协议中被视为未来方向。而 SSE 则是一个简单、专一且成熟的解决方案,适用于特定的单向数据推送场景。

# 2. 为什么在经验成果库一直无法正常调用个别北科的流式接口?

# 2.1 现象

# (1) curl调用,可正常流式响应

(opens new window)

# (2) Postman一次性返回响应(多条json一次性返回),而非流式(看上去是假流式)

pV6T91S.md.png (opens new window)

# (3)前端页面用Axios调用,或者直接用fetch API调用,均为一次性返回响应,现象上和postman的调用类似。

# 2.2 根因分析

(1)经验成果库的前端框架可能为了兼容早期的ie,对原生fetch进行了封装,导致其无法处理 streamble http(Accept:application/x-ndjson , Content-Type: application/json )(原生的fetch可以), axios调用的时候也受影响,可能也改过原生的xhr。

[pV6IOiV.png

pV6IjRU.png (opens new window)

(2)postman暂时不支持 Streamble http流式效果展示,所以看上去也是假流式。

#

# Fetch API对 Streamable HTTP的应用(示例Demo)
# 前端
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>测试流式接口</title>
    <style>
        #output {
            white-space: pre-wrap;
            word-wrap: break-word;
            border: 1px solid #ccc;
            padding: 10px;
            margin-top: 10px;
            min-height: 100px;
        }
    </style>
</head>
<body>
    <h1>测试后端流式接口</h1>
    <button id="fetchButton">点击调用接口 (prompt: "test")</button>
    <div id="output">响应输出将显示在这里...</div>

    <script>
        const button = document.getElementById('fetchButton');
        const outputDiv = document.getElementById('output');

        button.addEventListener('click', async () => {
            outputDiv.textContent = '正在调用接口...';
            try {
                const response = await fetch('http://localhost:8000/api/v1/conversion/test_streamable_http', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Accept': 'application/x-ndjson'
                    },
                    body: JSON.stringify({ prompt: 'test' })
                });

                // 检查响应是否支持流式
                if (!response.body) {
                    outputDiv.textContent += '\n错误: response.body 为 undefined,可能浏览器或环境不支持流式响应。';
                    const text = await response.text();
                    outputDiv.textContent += '\n完整响应: ' + text;
                    return;
                }
				
				debugger
                // 处理流式响应
                const reader = response.body.getReader();
                const decoder = new TextDecoder('utf-8');
                let fullText = '';

                while (true) {
                    const { done, value } = await reader.read();
                    if (done) {
                        outputDiv.textContent += '\n流式响应结束。';
                        break;
                    }

                    // 解码数据块
                    const chunk = decoder.decode(value, { stream: true });
                    // 按行分割 ndjson
                    const lines = chunk.split('\n');
                    for (const line of lines) {
                        if (line.trim()) {
                            try {
                                const json = JSON.parse(line);
                                if (json.delta) {
                                    fullText += json.delta;
                                    outputDiv.textContent = fullText;  // 实时更新显示
                                } else if (json.finish_reason) {
                                    outputDiv.textContent += '\n结束原因: ' + json.finish_reason;
                                }
                            } catch (e) {
                                console.error('解析 JSON 失败:', line, e);
                                outputDiv.textContent += '\n解析错误: ' + line;
                            }
                        }
                    }
                }
            } catch (error) {
                console.error('fetch 错误:', error);
                outputDiv.textContent += '\n错误: ' + error.message;
            }
        });
    </script>
</body>
</html>
# python后端
async def fake_streamer(prompt: str):
    """模拟逐字符返回 JSON delta"""
    text = f"用户输入: {prompt} -> 从重点行业增长点来看,国家统计局最新数据显示,今年一季度安徽省汽车制造业电量同比分别增长xx%。"
    batch = b""
    for char in text:
        # 每个字符构造成 JSON bytes
        chunk = json.dumps({"delta": char}, ensure_ascii=False).encode("utf-8") + b"\n"
        batch += chunk
        # 每 10 字节 flush 一次,避免浏览器缓冲
        if len(batch) > 50:
            yield batch
            batch = b""
        await asyncio.sleep(0.05)
    if batch:
        yield batch
    # 流结束标记
    yield json.dumps({"finish_reason": "stop"}).encode("utf-8") + b"\n"

@router.post("/test_streamable_http")
async def stream_api(prompt: dict):
    user_input = prompt.get("prompt", "")
    return StreamingResponse(
        fake_streamer(user_input),
        media_type="application/x-ndjson",
        headers={"Transfer-Encoding": "chunked", "Cache-Control": "no-cache"}
    )

# 一些其他补充

# 💎 技术选型对比

特性维度 Server-Sent Events (SSE) WebSocket Fetch API + ReadableStream(Streamable HTTP)
通信方向 服务器到客户端的单向推送 双向实时通信 服务器到客户端的单向数据流(常基于HTTP)
协议 标准HTTP协议 独立的ws或wss协议 标准HTTP协议
浏览器支持 广泛(IE除外) 广泛 广泛(现代浏览器)
自定义头部 不支持(需通过URL或Cookie认证) 支持 支持(如Authorization
实现复杂度 (前端使用EventSource 中(需管理连接、心跳) 中(需手动处理流读取、解码)
自动重连 支持 需自行实现 需自行实现
适用场景 实时通知、状态更新、文本逐字生成 聊天室、实时协作、多轮对话 需POST请求或自定义头的流式输出(如与OpenAI兼容的API)

# 🧩 选择建议

  1. 追求简单快捷的文本推送(如新闻更新、模型逐字输出):优先考虑 SSE。它的实现非常简单,前端使用 EventSource即可,并且支持自动重连,对于不需要复杂交互的场景非常合适。
  2. 需要双向实时交互(如多轮对话、实时协作编辑):应该选择 WebSocket。虽然实现相对复杂,但它能支持客户端和服务器之间的全双工通信,适合交互性强的应用。
  3. 需要调用现有API(尤其兼容OpenAI风格)或必须使用POST请求和自定义头部Fetch API + ReadableStream 更灵活。它允许你使用 POST 方法和设置认证头,是调用许多主流大模型API(如设置 stream: true)时的常见方式。
  4. 框架与库的考虑
    • 使用 Next.js 等全栈框架时,可充分利用其 API Routes 来处理流式响应。
    • Gradio 适合快速构建机器学习模型的交互界面,内置了对流式输出的支持。
    • 对于复杂应用,LangChain.jsRxJS 等库能帮助更好地管理和处理流数据。

# 💎 核心注意事项

  • 性能与内存管理:流式交互需关注内存使用。后端可采用 KV Cache 优化等技术减少重复计算,提升推理效率。前端在处理长流时,要注意合理释放已完成处理的数据块。
  • 错误处理与重连机制:务必为流式连接设计健壮的错误处理(onerror)和重连逻辑(SSE自带,其他需手动实现),以保证用户体验。
  • 数据格式解析:服务器发送的数据格式(如纯文本、JSON对象、SSE格式)需要前后端预先约定,前端要根据约定进行解析和拼接。
  • 浏览器兼容性:确认你的目标用户使用的浏览器是否支持所选的方案(如SSE不支持IE)。

# 💡 总结

选择的关键在于明确你的需求:

  • 简单单向推送 ➡️ SSE
  • 复杂双向交互 ➡️ WebSocket
  • 调用第三方API或需高度自定义请求 ➡️ Fetch API + ReadableStream