Last active
May 25, 2021 17:46
-
-
Save Cabeda/0bfac179ea455ddb845f4d271c86ea47 to your computer and use it in GitHub Desktop.
System control simulator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import scipy.stats as st | |
import matplotlib.pyplot as plt | |
def validate_parameters(n1, n2, k1, k2, w): | |
"""Validate that sim parameteres are valid""" | |
if k1 < w: | |
raise Exception(f"K1 {k1} must be greater than w {w}") | |
elif k1 < k2: | |
raise Exception(f"K1 {k1} must be greater than k2 {k2}") | |
elif k2 < w: | |
raise Exception(f"K2 {k2} must be greater than W {w}") | |
elif n2 < n1: | |
raise Exception(f"N2 {n2} must be greater than N1 {n1}") | |
def get_samples(u0, sigma): | |
"""Get a sample from a normal distribution.""" | |
return np.random.normal(u0, sigma) | |
def get_time_of_occurence(freq_occurence_per_hour=0.01): | |
"""Gets a random rate of occurence. Default value is 0.01""" | |
return (-np.log(np.random.random())) / freq_occurence_per_hour | |
def run_cycle(n, u0, sigma, is_error=False): | |
samples = [] | |
for x in range(n): | |
if is_error is True: | |
sample = get_samples(u0 + sigma, sigma) | |
samples.append(sample) | |
else: | |
sample = get_samples(u0, sigma) | |
samples.append(sample) | |
mean_sample = np.mean(np.array(samples)) | |
# print(f"Mean of sample {mean_sample}\n") | |
return mean_sample | |
def validate_limits(mean_sample, LWL, UWL, LCL, UCL, h, h1, h2, error_count): | |
if mean_sample > LCL and mean_sample < UCL: | |
if mean_sample > LWL and mean_sample < UWL: | |
h = h1 | |
else: | |
h = h2 | |
return True, h, error_count | |
else: | |
error_count = error_count + 1 | |
print( | |
"""Outside control parameters. Stopping process.But why Watson? 🤔🤔🤔 | |
""" | |
) | |
return False, h, error_count | |
def generate_graph(filename, mean_samples, times, LCL, UCL, LWL, UWL): | |
"""Generates graph and saves to a png file""" | |
fig, ax = plt.subplots() | |
ax.plot(times, mean_samples) | |
ax.plot(times, np.repeat(LCL, len(times)), color="red") | |
ax.plot(times, np.repeat(UCL, len(times)), color="red") | |
ax.plot(times, np.repeat(LWL, len(times)), color="yellow") | |
ax.plot(times, np.repeat(UWL, len(times)), color="yellow") | |
ax.set( | |
xlabel="time unit", | |
ylabel="mean samples", | |
title="Sim sample", | |
) | |
ax.legend(["Samples", "LCL", "UCL", "LWL", "UWL"]) | |
ax.grid() | |
fig.savefig(f"{filename}.png") | |
def run_and_validate_cycle( | |
LWL, UWL, LCL, UCL, n, u0, sigma, h, total_time, time_of_ocurrence | |
): | |
h1 = 3.3 | |
h2 = 0.1 | |
error_count_1 = 0 | |
error_count_2 = 0 | |
total_time = total_time + h | |
if total_time < time_of_ocurrence: | |
mean_sample = run_cycle(n, u0, sigma) | |
is_inside_limits, h, error_count_1 = validate_limits( | |
mean_sample, LWL, UWL, LCL, UCL, h, h1, h2, error_count_1 | |
) | |
return ( | |
is_inside_limits, | |
total_time, | |
h, | |
error_count_1, | |
error_count_2, | |
mean_sample, | |
) | |
else: | |
mean_sample = run_cycle(n, u0, sigma, is_error=True) | |
is_inside_limits, h, error_count_2 = validate_limits( | |
mean_sample, LWL, UWL, LCL, UCL, h, h1, h2, error_count_2 | |
) | |
return ( | |
is_inside_limits, | |
total_time, | |
h, | |
error_count_1, | |
error_count_2, | |
mean_sample, | |
) | |
def calculate_cost(sample_size, total_time, time_of_ocurrence): | |
c_cost = 1 | |
L0 = 0 | |
L1 = 0 | |
M = 0 | |
sample_cost = sample_size * c_cost | |
if total_time < time_of_ocurrence: | |
L0 = 100 | |
else: | |
L1 = 200 | |
M = (total_time - time_of_ocurrence) * 100 | |
total_cost = sample_cost + M + L0 + L1 | |
return total_cost | |
def run_sim(u0, n1, n2, k1, k2, w, h1, h2, sigma, individual_run=False): | |
print( | |
f""" | |
Running with the following params: | |
n1: {n1} | |
n2: {n2} | |
k1: {k1} | |
k2: {k2} | |
w: {w} | |
""" | |
) | |
validate_parameters(n1, n2, k1, k2, w) | |
# Warning limits 1 | |
UWL1 = u0 + w * sigma / np.sqrt(n1) | |
LWL1 = u0 - w * sigma / np.sqrt(n1) | |
# print(f"Warning Limits 1: [{LWL1}, {UWL1}]\n") | |
# Control Limits 1 | |
UCL1 = u0 + k1 * sigma / np.sqrt(n1) | |
LCL1 = u0 - k1 * sigma / np.sqrt(n1) | |
# print(f"Control Limits 1: [{LCL1}, {UCL1}]\n") | |
# Warning limits 2 | |
UWL2 = u0 + w * sigma / np.sqrt(n2) | |
LWL2 = u0 - w * sigma / np.sqrt(n2) | |
# print(f"Warning Limits 2: [{LWL2}, {UWL2}]\n") | |
# Control Limits 2 | |
UCL2 = u0 + k2 * sigma / np.sqrt(n2) | |
LCL2 = u0 - k2 * sigma / np.sqrt(n2) | |
# print(f"Control Limits 2: [{LCL2}, {UCL2}]\n") | |
time_of_occurence = get_time_of_occurence() | |
is_inside_controls = True | |
total_cost = 0 | |
total_time = 0 | |
total_per_hour = 0 | |
cost_per_hours = [] | |
count = 0 | |
h = h1 | |
sum_error_count_1 = 0 | |
sum_error_count_2 = 0 | |
mean_samples = [] | |
times = [] | |
while is_inside_controls: | |
if h == h1: | |
( | |
is_inside_controls, | |
total_time, | |
h, | |
error_count_1, | |
error_count_2, | |
mean_sample, | |
) = run_and_validate_cycle( | |
LWL1, UWL1, LCL1, UCL1, n1, u0, sigma, h1, total_time, time_of_occurence | |
) | |
mean_samples.append(mean_sample) | |
times.append(total_time) | |
total_cost = total_cost + calculate_cost(n1, total_time, time_of_occurence) | |
total_per_hour = round(total_cost / total_time, 2) | |
cost_per_hours.append(total_per_hour) | |
count = count + 1 | |
sum_error_count_1 = sum_error_count_1 + error_count_1 | |
sum_error_count_2 = sum_error_count_2 + error_count_2 | |
elif is_inside_controls: | |
( | |
is_inside_controls, | |
total_time, | |
h, | |
_, | |
_, | |
mean_sample, | |
) = run_and_validate_cycle( | |
LWL2, | |
UWL2, | |
LCL2, | |
UCL2, | |
n2, | |
u0, | |
sigma, | |
h2, | |
total_time, | |
time_of_occurence, | |
) | |
mean_samples.append(mean_sample) | |
times.append(total_time) | |
total_cost = total_cost + calculate_cost(n2, total_time, time_of_occurence) | |
total_per_hour = round(total_cost / total_time, 2) | |
cost_per_hours.append(total_per_hour) | |
count = count + 1 | |
sum_error_count_1 = sum_error_count_1 + error_count_1 | |
sum_error_count_2 = sum_error_count_2 + error_count_2 | |
if individual_run: | |
generate_graph("cicle_1", mean_samples, times, LCL1, UCL1, LWL1, UWL1) | |
generate_graph("cicle_2", mean_samples, times, LCL2, UCL2, LWL2, UWL2) | |
print( | |
f"Simulator run for {count} cycles for a total of {round(total_time, 1)} hours 🐇🕚" | |
) | |
print(f"Total cost of {round(total_cost, 2)}€ 💸💸💸") | |
print(f"Total cost/hour of {total_per_hour}€\n") | |
print( | |
f"There were {error_count_1} error(s) of type 1 for a total of {error_count_1 + error_count_2} error(s)" | |
) | |
return ( | |
total_per_hour, | |
cost_per_hours, | |
total_cost, | |
sum_error_count_1, | |
sum_error_count_2, | |
total_time, | |
time_of_occurence, | |
) | |
def run_individual_sim(u0, n1, n2, k1, k2, w, h1, h2, sigma): | |
( | |
_, | |
costs_per_hour, | |
total_cost, | |
sum_error_count_1, | |
sum_error_count_2, | |
total_time, | |
time_of_occurence, | |
) = run_sim(u0, n1, n2, k1, k2, w, h1, h2, sigma, True) | |
confidence = st.t.interval( | |
0.95, | |
len(costs_per_hour) - 1, | |
loc=np.mean(costs_per_hour), | |
scale=st.sem(costs_per_hour), | |
) | |
print( | |
f"The 95% confidence interval for the cost per time unit (in a normal distribution): {confidence}" | |
) | |
return ( | |
total_cost, | |
sum_error_count_1, | |
sum_error_count_2, | |
total_time, | |
time_of_occurence, | |
) | |
def get_optimal_values(number_cycles): | |
""" | |
Get optimal values | |
""" | |
h1 = 3.3 | |
h2 = 0.1 | |
u0 = 0 | |
sigma = 1 | |
k2 = 2.6 | |
n1_max = 5 | |
n2_min = 7 | |
n2_max = 11 | |
w = 1.3 | |
k1_max = 5.0 | |
k1 = 2.7 | |
min_avg_cost_per_hour = 0 | |
min_params = {} | |
total = 0 | |
for n1 in range(1, 1 + n1_max): | |
for n2 in range(n2_min, 1 + n2_max): | |
for k1 in np.arange(k2 + 1, k1_max, 0.1): | |
for _ in range(1, number_cycles): | |
total_per_hour, _, _, _, _, _, _ = run_sim( | |
u0, n1, n2, k1, k2, w, h1, h2, sigma | |
) | |
total = total + total_per_hour | |
avg_cost_per_hour = total / number_cycles | |
if ( | |
min_avg_cost_per_hour == 0 | |
or min_avg_cost_per_hour > avg_cost_per_hour | |
): | |
min_params = { | |
"n1": n1, | |
"n2": n2, | |
"k1": k1, | |
"k2": k2, | |
"w": w, | |
} | |
min_avg_cost_per_hour = avg_cost_per_hour | |
print( | |
f"Optimal results are {min_params} with a cost/hour of {round(min_avg_cost_per_hour,2)}€ 🤑" | |
) | |
def error_several_sims(n1, n2, k1, k2, w, h1, h2, u0, sigma, l): | |
sum_error_count_1 = 0 | |
sum_error_count_2 = 0 | |
for i in range(1, l): | |
_, error_count_1, error_count_2, _, _ = run_individual_sim( | |
u0, n1, n2, k1, k2, w, h1, h2, sigma | |
) | |
sum_error_count_1 = sum_error_count_1 + error_count_1 | |
sum_error_count_2 = sum_error_count_2 + error_count_2 | |
print( | |
f"There were {sum_error_count_1} error of type one out of all the {sum_error_count_1+sum_error_count_2} errors" | |
) | |
def time_btw_error_and_stop(n1, n2, k1, k2, w, h1, h2, u0, sigma, number_cycles): | |
time = 0 | |
count = 0 | |
while count < number_cycles: | |
_, _, _, _, _, total_time, time_of_occurence = run_sim( | |
u0, n1, n2, k1, k2, w, h1, h2, sigma | |
) | |
# Is type 2 error | |
if time_of_occurence < total_time: | |
count = count + 1 | |
time = time + (total_time - time_of_occurence) | |
avg_time = time / number_cycles | |
print(f"The time between the occurence and the stop is {round(avg_time, 2)} hours") | |
def avg_time_several_sims(u0, n1, n2, k1, k2, w, h1, h2, sigma, number_cycles): | |
times = [] | |
occurence = 0 | |
previous_time_of_occurence = 0 | |
time_type_1 = 0 | |
for _ in range(1, number_cycles): | |
_, _, _, _, _, total_time, time_of_ocurrence = run_sim( | |
u0, n1, n2, k1, k2, w, h1, h2, sigma | |
) | |
# Is type 2 error | |
if time_of_ocurrence > total_time: | |
occurence = time_of_ocurrence - previous_time_of_occurence + time_type_1 | |
previous_time_of_occurence = time_of_ocurrence | |
times.append(occurence) | |
time_type_1 = 0 | |
else: | |
time_type_1 = time_type_1 + total_time | |
avg_time = np.round(np.mean(times), 2) | |
print( | |
f"The average time between two consecutive assignable causes is {avg_time} hours" | |
) | |
def get_optimal_values_2(l): | |
h1 = 3.3 | |
h2 = 0.1 | |
u0 = 0 | |
sigma = 1 | |
min_avg_cost_per_hour = 0 | |
n_max = 50 | |
w = 1.3 | |
k_max = 3 | |
total = 0 | |
for n in range(1, 1 + n_max): | |
for k in np.arange(1.4, k_max + 0.1, 0.1): | |
for _ in range(1, l): | |
total_per_hour, _, _, _, _, _, _ = run_sim( | |
u0, | |
n, | |
n, | |
k, | |
k, | |
w, | |
h1, | |
h2, | |
sigma, | |
) | |
total = total + total_per_hour | |
avg_cost_per_hour = total / l | |
if min_avg_cost_per_hour == 0 or min_avg_cost_per_hour > avg_cost_per_hour: | |
min_avg_cost_per_hour = avg_cost_per_hour | |
min_params = { | |
"n": n, | |
"k": k, | |
"w": w, | |
} | |
print( | |
f"Optimal results are {min_params} with an average cost/hour of {min_avg_cost_per_hour}€ " | |
) | |
return n, k, w, min_avg_cost_per_hour | |
def get_optimal_values_3(l): | |
""" | |
Get optimal values | |
""" | |
u0 = 0 | |
sigma = 1 | |
n_max = 5 | |
h_max = 10 | |
w = 1.3 | |
k_max = 3 | |
min_avg_cost_per_hour = 0 | |
min_params = {} | |
total = 0 | |
for n in range(1, 1 + n_max): | |
for h in np.arange(1, h_max + 0.1, 0.1): | |
for k in np.arange(w, k_max + 0.1, 0.1): | |
for _ in range(1, l): | |
total_per_hour, _, _, _, _, _, _ = run_sim( | |
u0, | |
n, | |
n, | |
k, | |
k, | |
w, | |
h, | |
h, | |
sigma, | |
) | |
total = total + total_per_hour | |
avg_cost_per_hour = total / l | |
if ( | |
min_avg_cost_per_hour == 0 | |
or min_avg_cost_per_hour > avg_cost_per_hour | |
): | |
min_params = { | |
"n": n, | |
"k": k, | |
"h": h, | |
} | |
min_avg_cost_per_hour = avg_cost_per_hour | |
print( | |
f"Optimal results are {min_params} with a cost/hour of {round(min_avg_cost_per_hour,2)}€ 🤑" | |
) | |
# Question 1.1 | |
# get_optimal_values(number_cycles=50) | |
# Question 1.2 and 2 | |
# run_individual_sim(u0=0, n1=1, n2=7, k1=3.6, k2=2.6, w=1.3, h1=3.3, h2=0.1, sigma=1) | |
# Question 3 | |
# get_optimal_values_3(l=50) | |
# Question4 | |
avg_time_several_sims( | |
u0=0, n1=1, n2=1, k1=1.3, k2=1.3, w=1.3, h1=1, h2=1, sigma=1, number_cycles=50000 | |
) | |
# Question5 | |
# error_several_sims(n1=1,n2=1,k1=1.3,k2=1.3,w=1.3,h1=1,h2=1,u0=0,sigma=1,l=50000) | |
# Question 6 | |
# time_btw_error_and_stop(n1=1,n2=7,k1=3.6,k2=2.6,w=1.3,h1=3.3,h2=0.1,u0=0,sigma=1,number_cycles=500) | |
# Question 7 | |
# get_optimal_values_2(l=10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment