Skip to content

Instantly share code, notes, and snippets.

@rblaze
Created November 8, 2012 08:33
Show Gist options
  • Save rblaze/4037573 to your computer and use it in GitHub Desktop.
Save rblaze/4037573 to your computer and use it in GitHub Desktop.
module Main where
import Control.Concurrent
import qualified Control.Exception as E
import Control.Monad
import Data.Word
import System.IO

import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as BS
import Data.HashSet
import Network
import Text.ProtocolBuffers

import KwHen
import NKiwiHen.TResponse
import YBus
-- | Bookkeeping shared between the sending thread and the reply reader.
data YState = YState
  { inFlight :: TVar Word
    -- ^ Counter incremented by 'addKey' and decremented by 'delKey'.
  , pendingIDs :: TVar (HashSet YBusKey)
    -- ^ Keys inserted by 'addKey' and removed by 'delKey'.
  , finished :: TVar Bool
    -- ^ Set to 'True' by 'sender' once it has no more messages to send.
  }
-- | Register a message key as outstanding: increments the 'inFlight'
-- counter and inserts the key into 'pendingIDs'. Both writes belong to
-- whatever transaction the caller runs this in.
addKey :: YBusKey -> YState -> STM ()
addKey key state =
  modifyTVar' (inFlight state) (+ 1)
    >> modifyTVar' (pendingIDs state) (insert key)
-- | Mark a message key as answered: decrements the 'inFlight' counter and
-- removes the key from 'pendingIDs'.
--
-- Calls 'error' with @"zero inflight"@ when the counter is already zero,
-- so the unsigned counter never wraps around.
--
-- NOTE: the key is deleted without checking that it was actually pending;
-- the counter is decremented either way.
delKey :: YBusKey -> YState -> STM ()
delKey key state = do
  outstanding <- readTVar counter
  if outstanding == 0
    then error "zero inflight"
    else do
      modifyTVar' counter (subtract 1)
      modifyTVar' (pendingIDs state) (delete key)
  where
    counter = inFlight state
-- | Send the given number of read requests over the handle, registering
-- each returned key with 'addKey'. When the count reaches zero the
-- 'finished' flag is set and the loop ends.
--
-- The handle is flushed only after the key has been registered.
sender :: Int -> Handle -> YState -> IO ()
sender remaining h state
  | remaining == 0 = atomically $ writeTVar (finished state) True
  | otherwise = do
      -- Disabled throttle, kept for reference: it would block the sender
      -- while more than 1000 messages are in flight.
      -- atomically $ do
      --   s <- readTVar (inFlight state)
      --   when (s > 1000) retry
      sentid <- sendYBusMessage h (messagePut request)
      atomically $ addKey sentid state
      hFlush h
      sender (remaining - 1) h state
  where
    request = makeReadRequest 50 "http://www.lenta.ru"
-- | Read replies from the handle until the sender has finished and no
-- message is in flight any more.
--
-- Each iteration takes one atomic snapshot of 'finished' and 'inFlight'.
-- Once the sender is done, the remaining in-flight count is printed before
-- every read. The decoded reply is only bound for the (currently disabled)
-- debug print, so laziness means it is never actually parsed.
readReplies :: Handle -> YState -> IO ()
readReplies h state = do
  (done, outstanding) <- atomically $
    liftM2 (,) (readTVar (finished state)) (readTVar (inFlight state))
  unless (done && outstanding == 0) $ do
    when done $ print outstanding
    (rcvid, payload) <- readYBusMessage h
    atomically $ delKey rcvid state
    let decoded = messageGet payload :: Either String (TResponse, BS.ByteString)
        msg = either id (show . fst) decoded
    -- print msg
    readReplies h state
-- | Entry point: connect to the server, fork a thread that sends 100000
-- requests and consume the replies on the main thread.
--
-- The connection is wrapped in 'E.bracket' so the handle is closed both
-- when 'readReplies' returns normally and when it throws; previously the
-- handle was never closed.
main :: IO ()
main = withSocketsDo $ do
  state <- atomically $ liftM3 YState (newTVar 0) (newTVar empty) (newTVar False)
  E.bracket (connectTo "duck" (PortNumber 10000)) hClose $ \h -> do
    _ <- forkIO (sender 100000 h state)
    readReplies h state
-- | Write one framed message to the handle and return the key it was
-- sent under.
--
-- The frame is an encoded 'YBusHeader' followed by the request bytes. The
-- header carries a random 64-bit id, the total length (header plus
-- request), the current epoch time multiplied by 1000 (presumably
-- milliseconds -- confirm against the protocol), a zero field and the
-- constant @0xb4@. The handle is not flushed here.
sendYBusMessage :: Handle -> BS.ByteString -> IO YBusKey
sendYBusMessage h req = do
  yid <- randomIO :: IO Word64
  CTime ytime <- epochTime
  let msglen = fromIntegral (yBusHeaderSize + BS.length req)
      hdr = YBusHeader yid msglen (fromIntegral ytime * 1000) 0 0xb4
  BS.hPut h (encode hdr `BS.append` req)
  return yid
-- | Read one framed message from the handle: a fixed-size header followed
-- by the body whose length is derived from the header's size field.
--
-- Returns the key stored in the header together with the body bytes.
--
-- Raises an 'IOError' (via 'userError') when the stream ends before a full
-- header or body has arrived, or when the header announces a total size
-- smaller than the header itself. The body length is computed in 'Int'
-- rather than in the wire field's own type, so a bogus size shows up as a
-- negative number instead of wrapping into a huge read request (which
-- would happen if the field is an unsigned type).
readYBusMessage :: Handle -> IO (YBusKey, BS.ByteString)
readYBusMessage h = do
  hStr <- BS.hGet h headerLen
  -- hGet returns fewer bytes than requested only at end of file.
  when (BS.length hStr < fromIntegral headerLen) $
    ioError (userError "readYBusMessage: connection closed before a full header arrived")
  let header = decode hStr :: YBusHeader
      dataSize = fromIntegral (yBusSize header) - headerLen
  when (dataSize < 0) $
    ioError (userError "readYBusMessage: header size field is smaller than the header itself")
  message <- BS.hGet h dataSize
  when (BS.length message < fromIntegral dataSize) $
    ioError (userError "readYBusMessage: connection closed in the middle of a message body")
  return (yBusKey header, message)
  where
    headerLen = yBusHeaderSize :: Int
@rblaze
Copy link
Author

rblaze commented Nov 8, 2012

-- | Write one framed message to the handle and return the key it was
-- sent under.
--
-- The frame is an encoded 'YBusHeader' followed by the request bytes. The
-- header carries a random 64-bit id, the total length (header plus
-- request), the current epoch time multiplied by 1000 (presumably
-- milliseconds -- confirm against the protocol), a zero field and the
-- constant @0xb4@. The handle is not flushed here.
sendYBusMessage :: Handle -> BS.ByteString -> IO YBusKey
sendYBusMessage h req = do
  yid <- randomIO :: IO Word64
  CTime ytime <- epochTime
  let msglen = fromIntegral (yBusHeaderSize + BS.length req)
      hdr = YBusHeader yid msglen (fromIntegral ytime * 1000) 0 0xb4
  BS.hPut h (encode hdr `BS.append` req)
  return yid

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment