With HiveWarehouseConnector (originally published by Hortonworks), Spark can read and write DataFrames to and from Hive on MR3 through JDBC queries. Unlike Hive-LLAP, however, Hive on MR3 cannot pass DataFrames directly to Spark because ContainerWorkers of MR3 are designed only to execute Tasks from DAGAppMaster. Nevertheless, HiveWarehouseConnector should be useful in common use cases in which Spark extracts, transforms, and loads data while Hive on MR3 analyzes data with SQL queries.

Below we illustrate how to use HiveWarehouseConnector with Hive on MR3. We assume that HiveServer2 of Hive 3.1.2 on MR3 is running, and that Spark 2.3.1 is configured to access the same HDFS that Hive on MR3 uses.

Compiling HiveWarehouseConnector

HiveWarehouseConnector for Hive on MR3 is available on GitHub. In order to compile HiveWarehouseConnector, the user needs Scala and SBT. First check the version of each component in build.sbt:

sparkVersion := sys.props.getOrElse("spark.version", "2.3.1")

val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.1.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.1.2")
val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
val tezVersion = sys.props.getOrElse("tez.version", "0.9.1")
val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")

hadoopVersion must be 3.0.0 or higher because org.apache.hadoop.fs.Path is serializable only in Hadoop 3.

Then compile HiveWarehouseConnector by executing SBT:

$ sbt assembly

The HiveWarehouseConnector jar file can be found under target/scala-2.11 (if Scala 2.11 is used).
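
Since build.sbt reads these versions from system properties, a different version can be selected on the sbt command line without editing build.sbt, as in the following example (which just repeats the default versions shown above):

$ sbt -Dhadoop.version=3.1.0 -Dhive.version=3.1.2 assembly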

Running spark-shell

Run spark-shell with the HiveWarehouseConnector jar file, as in the following example:

$ spark-shell --jars ~/spark-hivemr3/target/scala-2.11/hive-warehouse-connector-assembly-1.0.0-SNAPSHOT.jar --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://10.1.91.41:9852/;principal=hive/red0@RED"

Here 10.1.91.41:9852 is the address of HiveServer2, and hive/red0@RED is its service principal.

Then the user can create a HiveWarehouseSession:

scala> val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@129e45eb

Executing Metastore operations

The user can execute various Metastore operations through HiveWarehouseSession, as shown in the examples below. Note that in order to create a new database, we invoke hive.executeUpdate() with a string containing a SQL statement.

scala> hive.showDatabases.show(false)
+------------------------------+
|database_name                 |
+------------------------------+
|default                       |
|spark_mr3                     |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.executeUpdate("CREATE DATABASE create_from_spark LOCATION 'hdfs://red0:8020/tmp/hivemr3/warehouse'")
res4: Boolean = true

scala> hive.showDatabases.show(false)
+------------------------------+
|database_name                 |
+------------------------------+
|create_from_spark             |
|default                       |
|spark_mr3                     |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.dropDatabase("create_from_spark", false, false)

scala> hive.showDatabases.show(false)
+------------------------------+
|database_name                 |
+------------------------------+
|default                       |
|spark_mr3                     |
|tpcds_bin_partitioned_orc_1000|
+------------------------------+
scala> hive.setDatabase("spark_mr3")

scala> hive.showTables.show
+-----------+
|   tab_name|
+-----------+
|2016_survey|
+-----------+
scala> hive.createTable("new_table").column("my_num", "int").column("my_color", "string").create()

scala> hive.showTables.show
+-----------+
|   tab_name|
+-----------+
|2016_survey|
|  new_table|
+-----------+

scala> hive.describeTable("new_table").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  my_num|      int|       |
|my_color|   string|       |
+--------+---------+-------+

scala> hive.dropTable("new_table", false, false)
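
For Metastore operations not covered by the builder-style API, a plain SQL statement can be sent with hive.executeUpdate(); for example, the table created above could equivalently have been created as follows (a sketch only):

scala> hive.executeUpdate("CREATE TABLE new_table (my_num int, my_color string)")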

Reading from Hive on MR3

In order to read DataFrames from Hive on MR3 directly via JDBC, we invoke hive.execute() with a string representing a SQL query. hive.execute() returns the result as an org.apache.spark.sql.Dataset object.

scala> hive.setDatabase("default")

scala> val result = hive.execute("select * from pokemon where hp > 120 and type1 = 'Water'")
result: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string ... 9 more fields]

scala> result.show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|number|     name|type1|   type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
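
Since the result is an ordinary Spark Dataset, it can be transformed further with the usual DataFrame operations. The following sketch, which relies only on the columns shown above, keeps the rows with hp above 150:

scala> result.filter("hp > 150").select("name", "hp", "total").show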

Alternatively, the user can read DataFrames from Hive on MR3 through intermediate data. We invoke hive.readTable() with two strings: 1) the name of the table to read, and 2) the format in which the intermediate data is written, such as "orc" or "parquet". hive.readTable() then returns a pair consisting of the result as an org.apache.spark.sql.Dataset object and an org.apache.hadoop.fs.Path object representing the path to the intermediate data.

scala> val (df, path) = hive.readTable("durable_Water", "orc")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string ... 9 more fields]
path: org.apache.hadoop.fs.Path = hdfs://red0:8020/tmp/20190424172950-18880806-511f-4f81-aed9-afd6bac6e6dc

scala> df.show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+  
|number|     name|type1|   type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+

Internally, reading from Hive on MR3 with hive.readTable() proceeds in three stages:

  1. Spark sends a SQL query via JDBC to Hive on MR3. The SQL query simply reads a Hive table and stores the result in a temporary external table.
  2. Hive on MR3 executes the query to write intermediate data to HDFS, and drops the external table.
  3. Spark reads the intermediate data from HDFS to create DataFrames.

[Figure: spark-hivemr3-write]

Since operations on the resultant DataFrames are lazy (which implies that the intermediate data is not loaded into memory immediately), the user may have to checkpoint them manually after setting the checkpoint directory of the current SparkContext:

scala> sc.setCheckpointDir("/tmp")    // sc == current SparkContext

scala> val checkpointed_df = df.checkpoint()
checkpointed_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_col0: int, _col1: string ... 9 more fields]

In the current implementation, the intermediate data is not automatically deleted, so the user should delete it explicitly later using the second element of the pair from hive.readTable().

scala> path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
res5: Boolean = true
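
The three steps (reading through intermediate data, checkpointing, and deleting the intermediate data) can be combined in a small helper. The following is only a sketch with a hypothetical name; it assumes that the checkpoint directory has been set as above and that checkpointing is eager (the default), so that the intermediate data can be deleted right away:

scala> def readTableViaIntermediate(table: String, format: String) = {
     |   // read through intermediate data, as with hive.readTable() above
     |   val (df, path) = hive.readTable(table, format)
     |   // eager checkpointing (the default) materializes the data under the checkpoint directory
     |   val checkpointed = df.checkpoint()
     |   // delete the intermediate data written by Hive on MR3
     |   path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
     |   checkpointed
     | }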

Writing to Hive on MR3

In order to write DataFrames to Hive on MR3, we use com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR as shown in the following example:

scala> result.write.format(com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table", "durable_water").save
                                                                                
scala> hive.execute("select * from durable_water").show
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|number|     name|type1|   type2|total| hp|attack|defense|sp_atk|sp_def|speed|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
|   321|  Wailord|Water|        |  500|170|    90|     45|    90|    45|   60|
|   594|Alomomola|Water|        |  470|165|    75|     80|    40|    45|   65|
|   131|   Lapras|Water|     Ice|  535|130|    85|     80|    85|    95|   60|
|   134| Vaporeon|Water|        |  525|130|    65|     60|   110|    95|   65|
|   171|  Lanturn|Water|Electric|  460|125|    58|     58|    76|    76|   67|
|   320|  Wailmer|Water|        |  400|130|    70|     35|    70|    35|   60|
+------+---------+-----+--------+-----+---+------+-------+------+------+-----+

Internally, writing to Hive on MR3 proceeds in three stages:

  1. Spark writes DataFrames to HDFS as intermediate data. By default, it writes under /tmp, and the location can be specified with the Spark property spark.datasource.hive.warehouse.load.staging.dir (see the sketch after this list).
  2. Spark sends a SQL query via JDBC to Hive on MR3.
  3. Hive on MR3 reads the intermediate data written by Spark to execute the query, updating both HDFS and Metastore.
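
For instance, assuming that the connector also reads this property from the Spark session configuration at runtime (it can equally be passed with --conf when launching spark-shell), the staging location can be changed as follows; the path is only illustrative:

scala> spark.conf.set("spark.datasource.hive.warehouse.load.staging.dir", "/tmp/hivemr3/staging")  // assumption: read from the session configuration; illustrative path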

[Figure: spark-hivemr3-write]