Last active
August 22, 2017 00:39
-
-
Save quelgar/03e9f7da8a3075a69c9580791a309dd3 to your computer and use it in GitHub Desktop.
Testing accepting TCP connections with Monix NIO.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package monix.rsocket.tcp | |
import java.nio.charset.StandardCharsets | |
import monix.eval.{Callback, Task} | |
import monix.reactive.Observable | |
import monix.nio.tcp._ | |
import monix.execution.Scheduler.Implicits.global | |
/** | |
* Test TCP server that keeps accepting more client connections. | |
* | |
* Two problems: | |
* | |
* 1) Can't figure out how to shutdown the accept loop cleanly. | |
* Any attempt to close the `TaskServerSocketChannel` when it's suspended in the `accept` call results in a | |
* `java.nio.channels.AsynchronousCloseException`. The program never exits when main finishes. | |
* On my Mac, I end up with two running threads in `KQueue.keventPoll`. | |
* | |
* 2) Current code runs the actual work synchronously within the accept loop. Need to figure out the best way | |
* to run it asynchronously so we don't prevent accepting new connections. | |
*/ | |
object MonixNioTest { | |
def main(args: Array[String]): Unit = { | |
val serverTask = asyncServer(java.net.InetAddress.getByName(null).getHostName, 9001) | |
def accept(server: TaskServerSocketChannel): Task[TaskServerSocketChannel] = { | |
for { | |
socket <- { | |
println("Accept") | |
server.accept() | |
} | |
clientChannel = { | |
println(s"Accepted socket: $socket") | |
readWriteAsync(socket) | |
} | |
reader <- clientChannel.tcpObservable | |
writer <- clientChannel.tcpConsumer | |
length <- reader.map { | |
array => | |
('['.toByte +: array :+ ':'.toByte) ++ (array.reverse ++ "] ".getBytes(StandardCharsets.UTF_8)) | |
}.doOnTerminateEval(_ => clientChannel.stopWriting()).consumeWith(writer) | |
_ <- accept(server) | |
} yield { | |
println(s"Wrote $length bytes") | |
server | |
} | |
} | |
def serverProgram = serverTask.flatMap { | |
server => | |
accept(server).doOnCancel(Task.defer(server.close())) | |
} | |
def clientProgram = { | |
val client = readWriteAsync("localhost", 9001, 256 * 1024) | |
for { | |
writer <- client.tcpConsumer | |
reader <- client.tcpObservable | |
written <- Observable.range(1000L, 1005L).map(i => i.toString.getBytes(StandardCharsets.UTF_8)).consumeWith(writer) | |
_ <- client.stopWriting().doOnFinish(x => Task.now(println(s"Client stop writing done: $x"))) | |
_ <- reader.doOnTerminateEval { | |
x => | |
println(s"Reader terminated: $x") | |
client.close() | |
}.foreachL { | |
array => | |
println(s"client got: ${new String(array, StandardCharsets.UTF_8)}") | |
} | |
_ <- client.close().doOnFinish(x => Task.now(println(s"Client close done: $x"))) | |
} yield { | |
written | |
} | |
} | |
val serverFuture = serverProgram.runAsync(new Callback[TaskServerSocketChannel] { | |
override def onSuccess(value: TaskServerSocketChannel): Unit = { | |
println(s"Server program done") | |
} | |
override def onError(ex: Throwable): Unit = { | |
println("Server error") | |
ex.printStackTrace() | |
} | |
}) | |
sys.addShutdownHook { | |
println("Shutdown hook running") | |
serverFuture.cancel() | |
} | |
clientProgram.zipMap(clientProgram)(_ + _).runAsync(new Callback[Long] { | |
override def onError(ex: Throwable): Unit = { | |
ex.printStackTrace() | |
} | |
override def onSuccess(value: Long): Unit = { | |
println(s"Client wrote $value bytes") | |
} | |
}) | |
while (scala.io.StdIn.readLine("Q to quit:").toUpperCase() != "Q") { | |
} | |
serverFuture.cancel() // is there a better way? | |
println("Main done!") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment