Skip to content

Instantly share code, notes, and snippets.

@benkolera
Last active August 29, 2015 13:57
Show Gist options
  • Save benkolera/9384051 to your computer and use it in GitHub Desktop.
Save benkolera/9384051 to your computer and use it in GitHub Desktop.
scalaz-stream Exchange.run
import scalaz._
import scalaz.stream._
import scalaz.stream.actor._
object Foo {
// I need to create a bidirectional link between a websocket and a tcp socket.
// To do the TCP side, I was hoping to use the new NIO stream from scalaz-stream (snapshot-0.4)
// The problem that I am hitting, though, is that I will have the requirement
// to need to be able to close the socket from outside the stream (because the websocket has
// been disconnected.) I can't seem to find a way to do that. :(
def foo () = {
val q = async.boundedQueue[String]() // This is bytes in the real version
val conn = nio.connect(new java.net.InetSocketAddress("localhost",1337))
val out = conn.flatMap{
_.run(
q.dequeue |> process1.utf8Encode.map( Bytes.of(_) )
).map(
_.decode()
).to(
websocketSink
)
}
(q,out)
}
var done = false
val websocketSink = {
// The real version writes to the websocket and throw Process.End if the websocket
// is gone. Closing the connection this way works fine, but it depends on the socket
// emitting something to the sink.
io.channel((s: String) => Task.delay { if ( done) throw Process.End else println(s) } )
}
// NOTE: All of these are just connecting to socat which echos the same string back:
// socat -v PIPE:STDOUT TCP4-LISTEN:1337
// > val (q,p) = Foo.foo
// > p.run.runAsync( println _ )
// > q.enqueueOne( "foobar" ) // Sends the message into the socket and I get a response
// I'd really like to just be able to close the input queue to shutdown the connection:
// > q.close.run
// But that just shuts down the input but the connection is still open and receiving
// (which is as expected given the comment on Exchange.run)
// Given that I can't seem to close the process from the input queue inputs, is there
// a way to shutdown the entire process / socket from the outside? Notably the cleanup
// of the toplevel process does have a cleanup that will release the channel
// (https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/nio/package.scala#L98)
// but calling p.cleanup.run.run doesn't work to shut the connection.
// Strangely enough, this does:
// > val (q,p) = Foo.foo
// > q.enqueueOne( "foo" ).run
// > val s = p.runStep.run
// > s.tail.cleanup.run.run
// > s.tail.run.run <== Connection gets killed here. Possibly because I broke the
// channel and the awaits that run on the next step get messed up.
// So there is obviously some way and I'm just a noob. If someone can point out my noobiness that'd be
// wonderful. :)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment