Created
January 15, 2019 06:11
-
-
Save Davidaredding/ab7ee80842ade7e94891e28e6bf79a71 to your computer and use it in GitHub Desktop.
Simple REDIS to Websocket via Kestral and C#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.AspNetCore; | |
using Microsoft.AspNetCore.Builder; | |
using Microsoft.AspNetCore.Hosting; | |
using Microsoft.Extensions.DependencyInjection; | |
using StackExchange.Redis; | |
using System; | |
using System.Net.WebSockets; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace RedisWebSocketFacad | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var host = WebHost.CreateDefaultBuilder(args) | |
.UseStartup<Startup>() | |
.Build(); | |
host.Run(); | |
} | |
} | |
public class Startup | |
{ | |
public void ConfigureServices(IServiceCollection services) | |
{ | |
services.AddSingleton<IConnectionMultiplexer>(s => ConnectionMultiplexer.Connect("localhost")); | |
} | |
public void Configure(IApplicationBuilder app) | |
{ | |
app.UseWebSockets(); | |
app.Use(async (context, n) => | |
{ | |
if (context.WebSockets.IsWebSocketRequest) | |
{ | |
WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(); | |
var channel = context.Request.Path.ToString().TrimStart('/').Replace('/', ':'); | |
Console.WriteLine(channel); | |
var redis = new RedisInputServer( | |
app.ApplicationServices.GetService<IConnectionMultiplexer>(), | |
webSocket, channel); | |
await redis.Connect(); | |
if (webSocket.CloseStatus.HasValue) | |
Console.WriteLine($"Socket closed by client: {webSocket.CloseStatusDescription}"); | |
else | |
await webSocket.CloseAsync(WebSocketCloseStatus.Empty, "Dead or abandoned socket", CancellationToken.None); | |
Console.WriteLine($"Socket {channel} Finalized"); | |
} | |
else | |
{ | |
await n(); | |
} | |
}); | |
} | |
} | |
public class RedisInputServer | |
{ | |
IConnectionMultiplexer _redis; | |
ISubscriber _subscriber; | |
WebSocket _socket; | |
string _path; | |
public RedisInputServer(IConnectionMultiplexer redis, WebSocket socket, string Path) | |
{ | |
_redis = redis; | |
_path = Path; | |
_socket = socket; | |
} | |
public async Task Connect() | |
{ | |
_subscriber = _redis.GetSubscriber(); | |
var wsToPub = WsToPub(); | |
var subToWs = SubToWs(); | |
Task.WaitAll(wsToPub, subToWs); | |
} | |
private async Task WsToPub() | |
{ | |
while (true) | |
{ | |
if (_socket.CloseStatus.HasValue) | |
return; | |
var aseg = new ArraySegment<byte>(new byte[1024 * 4], 0, 1024 * 4); | |
await _socket.ReceiveAsync(aseg, CancellationToken.None); | |
var msg = System.Text.Encoding.Default.GetString(aseg.Array).Trim('\0'); | |
_subscriber.Publish(_path, msg); | |
} | |
} | |
private async Task SubToWs() | |
{ | |
await _subscriber.SubscribeAsync(_path, async (c, v) => | |
{ | |
if (_socket.CloseStatus.HasValue) | |
return; | |
var buffer = System.Text.Encoding.Default.GetBytes((string)v); | |
var aseg = new ArraySegment<byte>(buffer, 0, buffer.Length); | |
await _socket.SendAsync(aseg, WebSocketMessageType.Text, true, CancellationToken.None); | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment