Last active
April 12, 2018 06:01
-
-
Save allenyang79/faf6b3593bd3b280ef44679dc85a2281 to your computer and use it in GitHub Desktop.
duration.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import bisect | |
import json | |
import copy | |
import sys | |
Cut = collections.namedtuple('Cut', ['at', 'status']) | |
class Duration(object): | |
def __init__(self, begin_at, end_at): | |
self.begin_at = begin_at | |
self.end_at = end_at | |
self.items = [ | |
Cut(begin_at, "on") | |
] | |
def apply_mask(self, begin_at, end_at): | |
begin_at = max(self.begin_at, begin_at) | |
end_at = min(self.end_at, end_at) | |
if begin_at >= end_at: | |
return | |
#raise Exception('begin_at: %s can not gather then end_at: %s' % (begin_at, end_at)) | |
self.items.append(Cut(begin_at, "off")) | |
self.items.append(Cut(end_at, "on")) | |
def __iter__(self): | |
def fn(a, b): | |
if a.at < b.at: | |
return -1 | |
elif b.at < a.at: | |
return 1 | |
elif a.at == b.at: | |
if a.status == "on": | |
return -1 | |
else: | |
return 1 | |
items = sorted(self.items, cmp=fn) | |
return self.consume(items) | |
def consume(self, items): | |
retain = 0 | |
a = None | |
b = None | |
for p in items: | |
if p.at >= self.end_at: | |
break | |
if retain == 0: | |
if p.status == "on": | |
a = p.at | |
else: | |
b = p.at | |
retain += 1 | |
else: | |
if p.status == "on": | |
retain -= 1 | |
else: | |
retain += 1 | |
if retain == 0: | |
a = p.at | |
if a and b: | |
if a != b: | |
yield (a,b) | |
a = None | |
b = None | |
if a: | |
yield (a, self.end_at) | |
d = Duration(1000, 2000) | |
d.apply_mask(1200, 1700) | |
d.apply_mask(1400, 1600) | |
d.apply_mask(1800, 1900) | |
print list(d) | |
assert list(d) == [(1000, 1200), (1700, 1800), (1900, 2000)] | |
d = Duration(1000, 2000) | |
d.apply_mask(1000, 1700) | |
d.apply_mask(1800, 1900) | |
print list(d) | |
assert list(d) == [(1700, 1800), (1900, 2000)] | |
d = Duration(1000, 2000) | |
d.apply_mask(1000, 1100) | |
d.apply_mask(1400, 1700) | |
d.apply_mask(1700, 1900) | |
print list(d) | |
assert list(d) == [(1100, 1400), (1900, 2000)] | |
d = Duration(1000, 2000) | |
d.apply_mask(1100, 1100) | |
d.apply_mask(1400, 1700) | |
d.apply_mask(1700, 1900) | |
print list(d) | |
assert list(d) == [(1000, 1400), (1900, 2000)] | |
d = Duration(1000, 2000) | |
d.apply_mask(1100, 1200) | |
d.apply_mask(1400, 1500) | |
d.apply_mask(1600, 1800) | |
print list(d) | |
assert list(d) == [(1000, 1100), (1200, 1400), (1500, 1600), (1800, 2000)] | |
d = Duration(1000, 2000) | |
d.apply_mask(1100, 1200) | |
d.apply_mask(1400, 1500) | |
d.apply_mask(1600, 2400) | |
print list(d) | |
#assert list(d) == [(1000, 1100), (1200, 1400), (1500, 1600)] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bisect | |
import json | |
import collections | |
class Duration(object): | |
def __init__(self, begin_at, end_at): | |
self.begin_at = begin_at | |
self.end_at = end_at | |
self.data = collections.OrderedDict([ | |
(begin_at, True), | |
(end_at, None) | |
]) | |
def __str__(self): | |
return str(self.data) | |
def remove(self, begin_at, end_at): | |
indexes = self.data.keys() | |
if indexes[0] <= begin_at <= indexes[-1] and begin_at not in indexes: | |
i = bisect.bisect_left(indexes,begin_at) | |
indexes.insert(i, begin_at) | |
if indexes[0] <= end_at <= indexes[-1] and end_at not in indexes: | |
i = bisect.bisect_left(indexes, end_at) | |
indexes.insert(i, end_at) | |
ret = collections.OrderedDict() | |
for i, at in enumerate(indexes): | |
if at < begin_at: | |
ret[at] = self.data[at] | |
elif at == begin_at: | |
if ret[indexes[i - 1]]: | |
ret[at] = False | |
elif begin_at < at < end_at: | |
continue | |
elif end_at == at: | |
ret[at] = True | |
elif end_at < at: | |
ret[at] = self.data[at] | |
self.data = ret | |
d = Duration(1000, 2000) | |
d.remove(1200, 1500) | |
d.remove(1400, 1600) | |
d.remove(1700, 1800) | |
d.remove(1800, 2300) | |
print json.dumps(d.data, indent=2, sort_keys=True) | |
#d.remove(1000, 1600) | |
#d.remove(1500, 1800) | |
#d.remove(800, 1300) | |
#d.remove(1500, 1800) | |
print d | |
#print json.dumps(d.data, indent=2, sort_keys=True) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bisect | |
import json | |
import collections | |
import sys | |
duration = [ | |
(1000, True), | |
(1200, False), | |
(1500, True), | |
(1700, False), | |
(2000, True), | |
] | |
mask = [ | |
(1400, False), | |
(1600, True) | |
] | |
def remove(duration, mask): | |
ret = [] | |
while duration or mask: | |
dt = duration[0] if duration else (sys.maxint, None) | |
mt = mask[0] if mask else (sys.maxint, None) | |
if not ret: | |
if dt[0] < mt[0]: | |
ret.append(duration.pop(0)) | |
elif mt[0] <= dt[0]: | |
ret.append(mask.pop(0)) | |
else: | |
last = ret[-1] | |
print "===", dt, mt, last | |
if dt[0] <= mt[0]: | |
t = duration.pop(0) | |
if t[0] < last[0]: | |
pass | |
else: | |
ret.append(t) | |
elif mt[0] < dt[0]: | |
t = mask.pop(0) | |
if t[0] < last[0]: | |
pass | |
else: | |
ret.append(t) | |
print ret#, dt, mt | |
return ret | |
#remove(duration, mask) | |
remove( | |
[ | |
(1000, True), | |
(2000, False) | |
], | |
[ | |
(1200, False), | |
(1400, True) | |
] | |
) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import bisect | |
import json | |
import copy | |
import sys | |
class DurationError(Exception): | |
pass | |
STATUS_ON = "on" | |
STATUS_OFF = "off" | |
STATUS_END = "end" | |
#class Section(object) | |
Section = collections.namedtuple('Section', ['at', 'status']) | |
class Duration(object): | |
def __init__(self, begin_at, end_at): | |
self.begin_at = begin_at | |
self.end_at = end_at | |
self.durations = [ | |
Section(begin_at, STATUS_ON), | |
Section(end_at, STATUS_OFF) | |
] | |
def __str__(self): | |
return str(self.durations) | |
def apply_mask(self, begin_at, end_at): | |
if begin_at > end_at: | |
raise DurationError('end_at: %s muse be gather then begin_at: %s' % (end_at, begin_at)) | |
if begin_at == end_at: | |
return True | |
begin_at = max(self.begin_at, begin_at) | |
end_at = min(self.end_at, end_at) | |
mask = [ | |
Section(begin_at, STATUS_OFF), | |
Section(end_at, STATUS_ON) | |
] | |
durations = copy.deepcopy(self.durations) | |
ret = [] | |
in_mask = False | |
while durations or mask: | |
dh = durations[0] if durations else Section(sys.maxint, STATUS_ON) | |
mh = mask[0] if mask else Section(sys.maxint, STATUS_ON) | |
if dh.at < mh.at: | |
tmp = durations.pop(0) | |
else: # if mh.at <= dh.at: | |
# on mask phase | |
tmp = mask.pop(0) | |
if tmp.status == STATUS_OFF: | |
in_mask = True | |
else: | |
in_mask = False | |
if tmp.at >= self.end_at: | |
break | |
if not ret: | |
ret.append(tmp) | |
continue | |
if in_mask: | |
status = STATUS_OFF | |
else: | |
status = tmp.status | |
if status == ret[-1].status: | |
continue | |
#if status != ret[-1].status: | |
ret.append(Section(tmp.at, tmp.status)) | |
ret.append(Section(self.end_at, STATUS_END)) | |
self.durations = ret | |
def __iter__(self): | |
for i, d in enumerate(self.durations): | |
if d.status == STATUS_ON: | |
yield (d.at, self.durations[i + 1].at) | |
d = Duration(1000, 2000) | |
d.apply_mask(900,1300) | |
d.apply_mask(1800,2700) | |
assert d.durations == [ | |
Section(1000, STATUS_OFF), | |
Section(1300, STATUS_ON), | |
Section(1800, STATUS_OFF), | |
Section(2000, STATUS_END) | |
] | |
# | |
# | |
d = Duration(1000, 2000) | |
d.apply_mask(100,1300) | |
d.apply_mask(1400,1700) | |
assert d.durations == [ | |
Section(1000, STATUS_OFF), | |
Section(1300, STATUS_ON), | |
Section(1400, STATUS_OFF), | |
Section(1700, STATUS_ON), | |
Section(2000, STATUS_END) | |
] | |
print "====================" | |
d = Duration(1000, 2000) | |
d.apply_mask(1200,1600) | |
d.apply_mask(1400,1700) | |
d.apply_mask(1900,2100) | |
for row in d: | |
print row | |
assert d.durations == [ | |
Section(1000, STATUS_ON), | |
Section(1200, STATUS_OFF), | |
Section(1700, STATUS_ON), | |
Section(1900, STATUS_OFF), | |
Section(2000, STATUS_END) | |
] | |
print "====================" | |
d = Duration(1000, 2000) | |
d.apply_mask(1200,1400) | |
d.apply_mask(1600,1800) | |
assert d.durations == [ | |
Section(1000, STATUS_ON), | |
Section(1200, STATUS_OFF), | |
Section(1400, STATUS_ON), | |
Section(1600, STATUS_OFF), | |
Section(1800, STATUS_ON), | |
Section(2000, STATUS_END) | |
] | |
for row in d: | |
print row | |
d = Duration(1000, 2000) | |
d.apply_mask(1000,2000) | |
assert d.durations == [Section(1000, STATUS_OFF), Section(2000, STATUS_END)] | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment