-
-
Save He-Pin/f6e985290030664ab8f5 to your computer and use it in GitHub Desktop.
Part of server with the old akka (akka-typed version: https://gist.github.com/anonymous/28accfa8e5f3fe187c4d)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors.game | |
import akka.actor.{Actor, ActorLogging, ActorRef, Props} | |
import akka.event.{LoggingAdapter, LoggingReceive} | |
import app.actors.game.GameActor.Out.Joined | |
import app.actors.game.GameActorGame.Result | |
import app.algorithms.Pathfinding.Path | |
import app.models.game._ | |
import app.models.game.events._ | |
import app.models.game.world.WObject.Id | |
import app.models.game.world._ | |
import app.models.game.world.buildings._ | |
import app.models.game.world.maps.WorldMaterializer | |
import app.models.game.world.props.ExtractionSpeed | |
import app.models.game.world.units._ | |
import implicits._ | |
import org.joda.time.DateTime | |
import utils.data.{Timeframe, NonEmptyVector} | |
import scala.annotation.tailrec | |
import scala.concurrent.duration._ | |
import scala.language.existentials | |
import scalaz._, Scalaz._ | |
object GameActor { | |
sealed trait In | |
object In { | |
case object CheckTurnTime extends In | |
case class Join(human: Human) extends In | |
case class Leave(human: Human) extends In | |
case class Warp( | |
human: Human, position: Vect2, warpable: WarpableCompanion.Some | |
) extends In | |
/* path does not include objects position and ends in target position */ | |
case class Move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2]) extends In | |
case class Attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id) extends In | |
case class MoveAttack(move: Move, target: Vect2 \/ WObject.Id) extends In | |
case class Special(human: Human, id: WObject.Id) extends In | |
case class ToggleWaitingForRoundEnd(human: Human) extends In | |
case class Concede(human: Human) extends In | |
} | |
sealed trait Out | |
sealed trait ClientOut extends Out | |
object Out { | |
case class Joined(human: Human, game: ActorRef) extends Out | |
case class Init( | |
id: World.Id, bounds: Bounds, objects: WorldObjs.All, | |
warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2], | |
selfTeam: Team, otherTeams: Iterable[Team], | |
self: HumanState, others: Iterable[(Player, Option[HumanState])], | |
warpableObjects: Iterable[WarpableStats], | |
objectives: RemainingObjectives, currentTurn: TurnStartedEvt, | |
extractionSpeeds: Set[ExtractionSpeed] | |
) extends ClientOut | |
case class Events(events: Vector[FinalEvent]) extends ClientOut | |
case class Error(error: String) extends ClientOut | |
} | |
private[this] def initMsg(human: Human, tbgame: GameActorGame) | |
(implicit log: LoggingAdapter): Either[String, Out.Init] = { | |
val game = tbgame.game | |
val visibleGame = game.visibleBy(human) | |
val states = visibleGame.states | |
val resourceMap = visibleGame.world.resourcesMap | |
def stateFor(p: Player): Either[String, HumanState] = for { | |
gameState <- states.get(p). | |
toRight(s"can't get game state for $p in $states").right | |
resources <- resourceMap.get(p). | |
toRight(s"can't get game state for $p in $resourceMap").right | |
} yield HumanState(resources, visibleGame.world.populationFor(p), gameState) | |
stateFor(human).right.map { selfState => | |
Out.Init( | |
game.world.id, visibleGame.world.bounds, | |
visibleGame.world.objects ++ | |
game.world.noLongerVisibleImmovableObjectsFor(human.team), | |
visibleGame.world.warpZoneMap.map.keys.map(_._1), | |
visibleGame.world.visibilityMap.map.keys.map(_._1), | |
human.team, game.world.teams - human.team, selfState, | |
(game.world.players - human).map { player => | |
player -> ( | |
if (player.isFriendOf(human)) stateFor(player).right.toOption | |
else None | |
) | |
}, | |
selfState.gameState.canWarp, | |
game.remainingObjectives(human.team), | |
TurnStartedEvt(tbgame.currentPlayer, tbgame.currentTurnTimeframe), | |
ExtractionSpeed.values | |
) | |
} | |
} | |
private def init( | |
human: Human, ref: ActorRef, tbgame: GameActorGame | |
)(implicit log: LoggingAdapter): Unit = | |
initMsg(human, tbgame).fold( | |
err => throw new IllegalStateException(s"cannot init game state: $err"), | |
msg => ref ! msg | |
) | |
private def events( | |
human: Human, ref: ActorRef, events: Events | |
)(implicit log: LoggingAdapter): Unit = { | |
log.debug("### Dispatching events for {} ###", human) | |
log.debug("Events ({}):", events.size) | |
val viewedEvents = events.flatMap { event => | |
log.debug("* {}", event) | |
val viewed = event.asViewedBy(human) | |
if (log.isDebugEnabled) viewed.foreach(log.debug("*** {}", _)) | |
viewed | |
} | |
ref ! Out.Events(viewedEvents) | |
} | |
def props( | |
worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings], | |
aiTeam: Team, starting: Set[GameActor.StartingHuman] | |
) = Props(new GameActor(worldMaterializer, turnTimerSettings, aiTeam, starting)) | |
case class StartingHuman(human: Human, resources: Resources, client: ActorRef) { | |
def game = Game.StartingPlayer(human, resources) | |
} | |
} | |
object GameActorGame { | |
type Result = Game.ResultT[Winner \/ GameActorGame] | |
} | |
trait GameActorGame { | |
import GameActorGame._ | |
def warp(human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime) | |
(implicit log: LoggingAdapter): Result | |
def move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime) | |
(implicit log: LoggingAdapter): Result | |
def special(human: Human, id: WObject.Id, now: DateTime)(implicit log: LoggingAdapter): Result | |
def attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime) | |
(implicit log: LoggingAdapter): Result | |
def moveAttack( | |
human: Human, id: Id, path: NonEmptyVector[Vect2], target: Vect2 \/ WObject.Id, | |
now: DateTime | |
)(implicit log: LoggingAdapter): Result | |
def toggleWaitingForRoundEnd(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result | |
def concede(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result | |
def game: Game | |
def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean | |
def currentPlayer: Player | |
def currentTurnTimeframe: Option[Timeframe] | |
def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe) | |
def checkTurnTimes(time: DateTime)(implicit log: LoggingAdapter) | |
: Evented[Winner \/ GameActorGame] | |
} | |
trait GameActorGameStarter[GAGame <: GameActorGame] { | |
type StartedGame = String \/ Evented[GAGame] | |
def apply( | |
world: World, starting: Set[Game.StartingPlayer], | |
objectives: Game.ObjectivesMap, | |
turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]] | |
)(implicit log: LoggingAdapter): StartedGame = { | |
val game = Game(world, starting, objectives) | |
game.flatMap(apply(_, turnTimerSettings)) | |
} | |
def apply(game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]) | |
(implicit log: LoggingAdapter): StartedGame = { | |
val turnTimers = turnTimerSettings.map(_.map(TurnTimers(game.world.humans, _))) | |
startNewGame(game, turnTimers) | |
} | |
protected[this] def startNewGame( | |
game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]] | |
)(implicit log: LoggingAdapter): String \/ Evented[GAGame] | |
} | |
class GameActor private ( | |
worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings], | |
aiTeam: Team, starting: Set[GameActor.StartingHuman] | |
) extends Actor with ActorLogging { | |
import app.actors.game.GameActor._ | |
import context.dispatcher | |
implicit val logging = log | |
log.debug( | |
"initializing game actor: starting={} turnTimer={}, aiTeam={}", | |
starting, turnTimerSettings, aiTeam | |
) | |
private[this] var clients = starting.map(data => data.human -> data.client).toMap | |
private[this] var game: GameActorGame = { | |
val humanTeams = starting.map(_.human.team) | |
val world = worldMaterializer.materialize(humanTeams).right_! | |
log.debug("World initialized to {}", world) | |
val objectives = Map( | |
aiTeam -> Objectives( | |
destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects) | |
) | |
) ++ humanTeams.map { _ -> Objectives( | |
// gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))), | |
// collectVps = Some(Objective.CollectVPs(VPS(10))), | |
destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects) | |
) }.toMap | |
log.debug("Objectives initialized to {}", objectives) | |
SemiRealtimeGame( | |
world, starting.map(_.game), objectives, | |
turnTimerSettings.map(WithCurrentTime(_, DateTime.now)) | |
).fold( | |
err => throw new IllegalStateException(s"Cannot initialize game: $err"), | |
evented => { | |
log.debug("Turn based game initialized to {}", evented) | |
starting.foreach { data => | |
data.client ! Joined(data.human, self) | |
// We need to init the game to starting state. | |
init(data.human, data.client, evented.value) | |
events(data.human, data.client, evented.events) | |
} | |
starting.foreach { data => | |
events(data.human, data.client, evented.events) | |
} | |
evented.value | |
} | |
) | |
} | |
val turnTimerChecker = | |
context.system.scheduler.schedule(1.second, 1.second, self, In.CheckTurnTime) | |
@throws[Exception](classOf[Exception]) | |
override def postStop() = { | |
super.postStop() | |
turnTimerChecker.cancel() | |
} | |
val notLoggedReceive: Receive = { | |
case In.CheckTurnTime => | |
postGameChange(checkedTurnTimes) | |
} | |
val loggedReceive = LoggingReceive { | |
case In.Join(human) => | |
val ref = sender() | |
ref ! Out.Joined(human, self) | |
def doInit(tbg: GameActorGame): Unit = { | |
init(human, ref, tbg) | |
clients += human -> ref | |
} | |
if (game.isJoined(human)) { | |
log.info("Rejoining {} to {}", human, self) | |
doInit(game) | |
} | |
else { | |
log.error("Unknown human trying to join the game: {}", human) | |
// TODO: allow new joins? | |
// update( | |
// ref, human, | |
// _.join(human, GameActor.StartingResources).right.map { evtTbg => | |
// doInit(evtTbg.value) | |
// evtTbg | |
// } | |
// ) | |
} | |
// case In.Leave(human) => | |
// if (clients.contains(human)) { | |
// update(sender(), human, _.leave(human).right.map { evtTbg => | |
// clients -= human | |
// evtTbg | |
// }) | |
// } | |
// else { | |
// sender ! Out.Error(s"No human $human is joined.") | |
// } | |
case In.Warp(human, position, warpable) => | |
update(sender(), human, _.warp(human, position, warpable, DateTime.now)) | |
case In.Move(human, id, path) => | |
update(sender(), human, _.move(human, id, path, DateTime.now)) | |
case In.Attack(human, id, target) => | |
update(sender(), human, _.attack(human, id, target, DateTime.now)) | |
case In.MoveAttack(move, target) => | |
update( | |
sender(), move.human, | |
_.moveAttack(move.human, move.id, move.path, target, DateTime.now) | |
) | |
case In.Special(human, id) => | |
update(sender(), human, _.special(human, id, DateTime.now)) | |
case In.ToggleWaitingForRoundEnd(human) => | |
update(sender(), human, _.toggleWaitingForRoundEnd(human, DateTime.now)) | |
case In.Concede(human) => | |
update(sender(), human, _.concede(human, DateTime.now)) | |
} | |
val receive: PartialFunction[Any, Unit] = notLoggedReceive orElse loggedReceive | |
private[this] def checkedTurnTimes = game.checkTurnTimes(DateTime.now) | |
private[this] def update( | |
requester: ActorRef, human: Human, f: GameActorGame => GameActorGame.Result | |
): Unit = { | |
log.debug("Updating game by a request from {}", requester) | |
val afterTimeCheck = checkedTurnTimes | |
afterTimeCheck.value.fold( | |
_ => postGameChange(afterTimeCheck), | |
tbg => f(tbg).map(evt => afterTimeCheck.events ++: evt).fold( | |
err => { | |
log.error(err) | |
requester ! Out.Error(err) | |
}, | |
postGameChange | |
) | |
) | |
} | |
private[this] def postGameChange(evented: Evented[Winner \/ GameActorGame]): Unit = { | |
dispatchEvents(evented.events) | |
evented.value.fold( | |
winner => { | |
log.info("Game is finished, won by {}", winner) | |
context.stop(self) | |
}, | |
g => game = g | |
) | |
} | |
private[this] def dispatchEvents(events: Events): Unit = { | |
if (events.nonEmpty) clients.foreach { case (human, ref) => | |
GameActor.events(human, ref, events) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors.game | |
import java.util.UUID | |
import akka.actor.SupervisorStrategy.Stop | |
import akka.actor._ | |
import akka.event.{LoggingAdapter, LoggingReceive} | |
import app.actors.MsgHandler.Client2Server.BackgroundSFO | |
import app.actors.NetClient.Management.In.JoinGame.{Mode, PvPMode} | |
import app.actors.game.GameActor.StartingHuman | |
import app.actors.{MsgHandler, NetClient, Server} | |
import app.models.User | |
import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer} | |
import app.models.game.world.{ExtractorStats, Resources, World} | |
import app.models.game.{Bot, Human, Team, TurnTimers} | |
import implicits._ | |
import infrastructure.GCM | |
import launch.RTConfig | |
import org.joda.time.DateTime | |
import spire.math.UInt | |
import scala.concurrent.duration._ | |
import scalaz.Scalaz._ | |
import scalaz._ | |
import scalaz.effect.IO | |
object GamesManagerActor { | |
val StartingResources = ExtractorStats.cost * Resources(4) | |
sealed trait In | |
object In { | |
// After user connects to the server, he should check whether he is in game or not. | |
case class CheckUserStatus(user: User) extends In | |
// Game joining | |
case class Join(user: User, mode: NetClient.Management.In.JoinGame.Mode) extends In | |
case class CancelJoinGame(user: User) extends In | |
// Stats report for control client | |
case object StatsReport extends In | |
} | |
sealed trait Out | |
object Out { | |
case class StatsReport(users: UInt, games: UInt) extends Out | |
} | |
sealed trait Internal | |
object Internal { | |
case object CleanupBackgroundWaitingList | |
/* Check if we can shutdown. */ | |
case object CheckShutdown | |
} | |
// TODO: proper singleplayer | |
// object PVEGame { | |
// sealed trait PresetTeam { | |
// def gameTeam: Team | |
// } | |
// object PresetTeam { | |
// object Red extends PresetTeam { val gameTeam = Team() } | |
// object Blue extends PresetTeam { val gameTeam = Team() } | |
// } | |
// | |
// val empty = PVEGame(None, Set.empty, Set.empty) | |
// } | |
// case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) { | |
// def giveTeam: PVEGame.PresetTeam = | |
// redTeamPlayers.size ?|? blueTeamPlayers.size match { | |
// case Ordering.LT => PVEGame.PresetTeam.Red | |
// case Ordering.GT => PVEGame.PresetTeam.Blue | |
// case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue | |
// } | |
// | |
// def add(user: User, team: PresetTeam): PVEGame = team match { | |
// case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user) | |
// case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user) | |
// } | |
// } | |
case class BackgroundToken(value: String) extends AnyVal | |
object BackgroundToken { | |
val newToken = IO { BackgroundToken(UUID.randomUUID().toString) } | |
} | |
case class WaitingListEntry(user: User, client: ActorRef, backgroundToken: BackgroundToken) | |
private def joinGame(game: ActorRef, human: Human, client: ActorRef): Unit = | |
game.tell(GameActor.In.Join(human), client) | |
} | |
class GamesManagerActor( | |
maps: GameMaps, gcm: Option[(ActorRef, RTConfig.GCM)] | |
)(implicit rtConfig: RTConfig) extends Actor with ActorLogging { | |
import app.actors.game.GamesManagerActor._ | |
import context.dispatcher | |
private[this] var waitingList = Vector.empty[WaitingListEntry] | |
// token -> last heartbeat | |
private[this] var waitingInBackground = Map.empty[BackgroundToken, DateTime] | |
private[this] var user2game = Map.empty[User, (ActorRef, Human)] | |
private[this] var game2humans = Map.empty[ActorRef, Set[Human]] | |
context.system.scheduler.schedule( | |
0.seconds, 1.second, self, GamesManagerActor.Internal.CleanupBackgroundWaitingList | |
) | |
override def supervisorStrategy = OneForOneStrategy() { | |
case _ => Stop | |
} | |
private[this] val notLoggedReceive: Receive = { | |
case GamesManagerActor.Internal.CleanupBackgroundWaitingList => | |
val now = DateTime.now() | |
val expiredKeys = waitingInBackground.keys.filter { token => | |
val lastBeat = waitingInBackground(token) | |
val timePassed = now - lastBeat | |
val active = timePassed <= rtConfig.gamesManager.backgroundHeartbeatTTL.duration | |
if (! active) log.debug( | |
"Timing out background token {}: {} > {}", | |
token, timePassed, rtConfig.gamesManager.backgroundHeartbeatTTL.duration | |
) | |
!active | |
} | |
expiredKeys.foreach(waitingInBackground -= _) | |
if (expiredKeys.nonEmpty) notifyGCM() | |
case GamesManagerActor.Internal.CheckShutdown => | |
val games = game2humans.size | |
log.debug("Checking for shutdown state, games: {}", games) | |
if (games === 0) { | |
log.info("No games alive, shutting down.") | |
context.system.shutdown() | |
} | |
} | |
override def receive = notLoggedReceive orElse LoggingReceive { | |
case GamesManagerActor.In.CheckUserStatus(user) => | |
user2game.get(user).foreach { case (game, human) => | |
log.info("{} joining game {} on user status check", human, game) | |
joinGame(game, human, sender()) | |
} | |
case GamesManagerActor.In.Join(user, mode) => | |
user2game.get(user).fold2( | |
{ | |
if (waitingList.exists(_.user === user)) log.warning( | |
"Not joining a new game, because {} is already in a waiting list, ref: {}", | |
user, sender() | |
) | |
else noExistingGame(user, mode, sender()) | |
}, | |
{ case (game, human) => | |
log.info("{} joining game {} on game join", human, game) | |
joinGame(game, human, sender()) | |
} | |
) | |
case GamesManagerActor.In.CancelJoinGame(user) => | |
waitingList.indexWhere(_.user === user) match { | |
case -1 => | |
log.warning("Not cancelling join game, because {} is not in a waiting list.", user) | |
case idx => | |
val entry = waitingList(idx) | |
context.unwatch(entry.client) | |
waitingList = waitingList.removeAt(idx) | |
notifyGCM() | |
sender() ! NetClient.Management.Out.JoinGameCancelled | |
} | |
case NetClient.Management.In.CancelBackgroundToken(token) => | |
removeBackgroundToken(token) | |
case MsgHandler.Client2Server.BackgroundSFO(kind, token) => | |
if (waitingInBackground contains token) { | |
kind match { | |
case BackgroundSFO.Kind.Heartbeat => | |
waitingInBackground += token -> DateTime.now() | |
log.debug("Background heartbeat from {}", token) | |
case BackgroundSFO.Kind.Cancel => | |
removeBackgroundToken(token) | |
} | |
} | |
else { | |
// TODO: should we tell sender that his heartbeat was expired? | |
log.info("Ignoring background {} from unknown token: {}", kind, token) | |
} | |
case Terminated(ref) => | |
// Game termination | |
game2humans.get(ref).foreach { humans => | |
log.info("Game {} terminated for humans {}", ref, humans) | |
game2humans -= ref | |
humans.foreach { human => user2game -= human.user } | |
} | |
// NetClient termination | |
waitingList.zipWithIndex.collectFirst { | |
case (entry @ WaitingListEntry(_, `ref`, _), idx) => | |
(entry, idx) | |
}.foreach { case (entry, idx) => | |
log.info("{} going into background", entry) | |
waitingList = waitingList.removeAt(idx) | |
waitingInBackground += entry.backgroundToken -> DateTime.now() | |
notifyGCM() | |
} | |
case Server.ShutdownInitiated => | |
log.info("Shutdown mode initiated.") | |
context.system.scheduler | |
.schedule(0.seconds, 1.second, self, GamesManagerActor.Internal.CheckShutdown) | |
case GamesManagerActor.In.StatsReport => | |
sender ! GamesManagerActor.Out.StatsReport(UInt(user2game.size), UInt(game2humans.size)) | |
} | |
private[this] def removeBackgroundToken(token: BackgroundToken): Unit = { | |
log.info("Removing background token: {}", token) | |
waitingInBackground -= token | |
notifyGCM() | |
} | |
private[this] def noExistingGame(user: User, mode: Mode, client: ActorRef): Unit = { | |
mode match { | |
case Mode.Singleplayer => | |
//launchRandomGenerated(user, client) | |
launchPVE(user, client) | |
case pvp: PvPMode => | |
val token = BackgroundToken.newToken.unsafePerformIO() | |
val entry = WaitingListEntry(user, client, token) | |
waitingList :+= entry | |
if (waitingList.size < pvp.playersNeeded) { | |
log.debug( | |
"Added {} from {} to {} waiting list: {}", | |
user, client, mode, waitingList | |
) | |
notifyGCM() | |
context.watch(client) | |
client ! NetClient.Management.Out.WaitingListJoined(token) | |
} | |
else fromWaitingList(pvp) | |
} | |
} | |
private[this] def notifyGCM(): Unit = { | |
gcm.foreach { case (ref, cfg) => | |
val foreground = GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size)) | |
val background = GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size)) | |
ref ! GCM.searchingForOpponent(foreground, background, cfg.searchForOpponentTTL) | |
} | |
} | |
private[this] def launchPVE(user: User, client: ActorRef) = { | |
// TODO: proper PVE | |
// val team = pveGame.giveTeam | |
// if (pveGame.ref.isEmpty) { | |
// val game = createGame( | |
// maps.pve.random, Some(TurnTimers.Settings()), Team(), | |
// Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client)) | |
// ) | |
// pveGame = pveGame.copy(ref = Some(game)) | |
// } | |
// pveGame = pveGame.add(user, team) | |
createGame( | |
maps.pve.random, None, Team(), | |
Set(StartingHuman(Human(user, Team()), StartingResources, client)) | |
) | |
} | |
private[this] def launchRandomGenerated(user: User, client: ActorRef) = { | |
val materializer = SingleplayerMap { data => implicit log => | |
val npcTeam = Team() | |
val npcBot = Bot(npcTeam) | |
val spawnerBot = Bot(npcTeam) | |
World.create( | |
data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false | |
) | |
} | |
createGame( | |
materializer, None, Team(), | |
Set(StartingHuman(Human(user, Team()), StartingResources, client)) | |
) | |
} | |
private[this] def fromWaitingList(mode: PvPMode): Unit = { | |
val (entries, newWaitingList) = waitingList.splitAt(mode.playersNeeded) | |
waitingList = newWaitingList | |
notifyGCM() | |
val teams = Vector.fill(mode.teams)(Team()) | |
val players = entries.zipWithIndex.map { case (entry, idx) => | |
val team = teams.wrapped(idx) | |
StartingHuman(Human(entry.user, team), StartingResources, entry.client) | |
}.toSet | |
log.debug( | |
"Fetched {} from waiting list for mode {}, rest={}", players, mode, newWaitingList | |
) | |
// TODO: will fail if we have more teams than any of the maps support | |
val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO() | |
val npcTeam = Team() | |
createGame(map, Some(TurnTimers.Settings()), npcTeam, players) | |
} | |
private[this] def createGame( | |
worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings], | |
npcTeam: Team, starting: Set[GameActor.StartingHuman] | |
): ActorRef = { | |
val game = context.actorOf(GameActor.props( | |
worldMaterializer, turnTimerSettings, npcTeam, starting | |
)) | |
context.watch(game) | |
starting.foreach { data => | |
user2game += data.human.user -> ((game, data.human)) | |
} | |
game2humans += game -> starting.map(_.human) | |
log.info("Game {} created for {}", game, starting) | |
game | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import akka.actor.{Actor, ActorLogging} | |
import argonaut._, Argonaut._ | |
import infrastructure.GCM | |
import launch.RTConfig | |
import spray.client.pipelining._ | |
import spray.http._ | |
import spray.httpx.marshalling.Marshaller | |
import scala.util.Try | |
class GCMSender(key: RTConfig.GCM.Key) extends Actor with ActorLogging { | |
import context.dispatcher | |
implicit private[this] val jsonMarshaller = | |
Marshaller.delegate[Json, String](ContentTypes.`application/json`)(_.nospaces) | |
private[this] val pipeline = | |
addHeader("Authorization", s"key=${key.value}") ~> sendReceive | |
override def receive: Receive = { | |
case m: GCM.Message => | |
log.info("Sending GCM message: {}", m) | |
val body = m.asJson | |
log.debug("GCM message as JSON: {}", body.nospaces) | |
val future = pipeline(Post("https://gcm-http.googleapis.com/gcm/send", body)) | |
// Logging isn't thread safe. | |
future.onComplete(r => self ! GCMSender.Internal.GCMComplete(m, r)) | |
case GCMSender.Internal.GCMComplete(message, result) => | |
log.info("GCM response for {}: {}", message, result) | |
} | |
} | |
object GCMSender { | |
object Internal { | |
case class GCMComplete(message: GCM.Message, result: Try[HttpResponse]) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.nio.ByteOrder | |
import akka.actor.{Actor, ActorLogging, ActorRef, Props} | |
import akka.event.{LoggingAdapter, LoggingReceive} | |
import akka.io.Tcp._ | |
import akka.util.ByteString | |
import app.actors.game.GamesManagerActor | |
import app.protobuf.parsing.Parsing | |
import app.protobuf.serializing.Serializing | |
import utils.network.IntFramedPipeline | |
import utils.network.IntFramedPipeline.Frame | |
import scalaz.Scalaz._ | |
import scalaz._ | |
/** | |
* Created by arturas on 2014-10-15. | |
*/ | |
object MsgHandler { | |
private case object Ack extends Event | |
// We need this because we can't pattern match in Receive on \/ | |
sealed trait Server2Client { | |
def toEither = this match { | |
case Server2Client.GameMsg(msg) => msg.left | |
case Server2Client.ControlMsg(msg) => msg.right | |
} | |
} | |
object Server2Client { | |
case class GameMsg(msg: NetClient.Msgs.FromServer) extends Server2Client | |
case class ControlMsg(msg: NetClient.Control.Out) extends Server2Client | |
} | |
sealed trait Client2Server { | |
def message: Serializable | |
} | |
object Client2Server { | |
case class GameMsg(message: NetClient.Msgs.FromClient) extends Client2Server | |
case class ControlMsg(message: NetClient.Msgs.FromControlClient) extends Client2Server | |
// Background searching for opponent heartbeat | |
case class BackgroundSFO( | |
kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken | |
) extends Client2Server { | |
override def message = this | |
} | |
object BackgroundSFO { | |
sealed trait Kind | |
object Kind { | |
case object Heartbeat extends Kind | |
case object Cancel extends Kind | |
} | |
} | |
} | |
} | |
class MsgHandler( | |
connection: ActorRef, netClientProps: ActorRef => Props, | |
maxToClientBufferSize: Int = 1024 * 1024 | |
)(implicit byteOrder: ByteOrder) | |
extends Actor with ActorLogging { | |
import MsgHandler._ | |
context.watch(connection) | |
private[this] val netClient = | |
context.actorOf(netClientProps(self), "net-client") | |
context.watch(netClient) | |
private[this] implicit val logger = log | |
private[this] val pipeline = new MsgHandlerPipeline | |
private[this] val lowWatermark = maxToClientBufferSize / 4 | |
private[this] val highWatermark = maxToClientBufferSize * 3 / 4 | |
private[this] var storage = Vector.empty[ByteString] | |
private[this] var stored = 0 | |
private[this] var closing = false | |
private[this] var suspended = false | |
private[this] val fromClient: Receive = { | |
case Received(data) => pipeline.unserialize(data).foreach { | |
case -\/(err) => log.error(err) | |
case \/-(clientOrControlMsg) => netClient ! clientOrControlMsg.message | |
} | |
} | |
private[this] val buffering = { | |
LoggingReceive(fromClient orElse { | |
case msg: MsgHandler.Server2Client => | |
buffer(pipeline.serialize(msg)) | |
case Ack => | |
acknowledge() | |
case msg: ConnectionClosed => | |
log.info(s"closing = true by {}.", msg) | |
closing = true | |
}) | |
} | |
override val receive = LoggingReceive(fromClient orElse { | |
case msg: Server2Client => | |
val data = pipeline.serialize(msg) | |
buffer(data) | |
connection ! Write(data, Ack) | |
context.become(buffering, discardOld = false) | |
case msg: Server.ShutdownInitiated.type => | |
netClient ! msg | |
case msg: ConnectionClosed => | |
log.info(s"Connection closed by {}.", msg) | |
context.stop(self) | |
}) | |
private def buffer(data: ByteString): Unit = { | |
storage :+= data | |
stored += data.size | |
if (stored > maxToClientBufferSize) { | |
log.warning(s"drop connection to [$connection] (buffer overrun)") | |
context stop self | |
} else if (stored > highWatermark) { | |
log.debug(s"suspending reading") | |
connection ! SuspendReading | |
suspended = true | |
} | |
} | |
private def acknowledge(): Unit = { | |
require(storage.nonEmpty, "storage was empty") | |
val size = storage.head.size | |
stored -= size | |
storage = storage.tail | |
if (suspended && stored < lowWatermark) { | |
log.debug("resuming reading") | |
connection ! ResumeReading | |
suspended = false | |
} | |
if (storage.isEmpty) { | |
if (closing) context stop self | |
else context.unbecome() | |
} | |
else connection ! Write(storage.head, Ack) | |
} | |
} | |
class MsgHandlerPipeline(implicit byteOrder: ByteOrder, log: LoggingAdapter) { | |
private[this] val intFramed = new IntFramedPipeline() | |
def unserialize(data: ByteString) = intFramed.fromFramedData(data).map { frame => | |
Parsing.parse(frame.data).leftMap(err => s"Cannot decode $frame into message: $err") | |
} | |
def serialize(data: MsgHandler.Server2Client) = | |
data |> Serializing.serialize |> Frame |> intFramed.withFrameSize | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.util.UUID | |
import akka.actor._ | |
import akka.pattern._ | |
import netmsg.ProtoChecksum | |
import spire.math.UInt | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import akka.event.LoggingReceive | |
import akka.io.Tcp.Unbind | |
import akka.util.Timeout | |
import app.actors.NetClient.Management.{SessionToken, PlainPassword, Credentials} | |
import app.actors.game.{GamesManagerActor, GameActor} | |
import app.models._ | |
import app.models.game.Human | |
import app.persistence.tables.Tables | |
import implicits._ | |
import scala.reflect.ClassTag | |
import scalaz._, Scalaz._ | |
import app.persistence.DBDriver._ | |
import org.joda.time.DateTime | |
import scala.util.Try | |
object NetClient { | |
type GameInMsg = Human => GameActor.In | |
object Control { | |
case class SecretKey(key: String) extends AnyVal | |
sealed trait In | |
object In { | |
case object Shutdown extends In | |
case object Status extends In | |
} | |
sealed trait Out | |
object Out { | |
case class GenericReply(success: Boolean, message: Option[String]) extends Out | |
object GenericReply { | |
val success = GenericReply(success = true, None) | |
def error(msg: String) = GenericReply(success = false, Some(msg)) | |
} | |
case class Status( | |
tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt] | |
) extends Out { | |
override def toString = { | |
import Status.asStr | |
s"Status[tcp clients: ${asStr(tcpClients)}, playing users: ${ | |
asStr(playingUsers)}, games: ${asStr(games)}]" | |
} | |
} | |
object Status { | |
def asStr(o: Option[UInt]) = o.fold2("-", _.toString()) | |
} | |
} | |
} | |
object Management { | |
sealed trait AuthToken | |
case class SessionToken(value: String) extends AuthToken | |
object SessionToken { | |
def random() = SessionToken(UUID.randomUUID().shortStr) | |
} | |
case class PlainPassword(value: String) extends AuthToken { | |
import com.github.t3hnar.bcrypt._ | |
def encrypted = value.bcrypt | |
def check(hash: String) = value.isBcrypted(hash) | |
} | |
case class Credentials(name: String, auth: AuthToken) { | |
def check(sessionToken: String, passwordHash: String): Boolean = | |
auth match { | |
case SessionToken(token) => sessionToken == token | |
case password: PlainPassword => password.check(passwordHash) | |
} | |
} | |
sealed trait In | |
object In { | |
case object AutoRegister extends In | |
case class CheckNameAvailability(name: String) extends In | |
case class Register( | |
username: String, password: PlainPassword, email: String | |
) extends In | |
case class Login(credentials: Credentials) extends In | |
object JoinGame { | |
sealed trait Mode | |
sealed trait PvPMode extends Mode { | |
def playersPerTeam: Int | |
def teams: Int | |
def playersNeeded = teams * playersPerTeam | |
} | |
object Mode { | |
case object Singleplayer extends Mode | |
case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 } | |
} | |
} | |
case class JoinGame(mode: JoinGame.Mode) extends In | |
case object CancelJoinGame extends In | |
// After client logs in it should cancel the active background token. | |
case class CancelBackgroundToken(token: GamesManagerActor.BackgroundToken) extends In | |
} | |
sealed trait Out | |
object Out { | |
case class CheckNameAvailabilityResponse(name: String, available: Boolean) extends Out | |
case class RegisterResponse(newToken: Option[SessionToken]) extends Out | |
sealed trait LoginResponse extends Out | |
case object InvalidCredentials extends LoginResponse | |
case class LoggedIn( | |
user: User, token: SessionToken, autogenerated: Boolean | |
) extends LoginResponse | |
case class GameJoined(human: Human) extends Out | |
case object JoinGameCancelled extends Out | |
case class WaitingListJoined(token: GamesManagerActor.BackgroundToken) extends Out | |
} | |
} | |
object Msgs { | |
sealed trait FromClient extends Serializable | |
object FromClient { | |
case object ProtoVersionCheck extends FromClient | |
case class Game(msg: GameInMsg) extends FromClient | |
case class Management(msg: NetClient.Management.In) extends FromClient | |
case class TimeSync(clientNow: DateTime) extends FromClient | |
} | |
case class FromControlClient(key: NetClient.Control.SecretKey, msg: NetClient.Control.In) | |
sealed trait FromServer | |
object FromServer { | |
case class ProtoVersionCheck(checksum: String) extends FromServer | |
case class Game(msg: GameActor.ClientOut) extends FromServer | |
case class Management(msg: NetClient.Management.Out) extends FromServer | |
case class TimeSync(clientNow: DateTime, serverNow: DateTime) extends FromServer | |
} | |
} | |
} | |
class NetClient( | |
msgHandler: ActorRef, gamesManager: ActorRef, server: ActorRef, | |
controlKey: NetClient.Control.SecretKey, | |
db: Database | |
) extends Actor with ActorLogging { | |
import app.actors.NetClient.Management.In._ | |
import app.actors.NetClient.Management.Out._ | |
import app.actors.NetClient.Msgs._ | |
import app.actors.NetClient._ | |
implicit class ServerMsgExts(msg: FromServer) { | |
def out(): Unit = msgHandler ! MsgHandler.Server2Client.GameMsg(msg) | |
} | |
implicit class ManagementMsgExts(msg: Management.Out) { | |
def out(): Unit = FromServer.Management(msg).out() | |
} | |
implicit class GameMsgExts(msg: GameActor.ClientOut) { | |
def out(): Unit = FromServer.Game(msg).out() | |
} | |
implicit class ControlMsgExts(msg: Control.Out) { | |
def out(): Unit = msgHandler ! MsgHandler.Server2Client.ControlMsg(msg) | |
} | |
context.watch(msgHandler) | |
override def receive = notLoggedIn | |
private[this] var shutdownInitiated = false | |
private[this] var inGameOpt = Option.empty[(ActorRef, Human)] | |
@throws[Exception](classOf[Exception]) | |
override def postStop(): Unit = { | |
if (shutdownInitiated) { | |
inGameOpt.foreach { case (gameRef, human) => | |
// Auto-concede if lost connection when shutdown is initiated. | |
log.info("Auto conceding because lost connection in shutdown mode.") | |
gameRef ! GameActor.In.Concede(human) | |
} | |
} | |
} | |
private[this] val common: Receive = { | |
case FromClient.ProtoVersionCheck => | |
FromServer.ProtoVersionCheck(ProtoChecksum.checksum).out() | |
case FromClient.TimeSync(clientNow) => | |
FromServer.TimeSync(clientNow, DateTime.now).out() | |
case m: MsgHandler.Client2Server.BackgroundSFO => | |
gamesManager ! m | |
case FromClient.Management(m: NetClient.Management.In.CancelBackgroundToken) => | |
gamesManager ! m | |
case m: NetClient.Management.Out.WaitingListJoined => | |
m.out() | |
case Server.ShutdownInitiated => | |
shutdownInitiated = true | |
case c: FromControlClient => | |
import context.dispatcher | |
handleControl(c).onComplete { | |
case util.Success(m) => m.out() | |
case util.Failure(err) => log.error("Error while handling control message {}: {}", c, err) | |
} | |
} | |
def handleControl(c: FromControlClient): Future[Control.Out] = { | |
if (c.key === controlKey) c.msg match { | |
case Control.In.Shutdown => | |
server ! Unbind | |
Future.successful(Control.Out.GenericReply.success) | |
case Control.In.Status => | |
import context.dispatcher | |
def ask[Reply : ClassTag, Result]( | |
ref: AskableActorRef, message: Any, f: Reply => Result | |
) = { | |
ref.ask(message)(Timeout(3.seconds)).mapTo[Reply].map(r => Some(f(r))).recover { | |
case e => | |
log.error("Error while asking for {}: {}", message, e) | |
None | |
} | |
} | |
val clientsCountF = ask[Server.Out.ReportClientCount, UInt]( | |
server, Server.In.ReportClientCount, r => r.clients | |
) | |
val gamesCountF = ask[GamesManagerActor.Out.StatsReport, (UInt, UInt)]( | |
gamesManager, GamesManagerActor.In.StatsReport, r => (r.users, r.games) | |
) | |
(clientsCountF zip gamesCountF).map { case (clients, gameManagerOpt) => | |
Control.Out.Status(clients, gameManagerOpt.map(_._1), gameManagerOpt.map(_._2)) | |
} | |
} | |
else Future.successful(Control.Out.GenericReply.error(s"Invalid control key '${c.key}'")) | |
} | |
private[this] val notLoggedIn: Receive = { | |
def logIn(user: User, sessionToken: SessionToken, autogenerated: Boolean): Unit = { | |
context.become(loggedIn(user)) | |
LoggedIn(user, sessionToken, autogenerated).out() | |
gamesManager ! GamesManagerActor.In.CheckUserStatus(user) | |
} | |
LoggingReceive(({ | |
case FromClient.Management(AutoRegister) => | |
val password = PlainPassword(UUID.randomUUID().shortStr) | |
val sessionToken = SessionToken.random() | |
val id = UUID.randomUUID() | |
val user = User(id, s"autogen-${id.shortStr}") | |
val credentials = Credentials(user.name, password) | |
db.withSession { implicit session => | |
Tables.users. | |
map(t => (t.user, t.sessionToken, t.password, t.email)). | |
insert((user, sessionToken.value, password.encrypted, None)) | |
} | |
logIn(user, sessionToken, autogenerated = true) | |
case FromClient.Management(Login(credentials)) => | |
val optQ = Tables.users. | |
filter(t => t.name === credentials.name). | |
map(t => (t.id, t.sessionToken, t.email, t.password)) | |
val idOpt = db.withSession(optQ.firstOption(_)).filter { | |
case (_, sessionToken, _, pwHash) => | |
credentials.check(sessionToken, pwHash) | |
}.map(t => (t._1, SessionToken(t._2), t._3.isEmpty)) | |
idOpt.fold2( | |
InvalidCredentials.out(), | |
{ case (id, token, autogenerated) => | |
logIn(User(id, credentials.name), token, autogenerated) } | |
) | |
}: Receive) orElse common) | |
} | |
private[this] def loggedIn(user: User): Receive = LoggingReceive(({ | |
case FromClient.Management(CheckNameAvailability(name)) => | |
val query = Tables.users.map(_.name).filter(_ === name).exists | |
val exists = db.withSession(query.run(_)) | |
CheckNameAvailabilityResponse(name, ! exists).out() | |
case FromClient.Management(Register(username, password, email)) => | |
val token = SessionToken.random() | |
val query = Tables.users. | |
filter(t => t.id === user.id && t.email.isEmpty). | |
map(t => (t.name, t.email, t.password, t.sessionToken)) | |
val success = Try { | |
db.withSession(query.update(( | |
username, Some(email), password.encrypted, token.value | |
))(_)) | |
}.getOrElse(0) === 1 | |
RegisterResponse(if (success) Some(token) else None).out() | |
case FromClient.Management(JoinGame(mode)) => | |
gamesManager ! GamesManagerActor.In.Join(user, mode) | |
case FromClient.Management(CancelJoinGame) => | |
gamesManager ! GamesManagerActor.In.CancelJoinGame(user) | |
case msg: JoinGameCancelled.type => | |
msg.out() | |
case GameActor.Out.Joined(human, game) => | |
GameJoined(human).out() | |
context.become(inGame(user, human, game)) | |
}: Receive) orElse common) | |
private[this] def inGame(user: User, human: Human, game: ActorRef): Receive = { | |
inGameOpt = Some((game, human)) | |
context.watch(game) | |
LoggingReceive(({ | |
case FromClient.Game(msgFn) => | |
val msg = msgFn(human) | |
game ! msg | |
case msg: GameActor.ClientOut => | |
msg.out() | |
case Terminated if sender() == game => | |
log.error("Game was terminated") | |
inGameOpt = None | |
context.become(loggedIn(user)) | |
}: Receive) orElse common) | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.net.InetSocketAddress | |
import java.nio.ByteOrder | |
import akka.actor._ | |
import akka.io.{IO, Tcp} | |
import app.persistence.DBDriver | |
import implicits._ | |
import launch.RTConfig | |
import spire.math.UInt | |
import scala.concurrent.duration._ | |
import scalaz._, Scalaz._, implicits._ | |
class Server( | |
rtConfig: RTConfig, gamesManager: ActorRef, db: DBDriver.Database | |
)(implicit byteOrder: ByteOrder) extends Actor with ActorLogging { | |
import context.system, context.dispatcher | |
def port = rtConfig.port | |
val manager = IO(Tcp) | |
manager ! Tcp.Bind(self, new InetSocketAddress(port.signed)) | |
/* Actor that is handling our bound socket. */ | |
private[this] var socketRef = Option.empty[ActorRef] | |
def receive = { | |
case Tcp.Bound(localAddress) => | |
socketRef = Some(sender()) | |
log.info("Server bound to {}", localAddress) | |
case msg: Tcp.Unbind.type => | |
socketRef.fold2( | |
log.error("Can't unbind, socket not bound to {}", port), | |
ref => { | |
log.debug("Received a request to unbind, forwarding to {}", ref) | |
ref ! msg | |
} | |
) | |
case Tcp.Unbound => | |
socketRef = None | |
log.info("Socket to port {} unbound, initiating shutdown.", port) | |
context.children.foreach(_ ! Server.ShutdownInitiated) | |
gamesManager ! Server.ShutdownInitiated | |
case Tcp.CommandFailed(b: Tcp.Bind) => | |
log.error(s"Cannot bind to ${b.localAddress}!") | |
context.stop(self) | |
case Tcp.Connected(remote, local) => | |
log.info(s"Client connected from $remote.") | |
val connection = sender() | |
val msgHandler = context.actorOf(Props(new MsgHandler( | |
connection, | |
handlerRef => Props(new NetClient(handlerRef, gamesManager, self, rtConfig.controlKey, db)) | |
)), s"${remote.getHostString}-${remote.getPort}") | |
connection ! Tcp.Register(msgHandler, keepOpenOnPeerClosed = true) | |
case Server.In.ReportClientCount => | |
sender ! Server.Out.ReportClientCount(UInt(context.children.size)) | |
} | |
@throws[Exception](classOf[Exception]) | |
override def postStop(): Unit = { | |
log.info("Shutting down actor system because server has stopped.") | |
system.shutdown() | |
} | |
} | |
object Server { | |
sealed trait In | |
object In { | |
case object ReportClientCount extends In | |
} | |
sealed trait Out | |
object Out { | |
case class ReportClientCount(clients: UInt) extends In | |
} | |
case object ShutdownInitiated | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment