Created
November 8, 2012 08:33
-
-
Save rblaze/4037573 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Main where | |
import KwHen | |
import YBus | |
import Network | |
import Data.Word | |
import Control.Concurrent | |
import Control.Concurrent.STM | |
import Control.Monad | |
import Data.HashSet | |
import System.IO | |
import Text.ProtocolBuffers | |
import NKiwiHen.TResponse | |
import qualified Data.ByteString.Lazy as BS | |
data YState = YState { | |
inFlight :: TVar Word, | |
pendingIDs :: TVar (HashSet YBusKey), | |
finished :: TVar Bool | |
} | |
addKey :: YBusKey -> YState -> STM () | |
addKey key state = do | |
modifyTVar' (inFlight state) $ \s -> s + 1 | |
modifyTVar' (pendingIDs state) $ insert key | |
delKey :: YBusKey -> YState -> STM () | |
delKey key state = do | |
inflight <- readTVar (inFlight state) | |
when (inflight == 0) $ error "zero inflight" | |
modifyTVar' (inFlight state) $ \s -> s - 1 | |
modifyTVar' (pendingIDs state) $ delete key | |
sender :: Int -> Handle -> YState -> IO () | |
sender 0 _ state = atomically $ writeTVar (finished state) True | |
sender n h state = do | |
-- atomically $ do | |
-- s <- readTVar (inFlight state) | |
-- when (s > 1000) retry | |
let r = makeReadRequest 50 "http://www.lenta.ru" | |
sentid <- sendYBusMessage h (messagePut r) | |
atomically $ addKey sentid state | |
hFlush h | |
sender (n - 1) h state | |
readReplies :: Handle -> YState -> IO () | |
readReplies h state = do | |
(fin, inflight) <- atomically $ do | |
fin <- readTVar $ finished state | |
inflight <- readTVar $ inFlight state | |
return (fin, inflight) | |
unless (fin && (inflight == 0)) $ do | |
when fin $ print inflight | |
(rcvid, dStr) <- readYBusMessage h | |
atomically $ delKey rcvid state | |
let reply = messageGet dStr :: (Either String (TResponse, BS.ByteString)) | |
let msg = either id | |
(\(d, _) -> show d) | |
reply | |
-- print msg | |
readReplies h state | |
main :: IO() | |
main = withSocketsDo $ do | |
state <- atomically $ liftM3 YState (newTVar 0) (newTVar empty) (newTVar False) | |
h <- connectTo "duck" (PortNumber 10000) | |
_ <- forkIO (sender 100000 h state) | |
readReplies h state |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sendYBusMessage :: Handle -> BS.ByteString -> IO YBusKey | |
sendYBusMessage h req = do | |
let msglen = fromIntegral $ yBusHeaderSize + BS.length req | |
yid <- randomIO :: IO Word64 | |
(CTime ytime) <- epochTime | |
let hdr = YBusHeader yid msglen (fromIntegral ytime * 1000) 0 0xb4 | |
let stream = BS.append (encode hdr) req | |
BS.hPut h stream | |
return yid | |
readYBusMessage :: Handle -> IO (YBusKey, BS.ByteString) | |
readYBusMessage h = do | |
hStr <- BS.hGet h yBusHeaderSize | |
let header = decode hStr :: YBusHeader | |
let dataSize = fromIntegral (yBusSize header - yBusHeaderSize) | |
message <- BS.hGet h dataSize | |
return (yBusKey header, message) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
sendYBusMessage :: Handle -> BS.ByteString -> IO YBusKey
sendYBusMessage h req = do
let msglen = fromIntegral $ yBusHeaderSize + BS.length req
yid <- randomIO :: IO Word64
(CTime ytime) <- epochTime
let hdr = YBusHeader yid msglen (fromIntegral ytime * 1000) 0 0xb4
let stream = BS.append (encode hdr) req
BS.hPut h stream
return yid