Created
November 14, 2010 01:19
-
-
Save vito/675807 to your computer and use it in GitHub Desktop.
atomo zeromq wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DeriveDataTypeable, QuasiQuotes, RankNTypes, StandaloneDeriving #-} | |
import Data.Typeable | |
import qualified Data.Text.Encoding as T | |
import qualified System.ZMQ as Z | |
import Atomo | |
import Atomo.Valuable | |
-- | Install the ZeroMQ bindings into the Atomo environment.
--
-- Defines the @Context@ and @Socket@ objects and the methods on them that
-- forward to the corresponding "System.ZMQ" functions. Methods that have no
-- meaningful result answer the particle @ok@.
load :: VM ()
load = do
    eval [$e|super Context = Object clone|]

    -- Build a context by handing the given integer to 'Z.init'.
    [$p|Context new: (size: Integer)|] =: do
        here "size"
            >>= liftM Atomo.fromInteger . findInteger
            >>= liftIO . Z.init . fromIntegral
            >>= toValue

    eval [$e|Socket = Object clone|]

    [$p|(s: Socket) show|] =::: [$e|"<socket " .. s type show .. ">"|]

    [$p|(c: Context) terminate|] =: do
        ctx <- here "c" >>= fromValue
        liftIO (Z.term ctx)
        return (particle "ok")

    -- Open a socket whose kind is chosen by the @direction@ particle, and
    -- record that particle on the new object as its @type@ slot (which is
    -- what 'withSocket' later reads to recover the concrete socket type).
    --
    -- NOTE(review): a particle outside the nine listed below hits a
    -- non-exhaustive pattern match.
    [$p|(c: Context) socket: (direction: Particle)|] =: do
        ctx <- here "c" >>= fromValue
        d <- here "direction" >>= liftM fromParticle . findParticle

        sock <-
            case d of
                PMSingle "push" -> liftIO (Z.socket ctx Z.Push) >>= toValue
                PMSingle "pull" -> liftIO (Z.socket ctx Z.Pull) >>= toValue
                -- Spelled XRep / XReq to agree with 'withSocket' and the
                -- Typeable instances elsewhere in this file.
                PMSingle "xrep" -> liftIO (Z.socket ctx Z.XRep) >>= toValue
                PMSingle "xreq" -> liftIO (Z.socket ctx Z.XReq) >>= toValue
                PMSingle "rep" -> liftIO (Z.socket ctx Z.Rep) >>= toValue
                PMSingle "req" -> liftIO (Z.socket ctx Z.Req) >>= toValue
                PMSingle "sub" -> liftIO (Z.socket ctx Z.Sub) >>= toValue
                PMSingle "pub" -> liftIO (Z.socket ctx Z.Pub) >>= toValue
                PMSingle "pair" -> liftIO (Z.socket ctx Z.Pair) >>= toValue

        defineOn sock (Slot (psingle "type" PThis) (Particle d))
        return sock

    [$p|(s: Socket) close|] =: do
        here "s" >>= withSocket Z.close
        return (particle "ok")

    -- Options travel as keyword particles; see the Valuable instance for
    -- Z.SocketOption for the recognised names.
    [$p|(s: Socket) set-option: (option: Particle)|] =: do
        opt <- here "option" >>= findParticle >>= fromValue
        here "s" >>= withSocket (flip Z.setOption opt)
        return (particle "ok")

    [$p|(s: Socket) get-option: (option: Particle)|] =: do
        opt <- here "option" >>= findParticle >>= fromValue
        here "s" >>= withSocket (flip Z.getOption opt)
            >>= toValue

    -- subscribe:/unsubscribe: go through 'withSubSocket', which extracts the
    -- underlying value as a Sub socket.
    [$p|(s: Socket) subscribe: (sub: String)|] =: do
        sub <- getString [$e|sub|]
        here "s" >>= withSubSocket (flip Z.subscribe sub)
        return (particle "ok")

    [$p|(s: Socket) unsubscribe: (sub: String)|] =: do
        sub <- getString [$e|sub|]
        here "s" >>= withSubSocket (flip Z.unsubscribe sub)
        return (particle "ok")

    [$p|(s: Socket) bind: (addr: String)|] =: do
        addr <- getString [$e|addr|]
        here "s" >>= withSocket (flip Z.bind addr)
        return (particle "ok")

    [$p|(s: Socket) connect: (addr: String)|] =: do
        addr <- getString [$e|addr|]
        here "s" >>= withSocket (flip Z.connect addr)
        return (particle "ok")

    -- Payloads are UTF-8 encoded on the way out and decoded on the way in.
    -- The @!@ variants pass the 'Z.NoBlock' flag; the plain ones pass none.
    [$p|(s: Socket) send: (data: String)|] =: do
        d <- getText [$e|data|]
        here "s" >>= withSocket (\s -> Z.send s (T.encodeUtf8 d) [])
        return (particle "ok")

    [$p|(s: Socket) send!: (data: String)|] =: do
        d <- getText [$e|data|]
        here "s" >>= withSocket (\s -> Z.send s (T.encodeUtf8 d) [Z.NoBlock])
        return (particle "ok")

    [$p|(s: Socket) receive|] =: do
        here "s" >>= liftM (String . T.decodeUtf8) . withSocket (flip Z.receive [])

    [$p|(s: Socket) receive!|] =: do
        here "s" >>= liftM (String . T.decodeUtf8) . withSocket (flip Z.receive [Z.NoBlock])

    -- Answers the result of 'Z.moreToReceive' as a Boolean.
    [$p|(s: Socket) ready?!|] =: do
        here "s" >>= liftM Boolean . withSocket Z.moreToReceive
-- | Run an IO action against the subscriber socket wrapped by an Atomo
-- @Socket@ object.
--
-- The object's @socket@ slot is dispatched, the result is unwrapped with
-- 'fromHaskell' as a @Z.Socket Z.Sub@, and the action is lifted into the VM.
-- No check of the object's @type@ slot is made here.
withSubSocket :: (Z.Socket Z.Sub -> IO b) -> Value -> VM b
withSubSocket f sock = do
    wrapped <- dispatch (single "socket" sock)
    subSocket <- fromHaskell "Socket Sub" wrapped
    liftIO (f subSocket)
-- | Run an IO action against the ZeroMQ socket wrapped by an Atomo @Socket@
-- object, whatever its concrete socket type.
--
-- The object's @type@ particle selects the type at which the wrapped value
-- from the @socket@ slot is unwrapped via 'fromHaskell'; the action must
-- therefore work for every socket type, hence the rank-2 argument.
--
-- NOTE(review): a @type@ particle other than the nine handled below falls
-- through to a non-exhaustive pattern match.
withSocket :: (forall a . Z.Socket a -> IO b) -> Value -> VM b
withSocket f sock = do
    kind <- liftM fromParticle (dispatch (single "type" sock) >>= findParticle)
    raw <- dispatch (single "socket" sock)

    -- Each branch pins the unwrapped value to one concrete socket type before
    -- instantiating the polymorphic action at that type.
    case kind of
        PMSingle "push" ->
            (fromHaskell "Socket Push" raw :: VM (Z.Socket Z.Push)) >>= liftIO . f
        PMSingle "pull" ->
            (fromHaskell "Socket Pull" raw :: VM (Z.Socket Z.Pull)) >>= liftIO . f
        PMSingle "xrep" ->
            (fromHaskell "Socket Xrep" raw :: VM (Z.Socket Z.XRep)) >>= liftIO . f
        PMSingle "xreq" ->
            (fromHaskell "Socket Xreq" raw :: VM (Z.Socket Z.XReq)) >>= liftIO . f
        PMSingle "rep" ->
            (fromHaskell "Socket Rep" raw :: VM (Z.Socket Z.Rep)) >>= liftIO . f
        PMSingle "req" ->
            (fromHaskell "Socket Req" raw :: VM (Z.Socket Z.Req)) >>= liftIO . f
        PMSingle "sub" ->
            (fromHaskell "Socket Sub" raw :: VM (Z.Socket Z.Sub)) >>= liftIO . f
        PMSingle "pub" ->
            (fromHaskell "Socket Pub" raw :: VM (Z.Socket Z.Pub)) >>= liftIO . f
        PMSingle "pair" ->
            (fromHaskell "Socket Pair" raw :: VM (Z.Socket Z.Pair)) >>= liftIO . f
-- Standalone-derived Typeable instances for the ZeroMQ types used in this
-- file: the Valuable instance for sockets is constrained by @Typeable a@, and
-- 'withSocket' unwraps values at each of the socket types listed here.

-- The socket type constructor and the context.
deriving instance Typeable1 Z.Socket
deriving instance Typeable Z.Context

-- The socket-type tags.
deriving instance Typeable Z.Pair
deriving instance Typeable Z.Pub
deriving instance Typeable Z.Sub
deriving instance Typeable Z.Req
deriving instance Typeable Z.Rep
deriving instance Typeable Z.XReq
deriving instance Typeable Z.XRep
deriving instance Typeable Z.Pull
deriving instance Typeable Z.Push
-- | A ZeroMQ context is exposed to Atomo as a clone of @Context@ whose
-- @context@ slot holds the wrapped Haskell value.
instance Valuable Z.Context where
    -- Clone @Context@, bind the clone to the Atomo name @c@, then define its
    -- @context@ slot to answer the wrapped Haskell value.
    toValue x = do
        obj <- eval [$e|Context clone|]
        [$p|c|] =:: obj
        [$p|c context|] =:: haskell x
        return obj

    -- Read the @context@ slot back and unwrap it.
    fromValue v = do
        wrapped <- dispatch (single "context" v)
        fromHaskell "Context" wrapped
-- | A ZeroMQ socket of any type is exposed to Atomo as a clone of @Socket@
-- whose @socket@ slot holds the wrapped Haskell value.
instance Typeable a => Valuable (Z.Socket a) where
    -- Clone @Socket@, bind the clone to the Atomo name @s@, then define its
    -- @socket@ slot to answer the wrapped Haskell value.
    toValue x = do
        obj <- eval [$e|Socket clone|]
        [$p|s|] =:: obj
        [$p|s socket|] =:: haskell x
        return obj

    -- Read the @socket@ slot back and unwrap it at the requested socket type.
    fromValue v = do
        wrapped <- dispatch (single "socket" v)
        fromHaskell "Socket a" wrapped
-- | Socket options are exchanged with Atomo as single-keyword particles such
-- as @high-watermark@ or @identity@, carrying the option's value as the
-- keyword argument.
instance Valuable Z.SocketOption where
    -- Option -> keyword particle holding the option's current value.
    toValue option = return $
        case option of
            Z.HighWM n -> keyed "high-watermark" (integer n)
            Z.Swap n -> keyed "swap" (integer n)
            Z.Affinity n -> keyed "affinity" (integer n)
            Z.Identity name -> keyed "identity" (string name)
            Z.Rate n -> keyed "rate" (integer n)
            Z.RecoveryIVL n -> keyed "recovery-interval" (integer n)
            Z.McastLoop flag -> keyed "multicast-loopback" (Boolean (flag == 1))
            Z.SendBuf n -> keyed "send-buffer" (integer n)
            Z.ReceiveBuf n -> keyed "receive-buffer" (integer n)
      where
        keyed name value = keyParticleN [name] [value]

        integer :: Integral a => a -> Value
        integer = Integer . fromIntegral

    -- Keyword particle -> option. When the argument slot is missing or holds
    -- a value of a different kind, the option is built with a zero / empty
    -- payload (this is how a bare particle can name an option for
    -- @get-option:@). Particles with an unrecognised keyword are not handled.
    fromValue p = return $
        case fromParticle p of
            PMKeyword ["high-watermark"] args -> Z.HighWM (number args)
            PMKeyword ["swap"] args -> Z.Swap (number args)
            PMKeyword ["affinity"] args -> Z.Affinity (number args)
            PMKeyword ["identity"] args -> Z.Identity (text args)
            PMKeyword ["rate"] args -> Z.Rate (number args)
            PMKeyword ["recovery-interval"] args -> Z.RecoveryIVL (number args)
            PMKeyword ["multicast-loopback"] args -> Z.McastLoop (switch args)
            PMKeyword ["send-buffer"] args -> Z.SendBuf (number args)
            PMKeyword ["receive-buffer"] args -> Z.ReceiveBuf (number args)
      where
        -- Integer argument, or 0 when absent.
        number [_, Just (Integer i)] = fromIntegral i
        number _ = 0

        -- String argument, or "" when absent.
        text [_, Just (String t)] = fromText t
        text _ = ""

        -- Boolean argument as 1/0, or 0 when absent.
        switch [_, Just (Boolean b)] = if b then 1 else 0
        switch _ = 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment