This document was created for Presto Source Code Reading #1.
- Target: trunk code as of Feb 10th, 2014
- Main topic: Coordinator and Executor (./presto-main/src/main/java/com/facebook/presto/execution)
- Slides: http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto/10

NOTE
- All source code quoted here comes from https://github.com/facebook/presto and is licensed under https://github.com/facebook/presto/blob/master/LICENSE.
- This document may contain mistakes. Pull requests to fix them are always welcome :-)
Prerequisites
- Google Guice (dependency injection utility)
- Google Guava (ListenableFuture, argument-check utilities, etc.) -- see the short primer after this list
- Java 7
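Guava's Preconditions checks and ListenableFuture appear throughout the code below, so here is a minimal, self-contained primer. Everything in it (class name, executor setup, values) is illustrative and not taken from Presto:

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class GuavaPrimer
{
    public static void main(String[] args)
    {
        // argument checks, as used in QueryResource#createQuery() below
        final String query = "SELECT 1";
        Preconditions.checkNotNull(query, "query is null");
        Preconditions.checkArgument(!query.isEmpty(), "query is empty");

        // ListenableFuture: a Future to which completion callbacks can be attached
        ListeningExecutorService executor =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        ListenableFuture<String> future = executor.submit(new Callable<String>()
        {
            @Override
            public String call()
            {
                return "done: " + query;
            }
        });
        Futures.addCallback(future, new FutureCallback<String>()
        {
            @Override
            public void onSuccess(String result)
            {
                System.out.println(result);
            }

            @Override
            public void onFailure(Throwable cause)
            {
                cause.printStackTrace();
            }
        }, MoreExecutors.directExecutor());
        executor.shutdown();
    }
}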
Query execution flow on the coordinator:
- HTTP POST request to "/v1/query" (handled by QueryResource#createQuery())
- SqlQueryManager#createQuery()
- SqlQueryExecution#start()

SqlQueryManager#createQuery()
@Path("/v1/query")
public class QueryResource {
private final QueryManager queryManager;
...
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response createQuery(String query,
@HeaderParam(PRESTO_USER) String user,
@HeaderParam(PRESTO_SOURCE) String source,
@HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog,
@HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema,
@HeaderParam(USER_AGENT) String userAgent,
@Context HttpServletRequest requestContext,
@Context UriInfo uriInfo)
{
checkNotNull(query, "query is null");
checkArgument(!query.isEmpty(), "query is empty");
checkNotNull(catalog, "catalog is null");
checkNotNull(schema, "schema is null");
String remoteUserAddress = requestContext.getRemoteAddr();
QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query); // Dive into this code!
URI pagesUri = uriBuilderFrom(uriInfo.getRequestUri()).appendPath(queryInfo.getQueryId().toString()).build();
return Response.created(pagesUri).entity(queryInfo).build();
}
...
}
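For reference, the SQL text is POSTed as the raw request body, with user/catalog/schema passed as headers. A minimal client sketch follows; the host, port, and values are made up, and the X-Presto-* header names correspond to the PRESTO_USER / PRESTO_CATALOG / PRESTO_SCHEMA constants used above:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SubmitQueryExample
{
    public static void main(String[] args)
            throws Exception
    {
        // assumes a coordinator listening on localhost:8080
        URL url = new URL("http://localhost:8080/v1/query");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("X-Presto-User", "test");
        connection.setRequestProperty("X-Presto-Catalog", "hive");
        connection.setRequestProperty("X-Presto-Schema", "default");

        byte[] body = "SELECT * FROM orders LIMIT 10".getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body);
        }

        // expect 201 Created; the Location header points at the created query resource
        System.out.println(connection.getResponseCode() + " " + connection.getHeaderField("Location"));
    }
}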
@ThreadSafe
public class SqlQueryManager
implements QueryManager
{
...
@Override
public QueryInfo createQuery(Session session, String query)
{
checkNotNull(query, "query is null");
Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");
QueryId queryId = queryIdGenerator.createNextQueryId();
Statement statement;
// assume the returned Statement is a Query (i.e. a SELECT statement)
statement = SqlParser.createStatement(query);
...
// executionFactories is initialized by Guice (CoordinatorModule#setup()); see the binding sketch after this class
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement);
// start the query in the background
queryExecutor.submit(new QueryStarter(queryExecution, stats));
// -> call queryExecution#start()
...
return queryExecution.getQueryInfo();
}
...
}
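The comment in createQuery() above says executionFactories is populated by Guice in CoordinatorModule#setup(). The essential pattern is a MapBinder from Statement subclasses to QueryExecutionFactory instances, so that SqlQueryManager can look a factory up with statement.getClass(). Below is a self-contained sketch of that pattern; the nested stand-in types are placeholders so the sketch compiles on its own, not Presto's real classes:

import com.google.inject.AbstractModule;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;

public class ExecutionFactoryBindingSketch
        extends AbstractModule
{
    // stand-ins for Presto's Statement / Query / QueryExecutionFactory types
    interface Statement {}
    static class Query implements Statement {}
    interface QueryExecutionFactory<T> {}
    static class SqlQueryExecutionFactory implements QueryExecutionFactory<Object> {}

    @Override
    protected void configure()
    {
        // map each Statement subclass to the factory that can execute it;
        // SqlQueryManager later calls executionFactories.get(statement.getClass())
        MapBinder<Class<? extends Statement>, QueryExecutionFactory<?>> executionBinder =
                MapBinder.newMapBinder(binder(),
                        new TypeLiteral<Class<? extends Statement>>() {},
                        new TypeLiteral<QueryExecutionFactory<?>>() {});
        executionBinder.addBinding(Query.class).to(SqlQueryExecutionFactory.class);
    }
}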
@ThreadSafe
public class SqlQueryExecution
implements QueryExecution
{
...
@Override
public void start()
{
try (SetThreadName setThreadName = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try {
// transition to planning
if (!stateMachine.beginPlanning()) {
// query already started or finished
return;
}
// analyze query
SubPlan subplan = analyzeQuery();
// plan distribution of query: initialize outputStage
planDistribution(subplan);
// transition to starting
if (!stateMachine.starting()) {
// query already started or finished
return;
}
// if query is not finished, start the stage, otherwise cancel it
SqlStageExecution stage = outputStage.get();
if (!stateMachine.isDone()) {
stage.start();
}
else {
stage.cancel(true);
}
}
catch (Throwable e) {
fail(e);
Throwables.propagateIfInstanceOf(e, Error.class);
}
}
}
....
}
SQL (String) -- [SqlParser] --> Query -- [SqlQueryExecution#analyzeQuery()] --> SubPlan -- [SqlQueryExecution#planDistribution()] --> SqlStageExecution
SubPlan: the logical plan
- Tree structure (a traversal sketch follows)
  - fragment: the body of the node
  - children: the child nodes of the node
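Since SubPlan is just a tree of plan fragments, it can be walked with a simple recursion. A hypothetical helper, for illustration only; it assumes the getFragment()/getChildren()/getId() accessors and is not part of Presto:

// hypothetical helper: dump the fragment tree depth-first
static void printSubPlan(SubPlan node, String indent)
{
    // fragment: the body of this node
    System.out.println(indent + "fragment " + node.getFragment().getId());
    // children: recurse into the child nodes
    for (SubPlan child : node.getChildren()) {
        printSubPlan(child, indent + "  ");
    }
}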
SqlStageExecution:
- Selects the nodes that will run RemoteTasks, using NodeScheduler
- Wraps an ExecutorService, which runs the RemoteTasks
  - The ExecutorService launches HttpRemoteTasks via createRemoteTask()
- HttpRemoteTask#start(): HTTP request -> Worker
  - On the worker: TaskResource#createOrUpdateTask() -> SqlTaskManager#updateTask() -> SqlTaskExecution.createSqlTaskExecution()
- SqlTaskExecution:
  - LocalExecutionPlanner#plan(): transforms the logical plan into the physical plan
  - Driver#process(): runs the physical operators (see the sketch after this list)
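Driver#process() itself is not quoted in this document. Conceptually, a driver moves Pages through a pipeline of physical operators produced by LocalExecutionPlanner. The following is a deliberately simplified, self-contained sketch of that loop; the stand-in Operator/Page types only mirror the shape of Presto's interfaces, and the real Driver also handles blocking, time slicing, and memory accounting:

import java.util.List;

public class DriverLoopSketch
{
    // stand-ins so the sketch compiles on its own; Presto's real interfaces are richer
    interface Page {}
    interface Operator
    {
        boolean needsInput();
        void addInput(Page page);
        Page getOutput();
    }

    // one pass over the operator pipeline: pull a page from each operator
    // and push it into the next one if it is ready to accept input
    static void processOnce(List<Operator> operators)
    {
        for (int i = 0; i < operators.size() - 1; i++) {
            Operator current = operators.get(i);
            Operator next = operators.get(i + 1);
            if (next.needsInput()) {
                Page page = current.getOutput();
                if (page != null) {
                    next.addInput(page);
                }
            }
        }
    }
}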
/**
* Manages tasks on this worker node
*/
@Path("/v1/task")
public class TaskResource
{
private static final DataSize DEFAULT_MAX_SIZE = new DataSize(10, Unit.MEGABYTE);
private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(1, SECONDS);
private final TaskManager taskManager;
...
@POST
@Path("{taskId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
checkNotNull(taskUpdateRequest, "taskUpdateRequest is null");
TaskInfo taskInfo = taskManager.updateTask(taskUpdateRequest.getSession(),
taskId,
taskUpdateRequest.getFragment(),
taskUpdateRequest.getSources(),
taskUpdateRequest.getOutputIds());
return Response.ok().entity(taskInfo).build();
}
...
}
public class SqlTaskManager
implements TaskManager
{
private final TaskExecutor taskExecutor;
...
@Override
public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
URI location = locationFactory.createLocalTaskLocation(taskId);
TaskExecution taskExecution;
synchronized (this) {
taskExecution = tasks.get(taskId);
if (taskExecution == null) {
// is task already complete?
TaskInfo taskInfo = taskInfos.get(taskId);
if (taskInfo != null) {
return taskInfo;
}
taskExecution = SqlTaskExecution.createSqlTaskExecution(session,
taskId,
location,
fragment,
sources,
outputBuffers,
planner,
maxBufferSize,
taskExecutor,
taskNotificationExecutor,
maxTaskMemoryUsage,
operatorPreAllocatedMemory,
queryMonitor,
cpuTimerEnabled
);
tasks.put(taskId, taskExecution);
}
}
taskExecution.recordHeartbeat();
taskExecution.addSources(sources);
taskExecution.addResultQueue(outputBuffers);
return getTaskInfo(taskExecution, false);
}
...
}
public class SqlTaskExecution
implements TaskExecution
{
...
private SqlTaskExecution(Session session,
TaskId taskId,
URI location,
PlanFragment fragment,
OutputBuffers outputBuffers,
LocalExecutionPlanner planner,
DataSize maxBufferSize,
TaskExecutor taskExecutor,
DataSize maxTaskMemoryUsage,
DataSize operatorPreAllocatedMemory,
QueryMonitor queryMonitor,
Executor notificationExecutor,
boolean cpuTimerEnabled)
{
...
LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();
}
....
private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners)
{
// schedule driver to be executed
List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());
// record new driver
remainingDrivers.addAndGet(finishedFutures.size());
// when driver completes, update state and fire events
for (int i = 0; i < finishedFutures.size(); i++) {
ListenableFuture<?> finishedFuture = finishedFutures.get(i);
final DriverSplitRunner splitRunner = runners.get(i);
Futures.addCallback(finishedFuture, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
// record driver is finished
remainingDrivers.decrementAndGet();
checkTaskCompletion();
queryMonitor.splitCompletionEvent(taskId, splitRunner.getDriverContext().getDriverStats());
}
}
@Override
public void onFailure(Throwable cause)
{
try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
taskStateMachine.failed(cause);
// record driver is finished
remainingDrivers.decrementAndGet();
DriverContext driverContext = splitRunner.getDriverContext();
DriverStats driverStats;
if (driverContext != null) {
driverStats = driverContext.getDriverStats();
}
else {
// split runner did not start successfully
driverStats = new DriverStats();
}
// fire failed event with cause
queryMonitor.splitFailedEvent(taskId, driverStats, cause);
}
}
}, notificationExecutor);
}
}
...
//
// This code registers a callback with access to this class, and the
// callback is accessed from another thread, so this code cannot be placed in the constructor
private void start()
{
// start unpartitioned drivers
List<DriverSplitRunner> runners = new ArrayList<>();
for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
runners.add(driverFactory.createDriverRunner(null));
driverFactory.setNoMoreSplits();
}
enqueueDrivers(true, runners);
}
...
}
@ThreadSafe
public class TaskExecutor
{
...
private class Runner
implements Runnable
{
private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
final PrioritizedSplitRunner split;
try {
split = pendingSplits.take();
if (split.updatePriorityLevel()) {
// priority level changed, return split to queue for re-prioritization
pendingSplits.put(split);
continue;
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
try (SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId())) {
runningSplits.add(split);
boolean finished;
ListenableFuture<?> blocked;
try {
blocked = split.process();
finished = split.isFinished();
}
finally {
runningSplits.remove(split);
}
if (finished) {
log.debug("%s is finished", split.getInfo());
splitFinished(split);
}
else {
if (blocked.isDone()) {
pendingSplits.put(split);
}
else {
blockedSplits.add(split);
blocked.addListener(new Runnable()
{
@Override
public void run()
{
blockedSplits.remove(split);
split.updatePriorityLevel();
pendingSplits.put(split);
}
}, executor);
}
}
}
catch (Throwable t) {
log.error(t, "Error processing %s", split.getInfo());
splitFinished(split);
}
}
}
finally {
// unless we have been closed, we need to replace this thread
if (!closed) {
addRunnerThread();
}
}
}
}
...
}
- Multi-tenancy is ensured by the priority queue: splits belonging to tasks that have already consumed more thread time are moved to a lower priority level (see PrioritizedSplitRunner below and the sketch at the end of this document)
private static class PrioritizedSplitRunner
implements Comparable<PrioritizedSplitRunner>
{
...
public ListenableFuture<?> process()
throws Exception
{
try {
start.compareAndSet(0, System.currentTimeMillis());
processCalls.incrementAndGet();
CpuTimer timer = new CpuTimer();
ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();
// update priority level based on the total thread usage of the task
long durationNanos = elapsed.getWall().roundTo(TimeUnit.NANOSECONDS);
long threadUsageNanos = taskHandle.addThreadUsageNanos(durationNanos);
this.threadUsageNanos.set(threadUsageNanos);
priorityLevel.set(calculatePriorityLevel(threadUsageNanos));
// record last run for prioritization within a level
lastRun.set(ticker.read());
cpuTime.addAndGet(elapsed.getCpu().roundTo(TimeUnit.NANOSECONDS));
return blocked;
}
catch (Throwable e) {
finishedFuture.setException(e);
throw e;
}
}
...
}
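calculatePriorityLevel() and the Comparable implementation of PrioritizedSplitRunner are omitted above. The idea is that splits whose task has accumulated more thread time sink to a lower priority level, and splits on the same level are ordered by how recently they last ran. A minimal sketch of that scheme, meant to sit inside PrioritizedSplitRunner; the thresholds below are illustrative assumptions, not Presto's actual values:

// illustrative only: bucket the accumulated thread usage into coarse priority levels
static int calculatePriorityLevel(long threadUsageNanos)
{
    long seconds = TimeUnit.NANOSECONDS.toSeconds(threadUsageNanos);
    if (seconds < 1) {
        return 0;
    }
    if (seconds < 10) {
        return 1;
    }
    if (seconds < 60) {
        return 2;
    }
    return 3;
}

// illustrative ordering for the pendingSplits queue:
// lower priority level first, then the split that ran least recently
@Override
public int compareTo(PrioritizedSplitRunner other)
{
    int result = Integer.compare(priorityLevel.get(), other.priorityLevel.get());
    if (result != 0) {
        return result;
    }
    return Long.compare(lastRun.get(), other.lastRun.get());
}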