Task scheduling in Spark on MR3 is obtained as a special case of Task scheduling in MR3. A Spark job can create many Spark stages, each of which is converted to an MR3 DAG consisting of a single Vertex. Hence there is no need to analyze dependencies between Tasks, and TaskScheduler of Spark on MR3 skips the optimization for increasing the temporal locality of intermediate data. As a new feature, TaskScheduler of Spark on MR3 implements delay scheduling (similarly to vanilla Spark, but in its own way). Note that Task scheduling (managed by TaskScheduler) is independent of recycling ContainerWorkers (managed by ContainerScheduler), although both deal with scheduling policies such as fair scheduling and FIFO scheduling.

Task scheduling in Spark on MR3 is controlled by four configuration parameters inherited from MR3 and a new configuration parameter for delay scheduling.

  • mr3.dag.queue.scheme can be set to either common or individual.
  • mr3.dag.priority.scheme can be set to either fifo or concurrent.
  • mr3.taskattempt.queue.scheme is fixed at opt.
  • mr3.vertex.priority.scheme can be set to either intact or normalize.
  • mr3.spark.delay.scheduling.interval.ms specifies the interval (in milliseconds) at which delay scheduling checks are performed.
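As a rough illustration, the sketch below collects these keys with their default values, assuming MR3's Hadoop-style configuration (in an actual deployment the keys are typically placed in MR3's configuration file, e.g. mr3-site.xml). The interval value 500 is only an example, not a documented default.

    // Illustrative only: the configuration keys with their default values.
    import org.apache.hadoop.conf.Configuration

    val mr3Conf = new Configuration(false)
    mr3Conf.set("mr3.dag.queue.scheme", "common")           // or "individual"
    mr3Conf.set("mr3.dag.priority.scheme", "fifo")          // or "concurrent"
    mr3Conf.set("mr3.vertex.priority.scheme", "normalize")  // or "intact"
    // mr3.taskattempt.queue.scheme is fixed at "opt" in Spark on MR3.
    mr3Conf.set("mr3.spark.delay.scheduling.interval.ms", "500")  // example value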

Mapping DAGs to Task queues

Spark on MR3 uses the configuration key mr3.dag.queue.scheme to assign DAGs to Task queues.

  • If it is set to common (which is the default value), Spark on MR3 uses a common Task queue for DAGs from all Spark jobs.
  • If it is set to individual, Spark on MR3 creates a Task queue for each individual Spark job (not for each individual DAG). This setting corresponds to fair scheduling within a Spark application in vanilla Spark (when spark.scheduler.mode is set to FAIR).
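The distinction can be pictured with the following sketch, which is purely illustrative and not MR3's actual data structure; Task and the sparkJobId field are hypothetical placeholders. With common there is a single queue for all Tasks, whereas with individual Tasks are enqueued per Spark job, not per DAG.

    // Purely illustrative sketch of the two queue schemes (not MR3 code).
    import scala.collection.mutable

    case class Task(sparkJobId: Int, name: String)

    // common: a single Task queue shared by DAGs from all Spark jobs
    val commonQueue = mutable.Queue.empty[Task]

    // individual: one Task queue per Spark job (not per DAG)
    val individualQueues = mutable.Map.empty[Int, mutable.Queue[Task]]

    def enqueue(task: Task, scheme: String): Unit = scheme match {
      case "common" =>
        commonQueue.enqueue(task)
      case "individual" =>
        individualQueues
          .getOrElseUpdate(task.sparkJobId, mutable.Queue.empty[Task])
          .enqueue(task)
    }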

Assigning DAG priorities

As in MR3, the priority of a Task is determined by 1) the priority of its DAG and 2) the priority of its Vertex, where the DAG priority takes precedence over the Vertex priority.

Spark on MR3 uses the configuration key mr3.dag.priority.scheme to assign DAG priorities.

  • If it is set to fifo (which is the default value), Spark on MR3 uses Spark job priorities for DAG priorities. Hence DAGs from an earlier Spark job are assigned a higher priority.
  • If it is set to concurrent, Spark on MR3 assigns the same DAG priority (namely zero) to all DAGs.

Note that if mr3.dag.queue.scheme is set to individual, the user may ignore mr3.dag.priority.scheme because every Spark job maintains its own Task queue.

Spark on MR3 uses the configuration key mr3.vertex.priority.scheme to update Vertex priorities. Since every DAG consists of a single Vertex, this key effectively determines the relative priority of DAGs within the same Spark job.

  • If it is set to intact, Spark on MR3 uses Spark stage IDs for Vertex priorities.
  • If it is set to normalize (which is the default value), Spark on MR3 sets all Vertex priorities to zero.
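Putting the two keys together, the resulting ordering can be sketched as follows. This is illustrative only, not MR3's implementation; TaskPriority is a hypothetical placeholder, and the sketch assumes for the sake of the example that a numerically smaller value means a higher priority.

    // Illustrative only: a Task is ordered first by its DAG priority and then
    // by its Vertex priority (assuming smaller value = higher priority).
    case class TaskPriority(dagPriority: Int, vertexPriority: Int)

    val byPriority: Ordering[TaskPriority] =
      Ordering.by((p: TaskPriority) => (p.dagPriority, p.vertexPriority))

    // fifo + intact: DAG priority from the Spark job, Vertex priority from the stage ID.
    val taskOfJob1Stage3 = TaskPriority(dagPriority = 1, vertexPriority = 3)
    val taskOfJob2Stage0 = TaskPriority(dagPriority = 2, vertexPriority = 0)
    assert(byPriority.lt(taskOfJob1Stage3, taskOfJob2Stage0))  // the earlier Spark job is scheduled first

    // concurrent + normalize: every Task gets (0, 0), so Tasks from all Spark
    // jobs compete in the common Task queue on equal terms.
    val anyTask = TaskPriority(dagPriority = 0, vertexPriority = 0)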

Delay scheduling

If mr3.spark.delay.scheduling.interval.ms is set to zero, delay scheduling is not used and no ContainerWorkers are left idle unless Task queues are empty.

If mr3.spark.delay.scheduling.interval.ms is set to a value greater than zero, TaskScheduler uses delay scheduling and postpones the execution of Tasks whose location hints have no matching ContainerWorkers. At a regular interval specified by mr3.spark.delay.scheduling.interval.ms, TaskScheduler tries to execute such Tasks without considering their location hints. Thus the strategy is slightly different from delay scheduling in vanilla Spark, where all Tasks whose location hints have no matching Spark executors are executed once the initial delay specified by spark.locality.wait has passed.
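The policy can be summarized with the following sketch, which is illustrative only and not MR3's implementation; Task, Worker, and the helper functions are hypothetical. A Task whose location hints match no idle ContainerWorker is postponed, and at every interval the postponed Tasks are offered to idle ContainerWorkers regardless of their hints.

    // Illustrative sketch of the delay-scheduling policy described above (not MR3 code).
    import scala.collection.mutable

    case class Task(locationHints: Set[String])
    case class Worker(host: String)

    val postponed = mutable.Queue.empty[Task]
    val intervalMs = 500L  // mr3.spark.delay.scheduling.interval.ms (example value)

    // Called when a Task becomes runnable.
    def schedule(task: Task, idleWorkers: Seq[Worker])(launch: (Task, Worker) => Unit): Unit =
      idleWorkers.find(w => task.locationHints.contains(w.host)) match {
        case Some(worker) => launch(task, worker)    // a location hint matches a ContainerWorker
        case None         => postponed.enqueue(task) // postpone instead of launching elsewhere
      }

    // Called every intervalMs milliseconds: execute postponed Tasks
    // without considering their location hints.
    def onTimer(idleWorkers: Seq[Worker])(launch: (Task, Worker) => Unit): Unit =
      idleWorkers.foreach { worker =>
        if (postponed.nonEmpty) launch(postponed.dequeue(), worker)
      }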

To use fair scheduling among Spark jobs (submitted by multiple threads) within the same Spark application, mr3.dag.queue.scheme should be set to individual.
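For example, a single Spark application can submit independent Spark jobs from multiple threads, as in the plain Spark sketch below. With mr3.dag.queue.scheme set to individual, each of these concurrent jobs then maintains its own Task queue.

    // Plain Spark code: two Spark jobs submitted concurrently from separate
    // threads (futures) within the same Spark application.
    import org.apache.spark.sql.SparkSession
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    val spark = SparkSession.builder().appName("fair-scheduling-example").getOrCreate()
    val sc = spark.sparkContext

    val job1 = Future { sc.parallelize(1 to 1000000).map(_ * 2).count() }
    val job2 = Future { sc.parallelize(1 to 1000000).filter(_ % 3 == 0).count() }

    println(Await.result(job1, Duration.Inf))
    println(Await.result(job2, Duration.Inf))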

To use FIFO scheduling among Spark jobs, set mr3.dag.queue.scheme to common and mr3.dag.priority.scheme to fifo.

In most cases, setting mr3.vertex.priority.scheme to normalize works well.