Python Asyncio Rate Limiting: How to Throttle Concurrent API Requests Without Blocking

Asyncio lets you fire hundreds of HTTP requests concurrently -- and that is exactly the problem. Without throttling, your code will blast through an API's rate limit in seconds, trigger a storm of 429 errors, and potentially get your key revoked. The challenge is controlling the pace of requests while keeping everything non-blocking.

This article covers two distinct but related problems: concurrency limiting (how many requests are active at once) and rate limiting (how many requests are made per second). You will implement both using standard library tools, build a custom async token bucket, and then see how libraries like aiolimiter, aiometer, asynciolimiter, and pyrate-limiter simplify the process. Beyond the basics, you will also see how worker pools, circuit breakers, and connection-level limits round out a production throttling stack.

Concurrency Limiting vs. Rate Limiting: Two Separate Problems

These two concepts are easy to conflate, but they solve different issues. Concurrency limiting caps how many requests are in flight at the same time. If your semaphore is set to 10, at most 10 requests can be awaiting a server response simultaneously. This protects your machine's resources -- file descriptors, memory, and connection pool capacity -- and prevents you from overwhelming the target server with too many open connections.

Rate limiting caps how many requests are made per unit of time. An API might allow 100 requests per minute. With a concurrency limit of 10 and fast response times, your code could easily burn through those 100 requests in a few seconds. The concurrency limit does not slow down the throughput -- it just caps the parallelism.

In practice, you usually need both. The semaphore keeps your connection count sane, and the rate limiter keeps your request frequency within the API's quota.
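The difference is easy to demonstrate without a network. In the sketch below, 100 simulated 0.1-second requests run under a semaphore of 10: peak concurrency never exceeds 10, yet all 100 finish in roughly a second -- a throughput of about 100 requests per second, far beyond what a 10-per-second quota would allow.

```python
import asyncio
import time


async def demo() -> tuple[int, float]:
    """Run 100 simulated requests under a semaphore of 10."""
    semaphore = asyncio.Semaphore(10)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.1)  # stand-in for a fast API call
            in_flight -= 1

    start = time.monotonic()
    await asyncio.gather(*(fake_request() for _ in range(100)))
    return peak, time.monotonic() - start


peak, elapsed = asyncio.run(demo())
print(peak, round(elapsed, 2))  # peak stays at 10; total is about 1 second
```

The semaphore bounds parallelism (peak is exactly 10), but nothing slows the overall pace -- only a rate limiter does that.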

Note

Python's asyncio is single-threaded, and context switches happen only at await points. Any read-modify-write of shared state that contains no await in the middle is therefore effectively atomic -- no lock required. Races are still possible across await points, though, which is why the token bucket implementation later in this article guards its critical section.
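As a quick sanity check on that guarantee, ten thousand coroutines can increment a shared counter with no lock and lose no updates, because the read-modify-write contains no await:

```python
import asyncio


async def increment(state: dict) -> None:
    # Read-modify-write with no await in between: atomic in asyncio
    state["count"] += 1


async def main() -> int:
    state = {"count": 0}
    await asyncio.gather(*(increment(state) for _ in range(10_000)))
    return state["count"]


print(asyncio.run(main()))  # 10000 -- no lost updates, no lock needed
```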

Controlling Concurrency with asyncio.Semaphore

A semaphore is the standard tool for limiting how many coroutines can enter a section of code at once. It maintains an internal counter that decrements when acquired and increments when released. When the counter hits zero, additional coroutines wait until one of the active ones finishes.

import asyncio
import httpx


async def fetch(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    """Fetch a URL with concurrency limiting."""
    async with semaphore:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()


async def fetch_all(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    """
    Fetch multiple URLs with bounded concurrency.

    At most max_concurrent requests will be in flight at
    any given moment, regardless of how many URLs are provided.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

The async with semaphore pattern ensures the semaphore is always released, even if the request raises an exception. Without it, an unhandled error inside the critical section would permanently consume one of the semaphore's slots, gradually reducing your available concurrency until all slots are stuck.

Warning

Do not use time.sleep() inside an async function. It blocks the entire event loop, freezing all other coroutines. Always use await asyncio.sleep() for delays in async code.
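The cost of getting this wrong is easy to measure. Below, two concurrent 0.2-second waits complete together under asyncio.sleep but run back-to-back under time.sleep, because the blocking call never yields control to the event loop:

```python
import asyncio
import time


async def good_wait() -> None:
    await asyncio.sleep(0.2)  # yields: other coroutines keep running


async def bad_wait() -> None:
    time.sleep(0.2)  # blocks the entire event loop -- do not do this


async def timed(make_coro) -> float:
    """Time two 'concurrent' waits produced by make_coro."""
    start = time.monotonic()
    await asyncio.gather(make_coro(), make_coro())
    return time.monotonic() - start


print(round(asyncio.run(timed(good_wait)), 1))  # ~0.2: the waits overlap
print(round(asyncio.run(timed(bad_wait)), 1))   # ~0.4: the waits serialize
```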

For Python 3.11 and later, consider using asyncio.TaskGroup instead of asyncio.gather(). TaskGroup provides structured concurrency -- if any task raises an exception, all other tasks in the group are cancelled automatically, preventing orphaned tasks from consuming resources.

async def fetch_all_taskgroup(
    urls: list[str], max_concurrent: int = 10
) -> list[dict]:
    """Fetch with TaskGroup for structured concurrency (Python 3.11+)."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch(client, url, semaphore))
                for url in urls
            ]

    return [task.result() for task in tasks]

Building an Async Token Bucket Rate Limiter

A semaphore limits parallelism but not throughput. To enforce a requests-per-second ceiling, you need a rate limiter. The token bucket algorithm adapts naturally to async code: track a token count, refill it lazily based on elapsed time, and have callers await when the bucket is empty:

import asyncio
import time


class AsyncTokenBucket:
    """
    Async rate limiter using the token bucket algorithm.

    Tokens are consumed before each request. If the bucket
    is empty, the coroutine awaits until a token is available.
    No threads, no blocking -- purely event-loop driven.
    """

    def __init__(self, rate: float, capacity: int | None = None):
        """
        Args:
            rate: Tokens added per second (e.g., 10 = 10 requests/sec)
            capacity: Max burst size. Defaults to rate (at least 1).
        """
        self.rate = rate
        # max(1, ...) keeps fractional rates (e.g., 0.5/sec) from
        # producing a zero-capacity bucket that can never be acquired
        self.capacity = capacity if capacity is not None else max(1, int(rate))
        self._tokens = float(self.capacity)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a token is available, then consume it."""
        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return

            # No token available -- sleep briefly and try again
            await asyncio.sleep(1.0 / self.rate)

    def _refill(self) -> None:
        """Add tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(
            self.capacity, self._tokens + elapsed * self.rate
        )
        self._last_refill = now

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *exc):
        pass

The asyncio.Lock() here is worth a closer look. In this exact implementation it is technically redundant: there is no await between _refill() and the token decrement, so no other coroutine can interleave inside the critical section. The lock earns its keep as insurance -- if an await is ever introduced between the check and the consume (logging, an async clock, a distributed backend), two coroutines could both see sufficient tokens and both decrement, letting more requests through than intended. The lock makes the check-and-consume atomic by construction rather than by accident.
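To make the failure mode concrete, here is a deliberately broken read-modify-write where an await (asyncio.sleep(0), standing in for any real suspension point) sits between the read and the write -- nearly every update is lost:

```python
import asyncio


async def racy_increment(state: dict) -> None:
    current = state["count"]       # read
    await asyncio.sleep(0)         # context switch: other coroutines run here
    state["count"] = current + 1   # write back a stale value


async def main() -> int:
    state = {"count": 0}
    await asyncio.gather(*(racy_increment(state) for _ in range(100)))
    return state["count"]


print(asyncio.run(main()))  # far less than 100 -- updates were lost
```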

The class implements the async context manager protocol (__aenter__ and __aexit__), so you can use it with async with:

async def fetch_rate_limited(
    client: httpx.AsyncClient,
    url: str,
    limiter: AsyncTokenBucket,
) -> dict:
    """Fetch a URL with rate limiting."""
    async with limiter:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

Combining Both: Semaphore + Rate Limiter

The real-world pattern combines a semaphore for concurrency control with a token bucket for rate control. The semaphore prevents you from opening too many connections, and the rate limiter prevents you from exceeding the API's requests-per-second quota:

async def fetch_throttled(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
    limiter: AsyncTokenBucket,
) -> dict:
    """Fetch with both concurrency and rate limiting."""
    async with semaphore:
        async with limiter:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()


async def process_urls(urls: list[str]) -> list[dict]:
    """
    Process many URLs with controlled concurrency and rate.

    max_concurrent=20 keeps connection count manageable.
    rate=10 stays under a 10 requests/second API limit.
    """
    semaphore = asyncio.Semaphore(20)
    limiter = AsyncTokenBucket(rate=10, capacity=10)

    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_throttled(client, url, semaphore, limiter)
            for url in urls
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
Pro Tip

Pass return_exceptions=True to asyncio.gather() when processing large batches. Without it, a single failed request cancels the entire batch. With it, failed requests return their exception object in the results list, and the rest of the batch completes normally.

The order of the context managers matters. With the semaphore on the outside, a token is consumed immediately before the request goes out, so the limiter's pacing matches the actual request timing. Reverse the order -- rate limiter first, then semaphore -- and a coroutine can consume a token and then sit waiting for a semaphore slot; when slots free up, those already-granted requests fire in a burst that no longer respects the rate the limiter intended. The cost of semaphore-first is that a coroutine waiting for a token holds a slot while it waits, which is acceptable when the rate limit is the binding constraint anyway.

Using aiolimiter and aiometer

If you prefer a battle-tested library over a custom implementation, two packages stand out for async rate limiting in Python.

aiolimiter: Leaky Bucket for asyncio

aiolimiter implements the leaky bucket algorithm with a clean async context manager interface. It requires Python 3.9 or newer and has no dependencies beyond asyncio:

from aiolimiter import AsyncLimiter
import httpx


# Allow 10 requests per second
rate_limit = AsyncLimiter(max_rate=10, time_period=1)

# Or: 100 requests per 60 seconds
# rate_limit = AsyncLimiter(max_rate=100, time_period=60)


async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    async with rate_limit:
        response = await client.get(url)
        return response.json()


async def main():
    urls = [f"https://api.example.com/items/{i}" for i in range(200)]

    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

The max_rate and time_period parameters give you flexible control. Setting AsyncLimiter(100, 60) allows 100 requests within any 60-second window, with bursts allowed up to the max_rate capacity.

aiometer: Combined Concurrency + Rate Limiting

aiometer solves both problems in a single function call. It provides run_on_each and run_all functions that accept max_at_once (concurrency) and max_per_second (rate) parameters:

import aiometer
import httpx
import functools


async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict:
    response = await client.get(f"https://api.example.com/items/{item_id}")
    response.raise_for_status()
    return response.json()


async def main():
    async with httpx.AsyncClient() as client:
        jobs = [
            functools.partial(fetch_item, client, item_id)
            for item_id in range(500)
        ]
        results = await aiometer.run_all(
            jobs,
            max_at_once=20,       # concurrency limit
            max_per_second=10,    # rate limit
        )
    return results

aiometer.run_all takes a list of argument-free callables (hence the functools.partial wrappers) plus the two limit parameters, and returns the results in order. Its sibling run_on_each applies one async function to each item of an iterable with the same scheduling, but it returns None -- use it for side effects, and reach for run_all or amap when you need results back. Either way, aiometer handles all the scheduling internally -- no semaphores or custom rate limiters to wire up. For workloads where you need to process a batch of items with controlled throughput, this is the most concise option available.

Comparing Async Throttling Approaches

| Approach | Concurrency Control | Rate Control | Dependencies | Best For |
|---|---|---|---|---|
| asyncio.Semaphore | Yes -- caps in-flight requests | No -- does not limit requests/sec | Standard library | Simple concurrency caps with no rate requirement |
| Custom Token Bucket | No (pair with semaphore) | Yes -- precise requests/sec | Standard library | Full control over the algorithm, no third-party packages |
| aiolimiter | No (pair with semaphore) | Yes -- leaky bucket | aiolimiter | Clean rate limiting with burst support |
| aiometer | Yes -- max_at_once | Yes -- max_per_second | aiometer | Batch processing with both limits in one call |
| asyncio-throttle | No (pair with semaphore) | Yes -- simple rate/period | asyncio-throttle | Lightweight throttling with minimal API surface |
| asynciolimiter | No (pair with semaphore) | Yes -- three modes: Limiter, StrictLimiter, LeakyBucketLimiter | asynciolimiter | Choosing between burst-tolerant, strict, or leaky bucket behavior per endpoint |
| pyrate-limiter | No (pair with semaphore) | Yes -- multi-tier limits with Redis/SQLite backends | pyrate-limiter | APIs with layered quotas (per-second, per-minute, per-day) and persistence across restarts |
| asyncio.Queue worker pool | Yes -- fixed worker count | Indirectly -- workers control throughput | Standard library | Long-running consumers processing a continuous stream of tasks |

For small scripts or one-off tasks, asyncio.Semaphore combined with a custom token bucket keeps your dependencies at zero. For production applications processing large batches, aiometer provides the most complete solution with the least boilerplate. And for services that need fine-grained rate control as a standalone component, aiolimiter slots in cleanly alongside whatever concurrency strategy you already have.

When an API enforces multiple overlapping quotas -- for example, 10 requests per second and 500 per hour and 5,000 per day -- pyrate-limiter handles this natively by stacking Rate objects against a single bucket. It also supports Redis and SQLite backends, so your rate limit state survives process restarts and can be shared across multiple workers or containers. This matters in production deployments where a single in-memory counter is not enough.

The asynciolimiter package (distinct from aiolimiter) offers three limiter classes that target different traffic profiles. The default Limiter compensates for CPU-bound delays by allowing catch-up bursts. StrictLimiter enforces a hard ceiling with no burst compensation, which suits APIs that reject any excess within a given second. LeakyBucketLimiter allows an initial burst up to a configurable capacity and then drains at a fixed rate, making it a good fit for endpoints that tolerate short spikes but throttle sustained traffic.

Worker Pool with asyncio.Queue

The semaphore-per-request model works well for batch processing, but for long-running services that consume tasks from a stream -- a message queue, a WebSocket feed, or a paginated API -- a fixed pool of worker coroutines pulling from an asyncio.Queue is often a better architecture. The queue decouples task production from consumption, and the number of workers acts as a natural concurrency limit:

import asyncio
import httpx


async def worker(
    name: str,
    queue: asyncio.Queue,
    client: httpx.AsyncClient,
    results: list,
) -> None:
    """Pull URLs from the queue and fetch them."""
    while True:
        url = await queue.get()
        try:
            response = await client.get(url)
            results.append(response.json())
        except httpx.HTTPError as e:
            results.append({"error": str(e), "url": url})
        finally:
            queue.task_done()


async def process_stream(urls: list[str], num_workers: int = 5) -> list:
    """Process URLs through a fixed-size worker pool."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []

    for url in urls:
        await queue.put(url)

    async with httpx.AsyncClient() as client:
        workers = [
            asyncio.create_task(worker(f"w-{i}", queue, client, results))
            for i in range(num_workers)
        ]

        await queue.join()  # Block until all items are processed

        for w in workers:
            w.cancel()
        # Wait for cancelled workers to unwind before the client closes,
        # so no worker is mid-request on a closed client
        await asyncio.gather(*workers, return_exceptions=True)

    return results

The key advantage over the semaphore pattern is backpressure. If the queue fills up (you can set a maxsize), producers are forced to wait before adding more work. This prevents unbounded memory growth when tasks arrive faster than workers can process them -- a problem that asyncio.gather() with thousands of tasks does not solve on its own.
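Here is a toy demonstration of that backpressure, using nothing beyond the standard library: with maxsize=3 and a deliberately slow consumer, the producer's put() calls suspend once the queue fills, so the producer ends up paced by the consumer rather than running ahead of it.

```python
import asyncio
import time


async def main() -> tuple[float, int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
    consumed = 0

    async def producer() -> float:
        start = time.monotonic()
        for item in range(10):
            await queue.put(item)  # suspends whenever the queue is full
        return time.monotonic() - start

    async def consumer() -> None:
        nonlocal consumed
        for _ in range(10):
            await queue.get()
            await asyncio.sleep(0.02)  # deliberately slow consumer
            consumed += 1
            queue.task_done()

    producer_time, _ = await asyncio.gather(producer(), consumer())
    return producer_time, consumed


producer_time, consumed = asyncio.run(main())
# The producer needed well over 0.1s for 10 puts: it was throttled
# by the consumer, not by how fast it could generate work
print(round(producer_time, 2), consumed)
```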

Circuit Breakers: Stopping Before You Retry

Retry logic with exponential backoff handles transient 429 errors, but what happens when an API goes down entirely, or starts returning errors on every request? Without a circuit breaker, your retry logic will dutifully keep hammering a broken endpoint, wasting time and quota on requests that have no chance of succeeding.

A circuit breaker tracks recent failure rates and trips open after a threshold is crossed. While open, it short-circuits all requests immediately -- no network call, no wasted quota, no delay. After a cooldown period, it allows a single probe request through. If that succeeds, it closes and resumes normal traffic. Libraries like aiobreaker and purgatory implement this pattern for asyncio:

from aiobreaker import CircuitBreaker
from datetime import timedelta
import httpx

# Trip open after 5 consecutive failures, allow a probe after 30 seconds
api_breaker = CircuitBreaker(
    fail_max=5,
    timeout_duration=timedelta(seconds=30),
)


@api_breaker
async def fetch_with_breaker(
    client: httpx.AsyncClient, url: str
) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

In a production throttling stack, the circuit breaker sits outside your retry logic. A request first passes through the circuit breaker -- if it is open, the request fails immediately with a CircuitBreakerError. If it is closed, the request proceeds to the semaphore, then the rate limiter, then the network call. If that call fails and triggers a retry, the retry loop operates inside the circuit breaker's monitoring. This layering prevents your retry logic from running endlessly against an API that is not going to recover within your backoff window.
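The state machine itself is small enough to sketch with the standard library alone. The MiniBreaker class below is an invented, illustrative name -- a minimal take on the closed/open/half-open cycle, not a substitute for aiobreaker or purgatory, which add listeners, richer state handling, and shared storage on top of the same idea.

```python
import time


class CircuitOpenError(Exception):
    """Raised when the breaker short-circuits a call."""


class MiniBreaker:
    """Minimal circuit breaker: closed -> open -> half-open -> closed."""

    def __init__(self, fail_max: int = 5, reset_after: float = 30.0):
        self.fail_max = fail_max        # failures before tripping open
        self.reset_after = reset_after  # cooldown before a probe is allowed
        self.failures = 0
        self.opened_at: float | None = None

    async def call(self, fn, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                # Open: fail fast -- no network call, no wasted quota
                raise CircuitOpenError("circuit open -- request skipped")
            # Cooldown elapsed: half-open, let this one probe through
        try:
            result = await fn(*args)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = time.monotonic()  # trip (or re-trip) open
            raise
        self.failures = 0
        self.opened_at = None  # probe succeeded: close the circuit
        return result
```

A failed probe trips the breaker open again immediately, since the failure count is still at the threshold; a successful probe resets both the count and the open timestamp.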

Connection-Level Throttling with httpx.Limits

All of the patterns above operate at the application level -- your code decides when to send requests. But httpx.AsyncClient also supports connection-level limits through its limits parameter, which controls the underlying connection pool:

import httpx

client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_connections=20,         # total connection pool size
        max_keepalive_connections=10,  # idle connections to keep
    ),
    timeout=httpx.Timeout(10.0, connect=5.0),
)

This is not a substitute for a semaphore or rate limiter, but it adds a safety floor at the transport layer. If a bug in your throttling logic allows too many coroutines through, the connection pool limit prevents your process from opening hundreds of TCP connections and exhausting file descriptors. Treat it as the outermost guard in a defense-in-depth strategy: connection limits at the transport layer, a semaphore at the application layer, a rate limiter for API compliance, and a circuit breaker for failure isolation.

Handling 429 Errors: Retry Logic with Exponential Backoff

Rate limiting reduces the chance of hitting an API's ceiling, but it does not eliminate it. Shared rate limits, clock drift between your client and the server, or sudden quota changes can all cause a 429 Too Many Requests response even when your throttling looks correct. Your code needs a plan for when prevention fails.

The standard approach is exponential backoff with jitter. Instead of retrying immediately -- which just burns through more of your quota -- you wait progressively longer between each attempt. Jitter adds a random offset to that wait time, preventing multiple coroutines from retrying in lockstep and creating another burst.

import asyncio
import random
import httpx


async def fetch_with_retry(
    client: httpx.AsyncClient,
    url: str,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> httpx.Response:
    """
    Fetch a URL with exponential backoff on 429 responses.

    Respects the Retry-After header when the server provides one.
    Falls back to exponential backoff with jitter otherwise.
    """
    for attempt in range(max_retries):
        response = await client.get(url)

        if response.status_code != 429:
            response.raise_for_status()
            return response

        # Server says how long to wait -- respect it
        retry_after = response.headers.get("Retry-After")
        if retry_after and retry_after.isdigit():
            delay = float(retry_after)
        else:
            # Missing or HTTP-date header -- exponential backoff:
            # 1s, 2s, 4s, 8s, 16s ...
            delay = base_delay * (2 ** attempt)

        # Add jitter: +/- 25% to avoid thundering herd
        jitter = delay * 0.25 * (2 * random.random() - 1)
        await asyncio.sleep(delay + jitter)

    raise httpx.HTTPStatusError(
        f"Rate limited after {max_retries} retries",
        request=response.request,
        response=response,
    )

The Retry-After header is the key detail here. When a server sends one, it is telling you the exact cooldown period. Ignoring it and substituting your own delay is both wasteful and disrespectful -- you might wait too long, or not long enough. The header value is typically an integer representing seconds, though it can also be an HTTP-date string.

Warning

Retrying a 429 immediately -- without any delay -- still counts against your quota on many APIs. Doing so repeatedly can escalate to a longer ban or permanent key revocation. Always wait before retrying, and always cap the total number of retry attempts.

For production systems, the tenacity library provides a robust retry framework that supports asyncio natively. It handles exponential backoff, jitter, conditional retries based on exception type or return value, and configurable stop conditions -- all without requiring you to write the retry loop yourself. Its decorator-based API keeps retry logic separate from business logic:

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
)
import httpx


class RateLimitedError(Exception):
    """Raised when the API returns a 429 status code."""
    pass


@retry(
    retry=retry_if_exception_type(RateLimitedError),
    wait=wait_exponential_jitter(initial=1, max=60),
    stop=stop_after_attempt(6),
)
async def fetch_item(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    if response.status_code == 429:
        raise RateLimitedError(f"429 from {url}")
    response.raise_for_status()
    return response.json()

The wait_exponential_jitter strategy combines exponential growth with built-in randomization. The initial parameter sets the starting delay, and max caps how long any single retry wait can be. Combined with stop_after_attempt, this prevents infinite retry loops while giving the server ample time to recover.
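The deterministic backbone of that schedule is simple arithmetic. With initial=1 and max=60, the first six attempts (before jitter is added on top) wait:

```python
# Exponential backoff backbone: initial * 2**attempt, capped at max
initial, cap = 1.0, 60.0
delays = [min(cap, initial * 2 ** attempt) for attempt in range(6)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
```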

Adaptive Rate Limiting with Response Headers

Static rate limits work well when you know the API's exact quota in advance. But many APIs communicate their limits dynamically through response headers, and the smartest clients adjust their behavior based on that feedback in real time.

The three headers to watch for are X-RateLimit-Limit (the total allowed requests in the current window), X-RateLimit-Remaining (how many you have left), and X-RateLimit-Reset (when the window resets, usually as a Unix timestamp). Not every API uses these exact names -- some use RateLimit-Limit, RateLimit-Remaining, and RateLimit-Reset following the IETF draft standard -- but the concept is the same.

import asyncio
import time
import httpx


class AdaptiveRateLimiter:
    """
    Adjusts request pacing based on rate limit headers.

    Reads X-RateLimit-Remaining and X-RateLimit-Reset from
    each response and slows down as the quota drains.
    """

    def __init__(self, default_rate: float = 10.0):
        self._default_rate = default_rate
        self._remaining: int | None = None
        self._reset_at: float | None = None
        self._lock = asyncio.Lock()

    def update(self, headers: httpx.Headers) -> None:
        """Parse rate limit headers from a response."""
        remaining = headers.get("X-RateLimit-Remaining")
        reset = headers.get("X-RateLimit-Reset")

        if remaining is not None:
            self._remaining = int(remaining)
        if reset is not None:
            self._reset_at = float(reset)

    async def wait(self) -> None:
        """Pause if the remaining quota is running low."""
        async with self._lock:
            if self._remaining is None or self._reset_at is None:
                # No header data yet -- use default pacing
                await asyncio.sleep(1.0 / self._default_rate)
                return

            if self._remaining <= 0:
                # Quota exhausted -- wait until the window resets
                wait_time = self._reset_at - time.time()
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                return

            # Spread remaining requests evenly across the window
            time_left = self._reset_at - time.time()
            if time_left > 0 and self._remaining > 0:
                delay = time_left / self._remaining
                await asyncio.sleep(delay)

This limiter calculates a per-request delay by dividing the time remaining in the current window by the number of requests still allowed. When the quota is nearly exhausted, it automatically slows down. When a fresh window opens, it speeds back up. The result is a client that naturally pushes right up to the API's ceiling without going over.
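A quick worked example of that calculation: with 40 requests left in the quota and 20 seconds until the window resets, the limiter waits half a second before each request.

```python
# Spread the remaining quota evenly across the rest of the window
remaining = 40          # from X-RateLimit-Remaining
time_left = 20.0        # seconds until X-RateLimit-Reset
delay = time_left / remaining
print(delay)  # 0.5 -- one request every half second
```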

Pro Tip

Combine adaptive limiting with a static rate limiter as a safety net. The adaptive limiter optimizes throughput based on live feedback, while the static limiter provides a hard floor that prevents accidental bursts if the adaptive logic encounters unexpected header values or missing headers.

Use it alongside the semaphore and retry logic from the earlier sections for a complete three-layer defense: a semaphore to cap connections, an adaptive limiter to pace requests based on server feedback, and exponential backoff to recover gracefully when a 429 still gets through.

[Figure] Two-layer throttling -- a semaphore limits how many coroutines can run concurrently, while a token bucket rate limiter controls how many requests per second reach the API.

Key Takeaways

  1. Concurrency and rate limiting solve different problems: A semaphore caps how many requests are in flight at once (protecting resources). A rate limiter caps how many requests are made per second (respecting API quotas). You usually need both.
  2. asyncio.Semaphore is the go-to for concurrency control: Use async with semaphore to gate entry into network-calling code. The context manager guarantees release even on exceptions.
  3. Never use time.sleep() in async code: It blocks the entire event loop. Always use await asyncio.sleep() for any delay in async functions.
  4. Token buckets adapt naturally to asyncio: The lazy-refill pattern works with asyncio.Lock() for safe state access and asyncio.sleep() for non-blocking waits.
  5. Match the library to the traffic profile: aiolimiter provides a clean leaky bucket for rate-per-second control. aiometer combines concurrency and rate limiting in one call. asynciolimiter offers three distinct modes (burst-tolerant, strict, leaky bucket). pyrate-limiter handles multi-tier quotas with persistent backends.
  6. Order your context managers correctly: Acquire the semaphore first, then the rate limiter, so tokens are consumed immediately before each request is sent and the limiter's pacing matches real request timing.
  7. Always plan for 429 responses: Exponential backoff with jitter prevents retry storms. Respect the Retry-After header when the server provides one, and cap total retry attempts to avoid infinite loops.
  8. Read rate limit headers to adapt dynamically: Headers like X-RateLimit-Remaining and X-RateLimit-Reset let you pace requests based on live server feedback instead of static guesses, squeezing out maximum throughput without crossing the line.
  9. Use circuit breakers to stop retrying broken endpoints: A circuit breaker short-circuits requests after repeated failures, preventing your retry logic from wasting time and quota on an API that is not going to recover within your backoff window.
  10. Layer your defenses: A production throttling stack combines connection-level limits (httpx.Limits) at the transport layer, a semaphore or worker pool at the application layer, a rate limiter for API compliance, adaptive pacing from response headers, and a circuit breaker for failure isolation.

Async programming in Python makes it trivially easy to fire off hundreds of concurrent requests. The difficult part is not going fast -- it is going precisely as fast as you are allowed to. A well-tuned combination of concurrency control, rate limiting, adaptive pacing, and failure isolation lets you maximize your throughput right up to the API's ceiling without crossing it, keeping your application fast, reliable, and welcome on the servers it talks to.