Build a FastAPI Async Backend That Calls External APIs Without Blocking

FastAPI is built on async Python. Its entire architecture is designed to handle thousands of concurrent requests by never blocking the event loop. But the moment your endpoint calls an external API using a synchronous library like requests, that entire advantage disappears. The event loop freezes, incoming requests queue up, and your fast API becomes a slow API. This article shows you how to call external services the right way using httpx.AsyncClient, so your backend stays responsive under load.

By the end, you will have a working FastAPI application that manages an httpx client through lifespan events, makes concurrent requests to external services, handles failures gracefully, and offloads non-critical work to background tasks.

Why Blocking Calls Break FastAPI

FastAPI runs on a single-threaded event loop (via Uvicorn). When you define a route with async def, it runs directly on that event loop. If your code hits a blocking operation -- like requests.get() or time.sleep() -- the event loop cannot do anything else until that operation finishes. Every other incoming request waits.

import requests
from fastapi import FastAPI

app = FastAPI()

# BAD: blocks the event loop for ~500ms per call
@app.get("/weather")
async def get_weather():
    response = requests.get("https://api.weather.example.com/current")
    return response.json()

If this endpoint takes 500 milliseconds to respond, and 100 users hit it at the same time, user #100 waits roughly 50 seconds. The event loop processes each blocking call sequentially because it cannot switch away from a synchronous operation.

Warning

FastAPI does have a safety net: if you define a route with def (not async def), it automatically runs the function in a thread pool. This prevents total event loop lockup, but it is still slower and less efficient than proper async code. The thread pool has a fixed size, so under high load you can exhaust it.

Setting Up httpx.AsyncClient with Lifespan Events

The recommended way to manage an HTTP client in FastAPI is through lifespan events. You create the client when the application starts and close it when the application shuts down. This gives every endpoint access to a shared client with a warm connection pool.

import httpx
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create the shared HTTP client
    app.state.http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(10.0, connect=5.0),
        limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
    )
    yield
    # Shutdown: close the client and release connections
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)

# Access the client in any endpoint via request.app.state
@app.get("/data")
async def get_data(request: Request):
    client = request.app.state.http_client
    response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
    return response.json()

The lifespan function is an async context manager. Everything before yield runs at startup. Everything after yield runs at shutdown. By storing the client on app.state, it is accessible from any endpoint through the Request object.

The httpx.Timeout object sets a 10-second overall timeout and a 5-second connection timeout. The httpx.Limits object caps the connection pool at 50 total connections and 20 keep-alive connections. These settings prevent your client from opening too many sockets or hanging on slow servers.

Pro Tip

HTTPX is FastAPI's recommended HTTP client. It is installed automatically when you install FastAPI and provides both synchronous and asynchronous interfaces with an API that mirrors the popular requests library.

Making a Single Non-Blocking External Call

With the shared client in place, making a non-blocking external API call is a single await expression. While the external server is processing the request, FastAPI's event loop is free to handle other incoming requests.

@app.get("/user/{user_id}")
async def get_user(user_id: int, request: Request):
    client = request.app.state.http_client
    response = await client.get(
        f"https://jsonplaceholder.typicode.com/users/{user_id}"
    )
    response.raise_for_status()
    return response.json()

The await keyword is the critical piece. It tells the event loop: "I am waiting for a network response. Go handle other requests in the meantime." When the response arrives, execution resumes right where it left off. No threads, no callbacks, no complexity.

Calling Multiple APIs in Parallel with asyncio.gather

Many API endpoints need to aggregate data from several sources. A user dashboard might pull profile data, recent orders, and notification counts from three different services. Making these calls sequentially triples the response time. Making them in parallel with asyncio.gather reduces it to the time of the single slowest call.

import asyncio
from fastapi import FastAPI, Request, HTTPException

@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int, request: Request):
    client = request.app.state.http_client
    base = "https://api.example.com"

    try:
        profile, orders, notifications = await asyncio.gather(
            client.get(f"{base}/users/{user_id}"),
            client.get(f"{base}/users/{user_id}/orders?limit=5"),
            client.get(f"{base}/users/{user_id}/notifications/count"),
        )
    except httpx.RequestError as e:
        raise HTTPException(status_code=502, detail=f"Upstream service error: {e}")

    return {
        "profile": profile.json(),
        "orders": orders.json(),
        "notification_count": notifications.json(),
    }

If each service takes 200 milliseconds, the sequential version takes 600 milliseconds. The gather version takes about 200 milliseconds. For three calls this is a 3x improvement. For endpoints that fan out to ten or twenty microservices, the difference is the margin between a responsive app and a timeout.

Handling Errors from External Services

External APIs fail. They time out, return 500 errors, or go down entirely. Your FastAPI endpoint needs to handle these failures without crashing.

import httpx
from fastapi import Request, HTTPException

@app.get("/resilient/{user_id}")
async def get_user_resilient(user_id: int, request: Request):
    client = request.app.state.http_client
    url = f"https://api.example.com/users/{user_id}"

    try:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

    except httpx.TimeoutException:
        raise HTTPException(
            status_code=504,
            detail="The upstream service took too long to respond"
        )

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        raise HTTPException(
            status_code=502,
            detail=f"Upstream returned {e.response.status_code}"
        )

    except httpx.RequestError as e:
        raise HTTPException(
            status_code=502,
            detail=f"Could not reach upstream service: {e}"
        )

This maps upstream errors to appropriate HTTP status codes for your own API consumers. A timeout becomes 504 (Gateway Timeout). An upstream 404 passes through as 404. Other HTTP errors become 502 (Bad Gateway). Network-level failures also return 502 with a descriptive message.

Note

The raise_for_status() method throws an httpx.HTTPStatusError for any 4xx or 5xx response. If you omit it, the response object will contain the error status code but no exception will be raised, which can lead to silent failures.

Offloading Slow Work with Background Tasks

Sometimes an endpoint triggers work that the user does not need to wait for -- sending a notification email, logging analytics, or updating a cache. FastAPI's BackgroundTasks lets you return a response immediately and handle the slow work after the response is sent.

from fastapi import BackgroundTasks, Request

async def log_api_call(client: httpx.AsyncClient, event: dict):
    """Send analytics event to logging service (fire-and-forget)."""
    try:
        await client.post("https://analytics.example.com/events", json=event)
    except httpx.RequestError:
        pass  # Logging failure should not affect the user

@app.get("/products/{product_id}")
async def get_product(
    product_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
):
    client = request.app.state.http_client
    response = await client.get(f"https://api.example.com/products/{product_id}")
    response.raise_for_status()
    product = response.json()

    # Schedule analytics logging AFTER the response is returned
    background_tasks.add_task(
        log_api_call,
        client,
        {"event": "product_viewed", "product_id": product_id},
    )

    return product

The user gets the product data immediately. The analytics call happens in the background after the response is already on its way back. If the analytics service is slow or down, the user never notices.

The Blocking Code Cheat Sheet

The number one mistake in FastAPI async code is accidentally using a blocking library inside an async def function. Here is a quick reference for common blocking calls and their async replacements.

Blocking (do not use in async def) Non-Blocking Replacement
requests.get() await httpx_client.get()
time.sleep(n) await asyncio.sleep(n)
open().read() await aiofiles.open()
psycopg2 queries asyncpg or SQLAlchemy AsyncSession
pymongo queries motor (async MongoDB driver)
subprocess.run() await asyncio.create_subprocess_exec()

If you must use a blocking library that has no async equivalent, define your route with def instead of async def. FastAPI will automatically run it in a thread pool, which prevents event loop lockup at the cost of slightly reduced efficiency.

Key Takeaways

  1. Never use synchronous HTTP libraries in async def endpoints: Libraries like requests block the event loop and prevent FastAPI from handling concurrent requests. Use httpx.AsyncClient with await instead.
  2. Create one shared AsyncClient via lifespan events: Initialize httpx.AsyncClient at startup and close it at shutdown. Store it on app.state so every endpoint can reuse the same connection pool without creating new clients per request.
  3. Use asyncio.gather for parallel external calls: When an endpoint needs data from multiple services, gather all the calls together. Total latency equals the slowest individual call rather than the sum of all calls.
  4. Map upstream errors to proper HTTP status codes: Catch httpx.TimeoutException, httpx.HTTPStatusError, and httpx.RequestError separately and translate them into meaningful error responses for your API consumers.
  5. Use BackgroundTasks for non-critical work: Analytics logging, notification emails, and cache updates should not delay the response to the user. FastAPI's BackgroundTasks lets you schedule this work to happen after the response is sent.

FastAPI gives you a high-performance async foundation. But that foundation only delivers on its promise when every I/O operation in your code is non-blocking. Replace synchronous libraries with async alternatives, manage your HTTP client lifecycle through lifespan events, and use asyncio.gather to parallelize external calls. The result is a backend that handles thousands of concurrent users without breaking a sweat.

back to articles