Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Created January 3, 2020 17:52
Show Gist options
  • Save ripienaar/01cd30d3529e02b66e1b27541eb91d9c to your computer and use it in GitHub Desktop.
Save ripienaar/01cd30d3529e02b66e1b27541eb91d9c to your computer and use it in GitHub Desktop.
=== RUN TestAckFloor
--- FAIL: TestAckFloor (1.08s)
util_test.go:25: next DISPATCH: nats: timeout
FAIL
Process finished with exit code 1
=== RUN TestAckFloor
--- FAIL: TestAckFloor (0.07s)
jetstreammgmt_test.go:419: new set ORDERS didnt get the ack'd message
FAIL
Process finished with exit code 1
// TestAckFloor creates the ORDERS message set and the durable DISPATCH
// observable on it, checks the observable's initial Delivered and AckFloor
// sequences, publishes one message, then pulls that message back through
// DISPATCH and acknowledges it.
func TestAckFloor(t *testing.T) {
	srv, _, jsm := setupJSMTest(t)
	defer srv.Shutdown()

	// make set ORDERS
	err := jsm.MessageSetCreate(&natsd.MsgSetConfig{
		Name:      "ORDERS",
		MaxAge:    -1,
		MaxBytes:  -1,
		MaxMsgs:   -1,
		Retention: natsd.StreamPolicy,
		Storage:   natsd.FileStorage,
		Subjects:  []string{"ORDERS.>"},
	})
	checkErr(t, err, "could not set up ORDERS: %v", err)

	// a freshly created set must not hold any messages yet
	sinfo, err := jsm.MessageSetInfo("ORDERS")
	checkErr(t, err, "could not get ORDERS: %v", err)
	if sinfo.Stats.LastSeq != 0 || sinfo.Stats.FirstSeq != 0 {
		t.Fatalf("new set ORDERS isn't empty: %#v", sinfo.Stats)
	}

	// make pull based obs DISPATCH
	err = jsm.ObservableCreate("ORDERS", &natsd.ObservableConfig{
		Durable:      "DISPATCH",
		DeliverAll:   true,
		AckPolicy:    natsd.AckExplicit,
		AckWait:      10 * time.Second,
		Subject:      "ORDERS.processed",
		ReplayPolicy: natsd.ReplayInstant,
	})
	checkErr(t, err, "could not set up DISPATCH: %v", err)

	// observable is on the right place
	oinfo, err := jsm.ObservableInfo("ORDERS", "DISPATCH")
	checkErr(t, err, "could not get DISPATCH info: %v", err)
	if oinfo.State.Delivered.ObsSeq != 1 || oinfo.State.Delivered.SetSeq != 1 {
		t.Fatalf("expected ObsSeq=1 SetSeq=1, %#v", oinfo.State.Delivered)
	}
	// nothing has been acknowledged yet, so the ack floor is expected at zero
	if oinfo.State.AckFloor.ObsSeq != 0 || oinfo.State.AckFloor.SetSeq != 0 {
		t.Fatalf("expected ObsSeq=0 SetSeq=0, %#v", oinfo.State.AckFloor)
	}

	// publish to ORDERS with ack: Request waits up to 2s for a reply to the publish
	_, err = jsm.Nats().Request("ORDERS.processed", []byte("order 1"), 2*time.Second)
	checkErr(t, err, "publish failed: %v", err)

	// did it get it?
	sinfo, err = jsm.MessageSetInfo("ORDERS")
	checkErr(t, err, "could not get ORDERS: %v", err)
	if sinfo.Stats.Msgs != 1 || sinfo.Stats.LastSeq != 1 || sinfo.Stats.FirstSeq != 1 {
		t.Fatalf("new set ORDERS didnt get the ack'd message: %#v", sinfo.Stats)
	}

	// pull from it, verify the payload, then ack it
	msg, err := jsm.ObservableNext("ORDERS", "DISPATCH")
	checkErr(t, err, "next DISPATCH: %v", err)
	if string(msg.Data) != "order 1" {
		t.Fatalf("got wrong message: %s", string(msg.Data))
	}

	// an empty response is sent as the acknowledgement; surface a failed send
	// instead of dropping the error
	err = msg.Respond(nil)
	checkErr(t, err, "could not ack message: %v", err)

	// NOTE(review): the observable's AckFloor is never re-read after the ack,
	// so the post-ack floor is not asserted here — confirm whether that check
	// is still to be added.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment