Skip to content

Instantly share code, notes, and snippets.

@devster31
Last active November 16, 2021 16:25
Show Gist options
  • Save devster31/5fcd76310180fdff1654ee993be34397 to your computer and use it in GitHub Desktop.
Save devster31/5fcd76310180fdff1654ee993be34397 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Creates a shift scheduling problem and solves it."""
from absl import app
from absl import flags
from ortools.sat.python import cp_model
from google.protobuf import text_format
# Shared absl flag container; solve_shift_scheduling receives FLAGS.params via main().
FLAGS = flags.FLAGS
# --params holds CP-SAT solver parameters in protobuf text format; the value is
# parsed into solver.parameters with text_format.Parse before solving.
flags.DEFINE_string("params", "max_time_in_seconds:10.0", "Sat solver parameters.")
def negated_bounded_span(works, start, length):
    """Builds the clause that rules out one isolated run of variables set to True.

    The span works[start : start + length] is taken in negated form. When a
    variable exists immediately before and/or after the span, it is added in
    non-negated form so that the clause only targets a run of exactly this
    length (bounded by False neighbours or by the ends of works).

    Args:
      works: a list of Boolean variables to extract the span from.
      start: index of the first variable of the span.
      length: number of variables in the span.

    Returns:
      A list of literals. Their disjunction is false exactly when every
      variable of the span is True and each existing neighbour is False.
    """
    end = start + length
    # Left neighbour, absent when the span begins at the start of works.
    left_border = [works[start - 1]] if start > 0 else []
    negated_span = [works[i].Not() for i in range(start, end)]
    # Right neighbour, absent when the span reaches the end of works.
    right_border = [works[end]] if end < len(works) else []
    return left_border + negated_span + right_border
def add_soft_sequence_constraint(
    model, works, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost, prefix
):
    """Constrains the length of every maximal run of True variables in works.

    Runs shorter than hard_min or longer than hard_max are forbidden. Runs
    whose length lies in [hard_min, soft_min) or (soft_max, hard_max] are
    allowed, but each one forces a penalty literal to True.

    Args:
      model: the model the constraints are added to.
      works: a list of Boolean variables.
      hard_min: minimum allowed length of a run of True variables.
      soft_min: preferred minimum length; shorter runs are penalized
        proportionally to the shortfall.
      min_cost: penalty per unit of length below soft_min. No under-length
        penalty is created when this is not positive.
      soft_max: preferred maximum length; longer runs are penalized
        proportionally to the excess.
      hard_max: maximum allowed length of a run of True variables.
      max_cost: penalty per unit of length above soft_max. No over-length
        penalty is created when this is not positive.
      prefix: a base name for the penalty literals.

    Returns:
      A tuple (literals, coefficients) of equal-length lists: the penalty
      literals created and the cost attached to each of them.
    """
    num_vars = len(works)
    penalty_literals = []
    penalty_weights = []

    # Hard lower bound: an isolated run of any length below hard_min is illegal.
    for run_length in range(1, hard_min):
        for first in range(num_vars - run_length + 1):
            model.AddBoolOr(negated_bounded_span(works, first, run_length))

    # Soft lower bound: an isolated run in [hard_min, soft_min) is tolerated
    # only if its dedicated penalty literal is switched on.
    if min_cost > 0:
        for run_length in range(hard_min, soft_min):
            # The cost grows linearly with the distance to soft_min.
            shortfall_cost = min_cost * (soft_min - run_length)
            for first in range(num_vars - run_length + 1):
                clause = negated_bounded_span(works, first, run_length)
                label = ": under_span(start=%i, length=%i)" % (first, run_length)
                penalty = model.NewBoolVar(prefix + label)
                model.AddBoolOr(clause + [penalty])
                penalty_literals.append(penalty)
                penalty_weights.append(shortfall_cost)

    # Soft upper bound: same mechanism for isolated runs in (soft_max, hard_max].
    if max_cost > 0:
        for run_length in range(soft_max + 1, hard_max + 1):
            # The cost grows linearly with the excess over soft_max.
            overrun_cost = max_cost * (run_length - soft_max)
            for first in range(num_vars - run_length + 1):
                clause = negated_bounded_span(works, first, run_length)
                label = ": over_span(start=%i, length=%i)" % (first, run_length)
                penalty = model.NewBoolVar(prefix + label)
                model.AddBoolOr(clause + [penalty])
                penalty_literals.append(penalty)
                penalty_weights.append(overrun_cost)

    # Hard upper bound: no window of hard_max + 1 consecutive variables may be
    # entirely True.
    for first in range(num_vars - hard_max):
        window_end = first + hard_max + 1
        model.AddBoolOr([works[i].Not() for i in range(first, window_end)])

    return penalty_literals, penalty_weights
def add_soft_sum_constraint(
    model, works, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost, prefix
):
    """Sum constraint with soft and hard bounds.

    Counts the variables of works that are assigned to True. Sums below
    hard_min or above hard_max are forbidden; sums below soft_min or above
    soft_max are allowed but create linear penalty terms.

    The domains of all auxiliary variables are derived from the hard and soft
    bounds rather than from a fixed window size, so the constraint is valid
    for any number of variables in works (not only for 7-day weeks).

    Args:
      model: the model the constraint is added to.
      works: a list of Boolean variables.
      hard_min: the sum of true variables must be at least hard_min.
      soft_min: the sum should be at least soft_min, or a linear penalty on
        the shortfall is added to the objective.
      min_cost: the coefficient of the linear penalty if the sum is less than
        soft_min.
      soft_max: the sum should be at most soft_max, or a linear penalty on the
        excess is added to the objective.
      hard_max: the sum of true variables must be at most hard_max.
      max_cost: the coefficient of the linear penalty if the sum is more than
        soft_max.
      prefix: a base name for penalty variables.

    Returns:
      A tuple (variables_list, coefficient_list) containing the penalty
      variables created by the constraint and their objective coefficients.
    """
    cost_variables = []
    cost_coefficients = []

    # The domain [hard_min, hard_max] of sum_var is what enforces the hard
    # bounds on the sum.
    sum_var = model.NewIntVar(hard_min, hard_max, "")
    model.Add(sum_var == sum(works))

    # Penalize sums below the soft_min target.
    if soft_min > hard_min and min_cost > 0:
        # sum_var lies in [hard_min, hard_max], hence
        # soft_min - sum_var lies in [soft_min - hard_max, soft_min - hard_min].
        max_shortfall = soft_min - hard_min
        delta = model.NewIntVar(soft_min - hard_max, max_shortfall, "")
        model.Add(delta == soft_min - sum_var)
        # TODO(user): Compare efficiency with only excess >= soft_min - sum_var.
        excess = model.NewIntVar(0, max_shortfall, prefix + ": under_sum")
        model.AddMaxEquality(excess, [delta, 0])
        cost_variables.append(excess)
        cost_coefficients.append(min_cost)

    # Penalize sums above the soft_max target.
    if soft_max < hard_max and max_cost > 0:
        # sum_var - soft_max lies in [hard_min - soft_max, hard_max - soft_max].
        max_overrun = hard_max - soft_max
        delta = model.NewIntVar(hard_min - soft_max, max_overrun, "")
        model.Add(delta == sum_var - soft_max)
        excess = model.NewIntVar(0, max_overrun, prefix + ": over_sum")
        model.AddMaxEquality(excess, [delta, 0])
        cost_variables.append(excess)
        cost_coefficients.append(max_cost)

    return cost_variables, cost_coefficients
def solve_shift_scheduling(params):
    """Builds the shift scheduling model, solves it and prints the result.

    The model assigns exactly one shift per day to each person over
    num_weeks weeks, subject to run-length constraints per shift, weekly
    sum constraints, weekend rules and daily cover demands. Soft-constraint
    violations are minimized through a weighted linear objective.

    Args:
      params: CP-SAT solver parameters in protobuf text format (for example
        "max_time_in_seconds:10.0"). A falsy value keeps the solver defaults.
    """
    # Data
    spec = ["Dan", "Mic", "Gio", "Alb", "Fra"]
    num_spec = len(spec)
    num_weeks = 4
    # "Rip" is the rest shift and must stay first: the cover constraints
    # below skip shifts[0].
    shifts = ["Rip", "Rep", "Pom", "AmA", "AmB"]
    num_days = num_weeks * 7

    # Shift constraints on continuous sequence:
    # (shift, hard_min, soft_min, min_penalty,
    #  soft_max, hard_max, max_penalty)
    shift_constraints = [
        # One or two consecutive days of rest, this is a hard constraint.
        ("Rip", 1, 1, 0, 2, 2, 0),
        # max 1 afternoon
        ("Pom", 1, 1, 0, 1, 1, 0),
        # min 1 max 5 days of reparto
        ("Rep", 1, 2, 2, 4, 5, 2),
    ]

    # Weekly sum constraints on shifts days:
    # (shift, hard_min, soft_min, min_penalty,
    #  soft_max, hard_max, max_penalty)
    weekly_sum_constraints = [
        # Constraints on rests per week.
        ("Rip", 1, 2, 7, 2, 2, 4),
    ]

    # Daily minimum demand for each working shift, one column per entry of
    # shifts[1:] (Rep, Pom, AmA, AmB), for each day of the week starting on
    # Monday.
    weekly_cover_demands = [
        (2, 1, 1, 0),  # Monday
        (2, 1, 1, 0),  # Tuesday
        (2, 1, 1, 0),  # Wednesday
        (2, 1, 1, 0),  # Thursday
        (2, 1, 1, 0),  # Friday
        (1, 0, 0, 0),  # Saturday
        (1, 0, 0, 0),  # Sunday
    ]

    # Penalty for exceeding the cover constraint per shift type
    # (same column order as weekly_cover_demands).
    excess_cover_penalties = (5, 5, 5, 0)

    model = cp_model.CpModel()

    # work[e, s, d] is True when person e is assigned shift s on day d.
    work = {}
    for e in spec:
        for s in shifts:
            for d in range(num_days):
                work[e, s, d] = model.NewBoolVar(f"work_{e}_{s}_{d}")

    # Linear terms of the objective in a minimization context.
    obj_int_vars = []
    obj_int_coeffs = []
    obj_bool_vars = []
    obj_bool_coeffs = []

    # Exactly one shift per day.
    for e in spec:
        for d in range(num_days):
            model.Add(sum(work[e, s, d] for s in shifts) == 1)

    # Shift constraints
    for ct in shift_constraints:
        shift, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = ct
        for e in spec:
            works = [work[e, shift, d] for d in range(num_days)]
            variables, coeffs = add_soft_sequence_constraint(
                model,
                works,
                hard_min,
                soft_min,
                min_cost,
                soft_max,
                hard_max,
                max_cost,
                f"shift_constraint(employee {e}, shift {shift})",
            )
            obj_bool_vars.extend(variables)
            obj_bool_coeffs.extend(coeffs)

    # Assign saturdays and sundays together.
    # NOTE(review): the original code reused the loop variable `shift` left
    # over from the shift-constraints loop above, whose final value is "Rep".
    # It is named explicitly here; confirm that only "Rep" is meant to be
    # paired across the weekend.
    weekend_shift = "Rep"
    for d in range(5, num_days, 7):  # d % 7 == 5, i.e. every Saturday.
        for e in spec:
            # Working the shift on Saturday implies working it on Sunday too.
            model.AddImplication(
                work[e, weekend_shift, d], work[e, weekend_shift, d + 1]
            )
        # On Saturdays every shift (rest included) is taken by exactly one person.
        for s in shifts:
            model.Add(sum(work[e, s, d] for e in spec) == 1)

    # Weekly sum constraints
    for ct in weekly_sum_constraints:
        shift, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = ct
        for e in spec:
            for w in range(num_weeks):
                works = [work[e, shift, d + w * 7] for d in range(7)]
                variables, coeffs = add_soft_sum_constraint(
                    model,
                    works,
                    hard_min,
                    soft_min,
                    min_cost,
                    soft_max,
                    hard_max,
                    max_cost,
                    f"weekly_sum_constraint(employee {e}, shift {shift}, week {w:d})",
                )
                obj_int_vars.extend(variables)
                obj_int_coeffs.extend(coeffs)

    # TODO: domains for weekend?
    # TODO: missing bans on employee assignments.
    # AddForbiddenAssignments expects a *list* of tuples, each with one value
    # per variable; passing a single flat tuple of ints fails inside the API.
    # NOTE(review): this forbids only the one assignment in which the person
    # works the shift on every single day. If the intent is that these people
    # never work "AmA"/"AmB", a different constraint is needed -- confirm.
    every_day_worked = [(1,) * num_days]
    for e in ["Gio", "Alb", "Fra"]:
        for s in ["AmA", "AmB"]:
            works = [work[e, s, d] for d in range(num_days)]
            model.AddForbiddenAssignments(works, every_day_worked)

    # Cover constraints (the rest shift shifts[0] has no demand).
    for column, s in enumerate(shifts[1:]):
        over_penalty = excess_cover_penalties[column]
        for w in range(num_weeks):
            for d in range(7):
                works = [work[e, s, w * 7 + d] for e in spec]
                min_demand = weekly_cover_demands[d][column]
                # The lower bound of the domain enforces the minimum demand.
                worked = model.NewIntVar(min_demand, num_spec, "")
                model.Add(worked == sum(works))
                if over_penalty > 0:
                    name = f"excess_demand(shift={s}, week={w:d}, day={d:d})"
                    excess = model.NewIntVar(0, num_spec - min_demand, name)
                    model.Add(excess == worked - min_demand)
                    obj_int_vars.append(excess)
                    obj_int_coeffs.append(over_penalty)

    # Objective
    model.Minimize(
        sum(var * coeff for var, coeff in zip(obj_bool_vars, obj_bool_coeffs))
        + sum(var * coeff for var, coeff in zip(obj_int_vars, obj_int_coeffs))
    )

    # Solve the model.
    solver = cp_model.CpSolver()
    if params:
        text_format.Parse(params, solver.parameters)
    solution_printer = cp_model.ObjectiveSolutionPrinter()
    status = solver.Solve(model, solution_printer)

    # Print solution.
    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        print()
        print(f" {'Mon Tue Wed Thu Fri Sat Sun ' * num_weeks}")
        for e in spec:
            schedule = "".join(
                s + " "
                for d in range(num_days)
                for s in shifts
                if solver.BooleanValue(work[e, s, d])
            )
            print(f"worker {e}: {schedule}")
        print()
        print("Penalties:")
        for var, penalty in zip(obj_bool_vars, obj_bool_coeffs):
            if solver.BooleanValue(var):
                if penalty > 0:
                    print(f" {var.Name()} violated, penalty={penalty}")
                else:
                    print(f" {var.Name()} fulfilled, gain={-penalty}")
        for var, coeff in zip(obj_int_vars, obj_int_coeffs):
            value = solver.Value(var)
            if value > 0:
                print(f" {var.Name()} violated by {value}, linear penalty={coeff}")

    print()
    print("Statistics")
    print(f" - status : {solver.StatusName(status)}")
    print(f" - conflicts : {solver.NumConflicts()}")
    print(f" - branches : {solver.NumBranches()}")
    print(f" - wall time : {solver.WallTime():f} s")
def main(_):
    """Runs the scheduling solver with the value of the --params flag.

    The positional argument is ignored.
    """
    solver_params = FLAGS.params
    solve_shift_scheduling(solver_params)
# Script entry point: hand control to absl, which invokes main().
if __name__ == "__main__":
    app.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment