连接器

本篇描述了如何在 PyFlink 中使用连接器,并着重介绍了在 Python 程序中使用 Flink 连接器时需要注意的细节。

Note 想要了解常见的连接器信息和通用配置,请查阅相关的 Java/Scala 文档

下载连接器(connector)和格式(format)jar 包

由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的,要在 PyFlink 作业中使用,首先需要将其指定为作业的 依赖

table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

如何使用连接器

在 PyFink Table API 中,DDL 是定义 source 和 sink 比较推荐的方式,这可以通过 TableEnvironment 中的 execute_sql() 方法来完成,然后就可以在作业中使用这张表了。

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'type' = 'kafka',
          'topic' = 'source_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

sink_ddl = """
        CREATE TABLE sink_table(
            a VARCHAR
        ) WITH (
          'type' = 'kafka',
          'topic' = 'sink_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'format' = 'json'
        )
        """

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

t_env.sql_query("SELECT a FROM source_table") \
    .insert_into("sink_table")

下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    # specify connector and format jars
    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
    
    source_ddl = """
            CREATE TABLE source_table(
                a VARCHAR,
                b INT
            ) WITH (
              'type' = 'kafka',
              'topic' = 'source_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'properties.group.id' = 'test_3',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    sink_ddl = """
            CREATE TABLE sink_table(
                a VARCHAR
            ) WITH (
              'type' = 'kafka',
              'topic' = 'sink_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'format' = 'json'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT a FROM source_table") \
        .insert_into("sink_table")

    t_env.execute("payment_demo")


if __name__ == '__main__':
    log_processing()

内置的 Sources 和 Sinks

有些 source 和 sink 被内置在 Flink 中,可以直接使用。这些内置的 source 包括将 Pandas DataFrame 作为数据源,或者将一个元素集合作为数据源。内置的 sink 包括将数据转换为 Pandas DataFrame 等。

和 Pandas 之间互转

PyFlink 表支持与 Pandas DataFrame 之间互相转换。

import pandas as pd
import numpy as np

# 创建一个 PyFlink 表
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# 将 PyFlink 表转换成 Pandas DataFrame
pdf = table.to_pandas()

from_elements()

from_elements() 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。

table_env.from_elements([(1, 'Hi'), (2, 'Hello')])

# 使用第二个参数指定自定义字段名
table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# 使用第二个参数指定自定义表结构
table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                       DataTypes.FIELD("b", DataTypes.STRING())]))

以上查询返回的表如下:

+----+-------+
| a  |   b   |
+====+=======+
| 1  |  Hi   |
+----+-------+
| 2  | Hello |
+----+-------+

用户自定义的 source 和 sink

在某些情况下,你可能想要自定义 source 或 sink。目前,source 和 sink 必须使用 Java/Scala 实现,你可以定义一个 TableFactory ,然后通过 DDL 在 PyFlink 作业中来使用它们。更多详情,可查阅 Java/Scala 文档