Last active
July 16, 2016 06:18
-
-
Save luciferous/35728c76082b925ae9313aece00c00c8 to your computer and use it in GitHub Desktop.
STM example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"net" | |
"strconv" | |
"time" | |
) | |
const port = "8000" | |
func startPublisher(messages chan<- string) { | |
rand.Seed(time.Now().Unix()) | |
for { | |
time.Sleep(1 * time.Second) | |
messages <- strconv.Itoa(rand.Int()) | |
} | |
} | |
func startAcceptor(ln net.Listener, incoming chan<- net.Conn, closing chan<- net.Conn) { | |
for { | |
conn, err := ln.Accept() | |
if err != nil { | |
fmt.Println(err) | |
break | |
} | |
incoming <- conn | |
go func() { | |
waitForDisconnect(conn) | |
closing <- conn | |
}() | |
} | |
} | |
func waitForDisconnect(conn net.Conn) { | |
discard := make([]byte, 1024) | |
for { | |
bytes, err := conn.Read(discard) | |
if err != nil || bytes == 0 { | |
break | |
} | |
} | |
} | |
func indexOf(conn net.Conn, clients []net.Conn) int { | |
for i, c := range clients { | |
if c == conn { | |
return i | |
} | |
} | |
return -1 | |
} | |
func loop( | |
clients []net.Conn, | |
messages <-chan string, | |
incoming <-chan net.Conn, | |
closing <-chan net.Conn) { | |
for { | |
select { | |
case conn := <-incoming: | |
fmt.Println("Incoming:", conn) | |
clients = append(clients, conn) | |
case conn := <-closing: | |
fmt.Println("Closing:", conn) | |
ix := indexOf(conn, clients) | |
if ix > -1 { | |
clients[ix] = clients[len(clients)-1] | |
clients = clients[:len(clients)-1] | |
} | |
case n := <-messages: | |
fmt.Println("Publishing", n, "to", len(clients), "clients") | |
for _, conn := range clients { | |
buf := []byte(n + "\n") | |
conn.Write(buf) | |
} | |
} | |
} | |
} | |
func main() { | |
ln, err := net.Listen("tcp", ":"+port) | |
if err != nil { | |
panic(err) | |
} | |
defer ln.Close() | |
messages := make(chan string) | |
incoming := make(chan net.Conn) | |
closing := make(chan net.Conn) | |
go startPublisher(messages) | |
go startAcceptor(ln, incoming, closing) | |
clients := make([]net.Conn, 0) | |
loop(clients, messages, incoming, closing) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module Main where | |
import Network ( withSocketsDo, listenOn, PortID(..) ) | |
import Network.Socket ( accept, close, recv, send, Socket ) | |
import Control.Concurrent ( forkIO, threadDelay, ThreadId ) | |
import Control.Concurrent.STM ( atomically | |
, orElse | |
, newTChan | |
, readTChan | |
, retry | |
, writeTChan | |
, TChan | |
) | |
import Control.Exception ( catch, SomeException ) | |
import Control.Monad ( forM_, forever, join, void ) | |
import System.Random ( randomIO ) | |
import Text.Printf ( printf ) | |
port :: Int | |
port = 8000 | |
startPublisher :: TChan String -> IO ThreadId | |
startPublisher messages = | |
forkIO $ forever $ do | |
n <- randomIO :: IO Int | |
atomically (writeTChan messages (show n)) | |
threadDelay 1000000 | |
startAcceptor :: Socket -> TChan Socket -> TChan Socket -> IO ThreadId | |
startAcceptor sock incoming closing = | |
forkIO $ forever $ do | |
(conn, _) <- accept sock | |
atomically (writeTChan incoming conn) | |
forkIO $ do | |
waitForClose conn | |
atomically (writeTChan closing conn) | |
where | |
waitForClose conn = do | |
buf <- recv conn 4096 `catch` \(_ :: SomeException) -> return "" | |
if null buf then return () else waitForClose conn | |
main :: IO () | |
main = withSocketsDo $ do | |
sock <- listenOn (PortNumber (fromIntegral port)) | |
incoming <- atomically newTChan | |
closing <- atomically newTChan | |
messages <- atomically newTChan | |
startPublisher messages | |
startAcceptor sock incoming closing | |
loop [] incoming closing messages | |
where | |
loop clients incoming closing messages = join $ atomically $ | |
(do conn <- readTChan incoming | |
return $ do | |
printf "Incoming: %s\n" (show conn) | |
loop (conn:clients) incoming closing messages) | |
`orElse` | |
(do conn <- readTChan closing | |
return $ do | |
printf "Closing: %s\n" (show conn) | |
close conn | |
loop ([c | c <- clients, c /= conn]) incoming closing messages) | |
`orElse` | |
(do message <- readTChan messages | |
return $ do | |
printf "Publishing %s to %d clients\n" message (length clients) | |
forM_ clients (\conn -> | |
void (send conn (message ++ "\n")) `catch` | |
\(_ :: SomeException) -> atomically (writeTChan closing conn)) | |
loop clients incoming closing messages) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: stm-example | |
version: 0.1.0.0 | |
synopsis: Fanout TCP to demonstrate STM | |
description: Fanout TCP to demonstrate STM | |
homepage: http://lcfrs.org | |
license: BSD3 | |
license-file: LICENSE | |
author: Neuman vong | |
maintainer: [email protected] | |
copyright: 2016 Neuman Vong | |
category: Web | |
build-type: Simple | |
-- extra-source-files: | |
cabal-version: >=1.10 | |
executable stm-example-exe | |
hs-source-dirs: . | |
main-is: Main.hs | |
ghc-options: -threaded -rtsopts -with-rtsopts=-N | |
build-depends: base | |
, network | |
, stm | |
, random | |
default-language: Haskell2010 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment