After getting Hive on MR3 up and running, the user may wish to improve its performance by adjusting configuration parameters. Given the sheer number of configuration parameters in Hive, Tez, and MR3, however, a definitive guide to finding an optimal set of configuration parameters is out of the question, and the user should expect a long journey toward a set of configuration parameters suitable for his or her workload. Still, we can identify a relatively small number of configuration parameters that usually have a major and immediate impact on performance for typical workloads. Below we describe these configuration parameters in order of importance.

This page is a work in progress and may be expanded in the future.

Resources for mappers (Map Tasks), reducers (Reduce Tasks), and ContainerWorkers

The following configuration keys in hive-site.xml specify resources (in terms of memory in MB and number of cores) to be allocated to each mapper, reducer, and ContainerWorker:

  • hive.mr3.map.task.memory.mb and hive.mr3.map.task.vcores: memory in MB and number of cores to be allocated to each mapper
  • hive.mr3.reduce.task.memory.mb and hive.mr3.reduce.task.vcores: memory in MB and number of cores to be allocated to each reducer
  • hive.mr3.all-in-one.containergroup.memory.mb and hive.mr3.all-in-one.containergroup.vcores (for all-in-one ContainerGroup scheme): memory in MB and number of cores to be allocated to each ContainerWorker under all-in-one scheme
  • hive.mr3.resource.vcores.divisor: divisor for the number of cores

hive.mr3.map.task.memory.mb and hive.mr3.reduce.task.memory.mb should be sufficiently large for the size of the dataset. Otherwise queries may fail with OutOfMemoryError or MapJoinMemoryExhaustionError.

The performance of Hive on MR3 usually improves if multiple mappers and reducers can run concurrently in a ContainerWorker. Moreover, queries are less likely to fail with OutOfMemoryError or MapJoinMemoryExhaustionError because a mapper or reducer can use more memory than specified by hive.mr3.map.task.memory.mb or hive.mr3.reduce.task.memory.mb. With too many mappers and reducers in a ContainerWorker, however, performance may deteriorate because of the increased overhead of memory allocation and garbage collection.

In the following example, we allocate 4GB and 1 core to each mapper and reducer. For a ContainerWorker, we allocate 40GB and 10 cores so that 10 mappers and reducers can run concurrently.

<property>
  <name>hive.mr3.resource.vcores.divisor</name>
  <value>1</value>
</property>

<property>
  <name>hive.mr3.map.task.memory.mb</name>
  <value>4096</value>
</property>

<property>
  <name>hive.mr3.map.task.vcores</name>
  <value>1</value>
</property>

<property>
  <name>hive.mr3.reduce.task.memory.mb</name>
  <value>4096</value>
</property>

<property>
  <name>hive.mr3.reduce.task.vcores</name>
  <value>1</value>
</property>

<property>
  <name>hive.mr3.all-in-one.containergroup.memory.mb</name>
  <value>40960</value>
</property>

<property>
  <name>hive.mr3.all-in-one.containergroup.vcores</name>
  <value>10</value>
</property>
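To see why these values allow 10 concurrent mappers and reducers, the packing arithmetic can be sketched as follows. This is a purely illustrative Python sketch, not part of Hive on MR3; the key observation is that a task occupies both its memory and its cores, so the scarcer resource decides the concurrency.

```python
# Illustrative only: estimate how many mappers/reducers fit in a ContainerWorker.
# Values taken from the hive-site.xml example above.
worker_memory_mb = 40960   # hive.mr3.all-in-one.containergroup.memory.mb
worker_vcores = 10         # hive.mr3.all-in-one.containergroup.vcores
task_memory_mb = 4096      # hive.mr3.map/reduce.task.memory.mb
task_vcores = 1            # hive.mr3.map/reduce.task.vcores

# A task needs both its memory and its cores, so the limiting resource decides.
slots_by_memory = worker_memory_mb // task_memory_mb   # 10
slots_by_vcores = worker_vcores // task_vcores         # 10
concurrent_tasks = min(slots_by_memory, slots_by_vcores)
print(concurrent_tasks)  # 10 -- memory and cores are balanced, nothing is wasted
```

Because both divisions yield 10, neither memory nor cores are left idle; the common mistakes shown later in this section arise when the two quotients disagree.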

The user can configure Hive on MR3 so that the default values for mappers and reducers can be overridden for each individual query inside Beeline connections. First add the relevant configuration keys to the list specified by the configuration key hive.security.authorization.sqlstd.confwhitelist.append in hive-site.xml. Note that the regular expressions in the list are separated by |, not ,. (We do not recommend adding hive.mr3.resource.vcores.divisor to the list because it implicitly affects hive.mr3.all-in-one.containergroup.vcores.)

<property>
  <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
  <value>hive\.querylog\.location.*|hive\.mr3\.map\.task.*|hive\.mr3\.reduce\.task.*</value>
</property>
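To check that the patterns cover the intended keys, the |-separated list can be tested with ordinary regular expressions. The following Python sketch is purely illustrative and only approximates how Hive matches keys against the whitelist (Hive also consults its built-in whitelist, which this sketch ignores):

```python
import re

# The value of hive.security.authorization.sqlstd.confwhitelist.append
# from the example above; Hive treats it as |-separated regular expressions.
whitelist = r"hive\.querylog\.location.*|hive\.mr3\.map\.task.*|hive\.mr3\.reduce\.task.*"
patterns = [re.compile(p) for p in whitelist.split("|")]

def is_whitelisted(key: str) -> bool:
    # A key may be overridden in Beeline if some pattern matches it fully.
    return any(p.fullmatch(key) for p in patterns)

print(is_whitelisted("hive.mr3.map.task.memory.mb"))      # True
print(is_whitelisted("hive.mr3.resource.vcores.divisor")) # False
```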

After restarting Hive on MR3, the user can override the default values inside Beeline connections. In the following example, the first query allocates 8GB and 2 cores to each mapper and reducer whereas the second query allocates 2GB and 1 core to each mapper and reducer.

0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.map.task.memory.mb=8192;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.map.task.vcores=2;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.reduce.task.memory.mb=8192;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.reduce.task.vcores=2;
0: jdbc:hive2://192.168.10.1:9852/> !run /home/hive/sample1.sql

0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.map.task.memory.mb=2048;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.map.task.vcores=1;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.reduce.task.memory.mb=2048;
0: jdbc:hive2://192.168.10.1:9852/> set hive.mr3.reduce.task.vcores=1;
0: jdbc:hive2://192.168.10.1:9852/> !run /home/hive/sample2.sql

Below we show examples of common mistakes in configuring resources. We assume that hive.mr3.resource.vcores.divisor is set to 1.

1. Memory not fully utilized

  • hive.mr3.map.task.memory.mb = 1024, hive.mr3.map.task.vcores = 1
  • hive.mr3.reduce.task.memory.mb = 1024, hive.mr3.reduce.task.vcores = 1
  • hive.mr3.all-in-one.containergroup.memory.mb = 8192, hive.mr3.all-in-one.containergroup.vcores = 4

A ContainerWorker (with 4 cores) can accommodate 4 mappers and reducers (each requesting 1 core). Since every mapper or reducer requests only 1024MB, a ContainerWorker never uses the remaining 8192 - 4 * 1024 = 4096MB of memory. As a result, the average memory usage reported by DAGAppMaster never exceeds 50%.

2020-07-19T10:07:28,159  INFO [All-In-One] TaskScheduler: All-In-One average memory usage = 50.0% (4096MB / 8192MB)

2. Cores not fully utilized

  • hive.mr3.map.task.memory.mb = 1024, hive.mr3.map.task.vcores = 1
  • hive.mr3.reduce.task.memory.mb = 1024, hive.mr3.reduce.task.vcores = 1
  • hive.mr3.all-in-one.containergroup.memory.mb = 4096, hive.mr3.all-in-one.containergroup.vcores = 8

A ContainerWorker (with 4096MB) can accommodate 4 mappers and reducers (each requesting 1024MB). Since every mapper or reducer requests 1 core, 8 - 4 * 1 = 4 cores are never used.

3. Memory and cores not fully utilized

  • hive.mr3.map.task.memory.mb = 2048, hive.mr3.map.task.vcores = 2
  • hive.mr3.reduce.task.memory.mb = 2048, hive.mr3.reduce.task.vcores = 2
  • hive.mr3.all-in-one.containergroup.memory.mb = 9216, hive.mr3.all-in-one.containergroup.vcores = 9

After taking 4 mappers and reducers, a ContainerWorker does not use the remaining resources (1024MB of memory and 1 core).
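The leftover resources in the three examples above follow from the same packing arithmetic. The following Python sketch is illustrative only:

```python
# Illustrative only: compute unused resources in a ContainerWorker for the
# three misconfigurations above (hive.mr3.resource.vcores.divisor = 1).
def leftover(worker_mb, worker_vcores, task_mb, task_vcores):
    # The scarcer resource limits how many tasks fit; the rest goes unused.
    tasks = min(worker_mb // task_mb, worker_vcores // task_vcores)
    return worker_mb - tasks * task_mb, worker_vcores - tasks * task_vcores

print(leftover(8192, 4, 1024, 1))  # (4096, 0): memory not fully utilized
print(leftover(4096, 8, 1024, 1))  # (0, 4):    cores not fully utilized
print(leftover(9216, 9, 2048, 2))  # (1024, 1): both not fully utilized
```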

4. Resources for ContainerWorkers too large

The resources assigned to each ContainerWorker should not exceed the maximum resources allowed by the underlying resource manager (Yarn in the case of Hadoop). The maximum resources are usually smaller than the physical resources available on a worker node. For example, a worker node with 16GB of physical memory and 4 physical cores may allow ContainerWorkers only up to 14GB of memory and 3 cores.

In addition, if a ContainerWorker starts with LLAP I/O enabled, the user should take into consideration the memory allocated for LLAP I/O as well (hive.mr3.llap.headroom.mb and hive.llap.io.memory.size). For more details, see LLAP I/O.

Memory settings for Hive and Tez

The following configuration keys specify the threshold for Map Join in Hive and the size of buffers in Tez.

  • hive.auto.convert.join.noconditionaltask.size in hive-site.xml
  • tez.runtime.io.sort.mb and tez.runtime.unordered.output.buffer.size-mb in tez-site.xml

The following example shows the values that we use for the TPC-DS benchmark.

<property>
  <name>hive.auto.convert.join.noconditionaltask.size</name>
  <value>1145044992</value>
</property>

<property>
  <name>tez.runtime.io.sort.mb</name>
  <value>1040</value>
</property>

<property>
  <name>tez.runtime.unordered.output.buffer.size-mb</name>
  <value>307</value>
</property>

The default values specified in hive-site.xml can be overridden for each individual query inside Beeline connections, as shown below.

0: jdbc:hive2://192.168.10.1:9852/> set hive.auto.convert.join.noconditionaltask.size=2000000000;
0: jdbc:hive2://192.168.10.1:9852/> set tez.runtime.io.sort.mb=2000;
0: jdbc:hive2://192.168.10.1:9852/> set tez.runtime.unordered.output.buffer.size-mb=600;
0: jdbc:hive2://192.168.10.1:9852/> !run /home/hive/sample.sql

Eliminating fetch delays

Eliminating fetch delays requires two features of MR3: multiple shuffle handlers in a ContainerWorker and speculative execution. As a preliminary step, the user should set the kernel parameter net.core.somaxconn to a sufficiently large value on every node in the cluster.

To allow a ContainerWorker to run multiple shuffle handlers, set the configuration key hive.mr3.use.daemon.shufflehandler in hive-site.xml to the number of shuffle handlers per ContainerWorker. To enable speculative execution, set the configuration key hive.mr3.am.task.concurrent.run.threshold.percent in hive-site.xml to the percentage of Tasks that must complete before TaskAttempts are watched for speculative execution. In the following example, we run 10 shuffle handlers in each ContainerWorker and wait until 95 percent of Tasks complete before starting to watch TaskAttempts.

<property>
  <name>hive.mr3.use.daemon.shufflehandler</name>
  <value>10</value>
</property>

<property>
  <name>hive.mr3.am.task.concurrent.run.threshold.percent</name>
  <value>95</value>
</property>

Running too many shuffle handlers in a ContainerWorker with small resources can have an adverse effect on performance. As a starting point for performance tuning, one shuffle handler per core works well in practice.

Auto parallelism

To fully enable auto parallelism, the user should set the configuration key tez.shuffle-vertex-manager.enable.auto-parallel to true in tez-site.xml. Enabling auto parallelism may cause individual queries to run slightly slower (because fewer resources are allocated), but it usually improves the throughput of concurrent queries, especially when the cluster is under heavy load. If tez.shuffle-vertex-manager.enable.auto-parallel is set to true, the following configuration keys decide when to trigger auto parallelism and how to redistribute Tasks.

  • tez.shuffle-vertex-manager.auto-parallel.min.num.tasks
  • tez.shuffle-vertex-manager.auto-parallel.max.reduction.percentage
  • tez.shuffle-vertex-manager.use-stats-auto-parallelism
  • tez.shuffle.vertex.manager.auto.parallelism.min.percent

The example below should be interpreted as follows:

  • Vertexes with at least 40 Tasks are considered for auto parallelism.
  • The number of Tasks can be reduced by up to 100 - 10 = 90 percent, thereby leaving 10 percent of Tasks. For example, a Vertex of 100 Tasks in the beginning may end up with 10 Tasks when auto parallelism is used.
  • Vertexes analyze input statistics when applying auto parallelism.
  • An input size of zero is normalized to 20, while the maximum input size is mapped to 100.

<property>
  <name>tez.shuffle-vertex-manager.enable.auto-parallel</name>
  <value>true</value>
</property>

<property>
  <name>tez.shuffle-vertex-manager.auto-parallel.min.num.tasks</name>
  <value>40</value>
</property>

<property>
  <name>tez.shuffle-vertex-manager.auto-parallel.max.reduction.percentage</name>
  <value>10</value>
</property>

<property>
  <name>tez.shuffle-vertex-manager.use-stats-auto-parallelism</name>
  <value>true</value>
</property>

<property>
  <name>tez.shuffle.vertex.manager.auto.parallelism.min.percent</name>
  <value>20</value>
</property>
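Under this configuration, the lower bound on the number of Tasks after auto parallelism can be sketched as follows. This is an illustrative Python sketch reflecting our reading of the two keys above, not MR3's actual code:

```python
import math

# Illustrative reading of the settings above (not MR3's actual code):
min_num_tasks = 40             # tez.shuffle-vertex-manager.auto-parallel.min.num.tasks
max_reduction_percentage = 10  # at least this percent of Tasks must remain

def min_tasks_after_auto_parallelism(num_tasks: int) -> int:
    if num_tasks < min_num_tasks:
        return num_tasks  # Vertex too small; auto parallelism is not applied
    # The number of Tasks may shrink by up to 100 - 10 = 90 percent.
    return max(1, math.ceil(num_tasks * max_reduction_percentage / 100))

print(min_tasks_after_auto_parallelism(100))  # 10: reduced by up to 90 percent
print(min_tasks_after_auto_parallelism(30))   # 30: below the 40-Task threshold
```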

Soft references in Tez

In a cluster with plenty of memory for the workload, using soft references for ByteBuffers allocated in PipelinedSorter in Tez can make a noticeable difference. To be specific, setting the configuration key tez.runtime.pipelined.sorter.use.soft.reference to true in tez-site.xml creates soft references for ByteBuffers allocated in PipelinedSorter and allows these references to be reused across all TaskAttempts running in the same ContainerWorker, thus relieving pressure on the garbage collector.

<property>
  <name>tez.runtime.pipelined.sorter.use.soft.reference</name>
  <value>true</value>
</property>

(If using soft references improves the performance of Hive on MR3, setting another configuration key tez.runtime.pipelined.sorter.lazy-allocate.memory in tez-site.xml may further improve the performance.) When the size of memory allocated to each ContainerWorker is small, however, using soft references is less likely to improve the performance.

When using soft references, the user should append the Java VM option -XX:SoftRefLRUPolicyMSPerMB (in milliseconds) to the value of the configuration key mr3.container.launch.cmd-opts in mr3-site.xml. Otherwise ContainerWorkers use the default value of 1000 for SoftRefLRUPolicyMSPerMB. In the following example, we set SoftRefLRUPolicyMSPerMB to 25 milliseconds:

<property>
  <name>mr3.container.launch.cmd-opts</name>
  <value>-XX:+AlwaysPreTouch -Xss512k -XX:+UseG1GC -XX:TLABSize=8m -XX:+ResizeTLAB -XX:+UseNUMA -XX:+AggressiveOpts -XX:InitiatingHeapOccupancyPercent=40 -XX:G1ReservePercent=20 -XX:MaxGCPauseMillis=200 -XX:MetaspaceSize=1024m -server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:SoftRefLRUPolicyMSPerMB=25</value>
</property>

Tuning garbage collection

In most cases, it is not worth the time and effort (and the user should not try) to rigorously tune the parameters governing garbage collection, because the Java VM is already good at choosing suitable values based on the number of processors and the heap size. In certain environments (e.g., CDH 5.15), however, the Java VM makes a wrong assumption about the number of processors (e.g., because of cgroup enablement with Java 1.8 update 191 or later), and as a consequence, the performance noticeably deteriorates as the memory allocated to each ContainerWorker increases. In such a case, the user should check whether Runtime.getRuntime().availableProcessors() returns a correct value and manually configure the garbage collector if necessary.

As an example, consider the output of the G1 garbage collector (with -XX:+UseG1GC) on a machine with 36 processors (with hyper-threading enabled):

2019-10-15T20:17:50.278+0900: [GC pause (G1 Evacuation Pause) (young), 0.0216146 secs]
   [Parallel Time: 4.7 ms, GC Workers: 2]

The garbage collector creates only two worker threads because Runtime.getRuntime().availableProcessors() returns 2. Usually the garbage collector creates at least half as many worker threads as the number of processors. For example, the following output shows that 18 worker threads are running on a machine with 24 processors.

2019-12-17T17:51:12.129+0900: [GC pause (G1 Evacuation Pause) (young), 0.0135165 secs]
   [Parallel Time: 10.4 ms, GC Workers: 18]

If the garbage collector is not working normally, the user should provide additional parameters to Java VM (with configuration keys mr3.am.launch.cmd-opts and mr3.container.launch.cmd-opts in mr3-site.xml). An example of specifying the number of worker threads for the G1 garbage collector is:

-XX:ParallelGCThreads=8 -XX:ConcGCThreads=2

If Runtime.getRuntime().availableProcessors() returns a wrong value, the user may also have to set the configuration key tez.shuffle.max.threads manually because with the default value of 0, it is set to double the number returned by Runtime.getRuntime().availableProcessors().

hive.tez.llap.min.reducer.per.executor in hive-site.xml

When auto parallelism is enabled, Hive on MR3 uses the configuration key hive.tez.llap.min.reducer.per.executor to decide the baseline for the number of reducers for each Reduce Vertex. For example, if the entire set of ContainerWorkers can run 100 reducers concurrently and hive.tez.llap.min.reducer.per.executor is set to 0.2, the query optimizer tries to assign at most 100 * 0.2 = 20 reducers to each Reduce Vertex. In this way, the configuration key hive.tez.llap.min.reducer.per.executor affects the running time of queries.

For typical workloads, the default value of 0.2 is acceptable, but depending on the characteristics of the cluster (e.g., number of ContainerWorkers, concurrency level, size of the dataset, resources for mappers and reducers, and so on), a different value may result in an improvement (or a degradation) in the performance.
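The baseline computation from the example above (100 concurrent reducers, a value of 0.2) is simply the following; the Python sketch is purely illustrative:

```python
# Illustrative only: the baseline reducer count described above.
concurrent_reducer_slots = 100    # reducers all ContainerWorkers can run at once
min_reducer_per_executor = 0.2    # hive.tez.llap.min.reducer.per.executor

max_reducers_per_vertex = int(concurrent_reducer_slots * min_reducer_per_executor)
print(max_reducers_per_vertex)  # 20: at most 20 reducers per Reduce Vertex
```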

tez.runtime.shuffle.connect.timeout in tez-site.xml

The configuration key tez.runtime.shuffle.connect.timeout specifies the maximum time (in milliseconds) for trying to connect to the shuffle service or the built-in shuffle handler before reporting fetch-failures. (See Fault Tolerance for a few examples.) With the default value of 12500, a TaskAttempt makes up to two retries after the first attempt, each after waiting for 5 seconds.

If the connection fails too often because of the contention for disk access or network congestion, using a large value for tez.runtime.shuffle.connect.timeout may be a good decision because it leads to more retries, thus decreasing the chance of fetch-failures. (If the connection fails because of hardware problems, fetch-failures are eventually reported regardless of the value for tez.runtime.shuffle.connect.timeout.) On the other hand, using too large a value may delay reports of fetch-failures much longer than Task/Vertex reruns take, thus significantly increasing the execution time of the query. Hence the user should choose an appropriate value that triggers Task/Vertex reruns reasonably fast.
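The retry schedule implied by the default value can be sketched as follows (illustrative Python, assuming a fixed 5-second wait between connection attempts as described above):

```python
# Illustrative only: how a 12500ms tez.runtime.shuffle.connect.timeout yields
# two retries, assuming a fixed 5000ms wait between connection attempts.
timeout_ms = 12500        # tez.runtime.shuffle.connect.timeout
retry_interval_ms = 5000  # assumed wait between attempts

# Attempts start at t = 0, 5000, 10000, ... while still within the timeout.
attempts = [t for t in range(0, timeout_ms, retry_interval_ms)]
print(attempts)          # [0, 5000, 10000]
print(len(attempts) - 1) # 2 retries after the first attempt
```

A larger timeout simply extends this schedule, which is why it decreases the chance of fetch-failures but delays their eventual report.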

mr3.container.task.failure.num.sleeps in mr3-site.xml

The configuration key mr3.container.task.failure.num.sleeps specifies the number of times to sleep (60 seconds each) in a ContainerWorker thread after a TaskAttempt fails. Setting it to 1 or higher greatly helps Hive on MR3 avoid successive query failures due to OutOfMemoryError, especially in a cluster with small memory for the workload. This is because after the failure of a TaskAttempt, a ContainerWorker tries to trigger garbage collection (by allocating an array of 1GB) and temporarily suspends the thread that ran the TaskAttempt, thus making subsequent TaskAttempts much less susceptible to OutOfMemoryError.

As an example, consider an experiment in which we submit queries 14 to 18 of the TPC-DS benchmark 12 times on a dataset of 10TB. We execute a total of 6 * 12 = 72 queries because query 14 consists of two sub-queries. To each mapper and reducer, we allocate 4GB, which is too small for the scale factor of the dataset.

  • If the configuration key mr3.container.task.failure.num.sleeps is set to zero, query 16 and query 18 fail many times, even with the query re-execution mechanism of Hive 3. In the end, only 46 of the 72 queries succeed.
  • In contrast, if it is set to 2, query 16 and query 18 seldom fail. In the end, 71 queries succeed, with only query 18 failing once.

Setting the configuration key mr3.container.task.failure.num.sleeps to 1 or higher has a drawback that executing a query may take longer if some of its TaskAttempts fail. Two common cases are 1) when the mechanism of query re-execution is triggered and 2) when the mechanism of fault tolerance is triggered.

  1. Query re-execution: while the second DAG is running after the failure of the first DAG, some threads in ContainerWorkers stay idle, thus delaying the completion of the query.
  2. Fault tolerance: if a running TaskAttempt is killed in order to avoid deadlock between Vertexes, its ContainerWorker resumes accepting new TaskAttempts only after its thread finishes sleeping.

In a cluster with small memory for the workload, the higher stability far outweighs the occasional penalty of slower execution, so we recommend setting mr3.container.task.failure.num.sleeps to 1 or higher. In a cluster with plenty of memory for the workload, setting mr3.container.task.failure.num.sleeps to 0 is usually okay. The default value is 0.

If mr3.container.task.failure.num.sleeps is set to 1 or higher with autoscaling enabled, the user may see DAGAppMaster creating more ContainerWorker Pods than necessary when TaskAttempts fail. This is because after a TaskAttempt fails, the ContainerWorker does not become free immediately and DAGAppMaster tries to reschedule another TaskAttempt by creating a new ContainerWorker Pod.