Last active
July 15, 2016 19:16
-
-
Save spockz/ebf0ccb47c912a21b924efc45d120859 to your computer and use it in GitHub Desktop.
Netty3 version of HttpProxyConnectHandler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.finagle.netty3.proxy | |
import com.twitter.finagle._ | |
import com.twitter.finagle.client.Transporter | |
import com.twitter.finagle.client.Transporter.Credentials | |
import com.twitter.io.Charsets | |
import com.twitter.util.Base64StringEncoder | |
import java.net.{InetSocketAddress, SocketAddress} | |
import java.util.concurrent.atomic.AtomicReference | |
import org.jboss.netty.channel._ | |
import org.jboss.netty.handler.codec.http._ | |
import org.jboss.netty.handler.queue.BufferedWriteHandler | |
/**
 * An internal Netty 3 handler that delays satisfaction of the original connect future until the
 * remote HTTP proxy server is ready to proxy traffic to an ultimate destination represented as
 * `host` (i.e., the HTTP proxy CONNECT procedure is successful).
 *
 * This enables "Tunneling TCP-based protocols (i.e., TLS/SSL) through Web proxy servers" [1] and
 * may be used with any TCP traffic, not only HTTP(S). See Squid documentation on this feature [2].
 *
 * The handshake is carried out in three steps:
 *
 *  1. `connectRequested` remembers the caller's connect future and forwards the connect with a
 *     fresh ("wrapped") future, so the caller is not notified when the raw TCP connect completes.
 *  1. `channelConnected` installs an HTTP client codec in front of this handler and sends an
 *     HTTP `CONNECT` request (with optional `Proxy-Authorization`) downstream.
 *  1. `messageReceived` inspects the proxy's response: on `200 OK` the codec and this handler are
 *     removed from the pipeline and the caller's connect future is satisfied; on any other status
 *     the connect future is failed and the channel is closed.
 *
 * @note This extends Netty 3's `BufferedWriteHandler`, so writes requested through this handler
 *       are queued by the superclass; they are flushed explicitly once the handshake succeeds.
 *       The `CONNECT` request itself is sent with `ctx.sendDownstream` from this handler's
 *       context rather than through this handler's own write path.
 *
 * @note Netty 4 ships its own `HttpProxyHandler` [3], which works in the opposite direction: the
 *       address passed to `Channel.connect` is the ultimate target and the handler replaces it
 *       with the proxy address. This handler instead expects the channel to be connected to the
 *       proxy and takes the ultimate destination as `host`.
 *
 * [1]: http://www.web-cache.com/Writings/Internet-Drafts/draft-luotonen-web-proxy-tunneling-01.txt
 * [2]: http://wiki.squid-cache.org/Features/HTTPS
 * [3]: https://github.com/netty/netty/blob/4.1/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java
 *
 * @param host the ultimate host a remote proxy server connects to; used both as the request URI
 *             of the `CONNECT` request and as its `Host` header
 * @param credentialsOption optional credentials for a proxy server
 * @param httpClientCodec the codec installed for the duration of the handshake (exposed for
 *                        testing)
 */
private[netty3] class HttpProxyConnectHandler(
    host: String,
    credentialsOption: Option[Transporter.Credentials],
    httpClientCodec: ChannelHandler = new HttpClientCodec()) // exposed for testing
  extends BufferedWriteHandler { self =>

  /** Pipeline name under which the temporary HTTP codec is registered. */
  private[this] val httpCodecKey: String = "http proxy client codec"

  /**
   * The connect future handed to us by the caller of `Channel.connect`. It stays `null` until
   * `connectRequested` has been seen and is completed only when the proxy handshake finishes.
   */
  private[this] val connectFuture = new AtomicReference[ChannelFuture](null)

  /** Builds the value of the `Proxy-Authorization` header (HTTP Basic scheme) for `c`. */
  private[this] def proxyAuthorizationHeader(c: Credentials): String = {
    val userAndPassword = s"${c.username}:${c.password}".getBytes(Charsets.Utf8)
    "Basic " + Base64StringEncoder.encode(userAndPassword)
  }

  /**
   * Fails the caller's connect future (when one has been recorded) with the exception produced
   * by `t` for the channel's remote address, then closes the channel.
   *
   * The connect future may legitimately be absent (a `channelConnected` without a preceding
   * `connectRequested`), hence the `Option` wrapper. `setFailure` is a no-op on a future that is
   * already complete or cancelled, so calling this more than once is harmless.
   */
  private[this] def fail(c: Channel, t: SocketAddress => Throwable): Unit = {
    Option(connectFuture.get).foreach(_.setFailure(t(c.getRemoteAddress)))
    Channels.close(c)
  }

  /** Listener that fails the handshake when the observed future completes with an error. */
  private[this] val failOnError: ChannelFutureListener = new ChannelFutureListener {
    def operationComplete(f: ChannelFuture): Unit =
      if (!f.isSuccess && !f.isCancelled) fail(f.getChannel, (_: SocketAddress) => f.getCause)
  }

  /**
   * Intercepts the connect request: records the caller's future and forwards the connect with a
   * wrapped future so that the caller is only notified once the proxy handshake is done.
   *
   * A second connect request on the same handler is treated as an inconsistent state. Events
   * that are not `DownstreamChannelStateEvent`s are passed through untouched.
   */
  override def connectRequested(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
    e match {
      case de: DownstreamChannelStateEvent =>
        if (!connectFuture.compareAndSet(null, de.getFuture)) {
          fail(
            ctx.getChannel,
            (remoteAddress: SocketAddress) => new InconsistentStateException(remoteAddress))
        } else {
          val wrappedConnectFuture = Channels.future(de.getChannel, true)

          // Proxy cancellation: cancelling the caller's future cancels the underlying connect.
          de.getFuture.addListener(new ChannelFutureListener {
            def operationComplete(f: ChannelFuture): Unit =
              if (f.isCancelled) wrappedConnectFuture.cancel()
          })

          // Proxy failures so that a failed TCP connect is propagated to the caller's future,
          // not just reported on the channel.
          wrappedConnectFuture.addListener(failOnError)

          // Forward the connect with the wrapped future, thereby delaying satisfaction of the
          // original one.
          val wrappedEvent = new DownstreamChannelStateEvent(
            de.getChannel, wrappedConnectFuture, de.getState, de.getValue)
          super.connectRequested(ctx, wrappedEvent)
        }

      case other =>
        // Nothing to wrap; let the superclass forward the event unchanged.
        super.connectRequested(ctx, other)
    }

  /**
   * Starts the proxy handshake once the TCP connection to the proxy is established: installs the
   * HTTP codec and sends the `CONNECT` request.
   *
   * NOTE(review): the `channelConnected` event is consumed here and never forwarded upstream;
   * confirm whether handlers further up the pipeline need to see it after the handshake.
   */
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
    Option(connectFuture.get) match {
      case None =>
        // Connected without having seen a connect request: nothing to satisfy, so just bail.
        fail(
          ctx.getChannel,
          (remoteAddress: SocketAddress) => new InconsistentStateException(remoteAddress))

      case Some(originalConnect) =>
        // Add HTTP client codec so we can talk to an HTTP proxy.
        ctx.getPipeline.addBefore(ctx.getName, httpCodecKey, httpClientCodec)

        // Create a new HTTP proxy CONNECT request.
        val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.CONNECT, host)
        req.headers().set(HttpHeaders.Names.HOST, host)
        credentialsOption.foreach { c =>
          req.headers().add(HttpHeaders.Names.PROXY_AUTHORIZATION, proxyAuthorizationHeader(c))
        }

        // If the CONNECT request cannot be written, the proxy will never answer; fail the
        // caller's connect instead of leaving it pending.
        val httpProxyConnectRequestFuture = Channels.future(e.getChannel, true)
        httpProxyConnectRequestFuture.addListener(failOnError)

        ctx.sendDownstream(
          new DownstreamMessageEvent(
            ctx.getChannel, httpProxyConnectRequestFuture, req, ctx.getChannel.getRemoteAddress))

        // Proxy cancellations again: a cancelled connect tears the channel down.
        originalConnect.addListener(new ChannelFutureListener {
          override def operationComplete(f: ChannelFuture): Unit =
            if (f.isCancelled) {
              fail(
                ctx.getChannel,
                (remoteAddress: SocketAddress) => new ChannelClosedException(remoteAddress))
            }
        })
    }

  /**
   * Handles the proxy's answer to the `CONNECT` request. Anything that is not an `HttpResponse`
   * is forwarded upstream unchanged.
   */
  override def messageReceived(ctx: ChannelHandlerContext, messageEvent: MessageEvent): Unit =
    messageEvent.getMessage match {
      case rep: HttpResponse if rep.getStatus == HttpResponseStatus.OK =>
        // A remote HTTP proxy is ready to proxy traffic to an ultimate destination. We no longer
        // need HTTP proxy pieces in the pipeline.
        val pipeline = ctx.getPipeline
        pipeline.remove(httpCodecKey)
        // Release the writes queued during the handshake before leaving the pipeline; done
        // explicitly rather than relying on what the superclass does on removal.
        flush()
        pipeline.remove(self)
        // By specs, we don't expect any payload sent back from a web proxy server, so there is
        // nothing else to hand upstream.
        Option(connectFuture.get).foreach(_.setSuccess())

      case rep: HttpResponse =>
        // `fail` also closes the channel: the tunnel could not be established.
        fail(
          ctx.getChannel,
          (remote: SocketAddress) => new ConnectionFailedException(
            Failure(s"Unexpected status returned from an HTTP proxy server: ${rep.getStatus()}."),
            remote
          )
        )

      case _ => ctx.sendUpstream(messageEvent)
    }

  // TODO: port the remaining failure paths from the Netty 4 handler:
  //  - exceptionCaught: fail the connect future with the cause and close the channel, since the
  //    HTTP proxy handshake could not be performed;
  //  - channel closed/inactive before the handshake completes: fail the connect future with a
  //    ChannelClosedException and propagate the event.
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment