Skip to content

Instantly share code, notes, and snippets.

@nniranjhana
Created March 9, 2021 16:39
Show Gist options
  • Save nniranjhana/c5b4b87304e9409759250c7eb7f39a5a to your computer and use it in GitHub Desktop.
Save nniranjhana/c5b4b87304e9409759250c7eb7f39a5a to your computer and use it in GitHub Desktop.
RTOS assignment
import random, sys
import numpy as np
import matplotlib.pyplot as plt
# Fixed-point helpers: sampled values are turned into integers and later divided by their total so that each task set sums to U
def bigint(num):
    """Convert *num* to a fixed-point integer carrying 10 decimal digits.

    The value is rounded to 10 decimal places and then scaled by 10**10,
    e.g. 0.05 becomes 500000000.
    """
    # Round off floats to 10 decimal places
    num = float('%.10f' % num)
    # The scaled product can land a hair below the intended integer because
    # of binary floating point, so round to nearest instead of truncating.
    return int(round(num * 10**10))
def roundoff(num):
    """Return *num* rounded to 10 decimal places, as a float."""
    as_text = format(num, '.10f')
    return float(as_text)
def checkRMsched(u, n):
    """Count the task sets in *u* that pass the product test for RM.

    A task set passes when the product of (u_i + 1) over its first *n*
    utilizations is at most 2.

    u -- sequence of task sets, each a sequence of per-task utilizations
    n -- number of tasks to take from each task set

    Returns the number of passing task sets as a float.
    """
    count = 0.
    # Walk the task sets that were actually passed in; the result must not
    # depend on a module-level task-set counter.
    for task_set in u:
        prod = 1
        for i in range(n):
            prod = prod * (task_set[i] + 1.)
        if prod <= 2:
            count = count + 1.
    return count
# Total utilizations to sweep over
U = np.arange(0.05, 1.0, 0.05)
# U = U[15:19]
# Number of task sets generated for every utilization value
T = 100000
# Task-set sizes to evaluate
n = [8, 16, 32, 64]
# y[k] collects, for n[k] tasks, one schedulable percentage per value in U
y = [[] for _ in n]
for k, n_tasks in enumerate(n):
    for Ui in U:
        Uii = roundoff(Ui)
        Ui = bigint(Ui)
        # Build T utilization vectors, each rescaled to sum to the target
        u = []
        for j in range(T):
            # Raw Beta(1, 10000) draws as fixed-point integers
            raw = [bigint(random.betavariate(1, 10000)) for _ in range(n_tasks)]
            raw_total = sum(raw)
            # Divide by the total and scale by Ui; the final division undoes
            # the fixed-point factor carried by Ui
            u.append([((value / raw_total) * Ui) / 10**10 for value in raw])
        # Fraction of the T task sets that pass the RM test, as a percentage
        count = checkRMsched(u, n_tasks)
        count = (count / T) * 100.
        print("Percentage of tasks sets schedulable by RM for Ui of", Uii, 'is', count)
        y[k].append(count)
# Plot the schedulable percentage against total utilization
x = U
fig, ax = plt.subplots()
# One curve per task-set size; labels are derived from n so the legend
# cannot drift from the sizes that were actually simulated
for n_tasks, percentages in zip(n, y):
    ax.plot(x, percentages, label='RM, %d tasks' % n_tasks)
ax.legend()
ax.set(xlabel="Utilization factor", ylabel="Percent of task sets schedulable by RM",
       title="Percent of task sets schedulable by RM vs utilization")
ax.grid()
plt.show()
import random, sys, time, threading
import numpy as np
from scipy.stats import loguniform
# Number of tasks in each task set
n = 16
# Fail early with a usage line instead of an IndexError on a missing argument
if len(sys.argv) < 4:
    sys.exit("usage: %s <total utilization> <run time in seconds> <RMS|SPTF|MUF>" % sys.argv[0])
# Total utilization (e.g. 0.1)
U0 = float(sys.argv[1])
# Time to run the algorithm for and consequently the time that caps the maximum period possible
# (1000 is the value we have been using, around 16 minutes)
ts = int(sys.argv[2])
# Values possible: RMS, SPTF, MUF
sched = sys.argv[3]
# An unknown policy would otherwise only surface later as an undefined
# priority list, so reject it here
if sched not in ("RMS", "SPTF", "MUF"):
    sys.exit("unknown scheduling policy %r (expected RMS, SPTF or MUF)" % sched)
# Fixed-point helpers: sampled values are turned into integers and later divided by their total so that they sum to U
def bigint(num):
    """Convert *num* to a fixed-point integer carrying 3 decimal digits.

    The value is rounded to 3 decimal places and then scaled by 10**3,
    e.g. 0.1 becomes 100.
    """
    # Round off floats to 3 decimal places
    num = float('%.3f' % num)
    # The scaled product can land a hair below the intended integer because
    # of binary floating point, so round to nearest instead of truncating.
    return int(round(num * 10**3))
# Round off to three places
def round3(num):
    """Return *num* rounded to 3 decimal places, as a float."""
    return float(format(num, '.3f'))
# Target utilization as a 3-decimal fixed-point integer
U = bigint(U0)
# Keep drawing until every task ends up with a strictly positive utilization
while True:
    # Raw Beta(1, 100) samples as 3-decimal fixed-point integers
    u = [bigint(random.betavariate(1, 100)) for _ in range(n)]
    uts = sum(u)
    if uts == 0:
        # Every sample rounded down to 0.000, so there is nothing to scale;
        # draw a fresh vector instead of dividing by zero
        continue
    # Rescale so the values sum to U, then round each one to 3 places
    u = [round3(((raw / uts) * U) / 10**3) for raw in u]
    sumt = sum(u)
    # Add the round off for one task when sum is slightly larger or lesser than U
    offset = round3(sumt - U0)
    ind = random.randint(0, n - 1)
    u[ind] = round3(u[ind] - offset)
    # Our rounding off and computations could result in some invalid utilization vectors
    # So regenerate in case that happens, or break out of loop if u values are legit
    if all(value > 0 for value in u):
        break
print("Util vectors: ", u)
if sched == "RMS":
    # Schedulability test: multiply (u_i + 1) over all tasks and compare
    # the product against 2
    check = 1
    for util in u:
        check *= util + 1
    if check > 2:
        print("Some tasks might be missed because bound is: ", check)
    else:
        print("Schedulable with Ubound: ", check)
# The question asks for periods between 10 and 100000, but the upper limit
# has to stay below the time the simulation runs for, so periods are drawn
# log-uniformly between 1 and 5 and kept to 2 decimal places
P = [float('%.2f' % period) for period in loguniform.rvs(1, 5, size=n)]
print("Task periods: ", P)
# Computation time of each task is utilization * period, kept to 3 decimals
C = [float('%.3f' % (util * period)) for util, period in zip(u, P)]
print("Task computation: ", C)
# Calculate priorities based on if RMS or SPTF or MUF
def _priority_ranks(values, largest_first=False):
    """Return 1-based ranks for *values*; rank 1 goes to the smallest value.

    With largest_first=True rank 1 goes to the largest value instead. Equal
    values are ranked in index order (the sort is stable), which matches
    repeatedly picking the first minimum/maximum.
    """
    order = sorted(range(len(values)), key=lambda idx: values[idx],
                   reverse=largest_first)
    ranks = [0] * len(values)
    for rank, idx in enumerate(order, start=1):
        ranks[idx] = rank
    return ranks


# Pr[i] is the priority of task i; the schedulers pick the smallest number
if sched == "SPTF":
    # Shortest computation time first
    Pr = _priority_ranks(C)
elif sched == "RMS":
    # Shortest period first
    Pr = _priority_ranks(P)
elif sched == "MUF":
    # Largest utilization first
    Pr = _priority_ranks(u, largest_first=True)
else:
    # Without this, Pr would simply be undefined for an unknown policy
    raise ValueError("Unknown scheduling policy: %r" % sched)
print("Task priorities", Pr)
# Shared state of the simulation, filled in by the arrival and scheduler threads.
# Pending jobs, each stored as [priority, remaining computation time, task index]
ready_q = []
# Queue entries that were dropped because a newer job of the same priority arrived
missed_tasks = []
# Task indices of jobs that ran to completion
completed_tasks = []
def print_current_sec():
    """Print the current wall-clock time once per second, forever."""
    while True:
        now = time.ctime()
        print(now)
        time.sleep(1)
def arrive_task(i, P, C, Pr):
    """Release a new job of task *i* once per period, forever.

    i  -- task index
    P  -- periods, indexed by task; P[i] is slept between releases
    C  -- computation times, indexed by task
    Pr -- priorities, indexed by task

    Every release appends [Pr[i], C[i], i] to the shared ready_q.
    """
    while True:
        print("Task %d has arrived" % i)
        job = [Pr[i], C[i], i]
        ready_q.append(job)
        time.sleep(P[i])
time_slice = 0.001 # Scheduler tick passed to time.sleep; can't be smaller because time.sleep on Linux is accurate only to 1 ms
def preemptive():
    """Run the preemptive scheduler loop forever, one time_slice per tick.

    Each tick:
      1. Any queued job with the same priority as the newest queue entry is
         treated as having missed its deadline: it is moved to missed_tasks
         and dropped from ready_q.
      2. The smallest entry of ready_q (lowest priority number first) gets
         one time_slice of execution; once its remaining time falls below a
         time_slice it is recorded in completed_tasks and removed.

    NOTE(review): ready_q is shared with the arrival threads without a lock;
    this presumably relies on list operations being atomic under the GIL.
    """
    while True:
        if ready_q:
            # Decide what is stale from a snapshot, then remove. Removing
            # entries while indexing over the original length skips
            # neighbours and can run past the end of the shrunken list.
            snapshot = list(ready_q)
            newest = snapshot[-1]
            stale = [job for job in snapshot[:-1] if job[0] == newest[0]]
            for job in stale:
                missed_tasks.append(job)
                print("Task %d has missed deadline and was removed" % job[2])
                ready_q.remove(job)
            # Give the highest-priority job one slice of processor time
            current = min(ready_q)
            current[1] = current[1] - time_slice
            if current[1] < time_slice:
                print("Task %d was removed" % current[2])
                completed_tasks.append(current[2])
                ready_q.remove(current)
        # Tick once per loop, also when idle, so an empty queue does not
        # turn into a busy spin
        time.sleep(time_slice)
def nonpreemtive():
    """Run the non-preemptive scheduler loop forever.

    Each round:
      1. Any queued job with the same priority as the newest queue entry is
         treated as having missed its deadline: it is moved to missed_tasks
         and dropped from ready_q.
      2. The smallest entry of ready_q (lowest priority number first) is
         run to completion in time_slice steps, recorded in completed_tasks
         and removed.

    NOTE(review): ready_q is shared with the arrival threads without a lock;
    this presumably relies on list operations being atomic under the GIL.
    """
    while True:
        if not ready_q:
            # Nothing to run: wait one tick instead of spinning on the CPU
            time.sleep(time_slice)
            continue
        # Decide what is stale from a snapshot, then remove. Removing
        # entries while indexing over the original length skips neighbours
        # and can run past the end of the shrunken list.
        snapshot = list(ready_q)
        newest = snapshot[-1]
        stale = [job for job in snapshot[:-1] if job[0] == newest[0]]
        for job in stale:
            missed_tasks.append(job)
            print("Task %d has missed deadline and was removed" % job[2])
            ready_q.remove(job)
        # Run the highest-priority job until it is finished
        current = min(ready_q)
        while current[1] > time_slice:
            current[1] = current[1] - time_slice
            time.sleep(time_slice)
        print("Task %d was removed" % current[2])
        completed_tasks.append(current[2])
        ready_q.remove(current)


# Correctly spelled alias; the original (misspelled) name stays available
nonpreemptive = nonpreemtive
# Run simulation
gtime = time.time()
# One daemon thread per task keeps releasing that task's jobs
for task_id in range(n):
    arrival = threading.Thread(target=arrive_task, args=(task_id, P, C, Pr), daemon=True)
    arrival.start()
# The preemptive scheduler is the one in use. print_current_sec (wall-clock
# trace) and nonpreemtive (alternative scheduler) can be started the same way.
scheduler = threading.Thread(target=preemptive, daemon=True)
scheduler.start()
# Let the daemon threads work for ts seconds, then report and exit
time.sleep(ts)
print("# missed tasks: ", len(missed_tasks))
print("# completed tasks", len(completed_tasks))
print("Scheduling time elapsed:", time.time()-gtime)
import random, sys
import numpy as np
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
from matplotlib.pyplot import cm
# Fixed-point helper: sampled values are turned into integers and later divided by their total so that each task set sums to U
def bigint(num):
    """Convert *num* to a fixed-point integer carrying 10 decimal digits.

    The value is rounded to 10 decimal places and then scaled by 10**10,
    e.g. 0.1 becomes 1000000000.
    """
    # Round off floats to 10 decimal places
    num = float('%.10f' % num)
    # The scaled product can land a hair below the intended integer because
    # of binary floating point, so round to nearest instead of truncating.
    return int(round(num * 10**10))
# Fix the utilization of the task set (kept as a fixed-point integer)
U = bigint(0.1)
# Number of task sets
T = 10000
# Number of tasks in task set
n = 3
alphas = [1., 1., 1.]
u = []
for j in range(T):
    # One Dirichlet(alphas) draw per task set, as fixed-point integers
    parts = [bigint(part) for part in np.random.dirichlet(alphas)]
    parts_total = sum(parts)
    # Divide by the total and scale by U; the final division undoes the
    # fixed-point factor carried by U
    u.append([((part / parts_total) * U) / 10**10 for part in parts])
# 3D scatter of the sampled utilization vectors, one point per task set
fig = plt.figure()
ax = plt.axes(projection='3d')
# Split the vectors into their three coordinates
xd = [point[0] for point in u]
yd = [point[1] for point in u]
zd = [point[2] for point in u]
ax.scatter3D(xd, yd, zd, s=0.2, c='blue')
# ax is the current axes, so flip the x and z axes on it directly
ax.invert_xaxis()
ax.invert_zaxis()
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment