-
-
Save fairjm/a74d863491812b176e66 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package laozhao | |
import java.util.concurrent.Semaphore | |
/**
 * Contract for a buffer whose queued work can be flushed on demand.
 */
trait IBuffer {

  /** Flushes the buffer; what flushing does is defined by each implementation. */
  def flush: Unit
}
/**
 * Work item with two steps ordered through a semaphore: `second` cannot
 * proceed until `first` has released a permit.
 */
class Item {

  /** Starts with zero permits; `first` adds one and `second` consumes one. */
  val semaphore: Semaphore = new Semaphore(0)

  /** Prints "first", then releases one permit. */
  def first: Unit = {
    Console.println("first")
    semaphore.release()
  }

  /** Waits for a permit, takes it, then prints "second". */
  def second: Unit = {
    semaphore.acquire()
    Console.println("second")
  }
}
/**
 * Buffer whose `flush` runs the `first` step of every queued item, in the
 * order the items were added.
 */
class FirstBuffer extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // Stored newest-first so that `add` is a constant-time prepend;
  // `flush` reverses the snapshot to restore insertion order.
  private var _items: List[Item] = Nil

  /**
   * Queues an item for the next flush.
   *
   * @param item the item to enqueue
   */
  def add(item: Item): Unit = _gate.synchronized {
    _items = item :: _items
  }

  /**
   * Atomically takes everything queued so far and calls `first` on each
   * item. The calls happen outside the lock, so `add` is not blocked while
   * items are being processed.
   */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val snapshot = _items
      _items = Nil
      snapshot
    }
    drained.reverse.foreach(_.first)
  }
}
/**
 * Buffer whose `flush` runs the `second` step of every queued item, in the
 * order the items were added.
 */
class SecondBuffer extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // Stored newest-first so that `add` is a constant-time prepend;
  // `flush` reverses the snapshot to restore insertion order.
  private var _items: List[Item] = Nil

  /**
   * Queues an item for the next flush.
   *
   * @param item the item to enqueue
   */
  def add(item: Item): Unit = _gate.synchronized {
    _items = item :: _items
  }

  /**
   * Atomically takes everything queued so far and calls `second` on each
   * item. The calls happen outside the lock, so `add` is not blocked while
   * an item's `second` step is still running.
   */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val snapshot = _items
      _items = Nil
      snapshot
    }
    drained.reverse.foreach(_.second)
  }
}
/**
 * Demo entry point: one thread produces items into both buffers while two
 * other threads periodically flush one buffer each.
 */
object Buffer {
  // Enables the implicit conversion below without a compiler feature warning.
  import scala.language.implicitConversions

  /**
   * Adapts a zero-argument function to a `Runnable`.
   *
   * @param b the function to invoke from `run()`
   * @return a `Runnable` that calls `b`
   */
  implicit def block2runnable(b: () => Unit): Runnable = new Runnable {
    override def run(): Unit = b()
  }

  /** Starts a thread that runs `step` forever, sleeping `pauseMillis` ms after each run. */
  private def startLoop(pauseMillis: Long)(step: => Unit): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          step
          Thread.sleep(pauseMillis)
        }
      }
    })
    worker.start()
  }

  /**
   * Starts the producer and the two flusher threads. None of the loops
   * ever exits, so the threads keep running after `main` returns.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val firstBuffer = new FirstBuffer()
    val secondBuffer = new SecondBuffer()

    // Producer: every item is queued in both buffers.
    startLoop(1) {
      val item = new Item
      firstBuffer.add(item)
      secondBuffer.add(item)
    }

    startLoop(100)(firstBuffer.flush)
    startLoop(100)(secondBuffer.flush)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package laozhao | |
import scala.collection.concurrent.TrieMap | |
/**
 * Contract for a buffer whose queued work can be flushed on demand.
 */
trait IBuffer {

  /** Flushes the buffer; what flushing does is defined by each implementation. */
  def flush: Unit
}
/**
 * Work item with two steps; each step prints this item's string form
 * followed by the step name.
 */
class Item {

  /** Prints "<this> : first". */
  def first: Unit = Console.println(s"$this : first")

  /** Prints "<this> : second". */
  def second: Unit = Console.println(s"$this : second")
}
/**
 * Buffer whose `flush` runs the `first` step of every queued item and then
 * records the item in `map` as done.
 *
 * @param map shared map, keyed by `item.toString`, in which an entry is
 *            set to `true` once the item's `first` step has run
 */
class FirstBuffer(val map: TrieMap[String, Boolean]) extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // Stored newest-first so that `add` is a constant-time prepend;
  // `flush` reverses the snapshot to restore insertion order.
  private var _items: List[Item] = Nil

  /**
   * Queues an item for the next flush.
   *
   * @param item the item to enqueue
   */
  def add(item: Item): Unit = _gate.synchronized {
    _items = item :: _items
  }

  /**
   * Atomically takes everything queued so far; for each item, in insertion
   * order, calls `first` and then sets its `map` entry to `true`.
   */
  override def flush: Unit = {
    val drained = _gate.synchronized {
      val snapshot = _items
      _items = Nil
      snapshot
    }
    drained.reverse.foreach { item =>
      item.first
      // Written only after `first` has returned, so a reader that sees
      // `true` knows the first step is complete.
      map.update(item.toString, true)
    }
  }
}
/**
 * Buffer whose `flush` runs the `second` step of each queued item, but only
 * once `map` shows that the item's entry has been set to `true`.
 *
 * @param map shared map keyed by `item.toString`; an entry of `true` means
 *            the item may proceed to its `second` step
 */
class SecondBuffer(val map: TrieMap[String, Boolean]) extends IBuffer {

  /** Monitor guarding every read and write of `_items`. */
  val _gate = new Object

  // Stored newest-first so that `add` is a constant-time prepend;
  // `flush` reverses the snapshot to restore insertion order.
  private var _items: List[Item] = Nil

  /**
   * Queues an item for the next flush.
   *
   * @param item the item to enqueue
   */
  def add(item: Item): Unit = _gate.synchronized {
    _items = item :: _items
  }

  /**
   * Atomically takes everything queued so far, then loops until every taken
   * item has had `second` called. Items whose `map` entry is not yet `true`
   * are retried on the next pass, keeping their insertion order.
   *
   * This method does not return until all taken items have been marked in
   * `map`; between passes it yields the CPU instead of spinning flat out.
   */
  override def flush: Unit = {
    var waiting: List[Item] = _gate.synchronized {
      val snapshot = _items
      _items = Nil
      snapshot.reverse
    }

    while (waiting.nonEmpty) {
      val deferred = List.newBuilder[Item]
      waiting.foreach { item =>
        // NOTE(review): the key is item.toString; unless Item overrides
        // toString this is the identity-hash form, which is not guaranteed
        // unique across instances.
        val key = item.toString
        if (map.getOrElse(key, false)) {
          item.second
          map.remove(key)
        } else {
          deferred += item
        }
      }
      waiting = deferred.result()
      // Some items are not ready yet: let other threads run before retrying.
      if (waiting.nonEmpty) Thread.`yield`()
    }
  }
}
/**
 * Demo entry point: one thread produces items into both buffers while two
 * other threads periodically flush one buffer each. The buffers share one
 * `TrieMap`.
 */
object Buffer {
  // Enables the implicit conversion below without a compiler feature warning.
  import scala.language.implicitConversions

  /**
   * Adapts a zero-argument function to a `Runnable`.
   *
   * @param b the function to invoke from `run()`
   * @return a `Runnable` that calls `b`
   */
  implicit def block2runnable(b: () => Unit): Runnable = new Runnable {
    override def run(): Unit = b()
  }

  /** Starts a thread that runs `step` forever, sleeping `pauseMillis` ms after each run. */
  private def startLoop(pauseMillis: Long)(step: => Unit): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          step
          Thread.sleep(pauseMillis)
        }
      }
    })
    worker.start()
  }

  /**
   * Starts the producer and the two flusher threads. None of the loops
   * ever exits, so the threads keep running after `main` returns.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val map = new TrieMap[String, Boolean]
    val firstBuffer = new FirstBuffer(map)
    val secondBuffer = new SecondBuffer(map)

    // Producer: every item is queued in both buffers.
    startLoop(1) {
      val item = new Item
      firstBuffer.add(item)
      secondBuffer.add(item)
    }

    startLoop(100)(firstBuffer.flush)
    startLoop(100)(secondBuffer.flush)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment