Last active
December 12, 2022 06:56
-
-
Save xfenix/6726844 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Number(models.Model):
    """
    Phone number annotated with "beautiful number" pattern flags.

    Every pattern listed in ``MASKS`` is evaluated by ``calculate_masks`` and
    the outcome is stored in the ``flags`` bit field, so that ``mask_filter``
    can translate user-facing pattern names into flag lookups.

    NOTE(review): the methods read a ``number10`` attribute that is not
    declared in this excerpt -- presumably a 10-digit number field defined
    elsewhere on the model; confirm.
    """

    # Pattern checkers keyed by mask name. A value is either a regular
    # expression fragment (anchored per position in ``calculate_masks``) or a
    # callable taking ``(value, where)``.
    MASKS = dict(
        xyxy=r'(?:(\d)((?!\1)\d)\1\2)',
        xyyx=r'(?:(\d)((?!\1)\d)\2\1)',
        xxyy=r'(?:(\d)\1((?!\1)\d)\2)',
        two_digits=lambda value, where: Number.match_two_digits(value),
        sequence=lambda value, where: Number.match_sequence(value, where),
        x0y0z0=r'(?:(\d)0((?!\1)\d)0((?!\1|\2)\d)0)',
        xyzaxyz=r'(?:(\d)((?!\1)\d)((?!\1|\2)\d)\d\1\2\3)',
        xyzxyza=r'(?:(\d)((?!\1)\d)((?!\1|\2)\d)\1\2\3\d)',
        axyzxyz=r'(?:\d(\d)((?!\1)\d)((?!\1|\2)\d)\1\2\3)',
        xxxx=r'(\d)(\1){3}',
        xxx=r'(\d)(\1){2}',
    )

    # Minimum number of digits for a ladder to count as a sequence.
    SEQUENCE_MIN_LEN = 4
    # Maximum number of up/down direction changes allowed inside one ladder.
    SEQUENCE_MAX_SWITCHES = 1
    # Default number of rows handled by one ``calculate_masks`` call.
    MASK_BATCH_SIZE = 5000

    # Position prefixes used in flag names.
    STR_START = 'start'
    STR_MIDDLE = 'mid'
    STR_END = 'end'
    STR_SIDES = (STR_START, STR_MIDDLE, STR_END)
    # Flags that exist without a position prefix.
    SKIP_ADD_SIDES = ('two_digits', 'direct', 'xyzaxyz', 'axyzxyz', 'xyzxyza')

    flags = BitField(
        flags={
            0: 'direct',
            1: '%s_xyxy' % STR_START,
            2: '%s_xyxy' % STR_MIDDLE,
            3: '%s_xyxy' % STR_END,
            4: '%s_xyyx' % STR_START,
            5: '%s_xyyx' % STR_MIDDLE,
            6: '%s_xyyx' % STR_END,
            7: '%s_xxyy' % STR_START,
            8: '%s_xxyy' % STR_MIDDLE,
            9: '%s_xxyy' % STR_END,
            10: 'two_digits',
            11: '%s_sequence' % STR_START,
            12: '%s_sequence' % STR_MIDDLE,
            13: '%s_sequence' % STR_END,
            14: '%s_x0y0z0' % STR_START,
            15: '%s_x0y0z0' % STR_END,
            16: 'xyzaxyz',
            17: 'xyzxyza',
            18: 'axyzxyz',
            19: '%s_xxxx' % STR_START,
            20: '%s_xxxx' % STR_MIDDLE,
            21: '%s_xxxx' % STR_END,
            22: '%s_xxx' % STR_START,
            23: '%s_xxx' % STR_MIDDLE,
            24: '%s_xxx' % STR_END,
            25: 'processed'
        },
        default=0,
        db_index=True,
        verbose_name=u"Флаги"
    )

    @classmethod
    def match_two_digits(cls, value):
        """
        Check whether the number consists of exactly two distinct characters
        (e.g. 9051212112 once the prefix is stripped).

        :param value: number as a string
        :return: True when exactly two distinct characters occur in ``value``
        """
        return len(set(value)) == 2

    @classmethod
    def match_sequence(cls, value, where):
        """
        Check whether the number contains a ladder (12345, 1234321, etc.).

        A ladder is a run of digits where each neighbour differs by exactly
        one, with at most ``SEQUENCE_MAX_SWITCHES`` changes of direction.
        Only the longest ladder is considered; on a tie the earliest one wins.

        :param value: number as a string
        :param where: ``0`` to accept the ladder at any position, or one of
            ``STR_START`` / ``STR_MIDDLE`` / ``STR_END`` to require that the
            longest ladder sits at that position
        :return: length of the longest ladder when it is at least
            ``SEQUENCE_MIN_LEN`` digits long and matches ``where``,
            otherwise ``False``
        """
        UP = 2
        DOWN = 1

        def _pair(i):
            # Digits at i and i + 1, or None when the pair runs past the end
            # of the string or contains a non-digit character.
            try:
                return int(value[i]), int(value[i + 1])
            except (IndexError, ValueError):
                return None

        def is_step(i):
            pair = _pair(i)
            return pair is not None and abs(pair[1] - pair[0]) == 1

        def get_direction(i):
            pair = _pair(i)
            if pair is None:
                return False
            # Equal digits count as UP, exactly as a non-negative difference.
            return DOWN if (pair[1] - pair[0]) < 0 else UP

        total = len(value)
        ladder_lengths = {}
        skip_to = 0
        for now in range(total):
            # Positions consumed by a previous ladder are not rescanned.
            if now < skip_to or not is_step(now):
                continue
            # Walk forward from ``now`` and measure the ladder.
            ladder = 1
            direct_counter = 0
            direct_now = None
            for index in range(now, total):
                # Count direction switches.
                direct_next = get_direction(index)
                if direct_next:
                    if direct_now and direct_now != direct_next:
                        direct_counter += 1
                    direct_now = direct_next
                # Extend the ladder while the switch budget is respected and
                # the next digit is still a step; otherwise stop here.
                if direct_counter <= cls.SEQUENCE_MAX_SWITCHES and is_step(index):
                    ladder += 1
                else:
                    skip_to = index + 1
                    break
            ladder_lengths[now] = ladder

        if not ladder_lengths:
            return False

        # ``max`` keeps the first maximal element, so sorting by start index
        # makes the tie-break deterministic (earliest ladder wins).
        index, length = max(sorted(ladder_lengths.items()), key=lambda x: x[1])
        if length < cls.SEQUENCE_MIN_LEN:
            return False

        if index == 0:
            pos = cls.STR_START
        elif index + length >= total:
            pos = cls.STR_END
        else:
            pos = cls.STR_MIDDLE
        if where == 0 or where == pos:
            return length
        return False

    @classmethod
    def _build_flag_matchers(cls):
        """
        Resolve every flag name into a ready-to-call matcher.

        :return: list of ``(flag_type, where, matcher)`` tuples, where
            ``matcher(value, where)`` returns a truthy value on a match.
            Flags without an entry in ``MASKS`` (e.g. ``direct`` and
            ``processed``) are left out.
        """
        anchors = {
            cls.STR_START: r'^%s.+$',
            cls.STR_MIDDLE: r'^.+%s.+$',
            cls.STR_END: r'^.+%s$',
        }
        matchers = []
        for flag_type in cls.flags:
            parts = flag_type.split('_')
            if parts[0] in cls.STR_SIDES:
                where = parts[0]
                checker = cls.MASKS[parts[1]]
            else:
                where = 0
                checker = cls.MASKS.get(flag_type)
                if checker is None:
                    continue
            if not callable(checker):
                # Regex fragment: anchor it for the requested position (no
                # anchoring for position-less flags) and compile it once.
                pattern = re.compile(anchors.get(where, '%s') % checker)
                checker = (
                    lambda value, where, _search=pattern.search: _search(value)
                )
            matchers.append((flag_type, where, checker))
        return matchers

    @classmethod
    def calculate_masks(cls, is_seven=True, slice_size=None):
        """
        Compute pattern flags for one slice of not-yet-processed numbers.

        Selects up to ``slice_size`` rows whose ``processed`` flag is unset,
        evaluates every checker against each number, writes the resulting
        flags with bulk ``update`` queries and finally marks the rows as
        ``processed``.

        :param is_seven: when true, the first three characters of
            ``number10`` are dropped before matching
        :param slice_size: maximum number of rows to handle; falls back to
            ``MASK_BATCH_SIZE`` when falsy
        """
        slice_size = slice_size or cls.MASK_BATCH_SIZE
        matchers = cls._build_flag_matchers()
        calculated = {}
        all_items = []

        uncalculated = cls.objects.filter(flags=~cls.flags.processed)[:slice_size]
        for item in uncalculated.iterator():
            value = str(item.number10)
            if is_seven:
                value = value[3:]
            for flag_type, where, matcher in matchers:
                if matcher(value, where):
                    calculated.setdefault(flag_type, []).append(item.pk)
            all_items.append(item.pk)

        if not all_items:
            return

        all_pks = set(all_items)
        # NOTE(review): a flag that matched no row in this slice is never
        # cleared from the slice (unchanged from the previous behaviour).
        # NOTE(review): ``F() & x`` / ``F() | x`` are kept as written; newer
        # Django versions expect ``F().bitand()`` / ``F().bitor()`` -- confirm
        # against the Django version in use.
        for flag_type, keys in calculated.items():
            flag = getattr(cls.flags, flag_type)
            # Remove the flag from every processed row that did not match ...
            cls.objects\
                .filter(pk__in=all_pks - set(keys))\
                .update(flags=F('flags') & ~flag)
            # ... and add it to the rows that did.
            cls.objects\
                .filter(pk__in=keys)\
                .update(flags=F('flags') | flag)
        cls.objects\
            .filter(pk__in=all_items)\
            .update(flags=F('flags') | cls.flags.processed)

    @classmethod
    def calculate_all_masks(cls, batch=True):
        """
        Compute pattern flags for the whole table.

        :param batch: when true, a single ``calculate_masks`` call sized to
            the full row count is made; otherwise ``calculate_masks`` is
            called once per ``MASK_BATCH_SIZE`` rows
        """
        total = cls.objects.count()
        if batch:
            cls.calculate_masks(slice_size=total)
        else:
            # Integer ceiling division: number of batches covering ``total``.
            iterates = (total + cls.MASK_BATCH_SIZE - 1) // cls.MASK_BATCH_SIZE
            for _ in range(iterates):
                cls.calculate_masks()

    @classmethod
    def mask_filter(cls, filt, lookup=''):
        """
        Translate user filter input into ``Q`` objects.

        Hook this into tastypie's apply_filter and go on.

        :param filt: mapping with the optional keys ``position`` (iterable of
            ``STR_SIDES`` values; all sides when missing or empty), ``base``
            (iterable of pattern names picked from a checkbox list) and
            ``mask`` (pattern typed by the user: a symbolic mask such as
            ``xxyyzzz``, a mixed mask such as ``123xyxy``, or plain digits)
        :param lookup: ORM lookup prefix (e.g. ``'number__'``) prepended to
            the ``number10`` and ``flags`` field names
        :return: tuple ``(base_filter, mask_filter)``; each element is a
            ``Q`` object or ``None`` when the corresponding input produced
            no condition
        """
        number_field = '%snumber10' % lookup
        flag_field = '%sflags' % lookup
        flagset = cls.flags

        def has_flag(name):
            return Q(**{flag_field: getattr(flagset, name)})

        def any_of(*names):
            combined = has_flag(names[0])
            for name in names[1:]:
                combined |= has_flag(name)
            return combined

        def all_of(*names):
            combined = has_flag(names[0])
            for name in names[1:]:
                combined &= has_flag(name)
            return combined

        # NOTE(review): ``position`` and ``base`` are iterated element-wise,
        # so they are assumed to be lists/tuples rather than plain strings.
        position = filt.get('position', '') or cls.STR_SIDES
        base = filt.get('base', '')
        mask = filt.get('mask', '')

        # Base patterns from the checkbox list.
        base_q = None
        if base:
            names = []
            for flag in base:
                if flag in cls.SKIP_ADD_SIDES:
                    names.append(flag)
                    continue
                for side in position:
                    # There is no ``mid_x0y0z0`` flag.
                    if side == cls.STR_MIDDLE and flag == 'x0y0z0':
                        continue
                    if side in cls.STR_SIDES:
                        names.append(side + '_' + flag)
            if names:
                base_q = any_of(*names)

        # Mask patterns (with user input).
        mask_q = None
        if mask:
            mask = mask.replace('-', '').replace(' ', '')

            # Simple patterns: exact symbolic masks. These are checked before
            # the letters-only catch-all below so it cannot shadow them.
            exact_patterns = (
                (('xxx',), any_of('start_xxx', 'mid_xxx', 'end_xxx')),
                (('xyzyzyz',), all_of('end_xyxy', 'mid_xyxy')),
                (('xxxyyzz',), all_of('start_xxx', 'end_xxyy')),
                (('xxxyzyz',), all_of('start_xxx', 'end_xyxy')),
                (('xxyyzzz',), all_of('start_xxyy', 'end_xxx')),
                (('xxyyyzz', 'xyzzzxy'), has_flag('mid_xxx')),
                (('xyzxyz',), any_of('axyzxyz', 'xyzaxyz', 'xyzxyza')),
                (('two_digits',), has_flag('two_digits')),
                (('ladder', 'sequence'),
                 any_of('start_sequence', 'mid_sequence', 'end_sequence')),
            )
            for masks, flags_q in exact_patterns:
                if mask in masks:
                    mask_q = flags_q
                    break
            if mask_q is None and re.search(r'^(x|y|z){4,7}$', mask):
                mask_q = any_of('start_xxxx', 'mid_xxxx', 'end_xxxx')

            # Complex mixed patterns: literal digits combined with a symbolic
            # part. The symbolic part maps to a flag, the digits to a regex
            # on the stored number.
            if mask_q is None:
                def start_re(tpl):
                    return '^[0-9]{3}%s[0-9]+$' % tpl.group(1)

                def end_re(tpl):
                    return '^[0-9]+%s$' % tpl.group(1)

                def xyz_a_xyz_re(tpl):
                    return '^[0-9]{6}%s[0-9]+$' % tpl.group(1)

                def abc_xxx_d_re(tpl):
                    return '^[0-9]{3}%s[0-9]+%s$' % (
                        tpl.group(1), tpl.group(2)
                    )

                # (regex on the user mask, flag name, db regex builder);
                # evaluated in order, first match wins.
                complex_patterns = (
                    (r'^(\d*)xyxy$', 'end_xyxy', start_re),       # abc-xy-xy
                    (r'^(\d*)xxyy$', 'end_xxyy', start_re),       # abc-xx-yy
                    (r'^(\d*)xyyx$', 'end_xyyx', start_re),       # abc-xy-yx
                    (r'^xyxy(\d*)$', 'start_xyxy', end_re),       # xy-xy-abc
                    (r'^xxyy(\d*)$', 'start_xxyy', end_re),       # xx-yy-abc
                    (r'^xyyx(\d*)$', 'start_xyyx', end_re),       # xy-yx-abc
                    (r'^xyz(\d)xyz$', 'xyzaxyz', xyz_a_xyz_re),   # xyz-a-xyz
                    (r'^(\d*)xxx$', 'end_xxx', start_re),         # abcd-xxx
                    (r'^xxx(\d*)$', 'start_xxx', end_re),         # xxx-abcd
                    (r'^(\d*)xxx(\d)$', 'mid_xxx', abc_xxx_d_re),  # abc-xxx-d
                )
                for str_regexp, flag_name, db_regexp in complex_patterns:
                    tpl = re.search(str_regexp, mask)
                    if tpl:
                        mask_q = has_flag(flag_name) & Q(**{
                            '%s__regex' % number_field: db_regexp(tpl)
                        })
                        break

            # If still nothing matched, treat the mask as plain digits.
            if mask_q is None:
                if len(position) == 3 or cls.STR_MIDDLE in position:
                    mask_q = Q(**{'%s__contains' % number_field: mask})
                else:
                    if cls.STR_START in position:
                        mask_q = Q(**{'%s__startswith' % number_field: mask})
                    if cls.STR_END in position:
                        ends_q = Q(**{'%s__endswith' % number_field: mask})
                        mask_q = ends_q if mask_q is None else mask_q | ends_q

        return base_q, mask_q
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment