After following the instructions in Installing Spark on MR3, the user can create a Kubernetes cluster for running Spark on MR3. Multiple Spark drivers can run either outside Kubernetes or in Kubernetes Pods while sharing a common DAGAppMaster Pod.
A running Kubernetes cluster should be available and the user should be able to execute the command
Before trying to run Spark on MR3 on Kubernetes, check if swap space is disabled in the Kubernetes cluster.
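Swap can be checked node by node. The following is a minimal sketch, assuming Linux nodes where /proc/swaps contains a header line followed by one line per active swap device. The function name swap_disabled is hypothetical.

```shell
#!/bin/sh
# swap_disabled [FILE]
# Succeeds (exit status 0) if FILE lists no active swap devices.
# FILE defaults to /proc/swaps, whose first line is a column header.
swap_disabled() {
  swaps_file=${1:-/proc/swaps}
  # Every line after the header is an active swap device;
  # exit with 1 if there is at least one, 0 otherwise.
  awk 'NR > 1 { n++ } END { exit (n > 0) }' "$swaps_file"
}
```

Running it on each node (for example, over ssh) reports a node where swap is still enabled through a non-zero exit status.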
Configuring Spark on MR3 on Kubernetes
For running the Spark driver outside Kubernetes, the following configuration keys should be set.
If the Spark driver runs inside Kubernetes, the user should comment out these configuration keys in
so that their default values are used.
- mr3.k8s.api.server.url specifies the URL for the Kubernetes API server (e.g.,
- mr3.k8s.client.config.file specifies the configuration file (kubeconfig file) for creating a Kubernetes client (e.g.,
- mr3.k8s.service.account.use.token.ca.cert.path should be set to false.
- mr3.k8s.am.service.port should match the address specified in
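For reference, the four keys are ordinary <property> entries in mr3-site.xml. The sketch below prints them from the shell. The helper names are hypothetical, and so are all values except false: the API server URL, the kubeconfig path, and the port 9862 (chosen to match the Service port in the sample mr3-service.yaml) must be replaced with the values for the actual cluster.

```shell
#!/bin/sh
# mr3_property NAME VALUE
# Prints one Hadoop-style <property> entry, the format used by mr3-site.xml.
mr3_property() {
  printf '<property>\n  <name>%s</name>\n  <value>%s</value>\n</property>\n' "$1" "$2"
}

# Hypothetical values for a Spark driver running outside Kubernetes;
# replace them with the values for your own cluster.
mr3_outside_k8s_properties() {
  mr3_property mr3.k8s.api.server.url https://10.1.90.10:6443
  mr3_property mr3.k8s.client.config.file /home/spark/.kube/config
  mr3_property mr3.k8s.service.account.use.token.ca.cert.path false
  mr3_property mr3.k8s.am.service.port 9862
}
```

The output can be pasted inside the <configuration> element of mr3-site.xml.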
If the user plans to mix Spark drivers running inside and outside Kubernetes,
it is recommended to execute the first Spark driver inside Kubernetes with the above configuration keys commented out in
This is because the first Spark driver creates a ConfigMap out of the directory
and all Spark drivers running inside Kubernetes reuse this ConfigMap.
For a Spark driver running outside Kubernetes,
mr3-site.xml on the local node can be updated without affecting existing or future Spark drivers.
Since Kubernetes Pods may use a different DNS,
the user should set the configuration key
mr3-site.xml if necessary.
- For running the Spark driver outside Kubernetes,
it should include a mapping for the node where the Spark driver is to run
so that Spark executors (which run in ContainerWorker Pods) can reach the Spark driver.
If the Spark driver cannot be reached, Spark executors fail with
java.net.UnknownHostException. For example, if the Spark driver runs on a node
gold0 at IP address 10.1.90.9,
mr3.k8s.host.aliases can be set as follows.
$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.k8s.host.aliases</name>
  <value>gold0=10.1.90.9</value>
</property>
- For accessing HDFS, it should include a mapping for the HDFS NameNode.
- In order to exploit location hints provided by HDFS when running the Spark driver inside Kubernetes, it should include mappings for HDFS DataNodes. This is because location hints provided by HDFS can be host names that are not resolved to IP addresses inside Kubernetes. (See Performance Tuning for Kubernetes for the same problem when running Hive on MR3 on Kubernetes.) If the Spark driver runs outside Kubernetes, such mappings are unnecessary as long as the Spark driver can resolve location hints provided by HDFS.
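When several mappings are needed (the node running the Spark driver, the HDFS NameNode, and possibly HDFS DataNodes), the value of mr3.k8s.host.aliases grows quickly. A minimal sketch for assembling it, assuming that multiple host=IP mappings are separated by commas in a single value. join_host_aliases and the host gold1 in the usage example are hypothetical.

```shell
#!/bin/sh
# join_host_aliases HOST=IP...
# Joins HOST=IP pairs into a single value for mr3.k8s.host.aliases.
# Assumption: multiple mappings are separated by commas.
join_host_aliases() {
  aliases=""
  for pair in "$@"; do
    case "$pair" in
      ?*=?*) ;;  # a non-empty host name, '=', and a non-empty address
      *) echo "invalid mapping: $pair" >&2; return 1 ;;
    esac
    # Prepend a comma only when a previous mapping exists.
    aliases="${aliases:+$aliases,}$pair"
  done
  printf '%s\n' "$aliases"
}
```

For example, join_host_aliases gold0=10.1.90.9 gold1=10.1.90.11 prints gold0=10.1.90.9,gold1=10.1.90.11, which can be placed in the <value> element.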
Configuring ContainerWorker Pods to create shuffle handler processes
By default, Spark on MR3 sets
spark.shuffle.service.enabled to true in
and mr3.use.daemon.shufflehandler to 0 in
As a result, every ContainerWorker Pod creates a separate process for running shuffle handlers,
which Spark on MR3 regards as an external shuffle service.
The user can find the command for executing the process in
if [ "$runShuffleHandlerProcess" = "true" ]; then
  export SPARK_CONF_DIR=$BASE_DIR/conf
  SPARK_SHUFFLE_CLASS=org.apache.spark.deploy.ExternalShuffleService
  $JAVA -cp "$BASE_DIR/spark/jars/*" -Dscala.usejavacp=true -Xmx1g $SPARK_SHUFFLE_CLASS &
fi
The default setting is recommended because shuffle handler processes continue to serve shuffle requests even after ContainerWorkers are recycled.
If spark.shuffle.service.enabled is set to false
and mr3.use.daemon.shufflehandler is set to a number larger than zero (e.g., 1),
Spark on MR3 uses built-in shuffle handlers included in Spark executors.
In this case,
when a ContainerWorker is recycled, a new Spark executor is created
and all the output produced by the previous Spark executor (inside the same ContainerWorker) is lost.
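The two configurations described above can be summarized as a small decision function. This is an illustrative sketch only: shuffle_mode is a hypothetical helper that takes the values of spark.shuffle.service.enabled and mr3.use.daemon.shufflehandler, and it reports any other combination as unknown because this section does not describe it.

```shell
#!/bin/sh
# shuffle_mode SERVICE_ENABLED DAEMON_SHUFFLEHANDLER
#   SERVICE_ENABLED:       value of spark.shuffle.service.enabled
#   DAEMON_SHUFFLEHANDLER: value of mr3.use.daemon.shufflehandler
# Prints "external" for a separate shuffle handler process per ContainerWorker
# Pod (the default), "built-in" for shuffle handlers inside Spark executors,
# and "unknown" for any combination not described in this section.
shuffle_mode() {
  if [ "$1" = true ] && [ "$2" -eq 0 ] 2>/dev/null; then
    echo external
  elif [ "$1" = false ] && [ "$2" -gt 0 ] 2>/dev/null; then
    echo built-in
  else
    echo unknown
  fi
}
```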
Creating a PersistentVolume (Optional)
There are two cases in which the user should create a PersistentVolume before running the Spark driver.
- The Spark driver running inside Kubernetes needs additional jar files and resources. (The Spark driver running outside Kubernetes can read additional jar files and resources directly from local disks.)
- DAG recovery is enabled in MR3 by setting the configuration key
mr3.dag.recovery.enabled to true in
kubernetes/spark/conf/mr3-site.xml. By default,
mr3.dag.recovery.enabled is set to false and DAG recovery is disabled in MR3. As a result, if a DAGAppMaster Pod crashes, all running DAGs are lost and not recovered by a new DAGAppMaster Pod. (Normally, however, this is not a practical problem because the Spark driver automatically retries those stages corresponding to lost DAGs.)
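Enabling DAG recovery means changing a single <value> in kubernetes/spark/conf/mr3-site.xml. A minimal sketch using awk, assuming the usual Hadoop-style layout in which <value> follows <name> inside each <property>. set_mr3_property is a hypothetical helper, and values containing & or \ would need escaping because they are passed to awk's sub().

```shell
#!/bin/sh
# set_mr3_property FILE NAME VALUE
# Rewrites the <value> of property NAME in a Hadoop-style XML file such as
# mr3-site.xml, leaving every other line unchanged.
set_mr3_property() {
  tmp_file=$(mktemp) || return 1
  awk -v name="$2" -v value="$3" '
    # Remember that the <name> line of the target property has been seen.
    index($0, "<name>" name "</name>") { found = 1 }
    # Replace the first <value> element that follows it.
    found && /<value>/ {
      sub(/<value>.*<\/value>/, "<value>" value "</value>")
      found = 0
    }
    { print }
  ' "$1" > "$tmp_file" || { rm -f "$tmp_file"; return 1; }
  # Copy back instead of mv so that the original file permissions are kept.
  cat "$tmp_file" > "$1"
  rm -f "$tmp_file"
}
```

For example: set_mr3_property kubernetes/spark/conf/mr3-site.xml mr3.dag.recovery.enabled true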
In order to create a PersistentVolume,
workdir-pv.yaml creates a PersistentVolume.
The sample file in the MR3 release uses an NFS volume, and the user should update it to use a different type of PersistentVolume.
workdir-pvc.yaml creates a PersistentVolumeClaim which references the PersistentVolume created by
The user should specify the size of the storage:
kubernetes/run-spark-setup.sh so as to read
$ vi kubernetes/run-spark-setup.sh

if [ $RUN_AWS_EKS = true ]; then
  echo "assume that PersistentVolumeClaim workdir-pvc has been created"
else
  kubectl create -f $YAML_DIR/workdir-pv.yaml
  kubectl create -n $SPARK_MR3_NAMESPACE -f $YAML_DIR/workdir-pvc.yaml
fi
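For an NFS volume, the two resources can look like the YAML printed by the sketch below. It is not the sample file shipped in the MR3 release: the NFS server address, export path, and storage size are hypothetical arguments, and the claim binds to workdir-pv explicitly through volumeName with an empty storageClassName so that no dynamic provisioner is involved. No namespace is written into the YAML because run-spark-setup.sh passes -n $SPARK_MR3_NAMESPACE when creating the PersistentVolumeClaim.

```shell
#!/bin/sh
# print_workdir_volumes NFS_SERVER NFS_PATH SIZE
# Prints a PersistentVolume (workdir-pv) backed by NFS and a
# PersistentVolumeClaim (workdir-pvc) that binds to it explicitly.
print_workdir_volumes() {
  cat <<EOF
apiVersion: v1
kind: PersistentVolume
metadata:
  name: workdir-pv
spec:
  capacity:
    storage: $3
  accessModes:
  - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  nfs:
    server: $1
    path: $2
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: workdir-pvc
spec:
  # Bind to workdir-pv directly instead of using a StorageClass.
  storageClassName: ""
  volumeName: workdir-pv
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: $3
EOF
}
```

The output can be split at the --- line into workdir-pv.yaml and workdir-pvc.yaml.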
If the Spark driver runs inside Kubernetes, update
kubernetes/spark-yaml/spark-submit.yaml to mount the PersistentVolume.
Then the Spark driver can access additional jar files and resources under the mount point
$ vi kubernetes/spark-yaml/spark-submit.yaml

    volumeMounts:
    - name: work-dir-volume
      mountPath: /opt/mr3-run/work-dir
  volumes:
  - name: work-dir-volume
    persistentVolumeClaim:
      claimName: workdir-pvc
For mounting the PersistentVolume inside a DAGAppMaster Pod (e.g., when DAG recovery is enabled), update
$ vi kubernetes/spark/env.sh

WORK_DIR_PERSISTENT_VOLUME_CLAIM=workdir-pvc
WORK_DIR_PERSISTENT_VOLUME_CLAIM_MOUNT_DIR=/opt/mr3-run/work-dir
Creating Kubernetes resources for Spark on MR3
Before running Spark on MR3, the user should create various Kubernetes resources by executing the script
The script reads some of the YAML files in the directory
If the Spark driver runs outside Kubernetes,
the user should update
mr3-service.yaml, which creates a Service for exposing MR3 DAGAppMaster.
(If the Spark driver runs inside the Kubernetes cluster, updating
mr3-service.yaml is unnecessary.)
The user should specify a public IP address with a valid host name and a port number for DAGAppMaster
so that Spark drivers can connect to it from outside the Kubernetes cluster.
ports:
- protocol: TCP
  port: 9862
  targetPort: 8080
externalIPs:
- 10.1.90.10
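Because mr3.k8s.am.service.port in mr3-site.xml should match the Service, a mismatch can be caught before launching a Spark driver outside Kubernetes. A minimal sketch, assuming a single port: entry in mr3-service.yaml (as in the fragment above) and the Hadoop-style <name>/<value> layout of mr3-site.xml. The function names are hypothetical, and only the port is compared, not the host name or IP address.

```shell
#!/bin/sh
# service_port YAML_FILE
# Prints the value of the first "port:" key (targetPort is not matched).
service_port() {
  awk '$1 == "port:" { print $2; exit }' "$1"
}

# site_property XML_FILE NAME
# Prints the <value> of property NAME from a Hadoop-style XML file.
site_property() {
  awk -v name="$2" '
    index($0, "<name>" name "</name>") { found = 1 }
    found && /<value>/ {
      sub(/.*<value>/, "")
      sub(/<\/value>.*/, "")
      print
      exit
    }
  ' "$1"
}

# check_am_port SITE_XML SERVICE_YAML
# Fails if mr3.k8s.am.service.port is missing or differs from the Service port.
check_am_port() {
  site_port=$(site_property "$1" mr3.k8s.am.service.port)
  svc_port=$(service_port "$2")
  if [ -z "$site_port" ] || [ "$site_port" != "$svc_port" ]; then
    echo "mr3.k8s.am.service.port ($site_port) does not match port ($svc_port)" >&2
    return 1
  fi
}
```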
Executing the script generates two environment variables
$ kubernetes/run-spark-setup.sh
...
export CLIENT_TO_AM_TOKEN_KEY=88995354-d73d-46d7-900b-7627c829e07a
export MR3_APPLICATION_ID_TIMESTAMP=21910
configmap/client-am-config created
service/spark-driver created
service/mr3 created
The user should set these environment variables on every node where Spark on MR3 is to be launched so that all Spark drivers can share the same DAGAppMaster Pod (and consequently the same set of ContainerWorker Pods). If the environment variables are not set, each Spark driver creates its own DAGAppMaster Pod and no ContainerWorker Pods are shared.
$ export CLIENT_TO_AM_TOKEN_KEY=88995354-d73d-46d7-900b-7627c829e07a
$ export MR3_APPLICATION_ID_TIMESTAMP=21910
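Instead of copying the two values by hand, the export lines can be filtered out of the script output and distributed as a file. A minimal sketch; extract_mr3_exports and the file names in the usage note are hypothetical.

```shell
#!/bin/sh
# extract_mr3_exports
# Reads the output of kubernetes/run-spark-setup.sh on stdin and prints only
# the export lines for CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP.
extract_mr3_exports() {
  grep -E '^export (CLIENT_TO_AM_TOKEN_KEY|MR3_APPLICATION_ID_TIMESTAMP)='
}
```

For example, kubernetes/run-spark-setup.sh | tee setup.log | extract_mr3_exports > mr3-env.sh keeps the full output in setup.log, and . ./mr3-env.sh then sets both variables on each node where Spark drivers are launched.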
Running Spark on MR3 on Kubernetes
Option 1. Running the Spark driver inside Kubernetes
To run the Spark driver inside Kubernetes,
spec:
  containers:
  - image: 10.1.90.9:5000/spark3:latest
    command: ["/opt/mr3-run/spark/run-spark-submit.sh"]
    args: [
      "--conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.host=spark-driver.sparkmr3.svc.cluster.local --conf spark.driver.port=9850",
      "--class com.datamonad.mr3.spark.tpcds.RunManyQueries /opt/mr3-run/work-dir/lib/spark_mr3_tpcds_2.12-0.1.jar hdfs://gold0:8020/tmp/tpcds-generate/1000 /opt/mr3-run/work-dir/sparksql/ 19,42,52,55 1 false"]
    resources:
      requests:
        cpu: 6
        memory: 24Gi
      limits:
        cpu: 6
        memory: 24Gi
- The field spec/containers/image specifies the full name of the Docker image including a tag.
- The field spec/containers/args specifies the arguments for Spark.
- The resources/limits fields specify the resources to be allocated to a Spark driver Pod.
After updating spark-submit.yaml, the user can use it to create a Spark driver Pod.
$ kubectl create -f kubernetes/spark-yaml/spark-submit.yaml
Note that in order to share a DAGAppMaster Pod among Spark drivers,
the environment variables
MR3_APPLICATION_ID_TIMESTAMP should be set properly.
Option 2. Running the Spark driver outside Kubernetes
Before running the Spark driver outside Kubernetes,
create a directory as specified by the configuration key
$ mkdir -p /opt/mr3-run/work-dir
Spark on MR3 creates a new subdirectory (e.g.,
.mr3/application_16555_0000), which remains empty.
Then the user can execute
which in turn executes the script
bin/spark-shell under the directory of the Spark installation.
The user may provide additional arguments for Spark.
$ kubernetes/spark/spark/run-spark-submit.sh \
    --class com.datamonad.mr3.spark.tpcds.RunManyQueries \
    /home/spark/mr3-run/spark/benchmarks/smb2/tpcds/target/scala-2.12/spark_mr3_tpcds_2.12-0.1.jar \
    hdfs://gold0:8020/tmp/tpcds-generate/1000 \
    /home/spark/mr3-run/hive/benchmarks/sparksql/ \
    19,42,52,55 1 false

$ kubernetes/spark/spark/run-spark-shell.sh
The first Spark driver creates a DAGAppMaster Pod.
If the environment variables
MR3_APPLICATION_ID_TIMESTAMP are properly set,
all subsequent Spark drivers connect to the existing DAGAppMaster Pod without creating new ones.
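Since a Spark driver started without these variables creates its own DAGAppMaster Pod, a guard before launching can help. A minimal sketch; require_mr3_env is a hypothetical helper that only checks that both variables are non-empty in the current shell, not that their values match the running DAGAppMaster.

```shell
#!/bin/sh
# require_mr3_env
# Fails unless both variables printed by kubernetes/run-spark-setup.sh
# are set to non-empty values in the current shell.
require_mr3_env() {
  missing=0
  for var in CLIENT_TO_AM_TOKEN_KEY MR3_APPLICATION_ID_TIMESTAMP; do
    # Indirect lookup: expands to the value of the variable named by $var.
    eval "val=\${$var:-}"
    if [ -z "$val" ]; then
      echo "$var is not set; the Spark driver would create its own DAGAppMaster Pod" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example: require_mr3_env && kubernetes/spark/spark/run-spark-shell.sh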
Terminating Spark on MR3
Terminating a Spark driver does not delete the DAGAppMaster Pod.
Instead, all the Kubernetes resources remain intact and continue to serve subsequent Spark drivers.
In order to delete the Kubernetes resources, the user should manually execute
$ kubectl -n sparkmr3 delete replicationcontrollers --all
$ kubectl -n sparkmr3 delete pod --all
$ kubectl -n sparkmr3 delete configmap --all
$ kubectl -n sparkmr3 delete svc --all
$ kubectl -n sparkmr3 delete secret --all
$ kubectl -n sparkmr3 delete serviceaccount spark-service-account
$ kubectl -n sparkmr3 delete role --all
$ kubectl -n sparkmr3 delete rolebinding --all
$ kubectl delete clusterrole node-reader
$ kubectl delete clusterrolebinding spark-clusterrole-binding
$ kubectl -n sparkmr3 delete persistentvolumeclaims workdir-pvc
$ kubectl delete persistentvolumes workdir-pv
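The same commands can be wrapped in a function so that the namespace can be changed and the commands can be inspected before anything is deleted. A minimal sketch; delete_spark_mr3_resources and the KUBECTL override are hypothetical, and the namespace defaults to sparkmr3 as above. Like the commands above, it keeps going when a resource does not exist.

```shell
#!/bin/sh
# delete_spark_mr3_resources [NAMESPACE]
# Deletes the Kubernetes resources for Spark on MR3. KUBECTL defaults to
# kubectl; setting KUBECTL="echo kubectl" prints the commands instead.
delete_spark_mr3_resources() {
  ns=${1:-sparkmr3}
  kc=${KUBECTL:-kubectl}
  # $kc is left unquoted on purpose so that "echo kubectl" splits into two words.
  # Namespaced resources; each command runs even if an earlier one fails.
  for kind in replicationcontrollers pod configmap svc secret role rolebinding; do
    $kc -n "$ns" delete "$kind" --all
  done
  $kc -n "$ns" delete serviceaccount spark-service-account
  $kc -n "$ns" delete persistentvolumeclaims workdir-pvc
  # Cluster-scoped resources.
  $kc delete clusterrole node-reader
  $kc delete clusterrolebinding spark-clusterrole-binding
  $kc delete persistentvolumes workdir-pv
}
```

Running KUBECTL="echo kubectl" delete_spark_mr3_resources prints the twelve commands without executing them.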