
Hive Dialect

Starting from 1.11.0, Flink allows users to write SQL statements in Hive syntax when the Hive dialect is used. By providing compatibility with Hive syntax, we aim to improve interoperability with Hive and reduce the cases in which users need to switch between Flink and Hive to execute different statements.

Use Hive Dialect

Flink currently supports two SQL dialects: default and hive. You need to switch to the Hive dialect before you can write in Hive syntax. The following describes how to set the dialect with the SQL Client and the Table API. Note that you can switch dialects dynamically for each statement you execute; there is no need to restart a session to use a different dialect.

SQL Client

The SQL dialect can be specified via the table.sql-dialect property. You can therefore set the initial dialect in the configuration section of the YAML file for your SQL Client.

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive
  

You can also set the dialect after the SQL Client has launched.

Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
[INFO] Session property has been set.

Flink SQL> set table.sql-dialect=default; -- to use default dialect
[INFO] Session property has been set.
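
As an illustration of switching dialects per statement, the following sketch creates a table under the Hive dialect and then queries it under the default dialect within the same session. The table name hive_events and its columns are hypothetical, and a HiveCatalog is assumed to be the current catalog.

```sql
set table.sql-dialect=hive; -- to use hive dialect
-- hive_events is a hypothetical table; a HiveCatalog is assumed to be the current catalog
CREATE TABLE hive_events (id INT, name STRING) STORED AS ORC;

set table.sql-dialect=default; -- to use default dialect
SELECT id, name FROM hive_events;
```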

Table API

You can set the dialect for your TableEnvironment with the Table API.

In Java:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

In Python:

from pyflink.table import *

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# to use hive dialect
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# to use default dialect
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)

DDL

This section lists the DDL statements supported with the Hive dialect. It focuses mainly on syntax; refer to the Hive documentation for the semantics of each DDL statement.

DATABASE

Show

SHOW DATABASES;

Create

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
  [COMMENT database_comment]
  [LOCATION fs_path]
  [WITH DBPROPERTIES (property_name=property_value, ...)];
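
A minimal sketch of the CREATE syntax above. The database name, comment, path, and property are hypothetical values chosen for illustration.

```sql
-- Hypothetical database name, comment, location, and property.
CREATE DATABASE IF NOT EXISTS sales_db
  COMMENT 'Sales data'
  LOCATION '/warehouse/sales_db'
  WITH DBPROPERTIES ('owner_team'='analytics');
```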

Alter

Update Properties
ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...);
Update Owner
ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role;
Update Location
ALTER (DATABASE|SCHEMA) database_name SET LOCATION fs_path;
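
The three ALTER forms above might look as follows for a hypothetical database sales_db; the property, user name, and path are assumptions made for illustration. As noted at the end of this page, whether updating the location works depends on the Hive version in use.

```sql
-- sales_db, alice, and the path are hypothetical.
ALTER DATABASE sales_db SET DBPROPERTIES ('owner_team'='finance');
ALTER DATABASE sales_db SET OWNER USER alice;
ALTER DATABASE sales_db SET LOCATION '/warehouse/sales_db_v2';
```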

Drop

DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];

Use

USE database_name;

TABLE

Show

SHOW TABLES;

Create

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [
    [ROW FORMAT row_format]
    [STORED AS file_format]
  ]
  [LOCATION fs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]
  
row_format:
  : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char]
      [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
      [NULL DEFINED AS char]
  | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)]
  
file_format:
  : SEQUENCEFILE
  | TEXTFILE
  | RCFILE
  | ORC
  | PARQUET
  | AVRO
  | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
  
column_constraint:
  : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
  
table_constraint:
  : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
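
To make the grammar above concrete, here is a sketch of a partitioned, delimited text table. The table name page_views, its columns, the path, and the property are all hypothetical.

```sql
-- page_views and everything inside it are hypothetical names and values.
CREATE EXTERNAL TABLE IF NOT EXISTS page_views (
  user_id BIGINT COMMENT 'Visitor id',
  url STRING COMMENT 'Visited URL'
)
COMMENT 'Raw page views'
PARTITIONED BY (dt STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/warehouse/page_views'
TBLPROPERTIES ('retention_days'='30');
```

Note that the partition columns appear only in the PARTITIONED BY clause, not in the regular column list.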

Alter

Rename
ALTER TABLE table_name RENAME TO new_table_name;
Update Properties
ALTER TABLE table_name SET TBLPROPERTIES (property_name = property_value, property_name = property_value, ... );
Update Location
ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION fs_path;

The partition_spec, if present, must be a full spec, i.e. it must specify values for all partition columns. When it is present, the operation is applied to the corresponding partition instead of the table.

Update File Format
ALTER TABLE table_name [PARTITION partition_spec] SET FILEFORMAT file_format;

The partition_spec, if present, must be a full spec, i.e. it must specify values for all partition columns. When it is present, the operation is applied to the corresponding partition instead of the table.

Update SerDe Properties
ALTER TABLE table_name [PARTITION partition_spec] SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties];
 
ALTER TABLE table_name [PARTITION partition_spec] SET SERDEPROPERTIES serde_properties;
 
serde_properties:
  : (property_name = property_value, property_name = property_value, ... )

The partition_spec, if present, must be a full spec, i.e. it must specify values for all partition columns. When it is present, the operation is applied to the corresponding partition instead of the table.
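
The following sketch shows the location, file format, and SerDe statements with and without a partition_spec. It assumes a hypothetical table page_views partitioned by (dt, country); the paths and property values are also made up for illustration.

```sql
-- page_views is a hypothetical table partitioned by (dt, country).
-- Full spec: every partition column has a value, so only that partition changes.
ALTER TABLE page_views PARTITION (dt='2020-07-01', country='US')
  SET LOCATION '/warehouse/page_views/dt=2020-07-01/country=US';

ALTER TABLE page_views PARTITION (dt='2020-07-01', country='US') SET FILEFORMAT ORC;

ALTER TABLE page_views PARTITION (dt='2020-07-01', country='US')
  SET SERDEPROPERTIES ('field.delim'=',');

-- No PARTITION clause: the change applies to the table itself.
ALTER TABLE page_views SET FILEFORMAT ORC;
```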

Add Partitions
ALTER TABLE table_name ADD [IF NOT EXISTS] (PARTITION partition_spec [LOCATION fs_path])+;
Drop Partitions
ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...];
Add/Replace Columns
ALTER TABLE table_name
  ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
  [CASCADE|RESTRICT];
Change Column
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type
  [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];
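
A sketch of the partition and column statements above, again using a hypothetical table page_views partitioned by (dt, country) with columns user_id and url. Note the difference the grammar implies: partitions in ADD are separated by whitespace, while partitions in DROP are separated by commas.

```sql
-- page_views, its columns, and the paths are hypothetical.
ALTER TABLE page_views ADD IF NOT EXISTS
  PARTITION (dt='2020-07-01', country='US') LOCATION '/data/page_views/2020-07-01/US'
  PARTITION (dt='2020-07-01', country='CA');

ALTER TABLE page_views DROP IF EXISTS
  PARTITION (dt='2020-07-01', country='US'), PARTITION (dt='2020-07-01', country='CA');

ALTER TABLE page_views ADD COLUMNS (referrer STRING COMMENT 'Referring URL') CASCADE;

ALTER TABLE page_views CHANGE COLUMN url page_url STRING COMMENT 'Visited URL' AFTER user_id;
```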

Drop

DROP TABLE [IF EXISTS] table_name;

VIEW

Create

CREATE VIEW [IF NOT EXISTS] view_name [(column_name, ...) ]
  [COMMENT view_comment]
  [TBLPROPERTIES (property_name = property_value, ...)]
  AS SELECT ...;
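
For example, a view over a hypothetical table page_views could be created as sketched here; the view name, comment, and property are likewise assumptions.

```sql
-- us_page_views and page_views are hypothetical names.
CREATE VIEW IF NOT EXISTS us_page_views (user_id, url)
  COMMENT 'Page views from the US'
  TBLPROPERTIES ('created_by'='docs_example')
  AS SELECT user_id, url FROM page_views WHERE country = 'US';
```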

Alter

NOTE: Altering a view only works in the Table API; it is not supported via the SQL Client.

Rename
ALTER VIEW view_name RENAME TO new_view_name;
Update Properties
ALTER VIEW view_name SET TBLPROPERTIES (property_name = property_value, ... );
Update As Select
ALTER VIEW view_name AS select_statement;
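
A sketch of the three ALTER VIEW forms for a hypothetical view us_page_views. Per the note above, these statements would need to be submitted through the Table API rather than the SQL Client.

```sql
-- us_page_views, us_views, and page_views are hypothetical names.
ALTER VIEW us_page_views RENAME TO us_views;
ALTER VIEW us_views SET TBLPROPERTIES ('created_by'='docs_example');
ALTER VIEW us_views AS SELECT user_id, url FROM page_views WHERE country = 'US';
```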

Drop

DROP VIEW [IF EXISTS] view_name;

FUNCTION

Show

SHOW FUNCTIONS;

Create

CREATE FUNCTION function_name AS class_name;

Drop

DROP FUNCTION [IF EXISTS] function_name;
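
As a sketch, the statements below register a Hive UDF under a hypothetical name, use it, and drop it again. The function name my_upper and the table page_views are assumptions; the class is Hive's built-in GenericUDFUpper and must be available on the classpath.

```sql
-- my_upper and page_views are hypothetical names.
CREATE FUNCTION my_upper AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper';

SELECT my_upper(url) FROM page_views;

DROP FUNCTION IF EXISTS my_upper;
```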

DML

INSERT

INSERT (INTO|OVERWRITE) [TABLE] table_name [PARTITION partition_spec] SELECT ...;

The partition_spec, if present, can be either a full spec or a partial spec. With a partial spec, the dynamic partition column names can be omitted.
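
The difference between a full and a partial spec can be sketched as follows. The tables page_views (partitioned by dt and country) and staging_page_views are hypothetical; in the partial-spec forms, the value of the dynamic partition column is assumed to come from the trailing column of the SELECT.

```sql
-- page_views and staging_page_views are hypothetical tables.
-- Full spec: both partition columns have static values.
INSERT OVERWRITE TABLE page_views PARTITION (dt='2020-07-01', country='US')
SELECT user_id, url FROM staging_page_views WHERE country = 'US';

-- Partial spec: dt is static, country is dynamic and named explicitly.
INSERT INTO TABLE page_views PARTITION (dt='2020-07-01', country)
SELECT user_id, url, country FROM staging_page_views;

-- Partial spec with the dynamic partition column name omitted.
INSERT INTO TABLE page_views PARTITION (dt='2020-07-01')
SELECT user_id, url, country FROM staging_page_views;
```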

DQL

At the moment, the Hive dialect supports the same syntax as Flink SQL for DQL. Refer to Flink SQL queries for more details. It is recommended to switch to the default dialect to execute DQL statements.

Notice

The following are some precautions for using the Hive dialect.

  • The Hive dialect should only be used to manipulate Hive tables, not generic tables. It should also be used together with a HiveCatalog.
  • While all Hive versions support the same syntax, whether a specific feature is available still depends on the Hive version you use. For example, updating the database location is only supported in Hive 2.4.0 or later.
  • Hive and Calcite have different sets of reserved keywords. For example, default is a reserved keyword in Calcite and a non-reserved keyword in Hive. Even with the Hive dialect, you have to quote such keywords with backticks ( ` ) in order to use them as identifiers.
  • Because the expanded queries are incompatible, views created in Flink cannot be queried in Hive.
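
The reserved-keyword precaution can be illustrated with Hive's default database. In this sketch, page_views is a hypothetical table assumed to exist in that database.

```sql
-- default is reserved in Calcite, so it must be quoted with backticks.
USE `default`;

-- page_views is a hypothetical table.
SELECT * FROM `default`.page_views;
```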