After following the instructions in Install Spark on MR3, the user can configure Spark on MR3.

On Kubernetes, the user can configure Spark on MR3 by updating the files in the directory kubernetes/spark/conf.

$ ls kubernetes/spark/conf
core-site.xml  jgss.conf  log4j.properties  spark-defaults.conf
hive-site.xml  krb5.conf  mr3-site.xml      spark-env.sh

On Hadoop, the user can configure Spark on MR3 by updating the files in the configuration directories such as conf/cluster/mr3 and conf/cluster/spark.

$ ls conf
cluster  local  tpcds
$ ls conf/cluster/mr3
mr3-site.xml
$ ls conf/cluster/spark
hive-site.xml  log4j.properties  spark-defaults.conf  spark-env.sh

spark-defaults.conf

spark-defaults.conf contains default settings for Spark. Since Spark on MR3 does not modify vanilla Spark, it interprets all the configuration keys in exactly the same way. In addition, Spark on MR3 reads the following configuration keys to adjust the settings for MR3.

  • The sum of the values specified by spark.executor.memory and spark.executor.memoryOverhead becomes the memory size of ContainerWorker Pods/containers. For example, spark.executor.memory=73728m and spark.executor.memoryOverhead=8192m create ContainerWorker Pods/containers each with 80GB of memory (73728MB + 8192MB = 81920MB).
  • As in vanilla Spark, the value for spark.executor.memory becomes the heap size of Java VM for ContainerWorkers (corresponding to -Xmx option). Internally Spark on MR3 derives the value for mr3.container.max.java.heap.fraction as spark.executor.memory / (spark.executor.memory + spark.executor.memoryOverhead).
  • For the maximum number of concurrent tasks in a ContainerWorker, Spark on MR3 uses spark.executor.cores / spark.task.cpus. For example, setting spark.executor.cores=20 and spark.task.cpus=2 means that 10 Spark tasks can be executed concurrently in each ContainerWorker.
  • spark.local.dir specifies the list of local directories to be used by the Spark Driver (on both Yarn and Kubernetes) and external shuffle service (on Kubernetes only).
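
The arithmetic in the list above can be checked with a short Python sketch. The function name container_settings is hypothetical and not part of Spark or MR3, and rounding the task count down to a whole number is an assumption, since the text only gives the quotient spark.executor.cores / spark.task.cpus.

```python
def container_settings(executor_memory_mb, memory_overhead_mb, executor_cores, task_cpus):
    """Derive ContainerWorker settings from Spark configuration values (illustrative only)."""
    # Memory of a ContainerWorker Pod/container: executor memory plus overhead.
    container_memory_mb = executor_memory_mb + memory_overhead_mb
    # Value derived for mr3.container.max.java.heap.fraction.
    heap_fraction = executor_memory_mb / container_memory_mb
    # Assumption: the quotient is rounded down to a whole number of tasks.
    max_concurrent_tasks = executor_cores // task_cpus
    return container_memory_mb, heap_fraction, max_concurrent_tasks


# Values taken from the examples in the list above.
memory_mb, heap_fraction, tasks = container_settings(73728, 8192, 20, 2)
print(memory_mb // 1024, "GB per ContainerWorker")
print("heap fraction:", heap_fraction)
print("concurrent tasks:", tasks)
```

With these inputs, the heap occupies nine tenths of the container memory and the remaining 8192MB is left for the overhead.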

Note that we do not set spark.driver.cores and spark.driver.memory for the resources of the Spark driver in spark-defaults.conf because we may run multiple Spark drivers with different resource requirements.

Spark on MR3 also introduces a few configuration keys of its own.

| Name | Default value | Description |
| --- | --- | --- |
| spark.mr3.appid | | Application ID of an existing DAGAppMaster to which the Spark driver connects. Use only on Hadoop and do not use on Kubernetes. |
| spark.mr3.keep.am | true | (On Hadoop only) true: do not kill DAGAppMaster when the Spark driver terminates. false: kill DAGAppMaster when the Spark driver terminates. |
| spark.mr3.client.connect.timeout.ms | 30000 | Time in milliseconds for trying to connect to DAGAppMaster |
| spark.mr3.dag.status.checker.period.ms | 1000 | Time interval in milliseconds for checking the status of every DAG |

In order to recycle ContainerWorkers among Spark applications, set the configuration key spark.files.useFetchCache to false. This is necessary to make sure that intermediate data produced by a Spark application is deleted from local disks once the application finishes. (Intermediate data is still deleted even if spark.files.useFetchCache is set to true, but ContainerWorkers often terminate themselves without being recycled.)

mr3-site.xml

mr3-site.xml is the configuration file for MR3. The user should adjust the following configuration keys according to the settings of the Kubernetes/Hadoop cluster. See Configuring MR3 for the description of all the configuration keys.

  • mr3.am.max.num.concurrent.dags specifies the maximum number of Spark stages that can be executed concurrently in DAGAppMaster, since each Spark stage is converted to an MR3 DAG (with a single Vertex). For example, a value of 32 means that 32 Spark stages can be executed concurrently.
  • mr3.am.client.thread-count should be set to a value equal to or greater than mr3.am.max.num.concurrent.dags.
  • mr3.container.scheduler.scheme specifies the policy for recycling ContainerWorkers among Spark applications. In order to recycle ContainerWorkers, the user should set it to fifo or fair (see Recycling ContainerWorkers).
  • mr3.am.resource.memory.mb specifies the memory size (in MB) of the DAGAppMaster Pod/container.
  • mr3.am.resource.cpu.cores specifies the number of cores to be allocated to the DAGAppMaster Pod/container.
  • (On Kubernetes only) mr3.k8s.pod.worker.hostpaths specifies the list of directories on each node in the Kubernetes cluster to which hostPath volumes point for ContainerWorkers. Internally MR3 sets the environment variable LOCAL_DIRS to the mount points of the hostPath volumes.
  • mr3.spark.delay.scheduling.interval.ms specifies the interval (in milliseconds) for checking delay scheduling when mr3.taskattempt.queue.scheme is set to opt.
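
Two of the rules above relate one key to another, so they are easy to check mechanically. The following Python sketch is not part of MR3; the helper names load_site_xml and check_mr3_conf are hypothetical, and it assumes that mr3-site.xml uses the Hadoop-style layout of property elements with name and value children. The sample values are made up for illustration.

```python
import xml.etree.ElementTree as ET

# Illustrative contents of mr3-site.xml (assumed Hadoop-style layout, made-up values).
SAMPLE = """
<configuration>
  <property>
    <name>mr3.am.max.num.concurrent.dags</name>
    <value>32</value>
  </property>
  <property>
    <name>mr3.am.client.thread-count</name>
    <value>16</value>
  </property>
  <property>
    <name>mr3.container.scheduler.scheme</name>
    <value>fair</value>
  </property>
</configuration>
"""


def load_site_xml(text):
    """Read name/value pairs from a Hadoop-style configuration document."""
    root = ET.fromstring(text.strip())
    return {
        prop.findtext("name", default="").strip(): prop.findtext("value", default="").strip()
        for prop in root.iter("property")
    }


def check_mr3_conf(conf):
    """Return a list of problems; keys that are absent are not checked."""
    problems = []
    dags = conf.get("mr3.am.max.num.concurrent.dags")
    threads = conf.get("mr3.am.client.thread-count")
    if dags is not None and threads is not None and int(threads) < int(dags):
        problems.append(
            "mr3.am.client.thread-count is lower than mr3.am.max.num.concurrent.dags"
        )
    scheme = conf.get("mr3.container.scheduler.scheme")
    if scheme is not None and scheme not in ("fifo", "fair"):
        problems.append(
            "mr3.container.scheduler.scheme must be fifo or fair to recycle ContainerWorkers"
        )
    return problems


problems = check_mr3_conf(load_site_xml(SAMPLE))
for problem in problems:
    print("WARNING:", problem)
```

In this sample the thread count (16) is below the number of concurrent DAGs (32), so the check reports one problem. Raising mr3.am.client.thread-count to 32 or more clears it.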

The user can also adjust MR3 configuration keys by updating spark-defaults.conf. Specifically, Spark on MR3 takes any configuration key in spark-defaults.conf that starts with the string spark.mr3 (except those introduced by Spark on MR3 itself, such as spark.mr3.keep.am), strips the prefix spark., and passes the resulting key to DAGAppMaster of MR3. For example, spark.mr3.am.client.thread-count=128 in spark-defaults.conf is converted to mr3.am.client.thread-count=128 and passed to DAGAppMaster of MR3.
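
The conversion rule can be illustrated with a minimal Python sketch. It is not the actual implementation in Spark on MR3. The function name to_mr3_conf is hypothetical, and the set of excluded keys is assumed to be exactly the four keys that this page lists as introduced by Spark on MR3.

```python
# Assumption: these four keys are the complete set introduced by Spark on MR3.
SPARK_ON_MR3_KEYS = {
    "spark.mr3.appid",
    "spark.mr3.keep.am",
    "spark.mr3.client.connect.timeout.ms",
    "spark.mr3.dag.status.checker.period.ms",
}


def to_mr3_conf(spark_conf):
    """Select keys starting with spark.mr3 and strip the leading 'spark.' prefix."""
    mr3_conf = {}
    for key, value in spark_conf.items():
        if key.startswith("spark.mr3") and key not in SPARK_ON_MR3_KEYS:
            mr3_conf[key[len("spark."):]] = value
    return mr3_conf


spark_conf = {
    "spark.mr3.am.client.thread-count": "128",  # passed on as mr3.am.client.thread-count
    "spark.mr3.keep.am": "true",                # own key of Spark on MR3, not passed on
    "spark.executor.cores": "20",               # ordinary Spark key, not passed on
}
print(to_mr3_conf(spark_conf))
```

Only the first entry reaches DAGAppMaster, under the key mr3.am.client.thread-count.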

spark-env.sh

spark-env.sh is an additional script executed by the Spark driver.

log4j.properties

log4j.properties contains the logging configuration.

  • On Kubernetes, log4j.properties applies to the Spark driver, DAGAppMaster, and ContainerWorkers alike.
  • On Hadoop, log4j.properties applies only to the Spark driver, while DAGAppMaster (in LocalProcess or Yarn mode) and ContainerWorkers use the default logging configuration with logging level INFO. (In order to change the logging configuration for DAGAppMaster and ContainerWorkers, the MR3 release should include log4j.properties.) The environment variable LOG_LEVEL in env.sh affects logging in DAGAppMaster and ContainerWorkers, but only for those classes that belong to the package org.apache.hadoop. Thus it is best to set LOG_LEVEL in env.sh to INFO.