Created
August 7, 2013 13:47
-
-
Save jboner/6174191 to your computer and use it in GitHub Desktop.
Bloom filter in Scala taken from Twitter's Algebird project.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Copyright 2012 Twitter, Inc. | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
*/ | |
package com.twitter.algebird | |
import scala.collection.immutable.BitSet | |
import scala.collection.JavaConverters._ | |
import com.googlecode.javaewah.{EWAHCompressedBitmap => CBitSet} | |
object RichCBitSet { | |
def apply(x : Int*) = { | |
CBitSet.bitmapOf(x.sorted : _*) | |
} | |
implicit def cb2rcb(cb : CBitSet) : RichCBitSet = new RichCBitSet(cb) | |
} | |
// An enrichment to give some scala-like operators to the compressed | |
// bit set. | |
class RichCBitSet(val cb : CBitSet) { | |
def ++(b : CBitSet) : CBitSet = cb.or(b) | |
def ==(b : CBitSet) : Boolean = cb.equals(b) | |
def toBitSet(width : Int) : BitSet = { | |
val a = new Array[Long]((width+63)/64) | |
cb.asScala.foreach{ i : java.lang.Integer => a(i.intValue / 64) |= 1L << (i.intValue % 64) } | |
BitSet.fromArray(a) | |
} | |
} | |
object BloomFilter{ | |
def apply(numEntries: Int, fpProb: Double, seed: Int = 0) = { | |
val width = BloomFilter.optimalWidth(numEntries, fpProb) | |
val numHashes = BloomFilter.optimalNumHashes(numEntries, width) | |
BloomFilterMonoid(numHashes, width, seed) | |
} | |
// compute optimal number of hashes: k = m/n ln(2) | |
def optimalNumHashes(numEntries: Int, width: Int): Int = math.ceil(width / numEntries * math.log(2)).toInt | |
// compute optimal width: m = - n ln(p) / (ln(2))^2 | |
def optimalWidth(numEntries: Int, fpProb: Double): Int = math.ceil(-1 * numEntries * math.log(fpProb) / math.log(2) / math.log(2)).toInt | |
} | |
/** | |
* Bloom Filter - a probabilistic data structure to test presence of an element. | |
* | |
* Operations | |
* 1) insert: hash the value k times, updating the bitfield at the index equal to the each hashed value | |
* 2) query: hash the value k times. If there are k collisions, then return true; otherwise false. | |
* | |
* http://en.wikipedia.org/wiki/Bloom_filter | |
* | |
*/ | |
case class BloomFilterMonoid(numHashes: Int, width: Int, seed: Int) extends Monoid[BF]{ | |
val hashes: BFHash = BFHash(numHashes, width, seed) | |
val zero: BF = BFZero(hashes, width) | |
/** | |
* Assume the bloom filters are compatible (same width and same hashing functions). This | |
* is the union of the 2 bloom filters. | |
*/ | |
def plus(left: BF, right: BF): BF = left ++ right | |
/** | |
* Create a bloom filter with one item. | |
*/ | |
def create(item: String): BF = BFItem(item, hashes, width) | |
/** | |
* Create a bloom filter with multiple items. | |
*/ | |
def create(data: String*): BF = { | |
data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) } | |
} | |
} | |
/** | |
* Bloom Filter data structure | |
*/ | |
sealed abstract class BF extends java.io.Serializable{ | |
val numHashes: Int | |
val width: Int | |
def ++ (other: BF): BF | |
def + (other: String): BF | |
def contains(item: String): ApproximateBoolean | |
// Estimates the cardinality of the set of elements that have been | |
// inserted into the Bloom Filter. | |
def size: Approximate[Long] | |
} | |
/** | |
* Empty bloom filter. | |
*/ | |
case class BFZero(hashes: BFHash, width: Int) extends BF { | |
lazy val numHashes: Int = hashes.size | |
def ++(other: BF) = other | |
def + (other: String) = BFItem(other, hashes, width) | |
def contains(item: String) = ApproximateBoolean.exactFalse | |
def size = Approximate.exact[Long](0) | |
} | |
/** | |
* Bloom Filter with 1 value. | |
*/ | |
case class BFItem(item: String, hashes: BFHash, width: Int) extends BF { | |
lazy val numHashes: Int = hashes.size | |
def ++(other: BF): BF = { | |
other match { | |
case bf@BFZero(_,_) => this | |
case bf@BFItem(otherItem,_,_) => BFSparse(hashes,RichCBitSet(hashes(item) : _*),width) + otherItem | |
case _ => other + item | |
} | |
} | |
def + (other: String) = this ++ BFItem(other, hashes, width) | |
def contains(x: String) = ApproximateBoolean.exact(item == x) | |
def size = Approximate.exact[Long](1) | |
} | |
case class BFSparse(hashes : BFHash, bits : CBitSet, width : Int) extends BF { | |
import RichCBitSet._ | |
lazy val numHashes: Int = hashes.size | |
lazy val dense : BFInstance = BFInstance(hashes, bits.toBitSet(width), width) | |
def ++ (other: BF): BF = { | |
require(this.width == other.width) | |
require(this.numHashes == other.numHashes) | |
other match { | |
case bf@BFZero(_,_) => this | |
case bf@BFItem(item,_,_) => this + item | |
case bf@BFSparse(_,otherBits,_) => { | |
// assume same hashes used | |
BFSparse(hashes, | |
bits ++ otherBits, | |
width) | |
} | |
case _ => other ++ this | |
} | |
} | |
def + (item: String): BF = { | |
val bitsToActivate = RichCBitSet(hashes(item) : _*) | |
BFSparse(hashes, | |
bits ++ bitsToActivate, | |
width) | |
} | |
def contains(item: String): ApproximateBoolean = dense.contains(item) | |
def size: Approximate[Long] = dense.size | |
} | |
/* | |
* Bloom filter with multiple values | |
*/ | |
case class BFInstance(hashes : BFHash, bits: BitSet, width: Int) extends BF { | |
lazy val numHashes: Int = hashes.size | |
lazy val numBits: Int = bits.size | |
def ++ (other: BF) = { | |
require(this.width == other.width) | |
require(this.numHashes == other.numHashes) | |
other match { | |
case bf@BFZero(_,_) => this | |
case bf@BFItem(item,_,_) => this + item | |
case bf@BFSparse(_,_,_) => this ++ bf.dense | |
case bf@BFInstance(_,otherBits,_) => { | |
// assume same hashes used | |
BFInstance(hashes, | |
bits ++ otherBits, | |
width) | |
} | |
} | |
} | |
def + (item: String): BFInstance = { | |
val bitsToActivate = BitSet(hashes(item) : _*) | |
BFInstance(hashes, | |
bits ++ bitsToActivate, | |
width) | |
} | |
def bitSetContains(bs : BitSet, il : Int*) : Boolean = { | |
il.map{i : Int => bs.contains(i)}.reduce{_&&_} | |
} | |
def contains(item: String) = { | |
if (bitSetContains(bits, hashes(item) : _*)) { | |
// The false positive probability (the probability that the Bloom filter erroneously | |
// claims that an element x is in the set when x is not) is roughly | |
// p = (1 - e^(-numHashes * setCardinality / width))^numHashes | |
// See: http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives | |
// | |
// However, the true set cardinality may not be known. From empirical evidence, though, | |
// it is upper bounded with high probability by 1.1 * estimatedCardinality (as long as the | |
// Bloom filter is not too full), so we plug this into the formula instead. | |
// TODO: investigate this upper bound and density more closely (or derive a better formula). | |
val fpProb = | |
if (density > 0.95) 1.0 // No confidence in the upper bound on cardinality. | |
else scala.math.pow(1 - scala.math.exp(-numHashes * size.estimate * 1.1 / width), numHashes) | |
ApproximateBoolean(true, 1 - fpProb) | |
} else { | |
// False negatives are not possible. | |
ApproximateBoolean.exactFalse | |
} | |
} | |
// Proportion of bits that are set to true. | |
def density = numBits.toDouble / width | |
/** | |
* Cardinality estimates are taken from Theorem 1 on page 15 of | |
* "Cardinality estimation and dynamic length adaptation for Bloom filters" | |
* by Papapetrou, Siberski, and Nejdl: | |
* http://www.softnet.tuc.gr/~papapetrou/publications/Bloomfilters-DAPD.pdf | |
* | |
* Roughly, by using bounds on the expected number of true bits after n elements | |
* have been inserted into the Bloom filter, we can go from the actual number of | |
* true bits (which is known) to an estimate of the cardinality. | |
* | |
* approximationWidth defines an interval around the maximum-likelihood cardinality | |
* estimate. Namely, the approximation returned is of the form | |
* (min, estimate, max) = | |
* ((1 - approxWidth) * estimate, estimate, (1 + approxWidth) * estimate) | |
*/ | |
lazy val size : Approximate[Long] = size(approximationWidth = 0.05) | |
def size(approximationWidth : Double = 0.05) : Approximate[Long] = { | |
assert(0 <= approximationWidth && approximationWidth < 1, "approximationWidth must lie in [0, 1)") | |
// Variable names correspond to those used in the paper. | |
val t = numBits | |
val n = sInverse(t).round.toInt | |
// Take the min and max because the probability formula assumes | |
// nl <= sInverse(t - 1) and sInverse(t + 1) <= nr | |
val nl = scala.math.min(sInverse(t - 1).floor, (1 - approximationWidth) * n).toInt | |
val nr = scala.math.max(sInverse(t + 1).ceil, (1 + approximationWidth) * n).toInt | |
val prob = | |
1 - | |
scala.math.exp(t - 1 - s(nl)) * | |
scala.math.pow(s(nl) / (t - 1), t - 1) - | |
scala.math.exp(-scala.math.pow(t + 1 - s(nr), 2) / (2 * s(nr))) | |
Approximate[Long](nl, n, nr, scala.math.max(0, prob)) | |
} | |
/** | |
* s(n) is the expected number of bits that have been set to true after | |
* n elements have been inserted into the Bloom filter. | |
* This is \hat{S}(n) in the cardinality estimation paper used above. | |
*/ | |
private def s(n : Int) : Double = { | |
width * (1 - scala.math.pow(1 - 1.0 / width, numHashes * n)) | |
} | |
/** | |
* sInverse(t) is the maximum likelihood value for the number of elements | |
* that have been inserted into the Bloom filter when it has t bits set to true. | |
* This is \hat{S}^{-1}(t) in the cardinality estimation paper used above. | |
*/ | |
private def sInverse(t : Int) : Double = { | |
scala.math.log(1 - t.toDouble / width) / (numHashes * scala.math.log(1 - 1.0 / width)) | |
} | |
} | |
object BFInstance{ | |
def apply(hashes: BFHash, width: Int): BFInstance = | |
BFInstance(hashes, BitSet.empty, width) | |
} | |
case class BFHash(numHashes: Int, width: Int, seed: Long = 0L) extends Function1[String, Iterable[Int]]{ | |
val size = numHashes | |
def apply(s: String) = nextHash(s.getBytes, numHashes) | |
private def splitLong(x: Long) = { | |
val upper = math.abs(x >> 32).toInt | |
val lower = math.abs((x << 32) >> 32).toInt | |
(upper, lower) | |
} | |
private def nextHash(bytes: Array[Byte], k: Int, digested: Seq[Int] = Seq.empty): Stream[Int] = { | |
if(k == 0) | |
Stream.empty | |
else{ | |
val d = if(digested.isEmpty){ | |
val (a, b) = MurmurHash128(k)(bytes) | |
val (x1, x2) = splitLong(a) | |
val (x3, x4) = splitLong(b) | |
Seq(x1, x2, x3, x4) | |
}else | |
digested | |
Stream.cons(d(0) % width, nextHash(bytes, k - 1, d.drop(1))) | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment