“Broadcasting” in Polars#

Hello, everyone! Welcome back to Cameron’s Corner. This week, I want to look into performance in Polars when working with multiple DataFrames. We are going to cover “broadcasting”—a term used for aligning NumPy arrays against one another—in Polars. Polars always encourages us to be explicit when aligning and working with multiple DataFrames, but there are a few different conceptual approaches we can take to arrive at the same solution. This is where today’s micro-benchmark will come in. We want to find the fastest approach for this specific problem.
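As a quick point of reference, here is what broadcasting means over in NumPy: a scalar (or any length-1 dimension) is implicitly stretched to match the shape of the larger array. This snippet uses the same seeded generator as the premise example below, so the values line up with it:

```python
from numpy.random import default_rng

rng = default_rng(0)
xs = rng.integers(10, size=5)  # array([8, 6, 5, 2, 3])
y = -2                         # the scalar is "broadcast" against every element

print(xs * y)                  # [-16 -12 -10  -4  -6]
```

Polars has no equivalent implicit mechanism for aligning two frames, which is why we need to pick one of the explicit approaches benchmarked below.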

Premise#

Say we have two LazyFrames, where one is of an undetermined length and the other has a known length of 1 (after collection). We want to be able to perform arithmetic across these two LazyFrames as in the following example:

from polars import DataFrame, col, concat
from numpy.random import default_rng
rng = default_rng(0)

df1 = DataFrame({"x": rng.integers(10, size=5)})
df2 = DataFrame({"y": -2})

(
    concat([df1, df2], how='horizontal')
    .with_columns(
        result=col('x') * col('y').first()
    )
)
shape: (5, 3)
┌─────┬──────┬────────┐
│ x   ┆ y    ┆ result │
│ --- ┆ ---  ┆ ---    │
│ i64 ┆ i64  ┆ i64    │
╞═════╪══════╪════════╡
│ 8   ┆ -2   ┆ -16    │
│ 6   ┆ null ┆ -12    │
│ 5   ┆ null ┆ -10    │
│ 2   ┆ null ┆ -4     │
│ 3   ┆ null ┆ -6     │
└─────┴──────┴────────┘

The approach we took above used polars.concat to combine our two DataFrames and performed an arithmetic operation between the two columns 'x' and 'y'. (Note that we used col('y').first() to notify Polars that we only want to operate on the first value in this column, since the rest of the values will be null after concatenation.)

Selected Approaches#

We are going to write up and benchmark the following approaches. You can see that two of the approaches revolve around using polars.concat; two revolve around using an explicit .join; and the final approach will circumvent the need for an explicit alignment operation by collecting our smaller LazyFrame first.

  1. concat(..., how="horizontal") → col('y').first()
    As seen above.

  2. concat(..., how="horizontal") → col('y').forward_fill()
    The same as the above, but forward_fill the 'y' column instead of selecting only its first value.

  3. join(..., on="dummy")
    Create a dummy column with the same constant value in both frames and join on it. This repeats the values from our small LazyFrame's columns against every row of our larger LazyFrame.

  4. join(..., how="cross")
    Since we have a single-row LazyFrame, we can cross-join these Frames without worrying about a Cartesian-product explosion. This creates the same intermediate result as the approach above.

  5. early collect
    Don’t worry about explicitly aligning the LazyFrames. Simply collect the smaller LazyFrame and work with those values as if they are scalars.

Performance Timing#

In order to benchmark the above concepts, we are going to need some type of timer. I wrote up a quick one that lets us track multiple different timings over multiple calls. You can see how it works below:

Disclaimer: I am not concerned with repeatedly testing these timings in an exhaustive manner.

from time import perf_counter, sleep
from contextlib import contextmanager
from dataclasses import dataclass, field
from polars import DataFrame

@dataclass
class Timer:
    start: float
    stop:  float
    @property
    def duration(self):
        return self.stop - self.start

@dataclass
class TimerManager:
    timers: list = field(default_factory=list)
    
    @contextmanager
    def timed(self, msg, verbose=True):
        start = perf_counter()
        yield
        stop = perf_counter()
        self.timers.append(
            (msg, timer := Timer(start, stop))
        )
        if verbose:
            print(f'{msg} elapsed {timer.duration * 1000:.2f}ms')
    
    def to_df(self, schema=['message', 'start', 'stop', 'duration[s]']):
        return DataFrame(
            data=[
                (msg, t.start, t.stop, t.duration)
                for msg, t in self.timers
            ],
            schema=schema,
            orient='row'
        )

timer = TimerManager()
with timer.timed('test ①'):
    sleep(.1)
    
with timer.timed('test ②'):
    sleep(.3)
    
timer.to_df()
test ① elapsed 100.09ms
test ② elapsed 300.41ms
shape: (2, 4)
┌──────────┬──────────┬──────────┬─────────────┐
│ message  ┆ start    ┆ stop     ┆ duration[s] │
│ ---      ┆ ---      ┆ ---      ┆ ---         │
│ str      ┆ f64      ┆ f64      ┆ f64         │
╞══════════╪══════════╪══════════╪═════════════╡
│ "test ①" ┆ 1.9301e6 ┆ 1.9301e6 ┆ 0.100087    │
│ "test ②" ┆ 1.9301e6 ┆ 1.9301e6 ┆ 0.300415    │
└──────────┴──────────┴──────────┴─────────────┘

Setup#

We’re going to create a fairly large LazyFrame to work with: 1,000,000 rows. This time, our single-row LazyFrame has two columns.

from polars import LazyFrame, col
from numpy.random import default_rng
rng = default_rng(0)

df1 = LazyFrame({"x": rng.integers(100_000, size=1_000_000)})
df2 = LazyFrame({"y1": -2, "y2": 4})

timer = TimerManager()
results = []

display(
    df1.head(5).collect(),
    df2.collect(),
)
shape: (5, 1)
┌───────┐
│ x     │
│ ---   │
│ i64   │
╞═══════╡
│ 85062 │
│ 63696 │
│ 51113 │
│ 26978 │
│ 30782 │
└───────┘
shape: (1, 2)
┌─────┬─────┐
│ y1  ┆ y2  │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ -2  ┆ 4   │
└─────┴─────┘

concat → first#

from polars import concat

with timer.timed('concat → first'):
    combined = concat([df1, df2], how="horizontal")
    res = combined.select(
        col("x") - col("x") * col("y1").first() * col("y2").first()
    ).collect()

results.append(res) # save result for later comparison
concat → first elapsed 9.54ms

concat → forward_fill#

with timer.timed('concat → ffill'):
    combined = concat([df1, df2], how="horizontal")
    res = combined.select(
        col("x") - col("x") * col("y1").forward_fill() * col("y2").forward_fill()
    ).collect()

results.append(res)
concat → ffill elapsed 17.37ms

join on dummy#

with timer.timed('join[on=dummy]'):
    joined = (
        df1.with_columns(dummy=1)
        .join(df2.with_columns(dummy=1), on="dummy")
    )
    res = joined.select(
        col("x") - col("x") * col("y1") * col("y2")
    ).collect()

results.append(res)
join[on=dummy] elapsed 23.42ms

cross join#

with timer.timed('join[cross]'):
    res = (
        df1.join(df2, how="cross")
        .select(col("x") - col("x") * col("y1") * col("y2"))
        .collect()
    )
    
results.append(res)
join[cross] elapsed 18.79ms

early collect#

with timer.timed('early collect'):
    early = df2.collect()
    res = df1.select(
        col("x") - col("x") * early["y1"] * early["y2"]
    ).collect()
    
results.append(res)
early collect elapsed 6.72ms

Verification#

Before we do anything with these timings, we should ensure that they all produce the same answers. Then we can look at a comparison of their execution times.

from itertools import pairwise
for x, y in pairwise(results):
    assert x.equals(y)
    
df = (
    timer.to_df()
    .select('message', col('duration[s]').round(4))
    .sort('duration[s]')
)

df
shape: (5, 2)
┌──────────────────┬─────────────┐
│ message          ┆ duration[s] │
│ ---              ┆ ---         │
│ str              ┆ f64         │
╞══════════════════╪═════════════╡
│ "early collect"  ┆ 0.0067      │
│ "concat → first" ┆ 0.0095      │
│ "concat → ffill" ┆ 0.0174      │
│ "join[cross]"    ┆ 0.0188      │
│ "join[on=dummy]" ┆ 0.0234      │
└──────────────────┴─────────────┘

Visual Comparison#

But what fun is a simple numeric comparison? Since we’re benchmarking, we can create a data visualization to put these numbers into perspective. Let’s make a horizontal bar chart comparing these execution times.

%matplotlib inline
from matplotlib.pyplot import rc, subplots
from polars import __version__ as pl_version

rc('font', size=14)
rc('axes.spines', top=False, right=False, left=False, bottom=False)
rc('ytick', left=False)
rc('xtick', bottom=False, labelbottom=False)

unit_label, unit_mul = 'ms', 1_000
duration = df['duration[s]'] * unit_mul
fig, ax = subplots(figsize=(12, 8), facecolor='white')


elapsed_bc = ax.barh(df['message'], duration)
compare_bc = ax.barh(
    df['message'], duration.max(), zorder=0, color='gray', alpha=.1
)

ax.invert_yaxis()
ax.margins(y=0)

for elapsed, comp in zip(elapsed_bc, compare_bc):
    ax.annotate(
        f'{elapsed.get_width():.2f} {unit_label}',
        xy=(1, .5), xycoords=elapsed,
        xytext=(-5, 0), textcoords='offset points',
        ha='right',
        va='center',
        color=ax.get_facecolor(),
    )
    
    if elapsed.get_width() > duration.min():
        ax.annotate(
            f'{elapsed.get_width()/duration[0]:.2f}× slower\nthan {df[0, "message"]}',
            xy=(1, .5), xycoords=comp,
            xytext=(5, 0), textcoords='offset points',
            ha='left',
            va='center',
        )

subtitle = ax.annotate(
    f'Benchmarked on Polars: {pl_version}', 
    xy=(0, 1), xycoords=ax.transAxes,
    xytext=(0, 10), textcoords='offset points',
    va='bottom',
)

title = ax.annotate(
    f'Polars “Broadcasting” Operations',
    xy=(0, 1), xycoords=subtitle,
    xytext=(0, 5), textcoords='offset points',
    va='bottom',
    size='xx-large',
)
[Figure: “Polars “Broadcasting” Operations” — horizontal bar chart of the five approaches’ durations in ms, fastest at top, with each slower bar annotated with its slowdown relative to “early collect”; subtitle notes the Polars version benchmarked.]

Wrap-Up#

And there you have it: a quick micro-benchmark comparing operations in Polars. There is a sizable spread in these timings. Even though the query optimizer takes a pass over your query, there are barriers it simply cannot overcome, and those require a better formulation on the human side of the problem.

What do you think about working with multiple DataFrames in Polars? How would you implement this in your code? Let me know on the DUTC Discord server.

Talk to you all next week!