Skip to content

Instantly share code, notes, and snippets.

@pestilence669
Last active January 11, 2016 23:11
Show Gist options
  • Save pestilence669/3e347b10804ecbf1c2ab to your computer and use it in GitHub Desktop.
Save pestilence669/3e347b10804ecbf1c2ab to your computer and use it in GitHub Desktop.
Small experiment with Ant Colony Optimization for solving Facebook's FaceBull puzzle. This solution was not a winner; nor were my genetic algorithm, branch & bound, simulated annealing or brute force approaches.
#!/usr/bin/env python
# vim: set ts=4 sw=4 noet:
################################################################################
# Author: Paul Chandler <[email protected]>
# Facebook's FaceBull Puzzle
#
# NOTE: If psyco is installed, it's imported & enabled. The improvement is
# unimpressive, but it's something.
#
# /me crosses fingers
################################################################################
from __future__ import with_statement
import copy, os, sys
from random import *
################################################################################
# global tuning parameters
DEBUG = False # output debug messages & fix the random seed
ALPHA = 0.005 # global evaporation rate
RHO = 0.01 # local evaporation rate
BETA = 3.00 # weights the "look ahead"
TAU = 0.9 # initial pheromone
M = 10 # number of ants / population
MAX_ITERATIONS = 325000
STAGNATE_N = 2.25
STAGNATE_R = 2
MAGIC_WINNER_BONUS = 5 # pheromone multiplier, if you're a record leader
UNVISITED_BONUS = 0.10 # bonus applied to vertices not already visited
# in this problem, multiple visitations are allowed
if DEBUG: seed(1)
################################################################################
def pairs(lst):
'Generator to build pairs from permutation encoded sequences'
i = iter(lst)
first = prev = i.next()
for item in i:
yield prev, item
prev = item
def fact(n):
'Factorial / n!'
return 1 if n == 0 else n * fact(n - 1)
def costOfTour(t, costs):
'Returns the cost of a tour / path given its edges'
return sum((costs[i] for i in t))
def dp(msg, *args):
'Debug print. SEE: DEBUG'
if DEBUG:
print(msg % args)
class ACO(object):
'Ant Colony Optimization'
def __init__(self):
dp('ACO (Ant Colony Optimization) starting')
self.colony = []
self.vertices = []
self.vertexCount = 0
self.iterationCount = 0
self.best = None
self.bests = []
self.bestCount = 0
self.machineMap = {}
def loadFacebullFile(self, fileName):
'''Loads Facebook's FaceBull file format
Example Row (Edge ID, Source, Destination, Cost)
M1 C1 C2 2012
WARNING:
This class sets class attributes external to itself.
'''
# temporary structures (used to build others)
entries = []
uniqueVertices = {}
with open(fileName, 'r') as theInput:
for line in theInput:
if not line.isspace(): # skip empty lines e.g. if ends with nl
m, a, b, c = line.split()
# strip the leading characters, then convert to integers
m = int(m[1:])
a = int(a[1:])
b = int(b[1:])
c = int(c)
# append entries to lookup structures
entries.append((m, a, b, c)) # used for travel map gen
uniqueVertices[a] = 0
uniqueVertices[b] = 0
Path.gp[a, b] = TAU # initial pheromone seeding
Path.lp[a, b] = TAU # ...
# strip all higher cost duplicate edges
if (a, b) not in Path.costs or Path.costs[a, b] > c:
Path.costs[a, b] = c
self.machineMap[a, b] = m, c
# assemble our valid routes of travel
if a in Path.connectionsFrom:
Path.connectionsFrom[a].append(b)
else:
Path.connectionsFrom[a] = [b]
# convert temporary structures into something more concrete
uniqueVertices = uniqueVertices.keys()
uniqueVertices.sort()
self.vertices = uniqueVertices
Path.vertices = self.vertices
del uniqueVertices
# cache fixed len() lookups
self.vertexCount = len(self.vertices)
Path.vertexCount = len(Path.vertices)
# fill holes in costs with maxint values (poor man's "infinity")
for i in self.vertices:
for j in self.vertices:
edge = i, j
if edge not in Path.costs:
Path.costs[i, j] = sys.maxint
dp('Loaded "%s" (%d vertices, %d edges)', fileName, self.vertexCount, len(self.machineMap))
def spawnPopulation(self, size = None):
'Spawns all of the ants'
if size is None:
size = self.vertexCount
dp('Spawning %d ants for a %d vertex problem', size, self.vertexCount)
for i in range(size):
self.colony.append(Ant())
def resetAllAnts(self):
'Resets all ants for reuse (e.g. after they all reach destinations)'
for ant in self.colony:
ant.reset()
def run(self):
'Runs one iteration of the simulation'
self.iterationCount += 1
for i, ant in enumerate(self.colony):
ant.path.firstStep(
self.vertices[(i + self.iterationCount) % self.vertexCount]
)
# warning, exit condition based on a set of boolean results
while 1:
done = True # set to false whenever an ant is still moving
for ant in self.colony:
if ant.alive:
# this generation method, especially since we're not always
# making cycles, should be clipped by the best known bound
if self.best:
if ant.path.c > self.best.path.c:
# truncate known bad solutions as they're built
ant.kill()
continue
if ant.move():
done = False
if done: break # the routines above should always terminate
Path.evaporateLocally()
# find winner. ignore "lost" ants, but a random one if all are lost
winner, = sample(self.colony, 1)
for ant in self.colony:
if ant.path.c < winner.path.c:
if ant.valid:
winner = ant
# NOTE: altered so that if everybody is lost, reward randomly
# deposit the winner's pheromones
winner.path.deposit()
# see if it's best, save if it is
if self.best is None or (winner.valid and winner.path.c < self.best.path.c):
for i in range(MAGIC_WINNER_BONUS):
winner.path.deposit()
self.best = copy.deepcopy(winner)
self.bestCount = 1
dp('\tIteration %4d, New current best is: %s', self.iterationCount, self.best)
else:
self.bestCount += 1
def genocide(self):
'Kills the entire population, all of the trails, etc.'
for edge in Path.costs:
Path.gp[edge] = TAU
Path.lp[edge] = TAU
if self.best and self.best.path.isValid():
self.bests.append(self.best)
self.best = None
self.bestCount = 0
def solve(self):
'Runs all iterations to termination'
sn = min(max(100, int(STAGNATE_N * len(Path.costs) * Path.vertexCount)), 3000)
dp('Running simulation, to maximum of %d iterations. Stale at %d', MAX_ITERATIONS, sn)
restarted = 0
for i in range(MAX_ITERATIONS):
# start over again, just for good measure
if self.bestCount > sn:
if restarted < STAGNATE_R:
dp('Restarting due to stagnation')
self.genocide()
restarted += 1
else:
break # probably a good result (!!)
self.run()
self.resetAllAnts()
Path.resetLocalPheromones()
Path.evaporateGlobally()
# add the last best to the top list
if self.best and self.best.valid:
self.bests.append(self.best)
# done with the primary search, now try a local optimization
# it's a last ditch effort to improve upon what MIGHT be close
# and easily fall into optimum
for best in self.bests:
while best.path.opt2(): pass
# find the actual winner
winner = self.bests[0]
for best in self.bests:
if best.path.c < winner.path.c:
winner = best
# done, present output
machines = []
for edge in winner.path.asPairs():
m, c = self.machineMap[edge]
machines.append(m)
machines.sort()
print '%d\n%s' % (winner.path.c, ' '.join([str(m) for m in machines]))
class Ant(object):
'Represents a single ant and its basic behavior'
ANT_COUNT = 0
def __init__(self):
Ant.ANT_COUNT += 1
self.id = Ant.ANT_COUNT
self.path = Path()
self.reset()
def __del__(self):
Ant.ANT_COUNT -= 1
def reset(self):
'Resets the ant to a fresh state'
self.alive = True
self.valid = False
self.path.reset()
def kill(self):
self.alive = False
def move(self):
'''Moves the ant / iterates the search
Returns True if it moved, False if it's stopped or dead. All ants "die"
when they have done what they're here to do.
'''
choices = self.path.nextChoices()
# note, some real cycle detection is in order... heh
if not choices or len(self.path.p) > len(Path.costs) * 2:
if self.path.isValid():
self.valid = True
self.kill()
return False
# perform a weighted sampling of our choices given by the path
freeWill = random()
accum = 0.0
for p, choice in choices:
accum += p
if freeWill < accum:
self.path.goto(choice)
if self.path.isValid():
self.kill()
self.valid = True
return False
else:
return True
# this should never occur, so don't even bother trapping for it
raise RuntimeError('Failed travel sampling')
def __str__(self):
validTxt = 'YES' if self.valid else 'NO '
return 'Ant #%02d, Valid: %s, Cost: %12d, Path: %s' % \
(self.id, validTxt, self.path.c, self.path)
class Path(object):
'Represents the path of traversal / travel'
connectionsFrom = {}
costs = {} # maps edges to costs
vertices = []
vertexCount = 0
gp = {} # global pheromone deposits
lp = {} # local pheromone deposits
def __init__(self):
self.c = 0 # running total (cost)
self.ss = {} # unique sources
self.sd = {} # unique destinations
self.p = [] # node sequence (permutation encoding)
self.reset();
def reset(self):
'Resets and clears the path'
self.ss.clear()
self.sd.clear()
self.c = 0
del self.p[:]
def asPairs(self):
'Returns a sequence of edge tuples'
return pairs(self.p)
def isValid(self):
'Determines if the path achieves any goal'
# all valid paths must have at LEAST one outbound and one inbound edge
return len(self.ss) == self.vertexCount and len(self.sd) == self.vertexCount
def nextChoices(self, pruneVisited = False):
'''List of the next possible travel choices and probabilites
The probability is determined by both distance and pheromone deposits. A
list of tuples (p, choice) is returned.
'''
last = self.p[-1]
move_s = 0.0
moves = []
for choice in self.connectionsFrom[last]:
edge = last, choice
cost = Path.costs[edge]
move_p = Path.lp[edge] * (1.0 / cost) ** BETA
if choice in self.p: # award unique visitations
move_p *= UNVISITED_BONUS
moves.append((move_p, choice))
move_s += move_p # aggregated denominator
# normalize & prepare for weighted sampling
moves = [(p / move_s, choice) for p, choice in moves]
moves.sort()
return moves
def firstStep(self, choice):
'Like goto(), but takes the initial step onto the graph'
self.p.append(choice)
def goto(self, choice):
'Travels to a new vertex, while handling housekeeping'
last = self.p[-1]
edge = last, choice
# mark the usage for route termination
self.ss[last] = 0
self.sd[choice] = 0
# append vertex, increment cost
self.p.append(choice)
self.c += Path.costs[edge]
# update local pheromones (discourage followers)
Path.lp[edge] = \
(1.0 - RHO) * Path.lp[edge] + RHO * Path.gp[edge]
def deposit(self):
'Deposits global pheromones as the inverse of the cost'
for edge in pairs(self.p):
Path.gp[edge] += 1.0 / self.c
def opt2(self):
'Very shoddy attempt to find local minimum'
self.optimized = False
n = len(self.p)
for i in range(n):
for j in range(n):
if i != j:
oldCost = self.c
self.p[i], self.p[j] = self.p[j], self.p[i]
newPairs = list(self.asPairs())
newCost = costOfTour(newPairs, Path.costs)
if newCost < oldCost:
# perhaps we found a better local optimum
ss = {}
sd = {}
for a, b in newPairs:
ss[a] = 0
sd[b] = 0
if len(ss) != Path.vertexCount or len(sd) != Path.vertexCount:
# revert
self.p[i], self.p[j] = self.p[j], self.p[i]
else:
# we did find a better cost
self.c = newCost
self.optimized = True
else:
# revert
self.p[i], self.p[j] = self.p[j], self.p[i]
return self.optimized
def __str__(self):
'String representation'
return ' '.join([str(vertex) for vertex in self.p])
@classmethod
def evaporateGlobally(cls):
'Evaporates global pheromones, rate determined by ALPHA'
for edge in cls.gp:
cls.gp[edge] *= 1.0 - ALPHA
@classmethod
def evaporateLocally(cls):
'Evaporates local pheromones, rate determined by RHO'
for edge in cls.lp:
cls.lp[edge] *= 1.0 - RHO
@classmethod
def resetLocalPheromones(cls):
'Clones the global pheromones locally'
cls.lp = cls.gp.copy()
################################################################################
if __name__ == '__main__':
if len(sys.argv) < 2:
print 'usage: %s inputFile' % os.path.basename(sys.argv[0])
sys.exit(2)
else:
# Import Psyco if available
try:
import psyco
psyco.full()
except ImportError:
pass
colony = ACO()
colony.loadFacebullFile(sys.argv[1])
colony.spawnPopulation(M)
colony.solve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment