本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本

Hive Integration

Apache Hive has established itself as a focal point of the data warehousing ecosystem. It serves as not only a SQL engine for big data analytics and ETL, but also a data management platform, where data is discovered, defined, and evolved.

Flink offers a two-fold integration with Hive.

The first is to leverage Hive’s Metastore as a persistent catalog with Flink’s HiveCatalog for storing Flink specific metadata across sessions. For example, users can store their Kafka or ElasticSearch tables in Hive Metastore by using HiveCatalog, and reuse them later on in SQL queries.

The second is to offer Flink as an alternative engine for reading and writing Hive tables.

The HiveCatalog is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables.

Supported Hive Versions

Flink supports the following Hive versions.

  • 1.0
    • 1.0.0
    • 1.0.1
  • 1.1
    • 1.1.0
    • 1.1.1
  • 1.2
    • 1.2.0
    • 1.2.1
    • 1.2.2
  • 2.0
    • 2.0.0
    • 2.0.1
  • 2.1
    • 2.1.0
    • 2.1.1
  • 2.2
    • 2.2.0
  • 2.3
    • 2.3.0
    • 2.3.1
    • 2.3.2
    • 2.3.3
    • 2.3.4
    • 2.3.5
    • 2.3.6
  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2

Please note Hive itself have different features available for different versions, and these issues are not caused by Flink:

  • Hive built-in functions are supported in 1.2.0 and later.
  • Column constraints, i.e. PRIMARY KEY and NOT NULL, are supported in 3.1.0 and later.
  • Altering table statistics is supported in 1.2.0 and later.
  • DATE column statistics are supported in 1.2.0 and later.
  • Writing to ORC tables is not supported in 2.0.x.

Dependencies

To integrate with Hive, you need to add some extra dependencies to the /lib/ directory in Flink distribution to make the integration work in Table API program or SQL in SQL Client. Alternatively, you can put these dependencies in a dedicated folder, and add them to classpath with the -C or -l option for Table API program or SQL Client respectively.

Please find the required dependencies for different Hive major versions below.

/flink-1.10.0
   /lib

       // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

       // Hive dependencies
       hive-exec-2.3.4.jar
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.6.5-8.0.jar

       // Hive dependencies
       hive-metastore-1.0.0.jar
       hive-exec-1.0.0.jar
       libfb303-0.9.0.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.6.5-8.0.jar

       // Hive dependencies
       hive-metastore-1.1.0.jar
       hive-exec-1.1.0.jar
       libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.6.5-8.0.jar

       // Hive dependencies
       hive-metastore-1.2.1.jar
       hive-exec-1.2.1.jar
       libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

       // Hive dependencies
       hive-exec-2.0.0.jar
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

       // Hive dependencies
       hive-exec-2.1.0.jar
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

       // Hive dependencies
       hive-exec-2.2.0.jar

       // Orc dependencies -- required by the ORC vectorized optimizations
       orc-core-1.4.3.jar
       aircompressor-0.8.jar // transitive dependency of orc-core
/flink-1.10.0
   /lib

       // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.11-1.10.0.jar

       // Hadoop dependencies
       // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
       // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
       // cluster and the Hive version you're using.
       flink-shaded-hadoop-2-uber-2.8.3-8.0.jar

       // Hive dependencies
       hive-exec-3.1.0.jar
       libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

If you are building your own program, you need the following dependencies in your mvn file. It’s recommended not to include these dependencies in the resulting jar file. You’re supposed to add dependencies as stated above at runtime.

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

Connecting To Hive

Connect to an existing Hive installation using the catalog interface and HiveCatalog through the table environment or YAML configuration.

If the hive-conf/hive-site.xml file is stored in remote storage system, users should download the hive configuration file to their local environment first.

Please note while HiveCatalog doesn’t require a particular planner, reading/writing Hive tables only works with blink planner. Therefore it’s highly recommended that you use blink planner when connecting to your Hive warehouse.

Take Hive version 2.3.4 for example:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf"; // a local path
String version         = "2.3.4";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf" // a local path
val version         = "2.3.4"

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")
execution:
    planner: blink
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase
    
catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf
     hive-version: 2.3.4

DDL

DDL to create Hive tables, views, partitions, functions within Flink will be supported soon.

DML

Flink supports DML writing to Hive tables. Please refer to details in Reading & Writing Hive Tables