Users of Hive on MR3 can use user-defined functions (UDFs) in the same way as in Hive on Tez. For example, the following commands are all supported in Beeline connections:

  • create function
  • create temporary function
  • drop function
  • drop temporary function
  • add jar
  • delete jar

Temporary and permanent functions

In Hive on MR3, temporary functions belong to individual Beeline connections and are not shared. Hence two Beeline connections can each create a temporary function of the same name. In the following example, users gitlab-runner and hive manage their own temporary function foo, and every command succeeds, including the last query by user hive, which runs after user gitlab-runner has dropped its own foo.

### user gitlab-runner
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_10_ext;
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp1.jar;
0: jdbc:hive2://indigo1:9852/> create temporary function foo as 'test.simple.SimpleClass1';
0: jdbc:hive2://indigo1:9852/> select foo(s_zip) from store limit 5;

### user hive
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_10_ext;
0: jdbc:hive2://indigo1:9852/> describe function foo;
| Function 'foo' does not exist.  |
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp1.jar;
0: jdbc:hive2://indigo1:9852/> create temporary function foo as 'test.simple.SimpleClass1';
0: jdbc:hive2://indigo1:9852/> select foo(s_zip) from store limit 5;

### user gitlab-runner
0: jdbc:hive2://indigo1:9852/> drop temporary function foo;

### user hive
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;

In contrast, permanent functions are shared by all Beeline connections. The extra requirement is that each Beeline connection must manually add the same resource (such as a jar file) before it can use the function. In the following example, user hive registers a permanent function foo, which is then called by user pllab. We assume that the configuration key hive.users.in.admin.role is set to hive.

### user hive
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_3000;
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp2.jar;
0: jdbc:hive2://indigo1:9852/> create function foo as 'test.simple.SimpleClass2';
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;

### user pllab
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_3000;
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;
  ### --> FAIL because function foo is unknown
0: jdbc:hive2://indigo1:9852/> create function foo as 'test.simple.SimpleClass2';  
  ### --> FAIL because the jar file is not added 
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp2.jar;  
  ### --> now function foo is visible
0: jdbc:hive2://indigo1:9852/> describe function foo;
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;

### user hive
0: jdbc:hive2://indigo1:9852/> drop function foo;

### user pllab
0: jdbc:hive2://indigo1:9852/> describe function foo;  ### FAIL because function foo is not visible

Since permanent functions are maintained by HiveServer2 for each database, redefining an existing permanent function in any Beeline connection immediately takes effect in all other Beeline connections.

Configurations for UDFs

If the user runs non-trivial UDFs that create their own threads or allocate a lot of memory, ContainerWorkers may fail to reclaim all resources assigned to the UDFs, which leads to resource leaks. Hence the following settings in mr3-site.xml are recommended:

  • set mr3.container.use.termination.checker to true
  • set mr3.container.terminate.on.fatal.error to true
  • optionally set mr3.am.task.retry.on.fatal.error to true
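
The three settings above can be written in mr3-site.xml roughly as follows. This is a minimal sketch that assumes mr3-site.xml uses the usual Hadoop-style property layout; the keys and values are the ones listed above, and the last property is the optional one.

```xml
<!-- mr3-site.xml: settings for UDFs that create threads or allocate a lot of memory -->
<property>
  <name>mr3.container.use.termination.checker</name>
  <value>true</value>
</property>

<property>
  <name>mr3.container.terminate.on.fatal.error</name>
  <value>true</value>
</property>

<!-- optional -->
<property>
  <name>mr3.am.task.retry.on.fatal.error</name>
  <value>true</value>
</property>
```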

On the other hand, if no such UDFs are used, ContainerWorkers usually recover well from fatal errors such as OutOfMemoryError. Hence it is okay to set both configuration keys mr3.container.use.termination.checker and mr3.container.terminate.on.fatal.error to false in mr3-site.xml.

Localizing UDF resources

HiveServer2 stores all UDF resources in a new persistent directory which survives all Beeline connections. In contrast, DAGAppMaster and ContainerWorkers store all UDF resources in a new temporary directory created for each DAG, e.g., /opt/mr3-run/hive/dag_10664_0000_10_LR, which is mounted under the working directory and deleted after the DAG completes.

Using Python scripts

Python scripts used as UDFs (with TRANSFORM) are not supported in SQL standard based authorization (see https://issues.apache.org/jira/browse/HIVE-6415). In order to use them, the user should take the following steps:

  • set hive.security.authorization.enabled to false in hive-site.xml.
  • set hive.security.authorization.manager to either org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory or org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory in hive-site.xml.
  • set mr3.container.localize.python.working.dir.unsafe to true in mr3-site.xml so that Python resources (*.py or *.PY) are localized in the working directory of ContainerWorkers.
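
As a sketch, the three steps above correspond to the following entries, assuming that both hive-site.xml and mr3-site.xml use the usual Hadoop-style property layout. SQLStdHiveAuthorizerFactory is chosen here only for illustration; SQLStdConfOnlyAuthorizerFactory is the other value permitted above.

```xml
<!-- hive-site.xml: disable authorization so that TRANSFORM is allowed -->
<property>
  <name>hive.security.authorization.enabled</name>
  <value>false</value>
</property>

<!-- hive-site.xml: one of the two authorization managers listed above -->
<property>
  <name>hive.security.authorization.manager</name>
  <value>org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory</value>
</property>

<!-- mr3-site.xml: localize *.py and *.PY resources in the working directory of ContainerWorkers -->
<property>
  <name>mr3.container.localize.python.working.dir.unsafe</name>
  <value>true</value>
</property>
```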

mr3.container.localize.python.working.dir.unsafe should be set to true so that ContainerWorkers look for Python scripts in their working directory (/opt/mr3-run/hive/), not in the Java classpath. In the following example, ContainerWorkers assume that newline.py is available in the working directory.

### /opt/mr3-run/work-dir/ is a path inside HiveServer2 Pod
### where a PersistentVolume is mounted
add file /opt/mr3-run/work-dir/newline.py;

SELECT TRANSFORM(key, value) USING
'python newline.py' AS key, value FROM src limit 10;

If mr3.container.localize.python.working.dir.unsafe is set to false, however, the Python script newline.py is localized only in a temporary directory created for the DAG, which is an immediate subdirectory of the working directory. As a result, newline.py is not present in the working directory itself, and the command python newline.py is not executed properly.

In order to run Python scripts in ContainerWorker Pods, the user should build a custom Docker image in which the command python (or python3) is available in the default path.

When registering a Python file in HiveServer2 (by executing the command add file), the user should make sure that no Python file with the same name but from a different source has been registered before. In the following example, newline.py is registered twice from different sources:

add file /opt/mr3-run/work-dir/newline.py;
add file s3a://hivemr3/newline.py;

Then executing a query fails with the following error.

SELECT TRANSFORM(key, value) USING 'python3 newline.py' AS key, value FROM src limit 6;
...
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask. java.io.IOException: Previous writer likely failed to write file:/opt/mr3-run/work-dir/hive/hive/_mr3_session_dir/b6acebbe/newline.py. Failing because I am unlikely to write too.

In such a case, the user should manually delete one of the file instances:

delete file /opt/mr3-run/work-dir/newline.py;

Using TRANSFORM with Python scripts is not a safe operation for two reasons:

  • Python scripts are shared by all DAGs because the working directory of ContainerWorkers is shared.
  • Once localized, Python scripts are never deleted from the working directory. This is unavoidable because we cannot determine when it is safe to delete Python scripts.