Skip to content

Instantly share code, notes, and snippets.

@Nikolay-Lysenko
Created November 11, 2017 09:33
Show Gist options
  • Save Nikolay-Lysenko/492a93aa834fb73b8090ade9506c3c90 to your computer and use it in GitHub Desktop.
Save Nikolay-Lysenko/492a93aa834fb73b8090ade9506c3c90 to your computer and use it in GitHub Desktop.
How to join two DataFrames on conditions that can be other than exact match?
"""
Implementation of non-equi inner join in `pandas`.
Non-equi join is a join with conditions other than exact match.
For example, values from a column of a left `DataFrame` must be higher
than values from a column of a right `DataFrame` and lower than values
from another column of the right `DataFrame`.
@author: Nikolay Lysenko
"""
from typing import List, Union, Callable, Any, Optional
import numpy as np
import pandas as pd
ColumnarType = Union[str, List[str]]
def maybe_convert_to_list(something: Union[Any, List[Any]]) -> List[Any]:
"""
Make a list of one element if argument is not a list.
:param something:
single value or list
:return:
non-empty list
"""
return something if isinstance(something, list) else [something]
def non_equi_inner_join(
left: pd.DataFrame,
right: pd.DataFrame,
non_equi_conditions: Union[Callable, List[Callable]],
equi_on: Optional[Union[ColumnarType]] = None,
equi_left_on: Optional[ColumnarType] = None,
equi_right_on: Optional[ColumnarType] = None,
n_partitioning_equi_keys: Optional[int] = None
) -> pd.DataFrame:
"""
Join two `DataFrame`s by arbitrary conditions and,
optionally, by exact match of some columns (this
equi join part is regular merge from `pandas`).
:param left:
left `DataFrame`
:param right:
right `DataFrame`
:param non_equi_conditions:
functions such that their only argument is a `DataFrame`
produced by join that ignores non-equi join part
and result of each of them is a boolean mask that
tells which rows satisfy corresponding non-equi join
condition
:param equi_on:
columns to be used for equi join part, this argument
overrides `equi_left_on` and `equi_right_on`
:param equi_left_on:
columns of `left` to be used for equi join part
:param equi_right_on:
columns of `right` to be used for equi join part
:param n_partitioning_equi_keys:
number of equi join keys to be used for partitioning;
the corresponding number of first keys is selected, so
take this into consideration when ordering equi join keys;
partitioning means that, instead of running equi join
followed with filtering by non-equi conditions,
both `DataFrame`s are split by some of equi join keys,
then (equi_join+filtering) is run for each of the
partitions and, finally, results are concatenated;
the higher this argument, the slower execution can be,
but also the less memory is taken in case of large
enough `DataFrames`; default leads to usage of all
equi join keys for partitioning
:return:
joined `DataFrame`
Examples:
>>> left = pd.DataFrame([[1, 2, 3],
>>> [4, 5, 6],
>>> [7, 8, 9]],
>>> columns=['a', 'b', 'c'])
>>> right = pd.DataFrame([[1, 2.5, -3],
>>> [4, 7, -6],
>>> [6, 8.5, -9]],
>>> columns=['a', 'd', 'e'])
>>> non_equi_inner_join(
>>> left,
>>> right,
>>> non_equi_conditions=[lambda x: x['d'] > x['b'],
>>> lambda x: x['d'] < x['c']],
>>> equi_on='a'
>>> )
pd.DataFrame([[1, 2, 3, 2.5, -3]],
columns=['a', 'b', 'c', 'd', 'e'])
>>> non_equi_inner_join(
>>> left,
>>> right,
>>> non_equi_conditions=[lambda x: x['d'] > x['b'],
>>> lambda x: x['d'] < x['c']]
>>> )
pd.DataFrame([[1, 2, 3, 1, 2.5, -3],
[7, 8, 9, 6, 8.5, -9]],
columns=['a_x', 'b', 'c', 'a_y', 'd', 'e'])
"""
# Validate and preprocess inputs.
non_equi_conditions = maybe_convert_to_list(non_equi_conditions)
if equi_on is not None:
equi_left_on = maybe_convert_to_list(equi_on)
equi_right_on = maybe_convert_to_list(equi_on)
else:
equi_left_on = maybe_convert_to_list(equi_left_on)
equi_right_on = maybe_convert_to_list(equi_right_on)
if equi_left_on[0] is None:
if equi_right_on[0] is not None:
raise ValueError('Left keys are set, right keys are not set.')
equi_left_on = ['__dummy_key']
equi_right_on = ['__dummy_key']
left['__dummy_key'] = 0
right['__dummy_key'] = 0
if n_partitioning_equi_keys is None:
n_partitioning_equi_keys = len(equi_left_on)
elif n_partitioning_equi_keys > len(equi_left_on):
raise ValueError(
'Number of keys for partitioning exceeds number of equi join keys.'
)
# Use equi keys (if they are set) for optimizing memory consumption.
if n_partitioning_equi_keys > 0:
left_partition_keys = equi_left_on[:n_partitioning_equi_keys]
right_partition_keys = equi_right_on[:n_partitioning_equi_keys]
left_partitions = {
idx: group
for idx, group
in left.groupby(left_partition_keys, as_index=False)
}
right_partitions = {
idx: group
for idx, group
in right.groupby(right_partition_keys, as_index=False)
}
else:
left_partitions = {None: left}
right_partitions = {None: right}
# Join DataFrames.
join_results = []
for idx, curr_left in left_partitions.items():
curr_right = right_partitions.get(idx)
if curr_right is None:
continue
curr_joined = pd.merge(
curr_left,
curr_right,
left_on=equi_left_on,
right_on=equi_right_on
)
conditions = [
checking_fn(curr_joined) for checking_fn in non_equi_conditions
]
curr_joined = curr_joined[np.logical_and.reduce(conditions)]
join_results.append(curr_joined)
if len(join_results) > 0:
joined = pd.concat(join_results)
else:
joined = pd.DataFrame(columns=left.columns)
tmp_columns = [x for x in joined.columns.tolist() if '__dummy_key' in x]
joined.drop(tmp_columns, axis=1, inplace=True)
return joined
# TODO: preserve index of left `DataFrame`.
# TODO: return all necessary columns in case of empty result.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment