Created
August 18, 2010 19:58
-
-
Save kennytm/535970 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# | |
# parrondo_optimizer.py ... Parrondo's paradox which optimizes the sequence. | |
# Copyright (C) 2010 KennyTM~ <[email protected]> | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
from __future__ import division, print_function, unicode_literals | |
from collections import defaultdict | |
from math import fsum | |
# the global configuration variables. | |
aCos = [0.5**0.5] * 4 | |
aSin = [0.5**0.5 * 1j] * 4 | |
bCos = aCos[:] | |
bSin = aSin[:] | |
coinsCount = 3 | |
coinsMask = 7 | |
seq1 = [] | |
seq2 = [] | |
length = 0 | |
save = None | |
optimize = 0 | |
optimize_method = 'hill' | |
write_mean = False | |
game_type = 'qhd' | |
observeInterval = 0 | |
def computeWeight(probs): | |
# compute total weight of a single sequence. | |
return fsum(w for _, w in probs) | |
def computeMean(probs): | |
# compute unnormalized mean of a single sequence. | |
from itertools import starmap | |
from operator import mul | |
return fsum(starmap(mul, probs)) | |
def computeVar(probs, mean): | |
# compute unnormalized variance of a single sequence. | |
return fsum( (x - mean)**2 * w for x, w in probs ) | |
def writeSeq(seq): | |
from itertools import imap | |
write('#', ''.join(imap("01".__getitem__, seq))) | |
class Parrondo(object): | |
def reset(self): | |
assert False | |
def __init__(self): | |
self.reset() | |
def flip(self, coss, sins): | |
assert False | |
def translate(self): | |
assert False | |
def rotate(self): | |
assert False | |
def observe(self): | |
pass | |
def step(self, whichGame): | |
(coss, sins) = ((aCos, aSin), (bCos, bSin))[whichGame] | |
self.flip(coss, sins) | |
self.translate() | |
self.rotate() | |
def writeMean(self, i, game): | |
probs = self.prob() | |
weight = computeWeight(probs) | |
mean = computeMean(probs)/weight | |
var = computeVar(probs, mean)/weight | |
write(i, "\t", game, "\t", mean, "\t", var) | |
return (mean, var) | |
def performOneSeq(self, seq): | |
# evaluate the sequence. | |
self.reset() | |
writeSeq(seq) | |
if write_mean: | |
self.writeMean(0, '-') | |
for i, whichGame in enumerate(seq): | |
self.step(whichGame) | |
if observeInterval and i % observeInterval == observeInterval-1: | |
self.observe() | |
if write_mean: | |
self.writeMean(i+1, "AB"[whichGame]) | |
(mean, var) = self.writeMean(length, "Last") | |
return mean | |
def performOptimizeMean(self, seq): | |
curMax = self.performOneSeq(seq) | |
theSeq = None | |
for j in xrange(optimize): | |
for i, n in enumerate(seq): | |
newSeq = seq[:] | |
newSeq[i] = 1-n | |
value = self.performOneSeq(newSeq) | |
if value > curMax: | |
curMax = value | |
theSeq = newSeq | |
if theSeq: | |
write("# ({0}/{1}) Found new maximum = {2}".format(j, optimize, curMax)) | |
writeSeq(theSeq) | |
seq = theSeq | |
theSeq = None | |
else: | |
break | |
write("# Found local maximum = {0}".format(curMax)) | |
writeSeq(seq) | |
def performBrute(self, seqlen): | |
from itertools import product | |
curMax = float("-inf") | |
theSeq = None | |
for seq0 in product([0, 1], repeat=seqlen-5): | |
seq = (0, 0, 1, 1, 1) + seq0 | |
value = self.performOneSeq(seq) | |
if value > curMax: | |
curMax = value | |
theSeq = seq | |
write("# Found maximum = {0}".format(curMax)) | |
writeSeq(theSeq) | |
################################################################################ | |
# | |
# OK, so the ket looks like this. | |
# the entries are pairs of integers (indices) and complex numbers (amplitudes). | |
# each integer can be split into two parts by modulo 2^h. | |
# e.g. 12773 = (1596)*8 + (5) | |
# the quotient is the walker's position and the remainder is the coins. | |
# with 0 = --- | |
# 1 = --+ etc | |
# the most current coin is at the least significant digit. | |
# so the (5) above = +-+ means | |
# the current coin is |+>, and previous coin is |->, | |
# and the oldest coin is |+> | |
# | |
def cfsum(lst): | |
real = fsum(c.real for c in lst) | |
imag = fsum(c.imag for c in lst) | |
return complex(real, imag) | |
class QHDParrondoFast(Parrondo): | |
def reset(self): | |
sqrt12 = complex(0.5**0.5) | |
self.ket = [[0, sqrt12], [coinsMask, -sqrt12]] | |
def flip(self, coss, sins): | |
# Apply the coin operator. | |
resDict = defaultdict(list) | |
for x, a in self.ket: | |
coinHistory = (x & coinsMask) >> 1 | |
resDict[x].append(a * coss[coinHistory]) | |
resDict[x^1].append(a * sins[coinHistory]) | |
self.ket = [[x, cfsum(a)] for x, a in resDict.iteritems()] | |
def translate(self): | |
coinsModulo = coinsMask + 1 | |
for i, (x, _) in enumerate(self.ket): | |
self.ket[i][0] = x + (coinsModulo if x & 1 else -coinsModulo) | |
def rotate(self): | |
# recycle the coins. | |
# rotation won't cause superposition. | |
for i, (x, _) in enumerate(self.ket): | |
self.ket[i][0] = x & ~coinsMask | (x << 1) & coinsMask | (x >> (coinsCount-1)) & 1 | |
def prob(self): | |
# convert ket to probability. | |
probs = defaultdict(list) | |
for x, a in self.ket: | |
probs[x >> coinsCount].append(abs(a)**2) | |
return [(x, fsum(a)) for x, a in probs.iteritems()] | |
def observe(self): | |
assert False, "Error: Cannot perform observation with kets. Please use '-g qhddm' instead." | |
# This uses | |
class QHDParrondo(Parrondo): | |
def reset(self): | |
self.rho = [ | |
[(0, 0), 0.5], | |
[(coinsMask, 0), -0.5], | |
[(0, coinsMask), -0.5], | |
[(coinsMask, coinsMask), 0.5] | |
] | |
def flip(self, coss, sins): | |
# Apply the coin operator. | |
resDict = defaultdict(list) | |
for (x, y), a in self.rho: | |
coinHistoryX = (x & coinsMask) >> 1 | |
coinHistoryY = (y & coinsMask) >> 1 | |
# C|X> = cos1|X> + isin1|~X> | |
# C|Y> = cos2|Y> + isin2|~Y> | |
# i.e. <Y|C* = <Y|cos2 - <~Y|isin2 | |
# thus, | |
# C|X><Y|C* = cos1cos2|X><Y| + isin1cos2|~X><Y| - icos1sin2|X><~Y| - i^2 sin1sin2|~X><~Y| | |
cos1 = coss[coinHistoryX] | |
cos2 = coss[coinHistoryY] | |
sin1 = sins[coinHistoryX] | |
sin2 = sins[coinHistoryY].conjugate() | |
resDict[x,y].append(a * cos1 * cos2) | |
resDict[x^1,y].append(a * sin1 * cos2) | |
resDict[x,y^1].append(a * cos1 * sin2) | |
resDict[x^1,y^1].append(a * sin1 * sin2) | |
self.rho = [[p, cfsum(a)] for p, a in resDict.iteritems()] | |
def translate(self): | |
coinsModulo = coinsMask + 1 | |
for i, ((x, y), _) in enumerate(self.rho): | |
x += coinsModulo if x & 1 else -coinsModulo | |
y += coinsModulo if y & 1 else -coinsModulo | |
self.rho[i][0] = (x, y) | |
def rotate(self): | |
# recycle the coins. | |
# rotation won't cause superposition. | |
for i, ((x, y), _) in enumerate(self.rho): | |
x = x & ~coinsMask | (x << 1) & coinsMask | (x >> (coinsCount-1)) & 1 | |
y = y & ~coinsMask | (y << 1) & coinsMask | (y >> (coinsCount-1)) & 1 | |
self.rho[i][0] = (x, y) | |
def prob(self): | |
# convert ket to probability. | |
probs = defaultdict(list) | |
for (x, y), a in self.rho: | |
if x == y: | |
probs[x >> coinsCount].append(abs(a)) | |
return [(x, fsum(a)) for x, a in probs.iteritems()] | |
def observe(self): | |
# block-diagonalize the density matrix. | |
self.rho = [[(x, y), a] for (x, y), a in self.rho if (x >> coinsCount) == (y >> coinsCount)] | |
################################################################################ | |
# | |
# So, this is classical history-dependent walk. | |
# | |
class CHDParrondo(Parrondo): | |
def reset(self): | |
self.probs = [[0, 0.5], [coinsMask, 0.5]] | |
def flip(self, coss, sins): | |
resDict = defaultdict(list) | |
coss2 = [abs(x)**2 for x in coss] | |
sins2 = [abs(x)**2 for x in sins] | |
for x, a in self.probs: | |
coinHistory = (x & coinsMask) >> 1 | |
resDict[x].append(a * coss2[coinHistory]**2) | |
resDict[x^1].append(a * sins2[coinHistory]**2) | |
self.probs = [[x, fsum(a)] for x, a in resDict.iteritems()] | |
def translate(self): | |
coinsModulo = coinsMask + 1 | |
for i, (x, _) in enumerate(self.probs): | |
self.probs[i][0] = x + (coinsModulo if x & 1 else -coinsModulo) | |
def rotate(self): | |
# recycle the coins. | |
# rotation won't cause superposition. | |
for i, (x, _) in enumerate(self.probs): | |
self.probs[i][0] = x & ~coinsMask | (x << 1) & coinsMask | (x >> (coinsCount-1)) & 1 | |
def prob(self): | |
probs = defaultdict(list) | |
for x, a in self.probs: | |
probs[x >> coinsCount].append(a) | |
return [(x, fsum(a)) for x, a in probs.iteritems()] | |
################################################################################ | |
def retrieve(): | |
'Retrieve result from command line and parse it' | |
from argparse import ArgumentParser, FileType, RawTextHelpFormatter | |
from sys import stdin | |
parser = ArgumentParser(description='History-Dependent Quantum Parrondo Analyzer', formatter_class=RawTextHelpFormatter) | |
# i/o. | |
parser.add_argument('--seq1', '-1', metavar='FILE', type=FileType('r'), default=stdin, help= | |
"File name of first game sequence. Read from stdin if omitted.\n" | |
"The file should contain line-separated integers of 0 or 1 \n" | |
"indicating which game to play in sequence. 0 means play game A, \n" | |
"and 1 means play game B.") | |
parser.add_argument('--seq2', '-2', metavar='FILE', type=FileType('r'), help='File name of second game sequence.') | |
parser.add_argument('--save', '-o', metavar='FILE', type=FileType('w'), help='File name to fork the output into.') | |
# game parameters. | |
parser.add_argument('-g', choices=['qhd', 'qhddm', 'chd', 'qpd', 'cpd'], default='qhd', help= | |
"Game type, where \n" | |
" qhd = Quantum history-dependent, \n" | |
" qhddm = Quantum history-dependent with density matrix, \n" | |
" chd = Classical history-dependent, \n" | |
" qpd = Quantum position-dependent, \n" | |
" cpd = Classical position-dependent.") | |
parser.add_argument('-a', metavar='NUM', default=45, type=float, help='Angle of game A, in degrees') | |
parser.add_argument('-c', metavar='COUNT', default=3, type=int, help='Number of coins to use.') | |
parser.add_argument('-b', nargs='+', default=[42], type=float, help= | |
"Angle of game B with given history.\n" | |
"This argument should be passed like '-b 42 45 45 ...', which denotes \n" | |
"the angle at history |-->, |-+>, |+-> etc. If a value is missing, it \n" | |
"will be padded by 45." | |
) | |
parser.add_argument('-m', default=0, type=int, help='Intervals to collapse the wavefunction.') | |
# advanced option. | |
parser.add_argument('-l', metavar='LENGTH', default=133, type=int, help='Length of the game. If longer than the given sequence, it will be repeated.') | |
parser.add_argument('-v', action='store_true', help='Record mean and variance vs. time.') | |
parser.add_argument('-O', metavar='STEPS', default=0, type=int, help='How many steps to optimize.') | |
parser.add_argument('--om', choices=['hill', 'brute'], default='hill', help='How to optimize.') | |
analyze(parser.parse_args()) | |
def analyze(result): | |
'Analyze the result from command line.' | |
from math import radians, cos, sin | |
from itertools import cycle, islice | |
global coinsMask, aCos, aSin, bCos, bSin, seq1, seq2, length, save, optimize, write_mean, game_type, optimize_method, observeInterval | |
game_type = result.g | |
# convert angle into sin & cos form. | |
coinsCount = result.c | |
coinsModulo = 1 << coinsCount | |
coinsMask = coinsModulo - 1 | |
coinsModulo //= 2 | |
angle = radians(result.a) | |
bAngles = map(radians, result.b + [45] * (coinsModulo - len(result.b))) | |
aCos = [cos(angle)] * coinsModulo | |
aSin = [sin(angle)*1j] * coinsModulo | |
bCos = map(cos, bAngles) | |
bSin = [sin(q)*1j for q in bAngles] | |
# read game sequence. | |
seq1_list = map(int, result.seq1) | |
seq2_list = [False] | |
if result.seq2: | |
seq2_list = map(int, result.seq2) | |
length = result.l or max(len(seq1_list), len(seq2_list)) | |
seq1 = list(islice(cycle(seq1_list), length)) | |
seq2 = list(islice(cycle(seq2_list), length)) | |
save = result.save | |
optimize = result.O | |
optimize_method = result.om | |
write_mean = result.v | |
observeInterval = result.m | |
def write(*args, **kwargs): | |
'Convenient print statement that forks to our destinated file.' | |
print(*args, **kwargs) | |
if save: | |
print(file=save, *args, **kwargs) | |
################################################################################ | |
retrieve() | |
parr = { | |
'qhd': QHDParrondoFast, | |
'chd': CHDParrondo, | |
'qhddm': QHDParrondo, | |
}[game_type]() | |
if optimize_method == 'brute': | |
parr.performBrute(len(seq1)) | |
else: | |
parr.performOptimizeMean(seq1) | |
print('\a') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment