Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save normanlmfung/d3e62c3d29d3c57a7f2f9ad984166839 to your computer and use it in GitHub Desktop.
Save normanlmfung/d3e62c3d29d3c57a7f2f9ad984166839 to your computer and use it in GitHub Desktop.
csharp_ws_pinned_memory
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/*
'AllocateMemory' will allocate byte[] pinned in memory. You'd only do this ONCE. Pinned memory will not be garbage collected, reducing performance impact from GC.
'SerializeOrderBook' will use BitConverter.GetBytes and Buffer.BlockCopy to serialize 'originalOrderBook' to a byte[] pinned in memory.
'DeserializeOrderBook' will recovert from bytes[] pinned in memory, back to an OrderBook object.
'SerializeOrderBook' and 'DeserializeOrderBook' is called once during initial snapshot, but every updates.
*/
class Level
{
public float Price { get; set; }
public float Amount { get; set; }
public int NumOrders { get; set; }
}
class OrderBookUpdate
{
public string Symbol { get; }
public List<Level> Bids { get; set; }
public List<Level> Asks { get; set; }
public long Timestamp { get; set; }
public long PrevSeqId { get; set; }
public long SeqId { get; set; }
public long Checksum { get; set; }
public OrderBookUpdate(string symbol)
{
Symbol = symbol;
}
public static OrderBookUpdate ParseUpdate(string symbol, dynamic messageObject)
{
OrderBookUpdate update = new OrderBookUpdate(symbol);
update.Asks = ParseLevels(messageObject.data[0].asks);
update.Bids = ParseLevels(messageObject.data[0].bids);
update.Timestamp = messageObject.data[0].ts;
update.PrevSeqId = messageObject.data[0].prevSeqId;
update.SeqId = messageObject.data[0].seqId;
update.Checksum = messageObject.data[0].checksum;
return update;
}
private static List<Level> ParseLevels(dynamic levels)
{
List<Level> parsedLevels = new List<Level>();
foreach (var level in levels)
{
parsedLevels.Add(new Level
{
Price = float.Parse(level[0].ToString()),
Amount = float.Parse(level[1].ToString()),
NumOrders = int.Parse(level[3].ToString())
});
}
return parsedLevels;
}
}
class OrderBook
{
public string Symbol { get; }
public long LastTimeStamp { get; set; } // For example, 1711941149203, 13 digits (i.e. in milli-sec)
public List<Level> Bids { get; set; }
public List<Level> Asks { get; set; }
public OrderBook(string symbol, long lastTimestamp)
{
Symbol = symbol.Trim();
LastTimeStamp = lastTimestamp;
Bids = new List<Level>();
Asks = new List<Level>();
}
public static OrderBook ParseSnapshot(string symbol, dynamic messageObject)
{
OrderBook orderBook = new OrderBook(symbol, Convert.ToInt64(messageObject.data[0].ts));
foreach (var ask in messageObject.data[0].asks)
{
orderBook.Asks.Add(new Level
{
Price = float.Parse(ask[0].ToString()),
Amount = float.Parse(ask[1].ToString()),
NumOrders = int.Parse(ask[3].ToString())
});
}
foreach (var bid in messageObject.data[0].bids)
{
orderBook.Bids.Add(new Level
{
Price = float.Parse(bid[0].ToString()),
Amount = float.Parse(bid[1].ToString()),
NumOrders = int.Parse(bid[3].ToString())
});
}
return orderBook;
}
public float GetMidPrice()
{
if (Bids.Count > 0 && Asks.Count > 0)
{
float bestBid = Bids[0].Price;
float bestAsk = Asks[0].Price;
return (bestBid + bestAsk) / 2;
}
return 0;
}
public void UpdateBook(OrderBookUpdate update)
{
UpdateLevels(Bids, update.Bids);
UpdateLevels(Asks, update.Asks);
}
private void UpdateLevels(List<Level> currentLevels, List<Level> newLevels)
{
foreach (var newLevel in newLevels)
{
var existingLevel = currentLevels.Find(l => l.Price == newLevel.Price);
if (existingLevel != null)
{
if (newLevel.Amount == 0)
{
currentLevels.Remove(existingLevel);
}
else
{
existingLevel.Amount = newLevel.Amount;
existingLevel.NumOrders = newLevel.NumOrders;
}
}
else
{
currentLevels.Add(newLevel);
}
}
currentLevels.Sort((a, b) => a.Price.CompareTo(b.Price));
}
}
class Program
{
static void Log(string message)
{
string utcDateTime = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss");
Console.WriteLine($"{utcDateTime} {message}");
}
static async Task Main(string[] args)
{
string baseUrl = "wss://ws.okx.com:8443/ws/v5/public";
string symbol = "BTC-USDT-SWAP";
string subscribeBooksRequest = "{\"op\":\"subscribe\",\"args\":[{\"channel\":\"books\",\"instId\":\"" + symbol + "\"}]}";
string subscribeBooks5Request = "{\"op\":\"subscribe\",\"args\":[{\"channel\":\"books5\",\"instId\":\"" + symbol + "\"}]}";
using (ClientWebSocket clientWebSocket = new ClientWebSocket())
{
try
{
await clientWebSocket.ConnectAsync(new Uri(baseUrl), CancellationToken.None);
await SendMessage(clientWebSocket, subscribeBooksRequest);
await ReceiveMessage(clientWebSocket);
await SendMessage(clientWebSocket, subscribeBooks5Request);
OrderBook orderBook = null;
// You allocate byte[] pinned in memory only once.
byte[] orderBookBuffer = AllocateMemory(symbol);
while (clientWebSocket.State == WebSocketState.Open)
{
string receivedMessage = await ReceiveMessage(clientWebSocket);
/*
SerializeOrderBook will use BitConverter.GetBytes and Buffer.BlockCopy to serialize 'originalOrderBook' to a byte[] pinned in memory.
DeserializeOrderBook will recovert from bytes[] pinned in memory, back to an OrderBook object.
*/
unsafe
{
dynamic messageObject = JsonConvert.DeserializeObject(receivedMessage);
if (messageObject.action == "snapshot")
{
OrderBook originalOrderBook = OrderBook.ParseSnapshot(symbol, messageObject);
SerializeOrderBook(symbol, originalOrderBook, orderBookBuffer);
fixed (byte* ptr = orderBookBuffer)
{
orderBook = DeserializeOrderBook(symbol, ptr, orderBookBuffer.Length, originalOrderBook.Bids.Count, originalOrderBook.Asks.Count);
}
}
else if (messageObject.action == "update")
{
if (orderBook != null)
{
fixed (byte* ptr = orderBookBuffer)
{
orderBook = DeserializeOrderBook(symbol, ptr, orderBookBuffer.Length, orderBook.Bids.Count, orderBook.Asks.Count);
}
OrderBookUpdate update = OrderBookUpdate.ParseUpdate(symbol, messageObject);
orderBook.LastTimeStamp = update.Timestamp;
orderBook.UpdateBook(update);
SerializeOrderBook(symbol, orderBook, orderBookBuffer);
}
}
if (orderBook != null)
{
Log($"{symbol} mid: {orderBook.GetMidPrice()}");
}
}
}
}
catch (Exception ex)
{
Log($"An error occurred: {ex.Message} {ex.StackTrace}");
}
}
}
static async Task SendMessage(ClientWebSocket clientWebSocket, string message)
{
byte[] bytes = Encoding.UTF8.GetBytes(message);
await clientWebSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
}
static async Task<string> ReceiveMessage(ClientWebSocket clientWebSocket)
{
byte[] buffer = new byte[1024];
WebSocketReceiveResult result;
StringBuilder message = new StringBuilder();
do
{
result = await clientWebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
message.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
}
while (!result.EndOfMessage);
return message.ToString();
}
static byte[] AllocateMemory(string symbol) {
const int MAX_BID_COUNT = 1000;
const int MAX_ASK_COUNT = 1000;
int size = symbol.Length * 2 + // String: 2 bytes per char
sizeof(long) + // LastTimeStamp (8 bytes)
sizeof(float) * 3 * MAX_BID_COUNT + // Bids: Price, Amount, NumOrders
sizeof(float) * 3 * MAX_ASK_COUNT; // Asks: Price, Amount, NumOrders
byte[] pinnedMemory = new byte[size];
return pinnedMemory;
}
static void SerializeOrderBook(string symbol, OrderBook orderBook, byte[] bytes)
{
int offset = 0;
byte[] symbolBytes = Encoding.UTF8.GetBytes(orderBook.Symbol);
Buffer.BlockCopy(symbolBytes, 0, bytes, offset, symbolBytes.Length);
offset += symbolBytes.Length;
Buffer.BlockCopy(BitConverter.GetBytes(orderBook.LastTimeStamp), 0, bytes, offset, sizeof(long));
offset += sizeof(long);
// Serialize Bids
Buffer.BlockCopy(BitConverter.GetBytes(orderBook.Bids.Count), 0, bytes, offset, sizeof(int));
offset += sizeof(int);
foreach (var bid in orderBook.Bids)
{
Buffer.BlockCopy(BitConverter.GetBytes(bid.Price), 0, bytes, offset, sizeof(float));
offset += sizeof(float);
Buffer.BlockCopy(BitConverter.GetBytes(bid.Amount), 0, bytes, offset, sizeof(float));
offset += sizeof(float);
Buffer.BlockCopy(BitConverter.GetBytes(bid.NumOrders), 0, bytes, offset, sizeof(int));
offset += sizeof(int);
}
// Serialize Asks
Buffer.BlockCopy(BitConverter.GetBytes(orderBook.Asks.Count), 0, bytes, offset, sizeof(int));
offset += sizeof(int);
foreach (var ask in orderBook.Asks)
{
Buffer.BlockCopy(BitConverter.GetBytes(ask.Price), 0, bytes, offset, sizeof(float));
offset += sizeof(float);
Buffer.BlockCopy(BitConverter.GetBytes(ask.Amount), 0, bytes, offset, sizeof(float));
offset += sizeof(float);
Buffer.BlockCopy(BitConverter.GetBytes(ask.NumOrders), 0, bytes, offset, sizeof(int));
offset += sizeof(int);
}
}
static unsafe OrderBook DeserializeOrderBook(string symbol, byte* ptr, int length, int numBids, int numAsks)
{
int offset = symbol.Length;
long timestamp = BitConverter.ToInt64(new ReadOnlySpan<byte>(ptr+offset, sizeof(long)));
OrderBook orderBook = new OrderBook(symbol, timestamp);
offset += sizeof(long);
for (int i = 0; i<numBids; i++)
{
float price = BitConverter.ToSingle(new ReadOnlySpan<byte>(ptr + offset, sizeof(float)));
offset += sizeof(float);
float amount = BitConverter.ToSingle(new ReadOnlySpan<byte>(ptr + offset, sizeof(float)));
offset += sizeof(float);
int numOrders = BitConverter.ToInt32(new ReadOnlySpan<byte>(ptr + offset, sizeof(int)));
offset += sizeof(int);
orderBook.Bids.Add(new Level { Price = price, Amount = amount, NumOrders = numOrders });
}
for (int i = 0; i<numAsks; i++)
{
float price = BitConverter.ToSingle(new ReadOnlySpan<byte>(ptr + offset, sizeof(float)));
offset += sizeof(float);
float amount = BitConverter.ToSingle(new ReadOnlySpan<byte>(ptr + offset, sizeof(float)));
offset += sizeof(float);
int numOrders = BitConverter.ToInt32(new ReadOnlySpan<byte>(ptr + offset, sizeof(int)));
offset += sizeof(int);
orderBook.Asks.Add(new Level { Price = price, Amount = amount, NumOrders = numOrders });
}
return orderBook;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment