Decorators solve one of the oldest problems in software engineering: how to add cross-cutting behavior to functions without scattering duplicate code across an entire codebase. In production Python, three decorator patterns appear over and over again—structured logging, result caching, and rate limiting. Each one addresses a different operational concern, but all three follow the same underlying principle of wrapping a callable to inject behavior before, after, or around the original execution. This article builds each pattern from scratch with complete, annotated code you can drop into a real project.
Every decorator in this article uses functools.wraps to preserve the wrapped function's __name__, __doc__, __qualname__, and __annotations__. Skipping functools.wraps means your debugger, your logging output, your documentation generators, and your test runners all report the wrapper function's identity instead of the original. It costs one line to fix, and there is no good reason to omit it.
Before looking at domain-specific patterns, here is the minimal decorator skeleton that every example in this article builds on:
import functools

def skeleton(func):
    """Minimal decorator skeleton preserving metadata."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # pre-call logic
        result = func(*args, **kwargs)
        # post-call logic
        return result
    return wrapper
The *args, **kwargs signature ensures the wrapper accepts any combination of positional and keyword arguments, making it safe to apply to functions with varying parameter lists. The @functools.wraps(func) line copies the original function's metadata onto the wrapper so that introspection tools continue to see the correct name, docstring, and type annotations.
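The effect of functools.wraps is easy to verify. This short check (the decorator and function names are illustrative) compares a wrapper built with functools.wraps against one without it:

```python
import functools

def with_wraps(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def without_wraps(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@with_wraps
def greet_a():
    """Say hello."""

@without_wraps
def greet_b():
    """Say hello."""

print(greet_a.__name__, greet_a.__doc__)  # greet_a Say hello.
print(greet_b.__name__, greet_b.__doc__)  # wrapper None
```

The second function now reports the wrapper's identity everywhere: tracebacks, help(), and test discovery all see "wrapper" instead of "greet_b".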
Structured Logging Decorators
A logging decorator captures function entry, exit, execution time, arguments, return values, and exceptions in a consistent format across every decorated function. Instead of manually adding logger.info() calls at the top and bottom of each function, the decorator handles it in one place.
Basic Logging Decorator with the Standard Library
The following decorator uses Python's built-in logging module and time.perf_counter for high-resolution timing:
import functools
import logging
import time

logger = logging.getLogger(__name__)

def log_calls(func):
    """Log function entry, exit, duration, and exceptions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__qualname__
        logger.info(
            "call_start: %s | args=%r kwargs=%r",
            func_name, args, kwargs,
        )
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            logger.info(
                "call_end: %s | duration=%.4fs result=%r",
                func_name, elapsed, result,
            )
            return result
        except Exception as exc:
            elapsed = time.perf_counter() - start
            logger.exception(
                "call_error: %s | duration=%.4fs error=%s",
                func_name, elapsed, exc,
            )
            raise
    return wrapper

@log_calls
def fetch_user(user_id: int) -> dict:
    """Retrieve a user record by ID."""
    # simulate database lookup
    time.sleep(0.05)
    return {"id": user_id, "name": "Ada Lovelace"}
When fetch_user(42) runs, the logger emits two entries: one at function entry with the arguments, and one at exit with the return value and elapsed wall-clock time. If the function raises, logger.exception captures the full traceback alongside the timing data. This gives operators a single, consistent log trail for every instrumented function without touching the function's own logic.
Parameterized Logging Decorator
A fixed logging decorator works until you need to control the log level or exclude sensitive arguments from the output. A parameterized decorator—sometimes called a decorator factory—solves this by adding an outer function that accepts configuration:
import functools
import inspect
import logging
import time

logger = logging.getLogger(__name__)

def log_calls(level=logging.INFO, exclude_args=None):
    """Decorator factory for configurable function call logging.

    Args:
        level: Logging level for entry/exit messages.
        exclude_args: Set of argument names to redact from log output.
    """
    exclude = exclude_args or set()
    def decorator(func):
        # Resolve the signature once at decoration time, not on every call
        sig = inspect.signature(func)
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            safe_args = {
                k: "***REDACTED***" if k in exclude else v
                for k, v in bound.arguments.items()
            }
            func_name = func.__qualname__
            logger.log(level, "call_start: %s | %s", func_name, safe_args)
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                elapsed = time.perf_counter() - start
                logger.log(
                    level,
                    "call_end: %s | duration=%.4fs",
                    func_name, elapsed,
                )
                return result
            except Exception as exc:
                elapsed = time.perf_counter() - start
                logger.exception(
                    "call_error: %s | duration=%.4fs error=%s",
                    func_name, elapsed, exc,
                )
                raise
        return wrapper
    return decorator

@log_calls(level=logging.DEBUG, exclude_args={"password"})
def authenticate(username: str, password: str) -> bool:
    """Validate user credentials against the auth backend."""
    return username == "admin" and password == "s3cret"
Calling authenticate("admin", "s3cret") produces a log entry where the password field appears as ***REDACTED*** instead of the plaintext value. The inspect.signature call binds positional and keyword arguments to their parameter names so you can filter by name regardless of how the caller passed them in.
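The binding step is worth seeing in isolation. A small sketch (the function here is a stand-in, not the decorated one above) shows how sig.bind normalizes positional and keyword spellings into one name-to-value mapping:

```python
import inspect

def authenticate(username: str, password: str, mfa: bool = False) -> bool:
    return True

sig = inspect.signature(authenticate)

# Positional and keyword spellings bind to the same parameter names
bound1 = sig.bind("admin", "s3cret")
bound1.apply_defaults()
bound2 = sig.bind(username="admin", password="s3cret")
bound2.apply_defaults()

print(dict(bound1.arguments))
# {'username': 'admin', 'password': 's3cret', 'mfa': False}
assert dict(bound1.arguments) == dict(bound2.arguments)
```

Because both call styles produce the same bound.arguments mapping, the redaction filter works by parameter name no matter how the caller spelled the arguments.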
In production systems that emit JSON logs, consider replacing logging with structlog. Structlog's bound loggers let you attach context variables (request IDs, user IDs, trace spans) that automatically appear in every log entry emitted during a request lifecycle, including entries from your logging decorator. The library has been production-stable since 2013 and supports asyncio, context variables, and full type hint coverage as of version 25.x.
Caching Decorators with TTL
Caching decorators store the return value of a function call and serve that stored value on subsequent calls with the same arguments. This eliminates redundant computation or I/O for expensive operations like database queries, API requests, or CPU-intensive calculations.
Using functools.lru_cache
Python's standard library includes functools.lru_cache, a decorator that maintains a Least Recently Used cache with a configurable maximum size. It requires all function arguments to be hashable because it stores them as dictionary keys internally:
import functools

@functools.lru_cache(maxsize=256)
def fibonacci(n: int) -> int:
    """Compute the nth Fibonacci number with memoization."""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# After multiple calls, inspect cache performance
print(fibonacci(100))
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=256, currsize=101)
The cache_info() method returns a named tuple with hits, misses, maxsize, and currsize, which gives you direct visibility into cache effectiveness. The cache_clear() method flushes all entries, and the __wrapped__ attribute gives access to the original uncached function for testing or bypass scenarios.
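A brief, self-contained illustration of these three hooks (the squared function and its call counter are stand-ins for an expensive computation):

```python
import functools

call_count = 0

@functools.lru_cache(maxsize=None)
def squared(n: int) -> int:
    global call_count
    call_count += 1
    return n * n

squared(4)
squared(4)                      # served from cache; function body not re-run
assert call_count == 1
assert squared.cache_info().hits == 1

squared.cache_clear()           # flush all entries (also resets the counters)
squared(4)                      # recomputed after the flush
assert call_count == 2

# __wrapped__ bypasses the cache entirely
assert squared.__wrapped__(5) == 25
assert call_count == 3
```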
lru_cache has been thread-safe since its introduction in Python 3.2: an internal lock keeps the cache's bookkeeping coherent during concurrent updates. However, the wrapped function itself may still be called more than once for the same arguments if a second thread makes a call before the first thread's result lands in the cache.
Custom Caching Decorator with TTL Expiration
The limitation of lru_cache is that entries never expire. In production, you often need cached results to go stale after a fixed time-to-live (TTL) so the function re-executes and fetches fresh data. The following decorator adds TTL-based expiration on top of a dictionary cache:
import functools
import time

def cache_with_ttl(ttl_seconds: float = 300.0, maxsize: int = 1024):
    """Cache function results with TTL-based expiration.

    Args:
        ttl_seconds: Time-to-live for each cached entry in seconds.
        maxsize: Maximum number of entries before evicting the oldest.
    """
    def decorator(func):
        cache = {}
        cache_order = []  # tracks insertion order for eviction
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a hashable cache key from arguments
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            # Check for valid cached entry
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    return result
                # Expired: remove stale entry
                del cache[key]
                cache_order.remove(key)
            # Evict oldest entries if at capacity
            while len(cache) >= maxsize and cache_order:
                oldest_key = cache_order.pop(0)
                cache.pop(oldest_key, None)
            # Call the function and store the result
            result = func(*args, **kwargs)
            cache[key] = (result, now)
            cache_order.append(key)
            return result
        def cache_clear():
            """Flush all cached entries."""
            cache.clear()
            cache_order.clear()
        def cache_info():
            """Return the current number of cached entries."""
            return {"size": len(cache), "maxsize": maxsize, "ttl": ttl_seconds}
        wrapper.cache_clear = cache_clear
        wrapper.cache_info = cache_info
        return wrapper
    return decorator

@cache_with_ttl(ttl_seconds=60.0, maxsize=512)
def get_exchange_rate(base: str, target: str) -> float:
    """Fetch the current exchange rate from an external API."""
    # In production, this would call an HTTP endpoint
    import random
    return round(random.uniform(0.8, 1.2), 4)
This decorator uses time.monotonic() instead of time.time() because monotonic clocks are immune to system clock adjustments (NTP syncs, daylight saving changes, manual corrections). The maxsize parameter prevents unbounded memory growth in long-running processes by evicting the oldest entries first. The attached cache_clear() and cache_info() methods follow the same API pattern established by lru_cache, making the decorator a drop-in replacement with added TTL capability.
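To see the expiration behavior directly, here is a usage sketch built on a condensed version of the decorator above (same keying and expiry logic; the size-cap bookkeeping is omitted for brevity), with a call counter standing in for a slow lookup:

```python
import functools
import time

def cache_with_ttl(ttl_seconds: float):
    """Condensed TTL cache: same keying and expiry logic, no size cap."""
    def decorator(func):
        cache = {}
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    return result  # fresh entry: skip the function
            result = func(*args, **kwargs)
            cache[key] = (result, now)  # store or overwrite the entry
            return result
        return wrapper
    return decorator

calls = 0

@cache_with_ttl(ttl_seconds=0.1)
def lookup(name: str) -> str:
    global calls
    calls += 1
    return name.upper()

lookup("ada"); lookup("ada")   # second call is a cache hit
assert calls == 1
time.sleep(0.15)               # let the entry expire
lookup("ada")                  # stale entry: function re-executes
assert calls == 2
```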
When to Use Each Caching Approach
| Pattern | Best For | Limitations |
|---|---|---|
| `functools.lru_cache` | Pure computations, recursive algorithms, immutable lookups | No TTL, requires hashable args, in-process only |
| Custom TTL cache | API responses, database queries, config lookups | In-process only, not thread-safe without locks |
| Redis-backed cache | Distributed systems, shared state across workers | Network latency, serialization overhead, external dependency |
Rate Limiting Decorators
Rate limiting controls how frequently a function can be called within a given time window. This is critical for production code that interacts with external APIs (which enforce their own rate limits), protects internal services from traffic spikes, or throttles resource-intensive operations to prevent system overload.
Token Bucket Rate Limiter
The token bucket algorithm is one of the most widely used rate limiting strategies. A bucket starts with a fixed number of tokens. Each function call consumes one token. Tokens refill at a constant rate up to the bucket's capacity. If no tokens are available when a call arrives, the call either blocks, raises an exception, or returns a default value depending on the configuration.
import functools
import time
import threading

class RateLimitExceeded(Exception):
    """Raised when a rate-limited function is called too frequently."""
    def __init__(self, retry_after: float):
        self.retry_after = retry_after
        super().__init__(
            f"Rate limit exceeded. Retry after {retry_after:.2f}s"
        )

def rate_limit(calls_per_second: float = 10.0, burst: int | None = None):
    """Token bucket rate limiting decorator.

    Args:
        calls_per_second: Sustained rate of allowed calls per second.
        burst: Maximum burst capacity. Defaults to calls_per_second.
    """
    capacity = burst if burst is not None else int(calls_per_second)
    def decorator(func):
        tokens = float(capacity)
        last_refill = time.monotonic()
        lock = threading.Lock()
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal tokens, last_refill
            with lock:
                now = time.monotonic()
                elapsed = now - last_refill
                # Refill tokens based on elapsed time
                tokens_to_add = elapsed * calls_per_second
                tokens = min(capacity, tokens + tokens_to_add)
                last_refill = now
                if tokens >= 1.0:
                    tokens -= 1.0
                else:
                    # Calculate wait time until one token is available
                    deficit = 1.0 - tokens
                    retry_after = deficit / calls_per_second
                    raise RateLimitExceeded(retry_after)
            return func(*args, **kwargs)
        def _reset():
            nonlocal tokens, last_refill
            with lock:
                tokens = float(capacity)
                last_refill = time.monotonic()
        wrapper.reset = _reset
        return wrapper
    return decorator

@rate_limit(calls_per_second=5.0, burst=10)
def call_external_api(endpoint: str) -> dict:
    """Send a request to an external API endpoint."""
    return {"status": "ok", "endpoint": endpoint}
This implementation is thread-safe because the token check and decrement happen inside a threading.Lock. The time.monotonic() call ensures consistent timing even if the system clock changes. The burst parameter allows a short burst of rapid calls (up to the bucket capacity) followed by steady-state throttling at the configured rate.
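The burst-then-throttle behavior is observable in a few lines. The sketch below uses a condensed, single-threaded version of the decorator above (same refill math, lock omitted) so it stands alone:

```python
import functools
import time

class RateLimitExceeded(Exception):
    def __init__(self, retry_after: float):
        self.retry_after = retry_after
        super().__init__(f"Retry after {retry_after:.2f}s")

def rate_limit(calls_per_second: float, burst: int):
    """Condensed token bucket: same refill math, no thread lock."""
    def decorator(func):
        state = {"tokens": float(burst), "last": time.monotonic()}
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at burst
            state["tokens"] = min(
                burst, state["tokens"] + (now - state["last"]) * calls_per_second
            )
            state["last"] = now
            if state["tokens"] < 1.0:
                raise RateLimitExceeded((1.0 - state["tokens"]) / calls_per_second)
            state["tokens"] -= 1.0
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls_per_second=1.0, burst=3)
def ping() -> str:
    return "pong"

results = [ping() for _ in range(3)]   # burst capacity allows 3 rapid calls
try:
    ping()                             # bucket empty: fourth call is rejected
    raise AssertionError("expected RateLimitExceeded")
except RateLimitExceeded as exc:
    retry = exc.retry_after
    print(f"throttled, retry after {retry:.2f}s")
```

At one call per second, the fourth rapid call reports a retry_after of roughly one second, the time needed to refill a single token.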
Blocking Rate Limiter
The previous decorator raises an exception when the limit is exceeded. In some cases, you want the call to block and wait until a token becomes available. This is useful for background workers or batch processors that should slow down rather than fail:
import functools
import time
import threading

def rate_limit_blocking(calls_per_second: float = 10.0, burst: int | None = None):
    """Blocking token bucket rate limiter.

    Instead of raising an exception, sleeps until a token is available.
    """
    capacity = burst if burst is not None else int(calls_per_second)
    def decorator(func):
        tokens = float(capacity)
        last_refill = time.monotonic()
        lock = threading.Lock()
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal tokens, last_refill
            while True:
                with lock:
                    now = time.monotonic()
                    elapsed = now - last_refill
                    tokens_to_add = elapsed * calls_per_second
                    tokens = min(capacity, tokens + tokens_to_add)
                    last_refill = now
                    if tokens >= 1.0:
                        tokens -= 1.0
                        break
                    deficit = 1.0 - tokens
                    wait_time = deficit / calls_per_second
                # Sleep outside the lock so competing threads can proceed
                time.sleep(wait_time)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit_blocking(calls_per_second=2.0, burst=5)
def sync_record(record_id: int) -> None:
    """Synchronize a record to the remote data warehouse."""
    print(f"Syncing record {record_id}")
The while True loop re-checks the token count after sleeping, which handles the edge case where multiple threads are competing for tokens. The sleep duration is calculated precisely based on the token deficit and refill rate, so the function resumes as soon as a token is mathematically available rather than waiting an arbitrary fixed interval.
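A quick single-threaded timing check makes the throttling visible. This uses a condensed version of the decorator above (same refill and sleep math, lock omitted), so the numbers are illustrative:

```python
import functools
import time

def rate_limit_blocking(calls_per_second: float, burst: int):
    """Condensed blocking token bucket: sleeps instead of raising."""
    def decorator(func):
        state = {"tokens": float(burst), "last": time.monotonic()}
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                now = time.monotonic()
                state["tokens"] = min(
                    burst,
                    state["tokens"] + (now - state["last"]) * calls_per_second,
                )
                state["last"] = now
                if state["tokens"] >= 1.0:
                    state["tokens"] -= 1.0
                    break
                # Sleep exactly long enough for one token to accrue
                time.sleep((1.0 - state["tokens"]) / calls_per_second)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit_blocking(calls_per_second=10.0, burst=2)
def tick() -> None:
    pass

start = time.monotonic()
for _ in range(4):        # 2 burst calls are free, 2 are throttled at 10/s
    tick()
elapsed = time.monotonic() - start
print(f"4 calls took {elapsed:.2f}s")   # roughly 0.2s of waiting
```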
Blocking rate limiters using time.sleep() should not be used in async code. The sleep call blocks the entire event loop. For asyncio-based applications, replace time.sleep() with await asyncio.sleep() and use an asyncio.Lock instead of threading.Lock.
Per-Key Rate Limiting
In web applications and API gateways, you typically rate limit per user, per IP address, or per API key rather than globally. The following decorator accepts a key_func parameter that extracts the limiting key from the function's arguments:
import functools
import time
import threading
from collections import defaultdict

class RateLimitExceeded(Exception):
    def __init__(self, retry_after: float, key: str):
        self.retry_after = retry_after
        self.key = key
        super().__init__(
            f"Rate limit exceeded for '{key}'. "
            f"Retry after {retry_after:.2f}s"
        )

def per_key_rate_limit(
    calls_per_second: float = 5.0,
    burst: int | None = None,
    key_func=None,
):
    """Per-key token bucket rate limiter.

    Args:
        calls_per_second: Sustained rate per key.
        burst: Burst capacity per key.
        key_func: Callable that extracts the rate limit key from
            the function's arguments. Receives (*args, **kwargs).
    """
    capacity = burst if burst is not None else int(calls_per_second)
    if key_func is None:
        key_func = lambda *a, **kw: "global"
    def decorator(func):
        buckets = defaultdict(lambda: {
            "tokens": float(capacity),
            "last_refill": time.monotonic(),
        })
        lock = threading.Lock()
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = key_func(*args, **kwargs)
            with lock:
                bucket = buckets[key]
                now = time.monotonic()
                elapsed = now - bucket["last_refill"]
                bucket["tokens"] = min(
                    capacity,
                    bucket["tokens"] + elapsed * calls_per_second,
                )
                bucket["last_refill"] = now
                if bucket["tokens"] >= 1.0:
                    bucket["tokens"] -= 1.0
                else:
                    deficit = 1.0 - bucket["tokens"]
                    retry_after = deficit / calls_per_second
                    raise RateLimitExceeded(retry_after, key)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@per_key_rate_limit(
    calls_per_second=10.0,
    burst=20,
    key_func=lambda user_id, *a, **kw: str(user_id),
)
def process_request(user_id: int, payload: dict) -> dict:
    """Handle an incoming API request for a given user."""
    return {"user_id": user_id, "status": "processed"}
Each unique key gets its own independent token bucket. User 42 can exhaust their quota without affecting user 99. The defaultdict with a lambda factory creates new buckets on first access, so there is no need to pre-register keys. In a production deployment, you would periodically prune stale buckets from the dictionary to prevent memory growth from one-time users.
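The bucket isolation can be sketched at the data-structure level, stripped of the decorator machinery. The `allow` helper below is a hypothetical name, not part of the decorator above; it shows that draining one key's bucket leaves every other key untouched:

```python
import time
from collections import defaultdict

CALLS_PER_SECOND = 1.0
CAPACITY = 2

# One independent bucket per key, created lazily on first access
buckets = defaultdict(lambda: {"tokens": float(CAPACITY), "last": time.monotonic()})

def allow(key: str) -> bool:
    """Consume one token from the bucket for `key`, if available."""
    bucket = buckets[key]
    now = time.monotonic()
    bucket["tokens"] = min(
        CAPACITY, bucket["tokens"] + (now - bucket["last"]) * CALLS_PER_SECOND
    )
    bucket["last"] = now
    if bucket["tokens"] >= 1.0:
        bucket["tokens"] -= 1.0
        return True
    return False

# Exhaust user 42's bucket; user 99 is unaffected
assert allow("user-42") and allow("user-42")
assert not allow("user-42")   # user 42 is out of tokens
assert allow("user-99")       # user 99 still has a full bucket
```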
Stacking Decorators in Production
Python applies stacked decorators from bottom to top. The decorator closest to the function definition wraps first, and the outermost decorator wraps last, which means it executes first. Choosing the right stacking order determines what gets logged, what gets cached, and what gets rate-limited:
@log_calls                          # 3rd applied, 1st to execute
@cache_with_ttl(ttl_seconds=120.0)  # 2nd applied, 2nd to execute
@rate_limit(calls_per_second=5.0)   # 1st applied, 3rd to execute
def get_weather(city: str) -> dict:
    """Fetch current weather data for a city."""
    # This function only runs on cache misses that pass rate limiting
    return external_weather_api(city)
With this stacking order, the execution flow works like this: the logging decorator fires first and records every call attempt including cache hits. The caching decorator fires second and returns the cached result if one exists, skipping both the rate limiter and the underlying function. If the cache misses, the rate limiter fires third and either allows the function to execute or raises RateLimitExceeded. This means cache hits are fast and unlimited, while cache misses are throttled to protect the external API.
If you instead placed @rate_limit above @cache_with_ttl, every call—including cache hits—would count against the rate limit. That wastes tokens on operations that never touch the external service.
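The bottom-to-top rule itself is easy to demonstrate with two trivial decorators (the names here are illustrative) that record when their wrappers run:

```python
import functools

order = []

def outer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        order.append("outer")          # outermost wrapper: runs first
        return func(*args, **kwargs)
    return wrapper

def inner(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        order.append("inner")          # runs second, closest to the function
        return func(*args, **kwargs)
    return wrapper

@outer      # applied last, executes first
@inner      # applied first, executes last (just before the function)
def target():
    order.append("target")

target()
print(order)   # ['outer', 'inner', 'target']
```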
Combining Logging and Caching Metrics
A decorator that merges caching and logging into a single wrapper can emit cache hit/miss metrics alongside standard call data. This reduces decorator stacking overhead and gives you correlated metrics in a single log line:
import functools
import logging
import time

logger = logging.getLogger(__name__)

def logged_cache(ttl_seconds: float = 300.0):
    """Combined logging + TTL caching decorator."""
    def decorator(func):
        cache = {}
        hits = 0
        misses = 0
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal hits, misses
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    hits += 1
                    logger.debug(
                        "cache_hit: %s | hits=%d misses=%d",
                        func.__qualname__, hits, misses,
                    )
                    return result
                del cache[key]
            misses += 1
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            cache[key] = (result, now)
            logger.info(
                "cache_miss: %s | duration=%.4fs hits=%d misses=%d ratio=%.2f",
                func.__qualname__, elapsed, hits, misses,
                hits / (hits + misses) if (hits + misses) > 0 else 0.0,
            )
            return result
        wrapper.cache_clear = lambda: cache.clear()
        wrapper.cache_stats = lambda: {
            "hits": hits, "misses": misses, "size": len(cache),
        }
        return wrapper
    return decorator

@logged_cache(ttl_seconds=60.0)
def resolve_dns(hostname: str) -> str:
    """Resolve a hostname to an IP address."""
    import socket
    return socket.gethostbyname(hostname)
The cache_stats() method exposes the cumulative hit/miss ratio, which you can scrape into a monitoring system like Prometheus or export to structured log aggregators. Cache hits log at DEBUG level to reduce noise, while cache misses log at INFO with the full execution duration and running hit ratio.
When stacking decorators, always verify that each decorator correctly uses functools.wraps. If even one decorator in the stack omits it, the outermost decorator will copy the wrong metadata. You can verify by checking your_function.__name__ and your_function.__qualname__ after all decorators are applied.
Key Takeaways
- Always use `functools.wraps`: Every decorator in this article applies `@functools.wraps(func)` to preserve the original function's name, docstring, qualified name, and type annotations. Omitting it breaks debuggers, logging output, documentation generators, and introspection tools.
- Use `time.monotonic()` for timing and TTL: The monotonic clock is immune to system clock adjustments. Use `time.perf_counter()` for high-resolution benchmarking within a single call, and `time.monotonic()` for TTL calculations that span multiple calls over longer durations.
- Choose the right caching strategy for the problem: `functools.lru_cache` handles pure computations with hashable arguments. A custom TTL cache handles data that needs to expire. A Redis-backed cache handles distributed state across multiple workers or processes.
- Thread safety is not optional in production: The rate limiting decorators in this article use `threading.Lock` to protect shared mutable state; the custom TTL cache needs the same treatment before it is shared across threads. In async codebases, replace these with `asyncio.Lock` and `await asyncio.sleep()` to avoid blocking the event loop.
- Stacking order matters: Place logging outermost (executes first) so it captures every call including cache hits. Place caching in the middle so it short-circuits before rate limiting. Place rate limiting innermost so it only fires on cache misses that reach the protected function.
These three decorator patterns—structured logging, TTL caching, and token bucket rate limiting—address the operational concerns that appear in nearly every production Python application. Each pattern starts with the same foundational structure (a wrapper function, functools.wraps, *args/**kwargs forwarding) and extends it with domain-specific logic. Once you internalize the skeleton, building new decorators for retry logic, circuit breaking, input validation, or metric collection follows the same workflow: wrap, instrument, return.
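As one example of that workflow, a minimal retry decorator reuses the exact skeleton from the start of this article. The parameters and backoff policy below are illustrative, not a canonical implementation:

```python
import functools
import time

def retry(attempts: int = 3, delay: float = 0.01, backoff: float = 2.0):
    """Retry the wrapped function on exception with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise            # out of attempts: propagate
                    time.sleep(wait)
                    wait *= backoff      # exponential backoff between tries
        return wrapper
    return decorator

calls = 0

@retry(attempts=3, delay=0.01)
def flaky() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"

assert flaky() == "ok"
assert calls == 3   # failed twice, succeeded on the third attempt
```

The shape is identical to every decorator above: a factory for configuration, a wrapper with `*args`/`**kwargs` forwarding, and `functools.wraps` to preserve metadata.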