After following the instructions in Installing Spark on MR3, the user can create a Kubernetes cluster for running Spark on MR3. Multiple Spark drivers can run either outside Kubernetes or in Kubernetes Pods while sharing a common DAGAppMaster Pod.


A running Kubernetes cluster should be available, and the user should be able to execute the command kubectl. Before trying to run Spark on MR3 on Kubernetes, check that swap space is disabled on every node in the Kubernetes cluster.
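
For a quick check (not specific to MR3), the following command prints nothing on a node where swap is disabled.

$ swapon --show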

Configuring Spark on MR3 on Kubernetes

For running the Spark driver outside Kubernetes, the following configuration keys should be set (see the example after this list). If the Spark driver runs inside Kubernetes, the user should comment out these configuration keys in mr3-site.xml so that their default values are used.

  • mr3.k8s.api.server.url specifies the URL for the Kubernetes API server (e.g., https://10.1.90.9:6443).
  • mr3.k8s.client.config.file specifies the configuration file (kubeconfig file) for creating a Kubernetes client (e.g., ~/.kube/config).
  • mr3.k8s.service.account.use.token.ca.cert.path should be set to false.
  • mr3.k8s.am.service.host and mr3.k8s.am.service.port should match the address specified in kubernetes/spark-yaml/mr3-service.yaml.
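
For example, assuming the Kubernetes API server at https://10.1.90.9:6443, a kubeconfig file at ~/.kube/config, and the DAGAppMaster Service exposed at 10.1.90.10:9862 as in the mr3-service.yaml example shown later, mr3-site.xml might contain the following settings (adjust all values to the actual cluster).

$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.k8s.api.server.url</name>
  <value>https://10.1.90.9:6443</value>
</property>
<property>
  <name>mr3.k8s.client.config.file</name>
  <value>~/.kube/config</value>
</property>
<property>
  <name>mr3.k8s.service.account.use.token.ca.cert.path</name>
  <value>false</value>
</property>
<property>
  <name>mr3.k8s.am.service.host</name>
  <value>10.1.90.10</value>
</property>
<property>
  <name>mr3.k8s.am.service.port</name>
  <value>9862</value>
</property>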

If the user plans to mix Spark drivers running inside and outside Kubernetes, it is recommended to execute the first Spark driver inside Kubernetes with the above configuration keys commented out in mr3-site.xml. This is because the first Spark driver creates a ConfigMap out of the directory kubernetes/spark/conf and all Spark drivers running inside Kubernetes reuse this ConfigMap. For a Spark driver running outside Kubernetes, mr3-site.xml on the local node can be updated without affecting existing or future Spark drivers.

Since Kubernetes Pods may use a different DNS, the user should set the configuration key mr3.k8s.host.aliases in mr3-site.xml if necessary.

  • For running the Spark driver outside Kubernetes, it should include a mapping for the node where the Spark driver is to run so that Spark executors (which run in ContainerWorker Pods) can reach the Spark driver. If the Spark driver cannot be reached, Spark executors fail after throwing an exception java.net.UnknownHostException. For example, if the Spark driver runs on a node gold0 at IP address 10.1.90.9, mr3.k8s.host.aliases can be set as follows.
    $ vi kubernetes/spark/conf/mr3-site.xml
    
    <property>
      <name>mr3.k8s.host.aliases</name>
      <value>gold0=10.1.90.9</value>
    </property>
    
  • For accessing HDFS, it should include a mapping for the HDFS NameNode.
  • In order to exploit location hints provided by HDFS when running the Spark driver inside Kubernetes, it should include mappings for HDFS DataNodes. This is because location hints provided by HDFS can be host names that are not resolved to IP addresses inside Kubernetes. (See Performance Tuning for Kubernetes for the same problem when running Hive on MR3 on Kubernetes.) If the Spark driver runs outside Kubernetes, such mappings are unnecessary as long as the Spark driver can resolve location hints provided by HDFS. A combined example follows this list.
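
For example, assuming that the HDFS NameNode runs on gold0 (10.1.90.9, as above) and that DataNodes run on hypothetical hosts gold1 and gold2 at 10.1.90.11 and 10.1.90.12, the mappings can be combined into a single value (assuming multiple aliases are given as a comma-separated list of host=address pairs).

$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.k8s.host.aliases</name>
  <value>gold0=10.1.90.9,gold1=10.1.90.11,gold2=10.1.90.12</value>
</property>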

Configuring ContainerWorker Pods to create shuffle handler processes

By default, Spark on MR3 sets spark.shuffle.service.enabled to true in spark-defaults.conf along with mr3.use.daemon.shufflehandler set to 0 in mr3-site.xml. With these settings, every ContainerWorker Pod creates a separate process for running shuffle handlers, which Spark on MR3 regards as an external shuffle service. The user can find the command for executing the process in kubernetes/spark/spark/run-worker.sh.

    if [ "$runShuffleHandlerProcess" = "true" ]; then
      export SPARK_CONF_DIR=$BASE_DIR/conf
      SPARK_SHUFFLE_CLASS=org.apache.spark.deploy.ExternalShuffleService
      $JAVA -cp "$BASE_DIR/spark/jars/*" -Dscala.usejavacp=true -Xmx1g $SPARK_SHUFFLE_CLASS &

The default setting is recommended because shuffle handler processes continue to serve shuffle requests even after ContainerWorkers are recycled.

If spark.shuffle.service.enabled is set to false along with mr3.use.daemon.shufflehandler set to a number larger than zero (e.g., 1), Spark on MR3 instead uses the built-in shuffle handlers included in Spark executors. In this case, when a ContainerWorker is recycled, a new Spark executor is created and all the output produced by the previous Spark executor (inside the same ContainerWorker) is lost.
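
For reference, below is a minimal sketch of this alternative configuration; it assumes that spark-defaults.conf resides in kubernetes/spark/conf alongside mr3-site.xml.

$ vi kubernetes/spark/conf/spark-defaults.conf

spark.shuffle.service.enabled false

$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.use.daemon.shufflehandler</name>
  <value>1</value>
</property>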

Creating a PersistentVolume (Optional)

There are two cases in which the user should create a PersistentVolume before running the Spark driver.

  • The Spark driver running inside Kubernetes needs additional jar files and resources. (The Spark driver running outside Kubernetes can read additional jar files and resources directly from local disks.)
  • The user enables DAG recovery by setting the configuration key mr3.dag.recovery.enabled to true in kubernetes/spark/conf/mr3-site.xml (see the example after this list). By default, mr3.dag.recovery.enabled is set to false and DAG recovery is disabled in MR3. As a result, if a DAGAppMaster Pod crashes, all running DAGs are lost and are not recovered by a new DAGAppMaster Pod. (Normally, however, this is not a practical problem because the Spark driver automatically retries the stages corresponding to lost DAGs.)
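
For the second case, DAG recovery can be enabled as follows.

$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.dag.recovery.enabled</name>
  <value>true</value>
</property>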

In order to create a PersistentVolume, first update kubernetes/spark-yaml/workdir-pv.yaml and kubernetes/spark-yaml/workdir-pvc.yaml. workdir-pv.yaml creates a PersistentVolume. The sample file in the MR3 release uses an NFS volume, and the user should update it in order to use different types of PersistentVolumes. workdir-pvc.yaml creates a PersistentVolumeClaim which references the PersistentVolume created by workdir-pv.yaml. The user should specify the size of the storage:

      storage: 10Gi
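
For reference, below is a sketch of the NFS section in workdir-pv.yaml; the server address and the exported path are hypothetical and should be replaced with actual values (or with a different volume type).

$ vi kubernetes/spark-yaml/workdir-pv.yaml

spec:
  nfs:
    # hypothetical NFS server address and exported path
    server: 10.1.90.9
    path: /home/nfs/mr3-run-work-dir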

Then update kubernetes/run-spark-setup.sh so as to read workdir-pv.yaml and workdir-pvc.yaml.

$ vi kubernetes/run-spark-setup.sh

if [ $RUN_AWS_EKS = true ]; then
  echo "assume that PersistentVolumeClaim workdir-pvc has been created"
else
  kubectl create -f $YAML_DIR/workdir-pv.yaml 
  kubectl create -n $SPARK_MR3_NAMESPACE -f $YAML_DIR/workdir-pvc.yaml 
fi

If the Spark driver runs inside Kubernetes, update kubernetes/spark-yaml/spark-submit.yaml to mount the PersistentVolume. Then the Spark driver can access additional jar files and resources under the mount point /opt/mr3-run/work-dir.

$ vi kubernetes/spark-yaml/spark-submit.yaml 

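    # add under spec/containers/volumeMounts of the Spark driver container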
    - name: work-dir-volume
      mountPath: /opt/mr3-run/work-dir

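  # add under spec/volumes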
  - name: work-dir-volume
    persistentVolumeClaim:
      claimName: workdir-pvc

For mounting the PersistentVolume inside a DAGAppMaster Pod (e.g., when DAG recovery is enabled), update kubernetes/spark/env.sh.

$ vi kubernetes/spark/env.sh

WORK_DIR_PERSISTENT_VOLUME_CLAIM=workdir-pvc
WORK_DIR_PERSISTENT_VOLUME_CLAIM_MOUNT_DIR=/opt/mr3-run/work-dir

Creating Kubernetes resources for Spark on MR3

Before running Spark on MR3, the user should create various Kubernetes resources by executing the script kubernetes/run-spark-setup.sh. The script reads some of the YAML files in the directory kubernetes/spark-yaml.

If the Spark driver runs outside Kubernetes, the user should update mr3-service.yaml, which creates a Service for exposing MR3 DAGAppMaster. (If the Spark driver runs inside the Kubernetes cluster, updating mr3-service.yaml is unnecessary.) The user should specify a public IP address with a valid host name and a port number for DAGAppMaster so that Spark drivers can connect to it from outside the Kubernetes cluster.

  ports:
  - protocol: TCP
    port: 9862
    targetPort: 8080
  externalIPs:
  - 10.1.90.10

Executing the script generates two environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP.

$ kubernetes/run-spark-setup.sh
...
export CLIENT_TO_AM_TOKEN_KEY=88995354-d73d-46d7-900b-7627c829e07a
export MR3_APPLICATION_ID_TIMESTAMP=21910
configmap/client-am-config created
service/spark-driver created
service/mr3 created

The user should set these environment variables on every node where Spark on MR3 is to be launched so that all Spark drivers can share the same DAGAppMaster Pod (and consequently the same set of ContainerWorker Pods). If the environment variables are not set, each Spark driver creates its own DAGAppMaster Pod and no ContainerWorker Pods are shared.

$ export CLIENT_TO_AM_TOKEN_KEY=88995354-d73d-46d7-900b-7627c829e07a
$ export MR3_APPLICATION_ID_TIMESTAMP=21910

Running Spark on MR3 on Kubernetes

Option 1. Running the Spark driver inside Kubernetes

To run the Spark driver inside Kubernetes, edit kubernetes/spark-yaml/spark-submit.yaml.

spec:
  containers:
  - image: 10.1.90.9:5000/spark3:latest
    command: ["/opt/mr3-run/spark/run-spark-submit.sh"]
    args: [
      "--conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.host=spark-driver.sparkmr3.svc.cluster.local --conf spark.driver.port=9850",
      "--class com.datamonad.mr3.spark.tpcds.RunManyQueries /opt/mr3-run/work-dir/lib/spark_mr3_tpcds_2.12-0.1.jar hdfs://gold0:8020/tmp/tpcds-generate/1000 /opt/mr3-run/work-dir/sparksql/ 19,42,52,55 1 false"]
    resources:
      requests:
        cpu: 6
        memory: 24Gi
      limits:
        cpu: 6
        memory: 24Gi

  • The field spec/containers/image specifies the full name of the Docker image, including a tag.
  • The field spec/containers/args specifies the arguments for Spark.
  • The resources/requests and resources/limits fields specify the resources to be allocated to the Spark driver Pod.

After editing spark-submit.yaml, the user can use it to create a Spark driver Pod.

$ kubectl create -f kubernetes/spark-yaml/spark-submit.yaml

Note that in order to share a DAGAppMaster Pod among Spark drivers, the environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP should be set properly.
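
To follow the driver output, the user can check the logs of the Spark driver Pod. The Pod name below is hypothetical; use the name defined in spark-submit.yaml.

$ kubectl logs -n sparkmr3 -f spark-submit-example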

Option 2. Running the Spark driver outside Kubernetes

Before running the Spark driver outside Kubernetes, create a directory as specified by the configuration key mr3.am.staging-dir in mr3-site.xml.

$ mkdir -p /opt/mr3-run/work-dir
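
For example, if mr3.am.staging-dir points to /opt/mr3-run/work-dir as in the command above, mr3-site.xml would contain a property like the following.

$ vi kubernetes/spark/conf/mr3-site.xml

<property>
  <name>mr3.am.staging-dir</name>
  <value>/opt/mr3-run/work-dir</value>
</property>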

Spark on MR3 creates a new subdirectory under it (e.g., .mr3/application_16555_0000), which remains empty. Then the user can execute kubernetes/spark/spark/run-spark-submit.sh or kubernetes/spark/spark/run-spark-shell.sh, which in turn executes the script bin/spark-submit or bin/spark-shell under the directory of the Spark installation. The user may provide additional arguments for Spark.

$ kubernetes/spark/spark/run-spark-submit.sh --class com.datamonad.mr3.spark.tpcds.RunManyQueries /home/spark/mr3-run/spark/benchmarks/smb2/tpcds/target/scala-2.12/spark_mr3_tpcds_2.12-0.1.jar hdfs://gold0:8020/tmp/tpcds-generate/1000 /home/spark/mr3-run/hive/benchmarks/sparksql/ 19,42,52,55 1 false
$ kubernetes/spark/spark/run-spark-shell.sh

The first Spark driver creates a DAGAppMaster Pod. If the environment variables CLIENT_TO_AM_TOKEN_KEY and MR3_APPLICATION_ID_TIMESTAMP are properly set, all subsequent Spark drivers connect to the existing DAGAppMaster Pod without creating new ones.

Terminating Spark on MR3

Terminating a Spark driver does not delete the DAGAppMaster Pod. Instead, all the Kubernetes resources remain intact and continue to serve subsequent Spark drivers. In order to delete the Kubernetes resources, the user should execute kubectl manually.

$ kubectl -n sparkmr3 delete deployments --all
$ kubectl -n sparkmr3 delete pod --all
$ kubectl -n sparkmr3 delete configmap --all
$ kubectl -n sparkmr3 delete svc --all
$ kubectl -n sparkmr3 delete secret --all
$ kubectl -n sparkmr3 delete serviceaccount spark-service-account
$ kubectl -n sparkmr3 delete role --all
$ kubectl -n sparkmr3 delete rolebinding --all
$ kubectl delete clusterrole node-reader
$ kubectl delete clusterrolebinding spark-clusterrole-binding
$ kubectl -n sparkmr3 delete persistentvolumeclaims workdir-pvc
$ kubectl delete persistentvolumes workdir-pv