Last active
August 29, 2015 13:56
-
-
Save carymrobbins/9103365 to your computer and use it in GitHub Desktop.
Perform a left join in Python without a database.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import chain

try:
    from collections.abc import Iterable
except ImportError:  # Python 2 has no collections.abc module
    from collections import Iterable
def left_join(xss, yss, on=None):
    """
    Perform a SQL-style left join of two row collections in pure Python.

    Every row of ``xss`` appears in the output at least once: paired with
    each row of ``yss`` that satisfies the join condition or, when no row
    matches, paired with a fresh list of ``None`` placeholders. The
    placeholder list is as long as the first non-empty row of ``yss``; if
    ``yss`` has no rows at all its width is unknown and ``None`` is yielded
    in place of the list.

    :param xss: Left-hand rows.
    :type xss: Iterable[Iterable]
    :param yss: Right-hand rows. They are scanned once per left row, so a
        one-shot iterator is buffered into a list on first use.
    :type yss: Iterable[Iterable]
    :param on: Either an iterable of column indexes that must be equal in
        both rows (an empty iterable yields the cartesian product), or a
        predicate ``on(xs, ys) -> bool``. When omitted, a decorator is
        returned that accepts the predicate.
    :type on: Iterable[int] | (Iterable, Iterable) -> bool
    :rtype: generator[tuple[list]] |
            (((Iterable, Iterable) -> bool) -> generator[tuple[list]])
    """
    predicate = on
    if isinstance(on, Iterable):
        # Snapshot the indexes: a one-shot iterator would be drained by the
        # first comparison and every later pair would then "match".
        indexes = tuple(on)

        def predicate(xs, ys):
            """Test that xs and ys agree at every join index."""
            return not any(xs[i] != ys[i] for i in indexes)

    def do(f):
        """Lazily yield the joined ``(xs, ys)`` pairs using predicate f."""
        # The right side is re-scanned for every left row; an iterator that
        # is its own ``iter()`` can only be consumed once, so buffer it.
        right_rows = list(yss) if iter(yss) is yss else yss
        null_row = None
        for xs in xss:
            matched = False
            for ys in right_rows:
                # Memoize the null row - its width comes from the first
                # non-empty right-hand row encountered.
                if not null_row:
                    null_row = [None] * len(list(ys))
                if f(xs, ys):
                    matched = True
                    yield (xs, ys)
            if not matched:
                # Hand out a copy so callers mutating one result row cannot
                # corrupt the placeholders of other unmatched rows.
                yield (xs, None if null_row is None else list(null_row))

    if predicate is not None:
        return do(predicate)
    # No condition supplied: act as a decorator awaiting the predicate.
    return do
| # Example usage: | |
| import unittest | |
class LeftJoinTest(unittest.TestCase):
    """Usage examples for ``left_join``, written as unit tests."""

    def setUp(self):
        # Every test joins the same pair of two-row, three-column tables.
        self.left_rows = [
            [1, 2, 3],
            [4, 5, 6],
        ]
        self.right_rows = [
            [1, 2, 3],
            [7, 5, 6],
        ]
        # What an unmatched left row is paired with.
        self.nulls = [None, None, None]

    def test_left_join_on_index(self):
        # Join on the first column only: just the first rows agree.
        by_first_column = left_join(self.left_rows, self.right_rows, on=[0])
        self.assertAsListEqual(by_first_column, [
            ([1, 2, 3], [1, 2, 3]),
            ([4, 5, 6], self.nulls),
        ])

        # Swap the sides and join on the last two columns: both rows match.
        by_last_columns = left_join(
            self.right_rows, self.left_rows, on=(1, 2))
        self.assertAsListEqual(by_last_columns, [
            ([1, 2, 3], [1, 2, 3]),
            ([7, 5, 6], [4, 5, 6]),
        ])

    def test_left_join_on_function(self):
        xss, yss = self.left_rows, self.right_rows

        joined = left_join(xss, yss, on=lambda xs, ys: xs[0] - 2 == ys[1])
        self.assertAsListEqual(joined, [
            ([1, 2, 3], self.nulls),
            ([4, 5, 6], [1, 2, 3]),
        ])

        # As a decorator.
        @left_join(xss, yss)
        def decorated(xs, ys):
            return xs[0] + 4 == ys[1]

        self.assertAsListEqual(decorated, [
            ([1, 2, 3], [7, 5, 6]),
            ([4, 5, 6], self.nulls),
        ])

    def test_cartesian_product(self):
        every_pair = [
            ([1, 2, 3], [1, 2, 3]),
            ([1, 2, 3], [7, 5, 6]),
            ([4, 5, 6], [1, 2, 3]),
            ([4, 5, 6], [7, 5, 6]),
        ]
        # Empty indexes mean no checking.
        self.assertAsListEqual(
            left_join(self.left_rows, self.right_rows, on=()),
            every_pair)
        # With a function that always returns True.
        self.assertAsListEqual(
            left_join(self.left_rows, self.right_rows, on=lambda *_: True),
            every_pair)

    # noinspection PyPep8Naming
    def assertAsListEqual(self, xs, ys, msg=None):
        """Materialize both iterables as lists, then compare them."""
        actual, expected = list(xs), list(ys)
        self.assertListEqual(actual, expected, msg)
# Run the example tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment