1 왜 스트리밍이 필요한가
LLM 응답은 생성에 수 초~수십 초가 걸린다. 일반 HTTP 요청은 서버가 응답을 완성한 후에야 전체를 전송한다. 사용자는 그 동안 빈 화면을 보며 기다려야 한다.
스트리밍은 응답을 토큰 단위로 즉시 전송한다. ChatGPT가 글자를 하나씩 출력하는 효과가 이 방식이다. 체감 응답 시간이 크게 줄어든다.
| 방식 | 사용자 체감 | 서버 동작 |
|---|---|---|
| 일반 응답 | 5초 대기 → 전체 텍스트 한 번에 출력 | 전체 생성 완료 후 전송 |
| 스트리밍 | 0.3초 후부터 토큰이 하나씩 출력 | 토큰 생성 즉시 전송 |
2 SSE vs WebSocket vs Polling
실시간 통신 방식은 여러 가지가 있다. AI Agent 서빙에서 SSE가 적합한 이유를 비교한다.
| 기준 | SSE | WebSocket | Long Polling |
|---|---|---|---|
| 방향 | 서버 → 클라이언트 (단방향) | 양방향 | 서버 → 클라이언트 |
| 프로토콜 | HTTP | WS (HTTP 업그레이드) | HTTP |
| 재연결 | 자동 (브라우저 내장) | 수동 구현 필요 | 수동 구현 필요 |
| 인프라 호환 | HTTP 그대로 사용 | 프록시, 로드밸런서 설정 필요 | HTTP 그대로 사용 |
| 복잡도 | 낮음 | 높음 | 중간 |
| 적합 사례 | LLM 스트리밍, 알림, 로그 | 채팅, 게임, 공동 편집 | 레거시 호환 |
LLM 스트리밍은 서버가 토큰을 보내고, 클라이언트는 받기만 한다. 양방향 통신이 필요 없으므로 SSE가 가장 간단한 선택이다. HTTP 위에서 동작하므로 기존 인프라(로드밸런서, 프록시, CORS)를 수정 없이 사용할 수 있다.
3 SSE 프로토콜 구조
SSE는 특별한 프로토콜이 아니다. HTTP 응답의 Content-Type을 text/event-stream으로 설정하고, 특정 형식으로 데이터를 보내면 된다.
3.1 응답 형식
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: 첫 번째 메시지
data: 두 번째 메시지
event: custom_event
data: {"key": "value"}
data: [DONE]
규칙은 간단하다:
- 각 메시지는
data:접두사로 시작한다 - 메시지 사이는 빈 줄(
\n\n)로 구분한다 event:필드로 이벤트 타입을 지정할 수 있다 (기본값:message)id:필드로 이벤트 ID를 지정하면 재연결 시 마지막 ID부터 재개할 수 있다retry:필드로 재연결 간격(밀리초)을 지정한다
3.2 이벤트 타입 활용
event: token
data: RAG
event: token
data: 는
event: citation
data: {"source": "guide.pdf", "page": 42}
event: done
data: {"run_id": "abc-123", "latency_ms": 1500}
이벤트 타입을 분리하면 클라이언트에서 토큰, 인용, 완료 신호를 각각 다르게 처리할 수 있다.
4 FastAPI 서버 구현
FastAPI의 StreamingResponse로 SSE를 구현한다.
4.1 기본 구조
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_generator():
"""SSE 이벤트를 생성하는 비동기 제너레이터"""
for i in range(5):
await asyncio.sleep(0.5)
yield f"data: 메시지 {i}\n\n"
yield "data: [DONE]\n\n"
@app.get("/stream")
def stream():
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)yield가 호출될 때마다 해당 데이터가 클라이언트에 즉시 전송된다. 제너레이터가 종료되면 연결이 닫힌다.
4.2 LLM 토큰 스트리밍
실제 AI Agent에서의 구현 패턴이다.
from pydantic import BaseModel
class RunRequest(BaseModel):
text: str
history: list[dict] = []
class StreamEvent(BaseModel):
event: str # "token", "citation", "done", "error"
data: str # JSON 문자열
async def stream_agent(request: RunRequest):
"""에이전트 응답을 토큰 단위로 스트리밍한다"""
try:
async for token in agent.astream(request.text):
event = StreamEvent(event="token", data=token)
yield f"event: token\ndata: {event.model_dump_json()}\n\n"
yield f"event: done\ndata: {{}}\n\n"
except Exception as e:
error_data = {"message": str(e)}
yield f"event: error\ndata: {error_data}\n\n"
@app.post("/agents/qna_chatbot/stream")
def stream_endpoint(request: RunRequest):
return StreamingResponse(
stream_agent(request),
media_type="text/event-stream",
)이 패턴의 핵심:
async for로 LLM의 토큰을 하나씩 받는다- 각 토큰을 SSE 형식으로 즉시
yield한다 - 완료 시
done이벤트를 보내 클라이언트에 종료를 알린다 - 에러 발생 시
error이벤트로 클라이언트에 전달한다
4.3 Pydantic으로 이벤트 스키마 정의
from pydantic import BaseModel
from enum import Enum
class EventType(str, Enum):
token = "token"
citation = "citation"
done = "done"
error = "error"
class StreamEvent(BaseModel):
event: EventType
data: str
class Citation(BaseModel):
source: str
page: int | None = None
class DonePayload(BaseModel):
run_id: str
latency_ms: int
input_tokens: int = 0
output_tokens: int = 0이벤트 스키마를 Pydantic으로 정의하면 서버와 클라이언트 간 계약이 명확해진다.
5 프론트엔드 구현
이 섹션은 JavaScript와 브라우저 API를 사용한다. 프론트엔드를 아직 배우지 않았다면 위의 FastAPI 서버 구현까지만 읽고, React 기초 학습 후 돌아와도 된다. 서버 측 SSE 구현만으로도 curl이나 Postman으로 스트리밍 응답을 확인할 수 있다.
5.1 EventSource API
브라우저에 내장된 EventSource는 SSE를 위한 전용 API이다. 자동 재연결, 이벤트 타입별 리스너를 지원한다.
const eventSource = new EventSource("/stream");
// 기본 메시지 수신
eventSource.onmessage = (event) => {
console.log(event.data);
};
// 커스텀 이벤트 수신
eventSource.addEventListener("token", (event) => {
const data = JSON.parse(event.data);
appendToken(data);
});
eventSource.addEventListener("done", (event) => {
eventSource.close();
});
// 에러 처리
eventSource.onerror = (event) => {
console.error("SSE 연결 에러");
eventSource.close();
};그러나 EventSource는 GET 요청만 지원하고, 요청 헤더나 본문을 설정할 수 없다. AI Agent API처럼 POST로 JSON 본문을 보내야 하는 경우에는 fetch를 사용한다.
5.2 fetch로 SSE 수신
async function streamChat(text: string): Promise<void> {
const response = await fetch("/agents/qna_chatbot/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text }),
});
if (!response.ok || !response.body) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n\n");
buffer = lines.pop() || "";
for (const block of lines) {
if (!block.trim()) continue;
const eventMatch = block.match(/^event:\s*(.+)$/m);
const dataMatch = block.match(/^data:\s*(.+)$/m);
if (!dataMatch) continue;
const eventType = eventMatch?.[1] || "message";
const data = dataMatch[1];
switch (eventType) {
case "token":
appendToChat(JSON.parse(data));
break;
case "done":
finishChat();
break;
case "error":
showError(JSON.parse(data));
break;
}
}
}
}이 구현의 주요 포인트:
response.body.getReader()로 ReadableStream을 청크 단위로 읽는다buffer에 불완전한 메시지를 누적하여\n\n기준으로 분리한다- 이벤트 타입별로 다른 핸들러를 호출한다
5.3 React에서의 사용
import { useState, useCallback } from "react";
function ChatComponent() {
const [messages, setMessages] = useState<string[]>([]);
const [currentResponse, setCurrentResponse] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = useCallback(async (text: string) => {
setIsStreaming(true);
setCurrentResponse("");
const response = await fetch("/agents/qna_chatbot/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
let accumulated = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() || "";
for (const block of blocks) {
const dataMatch = block.match(/^data:\s*(.+)$/m);
if (!dataMatch) continue;
const parsed = JSON.parse(dataMatch[1]);
accumulated += parsed;
setCurrentResponse(accumulated);
}
}
setMessages((prev) => [...prev, accumulated]);
setCurrentResponse("");
setIsStreaming(false);
}, []);
return (
<div>
{messages.map((msg, i) => (
<div key={i}>{msg}</div>
))}
{isStreaming && <div>{currentResponse}</div>}
</div>
);
}setCurrentResponse가 토큰마다 호출되어 화면에 글자가 하나씩 나타나는 효과를 만든다.
6 SSE와 CORS
SSE도 HTTP 위에서 동작하므로 CORS 규칙이 동일하게 적용된다.
EventSource는GET요청이므로 단순 요청에 해당한다fetch로POST+ JSON을 보내면 Preflight가 발생한다- Vite Proxy를 사용하면 CORS 자체가 발생하지 않는다
FastAPI의 CORSMiddleware가 StreamingResponse에도 동일하게 적용되므로 추가 설정은 필요 없다.
7 SSE 연결 관리
7.1 클라이언트 연결 해제 감지
사용자가 페이지를 닫거나 네비게이션하면 SSE 연결이 끊어진다. 서버에서 이를 감지하지 않으면 불필요한 LLM 호출이 계속된다.
from starlette.requests import Request
@app.post("/agents/qna_chatbot/stream")
async def stream_endpoint(request: RunRequest, raw_request: Request):
async def generate():
async for token in agent.astream(request.text):
if await raw_request.is_disconnected():
break
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")request.is_disconnected()로 연결 상태를 확인하여 불필요한 처리를 중단한다.
7.2 타임아웃
LLM 호출이 너무 오래 걸리면 연결을 종료해야 한다.
import asyncio
async def stream_with_timeout(request: RunRequest):
try:
async for token in asyncio.timeout(60)(agent.astream(request.text)):
yield f"event: token\ndata: {token}\n\n"
yield "event: done\ndata: {}\n\n"
except asyncio.TimeoutError:
yield 'event: error\ndata: {"message": "응답 시간 초과"}\n\n'8 관련 주제
선행 지식
- API 기초 – HTTP 요청/응답 구조
- FastAPI 입문 – StreamingResponse 기반
- Pydantic – StreamEvent 스키마 정의
- CORS와 Proxy – SSE도 CORS 규칙이 적용됨 (SSE와 CORS 섹션 참조)
후속 주제
- ASGI와 uvicorn – 비동기 서버가 SSE를 처리하는 원리
- React에서 API 호출 – fetch + ReadableStream 클라이언트
다른 카테고리 연결
- Async Programming – Python async/await, 비동기 제너레이터