How to Build an Async REST API Client Class in Python with Connection Pooling

Sprinkling httpx.AsyncClient() calls throughout your codebase works for small scripts, but it becomes a maintenance problem as your project grows. You end up repeating the same base URL, authentication headers, timeout settings, and error handling in every file that talks to your API. The better approach is to build a dedicated client class that encapsulates all of this in one place -- a single, reusable object with connection pooling, centralized configuration, and a clean interface for the rest of your code to use.

Why Connection Pooling Matters

Every time you create a new httpx.AsyncClient() or call a top-level helper like httpx.get(), a new TCP connection is established. That means a fresh DNS lookup, a TCP handshake, and (for HTTPS) a TLS negotiation -- all before a single byte of your request payload is sent. For a single request this overhead is barely noticeable. Across 100 requests it adds up to seconds of wasted time.

The cost is measurable. A TLS 1.2 handshake requires two full round trips after the TCP connection is established, and a TLS 1.3 handshake requires one. For a cross-region connection with 150ms round-trip time, that translates to 300-450ms of setup overhead per new connection before any application data flows. ThousandEyes research measured this directly: a user in London connecting to a server in San Jose adds roughly 150ms per round trip, meaning a full TLS 1.2 handshake alone costs around 300ms (source: ThousandEyes, "Optimizing Web Performance with TLS 1.3").

A connection pool solves this by keeping TCP connections open after a request completes. The next request to the same host reuses the existing connection, skipping the handshake entirely. The httpx AsyncClient provides connection pooling by default, but you need to configure it correctly and make sure you are reusing the same client instance across your application. The official httpx documentation specifically warns against creating multiple client instances in tight loops, because each new instance spins up its own connection pool and defeats the purpose of reuse (source: httpx Async Support docs).

Note

The default AsyncClient pool allows up to 100 total connections and 20 keep-alive connections, with idle connections expiring after 5 seconds. These defaults are defined in the httpx Resource Limits documentation and match the Limits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5.0) signature in the httpx API reference. For high-throughput applications, you will want to tune these values.

What Happens Inside the Connection Pool

To understand why connection pooling saves so much time, it helps to understand what the operating system is doing under the hood every time a new TCP connection is opened. This is the invisible work that a pooled connection skips entirely.

When your code calls await client.get("/users/1") on a cold start -- no existing connections in the pool -- the following sequence executes at the kernel level. The OS resolves the hostname through DNS (which itself may involve multiple UDP round trips to nameservers), then allocates a socket file descriptor, then performs a three-way TCP handshake (SYN, SYN-ACK, ACK), then negotiates TLS by exchanging cipher suites and certificates, and only then does the HTTP request travel over the wire. Each of those steps involves at least one full network round trip, and for TLS 1.2, the handshake alone requires two. On a connection with 100ms round-trip latency, the total overhead before the first byte of application data flows can exceed 400ms.

A pooled connection eliminates all of that. The socket file descriptor stays open, the TLS session remains negotiated, and the TCP state machine stays in the ESTABLISHED state. The next request on that connection skips straight to writing HTTP frames onto an already-warm pipe. The operating system maintains the connection state in kernel memory -- around 3-4 KB per socket on Linux -- which is a trivial cost compared to the latency savings.

The Keep-Alive Lifecycle

HTTP keep-alive is the mechanism that makes pooling work at the protocol level. After a response is received, the server does not close the connection. Instead, the connection stays open, waiting for the next request. The httpx keepalive_expiry setting controls how long the client keeps an idle connection before closing it. But the server has its own timeout, and it may close the connection before your client does. When this happens, the next request on that connection hits a "reset" or "broken pipe" error on the first byte sent, and the pool transparently retries on a fresh connection. This is why you occasionally see a single slow request in an otherwise fast stream -- the pool is recovering from a server-side closure.

There is also a subtler lifecycle issue. When a TCP connection is closed, the socket enters a TIME_WAIT state on the side that initiated the close. During TIME_WAIT (which lasts 60 seconds by default on Linux), that socket's port number is unavailable for reuse. If your application opens and closes thousands of connections rapidly -- the exact pattern you get when you create a new AsyncClient per request -- you can exhaust the local port range entirely. The kernel has roughly 28,000 ephemeral ports available by default (ports 32768-60999 on most Linux distributions). At high throughput without pooling, you will hit this limit, and new connections will fail with OSError: [Errno 99] Cannot assign requested address. Connection pooling prevents this entirely because pooled connections are never closed during normal operation.

Pro Tip

You can observe your application's TCP connection state distribution with ss -s or ss -tnap | grep ESTAB on Linux. If you see a large number of sockets in TIME_WAIT relative to ESTABLISHED, your application is churning connections instead of reusing them. A healthy async client using connection pooling will show a stable, small number of ESTABLISHED connections and very few TIME_WAIT entries.

Pool Sizing and the Queuing Problem

The connection pool is not just a bag of open sockets. It is a bounded queue with two distinct populations: active connections (currently handling a request) and idle connections (finished their last request, waiting for the next one). The max_connections setting caps the total of both populations. The max_keepalive_connections setting caps only the idle population.

This distinction matters because the two settings create different failure modes. If your idle limit is too low, the pool will close connections that could have been reused, forcing new handshakes on the next burst of requests. If your total connection limit is too low relative to your concurrency, requests will queue up waiting for a connection and eventually trigger PoolTimeout. The pool timeout (configured via httpx.Timeout(pool=...)) controls how long a request will wait in that queue before giving up. It is the timeout you are least likely to configure explicitly and the one most likely to bite you in production -- because the default pool timeout inherits from the overall timeout, and a 5-second wait for a connection slot is often too short for bursty workloads.

The Minimal Client Class

The simplest useful client wraps AsyncClient in an async context manager so the connection pool is created when you enter the block and closed when you exit.

import httpx

class ApiClient:
    def __init__(self, base_url: str):
        self._base_url = base_url
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(base_url=self._base_url)
        return self

    async def __aexit__(self, *exc):
        await self._client.aclose()
        self._client = None

    async def get(self, path: str, **kwargs) -> dict:
        response = await self._client.get(path, **kwargs)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, **kwargs) -> dict:
        response = await self._client.post(path, **kwargs)
        response.raise_for_status()
        return response.json()

# Usage
async def main():
    async with ApiClient("https://jsonplaceholder.typicode.com") as api:
        user = await api.get("/users/1")
        print(user["name"])

This class gives you three things that raw httpx.get() calls do not: the base URL is set once and shared across all requests, the connection pool is reused, and the client is properly closed when the block exits. Every call to api.get() or api.post() uses a relative path, keeping your business logic clean.

Task Safety

The AsyncClient is designed to be shared across concurrent async tasks within the same event loop. When asked directly about this, httpx maintainer Tom Christie confirmed: "HTTPX is thread and task safe" (source: httpx Discussion #1633). Sharing one client instance is not just safe -- it is the intended usage pattern and the only way to get full benefit from the connection pool. Do not, however, share a single AsyncClient across multiple event loops.

Adding Configuration: Timeouts and Limits

A production client needs explicit control over timeouts and connection pool sizing. Without these, a slow server can hang your application indefinitely, or a burst of requests can exhaust your system's file descriptors.

import httpx

class ApiClient:
    def __init__(
        self,
        base_url: str,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
    ):
        self._base_url = base_url
        self._timeout = httpx.Timeout(
            timeout,
            connect=5.0,
        )
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

    async def __aexit__(self, *exc):
        await self._client.aclose()
        self._client = None

The httpx.Timeout object sets a 10-second overall timeout with a 5-second connection timeout. Without an explicit timeout, httpx defaults to 5 seconds for all operations -- connect, read, write, and pool acquisition (source: httpx Timeouts documentation). The httpx.Limits object caps the pool at 50 total connections and 20 idle keep-alive connections, with idle connections closed after 30 seconds. Note that the pool timeout (how long to wait for an available connection from the pool) is separate from the connect timeout -- if all connections are in use and the pool is full, httpx.PoolTimeout is raised, not ConnectTimeout. These values should be tuned to match your API's expected latency and your application's concurrency requirements.

Pro Tip

The keepalive_expiry controls how long an idle connection is kept open. Set it higher (60-120 seconds) for APIs you call frequently, and lower (5-10 seconds) for APIs you call sporadically. Keeping connections open costs a small amount of memory but saves significant latency on the next request.

Adding Authentication

Authentication headers should be set once at the client level, not repeated in every request. The AsyncClient accepts a headers parameter that applies to all outgoing requests. Per-request headers can still be passed to individual calls and will be merged with the client-level headers (source: httpx Client documentation).

class ApiClient:
    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        bearer_token: str | None = None,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
    ):
        self._base_url = base_url
        self._headers = {"Accept": "application/json"}

        if api_key:
            self._headers["X-API-Key"] = api_key
        if bearer_token:
            self._headers["Authorization"] = f"Bearer {bearer_token}"

        self._timeout = httpx.Timeout(timeout, connect=5.0)
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

Now you can instantiate the client with ApiClient("https://api.example.com", bearer_token="abc123") and every request includes the Authorization header automatically. Individual requests can still override or add headers by passing a headers parameter to get() or post().

Adding Retry Logic

A robust client handles transient failures transparently. Adding retry logic with exponential backoff inside the client class means the rest of your code does not need to worry about network hiccups or temporary server errors.

import asyncio
import random
import logging
import httpx

logger = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

class ApiClient:
    # ... __init__ and __aenter__/__aexit__ from above ...

    async def _request(self, method: str, path: str, max_retries: int = 3, **kwargs):
        last_exception = None

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)

                if response.status_code == 429:
                    # Record the 429 so last_exception is never None even if
                    # every attempt is rate limited.
                    last_exception = httpx.HTTPStatusError(
                        "429 Too Many Requests", request=response.request, response=response
                    )
                    # Retry-After may also be an HTTP-date; this assumes seconds.
                    retry_after = float(response.headers.get("Retry-After", 1))
                    logger.warning(f"Rate limited on {path}, waiting {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response.json()

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                logger.info(f"Retry {attempt + 1}/{max_retries} for {path} in {delay:.1f}s")
                await asyncio.sleep(delay)

        raise last_exception

    async def get(self, path: str, **kwargs) -> dict:
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> dict:
        return await self._request("POST", path, **kwargs)

    async def put(self, path: str, **kwargs) -> dict:
        return await self._request("PUT", path, **kwargs)

    async def delete(self, path: str, **kwargs) -> dict:
        return await self._request("DELETE", path, **kwargs)

The _request method is the internal engine. It retries on transient HTTP errors and network failures, respects Retry-After headers for rate-limited responses, applies exponential backoff with jitter, and raises immediately on non-retryable client errors (4xx except 429). The public methods (get, post, put, delete) are thin wrappers that delegate to _request.

Warning

Be careful retrying POST requests. Unlike GET requests, POST requests may not be idempotent -- retrying a payment submission could charge the user twice. Only retry POST requests if the API supports idempotency keys or if you know the operation is safe to repeat.

The Full Production Client

Here is the complete class with all the pieces assembled: connection pooling, timeouts, authentication, retry logic, and concurrent batch fetching.

import asyncio
import random
import logging
import httpx

logger = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

class ApiClient:
    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        bearer_token: str | None = None,
        timeout: float = 10.0,
        max_connections: int = 50,
        max_keepalive: int = 20,
        max_retries: int = 3,
    ):
        self._base_url = base_url
        self._max_retries = max_retries
        self._headers = {"Accept": "application/json"}

        if api_key:
            self._headers["X-API-Key"] = api_key
        if bearer_token:
            self._headers["Authorization"] = f"Bearer {bearer_token}"

        self._timeout = httpx.Timeout(timeout, connect=5.0)
        self._limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive,
            keepalive_expiry=30.0,
        )
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
        )
        return self

    async def __aexit__(self, *exc):
        if self._client:
            await self._client.aclose()
            self._client = None

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)

                if response.status_code == 429:
                    # Record the 429 so last_exception is never None even if
                    # every attempt is rate limited.
                    last_exception = httpx.HTTPStatusError(
                        "429 Too Many Requests", request=response.request, response=response
                    )
                    # Retry-After may also be an HTTP-date; this assumes seconds.
                    retry_after = float(response.headers.get("Retry-After", 1))
                    logger.warning(f"Rate limited on {path}, retrying in {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()

                content_type = response.headers.get("content-type", "")
                if "application/json" in content_type:
                    return response.json()
                return response.text

            except httpx.HTTPStatusError as e:
                last_exception = e
                if e.response.status_code not in RETRYABLE_STATUS_CODES:
                    raise

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                last_exception = e

            if attempt < max_retries - 1:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                logger.info(f"Retry {attempt + 1}/{max_retries} for {method} {path} in {delay:.1f}s")
                await asyncio.sleep(delay)

        raise last_exception

    async def get(self, path: str, **kwargs):
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs):
        return await self._request("POST", path, **kwargs)

    async def put(self, path: str, **kwargs):
        return await self._request("PUT", path, **kwargs)

    async def delete(self, path: str, **kwargs):
        return await self._request("DELETE", path, **kwargs)

    async def get_many(self, paths: list[str], **kwargs) -> list:
        tasks = [self.get(path, **kwargs) for path in paths]
        return await asyncio.gather(*tasks, return_exceptions=True)

The get_many method is a convenience for batch fetching. It takes a list of paths, creates a coroutine for each, and runs them concurrently with asyncio.gather. The return_exceptions=True flag ensures that one failed request does not cancel the others -- you get a mix of results and exceptions in the returned list.

Using the Client in Your Application

import asyncio

async def main():
    async with ApiClient(
        base_url="https://jsonplaceholder.typicode.com",
        timeout=15.0,
        max_connections=30,
    ) as api:

        # Single request
        user = await api.get("/users/1")
        print(f"User: {user['name']}")

        # Concurrent batch
        paths = [f"/posts/{i}" for i in range(1, 21)]
        results = await api.get_many(paths)

        successes = [r for r in results if not isinstance(r, Exception)]
        failures = [r for r in results if isinstance(r, Exception)]
        print(f"Fetched {len(successes)} posts, {len(failures)} failures")

asyncio.run(main())

The calling code is clean. It does not know or care about connection pooling, retry logic, or timeout configuration. It creates a client, calls get() or get_many(), and receives data. All the infrastructure concerns are hidden inside the class.

Pro Tip

In a FastAPI application, create the client in your lifespan handler and store it on app.state. This gives every endpoint access to the same pooled client without creating it per request. See the FastAPI article in this series for the full pattern.

Known Production Pitfall: PoolTimeout Exhaustion

There is a well-documented production issue where a long-lived AsyncClient can enter a state where every request raises httpx.PoolTimeout, even at low request rates. This has been reported by teams running httpx at scale (source: httpx Discussion #2556). The root cause appears related to connection pool state corruption after extended periods of transient errors -- particularly ReadTimeout exceptions that leave connections in an inconsistent state.

The practical mitigation is to add a health-check mechanism to your client class. If multiple consecutive PoolTimeout exceptions occur, destroy the current client and create a fresh one. Here is a minimal implementation that extends the production client from the previous section:

    async def _request(self, method: str, path: str, **kwargs):
        max_retries = kwargs.pop("max_retries", self._max_retries)
        last_exception = None
        pool_timeouts = 0

        for attempt in range(max_retries):
            try:
                response = await self._client.request(method, path, **kwargs)
                # ... normal retry logic from above ...

            except httpx.PoolTimeout as e:
                pool_timeouts += 1
                last_exception = e
                if pool_timeouts >= 2:
                    logger.error("Pool exhausted, recycling client")
                    old_client = self._client
                    self._client = httpx.AsyncClient(
                        base_url=self._base_url,
                        headers=self._headers,
                        timeout=self._timeout,
                        limits=self._limits,
                    )
                    await old_client.aclose()
                    pool_timeouts = 0

            # ... rest of retry logic ...

        raise last_exception

This pattern -- recycle the client when the pool becomes unresponsive -- has been validated by production teams handling sustained traffic. The key is to await the old client's aclose() rather than simply dropping the reference, so its remaining connections are shut down cleanly instead of leaking file descriptors.

HTTP/2 Note

If the API you are calling supports HTTP/2, you can enable it by passing http2=True to the AsyncClient (requires the h2 package: pip install "httpx[http2]" -- the quotes keep shells like zsh from expanding the brackets). HTTP/2 multiplexes many requests over a single TCP connection, so connection pool exhaustion is far less likely. However, HTTP/2 support in httpx is disabled by default because not all servers handle it correctly. The httpx HTTP/2 documentation covers the tradeoffs.

Controlling Concurrency with Semaphores

The get_many method fires every request simultaneously. If you pass it 500 paths, 500 coroutines compete for the connection pool at once. Even with a pool limit of 50, this leaves hundreds of coroutines queued for a pool slot -- many of which will eventually raise PoolTimeout -- and it can overwhelm an API that enforces per-second rate limits. The connection pool controls how many TCP connections exist, but it does not control how many requests are queued up waiting for those connections.

An asyncio.Semaphore solves this by capping how many requests are in flight at any given moment. Instead of launching all coroutines and letting the pool sort it out, a semaphore holds excess coroutines at the gate until a slot opens.

class ApiClient:
    def __init__(
        self,
        base_url: str,
        # ... other params from production client ...
        max_concurrency: int = 20,
    ):
        # ... existing init code ...
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def _request(self, method: str, path: str, **kwargs):
        async with self._semaphore:
            max_retries = kwargs.pop("max_retries", self._max_retries)
            last_exception = None

            for attempt in range(max_retries):
                # ... existing retry logic unchanged ...
                pass

            raise last_exception

    async def get_many(self, paths: list[str], **kwargs) -> list:
        tasks = [self.get(path, **kwargs) for path in paths]
        return await asyncio.gather(*tasks, return_exceptions=True)

Wrapping the semaphore around the entire _request method (including retries) means a single request holds its slot for the full duration of its retry cycle. This prevents a scenario where retrying requests flood back into the queue and starve new requests. The get_many method itself does not change at all -- the concurrency control is invisible to callers because it lives inside _request.
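The gating behavior can be observed without any HTTP at all. This standalone sketch (the limit of 3 and the sleep are illustrative) tracks the peak number of in-flight "requests" under a semaphore:

```python
import asyncio

async def main() -> int:
    semaphore = asyncio.Semaphore(3)   # at most 3 requests in flight
    in_flight = 0
    peak = 0

    async def fake_request(i: int) -> None:
        nonlocal in_flight, peak
        async with semaphore:          # excess coroutines wait here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            in_flight -= 1

    # 20 coroutines launched at once, but never more than 3 running.
    await asyncio.gather(*(fake_request(i) for i in range(20)))
    return peak

peak = asyncio.run(main())
```

Even though all 20 coroutines are created immediately, the semaphore caps concurrency at exactly 3; the other 17 sit parked at the async with line until a slot frees up.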

Pro Tip

Set max_concurrency lower than max_connections. If your pool allows 50 connections, a semaphore of 20-30 leaves headroom for connection churn without triggering PoolTimeout. The httpx maintainer Tom Christie has recommended using Python 3.11's asyncio.TaskGroup as a more structured alternative to bare asyncio.gather for managing concurrent work (source: httpx Discussion #2662). TaskGroup cancels all sibling tasks if any one task raises an unhandled exception, which gives you cleaner failure semantics than gather with return_exceptions=True.

Structured Concurrency with TaskGroup

The get_many method uses asyncio.gather because it was the only practical option before Python 3.11. But gather has a fundamental design problem: when one task fails, the other tasks keep running unless you explicitly handle cancellation. With return_exceptions=True, failures are silently mixed into the results list, and the caller has to filter them out manually. With return_exceptions=False (the default), the first exception propagates, but the remaining tasks become orphans -- still running in the background with no parent waiting for them.

Python 3.11 introduced asyncio.TaskGroup to solve this. A TaskGroup is an async context manager that owns every task created inside it. When the context manager exits, it waits for all tasks to finish. If any task raises an exception, the TaskGroup cancels every sibling task and raises an ExceptionGroup containing all the errors. This is structured concurrency: the lifetime of every concurrent operation is scoped to a visible block, and no task can outlive its parent.

Here is a get_many alternative using TaskGroup:

    async def get_many(self, paths: list[str], **kwargs) -> list:
        results: dict[int, object] = {}

        async def _fetch(index: int, path: str):
            results[index] = await self.get(path, **kwargs)

        async with asyncio.TaskGroup() as tg:
            for i, path in enumerate(paths):
                tg.create_task(_fetch(i, path))

        return [results[i] for i in sorted(results)]

The behavior difference is significant. If one request fails with a non-retryable error (say, a 404), the TaskGroup cancels every other in-flight request immediately. No wasted work, no orphaned coroutines consuming connection pool slots. The caller receives an ExceptionGroup that they handle with Python 3.11's except* syntax, which lets them catch different exception types from the group selectively.

async def main():
    async with ApiClient("https://api.example.com") as api:
        try:
            results = await api.get_many(["/users/1", "/users/999", "/users/2"])
        except* httpx.HTTPStatusError as eg:
            for exc in eg.exceptions:
                print(f"HTTP error on {exc.request.url}: {exc.response.status_code}")
        except* httpx.TimeoutException as eg:
            print(f"{len(eg.exceptions)} requests timed out")

The tradeoff is clear: gather with return_exceptions=True gives you partial results -- you get data from the requests that succeeded even if others failed. TaskGroup gives you all-or-nothing semantics -- if any request fails, everything is cancelled and you get an ExceptionGroup instead of results. Which approach is correct depends on your use case. Fetching user profiles for a dashboard page? Partial results are better -- show what you have. Fetching all parts of an order before processing a payment? All-or-nothing is safer -- you do not want to charge for an incomplete order.

Note

The Python standard library documentation states that TaskGroup provides stronger safety guarantees than gather because it cancels remaining tasks when a subtask fails (source: Python asyncio Task documentation). If you need the partial-results behavior of gather but the structured lifetime guarantees of TaskGroup, wrap each inner task in a try/except that catches and stores exceptions instead of letting them propagate. This gives you both: scoped task lifetimes and resilient partial results.

Adding Observability with Event Hooks

When something goes wrong in production -- slow responses, unexpected status codes, auth failures -- you need visibility into what the client is sending and receiving. Sprinkling logger.info calls inside _request is one approach, but httpx provides a cleaner mechanism: event hooks. These are callbacks that fire before every request and after every response, and they are configured at the client level so they apply globally without modifying your request logic (source: httpx Event Hooks documentation).

import time
import logging
import httpx

logger = logging.getLogger(__name__)

async def log_request(request: httpx.Request):
    request.extensions["request_start"] = time.monotonic()
    logger.info(f"--> {request.method} {request.url}")

async def log_response(response: httpx.Response):
    request = response.request
    # Fall back to "now" if the request hook did not run, so elapsed
    # degrades to 0.0 instead of a meaningless huge number.
    start = request.extensions.get("request_start", time.monotonic())
    elapsed = time.monotonic() - start
    logger.info(
        f"<-- {response.status_code} {request.method} {request.url} "
        f"({elapsed:.3f}s)"
    )

class ApiClient:
    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers=self._headers,
            timeout=self._timeout,
            limits=self._limits,
            event_hooks={
                "request": [log_request],
                "response": [log_response],
            },
        )
        return self

The request.extensions dictionary is a convenient place to stash per-request metadata like timestamps. The request hook writes the start time, and the response hook reads it back to calculate elapsed duration. Because hooks registered on an AsyncClient must be async functions (source: httpx Event Hooks documentation), make sure you define them with async def even if they do not perform any I/O.

Warning

Do not call response.json() or response.read() inside a response hook unless you have a specific reason. Reading the response body in a hook consumes the stream, which means the caller's subsequent response.json() call will return an empty result. If you need to inspect the body for logging, call await response.aread() inside the hook -- this buffers the body so it remains available for subsequent reads.

Handling Token Refresh and Expiring Credentials

The authentication section earlier covers static tokens -- API keys and bearer tokens that are set once and never change. In practice, many APIs issue short-lived tokens (OAuth2 access tokens, JWTs) that expire after minutes or hours. If your client class does not handle renewal, every request after expiration fails with a 401 until someone manually passes in a new token.

The httpx library supports custom authentication flows through its Auth class, which can intercept requests and modify headers dynamically. But for a straightforward token-refresh pattern, a simpler approach works: check the token's expiry before each request and refresh it when needed.

import time
import asyncio
import httpx

class ApiClient:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str,
        # ... other params ...
    ):
        self._base_url = base_url
        self._client_id = client_id
        self._client_secret = client_secret
        self._token_url = token_url
        self._access_token: str | None = None
        self._token_expiry: float = 0.0
        self._token_lock = asyncio.Lock()
        # ... rest of init ...

    async def _ensure_token(self):
        if self._access_token and time.monotonic() < self._token_expiry:
            return

        async with self._token_lock:
            # Double-check after acquiring lock
            if self._access_token and time.monotonic() < self._token_expiry:
                return

            response = await self._client.post(
                self._token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self._client_id,
                    "client_secret": self._client_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._access_token = data["access_token"]
            # Refresh 60 seconds early to avoid edge-case expiry
            self._token_expiry = time.monotonic() + data["expires_in"] - 60

    async def _request(self, method: str, path: str, **kwargs):
        await self._ensure_token()
        headers = dict(kwargs.pop("headers", {}))  # copy so we never mutate the caller's dict
        headers["Authorization"] = f"Bearer {self._access_token}"

        # ... existing retry logic, but also handle 401 ...
        response = await self._client.request(
            method, path, headers=headers, **kwargs
        )

        if response.status_code == 401:
            self._access_token = None  # Force refresh
            await self._ensure_token()
            headers["Authorization"] = f"Bearer {self._access_token}"
            response = await self._client.request(
                method, path, headers=headers, **kwargs
            )

        response.raise_for_status()
        return response.json()

The asyncio.Lock in _ensure_token is critical. Without it, 50 concurrent requests that all detect an expired token will each fire off a separate token refresh request. The lock ensures only one coroutine performs the refresh while the others wait, and the double-check pattern after acquiring the lock prevents redundant refreshes. The 60-second early renewal buffer avoids a race condition where a token expires between the check and the moment the request reaches the server.

Adding a Circuit Breaker

Retry logic handles transient failures -- a single 503, a momentary network hiccup, a brief rate-limit window. But what happens when the API you are calling is not just momentarily slow but genuinely down? Your retry logic keeps trying, each attempt burning a connection pool slot and a semaphore slot for the full backoff duration. Multiply that by every coroutine in your application, and you have a system that is spending all of its resources patiently waiting for a service that is not coming back anytime soon. Retries handle the symptom. A circuit breaker addresses the underlying problem: stop calling a service that is known to be unhealthy.

The circuit breaker pattern, described in Michael Nygard's Release It!, works by tracking failure rates and short-circuiting requests when a threshold is exceeded. It has three states: closed (normal operation, requests flow through), open (failures exceeded the threshold, requests fail immediately without hitting the network), and half-open (after a recovery timeout, one test request is allowed through to check if the service has recovered).

Here is a minimal circuit breaker that integrates directly into the client class:

import time

class CircuitOpen(Exception):
    """Raised when the circuit breaker is open."""
    pass

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._state = "closed"  # closed | open | half-open
        self._opened_at: float = 0.0

    def record_success(self):
        self._failure_count = 0
        self._state = "closed"

    def record_failure(self):
        self._failure_count += 1
        # A failure while half-open re-opens the circuit immediately;
        # otherwise the breaker opens once the threshold is reached.
        if self._state == "half-open" or self._failure_count >= self._failure_threshold:
            self._state = "open"
            self._opened_at = time.monotonic()

    def allow_request(self) -> bool:
        if self._state == "closed":
            return True
        if self._state == "open":
            if time.monotonic() - self._opened_at >= self._recovery_timeout:
                self._state = "half-open"
                return True
            return False
        # half-open: let test requests through; the next success closes
        # the circuit, the next failure re-opens it
        return True

Integrating this into the client's _request method requires only a few lines. Check the breaker before sending a request, record the outcome after:

class ApiClient:
    def __init__(self, base_url: str, **kwargs):
        # ... existing init ...
        self._breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=30.0,
        )

    async def _request(self, method: str, path: str, **kwargs):
        if not self._breaker.allow_request():
            raise CircuitOpen(
                f"Circuit breaker open for {self._base_url}, "
                f"retry after {self._breaker._recovery_timeout}s"
            )

        try:
            result = await self._do_request(method, path, **kwargs)
            self._breaker.record_success()
            return result
        except (httpx.TimeoutException, httpx.ConnectError):
            # Transport-level failures count against the breaker;
            # which status codes should also count is discussed below
            self._breaker.record_failure()
            raise

The critical design decision is what counts as a failure. Not every error should trip the breaker. A 404 means the resource does not exist -- that is a correct response from a healthy server. A 400 means you sent a bad request -- also not a server health issue. The breaker should only track failures that indicate the server itself is struggling: connection errors, timeouts, 502/503/504 responses, and PoolTimeout exceptions. Client-side errors (4xx except 429) should pass through without affecting the breaker state.

Notice also that the circuit breaker and the retry logic serve different layers of the failure spectrum. Retries handle isolated, transient errors within a single request lifecycle. The circuit breaker aggregates failure signals across many requests over time and makes a system-level decision to stop trying. Retries operate within seconds. The circuit breaker operates over minutes. Both belong in the client class, but they solve different problems.
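To see the state machine concretely, here is a condensed, self-contained variant of the breaker above (with a failure in the half-open state re-opening the circuit immediately), driven through a full closed-open-half-open-closed cycle:

```python
import time

class DemoBreaker:
    # Condensed variant of the breaker above, for a runnable demo.
    def __init__(self, failure_threshold=3, recovery_timeout=0.1):
        self._failures = 0
        self._threshold = failure_threshold
        self._recovery = recovery_timeout
        self._state = "closed"
        self._opened_at = 0.0

    def record_success(self):
        self._failures = 0
        self._state = "closed"

    def record_failure(self):
        # A half-open failure re-opens immediately; otherwise open at threshold.
        self._failures += 1
        if self._state == "half-open" or self._failures >= self._threshold:
            self._state = "open"
            self._opened_at = time.monotonic()

    def allow_request(self):
        if self._state == "open":
            if time.monotonic() - self._opened_at >= self._recovery:
                self._state = "half-open"
                return True
            return False
        return True

breaker = DemoBreaker(failure_threshold=3, recovery_timeout=0.1)
for _ in range(3):
    breaker.record_failure()       # three failures trip the breaker
print(breaker.allow_request())     # False: circuit is open
time.sleep(0.15)                   # wait out the recovery timeout
print(breaker.allow_request())     # True: half-open test request allowed
breaker.record_success()
print(breaker._state)              # closed: service recovered
```

The tiny recovery timeout is for demonstration only; production values are typically tens of seconds.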

Pro Tip

For production systems, consider the aiobreaker library, which is an asyncio-native circuit breaker with configurable failure thresholds, recovery timeouts, excluded exceptions, and listener callbacks for monitoring. It works as a decorator or a context manager and integrates cleanly with httpx clients. If you need shared circuit breaker state across multiple workers or instances, libraries like pybreaker support Redis-backed state storage.

Testing the Client with respx

A client class that wraps all your HTTP logic in one place is also a client class you can test without hitting a real server. The respx library is purpose-built for mocking httpx. It intercepts outgoing requests at the transport layer and returns canned responses, so your retry logic, error handling, and response parsing all execute exactly as they would in production.

import pytest  # @pytest.mark.asyncio requires the pytest-asyncio plugin
import respx
import httpx
from your_module import ApiClient

@pytest.mark.asyncio
@respx.mock
async def test_get_returns_json():
    respx.get("https://api.example.com/users/1").mock(
        return_value=httpx.Response(200, json={"id": 1, "name": "Alice"})
    )

    async with ApiClient("https://api.example.com") as api:
        result = await api.get("/users/1")

    assert result == {"id": 1, "name": "Alice"}

@pytest.mark.asyncio
@respx.mock
async def test_retry_on_server_error():
    route = respx.get("https://api.example.com/data")
    route.side_effect = [
        httpx.Response(503),
        httpx.Response(503),
        httpx.Response(200, json={"status": "ok"}),
    ]

    async with ApiClient("https://api.example.com", max_retries=3) as api:
        result = await api.get("/data")

    assert result == {"status": "ok"}
    assert route.call_count == 3

@pytest.mark.asyncio
@respx.mock
async def test_raises_on_client_error():
    respx.get("https://api.example.com/missing").mock(
        return_value=httpx.Response(404)
    )

    async with ApiClient("https://api.example.com") as api:
        with pytest.raises(httpx.HTTPStatusError) as exc_info:
            await api.get("/missing")

    assert exc_info.value.response.status_code == 404

The side_effect list in the retry test is the key pattern. Each call to the mocked route returns the next response in the list, letting you verify that the client retries on 503 and succeeds on the third attempt without waiting for real backoff delays. For tests where you need to verify request headers, authentication, or query parameters, respx captures every request made to a route via route.calls, so you can assert on exactly what was sent.

Alternative: httpx's Built-in Transport Mocking

If you prefer not to add a dependency, httpx provides httpx.MockTransport which accepts a handler function and can be passed directly to AsyncClient(transport=...) (source: httpx Transports documentation). This gives you full control but requires more boilerplate than respx. For ASGI applications, httpx also provides ASGITransport, which lets you test against a running ASGI app (like FastAPI) without a network layer.

Key Takeaways

  1. Connection pooling eliminates per-request handshake overhead: Reusing TCP connections across requests to the same host skips DNS lookups, TCP handshakes, and TLS negotiations. A TLS 1.2 handshake alone requires two full round trips; for a cross-region connection with 150ms RTT, that is 300ms of overhead eliminated per pooled request (source: ThousandEyes). TLS 1.3 reduces this to one round trip, but even one saved round trip at scale adds up fast.
  2. Understand the pool at the OS level: Each pooled connection is a kernel socket file descriptor in ESTABLISHED state. Without pooling, rapid connection churn produces thousands of sockets in TIME_WAIT, which can exhaust the ephemeral port range (roughly 28,000 ports on default Linux) and prevent new connections entirely. Pooling prevents this by keeping connections open and reusing them.
  3. Wrap your client in an async context manager: Implement __aenter__ and __aexit__ on your class so the underlying AsyncClient is created on entry and properly closed on exit. This prevents connection leaks and ensures the pool is cleaned up even if an exception occurs.
  4. Centralize configuration in the constructor: Base URL, authentication headers, timeout settings, connection limits, and retry parameters should all be set once when the client is created. Individual request calls should only specify the path and request-specific data. The httpx docs emphasize that using a Client instance can bring significant performance improvements compared to using the top-level API (source: httpx Client docs).
  5. Build retry logic into the client, not the caller: A private _request method that handles transient errors, respects Retry-After headers, and applies exponential backoff with jitter keeps the public API simple. Callers get transparent resilience without writing any error handling code.
  6. Provide a batch method for concurrent requests: A get_many method that wraps asyncio.gather with return_exceptions=True lets callers fetch multiple resources concurrently in a single call, with partial failure handling built in.
  7. Plan for pool exhaustion in long-running services: Long-lived AsyncClient instances can enter a state where the pool becomes unresponsive after sustained transient errors. Build a recycling mechanism that detects consecutive PoolTimeout exceptions and replaces the client (source: httpx Discussion #2556).
  8. Cap concurrency with a semaphore, not just the connection pool: The connection pool limits how many TCP connections exist, but an asyncio.Semaphore caps how many coroutines are issuing requests at once. Without a semaphore, batch methods like get_many can spawn hundreds of coroutines that all pile up waiting on the pool and trigger PoolTimeout cascades.
  9. Choose between gather and TaskGroup deliberately: asyncio.gather with return_exceptions=True gives partial results when individual requests fail. asyncio.TaskGroup (Python 3.11+) gives all-or-nothing semantics with automatic cancellation of sibling tasks on failure. The Python standard library documentation notes that TaskGroup provides stronger safety guarantees than gather (source: Python asyncio docs). Pick based on whether your use case tolerates partial results.
  10. Use event hooks for structured observability: The httpx event_hooks parameter accepts request and response callbacks that fire on every request without modifying business logic. Use them to log timing, status codes, and request metadata at the client level (source: httpx Event Hooks documentation).
  11. Handle token expiry inside the client, not the caller: For APIs that use short-lived credentials, build a token-refresh mechanism with an asyncio.Lock to prevent concurrent refresh storms. Refresh early (before expiry) and force-refresh on 401 responses.
  12. Add a circuit breaker for sustained failures: Retries handle transient errors within a single request. A circuit breaker tracks failure rates across many requests over time and stops calling an unhealthy service entirely, preventing resource exhaustion across your application. Retries operate within seconds; the circuit breaker operates over minutes.
  13. Test the client with transport-level mocking: Libraries like respx intercept httpx requests at the transport layer, letting you verify retry logic, error handling, and header construction without hitting a real server.

A well-built async client class is one of the highest-leverage patterns in Python async programming. It turns scattered, repetitive HTTP calls into a clean, testable, and resilient interface that encapsulates everything from kernel-level connection reuse to application-level circuit breaking. Build it once, tune the configuration for your API, and every part of your codebase benefits from connection pooling, automatic retries, structured concurrency, and centralized error handling without thinking about it.