Last active
July 24, 2020 18:16
-
-
Save ripienaar/c32db467923d7942cf5205226872500a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ nats -s demo.nats.io req 'registry.detect_type' '{ | |
"schema": "io.nats.jetstream.advisory.v1.api_audit", | |
"id": "uafvZ1UEDIW5FZV6kvLgWA", | |
"timestamp": "2020-04-23T16:51:18.516363Z", | |
"server": "NDJWE4SOUJOJT2TY5Y2YQEOAHGAK5VIGXTGKWJSFHVCII4ITI3LBHBUV", | |
"client": { | |
"host": "::1", | |
"port": 57924, | |
"cid": 17, | |
"account": "$G", | |
"name": "NATS CLI", | |
"lang": "go", | |
"version": "1.9.2" | |
}, | |
"subject": "$JS.STREAM.LIST", | |
"response": "[\n \"ORDERS\"\n]" | |
}' | |
16:22:38 Sending request on [registry.detect_type] | |
16:22:39 Received on [_INBOX.fbXUVBt2RTBF4WWYxH1ms2]: '{ | |
"address": "https://raw.githubusercontent.com/nats-io/jetstream/master/schemas/jetstream/advisory/v1/api_audit.json", | |
"type": "io.nats.jetstream.advisory.v1.api_audit" | |
}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(code for this not shown) | |
% nats -s demo.nats.io req 'registry.generate.ruby.io.nats.server.advisory.v1.client_disconnect' '' --raw|jq -r .code | |
# This code may look unusually verbose for Ruby (and it is), but | |
# it performs some subtle and complex validation of JSON data. | |
# | |
# To parse this JSON, add 'dry-struct' and 'dry-types' gems, then do: | |
# | |
# the560028504 = The560028504.from_json! "{…}" | |
# puts the560028504.server.cluster | |
# | |
# If from_json! succeeds, the value returned matches the schema. | |
require 'json' | |
require 'dry-types' | |
require 'dry-struct' | |
module Types | |
include Dry::Types.module | |
Int = Strict::Int | |
Bool = Strict::Bool | |
Hash = Strict::Hash | |
String = Strict::String | |
Type = Strict::String.enum("io.nats.server.advisory.v1.client_disconnect") | |
end | |
# Details about the client that connected to the server | |
class Client < Dry::Struct | |
# The remote host the client is connected from | |
attribute :host, Types::String.optional | |
# The programming language library in use by the client | |
attribute :lang, Types::String.optional | |
# The name presented by the client during connection | |
attribute :client_name, Types::String.optional | |
# The last known latency between the NATS Server and the Client | |
attribute :rtt, Types::String.optional | |
# Timestamp when the client connected | |
attribute :start, Types::String.optional | |
# Timestamp when the client disconnected | |
attribute :stop, Types::String.optional | |
# The clients username | |
attribute :user, Types::String.optional | |
# The version of the client library in use | |
attribute :ver, Types::String.optional | |
attribute :id, Types::Any | |
attribute :acc, Types::Any | |
def self.from_dynamic!(d) | |
d = Types::Hash[d] | |
new( | |
..... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ nats -s demo.nats.io req 'registry.schema.io.nats.server.advisory.v1.client_disconnect' '' | |
16:12:51 Sending request on [registry.schema.io.nats.server.advisory.v1.client_disconnect] | |
16:12:51 Received on [_INBOX.HbMId2R0Q75ZnKMzwlKVuq]: '{ | |
"schema": { | |
"$schema": "http://json-schema.org/draft-07/schema#", | |
"$id": "https://nats.io/schemas/server/advisory/v1/client_disconnect.json", | |
"description": "Advisory published a client disconnects to the NATS Server", | |
"title": "io.nats.server.advisory.v1.client_disconnect", | |
"type": "object", | |
..... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ nats -s demo.nats.io req 'registry.url.io.nats.server.advisory.v1.client_disconnect' '' | |
16:13:42 Sending request on [registry.url.io.nats.server.advisory.v1.client_disconnect] | |
16:13:42 Received on [_INBOX.Cw62OYxvWuppWcLb9a0Cu1]: '{ | |
"url": "https://raw.githubusercontent.com/nats-io/jetstream/master/schemas/server/advisory/v1/client_disconnect.json" | |
}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ nats -s demo.nats.io req 'registry.validate' '{ | |
"type": "io.nats.jetstream.advisory.v1.api_audit" | |
}' | |
16:21:05 Sending request on [registry.validate] | |
16:21:06 Received on [_INBOX.wTm0XBkT3HhwBHUVubeZQK]: '{ | |
"errors": [ | |
"timestamp: Does not match format 'date-time'", | |
"server: String length must be greater than or equal to 1", | |
"subject: String length must be greater than or equal to 1" | |
] | |
}' | |
$ nats -s demo.nats.io req 'registry.validate' '{ | |
"type": "io.nats.jetstream.advisory.v1.api_audit", | |
"id": "uafvZ1UEDIW5FZV6kvLgWA", | |
"timestamp": "2020-04-23T16:51:18.516363Z", | |
"server": "NDJWE4SOUJOJT2TY5Y2YQEOAHGAK5VIGXTGKWJSFHVCII4ITI3LBHBUV", | |
"client": { | |
"host": "::1", | |
"port": 57924, | |
"cid": 17, | |
"account": "$G", | |
"name": "NATS CLI", | |
"lang": "go", | |
"version": "1.9.2" | |
}, | |
"subject": "$JS.STREAM.LIST", | |
"response": "[\n \"ORDERS\"\n]" | |
}' | |
16:20:21 Sending request on [registry.validate] | |
16:20:21 Received on [_INBOX.Ysl7f03tdqSyswRFteAE16]: '{ | |
"valid": true | |
}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"log" | |
"os" | |
"strings" | |
"time" | |
"github.com/nats-io/jsm.go/api" | |
"github.com/nats-io/nats.go" | |
"gopkg.in/alecthomas/kingpin.v2" | |
) | |
var ( | |
servers string | |
credentials string | |
ns string | |
nc *nats.Conn | |
) | |
func validateHandler(m *nats.Msg) { | |
kind, event, err := api.ParseEvent(m.Data) | |
if err != nil { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not parse event: %s", err)}) | |
return | |
} | |
ok, errs := api.ValidateStruct(event, kind) | |
if ok { | |
sendResponse(m, map[string]bool{"valid": true}) | |
return | |
} | |
sendResponse(m, map[string][]string{"errors": errs}) | |
} | |
func urlHandler(m *nats.Msg) { | |
st := strings.TrimPrefix(m.Subject, ns+".url.") | |
if !api.IsNatsEventType(st) { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("%q is not a valid NATS schema type", st)}) | |
return | |
} | |
address, _, err := api.SchemaURLForType(st) | |
if err != nil { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not retrieve schema url for %q: %s", st, err)}) | |
return | |
} | |
sendResponse(m, map[string]string{"url": address}) | |
} | |
func typeDetectHandler(m *nats.Msg) { | |
kind, err := api.SchemaTypeForEvent(m.Data) | |
if err != nil { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not detect event type: %s", err)}) | |
} | |
address, _, err := api.SchemaURLForType(kind) | |
if err != nil { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not detect event type: %s", err)}) | |
} | |
sendResponse(m, map[string]string{"type": kind, "address": address}) | |
} | |
func schemaHandler(m *nats.Msg) { | |
st := strings.TrimPrefix(m.Subject, ns+".schema.") | |
if !api.IsNatsEventType(st) { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("%q is not a valid NATS schema type", st)}) | |
return | |
} | |
schema, err := api.Schema(st) | |
if err != nil { | |
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not retrieve schema for %q: %s", st, err)}) | |
return | |
} | |
sendResponse(m, map[string]json.RawMessage{"schema": schema}) | |
} | |
func main() { | |
app := kingpin.New("registry", "NATS Service Registry") | |
app.Flag("server", "NATS Servers").Default("localhost").StringVar(&servers) | |
app.Flag("creds", "NATS Credentials").StringVar(&credentials) | |
app.Flag("namespace", "Service namespace").Default("registry").StringVar(&ns) | |
kingpin.MustParse(app.Parse(os.Args[1:])) | |
var err error | |
nc, err = connection() | |
if err != nil { | |
panic(err.Error()) | |
} | |
nc.Subscribe(ns+".url.io.nats.>", urlHandler) | |
nc.Subscribe(ns+".schema.io.nats.>", schemaHandler) | |
nc.Subscribe(ns+".detect_type", typeDetectHandler) | |
nc.Subscribe(ns+".validate", validateHandler) | |
<-context.Background().Done() | |
} | |
func sendResponse(m *nats.Msg, r interface{}) error { | |
if m.Reply == "" { | |
return fmt.Errorf("no reply in message") | |
} | |
j, err := json.MarshalIndent(r, "", " ") | |
if err != nil { | |
return fmt.Errorf("could not encode %#v: %s", r, err) | |
} | |
return m.Respond(j) | |
} | |
func connection() (nc *nats.Conn, err error) { | |
if servers == "" { | |
return nil, fmt.Errorf("specify a server to connect to using NATS_URL") | |
} | |
opts := []nats.Option{ | |
nats.MaxReconnects(-1), | |
nats.ErrorHandler(errorHandler), | |
nats.ReconnectHandler(reconnectHandler), | |
nats.DisconnectErrHandler(disconnectHandler), | |
} | |
if credentials != "" { | |
opts = append(opts, nats.UserCredentials(credentials)) | |
} | |
for { | |
nc, err := nats.Connect(servers, opts...) | |
if err == nil { | |
return nc, nil | |
} | |
log.Printf("could not connect to NATS: %s\n", err) | |
time.Sleep(500 * time.Millisecond) | |
} | |
} | |
func errorHandler(nc *nats.Conn, s *nats.Subscription, err error) { | |
if s != nil { | |
log.Fatalf("Error in NATS connection: %s: subscription: %s: %s", nc.ConnectedUrl(), s.Subject, err) | |
} | |
log.Fatalf("Error in NATS connection: %s: %s", nc.ConnectedUrl(), err) | |
} | |
func reconnectHandler(nc *nats.Conn) { | |
log.Printf("Reconnected to %s", nc.ConnectedUrl()) | |
} | |
func disconnectHandler(nc *nats.Conn, err error) { | |
if err != nil { | |
log.Printf("Disconnected from NATS due to error: %v", err) | |
} else { | |
log.Printf("Disconnected from NATS") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment