Task scheduling in Spark on MR3 is a special case of Task scheduling in MR3. A Spark job can create many Spark stages, each of which is converted to an MR3 DAG consisting of a single Vertex. Hence there is no need to analyze dependencies between Tasks, and TaskScheduler of Spark on MR3 skips the optimization for increasing the temporal locality of intermediate data. As a new component, TaskScheduler of Spark on MR3 implements delay scheduling (similarly to vanilla Spark, but in its own way). Note that Task scheduling (managed by TaskScheduler) is independent of recycling ContainerWorkers (managed by ContainerScheduler), although both deal with scheduling policies such as fair scheduling and FIFO scheduling.
Task scheduling in Spark on MR3 is controlled by four configuration parameters inherited from MR3 and a new configuration parameter for delay scheduling.
- `mr3.dag.queue.scheme` can be set to either `common` or `individual`.
- `mr3.dag.priority.scheme` can be set to either `fifo` or `concurrent`.
- `mr3.taskattempt.queue.scheme` is fixed at `opt`.
- `mr3.vertex.priority.scheme` can be set to either `intact` or `normalize`.
- `mr3.spark.delay.scheduling.interval.ms` specifies the interval (in milliseconds) for checking delay scheduling.
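For reference, the snippet below collects these keys with their default values. It is a minimal sketch that assumes MR3 configuration keys can be passed verbatim through `SparkConf`; in an actual deployment they may instead belong in `mr3-site.xml`, so consult the MR3 configuration documentation for the exact mechanism.

```scala
import org.apache.spark.SparkConf

// Sketch only: we assume MR3 keys can be supplied as Spark configuration
// entries. The values shown are the defaults described in this section.
val conf = new SparkConf()
  .setAppName("spark-mr3-scheduling-demo")
  .set("mr3.dag.queue.scheme", "common")                // or "individual"
  .set("mr3.dag.priority.scheme", "fifo")               // or "concurrent"
  .set("mr3.vertex.priority.scheme", "normalize")       // or "intact"
  .set("mr3.spark.delay.scheduling.interval.ms", "0")   // 0 disables delay scheduling
// mr3.taskattempt.queue.scheme is fixed at "opt" and need not be set.
```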
## Mapping DAGs to Task queues
Spark on MR3 uses the configuration key `mr3.dag.queue.scheme` to assign DAGs to Task queues.
- If it is set to `common` (the default value), Spark on MR3 uses a common Task queue for DAGs from all Spark jobs.
- If it is set to `individual`, Spark on MR3 creates a Task queue for each individual Spark job (not for each individual DAG). This setting corresponds to fair scheduling within a Spark application in vanilla Spark (when `spark.scheduler.mode` is set to `FAIR`); the sketch after this list makes the scenario concrete.
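Below is a minimal vanilla-Spark sketch of the scenario where `individual` matters: two Spark jobs submitted from separate threads of the same application (the job bodies are placeholders). With `mr3.dag.queue.scheme` set to `individual`, each job keeps its own Task queue, so the short job is not blocked behind the long one.

```scala
import org.apache.spark.sql.SparkSession

// Two concurrent Spark jobs in one application, each from its own thread.
val spark = SparkSession.builder().appName("two-concurrent-jobs").getOrCreate()

val longJob  = new Thread(() => spark.range(0L, 10000000000L).count()) // long-running job
val shortJob = new Thread(() => spark.range(0L, 1000L).count())        // short job
longJob.start(); shortJob.start()
longJob.join(); shortJob.join()
```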
## Assigning DAG priorities
Similarly to MR3, the priority of a Task is determined by 1) the priority of its DAG and 2) the priority of its Vertex, where the DAG priority takes precedence over the Vertex priority.
Spark on MR3 uses the configuration key `mr3.dag.priority.scheme` to assign DAG priorities.
- If it is set to `fifo` (the default value), Spark on MR3 uses Spark job priorities for DAG priorities. Hence DAGs from an earlier Spark job are assigned a higher priority.
- If it is set to `concurrent`, Spark on MR3 assigns the same DAG priority (namely zero) to all DAGs.
Note that if `mr3.dag.queue.scheme` is set to `individual`, the user may ignore `mr3.dag.priority.scheme` because every Spark job maintains its own Task queue.
Spark on MR3 uses the configuration key `mr3.vertex.priority.scheme` to update Vertex priorities. Since every DAG consists of a single Vertex, this key effectively decides DAG priorities within the same Spark job, as the toy model below illustrates.
- `intact`: Spark on MR3 uses Spark stage IDs for Vertex priorities.
- `normalize` (the default value): Spark on MR3 sets all Vertex priorities to zero.
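To picture the combined effect, here is a toy model (an illustration under our own assumptions, not MR3 code) in which Task ordering is a lexicographic comparison of the pair (DAG priority, Vertex priority), with a smaller value meaning a higher priority:

```scala
// Toy model: DAG priority takes precedence, Vertex priority breaks ties.
// Assumption for illustration: smaller value = higher priority.
final case class TaskPriority(dagPriority: Int, vertexPriority: Int)

val ordering: Ordering[TaskPriority] =
  Ordering.by((p: TaskPriority) => (p.dagPriority, p.vertexPriority))

// With fifo + intact: within one Spark job (same DAG priority), a Task of
// stage 2 precedes a Task of stage 5.
assert(ordering.lt(TaskPriority(0, 2), TaskPriority(0, 5)))

// With normalize, all Vertex priorities are zero, so Tasks from different
// DAGs of the same Spark job compare as equal.
assert(ordering.equiv(TaskPriority(0, 0), TaskPriority(0, 0)))
```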
## Delay scheduling
If `mr3.spark.delay.scheduling.interval.ms` is set to zero, delay scheduling is not used and no ContainerWorkers are left idle unless Task queues are empty. If it is set to a value greater than zero, TaskScheduler uses delay scheduling by postponing the execution of Tasks whose location hints have no matching ContainerWorkers. At a regular interval specified by `mr3.spark.delay.scheduling.interval.ms`, TaskScheduler tries to execute such Tasks without considering their location hints. This strategy differs slightly from delay scheduling in vanilla Spark, in which all Tasks whose location hints have no matching Spark executors get executed once an initial delay specified by `spark.locality.wait` passes.
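The sketch below is a simplified model of this behavior (our own approximation, not MR3's actual TaskScheduler): Tasks whose location hints match no live ContainerWorker are parked, and a periodic sweep retries them with location hints ignored.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

// Simplified model of delay scheduling as described above.
class DelaySchedulerSketch(intervalMs: Long) {
  private val parked  = new ConcurrentLinkedQueue[Runnable]()
  private val sweeper = Executors.newSingleThreadScheduledExecutor()

  // Called when a Task's location hints match no live ContainerWorker.
  def park(task: Runnable): Unit = parked.add(task)

  // Start the periodic sweep only when delay scheduling is enabled.
  if (intervalMs > 0) {
    sweeper.scheduleAtFixedRate(
      () => {
        // Retry every parked Task, this time ignoring its location hints.
        var task = parked.poll()
        while (task != null) { task.run(); task = parked.poll() }
      },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}
```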
## Recommended settings for Task scheduling within a Spark application
To use fair scheduling among Spark jobs (submitted by multiple threads) within the same Spark application, set `mr3.dag.queue.scheme` to `individual`. To use FIFO scheduling among Spark jobs, set `mr3.dag.queue.scheme` to `common` and `mr3.dag.priority.scheme` to `fifo`. Usually setting `mr3.vertex.priority.scheme` to `normalize` works well.
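The two recommendations side by side, with the same caveat as before that passing MR3 keys through `SparkConf` is an assumption of this sketch:

```scala
import org.apache.spark.SparkConf

// Fair scheduling among Spark jobs within one application.
val fairConf = new SparkConf()
  .set("mr3.dag.queue.scheme", "individual")      // one Task queue per Spark job
  .set("mr3.vertex.priority.scheme", "normalize")

// FIFO scheduling among Spark jobs.
val fifoConf = new SparkConf()
  .set("mr3.dag.queue.scheme", "common")          // single shared Task queue
  .set("mr3.dag.priority.scheme", "fifo")         // earlier jobs run first
  .set("mr3.vertex.priority.scheme", "normalize")
```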