A coroutine is a special kind of function that can pause its own execution, hand control back to the caller, and resume right where it left off. That single idea is what makes Python's async programming model tick.
If you have spent any time reading Python documentation or browsing Stack Overflow, you have probably seen async and await pop up without much explanation. Those two keywords are the entry point to coroutines. They were formalized in Python 3.5 through PEP 492, which proposed making coroutines a standalone concept with dedicated syntax. The stated goal was to create an approachable mental model for async programming that reads almost like synchronous code. Understanding that model properly — not just copying the syntax — will change how you think about writing programs that do more than one thing at a time.
Functions vs. Coroutines
A regular Python function runs from top to bottom and returns a value. While it is running, everything else waits. That model is perfectly fine when your code is actively computing, but it falls apart the moment your program needs to wait on something slow — a network request, a file read, a database query — without freezing the entire process.
A coroutine solves this by introducing a suspension point. Instead of blocking the thread while waiting, the coroutine pauses itself and says "come back to me when there is something worth waking up for." Other work can happen in the meantime.
There is a meaningful distinction here worth internalizing. A regular function has a single entry point and a single exit point. A coroutine has a single entry point but multiple suspension points. Each await expression is a place where execution can pause, other coroutines can run, and eventually the original coroutine picks up exactly where it stopped — with all of its local variables intact.
Coroutines are concurrent, not parallel. They run on a single thread. The event loop switches between them when one is paused, so you are never actually executing two coroutines at the exact same instant. This is cooperative multitasking — coroutines voluntarily yield control, which eliminates the race conditions that plague threaded code.
Think of a chef who puts a pot of water on to boil, then chops vegetables while they wait. They are not doing both actions simultaneously — they are switching between tasks during idle time. That is exactly what coroutines let your program do. But there is a critical detail the cooking analogy usually leaves out: the chef has to actively decide to switch tasks. If they stand at the stove staring at the water, nothing else happens. That is exactly what happens when you accidentally put blocking code inside a coroutine — the event loop freezes because no one told it to switch.
Defining and Calling a Coroutine
Defining a coroutine looks almost identical to defining a regular function, with one addition: the async keyword placed before def.
async def greet(name):
    return f"Hello, {name}!"
Here is where a lot of new learners get tripped up. Calling a coroutine function does not execute it immediately. It returns a coroutine object — a suspended task waiting to be run. If you call it without await or without scheduling it, Python will raise a RuntimeWarning telling you the coroutine was never awaited.
# This does NOT print anything — it creates a coroutine object
result = greet("Kandi")
print(result)
# Output: <coroutine object greet at 0x...>
# RuntimeWarning: coroutine 'greet' was never awaited
To actually run the coroutine, you need an event loop. The simplest way is asyncio.run(), which was introduced in Python 3.7 and is the recommended entry point for async programs.
import asyncio

async def greet(name):
    return f"Hello, {name}!"

result = asyncio.run(greet("Kandi"))
print(result)
# Output: Hello, Kandi!
What asyncio.run() does under the hood is create a brand-new event loop, run the provided coroutine to completion, close the loop, and return the result. It is designed to be called exactly once, as the top-level entry point of your program: calling it again from inside running async code raises a RuntimeError because a loop is already active. If you need finer control, such as running several async functions across separate calls on one loop, use asyncio.Runner (Python 3.11+) as a context manager.
How await Works
The await keyword is where the suspension actually happens. When a coroutine hits an await expression, it pauses and hands control back to the event loop. The event loop can then run other coroutines until the awaited thing finishes.
You can only use await inside an async def function, and you can only await something that is "awaitable" — that means another coroutine, a Task, or a Future. The Python documentation groups these under a single term: awaitable objects.
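A small sketch can make the awaitable kinds concrete; the names double and main are illustrative:

```python
import asyncio

async def double(x):
    await asyncio.sleep(0)  # a trivial suspension point
    return x * 2

async def main():
    # 1. Awaiting a coroutine object runs it to completion here.
    a = await double(2)

    # 2. Wrapping a coroutine in a Task schedules it on the event
    #    loop; the Task is itself awaitable.
    task = asyncio.create_task(double(3))
    b = await task

    # 3. A Task is a subclass of Future, the third awaitable kind.
    return a, b

print(asyncio.run(main()))  # -> (4, 6)
```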
import asyncio

async def fetch_data():
    print("Starting fetch...")
    await asyncio.sleep(2)  # simulates a slow I/O operation
    print("Data received!")
    return "some data"

asyncio.run(fetch_data())
asyncio.sleep() is the async version of time.sleep(). The important difference is that time.sleep() blocks the entire thread, while asyncio.sleep() simply suspends the current coroutine and lets the event loop do other work during that time. This is the foundational pattern of all async programming: replace blocking waits with awaitable suspensions.
It helps to think about what the event loop is actually doing during an await. The coroutine says "I need this result before I can continue." The event loop takes note, shelves that coroutine, and checks whether any other coroutines are ready to run. When the awaited operation completes — maybe a network response arrived, or a timer expired — the event loop puts the original coroutine back on its schedule. No threads were created. No locks were needed.
Never use regular blocking calls like time.sleep(), requests.get(), or synchronous file reads inside a coroutine without wrapping them properly. A blocking call inside a coroutine blocks the entire event loop, defeating the purpose of using async at all. The Python documentation explicitly warns that CPU-bound or blocking code should not be called directly in coroutines (asyncio — Developing with asyncio).
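The difference is easy to measure. The sketch below runs three waits concurrently, once with the blocking time.sleep() and once with asyncio.sleep(); the helper names are illustrative:

```python
import asyncio
import time

async def bad_wait():
    time.sleep(0.2)  # blocks the whole event loop

async def good_wait():
    await asyncio.sleep(0.2)  # suspends only this coroutine

async def run_three(factory):
    start = time.monotonic()
    await asyncio.gather(factory(), factory(), factory())
    return time.monotonic() - start

blocking = asyncio.run(run_three(bad_wait))       # ~0.6s: sleeps serialize
non_blocking = asyncio.run(run_three(good_wait))  # ~0.2s: sleeps overlap
print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
```

With the blocking call, the event loop never gets a chance to switch tasks, so the three sleeps add up; with the awaitable version they overlap.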
Running Multiple Coroutines Together
The real power of coroutines shows up when you run several of them at the same time. asyncio.gather() takes multiple coroutines and runs them concurrently, collecting their results once all of them finish.
import asyncio

async def task(name, delay):
    print(f"{name}: starting")
    await asyncio.sleep(delay)
    print(f"{name}: done after {delay}s")
    return name

async def main():
    results = await asyncio.gather(
        task("Alpha", 3),
        task("Beta", 1),
        task("Gamma", 2),
    )
    print(f"All done: {results}")

asyncio.run(main())
If you ran those three tasks sequentially, you would wait 3 + 1 + 2 = 6 seconds total. With asyncio.gather(), the total wait is only as long as the slowest task — 3 seconds. All three coroutines are alive at the same time; the event loop switches between them as each one pauses.
The output will look something like this:
Alpha: starting
Beta: starting
Gamma: starting
Beta: done after 1s
Gamma: done after 2s
Alpha: done after 3s
All done: ['Alpha', 'Beta', 'Gamma']
asyncio.gather() preserves the order of results, matching the order of the coroutines you passed in — regardless of which one finished first. However, gather() has a design limitation: if one task raises an exception, the other tasks keep running by default. You can pass return_exceptions=True to collect exceptions as results, but that still does not cancel the remaining work. For safer behavior, see TaskGroup below.
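To make that exception behavior concrete, here is a small sketch using return_exceptions=True; the coroutine names ok and boom are illustrative:

```python
import asyncio

async def ok(value):
    return value

async def boom():
    raise ValueError("failed")

async def main():
    # return_exceptions=True delivers exceptions as ordinary results
    # instead of raising the first one into the caller.
    return await asyncio.gather(ok(1), boom(), ok(3),
                                return_exceptions=True)

results = asyncio.run(main())
for r in results:
    if isinstance(r, Exception):
        print(f"error: {r}")
    else:
        print(f"value: {r}")
```

The result list keeps positional order, so you can match each outcome back to the coroutine that produced it.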
Structured Concurrency with TaskGroup
Python 3.11 introduced asyncio.TaskGroup, which the official documentation now recommends over gather() for scheduling concurrent work. The reason is safety: if any task in a TaskGroup raises an exception, all remaining tasks in the group are automatically cancelled before the error propagates.
import asyncio

async def task(name, delay):
    print(f"{name}: starting")
    await asyncio.sleep(delay)
    print(f"{name}: done after {delay}s")
    return name

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task("Alpha", 3))
        t2 = tg.create_task(task("Beta", 1))
        t3 = tg.create_task(task("Gamma", 2))
    # All tasks guaranteed complete here
    print(f"Results: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(main())
The async with block waits for every task to finish before continuing. If any task fails, the remaining tasks are cancelled and an ExceptionGroup is raised containing all the errors. This pattern is called structured concurrency — the idea that concurrent tasks should have clearly defined lifetimes tied to a scope, rather than floating around independently.
Structured concurrency matters because it prevents a class of bugs that are invisible in simple examples but devastating in production: orphaned tasks that keep running after an error, tasks whose exceptions are silently swallowed, and resource leaks from coroutines that never clean up. TaskGroup eliminates all three.
The official asyncio documentation describes TaskGroup as providing "stronger safety guarantees than gather" for nested subtasks. — Python Docs, Coroutines and Tasks
If you need to support Python 3.9 or 3.10, the taskgroup package on PyPI provides a backport of this API. Guard the import with a version check so the standard-library version is used where available: on sys.version_info >= (3, 11), import TaskGroup from asyncio; otherwise fall back to the backport.
Handling Errors in Async Code
Error handling in async code works with standard try/except blocks, but there are important differences in how exceptions propagate depending on whether you use gather() or TaskGroup.
With gather(), if one coroutine raises an exception and return_exceptions is False (the default), the exception is raised to the caller but the other tasks keep running in the background. That is often not what you want.
import asyncio

async def risky_task(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} encountered an error")
    return f"{name} succeeded"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task("Safe"))
            tg.create_task(risky_task("Dangerous", should_fail=True))
    except* ValueError as eg:
        for err in eg.exceptions:
            print(f"Caught: {err}")

asyncio.run(main())
Notice the except* syntax. Introduced in Python 3.11 alongside ExceptionGroup (PEP 654), this is how you catch exceptions from an ExceptionGroup, which is what TaskGroup raises when one or more tasks fail. Each individual exception is accessible through the group's .exceptions attribute. On Python 3.10 and earlier, both except* and the built-in ExceptionGroup are unavailable; the exceptiongroup backport package on PyPI provides the class, which you can catch with a regular except and iterate through manually.
A second, less obvious pitfall: if you create a task with asyncio.create_task() but never await it and it raises an exception, the error is silently lost. Python logs a warning — "Task exception was never retrieved" — but your program keeps running with corrupted state. Always await your tasks or handle their results.
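One defensive pattern is to keep strong references to background tasks and retrieve their exceptions in a done-callback. A sketch under illustrative names (spawn, failures, and flaky are not a standard API):

```python
import asyncio

background_tasks = set()
failures = []

def spawn(coro, name):
    # Keep a strong reference so the task cannot be garbage-collected
    # mid-flight, and record its exception instead of losing it.
    task = asyncio.create_task(coro, name=name)
    background_tasks.add(task)

    def _finished(t):
        background_tasks.discard(t)
        if not t.cancelled() and t.exception() is not None:
            failures.append((t.get_name(), t.exception()))

    task.add_done_callback(_finished)
    return task

async def flaky():
    raise RuntimeError("oops")

async def main():
    spawn(flaky(), name="flaky-job")
    await asyncio.sleep(0)  # let the task run and fail
    await asyncio.sleep(0)  # let the done-callback fire

asyncio.run(main())
print(failures)
```

Because the callback calls t.exception(), the error counts as retrieved and you decide how to surface it, rather than finding a warning in the logs after garbage collection.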
Wrapping Blocking Code with asyncio.to_thread
Real-world programs rarely live in a pure async environment. You will inevitably encounter libraries that are synchronous only — requests, database drivers without async support, file I/O operations. Calling these directly in a coroutine blocks the event loop and freezes every other concurrent task.
Python 3.9 introduced asyncio.to_thread() as a clean solution. It runs a synchronous function in a separate thread while keeping the event loop responsive.
import asyncio
import time

def slow_database_query():
    # Simulates a blocking database call
    time.sleep(2)
    return "query results"

async def main():
    # Run the blocking call in a thread, keep the loop free
    result = await asyncio.to_thread(slow_database_query)
    print(result)

asyncio.run(main())
Under the hood, to_thread() uses the event loop's default ThreadPoolExecutor. For CPU-bound work where the GIL would prevent real parallelism, you need to go further: use loop.run_in_executor() with a ProcessPoolExecutor to run work in a completely separate process.
import asyncio
import concurrent.futures

def cpu_heavy_work(n):
    # Simulates CPU-bound computation
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_work, 10_000_000)
    print(f"Result: {result}")

asyncio.run(main())
This distinction matters more than many tutorials acknowledge. asyncio.to_thread() solves the blocking-I/O-in-a-coroutine problem. ProcessPoolExecutor solves the CPU-bound-in-a-coroutine problem. They are different tools for different bottlenecks, and picking the wrong one gives you the worst of both worlds.
Async Context Managers and Async Iteration
Coroutines are not limited to plain function calls. Python's async with and async for extend the coroutine model to resource management and iteration — two patterns that show up constantly in I/O-heavy code.
Async Context Managers
An async context manager works just like a regular with statement, except it can await during setup and teardown. This is essential for resources that require an asynchronous handshake to acquire or release — database connections, HTTP sessions, WebSocket connections.
import asyncio

class AsyncConnection:
    async def __aenter__(self):
        print("Opening connection...")
        await asyncio.sleep(0.5)  # simulate handshake
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing connection...")
        await asyncio.sleep(0.2)  # simulate graceful close

    async def query(self, sql):
        await asyncio.sleep(0.3)
        return f"Results for: {sql}"

async def main():
    async with AsyncConnection() as conn:
        result = await conn.query("SELECT * FROM users")
        print(result)

asyncio.run(main())
The __aexit__ method is guaranteed to run even if an exception occurs inside the block, just like a regular context manager. This makes async with the correct pattern for any resource that must be explicitly released.
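If writing a full class feels heavyweight, contextlib.asynccontextmanager builds an async context manager from an async generator. A minimal sketch, with an events list used only to make the ordering visible (open_connection is an illustrative name):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

# asynccontextmanager turns an async generator into an async context
# manager, avoiding a full __aenter__/__aexit__ class.
@asynccontextmanager
async def open_connection():
    events.append("open")
    await asyncio.sleep(0.01)  # simulate async handshake
    try:
        yield "connection"
    finally:
        events.append("close")  # runs even if the block raises
        await asyncio.sleep(0.01)

async def main():
    async with open_connection() as conn:
        events.append(f"use {conn}")

asyncio.run(main())
print(events)
```

The try/finally around the yield plays the role of __aexit__, so teardown still runs when the body raises.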
Async Iteration
The async for statement lets you loop over data that arrives incrementally — streaming API responses, paginated database results, or live event feeds. The underlying async iteration protocol (__aiter__ and __anext__) was introduced in PEP 492, and PEP 525 extended it to support asynchronous generators in Python 3.6.
import asyncio

async def countdown(n):
    for i in range(n, 0, -1):
        yield i
        await asyncio.sleep(0.5)

async def main():
    async for count in countdown(5):
        print(f"T-minus {count}")

asyncio.run(main())
This is an asynchronous generator. It uses both yield and await — yielding values to the consumer while suspending between yields to wait for I/O. Async generators are a powerful tool for building streaming data pipelines without loading entire datasets into memory.
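Async generators also pair with async comprehensions (PEP 530, Python 3.6+), which consume an async iterable in a single expression. A small sketch reusing a countdown generator:

```python
import asyncio

async def countdown(n):
    for i in range(n, 0, -1):
        yield i
        await asyncio.sleep(0.01)

async def main():
    # An async comprehension drains the async generator into a list,
    # suspending at each iteration just like an async for loop.
    return [value async for value in countdown(3)]

print(asyncio.run(main()))  # -> [3, 2, 1]
```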
When to Use Coroutines
Coroutines shine in I/O-bound scenarios — situations where your program spends time waiting rather than computing. Good candidates include:
- Making multiple HTTP requests at once (using libraries like aiohttp or httpx)
- Querying databases without blocking your web server (via asyncpg, aiomysql, or SQLAlchemy's async engine)
- Reading and writing files asynchronously with aiofiles (though note that aiofiles uses a thread pool internally, since the OS does not support truly non-blocking filesystem I/O)
- Handling many simultaneous WebSocket connections in real-time applications
- Building web APIs with async frameworks like FastAPI, which handles routing and request dispatch through coroutines natively
Coroutines are not the right tool when your code is CPU-bound — meaning it spends its time doing heavy computation like image processing, cryptography, or machine learning inference. In those situations, the event loop cannot help because the processor is busy, not idle. Here is the decision tree:
- I/O-bound with an async library available: Use coroutines directly with async/await.
- I/O-bound with a sync-only library: Use asyncio.to_thread() to wrap the blocking call.
- CPU-bound work: Use loop.run_in_executor() with a ProcessPoolExecutor, or use Python's multiprocessing module directly.
- CPU-bound with free-threaded Python: Python 3.13 introduced an experimental build without the GIL, and Python 3.14 promoted it to officially supported status via PEP 779. The free-threaded build enables real parallelism in threads, but it is still optional and not the default build, and ecosystem support is still catching up.
PEP 492 aimed to establish an approachable mental model for async that stays close to synchronous style. — PEP 492, Coroutines with async and await syntax
Debugging Coroutines
Async bugs are notoriously hard to track down because execution order is non-linear. A coroutine might pause, let five other coroutines run, and then resume — and the stack trace does not capture any of that history. Here are concrete strategies that go beyond "add more print statements."
Enable asyncio debug mode. Set the environment variable PYTHONASYNCIODEBUG=1 or pass debug=True to asyncio.run(). This logs slow callbacks, warns about unawaited coroutines, and shows where tasks were created. The overhead is significant, so only use it during development.
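A quick way to confirm debug mode is active is the running loop's get_debug() flag; the coroutine name below is illustrative:

```python
import asyncio

async def report_debug():
    # get_debug() reports whether the running loop is in debug mode.
    return asyncio.get_running_loop().get_debug()

# debug=True enables slow-callback logging and unawaited-coroutine
# warnings; setting PYTHONASYNCIODEBUG=1 has the same effect.
print(asyncio.run(report_debug(), debug=True))  # -> True
```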
Use Python 3.14's introspection tools. Python 3.14 added python -m asyncio ps <PID> and python -m asyncio pstree <PID>, which let you inspect all active tasks in a running process without modifying your code. The pstree variant shows the full async call graph as a tree, making it immediately visible which tasks are waiting on which. This is invaluable for diagnosing deadlocks and slow-running tasks in production.
Name your tasks. When you create tasks with asyncio.create_task() or tg.create_task(), pass a name argument. Unnamed tasks show up in logs as "Task-1", "Task-2", which is useless when you have dozens. Named tasks make log output and the Python 3.14 introspection output far more readable.
task = asyncio.create_task(
    fetch_user_data(user_id),
    name=f"fetch-user-{user_id}",
)
Watch for silent exceptions. A task that is created but never awaited will have its exception silently discarded. Python logs a message when the task is garbage collected, but by then the damage may already be done. Make it a rule: every create_task() call must have a corresponding await or be inside a TaskGroup.
Key Takeaways
- A coroutine is a pausable function: Defined with async def, it can suspend execution at any await point and resume later without losing its state. This is cooperative multitasking — the coroutine decides when to yield.
- Calling a coroutine returns an object, not a result: You must run it through an event loop — typically via asyncio.run() — to get actual execution. Forgetting await is the single most common async bug.
- await is the suspension mechanism: It pauses the current coroutine and gives the event loop a chance to run other work while waiting.
- TaskGroup is the modern way to run concurrent tasks: Introduced in Python 3.11, it provides automatic cancellation on failure and structured concurrency guarantees that gather() lacks.
- Blocking code needs special handling: Use asyncio.to_thread() for sync I/O libraries and ProcessPoolExecutor for CPU-bound work. Never call blocking functions directly in a coroutine.
- async with and async for extend the coroutine model: They enable asynchronous resource management and streaming iteration, which are essential for database connections, HTTP sessions, and real-time data feeds.
- Debugging is a first-class concern: Name your tasks, enable debug mode during development, and use Python 3.14's introspection tools to visualize the async call graph in running processes.
Once you internalize what a coroutine actually is — a function that knows how to pause and resume — the rest of Python's async ecosystem starts making a lot more sense. The syntax is just the surface. The model underneath, cooperative multitasking on a single thread managed by an event loop, is what matters. Every await is a deliberate decision point where your code says "I am waiting, let someone else work." That level of explicit control is what makes async Python both powerful and, once the model clicks, surprisingly intuitive.