How to Build an Async REST API Client Class in Python with Connection Pooling


Sprinkling httpx.AsyncClient() calls throughout your codebase works for small scripts, but it becomes a maintenance problem as your project grows. You end up repeating the same base URL, authentication headers, timeout settings, and error handling in every file that talks to your API. This tutorial walks you step by step through building a dedicated client class that encapsulates all of this in one place -- starting from a minimal working example you can run immediately, then layering in connection pooling configuration, authentication, retry logic, and a circuit breaker until you have a production-ready client. By the end you will have written the class, run it against a real API, and verified its behaviour with a test suite.


Why Connection Pooling Matters

Every time you create a new httpx.AsyncClient() or call httpx.get() at the module level, a new TCP connection is established. That means a fresh DNS lookup, TCP handshake, and (for HTTPS) a TLS negotiation -- all before a single byte of your request payload is sent. For a single request this overhead is barely noticeable. For 100 requests it adds up to seconds of wasted time.

The cost is measurable. A TLS 1.2 handshake requires two full round trips after the TCP connection is established, and a TLS 1.3 handshake requires one. For a cross-region connection with 150ms round-trip time, that translates to 300-450ms of setup overhead per new connection before any application data flows. ThousandEyes research measured this directly: a user in London connecting to a server in San Jose adds roughly 150ms per round trip, meaning a full TLS 1.2 handshake alone costs around 300ms (source: ThousandEyes, "Optimizing Web Performance with TLS 1.3").

A connection pool solves this by keeping TCP connections open after a request completes. The next request to the same host reuses the existing connection, skipping the handshake entirely. The httpx AsyncClient provides connection pooling by default, but you need to configure it correctly and make sure you are reusing the same client instance across your application. The official httpx documentation specifically warns against creating multiple client instances in tight loops, because each new instance spins up its own connection pool and defeats the purpose of reuse (source: httpx Async Support docs).

[Interactive connection overhead calculator removed. It let you vary the round-trip time (e.g. 80 ms), the request count (e.g. 50), and the TLS version to compare total latency without pooling (a new connection per request) against pooling (reused connections).]
Note

The default AsyncClient pool allows up to 100 total connections and 20 keep-alive connections, with idle connections expiring after 5 seconds. These defaults are defined in the httpx Resource Limits documentation and match the Limits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5.0) signature in the httpx API reference. For high-throughput applications, you will want to tune these values.

What Happens Inside the Connection Pool

To understand why connection pooling saves so much time, it helps to understand what the operating system is doing under the hood every time a new TCP connection is opened. This is the invisible work that a pooled connection skips entirely.

When your code calls await client.get("/users/1") on a cold start -- no existing connections in the pool -- the following sequence executes at the kernel level. The OS resolves the hostname through DNS (which itself may involve multiple UDP round trips to nameservers), then allocates a socket file descriptor, then performs a three-way TCP handshake (SYN, SYN-ACK, ACK), then negotiates TLS by exchanging cipher suites and certificates, and only then does the HTTP request travel over the wire. Each of those steps involves at least one full network round trip, and for TLS 1.2, the handshake alone requires two. On a connection with 100ms round-trip latency, the total overhead before the first byte of application data flows can exceed 400ms.
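The arithmetic is easy to reproduce. A rough model, assuming one round trip for the DNS lookup, one for the TCP handshake, and two for a full TLS 1.2 handshake:

```python
# Back-of-envelope cold-connection overhead, counted in round trips.
rtt_ms = 100          # round-trip latency to the server
dns_rtts = 1          # simplification: a single UDP query/response
tcp_rtts = 1          # SYN, SYN-ACK, ACK: client can send after 1 RTT
tls12_rtts = 2        # TLS 1.2 full handshake

overhead_ms = (dns_rtts + tcp_rtts + tls12_rtts) * rtt_ms
print(overhead_ms)  # 400 -- before the first byte of application data
```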

A pooled connection eliminates all of that. The socket file descriptor stays open, the TLS session remains negotiated, and the TCP state machine stays in the ESTABLISHED state. The next request on that connection skips straight to writing HTTP frames onto an already-warm pipe. The operating system maintains the connection state in kernel memory -- around 3-4 KB per socket on Linux -- which is a trivial cost compared to the latency savings.

The Keep-Alive Lifecycle

HTTP keep-alive is the mechanism that makes pooling work at the protocol level. After a response is received, the server does not close the connection. Instead, the connection stays open, waiting for the next request. The httpx keepalive_expiry setting controls how long the client keeps an idle connection before closing it. But the server has its own timeout, and it may close the connection before your client does. When this happens, the next request on that connection hits a "reset" or "broken pipe" error on the first byte sent, and the pool transparently retries on a fresh connection. This is why you occasionally see a single slow request in an otherwise fast stream -- the pool is recovering from a server-side closure.

There is also a subtler lifecycle issue. When a TCP connection is closed, the socket enters a TIME_WAIT state on the side that initiated the close. During TIME_WAIT (which lasts 60 seconds by default on Linux), that socket's port number is unavailable for reuse. If your application opens and closes thousands of connections rapidly -- the exact pattern you get when you create a new AsyncClient per request -- you can exhaust the local port range entirely. The kernel has roughly 28,000 ephemeral ports available by default (ports 32768-60999 on many Linux distributions). At high throughput without pooling, you will hit this limit, and new connections will fail with OSError: [Errno 99] Cannot assign requested address. Connection pooling prevents this entirely because pooled connections are never closed during normal operation.

[Diagram: TCP connection lifecycle, pooled vs. unpooled. Unpooled: every request pays DNS + TCP handshake + TLS handshake, and each close leaves a port in TIME_WAIT; at 80 ms RTT with TLS 1.3 that is roughly 240 ms of overhead (DNS + TCP + TLS) paid again on every single request. Pooled: the handshake overhead is paid once, the connection stays ESTABLISHED, and all subsequent requests go straight to the wire, with no TIME_WAIT churn and no port exhaustion risk.]
Pro Tip

You can observe your application's TCP connection state distribution with ss -s or ss -tnap | grep ESTAB on Linux. If you see a large number of sockets in TIME_WAIT relative to ESTABLISHED, your application is churning connections instead of reusing them. A healthy async client using connection pooling will show a stable, small number of ESTABLISHED connections and very few TIME_WAIT entries.

Pool Sizing and the Queuing Problem

The connection pool is not just a bag of open sockets. It is a bounded queue with two distinct populations: active connections (currently handling a request) and idle connections (finished their last request, waiting for the next one). The max_connections setting caps the total of both populations. The max_keepalive_connections setting caps only the idle population.

This distinction matters because the two settings create different failure modes. If your idle limit is too low, the pool will close connections that could have been reused, forcing new handshakes on the next burst of requests. If your total connection limit is too low relative to your concurrency, requests will queue up waiting for a connection and eventually trigger PoolTimeout. The pool timeout (configured via httpx.Timeout(pool=...)) controls how long a request will wait in that queue before giving up. It is the timeout you are least likely to configure explicitly and the one most likely to bite you in production -- because the default pool timeout inherits from the overall timeout, and a 5-second wait for a connection slot is often too short for bursty workloads.

The Minimal Client Class

Before You Write Any Code: Set Up Your Environment

Follow these steps exactly before continuing. Every code block in this tutorial assumes this environment is in place.

Step 1 — Confirm your Python version. This tutorial requires Python 3.10 or later (the X | Y union type syntax used in type hints is not available in earlier versions). Run the following and check the output:

python3 --version

You should see Python 3.10.x or higher. If you see anything lower, install a current version before continuing.

Step 2 — Create a project folder and a virtual environment. Using a virtual environment keeps this project's dependencies isolated from your system Python.

mkdir api_client_tutorial
cd api_client_tutorial
python3 -m venv .venv

Step 3 — Activate the virtual environment. The command differs by OS:

# macOS / Linux
source .venv/bin/activate

# Windows (Command Prompt)
.venv\Scripts\activate.bat

# Windows (PowerShell)
.venv\Scripts\Activate.ps1

After activation, your terminal prompt should show (.venv) at the start. If it does not, stop and resolve the activation before continuing -- all subsequent install and run commands depend on it.

Step 4 — Install httpx.

pip install httpx

Verify the install succeeded:

python3 -c "import httpx; print(httpx.__version__)"

You should see a version string like 0.28.1. Any version 0.23 or later will work with this tutorial.

Step 5 — Create the file you will be working in.

touch api_client.py   # macOS / Linux
# Windows: create a new file named api_client.py in your editor

Every code block in sections 2 through 6 belongs in this file, with each section building on the previous one. Section 7 shows you how to run it.

The simplest useful client wraps AsyncClient in an async context manager so the connection pool is created when you enter the block and closed when you exit.

import asyncio
import httpx

class ApiClient:
    def __init__(self, base_url: str):
        self._base_url = base_url
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(base_url=self._base_url)
        return self

    async def __aexit__(self, *exc):
        await self._client.aclose()
        self._client = None

    async def get(self, path: str, **kwargs) -> dict:
        response = await self._client.get(path, **kwargs)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, **kwargs) -> dict:
        response = await self._client.post(path, **kwargs)
        response.raise_for_status()
        return response.json()

    async def patch(self, path: str, **kwargs) -> dict:
        response = await self._client.patch(path, **kwargs)
        response.raise_for_status()
        return response.json()

# Usage
async def main():
    async with ApiClient("https://jsonplaceholder.typicode.com") as api:
        user = await api.get("/users/1")
        print(user["name"])

asyncio.run(main())

This class gives you three things that raw httpx.get() calls do not: the base URL is set once and shared across all requests, the connection pool is reused, and the client is properly closed when the block exits. Every call to api.get() or api.post() uses a relative path, keeping your business logic clean.

Walk-Through: What Each Part Does

Copy the class above into api_client.py, then read through each piece before running anything.

The constructor (__init__). It stores the base URL and initialises self._client to None. The client does not exist yet -- it is created in __aenter__. This matters because you need the async with block to be the thing that controls when the connection pool opens and closes. If you created the client in __init__, it would open a pool with no guarantee that anyone will close it.

__aenter__. This runs when Python enters the async with block. It creates the httpx.AsyncClient, passing in the base URL so all relative paths will be resolved against it. It then returns self -- meaning the object you get as the as api variable is the ApiClient instance itself, not the inner AsyncClient.

__aexit__. This runs when Python exits the async with block, regardless of whether an exception was raised. It calls await self._client.aclose() -- the async close method -- which drains any in-flight requests, closes all pooled connections, and frees the associated socket file descriptors. Setting self._client = None afterwards prevents any accidental reuse of a closed client.

get and post. These are thin wrappers. They call the equivalent method on the inner client, then call response.raise_for_status(), which raises an httpx.HTTPStatusError for any 4xx or 5xx response. If the response was successful, they return response.json() -- the parsed response body as a Python dictionary.

Try it now. The code block above already includes asyncio.run(main()) at the bottom. Copy the entire class (including the import httpx line and the runner at the end) into api_client.py and run it:

python3 api_client.py

You should see output like:

Leanne Graham

If you see a name, the client is working. The request went out over HTTPS, the response was parsed from JSON, and the connection pool was opened and cleanly closed. If you see an error instead, check that your virtual environment is active and that httpx is installed.

Task Safety

The AsyncClient is designed to be shared across concurrent async tasks within the same event loop. When asked directly about this, httpx maintainer Tom Christie confirmed: "HTTPX is thread and task safe" (source: httpx Discussion #1633). Sharing one client instance is not just safe -- it is the intended usage pattern and the only way to get full benefit from the connection pool. Do not, however, share a single AsyncClient across multiple event loops.

CHECK YOUR UNDERSTANDING
You create an ApiClient instance inside a for loop and call get() 100 times. What is the key problem with this pattern?
SPOT THE BUG

The following ApiClient implementation has a subtle but serious defect. Read it carefully before revealing the answer.

class ApiClient:
    def __init__(self, base_url: str):
        self._base_url = base_url
        self._client = httpx.AsyncClient(base_url=self._base_url)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        await self._client.aclose()
        self._client = None

    async def get(self, path: str) -> dict:
        response = await self._client.get(path)
        response.raise_for_status()
        return response.json()
The bug: The AsyncClient is created in __init__, not in __aenter__. This means the connection pool opens the moment the object is instantiated -- before async with is entered -- with no guarantee it will ever be closed. If the caller forgets async with and just calls ApiClient(url).get(path), the pool leaks. It also means the client is created in whatever context __init__ runs in, which may not be an async context at all. The fix is to move self._client = httpx.AsyncClient(...) into __aenter__, where lifecycle ownership is explicit and guaranteed.

Adding Configuration: Timeouts and Limits

A production client needs explicit control over timeouts and connection pool sizing. Without these, a slow server can hang your application indefinitely, or a burst of requests can exhaust your system's file descriptors.

import httpx

class ApiClient:
    def __init__(
        self,
        base_url: str,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
    ):
        self._base_url = base_url
        self._timeout = httpx.Timeout(
            timeout,
            connect=5.0,
        )
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

    async def __aexit__(self, *exc):
        await self._client.aclose()
        self._client = None

The httpx.Timeout object sets a 10-second overall timeout with a 5-second connection timeout. Without an explicit timeout, httpx defaults to 5 seconds for all operations -- connect, read, write, and pool acquisition (source: httpx Timeouts documentation). The httpx.Limits object caps the pool at 50 total connections and 20 idle keep-alive connections, with idle connections closed after 30 seconds (the keepalive_expiry=30.0 value set here is a custom choice; httpx's own default is 5 seconds, which is short enough to cause unnecessary handshake overhead on bursty workloads). Note that the pool timeout (how long to wait for an available connection from the pool) is separate from the connect timeout -- if all connections are in use and the pool is full, httpx.PoolTimeout is raised, not ConnectTimeout.

Pro Tip

The keepalive_expiry controls how long an idle connection is kept open. Set it higher (60-120 seconds) for APIs you call frequently, and lower (5-10 seconds) for APIs you call sporadically. Keeping connections open costs a small amount of memory but saves significant latency on the next request.

Adding Authentication

Authentication headers should be set once at the client level, not repeated in every request. The AsyncClient accepts a headers parameter that applies to all outgoing requests. Per-request headers can still be passed to individual calls and will be merged with the client-level headers (source: httpx Client documentation).

class ApiClient:
    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        bearer_token: str | None = None,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
    ):
        self._base_url = base_url
        self._headers = {"Accept": "application/json"}

        if api_key:
            self._headers["X-API-Key"] = api_key
        if bearer_token:
            self._headers["Authorization"] = f"Bearer {bearer_token}"

        self._timeout = httpx.Timeout(timeout, connect=5.0)
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

Now you can instantiate the client with ApiClient("https://api.example.com", bearer_token="abc123") and every request includes the Authorization header automatically. Individual requests can still override or add headers by passing a headers parameter to get() or post().

Adding Retry Logic

A robust client handles transient failures transparently. Adding retry logic with exponential backoff inside the client class means the rest of your code does not need to worry about network hiccups or temporary server errors.

import asyncio
import random
import logging
import httpx

logger = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

class ApiClient:
    # ... __init__ and __aenter__/__aexit__ from above ...

    async def _request(self, method: str, path: str, max_retries: int = 3, **kwargs):
        last_exception = None

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)

                if response.status_code == 429:
                    retry_after = float(response.headers.get("Retry-After", 1))
                    logger.warning(f"Rate limited on {path}, waiting {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response.json()

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                logger.info(f"Retry {attempt + 1}/{max_retries} for {path} in {delay:.1f}s")
                await asyncio.sleep(delay)

        if last_exception is not None:
            raise last_exception
        # Every attempt was consumed by a 429 response: surface that
        # explicitly rather than `raise None`, which would be a TypeError.
        raise RuntimeError(f"Exceeded {max_retries} retries for {path}")

    async def get(self, path: str, **kwargs) -> dict:
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> dict:
        return await self._request("POST", path, **kwargs)

    async def put(self, path: str, **kwargs) -> dict:
        return await self._request("PUT", path, **kwargs)

    async def patch(self, path: str, **kwargs) -> dict:
        return await self._request("PATCH", path, **kwargs)

    async def delete(self, path: str, **kwargs) -> dict:
        return await self._request("DELETE", path, **kwargs)

The _request method is the internal engine. It retries on transient HTTP errors and network failures, respects Retry-After headers for rate-limited responses, applies exponential backoff with jitter, and raises immediately on non-retryable client errors (4xx except 429). The public methods (get, post, put, patch, delete) are thin wrappers that delegate to _request.
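The backoff formula on its own, pulled out for inspection -- 2^attempt seconds plus up to one second of jitter, capped at 30 seconds:

```python
import random

def backoff_delay(attempt: int, cap: float = 30.0) -> float:
    # Exponential base (1s, 2s, 4s, 8s, ...) plus random jitter so that
    # many clients retrying at once do not hit the server in lockstep.
    return min(2 ** attempt + random.uniform(0, 1), cap)

for attempt in range(5):
    print(f"attempt {attempt}: up to {backoff_delay(attempt):.1f}s")
```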

Warning

Be careful retrying POST requests. Unlike GET requests, POST requests may not be idempotent -- retrying a payment submission could charge the user twice. Only retry POST requests if the API supports idempotency keys or if you know the operation is safe to repeat.
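If the API does support idempotency keys, generate one per logical operation and reuse it across every retry of that operation, so the server can deduplicate. A sketch -- the "Idempotency-Key" header name follows a common convention (Stripe uses it, for example), but check your provider's documentation:

```python
import uuid

def idempotent_post_headers() -> dict[str, str]:
    # One key per logical operation. Reuse the SAME key on every retry
    # of that operation so the server can recognise duplicates.
    return {"Idempotency-Key": str(uuid.uuid4())}

headers = idempotent_post_headers()
# Hypothetical call, assuming the ApiClient class above is in scope:
# await api.post("/payments", json=payload, headers=headers)
```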

CHECK YOUR UNDERSTANDING
Your _request method receives an httpx.HTTPStatusError with status code 422 Unprocessable Entity. What should happen?

The Full Production Client

Here is the complete class with all the pieces assembled: connection pooling, timeouts, authentication, retry logic, and concurrent batch fetching.

import asyncio
import random
import logging
import httpx

logger = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

class ApiClient:
    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        bearer_token: str | None = None,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
        max_retries: int = 3,
    ):
        self._base_url = base_url
        self._max_retries = max_retries
        self._headers = {"Accept": "application/json"}

        if api_key:
            self._headers["X-API-Key"] = api_key
        if bearer_token:
            self._headers["Authorization"] = f"Bearer {bearer_token}"

        self._timeout = httpx.Timeout(timeout, connect=5.0)
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

    async def __aexit__(self, *exc):
        if self._client:
            await self._client.aclose()
            self._client = None

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)

                if response.status_code == 429:
                    retry_after = float(response.headers.get("Retry-After", 1))
                    logger.warning(f"Rate limited on {path}, retrying in {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()

                content_type = response.headers.get("content-type", "")
                if "application/json" in content_type:
                    return response.json()
                return response.text

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                logger.info(f"Retry {attempt + 1}/{max_retries} for {method} {path} in {delay:.1f}s")
                await asyncio.sleep(delay)

        if last_exception is not None:
            raise last_exception
        # Every attempt was consumed by a 429 response: surface that
        # explicitly rather than `raise None`, which would be a TypeError.
        raise RuntimeError(f"Exceeded {max_retries} retries for {method} {path}")

    async def get(self, path: str, **kwargs):
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs):
        return await self._request("POST", path, **kwargs)

    async def put(self, path: str, **kwargs):
        return await self._request("PUT", path, **kwargs)

    async def patch(self, path: str, **kwargs):
        return await self._request("PATCH", path, **kwargs)

    async def delete(self, path: str, **kwargs):
        return await self._request("DELETE", path, **kwargs)

    async def get_many(self, paths: list[str], **kwargs) -> list:
        tasks = [self.get(path, **kwargs) for path in paths]
        return await asyncio.gather(*tasks, return_exceptions=True)

The get_many method is a convenience for batch fetching. It takes a list of paths, creates a coroutine for each, and runs them concurrently with asyncio.gather. The return_exceptions=True flag ensures that one failed request does not cancel the others -- you get a mix of results and exceptions in the returned list.
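The success/failure split that get_many enables can be seen in isolation with plain coroutines, no network needed:

```python
import asyncio

async def ok(i):
    return {"id": i}

async def boom(i):
    raise ValueError(f"request {i} failed")

async def main():
    # return_exceptions=True: failures come back as values in the result
    # list instead of cancelling the sibling coroutines.
    results = await asyncio.gather(ok(1), boom(2), ok(3),
                                   return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return len(successes), len(failures)

print(asyncio.run(main()))  # (2, 1)
```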

Using the Client in Your Application

This section walks you through running the full production client from section 6 against a real public API. By the end you will have seen the client make a single request, then a batch of 20 concurrent requests, and you will be able to read the output to confirm both work correctly.

Step 1 — Make sure your file contains the full production class from section 6. Replace the contents of api_client.py with the complete class shown in that section (including the import statements at the top). Do not add the minimal class from section 2 as well -- they define the same class name and will conflict.

Step 2 — Add the runner at the bottom of the file. Paste the following after the class definition:

async def main():
    async with ApiClient(
        base_url="https://jsonplaceholder.typicode.com",
        timeout=15.0,
        max_connections=30,
    ) as api:

        # Single request
        user = await api.get("/users/1")
        print(f"User: {user['name']}")

        # Concurrent batch
        paths = [f"/posts/{i}" for i in range(1, 21)]
        results = await api.get_many(paths)

        successes = [r for r in results if not isinstance(r, Exception)]
        failures = [r for r in results if isinstance(r, Exception)]
        print(f"Fetched {len(successes)} posts, {len(failures)} failures")

asyncio.run(main())

Step 3 — Run it.

python3 api_client.py

Expected output:

User: Leanne Graham
Fetched 20 posts, 0 failures

The first line confirms the single get() request succeeded and the JSON was parsed correctly. The second line confirms that all 20 concurrent requests in get_many() returned successfully -- zero failures. The 20 requests ran concurrently over the same connection pool, not one after another.

If you see any failures in the count, it is likely a transient network issue with the public test API. Run the script again -- the retry logic will handle it. If the failure count is consistently non-zero, verify your internet connection and check that jsonplaceholder.typicode.com is reachable.

What you just proved: the client opened one connection pool, made 21 total requests (1 + 20) through it, and closed the pool cleanly when the async with block exited. Without pooling, those 21 requests would have opened 21 separate TCP connections. With pooling, they share a small set of reused connections.

The calling code is clean. It does not know or care about connection pooling, retry logic, or timeout configuration. It creates a client, calls get() or get_many(), and receives data. All the infrastructure concerns are hidden inside the class.

Pro Tip

In a FastAPI application, create the client in your lifespan handler and store it on app.state. This gives every endpoint access to the same pooled client without creating it per request. See the FastAPI article in this series for the full pattern.

Known Production Pitfall: PoolTimeout Exhaustion

There is a well-documented production issue where a long-lived AsyncClient can enter a state where every request raises httpx.PoolTimeout, even at low request rates. This has been reported by teams running httpx at scale (source: httpx Discussion #2556). The root cause appears related to connection pool state corruption after extended periods of transient errors -- particularly ReadTimeout exceptions that leave connections in an inconsistent state.

The practical mitigation is to add a health-check mechanism to your client class. If multiple consecutive PoolTimeout exceptions occur, destroy the current client and create a fresh one. Here is a minimal implementation that extends the production client from the previous section:

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None
        pool_timeouts = 0

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)
                # ... normal retry logic from above ...

            except httpx.PoolTimeout as e:
                pool_timeouts += 1
                last_exception = e
                if pool_timeouts >= 2:
                    logger.error("Pool exhausted, recycling client")
                    old_client = self._client
                    self._client = httpx.AsyncClient(
                        base_url=self._base_url,
                        headers=self._headers,
                        timeout=self._timeout,
                        limits=self._limits,
                    )
                    await old_client.aclose()
                    pool_timeouts = 0

            # ... rest of retry logic ...

        raise last_exception

This pattern -- recycle the client when the pool becomes unresponsive -- has been validated by production teams handling sustained traffic. The key is to close the old client asynchronously so in-flight requests on its connections are not abruptly terminated.

HTTP/2 Note

If the API you are calling supports HTTP/2, you can enable it by passing http2=True to the AsyncClient (requires the h2 package: pip install httpx[http2]). HTTP/2 multiplexes many requests over a single TCP connection, so connection pool exhaustion is far less likely. However, HTTP/2 support in httpx is disabled by default because not all servers handle it correctly. The httpx HTTP/2 documentation covers the tradeoffs.

Controlling Concurrency with Semaphores

The get_many method fires every request simultaneously. If you pass it 500 paths, 500 coroutines compete for the connection pool at once. Even with a pool limit of 50, this creates a pile-up of coroutines that block until PoolTimeout fires, and it can overwhelm an API that enforces per-second rate limits. The connection pool controls how many TCP connections exist, but it does not control how many requests queue up waiting for those connections.

An asyncio.Semaphore solves this by capping how many requests are in flight at any given moment. Instead of launching all coroutines and letting the pool sort it out, a semaphore holds excess coroutines at the gate until a slot opens.

class ApiClient:
    def __init__(
        self,
        base_url: str,
        # ... other params from production client ...
        max_concurrency: int = 20,
    ):
        # ... existing init code ...
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def _request(self, method: str, path: str, **kwargs):
        async with self._semaphore:
            max_retries = kwargs.pop("max_retries", self._max_retries)
            last_exception = None

            for attempt in range(max_retries):
                # ... existing retry logic unchanged ...
                pass

            raise last_exception

    async def get_many(self, paths: list[str], **kwargs) -> list:
        tasks = [self.get(path, **kwargs) for path in paths]
        return await asyncio.gather(*tasks, return_exceptions=True)

Wrapping the semaphore around the entire _request method (including retries) means a single request holds its slot for the full duration of its retry cycle. This prevents a scenario where retrying requests flood back into the queue and starve new requests. The get_many method itself does not change at all -- the concurrency control is invisible to callers because it lives inside _request.
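The gating behaviour is easy to observe in isolation, without any HTTP at all. The sketch below (hypothetical names, pure asyncio) launches ten fake requests through a semaphore of three and records the peak number in flight:

```python
import asyncio

async def main():
    semaphore = asyncio.Semaphore(3)   # at most 3 simulated requests in flight
    in_flight = 0
    peak = 0

    async def fake_request(i: int):
        nonlocal in_flight, peak
        async with semaphore:          # excess coroutines wait here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            in_flight -= 1

    await asyncio.gather(*(fake_request(i) for i in range(10)))
    return peak

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")
```

All ten coroutines start at once, but the peak never exceeds the semaphore limit -- exactly the behaviour the wrapped _request method gives you for real requests.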

Pro Tip

Set max_concurrency lower than max_connections. If your pool allows 50 connections, a semaphore of 20-30 leaves headroom for connection churn without triggering PoolTimeout. The httpx maintainer Tom Christie has recommended using Python 3.11's asyncio.TaskGroup as a more structured alternative to bare asyncio.gather for managing concurrent work (source: httpx Discussion #2662).

Structured Concurrency with TaskGroup

The get_many method uses asyncio.gather because it was the only practical option before Python 3.11. But gather has a fundamental design problem: when one task fails, the other tasks keep running unless you explicitly handle cancellation. With return_exceptions=True, failures are silently mixed into the results list, and the caller has to filter them out manually. With return_exceptions=False (the default), the first exception propagates, but the remaining tasks become orphans -- still running in the background with no parent waiting for them.

Python 3.11 introduced asyncio.TaskGroup to solve this. A TaskGroup is an async context manager that owns every task created inside it. When the context manager exits, it waits for all tasks to finish. If any task raises an exception, the TaskGroup cancels every sibling task and raises an ExceptionGroup containing all the errors. This is structured concurrency: the lifetime of every concurrent operation is scoped to a visible block, and no task can outlive its parent.

Here is a get_many alternative using TaskGroup:

    async def get_many(self, paths: list[str], **kwargs) -> list:
        results: dict[int, object] = {}

        async def _fetch(index: int, path: str):
            results[index] = await self.get(path, **kwargs)

        async with asyncio.TaskGroup() as tg:
            for i, path in enumerate(paths):
                tg.create_task(_fetch(i, path))

        return [results[i] for i in sorted(results)]

The behavior difference is significant. If one request fails with a non-retryable error (say, a 404), the TaskGroup cancels every other in-flight request immediately. No wasted work, no orphaned coroutines consuming connection pool slots. The caller receives an ExceptionGroup, which is handled with Python 3.11's except* syntax.

async def main():
    async with ApiClient("https://api.example.com") as api:
        try:
            results = await api.get_many(["/users/1", "/users/999", "/users/2"])
        except* httpx.HTTPStatusError as eg:
            for exc in eg.exceptions:
                print(f"HTTP error on {exc.request.url}: {exc.response.status_code}")
        except* httpx.TimeoutException as eg:
            print(f"{len(eg.exceptions)} requests timed out")

The tradeoff is clear: gather with return_exceptions=True gives you partial results -- you get data from the requests that succeeded even if others failed. TaskGroup gives you all-or-nothing semantics. Which approach is correct depends on your use case. Fetching user profiles for a dashboard page? Partial results are better -- show what you have. Fetching all parts of an order before processing a payment? All-or-nothing is safer.

Note

The Python standard library documentation states that TaskGroup provides stronger safety guarantees than gather because it cancels remaining tasks when a subtask fails (source: Python asyncio Task documentation). If you need the partial-results behavior of gather but the structured lifetime guarantees of TaskGroup, wrap each inner task in a try/except that catches and stores exceptions instead of letting them propagate.

INTERACTIVE — GATHER vs. TASKGROUP: WHICH SHOULD YOU USE?
Are you running Python 3.11 or later?
If one of your concurrent requests fails, what should happen to the others?
Will you handle errors with except* syntax?
USE asyncio.gather(return_exceptions=True)

TaskGroup requires Python 3.11. On older versions, asyncio.gather with return_exceptions=True is the standard approach. Filter the returned list for Exception instances to identify failures.

USE asyncio.gather(return_exceptions=True)

You want partial results — data from successful requests even when some fail. gather with return_exceptions=True returns a mixed list of results and exceptions. Ideal for dashboard pages, previews, or any case where some data is better than none.

USE asyncio.TaskGroup

You want structured concurrency with automatic cancellation on failure and ExceptionGroup error handling. TaskGroup is the right tool — it prevents orphaned coroutines and gives strong lifetime guarantees. Best for pipelines, payment flows, or any case where partial execution is worse than no execution.

USE asyncio.TaskGroup + inner try/except

Use TaskGroup for structured lifetimes, but wrap each task's body in a try/except that stores exceptions to a shared list rather than letting them propagate. This gives you TaskGroup's cancellation guarantees while surfacing a single exception to the caller instead of an ExceptionGroup.

Adding Observability with Event Hooks

When something goes wrong in production -- slow responses, unexpected status codes, auth failures -- you need visibility into what the client is sending and receiving. Sprinkling logger.info calls inside _request is one approach, but httpx provides a cleaner mechanism: event hooks. These are callbacks that fire before every request and after every response, and they are configured at the client level so they apply globally without modifying your request logic (source: httpx Event Hooks documentation).

import time
import logging
import httpx

logger = logging.getLogger(__name__)

async def log_request(request: httpx.Request):
    request.extensions["request_start"] = time.monotonic()
    logger.info(f"--> {request.method} {request.url}")

async def log_response(response: httpx.Response):
    request = response.request
    elapsed = time.monotonic() - request.extensions.get("request_start", time.monotonic())
    logger.info(
        f"<-- {response.status_code} {request.method} {request.url} "
        f"({elapsed:.3f}s)"
    )

class ApiClient:
    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
            event_hooks={
                "request": [log_request],
                "response": [log_response],
            },
        )
        return self

The request.extensions dictionary is a convenient place to stash per-request metadata like timestamps. The request hook writes the start time, and the response hook reads it back to calculate elapsed duration. Because hooks registered on an AsyncClient must be async functions (source: httpx Event Hooks documentation), make sure you define them with async def even if they do not perform any I/O.

Warning

Inside a response hook the body has not been read yet, so calling response.json() or accessing response.text directly raises httpx.ResponseNotRead. If you need to inspect the body for logging, call await response.aread() inside the hook first -- this reads and buffers the body, so the caller's subsequent response.json() still works. Be careful with responses the caller intends to stream: reading them inside a hook pulls the entire body into memory before the caller ever sees a chunk.

Handling Token Refresh and Expiring Credentials

The authentication section earlier covers static tokens -- API keys and bearer tokens that are set once and never change. In practice, many APIs issue short-lived tokens (OAuth2 access tokens, JWTs) that expire after minutes or hours. If your client class does not handle renewal, every request after expiration fails with a 401 until someone manually passes in a new token.

The httpx library supports custom authentication flows through its Auth class, which can intercept requests and modify headers dynamically. But for a straightforward token-refresh pattern, a simpler approach works: check the token's expiry before each request and refresh it when needed.

import time
import asyncio
import httpx

class ApiClient:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str,
        # ... other params ...
    ):
        self._base_url = base_url
        self._client_id = client_id
        self._client_secret = client_secret
        self._token_url = token_url
        self._access_token: str | None = None
        self._token_expiry: float = 0.0
        self._token_lock = asyncio.Lock()
        # ... rest of init ...

    async def _ensure_token(self):
        if self._access_token and time.monotonic() < self._token_expiry:
            return

        async with self._token_lock:
            # Double-check after acquiring lock
            if self._access_token and time.monotonic() < self._token_expiry:
                return

            response = await self._client.post(
                self._token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self._client_id,
                    "client_secret": self._client_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._access_token = data["access_token"]
            # Refresh 60 seconds early to avoid edge-case expiry
            self._token_expiry = time.monotonic() + data["expires_in"] - 60

    async def _request(self, method: str, path: str, **kwargs):
        await self._ensure_token()
        headers = kwargs.pop("headers", {})
        headers["Authorization"] = f"Bearer {self._access_token}"

        response = await self._client.request(
            method, path, headers=headers, **kwargs
        )

        if response.status_code == 401:
            self._access_token = None  # Force refresh
            await self._ensure_token()
            headers["Authorization"] = f"Bearer {self._access_token}"
            response = await self._client.request(
                method, path, headers=headers, **kwargs
            )

        response.raise_for_status()
        return response.json()

The asyncio.Lock in _ensure_token is critical. Without it, 50 concurrent requests that all detect an expired token will each fire off a separate token refresh request. The lock ensures only one coroutine performs the refresh while the others wait, and the double-check pattern after acquiring the lock prevents redundant refreshes. The 60-second early renewal buffer avoids a race condition where a token expires between the check and the moment the request reaches the server.
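You can verify the single-refresh guarantee with pure asyncio, no HTTP required. The sketch below (TokenHolder is a hypothetical stand-in for the refresh logic above) races fifty coroutines against the lock and counts refreshes:

```python
import asyncio

class TokenHolder:
    """Minimal stand-in for the token-refresh logic (hypothetical names)."""

    def __init__(self):
        self._token: str | None = None
        self._lock = asyncio.Lock()
        self.refresh_calls = 0

    async def _fetch_token(self) -> str:
        await asyncio.sleep(0.01)  # simulated token-endpoint round trip
        self.refresh_calls += 1
        return "token-abc"

    async def ensure_token(self):
        if self._token is not None:
            return
        async with self._lock:
            if self._token is not None:  # double-check after acquiring
                return
            self._token = await self._fetch_token()

async def main():
    holder = TokenHolder()
    # 50 coroutines race for the token; only one refresh should happen
    await asyncio.gather(*(holder.ensure_token() for _ in range(50)))
    return holder.refresh_calls

calls = asyncio.run(main())
```

All fifty coroutines see an empty token and pile onto the lock, but the double-check means only the first one actually performs the refresh.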

Adding a Circuit Breaker

Retry logic handles transient failures -- a single 503, a momentary network hiccup, a brief rate-limit window. But what happens when the API you are calling is not just momentarily slow but genuinely down? Your retry logic keeps trying, each attempt burning a connection pool slot and a semaphore slot for the full backoff duration. Multiply that by every coroutine in your application, and you have a system that is spending all of its resources patiently waiting for a service that is not coming back anytime soon. Retries handle the symptom. A circuit breaker addresses the underlying problem: stop calling a service that is known to be unhealthy.

The circuit breaker pattern, described in Michael Nygard's Release It!, works by tracking failure rates and short-circuiting requests when a threshold is exceeded. It has three states: closed (normal operation, requests flow through), open (failures exceeded the threshold, requests fail immediately without hitting the network), and half-open (after a recovery timeout, one test request is allowed through to check if the service has recovered).

Here is a minimal circuit breaker that integrates directly into the client class:

import time

class CircuitOpen(Exception):
    """Raised when the circuit breaker is open."""
    pass

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._state = "closed"  # closed | open | half-open
        self._opened_at: float = 0.0

    def record_success(self):
        self._failure_count = 0
        self._state = "closed"

    def record_failure(self):
        self._failure_count += 1
        if self._failure_count >= self._failure_threshold:
            self._state = "open"
            self._opened_at = time.monotonic()

    def allow_request(self) -> bool:
        if self._state == "closed":
            return True
        if self._state == "open":
            if time.monotonic() - self._opened_at >= self._recovery_timeout:
                self._state = "half-open"
                return True
            return False
        # half-open: allow one request to test recovery
        return True

Integrating this into the client's _request method requires only a few lines. Check the breaker before sending a request, record the outcome after:

class ApiClient:
    def __init__(self, base_url: str, **kwargs):
        # ... existing init ...
        self._breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=30.0,
        )

    async def _request(self, method: str, path: str, **kwargs):
        if not self._breaker.allow_request():
            raise CircuitOpen(
                f"Circuit breaker open for {self._base_url}, "
                f"retry after {self._breaker._recovery_timeout}s"
            )

        try:
            # _do_request is assumed to wrap the retry logic from earlier sections
            result = await self._do_request(method, path, **kwargs)
            self._breaker.record_success()
            return result
        except (httpx.TimeoutException, httpx.ConnectError):
            self._breaker.record_failure()
            raise

The critical design decision is what counts as a failure. Not every error should trip the breaker. A 404 means the resource does not exist -- that is a correct response from a healthy server. A 400 means you sent a bad request -- also not a server health issue. The breaker should only track failures that indicate the server itself is struggling: connection errors, timeouts, 502/503/504 responses, and PoolTimeout exceptions. Client-side errors (4xx except 429) should pass through without affecting the breaker state.

Notice also that the circuit breaker and the retry logic serve different layers of the failure spectrum. Retries handle isolated, transient errors within a single request lifecycle. The circuit breaker aggregates failure signals across many requests over time and makes a system-level decision to stop trying. Retries operate within seconds. The circuit breaker operates over minutes. Both belong in the client class, but they solve different problems.
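You can sanity-check the full closed → open → half-open → closed cycle without any network. The walkthrough below repeats the CircuitBreaker class from above in condensed form so it runs standalone, and uses a tiny recovery timeout so the half-open transition happens within the script:

```python
import time

class CircuitBreaker:
    # Condensed copy of the class above, repeated here for a self-contained demo.
    def __init__(self, failure_threshold=5, recovery_timeout=30.0):
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._state = "closed"
        self._opened_at = 0.0

    def record_success(self):
        self._failure_count = 0
        self._state = "closed"

    def record_failure(self):
        self._failure_count += 1
        if self._failure_count >= self._failure_threshold:
            self._state = "open"
            self._opened_at = time.monotonic()

    def allow_request(self):
        if self._state == "closed":
            return True
        if self._state == "open":
            if time.monotonic() - self._opened_at >= self._recovery_timeout:
                self._state = "half-open"
                return True
            return False
        return True  # half-open: one probe allowed

# closed -> open: three failures trip a threshold-3 breaker
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=0.05)
for _ in range(3):
    breaker.record_failure()
blocked = not breaker.allow_request()   # open: request short-circuited

# open -> half-open -> closed: after the timeout, one probe succeeds
time.sleep(0.06)
probe_allowed = breaker.allow_request() # half-open probe
breaker.record_success()
closed_again = breaker.allow_request()  # back to normal operation
```

Three recorded failures trip the breaker and block the next request; once the recovery timeout elapses, a single probe is let through, and a recorded success closes the circuit again.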

Pro Tip

For production systems, consider the aiobreaker library, which is an asyncio-native circuit breaker with configurable failure thresholds, recovery timeouts, excluded exceptions, and listener callbacks for monitoring. It works as a decorator or a context manager and integrates cleanly with httpx clients. If you need shared circuit breaker state across multiple workers or instances, libraries like pybreaker support Redis-backed state storage.

INTERACTIVE — CIRCUIT BREAKER STATE MACHINE
Click the buttons to simulate requests and watch the circuit breaker move through its states. Threshold: 5 failures to open. Recovery timeout: simulated.
CLOSED
Normal operation. Requests flow through.
failures →
OPEN
Service unhealthy. Requests blocked immediately.
timeout →
HALF-OPEN
One probe request allowed to test recovery.

Testing the Client with respx

A client class that wraps all your HTTP logic in one place is also a client class you can test without hitting a real server. The respx library is purpose-built for mocking httpx. It intercepts outgoing requests at the transport layer and returns canned responses, so your retry logic, error handling, and response parsing all execute exactly as they would in production.

Walk-Through: Writing and Running Your First Three Tests

Step 1 — Install the testing dependencies.

pip install pytest pytest-asyncio respx

Verify respx is available:

python3 -c "import respx; print(respx.__version__)"

Step 2 — Configure pytest-asyncio. Create a pytest.ini file in your project root with the following content. This tells pytest-asyncio to run in auto mode, which automatically recognises any async def test function without requiring a @pytest.mark.asyncio decorator on every test. The asyncio_default_fixture_loop_scope line sets the default event loop scope for async fixtures explicitly. In pytest-asyncio 1.0 (released May 2025), this option's default was resolved to function, so leaving it unset no longer emits a deprecation warning in 1.x — but setting it explicitly is still recommended for clarity and to future-proof your configuration. Setting it to function gives each test its own isolated event loop, which is the safest default for most test suites. The current release as of April 2026 is pytest-asyncio 1.3.0.

[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function

Step 3 — Create the test file. Create a new file named test_api_client.py in the same folder as api_client.py:

touch test_api_client.py   # macOS / Linux

Step 4 — Paste the three tests below into test_api_client.py. Each test covers a distinct behaviour: a successful response, retry logic, and a non-retryable error. Read the comment above each one before moving on.

import pytest
import respx
import httpx
from api_client import ApiClient

@respx.mock
async def test_get_returns_json():
    respx.get("https://api.example.com/users/1").mock(
        return_value=httpx.Response(200, json={"id": 1, "name": "Alice"})
    )

    async with ApiClient("https://api.example.com") as api:
        result = await api.get("/users/1")

    assert result == {"id": 1, "name": "Alice"}

@respx.mock
async def test_retry_on_server_error():
    responses = iter([
        httpx.Response(503),
        httpx.Response(503),
        httpx.Response(200, json={"status": "ok"}),
    ])
    route = respx.get("https://api.example.com/data").mock(side_effect=responses)

    async with ApiClient("https://api.example.com", max_retries=3) as api:
        result = await api.get("/data")

    assert result == {"status": "ok"}
    assert route.call_count == 3

@respx.mock
async def test_raises_on_client_error():
    respx.get("https://api.example.com/missing").mock(
        return_value=httpx.Response(404)
    )

    async with ApiClient("https://api.example.com") as api:
        with pytest.raises(httpx.HTTPStatusError) as exc_info:
            await api.get("/missing")

    assert exc_info.value.response.status_code == 404

Step 5 — Run the tests.

pytest test_api_client.py -v

Expected output:

test_api_client.py::test_get_returns_json PASSED
test_api_client.py::test_retry_on_server_error PASSED
test_api_client.py::test_raises_on_client_error PASSED

3 passed in 0.XXs

All three should pass immediately, with no network traffic leaving your machine. respx intercepted every request before it could reach the internet.

If any test fails with FAILED, the most common causes are: the import path in from api_client import ApiClient does not match your filename (rename the file or adjust the import), or asyncio_mode = auto is missing from pytest.ini (without it, pytest-asyncio defaults to strict mode and will not automatically pick up async def test functions). Fix whichever applies and re-run.

What each test proves: test_get_returns_json confirms that a 200 response is parsed and returned correctly. test_retry_on_server_error confirms that two consecutive 503 responses trigger retries and the third 200 response is returned -- and that route.call_count == 3 proves the client made exactly three attempts. test_raises_on_client_error confirms that a 404 is not retried and instead raises httpx.HTTPStatusError immediately.

The side_effect=iter([...]) pattern in the retry test is the key technique. Passing an iterator to .mock(side_effect=...) causes each call to the mocked route to consume the next item from the iterator, letting you verify that the client retries on 503 and succeeds on the third attempt without waiting for real backoff delays. For tests where you need to verify request headers, authentication, or query parameters, respx captures every request made to a route via route.calls, so you can assert on exactly what was sent.

Alternative: httpx's Built-in Transport Mocking

If you prefer not to add a dependency, httpx provides httpx.MockTransport which accepts a handler function and can be passed directly to AsyncClient(transport=...) (source: httpx Transports documentation). This gives you full control but requires more boilerplate than respx. For ASGI applications, httpx also provides ASGITransport, which lets you test against a running ASGI app (like FastAPI) without a network layer.

SSL/TLS Configuration

By default, httpx.AsyncClient validates TLS certificates against the system CA bundle, which is the correct setting for production. There are three scenarios where you need to change this: working against a server with a self-signed certificate in development, using a corporate proxy with a private CA, or connecting to an internal service whose certificate chain is not in the system bundle.

httpx delegates TLS configuration to the Python ssl module via the verify parameter on AsyncClient. As of httpx 0.28, the parameter accepts two supported forms: a boolean, or an ssl.SSLContext object for full control. Passing a string path to a CA bundle file was deprecated in 0.28 and will be removed in a future release — use ssl.create_default_context(cafile=...) to build an SSLContext instead.

import ssl
import httpx

# Option 1: Disable verification (DEVELOPMENT ONLY -- never in production)
async with httpx.AsyncClient(verify=False) as client:
    response = await client.get("https://self-signed.example.com/api")

# Option 2: Provide a custom CA bundle using an SSLContext
# (required since httpx 0.28 -- passing a string path to verify= is now deprecated)
ctx = ssl.create_default_context(cafile="/path/to/corporate-ca-bundle.pem")
async with httpx.AsyncClient(verify=ctx) as client:
    response = await client.get("https://internal.corp.example.com/api")

# Option 3: Build a custom SSLContext for full control (e.g. mutual TLS)
ctx = ssl.create_default_context()
ctx.load_verify_locations("/path/to/corporate-ca-bundle.pem")
ctx.load_cert_chain(certfile="/path/to/client-cert.pem", keyfile="/path/to/client-key.pem")

async with httpx.AsyncClient(verify=ctx) as client:
    response = await client.get("https://mtls-required.example.com/api")

To add TLS configuration to your ApiClient class, accept a verify parameter in __init__ and pass it through to the AsyncClient:

class ApiClient:
    def __init__(
        self,
        base_url: str,
        verify: bool | ssl.SSLContext = True,  # True = use certifi's CA bundle (not the OS trust store)
        # ... other params ...
    ):
        self._verify = verify
        # ... rest of __init__ unchanged ...

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
            verify=self._verify,
        )
        return self

Warning

Passing verify=False disables certificate validation entirely. Any server can present any certificate and the client will accept it, making the connection vulnerable to man-in-the-middle attacks. This is acceptable in a controlled development environment on a private network; it is never acceptable in production. If you are dealing with a corporate CA, get the CA bundle from your IT team, build an ssl.SSLContext with ssl.create_default_context(cafile="/path/to/bundle.pem"), and pass that as verify=ctx instead. Passing a string path directly to verify is deprecated since httpx 0.28 and will be removed in a future version. The httpx documentation covers the current SSL API at httpx SSL documentation.

If your environment has the SSL_CERT_FILE or SSL_CERT_DIR environment variables set (common in corporate environments), httpx reads them automatically when trust_env=True, which is the default. No extra code is needed for those. However, httpx does not read REQUESTS_CA_BUNDLE — that is a requests-library-specific variable. If your team uses REQUESTS_CA_BUNDLE as a convention, you need to handle it explicitly in the client constructor:

import os
import ssl

class ApiClient:
    def __init__(self, base_url: str, verify=None, **kwargs):
        if verify is None:
            # httpx picks up SSL_CERT_FILE / SSL_CERT_DIR automatically via trust_env=True.
            # REQUESTS_CA_BUNDLE is requests-only; handle it explicitly if your team uses it.
            ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE")
            if ca_bundle:
                verify = ssl.create_default_context(cafile=ca_bundle)
            else:
                verify = True  # httpx will use SSL_CERT_FILE/SSL_CERT_DIR automatically
        self._verify = verify

Streaming Large Responses

Every code example up to this point calls response.json() or reads response.text after a request completes. Both load the entire response body into memory before returning. For a JSON object of a few kilobytes, this is fine. For a 500 MB file download, a long-running server-sent events stream, or a newline-delimited JSON API that emits one record per line, loading the full response into memory before processing it is either impractical or impossible.

httpx provides streaming responses through async with client.stream(method, url), which returns a response object where the body has not been downloaded yet. You then iterate over it using async generators.

The stream_download method below uses the aiofiles library for non-blocking file writes. Install it first:

pip install aiofiles

import httpx
import aiofiles

class ApiClient:
    # ... existing class ...

    async def stream_download(self, path: str, dest_path: str) -> int:
        """Stream a binary response directly to disk. Returns bytes written."""
        bytes_written = 0
        async with self._client.stream("GET", path) as response:
            response.raise_for_status()
            async with aiofiles.open(dest_path, "wb") as f:
                async for chunk in response.aiter_bytes(chunk_size=65536):
                    await f.write(chunk)
                    bytes_written += len(chunk)
        return bytes_written

    async def stream_ndjson(self, path: str):
        """Yield parsed objects from a newline-delimited JSON stream."""
        import json
        async with self._client.stream("GET", path) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                line = line.strip()
                if line:
                    yield json.loads(line)

The three async iterators httpx exposes on a streaming response are aiter_bytes(chunk_size) for raw binary chunks, aiter_text() for decoded text chunks, and aiter_lines() for full lines (split on \n). The chunk_size parameter on aiter_bytes controls how many bytes are requested from the socket at a time -- 65536 bytes (64 KB) is a reasonable default that matches many OS network buffer sizes. Smaller values increase syscall overhead; larger values increase the latency before your code sees the first chunk (source: httpx Streaming Responses documentation).

Pro Tip

A streaming response holds a connection open for the duration of the iteration. This means a single long-running download occupies one of your pool's connections the entire time. If you are running many concurrent downloads, account for this in your max_connections setting. A pool of 20 connections trying to serve 30 simultaneous streaming downloads will queue requests and may trigger PoolTimeout.

CHECK YOUR UNDERSTANDING
Your ApiClient has max_connections=20. You launch 30 concurrent stream_download() calls. What is the most likely outcome?

The stream_ndjson method above is an async generator -- note the yield inside an async def. Callers consume it with async for:

async with ApiClient("https://api.example.com") as api:
    async for record in api.stream_ndjson("/events/stream"):
        process(record)

Frequently Asked Questions

Can I use one client instance for requests to multiple different APIs?

You can, but it is not recommended. The base_url is fixed at construction time in this client design and is not meant to be swapped per request. If you omit base_url and pass full URLs to get() and post(), one client instance can reach multiple hosts -- but the connection pool is then shared across all of them. This can cause pool contention between unrelated services, and authentication headers set at the client level apply to every request regardless of destination. The cleaner pattern is one ApiClient instance per external API, each with its own base URL, auth headers, and pool configuration.

What happens if I forget to use async with and just call ApiClient(url) directly?

Without async with, __aenter__ is never called, so self._client stays None. Any call to api.get() will immediately raise AttributeError: 'NoneType' object has no attribute 'request'. Even if you manually assign self._client, __aexit__ is never called, so the connection pool is never closed -- the connections remain open until the Python process exits and the OS reclaims the sockets. In a long-running service, this leads to file descriptor leaks. Always use async with.

Can I reuse the same client instance across multiple asyncio.run() calls?

No. Each call to asyncio.run() creates a new event loop and destroys it when the coroutine finishes. An httpx.AsyncClient is bound to the event loop that was running when it was created. If you close the event loop and then try to use the client on a new one, you will get errors about using a closed event loop or a transport attached to a different loop. Create a fresh client inside each asyncio.run() call, or restructure your code to do all async work in a single top-level coroutine.

Should I put the client in a FastAPI lifespan handler or create it per request?

Always use the lifespan handler. Creating a new ApiClient per request defeats connection pooling entirely -- you spin up a new pool, make one request, and throw the pool away. The correct pattern is to create the client once at application startup and store it on app.state, then inject it into endpoints as needed. FastAPI's lifespan context manager (introduced in Starlette 0.20, available in FastAPI since version 0.93) is the standard place to do this, as it guarantees the client is properly closed when the application shuts down (source: FastAPI Events documentation).

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with ApiClient("https://api.example.com", bearer_token="...") as api:
        app.state.api = api
        yield

app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int, request: Request):
    return await request.app.state.api.get(f"/users/{user_id}")

What is the difference between httpx and aiohttp for building this kind of client class?

Both libraries are production-capable. The practical differences are: httpx has a near-identical API to the synchronous requests library, making it easier to adapt existing code; it also supports both sync and async in one package. aiohttp is async-only and uses a slightly different session/connector model. httpx's Limits object maps directly to what is described in this article; aiohttp uses a TCPConnector with a limit parameter that controls per-host concurrency rather than a global pool. For new projects, httpx is generally preferred because of its cleaner API and built-in HTTP/2 support; for projects that already use aiohttp, the architectural patterns described here apply equally -- the async context manager shape and retry logic are library-agnostic.

How do I handle an API that returns something other than JSON, like plain text or a binary file?

The production client in this article returns response.json() when the response content-type is application/json, and response.text otherwise. For binary responses (images, PDFs, archives), neither is appropriate -- you want the raw bytes. Either add a separate get_bytes() method that returns response.content, or use the stream_download method from the streaming section above. Accessing response.content loads the full body into memory as a bytes object; aiter_bytes() streams it in chunks. For files larger than a few hundred megabytes, always stream.

Why does my test run much faster than the real client? Is the backoff being skipped?

Partly. respx intercepts requests at the transport layer, so each mocked request completes with no network round trip -- that alone removes most of the real client's latency. The backoff delays themselves are not skipped, though: asyncio.sleep(delay) in the retry loop still suspends the coroutine for the full interval, mocked transport or not. If you want retry tests to run instantly, or you need to verify that the correct delay was calculated (not just that retries happened), patch asyncio.sleep in your test and assert on the arguments it was called with.
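Patching asyncio.sleep with an AsyncMock makes the retries instantaneous and records every computed delay. A sketch against a toy retry loop (retry_with_backoff is illustrative, not the tutorial's _request; the fixed exponential delays have no jitter so the expected arguments are deterministic):

```python
import asyncio
from unittest.mock import AsyncMock, patch

async def retry_with_backoff(func, max_retries: int = 3):
    # Toy retry loop: plain exponential delays, no jitter.
    for attempt in range(max_retries):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

delays = []

async def main():
    attempts = {"n": 0}

    async def flaky():
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ConnectionError
        return "ok"

    # Replace asyncio.sleep with an AsyncMock: no real waiting happens,
    # and every computed delay is captured for assertions.
    with patch("asyncio.sleep", new=AsyncMock()) as fake_sleep:
        result = await retry_with_backoff(flaky)

    delays.extend(call.args[0] for call in fake_sleep.call_args_list)
    return result

result = asyncio.run(main())
assert result == "ok"
assert delays == [1, 2]  # 2**0 and 2**1: delays were calculated, never slept
```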

Task Cancellation and Connection Leaks

The most insidious source of PoolTimeout in production is not high traffic or misconfigured limits — it is task cancellation. When an asyncio task is cancelled mid-request, Python raises asyncio.CancelledError inside the coroutine at the point where it is currently awaiting. If the cancellation lands while the task is reading the response body, the connection is not cleanly returned to the pool. The pool's accounting records the connection as in use, but it will never be released. Do this enough times and the pool fills entirely with phantom connections, after which every new request raises PoolTimeout immediately.

This matters because cancellation is more common than it looks. asyncio.wait_for() cancels the inner coroutine on timeout. asyncio.wait(return_when=FIRST_COMPLETED) cancels remaining tasks. Manual task.cancel() calls in error handlers. Any of these, applied to an in-flight httpx request, can leave a connection stranded.

The fix requires two pieces working together. First, asyncio.shield() the cleanup in __aexit__ so the pool drains completely even if the outer coroutine is cancelled while cleanup is running. Second, wrap every request attempt in a try/finally-style block that catches CancelledError and ensures the response stream is explicitly closed before re-raising:

import asyncio
import random

import httpx

class ApiClient:
    # ... existing __init__ ...

    async def __aexit__(self, *exc):
        if self._client:
            # Shield the cleanup so it completes even if the parent task
            # is cancelled while we are draining in-flight connections.
            await asyncio.shield(self._client.aclose())
            self._client = None

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None

        for attempt in range(max_retries):
            response = None
            try:
                response = await self._client.request(method, path, **kwargs)

                if response.status_code == 429:
                    retry_after = float(response.headers.get("Retry-After", 1))
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                if "application/json" in content_type:
                    return response.json()
                return response.text

            except asyncio.CancelledError:
                # Cancellation arrived mid-request. Close the response
                # stream so the connection returns to the pool before
                # propagating the cancellation upward.
                if response is not None:
                    await response.aclose()
                raise  # Always re-raise CancelledError

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                await asyncio.sleep(delay)

        raise last_exception

The two key changes are await asyncio.shield(self._client.aclose()) in __aexit__, and the except asyncio.CancelledError block in _request. The shield wraps the cleanup coroutine in a future the event loop runs to completion regardless of whether the enclosing task is cancelled. The CancelledError handler closes the response stream — releasing the connection back to the pool — then re-raises so cancellation still propagates normally up the call stack. Without the re-raise, the task would appear to complete successfully rather than be cancelled, silently breaking any code that depends on cancellation semantics.

Warning

asyncio.shield() does not prevent the wrapped coroutine from being cancelled if the event loop itself is shutting down — it only protects against cancellation of the immediate parent task. For __aexit__ cleanup this is the right tool. Do not use shield() around request logic itself in an attempt to "protect" requests from cancellation — that defeats the purpose of any timeout or cancel imposed by the caller and can leave coroutines running silently after the caller has already given up.
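The shield semantics are easy to verify without httpx: cancelling the future that wraps the shield raises CancelledError in the waiter, while the shielded coroutine runs to completion. A minimal demonstration (drain_pool stands in for self._client.aclose()):

```python
import asyncio

done = []

async def drain_pool():
    # Stand-in for self._client.aclose(): slow cleanup that must finish.
    await asyncio.sleep(0.05)
    done.append("pool closed")

async def main():
    inner = asyncio.ensure_future(drain_pool())
    waiter = asyncio.ensure_future(asyncio.shield(inner))
    await asyncio.sleep(0.01)

    waiter.cancel()  # cancel the outer, shielding future...
    try:
        await waiter
    except asyncio.CancelledError:
        pass

    # ...but the shielded cleanup itself was not cancelled.
    assert not inner.cancelled()
    await inner  # runs to completion

asyncio.run(main())
assert done == ["pool closed"]
```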

Per-Host Transport Sharding

A single AsyncClient with a shared connection pool works well when you are calling one API. Real applications call several: an internal user service, a third-party payment provider, a slow analytics platform. When all of these share one pool, a badly-behaved upstream crowds out the others. A payment provider that takes 8 seconds to respond on 20% of requests will gradually fill 20% of your pool's connections with slow in-flight requests, leaving fewer connections for the fast internal calls that should complete in 20 ms.

httpx's mounts parameter solves this by routing requests to different URL prefixes through separate AsyncHTTPTransport instances, each with its own completely isolated connection pool. Note that timeouts remain a client-level or per-request setting; AsyncHTTPTransport accepts pool limits (plus options such as connect retries and HTTP/2), but not a timeout argument:

import httpx

# Fast internal service: small dedicated pool, long keepalive
internal_transport = httpx.AsyncHTTPTransport(
    limits=httpx.Limits(
        max_connections=20,
        max_keepalive_connections=15,
        keepalive_expiry=60.0,
    ),
)

# Slow third-party partner API: isolated pool, short keepalive
partner_transport = httpx.AsyncHTTPTransport(
    limits=httpx.Limits(
        max_connections=10,
        max_keepalive_connections=5,
        keepalive_expiry=10.0,
    ),
)

# Default transport for everything else
default_transport = httpx.AsyncHTTPTransport(
    limits=httpx.Limits(
        max_connections=30,
        max_keepalive_connections=20,
        keepalive_expiry=30.0,
    ),
)

client = httpx.AsyncClient(
    timeout=httpx.Timeout(10.0, connect=3.0),
    mounts={
        "https://internal.corp.example.com": internal_transport,
        "https://api.slow-partner.com":       partner_transport,
    },
    transport=default_transport,
)

The mounts dictionary maps URL patterns to transport instances. Patterns match on scheme and host (the path is not part of the pattern), with more specific patterns taking priority, so https://api.slow-partner.com/v2/orders routes through partner_transport and https://internal.corp.example.com/users routes through internal_transport. Anything else uses default_transport. A slow response on one transport does not touch the pool of any other.

To integrate this into the ApiClient class, accept an optional mounts parameter:

class ApiClient:
    def __init__(
        self,
        base_url: str,
        mounts: dict[str, httpx.AsyncHTTPTransport] | None = None,
        # ... other params ...
    ):
        self._mounts = mounts

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
            mounts=self._mounts,  # None is valid — httpx uses defaults
        )
        return self
Pro Tip

Mapping a pattern to None in mounts does not block matching requests; httpx treats None as "fall back to the default transport", which is historically how you exclude a URL pattern from a mounted proxy. To enforce HTTPS-only at the transport layer, mount a transport whose request handler raises for any plain-HTTP URL instead of relying on application-level checks. Transport sharding also lets you assign different circuit breaker instances per upstream — a slow partner API going unhealthy should not trip the breaker protecting your internal service, and with separate transports the failure accounting is naturally isolated.

Idempotency Keys for Safe POST and PATCH Retries

The retry logic built earlier correctly refuses to retry POST requests by default, because POST is not idempotent — retrying a payment submission could charge the user twice. But the warning stops there. The actual solution is idempotency keys: a UUID generated once per logical operation, sent as a request header, and reused on every retry attempt for that same operation. When the server receives a request carrying an idempotency key it has already processed, it returns the cached result instead of executing the operation again.

The implementation belongs inside _request. Generate the key once before the retry loop begins, attach it to every attempt, and let the server handle deduplication:

import asyncio
import random
import uuid

import httpx

NON_IDEMPOTENT_METHODS = {"POST", "PATCH"}

class ApiClient:
    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        idempotency_key = kwargs.pop("idempotency_key", None)
        last_exception = None

        # Generate once before the loop — the SAME key on every retry
        # is what lets the server recognise duplicates.
        if method.upper() in NON_IDEMPOTENT_METHODS and idempotency_key is None:
            idempotency_key = str(uuid.uuid4())

        # Extract caller-supplied headers before the loop so repeated
        # iterations do not see an empty dict after the first pop.
        req_headers = dict(kwargs.pop("headers", {}))
        if idempotency_key:
            req_headers["Idempotency-Key"] = idempotency_key

        for attempt in range(max_retries):
            try:
                response = await self._client.request(
                    method, path, headers=req_headers, **kwargs
                )
                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                return response.json() if "application/json" in content_type else response.text

            except asyncio.CancelledError:
                raise

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                await asyncio.sleep(delay)

        raise last_exception

The critical design detail: the UUID is generated once before the loop, not once per attempt. Every retry sends the same key. If you generated a new UUID on each attempt, the server would treat each attempt as a distinct operation and the deduplication would not work.
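This property is worth pinning down with a quick simulation: a fake sender that fails twice records the headers of every attempt, and all three attempts carry the same key (send_with_retries and fake_send are illustrative stand-ins, not the tutorial's _request):

```python
import uuid

def send_with_retries(send, method: str, max_retries: int = 3):
    # Key generated ONCE, before the loop -- every attempt reuses it.
    headers = {"Idempotency-Key": str(uuid.uuid4())}
    for attempt in range(max_retries):
        try:
            return send(method, headers)
        except ConnectionError:
            if attempt == max_retries - 1:
                raise

seen_keys = []

def fake_send(method, headers):
    seen_keys.append(headers["Idempotency-Key"])
    if len(seen_keys) < 3:
        raise ConnectionError  # force two retries
    return "created"

assert send_with_retries(fake_send, "POST") == "created"
assert len(seen_keys) == 3           # three attempts reached the server
assert len(set(seen_keys)) == 1      # all carried the identical key
```

Move the uuid.uuid4() call inside the loop and the last assertion fails: three distinct keys, three distinct operations from the server's point of view.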

CHECK YOUR UNDERSTANDING
A developer modifies _request so that a new uuid.uuid4() is generated at the top of each loop iteration instead of before the loop. What is the consequence?

Callers can also pass their own key for operations where a natural identifier exists in the business domain:

async with ApiClient("https://payments.example.com") as api:
    # Using the order_id means the same charge is always idempotent
    # even across process restarts, not just within a single request's
    # retry loop.
    result = await api.post(
        "/charges",
        json={"amount": 4999, "currency": "usd"},
        idempotency_key=f"charge-{order_id}",
    )
Note

The Idempotency-Key header name is widely used but not formally standardised — it originated in the Stripe API. Some APIs use different names: PayPal uses PayPal-Request-Id, others use X-Idempotency-Key. Server-side support is required for this to work. If the API does not implement idempotency keys, sending the header does nothing. The two most reliable indicators that an API supports them: the documentation explicitly says so, or the API returns 409 Conflict on a detected duplicate rather than executing the operation twice.

Adaptive Backoff Based on Live Response Times

The exponential backoff logic uses static delay floors: 1 second after the first failure, 2 after the second, 4 after the third. These defaults are reasonable but are chosen without any knowledge of how the server is actually performing at the moment of failure. A server that is responding in 2 seconds under normal load does not need a 4-second backoff after a single timeout. A server whose p95 response time has climbed from 200 ms to 4 seconds over the last 30 requests is signalling active degradation, and a 1-second backoff floor will pile more requests onto an already strained system.

Adaptive backoff uses the event hooks already wired into the client to maintain a rolling window of recent response times, then scales the backoff floor dynamically based on the observed p95 latency:

import asyncio
import collections
import logging
import random
import time

import httpx

logger = logging.getLogger(__name__)

class ApiClient:
    def __init__(self, base_url: str, adaptive_backoff: bool = True, **kwargs):
        # ... existing init ...
        self._response_times: collections.deque[float] = collections.deque(maxlen=50)
        self._adaptive_backoff = adaptive_backoff

    def _p95_response_time(self) -> float | None:
        if len(self._response_times) < 10:
            return None
        sorted_times = sorted(self._response_times)
        idx = int(len(sorted_times) * 0.95)
        return sorted_times[min(idx, len(sorted_times) - 1)]

    def _backoff_floor(self) -> float:
        """
        Adaptive backoff floor based on observed p95 latency.
        p95 < 0.5s  ->  floor = 0.5s (minimum)
        p95 > 5.0s  ->  floor = 5.0s (cap)
        Scales linearly between those extremes.
        Falls back to 1.0s when adaptive backoff is disabled or fewer than 10 samples exist.
        """
        p95 = self._p95_response_time()
        if p95 is None or not self._adaptive_backoff:
            return 1.0
        return max(0.5, min(p95, 5.0))

    async def __aenter__(self):
        async def _stamp_request(request: httpx.Request):
            request.extensions["request_start"] = time.monotonic()

        async def _record_response(response: httpx.Response):
            start = response.request.extensions.get("request_start")
            if start is not None:
                self._response_times.append(time.monotonic() - start)

        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
            event_hooks={
                "request":  [_stamp_request],
                "response": [_record_response],
            },
        )
        return self

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)
                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                return response.json() if "application/json" in content_type else response.text

            except asyncio.CancelledError:
                raise

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                floor = self._backoff_floor()
                delay = min(floor * (2 ** attempt) + random.uniform(0, floor), 30)
                logger.info(
                    f"Retry {attempt + 1}/{max_retries} for {method} {path} "
                    f"in {delay:.2f}s (p95={self._p95_response_time()})"
                )
                await asyncio.sleep(delay)

        raise last_exception

The mechanics: the request hook stamps request.extensions["request_start"]. The response hook reads it back and appends elapsed time to a collections.deque capped at 50 entries. _p95_response_time() sorts the deque and returns the 95th percentile. _backoff_floor() clamps the p95 to [0.5, 5.0] seconds. The backoff formula shifts from 2^attempt + jitter to floor × 2^attempt + jitter(0, floor), so when the server p95 is 200 ms the first retry delay is roughly 0.5–1.0 seconds, and when the p95 climbs to 4 seconds the first retry delay is roughly 4–8 seconds. The jitter range also scales with the floor, which preserves thundering-herd protection even under degraded conditions.
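The clamping arithmetic is easy to sanity-check in isolation. The helpers below mirror _p95_response_time and _backoff_floor as plain functions over hypothetical sample windows:

```python
import collections

def p95(times):
    # Same logic as _p95_response_time: need at least 10 samples.
    if len(times) < 10:
        return None
    s = sorted(times)
    idx = int(len(s) * 0.95)
    return s[min(idx, len(s) - 1)]

def backoff_floor(times):
    # Same logic as _backoff_floor: clamp p95 into [0.5, 5.0] seconds.
    p = p95(times)
    if p is None:
        return 1.0  # too few samples: static 1.0s floor
    return max(0.5, min(p, 5.0))

healthy = collections.deque([0.2] * 48 + [0.3, 0.4], maxlen=50)
degraded = collections.deque([4.0] * 50, maxlen=50)

assert backoff_floor(healthy) == 0.5    # fast server: clamped up to 0.5s
assert backoff_floor(degraded) == 4.0   # struggling server: floor = p95
assert backoff_floor([0.2] * 5) == 1.0  # under 10 samples: static floor

# First-retry delay before jitter is floor * 2**0:
assert backoff_floor(healthy) * 2 ** 0 == 0.5
assert backoff_floor(degraded) * 2 ** 0 == 4.0
```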

Pro Tip

Pass adaptive_backoff=False when constructing your client in tests. respx returns responses instantly, so the rolling window fills with near-zero samples and produces very short backoff floors. Disabling it in tests falls back to the standard 1-second floor and makes backoff behaviour predictable and easy to assert on.

Key Takeaways

  • Connection pooling eliminates per-request handshake overhead: Reusing TCP connections across requests to the same host skips DNS lookups, TCP handshakes, and TLS negotiations. A TLS 1.2 handshake alone requires two full round trips; for a cross-region connection with 150ms RTT, that is 300ms of overhead eliminated per pooled request. TLS 1.3 reduces this to one round trip, but even one saved round trip at scale adds up fast.
  • Understand the pool at the OS level: Each pooled connection is a kernel socket file descriptor in ESTABLISHED state. Without pooling, rapid connection churn produces thousands of sockets in TIME_WAIT, which can exhaust the ephemeral port range (roughly 28,000 ports on default Linux) and prevent new connections entirely.
  • Wrap your client in an async context manager: Implement __aenter__ and __aexit__ on your class so the underlying AsyncClient is created on entry and properly closed on exit. This prevents connection leaks and ensures the pool is cleaned up even if an exception occurs.
  • Centralize configuration in the constructor: Base URL, authentication headers, timeout settings, connection limits, and retry parameters should all be set once when the client is created. The httpx docs emphasize that using a Client instance can bring significant performance improvements compared to using the top-level API (source: httpx Client docs).
  • Build retry logic into the client, not the caller: A private _request method that handles transient errors, respects Retry-After headers, and applies exponential backoff with jitter keeps the public API simple.
  • Provide a batch method for concurrent requests: A get_many method that wraps asyncio.gather with return_exceptions=True lets callers fetch multiple resources concurrently in a single call, with partial failure handling built in.
  • Plan for pool exhaustion in long-running services: Long-lived AsyncClient instances can enter a state where the pool becomes unresponsive after sustained transient errors. Build a recycling mechanism that detects consecutive PoolTimeout exceptions and replaces the client (source: httpx Discussion #2556).
  • Cap concurrency with a semaphore, not just the connection pool: The connection pool limits how many TCP connections exist, but asyncio.Semaphore limits how many requests are actively queued. Without a semaphore, batch methods like get_many can create hundreds of waiting coroutines that trigger PoolTimeout cascades.
  • Choose between gather and TaskGroup deliberately: asyncio.gather with return_exceptions=True gives partial results when individual requests fail. asyncio.TaskGroup (Python 3.11+) gives all-or-nothing semantics with automatic cancellation of sibling tasks on failure. The Python standard library documentation notes that TaskGroup provides stronger safety guarantees than gather (source: Python asyncio docs).
  • Use event hooks for structured observability: The httpx event_hooks parameter accepts request and response callbacks that fire on every request without modifying business logic (source: httpx Event Hooks documentation).
  • Handle token expiry inside the client, not the caller: For APIs that use short-lived credentials, build a token-refresh mechanism with an asyncio.Lock to prevent concurrent refresh storms. Refresh early (before expiry) and force-refresh on 401 responses.
  • Add a circuit breaker for sustained failures: Retries handle transient errors within a single request. A circuit breaker tracks failure rates across many requests over time and stops calling an unhealthy service entirely, preventing resource exhaustion across your application.
  • Test the client with transport-level mocking: Libraries like respx intercept httpx requests at the transport layer, letting you verify retry logic, error handling, and header construction without hitting a real server.
  • Handle task cancellation to prevent phantom connections: When an asyncio task is cancelled mid-request, the response stream must be explicitly closed in a CancelledError handler or the connection is never returned to the pool. Wrap __aexit__ cleanup in asyncio.shield() to prevent partial cleanup on cancellation of the parent task.
  • Shard connection pools per upstream with mounts: A single shared pool lets a slow or unhealthy upstream consume connections that other upstreams need. httpx's mounts parameter routes different URL prefixes through completely isolated AsyncHTTPTransport instances, each with its own pool limits and timeout configuration.
  • Make POST and PATCH retries safe with idempotency keys: Generate a UUID once before the retry loop and send it as Idempotency-Key on every attempt. The same key on every retry allows the server to recognise duplicates and return the cached result, making non-idempotent operations safe to retry when the server supports it.
  • Let observed latency drive backoff floors: Static backoff floors are guesses. Tracking a rolling p95 of recent response times via event hooks and scaling the backoff floor proportionally means your client backs off more aggressively when the server is actually struggling, and recovers faster when it is not.
A well-built async client class is one of the highest-leverage patterns in Python async programming. It turns scattered, repetitive HTTP calls into a clean, testable, and resilient interface that encapsulates everything from kernel-level connection reuse to application-level circuit breaking. Build it once, tune the configuration for your API, and every part of your codebase benefits from connection pooling, automatic retries, structured concurrency, and centralized error handling without thinking about it.
