With HiveWarehouseConnector (originally published by Hortonworks), Spark can read/write DataFrames to/from Hive on MR3 through JDBC queries. Unlike Hive-LLAP, however, Hive on MR3 cannot pass DataFrames directly to Spark because ContainerWorkers of MR3 are designed only to execute Tasks from DAGAppMaster. Nevertheless, HiveWarehouseConnector should be useful in common use cases in which Spark extracts/transforms/loads data while Hive on MR3 analyzes data with SQL queries.
Below we illustrate how to use HiveWarehouseConnector with Hive on MR3. We assume that HiveServer2 of Hive 3.1.2 on MR3 is running, and that Spark 2.3.1 is configured to access the same HDFS that Hive on MR3 uses.
Compiling HiveWarehouseConnector
HiveWarehouseConnector for Hive on MR3 is available on GitHub.
In order to compile HiveWarehouseConnector, the user needs Scala and SBT.
First check the version of each component in build.sbt:
sparkVersion := sys.props.getOrElse("spark.version", "2.3.1")
val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.1.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.1.2")
val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
val tezVersion = sys.props.getOrElse("tez.version", "0.9.1")
val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")
hadoopVersion must be 3.0.0 or higher because org.apache.hadoop.fs.Path is serializable only in Hadoop 3.
Then compile HiveWarehouseConnector by executing SBT:
$ sbt assembly
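Because each version in build.sbt is read with sys.props.getOrElse, a different version can presumably be supplied on the sbt command line instead of editing build.sbt; the values below are only illustrative:
$ sbt -Dhadoop.version=3.1.1 -Dhive.version=3.1.2 assembly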
The HiveWarehouseConnector jar file can be found under target/scala-2.11 (if Scala 2.11 is used).
Running spark-shell
Run spark-shell with the HiveWarehouseConnector jar file, as in the following example:
$ spark-shell --jars ~/spark-hivemr3/target/scala-2.11/hive-warehouse-connector-assembly-1.0.0-SNAPSHOT.jar --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://10.1.91.41:9852/;principal=hive/red0@RED"
Here 10.1.91.41:9852 is the address of HiveServer2, and hive/red0@RED is its service principal.
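If HiveServer2 runs without Kerberos, the principal part of the JDBC URL can presumably be omitted, as in the following sketch (same jar file and address as above):
$ spark-shell --jars ~/spark-hivemr3/target/scala-2.11/hive-warehouse-connector-assembly-1.0.0-SNAPSHOT.jar --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://10.1.91.41:9852/"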
Then the user can create a HiveWarehouseSession:
scala> val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@129e45eb
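To keep subsequent commands shorter, the builder can also be imported first; the following is equivalent to the call above:
scala> import com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder
scala> val hive = HiveWarehouseBuilder.session(spark).build()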
Executing Metastore operations
The user can execute various Metastore operations in HiveWarehouseSession, as in the following examples. Note that in order to create a new database, we invoke hive.executeUpdate() with a string representing a SQL query.
scala> hive.showDatabases.show(false)
+------------------------------+
|database_name |
+------------------------------+
|default |
|spark_mr3 |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.executeUpdate("CREATE DATABASE create_from_spark LOCATION 'hdfs://red0:8020/tmp/hivemr3/warehouse'")
res4: Boolean = true
scala> hive.showDatabases.show(false)
+------------------------------+
|database_name |
+------------------------------+
|create_from_spark |
|default |
|spark_mr3 |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.dropDatabase("create_from_spark", false, false)
scala> hive.showDatabases.show(false)
+------------------------------+
|database_name |
+------------------------------+
|default |
|spark_mr3 |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.setDatabase("spark_mr3")
scala> hive.showTables.show
+-----------+
| tab_name|
+-----------+
|2016_survey|
+-----------+
scala> hive.createTable("new_table").column("my_num", "int").column("my_color", "string").create()
scala> hive.showTables.show
+-----------+
| tab_name|
+-----------+
|2016_survey|
| new_table|
+-----------+
scala> hive.describeTable("new_table").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| my_num| int| |
|my_color| string| |
+--------+---------+-------+
scala> hive.dropTable("new_table", false, false)
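Note that instead of the createTable() builder shown above, a table could presumably also be created by passing a plain SQL statement to hive.executeUpdate(), for example:
scala> hive.executeUpdate("CREATE TABLE new_table (my_num int, my_color string)")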
Reading from Hive on MR3
In order to read DataFrames from Hive on MR3 directly via JDBC, we invoke hive.execute() with a string representing a SQL query. hive.execute() stores the result in an org.apache.spark.sql.Dataset object.
scala> hive.setDatabase("default")
scala> val result = hive.execute("select * from pokemon where hp > 120 and type1 = 'Water'")
result: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string ... 9 more fields]
scala> result.show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|number| name|type1| type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
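Since the result is an ordinary org.apache.spark.sql.Dataset, the usual Spark transformations can be applied to it; for example (output omitted):
scala> result.filter(result("hp") > 150).select("name", "hp").show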
Alternatively, the user can read DataFrames from Hive on MR3 through intermediate data. We invoke hive.readTable() with two strings: 1) the name of the table to read, and 2) the format in which the intermediate data is written, such as "orc" or "parquet". Then hive.readTable() returns a pair consisting of the result in an org.apache.spark.sql.Dataset object and an org.apache.hadoop.fs.Path object representing the path to the intermediate data.
scala> val (df, path) = hive.readTable("durable_Water", "orc")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string ... 9 more fields]
path: org.apache.hadoop.fs.Path = hdfs://red0:8020/tmp/20190424172950-18880806-511f-4f81-aed9-afd6bac6e6dc
scala> df.show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|number| name|type1| type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
Internally reading from Hive on MR3 with hive.readTable() proceeds in three stages:
- Spark sends a SQL query via JDBC to Hive on MR3. The SQL query simply reads a Hive table and stores the result in a temporary external table.
- Hive on MR3 executes the query to write intermediate data to HDFS, and drops the external table.
- Spark reads the intermediate data from HDFS to create DataFrames.
Since operations on the resultant DataFrames are lazy (which implies that the intermediate data is not loaded into memory immediately), the user may have to manually execute checkpointing after setting the checkpoint directory of the current SparkContext:
scala> sc.setCheckpointDir("/tmp") // sc == current SparkContext
scala> val checkpointed_df = df.checkpoint()
checkpointed_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_col0: int, _col1: string ... 9 more fields]
In the current implementation, the intermediate data is not automatically deleted, so the user should delete it explicitly later using the second element of the pair returned by hive.readTable().
scala> path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
res5: Boolean = true
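For convenience, the two steps can be combined in a small helper. The sketch below is only an illustration: the name materializeAndClean is hypothetical, and it assumes that the checkpoint directory has already been set with sc.setCheckpointDir() so that df.checkpoint(), which is eager by default, materializes the data before the intermediate files are removed.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Checkpoint the Dataset returned by hive.readTable() and then delete the
// intermediate data that backs it (hypothetical helper, not part of HiveWarehouseConnector).
def materializeAndClean(spark: SparkSession, df: Dataset[Row], path: Path): Dataset[Row] = {
  val checkpointed = df.checkpoint()   // eager by default: the data is written to the checkpoint directory here
  path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)   // remove the intermediate data
  checkpointed
}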
Writing to Hive on MR3
In order to write DataFrames to Hive on MR3, we use com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR as shown in the following example:
scala> result.write.format(com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table", "durable_water").save
scala> hive.execute("select * from durable_water").show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|number| name|type1| type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
| 321| Wailord|Water| | 500|170| 90| 45| 90| 45| 60|
| 594|Alomomola|Water| | 470|165| 75| 80| 40| 45| 65|
| 131| Lapras|Water| Ice| 535|130| 85| 80| 85| 95| 60|
| 134| Vaporeon|Water| | 525|130| 65| 60| 110| 95| 65|
| 171| Lanturn|Water|Electric| 460|125| 58| 58| 76| 76| 67|
| 320| Wailmer|Water| | 400|130| 70| 35| 70| 35| 60|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
Internally writing to Hive on MR3 proceeds in three stages:
- Spark writes DataFrames to HDFS as intermediate data. By default, it writes under /tmp, and the location can be specified with the Spark property spark.datasource.hive.warehouse.load.staging.dir (see the example after this list).
- Spark sends a SQL query via JDBC to Hive on MR3.
- Hive on MR3 reads the intermediate data written by Spark to execute the query, updating both HDFS and Metastore.
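For example, the staging location could be set when launching spark-shell; the HDFS directory below is only illustrative:
$ spark-shell --jars ~/spark-hivemr3/target/scala-2.11/hive-warehouse-connector-assembly-1.0.0-SNAPSHOT.jar --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://10.1.91.41:9852/;principal=hive/red0@RED" --conf spark.datasource.hive.warehouse.load.staging.dir=hdfs://red0:8020/tmp/staging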