Skip to content

Instantly share code, notes, and snippets.

@tommyod
Last active March 1, 2022 12:27
Show Gist options
  • Save tommyod/7d3ee88b7c08fadab6de1ea1e615a925 to your computer and use it in GitHub Desktop.
Save tommyod/7d3ee88b7c08fadab6de1ea1e615a925 to your computer and use it in GitHub Desktop.
Python 3.7 implementation of the Wagner-Whitin algorithm for the dynamic lot size problem
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The Wagner-Whitin algorithm for the dynamic lot size problem
--------------------------------------------------------------
@author: tommyod (https://github.com/tommyod)
An implementation of the Wagner-Whitin O(n^2) algorithm for the dynamic lot
size problem, also called the single-item uncapacitated lot-sizing problem.
Although the algorithm is O(n^2) in theory, it scales linearily with input
size in practise, as seen in the table below. The linear scaling is due to
using the Planning Horizon Theorem in the implementation.
Speed
-----
Periods(n) Time
10 => 45.4 µs
100 => 440 µs
1,000 => 4.53 ms
10,000 => 47.5 ms
100,000 => 495 ms
1,000,000 => 4.74 s
Correctness
-----------
The algorithm has been tested against the test case in the paper, and against
more naive (and slow) implementations. It gave the same answer as a naive
implementation on more than 10 million inputs.
For a description of the problem, see Wikipedia:
- https://en.wikipedia.org/wiki/Dynamic_lot-size_model
The reference is:
- Harvey M. Wagner and Thomson M. Whitin,
"Dynamic version of the economic lot size model,"
Management Science, Vol. 5, pp. 89–96, 1958
See also: https://tommyodland.com/articles/2021/the-dynamic-lot-size-problem
MIT License (https://choosealicense.com/licenses/mit/)
"""
# =============================================================================
# MAIN PROGRAM
# =============================================================================
import itertools
class CumSumList:
"""Fast sums from index i to j, by storing cumulative sums."""
def __init__(self, elements):
self.cumsums = [0] * len(elements)
partial_sum = 0
for i, element in enumerate(elements):
partial_sum += element
self.cumsums[i] = partial_sum
def sum_between(self, i, j):
"""Compute sum: elements[i] + elements[i+1] + ... + elements[j]"""
if i > j:
return 0
top = self.cumsums[j] if j < len(self.cumsums) else self.cumsums[-1]
low = self.cumsums[i - 1] if i >= 1 else 0
return top - low
def wagner_whitin(demands, order_costs, holding_costs):
"""Compute the optimal cost and program for the dynamic lot size problem.
Parameters
----------
demands : list
A list of non-negative demands. The final demand should be non-zero,
since if it's zero then this last element can be removed from the
problem.
order_costs : list
A list of non-negative order costs. The order cost is independent of
the number of products ordered. If products are ordered at all, then
the order cost must be paid. It does not matter if we order 1 or 99.
holding_costs : list
A list of non-negative order costs. The holding cost at index t
is the cost (per product) of holding from period t to period t+1.
Returns
-------
dict
Dictionary with keys 'cost' (minimal cost) and 'solution'
(optimal solution). One problem can have several optimal solutions,
but each optimal solution must achieve the minimum cost.
"""
d, o, h = demands, order_costs, holding_costs
_validate_inputs(demands, order_costs, holding_costs)
d_cumsum = CumSumList(d)
assert d[-1] > 0, "Final demand should be positive"
T = len(d) # Problem size
F = {-1: 0} # Base case for recursion. F[t] = min cost from period 0 to t
t_star_star = 0 # Highest period where minimum was achieved
# Used to construct the solution
cover_by = {} # cover_by[t] = j => cover period j to t by ordering at j
# Main forward recursion loop. At time t, choose a period j to order from
# to cover from period j to period t (inclusive on both ends)
for t in range(len(demands)):
# If there is no demand in period t
if d[t] == 0:
F[t] = F[t - 1]
cover_by[t] = t
continue
# There is demand in period t
assert d[t] > 0
S_t = 0 # Partial sum S_t(j) := h_j * d_{j+1, t} + S_t(j+1)
min_args = [] # List of arguments for the min() function
for j in reversed(range(t_star_star, t + 1)):
S_t += h[j] * d_cumsum.sum_between(j + 1, t)
min_args.append(o[j] + S_t + F[j - 1])
# Index of minimum value in list `min_args`
argmin = min_args.index(min(min_args))
# Update the t** variable - using the Planning Horizon Theorem
t_star_star = max(t_star_star, t - argmin)
# Update the variables for cost and program
F[t] = min_args[argmin]
cover_by[t] = t - argmin
# Construct the optimal solution. Order d[j:t+1] at time j to cover
# demands in period from j to t (inclusive)
t = T - 1
solution = [0] * T
while True:
j = cover_by[t] # Get order time in period from j to t
solution[j] = sum(d[j : t + 1])
t = j - 1 # Set t (period end) to (period start) minus one
if j == 0:
break # Break out if this period started at 0
return {"cost": F[len(demands) - 1], "solution": solution}
# =============================================================================
# TEST CASES
# =============================================================================
def test_against_example_in_paper():
"""Test against the problem in the Wagner-Whitin paper."""
demands = [69, 29, 36, 61, 61, 26, 34, 67, 45, 67, 79, 56]
order_costs = [85, 102, 102, 101, 98, 114, 105, 86, 119, 110, 98, 114]
holding_costs = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# Costs and programs copied by hand from the original paper
costs = [85, 114, 186, 277, 348, 400, 469, 555, 600, 710, 789, 864]
programs = [
[69],
[69 + 29, 0],
[69 + 29 + 36, 0, 0],
[69 + 29, 0, 36 + 61, 0],
[69 + 29 + 36, 0, 0, 61 + 61, 0],
[69 + 29 + 36, 0, 0, 61 + 61 + 26, 0, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67 + 79, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67, 79 + 56, 0],
]
for t in range(1, len(demands)):
result = wagner_whitin(demands[:t], order_costs[:t], holding_costs[:t])
assert costs[t - 1] == result["cost"]
assert programs[t - 1] == result["solution"]
def test_against_handcrafted_example1():
demands = [10, 5, 10, 20]
order_costs = [3, 1, 5, 5]
holding_costs = [1, 1, 1, 1]
result = wagner_whitin(demands, order_costs, holding_costs)
assert result["cost"] == 14
assert result["solution"] == [10, 5, 10, 20]
def test_against_handcrafted_example2():
demands = [0, 10]
order_costs = [10, 100]
holding_costs = [1, 0]
result = wagner_whitin(demands, order_costs, holding_costs)
assert result["cost"] == 20
assert result["solution"] == [10, 0]
def test_against_handcrafted_example3():
demands = [0, 0, 7]
order_costs = [60, 90, 90]
holding_costs = [9, 8, 5]
result = wagner_whitin(demands, order_costs, holding_costs)
assert result["cost"] == 90
assert result["solution"] == [0, 0, 7]
def test_against_handcrafted_example4():
demands = [0, 5, 0, 10, 10]
order_costs = [0, 10, 15, 5, 20]
holding_costs = [1, 1, 3, 1, 2]
result = wagner_whitin(demands, order_costs, holding_costs)
assert result["cost"] == 20
assert result["solution"] == [5, 0, 0, 20, 0]
# =============================================================================
# UTILITY FUNCTIONS (might be nice to have)
# =============================================================================
def _validate_inputs(demands: list, order_costs: list, holding_costs: list):
assert isinstance(demands, list)
assert isinstance(order_costs, list)
assert isinstance(holding_costs, list)
assert len(demands) == len(order_costs)
assert len(demands) - 1 <= len(holding_costs) <= len(demands)
assert demands[-1] > 0, "Last period must have positive demand"
assert all(d >= 0 for d in demands), "All demands must be non-negative"
assert all(o >= 0 for o in order_costs), "All order costs must be non-negative"
def evaluate(demands, order_costs, holding_costs, solution):
assert sum(demands) == sum(solution)
assert len(demands) == len(solution) == len(order_costs) == len(holding_costs)
cost = 0
in_stock = 0
for t in range(len(demands)):
# Update stock and the cost of this order
in_stock += solution[t] - demands[t]
cost += order_costs[t] if solution[t] > 0 else 0
# Sell products, pay the price of bringing stock to next period
cost += in_stock * holding_costs[t]
assert in_stock >= 0
return cost
def solve_naively(demands, order_costs, holding_costs):
"""Compute the best solution in O(n * n^2) time."""
best_solution = None
min_cost = float("inf")
T = len(demands)
for order_at in itertools.product([0, 1], repeat=T): # Count in binary
# Construct a solution
solution, accumulated_demand = T * [0], 0
for t in reversed(range(T)):
accumulated_demand += demands[t]
if order_at[t] > 0:
solution[t] = accumulated_demand
accumulated_demand = 0
# If demands at not met, the solutions is not feasible
if not sum(demands) == sum(solution):
continue
# Update best found solution
cost = evaluate(demands, order_costs, holding_costs, solution)
if cost < min_cost:
min_cost = cost
best_solution = solution
return {"cost": min_cost, "solution": best_solution}
# =============================================================================
# RUN TEST CASES (this algo has also been tested against naive implementations)
# =============================================================================
if __name__ == "__main__":
test_against_example_in_paper()
test_against_handcrafted_example1()
test_against_handcrafted_example2()
test_against_handcrafted_example3()
test_against_handcrafted_example4()
# Generate random instances and test naive computation
import random
N = 10 ** 4
for test in range(N):
# Generate a random problem
length = random.randint(1, 8)
demands = [random.randint(0, 5) for _ in range(length)]
order_costs = [random.randint(0, 9) for _ in range(length)]
holding_costs = [random.randint(0, 5) for _ in range(length)]
demands[-1] = random.randint(1, 5)
# Get results from both implementations
results_naive = solve_naively(demands, order_costs, holding_costs)
results = wagner_whitin(demands, order_costs, holding_costs)
# The solution need not be unique, test that both have min cost
assert results["cost"] == results_naive["cost"]
cost_wagner = evaluate(demands, order_costs, holding_costs, results["solution"])
cost_naive = evaluate(demands, order_costs, holding_costs, results_naive["solution"])
assert cost_wagner == cost_naive
print(f"Success: tested 'wagner_whitin' against naive algorithm on {N} inputs.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment