@ueshin
Created July 15, 2014 06:53
20140715 Presto Source Code Reading #4

20140715 Presto SCR #4

  • Presto 0.73

Flow of the logical execution plan

https://gist.github.com/ashigeru/9518638

analyzeQuery()

Physical execution plan

Where we left off last time

A request object is created and POSTed to /v1/task.

// presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L336
private synchronized void scheduleUpdate()
{
    ...
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
            planFragment,
            sources,
            outputBuffers.get());
    Request request = preparePost()
            .setUri(uriBuilderFrom(taskInfo.get().getSelf()).addParameter("summarize").build())
            .setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
            .setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest))
            .build();
    ...
}

The task is presumably started somewhere around here.

Entry point

TaskResource receives the request.

// presto-main/src/main/java/com/facebook/presto/server/TaskResource.java:L92
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
    checkNotNull(taskUpdateRequest, "taskUpdateRequest is null");
    TaskInfo taskInfo = taskManager.updateTask(taskUpdateRequest.getSession(),
            taskId,
            taskUpdateRequest.getFragment(),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds());
    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }
    return Response.ok().entity(taskInfo).build();
}

Task information is managed by TaskManager.


The concrete implementation of TaskManager is SqlTaskManager.

// presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java:L154
binder.bind(TaskManager.class).to(SqlTaskManager.class).in(Scopes.SINGLETON);

So let's look at SqlTaskManager.updateTask.

// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java:L282
public TaskInfo updateTask(ConnectorSession session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    TaskExecution taskExecution;
    synchronized (this) {
        taskExecution = tasks.get(taskId);
        if (taskExecution == null) {
            // is task already complete?
            TaskInfo taskInfo = taskInfos.get(taskId);
            if (taskInfo != null) {
                return taskInfo;
            }
            taskExecution = SqlTaskExecution.createSqlTaskExecution(session,
                    taskId,
                    location,
                    fragment,
                    sources,
                    outputBuffers,
                    planner,
                    maxBufferSize,
                    taskExecutor,
                    taskNotificationExecutor,
                    maxTaskMemoryUsage,
                    operatorPreAllocatedMemory,
                    queryMonitor,
                    cpuTimerEnabled
            );
            tasks.put(taskId, taskExecution);
        }
    }
    taskExecution.recordHeartbeat();
    taskExecution.addSources(sources);
    taskExecution.addResultQueue(outputBuffers);
    return getTaskInfo(taskExecution);
}

If no task is registered under the given ID, a new task is created and registered; either way the task's info is returned.
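The get-or-create step above can be sketched in isolation. This is a minimal illustration, not Presto code: `TaskRegistrySketch`, its nested `Task` class, and the string task IDs are hypothetical stand-ins for `SqlTaskManager`, `TaskExecution`, and `TaskId`, and the "already completed" lookup in `taskInfos` is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for SqlTaskManager; not Presto's actual classes.
final class TaskRegistrySketch
{
    // Stand-in for TaskExecution: it only counts heartbeats.
    static final class Task
    {
        final String id;
        int heartbeats;

        Task(String id)
        {
            this.id = id;
        }
    }

    private final Map<String, Task> tasks = new HashMap<>();
    private int created;

    // Look up the task, creating and registering it only on the first request.
    Task updateTask(String taskId)
    {
        Task task;
        synchronized (this) {
            task = tasks.get(taskId);
            if (task == null) {
                task = new Task(taskId);
                tasks.put(taskId, task);
                created++;
            }
        }
        // Every update, whether or not it created the task, records a heartbeat.
        synchronized (task) {
            task.heartbeats++;
        }
        return task;
    }

    synchronized int createdCount()
    {
        return created;
    }

    public static void main(String[] args)
    {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        Task first = registry.updateTask("query.1.0");
        Task second = registry.updateTask("query.1.0");
        System.out.println(first == second);         // prints true
        System.out.println(registry.createdCount()); // prints 1
    }
}
```

As in the quoted method, only the lookup and registration happen under the manager-wide lock; the per-task bookkeeping runs after the lock is released.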

Task execution

SqlTaskExecution.createSqlTaskExecution calls task.start() right after it creates the new task.

// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L111
public static SqlTaskExecution createSqlTaskExecution(ConnectorSession session,
        TaskId taskId,
        URI location,
        PlanFragment fragment,
        List<TaskSource> sources,
        OutputBuffers outputBuffers,
        LocalExecutionPlanner planner,
        DataSize maxBufferSize,
        TaskExecutor taskExecutor,
        ExecutorService notificationExecutor,
        DataSize maxTaskMemoryUsage,
        DataSize operatorPreAllocatedMemory,
        QueryMonitor queryMonitor,
        boolean cpuTimerEnabled)
{
    SqlTaskExecution task = new SqlTaskExecution(session,
            taskId,
            location,
            fragment,
            outputBuffers,
            planner,
            maxBufferSize,
            taskExecutor,
            maxTaskMemoryUsage,
            operatorPreAllocatedMemory,
            queryMonitor,
            notificationExecutor,
            cpuTimerEnabled
    );
    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
        task.start();
        task.addSources(sources);
        task.recordHeartbeat();
        return task;
    }
}

The SqlTaskExecution constructor does quite a lot:

  • Creates the TaskStateMachine
  • Creates the TaskContext
  • Creates the SharedBuffer
  • Builds the local execution plan

Local execution plan

// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L214
LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();

LocalExecutionPlanner

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L194
public LocalExecutionPlan plan(ConnectorSession session,
        PlanNode plan,
        Map<Symbol, Type> types,
        OutputFactory outputOperatorFactory)
{
    LocalExecutionPlanContext context = new LocalExecutionPlanContext(session, types);
    PhysicalOperation physicalOperation = plan.accept(new Visitor(session), context);
    DriverFactory driverFactory = new DriverFactory(
            context.isInputDriver(),
            true,
            ImmutableList.<OperatorFactory>builder()
                    .addAll(physicalOperation.getOperatorFactories())
                    .add(outputOperatorFactory.createOutputOperator(context.getNextOperatorId(), physicalOperation.getTypes()))
                    .build());
    context.addDriverFactory(driverFactory);
    return new LocalExecutionPlan(context.getDriverFactories());
}

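A Visitor is used to build a PhysicalOperation from the PlanNode.
Along the way, DriverFactory instances are added to the LocalExecutionPlanContext.
Finally, a DriverFactory that includes the OutputOperatorFactory is registered, and all DriverFactory instances are returned as a LocalExecutionPlan.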

PhysicalOperation

A class that holds the factories for the physical operators.

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1430
private static class PhysicalOperation
{
    private final List<OperatorFactory> operatorFactories;
    private final Map<Symbol, Input> layout;
    private final List<Type> types;
    ...
}

A node's own OperatorFactory is registered after the factories of its sources.
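The "sources first, own operator last" ordering can be illustrated with a small sketch. Everything here is hypothetical: `Node` stands in for `PlanNode`, plain strings stand in for `OperatorFactory` instances, and a recursive method replaces the real Visitor. It only shows the order in which the chain is assembled and that the output operator is appended at the end, as in `plan()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the PlanNode -> PhysicalOperation conversion.
final class PhysicalOperationSketch
{
    // A plan node with at most one source; the name stands in for an operator factory.
    static final class Node
    {
        final String operatorName;
        final Node source; // null for a leaf such as a table scan

        Node(String operatorName, Node source)
        {
            this.operatorName = operatorName;
            this.source = source;
        }
    }

    // Convert the source first, then append this node's own operator.
    static List<String> toOperatorChain(Node node)
    {
        List<String> chain = new ArrayList<>();
        if (node.source != null) {
            chain.addAll(toOperatorChain(node.source));
        }
        chain.add(node.operatorName);
        return chain;
    }

    // Like plan(): the output operator goes after everything else.
    static List<String> plan(Node root, String outputOperator)
    {
        List<String> chain = toOperatorChain(root);
        chain.add(outputOperator);
        return chain;
    }

    public static void main(String[] args)
    {
        Node scan = new Node("TableScan", null);
        Node filter = new Node("Filter", scan);
        // prints [TableScan, Filter, TaskOutput]
        System.out.println(plan(filter, "TaskOutput"));
    }
}
```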

Visitor

A class that converts a PlanNode into a PhysicalOperation.

Since it is a visitor, the thing to do is look at the method for each PlanNode type.


For example, TableScanNode:

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L791
public PhysicalOperation visitTableScan(TableScanNode node, LocalExecutionPlanContext context)
{
    ImmutableMap.Builder<Symbol, Input> outputMappings = ImmutableMap.builder();
    List<ColumnHandle> columns = new ArrayList<>();
    int channel = 0;
    for (Symbol symbol : node.getOutputSymbols()) {
        columns.add(node.getAssignments().get(symbol));
        outputMappings.put(symbol, new Input(channel)); // one column per channel
        channel++;
    }
    List<Type> types = getSourceOperatorTypes(node, context.getTypes());
    OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), node.getId(), dataStreamProvider, types, columns);
    return new PhysicalOperation(operatorFactory, outputMappings.build());
}

For example, ProjectNode:

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L613
public PhysicalOperation visitProject(ProjectNode node, LocalExecutionPlanContext context)
{
    PlanNode sourceNode;
    Expression filterExpression;
    if (node.getSource() instanceof FilterNode) {
        FilterNode filterNode = (FilterNode) node.getSource();
        sourceNode = filterNode.getSource();
        filterExpression = filterNode.getPredicate();
    }
    else {
        sourceNode = node.getSource();
        filterExpression = BooleanLiteral.TRUE_LITERAL;
    }
    List<Expression> projectionExpressions = node.getExpressions();
    List<Symbol> outputSymbols = node.getOutputSymbols();
    return visitScanFilterAndProject(context, sourceNode, filterExpression, projectionExpressions, outputSymbols);
}

In the end visitScanFilterAndProject builds the PhysicalOperation, but the details are omitted here.

Where DriverFactory instances are created

createInMemoryExchange

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1173
private PhysicalOperation createInMemoryExchange(PlanNode node, LocalExecutionPlanContext context)
{
    ...
    // add sub-context to current context
    context.addDriverFactory(new DriverFactory(subContext.isInputDriver(), false, factories));
    exchange.noMoreSinkFactories();
    // the main driver is not an input: the source is the input for the plan
    context.setInputDriver(false);
    ...
}

createJoinOperator

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1023
private PhysicalOperation createJoinOperator(JoinNode node,
        PlanNode probeNode,
        List<Symbol> probeSymbols,
        PlanNode buildNode,
        List<Symbol> buildSymbols,
        LocalExecutionPlanContext context)
{
    ...

    DriverFactory buildDriverFactory = new DriverFactory(
            buildContext.isInputDriver(),
            false,
            ImmutableList.<OperatorFactory>builder()
                    .addAll(buildSource.getOperatorFactories())
                    .add(hashBuilderOperatorFactory)
                    .build());
    context.addDriverFactory(buildDriverFactory);
    ...
}

visitSemiJoin

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1088
public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext context)
{
    ...
    DriverFactory buildDriverFactory = new DriverFactory(
            buildContext.isInputDriver(),
            false,
            ImmutableList.<OperatorFactory>builder()
                    .addAll(buildSource.getOperatorFactories())
                    .add(setBuilderOperatorFactory)
                    .build());
    context.addDriverFactory(buildDriverFactory);
    ...
}

visitUnion

// presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java:L1217
public PhysicalOperation visitUnion(UnionNode node, LocalExecutionPlanContext context)
{
    ...
    for (int i = 0; i < node.getSources().size(); i++) {
    ...
        DriverFactory driverFactory = new DriverFactory(subContext.isInputDriver(), false, operatorFactories);
        context.addDriverFactory(driverFactory);
    }
    inMemoryExchange.noMoreSinkFactories();
    // the main driver is not an input... the union sources are the input for the plan
    context.setInputDriver(false);
    ...
}

Back to SqlTaskExecution.

        // index driver factories
        DriverSplitRunnerFactory partitionedDriverFactory = null;
        ImmutableList.Builder<DriverSplitRunnerFactory> unpartitionedDriverFactories = ImmutableList.builder();
        for (DriverFactory driverFactory : driverFactories) {
            if (driverFactory.getSourceIds().contains(fragment.getPartitionedSource())) {
                checkState(partitionedDriverFactory == null, "multiple partitioned sources are not supported");
                partitionedDriverFactory = new DriverSplitRunnerFactory(driverFactory);
            }
            else {
                unpartitionedDriverFactories.add(new DriverSplitRunnerFactory(driverFactory));
            }
        }
        this.unpartitionedDriverFactories = unpartitionedDriverFactories.build();
        if (fragment.getDistribution() == PlanDistribution.SOURCE) {
            checkArgument(partitionedDriverFactory != null, "Fragment is partitioned, but no partitioned driver found");
        }
        this.partitionedSourceId = fragment.getPartitionedSource();
        this.partitionedDriverFactory = partitionedDriverFactory;
    }
}

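Here each DriverFactory is wrapped in a DriverSplitRunnerFactory, and the partitionedDriverFactory is picked out along the way.

Note, however, that:

  • There cannot be more than one partitioned driver factory
  • When fragment.getDistribution() == PlanDistribution.SOURCE, a partitioned one must exist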
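The indexing step and its two constraints can be sketched as follows. The names are hypothetical: `Factory` stands in for `DriverFactory`, string IDs stand in for plan node IDs, and the `sourceDistributed` flag stands in for the `PlanDistribution.SOURCE` check. Plain exceptions replace Guava's `checkState` and `checkArgument`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical miniature of the indexing step in the SqlTaskExecution constructor.
final class DriverFactoryIndexSketch
{
    // Stand-in for DriverFactory: a name plus the ids of the sources it reads.
    static final class Factory
    {
        final String name;
        final Set<String> sourceIds;

        Factory(String name, String... sourceIds)
        {
            this.name = name;
            this.sourceIds = new HashSet<>(Arrays.asList(sourceIds));
        }
    }

    final Factory partitioned; // null when no factory reads the partitioned source
    final List<Factory> unpartitioned;

    private DriverFactoryIndexSketch(Factory partitioned, List<Factory> unpartitioned)
    {
        this.partitioned = partitioned;
        this.unpartitioned = unpartitioned;
    }

    static DriverFactoryIndexSketch index(List<Factory> factories, String partitionedSourceId, boolean sourceDistributed)
    {
        Factory partitioned = null;
        List<Factory> unpartitioned = new ArrayList<>();
        for (Factory factory : factories) {
            if (factory.sourceIds.contains(partitionedSourceId)) {
                // Constraint 1: at most one factory may read the partitioned source.
                if (partitioned != null) {
                    throw new IllegalStateException("multiple partitioned sources are not supported");
                }
                partitioned = factory;
            }
            else {
                unpartitioned.add(factory);
            }
        }
        // Constraint 2: a source-distributed fragment needs a partitioned factory.
        if (sourceDistributed && partitioned == null) {
            throw new IllegalArgumentException("Fragment is partitioned, but no partitioned driver found");
        }
        return new DriverFactoryIndexSketch(partitioned, unpartitioned);
    }

    public static void main(String[] args)
    {
        List<Factory> factories = Arrays.asList(new Factory("scan", "1"), new Factory("build", "2"));
        DriverFactoryIndexSketch result = index(factories, "1", true);
        System.out.println(result.partitioned.name);      // prints scan
        System.out.println(result.unpartitioned.size());  // prints 1
    }
}
```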

That is the end of the SqlTaskExecution constructor.

As seen earlier, SqlTaskExecution.createSqlTaskExecution calls SqlTaskExecution.start() right after constructing the task.

Starting the task

// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L242
private void start()
{
    // start unpartitioned drivers
    List<DriverSplitRunner> runners = new ArrayList<>();
    for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
        runners.add(driverFactory.createDriverRunner(null));
        driverFactory.setNoMoreSplits();
    }
    enqueueDrivers(true, runners);
}

Each factory is turned into a DriverSplitRunner, and enqueueDrivers is called.


// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L397
private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners)
{
    // schedule driver to be executed
    List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
    checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());
    ...
    for (int i = 0; i < finishedFutures.size(); i++) {
        ListenableFuture<?> finishedFuture = finishedFutures.get(i);
        final DriverSplitRunner splitRunner = runners.get(i);
        Futures.addCallback(finishedFuture, new FutureCallback<Object>()
        {
            @Override
            public void onSuccess(Object result)
            {
                try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
                    // record driver is finished
                    remainingDrivers.decrementAndGet();
                    checkTaskCompletion();
                    queryMonitor.splitCompletionEvent(taskId, getDriverStats());
                }
            }
    ...
            private DriverStats getDriverStats()
            {
                DriverContext driverContext = splitRunner.getDriverContext();
                DriverStats driverStats;
                if (driverContext != null) {
                    driverStats = driverContext.getDriverStats();
                }
                else {
                    // split runner did not start successfully
                    driverStats = new DriverStats();
                }
                return driverStats;
            }
        }, notificationExecutor);
    }
}

That in turn calls TaskExecutor.enqueueSplits().


// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L187
public synchronized List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandle, boolean forceStart, List<? extends SplitRunner> taskSplits)
{
    List<ListenableFuture<?>> finishedFutures = new ArrayList<>(taskSplits.size());
    for (SplitRunner taskSplit : taskSplits) {
        PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(taskHandle, taskSplit, ticker);
        if (forceStart) {
            // Note: we do not record queued time for forced splits
            startSplit(prioritizedSplitRunner);
            // add the runner to the handle so it can be destroyed if the task is canceled
            taskHandle.recordRunningSplit(prioritizedSplitRunner);
        }
        else {
            // add this to the work queue for the task
            taskHandle.enqueueSplit(prioritizedSplitRunner);
            // if task is under the limit for guaranteed splits, start one
            scheduleTaskIfNecessary(taskHandle);
            // if globally we have more resources, start more
            addNewEntrants();
        }
        finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
    }
    return finishedFutures;
}

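Each runner is wrapped in a PrioritizedSplitRunner and passed to startSplit.

When forceStart is false, it is passed to TaskHandle.enqueueSplit instead and held pending for the time being.

The prioritizedSplitRunner.getFinishedFuture() added to finishedFutures is something like a Promise: when the work finishes, it is completed with null, and the callback's onSuccess is then invoked.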
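The promise-and-callback wiring can be sketched with the JDK alone. This is an analogy, not the Presto code: `CompletableFuture` stands in for Guava's settable `ListenableFuture`, `thenRun` stands in for the `onSuccess` half of `Futures.addCallback`, and the class and method names are hypothetical. The failure path, which the quoted code elides, is not modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one "finished" future per runner, one success callback per future.
final class FinishedFutureSketch
{
    // Registers a success callback on every future and returns the counter of
    // runners that have not finished yet (compare remainingDrivers above).
    static AtomicInteger trackCompletion(List<CompletableFuture<Void>> finishedFutures)
    {
        AtomicInteger remaining = new AtomicInteger(finishedFutures.size());
        for (CompletableFuture<Void> finished : finishedFutures) {
            // thenRun fires only on normal completion, like onSuccess.
            finished.thenRun(remaining::decrementAndGet);
        }
        return remaining;
    }

    public static void main(String[] args)
    {
        CompletableFuture<Void> first = new CompletableFuture<>();
        CompletableFuture<Void> second = new CompletableFuture<>();
        AtomicInteger remaining = trackCompletion(Arrays.asList(first, second));

        // The executor side "sets null" on the future when the runner is done.
        first.complete(null);
        System.out.println(remaining.get()); // prints 1
    }
}
```

The side that produces the future and the side that reacts to it never call each other directly, which is why enqueueSplits can simply hand back a list of futures.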


// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L260
private synchronized void startSplit(PrioritizedSplitRunner split)
{
    allSplits.add(split);
    pendingSplits.put(split);
}

Despite its name, startSplit only puts the split into pendingSplits.


The actual processing is done by threads running inside TaskExecutor.

// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L553
public void run()
{
    ...
            final PrioritizedSplitRunner split;
            try {
                split = pendingSplits.take();
    ...
            }
    ...
                ListenableFuture<?> blocked;
                try {
                    blocked = split.process();
    ...
                }
                finally {
                    runningSplits.remove(split);
                }
    ...
}

TaskExecutor.Runner.run() takes a PrioritizedSplitRunner from pendingSplits and calls process() on it.
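The hand-off between startSplit and the runner threads is a producer/consumer queue. The sketch below is a simplified illustration under stated assumptions: a `LinkedBlockingQueue` of `Runnable` stands in for pendingSplits (so priority ordering is ignored), a single thread stands in for the runner pool, and all names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "startSplit enqueues, a runner thread takes and processes".
final class PendingSplitsSketch
{
    private final BlockingQueue<Runnable> pendingSplits = new LinkedBlockingQueue<>();

    // Like startSplit: nothing runs here, the split is only queued.
    void startSplit(Runnable split)
    {
        pendingSplits.add(split);
    }

    // Body of a runner thread: block on take(), process, repeat until interrupted.
    void runLoop()
    {
        while (!Thread.currentThread().isInterrupted()) {
            Runnable split;
            try {
                split = pendingSplits.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            split.run();
        }
    }

    // Enqueues splitCount splits, waits for the runner, and returns how many ran.
    static int runDemo(int splitCount)
    {
        PendingSplitsSketch executor = new PendingSplitsSketch();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(splitCount);
        Thread runner = new Thread(executor::runLoop, "runner-sketch");
        runner.setDaemon(true);
        runner.start();
        for (int i = 0; i < splitCount; i++) {
            executor.startSplit(() -> {
                processed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
            runner.interrupt();
            runner.join(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args)
    {
        System.out.println(runDemo(3));
    }
}
```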


// presto-main/src/main/java/com/facebook/presto/execution/TaskExecutor.java:L436
public ListenableFuture<?> process()
        throws Exception
{
    try {
    ...
        ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
    ...
        return blocked;
    }
    catch (Throwable e) {
        finishedFuture.setException(e);
        throw e;
    }
}

PrioritizedSplitRunner in turn calls processFor on the DriverSplitRunner.


// presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java:L663
public ListenableFuture<?> processFor(Duration duration)
{
    Driver driver;
    synchronized (this) {
        // if close() was called before we get here, there's no point in even creating the driver
        if (closed) {
            return Futures.immediateFuture(null);
        }
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }
        driver = this.driver;
    }
    return driver.processFor(duration);
}

DriverSplitRunner creates a Driver (on first use) and calls its processFor.


// presto-main/src/main/java/com/facebook/presto/operator/Driver.java:L261
public ListenableFuture<?> processFor(Duration duration)
{
    checkLockNotHeld("Can not process for a duration while holding the driver lock");
    checkNotNull(duration, "duration is null");
    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
    long start = System.nanoTime();
    do {
        ListenableFuture<?> future = process();
        if (!future.isDone()) {
            return future;
        }
    }
    while (System.nanoTime() - start < maxRuntime && !isFinished());
    return NOT_BLOCKED;
}

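After a bit of bookkeeping, Driver.processFor simply calls Driver.process(), repeating until the driver blocks, finishes, or uses up its time quantum.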
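The time-sliced do/while loop can be tried out with a fake clock. This sketch makes several assumptions: `Work` is a hypothetical stand-in for the Driver, the clock is injected as a `LongSupplier` instead of calling `System.nanoTime()` directly so the behaviour is deterministic, and the early return on a blocked future is left out.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the quantum-bounded loop in Driver.processFor.
final class ProcessForSketch
{
    // Stand-in for a driver: each process() call consumes one unit of work.
    static final class Work
    {
        int remaining;

        Work(int remaining)
        {
            this.remaining = remaining;
        }

        void process()
        {
            if (remaining > 0) {
                remaining--;
            }
        }

        boolean isFinished()
        {
            return remaining == 0;
        }
    }

    // Runs process() at least once, then continues while time is left and
    // the work is unfinished. Returns the number of process() calls made.
    static int processFor(Work work, long maxRuntimeNanos, LongSupplier nanoClock)
    {
        long start = nanoClock.getAsLong();
        int iterations = 0;
        do {
            work.process();
            iterations++;
        }
        while (nanoClock.getAsLong() - start < maxRuntimeNanos && !work.isFinished());
        return iterations;
    }

    public static void main(String[] args)
    {
        Work work = new Work(1_000_000);
        int iterations = processFor(work, 1_000_000L, System::nanoTime); // 1 ms quantum
        System.out.println("process() calls in one quantum: " + iterations);
    }
}
```

Because the condition is checked after the body, even a finished or out-of-time driver gets one process() call per invocation, matching the do/while in the quoted method.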


// presto-main/src/main/java/com/facebook/presto/operator/Driver.java:L281
public ListenableFuture<?> process()
{
    ...
            driverContext.start();
            if (!newSources.isEmpty()) {
                processNewSources();
            }
            for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
                // check if current operator is blocked
                Operator current = operators.get(i);
                ListenableFuture<?> blocked = current.isBlocked();
                if (!blocked.isDone()) {
                    current.getOperatorContext().recordBlocked(blocked);
                    return blocked;
                }
                // check if next operator is blocked
                Operator next = operators.get(i + 1);
                blocked = next.isBlocked();
                if (!blocked.isDone()) {
                    next.getOperatorContext().recordBlocked(blocked);
                    return blocked;
                }
                // if current operator is finished...
                if (current.isFinished()) {
                    // let next operator know there will be no more data
                    next.getOperatorContext().startIntervalTimer();
                    next.finish();
                    next.getOperatorContext().recordFinish();
                }
                else {
                    // if next operator needs input...
                    if (next.needsInput()) {
                        // get an output page from current operator
                        current.getOperatorContext().startIntervalTimer();
                        Page page = current.getOutput();
                        current.getOperatorContext().recordGetOutput(page);
                        // if we got an output page, add it to the next operator
                        if (page != null) {
                            next.getOperatorContext().startIntervalTimer();
                            next.addInput(page);
                            next.getOperatorContext().recordAddInput(page);
                        }
                    }
                }
            }
            return NOT_BLOCKED;
    ...
    }
}

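The physical operators are run in order: each pass looks at adjacent operator pairs and moves a page from the current operator to the next one.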
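The pairwise loop can be exercised with toy operators. The sketch below is a cut-down illustration, not Presto's classes: `Op` is a hypothetical reduction of the Operator interface, "pages" are plain `Integer` values, and blocking, timers, and stats recording are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the operator pipeline loop in Driver.process().
final class DriverLoopSketch
{
    interface Op
    {
        boolean needsInput();
        void addInput(Integer page);
        Integer getOutput(); // null when nothing is ready
        void finish();
        boolean isFinished();
    }

    // Emits a fixed list of pages and never accepts input.
    static final class Source implements Op
    {
        private final Deque<Integer> pages;
        Source(List<Integer> pages) { this.pages = new ArrayDeque<>(pages); }
        public boolean needsInput() { return false; }
        public void addInput(Integer page) { throw new UnsupportedOperationException(); }
        public Integer getOutput() { return pages.poll(); }
        public void finish() { pages.clear(); }
        public boolean isFinished() { return pages.isEmpty(); }
    }

    // Holds one page at a time and emits it doubled.
    static final class Doubler implements Op
    {
        private Integer buffered;
        private boolean finishing;
        public boolean needsInput() { return !finishing && buffered == null; }
        public void addInput(Integer page) { buffered = page * 2; }
        public Integer getOutput() { Integer out = buffered; buffered = null; return out; }
        public void finish() { finishing = true; }
        public boolean isFinished() { return finishing && buffered == null; }
    }

    // Collects every page it receives.
    static final class Sink implements Op
    {
        final List<Integer> received = new ArrayList<>();
        private boolean finished;
        public boolean needsInput() { return !finished; }
        public void addInput(Integer page) { received.add(page); }
        public Integer getOutput() { return null; }
        public void finish() { finished = true; }
        public boolean isFinished() { return finished; }
    }

    // One pass over adjacent pairs: propagate finish, or move one page forward.
    static void processOnce(List<Op> operators)
    {
        for (int i = 0; i < operators.size() - 1; i++) {
            Op current = operators.get(i);
            Op next = operators.get(i + 1);
            if (current.isFinished()) {
                next.finish();
            }
            else if (next.needsInput()) {
                Integer page = current.getOutput();
                if (page != null) {
                    next.addInput(page);
                }
            }
        }
    }

    static List<Integer> run(List<Integer> input)
    {
        Sink sink = new Sink();
        List<Op> operators = Arrays.asList(new Source(input), new Doubler(), sink);
        while (!sink.isFinished()) {
            processOnce(operators);
        }
        return sink.received;
    }

    public static void main(String[] args)
    {
        System.out.println(run(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```

Note that finish() is called on the next operator in every pass once the current one is finished, so operators in this sketch treat finish() as idempotent.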

Operator

// presto-main/src/main/java/com/facebook/presto/operator/Operator.java:L22
public interface Operator
{
    ListenableFuture<?> NOT_BLOCKED = Futures.immediateFuture(null);

    OperatorContext getOperatorContext();

    List<Type> getTypes();

    void finish();

    boolean isFinished();

    ListenableFuture<?> isBlocked();

    boolean needsInput();

    void addInput(Page page);

    Page getOutput();
}

@danieltahara

This is very useful. Thank you for writing it up.