Async code is fast. Sometimes too fast. When you launch 500 concurrent requests with asyncio.gather, you will discover that the API on the other end has opinions about how many calls you can make per second. The result is HTTP 429 errors, throttled responses, temporary bans, and wasted time retrying failed requests. The fix is a Semaphore -- a concurrency primitive that puts a cap on how many coroutines can access a resource at the same time.
This article covers the full progression from understanding the problem, to basic semaphore usage, to production-ready patterns that combine concurrency limits with time-based throttling and connection pool management.
The Problem: Uncontrolled Concurrency
Here is what happens when you fire off 200 requests at once without any throttling:
```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return response.status

async def main():
    urls = [f"https://api.example.com/items/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    status_counts = {}
    for status in results:
        status_counts[status] = status_counts.get(status, 0) + 1
    print(status_counts)
    # Likely output: {200: 87, 429: 113}

asyncio.run(main())
```
All 200 coroutines start at roughly the same instant. The API server sees a burst of 200 simultaneous connections, exceeds its rate limit threshold, and starts returning 429 (Too Many Requests) responses. You get partial data and a bunch of errors. The async code did exactly what you told it to do -- it just did it too aggressively.
How asyncio.Semaphore Works
A semaphore is a counter with a lock. You initialize it with a number -- say 10 -- and that number represents how many coroutines can hold the semaphore at the same time. When a coroutine acquires the semaphore, the counter decreases by one. When it releases the semaphore, the counter increases by one. If the counter is at zero, any coroutine that tries to acquire it will wait until another coroutine releases it.
```python
import asyncio

semaphore = asyncio.Semaphore(3)  # Allow 3 concurrent holders

async def worker(name):
    async with semaphore:
        print(f"{name} acquired semaphore")
        await asyncio.sleep(1)  # Simulate work
        print(f"{name} releasing semaphore")

async def main():
    # Launch 6 workers, but only 3 can run at a time
    tasks = [worker(f"Worker-{i}") for i in range(6)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```
When you run this code, workers 0, 1, and 2 start immediately. Workers 3, 4, and 5 wait. As each of the first three finishes and releases the semaphore, the next waiting worker picks it up. The async with block handles both acquisition and release automatically, even if an exception occurs inside the block.
The async with semaphore: syntax is shorthand for calling await semaphore.acquire() at the start and semaphore.release() at the end. Always prefer the context manager form because it guarantees the semaphore is released even if your code raises an exception.
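To make the equivalence concrete, here is a minimal sketch showing both forms side by side; the list appends stand in for real work:

```python
import asyncio

async def main():
    semaphore = asyncio.Semaphore(3)
    results = []

    # Context-manager form: acquires on entry, releases on exit, even on error.
    async with semaphore:
        results.append("context-manager")

    # Equivalent manual form. Note that acquire() is awaited,
    # but release() is a plain synchronous call.
    await semaphore.acquire()
    try:
        results.append("manual")
    finally:
        semaphore.release()

    return results

results = asyncio.run(main())
print(results)  # ['context-manager', 'manual']
```

The `try`/`finally` is what the context manager gives you for free; forget it in the manual form and one exception permanently leaks a slot.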
Basic Pattern: Wrapping API Calls with a Semaphore
Applying a semaphore to the earlier example is straightforward. Create the semaphore with the concurrency limit you want, and wrap the HTTP request inside an async with block.
```python
import asyncio
import aiohttp

async def fetch(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return response.status, await response.json()

async def main():
    semaphore = asyncio.Semaphore(20)  # Max 20 concurrent requests
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

    successes = sum(1 for status, _ in results if status == 200)
    print(f"Fetched {successes} of {len(results)} posts successfully")

asyncio.run(main())
```
Now instead of 100 requests hitting the server at once, a maximum of 20 are in flight at any given moment. The remaining 80 coroutines wait in a queue until a slot opens up. The total time increases compared to fully unrestricted concurrency, but every request succeeds because the server never gets overwhelmed.
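You can verify the cap without hitting a real API. This sketch replaces the HTTP call with `asyncio.sleep` and counts how many coroutines are inside the semaphore at once; the `tracker` dict is instrumentation only, not part of the pattern:

```python
import asyncio

async def fake_request(semaphore, tracker):
    async with semaphore:
        tracker["current"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["current"])
        await asyncio.sleep(0.01)  # stands in for the HTTP round trip
        tracker["current"] -= 1

async def main():
    semaphore = asyncio.Semaphore(20)
    tracker = {"current": 0, "peak": 0}
    await asyncio.gather(*(fake_request(semaphore, tracker) for _ in range(100)))
    return tracker["peak"]

peak = asyncio.run(main())
print(f"Peak in-flight coroutines: {peak}")
```

Because the event loop is single-threaded, the counter updates need no locking; the peak never exceeds the semaphore's initial value.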
Semaphore vs BoundedSemaphore
Python's asyncio module provides two semaphore classes. Understanding the difference matters for production code.
| Behavior | asyncio.Semaphore | asyncio.BoundedSemaphore |
|---|---|---|
| Release without matching acquire | Allowed -- counter goes above initial value | Raises ValueError |
| Risk | Accidental over-release can silently increase concurrency beyond your intended limit | Fails loudly, making bugs easier to find |
| Best for | Simple scripts where the context manager handles release | Production code where correctness is critical |
If you always use the async with semaphore: pattern, the release is handled automatically and the distinction rarely matters. But if you are managing acquire/release manually anywhere in your code, BoundedSemaphore will catch mistakes that Semaphore silently ignores.
```python
import asyncio

# Safe for production: raises ValueError on over-release
semaphore = asyncio.BoundedSemaphore(10)

# This will raise ValueError (release() is a plain call, not awaited):
# semaphore.release()  # without a prior acquire
```
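A short runnable comparison makes the difference from the table concrete:

```python
import asyncio

def over_release(sem):
    """Try an extra release(); return the error message, or None if allowed."""
    try:
        sem.release()
        return None
    except ValueError as e:
        return str(e)

async def main():
    plain_result = over_release(asyncio.Semaphore(1))            # silently allowed
    bounded_result = over_release(asyncio.BoundedSemaphore(1))   # fails loudly
    return plain_result, bounded_result

plain_result, bounded_result = asyncio.run(main())
print(f"Semaphore over-release error: {plain_result}")
print(f"BoundedSemaphore over-release error: {bounded_result}")
```

The plain `Semaphore` now has two slots despite being created with one, and nothing in your logs will tell you. The `BoundedSemaphore` turns the same bug into an immediate, debuggable exception.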
Adding Time-Based Throttling
A semaphore limits how many requests are in flight at once. But some APIs define their limits as "X requests per second" rather than "X concurrent connections." A semaphore alone does not enforce time-based spacing. If your 20 concurrent requests all complete in 50 milliseconds, the semaphore immediately lets the next 20 through, and you might exceed a per-second rate limit even though concurrency never went above 20.
To add time-based throttling, combine the semaphore with an asyncio.sleep delay:
```python
import asyncio
import time

import aiohttp

async def fetch_throttled(session, url, semaphore, delay=0.05):
    async with semaphore:
        async with session.get(url) as response:
            data = await response.json()
        await asyncio.sleep(delay)  # Space out requests before releasing the slot
        return response.status, data

async def main():
    semaphore = asyncio.Semaphore(10)
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_throttled(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Fetched {len(results)} posts in {elapsed:.2f}s")

asyncio.run(main())
```
The asyncio.sleep(delay) inside the semaphore block ensures each slot is held for at least 50 milliseconds before being released. With 10 concurrent slots and a 50ms minimum hold time, the effective maximum throughput is about 200 requests per second (10 slots / 0.05 seconds). Adjust both the semaphore value and the delay to match your API's documented rate limit.
For precise per-second rate limiting, consider the aiolimiter library, which implements a token bucket algorithm. You initialize it with AsyncLimiter(max_rate, time_period) and use it as an async context manager, just like a semaphore. It handles the timing math for you.
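If you want to see what such a limiter does under the hood, here is a minimal token-bucket sketch built only on the standard library. It illustrates the algorithm and is not a substitute for aiolimiter:

```python
import asyncio
import time

class TokenBucket:
    """Minimal sketch: tokens refill at `rate` per second, capped at `rate`."""

    def __init__(self, rate):
        self._rate = rate
        self._tokens = float(rate)  # start with a full bucket (allows a burst)
        self._last = time.monotonic()
        self._lock = asyncio.Lock()  # serializes waiters in FIFO order

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill in proportion to elapsed time, capped at the burst size.
                self._tokens = min(self._rate, self._tokens + (now - self._last) * self._rate)
                self._last = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Not enough tokens: sleep just long enough for one to refill.
                await asyncio.sleep((1 - self._tokens) / self._rate)

async def main():
    bucket = TokenBucket(rate=100)  # ~100 acquisitions per second after the burst
    start = time.monotonic()
    for _ in range(150):
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"150 acquisitions took {elapsed:.2f}s")  # first ~100 are instant, the rest paced
```

This is the same shape as aiolimiter's context manager: wrap `await bucket.acquire()` around each request (or alongside a semaphore) to get a true per-second ceiling rather than just a concurrency ceiling.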
Production Pattern: A Reusable Rate-Limited Client
In real applications, you do not want to pass a semaphore into every function call. A cleaner approach is to wrap the rate-limiting logic inside a client class that handles concurrency, throttling, and error handling in one place.
```python
import asyncio
import logging

import aiohttp

logger = logging.getLogger(__name__)

class RateLimitedClient:
    def __init__(self, max_concurrent=10, delay=0.0):
        self._semaphore = asyncio.BoundedSemaphore(max_concurrent)
        self._delay = delay
        self._session = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *exc):
        await self._session.close()

    async def get_json(self, url, retries=2):
        for attempt in range(retries + 1):
            async with self._semaphore:
                try:
                    async with self._session.get(url) as resp:
                        if resp.status == 429:
                            retry_after = float(resp.headers.get("Retry-After", 1))
                            logger.warning(f"Rate limited on {url}, waiting {retry_after}s")
                            await asyncio.sleep(retry_after)
                            continue
                        resp.raise_for_status()
                        data = await resp.json()
                    if self._delay > 0:
                        await asyncio.sleep(self._delay)
                    return data
                except aiohttp.ClientError as e:
                    logger.error(f"Request failed for {url}: {e}")
                    if attempt == retries:
                        return None
                    await asyncio.sleep(0.5 * (attempt + 1))
        return None

async def main():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 51)]
    async with RateLimitedClient(max_concurrent=15, delay=0.05) as client:
        tasks = [client.get_json(url) for url in urls]
        results = await asyncio.gather(*tasks)
    valid = [r for r in results if r is not None]
    print(f"Got {len(valid)} of {len(urls)} responses")

asyncio.run(main())
```
This class handles three things at once: concurrency control via the BoundedSemaphore, time-based spacing via the optional delay, and automatic retry with backoff when the server returns a 429 response. The Retry-After header, when present, tells you exactly how long to wait before trying again.
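One caveat: Retry-After may also arrive as an HTTP-date rather than a number of seconds, which the `float()` call above does not handle. A hypothetical helper (not part of the class) could cover both forms allowed by RFC 9110:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default=1.0):
    """Return a wait time in seconds from a Retry-After header value.

    Handles both forms: delta-seconds ("120") and an HTTP-date
    ("Wed, 21 Oct 2015 07:28:00 GMT"). Falls back to `default`.
    """
    if value is None:
        return default
    try:
        # Delta-seconds form.
        return max(0.0, float(value))
    except ValueError:
        pass
    try:
        # HTTP-date form: wait until the given moment (0 if it has passed).
        when = parsedate_to_datetime(value)
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
    except (TypeError, ValueError):
        return default

print(parse_retry_after("120"))   # 120.0
print(parse_retry_after(None))    # 1.0
```

Swapping this in for the bare `float(...)` keeps the client from crashing on the rarer date form of the header.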
Combining Semaphores with Connection Pool Limits
A semaphore limits how many coroutines enter a code block. But aiohttp also has its own connection pool limit via TCPConnector. These two limits serve different purposes and work well together.
```python
import asyncio
import aiohttp

async def main():
    # Limit TCP connections at the transport layer
    connector = aiohttp.TCPConnector(
        limit=30,           # Max 30 total connections
        limit_per_host=10,  # Max 10 connections per host
    )
    # Limit concurrency at the application layer
    semaphore = asyncio.BoundedSemaphore(15)

    async with aiohttp.ClientSession(connector=connector) as session:
        ...  # use the semaphore inside your fetch functions

asyncio.run(main())
```
The connector's limit prevents your process from opening too many TCP sockets. The limit_per_host prevents any single API endpoint from consuming all available connections. The semaphore adds application-level control on top -- for example, limiting how many requests enter your business logic at once, which might include processing steps beyond just the HTTP call.
The TCPConnector must be created inside an async function. Creating it at module level (outside of an async context) will raise an error in Python 3.14+ due to the deprecation of implicit event loop creation.
Key Takeaways
- **Uncontrolled async concurrency triggers rate limits:** Launching hundreds of simultaneous requests with `asyncio.gather` will overwhelm API servers and produce HTTP 429 errors. Always throttle your request volume when calling external APIs.
- **asyncio.Semaphore caps concurrent coroutines:** Initialize a semaphore with the maximum number of concurrent requests you want to allow, then wrap each API call in `async with semaphore:`. Coroutines beyond the limit wait in a FIFO queue until a slot opens.
- **Use BoundedSemaphore in production:** `BoundedSemaphore` raises an error if you accidentally release more times than you acquire, preventing silent bugs that could blow past your intended concurrency limit.
- **Semaphores control concurrency, not rate:** A semaphore limits how many requests are in flight at once, not how many happen per second. For time-based rate limiting, add `asyncio.sleep` inside the semaphore block or use a dedicated library like `aiolimiter`.
- **Layer your limits:** Use `TCPConnector` to control connection pool size at the transport layer, and a semaphore to control concurrency at the application layer. Together they prevent both socket exhaustion and API rate limit violations.
Rate limiting is not an afterthought -- it is a core requirement of any production code that calls external APIs. A Semaphore gives you a clean, Pythonic way to control the flow of concurrent requests without restructuring your entire async architecture. Start with a conservative limit, monitor for 429 responses, and adjust until you find the sweet spot where throughput is high and errors are zero.