Created
October 21, 2014 12:02
-
-
Save RhysC/e5e29ea256edf49d2085 to your computer and use it in GitHub Desktop.
LINQPad sample of Redis pub/sub using StackExchange.Redis and Rx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Query Kind="Program"> | |
<NuGetReference>Newtonsoft.Json</NuGetReference> | |
<NuGetReference>Rx-Main</NuGetReference> | |
<NuGetReference>StackExchange.Redis</NuGetReference> | |
<Namespace>System.Reactive.Linq</Namespace> | |
<Namespace>System.Reactive.Disposables</Namespace> | |
<Namespace>StackExchange.Redis</Namespace> | |
</Query> | |
// LINQPad entry point. Subscribes to the SampleDto channel and dumps every message received,
// while a two-second timer publishes a new SampleDto to the same channel.
// Press Enter once to dispose the subscription (the publisher keeps running, but nothing is
// dumped any more); press Enter again to stop the publisher and close the connection.
void Main()
{
    // You will need a redis server running locally.
    // Use chocolatey to install the windows port \> choco install redis-64
    // run up the server \> redis-server
    using (var redis = ConnectionMultiplexer.Connect("localhost"))
    {
        var observable = redis.GetSubscriptionChannel<SampleDto>();
        var subscription = observable.Subscribe(x => x.Dump());

        // The inner using stops the interval timer before the outer using disposes the
        // connection, so the publisher never fires against a disposed multiplexer at shutdown.
        using (Observable.Interval(TimeSpan.FromSeconds(2))
                         .Subscribe(_ => redis.PublishToChannel(SampleDto.Create())))
        {
            Console.ReadLine();
            subscription.Dispose();
            "Subscription disposed".Dump();
            Console.ReadLine();
        }
    }
}
/// <summary>
/// Extension methods that expose Redis pub/sub as Rx observables. Messages are JSON-serialised
/// with Newtonsoft.Json and the channel name is the simple type name of <c>T</c>
/// (<c>typeof(T).Name</c>).
/// </summary>
public static class RedisRx
{
    /// <summary>
    /// Creates an observable that, on each subscription, subscribes to the Redis channel named
    /// after <typeparamref name="T"/> and pushes every message deserialised to <typeparamref name="T"/>.
    /// Disposing the Rx subscription unsubscribes the handler from the Redis channel.
    /// </summary>
    /// <param name="redis">The connection used to obtain the Redis subscriber.</param>
    /// <returns>An observable of deserialised messages.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="redis"/> is null.</exception>
    public static IObservable<T> GetSubscriptionChannel<T>(this ConnectionMultiplexer redis)
    {
        // Fail at call time rather than deferring a NullReferenceException to the first Subscribe.
        if (redis == null)
        {
            throw new ArgumentNullException("redis");
        }

        return Observable.Create<T>(obs =>
        {
            var redisSubscriber = redis.GetSubscriber();
            var channelName = typeof(T).Name;
            Action<RedisChannel, RedisValue> handler = (c, v) =>
            {
                try
                {
                    var deserialisedPayload = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(v);
                    obs.OnNext(deserialisedPayload);
                }
                catch (Exception e)
                {
                    // Any failure while deserialising or delivering the message is routed to the observer.
                    obs.OnError(e);
                }
            };
            redisSubscriber.Subscribe(channelName, handler);
            return Disposable.Create(() => { redisSubscriber.Unsubscribe(channelName, handler); });
        });
    }

    /// <summary>
    /// Serialises <paramref name="payload"/> to JSON and publishes it on the Redis channel
    /// named after <typeparamref name="T"/>.
    /// </summary>
    /// <param name="redis">The connection used to obtain the Redis subscriber.</param>
    /// <param name="payload">The message to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="redis"/> is null.</exception>
    public static void PublishToChannel<T>(this ConnectionMultiplexer redis, T payload)
    {
        if (redis == null)
        {
            throw new ArgumentNullException("redis");
        }

        var redisSubscriber = redis.GetSubscriber();
        var channelName = typeof(T).Name;
        var json = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        redisSubscriber.Publish(channelName, json);
    }
}
// Define other methods and classes here | |
/// <summary>
/// Sample message type published and received in this query.
/// </summary>
public class SampleDto
{
    /// <summary>
    /// Builds a new instance with a freshly generated <see cref="Id"/> and the current local
    /// date and time as <see cref="Timestamp"/>.
    /// </summary>
    public static SampleDto Create()
    {
        // Read the clock as a DateTimeOffset directly instead of converting DateTime.Now implicitly.
        return new SampleDto { Id = Guid.NewGuid(), Timestamp = DateTimeOffset.Now };
    }

    /// <summary>Identifier of the message.</summary>
    public Guid Id { get; set; }

    /// <summary>Moment the message was created, including its UTC offset.</summary>
    public DateTimeOffset Timestamp { get; set; }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment