After following the instructions in Configuring Spark on MR3, the user can create a Kubernetes cluster for running Spark on MR3. Multiple Spark drivers can run either outside Kubernetes or in Kubernetes Pods while sharing a common DAGAppMaster Pod.

spark.mr3.k8s.mixed.mode

A running Kubernetes cluster should be available and the user should be able to execute the command kubectl. Before trying to run Spark on MR3 on Kubernetes, check if swap space is disabled in the Kubernetes cluster.
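
Both prerequisites can be checked from a shell before going further. The following is a minimal sketch and not part of the MR3 release: the function names are hypothetical, and it assumes a Linux node where /proc/swaps lists active swap devices after a single header line. Since it inspects only the local node, it would have to be run on every node of the Kubernetes cluster.

```shell
#!/bin/sh
# Hypothetical helpers for checking the prerequisites (not part of MR3).

# Succeeds if the given swaps file (default: /proc/swaps) lists no active
# swap device, i.e., it contains nothing beyond the header line.
# Fails if swap is active or if the file cannot be read.
swap_disabled() {
  swaps_file="${1:-/proc/swaps}"
  awk 'BEGIN { found = 0 } NR > 1 && NF > 0 { found = 1 } END { exit found }' "$swaps_file"
}

# Succeeds if kubectl is found on PATH.
kubectl_available() {
  command -v kubectl >/dev/null 2>&1
}

# Prints one message per failed check and nothing if both checks pass.
check_prerequisites() {
  kubectl_available || echo "kubectl not found on PATH"
  swap_disabled || echo "swap is enabled (or /proc/swaps is unreadable) on this node"
}
```

Calling check_prerequisites on a node prints nothing when kubectl is available and swap is off.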

Configuring the Kubernetes client

For running a Spark driver outside Kubernetes, the following configuration keys should be set. For running a Spark driver inside Kubernetes, the user should comment out these configuration keys in mr3-site.xml so that their default values are used.

  • mr3.k8s.api.server.url specifies the URL for the Kubernetes API server (e.g., https://10.1.90.9:6443).
  • mr3.k8s.client.config.file specifies the configuration file (kubeconfig file) for creating a Kubernetes client (e.g., ~/.kube/config).
  • mr3.k8s.service.account.use.token.ca.cert.path should be set to false.
  • mr3.k8s.am.service.host and mr3.k8s.am.service.port should match the address specified in kubernetes/spark-yaml/mr3-service.yaml.
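
As an illustration of how these keys might look in mr3-site.xml, the sketch below prints property elements for the first three keys using the example values from the list. The helper functions are hypothetical and not part of MR3, and the values must be replaced with those of the actual cluster. The two keys for the DAGAppMaster Service are left out on purpose because their values depend on mr3-service.yaml.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): print one <property> element in the
# format used by mr3-site.xml.
#   $1 = configuration key, $2 = value
print_property() {
  printf '<property>\n  <name>%s</name>\n  <value>%s</value>\n</property>\n' "$1" "$2"
}

# Values are the examples given in the list above; adjust them to the cluster.
print_client_properties() {
  print_property mr3.k8s.api.server.url "https://10.1.90.9:6443"
  print_property mr3.k8s.client.config.file "~/.kube/config"
  print_property mr3.k8s.service.account.use.token.ca.cert.path "false"
  # mr3.k8s.am.service.host and mr3.k8s.am.service.port are omitted here:
  # take their values from kubernetes/spark-yaml/mr3-service.yaml.
}

print_client_properties
```

The printed elements can be pasted into kubernetes/spark/conf/mr3-site.xml for a Spark driver running outside Kubernetes.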

If the user plans to mix Spark drivers running inside and outside Kubernetes, it is recommended to execute the first Spark driver inside Kubernetes with the above configuration keys commented out in mr3-site.xml. This is because the first Spark driver creates a ConfigMap out of the directory kubernetes/spark/conf and all Spark drivers running inside Kubernetes reuse this ConfigMap. For a Spark driver running outside Kubernetes, mr3-site.xml on the local node can be updated without affecting existing or future Spark drivers.

Setting host aliases

Since Kubernetes Pods may use a different DNS, the user should set the configuration key mr3.k8s.host.aliases in mr3-site.xml if necessary.

  • For running a Spark driver outside Kubernetes, it should include a mapping for the node where the Spark driver is to run so that Spark executors (which run in ContainerWorker Pods) can reach the Spark driver. If the Spark driver cannot be reached, Spark executors fail after throwing java.net.UnknownHostException. For example, if the Spark driver runs on a node gold0 at IP address 10.1.90.9, mr3.k8s.host.aliases can be set as follows.
    $ vi kubernetes/spark/conf/mr3-site.xml
    
    <property>
      <name>mr3.k8s.host.aliases</name>
      <value>gold0=10.1.90.9</value>
    </property>
    
  • For accessing HDFS, it should include a mapping for the HDFS NameNode.
  • In order to exploit location hints provided by HDFS when running a Spark driver inside Kubernetes, it should include mappings for HDFS DataNodes. This is because location hints provided by HDFS can be host names that are not resolved to IP addresses inside Kubernetes. (See Performance Tuning for Kubernetes for the same problem when running Hive on MR3 on Kubernetes.) If a Spark driver runs outside Kubernetes, such mappings are unnecessary as long as the Spark driver can resolve location hints provided by HDFS.
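
A mapping in the host=address form shown above can be derived from the local resolver instead of being typed by hand. The sketch below is only an illustration under assumptions: to_host_alias is a hypothetical helper, not part of MR3, and it expects input lines of the form "address hostname", which is what getent hosts prints on a typical Linux system.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): turn each input line of the form
# "<address> <hostname> ..." into "<hostname>=<address>".
to_host_alias() {
  awk 'NF >= 2 { print $2 "=" $1 }'
}

# Example with the values used above.
echo "10.1.90.9 gold0" | to_host_alias

# On a node that can resolve gold0, the same entry could be obtained with:
#   getent hosts gold0 | to_host_alias
```

The printed entry is the string to place in the value of mr3.k8s.host.aliases.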

Configuring ContainerWorker Pods to create shuffle handler processes

By default, Spark on MR3 sets spark.shuffle.service.enabled to true in spark-defaults.conf along with mr3.use.daemon.shufflehandler set to 0 in mr3-site.xml. Every ContainerWorker Pod creates a separate process for running shuffle handlers, which Spark on MR3 regards as an external shuffle service. The user can find the command for executing the process in kubernetes/spark/spark/run-worker.sh.

    if [ "$runShuffleHandlerProcess" = "true" ]; then
      export SPARK_CONF_DIR=$BASE_DIR/conf
      SPARK_SHUFFLE_CLASS=org.apache.spark.deploy.ExternalShuffleService
      $JAVA -cp "$BASE_DIR/spark/jars/*" -Dspark.local.dir=/tmp -Dscala.usejavacp=true -Xmx1g $SPARK_SHUFFLE_CLASS &

The default setting is recommended because shuffle handler processes continue to serve shuffle requests even after ContainerWorkers are recycled.

If spark.shuffle.service.enabled is set to false along with mr3.use.daemon.shufflehandler set to a number larger than zero (e.g., 1), Spark on MR3 uses built-in shuffle handlers included in Spark executors. In this case, when a ContainerWorker is recycled, a new Spark executor is created and the output produced by the previous Spark executor (inside the same ContainerWorker) is all lost.
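
The two settings have to be changed together. As a rough aid, the sketch below classifies a pair of values according to the two combinations described above. shuffle_mode is a hypothetical helper and not part of MR3, and any other combination is simply reported as inconsistent because its behavior is not described here.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): classify a pair of settings.
#   $1 = value of spark.shuffle.service.enabled (true or false)
#   $2 = value of mr3.use.daemon.shufflehandler (non-negative integer)
shuffle_mode() {
  case "$2" in
    ''|*[!0-9]*) echo "invalid"; return 1 ;;
  esac
  if [ "$1" = "true" ] && [ "$2" -eq 0 ]; then
    echo "external"      # separate shuffle handler process in each ContainerWorker Pod
  elif [ "$1" = "false" ] && [ "$2" -gt 0 ]; then
    echo "built-in"      # shuffle handlers inside Spark executors
  else
    echo "inconsistent"  # a combination not described above
    return 1
  fi
}

shuffle_mode true 0    # the default setting
shuffle_mode false 1   # the alternative setting
```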

Creating a PersistentVolume (Optional)

There is one case in which the user should create a PersistentVolume before running a Spark driver.

  • The user enables DAG recovery by setting the configuration key mr3.dag.recovery.enabled to true in kubernetes/spark/conf/mr3-site.xml. By default, mr3.dag.recovery.enabled is set to false and DAG recovery is disabled in MR3. As a result, if a DAGAppMaster Pod crashes, all running DAGs are lost and are not recovered by a new DAGAppMaster Pod. Normally, however, this is not a practical problem because the Spark driver automatically retries the stages corresponding to lost DAGs.

In order to create a PersistentVolume, first update kubernetes/spark-yaml/workdir-pv.yaml and kubernetes/spark-yaml/workdir-pvc.yaml. workdir-pv.yaml creates a PersistentVolume. The sample file in the MR3 release uses an NFS volume, and the user should update it in order to use different types of PersistentVolumes. workdir-pvc.yaml creates a PersistentVolumeClaim which references the PersistentVolume created by workdir-pv.yaml. The user should specify the size of the storage:

      storage: 10Gi

Then set CREATE_PERSISTENT_VOLUME to true in kubernetes/spark/env.sh so as to read workdir-pv.yaml and workdir-pvc.yaml.

$ vi kubernetes/spark/env.sh

CREATE_PERSISTENT_VOLUME=true

If the Spark driver runs inside Kubernetes, update kubernetes/spark-yaml/spark-run.yaml to mount the PersistentVolume. Then the Spark driver can access resources under the mount point /opt/mr3-run/work-dir.

$ vi kubernetes/spark-yaml/spark-run.yaml 

    - name: work-dir-volume
      mountPath: /opt/mr3-run/work-dir

  - name: work-dir-volume
    persistentVolumeClaim:
      claimName: workdir-pvc

For mounting the PersistentVolume inside a DAGAppMaster Pod (e.g., when DAG recovery is enabled), update kubernetes/spark/env.sh.

$ vi kubernetes/spark/env.sh

WORK_DIR_PERSISTENT_VOLUME_CLAIM=workdir-pvc
WORK_DIR_PERSISTENT_VOLUME_CLAIM_MOUNT_DIR=/opt/mr3-run/work-dir

Creating PersistentVolumes is also necessary if the user wants to keep the log files of Spark drivers running inside Kubernetes. By default, Spark drivers running inside Kubernetes store their log files under the following directories:

  • /opt/mr3-run/spark/spark-shell-result
  • /opt/mr3-run/spark/spark-submit-result

Since these directories are container-local directories, the log files are not accessible after Spark driver Pods terminate. In order to keep the log files, the user can mount PersistentVolumes under these directories.

Creating Kubernetes resources

Before running Spark on MR3, the user should create various Kubernetes resources by executing the script kubernetes/run-spark-setup.sh. The script reads some of the YAML files in the directory kubernetes/spark-yaml.

For running a Spark driver outside Kubernetes, the user should update mr3-service.yaml, which creates a Service for exposing MR3 DAGAppMaster. (If the Spark driver runs inside the Kubernetes cluster, updating mr3-service.yaml is unnecessary.) The user should specify a public IP address with a valid host name and a port number for DAGAppMaster so that Spark drivers can connect to it from outside the Kubernetes cluster.

$ vi kubernetes/spark-yaml/mr3-service.yaml

  ports:
  - protocol: TCP
    port: 9862
    targetPort: 8080
  externalIPs:
  - 10.1.90.10

Executing the script generates two environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP.

$ kubernetes/run-spark-setup.sh
...
export CLIENT_TO_AM_TOKEN_KEY=e07c35ca-1a2c-4d97-9037-9fb84de5f7c4
export MR3_APPLICATION_ID_TIMESTAMP=29526
configmap/client-am-config created
service/mr3 created

The user should set these environment variables on every node where Spark on MR3 is to be launched so that all Spark drivers can share the same DAGAppMaster Pod (and consequently the same set of ContainerWorker Pods). If the environment variables are not set, each Spark driver creates its own DAGAppMaster Pod and no ContainerWorker Pods are shared.

$ export CLIENT_TO_AM_TOKEN_KEY=e07c35ca-1a2c-4d97-9037-9fb84de5f7c4
$ export MR3_APPLICATION_ID_TIMESTAMP=29526
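
Copying the two values by hand to every node is error-prone, so it can help to save them in a small file that is sourced on each node. The sketch below is one way to do this. It assumes that run-spark-setup.sh prints the two export lines on standard output exactly as shown above; extract_mr3_exports and the file names setup.log and mr3-env.sh are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): read the output of
# kubernetes/run-spark-setup.sh on standard input and keep only the two
# export lines.
extract_mr3_exports() {
  grep -E '^export (CLIENT_TO_AM_TOKEN_KEY|MR3_APPLICATION_ID_TIMESTAMP)='
}

# Intended use (commented out because it needs a Kubernetes cluster):
#   kubernetes/run-spark-setup.sh | tee setup.log
#   extract_mr3_exports < setup.log > mr3-env.sh
#   . ./mr3-env.sh      # repeat on every node after copying mr3-env.sh
```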

Running Spark on MR3

Option 1. Running a Spark driver inside Kubernetes

To run a Spark driver inside Kubernetes, edit kubernetes/spark-yaml/driver-service.yaml and kubernetes/spark-yaml/spark-run.yaml.

$ vi kubernetes/spark-yaml/driver-service.yaml

metadata:
  name: spark1
spec:
  selector:
    sparkmr3_app: spark1

  • The field metadata/name specifies the name of the Spark driver Pod.
  • The field spec/selector should use the name of the Spark driver Pod.

$ vi kubernetes/spark-yaml/spark-run.yaml

metadata:
  name: spark1
  labels:
    sparkmr3_app: spark1
spec:
  hostAliases:
  - ip: "10.1.90.9"
    hostnames:
    - "gold0"
  containers:
  - image: 10.1.90.9:5000/spark3:latest
    env:
    - name: DRIVER_NAME
      value: "spark1"
    - name: SPARK_DRIVER_CORES
      value: '2'
    - name: SPARK_DRIVER_MEMORY_MB
      value: '13107'
    resources:
      requests:
        cpu: 2
        memory: 16Gi
      limits:
        cpu: 2
        memory: 16Gi
    volumeMounts:
    - name: spark-hostpath-1
      mountPath: /data1/k8s
  volumes:
  - name: spark-hostpath-1
    hostPath:
      path: /data1/k8s
      type: Directory

  • The field metadata/name specifies the name of the Spark driver Pod.
  • The field metadata/labels should use the name of the Spark driver Pod.
  • The field spec/hostAliases specifies mappings for host names.
  • The field spec/containers/image specifies the full name of the Docker image including a tag.
  • The environment variable DRIVER_NAME should be set to the name of the Spark driver Pod.
  • The environment variables SPARK_DRIVER_CORES and SPARK_DRIVER_MEMORY_MB specify the arguments for the configuration keys spark.driver.cores and spark.driver.memory (in MB) for the Spark driver, respectively.
  • The fields spec/containers/resources/requests and spec/containers/resources/limits specify the resources to be allocated to the Spark driver Pod. As a particular case, if DAGAppMaster runs in LocalProcess mode (see case 2 in Running MR3Client inside/outside Kubernetes), the resources for the Spark driver Pod should be large enough to accommodate both the Spark driver and DAGAppMaster.
  • The fields spec/containers/volumeMounts and spec/volumes specify the list of directories to which hostPath volumes point for the Spark driver. The list of directories should match the value for spark.local.dir in spark-defaults.conf.
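
In the sample above, SPARK_DRIVER_MEMORY_MB (13107) is about 80% of the memory of the Pod (16Gi, or 16384 MB). The sketch below reproduces this arithmetic for other Pod sizes. The 80% ratio is only inferred from the sample values and is not a rule stated by MR3, and driver_memory_mb is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): given the Pod memory in GiB, print a
# value for SPARK_DRIVER_MEMORY_MB equal to 80% of the Pod memory, rounded
# down. The ratio is inferred from the sample (13107 for 16Gi), not taken
# from MR3 itself.
#   $1 = Pod memory in GiB (integer)
driver_memory_mb() {
  echo $(( $1 * 1024 * 8 / 10 ))
}

driver_memory_mb 16   # prints 13107
```

If DAGAppMaster runs in LocalProcess mode inside the same Pod, a smaller share is appropriate because the Pod must also accommodate DAGAppMaster.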

Then the user can use these files to create a Spark driver Pod as follows:

$ kubectl create -f kubernetes/spark-yaml/driver-service.yaml
$ kubectl create -f kubernetes/spark-yaml/spark-run.yaml
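
Because the name of the Spark driver Pod appears in several fields of both files, creating another driver Pod under a different name means changing all of them consistently. The sketch below does this with a plain text substitution. rename_driver is a hypothetical helper, not part of MR3, and it assumes that the string spark1 occurs in the files only as the driver name.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): read a YAML file written for the
# driver Pod spark1 on standard input and print a copy for another name.
# Every occurrence of the string "spark1" is replaced, so this is valid only
# if "spark1" appears in the file solely as the driver name.
#   $1 = new driver name
rename_driver() {
  sed "s/spark1/$1/g"
}

# Intended use (commented out because it needs a Kubernetes cluster):
#   rename_driver spark2 < kubernetes/spark-yaml/driver-service.yaml | kubectl create -f -
#   rename_driver spark2 < kubernetes/spark-yaml/spark-run.yaml | kubectl create -f -
```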

The environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP are set automatically so that a common DAGAppMaster can be shared by Spark drivers.

After creating a Spark driver Pod, the user can execute run-spark-shell.sh or run-spark-submit.sh inside the Pod to start a Spark driver. The scripts accept the following options.

  • --driver-java-options, --jars, --driver-class-path, --conf: These options with arguments are passed on to bin/spark-submit and bin/spark-shell.

For example, the user can execute Spark shell as follows:

$ kubectl exec -it -n sparkmr3 spark1 -- /bin/bash

spark@spark1:/opt/mr3-run/spark$ ./run-spark-shell.sh

Option 2. Running a Spark driver outside Kubernetes

Before running a Spark driver outside Kubernetes, create a directory as specified by the configuration key mr3.am.staging-dir in mr3-site.xml.

$ mkdir -p /opt/mr3-run/work-dir

Spark on MR3 creates a new subdirectory (e.g., .mr3/application_16555_0000) which remains empty.

Then set environment variables SPARK_DRIVER_CORES and SPARK_DRIVER_MEMORY_MB to specify the arguments for the configuration keys spark.driver.cores and spark.driver.memory (in MB) for the Spark driver, respectively, e.g.:

$ export SPARK_DRIVER_CORES=2
$ export SPARK_DRIVER_MEMORY_MB=13107

Finally the user can execute kubernetes/spark/spark/run-spark-submit.sh or kubernetes/spark/spark/run-spark-shell.sh, which in turn executes the script bin/spark-submit or bin/spark-shell under the directory of the Spark installation. The scripts accept the following options.

  • --driver-java-options, --jars, --driver-class-path, --conf: These options with arguments are passed on to bin/spark-submit and bin/spark-shell.

The user should use a --conf spark.driver.host option to specify the host name or address where the Spark driver runs. In order to run multiple Spark drivers on the same node, the user should also specify a unique port for each individual driver with a --conf spark.driver.port option. The user may provide additional options for Spark, e.g.:

$ kubernetes/spark/spark/run-spark-submit.sh --conf spark.driver.host=gold0 --class com.datamonad.mr3.spark.tpcds.RunManyQueries /home/spark/mr3-run/spark/benchmarks/smb2/tpcds/target/scala-2.12/spark_mr3_tpcds_2.12-0.1.jar hdfs://gold0:8020/tmp/tpcds-generate/1000 /home/spark/mr3-run/hive/benchmarks/sparksql/ 19,42,52,55 1 false
$ kubernetes/spark/spark/run-spark-shell.sh --conf spark.driver.host=gold0
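
When several Spark drivers share a node, each needs the same spark.driver.host and its own spark.driver.port. The sketch below builds the corresponding options. driver_conf_args is a hypothetical helper, not part of MR3, and the port numbers are arbitrary examples that must be free on the node.

```shell
#!/bin/sh
# Hypothetical helper (not part of MR3): print the --conf options that give a
# Spark driver its host and a port of its own.
#   $1 = host name or address of the node, $2 = port for this driver
driver_conf_args() {
  printf '%s\n' "--conf spark.driver.host=$1 --conf spark.driver.port=$2"
}

# Two drivers on gold0; the port numbers are arbitrary examples.
driver_conf_args gold0 40001
driver_conf_args gold0 40002
```

The output can be passed to the scripts with an unquoted command substitution, e.g., kubernetes/spark/spark/run-spark-shell.sh $(driver_conf_args gold0 40001).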

The first Spark driver creates a DAGAppMaster Pod. If the environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP are properly set, all subsequent Spark drivers connect to the existing DAGAppMaster Pod without creating new ones.

Terminating Spark on MR3

Terminating a Spark driver does not delete the DAGAppMaster Pod. Instead all the Kubernetes resources remain intact and continue to serve subsequent Spark drivers. In order to delete the Kubernetes resources, the user should manually execute kubectl.

$ kubectl -n sparkmr3 delete deployments --all
$ kubectl -n sparkmr3 delete pod --all
$ kubectl -n sparkmr3 delete configmap --all
$ kubectl -n sparkmr3 delete svc --all
$ kubectl -n sparkmr3 delete secret --all
$ kubectl -n sparkmr3 delete serviceaccount spark-service-account
$ kubectl -n sparkmr3 delete role --all
$ kubectl -n sparkmr3 delete rolebinding --all
$ kubectl delete clusterrole node-reader
$ kubectl delete clusterrolebinding spark-clusterrole-binding
$ kubectl -n sparkmr3 delete persistentvolumeclaims workdir-pvc
$ kubectl delete persistentvolumes workdir-pv