Created
October 18, 2017 03:13
-
-
Save mavieth/19b5ac345e25376b0a719f279bee04e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
from collections import Counter, OrderedDict, defaultdict | |
from itertools import chain, combinations | |
from copy import deepcopy | |
from random import getrandbits, uniform | |
from pulp import LpProblem, LpMaximize, LpVariable, LpInteger, lpSum | |
from .exceptions import LineupOptimizerException, LineupOptimizerIncorrectTeamName, LineupOptimizerIncorrectPositionName | |
from .settings import BaseSettings | |
from .player import Player | |
from .lineup import Lineup | |
from .utils import ratio, list_intersection | |
class PositionPlaces: | |
def __init__(self, min, optional): | |
self.min = min | |
self._init_optional = optional | |
self.optional = optional | |
@property | |
def max(self): | |
return self.min + self.optional | |
def add(self): | |
if self.min: | |
self.min -= 1 | |
else: | |
self.optional -= 1 if self.optional else 0 | |
def remove(self): | |
if self.optional < self._init_optional: | |
self.optional += 1 | |
else: | |
self.min += 1 | |
class LineupOptimizer(object): | |
def __init__(self, settings): | |
""" | |
LineupOptimizer select the best lineup for daily fantasy sports. | |
:type settings: BaseSettings | |
""" | |
self._players = [] | |
self._lineup = [] | |
self._available_positions = [] | |
self._available_teams = [] | |
self._positions = {} | |
self._not_linked_positions = {} | |
self._max_from_one_team = None | |
self._settings = settings | |
self._set_settings() | |
self._removed_players = [] | |
self._search_threshold = 0.8 | |
self._min_deviation = 0.06 | |
self._max_deviation = 0.12 | |
@property | |
def lineup(self): | |
""" | |
:rtype: list[Player] | |
""" | |
return self._lineup | |
@property | |
def budget(self): | |
""" | |
:rtype: int | |
""" | |
return self._budget | |
@property | |
def players(self): | |
""" | |
:rtype: list[Player] | |
""" | |
return [player for player in self._players | |
if player not in self._removed_players and player not in self._lineup] | |
@property | |
def removed_players(self): | |
""" | |
:rtype: list[Player] | |
""" | |
return self._removed_players | |
def _set_settings(self): | |
""" | |
Set settings with daily fantasy sport site and kind of sport to optimizer. | |
""" | |
self._budget = self._settings.budget | |
self._total_players = self._settings.get_total_players() | |
self._max_from_one_team = self._settings.max_from_one_team | |
self._get_positions_for_optimizer(self._settings.positions) | |
self._available_positions = self._positions.keys() | |
def _get_positions_for_optimizer(self, positions_list): | |
""" | |
Convert positions list into dict for using in optimizer. | |
:type positions_list: List[LineupPosition] | |
""" | |
positions = {} | |
not_linked_positions = {} | |
positions_counter = Counter([tuple(sorted(p.positions)) for p in positions_list]) | |
for key in positions_counter.keys(): | |
additional_pos = len(list(filter( | |
lambda p: len(p.positions) > len(key) and list_intersection(key, p.positions), positions_list | |
))) | |
min_value = positions_counter[key] + len(list(filter( | |
lambda p: len(p.positions) < len(key) and list_intersection(key, p.positions), positions_list | |
))) | |
positions[key] = PositionPlaces(min_value, additional_pos) | |
for first_position, second_position in combinations(positions.items(), 2): | |
if list_intersection(first_position[0], second_position[0]): | |
continue | |
new_key = tuple(sorted(chain(first_position[0], second_position[0]))) | |
if new_key in positions: | |
continue | |
not_linked_positions[new_key] = PositionPlaces( | |
first_position[1].min + second_position[1].min, | |
first_position[1].optional + second_position[1].optional | |
) | |
positions = OrderedDict(sorted(positions.items(), key=lambda item: len(item[0]))) | |
self._not_linked_positions = not_linked_positions | |
self._positions = positions | |
self._init_positions = positions | |
def set_deviation(self, min_deviation, max_deviation): | |
""" | |
Set deviation ranges for randomness mode | |
:type min_deviation: float | |
:type max_deviation: float | |
""" | |
self._min_deviation = min_deviation | |
self._max_deviation = max_deviation | |
def reset_lineup(self): | |
""" | |
Reset current lineup. | |
""" | |
self._set_settings() | |
self._lineup = [] | |
def load_players_from_CSV(self, filename): | |
""" | |
Load player list from CSV file with passed filename. | |
Calls load_players_from_CSV method from _settings object. | |
:type filename: str | |
""" | |
self._players = self._settings.load_players_from_CSV(filename) | |
self._set_available_teams() | |
def load_players(self, players): | |
""" | |
Manually loads player to optimizer | |
:type players: List[Player] | |
""" | |
self._players = players | |
self._set_available_teams() | |
def _set_available_teams(self): | |
""" | |
Evaluate all available teams. | |
""" | |
self._available_teams = set([p.team for p in self._players]) | |
def remove_player(self, player): | |
""" | |
Remove player from list for selecting players for lineup. | |
:type player: Player | |
""" | |
self._removed_players.append(player) | |
def restore_player(self, player): | |
""" | |
Restore removed player. | |
:type player: Player | |
""" | |
try: | |
self._removed_players.remove(player) | |
except ValueError: | |
pass | |
def _add_to_lineup(self, player): | |
""" | |
Adding player to lineup without checks | |
:type player: Player | |
""" | |
self._lineup.append(player) | |
self._total_players -= 1 | |
self._budget -= player.salary | |
def find_players(self, name): | |
""" | |
Return list of players with similar name. | |
:param name: str | |
:return: List[Player] | |
""" | |
players = self.players | |
possibilities = [(player, ratio(name, player.full_name)) for player in players] | |
possibilities = filter(lambda pos: pos[1] >= self._search_threshold, possibilities) | |
players = sorted(possibilities, key=lambda pos: -pos[1]) | |
return list(map(lambda p: p[0], players)) | |
def get_player_by_name(self, name): | |
""" | |
Return closest player with similar name or None. | |
:param name: str | |
:return: Player | |
""" | |
players = self.find_players(name) | |
return players[0] if players else None | |
def _recalculate_positions(self, players): | |
""" | |
Realculates available positions for optimizer with locked specified players. | |
Return dict with positions for optimizer and number of placed players. | |
:type players: List[Player] | |
:return: Dict, int | |
""" | |
positions = deepcopy(self._init_positions) | |
players.sort(key=lambda p: len(p.positions)) | |
total_added = 0 | |
for player in players: | |
is_added = False | |
changed_positions = [] | |
for position, places in positions.items(): | |
if not list_intersection(player.positions, position): | |
continue | |
if not places.max and list(player.positions) == list(position): | |
is_added = False | |
break | |
is_added = True | |
changed_positions.append(position) | |
if is_added: | |
total_added += 1 | |
[positions[position].add() for position in changed_positions] | |
return positions, total_added | |
def add_player_to_lineup(self, player): | |
""" | |
Force adding specified player to lineup. | |
Return true if player successfully added to lineup. | |
:type player: Player | |
""" | |
if player in self._lineup: | |
raise LineupOptimizerException("This player already in your line up!") | |
if not isinstance(player, Player): | |
raise LineupOptimizerException("This function accept only Player objects!") | |
if self._budget - player.salary < 0: | |
raise LineupOptimizerException("Can't add this player to line up! Your team is over budget!") | |
if self._total_players - 1 < 0: | |
raise LineupOptimizerException("Can't add this player to line up! You already select all {} players!". | |
format(len(self._lineup))) | |
if self._max_from_one_team: | |
from_same_team = len(list(filter(lambda p: p.team == player.team, self.lineup))) | |
if from_same_team + 1 > self._max_from_one_team: | |
raise LineupOptimizerException("You can't set more than {} players from one team.". | |
format(self._max_from_one_team)) | |
players = self.lineup[:] | |
players.append(player) | |
positions, total_added = self._recalculate_positions(players) | |
if total_added == len(players): | |
self._add_to_lineup(player) | |
self._positions = positions | |
for position, places in self._not_linked_positions.items(): | |
if list_intersection(position, player.positions): | |
self._not_linked_positions[position].add() | |
else: | |
raise LineupOptimizerException("You're already select all {}'s".format("/".join(player.positions))) | |
def remove_player_from_lineup(self, player): | |
""" | |
Remove specified player from lineup. | |
:type player: Player | |
""" | |
if not isinstance(player, Player): | |
raise LineupOptimizerException("This function accept only Player objects!") | |
try: | |
self._lineup.remove(player) | |
self._budget += player.salary | |
self._total_players += 1 | |
for position, places in self._positions.items(): | |
if list_intersection(position, player.positions): | |
self._positions[position].remove() | |
for position, places in self._not_linked_positions.items(): | |
if list_intersection(position, player.positions): | |
self._not_linked_positions[position].remove() | |
except ValueError: | |
raise LineupOptimizerException("Player not in line up!") | |
def _validate_optimizer_params(self, teams=None, positions=None): | |
""" | |
Validate passed to optimizer parameters. | |
:type teams: dict[str, int] | |
:type positions: dict[str, int] | |
:return: processed teams and positions | |
""" | |
# check teams parameter | |
if teams: | |
if not isinstance(teams, dict) or not all([isinstance(team, str) for team in teams.keys()]) or \ | |
not all([isinstance(num_of_players, int) for num_of_players in teams.values()]): | |
raise LineupOptimizerException("Teams parameter must be dict where key is team name and value is number" | |
" of players from specified team.") | |
teams = {team.upper(): num_of_players for team, num_of_players in teams.items()} | |
for team, num_of_players in teams.items(): | |
if team not in self._available_teams: | |
raise LineupOptimizerIncorrectTeamName("{} is incorrect team name.".format(team)) | |
if self._max_from_one_team and num_of_players > self._max_from_one_team: | |
raise LineupOptimizerException("You can't set more than {} players from one team.". | |
format(self._max_from_one_team)) | |
# check positions parameter | |
if positions: | |
if not isinstance(positions, dict) or \ | |
not all([isinstance(position, str) for position in positions.keys()]) or \ | |
not all([isinstance(num_of_players, int) for num_of_players in positions.values()]): | |
raise LineupOptimizerException("Positions parameter must be dict where key is position name and value " | |
"is number of players from specified position.") | |
positions = {position.upper(): num_of_players for position, num_of_players in positions.items()} | |
for pos, val in positions.items(): | |
available_places = self._positions[(pos,)].optional | |
if val > self._positions[(pos,)].optional: | |
raise LineupOptimizerException("Max available places for position {} is {}. Got {} ". | |
format(pos, available_places, val)) | |
if (pos,) not in self._available_positions: | |
raise LineupOptimizerIncorrectPositionName("{} is incorrect position name.".format(pos)) | |
else: | |
positions = {} | |
return teams, positions | |
def optimize(self, n, teams=None, positions=None, max_exposure=None, randomness=None, with_injured=False): | |
""" | |
Select optimal lineup from players list. | |
This method uses Mixed Integer Linear Programming method for evaluating best starting lineup. | |
It"s return generator. If you don"t specify n it will return generator with all possible lineups started | |
from highest fppg to lowest fppg. | |
:type n: int | |
:type teams: dict[str, int] | |
:type positions: dict[str, int] | |
:type max_exposure: float | |
:type randomness: bool | |
:type with_injured: bool | |
:rtype: List[Lineup] | |
""" | |
teams, positions = self._validate_optimizer_params(teams, positions) | |
if len(self._lineup) == self._settings.get_total_players(): | |
lineup = Lineup(self._lineup) | |
yield lineup | |
return | |
locked_players = self._lineup[:] | |
players = [player for player in self._players | |
if player not in self._removed_players and player not in self._lineup | |
and isinstance(player, Player) and player.max_exposure != 0.0] | |
for player in self._lineup: | |
if player.max_exposure == 0: | |
self.remove_player_from_lineup(player) | |
current_max_points = 10000000 | |
lineup_points = sum(player.fppg for player in self._lineup) | |
used_players = defaultdict(int) | |
counter = 0 | |
while n > counter: | |
# filter players with exceeded max exposure | |
for player, used in used_players.items(): | |
exposure = player.max_exposure if player.max_exposure is not None else max_exposure | |
if exposure is not None and exposure <= used / n: | |
if player in players: | |
players.remove(player) | |
if player in self.lineup: | |
self.remove_player_from_lineup(player) | |
current_max_points += player.fppg | |
lineup_points -= player.fppg | |
prob = LpProblem("Daily Fantasy Sports", LpMaximize) | |
x = LpVariable.dicts( | |
"table", players, | |
lowBound=0, | |
upBound=1, | |
cat=LpInteger | |
) | |
if randomness: | |
for i, player in enumerate(players): | |
player.deviated_fppg = player.fppg * (1 + (-1 if bool(getrandbits(1)) else 1) * | |
uniform(self._min_deviation, self._max_deviation)) | |
prob += lpSum([player.deviated_fppg * x[player] for player in players]) | |
else: | |
prob += lpSum([player.fppg * x[player] for player in players]) | |
prob += lpSum([player.fppg * x[player] for player in players]) <= current_max_points | |
prob += lpSum([player.salary * x[player] for player in players]) <= self._budget | |
prob += lpSum([x[player] for player in players]) == self._total_players | |
if not with_injured: | |
prob += lpSum([x[player] for player in players if not player.is_injured]) == self._total_players | |
for position, places in self._positions.items(): | |
extra = 0 | |
if len(position) == 1: | |
extra = positions.get(position[0], 0) | |
prob += lpSum([x[player] for player in players if | |
any([player_position in position for player_position in player.positions]) | |
]) >= places.min + extra | |
for position, places in self._not_linked_positions.items(): | |
prob += lpSum([x[player] for player in players if | |
any([player_position in position for player_position in player.positions]) | |
]) >= places.min | |
if teams is not None: | |
for key, value in teams.items(): | |
prob += lpSum([x[player] for player in players if player.team == key]) == value | |
if self._max_from_one_team: | |
for team in self._available_teams: | |
prob += lpSum([x[player] for player in players if player.team == team]) <= self._max_from_one_team | |
prob.solve() | |
if prob.status == 1: | |
lineup_players = self._lineup[:] | |
for player in players: | |
if x[player].value() == 1.0: | |
lineup_players.append(player) | |
for player in lineup_players: | |
used_players[player] += 1 | |
lineup = Lineup(lineup_players) | |
current_max_points = lineup.fantasy_points_projection - lineup_points - 0.001 | |
yield lineup | |
counter += 1 | |
else: | |
raise LineupOptimizerException("Can't generate lineups") | |
self._lineup = locked_players | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment