Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created February 10, 2022 18:47
Show Gist options
  • Select an option

  • Save djspiewak/2ba4ebadd2fec973f4fa344380a83e86 to your computer and use it in GitHub Desktop.

Select an option

Save djspiewak/2ba4ebadd2fec973f4fa344380a83e86 to your computer and use it in GitHub Desktop.
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val first = new AtomicInteger(0)
private[this] val last = new AtomicInteger(0)
private[this] val length = new AtomicInteger(0)
def size(): Int = length.get()
@tailrec
def put(data: A): Unit = {
val oldLast = last.get()
if (length.get() >= bound) {
throw FailureSignal
} else {
if (last.compareAndSet(oldLast, (oldLast + 1) % bound)) {
buffer(oldLast) = data.asInstanceOf[AnyRef]
// we're already exclusive with other puts, and take can only *decrease* length, so we don't gate
// this also forms a write barrier for buffer
length.getAndIncrement()
()
} else {
put(data)
}
}
}
@tailrec
def take(): A = {
val oldFirst = first.get()
if (length.get() <= 0) { // read barrier and check that boundary puts have completed
throw FailureSignal
} else {
if (first.compareAndSet(oldFirst, (oldFirst + 1) % bound)) {
val back = buffer(oldFirst).asInstanceOf[A]
// we're already exclusive with other takes, and put can only *increase* length, so we don't gate
// doing this *after* we read from the buffer ensures we don't read a new value when full
// specifically, it guarantees that puts will fail even if first has already advanced
length.getAndDecrement()
buffer(oldFirst) = null // prevent memory leaks (no need to eagerly publish)
back
} else {
take()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment