Creating Your First SSE Stream

In this tutorial, you will build a real-time streaming API using Server-Sent Events (SSE). You will learn how to stream data updates, customize event metadata, and handle client reconnections using the EventSourceResponse and ServerSentEvent classes from the fastapi.sse module.

Prerequisites

To follow this tutorial, you need a basic FastAPI application structure. Ensure you have the following imports available from the fastapi.sse module:

  • EventSourceResponse: The response class that handles the text/event-stream protocol.
  • ServerSentEvent: The Pydantic model used to define individual event frames.

Step 1: Create a Basic Stream

The simplest way to implement SSE is to use EventSourceResponse as your response_class and yield data from an asynchronous generator.

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float

items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
]

@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

When you use response_class=EventSourceResponse, FastAPI automatically wraps every yielded object in an SSE data: field. Because Item is a Pydantic model, it is automatically serialized to JSON. On the wire, the client receives:

data: {"name":"Plumbus","price":32.99}

data: {"name":"Portal Gun","price":999.99}
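To make the framing concrete, the following stdlib-only sketch reproduces the wire output above without FastAPI. The frame_data helper and the plain dicts standing in for Item are illustrative assumptions, not part of fastapi.sse; the compact JSON separators are chosen only to match the output shown above.

```python
import json


def frame_data(payload: dict) -> str:
    """Hypothetical helper: encode one payload as an SSE frame with a single data: line."""
    # Compact separators match the JSON shown in the tutorial output.
    body = json.dumps(payload, separators=(",", ":"))
    # The blank line (second newline) terminates the event.
    return f"data: {body}\n\n"


# Plain dicts stand in for the Item models from Step 1.
items = [
    {"name": "Plumbus", "price": 32.99},
    {"name": "Portal Gun", "price": 999.99},
]

stream = "".join(frame_data(item) for item in items)
print(stream, end="")
```

Each event ends with a blank line; a client does not dispatch an event until it sees that terminator.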

Step 2: Customize Events with Metadata

To give the client more control, such as specific event types for addEventListener or IDs for reconnection, you can yield ServerSentEvent objects instead of raw models.

from fastapi.sse import EventSourceResponse, ServerSentEvent

# Same path as Step 1, so this handler replaces sse_items
# (FastAPI serves the first route registered for a path).
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    # Send a comment for keep-alive or debugging
    yield ServerSentEvent(comment="stream of item updates")

    for i, item in enumerate(items):
        yield ServerSentEvent(
            data=item,
            event="item_update",
            id=str(i + 1),
            retry=5000,
        )

In this step, you are using several key fields of the ServerSentEvent class:

  • event: Sets the event type. The browser can listen specifically for this using eventSource.addEventListener("item_update", ...).
  • id: Assigns a unique ID to the event. If the connection drops, the browser sends the ID of the last event it received in the Last-Event-ID header when it reconnects.
  • retry: Tells the browser to wait 5000 milliseconds (5 seconds) before attempting to reconnect if the stream is interrupted.
  • comment: Sends a line starting with :, which is ignored by the browser but keeps the connection active.
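The fields above map onto lines of the text/event-stream format. The sketch below shows that mapping with a hypothetical SketchEvent dataclass and encode function; both are stand-ins written for illustration, not the ServerSentEvent implementation, and the order of the fields within a frame is an assumption (the SSE format does not prescribe one).

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for ServerSentEvent, for illustration only."""

    data: Any = None
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None
    comment: Optional[str] = None


def encode(ev: SketchEvent) -> str:
    """Render one event as text/event-stream lines followed by a blank line."""
    lines = []
    if ev.comment is not None:
        lines.append(f": {ev.comment}")  # comment lines start with a colon
    if ev.event is not None:
        lines.append(f"event: {ev.event}")
    if ev.id is not None:
        lines.append(f"id: {ev.id}")
    if ev.retry is not None:
        lines.append(f"retry: {ev.retry}")  # reconnection delay in milliseconds
    if ev.data is not None:
        lines.append("data: " + json.dumps(ev.data, separators=(",", ":")))
    return "\n".join(lines) + "\n\n"


frame = encode(
    SketchEvent(
        data={"name": "Plumbus", "price": 32.99},
        event="item_update",
        id="1",
        retry=5000,
    )
)
print(frame, end="")
```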

Step 3: Stream Raw Text and Logs

The data field in ServerSentEvent is always JSON-serialized, so a plain string arrives wrapped in double quotes. If you want to send pre-formatted strings (like log lines or CSV) without the extra quotes, use the raw_data field instead.

@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO Application started",
        "2025-01-01 WARN High memory usage detected",
    ]
    for log_line in logs:
        # raw_data is sent exactly as-is without JSON encoding
        yield ServerSentEvent(raw_data=log_line)

Note: data and raw_data are mutually exclusive. If you try to set both on the same ServerSentEvent, the model will raise a ValueError.
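The difference between the two fields is easiest to see on the wire. This stdlib-only sketch compares a JSON-encoded string with the same string sent as-is. The frame_raw helper is hypothetical; it splits multi-line text into one data: line per line, which is what the SSE format requires of any server, and it makes no claim about how fastapi.sse handles that case internally.

```python
import json

log_line = "2025-01-01 INFO Application started"

# JSON encoding wraps the string in double quotes.
as_json = f"data: {json.dumps(log_line)}\n\n"

# Sending the text as-is keeps it free of quotes.
as_raw = f"data: {log_line}\n\n"


def frame_raw(text: str) -> str:
    """Hypothetical helper: emit one data: line per line of text, then a blank line."""
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


print(as_json, end="")
print(as_raw, end="")
print(frame_raw("first line\nsecond line"), end="")
```

A client that receives several data: lines in one event joins them with newlines, so multi-line payloads survive the round trip.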

Step 4: Handle Reconnections

When a client reconnects after a failure, it sends the last received ID. You can capture this using the Last-Event-ID header to resume the stream from where it left off.

from typing import Annotated
from fastapi import Header

# Same path as the earlier steps, so this handler replaces them
# (FastAPI serves the first route registered for a path).
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items_resumable(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    # Event IDs here are zero-based list indexes, so resume at the next index
    start_index = last_event_id + 1 if last_event_id is not None else 0

    for i, item in enumerate(items):
        if i < start_index:
            continue
        yield ServerSentEvent(data=item, id=str(i))

Because the handler resumes at the event after last_event_id, a reconnecting client neither misses updates nor receives duplicates, provided each event ID maps to a stable position in the stream.
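The resume logic can be checked without a network by simulating a dropped connection. In this sketch, events_after mirrors the start-index arithmetic of the handler above, and consume plays the role of a client that remembers the last ID it saw. Both function names and the sample data are illustrative assumptions.

```python
from typing import Iterator, Optional

ITEMS = ["a", "b", "c", "d"]  # illustrative payloads; IDs are list indexes


def events_after(last_event_id: Optional[int]) -> Iterator[tuple[int, str]]:
    """Yield (id, item) pairs, starting just after the client's last received ID."""
    start = last_event_id + 1 if last_event_id is not None else 0
    for i in range(start, len(ITEMS)):
        yield i, ITEMS[i]


def consume(drop_after: int) -> list[str]:
    """Simulate a client that loses the connection once, then reconnects."""
    received: list[str] = []
    last_id: Optional[int] = None

    # First connection: no Last-Event-ID is sent.
    for count, (event_id, item) in enumerate(events_after(last_id), start=1):
        received.append(item)
        last_id = event_id
        if count == drop_after:
            break  # simulated network failure

    # Reconnection: the client reports the last ID it received.
    for event_id, item in events_after(last_id):
        received.append(item)
        last_id = event_id

    return received


print(consume(drop_after=2))
```

Wherever the drop happens, the client ends up with every item exactly once, because the server skips everything up to and including the reported ID.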

Keep-Alive and Timeouts

EventSourceResponse includes a built-in keep-alive mechanism. If your generator is idle (e.g., waiting for a database change), the response automatically sends a : ping comment every 15 seconds. This prevents proxies and load balancers from closing the connection due to inactivity.
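You do not need to write this yourself, but the general technique is worth seeing once. The following is a minimal sketch of one way to interleave ping comments with an idle async generator using asyncio; with_keepalive and slow_source are hypothetical names, and this is not the code EventSourceResponse uses. The short interval is only there so the demo finishes quickly.

```python
import asyncio
from collections.abc import AsyncIterator


async def with_keepalive(
    source: AsyncIterator[str], interval: float
) -> AsyncIterator[str]:
    """Yield frames from source, adding a comment frame whenever it stays idle for interval seconds."""
    iterator = source.__aiter__()
    # Keep one pending request for the next frame alive across timeouts,
    # so the source generator is never cancelled mid-wait.
    pending = asyncio.ensure_future(iterator.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": ping\n\n"  # source is idle: emit a comment frame
                continue
            try:
                frame = pending.result()
            except StopAsyncIteration:
                return  # source is exhausted
            yield frame
            pending = asyncio.ensure_future(iterator.__anext__())
    finally:
        pending.cancel()  # no-op if the task already finished


async def slow_source() -> AsyncIterator[str]:
    """Demo source that pauses longer than the keep-alive interval."""
    yield "data: first\n\n"
    await asyncio.sleep(0.35)
    yield "data: second\n\n"


async def collect() -> list[str]:
    return [frame async for frame in with_keepalive(slow_source(), interval=0.1)]


frames = asyncio.run(collect())
print(frames)
```

The sketch uses asyncio.wait with a timeout rather than asyncio.wait_for, because wait_for would cancel the pending read on timeout and, with it, the source generator.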

Summary of Results

You now have a robust SSE implementation that:

  1. Streams JSON-serialized Pydantic models automatically.
  2. Supports custom event types and reconnection logic.
  3. Handles raw text streaming for logs.
  4. Maintains active connections via automatic pings.

For your next steps, consider integrating this with a database listener or a message queue to stream real-time updates from your backend services.