Created
January 26, 2010 16:59
-
-
Save cbilson/286993 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* | |
Very ugly, very rough, very F# 101 <- my attempt to implement | |
STOMP [http://stomp.codehaus.org/Protocol]. | |
This is the first time I've really tried using active patterns and | |
absolutely the first time I've used MailboxProcessors. | |
There are a couple of concepts I am still trying to work out: | |
1) What exactly happens with AsyncResponse's in mailboxes...I think I | |
am doing it wrong. Calling StompProcessor.Process below is essentially | |
synchronous. It would be nice if reading frames from the socket and | |
sending responses were totally decoupled. | |
2) Some of the active patterns below are just for matching and the don't | |
actually extract anything useful from the match (see: |Connect|). When | |
using these kinds of patterns, I just match on Some(_) and it feels kind | |
of wrong. Not sure if I am doing this right. | |
3) Not sure how I am going to turn bytes from a socket into Messages. This | |
seems like kind of a parsing problem, so I am going to read up on parser | |
combinators and see if that can be applied here. | |
*) | |
open System | |
open System.IO | |
open System.Net | |
open System.Net.Sockets | |
type Headers = Map<string,string> | |
let emptyHeaders = Map.empty<string,string> | |
// everyone can login for now | |
let canLogin login password = true | |
type Message = { | |
Command : string | |
Headers : Headers | |
Body : byte array option | |
} | |
// The STOMP protocol doesn't really say anything about sessions (other than | |
// mentioning that it's a header servers can send back in response to CONNECT | |
// messages,) but it seems like a useful concept if I ever try to implement a | |
// more complex protocol. | |
type Session = { | |
SessionId : string | |
} | |
// I had "Guid option" here but it seemed kind of messy in the matching below... | |
// ...but this actually seems messier. | |
let emptySession = { SessionId = Guid.Empty.ToString() } | |
// | |
// Patterns for various commands in STOMP frames | |
// | |
// Messages that are acceptable when waiting to connect | |
let (|ConnectingCommand|_|) request = | |
match request with | |
| { Command = "CONNECT"} -> Some(request) | |
| _ -> None | |
// Messages that are acceptable when connected | |
let (|ConnectedCommand|_|) request = | |
match request with | |
| { Command = "SEND"} | |
| { Command = "SUBSCRIBE"} | |
| { Command = "UNSUBSCRIBE"} | |
| { Command = "BEGIN"} | |
| { Command = "COMMIT"} | |
| { Command = "ABORT"} | |
| { Command = "ACK"} | |
| { Command = "DISCONNECT"} -> Some(request) | |
| _ -> None | |
// I had different patterns for different kinds of CONNECT messages (with login, | |
// without ...) but got rid of them in favor of putting optional things in the | |
// match. I like this better and should make the other active patterns like this. | |
let (|Connect|_|) request = | |
match request with | |
| { Command = "CONNECT" } | |
-> Some(request.Headers.TryFind("login"), request.Headers.TryFind("passcode")) | |
| _ -> None | |
// Should just have one send with optional headers | |
let (|SendWithTransactionAndReceipt|_|) request = | |
match request with | |
| { Command = "SEND"; Body = Some(body) } | |
when request.Headers.ContainsKey("receipt") | |
&& request.Headers.ContainsKey("transaction") | |
&& request.Headers.ContainsKey("destination") | |
-> Some(request.Headers.["destination"], | |
request.Headers.["transaction"], | |
request.Headers.["receipt"], | |
body) | |
| _ -> None | |
let (|SendWithReceipt|_|) request = | |
match request with | |
| { Command = "SEND"; Body = Some(body) } | |
when request.Headers.ContainsKey("receipt") | |
&& request.Headers.ContainsKey("destination") | |
-> Some(request.Headers.["destination"], | |
request.Headers.["receipt"], | |
body) | |
| _ -> None | |
let (|Send|_|) request = | |
match request with | |
| { Command = "SEND"; Body = Some(body) } | |
when request.Headers.ContainsKey("destination") | |
-> Some(request.Headers.["destination"], body) | |
| _ -> None | |
// This is another way I tried to deal with optional headers: Make defaults. | |
// In hindsight, I think it would be better to put this in the match below, like: | |
// match message with | |
// | Subscribe(destination, Some(ack), Some(selector), Some(id)) -> ... | |
// | Subscribe(...) -> ... | |
// | Subscribe(destination) -> ... | |
let (|Subscribe|_|) request = | |
match request with | |
| { Command = "SUBSCRIBE" } | |
when request.Headers.ContainsKey("destination") | |
&& request.Headers.ContainsKey("ack") | |
-> Some(request.Headers.["destination"], | |
request.Headers.["ack"], | |
request.Headers.TryFind("selector"), | |
request.Headers.TryFind("id")) | |
| { Command = "SUBSCRIBE" } | |
when request.Headers.ContainsKey("destination") | |
-> Some(request.Headers.["destination"], | |
"auto", | |
request.Headers.TryFind("selector"), | |
request.Headers.TryFind("id")) | |
| _ -> None | |
// this is a good example of 2) above: Some(_) is what the user of the pattern | |
// wants...why do I have to have the request here? What do real functional | |
// programmers do? | |
let (|Disconnect|_|) request = | |
match request with | |
| { Command = "DISCONNECT" } -> Some(request) | |
| _ -> None | |
// | |
// The processor itself: | |
// There are 2 states: connecting and connected | |
// | |
type StompProcessor () = | |
let stomp = MailboxProcessor.Start(fun inbox -> | |
// connecting state: we are waiting for a CONNECT message, where we'll | |
// respond with a CONNECTED and a session-id, then transition to | |
// connected state. | |
let rec connecting () = | |
async { | |
let! (msg, (rc:AsyncReplyChannel<Message option>)) = inbox.Receive() | |
match msg with | |
| Connect(Some(login), Some(passcode)) | |
when canLogin login passcode | |
-> let sessionid = Guid.NewGuid().ToString() | |
rc.Reply(Some({ Command = "CONNECTED"; | |
Headers = Map.ofArray <| [|("session", sessionid)|]; | |
Body = None })) | |
return! connected { SessionId = sessionid } | |
| Connect(_) | |
-> let sessionid = Guid.NewGuid().ToString() | |
rc.Reply(Some({ Command = "CONNECTED"; | |
Headers = Map.ofArray <| [|("session", sessionid)|]; | |
Body = None })) | |
return! connected { SessionId = sessionid } | |
// I think I should just have a helper function for these errors... | |
| ConnectedCommand(_) | |
-> rc.Reply(Some({ Command = "ERROR"; | |
Headers = Map.ofArray [|("message", "You're not connected. Send CONNECT first.")|]; | |
Body = None })) | |
return! connecting () | |
| _ -> rc.Reply(Some({ Command = "ERROR"; | |
Headers = Map.ofArray [|("message", "unrecognized message")|]; | |
Body = None })) | |
return! connecting () | |
} | |
// connected state, do whatever we need to respond to messages and wait | |
// for a DISCONNECT, at which point we'd leave? | |
and connected session = | |
async { | |
let! (msg, (rc:AsyncReplyChannel<Message option>)) = inbox.Receive() | |
match msg with | |
| SendWithTransactionAndReceipt(destination, transaction, receipt, body) | |
-> // doesn't actually send anything yet... | |
printfn "Hardcore sending action to %s..." destination | |
rc.Reply(Some({ Command = "RECEIPT"; | |
Headers = Map.ofArray [|("receipt-id", receipt); | |
("session-id", session.SessionId)|]; | |
Body = None })); | |
return! connected session | |
| SendWithReceipt(destination, receipt, body) | |
-> printfn "Hardcore sending action to %s..." destination | |
rc.Reply(Some({ Command = "RECEIPT"; | |
Headers = Map.ofArray [|("receipt-id", receipt); | |
("session-id", session.SessionId)|]; | |
Body = None })); | |
return! connected session | |
| Send(destination, body) | |
-> printfn "Hardcore sending action to %s..." destination | |
rc.Reply(None) | |
return! connected session | |
| Subscribe(destination, ack, selector, id) | |
-> // would add subscription to a map or something in a real implementation | |
printfn "Subscribed to %s" destination | |
rc.Reply(None) | |
return! connected session | |
| ConnectingCommand(_) | |
-> rc.Reply(Some({ Command = "ERROR"; | |
Headers = Map.ofArray [|("message", "You're already connected.")|]; | |
Body = None })) | |
return! connected session | |
| Disconnect(_) | |
-> // how do I get out of here? | |
return! connected session | |
| _ -> rc.Reply(Some({ Command = "ERROR"; | |
Headers = Map.ofArray [|("message", "unrecognized message")|]; | |
Body = None })) | |
return! connected session | |
} | |
connecting ()) | |
member this.Process r = stomp.PostAndReply(fun ch -> (r, ch)) | |
// | |
// Next...implement socket -> Message | |
// |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment