Python asyncio.gather Explained: Run Multiple API Requests at the Same Time

You know how to write an async function. You know how to await a single API call. But now you need to hit five different endpoints and collect all the results before moving on. Awaiting each one sequentially defeats the purpose of writing async code in the first place. That is where asyncio.gather comes in -- it takes a group of coroutines, launches them all at once, and hands you back every result in a single list.

This article walks through asyncio.gather from its basic signature to production-ready patterns. You will learn how it handles return values, what happens when one of your coroutines throws an exception, how to use return_exceptions to keep your batch running even when individual calls fail, and when the newer TaskGroup might be a better fit.

What asyncio.gather Does and How It Works

The function signature is simple:

asyncio.gather(*aws, return_exceptions=False)

It accepts any number of awaitable objects -- coroutines, tasks, or futures -- and schedules them to run concurrently on the event loop. When every awaitable completes, awaiting gather yields a list containing each result. If any awaitable is a bare coroutine (not already wrapped in a task), gather automatically wraps it in a Task for you.

The key word is concurrently, not in parallel. All the coroutines share a single thread. When one coroutine hits an await (waiting for a network response, for example), the event loop switches to another coroutine that is ready to do work. This is cooperative multitasking: no two coroutines execute at the exact same instant, but they overlap their waiting periods so the total elapsed time drops dramatically.
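
You can watch the hand-offs in a toy example, using asyncio.sleep to stand in for network I/O. Both coroutines start before either finishes, which is the interleaving in action:

```python
import asyncio

events = []  # records the order in which the coroutines run

async def worker(name, delay):
    events.append(f"{name} started")
    await asyncio.sleep(delay)  # control returns to the event loop here
    events.append(f"{name} finished")
    return name

async def main():
    return await asyncio.gather(worker("a", 0.2), worker("b", 0.1))

results = asyncio.run(main())
print(events)  # ['a started', 'b started', 'b finished', 'a finished']
```

Both workers start immediately, "b" finishes first because its sleep is shorter, and yet (as the next sections show) gather still reports results in argument order.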

Note

If you call asyncio.gather() with no arguments, awaiting it yields an empty list immediately. No event loop magic needed.
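
In two lines:

```python
import asyncio

async def main():
    # No awaitables passed: gather completes immediately
    return await asyncio.gather()

print(asyncio.run(main()))  # []
```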

A Practical Example: Fetching Multiple API Endpoints

Suppose you are building a dashboard that pulls user data, recent orders, and account settings from three separate API endpoints. With synchronous code, you would call each endpoint one after another. With asyncio.gather, all three requests fly out at the same time.

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def load_dashboard(user_id):
    base = "https://api.example.com"
    urls = {
        "profile": f"{base}/users/{user_id}",
        "orders":  f"{base}/users/{user_id}/orders?limit=5",
        "settings": f"{base}/users/{user_id}/settings",
    }

    async with aiohttp.ClientSession() as session:
        profile, orders, settings = await asyncio.gather(
            fetch_json(session, urls["profile"]),
            fetch_json(session, urls["orders"]),
            fetch_json(session, urls["settings"]),
        )

    return {"profile": profile, "orders": orders, "settings": settings}

asyncio.run(load_dashboard(42))

Three requests, three await points where the event loop can switch between them. If each endpoint takes 200 milliseconds to respond, the synchronous version would take 600 milliseconds. The gather version finishes in roughly 200 milliseconds -- the time of the single slowest call.
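
Since api.example.com is a placeholder, here is a self-contained sketch that simulates three 200-millisecond endpoints with asyncio.sleep and measures the elapsed time:

```python
import asyncio
import time

async def fake_endpoint(name):
    # Simulate a ~200 ms API response
    await asyncio.sleep(0.2)
    return name

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_endpoint("profile"),
        fake_endpoint("orders"),
        fake_endpoint("settings"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(f"{results} in {elapsed:.2f}s")  # roughly 0.2s, not 0.6s
```

The three sleeps overlap, so the total is governed by the slowest call rather than the sum.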

Return Order Is Guaranteed

A common point of confusion in concurrent code is whether results come back in the order they were dispatched or the order they finished. With asyncio.gather, the answer is straightforward: results always come back in the same order you passed the awaitables in.

import asyncio

async def delayed_value(value, delay):
    await asyncio.sleep(delay)
    return value

async def main():
    results = await asyncio.gather(
        delayed_value("slow",   3),   # Takes 3 seconds
        delayed_value("medium", 2),   # Takes 2 seconds
        delayed_value("fast",   1),   # Takes 1 second
    )
    print(results)

asyncio.run(main())
# Output: ['slow', 'medium', 'fast']

Even though "fast" finishes first and "slow" finishes last, the results list maintains the original argument order: ['slow', 'medium', 'fast']. This makes it safe to unpack results by position, as we did with profile, orders, settings in the dashboard example above.

Pro Tip

If you are building a list of coroutines dynamically (from a loop or list comprehension), keep a parallel list of labels or identifiers so you can match each result to the request that produced it.
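
A minimal sketch of that pattern, using a dict whose keys double as the labels (the endpoints and the fetch_fake helper are hypothetical stand-ins):

```python
import asyncio

async def fetch_fake(url):
    # Stand-in for a real HTTP request
    await asyncio.sleep(0.01)
    return f"payload from {url}"

async def main():
    # Hypothetical endpoints: the dict keys act as labels
    urls = {
        "profile": "/users/42",
        "orders": "/users/42/orders",
    }
    results = await asyncio.gather(*(fetch_fake(u) for u in urls.values()))
    # gather preserves argument order, so zip pairs each label
    # with the result of the request it produced
    return dict(zip(urls, results))

print(asyncio.run(main()))
```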

Error Handling: What Happens When a Request Fails

When one coroutine inside gather raises an exception, the default behavior can be surprising. With return_exceptions=False (the default), the first exception to occur is immediately propagated to whoever is awaiting gather. The other coroutines are not cancelled. They keep running in the background, but you never see their results.

import asyncio

async def succeed():
    await asyncio.sleep(1)
    return "success"

async def fail():
    await asyncio.sleep(0.5)
    raise ValueError("something went wrong")

async def main():
    try:
        results = await asyncio.gather(
            succeed(),
            fail(),
            succeed(),
        )
    except ValueError as e:
        print(f"Caught: {e}")
        # The two succeed() coroutines are still running
        # but their results are lost

asyncio.run(main())
# Output: Caught: something went wrong

This is the part that catches people off guard. The two succeed() coroutines do finish in the background, but because the exception was already raised from gather, there is no mechanism to collect those results. If those coroutines had side effects (writing to a database, sending a notification), those side effects still happen. You just lose the return values.
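
If you do not want survivors running on after a failure, one common workaround (a sketch, not the only approach) is to create the tasks yourself so you can cancel whatever is still pending when gather raises. Reusing succeed and fail from above:

```python
import asyncio

async def succeed():
    await asyncio.sleep(1)
    return "success"

async def fail():
    await asyncio.sleep(0.5)
    raise ValueError("something went wrong")

async def main():
    tasks = [asyncio.create_task(c()) for c in (succeed, fail, succeed)]
    try:
        return await asyncio.gather(*tasks)
    except ValueError:
        for t in tasks:
            t.cancel()  # no-op for tasks that already finished
        # Wait for the cancellations to take effect; return_exceptions=True
        # absorbs the resulting CancelledError entries
        await asyncio.gather(*tasks, return_exceptions=True)
        return sum(t.cancelled() for t in tasks)

n_cancelled = asyncio.run(main())
print(f"cancelled {n_cancelled} leftover tasks")  # cancelled 2 leftover tasks
```

This is essentially what TaskGroup (covered below) does for you automatically.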

Warning

With the default return_exceptions=False, only the first exception is raised. If multiple coroutines fail, the additional exceptions are silently ignored. This can hide bugs. If you need visibility into every failure, use return_exceptions=True.

Using return_exceptions=True for Resilient Code

Setting return_exceptions=True changes gather's behavior fundamentally. Instead of raising the first exception, it treats exceptions as return values. Every coroutine runs to completion (or failure), and the results list contains a mix of successful values and exception objects.

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

async def fetch_multiple(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    failures = []

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failures.append({"url": url, "error": str(result)})
        else:
            successes.append({"url": url, "data": result})

    return successes, failures

urls = [
    "https://jsonplaceholder.typicode.com/users/1",
    "https://jsonplaceholder.typicode.com/users/999",  # Does not exist
    "https://jsonplaceholder.typicode.com/users/3",
]

successes, failures = asyncio.run(fetch_multiple(urls))

print(f"Succeeded: {len(successes)}")
print(f"Failed: {len(failures)}")
for f in failures:
    print(f"  {f['url']}: {f['error']}")

This pattern is extremely useful in production code where partial success is acceptable. You might be pulling data from 20 microservices to render a dashboard. If one service is down, you would rather show 19 panels with data and one panel with an error message than crash the entire page.

The isinstance(result, Exception) check is the standard way to separate successes from failures in the results list. (One caveat: asyncio.CancelledError subclasses BaseException rather than Exception, so if your tasks can be cancelled, check against BaseException instead.) Because you know the return order matches the input order, pairing results with their corresponding URLs using zip is safe.

Real-World Pattern: Aggregating Data from Multiple Services

Here is a pattern you will see in production async code: a function that gathers data from several sources, handles individual failures gracefully, and logs every error for later investigation.

import asyncio
import aiohttp
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_service(session, name, url, timeout=5.0):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
            resp.raise_for_status()
            data = await resp.json()
            logger.info(f"[{name}] responded with {resp.status}")
            return {"service": name, "data": data, "status": "ok"}
    except asyncio.TimeoutError:
        logger.warning(f"[{name}] timed out after {timeout}s")
        return {"service": name, "data": None, "status": "timeout"}
    except aiohttp.ClientResponseError as e:
        logger.warning(f"[{name}] returned HTTP {e.status}")
        return {"service": name, "data": None, "status": f"http_{e.status}"}
    except Exception as e:
        logger.error(f"[{name}] unexpected error: {e}")
        return {"service": name, "data": None, "status": "error"}

async def aggregate_dashboard(user_id):
    services = {
        "profile":  f"https://api.example.com/users/{user_id}",
        "orders":   f"https://api.example.com/orders?user={user_id}",
        "payments": f"https://api.example.com/payments?user={user_id}",
        "reviews":  f"https://api.example.com/reviews?user={user_id}",
    }

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_service(session, name, url) for name, url in services.items()]
        )

    dashboard = {r["service"]: r for r in results}
    failed = [r["service"] for r in results if r["status"] != "ok"]

    if failed:
        logger.warning(f"Dashboard incomplete. Failed services: {failed}")

    return dashboard

Notice that this version does not use return_exceptions=True. Instead, each fetch_service function catches its own exceptions and returns a standardized dictionary with a "status" field. This approach gives you more control over error formatting and logging compared to checking isinstance after the fact. Both patterns are valid. Choose whichever keeps your code clearer for the use case at hand.

gather vs TaskGroup: Which One Should You Use

Python 3.11 introduced asyncio.TaskGroup, which provides a structured alternative to gather. The core difference is how they handle failures: when one task in a TaskGroup raises an exception, all remaining tasks are cancelled. With gather, the remaining tasks keep running.

| Behavior | asyncio.gather | asyncio.TaskGroup |
| --- | --- | --- |
| Minimum Python version | 3.7 | 3.11 |
| On first exception (default) | Propagates to caller; other tasks keep running | Cancels all remaining tasks; raises ExceptionGroup |
| Partial success mode | Yes, via return_exceptions=True | No built-in equivalent |
| Return values | List in argument order | Access via individual task.result() |
| Structured concurrency | No -- tasks can outlive the gather call | Yes -- all tasks are scoped to the async with block |
| Best for | Batch operations where partial failure is acceptable | Operations where all-or-nothing is the correct behavior |

Here is what the dashboard example looks like with TaskGroup:

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def load_dashboard_taskgroup(user_id):
    base = "https://api.example.com"

    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            profile_task  = tg.create_task(fetch_json(session, f"{base}/users/{user_id}"))
            orders_task   = tg.create_task(fetch_json(session, f"{base}/users/{user_id}/orders"))
            settings_task = tg.create_task(fetch_json(session, f"{base}/users/{user_id}/settings"))

    return {
        "profile":  profile_task.result(),
        "orders":   orders_task.result(),
        "settings": settings_task.result(),
    }

If any task fails, the other two are automatically cancelled and an ExceptionGroup is raised. This is the safer choice when partial data is not useful -- for example, a payment processing pipeline where you need both the user record and the payment method to succeed, or there is no point continuing.

[Diagram: timelines for asyncio.gather (default) and asyncio.TaskGroup (Python 3.11+) after Task B fails -- gather raises the exception to the caller while Task C keeps running and its result is lost; TaskGroup raises an ExceptionGroup and cancels Task C automatically.]
With gather, remaining tasks continue running after a failure. With TaskGroup, remaining tasks are cancelled immediately.

Key Takeaways

  1. asyncio.gather runs coroutines concurrently, not in parallel: All coroutines share one thread. They overlap their I/O waits, which is why total execution time equals the slowest individual call rather than the sum of all calls.
  2. Results always match input order: No matter which coroutine finishes first, the results list preserves the same position as the awaitables you passed to gather. This makes it safe to unpack by position or pair with a parallel list of identifiers.
  3. Default error behavior hides problems: With return_exceptions=False, only the first exception surfaces. Other coroutines keep running but their results are discarded. Use return_exceptions=True when you need visibility into every failure.
  4. return_exceptions=True enables partial success: Exceptions become values in the results list. Check each result with isinstance(result, Exception) to separate successes from failures. This is essential for batch operations where some failures are tolerable.
  5. TaskGroup is the modern alternative for all-or-nothing work: Available in Python 3.11+, TaskGroup cancels remaining tasks when one fails. Use it when partial results are not useful. Use gather when you want every task to run to completion regardless of individual failures.

Understanding asyncio.gather is the turning point where async Python stops being a curiosity and starts being a practical tool. Once you can launch, collect, and handle errors across a group of concurrent operations, you have the foundation for building services that talk to multiple APIs, databases, and external systems without wasting time waiting in line.
