#Thoughts on retry in streams
##This document is a WIP
Too often, when writing akka-stream code, I find myself needing to retry some logic.
A very common scenario is when I use connection pools, e.g. `mySource.via(Http().superPool())`. In this case, what we get from `superPool` is a flow of type `Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]`. There is little we can do with this flow as it stands, let alone retry based on some smart decision.
So why not have a generic retry combinator?
Since akka-http connection pools yield flows that look like `Flow[(A, S), (Try[B], S), _]`, I decided to base the API on this kind of flow.
Assume the existence of this `FanOutShape2` stage, which would be easy to implement:
class PartitionWith[In,Out0,Out1](p: In => Either[Out0,Out1]) extends GraphStage[FanOutShape2[In,Out0,Out1]]
It would work much like a method I've written in my personal utils to operate on collections.
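As a rough illustration of the collection-level idea (not the actual util method, which isn't shown here), a sketch might look like this. The name `partitionWith` and its signature are assumptions made for the example; Scala 2.13's built-in `partitionMap` does much the same job.

```scala
// Hypothetical collection-level analogue of PartitionWith:
// route each element to the left or right result according to p.
def partitionWith[A, L, R](xs: Iterable[A])(p: A => Either[L, R]): (List[L], List[R]) = {
  val lefts  = List.newBuilder[L]
  val rights = List.newBuilder[R]
  xs.foreach { a =>
    p(a) match {
      case Left(l)  => lefts += l
      case Right(r) => rights += r
    }
  }
  (lefts.result(), rights.result())
}

// Odd numbers go left (as strings), even numbers go right.
val (odds, evens) =
  partitionWith[Int, String, Int](List(1, 2, 3, 4))(i => if (i % 2 == 0) Right(i) else Left(i.toString))
```

The stream stage would do the same routing, only element by element and with two outlets instead of two lists.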
With this stage and two other trivial stages, we can come up with a fairly reasonable API:
def retry[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Option[(I,S)]): Flow[(I,S),Try[O],M]
which could be implemented like:
The custom increment and decrement stages are needed for graceful stream completion. The increment stage intercepts upstream completion and only completes itself if there are no retry leftovers cycling in the graph. If there are such leftovers, it waits until enough decrement counter messages are received.
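The completion rule described above boils down to a small piece of bookkeeping, which can be modelled without any akka-stream machinery. The sketch below is only that model, not the stages themselves; `CycleState` and its method names are made up for illustration, and it assumes retried elements re-enter the cycle after the increment stage, so only fresh upstream elements are counted.

```scala
// Hypothetical model of what the increment stage has to track.
final case class CycleState(inFlight: Int = 0, upstreamFinished: Boolean = false) {
  // The increment stage passed a fresh upstream element into the cycle.
  def elementEntered: CycleState = copy(inFlight = inFlight + 1)
  // The decrement stage reported that an element left the cycle for good.
  def elementLeft: CycleState = copy(inFlight = inFlight - 1)
  // Upstream completed; completion is intercepted rather than propagated right away.
  def upstreamDone: CycleState = copy(upstreamFinished = true)
  // Completing is only safe once upstream is done and nothing is still cycling.
  def mayComplete: Boolean = upstreamFinished && inFlight == 0
}

// Two elements entered, then upstream finished while both were still cycling.
val pending = CycleState().elementEntered.elementEntered.upstreamDone
val drained = pending.elementLeft.elementLeft
```

Here `pending.mayComplete` is false and `drained.mayComplete` is true, which is the "wait for enough decrement messages" behaviour in miniature.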
The `PartitionWith` stage is created using `retryWith` as follows:
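{
  // a success leaves the cycle through the second outlet
  case (t @ Success(_), _) => Right(t)
  // on failure, retryWith decides: Some(next) loops back, None gives up and emits the failure
  case (t, s) => retryWith(s).fold[Either[(I, S), Try[O]]](Right(t))(Left(_))
}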
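To see the decision function in isolation, here is a self-contained sketch with the types spelled out. The helper name `decide` and the sample `retryWhileLeft` policy (the state is the request plus a retries-left counter) are assumptions for the example only.

```scala
import scala.util.{Failure, Success, Try}

// Builds the function handed to PartitionWith:
// Left  = (input, state) to feed back into the flow,
// Right = final result to emit downstream.
def decide[I, O, S](retryWith: S => Option[(I, S)]): ((Try[O], S)) => Either[(I, S), Try[O]] = {
  case (t @ Success(_), _) => Right(t)
  case (t, s)              => retryWith(s).fold[Either[(I, S), Try[O]]](Right(t))(Left(_))
}

// Hypothetical policy: the state carries the request and the retries left.
val retryWhileLeft: ((String, Int)) => Option[(String, (String, Int))] = {
  case (req, left) if left > 0 => Some((req, (req, left - 1)))
  case _                       => None
}

val d = decide[String, Int, (String, Int)](retryWhileLeft)
val boom: Try[Int] = Failure(new RuntimeException("boom"))

val onSuccess = d((Success(1), ("GET /a", 2))) // Right: emitted as is
val onFailure = d((boom, ("GET /a", 2)))       // Left: retried with one retry less
val onGiveUp  = d((boom, ("GET /a", 0)))       // Right: the failure itself is emitted
```

The explicit type argument on `fold` is there because the result type would otherwise be inferred from `Right(t)` alone.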
The generic type `S` should be some state object from which the user can extract another `(I, S)` (an input and state tuple), similar to how you might use `unfold`.
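The `unfold` analogy can be made concrete with the standard library. `Iterator.unfold` (Scala 2.13+) takes a function of the same shape, `S => Option[(A, S)]`; the sketch below enumerates the inputs that a retry-up-to-three-times policy would produce. The `RetryState` class and the request string are invented for the example.

```scala
// Hypothetical state: the request to resend and how many retries remain.
final case class RetryState(request: String, retriesLeft: Int)

// Same shape as retryWith: S => Option[(I, S)].
val retryWith: RetryState => Option[(String, RetryState)] = s =>
  if (s.retriesLeft > 0) Some((s.request, s.copy(retriesLeft = s.retriesLeft - 1)))
  else None

// Unfolding the state lists every retry input until retryWith answers None.
val retries: List[String] = Iterator.unfold(RetryState("GET /a", 3))(retryWith).toList
```

The difference in the stream setting is that the next step is only requested when the previous attempt actually failed.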
##Bonus
Also, we can consider a "family" of retries, such as:
def retryConcat[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Option[Seq[(I,S)]]): Flow[(I,S),Try[O],M]
//or (empty sequence means to not retry again):
def retryConcat[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Seq[(I,S)]): Flow[(I,S),Try[O],M]
which might be handy if the input elements `I` can be broken down into smaller pieces that may succeed where the uber object failed. E.g. an `HttpRequest` for some web service bulk API, with a heavy body, may be broken down into multiple requests using the regular API.
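As an illustration of the `Seq`-returning variant, a `retryWith` for a bulk request could split the failed batch in half and give up once a single item fails on its own. The types here (a batch as `Vector[String]`, used as both input and state) are assumptions chosen to keep the sketch small.

```scala
type Batch = Vector[String]

// Shape of the second retryConcat variant: S => Seq[(I, S)], empty means "do not retry".
def splitFailedBatch(failed: Batch): Seq[(Batch, Batch)] =
  if (failed.size <= 1) Seq.empty // a single item cannot be broken down further
  else {
    val (first, second) = failed.splitAt(failed.size / 2)
    Seq((first, first), (second, second)) // each half is both the new input and its state
  }

val halves = splitFailedBatch(Vector("a", "b", "c"))
val giveUp = splitFailedBatch(Vector("a"))
```

A real version would build smaller `HttpRequest`s from each half; the halving strategy itself is just one possible choice.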
def retryAsync[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Future[Option[(I,S)]]): Flow[(I,S),Try[O],M]
which is pretty obvious. If the base version is analogous to `unfold`, this may be the `unfoldAsync` equivalent.
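An asynchronous `retryWith` might, for example, need to fetch something before deciding. The sketch below only shows a function of the required shape, `S => Future[Option[(I, S)]]`; `Attempt` and `fetchFreshToken` are hypothetical stand-ins, not part of any real API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical state: the path to request and the retries left.
final case class Attempt(path: String, retriesLeft: Int)

// Stand-in for some asynchronous lookup (e.g. refreshing credentials).
def fetchFreshToken(): Future[String] = Future("token-2")

// Shape required by retryAsync: S => Future[Option[(I, S)]].
def retryWithAsync(s: Attempt): Future[Option[(String, Attempt)]] =
  if (s.retriesLeft <= 0) Future.successful(None)
  else fetchFreshToken().map { token =>
    Some((s"${s.path}?token=$token", s.copy(retriesLeft = s.retriesLeft - 1)))
  }

// Blocking here is only to inspect the results in the example.
val next    = Await.result(retryWithAsync(Attempt("/bulk", 1)), 5.seconds)
val stopped = Await.result(retryWithAsync(Attempt("/bulk", 0)), 5.seconds)
```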
Feedback and C&C are very welcome.
That's interesting. Do you have an implementation for "Custom Increment" and "Custom Decrement" stages?