Window Functions in PySpark: The Complete Working Guide

If you have ever needed to rank rows within a group, compute a running total, or compare each row to the one before it — all without collapsing your DataFrame into an aggregated summary — you have needed window functions. They are among the most powerful features in PySpark, and understanding how they actually work under the hood separates people who write Spark jobs from people who write good Spark jobs.

Window functions landed in Apache Spark with version 1.4, released in June 2015 with contributions from over 210 developers. The Databricks engineering blog announced the feature that July, describing the problem window functions solve in a way that still holds up a decade later: before Spark 1.4, you could operate on a single row with built-in functions, or collapse a group of rows into one value with aggregate functions, but there was no way to operate across a group of rows while still returning a value for every individual row. The Window class has shipped with every release since 1.4.0, and the API has remained remarkably stable: code you write today would look familiar to someone who wrote window functions in 2015.

That stability matters. With Spark 4.0 released in 2025 bringing Spark Connect, native PySpark plotting, and a lightweight 1.5 MB pyspark-client package, window functions continue to work exactly as they always have, now with full Spark Connect compatibility since version 3.4. This article walks through every category of window function PySpark offers, with working code you can run, traps you should know about, and the mental model that makes window specifications click.

The Core Concept: What a Window Actually Is

A window function calculates a return value for every input row based on a group of related rows called a frame. Unlike groupBy, which collapses rows into summaries, window functions keep every row intact and attach new computed columns alongside the original data.

Every window specification has up to three components:

Partitioning divides the DataFrame into independent groups. Rows in different partitions never interact during the window calculation. This is analogous to GROUP BY, except the rows stay separate.

Ordering sorts the rows within each partition. For ranking and analytic functions, ordering is mandatory — without it, there is no meaningful concept of "first," "next," or "previous."

Frame specification defines exactly which rows in the partition participate in the calculation for any given row. This is where things get interesting, and where many people get tripped up.

Here is the basic pattern in PySpark:

from pyspark.sql.window import Window

window_spec = Window.partitionBy("department").orderBy("salary")

That single line creates a reusable specification. You then apply it with the .over() method on any window function:

from pyspark.sql.functions import row_number

df.withColumn("rank_in_dept", row_number().over(window_spec))

Setting Up a Working Example

Every code example in this article runs against the same DataFrame, so you can follow along in a notebook or the pyspark shell:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank, percent_rank, ntile,
    lag, lead, cume_dist,
    sum as spark_sum, avg as spark_avg, min as spark_min,
    max as spark_max, count,
    col, desc, when, lit
)

spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

data = [
    ("Alice",   "Engineering", 95000,  "2020-03-15"),
    ("Bob",     "Engineering", 110000, "2018-07-01"),
    ("Carol",   "Engineering", 95000,  "2021-01-10"),
    ("Dan",     "Engineering", 120000, "2017-04-22"),
    ("Eve",     "Sales",       72000,  "2019-06-01"),
    ("Frank",   "Sales",       85000,  "2018-11-15"),
    ("Grace",   "Sales",       85000,  "2020-09-01"),
    ("Hank",    "Sales",       68000,  "2021-03-20"),
    ("Iris",    "Marketing",   78000,  "2019-01-10"),
    ("Jack",    "Marketing",   92000,  "2017-08-15"),
    ("Karen",   "Marketing",   88000,  "2020-05-01"),
]

columns = ["name", "department", "salary", "hire_date"]
df = spark.createDataFrame(data, columns)

Ranking Functions: row_number, rank, dense_rank

Ranking functions assign a positional value to each row within its partition based on the ordering you specify. PySpark provides three, and the differences matter more than people expect.

window_spec = Window.partitionBy("department").orderBy(desc("salary"))

ranked = df.withColumn("row_num", row_number().over(window_spec)) \
           .withColumn("rank", rank().over(window_spec)) \
           .withColumn("dense_rank", dense_rank().over(window_spec))

ranked.select("name", "department", "salary",
              "row_num", "rank", "dense_rank").show()

Output (Engineering partition):

+-----+-----------+------+-------+----+----------+
| name| department|salary|row_num|rank|dense_rank|
+-----+-----------+------+-------+----+----------+
|  Dan|Engineering|120000|      1|   1|         1|
|  Bob|Engineering|110000|      2|   2|         2|
|Alice|Engineering| 95000|      3|   3|         3|
|Carol|Engineering| 95000|      4|   3|         3|
+-----+-----------+------+-------+----+----------+

Look at Alice and Carol. They earn the same salary, so each function handles the tie differently: row_number() assigns them arbitrary but unique numbers (3 and 4) — the assignment is nondeterministic for ties, so Spark does not guarantee which one gets 3 and which gets 4 across different runs. rank() gives both the same rank (3) but skips the next position, so the next distinct value would get rank 5, not 4. dense_rank() gives both rank 3 and the next distinct value gets rank 4, with no gap.
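
To make the three tie-handling rules concrete, here is a pure-Python sketch of the semantics (plain Python for illustration, not Spark code), applied to a partition already sorted descending:

```python
# Pure-Python sketch of the three tie-handling rules, applied to one
# partition already sorted by the ordering column (salary descending).
def ranking_columns(values):
    """Return (row_number, rank, dense_rank) for each value in a sorted list."""
    row_nums, ranks, dense = [], [], []
    dense_val = 0
    for i, v in enumerate(values):
        row_nums.append(i + 1)              # row_number: always unique
        if i > 0 and v == values[i - 1]:
            ranks.append(ranks[-1])         # rank: repeat on tie...
            dense.append(dense[-1])         # dense_rank: repeat on tie...
        else:
            ranks.append(i + 1)             # ...then jump past the gap
            dense_val += 1
            dense.append(dense_val)         # ...then advance by exactly one
    return list(zip(row_nums, ranks, dense))

# Engineering salaries sorted descending, as in the table above
print(ranking_columns([120000, 110000, 95000, 95000]))
# [(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 3, 3)]
```

Feeding in a list where a value follows the tie, such as [100, 90, 90, 80], shows the gap: rank jumps from 2 to 4 while dense_rank goes 2, 2, 3.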

Pro Tip

For selecting exactly the top N rows per group, row_number() is the right choice because it guarantees uniqueness — filter on row_num <= N and you get exactly N rows per partition. For analytical reporting where ties should be visible and gaps meaningful (like Olympic medal rankings), use rank(). For grading systems or bucketing where you want consecutive integers regardless of ties, use dense_rank().

Two more ranking-adjacent functions worth knowing:

ranked_extended = df.withColumn(
    "percentile", percent_rank().over(window_spec)
).withColumn(
    "quartile", ntile(4).over(window_spec)
)

percent_rank() returns a value between 0.0 and 1.0 representing each row's relative position, calculated as (rank - 1) / (total_rows - 1). ntile(n) divides the ordered partition into n roughly equal buckets and labels each row with its bucket number (1 through n). Both are useful for percentile analysis and distribution bucketing.
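
Both formulas are simple enough to sketch in plain Python (an illustration of the semantics, not Spark code); the ntile sizing below assumes the usual SQL behavior of giving the earlier buckets the extra rows when the partition does not divide evenly:

```python
# Pure-Python sketch of percent_rank and ntile over one sorted partition.
def percent_rank_values(values):
    """(rank - 1) / (total_rows - 1); assumes at least two rows, sorted input."""
    n = len(values)
    ranks = []
    for i, v in enumerate(values):
        ranks.append(ranks[-1] if i and v == values[i - 1] else i + 1)
    return [(r - 1) / (n - 1) for r in ranks]

def ntile_values(n_rows, n_buckets):
    """Bucket labels 1..n_buckets; the first (n_rows % n_buckets) buckets get one extra row."""
    base, extra = divmod(n_rows, n_buckets)
    labels = []
    for b in range(1, n_buckets + 1):
        size = base + (1 if b <= extra else 0)
        labels.extend([b] * size)
    return labels

print(percent_rank_values([120000, 110000, 95000, 95000]))  # ranks 1,2,3,3 -> 0, 1/3, 2/3, 2/3
print(ntile_values(11, 4))   # 11 rows into quartiles of sizes 3, 3, 3, 2
```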

Analytic Functions: lag and lead

The lag() and lead() functions reach backward and forward through the ordered partition to access values from other rows. They eliminate the need for self-joins, which is a significant performance win on large DataFrames.

window_spec = Window.partitionBy("department").orderBy("hire_date")

comparisons = df.withColumn(
    "prev_salary", lag("salary", 1).over(window_spec)
).withColumn(
    "next_salary", lead("salary", 1).over(window_spec)
).withColumn(
    "salary_change_from_prev",
    col("salary") - col("prev_salary")
)

comparisons.select("name", "department", "hire_date", "salary",
                   "prev_salary", "next_salary",
                   "salary_change_from_prev").show()

The first argument is the column to retrieve, and the second is the offset (how many rows back or forward). You can also provide a third argument as a default value when there is no row at the specified offset:

# Use 0 instead of null for the first row in each partition
df.withColumn(
    "prev_salary", lag("salary", 1, 0).over(window_spec)
)
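
The offset-and-default semantics are easy to model in plain Python (a sketch for intuition, not Spark code):

```python
# Pure-Python sketch of lag/lead over one ordered partition.
def lag(values, offset=1, default=None):
    """Value 'offset' rows back, or the default when that falls off the partition."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

def lead(values, offset=1, default=None):
    """Value 'offset' rows ahead, or the default past the partition end."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default
            for i in range(n)]

salaries = [110000, 95000, 120000]   # one department, ordered by hire_date
print(lag(salaries))                 # [None, 110000, 95000]
print(lag(salaries, 1, 0))           # [0, 110000, 95000] -- default fills the first row
print(lead(salaries))                # [95000, 120000, None]
```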

A classic real-world application is calculating period-over-period changes in time series data. Stock price analysis, monthly revenue comparisons, and churn rate tracking all rely heavily on lag() and lead().

# Calculate day-over-day percentage change per stock
window_spec = Window.partitionBy("stock_symbol").orderBy("trade_date")

stock_df.withColumn(
    "prev_close", lag("closing_price", 1).over(window_spec)
).withColumn(
    "pct_change",
    ((col("closing_price") - col("prev_close")) / col("prev_close")) * 100
)

Aggregate Window Functions: Running Totals and Moving Averages

Any standard aggregate function — sum, avg, min, max, count — can be used as a window function. When applied over a window specification, these functions compute their result across the frame for each row, rather than collapsing the group into a single output row.

window_spec = Window.partitionBy("department").orderBy("salary")

aggregated = df.withColumn(
    "dept_total", spark_sum("salary").over(
        Window.partitionBy("department")
    )
).withColumn(
    "running_total", spark_sum("salary").over(window_spec)
).withColumn(
    "dept_avg", spark_avg("salary").over(
        Window.partitionBy("department")
    )
).withColumn(
    "running_count", count("*").over(window_spec)
)

Note

When you only use partitionBy without orderBy, the aggregate applies across the entire partition — every row gets the same total. When you add orderBy, Spark uses a default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which creates a running calculation. This default frame behavior catches people off guard, and it is the single most important thing to understand about window frames.
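
A plain-Python sketch of the Note's two behaviors (illustration only, not Spark code) makes the default range frame visible, including why tied ordering values share a running total:

```python
# Pure-Python sketch: same aggregate, with and without an ordering, over
# one partition whose values are sorted ascending by the ordering column.
def partition_total(values):
    """No orderBy: every row sees the entire partition."""
    return [sum(values)] * len(values)

def range_running_total(values):
    """With orderBy, the default frame is RANGE UNBOUNDED PRECEDING..CURRENT ROW:
    every row whose ordering VALUE is <= the current row's value is included."""
    return [sum(w for w in values if w <= v) for v in values]

vals = [95000, 95000, 110000, 120000]   # note the tie at 95000
print(partition_total(vals))       # [420000, 420000, 420000, 420000]
print(range_running_total(vals))   # [190000, 190000, 300000, 420000] -- ties share a total
```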

The Frame Trap: rowsBetween vs. rangeBetween

When you define a window with orderBy, Spark silently applies a default frame. If you do not explicitly set the frame, you get rangeBetween(Window.unboundedPreceding, Window.currentRow). This is a range frame, not a rows frame, and the distinction matters enormously.

rowsBetween counts physical rows. rowsBetween(-2, 0) means "the two rows before this one and the current row," regardless of what values those rows contain.

rangeBetween counts by the value of the ordering column. rangeBetween(-100, 0) means "all rows whose ordering column value is within 100 units of the current row's value."
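
A plain-Python sketch of the two frame types (illustration only, not Spark code) shows the difference on our Engineering salaries, which contain a tie:

```python
# Pure-Python sketch of bounded frames over one partition sorted ascending.
def rows_frame_avg(values, start, end):
    """rowsBetween(start, end): physical row offsets around position i."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i + start): i + end + 1]
        out.append(sum(frame) / len(frame))
    return out

def range_frame_avg(values, start, end):
    """rangeBetween(start, end): rows whose VALUE lies within the offsets."""
    out = []
    for i, v in enumerate(values):
        frame = [w for w in values if v + start <= w <= v + end]
        out.append(sum(frame) / len(frame))
    return out

vals = [95000, 95000, 110000, 120000]     # Engineering, ascending; note the tie
print(rows_frame_avg(vals, -2, 0))        # each row sees at most 2 physical rows before it
print(range_frame_avg(vals, -15000, 0))   # the tied 95000s always land in each other's frame
```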

Warning

The code below does not compute a 3-row moving average at all. With no explicit frame, the default is a running aggregate from the partition start, and because that default is a range frame, tied salary values are also grouped together, producing unexpected aggregation boundaries. If Alice and Carol both earn 95,000, the range frame includes both of them in the same calculation even though they are separate rows.

# DANGER: no explicit frame, so this is a RUNNING average over the
# default range frame -- NOT a 3-row moving average
window_bad = Window.partitionBy("department").orderBy("salary")
df.withColumn("moving_avg", spark_avg("salary").over(window_bad))

The fix is explicit:

# Explicit rows-based 3-row moving average
window_explicit = Window.partitionBy("department") \
    .orderBy("salary") \
    .rowsBetween(-2, Window.currentRow)

df.withColumn("moving_avg_3", spark_avg("salary").over(window_explicit))

For most analytical use cases — moving averages, running totals, sliding window computations — you want rowsBetween. Reserve rangeBetween for when you genuinely need value-based boundaries, such as including all transactions within a dollar amount range of the current row, or all events within a time range.

The special constants Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow define the edges:

# Cumulative sum from partition start to current row
cumulative = Window.partitionBy("department") \
    .orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Entire partition (same as no orderBy)
full_partition = Window.partitionBy("department") \
    .orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Centered 5-row window (2 before, current, 2 after)
centered = Window.partitionBy("department") \
    .orderBy("salary") \
    .rowsBetween(-2, 2)

Combining Multiple Window Functions

Real analysis rarely uses a single window function in isolation. Here is a pattern that combines ranking, aggregation, and comparison to build an employee performance dashboard:

salary_window = Window.partitionBy("department").orderBy(desc("salary"))
dept_window = Window.partitionBy("department")
hire_window = Window.partitionBy("department").orderBy("hire_date")

dashboard = df \
    .withColumn("salary_rank", dense_rank().over(salary_window)) \
    .withColumn("dept_avg_salary",
                spark_avg("salary").over(dept_window)) \
    .withColumn("salary_vs_avg",
                col("salary") - col("dept_avg_salary")) \
    .withColumn("dept_max_salary",
                spark_max("salary").over(dept_window)) \
    .withColumn("pct_of_max",
                (col("salary") / col("dept_max_salary") * 100).cast("int")) \
    .withColumn("tenure_order", row_number().over(hire_window)) \
    .withColumn("prev_hire_salary", lag("salary", 1).over(hire_window))

dashboard.select(
    "name", "department", "salary", "salary_rank",
    "salary_vs_avg", "pct_of_max", "tenure_order"
).show()

Each withColumn call can reference a different window specification. Spark's Catalyst optimizer recognizes when multiple window functions share the same partitioning and ordering, and it consolidates them into a single sort and partition pass internally. This means adding multiple window functions that share a specification is far cheaper than you might expect.

Using Window Functions with Spark SQL

Everything available through the DataFrame API is also available through Spark SQL, and some people find the SQL syntax more readable for complex window expressions:

df.createOrReplaceTempView("employees")

result = spark.sql("""
    SELECT
        name,
        department,
        salary,
        RANK() OVER (
            PARTITION BY department ORDER BY salary DESC
        ) AS salary_rank,
        salary - AVG(salary) OVER (
            PARTITION BY department
        ) AS diff_from_avg,
        SUM(salary) OVER (
            PARTITION BY department
            ORDER BY hire_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total
    FROM employees
""")

result.show()

The SQL syntax maps directly to the DataFrame API. PARTITION BY corresponds to partitionBy, ORDER BY to orderBy, and ROWS BETWEEN / RANGE BETWEEN to rowsBetween / rangeBetween.

The Top-N Per Group Pattern

This is probably the single most common use of window functions in production PySpark code. You want the top N items from each group — the top 3 highest-paid employees per department, the 5 best-selling products per region, the latest 10 transactions per customer.

window_spec = Window.partitionBy("department").orderBy(desc("salary"))

top_2_per_dept = df \
    .withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") <= 2) \
    .drop("rn")

top_2_per_dept.show()

+-----+-----------+------+----------+
| name| department|salary| hire_date|
+-----+-----------+------+----------+
|  Dan|Engineering|120000|2017-04-22|
|  Bob|Engineering|110000|2018-07-01|
|Frank|      Sales| 85000|2018-11-15|
|Grace|      Sales| 85000|2020-09-01|
| Jack|  Marketing| 92000|2017-08-15|
|Karen|  Marketing| 88000|2020-05-01|
+-----+-----------+------+----------+

Use row_number() when you need exactly N rows (it breaks ties arbitrarily). Use dense_rank() when you want all rows that tie for the top N distinct values — which might return more than N rows.
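
The difference between the two tie rules is easy to sketch in plain Python (illustration only, not Spark code), using the Sales salaries, where 85000 appears twice:

```python
# Pure-Python sketch of the two top-N rules over one partition sorted descending.
def top_n_by_row_number(values, n):
    """Exactly n rows; which of the tied rows survive is arbitrary."""
    return values[:n]

def top_n_by_dense_rank(values, n):
    """All rows whose value is among the top n DISTINCT values; may exceed n rows."""
    top_values = sorted(set(values), reverse=True)[:n]
    return [v for v in values if v in top_values]

sales = [85000, 85000, 72000, 68000]
print(top_n_by_row_number(sales, 2))   # [85000, 85000] -- exactly 2 rows
print(top_n_by_dense_rank(sales, 2))   # [85000, 85000, 72000] -- 3 rows, ties kept
```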

Performance: What Happens Under the Hood

Window functions are not free. Understanding the execution mechanics helps you write efficient pipelines.

Shuffling: partitionBy triggers a shuffle. All rows for the same partition key must land on the same executor. If your partition key has high cardinality (millions of unique values), you are creating millions of tiny partitions with significant overhead. If it has very low cardinality (one or two values), you concentrate all work on a few executors, wasting the rest of your cluster.

Sorting: orderBy sorts within each partition. This happens after the shuffle and is an additional O(n log n) cost per partition.

Frame computation: For each row, Spark evaluates the window function across the frame. Unbounded frames are optimized internally — Spark does not recompute from scratch for each row but uses incremental computation where possible.
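
The incremental idea can be sketched in plain Python (a rough illustration of the concept, not Spark's actual implementation):

```python
# Pure-Python sketch: a running total over UNBOUNDED PRECEDING..CURRENT ROW
# can be carried forward in one pass instead of re-summing the frame per row.
def running_total_naive(values):
    """Re-sum the frame for every row: O(n^2) total work."""
    return [sum(values[:i + 1]) for i in range(len(values))]

def running_total_incremental(values):
    """Carry an accumulator forward: O(n) total work, same answer."""
    out, acc = [], 0
    for v in values:
        acc += v
        out.append(acc)
    return out

vals = [95000, 95000, 110000, 120000]
assert running_total_naive(vals) == running_total_incremental(vals)
```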

# GOOD: Multiple window functions sharing the same spec
# Spark consolidates into one sort pass
spec = Window.partitionBy("department").orderBy("salary")
df.withColumn("rn", row_number().over(spec)) \
  .withColumn("rnk", rank().over(spec)) \
  .withColumn("running", spark_sum("salary").over(spec))

# COSTLY: Different ordering in each window
# Forces separate sort passes
spec_a = Window.partitionBy("department").orderBy("salary")
spec_b = Window.partitionBy("department").orderBy("hire_date")
df.withColumn("salary_rank", rank().over(spec_a)) \
  .withColumn("tenure_rank", rank().over(spec_b))

When different window specifications share the same partitionBy but have different orderBy clauses, Spark must sort the partition twice. This is unavoidable but worth knowing about, so you can structure your analysis to minimize the number of distinct sort orders when processing very large datasets.

Common Mistakes and How to Avoid Them

Forgetting orderBy with ranking functions. Ranking functions are only meaningful over an ordered window. Recent Spark versions raise an AnalysisException if you call row_number() over a window with no orderBy; even in contexts where such a query is accepted, the output is nondeterministic and meaningless.

Assuming row_number is deterministic with ties. When two rows have the same value in the ordering column, row_number() assigns them arbitrary distinct numbers. If you need reproducible tie-breaking, add a secondary sort column:

# Deterministic: ties broken by name
window_spec = Window.partitionBy("department") \
    .orderBy(desc("salary"), "name")

Using rangeBetween when you mean rowsBetween. This is subtle and produces silently wrong results. If your ordering column has duplicate values, rangeBetween groups the duplicates together, which inflates your running totals or moving averages. Always use rowsBetween for fixed-size sliding windows.

Not specifying a frame when using aggregate window functions with orderBy. The default RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is often not what you want. Be explicit about your frame.

Ignoring null handling in lag and lead. By default, lag() and lead() return null when they reach beyond the partition boundary. If downstream calculations depend on these values, either provide a default or handle nulls explicitly:

from pyspark.sql.functions import coalesce

df.withColumn(
    "prev_salary",
    coalesce(lag("salary", 1).over(window_spec), col("salary"))
)

A Complete Pipeline: Detecting Salary Outliers by Department

Here is an end-to-end example that uses multiple window function categories together to identify salary outliers within each department:

from pyspark.sql.functions import stddev

dept_window = Window.partitionBy("department")
rank_window = Window.partitionBy("department").orderBy(desc("salary"))

analysis = df \
    .withColumn("dept_avg", spark_avg("salary").over(dept_window)) \
    .withColumn("dept_stddev", stddev("salary").over(dept_window)) \
    .withColumn("z_score",
                (col("salary") - col("dept_avg")) / col("dept_stddev")) \
    .withColumn("salary_rank", dense_rank().over(rank_window)) \
    .withColumn("dept_count", count("*").over(dept_window)) \
    .withColumn("percentile", percent_rank().over(rank_window)) \
    .withColumn("is_outlier",
                when(col("z_score").isNull(), False)
                .when((col("z_score") > 2) | (col("z_score") < -2), True)
                .otherwise(False))

analysis.select(
    "name", "department", "salary",
    "dept_avg", "z_score", "salary_rank",
    "percentile", "is_outlier"
).orderBy("department", desc("salary")).show(truncate=False)

This pipeline computes per-department statistics (mean, standard deviation), assigns z-scores, ranks employees, calculates percentiles, and flags outliers — all without a single groupBy that would destroy the row-level detail. Every employee retains their individual record while gaining context about where they stand relative to their department.

Key Takeaways

  1. Windows keep every row: Unlike groupBy, window functions compute across groups while returning a value for every individual row. The three components — partition, order, and frame — control exactly which rows participate in each calculation.
  2. Choose the right ranking function: Use row_number() for unique sequential integers, rank() when gaps at ties are meaningful, and dense_rank() when you want consecutive integers regardless of ties.
  3. Always be explicit about frames: The implicit default frame when you add orderBy is rangeBetween(unboundedPreceding, currentRow). For sliding window calculations, always specify rowsBetween explicitly to avoid silent errors with tied values.
  4. Use lag and lead instead of self-joins: Reaching backward or forward through ordered partitions with lag() and lead() is far more performant than joining a DataFrame to itself for the same result.
  5. Consolidate window specs for performance: Multiple window functions sharing the same partitionBy and orderBy are consolidated into a single sort pass. Different sort orders force separate passes — minimize distinct sort orders on large datasets.

Window functions have been part of PySpark since 2015 and remain one of its defining analytical capabilities. The API is stable, the optimization is mature, and the pattern applies everywhere data is grouped, ordered, and compared.
