本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本

INSERT 语句

INSERT 语句用来向表中添加行。

执行 INSERT 语句

可以使用 TableEnvironment 中的 sqlUpdate() 方法执行 INSERT 语句,也可以在 SQL CLI 中执行 INSERT 语句。sqlUpdate() 方法执行 INSERT 语句时时懒执行的,只有当TableEnvironment.execute(jobName)被调用时才会被执行。

以下的例子展示了如何在 TableEnvironment 和 SQL CLI 中执行一个 INSERT 语句。

EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tEnv = TableEnvironment.create(settings);

// 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");

// 运行一个 INSERT 语句,将源表的数据输出到结果表中
tEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
val settings = EnvironmentSettings.newInstance()...
val tEnv = TableEnvironment.create(settings)

// 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

// 运行一个 INSERT 语句,将源表的数据输出到结果表中
tEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
settings = EnvironmentSettings.newInstance()...
table_env = TableEnvironment.create(settings)

# 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

# 运行一个 INSERT 语句,将源表的数据输出到结果表中
table_env \
    .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);

Flink SQL> SHOW TABLES;
Orders
RubberOrders

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:

Back to top

将 SELECT 查询数据插入表中

通过 INSERT 语句,可以将查询的结果插入到表中,

语法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中或分区中的任何已存在的数据。否则,新数据会追加到表中或分区中。

PARTITION

PARTITION 语句应该包含需要插入的静态分区列与值。

示例

-- 创建一个分区表
CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
PARTITIONED BY (date, country)
WITH (...)

-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

将值插入表中

通过 INSERT 语句,也可以直接将值插入到表中,

语法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

values_row:
    : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中的任何已存在的数据。否则,新数据会追加到表中。

示例

CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);

INSERT INTO students
  VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

Back to top