Skip to content

Instantly share code, notes, and snippets.

@minwoox
Created November 4, 2019 10:13
Show Gist options
  • Save minwoox/baded19e989364c61cce25d04d71c11b to your computer and use it in GitHub Desktop.
Save minwoox/baded19e989364c61cce25d04d71c11b to your computer and use it in GitHub Desktop.
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.netty.channel.EventLoopGroup;
class FutureTest {
@Test
void futureCallback() throws InterruptedException {
System.err.println("Thread name: " + Thread.currentThread().getName());
final CompletableFuture<String> future = new CompletableFuture<>();
final Thread thread = new Thread(() -> {
future.complete("hello");
});
thread.setName("MyThread");
thread.start();
final EventLoopGroup eventLoopGroup = EventLoopGroups.newEventLoopGroup(1);
future.handleAsync((unused1, unused2) -> {
System.err.println("In handle. Thread name: " + Thread.currentThread().getName());
return null;
});
}
@Test
void myAggregatedHttpResponse() {
final HttpResponseWriter res = HttpResponse.streaming();
res.write(ResponseHeaders.of(200));
res.write(HttpData.ofUtf8("hello"));
res.close();
final CompletableFuture<MyAggregatedHttpResponse> future = aggregate(res);
final MyAggregatedHttpResponse join = future.join();
System.err.println(join.headers());
}
private CompletableFuture<MyAggregatedHttpResponse> aggregate(HttpResponse res) {
final CompletableFuture<MyAggregatedHttpResponse> result = new CompletableFuture<>();
res.subscribe(new Subscriber<HttpObject>() {
private ResponseHeaders responseHeaders;
private HttpData httpData;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(1);
}
@Override
public void onNext(HttpObject httpObject) {
if (httpObject instanceof ResponseHeaders) {
responseHeaders = (ResponseHeaders) httpObject;
} else {
httpData = (HttpData) httpObject;
}
subscription.request(1);
}
@Override
public void onError(Throwable t) {
result.completeExceptionally(t);
}
@Override
public void onComplete() {
result.complete(new MyAggregatedHttpResponse(responseHeaders, httpData));
}
});
return result;
}
static class MyAggregatedHttpResponse {
private final ResponseHeaders responseHeaders;
private final HttpData httpData;
MyAggregatedHttpResponse(ResponseHeaders responseHeaders, HttpData httpData) {
this.responseHeaders = responseHeaders;
this.httpData = httpData;
}
public ResponseHeaders headers() {
return responseHeaders;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment