Created
October 29, 2018 08:04
-
-
Save kleptog/7f79132d59217e07976231a97149d577 to your computer and use it in GitHub Desktop.
Python implementation for boolean expression index
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
# This is a python implementation of the boolean expression algorithm | |
# described in the paper "Indexing Boolean Expressions" by Whang et al.
# https://theory.stanford.edu/~sergei/papers/vldb09-indexing.pdf | |
# Note this really only uses the index ideas from the paper, because the | |
# actual algorithm it describes involves shuffling through lists, something
# which is extremely difficult to do efficiently in python. I went for the | |
# easier route of simply counting entries. As such, I don't give any | |
# guarantees about the complexity of the algorithm, but it seems it should
# be comparable. | |
# This file implements the CNF algorithm in the paper.
import pprint | |
from collections import Counter | |
import itertools | |
class Condition(object):
    """A single set-membership predicate over one variable.

    Attributes:
        var: name of the variable the predicate tests.
        val: collection of values the variable is compared against.
        inv: True when the predicate is negated ("not in"), False for "in".
    """

    # Class-level defaults, kept so attribute access on the class still works.
    var = None
    val = None
    inv = None

    def __init__(self, var, val, inv=False):
        """Store the predicate.

        Args:
            var: variable name.
            val: iterable of candidate values.
            inv: truthy to negate the predicate.
        """
        self.var = var
        self.val = val
        # Coerce by truthiness. The previous `inv is not False` test turned
        # falsy flags such as 0, None or '' into an inverted predicate.
        self.inv = bool(inv)

    def __str__(self):
        """Render the predicate with a set-membership symbol."""
        template = "(%s ∉ %r)" if self.inv else "(%s ∈ %r)"
        return template % (self.var, self.val)
class And(list):
    """Conjunction node: a list holding the positional arguments it was built from."""

    def __init__(self, *args):
        """Populate the list with the given members, in order."""
        super(And, self).__init__(args)

    def __str__(self):
        """Join the string form of every member with ' & '."""
        pieces = [str(member) for member in self]
        return " & ".join(pieces)
class Or(list):
    """Disjunction node: a list holding the positional arguments it was built from."""

    def __init__(self, *args):
        """Populate the list with the given members, in order."""
        super(Or, self).__init__(args)

    def __str__(self):
        """Wrap each member in parentheses and join them with ' | '."""
        pieces = ["(%s)" % member for member in self]
        return " | ".join(pieces)
class BE(object):
    """A named boolean expression in CNF: an And of Or clauses of Conditions."""

    def __init__(self, name, be):
        """Store the expression, wrapping a bare Or into a one-clause And.

        Args:
            name: label used in the repr.
            be: an And (of Or clauses) or a single Or clause.
        """
        expression = And(be) if isinstance(be, Or) else be
        assert isinstance(expression, And)
        self.name = name
        self.be = expression

    def cnf_size(self):
        """Return how many clauses contain no inverted ("not in") predicate."""
        count = 0
        for clause in self.be:
            assert isinstance(clause, Or)
            if not any(pred.inv for pred in clause):
                count += 1
        return count

    def cnf_index_terms(self):
        """Yield ((var, value), (self, inv, clause_id)) for every value of every predicate."""
        for clause_id, clause in enumerate(self.be):
            for pred in clause:
                for value in pred.val:
                    key = (pred.var, value)
                    entry = (self, pred.inv, clause_id)
                    yield (key, entry)

    def get_disjunction_counts(self):
        """Return, per clause, the number of inverted predicates it contains."""
        return [len([pred for pred in clause if pred.inv]) for clause in self.be]

    def num_terms(self):
        """Return the number of clauses in the conjunction."""
        return len(self.be)

    def eval(self, ass):
        """Evaluate the expression directly against an assignment.

        Args:
            ass: iterable of (variable, value) pairs; passed to dict().

        Returns:
            The truth value of the expression. A predicate whose variable is
            missing (or mapped to None) evaluates to its ``inv`` flag.
        """
        assignment = dict(ass)

        def holds(node):
            if isinstance(node, And):
                return all(holds(child) for child in node)
            if isinstance(node, Or):
                return any(holds(child) for child in node)
            if isinstance(node, Condition):
                actual = assignment.get(node.var)
                if actual is None:
                    return node.inv
                present = actual in node.val
                return (not present) if node.inv else present
            # Unknown node types fall through, as before.
            return None

        return holds(self.be)

    def __repr__(self):
        return "<BE %r>" % self.name
# Sample CNF expressions used to cross-check the index against direct evaluation.
c1 = BE('c1', And(
    Or(Condition('A', [1]), Condition('B', [1])),
    Or(Condition('C', [1]), Condition('D', [1])),
))
c2 = BE('c2', And(
    Or(Condition('A', [1]), Condition('C', [2])),
    Or(Condition('B', [1]), Condition('D', [1])),
))
c3 = BE('c3', And(
    Or(Condition('A', [1]), Condition('B', [1])),
    Or(Condition('C', [2]), Condition('D', [1])),
))
# Typo in paper
c4 = BE('c4', And(
    Or(Condition('A', [1, 2]), Condition('B', [1])),
    Or(Condition('A', [1]), Condition('D', [1])),
))
c5 = BE('c5', And(
    Or(Condition('A', [1]), Condition('B', [1])),
    Or(
        Condition('C', [1, 2], True),
        Condition('D', [1], True),
        Condition('E', [1]),
    ),
))
# A bare Or: BE wraps it into a single-clause And.
c6 = BE('c6', Or(Condition('A', [1], True), Condition('B', [1])))

exprs = [c1, c2, c3, c4, c5, c6]
class CNFIndex(object):
    """Inverted index over CNF expressions, bucketed by their ``cnf_size()``."""

    def __init__(self, exprs):
        """Build the index for the given expressions."""
        self.index, self.zeros = self.build_index(exprs)

    def build_index(self, exprs):
        """Return (index, zeros).

        ``index`` maps size -> {(var, value): [entries]}, using the entries
        produced by each expression's ``cnf_index_terms()``. ``zeros`` is the
        set of expressions whose size is 0.
        """
        by_size = {}
        size_zero = set()
        for expr in exprs:
            size = expr.cnf_size()
            bucket = by_size.setdefault(size, {})
            if size == 0:
                size_zero.add(expr)
            for key, entry in expr.cnf_index_terms():
                bucket.setdefault(key, []).append(entry)
        return by_size, size_zero

    def dump_index(self):
        """Pretty-print the raw index."""
        pprint.pprint(self.index)

    def match(self, ass):
        """Return the set of indexed expressions satisfied by the assignment.

        Args:
            ass: iterable of (variable, value) pairs.
        """
        matched = set()
        for size, bucket in self.index.items():
            # Count how often each (rule, inv, clause_id) entry is hit.
            hits = Counter()
            for pair in ass:
                hits.update(bucket.get(pair, ()))
            candidates = set(entry[0] for entry in hits)
            if size == 0:
                # Size-0 rules are always checked, even with no hits.
                candidates |= self.zeros
            for rule in candidates:
                inverted_totals = rule.get_disjunction_counts()
                # A clause holds if a positive predicate was hit, or if fewer
                # inverted hits were seen than it has inverted predicates.
                satisfied = all(
                    hits[(rule, False, clause_id)] != 0
                    or hits[(rule, True, clause_id)] < inverted_totals[clause_id]
                    for clause_id in range(rule.num_terms())
                )
                if satisfied:
                    matched.add(rule)
        return matched
class NoIndex(object):
    """Baseline matcher that evaluates every expression one by one."""

    def __init__(self, exprs):
        """Keep a reference to the expressions to scan."""
        self.exprs = exprs

    def match(self, ass):
        """Return the set of expressions whose ``eval(ass)`` is truthy."""
        return set(expr for expr in self.exprs if expr.eval(ass))
# Cross-check the CNF index against brute-force evaluation, then time both.
no_index = NoIndex(exprs)
cnf_index = CNFIndex(exprs)

# itertools.product returns a one-shot iterator. Materialize it so the
# benchmark below can walk the same combinations again after the check loop;
# previously the timed functions iterated an already exhausted iterator.
all_ass = list(itertools.product(
    [('A', x) for x in (None, 1, 2)],
    [('B', x) for x in (None, 1, 2)],
    [('C', x) for x in (None, 1, 2)],
    [('D', x) for x in (None, 1, 2)],
    [('E', x) for x in (None, 1, 2)],
))
# None stands for "variable not assigned"; strip those pairs once up front so
# the check and both timed functions see identical inputs.
assignments = [[pair for pair in combo if pair[1] is not None] for combo in all_ass]

errors = Counter()
for ass in assignments:
    r1 = cnf_index.match(ass)
    r2 = no_index.match(ass)
    if r1 != r2:
        errors.update(r1 ^ r2)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("%s %s %s" % (r1, r2, ass))
        # Stop in the debugger so the mismatch can be inspected.
        import pdb
        pdb.set_trace()
pprint.pprint(errors)

import timeit


def testcnf():
    """Run every assignment through the CNF index."""
    for ass in assignments:
        # The index objects are not callable; matching goes through .match().
        cnf_index.match(ass)


def testno():
    """Run every assignment through the brute-force matcher."""
    for ass in assignments:
        no_index.match(ass)


print(timeit.timeit(testcnf, number=10000))
print(timeit.timeit(testno, number=10000))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
# This is a python implementation of the boolean expression algorithm | |
# described in the paper "Indexing Boolean Expressions" by Whang et al.
# https://theory.stanford.edu/~sergei/papers/vldb09-indexing.pdf | |
# Note this really only uses the index ideas from the paper, because the | |
# actual algorithm it describes involves shuffling through lists, something
# which is extremely difficult to do efficiently in python. I went for the | |
# easier route of simply counting entries. As such, I don't give any | |
# guarantees about the complexity of the algorithm, but it seems it should
# be comparable. | |
# This file implements the DNF algorithm in the paper | |
import pprint | |
from collections import Counter | |
import itertools | |
class Condition(object):
    """A single set-membership predicate over one variable.

    Attributes:
        var: name of the variable the predicate tests.
        val: collection of values the variable is compared against.
        inv: True when the predicate is negated ("not in"), False for "in".
    """

    # Class-level defaults, kept so attribute access on the class still works.
    var = None
    val = None
    inv = None

    def __init__(self, var, val, inv=False):
        """Store the predicate.

        Args:
            var: variable name.
            val: iterable of candidate values.
            inv: truthy to negate the predicate.
        """
        self.var = var
        self.val = val
        # Coerce by truthiness. The previous `inv is not False` test turned
        # falsy flags such as 0, None or '' into an inverted predicate.
        self.inv = bool(inv)

    def __str__(self):
        """Render the predicate with a set-membership symbol."""
        template = "(%s ∉ %r)" if self.inv else "(%s ∈ %r)"
        return template % (self.var, self.val)
class And(list):
    """Conjunction node: a list holding the positional arguments it was built from."""

    def __init__(self, *args):
        """Populate the list with the given members, in order."""
        super(And, self).__init__(args)

    def __str__(self):
        """Join the string form of every member with ' & '."""
        pieces = [str(member) for member in self]
        return " & ".join(pieces)
class Or(list):
    """Disjunction node: a list holding the positional arguments it was built from."""

    def __init__(self, *args):
        """Populate the list with the given members, in order."""
        super(Or, self).__init__(args)

    def __str__(self):
        """Wrap each member in parentheses and join them with ' | '."""
        pieces = ["(%s)" % member for member in self]
        return " | ".join(pieces)
class BE(object):
    """A named boolean expression in DNF form: an And of Conditions."""

    def __init__(self, name, be):
        """Store the expression; an Or argument is wrapped into an And first.

        Args:
            name: label used in the repr.
            be: an And, or an Or (which gets wrapped).
        """
        expression = And(be) if isinstance(be, Or) else be
        assert isinstance(expression, And)
        self.name = name
        self.be = expression

    def dnf_size(self):
        """Return the number of non-inverted ("in") predicates."""
        count = 0
        for pred in self.be:
            assert isinstance(pred, Condition)
            if not pred.inv:
                count += 1
        return count

    def dnf_index_terms(self):
        """Yield ((var, value), (self, inv)) for every value of every predicate."""
        for pred in self.be:
            for value in pred.val:
                key = (pred.var, value)
                entry = (self, pred.inv)
                yield (key, entry)

    def eval(self, ass):
        """Evaluate the expression directly against an assignment.

        Args:
            ass: iterable of (variable, value) pairs; passed to dict().

        Returns:
            The truth value of the expression. A predicate whose variable is
            missing (or mapped to None) evaluates to its ``inv`` flag.
        """
        assignment = dict(ass)

        def holds(node):
            if isinstance(node, And):
                return all(holds(child) for child in node)
            if isinstance(node, Or):
                return any(holds(child) for child in node)
            if isinstance(node, Condition):
                actual = assignment.get(node.var)
                if actual is None:
                    return node.inv
                present = actual in node.val
                return (not present) if node.inv else present
            # Unknown node types fall through, as before.
            return None

        return holds(self.be)

    def __repr__(self):
        return "<BE %r>" % self.name
# Sample DNF conjunctions used to cross-check the index against direct evaluation.
c1 = BE('c1', And(
    Condition('age', [3]),
    Condition('state', ['NY']),
))
c2 = BE('c2', And(
    Condition('age', [3]),
    Condition('gender', ['F']),
))
c3 = BE('c3', And(
    Condition('age', [3]),
    Condition('gender', ['M']),
    Condition('state', ['CA'], True),
))
c4 = BE('c4', And(
    Condition('state', ['CA']),
    Condition('gender', ['M']),
))
c5 = BE('c5', And(Condition('age', [3, 4])))
c6 = BE('c6', And(Condition('state', ['CA', 'NY'], True)))

exprs = [c1, c2, c3, c4, c5, c6]
class DNFIndex(object):
    """Inverted index over DNF conjunctions, bucketed by their ``dnf_size()``."""

    def __init__(self, exprs):
        """Build the index for the given expressions."""
        self.index, self.zeros = self.build_index(exprs)

    def build_index(self, exprs):
        """Return (index, zeros).

        ``index`` maps size -> {(var, value): [entries]}, using the entries
        produced by each expression's ``dnf_index_terms()``. ``zeros`` is the
        set of expressions whose size is 0.
        """
        by_size = {}
        size_zero = set()
        for expr in exprs:
            size = expr.dnf_size()
            bucket = by_size.setdefault(size, {})
            if size == 0:
                size_zero.add(expr)
            for key, entry in expr.dnf_index_terms():
                bucket.setdefault(key, []).append(entry)
        return by_size, size_zero

    def dump_index(self):
        """Pretty-print the raw index."""
        pprint.pprint(self.index)

    def match(self, ass):
        """Return the set of indexed expressions satisfied by the assignment.

        Args:
            ass: iterable of (variable, value) pairs.
        """
        matched = set()
        for size, bucket in self.index.items():
            # Count how often each (rule, inv) entry is hit.
            hits = Counter()
            for pair in ass:
                hits.update(bucket.get(pair, ()))
            candidates = set(entry[0] for entry in hits)
            if size == 0:
                # Size-0 rules are always checked, even with no hits.
                candidates |= self.zeros
            for rule in candidates:
                positive_hits = hits[(rule, False)]
                inverted_hits = hits[(rule, True)]
                # The positive hit count must equal the bucket size and no
                # inverted predicate may have been hit.
                if positive_hits == size and inverted_hits == 0:
                    matched.add(rule)
        return matched
class NoIndex(object):
    """Baseline matcher that evaluates every expression one by one."""

    def __init__(self, exprs):
        """Keep a reference to the expressions to scan."""
        self.exprs = exprs

    def match(self, ass):
        """Return the set of expressions whose ``eval(ass)`` is truthy."""
        return set(expr for expr in self.exprs if expr.eval(ass))
# Cross-check the DNF index against brute-force evaluation, then time both.
no_index = NoIndex(exprs)
dnf_index = DNFIndex(exprs)

# itertools.product returns a one-shot iterator. Materialize it so the
# benchmark below can walk the same combinations again after the check loop;
# previously the timed functions iterated an already exhausted iterator.
all_ass = list(itertools.product(
    [('state', x) for x in (None, 'NY', 'CA')],
    [('gender', x) for x in (None, 'M', 'F')],
    [('age', x) for x in (None, 3, 4)],
))
# None stands for "variable not assigned"; strip those pairs once up front so
# the check and both timed functions see identical inputs.
assignments = [[pair for pair in combo if pair[1] is not None] for combo in all_ass]

errors = Counter()
for ass in assignments:
    r1 = dnf_index.match(ass)
    r2 = no_index.match(ass)
    if r1 != r2:
        errors.update(r1 ^ r2)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("%s %s %s" % (r1, r2, ass))
        # Stop in the debugger so the mismatch can be inspected.
        import pdb
        pdb.set_trace()
pprint.pprint(errors)

import timeit


def testdnf():
    """Run every assignment through the DNF index."""
    for ass in assignments:
        # The index objects are not callable; matching goes through .match().
        dnf_index.match(ass)


def testno():
    """Run every assignment through the brute-force matcher."""
    for ass in assignments:
        no_index.match(ass)


print(timeit.timeit(testdnf, number=10000))
print(timeit.timeit(testno, number=10000))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment