This page shows how to operate Spark on MR3 on Kubernetes with multiple nodes. By following these instructions, the user will learn:
- how to run Spark on MR3
- how to run Spark shell inside Kubernetes
- how to submit Spark jobs inside Kubernetes
This scenario has the following prerequisites:
- A running Kubernetes cluster is available.
- Every node has an identical set of local directories for storing intermediate data (to be mapped to hostPath volumes). These directories should be writable by the user with UID 1000 because all containers run as a non-root user with UID 1000.
We use the pre-built Docker image mr3project/spark3:3.2.2 available at DockerHub, so the user does not have to build a new Docker image.
This scenario should take less than 30 minutes to complete, not including the time for downloading the pre-built Docker image.
If you have any questions, please visit the MR3 Google Group or join the MR3 Slack channel.
Installation
Download an MR3 release containing the executable scripts.
$ git clone https://github.com/mr3project/mr3-run-k8s.git
$ cd mr3-run-k8s/kubernetes/
spark-defaults.conf
spark/conf/spark-defaults.conf contains default settings for Spark. Adjust the following settings as necessary.
$ vi spark/conf/spark-defaults.conf
spark.executor.cores=16
spark.executor.memory=32768m
spark.executor.memoryOverhead=8192m
spark.task.cpus=2
spark.local.dir=/data1/k8s
- spark.executor.cores specifies the CPU resources of ContainerWorker Pods (in which Spark executors run).
- The sum of the values of spark.executor.memory and spark.executor.memoryOverhead becomes the memory size of ContainerWorker Pods (32768MB + 8192MB = 40960MB in our example).
- spark.task.cpus specifies the number of cores to be allocated to each individual Spark task. In our example, a single Spark executor can run 16 / 2 = 8 concurrent Spark tasks (see the sketch after this list).
- spark.local.dir should be set to the list of local directories available on every node. These local directories are mapped to hostPath volumes and used by block managers of Spark drivers.
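As a quick sanity check, these settings can be read back from inside Spark shell (introduced later in this page). The following is a minimal sketch, assuming a running spark-shell session created with the configuration above:
scala> sc.getConf.get("spark.executor.cores")   // expected: "16"
scala> sc.getConf.get("spark.task.cpus")        // expected: "2"
scala> sc.getConf.get("spark.executor.cores").toInt / sc.getConf.get("spark.task.cpus").toInt
// 8 concurrent Spark tasks per executor with the values above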
mr3-site.xml
spark/conf/mr3-site.xml is the configuration file for MR3. Adjust the following settings as necessary.
$ vi spark/conf/mr3-site.xml
<property>
<name>mr3.container.scheduler.scheme</name>
<value>fair</value>
</property>
<property>
<name>mr3.am.resource.memory.mb</name>
<value>16384</value>
</property>
<property>
<name>mr3.am.resource.cpu.cores</name>
<value>4</value>
</property>
<property>
<name>mr3.k8s.pod.worker.hostpaths</name>
<value>/data1/k8s</value>
</property>
- mr3.container.scheduler.scheme specifies the policy for recycling ContainerWorkers among Spark applications, and can be set to fifo or fair. For details, see Recycling ContainerWorkers.
- mr3.am.resource.memory.mb specifies the memory size (in MB) of the DAGAppMaster Pod.
- mr3.am.resource.cpu.cores specifies the number of cores to be allocated to the DAGAppMaster Pod.
- mr3.k8s.pod.worker.hostpaths should be set to the list of local directories available on every node. These local directories are mapped to hostPath volumes and used by block managers of Spark executors.
Setting host aliases (optional)
To use host names (instead of IP addresses) when configuring Spark on MR3, the user should update:
- the configuration key mr3.k8s.host.aliases in spark/conf/mr3-site.xml
- the field spec/hostAliases in spark-yaml/spark-run.yaml
For example, to use a host name orange0 with IP address 192.168.10.100, update as follows:
$ vi spark/conf/mr3-site.xml
<property>
<name>mr3.k8s.host.aliases</name>
<value>orange0=192.168.10.100</value>
</property>
$ vi spark-yaml/spark-run.yaml
spec:
hostAliases:
- ip: "192.168.10.100"
hostnames:
- "orange0"
For accessing HDFS, host aliases should include a mapping for the HDFS NameNode.
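To confirm that the alias is in effect inside the Spark driver Pod (whose hostAliases field comes from spark-run.yaml), a minimal check can be run from Spark shell, described later in this page. This sketch assumes the orange0 example above; note that it checks only the driver Pod, while DAGAppMaster and ContainerWorker Pods rely on mr3.k8s.host.aliases.
scala> java.net.InetAddress.getByName("orange0").getHostAddress
// expected: 192.168.10.100 once the hostAliases entry is in effect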
Configuring S3 (optional)
For accessing S3-compatible storage, additional configuration keys should be set in spark/conf/core-site.xml. Open spark/conf/core-site.xml and set configuration keys for S3. The configuration key fs.s3a.endpoint should be set to point to the storage server.
$ vi spark/conf/core-site.xml
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>http://192.168.10.100:9000</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
The user may need to change the parameters for accessing S3 to avoid SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool. For more details, see Troubleshooting.
When using the class EnvironmentVariableCredentialsProvider to read AWS credentials, add the two environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in spark/env.sh.
$ vi spark/env.sh
export AWS_ACCESS_KEY_ID=_your_aws_access_key_id_
export AWS_SECRET_ACCESS_KEY=_your_aws_secret_secret_key_
Note that AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are already appended to the values of the configuration keys mr3.am.launch.env and mr3.container.launch.env in spark/conf/mr3-site.xml. For security reasons, the user should NOT write the AWS access key ID and secret access key themselves in spark/conf/mr3-site.xml.
$ vi spark/conf/mr3-site.xml
<property>
<name>mr3.am.launch.env</name>
<value>LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/,HADOOP_CREDSTORE_PASSWORD,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,USE_JAVA_17</value>
</property>
<property>
<name>mr3.container.launch.env</name>
<value>LD_LIBRARY_PATH=/opt/mr3-run/hadoop/apache-hadoop/lib/native,HADOOP_CREDSTORE_PASSWORD,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,USE_JAVA_17</value>
</property>
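After setting the endpoint and credentials, S3 access can be verified from Spark shell (described later in this page). The following is only a sketch: it reads s3a://hivemr3/pokemon.csv, the sample object used in the spark-submit example below, so substitute any object reachable from your storage server.
scala> val df = spark.read.option("header", "true").csv("s3a://hivemr3/pokemon.csv")
scala> df.count()
// a successful count confirms that the S3 endpoint and credentials are configured correctly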
Docker image
By default, we use the pre-built Docker image mr3project/spark3:3.2.2. If the user wants to use a different Docker image, check the environment variable DOCKER_SPARK_IMG in spark/env.sh and the field spec/containers/image in spark-yaml/spark-run.yaml.
$ vi spark/env.sh
DOCKER_SPARK_IMG=mr3project/spark3:3.2.2
$ vi spark-yaml/spark-run.yaml
spec:
containers:
- image: mr3project/spark3:3.2.2
Creating a Spark driver Pod
In order to run a Spark driver inside Kubernetes, edit spark-yaml/driver-service.yaml and spark-yaml/spark-run.yaml. In our example, we use spark1 as the name of the Spark driver Pod.
$ vi spark-yaml/driver-service.yaml
metadata:
name: spark1
spec:
selector:
sparkmr3_app: spark1
- The fields metadata/name and spec/selector should use the name of the Spark driver Pod.
$ vi spark-yaml/spark-run.yaml
metadata:
name: spark1
labels:
sparkmr3_app: spark1
spec:
containers:
env:
- name: DRIVER_NAME
value: "spark1"
- name: SPARK_DRIVER_CORES
value: '2'
- name: SPARK_DRIVER_MEMORY_MB
value: '13107'
resources:
requests:
cpu: 2
memory: 16Gi
limits:
cpu: 2
memory: 16Gi
volumeMounts:
- name: spark-hostpath-1
mountPath: /data1/k8s
volumes:
- name: spark-hostpath-1
hostPath:
path: /data1/k8s
type: Directory
- The field metadata/name specifies the name of the Spark driver Pod.
- The field metadata/labels should use the name of the Spark driver Pod.
- The environment variable DRIVER_NAME should be set to the name of the Spark driver Pod.
- The environment variables SPARK_DRIVER_CORES and SPARK_DRIVER_MEMORY_MB specify the arguments for the configuration keys spark.driver.cores and spark.driver.memory (in MB) of the Spark driver, respectively.
- The fields spec/containers/resources/requests and spec/containers/resources/limits specify the resources to be allocated to the Spark driver Pod.
- The fields spec/containers/volumeMounts and spec/volumes specify the list of directories to which hostPath volumes point for the Spark driver. The list of directories should match the value of spark.local.dir in spark-defaults.conf.
As we do not run Spark drivers outside Kubernetes, comment out the following lines in run-spark-setup.sh:
$ vi run-spark-setup.sh
#kubectl create -f $YAML_DIR/mr3-service.yaml
#export JAVA_HOME=/usr/jdk64/jdk1.8.0_112
#export PATH=$JAVA_HOME/bin:$PATH
Execute the script run-spark-setup.sh to create various Kubernetes resources.
$ ./run-spark-setup.sh
...
export CLIENT_TO_AM_TOKEN_KEY=4690a1bf-139f-473c-aec7-4f739343e003
export MR3_APPLICATION_ID_TIMESTAMP=14265
configmap/client-am-config created
Then the user can create a Spark driver Pod as follows:
$ kubectl create -f spark-yaml/driver-service.yaml
service/spark1 created
$ kubectl create -f spark-yaml/spark-run.yaml
pod/spark1 created
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
spark1 1/1 Running 0 8s
Running Spark shell
To demonstrate how to create a Spark application, we run Spark shell inside the Spark driver Pod. Starting Spark shell takes a while because it creates a new DAGAppMaster Pod.
$ kubectl exec -it -n sparkmr3 spark1 /bin/bash
spark@spark1:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
We find that a new DAGAppMaster Pod has been created.
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 51s
spark1 1/1 Running 0 92s
On Spark shell, execute a job for calculating Pi.
scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78488
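The job estimates Pi by Monte Carlo sampling: count is the number of random points that fall inside the unit circle out of n - 1 samples (1 until n yields n - 1 elements). The estimate can be printed in the same session, for example:
scala> println(4.0 * count / (n - 1))
// roughly 3.14 for count = 78488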
We find three new ContainerWorker Pods, each of which runs a Spark executor.
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3executor-8c5c-1 1/1 Running 0 61s
mr3executor-8c5c-2 1/1 Running 0 61s
mr3executor-8c5c-3 1/1 Running 0 60s
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 3m23s
spark1 1/1 Running 0 4m4s
Now terminate Spark shell.
scala> :quit
spark@spark1:/opt/mr3-run/spark$
We see that even though there is no Spark application running, the DAGAppMaster Pod and the three ContainerWorker Pods are still alive. This is a feature, not a bug, because DAGAppMaster and ContainerWorkers are not owned by a particular Spark application.
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3executor-8c5c-1 1/1 Running 0 2m54s
mr3executor-8c5c-2 1/1 Running 0 2m54s
mr3executor-8c5c-3 1/1 Running 0 2m53s
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 5m16s
spark1 1/1 Running 0 5m57s
For example, another instance of Spark shell running inside the same Spark driver Pod uses the existing DAGAppMaster (without creating a new DAGAppMaster Pod) and thus initializes itself much faster than the previous one. Executing a job also reuses the existing ContainerWorker Pods.
spark@spark1:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78578
Submitting Spark jobs
To submit Spark jobs, the user can use run-spark-submit.sh in the same way as spark-submit (which is called by run-spark-submit.sh). In our example, we submit a WordCount job with an input file s3a://hivemr3/pokemon.csv.
In order to download the WordCount jar file, we install wget. The root password is set to spark.
spark@spark1:/opt/mr3-run/spark$ sudo su -
root@spark1:~# apt-get update; apt-get install -y wget
root@spark1:~# exit
Next we download spark-wordcount.jar and submit a WordCount job.
spark@spark1:/opt/mr3-run/spark$ wget https://github.com/mr3project/mr3-release/releases/download/v1.5/spark-wordcount.jar
spark@spark1:/opt/mr3-run/spark$ ./run-spark-submit.sh --class WordCount ./spark-wordcount.jar s3a://hivemr3/pokemon.csv
Output Directory:
/opt/mr3-run/spark/spark-submit-result/spark-mr3-2022-08-01-13-38-09-ea3cf957-851d-4c90-94a1-43e5529db212
...
The user can find the log file under the output directory.
spark@spark1:/opt/mr3-run/spark$ ls /opt/mr3-run/spark/spark-submit-result/spark-mr3-2022-08-01-13-38-09-ea3cf957-851d-4c90-94a1-43e5529db212/spark-logs/
out-spark-submit.txt
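The source code of spark-wordcount.jar is not shown in this page, but a minimal WordCount application compatible with the command above might look like the following sketch. The class name WordCount and the single input-path argument match the spark-submit invocation; everything else is an assumption for illustration only.
import org.apache.spark.sql.SparkSession

// Hypothetical sketch; not the actual source of spark-wordcount.jar.
object WordCount {
  def main(args: Array[String]): Unit = {
    val inputPath = args(0)   // e.g. s3a://hivemr3/pokemon.csv
    val spark = SparkSession.builder.appName("WordCount").getOrCreate()
    val counts = spark.sparkContext
      .textFile(inputPath)                  // read the input file line by line
      .flatMap(_.split("\\s+"))             // split each line into words
      .map(word => (word, 1))
      .reduceByKey(_ + _)                   // count occurrences of each word
    counts.take(20).foreach(println)        // print a sample of the result
    spark.stop()
  }
}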
Running two Spark applications
To demonstrate two Spark applications sharing ContainerWorker Pods, we create another Spark driver Pod called spark2. First edit spark-yaml/driver-service.yaml and spark-yaml/spark-run.yaml to change the name.
$ vi spark-yaml/driver-service.yaml
metadata:
name: spark2
spec:
selector:
sparkmr3_app: spark2
$ vi spark-yaml/spark-run.yaml
metadata:
name: spark2
labels:
sparkmr3_app: spark2
spec:
containers:
env:
- name: DRIVER_NAME
value: "spark2"
Next create a second Spark driver Pod.
$ kubectl create -f spark-yaml/driver-service.yaml
service/spark2 created
$ kubectl create -f spark-yaml/spark-run.yaml
pod/spark2 created
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3executor-8c5c-1 1/1 Running 0 9m15s
mr3executor-8c5c-2 1/1 Running 0 9m15s
mr3executor-8c5c-3 1/1 Running 0 9m14s
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 11m
spark1 1/1 Running 0 12m
spark2 1/1 Running 0 14s
Inside the new Pod, run Spark shell and execute a job for calculating Pi.
$ kubectl exec -it -n sparkmr3 spark2 /bin/bash
spark@spark2:/opt/mr3-run/spark$ ./run-spark-shell.sh
...
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import scala.math.random
val n = 100000
val p = sc.parallelize(1 until n, 100)
val m = p.map { i => val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}
val count = m.reduce(_ + _)
...
scala>
count: Int = 78317
We find that no additional ContainerWorker Pods have been created, which implies that the second instance of Spark shell completes the job using existing ContainerWorker Pods.
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3executor-8c5c-1 1/1 Running 0 12m
mr3executor-8c5c-2 1/1 Running 0 12m
mr3executor-8c5c-3 1/1 Running 0 12m
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 14m
spark1 1/1 Running 0 15m
spark2 1/1 Running 0 3m16s
In our example, we use fair scheduling (with mr3.container.scheduler.scheme set to fair in mr3-site.xml), and thus one ContainerWorker Pod (40960MB) is assigned to one of the Spark applications while the remaining two ContainerWorker Pods (81920MB in total, as each ContainerWorker Pod provides 40960MB) are assigned to the other, as shown in the following log. For more information, see Recycling ContainerWorkers.
2022-08-01 13:40:46,631 [SparkContainerGroup-spark-mr3-app-7619f648] INFO TaskScheduler - SparkContainerGroup-spark-mr3-app-7619f648 current memory usage = 0.0% (0MB / 40960MB)
2022-08-01 13:40:51,001 [SparkContainerGroup-spark-mr3-app-6f15b91a] INFO TaskScheduler - SparkContainerGroup-spark-mr3-app-6f15b91a current memory usage = 0.0% (0MB / 81920MB)
Next, delete both Spark driver Pods. We see that the DAGAppMaster Pod and the three ContainerWorker Pods are still alive.
$ kubectl delete pod -n sparkmr3 spark1
pod "spark1" deleted
$ kubectl delete pod -n sparkmr3 spark2
pod "spark2" deleted
$ kubectl get pods -n sparkmr3
NAME READY STATUS RESTARTS AGE
mr3executor-8c5c-1 1/1 Running 0 17m
mr3executor-8c5c-2 1/1 Running 0 17m
mr3executor-8c5c-3 1/1 Running 0 17m
mr3master-spark-4265-0-76894c85-v669q 1/1 Running 0 20m
Since there is no Spark application running, the ContainerWorkers are waiting for a new Spark application. In our example, mr3.container.command.num.waits.in.reserved is set to 360 in mr3-site.xml, so the ContainerWorkers wait for 360 seconds before terminating themselves.
$ kubectl logs -n sparkmr3 -f mr3executor-8c5c-1
...
2022-08-01 13:48:37,575 [run-19-1] INFO ContainerWorker - Reserved and not registered to TaskCommunicator: K@1, 359
2022-08-01 13:48:37,578 [run-19-1] ERROR ContainerWorker - Kill because DAGAppMaster cannot be reached or ContainerWorker is not registered
...
2022-08-01 13:48:37,654 [shutdownHook1] WARN ContainerWorker - Shutdown hook called
2022-08-01 13:48:37,656 [shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called
Stopping Spark on MR3
In order to stop DAGAppMaster, the user can delete the Deployment for DAGAppMaster.
$ kubectl get deployment -n sparkmr3
NAME READY UP-TO-DATE AVAILABLE AGE
mr3master-spark-4265-0 1/1 1 1 27m
$ kubectl delete deployment -n sparkmr3 mr3master-spark-4265-0
deployment.apps "mr3master-spark-4265-0" deleted
To delete all remaining resources, execute the following command:
$ kubectl -n sparkmr3 delete configmap --all; kubectl -n sparkmr3 delete svc --all; kubectl -n sparkmr3 delete secret --all; kubectl -n sparkmr3 delete serviceaccount spark-service-account; kubectl -n sparkmr3 delete role --all; kubectl -n sparkmr3 delete rolebinding --all; kubectl delete clusterrole node-reader; kubectl delete clusterrolebinding spark-clusterrole-binding; kubectl delete clusterrolebinding spark-master-clusterrole-binding; kubectl -n sparkmr3 delete serviceaccount master-service-account; kubectl -n sparkmr3 delete serviceaccount worker-service-account;