Skip to content

Instantly share code, notes, and snippets.

@dealproc
Created May 4, 2019 03:21
Show Gist options
  • Save dealproc/9e989840c42e460cb7aab999d204f986 to your computer and use it in GitHub Desktop.
Save dealproc/9e989840c42e460cb7aab999d204f986 to your computer and use it in GitHub Desktop.
public class StreamServer : IDisposable
{
static Serilog.ILogger Log = Serilog.Log.ForContext<StreamServer>();
private const byte ACK = 0x06;
private const byte NAK = 0x15;
private const string HOST_TO_TERMINAL = "[Host->Terminal]";
private const string TERMINAL_TO_HOST = "[Terminal->Host]";
private const byte STX = 0x02;
private const byte ETX = 0x03;
private const string ACK_STR = "<ACK>";
private const string NAK_STR = "<NAK>";
private const string STX_STR = "<STX>";
private const string ETX_STR = "<ETX>";
public ITransportChannel TransportChannel { get; private set; }
readonly bool _disconnectOnError;
private byte[] _lastDataSent;
private int _sendFailures = 0;
public event EventHandler<Exception> OnError;
public event EventHandler<string> OnPacketReceived;
public event EventHandler<string> OnPacketSent;
public event EventHandler<string> OnNotification;
public event EventHandler<EventArgs> OnDisconnect;
public StreamServer(ITransportChannel transportChannel, bool disconnectOnError = false)
{
TransportChannel = transportChannel ?? throw new ArgumentNullException(nameof(transportChannel));
_disconnectOnError = disconnectOnError;
}
public void Start(IStreamProcessor processor)
{
processor.OnStartAsync().GetAwaiter().GetResult();
TransportChannel.OnClientConnected += (s, stream) =>
{
Task.Run(async () =>
{
try
{
FireNotification("Client Connected!");
MemoryStream logBuffer = new MemoryStream();
MemoryStream buffer = new MemoryStream();
int failCounter = 0;
var bytesBuffer = new byte[1024];
while (true)
{
var bytesRead = stream.Read(bytesBuffer, 0, bytesBuffer.Length);
if (bytesRead == 0) continue;
var actualBytes = bytesBuffer.Take(bytesRead).ToArray();
if (actualBytes.Any(b => b == ACK))
{
_sendFailures = 0;
FireNotification(ACK_STR);
continue;
}
if (actualBytes.Any(b => b == NAK))
{
_sendFailures++;
FireNotification($"{NAK_STR}: {_sendFailures} failures");
// If <NAK> 3 times we should disconnect
if (_sendFailures == 3)
{
var msg = $"[FAILURE] Disconnected due to invalid messages.";
FireNotification(msg);
if (_disconnectOnError) { await Disconnect(msg); }
break;
}
//Try again
await Write(_lastDataSent, stream);
continue;
}
if (actualBytes.Any(b => b == STX))
{
logBuffer?.Dispose();
logBuffer = new MemoryStream();
logBuffer.WriteByte(STX);
buffer?.Dispose();
buffer = new MemoryStream();
var stxIndex = Array.IndexOf(actualBytes, STX);
actualBytes = actualBytes.Skip(stxIndex + 1).ToArray();
}
logBuffer.Write(actualBytes, 0, actualBytes.Length);
buffer.Write(actualBytes, 0, actualBytes.Length);
await Task.Delay(5);
var data = buffer.ToArray();
var etxIndex = Array.IndexOf(data, ETX);
if (data.Any(b => b == ETX) && (etxIndex <= (data.Length - 1)))
{
var readLRC = data.Skip(etxIndex + 1).Take(1).Single();
data = data.Take(etxIndex + 1).ToArray();
logBuffer.WriteByte(readLRC);
FirePacketIn(logBuffer.ToArray());
var calculatedLRC = data.CalculateLRC();
if (calculatedLRC == readLRC)
{
failCounter = 0;
stream.WriteByte(ACK);
await processor.ProcessAsync(data.Take(data.Length - 1).ToArray(), stream);
}
else
{
failCounter++;
stream.WriteByte(NAK);
Log.Warning($"Wrong LRC from message: {readLRC}. It should be {calculatedLRC}.");
if (failCounter == 3)
{
var msg = $"Disconnecting due to wrong LRC from message: {readLRC}. It should be {calculatedLRC}.";
Log.Warning(msg);
if (_disconnectOnError) { await Disconnect(msg); }
}
}
}
}
}
catch (ObjectDisposedException) { OnNotification?.Invoke(this, "Host disconnected"); }
catch (SocketException)
{
OnNotification?.Invoke(this, "Disconnected");
}
catch (Exception ex)
{
OnError?.Invoke(this, ex);
//Exceptions should not happen here. But if it does, log it and Disconnect()
Log.Error(ex.StackTrace);
if (_disconnectOnError) { await Disconnect(ex.Message); }
}
});
};
TransportChannel.Start().GetAwaiter().GetResult();
}
/// <summary>
/// Disconnect a client and stop the KSMServer
/// </summary>
public Task Disconnect(string reason)
{
return Task.Run(() =>
{
try
{
OnNotification?.Invoke(this, reason);
OnDisconnect?.Invoke(this, EventArgs.Empty);
TransportChannel.Stop();
}
catch (Exception ex)
{
Log.Warning(ex, ex.Message);
}
});
}
/// <summary>
/// Write data back to the client
/// The message written will follow the format "<STX>data<ETX>{LRC}"
/// </summary>
/// <param name="data">Buffer Data</param>
public async Task Write(byte[] data, Stream stream)
{
try
{
if (data == default(byte[])) { return; }
_lastDataSent = data;
var toLrc = new MemoryStream();
await toLrc.WriteAsync(data, 0, data.Length);
toLrc.WriteByte(ETX);
var lrc = toLrc.ToArray().CalculateLRC();
// <STX>[MESSAGE]<ETX>{LRC}
var output = new MemoryStream();
output.WriteByte(STX); //<STX>
await output.WriteAsync(data, 0, data.Length); //[MESSAGE]
output.WriteByte(ETX); //<ETX>
output.WriteByte(lrc); //{LRC}
var outputData = output.ToArray();
await stream.WriteAsync(outputData, 0, outputData.Length);
FirePacketOut(outputData);
}
catch (IOException exc)
{
Log.Error(exc, "Seems as if we may have lost the connection. We are disconnecting.");
await Disconnect("Lost connection?");
}
catch (Exception ex)
{
OnError?.Invoke(this, ex);
//Exceptions should not happen here. But if it does, log it and Disconnect()
Log.Error(ex, ex.Message);
if (_disconnectOnError) { await Disconnect(ex.Message); }
}
}
public void Dispose()
{
TransportChannel?.Stop();
if (TransportChannel is IDisposable)
{
((IDisposable)TransportChannel).Dispose();
}
TransportChannel = null;
}
private void FirePacketOut(byte[] data)
{
Log.Debug($"{TERMINAL_TO_HOST}: {PacketToString(data)}");
OnPacketSent?.Invoke(this, $"{TERMINAL_TO_HOST} {PacketToString(data)}");
}
private void FirePacketIn(byte[] data)
{
Log.Debug($"{HOST_TO_TERMINAL}: {PacketToString(data)}");
OnPacketReceived?.Invoke(this, $"{HOST_TO_TERMINAL} {PacketToString(data)}");
}
private void FireNotification(string msg)
{
Log.Debug(msg);
OnNotification?.Invoke(this, msg);
}
private string PacketToString(byte[] ba)
{
var withoutStx = ba.Skip(1).ToArray();
var data = withoutStx.Take(withoutStx.Length - 2).ToArray();
string ascii = Encoding.ASCII.GetString(data);
var sb = new StringBuilder();
sb.Append(STX_STR);
sb.Append(ascii);
sb.Append(ETX_STR);
var lrc = withoutStx[withoutStx.Length - 1];
sb.AppendFormat("{0:X2}", lrc);
return sb.ToString();
}
}
internal class StreamServer : IDisposable
{
static Serilog.ILogger Log = Serilog.Log.ForContext<StreamServer>();
private const byte ACK = 0x06;
private const byte NAK = 0x15;
private const string HOST_TO_TERMINAL = "[Host->Terminal]";
private const string TERMINAL_TO_HOST = "[Terminal->Host]";
private const byte STX = 0x02;
private const byte ETX = 0x03;
private const string ACK_STR = "<ACK>";
private const string NAK_STR = "<NAK>";
private const string STX_STR = "<STX>";
private const string ETX_STR = "<ETX>";
readonly ITransportChannel _transportChannel;
readonly bool _disconnectOnError;
private byte[] _lastDataSent;
private int _sendFailures = 0;
public event EventHandler<Exception> OnError;
public event EventHandler<string> OnPacketReceived;
public event EventHandler<string> OnPacketSent;
public event EventHandler<string> OnNotification;
public event EventHandler<EventArgs> OnDisconnect;
public StreamServer(ITransportChannel transportChannel, bool disconnectOnError = false)
{
_transportChannel = transportChannel ?? throw new ArgumentNullException(nameof(transportChannel));
_disconnectOnError = disconnectOnError;
}
public void Start(IStreamProcessor processor)
{
processor.OnStartAsync().GetAwaiter().GetResult();
_transportChannel.OnConnected += (s, stream) =>
{
Task.Run(async () =>
{
try
{
FireNotification("Client Connected!");
MemoryStream logBuffer = new MemoryStream();
MemoryStream buffer = new MemoryStream();
int failCounter = 0;
while(true)
{
var readedByte = (byte)stream.ReadByte();
if (readedByte == ACK)
{
_sendFailures = 0;
FireNotification(ACK_STR);
continue;
}
if (readedByte == NAK)
{
_sendFailures++;
FireNotification($"{NAK_STR}: {_sendFailures} failures");
// If <NAK> 3 times we should disconnect
if (_sendFailures == 3)
{
var msg = $"[FAILURE] Disconnected due to invalid messages.";
FireNotification(msg);
if (_disconnectOnError) { await Disconnect(msg); }
break;
}
//Try again
await Write(_lastDataSent, stream);
continue;
}
if (readedByte == STX)
{
logBuffer?.Dispose();
logBuffer = new MemoryStream();
logBuffer.WriteByte(STX);
buffer?.Dispose();
buffer = new MemoryStream();
continue;
}
logBuffer.WriteByte(readedByte);
buffer.WriteByte(readedByte);
await Task.Delay(5);
if (readedByte == ETX)
{
var readedLRC = (byte)stream.ReadByte();
logBuffer.WriteByte(readedLRC);
FirePacketIn(logBuffer.ToArray());
var data = buffer.ToArray();
var calculatedLRC = data.CalculateLRC();
if (calculatedLRC == readedLRC)
{
failCounter = 0;
stream.WriteByte(ACK);
await processor.ProcessAsync(data.Take(data.Length - 1).ToArray(), stream);
}
else
{
failCounter++;
stream.WriteByte(NAK);
Log.Warning($"Wrong LRC from message: {readedLRC}. It should be {calculatedLRC}.");
if (failCounter == 3)
{
var msg = $"Disconnecting due to wrong LRC from message: {readedLRC}. It should be {calculatedLRC}.";
Log.Warning(msg);
if (_disconnectOnError) { await Disconnect(msg); }
}
}
}
}
}
catch (ObjectDisposedException) { OnNotification?.Invoke(this, "Host disconnected"); }
catch (SocketException)
{
OnNotification?.Invoke(this, "Disconnected");
}
catch (Exception ex)
{
OnError?.Invoke(this, ex);
//Exceptions should not happen here. But if it does, log it and Disconnect()
Log.Error(ex.StackTrace);
if (_disconnectOnError) { await Disconnect(ex.Message); }
}
});
};
_transportChannel.Start().GetAwaiter().GetResult();
}
/// <summary>
/// Disconnect a client and stop the KSMServer
/// </summary>
public Task Disconnect(string reason)
{
return Task.Run(() =>
{
try
{
OnNotification?.Invoke(this, reason);
OnDisconnect?.Invoke(this, EventArgs.Empty);
_transportChannel.Stop();
}
catch (Exception ex)
{
Log.Warning(ex, ex.Message);
}
});
}
/// <summary>
/// Write data back to the client
/// The message written will follow the format "<STX>data<ETX>{LRC}"
/// </summary>
/// <param name="data">Buffer Data</param>
public async Task Write(byte[] data, Stream stream)
{
try
{
if (data == default(byte[])) { return; }
_lastDataSent = data;
var toLrc = new MemoryStream();
await toLrc.WriteAsync(data, 0, data.Length);
toLrc.WriteByte(ETX);
var lrc = toLrc.ToArray().CalculateLRC();
// <STX>[MESSAGE]<ETX>{LRC}
var output = new MemoryStream();
output.WriteByte(STX); //<STX>
await output.WriteAsync(data, 0, data.Length); //[MESSAGE]
output.WriteByte(ETX); //<ETX>
output.WriteByte(lrc); //{LRC}
var outputData = output.ToArray();
await stream.WriteAsync(outputData, 0, outputData.Length);
FirePacketOut(outputData);
}
catch (Exception ex)
{
OnError?.Invoke(this, ex);
//Exceptions should not happen here. But if it does, log it and Disconnect()
Log.Error(ex, ex.Message);
if (_disconnectOnError) { await Disconnect(ex.Message); }
}
}
public void Dispose()
{
_transportChannel?.Stop();
}
private void FirePacketOut(byte[] data)
{
Log.Debug($"{TERMINAL_TO_HOST}: {PacketToString(data)}");
OnPacketSent?.Invoke(this, $"{TERMINAL_TO_HOST} {PacketToString(data)}");
}
private void FirePacketIn(byte[] data)
{
Log.Debug($"{HOST_TO_TERMINAL}: {PacketToString(data)}");
OnPacketReceived?.Invoke(this, $"{HOST_TO_TERMINAL} {PacketToString(data)}");
}
private void FireNotification(string msg)
{
Log.Debug(msg);
OnNotification?.Invoke(this, msg);
}
private string PacketToString(byte[] ba)
{
var withoutStx = ba.Skip(1).ToArray();
var data = withoutStx.Take(withoutStx.Length - 2).ToArray();
string ascii = Encoding.ASCII.GetString(data);
var sb = new StringBuilder();
sb.Append(STX_STR);
sb.Append(ascii);
sb.Append(ETX_STR);
var lrc = withoutStx[withoutStx.Length - 1];
sb.AppendFormat("{0:X2}", lrc);
return sb.ToString();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment