Created
November 11, 2017 09:33
-
-
Save Nikolay-Lysenko/492a93aa834fb73b8090ade9506c3c90 to your computer and use it in GitHub Desktop.
How to join two DataFrames on conditions that can be other than exact match?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implementation of non-equi inner join in `pandas`. | |
Non-equi join is a join with conditions other than exact match. | |
For example, values from a column of a left `DataFrame` must be higher | |
than values from a column of a right `DataFrame` and lower than values | |
from another column of the right `DataFrame`. | |
@author: Nikolay Lysenko | |
""" | |
from typing import List, Union, Callable, Any, Optional | |
import numpy as np | |
import pandas as pd | |
ColumnarType = Union[str, List[str]] | |
def maybe_convert_to_list(something: Union[Any, List[Any]]) -> List[Any]: | |
""" | |
Make a list of one element if argument is not a list. | |
:param something: | |
single value or list | |
:return: | |
non-empty list | |
""" | |
return something if isinstance(something, list) else [something] | |
def non_equi_inner_join( | |
left: pd.DataFrame, | |
right: pd.DataFrame, | |
non_equi_conditions: Union[Callable, List[Callable]], | |
equi_on: Optional[Union[ColumnarType]] = None, | |
equi_left_on: Optional[ColumnarType] = None, | |
equi_right_on: Optional[ColumnarType] = None, | |
n_partitioning_equi_keys: Optional[int] = None | |
) -> pd.DataFrame: | |
""" | |
Join two `DataFrame`s by arbitrary conditions and, | |
optionally, by exact match of some columns (this | |
equi join part is regular merge from `pandas`). | |
:param left: | |
left `DataFrame` | |
:param right: | |
right `DataFrame` | |
:param non_equi_conditions: | |
functions such that their only argument is a `DataFrame` | |
produced by join that ignores non-equi join part | |
and result of each of them is a boolean mask that | |
tells which rows satisfy corresponding non-equi join | |
condition | |
:param equi_on: | |
columns to be used for equi join part, this argument | |
overrides `equi_left_on` and `equi_right_on` | |
:param equi_left_on: | |
columns of `left` to be used for equi join part | |
:param equi_right_on: | |
columns of `right` to be used for equi join part | |
:param n_partitioning_equi_keys: | |
number of equi join keys to be used for partitioning; | |
the corresponding number of first keys is selected, so | |
take this into consideration when ordering equi join keys; | |
partitioning means that, instead of running equi join | |
followed with filtering by non-equi conditions, | |
both `DataFrame`s are split by some of equi join keys, | |
then (equi_join+filtering) is run for each of the | |
partitions and, finally, results are concatenated; | |
the higher this argument, the slower execution can be, | |
but also the less memory is taken in case of large | |
enough `DataFrames`; default leads to usage of all | |
equi join keys for partitioning | |
:return: | |
joined `DataFrame` | |
Examples: | |
>>> left = pd.DataFrame([[1, 2, 3], | |
>>> [4, 5, 6], | |
>>> [7, 8, 9]], | |
>>> columns=['a', 'b', 'c']) | |
>>> right = pd.DataFrame([[1, 2.5, -3], | |
>>> [4, 7, -6], | |
>>> [6, 8.5, -9]], | |
>>> columns=['a', 'd', 'e']) | |
>>> non_equi_inner_join( | |
>>> left, | |
>>> right, | |
>>> non_equi_conditions=[lambda x: x['d'] > x['b'], | |
>>> lambda x: x['d'] < x['c']], | |
>>> equi_on='a' | |
>>> ) | |
pd.DataFrame([[1, 2, 3, 2.5, -3]], | |
columns=['a', 'b', 'c', 'd', 'e']) | |
>>> non_equi_inner_join( | |
>>> left, | |
>>> right, | |
>>> non_equi_conditions=[lambda x: x['d'] > x['b'], | |
>>> lambda x: x['d'] < x['c']] | |
>>> ) | |
pd.DataFrame([[1, 2, 3, 1, 2.5, -3], | |
[7, 8, 9, 6, 8.5, -9]], | |
columns=['a_x', 'b', 'c', 'a_y', 'd', 'e']) | |
""" | |
# Validate and preprocess inputs. | |
non_equi_conditions = maybe_convert_to_list(non_equi_conditions) | |
if equi_on is not None: | |
equi_left_on = maybe_convert_to_list(equi_on) | |
equi_right_on = maybe_convert_to_list(equi_on) | |
else: | |
equi_left_on = maybe_convert_to_list(equi_left_on) | |
equi_right_on = maybe_convert_to_list(equi_right_on) | |
if equi_left_on[0] is None: | |
if equi_right_on[0] is not None: | |
raise ValueError('Left keys are set, right keys are not set.') | |
equi_left_on = ['__dummy_key'] | |
equi_right_on = ['__dummy_key'] | |
left['__dummy_key'] = 0 | |
right['__dummy_key'] = 0 | |
if n_partitioning_equi_keys is None: | |
n_partitioning_equi_keys = len(equi_left_on) | |
elif n_partitioning_equi_keys > len(equi_left_on): | |
raise ValueError( | |
'Number of keys for partitioning exceeds number of equi join keys.' | |
) | |
# Use equi keys (if they are set) for optimizing memory consumption. | |
if n_partitioning_equi_keys > 0: | |
left_partition_keys = equi_left_on[:n_partitioning_equi_keys] | |
right_partition_keys = equi_right_on[:n_partitioning_equi_keys] | |
left_partitions = { | |
idx: group | |
for idx, group | |
in left.groupby(left_partition_keys, as_index=False) | |
} | |
right_partitions = { | |
idx: group | |
for idx, group | |
in right.groupby(right_partition_keys, as_index=False) | |
} | |
else: | |
left_partitions = {None: left} | |
right_partitions = {None: right} | |
# Join DataFrames. | |
join_results = [] | |
for idx, curr_left in left_partitions.items(): | |
curr_right = right_partitions.get(idx) | |
if curr_right is None: | |
continue | |
curr_joined = pd.merge( | |
curr_left, | |
curr_right, | |
left_on=equi_left_on, | |
right_on=equi_right_on | |
) | |
conditions = [ | |
checking_fn(curr_joined) for checking_fn in non_equi_conditions | |
] | |
curr_joined = curr_joined[np.logical_and.reduce(conditions)] | |
join_results.append(curr_joined) | |
if len(join_results) > 0: | |
joined = pd.concat(join_results) | |
else: | |
joined = pd.DataFrame(columns=left.columns) | |
tmp_columns = [x for x in joined.columns.tolist() if '__dummy_key' in x] | |
joined.drop(tmp_columns, axis=1, inplace=True) | |
return joined | |
# TODO: preserve index of left `DataFrame`. | |
# TODO: return all necessary columns in case of empty result. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment