Created
November 6, 2024 15:11
-
-
Save manzanit0/86c4b759b8dffa504e0ba4f91eb0a020 to your computer and use it in GitHub Desktop.
Managing NATS streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Names of the JetStream resources this program sets up.
const (
	// consumerName is used as the durable name of the consumer.
	consumerName = "my-consumer"
	// streamName is used as the name of the stream.
	streamName = "my-stream"
)
// subjects lists the NATS subjects used both as the stream's subjects and as
// the consumer's filter subjects.
var subjects = []string{"subject.foo", "subject.bar"}
func main() { | |
if err := run(context.TODO()); err != nil { | |
log.Fatal(err.Error()) | |
} | |
} | |
func run(ctx context.Context) error { | |
stream, err := EnsureStream(ctx, streamName, subjects...) | |
if err != nil { | |
return fmt.Errorf("ensure stream: %w", err) | |
} | |
_, err = EnsureConsumer(ctx, stream, consumerName, subjects...) | |
if err != nil { | |
return fmt.Errorf("ensure consumer: %w", err) | |
} | |
return nil | |
} | |
func EnsureStream(ctx context.Context, streamName string, subjects ...string) (jetstream.Stream, error) { | |
nc, err := nats.Connect(nats.DefaultURL) | |
if err != nil { | |
return nil, fmt.Errorf("connect to NATS: %v", err) | |
} | |
js, err := jetstream.New(nc) | |
if err != nil { | |
return nil, fmt.Errorf("new jetstream: %v", err) | |
} | |
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ | |
Name: streamName, | |
Subjects: subjects, | |
}) | |
if err != nil { | |
if !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) { | |
return nil, fmt.Errorf("create stream: %w", err) | |
} | |
var jsErr error | |
stream, jsErr = js.UpdateStream(ctx, jetstream.StreamConfig{ | |
Name: streamName, | |
Subjects: subjects, | |
}) | |
if jsErr != nil { | |
return nil, fmt.Errorf("update stream: %w", err) | |
} | |
} | |
return stream, nil | |
} | |
func EnsureConsumer(ctx context.Context, stream jetstream.Stream, consumerName string, subjects ...string) (jetstream.Consumer, error) { | |
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ | |
Durable: consumerName, | |
FilterSubjects: subjects, | |
}) | |
if err != nil { | |
return nil, fmt.Errorf("create or update consumer: %w", err) | |
} | |
return consumer, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment