Skip to content

Instantly share code, notes, and snippets.

@zombiezen
Last active August 29, 2015 14:26
Show Gist options
  • Save zombiezen/2878a6085b48486d4ccf to your computer and use it in GitHub Desktop.
Save zombiezen/2878a6085b48486d4ccf to your computer and use it in GitHub Desktop.
go-capnproto call ordering
// RPC implementation of calling a local vat method.
// This is a simplified version of the code in server.go:45.
// go-capnproto doesn't reflect it now, but I was going to make callOnClient serialized.
func (s *server) callOnClient(call *Call) Answer {
// Find the function for the call's method
fn := s.methods.find(call.Method)
// Build a new answer/future.
ans := newServerAnswer()
// Run the implementation in a separate goroutine.
go func() {
results := NewStruct(call.ObjectSize)
err := fn(call.Ctx, call.Options, call.Params, results)
if err == nil {
ans.resolve(ImmediateAnswer(results))
} else {
ans.resolve(ErrorAnswer(err))
}
}()
return ans
}
// A typical locally implemented capability function.
func (s *myStream) Write(
ctx context.Context,
opts capnp.CallOptions,
params Writer_write_Params,
results Writer_write_Results) error {
// do the write
return nil
}
// I could have the implementation methods return a future...
// but really all I'm signaling is when the next message should be accepted.
// Also, now local capability implementers have to worry about "blocking" the RPC goroutine.
// This isn't something a Go programmer naturally worries about.
func (s *server) callOnClient(call *Call) Answer {
// Find the function for the call's method
fn := s.methods.find(call.Method)
return fn(call.Ctx, call.Options, call.Params)
}
// A typical locally implemented capability function.
func (s *myStream) Write(
ctx context.Context,
opts capnp.CallOptions,
params Writer_write_Params) capnp.Answer {
ans := newServerAnswer()
go func() {
// do the write
results := NewWriter_write_Results(capnp.NewSegment(nil))
ans.resolve(results)
}()
return ans
}
// Another possibility is to pass in a channel to
// directly indicate when the next message can be read...
// but again, this is a weird decision to give application methods.
func (s *server) callOnClient(call *Call) Answer {
// Find the function for the call's method
fn := s.methods.find(call.Method)
// Build a new answer/future.
ans := newServerAnswer()
// Create a signalling channel
receivedMsg := make(chan struct{})
// Run the implementation in a separate goroutine.
go func() {
results := NewStruct(call.ObjectSize)
SetMsgReceiveChan(call.Options, receivedMsg)
err := fn(call.Ctx, call.Options, call.Params, results)
if err == nil {
ans.resolve(ImmediateAnswer(results))
} else {
ans.resolve(ErrorAnswer(err))
}
}()
return ans
}
// A typical locally implemented capability function.
func (s *myStream) Write(
ctx context.Context,
opts capnp.CallOptions,
params Writer_write_Params,
results Writer_write_Results) error {
// ... enforce order somehow, maybe by locking ...
// Now signal that we have "received" the message.
MsgReceivedChan(opts) <- struct{}{}
// ... do the write ...
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment