Created
June 5, 2019 01:28
-
-
Save imetallica/cbb37db232a0a7275d490b9d1b92b56f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Macaw.Server.Internal | |
open System | |
open System.Threading | |
open System.Collections.Generic | |
open System.Net.Sockets | |
open System.Net | |
/// TCP echo listener built on the Task-returning socket methods
/// (`AcceptAsync` / `ReceiveAsync` / `SendAsync`), bridged into F# `async`.
module SocketConnectionListener2 =

    /// Creates an unbound TCP stream socket for the endpoint's address family.
    let internal createSocket (endpoint : IPEndPoint) =
        new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)

    /// Wraps `data` in a one-element list of array segments, the buffer-list
    /// shape taken by the socket send/receive overloads used below.
    let internal toIList<'T> (data : 'T array) =
        let segments = new List<System.ArraySegment<'T>>() :> IList<System.ArraySegment<'T>>
        segments.Add(new System.ArraySegment<'T>(data))
        segments

    /// Concatenates every byte of every segment in `payload` and decodes the
    /// result as UTF-8. Note that this decodes the *whole* buffer, not just the
    /// part a receive call filled.
    let processReceivePayload payload =
        let bytes = payload |> Seq.concat |> Seq.toArray
        System.Text.Encoding.UTF8.GetString(bytes)

    /// Encodes `payload` as UTF-8 and wraps the bytes as a single-segment list.
    let processSendPayload (payload : string) =
        payload |> System.Text.Encoding.UTF8.GetBytes |> toIList

    /// Appends the first `count` bytes stored in `payload` (walking the segments
    /// in order) to `sink`. Used after each receive so that only the bytes that
    /// call reported are kept, instead of the whole reusable buffer.
    let internal appendReceived (payload : IList<ArraySegment<byte>>) (count : int) (sink : System.IO.MemoryStream) =
        let mutable remaining = count
        for segment in payload do
            // Skip empty segments: a default ArraySegment has a null backing array.
            if remaining > 0 && segment.Count > 0 then
                let take = min remaining segment.Count
                sink.Write(segment.Array, segment.Offset, take)
                remaining <- remaining - take

    /// Binds `socket` to `endpoint`, logging the address and port first.
    let bind (endpoint : IPEndPoint) (socket : Socket) =
        printfn "Binding to %A:%A" endpoint.Address endpoint.Port
        socket.Bind(endpoint)

    /// Puts `socket` into the listening state with the given backlog.
    let listen (backlog : int) (socket : Socket) =
        socket.Listen(backlog)

    /// Asynchronously accepts the next incoming connection on the listening
    /// `socket` and returns the newly accepted socket.
    let accept (socket : Socket) = async {
        // The original passed the listener itself as the accept socket; the
        // parameterless overload lets the runtime allocate a fresh socket.
        return! socket.AcceptAsync() |> Async.AwaitTask
    }

    /// Reads from `socket` into the scratch buffer `payload` until a receive
    /// reports 0 bytes (the peer has finished sending), and returns everything
    /// that was received, decoded as UTF-8.
    ///
    /// `payload` is only working storage: each chunk is copied out before the
    /// next receive overwrites it, so the result is never truncated to the last
    /// chunk nor padded with stale buffer contents.
    let receive (payload : IList<ArraySegment<byte>>) (socket : Socket) = async {
        use received = new System.IO.MemoryStream()
        let rec pump () = async {
            let! length = socket.ReceiveAsync(payload, SocketFlags.None) |> Async.AwaitTask
            if length > 0 then
                appendReceived payload length received
                return! pump ()
            else
                return ()
        }
        do! pump ()
        // Decode once at the end so multi-byte characters split across chunks
        // are still decoded correctly.
        return System.Text.Encoding.UTF8.GetString(received.ToArray())
    }

    /// Encodes `payload` as UTF-8 and sends it on `socket`.
    let send (payload : string) (socket : Socket) = async {
        let! _ = socket.SendAsync(processSendPayload payload, SocketFlags.None) |> Async.AwaitTask
        return ()
    }

    /// Handles one accepted connection: reads the request, echoes it back, and
    /// disposes the connection socket when done (or on failure).
    let internal echo (buffer : IList<ArraySegment<byte>>) (acceptSocket : Socket) = async {
        use connection = acceptSocket
        let! result = receive buffer connection
        do! send (sprintf "Echo: %A" result) connection
    }

    /// Binds and listens on `endpoint`, then runs `concurrentConnections`
    /// accept loops in parallel, each echoing requests back to its clients.
    /// Blocks the calling thread; the accept loops never complete on their own.
    let start (concurrentConnections : int) endpoint =
        async {
            use socket = createSocket endpoint
            printfn "Socket created"
            bind endpoint socket
            printfn "Socket binded"
            listen 512 socket
            printfn "Socket listening"
            let worker = async {
                // One scratch buffer per worker; `receive` copies data out of it.
                let buffer = Array.zeroCreate<byte>(512) |> toIList
                let rec loop () = async {
                    let! acceptSocket = accept socket
                    // Talk to the accepted connection, not the listening socket.
                    do! echo buffer acceptSocket
                    return! loop ()
                }
                do! loop ()
                return ()
            }
            let concurrentSocketPool = seq { for _ in 1 .. concurrentConnections -> worker }
            return! Async.Parallel concurrentSocketPool
        } |> Async.RunSynchronously
/// TCP listener built on the Begin/End (APM) socket methods, bridged into
/// F# `async` with `Async.FromBeginEnd`.
module SocketConnectionListener =

    /// Creates an unbound TCP stream socket for the endpoint's address family.
    let internal createSocket (endpoint : IPEndPoint) =
        new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)

    /// Wraps `data` in a one-element list of array segments, the buffer-list
    /// shape taken by the socket send/receive overloads used below.
    let internal toIList<'T> (data : 'T array) =
        let segments = new List<System.ArraySegment<'T>>() :> IList<System.ArraySegment<'T>>
        segments.Add(new System.ArraySegment<'T>(data))
        segments

    /// Concatenates every byte of every segment in `payload` and decodes the
    /// result as UTF-8. Note that this decodes the *whole* buffer, not just the
    /// part a receive call filled.
    let processReceivePayload payload =
        let bytes = payload |> Seq.concat |> Seq.toArray
        System.Text.Encoding.UTF8.GetString(bytes)

    /// Encodes `payload` as UTF-8 and wraps the bytes as a single-segment list.
    let processSendPayload (payload : string) =
        payload |> System.Text.Encoding.UTF8.GetBytes |> toIList

    /// Appends the first `count` bytes stored in `payload` (walking the segments
    /// in order) to `sink`. Used after each receive so that only the bytes that
    /// call reported are kept, instead of the whole reusable buffer.
    let internal appendReceived (payload : IList<ArraySegment<byte>>) (count : int) (sink : System.IO.MemoryStream) =
        let mutable remaining = count
        for segment in payload do
            // Skip empty segments: a default ArraySegment has a null backing array.
            if remaining > 0 && segment.Count > 0 then
                let take = min remaining segment.Count
                sink.Write(segment.Array, segment.Offset, take)
                remaining <- remaining - take

    /// Binds `socket` to `endpoint`, logging the address and port first.
    let bind (endpoint : IPEndPoint) (socket : Socket) =
        printfn "Binding to %A:%A" endpoint.Address endpoint.Port
        socket.Bind(endpoint)

    /// Puts `socket` into the listening state with the given backlog.
    let listen (backlog : int) (socket : Socket) =
        socket.Listen(backlog)

    /// Asynchronously accepts the next incoming connection on the listening
    /// `socket` and returns the accepted socket.
    let accept (socket : Socket) =
        Async.FromBeginEnd(socket.BeginAccept, socket.EndAccept)

    /// Reads from `socket` into the scratch buffer `payload` until a receive
    /// reports 0 bytes (the peer has finished sending), and returns everything
    /// that was received, decoded as UTF-8.
    ///
    /// `payload` is only working storage: each chunk is copied out before the
    /// next receive overwrites it, so the result is never truncated to the last
    /// chunk nor padded with stale buffer contents.
    let receive (payload : IList<ArraySegment<byte>>) (socket : Socket) = async {
        use received = new System.IO.MemoryStream()
        let rec pump () = async {
            let! length =
                Async.FromBeginEnd(
                    payload,
                    SocketFlags.None,
                    (fun (payload, flags, callback, state) -> socket.BeginReceive(payload, flags, callback, state)),
                    socket.EndReceive
                )
            if length > 0 then
                appendReceived payload length received
                return! pump ()
            else
                return ()
        }
        do! pump ()
        // Decode once at the end so multi-byte characters split across chunks
        // are still decoded correctly.
        return System.Text.Encoding.UTF8.GetString(received.ToArray())
    }

    /// Encodes `payload` as UTF-8 and sends it on `socket`.
    let send (payload : string) (socket : Socket) = async {
        let decodedPayload = processSendPayload payload
        // The send must be bound with `let!`: as a bare expression the
        // computation was constructed and thrown away, so nothing was sent.
        let! _ =
            Async.FromBeginEnd(
                decodedPayload,
                SocketFlags.None,
                (fun (payload, flags, callback, state) -> socket.BeginSend(payload, flags, callback, state)),
                socket.EndSend
            )
        return ()
    }

    /// Handles one accepted connection: reads the request, logs it together
    /// with the current thread's details, replies with a fixed greeting, and
    /// disposes the connection socket when done (or on failure).
    let internal serve (acceptedSocket : Socket) = async {
        use connection = acceptedSocket
        let buffer = Array.zeroCreate<byte>(8_000) |> toIList
        let! result = receive buffer connection
        printfn "[Thread: %A] is [Threadpool: %A] Received: %A" Thread.CurrentThread.ManagedThreadId Thread.CurrentThread.IsThreadPoolThread result
        do! send "Hello World!" connection
    }

    /// Binds and listens on `endpoint`, then runs `concurrentConnections`
    /// accept loops in parallel. Blocks the calling thread; the accept loops
    /// never complete on their own.
    let start (concurrentConnections : int) endpoint =
        async {
            use socket = createSocket endpoint
            bind endpoint socket
            listen 512 socket
            let rec loop () = async {
                let! acceptedSocket = accept socket
                // Handled in its own async so the connection is disposed before
                // the next accept rather than being held by the recursion.
                do! serve acceptedSocket
                return! loop ()
            }
            let receivers = seq { for _ in 1 .. concurrentConnections -> async {
                do! loop ()
                return 0
            }}
            return! Async.Parallel receivers
        } |> Async.RunSynchronously
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment