Last active
December 20, 2015 21:39
-
-
Save pchlupacek/6199638 to your computer and use it in GitHub Desktop.
An attempt to integrate a Process with the outside world in a non-blocking way
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spinoco.util.streams | |
import java.util.concurrent.{LinkedBlockingQueue, ConcurrentLinkedQueue} | |
import scalaz.stream.Process | |
import scalaz.stream.processes._ | |
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} | |
import scalaz.concurrent.Task | |
import scalaz.\/ | |
import scalaz.syntax.id._ | |
import java.util | |
/** | |
* | |
* User: pach | |
* Date: 8/9/13 | |
* Time: 5:53 PM | |
* (c) 2011-2013 Spinoco Czech Republic, a.s. | |
*/ | |
/**
 * Asynchronous queue that allows elements to be pushed asynchronously to a Process. Items can be
 * taken from the queue only by the accompanying `Process[Task,A]`.
 *
 * The source process does not block on the queue when the queue is empty, but it will process
 * any elements already present in the queue.
 *
 * If the queue is empty, the process registers a callback that will eventually perform the
 * next step of the process once a new `A` appears in the queue.
 *
 *
 * Also please note that once the source is stopped, the queue rejects all enqueue operations.
 *
 *
 */
trait ProcessQueue[A] {

  /*
   * Correctness of this implementation relies on the accompanying process never
   * reading from the queue on two threads at the same time.
   */

  import Task._

  /** Underlying storage for elements that were enqueued but not yet handed to the process. */
  private[streams] val q: util.Queue[A]

  /** Callback registered by the source process while it waits for the next element, if any. */
  private[streams] val cb = new AtomicReference[Option[A => Unit]](None)

  /** Counter tracking the estimated number of elements held in `q`. */
  private[streams] val qs = new AtomicInteger(0)

  // flipped to true by the release action of the source process
  @volatile private[streams] var processTerminated = false

  /**
   * Estimated size of the queue, read from a separate counter.
   *
   * The counter is updated separately from the queue itself, so the reported value may
   * briefly disagree with the real number of queued elements.
   *
   * @return estimated number of elements waiting in the queue
   */
  def size: Int = qs.get()

  /**
   * Tells whether the underlying process has terminated. Once this returns true,
   * [[com.spinoco.util.streams.ProcessQueue.enqueue]] only returns failures.
   *
   * @return true when the source process has been released
   */
  def terminated: Boolean = processTerminated

  /**
   * Offers `a` to the queue. May be called from multiple threads.
   *
   * Producers that outpace the process can watch
   * [[com.spinoco.util.streams.ProcessQueue.size]], or the size returned from this method,
   * and throttle themselves accordingly.
   *
   * @param a item to enqueue
   * @return the estimated queue size after the insertion on the right, or on the left an
   *         `IllegalStateException` when the process already terminated or the queue
   *         refused the element
   */
  def enqueue(a: A): Throwable \/ Int =
    if (processTerminated) {
      new IllegalStateException("Process already terminated").left
    } else if (!addToQueue(a)) {
      new IllegalStateException("Maximum size of the queue exceeded").left
    } else {
      // the element is always offered to `q` first (above) so that ordering is preserved;
      // only then do we try to hand it over to a waiting process
      qs.incrementAndGet()
      tryProcess
      qs.get.right
    }

  /**
   * Inserts `a` into the underlying queue without throwing.
   *
   * @param a item to insert
   * @return true if the element was accepted, false otherwise
   */
  private[streams] def addToQueue(a: A): Boolean

  /**
   * If an element is queued and the process left a callback behind, claims the callback
   * and feeds it the head of the queue. Does nothing when no callback is registered;
   * in that case the process will pick the element up from the queue by itself.
   */
  private def tryProcess: Unit = {
    def attempt(): Unit =
      if (q.peek() != null) {
        cb.getAndSet(None) match {
          case None =>
            () // nobody is waiting, the process has to take the element from the queue
          case claimed @ Some(deliver) =>
            q.poll() match {
              case null =>
                // rare but legal: the element we peeked at is already gone,
                // so hand the callback back and try once more
                cb.set(claimed)
                attempt()
              case head =>
                deliver(head)
                qs.decrementAndGet()
            }
        }
      }

    attempt()
  }

  /**
   * Builds the process that emits the `A`s pushed into this queue.
   *
   * Each step registers a callback in `cb` and, if an element is already queued and the
   * callback is still ours, takes that element immediately. The release action marks
   * this queue as terminated.
   *
   * @return source process backed by this queue
   */
  private[streams] def makeSource: Process[Task, A] = {
    // release action: from now on `enqueue` refuses new elements
    val markTerminated: Unit => Task[Unit] = _ => delay {
      processTerminated = true
    }

    // single step of the process: asynchronously obtain the next element
    val awaitNext: Unit => Task[A] = _ =>
      async { (complete: Throwable \/ A => Unit) =>
        val waiting: Option[A => Unit] = Some(elem => complete(elem.right))
        cb.set(waiting)
        // Publish the callback first, then re-check the queue. If an element is there and
        // we manage to take our own callback back, no enqueuing thread can deliver it,
        // so it is up to us to poll it.
        if (q.peek() != null && cb.compareAndSet(waiting, None)) {
          q.poll() match {
            // should not happen while we own the callback; reported instead of ignored
            case null => complete(new RuntimeException("Unexpected state, leaking queue :-)").left)
            case elem =>
              complete(elem.right)
              qs.decrementAndGet()
          }
        }
      }

    resource[Unit, A](now(()))(markTerminated)(awaitNext)
  }
}
object ProcessQueue {

  /**
   * Creates a queue backed by a `ConcurrentLinkedQueue`, together with the process
   * that reads from it.
   *
   * @return the queue to enqueue into, paired with its source process
   */
  def unbounded[A]: (ProcessQueue[A], Process[Task, A]) =
    backedBy(new ConcurrentLinkedQueue[A]())

  /**
   * Creates a queue backed by a `LinkedBlockingQueue` of the given capacity, together
   * with the process that reads from it.
   *
   * @param maxSize capacity passed to the underlying `LinkedBlockingQueue`
   * @return the queue to enqueue into, paired with its source process
   */
  def bounded[A](maxSize: Int): (ProcessQueue[A], Process[Task, A]) =
    backedBy(new LinkedBlockingQueue[A](maxSize))

  // Wraps the given Java queue into a ProcessQueue and pairs it with its source process.
  private def backedBy[A](underlying: util.Queue[A]): (ProcessQueue[A], Process[Task, A]) = {
    val queue = new ProcessQueue[A] {
      private[streams] val q: util.Queue[A] = underlying

      // `offer` reports rejection through its Boolean result
      private[streams] def addToQueue(a: A): Boolean = q.offer(a)
    }
    (queue, queue.makeSource)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment