Created
December 21, 2023 04:59
-
-
Save 270ajay/92805c30c1b0f0d9c795e5aadb514b83 to your computer and use it in GitHub Desktop.
Knapsack solver library from ortools in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Knapsack library by ortools | |
# Translated from c++ code: | |
# https://github.com/google/or-tools/blob/stable/ortools/algorithms/knapsack_solver.h | |
# https://github.com/google/or-tools/blob/stable/ortools/algorithms/knapsack_solver.cc | |
import bisect | |
import heapq | |
import math | |
from ortools.linear_solver import pywraplp | |
""" | |
* This library solves knapsack problems. | |
* | |
* Problems the library solves include: | |
* - 0-1 knapsack problems, | |
* - Multi-dimensional knapsack problems, | |
* | |
* Given n items, each with a profit and a weight, given a knapsack of | |
* capacity c, the goal is to find a subset of items which fits inside c | |
* and maximizes the total profit. | |
* The knapsack problem can easily be extended from 1 to d dimensions. | |
* As an example, this can be useful to constrain the maximum number of | |
* items inside the knapsack. | |
* Without loss of generality, profits and weights are assumed to be positive. | |
* | |
* From a mathematical point of view, the multi-dimensional knapsack problem | |
* can be modeled by d linear constraints: | |
* | |
* ForEach(j:1..d)(Sum(i:1..n)(weight_ij * item_i) <= c_j | |
* where item_i is a 0-1 integer variable. | |
* | |
* Then the goal is to maximize: | |
* | |
* Sum(i:1..n)(profit_i * item_i). | |
* | |
* There are several ways to solve knapsack problems. One of the most | |
* efficient is based on dynamic programming (mainly when weights, profits | |
* and dimensions are small, and the algorithm runs in pseudo polynomial time). | |
* Unfortunately, when adding conflict constraints the problem becomes strongly | |
* NP-hard, i.e. there is no pseudo-polynomial algorithm to solve it. | |
* That's the reason why the most of the following code is based on branch and | |
* bound search. | |
* | |
* For instance to solve a 2-dimensional knapsack problem with 9 items, | |
* one just has to feed a profit vector with the 9 profits, a vector of 2 | |
* vectors for weights, and a vector of capacities. | |
* E.g.: | |
\b Python: | |
\code{.py} | |
profits = [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ] | |
weights = [ [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ], | |
[ 1, 1, 1, 1, 1, 1, 1, 1, 1 ] | |
] | |
capacities = [ 34, 4 ] | |
solver = pywrapknapsack_solver.KnapsackSolver( | |
pywrapknapsack_solver.KnapsackSolver | |
.KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER, | |
'Multi-dimensional solver') | |
solver.Init(profits, weights, capacities) | |
profit = solver.Solve() | |
\endcode | |
\b C++: | |
\code{.cpp} | |
const std::vector<int64_t> profits = { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; | |
const std::vector<std::vector<int64_t>> weights = | |
{ { 1, 2, 3, 4, 5, 6, 7, 8, 9 }, | |
{ 1, 1, 1, 1, 1, 1, 1, 1, 1 } }; | |
const std::vector<int64_t> capacities = { 34, 4 }; | |
KnapsackSolver solver( | |
KnapsackSolver::KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER, | |
"Multi-dimensional solver"); | |
solver.Init(profits, weights, capacities); | |
const int64_t profit = solver.Solve(); | |
\endcode | |
\b Java: | |
\code{.java} | |
final long[] profits = { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; | |
final long[][] weights = { { 1, 2, 3, 4, 5, 6, 7, 8, 9 }, | |
{ 1, 1, 1, 1, 1, 1, 1, 1, 1 } }; | |
final long[] capacities = { 34, 4 }; | |
KnapsackSolver solver = new KnapsackSolver( | |
KnapsackSolver.SolverType.KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER, | |
"Multi-dimensional solver"); | |
solver.init(profits, weights, capacities); | |
final long profit = solver.solve(); | |
\endcode | |
*/ | |
""" | |
# Constants | |
_NO_SELECTION = -1 | |
_PRIMARY_PROPAGATOR_ID = 0 | |
_MAX_NUMBER_OF_BRUTE_FORCE_ITEMS = 30 | |
_MAX_NUMBER_OF_64_ITEMS = 64 | |
def _upper_bound_of_ratio(numerator_1: int, numerator_2: int, denominator: int) -> int: | |
"""Returns an upper bound of (numerator_1 * numerator_2)/denominator.""" | |
return math.ceil((numerator_1 * numerator_2) / denominator) | |
def _one_bit(pos: int) -> int: | |
"""Returns a word with only bit pos set.""" | |
return 1 << pos | |
class KnapsackSolver: | |
KNAPSACK_BRUTE_FORCE_SOLVER: int = 0 | |
KNAPSACK_64ITEMS_SOLVER: int = 1 | |
KNAPSACK_DYNAMIC_PROGRAMMING_SOLVER: int = 2 | |
KNAPSACK_MULTIDIMENSION_CBC_MIP_SOLVER: int = 3 | |
KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER: int = 4 | |
KNAPSACK_DIVIDE_AND_CONQUER_SOLVER: int = 5 | |
def __init__(self, solver_type: int, solver_name: str): | |
self.solver_name: str = solver_name | |
self._is_solution_optimal: bool = False | |
self._use_reduction: bool = True | |
self._solver: BaseKnapsackSolver = None | |
self._known_value: list[bool] = [] | |
self._best_solution: list[bool] = [] | |
self._is_problem_solved: bool = False | |
self._additional_profit: int = 0 | |
self._mapping_reduced_item_id: list[int] = [] | |
if solver_type == KnapsackSolver.KNAPSACK_BRUTE_FORCE_SOLVER: | |
# Brute force method. | |
# | |
# Limited to 30 items and one dimension, this | |
# solver uses a brute force algorithm, i.e. explores all possible | |
# states. Experiments show competitive performance for instances | |
# with less than 15 items. | |
self._solver = KnapsackBruteForceSolver(solver_name) | |
elif solver_type == KnapsackSolver.KNAPSACK_64ITEMS_SOLVER: | |
# Optimized method for single dimension small problems | |
# | |
# Limited to 64 items and one dimension, this | |
# solver uses a branch & bound algorithm. This solver is about 4 | |
# times faster than KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER. | |
self._solver = Knapsack64ItemsSolver(solver_name) | |
elif solver_type == KnapsackSolver.KNAPSACK_DYNAMIC_PROGRAMMING_SOLVER: | |
# Dynamic Programming approach for single dimension problems | |
# | |
# Limited to one dimension, this solver is based on a dynamic | |
# programming algorithm. The time and space complexity is | |
# O(capacity * number_of_items). | |
self._solver = KnapsackDynamicProgrammingSolver(solver_name) | |
elif ( | |
solver_type | |
== KnapsackSolver.KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER | |
): | |
# Generic Solver. | |
# | |
# This solver can deal with both large number of items and several | |
# dimensions. This solver is based on branch and bound. | |
self._solver = KnapsackGenericSolver(solver_name) | |
elif solver_type == KnapsackSolver.KNAPSACK_DIVIDE_AND_CONQUER_SOLVER: | |
# Divide and Conquer approach for single dimension problems | |
# | |
# Limited to one dimension, this solver is based on a divide and | |
# conquer technique and is suitable for larger problems than | |
# Dynamic Programming Solver. The time complexity is O(capacity * | |
# number_of_items) and the space complexity is O(capacity + | |
# number_of_items). | |
self._solver = KnapsackDivideAndConquerSolver(solver_name) | |
elif solver_type == KnapsackSolver.KNAPSACK_MULTIDIMENSION_CBC_MIP_SOLVER: | |
# CBC Based Solver | |
# | |
# This solver can deal with both large number of items and several | |
# dimensions. This solver is based on Integer Programming solver | |
# CBC. | |
self._solver = KnapsackMIPSolver("CBC_MIP", solver_name) | |
else: | |
raise ValueError("Unknown knapsack solver type.") | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
for w in weights: | |
if len(profits) != len(w): | |
raise ValueError("Profits and inner weights must have the same size") | |
if len(capacities) != len(weights): | |
raise ValueError("Capacities and weights must have the same size") | |
self._is_solution_optimal = False | |
self._additional_profit = 0 | |
self._is_problem_solved = False | |
num_items = len(profits) | |
reduced_weights = [] | |
reduced_capacities = [] | |
if self._use_reduction: | |
num_reduced_items = self._reduce_capacities( | |
num_items, weights, capacities, reduced_weights, reduced_capacities | |
) | |
if num_reduced_items > 0: | |
self._compute_additional_profit(profits) | |
else: | |
reduced_weights = weights | |
reduced_capacities = capacities | |
if not self._is_problem_solved: | |
self._solver.init(profits, reduced_weights, reduced_capacities) | |
if self._use_reduction: | |
num_reduced_items = self._reduce_problem(num_items) | |
if num_reduced_items > 0: | |
self._compute_additional_profit(profits) | |
if num_reduced_items > 0 and num_reduced_items < num_items: | |
self._init_reduced_problem( | |
profits, reduced_weights, reduced_capacities | |
) | |
if self._is_problem_solved: | |
self._is_solution_optimal = True | |
def solve(self) -> int: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
if self._is_problem_solved: | |
profit = 0 | |
else: | |
profit, self._is_solution_optimal = self._solver.solve() | |
return self._additional_profit + profit | |
def best_solution_contains(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
mapped_item_id = ( | |
self._mapping_reduced_item_id[item_id] if self._use_reduction else item_id | |
) | |
if self._use_reduction and self._known_value[item_id]: | |
return self._best_solution[item_id] | |
else: | |
return self._solver.best_solution(mapped_item_id) | |
def is_solution_optimal(self) -> bool: | |
"""Returns true if the solution was proven optimal.""" | |
return self._is_solution_optimal | |
def get_name(self) -> str: | |
return self._solver.get_name() | |
def use_reduction(self) -> bool: | |
return self._use_reduction | |
def set_use_reduction(self, use_reduction: bool) -> None: | |
self._use_reduction = use_reduction | |
def _reduce_capacities( | |
self, | |
num_items: int, | |
weights: list[list[int]], | |
capacities: list[int], | |
reduced_weights: list[list[int]], | |
reduced_capacities: list[int], | |
) -> int: | |
"""Trivial reduction of capacity constraints when the capacity is | |
higher than the sum of the weights of the items. Returns the number | |
of reduced items. | |
""" | |
self._known_value = [False] * num_items | |
self._best_solution = [False] * num_items | |
self._mapping_reduced_item_id = [0] * num_items | |
active_capacities = [True] * len(weights) | |
num_of_active_capacities = 0 | |
for i in range(len(weights)): | |
max_weight = 0 | |
for weight in weights[i]: | |
max_weight += weight | |
if max_weight <= capacities[i]: | |
active_capacities[i] = False | |
else: | |
num_of_active_capacities += 1 | |
for i in range(len(weights)): | |
if active_capacities[i]: | |
reduced_weights.append(weights[i]) | |
reduced_capacities.append(capacities[i]) | |
if len(reduced_capacities) == 0: | |
# There are no capacity constraints in the problem so we can reduce all | |
# items and just add them to the best solution. | |
for item_id in range(num_items): | |
self._known_value[item_id] = True | |
self._best_solution[item_id] = True | |
self._is_problem_solved = True | |
# All items are reduced | |
return num_items | |
# There are still capacity constraints so no item reduction is done. | |
return 0 | |
def _reduce_problem(self, num_items: int) -> int: | |
self._known_value = [False] * num_items | |
self._best_solution = [False] * num_items | |
self._mapping_reduced_item_id = [0] * num_items | |
self._additional_profit = 0 | |
for item_id in range(num_items): | |
self._mapping_reduced_item_id[item_id] = item_id | |
best_lower_bound = 0 | |
j0_upper_bounds = [9223372036854775807] * num_items | |
j1_upper_bounds = [9223372036854775807] * num_items | |
for item_id in range(num_items): | |
lower_bound = 0 | |
upper_bound = 9223372036854775807 | |
( | |
lower_bound, | |
upper_bound, | |
) = self._solver.get_lower_and_upper_bound_when_item( | |
item_id, False, lower_bound, upper_bound | |
) | |
j1_upper_bounds[item_id] = upper_bound | |
best_lower_bound = max(best_lower_bound, lower_bound) | |
( | |
lower_bound, | |
upper_bound, | |
) = self._solver.get_lower_and_upper_bound_when_item( | |
item_id, True, lower_bound, upper_bound | |
) | |
j0_upper_bounds[item_id] = upper_bound | |
best_lower_bound = max(best_lower_bound, lower_bound) | |
num_reduced_items = 0 | |
for item_id in range(num_items): | |
if best_lower_bound > j0_upper_bounds[item_id]: | |
self._known_value[item_id] = True | |
self._best_solution[item_id] = False | |
num_reduced_items += 1 | |
elif best_lower_bound > j1_upper_bounds[item_id]: | |
self._known_value[item_id] = True | |
self._best_solution[item_id] = True | |
num_reduced_items += 1 | |
self._is_problem_solved = num_reduced_items == num_items | |
return num_reduced_items | |
def _compute_additional_profit(self, profits: list[int]) -> None: | |
num_items = len(profits) | |
self._additional_profit = 0 | |
for item_id in range(num_items): | |
if self._known_value[item_id] and self._best_solution[item_id]: | |
self._additional_profit += profits[item_id] | |
def _init_reduced_problem( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
num_items = len(profits) | |
num_dimensions = len(capacities) | |
reduced_profits = [] | |
for item_id in range(num_items): | |
if not self._known_value[item_id]: | |
self._mapping_reduced_item_id[item_id] = len(reduced_profits) | |
reduced_profits.append(profits[item_id]) | |
reduced_weights = [] | |
reduced_capacities = capacities | |
for dim in range(num_dimensions): | |
one_dimension_weights = weights[dim] | |
one_dimension_reduced_weights = [] | |
for item_id in range(num_items): | |
if self._known_value[item_id]: | |
if self._best_solution[item_id]: | |
reduced_capacities[dim] -= one_dimension_weights[item_id] | |
else: | |
one_dimension_reduced_weights.append(one_dimension_weights[item_id]) | |
reduced_weights.append(one_dimension_reduced_weights) | |
self._solver.init(reduced_profits, reduced_weights, reduced_capacities) | |
# The following code defines needed classes for the KnapsackGenericSolver | |
# class which is the entry point to extend knapsack with new constraints such | |
# as conflicts between items. | |
# | |
# Constraints are enforced using KnapsackPropagator objects, in the current | |
# code there is one propagator per dimension (KnapsackCapacityPropagator). | |
# One of those propagators, named master propagator, is used to guide the | |
# search, i.e. decides which item should be assigned next. | |
# Roughly speaking the search algorithm is: | |
# - While not optimal | |
# - Select next search node to expand | |
# - Select next item_i to assign (using master propagator) | |
# - Generate a new search node where item_i is in the knapsack | |
# - Check validity of this new partial solution (using propagators) | |
# - If valid, add this new search node to the search | |
# - Generate a new search node where item_i is not in the knapsack | |
# - Check validity of this new partial solution (using propagators) | |
# - If valid, add this new search node to the search | |
# | |
# TODO(user): Add a new propagator class for conflict constraint. | |
# TODO(user): Add a new propagator class used as a guide when the problem has | |
# several dimensions. | |
class KnapsackAssignment: | |
"""KnapsackAssignment is a small struct used to pair an item with its | |
assignment. It is mainly used for search nodes and updates. | |
""" | |
def __init__(self, item_id: int, is_in: bool): | |
self.item_id: int = item_id | |
self.is_in: bool = is_in | |
def __repr__(self) -> str: | |
return f"item_id:{self.item_id},is_in:{self.is_in}" | |
class KnapsackItem: | |
"""KnapsackItem is a small struct to pair an item with its corresponding | |
profit. | |
The aim of the knapsack problem is to pack as many valuable items as | |
possible. A straight forward heuristic is to take those with the greatest | |
profit-per-unit-weight. This ratio is called efficiency in this | |
implementation. So items will be grouped in vectors, and sorted by | |
decreasing efficiency. | |
Note that profits are duplicated for each dimension. This is done to | |
simplify the code, especially the get_efficiency method and vector sorting. | |
As there usually are only few dimension, the overheard should not be an | |
issue. | |
""" | |
def __init__(self, index: int, weight: int, profit: int): | |
"""The index field is used to retrieve the initial item in order to | |
communicate with other propagators and state. | |
""" | |
self.index: int = index | |
self.weight: int = weight | |
self.profit: int = profit | |
def get_efficiency(self, profit_max: int) -> float: | |
return (self.profit / self.weight) if self.weight > 0 else profit_max | |
def __repr__(self) -> str: | |
return f"id:{self.index},weight:{self.weight},profit:{self.profit}" | |
class KnapsackSearchNode: | |
"""KnapsackSearchNode is a class used to describe a decision in the | |
decision search tree. | |
The node is defined by a pointer to the parent search node and an | |
assignment (see KnapsackAssignment). | |
As the current state is not explicitly stored in a search node, one should | |
go through the search tree to incrementally build a partial solution from | |
a previous search node. | |
""" | |
def __init__(self, parent: "KnapsackSearchNode", assignment: KnapsackAssignment): | |
# 'depth' field is used to navigate efficiently through the search tree | |
# (see KnapsackSearchPath) | |
self._depth: int = 0 if parent is None else parent.depth() + 1 | |
self._parent: "KnapsackSearchNode" = parent | |
self._assignment: KnapsackAssignment = assignment | |
# 'current_profit' and 'profit_upper_bound' fields are used to sort search | |
# nodes using a priority queue. That allows to pop the node with the best | |
# upper bound, and more importantly to stop the search when optimality is | |
# proved. | |
self._current_profit: int = 0 | |
self._profit_upper_bound: int = 9223372036854775807 | |
# 'next_item_id' field allows to avoid an O(number_of_items) scan to find | |
# next item to select. This is done for free by the upper bound computation. | |
self._next_item_id: int = _NO_SELECTION | |
def depth(self) -> int: | |
return self._depth | |
def parent(self) -> "KnapsackSearchNode": | |
return self._parent | |
def assignment(self) -> KnapsackAssignment: | |
return self._assignment | |
def current_profit(self) -> int: | |
return self._current_profit | |
def set_current_profit(self, profit: int) -> None: | |
self._current_profit = profit | |
def profit_upper_bound(self) -> int: | |
return self._profit_upper_bound | |
def set_profit_upper_bound(self, profit: int) -> None: | |
self._profit_upper_bound = profit | |
def next_item_id(self) -> int: | |
return self._next_item_id | |
def set_next_item_id(self, index: int) -> None: | |
self._next_item_id = index | |
def __lt__(self, other) -> bool: | |
"""Comparator used to sort search nodes in the priority queue in order | |
to pop first the node with the highest profit upper bound | |
(see KnapsackSearchNode). When two nodes have the same upper bound, we | |
prefer the one with the highest current profit, ie. usually the one | |
closer to a leaf. In practice, the main advantage is to have smaller | |
path. | |
""" | |
if self.profit_upper_bound() == other.profit_upper_bound(): | |
return self.current_profit() > other.current_profit() | |
return self.profit_upper_bound() > other.profit_upper_bound() | |
def __repr__(self) -> str: | |
return ( | |
f"depth:{self._depth},parent:{self._parent},assignment:" | |
f"{self._assignment},current_profit:{self._current_profit}," | |
f"profit_upper_bound:{self._profit_upper_bound},next_item_id:" | |
f"{self._next_item_id}" | |
) | |
class KnapsackSearchPath: | |
"""KnapsackSearchPath is a small class used to represent the path between a | |
node to another node in the search tree. | |
As the solution state is not stored for each search node, the state should | |
be rebuilt at each node. One simple solution is to apply all decisions | |
between the node 'to' and the root. This can be computed in | |
O(number_of_items). | |
However, it is possible to achieve better average complexity. Two | |
consecutively explored nodes are usually close enough (i.e., much less than | |
number_of_items) to benefit from an incremental update from the node | |
'from' to the node 'to'. | |
The 'via' field is the common parent of 'from' field and 'to' field. | |
So the state can be built by reverting all decisions from 'from' to 'via' | |
and then applying all decisions from 'via' to 'to'. | |
""" | |
def __init__(self, from_node: "KnapsackSearchNode", to_node: "KnapsackSearchNode"): | |
self._from: KnapsackSearchNode = from_node | |
self._to: KnapsackSearchNode = to_node | |
self._via: KnapsackSearchNode = None | |
def init(self) -> None: | |
node_from = self.move_up_to_depth(self._from, self._to.depth()) | |
node_to = self.move_up_to_depth(self._to, self._from.depth()) | |
# Find common parent. | |
while node_from != node_to: | |
node_from = node_from.parent() | |
node_to = node_to.parent() | |
self._via = node_from | |
def from_node(self) -> KnapsackSearchNode: | |
return self._from | |
def via(self) -> KnapsackSearchNode: | |
return self._via | |
def to_node(self) -> KnapsackSearchNode: | |
return self._to | |
def move_up_to_depth( | |
self, node: KnapsackSearchNode, depth: int | |
) -> KnapsackSearchNode: | |
current_node = node | |
while current_node.depth() > depth: | |
current_node = current_node.parent() | |
return current_node | |
def __repr__(self) -> str: | |
return f"from:{self._from},to:{self._to},via:{self._via}" | |
class KnapsackState: | |
"""KnapsackState represents a partial solution to the knapsack problem.""" | |
def __init__(self): | |
"""Vectors 'is_bound_' and 'is_in_' contain a boolean value for each | |
item. 'is_bound_(item_i)' is false when there is no decision for | |
item_i yet. When item_i is bound, 'is_in_(item_i)' represents the | |
presence (true) or the absence (false) of item_i in the current | |
solution. | |
""" | |
self._is_bound: list[bool] = [] | |
self._is_in: list[bool] = [] | |
def init(self, number_of_items: int) -> None: | |
"""Initializes vectors with number_of_items set to false | |
(i.e. not bound yet). | |
""" | |
self._is_bound = [False] * number_of_items | |
self._is_in = [False] * number_of_items | |
def update_state(self, revert: bool, assignment: KnapsackAssignment) -> bool: | |
"""Updates the state by applying or reverting a decision. | |
Returns false if fails, i.e., trying to apply an inconsistent decision | |
to an already assigned item. | |
""" | |
if revert: | |
self._is_bound[assignment.item_id] = False | |
else: | |
if (self._is_bound[assignment.item_id]) and ( | |
self._is_in[assignment.item_id] != assignment.is_in | |
): | |
return False | |
self._is_bound[assignment.item_id] = True | |
self._is_in[assignment.item_id] = assignment.is_in | |
return True | |
def get_number_of_items(self) -> int: | |
return len(self._is_bound) | |
def is_bound(self, index) -> bool: | |
return self._is_bound[index] | |
def is_in(self, index) -> bool: | |
return self._is_in[index] | |
def __repr__(self) -> str: | |
return f"is_bound:{self._is_bound},is_in:{self._is_in}" | |
class KnapsackPropagator: | |
"""KnapsackPropagator is the base class for modeling and propagating a | |
constraint given an assignment. | |
When some work has to be done both by the base and the derived class, a | |
protected pure virtual method ending by 'propagator' is defined. | |
For instance, 'init' creates a vector of items, and then calls | |
'init_propagator' to let the derived class perform its own initialization. | |
""" | |
def __init__(self, state: KnapsackState): | |
self._items: list[KnapsackItem] = [] | |
self._current_profit: int = 0 | |
self._profit_lower_bound: int = 0 | |
self._profit_upper_bound: int = 9223372036854775807 | |
self._state: KnapsackState = state | |
def init(self, profits: list[int], weights: list[int]) -> None: | |
"""Initializes data structure and then calls init_propagator.""" | |
number_of_items = len(profits) | |
self._items = [ | |
KnapsackItem(i, weights[i], profits[i]) for i in range(number_of_items) | |
] | |
self._current_profit = 0 | |
self._profit_lower_bound = -9223372036854775807 | |
self._profit_upper_bound = 9223372036854775807 | |
self._init_propagator() | |
def update(self, revert: bool, assignment: KnapsackAssignment) -> bool: | |
"""Updates data structure and then calls update_propagator. | |
Returns false when failure. | |
""" | |
if assignment.is_in: | |
if revert: | |
self._current_profit -= self._items[assignment.item_id].profit | |
else: | |
self._current_profit += self._items[assignment.item_id].profit | |
return self._update_propagator(revert, assignment) | |
def compute_profit_bounds(self) -> None: | |
"""compute_profit_bounds() should set '_profit_lower_bound' and | |
'_profit_upper_bound' which are constraint specific.""" | |
def get_next_item_id(self) -> int: | |
"""Returns the index of next item to assign. | |
Returns kNoSelection when all items are bound.""" | |
def current_profit(self) -> int: | |
return self._current_profit | |
def profit_lower_bound(self) -> int: | |
return self._profit_lower_bound | |
def profit_upper_bound(self) -> int: | |
return self._profit_upper_bound | |
def copy_current_state_to_solution( | |
self, has_one_propagator: bool, solution: list[bool] | |
) -> None: | |
"""Copies the current state into 'solution'. | |
All unbound items are set to false (i.e. not in the knapsack). | |
When 'has_one_propagator' is true, copy_current_solution_propagator() is | |
called to have a better solution. When there is only one propagator | |
there is no need to check the solution with other propagators, so the | |
partial solution can be smartly completed. | |
""" | |
for item in self._items: | |
item_id = item.index | |
solution[item_id] = self._state.is_bound(item_id) and self._state.is_in( | |
item_id | |
) | |
if has_one_propagator: | |
self._copy_current_state_to_solution_propagator(solution) | |
def state(self) -> KnapsackState: | |
return self._state | |
def items(self) -> list[KnapsackItem]: | |
return self._items | |
def set_profit_lower_bound(self, profit: int) -> None: | |
self._profit_lower_bound = profit | |
def set_profit_upper_bound(self, profit: int) -> None: | |
self._profit_upper_bound = profit | |
# -------- Protected methods below -------------- | |
def _init_propagator(self) -> None: | |
"""Initializes data structure. This method is called after | |
initialization of KnapsackPropagator data structure. | |
""" | |
def _update_propagator(self, revert: bool, assignment: KnapsackAssignment) -> bool: | |
"""Updates internal data structure incrementally. This method is | |
called after update of KnapsackPropagator data structure. | |
""" | |
def _copy_current_state_to_solution_propagator(self, solution: list[bool]) -> None: | |
"""Copies the current state into 'solution' | |
Only unbound items have to copied as copy_current_solution was already | |
called with current state. | |
This method is useful when a propagator is able to find a better | |
solution than the blind instantiation to false of unbound items. | |
""" | |
def __repr__(self) -> str: | |
return ( | |
f"items:{self._items},current_profit:{self._current_profit}," | |
f"profit_lower_bound:{self._profit_lower_bound}," | |
f"profit_upper_bound:{self._profit_upper_bound}," | |
f"state:{self._state}" | |
) | |
class KnapsackCapacityPropagator(KnapsackPropagator): | |
"""KnapsackCapacityPropagator is a KnapsackPropagator used to enforce | |
a capacity constraint. | |
As a KnapsackPropagator is supposed to compute profit lower and upper | |
bounds, and get the next item to select, it can be seen as a 0-1 Knapsack | |
solver. The most efficient way to compute the upper bound is to iterate on | |
items in profit-per-unit-weight decreasing order. The break item is | |
commonly defined as the first item for which there is not enough remaining | |
capacity. Selecting this break item as the next-item-to-assign usually | |
gives the best results (see Greenberg & Hegerich). | |
This is exactly what is implemented in this class. | |
When there is only one propagator, it is possible to compute a better | |
profit lower bound almost for free. During the scan to find the | |
break element all unbound items are added just as if they were part of | |
the current solution. This is used in both compute_profit_bounds and | |
copy_current_solution_propagator. | |
For incrementality reasons, the ith item should be accessible in O(1). | |
That's the reason why the item vector has to be duplicated 'sorted_items_'. | |
""" | |
def __init__(self, state: KnapsackState, capacity: int): | |
super().__init__(state) | |
self._capacity: int = capacity | |
self._consumed_capacity: int = 0 | |
self._break_item_id: int = _NO_SELECTION | |
self._sorted_items: list[KnapsackItem] = [] | |
self._profit_max: int = 0 | |
def compute_profit_bounds(self) -> None: | |
self.set_profit_lower_bound(self.current_profit()) | |
self._break_item_id = _NO_SELECTION | |
remaining_capacity = self._capacity - self._consumed_capacity | |
break_sorted_item_id = _NO_SELECTION | |
number_of_sorted_items = len(self._sorted_items) | |
for sorted_id in range(number_of_sorted_items): | |
item = self._sorted_items[sorted_id] | |
if not self.state().is_bound(item.index): | |
self._break_item_id = item.index | |
if remaining_capacity >= item.weight: | |
remaining_capacity -= item.weight | |
self.set_profit_lower_bound(self.profit_lower_bound() + item.profit) | |
else: | |
break_sorted_item_id = sorted_id | |
break | |
self.set_profit_upper_bound(self.profit_lower_bound()) | |
if break_sorted_item_id != _NO_SELECTION: | |
additional_profit = self._get_additional_profit( | |
remaining_capacity, break_sorted_item_id | |
) | |
self.set_profit_upper_bound(self.profit_upper_bound() + additional_profit) | |
def get_next_item_id(self) -> int: | |
return self._break_item_id | |
def _init_propagator(self) -> None: | |
"""Initializes KnapsackCapacityPropagator (e.g., sort items in | |
decreasing order). | |
""" | |
self._consumed_capacity = 0 | |
self._break_item_id = _NO_SELECTION | |
self._sorted_items = [item for item in self.items()] | |
self._profit_max = 0 | |
for item in self._sorted_items: | |
self._profit_max = max(self._profit_max, item.profit) | |
self._profit_max += 1 | |
self._sorted_items.sort( | |
key=lambda item: item.get_efficiency(self._profit_max), reverse=True | |
) | |
def _update_propagator(self, revert: bool, assignment: KnapsackAssignment) -> bool: | |
"""Updates internal data structure incrementally | |
(i.e., 'consumed_capacity_') to avoid a O(number_of_items) scan. | |
""" | |
if assignment.is_in: | |
if revert: | |
self._consumed_capacity -= self.items()[assignment.item_id].weight | |
else: | |
self._consumed_capacity += self.items()[assignment.item_id].weight | |
if self._consumed_capacity > self._capacity: | |
return False | |
return True | |
def _copy_current_state_to_solution_propagator(self, solution: list[bool]) -> None: | |
"""Copies the current state into 'solution'. | |
Only unbound items have to be copied as CopyCurrentSolution was already | |
called with current state. | |
This method is useful when a propagator is able to find a better | |
solution than the blind instantiation to false of unbound items. | |
""" | |
remaining_capacity = self._capacity - self._consumed_capacity | |
for item in self._sorted_items: | |
if not self.state().is_bound(item.index): | |
if remaining_capacity >= item.weight: | |
remaining_capacity -= item.weight | |
solution[item.index] = True | |
else: | |
return | |
def _get_additional_profit( | |
self, remaining_capacity: int, break_item_id: int | |
) -> int: | |
"""An obvious additional profit upper bound corresponds to the linear | |
relaxation: remaining_capacity * efficiency of the break item. | |
It is possible to do better in O(1), using Martello-Toth bound U2. | |
The main idea is to enforce integrality constraint on the break item, | |
ie. either the break item is part of the solution, either it is not. | |
So basically the linear relaxation is done on the item before the break | |
item, or the one after the break item. | |
This is what this method implements. | |
""" | |
after_break_item_id = break_item_id + 1 | |
additional_profit_when_no_break_item = 0 | |
if after_break_item_id < len(self._sorted_items): | |
# As items are sorted by decreasing profit / weight ratio, and the | |
# current weight is non-zero, the next_weight is non-zero too. | |
next_weight = self._sorted_items[after_break_item_id].weight | |
next_profit = self._sorted_items[after_break_item_id].profit | |
additional_profit_when_no_break_item = _upper_bound_of_ratio( | |
remaining_capacity, next_profit, next_weight | |
) | |
before_break_item_id = break_item_id - 1 | |
additional_profit_when_break_item = 0 | |
if before_break_item_id >= 0: | |
previous_weight = self._sorted_items[before_break_item_id].weight | |
# Having previous_weight == 0 means the total capacity is smaller | |
# than the weight of the current item. In such as case the item | |
# cannot be part of a solution of the local one dimension | |
# problem. | |
if previous_weight != 0: | |
previous_profit = self._sorted_items[before_break_item_id].profit | |
overused_capacity = ( | |
self._sorted_items[break_item_id].weight - remaining_capacity | |
) | |
ratio = _upper_bound_of_ratio( | |
overused_capacity, previous_profit, previous_weight | |
) | |
additional_profit_when_break_item = ( | |
self._sorted_items[break_item_id].profit - ratio | |
) | |
additional_profit = max( | |
additional_profit_when_no_break_item, additional_profit_when_break_item | |
) | |
return additional_profit | |
def __repr__(self) -> str: | |
return ( | |
f"{super().__repr__()},capacity:{self._capacity}," | |
f"consumed_capacity:{self._consumed_capacity},break_item_id:" | |
f"{self._break_item_id},sorted_items:{self._sorted_items}," | |
f"profit_max:{self._profit_max}" | |
) | |
class BaseKnapsackSolver: | |
"""This is the base class for knapsack solvers.""" | |
def __init__(self, solver_name: str): | |
self._solver_name: str = solver_name | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
def get_lower_and_upper_bound_when_item( | |
self, item_id: int, is_item_in: bool, lower_bound: int, upper_bound: int | |
) -> tuple[int, int]: | |
"""Gets the lower and upper bound when the item is in or out of the | |
knapsack. To ensure objects are correctly initialized, this method | |
should not be called before init(). | |
""" | |
lower_bound = 0 | |
upper_bound = 9223372036854775807 | |
return lower_bound, upper_bound | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
def best_solution(self, item_id) -> bool: | |
"""Returns True if the item 'item_id' is packed in the optimal | |
knapsack. | |
""" | |
def get_name(self) -> str: | |
return self._solver_name | |
def __repr__(self) -> str: | |
return f"{self._solver_name}" | |
class KnapsackGenericSolver(BaseKnapsackSolver): | |
"""KnapsackGenericSolver is the multi-dimensional knapsack solver class. | |
In the current implementation, the next item to assign is given by the | |
master propagator. Using SetMasterPropagator allows changing the default | |
(propagator of the first dimension), and selecting another dimension when | |
more constrained. | |
TODO(user): In the case of a multi-dimensional knapsack problem, implement | |
an aggregated propagator to combine all dimensions and give a better guide | |
to select the next item (see, for instance, Dobson's aggregated efficiency). | |
""" | |
def __init__(self, solver_name: str): | |
super().__init__(solver_name) | |
self._propagators: list[KnapsackPropagator] = [] | |
self._primary_propagator_id: int = _PRIMARY_PROPAGATOR_ID | |
self._search_nodes: list[KnapsackSearchNode] = [] | |
self._state: KnapsackState = KnapsackState() | |
self._best_solution_profit: int = 0 | |
self._best_solution: list[bool] = [] | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
number_of_items = len(profits) | |
number_of_dimensions = len(weights) | |
self._state.init(number_of_items) | |
self._best_solution = [False] * number_of_items | |
for i in range(number_of_dimensions): | |
propagator = KnapsackCapacityPropagator(self._state, capacities[i]) | |
propagator.init(profits, weights[i]) | |
self._propagators.append(propagator) | |
self._primary_propagator_id = _PRIMARY_PROPAGATOR_ID | |
def get_number_of_items(self) -> int: | |
return self._state.get_number_of_items() | |
def get_lower_and_upper_bound_when_item( | |
self, item_id: int, is_item_in: bool, lower_bound: int, upper_bound: int | |
) -> tuple[int, int]: | |
assignment = KnapsackAssignment(item_id, is_item_in) | |
fail = not self._incremental_update(False, assignment) | |
if fail: | |
lower_bound = 0 | |
upper_bound = 0 | |
else: | |
lower_bound = ( | |
self._propagators[self._primary_propagator_id].profit_lower_bound() | |
if self._has_one_propagator() | |
else 0 | |
) | |
upper_bound = self._get_aggregated_profit_upper_bound() | |
fail_revert = not self._incremental_update(True, assignment) | |
if fail_revert: | |
lower_bound = 0 | |
upper_bound = 0 | |
return lower_bound, upper_bound | |
def set_primary_propagator_id(self, primary_propagator_id: int) -> None: | |
"""Sets which propagator should be used to guide the search. | |
'primary_propagator_id' should be in 0..p-1 with p the number of | |
propagators.""" | |
self._primary_propagator_id = primary_propagator_id | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
self._best_solution_profit = 0 | |
is_solution_optimal = True | |
search_queue = [] | |
assignment = KnapsackAssignment(_NO_SELECTION, True) | |
root_node = KnapsackSearchNode(None, assignment) | |
root_node.set_current_profit(self._get_current_profit()) | |
root_node.set_profit_upper_bound(self._get_aggregated_profit_upper_bound()) | |
root_node.set_next_item_id(self._get_next_item_id()) | |
self._search_nodes.append(root_node) | |
if self._make_new_node(root_node, False): | |
heapq.heappush(search_queue, self._search_nodes[-1]) | |
if self._make_new_node(root_node, True): | |
heapq.heappush(search_queue, self._search_nodes[-1]) | |
current_node = root_node | |
while (len(search_queue) > 0) and ( | |
search_queue[0].profit_upper_bound() > self._best_solution_profit | |
): | |
node = heapq.heappop(search_queue) | |
if node != current_node: | |
path = KnapsackSearchPath(current_node, node) | |
path.init() | |
no_fail = self._update_propagators(path) | |
current_node = node | |
if self._make_new_node(node, False): | |
heapq.heappush(search_queue, self._search_nodes[-1]) | |
if self._make_new_node(node, True): | |
heapq.heappush(search_queue, self._search_nodes[-1]) | |
return self._best_solution_profit, is_solution_optimal | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
return self._best_solution[item_id] | |
def _update_propagators(self, path: KnapsackSearchPath) -> bool: | |
"""Updates all propagators reverting/applying all decision on the path. | |
Returns true if fails. Note that, even if fails, all propagators should | |
be updated to be in a stable state in order to stay incremental. | |
""" | |
# Returns false when at least one propagator fails. | |
no_fail = True | |
# Revert previous changes. | |
node = path.from_node() | |
via = path.via() | |
while node != via: | |
no_fail = self._incremental_update(True, node.assignment()) and no_fail | |
node = node.parent() | |
# Apply current changes. | |
node = path.to_node() | |
while node != via: | |
no_fail = self._incremental_update(False, node.assignment()) and no_fail | |
node = node.parent() | |
return no_fail | |
def _get_aggregated_profit_upper_bound(self) -> int: | |
"""Gets the aggregated (min) profit upper bound among all propagators.""" | |
upper_bound = 9223372036854775807 | |
for prop in self._propagators: | |
prop.compute_profit_bounds() | |
propagator_upper_bound = prop.profit_upper_bound() | |
upper_bound = min(upper_bound, propagator_upper_bound) | |
return upper_bound | |
def _make_new_node(self, node: KnapsackSearchNode, is_in: bool) -> bool: | |
"""Returns true if new relevant search node was added to the nodes | |
array, that means this node should be added to the search queue too. | |
""" | |
if node.next_item_id() == _NO_SELECTION: | |
return False | |
assignment = KnapsackAssignment(node.next_item_id(), is_in) | |
new_node = KnapsackSearchNode(node, assignment) | |
path = KnapsackSearchPath(node, new_node) | |
path.init() | |
no_fail = self._update_propagators(path) | |
if no_fail: | |
new_node.set_current_profit(self._get_current_profit()) | |
new_node.set_profit_upper_bound(self._get_aggregated_profit_upper_bound()) | |
new_node.set_next_item_id(self._get_next_item_id()) | |
self._update_best_solution() | |
# Revert to be able to create another node from parent. | |
revert_path = KnapsackSearchPath(new_node, node) | |
revert_path.init() | |
self._update_propagators(revert_path) | |
if (not no_fail) or ( | |
new_node.profit_upper_bound() < self._best_solution_profit | |
): | |
return False | |
# The node is relevant. | |
relevant_node = KnapsackSearchNode(node, assignment) | |
relevant_node.set_current_profit(new_node.current_profit()) | |
relevant_node.set_profit_upper_bound(new_node.profit_upper_bound()) | |
relevant_node.set_next_item_id(new_node.next_item_id()) | |
self._search_nodes.append(relevant_node) | |
return True | |
def _incremental_update(self, revert: bool, assignment: KnapsackAssignment) -> bool: | |
"""Updates all propagators reverting/applying one decision. | |
Return true if fails. Note that, even if fails, all propagators should | |
be updated to be in a stable state in order to stay incremental. | |
""" | |
# Do not stop on a failure: To be able to be incremental on the update, | |
# partial solution (state) and propagators must all be in the same | |
# state. | |
no_fail = self._state.update_state(revert, assignment) | |
for prop in self._propagators: | |
no_fail = prop.update(revert, assignment) and no_fail | |
return no_fail | |
def _update_best_solution(self) -> None: | |
"""Updates the best solution if the current solution has better | |
profit. | |
""" | |
profit_lower_bound = ( | |
self._propagators[self._primary_propagator_id].profit_lower_bound() | |
if self._has_one_propagator() | |
else self._propagators[self._primary_propagator_id].current_profit() | |
) | |
if self._best_solution_profit < profit_lower_bound: | |
self._best_solution_profit = profit_lower_bound | |
self._propagators[ | |
self._primary_propagator_id | |
].copy_current_state_to_solution( | |
self._has_one_propagator(), self._best_solution | |
) | |
def _has_one_propagator(self) -> bool: | |
return len(self._propagators) == 1 | |
def _get_current_profit(self) -> int: | |
return self._propagators[self._primary_propagator_id].current_profit() | |
def _get_next_item_id(self) -> int: | |
return self._propagators[self._primary_propagator_id].get_next_item_id() | |
class KnapsackBruteForceSolver(BaseKnapsackSolver): | |
"""KnapsackBruteForceSolver solves the 0-1 knapsack problem when the | |
number of items is less or equal to 30 with brute force, i.e. | |
explores all states. Experiments show better results than | |
KnapsackGenericSolver when the number of items is less than 15. | |
""" | |
def __init__(self, solver_name: str): | |
super().__init__(solver_name) | |
self._num_items: int = 0 | |
self._capacity: int = 0 | |
self._best_solution_profit: int = 0 | |
self._best_solution: int = 0 | |
self._profits_weights: list[int] = [] | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
self._num_items = len(profits) | |
if self._num_items > _MAX_NUMBER_OF_BRUTE_FORCE_ITEMS: | |
raise Exception( | |
f"To use KnapsackBruteForceSolver the number of items " | |
f"should be less than {_MAX_NUMBER_OF_BRUTE_FORCE_ITEMS}" | |
f". Current value: {self._num_items}." | |
) | |
for i in range(self._num_items): | |
self._profits_weights.append(profits[i]) | |
self._profits_weights.append(weights[0][i]) | |
self._capacity = capacities[0] | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
is_solution_optimal = True | |
self._best_solution_profit = 0 | |
self._best_solution = 0 | |
num_states = _one_bit(self._num_items) | |
prev_state = 0 | |
sum_profit = 0 | |
sum_weight = 0 | |
diff_state = 0 | |
local_state = 0 | |
item_id = 0 | |
# This loop starts at 1, because state = 0 was already considered | |
# previously, i.e., when no items are in, sum_profit = 0 | |
for state in range(1, num_states): | |
diff_state = state ^ prev_state | |
local_state = state | |
item_id = 0 | |
while diff_state: | |
if diff_state & 1: # There is a diff. | |
if local_state & 1: # This item is now in the knapsack. | |
sum_profit += self._profits_weights[item_id] | |
sum_weight += self._profits_weights[item_id + 1] | |
else: # This item has been removed of the knapsack. | |
sum_profit -= self._profits_weights[item_id] | |
sum_weight -= self._profits_weights[item_id + 1] | |
item_id += 2 | |
local_state = local_state >> 1 | |
diff_state = diff_state >> 1 | |
if (sum_weight <= self._capacity) and ( | |
self._best_solution_profit < sum_profit | |
): | |
self._best_solution_profit = sum_profit | |
self._best_solution = state | |
prev_state += 1 | |
return self._best_solution_profit, is_solution_optimal | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal | |
knapsack. | |
""" | |
return (self._best_solution & _one_bit(item_id)) != 0 | |
class KnapsackItemWithEfficiency: | |
"""KnapsackItem is a small struct to pair an item weight with its | |
corresponding profit. | |
This struct is used by Knapsack64ItemsSolver. As this solver deals only | |
with one dimension, that's more efficiency to store 'efficiency' than | |
computing it on the fly. | |
""" | |
def __init__(self, index: int, profit: int, weight: int, profit_max: int): | |
self.index: int = index | |
self.profit: int = profit | |
self.weight: int = weight | |
self.efficiency: float = profit / weight if weight > 0 else profit_max | |
class Knapsack64ItemsSolver(BaseKnapsackSolver): | |
"""Knapsack64ItemsSolver solves the 0-1 knapsack problem when the number of | |
items is less or equal to 64. This implementation is about 4 times faster | |
than KnapsackGenericSolver. | |
""" | |
def __init__(self, solver_name: str): | |
super().__init__(solver_name) | |
self._sorted_items: list[KnapsackItemWithEfficiency] = [] | |
self._sum_profits: list[int] = [] | |
self._sum_weights: list[int] = [] | |
self._capacity: int = 0 | |
self._state: int = 0 | |
self._state_depth: int = 0 | |
self._best_solution_profit: int = 0 | |
self._best_solution: int = 0 | |
self._best_solution_depth: int = 0 | |
# Sum of weights of included item in state. | |
self._state_weight: int = 0 | |
# Sum of profits of non included items in state | |
self._rejected_items_profit: int = 0 | |
# Sum of weights of non included items in state | |
self._rejected_items_weight: int = 0 | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
if len(weights) > 1: | |
raise Exception("Brute force solver only works with one dimension.") | |
self._capacity = capacities[0] | |
num_items = len(profits) | |
if num_items > _MAX_NUMBER_OF_64_ITEMS: | |
raise Exception( | |
f"To use Knapsack64Items the number of items should be " | |
f"less than {_MAX_NUMBER_OF_64_ITEMS}. Current value: " | |
f"{num_items}." | |
) | |
profit_max = max(profits) | |
for i in range(num_items): | |
self._sorted_items.append( | |
KnapsackItemWithEfficiency(i, profits[i], weights[0][i], profit_max) | |
) | |
self._sorted_items.sort(key=lambda item: item.efficiency, reverse=True) | |
sum_profit = 0 | |
sum_weight = 0 | |
self._sum_profits.append(sum_profit) | |
self._sum_weights.append(sum_weight) | |
for i in range(num_items): | |
sum_profit += self._sorted_items[i].profit | |
sum_weight += self._sorted_items[i].weight | |
self._sum_profits.append(sum_profit) | |
self._sum_weights.append(sum_weight) | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
is_solution_optimal = True | |
num_items = len(self._sorted_items) | |
self._state = 1 | |
self._state_depth = 0 | |
self._state_weight = self._sorted_items[0].weight | |
self._rejected_items_profit = 0 | |
self._rejected_items_weight = 0 | |
self._best_solution_profit = 0 | |
self._best_solution = 0 | |
self._best_solution_depth = 0 | |
lower_bound = 0 | |
upper_bound = 0 | |
fail = False | |
while self._state_depth >= 0: | |
fail = False | |
if self._state_weight > self._capacity or self._state_depth >= num_items: | |
fail = True | |
else: | |
lower_bound, upper_bound = self._get_lower_and_upper_bound() | |
if self._best_solution_profit < lower_bound: | |
self._best_solution_profit = lower_bound | |
self._best_solution = self._state | |
self._best_solution_depth = self._state_depth | |
fail = fail or self._best_solution_profit >= upper_bound | |
self._go_to_next_state(fail) | |
self._build_best_solution() | |
return self._best_solution_profit, is_solution_optimal | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
return (self._best_solution & _one_bit(item_id)) != 0 | |
def _get_break_item_id(self, capacity: int) -> int: | |
index = bisect.bisect_right(self._sum_weights, capacity) | |
return index - 1 | |
def _get_lower_and_upper_bound(self) -> tuple[int, int]: | |
"""This method is called for each possible state. | |
Lower and upper bounds can be equal from one state to another. | |
For instance state 1010???? and state 101011?? have exactly the same | |
bounds. So it sounds like a good idea to cache those bounds. | |
Unfortunately, experiments show equivalent results with or without this | |
code optimization (only 1/7 of calls can be reused). | |
In order to simplify the code, this optimization is not implemented. | |
""" | |
available_capacity = self._capacity + self._rejected_items_weight | |
break_item_id = self._get_break_item_id(available_capacity) | |
num_items = len(self._sorted_items) | |
if break_item_id >= num_items: | |
lower_bound = self._sum_profits[num_items] - self._rejected_items_profit | |
upper_bound = lower_bound | |
return lower_bound, upper_bound | |
lower_bound = self._sum_profits[break_item_id] - self._rejected_items_profit | |
upper_bound = lower_bound | |
consumed_capacity = self._sum_weights[break_item_id] | |
remaining_capacity = available_capacity - consumed_capacity | |
efficiency = self._sorted_items[break_item_id].efficiency | |
additional_profit = remaining_capacity * efficiency | |
upper_bound += additional_profit | |
return lower_bound, upper_bound | |
def _go_to_next_state(self, has_failed: bool) -> None: | |
"""As self._state_depth is the position of the most significant bit on | |
self._state it is possible to remove the loop and so be in O(1) instead of | |
O(depth). In such a case self._rejected_items_profit is computed using | |
self._sum_profits list. Unfortunately experiments show smaller computation | |
time using the 'while' (10% speed-up). That's the reason why the loop | |
version is implemented. | |
""" | |
mask = _one_bit(self._state_depth) | |
if not has_failed: # Go to next item. | |
self._state_depth += 1 | |
self._state = self._state | (mask << 1) | |
self._state_weight += self._sorted_items[self._state_depth].weight | |
else: # Backtrack to last item in. | |
while (self._state & mask) == 0 and self._state_depth >= 0: | |
item = self._sorted_items[self._state_depth] | |
self._rejected_items_profit -= item.profit | |
self._rejected_items_weight -= item.weight | |
self._state_depth -= 1 | |
mask = mask >> 1 | |
if self._state & mask: # Item was in, remove it. | |
self._state = self._state & ~mask | |
item = self._sorted_items[self._state_depth] | |
self._rejected_items_profit += item.profit | |
self._rejected_items_weight += item.weight | |
self._state_weight -= item.weight | |
def _build_best_solution(self) -> None: | |
remaining_capacity = self._capacity | |
check_profit = 0 | |
# Compute remaining capacity at self._best_solution_depth to be able to | |
# redo the _get_lower_and_upper_bound computation. | |
for i in range(self._best_solution_depth): | |
if self._best_solution & _one_bit(i): | |
remaining_capacity -= self._sorted_items[i].weight | |
check_profit += self._sorted_items[i].profit | |
# Add all items till the break item. | |
num_items = len(self._sorted_items) | |
for i in range(self._best_solution_depth + 1, num_items): | |
weight = self._sorted_items[i].weight | |
if remaining_capacity >= weight: | |
remaining_capacity -= weight | |
check_profit += self._sorted_items[i].profit | |
self._best_solution = self._best_solution | _one_bit(i) | |
else: | |
self._best_solution = self._best_solution & ~_one_bit(i) | |
# Items were sorted by efficiency, solution should be unsorted to be | |
# in user order. | |
# Note that self._best_solution will not be in the same order than other | |
# data structures anymore. | |
tmp_solution = 0 | |
for i in range(num_items): | |
if self._best_solution & _one_bit(i): | |
original_id = self._sorted_items[i].index | |
tmp_solution = tmp_solution | _one_bit(original_id) | |
self._best_solution = tmp_solution | |
class KnapsackDynamicProgrammingSolver(BaseKnapsackSolver): | |
"""KnapsackDynamicProgrammingSolver solves the 0-1 knapsack problem | |
using dynamic programming. This algorithm is pseudo-polynomial because it | |
depends on capacity, i.e. the time and space complexity is | |
O(capacity * number_of_items). | |
The implemented algorithm is 'DP-3' in "Knapsack problems", Hans Kellerer, | |
Ulrich Pferschy and David Pisinger, Springer book (ISBN 978-3540402862) | |
""" | |
def __init__(self, solver_name: str): | |
super().__init__(solver_name) | |
self._profits: list[int] = [] | |
self._weights: list[int] = [] | |
self._capacity: int = 0 | |
self._computed_profits: list[int] = [] | |
self._selected_item_ids: list[int] = [] | |
self._best_solution: list[bool] = [] | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
if len(weights) > 1: | |
raise Exception( | |
"Current implementation of the dynamic programming " | |
f"solver only deals with one dimension." | |
) | |
self._profits = profits | |
self._weights = weights[0] | |
self._capacity = capacities[0] | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
is_solution_optimal = True | |
capacity_plus_1 = self._capacity + 1 | |
self._selected_item_ids = [0] * capacity_plus_1 | |
self._computed_profits = [0] * capacity_plus_1 | |
remaining_capacity = self._capacity | |
num_items = len(self._profits) | |
self._best_solution = [False] * num_items | |
while remaining_capacity > 0 and num_items > 0: | |
selected_item_id = self._solve_sub_problem(remaining_capacity, num_items) | |
remaining_capacity -= self._weights[selected_item_id] | |
num_items = selected_item_id | |
if remaining_capacity >= 0: | |
self._best_solution[selected_item_id] = True | |
return self._computed_profits[self._capacity], is_solution_optimal | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
return self._best_solution[item_id] | |
def _solve_sub_problem(self, capacity: int, num_items: int) -> int: | |
capacity_plus_1 = capacity + 1 | |
for i in range(capacity_plus_1): | |
self._selected_item_ids[i] = 0 | |
self._computed_profits[i] = 0 | |
for item_id in range(num_items): | |
item_weight = self._weights[item_id] | |
item_profit = self._profits[item_id] | |
for used_capacity in range(capacity, item_weight - 1, -1): | |
if ( | |
self._computed_profits[used_capacity - item_weight] + item_profit | |
> self._computed_profits[used_capacity] | |
): | |
self._computed_profits[used_capacity] = ( | |
self._computed_profits[used_capacity - item_weight] | |
+ item_profit | |
) | |
self._selected_item_ids[used_capacity] = item_id | |
return self._selected_item_ids[capacity] | |
class KnapsackDivideAndConquerSolver(BaseKnapsackSolver): | |
"""KnapsackDivideAndConquerSolver solves the 0-1 knapsack problem (KP) using | |
divide and conquer and dynamic programming. | |
By using one-dimensional vectors it keeps a complexity of O(capacity * | |
number_of_items) in time, but reduces the space complexity to O(capacity + | |
number_of_items) and is therefore suitable for large hard to solve (KP)/(SSP). | |
The implemented algorithm is based on 'DP-2' and Divide and Conquer for | |
storage reduction from [Hans Kellerer et al., "Knapsack problems" | |
(DOI 10.1007/978-3-540-24777-7)]. | |
""" | |
def __init__(self, solver_name: str): | |
super().__init__(solver_name) | |
self._profits: list[int] = [] | |
self._weights: list[int] = [] | |
self._capacity: int = 0 | |
self._computed_profits_storage1: list[int] = [] | |
self._computed_profits_storage2: list[int] = [] | |
self._best_solution: list[bool] = [] | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
if len(weights) > 1: | |
raise Exception( | |
"Current implementation of the divide and conquer solver " | |
"only deals with one dimension." | |
) | |
self._profits = profits | |
self._weights = weights[0] | |
self._capacity = capacities[0] | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
is_solution_optimal = True | |
capacity_plus_1 = self._capacity + 1 | |
self._computed_profits_storage1 = [0] * capacity_plus_1 | |
self._computed_profits_storage2 = [0] * capacity_plus_1 | |
self._best_solution = [False] * len(self._profits) | |
best_solution_profit = self._divide_and_conquer( | |
self._capacity, 0, len(self._profits) | |
) | |
return best_solution_profit, is_solution_optimal | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
return self._best_solution[item_id] | |
def _solve_sub_problem( | |
self, first_storage: bool, capacity: int, start_item: int, end_item: int | |
) -> None: | |
"""'DP 2' computes solution 'z' for 0 up to capacity.""" | |
computed_profits_storage = ( | |
self._computed_profits_storage1 | |
if first_storage | |
else self._computed_profits_storage2 | |
) | |
capacity_plus_1 = capacity + 1 | |
for i in range(capacity_plus_1): | |
computed_profits_storage[i] = 0 | |
for item_id in range(start_item, end_item): | |
item_weight = self._weights[item_id] | |
item_profit = self._profits[item_id] | |
for used_capacity in range(capacity, item_weight - 1, -1): | |
if ( | |
computed_profits_storage[used_capacity - item_weight] + item_profit | |
> computed_profits_storage[used_capacity] | |
): | |
computed_profits_storage[used_capacity] = ( | |
computed_profits_storage[used_capacity - item_weight] | |
+ item_profit | |
) | |
def _divide_and_conquer(self, capacity: int, start_item: int, end_item: int) -> int: | |
"""Calculates self._best_solution and return 'z' from first instance.""" | |
item_boundary = int(start_item + ((end_item - start_item) / 2)) | |
self._solve_sub_problem(True, capacity, start_item, item_boundary) | |
self._solve_sub_problem(False, capacity, item_boundary, end_item) | |
max_solution = 0 | |
capacity1 = 0 | |
capacity2 = 0 | |
for capacity_id in range(capacity + 1): | |
if ( | |
self._computed_profits_storage1[capacity_id] | |
+ self._computed_profits_storage2[capacity - capacity_id] | |
) > max_solution: | |
capacity1 = capacity_id | |
capacity2 = capacity - capacity_id | |
max_solution = ( | |
self._computed_profits_storage1[capacity_id] | |
+ self._computed_profits_storage2[capacity - capacity_id] | |
) | |
if (item_boundary - start_item) == 1: | |
if self._weights[start_item] <= capacity1: | |
self._best_solution[start_item] = True | |
elif (item_boundary - start_item) > 1: | |
self._divide_and_conquer(capacity1, start_item, item_boundary) | |
if (end_item - item_boundary) == 1: | |
if self._weights[item_boundary] <= capacity2: | |
self._best_solution[item_boundary] = True | |
elif (end_item - item_boundary) > 1: | |
self._divide_and_conquer(capacity2, item_boundary, end_item) | |
return max_solution | |
class KnapsackMIPSolver(BaseKnapsackSolver): | |
"""KnapsackMIPSolver.""" | |
def __init__(self, problem_type: str, solver_name: str): | |
super().__init__(solver_name) | |
self._problem_type: str = problem_type | |
self._profits: list[int] = [] | |
self._weights: list[list[int]] = [] | |
self._capacities: list[int] = [] | |
self._best_solution: list[bool] = [] | |
def init( | |
self, profits: list[int], weights: list[list[int]], capacities: list[int] | |
) -> None: | |
"""Initializes the solver and enters the problem to be solved.""" | |
self._profits = profits | |
self._weights = weights | |
self._capacities = capacities | |
def solve(self) -> tuple[int, bool]: | |
"""Solves the problem and returns the profit of the optimal solution.""" | |
solver = pywraplp.Solver.CreateSolver(self._problem_type) | |
num_items = len(self._profits) | |
variables = [] | |
for i in range(num_items): | |
variables.append(solver.BoolVar(f"SelectItem|{i}")) | |
# Add constraints. | |
num_dimensions = len(self._capacities) | |
for dimension_index in range(num_dimensions): | |
coeff_var_list = [] | |
for var_index, var in enumerate(variables): | |
coeff_var_list.append(self._weights[dimension_index][var_index] * var) | |
solver.Add( | |
solver.Sum(coeff_var_list) <= self._capacities[dimension_index], | |
f"CapacityConstraint{dimension_index}", | |
) | |
# Define objective to minimize. Minimization is used instead of | |
# maximization because of an issue with CBC solver which does not always | |
# find the optimal solution on maximization problems. | |
objective = solver.Objective() | |
for j, var in enumerate(variables): | |
objective.SetCoefficient(var, -self._profits[j]) | |
objective.SetMinimization() | |
# print(solver.ExportModelAsLpFormat(False)) | |
status = solver.Solve() | |
# Store best solution. | |
ROUND_NEAR = 0.5 | |
self._best_solution = [False] * num_items | |
for j in range(num_items): | |
value = variables[j].solution_value() | |
self._best_solution[j] = value >= ROUND_NEAR | |
return int(-objective.Value() + ROUND_NEAR), True | |
def best_solution(self, item_id: int) -> bool: | |
"""Returns true if the item 'item_id' is packed in the optimal knapsack.""" | |
return self._best_solution[item_id] | |
if __name__ == "__main__": | |
solver = KnapsackSolver( | |
KnapsackSolver.KNAPSACK_DYNAMIC_PROGRAMMING_SOLVER, "KnapsackExample" | |
) | |
values = [1, 2, 3] | |
weights = [[1, 2, 3]] | |
capacities = [5] | |
solver.init(values, weights, capacities) | |
computed_value = solver.solve() | |
packed_items = [] | |
for i in range(len(values)): | |
if solver.best_solution_contains(i): | |
packed_items.append(i) | |
packed_weights = [] | |
total_weights = 0 | |
for i in packed_items: | |
packed_weights.append(weights[0][i]) | |
total_weights += weights[0][i] | |
print("Total value:", computed_value) | |
print("Packed items:", packed_items) | |
print("Total weight:", total_weights) | |
print("Packed weights:", packed_weights) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment