Created
November 28, 2019 09:11
-
-
Save nomisRev/7e857956cc79e00810a4bf3da55fea1b to your computer and use it in GitHub Desktop.
Stacksafe Mono Reactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import reactor.core.publisher.Mono | |
import java.util.concurrent.Executor | |
fun noTrampolining(): Mono<Int> { | |
var initial = Mono.just(0) | |
(0..5000).forEach { i -> | |
initial = initial.map { i } | |
} | |
return initial | |
} | |
fun <A> trampoline(a: A): Mono<A> = | |
Mono.create { sink -> | |
trampoline { sink.success(a) } | |
} | |
fun trampolined(): Mono<Int> { | |
var initial = Mono.just(0) | |
(0..5000).forEach { i -> | |
initial = if (i % maxStackDepthSize == 0) { | |
initial.flatMap { trampoline(it) } | |
} else { | |
initial.map { i } | |
} | |
} | |
return initial | |
} | |
fun main() { | |
trampolined().block() | |
noTrampolining().block() //StackOverFlow | |
} | |
/** | |
* EVERYTHING BELOW THIS POINT WAS COPY-PASTE FROM ARROW'S //arrow.fx.internal.Utils.kt | |
*/ | |
const val maxStackDepthSize = 127 | |
inline fun trampoline(crossinline f: () -> Unit): Unit = | |
_trampoline.get().execute(Runnable { f() }) | |
private val underlying = Executor { it.run() } | |
@PublishedApi | |
internal val _trampoline = object : ThreadLocal<TrampolineExecutor>() { | |
override fun initialValue(): TrampolineExecutor = | |
TrampolineExecutor(underlying) | |
} | |
@PublishedApi | |
internal class TrampolineExecutor(val underlying: Executor) { | |
private var immediateQueue = Platform.ArrayStack<Runnable>() | |
@Volatile | |
private var withinLoop = false | |
private fun startLoop(runnable: Runnable) { | |
withinLoop = true | |
try { | |
immediateLoop(runnable) | |
} finally { | |
withinLoop = false | |
} | |
} | |
fun execute(runnable: Runnable): Unit = | |
if (!withinLoop) startLoop(runnable) | |
else immediateQueue.push(runnable) | |
private fun forkTheRest() { | |
class ResumeRun(val head: Runnable, val rest: Platform.ArrayStack<Runnable>) : Runnable { | |
override fun run() { | |
immediateQueue.pushAll(rest) | |
immediateLoop(head) | |
} | |
} | |
val head = immediateQueue.pop() | |
if (head != null) { | |
val rest = immediateQueue | |
immediateQueue = Platform.ArrayStack() | |
underlying.execute(ResumeRun(head, rest)) | |
} | |
} | |
@Suppress("SwallowedException") // Should we rewrite with while?? | |
private tailrec fun immediateLoop(task: Runnable) { | |
try { | |
task.run() | |
} catch (ex: Throwable) { | |
forkTheRest() | |
// ex.nonFatalOrThrow() //not required??? | |
} | |
val next = immediateQueue.pop() | |
return if (next != null) immediateLoop(next) | |
else Unit | |
} | |
} | |
private const val initialIndex: Int = 0 | |
private const val chunkSize: Int = 8 | |
object Platform { | |
@Suppress("UNCHECKED_CAST") | |
class ArrayStack<A> { | |
private val initialArray: Array<Any?> = arrayOfNulls<Any?>(chunkSize) | |
private val modulo = chunkSize - 1 | |
private var array = initialArray | |
private var index = initialIndex | |
/** Returns `true` if the stack is empty. */ | |
fun isEmpty(): Boolean = | |
index == 0 && (array.getOrNull(0) == null) | |
/** Pushes an item on the stack. */ | |
fun push(a: A) { | |
if (index == modulo) { | |
val newArray = arrayOfNulls<Any?>(chunkSize) | |
newArray[0] = array | |
array = newArray | |
index = 1 | |
} else { | |
index += 1 | |
} | |
array[index] = a | |
} | |
/** Pushes an entire iterator on the stack. */ | |
fun pushAll(cursor: Iterator<A>) { | |
while (cursor.hasNext()) push(cursor.next()) | |
} | |
/** Pushes an entire iterable on the stack. */ | |
fun pushAll(seq: Iterable<A>) { | |
pushAll(seq.iterator()) | |
} | |
/** Pushes the contents of another stack on this stack. */ | |
fun pushAll(stack: ArrayStack<A>) { | |
pushAll(stack.iteratorReversed()) | |
} | |
/** Pops an item from the stack (in LIFO order). | |
* | |
* Returns `null` in case the stack is empty. | |
*/ | |
fun pop(): A? { | |
if (index == 0) { | |
if (array.getOrNull(0) != null) { | |
array = array[0] as Array<Any?> | |
index = modulo | |
} else { | |
return null | |
} | |
} | |
val result = array[index] as A | |
// GC purposes | |
array[index] = null | |
index -= 1 | |
return result | |
} | |
/** Builds an iterator out of this stack. */ | |
@Suppress("IteratorNotThrowingNoSuchElementException") | |
fun iteratorReversed(): Iterator<A> = | |
object : Iterator<A> { | |
private var array = [email protected] | |
private var index = [email protected] | |
override fun hasNext(): Boolean = | |
index > 0 || (array.getOrNull(0) != null) | |
override fun next(): A { | |
if (index == 0) { | |
array = array[0] as Array<Any?> | |
index = modulo | |
} | |
val result = array[index] as A | |
index -= 1 | |
return result | |
} | |
} | |
fun isNotEmpty(): Boolean = | |
!isEmpty() | |
} | |
/** | |
* Establishes the maximum stack depth for `IO#map` operations. | |
* | |
* The default is `128`, from which we substract one as an | |
* optimization. This default has been reached like this: | |
* | |
* - according to official docs, the default stack size on 32-bits | |
* Windows and Linux was 320 KB, whereas for 64-bits it is 1024 KB | |
* - according to measurements chaining `Function1` references uses | |
* approximately 32 bytes of stack space on a 64 bits system; | |
* this could be lower if "compressed oops" is activated | |
* - therefore a "map fusion" that goes 128 in stack depth can use | |
* about 4 KB of stack space | |
*/ | |
const val maxStackDepthSize = 127 | |
/** | |
* Composes multiple errors together, meant for those cases in which error suppression, due to a second error being | |
* triggered, is not acceptable. | |
* | |
* On top of the JVM this function uses Throwable#addSuppressed, available since Java 7. On top of JavaScript the | |
* function would return a CompositeException. | |
*/ | |
fun composeErrors(first: Throwable, vararg rest: Throwable): Throwable { | |
rest.forEach { if (it != first) first.addSuppressed(it) } | |
return first | |
} | |
/** | |
* Composes multiple errors together, meant for those cases in which error suppression, due to a second error being | |
* triggered, is not acceptable. | |
* | |
* On top of the JVM this function uses Throwable#addSuppressed, available since Java 7. On top of JavaScript the | |
* function would return a CompositeException. | |
*/ | |
fun composeErrors(first: Throwable, rest: List<Throwable>): Throwable { | |
rest.forEach { if (it != first) first.addSuppressed(it) } | |
return first | |
} | |
inline fun trampoline(crossinline f: () -> Unit): Unit = | |
_trampoline.get().execute(Runnable { f() }) | |
private val underlying = Executor { it.run() } | |
@PublishedApi | |
internal val _trampoline = object : ThreadLocal<TrampolineExecutor>() { | |
override fun initialValue(): TrampolineExecutor = | |
TrampolineExecutor(underlying) | |
} | |
@PublishedApi | |
internal class TrampolineExecutor(val underlying: Executor) { | |
private var immediateQueue = ArrayStack<Runnable>() | |
@Volatile | |
private var withinLoop = false | |
private fun startLoop(runnable: Runnable) { | |
withinLoop = true | |
try { | |
immediateLoop(runnable) | |
} finally { | |
withinLoop = false | |
} | |
} | |
fun execute(runnable: Runnable): Unit = | |
if (!withinLoop) startLoop(runnable) | |
else immediateQueue.push(runnable) | |
private fun forkTheRest() { | |
class ResumeRun(val head: Runnable, val rest: ArrayStack<Runnable>) : Runnable { | |
override fun run() { | |
immediateQueue.pushAll(rest) | |
immediateLoop(head) | |
} | |
} | |
val head = immediateQueue.pop() | |
if (head != null) { | |
val rest = immediateQueue | |
immediateQueue = ArrayStack() | |
underlying.execute(ResumeRun(head, rest)) | |
} | |
} | |
@Suppress("SwallowedException") // Should we rewrite with while?? | |
private tailrec fun immediateLoop(task: Runnable) { | |
try { | |
task.run() | |
} catch (ex: Throwable) { | |
forkTheRest() | |
// ex.nonFatalOrThrow() //not required??? | |
} | |
val next = immediateQueue.pop() | |
return if (next != null) immediateLoop(next) | |
else Unit | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment