Skip to content

Instantly share code, notes, and snippets.

@axel22
Last active April 24, 2017 16:58
Show Gist options
  • Select an option

  • Save axel22/d208a0612679a0eee9f9305508bd807e to your computer and use it in GitHub Desktop.

Select an option

Save axel22/d208a0612679a0eee9f9305508bd807e to your computer and use it in GitHub Desktop.
import scala.reflect.ClassTag
/** Typeclass that knows how to allocate and access arrays with elements of type `T`.
 *
 *  Compared to a bare `ClassTag`, an instance additionally carries a `nil` value and
 *  offers both a regular and a "raw" allocation, plus element read and write operations.
 *  It is used in reactive containers that have to do a lot of array allocations.
 *
 *  @tparam T element type of the arrays handled by this instance
 */
trait Arrayable[@specialized(Byte, Short, Int, Float, Long, Double) T]
    extends Serializable {

  /** Class tag of the element type. */
  val classTag: ClassTag[T]

  /** Value that this instance treats as "no element". */
  val nil: T

  /** Allocates an array of length `sz`.
   *
   *  The instances in the companion object return it with every slot set to `nil`.
   */
  def newArray(sz: Int): Array[T]

  /** Allocates an array of length `sz`; the primitive instances in the companion
   *  skip the `nil` fill here.
   */
  def newRawArray(sz: Int): Array[T]

  /** Reads the element of `array` at index `idx`. */
  def apply(array: Array[T], idx: Int): T

  /** Writes `v` into `array` at index `idx`. */
  def update(array: Array[T], idx: Int, v: T): Unit

  /** Wraps this instance into one that uses `n` as its `nil` value.
   *
   *  @param n the replacement `nil` value
   *  @return  a `WithNil` that delegates allocation and element access to this instance
   */
  def withNil(n: T): Arrayable.WithNil[T] =
    new Arrayable.WithNil[T](this, classTag, n)
}
/** Companion with the `WithNil` decorator and the implicit instances for
 *  reference types, `Long` and `Int`.
 */
object Arrayable {

  /** An `Arrayable` that delegates to `arrayable` but substitutes its own `nil`.
   *
   *  @param arrayable underlying instance used for raw allocation and element access
   *  @param classTag  class tag of the element type
   *  @param nil       value written into every slot by `newArray`
   */
  class WithNil[@specialized(Byte, Short, Int, Float, Long, Double) T](
    val arrayable: Arrayable[T],
    val classTag: ClassTag[T],
    val nil: T
  ) extends Arrayable[T] {

    /** Raw-allocates through the underlying instance, then fills every slot with `nil`. */
    def newArray(sz: Int): Array[T] = {
      val fresh = arrayable.newRawArray(sz)
      var slot = 0
      while (slot < sz) {
        fresh(slot) = nil
        slot += 1
      }
      fresh
    }

    def newRawArray(sz: Int): Array[T] = arrayable.newRawArray(sz)

    def apply(array: Array[T], idx: Int): T = arrayable(array, idx)

    def update(array: Array[T], idx: Int, v: T): Unit = arrayable(array, idx) = v
  }

  /** Instance for nullable reference types; `nil` is `null`, so a freshly
   *  allocated array already holds `nil` in every slot.
   */
  implicit def ref[T >: Null <: AnyRef: ClassTag]: Arrayable[T] = new Arrayable[T] {
    val classTag: ClassTag[T] = implicitly[ClassTag[T]]
    val nil: T = null
    def newArray(sz: Int): Array[T] = new Array[T](sz)
    def newRawArray(sz: Int): Array[T] = newArray(sz)
    def apply(array: Array[T], idx: Int): T = array(idx)
    def update(array: Array[T], idx: Int, v: T): Unit = array(idx) = v
  }

  /** Instance for `Long`; `nil` is `Long.MinValue`. */
  implicit val long: Arrayable[Long] = new Arrayable[Long] {
    val classTag: ClassTag[Long] = ClassTag.Long
    val nil: Long = Long.MinValue
    def newArray(sz: Int): Array[Long] = {
      val cells = new Array[Long](sz)
      java.util.Arrays.fill(cells, nil)
      cells
    }
    def newRawArray(sz: Int): Array[Long] = new Array[Long](sz)
    def apply(array: Array[Long], idx: Int): Long = array(idx)
    def update(array: Array[Long], idx: Int, v: Long): Unit = array(idx) = v
  }

  /** Instance for `Int`; `nil` is `Int.MinValue`. */
  implicit val int: Arrayable[Int] = new Arrayable[Int] {
    val classTag: ClassTag[Int] = ClassTag.Int
    val nil: Int = Int.MinValue
    def newArray(sz: Int): Array[Int] = {
      val cells = new Array[Int](sz)
      java.util.Arrays.fill(cells, nil)
      cells
    }
    def newRawArray(sz: Int): Array[Int] = new Array[Int](sz)
    def apply(array: Array[Int], idx: Int): Int = array(idx)
    def update(array: Array[Int], idx: Int, v: Int): Unit = array(idx) = v
  }
}
import scala.collection._
/** Entry point for obtaining the marshaling state of the current thread. */
object Reactor {

  /** State used while marshaling; `written` is a `BloomMap` from objects to `Int` values. */
  class MarshalContext() {
    val written = new BloomMap[AnyRef, Int]
  }

  /** Fallback storage: one lazily created `MarshalContext` per thread. */
  val marshalContextThreadLocal: ThreadLocal[MarshalContext] =
    new ThreadLocal[MarshalContext] {
      override def initialValue: MarshalContext = new MarshalContext()
    }

  /** Mixin for threads that keep their `MarshalContext` in a field.
   *
   *  The field is initialised from `marshalContextThreadLocal` on whichever thread
   *  runs the initialiser.
   */
  trait ReactorLocalThread {
    var marshalContext: MarshalContext = marshalContextThreadLocal.get()
  }

  /** Returns the context stored on the current thread when it is a
   *  `ReactorLocalThread`, and the thread-local one otherwise.
   */
  def marshalContext: MarshalContext = {
    val current = Thread.currentThread()
    current match {
      case reactorThread: ReactorLocalThread => reactorThread.marshalContext
      case _                                 => marshalContextThreadLocal.get()
    }
  }
}
import java.util.HashMap
import scala.util.hashing.Hashing
/** Bloom-filtered hash map that has fast checks when the key is not in the map.
 *
 *  Equality and hashing are reference-based: keys are compared with `eq` and hashed
 *  with `System.identityHashCode`.
 *
 *  The table is open-addressed with linear probing. A small bit set (`bloom`) is
 *  consulted first, so a lookup of a key whose bit is not set returns without probing.
 *  Removing a key does not clear its bit; the bits are only recomputed when the filter
 *  is rebuilt or the map is cleared.
 *
 *  The map cannot contain `null` keys, and a value equal to the `nil` of the
 *  `Arrayable[V]` instance is indistinguishable from an absent mapping.
 *  Values are specialized for integers and longs.
 *
 *  @tparam K key type, a nullable reference type
 *  @tparam V value type
 */
class BloomMap[K >: Null <: AnyRef: Arrayable, @specialized(Int, Long) V: Arrayable] {
  private var keytable = implicitly[Arrayable[K]].newRawArray(8)
  private var valtable = implicitly[Arrayable[V]].newArray(8)
  private var rawSize = 0
  private var bloom = new Array[Byte](4)
  private val rawNil = implicitly[Arrayable[V]].nil

  /** Index of the bloom byte that holds the bit for `hash`. */
  private def bloomIndex(hash: Int): Int = (hash >>> 3) % bloom.length

  /** Mask selecting the bit for `hash` inside its bloom byte. */
  private def bloomBit(hash: Int): Int = 1 << (hash & 0x7)

  /** True when the bloom bit of `key` is set, i.e. the key may be present. */
  private def mightContain(key: K): Boolean = {
    val hash = System.identityHashCode(key)
    (bloom(bloomIndex(hash)) & bloomBit(hash)) != 0
  }

  /** Sets the bloom bit of `key`. */
  private def markInBloom(key: K): Unit = {
    val hash = System.identityHashCode(key)
    val idx = bloomIndex(hash)
    bloom(idx) = (bloom(idx) | bloomBit(hash)).toByte
  }

  /** Slot at which probing for `key` starts.
   *
   *  The sign bit is masked off because the contract of `System.identityHashCode`
   *  does not promise a non-negative result, and a negative remainder would be an
   *  invalid array index.
   */
  private def homeSlot(key: K): Int =
    (System.identityHashCode(key) & 0x7fffffff) % keytable.length

  /** Probes linearly from the home slot of `key` and returns the first slot that
   *  either holds `key` (by reference) or is empty.
   */
  private def probe(key: K): Int = {
    var pos = homeSlot(key)
    var curr = keytable(pos)
    while (curr != null && (curr ne key)) {
      pos = (pos + 1) % keytable.length
      curr = keytable(pos)
    }
    pos
  }

  /** Returns true when `key` is mapped to a value different from `nil`. */
  def contains(key: K): Boolean =
    mightContain(key) && lookup(key) != rawNil

  /** Value stored in the slot found for `key`; `nil` when that slot is empty. */
  private def lookup(key: K): V = valtable(probe(key))

  /** The value that denotes an absent mapping. */
  def nil: V = rawNil

  /** Returns the value mapped to `key`, or `nil` when there is none. */
  def get(key: K): V =
    if (mightContain(key)) lookup(key) else rawNil

  /** Stores `key -> value` in the table (growing it first if needed) and returns the
   *  value previously held by the slot. Does not touch the bloom filter.
   */
  private def insert(key: K, value: V): V = {
    assert(key != null)
    checkResize(rawNil)
    val pos = probe(key)
    val keyAdded = keytable(pos) == null
    val previousValue = valtable(pos)
    keytable(pos) = key
    valtable(pos) = value
    if (keyAdded) rawSize += 1
    previousValue
  }

  // NOTE(review): the `nil` parameter of `checkResize` and `resize` is not read. It
  // presumably exists so that these methods mention `V` and receive specialized
  // variants; it is kept for that reason -- confirm before removing.
  private def checkResize(nil: V): Unit = {
    if (rawSize * 1000 / BloomMap.loadFactor > keytable.length) {
      resize(nil)
    }
  }

  /** Doubles the table capacity and re-inserts every existing entry. */
  private def resize(nil: V): Unit = {
    val okeytable = keytable
    val ovaltable = valtable
    val ncapacity = keytable.length * 2
    keytable = implicitly[Arrayable[K]].newRawArray(ncapacity)
    valtable = implicitly[Arrayable[V]].newArray(ncapacity)
    rawSize = 0
    var pos = 0
    while (pos < okeytable.length) {
      val curr = okeytable(pos)
      if (curr != null) insert(curr, ovaltable(pos))
      pos += 1
    }
  }

  /** Replaces the bloom filter with one of half the table length and re-marks
   *  every key currently in the table.
   */
  private def resizeBloomFilter(): Unit = {
    bloom = new Array[Byte](keytable.length / 2)
    var i = 0
    while (i < keytable.length) {
      val key = keytable(i)
      if (key != null) markInBloom(key)
      i += 1
    }
  }

  /** Maps `key` to `value`, replacing any previous mapping of `key`. */
  def put(key: K, value: V): Unit = {
    insert(key, value)
    // After the table has grown the filter is too small relative to it and is
    // rebuilt from the table, which already contains `key`.
    if (bloom.length > keytable.length / 4) markInBloom(key)
    else resizeBloomFilter()
  }

  /** Number of keys currently stored. */
  def size: Int = rawSize

  def isEmpty: Boolean = size == 0

  def nonEmpty: Boolean = !isEmpty

  /** True when the forward, wrapping distance from slot `i` to slot `j` is less
   *  than half the table length.
   */
  private def before(i: Int, j: Int): Boolean = {
    val d = keytable.length >> 1
    if (i <= j) j - i < d
    else i - j > d
  }

  /** Removes `key` and returns its value, or `nil` when the key is absent.
   *
   *  Entries following the removed slot in the same probe run are shifted back so
   *  that later probes still reach them.
   */
  private def delete(key: K): V = {
    assert(key != null)
    val pos = probe(key)
    if (keytable(pos) == null) rawNil
    else {
      val previousValue = valtable(pos)
      var h0 = pos
      var h1 = (h0 + 1) % keytable.length
      while (keytable(h1) != null) {
        val h2 = homeSlot(keytable(h1))
        // Move the entry into the hole only if it is displaced from its home slot
        // and that home slot is at or before the hole.
        if (h2 != h1 && before(h2, h0)) {
          keytable(h0) = keytable(h1)
          valtable(h0) = valtable(h1)
          h0 = h1
        }
        h1 = (h1 + 1) % keytable.length
      }
      keytable(h0) = null
      valtable(h0) = rawNil
      rawSize -= 1
      previousValue
    }
  }

  /** Removes `key` and returns the value it was mapped to, or `nil` if it was absent. */
  def remove(key: K): V = delete(key)

  /** Removes all entries, keeping the current capacity.
   *
   *  @param spec not read by this method
   */
  // NOTE(review): `spec` presumably makes this method mention `V` so that it gets
  // specialized -- confirm.
  def clear()(implicit spec: BloomMap.Spec[V]): Unit = {
    var i = 0
    while (i < keytable.length) {
      keytable(i) = null
      valtable(i) = rawNil
      i += 1
    }
    i = 0
    while (i < bloom.length) {
      bloom(i) = 0
      i += 1
    }
    // The entry count must be reset together with the tables; otherwise `size`
    // and the resize check keep using the old count.
    rawSize = 0
  }
}
/** Constants and implicit `Spec` witnesses used by the `BloomMap` class. */
object BloomMap {

  /** Load factor constant; the map's resize check compares
   *  `size * 1000 / loadFactor` with the table length.
   */
  val loadFactor: Int = 400

  /** Marker type taken as an implicit parameter by `BloomMap.clear`. */
  class Spec[@specialized(Int, Long) T]

  implicit val intSpec: Spec[Int] = new Spec[Int]

  implicit val longSpec: Spec[Long] = new Spec[Long]

  /** Single shared instance handed out for every other type. */
  private val anySpecValue: Spec[Any] = new Spec[Any]

  /** Fallback witness for an arbitrary `T`; always the same underlying instance. */
  implicit def anySpec[T]: Spec[T] = anySpecValue.asInstanceOf[Spec[T]]
}
// sbt build definition with an aggregating root project and one cross-project `foo`.
// NOTE(review): this looks like the contents of build.sbt; the file name is not visible here.
name := "spec-example-top"
scalaVersion in ThisBuild := "2.12.2"
// Root project: aggregates both variants of `foo`; `publish` and `publishLocal`
// are overridden with empty tasks.
lazy val root = project.in(file(".")).
aggregate(fooJS, fooJVM).
settings(
publish := {},
publishLocal := {}
)
// Cross-project rooted at the build's base directory. The only dependency is
// ScalaTest in the "test" configuration; the JVM- and JS-specific setting lists are empty.
lazy val foo = crossProject.in(file(".")).
settings(
name := "spec-example",
version := "0.1-SNAPSHOT",
libraryDependencies ++= Seq(
"org.scalatest" %%% "scalatest" % "3.0.1" % "test"
)
).
jvmSettings(
).
jsSettings(
)
// The two concrete variants referenced by `root`.
lazy val fooJVM = foo.jvm
lazy val fooJS = foo.js
// Adds the Sonatype snapshot and release repositories and the Scala.js sbt plugin.
// NOTE(review): presumably a separate plugins file (e.g. project/plugins.sbt) and the
// source of `crossProject` / `%%%` used by the build definition -- confirm.
resolvers ++= Seq(
"Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"Sonatype OSS Releases" at "https://oss.sonatype.org/content/repositories/releases"
)
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.15")
import org.scalatest._
import org.scalatest.concurrent.AsyncTimeLimitedTests
import scala.concurrent._
import scala.concurrent.duration._
/** Asynchronous smoke test: obtains `Reactor.marshalContext` on the test thread and
 *  then completes a promise whose future carries the assertion.
 */
class BloomMapTest
    extends AsyncFunSuite
    with Matchers
    with BeforeAndAfterAll
    with AsyncTimeLimitedTests {

  // Time limit for each test, as used by AsyncTimeLimitedTests.
  def timeLimit = 10.seconds

  implicit override def executionContext = ExecutionContext.Implicits.global

  test("existing channel should be awaited") {
    val completion = Promise[Boolean]()
    // Evaluated only for its side effect of creating the marshal context.
    Reactor.marshalContext
    completion.success(true)
    val outcome = completion.future
    outcome.map { flag =>
      assert(flag)
    }
  }
}
@axel22
Copy link
Author

axel22 commented Apr 24, 2017

Project uses standard directory layout: https://www.scala-js.org/doc/project/cross-build.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment