After following the instructions in Install Spark on MR3, the user can configure Spark on MR3.
On Kubernetes, the user can configure Spark on MR3 by updating the files in the directory `kubernetes/spark/conf`.

```sh
$ ls kubernetes/spark/conf
core-site.xml  jgss.conf  log4j.properties  spark-defaults.conf
hive-site.xml  krb5.conf  mr3-site.xml      spark-env.sh
```
On Hadoop, the user can configure Spark on MR3 by updating the files in the configuration directories such as `conf/cluster/mr3` and `conf/cluster/spark`.

```sh
$ ls conf
cluster  local  tpcds
$ ls conf/cluster/mr3
mr3-site.xml
$ ls conf/cluster/spark
hive-site.xml  log4j.properties  spark-defaults.conf  spark-env.sh
```
## spark-defaults.conf

`spark-defaults.conf` contains default settings for Spark.
Since Spark on MR3 does not modify vanilla Spark, it interprets all configuration keys in exactly the same way.
Spark on MR3 reads the following configuration keys to adjust the settings for MR3.
- The sum of the values specified by `spark.executor.memory` and `spark.executor.memoryOverhead` becomes the memory size of ContainerWorker Pods/containers. For example, `spark.executor.memory=73728m` and `spark.executor.memoryOverhead=8192m` create ContainerWorker Pods/containers each with 80GB of memory.
- As in vanilla Spark, the value of `spark.executor.memory` becomes the heap size of the Java VM for ContainerWorkers (corresponding to the `-Xmx` option). Internally Spark on MR3 derives the value of `mr3.container.max.java.heap.fraction` as `spark.executor.memory / (spark.executor.memory + spark.executor.memoryOverhead)`.
- For the maximum number of concurrent tasks in a ContainerWorker, Spark on MR3 uses `spark.executor.cores / spark.task.cpus`. For example, setting `spark.executor.cores=20` and `spark.task.cpus=2` means that 10 Spark tasks can be executed concurrently in each ContainerWorker.
- `spark.local.dir` specifies the list of local directories to be used by the Spark driver (on both Yarn and Kubernetes) and the external shuffle service (on Kubernetes only).
Note that we do not set `spark.driver.cores` and `spark.driver.memory` for the resources of the Spark driver in `spark-defaults.conf` because we may run multiple Spark drivers with different resource requirements.
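For illustration, the example values above correspond to the following lines in `spark-defaults.conf`; the directories for `spark.local.dir` are hypothetical and should match the local environment.

```properties
# Each ContainerWorker Pod/container gets 73728m + 8192m = 80GB of memory.
spark.executor.memory          73728m
spark.executor.memoryOverhead  8192m

# 20 / 2 = 10 Spark tasks can run concurrently in each ContainerWorker.
spark.executor.cores           20
spark.task.cpus                2

# Local directories for the Spark driver and the external shuffle service
# (hypothetical paths; adjust to the local environment).
spark.local.dir                /data1/spark,/data2/spark
```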
Spark on MR3 also introduces a few configuration keys of its own.
| Name | Default value | Description |
|---|---|---|
| `spark.mr3.appid` | | Application ID of an existing DAGAppMaster to which the Spark driver connects. Use only on Hadoop and do not use on Kubernetes. |
| `spark.mr3.keep.am` | true | (On Hadoop only) true: do not kill DAGAppMaster when the Spark driver terminates. false: kill DAGAppMaster when the Spark driver terminates. |
| `spark.mr3.client.connect.timeout.ms` | 30000 | Time in milliseconds for trying to connect to DAGAppMaster |
| `spark.mr3.dag.status.checker.period.ms` | 1000 | Time interval in milliseconds for checking the status of every DAG |
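For example, on Hadoop, the Spark driver can connect to an existing DAGAppMaster by setting `spark.mr3.appid` in `spark-defaults.conf` (the application ID below is hypothetical):

```properties
# Connect to an existing DAGAppMaster (Hadoop only); the ID below is hypothetical.
spark.mr3.appid    application_1590000000000_0001
# Keep the DAGAppMaster running after the Spark driver terminates.
spark.mr3.keep.am  true
```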
In order to recycle ContainerWorkers among Spark applications, set the configuration key `spark.files.useFetchCache` to false.
This is necessary to make sure that intermediate data produced by a Spark application is deleted from local disks once the application finishes.
(Intermediate data is still deleted even if `spark.files.useFetchCache` is set to true, but ContainerWorkers often terminate themselves without being recycled.)
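In `spark-defaults.conf`, this corresponds to the following line:

```properties
# Required for recycling ContainerWorkers among Spark applications.
spark.files.useFetchCache  false
```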
## mr3-site.xml

`mr3-site.xml` is the configuration file for MR3.
The user should adjust the following configuration keys according to the settings of the Kubernetes/Hadoop cluster.
See Configuring MR3 for the description of all the configuration keys.
- `mr3.am.max.num.concurrent.dags` specifies the maximum number of Spark stages that can be executed concurrently in DAGAppMaster, because each Spark stage is converted to an MR3 DAG (with a single Vertex). For example, a value of 32 means that 32 Spark stages can be executed concurrently.
- `mr3.am.client.thread-count` should be set to the same value as (or higher than) `mr3.am.max.num.concurrent.dags`.
- `mr3.container.scheduler.scheme` specifies the policy for recycling ContainerWorkers among Spark applications. In order to recycle ContainerWorkers, the user should set it to `fifo` or `fair` (see Recycling ContainerWorkers).
- `mr3.am.resource.memory.mb` specifies the memory size (in MB) of the DAGAppMaster Pod/container.
- `mr3.am.resource.cpu.cores` specifies the number of cores to be allocated to the DAGAppMaster Pod/container.
- (On Kubernetes only) `mr3.k8s.pod.worker.hostpaths` specifies the list of directories on each node in the Kubernetes cluster to which hostPath volumes point for ContainerWorkers. Internally MR3 sets the environment variable `LOCAL_DIRS` to the mount points of the hostPath volumes.
- `mr3.spark.delay.scheduling.interval.ms` specifies the interval (in milliseconds) for checking delay scheduling when `mr3.taskattempt.queue.scheme` is set to `opt`.
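As an illustration, the keys above might be set in `mr3-site.xml` as follows. The values are examples only (the hostPath directories are hypothetical) and should be adjusted to the cluster.

```xml
<configuration>
  <!-- allow up to 32 Spark stages (MR3 DAGs) to run concurrently -->
  <property>
    <name>mr3.am.max.num.concurrent.dags</name>
    <value>32</value>
  </property>
  <property>
    <name>mr3.am.client.thread-count</name>
    <value>32</value>
  </property>
  <!-- recycle ContainerWorkers among Spark applications -->
  <property>
    <name>mr3.container.scheduler.scheme</name>
    <value>fair</value>
  </property>
  <!-- resources for the DAGAppMaster Pod/container (example values) -->
  <property>
    <name>mr3.am.resource.memory.mb</name>
    <value>16384</value>
  </property>
  <property>
    <name>mr3.am.resource.cpu.cores</name>
    <value>4</value>
  </property>
  <!-- on Kubernetes only: hostPath directories for ContainerWorkers (hypothetical paths) -->
  <property>
    <name>mr3.k8s.pod.worker.hostpaths</name>
    <value>/data1/k8s,/data2/k8s</value>
  </property>
</configuration>
```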
The user can also adjust MR3 configuration keys by updating `spark-defaults.conf`.
Specifically, Spark on MR3 takes any configuration key in `spark-defaults.conf` starting with the string `spark.mr3` (except those introduced by Spark on MR3 such as `spark.mr3.keep.am`), strips the prefix `spark.`, and passes the result to DAGAppMaster of MR3.
For example, `spark.mr3.am.client.thread-count=128` in `spark-defaults.conf` is converted to `mr3.am.client.thread-count=128` and passed to DAGAppMaster of MR3.
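For instance, the example above appears in `spark-defaults.conf` as:

```properties
# Stripped of the prefix "spark." and passed to DAGAppMaster
# as mr3.am.client.thread-count=128.
spark.mr3.am.client.thread-count  128
```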
## spark-env.sh

`spark-env.sh` is the additional script executed by the Spark driver.
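A minimal sketch of `spark-env.sh`, assuming only that standard Spark environment variables such as `JAVA_HOME` and `HADOOP_CONF_DIR` are relevant in the target environment (the paths below are hypothetical):

```sh
#!/usr/bin/env bash
# Hypothetical paths; adjust to the local environment.
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk
export HADOOP_CONF_DIR=/etc/hadoop/conf
```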
## log4j.properties

`log4j.properties` contains the logging configuration.

- On Kubernetes, `log4j.properties` applies to all of the Spark driver, DAGAppMaster, and ContainerWorkers.
- On Hadoop, `log4j.properties` applies only to the Spark driver, while DAGAppMaster (in LocalProcess or Yarn mode) and ContainerWorkers use the default logging configuration with logging level `INFO`. (In order to change the logging configuration for DAGAppMaster and ContainerWorkers, the MR3 release should include `log4j.properties`.) The environment variable `LOG_LEVEL` in `env.sh` affects logging in DAGAppMaster and ContainerWorkers, but only for those classes that belong to the package `org.apache.hadoop`. Thus it is best to set `LOG_LEVEL` in `env.sh` to `INFO`.
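As a reference, a minimal `log4j.properties` in the style of vanilla Spark's log4j 1.x template might look like the following sketch; adjust the root logging level as needed.

```properties
# Send everything at level INFO or above to the console.
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```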