This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
import re | |
import sys | |
# Base URL of the SignalFx web application (ends with the '#/' fragment prefix).
SIGNALFX_API = 'https://app.signalfx.com/#/'

# Matches a whole line that starts with non-whitespace text, contains a dot and
# ends with ':'.  The named group ``type`` captures the leading run of
# non-whitespace characters up to (not including) the first dot.
RES_TYPE_RE = re.compile(r"^(?P<type>\S+?)\..*:$")
# STATES |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from md5 import md5 | |
class FakeHashRing(object):
    """Map keys onto index positions of a fixed list of nodes using MD5."""

    def __init__(self, nodes):
        """Keep a private list copy of ``nodes`` (any iterable)."""
        self.nodes = list(nodes)

    def _hash(self, key):
        """Return an index into ``self.nodes`` derived from ``key``.

        The key is converted with ``str()``, UTF-8 encoded, hashed with MD5,
        and the digest (read as a base-16 integer) is reduced modulo the
        number of nodes.

        Raises:
            ZeroDivisionError: if the ring was built with no nodes.
        """
        # hashlib is imported locally so the class does not depend on the
        # standalone ``md5`` module, which Python 3 no longer provides.
        import hashlib

        digest = hashlib.md5(str(key).encode('utf-8')).hexdigest()
        return int(digest, 16) % len(self.nodes)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from job_class import Job | |
def delete_expired_stuff(time):
    """Print a message announcing deletion of everything older than ``time``.

    Only the message is emitted here; no deletion is performed by this
    function itself.
    """
    # A single pre-formatted argument prints identically under Python 2 and 3.
    print("Delete expired stuff before %s" % time)
class CleanupJob(Job): | |
# configuration: run at 5am | |
run_at = '05:00' | |
# implementation: nuke expired sessions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class storelast(object):
    """Iterator wrapper that remembers the most recently produced item.

    After each successful step the item is also available as ``self.last``.
    The ``last`` attribute does not exist until the first item has been
    produced.
    """

    def __init__(self, source):
        """Wrap ``source``, which must already be an iterator."""
        self.source = source

    def __iter__(self):
        """Return ``self`` so the wrapper can be used directly in a for loop."""
        return self

    def __next__(self):
        """Advance the wrapped iterator, record the item in ``last``, return it.

        StopIteration from the wrapped iterator propagates unchanged and
        leaves ``last`` untouched.
        """
        # The builtin next() works with both the Python 2 and 3 protocols.
        item = next(self.source)
        self.last = item
        return item

    # Python 2 spelling of the iterator protocol, kept for existing callers.
    next = __next__
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def trace(source):
    """Yield every item of ``source`` unchanged, printing each one first.

    This is a generator, so nothing is printed until the result is iterated.
    """
    for item in source:
        print(item)
        yield item
# Lazily select the records of `log` whose 'status' equals 404 and route them
# through trace(); `log` must be defined before this line runs.
r404 = trace(r for r in log if r['status'] == 404)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Print up to the first eleven lines (indices 0-10) of the access log, then
# shut the generator down explicitly.
with open("run/foo/access-log") as logfile:
    # The original "(for line in ...)" had no element expression and did not parse.
    lines = (line for line in logfile)
    # PYTHON 3 ONLY!!!  (original author's note)
    for i, line in enumerate(lines):
        print(line)
        if i == 10:
            # Closing the generator makes the enumerate() loop end on its next step.
            lines.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
def follow(thefile, poll_interval=0.1):
    """Yield lines appended to ``thefile``, polling forever (like ``tail -f``).

    Args:
        thefile: An object providing ``seek(offset, whence)`` and
            ``readline()``.
        poll_interval: Seconds to sleep after an empty read before trying
            again.  Defaults to 0.1.

    Yields:
        Each non-empty result of ``thefile.readline()``.

    The generator never finishes on its own; the consumer decides when to
    stop iterating.
    """
    # whence=2: jump to the end so only data added from now on is reported.
    thefile.seek(0, 2)
    while True:
        line = thefile.readline()
        if not line:
            # Nothing new yet: wait a little instead of spinning.
            time.sleep(poll_interval)
            continue
        yield line
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Add up the last whitespace-separated field of every line in "access-log"
# (presumably a byte count, going by the variable name) and print the sum.
total = 0
with open("access-log") as wwwlog:
    for line in wwwlog:
        fields = line.rsplit(None, 1)
        if len(fields) < 2:
            # Blank or single-field line: there is no trailing field to add.
            continue
        bytestr = fields[1]
        # '-' marks a missing value and contributes nothing.
        if bytestr != '-':
            total += int(bytestr)
# One pre-formatted argument prints "Total <n>" under both Python 2 and 3.
print("Total %s" % total)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# raise exception: a Mock whose side_effect is an exception instance raises
# that exception when the mock is called
mock = Mock(side_effect=KeyError('foo'))
mock()
# KeyError: 'foo'
# return value based on argument
values = {'a': 1, 'b': 2, 'c': 3}


def side_effect(arg):
    """Return the entry of ``values`` stored under ``arg``.

    Raises:
        KeyError: if ``arg`` is not a key of ``values``.
    """
    return values[arg]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Maps a test id to an (input line, expected decode result) pair.
test_params = {
    'empty_line': ('', {}),
    'get_ok': ('GET 200', {'request': 'GET', 'status': '200'}),
    'get_not_found': ('GET 404', {'request': 'GET', 'status': '404'}),
}
# list() hands pytest concrete sequences instead of dict views; values() and
# keys() of the same unmodified dict iterate in matching order, so each id
# lines up with its case.
# NOTE(review): the `self` parameter suggests this test belongs inside a test
# class whose header is not visible in this excerpt - confirm.
@pytest.mark.parametrize(
    'line,expected',
    list(test_params.values()),
    ids=list(test_params.keys()),
)
def test_decode(self, line, expected):
    """Check that ``Decoder().decode(line)`` equals ``expected`` for each case."""
    assert Decoder().decode(line) == expected
NewerOlder