GraphQL subscriptions give your Python applications the ability to push data to clients the instant something changes on the server. Unlike queries and mutations, which follow a request-response pattern, subscriptions hold open a persistent connection and stream updates in real time. This article walks through the mechanics of how subscriptions work, how they differ from the other two GraphQL operation types, and how to implement them in Python using Strawberry GraphQL and async generators.
GraphQL defines three operation types: queries for reading data, mutations for writing data, and subscriptions for streaming data over time. While queries and mutations use a straightforward HTTP request-response cycle, subscriptions require a persistent connection between the client and the server. In the Python ecosystem, this means working with asynchronous code, WebSocket connections, and libraries that support the subscription specification. The good news is that the tooling has matured significantly, and implementing subscriptions in Python today is more straightforward than it was even a couple of years ago.
What GraphQL Subscriptions Are and Why They Exist
A GraphQL subscription is a long-lived operation that allows the server to send data to the client whenever a specific event occurs. The client does not repeatedly poll for updates. Instead, it establishes a connection, declares what events it cares about, and then waits for the server to push matching data as it becomes available.
Think of a chat application. When a user opens a conversation, the client needs the existing messages (a query) and the ability to send new ones (a mutation). But the client also needs to know when someone else sends a message in real time, without refreshing the page or hammering the server with repeated requests. That is precisely what a subscription handles.
The subscription operation itself looks similar to a query. Here is an example of a GraphQL subscription document that a client would send to the server:
```graphql
subscription {
  newMessage(channelId: "general") {
    id
    text
    sender
    timestamp
  }
}
```
The server receives this, registers the client's interest, and then pushes data back through the connection each time a new message appears in that channel. The client continues to receive updates until it explicitly unsubscribes or the connection drops.
Subscriptions are best suited for scenarios where data changes frequently and the client needs to know about those changes immediately. Common use cases include chat systems, live dashboards, collaborative editing, notification feeds, and real-time monitoring. For data that changes infrequently, polling with standard queries at reasonable intervals is often a simpler and more resource-efficient choice.
How the WebSocket Transport Works
Standard HTTP is a request-response protocol. The client sends a request, the server sends back a single response, and the exchange ends, even when the underlying connection is kept alive for reuse. This model does not support the ongoing stream of data that subscriptions require. WebSockets solve this by upgrading an HTTP connection into a full-duplex communication channel where both the client and the server can send messages at any time.
When a client wants to subscribe to a GraphQL event, the sequence typically looks like this: the client opens a WebSocket connection to the server, sends an initialization message (often containing authentication credentials), the server acknowledges the connection, and then the client sends its subscription query. From that point forward, the server pushes data through the open socket whenever the subscribed event fires.
There are two main WebSocket subprotocols used in the GraphQL ecosystem. The older graphql-ws protocol (sometimes called the "legacy" or "Apollo" protocol) was the first widely adopted standard, created around 2016. The newer graphql-transport-ws protocol was developed as a replacement to address security concerns and design shortcomings in the original. The newer protocol includes proper connection acknowledgment, cleaner error handling, and support for running queries and mutations over the same WebSocket connection.
When choosing a WebSocket subprotocol, prefer graphql-transport-ws for new projects. The legacy graphql-ws subprotocol (from the subscriptions-transport-ws npm package) is largely unmaintained and has known security gaps. The newer protocol, implemented by the graphql-ws npm library, is actively developed and is the recommended standard going forward.
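To make the handshake concrete, here is a minimal sketch of the JSON messages exchanged under graphql-transport-ws. The message type names (connection_init, connection_ack, subscribe, next, complete) come from that protocol; the helper function names and the sample values are hypothetical and exist only for illustration.

```python
import json


def connection_init(payload=None):
    """First client message; the payload often carries credentials."""
    message = {"type": "connection_init"}
    if payload is not None:
        message["payload"] = payload
    return json.dumps(message)


def subscribe(operation_id, query, variables=None):
    """Client message that starts one subscription on the open socket."""
    payload = {"query": query}
    if variables:
        payload["variables"] = variables
    return json.dumps({"id": operation_id, "type": "subscribe", "payload": payload})


def describe(raw):
    """Summarize a server message by its type."""
    message = json.loads(raw)
    kind = message["type"]
    if kind == "connection_ack":
        return "server accepted the connection"
    if kind == "next":
        return f"data for subscription {message['id']}: {message['payload']['data']}"
    if kind == "complete":
        return f"subscription {message['id']} finished"
    return f"unhandled message type: {kind}"
```

The id field is what lets several subscriptions share one socket: every next and complete message names the operation it belongs to.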
An alternative to WebSockets for subscription delivery is Server-Sent Events (SSE), which uses standard HTTP to stream data from server to client. SSE is simpler to set up, works naturally with HTTP/2 multiplexing, and avoids the overhead of maintaining a full-duplex connection. However, SSE only supports server-to-client communication, so the client cannot send messages back through the same channel. For GraphQL subscriptions, where the data flow is primarily server-to-client after the initial subscription request, SSE can be a practical option, though WebSockets remain the more widely supported transport in the GraphQL ecosystem.
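For comparison, an SSE stream is just text frames written to a long-lived HTTP response. The sketch below formats subscription results as such frames. The event names next and complete are an assumption modeled on the GraphQL over SSE protocol, and both function names are hypothetical; a real server would write these strings to a response with the text/event-stream content type.

```python
import json


def sse_event(event, data):
    """Format one Server-Sent Events frame: event name, data line, blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def stream_results(results):
    """Yield one frame per subscription result, then a final completion frame."""
    for result in results:
        yield sse_event("next", {"data": result})
    # An empty data line marks the end of the stream in this sketch
    yield "event: complete\ndata: \n\n"
```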
Building Subscriptions with Strawberry GraphQL
Strawberry is a modern Python library for building GraphQL APIs. It uses Python type hints and dataclasses to define schemas, which makes subscription resolvers feel natural if you are already comfortable with Python's async and await syntax. Strawberry supports subscriptions out of the box and integrates with ASGI-compatible servers like Uvicorn through frameworks such as FastAPI.
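To get started, install Strawberry with its FastAPI integration. The quotes keep shells such as zsh from interpreting the square brackets, and Uvicorn's standard extras include the WebSocket library that subscriptions rely on:
```bash
pip install "strawberry-graphql[fastapi]" "uvicorn[standard]"
```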
A subscription resolver in Strawberry is an async generator function decorated with @strawberry.subscription. The function uses yield to emit values over time, and each yielded value is sent to the connected client. Here is a basic example that counts from zero to a target number, pausing one second between each count:
```python
import asyncio
from typing import AsyncGenerator

import strawberry


@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "world"


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def count(self, target: int = 10) -> AsyncGenerator[int, None]:
        # Each yielded value is sent to the client as one message
        for i in range(target + 1):
            yield i
            await asyncio.sleep(1)
```
The return type annotation AsyncGenerator[int, None] tells Strawberry that this subscription produces integer values. The first type argument is the type of each yielded value (what gets sent to the client). The second is the type of values sent into the generator, which is None here because nothing is ever sent into a subscription resolver.
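It can help to see the async generator on its own, without any GraphQL machinery. The sketch below is a plain-asyncio stand-in for the resolver: the delay parameter and the collect function are hypothetical additions for illustration, with collect playing the role that the subscription transport plays in a real server.

```python
import asyncio
from typing import AsyncGenerator, List


async def count(target: int, delay: float = 0.0) -> AsyncGenerator[int, None]:
    """Yield 0..target, pausing between values like the resolver above."""
    for i in range(target + 1):
        yield i
        await asyncio.sleep(delay)


async def collect(target: int) -> List[int]:
    """Consume the generator the way a subscription transport would."""
    received = []
    async for value in count(target):
        received.append(value)  # a real server would send this to the client
    return received


if __name__ == "__main__":
    print(asyncio.run(collect(3)))  # [0, 1, 2, 3]
```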
Now wire up the schema and serve it through FastAPI:
```python
from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter

schema = strawberry.Schema(
    query=Query,
    subscription=Subscription,
)

app = FastAPI()
graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")
```
Run the server with uvicorn main:app --reload and navigate to http://localhost:8000/graphql in your browser. The built-in GraphiQL interface supports running subscription operations over a WebSocket connection. Enter the following subscription and press play:
```graphql
subscription {
  count(target: 5)
}
```
You will see the numbers 0 through 5 appear one per second in the response panel, each delivered as an individual message over the WebSocket connection.
A More Practical Example: Live Notifications
Counting is useful for demonstration, but a real application would subscribe to meaningful events. Here is a notification system where a subscription yields new notifications as they are published through a shared in-memory queue:
```python
import asyncio
from datetime import datetime
from typing import AsyncGenerator

import strawberry

# Shared in-memory queue connecting the mutation to the subscription
notification_queue: asyncio.Queue = asyncio.Queue()


@strawberry.type
class Notification:
    message: str
    severity: str
    created_at: str


@strawberry.type
class Query:
    @strawberry.field
    def status(self) -> str:
        return "running"


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def send_notification(
        self, message: str, severity: str = "info"
    ) -> bool:
        notification = Notification(
            message=message,
            severity=severity,
            created_at=datetime.now().isoformat(),
        )
        await notification_queue.put(notification)
        return True


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def notifications(self) -> AsyncGenerator[Notification, None]:
        # Wait for the next queued notification and push it to the client
        while True:
            notification = await notification_queue.get()
            yield notification


schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription,
)
```
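In this example, a client subscribes to notifications and receives each new Notification object as soon as a mutation places it in the queue. The while True loop keeps the subscription alive indefinitely, waiting on the async queue for the next item. When a separate client calls the sendNotification mutation, the notification flows through the queue to a waiting subscriber. Note that an asyncio.Queue hands each item to exactly one consumer, so if several subscribers share this single queue, each notification reaches only one of them. Broadcasting to every subscriber requires a separate queue per subscriber or a pub/sub layer.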
The asyncio.Queue approach shown above works for a single-process server. In production, where you may have multiple server instances behind a load balancer, you will need a shared message broker like Redis Pub/Sub, RabbitMQ, or Kafka to distribute events across all processes and ensure every subscribed client receives updates regardless of which server instance it is connected to.
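Within a single process, one way to reach every subscriber is to give each its own queue and copy every published item into all of them. The sketch below illustrates that idea with the standard library only; the Broadcaster class and its method names are hypothetical and are not part of Strawberry.

```python
import asyncio
from typing import AsyncGenerator, Set


class Broadcaster:
    """Hypothetical helper: one queue per subscriber, so each sees every item."""

    def __init__(self) -> None:
        self._queues: Set[asyncio.Queue] = set()

    async def publish(self, item: object) -> None:
        # Copy the item into every subscriber's private queue
        for queue in list(self._queues):
            await queue.put(item)

    async def subscribe(self) -> AsyncGenerator[object, None]:
        # Registration happens when the consumer first iterates
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs on cancellation or close, so dead subscribers are dropped
            self._queues.discard(queue)
```

A subscription resolver could then loop over broadcaster.subscribe() and yield each item, while the mutation calls broadcaster.publish() instead of writing to a shared queue. This still covers one process only; across several server instances the same role falls to an external broker.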
Connecting a Client and Handling Unsubscription
On the client side, gql is a widely used Python GraphQL client that supports subscriptions. It can connect to a subscription endpoint over WebSockets and iterate over incoming results asynchronously:
```python
import asyncio

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport


async def subscribe_to_notifications():
    transport = WebsocketsTransport(
        url="ws://localhost:8000/graphql",
        # Request the graphql-transport-ws subprotocol explicitly
        subprotocols=[WebsocketsTransport.GRAPHQLWS_SUBPROTOCOL],
    )

    async with Client(
        transport=transport,
        fetch_schema_from_transport=False,
    ) as session:
        subscription = gql("""
            subscription {
                notifications {
                    message
                    severity
                    createdAt
                }
            }
        """)

        # Each result is a dict keyed by the subscription field name
        async for result in session.subscribe(subscription):
            print(f"[{result['notifications']['severity']}] "
                  f"{result['notifications']['message']}")


asyncio.run(subscribe_to_notifications())
```
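Entering the async with block opens the WebSocket and performs the connection handshake. The session.subscribe call then registers the subscription and hands each parsed result to the async for loop. When you break out of the loop or the function exits, the client tells the server to stop the subscription (a complete message under graphql-transport-ws), and leaving the async with block closes the connection cleanly.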
On the server side, Strawberry handles unsubscription through Python's standard asyncio.CancelledError. When a client disconnects, the async generator's pending await is cancelled. You can wrap your subscription logic in a try/except block to perform cleanup when this happens:
```python
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def notifications(self) -> AsyncGenerator[Notification, None]:
        try:
            while True:
                notification = await notification_queue.get()
                yield notification
        except asyncio.CancelledError:
            # Client disconnected - perform cleanup here
            print("Client unsubscribed from notifications")
            raise
```
Re-raising the CancelledError after your cleanup logic is important. Swallowing it silently can prevent Strawberry and the ASGI server from properly releasing the WebSocket resources.
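The cancellation path can be observed with plain asyncio, no server required. In this sketch, ticker stands in for a subscription resolver and cancelling the consuming task stands in for a client disconnect. All names here are hypothetical, and the events list exists only to record the order in which things happen.

```python
import asyncio
from typing import AsyncGenerator, List


async def ticker(events: List[str]) -> AsyncGenerator[int, None]:
    """Stand-in for a resolver that waits on an event source."""
    try:
        n = 0
        while True:
            await asyncio.sleep(3600)  # pending await, like queue.get()
            yield n
            n += 1
    except asyncio.CancelledError:
        events.append("cleanup ran")  # release resources here
        raise  # let the cancellation propagate


async def consume(events: List[str]) -> None:
    async for _ in ticker(events):
        pass


async def main() -> List[str]:
    events: List[str] = []
    task = asyncio.create_task(consume(events))
    await asyncio.sleep(0)  # let the consumer reach its pending await
    task.cancel()           # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")
    return events
```

Because the generator re-raises, the task itself ends up cancelled, which is the signal the surrounding framework relies on to finish tearing down the connection.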
Comparing Python GraphQL Libraries for Subscriptions
Several Python libraries support GraphQL subscriptions, each with different approaches and trade-offs. Here is how they compare:
| Library | Subscription Approach | Notes |
|---|---|---|
| Strawberry GraphQL | Native async generators with type hints | Active development, supports both WebSocket subprotocols, integrates with FastAPI, Django, Flask, and Litestar |
| Graphene + Django Channels | Community modules via Django Channels | Graphene-Django does not include subscription support out of the box; requires third-party packages and an ASGI server like Daphne |
| Ariadne | Schema-first with async generators | Supports subscriptions through ASGI, uses SDL for schema definition rather than code-first type hints |
| Tartiflette | SDL-based with WebSocket endpoint | Schema-first engine, typically served through its aiohttp integration (tartiflette-aiohttp); defines subscriptions in SDL files with Python resolver functions |
Strawberry's approach stands out for its use of Python type annotations, which means your subscription resolvers are statically type-checkable and benefit from IDE autocompletion. Graphene remains popular in Django-heavy projects but requires more assembly for subscriptions, since the core library delegates that functionality to community-maintained packages. Ariadne takes a schema-first approach where you write your types in SDL and attach resolvers in Python, which some teams prefer for keeping the schema as the single source of truth. Tartiflette uses a similar SDL-based model and is usually served through its aiohttp integration, making it a good fit if your stack already uses that framework.
Key Takeaways
- Subscriptions are event-driven, not request-driven. Unlike queries and mutations that follow a single request-response cycle, subscriptions maintain an open connection and push data from the server each time a relevant event occurs.
- WebSockets are the standard transport. The graphql-transport-ws subprotocol is the recommended choice for new projects, replacing the older subscriptions-transport-ws protocol that is no longer actively maintained.
- Python async generators are the core mechanism. In Strawberry and other modern Python GraphQL libraries, a subscription resolver is an async generator that yields values over time. Each yielded value becomes a message pushed to the client.
- Production deployments need a message broker. An in-memory queue works for development and single-process servers, but distributed production environments require a shared pub/sub system like Redis to fan out events to all connected clients across multiple server instances.
- Cleanup matters. Catching asyncio.CancelledError in your subscription resolver lets you release resources and perform any necessary teardown when a client disconnects.
GraphQL subscriptions fill a specific gap in API design: delivering server-initiated, event-driven data to clients without polling. Python's async ecosystem, combined with libraries like Strawberry GraphQL, makes implementing this pattern approachable. Start with a simple counter subscription to get the wiring right, then build toward production-ready patterns with shared message brokers, authentication in connectionParams, and proper error handling. The persistent WebSocket connection is a different model from the stateless HTTP calls you may be used to, but once the infrastructure is in place, subscriptions become a powerful tool for any application that depends on real-time data.