Last active
March 17, 2020 13:30
-
-
Save graipher/8441f7d079742561c22dd7af82d4e2cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from string import ascii_letters | |
from itertools import cycle | |
import random | |
from copy import deepcopy | |
from functools import reduce | |
import numpy as np | |
from functools import partial | |
import timeit | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
from itertools import product, count, chain | |
import multiprocessing | |
from uncertainties import ufloat, unumpy as unp | |
###TIMING functions | |
def get_time(func, *x, copy=False): | |
if copy: | |
x = deepcopy(x) | |
timer = timeit.Timer(partial(func, *x)) | |
t = timer.repeat(repeat=5, number=3) | |
if DEBUG: | |
print(func.__name__, len(x[0]), np.min(t) / 3) | |
return ufloat(np.min(t) / 3, np.std(t) / 3 / np.sqrt(len(t))) | |
def flatten_once(it): | |
for x in it: | |
yield list(chain([x[0]], x[1])) | |
def get_timings_df(funcs, inputs, key=identity, star=False, doc=False, copy=False): | |
df = pd.DataFrame(list(map(key, inputs)), columns=["x"]) | |
labels = [func.__name__ for func in funcs] | |
if doc: | |
labels = [func.__doc__ for func in funcs] | |
y = product(funcs, inputs) | |
if star: | |
y = flatten_once(y) | |
with multiprocessing.Pool(processes=N_CPU-1) as pool: | |
times_result = pool.starmap_async(partial(get_time, copy=copy), y) | |
try: | |
times = times_result.get(timeout=len(funcs) * len(inputs)) | |
except multiprocessing.TimeoutError: | |
times = times_result._value | |
print(times) | |
times = np.array(times).reshape(len(funcs), -1) | |
for label, t in zip(labels, times): | |
df[label] = t | |
return df | |
def plot_times(funcs, inputs, key=identity, xlabel="x", ylabel="Time [s]", logx=False, logy=False, star=False, ratio=False, doc=False, copy=False, fmt='o-'): | |
df = get_timings_df(funcs, inputs, key, star, doc, copy) | |
for label in df.columns[1:]: | |
x, y = df["x"], df[label] | |
if ratio: | |
y = y / df.T.iloc[1] | |
plt.errorbar(x, unp.nominal_values(y), unp.std_devs(y), fmt=fmt, label=label) | |
plt.xlabel(xlabel) | |
if ratio: | |
ylabel = "Time / Time of {}".format(df.columns[1]) | |
plt.ylabel(ylabel) | |
if logx: | |
plt.xscale("log") | |
if logy: | |
plt.yscale("log") | |
plt.legend() | |
plt.show() | |
### Functions to test | |
def push_nepnep(list_): | |
def find_current_element(list_, from_): | |
for i in range(from_+1, len(list_)): | |
if list_[i] != " ": | |
return i | |
return None | |
list_ = list_[::-1] | |
good_index = list_.index(" ") | |
current_element_index = find_current_element(list_, 0) | |
while(current_element_index is not None): | |
for i in range(good_index, len(list_)): | |
if list_[i] != " ": | |
list_[good_index], list_[i] = list_[i], " " | |
good_index += 1 | |
current_element_index = find_current_element(list_, current_element_index) | |
return reversed(list_) | |
def push_nepnep_(x): | |
return list(x) | |
def push_heap_overflow(x): | |
x.sort(key=" ".__ne__) | |
return x | |
def is_empty(x): | |
return x == " " | |
def find_nonempty(values, is_empty): | |
for i in reversed(range(len(values))): | |
if not is_empty(values[i]): | |
yield i | |
def push_peilonrayz(values, is_empty): | |
good_index = len(values) - 1 | |
for i in find_nonempty(values, is_empty): | |
values[i], values[good_index] = values[good_index], values[i] | |
good_index -= 1 | |
def push_peilonrayz_(x): | |
push_peilonrayz(x, is_empty) | |
return x | |
def push_peilonrayz2(values, is_empty): | |
return ( | |
[v for v in values if is_empty(v)] | |
+ [v for v in values if not is_empty(v)] | |
) | |
def push_peilonrayz2_(x): | |
return push_peilonrayz2(x, is_empty) | |
def empty_first(it): | |
buffer = [] | |
for x in it: | |
if x == " ": | |
yield x | |
else: | |
buffer.append(x) | |
yield from buffer | |
def empty_first_(x): | |
return list(empty_first(x)) | |
def push_opus(y): | |
return reduce(lambda xs, x: xs + [x] if x.strip() != "" else [x] + xs, y, []) | |
alphabet = cycle(ascii_letters) | |
x = [[" " if random.random() > 0.5 else next(alphabet) for _ in range(n)] for n in np.logspace(1, 7, dtype=int)] | |
plot_times([push_nepnep_, push_heap_overflow, push_peilonrayz_, push_peilonrayz2_, empty_first_, push_opus], x, key=len, logx=True, logy=True, xlabel="len(list_)", copy=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment