This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def updateIndexFiles(newFile: IndexPage.File) = | |
for { | |
_ <- logger.info("Retrieving index page") | |
files <- getIndexByNamePage.map(IndexPage.parseFiles) | |
_ <- logger.info(s"Retrieved index page with ${files.size} files") | |
_ <- uploadNewIndexFiles(files, newFile) | |
} yield () | |
def uploadNewIndexFiles(files: List[IndexPage.File], newFile: IndexPage.File) = { | |
val newFiles = files.toSet + newFile |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def safeUpdateIndexFiles(newFile: IndexPage.File) = | |
leaseRepository | |
.acquire(LeaseID(ks"file-uploader"), HolderID.unique) | |
.use(_.guard(updateIndexFiles(newFile)).map(_.embedError)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait LeaseRepository[F[_]]: | |
def labels: List[Label] | |
def acquire(id: LeaseID, holderID: HolderID, annotations: List[Annotation] = Nil)(implicit | |
parameters: LeaseParameters = LeaseParameters.Default | |
): Resource[F, HeldLease[F]] | |
def get(id: LeaseID): F[Option[Lease[F]]] | |
def list: F[List[Lease[F]]] | |
def watcher: fs2.Stream[F, LeaseEvent[F]] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait Lease[F[_]]: | |
def id: LeaseID | |
def holder: F[HolderID] | |
def labels: F[List[Label]] | |
def annotations: F[List[Annotation]] | |
def isExpired: F[Boolean] | |
def expired: fs2.Stream[F, Unit] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def prepare(transferID: TransferID, transfer: Transfer): F[Branch.Vote[TransferFailure]] = { | |
if (accountID === transfer.origin) | |
Logger[F].debug(show"Preparing outgoing transfer $transferID: $transfer for account $accountID") >> | |
account.prepareOutgoingTransfer(transferID, transfer) | |
.onErrorRetryWithBackoff(Logger[F].warn(_)(show"Error preparing outgoing transfer $transferID, retrying in a bit")) | |
.onLeftRetryWithBackoff { case Account.PendingOutgoingTransfer => | |
Logger[F].warn(show"Account $accountID has a pending outgoing transfer, retrying in a bit") | |
}(retryParameters.onPendingTransfer) | |
.flatMap { | |
case Left(Account.Unknown) => Branch.Vote.Abort(TransferFailure.AccountNotFound(accountID)).pure[F] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def transfer(from: AccountID, to: AccountID, amount: PosAmount): F[TransferFailure \/ Unit] = | |
coordinator | |
.create(TransferID.random, Transfer(from, to, amount), from, to) | |
.use(_.pollForFinalStatus()) | |
.flatMap { | |
case Status.Committed => ().asRight[TransferFailure].pure | |
case Status.Aborted(reason) => | |
reason match { | |
case AbortReason.Timeout => | |
EitherT.leftT(TransferFailure.Timeout: TransferFailure).value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
transactor.coordinator[TransferID, AccountID, Transfer, TransferFailure]( | |
"transfer", | |
{ accountID => | |
val account = sharding.entityFor(accountID) | |
new TransferBranch(accountID, account) | |
}, | |
Some(transferParameters.timeout) | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait Branch[F[_], TID, Q, R] | |
def prepare(id: TID, query: Q): F[Vote[R]] | |
def commit(id: TID): F[Unit] | |
def abort(id: TID): F[Unit] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Example: orchestration of the booking process for a journey - prepare phase | Data store | Transaction branch operation | |
---|---|---|---|
Create cancelable hotel & flight reservations | External API | Make a reversible synchronous HTTP POST/PUT request | |
Request the billing backend for the credit card guarantee charge and await confirmation | Internal service | Send a message or call an endpoint and wait for an event to occur | |
Add details to the customer row in the database | Database | Acquire an exclusive lock on a database row or use builtin XA support | |
Grab a semaphore to update the recent bookings cache | In-memory resource | Lock an in-memory resource | |
Schedule traveller reminder notifications | Actor cluster | Send a command to an actor | |
Add an entry in a bookings log | File | Persist a rollback-ready change in a file |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.{ Eq, Show } | |
import cats.syntax.eq._ | |
import cats.syntax.show._ | |
import cats.instances.int._ | |
import FunctionHelpers._ | |
object Differ { | |
sealed trait Diff[T] | |
object Diff { | |
final case class Insert[T](revision: T) extends Diff[T] |
NewerOlder