Created
August 1, 2016 09:03
-
-
Save prassee/65e6792a20417e1dbddf549168b5920b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package service | |
import _root_.util.Configurations | |
import _root_.util.futures.SafeFutures._ | |
import akka.actor.{Actor, ActorSystem, Props} | |
import com.google.inject.{Inject, Singleton} | |
import com.typesafe.config.ConfigRenderOptions | |
import org.joda.time.DateTime | |
import play.api.Logger | |
import play.api.libs.json._ | |
import service.aggregation_records.AggregationRecordsService | |
import service.aggregation_records.AggregationRecordsService.{CurrentRuleSetVersionType, RecordKey} | |
import service.formula.{FormulaModel, QueryDurationWindow} | |
import service.popularity_extraction.PopularityExtractionService | |
import service.popularity_scores.ProductPopularityScoresService | |
import service.popularity_scores.ProductPopularityScoresService.RuleSetVersion | |
import service.sync_statuses.ProductSyncStatusesService | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.Try | |
import service.ProductPopularityAggregators.ProductPopularityRule | |
/**
 * Starts one [[ProductPopularityAggregator]] actor per formula declared under the
 * `rules.product_popularity_rules` configuration section.
 *
 * All constructor arguments are injected; the services are handed on unchanged to every
 * aggregator actor that gets created.
 */
@Singleton
class ProductPopularityAggregators @Inject()(
    actorSystem: ActorSystem,
    aggregationRecordsService: AggregationRecordsService,
    popularityExtractionService: PopularityExtractionService,
    productSyncStatusesService: ProductSyncStatusesService,
    productPopularityScoresService: ProductPopularityScoresService) {

  /**
   * Builds the query window for a formula, ending "now" and reaching back the number of days
   * given by the formula, or by the rule-set level configuration when the formula has no value.
   *
   * @return `None` when either rule-set level window is missing from the configuration
   *         (this mirrors the original lookup, which always required both defaults).
   */
  private def calcQueryWindow(formula: ProductPopularityRule) = {
    val presentDay = new DateTime()
    for {
      defaultViewDays <- Configurations.getIntOpt("rules.product_popularity_rules.productViewWindow")
      defaultOrderDays <- Configurations.getIntOpt("rules.product_popularity_rules.orderCountWindow")
    } yield {
      val viewsFrom = presentDay.minusDays(formula.productViewWindow.getOrElse(defaultViewDays))
      val ordersFrom = presentDay.minusDays(formula.orderCountWindow.getOrElse(defaultOrderDays))
      QueryDurationWindow((viewsFrom, presentDay), (ordersFrom, presentDay))
    }
  }

  /**
   * Same as [[calcQueryWindow]] but fails with a descriptive message instead of a bare
   * `Option.get`. The exception type is unchanged (`NoSuchElementException`).
   */
  private def requireQueryWindow(formula: ProductPopularityRule) =
    calcQueryWindow(formula).getOrElse(
      throw new NoSuchElementException(
        s"Cannot build query window for formula ${formula.version}: " +
          "rules.product_popularity_rules.productViewWindow and/or " +
          "rules.product_popularity_rules.orderCountWindow is not configured"))

  /**
   * Parses the popularity rules from configuration and, on success, records the current
   * rule-set version and creates the aggregator actors.
   *
   * @return the parse result mapped to `Unit`; a `JsError` is now also logged rather than
   *         being returned silently.
   */
  def onStart(): JsResult[Unit] = {
    import service.ProductPopularityAggregators._
    val rulesResult =
      Json.fromJson[ProductPopularityRules](Json.parse("rules.product_popularity_rules".configAsJson))
    rulesResult match {
      case JsError(errors) =>
        Logger.error(s"Invalid rules.product_popularity_rules configuration, no aggregators started: $errors")
      case _ => ()
    }
    rulesResult.map(startAggregators)
  }

  /** Records the rule-set version and spawns one aggregator actor per formula. */
  private def startAggregators(rules: ProductPopularityAggregators.ProductPopularityRules): Unit = {
    productSyncStatusesService.checkAndUpdateCurrentRuleSetVersion(
      Option(RecordKey.fromString(s"${CurrentRuleSetVersionType.prefix}-${rules.version}")))
    rules.formulas.foreach { formula =>
      val formulaModel = FormulaModel(formula.expression, formula.variables.toSet)
      // create actors for each formula version
      // Props takes its creator by name, so the query window is computed when Akka
      // instantiates the actor, exactly as before; it is deliberately not hoisted out.
      actorSystem.actorOf(
        Props(
          new ProductPopularityAggregator(
            RuleSetVersion(formula.version),
            requireQueryWindow(formula),
            formulaModel,
            aggregationRecordsService,
            popularityExtractionService,
            productPopularityScoresService)),
        s"product-view-aggregator-${formula.version}")
    }
  }
}
/** Rule models read from configuration, their JSON formats, and a config-to-JSON helper. */
object ProductPopularityAggregators {

  /**
   * A single popularity formula.
   *
   * @param expression        formula expression text
   * @param variables         variable names used by the expression
   * @param version           formula version identifier
   * @param productViewWindow optional per-formula product-view window
   * @param orderCountWindow  optional per-formula order-count window
   */
  case class ProductPopularityRule(
      expression: String,
      variables: Array[String],
      version: String,
      productViewWindow: Option[Int],
      orderCountWindow: Option[Int])

  object ProductPopularityRule {
    /** Macro-derived JSON format for [[ProductPopularityRule]]. */
    implicit val popScoreRuleFormat: Format[ProductPopularityRule] =
      Json.format[ProductPopularityRule]
  }

  /**
   * The whole rule set: rule-set level windows plus the list of formulas.
   *
   * @param id                rule-set identifier
   * @param version           rule-set version
   * @param productViewWindow rule-set level product-view window
   * @param orderCountWindow  rule-set level order-count window
   * @param formulas          the formulas belonging to this rule set
   */
  case class ProductPopularityRules(
      id: String,
      version: String,
      productViewWindow: Int,
      orderCountWindow: Int,
      formulas: List[ProductPopularityRule])

  object ProductPopularityRules {
    /** Macro-derived JSON format for [[ProductPopularityRules]]. */
    implicit val popScoreRulesFormat: Format[ProductPopularityRules] =
      Json.format[ProductPopularityRules]
  }

  /** Adds `configAsJson` to a configuration path given as a `String`. */
  implicit class PathToJson(path: String) {

    /** Renders the configuration object found at `path` using the concise render options. */
    def configAsJson: String = {
      val section = Configurations.getConfigObject(path).toConfig.root()
      section.render(ConfigRenderOptions.concise())
    }
  }
}
/**
 * Actor that repeatedly extracts popularity scores for one rule-set version and stores them.
 *
 * It sends itself an `Aggregate` message on start; every run re-schedules the next one after
 * `popularity.aggregator.schedule.frequency` minutes, whether the run succeeded or failed.
 *
 * @param ruleSet                        rule-set version this actor aggregates for
 * @param queryDurationWindow            window passed to the extraction service on every run
 * @param formulaModel                   formula passed to the extraction service
 * @param aggregationRecordsService      queried for the last read time before each run
 * @param popExtSvc                      service that extracts the popularity scores
 * @param productPopularityScoresService service that persists the extracted scores
 */
private class ProductPopularityAggregator(
    ruleSet: RuleSetVersion,
    queryDurationWindow: QueryDurationWindow,
    formulaModel: FormulaModel,
    aggregationRecordsService: AggregationRecordsService,
    popExtSvc: PopularityExtractionService,
    productPopularityScoresService: ProductPopularityScoresService)
  extends Actor with LogMessageTagger {

  /** Delay between the end of one aggregation run and the start of the next. */
  private val interval = Configurations.getInt("popularity.aggregator.schedule.frequency").minutes

  @throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    Logger.trace(s"Aggregator actor for rule set - ${ruleSet.version} created ".tag(this.getClass.getName))
    super.preStart()
    // Kick off the first aggregation run.
    self ! Aggregate
  }

  override def receive: Receive = {
    case Aggregate =>
      // Resolve the scheduler on the actor's own thread: the actor context must not be
      // touched from Future callbacks, which run on another thread.
      val scheduler = context.system.scheduler

      // NOTE(review): this value is subtracted from epoch milliseconds, so it is treated as
      // milliseconds even though the original local was named `traceBackSecs` — confirm the
      // unit of `popularity.aggregator.batchSize`.
      val traceBack = Configurations.getLong("popularity.aggregator.batchSize")
      val readUpTo = new DateTime(new DateTime().getMillis - traceBack)

      val aggregate = for {
        // The value is unused, but the lookup is kept: a failure here still aborts the run.
        lastReadTimeOpt <- aggregationRecordsService.lastReadTime(ruleSet)
        popularityScoresEither <- Try(popExtSvc.extractPopularity(formulaModel, queryDurationWindow).safely).flattenedEither
        _ <- popularityScoresEither match {
          case Left(t) =>
            // Failures are logged at ERROR so they are visible outside trace-level logging.
            Logger.error(s"Extracting popularity scores had failed for version ${ruleSet.version}".tag(this.getClass.getName), t)
            Future.successful(())
          case Right(popularityScores) =>
            Logger.trace(s"Updating popularity scores for version ${ruleSet.version}".tag(this.getClass.getName))
            productPopularityScoresService.updatePopularityScores(ruleSet, readUpTo, popularityScores)
        }
      } yield ()

      aggregate.onComplete { outcome =>
        // Previously a failed run was dropped without a trace; report it before re-scheduling.
        outcome.failed.foreach { t =>
          Logger.error(s"Aggregation run failed for version ${ruleSet.version}".tag(this.getClass.getName), t)
        }
        scheduler.scheduleOnce(interval, self, Aggregate)
      }
  }

  /** Self-addressed trigger for one aggregation run. */
  private case object Aggregate
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment