Created
June 7, 2015 07:51
-
-
Save yinian1992/f08c52e73b88c2dc2fc4 to your computer and use it in GitHub Desktop.
Huawei Texas Hold'em Poker Player
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import errno | |
import socket | |
import sys | |
import time | |
PLAYER_NAMES = ( | |
'Alice', | |
'Bob', | |
'Charlie', | |
'Dave', | |
'Eve', | |
'Frank', | |
'Sam', | |
'Sally', | |
'Trent', | |
'Walter', | |
'Wendy' | |
) | |
class Player: | |
def __init__(self, pid, jetton, money): | |
self.pid = pid | |
self.jetton = jetton | |
self.money = money | |
def __repr__(self): | |
return '<Player {0}: J{1} M{2}>'.format( | |
self.pid, self.jetton, self.money) | |
class Card: | |
def __init__(self, color, point): | |
self.color = color | |
self.point = point | |
def __repr__(self): | |
return '<Card: {0}-{1}>'.format(self.color, self.point) | |
@property | |
def value(self): | |
if self.point.isdigit(): | |
return int(self.point) | |
else: | |
value_tab = {'J': 11, 'Q': 12, 'K': 13, 'A': 14} | |
return value_tab[self.point] | |
class CardSet: | |
def __init__(self, *args): | |
self.cards = list(*args) | |
def add(self, card): | |
self.cards.append(card) | |
@property | |
def values(self): | |
return [card.value for card in self.cards] | |
@property | |
def max(self): | |
return max(self.values) | |
@property | |
def min(self): | |
return min(self.values) | |
def any_binary(self, op, val): | |
any_bool = True | |
for card in self.cards: | |
any_bool &= op(card.value, val) | |
return any_bool | |
def have_binary(self, op, val): | |
have_bool = False | |
for card in self.cards: | |
have_bool |= op(card.value, val) | |
return have_bool | |
def count_dup(self, dup): | |
unique_vals = set(self.values) | |
result = 0 | |
for val in unique_vals: | |
if self.values.count(val) >= dup: | |
result += 1 | |
return result | |
def has_one_pair(self): | |
return self.count_dup(2) == 1 | |
def has_two_pairs(self): | |
return self.count_dup(2) == 2 | |
def has_three_of_a_kind(self): | |
return self.count_dup(3) == 1 | |
def has_four_of_a_kind(self): | |
return self.count_dup(4) == 1 | |
def has_straight(self, length=5): | |
sorted_unique_values = list(set(self.values)) | |
sorted_unique_values.sort() | |
diffs = [j - i for i, j in zip(sorted_unique_values[:-1], | |
sorted_unique_values[1:])] | |
diff_one = '' | |
for diff in diffs: | |
if diff == 1: | |
diff_one += '1' | |
else: | |
diff_one += '0' | |
return '1' * length in diff_one | |
def has_flush(self, length=5): | |
colors = [card.color for card in self.cards] | |
if len(self.cards) <= length: | |
return len(set(colors)) == 1 | |
else: | |
unique_colors = set(colors) | |
for color in unique_colors: | |
if colors.count(color) >= length: | |
return True | |
return False | |
def is_fullhouse(self): | |
return self.has_one_pair() and self.has_three_of_a_kind() | |
def is_straight_flush(self): | |
return self.has_straight() and self.has_flush() | |
def is_royal_flush(self): | |
return self.is_straight_flush() and self.min >= 10 | |
def is_high_card(self): | |
return not (self.has_straight() or self.has_flush() or | |
self.has_one_pair()) | |
class PlayerAction: | |
def __init__(self, player, bet, action): | |
self.player = player | |
self.bet = bet | |
self.action = action | |
def __repr__(self): | |
return '<PlayerAction: {0} {1} {2}>'.format(self.player.pid, self.bet, | |
self.action) | |
class PlayerActionSet: | |
def __init__(self, pid): | |
self.pid = pid | |
self.actions = [] | |
def add(self, action): | |
self.actions.append(action) | |
def count_actions_before(self, action_name): | |
result = 0 | |
for action in self.actions: | |
if action.player.pid == self.pid: | |
break | |
if action.action == action_name: | |
result += 1 | |
return result | |
def count_actions_after(self, action_name): | |
result = 0 | |
skip = True | |
for action in self.actions: | |
if skip: | |
if action.player.pid == self.pid: | |
skip = False | |
continue | |
if action.action == action_name: | |
result += 1 | |
return result | |
class Client: | |
SOCK_BUF_SIZE = 1024 | |
INIT = 0 | |
HOLD = 1 | |
FLOP = 2 | |
TURN = 3 | |
RIVER = 4 | |
def __init__(self, server, client, pid): | |
self.pid = pid | |
self.hand = 1 | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.cards = CardSet() | |
self.status = Client.INIT | |
self.socket.bind(client) | |
while True: | |
try: | |
self.socket.connect(server) | |
except socket.error as e: | |
if e.errno == errno.ECONNREFUSED: | |
time.sleep(0.1) | |
else: | |
raise e | |
else: | |
break | |
def send(self, msg): | |
self.socket.sendall(msg.encode('ascii')) | |
def recv(self): | |
return self.socket.recv(Client.SOCK_BUF_SIZE).decode('ascii') | |
def reg(self, notify=False): | |
self.send('reg: {0} {1} {2}\n'.format( | |
self.pid, PLAYER_NAMES[int(self.pid[0]) - 1], | |
'need_notify ' if notify else '')) | |
if notify: | |
msg = self.socket.recv(Client.SOCK_BUF_SIZE) | |
print(str(msg)) | |
def get_player_by_pid(self, pid): | |
for player in self.players: | |
if player.pid == pid: | |
return player | |
print('Player "{0}" doesn\'t exist'.format(pid)) | |
return None | |
def get_seats(self, segment): | |
def parse_seat_line(line): | |
if ':' in line: | |
line = line.split(':')[1].strip() | |
else: | |
line = line.strip() | |
info = line.split(' ') | |
return info[0], int(info[1]), int(info[2]) | |
self.players = [] | |
self.button = Player(*parse_seat_line(segment[0])) | |
self.players.append(self.button) | |
print('Button: {0}'.format(self.button)) | |
self.small_blind = Player(*parse_seat_line(segment[1])) | |
self.players.append(self.small_blind) | |
print('Small Blind: {0}'.format(self.small_blind)) | |
if len(segment) > 2: | |
self.big_blind = Player(*parse_seat_line(segment[2])) | |
print('Big Blind: {0}'.format(self.big_blind)) | |
self.players.append(self.big_blind) | |
self.other_players = [] | |
for line in segment[3::]: | |
player = Player(*parse_seat_line(line)) | |
self.players.append(player) | |
print('Other: {0}'.format(player)) | |
def get_blinds(self, segment): | |
def parse_blind_line(line): | |
pid, bet = line.split(':') | |
bet = int(bet.strip()) | |
return pid, bet | |
blinds = [] | |
for line in segment: | |
pid, bet = parse_blind_line(line) | |
blinds.append((pid, bet)) | |
if len(blinds) == 1: | |
self.small_blind_bet = blinds[0][1] | |
print('Small Blind Bet: {0}'.format(self.small_blind_bet)) | |
elif len(blinds) == 2: | |
blind_bool = blinds[0][1] < blinds[1][1] | |
small_blind_index = int(not blind_bool) | |
big_blind_index = int(blind_bool) | |
self.small_blind_bet = blinds[small_blind_index][1] | |
print('Small Blind Bet: {0}'.format(self.small_blind_bet)) | |
self.big_blind_bet = blinds[big_blind_index][1] | |
print('Big Blind Bet: {0}'.format(self.big_blind_bet)) | |
else: | |
print('Invalid blinds info') | |
def get_cards(self, card_type, segment): | |
cards_set_name = '{0}_cards'.format(card_type) | |
if len(segment) == 1: | |
cards_set_name = cards_set_name.rstrip('s') | |
setattr(self, cards_set_name, []) | |
for line in segment: | |
line = line.strip() | |
card = Card(*line.split(' ')) | |
self.cards.add(card) | |
getattr(self, cards_set_name).append(card) | |
print('{0} card: {1}'.format(card_type.title(), card)) | |
if len(segment) == 1: | |
setattr(self, cards_set_name, getattr(self, cards_set_name)[0]) | |
else: | |
setattr(self, cards_set_name, | |
CardSet(getattr(self, cards_set_name))) | |
def get_hold_cards(self, cards_info): | |
self.get_cards('hold', cards_info) | |
def get_flop_cards(self, cards_info): | |
self.get_cards('flop', cards_info) | |
def get_turn_card(self, cards_info): | |
self.get_cards('turn', cards_info) | |
def get_river_card(self, cards_info): | |
self.get_cards('river', cards_info) | |
def get_inquire(self, segment): | |
self.pot = int(segment.pop(-1).strip().split(':')[1].strip()) | |
print('Pot: {0}'.format(self.pot)) | |
self.actions = PlayerActionSet(self.pid) | |
def parse_player_action(line): | |
line = line.strip() | |
pid, jetton, money, bet, action = line.split(' ') | |
jetton = int(jetton) | |
money = int(money) | |
bet = int(bet) | |
player = self.get_player_by_pid(pid) | |
player.jetton = jetton | |
player.money = money | |
return player, bet, action | |
for line in segment: | |
p_action = PlayerAction(*parse_player_action(line)) | |
self.actions.add(p_action) | |
print('Action: {0}'.format(p_action)) | |
def check(self): | |
self.send('check') | |
def call(self): | |
self.send('call') | |
def raise_(self, bet): | |
self.send('raise {0}'.format(bet)) | |
def all_in(self): | |
self.send('all_in') | |
def fold(self): | |
self.send('fold') | |
def dispatch(self): | |
def msg_split(msg): | |
msg = msg.strip() | |
msg_lines = msg.split('\n') | |
segments = [] | |
tmp_segment = [] | |
new_segment = True | |
for line in msg_lines: | |
line = line.strip() | |
if new_segment: | |
if line.endswith('/'): | |
tmp_segment.append(line.rstrip('/')) | |
elif line.startswith('/'): | |
new_segment = False | |
segments.append(tmp_segment) | |
tmp_segment = [] | |
else: | |
tmp_segment.append(line) | |
return segments | |
while True: | |
msg = self.recv() | |
print('Received from server:') | |
print(msg + '\n\n') | |
if msg.startswith('game-over'): | |
self.socket.close() | |
print('Game Over') | |
break | |
segments = msg_split(msg) | |
for segment in segments: | |
segment_type = segment[0] | |
segment_body = segment[1::] | |
if segment_type == 'seat': | |
self.get_seats(segment_body) | |
elif segment_type == 'blind': | |
self.get_blinds(segment_body) | |
elif segment_type == 'hold': | |
self.status = Client.HOLD | |
self.get_hold_cards(segment_body) | |
elif segment_type == 'inquire': | |
self.get_inquire(segment_body) | |
actions = (self.on_hold, self.on_flop, | |
self.on_turn, self.on_river) | |
actions[self.status - 1]() | |
elif segment_type == 'flop': | |
self.status = Client.FLOP | |
self.get_flop_cards(segment_body) | |
elif segment_type == 'turn': | |
self.status = Client.TURN | |
self.get_turn_card(segment_body) | |
elif segment_type == 'river': | |
self.status = Client.RIVER | |
self.get_river_card(segment_body) | |
elif segment_type == 'showdown': | |
self.hand += 1 | |
elif segment_type == 'pot-win': | |
print('TODO: pot-win') | |
print('') | |
def on_hold(self): | |
if self.cards.max <= 8 and \ | |
self.actions.count_actions_before('raise') >= 1: | |
self.fold() | |
return | |
if self.cards.has_flush() or self.cards.has_straight() or \ | |
self.cards.has_one_pair(): | |
self.raise_(100) | |
return | |
self.check() | |
def on_flop(self): | |
if self.cards.has_flush() or self.cards.has_straight(): | |
self.raise_(100) | |
return | |
if self.cards.is_high_card() or self.flop_cards.has_one_pair() or \ | |
(self.cards.has_one_pair() and | |
self.actions.count_actions_before('raise') >= 1): | |
self.fold() | |
return | |
self.check() | |
def on_turn(self): | |
if (self.cards.has_straight() or self.cards.has_flush() or | |
self.cards.has_three_of_a_kind()) and \ | |
self.actions.count_actions_before('raise') <= 1: | |
self.raise_(100) | |
return | |
has_large_pair = False | |
for val in range(11, 15): | |
if self.cards.values.count(val) >= 2: | |
has_large_pair = True | |
if not has_large_pair and \ | |
self.actions.count_actions_before('raise') >= 1: | |
self.fold() | |
return | |
self.check() | |
def on_river(self): | |
if self.cards.has_straight() or self.cards.has_flush() or \ | |
self.cards.is_fullhouse() or self.cards.has_four_of_a_kind() \ | |
or self.cards.is_straight_flush() or \ | |
self.cards.is_royal_flush(): | |
if self.actions.count_actions_before('raise') <= 1: | |
self.raise_(100) | |
return | |
self.check() | |
return | |
has_large_two_pairs = False | |
if self.cards.has_two_pairs(): | |
pair_values = [] | |
value_set = set(self.cards.values) | |
for val in value_set: | |
if self.cards.values.count(val) == 2: | |
pair_values.append(val) | |
if min(pair_values) >= 8: | |
has_large_two_pairs = True | |
if (has_large_two_pairs or self.cards.has_three_of_a_kind()) and \ | |
not (self.cards.has_flush() or self.cards.has_straight()): | |
self.check() | |
return | |
else: | |
self.fold() | |
return | |
self.fold() | |
return | |
def main(argv): | |
if len(argv) < 6: | |
print('Invalid init args') | |
return False | |
server_ip = argv[1] | |
client_ip = argv[3] | |
try: | |
server_port = int(argv[2]) | |
client_port = int(argv[4]) | |
except ValueError: | |
print('Invalid connection port') | |
return False | |
pid = argv[5] | |
client = Client((server_ip, server_port), (client_ip, client_port), pid) | |
client.reg() | |
client.dispatch() | |
if __name__ == '__main__': | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment