Skip to content

Instantly share code, notes, and snippets.

@ccapndave
Created August 31, 2017 13:42
Show Gist options
  • Save ccapndave/f4703fa7fd09b861399e8564b9cfa0f9 to your computer and use it in GitHub Desktop.
Save ccapndave/f4703fa7fd09b861399e8564b9cfa0f9 to your computer and use it in GitHub Desktop.
effect module Component.Licensing.WebSocket where { command = MyCmd, subscription = MySub } exposing
( send
, listen
, listenWithDisconnect
, keepAlive
)
{-| Web sockets make it cheaper to talk to your servers.
Connecting to a server takes some time, so with web sockets, you make that
connection once and then keep using. The major benefits of this are:
1. It faster to send messages. No need to do a bunch of work for every single
message.
2. The server can push messages to you. With normal HTTP you would have to
keep *asking* for changes, but a web socket, the server can talk to you
whenever it wants. This means there is less unnecessary network traffic.
The API here attempts to cover the typical usage scenarios, but if you need
many unique connections to the same endpoint, you need a different library.
===
This is a fiddled version of the original elm-lang/WebSocket library. Specifically
it adds a subscription for detecting disconnections.
# Web Sockets
@docs listen, keepAlive, send
-}
import Dict
import Process
import Task exposing (Task)
import Time exposing (Time)
import WebSocket.LowLevel as WS
-- COMMANDS
type MyCmd msg
= Send String String
{-| Send a message to a particular address. You might say something like this:
send "ws://echo.websocket.org" "Hello!"
**Note:** It is important that you are also subscribed to this address with
`listen` or `keepAlive`. If you are not, the web socket will be created to
send one message and then closed. Not good!
-}
send : String -> String -> Cmd msg
send url message =
command (Send url message)
cmdMap : (a -> b) -> MyCmd a -> MyCmd b
cmdMap _ (Send url msg) =
Send url msg
-- SUBSCRIPTIONS
type MySub msg
= Listen String (String -> msg)
| ListenWithDisconnect String (String -> msg) ({ code : Int, reason : String, wasClean : Bool } -> msg)
| KeepAlive String
{-| Subscribe to any incoming messages on a websocket. You might say something
like this:
type Msg = Echo String | ...
subscriptions model =
listen "ws://echo.websocket.org" Echo
**Note:** If the connection goes down, the effect manager tries to reconnect
with an exponential backoff strategy. Any messages you try to `send` while the
connection is down are queued and will be sent as soon as possible.
-}
listen : String -> (String -> msg) -> Sub msg
listen url tagger =
subscription (Listen url tagger)
{-| Subscribe to any incoming messages on a websocket. You might say something
like this:
type Msg = Echo String | Disconnected { code : Int, reason : String, wasClean : Bool } ...
subscriptions model =
listen "ws://echo.websocket.org" Echo Disconnected
**Note:** If the connection goes down, the effect manager tries to reconnect
with an exponential backoff strategy. Any messages you try to `send` while the
connection is down are queued and will be sent as soon as possible.
-}
listenWithDisconnect : String -> (String -> msg) -> ({ code : Int, reason : String, wasClean : Bool } -> msg) -> Sub msg
listenWithDisconnect url tagger disconnectedTagger =
subscription (ListenWithDisconnect url tagger disconnectedTagger)
{-| Keep a connection alive, but do not report any messages. This is useful
for keeping a connection open for when you only need to `send` messages. So
you might say something like this:
subscriptions model =
keepAlive "ws://echo.websocket.org"
**Note:** If the connection goes down, the effect manager tries to reconnect
with an exponential backoff strategy. Any messages you try to `send` while the
connection is down are queued and will be sent as soon as possible.
-}
keepAlive : String -> Sub msg
keepAlive url =
subscription (KeepAlive url)
subMap : (a -> b) -> MySub a -> MySub b
subMap func sub =
case sub of
Listen url tagger ->
Listen url (tagger >> func)
ListenWithDisconnect url tagger disconnectedTagger ->
ListenWithDisconnect url (tagger >> func) (disconnectedTagger >> func)
KeepAlive url ->
KeepAlive url
-- MANAGER
type alias State msg =
{ sockets : SocketsDict
, queues : QueuesDict
, subs : SubsDict msg
}
type alias SocketsDict =
Dict.Dict String Connection
type alias QueuesDict =
Dict.Dict String (List String)
type alias SubsDict msg =
Dict.Dict String (List ((String -> msg), Maybe ({ code : Int, reason : String, wasClean : Bool } -> msg)))
type Connection
= Opening Int Process.Id
| Connected WS.WebSocket
init : Task Never (State msg)
init =
Task.succeed (State Dict.empty Dict.empty Dict.empty)
-- HANDLE APP MESSAGES
(&>) t1 t2 =
Task.andThen (\_ -> t2) t1
onEffects
: Platform.Router msg Msg
-> List (MyCmd msg)
-> List (MySub msg)
-> State msg
-> Task Never (State msg)
onEffects router cmds subs state =
let
sendMessagesGetNewQueues =
sendMessagesHelp cmds state.sockets state.queues
newSubs =
buildSubDict subs Dict.empty
cleanup newQueues =
let
newEntries =
Dict.union newQueues (Dict.map (\k v -> []) newSubs)
leftStep name _ getNewSockets =
getNewSockets
|> Task.andThen (\newSockets -> attemptOpen router 0 name
|> Task.andThen (\pid -> Task.succeed (Dict.insert name (Opening 0 pid) newSockets)))
bothStep name _ connection getNewSockets =
Task.map (Dict.insert name connection) getNewSockets
rightStep name connection getNewSockets =
closeConnection connection &> getNewSockets
collectNewSockets =
Dict.merge leftStep bothStep rightStep newEntries state.sockets (Task.succeed Dict.empty)
in
collectNewSockets
|> Task.andThen (\newSockets -> Task.succeed (State newSockets newQueues newSubs))
in
sendMessagesGetNewQueues
|> Task.andThen cleanup
sendMessagesHelp : List (MyCmd msg) -> SocketsDict -> QueuesDict -> Task x QueuesDict
sendMessagesHelp cmds socketsDict queuesDict =
case cmds of
[] ->
Task.succeed queuesDict
Send name msg :: rest ->
case Dict.get name socketsDict of
Just (Connected socket) ->
WS.send socket msg
&> sendMessagesHelp rest socketsDict queuesDict
_ ->
sendMessagesHelp rest socketsDict (Dict.update name (add msg) queuesDict)
buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg
buildSubDict subs dict =
case subs of
[] ->
dict
Listen name tagger :: rest ->
buildSubDict rest (Dict.update name (add (tagger, Nothing)) dict)
ListenWithDisconnect name tagger disconnectedTagger :: rest ->
buildSubDict rest (Dict.update name (add (tagger, Just disconnectedTagger)) dict)
KeepAlive name :: rest ->
buildSubDict rest (Dict.update name (Just << Maybe.withDefault []) dict)
add : a -> Maybe (List a) -> Maybe (List a)
add value maybeList =
case maybeList of
Nothing ->
Just [value]
Just list ->
Just (value :: list)
-- HANDLE SELF MESSAGES
type Msg
= Receive String String
| Die String { code : Int, reason : String, wasClean : Bool }
| GoodOpen String WS.WebSocket
| BadOpen String
onSelfMsg : Platform.Router msg Msg -> Msg -> State msg -> Task Never (State msg)
onSelfMsg router selfMsg state =
case selfMsg of
Receive name str ->
let
sends =
Dict.get name state.subs
|> Maybe.withDefault []
|> List.map (\(tagger, disconnectedTagger) -> Platform.sendToApp router (tagger str))
in
Task.sequence sends &> Task.succeed state
Die name details ->
let
sends =
Dict.get name state.subs
|> Maybe.withDefault []
|> List.filterMap (\(tagger, maybeDisconnectedTagger) ->
maybeDisconnectedTagger
|> Maybe.map (\disconnectedTagger -> Platform.sendToApp router (disconnectedTagger details))
)
in
Task.sequence sends &>
case Dict.get name state.sockets of
Nothing ->
Task.succeed state
Just _ ->
attemptOpen router 0 name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening 0 pid) state))
GoodOpen name socket ->
case Dict.get name state.queues of
Nothing ->
Task.succeed (updateSocket name (Connected socket) state)
Just messages ->
List.foldl
(\msg task -> WS.send socket msg &> task)
(Task.succeed (removeQueue name (updateSocket name (Connected socket) state)))
messages
BadOpen name ->
case Dict.get name state.sockets of
Nothing ->
Task.succeed state
Just (Opening n _) ->
attemptOpen router (n + 1) name
|> Task.andThen (\pid -> Task.succeed (updateSocket name (Opening (n + 1) pid) state))
Just (Connected _) ->
Task.succeed state
updateSocket : String -> Connection -> State msg -> State msg
updateSocket name connection state =
{ state | sockets = Dict.insert name connection state.sockets }
removeQueue : String -> State msg -> State msg
removeQueue name state =
{ state | queues = Dict.remove name state.queues }
-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF
attemptOpen : Platform.Router msg Msg -> Int -> String -> Task x Process.Id
attemptOpen router backoff name =
let
goodOpen ws =
Platform.sendToSelf router (GoodOpen name ws)
badOpen _ =
Platform.sendToSelf router (BadOpen name)
actuallyAttemptOpen =
open name router
|> Task.andThen goodOpen
|> Task.onError badOpen
in
Process.spawn (after backoff &> actuallyAttemptOpen)
open : String -> Platform.Router msg Msg -> Task WS.BadOpen WS.WebSocket
open name router =
WS.open name
{ onMessage = \_ msg -> Platform.sendToSelf router (Receive name msg)
, onClose = \details -> Platform.sendToSelf router (Die name details)
}
after : Int -> Task x ()
after backoff =
if backoff < 1 then
Task.succeed ()
else
Process.sleep (toFloat (10 * 2 ^ backoff))
-- CLOSE CONNECTIONS
closeConnection : Connection -> Task x ()
closeConnection connection =
case connection of
Opening _ pid ->
Process.kill pid
Connected socket ->
WS.close socket
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment