Skip to content

Instantly share code, notes, and snippets.

@allenyang79
Last active April 12, 2018 06:01
Show Gist options
  • Save allenyang79/faf6b3593bd3b280ef44679dc85a2281 to your computer and use it in GitHub Desktop.
Save allenyang79/faf6b3593bd3b280ef44679dc85a2281 to your computer and use it in GitHub Desktop.
duration.py
import collections
import bisect
import json
import copy
import sys
Cut = collections.namedtuple('Cut', ['at', 'status'])
class Duration(object):
def __init__(self, begin_at, end_at):
self.begin_at = begin_at
self.end_at = end_at
self.items = [
Cut(begin_at, "on")
]
def apply_mask(self, begin_at, end_at):
begin_at = max(self.begin_at, begin_at)
end_at = min(self.end_at, end_at)
if begin_at >= end_at:
return
#raise Exception('begin_at: %s can not gather then end_at: %s' % (begin_at, end_at))
self.items.append(Cut(begin_at, "off"))
self.items.append(Cut(end_at, "on"))
def __iter__(self):
def fn(a, b):
if a.at < b.at:
return -1
elif b.at < a.at:
return 1
elif a.at == b.at:
if a.status == "on":
return -1
else:
return 1
items = sorted(self.items, cmp=fn)
return self.consume(items)
def consume(self, items):
retain = 0
a = None
b = None
for p in items:
if p.at >= self.end_at:
break
if retain == 0:
if p.status == "on":
a = p.at
else:
b = p.at
retain += 1
else:
if p.status == "on":
retain -= 1
else:
retain += 1
if retain == 0:
a = p.at
if a and b:
if a != b:
yield (a,b)
a = None
b = None
if a:
yield (a, self.end_at)
d = Duration(1000, 2000)
d.apply_mask(1200, 1700)
d.apply_mask(1400, 1600)
d.apply_mask(1800, 1900)
print list(d)
assert list(d) == [(1000, 1200), (1700, 1800), (1900, 2000)]
d = Duration(1000, 2000)
d.apply_mask(1000, 1700)
d.apply_mask(1800, 1900)
print list(d)
assert list(d) == [(1700, 1800), (1900, 2000)]
d = Duration(1000, 2000)
d.apply_mask(1000, 1100)
d.apply_mask(1400, 1700)
d.apply_mask(1700, 1900)
print list(d)
assert list(d) == [(1100, 1400), (1900, 2000)]
d = Duration(1000, 2000)
d.apply_mask(1100, 1100)
d.apply_mask(1400, 1700)
d.apply_mask(1700, 1900)
print list(d)
assert list(d) == [(1000, 1400), (1900, 2000)]
d = Duration(1000, 2000)
d.apply_mask(1100, 1200)
d.apply_mask(1400, 1500)
d.apply_mask(1600, 1800)
print list(d)
assert list(d) == [(1000, 1100), (1200, 1400), (1500, 1600), (1800, 2000)]
d = Duration(1000, 2000)
d.apply_mask(1100, 1200)
d.apply_mask(1400, 1500)
d.apply_mask(1600, 2400)
print list(d)
#assert list(d) == [(1000, 1100), (1200, 1400), (1500, 1600)]
import bisect
import json
import collections
class Duration(object):
def __init__(self, begin_at, end_at):
self.begin_at = begin_at
self.end_at = end_at
self.data = collections.OrderedDict([
(begin_at, True),
(end_at, None)
])
def __str__(self):
return str(self.data)
def remove(self, begin_at, end_at):
indexes = self.data.keys()
if indexes[0] <= begin_at <= indexes[-1] and begin_at not in indexes:
i = bisect.bisect_left(indexes,begin_at)
indexes.insert(i, begin_at)
if indexes[0] <= end_at <= indexes[-1] and end_at not in indexes:
i = bisect.bisect_left(indexes, end_at)
indexes.insert(i, end_at)
ret = collections.OrderedDict()
for i, at in enumerate(indexes):
if at < begin_at:
ret[at] = self.data[at]
elif at == begin_at:
if ret[indexes[i - 1]]:
ret[at] = False
elif begin_at < at < end_at:
continue
elif end_at == at:
ret[at] = True
elif end_at < at:
ret[at] = self.data[at]
self.data = ret
d = Duration(1000, 2000)
d.remove(1200, 1500)
d.remove(1400, 1600)
d.remove(1700, 1800)
d.remove(1800, 2300)
print json.dumps(d.data, indent=2, sort_keys=True)
#d.remove(1000, 1600)
#d.remove(1500, 1800)
#d.remove(800, 1300)
#d.remove(1500, 1800)
print d
#print json.dumps(d.data, indent=2, sort_keys=True)
import bisect
import json
import collections
import sys
duration = [
(1000, True),
(1200, False),
(1500, True),
(1700, False),
(2000, True),
]
mask = [
(1400, False),
(1600, True)
]
def remove(duration, mask):
ret = []
while duration or mask:
dt = duration[0] if duration else (sys.maxint, None)
mt = mask[0] if mask else (sys.maxint, None)
if not ret:
if dt[0] < mt[0]:
ret.append(duration.pop(0))
elif mt[0] <= dt[0]:
ret.append(mask.pop(0))
else:
last = ret[-1]
print "===", dt, mt, last
if dt[0] <= mt[0]:
t = duration.pop(0)
if t[0] < last[0]:
pass
else:
ret.append(t)
elif mt[0] < dt[0]:
t = mask.pop(0)
if t[0] < last[0]:
pass
else:
ret.append(t)
print ret#, dt, mt
return ret
#remove(duration, mask)
remove(
[
(1000, True),
(2000, False)
],
[
(1200, False),
(1400, True)
]
)
import collections
import bisect
import json
import copy
import sys
class DurationError(Exception):
pass
STATUS_ON = "on"
STATUS_OFF = "off"
STATUS_END = "end"
#class Section(object)
Section = collections.namedtuple('Section', ['at', 'status'])
class Duration(object):
def __init__(self, begin_at, end_at):
self.begin_at = begin_at
self.end_at = end_at
self.durations = [
Section(begin_at, STATUS_ON),
Section(end_at, STATUS_OFF)
]
def __str__(self):
return str(self.durations)
def apply_mask(self, begin_at, end_at):
if begin_at > end_at:
raise DurationError('end_at: %s muse be gather then begin_at: %s' % (end_at, begin_at))
if begin_at == end_at:
return True
begin_at = max(self.begin_at, begin_at)
end_at = min(self.end_at, end_at)
mask = [
Section(begin_at, STATUS_OFF),
Section(end_at, STATUS_ON)
]
durations = copy.deepcopy(self.durations)
ret = []
in_mask = False
while durations or mask:
dh = durations[0] if durations else Section(sys.maxint, STATUS_ON)
mh = mask[0] if mask else Section(sys.maxint, STATUS_ON)
if dh.at < mh.at:
tmp = durations.pop(0)
else: # if mh.at <= dh.at:
# on mask phase
tmp = mask.pop(0)
if tmp.status == STATUS_OFF:
in_mask = True
else:
in_mask = False
if tmp.at >= self.end_at:
break
if not ret:
ret.append(tmp)
continue
if in_mask:
status = STATUS_OFF
else:
status = tmp.status
if status == ret[-1].status:
continue
#if status != ret[-1].status:
ret.append(Section(tmp.at, tmp.status))
ret.append(Section(self.end_at, STATUS_END))
self.durations = ret
def __iter__(self):
for i, d in enumerate(self.durations):
if d.status == STATUS_ON:
yield (d.at, self.durations[i + 1].at)
d = Duration(1000, 2000)
d.apply_mask(900,1300)
d.apply_mask(1800,2700)
assert d.durations == [
Section(1000, STATUS_OFF),
Section(1300, STATUS_ON),
Section(1800, STATUS_OFF),
Section(2000, STATUS_END)
]
#
#
d = Duration(1000, 2000)
d.apply_mask(100,1300)
d.apply_mask(1400,1700)
assert d.durations == [
Section(1000, STATUS_OFF),
Section(1300, STATUS_ON),
Section(1400, STATUS_OFF),
Section(1700, STATUS_ON),
Section(2000, STATUS_END)
]
print "===================="
d = Duration(1000, 2000)
d.apply_mask(1200,1600)
d.apply_mask(1400,1700)
d.apply_mask(1900,2100)
for row in d:
print row
assert d.durations == [
Section(1000, STATUS_ON),
Section(1200, STATUS_OFF),
Section(1700, STATUS_ON),
Section(1900, STATUS_OFF),
Section(2000, STATUS_END)
]
print "===================="
d = Duration(1000, 2000)
d.apply_mask(1200,1400)
d.apply_mask(1600,1800)
assert d.durations == [
Section(1000, STATUS_ON),
Section(1200, STATUS_OFF),
Section(1400, STATUS_ON),
Section(1600, STATUS_OFF),
Section(1800, STATUS_ON),
Section(2000, STATUS_END)
]
for row in d:
print row
d = Duration(1000, 2000)
d.apply_mask(1000,2000)
assert d.durations == [Section(1000, STATUS_OFF), Section(2000, STATUS_END)]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment