Last active
May 6, 2023 17:15
-
-
Save simonLeary42/9203b8d7afcdb4272730d6c5d6118ae3 to your computer and use it in GitHub Desktop.
A simulation of Rate Monotonic and Earliest Deadline First scheduling algorithms
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import bisect | |
import math | |
N = 4 # number of different tasks | |
T_MAX = 25 # how many time steps to simulate | |
MIN_PERIOD = 2 #integer! | |
MAX_PERIOD = 9 #integer! | |
class Task: | |
def __init__(self, exec_time: float, period: int, name: str): | |
self.exec_time = exec_time | |
self.period = period | |
self.name = name | |
class RuntimeTask: | |
def __init__(self, exec_time: float, period: int, deadline: int, name: str): | |
self.exec_time = exec_time | |
self.period = period | |
self.deadline = deadline | |
self.name = name | |
def __str__(self): | |
return f"task {self.name}: period={self.period}, deadline={self.deadline}, time_remaining={self.exec_time}" | |
def compare_lowest_period(x: RuntimeTask, y: RuntimeTask, time: int): | |
# negative -> choose x, positive -> choose y, 0 -> choose either | |
return x.period - y.period | |
def compare_earliest_deadline(x: RuntimeTask, y: RuntimeTask, time: int): | |
# negative -> choose x, positive -> choose y, 0 -> choose either | |
return (x.deadline - time) - (y.deadline - time) | |
def is_name_in_list(name: str, _list) -> bool: | |
for elem in _list: | |
if elem.name == name: | |
return True | |
return False | |
def schedule_taks(task_list, comparison) -> bool: | |
runtime_tasks = [] | |
for t in range(T_MAX): | |
print(f"time {t}") | |
# add new runtime tasks if this moment in time is a multiple of their period | |
for task in task_list: | |
if t == 0 or t % task.period == 0: | |
runtime_task = RuntimeTask(task.exec_time, task.period, t + task.period, task.name) | |
print(f" inserting task {task.name}") | |
if is_name_in_list(task.name, runtime_tasks): | |
print(f"scheduling failed! task {task.name} is already on the queue!") | |
return False | |
# insert the runtime task into the sorted list using the comparison | |
bisect.insort(runtime_tasks, runtime_task, key=lambda x: comparison(x, runtime_task, t)) | |
for x in runtime_tasks: | |
print(f" {x}") | |
# decide what to do with this one unit of CPU time | |
cpu_time = 1 | |
while(cpu_time > 0): | |
if len(runtime_tasks) == 0: | |
print(" no tasks to complete!") | |
break | |
lowest_period_task = runtime_tasks[0] | |
# how much time I have to spend, or how much time it takes to complete this task | |
time_spent = min([cpu_time, lowest_period_task.exec_time]) | |
print(f" spending {time_spent} units of time on task {lowest_period_task.name}") | |
cpu_time -= time_spent | |
lowest_period_task.exec_time -= time_spent | |
if lowest_period_task.exec_time == 0: | |
print(f" completed task {lowest_period_task.name}") | |
runtime_tasks.pop(0) | |
print("scheduling completed successfully.") | |
return True | |
def main(): | |
# generate N positive integers and then normalize | |
rand_ints = [ random.randint(5,95) for i in range(N) ] | |
sum_ints = sum(rand_ints) | |
norm_ints = [ x/sum_ints for x in rand_ints ] | |
fractions_cpu_time = [ math.floor(x * 100)/100.0 for x in norm_ints ] # 2 decimal places | |
tasks = [] | |
for i in range(N): | |
this_fraction_cpu_time = fractions_cpu_time[i] | |
this_period = random.randint(MIN_PERIOD, MAX_PERIOD) | |
# exec_time / period = fraction_cpu_time | |
# exec_time = fraction_cpu_time * period | |
this_exec_time = this_period * this_fraction_cpu_time | |
print(f"task {i}: e={this_exec_time}, P={this_period}, e/P={this_fraction_cpu_time}") | |
tasks.append(Task(this_exec_time, this_period, str(i))) | |
print(f"sum of all e/P: {sum(fractions_cpu_time)}") | |
print() | |
print("rate monotonic scheduling:") | |
rm_success = schedule_taks(tasks, compare_lowest_period) | |
print() | |
print("earliest deadline first scheduling:") | |
edf_success = schedule_taks(tasks, compare_earliest_deadline) | |
print() | |
print(f"RM: {rm_success}, EDF: {edf_success}") | |
if __name__=="__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment