Skip to content

Instantly share code, notes, and snippets.

@nomisRev
Created January 22, 2020 19:11
Show Gist options
  • Select an option

  • Save nomisRev/b6c631de14f87b28b194b583eaa8d4f2 to your computer and use it in GitHub Desktop.

Select an option

Save nomisRev/b6c631de14f87b28b194b583eaa8d4f2 to your computer and use it in GitHub Desktop.
FS2 actors example ported to Arrow Fx
package com.fortyseven.fptraining.slides
import arrow.Kind
import arrow.core.Either
import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.core.Tuple2
import arrow.core.getOrElse
import arrow.fx.IO
import arrow.fx.Promise
import arrow.fx.Queue
import arrow.fx.Ref
import arrow.fx.extensions.fx
import arrow.fx.typeclasses.Async
import arrow.fx.typeclasses.Concurrent
import arrow.fx.typeclasses.seconds
import java.lang.RuntimeException
import java.time.Instant
/**
 * Builds an actor that takes no input: a forked fiber owns a [Ref] holding the state and
 * answers requests one at a time by running [receive] against that state.
 *
 * Running the outer effect allocates the state, a mailbox bounded at 1000 pending requests,
 * and forks the serving fiber. The value it yields is the "ask" effect, which may be run
 * repeatedly; every run enqueues a fresh reply slot and waits for it to be filled.
 *
 * The serving fiber loops forever. Previously it handled a single request and then finished,
 * so any later ask raced against an already-completed fiber and failed.
 *
 * @param initialState the value the state [Ref] starts with.
 * @param receive computes the reply for one request; it receives the state [Ref].
 * @return an effect that creates the actor and yields its "ask" effect. "ask" raises a
 * [RuntimeException] when the serving fiber's `join` wins the race against the reply.
 */
fun <F, S, O> Concurrent<F>.actor(
    initialState: S,
    receive: (Ref<F, S>) -> Kind<F, O>
): Kind<F, Kind<F, O>> = fx.concurrent {
    val ref = !Ref(initialState)
    val queue = !Queue.bounded<F, Promise<F, O>>(1000, this)
    // One round of service: wait for a reply slot, compute the reply, hand it over.
    val serveOne: Kind<F, Unit> = fx.concurrent {
        val promise = !queue.take()
        val output = !receive(ref)
        !promise.complete(output)
    }
    // tailRecM keeps iterating for as long as the step answers Left, so this never finishes normally.
    val keepServing: Either<Unit, Unit> = Either.Left(Unit)
    val fiber = !tailRecM<Unit, Unit>(Unit) { serveOne.map { keepServing } }
        .fork() // Fire & forget
    val ask = fx.concurrent {
        val promise = !Promise<O>()
        !queue.offer(promise)
        // Race the reply against the serving fiber so a dead actor does not leave the caller waiting.
        val output = !dispatchers().default()
            .raceN(fiber.join(), promise.get())
        when (output) {
            is Either.Left -> !raiseError<O>(RuntimeException("Boom!"))
            is Either.Right -> output.b
        }
    }
    ask
}
/**
 * Builds an actor that takes an input per request: a forked fiber owns a [Ref] holding the
 * state and answers requests one at a time by running [receive] with the input and that state.
 *
 * Running the outer effect allocates the state, a mailbox bounded at 1000 pending requests,
 * and forks the serving fiber. The value it yields is the "ask" function; each call enqueues
 * the input together with a fresh reply slot and waits for the slot to be filled.
 *
 * The serving fiber loops forever. Previously it handled a single request and then finished,
 * so any later ask raced against an already-completed fiber and failed.
 *
 * @param initialState the value the state [Ref] starts with.
 * @param receive computes the reply for one input; it also receives the state [Ref].
 * @return an effect that creates the actor and yields its "ask" function. The effect returned by
 * "ask" raises a [RuntimeException] when the serving fiber's `join` wins the race against the reply.
 */
fun <F, S, I, O> Concurrent<F>.actor(
    initialState: S,
    receive: (I, Ref<F, S>) -> Kind<F, O>
): Kind<F, (I) -> Kind<F, O>> = fx.concurrent {
    val ref = !Ref(initialState)
    val queue = !Queue.bounded<F, Tuple2<I, Promise<F, O>>>(1000, this)
    // One round of service: wait for a request, compute the reply, hand it over.
    val serveOne: Kind<F, Unit> = fx.concurrent {
        val (input, promise) = !queue.take()
        val output = !receive(input, ref)
        !promise.complete(output)
    }
    // tailRecM keeps iterating for as long as the step answers Left, so this never finishes normally.
    val keepServing: Either<Unit, Unit> = Either.Left(Unit)
    val fiber = !tailRecM<Unit, Unit>(Unit) { serveOne.map { keepServing } }
        .fork() // Fire & forget
    val ask = { i: I ->
        fx.concurrent {
            val promise = !Promise<O>()
            !queue.offer(Tuple2(i, promise))
            // Race the reply against the serving fiber so a dead actor does not leave the caller waiting.
            val output = !dispatchers().default()
                .raceN(fiber.join(), promise.get())
            when (output) {
                is Either.Left -> !raiseError<O>(RuntimeException("Boom!"))
                is Either.Right -> output.b
            }
        }
    }
    ask
}
/**
 * An auth token paired with the instant at which it stops being valid.
 *
 * @property expiresAt the first instant at which the token is no longer active.
 * @property name label carried by the token; defaults to "token".
 */
data class AuthToken(val expiresAt: Instant, val name: String = "token") {
    /** Returns `true` only while [now] is strictly earlier than [expiresAt]. */
    fun isActive(now: Instant): Boolean {
        return expiresAt.isAfter(now)
    }
}
/**
 * Simulates fetching a new token: sleeps for one second, then issues a token that expires
 * [validForSeconds] after the moment it was issued.
 *
 * Previously the expiry was set to the issue instant itself, so the token was never active at
 * any later check and could not be reused.
 *
 * @param validForSeconds how long the issued token stays active, in seconds; defaults to one hour.
 * @return an effect yielding the newly issued [AuthToken], named "token".
 */
fun <F> Concurrent<F>.requestNewAuthToken(validForSeconds: Long = 3600L): Kind<F, AuthToken> = fx.concurrent {
    !sleep(1.seconds)
    val issuedAt = !now()
    AuthToken(issuedAt.plusSeconds(validForSeconds), "token")
}
/**
 * Wraps reading the system clock in an effect, so the clock is read when the effect runs.
 *
 * @return an effect yielding the result of [Instant.now].
 */
fun <F> Async<F>.now(): Kind<F, Instant> {
    val readClock: suspend () -> Instant = { Instant.now() }
    return effect(readClock)
}
/**
 * Creates an actor whose state is the last issued token (initially [None]).
 *
 * Each request reads the stored token and the current time. A stored token that is still
 * active is returned as is; otherwise a new token is requested, stored, and returned.
 *
 * @return an effect that creates the actor and yields the effect used to ask it for a token.
 */
fun <F> Concurrent<F>.ActiveAuthTokenActor(): Kind<F, Kind<F, AuthToken>> {
    val receive: (Ref<F, Option<AuthToken>>) -> Kind<F, AuthToken> = { cache ->
        fx.concurrent {
            val cached = !cache.get()
            val checkedAt = !now()
            val resolved: Kind<F, AuthToken> = cached
                .filter { candidate -> candidate.isActive(checkedAt) }
                .fold(
                    {
                        // Nothing usable is stored: fetch a replacement and remember it.
                        fx.concurrent {
                            val fresh = !requestNewAuthToken()
                            !cache.set(Some(fresh))
                            fresh
                        }
                    },
                    { active -> just(active) }
                )
            !resolved
        }
    }
    return actor<F, Option<AuthToken>, AuthToken>(None, receive)
}
/**
 * Entry point: creates the token actor in [IO], asks it once for a token and prints the result.
 */
suspend fun main() {
    val program = IO.fx {
        val askForToken = !ActiveAuthTokenActor()
        val activeToken = !askForToken
        !effect { println(activeToken) }
    }
    program.suspended()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment