Created
March 16, 2024 00:39
-
-
Save kennyyu/e424a0d7155a8951d51944869e8d87e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from collections import deque | |
from dataclasses import dataclass | |
# Patient arrivals per hour of the day.
# Keys are hours on a 24-hour clock (0 == midnight, 1 == 1am, etc.);
# values are how many patients arrive at some minute within that hour.
# Hours that are not listed produce no patients.
NUM_PATIENTS_PER_HOUR = {
    4: 2,    # 4am
    5: 11,   # 5am
    6: 36,   # 6am
    7: 47,   # 7am
    8: 61,   # 8am
    9: 75,   # 9am
    10: 75,  # 10am
    11: 58,  # 11am
    12: 36,  # noon
    13: 25,  # 1pm
    14: 12,  # 2pm
    15: 3,   # 3pm
    16: 1,   # 4pm
}
# Providers on duty per hour of the day.
# Keys are hours on a 24-hour clock (same convention as the patient table);
# values are how many providers work during that hour.
# Hours that are not listed have no providers.
NUM_PROVIDERS_PER_HOUR = {
    4: 1,    # 4am
    5: 1,    # 5am
    6: 12,   # 6am
    7: 16,   # 7am
    8: 21,   # 8am
    9: 3,    # 9am
    10: 3,   # 10am
    11: 5,   # 11am
    12: 5,   # noon
    13: 9,   # 1pm
    14: 4,   # 2pm
    15: 1,   # 3pm
    16: 1,   # 4pm
}
# Duration of a single patient visit, in minutes
PATIENT_VISIT_TIME_MIN = 20

# Number of patients one provider can see simultaneously; every provider is
# modeled as this many independent workers
NUM_WORKERS_PER_PROVIDER = 3

# Wait threshold in minutes: a patient who waits longer than this is counted
# as having waited too long
PATIENT_WAIT_TIME_MIN = 12

### Adjust the simulation variables above
######################################################
@dataclass | |
class Patient: | |
# index into PATIENTS | |
id: int | |
# Time the patient arrived | |
arrival_min: int | |
# Time the patient's visit started | |
# None if it hasn't started yet | |
visit_started_min: int | None | |
# Time the patient's visit finished | |
# None if it hasn't finished yet | |
visit_finished_min: int | None | |
@dataclass | |
class Worker: | |
# index into WORKERS | |
id: int | |
# Time the worker starts | |
shift_start_min: int | |
# Time the worker stops working. If the worker is still seeing a patient, | |
# the worker will stop after the patient visit is finished. | |
shift_end_min: int | |
# Current patient id the worker is seeing, if there is one | |
current_patient_id: int | None | |
def make_patients(num_patients_per_hour: dict[int, int]) -> list[Patient]:
    """Create every patient for the day, each arriving at a random minute.

    Args:
        num_patients_per_hour: Maps an hour of the day (0 == midnight) to the
            number of patients arriving during that hour.

    Returns:
        One Patient per requested arrival, in the mapping's iteration order.
        Ids are assigned sequentially from 0, so a patient's id equals its
        position in the returned list. Arrival times are minutes since
        midnight, picked uniformly within the patient's hour. Visit
        timestamps start out as None.
    """
    # One random draw per patient, taken hour by hour
    arrival_minutes = [
        hour * 60 + random.randint(0, 59)
        for hour, count in num_patients_per_hour.items()
        for _ in range(count)
    ]
    return [
        Patient(
            id=index,
            arrival_min=arrival,
            visit_started_min=None,
            visit_finished_min=None,
        )
        for index, arrival in enumerate(arrival_minutes)
    ]
def make_workers(num_providers_per_hour: dict[int, int]) -> list[Worker]:
    """Create the workers for the day from the hourly provider counts.

    Every provider contributes NUM_WORKERS_PER_PROVIDER workers. Each hour
    gets its own set of workers whose shift covers exactly that hour, so a
    provider present in two consecutive hours is modeled as separate workers.

    Args:
        num_providers_per_hour: Maps an hour of the day (0 == midnight) to the
            number of providers working during that hour.

    Returns:
        All workers in the mapping's iteration order. Ids are assigned
        sequentially from 0, so a worker's id equals its position in the
        returned list. Every worker starts out idle.
    """
    roster = []
    for hour, provider_count in num_providers_per_hour.items():
        shift_start = hour * 60
        shift_end = (hour + 1) * 60
        slots = provider_count * NUM_WORKERS_PER_PROVIDER
        for _ in range(slots):
            # len(roster) is read before the append, so it is the next free id
            roster.append(
                Worker(
                    id=len(roster),
                    shift_start_min=shift_start,
                    shift_end_min=shift_end,
                    current_patient_id=None,
                )
            )
    return roster
# Module-level simulation state, built once when the module is loaded.
# PATIENTS is indexed by patient id; WORKERS is indexed by worker id.
PATIENTS = make_patients(NUM_PATIENTS_PER_HOUR)
WORKERS = make_workers(NUM_PROVIDERS_PER_HOUR)
def patient_tick(current_min: int, queue: deque[int], patient: Patient) -> None:
    """Put the patient in the waiting queue on the minute they arrive.

    Args:
        current_min: Minute of the day being simulated.
        queue: Ids of waiting patients, oldest first; the patient's id is
            appended to the right end.
        patient: Patient to check.
    """
    if patient.arrival_min != current_min:
        # Not this patient's arrival minute: nothing to do
        return
    queue.append(patient.id)
def worker_tick(
    current_min: int,
    queue: deque[int],
    worker: Worker,
    *,
    patients: list[Patient] | None = None,
    visit_time_min: int | None = None,
) -> None:
    """Advance one worker by one simulated minute.

    A worker does at most one thing per tick: start a visit with the patient
    at the front of the queue, or finish the visit in progress. A worker who
    finishes a visit in this tick does not start another until a later tick.

    Args:
        current_min: Minute of the day being simulated.
        queue: Ids of waiting patients, oldest first. The front id is removed
            when the worker starts a visit.
        worker: Worker to advance; its current_patient_id is updated in place.
        patients: Patient records indexed by patient id. Defaults to the
            module-level PATIENTS list.
        visit_time_min: Visit duration in minutes. Defaults to
            PATIENT_VISIT_TIME_MIN.
    """
    if patients is None:
        patients = PATIENTS
    if visit_time_min is None:
        visit_time_min = PATIENT_VISIT_TIME_MIN

    # The shift has not started yet
    if current_min < worker.shift_start_min:
        return

    if worker.current_patient_id is None:
        # Past the end of the shift an idle worker takes no new patients;
        # only a visit already in progress is allowed to run over.
        if current_min >= worker.shift_end_min:
            return
        # No patients waiting
        if not queue:
            return
        # Take the longest-waiting patient and start their visit
        worker.current_patient_id = queue.popleft()
        patients[worker.current_patient_id].visit_started_min = current_min
        return

    # A visit is in progress: check whether it is finished.
    # ">=" rather than "==" so that a visit still completes when the exact
    # minute cannot be hit (e.g. a visit time of 0), instead of blocking the
    # worker forever.
    patient = patients[worker.current_patient_id]
    if current_min - patient.visit_started_min >= visit_time_min:
        patient.visit_finished_min = current_min
        worker.current_patient_id = None
def simulation():
    """Simulate one day minute by minute and print wait-time statistics.

    Each simulated minute, every patient in PATIENTS is ticked first (so new
    arrivals join the queue), then every worker in WORKERS. Patient and worker
    records are mutated in place.

    Printed afterwards: the total number of patients, how many were never
    seen, how many waited longer than PATIENT_WAIT_TIME_MIN before their visit
    started, and that last count as a percentage of all patients. Patients
    who were never seen are reported only in the "still waiting" line; they
    are not included in the "waiting too long" count.
    """
    # Number of minutes simulated: one full day, minutes 0 through 24 * 60 - 1
    end_min = 24 * 60

    # Ids of the patients currently waiting, oldest first
    queue: deque[int] = deque()

    # Every minute, run the simulation for all patients and workers
    for current_min in range(end_min):
        for patient in PATIENTS:
            patient_tick(current_min, queue, patient)
        for worker in WORKERS:
            worker_tick(current_min, queue, worker)

    # Aggregate results
    num_patients_still_waiting = sum(
        1 for patient in PATIENTS if patient.visit_started_min is None
    )
    num_patients_waiting_too_long = sum(
        1
        for patient in PATIENTS
        if patient.visit_started_min is not None
        and patient.visit_started_min - patient.arrival_min > PATIENT_WAIT_TIME_MIN
    )

    # With no patients at all there is nothing to divide by; report 0%
    total_patients = len(PATIENTS)
    if total_patients:
        pct_waiting_too_long = num_patients_waiting_too_long * 100 / total_patients
    else:
        pct_waiting_too_long = 0.0

    print(f"total patients: {total_patients}")
    print(f"num patients still waiting: {num_patients_still_waiting}")
    print(f"num patients waiting too long: {num_patients_waiting_too_long}")
    print(f"pct patients waiting too long: {pct_waiting_too_long}%")
# Run the simulation only when this file is executed as a script, so that
# importing the module does not trigger a full run.
if __name__ == "__main__":
    simulation()
Author
kennyyu
commented
Mar 16, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment