This documentation is for an older version of Apache Flink. We recommend visiting the latest stable version.

Legacy Features

As Flink SQL has matured, some features have been replaced with more modern, better-functioning substitutes. These legacy features remain documented here for users who have not yet upgraded, or are unable to upgrade, to the more modern variant.

Temporal Table Function

The temporal table function is the legacy way of defining something akin to a versioned table that can be used in a temporal table join. In new queries, please define temporal joins using versioned tables, as described above.

Unlike a versioned table, a temporal table function can only be defined on top of an append-only stream; it does not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.

Defining a Temporal Table Function

A temporal table function can be defined on top of an append-only stream using the Table API. The function is registered with one or more key columns and a time attribute used for versioning.

Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.

SELECT * FROM currency_rates;

update_time   currency   rate
============= =========  ====
09:00:00      Yen        102
09:00:00      Euro       114
09:00:00      USD        1
11:15:00      Euro       119
11:49:00      Pounds     108

Using the Table API, we can register this stream using currency for the key and update_time as the versioning time attribute.

// Java
TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");

tEnv.registerFunction("rates", rates);

// Scala
val rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency")

tEnv.registerFunction("rates", rates)

Temporal Table Function Join

Once defined, a temporal table function is used as a standard table function. Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.
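The point-in-time lookup described here can be illustrated without Flink. The following is a minimal plain-Java sketch, assuming that the version valid at a given time is the latest row for that key whose update_time is not later than that time. The class `TemporalLookupSketch` and its methods are hypothetical names for illustration and are not part of the Flink API; the rows are the sample currency_rates data from above.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Plain-Java illustration only; none of these names belong to the Flink API. */
public class TemporalLookupSketch {

    // currency -> (update_time -> rate), with versions kept sorted by time
    private final Map<String, TreeMap<LocalTime, Long>> versions = new HashMap<>();

    /** Appends one row of the append-only rate history. */
    public void append(LocalTime updateTime, String currency, long rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(updateTime, rate);
    }

    /** Returns the rate that was valid for the currency at the given time, if any. */
    public Optional<Long> rateAt(String currency, LocalTime time) {
        TreeMap<LocalTime, Long> history = versions.get(currency);
        if (history == null) {
            return Optional.empty();
        }
        // floorEntry picks the latest version whose time is <= the requested time
        Map.Entry<LocalTime, Long> entry = history.floorEntry(time);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    /** Loads the sample currency_rates rows shown earlier on this page. */
    public static TemporalLookupSketch sampleRates() {
        TemporalLookupSketch rates = new TemporalLookupSketch();
        rates.append(LocalTime.of(9, 0), "Yen", 102);
        rates.append(LocalTime.of(9, 0), "Euro", 114);
        rates.append(LocalTime.of(9, 0), "USD", 1);
        rates.append(LocalTime.of(11, 15), "Euro", 119);
        rates.append(LocalTime.of(11, 49), "Pounds", 108);
        return rates;
    }

    public static void main(String[] args) {
        TemporalLookupSketch rates = sampleRates();
        System.out.println(rates.rateAt("Euro", LocalTime.of(10, 15)));  // Optional[114]
        System.out.println(rates.rateAt("Euro", LocalTime.of(11, 30)));  // Optional[119]
        System.out.println(rates.rateAt("Pounds", LocalTime.of(10, 0))); // Optional.empty
    }
}
```

In this sketch, a key with no version at or before the requested time yields an empty result, which loosely corresponds to a probe-side row finding no match in an inner join.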

Consider an append-only table orders that tracks customer orders in different currencies.

SELECT * FROM orders;

order_time amount currency
========== ====== =========
10:15        2    Euro
10:30        1    USD
10:32       50    Yen
10:52        3    Euro
11:04        5    USD

Given these tables, we would like to convert orders to a common currency, USD.

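-- SQL
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency

// Java (the Table API needs distinct field names on both sides of a join,
// so this assumes order columns carry an o_ prefix and rate columns an r_ prefix)
Table result = orders
    .joinLateral(call("rates", $("o_order_time")), $("o_currency").isEqual($("r_currency")))
    .select($("o_amount").times($("r_rate")).sum().as("amount"));

// Scala (same prefix assumption as the Java variant)
val result = orders
    .joinLateral(call("rates", $"o_order_time"), $"o_currency" === $"r_currency")
    .select(($"o_amount" * $"r_rate").sum as "amount")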
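As a hand check of the query, assume each order is matched with the most recent rate for its currency whose update_time is not later than the order's order_time. This is an interpretation of the join semantics described on this page, not output captured from Flink. Under that assumption, the sample rows give:

```latex
\begin{aligned}
\text{amount} &= 2 \cdot 114 + 1 \cdot 1 + 50 \cdot 102 + 3 \cdot 114 + 5 \cdot 1 \\
              &= 228 + 1 + 5100 + 342 + 5 \\
              &= 5676
\end{aligned}
```

The Euro rate of 119 (11:15) and the Pounds rate (11:49) do not contribute, because every sample order was placed before those updates.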
