Created
April 22, 2019 12:40
-
-
Save inhzus/1aa89947614f446acd41f80d2d78f3f0 to your computer and use it in GitHub Desktop.
Frequent itemsets Algorithm: Apriori
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# created by inhzus | |
import typing | |
from collections import defaultdict | |
from itertools import combinations | |
from numbers import Number | |
from pprint import pprint | |
from bitarray import bitarray | |
T = typing.TypeVar('T') | |
def _join(itemsets: typing.List[tuple]) -> typing.Iterable[tuple]: | |
"""Join k length itemsets into k + 1 length itemsets""" | |
i = 0 | |
while i < len(itemsets): | |
step = 1 | |
*heads, tail = itemsets[i] | |
extras = [tail] | |
for j in range(i + 1, len(itemsets)): | |
*inner_heads, inner_tail = itemsets[j] | |
if heads == inner_heads: | |
extras.append(inner_tail) | |
step += 1 | |
else: | |
break | |
heads = tuple(heads) | |
for m, n in sorted(combinations(extras, 2)): | |
yield heads + (m, n,) | |
i += step | |
def _prune( | |
itemsets: typing.List[tuple], | |
possibilities: typing.Iterable[tuple] | |
) -> typing.Iterable[tuple]: | |
itemsets = set(itemsets) | |
for possible in possibilities: | |
# yield possible | |
for i in range(len(possible) - 2): | |
unchecked = possible[:i] + possible[i + 1:] | |
if unchecked not in itemsets: | |
break | |
else: | |
yield possible | |
def _generate(itemsets: typing.List[tuple]) -> typing.Iterable[tuple]: | |
yield from _prune(itemsets, _join(itemsets)) | |
def frequent_itemsets( | |
transactions: typing.List[tuple], | |
min_support: float | |
) -> (typing.Dict[int, typing.Dict[tuple, int]], int): | |
num_transactions = len(transactions) | |
# counts = Counter() | |
counter: typing.Dict[T, bitarray] = defaultdict(lambda: bitarray('0' * len(transactions))) | |
for idx, transaction in enumerate(transactions): | |
for item in transaction: | |
counter[item][idx] = 1 | |
# counts[item] += 1 | |
itemsets: typing.Dict[int, typing.Dict[tuple, int]] = { | |
1: {(k,): counter[k].count() | |
for k in counter | |
if counter[k].count() / num_transactions >= min_support}} | |
# itemsets = {1: {(k,): c for k, c in counts.items()}} | |
if not itemsets: | |
return dict(), num_transactions | |
k = 1 | |
while itemsets[k]: | |
children: list = sorted(itemsets[k].keys()) | |
candidates: typing.Iterable[tuple] = list(_generate(children)) | |
if not candidates: | |
break | |
level = {} | |
for candidate in candidates: | |
bits = bitarray('1' * num_transactions) | |
for item in candidate: | |
bits &= counter[item] | |
c = bits.count() | |
if c / num_transactions >= min_support: | |
level[candidate] = c | |
if not level: | |
break | |
itemsets[k + 1] = level | |
k += 1 | |
# k = 1 | |
# while itemsets[k]: | |
# children: list = sorted(itemsets[k].keys()) | |
# candidates = list(_generate(children)) | |
# if not candidates: | |
# break | |
# candidate_counter = Counter() | |
# for transaction in transactions: | |
# for candidate, candidate_set in zip( | |
# candidates, [set(item) for item in candidates]): | |
# if set.issubset(candidate_set, transaction): | |
# candidate_counter[candidate] += 1 | |
# | |
# candidates = {item: c for (item, c) in candidate_counter.items() | |
# if (c / num_transactions) >= min_support} | |
# if not candidates: | |
# break | |
# itemsets[k + 1] = candidates | |
# k += 1 | |
return itemsets, num_transactions | |
class Rule: | |
def __init__(self, | |
lhs: tuple, | |
rhs: tuple, | |
num_both: int = 0, | |
num_lhs: int = 0, | |
num_rhs: int = 0, | |
num_transactions: int = 0): | |
self.lhs = lhs | |
self.rhs = rhs | |
self.num_both = num_both | |
self.num_lhs = num_lhs | |
self.num_rhs = num_rhs | |
self.num_transactions = num_transactions | |
@property | |
def confidence(self): | |
try: | |
return self.num_both / self.num_lhs | |
except ZeroDivisionError or AttributeError: | |
return None | |
@property | |
def support(self): | |
try: | |
return self.num_both / self.num_transactions | |
except ZeroDivisionError or AttributeError: | |
return None | |
@property | |
def lift(self): | |
"""P(X and Y) / (P(X) * P(Y))""" | |
try: | |
expected = (self.num_lhs * self.num_rhs) / self.num_transactions ** 2 | |
observed = self.num_both / self.num_transactions | |
return observed / expected | |
except ZeroDivisionError or AttributeError: | |
return None | |
@property | |
def conviction(self): | |
"""P(not Y) / P(not Y | X)""" | |
try: | |
eps = 10e-10 | |
not_rhs = 1 - self.num_rhs / self.num_transactions | |
not_rhs_given_lhs = 1 - self.confidence | |
return not_rhs / (not_rhs_given_lhs + eps) | |
except ZeroDivisionError or AttributeError: | |
return None | |
@staticmethod | |
def _p(_s): | |
return '{{{}}}'.format(', '.join(str(k) for k in _s)) | |
def __repr__(self): | |
return '{} => {}'.format(self._p(self.lhs), self._p(self.rhs)) | |
def __eq__(self, other): | |
return set(self.lhs) == set(other.lhs) and set(self.rhs) == set(other.rhs) | |
def __hash__(self): | |
return hash(frozenset(self.lhs + self.rhs)) | |
def __len__(self): | |
return len(self.lhs + self.rhs) | |
def _ap_gen_rules( | |
itemset: tuple, | |
met: typing.List[tuple], | |
itemsets: typing.Dict[int, typing.Dict[tuple, int]], | |
min_conf: float, | |
num_transactions: int | |
) -> Rule: | |
def count(_set): | |
return itemsets[len(_set)][_set] | |
if len(itemset) <= (len(met[0]) + 1): | |
return | |
met = list(_generate(met)) | |
met_copy = met.copy() | |
for item in met: | |
lhs = tuple(sorted(list(set(itemset).difference(set(item))))) | |
if (count(itemset) / count(lhs)) >= min_conf: | |
yield Rule( | |
lhs, item, count(itemset), | |
count(lhs), count(item), num_transactions) | |
else: | |
met_copy.remove(item) | |
if met_copy: | |
yield from _ap_gen_rules( | |
itemset, met_copy, itemsets, min_conf, num_transactions | |
) | |
def rules( | |
itemsets: typing.Dict[int, typing.Dict[tuple, int]], | |
min_confidence: float, | |
num_transactions: int | |
) -> typing.Iterable[Rule]: | |
if not (0 <= min_confidence <= 1 and isinstance(min_confidence, Number)): | |
raise ValueError() | |
if not (num_transactions >= 0 and isinstance(num_transactions, Number)): | |
raise ValueError() | |
def count(_set): | |
return itemsets[len(_set)][_set] | |
for size, counter in itemsets.items(): | |
if size < 2: | |
continue | |
for itemset in sorted(counter.keys()): | |
for removed in combinations(itemset, 1): | |
lhs = tuple(sorted(list(set(itemset).difference(set(removed))))) | |
if count(itemset) / count(lhs) >= min_confidence: | |
yield Rule( | |
lhs, removed, count(itemset), | |
count(lhs), count(removed), num_transactions | |
) | |
yield from _ap_gen_rules( | |
itemset, list(combinations(itemset, 1)), | |
itemsets, min_confidence, num_transactions | |
) | |
def apriori( | |
transactions: typing.List[tuple], | |
min_support: float = 0.5, | |
min_confidence: float = 0.5 | |
) -> (typing.Dict[int, typing.Dict[tuple, int]], typing.List[Rule]): | |
itemsets, num_ = frequent_itemsets(transactions, min_support) | |
r = rules(itemsets, min_confidence, num_) | |
return itemsets, list(r) | |
if __name__ == '__main__': | |
s = [(1, 2, 5), (2, 4), (2, 3), (1, 2, 4), | |
(1, 3), (2, 3), (1, 3), (1, 2, 3, 5), (1, 2, 3)] | |
pprint(apriori(s, 0.2, 0.5)) | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment