Created June 3, 2019 11:35
-
-
Save stettix/8fd8ba1496f8653dcb369871b85d1351 to your computer and use it in GitHub Desktop.
Backfiller command line tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package backfiller | |
import aws.AWS | |
import cats.effect.{ExitCode, IO, IOApp} | |
import com.amazonaws.AmazonServiceException | |
import com.amazonaws.retry.RetryUtils | |
import com.gu.emr.ClusterManager.ClusterID | |
import com.gu.emr.EmrClusterManager | |
import com.gu.emr.model.{ClusterDefinition, EmrStep, RunConfiguration} | |
import fs2.{Chunk, Pure, Stream} | |
import newworld.syntax.ClusterManagerSyntax | |
import org.joda.time.LocalDate | |
import scheduler.actions.ActionSet | |
import scala.concurrent.ExecutionContext | |
/**
  * Command line tool that backfills a scheduled job over a range of dates on EMR.
  *
  * Usage: `Backfiller <Job ID> <start date> <end date>` (dates in ISO `yyyy-MM-dd` form as accepted by
  * Joda's `LocalDate.parse`). The job's steps for every date in the inclusive range are submitted to
  * EMR clusters in batches of at most [[MaxStepsPerCluster]] steps, one cluster per batch.
  */
object Backfiller extends IOApp with ClusterManagerSyntax {

  import scala.concurrent.duration._

  /** Upper bound on the number of EMR steps submitted to a single backfill cluster. */
  val MaxStepsPerCluster = 250

  private val defaultRunConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs")

  /**
    * Entry point.
    *
    * Yields `ExitCode.Error` (after printing a message to stderr) for a wrong number of arguments, a start
    * date after the end date, or a job that produces no steps for the range. An unparseable date or an
    * unknown job ID fails the returned `IO`. Nothing runs until the returned `IO` is executed.
    */
  def run(args: List[String]): IO[ExitCode] = args match {
    case List(jobId, start, end) =>
      // Parse inside IO so malformed input surfaces as a failed IO instead of an eager throw from `run`.
      IO.suspend(backfill(jobId, LocalDate.parse(start), LocalDate.parse(end)))
    case _ =>
      // Exactly three arguments are required: the original check also let two through and then crashed on args(2).
      printError("Usage: Backfiller <Job ID> <start date> <end date>")
  }

  /** Submits every step of `jobId` for each date from `startDate` to `endDate`, both inclusive. */
  private def backfill(jobId: String, startDate: LocalDate, endDate: LocalDate): IO[ExitCode] =
    if (startDate.isAfter(endDate))
      printError(s"Start date $startDate is after end date $endDate")
    else {
      val job = ActionSet.prod
        .findById(jobId)
        .getOrElse(throw new IllegalArgumentException(s"Couldn't find action with ID: $jobId"))
      println(s"Running job '${job.id}' (${job.description}) from date $startDate to $endDate")

      implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global

      // Every date from startDate up to and including endDate.
      val dates: Stream[Pure, LocalDate] =
        Stream.iterate(startDate)(_.plusDays(1)).takeWhile(date => !date.isAfter(endDate))

      // Batches are cut purely by step count, so one date's steps may be split across two clusters.
      val jobSteps: Stream[Pure, Chunk[StepForDate]] =
        dates
          .flatMap(date => Stream.emits(job.steps(date).map(step => StepForDate(date, step))))
          .chunkN(MaxStepsPerCluster)

      // bracket guarantees the EMR client is shut down even if building the cluster manager fails.
      IO(AWS.emr).bracket { emr =>
        val clusterManager = new EmrClusterManager("backfiller", emr)(ec)

        // Submits one batch to its (possibly pre-existing) cluster; yields the number of steps submitted.
        def submitChunk(chunk: Chunk[StepForDate]): IO[Int] = {
          val stepsForDates = chunk.toList
          val clusterName = backfillClusterName(job.clusterDefinition.name, stepsForDates.map(_.date))
          for {
            clusterId <- getOrCreateEmrCluster(clusterManager, job.clusterDefinition, clusterName)
            _ <- IO(println(s"Adding ${stepsForDates.size} steps to cluster $clusterId"))
            _ <- clusterManager.submitStepsIO(clusterId, stepsForDates.map(_.step))
          } yield stepsForDates.size
        }

        jobSteps
          .covary[IO]
          .evalMap(submitChunk)
          .compile
          .fold(0)(_ + _)
          .flatMap { submitted =>
            // No steps at all is still reported as an error, matching the original exit code.
            if (submitted == 0)
              printError(s"No steps to run for job '${job.id}' from $startDate to $endDate")
            else
              IO(println("All done")).map(_ => ExitCode.Success)
          }
      }(emr => IO(emr.shutdown()))
    }

  /** Prints `message` to stderr and yields `ExitCode.Error`. */
  private def printError(message: String): IO[ExitCode] =
    IO(System.err.println(message)).map(_ => ExitCode.Error)

  /**
    * Looks up a running cluster matching `clusterDefinition` renamed to `backfillClusterName`, launching
    * one with the default run configuration if none is found.
    */
  def getOrCreateEmrCluster(
      clusterManager: EmrClusterManager,
      clusterDefinition: ClusterDefinition,
      backfillClusterName: String): IO[ClusterID] = {
    val renamedCluster = clusterDefinition.copy(name = backfillClusterName)
    for {
      maybeExistingClusterId <- clusterManager.findClusterIdIO(renamedCluster)
      clusterId <- getOrElse(maybeExistingClusterId,
                             clusterManager.launchClusterIO(renamedCluster, defaultRunConfiguration))
    } yield clusterId
  }

  /**
    * Name of the cluster for a batch covering `dates` (assumed ordered), e.g. "Backfill X 2019-01-01 to 2019-01-05".
    * Falls back to "Backfill X" when `dates` is empty instead of throwing.
    */
  def backfillClusterName(originalName: String, dates: List[LocalDate]): String =
    (dates.headOption, dates.lastOption) match {
      case (Some(first), Some(last)) => s"Backfill $originalName $first to $last"
      case _                         => s"Backfill $originalName"
    }

  /**
    * Runs `op` as a stream, retrying errors accepted by [[isRetriable]] at a constant 10 second interval
    * (`nextDelay = identity`) for at most 60 attempts, i.e. roughly ten minutes.
    */
  def asRetriableStream[T](op: IO[T], operationDescription: String): Stream[IO, T] =
    fs2.Stream
      .retry(op, 10.seconds, nextDelay = identity, maxAttempts = 6 * 10, isRetriable(operationDescription))

  /** A single EMR step together with the date it backfills. */
  final case class StepForDate(date: LocalDate, step: EmrStep)

  /** Signal raised by an operation to request another retry attempt. */
  case object Waiting extends Exception

  /**
    * Retry predicate: retries on [[Waiting]] and on AWS service errors that the AWS SDK classifies as
    * retryable. All other errors are reported to stderr and not retried.
    */
  def isRetriable(operation: String): Throwable => Boolean = {
    case Waiting =>
      System.err.println(s"Retrying $operation")
      true
    case e: AmazonServiceException if RetryUtils.isRetryableServiceException(e) =>
      // PrintStream.println has no (String, Throwable) overload; the old two-argument call printed a tuple.
      System.err.println(s"Retriable AWS error (${e.getErrorCode}) while performing $operation: ${e.getMessage}")
      true
    case e =>
      System.err.println(s"Error while performing $operation")
      e.printStackTrace()
      false
  }

  // Implement here as the old version of cats-effect that's currently on classpath doesn't have it.
  /** Returns the value in `maybeA` if present, otherwise runs `alt` (evaluated only in that case). */
  def getOrElse[A](maybeA: Option[A], alt: => IO[A]): IO[A] =
    maybeA.fold(alt)(a => IO.pure(a))
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.