Skip to content

Instantly share code, notes, and snippets.

View djspiewak's full-sized avatar

Daniel Spiewak djspiewak

View GitHub Profile
// sbt-git workarounds (TODO upstream into sbt-tl)
inThisBuild {
git.uncommittedSignifier := Some("-SNAPSHOT")
git.formattedShaVersion := {
val Description = """^.*-(\d+)-[a-zA-Z0-9]+$""".r
val suffix = git.makeUncommittedSignifierSuffix(git.gitUncommittedChanges.value, git.uncommittedSignifier.value)
val description = Try("git describe --tags --match v*".!!.trim).toOption
// ported with love from http://psy-lob-saw.blogspot.com/2014/04/notes-on-concurrent-ring-buffer-queue.html
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val sequenceBuffer = new AtomicLongArray(bound)
private[this] val head = new AtomicLong(0)
private[this] val tail = new AtomicLong(0)
0.until(bound).foreach(i => sequenceBuffer.set(i, i.toLong))
final class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {
// doesn't interrupt taking in progress
def close: F[Unit] = closed.set(true)
def isClosed: F[Boolean] = closed.get
def send(a: A): F[Option[Unit]] =
closed.get.ifM(q.offer(a).map(Some(_)), F.pure(None))
class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {
// doesn't interrupt taking in progress
def close: F[Unit] = closed.set(true)
def isClosed: F[Boolean] = closed.get
def send(a: A): F[Option[Unit]] =
closed.get.ifM(q.offer(a).map(Some(_)), F.pure(None))
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
[info] enqueue max items and dequeue in order
[error] x parallel put and take
[error] List(1961, 1437, 1487, 1494, 1433, 1458, 1490, 1414, 1413, 1410, 1619, 1411, 1645, 1793, 1646, 1916, 1426, 1614, 1613, 1605, 1408, 1441, 1624, 1572, 1466, 1625, 1696, 1419, 1409, 1622, 1840, 1958, 1899, 1423, 1479, 1611, 1498, 1471, 1474, 1462, 1467, 1432, 1440, 1415, 1489, 1884, 1023, 1890, 1854, 1477, 1482, 1499, 1406, 1485, 1444, 1418, 1417, 1492, 1616, 1473, 1969, 1869, 1842, 1421, 1459, 1451, 1428, 1496, 1465, 1456, 1846, 1698, 1693, 1484, 1621, 960, 921, 1708, 1706, 1018, 1957, 1966, 1460, 1610, 1650, 1617, 1472, 1491, 1452, 1639, 1024, 917, 1928, 1709, 1970, 1920, 1921, 1888, 1442, 1478, 1486, 1792, 1443, 1453, 1468, 1584, 1009, 941, 929, 1885, 1689, 1712, 1850, 1877, 1894, 1436, 1447, 1500, 1476, 1455, 946, 1635, 950, 925, 957, 1905, 908, 1917, 1710, 1908, 1962, 1651, 1445, 1603, 1652, 958, 940, 1673, 1662, 1669, 1054, 1870, 913, 1882, 1972, 1878, 1866, 1980, 1427, 1454, 980, 1010, 1017, 1006, 938, 1677, 1
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val filled = new Array[Boolean](bound) // can't bitpack this because of word tearing
private[this] val first = new AtomicInteger(0)
private[this] val last = new AtomicInteger(0)
private[this] val length = new AtomicInteger(0)
def debug(): String = buffer.mkString("[", ", ", "]")
private[effect] final class UnsafeUnbounded[A] {
private[this] val first = new AtomicReference[Cell]
private[this] val last = new AtomicReference[Cell]
def put(data: A): () => Unit = {
val cell = new Cell(data)
val oldLast = last.getAndSet(cell)
if (oldLast == null) {
@tailrec
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val first = new AtomicInteger(0)
private[this] val last = new AtomicInteger(0)
private[this] val length = new AtomicInteger(0)
def size(): Int = length.get()
@tailrec
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) extends Queue[F, A] {
require(capacity > 0)
private[this] val buffer = new UnsafeBounded[A](capacity)
private[this] val waiters = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
def offer(a: A): F[Unit] = F defer {
try {
buffer.put(a)
notifyOne()