DataFrame Inequality Joins#
Hello, and welcome back to Cameron’s Corner! This week, I want to follow up on two blog posts from a couple months back that discussed DataFrame Joins & Sets and DataFrame Joins & MultiSets.
Instead of speaking more about equality joins, I want to talk about inequality joins. These are a special table join operation that handles conditions when keys don’t match up perfectly, particularly when working with continuous (non-categorical) data.
Let’s take a look at a scenario where this comes up in our work: aligning two timeseries signals.
Data#
Say we have two signals: one is an event-based signal while the other is a polling signal. The difference here is that the event signal records the timestamp when an event occurred, whereas the polling signal gives us back the result at the time of check.
For our purposes, these signals map multiple devices ('device_id') to the state ('state') they are in. We have two DataFrames to work with: one containing a polling signal that indicates what state a device is in at a given 'timestamp', and another DataFrame that records when a given device raised an event (in this case, we keep no record of what the event was, simply that an event happened).
from pandas import DataFrame, Timedelta

state_polling_df = DataFrame(
    columns=['timestamp', 'device_id', 'state'],
    data=[
        ['2000-01-01 04:00:00', 'abc', 'state1'],
        ['2000-01-01 04:30:00', 'abc', 'state1'],
        ['2000-01-01 05:00:00', 'abc', 'state1'],
        ['2000-01-01 05:30:00', 'abc', 'state3'],
        ['2000-01-01 06:00:00', 'abc', 'state3'],
        ['2000-01-01 06:30:00', 'abc', 'state2'],
        ['2000-01-01 07:00:00', 'abc', 'state2'],
        ['2000-01-01 07:30:00', 'abc', 'state1'],

        ['2000-01-01 04:00:00', 'def', 'state2'],
        ['2000-01-01 04:30:00', 'def', 'state1'],
        ['2000-01-01 05:00:00', 'def', 'state3'],
        ['2000-01-01 05:30:00', 'def', 'state3'],
        ['2000-01-01 06:00:00', 'def', 'state1'],
        ['2000-01-01 06:30:00', 'def', 'state1'],
    ]
).astype({'timestamp': 'datetime64[ns]'})
alert_events = DataFrame(
    columns=['timestamp', 'device_id'],
    data=[
        ['2000-01-01 03:15:00', 'abc'],
        ['2000-01-01 04:05:00', 'abc'],
        ['2000-01-01 04:17:00', 'abc'],
        ['2000-01-01 04:44:00', 'abc'],
        ['2000-01-01 05:10:00', 'abc'],
        ['2000-01-01 05:23:00', 'abc'],
        ['2000-01-01 05:43:00', 'abc'],
        ['2000-01-01 05:53:00', 'abc'],
        ['2000-01-01 06:02:00', 'abc'],
        ['2000-01-01 06:08:00', 'abc'],
        ['2000-01-01 06:10:00', 'abc'],
        ['2000-01-01 06:23:00', 'abc'],
        ['2000-01-01 06:51:00', 'abc'],

        ['2000-01-01 03:05:00', 'def'],
        ['2000-01-01 04:15:00', 'def'],
        ['2000-01-01 04:27:00', 'def'],
        ['2000-01-01 04:34:00', 'def'],
        ['2000-01-01 05:20:00', 'def'],
        ['2000-01-01 05:33:00', 'def'],
        ['2000-01-01 06:22:00', 'def'],
        ['2000-01-01 06:29:00', 'def'],
        ['2000-01-01 06:43:00', 'def'],
        ['2000-01-01 07:01:00', 'def'],
    ]
).astype({'timestamp': 'datetime64[ns]'})
display(state_polling_df.head(), alert_events.head())
| | timestamp | device_id | state |
|---|---|---|---|
| 0 | 2000-01-01 04:00:00 | abc | state1 |
| 1 | 2000-01-01 04:30:00 | abc | state1 |
| 2 | 2000-01-01 05:00:00 | abc | state1 |
| 3 | 2000-01-01 05:30:00 | abc | state3 |
| 4 | 2000-01-01 06:00:00 | abc | state3 |

| | timestamp | device_id |
|---|---|---|
| 0 | 2000-01-01 03:15:00 | abc |
| 1 | 2000-01-01 04:05:00 | abc |
| 2 | 2000-01-01 04:17:00 | abc |
| 3 | 2000-01-01 04:44:00 | abc |
| 4 | 2000-01-01 05:10:00 | abc |
How many alert events occurred within each device & state? This is a common problem that arises when aligning these two timeseries. While there are many ways to solve it—including an “as of” join—I am going to focus on how we can solve this problem with a few data transformations and an inequality join.
An inequality join aligns two DataFrames/tables based on some inequality condition. For example, with our two tables, we can locate all rows from the alert_events DataFrame whose 'timestamp' is larger than that of each row in state_polling_df. This lets us join these two tables based on an inequality.
Unfortunately, neither pandas nor Polars supports inequality joins directly (though see the note in the Polars section about join_where in recent Polars versions), so we’ll need to unravel this topic to apply its concepts to our data.
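For reference, here is what the join we are after would look like if we could express it directly. This sketch uses DuckDB’s SQL (which we will return to later) purely as an illustration of the idea: an equality condition and an inequality condition combined in a single join.

import duckdb

# purely illustrative: join on an equality AND an inequality at once;
# duckdb.query can reference the pandas DataFrames defined above by name
duckdb.query('''
    select s.device_id, s.state, s.timestamp as polled_at, a.timestamp as alert_at
    from state_polling_df as s
    join alert_events as a
        on s.device_id = a.device_id
        and a.timestamp >= s.timestamp
''')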
pandas#
Before we can think about alignment and joins, we first need to condense our data. Since this is polling data, we have many duplicate entries. For example, note the first three rows of data:
state_polling_df.head(3)
| | timestamp | device_id | state |
|---|---|---|---|
| 0 | 2000-01-01 04:00:00 | abc | state1 |
| 1 | 2000-01-01 04:30:00 | abc | state1 |
| 2 | 2000-01-01 05:00:00 | abc | state1 |
These rows all report that the same device was in the same state for three consecutive polls. We can condense this information into a single row that also records the start and stop time of the state. So, something that looks like this:
from pandas import Series

Series(
    ['2000-01-01 04:00:00', '2000-01-01 05:00:00', 'abc', 'state1'],
    index=['start', 'stop', 'device_id', 'state'],
).to_frame().T

| | start | stop | device_id | state |
|---|---|---|---|---|
| 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | abc | state1 |
We can accomplish this transformation by aggregating along a run-length encoding of the 'state' column within each 'device_id'. In pandas, we do this by finding the locations where a given row of data is not equal to its previous row, then aggregating along these stretches of locations.
state_df = (
    state_polling_df
    .assign(
        state_shift=lambda d:
            d.groupby('device_id')['state'].shift() != d['state'],
        state_group=lambda d: d.groupby('device_id')['state_shift'].cumsum(),
    )
    .groupby(['device_id', 'state', 'state_group'], as_index=False)
    .agg(
        start=('timestamp', 'min'),
        stop=('timestamp', 'max'),
    )
)
state_df.sort_values(['device_id', 'start'])
| | device_id | state | state_group | start | stop |
|---|---|---|---|---|---|
| 0 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 |
| 3 | abc | state3 | 2 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 |
| 2 | abc | state2 | 3 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 |
| 1 | abc | state1 | 4 | 2000-01-01 07:30:00 | 2000-01-01 07:30:00 |
| 6 | def | state2 | 1 | 2000-01-01 04:00:00 | 2000-01-01 04:00:00 |
| 4 | def | state1 | 2 | 2000-01-01 04:30:00 | 2000-01-01 04:30:00 |
| 7 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 |
| 5 | def | state1 | 4 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 |
With a smaller dataset to operate on, we are ready to align these data. Since pandas does not support inequality joins, we are going to perform this join in an indirect manner.

First, we align the tables using an equality join on 'device_id'. This creates a many-to-many join between state_df and alert_events. The idea is similar to performing a cross join, but we only “cross” the rows that share the same 'device_id'.
state_df.merge(alert_events, on='device_id')
| | device_id | state | state_group | start | stop | timestamp |
|---|---|---|---|---|---|---|
| 0 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 03:15:00 |
| 1 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:05:00 |
| 2 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:17:00 |
| 3 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:44:00 |
| 4 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 05:10:00 |
| ... | ... | ... | ... | ... | ... | ... |
| 87 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 05:33:00 |
| 88 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 06:22:00 |
| 89 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 06:29:00 |
| 90 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 06:43:00 |
| 91 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 07:01:00 |

92 rows × 6 columns
From here, all we need to do is filter down to our inequality. In this case, we are concerned with locating alerts that occurred between the start/stop of any given state within the device.
(
    state_df
    .merge(alert_events, on='device_id')
    .loc[lambda d: d['timestamp'].between(d['start'], d['stop'])]
)
| | device_id | state | state_group | start | stop | timestamp |
|---|---|---|---|---|---|---|
| 1 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:05:00 |
| 2 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:17:00 |
| 3 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 2000-01-01 04:44:00 |
| 38 | abc | state2 | 3 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 2000-01-01 06:51:00 |
| 45 | abc | state3 | 2 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:43:00 |
| 46 | abc | state3 | 2 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2000-01-01 05:53:00 |
| 68 | def | state1 | 4 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:22:00 |
| 69 | def | state1 | 4 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2000-01-01 06:29:00 |
| 86 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 2000-01-01 05:20:00 |
Finally, we can perform our aggregation to get the count of alerts that occurred while a device was in a given state (grouping by all of the state_df columns so that each distinct state interval is preserved).
(
    state_df
    .merge(alert_events, on='device_id')
    .loc[lambda d: d['timestamp'].between(d['start'], d['stop'])]
    .groupby(state_df.columns.tolist())['timestamp'].count().rename('count')
    .reset_index()
    .sort_values(['device_id', 'start'])
)
| | device_id | state | state_group | start | stop | count |
|---|---|---|---|---|---|---|
| 0 | abc | state1 | 1 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 3 |
| 2 | abc | state3 | 2 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2 |
| 1 | abc | state2 | 3 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 1 |
| 4 | def | state3 | 3 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 1 |
| 3 | def | state1 | 4 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2 |
This informs us that device "abc" was in "state1" from "04:00:00" to "05:00:00" and exhibited 3 alerts during that time. This is a very handy trick when joining two DataFrames that contain timeseries or related continuous data.
The issue with this approach is that the intermediate join on 'device_id' can create a VERY large table, depending on the number of unique entities present. So, while this works for our small tables here, we should not expect it to be hugely performant. Instead, we can reach for a tool like DuckDB, which supports inequality joins as a fully supported, optimized feature.
Polars#
Note

Since Polars version 1.7.0, there is now official support for inequality joins via the join_where method! The following content will be out of date until I can update this blog post.
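For readers on a recent version of Polars, here is a minimal sketch of what join_where looks like. This is not part of the original walkthrough: it assumes Polars >= 1.7.0, and the column names are made up and chosen so they do not collide across the two frames.

import polars as pl

# assumes Polars >= 1.7.0: keep every (event, span) pair where the event
# falls inside the span -- an inequality join expressed directly
events = pl.DataFrame({'event_time': [1, 4, 9]})
spans = pl.DataFrame({'span_start': [0, 5], 'span_stop': [3, 8]})

events.join_where(
    spans,
    pl.col('event_time') >= pl.col('span_start'),
    pl.col('event_time') <= pl.col('span_stop'),
)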
Now, let’s look at how we can combine the event and polling tables via an inequality join in both Polars and DuckDB.
While Polars (prior to 1.7.0), like pandas, does not support inequality joins directly, we can take the same conceptual approach as we did above. Since we’ve already discussed these ideas, let’s take a closer look at the code:
from polars import from_pandas, col

state_df = (
    from_pandas(state_polling_df)
    .group_by(
        'device_id',
        'state',
        col('state').rle_id().over('device_id').alias('state_group'),
    )
    .agg(
        start=col('timestamp').min(),
        stop=col('timestamp').max(),
    )
)
(
    state_df
    .join(from_pandas(alert_events), on='device_id')
    .filter(col('timestamp').is_between(col('start'), col('stop')))
    .group_by(['device_id', 'state', 'state_group'])
    .agg(
        col('start', 'stop').first(),
        col('timestamp').count().alias('count'),
    )
    .sort('device_id', 'start')
)
device_id | state | state_group | start | stop | count |
---|---|---|---|---|---|
str | str | u32 | datetime[ns] | datetime[ns] | u32 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 3 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 1 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 1 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2 |
The largest difference is that Polars has a very handy rle_id expression that abstracts away the munging we had to do in pandas with the df.groupby(…)[column].shift() != df[column] comparison. After that, the rest of the code is just a translation of the pandas syntax: join on 'device_id', filter by 'timestamp', and aggregate.
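To see rle_id on its own, here is a tiny standalone illustration (the data is made up purely for demonstration): consecutive equal values receive the same id, which is exactly the grouping we built by hand with shift and cumsum in pandas.

import polars as pl

# consecutive runs of equal values share one id: 0, 0, 1, 1, 2
pl.DataFrame({'state': ['a', 'a', 'b', 'b', 'a']}).with_columns(
    pl.col('state').rle_id().alias('state_group')
)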
Let’s see how we would code this using DuckDB.
DuckDB#
With a big switch in thinking, we now need to use our SQL knowledge to accomplish this task. I have mentioned that DuckDB has a nice set of optimizations for inequality joins, so let’s see how we can put them into action.
Before we actually align our tables, we need to re-derive our aggregate transition states like we have done previously.
import duckdb

state_df = duckdb.query('''
    with transitions as (
        select
            *,
            case
                -- order by makes the lag (and thus the transition flags) deterministic
                when state != lag(state) over (partition by device_id order by timestamp) then 1
                else 0
            end as state_transitions,
        from state_polling_df
    ),

    rles as (
        select
            *,
            sum(state_transitions) over (partition by device_id order by timestamp)
                as state_group,
        from transitions
    )

    select
        device_id,
        first(state) as state,
        state_group,
        min(timestamp) as start,
        max(timestamp) as stop,
    from rles
    group by device_id, state_group
    order by device_id, state_group, state
''')
state_df.pl()
device_id | state | state_group | start | stop |
---|---|---|---|---|
str | str | decimal[38,0] | datetime[ns] | datetime[ns] |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 |
"abc" | "state1" | 3 | 2000-01-01 07:30:00 | 2000-01-01 07:30:00 |
"def" | "state2" | 0 | 2000-01-01 04:00:00 | 2000-01-01 04:00:00 |
"def" | "state1" | 1 | 2000-01-01 04:30:00 | 2000-01-01 04:30:00 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 |
The above ends up being quite verbose and unfortunately requires us to split the computation of the contiguous states into two separate CTEs. This presents a slight optimization barrier to our program, but in this case both CTEs should be relatively straightforward. I will say that the code to get to this step is much more verbose than the equivalent pandas or Polars code.
Now, let’s move on to the join and aggregation step. We use an implicit cross join via from state_df as st, alert_events as al, but since we also have a where clause, DuckDB can efficiently compute our join without needing to materialize the full cross product first. We can also perform our aggregations in this same query.
duckdb.query('''
    select
        st.device_id,
        st.state,
        st.state_group,
        first(st.start) as start,
        first(st.stop) as stop,
        count(al.timestamp) as n_alerts,
    from state_df as st, alert_events as al
    where (st.device_id = al.device_id)
        and (al.timestamp between st.start and st.stop)
    group by st.device_id, st.state, st.state_group
    order by st.device_id, start
''').pl()
device_id | state | state_group | start | stop | n_alerts |
---|---|---|---|---|---|
str | str | decimal[38,0] | datetime[ns] | datetime[ns] | i64 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 3 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 1 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 1 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2 |
AsOf Joins#
Finally, to bring us full circle, how can we tackle this problem without needing a range join, using the only type of inequality join that is available in both pandas and Polars? The answer is the “as of” join (pandas.merge_asof, polars.join_asof, DuckDB’s ASOF JOIN), which takes each row from one DataFrame and locates the row in another DataFrame whose value is closest to it. This is commonly used to find the nearest events in timeseries data, but it works with any ordered numeric values.
In a simple example:
from polars import DataFrame, concat, col

# both DataFrames must be sorted by the join key!
left = DataFrame({'a': [1, 5, 7, 10]})
right = DataFrame({'b': [1, 20, 8, 6, 4]}).sort('b')

left, right = left.lazy(), right.lazy()

queries = []
for strategy in ['forward', 'backward', 'nearest']:
    queries.append(
        left.join_asof(right, left_on='a', right_on='b', strategy=strategy)
        .select(col('b').name.suffix(f'_{strategy}'))
    )

concat([left, *queries], how='horizontal').collect()
a | b_forward | b_backward | b_nearest |
---|---|---|---|
i64 | i64 | i64 | i64 |
1 | 1 | 1 | 1 |
5 | 6 | 4 | 6 |
7 | 8 | 6 | 8 |
10 | 20 | 8 | 8 |
Note that the strategy argument dictates the direction in which we search from the left. In the 'b_forward' column, we visited each value in column 'a' and found the next value in 'b' that is greater than or equal to it.
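The same simple example can be expressed in pandas with merge_asof. Here is a minimal sketch using the 'nearest' direction, mirroring one of the strategies above:

from pandas import DataFrame, merge_asof

# both frames must be sorted by their join keys
left = DataFrame({'a': [1, 5, 7, 10]})
right = DataFrame({'b': [1, 4, 6, 8, 20]})

# for each 'a', find the closest 'b'; direction can also be
# 'forward' or 'backward', analogous to the Polars strategies
merge_asof(left, right, left_on='a', right_on='b', direction='nearest')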
With our simple example behind us, let’s take a look at the dataset we’ve worked with:
from polars import from_pandas

(
    from_pandas(alert_events)
    .join_asof(
        # by='...' aligns subsets of data within 'device_id';
        # within each of these groups, we find the
        # nearest values of 'timestamp' to 'start'
        state_df.pl(), by='device_id', left_on='timestamp', right_on='start'
    )
    # due to the assumptions about our polling states, we need to filter
    # after our join to ensure we only keep events that are associated
    # with a non-ambiguous state.
    # the join_asof ensures that timestamp >= start, so
    # we just need to filter by the complement.
    .filter(col('timestamp') <= col('stop'))
    .group_by(['device_id', 'state', 'state_group'])
    .agg(
        col('start', 'stop').first(),
        col('timestamp').count().alias('count'),
    )
    .sort('device_id', 'start')
).head(10)
device_id | state | state_group | start | stop | count |
---|---|---|---|---|---|
str | str | decimal[38,0] | datetime[ns] | datetime[ns] | u32 |
"abc" | "state1" | 0 | 2000-01-01 04:00:00 | 2000-01-01 05:00:00 | 3 |
"abc" | "state3" | 1 | 2000-01-01 05:30:00 | 2000-01-01 06:00:00 | 2 |
"abc" | "state2" | 2 | 2000-01-01 06:30:00 | 2000-01-01 07:00:00 | 1 |
"def" | "state3" | 2 | 2000-01-01 05:00:00 | 2000-01-01 05:30:00 | 1 |
"def" | "state1" | 3 | 2000-01-01 06:00:00 | 2000-01-01 06:30:00 | 2 |
All of our tools can perform as-of joins efficiently, so this is a very useful approach. It works here, but there are still cases where we may prefer the flexibility of an inequality/range join.
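For completeness, DuckDB’s ASOF JOIN mentioned above could be sketched like this. This is my own sketch rather than part of the original walkthrough: it pairs each alert with the most recent state start for that device, so the stop filter would still be needed afterwards.

# a minimal sketch of DuckDB's ASOF JOIN, reusing the state_df relation
# and alert_events frame defined earlier; the join keeps, per alert, the
# row of state_df with the largest 'start' that is <= 'timestamp'
duckdb.query('''
    select al.device_id, al.timestamp, st.state, st.start, st.stop
    from alert_events as al
    asof join state_df as st
        on al.device_id = st.device_id
        and al.timestamp >= st.start
''').pl()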
Wrap-Up#
We’ve solved our problem! Not just once, but four times over using three different tools. I hope you enjoyed this small series of blog posts and can incorporate more pandas, Polars, DuckDB, and join types in your work.
What do you think of my solutions? Let me know on the DUTC Discord. Talk to you all next week!