Skip to content

Instantly share code, notes, and snippets.

@cbilson
Created January 26, 2010 16:59
Show Gist options
  • Save cbilson/286993 to your computer and use it in GitHub Desktop.
Save cbilson/286993 to your computer and use it in GitHub Desktop.
(*
Very ugly, very rough, very F# 101 <- my attempt to implement
STOMP [http://stomp.codehaus.org/Protocol].
This is the first time I've really tried using active patterns and
absolutely the first time I've used MailboxProcessors.
There are a couple of concepts I am still trying to work out:
1) What exactly happens with AsyncReplyChannels in mailboxes... I think I
am doing it wrong. Calling StompProcessor.Process below is essentially
synchronous. It would be nice if reading frames from the socket and
sending responses were totally decoupled.
2) Some of the active patterns below are just for matching and they don't
actually extract anything useful from the match (see: |Connect|). When
using these kinds of patterns, I just match on Some(_) and it feels kind
of wrong. Not sure if I am doing this right.
3) Not sure how I am going to turn bytes from a socket into Messages. This
seems like kind of a parsing problem, so I am going to read up on parser
combinators and see if that can be applied here.
*)
open System
open System.IO
open System.Net
open System.Net.Sockets
/// STOMP frame headers: header name mapped to header value.
type Headers = Map<string, string>

/// A header map with no entries.
let emptyHeaders : Headers = Map.empty
/// Credential check used when a CONNECT frame carries a login and passcode.
/// Placeholder: both arguments are ignored and every login is accepted.
let canLogin _login _password = true
/// A single STOMP frame; used both for requests and for replies.
type Message =
    { /// Frame command, e.g. "CONNECT", "SEND" or "ERROR".
      Command: string
      /// Header name/value pairs attached to the frame.
      Headers: Headers
      /// Frame payload, or None when the frame carries no body.
      Body: byte[] option }
// The STOMP protocol doesn't really say anything about sessions (other than
// mentioning that it's a header servers can send back in response to CONNECT
// messages,) but it seems like a useful concept if I ever try to implement a
// more complex protocol.
/// State carried while a client is connected.
type Session =
    { /// Identifier handed back to the client in the CONNECTED frame.
      SessionId: string }
// I had "Guid option" here but it seemed kind of messy in the matching below...
// ...but this actually seems messier.
/// Sentinel session whose id is the all-zero GUID rendered as a string.
let emptySession : Session =
    let noSessionId = Guid.Empty.ToString()
    { SessionId = noSessionId }
//
// Patterns for various commands in STOMP frames
//
/// Matches frames that are acceptable while waiting to connect (only
/// CONNECT). Yields the frame itself on a match.
let (|ConnectingCommand|_|) (request: Message) =
    if request.Command = "CONNECT" then Some request else None
/// Matches frames that are acceptable once connected. Yields the frame
/// itself on a match.
let (|ConnectedCommand|_|) (request: Message) =
    match request.Command with
    | "SEND" | "SUBSCRIBE" | "UNSUBSCRIBE" | "BEGIN"
    | "COMMIT" | "ABORT" | "ACK" | "DISCONNECT" -> Some request
    | _ -> None
// I had different patterns for different kinds of CONNECT messages (with login,
// without ...) but got rid of them in favor of putting optional things in the
// match. I like this better and should make the other active patterns like this.
/// Matches a CONNECT frame and yields its optional "login" and "passcode"
/// headers as a pair; either may be None when the header is absent.
let (|Connect|_|) (request: Message) =
    let header name = Map.tryFind name request.Headers
    match request.Command with
    | "CONNECT" -> Some(header "login", header "passcode")
    | _ -> None
// Should just have one send with optional headers
/// Matches a SEND frame that has a body and all of the "destination",
/// "transaction" and "receipt" headers.
/// Yields (destination, transaction, receipt, body).
let (|SendWithTransactionAndReceipt|_|) (request: Message) =
    let header name = Map.tryFind name request.Headers
    match request.Command, request.Body with
    | "SEND", Some payload ->
        match header "destination", header "transaction", header "receipt" with
        | Some destination, Some transaction, Some receipt ->
            Some(destination, transaction, receipt, payload)
        | _ -> None
    | _ -> None
/// Matches a SEND frame that has a body and both the "destination" and
/// "receipt" headers. Yields (destination, receipt, body).
let (|SendWithReceipt|_|) (request: Message) =
    let header name = Map.tryFind name request.Headers
    match request.Command, request.Body with
    | "SEND", Some payload ->
        match header "destination", header "receipt" with
        | Some destination, Some receipt -> Some(destination, receipt, payload)
        | _ -> None
    | _ -> None
/// Matches a SEND frame that has a body and a "destination" header.
/// Yields (destination, body).
let (|Send|_|) (request: Message) =
    match request.Command, request.Body with
    | "SEND", Some payload ->
        request.Headers
        |> Map.tryFind "destination"
        |> Option.map (fun destination -> destination, payload)
    | _ -> None
// This is another way I tried to deal with optional headers: Make defaults.
// In hindsight, I think it would be better to put this in the match below, like:
// match message with
// | Subscribe(destination, Some(ack), Some(selector), Some(id)) -> ...
// | Subscribe(...) -> ...
// | Subscribe(destination) -> ...
/// Matches a SUBSCRIBE frame that has a "destination" header.
/// Yields (destination, ack, selector, id): ack is the "ack" header or
/// "auto" when that header is absent; selector and id are the optional
/// "selector" and "id" headers.
let (|Subscribe|_|) (request: Message) =
    let header name = Map.tryFind name request.Headers
    if request.Command <> "SUBSCRIBE" then
        None
    else
        header "destination"
        |> Option.map (fun destination ->
            let ackMode = defaultArg (header "ack") "auto"
            destination, ackMode, header "selector", header "id")
// this is a good example of 2) above: Some(_) is what the user of the pattern
// wants...why do I have to have the request here? What do real functional
// programmers do?
/// Matches a DISCONNECT frame. Yields the frame itself, which callers
/// typically ignore.
let (|Disconnect|_|) (request: Message) =
    if request.Command = "DISCONNECT" then Some request else None
//
// The processor itself:
// There are 2 states: connecting and connected
//
/// STOMP protocol state machine hosted in a MailboxProcessor.
///
/// The agent alternates between two states:
///   * connecting - waits for a CONNECT frame, replies CONNECTED with a fresh
///     session id and moves to the connected state;
///   * connected  - answers SEND / SUBSCRIBE frames and returns to the
///     connecting state on DISCONNECT.
///
/// Every posted frame is answered exactly once on its reply channel, so
/// Process never blocks indefinitely waiting for a reply.
type StompProcessor () =
    /// Builds a body-less reply frame with the given command and headers.
    let frame command headers : Message option =
        Some { Command = command; Headers = Map.ofList headers; Body = None }

    /// ERROR frame carrying a human-readable "message" header.
    let errorFrame text = frame "ERROR" [ "message", text ]

    /// CONNECTED frame announcing the new session id.
    let connectedFrame sessionId = frame "CONNECTED" [ "session", sessionId ]

    /// RECEIPT frame acknowledging a SEND that asked for a receipt.
    let receiptFrame receipt (session: Session) =
        frame "RECEIPT" [ "receipt-id", receipt
                          "session-id", session.SessionId ]

    let stomp =
        MailboxProcessor.Start(fun inbox ->
            // connecting state: we are waiting for a CONNECT message, where
            // we'll respond with a CONNECTED and a session id, then transition
            // to the connected state.
            let rec connecting () =
                async {
                    let! (msg, (rc: AsyncReplyChannel<Message option>)) = inbox.Receive()
                    match msg with
                    | Connect(Some login, Some passcode) when not (canLogin login passcode) ->
                        // Credentials were supplied but rejected: do not fall
                        // through to the anonymous CONNECT arm below.
                        rc.Reply(errorFrame "Login failed.")
                        return! connecting ()
                    | Connect _ ->
                        let sessionId = Guid.NewGuid().ToString()
                        rc.Reply(connectedFrame sessionId)
                        return! connected { SessionId = sessionId }
                    | ConnectedCommand _ ->
                        rc.Reply(errorFrame "You're not connected. Send CONNECT first.")
                        return! connecting ()
                    | _ ->
                        rc.Reply(errorFrame "unrecognized message")
                        return! connecting ()
                }
            // connected state: respond to messages until a DISCONNECT arrives.
            and connected session =
                async {
                    let! (msg, (rc: AsyncReplyChannel<Message option>)) = inbox.Receive()
                    match msg with
                    | SendWithTransactionAndReceipt(destination, _, receipt, _) ->
                        // doesn't actually send anything yet...
                        printfn "Hardcore sending action to %s..." destination
                        rc.Reply(receiptFrame receipt session)
                        return! connected session
                    | SendWithReceipt(destination, receipt, _) ->
                        printfn "Hardcore sending action to %s..." destination
                        rc.Reply(receiptFrame receipt session)
                        return! connected session
                    | Send(destination, _) ->
                        printfn "Hardcore sending action to %s..." destination
                        rc.Reply(None)
                        return! connected session
                    | Subscribe(destination, _, _, _) ->
                        // would add subscription to a map or something in a
                        // real implementation
                        printfn "Subscribed to %s" destination
                        rc.Reply(None)
                        return! connected session
                    | ConnectingCommand _ ->
                        rc.Reply(errorFrame "You're already connected.")
                        return! connected session
                    | Disconnect _ ->
                        // Always answer the caller: PostAndReply would
                        // otherwise wait forever. The session is dropped and
                        // the agent goes back to waiting for a CONNECT, so
                        // later frames are still received and answered.
                        rc.Reply(None)
                        return! connecting ()
                    | _ ->
                        rc.Reply(errorFrame "unrecognized message")
                        return! connected session
                }
            connecting ())

    /// Posts a frame to the agent and blocks until it replies.
    /// Returns Some reply frame, or None when the frame needs no reply.
    member this.Process r = stomp.PostAndReply(fun ch -> (r, ch))
//
// Next...implement socket -> Message
//
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment