SSE (Server-Sent Events)

실시간 스트리밍의 가벼운 선택지

SSE는 서버에서 클라이언트로 실시간 데이터를 푸시하는 HTTP 기반 프로토콜이다. WebSocket과의 차이, FastAPI StreamingResponse 구현, 프론트엔드 EventSource 사용법, AI Agent 토큰 스트리밍 패턴을 정리한다.

Engineering
저자

Kwangmin Kim

공개

2026년 05월 05일

1 왜 스트리밍이 필요한가

LLM 응답은 생성에 수 초~수십 초가 걸린다. 일반 HTTP 요청은 서버가 응답을 완성한 후에야 전체를 전송한다. 사용자는 그 동안 빈 화면을 보며 기다려야 한다.

스트리밍은 응답을 토큰 단위로 즉시 전송한다. ChatGPT가 글자를 하나씩 출력하는 효과가 이 방식이다. 체감 응답 시간이 크게 줄어든다.

방식 사용자 체감 서버 동작
일반 응답 5초 대기 → 전체 텍스트 한 번에 출력 전체 생성 완료 후 전송
스트리밍 0.3초 후부터 토큰이 하나씩 출력 토큰 생성 즉시 전송

2 SSE vs WebSocket vs Polling

실시간 통신 방식은 여러 가지가 있다. AI Agent 서빙에서 SSE가 적합한 이유를 비교한다.

기준 SSE WebSocket Long Polling
방향 서버 → 클라이언트 (단방향) 양방향 서버 → 클라이언트
프로토콜 HTTP WS (HTTP 업그레이드) HTTP
재연결 자동 (브라우저 내장) 수동 구현 필요 수동 구현 필요
인프라 호환 HTTP 그대로 사용 프록시, 로드밸런서 설정 필요 HTTP 그대로 사용
복잡도 낮음 높음 중간
적합 사례 LLM 스트리밍, 알림, 로그 채팅, 게임, 공동 편집 레거시 호환

LLM 스트리밍은 서버가 토큰을 보내고, 클라이언트는 받기만 한다. 양방향 통신이 필요 없으므로 SSE가 가장 간단한 선택이다. HTTP 위에서 동작하므로 기존 인프라(로드밸런서, 프록시, CORS)를 수정 없이 사용할 수 있다.

3 SSE 프로토콜 구조

SSE는 특별한 프로토콜이 아니다. HTTP 응답의 Content-Typetext/event-stream으로 설정하고, 특정 형식으로 데이터를 보내면 된다.

3.1 응답 형식

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: 첫 번째 메시지

data: 두 번째 메시지

event: custom_event
data: {"key": "value"}

data: [DONE]

규칙은 간단하다:

  • 각 메시지는 data: 접두사로 시작한다
  • 메시지 사이는 빈 줄(\n\n)로 구분한다
  • event: 필드로 이벤트 타입을 지정할 수 있다 (기본값: message)
  • id: 필드로 이벤트 ID를 지정하면 재연결 시 마지막 ID부터 재개할 수 있다
  • retry: 필드로 재연결 간격(밀리초)을 지정한다

3.2 이벤트 타입 활용

event: token
data: RAG

event: token
data: 는

event: citation
data: {"source": "guide.pdf", "page": 42}

event: done
data: {"run_id": "abc-123", "latency_ms": 1500}

이벤트 타입을 분리하면 클라이언트에서 토큰, 인용, 완료 신호를 각각 다르게 처리할 수 있다.

4 FastAPI 서버 구현

FastAPI의 StreamingResponse로 SSE를 구현한다.

4.1 기본 구조

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_generator():
    """SSE 이벤트를 생성하는 비동기 제너레이터"""
    for i in range(5):
        await asyncio.sleep(0.5)
        yield f"data: 메시지 {i}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/stream")
def stream():
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

yield가 호출될 때마다 해당 데이터가 클라이언트에 즉시 전송된다. 제너레이터가 종료되면 연결이 닫힌다.

4.2 LLM 토큰 스트리밍

실제 AI Agent에서의 구현 패턴이다.

from pydantic import BaseModel

class RunRequest(BaseModel):
    text: str
    history: list[dict] = []

class StreamEvent(BaseModel):
    event: str      # "token", "citation", "done", "error"
    data: str       # JSON 문자열

async def stream_agent(request: RunRequest):
    """에이전트 응답을 토큰 단위로 스트리밍한다"""
    try:
        async for token in agent.astream(request.text):
            event = StreamEvent(event="token", data=token)
            yield f"event: token\ndata: {event.model_dump_json()}\n\n"

        yield f"event: done\ndata: {{}}\n\n"

    except Exception as e:
        error_data = {"message": str(e)}
        yield f"event: error\ndata: {error_data}\n\n"

@app.post("/agents/qna_chatbot/stream")
def stream_endpoint(request: RunRequest):
    return StreamingResponse(
        stream_agent(request),
        media_type="text/event-stream",
    )

이 패턴의 핵심:

  • async for로 LLM의 토큰을 하나씩 받는다
  • 각 토큰을 SSE 형식으로 즉시 yield한다
  • 완료 시 done 이벤트를 보내 클라이언트에 종료를 알린다
  • 에러 발생 시 error 이벤트로 클라이언트에 전달한다

4.3 Pydantic으로 이벤트 스키마 정의

from pydantic import BaseModel
from enum import Enum

class EventType(str, Enum):
    token = "token"
    citation = "citation"
    done = "done"
    error = "error"

class StreamEvent(BaseModel):
    event: EventType
    data: str

class Citation(BaseModel):
    source: str
    page: int | None = None

class DonePayload(BaseModel):
    run_id: str
    latency_ms: int
    input_tokens: int = 0
    output_tokens: int = 0

이벤트 스키마를 Pydantic으로 정의하면 서버와 클라이언트 간 계약이 명확해진다.

5 프론트엔드 구현

읽기 가이드

이 섹션은 JavaScript와 브라우저 API를 사용한다. 프론트엔드를 아직 배우지 않았다면 위의 FastAPI 서버 구현까지만 읽고, React 기초 학습 후 돌아와도 된다. 서버 측 SSE 구현만으로도 curl이나 Postman으로 스트리밍 응답을 확인할 수 있다.

5.1 EventSource API

브라우저에 내장된 EventSource는 SSE를 위한 전용 API이다. 자동 재연결, 이벤트 타입별 리스너를 지원한다.

const eventSource = new EventSource("/stream");

// 기본 메시지 수신
eventSource.onmessage = (event) => {
  console.log(event.data);
};

// 커스텀 이벤트 수신
eventSource.addEventListener("token", (event) => {
  const data = JSON.parse(event.data);
  appendToken(data);
});

eventSource.addEventListener("done", (event) => {
  eventSource.close();
});

// 에러 처리
eventSource.onerror = (event) => {
  console.error("SSE 연결 에러");
  eventSource.close();
};

그러나 EventSourceGET 요청만 지원하고, 요청 헤더나 본문을 설정할 수 없다. AI Agent API처럼 POST로 JSON 본문을 보내야 하는 경우에는 fetch를 사용한다.

5.2 fetch로 SSE 수신

async function streamChat(text: string): Promise<void> {
  const response = await fetch("/agents/qna_chatbot/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ text }),
  });

  if (!response.ok || !response.body) {
    throw new Error(`HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n\n");
    buffer = lines.pop() || "";

    for (const block of lines) {
      if (!block.trim()) continue;

      const eventMatch = block.match(/^event:\s*(.+)$/m);
      const dataMatch = block.match(/^data:\s*(.+)$/m);

      if (!dataMatch) continue;

      const eventType = eventMatch?.[1] || "message";
      const data = dataMatch[1];

      switch (eventType) {
        case "token":
          appendToChat(JSON.parse(data));
          break;
        case "done":
          finishChat();
          break;
        case "error":
          showError(JSON.parse(data));
          break;
      }
    }
  }
}

이 구현의 주요 포인트:

  • response.body.getReader()로 ReadableStream을 청크 단위로 읽는다
  • buffer에 불완전한 메시지를 누적하여 \n\n 기준으로 분리한다
  • 이벤트 타입별로 다른 핸들러를 호출한다

5.3 React에서의 사용

import { useState, useCallback } from "react";

function ChatComponent() {
  const [messages, setMessages] = useState<string[]>([]);
  const [currentResponse, setCurrentResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (text: string) => {
    setIsStreaming(true);
    setCurrentResponse("");

    const response = await fetch("/agents/qna_chatbot/stream", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ text }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    let accumulated = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const blocks = buffer.split("\n\n");
      buffer = blocks.pop() || "";

      for (const block of blocks) {
        const dataMatch = block.match(/^data:\s*(.+)$/m);
        if (!dataMatch) continue;

        const parsed = JSON.parse(dataMatch[1]);
        accumulated += parsed;
        setCurrentResponse(accumulated);
      }
    }

    setMessages((prev) => [...prev, accumulated]);
    setCurrentResponse("");
    setIsStreaming(false);
  }, []);

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i}>{msg}</div>
      ))}
      {isStreaming && <div>{currentResponse}</div>}
    </div>
  );
}

setCurrentResponse가 토큰마다 호출되어 화면에 글자가 하나씩 나타나는 효과를 만든다.

6 SSE와 CORS

SSE도 HTTP 위에서 동작하므로 CORS 규칙이 동일하게 적용된다.

  • EventSourceGET 요청이므로 단순 요청에 해당한다
  • fetchPOST + JSON을 보내면 Preflight가 발생한다
  • Vite Proxy를 사용하면 CORS 자체가 발생하지 않는다

FastAPI의 CORSMiddlewareStreamingResponse에도 동일하게 적용되므로 추가 설정은 필요 없다.

7 SSE 연결 관리

7.1 클라이언트 연결 해제 감지

사용자가 페이지를 닫거나 네비게이션하면 SSE 연결이 끊어진다. 서버에서 이를 감지하지 않으면 불필요한 LLM 호출이 계속된다.

from starlette.requests import Request

@app.post("/agents/qna_chatbot/stream")
async def stream_endpoint(request: RunRequest, raw_request: Request):
    async def generate():
        async for token in agent.astream(request.text):
            if await raw_request.is_disconnected():
                break
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

request.is_disconnected()로 연결 상태를 확인하여 불필요한 처리를 중단한다.

7.2 타임아웃

LLM 호출이 너무 오래 걸리면 연결을 종료해야 한다.

import asyncio

async def stream_with_timeout(request: RunRequest):
    try:
        async for token in asyncio.timeout(60)(agent.astream(request.text)):
            yield f"event: token\ndata: {token}\n\n"
        yield "event: done\ndata: {}\n\n"
    except asyncio.TimeoutError:
        yield 'event: error\ndata: {"message": "응답 시간 초과"}\n\n'

8 관련 주제

선행 지식

후속 주제

다른 카테고리 연결

Subscribe

Enjoy this blog? Get notified of new posts by email: