Created September 23, 2017 18:11
VRP (Vehicle Routing Problem) Solver using OR-Tools library in python
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import argparse
import json
import sys
is_debug = False
class CreateDistanceCallback(object):
"""Create callback to calculate distances and travel times between points."""
def __init__(self, distance_matrix):
"""Initialize distance array."""
self.matrix = distance_matrix
def Distance(self, from_node, to_node):
return self.matrix[from_node][to_node]
class CreateDemandCallback(object):
"""Create callback to get demands at location node."""
def __init__(self, demands):
self.matrix = demands
def Demand(self, from_node, to_node):
return self.matrix[from_node]
# Service time (proportional to demand) callback.
class CreateServiceTimeCallback(object):
"""Create callback to get time windows at each location."""
def __init__(self, time_in_locations):
self.time_in_locations = time_in_locations
def ServiceTime(self, from_node, to_node):
return self.time_in_locations[from_node]
# Create total_time callback (equals service time plus travel time).
class CreateTotalTimeCallback(object):
def __init__(self, service_time_callback, dist_callback, speed):
self.service_time_callback = service_time_callback
self.dist_callback = dist_callback
self.speed = speed
def TotalTime(self, from_node, to_node):
service_time = self.service_time_callback(from_node, to_node)
travel_time = self.dist_callback(from_node, to_node) / self.speed
return service_time + travel_time
def main(args):
debug('Starting script')
# TODO: Validate json structure
global is_debug
is_debug = True if args.debug else False
input_data = json.loads(
locations = input_data["locations"]
distance_matrix = input_data["distance_matrix"]
num_locations = len(input_data["locations"])
start_location = input_data["start_location_index"] if "start_location_index" in input_data else 0
end_location = input_data["end_location_index"] if "end_location_index" in input_data else 1
num_vehicles = 1
speed = 1
fix_start_cumul_to_zero = True
search_time_limit = 8 * 1000
max_time_window = 24 * 3600
if num_locations == 0:
error_and_exit(error(ZERO_LOCATIONS_CODE, "Provide more than 0 locations"))
# The number of nodes of the VRP is num_locations.
# Nodes are indexed from 0 to num_locations - 1. By default the start of
# a route is node 0.
routing = pywrapcp.RoutingModel(num_locations, num_vehicles, [start_location], [end_location])
search_parameters = pywrapcp.RoutingModel.DefaultSearchParameters()
search_parameters.time_limit_ms = search_time_limit
# Setting first solution heuristic: the method for finding a first solution to the problem.
search_parameters.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
# The 'PATH_CHEAPEST_ARC' method does the following:
# Starting from a route "start" node, connect it to the node which produces the
# cheapest route segment, then extend the route by iterating on the last node added to the route.
dist_between_locations = CreateDistanceCallback(distance_matrix)
dist_callback = dist_between_locations.Distance
# Adding time dimension constraints.
time_in_locations = [l["service_time"] for l in locations]
service_times = CreateServiceTimeCallback(time_in_locations)
service_time_callback = service_times.ServiceTime
total_times = CreateTotalTimeCallback(service_time_callback, dist_callback, speed)
total_time_callback = total_times.TotalTime
# Add a dimension for time-window constraints and limits on the start times and end times.
time = "Time"
routing.AddDimension(total_time_callback, # total time function callback
# Add limit on size of the time windows.
time_dimension = routing.GetDimensionOrDie(time)
# Transform time windows to "not allowed time windows"
for order in xrange(num_locations):
time_windows = locations[order]["time_windows"]
if len(time_windows) == 0:
continue # Skip empty time windows since it means they are available all the time
time_windows = sorted(time_windows, key=lambda tw: tw[0])
pairwise_tw = pairwise(time_windows)
# Check for overlapping time windows
for cur, next in pairwise_tw:
if next is not None:
cur_end, next_start = cur[1], next[0]
if cur_end >= next_start:
err = error(OVERLAPPING_TIME_WINDOWS, 'Overlapping time windows are not allowed',
{"time_windows": [cur, next]})
closed_time_windows = []
cur_time = 0
# Find available "not allowed time windows" between time windows in the interval [0, max_time_window]
for start, end in time_windows:
if start - cur_time > 1:
closed_time_windows.append([cur_time, start - 1])
cur_time = end + 1
if max_time_window - cur_time > 1:
closed_time_windows.append([cur_time, max_time_window])
# Add the not allowed time window constraints
solver = routing.solver()
[tw[0] for tw in closed_time_windows], [tw[1] for tw in closed_time_windows]
# Solve displays a solution if any.
assignment = routing.SolveWithParameters(search_parameters)
if not assignment:
error_and_exit(error(NO_SOLUTION, "No solution found"))
# Solution cost.
debug("Total distance of all routes: %s" % assignment.ObjectiveValue())
# Inspect solution.
time_dimension = routing.GetDimensionOrDie(time)
vehicle_nbr = 0
index = routing.Start(vehicle_nbr)
route = []
while not routing.IsEnd(index):
node_index = routing.IndexToNode(index)
time_var = time_dimension.CumulVar(index)
"index": node_index,
"min_arrive_time": assignment.Min(time_var),
"max_arrive_time": assignment.Max(time_var),
index = assignment.Value(routing.NextVar(index))
node_index = routing.IndexToNode(index)
time_var = time_dimension.CumulVar(index)
"index": node_index,
"min_arrive_time": assignment.Min(time_var),
"max_arrive_time": assignment.Max(time_var),
stats = {
"cost": assignment.ObjectiveValue()
result = {"is_error": False, "route": route, "stats": stats}
def error(code, msg, info=None):
err = {"is_error": True, "code": code, "message": msg}
if info is not None:
err["info"] = info
return err
def send(data):
def error_and_exit(data):
def send_and_exit(data):
def debug(msg):
global is_debug
if is_debug:
print("DEBUG: %s" % msg)
def parse_args():
parser = argparse.ArgumentParser(description='Vehicle Routing Problem Solver')
data_help = \
'JSON stringyfied object of the form:' \
'"{' \
' "locations": [' \
' {"time_windows": [[<start_time>, <end_time>], ...], "service_time": <seconds>},' \
' ...],' \
' "distance_matrix: [[<location_0_to_0>, ...], [<location_0_to_1>, ...], ...]' \
' "start_location_index": <index>,' \
' "end_location_index": <index>,' \
parser.add_argument('--data', dest='data', required=True, help=data_help)
parser.add_argument('--debug', dest='debug', action='store_true')
return parser.parse_args()
def pairwise(iterable):
it = iter(iterable)
a = next(it, None)
for b in it:
yield (a, b)
a = b
if __name__ == '__main__':
# try:
args = parse_args()
# except BaseException as err:
# error_and_exit(error(UNKNOWN_EXCEPTION, 'An unknown exception occurred', {"err": str(err)}))
