Apache Celeborn is a remote shuffle service that enables MR3 to store intermediate data generated by mappers on Celeborn worker nodes rather than on local disks. Reducers request intermediate data directly from Celeborn workers, thus eliminating the need for the MR3 Shuffle Handler (or the Hadoop Shuffle Handler on Hadoop).

[Figure: celeborn-map-reduce]

Since intermediate data is stored on Celeborn worker nodes instead of local disks, one of the key benefits of using Celeborn is that MR3 now needs only half as much space on local disks. Furthermore the use of Celeborn enables MR3 to directly process the output of mappers on the reducer side (skipping steps 3 and 4 in the above diagram), thus eliminating over 95% of writes to local disks when executing typical queries.

The lower requirement on local disk storage is particularly important when running MR3 on public clouds where 1) the total capacity of local disks attached to each node is limited and 2) attaching local disks is expensive.

Celeborn officially supports Spark and Flink. We have extended the runtime system of MR3 to use Celeborn as a remote shuffle service.

Configuring MR3 with Celeborn

The user can configure MR3 to use Celeborn by setting the following configuration keys in tez-site.xml. Internally a configuration key of the form tez.celeborn.XXX.YYY is converted to celeborn.XXX.YYY and passed to the Celeborn client.

  • tez.celeborn.enabled should be set to true.
  • tez.celeborn.master.endpoints should be set to Celeborn master endpoints.
  • tez.celeborn.client.shuffle.compression.codec should be set to none so that intermediate data is not compressed by Celeborn. Enabling compression in Celeborn is okay, but unnecessary because we recommend the use of Tez for compressing intermediate data (with tez.runtime.compress set to true).
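
As a minimal sketch, the keys above might appear in tez-site.xml as follows. The host names master1 and master2 are hypothetical placeholders, and the port 9097 assumes that the Celeborn masters listen on Celeborn's default master port. The key tez.runtime.compress is included only to reflect the recommendation above.

```xml
<configuration>
  <!-- Use Celeborn as the remote shuffle service -->
  <property>
    <name>tez.celeborn.enabled</name>
    <value>true</value>
  </property>
  <!-- Hypothetical endpoints: replace with the actual Celeborn masters -->
  <property>
    <name>tez.celeborn.master.endpoints</name>
    <value>master1:9097,master2:9097</value>
  </property>
  <!-- Do not compress intermediate data in Celeborn -->
  <property>
    <name>tez.celeborn.client.shuffle.compression.codec</name>
    <value>none</value>
  </property>
  <!-- Let Tez compress intermediate data instead -->
  <property>
    <name>tez.runtime.compress</name>
    <value>true</value>
  </property>
</configuration>
```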

In addition, the user should set the following configuration keys in mr3-site.xml.

  • mr3.dag.vertex.schedule.by.stage should be set to true. Then a Vertex creates Tasks only after all source Vertexes are completed, in compliance with the execution model assumed by Celeborn. (If it is set to false for some reason, the configuration key mr3.taskattempt.queue.scheme should be set to spark.)

  • mr3.dag.route.event.after.source.vertex should be set to true. Then a Vertex receives events only after all source Vertexes are completed. As a small optimization, events from unsuccessful attempts are automatically blocked.
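
The corresponding fragment of mr3-site.xml could look like the following sketch, which simply sets both keys to the values recommended above.

```xml
<configuration>
  <!-- Create Tasks only after all source Vertexes are completed -->
  <property>
    <name>mr3.dag.vertex.schedule.by.stage</name>
    <value>true</value>
  </property>
  <!-- Deliver events only after all source Vertexes are completed -->
  <property>
    <name>mr3.dag.route.event.after.source.vertex</name>
    <value>true</value>
  </property>
</configuration>
```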

Below we explain two configuration keys in tez-site.xml for adjusting the behavior of fetchers running inside reducers (which request intermediate data from Celeborn workers). To illustrate the usage of these configuration keys, we assume the following scenario. We remark that a similar case actually occurs when running Hive on MR3 on the TPC-DS benchmark with a scale factor of 10TB.

  • A source Vertex creates 1000 mappers, each of which generates 70MB of intermediate data for a certain partition.
  • Every mapper succeeds at its first attempt.
  • Thus the total amount of intermediate data generated by mappers for the given partition is 1000 * 70MB = 70GB.

tez.runtime.celeborn.fetch.split.threshold

This configuration key specifies the maximum size of data that a single fetcher can receive from Celeborn workers. Celeborn allows a single fetcher to receive the entire intermediate data destined for a reducer. In the above scenario, however, such a fetcher can easily become a performance bottleneck because a single spill file of 70GB is created by sequentially fetching from Celeborn workers. By adjusting tez.runtime.celeborn.fetch.split.threshold, the user can create multiple fetchers which fetch concurrently from Celeborn workers.

The default value is 1024 * 1024 * 1024 (1GB). In the above scenario, the default value results in at least 70 fetchers.
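
For illustration, lowering the threshold to 256MB in the above scenario would raise the lower bound to roughly 70GB / 256MB = 280 fetchers. The sketch below assumes that the value is given in bytes, as the default of 1024 * 1024 * 1024 suggests. The value 268435456 (256 * 1024 * 1024) is only an example, not a recommendation.

```xml
<configuration>
  <!-- Assumed to be in bytes: 256 * 1024 * 1024 = 268435456 (256MB) -->
  <property>
    <name>tez.runtime.celeborn.fetch.split.threshold</name>
    <value>268435456</value>
  </property>
</configuration>
```

A smaller threshold yields more fetchers per reducer, so it trades larger concurrency for a larger number of spill files.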

tez.runtime.celeborn.unordered.fetch.spill.enabled

By default, this configuration key is set to true, which means that reducers first write the output of mappers to local disks before processing it. If it is set to false, reducers directly process the output of mappers fetched via unordered edges without writing to local disks. Since typical queries generate many more unordered edges than ordered edges (which arise, e.g., from the use of order by), the user can eliminate most writes to local disks. In the case of the TPC-DS benchmark, over 95% of writes to local disks are eliminated.

Note that even when this configuration key is set to false, MR3 still needs local disks for ordered edges because reducers store the output of mappers on local disks.

If this configuration key is set to false, the previous configuration key tez.runtime.celeborn.fetch.split.threshold becomes irrelevant to unordered edges.
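
A minimal sketch of disabling spills for unordered edges in tez-site.xml:

```xml
<configuration>
  <!-- Process data fetched via unordered edges without writing to local disks -->
  <property>
    <name>tez.runtime.celeborn.unordered.fetch.spill.enabled</name>
    <value>false</value>
  </property>
</configuration>
```

With this setting, local disks are still used for ordered edges, as noted above.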

Operating MR3 with Celeborn

Here are a few comments on operating MR3 with Celeborn.

  • Despite the use of pipelined shuffling (for both ordered and unordered edges), speculative execution works okay with Celeborn (because we set mr3.dag.route.event.after.source.vertex to true) and can be effective in eliminating fetch delays.
  • When using Celeborn, it is safe to set hive.mr3.delete.vertex.local.directory to true in hive-site.xml (or mr3.am.notify.destination.vertex.complete to true in mr3-site.xml) because intermediate data produced by mappers is never written to local disks.
  • When running Hive on MR3, if OutOfMemoryError occurs mainly because of the use of too many fetchers, including OutOfMemoryError in the value for mr3.am.task.no.retry.errors is not recommended because re-executing a query is likely to end up with the same OutOfMemoryError. In such a case, try to use a smaller value for tez.runtime.shuffle.total.parallel.copies.
  • Auto parallelism works okay with Celeborn, but the optimization of shuffling for auto parallelism is irrelevant to Celeborn.
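
As an example of the second comment above, the following sketch shows the setting in hive-site.xml. It covers only the Hive key; the alternative key mr3.am.notify.destination.vertex.complete would go in mr3-site.xml instead.

```xml
<configuration>
  <!-- Safe with Celeborn because mapper output is not stored on local disks -->
  <property>
    <name>hive.mr3.delete.vertex.local.directory</name>
    <value>true</value>
  </property>
</configuration>
```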

Fault tolerance

Starting with Celeborn 0.5.1, MR3 supports fault tolerance using Task/Vertex reruns. Thus it is safe to disable data replication when configuring Celeborn. (With Celeborn versions before 0.5.1, MR3 requires data replication by Celeborn in order to support fault tolerance.)

In order to enable fault tolerance using Task/Vertex reruns, set the configuration key tez.runtime.celeborn.client.fetch.throwsFetchFailure to true in tez-site.xml so that MR3 throws Exceptions and thus triggers Task/Vertex reruns whenever fetch failures occur. If tez.runtime.celeborn.client.fetch.throwsFetchFailure is set to false, Task/Vertex reruns are never triggered and MR3 relies on data replication by Celeborn in order to support fault tolerance.
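
A minimal sketch of enabling Task/Vertex reruns in tez-site.xml, using the key named above:

```xml
<configuration>
  <!-- Throw Exceptions on fetch failures so that MR3 triggers Task/Vertex reruns -->
  <property>
    <name>tez.runtime.celeborn.client.fetch.throwsFetchFailure</name>
    <value>true</value>
  </property>
</configuration>
```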

When MR3 cannot use Celeborn

MR3 can use Celeborn only when DAGAppMaster and ContainerWorkers are all created as individual Java processes. Specifically MR3 cannot use Celeborn under the following conditions:

  • DAGAppMaster runs in LocalThread mode.
  • ContainerWorker runs in Local mode.

Even when using Celeborn, MR3 may have to use its own Shuffle Handler or the Hadoop Shuffle Handler if Celeborn refuses to accept shuffle requests (e.g., when MR3 exceeds the quota allocated to it after consuming too much disk space in Celeborn workers). In such a case, MR3 resorts to either the MR3 Shuffle Handler or the Hadoop Shuffle Handler, depending on the value of the configuration key tez.am.shuffle.auxiliary-service.id in tez-site.xml. Assuming that such cases rarely occur, we recommend the following settings:

  • When running Hive on MR3 on Hadoop, set tez.am.shuffle.auxiliary-service.id to mapreduce_shuffle in tez-site.xml and hive.mr3.use.daemon.shufflehandler to 0 in hive-site.xml in order not to create unnecessary shuffle handlers inside ContainerWorkers.
  • When running Hive on MR3 on Kubernetes or in standalone mode, set tez.am.shuffle.auxiliary-service.id to tez_shuffle in tez-site.xml and hive.mr3.use.daemon.shufflehandler to 1 in hive-site.xml so as to minimize memory consumption by shuffle handlers.
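
As a sketch of the second recommendation (Kubernetes or standalone mode), the fallback shuffle handler could be configured as follows. First, in tez-site.xml:

```xml
<configuration>
  <!-- Fall back to the MR3 Shuffle Handler if Celeborn refuses shuffle requests -->
  <property>
    <name>tez.am.shuffle.auxiliary-service.id</name>
    <value>tez_shuffle</value>
  </property>
</configuration>
```

Then, in hive-site.xml:

```xml
<configuration>
  <!-- Minimize memory consumption by shuffle handlers -->
  <property>
    <name>hive.mr3.use.daemon.shufflehandler</name>
    <value>1</value>
  </property>
</configuration>
```

On Hadoop, the same two keys would instead be set to mapreduce_shuffle and 0, respectively, as described in the first recommendation.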