This page shows how to operate Spark on MR3 on Kubernetes with multiple nodes. By following the instructions, the user will learn:

  1. how to run Spark on MR3
  2. how to run Spark shell inside Kubernetes
  3. how to submit Spark jobs inside Kubernetes

This scenario has the following prerequisites:

  1. A running Kubernetes cluster is available.
  2. Every node has an identical set of local directories for storing intermediate data (to be mapped to hostPath volumes). These directories should be writable by the user with UID 1000 because all containers run as a non-root user with UID 1000.

We use the pre-built Docker image mr3project/spark3:3.2.2 available on Docker Hub, so the user does not have to build a new Docker image.

This scenario should take less than 30 minutes to complete, not including the time for downloading a pre-built Docker image.

To ask questions, please visit MR3 Google Group or join MR3 Slack.

Installation

Download an MR3 release containing the executable scripts.

$ git clone https://github.com/mr3project/mr3-run-k8s.git
$ cd mr3-run-k8s/kubernetes/

spark-defaults.conf

spark/conf/spark-defaults.conf contains default settings for Spark. Adjust the following settings as necessary.

$ vi spark/conf/spark-defaults.conf

spark.executor.cores=16
spark.executor.memory=32768m
spark.executor.memoryOverhead=8192m
spark.task.cpus=2

spark.local.dir=/data1/k8s

  • spark.executor.cores specifies the CPU resources of ContainerWorker Pods (in which Spark executors run).
  • The sum of values specified by spark.executor.memory and spark.executor.memoryOverhead becomes the memory size of ContainerWorker Pods.
  • spark.task.cpus specifies the number of cores to be allocated to each individual Spark task. In our example, a single Spark executor can run 16 / 2 = 8 concurrent Spark tasks.
  • spark.local.dir should be set to the list of local directories available on every node. These local directories are mapped to hostPath volumes and used by block managers of Spark drivers.
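
The arithmetic behind these settings can be sketched as follows. This is only an illustration using the example values above; the helper megabytes() is hypothetical and handles only the 'm' suffix used in this example.

```python
# Values copied from the spark-defaults.conf example above.
spark_conf = {
    "spark.executor.cores": "16",
    "spark.executor.memory": "32768m",
    "spark.executor.memoryOverhead": "8192m",
    "spark.task.cpus": "2",
}

def megabytes(value: str) -> int:
    """Parse a size such as '32768m' into MB (hypothetical helper, 'm' suffix only)."""
    if not value.endswith("m"):
        raise ValueError(f"unsupported size suffix in {value!r}")
    return int(value[:-1])

# Memory size of a ContainerWorker Pod = executor memory + memory overhead.
pod_memory_mb = (megabytes(spark_conf["spark.executor.memory"])
                 + megabytes(spark_conf["spark.executor.memoryOverhead"]))

# Concurrent Spark tasks per executor = executor cores / cores per task.
concurrent_tasks = (int(spark_conf["spark.executor.cores"])
                    // int(spark_conf["spark.task.cpus"]))

print(pod_memory_mb)     # 40960
print(concurrent_tasks)  # 8
```

With these values, each ContainerWorker Pod requests 40960MB of memory and runs up to 8 Spark tasks at a time.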

mr3-site.xml

spark/conf/mr3-site.xml is the configuration file for MR3. Adjust the following settings as necessary.

$ vi spark/conf/mr3-site.xml

<property>
  <name>mr3.container.scheduler.scheme</name>
  <value>fair</value>
</property>

<property>
  <name>mr3.am.resource.memory.mb</name>
  <value>16384</value>
</property>

<property>
  <name>mr3.am.resource.cpu.cores</name>
  <value>4</value>
</property>

<property>
  <name>mr3.k8s.pod.worker.hostpaths</name>
  <value>/data1/k8s</value>
</property>

  • mr3.container.scheduler.scheme specifies the policy for recycling ContainerWorkers among Spark applications, and can be set to fifo or fair. For details, see Recycling ContainerWorkers.
  • mr3.am.resource.memory.mb specifies the memory size (in MB) of the DAGAppMaster Pod.
  • mr3.am.resource.cpu.cores specifies the number of cores to be allocated to the DAGAppMaster Pod.
  • mr3.k8s.pod.worker.hostpaths should be set to the list of local directories available on every node. These local directories are mapped to hostPath volumes and used by block managers of Spark executors.
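
Before creating any Pods, the user may want to double-check these values. The following is a minimal sketch, assuming that mr3-site.xml uses the Hadoop-style layout with a <configuration> root element and that multiple directories in mr3.k8s.pod.worker.hostpaths are separated by commas. The function read_properties() is a hypothetical helper and is not part of MR3.

```python
import xml.etree.ElementTree as ET

# Inline sample mirroring the settings above. The <configuration> root element
# is an assumption (Hadoop-style configuration file).
SAMPLE = """<configuration>
  <property>
    <name>mr3.container.scheduler.scheme</name>
    <value>fair</value>
  </property>
  <property>
    <name>mr3.am.resource.memory.mb</name>
    <value>16384</value>
  </property>
  <property>
    <name>mr3.am.resource.cpu.cores</name>
    <value>4</value>
  </property>
  <property>
    <name>mr3.k8s.pod.worker.hostpaths</name>
    <value>/data1/k8s</value>
  </property>
</configuration>"""

def read_properties(xml_text: str) -> dict:
    """Collect every <property> as a name -> value mapping (hypothetical helper)."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}

props = read_properties(SAMPLE)

# The scheduler scheme can only be fifo or fair.
assert props["mr3.container.scheduler.scheme"] in ("fifo", "fair")

# Assumption: a list of directories is written as a comma-separated string.
hostpaths = [d.strip() for d in props["mr3.k8s.pod.worker.hostpaths"].split(",")]
print(hostpaths)  # ['/data1/k8s']
```

To check a real file, replace SAMPLE with the contents of spark/conf/mr3-site.xml.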

Setting host aliases (optional)

To use host names (instead of IP addresses) when configuring Spark on MR3, the user should update:

  1. configuration key mr3.k8s.host.aliases in spark/conf/mr3-site.xml
  2. spec/hostAliases field in spark-yaml/spark-run.yaml

For example, to use a host name called orange0 with IP address 192.168.10.100, update as follows:

$ vi spark/conf/mr3-site.xml

<property>
  <name>mr3.k8s.host.aliases</name>
  <value>orange0=192.168.10.100</value>
</property>

$ vi spark-yaml/spark-run.yaml

spec:
  hostAliases:
  - ip: "192.168.10.100"
    hostnames:
    - "orange0"

For accessing HDFS, host aliases should include a mapping for the HDFS NameNode.
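
Since the same mapping is written twice in two different formats, it is easy for the two files to drift apart. The following sketch converts a value of mr3.k8s.host.aliases into entries shaped like the hostAliases field. It assumes that multiple aliases are separated by commas (only the single-alias form appears above), and host_aliases_from_mr3() is a hypothetical helper.

```python
def host_aliases_from_mr3(value: str) -> list:
    """Turn 'host1=ip1,host2=ip2' into entries shaped like Pod hostAliases.

    Assumption: aliases are comma-separated. Host names that share an IP
    address are grouped into a single entry.
    """
    by_ip = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue
        host, ip = pair.split("=", 1)
        by_ip.setdefault(ip.strip(), []).append(host.strip())
    return [{"ip": ip, "hostnames": hosts} for ip, hosts in by_ip.items()]

aliases = host_aliases_from_mr3("orange0=192.168.10.100")
print(aliases)  # [{'ip': '192.168.10.100', 'hostnames': ['orange0']}]
```

The printed entry corresponds to the hostAliases snippet for orange0 shown above.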

Configuring S3 (optional)

For accessing S3-compatible storage, open spark/conf/core-site.xml and set the configuration keys for S3. The configuration key fs.s3a.endpoint should point to the storage server.

$ vi spark/conf/core-site.xml

<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
</property>

<property>
  <name>fs.s3a.endpoint</name>
  <value>http://192.168.10.100:9000</value>
</property>

<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>

The user may need to change the parameters for accessing S3 to avoid SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool. For more details, see Troubleshooting.

When using the class EnvironmentVariableCredentialsProvider to read AWS credentials, add two environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in spark/env.sh.

$ vi spark/env.sh

export AWS_ACCESS_KEY_ID=_your_aws_access_key_id_
export AWS_SECRET_ACCESS_KEY=_your_aws_secret_access_key_

Note that AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are already appended to the values of the configuration keys mr3.am.launch.env and mr3.container.launch.env in spark/conf/mr3-site.xml. For security reasons, the user should NOT write the AWS access key ID and secret access key themselves in spark/conf/mr3-site.xml.

$ vi spark/conf/mr3-site.xml

<property>
  <name>mr3.am.launch.env</name>
  <value>LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/,HADOOP_CREDSTORE_PASSWORD,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,USE_JAVA_17</value>
</property>

<property>
  <name>mr3.container.launch.env</name>
  <value>LD_LIBRARY_PATH=/opt/mr3-run/hadoop/apache-hadoop/lib/native,HADOOP_CREDSTORE_PASSWORD,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,USE_JAVA_17</value>
</property>
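
The following sketch checks that a launch.env value mentions the AWS variables by name only, without embedding their values. It reads entries of the form NAME=value as explicit assignments and entries of the form NAME as variables taken from the environment; this reading is our assumption about how MR3 interprets these keys, and split_launch_env() is a hypothetical helper.

```python
# Value of mr3.am.launch.env copied from the example above.
launch_env = ("LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/,"
              "HADOOP_CREDSTORE_PASSWORD,AWS_ACCESS_KEY_ID,"
              "AWS_SECRET_ACCESS_KEY,USE_JAVA_17")

def split_launch_env(value: str):
    """Separate NAME=value entries from bare NAME entries (hypothetical helper)."""
    assigned, by_name = {}, []
    for entry in value.split(","):
        name, sep, val = entry.partition("=")
        if sep:
            assigned[name] = val   # value written in the configuration file
        else:
            by_name.append(name)   # only the variable name is listed
    return assigned, by_name

assigned, by_name = split_launch_env(launch_env)

# The credentials should be listed by name and never assigned in the file.
for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    assert key in by_name and key not in assigned

print(by_name)
```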

Docker image

By default, we use a pre-built Docker image mr3project/spark3:3.2.2. To use a different Docker image, update the environment variable DOCKER_SPARK_IMG in spark/env.sh and the field spec/containers/image in spark-yaml/spark-run.yaml.

$ vi spark/env.sh

DOCKER_SPARK_IMG=mr3project/spark3:3.2.2

$ vi spark-yaml/spark-run.yaml

spec:
  containers:
  - image: mr3project/spark3:3.2.2

Creating a Spark driver Pod

In order to run a Spark driver inside Kubernetes, edit spark-yaml/driver-service.yaml and spark-yaml/spark-run.yaml. In our example, we use spark1 as the name of the Spark driver Pod.

$ vi spark-yaml/driver-service.yaml

metadata:
  name: spark1
spec:
  selector:
    sparkmr3_app: spark1
  • The fields metadata/name and spec/selector should use the name of the Spark driver Pod.

$ vi spark-yaml/spark-run.yaml

metadata:
  name: spark1
  labels:
    sparkmr3_app: spark1
spec:
  containers:
    env:
    - name: DRIVER_NAME
      value: "spark1"
    - name: SPARK_DRIVER_CORES
      value: '2'
    - name: SPARK_DRIVER_MEMORY_MB
      value: '13107'
    resources:
      requests:
        cpu: 2
        memory: 16Gi
      limits:
        cpu: 2
        memory: 16Gi
    volumeMounts:
    - name: spark-hostpath-1
      mountPath: /data1/k8s
  volumes:
  - name: spark-hostpath-1
    hostPath:
      path: /data1/k8s
      type: Directory

  • The field metadata/name specifies the name of the Spark driver Pod.
  • The field metadata/labels should use the name of the Spark driver Pod.
  • The environment variable DRIVER_NAME should be set to the name of the Spark driver Pod.
  • The environment variables SPARK_DRIVER_CORES and SPARK_DRIVER_MEMORY_MB specify the arguments for the configuration keys spark.driver.cores and spark.driver.memory (in MB) for the Spark driver, respectively.
  • The spec/containers/resources/requests and spec/containers/resources/limits fields specify the resources to be allocated to the Spark driver Pod.
  • The fields spec/containers/volumeMounts and spec/volumes specify the list of directories to which hostPath volumes point for the Spark driver. The list of directories should match the value for spark.local.dir in spark-defaults.conf.
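
The consistency rules above can be sketched as a small check. The function check_driver() is a hypothetical helper, and the requirement that the driver memory stays below the Pod memory limit is our assumption rather than a rule stated by MR3. In the example, 13107MB is about 80% of the 16Gi limit.

```python
def check_driver(spark_local_dir, mount_paths, host_paths,
                 driver_memory_mb, pod_memory_limit_gi):
    """Return a list of problems found in the driver settings (hypothetical helper)."""
    problems = []
    # spark.local.dir is a comma-separated list of directories.
    local_dirs = sorted(d.strip() for d in spark_local_dir.split(","))
    if sorted(mount_paths) != local_dirs:
        problems.append("volumeMounts do not match spark.local.dir")
    if sorted(host_paths) != local_dirs:
        problems.append("hostPath volumes do not match spark.local.dir")
    # Assumption: the driver JVM should leave some headroom inside the Pod.
    if driver_memory_mb >= pod_memory_limit_gi * 1024:
        problems.append("SPARK_DRIVER_MEMORY_MB leaves no headroom in the Pod")
    return problems

# Values copied from spark-defaults.conf and spark-run.yaml above.
problems = check_driver(
    spark_local_dir="/data1/k8s",
    mount_paths=["/data1/k8s"],
    host_paths=["/data1/k8s"],
    driver_memory_mb=13107,
    pod_memory_limit_gi=16,
)
print(problems)                      # []
print(round(13107 / (16 * 1024), 2)) # 0.8
```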

As we do not run Spark drivers outside Kubernetes, comment out the following lines in run-spark-setup.sh:

$ vi run-spark-setup.sh

#kubectl create -f $YAML_DIR/mr3-service.yaml 
  
#export JAVA_HOME=/usr/jdk64/jdk1.8.0_112
#export PATH=$JAVA_HOME/bin:$PATH

Execute the script run-spark-setup.sh to create various Kubernetes resources.

$ ./run-spark-setup.sh
...
export CLIENT_TO_AM_TOKEN_KEY=4690a1bf-139f-473c-aec7-4f739343e003
export MR3_APPLICATION_ID_TIMESTAMP=14265
configmap/client-am-config created

Then the user can create a Spark driver Pod as follows:

$ kubectl create -f spark-yaml/driver-service.yaml
service/spark1 created

$ kubectl create -f spark-yaml/spark-run.yaml
pod/spark1 created

$ kubectl get pods -n sparkmr3
NAME     READY   STATUS    RESTARTS   AGE
spark1   1/1     Running   0          8s

Running Spark shell

To demonstrate how to create a Spark application, we run Spark shell inside the Spark driver Pod. Starting Spark shell takes a while because it creates a new DAGAppMaster Pod.

$ kubectl exec -it -n sparkmr3 spark1 -- /bin/bash

spark@spark1:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

We find that a new DAGAppMaster Pod has been created.

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          51s
spark1                                  1/1     Running   0          92s

On Spark shell, execute a job for calculating Pi.

scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78488
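
The job counts how many random points in the square [-1, 1] x [-1, 1] fall inside the unit circle, so the estimate of Pi is 4 times the fraction of such points. The following sketch only converts the count reported above into an estimate; it does not rerun the Spark job.

```python
n = 100000
samples = n - 1   # `1 until n` excludes n, so 99999 points are sampled
count = 78488     # value reported by Spark shell above

# Fraction of points inside the unit circle approximates Pi / 4.
pi_estimate = 4.0 * count / samples
print(round(pi_estimate, 4))  # 3.1396
```

Because the points are random, every run reports a slightly different count.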

We find three new ContainerWorker Pods, each of which runs a Spark executor.

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3executor-8c5c-1                      1/1     Running   0          61s
mr3executor-8c5c-2                      1/1     Running   0          61s
mr3executor-8c5c-3                      1/1     Running   0          60s
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          3m23s
spark1                                  1/1     Running   0          4m4s

Now terminate Spark shell.

scala> :quit
spark@spark1:/opt/mr3-run/spark$ 

We see that even though there is no Spark application running, the DAGAppMaster Pod and the three ContainerWorker Pods are still alive. This is a feature, not a bug, because DAGAppMaster and ContainerWorkers are not owned by a particular Spark application.

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3executor-8c5c-1                      1/1     Running   0          2m54s
mr3executor-8c5c-2                      1/1     Running   0          2m54s
mr3executor-8c5c-3                      1/1     Running   0          2m53s
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          5m16s
spark1                                  1/1     Running   0          5m57s

For example, another instance of Spark shell running inside the same Spark driver Pod uses the existing DAGAppMaster (without creating a new DAGAppMaster Pod) and thus initializes itself much faster than the previous one. Executing a job also reuses the existing ContainerWorker Pods.

spark@spark1:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78578                                                              

Submitting Spark jobs

To submit Spark jobs, the user can use run-spark-submit.sh in the same way as spark-submit (which is called by run-spark-submit.sh). In our example, we submit a WordCount job with an input file s3a://hivemr3/pokemon.csv.

In order to download a WordCount jar file, we install wget. The root password is set to spark.

spark@spark1:/opt/mr3-run/spark$ sudo su -
root@spark1:~# apt-get update; apt-get install -y wget
root@spark1:~# exit

Next we download spark-wordcount.jar and submit a WordCount job.

spark@spark1:/opt/mr3-run/spark$ wget https://github.com/mr3project/mr3-release/releases/download/v1.5/spark-wordcount.jar

spark@spark1:/opt/mr3-run/spark$ ./run-spark-submit.sh --class WordCount ./spark-wordcount.jar s3a://hivemr3/pokemon.csv
Output Directory: 
/opt/mr3-run/spark/spark-submit-result/spark-mr3-2022-08-01-13-38-09-ea3cf957-851d-4c90-94a1-43e5529db212
...

The user can find the log file under the output directory.

spark@spark1:/opt/mr3-run/spark$ ls /opt/mr3-run/spark/spark-submit-result/spark-mr3-2022-08-01-13-38-09-ea3cf957-851d-4c90-94a1-43e5529db212/spark-logs/
out-spark-submit.txt

Running two Spark applications

To demonstrate two Spark applications sharing ContainerWorker Pods, we create another Spark driver Pod called spark2. First edit spark-yaml/driver-service.yaml and spark-yaml/spark-run.yaml to change the name.

$ vi spark-yaml/driver-service.yaml 

metadata:
  name: spark2
spec:
  selector:
    sparkmr3_app: spark2

$ vi spark-yaml/spark-run.yaml 

metadata:
  name: spark2
  labels:
    sparkmr3_app: spark2
spec:
  containers:
    env:
    - name: DRIVER_NAME
      value: "spark2"
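
The four places where the driver name appears can be summarized with the following sketch. The function driver_fields() is a hypothetical helper that only builds the fields shown above as Python dictionaries; it does not produce complete manifests and does not call Kubernetes.

```python
def driver_fields(name: str):
    """Return the name-dependent fields of driver-service.yaml and spark-run.yaml."""
    service = {
        "metadata": {"name": name},
        "spec": {"selector": {"sparkmr3_app": name}},
    }
    pod = {
        "metadata": {"name": name, "labels": {"sparkmr3_app": name}},
        "spec": {"containers": [
            {"env": [{"name": "DRIVER_NAME", "value": name}]},
        ]},
    }
    return service, pod

service, pod = driver_fields("spark2")

# The Service selects the driver Pod by its sparkmr3_app label,
# so the selector and the label must agree.
assert service["spec"]["selector"] == pod["metadata"]["labels"]
print(pod["spec"]["containers"][0]["env"])  # [{'name': 'DRIVER_NAME', 'value': 'spark2'}]
```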

Next create a second Spark driver Pod.

$ kubectl create -f spark-yaml/driver-service.yaml
service/spark2 created

$ kubectl create -f spark-yaml/spark-run.yaml
pod/spark2 created

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3executor-8c5c-1                      1/1     Running   0          9m15s
mr3executor-8c5c-2                      1/1     Running   0          9m15s
mr3executor-8c5c-3                      1/1     Running   0          9m14s
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          11m
spark1                                  1/1     Running   0          12m
spark2                                  1/1     Running   0          14s

Inside the new Pod, run Spark shell and execute a job for calculating Pi.

$ kubectl exec -it -n sparkmr3 spark2 -- /bin/bash
spark@spark2:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78317

We find that no additional ContainerWorker Pods have been created, which implies that the second instance of Spark shell completes the job using existing ContainerWorker Pods.

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3executor-8c5c-1                      1/1     Running   0          12m
mr3executor-8c5c-2                      1/1     Running   0          12m
mr3executor-8c5c-3                      1/1     Running   0          12m
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          14m
spark1                                  1/1     Running   0          15m
spark2                                  1/1     Running   0          3m16s

In our example, we use fair scheduling (with mr3.container.scheduler.scheme set to fair in mr3-site.xml). Each ContainerWorker Pod has 32768MB + 8192MB = 40960MB of memory, so one ContainerWorker Pod (40960MB) is assigned to one of the Spark applications while the remaining two ContainerWorker Pods (81920MB in total) are assigned to the other, as the following log lines show. For more information, see Recycling ContainerWorkers.

2022-08-01 13:40:46,631 [SparkContainerGroup-spark-mr3-app-7619f648] INFO  TaskScheduler  - SparkContainerGroup-spark-mr3-app-7619f648 current memory usage = 0.0% (0MB / 40960MB)

2022-08-01 13:40:51,001 [SparkContainerGroup-spark-mr3-app-6f15b91a] INFO  TaskScheduler  - SparkContainerGroup-spark-mr3-app-6f15b91a current memory usage = 0.0% (0MB / 81920MB)
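
The memory sizes in these log lines follow from dividing three ContainerWorker Pods between two Spark applications. The following sketch reproduces only the arithmetic. It is not the scheduler implemented in MR3, and split_workers() is a hypothetical helper that divides workers as evenly as possible.

```python
# Memory of one ContainerWorker Pod: executor memory + memory overhead (in MB).
WORKER_MB = 32768 + 8192

def split_workers(num_workers: int, num_apps: int) -> list:
    """Divide workers as evenly as possible (illustration only, not MR3's scheduler)."""
    base, extra = divmod(num_workers, num_apps)
    return [base + (1 if i < extra else 0) for i in range(num_apps)]

shares = sorted(split_workers(3, 2))
memory_mb = [workers * WORKER_MB for workers in shares]
print(shares)     # [1, 2]
print(memory_mb)  # [40960, 81920]
```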

Next, delete both Spark driver Pods. We see that the DAGAppMaster Pod and the three ContainerWorker Pods are still alive.

$ kubectl delete pod -n sparkmr3 spark1
pod "spark1" deleted
$ kubectl delete pod -n sparkmr3 spark2
pod "spark2" deleted

$ kubectl get pods -n sparkmr3
NAME                                    READY   STATUS    RESTARTS   AGE
mr3executor-8c5c-1                      1/1     Running   0          17m
mr3executor-8c5c-2                      1/1     Running   0          17m
mr3executor-8c5c-3                      1/1     Running   0          17m
mr3master-spark-4265-0-76894c85-v669q   1/1     Running   0          20m

Since there is no Spark application running, ContainerWorkers wait for a new Spark application. In our example, mr3.container.command.num.waits.in.reserved is set to 360 in mr3-site.xml, so ContainerWorkers wait for 360 seconds before terminating themselves.

$ kubectl logs -n sparkmr3 -f mr3executor-8c5c-1
...
2022-08-01 13:48:37,575 [run-19-1] INFO  ContainerWorker  - Reserved and not registered to TaskCommunicator: K@1, 359
2022-08-01 13:48:37,578 [run-19-1] ERROR ContainerWorker  - Kill because DAGAppMaster cannot be reached or ContainerWorker is not registered
...
2022-08-01 13:48:37,654 [shutdownHook1] WARN  ContainerWorker  - Shutdown hook called
2022-08-01 13:48:37,656 [shutdown-hook-0] INFO  org.apache.spark.util.ShutdownHookManager  - Shutdown hook called

Stopping Spark on MR3

In order to stop DAGAppMaster, the user can delete the Deployment for DAGAppMaster.

$ kubectl get deployment -n sparkmr3
NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
mr3master-spark-4265-0   1/1     1            1           27m

$ kubectl delete deployment -n sparkmr3 mr3master-spark-4265-0
deployment.apps "mr3master-spark-4265-0" deleted

To delete all remaining resources, execute the following commands:

$ kubectl -n sparkmr3 delete configmap --all
$ kubectl -n sparkmr3 delete svc --all
$ kubectl -n sparkmr3 delete secret --all
$ kubectl -n sparkmr3 delete serviceaccount spark-service-account
$ kubectl -n sparkmr3 delete role --all
$ kubectl -n sparkmr3 delete rolebinding --all
$ kubectl delete clusterrole node-reader
$ kubectl delete clusterrolebinding spark-clusterrole-binding
$ kubectl delete clusterrolebinding spark-master-clusterrole-binding
$ kubectl -n sparkmr3 delete serviceaccount master-service-account
$ kubectl -n sparkmr3 delete serviceaccount worker-service-account
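
The cleanup touches both namespaced resources (deleted with -n sparkmr3) and cluster-wide resources (deleted without a namespace). The following sketch rebuilds the same kubectl invocations from two lists to make this distinction explicit. It only prints the commands and does not execute them; the list names are our own.

```python
NAMESPACE = "sparkmr3"

# Resources that live inside the sparkmr3 namespace.
namespaced = [
    ("configmap", "--all"),
    ("svc", "--all"),
    ("secret", "--all"),
    ("serviceaccount", "spark-service-account"),
    ("role", "--all"),
    ("rolebinding", "--all"),
    ("serviceaccount", "master-service-account"),
    ("serviceaccount", "worker-service-account"),
]

# Cluster-wide resources, which take no namespace.
cluster_wide = [
    ("clusterrole", "node-reader"),
    ("clusterrolebinding", "spark-clusterrole-binding"),
    ("clusterrolebinding", "spark-master-clusterrole-binding"),
]

commands = [f"kubectl -n {NAMESPACE} delete {kind} {target}"
            for kind, target in namespaced]
commands += [f"kubectl delete {kind} {target}"
             for kind, target in cluster_wide]

for command in commands:
    print(command)
```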