Last active
December 25, 2022 15:32
-
-
Save mateon1/b63eabc371ac35e2a14a9c5ce37413bc to your computer and use it in GitHub Desktop.
Closed Position Set (CPS) TM decider
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bbconv import * | |
from collections import deque | |
# Machine, N - segment length, s - initial state, p in [0..N) - initial position, t - tape state encoded as an integer | |
# returns (p, t, s) - final position, tape state, head state | |
def segrun(M, N, s, p, t, limit=-1): | |
seen = set() | |
while 0 <= p < N: | |
#print(": " + bin(t)[2:].zfill(N)[::-1]) | |
#print("- " + " "*p + "^") | |
# Detect loops | |
key = ((t << 4 | s) << 8) | p | |
if key in seen: break | |
seen.add(key) | |
limit -= 1 | |
if limit == 0: break | |
# Perform forward transition | |
w, d, s = M[s][(t>>p)&1] | |
if s < 0: break | |
if w: | |
t |= 1<<p | |
else: | |
t &= ~(1 << p) | |
if d: | |
p += 1 | |
else: | |
p += -1 | |
return (p, t, s), limit | |
# M - machine, N - atom size, T - segment size (always 3*N in savask) | |
# returns False if the machine runs forever, or None if it can't decide | |
def decide(M, N, T, P=None, limit=999999999, loud=False, savaskcert=False): | |
if P is None: P = N | |
assert N > 0 # N=1 is useless, but valid | |
assert P <= N # Larger prefixes are theoretically possible, but current implementation is not sound with them | |
assert T >= N + P | |
leftg = {0: {0}} | |
rightg = {0: {0}} | |
queue = deque() | |
queue.append((T-N, 0, 0)) # (pos, tape, state) tuple | |
#queue = set() | |
#queue.add((T-N, 0, 0)) # (pos, tape, state) tuple | |
SEG_MASK = (1<<T)-1 | |
ATOM_MASK = (1<<N)-1 | |
PFX_MASK = (1<<P)-1 | |
finleft = {} | |
finright = {} | |
loops = set() | |
atomcopiesl = {} | |
atomcopiesr = {} | |
seen = set() # avoid duplicating work | |
while queue: | |
p, t, s = queue.popleft() | |
if (p, t, s) in seen: continue | |
seen.add((p, t, s)) | |
if loud >= 3: print("Unqueued: ", prettytape(t, T, p, s)) | |
#assert N-1 <= p < T-N+1 | |
res, limit = segrun(M, T, s, p, t, limit) | |
if limit <= 0: return None, 0 | |
pn, tn, sn = res | |
if loud >= 3: print("Sim result: ", prettytape(tn, T, pn, sn) + (" *HALT*" if sn < 0 else " *LOOP*" if 0 <= pn < T else "")) | |
if sn < 0: | |
if loud >= 1: | |
print("Found halt in position set!") | |
print(":UNDECIDED:") | |
#print((N, N, T), limit, "; s ", len(finleft), len(finright), "; g ", sum(len(v) for v in leftg.values()), sum(len(v) for v in rightg.values())) | |
return None, limit | |
if 0 <= pn < T: | |
loops.add((p, t, s, pn, tn, sn)) | |
continue | |
elif pn < 0: | |
origpfx = t & PFX_MASK | |
finleft.setdefault(origpfx, set()) | |
finleft[origpfx].add((p, t, s, tn, sn)) | |
else: | |
origpfx = t >> (T - P) | |
finright.setdefault(origpfx, set()) | |
finright[origpfx].add((p, t, s, tn, sn)) | |
# Queue up this state according to current graph transitions | |
if pn < 0: | |
origpfx = t & PFX_MASK | |
for a in leftg[origpfx]: | |
tnn = ((tn << N) | a) & SEG_MASK | |
queue.append((N-1, tnn, sn)) | |
#queue.add((N-1, tnn, sn)) | |
else: | |
origpfx = t >> (T - P) | |
for a in rightg[origpfx]: | |
tnn = (tn | (a << T)) >> N | |
queue.append((T-N, tnn, sn)) | |
#queue.add((T-N, tnn, sn)) | |
# See if shifting the tape extends the left/right graph | |
# if so, queue up the new transitions for all matching positions in set | |
def addrightatom(a, b): | |
nonlocal limit | |
toadd = [(a, b)] | |
while toadd: | |
a, b = toadd.pop() | |
oset = rightg.get(a, set()) | |
if b in oset: continue | |
limit -= 1 | |
rightg[a] = oset | {b} | |
if loud >= 2: print("right edge: " + prettytape(a, P) + " -> " + prettytape(b, N)) | |
for ia in atomcopiesr.get(a, set()): toadd.append((ia, b)) | |
# queue affected positions | |
for (rp, rt, rs, rtn, rsn) in finright.get(a, set()): | |
#rorigatom = rt >> (T - N) | |
#if rorigatom != a: continue | |
rtnn = (rtn | (b << T)) >> N | |
queue.append((T-N, rtnn, rsn)) | |
#queue.add((T-N, rtnn, rsn)) | |
def addleftatom(a, b): | |
nonlocal limit | |
toadd = [(a, b)] | |
while toadd: | |
a, b = toadd.pop() | |
oset = leftg.get(a, set()) | |
if b in oset: continue | |
limit -= 1 | |
leftg[a] = oset | {b} | |
if loud >= 2: print("left edge: " + prettytape(a, P) + " -> " + prettytape(b, N)) | |
for ia in atomcopiesl.get(a, set()): toadd.append((ia, b)) | |
# queue affected positions | |
for (rp, rt, rs, rtn, rsn) in finleft.get(a, set()): | |
#rorigatom = rt & ATOM_MASK | |
#if rorigatom != a: continue | |
rtnn = ((rtn << N) | b) & SEG_MASK | |
queue.append((N-1, rtnn, rsn)) | |
#queue.add((N-1, rtnn, rsn)) | |
if pn < 0: | |
origpfx = t >> (T-P) | |
origatom = t >> (T-N) | |
newatom = tn >> (T-N) | |
newpfx = tn >> (T-P) | |
midpfx = (tn >> (T-N-P)) & PFX_MASK | |
# xxxxxxx xxxx A>orig ... (implied rightg[orig] entries) | |
# v v v | |
# <D xxxxxxx mida newa | |
# ^--- shifted out | |
# New graph edge: mida -> newa | |
# newa -> everything in orig set | |
if midpfx not in rightg or newatom not in rightg[midpfx]: | |
addrightatom(midpfx, newatom) | |
if origpfx != newpfx: | |
rightg.setdefault(newpfx, set()) | |
newatomnewedges = rightg[origpfx] - rightg[newpfx] | |
atomcopiesr.setdefault(origpfx, set()) | |
atomcopiesr[origpfx] |= {newpfx} | |
for a in newatomnewedges: | |
addrightatom(newpfx, a) | |
else: | |
origpfx = t & PFX_MASK | |
origatom = t & ATOM_MASK | |
newatom = tn & ATOM_MASK | |
newpfx = tn & PFX_MASK | |
midpfx = (tn >> N) & PFX_MASK | |
if midpfx not in leftg or newatom not in leftg[midpfx]: | |
addleftatom(midpfx, newatom) | |
if origpfx != newpfx: | |
leftg.setdefault(newpfx, set()) | |
newatomnewedges = leftg[origpfx] - leftg[newpfx] | |
atomcopiesl.setdefault(origpfx, set()) | |
atomcopiesl[origpfx] |= {newpfx} | |
for a in newatomnewedges: | |
addleftatom(newpfx, a) | |
#print(atomcopiesl) | |
#print(atomcopiesr) | |
if loud >= 1: | |
print("Ran out of positions to visit, have a closed set") | |
print("Left graph:") | |
for k, v in sorted(leftg.items()): | |
print("\t", prettytape(k, P) + ": {" + ",".join(sorted([prettytape(vv, N) for vv in v])) + "}") | |
print("Right graph:") | |
for k, v in sorted(rightg.items()): | |
print("\t", prettytape(k, P) + ": {" + ",".join(sorted([prettytape(vv, N) for vv in v])) + "}") | |
print("Left pos:") | |
for (p, t, s, tn, sn) in sorted([m for k in finleft.values() for m in k], key=lambda x: (x[2], x[1], x[0], x[3], x[4])): | |
print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, -1, sn)) | |
if not finleft: print(" None") | |
print("Right pos:") | |
for (p, t, s, tn, sn) in sorted([m for k in finright.values() for m in k], key=lambda x: (x[2], x[1], x[0], x[3], x[4])): | |
print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, T, sn)) | |
if not finright: print(" None") | |
print("Looping pos:") | |
for (p, t, s, pn, tn, sn) in sorted([m for m in loops], key=lambda x: (x[2], x[1], x[0], x[5], x[4], x[3])): | |
print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, pn, sn), "*LOOPS*") | |
if not loops: print(" None") | |
print(":NONHALTING:") | |
if savaskcert: | |
lefts = [] | |
for a, v in sorted(leftg.items()): | |
for b in sorted(v): | |
lefts.append(prettytape(a, P) + " " + prettytape(b, N)) | |
rights = [] | |
for a, v in sorted(rightg.items()): | |
for b in sorted(v): | |
rights.append(prettytape(a, P) + " " + prettytape(b, N)) | |
tapes = [] | |
for (p, t, s, _, _) in sorted((m for end in [finleft, finright, {0: [(p, t, s, tn, sn) for (p, t, s, pn, tn, sn) in loops]}] for k in end.values() for m in k), key=lambda x: (x[2], x[1], x[0], x[3], x[4])): | |
tapes.append(prettytape(t, T, p, s)) | |
print(stdstr(M), "Result %d %s %d %s %d %s" % (len(lefts), " ".join(lefts), len(rights), " ".join(rights), len(tapes), " ".join(tapes))) | |
#print((N, T), limit, "; s ", len(finleft), len(finright), "; g ", sum(len(v) for v in leftg.values()), sum(len(v) for v in rightg.values())) | |
return False, limit | |
def prettytape(t, N, p=None, s=None): | |
ts = bin(t)[2:].zfill(N)[::-1] | |
if p is not None and s is not None: | |
if p < N//2: | |
ts = ts[:p+1] + "<" + chr(s + ord("A")) + ts[p+1:] | |
else: | |
ts = ts[:p] + chr(s + ord("A")) + ">" + ts[p:] | |
return ts | |
def rdecide(M, limit=500000): | |
if isinstance(M, int): M = read_short(M)[0] | |
olimit = limit | |
for i in range(2,256): | |
for j in range(1, i//2+1): | |
res, limit = decide(M, j, i, limit=limit) | |
#print((j, i), res, limit) | |
if limit == 0: return (None, i, j) | |
if res is False: | |
return (False, i, j, olimit-limit) | |
return (None, 256, 0, limit) | |
import multiprocessing, sys | |
def prettyscan(M, T=16, N=8, P=None, limit=10000000): | |
if isinstance(M, int): M = read_short(M)[0] | |
for y in range(1, N+1): | |
for x in range(1, T+1): | |
if (x < 2*y) if P is None else (x < y+P or y < P): | |
sys.stdout.write("x") | |
sys.stdout.flush() | |
else: | |
r, l = decide(M, y, x, P, limit=limit) | |
if l == 0: sys.stdout.write("?") | |
elif r is None: sys.stdout.write(".") | |
elif r is False: sys.stdout.write("#") | |
else: | |
print("ERROR, don't know how to interpret result", r) | |
return | |
sys.stdout.flush() | |
print() | |
def wrap(p): | |
N, M, i = p | |
if i%(88664064//500) == 0: | |
print(".", end="") | |
sys.stdout.flush() | |
r, l = decide(read_short(i)[0], N, M, limit=1000000000) | |
assert l > 0 | |
#return (r, 1000000000-l) if l > 0 else (None, -1) | |
return r | |
def wrap2(i): | |
if i%(88664064//1000) == 0: | |
print(".", end="") | |
sys.stdout.flush() | |
M = read_short(i)[0] | |
if decide(M, 1, 2)[0] is False: | |
return sum(1<<i for i in range(63)) # cheaty, but saves so much compute. Verified empirically and seems obviously true that if 1,2 decides everything else does too | |
res = 0 | |
resi = 0 | |
for N, T in ((i, j) for j in range(3, 17) for i in range(1,j//2+1)): | |
r, l = decide(M, N, T, limit=1000000000) | |
assert l > 0 | |
#res += b"\0" if r is None else b"\x01" | |
if r is False: res |= 1 << resi | |
resi += 1 | |
return res | |
if __name__ == "__main__": | |
ts = [(i, j) for j in range(3, 17) for i in range(1,j//2+1)] | |
allres = {} | |
with multiprocessing.Pool(20) as pl: | |
# for t in [(1,i) for i in range(2,11)] + [(2,i) for i in range(4,11)] + [(3,i) for i in range(6,10)] + [(4,8), (4,9), (5,10)]: | |
# print("Running",t) | |
# allres[t] = pl.map(wrap, (t+(i,) for i in range(len(shortdata)//8-1))) | |
# print() | |
# print("Computing %d params for all machines" % len(ts)) | |
machres = pl.map(wrap2, range(len(shortdata)//8-1)) | |
print() | |
#print("Loading results from file...") | |
#with open("res63.bin", "rb") as f: | |
# machres = list(struct.unpack("<"+"Q"*88664064, f.read())) | |
#print("Done!") | |
import collections | |
ctr = collections.Counter(machres) | |
#[i for i in sorted(collections.Counter(machres).items(),key=lambda v:(v[1],v))[::-1]] | |
#for i, m in enumerate(machres): | |
# if ctr[m] <= 1: | |
# print("---") | |
# print("Rare machine!", i, stdstr(read_short(i)[0])) | |
# prettyscan(i, 20, 10, 500000) | |
#print("---") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import mmap | |
def find(l, x): | |
if x in l: return l.index(x) | |
return -1 | |
permuteback = [0, 1, 2, 4, 3, 5, 6, 7, 12, 18, 13, 19, 8, 10, 14, 20, 16, 22, 9, 11, 15, 21, 17, 23] | |
def rupermute(rules, n): | |
m = list(range(2,len(rules))) | |
pidx = [0,1] # 1RB assumed | |
inv = [] | |
for i in range(1,len(m)+1): | |
n, k = divmod(n, i) | |
inv.append(k) | |
while m: | |
pidx.append(m.pop(inv.pop())) | |
#print(pidx) | |
fixup = lambda r: tuple((ri[0], ri[1], find(pidx, ri[2])) for ri in r) | |
newrules = [fixup(rules[i]) for i in pidx] | |
return newrules | |
def ruswap(rules, a, b): | |
pidx = list(range(len(rules))) | |
pidx[a] = b | |
pidx[b] = a | |
fixup = lambda r: tuple((ri[0], ri[1], find(pidx, ri[2])) for ri in r) | |
newrules = [fixup(rules[i]) for i in pidx] | |
return newrules | |
# Defines lexical order on rules | |
def rulekey(rules): | |
ts = [t for r in rules for t in r] | |
ts.pop(0) # first transition is fixed, don't bother comparing | |
return [t[2] for t in ts] + [b for t in ts for b in [t[0], t[1]]] | |
def runorm(ru): | |
# Skip initial tape shuffling before writing 1 | |
while not ru[0][0][0]: | |
s=ru[0][0][2].index(True) | |
if s == 0: return [((False, True, [True, False]), (True, True, [False, True]))] # 0RA1RH | |
ru = ruswap(ru, 0, s) | |
# Rule is now 1?? ??? ... | |
# Mirror rule to always go right first | |
if not ru[0][0][1]: | |
ru = [tuple((t[0], not t[1], t[2]) if t[2]>=0 else (1, True, -1) for t in r) for r in ru] | |
# Rule is now 1R? ??? ... | |
if ru[0][0][2] != 1: | |
s=ru[0][0][2] | |
if s == 0: return [((True, True, 0), (True, True, -1))] # 1RA1RH | |
if s > 1: | |
ru = ruswap(ru, 1, s) | |
# Rule is now 1RB ??? ... | |
assert ru[0][0][0] | |
assert ru[0][0][1] | |
assert ru[0][0][2] == 1 | |
for t in (t for r in ru for t in r): | |
if t[2] == -1: | |
assert t[0] == 1 and t[1] == True, t | |
# Choose smallest permutation to achieve canonical lexical normal form (as defined by rulekey) | |
N = [1,1,2,6,24,120,720][len(ru)-2] | |
mi = 0 | |
mp = ru | |
mk = rulekey(ru) | |
for i in range(1, N): | |
p = rupermute(ru, i) | |
k = rulekey(p) | |
assert rupermute(p, permuteback[i]) == ru | |
if k < mk: | |
mi = i | |
mp = p | |
mk = k | |
return mp, mi | |
def ruleparse(s, norm=True, partial=False): | |
assert not (partial and norm) | |
s=s.replace(" ","") | |
s=s.replace("_","") | |
assert len(s)%6 == 0 | |
N = len(s)//6 | |
ru = [] | |
for i in range(0, len(s), 6): | |
if not partial: | |
w0, d0, s0 = ord(s[i+0]) - ord("0"), s[i+1].upper() != "L", ord(s[i+2])-ord("A") | |
w1, d1, s1 = ord(s[i+3]) - ord("0"), s[i+4].upper() != "L", ord(s[i+5])-ord("A") | |
if not (0 <= s0 < N): s0 = -1 | |
if not (0 <= s1 < N): s1 = -1 | |
if s0 == -1: w0, d0, s0 = (1, True, -1) | |
if s1 == -1: w1, d1, s1 = (1, True, -1) | |
ru.append(((w0, d0, s0), (w1, d1, s1))) | |
else: | |
w0, d0, s0 = [0, 1, None]["01".find(s[i+0])], [False, True, None]["LR".find(s[i+1].upper())], ord(s[i+2])-ord("A") | |
w1, d1, s1 = [0, 1, None]["01".find(s[i+3])], [False, True, None]["LR".find(s[i+4].upper())], ord(s[i+5])-ord("A") | |
if not (0 <= s0 <= N): s0 = None if s[i+2] not in "!HXZ" else -1 | |
if not (0 <= s1 <= N): s1 = None if s[i+5] not in "!HXZ" else -1 | |
ru.append(((w0, d0, (s0 if s0 is not None else None)), (w1, d1, (s1 if s1 is not None else None)))) | |
if norm: | |
ru, _ = runorm(ru) | |
return ru | |
def largeparse(s, norm=True, partial=False): | |
assert not (partial and norm) | |
s=s.replace(" ","") | |
s=s.split("_") | |
N = len(s) | |
M = len(s[0])//3 | |
assert all(len(t) == 3*M for t in s) | |
ru = [] | |
for i in range(N): | |
ts = [] | |
for j in range(M): | |
if not partial: | |
w0, d0, s0 = ord(s[i][3*j]) - ord("0"), s[i][3*j+1].upper() != "L", ord(s[i][3*j+2])-ord("A") | |
if not (0 <= s0 < N): s0 = -1 | |
if s0 == -1: w0, d0, s0 = (1, True, -1) | |
ts.append((w0, d0, s0)) | |
else: | |
w0, d0, s0 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, None]["0123456789".find(s[i][3*j+0])], [False, True, None]["LR".find(s[i][3*j+1].upper())], ord(s[i][3*j+2])-ord("A") | |
if not (0 <= s0 <= N): s0 = None if s[i+2] not in "!HXZ" else -1 | |
ts.append((w0, d0, (s0 if s0 is not None else None))) | |
ru.append(tuple(ts)) | |
if norm: | |
ru, _ = runorm(ru) | |
return ru | |
def rustr(rules, ass=None): | |
c = lambda s, x: s[x] if x is not None else "?" | |
l = lambda i: chr(ord("A")+i) if 0 <= i < len(rules) else "Z?"[i == None] | |
return [" ".join([c("0123456789", w) + c("LR", d) + l(s) for (w, d, s) in r]) for r in rules] | |
def stdstr(rules): | |
c = lambda s, x: s[x] if x is not None else "?" | |
l = lambda i: chr(ord("A")+i) if 0 <= i < len(rules) else "Z?"[i == None] | |
return "_".join(["".join([c("0123456789", w) + c("LR", d) + l(s) for (w, d, s) in r]) for r in rules]) | |
# --- # | |
def read_wide(i=-1, data=None): | |
if data is None: data = widedata | |
b = struct.unpack_from("B"*30, widedata, i*30+30) | |
return runorm([ | |
(([0, 1][b[i ]], [True, False][b[i+1]], b[i+2]-1) if b[i+2] else (1, True, -1), | |
([0, 1][b[i+3]], [True, False][b[i+4]], b[i+5]-1) if b[i+5] else (1, True, -1)) for i in range(0, 30, 6) | |
]) | |
def read_short(i=-1, data=None): | |
if data is None: data = shortdata | |
st, pk = struct.unpack_from("<II", data, i*8+8) | |
return ([ | |
(((pk>>(4*i )) & 1, bool((pk>>(4*i+1)) & 1), [0,1,2,3,4,5,6,-1][(st>>(i*6 ))&7]), | |
((pk>>(4*i+2)) & 1, bool((pk>>(4*i+3)) & 1), [0,1,2,3,4,5,6,-1][(st>>(i*6+3))&7])) for i in range(5) | |
], pk >> 20) | |
def write_short(rules, n, data=None, pos=-1): | |
st, pk = 0, 0 | |
for i, (r0, r1) in enumerate(rules): | |
st |= (r0[2]&7) << (6*i) | |
st |= (r1[2]&7) << (6*i+3) | |
pk |= r0[0] << (4*i) | |
pk |= int(r0[1]) << (4*i+1) | |
pk |= r1[0] << (4*i+2) | |
pk |= int(r1[1]) << (4*i+3) | |
pk |= n << 20 | |
if data is None: | |
a = bytearray(8) | |
struct.pack_into("<II", a, 0, st, pk) | |
return a | |
else: | |
struct.pack_into("<II", data, pos*8+8, st, pk) | |
with open("all_5_states_undecided_machines_with_global_header", "rb") as widedbfile: | |
widedata = mmap.mmap(widedbfile.fileno(), 0, access=mmap.ACCESS_READ) | |
import sys | |
if __name__ == "__main__" and sys.argv[1:] == ["convert"]: # Convert | |
nmach = struct.unpack_from(">I", widedata, 8)[0] | |
shortdata = bytearray(8) | |
struct.pack_into("<I", shortdata, 0, nmach) | |
for i in range(nmach): | |
if i % 50000 == 0: | |
print(" %d / %d " % (i, nmach), end="\r") | |
sys.stdout.flush() | |
ru, n = read_wide(i) | |
shortdata += write_short(ru, n) | |
print("\nDone!") | |
with open("seeddb_short_norm.bin", "rb") as f: f.write(shortdata) | |
with open("seeddb_short_norm.bin", "rb") as shortdbfile: | |
shortdata = mmap.mmap(shortdbfile.fileno(), 0, access=mmap.ACCESS_READ) | |
with open("bb5_undecided_index", "rb") as f: | |
undecided = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment