#!/Users/dking/src/pypy/pypy/translator/goal/pypy-c
#!/usr/bin/env python2.7

import sys
import math
import bisect
import sqlite3
import os.path
import itertools
from datetime import datetime
from hashlib import md5
from collections import namedtuple, defaultdict

# note: many of the following library functions are ripped right out of reddit's
# source code https://github.com/reddit/reddit/

def in_chunks(it, size=25):
    chunk = []
    it = iter(it)
    try:
        while True:
            chunk.append(it.next())
            if len(chunk) >= size:
                yield chunk
                chunk = []
    except StopIteration:
        if chunk:
            yield chunk

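# a quick sanity check of in_chunks (illustrative, not part of the original):
#   list(in_chunks(xrange(5), size=2)) == [[0, 1], [2, 3], [4]]
# note that the trailing partial chunk is still yielded
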
def to_base(q, alphabet):
    if q < 0:
        raise ValueError("must supply a positive integer")
    l = len(alphabet)
    converted = []
    while q != 0:
        q, r = divmod(q, l)
        converted.insert(0, alphabet[r])
    return "".join(converted) or '0'

def to36(q):
    return to_base(q, '0123456789abcdefghijklmnopqrstuvwxyz')

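# e.g. to36(1000) == 'rs' -- the same base-36 encoding reddit uses for its
# thing ids (1000 = 27*36 + 28, and alphabet[27] == 'r', alphabet[28] == 's')
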
class emitter(object):
    def __init__(self, f):
        self.fname = f
        self.data = []

    def emit(self, *s):
        self.data.append(s)

    def close(self):
        with open(self.fname, 'w') as f:
            for d in self.data:
                f.write(','.join(map(str, d)))
                f.write('\n')

class emit(object):
    def __init__(self, fname):
        self.fname = fname

    def __enter__(self):
        self.emitter = emitter(self.fname)
        return self.emitter

    def __exit__(self, type, value, traceback):
        self.emitter.close()
        del self.emitter

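# usage sketch: rows are buffered in memory and only written out, one CSV
# line per emit() call, when the `with` block exits. e.g.
#   with emit('out.csv') as e:
#       e.emit('a', 1)
#       e.emit('b', 2)
# leaves out.csv containing "a,1\nb,2\n"
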
def hashdir(prefix, fname, postfix, l=2):
    """Bucket files into subdirectories named by the first `l` hex digits
    of the md5 of the filename, so no single directory gets too big."""
    fname = str(fname)
    h = md5(fname).hexdigest()
    dname = os.path.join(prefix, h[:l])
    name = os.path.join(dname, fname+postfix)
    if not os.path.isdir(dname):
        os.makedirs(dname)
    return name

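# e.g. md5('12345').hexdigest() starts with '82', so
#   hashdir('data/scorebytime', 12345, '.csv') == 'data/scorebytime/82/12345.csv'
# (creating data/scorebytime/82/ as a side effect if it doesn't exist)
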
def pow2(score):
    if score <= 0:
        return -1
    elif score < 2:
        return 0
    else:
        return 2 ** int(math.log(score, 2))

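# histogram-bucket examples (illustrative): pow2(0) == -1, pow2(1) == 0,
# pow2(5) == 4, pow2(100) == 64 -- i.e. non-positive scores get the -1
# bucket and everything else rounds down to a power of two
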
# the epoch that epoch_seconds() measures from; the original script never
# defined this (in reddit's source it's a global), so a plain UTC epoch is
# assumed here
epoch = datetime(1970, 1, 1)

def epoch_seconds(date):
    """Returns the number of seconds from the epoch to date. Should
    match the number returned by the equivalent function in
    postgres."""
    td = date - epoch
    return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000)

def score(ups, downs):
    return ups - downs

def hot(ups, downs, date):
    return _hot(ups, downs, epoch_seconds(date))

def _hot(ups, downs, date):
    """The hot formula. Should match the equivalent function in postgres."""
    s = score(ups, downs)
    order = math.log10(max(abs(s), 1))
    if s > 0:
        sign = 1
    elif s < 0:
        sign = -1
    else:
        sign = 0
    seconds = date - 1134028003
    # use float division: `date` arrives here as an integer unix timestamp,
    # and python 2's integer division would otherwise quantize the age term
    # into 45000-second buckets
    return round(order + sign * seconds / 45000.0, 7)

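# worked example (values chosen to come out round): a link with 10 ups and
# 2 downs, scored exactly 45000 seconds after the 1134028003 anchor:
#   _hot(10, 2, 1134028003 + 45000) == round(math.log10(8) + 1.0, 7) == 1.90309
# so every 45000 seconds of age is worth one power of ten of net score
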
def progress(it, verbosity=100, key=repr, estimate=None, persec=True):
    """An iterator that yields everything from `it`, but prints progress
    information along the way, including time-estimates if
    possible"""
    # stolen wholesale from reddit's utils.py
    from itertools import islice
    from datetime import datetime
    from time import time
    import sys

    now = start = datetime.now()
    elapsed = start - start

    # try to guess at the estimate if we can
    if estimate is None:
        try:
            estimate = len(it)
        except TypeError:
            # not a sized container; we just won't print an ETA
            pass

    def timedelta_to_seconds(td):
        return td.days * (24*60*60) + td.seconds + (float(td.microseconds) / 1000000)

    def format_timedelta(td, sep=''):
        ret = []
        s = timedelta_to_seconds(td)
        if s < 0:
            neg = True
            s *= -1
        else:
            neg = False

        if s >= (24*60*60):
            days = int(s//(24*60*60))
            ret.append('%dd' % days)
            s -= days*(24*60*60)
        if s >= 60*60:
            hours = int(s//(60*60))
            ret.append('%dh' % hours)
            s -= hours*(60*60)
        if s >= 60:
            minutes = int(s//60)
            ret.append('%dm' % minutes)
            s -= minutes*60
        if s >= 1:
            seconds = int(s)
            ret.append('%ds' % seconds)
            s -= seconds

        if not ret:
            return '0s'
        return ('-' if neg else '') + sep.join(ret)

    def format_datetime(dt, show_date=False):
        if show_date:
            return dt.strftime('%Y-%m-%d %H:%M')
        else:
            return dt.strftime('%H:%M:%S')

    def deq(dt1, dt2):
        "Indicates whether the two datetimes' dates describe the same (day, month, year)"
        d1, d2 = dt1.date(), dt2.date()
        return (d1.day == d2.day
                and d1.month == d2.month
                and d1.year == d2.year)

    sys.stderr.write('Starting at %s\n' % (start,))

    # we're going to islice it so we need to start an iterator
    it = iter(it)

    seen = 0
    while True:
        this_chunk = 0
        thischunk_started = time()

        # the simple bit: just iterate and yield
        while (this_chunk < verbosity
               # don't print status reports more than once per second (this
               # makes picking a verbosity that's too low a little less dicey)
               or time() - thischunk_started < 1
               ):
            try:
                item = it.next()
                this_chunk += 1
                seen += 1
                yield item
            except StopIteration:
                break

        if this_chunk < verbosity:
            # we're done, the iterator is empty
            break

        now = datetime.now()
        elapsed = now - start
        thischunk_seconds = time() - thischunk_started

        if estimate:
            # the estimate is based on the total number of items that we've
            # processed in the total amount of time that's passed, so it should
            # smooth over momentary spikes in speed (but will take a while to
            # adjust to long-term changes in speed). TODO: we could take the
            # mean of the estimate based on all time and the estimate based on
            # the most recent N periods?
            remaining = ((elapsed/seen)*estimate)-elapsed
            completion = now + remaining
            count_str = ('%d/%d %.2f%%'
                         % (seen, estimate, float(seen)/estimate*100))
            completion_str = format_datetime(completion, not deq(completion, now))
            estimate_str = (' (%s remaining; completion %s)'
                            % (format_timedelta(remaining),
                               completion_str))
        else:
            count_str = '%d' % seen
            estimate_str = ''

        if key:
            key_str = ': %s' % key(item)
        else:
            key_str = ''

        # unlike the estimate, the persec count is the number per
        # second for *this* batch only, without smoothing
        if persec and thischunk_seconds > 0:
            persec_str = ' (%.1f/s)' % (float(this_chunk)/thischunk_seconds,)
        else:
            persec_str = ''

        sys.stderr.write('%s%s, %s%s%s\n'
                         % (count_str, persec_str,
                            format_timedelta(elapsed), estimate_str, key_str))

    now = datetime.now()
    elapsed = now - start
    elapsed_seconds = timedelta_to_seconds(elapsed)
    if persec and seen > 0 and elapsed_seconds > 0:
        persec_str = ' (@%.1f/sec)' % (float(seen)/elapsed_seconds)
    else:
        persec_str = ''
    sys.stderr.write('Processed %d%s items in %s..%s (%s)\n'
                     % (seen,
                        persec_str,
                        format_datetime(start, not deq(start, now)),
                        format_datetime(now, not deq(start, now)),
                        format_timedelta(elapsed)))

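# usage sketch -- wrap any iterable and consume it as usual; status lines go
# to stderr. The sample output below is illustrative, not from a real run:
#   for row in progress(cursor, 10000, estimate=numvotes, key=lambda x: x[0]):
#       ...
# prints lines shaped roughly like
#   10000/5362884 0.19% (8241.3/s), 1s (10m49s remaining; completion 14:31): 42
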
# Votes: 6064280
# Links: 228664
# Max v/l: 20857
# Avg v/l: 27

def _l(l):
    return l if isinstance(l, list) else list(l)

class Link(object):
    __slots__ = ['linkid', 'ups', 'downs', 'timestamp']

    def __init__(self, linkid, ups, downs, timestamp):
        self.linkid = linkid
        self.ups = ups
        self.downs = downs
        self.timestamp = timestamp

    @property
    def score(self):
        return self.ups - self.downs

    @property
    def hot(self):
        return _hot(self.ups, self.downs, self.timestamp)

    def copy(self):
        return Link(self.linkid, self.ups, self.downs, self.timestamp)

def report(slname):
    sl = sqlite3.connect(slname)

    # I just know how many of these there are
    numvotes = 5362884

    print 'Finding min/max of votes'
    minvote, maxvote = sl.execute("select min(timestamp), max(timestamp) from votes").fetchone()
    print 'Min: %s, Max: %s (%.2fs)' % (minvote, maxvote, maxvote-minvote)

    verbosity = 10000

    scorehist = {}
    numvoteshist = {}
    timeofdayhist = {}
    dirhist = {0: 0, 1: 0, -1: 0}

    print 'Preparing DB cursor for histogram gathering'
    # the dump was filtered by vote time, so we probably have votes for old
    # links in the DB that this query will ignore
    c = sl.execute("""select v.link,
                             v.direction,
                             v.timestamp,
                             l.timestamp
                        from votes v,
                             links l
                       where v.link = l.link_id
                         and l.timestamp > ?
                         and l.timestamp < ?
                    order by v.link, v.timestamp""",
                   (minvote, maxvote))
    c = progress(c, verbosity, estimate=numvotes, key=lambda x: x[0])

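    # note: itertools.groupby only groups *adjacent* rows with equal keys, so
    # the `order by v.link` above is what makes exactly one group per link;
    # e.g. [k for k, g in itertools.groupby('aabba')] is ['a', 'b', 'a']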
    for l, vs in itertools.groupby(c, key=lambda x: x[0]):
        # since we had the DB do the sorting for us, these are already
        # sorted by timestamp

        # we'll need to iterate this more than once
        # vs = _l(vs)

        byhour = {}
        link = Link(l, 0, 0, 0)

        # this loop also sets the timestamp and score
        with emit(hashdir('data/scorebytime', l, '.csv')) as sbt, \
             emit(hashdir('data/votesbyhour', l, '.csv')) as vbh:
            for linkid, direction, vtimestamp, ltimestamp in vs:
                if link.timestamp == 0:
                    link.timestamp = ltimestamp
                if direction == 1:
                    link.ups += 1
                elif direction == -1:
                    link.downs += 1

                sbt.emit(vtimestamp, link.score)

                dt = datetime.utcfromtimestamp(vtimestamp)
                timeofdayhist[dt.hour] = timeofdayhist.get(dt.hour, 0) + 1

                dirhist[direction] += 1

                h = vtimestamp/60/60
                byhour[h] = byhour.get(h, 0) + 1

            # renamed the loop variable from `c` so it doesn't shadow the cursor
            for h, count in sorted(byhour.iteritems()):
                vbh.emit(h, count)

        sp = pow2(link.score)
        scorehist[sp] = scorehist.get(sp, 0) + 1

        if link.ups + link.downs > 0:
            # otherwise all votes on this link were non-votes
            nvp = pow2(link.ups+link.downs)
            numvoteshist[nvp] = numvoteshist.get(nvp, 0) + 1

    with emit('data/scorehist.csv') as e:
        for p, count in sorted(scorehist.iteritems()):
            e.emit(p, count)
    with emit('data/numvoteshist.csv') as e:
        for p, count in sorted(numvoteshist.iteritems()):
            e.emit(p, count)
    with emit('data/timeofdayhist.csv') as e:
        for p, count in sorted(timeofdayhist.iteritems()):
            e.emit(p, count)
    with emit('data/dirhist.csv') as e:
        for p, count in sorted(dirhist.iteritems()):
            e.emit(p, count)

    links = {}
    snapshotchunksize = 10*1000
    snapshotlength = 200

    # this has to be a whole second pass because we need it to be ordered by
    # time instead of by link id
    print 'Preparing DB cursor for chunked replay'
    c = sl.execute("""select v.link,
                             v.direction,
                             v.timestamp,
                             l.timestamp
                        from votes v,
                             links l
                       where v.link = l.link_id
                         and l.timestamp > ?
                         and l.timestamp < ?
                    order by v.timestamp""",
                   (minvote, maxvote))
    c = progress(c, verbosity, estimate=numvotes, key=lambda x: x[0])

    with emit('data/snapshots.csv') as e:
        for chunk in in_chunks(c, snapshotchunksize):
            for link, direction, vtimestamp, ltimestamp in chunk:
                try:
                    l = links[link]
                except KeyError:
                    l = links[link] = Link(link, 0, 0, ltimestamp)
                if direction == -1:
                    l.downs += 1
                elif direction == 1:
                    l.ups += 1

            # after replaying each chunk of votes, snapshot the top of the
            # "hot" ranking as of the last vote seen
            topfew = sorted(links.itervalues(), key=lambda x: x.hot, reverse=True)
            topfew = topfew[:snapshotlength]
            for n, link in enumerate(topfew):
                e.emit(vtimestamp, link.linkid, link.hot, n)

if __name__ == '__main__':
    report(sys.argv[1])