Skip to content

Instantly share code, notes, and snippets.

@itn3000
Created May 17, 2017 04:05
Show Gist options
  • Save itn3000/f34bdae8bcf04f6b885d59f92506dd88 to your computer and use it in GitHub Desktop.
Save itn3000/f34bdae8bcf04f6b885d59f92506dd88 to your computer and use it in GitHub Desktop.
nats-streaming subscribe test
namespace StanTest
{
using System;
using STAN.Client;
using System.Threading.Tasks;
using System.Threading;
using System.Text;
/// <summary>
/// Manual experiment against a NATS Streaming (STAN) server: one task publishes
/// messages to a subject while another repeatedly opens and tears down a durable
/// subscription on that subject, simulating a consumer service being restarted.
/// </summary>
static class Tests
{
    // Cluster id and subject shared by the producer and the consumer.
    private const string ClusterId = "test-cluster";
    private const string Subject = "foo";

    // Number of messages the producer publishes before it finishes.
    private const int MessageCount = 1000;

    // How long a subscription stays open, and how long the simulated outage lasts.
    private const int SubscriptionLifetimeMs = 1000;
    private const int RebootDelayMs = 1000;

    /// <summary>
    /// Publishes <c>x0</c> .. <c>x999</c> (UTF-8 encoded) to the subject, pausing
    /// 10 ms between messages, then writes "producer done" to the console.
    /// </summary>
    /// <param name="cf">Factory used to open the producer connection.</param>
    /// <param name="opts">Connection options (server URL etc.).</param>
    public static async Task CreateProducerTask(StanConnectionFactory cf, StanOptions opts)
    {
        using (var c = cf.CreateConnection(ClusterId, "stantest-producer", opts))
        {
            for (int i = 0; i < MessageCount; i++)
            {
                await c.PublishAsync(Subject, Encoding.UTF8.GetBytes($"x{i}")).ConfigureAwait(false);
                await Task.Delay(10).ConfigureAwait(false);
            }
        }
        Console.WriteLine("producer done");
    }

    /// <summary>
    /// Until <paramref name="ct"/> is cancelled, repeatedly: connects, subscribes
    /// to the subject with a durable name, prints received messages for about one
    /// second, disposes the subscription and connection, then waits about one
    /// second before reconnecting.
    /// </summary>
    /// <param name="cf">Factory used to open each consumer connection.</param>
    /// <param name="opts">Connection options (server URL etc.).</param>
    /// <param name="ct">
    /// Signals the consumer to stop. Cancellation is observed during the waits as
    /// well as between iterations, and makes the method return normally rather
    /// than throw.
    /// </param>
    public static async Task CreateConsumerTask(StanConnectionFactory cf, StanOptions opts, CancellationToken ct)
    {
        var subopts = StanSubscriptionOptions.GetDefaultOptions();
        // A durable name is set so the subscription is intended to resume where
        // it left off after each simulated restart.
        subopts.DurableName = "standurable";

        while (!ct.IsCancellationRequested)
        {
            using (var c = cf.CreateConnection(ClusterId, "stantest-consumer", opts))
            using (var sub = c.Subscribe(Subject, subopts, (sender, ev) =>
            {
                Console.WriteLine($"recv:{Encoding.UTF8.GetString(ev.Message.Data)}");
            }))
            {
                // Keep the subscription alive for a while; the handler above runs
                // for messages delivered in the meantime.
                await DelayUnlessCancelledAsync(SubscriptionLifetimeMs, ct).ConfigureAwait(false);

                // NOTE(review): the original experiment had an explicit
                // sub.Close() commented out here. Whether disposing a
                // subscription closes it or unsubscribes the durable depends on
                // the STAN client version - confirm against the client docs.
            }

            if (ct.IsCancellationRequested)
            {
                // Shutting down, not rebooting: skip the reboot messages and wait.
                return;
            }

            // simulate reboot service
            Console.WriteLine("rebooting subscription");
            if (!await DelayUnlessCancelledAsync(RebootDelayMs, ct).ConfigureAwait(false))
            {
                return;
            }
            Console.WriteLine("rebooting subscription done");
        }
    }

    /// <summary>
    /// Runs the producer and the consumer concurrently against the server at
    /// <paramref name="natsUrl"/> and completes when both have finished.
    /// </summary>
    /// <param name="natsUrl">URL of the NATS server to connect to.</param>
    public static async Task DoTest(string natsUrl)
    {
        var cf = new StanConnectionFactory();
        var opts = StanOptions.GetDefaultOptions();
        opts.NatsURL = natsUrl;

        using (var cts = new CancellationTokenSource())
        {
            var producer = Task.Run(async () =>
            {
                try
                {
                    await CreateProducerTask(cf, opts).ConfigureAwait(false);
                }
                finally
                {
                    // Always stop the consumer, even when the producer fails;
                    // otherwise the consumer loops forever and WhenAll never
                    // completes.
                    cts.Cancel();
                }
            });
            var consumer = CreateConsumerTask(cf, opts, cts.Token);

            await Task.WhenAll(producer, consumer).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Waits for <paramref name="milliseconds"/> or until <paramref name="ct"/>
    /// is cancelled, whichever comes first.
    /// </summary>
    /// <returns>
    /// <c>true</c> if the full delay elapsed; <c>false</c> if it was cut short by
    /// cancellation.
    /// </returns>
    private static async Task<bool> DelayUnlessCancelledAsync(int milliseconds, CancellationToken ct)
    {
        try
        {
            await Task.Delay(milliseconds, ct).ConfigureAwait(false);
            return true;
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected shutdown path here, not an error.
            return false;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment