Created
September 19, 2019 05:14
-
-
Save marty1885/442997b55897206d32c36d0082226006 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import torch | |
from torch import nn, optim | |
import torch.functional as F | |
import matplotlib.pyplot as plt | |
import numpy as np | |
# Use ROOT as a binding between C++ and Python | |
import ROOT | |
gROOT = ROOT.gROOT | |
# Import Etaler into Python | |
gROOT.ProcessLine('#pragma cling load("/usr/local/lib/libEtaler.so")') | |
headers = ['Etaler/Etaler.hpp', 'Etaler/Algorithms/TemporalMemory.hpp' | |
, 'Etaler/Encoders/GridCell1d.hpp', 'Etaler/Algorithms/SDRClassifer.hpp'] | |
for header in headers: | |
gROOT.ProcessLine('#include <{}>'.format(header)) | |
# Import Etaler's namespce | |
et = ROOT.et | |
# Setup PyTorch and Etaler to use GPU | |
use_gpu = False # May be slower unless you have a 2080Ti | |
if torch.cuda.is_available() and use_gpu: | |
device = torch.device('cuda') | |
else: | |
device = torch.device('cpu') | |
backend = et.defaultBackend() | |
if use_gpu: | |
try: | |
gROOT.ProcessLine("#include <Etaler/Backends/OpenCLBackend.hpp>") | |
backend = ROOT.std.shared_ptr(et.OpenCLBackend) | |
et.setDefaultBackend(backend) | |
except: | |
pass | |
# Game parameters | |
num_games = 20000 | |
# hard coded defines | |
ROCK = 0 | |
PAPER = 1 | |
SCISSOR = 2 | |
## helper functions | |
def to_winning_move(x): | |
if x == ROCK: | |
return PAPER | |
elif x == PAPER: | |
return SCISSOR | |
return ROCK | |
def solve_winner(m1, m2): | |
if m1 == m2: | |
return 0 # draw | |
elif (m1 == ROCK and m2 == SCISSOR) \ | |
or (m1 == SCISSOR and m2 == PAPER) \ | |
or (m1 == PAPER and m2 == ROCK): | |
return 1 | |
return -1 | |
## Neural Netowrk Model | |
def onehot(v): | |
return torch.Tensor([1 if v == i else 0 for i in range(3)]).view(1, 1, -1).to(device) | |
class NNModel(nn.Module): | |
def __init__(self): | |
super(NNModel, self).__init__() | |
self.hidden_size = 16 | |
self.num_lstm_stack = 3 | |
self.rnn1 = nn.LSTM(3, self.hidden_size, self.num_lstm_stack).to(device) | |
self.hidden = [torch.randn(self.num_lstm_stack, 1 ,self.hidden_size).to(device) | |
, torch.randn(self.num_lstm_stack, 1 ,self.hidden_size).to(device)] | |
self.fc1 = nn.Linear(self.hidden_size, 3).to(device) | |
self.softmax = nn.Softmax(dim=2).to(device) | |
def forward(self, x): | |
y, h = self.rnn1(x, self.hidden) | |
y = self.fc1(y) | |
y = self.softmax(y) | |
self.hidden = list(h) | |
return y | |
def drop(): | |
self.hidden = [torch.randn(self.num_lstm_stack, 1 ,self.hidden_size).to(device) | |
, torch.randn(self.num_lstm_stack, 1 ,self.hidden_size).to(device)] | |
class NNAgent: | |
def __init__(self): | |
self.train_length = 1 | |
self.model = NNModel() | |
self.optimizer = optim.SGD(self.model.parameters(), lr=0.01) | |
self.loss = nn.BCELoss() | |
def compute(self, x): | |
v = onehot(x) | |
y = self.model.forward(v) | |
# train the network | |
self.optimizer.zero_grad() | |
l = self.loss(y, v) | |
l.backward() | |
self.optimizer.step() | |
for i in range(len(self.model.hidden)): | |
self.model.hidden[i] = self.model.hidden[i].detach() | |
return torch.argmax(y) | |
## HTM Agent | |
class HTMAgent: | |
def __init__(self): | |
dummy_sdr = et.encoder.gridCell1d(0); | |
input_shape = dummy_sdr.shape() | |
cells_per_column = 16 | |
self.tm1 = et.TemporalMemory(input_shape, cells_per_column) | |
self.sc1 = et.SDRClassifer(input_shape, 3) | |
# Initalize Classifer | |
for v in [0, 1, 2]: | |
x = et.encoder.gridCell1d(v) | |
self.sc1.addPattern(x, v) | |
self.last_pred = et.zeros(input_shape+cells_per_column, et.DType.Bool) | |
self.last_active = et.zeros(input_shape+cells_per_column, et.DType.Bool) | |
def compute(self, x): | |
x = et.encoder.gridCell1d(x) | |
active, predict = self.tm1.compute(x, self.last_pred) | |
self.tm1.learn(active, self.last_active) | |
self.last_active, self.last_pred = active, predict | |
pred_sdr = predict.sum(1, et.DType.Bool) | |
return self.sc1.compute(pred_sdr) | |
# Players | |
nn_agent = NNAgent() | |
htm_agent = HTMAgent() | |
# Game states | |
nn_last_move = 0 | |
htm_last_move = 0 | |
# Record variables | |
winners = [] | |
# Play all the games | |
for i in range(num_games): | |
nn_move = to_winning_move(nn_agent.compute(htm_last_move)) | |
htm_move = to_winning_move(htm_agent.compute(nn_last_move)) | |
winner = solve_winner(nn_move, htm_move) | |
nn_last_move = nn_move | |
htm_last_move = htm_move | |
winners += [winner] | |
winners = np.array(winners) | |
draws = (winners==0) | |
nn_wins = (winners==1) | |
htm_wins = (winners==-1) | |
def prog_rate(a): | |
p = 0 | |
res = np.zeros(len(a)) | |
for i, v in enumerate(a): | |
j = i+1 | |
p = p*(i/j)+v*(1/j) | |
res[i] = p | |
return res | |
kernel = np.ones(window_size)/window_size | |
draw_rate = np.convolve(draws, kernel, 'valid') | |
htm_win_rate = np.convolve(htm_wins, kernel, 'valid') | |
nn_rate = np.convolve(nn_wins, kernel, 'valid') | |
htm_win_rate = prog_rate(htm_wins) | |
print("{} draws".format(draws.sum())) | |
print("{} NN wins".format(nn_wins.sum())) | |
print("{} HTM wins".format(htm_wins.sum())) | |
plt.plot(htm_win_rate) | |
plt.title("HTM winning rate over time") | |
plt.show() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment