-
-
Save bitemyapp/06f86eb8c0c5c3cbdb6c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- to watch: telnet localhost 8887 | |
-- to stream: script -f >( (echo hello streamname; cat -) | nc localhost 8888 > /dev/null ) | |
{-# LANGUAGE OverloadedStrings #-} | |
module Fcast where | |
import Control.Concurrent | |
import Control.Concurrent.STM | |
import Control.Monad | |
import Control.Monad.Fix | |
import Data.Time.Clock | |
import Network | |
import System.IO | |
import System.IO.Error | |
import Text.Printf | |
import qualified Data.Map.Strict as M | |
import qualified Data.ByteString.Char8 as B | |
viewerPort :: PortID | |
viewerPort = PortNumber 8887 | |
streamerPort :: PortID | |
streamerPort = PortNumber 8888 | |
type Server = TVar (M.Map String Stream) | |
data Stream = Stream { streamHandle :: Handle | |
, streamChannel :: TChan Char | |
, streamUpdateTime :: TVar UTCTime | |
, streamHistory :: TVar B.ByteString } | |
runViewer :: Server -> Handle -> IO () | |
runViewer server h = do | |
hSetNewlineMode h (NewlineMode CRLF CRLF) | |
B.hPut h "\xFF\xFB\1\xFF\xFB\3" -- IAC WILL ECHO, IAC WILL SGA | |
replicateM_ 6 $ hGetChar h | |
showMenu | |
where showMenu = do | |
streams <- readTVarIO server | |
now <- getCurrentTime | |
let menu = zip ['a'..'p'] (M.toList streams) | |
hPutStr h "\ESC[2J\ESC[1;32HWelcome to fcast!\n\n" | |
when (null menu) $ hPutStrLn h "No streams found." | |
forM_ menu $ \(c, (name, stream)) -> do | |
t <- readTVarIO (streamUpdateTime stream) | |
let dt = realToFrac $ diffUTCTime now t :: Double | |
hPrintf h "(%c) %-20s (%.3fs idle)\n" c (take 20 name) dt | |
ch <- hGetChar h | |
when (ch /= 'q') $ case lookup ch menu of | |
Just (_, stream) -> watch stream | |
Nothing -> showMenu | |
watch stream = do | |
B.hPut h . B.reverse =<< readTVarIO (streamHistory stream) | |
dup <- atomically $ dupTChan $ streamChannel stream | |
t <- forkIO $ fix $ \loop -> do ch <- atomically (readTChan dup) | |
hPutChar h ch >> loop | |
fix $ \loop -> do ch <- hGetChar h | |
if (ch == 'q') then killThread t >> showMenu | |
else loop | |
runStreamer :: Server -> Handle -> IO () | |
runStreamer server h = do | |
-- If this match fails, we disconnect. | |
("hello":name:_) <- words `fmap` hGetLine h | |
s <- atomically $ readTVar server | |
when (M.member name s) $ fail "name already taken" | |
-- Initialize a new stream. | |
chan <- newTChanIO | |
time <- newTVarIO =<< getCurrentTime | |
hist <- newTVarIO B.empty | |
let stream = Stream h chan time hist | |
atomically $ modifyTVar server (M.insert name stream) | |
printf "stream %s has started\n" name | |
-- Disconnect nicely if we hit EOF. | |
let disconnect = do | |
atomically $ modifyTVar server (M.delete name) | |
printf "stream %s has stopped\n" name | |
-- Read characters and update the stream. | |
fix $ \loop -> flip catchIOError (\_ -> disconnect) $ do | |
ch <- hGetChar h | |
now <- getCurrentTime | |
atomically $ do | |
modifyTVar hist (B.cons ch) | |
writeTChan chan ch | |
writeTVar time now | |
loop | |
-- Listen for connections interacting with the server on a given port and | |
-- handle them. | |
listen :: String -> PortID -> (Server -> Handle -> IO ()) -> Server -> IO () | |
listen who port run server = do | |
sock <- listenOn port | |
printf "listening to %s connections\n" who | |
forever $ do | |
(handle, host, hPort) <- accept sock | |
printf "%s connected (%s, %s)\n" who host (show hPort) | |
forkFinally (run server handle) | |
(\_ -> printf "%s disconnected\n" who >> hClose handle) | |
main :: IO () | |
main = withSocketsDo $ do | |
server <- newTVarIO M.empty | |
_ <- forkIO $ listen "viewers" viewerPort runViewer server | |
listen "streamer" streamerPort runStreamer server |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment