Created
September 11, 2025 17:32
-
-
Save machinaut/23141a0a226899995dba92e37a0f306d to your computer and use it in GitHub Desktop.
icfp 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import random | |
import time | |
from itertools import product | |
from ortools.sat.python import cp_model | |
# Module-level state standing in for the server's per-session state.
current_layout = None  # layout dict in submission format; set by select()
queryCount = None  # running query counter; reset to 0 by select()
# Room count for each named problem.
problemSizes = dict(
    probatio=3,
    primus=6,
    secundus=12,
    tertius=18,
    quartus=24,
    quintus=30,
)
def fully_connected(n, edges):
    """Check that every (room, door) pair can be reached from room 0.

    ``edges`` maps a ``(room, door)`` tuple to the ``(room, door)`` on the
    other side; only the destination room is consulted here. Returns True
    when the traversal covers all ``6 * n`` doors.
    """
    seen = set()
    pending = [(0, door) for door in range(6)]
    while pending:
        current = pending.pop()
        if current in seen:
            continue
        seen.add(current)
        target_room = edges[current][0]
        # stepping through a door exposes all six doors of the target room
        pending.extend(
            (target_room, door)
            for door in range(6)
            if (target_room, door) not in seen
        )
    return len(seen) == 6 * n
def generate_edges(n):
    """Build a random symmetric door-to-door pairing for ``n`` rooms.

    Each room has six doors. Two doors are drawn at a time (with
    replacement, so a door may end up paired with itself) and linked in
    both directions. Candidate maps are regenerated until one passes
    ``fully_connected``.
    """
    while True:
        unpaired = set(product(range(n), range(6)))
        edges = {}
        while unpaired:
            first, second = random.choices(list(unpaired), k=2)
            unpaired -= {first, second}
            edges.update({first: second, second: first})
        if fully_connected(n, edges):
            return edges
def edges_from_layout(layout):
    """Convert a layout's connection list into a door lookup table.

    Returns a dict keyed by the ``(room, door)`` of each connection's
    ``'from'`` end, whose value is the ``(room, door)`` of its ``'to'`` end.
    """
    def as_pair(endpoint):
        return (endpoint['room'], endpoint['door'])

    table = {}
    for connection in layout['connections']:
        source = as_pair(connection['from'])
        target = as_pair(connection['to'])
        table[source] = target
    return table
def select(problemName):
    """Create a fresh random layout and reset the query counter.

    ``problemName`` is looked up in ``problemSizes``; a value that is not a
    known name is used directly as the room count. The new layout is stored
    in the global ``current_layout`` using the submission format.
    """
    global current_layout, queryCount
    n = problemSizes.get(problemName, problemName)
    # labels cycle through 0..3 and are then shuffled
    rooms = random.sample([i % 4 for i in range(n)], n)
    edges = generate_edges(n)
    start = random.randint(0, n - 1)
    connections = []
    for room, door in product(range(n), range(6)):
        far_room, far_door = edges[(room, door)]
        connections.append({
            'from': {'room': room, 'door': door},
            'to': {'room': far_room, 'door': far_door},
        })
    current_layout = {
        'rooms': rooms,
        'startingRoom': start,
        'connections': connections,
    }
    queryCount = 0
    return {'problemName': problemName}
def explore(plans):
    """Walk each plan from the starting room and report the labels seen.

    Every plan is a sequence of door numbers (anything ``int()`` accepts per
    element). The result for a plan is the starting room's label followed by
    the label of each room entered. The global ``queryCount`` is bumped once
    per call and once more per plan.
    """
    global current_layout, queryCount
    queryCount += 1
    labels = current_layout['rooms']
    edges = edges_from_layout(current_layout)
    max_length = len(labels) * 18
    results = []
    for plan in plans:
        assert len(plan) <= max_length
        queryCount += 1
        room = current_layout['startingRoom']
        doors = list(map(int, plan))  # ensure int
        observed = [labels[room]]
        for door in doors:
            room, _ = edges[(room, door)]
            observed.append(labels[room])
        results.append(observed)
    return {'results': results, 'queryCount': queryCount}
def guess(guess_layout):
    """Check a submitted layout against the current one.

    Both layouts are walked in lockstep from their starting rooms, trying
    every door from every reachable pair of rooms; the room labels observed
    along the way must agree.

    Args:
        guess_layout: dict in the submission format with keys ``'rooms'``
            (list of labels), ``'startingRoom'`` (room index) and
            ``'connections'`` (list of ``{'from': ..., 'to': ...}`` entries).

    Returns:
        ``{'correct': True}`` if no difference was found, otherwise
        ``{'correct': False}`` (a short reason is printed).
    """
    global current_layout
    # preprocess to get edge maps (room,door) -> (room,door)
    current_edges = edges_from_layout(current_layout)
    guess_edges = edges_from_layout(guess_layout)
    current_labels = current_layout['rooms']
    guess_labels = guess_layout['rooms']

    def observation_match(current_room, guess_room, door):
        """Return True if taking ``door`` shows the same label in both."""
        current_dest = current_edges[(current_room, door)][0]
        guess_dest = guess_edges[(guess_room, door)][0]
        return current_labels[current_dest] == guess_labels[guess_dest]

    current_room = current_layout['startingRoom']
    guess_room = guess_layout['startingRoom']
    # explore() reports the starting room's label as the first observation,
    # but the walk below only compares destination labels, so the start has
    # to be compared explicitly.
    if current_labels[current_room] != guess_labels[guess_room]:
        print(f"mismatch at starting room {guess_room}")
        return {'correct': False}

    # search both layouts in parallel, from the starting rooms;
    # states are (current_room, guess_room, door_to_try)
    queue = [(current_room, guess_room, d) for d in range(6)]
    visited = set()
    while queue:
        state = queue.pop()
        if state in visited:
            continue
        current_room, guess_room, door = state
        # try the door
        if not observation_match(current_room, guess_room, door):
            print(f"mismatch at room {guess_room} door {door}")
            return {'correct': False}
        visited.add(state)
        # add next rooms to the queue
        next_current = current_edges[(current_room, door)][0]
        next_guess = guess_edges[(guess_room, door)][0]
        for d in range(6):
            next_state = (next_current, next_guess, d)
            if next_state not in visited:
                queue.append(next_state)

    # the walk must have covered the same set of (room, door) pairs on both
    # sides, otherwise one layout has rooms/doors the other never reached
    current_visited = {(r, d) for r, _, d in visited}
    guess_visited = {(r, d) for _, r, d in visited}
    if current_visited != guess_visited:
        print("not all rooms/doors connected")
        return {'correct': False}
    return {'correct': True}
def solve(n, plans, results):
    """Find a layout consistent with the observed walks using CP-SAT.

    Args:
        n: number of rooms.
        plans: sequence of plans; each plan is a sequence of door numbers
            (ints or digit characters, as ``explore`` accepts).
        results: for each plan, the observed labels: the starting room's
            label followed by one label per door taken.

    Returns:
        A layout dict in the submission format with keys ``'startingRoom'``,
        ``'rooms'`` and ``'connections'``.

    Raises:
        RuntimeError: if the solver reports neither OPTIMAL nor FEASIBLE.
    """
    model = cp_model.CpModel()
    # variables: for the flat door id AB = room * 6 + door, rooms[AB] and
    # doors[AB] are the room and door on the other side, and index[AB] is
    # that far side's own flat door id
    start = model.NewIntVar(0, n - 1, 'start')
    label = [model.NewIntVar(0, 3, f'l{A}') for A in range(n)]
    rooms = [model.NewIntVar(0, n - 1, f'r{AB}') for AB in range(6 * n)]
    doors = [model.NewIntVar(0, 5, f'd{AB}') for AB in range(6 * n)]
    index = [model.NewIntVar(0, 6 * n - 1, f'x{AB}') for AB in range(6 * n)]
    # index
    for AB in range(6 * n):
        model.Add(index[AB] == rooms[AB] * 6 + doors[AB])
    model.AddAllDifferent(index)
    # symmetry: following the far door leads back to where we came from
    for AB in range(6 * n):
        model.AddElement(index[AB], rooms, AB // 6)
        model.AddElement(index[AB], doors, AB % 6)
    # journeys
    for j, (plan, result) in enumerate(zip(plans, results)):
        room = model.NewIntVar(0, n - 1, f'j{j}s')
        model.Add(room == start)  # first room is start
        model.AddElement(room, label, int(result[0]))  # first observed label
        for i, (door, obs) in enumerate(zip(plan, result[1:])):
            # the separator keeps names unambiguous (j=1,i=11 vs j=11,i=1)
            dest = model.NewIntVar(0, n - 1, f'j{j}_{i}')
            temp = model.NewIntVar(0, 6 * n - 1, f'i{j}_{i}')
            # int() lets digit-string plans work here as they do in explore
            model.Add(temp == room * 6 + int(door))  # required by ortools
            model.AddElement(temp, rooms, dest)  # connection
            model.AddElement(dest, label, int(obs))  # observation
            room = dest  # step into the next room
    # solve
    solver = cp_model.CpSolver()
    status = solver.Solve(model)
    if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        raise RuntimeError('No solution found.')
    # layout
    return {
        "startingRoom": solver.Value(start),
        "rooms": [solver.Value(label[i]) for i in range(n)],
        "connections": [
            {
                'from': {'room': AB // 6, 'door': AB % 6},
                'to': {
                    'room': solver.Value(rooms[AB]),
                    'door': solver.Value(doors[AB]),
                },
            }
            for AB in range(6 * n)
        ],
    }
def main(problemName, K=1, L=None, seed=None):
    """Run one full round: select, explore, solve, then guess.

    ``problemName`` is a key of ``problemSizes`` or an int room count.
    ``K`` random plans of length ``L`` are explored; a falsy ``L`` means
    the maximum plan length ``18 * n``. Returns the result of ``guess``.
    """
    random.seed(seed)
    start = time.time()
    # resolve the room count by name, or take the argument as the count
    n = problemSizes.get(problemName, problemName)
    selection = select(problemName)
    print(selection)
    if not L:
        L = 18 * n  # default to max plan length
    plans = []
    for _ in range(K):
        plans.append(random.choices(range(6), k=L))
    print("plans:", plans)
    # walk the plans and collect the observed labels
    response = explore(plans)
    print("response:", response)
    # reconstruct a layout from plans and observations
    layout = solve(n, plans, response['results'])
    print(f"Time: {time.time() - start:.1f} sec, n={n}, K={K}, L={L}")
    # submit the reconstructed layout
    verdict = guess(layout)
    return verdict
# Only run the demo when executed as a script, so importing this module
# does not trigger a full solve.
if __name__ == "__main__":
    main('primus')  # default to 1 path of max length
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment