Skip to content

Instantly share code, notes, and snippets.

@mike-neck
Last active July 12, 2022 12:10
Show Gist options
  • Save mike-neck/85cf67e940a70f2fcc0249308adac87e to your computer and use it in GitHub Desktop.
Save mike-neck/85cf67e940a70f2fcc0249308adac87e to your computer and use it in GitHub Desktop.
ヴァーチャルスレッド

VirtualThread のコードについてメモ

  • JVM がスケジューリングする仮想スレッド VirtualThread が Java 19 にてプレビューリリースされます
  • ここでは、 VirtualThread でどのようにタスクは中断・再開されるのかを焦点にコードを読んだまとめです
  • OpenJDK のタグ jdk-19+25(すでに結構古い)の Java のコード、 C++ のコードを追いかけていきます
  • C++ 読んだことがない(文法もわからない)、 JVM の基本的な構造(frame など)を理解してないのでよくわからない箇所がいくつもあります

java.lang.VirtualThread

/**
 * A thread that is scheduled by the Java virtual machine rather than the operating
 * system.
 */
final class VirtualThread extends Thread {
}
  • OS でなく、 JVM によってスケジュールされる Thread
  • 利用するための API 使用例は以下の通り
Thread thread = Thread.ofVirtual()
        .name("my-virtual-thread")
        .allowSetThreadLocals(true)
        .inheritInheritableThreadLocals(true)
        .unstarted(() -> {
            for(int i = 0; i < 10; i++) {
                System.out.println("hello world");
            }
        });
thread.start();
thread.join();

従来の OS スレッド を使ったアプリケーションの問題点

リクエストをスレッドに割り当てる同期アプリケーションはプログラムが読みやすい等のメリットがある一方で、 一定時間に到着するリクエストの増大に対応するにはスレッドをより多く作る必要があります。 従来の JDK のスレッドは生成コストの高い OS スレッドのラッパーであり、 OS で利用できるスレッドの数には制限があります。 そのため、アプリケーションの同時処理能力の性能向上は、スレッドの数で頭打ちになることが多く、 その他のリソース(CPU/ネットワーク接続)が有効活用されていません。

非同期フレームワークによる解決

そこで非同期フレームワーク(Netty/reactive-streams)を用いてアプリケーションを組み立てることで、 スレッド数の頭打ちを解消できます。 しかし、これらはコールバック関数(ラムダ式)の組み合わせによってアプリケーションを構築するため、 逐次的なロジックの組み合わせを前提としてる Java では複雑なコードになります。

仮想スレッドによる解決

一つの OS スレッドに、複数の(OS スレッドに紐付かない)仮想のスレッドを担わせることで スレッドが無数にあるように見せられます。 これによりプログラミングのスタイルを変更することなく、 スレッド数の上限の問題を克服します。


参考

VirtualThread まわりのクラス群

java.lang.VirtualThread

  • java.lang.Thread を継承した軽量スレッド
  • 生成コストが低いため、容易にいくらでも生成できる
  • プールせずに使い、目的の処理が終わったら破棄する

java.util.concurrent.ForkJoinPool

  • VirtualThread の実行を OS のスレッドに割り当てるスケジューラー
  • VirtualThread#start()ForkJoinPoolVirtualThread#runContinuation() を実行するタスクを登録する
  • タスクを実行するプールされるスレッド(OS に紐づく)のクラスは jdk.internal.misc.CarrierThread
    • java.lang.Thread に追加されたフィールドに carrierThread がある
    • みかけのスレッドが VirtualThread で、実際の OS スレッドが CarrierThread

jdk.internal.vm.Continuation

  • 限定継続をあらわす(?)クラス
  • VirtualThread に渡されたタスクの実行開始・中断・再開を扱う
  • 仮想スレッド内で仮想スレッドを起動したケースにも対応している

jdk.internal.vm.StackChunk

  • スレッドの sp 、フレームのサイズ、フレームの引数のサイズを持つクラス(?)

java.util.concurrent.locks.LockSupport

  • スレッドの park/unpark を行う

sun.nio.ch.Poller

  • 各 poller にファイルディスクリプタを登録するインターフェース(抽象クラス)
    • Linux : EPollPoller
    • Windows : WEPollPoller - Windows では wepoll を使ってる!?
    • Mac : KQueuePoller
  • ネットワーク起因でブロックしているスレッドを持っている

仮想スレッド実行までの流れ

シナリオのコード

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import static java.net.http.HttpResponse.BodyHandlers.discarding;

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .GET().uri(URI.create("https://example.com")).build();

Thread thread = Thread.ofVirtual()
  .name("example-virtual-thread")
  .unstarted(() -> {
    HttpResponse<Void> response = client.send(request, discarding());

    System.out.println(response.statusCode());
  });
thread.start();
thread.join();

main スレッドの動作 1

  1. VirtualThread はインスタンス生成時に下記のフィールドが割り当てられる
    1. スケジューラー - スタティックフィールドの ForkJoinPool を割り当てる
    2. Continuation - 正確には VirtualThread$VThreadContinuation/コンストラクターでは VirtualThread#run(Runnable)
    3. runContinuation - runContinuation() メソッドを呼び出す Runnable
    4. stateNEW
  2. VirtualThreadstart() メソッドを呼び出すと、 stateSTARTED に更新され、 submitRunContinuation() メソッドが呼び出される
  3. submitRunContinuation() メソッドでは、スケジューラー(ForkJoinPool)に runContinuation がタスク登録される
  4. ForkJoinPool にタスクが登録されると、そのタスクはワークキューに詰まれて、ワークキューに CarrierThread が生成・実行開始する
  5. VirtualThread#join() を呼び出すと、VirtualThreadtermination フィールド(CountDownLatch) を用いてタスクの終了を待つ

ワーカースレッド(CarrierThread) の動作 1

  1. 実行開始した CarrierThreadVirtualThread#runContinuation を呼び出す
    1. なお、 runContinuation を呼び出したスレッドが VirtualThread である場合は java.lang.WrongThreadException が発生する
  2. runContinuation では以下の処理が行われる
    1. stateRUNNING に更新
    2. Continuation#run を呼び出す
  3. Continuation のスタティック・ネイティブ・メソッド enterSpecial(Continuation, false, true) を呼び出す
    1. このメソッドは @IntrinsicCandidate なメソッドで、 HotSpot VM が手による温かみのあるアセンブラに置き換える
    2. 詳しくは JEP 348 : Compiler Intrinsics for Java SE APIs https://openjdk.org/jeps/348 (日本語 https://kagamihoge.hatenablog.com/entry/2019/03/07/185647)
    3. enterSpecial の呼び出し先は、 JVM の初期化時(?)に gen_continuation_enter 関数により生成される
  4. gen_continuation_enter(Continuation#enterSpecial) では、 以下の処理を行い、 Continuation#enter を実行する
    1. 次に呼び出すメソッドのアドレス解決(Continuation#enter())
    2. ContinuationEntry のメモリ確保・準備
    3. Continuation#enter() の呼び出し
  5. Continuation#enter は、 タスク(VirtualThread#run(Runnable))を呼び出す
  6. VirtualThread#run(Runnable) では、 currentCarrierThread(今実行しているワーカースレッド = OS スレッド)に対して setCurrentThread(this) を呼び出して、現在のスレッドを CarrierThread から VirtualThread に変更する
    1. このことをマウントと読んでいる

仮想スレッド(VirtualThread) の動作 1

  1. タスクを起動する(Runnable#run())。これで仮想スレッド上で、 Runnable の実行が開始される。
  2. HttpClient#send(HttpRequest, BodyHandler) は内部的には sendAsync(HttpRequest, BodyHandler) を実行して得た Future<Void>(正確には CompletableFuture) から Future#get を実行してレスポンスを取り出す
  3. CompletableFuture#get は最終的に LockSupport#park(Object) を呼び出す
  4. LockSupport#park(Object) は最終的に現在実行中の仮想スレッドに対して VirtualThread#doPark() を呼び出す
  5. VirtualThread#doPark() は以下の通り
    1. statePARKING に更新する
    2. carrierThread(= 今実行している仮想スレッドの OS スレッド)に対して、 setCurrentThread(carrierThread) を呼び出し、現在のスレッドを VirtualThread から CarrierThread に戻す(アンマウント)
    3. スタティックメソッド Continuation#yield(ContinuationScope) から、最終的に Continuation#doYield() を呼び出す

ワーカースレッド(CarrierThread) の動作 2

  1. Continuation#doYield()@IntrinsicCandidate なネイティブメソッドで次の通り
    1. TemplateInterpreterGenerator::generate_Continuation_doYield_entryStubGenerator::generate_cont_doYield() により生成される
    2. 実体の関数はたぶん Config::freeze 、 パラメーターは JavaThread* とスタックポインタ
  2. freeze_internal の主要な動作を担うのが Freeze オブジェクトの Freeze::freeze_slow() 関数
    1. Freeze::freeze_start_frame() にて、呼び出し元のスタックポインタ、リンクアドレス、プログラムカウンタを取り出す???(実行中のコードの位置)
    2. stackChunkOop(jdk.internal.vm.StackChunk) を生成、上記 i. で取得した実行中のコードの位置、および現在の Continuation を保存する
    3. 上記 ii. の StackChunk を現在の Continuation.tail に保存する
    4. この処理を失敗した場合は doYield() の呼び出し元に戻る/成功した場合は Continuation#enterSpecial(Continuation, false, true) の呼び出し元(Continuation#run)に戻る(??????)
  3. VirtualThread#runContinuation() に戻り、 afterYield() を呼び出す
  4. VirtualThread#afterYield()statePARKING から PARKED に更新して、 runContinuation を終了
  5. スケジューラーに登録された別のタスク(仮想スレッド)を開始する

ネットワークスレッドの動作

  1. 仮想スレッドの動作2. にて返した CompletableFuture に対して、 complete(value) を呼び出し、 Future#get の値を設定する
  2. LockSupport#unpark(VirtualThread) を呼び出す。これは最終的に VirtualThread#unpark() を呼び出す
  3. VirtualThread#unpark では以下の通り
    1. statePARKED から RUNNABLE に更新する
    2. submitRunContinuation 経由で、スケジューラー(ForkJoinPool)に runContinuation をタスク登録する

ワーカースレッド(CarrierThread) の動作 3

  1. ワーカースレッドの動作 1. と同じ(stateRUNNABLE から RUNNING)
  2. Continuation のネイティブメソッド enterSpecial(Continuation, true, true) を呼び出す
    1. 第 2 パラメーターが true になっている
    2. これは、この Continuation.tailStackChunk が積まれているため
  3. gen_continuation_enter(Continuation#enterSpecial) では以下の処理を行う
    1. ContinuationEntry のメモリ確保・準備
    2. cont_thaw 関数の呼び出し(generate_cont_thaw 関数で生成される)
  4. cont_thaw 関数では ContinuationEntryjdk.internal.vm.Continuation からフレームの状態を復元、スタックを調整する(???)
    1. cont_thaw 関数の戻り先が doYield() 呼び出し地点になる(???!!!???!!!?!?!?!?!)
  5. Continuation.yield(ContinuationScope) から VirtualThread#yieldContinuation() に戻り、再び currentThreadCarrierThread から VirtualThread に戻る(マウント)

仮想スレッド(VirtualThread) の動作 2

  1. CompletableFuture#get に戻り、 ネットワークスレッドの動作 1. で設定したオブジェクトを返す
  2. VirtualThread のタスクが終了し、VirtualThread#run(Runnable) に戻る
    1. currentThreadcarrierThread を設定する(アンマウント)
    2. stateRUNNING から TERMINATED に更新する

ワーカースレッド(CarrierThread) の動作 4

  1. VirtualThread#run(Runnable) -> Continuation#enter0() -> Continuation#enter(Continuation, boolean) と戻り、 Continuation.donetrue が設定される
  2. Continuation#enterSpecial(ネイティブ)に戻り、 クリーンアップして Continuation#run() に戻り、 Continuation.tailnull に設定する
  3. VirtualThread#runContinuation() に戻り VirtualThread#afterTerminate() へ進み、 VirtualThread.termination をカウントダウンする
  4. CarrierThread のタスクが終了し、別のタスク(仮想スレッド)を開始する

main スレッドの動作 2

  1. VirtualThread.temination に対する CountDownLatch#await() が完了し、 VirtualThread#join() が完了
  2. main プログラム終了
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment