Polars Has Inequality Joins!#

It finally happened, Polars supports inequality joins — at least, if you are using version 1.7.0 or later. This week I want to tackle a familiar problem from a recent blog post I wrote covering a few different types of joins and when they are useful. This week, I will tackle the same problem in Polars, and discuss how this optimization can simplify our lives and make our compute workloads more manageable by using an inequality join.

Background#

In many data analysis scenarios, simple equality joins—matching rows where column values are exactly the same—aren’t enough. A common use case involves working with time-based data, such as tracking state changes of devices and logging alert events for those devices. Here, you don’t just want to match records by a shared device_id; you need to match based on time intervals, like determining which alerts occurred during a specific state. These kinds of joins, known as inequality joins, are essential in areas like time-series analysis, event-driven systems, and continuous monitoring, where relationships between data points depend on conditions beyond simple equality (i.e., a timestamp falling within a specific range).

Traditionally, solving this problem involves performing a full join on a common key, and then filtering the result based on conditions like time ranges. However, this approach is inefficient, as it creates a large intermediate result, only to discard most of it through filtering, leading to slower performance and higher memory consumption. To address this, Polars introduced the .join_where method, which allows inequality conditions to be specified directly within the join operation. This method significantly improves efficiency by avoiding the need to create and then filter a massive intermediate dataset, making it ideal for complex time window queries and other use cases that require range-based conditions.

The Data#

In this first block, we prepare two datasets: state_polling, which tracks the state of various devices at specific times, and alert_events, which logs timestamped events associated with the same devices.

By transforming the timestamp into a polar.Datetime datatype, we set up the data for more complex temporal queries. This step is essential as we aim to join these two datasets based on the time windows during which events occurred in a given device state.

from polars import from_records, col

state_polling = from_records(
    orient="row",
    schema=[       'timestamp', 'device_id',  'state'],
    data=[
        ['2000-01-01 04:00:00',       'abc', 'state1'],
        ['2000-01-01 04:30:00',       'abc', 'state1'],
        ['2000-01-01 05:00:00',       'abc', 'state1'],
        ['2000-01-01 05:30:00',       'abc', 'state3'],
        ['2000-01-01 06:00:00',       'abc', 'state3'],
        ['2000-01-01 06:30:00',       'abc', 'state2'],
        ['2000-01-01 07:00:00',       'abc', 'state2'],
        ['2000-01-01 07:30:00',       'abc', 'state1'],

        ['2000-01-01 04:00:00',       'def', 'state2'],
        ['2000-01-01 04:30:00',       'def', 'state1'],
        ['2000-01-01 05:00:00',       'def', 'state3'],
        ['2000-01-01 05:30:00',       'def', 'state3'],
        ['2000-01-01 06:00:00',       'def', 'state1'],
        ['2000-01-01 06:30:00',       'def', 'state1'],
    ]
).with_columns(col('timestamp').str.to_datetime())

alert_events = from_records(
    orient="row",
    schema=[       'timestamp', 'device_id'],
    data=[
        ['2000-01-01 03:15:00',       'abc'],
        ['2000-01-01 04:05:00',       'abc'],
        ['2000-01-01 04:17:00',       'abc'],
        ['2000-01-01 04:44:00',       'abc'],
        ['2000-01-01 05:10:00',       'abc'],
        ['2000-01-01 05:23:00',       'abc'],
        ['2000-01-01 05:43:00',       'abc'],
        ['2000-01-01 05:53:00',       'abc'],
        ['2000-01-01 06:02:00',       'abc'],
        ['2000-01-01 06:08:00',       'abc'],
        ['2000-01-01 06:10:00',       'abc'],
        ['2000-01-01 06:23:00',       'abc'],
        ['2000-01-01 06:51:00',       'abc'],

        ['2000-01-01 03:05:00',       'def'],
        ['2000-01-01 04:15:00',       'def'],
        ['2000-01-01 04:27:00',       'def'],
        ['2000-01-01 04:34:00',       'def'],
        ['2000-01-01 05:20:00',       'def'],
        ['2000-01-01 05:33:00',       'def'],
        ['2000-01-01 06:22:00',       'def'],
        ['2000-01-01 06:29:00',       'def'],
        ['2000-01-01 06:43:00',       'def'],
        ['2000-01-01 07:01:00',       'def'],
    ]
).with_columns(col('timestamp').str.to_datetime())

display(
    state_polling.head(),
    alert_events.head()
)
shape: (5, 3)
timestampdevice_idstate
datetime[μs]strstr
2000-01-01 04:00:00"abc""state1"
2000-01-01 04:30:00"abc""state1"
2000-01-01 05:00:00"abc""state1"
2000-01-01 05:30:00"abc""state3"
2000-01-01 06:00:00"abc""state3"
shape: (5, 2)
timestampdevice_id
datetime[μs]str
2000-01-01 03:15:00"abc"
2000-01-01 04:05:00"abc"
2000-01-01 04:17:00"abc"
2000-01-01 04:44:00"abc"
2000-01-01 05:10:00"abc"

States → Deltas#

We can condense down the state_polling table into a denser representation by capturing the transition points. This will help further reduce the work to do for the joins we are going to attempt.

With this approach, we’re going to aggregate the device’s state changes into intervals. Here, we define the time periods during which a device was in a particular state by grouping on the device_id and state. For each device-state pair, we compute the earliest and latest timestamp, forming “state shifts.”

This aggregated state_shifts DataFrame will be the backbone for our subsequent joins, helping us establish windows for the device’s state transitions.

state_shifts = (
    state_polling
    .group_by(
        'device_id',
        'state',
        col('state').rle_id().over('device_id').alias('state_group')
    )
    .agg(
        start=col('timestamp').min(),
        stop= col('timestamp').max(),
    )
)

state_shifts.sort('device_id', 'start')
shape: (8, 5)
device_idstatestate_groupstartstop
strstru32datetime[μs]datetime[μs]
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:00
"abc""state3"12000-01-01 05:30:002000-01-01 06:00:00
"abc""state2"22000-01-01 06:30:002000-01-01 07:00:00
"abc""state1"32000-01-01 07:30:002000-01-01 07:30:00
"def""state2"02000-01-01 04:00:002000-01-01 04:00:00
"def""state1"12000-01-01 04:30:002000-01-01 04:30:00
"def""state3"22000-01-01 05:00:002000-01-01 05:30:00
"def""state1"32000-01-01 06:00:002000-01-01 06:30:00

Join + Filter vs Inequality Join#

To demonstrate the difference in memory pressure between our two approaches, let’s first examine the sizes of both the state_shifts and alert_events DataFrames.

print(
    f'{len(state_shifts) = }',
    f'{len(alert_events) = }',
    sep='\n'
)
len(state_shifts) = 8
len(alert_events) = 23

Join + Filter#

Let’s try the typical .join(), followed by a .filter(). This approach involves joining the datasets on "device_id" and then filtering rows based on whether the event "timestamp" falls within the device’s state time window ("start", "stop"). While this method works, it is inefficient because the full join occurs first, and filtering happens afterward, requiring more memory and time for large datasets.

(
    state_shifts
    .join(alert_events, on='device_id')
    .filter(col('timestamp').is_between(col('start'), col('stop')))
)
shape: (9, 6)
device_idstatestate_groupstartstoptimestamp
strstru32datetime[μs]datetime[μs]datetime[μs]
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:05:00
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:17:00
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:44:00
"abc""state3"12000-01-01 05:30:002000-01-01 06:00:002000-01-01 05:43:00
"abc""state3"12000-01-01 05:30:002000-01-01 06:00:002000-01-01 05:53:00
"abc""state2"22000-01-01 06:30:002000-01-01 07:00:002000-01-01 06:51:00
"def""state3"22000-01-01 05:00:002000-01-01 05:30:002000-01-01 05:20:00
"def""state1"32000-01-01 06:00:002000-01-01 06:30:002000-01-01 06:22:00
"def""state1"32000-01-01 06:00:002000-01-01 06:30:002000-01-01 06:29:00

For comparison, we demonstrate the basic join syntax without any filtering. This highlights the inefficiency of doing a large join operation upfront, especially when we need to filter the results afterward based on multiple conditions.

(
    state_shifts
    .join(alert_events, on='device_id')
    # .filter(col('timestamp').is_between(col('start'), col('stop')))
    .pipe(len) # check the size of the intermediate
)
92

The above full join creates an intermediate table that has 92 rows in it! While this is a very small amount of data overall, remember that the 2 input tables for this operation had 8 and 23 rows each so the intermediate is nearly 4× larger than the largest input table and nearly 12× larger than the smallest input table.

Because of this issue with the size of the intermediate table, we may want to opt for a more intelligent joining operation.

Inequality Join#

Finally, Polars’ .join_where method allows us to apply the filter conditions directly within the join operation. This is much more efficient because it avoids creating an overly large intermediate DataFrame, joining only the relevant rows where the timestamp from alert_events falls within the start and stop of the state. This approach streamlines the operation, leading to faster execution and lower memory usage for large datasets.

(
    state_shifts
    .join_where(
        alert_events,
        col('device_id') == col('device_id_right'), # align on matching ids
        col('timestamp') > col('start'),            # find where timestamp btwn
        col('timestamp') <= col('stop'),            #   the start/stop interval
    )
)
shape: (9, 6)
device_idstatestate_groupstartstoptimestamp
strstru32datetime[μs]datetime[μs]datetime[μs]
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:05:00
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:17:00
"abc""state1"02000-01-01 04:00:002000-01-01 05:00:002000-01-01 04:44:00
"abc""state3"12000-01-01 05:30:002000-01-01 06:00:002000-01-01 05:43:00
"abc""state3"12000-01-01 05:30:002000-01-01 06:00:002000-01-01 05:53:00
"abc""state2"22000-01-01 06:30:002000-01-01 07:00:002000-01-01 06:51:00
"def""state3"22000-01-01 05:00:002000-01-01 05:30:002000-01-01 05:20:00
"def""state1"32000-01-01 06:00:002000-01-01 06:30:002000-01-01 06:22:00
"def""state1"32000-01-01 06:00:002000-01-01 06:30:002000-01-01 06:29:00

Same result, no obvious intermediate!

Wrap-Up#

We can see that Polars’ .join_where significantly improves the efficiency of operations that involve temporal joins or conditional joins. By combining the join logic with the filter conditions, we can achieve faster performance and cleaner code, especially when working with large-scale datasets.

What do you think about this development? Do you think it will make your life easier? Let me know on the DUTC Discord server.

Talk to you all next week!