Last active
November 16, 2021 16:25
-
-
Save devster31/5fcd76310180fdff1654ee993be34397 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Creates a shift scheduling problem and solves it.""" | |
from absl import app | |
from absl import flags | |
from ortools.sat.python import cp_model | |
from google.protobuf import text_format | |
FLAGS = flags.FLAGS | |
flags.DEFINE_string("params", "max_time_in_seconds:10.0", "Sat solver parameters.") | |
def negated_bounded_span(works, start, length): | |
"""Filters an isolated sub-sequence of variables assined to True. | |
Extract the span of Boolean variables [start, start + length), negate them, | |
and if there is variables to the left/right of this span, surround the span by | |
them in non negated form. | |
Args: | |
works: a list of variables to extract the span from. | |
start: the start to the span. | |
length: the length of the span. | |
Returns: | |
a list of variables which conjunction will be false if the sub-list is | |
assigned to True, and correctly bounded by variables assigned to False, | |
or by the start or end of works. | |
""" | |
sequence = [] | |
# Left border (start of works, or works[start - 1]) | |
if start > 0: | |
sequence.append(works[start - 1]) | |
for i in range(length): | |
sequence.append(works[start + i].Not()) | |
# Right border (end of works or works[start + length]) | |
if start + length < len(works): | |
sequence.append(works[start + length]) | |
return sequence | |
def add_soft_sequence_constraint( | |
model, works, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost, prefix | |
): | |
"""Sequence constraint on true variables with soft and hard bounds. | |
This constraint look at every maximal contiguous sequence of variables | |
assigned to true. If forbids sequence of length < hard_min or > hard_max. | |
Then it creates penalty terms if the length is < soft_min or > soft_max. | |
Args: | |
model: the sequence constraint is built on this model. | |
works: a list of Boolean variables. | |
hard_min: any sequence of true variables must have a length of at least | |
hard_min. | |
soft_min: any sequence should have a length of at least soft_min, or a | |
linear penalty on the delta will be added to the objective. | |
min_cost: the coefficient of the linear penalty if the length is less than | |
soft_min. | |
soft_max: any sequence should have a length of at most soft_max, or a linear | |
penalty on the delta will be added to the objective. | |
hard_max: any sequence of true variables must have a length of at most | |
hard_max. | |
max_cost: the coefficient of the linear penalty if the length is more than | |
soft_max. | |
prefix: a base name for penalty literals. | |
Returns: | |
a tuple (variables_list, coefficient_list) containing the different | |
penalties created by the sequence constraint. | |
""" | |
cost_literals = [] | |
cost_coefficients = [] | |
# Forbid sequences that are too short. | |
for length in range(1, hard_min): | |
for start in range(len(works) - length + 1): | |
model.AddBoolOr(negated_bounded_span(works, start, length)) | |
# Penalize sequences that are below the soft limit. | |
if min_cost > 0: | |
for length in range(hard_min, soft_min): | |
for start in range(len(works) - length + 1): | |
span = negated_bounded_span(works, start, length) | |
name = ": under_span(start=%i, length=%i)" % (start, length) | |
lit = model.NewBoolVar(prefix + name) | |
span.append(lit) | |
model.AddBoolOr(span) | |
cost_literals.append(lit) | |
# We filter exactly the sequence with a short length. | |
# The penalty is proportional to the delta with soft_min. | |
cost_coefficients.append(min_cost * (soft_min - length)) | |
# Penalize sequences that are above the soft limit. | |
if max_cost > 0: | |
for length in range(soft_max + 1, hard_max + 1): | |
for start in range(len(works) - length + 1): | |
span = negated_bounded_span(works, start, length) | |
name = ": over_span(start=%i, length=%i)" % (start, length) | |
lit = model.NewBoolVar(prefix + name) | |
span.append(lit) | |
model.AddBoolOr(span) | |
cost_literals.append(lit) | |
# Cost paid is max_cost * excess length. | |
cost_coefficients.append(max_cost * (length - soft_max)) | |
# Just forbid any sequence of true variables with length hard_max + 1 | |
for start in range(len(works) - hard_max): | |
model.AddBoolOr([works[i].Not() for i in range(start, start + hard_max + 1)]) | |
return cost_literals, cost_coefficients | |
def add_soft_sum_constraint( | |
model, works, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost, prefix | |
): | |
"""Sum constraint with soft and hard bounds. | |
This constraint counts the variables assigned to true from works. | |
If forbids sum < hard_min or > hard_max. | |
Then it creates penalty terms if the sum is < soft_min or > soft_max. | |
Args: | |
model: the sequence constraint is built on this model. | |
works: a list of Boolean variables. | |
hard_min: any sequence of true variables must have a sum of at least | |
hard_min. | |
soft_min: any sequence should have a sum of at least soft_min, or a linear | |
penalty on the delta will be added to the objective. | |
min_cost: the coefficient of the linear penalty if the sum is less than | |
soft_min. | |
soft_max: any sequence should have a sum of at most soft_max, or a linear | |
penalty on the delta will be added to the objective. | |
hard_max: any sequence of true variables must have a sum of at most | |
hard_max. | |
max_cost: the coefficient of the linear penalty if the sum is more than | |
soft_max. | |
prefix: a base name for penalty variables. | |
Returns: | |
a tuple (variables_list, coefficient_list) containing the different | |
penalties created by the sequence constraint. | |
""" | |
cost_variables = [] | |
cost_coefficients = [] | |
sum_var = model.NewIntVar(hard_min, hard_max, "") | |
# This adds the hard constraints on the sum. | |
model.Add(sum_var == sum(works)) | |
# Penalize sums below the soft_min target. | |
if soft_min > hard_min and min_cost > 0: | |
delta = model.NewIntVar(-len(works), len(works), "") | |
model.Add(delta == soft_min - sum_var) | |
# TODO(user): Compare efficiency with only excess >= soft_min - sum_var. | |
excess = model.NewIntVar(0, 7, prefix + ": under_sum") | |
model.AddMaxEquality(excess, [delta, 0]) | |
cost_variables.append(excess) | |
cost_coefficients.append(min_cost) | |
# Penalize sums above the soft_max target. | |
if soft_max < hard_max and max_cost > 0: | |
delta = model.NewIntVar(-7, 7, "") | |
model.Add(delta == sum_var - soft_max) | |
excess = model.NewIntVar(0, 7, prefix + ": over_sum") | |
model.AddMaxEquality(excess, [delta, 0]) | |
cost_variables.append(excess) | |
cost_coefficients.append(max_cost) | |
return cost_variables, cost_coefficients | |
def solve_shift_scheduling(params): | |
"""Solves the shift scheduling problem.""" | |
# Data | |
spec = ["Dan", "Mic", "Gio", "Alb", "Fra"] | |
num_spec = len(spec) | |
num_weeks = 4 | |
shifts = ["Rip", "Rep", "Pom", "AmA", "AmB"] | |
num_days = num_weeks * 7 | |
num_shifts = len(shifts) | |
# Shift constraints on continuous sequence: | |
# (shift, hard_min, soft_min, min_penalty, | |
# soft_max, hard_max, max_penalty) | |
shift_constraints = [ | |
# One or two consecutive days of rest, this is a hard constraint. | |
('Rip', 1, 1, 0, 2, 2, 0), | |
# max 1 afternoon | |
('Pom', 1, 1, 0, 1, 1, 0), | |
# min 1 max 5 days of reparto | |
('Rep', 1, 2, 2, 4, 5, 2), | |
] | |
# Weekly sum constraints on shifts days: | |
# (shift, hard_min, soft_min, min_penalty, | |
# soft_max, hard_max, max_penalty) | |
weekly_sum_constraints = [ | |
# Constraints on rests per week. | |
("Rip", 1, 2, 7, 2, 2, 4), | |
] | |
# daily demands for work shifts (morning, afternon, night) for each day | |
# of the week starting on Monday. | |
weekly_cover_demands = [ | |
(2, 1, 1, 0), # Monday | |
(2, 1, 1, 0), # Tuesday | |
(2, 1, 1, 0), # Wednesday | |
(2, 1, 1, 0), # Thursday | |
(2, 1, 1, 0), # Friday | |
(1, 0, 0, 0), # Saturday | |
(1, 0, 0, 0), # Sunday | |
] | |
# Penalty for exceeding the cover constraint per shift type. | |
excess_cover_penalties = (5, 5, 5, 0) | |
model = cp_model.CpModel() | |
work = {} | |
for e in spec: | |
for s in shifts: | |
for d in range(num_days): | |
work[e, s, d] = model.NewBoolVar(f"work_{e}_{s}_{d}") | |
# Linear terms of the objective in a minimization context. | |
obj_int_vars = [] | |
obj_int_coeffs = [] | |
obj_bool_vars = [] | |
obj_bool_coeffs = [] | |
# Exactly one shift per day. | |
for e in spec: | |
for d in range(num_days): | |
model.Add(sum(work[e, s, d] for s in shifts) == 1) | |
# Shift constraints | |
for ct in shift_constraints: | |
shift, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = ct | |
for e in spec: | |
works = [work[e, shift, d] for d in range(num_days)] | |
variables, coeffs = add_soft_sequence_constraint( | |
model, | |
works, | |
hard_min, | |
soft_min, | |
min_cost, | |
soft_max, | |
hard_max, | |
max_cost, | |
f"shift_constraint(employee {e}, shift {shift})", | |
) | |
obj_bool_vars.extend(variables) | |
obj_bool_coeffs.extend(coeffs) | |
# Assign saturdays and sundays together | |
for d in range(num_days): | |
if d % 7 == 5: | |
for e in spec: | |
model.AddImplication(work[e, shift, d], work[e, shift, d + 1]) | |
for s in shifts: | |
model.Add(sum(work[e, s, d] for e in spec) == 1) | |
# Weekly sum constraints | |
for ct in weekly_sum_constraints: | |
shift, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = ct | |
for e in spec: | |
for w in range(num_weeks): | |
works = [work[e, shift, d + w * 7] for d in range(7)] | |
variables, coeffs = add_soft_sum_constraint( | |
model, | |
works, | |
hard_min, | |
soft_min, | |
min_cost, | |
soft_max, | |
hard_max, | |
max_cost, | |
f"weekly_sum_constraint(employee {e}, shift {shift}, week {w:d})" | |
) | |
obj_int_vars.extend(variables) | |
obj_int_coeffs.extend(coeffs) | |
# domains for weekend? | |
# missing bans on employee assignments | |
for e in ["Gio", "Alb", "Fra"]: | |
for s in ["AmA", "AmB"]: | |
works = [work[e, s, d] for d in range(num_days)] | |
model.AddForbiddenAssignments(works, tuple([1 for x in range(num_days)])) | |
# Cover constraints | |
for s in shifts[1:]: | |
for w in range(num_weeks): | |
for d in range(7): | |
works = [work[e, s, w * 7 + d] for e in spec] | |
min_demand = weekly_cover_demands[d][shifts.index(s) - 1] # Ignore Off shift. | |
worked = model.NewIntVar(min_demand, num_spec, "") | |
model.Add(worked == sum(works)) | |
over_penalty = excess_cover_penalties[shifts.index(s) - 1] | |
if over_penalty > 0: | |
name = f"excess_demand(shift={s}, week={w:d}, day={d:d})" | |
excess = model.NewIntVar(0, num_spec - min_demand, name) | |
model.Add(excess == worked - min_demand) | |
obj_int_vars.append(excess) | |
obj_int_coeffs.append(over_penalty) | |
# Objective | |
model.Minimize( | |
sum(obj_bool_vars[i] * obj_bool_coeffs[i] for i in range(len(obj_bool_vars))) | |
+ sum(obj_int_vars[i] * obj_int_coeffs[i] for i in range(len(obj_int_vars))) | |
) | |
# Solve the model. | |
solver = cp_model.CpSolver() | |
if params: | |
text_format.Parse(params, solver.parameters) | |
solution_printer = cp_model.ObjectiveSolutionPrinter() | |
status = solver.Solve(model, solution_printer) | |
# Print solution. | |
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE: | |
print() | |
print(f" {'Mon Tue Wed Thu Fri Sat Sun ' * num_weeks}") | |
for e in spec: | |
schedule = "" | |
for d in range(num_days): | |
for s in shifts: | |
if solver.BooleanValue(work[e, s, d]): | |
schedule += shifts[shifts.index(s)] + " " | |
print(f"worker {e}: {schedule}") | |
print() | |
print("Penalties:") | |
for i, var in enumerate(obj_bool_vars): | |
if solver.BooleanValue(var): | |
penalty = obj_bool_coeffs[i] | |
if penalty > 0: | |
print(" %s violated, penalty=%i" % (var.Name(), penalty)) | |
else: | |
print(" %s fulfilled, gain=%i" % (var.Name(), -penalty)) | |
for i, var in enumerate(obj_int_vars): | |
if solver.Value(var) > 0: | |
print( | |
" %s violated by %i, linear penalty=%i" | |
% (var.Name(), solver.Value(var), obj_int_coeffs[i]) | |
) | |
print() | |
print("Statistics") | |
print(" - status : %s" % solver.StatusName(status)) | |
print(" - conflicts : %i" % solver.NumConflicts()) | |
print(" - branches : %i" % solver.NumBranches()) | |
print(" - wall time : %f s" % solver.WallTime()) | |
def main(_): | |
solve_shift_scheduling(FLAGS.params) | |
if __name__ == "__main__": | |
app.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment