Python asyncio.TaskGroup vs gather: Modern Patterns for Concurrent API Calls

Every asyncio application has a ghost story. A task is spawned, its parent crashes, and the task keeps running -- holding a database connection, retrying an HTTP request, mutating shared state -- with no one watching. The traceback never shows it. The log file never mentions it. You only find out when a connection pool drains, a rate limit trips, or a row in production has data that should not exist. Python 3.11 introduced asyncio.TaskGroup to make this class of bug structurally impossible. If you have been using asyncio.gather to run concurrent API calls, you now have two tools with fundamentally different philosophies about what happens when things go wrong. This article puts them side by side, walks through the internal mechanics, and maps the full decision space -- including the tools that sit between them.

What Structured Concurrency Means

In traditional asyncio code, tasks can outlive the scope that created them. You call asyncio.create_task(), and that task runs in the background with no guarantee about when it finishes or what happens if it fails. If you forget to await it, its exception is never raised in your code; at best it shows up as a "Task exception was never retrieved" log message when the task object is garbage-collected. If it runs longer than expected, resources it holds (database connections, file handles) may not be released when you think they are. If you are new to coroutines in Python, understanding how they differ from regular functions is essential context for everything that follows.

Structured concurrency fixes this by scoping every task to a well-defined block. When the block exits, all tasks inside it are guaranteed to be done -- either completed successfully, failed with an exception, or cancelled. No dangling tasks, no orphaned coroutines, no silent failures. TaskGroup is Python's standard library implementation of this pattern, inspired by Trio's nurseries and formalized in the asyncio module starting with Python 3.11.

The Python documentation makes the distinction explicit: when any task (or subtask) raises an exception, TaskGroup cancels the remaining scheduled tasks, while gather does not. This difference in safety guarantees is the core motivation for TaskGroup's existence.
-- paraphrased from the Python 3.11+ documentation, Coroutines and Tasks

The motivation behind TaskGroup runs deeper than API convenience. PEP 654, authored by Irit Katriel, asyncio maintainer Yury Selivanov, and Python creator Guido van Rossum, explains the problem it solves: the interpreter could previously propagate only one exception at a time, but concurrent code can produce multiple unrelated failures simultaneously. Libraries like Trio had been using workarounds such as MultiError, but handling multiple exceptions properly required language-level changes. The ExceptionGroup type and except* syntax were the result.

Note

asyncio.TaskGroup requires Python 3.11 or later. The except* syntax used to catch ExceptionGroup errors also requires Python 3.11+. If you need to support earlier versions, asyncio.gather and asyncio.wait are your options in the standard library.

The Same Task, Two Approaches

Here is a function that fetches three users from an API. First with gather, then with TaskGroup.

Using asyncio.gather

import asyncio
import httpx

async def fetch_user(client, user_id):
    response = await client.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    response.raise_for_status()
    return response.json()

async def main_gather():
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            fetch_user(client, 1),
            fetch_user(client, 2),
            fetch_user(client, 3),
        )
    for user in results:
        print(user["name"])

asyncio.run(main_gather())

Using asyncio.TaskGroup

import asyncio
import httpx

async def fetch_user(client, user_id):
    response = await client.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    response.raise_for_status()
    return response.json()

async def main_taskgroup():
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch_user(client, 1))
            task2 = tg.create_task(fetch_user(client, 2))
            task3 = tg.create_task(fetch_user(client, 3))

        # All tasks are guaranteed complete at this point
        for task in [task1, task2, task3]:
            print(task.result()["name"])

asyncio.run(main_taskgroup())

Both versions run three requests concurrently. Both finish in the time of the slowest request. The difference is not in the happy path -- it is in what happens when something goes wrong.

How Each Handles a Failing Task

gather: first exception propagates, other tasks keep running

With gather's default behavior (return_exceptions=False), the first exception raised by any task is immediately propagated to the caller. The other tasks are not cancelled. They continue running in the background, but their results are never collected. If those tasks have side effects, those side effects still happen.

async def succeed():
    await asyncio.sleep(2)
    print("succeed() finished")  # This still prints
    return "ok"

async def fail():
    await asyncio.sleep(0.5)
    raise ValueError("something broke")

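async def main():
    try:
        results = await asyncio.gather(succeed(), fail())
    except ValueError as e:
        print(f"Caught: {e}")
        # succeed() is still running in the background. Keep the loop alive
        # long enough to see it finish: asyncio.run() cancels any tasks
        # that are still pending when main() returns.
        await asyncio.sleep(2)

asyncio.run(main())
# Output:
# Caught: something broke
# succeed() finished    <-- printed while main() sleeps in the except block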

TaskGroup: all remaining tasks are cancelled

With TaskGroup, when one task fails, the TaskGroup cancels all remaining tasks and waits for the cancellations to complete. Only then does it raise an ExceptionGroup containing all the exceptions that occurred.

async def succeed():
    await asyncio.sleep(2)
    print("succeed() finished")  # This does NOT print
    return "ok"

async def fail():
    await asyncio.sleep(0.5)
    raise ValueError("something broke")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(succeed())
            tg.create_task(fail())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())
# Output:
# Caught: something broke
# (succeed() was cancelled -- no "finished" message)

This is the fundamental difference. With gather, tasks can outlive the error. With TaskGroup, every task is contained within the scope of the async with block. When the block exits, everything inside it is done.

Warning

TaskGroup raises an ExceptionGroup, not a single exception. Use except* (with the asterisk) to match the exceptions inside it, or catch the group itself with except ExceptionGroup. A regular except ValueError will not match an ExceptionGroup containing a ValueError.

Catching Errors with except*

The except* syntax is new in Python 3.11 (PEP 654). It handles ExceptionGroup objects by matching against the types of exceptions inside the group. You can have multiple except* clauses, each handling different exception types.

import asyncio
import httpx

async def fetch_or_fail(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    async with httpx.AsyncClient() as client:
        try:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(fetch_or_fail(client, "https://jsonplaceholder.typicode.com/users/1"))
                tg.create_task(fetch_or_fail(client, "https://httpbin.org/status/500"))
                tg.create_task(fetch_or_fail(client, "https://nonexistent.invalid/data"))
        except* httpx.HTTPStatusError as eg:
            for exc in eg.exceptions:
                print(f"HTTP error: {exc.response.status_code}")
        except* httpx.ConnectError as eg:
            for exc in eg.exceptions:
                print(f"Connection failed: {exc}")

asyncio.run(main())

Each except* clause receives a sub-group containing only the exceptions of that type. If the ExceptionGroup contains two HTTPStatusErrors and one ConnectError, the first except* block handles both HTTP errors, and the second block handles the connection error. This is a significant improvement over gather's default mode, which surfaces only the first exception and hides the rest.

Pro Tip

The .exceptions attribute on the matched group is a tuple of all the individual exceptions of that type. Iterate over it to handle each one separately, or access len(eg.exceptions) to get a count.

The Comparison Table

Behavior | asyncio.gather | asyncio.TaskGroup
Minimum Python version | 3.4 (async/await syntax from 3.5) | 3.11
On first failure (default) | Raises the first exception; other tasks keep running | Cancels all remaining tasks; raises ExceptionGroup
Multiple exceptions | Only the first is visible (others silently lost) | All exceptions collected in ExceptionGroup
Partial success mode | Yes, via return_exceptions=True | No built-in equivalent
Return values | Ordered list matching input order | Access via task.result() on individual Task objects
Task scoping | Tasks can outlive the gather call | All tasks scoped to the async with block
Dynamic task creation | No -- all awaitables passed upfront | Yes -- call tg.create_task() at any time within the block
Error handling syntax | Standard except | except* (ExceptionGroup matching)
Best for | Batch operations tolerating partial failure | All-or-nothing operations requiring clean cancellation
Built-in concurrency limit | No -- use asyncio.Semaphore | No -- use asyncio.Semaphore

When to Use gather

Use asyncio.gather when partial success is acceptable and you want results as an ordered list. The classic use case is a dashboard that pulls data from five different microservices. If one service is down, you would rather show four panels with data and one with an error message than cancel the entire page load.

async def load_dashboard(client, user_id):
    results = await asyncio.gather(
        client.get(f"/users/{user_id}"),
        client.get(f"/orders/{user_id}"),
        client.get(f"/notifications/{user_id}"),
        client.get(f"/settings/{user_id}"),
        client.get(f"/recommendations/{user_id}"),
        return_exceptions=True,
    )

    dashboard = {}
    labels = ["profile", "orders", "notifications", "settings", "recommendations"]
    for label, result in zip(labels, results):
        if isinstance(result, BaseException):  # also catches CancelledError
            dashboard[label] = {"error": str(result)}
        else:
            dashboard[label] = result.json()

    return dashboard

Also use gather when you need to support Python versions before 3.11, or when the simplicity of getting a results list in argument order matters more than structured cleanup guarantees.

When to Use TaskGroup

Use TaskGroup when all tasks must succeed for the operation to be meaningful. The classic use case is a transaction where you need to validate a payment, reserve inventory, and update the order record. If any step fails, you want the others cancelled immediately -- not left running in the background with unknown side effects.

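import asyncio
import httpx

class OrderProcessingError(Exception):
    """Raised when any step of order processing fails."""

# Helper added for this example
async def post_checked(client, path, payload):
    # client.post() does not raise on 4xx/5xx responses by itself
    response = await client.post(path, json=payload)
    response.raise_for_status()
    return response.json()

async def process_order(client, order):
    try:
        async with asyncio.TaskGroup() as tg:
            payment = tg.create_task(
                post_checked(client, "/payments/validate", order["payment"])
            )
            inventory = tg.create_task(
                post_checked(client, "/inventory/reserve", order["items"])
            )
            record = tg.create_task(
                post_checked(client, "/orders/create", order)
            )

        # All three succeeded
        return {
            "payment_id": payment.result()["id"],
            "reservation_id": inventory.result()["id"],
            "order_id": record.result()["id"],
        }

    except* httpx.HTTPStatusError as eg:
        # Any failure cancels the other tasks automatically
        failed_services = [str(e) for e in eg.exceptions]

    # Raised after the except* clause: an exception raised inside it
    # would reach the caller wrapped in a new ExceptionGroup
    raise OrderProcessingError(
        f"Order failed. Service errors: {failed_services}"
    )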

Also use TaskGroup when you need to dynamically add tasks during execution. Unlike gather, where you must pass all awaitables upfront, TaskGroup lets you call tg.create_task() from anywhere inside the block, including from within tasks that are already running.

The Tools Between Them: asyncio.wait and as_completed

The conversation around concurrent asyncio patterns often collapses into a binary: gather or TaskGroup. But the standard library has two other primitives that occupy the space between them, and understanding where they fit sharpens your mental model of the entire concurrency toolkit.

asyncio.wait: fine-grained control over completion

asyncio.wait returns two sets -- done and pending -- and accepts a return_when parameter that controls when it stops waiting. The three options are ALL_COMPLETED (the default), FIRST_COMPLETED, and FIRST_EXCEPTION. This gives you something neither gather nor TaskGroup provides: the ability to process results incrementally while other tasks are still running, without cancelling them.

import asyncio
import httpx

async def fetch(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def fetch_with_progress(urls):
    async with httpx.AsyncClient() as client:
        tasks = {asyncio.create_task(fetch(client, url)): url for url in urls}
        pending = set(tasks.keys())
        results = {}

        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                url = tasks[task]
                try:
                    results[url] = task.result()
                except Exception as e:
                    results[url] = e
                print(f"Completed {len(results)}/{len(urls)}")

    return results

The tradeoff is that asyncio.wait provides no structured concurrency guarantees. Tasks you create are not scoped to any block. If your function raises before all tasks are done, you are responsible for cancelling the pending set yourself -- exactly the problem TaskGroup was designed to eliminate. Use asyncio.wait when you need incremental processing and are willing to manage the task lifecycle manually.
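
One way to take on that responsibility is a try/finally around the wait loop. The sketch below is an assumption about how you might structure it, not a standard library recipe; job and drain are illustrative names, and asyncio.sleep replaces real I/O.

```python
import asyncio

async def job(delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError("boom")
    return delay

async def drain(tasks):
    """Collect results as tasks finish; cancel the rest if anything raises."""
    pending = set(tasks)
    results = []
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                results.append(task.result())  # re-raises the task's exception
        return results
    finally:
        # On the error path, pending still holds unfinished tasks
        for task in pending:
            task.cancel()
        # Wait for the cancellations to be processed before leaving
        await asyncio.gather(*pending, return_exceptions=True)
```

On the success path the pending set is empty, so the finally block does nothing. On the error path it reproduces by hand roughly what TaskGroup does automatically.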

asyncio.as_completed: results in arrival order

asyncio.as_completed returns an iterator of awaitables that yield results in the order tasks finish, not the order they were submitted. This is useful when the fastest result matters more than the original order -- for example, querying multiple CDN endpoints and using whichever responds first.

import asyncio
import httpx

async def fetch(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return url, response.json()

async def first_available(urls):
    async with httpx.AsyncClient() as client:
        tasks = [asyncio.create_task(fetch(client, url)) for url in urls]
        for coro in asyncio.as_completed(tasks):
            try:
                url, data = await coro
                # Cancel remaining tasks once we have one result
                for t in tasks:
                    t.cancel()
                return data
            except Exception:
                continue  # Try the next one to finish
    raise RuntimeError("All sources failed")

Like asyncio.wait, as_completed does not manage task lifetimes for you. The cancellation in the example above is manual. Since Python 3.13, the object returned by as_completed can also be consumed with async for, in which case it yields the original task objects as they finish, but it still does not scope or cancel tasks on its own.

Note

Think of the four tools as a spectrum of control versus safety. TaskGroup gives you the least control over individual task lifecycles but the strongest safety guarantees. asyncio.wait gives you the finest control but requires you to handle every edge case yourself. gather and as_completed sit in between, each with a different emphasis on result ordering and error handling.

A Mental Model for Choosing

Rather than memorizing rules, it helps to think about these tools through a single question: who is responsible for cleaning up when something goes wrong?

With TaskGroup, the runtime is responsible. The async with block is a contract: nothing escapes. If a task fails, cancellation propagates automatically. If the parent is cancelled, children are cancelled automatically. You write the happy path and the error-handling logic; the lifecycle management is handled for you. This is the same principle behind context managers for files and database connections -- except applied to concurrent execution.

With gather, responsibility is shared. The function collects results and can surface errors, but it does not cancel anything on failure (by default) and does not scope task lifetimes. You have to think about what happens to the tasks you did not await.

With asyncio.wait and as_completed, you are fully responsible. These are power tools. They give you the most flexibility, but every task you create, you must also track and cancel.

Here is a quick decision path:

Decision Path

Do all tasks need to succeed for the result to be valid? Use TaskGroup.
Can the operation return partial results? Use gather(return_exceptions=True).
Do you need to process results as they arrive? Use asyncio.wait(return_when=FIRST_COMPLETED) or asyncio.as_completed.
Do you need to race tasks and use the first success? Use asyncio.as_completed with manual cancellation, or TaskGroup with the early termination pattern.
Are you supporting Python versions before 3.11? Use gather or asyncio.wait.

Limiting Concurrency with a Semaphore

One question neither gather nor TaskGroup answers on its own: what happens when you have 500 URLs and the API allows 10 concurrent requests? Both tools will happily launch all 500 at once, saturate connection pools, trigger HTTP 429 responses, and possibly get your client IP blocked. Neither provides a built-in throttle.

The standard solution is asyncio.Semaphore. A semaphore manages an internal counter: each acquire() decrements it, each release() increments it, and when it hits zero, additional callers wait until a slot opens. Wrapping your coroutine body in an async with semaphore block limits how many instances of that coroutine can execute simultaneously.

Semaphore with TaskGroup

import asyncio
import httpx

async def fetch(client, semaphore, url):
    async with semaphore:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def fetch_all(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch(client, semaphore, url))
                for url in urls
            ]
    return [t.result() for t in tasks]

Semaphore with gather

async def fetch_all_gather(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            *(fetch(client, semaphore, url) for url in urls),
            return_exceptions=True,
        )
    return results

The pattern is identical in both cases -- the semaphore lives inside the coroutine, not at the scheduling layer. This means the choice between TaskGroup and gather remains a question of error handling philosophy, not concurrency control.

Pro Tip

Set your semaphore limit at or below your HTTP client's max_connections value. If you use httpx.AsyncClient(limits=httpx.Limits(max_connections=20)), a semaphore of 50 will not give you 50 concurrent requests -- the connection pool becomes the bottleneck instead, and the extra requests wait inside httpx for a free connection, where they can fail with httpx.PoolTimeout if none frees up within the pool timeout.

Gotchas and Edge Cases

Both patterns have subtle behaviors that can surprise you in production. These are the edge cases that other tutorials leave out.

TaskGroup and CancelledError

If the parent task running a TaskGroup is cancelled from outside (for example, by a timeout or an explicit task.cancel()), the TaskGroup cancels all its children and then propagates the CancelledError upward. However, if a child task catches CancelledError and suppresses it, the TaskGroup may misbehave. The Python documentation explicitly warns against this:

The Python documentation explicitly warns that the structured concurrency primitives in asyncio -- including TaskGroup and asyncio.timeout() -- rely on cancellation as an internal mechanism. If a coroutine catches and suppresses CancelledError, these primitives may not behave correctly.
-- paraphrased from the Python documentation, Coroutines and Tasks

If your coroutines use try/except blocks that might catch CancelledError (for example, a bare except: or except BaseException), always re-raise it. Since Python 3.8, CancelledError subclasses BaseException rather than Exception, so a plain except Exception will not catch it -- but legacy code or overly broad exception handling can still cause problems.
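
A minimal sketch of the cleanup-then-re-raise pattern, using only the standard library. The log list and the function names are illustrative.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("released resources")  # do cleanup here
        raise  # re-raise so the cancellation is not swallowed

async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker reach its first await
    task.cancel()
    await asyncio.wait([task])  # wait for the cancellation to be processed
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the worker re-raises, the task ends in the cancelled state. If the raise were removed, the task would finish normally and anything relying on the cancellation (a TaskGroup, a timeout) would see a task that ignored the request.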

Nesting TaskGroups

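TaskGroup supports nesting. An outer TaskGroup can contain inner TaskGroups, and cancellation propagates correctly through the hierarchy. If an inner group fails, only the inner group's tasks are cancelled initially. The inner group's ExceptionGroup then propagates into the enclosing scope. If that scope is the body of an outer TaskGroup and nothing catches the exception first, the outer group cancels its own remaining tasks and raises an ExceptionGroup that wraps the inner one. Wrap the inner block in try/except* if an inner failure should not take down the outer scope.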

async def main():
    async with asyncio.TaskGroup() as outer:
        outer.create_task(independent_work())

        async with asyncio.TaskGroup() as inner:
            inner.create_task(subtask_a())
            inner.create_task(subtask_b())  # If this fails...

        # ...inner group cancels subtask_a, raises ExceptionGroup
        # ...which propagates to outer group, cancelling independent_work

gather's return_exceptions silently consumes CancelledError

When using gather(return_exceptions=True), cancelled tasks appear in the results list as CancelledError instances. This makes it easy to accidentally ignore cancellations when iterating through results with isinstance(result, Exception). Since CancelledError is a BaseException (not an Exception), your type check may miss it entirely.

# Bug: CancelledError is a BaseException, not Exception
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):  # Misses CancelledError!
        handle_error(r)

# Fix: check BaseException instead
for r in results:
    if isinstance(r, BaseException):
        handle_error(r)

TaskGroup does not support timeouts natively

If you need a timeout on the entire group, wrap the TaskGroup in asyncio.timeout(). Do not try to implement timeouts by cancelling individual tasks inside the group -- let the structured concurrency primitives compose naturally.

async def fetch_all_with_timeout(client, urls, timeout_seconds):
    try:
        async with asyncio.timeout(timeout_seconds):
            async with asyncio.TaskGroup() as tg:
                tasks = [tg.create_task(fetch(client, url)) for url in urls]
        return [t.result() for t in tasks]
    except TimeoutError:
        # All tasks were cancelled automatically by the timeout
        raise

Terminating a TaskGroup early

Sometimes you want to stop all tasks in a group before they finish -- not because something failed, but because you already have the result you need. A common example: you query three mirror servers for the same file and want to cancel the remaining requests as soon as the first one responds.

TaskGroup has no cancel() method. The standard library pattern for early termination is to inject a task that raises a custom exception, then catch and discard that exception at the group boundary.

import asyncio

class TerminateTaskGroup(Exception):
    """Raised to deliberately stop all tasks in a group."""

async def force_terminate():
    raise TerminateTaskGroup()

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(long_running_job())
            tg.create_task(another_job())

            # After some condition is met, terminate the group
            await asyncio.sleep(5)
            tg.create_task(force_terminate())

    except* TerminateTaskGroup:
        pass  # Expected -- group was deliberately stopped

This works because raising any exception inside a TaskGroup triggers the cancellation of all other tasks. By using a custom exception type and catching it with except*, you get clean termination without mixing it up with real errors. This pattern is documented in the Python 3.13+ documentation.

Python 3.13 improved TaskGroup cancellation internals

Python 3.13 fixed several edge cases in how TaskGroup handles overlapping cancellations. In Python 3.11 and 3.12, simultaneous internal cancellations (from a child task failing) and external cancellations (from an outer timeout or parent task) could interact in unpredictable ways, sometimes losing track of cancellation counts or failing to propagate the external cancellation correctly.

Starting with Python 3.13, TaskGroup correctly preserves cancellation counts when both kinds of cancellation happen at the same time. The uncancel() method was also updated to rescind pending cancellation requests when the count reaches zero, which prevents stale cancellations from misfiring later. If you are running production workloads with nested TaskGroups and asyncio.timeout(), upgrading to 3.13 or later eliminates a class of subtle bugs that are difficult to reproduce in testing.

What Is Happening Inside TaskGroup

Understanding the internal state machine of TaskGroup helps explain why it behaves the way it does -- and why certain patterns (like suppressing CancelledError) cause it to break.

When you enter an async with asyncio.TaskGroup() as tg block, the group records the running event loop and the current task, which becomes its parent task. Each call to tg.create_task() adds the new task to the group's internal set and attaches a done callback to it. That callback is the mechanism: when any child task finishes (successfully or not), the callback fires and checks whether the task raised an exception.

If the callback detects a failure, it records the exception, marks the group as aborting, calls .cancel() on every remaining child task, and cancels the parent task (the one that owns the async with block). This is the critical detail: TaskGroup uses cancellation of the parent task as the mechanism for unwinding. The parent receives a CancelledError at whatever it is awaiting inside the block, which unwinds the body into the __aexit__ method of the context manager. Inside __aexit__, the group waits until all remaining tasks are done. Only after every task has finished (successfully, with an error, or cancelled) does the group raise the ExceptionGroup.

This is why suppressing CancelledError is destructive. The parent task's cancellation is not a signal that something went wrong with the parent -- it is TaskGroup's internal mechanism for gaining control back from the event loop. If a coroutine catches and suppresses that CancelledError, the group never gets the chance to run its cleanup logic, and the invariant that "all tasks are done when the block exits" is violated.

It also explains why TaskGroup raises an ExceptionGroup rather than a single exception. Between the moment the first task fails and the moment __aexit__ finishes awaiting all children, additional tasks may also fail -- either because of their own logic or because they raise an exception in response to being cancelled. The group collects all of these into a single ExceptionGroup so that no failure is silently lost.

Note

The internal mechanics described here are based on the CPython 3.13+ implementation. The TaskGroup source is roughly 200 lines of Python and is readable in the standard library at Lib/asyncio/taskgroups.py. Reading it is one of the best ways to build intuition for how structured concurrency works in practice.

Python 3.14: eager_start, Call Graphs, and Free Threading

Python 3.14 brings three changes to asyncio that directly affect how you work with TaskGroup.

The eager_start parameter

In Python 3.14, TaskGroup.create_task() accepts an eager_start keyword argument. When set to True, the coroutine begins executing synchronously during the create_task() call itself, rather than waiting to be scheduled by the event loop. If the coroutine completes without hitting an await, it never enters the event loop's scheduling queue at all.

# Python 3.14+
async def get_user_from_cache(user_id):
    """Returns immediately if cached, awaits network if not."""
    if user_id in cache:
        return cache[user_id]
    return await fetch_from_api(user_id)

async def main():
    async with asyncio.TaskGroup() as tg:
        # If the user is cached, this completes synchronously
        # and never enters the event loop scheduler
        task = tg.create_task(
            get_user_from_cache(42),
            eager_start=True,
        )

This is primarily a performance optimization. The task's result is the same -- the difference is that the coroutine's first step runs immediately and skips the scheduling overhead. For workloads where many tasks resolve from a cache or a local lookup, eager start can eliminate the cost of round-tripping through the event loop. Meta (formerly Facebook) reported roughly 4% CPU savings in production Django workloads when they first prototyped this pattern in their Cinder Python fork.

Warning

Eager start changes when your code runs, not what it does. If a coroutine modifies shared state before its first await, that mutation now happens during create_task() rather than later. For code that depends on the order of task scheduling, this can introduce subtle issues. Use it deliberately on coroutines you know are safe for synchronous execution.

Call graph introspection

Python 3.14 adds asyncio.capture_call_graph() and asyncio.print_call_graph(), which let you visualize the full tree of nested TaskGroups and tasks at runtime. This makes it possible to inspect, from a debugger or a production monitoring hook, exactly which tasks are waiting on which other tasks.

# Python 3.14+
# From the command line, inspect a running process:
#   python -m asyncio pstree <PID>
#
# Output:
# └── (T) Task-1
#     └── main example.py:13
#         ├── (T) fetch-user-1
#         │   └── fetch example.py:5
#         │       └── sleep
#         └── (T) fetch-user-2
#             └── fetch example.py:5
#                 └── ClientSession.get

# Or programmatically:
import asyncio

async def debug_hook():
    """Call from any running task to print the full graph."""
    asyncio.print_call_graph(asyncio.current_task())

For applications using nested TaskGroups -- where an outer group spawns inner groups, and those spawn further tasks -- this is a significant debugging improvement. Previously, figuring out which task was blocking which other task required careful logging or third-party tools. Now the entire awaiter chain is available as a built-in.

Free-threaded asyncio performance

Python 3.14 reworked asyncio's internals for the free-threaded build (no GIL). The current task is now stored in thread-local state rather than a global dictionary, and the set of all tasks uses per-thread data structures. The result is a measured 10-20% improvement in single-threaded asyncio performance (on the standard pyperformance benchmark suite), with the ability to scale linearly across threads in the free-threaded build. For code using TaskGroup, this means lower scheduling overhead and reduced memory usage per task, even if you are not running the free-threaded build.

Migration Checklist

If you are migrating existing gather calls to TaskGroup, use this checklist to avoid common pitfalls.

  1. Confirm Python 3.11+ is your minimum target. TaskGroup and except* have no standard library backport. The exceptiongroup package on PyPI backports the ExceptionGroup type to Python 3.7+, but it cannot provide the except* syntax. A separate taskgroup package by Thomas Grainger backports TaskGroup, asyncio.Runner, and asyncio.timeout to Python 3.8 through 3.10 -- though without except*, you will need to catch ExceptionGroup with a regular except and iterate its .exceptions attribute manually.
  2. Identify whether you need partial success. If any call site uses return_exceptions=True and processes mixed results/errors, that pattern has no direct TaskGroup equivalent. Keep gather for those cases, or wrap individual tasks in try/except inside the TaskGroup.
  3. Consider whether asyncio.wait fits better. If existing code processes results as they arrive (for example, a progress bar that updates per-response), neither gather nor TaskGroup is the right replacement. Call asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) in a loop, or iterate over asyncio.as_completed, with a try/finally block to cancel pending tasks on error.
  4. Replace result list access with task.result(). gather returns results in an ordered list. TaskGroup returns nothing -- you call .result() on each task object after the async with block exits.
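  5. Replace except with except*. A try/except around a gather call becomes try/except* around the TaskGroup block. A regular except ValueError will not catch a ValueError wrapped in an ExceptionGroup. Either use except* ValueError, or catch ExceptionGroup itself with a regular except and inspect its .exceptions attribute.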
  6. Audit for CancelledError suppression. Any coroutine that catches and suppresses CancelledError (even accidentally, via except BaseException) will interfere with TaskGroup's cancellation machinery. This includes third-party libraries that use broad exception handling internally.
  7. Test cancellation paths. The biggest behavioral change is that TaskGroup cancels siblings on failure. If your tasks have side effects, verify that mid-execution cancellation is safe or add proper cleanup in finally blocks.
  8. Target Python 3.13+ for production if possible. The cancellation internals in Python 3.11 and 3.12 have known edge cases with overlapping cancellations. Python 3.13 resolved these, making nested TaskGroup and asyncio.timeout() combinations more reliable.
  9. Evaluate Python 3.14 for new projects. The eager_start parameter and call graph introspection tools make TaskGroup both faster and more debuggable. The free-threaded asyncio improvements also reduce per-task scheduling overhead.

Key Takeaways

  1. TaskGroup provides structured concurrency; gather does not: With TaskGroup, every task is scoped to the async with block. When the block exits, all tasks are done. With gather, sibling tasks can keep running after the gather call has raised.
  2. TaskGroup cancels remaining tasks on failure; gather does not: When one task in a TaskGroup raises an exception, all other tasks are cancelled before the ExceptionGroup is raised. With gather (default), the first exception propagates but other tasks keep running in the background.
  3. TaskGroup surfaces all errors; gather hides extras: TaskGroup collects every exception into an ExceptionGroup. gather (default) only shows the first exception. You need return_exceptions=True to see all failures with gather.
  4. Use gather for partial success; TaskGroup for all-or-nothing: If your operation can still provide value with incomplete results (dashboards, batch fetches), use gather with return_exceptions=True. If all tasks must succeed or the entire operation is meaningless (transactions, pipelines), use TaskGroup.
  5. asyncio.wait and as_completed fill the middle ground: When you need to process results as they arrive or implement custom early-exit logic, asyncio.wait and asyncio.as_completed offer finer control than gather without the all-or-nothing semantics of TaskGroup. The tradeoff is that you manage task lifetimes yourself.
  6. Use except* for TaskGroup errors: A regular except ValueError does not match a ValueError wrapped in an ExceptionGroup. Use except* to match exception types within the group. Each except* block receives a sub-group containing only the exceptions of that type. The alternative is to catch ExceptionGroup itself with a regular except and inspect its .exceptions attribute.
  7. Never suppress CancelledError inside a TaskGroup: TaskGroup relies on cancellation, of both the remaining child tasks and the parent task, to shut down after a failure. A coroutine that swallows CancelledError keeps running after the group has decided to abort, so the block cannot exit until that coroutine finishes on its own, and the fail-fast behavior is lost.
  8. Use a semaphore to throttle concurrency: Neither TaskGroup nor gather limits how many tasks run at the same time. Wrap coroutine bodies in async with semaphore to cap the number of in-flight requests, which helps you stay within API rate limits, and keep the semaphore value at or below your HTTP client's connection pool size.
  9. Prefer Python 3.13+ for production TaskGroup usage: Python 3.13 fixed edge cases in how TaskGroup handles simultaneous internal and external cancellations. Earlier versions can lose track of cancellation counts in nested timeout and task group scenarios.
  10. Python 3.14 makes TaskGroup faster and more debuggable: The eager_start parameter eliminates scheduling overhead for tasks that resolve synchronously. Call graph introspection lets you visualize the entire tree of nested groups and tasks at runtime. Free-threaded asyncio improvements reduce per-task overhead even on the standard GIL build.

The addition of TaskGroup to the standard library marks a significant step forward for Python's async story. It eliminates an entire category of bugs related to orphaned tasks and hidden exceptions. But the real shift is conceptual: Python's concurrency model is moving from "launch tasks and hope for the best" toward "every concurrent operation has a well-defined owner and a guaranteed cleanup path." That is the same trajectory that Trio charted years earlier, and it is now being formalized at the language level through PEP 654, PEP 789 (preventing cancellation bugs in async generators), and the ongoing work on free-threaded asyncio.

For new projects running on Python 3.11 or later, TaskGroup should be the default starting point for concurrent operations. gather remains valuable for its partial success mode and backward compatibility. asyncio.wait and as_completed remain valuable for fine-grained control over task processing order. And as Python 3.14 matures, the combination of eager task execution, call graph introspection, and free-threaded performance improvements will make structured concurrency not just safer, but faster and more observable. The tools are converging on a future where the right thing to do is also the easy thing to do.

Sources and Further Reading