Skip to content

Instantly share code, notes, and snippets.

@Davidaredding
Created January 15, 2019 06:11
Show Gist options
  • Save Davidaredding/ab7ee80842ade7e94891e28e6bf79a71 to your computer and use it in GitHub Desktop.
Save Davidaredding/ab7ee80842ade7e94891e28e6bf79a71 to your computer and use it in GitHub Desktop.
Simple REDIS to Websocket via Kestral and C#
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using StackExchange.Redis;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace RedisWebSocketFacad
{
class Program
{
static void Main(string[] args)
{
var host = WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
host.Run();
}
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IConnectionMultiplexer>(s => ConnectionMultiplexer.Connect("localhost"));
}
public void Configure(IApplicationBuilder app)
{
app.UseWebSockets();
app.Use(async (context, n) =>
{
if (context.WebSockets.IsWebSocketRequest)
{
WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();
var channel = context.Request.Path.ToString().TrimStart('/').Replace('/', ':');
Console.WriteLine(channel);
var redis = new RedisInputServer(
app.ApplicationServices.GetService<IConnectionMultiplexer>(),
webSocket, channel);
await redis.Connect();
if (webSocket.CloseStatus.HasValue)
Console.WriteLine($"Socket closed by client: {webSocket.CloseStatusDescription}");
else
await webSocket.CloseAsync(WebSocketCloseStatus.Empty, "Dead or abandoned socket", CancellationToken.None);
Console.WriteLine($"Socket {channel} Finalized");
}
else
{
await n();
}
});
}
}
public class RedisInputServer
{
IConnectionMultiplexer _redis;
ISubscriber _subscriber;
WebSocket _socket;
string _path;
public RedisInputServer(IConnectionMultiplexer redis, WebSocket socket, string Path)
{
_redis = redis;
_path = Path;
_socket = socket;
}
public async Task Connect()
{
_subscriber = _redis.GetSubscriber();
var wsToPub = WsToPub();
var subToWs = SubToWs();
Task.WaitAll(wsToPub, subToWs);
}
private async Task WsToPub()
{
while (true)
{
if (_socket.CloseStatus.HasValue)
return;
var aseg = new ArraySegment<byte>(new byte[1024 * 4], 0, 1024 * 4);
await _socket.ReceiveAsync(aseg, CancellationToken.None);
var msg = System.Text.Encoding.Default.GetString(aseg.Array).Trim('\0');
_subscriber.Publish(_path, msg);
}
}
private async Task SubToWs()
{
await _subscriber.SubscribeAsync(_path, async (c, v) =>
{
if (_socket.CloseStatus.HasValue)
return;
var buffer = System.Text.Encoding.Default.GetBytes((string)v);
var aseg = new ArraySegment<byte>(buffer, 0, buffer.Length);
await _socket.SendAsync(aseg, WebSocketMessageType.Text, true, CancellationToken.None);
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment