Created
April 14, 2017 17:03
-
-
Save nathan-sixnines/307cebacdb314d3e775f15ad4901cfa0 to your computer and use it in GitHub Desktop.
Modified gnugo_vs_gnugo from https://github.com/jtauber/gtp/ to review games with Leela.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from subprocess import Popen, PIPE, STDOUT | |
import string | |
import thread | |
from threading import Timer | |
from gtp import parse_vertex, gtp_move, gtp_color | |
from gtp import BLACK, WHITE, PASS | |
import sgf | |
class GTPSubProcess(object):
    """Run an engine as a child process and exchange text with it over pipes.

    The child's stdin and stdout are both pipes; commands are written to
    stdin and the reply is read back line by line from stdout.
    """

    def __init__(self, label, args):
        """Start the child process.

        Args:
            label: Human-readable name used in the progress messages.
            args: Argument list handed to ``subprocess.Popen``.
        """
        self.label = label
        self.subprocess = Popen(args, stdin=PIPE, stdout=PIPE)
        print("{} subprocess created".format(label))

    def send(self, data):
        """Write *data* to the child and return its reply.

        The reply is every line read from the child's stdout up to (not
        including) the first blank line; reading also stops at end of file,
        where ``readline`` returns an empty string.

        Args:
            data: The complete command text, including its trailing newline.

        Returns:
            The concatenated reply lines (empty if the first line is blank).
        """
        print("sending {}: {}".format(self.label, data))
        stdin = self.subprocess.stdin
        stdin.write(data)
        # Without an explicit flush a buffered pipe may hold the command
        # back, and the readline() below would then wait forever.
        stdin.flush()

        lines = []
        while True:
            line = self.subprocess.stdout.readline()
            if not line.strip():
                break
            lines.append(line)
        result = "".join(lines)
        print("got: {}".format(result))
        return result

    def close(self):
        """Send ``quit`` to the child and wait for it to terminate."""
        print("quitting {} subprocess".format(self.label))
        self.subprocess.communicate("quit\n")
class GTPFacade(object):
    """Convenience wrapper that turns method calls into engine commands.

    Each method formats one command line, sends it through a
    ``GTPSubProcess`` and returns the engine's raw reply so callers can
    inspect or log it (previously the reply was thrown away).
    """

    def __init__(self, label, args):
        """Start the engine process described by *args* and tag it *label*."""
        self.label = label
        self.gtp_subprocess = GTPSubProcess(label, args)

    def name(self):
        """Send ``name`` and return the raw reply."""
        return self.gtp_subprocess.send("name\n")

    def version(self):
        """Send ``version`` and return the raw reply."""
        return self.gtp_subprocess.send("version\n")

    def boardsize(self, boardsize):
        """Send ``boardsize <boardsize>`` and return the raw reply."""
        return self.gtp_subprocess.send("boardsize {}\n".format(boardsize))

    def komi(self, komi):
        """Send ``komi <komi>`` and return the raw reply."""
        return self.gtp_subprocess.send("komi {}\n".format(komi))

    def clear_board(self):
        """Send ``clear_board`` and return the raw reply."""
        return self.gtp_subprocess.send("clear_board\n")

    def genmove(self, color):
        """Ask the engine for a move and return it as a parsed vertex.

        Args:
            color: Colour constant accepted by ``gtp_color``.

        Returns:
            Whatever ``parse_vertex`` yields for the text after the
            leading ``=`` of the reply.

        Raises:
            AssertionError: If the reply does not start with ``=``.
        """
        message = self.gtp_subprocess.send(
            "genmove {}\n".format(gtp_color(color)))
        # Raised explicitly (rather than via ``assert``) so the check also
        # runs under ``python -O`` and an empty reply gives a clear error
        # instead of an IndexError; the exception type is unchanged.
        if not message.startswith("="):
            raise AssertionError(
                "unexpected genmove reply: {!r}".format(message))
        return parse_vertex(message[1:].strip())

    def undo(self):
        """Send ``undo`` and return the raw reply."""
        return self.gtp_subprocess.send("undo\n")

    def showboard(self):
        """Send ``showboard`` and return the reply with outer whitespace removed."""
        message = self.gtp_subprocess.send("showboard\n")
        return message.strip()

    def play(self, color, vertex):
        """Send ``play`` for the move built by ``gtp_move(color, vertex)``."""
        return self.gtp_subprocess.send(
            "play {}\n".format(gtp_move(color, vertex)))

    def final_score(self):
        """Send ``final_score`` and return the raw reply."""
        return self.gtp_subprocess.send("final_score\n")

    def close(self):
        """Shut the engine process down."""
        self.gtp_subprocess.close()
def run_with_timeout(timeout, default, f, *args, **kwargs):
    """Call ``f(*args, **kwargs)``, giving up after *timeout* seconds.

    A falsy *timeout* means "no limit": *f* is simply called and its result
    returned. Otherwise a ``Timer`` is armed to call
    ``thread.interrupt_main`` after *timeout* seconds; if a
    ``KeyboardInterrupt`` reaches this function while *f* runs, *default*
    is returned instead of a result. The timer is always cancelled on exit.
    """
    if timeout:
        try:
            watchdog = Timer(timeout, thread.interrupt_main)
            watchdog.start()
            return f(*args, **kwargs)
        except KeyboardInterrupt:
            return default
        finally:
            watchdog.cancel()
    return f(*args, **kwargs)
def sgfSizeandKomi(SGFstring):
    """Extract the board size and komi from raw SGF text.

    The text is scanned for ``IDENT[value]`` properties; the first ``SZ``
    and the first ``KM`` found (identifier compared case-insensitively)
    supply the results. Unlike the earlier bracket-splitting approach this
    also finds a property that directly follows ``(;`` or is preceded by
    spaces, and it does not look inside other property values.

    Args:
        SGFstring: The complete SGF file contents.

    Returns:
        A ``(size, komi)`` tuple of strings with surrounding whitespace
        removed. *size* falls back to ``"19"`` when no ``SZ`` is present.

    Raises:
        ValueError: If the text contains no ``KM`` property.
    """
    # Imported locally so this function does not depend on the file's
    # import block.
    import re

    # IDENT, optional whitespace, then one bracketed value in which a
    # backslash escapes the following character (so "\]" does not end it).
    prop = re.compile(r'([A-Za-z]+)\s*\[((?:\\.|[^\]\\])*)\]', re.DOTALL)

    size = None
    komi = None
    for match in prop.finditer(SGFstring):
        ident = match.group(1).upper()
        if ident == 'SZ' and size is None:
            size = match.group(2).strip()
        elif ident == 'KM' and komi is None:
            komi = match.group(2).strip()
        if size is not None and komi is not None:
            break

    if komi is None:
        # There is no safe default for komi, so fail loudly instead of
        # guessing (the old code died with UnboundLocalError here).
        raise ValueError("SGF text has no KM (komi) property")
    if size is None:
        size = '19'  # customary Go board size when SZ is omitted
    return size, komi
def sgfVertex(instring):
    """Convert a two-letter SGF coordinate into a 1-based ``(x, y)`` tuple.

    Each letter is mapped by its position in the alphabet (``'a'`` -> 1,
    ``'b'`` -> 2, ...), case-insensitively, so ``"pd"`` becomes ``(16, 4)``.

    An empty string or the word ``"pass"`` returns the ``PASS`` constant
    imported from ``gtp``. Previously ``"pass"`` was decoded letter by
    letter into ``(16, 1)``, a real board point, and ``""`` raised
    ``IndexError``.
    NOTE(review): assumes ``PASS`` is what ``gtp_move`` expects for a pass;
    confirm against the gtp module.

    Args:
        instring: SGF move value such as ``"pd"``, ``""`` or ``"pass"``.

    Returns:
        ``(x, y)`` as integers, or ``PASS``.
    """
    lower = instring.strip().lower()
    if lower in ("", "pass"):
        return PASS
    # 96 is ord('a') - 1, which makes the coordinates 1-based.
    return ord(lower[0]) - 96, ord(lower[1]) - 96
def vertexSGF(incoords):
    """Convert a 1-based ``(x, y)`` tuple into a two-letter SGF coordinate.

    This is the inverse of the letter arithmetic in ``sgfVertex``:
    1 -> ``'a'``, 2 -> ``'b'``, and so on, so ``(16, 4)`` becomes ``"pd"``.
    The function used to compute both letters and then fall off the end,
    returning ``None``; it now returns the string.

    Args:
        incoords: Sequence whose first two items are the x and y numbers.

    Returns:
        The two-letter coordinate, or ``""`` (an empty SGF move value)
        when either number is below 1 and therefore not on the board.
    """
    col, row = incoords[0], incoords[1]
    if col < 1 or row < 1:
        return ""
    # 96 is ord('a') - 1, mirroring the offset used when decoding.
    return "%s%s" % (chr(96 + col), chr(96 + row))
def _sgfVariation(color, vertex):
    """Format one engine suggestion as the variation text appended to output.sgf."""
    letter = 'B' if color == BLACK else 'W'
    if vertex == PASS:
        # A pass is an empty SGF move value; the letter arithmetic below
        # would otherwise turn it into non-letter characters.
        return "(;%s[]))\n" % letter
    try:
        return "(;%s[%s%s]))\n" % (
            letter, chr(96 + vertex[0]), chr(96 + vertex[1]))
    except (TypeError, IndexError, ValueError):
        # Vertex is not a pair of usable integers: record an empty move,
        # as the previous catch-all handler did.
        return "(;%s[]))\n" % letter


def review(filename):
    """Replay the SGF game in *filename* and record the engine's preferred moves.

    For every node carrying a ``B`` or ``W`` move the engine is asked what
    it would play, that suggestion is stored and attached to the node as
    ``next_variation``, the suggestion is undone, and the move actually
    recorded in the file is played instead. Replay stops at the second
    consecutive pass in the record.

    Side effects:
        * launches the engine at ``../leela_090_linux_x64 -g`` and shuts
          it down again, even if the replay fails part-way;
        * writes the annotated game to ``<filename>-review.sgf``;
        * reads ``output.sgf``, appends one variation per suggestion and
          writes the result back to ``output.sgf``.
          NOTE(review): nothing in this function creates ``output.sgf``
          before it is read - confirm where it is expected to come from.

    Args:
        filename: Path of the SGF file to review.
    """
    with open(filename) as f:
        gameString = f.read()
    # First element returned by sgf.parse (presumably the first game tree).
    game = sgf.parse(gameString)[0]
    size, komi = sgfSizeandKomi(gameString)

    LEELA = ["../leela_090_linux_x64", "-g"]
    AI = GTPFacade("Leela", LEELA)
    moves = []
    try:
        print("done init leela")
        AI.name()
        AI.version()
        AI.boardsize(size)
        AI.komi(komi)
        AI.clear_board()

        first_pass = False
        print("starting review")
        lastNode = None
        for node in game:
            # Black takes priority if a node somehow carries both colours,
            # matching the order of the original branches.
            if 'B' in node.properties:
                sgfColor, color = 'B', BLACK
            elif 'W' in node.properties:
                sgfColor, color = 'W', WHITE
            else:
                continue

            # Ask the engine for its own move and attach it as a variation.
            LeelaVertex = AI.genmove(color)
            moves.append((color, LeelaVertex))
            node.next_variation = sgf.Node(game, lastNode)
            node.next_variation.my_start_property(sgfColor)
            node.next_variation.my_add_prop_value(LeelaVertex)
            # The subprocess wrapper already echoes every reply, so the
            # return value is not printed again here.
            AI.showboard()
            AI.undo()

            # Now play the move that was actually recorded in the file.
            RecordVertex = node.properties[sgfColor][0]
            if RecordVertex == '':
                if first_pass:
                    break  # two passes in a row: the game is over
                first_pass = True
                # Play a real pass instead of decoding the word "pass"
                # as if it were a coordinate.
                AI.play(color, PASS)
            else:
                first_pass = False
                AI.play(color, sgfVertex(RecordVertex))
            AI.showboard()
            lastNode = node
    finally:
        # Never leave the engine process running behind us.
        AI.close()

    print(moves)
    with open("%s-review.sgf" % filename, "w") as f:
        game.output(f)

    with open('output.sgf') as f:
        sgfText = f.read() + "\n"
    # Open a new parenthesis in front of every ';' so that each suggestion
    # appended below can close its own variation plus one of these.
    sgfText = sgfText.strip().replace(';', '\n(;')
    variations = [_sgfVariation(color, vertex)
                  for color, vertex in reversed(moves)]
    sgfText = sgfText + "".join(variations)
    print(sgfText)
    with open("output.sgf", "w") as f:
        f.write(sgfText[1:])  # cut off extra parenthesis at the start
#stderr = "" | |
#scan_process = Popen(AI.gtp_subprocess.subprocess.stderr.readline, stdout=PIPE, stderr=STDOUT) | |
#while(1): | |
# errLine = run_with_timeout(3, None, scan_process.stdout.readline) | |
# if errLine is None: | |
# break | |
# else: | |
# print (errLine) | |
# stderr = stderr + errLine | |
#with open('stderrOut.txt','w') as f: | |
# f.write(stderr) | |
""" | |
while True: | |
vertex = black.genmove(BLACK) | |
if vertex == PASS: | |
if first_pass: | |
break | |
else: | |
first_pass = True | |
else: | |
first_pass = False | |
black.showboard() | |
white.play(BLACK, vertex) | |
white.showboard() | |
vertex = white.genmove(WHITE) | |
if vertex == PASS: | |
if first_pass: | |
break | |
else: | |
first_pass = True | |
else: | |
first_pass = False | |
white.showboard() | |
black.play(WHITE, vertex) | |
black.showboard() | |
black.final_score() | |
white.final_score() | |
black.close() | |
white.close() | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment