Skip to content

Instantly share code, notes, and snippets.

@everilae
Created March 27, 2018 06:42
Show Gist options
  • Save everilae/b7d48f3abab8338a9f1bf18c1f8876b4 to your computer and use it in GitHub Desktop.
Save everilae/b7d48f3abab8338a9f1bf18c1f8876b4 to your computer and use it in GitHub Desktop.
Relational division for SQLAlchemy, inspired by jOOQ.
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.elements import ClauseElement
from sqlalchemy.sql import util as sqlutil
from sqlalchemy.sql.base import Executable, _generative
from sqlalchemy import select, exists, and_
class Divide(Executable, ClauseElement):
def __init__(self, dividend, divisor, on=None, returning=None):
self._dividend = dividend
self._divisor = divisor
self._on = on
self._returning = returning
self._unique_columns = []
self._common_columns = []
self._inspect_columns()
@_generative
def on(self, *criterion):
self._on = and_(*criterion)
@_generative
def returning(self, *cols):
self._returning = cols
def _inspect_columns(self):
for col in self._dividend.columns:
rcol = self._divisor.columns.get(col.name)
if rcol is None:
self._unique_columns.append(col)
else:
self._common_columns.append((col, rcol))
@property
def _default_on(self):
return and_(*[l == r for l, r in self._common_columns])
@property
def _default_returning(self):
return self._unique_columns
@property
def select(self):
d2 = self._dividend.alias()
adapter = sqlutil.ClauseAdapter(d2)
on = (self._on if self._on is not None
else self._default_on)
returning = (self._returning if self._returning is not None
else self._default_returning)
div_on = adapter.traverse(on)
join_on = and_(*[c == adapter.traverse(c) for c in returning])
return select(list(returning)).\
where(~exists().
select_from(self._divisor).\
where(~exists().
where(and_(join_on, div_on)).
correlate_except(d2))).\
distinct()
@compiles(Divide)
def compile_divide(element, compiler, **kw):
return compiler.process(element.select, **kw)
@everilae
Copy link
Author

everilae commented Mar 27, 2018

Example:

In [1]: from sqlalchemy import create_engine, Table, Column, Integer, MetaData

In [2]: from division import Divide

In [3]: engine = create_engine('sqlite:///')

In [4]: metadata = MetaData(bind=engine)

In [5]: a = Table('a', metadata,
   ...:           Column('x', Integer),
   ...:           Column('y', Integer),
   ...:           Column('z', Integer))
   ...:           

In [6]: b = Table('b', metadata,
   ...:           Column('y', Integer),
   ...:           Column('z', Integer))
   ...:           

In [7]: metadata.create_all()

A:

x y z
1 0 0
... ... ...
1 9 9
2 0 0
... ... ...
2 4 4

B:

y z
0 0
... ...
8 8
In [8]: engine.execute(a.insert().values([(1, v, v) for v in range(10)]))
Out[8]: <sqlalchemy.engine.result.ResultProxy at 0x7f76a3b14128>

In [9]: engine.execute(a.insert().values([(2, v, v) for v in range(5)]))
Out[9]: <sqlalchemy.engine.result.ResultProxy at 0x7f76a3b24550>

In [10]: engine.execute(b.insert().values([(v, v) for v in range(9)]))
Out[10]: <sqlalchemy.engine.result.ResultProxy at 0x7f76a3b2f550>
In [11]: engine.execute(Divide(a, b)).fetchall()
Out[11]: [(1,)]

In [12]: engine.execute(b.delete().where(b.c.y >= 5))
Out[12]: <sqlalchemy.engine.result.ResultProxy at 0x7f76a3b2feb8>

In [13]: engine.execute(Divide(a, b)).fetchall()
Out[13]: [(1,), (2,)]

In [14]: engine.execute(Divide(a, b).on(a.c.y == b.c.y)).fetchall()
Out[14]: [(1,), (2,)]

In [15]: engine.execute(Divide(a, b).on(a.c.y == b.c.y).returning(a.c.z)).fetchall()
Out[15]: []

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment