Created
December 1, 2020 16:03
-
-
Save jamesthompson/8c2cc62df78fa08e03d9828b84354cf4 to your computer and use it in GitHub Desktop.
Example pub/sub grpc interface with fused-effects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE AllowAmbiguousTypes #-} | |
{-# LANGUAGE DataKinds #-} | |
{-# LANGUAGE DerivingStrategies #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
{-# LANGUAGE FlexibleInstances #-} | |
{-# LANGUAGE GADTs #-} | |
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
{-# LANGUAGE InstanceSigs #-} | |
{-# LANGUAGE KindSignatures #-} | |
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE MultiParamTypeClasses #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE RankNTypes #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
{-# LANGUAGE TypeApplications #-} | |
{-# LANGUAGE TypeFamilies #-} | |
{-# LANGUAGE TypeOperators #-} | |
{-# LANGUAGE UndecidableInstances #-} | |
module Effects.PubSub where | |
import qualified Control.Concurrent.Chan.Unagi.Bounded as U | |
import Data.ProtoLens.Message (defMessage) | |
import qualified Data.Text as T | |
import Effects.Crypto.GoogleCredentials | |
import Effects.Network.HTTP (Manager) | |
import Prelude | |
import Control.Lens | |
import Network.GRPC.Client | |
( CompressMode (Compressed), | |
IncomingEvent (..), | |
OutgoingEvent (..), | |
) | |
import Network.GRPC.Client.Helpers | |
import Network.GRPC.HTTP2.ProtoLens | |
import Network.GRPC.HTTP2.Types | |
( GRPCStatusCode (..), | |
grpcMessageH, | |
grpcStatusH, | |
statusCodeForTrailer, | |
) | |
import Network.HTTP2 | |
( ErrorCodeId (..), | |
toErrorCodeId, | |
) | |
import Network.HTTP2.Client | |
( TooMuchConcurrency (..), | |
runClientIO, | |
) | |
import qualified Proto.Google.Pubsub.V1.Pubsub as P | |
import qualified Proto.Google.Pubsub.V1.Pubsub_Fields as PF | |
-- Domain newtypes; 'declareWrapped' derives a lens @Wrapped@ instance for
-- each one so the underlying value is reachable through @_Wrapped'@.
declareWrapped
  [d|
    -- Acknowledgement id, sent back to the server in ack / deadline requests.
    newtype AckId = AckId Text
      deriving stock (Eq, Show)

    -- Client id placed in the opening streaming-pull request.
    newtype ClientId = ClientId Text
      deriving stock (Eq, Show)

    -- Id of a published message, as returned in a publish response.
    newtype MessageId = MessageId Text
      deriving stock (Eq, Show)

    -- Ordering key stamped onto every message of a publish call.
    newtype OrderingKey = OrderingKey Text
      deriving stock (Eq, Show)

    -- A number of seconds (used for ack deadlines).
    newtype Seconds = Seconds Int32
      deriving stock (Eq, Show)

    -- Capacity of the bounded channels backing a streaming subscription.
    newtype StreamBufferCapacity = StreamBufferCapacity Int
      deriving stock (Eq, Show)

    -- Subscription name, written to the request's @subscription@ field.
    newtype Subscription = Subscription Text
      deriving stock (Eq, Show)

    -- Topic name, written to the request's @topic@ field.
    newtype Topic = Topic Text
      deriving stock (Eq, Show)
    |]
-- | PubSub error states
--
-- The ways a unary Pub/Sub call can fail, from the gRPC layer down to the
-- underlying client.
data PubSubError
  = -- | The call completed with a gRPC error; carries the status code and
    -- the accompanying message bytes.
    GrpcError (GRPCStatusCode, ByteString)
  | -- | The HTTP/2 transport reported an error code.
    TransportError ErrorCodeId
  | -- | The client refused the call because of too much concurrency.
    TooFastSlowDown
  | -- | Running the client action itself failed.
    FatalClientErrorStreamEnded
  deriving stock (Eq, Show)
-- | Unary (request/response) Pub/Sub operations. Every operation takes the
-- raw protobuf request and yields either a 'PubSubError' or its result.
data PubSub (m :: Type -> Type) (k :: Type) where
  -- | Publish messages; yields the ids of the published messages.
  Publish :: P.PublishRequest -> PubSub m (Either PubSubError [MessageId])
  -- | Acknowledge previously received messages.
  Ack :: P.AcknowledgeRequest -> PubSub m (Either PubSubError ())
  -- | Pull a batch of messages with a single unary call.
  UnaryPull :: P.PullRequest -> PubSub m (Either PubSubError [P.ReceivedMessage])
-- | Streaming Pub/Sub operations, indexed at the type level by the name of
-- the subscription they act on. The 'Proxy' argument selects that
-- subscription at the call site.
data StreamingPubSub (subscription :: Symbol) (m :: Type -> Type) (k :: Type) where
  -- | Acknowledge the given messages on the stream.
  StreamAck :: Proxy subscription -> [AckId] -> StreamingPubSub subscription m ()
  -- | Set a new ack deadline for each listed message.
  BumpAckDeadlines :: Proxy subscription -> [(AckId, Seconds)] -> StreamingPubSub subscription m ()
  -- | Take the next received message from the stream.
  StreamPull :: Proxy subscription -> StreamingPubSub subscription m P.ReceivedMessage
-- | Publish a batch of messages to a topic.
--
-- Each input pair is a message payload together with its attributes. When an
-- ordering key is supplied it is set on every message of the batch.
publish ::
  Has PubSub sig m =>
  Topic ->
  Maybe OrderingKey ->
  [(ByteString, Map Text Text)] ->
  m (Either PubSubError [MessageId])
publish topic ordKey msgs = send (Publish request)
  where
    request =
      defMessage
        & PF.topic .~ (topic ^. _Wrapped')
        & PF.messages .~ maybe payloads stamp ordKey
    -- Apply the ordering key to every outgoing message.
    stamp (OrderingKey okey) = payloads <&> PF.orderingKey .~ okey
    -- One protobuf message per (payload, attributes) pair.
    payloads =
      [ defMessage
          & PF.data' .~ body
          & PF.attributes .~ attrs
      | (body, attrs) <- msgs
      ]
-- | Acknowledge the given ack ids on a subscription with a unary call.
ack ::
  Has PubSub sig m =>
  Subscription ->
  [AckId] ->
  m (Either PubSubError ())
ack sub ackIds = send (Ack request)
  where
    request =
      defMessage
        & PF.subscription .~ view _Wrapped' sub
        & PF.ackIds .~ fmap (^. _Wrapped') ackIds
-- | Pull messages from a subscription with a single unary call.
--
-- The second argument is written to the request's @maxMessages@ field.
unaryPull ::
  Has PubSub sig m =>
  Subscription ->
  Int32 ->
  m (Either PubSubError [P.ReceivedMessage])
unaryPull sub maxMessages = send (UnaryPull request)
  where
    request =
      defMessage
        & PF.subscription .~ view _Wrapped' sub
        & PF.maxMessages .~ maxMessages
-- | Acknowledge messages on the streaming subscription selected by the proxy.
streamAck ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  [AckId] ->
  m ()
streamAck p = send . StreamAck p
-- | Request a new ack deadline for each listed message on the streaming
-- subscription selected by the proxy.
bumpAckDeadlines ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  [(AckId, Seconds)] ->
  m ()
bumpAckDeadlines p = send . BumpAckDeadlines p
-- | Take the next message from the streaming subscription selected by the
-- proxy.
streamPull ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  m P.ReceivedMessage
streamPull p = send (StreamPull p)
-- | `http2-grpc-haskell` IO implementations
--
-- Carrier for the unary 'PubSub' effect: a reader over the 'GrpcClient' that
-- the calls are issued on.
newtype PubSubIOC m a = PubSubIOC
  { runPubSubIOC :: ReaderC GrpcClient m a
  }
  deriving newtype (Functor, Applicative, Monad, MonadIO)
-- | Interprets 'PubSub' by issuing unary gRPC calls on the 'GrpcClient' held
-- in the reader environment. Every failure mode of the call is folded into a
-- 'PubSubError' rather than thrown.
instance
  (MonadIO m, Algebra sig m) =>
  Algebra (PubSub :+: sig) (PubSubIOC m)
  where
  alg hdl sig ctx = PubSubIOC $ case sig of
    L (Publish req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . fmap (fmap MessageId . view PF.messageIds) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Publisher "publish")
          grpcClient
          req
    L (UnaryPull req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . fmap (view PF.receivedMessages) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Subscriber "pull")
          grpcClient
          req
    L (Ack req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . (() <$) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Subscriber "acknowledge")
          grpcClient
          req
    R other ->
      alg (runPubSubIOC . hdl) (R other) ctx
    where
      -- Collapse the nested result of a unary call into
      -- @Either PubSubError response@. From the outside in the layers are:
      -- client failure, concurrency refusal, transport error, and finally
      -- the (headers, trailers, decoded body) triple.
      handlePubSubResp resp = case resp of
        Right (Right (Right (_, _, Right resp'))) -> pure resp'
        -- Trailers are searched before the initial headers. The headers are
        -- included because a response without trailers may carry its status
        -- there.
        Right (Right (Right (hdrs, trailers, Left _))) ->
          Left (GrpcError (grpcFailure (fromMaybe [] trailers <> hdrs)))
        Right (Right (Left transportError)) -> Left $ TransportError (toErrorCodeId transportError)
        Right (Left (TooMuchConcurrency _)) -> Left TooFastSlowDown
        Left _ -> Left FatalClientErrorStreamEnded
      -- Status code and message are defaulted independently, so a response
      -- that has a status but no message still reports its real status code.
      grpcFailure hdrs =
        ( fromMaybe INTERNAL $
            find ((==) grpcStatusH . fst) hdrs >>= statusCodeForTrailer . snd,
          fromMaybe "No grpc error message given" $
            snd <$> find ((==) grpcMessageH . fst) hdrs
        )
-- | Carrier for the 'StreamingPubSub' effect: a reader over a pair of
-- bounded channel ends. The first is read for received messages, the second
-- is written with outgoing streaming-pull requests.
newtype StreamingPubSubIOC (subscription :: Symbol) m a = StreamingPubSubIOC
  { runStreamingPubSubIOC ::
      ReaderC
        (U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
        m
        a
  }
  deriving newtype (Functor, Applicative, Monad, MonadIO)
-- | Interprets 'StreamingPubSub' against the channel pair held in the reader
-- environment: acks and deadline changes are written to the outgoing request
-- channel, and pulls read from the received-message channel.
--
-- Follow-up requests deliberately leave the @subscription@ field unset. The
-- Pub/Sub API only allows it on the first request of a stream, and that
-- opening request is written by whoever sets the stream up, not here.
instance
  (MonadIO m, Algebra sig m, KnownSymbol subscription) =>
  Algebra (StreamingPubSub subscription :+: sig) (StreamingPubSubIOC subscription m)
  where
  alg hdl sig ctx = StreamingPubSubIOC $ case sig of
    L (StreamAck _ ids) -> do
      (_, chan) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      fmap (<$ ctx) . liftIO $
        U.writeChan chan $
          defMessage & PF.ackIds .~ (view _Wrapped' <$> ids)
    L (BumpAckDeadlines _ ids) -> do
      (_, chan) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      -- Both fields are built from the same list, so each ack id lines up
      -- with its deadline by position.
      fmap (<$ ctx) . liftIO $
        U.writeChan chan $
          defMessage
            & PF.modifyDeadlineSeconds .~ (view (_2 . _Wrapped') <$> ids)
            & PF.modifyDeadlineAckIds .~ (view (_1 . _Wrapped') <$> ids)
    L (StreamPull _) -> do
      (chan, _) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      fmap (<$ ctx) . liftIO $ U.readChan chan
    R other ->
      alg (runStreamingPubSubIOC . hdl) (R other) ctx
-- | Run a pubsub computation given a standard TLS 'Manager'
--
-- Initialises the credentials store, sets up a TLS gRPC client for
-- @pubsub.googleapis.com:443@ with the OAuth token header, and runs the
-- computation against that client. Panics if either the credentials or the
-- client cannot be set up.
runPubSub ::
  MonadIO m =>
  -- | GRPC client TLS Manager
  Manager ->
  -- | PubSub computation
  PubSubIOC m a ->
  m a
runPubSub mgr p =
  liftIO (runClientIO (initCredsStore mgr)) >>= \case
    Left failure ->
      panic ("Unable to find default google application credentials: " <> show failure)
    Right store -> do
      let config =
            (grpcClientConfigSimple "pubsub.googleapis.com" 443 UseTls)
              { _grpcClientConfigHeaders = pure <$> getOAuthTokenHeader mgr store
              }
      liftIO (runClientIO (setupGrpcClient config)) >>= \case
        Left failure ->
          panic ("Unable to boot Pub/Sub client: " <> show failure)
        Right grpcClient ->
          runReader grpcClient (runPubSubIOC p)
-- | Run a streaming pubsub subscription given a standard TLS 'Manager'
--
-- Sets up credentials and a gRPC client in the same way as the unary runner,
-- then creates two bounded channels: one carrying received messages towards
-- the computation and one carrying outgoing requests towards the stream. The
-- streaming call is started with 'async' and its handle is discarded. The
-- opening request (subscription name, ack deadline, client id) is queued
-- before the computation runs. Panics if either the credentials or the
-- client cannot be set up.
runStreamingPubSub ::
  forall subscription m a.
  (KnownSymbol subscription, MonadIO m) =>
  -- | GRPC client TLS Manager
  Manager ->
  -- | Streaming ack deadline seconds
  Seconds ->
  -- | Unique client id
  ClientId ->
  -- | Channel buffering capacity
  StreamBufferCapacity ->
  -- | PubSub computation
  StreamingPubSubIOC subscription m a ->
  m a
runStreamingPubSub mgr deadline uniqueClientId bufferCapacity p =
  liftIO (runClientIO (initCredsStore mgr)) >>= \case
    Left failure ->
      panic ("Unable to find default google application credentials: " <> show failure)
    Right store -> do
      let config =
            (grpcClientConfigSimple "pubsub.googleapis.com" 443 UseTls)
              { _grpcClientConfigHeaders = pure <$> getOAuthTokenHeader mgr store
              }
      liftIO (runClientIO (setupGrpcClient config)) >>= \case
        Left failure ->
          panic ("Unable to boot Pub/Sub client: " <> show failure)
        Right grpcClient -> do
          let capacity = bufferCapacity ^. _Wrapped'
          -- Received messages: written by the stream, read by the computation.
          (receivedIn, receivedOut) <- liftIO (U.newChan capacity)
          -- Outgoing requests: written by the computation, read by the stream.
          (requestIn, requestOut) <- liftIO (U.newChan capacity)
          let -- Forward every message of a received batch; ignore other events.
              onIncoming _ = \case
                RecvMessage batch ->
                  liftIO $ traverse_ (U.writeChan receivedIn) (batch ^. PF.receivedMessages)
                _ -> pure ()
              -- Block until a request is queued, then send it compressed.
              onOutgoing _ = do
                request <- liftIO (U.readChan requestOut)
                pure ((), SendMessage Compressed request)
              openingRequest =
                defMessage
                  & PF.subscription .~ T.pack (symbolVal (Proxy @subscription))
                  & PF.streamAckDeadlineSeconds .~ (deadline ^. _Wrapped')
                  & PF.clientId .~ (uniqueClientId ^. _Wrapped')
          _ <-
            liftIO . async . runClientIO $
              rawGeneralStream
                (RPC :: RPC P.Subscriber "streamingPull")
                grpcClient
                ()
                onIncoming
                ()
                onOutgoing
          -- Kick off the subscription - TODO : how to restart a subscription if the stream dies?
          liftIO (U.writeChan requestIn openingRequest)
          -- TODO: Bump googleapis version to set flow control params for backpressure to server
          -- see: https://github.com/googleapis/googleapis/blob/b821f320473c8ec05a1c7fb9a496c958b1ab9834/google/pubsub/v1/pubsub.proto#L1096-L1117
          runReader (receivedOut, requestIn) (runStreamingPubSubIOC p)
-- * TEST unary and streaming programs | |
-- testProgram :: (MonadIO m, Has PubSub sig m) => m () | |
-- testProgram = do | |
-- liftIO $ print "Running pubsub test" | |
-- res <- | |
-- publish | |
-- (Topic "projects/james-78278/topics/jamestest") | |
-- Nothing | |
-- [ ("test message number one", _Wrapped # []), | |
-- ("test message number two", _Wrapped # []), | |
-- ("test message number three", _Wrapped # []) | |
-- ] | |
-- liftIO $ print res | |
-- type TestSubscription1 = "projects/james-78278/subscriptions/james-test-sub" | |
-- type TestSubscription2 = "projects/james-78278/subscriptions/james-test-sub234" | |
-- testSubscription1 :: Proxy TestSubscription1 | |
-- testSubscription1 = Proxy | |
-- testSubscription2 :: Proxy TestSubscription2 | |
-- testSubscription2 = Proxy | |
-- testStream :: | |
-- ( Has (StreamingPubSub TestSubscription1) sig m, | |
-- Has (StreamingPubSub TestSubscription2) sig m | |
-- ) => | |
-- m () | |
-- testStream = do | |
-- res <- streamPull testSubscription1 | |
-- res2 <- streamPull testSubscription2 | |
-- res3 <- streamPull testSubscription1 | |
-- streamAck testSubscription1 (AckId <$> [res ^. PF.ackId, res3 ^. PF.ackId]) | |
-- streamAck testSubscription2 [AckId $ res2 ^. PF.ackId] | |
-- pure () |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment