Skip to content

Instantly share code, notes, and snippets.

@mccv
Created May 14, 2011 15:55
Show Gist options
  • Save mccv/972337 to your computer and use it in GitHub Desktop.
Save mccv/972337 to your computer and use it in GitHub Desktop.
FinagleHands
import com.twitter.concurrent.{ChannelSource, Channel}
import com.twitter.util.Future
/**
 * A channel source that keeps itself attached to an underlying channel.
 *
 * Every message received from the underlying channel is re-sent to this
 * channel's own observers via `send`. When the underlying channel's `closes`
 * future completes, `f` is evaluated again and the resulting channel is
 * subscribed to in the same way.
 *
 * @tparam A the message type carried by both the underlying and this channel
 * @param f by-name channel factory; evaluated once per (re)connection
 */
class ReliableChannel[A](f: => Channel[A]) extends ChannelSource[A] {

  /**
   * Subscribes to `that`, forwarding its messages to this channel, and
   * arranges for a fresh channel from `f` to be connected once `that` closes.
   */
  private[this] def connect(that: Channel[A]): Unit = {
    that.respond { msg =>
      // Join the futures returned by `send` into the single future handed
      // back to the underlying channel.
      Future.join(this.send(msg))
    }
    that.closes ensure {
      // Re-evaluate the factory and subscribe again; this goes through
      // `serialized` just like the initial connection below.
      this.serialized {
        connect(f)
      }
    }
  }

  this.serialized {
    connect(f)
  }
}
// One ReliableChannel per configured kestrel host. Each host gets its own
// client factory (Kestrel codec, connection limit of 1); the ReliableChannel
// is intended to reconnect when its connection is dropped.
val channels = config.kestrelHosts.map { host =>
  val factory =
    ClientBuilder()
      .codec(new Kestrel)
      .hosts(host)
      .hostConnectionLimit(1)
      .buildFactory()

  // The argument is by-name: a new KestrelClient is created and a new read
  // channel opened each time ReliableChannel evaluates it.
  new ReliableChannel(
    KestrelClient(factory).from(config.queueName, config.pollMs.milliseconds)
  )
}
// Merges all the reliable channels into a single channel. The combined
// channel should be reliable given that all of its mergees are reliable.
//
// The fold starts from an empty ChannelSource, so an empty host list still
// yields a valid channel (one that simply never receives a message from the
// seed). The accumulator is widened to Channel[ChannelBuffer] because that is
// what `merge` returns.
val combinedChannels =
  channels.foldLeft[Channel[ChannelBuffer]](new ChannelSource[ChannelBuffer]) {
    (merged, channel) => merged.merge(channel)
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment