Last active
January 3, 2016 11:06
-
-
Save EtaCassiopeia/a8c5107e8bcc8cd5ec66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.atomic.AtomicInteger | |
import com.couchbase.client.java.document.JsonLongDocument | |
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment | |
import com.couchbase.client.java.{Cluster, CouchbaseCluster} | |
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} | |
import ir.sahab.sepah.lib.resources.SepahStreamConst | |
import org.apache.logging.log4j.{LogManager, Logger} | |
import rx.functions.Func1 | |
import rx.{Observable, Observer} | |
import scala.collection.JavaConversions._ | |
import scalaz.concurrent.Task | |
import scalaz.stream.{Process, async} | |
/**
 * <h1>CouchbaseAsyncSink</h1>
 * Queues counter-increment requests and applies them to Couchbase asynchronously, in batches.
 *
 * Requests are buffered in an unbounded scalaz-stream queue. Every time the queue size changes and
 * at least `batchSize` requests are waiting, exactly one batch of `batchSize` requests is dequeued
 * and flushed to the bucket; the Couchbase writes of that batch then complete asynchronously.
 *
 * NOTE(review): requests that never add up to a full batch stay in the queue — nothing in this
 * class flushes a trailing partial batch.
 *
 * @param batchSize     number of queued requests that triggers (and makes up) one flush
 * @param alert         callback invoked with (document id, counter value returned by Couchbase)
 *                      for every successful increment
 * @param createCluster by-name factory for the Couchbase cluster; only evaluated on first use
 *
 * @author Mohsen Zainalpour
 * @version 1.0
 * @since 10/10/15
 */
class CouchbaseAsyncSink(batchSize: Int = CouchbaseAsyncSink.defaultBatchSize)(alert: (String, Long) => Unit)(createCluster: => Cluster) extends Serializable {

  // Number of flushed batches whose Couchbase operations have not yet completed or failed.
  private val inFlightBatches = new AtomicInteger()

  // `createCluster` is by-name, so the cluster is built only when it is first needed.
  private lazy val cluster = createCluster

  // Lazy so that no bucket is opened until the first flush (or shutdown) touches the cluster.
  private lazy val bucket = cluster.openBucket("default", "").async()

  private val queue = async.unboundedQueue[(String, Long)]

  /**
   * Emits the current queue size every time it changes.
   */
  private val eventStream: Process[Task, Int] = queue.size.discrete

  // On each size change, if a full batch is waiting, dequeue exactly one batch and flush it.
  // `run.run` blocks this dispatch stream only until the batch has been handed to `flush`;
  // the Couchbase writes themselves are asynchronous. Failures of the dispatch stream are logged
  // instead of being silently discarded, since they stop all further flushing.
  eventStream.map { size =>
    if (size >= batchSize)
      queue.dequeueBatch(batchSize).map(batch => flush(batch)).once.run.run
  }.run.runAsync(result =>
    result.fold(error => CouchbaseAsyncSink.logger.error("Couchbase sink dispatch stream failed", error), _ => ()))

  /**
   * Adds an increment request to the operations queue.
   *
   * @param key key name
   * @param by  amount to increment the counter by
   */
  def inc(key: String, by: Long): Unit = {
    queue.enqueueOne((key, by)).run
  }

  /**
   * Adds several increment requests to the operations queue.
   *
   * @param seq (key, amount) pairs, enqueued in order
   */
  def incAll(seq: Seq[(String, Long)]): Unit = {
    queue.enqueueAll(seq).run
  }

  /**
   * Flushes one batch of increment requests to Couchbase.
   * Runs asynchronously: `alert` is invoked as each result comes back, and the in-flight counter
   * is released when the batch completes or fails.
   *
   * @param cache the batch of (key, amount) requests to apply
   */
  private def flush(cache: Seq[(String, Long)]): Unit = {
    if (cache.nonEmpty) {
      inFlightBatches.incrementAndGet()
      Observable.from(cache)
        .flatMap(new Func1[(String, Long), Observable[JsonLongDocument]] {
          override def call(record: (String, Long)): Observable[JsonLongDocument] = {
            val (key, by) = record
            // delta = requested amount; a missing counter is created with the value it would have
            // after applying `by` to 0 (never negative).
            bucket.counter(key, by, math.max(by, 0L))
          }
        })
        // Subscribing with the observer itself (rather than doOnEach + bare subscribe()) means
        // errors are handled here instead of resurfacing as OnErrorNotImplementedException.
        // The writes complete asynchronously; blocking here (e.g. .toBlocking().last()) would
        // guarantee completion but is much slower.
        .subscribe(new Observer[JsonLongDocument] {
          override def onCompleted(): Unit = {
            inFlightBatches.decrementAndGet()
          }

          override def onError(throwable: Throwable): Unit = {
            // Release the batch on failure too, otherwise isTerminated() can never become true.
            inFlightBatches.decrementAndGet()
            CouchbaseAsyncSink.logger.error("Couchbase batch flush failed", throwable)
          }

          override def onNext(t: JsonLongDocument): Unit = {
            alert(t.id(), t.content())
          }
        })
    }
  }

  /**
   * Disconnects the underlying cluster (creating it first if it was never used).
   *
   * NOTE(review): queued requests are not drained first, and the CouchbaseEnvironment the cluster
   * was created with is not shut down here — confirm whether the caller is responsible for that.
   *
   * @return the result of `cluster.disconnect()`
   */
  def shutdown(): Boolean = {
    cluster.disconnect()
  }

  /**
   * @return true when no flushed batch still has Couchbase operations outstanding; requests still
   *         waiting in the queue are not taken into account
   */
  def isTerminated(): Boolean = {
    inFlightBatches.get() == 0
  }
}
object CouchbaseAsyncSink {

  /**
   * The shared sink created by the first `apply(config, alert)` call, if any.
   */
  var instance: Option[CouchbaseAsyncSink] = None

  private val logger: Logger = LogManager.getLogger(classOf[CouchbaseAsyncSink])

  val defaultConnectTimeOut = 8000
  val defaultRequestBufferSize = 1048576
  val defaultKeepAliveInterval = TimeUnit.HOURS.toMillis(1)
  val defaultBatchSize = 2000

  val SEPAH_COUCHBASE_CONNECT_TIMEOUT: String = "spark.couchbase.connect.timeout"
  val SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE: String = "spark.couchbase.request.buffer.size"
  // NOTE(review): "ainterval" looks like a typo, but it is the key existing configs rely on.
  val SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL: String = "spark.couchbase.keep.alive.ainterval"
  val SEPAH_COUCHBASE_BATCH_SIZE: String = "spark.couchbase.batch.size"

  /**
   * Returns the process-wide sink, creating it on the first call.
   *
   * Settings missing from `config` fall back to the defaults above. Later calls return the existing
   * instance; their `config` and `alert` are not applied to it.
   *
   * @param config configuration; must contain `SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST`
   *               (comma-separated host list)
   * @param alert  callback passed to a newly created sink
   * @return the shared sink
   */
  def apply(config: Config, alert: (String, Long) => Unit): CouchbaseAsyncSink = {
    val effectiveConfig = config.withFallback {
      ConfigFactory.empty()
        .withValue(SEPAH_COUCHBASE_CONNECT_TIMEOUT, ConfigValueFactory.fromAnyRef(defaultConnectTimeOut))
        .withValue(SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE, ConfigValueFactory.fromAnyRef(defaultRequestBufferSize))
        .withValue(SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL, ConfigValueFactory.fromAnyRef(defaultKeepAliveInterval))
        .withValue(SEPAH_COUCHBASE_BATCH_SIZE, ConfigValueFactory.fromAnyRef(defaultBatchSize))
    }

    // Settings are read eagerly so that missing or malformed keys still fail here, at the call site.
    val hosts = effectiveConfig.getString(SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST)
      .split(",").map(_.trim).filter(_.nonEmpty).toList
    val connectTimeout = effectiveConfig.getLong(SEPAH_COUCHBASE_CONNECT_TIMEOUT)
    val keepAliveInterval = effectiveConfig.getLong(SEPAH_COUCHBASE_KEEP_ALIVE_INTERVAL)
    val requestBufferSize = effectiveConfig.getInt(SEPAH_COUCHBASE_REQUEST_BUFFER_SIZE)

    // A `def`, not a `val`: building the environment and cluster is deferred until a newly created
    // sink actually needs them, so calls that return the existing instance no longer leak a fresh
    // CouchbaseEnvironment/CouchbaseCluster each time.
    def createCluster(): Cluster = {
      val env = DefaultCouchbaseEnvironment.builder()
        .connectTimeout(connectTimeout)
        .keepAliveInterval(keepAliveInterval)
        // to prevent BackpressureException on heavy load; bufferSize must be a power of 2 (default is: 2^14, now is: 2^20)
        .requestBufferSize(requestBufferSize)
        .build()
      CouchbaseCluster.create(env, hosts)
    }

    // Singleton: create the sink only once, under the object's lock.
    synchronized {
      instance.getOrElse {
        val created = new CouchbaseAsyncSink(effectiveConfig.getInt(SEPAH_COUCHBASE_BATCH_SIZE))(alert)(createCluster())
        instance = Some(created)
        created
      }
    }
    // For a non-singleton variant: new CouchbaseAsyncSink(...)(alert)(createCluster())
  }

  /**
   * Test-suite convenience: same as `apply(config, alert)` with a hard-coded Couchbase host.
   *
   * @param alert callback passed to a newly created sink
   * @return the shared sink
   */
  def apply(alert: (String, Long) => Unit): CouchbaseAsyncSink = {
    val map: Map[String, String] = Map(SepahStreamConst.SEPAH_COUCHBASE_CONNECT_IP_LIST -> "192.168.1.44")
    val config = ConfigFactory.parseMap(map)
    apply(config, alert)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment