Created
December 13, 2022 13:16
-
-
Save mbillingr/937b83da23b72ad20d83c242ae2629fa to your computer and use it in GitHub Desktop.
Simulate belt balancers (Factorio)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
class Entity: | |
def __init__(self): | |
entities.append(self) | |
class Lane(Entity): | |
def __init__(self): | |
super().__init__() | |
self.inbox = 0 | |
self.outbox = 0 | |
self.capacity = 1 | |
self.flow = [] | |
def tick(self): | |
transfer = min(self.inbox, self.capacity - self.outbox) | |
self.inbox -= transfer | |
self.outbox += transfer | |
self.flow.append(transfer) | |
def accepts(self): | |
return self.capacity - self.inbox | |
def provides(self): | |
return self.outbox | |
class Splitter(Entity): | |
def __init__(self, a, b, x, y, priority_input=False, priority_output=False): | |
super().__init__() | |
self.in1 = a | |
self.in2 = b | |
self.out1 = x | |
self.out2 = y | |
self.input_order = [a, b] | |
self.output_order = [x, y] | |
self.priority_input = priority_input | |
self.priority_output = priority_output | |
def tick(self): | |
total_out = 0 | |
if self.output_order[0].accepts() > 0: | |
total_out += 1 | |
if self.output_order[1].accepts() > 0: | |
total_out += 1 | |
total_in = 0 | |
if self.input_order[0].provides() > 0: | |
total_in += 1 | |
if self.input_order[1].provides() > 0: | |
total_in += 1 | |
for _ in range(min(total_in, total_out)): | |
n = self.fetch() | |
assert n == 1 | |
self.put(n) | |
if self.priority_input: | |
self.input_order = [self.in1, self.in2] | |
if self.priority_output: | |
self.output_order = [self.out1, self.out2] | |
def fetch(self): | |
if self.input_order[0].provides() > 0: | |
self.input_order[0].outbox -= 1 | |
self.input_order = self.input_order[::-1] | |
return 1 | |
if self.input_order[1].provides() > 0: | |
self.input_order[1].outbox -= 1 | |
return 1 | |
return 0 | |
def put(self, n): | |
if self.output_order[0].accepts() > 0: | |
self.output_order[0].inbox += n | |
self.output_order = self.output_order[::-1] | |
if self.output_order[1].accepts() > 0: | |
self.output_order[1].inbox += n | |
class Source(Entity): | |
def __init__(self, x): | |
super().__init__() | |
self.out = x | |
def tick(self): | |
self.out.inbox += self.out.accepts() | |
class Sink(Entity): | |
def __init__(self, a): | |
super().__init__() | |
self.inp = a | |
def tick(self): | |
self.inp.outbox -= self.inp.provides() | |
def simple_balancer(a, c): | |
b = [Lane(), Lane(), Lane(), Lane()] | |
Splitter(a[0], a[1], b[0], b[1]) | |
Splitter(a[2], a[3], b[2], b[3]) | |
Splitter(b[0], b[2], c[0], c[1]) | |
Splitter(b[1], b[3], c[2], c[3]) | |
return b | |
def advanced_balancer(a, c): | |
a1 = [Lane(), Lane(), Lane(), Lane()] | |
c1 = [Lane(), Lane(), Lane(), Lane()] | |
simple_balancer(a1, c1) | |
a2 = [Lane(), Lane(), Lane(), Lane()] | |
c2 = [Lane(), Lane(), Lane(), Lane()] | |
simple_balancer(a2, c2) | |
merger(a, c2, a1) | |
tee(c1, c, a2) | |
return a2 + c2 | |
def merger(a, b, c): | |
for x, y, z in zip(a, b, c): | |
dummy = Lane() | |
Splitter(x, y, z, dummy, priority_input=True) | |
def tee(a, b, c): | |
for x, y, z in zip(a, b, c): | |
dummy = Lane() | |
Splitter(dummy, x, y, z, priority_output=True) | |
entities = [] | |
a = [Lane(), Lane(), Lane(), Lane()] | |
c = [Lane(), Lane(), Lane(), Lane()] | |
b = advanced_balancer(a, c) | |
#Source(a[0]) | |
#Source(a[1]) | |
Source(a[2]) | |
#Source(a[3]) | |
#Sink(c[0]) | |
Sink(c[1]) | |
Sink(c[2]) | |
#Sink(c[3]) | |
for _ in range(1000): | |
for e in entities: | |
e.tick() | |
for x in a+b+c: | |
print(x.flow[-50:], sum(x.flow[-50:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment