- Presto 0.73
https://gist.github.com/ashigeru/9518638
QueryResource
public Response createQuery(...)
presto-main/src/main/java/com/facebook/presto/server/QueryResource.java:L103
リクエストを受け取る
SqlQueryManager
public QueryInfo createQuery(ConnectorSession session, String query)
presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L196
クエリ発行リクエストの処理
SqlParser
public Statement createStatement(String sql)
presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L50
return createStatement(parseStatement(sql));
CommonTree parseStatement(String sql)
presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L96
ジェネリックなASTを返す
Statement createStatement(CommonTree tree)
presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L71
ASTモデル構築
SqlQueryExecutionFactory
presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L425
SqlQueryExecution を返す
SqlQueryStarter
presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L392
queryExecution.start()
SqlQueryExecution
presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L134
Analyzer
presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java:L48
名前表を作ったり、型の計算をしたり
LogicalPlanner
presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L64
論理オペレータツリーの作成、最適化など
DistributedLogicalPlanner
presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L84
ステージに分割
private void planDistribution(SubPlan subplan)
presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L215
DistributedExecutionPlanner
presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L65
SqlStageExecution
public Future<?> start()
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L500
public Future<?> scheduleStartTasks()
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L509
private void startTasks()
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L527
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L720
private boolean addNewExchangesAndBuffers()
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L783
private Split createRemoteSplitFor(String nodeId, URI taskLocation)
presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L952
public RemoteTask createRemoteTask(ConnectorSession session,...)
presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTaskFactory.java:L79
public URI createTaskLocation(Node node, TaskId taskId)
presto-main/src/main/java/com/facebook/presto/server/HttpLocationFactory.java:L76
リクエストオブジェクトを作成し、`/v1/task` に投げている。
// presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L336
// Excerpt (parts elided with "..."): packs the session, plan fragment, sources and the
// current output buffers into a TaskUpdateRequest and builds a JSON POST request
// addressed to this task's own URI (taskInfo.getSelf()).
private synchronized void scheduleUpdate()
{
...
TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
planFragment,
sources,
outputBuffers.get());
// The "summarize" query parameter is presumably what TaskResource checks via
// shouldSummarize(uriInfo) to return a summarized TaskInfo — confirm.
Request request = preparePost()
.setUri(uriBuilderFrom(taskInfo.get().getSelf()).addParameter("summarize").build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest))
.build();
...
}
おそらくこの辺りでタスクが起動するはず。
`TaskResource` がリクエストを受け取る。
// presto-main/src/main/java/com/facebook/presto/server/TaskResource.java:L92
// HTTP handler for a task update: passes the request's session, fragment, sources and
// output ids to TaskManager.updateTask (the bound SqlTaskManager creates the task on the
// first request). It then returns the resulting TaskInfo as the response entity,
// summarized when the request asked for it.
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
checkNotNull(taskUpdateRequest, "taskUpdateRequest is null");
TaskInfo taskInfo = taskManager.updateTask(taskUpdateRequest.getSession(),
taskId,
taskUpdateRequest.getFragment(),
taskUpdateRequest.getSources(),
taskUpdateRequest.getOutputIds());
if (shouldSummarize(uriInfo)) {
taskInfo = taskInfo.summarize();
}
return Response.ok().entity(taskInfo).build();
}
タスクの情報は `TaskManager` が管理している。
`TaskManager` の実体は `SqlTaskManager`。
// presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java:L154
// Guice-style binding: every injection point for TaskManager receives the same single
// SqlTaskManager instance (SINGLETON scope).
binder.bind(TaskManager.class).to(SqlTaskManager.class).in(Scopes.SINGLETON);
というわけで `SqlTaskManager` の `updateTask` を見てみる。
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java:L282
// Returns the TaskInfo for taskId. On the first request for an id, creates a new
// SqlTaskExecution and registers it in `tasks`. If the id is no longer in `tasks` but has
// an entry in `taskInfos` (the task already completed), that stored info is returned and
// nothing is re-created.
public TaskInfo updateTask(ConnectorSession session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
TaskExecution taskExecution;
// Lookup and creation happen under the manager's monitor, so concurrent requests for the
// same id cannot both create an execution.
synchronized (this) {
taskExecution = tasks.get(taskId);
if (taskExecution == null) {
// is task already complete?
TaskInfo taskInfo = taskInfos.get(taskId);
if (taskInfo != null) {
return taskInfo;
}
taskExecution = SqlTaskExecution.createSqlTaskExecution(session,
taskId,
location,
fragment,
sources,
outputBuffers,
planner,
maxBufferSize,
taskExecutor,
taskNotificationExecutor,
maxTaskMemoryUsage,
operatorPreAllocatedMemory,
queryMonitor,
cpuTimerEnabled
);
tasks.put(taskId, taskExecution);
}
}
// Applied to both new and existing executions, outside the lock. For a new execution,
// createSqlTaskExecution has already added these sources once.
taskExecution.recordHeartbeat();
taskExecution.addSources(sources);
taskExecution.addResultQueue(outputBuffers);
return getTaskInfo(taskExecution);
}
タスクの登録がなければ新規にタスクを生成して登録、その情報を返す。
`SqlTaskExecution.createSqlTaskExecution` では、新規タスクを作成したらすぐに `task.start()` を呼び出している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L111
// Static factory: constructs a SqlTaskExecution, then immediately starts it, feeds it the
// initial sources and records a first heartbeat. The constructor does not receive
// `sources`; splits are only added after start() has run.
public static SqlTaskExecution createSqlTaskExecution(ConnectorSession session,
TaskId taskId,
URI location,
PlanFragment fragment,
List<TaskSource> sources,
OutputBuffers outputBuffers,
LocalExecutionPlanner planner,
DataSize maxBufferSize,
TaskExecutor taskExecutor,
ExecutorService notificationExecutor,
DataSize maxTaskMemoryUsage,
DataSize operatorPreAllocatedMemory,
QueryMonitor queryMonitor,
boolean cpuTimerEnabled)
{
// Note: the constructor takes notificationExecutor after queryMonitor, i.e. in a
// different position than in this factory's parameter list.
SqlTaskExecution task = new SqlTaskExecution(session,
taskId,
location,
fragment,
outputBuffers,
planner,
maxBufferSize,
taskExecutor,
maxTaskMemoryUsage,
operatorPreAllocatedMemory,
queryMonitor,
notificationExecutor,
cpuTimerEnabled
);
// SetThreadName is used via try-with-resources; presumably it renames the current thread
// to "Task-<taskId>" for the duration of this block and restores it on close.
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
task.start();
task.addSources(sources);
task.recordHeartbeat();
return task;
}
}
`SqlTaskExecution` のコンストラクタでいろいろなことをやってる。
- `TaskStateMachine` の作成
- `TaskContext` の作成
- `SharedBuffer` の作成
- ローカル実行計画
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L214
// (Inside the SqlTaskExecution constructor.) Plans the fragment's root node into a list of
// DriverFactory. TaskOutputFactory wraps this task's sharedBuffer, presumably so the last
// operator of the root pipeline writes its pages into that buffer — confirm.
LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();
LocalExecutionPlanner
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L194
// Converts a plan tree into a LocalExecutionPlan, i.e. the list of DriverFactory
// collected on the context. While the Visitor walks the tree it registers auxiliary
// pipelines (join/semi-join build sides, union and in-memory exchange sources) on the
// context. It then returns the PhysicalOperation of the root pipeline, which is terminated
// here with the output operator and registered last.
public LocalExecutionPlan plan(ConnectorSession session,
PlanNode plan,
Map<Symbol, Type> types,
OutputFactory outputOperatorFactory)
{
LocalExecutionPlanContext context = new LocalExecutionPlanContext(session, types);
PhysicalOperation physicalOperation = plan.accept(new Visitor(session), context);
// The second argument is true only for this root pipeline; auxiliary pipelines pass
// false. Presumably it marks the driver that produces the task's output — confirm.
DriverFactory driverFactory = new DriverFactory(
context.isInputDriver(),
true,
ImmutableList.<OperatorFactory>builder()
.addAll(physicalOperation.getOperatorFactories())
.add(outputOperatorFactory.createOutputOperator(context.getNextOperatorId(), physicalOperation.getTypes()))
.build());
context.addDriverFactory(driverFactory);
return new LocalExecutionPlan(context.getDriverFactories());
}
`PlanNode` から `Visitor` を使って `PhysicalOperation` を作ってる。
作りながら、`LocalExecutionPlanContext` に `DriverFactory` を追加していく。
最後に、出力オペレータ（`outputOperatorFactory.createOutputOperator(...)`）を末尾に付けた `DriverFactory` を登録して、すべての `DriverFactory` を `LocalExecutionPlan` として返す。
物理オペレータのファクトリを格納するためのクラス。
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1430
// Intermediate result of visiting a PlanNode: the operator factories of a pipeline built so
// far, plus a description of its output (excerpt; constructors and accessors elided).
private static class PhysicalOperation
{
// Operator factories in pipeline order; a node's own factory follows its source's factories.
private final List<OperatorFactory> operatorFactories;
// Maps each output Symbol to its Input (channel) in the produced pages.
private final Map<Symbol, Input> layout;
// Output column types.
private final List<Type> types;
...
}
自分用の `OperatorFactory` は、source 側の `OperatorFactory` 群の後ろに登録される。
`PlanNode` を `PhysicalOperation` に変換するためのクラス（`LocalExecutionPlanner` 内の `Visitor`）。
- 必要に応じてバイトコードへのコンパイルも行っている。
  - `ExpressionCompiler`
  - `JoinProbeCompiler`
  - ...

Visitor なので、それぞれの `PlanNode` に対応したメソッドを見てみるべし。
例えばTableScanNode
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L791
// Plans a table scan as a single TableScanOperatorFactory. Output symbol i is assigned to
// channel i, and the column handles are collected in the same order, so the channel
// layout matches the order of requested columns.
public PhysicalOperation visitTableScan(TableScanNode node, LocalExecutionPlanContext context)
{
ImmutableMap.Builder<Symbol, Input> outputMappings = ImmutableMap.builder();
List<ColumnHandle> columns = new ArrayList<>();
int channel = 0;
for (Symbol symbol : node.getOutputSymbols()) {
columns.add(node.getAssignments().get(symbol));
outputMappings.put(symbol, new Input(channel)); // one column per channel
channel++;
}
List<Type> types = getSourceOperatorTypes(node, context.getTypes());
OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), node.getId(), dataStreamProvider, types, columns);
return new PhysicalOperation(operatorFactory, outputMappings.build());
}
例えばProjectNode
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L613
// Plans a projection. If the projection's direct source is a FilterNode, the filter is
// folded in: its predicate is used and the planning continues from the filter's source.
// Otherwise the filter is the constant TRUE. The actual operator construction is
// delegated to visitScanFilterAndProject.
public PhysicalOperation visitProject(ProjectNode node, LocalExecutionPlanContext context)
{
PlanNode sourceNode;
Expression filterExpression;
if (node.getSource() instanceof FilterNode) {
FilterNode filterNode = (FilterNode) node.getSource();
sourceNode = filterNode.getSource();
filterExpression = filterNode.getPredicate();
}
else {
sourceNode = node.getSource();
filterExpression = BooleanLiteral.TRUE_LITERAL;
}
List<Expression> projectionExpressions = node.getExpressions();
List<Symbol> outputSymbols = node.getOutputSymbols();
return visitScanFilterAndProject(context, sourceNode, filterExpression, projectionExpressions, outputSymbols);
}
最終的に `visitScanFilterAndProject` で `PhysicalOperation` を作ってるけど、詳細は割愛。
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1173
// Excerpt: the pipeline planned in subContext is registered as its own (non-root) DriverFactory.
// No further sinks are then accepted on the exchange, and the current pipeline, which
// consumes from the exchange, is marked as not being an input driver.
private PhysicalOperation createInMemoryExchange(PlanNode node, LocalExecutionPlanContext context)
{
...
// add sub-context to current context
context.addDriverFactory(new DriverFactory(subContext.isInputDriver(), false, factories));
exchange.noMoreSinkFactories();
// the main driver is not an input: the source is the input for the plan
context.setInputDriver(false);
...
}
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1023
// Excerpt: the build side of the join becomes a separate, non-root pipeline. It consists of
// the build source's operators followed by hashBuilderOperatorFactory and is registered on
// the context. The probe side presumably continues in the current pipeline (elided here).
private PhysicalOperation createJoinOperator(JoinNode node,
PlanNode probeNode,
List<Symbol> probeSymbols,
PlanNode buildNode,
List<Symbol> buildSymbols,
LocalExecutionPlanContext context)
{
...
DriverFactory buildDriverFactory = new DriverFactory(
buildContext.isInputDriver(),
false,
ImmutableList.<OperatorFactory>builder()
.addAll(buildSource.getOperatorFactories())
.add(hashBuilderOperatorFactory)
.build());
context.addDriverFactory(buildDriverFactory);
...
}
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1088
// Excerpt: like the join build side, the semi-join's filtering source becomes a separate
// non-root pipeline. It consists of its operators followed by setBuilderOperatorFactory
// and is registered on the context.
public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext context)
{
...
DriverFactory buildDriverFactory = new DriverFactory(
buildContext.isInputDriver(),
false,
ImmutableList.<OperatorFactory>builder()
.addAll(buildSource.getOperatorFactories())
.add(setBuilderOperatorFactory)
.build());
context.addDriverFactory(buildDriverFactory);
...
}
// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1217
// Excerpt: each union source is planned as its own non-root DriverFactory, presumably
// ending in a sink of inMemoryExchange. Once all sources are registered, the exchange is
// closed to new sinks, and the current pipeline, which reads the exchange, is marked as
// not being an input driver.
public PhysicalOperation visitUnion(UnionNode node, LocalExecutionPlanContext context)
{
...
for (int i = 0; i < node.getSources().size(); i++) {
...
DriverFactory driverFactory = new DriverFactory(subContext.isInputDriver(), false, operatorFactories);
context.addDriverFactory(driverFactory);
}
inMemoryExchange.noMoreSinkFactories();
// the main driver is not an input... the union sources are the input for the plan
context.setInputDriver(false);
...
}
// Tail of the SqlTaskExecution constructor (excerpt). Its beginning and the enclosing
// class are not shown, hence the unmatched closing braces at the end.
// Wraps every DriverFactory of the local plan in a DriverSplitRunnerFactory. The single
// factory whose source ids include the fragment's partitioned source is kept separately;
// all others are treated as unpartitioned.
// index driver factories
DriverSplitRunnerFactory partitionedDriverFactory = null;
ImmutableList.Builder<DriverSplitRunnerFactory> unpartitionedDriverFactories = ImmutableList.builder();
for (DriverFactory driverFactory : driverFactories) {
if (driverFactory.getSourceIds().contains(fragment.getPartitionedSource())) {
checkState(partitionedDriverFactory == null, "multiple partitioned sources are not supported");
partitionedDriverFactory = new DriverSplitRunnerFactory(driverFactory);
}
else {
unpartitionedDriverFactories.add(new DriverSplitRunnerFactory(driverFactory));
}
}
this.unpartitionedDriverFactories = unpartitionedDriverFactories.build();
// A SOURCE-distributed fragment must have a driver consuming its partitioned source.
if (fragment.getDistribution() == PlanDistribution.SOURCE) {
checkArgument(partitionedDriverFactory != null, "Fragment is partitioned, but no partitioned driver found");
}
this.partitionedSourceId = fragment.getPartitionedSource();
this.partitionedDriverFactory = partitionedDriverFactory;
}
}
ここで `DriverFactory` を `DriverSplitRunnerFactory` にくるみつつ、partitioned な `DriverFactory` を取り出す。
ただし、
- 複数の partitioned なものは持てない
- `fragment.getDistribution() == PlanDistribution.SOURCE` の場合には partitioned なものがなければならない
`SqlTaskExecution` のコンストラクタはここまで。
`SqlTaskExecution.createSqlTaskExecution` は、すぐに `SqlTaskExecution.start()` を呼び出していた。
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L242
// Starts the drivers that do not consume partitioned splits. One DriverSplitRunner is
// created per unpartitioned factory, with a null split, and each factory is then told that
// no more splits will come. forceRunSplit = true makes TaskExecutor.enqueueSplits start them
// immediately instead of queueing them on the task handle.
private void start()
{
// start unpartitioned drivers
List<DriverSplitRunner> runners = new ArrayList<>();
for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
runners.add(driverFactory.createDriverRunner(null));
driverFactory.setNoMoreSplits();
}
enqueueDrivers(true, runners);
}
`DriverSplitRunner` に変換して `enqueueDrivers` を呼び出す。
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L397
// Hands the runners to the TaskExecutor and attaches a completion callback to each runner's
// "finished" future. The callbacks run on notificationExecutor. The futures are matched to
// runners by index, which relies on enqueueSplits returning them in input order.
// On success the callback decrements remainingDrivers, re-checks task completion, and
// emits a split completion event (onFailure and other parts are elided in this excerpt).
private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners)
{
// schedule driver to be executed
List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());
...
for (int i = 0; i < finishedFutures.size(); i++) {
ListenableFuture<?> finishedFuture = finishedFutures.get(i);
final DriverSplitRunner splitRunner = runners.get(i);
Futures.addCallback(finishedFuture, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
// record driver is finished
remainingDrivers.decrementAndGet();
checkTaskCompletion();
queryMonitor.splitCompletionEvent(taskId, getDriverStats());
}
}
...
// Stats of this runner's driver, or an empty DriverStats if no driver context exists.
private DriverStats getDriverStats()
{
DriverContext driverContext = splitRunner.getDriverContext();
DriverStats driverStats;
if (driverContext != null) {
driverStats = driverContext.getDriverStats();
}
else {
// split runner did not start successfully
driverStats = new DriverStats();
}
return driverStats;
}
}, notificationExecutor);
}
}
更に `TaskExecutor.enqueueSplits()` を呼び出す。
// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L187
// Wraps each SplitRunner in a PrioritizedSplitRunner. With forceStart, the runner is started
// immediately and recorded as running on the task handle. Otherwise it is queued on the
// task handle and scheduled only as capacity allows.
// Returns one "finished" future per input runner, in the same order as taskSplits.
public synchronized List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandle, boolean forceStart, List<? extends SplitRunner> taskSplits)
{
List<ListenableFuture<?>> finishedFutures = new ArrayList<>(taskSplits.size());
for (SplitRunner taskSplit : taskSplits) {
PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(taskHandle, taskSplit, ticker);
if (forceStart) {
// Note: we do not record queued time for forced splits
startSplit(prioritizedSplitRunner);
// add the runner to the handle so it can be destroyed if the task is canceled
taskHandle.recordRunningSplit(prioritizedSplitRunner);
}
else {
// add this to the work queue for the task
taskHandle.enqueueSplit(prioritizedSplitRunner);
// if task is under the limit for guaranteed splits, start one
scheduleTaskIfNecessary(taskHandle);
// if globally we have more resources, start more
addNewEntrants();
}
finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
}
return finishedFutures;
}
`PrioritizedSplitRunner` にくるんで、`forceStart` が `true` の場合は `startSplit` に渡す。
`forceStart` が `false` の場合には `TaskHandle.enqueueSplit` に渡されて、いったん保留になる。
`finishedFutures` に加えられる `prioritizedSplitRunner.getFinishedFuture()` は Promise のようなもので、スプリット（ドライバ）の処理が終了した時に `null` がセットされる（失敗時には例外がセットされる）。
すると、コールバックの `onSuccess` が呼び出される。
// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L260
// Despite the name, this does not run the split. It registers the split in allSplits and
// puts it on pendingSplits, from which the TaskExecutor's Runner threads take() work.
// pendingSplits is presumably a BlockingQueue, given put()/take() — confirm.
private synchronized void startSplit(PrioritizedSplitRunner split)
{
allSplits.add(split);
pendingSplits.put(split);
}
`startSplit` と言いつつ、`pendingSplits` に入れるだけ。
実際に処理をしているのは、`TaskExecutor` 内で動いているスレッド（`TaskExecutor.Runner`）。
// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L553
// TaskExecutor.Runner worker loop body (excerpt). It takes the next split from
// pendingSplits and runs one time slice of it via split.process(). The returned future
// tells whether the split is blocked. The split is removed from runningSplits afterwards,
// even if process() throws.
public void run()
{
...
final PrioritizedSplitRunner split;
try {
split = pendingSplits.take();
...
}
...
ListenableFuture<?> blocked;
try {
blocked = split.process();
...
}
finally {
runningSplits.remove(split);
}
...
}
`TaskExecutor.Runner.run()` が `pendingSplits` から `PrioritizedSplitRunner` を取り出して `process()` を実行する。
// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L436
// PrioritizedSplitRunner.process (excerpt): runs the wrapped SplitRunner for one quantum
// (SPLIT_RUN_QUANTA) and returns its blocked future. Any failure is also recorded on
// finishedFuture, so listeners such as the callback in SqlTaskExecution.enqueueDrivers
// observe it, and is then rethrown to the caller.
public ListenableFuture<?> process()
throws Exception
{
try {
...
ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
...
return blocked;
}
catch (Throwable e) {
finishedFuture.setException(e);
throw e;
}
}
`PrioritizedSplitRunner` が更に `DriverSplitRunner` の `processFor` を実行する。
// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L663
// DriverSplitRunner.processFor: the Driver is created lazily on the first call, under this
// runner's lock, for the runner's partitionedSplit (null for unpartitioned drivers, see
// start()). The Driver is then run for `duration` outside the lock. If the runner was
// already closed, an already-completed future is returned and no driver is created.
public ListenableFuture<?> processFor(Duration duration)
{
Driver driver;
synchronized (this) {
// if close() was called before we get here, there's no point in even creating the driver
if (closed) {
return Futures.immediateFuture(null);
}
if (this.driver == null) {
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
}
driver = this.driver;
}
return driver.processFor(duration);
}
`DriverSplitRunner` は（初回のみ）`Driver` を作って、その `processFor` を実行する。
// presto-main/src/main/java/com/facebook/presto/operator/Driver.java:L261
// Driver.processFor: calls process() repeatedly until one of three things happens. Either
// it returns a not-done (blocked) future, which is returned to the caller; or the driver is
// finished; or about `duration` has elapsed, in which case NOT_BLOCKED is returned. The
// time check runs after each pass (do-while), so at least one pass always happens and the
// last pass may overrun the quantum.
public ListenableFuture<?> processFor(Duration duration)
{
checkLockNotHeld("Can not process for a duration while holding the driver lock");
checkNotNull(duration, "duration is null");
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
do {
ListenableFuture<?> future = process();
if (!future.isDone()) {
return future;
}
}
while (System.nanoTime() - start < maxRuntime && !isFinished());
return NOT_BLOCKED;
}
指定時間（quanta）が尽きるか、ブロックされるか、ドライバが終了するまで、`Driver` の `process()` を繰り返し実行する。
// presto-main/src/main/java/com/facebook/presto/operator/Driver.java:L281
// Driver.process (excerpt): one pass over the operator pipeline. The pass first applies any
// newly added sources. It then walks each adjacent pair (current, next) from the first
// operator toward the output, stopping early once driverContext reports done. For each pair:
//  - if either operator is blocked, its blocked future is recorded and returned;
//  - if current is finished, next.finish() is called;
//  - otherwise, if next needs input, at most one page is moved from current to next.
// If no operator was blocked, NOT_BLOCKED is returned. The trailing "..." and the final "}"
// belong to code not shown here (presumably the rest of the method and the Driver class).
public ListenableFuture<?> process()
{
...
driverContext.start();
if (!newSources.isEmpty()) {
processNewSources();
}
for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
// check if current operator is blocked
Operator current = operators.get(i);
ListenableFuture<?> blocked = current.isBlocked();
if (!blocked.isDone()) {
current.getOperatorContext().recordBlocked(blocked);
return blocked;
}
// check if next operator is blocked
Operator next = operators.get(i + 1);
blocked = next.isBlocked();
if (!blocked.isDone()) {
next.getOperatorContext().recordBlocked(blocked);
return blocked;
}
// if current operator is finished...
// NOTE(review): this branch runs on every pass while current stays finished, so
// next.finish() may be called more than once — presumably finish() is idempotent; confirm.
if (current.isFinished()) {
// let next operator know there will be no more data
next.getOperatorContext().startIntervalTimer();
next.finish();
next.getOperatorContext().recordFinish();
}
else {
// if next operator needs input...
if (next.needsInput()) {
// get an output page from current operator
current.getOperatorContext().startIntervalTimer();
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(page);
// if we got an output page, add it to the next operator
if (page != null) {
next.getOperatorContext().startIntervalTimer();
next.addInput(page);
next.getOperatorContext().recordAddInput(page);
}
}
}
}
return NOT_BLOCKED;
...
}
}
順番に物理オペレータを実行していく
// presto-main/src/main/java/com/facebook/presto/operator/Operator.java:L22
// A physical operator in a Driver pipeline. The notes below are summarized from how
// Driver.process() drives it: data flows from getOutput() of one operator into
// addInput() of the next.
public interface Operator
{
// Already-completed future, returned to mean "not blocked".
ListenableFuture<?> NOT_BLOCKED = Futures.immediateFuture(null);
OperatorContext getOperatorContext();
// Types of the columns this operator outputs.
List<Type> getTypes();
// Signals that no more input will be added; Driver calls it once the upstream operator is finished.
void finish();
boolean isFinished();
// Future that is done when the operator can make progress; Driver returns it if it is not done.
ListenableFuture<?> isBlocked();
// Whether Driver may pass a page to addInput() right now.
boolean needsInput();
void addInput(Page page);
// Next output page; may return null (Driver checks for null before forwarding).
Page getOutput();
}
- AbstractFilterAndProjectOperator
- FilterAndProjectOperator
- FilterAndProjectOperator_$id
- AggregationOperator
- AlignmentOperator
- DistinctLimitOperator
- FinishedOperator
- HashAggregationOperator
- HashBuilderOperator
- HashSemiJoinOperator
- InMemoryExchangeSinkOperator
- InMemoryExchangeSourceOperator
- LimitOperator
- LookupJoinOperator
- MarkDistinctOperator
- MarkDistinctSampledOperator
- MaterializeSampleOperator
- OrderByOperator
- PagesIndexBuilderOperator
- RecordProjectOperator
- SampleOperator
- SetBuilderOperator
- TableCommitOperator
- TableWriterOperator
- TaskOutputOperator
- TopNOperator
- ValuesOperator
- WindowOperator
- SourceOperator
- AbstractScanFilterAndProjectOperator
- AbstractScanFilterAndProjectOperator_$id
- ExchangeOperator
- IndexSourceOperator
- TableScanOperator
これはとても便利です。 書いてくれてありがとうございました。