本文章は,Impala のメモ の TODO のうち,BackEnd の特にクエリの実行部を読み,文章化したものである.
BackEnd(BE) は単体のデーモンとして動作し,FE 側でパースした結果(Job ID/Plan)に基づいて処理を実行する.FE と BE のやりとりには Thrift が用いられている.
|FE| - パース結果(Job ID/実行Planなど) thrift -> |BE|
実行プランは,Tree として表現される.Tree の構成要素を Node と呼ぶ.Node は Expr インタフェースを実装する必要がある. Tree は,FE で構築されて,Thrift 経由でシリアライズされ,BE 側の ExecNode::CreateTree() を実行することで再構築できる.
Node の種類には,実際に処理をおこなう ExecNode とデータの読み込み・スキャンを行う ScanNode がある.Node 一覧は以下の通り.
-
HDFS_SCAN_NODE : HDFS からデータ読み込みを行う.RCFile/Text/SequentialFile などに対応.
-
HBASE_SCAN_NODE : HDFS からデータ読み込みを行う.
-
HASH_JOIN_NODE : Hash Join を行う.
-
AGGREGATION_NODE : MIN/MAX/SUM/COUNT などの集約処理を行う.
-
SORT_NODE : 入力された RowBatch に対して ソートをおこなう.未実装.
-
EXCHANGE_NODE : Receiver node for data streams. This simply feeds row batches received from the ata stream into the execution tree(exchange-node.h より引用).
-
MERGE_NODE : Node that merges the results of its children by materializing their evaluated expressions into row batches. The MergeNode pulls row batches sequentially from its children sequentially(merge-node.h より引用).
- Prepare : Row の集合全体を Table と見立てて初期化したり,LLVM の codegen を行ったりする.
- Open : HDFS へ接続を行う.
- GetNext : HDFS のブロックを読み込み,Row に変換する.RCFile なら hdfs-scanner-rcfile.cc に実装がある.
- Close : HDFS への接続を破棄する.
- SetScanRanges : ファイル読み込みのための初期化をおこなう.
- Prepare: 結果を格納する Tuple の初期化や,LLVM の codegen の初期化を行ったりする.
- Open: codegen された IR を JIT する.Row の集合である RowBatch を初期化し,処理できるようにする.
- Close : バッファをクリアする.
- UpdateAggTuple : 集約処理の本体.FEでパースした結果を利用して UpdateMinSlot/UpdateMaxSlot/UpdateSumSlot を呼び出し,入力値である RowBatch を対象に Min/Max/Sum などの処理を行う.
Node は以下のメソッドを実装する必要がある.
- Prepare : 初期化メソッド.リクエストを受け付けた際に呼び出される.多くの Node では,ここで LLVM の Codegen を行う.
- Open : 初期化メソッド.HDFS_SCAN_NODE の場合は HDFS に接続しにいく.
- GetNext : ScanNode の場合は,次の Row を読み込んでバッファに格納する.
- Close : ScanNode の場合はストレージへの接続を解除する.
- CreateTree : 実行プランを Tree として構築する.
- EvalConjuncts: ??
- CodegenEvalConjuncts: ??
各 Node は,処理を Tuple という単位で処理を行う.実際に ExecNode が処理を行う場合は,Tuple クラスを直接用いず,Tuple を抽象化した TupleRow クラスおよび TupleRow の集合である RowBatch クラスを用いて処理を行う.以下に各クラスの概要を示す.
1レコード.何を1レコードと見なすかは各ScanNodeが決めている. HBASE_SCAN_NODE の場合は HBase の 1 Rowに相当.API は以下の通り.
- Create
- Init
- DeepCopy
- SetNull
- SetNotNull
- IsNull
- GetSlot
Tuple を抽象化したクラスで,これが処理する最小単位.内部で Tuple へのポインタを保持している.API は以下の通り.
- SetTuple
- DeepCopy
メモリ内にある Tuple の集合State としては in-flight(未処理のtupleがあるかないか)/non in-flight がある.API は以下の通り.
- AddRow: 行を追加し,in-flight を false にする.
- CommitRows(int n): n 行分だけ
- Swap : 現在処理している RowBatch と,引数で与えられた RowBatch に変える. 非同期実行中に,新たな入力が来た際に使うらしい.
- まだあるけど省略
-
読み切れていない Node を全て読み,文章化する.
- HBASE_SCAN_NODE
- HASH_JOIN_NODE
- SORT_NODE
- EXCHANGE_NODE
- MERGE_NODE
とりわけ,EXCHANGE Node はデータのやりとりをする際のバッファとして機能するようなので,全体の動きを把握するために理解が必須.
-
Coordnator を文章化する.
-
Tree の実行フローを記述する.
-
LLVM 依存部を理解し,文章化する.特に EvalConjuncts 周り.