Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active August 29, 2015 14:07
Show Gist options
  • Save ktoso/7ad72660f966e3133256 to your computer and use it in GitHub Desktop.
Save ktoso/7ad72660f966e3133256 to your computer and use it in GitHub Desktop.
/*
trait MaterializedMap {
def materializedDrain[Res](key: DrainWithKey[_, Res]): Res
}
*/
/**
 * Java-side usage sketch: passes a {@code RealFold<String, String>} to
 * {@code MaterializedMap.materializedDrain}. The value returned by that call
 * is discarded.
 */
class X {
    // Instance initializer block: runs as part of constructing every X.
    {
        final MaterializedMap drains = new MaterializedMap();
        final RealFold<String, String> stringFold = new RealFold<>();
        drains.materializedDrain(stringFold);
    }
}
/**
 * Java implementation of the "real" {@code DrainWithKey}, using the variant in
 * which the materialized type is a type parameter rather than a type member.
 * Here that parameter is fixed to {@code Future<U>}.
 *
 * All methods are placeholders: each returns {@code null}, except
 * {@link #isActive()}, which returns {@code false}.
 */
class RealFold<In, U> implements DrainWithKey<In, Future<U>> {

    /** Placeholder implementation; always returns {@code null}. */
    @Override
    public Future<U> runWith(final TapWithKey<In> tap, final FlowMaterializer materializer) {
        return null;
    }

    /** Placeholder implementation; always returns {@code null}. */
    @Override
    public Future<U> attach(
            final Publisher<In> flowPublisher,
            final ActorBasedFlowMaterializer materializer,
            final String flowName) {
        return null;
    }

    /** Placeholder implementation; always returns {@code null}. */
    @Override
    public Tuple2<Subscriber<In>, Future<U>> create(
            final ActorBasedFlowMaterializer materializer,
            final String flowName) {
        return null;
    }

    /** Always reports this drain as not active. */
    @Override
    public boolean isActive() {
        return false;
    }
}
trait DrainWithKey[-In, +MaterializedType] extends Drain[In, MaterializedType] {
// type MaterializedType
/**
 * Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given
 * [[FlowMaterializer]] is completely optional, especially if this drain belongs to
 * a different Reactive Streams implementation. It is the responsibility of the
 * caller to provide a suitable FlowMaterializer that can be used for running
 * Flows if necessary.
 *
 * Note: `In` is declared contravariant on the enclosing trait; the
 * `@uncheckedVariance` annotation suppresses the compiler's variance check for
 * its use inside `Publisher[...]`.
 *
 * @param flowPublisher the Publisher to consume elements from
 * @param materializer a FlowMaterializer that may be used for creating flows
 * @param flowName the name of the current flow, which should be used in log statements or error messages
 * @return a value of this drain's `MaterializedType`
 */
def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType
/**
 * Only used for Drains that return true from [[#isActive]]; such Drains must
 * override this method.
 *
 * The default implementation here always throws, and the message names the
 * concrete class that failed to override it.
 *
 * @return a pair of a `Subscriber[In]` and a `MaterializedType` value (when overridden)
 * @throws UnsupportedOperationException always, in this default implementation
 */
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, MaterializedType) = {
  val complaint = s"forgot to implement create() for $getClass that says isActive==true"
  throw new UnsupportedOperationException(complaint)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment