How to Implement API Rate Limiting in Python Using the Token Bucket Algorithm

Every API you build or consume has a breaking point. Without rate limiting, a single misconfigured client, a retry loop gone wrong, or a deliberate attack can overwhelm your server and take it offline. The token bucket algorithm is one of the most effective and widely adopted solutions to this problem, and Python makes it straightforward to implement from scratch.

This article walks through the token bucket algorithm from concept to working code. You will build an in-memory rate limiter class, make it thread-safe, extend it to use Redis for distributed environments, and integrate it into a FastAPI application. By the end, you will have a reusable rate limiting module that you can drop into any Python project.

What Is the Token Bucket Algorithm and Why It Works

The token bucket algorithm models rate limiting using a simple metaphor. Imagine a bucket that can hold a fixed number of tokens. Tokens are added to the bucket at a steady rate -- for example, 10 tokens per second. Every time a request arrives, it must remove a token from the bucket before it can proceed. If the bucket is empty, the request is denied.

This approach gives you two important properties at once. First, it enforces a long-term average rate. If you add 10 tokens per second, then over any sufficiently long period, only 10 requests per second will be allowed through. Second, it permits controlled bursts. If the bucket has been sitting idle and has accumulated its full capacity of tokens, a sudden burst of requests can all be served immediately -- up to the bucket's capacity -- before the rate limit kicks in.

Note

The token bucket algorithm originated in network traffic shaping and appears in the IETF's Integrated Services architecture (RFC 1633). It was originally designed to control bandwidth on network links, but the same principles apply equally well to API request throttling.

This is what makes the token bucket different from a simple fixed window counter. A fixed window approach divides time into discrete intervals and counts requests per interval. The problem is that a client can send the maximum number of requests at the end of one window and again at the start of the next, effectively doubling the allowed rate for a brief period. The token bucket avoids this because tokens are replenished gradually, not all at once.
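The difference is easy to see with a few lines of arithmetic. The sketch below (illustrative names, not part of any library) simulates a bucket with explicit timestamps: a full bucket absorbs a burst immediately, and after that, requests pass only as fast as tokens refill.

```python
CAPACITY, REFILL_RATE = 10.0, 2.0  # 10-token bucket, 2 tokens per second
tokens, last = CAPACITY, 0.0

def try_request(now: float) -> bool:
    """Refill based on elapsed time, then try to take one token."""
    global tokens, last
    tokens = min(CAPACITY, tokens + (now - last) * REFILL_RATE)
    last = now
    if tokens >= 1.0:
        tokens -= 1.0
        return True
    return False

# A full bucket serves a 12-request burst at t=0: 10 allowed, 2 denied.
burst = [try_request(0.0) for _ in range(12)]
print(burst.count(True))   # 10

# One second later only 2 tokens have refilled, so only 2 more pass.
later = [try_request(1.0) for _ in range(5)]
print(later.count(True))   # 2
```

A fixed window counter run through the same two instants could admit up to 20 requests; the gradual refill is what caps the second batch at 2.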

Building a Token Bucket Rate Limiter in Python

The core implementation requires only a few attributes: the bucket's maximum capacity, the rate at which tokens refill, the current number of available tokens, and a timestamp for the last refill. Here is a basic version:

import time
import threading
from dataclasses import dataclass, field
from typing import Tuple


@dataclass
class TokenBucket:
    """Thread-safe token bucket rate limiter."""
    capacity: float
    refill_rate: float  # tokens added per second
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: threading.Lock = field(init=False, repr=False)

    def __post_init__(self):
        self._tokens = self.capacity
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        """Add tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self._last_refill
        tokens_to_add = elapsed * self.refill_rate
        self._tokens = min(self.capacity, self._tokens + tokens_to_add)
        self._last_refill = now

    def consume(self, tokens: float = 1.0) -> Tuple[bool, float]:
        """
        Attempt to consume tokens.
        Returns (allowed, wait_time).
        """
        with self._lock:
            self._refill()
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True, 0.0
            wait_time = (tokens - self._tokens) / self.refill_rate
            return False, wait_time

There are several design choices worth noting in this implementation. The class uses time.monotonic() instead of time.time(). This is important because monotonic() is guaranteed to never go backward, even if the system clock is adjusted. Using time.time() could cause incorrect token calculations if the clock is set back by NTP synchronization or a manual change.

The _refill() method is called lazily -- it only recalculates the token count when someone tries to consume. This means the rate limiter does not need a background timer thread to periodically add tokens. It simply calculates how many tokens should have been added since the last check.

Pro Tip

The consume() method returns both a boolean and a wait time. This lets calling code decide whether to reject the request outright with a 429 status or to queue it and retry after the wait period.

The threading.Lock() makes this safe for multithreaded applications. Without the lock, two threads could both read the token count, both see sufficient tokens, and both proceed -- allowing more requests through than the rate limit permits. This kind of race condition is easy to miss in testing but causes real problems under load.
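You can see the lock doing its job by hammering a condensed copy of the bucket from several threads. Refill is disabled here so the outcome is deterministic: with a capacity of 50, exactly 50 of the 800 attempts succeed, no matter how the threads interleave.

```python
import threading
import time

class TokenBucket:
    """Condensed copy of the class above, for a standalone demo."""
    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._tokens = capacity
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, tokens: float = 1.0) -> bool:
        with self._lock:
            now = time.monotonic()
            self._tokens = min(self.capacity,
                               self._tokens + (now - self._last) * self.refill_rate)
            self._last = now
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

bucket = TokenBucket(capacity=50, refill_rate=0)  # no refill: 50 grants total
allowed = []

def worker():
    for _ in range(100):
        if bucket.consume():
            allowed.append(True)  # list.append is atomic under CPython's GIL

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(allowed))  # 50 -- the lock prevents over-granting
```

Remove the lock and rerun, and the count can creep above 50 under contention, which is exactly the race described above.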

Managing Multiple Clients

A single bucket is rarely enough. In production, you need a separate bucket for each client, identified by IP address, API key, or user ID. Here is a manager class that handles this:

from typing import Dict


class RateLimiter:
    """Manages per-client token buckets."""

    def __init__(
        self,
        capacity: float = 100,
        refill_rate: float = 10,
        cleanup_interval: int = 300,
    ):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.cleanup_interval = cleanup_interval
        self._buckets: Dict[str, TokenBucket] = {}
        self._global_lock = threading.Lock()
        self._last_cleanup = time.monotonic()

    def is_allowed(self, client_id: str, tokens: float = 1.0) -> Tuple[bool, float]:
        """Check if a request from client_id is allowed."""
        bucket = self._get_bucket(client_id)
        return bucket.consume(tokens)

    def _get_bucket(self, client_id: str) -> TokenBucket:
        """Get or create a bucket for a client."""
        with self._global_lock:
            self._maybe_cleanup()
            if client_id not in self._buckets:
                self._buckets[client_id] = TokenBucket(
                    capacity=self.capacity,
                    refill_rate=self.refill_rate,
                )
            return self._buckets[client_id]

    def _maybe_cleanup(self) -> None:
        """Remove idle buckets to prevent unbounded memory growth."""
        now = time.monotonic()
        if now - self._last_cleanup < self.cleanup_interval:
            return
        self._last_cleanup = now
        # Reading _last_refill without the bucket's own lock is acceptable:
        # a stale read only delays removal until the next cleanup pass.
        stale_keys = [
            key for key, bucket in self._buckets.items()
            if now - bucket._last_refill > self.cleanup_interval
        ]
        for key in stale_keys:
            del self._buckets[key]

The _maybe_cleanup() method is critical for long-running services. Without it, every unique client ID creates a bucket that lives in memory forever. Note that checking whether a bucket has refilled to capacity would not work here: _refill() runs lazily, so an idle bucket's token count is frozen at whatever it was after its last request and never reaches capacity on its own. Instead, the cleanup uses the _last_refill timestamp, which is updated on every consume, and removes any bucket that has been idle for longer than the cleanup interval. In a production application that serves thousands of distinct clients, this prevents gradual memory growth.
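A quick single-threaded sketch shows the per-client behavior: each identifier draws from its own bucket, and a cleanup pass prunes anything idle past a TTL. Timestamps are passed explicitly so the run is deterministic; all names here are illustrative, not the article's classes.

```python
CAPACITY, REFILL, IDLE_TTL = 5.0, 1.0, 60.0
buckets = {}  # client_id -> (tokens, last_refill)

def is_allowed(client_id: str, now: float) -> bool:
    tokens, last = buckets.get(client_id, (CAPACITY, now))
    tokens = min(CAPACITY, tokens + (now - last) * REFILL)
    allowed = tokens >= 1.0
    if allowed:
        tokens -= 1.0
    buckets[client_id] = (tokens, now)
    return allowed

def cleanup(now: float) -> None:
    for key in [k for k, (_, last) in buckets.items() if now - last > IDLE_TTL]:
        del buckets[key]

# Two clients draw from independent buckets.
assert is_allowed("alice", now=0.0) and is_allowed("bob", now=0.0)
for _ in range(4):
    is_allowed("alice", now=0.0)
assert not is_allowed("alice", now=0.0)  # alice exhausted her 5 tokens
assert is_allowed("bob", now=0.0)        # bob is unaffected
cleanup(now=120.0)
print(len(buckets))  # 0 -- both clients idle longer than the TTL were pruned
```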

Scaling with Redis for Distributed Rate Limiting

The in-memory approach works well for single-process applications, but it falls apart when you have multiple server instances behind a load balancer. Each instance maintains its own set of buckets, so a client could effectively multiply their rate limit by the number of instances. Redis solves this by providing a shared state store that all instances can read from and write to.

The key challenge with Redis is atomicity. You need to check the token count and decrement it in a single operation. If you do a separate read and write, two instances could both read the same token count and both allow a request through. Lua scripts executed within Redis guarantee atomic operations:

import redis
import time
from typing import Tuple


class RedisTokenBucket:
    """Distributed token bucket using Redis and Lua scripting."""

    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(bucket[1])
    local last_refill = tonumber(bucket[2])

    if tokens == nil then
        tokens = capacity
        last_refill = now
    end

    local elapsed = now - last_refill
    tokens = math.min(capacity, tokens + elapsed * refill_rate)

    local allowed = 0
    local wait_time = 0

    if tokens >= requested then
        tokens = tokens - requested
        allowed = 1
    else
        wait_time = (requested - tokens) / refill_rate
    end

    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 10)

    return {allowed, tostring(wait_time)}
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        capacity: float = 100,
        refill_rate: float = 10,
    ):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._script = self.redis.register_script(self.LUA_SCRIPT)

    def consume(self, key: str, tokens: float = 1.0) -> Tuple[bool, float]:
        """Atomically consume tokens from a distributed bucket."""
        result = self._script(
            keys=[f"ratelimit:{key}"],
            args=[self.capacity, self.refill_rate, time.time(), tokens],
        )
        allowed = bool(int(result[0]))
        wait_time = float(result[1])
        return allowed, wait_time

Warning

The Redis implementation uses time.time() instead of time.monotonic(). This is intentional -- multiple server instances need to agree on the current time, and monotonic() is only meaningful within a single process. Make sure your servers use NTP synchronization to keep their clocks aligned.

The EXPIRE command at the end of the Lua script is an important detail. It sets a TTL (time-to-live) on the Redis key so that inactive clients do not leave stale data in Redis indefinitely. The TTL is calculated as the time it would take to fully refill the bucket, plus a small buffer.
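With the defaults used in this class (capacity 100, refill rate 10 per second), the TTL works out like this:

```python
import math

capacity, refill_rate = 100, 10
ttl = math.ceil(capacity / refill_rate) + 10  # full refill time plus a buffer
print(ttl)  # 20 -- the key expires 20 seconds after the client's last request
```

If a client stays away longer than that, its bucket would be full anyway, so recreating it from scratch on the next request loses nothing.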

Integrating the Rate Limiter with FastAPI

With the rate limiter built, the next step is wiring it into a web framework. FastAPI's dependency injection system makes this clean and testable. You can create a dependency that checks the rate limit before the endpoint function ever runs:

from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse


app = FastAPI()
limiter = RateLimiter(capacity=20, refill_rate=5)


def get_client_id(request: Request) -> str:
    """Extract a client identifier from the request."""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host


async def check_rate_limit(request: Request):
    """FastAPI dependency that enforces rate limiting."""
    client_id = get_client_id(request)
    allowed, wait_time = limiter.is_allowed(client_id)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "retry_after": round(wait_time, 2),
            },
            headers={"Retry-After": str(int(wait_time) + 1)},
        )


@app.get("/api/data", dependencies=[Depends(check_rate_limit)])
async def get_data():
    return {"message": "Request successful"}


@app.get("/api/heavy", dependencies=[Depends(check_rate_limit)])
async def heavy_operation():
    return {"message": "Heavy operation complete"}

The get_client_id() function checks the X-Forwarded-For header first because if your API sits behind a reverse proxy or load balancer, request.client.host will return the proxy's IP address rather than the actual client's IP. The first address in the X-Forwarded-For chain represents the original client, but keep in mind that the header is client-supplied: only trust it if your proxy strips or overwrites any value the client sends, otherwise an attacker can spoof addresses to evade the rate limit.

The Retry-After header in the 429 response tells well-behaved clients exactly how long to wait before retrying. The 429 status code comes from RFC 6585, and the Retry-After header is defined in the core HTTP semantics specification (RFC 9110); together they help clients implement backoff strategies without guessing.
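On the client side, a helper like the hypothetical retry_delay() below prefers the server's hint and falls back to exponential backoff when no Retry-After header is present. Only the delay-seconds form of the header is handled here; HTTP also permits an HTTP-date value.

```python
def retry_delay(headers: dict, attempt: int, base: float = 0.5) -> float:
    """Prefer the server's Retry-After hint; otherwise back off exponentially."""
    value = headers.get("Retry-After")
    if value is not None and value.isdigit():
        return float(value)
    return base * (2 ** attempt)  # 0.5s, 1s, 2s, ...

print(retry_delay({"Retry-After": "3"}, attempt=0))  # 3.0
print(retry_delay({}, attempt=2))                    # 2.0
```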

Pro Tip

For production deployments, consider adding X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset headers to every response -- not just 429 responses. This gives clients visibility into their current usage and helps them self-regulate before hitting the limit.
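A helper along these lines could build that header set from the bucket's state. The exact semantics of X-RateLimit-Reset vary across APIs (seconds-until-reset versus an epoch timestamp); this sketch assumes seconds until the bucket is full again.

```python
import math

def rate_limit_headers(limit: float, remaining: float, refill_rate: float) -> dict:
    """Hypothetical helper: advertise the client's current budget."""
    missing = limit - remaining
    reset_in = math.ceil(missing / refill_rate) if missing > 0 else 0
    return {
        "X-RateLimit-Limit": str(int(limit)),
        "X-RateLimit-Remaining": str(int(remaining)),
        "X-RateLimit-Reset": str(reset_in),  # seconds until fully refilled
    }

print(rate_limit_headers(limit=20, remaining=14, refill_rate=5))
# {'X-RateLimit-Limit': '20', 'X-RateLimit-Remaining': '14', 'X-RateLimit-Reset': '2'}
```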

Comparing Rate Limiting Algorithms

The token bucket is not the only option. Choosing the right algorithm depends on your specific requirements around burst tolerance, accuracy, and implementation complexity. Here is how the common approaches compare:

| Algorithm      | Burst Handling                                  | Accuracy                               | Complexity | Best For                                        |
|----------------|-------------------------------------------------|----------------------------------------|------------|-------------------------------------------------|
| Token Bucket   | Allows controlled bursts up to bucket capacity  | Good long-term average enforcement     | Low        | APIs where short bursts are acceptable          |
| Leaky Bucket   | No bursts -- processes at a fixed constant rate | Very consistent output rate            | Low        | Smoothing bursty traffic into a steady stream   |
| Fixed Window   | Allows double-rate bursts at window edges       | Imprecise at boundary crossings        | Very low   | Simple counters where precision is not critical |
| Sliding Window | Minimal bursts, smooth enforcement              | High precision across time boundaries  | Medium     | Strict rate enforcement without boundary issues |

For the majority of API rate limiting use cases, the token bucket hits the right balance. It is simple to implement, performs well, uses minimal memory (just a counter and a timestamp per client), and handles real-world traffic patterns where legitimate users send short bursts of requests -- like loading a dashboard that fires several API calls at once.

The leaky bucket is the better choice when you need a perfectly smooth output rate, such as when you are feeding data to a downstream service that cannot handle any bursts at all. The sliding window counter is ideal when you need strict precision and cannot tolerate the boundary-crossing issues that come with fixed windows.
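For contrast, the leaky bucket's queue form can be sketched in a few lines (illustrative names, deterministic for clarity): excess arrivals are rejected outright, and whatever is queued drains at a perfectly steady rate, which is exactly the smoothing behavior described above.

```python
from collections import deque

QUEUE_SIZE, DRAIN_RATE = 3, 1.0  # hold 3 pending requests, serve 1 per second
queue = deque()

def arrive(request_id: int) -> bool:
    """Accept a request into the bounded queue, or reject it immediately."""
    if len(queue) < QUEUE_SIZE:
        queue.append(request_id)
        return True
    return False

# A burst of 5 at t=0: 3 are queued, 2 rejected outright.
accepted = [arrive(i) for i in range(5)]
print(accepted)  # [True, True, True, False, False]

# The server drains one request per second regardless of the burst,
# so queued requests are served at t=0, t=1, t=2 -- a steady stream.
served_at = [(queue.popleft(), t) for t in range(len(queue))]
print(served_at)  # [(0, 0), (1, 1), (2, 2)]
```

Where a token bucket would let all queued-and-admitted requests hit the backend at once, the leaky bucket releases them one per interval.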

Existing Python Libraries

If you prefer not to build from scratch, several mature Python libraries implement these algorithms. The pyrate-limiter package supports the leaky bucket algorithm with both in-memory and Redis backends, along with SQLite for persistence. The requests-ratelimiter library wraps pyrate-limiter and integrates directly with the requests library as a session or transport adapter. For async workloads, aiolimiter provides an asyncio-native leaky bucket, and throttled-py offers both synchronous and asynchronous support with multiple algorithm choices including fixed window, sliding window, token bucket, leaky bucket, and GCRA.

[Diagram] Token bucket request flow -- tokens refill at a constant rate, each request consumes one token, and empty buckets trigger a 429 response with a Retry-After header.

Key Takeaways

  1. The token bucket balances burst tolerance with steady-rate enforcement: Unlike fixed window counters that reset abruptly, the token bucket refills gradually. This prevents the boundary-crossing problem while still allowing legitimate short bursts of traffic.
  2. Thread safety requires explicit locking: Wrap your token check-and-decrement logic in a threading.Lock() to prevent race conditions. Without it, concurrent requests can exceed the intended rate limit.
  3. Use Redis and Lua scripts for distributed deployments: In-memory buckets only work within a single process. When you have multiple server instances, Redis provides shared state and Lua scripts ensure atomic check-and-decrement operations.
  4. Always include rate limit headers in your API responses: The Retry-After, X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset headers help clients self-regulate and avoid hitting your limits in the first place.
  5. Clean up stale buckets to prevent memory leaks: Every unique client ID creates a bucket. Without periodic cleanup of idle buckets, a long-running service will gradually consume more and more memory.

Rate limiting is one of those features that seems optional until something goes wrong. A runaway script, a bot swarm, or a sudden traffic spike can take down an unprotected API in seconds. The token bucket algorithm gives you a robust, well-understood defense that is straightforward to implement in Python, whether you are building a small personal project or a distributed service handling thousands of requests per second.
