# Author: Kyle Kastner
# License: BSD 3-Clause
# Inspired by a blog post by Justin Sermeno: https://justinsermeno.com/posts/cfr/
# Extended with algorithms described by Oskari Tammelin: http://jeskola.net/cfr/demo/
# In particular, solve.js contains the core game logic:
# http://jeskola.net/cfr/demo/solve.js
# basecfr: http://poker.cs.ualberta.ca/publications/NIPS07-cfr.pdf
# cfrplus: https://arxiv.org/abs/1407.5042
# cfrplus: http://poker.cs.ualberta.ca/publications/2015-ijcai-cfrplus.pdf
# cfrplusavg: https://www.tngtech.com/fileadmin/Public/Images/BigTechday/BTD10/Folien/Folien_MichaelSchubert.pdf
# cfrplusavg: https://arxiv.org/abs/1811.00164, section 2.2 (also called linear cfr)
# cfrplusavg2: variant with squared iterations, https://arxiv.org/abs/1809.04040
# lcfr: https://arxiv.org/abs/1809.04040
# dcfr: https://arxiv.org/abs/1809.04040
# cfrbr: http://www.johanson.ca/publications/poker/2012-aaai-cfr-br/2012-aaai-cfr-br.pdf
# xfp: http://www0.cs.ucl.ac.uk/staff/d.silver/web/Publications_files/fsp.pdf
# cfu: ?
# escfr: http://www.cs.cmu.edu/~waugh/publications/nips09b.pdf
# memory efficient cfr
# https://jeskola.net/cfr/dcfu/
# Additional references:
# Tutorial post
# https://int8.io/counterfactual-regret-minimization-for-poker-ai/
# Multiplayer games
# http://webdocs.cs.ualberta.ca/~duane/publications/pdf/2010aamas.pdf
# Overview and other applications
# http://webdocs.cs.ualberta.ca/~hayward/670gga/jem/burch.pdf
# Detailed tutorial
# http://cs.gettysburg.edu/~tneller/modelai/2013/cfr/cfr.pdf
# Course writeup with detailed notes and some code
# https://github.com/Limegrass/Counterfactual-Regret-Minimization/blob/notes/Learning_to_Play_Poker_using_CounterFactual_Regret_Minimization.pdf
# PhD theses
# http://poker.cs.ualberta.ca/publications/Burch_Neil_E_201712_PhD.pdf
# http://mlanctot.info/files/papers/PhD_Thesis_MarcLanctot.pdf
# http://richardggibson.appspot.com/static/work/thesis-phd/thesis-phd-paper.pdf
# MSc theses
# http://poker.cs.ualberta.ca/publications/Davis_Trevor_R_201506_MSc.pdf
# Explanation of CFR
# https://www.quora.com/What-is-an-intuitive-explanation-of-counterfactual-regret-minimization
# Demo discussion of Libratus
# https://www.cs.cmu.edu/~noamb/papers/17-IJCAI-Libratus.pdf
# Demo discussion of older bot
# https://www.cs.cmu.edu/~sandholm/BabyTartanian8.ijcai16demo.pdf
# Discussion of some work on poker bot forums
# http://www.poker-ai.org/phpbb/viewtopic.php?f=25&t=2852
# Recent integrations with deep learning
# https://arxiv.org/pdf/1809.03057.pdf
# https://arxiv.org/abs/1811.00164
# https://openreview.net/forum?id=Bkeuz20cYm
# Source code for MATLAB including iqph solver...
# http://www.cs.cmu.edu/~ggordon/poker/
# http://www.cs.cmu.edu/~ggordon/poker/source/
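# To switch solvers, change the single uncommented `iteration = ...` line near the
# bottom of the __main__ block; each *_iteration() helper there runs one full sweep
# of its algorithm for each player.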
import numpy as np
import copy

DECK_SIZE = 13
Q_MODE = 0

global ITERATION_COUNT
ITERATION_COUNT = 0

global RANDOM_STATE
RANDOM_STATE = np.random.RandomState(12)


def set_random_seed(seed):
    global RANDOM_STATE
    RANDOM_STATE = np.random.RandomState(seed)


def set_iteration_count():
    global ITERATION_COUNT
    ITERATION_COUNT = 0


def increment_iteration_count():
    global ITERATION_COUNT
    ITERATION_COUNT += 1


def get_iteration_count():
    return ITERATION_COUNT


def create_decision_node(player, children):
    return Node(children, player, name="Decision")


def create_showdown_node(player, payoff):
    return Node(None, player, name="Showdown", payoff=payoff)


def create_fold_node(player, payoff):
    return Node(None, player, name="Fold", payoff=payoff)


def bet_size(bet_count):
    return bet_count if bet_count != 0 else 0.
# 1 card poker, 2 actions leading to 3 types of nodes
def create_game(player, bet_count, cap, pot, parent=None):
    foldp = -1 if player == 0 else 1
    if bet_count == 0:
        # ^ 1 will swap between player 0 and player 1
        parent = Node(None, None, name="Root")
        return create_decision_node(player,
            [create_game(player ^ 1, bet_count + 1, cap, pot, parent),
             create_game(player ^ 1, bet_count, cap, pot, parent) if player == 0 else create_showdown_node(player, pot / float(2.))])
    elif bet_count < cap:
        return create_decision_node(player,
            [create_game(player ^ 1, bet_count + 1, cap, pot, parent),
             create_showdown_node(player, pot / float(2.) + bet_size(bet_count)),
             create_fold_node(player, foldp * (pot / float(2.) + bet_size(bet_count - 1)))])
    else:
        return create_decision_node(player,
            [create_showdown_node(player, pot / float(2.) + bet_size(bet_count)),
             create_fold_node(player, foldp * (pot / float(2.) + bet_size(bet_count - 1)))])
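
# Reading of the tree built above: at every Decision node, action 0 is bet/raise;
# with no outstanding bet the other action is check (leading to another decision or
# a Showdown), otherwise the remaining actions are call (-> Showdown) and fold.
# Payoffs are stored from player 0's perspective, with foldp flipping the sign for
# whichever player folded; Showdown stakes are half the pot plus the bets already made.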
def deal_hands(random_state):
    return random_state.choice(DECK_SIZE, 2, replace=False)


def copy_or_add(copy_flag, a, b):
    assert len(a) == len(b)
    if copy_flag == 1:
        for i in range(len(a)):
            a[i] = b[i]
    else:
        for i in range(len(a)):
            a[i] += b[i]


def normalize(s):
    p_sum = 0.
    for i in range(len(s)):
        p_sum += s[i]
    r = np.zeros(len(s))
    if p_sum > 0:
        r = s / float(p_sum)
    else:
        r += 1. / float(len(s))
    return r
class Node(object):
    def __init__(self, children, player, name=None, payoff=None):
        self.children = children
        self.player = player
        self.name = name
        self.payoff = payoff
        if name == "Decision":
            self.cfr = [np.zeros(len(self.children)) for i in range(DECK_SIZE)]
            self.strategy = [np.zeros(len(self.children)) for i in range(DECK_SIZE)]
            self.strategy_sum = [np.zeros(len(self.children)) for i in range(DECK_SIZE)]
            self.iterations = 0
            self.best_response_actions = [0 for i in range(DECK_SIZE)]

    def is_terminal(self):
        if self.children is None:
            return True
        else:
            return False

    def terminal_values(self, player, op):
        ev = np.zeros(len(op))
        sum_op = 0.
        if self.name == "Fold":
            p_value = self.payoff if player == 0 else -self.payoff
            sum_op = sum(op)
            ev = (sum_op - op) * float(p_value)
        else:
            # showdown?
            for i in range(len(op)):
                ev[i] = sum_op * float(self.payoff)
                sum_op += op[i]
            sum_op = 0
            for i in reversed(range(len(op))):
                ev[i] -= sum_op * float(self.payoff)
                sum_op += op[i]
        return ev

    def payoff_values(self, player, hands):
        if self.name == "Fold":
            if player == 0:
                return float(self.payoff)
            else:
                return -float(self.payoff)
        else:
            return float(self.payoff) if hands[player] > hands[player ^ 1] else -float(self.payoff)

    def get_current_strategy(self, hand):
        p_sum = 0.
        for a in range(len(self.children)):
            p_sum += max(self.cfr[hand][a], 0.)
        s = np.zeros(len(self.children))
        if p_sum > 0:
            s = self.cfr[hand] / float(p_sum)
            s = np.maximum(s, 0.)
        else:
            s += 1. / float(len(self.children))
        return s

    def get_normalized_strategy(self, hand):
        return normalize(self.strategy[hand])

    def get_average_strategy(self):
        totals = [sum(self.strategy[i]) for i in range(len(self.strategy))]
        strategy = []
        for i in range(len(self.strategy)):
            if totals[i] > 0:
                strategy.append(self.strategy[i] / float(totals[i]))
            else:
                strategy.append(np.zeros(len(self.children)) + 1. / len(self.children))
        return strategy

    def update_cfr(self, i, a, delta, pos_weight=1., neg_weight=1.):
        if Q_MODE == 0:
            #self.cfr[i][a] += delta
            if self.cfr[i][a] >= 0:
                self.cfr[i][a] += pos_weight * delta
            else:
                self.cfr[i][a] += neg_weight * delta
        elif Q_MODE == 1:
            print("Q_MODE 1")
            from IPython import embed; embed(); raise ValueError()
        else:
            print("Q_MODE other")
            from IPython import embed; embed(); raise ValueError()

    def __repr__(self):
        return "Node(name={})".format(self.name)
def best_response(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    if node.player == player:
        ev = best_response(player, node.children[0], op)
        for a in range(1, len(node.children)):
            ev = np.maximum(ev, best_response(player, node.children[a], op))
    else:
        new_op = 0. * op
        for a in range(len(node.children)):
            for i in range(len(op)):
                s = node.get_normalized_strategy(i)
                new_op[i] = s[a] * op[i]
            br = best_response(player, node.children[a], new_op)
            if a == 0:
                ev = br
            else:
                ev += br
    return ev


def best_response_value(player, game):
    op = np.zeros(DECK_SIZE) + 1. / DECK_SIZE
    ev = best_response(player, game, op)
    return sum(ev) / float(DECK_SIZE)


def get_exploitability(game):
    br0 = best_response_value(0, game)
    br1 = best_response_value(1, game)
    return (br0 + br1) / float(2) * 1000
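
# best_response_value computes the value of a best response against the strategy
# stored in node.strategy (normalized per hand), assuming uniformly dealt hands;
# get_exploitability averages the two best-response values and scales by 1000, so
# it tends toward 0 as the profile approaches a Nash equilibrium.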
def set_best_response_actions(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    if node.player == player:
        ev = set_best_response_actions(player, node.children[0], op)
        for i in range(len(op)):
            node.best_response_actions[i] *= 0
        for a in range(1, len(node.children)):
            u = set_best_response_actions(player, node.children[a], op)
            for i in range(len(op)):
                if u[i] > ev[i]:
                    ev[i] = u[i]
                    node.best_response_actions[i] = a
    else:
        s = [node.get_current_strategy(i) for i in range(len(op))]
        ev = 0. * op
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, set_best_response_actions(player, node.children[a], new_op))
    return ev
def basecfr(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(basecfr(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i])
    else:
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.strategy[i][a] += op[i] * s[i][a]
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, basecfr(player, node.children[a], new_op))
    return ev
def lcfr(player, node, op):
    alpha = 1.
    beta = 1.
    gamma = 1.
    iteration_count = get_iteration_count()
    i_a = iteration_count ** alpha
    i_b = iteration_count ** beta
    pos_weight = i_a / float(i_a + 1.)
    neg_weight = i_b / float(i_b + 1.)
    gamma_weight = (iteration_count / float(iteration_count + 1.)) ** gamma
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(lcfr(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i], pos_weight, neg_weight)
    else:
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.strategy[i][a] += gamma_weight * op[i] * s[i][a]
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, lcfr(player, node.children[a], new_op))
    return ev


def dcfr(player, node, op):
    alpha = 3. / 2.
    beta = 0.
    gamma = 2.
    iteration_count = get_iteration_count()
    i_a = iteration_count ** alpha
    i_b = iteration_count ** beta
    pos_weight = i_a / float(i_a + 1.)
    neg_weight = i_b / float(i_b + 1.)
    gamma_weight = (iteration_count / float(iteration_count + 1.)) ** gamma
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(dcfr(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i], pos_weight, neg_weight)
    else:
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.strategy[i][a] += gamma_weight * op[i] * s[i][a]
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, dcfr(player, node.children[a], new_op))
    return ev
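
# lcfr and dcfr above differ from basecfr only in how updates are weighted: each new
# regret delta is scaled by t^alpha / (t^alpha + 1) when the accumulated regret is
# currently non-negative and by t^beta / (t^beta + 1) otherwise, and each strategy
# contribution by (t / (t + 1))^gamma, using the discount parameters from
# https://arxiv.org/abs/1809.04040 (lcfr: alpha = beta = gamma = 1;
# dcfr: alpha = 3/2, beta = 0, gamma = 2, the paper's recommended setting).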
def cfrplus(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(cfrplus(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i])
                node.cfr[i][a] = max(node.cfr[i][a], 0.)
                node.strategy[i][a] = node.cfr[i][a]
    else:
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, cfrplus(player, node.children[a], new_op))
    return ev


def cfrplusavg(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(cfrplusavg(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i])
                node.cfr[i][a] = max(node.cfr[i][a], 0.)
    else:
        delay = 0
        iteration_count = get_iteration_count()
        weight = max(0, iteration_count - delay + 1)
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.strategy[i][a] += op[i] * s[i][a] * weight
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, cfrplusavg(player, node.children[a], new_op))
    return ev


def cfrplusavg2(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(cfrplusavg2(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i])
                node.cfr[i][a] = max(node.cfr[i][a], 0.)
    else:
        # need global tracker for this
        delay = 0
        iteration_count = get_iteration_count()
        # squared weight
        weight = pow(max(0, iteration_count - delay + 1), 2)
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.strategy[i][a] += op[i] * s[i][a] * weight
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = s[i][a] * op[i]
            copy_or_add(a == 0, ev, cfrplusavg2(player, node.children[a], new_op))
    return ev
def cfrbr(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [node.get_current_strategy(i) for i in range(len(op))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(cfrbr(player, node.children[a], op))
            for i in range(len(op)):
                ev[i] += s[i][a] * u[a][i]
        for i in range(len(op)):
            for a in range(len(node.children)):
                node.update_cfr(i, a, u[a][i] - ev[i])
            node.strategy[i] = node.get_current_strategy(i)
    else:
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = op[i] if node.best_response_actions[i] == a else 0.
            copy_or_add(a == 0, ev, cfrbr(player, node.children[a], new_op))
    return ev
def basecfu(player, node, op):
    if node.is_terminal():
        return node.terminal_values(player, op)
    s = [np.argmax(node.cfr[i]) for i in range(len(node.cfr))]
    ev = 0. * op
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(basecfu(player, node.children[a], op))
            for i in range(len(op)):
                node.update_cfr(i, a, u[a][i])
        for i in range(len(op)):
            ev[i] = u[s[i]][i]
    else:
        for i in range(len(op)):
            if op[i] != 0:
                node.strategy[i][s[i]] += 1
        for a in range(len(node.children)):
            new_op = 0. * op
            for i in range(len(op)):
                new_op[i] = op[i] if s[i] == a else 0.
            copy_or_add(a == 0, ev, basecfu(player, node.children[a], new_op))
    return ev
def xfp(player, node, p, brp):
    if node.is_terminal():
        return
    if node.player == player:
        iteration_count = get_iteration_count()
        s = [node.get_current_strategy(i) for i in range(len(p))]
        for a in range(len(node.children)):
            for i in range(len(p)):
                if brp[i] != 0.:
                    abrs = 1. if node.best_response_actions[i] == a else 0.
                    node.cfr[i][a] += (abrs - node.cfr[i][a]) / (1. + p[i] * (iteration_count + 1))
                    node.strategy[i][a] = node.cfr[i][a]
            new_p = 0. * p
            new_brp = 0. * brp
            for i in range(len(p)):
                abrs = 1. if node.best_response_actions[i] == a else 0.
                new_p[i] = s[i][a] * p[i]
                new_brp[i] = abrs * brp[i]
            xfp(player, node.children[a], new_p, new_brp)
    else:
        for a in range(len(node.children)):
            xfp(player, node.children[a], p, brp)
def escfr(player, node, hands):
    if node.is_terminal():
        return node.payoff_values(player, hands)
    s = node.get_current_strategy(hands[node.player])
    if node.player == player:
        ev = 0.
        u = []
        for a in range(len(node.children)):
            u.append(escfr(player, node.children[a], hands))
            ev += s[a] * u[a]
        for a in range(len(node.children)):
            node.update_cfr(hands[player], a, u[a] - ev)
        return ev
    else:
        for a in range(len(node.children)):
            node.strategy[hands[node.player]][a] += s[a]
        which = RANDOM_STATE.choice(len(s), p=s)
        return escfr(player, node.children[which], hands)


def espurecfr(player, node, hands):
    if node.is_terminal():
        return node.payoff_values(player, hands)
    s = node.get_current_strategy(hands[node.player])
    s_a = RANDOM_STATE.choice(len(s), p=s)
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(espurecfr(player, node.children[a], hands))
        for a in range(len(node.children)):
            node.update_cfr(hands[player], a, u[a] - u[s_a])
        return u[s_a]
    else:
        node.strategy[hands[node.player]][s_a] += 1
        return espurecfr(player, node.children[s_a], hands)


def espurecfrmax(player, node, hands):
    if node.is_terminal():
        return node.payoff_values(player, hands)
    if node.player == player:
        u = []
        for a in range(len(node.children)):
            u.append(espurecfrmax(player, node.children[a], hands))
        s = node.get_current_strategy(hands[node.player])
        s_a = RANDOM_STATE.choice(len(s), p=s)
        for a in range(len(node.children)):
            node.update_cfr(hands[player], a, u[a] - u[s_a])
        return u[s_a]
    else:
        s = node.get_current_strategy(hands[node.player])
        m_a = np.argmax(s)
        node.strategy[hands[node.player]][m_a] += 1
        return espurecfrmax(player, node.children[m_a], hands)
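
# A small sketch (not part of the original gist) showing one way to play out a
# single deal under the learned strategy, using only the Node API defined above.
# The name `sample_playout` and its signature are illustrative choices, not an
# existing interface.
def sample_playout(node, hands, random_state):
    # Walk from the root, sampling every decision from that player's average
    # strategy for their hand, and return the terminal payoff from player 0's
    # point of view.
    while not node.is_terminal():
        s = node.get_average_strategy()[hands[node.player]]
        a = random_state.choice(len(s), p=s)
        node = node.children[a]
    return node.payoff_values(0, hands)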
if __name__ == "__main__":
    random_state = np.random.RandomState(12)
    game = create_game(0, 0, 3, 2)
    iterations = 1000
    # need 10 to 100x more for sampling based approaches, at least

    def basecfr_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        basecfr(0, game, op)
        basecfr(1, game, op)

    def cfrplus_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        cfrplus(0, game, op)
        cfrplus(1, game, op)

    def dcfr_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        dcfr(0, game, op)
        dcfr(1, game, op)

    def lcfr_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        lcfr(0, game, op)
        lcfr(1, game, op)

    def cfrplusavg_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        cfrplusavg(0, game, op)
        cfrplusavg(1, game, op)

    def cfrplusavg2_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        cfrplusavg2(0, game, op)
        cfrplusavg2(1, game, op)

    def cfrbr_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        set_best_response_actions(0, game, op)
        set_best_response_actions(1, game, op)
        cfrbr(0, game, op)
        cfrbr(1, game, op)

    def xfp_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        set_best_response_actions(0, game, op)
        xfp(0, game, op, op)
        set_best_response_actions(1, game, op)
        xfp(1, game, op, op)

    def cfu_iteration():
        op = np.zeros(DECK_SIZE) + 1.
        basecfu(0, game, op)
        basecfu(1, game, op)

    def escfr_iteration():
        hands = deal_hands(random_state)
        escfr(0, game, hands)
        escfr(1, game, hands)

    def espurecfr_iteration():
        hands = deal_hands(random_state)
        espurecfr(0, game, hands)
        espurecfr(1, game, hands)

    def espurecfrmax_iteration():
        hands = deal_hands(random_state)
        espurecfrmax(0, game, hands)
        espurecfrmax(1, game, hands)

    #iteration = basecfr_iteration
    #iteration = cfrplus_iteration
    #iteration = dcfr_iteration
    #iteration = lcfr_iteration
    #iteration = cfrplusavg_iteration
    iteration = cfrplusavg2_iteration
    #iteration = cfrbr_iteration
    #iteration = xfp_iteration
    #iteration = cfu_iteration
    #iteration = escfr_iteration
    #iteration = espurecfr_iteration
    #iteration = espurecfrmax_iteration

    print("Exploitability @-1: {}".format(get_exploitability(game)))
    for i in range(iterations):
        iteration()
        increment_iteration_count()
        print("Exploitability @{}: {}".format(i, get_exploitability(game)))
    # TOMORROW - WRITE GAME SIMULATOR TO SEE ADVANTAGE OVER RANDOM PLAYER? OR PLAY AGAINST IT
    from IPython import embed; embed(); raise ValueError()
    op = np.zeros(DECK_SIZE) + 1.