Last active
January 11, 2016 23:11
-
-
Save pestilence669/3e347b10804ecbf1c2ab to your computer and use it in GitHub Desktop.
Small experiment with Ant Colony Optimization for solving Facebook's FaceBull puzzle. This solution was not a winner; nor were my genetic algorithm, branch & bound, simulated annealing or brute force approaches.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim: set ts=4 sw=4 noet: | |
################################################################################ | |
# Author: Paul Chandler <[email protected]> | |
# Facebook's FaceBull Puzzle | |
# | |
# NOTE: If psyco is installed, it's imported & enabled. The improvement is | |
# unimpressive, but it's something. | |
# | |
# /me crosses fingers | |
################################################################################ | |
from __future__ import with_statement | |
import copy, os, sys | |
from random import * | |
################################################################################ | |
# global tuning parameters
DEBUG = False               # output debug messages & fix the random seed
ALPHA = 0.005               # global evaporation rate (see Path.evaporateGlobally)
RHO = 0.01                  # local evaporation / blending rate (see Path.goto)
BETA = 3.00                 # exponent on the inverse edge cost ("look ahead" weight)
TAU = 0.9                   # initial pheromone seeded on every edge
M = 10                      # number of ants / population
MAX_ITERATIONS = 325000     # hard cap on the iterations run by ACO.solve()
STAGNATE_N = 2.25           # scales the "no improvement" threshold in ACO.solve()
STAGNATE_R = 2              # restarts allowed before stagnation ends the search
MAGIC_WINNER_BONUS = 5      # extra pheromone deposits made by a new record leader
# Multiplier applied to the weight of vertices that were ALREADY visited, so
# values below 1.0 favour unvisited vertices. In this problem, multiple
# visitations are allowed, which is why revisits are damped and not forbidden.
UNVISITED_BONUS = 0.10
if DEBUG:
    seed(1)
################################################################################ | |
def pairs(lst):
    '''Generator to build pairs from permutation encoded sequences

    Yields (previous, current) for every consecutive pair of items in `lst`,
    e.g. [1, 2, 3] -> (1, 2), (2, 3). Sequences with fewer than two items
    yield nothing.
    '''
    it = iter(lst)
    missing = object()
    # next() with a default works on Python 2.6+ and 3.x, and avoids leaking a
    # StopIteration out of the generator body when `lst` is empty.
    prev = next(it, missing)
    if prev is missing:
        return
    for item in it:
        yield prev, item
        prev = item
def fact(n):
    '''Factorial / n!

    Computed iteratively so large `n` cannot exhaust the recursion limit.
    Raises ValueError for negative `n`, for which the factorial is undefined.
    '''
    if n < 0:
        raise ValueError('fact() is not defined for negative values')
    result = 1
    for k in range(2, n + 1):
        result *= k
    return result
def costOfTour(t, costs):
    '''Returns the cost of a tour / path given its edges

    `t` is an iterable of edge keys and `costs` maps each key to its cost.
    '''
    total = 0
    for edge in t:
        total += costs[edge]
    return total
def dp(msg, *args):
    '''Debug print. SEE: DEBUG

    `msg` is a %-style format string and `args` are its values. Nothing is
    printed unless the module level DEBUG flag is set.
    '''
    if not DEBUG:
        return
    print(msg % args)
class ACO(object):
    '''Ant Colony Optimization

    Drives the search: loads the graph into the shared Path class attributes,
    owns the colony of Ant objects, runs iterations and prints the answer.
    '''

    def __init__(self):
        dp('ACO (Ant Colony Optimization) starting')
        self.colony = []            # Ant instances
        self.vertices = []          # sorted vertex ids found in the input
        self.vertexCount = 0        # cached len(self.vertices)
        self.iterationCount = 0     # number of run() calls so far
        self.best = None            # deep copy of the current run's best ant
        self.bests = []             # valid bests kept across restarts
        self.bestCount = 0          # iterations since self.best last changed
        self.machineMap = {}        # (source, destination) -> (machine id, cost)

    def loadFacebullFile(self, fileName):
        '''Loads Facebook's FaceBull file format

        Example Row (Edge ID, Source, Destination, Cost)
            M1 C1 C2 2012

        WARNING:
            This class sets class attributes external to itself (the shared
            dictionaries and vertex list on Path).
        '''
        uniqueVertices = set()
        with open(fileName, 'r') as theInput:
            for line in theInput:
                if not line.strip():    # skip empty lines e.g. if ends with nl
                    continue
                m, a, b, c = line.split()
                # strip the leading characters, then convert to integers
                m = int(m[1:])
                a = int(a[1:])
                b = int(b[1:])
                c = int(c)
                edge = a, b
                uniqueVertices.add(a)
                uniqueVertices.add(b)
                Path.gp[edge] = TAU     # initial pheromone seeding
                Path.lp[edge] = TAU     # ...
                # assemble our valid routes of travel; a duplicate edge must
                # not be listed twice or it would be sampled twice as often
                targets = Path.connectionsFrom.setdefault(a, [])
                if b not in targets:
                    targets.append(b)
                # vertices without outgoing edges still get an (empty) entry
                # so that looking up their choices cannot fail
                Path.connectionsFrom.setdefault(b, [])
                # strip all higher cost duplicate edges
                if edge not in Path.costs or Path.costs[edge] > c:
                    Path.costs[edge] = c
                    self.machineMap[edge] = m, c
        # convert temporary structures into something more concrete
        self.vertices = sorted(uniqueVertices)
        Path.vertices = self.vertices
        # cache fixed len() lookups
        self.vertexCount = len(self.vertices)
        Path.vertexCount = len(Path.vertices)
        # fill holes in costs with a huge integer (poor man's "infinity");
        # sys.maxint only exists on Python 2, sys.maxsize on 2.6+ and 3.x
        infinity = getattr(sys, 'maxint', sys.maxsize)
        for i in self.vertices:
            for j in self.vertices:
                Path.costs.setdefault((i, j), infinity)
        dp('Loaded "%s" (%d vertices, %d edges)', fileName, self.vertexCount, len(self.machineMap))

    def spawnPopulation(self, size = None):
        'Spawns all of the ants (one per vertex when no size is given)'
        if size is None:
            size = self.vertexCount
        dp('Spawning %d ants for a %d vertex problem', size, self.vertexCount)
        for _ in range(size):
            self.colony.append(Ant())

    def resetAllAnts(self):
        'Resets all ants for reuse (e.g. after they all reach destinations)'
        for ant in self.colony:
            ant.reset()

    def _pickWinner(self):
        '''Returns the cheapest valid ant, or a random ant if all are lost

        Lost (invalid) ants are only considered when no ant produced a valid
        path, so a cheap truncated path can never shadow a real solution.
        '''
        validAnts = [ant for ant in self.colony if ant.valid]
        if validAnts:
            return min(validAnts, key=lambda ant: ant.path.c)
        # NOTE: if everybody is lost, reward randomly
        return sample(self.colony, 1)[0]

    def run(self):
        'Runs one iteration of the simulation'
        self.iterationCount += 1
        # rotate the starting vertices so each ant starts somewhere new
        for i, ant in enumerate(self.colony):
            ant.path.firstStep(
                self.vertices[(i + self.iterationCount) % self.vertexCount]
            )
        # warning, exit condition based on a set of boolean results
        while True:
            done = True     # set to false whenever an ant is still moving
            for ant in self.colony:
                if not ant.alive:
                    continue
                # this generation method, especially since we're not always
                # making cycles, should be clipped by the best known bound
                if self.best and ant.path.c > self.best.path.c:
                    # truncate known bad solutions as they're built
                    ant.kill()
                    continue
                if ant.move():
                    done = False
            if done:
                break       # the routines above should always terminate
            Path.evaporateLocally()
        # find winner and deposit its pheromones
        winner = self._pickWinner()
        winner.path.deposit()
        # see if it's best, save if it is
        if self.best is None or (winner.valid and winner.path.c < self.best.path.c):
            for _ in range(MAGIC_WINNER_BONUS):
                winner.path.deposit()
            self.best = copy.deepcopy(winner)
            self.bestCount = 1
            dp('\tIteration %4d, New current best is: %s', self.iterationCount, self.best)
        else:
            self.bestCount += 1

    def genocide(self):
        'Kills the entire population, all of the trails, etc.'
        for edge in Path.costs:
            Path.gp[edge] = TAU
            Path.lp[edge] = TAU
        # remember the run's result before forgetting it
        if self.best and self.best.path.isValid():
            self.bests.append(self.best)
        self.best = None
        self.bestCount = 0

    def _purchase(self, ant):
        '''Returns (total cost, sorted machine ids) for an ant's path

        A path may walk the same edge more than once, but a machine only has
        to be bought once, so every distinct edge is counted a single time.
        '''
        edges = set(ant.path.asPairs())
        cost = sum(self.machineMap[edge][1] for edge in edges)
        machines = sorted(self.machineMap[edge][0] for edge in edges)
        return cost, machines

    def solve(self):
        '''Runs all iterations to termination and prints the result

        Prints the total cost on one line and the space separated, sorted
        machine ids on the next. Raises RuntimeError when the search never
        produced a valid path.
        '''
        # iterations without improvement tolerated before a restart
        sn = min(max(100, int(STAGNATE_N * len(Path.costs) * Path.vertexCount)), 3000)
        dp('Running simulation, to maximum of %d iterations. Stale at %d', MAX_ITERATIONS, sn)
        restarted = 0
        for _ in range(MAX_ITERATIONS):
            # start over again, just for good measure
            if self.bestCount > sn:
                if restarted >= STAGNATE_R:
                    break   # probably a good result (!!)
                dp('Restarting due to stagnation')
                self.genocide()
                restarted += 1
            self.run()
            self.resetAllAnts()
            Path.resetLocalPheromones()
            Path.evaporateGlobally()
        # add the last best to the top list
        if self.best and self.best.valid:
            self.bests.append(self.best)
        if not self.bests:
            raise RuntimeError('No valid solution found')
        # done with the primary search, now try a local optimization
        # it's a last ditch effort to improve upon what MIGHT be close
        # and easily fall into optimum
        for best in self.bests:
            while best.path.opt2():
                pass
        # find the actual winner, judged by what really has to be bought
        cost, machines = min(self._purchase(best) for best in self.bests)
        # done, present output
        print('%d\n%s' % (cost, ' '.join([str(m) for m in machines])))
class Ant(object):
    '''Represents a single ant and its basic behavior

    ANT_COUNT counts the ants constructed so far and supplies their ids. It
    is deliberately never decremented: deep copies of an ant are created
    without __init__, so decrementing on destruction would corrupt the count.
    '''
    ANT_COUNT = 0

    def __init__(self):
        Ant.ANT_COUNT += 1
        self.id = Ant.ANT_COUNT
        self.path = Path()
        self.reset()

    def reset(self):
        'Resets the ant to a fresh state'
        self.alive = True       # still travelling
        self.valid = False      # True once its path satisfies Path.isValid()
        self.path.reset()

    def kill(self):
        'Stops the ant; it will not move again until reset()'
        self.alive = False

    def move(self):
        '''Moves the ant / iterates the search

        Returns True if it moved, False if it's stopped or dead. All ants "die"
        when they have done what they're here to do.
        '''
        choices = self.path.nextChoices()
        # note, some real cycle detection is in order... heh
        # (the shared cost table is reached through the path instance)
        if not choices or len(self.path.p) > len(self.path.costs) * 2:
            if self.path.isValid():
                self.valid = True
            self.kill()
            return False
        # perform a weighted sampling of our choices given by the path
        freeWill = random()
        accum = 0.0
        # the probabilities may sum to slightly less than 1.0 because of
        # floating point rounding; fall back to the last (most likely) choice
        picked = choices[-1][1]
        for p, choice in choices:
            accum += p
            if freeWill < accum:
                picked = choice
                break
        self.path.goto(picked)
        if self.path.isValid():
            self.kill()
            self.valid = True
            return False
        return True

    def __str__(self):
        validTxt = 'YES' if self.valid else 'NO '
        return 'Ant #%02d, Valid: %s, Cost: %12d, Path: %s' % \
            (self.id, validTxt, self.path.c, self.path)
class Path(object):
    '''Represents the path of traversal / travel

    The graph and the pheromone trails are class attributes shared by every
    path; the instance attributes describe one ant's walk.
    '''
    connectionsFrom = {}    # maps a vertex to the list of vertices it leads to
    costs = {}              # maps edges to costs
    vertices = []           # all vertex ids
    vertexCount = 0         # cached len(vertices)
    gp = {}                 # global pheromone deposits
    lp = {}                 # local pheromone deposits

    def __init__(self):
        self.c = 0      # running total (cost)
        self.ss = {}    # unique sources
        self.sd = {}    # unique destinations
        self.p = []     # node sequence (permutation encoding)
        self.reset()

    def reset(self):
        'Resets and clears the path'
        self.ss.clear()
        self.sd.clear()
        self.c = 0
        del self.p[:]

    def asPairs(self):
        'Returns a sequence of edge tuples'
        return pairs(self.p)

    def isValid(self):
        'Determines if the path achieves any goal'
        # all valid paths must have at LEAST one outbound and one inbound edge
        # for every vertex
        return len(self.ss) == self.vertexCount and len(self.sd) == self.vertexCount

    def nextChoices(self, pruneVisited = False):
        '''List of the next possible travel choices and probabilites

        The probability is determined by both distance and pheromone deposits. A
        list of tuples (p, choice), sorted ascending, is returned. The list is
        empty when the current vertex has no outgoing edges. `pruneVisited`
        is accepted but currently ignored.
        '''
        last = self.p[-1]
        visited = set(self.p)   # one O(n) pass instead of a scan per choice
        total = 0.0
        moves = []
        for choice in self.connectionsFrom.get(last, ()):
            edge = last, choice
            weight = Path.lp[edge] * (1.0 / Path.costs[edge]) ** BETA
            if choice in visited:   # award unique visitations
                weight *= UNVISITED_BONUS
            moves.append((weight, choice))
            total += weight     # aggregated denominator
        if not moves:
            return []
        if total <= 0.0:
            # every weight underflowed to zero; fall back to a uniform choice
            share = 1.0 / len(moves)
            return sorted((share, choice) for _, choice in moves)
        # normalize & prepare for weighted sampling
        return sorted((weight / total, choice) for weight, choice in moves)

    def firstStep(self, choice):
        'Like goto(), but takes the initial step onto the graph'
        self.p.append(choice)

    def goto(self, choice):
        'Travels to a new vertex, while handling housekeeping'
        last = self.p[-1]
        edge = last, choice
        # mark the usage for route termination
        self.ss[last] = 0
        self.sd[choice] = 0
        # append vertex, increment cost
        self.p.append(choice)
        self.c += Path.costs[edge]
        # update local pheromones (discourage followers)
        Path.lp[edge] = \
            (1.0 - RHO) * Path.lp[edge] + RHO * Path.gp[edge]

    def deposit(self):
        'Deposits global pheromones as the inverse of the cost'
        edges = list(pairs(self.p))
        # nothing to reward without edges; a zero cost has no usable inverse
        if not edges or self.c <= 0:
            return
        amount = 1.0 / self.c
        for edge in edges:
            Path.gp[edge] += amount

    def _coversAllVertices(self, edges):
        'True if every vertex is a source and a destination within `edges`'
        sources = set(a for a, b in edges)
        destinations = set(b for a, b in edges)
        return len(sources) == Path.vertexCount and \
            len(destinations) == Path.vertexCount

    def opt2(self):
        '''Very shoddy attempt to find local minimum

        Tries swapping every pair of positions in the node sequence and keeps
        a swap when it lowers the cost and the path still covers every
        vertex. Returns True if at least one swap was kept.
        '''
        self.optimized = False
        n = len(self.p)
        for i in range(n):
            for j in range(n):
                if i == j:
                    continue
                self.p[i], self.p[j] = self.p[j], self.p[i]
                newPairs = list(self.asPairs())
                newCost = costOfTour(newPairs, Path.costs)
                if newCost < self.c and self._coversAllVertices(newPairs):
                    # we did find a better cost
                    self.c = newCost
                    self.optimized = True
                else:
                    # revert
                    self.p[i], self.p[j] = self.p[j], self.p[i]
        return self.optimized

    def __str__(self):
        'String representation'
        return ' '.join([str(vertex) for vertex in self.p])

    @classmethod
    def evaporateGlobally(cls):
        'Evaporates global pheromones, rate determined by ALPHA'
        for edge in cls.gp:
            cls.gp[edge] *= 1.0 - ALPHA

    @classmethod
    def evaporateLocally(cls):
        'Evaporates local pheromones, rate determined by RHO'
        for edge in cls.lp:
            cls.lp[edge] *= 1.0 - RHO

    @classmethod
    def resetLocalPheromones(cls):
        'Clones the global pheromones locally'
        cls.lp = cls.gp.copy()
################################################################################ | |
if __name__ == '__main__':
    # command line entry point: expects the puzzle's input file as argument
    if len(sys.argv) < 2:
        # print() with one parenthesized argument behaves the same on
        # Python 2 and Python 3
        print('usage: %s inputFile' % os.path.basename(sys.argv[0]))
        sys.exit(2)
    # Import Psyco if available (optional speed-up, silently skipped if not)
    try:
        import psyco
        psyco.full()
    except ImportError:
        pass
    colony = ACO()
    colony.loadFacebullFile(sys.argv[1])
    colony.spawnPopulation(M)
    colony.solve()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment