Skip to content

Instantly share code, notes, and snippets.

@oza
Last active December 10, 2019 22:43
Show Gist options
  • Save oza/8912147 to your computer and use it in GitHub Desktop.
Save oza/8912147 to your computer and use it in GitHub Desktop.

Presto source code reading #1

Overview of Coordinator's code path

  • HTTP Post request to "/v1/query"
    • SqlQueryManager#createQuery()
      • SqlQueryExecution#start()
/**
 * REST resource bound to {@code /v1/query}; the coordinator's entry point for
 * submitting a new query.
 */
@Path("/v1/query")
public class QueryResource {
    private final QueryManager queryManager;
    ...
    /**
     * Handles a POST carrying the SQL text as request body and hands it to the
     * {@link QueryManager}.
     *
     * @param query SQL text; must be non-null and non-empty
     * @param user value of the {@code PRESTO_USER} header
     * @param source value of the {@code PRESTO_SOURCE} header
     * @param catalog value of the {@code PRESTO_CATALOG} header, or {@code DEFAULT_CATALOG} when absent
     * @param schema value of the {@code PRESTO_SCHEMA} header, or {@code DEFAULT_SCHEMA} when absent
     * @param userAgent value of the {@code USER_AGENT} header
     * @param requestContext servlet request, used only to read the remote address
     * @param uriInfo request URI information, used to build the location of the new query
     * @return a response built with {@code Response.created(...)} whose location is the request
     *         URI with the query id appended and whose entity is the new {@link QueryInfo}
     */
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    public Response createQuery(String query,
            @HeaderParam(PRESTO_USER) String user,
            @HeaderParam(PRESTO_SOURCE) String source,
            @HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog,
            @HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema,
            @HeaderParam(USER_AGENT) String userAgent,
            @Context HttpServletRequest requestContext,
            @Context UriInfo uriInfo)
    {
        // reject malformed requests before touching the query manager
        checkNotNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query is empty");
        checkNotNull(catalog, "catalog is null");
        checkNotNull(schema, "schema is null");

        // bundle the caller's identity and defaults into a session
        Session session = new Session(user, source, catalog, schema, requestContext.getRemoteAddr(), userAgent);
        QueryInfo createdQuery = queryManager.createQuery(session, query); // Dive into this code!

        // location of the new query: <request URI>/<query id>
        URI queryLocation = uriBuilderFrom(uriInfo.getRequestUri())
                .appendPath(createdQuery.getQueryId().toString())
                .build();
        return Response.created(queryLocation)
                .entity(createdQuery)
                .build();
    }
    ...
}
/**
 * {@link QueryManager} implementation; this excerpt shows only query creation.
 * Lines consisting of {@code ...} mark code omitted from the excerpt.
 */
@ThreadSafe
public class SqlQueryManager
        implements QueryManager
{
    ...
    /**
     * Parses the SQL text, builds a {@link QueryExecution} for the resulting statement and
     * submits a {@code QueryStarter} for it to {@code queryExecutor}. The submitted work is
     * not awaited in the code shown here.
     *
     * @param session session passed through to the execution factory
     * @param query SQL text; must be non-null and non-empty
     * @return the {@link QueryInfo} reported by the newly created execution
     */
    @Override
    public QueryInfo createQuery(Session session, String query)
    {
        checkNotNull(query, "query is null");
        Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");

        // every query gets a fresh id from the generator
        QueryId queryId = queryIdGenerator.createNextQueryId();

        Statement statement;
        // NOTE(walkthrough): assume the parser returns a Query statement here
        statement = SqlParser.createStatement(query);

        ...
        // executionFactories is initialized by Guice(CoordinatorModule#setup()).
        // The factory is looked up by the concrete class of the parsed statement.
        QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
        final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement);
        // start the query in the background
        queryExecutor.submit(new QueryStarter(queryExecution, stats));
          // -> call queryExecution#start()
        ...
        return queryExecution.getQueryInfo();
    }
    ...
}
/**
 * {@link QueryExecution} for a SQL query; this excerpt shows only how the execution is started.
 */
@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    ...
    /**
     * Runs the query through planning and hands it to its output stage.
     *
     * <p>Steps, in order: transition to planning, analyze the query into a {@link SubPlan},
     * plan its distribution, transition to starting, then either start or cancel the output
     * stage depending on whether the state machine already reports the query as done.
     * Any {@link Throwable} raised on the way is passed to {@code fail}; only {@link Error}s
     * are rethrown afterwards.
     */
    @Override
    public void start()
    {
        try (SetThreadName threadName = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // move to the planning state; a refused transition means the
                // query was already started or has finished
                boolean planningBegun = stateMachine.beginPlanning();
                if (!planningBegun) {
                    return;
                }

                // analyze the query, then plan its distribution (initializes outputStage)
                SubPlan logicalPlan = analyzeQuery();
                planDistribution(logicalPlan);

                // move to the starting state; again, a refused transition means
                // the query was already started or has finished
                boolean nowStarting = stateMachine.starting();
                if (!nowStarting) {
                    return;
                }

                // the query may have finished while we were planning: cancel the
                // stage in that case, otherwise kick it off
                SqlStageExecution rootStage = outputStage.get();
                if (stateMachine.isDone()) {
                    rootStage.cancel(true);
                }
                else {
                    rootStage.start();
                }
            }
            catch (Throwable failure) {
                fail(failure);
                Throwables.propagateIfInstanceOf(failure, Error.class);
            }
        }
    }
    ....
}

SQL Transformation overview

SQL (String) -- [SqlParser] --> Query -- [SqlQueryExecution#analyzeQuery()] --> SubPlan -- [SqlQueryExecution#planDistribution()] --> SqlStageExecution

  • SubPlan: Logical Plan

    • Tree structure
      • fragment: body of the node
      • children: child node of the node
  • SqlStageExecution:

    1. Selects the nodes that execute RemoteTasks, using NodeScheduler
    2. Wrapper of ExecutorService, which runs RemoteTasks
    3. ExecutorService launches HttpRemoteTask via createRemoteTask()

Executor Overview

  • HttpRemoteTask#start() - HTTP Request -> Worker
    • TaskResource#createOrUpdateTask() -> SqlTaskManager#updateTask()
      • SqlTaskExecution.createSqlTaskExecution()
  1. SqlTaskExecution:
  2. LocalExecutionPlanner#plan(): Transforming Logical Plan into Physical Plan
  3. Driver#process():
/**
 * REST resource bound to {@code /v1/task}: manages the tasks running on this worker node.
 */
@Path("/v1/task")
public class TaskResource
{
    private static final DataSize DEFAULT_MAX_SIZE = new DataSize(10, Unit.MEGABYTE);
    private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(1, SECONDS);

    private final TaskManager taskManager;
    ...

    /**
     * Handles a POST to {@code /v1/task/{taskId}} by forwarding the contents of the
     * request body to {@link TaskManager#updateTask}.
     *
     * @param taskId id taken from the URL path
     * @param taskUpdateRequest JSON request body; must be non-null
     * @param uriInfo request URI information (not used by the code shown here)
     * @return a response built with {@code Response.ok()} carrying the resulting {@link TaskInfo}
     */
    @POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        checkNotNull(taskUpdateRequest, "taskUpdateRequest is null");

        // unpack the request and let the task manager create or update the task
        TaskInfo updatedTask = taskManager.updateTask(
                taskUpdateRequest.getSession(),
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());

        return Response.ok()
                .entity(updatedTask)
                .build();
    }
    ...
}
/**
 * {@link TaskManager} implementation; this excerpt shows only task creation/update.
 * Lines consisting of {@code ...} mark code omitted from the excerpt.
 */
public class SqlTaskManager
        implements TaskManager
{
    private final TaskExecutor taskExecutor;
    ...
    /**
     * Looks up the execution registered for {@code taskId}, creating and registering one
     * when none exists, then feeds it the given sources and output buffers.
     *
     * <p>When no execution is registered but {@code taskInfos} already holds an entry for
     * the id, that entry is returned as-is and nothing is created or updated.
     *
     * @param session session handed to a newly created execution
     * @param taskId id of the task to create or update
     * @param fragment plan fragment handed to a newly created execution
     * @param sources sources added to the execution on every call
     * @param outputBuffers output buffers added to the execution on every call
     * @return the stored {@link TaskInfo} for an id found in {@code taskInfos}, otherwise
     *         the result of {@code getTaskInfo(taskExecution, false)}
     */
    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
    {
        // computed before taking the lock; only used if a new execution is created
        URI location = locationFactory.createLocalTaskLocation(taskId);

        TaskExecution taskExecution;
        // lookup-or-create happens under this manager's monitor
        synchronized (this) {
            taskExecution = tasks.get(taskId);
            if (taskExecution == null) {
                // is task already complete?
                TaskInfo taskInfo = taskInfos.get(taskId);
                if (taskInfo != null) {
                    return taskInfo;
                }

                taskExecution = SqlTaskExecution.createSqlTaskExecution(session,
                        taskId,
                        location,
                        fragment,
                        sources,
                        outputBuffers,
                        planner,
                        maxBufferSize,
                        taskExecutor,
                        taskNotificationExecutor,
                        maxTaskMemoryUsage,
                        operatorPreAllocatedMemory,
                        queryMonitor,
                        cpuTimerEnabled
                );
                tasks.put(taskId, taskExecution);
            }
        }

        // the remaining updates run outside the synchronized section, for both
        // freshly created and pre-existing executions
        taskExecution.recordHeartbeat();
        taskExecution.addSources(sources);
        taskExecution.addResultQueue(outputBuffers);

        // NOTE(review): meaning of the 'false' flag is not visible in this excerpt
        return getTaskInfo(taskExecution, false);
    }
    ...
}
/**
 * {@link TaskExecution} implementation; this excerpt shows how the local plan is built
 * and how drivers are enqueued and tracked. Lines consisting of {@code ...} (or
 * {@code ....}) mark code omitted from the excerpt.
 */
public class SqlTaskExecution
        implements TaskExecution
{
    ...
    /**
     * Private constructor; most of the body is omitted from this excerpt. The part shown
     * asks the planner for a {@link LocalExecutionPlan} of the fragment and reads its
     * driver factories.
     */
    private SqlTaskExecution(Session session,
            TaskId taskId,
            URI location,
            PlanFragment fragment,
            OutputBuffers outputBuffers,
            LocalExecutionPlanner planner,
            DataSize maxBufferSize,
            TaskExecutor taskExecutor,
            DataSize maxTaskMemoryUsage,
            DataSize operatorPreAllocatedMemory,
            QueryMonitor queryMonitor,
            Executor notificationExecutor,
            boolean cpuTimerEnabled)
    {
    ...
        
            // plan the fragment locally; the plan exposes the driver factories used to create drivers
            LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
            List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();
    }
    ....


    /**
     * Hands the given runners to the task executor and attaches a completion callback to
     * each returned future. The callbacks are registered with {@code notificationExecutor}.
     *
     * @param forceRunSplit passed through unchanged to {@code taskExecutor.enqueueSplits}
     * @param runners runners to enqueue; one future is expected back per runner
     */
    private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners)
    {
        // schedule driver to be executed
        List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
        checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());

        // record new driver
        remainingDrivers.addAndGet(finishedFutures.size());

        // when driver completes, update state and fire events
        // (futures and runners are paired by index)
        for (int i = 0; i < finishedFutures.size(); i++) {
            ListenableFuture<?> finishedFuture = finishedFutures.get(i);
            final DriverSplitRunner splitRunner = runners.get(i);
            Futures.addCallback(finishedFuture, new FutureCallback<Object>()
            {
                @Override
                public void onSuccess(Object result)
                {
                    try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
                        // record driver is finished
                        remainingDrivers.decrementAndGet();

                        checkTaskCompletion();

                        queryMonitor.splitCompletionEvent(taskId, splitRunner.getDriverContext().getDriverStats());
                    }
                }

                @Override
                public void onFailure(Throwable cause)
                {
                    try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
                        // the task state machine is told about the failure before the counter is updated
                        taskStateMachine.failed(cause);

                        // record driver is finished
                        remainingDrivers.decrementAndGet();

                        DriverContext driverContext = splitRunner.getDriverContext();
                        DriverStats driverStats;
                        if (driverContext != null) {
                            driverStats = driverContext.getDriverStats();
                        }
                        else {
                            // split runner did not start successfully
                            driverStats = new DriverStats();
                        }

                        // fire failed event with cause
                        queryMonitor.splitFailedEvent(taskId, driverStats, cause);
                    }
                }
            }, notificationExecutor);
        }
    }
    ...

    //
    // This code registers a callback that has access to this instance, and that
    // callback is invoked from another thread, so this code cannot be placed in the constructor
    /**
     * Creates one runner per unpartitioned driver factory, marks each factory as having
     * no more splits, and enqueues all runners with {@code forceRunSplit} set to true.
     */
    private void start()
    {
        // start unpartitioned drivers
        List<DriverSplitRunner> runners = new ArrayList<>();
        for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
            // NOTE(review): the null argument presumably means "no split" — confirm against createDriverRunner
            runners.add(driverFactory.createDriverRunner(null));
            driverFactory.setNoMoreSplits();
        }
        enqueueDrivers(true, runners);
    }
    ...
}
/**
 * Executes splits on runner threads; this excerpt shows only the runner loop.
 * Lines consisting of {@code ...} mark code omitted from the excerpt.
 */
@ThreadSafe
public class TaskExecutor
{
    ...

    /**
     * Loop body of one runner thread: repeatedly takes a split from {@code pendingSplits},
     * lets it process once, and then finishes, re-queues, or parks it depending on the outcome.
     */
    private class Runner
            implements Runnable
    {
        private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

        /**
         * Runs until the executor is closed or the thread is interrupted. If the loop exits
         * while the executor is not closed, a replacement runner thread is added.
         */
        @Override
        public void run()
        {
            try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
                while (!closed && !Thread.currentThread().isInterrupted()) {
                    // select next worker
                    final PrioritizedSplitRunner split;
                    try {
                        split = pendingSplits.take();
                        if (split.updatePriorityLevel()) {
                            // priority level changed, return split to queue for re-prioritization
                            pendingSplits.put(split);
                            continue;
                        }
                    }
                    catch (InterruptedException e) {
                        // restore the interrupt flag and stop this runner
                        Thread.currentThread().interrupt();
                        return;
                    }

                    try (SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId())) {
                        // the split is tracked in runningSplits only while process() is executing
                        runningSplits.add(split);

                        boolean finished;
                        ListenableFuture<?> blocked;
                        try {
                            blocked = split.process();
                            finished = split.isFinished();
                        }
                        finally {
                            runningSplits.remove(split);
                        }

                        if (finished) {
                            log.debug("%s is finished", split.getInfo());
                            splitFinished(split);
                        }
                        else {
                            if (blocked.isDone()) {
                                // not blocked: make the split immediately eligible to run again
                                pendingSplits.put(split);
                            }
                            else {
                                // blocked: park the split until its future completes, then
                                // refresh its priority and return it to the pending queue
                                blockedSplits.add(split);
                                blocked.addListener(new Runnable()
                                {
                                    @Override
                                    public void run()
                                    {
                                        blockedSplits.remove(split);
                                        split.updatePriorityLevel();
                                        pendingSplits.put(split);
                                    }
                                }, executor);
                            }
                        }
                    }
                    catch (Throwable t) {
                        // a failing split is logged and treated as finished; the loop continues
                        log.error(t, "Error processing %s", split.getInfo());
                        splitFinished(split);
                    }
                }
            }
            finally {
                // unless we have been closed, we need to replace this thread
                if (!closed) {
                    addRunnerThread();
                }
            }
        }
    }
    ...
}

MultiTenant feature

  • Multi-tenancy is ensured by a priority queue
    /**
     * A split runner that can be ordered against other runners; this excerpt shows only
     * how one processing quantum is executed and accounted for.
     */
    private static class PrioritizedSplitRunner
            implements Comparable<PrioritizedSplitRunner>
    {
        ...

        /**
         * Runs the wrapped split for one quantum ({@code SPLIT_RUN_QUANTA}) and updates the
         * bookkeeping used for prioritization: thread usage, priority level, last-run tick
         * and accumulated CPU time.
         *
         * @return the future returned by the split's {@code processFor} call
         * @throws Exception anything thrown while processing; it is first recorded on
         *         {@code finishedFuture} and then rethrown unchanged
         */
        public ListenableFuture<?> process()
                throws Exception
        {
            try {
                // only takes effect while 'start' is still 0, i.e. on the first call
                start.compareAndSet(0, System.currentTimeMillis());
                processCalls.incrementAndGet();

                // run one quantum and measure what it cost
                CpuTimer quantumTimer = new CpuTimer();
                ListenableFuture<?> blockedFuture = split.processFor(SPLIT_RUN_QUANTA);
                CpuTimer.CpuDuration quantumCost = quantumTimer.elapsedTime();

                // update priority level based on total thread usage of the task
                long wallNanos = quantumCost.getWall().roundTo(TimeUnit.NANOSECONDS);
                long taskUsageNanos = taskHandle.addThreadUsageNanos(wallNanos);
                threadUsageNanos.set(taskUsageNanos);
                priorityLevel.set(calculatePriorityLevel(taskUsageNanos));

                // record last run for prioritization within a level
                lastRun.set(ticker.read());

                long cpuNanos = quantumCost.getCpu().roundTo(TimeUnit.NANOSECONDS);
                cpuTime.addAndGet(cpuNanos);
                return blockedFuture;
            }
            catch (Throwable e) {
                finishedFuture.setException(e);
                throw e;
            }
        }
        ...
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment