Created
September 19, 2022 13:14
-
-
Save bboozzoo/ddf9de322c6d9de0ae0b52d7d6e584e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/client/client.go b/client/client.go | |
index dc8c47a..e698ba4 100644 | |
--- a/client/client.go | |
+++ b/client/client.go | |
@@ -2,7 +2,6 @@ | |
Package client provides a WAMP client implementation that is interoperable with | |
any standard WAMP router and is capable of using all of the advanced profile | |
features supported by the nexus WAMP router. | |
- | |
*/ | |
package client | |
@@ -56,6 +55,8 @@ type Client struct { | |
routerGoodbye *wamp.Goodbye | |
idGen *wamp.SyncIDGen | |
+ | |
+ passthroughSerializer serialize.DirectSerializer | |
} | |
// InvokeResult represents the result of invoking a procedure. | |
@@ -111,6 +112,8 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) { | |
debug: cfg.Debug, | |
cancelMode: wamp.CancelModeKillNoWait, | |
idGen: new(wamp.SyncIDGen), | |
+ | |
+ passthroughSerializer: cfg.PassthroughSerializer, | |
} | |
c.ctx, c.cancel = context.WithCancel(context.Background()) | |
go c.run() // start the core goroutine | |
@@ -152,10 +155,11 @@ type EventHandler func(event *wamp.Event) | |
// match, or it can specify a URI pattern to match multiple events for the same | |
// handler by specifying the pattern type in options. | |
// | |
-// Subscribe Options | |
+// # Subscribe Options | |
// | |
// To request a pattern-based subscription set: | |
-// options["match"] = "prefix" or "wildcard" | |
+// | |
+// options["match"] = "prefix" or "wildcard" | |
// | |
// NOTE: Use consts defined in wamp/options.go instead of raw strings. | |
func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) error { | |
@@ -262,26 +266,33 @@ func (c *Client) Unsubscribe(topic string) error { | |
// Publish publishes an EVENT to all subscribed clients. | |
// | |
-// Publish Options | |
+// # Publish Options | |
// | |
// To receive a PUBLISHED response set: | |
-// options["acknowledge"] = true | |
+// | |
+// options["acknowledge"] = true | |
// | |
// To request subscriber blacklisting by subscriber, authid, or authrole, set: | |
-// options["exclude"] = [subscriberID, ...] | |
-// options["exclude_authid"] = ["authid", ..] | |
-// options["exclude_authrole"] = ["authrole", ..] | |
+// | |
+// options["exclude"] = [subscriberID, ...] | |
+// options["exclude_authid"] = ["authid", ..] | |
+// options["exclude_authrole"] = ["authrole", ..] | |
// | |
// To request subscriber whitelisting by subscriber, authid, or authrole, set: | |
-// options["eligible"] = [subscriberID, ...] | |
-// options["eligible_authid"] = ["authid", ..] | |
-// options["eligible_authrole"] = ["authrole", ..] | |
+// | |
+// options["eligible"] = [subscriberID, ...] | |
+// options["eligible_authid"] = ["authid", ..] | |
+// options["eligible_authrole"] = ["authrole", ..] | |
// | |
// When connecting to a nexus router, blacklisting and whitelisting can be used | |
// with any attribute assigned to the subscriber session, by setting: | |
-// options["exclude_xxx"] = [val1, val2, ..] | |
+// | |
+// options["exclude_xxx"] = [val1, val2, ..] | |
+// | |
// and | |
-// options["eligible_xxx"] = [val1, val2, ..] | |
+// | |
+// options["eligible_xxx"] = [val1, val2, ..] | |
+// | |
// where xxx is the name of any session attribute, typically supplied with the | |
// HELLO message. | |
// | |
@@ -290,7 +301,8 @@ func (c *Client) Unsubscribe(topic string) error { | |
// whitelist. | |
// | |
// To request that this publisher's identity is disclosed to subscribers, set: | |
-// options["disclose_me"] = true | |
+// | |
+// options["disclose_me"] = true | |
// | |
// NOTE: Use consts defined in wamp/options.go instead of raw strings. | |
func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs wamp.Dict) error { | |
@@ -311,6 +323,17 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs | |
} | |
} | |
+ if c.passthroughSerializer != nil { | |
+ // Pack args/kwargs into one serialized payload. NOTE(review): confirm no reply bookkeeping needs undoing on the error return. | |
+ data, err := c.passthroughSerializer.SerializeData(wamp.PassthroughPayload{Args: args, KWArgs: kwargs}) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ args = wamp.List{data} | |
+ kwargs = nil | |
+ options["ppt_foo"] = "bar" | |
+ } | |
+ | |
c.sess.Send(&wamp.Publish{ | |
Request: id, | |
Options: options, | |
@@ -357,16 +380,19 @@ type InvocationHandler func(context.Context, *wamp.Invocation) InvokeResult | |
// If the registration handler wants to cancel the call without returning a | |
// result, then is should return InvocationCanceled. | |
// | |
-// Register Options | |
+// # Register Options | |
// | |
// To request a pattern-based registration set: | |
-// options["match"] = "prefix" or "wildcard" | |
+// | |
+// options["match"] = "prefix" or "wildcard" | |
// | |
// To request a shared registration pattern set: | |
-// options["invoke"] = "single", "roundrobin", "random", "first", "last" | |
+// | |
+// options["invoke"] = "single", "roundrobin", "random", "first", "last" | |
// | |
// To request that caller identification is disclosed to this callee, set: | |
-// options["disclose_caller"] = true | |
+// | |
+// options["disclose_caller"] = true | |
// | |
// NOTE: Use consts defined in wamp/options.go instead of raw strings. | |
func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error { | |
@@ -474,7 +500,7 @@ type ProgressHandler func(*wamp.Result) | |
// message. This may be necessary for the client application to process error | |
// data from the RPC invocation. | |
// | |
-// Call Canceling | |
+// # Call Canceling | |
// | |
// The provided Context allows the caller to cancel a call, or to set a | |
// deadline that cancels the call when the deadline expires. There is no | |
@@ -488,7 +514,7 @@ type ProgressHandler func(*wamp.Result) | |
// initiated by the client (by canceling context), and cancellation initialed | |
// elsewhere. | |
// | |
-// Call Timeout | |
+// # Call Timeout | |
// | |
// If a timeout is provided in the options, and the callee supports call | |
// timeout, then the timeout value is passed to the callee so that the | |
@@ -503,17 +529,18 @@ type ProgressHandler func(*wamp.Result) | |
// To request automatic call timeout, by the router and callee, specify a | |
// timeout in milliseconds: options["timeout"] = 30000 | |
// | |
-// Caller Identification | |
+// # Caller Identification | |
// | |
// A caller may request the disclosure of its identity (its WAMP session ID) to | |
// callees, if allowed by the dealer. | |
// | |
// To request that this caller's identity disclosed to callees, set: | |
-// options["disclose_me"] = true | |
+// | |
+// options["disclose_me"] = true | |
// | |
// NOTE: Use consts defined in wamp/options.go instead of raw strings. | |
// | |
-// Progressive Call Results | |
+// # Progressive Call Results | |
// | |
// A caller indicates its willingness to receive progressive results by | |
// supplying a ProgressHandler function to handle progressive results that are | |
@@ -592,7 +619,7 @@ func (c *Client) Call(ctx context.Context, procedure string, options wamp.Dict, | |
// default value: "killnowait". The cancel mode is an option that is sent in a | |
// CANCEL message when a CALL is canceled. | |
// | |
-// Cancel Mode Behavior | |
+// # Cancel Mode Behavior | |
// | |
// "skip": The pending call is canceled and ERROR is sent immediately back to | |
// the caller. No INTERRUPT is sent to the callee and the result is discarded | |
@@ -1108,6 +1135,7 @@ func (c *Client) runHandleEvent(msg *wamp.Event) { | |
msg.Subscription) | |
return | |
} | |
+ // TODO unpack passthrough | |
handler(msg) | |
} | |
@@ -1222,6 +1250,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) { | |
}) | |
return | |
} | |
+ // TODO pack to the passthrough serializer | |
c.sess.SendCtx(c.ctx, &wamp.Yield{ | |
Request: reqID, | |
Options: wamp.Dict{}, | |
diff --git a/client/config.go b/client/config.go | |
index 4f4f7f7..da25096 100644 | |
--- a/client/config.go | |
+++ b/client/config.go | |
@@ -69,4 +69,6 @@ type Config struct { | |
// Websocket transport configuration. | |
WsCfg transport.WebsocketConfig | |
+ | |
+ PassthroughSerializer serialize.DirectSerializer | |
} | |
diff --git a/transport/serialize/cborserializer.go b/transport/serialize/cborserializer.go | |
index 7bb303e..d0c90fb 100644 | |
--- a/transport/serialize/cborserializer.go | |
+++ b/transport/serialize/cborserializer.go | |
@@ -44,3 +44,11 @@ func (s *CBORSerializer) Deserialize(data []byte) (wamp.Message, error) { | |
} | |
return listToMsg(wamp.MessageType(typ), v) | |
} | |
+ | |
+func (s *CBORSerializer) SerializeData(from interface{}) ([]byte, error) { | |
+ ... | |
+} | |
+ | |
+func (s *CBORSerializer) DeserializeData(from []byte, to interface{}) error { | |
+ ... | |
+} | |
diff --git a/transport/serialize/serializer.go b/transport/serialize/serializer.go | |
index f96709d..f295c81 100644 | |
--- a/transport/serialize/serializer.go | |
+++ b/transport/serialize/serializer.go | |
@@ -1,7 +1,6 @@ | |
/* | |
Package serialize provides a Serializer interface with implementations that | |
encode and decode message data in various ways. | |
- | |
*/ | |
package serialize | |
@@ -34,6 +33,12 @@ type Serializer interface { | |
Deserialize([]byte) (wamp.Message, error) | |
} | |
+// DirectSerializer serializes arbitrary data to bytes and deserializes bytes back into a caller-supplied value. | |
+type DirectSerializer interface { | |
+ SerializeData(from interface{}) ([]byte, error) | |
+ DeserializeData(from []byte, to interface{}) error | |
+} | |
+ | |
// listToMessage takes a list of values from a WAMP message and populates the | |
// fields of a message type. | |
func listToMsg(msgType wamp.MessageType, vlist []interface{}) (wamp.Message, error) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment