- JVM がスケジューリングする仮想スレッド
VirtualThread
が Java 19 にてプレビューリリースされます - ここでは、
VirtualThread
でどのようにタスクは中断・再開されるのかを焦点にコードを読んだまとめです - OpenJDK のタグ jdk-19+25(すでに結構古い)の Java のコード、 C++ のコードを追いかけていきます
- C++ 読んだことがない(文法もわからない)、 JVM の基本的な構造(
frame
など)を理解してないのでよくわからない箇所がいくつもあります
/**
* A thread that is scheduled by the Java virtual machine rather than the operating
* system.
*/
final class VirtualThread extends Thread {
}
- OS でなく、 JVM によってスケジュールされる
Thread
- 利用するための API 使用例は以下の通り
Thread thread = Thread.ofVirtual()
.name("my-virtual-thread")
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(true)
.unstarted(() -> {
for(int i = 0; i < 10; i++) {
System.out.println("hello world");
}
});
thread.start();
thread.join();
リクエストをスレッドに割り当てる同期アプリケーションはプログラムが読みやすい等のメリットがある一方で、 一定時間に到着するリクエストの増大に対応するにはスレッドをより多く作る必要があります。 従来の JDK のスレッドは生成コストの高い OS スレッドのラッパーであり、 OS で利用できるスレッドの数には制限があります。 そのため、アプリケーションの同時処理能力の性能向上は、スレッドの数で頭打ちになることが多く、 その他のリソース(CPU/ネットワーク接続)が有効活用されていません。
そこで非同期フレームワーク(Netty/reactive-streams)を用いてアプリケーションを組み立てることで、 スレッド数の頭打ちを解消できます。 しかし、これらはコールバック関数(ラムダ式)の組み合わせによってアプリケーションを構築するため、 逐次的なロジックの組み合わせを前提としてる Java では複雑なコードになります。
一つの OS スレッドに、複数の(OS スレッドに紐付かない)仮想のスレッドを担わせることで スレッドが無数にあるように見せられます。 これによりプログラミングのスタイルを変更することなく、 スレッド数の上限の問題を克服します。
- JEP 425: Virtual Threads - https://openjdk.org/jeps/425
- 上記の日本語訳 - https://b.chiroito.dev/entry/virtualthreads1
java.lang.Thread
を継承した軽量スレッド- 生成コストが低いため、容易にいくらでも生成できる
- プールせずに使い、目的の処理が終わったら破棄する
VirtualThread
の実行を OS のスレッドに割り当てるスケジューラーVirtualThread#start()
はForkJoinPool
にVirtualThread#runContinuation()
を実行するタスクを登録する- タスクを実行するプールされるスレッド(OS に紐づく)のクラスは
jdk.internal.misc.CarrierThread
java.lang.Thread
に追加されたフィールドにcarrierThread
がある- みかけのスレッドが
VirtualThread
で、実際の OS スレッドがCarrierThread
- 限定継続をあらわす(?)クラス
VirtualThread
に渡されたタスクの実行開始・中断・再開を扱う- 仮想スレッド内で仮想スレッドを起動したケースにも対応している
- スレッドの sp 、フレームのサイズ、フレームの引数のサイズを持つクラス(?)
- スレッドの
park
/unpark
を行う
- 各 poller にファイルディスクリプタを登録するインターフェース(抽象クラス)
- Linux :
EPollPoller
- Windows :
WEPollPoller
- Windows ではwepoll
を使ってる!? - Mac :
KQueuePoller
- Linux :
- ネットワーク起因でブロックしているスレッドを持っている
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import static java.net.http.HttpResponse.BodyHandlers.discarding;
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.GET().uri(URI.create("https://example.com")).build();
Thread thread = Thread.ofVirtual()
.name("example-virtual-thread")
.unstarted(() -> {
HttpResponse<Void> response = client.send(request, discarding());
System.out.println(response.statusCode());
});
thread.start();
thread.join();
VirtualThread
はインスタンス生成時に下記のフィールドが割り当てられる- スケジューラー - スタティックフィールドの
ForkJoinPool
を割り当てる Continuation
- 正確にはVirtualThread$VThreadContinuation
/コンストラクターではVirtualThread#run(Runnable)
runContinuation
-runContinuation()
メソッドを呼び出すRunnable
state
はNEW
- スケジューラー - スタティックフィールドの
VirtualThread
のstart()
メソッドを呼び出すと、state
がSTARTED
に更新され、submitRunContinuation()
メソッドが呼び出されるsubmitRunContinuation()
メソッドでは、スケジューラー(ForkJoinPool
)にrunContinuation
がタスク登録されるForkJoinPool
にタスクが登録されると、そのタスクはワークキューに詰まれて、ワークキューにCarrierThread
が生成・実行開始するVirtualThread#join()
を呼び出すと、VirtualThread
のtermination
フィールド(CountDownLatch
) を用いてタスクの終了を待つ
- 実行開始した
CarrierThread
はVirtualThread#runContinuation
を呼び出す- なお、
runContinuation
を呼び出したスレッドがVirtualThread
である場合はjava.lang.WrongThreadException
が発生する
- なお、
runContinuation
では以下の処理が行われるstate
をRUNNING
に更新Continuation#run
を呼び出す
Continuation
のスタティック・ネイティブ・メソッドenterSpecial(Continuation, false, true)
を呼び出す- このメソッドは
@IntrinsicCandidate
なメソッドで、 HotSpot VM が手による温かみのあるアセンブラに置き換える - 詳しくは JEP 348 : Compiler Intrinsics for Java SE APIs https://openjdk.org/jeps/348 (日本語 https://kagamihoge.hatenablog.com/entry/2019/03/07/185647)
enterSpecial
の呼び出し先は、 JVM の初期化時(?)にgen_continuation_enter
関数により生成される
- このメソッドは
gen_continuation_enter
(Continuation#enterSpecial
) では、 以下の処理を行い、Continuation#enter
を実行する- 次に呼び出すメソッドのアドレス解決(
Continuation#enter()
) ContinuationEntry
のメモリ確保・準備Continuation#enter()
の呼び出し
- 次に呼び出すメソッドのアドレス解決(
Continuation#enter
は、 タスク(VirtualThread#run(Runnable)
)を呼び出すVirtualThread#run(Runnable)
では、currentCarrierThread
(今実行しているワーカースレッド = OS スレッド)に対してsetCurrentThread(this)
を呼び出して、現在のスレッドをCarrierThread
からVirtualThread
に変更する- このことをマウントと読んでいる
- タスクを起動する(
Runnable#run()
)。これで仮想スレッド上で、Runnable
の実行が開始される。 HttpClient#send(HttpRequest, BodyHandler)
は内部的にはsendAsync(HttpRequest, BodyHandler)
を実行して得たFuture<Void>
(正確にはCompletableFuture
) からFuture#get
を実行してレスポンスを取り出すCompletableFuture#get
は最終的にLockSupport#park(Object)
を呼び出すLockSupport#park(Object)
は最終的に現在実行中の仮想スレッドに対してVirtualThread#doPark()
を呼び出すVirtualThread#doPark()
は以下の通りstate
をPARKING
に更新するcarrierThread
(= 今実行している仮想スレッドの OS スレッド)に対して、setCurrentThread(carrierThread)
を呼び出し、現在のスレッドをVirtualThread
からCarrierThread
に戻す(アンマウント)- スタティックメソッド
Continuation#yield(ContinuationScope)
から、最終的にContinuation#doYield()
を呼び出す
Continuation#doYield()
は@IntrinsicCandidate
なネイティブメソッドで次の通りTemplateInterpreterGenerator::generate_Continuation_doYield_entry
とStubGenerator::generate_cont_doYield()
により生成される- 実体の関数はたぶん
Config::freeze
、 パラメーターはJavaThread*
とスタックポインタ
freeze_internal
の主要な動作を担うのがFreeze
オブジェクトのFreeze::freeze_slow()
関数Freeze::freeze_start_frame()
にて、呼び出し元のスタックポインタ、リンクアドレス、プログラムカウンタを取り出す???(実行中のコードの位置)stackChunkOop
(jdk.internal.vm.StackChunk
) を生成、上記 i. で取得した実行中のコードの位置、および現在のContinuation
を保存する- 上記 ii. の
StackChunk
を現在のContinuation.tail
に保存する - この処理を失敗した場合は
doYield()
の呼び出し元に戻る/成功した場合はContinuation#enterSpecial(Continuation, false, true)
の呼び出し元(Continuation#run
)に戻る(??????)
VirtualThread#runContinuation()
に戻り、afterYield()
を呼び出すVirtualThread#afterYield()
でstate
をPARKING
からPARKED
に更新して、runContinuation
を終了- スケジューラーに登録された別のタスク(仮想スレッド)を開始する
- 仮想スレッドの動作2. にて返した
CompletableFuture
に対して、complete(value)
を呼び出し、Future#get
の値を設定する LockSupport#unpark(VirtualThread)
を呼び出す。これは最終的にVirtualThread#unpark()
を呼び出すVirtualThread#unpark
では以下の通りstate
をPARKED
からRUNNABLE
に更新するsubmitRunContinuation
経由で、スケジューラー(ForkJoinPool
)にrunContinuation
をタスク登録する
- ワーカースレッドの動作 1. と同じ(
state
はRUNNABLE
からRUNNING
) Continuation
のネイティブメソッドenterSpecial(Continuation, true, true)
を呼び出す- 第 2 パラメーターが
true
になっている - これは、この
Continuation.tail
にStackChunk
が積まれているため
- 第 2 パラメーターが
gen_continuation_enter
(Continuation#enterSpecial
) では以下の処理を行うContinuationEntry
のメモリ確保・準備cont_thaw
関数の呼び出し(generate_cont_thaw
関数で生成される)
cont_thaw
関数ではContinuationEntry
とjdk.internal.vm.Continuation
からフレームの状態を復元、スタックを調整する(???)cont_thaw
関数の戻り先がdoYield()
呼び出し地点になる(???!!!???!!!?!?!?!?!)
Continuation.yield(ContinuationScope)
からVirtualThread#yieldContinuation()
に戻り、再びcurrentThread
がCarrierThread
からVirtualThread
に戻る(マウント)
CompletableFuture#get
に戻り、 ネットワークスレッドの動作 1. で設定したオブジェクトを返すVirtualThread
のタスクが終了し、VirtualThread#run(Runnable)
に戻るcurrentThread
にcarrierThread
を設定する(アンマウント)state
をRUNNING
からTERMINATED
に更新する
VirtualThread#run(Runnable)
->Continuation#enter0()
->Continuation#enter(Continuation, boolean)
と戻り、Continuation.done
にtrue
が設定されるContinuation#enterSpecial
(ネイティブ)に戻り、 クリーンアップしてContinuation#run()
に戻り、Continuation.tail
をnull
に設定するVirtualThread#runContinuation()
に戻りVirtualThread#afterTerminate()
へ進み、VirtualThread.termination
をカウントダウンするCarrierThread
のタスクが終了し、別のタスク(仮想スレッド)を開始する
VirtualThread.temination
に対するCountDownLatch#await()
が完了し、VirtualThread#join()
が完了- main プログラム終了