Skip to content

Instantly share code, notes, and snippets.

@akashgoswami
Last active August 28, 2021 11:02
Show Gist options
  • Save akashgoswami/7e0d69e95e160f1592de8ee0d94268af to your computer and use it in GitHub Desktop.
Save akashgoswami/7e0d69e95e160f1592de8ee0d94268af to your computer and use it in GitHub Desktop.
A simple NATS.io JetStream publish/subscribe example using nats.js
const { connect, JSONCodec, AckPolicy } = require("nats");
/**
 * Pull-subscriber example: registers the durable consumer "resultService"
 * on the "results" stream, lists the stream's consumers, then repeatedly
 * pulls batches of messages, logging and acknowledging each one.
 *
 * @param {number} [count=1000] Number of fetch rounds to run. Each round
 *   requests up to 10 messages and waits at most 1000 ms, so this is a
 *   count of pull requests, not of messages.
 * @returns {Promise<void>} Resolves once every round has completed and the
 *   connection has been drained; rejects if any NATS call fails.
 */
async function main(count = 1000) {
  // `connect` takes an options object; the server list goes under `servers`.
  const nc = await connect({ servers: ["nats://localhost:4222"] });
  try {
    // The JetStream manager is the admin API (streams / consumers).
    const jsm = await nc.jetstreamManager();
    console.log("JSM connected");

    await jsm.consumers.add("results", {
      durable_name: "resultService",
      ack_policy: AckPolicy.Explicit,
    });

    // List the consumers registered on the stream (first page only).
    const consumers = await jsm.consumers.list("results").next();
    for (const info of consumers) {
      console.log("Consumer name", info.name);
    }

    // The JetStream client is the data-plane API (publish / fetch).
    const js = nc.jetstream();
    const codec = JSONCodec();

    // Pull messages from the stream in batches and acknowledge each one.
    for (let round = 0; round < count; round++) {
      const batch = js.fetch("results", "resultService", {
        batch: 10,
        expires: 1000,
      });
      for await (const m of batch) {
        console.log(codec.decode(m.data));
        m.ack();
      }
    }
  } finally {
    // Flush pending acks and close the connection so the process can exit.
    await nc.drain();
  }
}
// `main` is async, so failures surface as a promise rejection; a synchronous
// try/catch around the call never sees them. Handle the rejection on the
// promise instead and report failure through the process exit code.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
const { connect, JSONCodec } = require("nats");
/**
 * Publisher example: ensures the "results" stream exists (bound to the
 * subject "results.topic.io") and publishes `count` JSON messages of the
 * form `{ hello: "world", index }` to it through JetStream.
 *
 * @param {number|string} [count=100] How many messages to publish. A numeric
 *   string (e.g. straight from `process.argv`) is accepted and converted.
 * @returns {Promise<void>} Resolves after all messages are published and the
 *   connection has been drained.
 * @throws {RangeError} If `count` is not a non-negative integer.
 */
async function main(count = 100) {
  // Command-line arguments arrive as strings; normalise before looping.
  const total = Number(count);
  if (!Number.isInteger(total) || total < 0) {
    throw new RangeError(`count must be a non-negative integer, got: ${count}`);
  }

  // `connect` takes an options object; the server list goes under `servers`.
  const nc = await connect({ servers: ["nats://localhost:4222"] });
  try {
    console.log("NATS connected");
    const jsm = await nc.jetstreamManager();

    // To start from a clean slate, delete the stream first:
    //   await jsm.streams.delete("results");
    await jsm.streams.add({ name: "results", subjects: ["results.topic.io"] });

    // The JetStream client is the data-plane API (publish / fetch).
    const js = nc.jetstream();
    const codec = JSONCodec();

    // Publish sequentially; each await waits for that publish call to settle.
    // NOTE(review): a core `nc.publish("results.topic.io", ...)` on the same
    // subject should also land in the stream, without an ack — confirm.
    for (let index = 0; index < total; index++) {
      await js.publish(
        "results.topic.io",
        codec.encode({ hello: "world", index }),
      );
    }
    console.log("Published", total);
  } finally {
    // Flush outstanding traffic and close the connection so the process exits.
    await nc.drain();
  }
}
// `main` is async, so failures surface as a promise rejection; a synchronous
// try/catch around the call never sees them. Handle the rejection on the
// promise instead and report failure through the process exit code.
// The optional first CLI argument is the number of messages to publish.
main(process.argv[2]).catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment