Skip to content

Instantly share code, notes, and snippets.

@EtaCassiopeia
Last active January 3, 2016 11:06
Show Gist options
  • Save EtaCassiopeia/a8c5107e8bcc8cd5ec66 to your computer and use it in GitHub Desktop.
Save EtaCassiopeia/a8c5107e8bcc8cd5ec66 to your computer and use it in GitHub Desktop.
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import com.couchbase.client.java.document.JsonLongDocument
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment
import com.couchbase.client.java.{Cluster, CouchbaseCluster}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import ir.sahab.sepah.lib.resources.SepahStreamConst
import org.apache.logging.log4j.{LogManager, Logger}
import rx.functions.Func1
import rx.{Observable, Observer}
import scala.collection.JavaConversions._
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}
/**
 * <h1>CouchbaseAsyncSink</h1>
 * Queues counter-increment requests and applies them to Couchbase asynchronously, in batches.
 *
 * Requests are buffered in an unbounded scalaz-stream queue. A background stream, started by the
 * constructor, watches the queue size; whenever at least `batchSize` requests are pending it
 * dequeues one batch and hands it to `flush`, which issues one Couchbase `counter` call per request
 * without blocking the caller.
 *
 * NOTE(review): requests that stay queued while fewer than `batchSize` are pending are not flushed
 * by this class (neither `shutdown` nor `isTerminated` drains them).
 *
 * @param batchSize     number of pending requests that triggers a flush
 * @param alert         callback invoked with (document id, counter value returned by Couchbase) for
 *                      every successful increment
 * @param createCluster by-name Couchbase cluster factory, evaluated on first use of the cluster
 *
 * @author Mohsen Zainalpour
 * @version 1.0
 * @since 10/10/15
 */
class CouchbaseAsyncSink(batchSize: Int = CouchbaseAsyncSink.defaultBatchSize)(alert: (String, Long) => Unit)(createCluster: => Cluster) extends Serializable {

  /** Number of flushes whose Couchbase operations have not terminated yet (completed or failed). */
  private val pendingFlushes = new AtomicInteger()

  // `createCluster` is by-name; this lazy val evaluates it at most once, on first access.
  private lazy val cluster = createCluster

  // Opened lazily on the first flush (or by `shutdown`, via `cluster`).
  private lazy val bucket = cluster.openBucket("default", "").async()

  private val queue = async.unboundedQueue[(String, Long)]

  /** Emits the current queue size every time it changes. */
  private val eventStream: Process[Task, Int] = queue.size.discrete

  // Background dispatcher: on each size change, if a full batch is pending, dequeue exactly one
  // batch and flush it. The dequeue runs to completion on the stream's thread; the Couchbase
  // writes issued by `flush` are asynchronous. Everything this uses is initialized above.
  // NOTE(review): this starts background work from the constructor.
  eventStream.map { size =>
    if (size >= batchSize)
      queue.dequeueBatch(batchSize).map(batch => flush(batch)).once.run.run
  }.run.runAsync(_.fold(
    error => CouchbaseAsyncSink.logger.error("Couchbase flush dispatcher stopped with an error", error),
    _ => ()))

  /**
   * Enqueues one increment request; it is applied once a full batch is pending.
   *
   * @param key counter document id
   * @param by  amount to add to the counter
   */
  def inc(key: String, by: Long): Unit = {
    queue.enqueueOne((key, by)).run
  }

  /**
   * Enqueues several increment requests at once.
   *
   * @param seq (counter document id, amount) pairs
   */
  def incAll(seq: Seq[(String, Long)]): Unit = {
    queue.enqueueAll(seq).run
  }

  /**
   * Sends one batch of increment requests to Couchbase without blocking.
   *
   * Each request becomes one `counter(key, by, by)` call: the counter is incremented by `by`, or
   * created with value `by` when the document does not exist. `alert` is called with every
   * returned document. The in-flight count is decremented when the batch completes OR fails, so
   * `isTerminated` is not stuck after an error.
   * NOTE(review): a negative `by` is also passed as the initial value — confirm the intended
   * behaviour for decrementing a missing counter.
   *
   * @param cache the batch of (key, by) requests to apply
   */
  private def flush(cache: Seq[(String, Long)]): Unit = {
    if (cache.nonEmpty) {
      pendingFlushes.incrementAndGet()
      Observable.from(cache).flatMap(new Func1[(String, Long), Observable[JsonLongDocument]] {
        override def call(record: (String, Long)): Observable[JsonLongDocument] = {
          val (key, by) = record
          bucket.counter(key, by, by)
        }
      }).subscribe(new Observer[JsonLongDocument] {
        override def onNext(doc: JsonLongDocument): Unit = {
          alert(doc.id(), doc.content())
        }

        override def onCompleted(): Unit = {
          pendingFlushes.decrementAndGet()
        }

        // Handling the error here (instead of a bare `subscribe()`) keeps RxJava from rethrowing
        // it as OnErrorNotImplementedException on the I/O thread.
        override def onError(throwable: Throwable): Unit = {
          pendingFlushes.decrementAndGet()
          CouchbaseAsyncSink.logger.error("Couchbase counter flush failed", throwable)
        }
      })
    }
  }

  /**
   * Disconnects the Couchbase cluster.
   *
   * NOTE(review): `cluster` is lazy, so calling this before any flush first creates the cluster.
   * Queued requests and in-flight writes are not awaited.
   *
   * @return the result of `Cluster.disconnect()`
   */
  def shutdown(): Boolean = {
    cluster.disconnect()
  }

  /** @return true when no flushed batch is still waiting for Couchbase to complete or fail */
  def isTerminated(): Boolean = {
    pendingFlushes.get() == 0
  }
}
object CouchbaseAsyncSink {
  /** Shared sink created by the first call to `apply(config, alert)`; later calls return it. */
  var instance: Option[CouchbaseAsyncSink] = None

  private val logger: Logger = LogManager.getLogger(classOf[CouchbaseAsyncSink])

  val defaultConnectTimeOut = 8000
  // Must be a power of 2; raised to 2^20 (default is 2^14) to prevent BackpressureException on heavy load.
  val defaultRequestBufferSize = 1048576
  // Milliseconds (one hour).
  val defaultKeepAliveInterval = TimeUnit.HOURS.toMillis(1)
  val defaultBatchSize = 2000

  val SEPAH_COUCHBASE_CONNECT_TIMEOUT: String = "spark.couchbase.connect.timeout"
  val SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE: String = "spark.couchbase.request.buffer.size"
  // NOTE(review): "ainterval" looks like a typo; kept unchanged so existing configs still match.
  val SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL: String = "spark.couchbase.keep.alive.ainterval"
  val SEPAH_COUCHBASE_BATCH_SIZE: String = "spark.couchbase.batch.size"

  /**
   * Returns the process-wide sink, creating it on the first call.
   *
   * Missing tuning keys fall back to the defaults above; the comma-separated host list is read from
   * `SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST`. Settings are read eagerly (so bad config
   * fails here), but the Couchbase environment and cluster are only built when the sink first uses
   * its cluster — calls that return the existing instance no longer create a throwaway cluster.
   *
   * NOTE(review): once the singleton exists, the `config` and `alert` of later calls are ignored.
   *
   * @param config configuration holding the Couchbase settings
   * @param alert  callback forwarded to the sink
   * @return the shared [[CouchbaseAsyncSink]]
   */
  def apply(config: Config, alert: (String, Long) => Unit): CouchbaseAsyncSink = {
    val effectiveConfig = config.withFallback {
      ConfigFactory.empty()
        .withValue(SEPAH_COUCHBASE_CONNECT_TIMEOUT, ConfigValueFactory.fromAnyRef(defaultConnectTimeOut))
        .withValue(SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE, ConfigValueFactory.fromAnyRef(defaultRequestBufferSize))
        .withValue(SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL, ConfigValueFactory.fromAnyRef(defaultKeepAliveInterval))
        .withValue(SEPAH_COUCHBASE_BATCH_SIZE, ConfigValueFactory.fromAnyRef(defaultBatchSize))
    }

    val hosts: List[String] = effectiveConfig.getString(SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST)
      .split(",").map(_.trim).filter(_.nonEmpty).toList
    val connectTimeout = effectiveConfig.getLong(SEPAH_COUCHBASE_CONNECT_TIMEOUT)
    val keepAliveInterval = effectiveConfig.getLong(SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL)
    val requestBufferSize = effectiveConfig.getInt(SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE)
    val batchSize = effectiveConfig.getInt(SEPAH_COUCHBASE_BATCH_SIZE)

    // A def, not a val: it is passed by-name to the sink, so no environment/cluster is built
    // unless and until the sink actually needs one.
    def createCluster(): Cluster = {
      val env = DefaultCouchbaseEnvironment.builder()
        .connectTimeout(connectTimeout)
        .keepAliveInterval(keepAliveInterval)
        .requestBufferSize(requestBufferSize)
        .build()
      CouchbaseCluster.create(env, hosts)
    }

    synchronized {
      instance.getOrElse {
        val sink = new CouchbaseAsyncSink(batchSize)(alert)(createCluster())
        instance = Some(sink)
        sink
      }
    }
  }

  /**
   * Test-suite helper: returns the shared sink, configured against a fixed Couchbase host.
   *
   * @param alert callback forwarded to the sink
   */
  def apply(alert: (String, Long) => Unit): CouchbaseAsyncSink = {
    val testSettings: Map[String, String] =
      Map(SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST -> "192.168.1.44")
    apply(ConfigFactory.parseMap(testSettings), alert)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment