Created
August 31, 2020 19:41
-
-
Save byt3bl33d3r/266cf541b34d73624fd19c085e3220d3 to your computer and use it in GitHub Desktop.
Async websocket C# client (producer/consumer pattern)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Requires a reference to the System.Web.Extensions assembly
 * (it provides System.Web.Script.Serialization.JavaScriptSerializer).
 */
using System; | |
using System.Collections.Concurrent; | |
using System.Web.Script.Serialization; | |
using System.Text; | |
using System.Net.WebSockets; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace WebSocketsTest | |
{ | |
/// <summary>
/// Console WebSocket client built around two queues (producer/consumer):
/// <see cref="Recv"/> pushes every text message read from the socket onto
/// <see cref="recvQueue"/>, <see cref="Main"/> consumes them and pushes a JSON
/// reply onto <see cref="sendQueue"/>, and <see cref="Send"/> writes those
/// replies to the socket.
/// </summary>
class Program
{
    /// <summary>Size in bytes of the buffer handed to each ReceiveAsync call.</summary>
    private const int ReceiveBufferSize = 1024;

    /// <summary>Messages received from the server, waiting to be processed by Main.</summary>
    public static readonly BlockingCollection<string> recvQueue = new BlockingCollection<string>();

    /// <summary>Serialized messages waiting to be written to the socket by Send.</summary>
    public static readonly BlockingCollection<string> sendQueue = new BlockingCollection<string>();

    /// <summary>Identifier generated once per process; appended to the server URL and sent in every message.</summary>
    public static readonly Guid ClientId = Guid.NewGuid();

    /// <summary>
    /// Connects to the server given in <c>args[0]</c>, starts the send/receive
    /// workers and answers every received message until the socket stops being open.
    /// </summary>
    /// <param name="args">
    /// <c>args[0]</c> must be an absolute base URL. The client id is resolved
    /// against it as a relative reference, so the base should end with '/'
    /// if its last path segment is to be kept.
    /// </param>
    static async Task Main(string[] args)
    {
        // Fail with a usage message instead of IndexOutOfRange/UriFormat exceptions.
        if (args == null || args.Length == 0 ||
            !Uri.TryCreate(args[0], UriKind.Absolute, out Uri baseUrl))
        {
            Console.Error.WriteLine("Usage: WebSocketsTest <absolute-websocket-base-url>");
            Environment.ExitCode = 1;
            return;
        }

        Uri serverUrl = new Uri(baseUrl, ClientId.ToString());
        CancellationToken token = CancellationToken.None;

        using (ClientWebSocket ws = new ClientWebSocket())
        {
            Console.WriteLine("Connecting to {0}", serverUrl.ToString());
            await ws.ConnectAsync(serverUrl, token);

            Task sendTask = Task.Run(() => Send(ws, token));
            Task recvTask = Task.Run(() => Recv(ws, token));
            try
            {
                var serializer = new JavaScriptSerializer();

                // Ends when Recv marks the queue complete (socket closed or failed),
                // so Main can no longer block forever on an empty queue.
                foreach (string recvMsg in recvQueue.GetConsumingEnumerable(token))
                {
                    Console.WriteLine("Processed message: {0}", recvMsg);
                    var sendMsg = new Message
                    {
                        ClientId = ClientId.ToString(),
                        Payload = "Testing"
                    };
                    sendQueue.Add(serializer.Serialize(sendMsg));

                    if (ws.State != WebSocketState.Open)
                    {
                        break;
                    }
                }
            }
            finally
            {
                await Shutdown(ws, sendTask, recvTask, token);
            }
        }
    }

    /// <summary>
    /// Stops the workers in order: lets Send drain and exit, completes the close
    /// handshake if the socket still allows it, then waits for Recv.
    /// </summary>
    private static async Task Shutdown(ClientWebSocket ws, Task sendTask, Task recvTask, CancellationToken token)
    {
        // Unblocks the Send worker once it has drained what is already queued.
        sendQueue.CompleteAdding();
        await AwaitWorker(sendTask, "Send");

        Console.WriteLine("Closing connection...");
        // CloseAsync throws when the socket is already Closed/Aborted, so only
        // attempt the handshake from a state that permits it.
        if (ws.State == WebSocketState.Open || ws.State == WebSocketState.CloseReceived)
        {
            try
            {
                await ws.CloseAsync(WebSocketCloseStatus.Empty, "", token);
            }
            catch (WebSocketException ex)
            {
                Console.WriteLine("Close failed: {0}", ex.Message);
            }
        }

        await AwaitWorker(recvTask, "Recv");
    }

    /// <summary>
    /// Awaits a worker task so its failure is observed and reported rather than
    /// silently lost; socket and cancellation errors are logged, anything else propagates.
    /// </summary>
    private static async Task AwaitWorker(Task worker, string name)
    {
        try
        {
            await worker;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("{0} task cancelled", name);
        }
        catch (WebSocketException ex)
        {
            Console.WriteLine("{0} task failed: {1}", name, ex.Message);
        }
    }

    /// <summary>
    /// Reads messages from the socket while it is open and queues every non-empty
    /// one on <see cref="recvQueue"/>. Always marks the queue complete on exit so
    /// the consumer in Main terminates.
    /// </summary>
    /// <param name="ws">Connected socket to read from.</param>
    /// <param name="token">Token passed to every ReceiveAsync call.</param>
    async static Task Recv(ClientWebSocket ws, CancellationToken token)
    {
        Console.WriteLine("Recv task started...");
        var buffer = new ArraySegment<byte>(new byte[ReceiveBufferSize]);

        // A stateful decoder keeps the tail of a multi-byte UTF-8 sequence that is
        // split across two fragments; decoding each fragment on its own would
        // turn such a character into replacement characters.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Count)];
        var message = new StringBuilder();

        try
        {
            while (ws.State == WebSocketState.Open)
            {
                message.Clear();
                decoder.Reset();

                WebSocketReceiveResult taskResult;
                do
                {
                    taskResult = await ws.ReceiveAsync(buffer, token);
                    if (taskResult.MessageType == WebSocketMessageType.Close)
                    {
                        break;
                    }

                    int charCount = decoder.GetChars(
                        buffer.Array, buffer.Offset, taskResult.Count,
                        chars, 0, taskResult.EndOfMessage);
                    message.Append(chars, 0, charCount);
                } while (!taskResult.EndOfMessage);

                if (taskResult.MessageType == WebSocketMessageType.Close)
                {
                    // The peer started the close handshake; Main completes it.
                    break;
                }

                if (message.Length > 0)
                {
                    string jsonResult = message.ToString();
                    Console.WriteLine("Queueing {0}", jsonResult);
                    recvQueue.Add(jsonResult);
                }
            }
        }
        finally
        {
            // Signal the consumer even when ReceiveAsync throws.
            recvQueue.CompleteAdding();
            Console.WriteLine("Recv task exiting...");
        }
    }

    /// <summary>
    /// Writes every message taken from <see cref="sendQueue"/> to the socket as a
    /// single UTF-8 encoded frame, until the queue is completed or the socket is
    /// no longer open.
    /// </summary>
    /// <param name="ws">Connected socket to write to.</param>
    /// <param name="token">Token passed to the queue wait and every SendAsync call.</param>
    async static Task Send(ClientWebSocket ws, CancellationToken token)
    {
        Console.WriteLine("Send task started...");
        foreach (string sendMsg in sendQueue.GetConsumingEnumerable(token))
        {
            // Check before sending: SendAsync throws on a socket that is not open.
            if (ws.State != WebSocketState.Open)
            {
                break;
            }

            Console.WriteLine("Sending {0}", sendMsg);
            byte[] sendMsgBytes = Encoding.UTF8.GetBytes(sendMsg);
            var segmentBuffer = new ArraySegment<byte>(sendMsgBytes);

            // NOTE(review): the JSON payload goes out as a Binary frame, as before;
            // confirm the server does not expect Text frames.
            await ws.SendAsync(segmentBuffer, WebSocketMessageType.Binary, true, token);
        }
        Console.WriteLine("Send task exiting...");
    }
}
/// <summary>
/// Payload exchanged with the server; instances are serialized to JSON before
/// being queued for sending.
/// </summary>
class Message
{
    /// <summary>String form of the sending client's identifier.</summary>
    public string ClientId { get; set; }

    /// <summary>Message body.</summary>
    public string Payload { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment