Created
May 4, 2019 03:21
-
-
Save dealproc/9e989840c42e460cb7aab999d204f986 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class StreamServer : IDisposable | |
{ | |
static Serilog.ILogger Log = Serilog.Log.ForContext<StreamServer>(); | |
private const byte ACK = 0x06; | |
private const byte NAK = 0x15; | |
private const string HOST_TO_TERMINAL = "[Host->Terminal]"; | |
private const string TERMINAL_TO_HOST = "[Terminal->Host]"; | |
private const byte STX = 0x02; | |
private const byte ETX = 0x03; | |
private const string ACK_STR = "<ACK>"; | |
private const string NAK_STR = "<NAK>"; | |
private const string STX_STR = "<STX>"; | |
private const string ETX_STR = "<ETX>"; | |
public ITransportChannel TransportChannel { get; private set; } | |
readonly bool _disconnectOnError; | |
private byte[] _lastDataSent; | |
private int _sendFailures = 0; | |
public event EventHandler<Exception> OnError; | |
public event EventHandler<string> OnPacketReceived; | |
public event EventHandler<string> OnPacketSent; | |
public event EventHandler<string> OnNotification; | |
public event EventHandler<EventArgs> OnDisconnect; | |
public StreamServer(ITransportChannel transportChannel, bool disconnectOnError = false) | |
{ | |
TransportChannel = transportChannel ?? throw new ArgumentNullException(nameof(transportChannel)); | |
_disconnectOnError = disconnectOnError; | |
} | |
public void Start(IStreamProcessor processor) | |
{ | |
processor.OnStartAsync().GetAwaiter().GetResult(); | |
TransportChannel.OnClientConnected += (s, stream) => | |
{ | |
Task.Run(async () => | |
{ | |
try | |
{ | |
FireNotification("Client Connected!"); | |
MemoryStream logBuffer = new MemoryStream(); | |
MemoryStream buffer = new MemoryStream(); | |
int failCounter = 0; | |
var bytesBuffer = new byte[1024]; | |
while (true) | |
{ | |
var bytesRead = stream.Read(bytesBuffer, 0, bytesBuffer.Length); | |
if (bytesRead == 0) continue; | |
var actualBytes = bytesBuffer.Take(bytesRead).ToArray(); | |
if (actualBytes.Any(b => b == ACK)) | |
{ | |
_sendFailures = 0; | |
FireNotification(ACK_STR); | |
continue; | |
} | |
if (actualBytes.Any(b => b == NAK)) | |
{ | |
_sendFailures++; | |
FireNotification($"{NAK_STR}: {_sendFailures} failures"); | |
// If <NAK> 3 times we should disconnect | |
if (_sendFailures == 3) | |
{ | |
var msg = $"[FAILURE] Disconnected due to invalid messages."; | |
FireNotification(msg); | |
if (_disconnectOnError) { await Disconnect(msg); } | |
break; | |
} | |
//Try again | |
await Write(_lastDataSent, stream); | |
continue; | |
} | |
if (actualBytes.Any(b => b == STX)) | |
{ | |
logBuffer?.Dispose(); | |
logBuffer = new MemoryStream(); | |
logBuffer.WriteByte(STX); | |
buffer?.Dispose(); | |
buffer = new MemoryStream(); | |
var stxIndex = Array.IndexOf(actualBytes, STX); | |
actualBytes = actualBytes.Skip(stxIndex + 1).ToArray(); | |
} | |
logBuffer.Write(actualBytes, 0, actualBytes.Length); | |
buffer.Write(actualBytes, 0, actualBytes.Length); | |
await Task.Delay(5); | |
var data = buffer.ToArray(); | |
var etxIndex = Array.IndexOf(data, ETX); | |
if (data.Any(b => b == ETX) && (etxIndex <= (data.Length - 1))) | |
{ | |
var readLRC = data.Skip(etxIndex + 1).Take(1).Single(); | |
data = data.Take(etxIndex + 1).ToArray(); | |
logBuffer.WriteByte(readLRC); | |
FirePacketIn(logBuffer.ToArray()); | |
var calculatedLRC = data.CalculateLRC(); | |
if (calculatedLRC == readLRC) | |
{ | |
failCounter = 0; | |
stream.WriteByte(ACK); | |
await processor.ProcessAsync(data.Take(data.Length - 1).ToArray(), stream); | |
} | |
else | |
{ | |
failCounter++; | |
stream.WriteByte(NAK); | |
Log.Warning($"Wrong LRC from message: {readLRC}. It should be {calculatedLRC}."); | |
if (failCounter == 3) | |
{ | |
var msg = $"Disconnecting due to wrong LRC from message: {readLRC}. It should be {calculatedLRC}."; | |
Log.Warning(msg); | |
if (_disconnectOnError) { await Disconnect(msg); } | |
} | |
} | |
} | |
} | |
} | |
catch (ObjectDisposedException) { OnNotification?.Invoke(this, "Host disconnected"); } | |
catch (SocketException) | |
{ | |
OnNotification?.Invoke(this, "Disconnected"); | |
} | |
catch (Exception ex) | |
{ | |
OnError?.Invoke(this, ex); | |
//Exceptions should not happen here. But if it does, log it and Disconnect() | |
Log.Error(ex.StackTrace); | |
if (_disconnectOnError) { await Disconnect(ex.Message); } | |
} | |
}); | |
}; | |
TransportChannel.Start().GetAwaiter().GetResult(); | |
} | |
/// <summary> | |
/// Disconnect a client and stop the KSMServer | |
/// </summary> | |
public Task Disconnect(string reason) | |
{ | |
return Task.Run(() => | |
{ | |
try | |
{ | |
OnNotification?.Invoke(this, reason); | |
OnDisconnect?.Invoke(this, EventArgs.Empty); | |
TransportChannel.Stop(); | |
} | |
catch (Exception ex) | |
{ | |
Log.Warning(ex, ex.Message); | |
} | |
}); | |
} | |
/// <summary> | |
/// Write data back to the client | |
/// The message written will follow the format "<STX>data<ETX>{LRC}" | |
/// </summary> | |
/// <param name="data">Buffer Data</param> | |
public async Task Write(byte[] data, Stream stream) | |
{ | |
try | |
{ | |
if (data == default(byte[])) { return; } | |
_lastDataSent = data; | |
var toLrc = new MemoryStream(); | |
await toLrc.WriteAsync(data, 0, data.Length); | |
toLrc.WriteByte(ETX); | |
var lrc = toLrc.ToArray().CalculateLRC(); | |
// <STX>[MESSAGE]<ETX>{LRC} | |
var output = new MemoryStream(); | |
output.WriteByte(STX); //<STX> | |
await output.WriteAsync(data, 0, data.Length); //[MESSAGE] | |
output.WriteByte(ETX); //<ETX> | |
output.WriteByte(lrc); //{LRC} | |
var outputData = output.ToArray(); | |
await stream.WriteAsync(outputData, 0, outputData.Length); | |
FirePacketOut(outputData); | |
} | |
catch (IOException exc) | |
{ | |
Log.Error(exc, "Seems as if we may have lost the connection. We are disconnecting."); | |
await Disconnect("Lost connection?"); | |
} | |
catch (Exception ex) | |
{ | |
OnError?.Invoke(this, ex); | |
//Exceptions should not happen here. But if it does, log it and Disconnect() | |
Log.Error(ex, ex.Message); | |
if (_disconnectOnError) { await Disconnect(ex.Message); } | |
} | |
} | |
public void Dispose() | |
{ | |
TransportChannel?.Stop(); | |
if (TransportChannel is IDisposable) | |
{ | |
((IDisposable)TransportChannel).Dispose(); | |
} | |
TransportChannel = null; | |
} | |
private void FirePacketOut(byte[] data) | |
{ | |
Log.Debug($"{TERMINAL_TO_HOST}: {PacketToString(data)}"); | |
OnPacketSent?.Invoke(this, $"{TERMINAL_TO_HOST} {PacketToString(data)}"); | |
} | |
private void FirePacketIn(byte[] data) | |
{ | |
Log.Debug($"{HOST_TO_TERMINAL}: {PacketToString(data)}"); | |
OnPacketReceived?.Invoke(this, $"{HOST_TO_TERMINAL} {PacketToString(data)}"); | |
} | |
private void FireNotification(string msg) | |
{ | |
Log.Debug(msg); | |
OnNotification?.Invoke(this, msg); | |
} | |
private string PacketToString(byte[] ba) | |
{ | |
var withoutStx = ba.Skip(1).ToArray(); | |
var data = withoutStx.Take(withoutStx.Length - 2).ToArray(); | |
string ascii = Encoding.ASCII.GetString(data); | |
var sb = new StringBuilder(); | |
sb.Append(STX_STR); | |
sb.Append(ascii); | |
sb.Append(ETX_STR); | |
var lrc = withoutStx[withoutStx.Length - 1]; | |
sb.AppendFormat("{0:X2}", lrc); | |
return sb.ToString(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
internal class StreamServer : IDisposable | |
{ | |
static Serilog.ILogger Log = Serilog.Log.ForContext<StreamServer>(); | |
private const byte ACK = 0x06; | |
private const byte NAK = 0x15; | |
private const string HOST_TO_TERMINAL = "[Host->Terminal]"; | |
private const string TERMINAL_TO_HOST = "[Terminal->Host]"; | |
private const byte STX = 0x02; | |
private const byte ETX = 0x03; | |
private const string ACK_STR = "<ACK>"; | |
private const string NAK_STR = "<NAK>"; | |
private const string STX_STR = "<STX>"; | |
private const string ETX_STR = "<ETX>"; | |
readonly ITransportChannel _transportChannel; | |
readonly bool _disconnectOnError; | |
private byte[] _lastDataSent; | |
private int _sendFailures = 0; | |
public event EventHandler<Exception> OnError; | |
public event EventHandler<string> OnPacketReceived; | |
public event EventHandler<string> OnPacketSent; | |
public event EventHandler<string> OnNotification; | |
public event EventHandler<EventArgs> OnDisconnect; | |
public StreamServer(ITransportChannel transportChannel, bool disconnectOnError = false) | |
{ | |
_transportChannel = transportChannel ?? throw new ArgumentNullException(nameof(transportChannel)); | |
_disconnectOnError = disconnectOnError; | |
} | |
public void Start(IStreamProcessor processor) | |
{ | |
processor.OnStartAsync().GetAwaiter().GetResult(); | |
_transportChannel.OnConnected += (s, stream) => | |
{ | |
Task.Run(async () => | |
{ | |
try | |
{ | |
FireNotification("Client Connected!"); | |
MemoryStream logBuffer = new MemoryStream(); | |
MemoryStream buffer = new MemoryStream(); | |
int failCounter = 0; | |
while(true) | |
{ | |
var readedByte = (byte)stream.ReadByte(); | |
if (readedByte == ACK) | |
{ | |
_sendFailures = 0; | |
FireNotification(ACK_STR); | |
continue; | |
} | |
if (readedByte == NAK) | |
{ | |
_sendFailures++; | |
FireNotification($"{NAK_STR}: {_sendFailures} failures"); | |
// If <NAK> 3 times we should disconnect | |
if (_sendFailures == 3) | |
{ | |
var msg = $"[FAILURE] Disconnected due to invalid messages."; | |
FireNotification(msg); | |
if (_disconnectOnError) { await Disconnect(msg); } | |
break; | |
} | |
//Try again | |
await Write(_lastDataSent, stream); | |
continue; | |
} | |
if (readedByte == STX) | |
{ | |
logBuffer?.Dispose(); | |
logBuffer = new MemoryStream(); | |
logBuffer.WriteByte(STX); | |
buffer?.Dispose(); | |
buffer = new MemoryStream(); | |
continue; | |
} | |
logBuffer.WriteByte(readedByte); | |
buffer.WriteByte(readedByte); | |
await Task.Delay(5); | |
if (readedByte == ETX) | |
{ | |
var readedLRC = (byte)stream.ReadByte(); | |
logBuffer.WriteByte(readedLRC); | |
FirePacketIn(logBuffer.ToArray()); | |
var data = buffer.ToArray(); | |
var calculatedLRC = data.CalculateLRC(); | |
if (calculatedLRC == readedLRC) | |
{ | |
failCounter = 0; | |
stream.WriteByte(ACK); | |
await processor.ProcessAsync(data.Take(data.Length - 1).ToArray(), stream); | |
} | |
else | |
{ | |
failCounter++; | |
stream.WriteByte(NAK); | |
Log.Warning($"Wrong LRC from message: {readedLRC}. It should be {calculatedLRC}."); | |
if (failCounter == 3) | |
{ | |
var msg = $"Disconnecting due to wrong LRC from message: {readedLRC}. It should be {calculatedLRC}."; | |
Log.Warning(msg); | |
if (_disconnectOnError) { await Disconnect(msg); } | |
} | |
} | |
} | |
} | |
} | |
catch (ObjectDisposedException) { OnNotification?.Invoke(this, "Host disconnected"); } | |
catch (SocketException) | |
{ | |
OnNotification?.Invoke(this, "Disconnected"); | |
} | |
catch (Exception ex) | |
{ | |
OnError?.Invoke(this, ex); | |
//Exceptions should not happen here. But if it does, log it and Disconnect() | |
Log.Error(ex.StackTrace); | |
if (_disconnectOnError) { await Disconnect(ex.Message); } | |
} | |
}); | |
}; | |
_transportChannel.Start().GetAwaiter().GetResult(); | |
} | |
/// <summary> | |
/// Disconnect a client and stop the KSMServer | |
/// </summary> | |
public Task Disconnect(string reason) | |
{ | |
return Task.Run(() => | |
{ | |
try | |
{ | |
OnNotification?.Invoke(this, reason); | |
OnDisconnect?.Invoke(this, EventArgs.Empty); | |
_transportChannel.Stop(); | |
} | |
catch (Exception ex) | |
{ | |
Log.Warning(ex, ex.Message); | |
} | |
}); | |
} | |
/// <summary> | |
/// Write data back to the client | |
/// The message written will follow the format "<STX>data<ETX>{LRC}" | |
/// </summary> | |
/// <param name="data">Buffer Data</param> | |
public async Task Write(byte[] data, Stream stream) | |
{ | |
try | |
{ | |
if (data == default(byte[])) { return; } | |
_lastDataSent = data; | |
var toLrc = new MemoryStream(); | |
await toLrc.WriteAsync(data, 0, data.Length); | |
toLrc.WriteByte(ETX); | |
var lrc = toLrc.ToArray().CalculateLRC(); | |
// <STX>[MESSAGE]<ETX>{LRC} | |
var output = new MemoryStream(); | |
output.WriteByte(STX); //<STX> | |
await output.WriteAsync(data, 0, data.Length); //[MESSAGE] | |
output.WriteByte(ETX); //<ETX> | |
output.WriteByte(lrc); //{LRC} | |
var outputData = output.ToArray(); | |
await stream.WriteAsync(outputData, 0, outputData.Length); | |
FirePacketOut(outputData); | |
} | |
catch (Exception ex) | |
{ | |
OnError?.Invoke(this, ex); | |
//Exceptions should not happen here. But if it does, log it and Disconnect() | |
Log.Error(ex, ex.Message); | |
if (_disconnectOnError) { await Disconnect(ex.Message); } | |
} | |
} | |
public void Dispose() | |
{ | |
_transportChannel?.Stop(); | |
} | |
private void FirePacketOut(byte[] data) | |
{ | |
Log.Debug($"{TERMINAL_TO_HOST}: {PacketToString(data)}"); | |
OnPacketSent?.Invoke(this, $"{TERMINAL_TO_HOST} {PacketToString(data)}"); | |
} | |
private void FirePacketIn(byte[] data) | |
{ | |
Log.Debug($"{HOST_TO_TERMINAL}: {PacketToString(data)}"); | |
OnPacketReceived?.Invoke(this, $"{HOST_TO_TERMINAL} {PacketToString(data)}"); | |
} | |
private void FireNotification(string msg) | |
{ | |
Log.Debug(msg); | |
OnNotification?.Invoke(this, msg); | |
} | |
private string PacketToString(byte[] ba) | |
{ | |
var withoutStx = ba.Skip(1).ToArray(); | |
var data = withoutStx.Take(withoutStx.Length - 2).ToArray(); | |
string ascii = Encoding.ASCII.GetString(data); | |
var sb = new StringBuilder(); | |
sb.Append(STX_STR); | |
sb.Append(ascii); | |
sb.Append(ETX_STR); | |
var lrc = withoutStx[withoutStx.Length - 1]; | |
sb.AppendFormat("{0:X2}", lrc); | |
return sb.ToString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment