
本文详解如何在 quart 框架中正确实现 server-sent events,重点解决事件流被 nginx/hypercorn 缓冲导致前端收不到实时响应的问题,并提供可直接运行的异步队列驱动 sse 示例。
本文详解如何在 quart 框架中正确实现 server-sent events,重点解决事件流被 nginx/hypercorn 缓冲导致前端收不到实时响应的问题,并提供可直接运行的异步队列驱动 sse 示例。
Server-Sent Events(SSE)是一种基于 HTTP 的单向实时通信机制,适用于服务端向客户端持续推送更新(如通知、日志、状态变更等)。Quart 作为异步 Python Web 框架,天然支持 SSE,但实际部署中常因反向代理(如 Nginx)或 ASGI 服务器(如 Hypercorn)的默认缓冲策略,导致响应体被截断或延迟发送——表现为 Postman 或浏览器长时间挂起、无任何 event-stream 数据输出,仅在服务中断时才“刷出”累积内容。
根本原因在于:Nginx 默认启用 proxy_buffering on,会暂存后端响应直到缓冲区满或连接关闭;而 SSE 要求逐块即时传输(chunked encoding + keep-alive),必须显式禁用代理层缓冲。
✅ 正确解决方案是:在 Quart 响应头中添加 X-Accel-Buffering: no(Nginx 专用指令),强制其绕过缓冲,直通流式数据。
以下是一个生产就绪的 Quart SSE 实现示例:
from quart import Quart, request, make_response, abort
from asyncio import Queue
from dataclasses import dataclass
import asyncio
app = Quart(__name__)
@dataclass
class ServerSentEvent:
data: str
event: str = "message"
def encode(self) -> bytes:
# 严格遵循 SSE 规范:每条消息以 \r\n\r\n 结尾,字段为 data: / event: / id: 等
lines = [f"data: {self.data}"]
if self.event != "message":
lines.append(f"event: {self.event}")
lines.append("") # 空行分隔消息
return "\r\n".join(lines).encode("utf-8")
# 全局事件队列(生产环境建议使用 Redis Pub/Sub 或更健壮的消息总线)
global_event_queue: Queue = Queue()
@app.route("/sse")
async def sse_endpoint():
# 客户端必须声明接受 text/event-stream
if "text/event-stream" not in request.accept_mimetypes:
abort(400, "Accept header must include text/event-stream")
async def event_stream():
# 初始订阅确认事件
yield ServerSentEvent(event="connected", data="SSE connection established").encode()
# 持续监听队列并推送事件
while True:
try:
# 使用 timeout 避免永久阻塞(可选,增强健壮性)
event = await asyncio.wait_for(global_event_queue.get(), timeout=30.0)
yield event.encode()
except asyncio.TimeoutError:
# 心跳保活:发送空注释(SSE 注释以 : 开头,客户端忽略)
yield b": heartbeat\n\n"
# 构造响应:关键!添加 X-Accel-Buffering: no
response = await make_response(
event_stream(),
{
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # ? 核心修复:禁用 Nginx 缓冲
}
)
response.timeout = None # 禁用 Quart 响应超时
return response
# 示例:模拟后端触发事件(如 WebSocket 收到消息、数据库变更等)
@app.route("/trigger-event", methods=["POST"])
async def trigger_event():
form = await request.form
data = form.get("data", "default message")
event_name = form.get("event", "update")
await global_event_queue.put(ServerSentEvent(data=data, event=event_name))
return {"status": "event queued"}? 关键注意事项:
-
Nginx 配置补充(如使用): 除响应头外,还需在 location /sse 块中配置:
proxy_buffering off; proxy_cache off; proxy_http_version 1.1; proxy_set_header Connection ''; chunked_transfer_encoding off; # 确保 Hypercorn 的 chunked 不被覆盖
- ASGI 服务器兼容性: Hypercorn 默认支持流式响应,但需确保未启用 --worker-class sync 等同步模式;推荐使用默认 asyncio worker。
- 客户端连接管理: 浏览器 EventSource 会在断连后自动重试(默认 3s),服务端应处理重复连接(如通过 request.sid 或 token 鉴权)。
- 错误处理与心跳: 示例中加入了超时心跳,避免长连接被中间设备(如负载均衡器)静默关闭;生产环境建议结合 retry: 字段控制重连间隔。
- 并发安全: asyncio.Queue 是协程安全的,多路 event_stream() 可共享同一队列,无需额外锁。
总结:Quart 完全胜任 SSE 场景,问题不在框架本身,而在 HTTP 中间件的流式语义适配。只需牢记——X-Accel-Buffering: no 是 Nginx 环境下 SSE 生效的黄金钥匙,配合规范的事件编码与异步流生成,即可构建低延迟、高可靠的服务端推送能力。










