Last active
May 1, 2020 20:41
-
-
Save ahancock1/e6111e6bc2fd6177b8c1e8f2b45699a9 to your computer and use it in GitHub Desktop.
Simple ClientWebSocket implementation in C#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Contract for an event-driven socket client: connect, queue outgoing text
/// messages, and get notified about incoming data and connection state changes.
/// </summary>
public interface ISocket : IDisposable
{
    /// <summary>Raised with the exception when an operation on the socket fails.</summary>
    event Action<Exception> OnError;

    /// <summary>Raised with the payload of each received message.</summary>
    event Action<dynamic> OnData;

    /// <summary>Raised when the connection has been closed.</summary>
    event Action OnDisconnected;

    /// <summary>Raised when a connection has been established.</summary>
    event Action OnConnected;

    /// <summary>Connects to the remote endpoint.</summary>
    /// <param name="token">Token used to cancel the connection attempt.</param>
    /// <returns><c>true</c> when the socket is connected; otherwise <c>false</c>.</returns>
    Task<bool> ConnectAsync(CancellationToken token = default);

    /// <summary>Closes the connection.</summary>
    Task CloseAsync();

    /// <summary>Sends a text message to the remote endpoint.</summary>
    /// <param name="data">The message text.</param>
    void Send(string data);
}
/// <summary>
/// <see cref="ISocket"/> implementation on top of <see cref="ClientWebSocket"/>.
/// Outgoing messages are queued in an unbounded channel and written by a
/// background send loop; incoming messages are read by a background receive
/// loop, parsed as JSON and published through <see cref="OnData"/>.
/// </summary>
public class Socket : Disposable, ISocket
{
    private readonly ILogger _logger = Logger.Create<Socket>();
    private readonly string _endpoint;

    // Outgoing queue: any thread may call Send, only the send loop reads.
    private readonly Channel<string> _messages = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });

    private readonly SocketOptions _options;

    /// <summary>Raised with the exception when connecting, sending, receiving or parsing fails.</summary>
    public event Action<Exception> OnError;

    /// <summary>Raised with the parsed JSON object of each received message.</summary>
    public event Action<dynamic> OnData;

    /// <summary>Raised after a connection has been torn down.</summary>
    public event Action OnDisconnected;

    /// <summary>Raised after a connection has been established.</summary>
    public event Action OnConnected;

    private ClientWebSocket _socket;

    // Serializes connection attempts.
    private readonly SemaphoreSlim _sync = new SemaphoreSlim(1, 1);

    // Cancelled when the current connection is torn down.
    private CancellationTokenSource _source;

    // Links _source with the token the caller passed to ConnectAsync.
    private CancellationTokenSource _linked;

    /// <summary>Creates a socket for the given endpoint.</summary>
    /// <param name="endpoint">URI of the remote endpoint.</param>
    /// <param name="options">Socket options; the wrapped value is read once here.</param>
    public Socket(string endpoint, IOptions<SocketOptions> options)
    {
        _endpoint = endpoint;
        _options = options.Value;
    }

    /// <summary><c>true</c> while the underlying web socket is in the Open state.</summary>
    public bool IsConnected => _socket?.State == WebSocketState.Open;

    /// <summary>
    /// Connects to the endpoint and starts the background send and receive loops.
    /// Does nothing when already connected. Failures are logged and reported
    /// through <see cref="OnError"/> rather than thrown; cancellation during the
    /// connection attempt is swallowed.
    /// </summary>
    /// <param name="token">Cancels the attempt and, later, the background loops.</param>
    /// <returns><c>true</c> when the socket is connected on return.</returns>
    public async Task<bool> ConnectAsync(CancellationToken token = default)
    {
        ThrowIfDisposed(nameof(Socket));
        await _sync.WaitAsync(token);
        try
        {
            if (IsConnected)
            {
                return true;
            }

            // Release the token sources left behind by an earlier attempt
            // instead of overwriting (and leaking) them below.
            CancelPending();

            var socket = _socket;
            if (socket?.State != WebSocketState.None)
            {
                // A socket that has left the None state is replaced;
                // dispose the stale instance before dropping the reference.
                socket?.Dispose();
                socket = new ClientWebSocket
                {
                    Options =
                    {
                        KeepAliveInterval = TimeSpan.FromSeconds(20)
                    }
                };
                _socket = socket;
            }

            var source = new CancellationTokenSource();
            var linked = CancellationTokenSource
                .CreateLinkedTokenSource(source.Token, token);
            // Capture the token now: reading linked.Token after a concurrent
            // teardown has disposed the source would throw.
            var sendToken = linked.Token;
            _source = source;
            _linked = linked;

            await socket.ConnectAsync(new Uri(_endpoint), token);

            // The receive loop observes only the caller's token, as before.
            // NOTE(review): presumably so that the internal cancellation done on
            // close does not interrupt a pending receive during the close
            // handshake - confirm.
            ReceiveData(socket, token);
            SendData(socket, sendToken);

            if (socket.State == WebSocketState.Open)
            {
                HandleConnected();
            }
        }
        catch (OperationCanceledException)
        {
            /* Ignore */
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"Error connecting to endpoint: {_endpoint}");
            HandleError(e);
        }
        finally
        {
            _sync.Release();
        }

        return IsConnected;
    }

    /// <summary>
    /// When auto-reconnect is enabled, starts a background loop that retries
    /// <see cref="ConnectAsync"/> with the configured delay until it succeeds or
    /// <paramref name="token"/> is cancelled.
    /// </summary>
    private void TryReconnect(CancellationToken token)
    {
        if (!_options.AutoReconnect)
        {
            return;
        }

        Task.Run(async () =>
        {
            // The handlers live inside the task: the task is not awaited, so a
            // try/catch around Task.Run would never see these exceptions.
            try
            {
                while (!token.IsCancellationRequested)
                {
                    _logger.LogInformation(
                        $"Attempting to reconnect to endpoint: {_endpoint}");
                    if (await ConnectAsync(token))
                    {
                        break;
                    }

                    await Task.Delay(_options.AutoReconnectDelay, token);
                }
            }
            catch (OperationCanceledException)
            {
                /* Ignore */
            }
            catch (ObjectDisposedException)
            {
                /* Disposed while reconnecting; stop retrying. */
            }
        }, token);
    }

    private void HandleConnected()
    {
        _logger.LogInformation($"Socket connected to endpoint: {_endpoint}");
        OnConnected?.Invoke();
    }

    /// <summary>Queues a text message for the background send loop.</summary>
    /// <param name="data">The message text.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    public void Send(string data)
    {
        ThrowIfDisposed(nameof(Socket));

        // A queued null would throw later inside the send loop and end it.
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        _messages.Writer.TryWrite(data);
    }

    /// <summary>
    /// Closes the current connection, if any, and raises
    /// <see cref="OnDisconnected"/>. Calling it with no connection to close
    /// (never connected, or already closed) is a no-op.
    /// </summary>
    public async Task CloseAsync()
    {
        ThrowIfDisposed(nameof(Socket));
        await DisconnectAsync(_socket);
    }

    /// <summary>
    /// Tears down <paramref name="socket"/> if it is still the current socket.
    /// </summary>
    /// <returns>
    /// <c>true</c> when this call performed the teardown; <c>false</c> when the
    /// socket was null, already torn down, or replaced by a newer connection.
    /// </returns>
    private async Task<bool> DisconnectAsync(ClientWebSocket socket)
    {
        // Only the caller that detaches the socket from the field tears it
        // down, so concurrent or repeated calls cannot dispose twice.
        if (socket == null ||
            Interlocked.CompareExchange(ref _socket, null, socket) != socket)
        {
            return false;
        }

        _logger.LogInformation($"Socket closed to endpoint: {_endpoint}");
        try
        {
            CancelPending();
            if (socket.State == WebSocketState.Open)
            {
                await socket.CloseAsync(
                    WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
            }
        }
        finally
        {
            socket.Dispose();
        }

        OnDisconnected?.Invoke();
        return true;
    }

    /// <summary>
    /// Cancels and disposes the token sources of the current connection.
    /// Safe to call when none exist and safe to call repeatedly.
    /// </summary>
    private void CancelPending()
    {
        var linked = Interlocked.Exchange(ref _linked, null);
        var source = Interlocked.Exchange(ref _source, null);
        try
        {
            source?.Cancel();
        }
        finally
        {
            linked?.Dispose();
            source?.Dispose();
        }
    }

    /// <summary>Cancels the background work and releases the socket and the semaphore.</summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Every step tolerates an instance that never connected.
            CancelPending();
            Interlocked.Exchange(ref _socket, null)?.Dispose();
            _sync.Dispose();
        }
    }

    /// <summary>
    /// Parses <paramref name="message"/> as a JSON object on a worker task and
    /// invokes every <see cref="OnData"/> subscriber in parallel. Parse failures
    /// are logged and reported through <see cref="OnError"/>; an exception thrown
    /// by one subscriber is logged and does not affect the others.
    /// </summary>
    private void HandleData(string message)
    {
        var delegates = OnData?.GetInvocationList();
        if (delegates == null)
        {
            return;
        }

        Task.Run(() =>
        {
            JObject data;
            try
            {
                data = JObject.Parse(message);
            }
            catch (Exception e)
            {
                // Nobody awaits this task, so report the failure here.
                _logger.LogError(e, "Error parsing message.");
                HandleError(e);
                return;
            }

            Parallel.ForEach(delegates, d =>
            {
                try
                {
                    d.DynamicInvoke(data);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "Error in data handler.");
                }
            });
        });
    }

    private void HandleError(Exception exception)
    {
        OnError?.Invoke(exception);
    }

    /// <summary>
    /// Starts the background loop that drains the outgoing queue into
    /// <paramref name="socket"/> until the token is cancelled or the socket
    /// leaves the Open state.
    /// </summary>
    private void SendData(ClientWebSocket socket, CancellationToken token)
    {
        Task.Run(async () =>
        {
            try
            {
                var reader = _messages.Reader;
                while (await reader.WaitToReadAsync(token))
                {
                    // Checked before dequeuing so the message stays queued
                    // when the socket is no longer open.
                    if (socket.State != WebSocketState.Open)
                    {
                        break;
                    }

                    if (reader.TryRead(out var item))
                    {
                        var data = new ArraySegment<byte>(Encoding.UTF8.GetBytes(item));
                        await socket
                            .SendAsync(data, WebSocketMessageType.Text, true, token);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                /* Ignore */
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error sending message.");
                HandleError(e);
            }
        }, token);
    }

    /// <summary>
    /// Starts the background loop that reads messages from
    /// <paramref name="socket"/>, reassembles fragments, decodes them as UTF-8
    /// and hands them to <see cref="HandleData"/>. When the loop ends it tears
    /// the connection down and, if this loop performed the teardown, triggers
    /// the auto-reconnect logic.
    /// </summary>
    private void ReceiveData(ClientWebSocket socket, CancellationToken token)
    {
        Task.Run(async () =>
        {
            try
            {
                await using var stream = new MemoryStream();
                var buffer = new ArraySegment<byte>(new byte[_options.ReceiveBufferSize]);
                while (!token.IsCancellationRequested &&
                       socket.State == WebSocketState.Open)
                {
                    // Reuse the stream between messages.
                    stream.SetLength(0);
                    WebSocketReceiveResult result;
                    do
                    {
                        result = await socket.ReceiveAsync(buffer, token);
                        if (result.MessageType == WebSocketMessageType.Close)
                        {
                            // A close frame carries no message; leave the loop
                            // entirely instead of dispatching an empty payload.
                            return;
                        }

                        stream.Write(buffer.Array, buffer.Offset, result.Count);
                    } while (!result.EndOfMessage);

                    var message = Encoding.UTF8.GetString(
                        stream.GetBuffer(), 0, (int)stream.Length);
                    HandleData(message);
                }
            }
            catch (OperationCanceledException)
            {
                /* Ignore */
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error receiving message.");
                HandleError(e);
            }
            finally
            {
                bool reconnect;
                try
                {
                    // False when an explicit CloseAsync/Dispose or a newer
                    // connection already took over this socket; no reconnect then.
                    reconnect = await DisconnectAsync(socket);
                }
                catch (Exception e)
                {
                    // DisconnectAsync throws only after it has detached the
                    // socket, so this loop did own the teardown.
                    _logger.LogError(e, "Error closing socket.");
                    HandleError(e);
                    reconnect = true;
                }

                if (reconnect)
                {
                    TryReconnect(token);
                }
            }
        }, token);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment