Created
February 27, 2023 22:38
-
-
Save hanishi/dc7a5d3c6a41a2ada38287f4e5c31139 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import actors.CpcActuator.Actor.{ FetchMeasurementKey, TimeoutKey, Work } | |
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler } | |
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } | |
import akka.util.Timeout | |
import com.google.inject.Provides | |
import play.api.Configuration | |
import play.api.libs.concurrent.ActorModule | |
import usecase.SamplePublisherService | |
import scala.concurrent.duration.{ DurationInt, FiniteDuration } | |
import scala.concurrent.{ ExecutionContext, Future } | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{ Failure, Success } | |
case class CpcActuator(minPrice: Float = 0f, cpc: Float = 0f, maxCpc: Float = 0f) | |
object CpcActuator { | |
val OVERSHOOTS_LIMIT = 5 | |
val UNDERSHOOTS_LIMIT = 5 | |
val CPC_STEP = 10 | |
sealed trait Command | |
private case class UpdateSucceeded(message: String) extends Command | |
private case class MeasurementsFetched(minPrice: Float, setpoint: Int, measurement: Int, cpc: Float, maxCpc: Float) | |
extends Command | |
private case class FetchMeasurementFailed(throwable: Throwable) extends Command | |
private class Actor( | |
samplePublisher: SamplePublisherService, | |
controller: ActorRef[PIController.Command], | |
context: ActorContext[Command], | |
buffer: StashBuffer[Command], | |
timers: TimerScheduler[Command], | |
)( | |
implicit timeout: Timeout, | |
ec: ExecutionContext | |
) { | |
val TIMEOUT_DURATION: FiniteDuration = 30.minutes | |
def idle(cpcActuator: CpcActuator): Behavior[Command] = { | |
if (timers.isTimerActive(TimeoutKey)) timers.cancel(TimeoutKey) | |
timers.startSingleTimer(TimeoutKey, Terminated, TIMEOUT_DURATION) | |
buffer.unstashAll(active(cpcActuator)) | |
} | |
def active(cpcActuator: CpcActuator): Behavior[Command] = Behaviors.receiveMessagePartial { | |
case Work(input) => | |
update(cpcActuator, input).pipe(context.pipeToSelf) { | |
case Success(cpc) => | |
UpdateSucceeded(cpc) | |
case Failure(throwable) => | |
throw throwable | |
} | |
Behaviors.same | |
case UpdateSucceeded(message) => | |
context.log.info(message) | |
if (timers.isTimerActive(FetchMeasurementKey)) timers.cancel(FetchMeasurementKey) | |
timers.startSingleTimer(FetchMeasurementKey, FetchMeasurement, 6.seconds) | |
Behaviors.same | |
case FetchMeasurement => | |
fetchMeasurements().pipe(context.pipeToSelf) { | |
case Success((minPrice, setpoint, measuremrnt, cpc, maxCpc)) => | |
MeasurementsFetched(minPrice, setpoint, measuremrnt, cpc, maxCpc) | |
case Failure(throwable) => | |
FetchMeasurementFailed(throwable) | |
} | |
fetchingMeasurements(cpcActuator) | |
} | |
private def fetchingMeasurements(cpcActuator: CpcActuator = CpcActuator()): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case MeasurementsFetched(minPrice, setpoint, measurement, cpc, maxCpc) => | |
context.log.info(s"setpoint=$setpoint, measurement=$measurement, cpc=$cpc") | |
context.askWithStatus( | |
controller, | |
replyTo => PIController.Output(setpoint, measurement, replyTo) | |
) { | |
case Success(PIController.Input(value, undershoots, overshoots)) => | |
if (undershoots > UNDERSHOOTS_LIMIT) Work(value + 5) | |
else if (overshoots > OVERSHOOTS_LIMIT) Work(value - 5) | |
else Work(value) | |
case Failure(exception) => | |
throw exception | |
} | |
idle(cpcActuator.copy(minPrice = minPrice, cpc = cpc, maxCpc = maxCpc)) | |
case FetchMeasurementFailed(throwable) => | |
throw throwable | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
private def fetchMeasurements(): Future[(Float, Int, Int, Float, Float)] = | |
samplePublisher | |
.getSamplePublisher() | |
.map( | |
samplePublisher => | |
( | |
samplePublisher.minPrice, | |
samplePublisher.averageClicks, | |
samplePublisher.clicksEveryMinute, | |
samplePublisher.cpc, | |
samplePublisher.maxCpc | |
) | |
) | |
private def update(cpcActuator: CpcActuator, input: Float): Future[String] = { | |
println(input) | |
val cpc = 0.02f * input + cpcActuator.cpc | |
val result = if (input > 0) Math.min(cpc, cpcActuator.maxCpc) else Math.max(cpc, cpcActuator.minPrice) | |
samplePublisher.bid(result) | |
Future.successful(s"CPC:${cpcActuator.cpc} -> $result, minPrice: ${cpcActuator.minPrice}, maxPrice: ${cpcActuator.maxCpc}") | |
} | |
} | |
case object FetchMeasurement extends Command | |
object Actor extends ActorModule { | |
override type Message = Command | |
@Provides def apply(configuration: Configuration, samplePublisher: SamplePublisherService)( | |
implicit ec: ExecutionContext | |
): Behavior[Command] = | |
Behaviors | |
.supervise[Command]( | |
Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { buffer => | |
Behaviors.setup { | |
context => | |
Behaviors.withTimers { timers => | |
implicit val timeout: Timeout = | |
configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(DEFAULT_ASK_TIMEOUT).seconds | |
samplePublisher | |
.getSamplePublisher() | |
.map( | |
samplePublisher => | |
( | |
samplePublisher.minPrice, | |
samplePublisher.averageClicks, | |
samplePublisher.clicksEveryMinute, | |
samplePublisher.cpc, | |
samplePublisher.maxCpc | |
) | |
) | |
.pipe(context.pipeToSelf) { | |
case Success((minPrice, setpoint, measurement, cpc, maxCpc)) => | |
MeasurementsFetched(minPrice, setpoint, measurement, cpc, maxCpc) | |
case Failure(throwable) => | |
FetchMeasurementFailed(throwable) | |
} | |
new Actor( | |
samplePublisher, | |
context.spawnAnonymous(PIController.Actor(configuration)), | |
context, | |
buffer, | |
timers | |
).fetchingMeasurements() | |
} | |
} | |
} | |
) | |
.onFailure(SupervisorStrategy.restart) | |
private case class Work(value: Float) extends Command | |
private case object FetchMeasurementKey | |
private case object TimeoutKey | |
} | |
private case object Terminated extends Command | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import actors.PIController.Command | |
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer } | |
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } | |
import akka.pattern.StatusReply | |
import play.api.Configuration | |
import java.time.LocalDateTime | |
import java.time.temporal.ChronoUnit | |
import scala.concurrent.{ ExecutionContext, Future } | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{ Failure, Success } | |
case class PIController( | |
kp: Float, | |
ki: Float, | |
kd: Float, /* Derivative low-pass filter time constant */ | |
tau: Float, /* Output limits */ | |
limMin: Float, | |
limMax: Float, | |
/* Integrator limits */ | |
limMinInt: Float, | |
limMaxInt: Float, | |
/* Sample time (in seconds) */ | |
t: Float, | |
proportion: Float = 0.0f, | |
integrator: Float = 0.0f, | |
differentiator: Float = 0.0f, | |
prevError: Float = 0.0f, | |
prevMeasurement: Int = 0, | |
undershoots: Int = 0, | |
overshoots: Int = 0 | |
) extends Command { | |
// def out = { | |
// println(s"P=$proportion, I=$integrator, undershoots=$undershoots, overshoots=$overshoots, out= ${proportion + integrator}") | |
// proportion + integrator | |
// } | |
def out = { | |
val out = proportion + integrator + differentiator | |
println(s"P=$proportion, I=$integrator, D=$differentiator, out=${out}") | |
if (out > limMax) limMax else if (out < limMin) limMin else out | |
} | |
// private def update(setpoint: Int, measurement: Int): PIController = { | |
// val error = setpoint - measurement | |
// val gain = if (prevMeasurement > 0) measurement - prevMeasurement else 0 | |
// print(s""" | |
// |setpoint: $setpoint | |
// |measurement: $measurement | |
// |---------------------------------------- | |
// |error: $error | |
// |gain: $gain, | |
// |measurement: $measurement, | |
// |prevMeasurement: $prevMeasurement | |
// |""".stripMargin) | |
// if (error > gain) { | |
// val effectiveError = (error - gain).toFloat / measurement | |
// print( | |
// s"""effective error: $effectiveError | |
// |---------------------------------------- | |
// |""".stripMargin) | |
// this.copy( | |
// proportion = proportion(effectiveError), | |
// integrator = integrate(effectiveError, ki), | |
// prevError = effectiveError, | |
// prevMeasurement = measurement, | |
// undershoots = if (error > measurement) undershoots + 1 else undershoots, | |
// overshoots = 0 | |
// ) | |
// } else if (-error > gain) { | |
// val effectiveError = (error - gain).toFloat / measurement | |
// print( | |
// s"""effective error: $effectiveError | |
// |---------------------------------------- | |
// |""".stripMargin) | |
// this.copy( | |
// proportion = proportion(effectiveError), | |
// integrator = integrate(effectiveError, ki), | |
// prevError = effectiveError, | |
// prevMeasurement = measurement, | |
// undershoots = 0, | |
// overshoots = if (-error > measurement) overshoots + 1 else overshoots | |
// ) | |
// } else | |
// this.copy( | |
// proportion = proportion(0.0f), | |
// prevMeasurement = measurement, | |
// undershoots = if (undershoots > 0) undershoots - 1 else 0, | |
// overshoots = if (overshoots > 0) overshoots - 1 else 0 | |
// ) | |
// } | |
private def update(setpoint: Int, measurement: Int): PIController = { | |
val error = setpoint - measurement | |
print( | |
s""" | |
|setpoint: $setpoint | |
|measurement: $measurement | |
|---------------------------------------- | |
|error: $error | |
|measurement: $measurement, | |
|prevMeasurement: $prevMeasurement | |
|""".stripMargin) | |
this.copy( | |
proportion = proportion(error.toFloat), | |
integrator = integrate(error.toFloat), | |
differentiator = differentiate(measurement), | |
prevError = error.toFloat, | |
prevMeasurement = measurement | |
) | |
} | |
private def proportion(error: Float): Float = error * kp | |
private def integrate(error: Float): Float = { | |
val i = integrator + 0.5f * ki * t * (error + prevError) | |
if (i > limMaxInt) limMaxInt else if (i < limMinInt) limMinInt else i | |
} | |
private def differentiate(measurement: Int) = | |
-(2.0f * kd * (measurement - prevMeasurement) + (2.0f * tau - t) * differentiator) / (2.0f * tau + t) | |
} | |
object PIController { | |
val Kp = 2.0f | |
val Ki = 0.5f | |
val Kd = 0.25f | |
val LIM_MIN = -10.0f | |
val LIM_MAX = 10.0f | |
val LIM_MIN_INTEGRATOR = -5.0f | |
val LIM_MAX_INTEGRATOR = 5.0f | |
val TAU = 0.04f | |
val SAMPLE_TIME_S = 0.02f //setpointの値が300秒毎のclick数の平均値とし、フィードバックが60秒間隔の場合、60/300 = 0.2 | |
def load() = | |
Future.successful( | |
PIController( | |
Kp, | |
Ki, | |
Kd, | |
TAU, | |
LIM_MIN, | |
LIM_MAX, | |
LIM_MIN_INTEGRATOR, | |
LIM_MAX_INTEGRATOR, | |
SAMPLE_TIME_S | |
) | |
) | |
sealed trait Command | |
case class Output(setpoint: Int, measurement: Int, replyTo: ActorRef[StatusReply[Input]]) extends Command | |
case class Input(value: Float, undershoots: Int, overshoots: Int) extends Command | |
private case class InitializationSucceeded(controller: PIController) extends Command | |
private case class InitializationFailed(throwable: Throwable) extends Command | |
private class Actor( | |
context: ActorContext[Command], | |
buffer: StashBuffer[Command], | |
started: LocalDateTime = LocalDateTime.now() | |
) { | |
private def initializing(): Behavior[Command] = Behaviors.receiveMessage { | |
case InitializationSucceeded(controller) => buffer.unstashAll(active(controller)) | |
case InitializationFailed(throwable) => | |
context.log.error("Initialization Failed") | |
throw throwable | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
private def active(controller: PIController): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case Output(setpoint, measurement, replyTo) => | |
val updated = controller.update(setpoint, measurement) | |
replyTo ! StatusReply.Success(Input(updated.out, controller.undershoots, controller.overshoots)) | |
idle(updated) | |
} | |
private def idle(controller: PIController): Behavior[Command] = { | |
println( | |
s"""duration: ${ChronoUnit.SECONDS.between(started, LocalDateTime.now())} | |
|---------------------------------------- | |
|""".stripMargin) | |
active(controller) | |
} | |
} | |
case object Reset extends Command | |
object Actor { | |
def apply(configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] = | |
Behaviors | |
.supervise[Command] { | |
Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { buffer => | |
Behaviors.setup { context => | |
load().pipe(context.pipeToSelf) { | |
case Success(controller) => InitializationSucceeded(controller) | |
case Failure(exception) => InitializationFailed(exception) | |
} | |
new Actor(context, buffer).initializing() | |
} | |
} | |
} | |
.onFailure(SupervisorStrategy.restart) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import actors.CpcActuator.CPC_STEP | |
import actors.SamplePublisher.Command | |
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } | |
import akka.actor.typed.{ ActorRef, Behavior } | |
import akka.pattern.StatusReply | |
import com.google.inject.Provides | |
import play.api.libs.concurrent.ActorModule | |
import scala.concurrent.duration.DurationInt | |
import scala.util.Random | |
case class SamplePublisher( | |
minPrice: Float, | |
cpc: Float, | |
maxCpc: Float, | |
averageClicks: Int = 0, | |
clicksEveryMinute: Int = 0, | |
count: Int = 0 | |
) extends Command { | |
def incrementAverageClicks(): SamplePublisher = { | |
if (count % 5 == 0) { | |
this.copy( | |
averageClicks = averageClicks + (1 to 5 map (_ => Random.nextInt(100))).sum / 5, | |
count = count + 1 | |
) | |
} else | |
this.copy(count = count + 1) | |
} | |
def bid(cpc: Float) = { | |
if (cpc <= minPrice) | |
this.copy(cpc = cpc) | |
else { | |
val clicks = (Random.nextInt(100) * (cpc / minPrice)).toInt | |
if (count % 5 == 0) | |
this.copy(clicksEveryMinute = clicksEveryMinute + clicks, cpc = cpc) | |
else | |
this.copy(clicksEveryMinute = Math.max(clicksEveryMinute, clicks), cpc = cpc) | |
} | |
} | |
def boost(value: Int) = { | |
if (System.currentTimeMillis() % 2 == 0 && cpc > maxCpc) { | |
this.copy(clicksEveryMinute = clicksEveryMinute + value) | |
} else this.copy(averageClicks = averageClicks + value) | |
} | |
} | |
object SamplePublisher { | |
sealed trait Command | |
case class GetSamplePublisher(replyTo: ActorRef[StatusReply[SamplePublisher]]) extends Command | |
case class Bid(cpc: Float) extends Command | |
case class Pace(value: Int) extends Command | |
private class Actor(timers: TimerScheduler[Command]) { | |
private def active(publisher: SamplePublisher): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case GetSamplePublisher(replyTo) => | |
replyTo ! StatusReply.Success(publisher) | |
if (timers.isTimerActive(PaceChange)) timers.cancel(PaceChange) | |
timers.startSingleTimer(PaceChange, Pace(Random.between(100, 1000)), Random.between(0, 60).seconds) | |
active(publisher.incrementAverageClicks()) | |
case Bid(cpc) => | |
active(publisher.bid(cpc)) | |
case Pace(value) => | |
active(publisher.boost(value)) | |
} | |
} | |
object Actor extends ActorModule { | |
override type Message = Command | |
@Provides def apply(): Behavior[Command] = Behaviors.withTimers { timers => | |
val minPrice = Random.between(20, 25).toFloat | |
val cpc = minPrice + Random.between(0, 10) | |
new Actor(timers).active( | |
SamplePublisher( | |
minPrice, | |
cpc, | |
cpc + CPC_STEP, | |
Random.nextInt(100), | |
Random.nextInt(100) * (cpc / minPrice).toInt, | |
count = 1 | |
) | |
) | |
} | |
} | |
case object PaceChange extends Command | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package usecase | |
import actors.ASK_TIME_OUT | |
import actors.SamplePublisher.{Bid, Command, GetSamplePublisher} | |
import akka.actor.typed.scaladsl.AskPattern.{Askable, schedulerFromActorSystem} | |
import akka.actor.typed.{ActorRef, Scheduler} | |
import akka.util.Timeout | |
import com.google.inject.{Inject, Provides} | |
import play.api.Configuration | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration.DurationInt | |
@Provides class SamplePublisherService @Inject()(configuration: Configuration, samplePublisher: ActorRef[Command])( | |
implicit scheduler: Scheduler, | |
ex: ExecutionContext | |
) { | |
implicit val timeout: Timeout = configuration.get[Int](ASK_TIME_OUT).seconds | |
def getSamplePublisher() = samplePublisher.askWithStatus(GetSamplePublisher) | |
def bid(price: Float) = samplePublisher ! Bid(price) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment