Hive on MR3 supports user-defined functions (UDFs) in the same way as Hive on Tez. For example, the following commands are all supported inside Beeline connections:
- create function
- create temporary function
- drop function
- drop temporary function
- add jar
- delete jar
Temporary and permanent functions
In Hive on MR3, temporary functions belong to individual Beeline connections and are not shared.
Hence two Beeline connections can create their own temporary functions of the same name.
In the following example, users gitlab-runner and hive each manage their own temporary function foo, and all commands succeed.
### user gitlab-runner
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_10_ext;
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp1.jar;
0: jdbc:hive2://indigo1:9852/> create temporary function foo as 'test.simple.SimpleClass1';
0: jdbc:hive2://indigo1:9852/> select foo(s_zip) from store limit 5;
### user hive
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_10_ext;
0: jdbc:hive2://indigo1:9852/> describe function foo;
| Function 'foo' does not exist. |
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp1.jar;
0: jdbc:hive2://indigo1:9852/> create temporary function foo as 'test.simple.SimpleClass1';
0: jdbc:hive2://indigo1:9852/> select foo(s_zip) from store limit 5;
### user gitlab-runner
0: jdbc:hive2://indigo1:9852/> drop temporary function foo;
### user hive
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;
In contrast, permanent functions are shared by all Beeline connections.
The extra requirement is that each Beeline connection should add the same resource (such as a jar file) manually.
In the following example, user hive registers a permanent function foo, which is called by user pllab. We assume that the configuration key hive.users.in.admin.role is set to hive.
### user hive
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_3000;
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp2.jar;
0: jdbc:hive2://indigo1:9852/> create function foo as 'test.simple.SimpleClass2';
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;
### user pllab
0: jdbc:hive2://indigo1:9852/> use tpcds_bin_partitioned_orc_3000;
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;
### --> FAIL because function foo is unknown
0: jdbc:hive2://indigo1:9852/> create function foo as 'test.simple.SimpleClass2';
### --> FAIL because the jar file is not added
0: jdbc:hive2://indigo1:9852/> add jar hdfs:///tmp/temp2.jar;
### --> now function foo is visible
0: jdbc:hive2://indigo1:9852/> describe function foo;
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center, store where foo(cc_city) = foo(s_city) limit 10;
### user hive
0: jdbc:hive2://indigo1:9852/> drop function foo;
### user pllab
0: jdbc:hive2://indigo1:9852/> describe function foo; ### FAIL because function foo is not visible
Since permanent functions are maintained by HiveServer2 for each database, redefining an existing permanent function in any Beeline connection immediately takes effect in all other Beeline connections.
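As a minimal sketch of this behavior (we assume, only for illustration, that temp2.jar also contains a hypothetical class test.simple.SimpleClass3, and that the Beeline connection of user pllab has already added temp2.jar as in the example above):

### user hive
0: jdbc:hive2://indigo1:9852/> drop function foo;
0: jdbc:hive2://indigo1:9852/> create function foo as 'test.simple.SimpleClass3';

### user pllab
0: jdbc:hive2://indigo1:9852/> select foo(cc_city) from call_center limit 10;
### --> uses the new definition of foo without re-registering it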
Configurations for UDFs
If the user uses non-trivial UDFs that create their own threads or allocate a lot of memory,
ContainerWorkers may fail to reclaim all resources assigned to UDFs, thus leading to resource leaks.
Hence the following settings in mr3-site.xml are recommended:
- set mr3.container.use.termination.checker to true
- set mr3.container.terminate.on.fatal.error to true
- optionally set mr3.am.task.retry.on.fatal.error to true
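For reference, a minimal sketch of the corresponding entries in mr3-site.xml (only the relevant properties are shown):

<!-- mr3-site.xml: recommended settings when non-trivial UDFs are used -->
<property>
  <name>mr3.container.use.termination.checker</name>
  <value>true</value>
</property>
<property>
  <name>mr3.container.terminate.on.fatal.error</name>
  <value>true</value>
</property>
<!-- optional -->
<property>
  <name>mr3.am.task.retry.on.fatal.error</name>
  <value>true</value>
</property>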
On the other hand, if no such UDFs are used, ContainerWorkers usually recover well from fatal errors such as OutOfMemoryError. Hence it is okay to set both configuration keys mr3.container.use.termination.checker and mr3.container.terminate.on.fatal.error to false in mr3-site.xml.
Localizing UDF resources
HiveServer2 stores all UDF resources in a persistent directory which survives all Beeline connections.
In contrast, DAGAppMaster and ContainerWorkers store all UDF resources in a new temporary directory created for each DAG, e.g., /opt/mr3-run/hive/dag_10664_0000_10_LR, which is mounted under the working directory and deleted after the DAG completes.
Using Python scripts
In order to use Python scripts for UDFs (with TRANSFORM), which are not supported in SQL standard based authorization (see https://issues.apache.org/jira/browse/HIVE-6415), the user should take the following steps:
- set hive.security.authorization.enabled to false in hive-site.xml.
- set hive.security.authorization.manager to either org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory or org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory in hive-site.xml.
- set mr3.container.localize.python.working.dir.unsafe to true in mr3-site.xml so that Python resources (*.py or *.PY) are localized in the working directory of ContainerWorkers.
mr3.container.localize.python.working.dir.unsafe should be set to true so that ContainerWorkers look for Python scripts in their working directory (/opt/mr3-run/hive/), not in the Java classpath.
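A minimal sketch of the corresponding configuration entries (we choose SQLStdConfOnlyAuthorizerFactory here only for illustration):

<!-- hive-site.xml -->
<property>
  <name>hive.security.authorization.enabled</name>
  <value>false</value>
</property>
<property>
  <name>hive.security.authorization.manager</name>
  <value>org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory</value>
</property>

<!-- mr3-site.xml -->
<property>
  <name>mr3.container.localize.python.working.dir.unsafe</name>
  <value>true</value>
</property>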
In the following example, ContainerWorkers assume that newline.py is available in the working directory.
### /opt/mr3-run/work-dir/ is a path inside HiveServer2 Pod
### where a PersistentVolume is mounted
add file /opt/mr3-run/work-dir/newline.py;
SELECT TRANSFORM(key, value) USING 'python newline.py' AS key, value FROM src limit 10;
If mr3.container.localize.python.working.dir.unsafe is set to false, however, the Python script newline.py is localized only in a temporary directory created for the DAG, which is an immediate subdirectory of the working directory. As a result, the command python newline.py is not executed properly.
In order to run Python scripts in ContainerWorker Pods, the user should build a custom Docker image in which the command python (or python3) is available in the default path.
When registering a Python file in HiveServer2 (by executing the command add file), the user should make sure that no Python file with the same name but from a different source has been registered before. In the following example, newline.py is registered twice from different sources:
add file /opt/mr3-run/work-dir/newline.py;
add file s3a://hivemr3/newline.py;
Then executing a query fails with the following error:
SELECT TRANSFORM(key, value) USING 'python3 newline.py' AS key, value FROM src limit 6;
...
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask. java.io.IOException: Previous writer likely failed to write file:/opt/mr3-run/work-dir/hive/hive/_mr3_session_dir/b6acebbe/newline.py. Failing because I am unlikely to write too.
In such a case, the user should manually delete one of the file instances:
delete file /opt/mr3-run/work-dir/newline.py;
Using TRANSFORM with Python scripts is not a safe operation for two reasons:
- Python scripts are shared by all DAGs because the working directory of ContainerWorkers is shared.
- Once localized, Python scripts are never deleted from the working directory. This is necessary because we cannot determine when to delete Python scripts.