Last active
March 2, 2020 22:10
-
-
Save chrilves/dc1bd08e0665a7243b1d410e48cc5513 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A toy resource that can be opened and closed, standing in for a file
  * handle, a database connection handle, etc.
  *
  * Constructing an instance opens the connection. As with the real things,
  * its operations only work while it is opened, it has to be closed after use
  * to avoid leaking, and closing it a second time is an error.
  *
  * @param name         the name of the connection, used in log lines and errors
  * @param failureProba the probability that opening and closing fail;
  *                     echoing fails with probability {{{failureProba / 4}}}
  */
final class Connection(name: String, failureProba: Double = 0D) {

  /** Draws a random number and tells whether a failure of probability {{{p}}} occurs. */
  private def unlucky(p: Double): Boolean =
    scala.util.Random.nextDouble() < p

  /** Whether the connection is currently opened. */
  private var opened: Boolean = true

  // Opening sequence: log, possibly fail, and only then account for the
  // connection, so that a failed opening never shows up in the global counter.
  println(s"Opening Connection $name.")
  if (unlucky(failureProba)) throw Connection.OpeningFailed(name)
  Connection.openedCount += 1

  /** Closes an OPENED connection.
    *
    * The global counter is decremented and the connection marked as closed
    * before the random failure is drawn, so a [[Connection.ClosingFailed]]
    * still leaves the connection accounted as closed.
    *
    * Fails with [[Connection.ConnectionAlreadyClosed]] if it is already closed.
    */
  def close(): Unit = {
    if (!opened) throw Connection.ConnectionAlreadyClosed(name)
    println(s"Closing connection $name.")
    Connection.openedCount -= 1
    opened = false
    if (unlucky(failureProba)) throw Connection.ClosingFailed(name)
  }

  /** Prints and returns the given number.
    *
    * The connection MUST BE OPENED, otherwise this fails with
    * [[Connection.ConnectionClosed]]. It may also fail randomly with
    * [[Connection.EchoingFailed]].
    */
  def echo(i: Int): Int = {
    if (!opened) throw Connection.ConnectionClosed(name)
    println(s"Echoing $i on connection $name.")
    if (unlucky(failureProba / 4)) throw Connection.EchoingFailed(name, i)
    i
  }

  /** Fails with [[Connection.ConnectionNotClosed]] if the connection is still opened. */
  def checkClosed(): Unit =
    if (opened) throw Connection.ConnectionNotClosed(name)
}

object Connection {

  /** Number of connections currently opened (shared with the class). */
  private var openedCount: Int = 0

  /** Creates (and therefore opens) a new connection with the given name. */
  def open(name: String, failureProba: Double = 0D): Connection =
    new Connection(name, failureProba)

  /** Returns the number of opened connections. */
  def numberOpened: Int = openedCount

  /** Resets the number of opened connections to 0. */
  def resetOpened(): Unit = openedCount = 0

  final case class OpeningFailed(name: String) extends Exception(s"Opening $name failed!")
  final case class ClosingFailed(name: String) extends Exception(s"Closing $name failed!")
  final case class EchoingFailed(name: String, i: Int) extends Exception(s"Echoing $i on $name failed!")
  final case class ConnectionAlreadyClosed(name: String) extends Exception(s"Can not close $name, it is already closed!")
  final case class ConnectionClosed(name: String) extends Exception(s"Connection $name is closed!")
  final case class ConnectionNotClosed(name: String) extends Exception(s"Connection $name is NOT closed!")
}
/** A safe wrapper to open and close a resource of type {{{A}}}.
  *
  * Error policy shared by every combinator below: when an operation fails and
  * the clean-up (closing) triggered by that failure fails as well, the FIRST
  * exception is the one thrown and the closing failure is attached to it via
  * {{{addSuppressed}}}, so the root cause is never masked.
  */
trait Resource[+A] { self =>

  /** Open a new resource of type {{{A}}}.
    *
    * Returns {{{(aNewA, functionClosingTheNewA)}}}
    *  - {{{aNewA}}} is the created resource.
    *  - {{{functionClosingTheNewA}}} is a function that closes the resource {{{aNewA}}}.
    *
    * A resource MUST NEVER be accessed/used after being closed.
    */
  def open(): (A, () => Unit)

  /** Call {{{f}}} with a newly created {{{a:A}}}.
    *
    * Ensures that {{{a}}} is closed after the call to {{{f}}}, whether
    * {{{f}}} returns or throws. The return value {{{f(a):B}}} MUST NEVER rely
    * on {{{a}}} being opened.
    *
    * If {{{f}}} throws and closing throws too, the exception of {{{f}}} is
    * rethrown with the closing failure as a suppressed exception. If only
    * closing fails, its exception is thrown.
    */
  final def use[B](f: A => B): B = {
    val (a, close) = open()
    val result =
      try f(a)
      catch { case e: Throwable =>
        try close() catch { case e2: Throwable => e.addSuppressed(e2) }
        throw e
      }
    // `f` succeeded: a failure to close is now the primary error.
    close()
    result
  }

  /** Transform the resource with {{{f}}} each time it is opened.
    *
    * If {{{f}}} throws, the freshly opened {{{a:A}}} is closed before the
    * exception is rethrown.
    */
  final def map[B](f: A => B): Resource[B] =
    new Resource[B] {
      def open(): (B, () => Unit) = {
        val (a, closeA) = self.open()
        try (f(a), closeA)
        catch { case e: Throwable =>
          try closeA() catch { case e2: Throwable => e.addSuppressed(e2) }
          throw e
        }
      }
    }

  /** Chain a second resource whose creation depends on this one.
    *
    * Opening opens {{{a:A}}} first, then {{{f(a)}}}. The returned closing
    * function closes them in reverse order ({{{b}}} first, then {{{a}}}) and
    * always attempts to close {{{a}}}, even when closing {{{b}}} failed.
    *
    * If computing or opening {{{f(a)}}} throws, {{{a}}} is closed before the
    * exception is rethrown.
    */
  final def flatMap[B](f: A => Resource[B]): Resource[B] =
    new Resource[B] {
      def open(): (B, () => Unit) = {
        val (a, closeA) = self.open()
        try {
          val (b, closeB) = f(a).open()
          val closeBoth: () => Unit = () => {
            try closeB()
            catch { case e: Throwable =>
              // Still release `a`, but keep the failure of `b` as the primary error.
              try closeA() catch { case e2: Throwable => e.addSuppressed(e2) }
              throw e
            }
            closeA()
          }
          (b, closeBoth)
        } catch { case e: Throwable =>
          // A failing `closeA` must not hide why opening the second resource failed.
          try closeA() catch { case e2: Throwable => e.addSuppressed(e2) }
          throw e
        }
      }
    }

  /** Call {{{f}}} with a newly created {{{a:A}}} and wrap the iterator it returns.
    *
    * The resource {{{a:A}}} is opened, and {{{f(a)}}} called, immediately when
    * this method is called (not lazily). If {{{f(a)}}} is already empty the
    * resource is closed right away and an empty iterator is returned.
    *
    * Otherwise the resource {{{a:A}}} is closed
    *  - when {{{hasNext}}} finds that there is no next value,
    *  - OR when {{{hasNext}}}/{{{next}}} raise an exception; that exception is
    *    remembered and rethrown by every later call.
    *
    * Closing is only triggered from {{{hasNext}}}/{{{next}}}: an iterator that
    * is abandoned before {{{hasNext}}} returned {{{false}}} leaves the resource opened.
    */
  final def useInIterator[B](f: A => Iterator[B]): Iterator[B] = {
    sealed trait State
    // A {{{a:A}}} has been opened and {{{f(a)}}} called.
    final case class Running(close: () => Unit, iter: Iterator[B]) extends State
    // The iterator {{{f(a:A)}}} has been consumed and closing {{{a:A}}} was triggered.
    final case object Finished extends State
    // An exception has happened.
    final case class Failed(error: Throwable) extends State

    final class WrappedIterator(initialState: State) extends Iterator[B] {
      var state: State = initialState

      def hasNext: Boolean =
        state match {
          case Running(cls, iter) =>
            try if (iter.hasNext)
              true
            else {
              // Switch state BEFORE closing so a failing close is not retried below.
              state = Finished
              cls()
              false
            }
            catch { case e: Throwable =>
              if (state != Finished)
                try cls()
                catch { case e2: Throwable => e.addSuppressed(e2) }
              state = Failed(e)
              throw e
            }
          case Finished => false
          case Failed(e) => throw e
        }

      def next(): B =
        state match {
          case Running(cls, iter) =>
            try iter.next()
            catch { case e: Throwable =>
              try cls() catch { case e2: Throwable => e.addSuppressed(e2) }
              state = Failed(e)
              throw e
            }
          case Finished =>
            throw new java.util.NoSuchElementException("next on empty iterator")
          case Failed(e) =>
            throw e
        }
    }

    val (a, close) = self.open()
    // Tracks whether `close` still has to be called if something below throws.
    var opened: Boolean = true
    try {
      val iter = f(a)
      if (!iter.hasNext) {
        opened = false
        close()
        Iterator.empty
      } else new WrappedIterator(Running(close, iter))
    } catch { case e: Throwable =>
      if (opened)
        try close()
        catch { case e2: Throwable => e.addSuppressed(e2) }
      throw e
    }
  }
}
object Resource {

  /** Wraps a plain {{{a:A}}} that needs neither opening nor closing:
    * every {{{open()}}} hands back {{{a}}} with a closing function that does nothing.
    */
  def pure[A](a: A): Resource[A] =
    new Resource[A] {
      def open(): (A, () => Unit) = {
        val nothingToClose: () => Unit = () => ()
        (a, nothingToClose)
      }
    }

  /** Builds a {{{Resource[A]}}} from an opening and a closing function.
    *
    * @param opn evaluated anew on every {{{open()}}} to create the resource
    * @param cls called with the created resource when it is closed
    */
  def of[A](opn: => A, cls: A => Unit): Resource[A] =
    new Resource[A] {
      def open(): (A, () => Unit) = {
        val created = opn
        val closeCreated: () => Unit = () => cls(created)
        (created, closeCreated)
      }
    }
}
/***********
 * TESTING *
 ***********/
/** Mutable accumulator of simple statistics over a stream of {{{Double}}}s.
  *
  * Only running aggregates are stored (count, sum, sum of squares, min, max),
  * not the samples themselves.
  *
  * On an empty accumulator {{{avg}}} and {{{stddev}}} are {{{NaN}}},
  * {{{min}}} is {{{Double.MaxValue}}} and {{{max}}} is {{{Double.MinValue}}}.
  */
final class Stats {
  /** Number of samples added so far. */
  def count: Long = _count

  /** Arithmetic mean of the samples. */
  def avg: Double = _sum / _count

  /** Smallest sample seen. */
  def min: Double = _min

  /** Largest sample seen. */
  def max: Double = _max

  /** Population standard deviation, computed as {{{sqrt(E[x²] - E[x]²)}}}.
    *
    * With floating point rounding {{{E[x²] - E[x]²}}} can come out slightly
    * negative when the samples are (almost) all equal; it is clamped at 0 so
    * the result is 0 rather than {{{NaN}}} in that case.
    */
  def stddev: Double = {
    val mean = avg
    val variance = _sumSquares / _count - mean * mean
    // math.max propagates NaN, so the empty accumulator still reports NaN.
    math.sqrt(math.max(0d, variance))
  }

  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue
  private var _sum: Double = 0
  private var _sumSquares: Double = 0
  private var _count: Long = 0

  /** Adds one sample to this accumulator (mutates it). */
  def +(d: Double): Unit = {
    _min = math.min(_min, d)
    _max = math.max(_max, d)
    _sum += d
    _sumSquares += d * d
    _count += 1
  }

  /** Merges all the samples accumulated by {{{s}}} into this accumulator (mutates it). */
  def ++(s: Stats): Unit = {
    _min = math.min(_min, s._min)
    _max = math.max(_max, s._max)
    _sum += s._sum
    _sumSquares += s._sumSquares
    _count += s._count
  }

  override def toString: String =
    s"""count = ${count}
       |avg = ${avg}
       |min = ${min}
       |max = ${max}
       |stddev = ${stddev}
       |""".stripMargin
}
object Tests {

  /** A pair of connections "A" and "B", each opening/closing with failure
    * probability 0.2; combining them into a pair also fails with probability 0.2.
    *
    * B is opened after A and may depend on it, so B must be closed before A.
    */
  val rc: Resource[(Connection, Connection)] =
    Resource.of[Connection](Connection.open("A", 0.2), _.close()).flatMap { a =>
      Resource.of[Connection](Connection.open("B", 0.2), _.close()).map { b =>
        if (scala.util.Random.nextDouble() < 0.2) throw new Exception("Map failed!")
        (a, b)
      }
    }

  /** Builds an iterator over 1..5 that echoes every number on both
    * connections of {{{rc}}} and yields the sum of the two echoes.
    *
    * Consume the resulting iterator to observe what is happening.
    */
  def iter() =
    rc.useInIterator { pair =>
      val (connA, connB) = pair
      List(1, 2, 3, 4, 5).iterator.map(i => connA.echo(i) + connB.echo(i))
    }

  /** Outcome of a test campaign: both statistics hold one 0/1 sample per run.
    *
    * @param exceptions     1 for a run that threw, 0 otherwise
    * @param correctClosing 1 for a run that left no connection opened, 0 otherwise
    */
  final case class TestStats(exceptions: Stats, correctClosing: Stats)

  /** Runs {{{iter()}}} to exhaustion {{{times}}} times and records, for every
    * run, whether it threw and whether every connection ended up closed.
    */
  def test(times: Long): TestStats = {
    val exceptions = new Stats
    val correctClosing = new Stats

    (1L to times).foreach { _ =>
      Connection.resetOpened()

      // Any failure, whatever its kind, counts as one exception for this run.
      val threw =
        try { iter().size; false }
        catch { case _: Throwable => true }
      exceptions + (if (threw) 1d else 0d)

      val allClosed = Connection.numberOpened == 0
      correctClosing + (if (allClosed) 1d else 0d)
    }

    TestStats(exceptions, correctClosing)
  }
}
object Benchmark {

  /** A resource over a connection "A" configured with failure probability 0. */
  val conn: Resource[Connection] =
    Resource.of[Connection](Connection.open("A", 0D), _.close())

  /** Upper bound given to {{{iterN}}} by the timing benchmarks. */
  val iterations: Long = 100000000L

  /** Iterator counting upwards from 0 until {{{n}}} has been produced,
    * i.e. it yields {{{0, 1, ..., n}}} ({{{n + 1}}} values for {{{n >= 0}}}).
    */
  def iterN(n: Long): Iterator[Long] =
    new Iterator[Long] {
      var last: Long = -1
      def hasNext: Boolean = last < n
      def next(): Long = {
        last += 1
        last
      }
    }

  /** Iterator counting upwards from 0 which reports a next value for as long
    * as less than {{{millis}}} milliseconds elapsed since this method was called.
    */
  def iterFor(millis: Long): Iterator[Long] = {
    val startedAt = System.currentTimeMillis
    new Iterator[Long] {
      var last: Long = -1
      def hasNext: Boolean = System.currentTimeMillis < startedAt + millis
      def next(): Long = {
        last += 1
        last
      }
    }
  }

  /** Evaluates {{{mkIter}}} {{{warm}}} times discarding the results, then
    * {{{hot}}} more times collecting every result into the returned [[Stats]].
    */
  def bench(warm: Long, hot: Long)(mkIter: => Double): Stats = {
    val collected = new Stats
    println("Warming")
    (1L to warm).foreach(_ => mkIter)
    println("Statging")
    (1L to hot).foreach(_ => collected + mkIter)
    collected
  }

  /** Evaluates {{{a}}} once and returns how long it took, in seconds. */
  def time[A](a: => A): Double = {
    val begin = System.nanoTime()
    a
    val elapsedNanos = System.nanoTime() - begin
    elapsedNanos.toDouble / 1000000000L
  }

  /** Time to exhaust {{{iterN(iterations)}}} used directly. */
  def timeForNoRes(): Stats =
    bench(100, 100) {
      time(iterN(iterations).size)
    }

  /** Time to exhaust {{{iterN(iterations)}}} wrapped by {{{useInIterator}}};
    * the wrapping itself happens before the clock starts.
    */
  def timeForRes(): Stats =
    bench(100, 100) {
      val wrapped = conn.useInIterator(_ => iterN(iterations))
      time(wrapped.size)
    }

  /** Number of values produced by {{{iterFor(100)}}} used directly. */
  def numberForNoRes(): Stats =
    bench(100, 100) {
      iterFor(100).size.toDouble
    }

  /** Number of values produced by {{{iterFor(100)}}} wrapped by {{{useInIterator}}}. */
  def numberForRes(): Stats =
    bench(100, 100) {
      conn.useInIterator(_ => iterFor(100)).size.toDouble
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment