# Polars Has Inequality Joins!
It finally happened: Polars supports inequality joins, at least if you are
using version 1.7.0 or later.
This week I want to revisit a familiar problem from a recent blog post I wrote
covering a few different types of joins and when they are useful. I will
tackle the same problem in Polars and discuss how an inequality join can
simplify our lives and make our compute workloads more manageable.
## Background
In many data analysis scenarios, simple equality joins (matching rows where column values are exactly the same) aren’t enough. A common use case involves working with time-based data, such as tracking state changes of devices and logging alert events for those devices. Here, you don’t just want to match records by a shared `device_id`; you need to match based on time intervals, like determining which alerts occurred during a specific state. These kinds of joins, known as inequality joins, are essential in areas like time-series analysis, event-driven systems, and continuous monitoring, where relationships between data points depend on conditions beyond simple equality (e.g., a timestamp falling within a specific range).
Traditionally, solving this problem involves performing a full join on a common key, and then filtering the result based on conditions like time ranges. However, this approach is inefficient, as it creates a large intermediate result, only to discard most of it through filtering, leading to slower performance and higher memory consumption. To address this, Polars introduced the `.join_where` method, which allows inequality conditions to be specified directly within the join operation. This method significantly improves efficiency by avoiding the need to create and then filter a massive intermediate dataset, making it ideal for complex time window queries and other use cases that require range-based conditions.
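To make the contrast concrete, here is a minimal sketch of the two patterns. The `left`, `right`, `key`, `ts`, `start`, and `stop` names are made up for this illustration; they are not the tables we use below.

```python
import polars as pl

left = pl.DataFrame({
    'key':   ['a', 'a'],
    'start': [0, 10],
    'stop':  [5, 15],
})
right = pl.DataFrame({
    'key': ['a', 'a', 'a'],
    'ts':  [1, 7, 12],
})

# pattern 1: equi-join first, filter second — every key match is materialized
joined_then_filtered = (
    left
    .join(right, on='key')
    .filter(pl.col('ts').is_between(pl.col('start'), pl.col('stop')))
)

# pattern 2: the range conditions are part of the join itself
joined_where = left.join_where(
    right,
    pl.col('key') == pl.col('key_right'),  # right-hand columns get a "_right" suffix
    pl.col('ts') > pl.col('start'),
    pl.col('ts') <= pl.col('stop'),
)
```

Both produce the same matched rows here; the difference is that the first pattern builds every key match before throwing most of them away.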
## The Data
In this first block, we prepare two datasets: `state_polling`, which tracks the
state of various devices at specific times, and `alert_events`, which logs
timestamped events associated with the same devices.
By converting the timestamp strings into the `polars.Datetime` datatype, we set
up the data for more complex temporal queries. This step is essential because
we aim to join these two datasets based on the time windows during which events
occurred in a given device state.
```python
from polars import from_records, col

state_polling = from_records(
    orient="row",
    schema=['timestamp', 'device_id', 'state'],
    data=[
        ['2000-01-01 04:00:00', 'abc', 'state1'],
        ['2000-01-01 04:30:00', 'abc', 'state1'],
        ['2000-01-01 05:00:00', 'abc', 'state1'],
        ['2000-01-01 05:30:00', 'abc', 'state3'],
        ['2000-01-01 06:00:00', 'abc', 'state3'],
        ['2000-01-01 06:30:00', 'abc', 'state2'],
        ['2000-01-01 07:00:00', 'abc', 'state2'],
        ['2000-01-01 07:30:00', 'abc', 'state1'],
        ['2000-01-01 04:00:00', 'def', 'state2'],
        ['2000-01-01 04:30:00', 'def', 'state1'],
        ['2000-01-01 05:00:00', 'def', 'state3'],
        ['2000-01-01 05:30:00', 'def', 'state3'],
        ['2000-01-01 06:00:00', 'def', 'state1'],
        ['2000-01-01 06:30:00', 'def', 'state1'],
    ]
).with_columns(col('timestamp').str.to_datetime())
```
```python
alert_events = from_records(
    orient="row",
    schema=['timestamp', 'device_id'],
    data=[
        ['2000-01-01 03:15:00', 'abc'],
        ['2000-01-01 04:05:00', 'abc'],
        ['2000-01-01 04:17:00', 'abc'],
        ['2000-01-01 04:44:00', 'abc'],
        ['2000-01-01 05:10:00', 'abc'],
        ['2000-01-01 05:23:00', 'abc'],
        ['2000-01-01 05:43:00', 'abc'],
        ['2000-01-01 05:53:00', 'abc'],
        ['2000-01-01 06:02:00', 'abc'],
        ['2000-01-01 06:08:00', 'abc'],
        ['2000-01-01 06:10:00', 'abc'],
        ['2000-01-01 06:23:00', 'abc'],
        ['2000-01-01 06:51:00', 'abc'],
        ['2000-01-01 03:05:00', 'def'],
        ['2000-01-01 04:15:00', 'def'],
        ['2000-01-01 04:27:00', 'def'],
        ['2000-01-01 04:34:00', 'def'],
        ['2000-01-01 05:20:00', 'def'],
        ['2000-01-01 05:33:00', 'def'],
        ['2000-01-01 06:22:00', 'def'],
        ['2000-01-01 06:29:00', 'def'],
        ['2000-01-01 06:43:00', 'def'],
        ['2000-01-01 07:01:00', 'def'],
    ]
).with_columns(col('timestamp').str.to_datetime())
```
```python
display(
    state_polling.head(),
    alert_events.head(),
)
```
timestamp | device_id | state |
---|---|---|
datetime[μs] | str | str |
2000-01-01 04:00:00 | "abc" | "state1" |
2000-01-01 04:30:00 | "abc" | "state1" |
2000-01-01 05:00:00 | "abc" | "state1" |
2000-01-01 05:30:00 | "abc" | "state3" |
2000-01-01 06:00:00 | "abc" | "state3" |
timestamp | device_id |
---|---|
datetime[μs] | str |
2000-01-01 03:15:00 | "abc" |
2000-01-01 04:05:00 | "abc" |
2000-01-01 04:17:00 | "abc" |
2000-01-01 04:44:00 | "abc" |
2000-01-01 05:10:00 | "abc" |
## States → Deltas
We can condense the `state_polling` table into a denser representation by
capturing the transition points. This reduces the amount of work required by
the joins we are about to attempt.
With this approach, we aggregate the device’s state changes into intervals, defining the time periods during which a device was in a particular state. Grouping on `device_id` and `state` alone would incorrectly merge separate visits to the same state, so we also group on a run-length encoded id that increments each time a device’s state changes. For each of these device-state runs, we compute the earliest and latest timestamp, forming “state shifts.”
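If `.rle_id()` is unfamiliar, here is a minimal sketch of what it does on a toy Series (the values are made up for illustration):

```python
import polars as pl

s = pl.Series('state', ['state1', 'state1', 'state3', 'state1'])

# rle_id assigns a new id each time the value changes, so separate
# visits to the same state land in different groups
print(s.rle_id().to_list())  # [0, 0, 1, 2]
```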
This aggregated `state_shifts` DataFrame will be the backbone for our subsequent joins, helping us establish windows for the device’s state transitions.
```python
state_shifts = (
    state_polling
    .group_by(
        'device_id',
        'state',
        col('state').rle_id().over('device_id').alias('state_group'),
    )
    .agg(
        start=col('timestamp').min(),
        stop=col('timestamp').max(),
    )
)

state_shifts.sort('device_id', 'start')
```
device_id | state | state_group | start | stop |
---|---|---|---|---|
str | str | u32 | datetime[μs] | datetime[μs] |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 |
"abc" | "state1" | 3 | 2000-01-01 07:30:00 | 2000-01-01 07:30:00 |
"def" | "state2" | 0 | 2000-01-01 04:00:00 | 2000-01-01 04:00:00 |
"def" | "state1" | 1 | 2000-01-01 04:30:00 | 2000-01-01 04:30:00 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 |
## Join + Filter vs Inequality Join
To demonstrate the difference in memory pressure between our two approaches, let’s
first examine the sizes of both the `state_shifts` and `alert_events` DataFrames.
```python
print(
    f'{len(state_shifts) = }',
    f'{len(alert_events) = }',
    sep='\n',
)
```
len(state_shifts) = 8
len(alert_events) = 23
### Join + Filter
Let’s try the typical `.join()`, followed by a `.filter()`. This approach involves
joining the datasets on `"device_id"` and then filtering rows based on whether the
event `"timestamp"` falls within the device’s state time window (`"start"`, `"stop"`).
While this method works, it is inefficient because the full join occurs first, and
filtering happens afterward, requiring more memory and time for large datasets.
```python
(
    state_shifts
    .join(alert_events, on='device_id')
    .filter(col('timestamp').is_between(col('start'), col('stop')))
)
```
device_id | state | state_group | start | stop | timestamp |
---|---|---|---|---|---|
str | str | u32 | datetime[μs] | datetime[μs] | datetime[μs] |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:05:00 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:17:00 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:44:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:43:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:53:00 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 2000-01-01 06:51:00 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 05:20:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:22:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:29:00 |
For comparison, we demonstrate the basic join syntax without any filtering. This highlights the inefficiency of doing a large join operation upfront, especially when we need to filter the results afterward based on multiple conditions.
```python
(
    state_shifts
    .join(alert_events, on='device_id')
    # .filter(col('timestamp').is_between(col('start'), col('stop')))
    .pipe(len)  # check the size of the intermediate
)
```
92
The above full join creates an intermediate table that has 92 rows in it! The arithmetic checks out: `state_shifts` has 4 rows per device, while `alert_events` has 13 rows for “abc” and 10 for “def”, so the join produces 4 × 13 + 4 × 10 = 92 row pairs. While this is a very small amount of data overall, remember that the two input tables for this operation had 8 and 23 rows each, so the intermediate is 4× larger than the larger input table and 11.5× larger than the smaller one.
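As a quick sanity check, we can reproduce that count by multiplying the per-device row counts, using `group_by(...).len()` to count rows per device:

```python
(
    state_shifts.group_by('device_id').len()
    .join(
        alert_events.group_by('device_id').len().rename({'len': 'len_right'}),
        on='device_id',
    )
    .select((col('len') * col('len_right')).sum())  # 4 × 13 + 4 × 10 = 92
)
```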
Because of this issue with the size of the intermediate table, we may want to opt for a more intelligent joining operation.
### Inequality Join
Finally, Polars’ `.join_where` method allows us to apply the filter conditions
directly within the join operation. This is much more efficient because it
avoids creating an overly large intermediate DataFrame, joining only the
relevant rows where the `timestamp` from `alert_events` falls within the `start`
and `stop` of the state. This approach streamlines the operation, leading to
faster execution and lower memory usage for large datasets.
```python
(
    state_shifts
    .join_where(
        alert_events,
        col('device_id') == col('device_id_right'),  # align on matching ids
        col('timestamp') > col('start'),             # find where timestamp falls between
        col('timestamp') <= col('stop'),             # the start/stop interval
    )
)
```
device_id | state | state_group | start | stop | timestamp |
---|---|---|---|---|---|
str | str | u32 | datetime[μs] | datetime[μs] | datetime[μs] |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:05:00 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:17:00 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:44:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:43:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:53:00 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 2000-01-01 06:51:00 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 05:20:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:22:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:29:00 |
Same result, no obvious intermediate!
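If you want to peek under the hood, one option (a sketch, assuming `LazyFrame.join_where` and `.explain()` behave the same way in your Polars version) is to inspect the optimized query plan, which should show the range predicates attached to the join itself rather than a filter over a full equi-join:

```python
print(
    state_shifts.lazy()
    .join_where(
        alert_events.lazy(),
        col('device_id') == col('device_id_right'),
        col('timestamp') > col('start'),
        col('timestamp') <= col('stop'),
    )
    .explain()  # render the optimized logical plan as text
)
```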
## Wrap-Up
We can see that Polars’ `.join_where` significantly improves the efficiency of
operations that involve temporal or conditional joins. By combining the
join logic with the filter conditions, we can achieve faster performance and
cleaner code, especially when working with large-scale datasets.
What do you think about this development? Do you think it will make your life easier? Let me know on the DUTC Discord server.
Talk to you all next week!