MR3 is constantly evolving with new updates and features. Below is a partial list of planned updates and features of MR3. Developing an update or feature usually takes a few weeks to a few months, but it can take much longer (a year or two) if it involves research as well.
Task scheduling to reduce the turnaround time
The TaskScheduler of MR3 exploits Task priorities and Vertex dependencies to maximize throughput and minimize turnaround time. The current implementation is the outcome of a research project, but we continue to investigate new ways of scheduling Tasks to reduce the turnaround time of DAGs in concurrent environments. Task scheduling has long been an active research area, but our problem differs significantly from those studied previously in that we assume 1) containers shared by concurrent DAGs, as opposed to containers dedicated to individual DAGs, and 2) containers capable of executing multiple Tasks concurrently, as opposed to containers executing one Task at a time.
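To make the two assumptions concrete, here is a minimal Python sketch of priority-driven Task assignment over containers that are shared by all DAGs and each have several Task slots. It is not MR3's TaskScheduler. The Task and Container classes, the convention that a lower priority value means "more urgent", and the simplification that a Task becomes ready only when every upstream Vertex of its Vertex has completed are all assumptions made for illustration.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Task:
    dag: str
    vertex: str
    index: int
    priority: int              # hypothetical: lower value = more urgent

@dataclass
class Container:
    name: str
    slots: int                 # number of Tasks this container may run at once
    running: list = field(default_factory=list)

def schedule(tasks, deps, done, containers):
    """Greedily assign ready Tasks to free slots, most urgent first.

    deps maps (dag, vertex) -> set of upstream vertex names in the same DAG;
    done is the set of (dag, vertex) pairs that have already completed.
    """
    ready = [t for t in tasks
             if all((t.dag, up) in done for up in deps.get((t.dag, t.vertex), ()))]
    # Sort by priority; the other fields only make tie-breaking deterministic.
    ready.sort(key=lambda t: (t.priority, t.dag, t.vertex, t.index))
    queue = iter(ready)
    assignments = []
    for c in containers:       # every container is shared by all DAGs
        while len(c.running) < c.slots:
            task = next(queue, None)
            if task is None:
                return assignments
            c.running.append(task)
            assignments.append((c.name, task.dag, task.vertex, task.index))
    return assignments

# Two concurrent DAGs (q1, q2) sharing two containers with two slots each.
tasks = [Task("q1", "Map 1", 0, priority=1), Task("q1", "Map 1", 1, priority=1),
         Task("q1", "Reducer 2", 0, priority=2), Task("q2", "Map 1", 0, priority=0)]
deps = {("q1", "Reducer 2"): {"Map 1"}}
containers = [Container("c1", slots=2), Container("c2", slots=2)]
result = schedule(tasks, deps, set(), containers)
# result: q2's Map Task and one q1 Map Task share c1; the other q1 Map Task goes to c2.
# q1's Reducer 2 is held back because Map 1 of q1 has not completed yet.
```

The sketch makes a single pass. A real scheduler must also react to Tasks finishing and slots becoming free, which is where the interaction between concurrent DAGs makes the problem hard.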
Dynamic work rebalancing
Dynamic work rebalancing is probably the best solution to the problem of stragglers, which has been studied in industry and academia for a long time (and will continue to be). It is already implemented in Google Cloud Dataflow for executing Apache Beam pipelines. Now that MR3 is a mature system supporting autoscaling, we are ready to investigate how to implement dynamic work rebalancing in Hive on MR3. The challenge is twofold:
- extend the core of MR3 so that a Vertex can dynamically increase the number of its Tasks;
- extend Hive so that a mapper can dynamically split itself into several small mappers.
MR3 already addresses the first part of the challenge as it allows a Vertex to increase the number of its Tasks under a specific condition. Hence most of the effort will be focused on the second part.
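The arithmetic behind the second part can be sketched as follows: a straggling mapper that has read up to some position gives away part of its unread input, keeping the first piece and handing the rest to new Tasks of the same Vertex. This is a minimal Python sketch under the assumption that a mapper's input is a contiguous range of records; MapperRange and split_remaining are hypothetical names, not part of Hive or MR3.

```python
from dataclasses import dataclass

@dataclass
class MapperRange:
    """Hypothetical unit of work: records [start, end) of an input split."""
    start: int
    end: int
    position: int          # next record this mapper will read

    def split_remaining(self, parts):
        """Shrink this mapper to a share of its unread records and return the
        rest as new ranges that new Tasks of the same Vertex could run.
        Records already read (before position) are never reassigned."""
        remaining = self.end - self.position
        if parts < 2 or remaining < parts:
            return []      # too little work left to be worth splitting
        size = remaining // parts
        # The last piece absorbs any remainder so that no record is dropped.
        bounds = [self.position + i * size for i in range(parts)] + [self.end]
        self.end = bounds[1]                      # the original mapper keeps piece 0
        return [MapperRange(lo, hi, lo) for lo, hi in zip(bounds[1:-1], bounds[2:])]

straggler = MapperRange(0, 1000, position=250)
residuals = straggler.split_remaining(3)
# straggler now covers [0, 500); residuals cover [500, 750) and [750, 1000).
```

The split itself is simple; the hard part is making a Hive mapper able to stop cleanly at a new end boundary while it is running.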
Online pull
Like Hadoop MapReduce, MR3 uses the pull model for moving data from producers to consumers: a consumer requests, or pulls, its partition after a certain number of producers have written their output to local disk. An alternative strategy is the push model, in which a producer immediately passes, or pushes, its output to consumers. The push model is known to be much faster than the pull model because it makes extensive use of memory for storing intermediate data, but it makes fault tolerance hard to implement reliably. (Ironically, Presto is based on the push model and, with good reason, does not support fault tolerance, yet it runs much slower than Hive on MR3, which uses the pull model!)
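Why fault tolerance favors the pull model can be seen in a toy Python sketch. It is not MR3 code: PullShuffle and PushShuffle are hypothetical classes, a dict stands in for local disk, keys are assumed to be integers, and the sketch shows only what is lost on failure, not the performance difference. In the pull model, persisted producer output survives a consumer failure, so the consumer simply fetches again. In the push model, a consumer's in-memory buffer is the only copy, so every producer that pushed to it must run again.

```python
def partition(records, num_partitions):
    """Split one producer's (key, value) output by key; keys are ints here."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[key % num_partitions].append((key, value))
    return parts

class PullShuffle:
    """Producers persist output to 'local disk'; consumers fetch on demand."""
    def __init__(self, num_partitions):
        self.n = num_partitions
        self.disk = {}                       # producer id -> partitioned output

    def produce(self, pid, records):
        self.disk[pid] = partition(records, self.n)

    def fetch(self, consumer):
        # A restarted consumer simply calls fetch() again: nothing is lost.
        return [kv for pid in sorted(self.disk) for kv in self.disk[pid][consumer]]

class PushShuffle:
    """Producers push output straight into consumer memory; nothing persisted."""
    def __init__(self, num_partitions):
        self.n = num_partitions
        self.memory = [[] for _ in range(num_partitions)]
        self.pushed_by = [set() for _ in range(num_partitions)]

    def produce(self, pid, records):
        for consumer, chunk in enumerate(partition(records, self.n)):
            self.memory[consumer].extend(chunk)
            self.pushed_by[consumer].add(pid)

    def fail_consumer(self, consumer):
        # The buffered data is gone; every producer that pushed to this
        # consumer has to run again to regenerate it.
        self.memory[consumer] = []
        lost, self.pushed_by[consumer] = self.pushed_by[consumer], set()
        return lost

inputs = {0: [(1, "a"), (2, "b")], 1: [(3, "c"), (4, "d")]}
pull, push = PullShuffle(2), PushShuffle(2)
for pid, recs in inputs.items():
    pull.produce(pid, recs)
    push.produce(pid, recs)
fetched = pull.fetch(1)          # [(1, 'a'), (3, 'c')], retrievable again after a failure
lost = push.fail_consumer(1)     # {0, 1}: both producers must rerun
```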
Despite its inherent performance problem, the pull model has been considered the only choice for an execution engine when fault tolerance is a must, as summarized in the following series of papers on Hadoop MapReduce:
- Pavlo et al. (A Comparison of Approaches to Large-scale Data Analysis, SIGMOD 2009) point out that Hadoop MapReduce has a serious performance problem related to the pull model.
- Dean and Ghemawat (MapReduce: A Flexible Data Processing Tool, Communications of the ACM, 2010) acknowledge that the pull model has a performance problem, but argue that the push model is inappropriate because of the requirement for fault tolerance.
- Stonebraker et al. (MapReduce and Parallel DBMSs: Friends or Foes?, Communications of the ACM, 2010) predict that the pull model, being fundamental to fault tolerance, is unlikely to change in Hadoop MapReduce.
Online pull is a new way of shuffling data from mappers to reducers that extends the pull model by partially implementing the push model when plenty of memory is available. It thus aims to achieve the best of two opposite worlds: the fault tolerance of the pull model and the speed of the push model. Our preliminary results (for unordered shuffling) show that online pull can significantly improve the performance of Hive on MR3 without compromising its support for fault tolerance.
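The paragraph above does not describe how online pull works internally, so the following Python sketch is only a hypothetical illustration of the stated idea, not MR3's design. Producers always write their output to "local disk" as in the pull model, and additionally push a copy into a consumer's memory while an assumed per-consumer budget has room. A consumer reads pushed data from memory and pulls only what is missing; after a failure, it loses just the fast path and falls back to pulling everything. HybridShuffle and budget_records are invented names.

```python
class HybridShuffle:
    """Hypothetical pull/push hybrid: output always goes to 'local disk'
    (so a failed consumer can re-fetch it), and is also pushed into a
    consumer's memory while that consumer's buffer budget allows."""
    def __init__(self, num_partitions, budget_records):
        self.n = num_partitions
        self.budget = budget_records        # assumed per-consumer buffer size
        self.disk = {}                      # producer id -> partitioned output
        self.memory = [{} for _ in range(num_partitions)]  # consumer -> {pid: chunk}
        self.used = [0] * num_partitions

    def produce(self, pid, records):
        parts = [[] for _ in range(self.n)]
        for key, value in records:          # keys are ints in this sketch
            parts[key % self.n].append((key, value))
        self.disk[pid] = parts              # pull model: always persisted
        for consumer, chunk in enumerate(parts):
            if self.used[consumer] + len(chunk) <= self.budget:
                self.memory[consumer][pid] = chunk   # push model: optional fast path
                self.used[consumer] += len(chunk)

    def fetch(self, consumer):
        """Read pushed chunks from memory; pull the rest from disk."""
        records, pulled = [], []
        for pid in sorted(self.disk):
            chunk = self.memory[consumer].get(pid)
            if chunk is None:
                chunk = self.disk[pid][consumer]
                pulled.append(pid)
            records.extend(chunk)
        return records, pulled

    def fail_consumer(self, consumer):
        # Only the in-memory copy is lost; the next fetch() pulls from disk.
        self.memory[consumer] = {}
        self.used[consumer] = 0

shuffle = HybridShuffle(num_partitions=2, budget_records=1)
shuffle.produce(0, [(1, "a"), (2, "b")])
shuffle.produce(1, [(3, "c"), (4, "d")])
records, pulled = shuffle.fetch(1)   # producer 0's chunk came from memory; producer 1 was pulled
```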
The idea of online pull builds on two years of experience developing MR2, a prototype execution engine and predecessor of MR3, and several years of experience developing MR3. MR2 implements the push model in its purest form. The following graph shows the result of running TeraSort 1) on Hadoop MapReduce and 2) on MR2 in a cluster of 90 nodes while varying the size of input data up to 320GB per node (x-axis: GB of input data per node; y-axis: execution time per GB). In the course of developing MR3, we have also experimented with a new runtime called ASM, which is based on the push model. Our ultimate goal is to take the performance of MR3 to the next level with the speed and stability of the push model demonstrated by MR2.
Queue management
The current implementation of MR3 uses a single queue to run all DAGs. (The configuration key mr3.queue.name specifies the queue in Yarn for running an MR3 application and thus serves a different purpose.) While a single queue suffices for typical workloads thanks to Task scheduling in MR3, the user may want to further control individual DAGs with multiple queues (e.g., an interactive queue for interactive jobs and an etl queue for ETL jobs). Eventually we plan to implement a new mechanism of queue management in MR3 so that users of Hive on MR3 can use an interface compatible with Hive Workload Manager.
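One possible policy for multiple queues is sketched below in Python: each queue has a weight, and every free Task slot goes to the non-empty queue whose number of running Tasks is furthest below its weighted share. This is a hypothetical illustration, not the planned mechanism. MultiQueueScheduler and the weights are assumptions; only the queue names interactive and etl come from the text.

```python
from collections import deque

class MultiQueueScheduler:
    """Hypothetical weighted queues: each free slot goes to the non-empty
    queue with the lowest ratio of running Tasks to weight."""
    def __init__(self, weights):
        self.weights = weights                       # queue name -> weight
        self.pending = {q: deque() for q in weights}
        self.running = {q: 0 for q in weights}

    def submit(self, queue, task):
        self.pending[queue].append(task)

    def next_task(self):
        candidates = [q for q in self.pending if self.pending[q]]
        if not candidates:
            return None
        # Ties go to the heavier queue, then to the queue name for determinism.
        q = min(candidates,
                key=lambda q: (self.running[q] / self.weights[q], -self.weights[q], q))
        self.running[q] += 1
        return q, self.pending[q].popleft()

    def finish(self, queue):
        self.running[queue] -= 1

sched = MultiQueueScheduler({"interactive": 3, "etl": 1})
for i in range(4):
    sched.submit("etl", f"etl-{i}")
    sched.submit("interactive", f"bi-{i}")
picks = [sched.next_task()[0] for _ in range(4)]
# picks: ['interactive', 'etl', 'interactive', 'interactive'], a 3:1 split of four slots
```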
Support for Microsoft Azure and Google Cloud
We have already tested Hive on MR3 thoroughly on Amazon AWS (EMR and EKS). We plan to extend Hive on MR3 as necessary so that it runs seamlessly on Microsoft Azure and Google Cloud as well.
Backend for Amazon Fargate
Currently MR3 implements backends for the Yarn and Kubernetes resource schedulers. Another resource scheduler under consideration is Amazon Fargate. Because Amazon Fargate allocates resources in units of containers, it can make MR3 much less likely than Yarn and Kubernetes to suffer from over-provisioning of cluster resources. In conjunction with autoscaling in MR3, a backend for Amazon Fargate may enable MR3 to finish DAGs faster (simply by creating more containers as needed) while reducing the overall cost (simply by deleting idle containers as soon as possible). The limiting factors, however, are the 10GB limit on disk volumes and the lack of support for EFS PersistentVolumes.
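The over-provisioning argument can be checked with simple arithmetic. The Python sketch below compares the vCPUs provisioned when containers must be packed onto whole nodes with those provisioned when each container is allocated on its own. The container and node sizes are hypothetical, and per-node system overhead is ignored.

```python
import math

def node_based_vcpus(containers, vcpus_per_container, vcpus_per_node):
    """vCPUs provisioned when containers must be packed onto whole nodes."""
    per_node = vcpus_per_node // vcpus_per_container   # containers that fit on one node
    if per_node == 0:
        raise ValueError("a container does not fit on a single node")
    nodes = math.ceil(containers / per_node)
    return nodes * vcpus_per_node

def per_container_vcpus(containers, vcpus_per_container):
    """vCPUs provisioned when each container is allocated individually."""
    return containers * vcpus_per_container

# Hypothetical: 5 ContainerWorkers of 4 vCPUs each, on nodes with 16 vCPUs.
nodes_total = node_based_vcpus(5, 4, 16)     # 2 nodes -> 32 vCPUs
fargate_total = per_container_vcpus(5, 4)    # 20 vCPUs
idle = nodes_total - fargate_total           # 12 vCPUs paid for but unused
```

The same granularity matters when scaling in: with whole nodes, an idle container frees capacity only once its node becomes empty, whereas a container-level allocation can be released as soon as the container is deleted.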
MapReduce on MR3 and Pig on MR3
MR3 is not designed specifically for Apache Hive. In fact, MR3 started its life as a general-purpose execution engine, and only later did we decide to develop Hive on MR3 as a means of testing MR3 more thoroughly. As such, MR3 can easily execute MapReduce jobs, which can be converted to simple DAGs with two Vertexes. MR3 can also execute Pig jobs, which can be converted to DAGs utilizing the Tez runtime. Thus, as new major applications of MR3, we plan to implement clients for MapReduce and Pig (MapReduce on MR3 and Pig on MR3). Then all the strengths of MR3, such as concurrent DAGs, cross-DAG container reuse, fault tolerance, and autoscaling, become automatically available when running MapReduce and Pig jobs, whether on Hadoop or on Kubernetes. Since multiple clients can connect to a common DAGAppMaster, we can even share ContainerWorkers among Hive, MapReduce, and Pig jobs without having to maintain three separate clusters!
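To show why a MapReduce job maps naturally onto a two-Vertex DAG, here is a minimal single-process Python sketch: a map Vertex and a reduce Vertex connected by a shuffle edge (scatter-gather in Tez terminology), applied to word count. Vertex and run_mapreduce_dag are hypothetical and do not reflect the API of MR3 or of the planned MapReduce client.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable

@dataclass
class Vertex:
    name: str
    fn: Callable            # processing function applied by each Task
    tasks: int              # number of Tasks (parallelism) of this Vertex

def run_mapreduce_dag(map_vertex, reduce_vertex, splits):
    """Execute a two-Vertex DAG: map Tasks -> shuffle edge -> reduce Tasks."""
    assert len(splits) == map_vertex.tasks, "one input split per map Task"
    # Scatter: each map Task partitions its output by reduce Task index.
    partitions = [defaultdict(list) for _ in range(reduce_vertex.tasks)]
    for split in splits:
        for key, value in map_vertex.fn(split):
            partitions[hash(key) % reduce_vertex.tasks][key].append(value)
    # Gather: each reduce Task processes the values grouped by key in its partition.
    result = {}
    for part in partitions:
        for key in sorted(part):
            result[key] = reduce_vertex.fn(key, part[key])
    return result

def word_count_map(line):
    return [(word, 1) for word in line.split()]

def word_count_reduce(word, counts):
    return sum(counts)

counts = run_mapreduce_dag(Vertex("Map", word_count_map, 2),
                           Vertex("Reduce", word_count_reduce, 2),
                           ["a b a", "b c"])
# counts == {'a': 2, 'b': 2, 'c': 1}
```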
Support for Java 11 and higher
Java is evolving fast, but MR3 still requires Java 8. On Hadoop, migrating to a higher version such as Java 11 is not easy because of potential dependency problems. On Kubernetes, however, it suffices to check for dependency problems in a local cluster. Thus we plan to upgrade MR3 to Java 11 or higher and experiment with it on Kubernetes. In particular, the new garbage collectors available in recent versions of Java (such as Shenandoah and ZGC) may instantly boost the performance of MR3 for free.