http://hadoop-scr13th.eventbrite.com/
- github repository
- No SpringSource!
- For low latency compared to MapReduce
- Mainly data analytics
- Join and Sort are the bomb!
- Java Frontend and C++ Backend
- Impala Q&A
1. User sends a HiveQL query to Impalad using the Thrift API
2. The Frontend Planner generates a Query Plan Tree using meta information
3. The Backend Coordinator sends an execution request to each query execution engine
4. A Backend query execution engine executes a query fragment, e.g. HDFS Scan, Aggregation, Merge, etc.
a. The State Store notifies Impalad when the cluster state changes
This presentation focuses on steps 3 and a.
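The request flow above, as a very rough sketch. All names here (QueryPlanTree, PlanQuery, DispatchFragments) are simplified stand-ins for illustration, not the real Impala classes:

```cpp
#include <string>
#include <vector>

// Very rough model of the flow above; names are simplified stand-ins.
struct QueryPlanTree { std::vector<std::string> fragments; };

// Step 2: the Frontend Planner turns HiveQL into a Query Plan Tree.
QueryPlanTree PlanQuery(const std::string& hiveql) {
  (void)hiveql;  // a real planner consults table metadata here
  return QueryPlanTree{{"HDFS Scan", "Aggregation"}};
}

// Steps 3 and 4: the Coordinator hands each fragment to a query
// execution engine, which runs it and streams results back.
int DispatchFragments(const QueryPlanTree& plan) {
  int dispatched = 0;
  for (const auto& fragment : plan.fragments) {
    (void)fragment;  // in reality, sent over RPC to a backend
    ++dispatched;
  }
  return dispatched;
}
```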
- ImpalaServer#query (Async API)
- ImpalaServer#Execute with QueryExecState
- ImpalaServer#ExecuteInternal
- Call ImpalaServer#GetExecRequest to create a TExecRequest via the FE
- Register exec_state with the query id
- After calling QueryExecState#Exec, stores the query locations of the running hosts (map<Host, QueryId>)
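The "register exec_state with the query id" bookkeeping could look roughly like this; ExecStateRegistry and the simplified QueryId/QueryExecState types are assumptions for illustration, not the real Impala code:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Simplified stand-ins for the real Impala types.
using QueryId = std::string;
struct QueryExecState { bool cancelled = false; };

class ExecStateRegistry {
 public:
  // Called from ExecuteInternal once the TExecRequest is built.
  void Register(const QueryId& id, std::shared_ptr<QueryExecState> state) {
    std::lock_guard<std::mutex> l(lock_);
    states_[id] = std::move(state);
  }
  // Fetch, Cancel and the other APIs look the state up by query id later.
  std::shared_ptr<QueryExecState> Lookup(const QueryId& id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = states_.find(id);
    return it == states_.end() ? nullptr : it->second;
  }
 private:
  std::mutex lock_;
  std::map<QueryId, std::shared_ptr<QueryExecState>> states_;
};
```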
- QueryExecState#Exec
- Checks whether Impala uses a Coordinator or not
- Call Coordinator#Exec
- Call QueryExecState#PrepareSelectListExprs. In this phase, Impala doesn't use LLVM Codegen.
- Coordinator#Exec
- Call Coordinator#ComputeFragmentExecParams
- Use Coordinator#ComputeFragmentHosts to compute the hosts that execute each fragment. This function considers data location (SimpleScheduler#GetHosts), input fragment location (exchange node) and coordinator location (single node).
- Call Coordinator#ComputeScanRangeAssignment
- Generates vector<map<host, map<node_id, vector<scan_range_params>>>>. The vector's index is the fragment id.
- Call PlanFragmentExecutor#Prepare(ask oza_x86 for more details).
- Setup profiles and create BackendExecState for each fragment.
- Execute Coordinator#ExecRemoteFragment using ParallelExecutor::Exec
- ExecRemoteFragment calls ImpaladServer#ExecPlanFragment using a BE client
- See ImpalaServer#ExecPlanFragment
- Invoke a thread for each fragment
- If a fragment execution fails, the Coordinator cancels the other fragments. No recovery.
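The nested scan-range assignment container mentioned above could be sketched like this; ScanRangeParams and AddRange are simplified stand-ins, not the real Thrift types:

```cpp
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for the real Thrift scan-range struct.
struct ScanRangeParams { std::string path; long offset; long length; };

// The structure described above: the outer vector is indexed by
// fragment id, then keyed by host, then by plan-node id.
using PerNodeRanges = std::map<int, std::vector<ScanRangeParams>>;
using PerHostAssignment = std::map<std::string, PerNodeRanges>;
using ScanRangeAssignment = std::vector<PerHostAssignment>;

// Assign one scan range to (fragment, host, node);
// ComputeScanRangeAssignment fills this in for every range of the query.
void AddRange(ScanRangeAssignment* a, int fragment_id, const std::string& host,
              int node_id, const ScanRangeParams& range) {
  if (a->size() <= static_cast<size_t>(fragment_id)) a->resize(fragment_id + 1);
  (*a)[fragment_id][host][node_id].push_back(range);
}
```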
- SimpleScheduler#GetHosts
- All running impalads are listed in map<host, list<port>>. This function checks each ScanNode's location against this map for data locality.
- The State Store shares the set of running Impalads via the SubscriptionManager
- ParallelExecutor#Exec
- Executes multiple functions in parallel, one thread each
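Combining the two points above (thread-per-fragment fan-out, cancel-everything-on-failure), a minimal sketch might look like this; ExecAll is an assumed name and a simplified model, not the real ParallelExecutor:

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Run every fragment on its own thread. If any fragment fails, set the
// shared cancel flag so the others can stop early; the whole query fails.
// No recovery, matching the behaviour described in the notes.
bool ExecAll(
    const std::vector<std::function<bool(const std::atomic<bool>&)>>& fragments) {
  std::atomic<bool> cancelled{false};
  std::vector<std::thread> threads;
  for (const auto& frag : fragments) {
    threads.emplace_back([&cancelled, &frag] {
      // Each fragment receives the cancel flag so it can poll it.
      if (!frag(cancelled)) cancelled = true;
    });
  }
  for (auto& t : threads) t.join();
  return !cancelled;  // any failure fails the whole query
}
```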
$ be/src/service/impala-server.{h,cc}
- Provide Async API
- Construct a TExecRequest via FE
- TExecRequest has Query Plan Tree
- Store query id and its state
- Fetch, Cancel and other APIs use those states
- One instance per request
- Capture all states for request execution
- Convert the Coordinator's result (RowBatch) to the client result
$ be/src/runtime/coordinator.{h,cc}
- Used by ImpalaServer for query execution
- Called in QueryExecState#Exec
- Use SimpleScheduler for data locality
- Call GetHosts in Coordinator#Exec
- ImpalaServer gets query result via Coordinator in distributed environment
$ fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
$ fe/src/main/java/com/cloudera/impala/planner/Planner.java
- The UNPARTITIONED type is a single-node fragment (Merge, Sort and others), and a single-node fragment runs on the coordinator host
- Non-UNPARTITIONED fragments are ScanNode, partitioned AggregationNode and others. Those fragments are executed on each host
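The placement rule above can be condensed into a small decision function; FragmentHosts and PartitionType are simplified stand-ins for illustration, not the real planner types:

```cpp
#include <string>
#include <vector>

enum class PartitionType { UNPARTITIONED, PARTITIONED };

// UNPARTITIONED fragments (Merge, Sort, ...) run only on the coordinator
// host; partitioned fragments (scans, partitioned aggregations, ...) get
// one instance per candidate host.
std::vector<std::string> FragmentHosts(
    PartitionType type, const std::string& coordinator,
    const std::vector<std::string>& data_hosts) {
  if (type == PartitionType::UNPARTITIONED) return {coordinator};
  return data_hosts;
}
```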
$ be/src/sparrow/simple-scheduler.{h, cc}
- Data locality and round robin approaches
- If the data location matches a host running Impalad, use that host. Otherwise, use round robin
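The locality-then-round-robin policy above could be sketched as follows; SimpleSchedulerSketch is an assumed, simplified model (the real scheduler also tracks ports per host and updates its backend list from the state store):

```cpp
#include <string>
#include <vector>

class SimpleSchedulerSketch {
 public:
  explicit SimpleSchedulerSketch(std::vector<std::string> backends)
      : backends_(std::move(backends)) {}

  // data_host: where the scan range's data lives (from the ScanNode).
  std::string GetHost(const std::string& data_host) {
    for (const auto& b : backends_) {
      if (b == data_host) return b;  // locality hit: run on the data's host
    }
    // Locality miss: fall back to round robin over running impalads.
    std::string chosen = backends_[next_ % backends_.size()];
    ++next_;
    return chosen;
  }

 private:
  std::vector<std::string> backends_;  // running impalads
  size_t next_ = 0;                    // round-robin cursor
};
```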
$ be/src/sparrow/state-store.{h, cc}
$ be/src/sparrow/subscription-manager.{h, cc}
$ be/src/sparrow/failure-detector.{h, cc}
- Using State Store Server and Subscriber
- StateStore uses heartbeat to detect failure
- ImpalaServer and SimpleScheduler register their own Subscribers
- SubscriptionManager notifies Impalads using Subscriber callbacks
- If the state store is disabled, SimpleScheduler uses a fixed backend list, not a dynamic one
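The subscriber callback flow described above might look roughly like this; SubscriptionManagerSketch is an assumed name and a simplified model, not the real sparrow API:

```cpp
#include <functional>
#include <string>
#include <vector>

class SubscriptionManagerSketch {
 public:
  using Callback = std::function<void(const std::vector<std::string>&)>;

  // ImpalaServer and SimpleScheduler each register a callback like this.
  void Register(Callback cb) { callbacks_.push_back(std::move(cb)); }

  // Called when the state store's heartbeat view of cluster membership
  // changes; every subscriber receives the new backend list.
  void NotifyMembershipChange(const std::vector<std::string>& backends) {
    for (auto& cb : callbacks_) cb(backends);
  }

 private:
  std::vector<Callback> callbacks_;
};
```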
- Impala Coordinator doesn't consider cluster load
- Sparrow scheduler is simple and doesn't depend on Original Sparrow.
- If a fragment execution fails, the query itself fails. Please retry!
Probably, the implementor of Impala's Sparrow is one of the Sparrow developers.
- Distributed cluster scheduler by AMPLAB
- For high throughput and low latency
- compared to YARN, Mesos and others
- https://github.com/radlab/sparrow
- Original implementation is java
- Decentralized scheduler
- No shared state about cluster load
- instantaneous info from slaves for task distribution
- Scheduler, Launcher, Placer, Monitor
- Support Algorithms
- Round Robin, FIFO, Batch Sampling, etc...
My post about HSCR 13th
http://repeatedly.github.com/2012/11/inside-impala-coordinator-at-hscr-13th/