Skip to content

Instantly share code, notes, and snippets.

@manzanit0
Created November 6, 2024 15:11
Show Gist options
  • Save manzanit0/86c4b759b8dffa504e0ba4f91eb0a020 to your computer and use it in GitHub Desktop.
Save manzanit0/86c4b759b8dffa504e0ba4f91eb0a020 to your computer and use it in GitHub Desktop.
Managing NATS streams
const (
streamName = "my-stream"
consumerName = "my-consumer"
)
var subjects = []string{
"subject.foo",
"subject.bar",
}
func main() {
if err := run(context.TODO()); err != nil {
log.Fatal(err.Error())
}
}
func run(ctx context.Context) error {
stream, err := EnsureStream(ctx, streamName, subjects...)
if err != nil {
return fmt.Errorf("ensure stream: %w", err)
}
_, err = EnsureConsumer(ctx, stream, consumerName, subjects...)
if err != nil {
return fmt.Errorf("ensure consumer: %w", err)
}
return nil
}
func EnsureStream(ctx context.Context, streamName string, subjects ...string) (jetstream.Stream, error) {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return nil, fmt.Errorf("connect to NATS: %v", err)
}
js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("new jetstream: %v", err)
}
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
})
if err != nil {
if !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
return nil, fmt.Errorf("create stream: %w", err)
}
var jsErr error
stream, jsErr = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
})
if jsErr != nil {
return nil, fmt.Errorf("update stream: %w", err)
}
}
return stream, nil
}
func EnsureConsumer(ctx context.Context, stream jetstream.Stream, consumerName string, subjects ...string) (jetstream.Consumer, error) {
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: consumerName,
FilterSubjects: subjects,
})
if err != nil {
return nil, fmt.Errorf("create or update consumer: %w", err)
}
return consumer, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment