Last active
August 28, 2021 11:02
-
-
Save akashgoswami/7e0d69e95e160f1592de8ee0d94268af to your computer and use it in GitHub Desktop.
A simple NATS.io JetStream publish/subscribe example using nats.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { connect, JSONCodec, AckPolicy } = require("nats"); | |
/**
 * Pulls messages from the "results" stream through the durable consumer
 * "resultService", logs each decoded JSON payload, and acknowledges it.
 *
 * @param {number} [count=1000] Number of fetch rounds to run. Each round
 *   requests a batch of up to 10 messages with a 1000 ms expiry.
 * @returns {Promise<void>} Resolves once all rounds are done and the
 *   connection has been drained.
 */
async function main(count = 1000) {
  // connect() takes a ConnectionOptions object; the server list belongs under
  // `servers`. A bare array is not a valid options object.
  const nc = await connect({ servers: ["nats://localhost:4222"] });
  try {
    // The JetStream manager is used to administer consumers on the stream.
    const jsm = await nc.jetstreamManager();
    console.log("JSM connected");

    await jsm.consumers.add("results", {
      durable_name: "resultService",
      ack_policy: AckPolicy.Explicit,
    });

    // List the consumers registered on the stream (first page of results).
    const consumers = await jsm.consumers.list("results").next();
    for (const info of consumers) {
      console.log("Consumer name", info.name);
    }

    const js = nc.jetstream();
    const jc = JSONCodec();

    // Fetch messages from the stream in batches and acknowledge each one.
    for (let round = 0; round < count; round += 1) {
      const batch = await js.fetch("results", "resultService", {
        batch: 10,
        expires: 1000,
      });
      for await (const m of batch) {
        console.log(jc.decode(m.data));
        m.ack();
      }
    }
  } finally {
    // Drain so pending acks are flushed and the process can exit, even when
    // one of the steps above throws.
    await nc.drain();
  }
}
// Entry point. main() is async, so its failures arrive as promise rejections;
// they must be awaited inside the try block to be caught here. A plain
// synchronous try/catch around the call would never see them.
(async () => {
  try {
    await main();
  } catch (e) {
    console.error(e);
    // Signal failure to the shell without cutting off pending output.
    process.exitCode = 1;
  }
})();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { connect, JSONCodec } = require("nats"); | |
/**
 * Ensures the "results" stream exists (capturing subject "results.topic.io")
 * and publishes `count` JSON messages of the form
 * `{ hello: "world", index }` to it through JetStream.
 *
 * @param {number|string} [count=100] Number of messages to publish. A numeric
 *   string (e.g. taken from `process.argv`) is accepted and converted.
 * @returns {Promise<void>} Resolves once every message has been published and
 *   the connection has been drained.
 * @throws {RangeError} If `count` is not a non-negative integer.
 */
async function main(count = 100) {
  // The value may arrive as a command-line string; convert it once and reject
  // anything that would otherwise silently publish nothing.
  const total = Number(count);
  if (!Number.isInteger(total) || total < 0) {
    throw new RangeError(`count must be a non-negative integer, got: ${count}`);
  }

  // connect() takes a ConnectionOptions object; the server list belongs under
  // `servers`. A bare array is not a valid options object.
  const nc = await connect({ servers: ["nats://localhost:4222"] });
  try {
    console.log("NATS connected");
    const jsm = await nc.jetstreamManager();

    // To start from a clean stream, delete the existing one first with
    // jsm.streams.delete("results").
    await jsm.streams.add({ name: "results", subjects: ["results.topic.io"] });

    // JetStream client used for publishing to the stream.
    const js = nc.jetstream();
    const jc = JSONCodec();

    // Publish sequentially, waiting for each JetStream publish call to settle.
    // Alternatively, a simple core publish via nc.publish() on the same
    // subject ("results.topic.io") could be used instead.
    for (let index = 0; index < total; index += 1) {
      await js.publish("results.topic.io", jc.encode({ hello: "world", index }));
    }
    console.log("Published", total);
  } finally {
    // Drain so buffered data is flushed and the process can exit, even when
    // one of the steps above throws.
    await nc.drain();
  }
}
// Entry point. The optional first command-line argument is the number of
// messages to publish. main() is async, so its failures arrive as promise
// rejections; they must be awaited inside the try block to be caught here.
(async () => {
  try {
    await main(process.argv[2]);
  } catch (e) {
    console.error(e);
    // Signal failure to the shell without cutting off pending output.
    process.exitCode = 1;
  }
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment