Created
August 15, 2024 06:18
-
-
Save btmxh/903d12e07a904fdbe0dac3843b9c9347 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABC, abstractmethod | |
from collections.abc import Callable, Iterable | |
from typing import cast, override, Protocol, Self | |
from random import choice, choices, random | |
from heapq import nsmallest, heapify, heappush, heappop | |
from math import sqrt | |
import csv | |
class Comparable(Protocol): | |
def __lt__(self, value: Self, /) -> bool: ... | |
class PriorityQueue[T, K: Comparable]: | |
heap: list[tuple[K, int, T]] | |
counter: int | |
key_fn: Callable[[T], K] | |
def __init__( | |
self, | |
lst: Iterable[T], | |
key_fn: Callable[[T], K] = lambda value: cast(K, value), | |
) -> None: | |
super().__init__() | |
self.counter = 0 | |
self.key_fn = key_fn | |
self.heap = [self.make_element(value) for value in lst] | |
heapify(self.heap) | |
def make_element(self, value: T) -> tuple[K, int, T]: | |
self.counter += 1 | |
return (self.key_fn(value), self.counter, value) | |
def push(self, value: T): | |
heappush(self.heap, self.make_element(value)) | |
def pop(self) -> T | None: | |
try: | |
return heappop(self.heap)[2] | |
except IndexError: | |
return None | |
def __len__(self) -> int: | |
return len(self.heap) | |
class Request: | |
x: float | |
y: float | |
demand: float | |
open: float | |
close: float | |
service_time: float | |
time: float | |
def __init__( | |
self, | |
x: float, | |
y: float, | |
demand: float, | |
open: float, | |
close: float, | |
service_time: float, | |
time: float, | |
): | |
self.x = x | |
self.y = y | |
self.demand = demand | |
self.open = open | |
self.close = close | |
self.service_time = service_time | |
self.time = time | |
@staticmethod | |
def load_csv(path: str): | |
reqs: list[Request] = [] | |
with open(path) as f: | |
r = csv.reader(f) | |
_headers = next(r) | |
for row in r: | |
x, y, demand, time_open, close, service_time, _drone_serve, time = [ | |
float(t) for t in row | |
] | |
reqs.append(Request(x, y, demand, time_open, close, service_time, time)) | |
return reqs | |
class Problem: | |
requests: list[Request] | |
truck_speed: float | |
truck_capacity: float | |
num_trucks: int | |
def start_x(self) -> float: | |
assert self.requests[0].demand == 0 | |
return self.requests[0].x | |
def start_y(self) -> float: | |
assert self.requests[0].demand == 0 | |
return self.requests[0].y | |
def __init__( | |
self, csv: str, truck_speed: float, truck_capacity: float, num_trucks: int | |
): | |
self.truck_speed = truck_speed | |
self.truck_capacity = truck_capacity | |
self.num_trucks = num_trucks | |
self.requests = Request.load_csv(csv) | |
RoutingRule = Callable[["Simulator", Request], int] | |
SequencingRule = Callable[["Simulator", int, list["QueueElem"]], int] | |
class Event(ABC): | |
@abstractmethod | |
def time(self) -> float: ... | |
class RequestEvent(Event): | |
request: Request | |
def __init__(self, request: Request) -> None: | |
super().__init__() | |
self.request = request | |
@override | |
def time(self) -> float: | |
return self.request.time | |
class MachineFinishEvent(Event): | |
machine: int | |
finish_time: float | |
def __init__(self, machine: int, finish_time: float): | |
self.machine = machine | |
self.finish_time = finish_time | |
@override | |
def time(self): | |
return self.finish_time | |
class QueueElem: | |
request: Request | |
ready_time: float | |
def __init__(self, request: Request, ready_time: float): | |
self.request = request | |
self.ready_time = ready_time | |
class MachineState: | |
queue: list[QueueElem] | |
machine: int | |
busy_until: float | |
x: float | |
y: float | |
def __init__(self, machine: int, x: float, y: float) -> None: | |
self.queue = [] | |
self.busy_until = 0.0 | |
self.machine = machine | |
self.x = x | |
self.y = y | |
def enqueue(self, s: "Simulator", r: Request) -> bool: | |
total_demand = sum(e.request.demand for e in self.queue) | |
if total_demand + r.demand > s.problem.truck_capacity: | |
return False | |
self.queue.append(QueueElem(r, s.time)) | |
s.update_queue(self.machine) | |
return True | |
def dequeue(self, s: "Simulator") -> int: | |
if s.time < self.busy_until: | |
return 0 | |
num_failed = 0 | |
while len(self.queue) > 0: | |
index = s.sequencing_rule(s, self.machine, self.queue) | |
value = self.queue.pop(index) | |
busy_until = s.time + self.cost_of(value.request, s) | |
if busy_until > value.request.close: | |
num_failed += 1 | |
continue | |
self.x = value.request.x | |
self.y = value.request.y | |
self.busy_until = busy_until | |
s.events.push(MachineFinishEvent(self.machine, self.busy_until)) | |
break | |
return num_failed | |
def cost_of(self, r: Request, s: "Simulator"): | |
dist = sqrt((self.x - r.x) ** 2 + (self.y - r.y) ** 2) | |
return max(r.open - s.time, 0) + r.service_time + dist / s.problem.truck_speed | |
class Simulator: | |
problem: Problem | |
routing_rule: RoutingRule | |
sequencing_rule: SequencingRule | |
time: float | |
states: list[MachineState] | |
events: PriorityQueue[Event, float] | |
num_failed_requests: int | |
def __init__( | |
self, | |
problem: Problem, | |
routing_rule: RoutingRule, | |
sequencing_rule: SequencingRule, | |
): | |
self.problem = problem | |
self.routing_rule = routing_rule | |
self.sequencing_rule = sequencing_rule | |
self.time = 0.0 | |
self.states = [ | |
MachineState(i, problem.start_x(), problem.start_y()) | |
for i in range(problem.num_trucks) | |
] | |
self.events = PriorityQueue([], key_fn=lambda e: e.time()) | |
self.num_failed_requests = 0 | |
def simulate(self) -> tuple[float, int]: | |
for request in self.problem.requests[1:]: | |
self.events.push(RequestEvent(request)) | |
self.time = 0.0 | |
while True: | |
event = self.events.pop() | |
if event is None: | |
break | |
assert self.time <= event.time() | |
self.time = event.time() | |
if isinstance(event, RequestEvent): | |
self.handle_request(event) | |
elif isinstance(event, MachineFinishEvent): | |
self.handle_machine_finish(event) | |
return self.time, self.num_failed_requests | |
def handle_request(self, e: RequestEvent): | |
machine = self.routing_rule(self, e.request) | |
if not self.states[machine].enqueue(self, e.request): | |
self.num_failed_requests += 1 | |
def handle_machine_finish(self, e: MachineFinishEvent): | |
self.update_queue(e.machine) | |
def update_queue(self, machine: int): | |
num_failed = self.states[machine].dequeue(self) | |
self.num_failed_requests += num_failed | |
for i in range(1, 4): | |
def CR(s: Simulator, r: Request) -> int: | |
return min( | |
range(s.problem.num_trucks), | |
key=lambda machine: s.states[machine].cost_of(r, s), | |
) | |
def CS(s: Simulator, machine: int, r: list[QueueElem]) -> int: | |
return min( | |
range(len(r)), key=lambda i: s.states[machine].cost_of(r[i].request, s) | |
) | |
def W(s: Simulator, machine: int, r: list[QueueElem]) -> int: | |
return min(range(len(r)), key=lambda i: r[i].request.demand) | |
def WIQ(s: Simulator, _: Request) -> int: | |
return min( | |
range(s.problem.num_trucks), | |
key=lambda machine: sum( | |
s.states[machine].cost_of(r.request, s) for r in s.states[machine].queue | |
), | |
) | |
problem = Problem("/home/torani/100/h100c103.csv", 50 / 60, 1300, i) | |
solver = Simulator(problem, CR, CS) | |
time, num_fail = solver.simulate() | |
print("C+C", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1))) | |
solver = Simulator(problem, CR, W) | |
time, num_fail = solver.simulate() | |
print("C+W", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1))) | |
solver = Simulator(problem, WIQ, CS) | |
time, num_fail = solver.simulate() | |
print("WIQ+C", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1))) | |
class NodeType[C](ABC): | |
name: str | |
def __init__(self, name: str): | |
self.name = name | |
@abstractmethod | |
def num_children(self) -> int: ... | |
@abstractmethod | |
def calc(self, ctx: C, children: list[float]) -> float: ... | |
class Node[C]: | |
typ: NodeType[C] | |
children: list["Node[C]"] | |
def __init__(self, typ: NodeType[C], children: list["Node[C]"]) -> None: | |
self.typ = typ | |
self.children = children | |
def eval(self, c: C) -> float: | |
return self.typ.calc(c, [child.eval(c) for child in self.children]) | |
def all_nodes(self) -> list["Node[C]"]: | |
return [self] + [c for child in self.children for c in child.all_nodes()] | |
def depth(self) -> int: | |
return max((child.depth() + 1 for child in self.children), default=0) | |
def assign(self, other: "Node[C]"): | |
self.typ = other.typ | |
self.children = other.children | |
def copy(self) -> "Node[C]": | |
return Node(self.typ, [c.copy() for c in self.children]) | |
@override | |
def __str__(self) -> str: | |
if len(self.children) == 0: | |
return self.typ.name | |
else: | |
return ( | |
self.typ.name | |
+ "(" | |
+ ", ".join(str(child) for child in self.children) | |
+ ")" | |
) | |
class RoutingContext: | |
s: Simulator | |
r: Request | |
machine: int | |
def __init__(self, s: Simulator, r: Request, machine: int) -> None: | |
self.s = s | |
self.r = r | |
self.machine = machine | |
class SequencingContext: | |
s: Simulator | |
machine: int | |
elem: QueueElem | |
def __init__(self, s: Simulator, machine: int, elem: QueueElem) -> None: | |
self.s = s | |
self.machine = machine | |
self.elem = elem | |
class Individual: | |
routing: Node[RoutingContext] | |
sequencing: Node[SequencingContext] | |
simulate_result: tuple[float, int] | None | |
fitness: float | |
def __init__( | |
self, routing: Node[RoutingContext], sequencing: Node[SequencingContext] | |
): | |
self.routing = routing | |
self.sequencing = sequencing | |
self.simulate_result = None | |
self.fitness = 0.0 | |
def as_routing_rule(self) -> RoutingRule: | |
def rr(s: Simulator, r: Request) -> int: | |
return min( | |
range(s.problem.num_trucks), | |
key=lambda machine: self.routing.eval(RoutingContext(s, r, machine)), | |
) | |
return rr | |
def as_sequencing_rule(self) -> SequencingRule: | |
def sr(s: Simulator, machine: int, queue: list[QueueElem]) -> int: | |
return min( | |
range(len(queue)), | |
key=lambda i: self.sequencing.eval( | |
SequencingContext(s, machine, queue[i]) | |
), | |
) | |
return sr | |
class TerminalNodeType[C](NodeType[C]): | |
fn: Callable[[C], float] | |
def __init__(self, name: str, fn: Callable[[C], float]): | |
super().__init__(name) | |
self.fn = fn | |
@override | |
def num_children(self) -> int: | |
return 0 | |
@override | |
def calc(self, ctx: C, children: list[float]) -> float: | |
return self.fn(ctx) | |
R_CIQ = TerminalNodeType[RoutingContext]( | |
"CIQ", lambda c: len(c.s.states[c.machine].queue) | |
) | |
R_WIQ = TerminalNodeType[RoutingContext]( | |
"WIQ", | |
lambda c: sum( | |
c.s.states[c.machine].cost_of(elem.request, c.s) | |
for elem in c.s.states[c.machine].queue | |
), | |
) | |
R_C = TerminalNodeType[RoutingContext]( | |
"C", lambda c: c.s.states[c.machine].cost_of(c.r, c.s) | |
) | |
S_W = TerminalNodeType[SequencingContext]("W", lambda c: c.elem.request.demand) | |
S_C = TerminalNodeType[SequencingContext]( | |
"C", lambda c: c.s.states[c.machine].cost_of(c.elem.request, c.s) | |
) | |
S_WT = TerminalNodeType[SequencingContext]("WT", lambda c: c.s.time - c.elem.ready_time) | |
class BinaryInternalNodeType(NodeType[object]): | |
fn: Callable[[float, float], float] | |
def __init__(self, name: str, fn: Callable[[float, float], float]): | |
super().__init__(name) | |
self.fn = fn | |
@override | |
def num_children(self) -> int: | |
return 2 | |
@override | |
def calc(self, ctx: object, children: list[float]) -> float: | |
return self.fn(children[0], children[1]) | |
ADD = BinaryInternalNodeType("ADD", lambda x, y: x + y) | |
SUB = BinaryInternalNodeType("SUB", lambda x, y: x - y) | |
MUL = BinaryInternalNodeType("MUL", lambda x, y: x * y) | |
DIV = BinaryInternalNodeType("DIV", lambda x, y: x / y if abs(y) > 1e-4 else 1.0) | |
MIN = BinaryInternalNodeType("MIN", lambda x, y: min(x, y)) | |
MAX = BinaryInternalNodeType("MAX", lambda x, y: max(x, y)) | |
class GP: | |
problem: Problem | |
pop_size: int | |
max_depth: int | |
reproduction_rate: tuple[float, float] | |
rnt: list[NodeType[RoutingContext]] | |
snt: list[NodeType[SequencingContext]] | |
def __init__( | |
self, | |
problem: Problem, | |
pop_size: int, | |
max_depth: int, | |
reproduction_rate: tuple[float, float], | |
rnt: list[NodeType[RoutingContext]], | |
snt: list[NodeType[SequencingContext]], | |
): | |
self.problem = problem | |
self.pop_size = pop_size | |
self.max_depth = max_depth | |
self.reproduction_rate = reproduction_rate | |
self.rnt = rnt | |
self.snt = snt | |
def get_node_types[C](self, cls: type[C]) -> list[NodeType[C]]: | |
if cls == RoutingContext: | |
return cast(list[NodeType[C]], self.rnt) | |
elif cls == SequencingContext: | |
return cast(list[NodeType[C]], self.snt) | |
else: | |
return [] | |
def gen_grow[C](self, cls: type[C], depth: int) -> Node[C]: | |
types = [ | |
t for t in self.get_node_types(cls) if t.num_children() == 0 or depth > 0 | |
] | |
typ = choice(types) | |
children = [self.gen_grow(cls, depth - 1) for _ in range(typ.num_children())] | |
return Node(typ, children) | |
def gen_full[C](self, cls: type[C], depth: int) -> Node[C]: | |
types = [ | |
t for t in self.get_node_types(cls) if (t.num_children() > 0) == (depth > 0) | |
] | |
typ = choice(types) | |
children = [self.gen_grow(cls, depth - 1) for _ in range(typ.num_children())] | |
return Node(typ, children) | |
def ramp_half_and_half[C](self, cls: type[C]) -> list[Node[C]]: | |
half_size = self.pop_size // 2 | |
pop: list[Node[C]] = [] | |
for depth in range(1, self.max_depth + 1): | |
for _ in range(half_size // self.max_depth): | |
pop.append(self.gen_grow(cls, depth)) | |
pop.append(self.gen_full(cls, depth)) | |
while len(pop) < self.pop_size: | |
pop.append(self.gen_grow(cls, self.max_depth)) | |
return pop | |
def reproduce(self, pop: list[Individual]): | |
match choices([1, 2], weights=list(self.reproduction_rate), k=1)[0]: | |
case 1: | |
p1 = max(choices(pop, k=8), key=lambda i: i.fitness) | |
p2 = max(choices(pop, k=8), key=lambda i: i.fitness) | |
r = self.crossover(p1.routing, p2.routing, RoutingContext) | |
s = self.crossover(p1.sequencing, p2.sequencing, SequencingContext) | |
return Individual(r, s) | |
case _: | |
p = max(choices(pop, k=8), key=lambda i: i.fitness) | |
r = self.mutate(p.routing, RoutingContext) | |
s = self.mutate(p.sequencing, SequencingContext) | |
return Individual(r, s) | |
def mutate[C](self, p: Node[C], cls: type[C]) -> Node[C]: | |
p = p.copy() | |
pos = choice(p.all_nodes()) | |
pos.assign(self.gen_grow(cls, self.max_depth - p.depth() + pos.depth())) | |
return p | |
def crossover[C](self, p1: Node[C], p2: Node[C], cls: type[C]) -> Node[C]: | |
if random() < 0.5: | |
p1, p2 = p2, p1 | |
p1 = p1.copy() | |
pos1 = choice(p1.all_nodes()) | |
pos2 = choice([ | |
n | |
for n in p2.all_nodes() | |
if n.depth() <= self.max_depth - p1.depth() + pos1.depth() | |
]) | |
pos1.assign(pos2) | |
return p1 | |
def fitness(self, values: tuple[float, int]) -> float: | |
time, num_fail = values | |
return time / 100 + num_fail | |
def solve(self): | |
pop = list( | |
map( | |
lambda t: Individual(t[0], t[1]), | |
zip( | |
self.ramp_half_and_half(RoutingContext), | |
self.ramp_half_and_half(SequencingContext), | |
), | |
) | |
) | |
while True: | |
for i in pop: | |
i.simulate_result = Simulator( | |
problem, i.as_routing_rule(), i.as_sequencing_rule() | |
).simulate() | |
i.fitness = self.fitness(i.simulate_result) | |
pop = nsmallest(self.pop_size, pop, key=lambda i: i.fitness) | |
yield pop[0] | |
pop.extend([self.reproduce(pop) for _ in range(self.pop_size)]) | |
for i in range(1, 4): | |
problem = Problem("100/h100c103.csv", 50 / 60, 1300, i) | |
gp = GP( | |
problem, | |
100, | |
6, | |
(0.8, 0.2), | |
[R_CIQ, R_WIQ, R_C, ADD, SUB, MUL, DIV, MIN, MAX], | |
[S_W, S_C, S_WT, ADD, SUB, MUL, DIV, MIN, MAX], | |
) | |
for gen, best in zip(range(1, 101), gp.solve()): | |
print(gen, best, best.simulate_result, best.fitness) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment