Creating Your First SSE Stream
In this tutorial, you will build a real-time streaming API using Server-Sent Events (SSE). You will learn how to stream data updates, customize event metadata, and handle client reconnections using the specialized tools provided in this codebase.
Prerequisites
To follow this tutorial, you need a basic FastAPI application structure. Ensure you have the following imports available from the `fastapi.sse` module:

- `EventSourceResponse`: The response class that handles the `text/event-stream` protocol.
- `ServerSentEvent`: The Pydantic model used to define individual event frames.
Step 1: Create a Basic Stream
The simplest way to implement SSE is to use `EventSourceResponse` as your `response_class` and yield data from an asynchronous generator.
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item
```
When you use `response_class=EventSourceResponse`, FastAPI automatically wraps every yielded object in an SSE `data:` field. Because `Item` is a Pydantic model, it is automatically serialized to JSON. On the wire, the client receives:
```
data: {"name":"Plumbus","price":32.99}

data: {"name":"Portal Gun","price":999.99}
```
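Independent of FastAPI, the framing itself is simple to reason about: each yielded object becomes a `data:` line, and a blank line terminates the event. A minimal sketch of that serialization using only the standard library (the `sse_frame` helper is hypothetical, for illustration only):

```python
import json


def sse_frame(payload: dict) -> str:
    # Hypothetical helper: serialize a payload as a single SSE "data:" frame.
    # Compact separators match typical JSON-over-the-wire output.
    body = json.dumps(payload, separators=(",", ":"))
    # A blank line (two newlines) marks the end of the event.
    return f"data: {body}\n\n"


print(sse_frame({"name": "Plumbus", "price": 32.99}))
# data: {"name":"Plumbus","price":32.99}
```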
Step 2: Customize Events with Metadata
To give the client more control, such as specific event types for `addEventListener` or reconnection IDs, you can yield `ServerSentEvent` objects instead of raw models.
```python
from fastapi.sse import EventSourceResponse, ServerSentEvent


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    # Send a comment for keep-alive or debugging
    yield ServerSentEvent(comment="stream of item updates")

    for i, item in enumerate(items):
        yield ServerSentEvent(
            data=item,
            event="item_update",
            id=str(i + 1),
            retry=5000,
        )
```
In this step, you are using several key fields of the `ServerSentEvent` class:

- `event`: Sets the event type. The browser can listen specifically for this using `eventSource.addEventListener("item_update", ...)`.
- `id`: Assigns a unique ID to the event. If the connection drops, the browser will send this ID back in the `Last-Event-ID` header.
- `retry`: Tells the browser to wait 5000 milliseconds (5 seconds) before attempting to reconnect if the stream is interrupted.
- `comment`: Sends a line starting with `:`, which is ignored by the browser but keeps the connection active.
Step 3: Stream Raw Text and Logs
By default, the `data` field in `ServerSentEvent` is always JSON-serialized. If you want to send pre-formatted strings (like log lines or CSV) without extra quotes, use the `raw_data` field.
```python
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO Application started",
        "2025-01-01 WARN High memory usage detected",
    ]
    for log_line in logs:
        # raw_data is sent exactly as-is, without JSON encoding
        yield ServerSentEvent(raw_data=log_line)
```
Note: `data` and `raw_data` are mutually exclusive. If you try to set both on the same `ServerSentEvent`, the model will raise a `ValueError`.
Step 4: Handle Reconnections
When a client reconnects after a failure, it sends the last received ID. You can capture this using the `Last-Event-ID` header to resume the stream from where it left off.
```python
from typing import Annotated

from fastapi import Header


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items_resumable(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    # Determine where to start based on the client's last received ID
    start_index = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start_index:
            continue
        yield ServerSentEvent(data=item, id=str(i))
```
By reading `last_event_id` from the request headers, you ensure that the client doesn't miss updates or receive duplicates during intermittent network issues.
Keep-Alive and Timeouts
This implementation includes a built-in keep-alive mechanism. If your generator is idle (e.g., waiting for a database change), the `EventSourceResponse` will automatically send a `: ping` comment every 15 seconds. This prevents proxies and load balancers from closing the connection due to inactivity.
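This keep-alive behavior can be reproduced in plain asyncio by racing the generator's next item against a timer. A hedged sketch of the technique (the `with_keep_alive` wrapper is illustrative, not the library's implementation; the interval is shortened for demonstration):

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def with_keep_alive(
    source: AsyncIterable[str], interval: float = 15.0
) -> AsyncIterator[str]:
    # Illustrative: emit ": ping" whenever the source is idle for `interval`.
    iterator = source.__aiter__()
    while True:
        next_item = asyncio.ensure_future(iterator.__anext__())
        while not next_item.done():
            done, _ = await asyncio.wait({next_item}, timeout=interval)
            if not done:
                yield ": ping\n\n"
        try:
            yield next_item.result()
        except StopAsyncIteration:
            return


async def demo() -> list[str]:
    async def slow_source() -> AsyncIterator[str]:
        await asyncio.sleep(0.05)  # simulate waiting on a database change
        yield "data: update\n\n"

    return [frame async for frame in with_keep_alive(slow_source(), interval=0.01)]


frames = asyncio.run(demo())
# Pings were sent while the source was idle, followed by the real event.
```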
Summary of Results
You now have a robust SSE implementation that:
- Streams JSON-serialized Pydantic models automatically.
- Supports custom event types and reconnection logic.
- Handles raw text streaming for logs.
- Maintains active connections via automatic pings.
For your next steps, consider integrating this with a database listener or a message queue to stream real-time updates from your backend services.