Created
July 15, 2020 22:26
-
-
Save HaloFour/e46da71f9a2876e5541b932789df6b36 to your computer and use it in GitHub Desktop.
Experiments with Project Loom
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox; | |
import java.net.URI; | |
import java.net.http.HttpClient; | |
import java.net.http.HttpRequest; | |
import java.net.http.HttpResponse; | |
import java.time.Duration; | |
import java.time.temporal.ChronoUnit; | |
import java.util.concurrent.CompletionStage; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import com.github.tomakehurst.wiremock.WireMockServer; | |
import com.github.tomakehurst.wiremock.client.WireMock; | |
import com.github.tomakehurst.wiremock.core.WireMockConfiguration; | |
import org.junit.jupiter.api.Test; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.test.StepVerifier; | |
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; | |
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static sandbox.Async.async; | |
import static sandbox.Async.await; | |
import static sandbox.Async.mono; | |
import static sandbox.Generators.createGenerator; | |
import static sandbox.Helpers.delay; | |
import static sandbox.Helpers.hot; | |
import static sandbox.Helpers.log; | |
import static sandbox.Helpers.logStats; | |
import static sandbox.Helpers.withVirtualExecutor; | |
public class LoomTest { | |
@Test | |
void testContinuation() { | |
var continuationScope = new ContinuationScope("Continuations"); | |
Continuation.yield(continuationScope); | |
Runnable task = () -> log("Hello from Continuation!"); | |
var continuation = new Continuation(continuationScope, task); | |
continuation.run(); | |
assertThat(continuation.isDone()).isTrue(); | |
} | |
@Test | |
void testContinuationYield() { | |
var continuationScope = new ContinuationScope("Continuations"); | |
var continuation = new Continuation(continuationScope, () -> { | |
log("Hello from Continuation!"); | |
Continuation.yield(continuationScope); | |
log("Hello again from Continuation!"); | |
}); | |
continuation.run(); | |
log("Now we're outside of the Continuation!"); | |
assertThat(continuation.isDone()).isFalse(); | |
continuation.run(); | |
log("Outside of the Continuation yet again!"); | |
assertThat(continuation.isDone()).isTrue(); | |
} | |
@Test | |
void testGenerators() { | |
Iterable<Integer> iterable = createGenerator(consumer -> { | |
consumer.accept(3); | |
consumer.accept(2); | |
consumer.accept(1); | |
}); | |
for (var i : iterable) { | |
log("Iterated %d.", i); | |
} | |
} | |
@Test | |
void testResumeFromAnotherThread() throws InterruptedException { | |
var continuationScope = new ContinuationScope("Continuations"); | |
var continuation = new Continuation(continuationScope, () -> { | |
log("Hello from Continuation!"); | |
Continuation.yield(continuationScope); | |
log("Hello again from Continuation!"); | |
}); | |
continuation.run(); | |
log("Now we're outside of the Continuation!"); | |
assertThat(continuation.isDone()).isFalse(); | |
Thread.builder() | |
.name("some-other-thread") | |
.task(continuation::run) | |
.start() | |
.join(); | |
assertThat(continuation.isDone()).isTrue(); | |
} | |
@Test | |
void testAsyncAwait() { | |
CompletionStage<String> future = async(() -> { | |
log("Async method started."); | |
CompletionStage<String> future1 = delay(500, TimeUnit.MILLISECONDS) | |
.thenApply(ignored -> "Hello"); | |
log("First future started."); | |
CompletionStage<String> future2 = delay(750, TimeUnit.MILLISECONDS) | |
.thenApply(ignored -> "World"); | |
log("Second future started."); | |
String value1 = await(future1); | |
log("First future completed = %s.", value1); | |
String value2 = await(future2); | |
log("Second future completed = %s.", value2); | |
return String.format("%s %s!", value1, value2); | |
}); | |
log("We're waiting on the coroutine to finish now."); | |
var result = future.toCompletableFuture().join(); | |
assertThat(result).isEqualTo("Hello World!"); | |
} | |
@Test | |
void testAsyncAwaitReactive() { | |
Mono<String> mono = mono(() -> { | |
log("Async method started."); | |
Mono<String> mono1 = Mono.delay(Duration.ofMillis(500)) | |
.map(ignored -> "Hello") | |
.as(hot()); | |
Mono<String> mono2 = Mono.delay(Duration.ofMillis(600)) | |
.map(ignored -> "World") | |
.as(hot()); | |
String value1 = await(mono1); | |
log("First publisher published: %s", value1); | |
String value2 = await(mono2); | |
log("Second publisher published: %s", value2); | |
return String.format("%s %s!", value1, value2); | |
}); | |
StepVerifier.create(mono) | |
.expectNext("Hello World!") | |
.verifyComplete(); | |
} | |
@Test | |
void testVirtualThread() throws InterruptedException { | |
var virtualThread1 = Thread.startVirtualThread(() -> log("Hello from Virtual Thread!")); | |
virtualThread1.join(); | |
var virtualThread2 = Thread.builder() | |
.virtual() | |
.name("virtual-thread") | |
.task(() -> log("Hello from another Virtual Thread!")) | |
.start(); | |
virtualThread2.join(); | |
var virtualThread3 = Thread.builder() | |
.virtual(Runnable::run) | |
.name("another-virtual-thread") | |
.task(() -> { | |
log("I'm sleepy"); | |
sleepUninterruptibly(1, TimeUnit.SECONDS); | |
log("Hello from a third Virtual Thread!"); | |
}) | |
.start(); | |
log("Third virtual thread started!"); | |
virtualThread3.join(); | |
} | |
@Test | |
void testVirtualThreadExecutor() { | |
var workerThreadFactory = Thread.builder() | |
.name("worker-thread") | |
.factory(); | |
try (var workerExecutor = Executors.newSingleThreadExecutor(workerThreadFactory)) { | |
workerExecutor.submitTask(() -> "prime").join(); | |
var virtualThreadFactory = Thread.builder() | |
.name("virtual-", 0) | |
.virtual(workerExecutor) | |
.factory(); | |
try (var executorService = Executors.newUnboundedExecutor(virtualThreadFactory)) { | |
int threads = 25_000; | |
var startLatch = new CountDownLatch(threads); | |
var endLatch = new CountDownLatch(1); | |
var memoryStats = logStats(); | |
log("Starting virtual threads."); | |
for (int i = 0; i < threads; i++) { | |
executorService.submit(() -> { | |
startLatch.countDown(); | |
awaitUninterruptibly(endLatch); | |
}); | |
} | |
awaitUninterruptibly(startLatch); | |
log("Finished starting %d virtual threads.", threads); | |
logStats(threads, memoryStats); | |
endLatch.countDown(); | |
} | |
log("Virtual threads finished."); | |
} | |
} | |
@Test | |
void testFluxBlockingIterable() { | |
var flux = Flux.interval(Duration.of(1, ChronoUnit.SECONDS)) | |
.publish() | |
.autoConnect(0); | |
withVirtualExecutor(1, virtualExecutor -> { | |
for (int i = 0; i < 5; i++) { | |
final int id = i; | |
virtualExecutor.submit(() -> { | |
for (var value : flux.take(10).toIterable()) { | |
log("%d: Received value %d", id, value); | |
} | |
}); | |
} | |
}); | |
} | |
@Test | |
void testChannelQueue() { | |
var channel = new Channel<String>(); | |
withVirtualExecutor(1, virtualExecutor -> { | |
virtualExecutor.submit(() -> { | |
for (var value : channel.receive()) { | |
log("Received \"%s\"", value); | |
sleepUninterruptibly(100, TimeUnit.MILLISECONDS); | |
} | |
log("All messages received."); | |
}); | |
virtualExecutor.submit(() -> { | |
for (int i = 0; i < 10; i++) { | |
var value = String.format("Hello %d!", i); | |
log("Sending \"%s\".", value); | |
channel.send(value); | |
} | |
log("Closing channel."); | |
channel.close(); | |
}); | |
}); | |
} | |
@Test | |
void testHttpClient() { | |
WireMockServer wireMockServer = new WireMockServer(WireMockConfiguration.options() | |
.port(8080) | |
.containerThreads(20) | |
); | |
log("Starting WireMock"); | |
wireMockServer.start(); | |
try { | |
WireMock.stubFor(WireMock.get("/hello") | |
.willReturn(WireMock.ok("Hello World!") | |
.withFixedDelay(2000))); | |
var httpClient = HttpClient.newBuilder() | |
.executor(Executors.newSingleThreadExecutor()) | |
.build(); | |
withVirtualExecutor(1, virtualExecutor -> { | |
for (int i = 0; i < 10; i++) { | |
final int id = i; | |
virtualExecutor.execute(() -> { | |
var request = HttpRequest.newBuilder(URI.create("http://localhost:8080/hello")) | |
.GET() | |
.build(); | |
log("%d: Sending request.", id); | |
try { | |
var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); | |
log("%d: Got response: %s", id, response.body()); | |
} catch (Exception exception) { | |
log("%d: Error! %s", id, exception); | |
} | |
}); | |
} | |
}); | |
} finally { | |
wireMockServer.stop(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment