Created
May 9, 2024 22:12
-
-
Save udf/8b156d8c9fc6f7ae0778a2194aad9108 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
from collections import defaultdict | |
import matplotlib.axes | |
import matplotlib.pyplot as plt | |
import matplotlib.ticker as ticker | |
import matplotlib.animation as animation | |
# Simulation speed: number of ticks simulated for each rendered frame
# (1 is real speed, 2 is 2x speed, etc).
TICKS_PER_FRAME = 1

# Upper bound on the number of items processed per tick.
MAX_PER_TICK = 100

# Lower bound on the number of items processed per tick.
MIN_PER_TICK = 10

# Number of bars shown in the plot.
NUM_BARS = 120

# When set, the animation is rendered to this file instead of being displayed.
OUTPUT_VIDEO = None  # 'combined2.mp4'

# Number of frames to render when saving a video.
VIDEO_LENGTH = 60 * 60

# When set, the number of items processed per tick is written to this file
# (when the video render ends, or when the window is closed). By default it
# is the video path with a '.txt' suffix, and disabled when no video is made.
if OUTPUT_VIDEO:
    OUTPUT_DATA = Path(OUTPUT_VIDEO).with_suffix('.txt')
else:
    OUTPUT_DATA = None
# Pending work, keyed by the tick it is due on: {due tick: list of items}.
# The simulation starts with 1000 items all due on tick 120.
queue = defaultdict(list)
queue[120] = list(range(1000))

# Number of items processed on each tick (filled in only when OUTPUT_DATA is set).
n_processed_history = []

ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
fig.set_size_inches(16 / 2, 9 / 2)

# Per-tick item totals for the last 120 ticks, used for the plain average.
history = [0] * 120
# Running weighted average of the per-tick item totals.
average2 = 0

# Tick counter drawn in the top-left corner of the axes.
ticks_text = ax.annotate(
    'Tick: 0',
    (0, 1),
    fontsize='x-large',
    xycoords='axes fraction',
    xytext=(10, -10),
    textcoords='offset points',
    ha='left',
    va='top',
    animated=True,
)

# Horizontal guide lines for the two averages; their heights are replaced on
# every update, 1000 is only the initial placeholder value.
(avg_line,) = ax.plot(
    range(NUM_BARS),
    [1000] * NUM_BARS,
    color='grey',
    linestyle='--',
    label=f'{len(history)} tick avg',
)
(avg_line2,) = ax.plot(
    range(NUM_BARS),
    [1000] * NUM_BARS,
    color='green',
    linestyle='--',
    label='weighted "avg"',
)

# One bar per upcoming tick, showing how many items are queued for it.
bars = ax.bar(range(NUM_BARS), [0] * NUM_BARS, label='queue size')

ax.set_xlim([0, NUM_BARS])
ax.set_ylim([0, MAX_PER_TICK * 2])
ax.set_xlabel('Queue due tick')
ax.set_ylabel('Items in queue')
# x positions are offsets from the current tick, so label them '+N'
ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('+%d'))
ax.legend(loc='upper right')
fig.tight_layout()
def process_item(item, delay=120):
    '''
    Simulate processing a single item and report when it is due again.

    item: the queue entry being processed (not inspected by the simulation)
    delay: number of ticks until the item should be processed again;
        defaults to 120, the fixed delay this simulation previously hard-coded

    Returns the delay in ticks.
    '''
    return delay
def process_first_queue(tick, force=False, max_to_process=MAX_PER_TICK, reschedule=True):
    '''
    Process items from the queue with the earliest due tick.

    tick: the current simulation tick
    force: process the earliest queue even if it is not due yet
    max_to_process: upper bound on the number of items processed by this call
    reschedule: what happens to items left over once the bound is reached;
        if true they are moved to the queue for tick + 1, otherwise they are
        put back into the queue they came from

    Returns a tuple (number of items processed, number of items that were in
    the earliest queue before processing). Returns (0, 0) when nothing is
    queued at all.
    '''
    # nothing scheduled: min() below would raise ValueError on an empty dict
    if not queue:
        return 0, 0

    # this is a O(n) (slow) way to find the smallest element,
    # it could be done in constant time with a min heap
    # but this is fine for testing
    next_update_tick = min(queue)
    num_items = len(queue[next_update_tick])
    if not force and next_update_tick > tick:
        return 0, num_items

    print(f'tick {tick}, processing updates for {next_update_tick} (+{next_update_tick - tick})')

    # process and schedule items for later, how later depends on what
    # the process_item function returns
    num_processed = 0
    items = queue.pop(next_update_tick)
    for _ in range(min(len(items), max_to_process)):
        # note that this is popping items from the right side of the list
        # so that previously delayed items are processed first
        # this prevents items from being delayed indefinitely
        item = items.pop()
        delay = process_item(item)
        queue[tick + delay].append(item)
        num_processed += 1

    # move anything not processed to the right side of the next tick's queue
    # (or back where it came from when rescheduling is disabled)
    if items:
        if reschedule:
            print(f' delaying {len(items)} to tick {tick + 1}')
            queue[tick + 1].extend(items)
        else:
            queue[next_update_tick].extend(items)

    return num_processed, num_items
def update_standard(tick, average_items):
    '''
    Update method that processes due queues with a fixed cap of MAX_PER_TICK.

    Calls process_first_queue repeatedly until a call processes nothing.
    average_items is accepted so all update methods share one signature;
    it is not used here.

    Returns (total size of the queues that were worked on, total number of
    items processed).
    '''
    items_seen = 0
    items_done = 0
    while True:
        done, seen = process_first_queue(tick, max_to_process=MAX_PER_TICK)
        if done == 0:
            # earliest queue is not due yet (or yielded nothing): stop
            return items_seen, items_done
        items_seen += seen
        items_done += done
def update_overflow_to_avg(tick, average_items):
    '''
    Update method that caps processing at the average.

    The cap passed to process_first_queue is average_items clamped to the
    range [MIN_PER_TICK, MAX_PER_TICK]. Due queues are processed until a
    call processes nothing.

    Returns (total size of the queues that were worked on, total number of
    items processed).
    '''
    cap = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    items_seen = 0
    items_done = 0
    while True:
        done, seen = process_first_queue(tick, max_to_process=cap)
        if done == 0:
            return items_seen, items_done
        items_done += done
        items_seen += seen
def update_extra_to_avg(tick, average_items):
    '''
    Update method that tops processing up to the average.

    First processes everything that is due (with process_first_queue's
    default cap), then pulls items forward from future queues until the
    number processed reaches the clamped average.

    Returns (total size of the queues that were worked on, total number of
    items processed).
    '''
    items_seen = 0
    items_done = 0

    # phase 1: regular processing of whatever is due
    while True:
        done, seen = process_first_queue(tick)
        if done == 0:
            break
        items_done += done
        items_seen += seen

    if not queue:
        return items_seen, items_done

    # phase 2: process some items early (up to the average, minus how far
    # in the future the next queue is)
    cap = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    upcoming_tick = min(queue)
    budget = max(0, cap - items_done - (upcoming_tick - tick) - 1)
    while budget > 0:
        # force: the queue is not due yet; reschedule=False: leftovers stay
        # in their original queue instead of moving to tick + 1
        done, seen = process_first_queue(
            tick,
            force=True,
            max_to_process=budget,
            reschedule=False,
        )
        if done == 0:
            break
        budget -= done
        items_done += done
        items_seen += seen

    return items_seen, items_done
def update_combined(tick, average_items):
    '''
    Update method that both caps processing at the average and tops it up
    to the average with items from future queues.

    The cap is average_items clamped to [MIN_PER_TICK, MAX_PER_TICK].

    Returns (total size of the queues that were worked on, total number of
    items processed).
    '''
    cap = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    items_seen = 0
    items_done = 0

    # phase 1: process what is due, at most `cap` items per queue
    while True:
        done, seen = process_first_queue(tick, max_to_process=cap)
        if done == 0:
            break
        items_done += done
        items_seen += seen

    if not queue:
        return items_seen, items_done

    # phase 2: process some items early (up to the average, minus how far
    # in the future the next queue is)
    upcoming_tick = min(queue)
    budget = max(0, cap - items_done - (upcoming_tick - tick) - 1)
    while budget > 0:
        # force: the queue is not due yet; reschedule=False: leftovers stay
        # in their original queue instead of moving to tick + 1
        done, seen = process_first_queue(
            tick,
            force=True,
            max_to_process=budget,
            reschedule=False,
        )
        if done == 0:
            break
        budget -= done
        items_done += done
        items_seen += seen

    return items_seen, items_done
def do_update(tick):
    '''
    Simulate one tick and refresh every plot artist.

    Runs the selected update method, folds its result into both averages,
    then updates the average lines, the bar heights and the tick label.

    Returns the list of artists that were changed (bars, both average
    lines and the tick text).
    '''
    global average2

    # strategy under test; the alternatives are update_standard,
    # update_overflow_to_avg and update_extra_to_avg
    update_func = update_combined
    total_items, num_processed = update_func(tick, round(average2))
    if OUTPUT_DATA:
        n_processed_history.append(num_processed)

    # weighted average: old value keeps 119/120 of its weight each tick
    average2 = average2 * (119/120) + total_items * (1/120)

    # plain average over the fixed-length history window
    del history[0]
    history.append(total_items)
    avg = sum(history) / len(history)

    x_positions = range(NUM_BARS)
    avg_line.set_data(x_positions, [avg] * NUM_BARS)
    avg_line2.set_data(x_positions, [average2] * NUM_BARS)

    # bar N shows the size of the queue due N ticks from now; .get avoids
    # creating empty entries in the defaultdict
    for offset, bar_artist in enumerate(bars.patches):
        due_items = queue.get(tick + offset, ())
        bar_artist.set_height(len(due_items))

    # 60 ticks per second: split the tick count into minutes, seconds, ticks
    whole_secs, rem_ticks = divmod(tick, 60)
    mins, secs = divmod(whole_secs, 60)
    ticks_text.set_text(f'Tick: {tick}\n{mins:02d}m{secs:02d}s+{rem_ticks}')

    return bars.patches + [avg_line, avg_line2, ticks_text]
def update_bars(tick):
    '''
    Animation callback: simulate TICKS_PER_FRAME ticks for one frame.

    tick: the frame value passed in by the animation; it is scaled by
        TICKS_PER_FRAME to get the simulation ticks covered by this frame

    Returns the artists reported by the last simulated tick (an empty list
    if no tick was simulated).
    '''
    artists = []
    # simulation ticks are 1-based, so frame 0 starts at tick 1
    first_tick = tick * TICKS_PER_FRAME + 1
    for update_tick in range(first_tick, first_tick + TICKS_PER_FRAME):
        artists = do_update(update_tick)
    return artists
# When rendering a video the frame count is fixed and frames are produced as
# fast as possible; when displaying, frames are unbounded and paced at 60 fps.
if OUTPUT_VIDEO:
    frame_source = range(VIDEO_LENGTH)
    frame_interval = 1
else:
    frame_source = None
    frame_interval = 1000 / 60

ani = animation.FuncAnimation(
    fig=fig,
    func=update_bars,
    frames=frame_source,
    interval=frame_interval,
    cache_frame_data=False,
    blit=True,
    repeat=False,
)

if OUTPUT_VIDEO:
    ani.save(OUTPUT_VIDEO, fps=60, dpi=100)
else:
    plt.show()

# Dump the per-tick processed counts, one number per line.
if OUTPUT_DATA:
    with open(OUTPUT_DATA, 'w') as f:
        f.write('\n'.join(map(str, n_processed_history)))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
import numpy as np | |
import matplotlib.axes | |
import matplotlib.pyplot as plt | |
# Plot the rolling standard deviation of the "items processed per tick" logs
# found in the current directory (one '*.txt' file per update method, one
# integer per line) and save the figure as 'stdev.png'.

# {dataset name: rolling standard deviation over a 120-tick window}
datasets = {}
for file in Path('.').glob('*.txt'):
    name = file.stem.replace('_', ' ')
    with open(file) as f:
        # split on any whitespace so a trailing newline or blank line does
        # not produce an empty token that int() would reject
        data = [int(n) for n in f.read().split()]
    if len(data) < 120:
        # sliding_window_view raises ValueError when the window is larger
        # than the input, so skip logs that are too short to fill one window
        print(f'skipping {file}: need at least 120 samples, found {len(data)}')
        continue
    data = np.lib.stride_tricks.sliding_window_view(data, 120)
    datasets[name] = np.std(data, axis=1)

ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
fig.set_size_inches(16 / 2, 9 / 2)

# plot sorted by the last entry, descending, so the legend lists the
# dataset that ends highest first
for name, data in sorted(datasets.items(), key=lambda x: x[1][-1], reverse=True):
    ax.plot(range(len(data)), data, label=name)

ax.set(
    ylim=0,
    xlabel='Time (ticks)',
    ylabel='Standard deviation'
)
ax.legend(loc='upper right')
fig.tight_layout()
plt.autoscale(tight=True, axis='x')
fig.savefig('stdev.png', dpi=300)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment