Understanding Pipelines in Python

Data flows through software the way water flows through plumbing. You connect discrete stages together, each one transforming what it receives before passing it along. In Python, this concept — the pipeline — isn’t a single feature but a design philosophy woven deeply into the language itself, from generators and itertools all the way through to machine learning frameworks and data engineering platforms.

This article covers how pipelines actually work in Python, why the language is so naturally suited to them, which PEPs shaped the features that make them possible, and how to build your own from scratch. Real code, real explanations, no hand-waving.

What Is a Pipeline, Really?

A pipeline is a sequence of processing steps where the output of one step becomes the input to the next. If you have ever written something like this:

result = step_three(step_two(step_one(data)))

…you have already built a pipeline. The problem is readability. As the number of steps grows, nested function calls become difficult to parse visually. Pipelines solve this by making the flow of data explicit and linear.
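To make this concrete, here is a small, hypothetical set of stages (the names step_one through step_three are placeholders, not from any library) showing that the nested and linear forms compute the same thing:

```python
data = [3, 1, 2]

# Three illustrative stages (placeholder implementations)
def step_one(xs):
    return sorted(xs)            # order the items

def step_two(xs):
    return [x * 10 for x in xs]  # scale each item

def step_three(xs):
    return sum(xs)               # reduce to a single value

# Nested form: reads inside-out
nested = step_three(step_two(step_one(data)))

# Linear form: reads top-to-bottom, one stage per line
ordered = step_one(data)
scaled = step_two(ordered)
linear = step_three(scaled)

print(nested, linear)  # 60 60
```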

The concept traces directly back to Unix shell pipes. In 1973, Ken Thompson implemented pipes in Unix at Bell Labs — driven by Doug McIlroy, who had been advocating for them since at least 1964, describing his vision in a typed memo as a way of “connecting programs like garden hose” so that programmers could screw in another segment whenever data needed to be massaged in a new way. Thompson did it in a single night, rewriting the existing Unix utilities to accept standard input in that same session. McIlroy later formalized the underlying philosophy in the 1978 Bell System Technical Journal with what became the canonical Unix philosophy: make each program do one thing well, expect the output of every program to become the input to another, and write programs to handle text streams. When you type cat access.log | grep "404" | wc -l in a terminal, you are executing that philosophy directly. Python brings it into application-level code.
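As a taste of what that looks like at the application level, here is a sketch that mirrors each shell command with a generator stage, using an in-memory sample log in place of a real access.log:

```python
from io import StringIO

# In-memory stand-in for access.log (illustrative data)
log = StringIO(
    "GET /index.html 200\n"
    "GET /missing 404\n"
    "GET /gone 404\n"
)

lines = (line for line in log)                      # cat access.log
errors = (line for line in lines if "404" in line)  # grep "404"
count = sum(1 for _ in errors)                      # wc -l
print(count)  # 2
```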

The Foundation: Iterators and the Iterator Protocol

Before you can build a pipeline in Python, you need to understand what makes pipelining possible at the language level: the iterator protocol.

Every for loop in Python works through iterators. When you write for item in collection, Python calls iter() on the collection to get an iterator object, then repeatedly calls __next__() on that iterator until a StopIteration exception is raised. This protocol was formalized in PEP 234 — Iterators, authored by Ka-Ping Yee and Guido van Rossum, created for Python 2.1 and fully shipped with Python 2.2.

The iterator protocol is what allows pipeline stages to communicate. Each stage produces values on demand rather than computing everything upfront. This lazy evaluation is the key to memory-efficient pipelines.

Here is the protocol at its most basic:

class Counter:
    """A simple iterator that counts up to a maximum."""
    def __init__(self, maximum):
        self.maximum = maximum
        self.current = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.current >= self.maximum:
            raise StopIteration
        value = self.current
        self.current += 1
        return value

# Usage
for number in Counter(5):
    print(number)  # 0, 1, 2, 3, 4

That class is verbose for what it does, and this is exactly the problem that generators solve.

PEP 255: Generators Change Everything

PEP 255 — Simple Generators, written by Neil Schemenauer, Tim Peters, and Magnus Lie Hetland, was accepted for Python 2.2 and introduced the yield statement. It remains one of the most consequential PEPs ever written for pipeline-style programming.

The PEP’s motivation section identifies the core problem: when a producer must maintain state between the values it yields, most languages offer no clean solution except forcing the caller to provide a callback. Generators eliminated this friction entirely.

When Guido van Rossum was asked about generators in a 2002 interview with Linux Journal, he described them as originating from the Icon programming language and called them an especially powerful way to write more readable code for situations that require maintaining state between values.

A generator function looks like a normal function but uses yield instead of return. When called, it does not execute the function body. Instead, it returns a generator object — an iterator that lazily produces values:

def read_lines(filepath):
    """Yield lines from a file one at a time."""
    with open(filepath) as f:
        for line in f:
            yield line.strip()

def filter_nonempty(lines):
    """Yield only non-empty lines."""
    for line in lines:
        if line:
            yield line

def to_uppercase(lines):
    """Yield each line in uppercase."""
    for line in lines:
        yield line.upper()

Each of these functions is a pipeline stage. You connect them by passing one generator into the next:

lines = read_lines("data.txt")
cleaned = filter_nonempty(lines)
uppercased = to_uppercase(cleaned)

for line in uppercased:
    print(line)

Nothing happens until you iterate. The for loop at the bottom pulls values through the entire pipeline. Each __next__() call cascades backward through the chain: to_uppercase asks filter_nonempty for a value, which asks read_lines, which reads exactly one line from the file. At no point is the entire file loaded into memory.
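You can watch this on-demand behavior directly by instrumenting tiny stages to record when they produce a value (a contrived sketch; the trace list exists only for illustration):

```python
trace = []

def source():
    for i in range(3):
        trace.append(f"source yields {i}")
        yield i

def double(numbers):
    for n in numbers:
        trace.append(f"double sees {n}")
        yield n * 2

pipeline = double(source())
trace.append("pipeline built, nothing has run yet")

first = next(pipeline)  # pulls exactly one item through both stages
print(first)  # 0
print(trace)
# ['pipeline built, nothing has run yet', 'source yields 0', 'double sees 0']
```

Note that the "pipeline built" entry comes first: constructing the generators executes no stage code at all.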

PEP 289: Generator Expressions as Inline Pipelines

PEP 289 — Generator Expressions, authored by Raymond Hettinger, was accepted for Python 2.4. It introduced a compact syntax for creating generators inline, which is perfect for building lightweight pipelines without defining separate functions.

The PEP opens with a clear rationale, describing generator expressions as a high-performance, memory-efficient generalization of both list comprehensions and generators. Raymond Hettinger noted when presenting PEP 289 in 2003 on python-dev that the new syntax allowed operations like sum(x*x for x in roots) to run without creating a full list in memory, saving both allocation time and memory.
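The memory difference is easy to observe: a generator expression is a small fixed-size object no matter how many values it will produce, while the equivalent list holds every element at once (exact byte counts vary by Python version):

```python
import sys

n = 1_000_000
squares_list = [x * x for x in range(n)]
squares_gen = (x * x for x in range(n))

print(sys.getsizeof(squares_list))  # on the order of megabytes
print(sys.getsizeof(squares_gen))   # a couple hundred bytes

# Both produce the same values when consumed
print(sum(squares_gen) == sum(squares_list))  # True
```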

Here is a complete pipeline built entirely from generator expressions:

with open("access-log") as wwwlog:
    bytecolumn = (line.rsplit(None, 1)[1] for line in wwwlog)
    bytes_sent = (int(x) for x in bytecolumn if x != '-')
    print("Total", sum(bytes_sent))
Note

This example comes from David Beazley’s influential “Generator Tricks for Systems Programmers” tutorial, originally presented at PyCon 2008 in Chicago and revised for Python 3.7 in October 2018. As Beazley notes in the slides, iteration is what holds the pipeline together: the sum() function drives the entire chain by pulling values through via __next__() calls at each stage.

Beazley also demonstrated that this generator-based approach is not slow. In his 2018 Python 3.7 revision of the slides, testing against the same 1.3 GB log file, the generator pipeline completed in 16.7 seconds compared to 18.6 seconds for the equivalent imperative loop — roughly 10% faster. The original 2008 version of the benchmark, run on Python 2.5 on a Mac Pro, showed 25.96 seconds for the generator pipeline versus 27.20 seconds for the loop, a narrower margin of about 5%. The direction was consistent across both versions: generators were never slower. The pipeline version was faster largely because sum() drives the iteration at C speed, avoiding the per-item bytecode overhead of an explicit Python-level loop and its intermediate variable assignments.

PEP 342: Coroutine Pipelines with send()

PEP 342 — Coroutines via Enhanced Generators, written by Guido van Rossum and Phillip J. Eby and accepted for Python 2.5, transformed generators from one-way producers into two-way communication channels. The send() method allowed values to be pushed into a generator at the point where it was suspended.

This opened up a second model of pipelining. Where generator pipelines pull data through the chain (the consumer drives iteration), coroutine pipelines push data forward (the producer drives execution):

def coroutine(func):
    """Decorator to auto-advance a coroutine to its first yield."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def grep(pattern, target):
    """Filter lines containing pattern and send to target."""
    while True:
        line = (yield)
        if pattern in line:
            target.send(line)

@coroutine
def printer():
    """Print everything received."""
    while True:
        line = (yield)
        print(line)

# Build the pipeline (note: constructed from the end backward)
pipeline = grep("python", printer())

# Push data through
pipeline.send("We love python programming")
pipeline.send("Java is also popular")
pipeline.send("python generators are powerful")

The PEP itself includes a pipeline example involving thumbnail image processing, demonstrating the pattern with a thumbnail_pager coroutine feeding into a jpeg_writer coroutine. As the “What’s New in Python 2.5” documentation explained, generators enhanced with send() become both producers and consumers simultaneously — which is precisely what makes them coroutines.

A critical difference from generator pipelines: coroutine pipelines can branch. Because send() is an explicit call, a single coroutine can send data to multiple targets, enabling fan-out patterns that are impossible with simple pull-based iteration.
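Here is a self-contained sketch of that fan-out pattern (broadcast and collector are illustrative names, not library functions; the coroutine decorator from above is repeated so the example runs on its own):

```python
def coroutine(func):
    """Decorator to auto-advance a coroutine to its first yield."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def broadcast(targets):
    """Send every received item to all targets."""
    while True:
        item = (yield)
        for target in targets:
            target.send(item)

@coroutine
def collector(name, sink):
    """Append (name, item) pairs to a shared sink list."""
    while True:
        item = (yield)
        sink.append((name, item))

results = []
pipe = broadcast([collector("a", results), collector("b", results)])
pipe.send("hello")
print(results)  # [('a', 'hello'), ('b', 'hello')]
```

One send() into the pipeline produced output at two destinations, which a pull-based generator chain cannot do without buffering tricks like itertools.tee.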

PEP 380: yield from Simplifies Pipeline Composition

PEP 380 — Syntax for Delegating to a Subgenerator, authored by Greg Ewing and officially accepted by Guido van Rossum on June 26, 2011, addressed a practical pain point in pipeline construction: delegating to sub-generators.

Before PEP 380, if you had a generator that needed to yield all values from another generator, you wrote a loop:

def chain_manually(*iterables):
    for iterable in iterables:
        for item in iterable:
            yield item

With yield from, this becomes:

def chain_delegated(*iterables):
    for iterable in iterables:
        yield from iterable

This is not merely syntactic sugar. PEP 380 notes that the new syntax creates optimization possibilities when one generator re-yields values from another. It also correctly forwards send(), throw(), and close() calls to the subgenerator, making it possible to compose coroutine pipelines from smaller pieces without losing the full generator protocol.
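A small demonstration of that forwarding (contrived stage names): values sent into the outer generator reach the inner one through yield from:

```python
def inner():
    """Running-total coroutine: yields the total, accepts increments."""
    total = 0
    while True:
        value = yield total
        total += value

def outer():
    # yield from forwards send() straight through to inner()
    yield from inner()

gen = outer()
print(next(gen))    # 0  (advance to the first yield)
print(gen.send(5))  # 5
print(gen.send(2))  # 7
```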

For pipeline construction, this means you can factor out stages into sub-generators and compose them cleanly:

import csv
from pathlib import Path

def read_csv_files(directory):
    """Yield rows from all CSV files in a directory."""
    for csv_file in Path(directory).glob("*.csv"):
        yield from read_single_csv(csv_file)

def read_single_csv(filepath):
    """Yield rows from a single CSV file."""
    with open(filepath, newline="") as f:
        yield from csv.DictReader(f)

The itertools Module: Iterator Algebra for Pipelines

Raymond Hettinger’s itertools module provides a standardized toolkit of pipeline-ready building blocks. The official documentation describes these tools as forming an “iterator algebra” — composable pieces inspired by constructs from APL, Haskell, and SML, recast in a form suitable for Python.

The documentation further explains the design philosophy: the tools are meant to be linked together in a functional style to minimize code volume, while relying on C-implemented “vectorized” building blocks rather than Python-level loops to avoid interpreter overhead.

Here is a practical pipeline built from itertools components:

from itertools import chain, filterfalse, islice

def process_log_pipeline(log_files):
    """Process multiple log files through an itertools pipeline."""

    # Stage 1: Chain all files into a single stream
    # (caveat: files opened here are closed only when garbage collected)
    all_lines = chain.from_iterable(
        open(f) for f in log_files
    )

    # Stage 2: Strip whitespace
    stripped = (line.strip() for line in all_lines)

    # Stage 3: Remove blank lines (filterfalse keeps items where the predicate is False)
    non_empty = filterfalse(lambda line: not line, stripped)

    # Stage 4: Take only the first 1000 lines
    sample = islice(non_empty, 1000)

    return sample

Every stage here is lazy. The entire pipeline processes one line at a time, regardless of how many files or how many total lines exist. This is the power of iterator algebra: you compose specialized tools from general-purpose pieces, and the resulting pipeline has the same memory characteristics as if you had written a single carefully optimized loop.

Building a Reusable Pipeline Class

For more complex applications, you may want a formal pipeline abstraction. Here is a clean implementation using functools.reduce:

from functools import reduce

def pipe(data, *functions):
    """Pass data through a sequence of functions."""
    return reduce(lambda value, func: func(value), functions, data)

# Define stages as regular functions
def normalize(records):
    return (
        {k: v.strip() if isinstance(v, str) else v for k, v in r.items()}
        for r in records
    )

def filter_active(records):
    return (r for r in records if r.get("status") == "active")

def extract_emails(records):
    return (r["email"] for r in records if "email" in r)

# Execute the pipeline
records = [
    {"name": "  Alice ", "email": "alice@example.com", "status": "active"},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive"},
    {"name": " Charlie  ", "email": "charlie@example.com", "status": "active"},
]

result = pipe(
    iter(records),
    normalize,
    filter_active,
    extract_emails,
)

for email in result:
    print(email)
# Output:
# alice@example.com
# charlie@example.com

Pro Tip

This pipe pattern was proposed for inclusion in the standard library in a November 2024 Discourse thread titled “functools.pipe — Function Composition Utility” and tracked as CPython issue #127029. The proposal argued it is one of the basic components for functional programming. As of this writing, the proposal was closed as not planned — meaning this remains a pattern you implement yourself.

The functools.reduce Connection

The pipe function above relies on functools.reduce, which is itself a pipeline of sorts: it threads an accumulator through a sequence of values and a binary function. Understanding reduce deeply is essential for understanding function composition in Python.

from functools import reduce

# reduce is itself a pipeline:
# reduce(f, [a, b, c, d], init)
# computes f(f(f(f(init, a), b), c), d)

# Function composition with reduce
def compose(*functions):
    """Create a new function that applies functions right-to-left."""
    return reduce(
        lambda f, g: lambda *args, **kwargs: f(g(*args, **kwargs)),
        functions
    )

# compose(h, g, f)(x) == h(g(f(x)))
double = lambda x: x * 2
add_one = lambda x: x + 1
square = lambda x: x ** 2

transform = compose(square, add_one, double)
print(transform(3))  # square(add_one(double(3))) = square(7) = 49

Scikit-learn’s Pipeline: The ML Standard

In machine learning, scikit-learn’s Pipeline class is the canonical example of pipeline design. It chains together preprocessing steps and a final estimator, ensuring that each transformation is applied in order during both training and prediction:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression

# Each step is a (name, transformer) tuple
ml_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression()),
])

# fit() calls fit_transform() on each step, then fit() on the last
ml_pipeline.fit(X_train, y_train)

# predict() calls transform() on each step, then predict() on the last
predictions = ml_pipeline.predict(X_test)

The brilliance of scikit-learn’s design is that Pipeline itself implements the estimator interface. A pipeline can be nested inside another pipeline, used as a step in GridSearchCV, or serialized with joblib. This composability mirrors the Unix philosophy: each component has a standard interface, making them interchangeable.
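To see why a shared interface enables nesting, here is a toy re-implementation of the idea in plain Python (not sklearn's actual code; MiniPipeline and Scale are invented names): the pipeline exposes the same fit/transform methods as its steps, so a pipeline can itself be used as a step:

```python
class Scale:
    """Toy transformer: multiply every value by a factor."""
    def __init__(self, factor):
        self.factor = factor
    def fit(self, X):
        return self
    def transform(self, X):
        return [x * self.factor for x in X]

class MiniPipeline:
    """Chains steps and implements the same fit/transform interface."""
    def __init__(self, steps):
        self.steps = steps
    def fit(self, X):
        for step in self.steps:
            step.fit(X)
            X = step.transform(X)
        return self
    def transform(self, X):
        for step in self.steps:
            X = step.transform(X)
        return X

inner_pipe = MiniPipeline([Scale(2), Scale(3)])
full_pipe = MiniPipeline([inner_pipe, Scale(10)])  # a pipeline nested as a step
print(full_pipe.fit([1, 2]).transform([1, 2]))  # [60, 120]
```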

PEP 525: Async Generators for Async Pipelines

PEP 525 — Asynchronous Generators, authored by Yury Selivanov and accepted for Python 3.6, extended the generator concept to asynchronous code. This enables pipelines over data sources that involve I/O waiting — network requests, database queries, or file operations using async libraries.

The PEP explicitly connects its motivation to the original generator PEP, noting that the rationale Schemenauer, Peters, and Hetland laid out for PEP 255 applies equally in the async context.

import asyncio
import aiohttp

async def fetch_urls(urls):
    """Async generator: yield response text from URLs."""
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as response:
                yield await response.text()

async def extract_titles(pages):
    """Async generator: extract <title> from HTML pages."""
    import re
    async for page in pages:
        match = re.search(r'<title>(.*?)</title>', page, re.IGNORECASE)
        if match:
            yield match.group(1)

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org/html",
    ]

    pages = fetch_urls(urls)
    titles = extract_titles(pages)

    async for title in titles:
        print(title)

asyncio.run(main())

The pipeline here looks structurally identical to a synchronous generator pipeline. The only differences are async def, async for, and yield inside an async function. The mental model is the same: each stage lazily produces values, and the consumer drives the chain.

The Pipe Operator: Python’s Ongoing Discussion

Many languages have native pipe operators. Elixir uses |>, F# uses |>, and R has %>% (from the magrittr package). Python does not have a built-in pipe operator, but the discussion has been active for years.

In March 2024, a CPython issue (#116744) proposed adding support for function composition using the | operator. In June 2025, a Discourse thread titled “Pipe operator |>” explored the syntax further. Neither has been accepted into the language.

Python does allow operator overloading, which means you can build your own pipe syntax. Here is one approach using __or__:

class Pipe:
    """Enable pipe syntax: value | Pipe(func)."""
    def __init__(self, func):
        self.func = func

    def __ror__(self, other):
        return self.func(other)

# Usage
double = Pipe(lambda x: x * 2)
add_ten = Pipe(lambda x: x + 10)
as_string = Pipe(str)

result = 5 | double | add_ten | as_string
print(result)  # "20"

This works because Python evaluates 5 | double by checking if int has an __or__ method for Pipe objects. It does not, so Python falls back to Pipe.__ror__, which receives 5 as the other argument. The result feeds into the next | operation.

The third-party pipe library on PyPI takes this further, providing where, select, chain, traverse, and other composable operations that use the | operator for clean, readable data transformations.
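You can extend the homegrown Pipe class along the same lines so that pipes compose into reusable pipelines before any value flows through (a sketch, not the pipe library's actual implementation):

```python
class Pipe:
    """Pipe syntax with composition: value | pipe, and pipe | pipe."""
    def __init__(self, func):
        self.func = func

    def __ror__(self, value):
        # value | pipe  ->  apply the function
        return self.func(value)

    def __or__(self, other):
        # pipe | pipe  ->  a new Pipe applying both in order
        if isinstance(other, Pipe):
            return Pipe(lambda v: other.func(self.func(v)))
        return NotImplemented

double = Pipe(lambda x: x * 2)
add_ten = Pipe(lambda x: x + 10)

process = double | add_ten   # composed once, reusable many times
print(5 | process)   # 20
print(10 | process)  # 30
```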

Error Handling in Pipelines

Production pipelines need error handling. Generator pipelines can use try/except within each stage, but a more reusable pattern is a decorator that wraps pipeline stages:

import logging

def resilient(func):
    """Wrap a generator stage to skip items that raise exceptions."""
    def wrapper(iterable, *args, **kwargs):
        for item in iterable:
            try:
                yield from func([item], *args, **kwargs)
            except Exception as e:
                logging.warning(f"Skipping item due to {type(e).__name__}: {e}")
    return wrapper

@resilient
def parse_integers(items):
    for item in items:
        yield int(item)

# Bad data doesn't crash the pipeline
data = ["1", "2", "three", "4", "five"]
results = list(parse_integers(data))
print(results)  # [1, 2, 4]

This pattern comes from the practical reality that pipelines processing real data will encounter malformed records, network timeouts, and encoding errors. Making each stage independently resilient keeps the pipeline running while logging failures for later investigation.

Backpressure: When Producers Outrun Consumers

A question the standard pipeline pattern does not answer: what happens when your producer generates data faster than your consumer can process it? This is the backpressure problem, and it is one of the gaps between toy pipeline examples and production systems.

Pull-based generator pipelines handle backpressure naturally. Because the consumer drives iteration, a slow consumer simply pulls more slowly — the producer does not generate the next item until asked. This is the core advantage of lazy evaluation, and it is why generators behave well under memory pressure even against large data sources.

The problem surfaces in three specific situations: when you parallelize stages with threads or processes, when you introduce async I/O that can buffer results, and when a coroutine pipeline fans out to multiple targets running at different speeds. In these cases you need an explicit mechanism to limit how far ahead a fast stage can run.

The standard Python answer is queue.Queue with a bounded maxsize:

import queue
import threading

def producer_stage(output_q: queue.Queue, data):
    """Push items to a bounded queue; blocks when queue is full."""
    for item in data:
        processed = expensive_transform(item)
        output_q.put(processed)  # Blocks if queue is full -- this IS backpressure
    output_q.put(None)  # Sentinel to signal completion

def consumer_stage(input_q: queue.Queue):
    """Pull items from the queue and process them."""
    while True:
        item = input_q.get()
        if item is None:
            break
        yield item

# A queue with maxsize=100 means the producer can never be more
# than 100 items ahead of the consumer. Memory usage is bounded.
pipeline_q = queue.Queue(maxsize=100)

producer_thread = threading.Thread(
    target=producer_stage, args=(pipeline_q, data_source)
)
producer_thread.start()

for result in consumer_stage(pipeline_q):
    write_to_database(result)

The maxsize parameter is not a guess — it should be sized to reflect how many items you can afford to buffer given the memory profile of each item and the total memory available to the process. An unbounded queue is a slow memory leak waiting to happen.

For async pipelines, asyncio.Queue provides the same bounded-queue semantics without blocking threads. The await queue.put(item) call suspends the coroutine if the queue is full, yielding control back to the event loop instead of consuming a thread.
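A minimal async sketch (illustrative stage names): with maxsize=3, the producer can run at most three items ahead before await q.put() suspends it:

```python
import asyncio

async def produce(q):
    for i in range(10):
        await q.put(i)  # suspends whenever the queue already holds 3 items
    await q.put(None)   # sentinel to signal completion

async def consume(q, out):
    while True:
        item = await q.get()
        if item is None:
            break
        out.append(item * 2)

async def main():
    q = asyncio.Queue(maxsize=3)
    out = []
    await asyncio.gather(produce(q), consume(q, out))
    return out

print(asyncio.run(main()))  # [0, 2, 4, ..., 18]
```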

Production Warning

If you chain generator stages and then parallelize them with ThreadPoolExecutor.map() or ProcessPoolExecutor.map(), be aware that Executor.map() consumes its entire input iterable up front, submitting every item before yielding any results, which defeats lazy evaluation for the parallelized stage. The chunksize argument (used by ProcessPoolExecutor and ignored by ThreadPoolExecutor) additionally controls how many items are batched per worker call; a larger chunksize materializes more items per task. Keep chunksize small when items are large, and prefer a bounded-queue design over map() when the input stream is too big to materialize.
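This interaction is easy to check with an instrumented source: in CPython, Executor.map() pulls every item from its input iterable before any result is requested (a contrived sketch; the consumed list simply records what the source has yielded):

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []

def lazy_source():
    for i in range(5):
        consumed.append(i)
        yield i

with ThreadPoolExecutor(max_workers=2) as pool:
    results = pool.map(lambda x: x * 2, lazy_source())
    # By this point, map() has already drained the entire source
    print(len(consumed))   # 5
    print(list(results))   # [0, 2, 4, 6, 8]
```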

Testing Pipeline Stages in Isolation

A question that gets surprisingly little coverage: how do you test a pipeline? The good news is that the same property that makes generator pipelines memory-efficient also makes them easy to test. Because each stage is a function from iterable to iterable, you can test any stage by passing in a known list and asserting on the output.

import pytest

def filter_active(records):
    return (r for r in records if r.get("status") == "active")

def test_filter_active_keeps_active_records():
    records = [
        {"id": 1, "status": "active"},
        {"id": 2, "status": "inactive"},
        {"id": 3, "status": "active"},
    ]
    result = list(filter_active(records))
    assert len(result) == 2
    assert all(r["status"] == "active" for r in result)

def test_filter_active_empty_input():
    assert list(filter_active([])) == []

def test_filter_active_all_inactive():
    records = [{"id": 1, "status": "inactive"}]
    assert list(filter_active(records)) == []

The pattern holds for the full pipeline too. Compose the stages in your test the same way you would in production, feed in a small in-memory list, and assert on the collected output. You do not need mocks for stages that accept iterables — just pass in test data directly.

The trickier testing scenario is error handling. If a stage is wrapped with the resilient decorator from the previous section, you need to verify that bad items are skipped and good items still pass through. The test structure is the same: construct a mixed list, run it through the stage, and assert on both the output and the logged warnings.

One pattern worth building into your test suite: a pipeline_harness fixture that materializes a pipeline and returns its output as a list, making assertions readable and pipeline construction reusable across tests:

@pytest.fixture
def run_pipeline():
    def _run(*stages, data):
        """Chain stages and collect to a list."""
        result = data
        for stage in stages:
            result = stage(result)
        return list(result)
    return _run

def test_full_etl_pipeline(run_pipeline):
    data = [
        {"name": "  Alice ", "email": "alice@example.com", "status": "active"},
        {"name": "Bob", "email": "bob@example.com", "status": "inactive"},
    ]
    emails = run_pipeline(normalize, filter_active, extract_emails, data=iter(data))
    assert emails == ["alice@example.com"]

Type Annotations and Pipelines

Python’s type system has specific vocabulary for generators and iterables. Using it correctly makes pipeline stages more readable and catches a category of integration bugs before runtime.

The key types, from the typing (and in Python 3.9+, from built-in generics) module:

  • Iterable[T]: Anything you can iterate over once. Use this for the input to a pipeline stage when you do not need to iterate more than once.
  • Iterator[T]: An iterable that also has __next__(). Generator objects are iterators. A pipeline stage that returns a generator should annotate its return type as Iterator[T].
  • Generator[YieldType, SendType, ReturnType]: The full generator type. For pipelines that only yield (no send()), use Generator[T, None, None]. Most generator pipeline stages fall here.

from collections.abc import Iterable, Iterator  # preferred home since Python 3.9; typing also works

def read_lines(filepath: str) -> Iterator[str]:
    with open(filepath) as f:
        for line in f:
            yield line.strip()

def filter_nonempty(lines: Iterable[str]) -> Iterator[str]:
    for line in lines:
        if line:
            yield line

def to_uppercase(lines: Iterable[str]) -> Iterator[str]:
    for line in lines:
        yield line.upper()

Notice that inputs are typed as Iterable[str] (broader) and outputs as Iterator[str] (more specific). This is deliberate: by accepting Iterable, a stage can receive a list, a generator, a file object, or any other iterable. By returning Iterator, it makes clear that the result is lazy and single-pass.

For scikit-learn-style pipelines where stages have specific interfaces, Protocol classes from typing let you express the contract without coupling stages to a shared base class:

from typing import Protocol

class Transformer(Protocol):
    def fit(self, X, y=None): ...
    def transform(self, X): ...

def build_pipeline(steps: list[tuple[str, Transformer]]) -> Transformer:
    ...  # construction logic elided; the result satisfies the same protocol

Type annotations do not change how pipelines run, but they make the data contract between stages explicit. When a stage annotation says Iterator[dict[str, str]], you immediately know it will not produce integers or accept lists of tuples. That clarity pays dividends when debugging a pipeline whose stages were written by different people on different days.

Performance Considerations

Generator pipelines have minimal overhead. Each stage adds roughly the cost of one function call per item — the __next__() call that pulls a value through. For CPU-bound work, this overhead is negligible compared to the actual computation in each stage.

For I/O-bound pipelines, consider:

  • concurrent.futures: Parallelize individual stages with ThreadPoolExecutor for network-heavy work or ProcessPoolExecutor for CPU-heavy work.
  • asyncio: Use async generators (PEP 525) when stages involve waiting for external resources.
  • itertools: Prefer C-implemented itertools functions over Python-level generator expressions when possible, since they avoid interpreter overhead entirely.

The critical performance advantage of generator pipelines is memory. A pipeline processing a 10 GB file uses only enough memory for the current item at each stage, plus Python’s normal overhead. This is the direct result of lazy evaluation — the design decision that PEP 255 and PEP 289 baked into the language.

Splitting a Stream: itertools.tee and Its Costs

Every pipeline example in most tutorials has a single linear chain: one source, one consumer. But real systems often need to split a single stream and send it to multiple destinations. Python has a built-in tool for this: itertools.tee(). And it comes with a cost that is almost always underdiscussed.

itertools.tee(iterable, n=2) returns n independent iterators from a single iterable. Each can be consumed independently at its own pace:

from itertools import tee, islice

def read_events(source):
    for item in source:
        yield item

stream = read_events(range(1000))
stream_a, stream_b = tee(stream)

# Each iterator is independent
first_ten = list(islice(stream_a, 10))
first_five = list(islice(stream_b, 5))

This looks like free duplication. It is not. Internally, tee buffers values that one consumer has seen but the other has not yet consumed. If stream_a advances 10,000 items ahead of stream_b, those 10,000 items are held in memory. In the worst case — where one consumer exhausts the stream before the other even starts — you have materialized the entire source into memory. The lazy evaluation you worked so hard to preserve is gone.

The official Python documentation explicitly warns against using the original iterable after calling tee(), and notes that the tool may require significant auxiliary storage when one iterator advances far ahead of the others. This is documented behavior, but it surprises many developers who encounter it in production.
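You can observe the buffering with an instrumented source (a contrived sketch; the pulled list only counts how many items tee has taken from the underlying generator):

```python
from itertools import tee, islice

pulled = []

def counting_source(n):
    for i in range(n):
        pulled.append(i)
        yield i

a, b = tee(counting_source(1000))
list(islice(a, 100))  # advance a by 100 items
print(len(pulled))    # 100: all 100 items now sit in tee's buffer for b
next(b)               # b reads from the buffer...
print(len(pulled))    # ...still 100: nothing new pulled from the source
```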

When you need true fan-out from a single source without buffering the entire stream, the correct pattern depends on what you can afford to tolerate:

import queue
import threading

def fan_out(source, *output_queues, sentinel=None):
    """
    Route each item from source to all output queues.
    Use bounded queues to preserve backpressure per consumer.
    """
    for item in source:
        for q in output_queues:
            q.put(item)
    for q in output_queues:
        q.put(sentinel)

# Two consumers, each with independent bounded queues
q_analytics = queue.Queue(maxsize=500)
q_storage   = queue.Queue(maxsize=500)

producer = threading.Thread(
    target=fan_out,
    args=(read_events(big_source), q_analytics, q_storage)
)
producer.start()

This pattern keeps both consumers fed at the pace of the slower one (bounded queues enforce backpressure per consumer). tee is fine when consumers advance at roughly the same rate, or when the source is small enough that the buffer is not a concern. The key is knowing which situation you are in.

The Full Generator Protocol: .throw() and .close()

Most introductions to generator pipelines explain __next__(). Fewer explain the rest of the protocol, which matters considerably when you are building production-grade pipelines that need to handle exceptions and clean up resources.

The generator protocol has three methods beyond __next__():

  • generator.send(value): Resume the generator and inject value at the yield expression. Covered in the PEP 342 section above.
  • generator.throw(type, value=None, traceback=None): Raise an exception inside the generator at the point where it is currently suspended. The generator can catch it with a try/except block and continue yielding, or let it propagate.
  • generator.close(): Throw a GeneratorExit exception into the generator, which should cause it to clean up and return. This is called automatically when a generator is garbage collected.

Why does this matter for pipelines? Because yield from (PEP 380) propagates all three correctly through the delegation chain. If you call close() on the outermost generator of a yield from chain, the GeneratorExit is forwarded all the way down to the innermost generator. Every stage in the chain that uses try/finally gets its cleanup code executed. This is how a pipeline that opens files, database connections, or network sockets can release those resources when the consumer stops consuming early:

def read_lines_with_cleanup(filepath):
    """
    Generator that ensures the file is closed even if
    the consumer stops iterating early.
    """
    try:
        with open(filepath) as f:
            for line in f:
                yield line.strip()
    finally:
        # This executes on GeneratorExit (close()) AND on normal exhaustion
        print(f"Closed {filepath}")

def pipeline(filepaths):
    for path in filepaths:
        yield from read_lines_with_cleanup(path)

# If we stop consuming early, the currently-open file is still closed
gen = pipeline(["a.txt", "b.txt", "c.txt"])
for i, line in enumerate(gen):
    if i >= 5:
        gen.close()  # Triggers GeneratorExit in read_lines_with_cleanup
        break

The .throw() method is less commonly needed but is the right tool when a pipeline stage should react to an external signal. For example, a pipeline reading from a network source could receive a timeout signal injected via .throw(TimeoutError), allowing the generator to handle the error and either recover or propagate it cleanly.
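A small sketch of that recovery pattern, using a hypothetical counter stage that resets when a TimeoutError is injected:

```python
def resettable_counter():
    """Yields 0, 1, 2, ...; resets to 0 if TimeoutError is thrown in."""
    n = 0
    while True:
        try:
            yield n
            n += 1
        except TimeoutError:
            # The exception arrives at the suspended yield;
            # catch it, recover, and keep yielding.
            n = 0

gen = resettable_counter()
assert next(gen) == 0
assert next(gen) == 1
# throw() raises TimeoutError at the suspension point; because the
# generator handles it and yields again, throw() returns that value.
assert gen.throw(TimeoutError) == 0
```

Note the return value of .throw(): if the generator catches the exception and yields, the yielded value becomes throw()'s result, exactly like send().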

Resource Cleanup Warning

If you use a for loop to consume a pipeline and break out early, Python calls generator.close() automatically when the generator object is garbage collected — but not necessarily immediately. If your generator holds open file handles or database cursors, do not rely on timely garbage collection. Call gen.close() explicitly in a finally block, or wrap the generator in a context manager using contextlib.closing().
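The contextlib.closing approach looks like this; the closed list is a stand-in for a real resource handle, so the snippet can verify that cleanup ran promptly:

```python
from contextlib import closing

closed = []

def read_numbers():
    try:
        for i in range(1000):
            yield i
    finally:
        # Stand-in for closing a file handle or database cursor
        closed.append(True)

# closing() calls gen.close() when the with-block exits, even if
# we stop iterating early -- no waiting for garbage collection.
with closing(read_numbers()) as gen:
    first_five = [n for _, n in zip(range(5), gen)]

assert first_five == [0, 1, 2, 3, 4]
assert closed  # the finally block ran as soon as the with-block exited
```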

Instrumenting Pipelines for Observability

A question that treatments of this topic almost never address: once a pipeline is in production, how do you know what is happening inside it? Generators are opaque by design. You cannot inspect the internal state of a suspended generator from outside it. You cannot see how many items are buffered between stages. You cannot measure how long each stage is taking per item without modifying the stage itself.

This is not a problem unique to Python pipelines, but it is one where Python’s design makes the solution particularly clean. Because each stage is a function from iterable to iterable, you can insert a transparent monitoring stage anywhere in the chain without modifying the surrounding stages:

import time
import logging
from collections.abc import Iterable

def monitor(iterable: Iterable, *, name: str, log_every: int = 1000):
    """
    Transparent monitoring stage: passes items through unchanged
    while logging throughput and latency statistics.
    """
    count = 0
    total_time = 0.0
    for item in iterable:
        t0 = time.perf_counter()
        yield item
        elapsed = time.perf_counter() - t0
        total_time += elapsed
        count += 1
        if count % log_every == 0:
            avg_ms = (total_time / count) * 1000
            logging.info(
                f"[{name}] {count} items processed, "
                f"avg downstream latency: {avg_ms:.2f}ms"
            )

# Insert between any two stages without changing their code
lines    = read_lines("data.txt")
filtered = filter_nonempty(lines)
observed = monitor(filtered, name="filter_nonempty", log_every=500)
result   = to_uppercase(observed)

This pattern measures the time spent downstream of the monitoring point: from the moment an item is yielded to the moment control returns after the next stage has consumed it. That timing reveals where slow consumers are creating latency, not just where slow producers are generating data.

For async pipelines, the same pattern works with async for. For pipelines using queue.Queue between threads, the queue’s qsize() method gives you the current backlog at each inter-stage boundary, which you can export as a metric to Prometheus or any other monitoring system.

A more advanced pattern is a counted decorator that wraps any pipeline stage and exposes throughput metrics without requiring the stage to be aware of monitoring:

from dataclasses import dataclass
from collections.abc import Iterable, Iterator

@dataclass
class StageMetrics:
    name: str
    items_in: int = 0
    items_out: int = 0
    errors_skipped: int = 0

    @property
    def drop_rate(self) -> float:
        if self.items_in == 0:
            return 0.0
        return (self.items_in - self.items_out) / self.items_in

def instrumented(func, metrics: StageMetrics):
    """Wrap a generator stage to count items in, out, and dropped."""
    def wrapper(iterable: Iterable) -> Iterator:
        for item in iterable:
            metrics.items_in += 1
            try:
                for result in func([item]):
                    metrics.items_out += 1
                    yield result
            except Exception:
                metrics.errors_skipped += 1
    return wrapper

This gives you per-stage drop rates as a first-class concept. A stage that filters records will have a non-zero drop rate by design. A stage that is supposed to pass everything through with a zero drop rate but does not is a bug you can detect automatically.
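To make the counting concrete, here is the wrapper applied to a small hypothetical stage, parse_ints, with the definitions repeated so the snippet runs standalone:

```python
from dataclasses import dataclass

@dataclass
class StageMetrics:
    name: str
    items_in: int = 0
    items_out: int = 0
    errors_skipped: int = 0

def instrumented(func, metrics):
    def wrapper(iterable):
        for item in iterable:
            metrics.items_in += 1
            try:
                for result in func([item]):
                    metrics.items_out += 1
                    yield result
            except Exception:
                metrics.errors_skipped += 1
    return wrapper

# Hypothetical stage: parse strings to ints, raising on bad input
def parse_ints(items):
    for s in items:
        yield int(s)

metrics = StageMetrics(name="parse_ints")
stage = instrumented(parse_ints, metrics)

assert list(stage(["1", "2", "oops", "3"])) == [1, 2, 3]
# "oops" raised ValueError inside the stage and was counted as skipped
assert (metrics.items_in, metrics.items_out, metrics.errors_skipped) == (4, 3, 1)
```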

Can You Restart a Pipeline?

Generator pipelines are single-pass. Once a generator is exhausted, it cannot be rewound. This constraint surprises developers coming from list-processing backgrounds, where you can iterate the same data structure as many times as you like.

The question of restartability is worth asking explicitly because it shapes how you design sources. Consider three levels:

Non-restartable sources are network streams, file handles, and other I/O sources. Once consumed, the data is gone. A pipeline built on these sources is inherently single-pass.

Restartable sources are functions or classes that create a new generator each time they are called. A pipeline built from a factory function rather than a generator object can be run multiple times:

# Not restartable: source is a generator object
source = read_lines("data.txt")  # Generator created once
pipeline_a = process(source)
list(pipeline_a)  # Exhausted
list(pipeline_a)  # Empty -- the source is gone

# Restartable: source is a callable that produces a new generator
def make_source():
    return read_lines("data.txt")

# Now you can run the pipeline multiple times
for run in range(3):
    results = list(process(make_source()))
    evaluate(results)

Partially materialized pipelines use itertools.tee or an explicit list to checkpoint the stream at a specific point, allowing the data from that point onward to be re-processed. This is the right pattern when the expensive part is producing the data (e.g., a slow network fetch) and you need to apply multiple downstream transformations to the same batch:

# Expensive to produce -- do it once
raw_records = list(fetch_from_api(batch_size=10000))

# Now apply two different downstream transformations
for_analytics = process_for_analytics(iter(raw_records))
for_storage   = process_for_storage(iter(raw_records))

save(for_storage)
analyze(for_analytics)

The underlying insight is that a pipeline is a description of computation, not the computation itself. The generator objects are one instantiation of that description. If you separate the description (the chain of stage functions) from the instantiation (the call to iter(source)), restartability becomes a property of the source, not of the pipeline architecture. This is one reason the pipe(data, *functions) pattern shown earlier explicitly accepts data as a parameter rather than closing over it: it makes reuse obvious.

When to Use What

Here is a practical decision framework:

Generator pipelines (pull-based) work best when you are transforming a stream of data from a single source through a sequence of steps. This is the common case for file processing, log analysis, and ETL operations.

Coroutine pipelines (push-based, using send()) work best when you need branching — sending data to multiple destinations from a single source. Beazley’s broadcasting example from his PyCon 2009 tutorial, A Curious Course on Coroutines and Concurrency, remains the clearest demonstration of this pattern.

functools.reduce / pipe function works best for composing pure functions where each step returns a complete value (not a generator). Good for data transformation chains on individual objects.

Scikit-learn Pipeline is purpose-built for machine learning workflows where steps must share a consistent fit/transform/predict interface.

Framework-level pipelines (Apache Airflow, Luigi, Prefect, Dagster) are for orchestrating scheduled batch jobs with dependency management, retries, and monitoring. These operate at a much higher level of abstraction than language-level pipelines.

Bounded queues with threads or async are necessary the moment you parallelize stages or introduce any source that can produce items faster than the consumer processes them. An unbounded queue in a threaded pipeline is a memory exhaustion bug waiting to happen. Size your queues explicitly.

Async pipelines with asyncio.TaskGroup (Python 3.11+) are the modern solution for fan-out patterns where multiple async consumers should share a single async source. TaskGroup provides structured concurrency: all child tasks are awaited before leaving the group’s context, and if any task raises an exception, the remaining tasks are cancelled. This is significantly safer than spawning bare asyncio.create_task() calls in a pipeline and hoping they all finish. For rate-limiting within an async pipeline — for example, capping concurrent API calls to avoid throttling — asyncio.Semaphore pairs naturally with async generators as a lightweight alternative to bounded queues.

Do not use generators when you need random access to items, when you need to iterate the data more than once without re-running the source, or when the overhead of a lazy chain adds complexity without the memory benefit. If your data fits in memory and you need to sort it globally, just use a list.

Conclusion

Pipelines in Python are not a single feature. They are a pattern that emerges from the language’s deep support for iteration, lazy evaluation, and function composition. PEP 255 gave us generators. PEP 289 gave us generator expressions. PEP 342 gave us coroutine-style push pipelines. PEP 380 gave us clean sub-generator delegation. PEP 525 brought it all to async code. And Raymond Hettinger’s itertools gave us a battle-tested toolkit of composable building blocks.

The thread connecting all of these is the iterator protocol: a simple, universal interface that lets any two stages of a pipeline communicate without either one needing to know anything about the other. That is good engineering, and that is why Python is so naturally suited to pipeline thinking.

But knowing the tools is only the beginning. The questions that distinguish a practitioner from a student are the ones this article adds to the standard narrative: What happens when my producer runs faster than my consumer? How do I test a stage without touching its neighbors? How do I tell a type checker what flows between stages? What do .throw() and .close() do, and why does resource cleanup depend on them? What happens if I try to split a stream with tee between consumers that advance at different rates? How do I instrument a pipeline without modifying the stages themselves? And can I even run this pipeline twice?

None of these questions have exotic answers. They have straightforward Python answers that fit naturally into the same lazy, composable model. The iterator protocol is general enough to carry all of it. The art is knowing which question to ask before a production incident forces it.

Primary Sources & References
