Skip to content

Instantly share code, notes, and snippets.

@simonLeary42
Last active May 6, 2023 17:15
Show Gist options
  • Save simonLeary42/9203b8d7afcdb4272730d6c5d6118ae3 to your computer and use it in GitHub Desktop.
Save simonLeary42/9203b8d7afcdb4272730d6c5d6118ae3 to your computer and use it in GitHub Desktop.
A simulation of Rate Monotonic and Earliest Deadline First scheduling algorithms
import random
import bisect
import math
N = 4 # number of different tasks
T_MAX = 25 # how many time steps to simulate
MIN_PERIOD = 2 #integer!
MAX_PERIOD = 9 #integer!
class Task:
def __init__(self, exec_time: float, period: int, name: str):
self.exec_time = exec_time
self.period = period
self.name = name
class RuntimeTask:
def __init__(self, exec_time: float, period: int, deadline: int, name: str):
self.exec_time = exec_time
self.period = period
self.deadline = deadline
self.name = name
def __str__(self):
return f"task {self.name}: period={self.period}, deadline={self.deadline}, time_remaining={self.exec_time}"
def compare_lowest_period(x: RuntimeTask, y: RuntimeTask, time: int):
# negative -> choose x, positive -> choose y, 0 -> choose either
return x.period - y.period
def compare_earliest_deadline(x: RuntimeTask, y: RuntimeTask, time: int):
# negative -> choose x, positive -> choose y, 0 -> choose either
return (x.deadline - time) - (y.deadline - time)
def is_name_in_list(name: str, _list) -> bool:
for elem in _list:
if elem.name == name:
return True
return False
def schedule_taks(task_list, comparison) -> bool:
runtime_tasks = []
for t in range(T_MAX):
print(f"time {t}")
# add new runtime tasks if this moment in time is a multiple of their period
for task in task_list:
if t == 0 or t % task.period == 0:
runtime_task = RuntimeTask(task.exec_time, task.period, t + task.period, task.name)
print(f" inserting task {task.name}")
if is_name_in_list(task.name, runtime_tasks):
print(f"scheduling failed! task {task.name} is already on the queue!")
return False
# insert the runtime task into the sorted list using the comparison
bisect.insort(runtime_tasks, runtime_task, key=lambda x: comparison(x, runtime_task, t))
for x in runtime_tasks:
print(f" {x}")
# decide what to do with this one unit of CPU time
cpu_time = 1
while(cpu_time > 0):
if len(runtime_tasks) == 0:
print(" no tasks to complete!")
break
lowest_period_task = runtime_tasks[0]
# how much time I have to spend, or how much time it takes to complete this task
time_spent = min([cpu_time, lowest_period_task.exec_time])
print(f" spending {time_spent} units of time on task {lowest_period_task.name}")
cpu_time -= time_spent
lowest_period_task.exec_time -= time_spent
if lowest_period_task.exec_time == 0:
print(f" completed task {lowest_period_task.name}")
runtime_tasks.pop(0)
print("scheduling completed successfully.")
return True
def main():
# generate N positive integers and then normalize
rand_ints = [ random.randint(5,95) for i in range(N) ]
sum_ints = sum(rand_ints)
norm_ints = [ x/sum_ints for x in rand_ints ]
fractions_cpu_time = [ math.floor(x * 100)/100.0 for x in norm_ints ] # 2 decimal places
tasks = []
for i in range(N):
this_fraction_cpu_time = fractions_cpu_time[i]
this_period = random.randint(MIN_PERIOD, MAX_PERIOD)
# exec_time / period = fraction_cpu_time
# exec_time = fraction_cpu_time * period
this_exec_time = this_period * this_fraction_cpu_time
print(f"task {i}: e={this_exec_time}, P={this_period}, e/P={this_fraction_cpu_time}")
tasks.append(Task(this_exec_time, this_period, str(i)))
print(f"sum of all e/P: {sum(fractions_cpu_time)}")
print()
print("rate monotonic scheduling:")
rm_success = schedule_taks(tasks, compare_lowest_period)
print()
print("earliest deadline first scheduling:")
edf_success = schedule_taks(tasks, compare_earliest_deadline)
print()
print(f"RM: {rm_success}, EDF: {edf_success}")
if __name__=="__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment