This page explains how to use the MR3 shuffle handler. For an introduction, see MR3 Shuffle Handler.

Basic usage

In order to use the MR3 shuffle handler, the user should set three configuration keys, one in mr3-site.xml and two in tez-site.xml:

  • Set mr3.use.daemon.shufflehandler in mr3-site.xml to the number of shuffle handlers in each ContainerWorker, which should be greater than zero. Each ContainerWorker then runs its own threads for shuffle handlers. If it is set to zero, no shuffle handlers are created and MR3 uses an external shuffle service.
  • Set tez.am.shuffle.auxiliary-service.id in tez-site.xml to tez_shuffle (changing it from mapreduce_shuffle). Then the runtime system of MR3 routes intermediate data to the MR3 shuffle handler instead of an external shuffle service.
  • Set tez.shuffle.port in tez-site.xml to the port number for the shuffle handler. The default value is 13563.
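The three settings can be sketched as the following configuration fragments, each placed inside the <configuration> element of its file. The value 1 for mr3.use.daemon.shufflehandler is only an example of a number greater than zero, and 13563 simply repeats the default port.

```xml
<!-- mr3-site.xml: each ContainerWorker runs its own shuffle handler threads (1 is an example value) -->
<property>
  <name>mr3.use.daemon.shufflehandler</name>
  <value>1</value>
</property>

<!-- tez-site.xml: route intermediate data to the MR3 shuffle handler -->
<property>
  <name>tez.am.shuffle.auxiliary-service.id</name>
  <value>tez_shuffle</value>
</property>

<!-- tez-site.xml: port for the shuffle handler (the default value) -->
<property>
  <name>tez.shuffle.port</name>
  <value>13563</value>
</property>
```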

If a ContainerWorker cannot obtain the port number specified by tez.shuffle.port for a shuffle handler (e.g., because another process already uses it), it chooses a random port number instead. Thus multiple ContainerWorkers, each with multiple shuffle handlers, can run on the same node without port conflicts.

The following configuration keys in tez-site.xml control the behavior of the shuffle handler.

Name                                        Default value  Description
tez.shuffle.connection-keep-alive.enable    false          true: keep connections alive for reuse; false: do not reuse connections
tez.shuffle.max.thread                      0              Number of threads for the shuffle handler; 0 sets the number of threads to 2 * the number of cores
tez.shuffle.listen.queue.size               128            Size of the listening queue; can be set to the value in /proc/sys/net/core/somaxconn
tez.shuffle.mapoutput-info.meta.cache.size  1000           Size of the cache for metadata on the output of mappers
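As an illustration only, the fragment below turns on connection reuse and enlarges the listening queue in tez-site.xml. The value 4096 assumes that /proc/sys/net/core/somaxconn reports 4096 on the node; it is not a recommended setting, so check the actual value on your own nodes.

```xml
<!-- tez-site.xml: reuse shuffle connections -->
<property>
  <name>tez.shuffle.connection-keep-alive.enable</name>
  <value>true</value>
</property>

<!-- tez-site.xml: assumed to match /proc/sys/net/core/somaxconn on the node (4096 is an example) -->
<property>
  <name>tez.shuffle.listen.queue.size</name>
  <value>4096</value>
</property>
```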

Secure shuffle (using SSL mode)

The shuffle handler of MR3 supports secure shuffle using SSL (Secure Sockets Layer) mode. Compared with the Hadoop/MapReduce shuffle service, enabling secure shuffle in MR3 is much simpler: because MR3 incorporates TEZ-4096, all SSL-related configuration keys can be specified in mr3-site.xml and tez-site.xml. That is, the user does not need separate configuration files such as ssl-server-mr3.xml and ssl-client-mr3.xml.

Enabling secure shuffle takes three steps:

  1. create JKS files: a KeyStore file and a TrustStore file
  2. update mr3-site.xml
  3. update tez-site.xml

Step 1. Create JKS files

Before creating JKS files, the user should choose a CN (Common Name) for the nodes in the cluster. On Hadoop, the user can choose a CN according to the domain (e.g., * or *.datamonad.com). On Kubernetes, however, the CN must be *.service-worker.hivemr3.svc.cluster.local because all ContainerWorker Pods belong to the headless Service service-worker in the namespace hivemr3.

After choosing CN, the user should create JKS files. Below we illustrate the creation of a KeyStore file mr3-keystore.jks and a TrustStore file mr3-truststore.jks with passwords key_password, keystore_password, and truststore_password.

# create a KeyStore (replace CN=* with the CN chosen above)
$ keytool -genkey -alias mr3-shuffle -keyalg RSA -keysize 2048 -dname "CN=*" -keypass key_password -keystore mr3-keystore.jks -storepass keystore_password -validity 3650

# extract the CSR (Certificate Signing Request) from the KeyStore
$ keytool -keystore mr3-keystore.jks -storepass keystore_password -keypass key_password -alias mr3-shuffle -certreq -file mr3-shuffle.csr

# create a private key
$ openssl genrsa -out mr3.key 2048

# generate a self-signed CA certificate from the private key
# (without -days, openssl defaults to 30 days of validity)
$ openssl req -new -x509 -days 3650 -key mr3.key -out mr3.crt

# sign the certificate with the CA certificate
$ openssl x509 -req -days 3650 -in mr3-shuffle.csr -CA mr3.crt -CAkey mr3.key -CAcreateserial -out mr3-shuffle.crt

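# import the CA certificate into the KeyStore so that keytool can build the certificate chain
# (mr3-ca is an arbitrary alias)
$ keytool -importcert -alias mr3-ca -file mr3.crt -keystore mr3-keystore.jks -storepass keystore_password -noprompt

# import the signed certificate into the KeyStore, replacing the self-signed one
$ keytool -importcert -alias mr3-shuffle -file mr3-shuffle.crt -keystore mr3-keystore.jks -storepass keystore_password -keypass key_password

# create a TrustStore
$ keytool -importcert -alias mr3-shuffle -file mr3-shuffle.crt -keystore mr3-truststore.jks -storepass truststore_password

# check all the files
$ ls mr3*
mr3.crt  mr3.key  mr3-keystore.jks  mr3-shuffle.crt  mr3-shuffle.csr  mr3.srl  mr3-truststore.jks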

On Hadoop, copy mr3-keystore.jks and mr3-truststore.jks to a directory on HDFS (e.g., /user/hive/lib/). On Kubernetes, copy mr3-keystore.jks and mr3-truststore.jks to the directory kubernetes/key/ in the MR3 release. Change the file permissions if necessary.

Step 2. Update mr3-site.xml

On Hadoop, extend the configuration key mr3.aux.uris in mr3-site.xml to include the HDFS paths of mr3-keystore.jks and mr3-truststore.jks.

<property>
  <name>mr3.aux.uris</name>
  <value>${auxuris},/user/hive/lib/mr3-keystore.jks,/user/hive/lib/mr3-truststore.jks</value>
</property>

On Kubernetes, set CREATE_KEYTAB_SECRET to true in kubernetes/env.sh and mr3.k8s.mount.keytab.secret to true in kubernetes/conf/mr3-site.xml.

CREATE_KEYTAB_SECRET=true
<property>
  <name>mr3.k8s.mount.keytab.secret</name>
  <value>true</value>
</property>

Step 3. Update tez-site.xml

How to update tez-site.xml depends on whether the configuration key hadoop.security.credential.provider.path in core-site.xml is set to a JKS file. If it is set, all passwords are retrieved from the JKS file, so the user needs to set only the following configuration keys in tez-site.xml:

  • ssl.server.keystore.location to mr3-keystore.jks on Hadoop and /opt/mr3-run/key/mr3-keystore.jks on Kubernetes
  • ssl.server.truststore.location to mr3-truststore.jks on Hadoop and /opt/mr3-run/key/mr3-truststore.jks on Kubernetes
  • ssl.client.truststore.location to mr3-truststore.jks on Hadoop and /opt/mr3-run/key/mr3-truststore.jks on Kubernetes
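For instance, on Kubernetes the three location keys might look like the following tez-site.xml fragment. It only restates the paths listed above; on Hadoop, the values would be mr3-keystore.jks and mr3-truststore.jks instead.

```xml
<!-- tez-site.xml on Kubernetes: KeyStore and TrustStore locations -->
<property>
  <name>ssl.server.keystore.location</name>
  <value>/opt/mr3-run/key/mr3-keystore.jks</value>
</property>

<property>
  <name>ssl.server.truststore.location</name>
  <value>/opt/mr3-run/key/mr3-truststore.jks</value>
</property>

<property>
  <name>ssl.client.truststore.location</name>
  <value>/opt/mr3-run/key/mr3-truststore.jks</value>
</property>
```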

If it is not set, all passwords should be provided in plain text, so the user needs to set the following configuration keys as well:

  • ssl.server.keystore.password to the KeyStore password
  • ssl.server.truststore.password to the TrustStore password
  • ssl.client.truststore.password to the TrustStore password
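A sketch of the plain-text password settings, reusing the example passwords keystore_password and truststore_password from Step 1. Substitute the passwords actually used when creating the JKS files, and keep in mind that anyone who can read tez-site.xml can read them.

```xml
<!-- tez-site.xml: plain-text passwords (example values from Step 1) -->
<property>
  <name>ssl.server.keystore.password</name>
  <value>keystore_password</value>
</property>

<property>
  <name>ssl.server.truststore.password</name>
  <value>truststore_password</value>
</property>

<property>
  <name>ssl.client.truststore.password</name>
  <value>truststore_password</value>
</property>
```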

Finally, the user should set the following configuration keys in tez-site.xml to enable secure shuffle.

<property>
  <name>tez.runtime.shuffle.ssl.enable</name>
  <value>true</value>
</property>

<property>
  <name>tez.runtime.shuffle.keep-alive.enabled</name>
  <value>true</value>
</property>

On old versions of Kubernetes, the shuffle handler may occasionally throw SSLException, which results in fetch-failures. In such a case, the user may observe VertexReruns due to fetch-failures.

Running shuffle handlers in a separate process on Kubernetes

In order to run shuffle handlers in a separate process inside a ContainerWorker Pod, the user should set three configuration keys in mr3-site.xml.

  • Set mr3.use.daemon.shufflehandler to zero. Then the ContainerWorker process itself does not create threads for shuffle handlers; instead, a separate process for shuffle handlers is created.
  • Set mr3.k8s.shuffle.process.ports to a comma-separated list of port numbers. Then MR3 creates one shuffle handler for each port number in the separate process. For example, if it is set to 15500,15510,15520,15530,15540, MR3 creates 5 shuffle handlers with port numbers 15500, 15510, 15520, 15530, and 15540.
  • Set mr3.k8s.shufflehandler.process.memory.mb to the memory size in MB of the shuffle handler process. The default value is 1024.

The port numbers specified by the configuration key mr3.k8s.shuffle.process.ports should not conflict with those already in use inside a ContainerWorker Pod. Usually port numbers of 10000 and above are safe to use. Since every ContainerWorker Pod is assigned a unique IP address, no conflict arises between different ContainerWorker Pods, whether or not they run on the same physical node.
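Putting the three keys together, a mr3-site.xml fragment for the example above might look as follows. The port list is the five-port example from the text, and 1024 simply restates the default memory size; both should be adjusted to the actual deployment.

```xml
<!-- mr3-site.xml: no shuffle handler threads inside the ContainerWorker process -->
<property>
  <name>mr3.use.daemon.shufflehandler</name>
  <value>0</value>
</property>

<!-- one shuffle handler per port in the separate process (example ports) -->
<property>
  <name>mr3.k8s.shuffle.process.ports</name>
  <value>15500,15510,15520,15530,15540</value>
</property>

<!-- memory in MB for the shuffle handler process (the default value) -->
<property>
  <name>mr3.k8s.shufflehandler.process.memory.mb</name>
  <value>1024</value>
</property>
```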

Troubleshooting

A DAG fails with NON_FATAL: Shuffle Runner Failed.

A DAG may fail with an error message shown below:

2020-05-30T02:31:56,684 ERROR [HiveServer2-Background-Pool: Thread-726] SessionState: Terminating unsuccessfully: Vertex failed, vertex_1590721199917_0013_22_00, Task unsuccessful: Reducer 3, task_1590721199917_0013_22_00_000395, com.datamonad.mr3.api.common.MR3Exception: NON_FATAL: Shuffle Runner Failed
...
Caused by: java.io.IOException: Failed 5 times trying to download from Map 1_003078_00. threshold=5
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.isAbortLimitExceedFor(ShuffleScheduler.java:803)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.isShuffleHealthy(ShuffleScheduler.java:994)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.copyFailed(ShuffleScheduler.java:781)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.copyFromHost(FetcherOrderedGrouped.java:327)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.fetchNext(FetcherOrderedGrouped.java:183)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.call(FetcherOrderedGrouped.java:195)
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.call(FetcherOrderedGrouped.java:58)

In such a case, a TaskAttempt successfully establishes a connection to a remote shuffle handler, but fails to download data after several attempts (threshold=5 in the log above). Since the successful connection indicates that the ContainerWorker running the shuffle handler is working normally, MR3 does not trigger a Vertex rerun for fault tolerance. The user can try a larger value for the configuration key tez.runtime.shuffle.src-attempt.abort.limit in tez-site.xml so that a TaskAttempt can make more attempts to download data.
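For example, the following tez-site.xml fragment raises the limit above the threshold of 5 reported in the log. The value 10 is an arbitrary illustration, not a tested recommendation; a larger limit also means a TaskAttempt waits longer before giving up on a source that keeps failing.

```xml
<!-- tez-site.xml: allow more download attempts per source TaskAttempt (10 is an example value) -->
<property>
  <name>tez.runtime.shuffle.src-attempt.abort.limit</name>
  <value>10</value>
</property>
```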