MasterControl is a utility that allows the user to connect to the DAGAppMaster and manage DAGs.

  • On Yarn, MasterControl should be executed by the owner of the DAGAppMaster. If a different user executes MasterControl, it fails with java.lang.IllegalArgumentException. The script to execute is mr3/master-control.sh.
  • On Kubernetes, MasterControl should be executed after setting the environment variable CLIENT_TO_AM_TOKEN_KEY to the value printed by the script kubernetes/run-hive.sh in the MR3 release. The script to execute is kubernetes/hive/hive/master-control.sh.

MasterControl works only if DAGAppMaster runs in Yarn or Kubernetes mode, and not in LocalThread or LocalProcess mode.

Using MasterControl on Yarn

MasterControl takes a command with a Yarn ApplicationID.

$ mr3/master-control.sh 
...
Usage:
  getDags <AppID>                          : Get all running/finished DAGs
  getRunningDags <AppID>                   : Get all running DAGs
  killDag <AppId> <DAGID>                  : Send a request to kill a DAG
  ackDagFinished <AppID> <DAGID>           : Acknowledge the completion of a DAG
  stopContainerWorkers <AppID>             : Gracefully stop all running ContainerWorkers
  closeDagAppMaster <AppID>                : Gracefully stop DAGAppMaster

The command stopContainerWorkers waits until all current DAGs complete, and then terminates all running ContainerWorkers. The command closeDagAppMaster waits until all current DAGs complete, and then terminates the current DAGAppMaster. Examples of getDags and killDag follow.

We list the DAGs in the Yarn application application_1563779533969_0084 with the command getDags. MasterControl prints the ID and name of each running or finished DAG.

$ mr3/master-control.sh getDags application_1563779533969_0084
...
19/08/12 21:47:54 INFO client.AMYarnClient$: Created AMYarnClient AMYarnClient
19/08/12 21:47:55 INFO client.DAGClientRPC: CreatedDAGClientRPC for application_1563779533969_0084
19/08/12 21:47:55 INFO client.DAGClientRPC: DAGClientRPC running under user gitlab-runner@RED
19/08/12 21:47:55 INFO client.DAGClientRPC: Creating Proxy: red6 36404
19/08/12 21:47:55 INFO client.DAGClientRPC: Set clientToAmToken: 20 bytes
Lists of running/finished DAGs in application_1563779533969_0084:
dag_1563779533969_0084_10 hive_20201022094259_ab5b59b8-ec51-417c-98cb-d609595c8564:10
dag_1563779533969_0084_9 hive_20201022094200_b2936f6a-accf-46f0-bf7e-e6d757ae2994:9

We kill the running DAG dag_1563779533969_0084_9.

$ mr3/master-control.sh killDag application_1563779533969_0084 dag_1563779533969_0084_9
...
Sent a request to kill DAG dag_1563779533969_0084_9.

After a while, the DAG dag_1563779533969_0084_9 is killed and no longer appears in the output of getDags.

$ mr3/master-control.sh getDags application_1563779533969_0084
...
Lists of running/finished DAGs in application_1563779533969_0084:
dag_1563779533969_0084_10 hive_20201022094259_ab5b59b8-ec51-417c-98cb-d609595c8564:10
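Because killDag only sends a request, a script may want to poll getDags until the DAG disappears. The following is a minimal sketch of that idea, not part of MR3: the names MASTER_CONTROL, dag_ids, and wait_for_dag_gone are hypothetical, and the parsing assumes that each DAG line in the output starts with its DAG ID, as in the listings above.

```shell
# Path to the script; MASTER_CONTROL is a hypothetical override variable.
MASTER_CONTROL="${MASTER_CONTROL:-mr3/master-control.sh}"

# Print the DAG IDs (first column) found in getDags output read from stdin.
# Log lines and the "Lists of ..." header are skipped because they do not
# start with "dag_".
dag_ids() {
  awk '$1 ~ /^dag_/ { print $1 }'
}

# Poll getDags until the given DAG ID is no longer listed.
# Arguments: AppID, DAGID, max attempts (default 30), delay in seconds (default 2).
# The body runs in a subshell, so "exit" only leaves this function.
# Caveat: if getDags itself fails and prints no DAG lines, the DAG is
# treated as gone; a real script should also check for such failures.
wait_for_dag_gone() (
  app_id="$1"; dag_id="$2"; attempts="${3:-30}"; delay="${4:-2}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if ! "$MASTER_CONTROL" getDags "$app_id" | dag_ids | grep -qxF "$dag_id"; then
      exit 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  exit 1
)
```

With these definitions, calling wait_for_dag_gone application_1563779533969_0084 dag_1563779533969_0084_9 after killDag returns 0 once the DAG is no longer listed and 1 if it is still listed after the last attempt.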

Using MasterControl on Kubernetes

When running Hive on MR3, we recommend executing MasterControl inside the HiveServer2 Pod, where env.sh is already mounted and the environment variable CLIENT_TO_AM_TOKEN_KEY is already set. The first step is to obtain the ApplicationID from the HiveServer2 log (from outside Kubernetes). In the following example, we get the name of the HiveServer2 Pod, pass it as an argument to kubectl logs, and obtain the ApplicationID application_2407_0000.

$ kubectl get pods -n hivemr3 | grep hivemr3
hivemr3-hiveserver2-gdz9l   1/1     Running   0          2m8s
hivemr3-metastore-0         1/1     Running   0          2m18s

$ kubectl logs -n hivemr3 hivemr3-hiveserver2-gdz9l | grep ApplicationID
2019-11-12T07:46:24,053  INFO [main] client.MR3Client$: Starting DAGAppMaster with ApplicationID application_2407_0000 in session mode
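To reuse the ApplicationID in later commands, it can be captured in a shell variable instead of being copied by hand. This is a small sketch that assumes the ID always has the form application_<digits>_<digits>, as in the log line above; the function name extract_app_id is hypothetical.

```shell
# Print the first ApplicationID-looking token found in the text on stdin.
# Prints nothing if no such token is present.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' | head -n 1
}

# Example use (Pod name and namespace taken from the transcript above):
#   APP_ID=$(kubectl logs -n hivemr3 hivemr3-hiveserver2-gdz9l | extract_app_id)
```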

Next, check whether the environment variable CLIENT_TO_AM_TOKEN_KEY is already set inside the HiveServer2 Pod.

$ kubectl exec -it -n hivemr3 hivemr3-hiveserver2-gdz9l -- /bin/bash
bash-4.2$ printenv | grep CLIENT_TO_AM_TOKEN_KEY
CLIENT_TO_AM_TOKEN_KEY=928e2d30-5468-4396-ba84-6ccb5e16f2cd
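In a script, the same check can be made a precondition so that master-control.sh is never invoked without the key. The function name require_token_key below is hypothetical. Note that this sketch only verifies that the variable is non-empty; it cannot tell whether the value matches the one expected by DAGAppMaster.

```shell
# Fail early if CLIENT_TO_AM_TOKEN_KEY is unset or empty.
# This does not validate the value itself.
require_token_key() {
  if [ -z "${CLIENT_TO_AM_TOKEN_KEY:-}" ]; then
    echo "CLIENT_TO_AM_TOKEN_KEY is not set" >&2
    return 1
  fi
}

# Example use inside the HiveServer2 Pod:
#   require_token_key && ./master-control.sh getDags application_2407_0000
```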

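Now the user can execute master-control.sh using the ApplicationID. Executing a command without an ApplicationID prints the usage message, which on Kubernetes includes two additional commands, updateResourceLimit and updateAutoScaling.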

bash-4.2$ ./master-control.sh getDags 
...
Usage:
  getDags <AppID>                          : Get all running/finished DAGs
  getRunningDags <AppID>                   : Get all running DAGs
  killDag <AppId> <DAGID>                  : Send a request to kill a DAG
  ackDagFinished <AppID> <DAGID>           : Acknowledge the completion of a DAG
  stopContainerWorkers <AppID>             : Gracefully stop all running ContainerWorkers
  closeDagAppMaster <AppID>                : Gracefully stop DAGAppMaster
  updateResourceLimit <AppID> <Max memory in GB> <Max CPU cores>   : Update the resource limit
  updateAutoScaling <AppID> <autoScaleOutThresholdPercent> <autoScaleInThresholdPercent> <autoScaleInMinHosts> <autoScaleOutNumInitialContainers>  : Update autoscaling parameters
bash-4.2$ ./master-control.sh getDags application_2407_0000
...
Lists of running/finished DAGs in application_2407_0000:
dag_2407_0000_2 hive_20201022094259_ab5b59b8-ec51-417c-98cb-d609595c8564:2

bash-4.2$ ./master-control.sh killDag application_2407_0000 dag_2407_0000_2
Sent a request to kill DAG dag_2407_0000_2.

bash-4.2$ ./master-control.sh getDags application_2407_0000
No running/finished DAGs in application_2407_0000

On Kubernetes, the user can use the command updateResourceLimit to update (either increase or decrease) the limit on the total resources for all ContainerWorker Pods. This command overrides the settings for the configuration keys mr3.k8s.worker.total.max.memory.gb and mr3.k8s.worker.total.max.cpu.cores in mr3-site.xml. If the current ContainerWorker Pods consume more resources than the new limit, MR3 returns the excess resources by stopping young ContainerWorker Pods. In order not to disrupt the execution of active DAGs, MR3 stops these ContainerWorker Pods gracefully: they continue to run normally until all active DAGs complete.

bash-4.2$ ./master-control.sh updateResourceLimit application_21127_0000 128 32
...
Sent a request to update the resource limit for application_21127_0000: 128 32

On Kubernetes, the user can use the command updateAutoScaling to update the configuration for autoscaling explained in Autoscaling.

  • autoScaleOutThresholdPercent specifies ScaleOutThreshold = mr3.auto.scale.out.threshold.percent.
  • autoScaleInThresholdPercent specifies ScaleInThreshold = mr3.auto.scale.in.threshold.percent.
  • autoScaleInMinHosts specifies AutoScaleInMinHosts = mr3.auto.scale.in.min.hosts.
  • autoScaleOutNumInitialContainers specifies mr3.auto.scale.out.num.initial.containers.
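The four arguments are positional, so it is easy to pass them in the wrong order. The sketch below only prints the command line in the order given by the usage message and does not contact DAGAppMaster; the function name autoscaling_cmd and the sample values in the comment are hypothetical placeholders, not recommended settings.

```shell
# Print (without running) an updateAutoScaling command line.
# Arguments, in the order of the usage message:
#   $1 AppID
#   $2 autoScaleOutThresholdPercent     (mr3.auto.scale.out.threshold.percent)
#   $3 autoScaleInThresholdPercent      (mr3.auto.scale.in.threshold.percent)
#   $4 autoScaleInMinHosts              (mr3.auto.scale.in.min.hosts)
#   $5 autoScaleOutNumInitialContainers (mr3.auto.scale.out.num.initial.containers)
autoscaling_cmd() {
  if [ "$#" -ne 5 ]; then
    echo "expected 5 arguments, got $#" >&2
    return 1
  fi
  printf './master-control.sh updateAutoScaling %s %s %s %s %s\n' "$1" "$2" "$3" "$4" "$5"
}

# Example with placeholder values:
#   autoscaling_cmd application_21127_0000 80 20 1 4
```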

If the environment variable CLIENT_TO_AM_TOKEN_KEY is not set properly, master-control.sh fails with the following error message: DIGEST-MD5: digest response format violation. Mismatched response.

bash-4.2$ ./master-control.sh getDags application_21127_0000
...
Exception in thread "main" com.datamonad.mr3.api.common.MR3Exception: DAGClientRPC failure
  at com.datamonad.mr3.client.DAGClientRPC.rpc(DAGClientRPC.scala:231)
  at com.datamonad.mr3.client.DAGClientRPC.getAllDags(DAGClientRPC.scala:151)
  at com.datamonad.mr3.client.util.MasterControl.getDags(MasterControl.scala:30)
  at com.datamonad.mr3.client.util.MasterControlK8s$.main(MasterControlK8s.scala:22)
  at com.datamonad.mr3.client.util.MasterControlK8s.main(MasterControlK8s.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(javax.security.sasl.SaslException): DIGEST-MD5: digest response format violation. Mismatched response.
  at org.apache.hadoop.ipc.Client.call(Client.java:1476)
  at org.apache.hadoop.ipc.Client.call(Client.java:1413)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
  at com.sun.proxy.$Proxy10.getAllDags(Unknown Source)
  at com.datamonad.mr3.client.DAGClientRPC$$anonfun$getAllDags$1.apply(DAGClientRPC.scala:153)
  at com.datamonad.mr3.client.DAGClientRPC$$anonfun$getAllDags$1.apply(DAGClientRPC.scala:151)
  at com.datamonad.mr3.client.DAGClientRPC.rpc(DAGClientRPC.scala:223)

A (minor) known issue

When running Hive on MR3, if the user has executed the command closeDagAppMaster and the current DAGAppMaster is gracefully stopping itself, a Beeline connection fails to execute queries with the following error.

Caused by: org.apache.hadoop.ipc.RemoteException(com.datamonad.mr3.api.common.MR3Exception): DAGAppMaster.gracefulShutdown() already called and cannot take a new DAG

This is normal behavior because DAGAppMaster refuses to take new DAGs if it is gracefully stopping itself. The same Beeline connection, however, cannot execute any more queries even after another DAGAppMaster is created. Trying to execute a query while a new DAGAppMaster is running fails with the following error:

Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask. java.lang.NullPointerException
  at org.apache.hadoop.fs.Path.<init>(Path.java:104)
  at org.apache.hadoop.fs.Path.<init>(Path.java:93)
  at org.apache.hadoop.hive.ql.exec.mr3.DAGUtils.createMr3ScratchDir(DAGUtils.java:1419)

In this case, restarting Beeline solves the problem.