WebSocket Integration
In this codebase, WebSocket integration is provided through the FastAPI application class, allowing for real-time, bidirectional communication. This is implemented by registering specialized routes that handle the WebSocket handshake and subsequent data exchange.
Defining WebSocket Endpoints
The primary way to define a WebSocket endpoint is using the @app.websocket(path) decorator. This decorator, found in fastapi/applications.py, registers a function as a WebSocket handler by calling self.add_api_websocket_route.
A basic implementation requires injecting an instance of the WebSocket class and explicitly accepting the connection:
from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # The connection must be explicitly accepted
    await websocket.accept()
    while True:
        # Receive data from the client
        data = await websocket.receive_text()
        # Send data back to the client
        await websocket.send_text(f"Message text was: {data}")
The WebSocket Class
The WebSocket class (imported from fastapi but originating from Starlette) provides the interface for interacting with the active connection. Key methods include:
accept(): Completes the WebSocket handshake. If this is not called, the server will eventually close the connection.
receive_text(), receive_bytes(), receive_json(): Methods to wait for and receive data from the client.
send_text(), send_bytes(), send_json(): Methods to push data to the client.
close(code): Closes the connection with an optional status code.
Dependency Injection in WebSockets
FastAPI supports dependency injection for WebSocket endpoints similarly to standard HTTP routes. However, a critical distinction in this codebase is that dependencies are resolved once at the start of the connection. They do not re-run for every message received over the socket.
As seen in tests/test_ws_dependencies.py, dependencies can be defined at the application level, router level, or directly in the decorator:
import json

from fastapi import Depends, WebSocket


# Assumed helper (not shown in this excerpt): provides the list that the
# dependencies of one connection share
def get_list() -> list[str]:
    return []


def create_dependency(name: str):
    def fun(deps: list[str] = Depends(get_list)):
        deps.append(name)

    return Depends(fun)


@app.websocket("/", dependencies=[create_dependency("index")])
async def index(websocket: WebSocket, deps: list[str] = Depends(get_list)):
    await websocket.accept()
    # 'deps' contains values resolved during the initial handshake
    await websocket.send_text(json.dumps(deps))
    await websocket.close()
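If parameter validation fails while the endpoint's parameters and dependencies are being resolved during the handshake (for example, a required query parameter is missing or has the wrong type), FastAPI raises a WebSocketRequestValidationError. The FastAPI class registers a default handler for this exception in fastapi/applications.py: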
self.exception_handlers.setdefault(
    WebSocketRequestValidationError,
    websocket_request_validation_exception_handler,
)
Handling Disconnections
When a client closes the connection, the receive methods will raise a WebSocketDisconnect exception. Graceful handling of these events is typically achieved using a try...except block.
This pattern is essential for cleaning up resources or notifying other parts of the system that a client has left, as demonstrated in the project's documentation examples:
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
Connection Management
For applications requiring broadcasting or tracking multiple users, a common pattern in this codebase is the use of a ConnectionManager. This is not a built-in FastAPI class but a structural pattern used to track active WebSocket instances.
The manager typically maintains a list of active connections and provides methods to iterate over them for broadcasting:
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
This approach allows the WebSocket endpoint to focus on the lifecycle of a single connection while the manager handles the aggregate state of the real-time system.
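One possible variation on the manager above makes broadcasting tolerant of connections that fail mid-send. It is a sketch rather than the project's implementation: SafeConnectionManager and FakeSocket are hypothetical names, and FakeSocket is a stand-in that mimics only the accept() and send_text() methods so that the example runs without a server.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in exposing only the two methods the manager uses."""

    def __init__(self, healthy: bool = True):
        self.healthy = healthy
        self.sent: list[str] = []

    async def accept(self) -> None:
        pass

    async def send_text(self, message: str) -> None:
        if not self.healthy:
            raise RuntimeError("connection is gone")
        self.sent.append(message)


class SafeConnectionManager:
    def __init__(self):
        self.active_connections: list = []

    async def connect(self, websocket) -> None:
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket) -> None:
        # Tolerate a double disconnect instead of raising ValueError
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str) -> None:
        # Iterate over a snapshot so removals during the loop are safe
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Assumption: any send failure means the peer is unusable
                self.disconnect(connection)


async def demo() -> tuple[list[str], int]:
    manager = SafeConnectionManager()
    alive, dead = FakeSocket(), FakeSocket(healthy=False)
    await manager.connect(dead)
    await manager.connect(alive)
    await manager.broadcast("hello")
    return alive.sent, len(manager.active_connections)
```

Running asyncio.run(demo()) delivers the message to the healthy socket and drops the failing one from the list. Whether a failed send should always evict a connection is a design choice that depends on the application.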