Created
April 1, 2024 03:20
-
-
Save normanlmfung/d3e62c3d29d3c57a7f2f9ad984166839 to your computer and use it in GitHub Desktop.
csharp_ws_pinned_memory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Newtonsoft.Json; | |
using System; | |
using System.Collections.Generic; | |
using System.Net.WebSockets; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
/*
'AllocateMemory' allocates the byte[] that holds the serialized order book. You only do this ONCE; the same buffer is reused for every message, which reduces allocations and the performance impact of GC. The buffer is pinned (via 'fixed') only while it is being deserialized.
'SerializeOrderBook' uses BitConverter.GetBytes and Buffer.BlockCopy to serialize an OrderBook into that byte[].
'DeserializeOrderBook' converts the bytes in that buffer back into an OrderBook object.
'SerializeOrderBook' and 'DeserializeOrderBook' are each called once for the initial snapshot, and then again on every update.
*/
/// <summary>
/// One price level on a side of the order book.
/// </summary>
class Level
{
    /// <summary>Price of this level.</summary>
    public float Price { get; set; }

    /// <summary>Quantity available at <see cref="Price"/>.</summary>
    public float Amount { get; set; }

    /// <summary>Number of orders at <see cref="Price"/>.</summary>
    public int NumOrders { get; set; }
}
/// <summary>
/// One incremental order-book message: the changed bid/ask levels plus the
/// timestamp, sequence ids and checksum copied from the message payload.
/// </summary>
class OrderBookUpdate
{
    /// <summary>Instrument the update belongs to.</summary>
    public string Symbol { get; }

    /// <summary>Changed bid levels. Never null after construction.</summary>
    public List<Level> Bids { get; set; } = new List<Level>();

    /// <summary>Changed ask levels. Never null after construction.</summary>
    public List<Level> Asks { get; set; } = new List<Level>();

    /// <summary>Value of the message's <c>ts</c> field.</summary>
    public long Timestamp { get; set; }

    /// <summary>Value of the message's <c>prevSeqId</c> field.</summary>
    public long PrevSeqId { get; set; }

    /// <summary>Value of the message's <c>seqId</c> field.</summary>
    public long SeqId { get; set; }

    /// <summary>Value of the message's <c>checksum</c> field.</summary>
    public long Checksum { get; set; }

    public OrderBookUpdate(string symbol)
    {
        Symbol = symbol;
    }

    /// <summary>
    /// Builds an update from a deserialized message, reading everything from
    /// <c>messageObject.data[0]</c>.
    /// </summary>
    /// <param name="symbol">Instrument the message refers to.</param>
    /// <param name="messageObject">Dynamic JSON object exposing <c>data[0]</c> with
    /// <c>asks</c>, <c>bids</c>, <c>ts</c>, <c>prevSeqId</c>, <c>seqId</c> and <c>checksum</c>.</param>
    /// <returns>The populated update.</returns>
    public static OrderBookUpdate ParseUpdate(string symbol, dynamic messageObject)
    {
        // Resolve the payload once instead of re-evaluating data[0] for every field.
        dynamic payload = messageObject.data[0];

        OrderBookUpdate update = new OrderBookUpdate(symbol);
        update.Asks = ParseLevels(payload.asks);
        update.Bids = ParseLevels(payload.bids);
        update.Timestamp = payload.ts;
        update.PrevSeqId = payload.prevSeqId;
        update.SeqId = payload.seqId;
        update.Checksum = payload.checksum;
        return update;
    }

    /// <summary>
    /// Converts a dynamic array of level entries into <see cref="Level"/> objects.
    /// Element 0 is the price, element 1 the amount and element 3 the order count;
    /// element 2 is not used (NOTE(review): presumably the deprecated field described
    /// in the exchange documentation - confirm).
    /// </summary>
    /// <param name="levels">Dynamic collection of level entries; null yields an empty list.</param>
    private static List<Level> ParseLevels(dynamic levels)
    {
        List<Level> parsedLevels = new List<Level>();
        if (levels == null)
        {
            return parsedLevels;
        }

        // The feed is machine-readable text with '.' as the decimal separator, so it must
        // be parsed with the invariant culture. The current culture would mis-read or
        // reject values such as "65432.1" on machines that use ',' for decimals.
        var culture = System.Globalization.CultureInfo.InvariantCulture;

        foreach (var level in levels)
        {
            string priceText = level[0].ToString();
            string amountText = level[1].ToString();
            string orderCountText = level[3].ToString();

            parsedLevels.Add(new Level
            {
                Price = float.Parse(priceText, culture),
                Amount = float.Parse(amountText, culture),
                NumOrders = int.Parse(orderCountText, culture)
            });
        }
        return parsedLevels;
    }
}
/// <summary>
/// In-memory order book for a single symbol.
/// Bids are kept sorted by price descending and asks ascending, so index 0 of each
/// list is the best price on that side - which is what <see cref="GetMidPrice"/> reads.
/// </summary>
class OrderBook
{
    /// <summary>Instrument symbol, trimmed of surrounding whitespace.</summary>
    public string Symbol { get; }

    public long LastTimeStamp { get; set; } // For example, 1711941149203, 13 digits (i.e. in milli-sec)

    /// <summary>Bid levels, best (highest price) first.</summary>
    public List<Level> Bids { get; set; }

    /// <summary>Ask levels, best (lowest price) first.</summary>
    public List<Level> Asks { get; set; }

    public OrderBook(string symbol, long lastTimestamp)
    {
        Symbol = symbol.Trim();
        LastTimeStamp = lastTimestamp;
        Bids = new List<Level>();
        Asks = new List<Level>();
    }

    /// <summary>
    /// Builds a full book from a snapshot message, reading <c>ts</c>, <c>asks</c> and
    /// <c>bids</c> from <c>messageObject.data[0]</c>.
    /// </summary>
    /// <param name="symbol">Instrument the snapshot refers to.</param>
    /// <param name="messageObject">Dynamic JSON object for the snapshot message.</param>
    /// <returns>A new book with both sides sorted best-first.</returns>
    public static OrderBook ParseSnapshot(string symbol, dynamic messageObject)
    {
        dynamic payload = messageObject.data[0];
        long timestamp = Convert.ToInt64(payload.ts);

        OrderBook orderBook = new OrderBook(symbol, timestamp);
        AppendLevels(orderBook.Asks, payload.asks);
        AppendLevels(orderBook.Bids, payload.bids);

        // Enforce the best-first invariant instead of trusting the order of the feed.
        SortLevels(orderBook.Asks, false);
        SortLevels(orderBook.Bids, true);
        return orderBook;
    }

    /// <summary>
    /// Returns the average of the best bid and best ask, or 0 when either side is empty.
    /// </summary>
    public float GetMidPrice()
    {
        if (Bids.Count > 0 && Asks.Count > 0)
        {
            float bestBid = Bids[0].Price;
            float bestAsk = Asks[0].Price;
            return (bestBid + bestAsk) / 2;
        }
        return 0;
    }

    /// <summary>Applies an incremental update to both sides of the book.</summary>
    /// <param name="update">Changed levels; a level whose amount is 0 is a deletion.</param>
    /// <exception cref="ArgumentNullException"><paramref name="update"/> is null.</exception>
    public void UpdateBook(OrderBookUpdate update)
    {
        if (update == null)
        {
            throw new ArgumentNullException(nameof(update));
        }

        UpdateLevels(Bids, update.Bids, true);
        UpdateLevels(Asks, update.Asks, false);
    }

    /// <summary>
    /// Merges <paramref name="newLevels"/> into <paramref name="currentLevels"/> and
    /// re-sorts the side.
    /// </summary>
    /// <param name="currentLevels">The side of the book being modified.</param>
    /// <param name="newLevels">Changed levels for that side; null is treated as empty.</param>
    /// <param name="descending">True for bids (highest first), false for asks (lowest first).</param>
    private void UpdateLevels(List<Level> currentLevels, List<Level> newLevels, bool descending)
    {
        if (newLevels == null || newLevels.Count == 0)
        {
            return;
        }

        foreach (var newLevel in newLevels)
        {
            // Exact float comparison is intentional: both values come from parsing the
            // exchange's decimal strings, so the same price yields the same bits.
            int index = currentLevels.FindIndex(l => l.Price == newLevel.Price);
            bool isDelete = newLevel.Amount == 0;

            if (index >= 0)
            {
                if (isDelete)
                {
                    currentLevels.RemoveAt(index);
                }
                else
                {
                    currentLevels[index].Amount = newLevel.Amount;
                    currentLevels[index].NumOrders = newLevel.NumOrders;
                }
            }
            else if (!isDelete)
            {
                // A deletion for a price we do not hold must not create an empty level.
                currentLevels.Add(newLevel);
            }
        }

        // Bids and asks need opposite orders; sorting both ascending would leave the
        // WORST bid at index 0 and corrupt the mid price.
        SortLevels(currentLevels, descending);
    }

    /// <summary>Sorts a side by price, descending for bids and ascending for asks.</summary>
    private static void SortLevels(List<Level> levels, bool descending)
    {
        if (descending)
        {
            levels.Sort((a, b) => b.Price.CompareTo(a.Price));
        }
        else
        {
            levels.Sort((a, b) => a.Price.CompareTo(b.Price));
        }
    }

    /// <summary>
    /// Parses a dynamic array of level entries (element 0 = price, 1 = amount,
    /// 3 = order count) and appends them to <paramref name="target"/>.
    /// </summary>
    private static void AppendLevels(List<Level> target, dynamic levels)
    {
        if (levels == null)
        {
            return;
        }

        // Feed values use '.' as the decimal separator; parse culture-independently so the
        // result does not depend on the machine's regional settings.
        var culture = System.Globalization.CultureInfo.InvariantCulture;

        foreach (var level in levels)
        {
            string priceText = level[0].ToString();
            string amountText = level[1].ToString();
            string orderCountText = level[3].ToString();

            target.Add(new Level
            {
                Price = float.Parse(priceText, culture),
                Amount = float.Parse(amountText, culture),
                NumOrders = int.Parse(orderCountText, culture)
            });
        }
    }
}
/// <summary>
/// Console client that subscribes to the order-book channels on the OKX public WebSocket,
/// keeps the current book serialized in one reusable byte buffer and logs the mid price
/// after every message.
///
/// Buffer layout (written by <see cref="SerializeOrderBook"/>, read by
/// <see cref="DeserializeOrderBook"/>):
///   [symbol UTF-8 bytes][LastTimeStamp : long]
///   [bid count : int][bid levels...][ask count : int][ask levels...]
/// where each level is Price (float), Amount (float), NumOrders (int).
/// </summary>
class Program
{
    /// <summary>Number of levels per side the buffer is sized for.</summary>
    const int MaxLevelsPerSide = 1000;

    /// <summary>Serialized size of one level: Price + Amount + NumOrders.</summary>
    const int LevelSize = sizeof(float) + sizeof(float) + sizeof(int);

    /// <summary>Fixed-size fields: LastTimeStamp plus the bid-count and ask-count prefixes.</summary>
    const int HeaderSize = sizeof(long) + sizeof(int) + sizeof(int);

    /// <summary>Writes <paramref name="message"/> to the console prefixed with the current UTC time.</summary>
    static void Log(string message)
    {
        string utcDateTime = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss");
        Console.WriteLine($"{utcDateTime} {message}");
    }

    /// <summary>
    /// Connects, subscribes to the "books" and "books5" channels and processes messages
    /// until the socket leaves the Open state. Any exception is logged and ends the run.
    /// </summary>
    static async Task Main(string[] args)
    {
        string baseUrl = "wss://ws.okx.com:8443/ws/v5/public";
        string symbol = "BTC-USDT-SWAP";
        string subscribeBooksRequest = "{\"op\":\"subscribe\",\"args\":[{\"channel\":\"books\",\"instId\":\"" + symbol + "\"}]}";
        string subscribeBooks5Request = "{\"op\":\"subscribe\",\"args\":[{\"channel\":\"books5\",\"instId\":\"" + symbol + "\"}]}";

        using (ClientWebSocket clientWebSocket = new ClientWebSocket())
        {
            try
            {
                await clientWebSocket.ConnectAsync(new Uri(baseUrl), CancellationToken.None);
                await SendMessage(clientWebSocket, subscribeBooksRequest);
                // The first reply is read and discarded (presumably the subscription acknowledgement).
                await ReceiveMessage(clientWebSocket);
                await SendMessage(clientWebSocket, subscribeBooks5Request);

                OrderBook orderBook = null;
                // The backing buffer is allocated only once and reused for every message.
                byte[] orderBookBuffer = AllocateMemory(symbol);

                while (clientWebSocket.State == WebSocketState.Open)
                {
                    string receivedMessage = await ReceiveMessage(clientWebSocket);

                    // An empty payload (e.g. a close frame) deserializes to null; skip it
                    // instead of failing on a null dynamic member access.
                    object parsed = JsonConvert.DeserializeObject(receivedMessage);
                    if (parsed == null)
                    {
                        continue;
                    }

                    dynamic messageObject = parsed;
                    // Messages without an "action" field yield null here and match neither branch.
                    string action = (string)messageObject.action;

                    if (action == "snapshot")
                    {
                        // Round-trip the snapshot through the buffer so the live book is
                        // always the one rebuilt from the serialized bytes.
                        OrderBook snapshot = OrderBook.ParseSnapshot(symbol, messageObject);
                        SerializeOrderBook(symbol, snapshot, orderBookBuffer);
                        orderBook = ReadOrderBook(symbol, orderBookBuffer, snapshot.Bids.Count, snapshot.Asks.Count);
                    }
                    else if (action == "update" && orderBook != null)
                    {
                        // Updates received before the first snapshot are ignored.
                        orderBook = ReadOrderBook(symbol, orderBookBuffer, orderBook.Bids.Count, orderBook.Asks.Count);
                        OrderBookUpdate update = OrderBookUpdate.ParseUpdate(symbol, messageObject);
                        orderBook.LastTimeStamp = update.Timestamp;
                        orderBook.UpdateBook(update);
                        SerializeOrderBook(symbol, orderBook, orderBookBuffer);
                    }

                    if (orderBook != null)
                    {
                        Log($"{symbol} mid: {orderBook.GetMidPrice()}");
                    }
                }
            }
            catch (Exception ex)
            {
                Log($"An error occurred: {ex.Message} {ex.StackTrace}");
            }
        }
    }

    /// <summary>Sends <paramref name="message"/> as a single UTF-8 text frame.</summary>
    static async Task SendMessage(ClientWebSocket clientWebSocket, string message)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(message);
        await clientWebSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
    }

    /// <summary>
    /// Receives one complete WebSocket message, concatenating fragments until
    /// <c>EndOfMessage</c> is set.
    /// </summary>
    /// <returns>The message decoded as UTF-8; empty when no payload bytes were received.</returns>
    static async Task<string> ReceiveMessage(ClientWebSocket clientWebSocket)
    {
        byte[] buffer = new byte[1024];
        // A stateful decoder carries an incomplete multi-byte sequence over to the next
        // fragment. Decoding each fragment independently would turn a character split
        // across two reads into replacement characters.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        char[] chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
        StringBuilder message = new StringBuilder();
        WebSocketReceiveResult result;
        do
        {
            result = await clientWebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            int charCount = decoder.GetChars(buffer, 0, result.Count, chars, 0, result.EndOfMessage);
            message.Append(chars, 0, charCount);
        }
        while (!result.EndOfMessage);
        return message.ToString();
    }

    /// <summary>
    /// Allocates the buffer that holds one serialized order book with up to
    /// <see cref="MaxLevelsPerSide"/> levels on each side.
    /// </summary>
    /// <remarks>
    /// This is an ordinary managed array. It is pinned only for the duration of the
    /// <c>fixed</c> statement in <see cref="ReadOrderBook"/>.
    /// NOTE(review): on .NET 5+ <c>GC.AllocateArray&lt;byte&gt;(size, pinned: true)</c>
    /// would give a permanently pinned array if that is the intent.
    /// </remarks>
    static byte[] AllocateMemory(string symbol)
    {
        // The symbol is written as UTF-8, so reserve the worst-case encoded size rather
        // than assuming two bytes per char, and include both count prefixes (HeaderSize).
        int size = Encoding.UTF8.GetMaxByteCount(symbol.Length) +
                   HeaderSize +
                   LevelSize * MaxLevelsPerSide +   // Bids
                   LevelSize * MaxLevelsPerSide;    // Asks
        return new byte[size];
    }

    /// <summary>
    /// Writes <paramref name="orderBook"/> into <paramref name="bytes"/> starting at offset 0,
    /// using the layout described on the class.
    /// </summary>
    /// <param name="symbol">Unused; the symbol written is <c>orderBook.Symbol</c>.</param>
    /// <param name="orderBook">Book to serialize.</param>
    /// <param name="bytes">Destination buffer.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    /// <exception cref="ArgumentException">The book does not fit into <paramref name="bytes"/>.</exception>
    static void SerializeOrderBook(string symbol, OrderBook orderBook, byte[] bytes)
    {
        if (orderBook == null)
        {
            throw new ArgumentNullException(nameof(orderBook));
        }
        if (bytes == null)
        {
            throw new ArgumentNullException(nameof(bytes));
        }

        string bookSymbol = orderBook.Symbol;
        long required = (long)Encoding.UTF8.GetByteCount(bookSymbol) + HeaderSize
                        + (long)LevelSize * orderBook.Bids.Count
                        + (long)LevelSize * orderBook.Asks.Count;
        // Fail before touching the buffer so an oversized book never leaves it half-written.
        if (required > bytes.Length)
        {
            throw new ArgumentException(
                $"Order book needs {required} bytes but the buffer holds only {bytes.Length}.", nameof(bytes));
        }

        // Encode straight into the destination; no temporary arrays are allocated per field.
        int offset = Encoding.UTF8.GetBytes(bookSymbol, 0, bookSymbol.Length, bytes, 0);
        WriteInt64(bytes, ref offset, orderBook.LastTimeStamp);
        WriteLevels(bytes, ref offset, orderBook.Bids);
        WriteLevels(bytes, ref offset, orderBook.Asks);
    }

    /// <summary>
    /// Rebuilds an <see cref="OrderBook"/> from the bytes written by
    /// <see cref="SerializeOrderBook"/>.
    /// </summary>
    /// <param name="symbol">Symbol of the serialized book; its trimmed UTF-8 length locates the timestamp.</param>
    /// <param name="ptr">Pointer to the first byte of the buffer.</param>
    /// <param name="length">Number of readable bytes at <paramref name="ptr"/>.</param>
    /// <param name="numBids">Expected number of bid levels.</param>
    /// <param name="numAsks">Expected number of ask levels.</param>
    /// <exception cref="ArgumentNullException"><paramref name="symbol"/> or <paramref name="ptr"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The data would extend past <paramref name="length"/>.</exception>
    /// <exception cref="InvalidOperationException">A stored level count differs from the expected one.</exception>
    static unsafe OrderBook DeserializeOrderBook(string symbol, byte* ptr, int length, int numBids, int numAsks)
    {
        if (symbol == null)
        {
            throw new ArgumentNullException(nameof(symbol));
        }
        if (ptr == null)
        {
            throw new ArgumentNullException(nameof(ptr));
        }

        // One span over the whole buffer: every Slice below is bounds-checked against
        // 'length', so a bad count can never read outside the buffer.
        ReadOnlySpan<byte> data = new ReadOnlySpan<byte>(ptr, length);

        // The serializer writes OrderBook.Symbol (trimmed) as UTF-8; skip exactly that many bytes.
        int offset = Encoding.UTF8.GetByteCount(symbol.Trim());
        long timestamp = BitConverter.ToInt64(data.Slice(offset, sizeof(long)));
        offset += sizeof(long);

        OrderBook orderBook = new OrderBook(symbol, timestamp);
        ReadLevels(data, ref offset, numBids, orderBook.Bids, "bid");
        ReadLevels(data, ref offset, numAsks, orderBook.Asks, "ask");
        return orderBook;
    }

    /// <summary>Pins <paramref name="buffer"/> and deserializes the order book stored in it.</summary>
    static unsafe OrderBook ReadOrderBook(string symbol, byte[] buffer, int numBids, int numAsks)
    {
        fixed (byte* ptr = buffer)
        {
            return DeserializeOrderBook(symbol, ptr, buffer.Length, numBids, numAsks);
        }
    }

    /// <summary>Writes the level count followed by every level of one side.</summary>
    static void WriteLevels(byte[] buffer, ref int offset, List<Level> levels)
    {
        WriteInt32(buffer, ref offset, levels.Count);
        foreach (var level in levels)
        {
            WriteSingle(buffer, ref offset, level.Price);
            WriteSingle(buffer, ref offset, level.Amount);
            WriteInt32(buffer, ref offset, level.NumOrders);
        }
    }

    /// <summary>
    /// Reads one side: consumes the stored count prefix, checks it against
    /// <paramref name="expectedCount"/> and appends the levels to <paramref name="target"/>.
    /// </summary>
    static void ReadLevels(ReadOnlySpan<byte> data, ref int offset, int expectedCount, List<Level> target, string side)
    {
        // The count prefix MUST be consumed; skipping it shifts every following field by 4 bytes.
        int storedCount = BitConverter.ToInt32(data.Slice(offset, sizeof(int)));
        offset += sizeof(int);
        if (storedCount != expectedCount)
        {
            throw new InvalidOperationException(
                $"Serialized {side} count {storedCount} does not match expected count {expectedCount}.");
        }

        for (int i = 0; i < storedCount; i++)
        {
            float price = BitConverter.ToSingle(data.Slice(offset, sizeof(float)));
            offset += sizeof(float);
            float amount = BitConverter.ToSingle(data.Slice(offset, sizeof(float)));
            offset += sizeof(float);
            int numOrders = BitConverter.ToInt32(data.Slice(offset, sizeof(int)));
            offset += sizeof(int);
            target.Add(new Level { Price = price, Amount = amount, NumOrders = numOrders });
        }
    }

    /// <summary>Writes a 64-bit integer at <paramref name="offset"/> and advances the offset.</summary>
    static void WriteInt64(byte[] buffer, ref int offset, long value)
    {
        BitConverter.TryWriteBytes(new Span<byte>(buffer, offset, sizeof(long)), value);
        offset += sizeof(long);
    }

    /// <summary>Writes a 32-bit integer at <paramref name="offset"/> and advances the offset.</summary>
    static void WriteInt32(byte[] buffer, ref int offset, int value)
    {
        BitConverter.TryWriteBytes(new Span<byte>(buffer, offset, sizeof(int)), value);
        offset += sizeof(int);
    }

    /// <summary>Writes a 32-bit float at <paramref name="offset"/> and advances the offset.</summary>
    static void WriteSingle(byte[] buffer, ref int offset, float value)
    {
        BitConverter.TryWriteBytes(new Span<byte>(buffer, offset, sizeof(float)), value);
        offset += sizeof(float);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment