# raw
response = yield tchannel.call(
    argscheme="raw",
    service="someservice",
    endpoint="something",
    body="RAWRESPONSE",
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
# json
response = yield tchannel.call(
    argscheme="json",
    service="someservice",
    endpoint="maps",
    body="{'lat': 100, 'lng': 140}",
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
# thrift
Foo = get_tchannel_thrift(FooThrift, 'fooservice')
response = yield tchannel.call(
    argscheme="thrift",
    body="some_binary",
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
response = yield tchannel.call_thrift(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)

response = yield tchannel.as_thrift().call(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)

response = yield tchannel.thrift.call(
    Foo.getBar(Foo.BarResponse("hi")),
    headers={
        'X-Source': 'geo'
    },
    timeout=5000
)

future = tchannel_sync.call(
    Foo.baz(True),
    headers={
        'X-Source': 'geo'
    },
    ttl=1000
)
Foo = get_tchannel_thrift(FooThrift, 'fooservice')
Probably need a better name for that. Also, keep in mind that the service name will match the module name. So, the user will end up with:
from my_service import Foo
FooRequest = get_tchannel_thrift(Foo)
(We can't reuse the `Foo` name because we want the user to be able to access the types defined in `Foo`.)
Also, I think we should try to be more explicit instead of magically determining the `as` header. What are your thoughts on:
root_tchannel = TChannel(..)
# The default TChannel object defaults to "as": "raw"
tchannel = root_tchannel.as_json()
# tchannel is a proxy object that forwards calls to as_json except it always adds the "as":"json" header
# and encodes/decodes JSON
tchannel = root_tchannel.as_http()
# same as above but for HTTP
tchannel = root_tchannel.as_thrift()
Otherwise, `.call` will remain the same as you designed.
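To make the proxy idea concrete, here is a minimal sketch. `FakeRootChannel` and `JsonProxy` are hypothetical names, not part of any real TChannel library, and the `as` value is stored in the ordinary headers dict purely for illustration. The root object records calls instead of sending them, so the wrapping behaviour can be inspected.

```python
import json


class FakeRootChannel:
    """Hypothetical stand-in for the root TChannel object; records calls."""

    def __init__(self):
        self.sent = []

    def call(self, service, endpoint, body, headers=None, ttl=None):
        self.sent.append((service, endpoint, body, dict(headers or {}), ttl))
        # Echo the body back so the proxy has something to decode.
        return body

    def as_json(self):
        return JsonProxy(self)


class JsonProxy:
    """Forwards call() to the root channel, adding "as": "json" and (de)serializing."""

    def __init__(self, root):
        self._root = root

    def call(self, service, endpoint, body, headers=None, ttl=None):
        merged = dict(headers or {})
        merged["as"] = "json"  # set explicitly by the proxy, never guessed from the body
        raw = self._root.call(service, endpoint, json.dumps(body), merged, ttl)
        return json.loads(raw)


root_tchannel = FakeRootChannel()
tchannel = root_tchannel.as_json()
response = tchannel.call(
    service="someservice",
    endpoint="maps",
    body={"lat": 100, "lng": 140},
    headers={"X-Source": "geo"},
    ttl=1000,
)
```

The caller's `call` signature is unchanged; only the choice of proxy decides the encoding.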
This is how gRPC does it:
Given the following IDL:
syntax = "proto3";

option java_package = "io.grpc.examples";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
Then you generate the client which looks a bit like this:
import helloworld_pb2

_TIMEOUT_SECONDS = 10


def run():
    with helloworld_pb2.early_adopter_create_Greeter_stub('localhost', 50051) as stub:
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), _TIMEOUT_SECONDS)
        print("Greeter client received: " + response.message)


if __name__ == '__main__':
    run()
From what I can tell, when you generate a stub, it breaks the 1:1 relationship between the IDL procedure and the actual parameters. Notice how `_TIMEOUT_SECONDS` exists as a parameter above but not in the definition:
rpc SayHello (HelloRequest) returns (HelloReply) {}
This is probably acceptable in Protobufs because you are only allowed one parameter, some type of Request struct.
For Thrift, it's a bit weird for us to add extra params that don't map to the definition.
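One way to keep the generated procedure 1:1 with the Thrift definition, which is what the `Foo.getBar(...)` examples in the gist appear to aim for, is to have the procedure build a request object and pass transport options to the call instead. This is only a sketch: `ThriftRequest`, the `Foo` class, the `Foo::getBar` endpoint string, and this `call` function are all hypothetical stand-ins.

```python
from collections import namedtuple

# Hypothetical request object: holds only what the IDL procedure declares.
ThriftRequest = namedtuple("ThriftRequest", ["service", "endpoint", "args"])


class Foo:
    """Hypothetical generated module for a Thrift service named Foo."""

    @staticmethod
    def getBar(request):
        # Same single parameter as the IDL definition; no timeout or headers here.
        return ThriftRequest("fooservice", "Foo::getBar", (request,))


def call(request, headers=None, timeout=None):
    """Hypothetical transport call: transport options live here, not on getBar."""
    return {
        "service": request.service,
        "endpoint": request.endpoint,
        "args": request.args,
        "headers": dict(headers or {}),
        "timeout": timeout,
    }


sent = call(Foo.getBar("hi"), headers={"X-Source": "geo"}, timeout=5000)
```

Here `timeout` and `headers` never appear in the signature of `getBar`, so the procedure still matches its definition.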
sources:
Finagle's is brutal:
- create filters
- add to stack
- write current stack to header buffer
- use builder to create client with stack
- filter stack applies to each request
So with a trace filter they can set Zipkin headers on each request.
val tracer = mock[Tracer]
//tracer.sampleTrace(any(classManifest[TraceId])) returns Some(true)
when(tracer.sampleTrace(any(classOf[TraceId]))).thenReturn(Some(true))
val filter = new TTwitterClientFilter("service", true, None, protocolFactory)
val buffer = new OutputBuffer(protocolFactory)
buffer().writeMessageBegin(
  new TMessage(ThriftTracing.CanTraceMethodName, TMessageType.CALL, 0))
val options = new thrift.ConnectionOptions
options.write(buffer())
buffer().writeMessageEnd()
val tracing = new TracingFilter[ThriftClientRequest, Array[Byte]](tracer, "TTwitterClientFilterTest")
val service = mock[Service[ThriftClientRequest, Array[Byte]]]
val _request = ArgumentCaptor.forClass(classOf[ThriftClientRequest])
when(service(_request.capture)).thenReturn(Future(Array[Byte]()))
val stack = tracing andThen filter
stack(new ThriftClientRequest(buffer.toArray, false), service)
val header = new thrift.RequestHeader
InputBuffer.peelMessage(_request.getValue.message, header, protocolFactory)
assert(header.isSampled)
sources:
- https://github.com/twitter/finagle/blob/master/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/TTwitterClientFilterTest.scala
- https://github.com/twitter/finagle/blob/master/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndTest.scala
- https://github.com/jghoman/finagle-java-example/blob/master/src/main/java/jghoman/Client.java
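For comparison with the Python API under discussion, the filter-stack steps listed above can be sketched in a few lines of Python. This is not Finagle's API: `trace_filter`, `source_filter`, `and_then`, `build_client`, and the `X-Trace-Sampled` header name are all made up for illustration, and a request is modelled as a plain dict.

```python
def trace_filter(request, service):
    """Adds a tracing header to every request before passing it on."""
    headers = dict(request.get("headers", {}))
    headers["X-Trace-Sampled"] = "true"  # hypothetical header name
    return service(dict(request, headers=headers))


def source_filter(request, service):
    """Adds a default X-Source header unless the caller already set one."""
    headers = dict(request.get("headers", {}))
    headers.setdefault("X-Source", "geo")
    return service(dict(request, headers=headers))


def and_then(first, second):
    """Compose two filters so that `first` runs before `second`."""
    def combined(request, service):
        return first(request, lambda req: second(req, service))
    return combined


def build_client(stack, service):
    """Return a client that applies the whole filter stack to each request."""
    return lambda request: stack(request, service)


received = []


def service(request):
    # Terminal service: records what it was given and returns a canned reply.
    received.append(request)
    return "ok"


stack = and_then(trace_filter, source_filter)
client = build_client(stack, service)
result = client({"endpoint": "getBar"})
```

Every request sent through `client` picks up both headers without the call site mentioning them, which is the property the trace filter relies on.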
Wow. no Finagle.
Based on discussion earlier, this is how we expect the client-side streaming API to look:
Response streaming
We'll introduce a `stream` method on the `TChannel` object and its argscheme-specific proxies that will accept the same arguments as `call`. Instead of a standard response, it'll return a streaming response object which provides a `.read()` method.
response = tchannel.stream(service='foo', endpoint='bar', body='listItems')
headers = response.headers
try:
    while True:
        chunk = yield response.read()
        process(chunk)
except EndOfStream:
    pass
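A runnable sketch of the read loop above, under some loud assumptions: it uses the standard library's `asyncio` and `await` in place of Tornado's `yield`, and `StreamingResponse`, `EndOfStream`, and `stream` are hypothetical stand-ins that serve canned chunks rather than talking to a network.

```python
import asyncio
from collections import deque


class EndOfStream(Exception):
    """Raised by read() once every chunk has been consumed."""


class StreamingResponse:
    """Hypothetical streaming response backed by an in-memory list of chunks."""

    def __init__(self, headers, chunks):
        self.headers = headers
        self._chunks = deque(chunks)

    async def read(self):
        await asyncio.sleep(0)  # stand-in for waiting on the network
        if not self._chunks:
            raise EndOfStream()
        return self._chunks.popleft()


def stream(service, endpoint, body):
    """Hypothetical stand-in for tchannel.stream(); returns canned chunks."""
    return StreamingResponse({"as": "raw"}, [b"item-1", b"item-2", b"item-3"])


async def consume():
    response = stream(service="foo", endpoint="bar", body="listItems")
    received = []
    try:
        while True:
            chunk = await response.read()
            received.append(chunk)
    except EndOfStream:
        pass
    return response.headers, received


headers, received = asyncio.run(consume())
```

Headers are available before the first `read()`, and the loop ends only through the `EndOfStream` exception, matching the shape proposed in the comment.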
Request streaming
We'll have a `body_producer` parameter on both `call` and `stream`. `body_producer` may be passed in lieu of the `body`. It will be a function that accepts a `write` function and calls it to write to the stream. The function must be a coroutine, or return a future that resolves to None when it finishes writing.
@gen.coroutine
def producer(write):
    for line in some_file:
        yield write(line)

response = yield tchannel.call(endpoint='foo', service='bar', body_producer=producer)

# Bidirectional streaming:
response = yield tchannel.stream(endpoint='foo', service='bar', body_producer=producer)
try:
    while True:
        chunk = yield response.read()
        process(chunk)
except EndOfStream:
    pass
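The receiving side of `body_producer` might look roughly like the following. It is a sketch only: it uses `asyncio` instead of Tornado, this `call` is a hypothetical stand-in that collects written chunks in a list instead of sending frames, and the decision to reject a call that passes both `body` and `body_producer` is an assumption.

```python
import asyncio


async def call(service, endpoint, body=None, body_producer=None):
    """Hypothetical stand-in for tchannel.call(); returns the chunks it was given."""
    if body is not None and body_producer is not None:
        raise ValueError("pass body or body_producer, not both")

    sent = []

    async def write(chunk):
        await asyncio.sleep(0)  # stand-in for flushing the chunk to the wire
        sent.append(chunk)

    if body_producer is not None:
        # The producer drives the stream and must resolve to None when done.
        result = await body_producer(write)
        if result is not None:
            raise TypeError("body_producer must resolve to None")
    elif body is not None:
        await write(body)
    return sent


async def producer(write):
    for line in ["alpha\n", "beta\n"]:
        await write(line)


sent = asyncio.run(call(service="bar", endpoint="foo", body_producer=producer))
single = asyncio.run(call(service="bar", endpoint="foo", body="whole-body"))
```

Because the producer awaits each `write`, the channel can apply backpressure by delaying the write's completion.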
Suggestion: make the streamed response iterable, e.g. `for thing in response`.
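A sketch of what that could look like. Since each read has to wait on I/O, this version uses Python's asynchronous iteration (`async for`) rather than a plain `for`; that substitution, the `asyncio` runtime, and the `StreamingResponse` class itself are assumptions made for illustration.

```python
import asyncio
from collections import deque


class EndOfStream(Exception):
    """Raised by read() once every chunk has been consumed."""


class StreamingResponse:
    """Hypothetical streaming response that also supports `async for`."""

    def __init__(self, chunks):
        self._chunks = deque(chunks)

    async def read(self):
        await asyncio.sleep(0)  # stand-in for waiting on the network
        if not self._chunks:
            raise EndOfStream()
        return self._chunks.popleft()

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Translate the explicit end-of-stream signal into the iterator protocol.
        try:
            return await self.read()
        except EndOfStream:
            raise StopAsyncIteration


async def consume(response):
    return [thing async for thing in response]


things = asyncio.run(consume(StreamingResponse(["a", "b"])))
```

The explicit `read()` method stays available, so callers that want chunk-at-a-time control are not forced into the loop form.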
@blampe +1
I love that most thrift calls would look like: