Created
March 9, 2021 16:39
-
-
Save nniranjhana/c5b4b87304e9409759250c7eb7f39a5a to your computer and use it in GitHub Desktop.
RTOS assignment
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random, sys | |
import numpy as np | |
import matplotlib.pyplot as plt | |
# This is to divide generated values by their total sum so they sum up to U | |
def bigint(num): | |
# Round off floats to 10 decimal places | |
num = float('%.10f'%(num)) | |
# Convert to a big int preserving all 10 decimal digits | |
return int(num * 10**10) | |
def roundoff(num): | |
return float('%.10f'%(num)) | |
def checkRMsched(u, n): | |
count = 0. | |
for j in range(T): | |
prod = 1 | |
for i in range(n): | |
prod = (prod*(u[j][i] + 1.)) | |
if(prod <= 2): | |
count = count + 1. | |
return count | |
# Generate utilization of the task set | |
U = np.arange(0.05, 1.0, 0.05) | |
# U = U[15:19] | |
# Total number of task sets to generate | |
T = 100000 | |
# Number of tasks in each task set | |
n = [8, 16, 32, 64] | |
y = [] | |
for l in range(len(n)): | |
y.append([]) | |
for k in range(len(n)): | |
for Ui in U: | |
Uii = roundoff(Ui) | |
Ui = bigint(Ui) | |
# Construct T utilization vectors | |
u = [] | |
for j in range(T): | |
ut = [] | |
for i in range(n[k]): | |
# Uniform sampling in Σ ui = Ui | |
ut.append(bigint(random.betavariate(1, 10000))) | |
u.append(ut) | |
# Sum of n generated utilization vector values for T task sets | |
us = [] | |
for j in range(T): | |
uts = 0 | |
for i in range(n[k]): | |
uts = uts + u[j][i] | |
us.append(uts) | |
# Actual utilization vectors that sum up to Ui | |
for j in range(T): | |
usj = us[j] | |
for i in range(n[k]): | |
u[j][i] = ((u[j][i]/usj)*Ui)/10**10 | |
# Fraction of T task sets that will be schedulable by RM | |
count = checkRMsched(u, n[k]) | |
count = (count/T)*100. | |
print("Percentage of tasks sets schedulable by RM for Ui of", Uii, 'is', count) | |
y[k].append(count) | |
x = U | |
fig, ax = plt.subplots() | |
ax.plot(x,y[0], label='RM, 8 tasks') | |
ax.plot(x,y[1], label='RM, 16 tasks') | |
ax.plot(x,y[2], label='RM, 32 tasks') | |
ax.plot(x,y[3], label='RM, 64 tasks') | |
ax.legend() | |
ax.plot() | |
ax.set(xlabel="Utilization factor", ylabel="Percent of task sets schedulable by RM", | |
title="Percent of task sets schedulable by RM vs utilization") | |
ax.grid() | |
plt.show() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random, sys, time, threading | |
import numpy as np | |
from scipy.stats import loguniform | |
# Number of tasks in each task set | |
n = 16 | |
# Total utilization | |
U0 = float(sys.argv[1]) | |
# U0 = 0.1 | |
# Time to run the algorithm for and consequently the time that caps the maximum period possible | |
ts = int(sys.argv[2]) | |
# ts = 1000 # Value we have been using, around 16 minutes | |
# Values possible: RMS, SPTF, MUF | |
sched = str(sys.argv[3]) | |
# This is to divide generated values by their total sum so they sum up to U | |
def bigint(num): | |
# Round off floats to 3 decimal places | |
num = float('%.3f'%(num)) | |
# Convert to a big int preserving all 3 decimal digits | |
return int(num * 10**3) | |
# Round off to three places | |
def round3(num): | |
num = float('%.3f'%(num)) | |
return num | |
U = bigint(U0) | |
while(True): | |
u = [] | |
for i in range(n): | |
# Uniform sampling in Σ ui = U | |
ut = bigint(random.betavariate(1, 100)) | |
u.append(ut) | |
# Sum of n generated utilization vector values for T task sets | |
us = [] | |
uts = 0 | |
for i in range(n): | |
uts = uts + u[i] | |
# Actual utilization vectors that sum up to U | |
for i in range(n): | |
u[i] = ((u[i]/uts)*U)/10**3 | |
# Utilization vectors for tasks | |
flag = 0 | |
sumt = 0 | |
for i in range(n): | |
u[i] = round3(u[i]) | |
sumt = sumt + u[i] | |
# Add the round off for one task when sum is slightly larger or lesser than U | |
offset = sumt - U0 | |
offset = round3(offset) | |
ind = random.randint(0, n-1) | |
u[ind] = u[ind] - offset | |
u[ind] = round3(u[ind]) | |
# Our rounding off and computations could results in some invalid utilization vectors | |
# So regenerate in case that happens, or break out of loop if u values are legit | |
for i in range(n): | |
if(u[i] <= 0): | |
flag = 1 | |
if(flag == 0): | |
break | |
print("Util vectors: ", u) | |
if (sched == "RMS"): | |
check = 1 | |
# Check schedulable | |
for i in range(n): | |
check = check*(u[i] + 1) | |
if(check <= 2): | |
print("Schedulable with Ubound: ", check) | |
else: | |
print("Some tasks might be missed because bound is: ", check) | |
# Question asks between 10 to 100000, but upper limit should be less than time you run for | |
P = loguniform.rvs(1, 5, size=n) | |
P = list(P) | |
for i in range(n): | |
P[i] = float('%.2f'%(P[i])) | |
print("Task periods: ", P) | |
# Calculate computation times | |
C = [] | |
for i in range(n): | |
C.append(u[i]*P[i]) | |
for i in range(n): | |
C[i] = float('%.3f'%(C[i])) | |
print("Task computation: ", C) | |
# Calculate priorities based on if RMS or SPTF or MUF | |
if (sched == "SPTF"): | |
Ci = C[:] | |
Pr = [0] * n | |
for i in range(n): | |
cmin = min(Ci) | |
j = Ci.index(cmin) | |
Pr[j] = i+1 | |
Ci[j] = 1000001 | |
elif (sched == "RMS"): | |
Pi = P[:] | |
Pr = [0] * n | |
for i in range(n): | |
pmin = min(Pi) | |
j = Pi.index(pmin) | |
Pr[j] = i+1 | |
Pi[j] = 1000001 | |
elif (sched == "MUF"): | |
Ui = u[:] | |
Pr = [0] * n | |
for i in range(n): | |
umax = max(Ui) | |
j = Ui.index(umax) | |
Pr[j] = i+1 | |
Ui[j] = 0 | |
print("Task priorities", Pr) | |
ready_q = [] | |
missed_tasks = [] | |
completed_tasks = [] | |
def print_current_sec(): | |
while(True): | |
print(time.ctime()) | |
time.sleep(1) | |
def arrive_task(i, P, C, Pr): | |
while True: | |
print("Task %d has arrived" % i) | |
ready_q.append([Pr[i], C[i], i]) | |
time.sleep(P[i]) | |
time_slice = 0.001 # Can't be lesser because time.sleep on Linux is accuracte till 1ms | |
def preemptive(): | |
while True: | |
if(ready_q): | |
for i in range(len(ready_q)-1): | |
if(ready_q[i][0] == ready_q[-1][0]): | |
missed_tasks.append(ready_q[i]) | |
print("Task %d has missed deadline and was removed" % ready_q[i][2]) | |
ready_q.remove(ready_q[i]) | |
# print(ready_q) | |
qmin = min(ready_q) | |
index = ready_q.index(qmin) | |
ready_q[index][1] = ready_q[index][1] - time_slice | |
if(ready_q[index][1] < time_slice): | |
print("Task %d was removed" % ready_q[index][2]) | |
completed_tasks.append(ready_q[index][2]) | |
ready_q.remove(qmin) | |
time.sleep(time_slice) | |
def nonpreemtive(): | |
while True: | |
if(ready_q): | |
for i in range(len(ready_q)-1): | |
if(ready_q[i][0] == ready_q[-1][0]): | |
missed_tasks.append(ready_q[i]) | |
print("Task %d has missed deadline and was removed" % ready_q[i][2]) | |
ready_q.remove(ready_q[i]) | |
qmin = min(ready_q) | |
index = ready_q.index(qmin) | |
while(ready_q[index][1] > time_slice): | |
ready_q[index][1] = ready_q[index][1] - time_slice | |
time.sleep(time_slice) | |
print("Task %d was removed" % ready_q[index][2]) | |
completed_tasks.append(ready_q[index][2]) | |
ready_q.remove(qmin) | |
# Run simulation | |
gtime = time.time() | |
for i in range(n): | |
thr = threading.Thread(target=arrive_task, args=(i, P, C, Pr), daemon=True) | |
thr.start() | |
# thr = threading.Thread(target=print_current_sec, daemon=True) | |
# thr.start() | |
thr = threading.Thread(target=preemptive, daemon=True) | |
thr.start() | |
# thr = threading.Thread(target=nonpreemptive, daemon=True) | |
# thr.start() | |
time.sleep(ts) | |
print("# missed tasks: ", len(missed_tasks)) | |
print("# completed tasks", len(completed_tasks)) | |
print("Scheduling time elapsed:", time.time()-gtime) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random, sys | |
import numpy as np | |
from mpl_toolkits import mplot3d | |
import matplotlib.pyplot as plt | |
from matplotlib.pyplot import cm | |
# This is to divide generated values by their total sum so they sum up to U | |
def bigint(num): | |
# Round off floats to 10 decimal places | |
num = float('%.10f'%(num)) | |
# Convert to a big int preserving all 10 decimal digits | |
return int(num * 10**10) | |
# Fix the utilization of the task set | |
U=0.1 | |
U=bigint(U) | |
# Number of task sets | |
T = 10000 | |
# Number of tasks in task set | |
n = 3 | |
u = [] | |
alphas = [1., 1., 1.] | |
for j in range(T): | |
# Uniform sampling in Σ ui = U | |
ut = list(np.random.dirichlet(alphas)) | |
for i in range(n): | |
ut[i] = bigint(ut[i]) | |
u.append(ut) | |
# Sum of n generated utilization vector values for T task sets | |
us = [] | |
for j in range(T): | |
uts = 0 | |
for i in range(n): | |
uts = uts + u[j][i] | |
us.append(uts) | |
# Actual utilization vectors that sum up to Ui | |
for j in range(T): | |
usj = us[j] | |
for i in range(n): | |
u[j][i] = ((u[j][i]/usj)*U)/10**10 | |
us = [] | |
fig = plt.figure() | |
ax = plt.axes(projection='3d') | |
xd = [] | |
yd = [] | |
zd = [] | |
for j in range(T): | |
xd.append(u[j][0]) | |
yd.append(u[j][1]) | |
zd.append(u[j][2]) | |
ax.scatter3D(xd, yd, zd, s=0.2, c='blue') | |
plt.gca().invert_xaxis() | |
plt.gca().invert_zaxis() | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment