Last active
January 20, 2017 00:32
-
-
Save maurisvh/1a4d7752e2900459011c to your computer and use it in GitHub Desktop.
tiny terminal livestreaming server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- To watch:  telnet localhost 8887
-- To stream: script -f >( (echo hello streamname; cat -) | nc -q5 localhost 8888 > /dev/tty )
{-# LANGUAGE OverloadedStrings #-} | |
module Main where | |
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception (bracket, bracket_)
import Control.Monad
import Control.Monad.Fix
import Data.Time.Clock
import Network
import System.IO
import System.IO.Error
import Text.Printf
import qualified Data.Map.Strict as M
import qualified Data.ByteString.Char8 as B
-- | TCP port on which viewer connections are accepted.
viewerPort :: PortID
viewerPort = PortNumber 8887
-- | TCP port on which streamer connections are accepted.
streamerPort :: PortID
streamerPort = PortNumber 8888
-- | Shared registry of the currently registered streams, keyed by stream name.
type Server = MVar (M.Map String Stream)
-- | Everything the server tracks about one broadcast.
data Stream = Stream
    { streamHandle     :: Handle
      -- ^ Connection to the streamer; viewer messages are written here.
    , streamChannel    :: Chan B.ByteString
      -- ^ Live output chunks; each watching viewer reads from a duplicate.
    , streamUpdateTime :: MVar UTCTime
      -- ^ Time at which the most recent chunk was received.
    , streamHistory    :: MVar [B.ByteString]
      -- ^ Chunks received so far, newest first.
    , streamViewers    :: MVar Int
      -- ^ Number of viewers currently watching.
    }
-- | Apply a pure function to the contents of an 'MVar'.
--
-- The new value is forced to weak head normal form before it is stored, so
-- repeated updates (such as bumping a counter) do not pile up a chain of
-- unevaluated thunks inside the variable. If evaluating @f x@ throws, the
-- exception surfaces here and 'modifyMVar_' leaves the old value in place.
(%=) :: MVar a -> (a -> a) -> IO ()
var %= f = modifyMVar_ var $ \x -> return $! f x
-- | Keep at most the last @n@ bytes of a string.
--
-- A request for at least the whole length returns the input unchanged; a
-- non-positive request on a non-empty string returns the empty string.
takeEnd :: Int -> B.ByteString -> B.ByteString
takeEnd n bs
    | n >= len  = bs
    | n <= 0    = B.empty
    | otherwise = B.drop (len - n) bs
  where
    len = B.length bs
-- | Run an interactive line prompt over the given handle.
--
-- Printable ASCII characters are echoed back and collected. DEL erases the
-- last collected character (if any), ESC abandons the prompt and yields the
-- empty string, and a line feed or carriage return finishes the line. Any
-- other character is ignored.
hGetLineInteractive :: Handle -> IO String
hGetLineInteractive h = prompt []
  where
    -- The typed text is accumulated in reverse so that erasing is cheap.
    prompt typed = hGetChar h >>= step typed

    step typed ch
        | printable ch = hPutChar h ch >> prompt (ch : typed)
        | ch == '\ESC' = return ""
        | ch == '\DEL' = erase typed
        | ch == '\n' || ch == '\r' = return (reverse typed)
        | otherwise = prompt typed

    -- Rub out the last echoed character on the remote terminal.
    erase (_ : rest) = hPutStr h "\BS \BS" >> prompt rest
    erase [] = prompt []

    printable ch = ch >= ' ' && ch <= '~'
-- | Print a notification at the top of the handle's screen.
--
-- The text is wrapped in escape sequences: @ESC 7@ before and @ESC 8@ after
-- (save and restore the cursor), with a jump to row 1, column 1, an SGR
-- attribute change and a line erase in between.
notify :: Handle -> String -> IO ()
notify h s = hPutStr h (banner ++ s ++ restore)
  where
    banner, restore :: String
    banner = "\ESC7\ESC[1;1H\ESC[1;41m\ESC[2K"
    restore = "\ESC8"
-- | Ask a viewer for a message, and send it to a streamer.
--
-- The viewer's screen is cleared and a prompt is shown. The typed line is cut
-- to 79 characters, and an empty result (nothing typed, or the prompt was
-- cancelled) sends nothing.
message :: Handle -> Handle -> IO ()
message viewH streamH = do
    hPutStr viewH "\ESC[2J\ESC[0m\ESC[1;1HMessage: "
    typed <- hGetLineInteractive viewH
    let m = take 79 typed
    unless (null m) (notify streamH m)
-- | Serve one viewer connection.
--
-- After sending the telnet option negotiation, the viewer alternates between
-- the stream menu and watching a chosen stream. In the menu, @q@ ends the
-- session and a letter selects a stream. While watching, @q@ returns to the
-- menu and @m@ prompts for a message to the streamer.
--
-- Any exception (for example end of input when the viewer hangs up)
-- propagates to the caller, but only after the viewer count has been
-- decremented and the relay thread has been stopped.
runViewer :: Server -> Handle -> IO ()
runViewer server h = do
    hSetNewlineMode h (NewlineMode CRLF CRLF)
    B.hPut h "\xFF\xFB\1\xFF\xFB\3" -- IAC WILL ECHO, IAC WILL SGA
    -- Discard six bytes; presumably the client's reply to the negotiation.
    _ <- B.hGet h 6
    showMenu
  where
    showMenu = do
        streams <- readMVar server
        now <- getCurrentTime
        -- Menu keys stop at 'p' because 'q' is reserved for quitting.
        let menu = zip ['a'..'p'] (M.toList streams)
        hPutStr h "\ESC[2J\ESC[0m\ESC[1;32HWelcome to fcast!\n\n"
        when (null menu) $ hPutStrLn h "No streams found."
        forM_ menu $ \(c, (name, stream)) -> do
            t <- readMVar (streamUpdateTime stream)
            viewers <- readMVar (streamViewers stream)
            let dt = realToFrac $ diffUTCTime now t :: Double
            hPrintf h "(%c) %-20s (%d viewers, %.3fs idle)\n"
                c (take 20 name) viewers dt
        ch <- hGetChar h
        when (ch /= 'q') $ case lookup ch menu of
            Just (_, stream) -> watch stream
            Nothing -> showMenu

    watch stream = do
        hPutStr h "\ESC[2J\ESC[0m"
        let views k = streamViewers stream %= (+ k)
        -- The watching session only decides what to do next and returns it
        -- as an action. That way the brackets have released the viewer slot
        -- and the relay thread before the menu or message prompt runs, and
        -- they release them on exceptions as well.
        next <- bracket_ (views 1) (views (-1)) $ do
            -- Compact the history and subscribe to live output in a single
            -- critical section: chunks appended to the history after this
            -- snapshot are not dropped by the compaction, and they reach
            -- this viewer through the duplicated channel. A chunk that was
            -- recorded in the history just before the snapshot but not yet
            -- broadcast can still be shown twice.
            (backlog, dup) <- modifyMVar (streamHistory stream) $ \hist -> do
                let bs = takeEnd 15000 $ B.concat (reverse hist)
                live <- dupChan (streamChannel stream)
                return ([bs], (bs, live))
            B.hPut h backlog
            let relay = forever $ readChan dup >>= B.hPut h
            bracket (forkIO relay) killThread $ \_ -> fix $ \loop -> do
                ch <- hGetChar h
                case ch of
                    'q' -> return showMenu
                    'm' -> return $ do
                        message h (streamHandle stream)
                        watch stream
                    _ -> loop
        next
-- | Serve one streamer connection.
--
-- The first line must be @hello NAME@; anything else fails the pattern match
-- and ends the connection. The stream is registered under @NAME@ (failing if
-- the name is already taken), then every chunk read from the handle is
-- appended to the history, broadcast on the channel, and time-stamped. When
-- the streamer reaches end of input or an I/O error occurs, the stream is
-- removed from the registry.
runStreamer :: Server -> Handle -> IO ()
runStreamer server h = do
    -- If this match fails, we disconnect.
    ("hello":name:_) <- words `fmap` hGetLine h
    -- Initialize a new stream.
    chan <- newChan
    time <- newMVar =<< getCurrentTime
    hist <- newMVar []
    viewers <- newMVar 0
    let stream = Stream h chan time hist viewers
    -- Check and insert in one atomic step, so two streamers racing for the
    -- same name cannot both register.
    registered <- modifyMVar server $ \streams -> return $
        if M.member name streams
            then (streams, False)
            else (M.insert name stream streams, True)
    unless registered $ fail "name already taken"
    notify h $ "Started broadcasting as " ++ name ++ "."
    printf "stream %s has started\n" name
    -- Read chunks and update the stream. 'B.hGetSome' reports end of input
    -- by returning an empty string rather than throwing, so an empty read
    -- ends the loop.
    let pump = do
            bs <- B.hGetSome h 5000
            unless (B.null bs) $ do
                now <- getCurrentTime
                hist %= (bs :)
                writeChan chan bs
                _ <- swapMVar time now
                pump
    -- An I/O error ends the stream just like end of input does.
    pump `catchIOError` (\_ -> return ())
    -- Disconnect nicely.
    server %= M.delete name
    printf "stream %s has stopped\n" name
-- | Listen for connections interacting with the server on a given port and
-- handle them.
--
-- Each accepted connection is served by @run@ on its own thread. When that
-- thread finishes, normally or with an exception, the disconnect is logged
-- and the connection handle is closed. The accept loop itself never returns.
listen :: String -> PortID -> (Server -> Handle -> IO ()) -> Server -> IO ()
listen who port run server = do
    sock <- listenOn port
    printf "listening to %s connections\n" who
    forever (serveNext sock)
  where
    -- Accept one connection and hand it to a worker thread.
    serveNext sock = do
        (conn, host, peerPort) <- accept sock
        printf "%s connected (%s, %s)\n" who host (show peerPort)
        forkFinally (run server conn) (const (farewell conn))

    farewell conn = printf "%s disconnected\n" who >> hClose conn
-- | Start the server: the viewer listener runs on a background thread while
-- the streamer listener occupies the main thread. Both share one stream
-- registry.
main :: IO ()
main = withSocketsDo $ do
    registry <- newMVar M.empty
    _ <- forkIO (listen "viewer" viewerPort runViewer registry)
    listen "streamer" streamerPort runStreamer registry
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment