DataFrame Inequality Joins#

Hello, and welcome back to Cameron’s Corner! This week, I want to follow up on two blog posts from a couple months back that discussed DataFrame Joins & Sets and DataFrame Joins & MultiSets.

Instead of speaking more about equality joins, I want to talk about inequality joins. These are a special kind of table join that matches rows on a condition other than exact key equality, which is particularly useful when working with continuous (non-categorical) data.

Let’s take a look at a scenario where this comes up in our work: aligning two timeseries signals.

Data#

Say we have two signals: one is an event-based signal while the other is a polling signal. The difference here is that the event signal records the timestamp when an event occurred, whereas the polling signal gives us back the result at the time of check.

For our purposes, these signals will map multiple devices ('device_id') to the state ('state') they are in. We have two DataFrames to work with: one containing a polling signal that indicates what state each device is in at a given 'timestamp', and another that records when a given device raised an event (in this case, we record nothing about what the event was, only that one happened).

from pandas import DataFrame, Timedelta

state_polling_df = DataFrame(
    columns=[      'timestamp', 'device_id',  'state'],
    data=[
        ['2000-01-01 04:00:00',       'abc', 'state1'],
        ['2000-01-01 04:30:00',       'abc', 'state1'],
        ['2000-01-01 05:00:00',       'abc', 'state1'],
        ['2000-01-01 05:30:00',       'abc', 'state3'],
        ['2000-01-01 06:00:00',       'abc', 'state3'],
        ['2000-01-01 06:30:00',       'abc', 'state2'],
        ['2000-01-01 07:00:00',       'abc', 'state2'],
        ['2000-01-01 07:30:00',       'abc', 'state1'],

        ['2000-01-01 04:00:00',       'def', 'state2'],
        ['2000-01-01 04:30:00',       'def', 'state1'],
        ['2000-01-01 05:00:00',       'def', 'state3'],
        ['2000-01-01 05:30:00',       'def', 'state3'],
        ['2000-01-01 06:00:00',       'def', 'state1'],
        ['2000-01-01 06:30:00',       'def', 'state1'],
    ]
).astype({'timestamp': 'datetime64[ns]'})

alert_events = DataFrame(
    columns=[      'timestamp', 'device_id'],
    data=[
        ['2000-01-01 03:15:00',       'abc'],
        ['2000-01-01 04:05:00',       'abc'],
        ['2000-01-01 04:17:00',       'abc'],
        ['2000-01-01 04:44:00',       'abc'],
        ['2000-01-01 05:10:00',       'abc'],
        ['2000-01-01 05:23:00',       'abc'],
        ['2000-01-01 05:43:00',       'abc'],
        ['2000-01-01 05:53:00',       'abc'],
        ['2000-01-01 06:02:00',       'abc'],
        ['2000-01-01 06:08:00',       'abc'],
        ['2000-01-01 06:10:00',       'abc'],
        ['2000-01-01 06:23:00',       'abc'],
        ['2000-01-01 06:51:00',       'abc'],

        ['2000-01-01 03:05:00',       'def'],
        ['2000-01-01 04:15:00',       'def'],
        ['2000-01-01 04:27:00',       'def'],
        ['2000-01-01 04:34:00',       'def'],
        ['2000-01-01 05:20:00',       'def'],
        ['2000-01-01 05:33:00',       'def'],
        ['2000-01-01 06:22:00',       'def'],
        ['2000-01-01 06:29:00',       'def'],
        ['2000-01-01 06:43:00',       'def'],
        ['2000-01-01 07:01:00',       'def'],
    ]
).astype({'timestamp': 'datetime64[ns]'})

display(state_polling_df.head(), alert_events.head())
timestamp device_id state
0 2000-01-01 04:00:00 abc state1
1 2000-01-01 04:30:00 abc state1
2 2000-01-01 05:00:00 abc state1
3 2000-01-01 05:30:00 abc state3
4 2000-01-01 06:00:00 abc state3
timestamp device_id
0 2000-01-01 03:15:00 abc
1 2000-01-01 04:05:00 abc
2 2000-01-01 04:17:00 abc
3 2000-01-01 04:44:00 abc
4 2000-01-01 05:10:00 abc

How many alert events occurred within each device & state? This is a common problem that arises when aligning these two timeseries. While there are many ways to solve it—including an “as of” join—I am going to focus on how we can solve this problem with a few data transformations and an inequality join.

An inequality join is when we align two DataFrames/Tables based on some inequality. For example, with our two tables, we can locate all rows from the alert_events DataFrame whose 'timestamp' is later than the 'timestamp' of each row in state_polling_df. This lets us join the two tables based on an inequality rather than an exact key match.
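To make that concrete, here is a minimal sketch of the idea in SQL, run through DuckDB (which we will return to later in this post). It pairs every polling row with every alert row whose 'timestamp' is later, without yet restricting by device or by a time range:

import duckdb

# a bare inequality join: every (poll, alert) pairing where the alert
# occurred after the poll; no device_id equality or time range yet
duckdb.query('''
    select st.timestamp as poll_ts, al.timestamp as alert_ts
    from state_polling_df as st, alert_events as al
    where al.timestamp > st.timestamp
''')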

Unfortunately, neither pandas nor Polars supports inequality joins directly, so we’ll need to unravel this topic to apply its concepts to our data.

pandas#

Before we can think about alignment and joins, we first need to condense our data. Since this is polling data, we have many duplicate entries. For example, note the first three rows of data:

state_polling_df.head(3)
timestamp device_id state
0 2000-01-01 04:00:00 abc state1
1 2000-01-01 04:30:00 abc state1
2 2000-01-01 05:00:00 abc state1

They all report that the same device is in the same state for 3 entries. We can condense this information to be a single row that also has a start time and stop time of the state. So, something that looks like this:

from pandas import Series

Series(
    ['2000-01-01 04:00:00', '2000-01-01 05:00:00', 'abc', 'state1'],
    index=['start', 'stop', 'device_id', 'state'],
).to_frame().T
start stop device_id state
0 2000-01-01 04:00:00 2000-01-01 05:00:00 abc state1

We can accomplish this transformation by aggregating along a run-length encoding of the 'state' column by 'device_id'. In pandas, we accomplish this by finding the locations where a given row of data is not equal to its previous row. Then we can aggregate along these stretches of locations.
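To see the trick in isolation, here is a tiny sketch on a standalone Series (ignoring the per-device grouping for a moment): a new group id starts wherever a value differs from the one before it.

from pandas import Series

# compare each value to its predecessor, then take a cumulative sum of the
# mismatches to label each run of repeated values
s = Series(['a', 'a', 'b', 'b', 'a'])
(s != s.shift()).cumsum()   # → 1, 1, 2, 2, 3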

state_df = (
    state_polling_df
    .assign(
        # True wherever a row's state differs from the previous row (per device)
        state_shift=lambda d:
            d.groupby('device_id')['state'].shift() != d['state'],
        # cumulative sum of those transitions → one id per run of repeated states
        state_group=lambda d: d.groupby('device_id')['state_shift'].cumsum(),
    )
    .groupby(['device_id', 'state', 'state_group'], as_index=False)
    .agg(
        start=('timestamp', 'min'),
        stop= ('timestamp', 'max'),
    )
)

state_df.sort_values(['device_id', 'start'])
device_id state state_group start stop
0 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00
3 abc state3 2 2000-01-01 05:30:00 2000-01-01 06:00:00
2 abc state2 3 2000-01-01 06:30:00 2000-01-01 07:00:00
1 abc state1 4 2000-01-01 07:30:00 2000-01-01 07:30:00
6 def state2 1 2000-01-01 04:00:00 2000-01-01 04:00:00
4 def state1 2 2000-01-01 04:30:00 2000-01-01 04:30:00
7 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00
5 def state1 4 2000-01-01 06:00:00 2000-01-01 06:30:00

With a smaller dataset to operate on, we are ready to align these data. Since pandas does not support inequality joins, we are going to perform this join in an indirect manner.

First, we need to align these tables using an equality join on 'device_id'. This creates a many:many join between our state_df and alert_events. The idea is similar to performing a cross-join, but we only “cross” the rows that share the same 'device_id'.

state_df.merge(alert_events, on='device_id')
device_id state state_group start stop timestamp
0 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 03:15:00
1 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:05:00
2 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:17:00
3 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:44:00
4 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 05:10:00
... ... ... ... ... ... ...
87 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 05:33:00
88 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 06:22:00
89 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 06:29:00
90 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 06:43:00
91 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 07:01:00

92 rows × 6 columns

From here, all we need to do is filter down to our inequality. In this case, we are concerned with locating alerts that occurred between the start/stop of any given state within the device.

(
    state_df
    .merge(alert_events, on='device_id')
    .loc[lambda d: d['timestamp'].between(d['start'], d['stop'])]
)
device_id state state_group start stop timestamp
1 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:05:00
2 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:17:00
3 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 2000-01-01 04:44:00
38 abc state2 3 2000-01-01 06:30:00 2000-01-01 07:00:00 2000-01-01 06:51:00
45 abc state3 2 2000-01-01 05:30:00 2000-01-01 06:00:00 2000-01-01 05:43:00
46 abc state3 2 2000-01-01 05:30:00 2000-01-01 06:00:00 2000-01-01 05:53:00
68 def state1 4 2000-01-01 06:00:00 2000-01-01 06:30:00 2000-01-01 06:22:00
69 def state1 4 2000-01-01 06:00:00 2000-01-01 06:30:00 2000-01-01 06:29:00
86 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 2000-01-01 05:20:00

Finally, we can perform our aggregation to count the alerts that happened while each device was in each state run.

(
    state_df
    .merge(alert_events, on='device_id')
    .loc[lambda d: d['timestamp'].between(d['start'], d['stop'])]
    .groupby(state_df.columns.tolist())['timestamp'].count().rename('count')
    .reset_index()
    .sort_values(['device_id', 'start'])
)
device_id state state_group start stop count
0 abc state1 1 2000-01-01 04:00:00 2000-01-01 05:00:00 3
2 abc state3 2 2000-01-01 05:30:00 2000-01-01 06:00:00 2
1 abc state2 3 2000-01-01 06:30:00 2000-01-01 07:00:00 1
4 def state3 3 2000-01-01 05:00:00 2000-01-01 05:30:00 1
3 def state1 4 2000-01-01 06:00:00 2000-01-01 06:30:00 2

This informs us that device "abc" was in "state1" from "04:00:00" to "05:00:00" and exhibited 3 alerts during that time. This is a very handy trick when joining two DataFrames that have timeseries or related continuous data.

The issue with this approach is that the equality join on 'device_id' can create a VERY large intermediate table, depending on how many rows share each 'device_id'. So, while this works for our small tables here, we should not expect it to scale well (the quick check below puts a number on it). Instead, we can reach for a tool like DuckDB, which supports inequality joins as a first-class feature.
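With our toy data, the intermediate equi-joined table is already 92 rows from just 8 state runs and 23 alerts, and it grows with the product of per-device row counts:

# the equi-join on 'device_id' materializes every within-device pairing:
# 4 states × 13 alerts for 'abc' plus 4 states × 10 alerts for 'def' = 92 rows
state_df.merge(alert_events, on='device_id').shape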

Polars#

Note

Since Polars version 1.7.0, there is official support for inequality joins via the join_where method! The content below is out of date until I can update this blog post.
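For reference, here is a minimal, hypothetical sketch of what join_where looks like on a pair of toy tables (assuming Polars ≥ 1.7.0; I have not yet reworked the full example below with it):

import polars as pl

# keep every (interval, event) pairing where the event falls inside the interval;
# the table and column names here are made up for illustration
intervals = pl.DataFrame({'start': [1, 4], 'stop': [3, 6]})
events = pl.DataFrame({'event': [2, 5, 9]})

intervals.join_where(
    events,
    pl.col('event') >= pl.col('start'),
    pl.col('event') <= pl.col('stop'),
)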

Next, let’s wrap up the conversation on inequality joins by looking at some different tools and approaches: combining our event and polling tables via an inequality join in Polars and, after that, DuckDB.

While Polars, like pandas, did not support inequality joins at the time of writing, we can take the same conceptual approach as we did above. Since we’ve already discussed these ideas, let’s take a closer look at the code:

from polars import from_pandas, col

state_df = (
    from_pandas(state_polling_df)
    .group_by(
        'device_id',
        'state',
        col('state').rle_id().over('device_id').alias('state_group')
    )
    .agg(
        start=col('timestamp').min(),
        stop= col('timestamp').max(),
    )
)


(
    state_df
    .join(from_pandas(alert_events), on='device_id')
    .filter(col('timestamp').is_between(col('start'), col('stop')))
    .group_by(['device_id', 'state', 'state_group'])
    .agg(
        col('start', 'stop').first(),
        col('timestamp').count().alias('count')
    )
    .sort('device_id', 'start')
)
shape: (5, 6)
device_id  state     state_group  start                stop                 count
str        str       u32          datetime[ns]         datetime[ns]         u32
"abc"      "state1"  0            2000-01-01 04:00:00  2000-01-01 05:00:00  3
"abc"      "state3"  1            2000-01-01 05:30:00  2000-01-01 06:00:00  2
"abc"      "state2"  2            2000-01-01 06:30:00  2000-01-01 07:00:00  1
"def"      "state3"  2            2000-01-01 05:00:00  2000-01-01 05:30:00  1
"def"      "state1"  3            2000-01-01 06:00:00  2000-01-01 06:30:00  2

The largest difference is that Polars has a very handy rle_id expression that abstracts away the munging we had to do in pandas, where we compared each group’s shifted values (df.groupby(…)[column].shift() != df[column]) and took a cumulative sum.
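To see what rle_id does on its own, here is a tiny sketch on a toy Series:

import polars as pl

# rle_id assigns an increasing id to each run of equal values
pl.Series(['a', 'a', 'b', 'b', 'a']).rle_id()   # → 0, 0, 1, 1, 2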

After that, the rest of the code is just a translation of the pandas approach: join on 'device_id', filter by 'timestamp', and aggregate.

Let’s see how we would code this using DuckDB.

DuckDB#

This requires a big switch in thinking: we need to use our SQL knowledge to accomplish the task. I have mentioned how DuckDB has a nice set of optimizations for inequality joins, so let’s see how we can put them into action.

Before we actually align our tables, we need to re-derive our condensed state runs as we did previously.

import duckdb

state_df = duckdb.query('''
    with transitions as (
        select 
            *,
            case
                when state != lag(state) over (partition by device_id order by timestamp) then 1 
                else 0
            end as state_transitions,
        from state_polling_df
    ),
    
    rles as (
        select 
            *,
            sum(state_transitions) over (partition by device_id order by timestamp) 
                as state_group,
        from transitions
    )
    
    select 
        device_id,
        first(state) as state,
        state_group,
        min(timestamp) as start,
        max(timestamp) as stop,
    from rles
    group by device_id, state_group
    order by device_id, state_group, state
''')

state_df.pl()
shape: (8, 5)
device_id  state     state_group    start                stop
str        str       decimal[38,0]  datetime[ns]         datetime[ns]
"abc"      "state1"  0              2000-01-01 04:00:00  2000-01-01 05:00:00
"abc"      "state3"  1              2000-01-01 05:30:00  2000-01-01 06:00:00
"abc"      "state2"  2              2000-01-01 06:30:00  2000-01-01 07:00:00
"abc"      "state1"  3              2000-01-01 07:30:00  2000-01-01 07:30:00
"def"      "state2"  0              2000-01-01 04:00:00  2000-01-01 04:00:00
"def"      "state1"  1              2000-01-01 04:30:00  2000-01-01 04:30:00
"def"      "state3"  2              2000-01-01 05:00:00  2000-01-01 05:30:00
"def"      "state1"  3              2000-01-01 06:00:00  2000-01-01 06:30:00

The above ends up being quite verbose and unfortunately requires us to split the computation of the contiguous state runs into two separate CTEs. This presents a slight optimization barrier to our program, but in this case both CTEs should be relatively straightforward. I will say that the code to get to this step is much more verbose than the equivalent pandas or Polars code.

Now, let’s move on to the join and aggregation step. We use an implicit cross join via from state_df as st, alert_events as al, but since we also have a where clause, DuckDB can efficiently compute our join without materializing the full cross product first. We can also perform our aggregations in this same query.

duckdb.query('''
    select
        st.device_id,
        st.state,
        st.state_group,
        
        first(st.start) as start,
        first(st.stop) as stop,
        count(al.timestamp) as n_alerts,
    from state_df as st, alert_events as al
    where (st.device_id = al.device_id)
        and (al.timestamp between st.start and st.stop)
    group by st.device_id, st.state, st.state_group
    order by st.device_id, start
''').pl()
shape: (5, 6)
device_id  state     state_group    start                stop                 n_alerts
str        str       decimal[38,0]  datetime[ns]         datetime[ns]         i64
"abc"      "state1"  0              2000-01-01 04:00:00  2000-01-01 05:00:00  3
"abc"      "state3"  1              2000-01-01 05:30:00  2000-01-01 06:00:00  2
"abc"      "state2"  2              2000-01-01 06:30:00  2000-01-01 07:00:00  1
"def"      "state3"  2              2000-01-01 05:00:00  2000-01-01 05:30:00  1
"def"      "state1"  3              2000-01-01 06:00:00  2000-01-01 06:30:00  2

AsOf Joins#

Finally, to bring us full circle, how can we tackle this problem without a range join, using the only type of inequality join that is available in both pandas and Polars? That is the asof join (pandas.merge_asof, polars.join_asof, DuckDB's ASOF JOIN), which takes each row from one DataFrame and locates the row in another DataFrame whose key value is closest to it. This is commonly used for finding the nearest events in timeseries data, but it works with any ordered numeric values.

In a simple example:

from polars import DataFrame, concat, col

# both DataFrames must be sorted by the join key!
left  = DataFrame({'a': [1, 5, 7, 10]})
right = DataFrame({'b': [1, 20, 8, 6, 4]}).sort('b')

left, right = left.lazy(), right.lazy()

queries = []
for strategy in ['forward', 'backward', 'nearest']:
    queries.append(
        left.join_asof(right, left_on='a', right_on='b', strategy=strategy)
        .select(col('b').name.suffix(f'_{strategy}'))
    )
    
concat([left, *queries], how='horizontal').collect()
shape: (4, 4)
a    b_forward  b_backward  b_nearest
i64  i64        i64         i64
1    1          1           1
5    6          4           6
7    8          6           8
10   20         8           8

Note that the strategy argument dictates the direction in which we search from the left. In the 'b_forward' column, for each value in column 'a' we found the nearest value in 'b' that is greater than or equal to it.
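For comparison, pandas.merge_asof performs the same lookup, with direction playing the role of strategy. A small sketch with the same toy values (the left_pd/right_pd names are mine; both frames must be sorted by their join keys):

import pandas as pd

# for each 'a', find the nearest 'b' at or after it ('forward');
# 'backward' and 'nearest' mirror the Polars strategies
left_pd  = pd.DataFrame({'a': [1, 5, 7, 10]})
right_pd = pd.DataFrame({'b': [1, 20, 8, 6, 4]}).sort_values('b')

pd.merge_asof(left_pd, right_pd, left_on='a', right_on='b', direction='forward')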

With our simple example behind us, let’s take a look at the dataset we’ve worked with:

from polars import from_pandas

(
    from_pandas(alert_events)
    .join_asof(
        # by='...' aligns subsets of data within `'device_id'`
        #  within each of these groups, we find the 
        #  nearest values of `'timestamp'` to `'start'`
        state_df.pl(), by='device_id', left_on='timestamp', right_on='start'
    )
    # due to the assumptions about our polling states, we need to filter
    #   after the join to ensure we only keep events that are associated
    #   with a non-ambiguous state.
    #   the join_asof ensures that timestamp >= start, so
    #   we just need to filter by the complement.
    .filter(col('timestamp') <= col('stop'))
    .group_by(['device_id', 'state', 'state_group'])
    .agg(
        col('start', 'stop').first(),
        col('timestamp').count().alias('count')
    )
    .sort('device_id', 'start')
).head(10)
shape: (5, 6)
device_id  state     state_group    start                stop                 count
str        str       decimal[38,0]  datetime[ns]         datetime[ns]         u32
"abc"      "state1"  0              2000-01-01 04:00:00  2000-01-01 05:00:00  3
"abc"      "state3"  1              2000-01-01 05:30:00  2000-01-01 06:00:00  2
"abc"      "state2"  2              2000-01-01 06:30:00  2000-01-01 07:00:00  1
"def"      "state3"  2              2000-01-01 05:00:00  2000-01-01 05:30:00  1
"def"      "state1"  3              2000-01-01 06:00:00  2000-01-01 06:30:00  2

All of our tools can perform asof joins efficiently, so this is a very useful approach. While it works in this case, there are times when we need the full flexibility of an inequality/range join.
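For completeness, here is a sketch of what the DuckDB flavor could look like (treat the exact syntax as an assumption; DuckDB's ASOF JOIN takes a single inequality on the ordering key plus optional equality conditions, and we would still need to filter on 'stop' and aggregate afterwards, just like the Polars version):

import duckdb

# for each alert, find the most recent state run whose 'start' is at or
# before the alert's 'timestamp', matching on device_id
duckdb.query('''
    select al.device_id, al.timestamp, st.state, st.start, st.stop
    from alert_events as al
    asof join state_df as st
        on al.device_id = st.device_id
       and al.timestamp >= st.start
''')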

Wrap-Up#

We’ve solved our problem! Not just once, but four times over using three different tools. I hope you enjoyed this small series of blog posts and can incorporate more pandas, Polars, DuckDB, and join types in your work.

What do you think of my solutions? Let me know on the DUTC Discord. Talk to you all next week!