Network calls fail. Databases time out. Rate-limited APIs return 429s. In every one of these cases, the correct first response is the same: wait a moment, then try again. A Python retry decorator with exponential backoff wraps that logic into a single reusable annotation, keeping the calling code clean while the decorator handles failure recovery behind the scenes.
This article walks through building a custom Python retry decorator from scratch. Starting with a minimal version, each section adds a new capability -- configurable exception types, exponential backoff with a maximum ceiling, randomized jitter, structured logging, and async support. By the end, you will have a production-grade @retry decorator and a clear understanding of when it makes sense to use it versus reaching for a third-party library like tenacity or backoff.
Why Retry Logic Needs a Decorator
Retry logic scattered across a codebase creates duplication and makes the actual business logic harder to read. Consider what inline retry handling looks like without a decorator:
```python
import time
import requests

def fetch_user_profile(user_id):
    max_retries = 3
    delay = 1
    for attempt in range(max_retries):
        try:
            response = requests.get(f"https://api.example.com/users/{user_id}")
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as exc:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay)
            delay *= 2
```
Every function that needs retry behavior ends up carrying its own loop, its own delay calculation, and its own exception handling. The retry policy -- how many times, how long to wait, which exceptions to catch -- is tangled into the function body. Changing the policy means editing every function individually.
A decorator solves this by extracting the retry policy into a single, reusable wrapper. The decorated function only contains its core logic. The retry behavior is declared once at the definition site with @retry and its parameters. This separation follows the open-closed principle: you can change the retry policy without modifying the function itself.
Retry decorators only make sense for idempotent operations -- calls that produce the same result whether executed once or multiple times. Retrying a non-idempotent operation like an unguarded database insert or a payment charge can cause duplicate side effects.
Building the @retry Decorator Step by Step
The foundation is a parameterized decorator: a function that accepts configuration arguments and returns the actual decorator. That decorator, in turn, wraps the target function with a retry loop.
Minimal Retry with Fixed Delay
Start with the simplest possible version -- a fixed number of retries with a constant delay between them:
```python
import time
import functools

def retry(max_tries=3, delay=1.0):
    """Retry a function up to max_tries times with a fixed delay."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```
There are three layers of nesting here, each serving a distinct role. The outermost function retry() captures the configuration parameters. The middle function decorator() receives the target function. The innermost function wrapper() contains the retry loop and is what replaces the original function in the namespace.
The @functools.wraps(func) line preserves the original function's __name__, __doc__, and __module__ attributes. Without it, debugging tools and logging frameworks would see "wrapper" instead of the original function name.
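A quick comparison makes the difference concrete. This sketch (the `fetch_a`/`fetch_b` names are illustrative) defines one wrapper without `functools.wraps` and one with it:

```python
import functools

def retry_no_wraps(func):
    # Wrapper without functools.wraps: metadata is lost.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def retry_with_wraps(func):
    # Wrapper with functools.wraps: metadata is preserved.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@retry_no_wraps
def fetch_a():
    """Fetch A."""

@retry_with_wraps
def fetch_b():
    """Fetch B."""

print(fetch_a.__name__)  # wrapper
print(fetch_b.__name__)  # fetch_b
print(fetch_b.__doc__)   # Fetch B.
```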
Adding Configurable Exceptions
Catching bare Exception is too broad for production use. You want the decorator to retry only on specific, expected failure types -- such as network timeouts or rate limit responses -- and let everything else propagate immediately:
```python
import time
import functools

def retry(max_tries=3, delay=1.0, exceptions=(Exception,)):
    """Retry on specific exception types only."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```
The exceptions parameter accepts a tuple of exception classes. Python's `except` clause natively supports tuples, so `except (TimeoutError, ConnectionError) as exc` works without any special handling. Passing a single exception class instead of a tuple also works, because `except` accepts either a class or a tuple of classes.
Usage at the call site is now explicit about what the decorator will catch:
```python
import requests

@retry(
    max_tries=5,
    delay=2.0,
    exceptions=(requests.exceptions.Timeout, requests.exceptions.ConnectionError)
)
def fetch_remote_config(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
```
If fetch_remote_config raises a Timeout or ConnectionError, the decorator retries. If it raises a ValueError from malformed JSON, that exception passes through the decorator without interception.
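You can verify the selective catching with a self-contained sketch. It repeats the decorator from above with `delay=0.0` so the demo runs instantly; the hypothetical `flaky()` fails twice with a retryable `TimeoutError` before succeeding, while `bad_parse()` raises a `ValueError` that escapes on the first attempt:

```python
import time
import functools

def retry(max_tries=3, delay=1.0, exceptions=(Exception,)):
    """Retry on specific exception types only (decorator from the article)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

calls = {"count": 0}

@retry(max_tries=3, delay=0.0, exceptions=(TimeoutError,))
def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("transient")
    return "ok"

print(flaky())  # retried twice, succeeds on the third attempt

@retry(max_tries=3, delay=0.0, exceptions=(TimeoutError,))
def bad_parse():
    raise ValueError("not transient")

try:
    bad_parse()
except ValueError:
    print("ValueError propagated without any retries")
```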
Introducing Exponential Backoff
Fixed delays treat every retry the same way, but transient failures often resolve on their own if you give the system time to recover. Exponential backoff addresses this by doubling the wait time after each failed attempt. The first retry waits 1 second, the second waits 2 seconds, the third waits 4 seconds, and so on.
The formula is straightforward: delay = base_delay * (backoff_factor ** (attempt - 1)). A max_delay ceiling prevents the wait from growing unbounded:
```python
import time
import functools

def retry(
    max_tries=3,
    base_delay=1.0,
    backoff_factor=2,
    max_delay=60.0,
    exceptions=(Exception,)
):
    """Retry with exponential backoff and a configurable delay ceiling."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        delay = base_delay * (backoff_factor ** (attempt - 1))
                        delay = min(delay, max_delay)
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```
Choose a max_delay that reflects the maximum acceptable wait time for your application's context. For user-facing API calls, 30-60 seconds is a reasonable ceiling. For background worker tasks or batch processing, values of 300 seconds or more may be appropriate.
With base_delay=1.0 and backoff_factor=2, the first five retry delays are: 1s, 2s, 4s, 8s, 16s. With max_delay=10.0, that progression becomes: 1s, 2s, 4s, 8s, 10s. (Note that a run with max_tries=5 only ever sleeps four times -- there is no delay after the final attempt.)
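The progression is easy to verify with a few lines that reproduce the decorator's delay formula (no jitter); `backoff_delays` is a helper written for this demo, not part of the decorator:

```python
def backoff_delays(num_retries, base_delay=1.0, backoff_factor=2, max_delay=60.0):
    """Backoff schedule (before jitter) for the first num_retries retries."""
    return [min(base_delay * backoff_factor ** (n - 1), max_delay)
            for n in range(1, num_retries + 1)]

print(backoff_delays(5))                  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(5, max_delay=10.0))  # [1.0, 2.0, 4.0, 8.0, 10.0]
```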
Adding Jitter to Prevent Thundering Herds
Exponential backoff alone has a synchronization problem. When multiple clients experience the same failure at the same time -- for example, when a shared upstream service goes down -- they all calculate the same backoff intervals. Every client retries at 1s, then at 2s, then at 4s, creating perfectly synchronized waves of traffic that can keep the recovering service pinned.
Jitter adds a randomized component to each delay, spreading retries across a wider time window. There are two common strategies:
Full jitter randomizes the entire delay between zero and the calculated backoff value. This provides the widest spread but can occasionally produce very short delays. Equal jitter splits the delay in half -- one half is fixed, the other half is randomized -- guaranteeing a minimum wait while still introducing variation.
```python
import time
import random
import functools

def retry(
    max_tries=3,
    base_delay=1.0,
    backoff_factor=2,
    max_delay=60.0,
    exceptions=(Exception,),
    jitter="full"
):
    """Retry with exponential backoff and configurable jitter.

    Args:
        max_tries: Maximum number of attempts before re-raising.
        base_delay: Initial delay in seconds before the first retry.
        backoff_factor: Multiplier applied to the delay after each attempt.
        max_delay: Upper bound on the computed delay.
        exceptions: Tuple of exception classes that trigger a retry.
        jitter: Jitter strategy -- "full", "equal", or "none".
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        delay = base_delay * (backoff_factor ** (attempt - 1))
                        delay = min(delay, max_delay)
                        delay = _apply_jitter(delay, jitter)
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

def _apply_jitter(delay, strategy):
    """Apply a jitter strategy to the computed delay."""
    if strategy == "full":
        return random.uniform(0, delay)
    elif strategy == "equal":
        half = delay / 2
        return half + random.uniform(0, half)
    return delay
```
The _apply_jitter function is intentionally extracted as a standalone helper. This keeps the decorator body readable and makes it straightforward to test jitter logic in isolation. Passing jitter="none" disables randomization entirely, which is useful during testing when you need deterministic timing.
Here is how the decorator looks in practice with full jitter enabled:
```python
import requests

@retry(
    max_tries=5,
    base_delay=1.0,
    backoff_factor=2,
    max_delay=30.0,
    exceptions=(requests.exceptions.Timeout, requests.exceptions.ConnectionError),
    jitter="full"
)
def call_payment_gateway(payload, idempotency_key):
    # A charge is not naturally idempotent. An idempotency key (supported by
    # most payment APIs) makes retries safe; it is passed in as an argument so
    # the decorator re-sends the same key on every attempt.
    response = requests.post(
        "https://payments.example.com/charge",
        json=payload,
        headers={"Idempotency-Key": idempotency_key},
        timeout=15
    )
    response.raise_for_status()
    return response.json()
```
Integrating Logging and Callback Hooks
Silent retries are invisible retries, and invisible retries make debugging production issues significantly harder. Adding structured logging to the decorator turns each retry attempt into an observable event.
```python
import time
import random
import logging
import functools

logger = logging.getLogger(__name__)

def retry(
    max_tries=3,
    base_delay=1.0,
    backoff_factor=2,
    max_delay=60.0,
    exceptions=(Exception,),
    jitter="full",
    on_retry=None
):
    """Production-grade retry decorator with logging and callbacks.

    Args:
        max_tries: Maximum number of attempts before re-raising.
        base_delay: Initial delay in seconds before the first retry.
        backoff_factor: Multiplier applied to the delay after each attempt.
        max_delay: Upper bound on the computed delay.
        exceptions: Tuple of exception classes that trigger a retry.
        jitter: Jitter strategy -- "full", "equal", or "none".
        on_retry: Optional callback invoked before each retry.
            Receives (func, attempt, delay, exception).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        delay = base_delay * (backoff_factor ** (attempt - 1))
                        delay = min(delay, max_delay)
                        delay = _apply_jitter(delay, jitter)
                        logger.warning(
                            "Retry %d/%d for %s in %.2fs: %s",
                            attempt,
                            max_tries,
                            func.__name__,
                            delay,
                            exc
                        )
                        if on_retry is not None:
                            on_retry(func, attempt, delay, exc)
                        time.sleep(delay)
                    else:
                        logger.error(
                            "All %d attempts exhausted for %s: %s",
                            max_tries,
                            func.__name__,
                            exc
                        )
            raise last_exception
        return wrapper
    return decorator

def _apply_jitter(delay, strategy):
    """Apply a jitter strategy to the computed delay."""
    if strategy == "full":
        return random.uniform(0, delay)
    elif strategy == "equal":
        half = delay / 2
        return half + random.uniform(0, half)
    return delay
```
The on_retry callback parameter opens the door for custom integrations. You might use it to increment a Prometheus counter, emit a StatsD metric, or trigger a circuit breaker check:
```python
from prometheus_client import Counter

retry_counter = Counter(
    "function_retries_total",
    "Total retry attempts",
    ["function_name", "exception_type"]
)

def track_retry(func, attempt, delay, exc):
    retry_counter.labels(
        function_name=func.__name__,
        exception_type=type(exc).__name__
    ).inc()

@retry(
    max_tries=4,
    base_delay=2.0,
    exceptions=(TimeoutError, ConnectionError),
    on_retry=track_retry
)
def sync_inventory(warehouse_id):
    # ... external API call
    pass
```
Async Retry: Supporting Coroutines
The synchronous decorator uses time.sleep(), which blocks the event loop in async code. To support async def functions, the wrapper needs to be a coroutine itself and use asyncio.sleep() instead:
```python
import asyncio
import random
import logging
import functools

logger = logging.getLogger(__name__)

def async_retry(
    max_tries=3,
    base_delay=1.0,
    backoff_factor=2,
    max_delay=60.0,
    exceptions=(Exception,),
    jitter="full"
):
    """Async-compatible retry decorator with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as exc:
                    last_exception = exc
                    if attempt < max_tries:
                        delay = base_delay * (backoff_factor ** (attempt - 1))
                        delay = min(delay, max_delay)
                        delay = _apply_jitter(delay, jitter)  # helper defined earlier
                        logger.warning(
                            "Async retry %d/%d for %s in %.2fs: %s",
                            attempt,
                            max_tries,
                            func.__name__,
                            delay,
                            exc
                        )
                        await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```
Usage is identical to the synchronous version, just applied to coroutines:
```python
import asyncio
import aiohttp

@async_retry(
    max_tries=4,
    base_delay=1.0,
    exceptions=(aiohttp.ClientError, asyncio.TimeoutError)
)
async def fetch_user_data(session, user_id):
    async with session.get(
        f"https://api.example.com/users/{user_id}",
        timeout=aiohttp.ClientTimeout(total=10)
    ) as response:
        response.raise_for_status()
        return await response.json()
```
Do not apply the synchronous @retry decorator to async functions. Calling time.sleep() inside an event loop blocks the entire loop, freezing all concurrent tasks for the duration of the sleep.
A more advanced approach uses inspect.iscoroutinefunction() to auto-detect whether the target is synchronous or asynchronous and dispatch to the appropriate wrapper. This consolidates both decorators into a single @retry interface:
```python
import asyncio
import functools
import inspect
import time

def retry(max_tries=3, base_delay=1.0, backoff_factor=2,
          max_delay=60.0, exceptions=(Exception,), jitter="full"):
    """Unified retry decorator supporting both sync and async functions."""
    # _apply_jitter is the jitter helper defined earlier in the article.
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(1, max_tries + 1):
                    try:
                        return await func(*args, **kwargs)
                    except exceptions as exc:
                        last_exception = exc
                        if attempt < max_tries:
                            delay = min(
                                base_delay * (backoff_factor ** (attempt - 1)),
                                max_delay
                            )
                            delay = _apply_jitter(delay, jitter)
                            await asyncio.sleep(delay)
                raise last_exception
            return async_wrapper
        else:
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(1, max_tries + 1):
                    try:
                        return func(*args, **kwargs)
                    except exceptions as exc:
                        last_exception = exc
                        if attempt < max_tries:
                            delay = min(
                                base_delay * (backoff_factor ** (attempt - 1)),
                                max_delay
                            )
                            delay = _apply_jitter(delay, jitter)
                            time.sleep(delay)
                raise last_exception
            return sync_wrapper
    return decorator
```
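A minimal sketch of the dispatch, trimmed of backoff and jitter so it runs instantly, shows both call styles going through one decorator (the task functions and `state` counter are illustrative):

```python
import asyncio
import functools
import inspect

def retry(max_tries=3):
    """Trimmed-down unified retry: dispatch only, no delays or jitter."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(1, max_tries + 1):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as exc:
                        last_exception = exc
                raise last_exception
            return async_wrapper
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exception = exc
            raise last_exception
        return sync_wrapper
    return decorator

state = {"sync": 0, "async": 0}

@retry(max_tries=3)
def sync_task():
    state["sync"] += 1
    if state["sync"] < 2:
        raise ConnectionError("transient")
    return "sync ok"

@retry(max_tries=3)
async def async_task():
    state["async"] += 1
    if state["async"] < 2:
        raise ConnectionError("transient")
    return "async ok"

print(sync_task())                              # sync ok
print(asyncio.run(async_task()))                # async ok
print(inspect.iscoroutinefunction(async_task))  # True: wrapper stays a coroutine
```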
Custom Decorator vs. Tenacity vs. Backoff
Building a custom retry decorator teaches you how the mechanism works and gives you full control over every detail. But for production systems with complex retry requirements, two third-party libraries dominate the Python ecosystem: tenacity and backoff.
tenacity is the successor to the now-unmaintained retrying library. It supports retry conditions based on exception types, return values, or custom predicates. It provides composable stop conditions (by attempt count, elapsed time, or both), a suite of wait strategies (fixed, incrementing, exponential, random), and native async support. It also exposes lifecycle hooks for logging before and after each attempt.
A tenacity-based retry looks like this:
```python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import requests

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),
    retry=retry_if_exception_type(
        (requests.exceptions.Timeout, requests.exceptions.ConnectionError)
    )
)
def fetch_inventory(sku):
    response = requests.get(
        f"https://warehouse.example.com/inventory/{sku}",
        timeout=10
    )
    response.raise_for_status()
    return response.json()
```
backoff takes a different architectural approach, separating the backoff algorithm (exponential, fibonacci, constant) from the trigger condition (exception type or return-value predicate). Its on_exception and on_predicate decorators handle the two cases explicitly.
```python
import backoff
import requests

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.Timeout, requests.exceptions.ConnectionError),
    max_tries=5,
    max_time=120
)
def fetch_inventory(sku):
    response = requests.get(
        f"https://warehouse.example.com/inventory/{sku}",
        timeout=10
    )
    response.raise_for_status()
    return response.json()
```
Here is how the three approaches compare across key dimensions:
| Capability | Custom Decorator | tenacity | backoff |
|---|---|---|---|
| External dependency | None | Yes (pip install tenacity) | Yes (pip install backoff) |
| Retry on exception type | Yes, via exceptions parameter | Yes, via retry_if_exception_type | Yes, via on_exception |
| Retry on return value | Requires manual implementation | Yes, via retry_if_result | Yes, via on_predicate |
| Exponential backoff | Yes | Yes, with wait_exponential | Yes, with backoff.expo |
| Jitter | Yes, manual implementation | Opt-in via wait_random_exponential | Built-in (full jitter by default) |
| Async support | Requires separate wrapper | Native async/await support | Native async/await support |
| Composable stop conditions | Single condition per decorator | Yes, combine with the \| operator | max_tries and max_time combined |
| Lifecycle hooks | Via on_retry callback | before, after, before_sleep hooks | on_success, on_backoff, on_giveup |
A custom decorator is the right tool when your retry needs are straightforward and you want zero additional dependencies. For anything more complex -- multiple stop conditions, return-value-based retries, or advanced instrumentation -- tenacity or backoff will save you significant development and testing time.
Key Takeaways
- Parameterized decorators separate retry policy from business logic. The three-layer nesting pattern (`retry()` -> `decorator()` -> `wrapper()`) lets you configure max attempts, delay, backoff factor, and exception types at the decoration site while the wrapped function stays clean.
- Exponential backoff with jitter is the industry standard for retry timing. The delay formula `base_delay * (backoff_factor ** (attempt - 1))` progressively increases wait times, while jitter (full or equal) prevents synchronized retry storms when multiple clients fail simultaneously.
- Always scope retries to specific exception types. Catching bare `Exception` masks programming errors like `TypeError` or `KeyError` that should fail fast, not get silently retried. Pass a tuple of expected, transient exception classes to the `exceptions` parameter.
- Log every retry attempt. Silent retries hide systemic issues. Logging the function name, attempt number, computed delay, and exception message makes retry behavior visible in monitoring dashboards and log aggregators.
- Use `functools.wraps` on every wrapper function. Without it, the decorated function loses its original name, docstring, and module reference -- breaking introspection tools, documentation generators, and debugging workflows.
- Evaluate custom vs. library based on complexity. A hand-built decorator works well for simple cases with minimal dependencies. For production systems needing composable stop conditions, return-value-based retries, or built-in async support, tenacity and backoff provide battle-tested implementations.
Retry logic is one of those patterns that appears simple on the surface but develops real nuance once you account for thundering herds, mixed sync/async codebases, and observability requirements. The custom decorator built in this article handles all of those concerns in under 60 lines of code. When the requirements outgrow it, tenacity and backoff are waiting.