Last active
January 9, 2017 18:58
-
-
Save AndrewNewcomb/711664 to your computer and use it in GitHub Desktop.
OLD from 2010 ... Updated example of using F# MailboxProcessor against an HTML5 WebSocket (in Google Chrome)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example of using F# MailboxProcessor against an HTML5 WebSocket (in Google Chrome) | |
// taken from http://v2matveev.blogspot.com/2010/04/mailboxprocessors-practical-application.html | |
// and then modified to work with the revised WebSocket protocol that includes a set of challenge bytes. | |
// The main changes are in the handshake function. | |
// Have a look at the http://nugget.codeplex.com for example WebSocket code in C#, on which I based the | |
// challenge processing code. | |
open System | |
open System.IO | |
open System.Linq | |
open System.Net | |
open System.Net.Sockets | |
open System.Text | |
open System.Threading | |
open System.Runtime.Serialization | |
[<DataContract>] | |
type Time = | |
{ [<DataMember(Name = "hour")>] mutable Hour : int | |
[<DataMember(Name = "minute")>] mutable Minute : int | |
[<DataMember(Name = "second")>] mutable Second : int } | |
static member New(dt : DateTime) = {Hour = dt.Hour; Minute = dt.Minute; Second = dt.Second} | |
type Msg = | |
| Connect of MailboxProcessor<Time> | |
| Disconnect of MailboxProcessor<Time> | |
| Tick of Time | |
let port = 1900 | |
let ipAddress = IPAddress.Loopback.ToString() | |
let origin = "http://localhost" | |
let startMailboxProcessor ct f = MailboxProcessor.Start(f, cancellationToken = ct) | |
let timer (ctrl : MailboxProcessor<Msg>) interval = async { | |
while true do | |
do! Async.Sleep interval | |
ctrl.Post(Tick <| Time.New(DateTime.Now)) | |
} | |
let runController (ct : CancellationToken) = | |
startMailboxProcessor ct (fun (inbox : MailboxProcessor<Msg>) -> | |
let listeners = new ResizeArray<_>() | |
async { | |
while not ct.IsCancellationRequested do | |
let! msg = inbox.Receive() | |
match msg with | |
| Connect l -> | |
Console.WriteLine "Connect" | |
listeners.Add(l) | |
| Disconnect l -> | |
Console.WriteLine "Disconnect" | |
listeners.Remove(l) |> ignore | |
| Tick msg -> listeners.ForEach(fun l -> l.Post msg) | |
} | |
) | |
let runWorker (tcp : TcpClient) (ctrl : MailboxProcessor<Msg>) ct = | |
ignore <| startMailboxProcessor ct (fun (inbox : MailboxProcessor<Time>) -> | |
let rec handshake = async { | |
let ns = tcp.GetStream() | |
let bytes = Array.create tcp.ReceiveBufferSize (byte 0) | |
let bytesReadCount = ns.Read (bytes, 0, bytes.Length) | |
if bytesReadCount > 8 then | |
// expected format is several CRLF terminated key value pair strings | |
// followed by a CRLF | |
// followed by the eight challenge bytes | |
let headerBytes = bytes.[..(bytesReadCount-9)] | |
let challengeBytes = bytes.[(bytesReadCount-8)..(bytesReadCount-1)] | |
let headerString = System.Text.UTF8Encoding.UTF8.GetString headerBytes | |
let lines = headerString.Split([|"\r\n"|], StringSplitOptions.RemoveEmptyEntries) | |
match lines with | |
| [| "GET /timer HTTP/1.1"; "Upgrade: WebSocket"; "Connection: Upgrade"; _; _; _; _|] -> | |
// TODO : parse WebSocket-Origin and WebSocket-Location | |
let keyValue keyNo arr = (Array.find (fun (s:String) -> s.StartsWith("Sec-WebSocket-Key" + keyNo)) arr).Substring 20 | |
let key1 = lines |> keyValue "1" | |
let key2 = lines |> keyValue "2" | |
let generateKeyBytes (keyVal:string) = | |
let decodedKey = // tuple (count of spaces, concatenated digits) | |
Array.fold (fun acc elem -> | |
match elem with | |
| ' ' -> (1 + fst acc, snd acc) // increment space count | |
| c when System.Char.IsDigit c -> (fst acc, snd acc + c.ToString()) // append digit | |
| _ -> acc // no change | |
) (0, "") (keyVal.ToCharArray()) | |
let int32ToBigEndianBytes (i:int32) = | |
match BitConverter.GetBytes i with | |
| littleEndian when BitConverter.IsLittleEndian -> Array.rev littleEndian | |
| bigEndian -> bigEndian | |
int32ToBigEndianBytes (int32 ( (Int64.Parse (snd decodedKey)) / int64 (fst decodedKey) )) | |
let challengeResponseMD5 = | |
using (System.Security.Cryptography.MD5.Create()) (fun md5 -> | |
md5.ComputeHash(Array.concat [ generateKeyBytes key1; generateKeyBytes key2; challengeBytes ]) | |
) | |
let bytesMain = System.Text.Encoding.ASCII.GetBytes( | |
"HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + | |
"Upgrade: WebSocket\r\n" + | |
"Connection: Upgrade\r\n" + | |
"Sec-WebSocket-Origin: " + origin + "\r\n" + | |
"Sec-WebSocket-Location: ws://" + ipAddress + ":" + port.ToString() + "/timer\r\n" + | |
"\r\n") | |
do! ns.AsyncWrite(Array.append bytesMain challengeResponseMD5) | |
return! run ns | |
| _ -> | |
//validation failed - close connection | |
tcp.Close() | |
else | |
//validation failed - close connection | |
tcp.Close() | |
} | |
and run (ns : NetworkStream) = async { | |
let json = System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof<Time>) | |
ctrl.Post(Connect inbox) | |
try | |
while not ct.IsCancellationRequested do | |
let! time = inbox.Receive() | |
let ms = new MemoryStream() | |
json.WriteObject(ms, time) | |
do ns.WriteByte(byte 0x00) | |
do! ns.AsyncWrite(ms.ToArray()) | |
do ns.WriteByte(byte 0xFF) | |
ms.Dispose() | |
finally | |
ns.Close() | |
ctrl.Post(Disconnect inbox) | |
} | |
handshake | |
) | |
let runRequestDispatcher () = | |
let listener = new TcpListener(IPAddress.Parse(ipAddress), port) | |
let cts = new CancellationTokenSource() | |
let token = cts.Token | |
let controller = runController token | |
Async.Start (timer controller 1000, token) | |
let main = async { | |
try | |
listener.Start(10) | |
while not cts.IsCancellationRequested do | |
let! client = Async.FromBeginEnd(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient) | |
runWorker client controller token | |
finally | |
listener.Stop() | |
} | |
Async.Start(main, token) | |
{ new IDisposable with member x.Dispose() = cts.Cancel()} | |
let dispose = runRequestDispatcher () | |
printfn "press any key to stop..." | |
Console.ReadKey() |> ignore | |
dispose.Dispose() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment