Skip to content

Instantly share code, notes, and snippets.

@emmettbutler
Last active March 27, 2019 21:34
Show Gist options
  • Save emmettbutler/2c7e1a5bc63ef38dfc70639a491c886a to your computer and use it in GitHub Desktop.
Save emmettbutler/2c7e1a5bc63ef38dfc70639a491c886a to your computer and use it in GitHub Desktop.
import datetime as dt
import random
from collections import namedtuple
# One heartbeat record. ``nginx_ts`` holds the heartbeat's datetime and
# ``inc`` its numeric increment (presumably engaged seconds reported for
# that heartbeat -- TODO confirm against the real data source).
Heartbeat = namedtuple("Heartbeat", ["nginx_ts", "inc"])
# Pageview durations, in seconds, that each test run simulates.
# It might be useful to include the actual average pageview length
# or to set these to quartiles from actual data.
EXAMPLE_PAGEVIEW_LENGTHS = (5, 20, 120, 600, 1200)
# How many randomized runs of each backoff strategy are averaged per
# pageview length.
ITERATIONS = 1000
class BackoffStrategy:
    """Baseline heartbeat schedule: a constant 10.5 second interval.

    Subclasses override ``get_interval`` to change how the gap between
    heartbeats evolves; ``adjust_heartbeats_for_backoff`` replays a pageview
    using whatever schedule ``get_interval`` produces.
    """

    def get_interval(self, total_time):
        """Return the gap to wait before the next heartbeat.

        Args:
            total_time: ``datetime.timedelta`` accumulated so far by the
                caller. The baseline schedule ignores it.

        Returns:
            A constant ``datetime.timedelta`` of 10.5 seconds.
        """
        return dt.timedelta(seconds=10.5)

    @staticmethod
    def average_engagement_per_heartbeat(heartbeats):
        """Return the mean of ``inc / gap_seconds`` over consecutive pairs.

        For every heartbeat after the first, its ``inc`` is divided by the
        number of seconds since the previous heartbeat; the result is the
        average of those ratios.

        Args:
            heartbeats: Sequence of objects exposing ``nginx_ts`` (datetime)
                and ``inc`` (number), in the order they should be paired.

        Returns:
            The average ratio, or 0 when no pair with a non-zero gap exists
            (fewer than two heartbeats, or all timestamps identical).
        """
        ratios = []
        for prev, current in zip(heartbeats, heartbeats[1:]):
            delta = (current.nginx_ts - prev.nginx_ts).total_seconds()
            if delta == 0:
                # Two heartbeats with the same timestamp have no elapsed
                # time to spread ``inc`` over; skip instead of dividing by 0.
                continue
            ratios.append(current.inc / delta)
        return sum(ratios) / len(ratios) if ratios else 0

    def adjust_heartbeats_for_backoff(self, heartbeats, pv_length):
        """Re-generate a pageview's heartbeats using this strategy's schedule.

        The observed heartbeats are only used for their earliest timestamp
        and their average engagement ratio; a new series is then emitted
        starting at that timestamp, spaced by ``get_interval``.

        Args:
            heartbeats: Iterable of ``Heartbeat`` records (any order).
            pv_length: ``datetime.timedelta`` length of the pageview.

        Returns:
            A list of new ``Heartbeat`` records, or an empty list when
            ``heartbeats`` is empty.
        """
        heartbeats = sorted(heartbeats, key=lambda h: h.nginx_ts)
        if not heartbeats:
            # Nothing to anchor the new series on.
            return []
        avg_engagement_pct = self.average_engagement_per_heartbeat(heartbeats)
        total_time = dt.timedelta()
        iteration_ts = heartbeats[0].nginx_ts
        # The last allowed send time is one initial interval short of the
        # full pageview length, measured from the first heartbeat.
        end_time = (
            iteration_ts - self.get_interval(dt.timedelta(seconds=0))
        ) + pv_length
        new_heartbeats = []
        while iteration_ts <= end_time:
            interval = self.get_interval(total_time)
            inc = interval.total_seconds() * avg_engagement_pct
            new_heartbeats.append(
                Heartbeat(nginx_ts=iteration_ts, inc=int(round(inc)))
            )
            # ``total_time`` tracks the unrounded engagement accumulated so
            # far (not wall-clock time); it is what drives ``get_interval``.
            total_time += dt.timedelta(seconds=inc)
            iteration_ts += interval
        return new_heartbeats
class LongerIntervalstrategy(BackoffStrategy):
    """Constant schedule with a 21 second gap between heartbeats."""

    def get_interval(self, total_time):
        """Return a fixed 21 second ``timedelta``; ``total_time`` is unused."""
        fixed_gap = dt.timedelta(seconds=21)
        return fixed_gap
class DoublingBackoffStrategy(BackoffStrategy):
    """Schedule that doubles the heartbeat gap once a threshold is passed.

    While the accumulated time is at or below ``threshold`` (60 seconds) the
    gap stays at 10.5 seconds; every call after that doubles
    ``current_interval`` and returns the new value.
    """

    def __init__(self):
        super(DoublingBackoffStrategy, self).__init__()
        # Seed the doubling sequence with the parent's interval at time zero.
        starting_gap = super(DoublingBackoffStrategy, self).get_interval(
            dt.timedelta(seconds=0)
        )
        self.threshold = dt.timedelta(seconds=60)
        self.current_interval = starting_gap

    def get_interval(self, total_time):
        """Return the next gap, doubling it on every call past the threshold.

        Note that this mutates ``self.current_interval`` on each call made
        with ``total_time`` greater than ``self.threshold``.
        """
        if total_time > self.threshold:
            self.current_interval = self.current_interval * 2
            return self.current_interval
        return dt.timedelta(seconds=10.5)
class WaitLongerDoublingBackoffStrategy(DoublingBackoffStrategy):
    """Doubling schedule whose threshold is raised to 120 seconds."""

    def __init__(self):
        super(WaitLongerDoublingBackoffStrategy, self).__init__()
        # Replace the parent's threshold with two minutes.
        self.threshold = dt.timedelta(minutes=2)
def build_example_heartbeats(pv_length):
    """Build a randomized heartbeat series spanning ``pv_length``.

    The series starts at a random moment in January 2019 and emits one
    heartbeat per baseline ``BackoffStrategy`` interval until the start time
    plus ``pv_length`` is exceeded. Each heartbeat's ``inc`` is a random
    integer between 1 and the interval length truncated to whole seconds.

    Args:
        pv_length: ``datetime.timedelta`` length of the simulated pageview.

    Returns:
        A list of ``Heartbeat`` records in chronological order.
    """
    # Draw order (day, hour, minute, second) is kept so seeded runs match.
    day = random.randint(1, 28)
    hour = random.randint(1, 23)
    minute = random.randint(1, 59)
    second = random.randint(1, 59)
    cursor = dt.datetime(2019, 1, day, hour, minute, second)

    step = BackoffStrategy().get_interval(dt.timedelta(seconds=0))
    max_inc = int(step.total_seconds())
    deadline = cursor + pv_length

    beats = []
    while cursor <= deadline:
        beats.append(Heartbeat(nginx_ts=cursor, inc=random.randint(1, max_inc)))
        cursor += step
    return beats
def get_statistics(heartbeats, target=None):
    """Summarize a heartbeat series, optionally relative to a target summary.

    Args:
        heartbeats: Sized collection of objects exposing ``inc``.
        target: Optional summary dict with ``"count"`` and ``"inc_sum"``.
            When truthy, the differences against it are added to the result.

    Returns:
        A dict with ``"count"``, ``"inc_sum"`` and ``"iterations"`` (always
        1), plus ``"count_delta"`` and ``"inc_sum_delta"`` when ``target``
        is given.
    """
    beat_count = len(heartbeats)
    total_inc = 0
    for beat in heartbeats:
        total_inc += beat.inc

    summary = {"count": beat_count, "inc_sum": total_inc, "iterations": 1}
    if not target:
        return summary

    summary["count_delta"] = beat_count - target["count"]
    summary["inc_sum_delta"] = total_inc - target["inc_sum"]
    return summary
def add_statistics(stats1, stats2):
    """Combine two statistics dicts by summing every field.

    Args:
        stats1: Running total so far, or ``None`` when nothing has been
            accumulated yet.
        stats2: Statistics to fold in. Must contain ``"count"``,
            ``"inc_sum"``, ``"count_delta"``, ``"inc_sum_delta"`` and
            ``"iterations"``.

    Returns:
        ``stats2`` itself when ``stats1`` is ``None``; otherwise a new dict
        holding the field-wise sums.
    """
    if stats1 is None:
        return stats2
    # ``iterations`` is summed like every other field, so merging two
    # already-accumulated totals keeps the run count correct rather than
    # assuming ``stats2`` always represents exactly one run.
    summed_fields = (
        "count",
        "inc_sum",
        "count_delta",
        "inc_sum_delta",
        "iterations",
    )
    return {field: stats1[field] + stats2[field] for field in summed_fields}
def avg_statistics(stats):
    """Convert accumulated totals into per-iteration averages.

    Args:
        stats: Dict with ``"count"``, ``"inc_sum"``, ``"count_delta"``,
            ``"inc_sum_delta"`` and ``"iterations"``.

    Returns:
        A new dict in which each total is divided by ``stats["iterations"]``
        and ``"iterations"`` is carried over unchanged.
    """
    averaged = {}
    for field in ("count", "inc_sum", "count_delta", "inc_sum_delta"):
        averaged[field] = stats[field] / stats["iterations"]
    averaged["iterations"] = stats["iterations"]
    return averaged
def main():
    """Run every backoff strategy against each example pageview length.

    For each pageview length and strategy, ``ITERATIONS`` random heartbeat
    series are generated, re-scheduled by a fresh strategy instance, compared
    with the original series, and the averaged statistics are printed.
    """
    strategies = (
        LongerIntervalstrategy,
        DoublingBackoffStrategy,
        WaitLongerDoublingBackoffStrategy,
    )
    separator = "-" * 30

    for seconds in EXAMPLE_PAGEVIEW_LENGTHS:
        pv_length = dt.timedelta(seconds=seconds)
        print("Input pv: {}".format(pv_length))

        for strategy_cls in strategies:
            print("{}:".format(strategy_cls.__name__))
            running_total = None

            for _ in range(ITERATIONS):
                baseline = build_example_heartbeats(pv_length)
                # A new instance per run, since some strategies keep state.
                backed_off = strategy_cls().adjust_heartbeats_for_backoff(
                    baseline, pv_length
                )
                run_stats = get_statistics(
                    backed_off, target=get_statistics(baseline)
                )
                running_total = add_statistics(running_total, run_stats)

            print(avg_statistics(running_total))

        print(separator)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment