STM example
package main
import (
const port = "8000"
func startPublisher(messages chan<- string) {
for {
time.Sleep(1 * time.Second)
messages <- strconv.Itoa(rand.Int())
func startAcceptor(ln net.Listener, incoming chan<- net.Conn, closing chan<- net.Conn) {
for {
conn, err := ln.Accept()
if err != nil {
incoming <- conn
go func() {
closing <- conn
func waitForDisconnect(conn net.Conn) {
discard := make([]byte, 1024)
for {
bytes, err := conn.Read(discard)
if err != nil || bytes == 0 {
func indexOf(conn net.Conn, clients []net.Conn) int {
for i, c := range clients {
if c == conn {
return i
return -1
func loop(
clients []net.Conn,
messages <-chan string,
incoming <-chan net.Conn,
closing <-chan net.Conn) {
for {
select {
case conn := <-incoming:
fmt.Println("Incoming:", conn)
clients = append(clients, conn)
case conn := <-closing:
fmt.Println("Closing:", conn)
ix := indexOf(conn, clients)
if ix > -1 {
clients[ix] = clients[len(clients)-1]
clients = clients[:len(clients)-1]
case n := <-messages:
fmt.Println("Publishing", n, "to", len(clients), "clients")
for _, conn := range clients {
buf := []byte(n + "\n")
func main() {
ln, err := net.Listen("tcp", ":"+port)
if err != nil {
defer ln.Close()
messages := make(chan string)
incoming := make(chan net.Conn)
closing := make(chan net.Conn)
go startPublisher(messages)
go startAcceptor(ln, incoming, closing)
clients := make([]net.Conn, 0)
loop(clients, messages, incoming, closing)
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import Network ( withSocketsDo, listenOn, PortID(..) )
import Network.Socket ( accept, close, recv, send, Socket )
import Control.Concurrent ( forkIO, threadDelay, ThreadId )
import Control.Concurrent.STM ( atomically
, orElse
, newTChan
, readTChan
, retry
, writeTChan
, TChan
import Control.Exception ( catch, SomeException )
import Control.Monad ( forM_, forever, join, void )
import System.Random ( randomIO )
import Text.Printf ( printf )
port :: Int
port = 8000
startPublisher :: TChan String -> IO ThreadId
startPublisher messages =
forkIO $ forever $ do
n <- randomIO :: IO Int
atomically (writeTChan messages (show n))
threadDelay 1000000
startAcceptor :: Socket -> TChan Socket -> TChan Socket -> IO ThreadId
startAcceptor sock incoming closing =
forkIO $ forever $ do
(conn, _) <- accept sock
atomically (writeTChan incoming conn)
forkIO $ do
waitForClose conn
atomically (writeTChan closing conn)
waitForClose conn = do
buf <- recv conn 4096 `catch` \(_ :: SomeException) -> return ""
if null buf then return () else waitForClose conn
main :: IO ()
main = withSocketsDo $ do
sock <- listenOn (PortNumber (fromIntegral port))
incoming <- atomically newTChan
closing <- atomically newTChan
messages <- atomically newTChan
startPublisher messages
startAcceptor sock incoming closing
loop [] incoming closing messages
loop clients incoming closing messages = join $ atomically $
(do conn <- readTChan incoming
return $ do
printf "Incoming: %s\n" (show conn)
loop (conn:clients) incoming closing messages)
(do conn <- readTChan closing
return $ do
printf "Closing: %s\n" (show conn)
close conn
loop ([c | c <- clients, c /= conn]) incoming closing messages)
(do message <- readTChan messages
return $ do
printf "Publishing %s to %d clients\n" message (length clients)
forM_ clients (\conn ->
void (send conn (message ++ "\n")) `catch`
\(_ :: SomeException) -> atomically (writeTChan closing conn))
loop clients incoming closing messages)
name: stm-example
synopsis: Fanout TCP to demonstrate STM
description: Fanout TCP to demonstrate STM
license: BSD3
license-file: LICENSE
author: Neuman vong
maintainer: [email protected]
copyright: 2016 Neuman Vong
category: Web
build-type: Simple
-- extra-source-files:
cabal-version: >=1.10
executable stm-example-exe
hs-source-dirs: .
main-is: Main.hs
ghc-options: -threaded -rtsopts -with-rtsopts=-N
build-depends: base
, network
, stm
, random
default-language: Haskell2010
