Last active
August 29, 2015 13:57
-
-
Save benkolera/9384051 to your computer and use it in GitHub Desktop.
scalaz-stream Exchange.run
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz._ | |
import scalaz.stream._ | |
import scalaz.stream.actor._ | |
object Foo { | |
// I need to create a bidirectional link between a websocket and a tcp socket. | |
// To do the TCP side, I was hoping to use the new NIO stream from scalaz-stream (snapshot-0.4) | |
// The problem that I am hitting, though, is that I will have the requirement | |
// to need to be able to close the socket from outside the stream (because the websocket has | |
// been disconnected.) I can't seem to find a way to do that. :( | |
def foo () = { | |
val q = async.boundedQueue[String]() // This is bytes in the real version | |
val conn = nio.connect(new java.net.InetSocketAddress("localhost",1337)) | |
val out = conn.flatMap{ | |
_.run( | |
q.dequeue |> process1.utf8Encode.map( Bytes.of(_) ) | |
).map( | |
_.decode() | |
).to( | |
websocketSink | |
) | |
} | |
(q,out) | |
} | |
var done = false | |
val websocketSink = { | |
// The real version writes to the websocket and throw Process.End if the websocket | |
// is gone. Closing the connection this way works fine, but it depends on the socket | |
// emitting something to the sink. | |
io.channel((s: String) => Task.delay { if ( done) throw Process.End else println(s) } ) | |
} | |
// NOTE: All of these are just connecting to socat which echos the same string back: | |
// socat -v PIPE:STDOUT TCP4-LISTEN:1337 | |
// > val (q,p) = Foo.foo | |
// > p.run.runAsync( println _ ) | |
// > q.enqueueOne( "foobar" ) // Sends the message into the socket and I get a response | |
// I'd really like to just be able to close the input queue to shutdown the connection: | |
// > q.close.run | |
// But that just shuts down the input but the connection is still open and receiving | |
// (which is as expected given the comment on Exchange.run) | |
// Given that I can't seem to close the process from the input queue inputs, is there | |
// a way to shutdown the entire process / socket from the outside? Notably the cleanup | |
// of the toplevel process does have a cleanup that will release the channel | |
// (https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/nio/package.scala#L98) | |
// but calling p.cleanup.run.run doesn't work to shut the connection. | |
// Strangely enough, this does: | |
// > val (q,p) = Foo.foo | |
// > q.enqueueOne( "foo" ).run | |
// > val s = p.runStep.run | |
// > s.tail.cleanup.run.run | |
// > s.tail.run.run <== Connection gets killed here. Possibly because I broke the | |
// channel and the awaits that run on the next step get messed up. | |
// So there is obviously some way and I'm just a noob. If someone can point out my noobiness that'd be | |
// wonderful. :) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment