Skip to content

Instantly share code, notes, and snippets.

@fairjm
Last active August 29, 2015 14:04
Show Gist options
  • Save fairjm/a74d863491812b176e66 to your computer and use it in GitHub Desktop.
Save fairjm/a74d863491812b176e66 to your computer and use it in GitHub Desktop.
package laozhao
import java.util.concurrent.Semaphore
/** Contract for a buffer that can be flushed on demand. */
trait IBuffer {

  /** Flushes the buffer.
    *
    * Declared without a parameter list, so implementations override it
    * and callers invoke it without parentheses.
    */
  def flush: Unit
}
/** A unit of work with two steps, where `second` cannot complete until
  * `first` has run for the same item.
  */
class Item {

  /** Hand-off between the two steps: starts with zero permits, `first`
    * adds one and `second` consumes one.
    */
  val semaphore = new Semaphore(0)

  /** Prints "first" and then releases one permit. */
  def first: Unit = {
    Console.println("first")
    semaphore.release(1)
  }

  /** Blocks until a permit is available, takes it, and then prints "second". */
  def second: Unit = {
    semaphore.acquire(1)
    Console.println("second")
  }
}
/** Buffer that collects items and, when flushed, runs each item's `first`
  * step in the order the items were added.
  */
class FirstBuffer extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // A Vector appends in effectively constant time, so adding an item does
  // not copy the whole backlog the way appending to a List would.
  private var _items: Vector[Item] = Vector.empty

  /** Appends `item` to the pending batch.
    *
    * @param item the item to queue for the next flush
    */
  def add(item: Item): Unit =
    _gate.synchronized {
      _items = _items :+ item
    }

  /** Swaps out the pending batch under the lock, then calls `first` on each
    * drained item in insertion order. The calls happen outside the lock, so
    * concurrent `add` calls are not held up while items are processed.
    */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val batch = _items
      _items = Vector.empty
      batch
    }
    drained.foreach(_.first)
  }
}
/** Buffer that collects items and, when flushed, runs each item's `second`
  * step in the order the items were added.
  */
class SecondBuffer extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // A Vector appends in effectively constant time, so adding an item does
  // not copy the whole backlog the way appending to a List would.
  private var _items: Vector[Item] = Vector.empty

  /** Appends `item` to the pending batch.
    *
    * @param item the item to queue for the next flush
    */
  def add(item: Item): Unit =
    _gate.synchronized {
      _items = _items :+ item
    }

  /** Swaps out the pending batch under the lock, then calls `second` on each
    * drained item in insertion order. The calls happen outside the lock
    * because `second` acquires the item's semaphore and may block until the
    * matching `first` has released it.
    */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val batch = _items
      _items = Vector.empty
      batch
    }
    drained.foreach(_.second)
  }
}
/** Demo entry point: one thread keeps producing items into both buffers
  * while two other threads flush one buffer each.
  */
object Buffer {

  /** Wraps a zero-argument function in a `Runnable` whose `run` invokes it.
    *
    * @param b the function to execute
    * @return a `Runnable` delegating to `b`
    */
  implicit def block2runnable(b: () => Unit): Runnable =
    new Runnable {
      def run(): Unit = b()
    }

  /** Creates a thread running `body` and starts it immediately. */
  private def launch(body: () => Unit): Unit =
    new Thread(block2runnable(body)).start()

  /** Builds a task that flushes `buffer` forever, sleeping `pauseMillis`
    * milliseconds after each flush.
    */
  private def flushLoop(buffer: IBuffer, pauseMillis: Long): () => Unit = () => {
    while (true) {
      buffer.flush
      Thread.sleep(pauseMillis)
    }
  }

  /** Starts the producer and the two flusher threads; none of them terminate. */
  def main(args: Array[String]): Unit = {
    val firstBuffer = new FirstBuffer
    val secondBuffer = new SecondBuffer

    // Producer: every new item is queued in both buffers.
    launch { () =>
      while (true) {
        val produced = new Item
        firstBuffer.add(produced)
        secondBuffer.add(produced)
        Thread.sleep(1)
      }
    }

    launch(flushLoop(firstBuffer, 100L))
    launch(flushLoop(secondBuffer, 100L))
  }
}
package laozhao
import scala.collection.concurrent.TrieMap
/** Contract for a buffer that can be flushed on demand. */
trait IBuffer {

  /** Flushes the buffer.
    *
    * Declared without a parameter list, so implementations override it
    * and callers invoke it without parentheses.
    */
  def flush: Unit
}
/** A unit of work with two steps, each of which only reports itself on
  * standard output. The item does not enforce any ordering between them.
  */
class Item {

  /** Prints this item's string form followed by " : first". */
  def first: Unit = println(s"$this : first")

  /** Prints this item's string form followed by " : second". */
  def second: Unit = println(s"$this : second")
}
/** Buffer that collects items and, when flushed, runs each item's `first`
  * step and then records the item as done in the shared map.
  *
  * @param map shared completion registry, keyed by `item.toString`
  */
class FirstBuffer(val map: TrieMap[String, Boolean]) extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // A Vector appends in effectively constant time, so adding an item does
  // not copy the whole backlog the way appending to a List would.
  private var _items: Vector[Item] = Vector.empty

  /** Appends `item` to the pending batch.
    *
    * @param item the item to queue for the next flush
    */
  def add(item: Item): Unit =
    _gate.synchronized {
      _items = _items :+ item
    }

  /** Swaps out the pending batch under the lock, then processes the drained
    * items in insertion order outside the lock.
    */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val batch = _items
      _items = Vector.empty
      batch
    }
    drained.foreach { item =>
      item.first
      // Mark the item only after `first` has run, so a reader that sees the
      // entry can rely on the first step being finished.
      // NOTE(review): the key is the item's default `toString`, which is
      // presumably derived from its hash code and so not guaranteed unique;
      // confirm whether collisions between live items matter here.
      map.update(item.toString, true)
    }
  }
}
/** Buffer that collects items and, when flushed, runs each item's `second`
  * step once the shared map shows the item as done.
  *
  * @param map shared completion registry, keyed by `item.toString`
  */
class SecondBuffer(val map: TrieMap[String, Boolean]) extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // A Vector appends in effectively constant time, so adding an item does
  // not copy the whole backlog the way appending to a List would.
  private var _items: Vector[Item] = Vector.empty

  /** Appends `item` to the pending batch.
    *
    * @param item the item to queue for the next flush
    */
  def add(item: Item): Unit =
    _gate.synchronized {
      _items = _items :+ item
    }

  /** Swaps out the pending batch under the lock, then repeatedly sweeps it
    * until every drained item has been processed.
    *
    * In each sweep, items whose key maps to `true` get `second` called and
    * their key removed; the rest are kept, in their original relative order,
    * for the next sweep. This method does not return until the map has
    * marked every drained item.
    */
  override def flush: Unit = {
    var remaining = _gate.synchronized {
      val batch = _items
      _items = Vector.empty
      batch
    }
    while (remaining.nonEmpty) {
      // `partition` keeps both halves in input order, so waiting items are
      // not reversed between sweeps.
      val (ready, waiting) =
        remaining.partition(item => map.getOrElse(item.toString, false))
      ready.foreach { item =>
        item.second
        map.remove(item.toString)
      }
      // Nothing was ready: give other threads a chance to run instead of
      // re-scanning immediately in a tight loop.
      if (ready.isEmpty) Thread.`yield`()
      remaining = waiting
    }
  }
}
/** Demo entry point: one thread keeps producing items into both buffers
  * while two other threads flush one buffer each. Both buffers share a
  * single map.
  */
object Buffer {

  /** Wraps a zero-argument function in a `Runnable` whose `run` invokes it.
    *
    * @param b the function to execute
    * @return a `Runnable` delegating to `b`
    */
  implicit def block2runnable(b: () => Unit): Runnable =
    new Runnable {
      def run(): Unit = b()
    }

  /** Creates a thread running `body` and starts it immediately. */
  private def launch(body: () => Unit): Unit =
    new Thread(block2runnable(body)).start()

  /** Builds a task that flushes `buffer` forever, sleeping `pauseMillis`
    * milliseconds after each flush.
    */
  private def flushLoop(buffer: IBuffer, pauseMillis: Long): () => Unit = () => {
    while (true) {
      buffer.flush
      Thread.sleep(pauseMillis)
    }
  }

  /** Starts the producer and the two flusher threads; none of them terminate. */
  def main(args: Array[String]): Unit = {
    val map = new TrieMap[String, Boolean]
    val firstBuffer = new FirstBuffer(map)
    val secondBuffer = new SecondBuffer(map)

    // Producer: every new item is queued in both buffers.
    launch { () =>
      while (true) {
        val produced = new Item
        firstBuffer.add(produced)
        secondBuffer.add(produced)
        Thread.sleep(1)
      }
    }

    launch(flushLoop(firstBuffer, 100L))
    launch(flushLoop(secondBuffer, 100L))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment