Python Decorator Patterns for Logging, Caching, and Rate Limiting in Production Code

Decorators solve one of the oldest problems in software engineering: how to add cross-cutting behavior to functions without scattering duplicate code across an entire codebase. In production Python, three decorator patterns appear over and over again—structured logging, result caching, and rate limiting. Each one addresses a different operational concern, but all three follow the same underlying principle of wrapping a callable to inject behavior before, after, or around the original execution. This article builds each pattern from scratch with complete, annotated code you can drop into a real project.

Every decorator in this article uses functools.wraps to preserve the wrapped function's __name__, __doc__, __qualname__, and __annotations__. Skipping functools.wraps means your debugger, your logging output, your documentation generators, and your test runners all report the wrapper function's identity instead of the original. It costs one line to fix, and there is no good reason to omit it.

Before looking at domain-specific patterns, here is the minimal decorator skeleton that every example in this article builds on:

import functools

def skeleton(func):
    """Minimal decorator skeleton preserving metadata."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # pre-call logic
        result = func(*args, **kwargs)
        # post-call logic
        return result
    return wrapper

The *args, **kwargs signature ensures the wrapper accepts any combination of positional and keyword arguments, making it safe to apply to functions with varying parameter lists. The @functools.wraps(func) line copies the original function's metadata onto the wrapper so that introspection tools continue to see the correct name, docstring, and type annotations.
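To see concretely what functools.wraps buys you, here is a quick comparison of two otherwise identical wrappers (without_wraps and with_wraps are throwaway names for this demonstration):

```python
import functools

def without_wraps(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def with_wraps(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@without_wraps
def broken():
    """Docstring lost to the wrapper."""

@with_wraps
def intact():
    """Docstring preserved."""

print(broken.__name__)   # wrapper -- introspection sees the wrapper
print(intact.__name__)   # intact  -- metadata copied onto the wrapper
print(intact.__doc__)    # Docstring preserved.
```

functools.wraps also sets a __wrapped__ attribute on the wrapper, which is how tools like inspect.unwrap and test helpers reach the original function.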

Structured Logging Decorators

A logging decorator captures function entry, exit, execution time, arguments, return values, and exceptions in a consistent format across every decorated function. Instead of manually adding logger.info() calls at the top and bottom of each function, the decorator handles it in one place.

Basic Logging Decorator with the Standard Library

The following decorator uses Python's built-in logging module and time.perf_counter for high-resolution timing:

import functools
import logging
import time

logger = logging.getLogger(__name__)

def log_calls(func):
    """Log function entry, exit, duration, and exceptions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__qualname__
        logger.info(
            "call_start: %s | args=%r kwargs=%r",
            func_name, args, kwargs,
        )
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            logger.info(
                "call_end: %s | duration=%.4fs result=%r",
                func_name, elapsed, result,
            )
            return result
        except Exception as exc:
            elapsed = time.perf_counter() - start
            logger.exception(
                "call_error: %s | duration=%.4fs error=%s",
                func_name, elapsed, exc,
            )
            raise
    return wrapper


@log_calls
def fetch_user(user_id: int) -> dict:
    """Retrieve a user record by ID."""
    # simulate database lookup
    time.sleep(0.05)
    return {"id": user_id, "name": "Ada Lovelace"}

When fetch_user(42) runs, the logger emits two entries: one at function entry with the arguments, and one at exit with the return value and elapsed wall-clock time. If the function raises, logger.exception captures the full traceback alongside the timing data. This gives operators a single, consistent log trail for every instrumented function without touching the function's own logic.

Parameterized Logging Decorator

A fixed logging decorator works until you need to control the log level or exclude sensitive arguments from the output. A parameterized decorator—sometimes called a decorator factory—solves this by adding an outer function that accepts configuration:

import functools
import inspect
import logging
import time

logger = logging.getLogger(__name__)

def log_calls(level=logging.INFO, exclude_args=None):
    """Decorator factory for configurable function call logging.

    Args:
        level: Logging level for entry/exit messages.
        exclude_args: Set of argument names to redact from log output.
    """
    exclude = exclude_args or set()

    def decorator(func):
        # Resolve the signature once at decoration time, not on every call
        sig = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            safe_args = {
                k: "***REDACTED***" if k in exclude else v
                for k, v in bound.arguments.items()
            }

            func_name = func.__qualname__
            logger.log(level, "call_start: %s | %s", func_name, safe_args)
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                elapsed = time.perf_counter() - start
                logger.log(
                    level,
                    "call_end: %s | duration=%.4fs",
                    func_name, elapsed,
                )
                return result
            except Exception as exc:
                elapsed = time.perf_counter() - start
                logger.exception(
                    "call_error: %s | duration=%.4fs error=%s",
                    func_name, elapsed, exc,
                )
                raise
        return wrapper
    return decorator


@log_calls(level=logging.DEBUG, exclude_args={"password"})
def authenticate(username: str, password: str) -> bool:
    """Validate user credentials against the auth backend."""
    return username == "admin" and password == "s3cret"

Calling authenticate("admin", "s3cret") produces a log entry where the password field appears as ***REDACTED*** instead of the plaintext value. The inspect.signature call binds positional and keyword arguments to their parameter names so you can filter by name regardless of how the caller passed them in.
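The normalization step is easy to verify in isolation. Regardless of whether an argument arrives positionally or by keyword, sig.bind maps it to the same parameter name (the authenticate stub below just mirrors the signature from the example above):

```python
import inspect

def authenticate(username: str, password: str) -> bool:
    return username == "admin" and password == "s3cret"

sig = inspect.signature(authenticate)

positional = sig.bind("admin", "s3cret")
keyword = sig.bind(username="admin", password="s3cret")
positional.apply_defaults()
keyword.apply_defaults()

# Both call styles produce the same name-to-value mapping,
# so redaction by parameter name works either way.
print(dict(positional.arguments))  # {'username': 'admin', 'password': 's3cret'}
print(positional.arguments == keyword.arguments)  # True
```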

Pro Tip

In production systems that emit JSON logs, consider replacing logging with structlog. Structlog's bound loggers let you attach context variables (request IDs, user IDs, trace spans) that automatically appear in every log entry emitted during a request lifecycle, including entries from your logging decorator. The library has been production-stable since 2013 and supports asyncio, context variables, and full type hint coverage as of version 25.x.

Caching Decorators with TTL

Caching decorators store the return value of a function call and serve that stored value on subsequent calls with the same arguments. This eliminates redundant computation or I/O for expensive operations like database queries, API requests, or CPU-intensive calculations.

Using functools.lru_cache

Python's standard library includes functools.lru_cache, a decorator that maintains a Least Recently Used cache with a configurable maximum size. It requires all function arguments to be hashable because it stores them as dictionary keys internally:

import functools

@functools.lru_cache(maxsize=256)
def fibonacci(n: int) -> int:
    """Compute the nth Fibonacci number with memoization."""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# After multiple calls, inspect cache performance
print(fibonacci(100))
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=256, currsize=101)

The cache_info() method returns a named tuple with hits, misses, maxsize, and currsize, which gives you direct visibility into cache effectiveness. The cache_clear() method flushes all entries, and the __wrapped__ attribute gives access to the original uncached function for testing or bypass scenarios.
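A short session shows all three hooks in action; the counts assume a fresh cache:

```python
import functools

@functools.lru_cache(maxsize=256)
def square(n: int) -> int:
    return n * n

square(4); square(4); square(5)          # miss, hit, miss
info = square.cache_info()
print(info.hits, info.misses, info.currsize)   # 1 2 2

square.cache_clear()
print(square.cache_info().currsize)            # 0

# __wrapped__ bypasses the cache entirely
print(square.__wrapped__(4))                   # 16
```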

Note

lru_cache has been thread-safe since it was added in Python 3.2: the underlying data structure uses a lock to stay coherent during concurrent updates. However, the wrapped function itself may be called more than once for the same arguments if a second thread makes a call before the first thread's result is cached.

Custom Caching Decorator with TTL Expiration

The limitation of lru_cache is that entries never expire. In production, you often need cached results to go stale after a fixed time-to-live (TTL) so the function re-executes and fetches fresh data. The following decorator adds TTL-based expiration on top of a dictionary cache:

import functools
import time

def cache_with_ttl(ttl_seconds: float = 300.0, maxsize: int = 1024):
    """Cache function results with TTL-based expiration.

    Args:
        ttl_seconds: Time-to-live for each cached entry in seconds.
        maxsize: Maximum number of entries before evicting the oldest.
    """
    def decorator(func):
        cache = {}
        cache_order = []  # tracks insertion order for eviction

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a hashable cache key from arguments
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()

            # Check for valid cached entry
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    return result
                # Expired: remove stale entry
                del cache[key]
                cache_order.remove(key)

            # Evict oldest entries if at capacity
            while len(cache) >= maxsize and cache_order:
                oldest_key = cache_order.pop(0)
                cache.pop(oldest_key, None)

            # Call the function and store the result
            result = func(*args, **kwargs)
            cache[key] = (result, now)
            cache_order.append(key)
            return result

        def cache_clear():
            """Flush all cached entries."""
            cache.clear()
            cache_order.clear()

        def cache_info():
            """Return the current number of cached entries."""
            return {"size": len(cache), "maxsize": maxsize, "ttl": ttl_seconds}

        wrapper.cache_clear = cache_clear
        wrapper.cache_info = cache_info
        return wrapper
    return decorator


@cache_with_ttl(ttl_seconds=60.0, maxsize=512)
def get_exchange_rate(base: str, target: str) -> float:
    """Fetch the current exchange rate from an external API."""
    # In production, this would call an HTTP endpoint
    import random
    return round(random.uniform(0.8, 1.2), 4)

This decorator uses time.monotonic() instead of time.time() because monotonic clocks are immune to system clock adjustments (NTP syncs, daylight saving changes, manual corrections). The maxsize parameter prevents unbounded memory growth in long-running processes by evicting the oldest entries first. The attached cache_clear() and cache_info() methods follow the same API pattern established by lru_cache, making the decorator a drop-in replacement with added TTL capability.
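One subtlety of this key scheme is worth noting: positional and keyword forms of the same call produce different keys, so f(1, 2) and f(1, b=2) are cached as separate entries. Pulling the key-building expression out into a standalone helper (make_key is a name used only for this illustration) makes the behavior easy to check:

```python
def make_key(*args, **kwargs):
    # Same expression as used inside the wrapper above
    return (args, tuple(sorted(kwargs.items())))

print(make_key(1, 2))     # ((1, 2), ())
print(make_key(1, b=2))   # ((1,), (('b', 2),))
print(make_key(1, 2) == make_key(1, b=2))   # False
```

If your callers mix call styles, you can normalize arguments with inspect.signature before building the key, at the cost of extra per-call work.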

When to Use Each Caching Approach

functools.lru_cache — best for pure computations, recursive algorithms, and immutable lookups. Limitations: no TTL, requires hashable arguments, in-process only.
Custom TTL cache — best for API responses, database queries, and config lookups. Limitations: in-process only, not thread-safe without locks.
Redis-backed cache — best for distributed systems and shared state across workers. Limitations: network latency, serialization overhead, external dependency.

Rate Limiting Decorators

Rate limiting controls how frequently a function can be called within a given time window. This is critical for production code that interacts with external APIs (which enforce their own rate limits), protects internal services from traffic spikes, or throttles resource-intensive operations to prevent system overload.

Token Bucket Rate Limiter

The token bucket algorithm is one of the most widely used rate limiting strategies. A bucket starts with a fixed number of tokens. Each function call consumes one token. Tokens refill at a constant rate up to the bucket's capacity. If no tokens are available when a call arrives, the call either blocks, raises an exception, or returns a default value depending on the configuration.

import functools
import time
import threading

class RateLimitExceeded(Exception):
    """Raised when a rate-limited function is called too frequently."""
    def __init__(self, retry_after: float):
        self.retry_after = retry_after
        super().__init__(
            f"Rate limit exceeded. Retry after {retry_after:.2f}s"
        )

def rate_limit(calls_per_second: float = 10.0, burst: int | None = None):
    """Token bucket rate limiting decorator.

    Args:
        calls_per_second: Sustained rate of allowed calls per second.
        burst: Maximum burst capacity. Defaults to calls_per_second.
    """
    # max(1, ...) guarantees at least one token even for fractional rates
    capacity = burst if burst is not None else max(1, int(calls_per_second))

    def decorator(func):
        tokens = float(capacity)  # track fractional tokens for smooth refill
        last_refill = time.monotonic()
        lock = threading.Lock()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal tokens, last_refill

            with lock:
                now = time.monotonic()
                elapsed = now - last_refill
                # Refill tokens based on elapsed time
                tokens_to_add = elapsed * calls_per_second
                tokens = min(capacity, tokens + tokens_to_add)
                last_refill = now

                if tokens >= 1.0:
                    tokens -= 1.0
                else:
                    # Calculate wait time until one token is available
                    deficit = 1.0 - tokens
                    retry_after = deficit / calls_per_second
                    raise RateLimitExceeded(retry_after)

            return func(*args, **kwargs)

        def _reset():
            """Refill the bucket to full capacity (useful in tests)."""
            nonlocal tokens, last_refill
            with lock:
                tokens = float(capacity)
                last_refill = time.monotonic()

        wrapper.reset = _reset
        return wrapper
    return decorator


@rate_limit(calls_per_second=5.0, burst=10)
def call_external_api(endpoint: str) -> dict:
    """Send a request to an external API endpoint."""
    return {"status": "ok", "endpoint": endpoint}

This implementation is thread-safe because the token check and decrement happen inside a threading.Lock. The time.monotonic() call ensures consistent timing even if the system clock changes. The burst parameter allows a short burst of rapid calls (up to the bucket capacity) followed by steady-state throttling at the configured rate.

Blocking Rate Limiter

The previous decorator raises an exception when the limit is exceeded. In some cases, you want the call to block and wait until a token becomes available. This is useful for background workers or batch processors that should slow down rather than fail:

import functools
import time
import threading

def rate_limit_blocking(calls_per_second: float = 10.0, burst: int | None = None):
    """Blocking token bucket rate limiter.

    Instead of raising an exception, sleeps until a token is available.
    """
    capacity = burst if burst is not None else max(1, int(calls_per_second))

    def decorator(func):
        tokens = float(capacity)
        last_refill = time.monotonic()
        lock = threading.Lock()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal tokens, last_refill

            while True:
                with lock:
                    now = time.monotonic()
                    elapsed = now - last_refill
                    tokens_to_add = elapsed * calls_per_second
                    tokens = min(capacity, tokens + tokens_to_add)
                    last_refill = now

                    if tokens >= 1.0:
                        tokens -= 1.0
                        break
                    else:
                        deficit = 1.0 - tokens
                        wait_time = deficit / calls_per_second

                time.sleep(wait_time)

            return func(*args, **kwargs)

        return wrapper
    return decorator


@rate_limit_blocking(calls_per_second=2.0, burst=5)
def sync_record(record_id: int) -> None:
    """Synchronize a record to the remote data warehouse."""
    print(f"Syncing record {record_id}")

The while True loop re-checks the token count after sleeping, which handles the edge case where multiple threads are competing for tokens. The sleep duration is calculated precisely based on the token deficit and refill rate, so the function resumes as soon as a token is mathematically available rather than waiting an arbitrary fixed interval.

Warning

Blocking rate limiters using time.sleep() should not be used in async code. The sleep call blocks the entire event loop. For asyncio-based applications, replace time.sleep() with await asyncio.sleep() and use an asyncio.Lock instead of threading.Lock.

Per-Key Rate Limiting

In web applications and API gateways, you typically rate limit per user, per IP address, or per API key rather than globally. The following decorator accepts a key_func parameter that extracts the limiting key from the function's arguments:

import functools
import time
import threading
from collections import defaultdict

class RateLimitExceeded(Exception):
    def __init__(self, retry_after: float, key: str):
        self.retry_after = retry_after
        self.key = key
        super().__init__(
            f"Rate limit exceeded for '{key}'. "
            f"Retry after {retry_after:.2f}s"
        )

def per_key_rate_limit(
    calls_per_second: float = 5.0,
    burst: int | None = None,
    key_func=None,
):
    """Per-key token bucket rate limiter.

    Args:
        calls_per_second: Sustained rate per key.
        burst: Burst capacity per key.
        key_func: Callable that extracts the rate limit key from
                  the function's arguments. Receives (*args, **kwargs).
    """
    capacity = burst if burst is not None else max(1, int(calls_per_second))
    if key_func is None:
        key_func = lambda *a, **kw: "global"

    def decorator(func):
        buckets = defaultdict(lambda: {
            "tokens": float(capacity),
            "last_refill": time.monotonic(),
        })
        lock = threading.Lock()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = key_func(*args, **kwargs)

            with lock:
                bucket = buckets[key]
                now = time.monotonic()
                elapsed = now - bucket["last_refill"]
                bucket["tokens"] = min(
                    capacity,
                    bucket["tokens"] + elapsed * calls_per_second,
                )
                bucket["last_refill"] = now

                if bucket["tokens"] >= 1.0:
                    bucket["tokens"] -= 1.0
                else:
                    deficit = 1.0 - bucket["tokens"]
                    retry_after = deficit / calls_per_second
                    raise RateLimitExceeded(retry_after, key)

            return func(*args, **kwargs)

        return wrapper
    return decorator


@per_key_rate_limit(
    calls_per_second=10.0,
    burst=20,
    key_func=lambda user_id, *a, **kw: str(user_id),
)
def process_request(user_id: int, payload: dict) -> dict:
    """Handle an incoming API request for a given user."""
    return {"user_id": user_id, "status": "processed"}

Each unique key gets its own independent token bucket. User 42 can exhaust their quota without affecting user 99. The defaultdict with a lambda factory creates new buckets on first access, so there is no need to pre-register keys. In a production deployment, you would periodically prune stale buckets from the dictionary to prevent memory growth from one-time users.
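A pruning pass could look like the sketch below. prune_stale is a hypothetical helper, written against a standalone buckets dict because the real one lives inside the decorator's closure; in practice you would expose it through a wrapper attribute the same way reset and cache_clear were attached earlier:

```python
import threading
import time

def prune_stale(buckets: dict, lock: threading.Lock,
                max_idle_seconds: float = 3600.0) -> int:
    """Remove buckets whose last refill is older than max_idle_seconds."""
    cutoff = time.monotonic() - max_idle_seconds
    with lock:
        stale = [key for key, bucket in buckets.items()
                 if bucket["last_refill"] < cutoff]
        for key in stale:
            del buckets[key]
    return len(stale)

# Demo with two buckets: one fresh, one idle for two hours
now = time.monotonic()
buckets = {
    "user:42": {"tokens": 3.0, "last_refill": now},
    "user:99": {"tokens": 5.0, "last_refill": now - 7200.0},
}
removed = prune_stale(buckets, threading.Lock())
print(removed, sorted(buckets))   # 1 ['user:42']
```

Running the pass from a background thread or a periodic task keeps the bucket dictionary bounded without affecting active keys.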

Stacking Decorators in Production

Python applies stacked decorators from bottom to top. The decorator closest to the function definition wraps first, and the outermost decorator wraps last, which means it executes first. Choosing the right stacking order determines what gets logged, what gets cached, and what gets rate-limited:

@log_calls                              # 3rd applied, 1st to execute
@cache_with_ttl(ttl_seconds=120.0)      # 2nd applied, 2nd to execute
@rate_limit(calls_per_second=5.0)       # 1st applied, 3rd to execute
def get_weather(city: str) -> dict:
    """Fetch current weather data for a city."""
    # This function only runs on cache misses that pass rate limiting
    return external_weather_api(city)

With this stacking order, the execution flow works like this: the logging decorator fires first and records every call attempt including cache hits. The caching decorator fires second and returns the cached result if one exists, skipping both the rate limiter and the underlying function. If the cache misses, the rate limiter fires third and either allows the function to execute or raises RateLimitExceeded. This means cache hits are fast and unlimited, while cache misses are throttled to protect the external API.

If you instead placed @rate_limit above @cache_with_ttl, every call—including cache hits—would count against the rate limit. That wastes tokens on operations that never touch the external service.
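The application-versus-execution order is easy to verify with a set of toy tagging decorators, each of which prepends its label before delegating inward:

```python
import functools

def tag(label):
    """Toy decorator that records its label, outermost first."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return [label] + func(*args, **kwargs)
        return wrapper
    return decorator

@tag("outer")
@tag("middle")
@tag("inner")
def base():
    return ["base"]

# The outermost decorator executes first, so its label comes first
print(base())  # ['outer', 'middle', 'inner', 'base']
```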

Combining Logging and Caching Metrics

A decorator that merges caching and logging into a single wrapper can emit cache hit/miss metrics alongside standard call data. This reduces decorator stacking overhead and gives you correlated metrics in a single log line:

import functools
import logging
import time

logger = logging.getLogger(__name__)

def logged_cache(ttl_seconds: float = 300.0):
    """Combined logging + TTL caching decorator."""
    def decorator(func):
        cache = {}
        hits = 0
        misses = 0

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal hits, misses
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()

            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    hits += 1
                    logger.debug(
                        "cache_hit: %s | hits=%d misses=%d",
                        func.__qualname__, hits, misses,
                    )
                    return result
                del cache[key]

            misses += 1
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start

            cache[key] = (result, now)
            logger.info(
                "cache_miss: %s | duration=%.4fs hits=%d misses=%d ratio=%.2f",
                func.__qualname__, elapsed, hits, misses,
                hits / (hits + misses) if (hits + misses) > 0 else 0.0,
            )
            return result

        wrapper.cache_clear = lambda: cache.clear()
        wrapper.cache_stats = lambda: {
            "hits": hits, "misses": misses, "size": len(cache),
        }
        return wrapper
    return decorator


@logged_cache(ttl_seconds=60.0)
def resolve_dns(hostname: str) -> str:
    """Resolve a hostname to an IP address."""
    import socket
    return socket.gethostbyname(hostname)

The cache_stats() method exposes the cumulative hit/miss ratio, which you can scrape into a monitoring system like Prometheus or export to structured log aggregators. Cache hits log at DEBUG level to reduce noise, while cache misses log at INFO with the full execution duration and running hit ratio.

Pro Tip

When stacking decorators, always verify that each decorator correctly uses functools.wraps. If even one decorator in the stack omits it, the outermost decorator will copy the wrong metadata. You can verify by checking your_function.__name__ and your_function.__qualname__ after all decorators are applied.

Key Takeaways

  1. Always use functools.wraps: Every decorator in this article applies @functools.wraps(func) to preserve the original function's name, docstring, qualified name, and type annotations. Omitting it breaks debuggers, logging output, documentation generators, and introspection tools.
  2. Use time.monotonic() for timing and TTL: The monotonic clock is immune to system clock adjustments. Use time.perf_counter() for high-resolution benchmarking within a single call, and time.monotonic() for TTL calculations that span multiple calls over longer durations.
  3. Choose the right caching strategy for the problem: functools.lru_cache handles pure computations with hashable arguments. A custom TTL cache handles data that needs to expire. A Redis-backed cache handles distributed state across multiple workers or processes.
  4. Thread safety is not optional in production: Both the caching and rate limiting decorators in this article use threading.Lock to protect shared mutable state. In async codebases, replace these with asyncio.Lock and await asyncio.sleep() to avoid blocking the event loop.
  5. Stacking order matters: Place logging outermost (executes first) so it captures every call including cache hits. Place caching in the middle so it short-circuits before rate limiting. Place rate limiting innermost so it only fires on cache misses that reach the protected function.

These three decorator patterns—structured logging, TTL caching, and token bucket rate limiting—address the operational concerns that appear in nearly every production Python application. Each pattern starts with the same foundational structure (a wrapper function, functools.wraps, *args/**kwargs forwarding) and extends it with domain-specific logic. Once you internalize the skeleton, building new decorators for retry logic, circuit breaking, input validation, or metric collection follows the same workflow: wrap, instrument, return.