Since it allows concurrent DAGs to share ContainerWorkers, MR3 employs several schemes for scheduling Tasks originating from different DAGs, in order to maximize the throughput (i.e., the number of queries completed per unit time) or minimize the turnaround time (i.e., the time from accepting a query to completing it). Internally, Task scheduling in MR3 is managed by a component called TaskScheduler, which basically 1) maps DAGs to Task queues and 2) schedules the Tasks in a particular Task queue according to their priorities. Hence the effectiveness of TaskScheduler depends to a large extent on how MR3 maps DAGs to Task queues and assigns priorities to Tasks. In the actual implementation, TaskScheduler sometimes ignores Task priorities in order to maximize cluster utilization.
Mapping DAGs to Task queues
MR3 uses the configuration key `mr3.dag.queue.scheme` to assign DAGs to Task queues.

- If it is set to `common` (which is the default value), MR3 uses a common Task queue for all DAGs.
- If it is set to `individual`, MR3 creates a Task queue for each individual DAG.
- If it is set to `capacity`, MR3 uses capacity scheduling.
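To make the mapping concrete, here is a minimal Scala sketch of how the three values could assign DAGs to Task queues. The types `Dag`, `TaskQueue`, and `QueueMapper` are invented for this illustration and do not appear in MR3; in particular, the `capacity` case anticipates the fallback to the `default` queue described later on this page.

```scala
import scala.collection.mutable

// Hypothetical types used only for this illustration; they are not the actual MR3 classes.
final case class Dag(id: String, requestedQueue: Option[String])
final class TaskQueue(val name: String)

// One possible reading of mr3.dag.queue.scheme: every DAG is mapped to exactly one Task queue.
class QueueMapper(scheme: String, capacityQueues: Map[String, TaskQueue]) {
  private val commonQueue  = new TaskQueue("common")
  private val perDagQueues = mutable.Map.empty[String, TaskQueue]

  def queueFor(dag: Dag): TaskQueue = scheme match {
    case "common"     => commonQueue                                                  // one queue shared by all DAGs
    case "individual" => perDagQueues.getOrElseUpdate(dag.id, new TaskQueue(dag.id))  // one queue per DAG
    case "capacity"   =>                                                              // the queue requested by the DAG,
      capacityQueues.getOrElse(dag.requestedQueue.getOrElse("default"),               // falling back to 'default'
                               capacityQueues("default"))
  }
}
```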
If `mr3.dag.queue.scheme` is set to `individual`, TaskScheduler tries to schedule a Task from the DAG that is currently consuming the least amount of memory. In this way, MR3 makes its best effort to allocate the same amount of resources to all active DAGs. Since a new DAG is instantly allocated its fair share of resources while an old DAG is not penalized for its long duration, the execution time of a DAG is predictable in concurrent environments. For example, if a DAG completes in 100 seconds with no concurrent DAGs, it has a reasonable chance of completing in around 100 * 10 = 1000 seconds when 9 concurrent DAGs of the same structure are running alongside it.
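The selection rule can be pictured with the following Scala sketch, which assumes a hypothetical per-DAG record of memory currently held by running TaskAttempts; it illustrates the policy, not the actual TaskScheduler code.

```scala
// Hypothetical bookkeeping: memory (in MB) currently held by each DAG's running TaskAttempts,
// together with the Tasks of the DAG that are still waiting to be scheduled.
final case class DagState(dagId: String, memoryUsedMb: Long, pendingTasks: List[String])

// Under 'individual', the next Task comes from the DAG that currently uses the least memory,
// so that all active DAGs converge to roughly equal resource shares.
def nextTask(activeDags: Seq[DagState]): Option[(String, String)] =
  activeDags
    .filter(_.pendingTasks.nonEmpty)
    .sortBy(_.memoryUsedMb)
    .headOption
    .map(dag => (dag.dagId, dag.pendingTasks.head))
```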
In concurrent environments, setting `mr3.dag.queue.scheme` to `common` usually achieves a higher throughput than setting it to `individual`. This is because the optimization implemented for increasing the temporal locality of intermediate data works best when MR3 is allowed to analyze the dependency between Tasks of all DAGs at once. For example, when the entire input of a Task becomes available, MR3 may decide to schedule it immediately even in the presence of many Tasks with higher priorities. If `mr3.dag.queue.scheme` is set to `individual`, however, the optimization is much less effective because MR3 can analyze only the dependency between Tasks belonging to the same DAG. In short, setting `mr3.dag.queue.scheme` to `common` enables MR3 to globally optimize the execution of all DAGs whereas setting it to `individual` does not.
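The following Scala sketch conveys the gist of this behavior in a highly simplified form; the `PendingTask` representation and the selection logic are assumptions made for this example, not the actual optimization.

```scala
// Simplified view of a pending Task: its priority (smaller value = scheduled earlier)
// and whether all of its input produced by upstream Tasks is already available.
final case class PendingTask(id: String, priority: Int, allInputReady: Boolean)

// With a common Task queue, the scheduler sees the pending Tasks of all DAGs at once,
// so it can favor a Task whose entire input is ready (to exploit temporal locality)
// even if Tasks with better priorities are still waiting for their input.
def pickNext(pending: Seq[PendingTask]): Option[PendingTask] = {
  val ready = pending.filter(_.allInputReady)
  if (ready.nonEmpty) Some(ready.minBy(_.priority))  // best-priority Task among the ready ones
  else pending.sortBy(_.priority).headOption         // otherwise fall back to plain priority order
}
```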
Capacity scheduling
If `mr3.dag.queue.scheme` is set to `capacity`, MR3 uses capacity scheduling by managing multiple Task queues with different priorities and capacity requirements. The user specifies the policy for capacity scheduling with another configuration key `mr3.dag.queue.capacity.specs`. Each entry consists of the name of a Task queue and its minimum capacity in percent. Task queues are specified in the order of priority. As an example, setting `mr3.dag.queue.capacity.specs` to `high=80,medium=50,default=20,background=0` is interpreted as follows.
- MR3 creates four Task queues (`high`, `medium`, `default`, `background`) in the order of priority. That is, Task queue `high` is assigned the highest priority while Task queue `background` is assigned the lowest priority.
- Task queue `high` is guaranteed 80 percent of resources. Idle resources not claimed by `high`, however, can be allocated to Task queues with lower priorities.
- Task queue `medium` is guaranteed 50 percent of resources if `high` consumes no more than 50 percent of resources.
- Task queue `default` is guaranteed 20 percent of resources if `high` and `medium` consume no more than 80 percent of resources.
- Task queue `background` is guaranteed no resources at all. Hence it consumes resources only when no other Task queue requests resources.
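As a rough illustration of the format, the Scala sketch below parses a value of `mr3.dag.queue.capacity.specs` into queue names, priorities, and minimum capacities; the `CapacityQueue` representation is an assumption made for this example.

```scala
// A Task queue under capacity scheduling: its position in the spec determines its priority
// (0 = highest), and minCapacityPercent is the guaranteed share of cluster resources.
final case class CapacityQueue(name: String, priority: Int, minCapacityPercent: Int)

// Parse e.g. "high=80,medium=50,default=20,background=0" (queues listed in the order of priority).
def parseCapacitySpecs(specs: String): Seq[CapacityQueue] =
  specs.split(",").toSeq.zipWithIndex.map { case (entry, index) =>
    val Array(name, capacity) = entry.trim.split("=")
    CapacityQueue(name.trim, index, capacity.trim.toInt)
  }

// parseCapacitySpecs("high=80,medium=50,default=20,background=0") yields
//   Seq(CapacityQueue("high", 0, 80), CapacityQueue("medium", 1, 50),
//       CapacityQueue("default", 2, 20), CapacityQueue("background", 3, 0))
```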
As another example, assume that `mr3.dag.queue.capacity.specs` is set to `high=40,medium=20,default=10,background=0`. After meeting the capacity requirements of Task queues `high`, `medium`, and `default`, 30 percent of resources still remain. In such a case, MR3 allocates the remaining resources to Task queues with higher priorities first. As a result, Task queue `background` is allocated no resources as long as the other Task queues with higher priorities request the remaining 30 percent of resources.
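The allocation of leftover resources can be sketched as follows; `demandPercent` is a hypothetical per-queue demand (for example, derived from pending TaskAttempts), and the function only illustrates the "higher priorities first" rule rather than MR3's actual algorithm.

```scala
// 'queuesByPriority' lists (queue name, guaranteed percent) from the highest to the lowest priority.
def allocate(queuesByPriority: Seq[(String, Int)],
             demandPercent: Map[String, Int]): Map[String, Int] = {
  // Each queue first receives min(guarantee, demand).
  val guaranteed =
    queuesByPriority.map { case (name, minPercent) => name -> (minPercent min demandPercent(name)) }.toMap
  // Leftover capacity is then offered to the queues in the order of priority, highest first.
  val (allocation, _) =
    queuesByPriority.foldLeft((guaranteed, 100 - guaranteed.values.sum)) {
      case ((alloc, remaining), (name, _)) =>
        val extra = ((demandPercent(name) - alloc(name)) min remaining) max 0
        (alloc.updated(name, alloc(name) + extra), remaining - extra)
    }
  allocation
}

// With high=40,medium=20,default=10,background=0 and every queue demanding 100 percent,
// allocate(Seq("high" -> 40, "medium" -> 20, "default" -> 10, "background" -> 0),
//          Map("high" -> 100, "medium" -> 100, "default" -> 100, "background" -> 100))
// returns Map("high" -> 70, "medium" -> 20, "default" -> 10, "background" -> 0):
// the remaining 30 percent goes to 'high' and 'background' receives nothing.
```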
As a special rule, if a Task queue `default` is not specified, MR3 automatically appends `default=0` to the value of `mr3.dag.queue.capacity.specs`. For example, `foo=80,bar=20` is automatically expanded to `foo=80,bar=20,default=0`. Hence a Task queue `default` is always available under capacity scheduling.
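A tiny sketch of this rule (the helper name is made up):

```scala
// Append "default=0" unless a queue named 'default' already appears in the specs.
def ensureDefaultQueue(specs: String): String = {
  val names = specs.split(",").map(_.trim.takeWhile(_ != '='))
  if (names.contains("default")) specs else specs + ",default=0"
}

// ensureDefaultQueue("foo=80,bar=20")      == "foo=80,bar=20,default=0"
// ensureDefaultQueue("high=80,default=20") == "high=80,default=20"
```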
A DAG specifies its Task queue with the configuration key `mr3.dag.queue.name` (which is effective only with capacity scheduling). If a DAG chooses a non-existent Task queue or does not specify its Task queue, it is assigned to the Task queue `default`.
TaskScheduler periodically logs the status of capacity scheduling (every 10 seconds by default). In the following example, we see that Task queue `high` is consuming 36 percent of resources with 208 pending TaskAttempts.

```
2022-08-12T09:10:48,635 INFO [All-In-One] TaskAttemptQueue$: DAGScheduler All-In-One 2211840MB: high = 208/36%, medium = 2371/43%, default = 169/20%, background = 856/0%
```
MR3 never preempts TaskAttempts running in Task queues with lower priorities in order to free resources for Task queues with higher priorities. Hence even the Task queue with the highest priority may have to wait for a while if all the resources are being used by Task queues with lower priorities.
Assigning Task priorities
In MR3, the priority of a Task is determined by 1) the priority of its DAG and 2) the priority of its Vertex where the DAG priority takes precedence over the Vertex priority. For example, Tasks with a higher DAG priority have higher priorities in TaskScheduler than Tasks with a lower DAG priority regardless of their Vertex priorities. For Tasks with the same DAG priority, TaskScheduler considers their Vertex priorities.
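To make the precedence concrete, here is an illustrative Scala ordering. The `TaskKey` type is invented for this example, and we assume that a numerically smaller value means a higher priority (as with `fifo` below, where the first DAG receives priority 0).

```scala
// Invented representation of a schedulable Task: its DAG priority and its Vertex priority.
final case class TaskKey(dagPriority: Int, vertexPriority: Int)

// The DAG priority is compared first; the Vertex priority only breaks ties among Tasks
// with the same DAG priority. Smaller values are assumed to be scheduled earlier.
implicit val taskOrdering: Ordering[TaskKey] =
  Ordering.by((key: TaskKey) => (key.dagPriority, key.vertexPriority))

// Example: any Task of the first DAG (dagPriority 0) precedes any Task of the second DAG
// (dagPriority 1), regardless of their Vertex priorities:
// List(TaskKey(1, 0), TaskKey(0, 7)).sorted == List(TaskKey(0, 7), TaskKey(1, 0))
```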
MR3 uses a configuration key `mr3.dag.priority.scheme` to assign DAG priorities.

- If it is set to `fifo` (which is the default value), MR3 assigns DAG priorities sequentially. That is, the first DAG is assigned DAG priority 0, the second DAG is assigned DAG priority 1, and so on.
- If it is set to `concurrent`, MR3 assigns the same DAG priority to all DAGs.
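A minimal sketch of the two schemes, assuming an atomic counter for `fifo` (the actual mechanism inside MR3 may differ):

```scala
import java.util.concurrent.atomic.AtomicInteger

// 'fifo': the n-th submitted DAG receives priority n - 1, so earlier DAGs take precedence.
// 'concurrent': every DAG receives the same priority, so only Vertex priorities matter.
class DagPriorityAssigner(scheme: String) {
  private val counter = new AtomicInteger(0)

  def nextDagPriority(): Int = scheme match {
    case "fifo"       => counter.getAndIncrement()  // 0, 1, 2, ...
    case "concurrent" => 0                          // identical priority for all DAGs
  }
}
```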
Note that if `mr3.dag.queue.scheme` is set to `individual`, the user may ignore `mr3.dag.priority.scheme` because every DAG maintains its own Task queue.
MR3 uses another configuration key `mr3.vertex.priority.scheme` to update Vertex priorities. The configuration key `mr3.vertex.priority.scheme` is interpreted as follows.

- `intact` (which is the default value): MR3 does not update the Vertex priorities specified in the DAG.
- `roots`: MR3 recalculates Vertex priorities based on the distance from root Vertexes that read input data.
- `leaves`: MR3 recalculates Vertex priorities based on the distance to leaf Vertexes that produce output data.
- `postorder`: MR3 recalculates Vertex priorities based on a postorder traversal of the DAG (illustrated in the sketch after this list).
- `normalize`: MR3 normalizes Vertex priorities to the range between 0 and 12252240 (2^4 * 3^2 * 5 * 7 * 11 * 13 * 17, a highly composite number).
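As an illustration of one of these options, the Scala sketch below assigns priorities by a postorder traversal of a DAG given as an adjacency list from each Vertex to its downstream Vertexes. It reflects only a plausible reading of `postorder`, not MR3's exact definition, which (as discussed below) is subtler for general DAGs.

```scala
import scala.collection.mutable

// 'downstream' maps each Vertex to the Vertexes that consume its output.
// Each Vertex is visited once; the k-th Vertex to finish in postorder receives priority k,
// so all Vertexes end up with distinct priorities.
def postorderPriorities(vertexes: Seq[String],
                        downstream: Map[String, Seq[String]]): Map[String, Int] = {
  val visited  = mutable.Set.empty[String]
  val priority = mutable.Map.empty[String, Int]
  var next     = 0

  def visit(v: String): Unit =
    if (visited.add(v)) {
      downstream.getOrElse(v, Nil).foreach(visit)  // number downstream Vertexes first
      priority(v) = next
      next += 1
    }

  vertexes.foreach(visit)  // covers every root as well as disconnected Vertexes
  priority.toMap
}

// Example with Map1 -> Reducer2 -> Reducer3:
// postorderPriorities(Seq("Map1", "Reducer2", "Reducer3"),
//                     Map("Map1" -> Seq("Reducer2"), "Reducer2" -> Seq("Reducer3")))
//   == Map("Reducer3" -> 0, "Reducer2" -> 1, "Map1" -> 2)
```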
Note that if `mr3.dag.priority.scheme` is set to `fifo`, the user may set `mr3.vertex.priority.scheme` to any value because all DAGs are assigned different DAG priorities and `mr3.vertex.priority.scheme` does not heavily affect the performance.
The precise definition of `roots`/`leaves`/`postorder`/`normalize` is a lot more subtle than it appears because, in general, DAGs are not single-rooted trees. For our purpose, we content ourselves with an example that shows the result of assigning Vertex priorities to the same DAG. The following diagram shows the result of assigning Vertex priorities to a DAG that Hive on MR3 generates from query 44 of the TPC-DS benchmark. Note that in the case of `postorder`, all Vertexes have different priorities.
Recommended settings in concurrent environments
If every user should be allocated a strictly fair share of cluster resources, we recommend setting the configuration key `mr3.dag.queue.scheme` to `individual`. If the throughput or the turnaround time is the primary concern, the configuration key `mr3.dag.queue.scheme` should be set to `common` with the following recommendations.
- If every user submits DAGs of similar characteristics, set `mr3.dag.priority.scheme` to `fifo` in order to maximize the throughput.
- If not, set `mr3.dag.priority.scheme` to `concurrent`.
  - If most users submit small DAGs (e.g., interactive jobs) while a small number of large DAGs (e.g., ETL jobs) are constantly running in the background, set `mr3.vertex.priority.scheme` to `postorder` or `leaves` in order to minimize the turnaround time for small DAGs (see the example settings after this list).
  - If every user should be allocated a roughly fair share of cluster resources, set `mr3.vertex.priority.scheme` to `normalize`.
  - If the client should decide Vertex priorities, set `mr3.vertex.priority.scheme` to `intact`.
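For instance, the combination recommended above for a mix of interactive queries and background ETL jobs might look as follows. The keys and values come from this page; representing them as a plain Scala map is only for illustration, since in practice they are set in the usual MR3/Hive configuration files.

```scala
// Illustrative combination for a cluster where small interactive queries run alongside
// a few long-running ETL jobs (values taken from the recommendations above).
val concurrentSettings: Map[String, String] = Map(
  "mr3.dag.queue.scheme"       -> "common",      // enable global optimization across all DAGs
  "mr3.dag.priority.scheme"    -> "concurrent",  // DAGs of dissimilar characteristics
  "mr3.vertex.priority.scheme" -> "postorder"    // favor short turnaround for small DAGs
)
```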
Experiment 1: Every Beeline client submits the same set of queries.
In the first experiment, we run 16 Beeline clients each of which submits 17 queries, query 25 to query 40, from the TPC-DS benchmark. (The scale factor for the TPC-DS benchmark is 10TB.) When executed sequentially, these queries complete in 46 seconds on average while the longest-running query takes about 110 seconds. In order to better simulate a realistic environment, each Beeline client submits these 17 queries in a unique sequence.
The following graph shows the progress of every Beeline client (where the y-axis denotes the elapsed time). Each color corresponds to a unique query across all Beeline clients.
- We set `mr3.dag.queue.scheme` to `common`.
- The graph labeled `fifo` sets `mr3.dag.priority.scheme` to `fifo` and `mr3.vertex.priority.scheme` to `intact`.
- Every remaining graph sets `mr3.dag.priority.scheme` to `concurrent` and `mr3.vertex.priority.scheme` to its label.
We make the following observation.

- Setting `mr3.dag.priority.scheme` to `fifo` yields the shortest running time, i.e., the highest throughput.
Experiment 2: Every Beeline client submits a unique query repeatedly.
In the second experiment, we run 8 Beeline clients each of which submits a unique query from the TPC-DS benchmark a total of 10 times. (The scale factor for the TPC-DS benchmark is 10TB.) The following table shows the properties of the 8 queries. We may think of Beeline 1 and 2 as executing short-running interactive jobs, and Beeline 7 and 8 as executing long-running ETL jobs.
| Beeline | Query | Number of Vertexes | Execution time in a sequential run (seconds) |
|---|---|---|---|
| Beeline 1 (leftmost, red) | Query 91 | 9 | 5.479 |
| Beeline 2 | Query 3 | 4 | 24.959 |
| Beeline 3 | Query 57 | 10 | 36.549 |
| Beeline 4 | Query 30 | 11 | 52.502 |
| Beeline 5 | Query 5 | 18 | 77.906 |
| Beeline 6 | Query 29 | 13 | 99.199 |
| Beeline 7 | Query 50 | 9 | 273.457 |
| Beeline 8 (rightmost, pink) | Query 64 | 31 | 424.751 |
The following graphs show the progress of every Beeline client.
We make the following observations.
- Setting `mr3.dag.queue.scheme` to `individual` achieves the fairest distribution of resources among Beeline clients because the total execution time for a Beeline client is approximately proportional to the execution time of its query in a sequential run. Note that setting `mr3.dag.priority.scheme` to `concurrent` and `mr3.vertex.priority.scheme` to `normalize` produces a completely different result. For example, every Beeline client spends about the same amount of time for the first execution of its query.
- If `mr3.dag.priority.scheme` is set to `concurrent`, setting `mr3.vertex.priority.scheme` to `postorder` yields both the highest throughput and the shortest turnaround time. Setting `mr3.vertex.priority.scheme` to `leaves` is the second best choice.