Release Notes - Flink 1.8 #
These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.7 and Flink 1.8. Please read these notes carefully if you are planning to upgrade your Flink version to 1.8.
Continuous incremental cleanup of old Keyed State with TTL #
We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (FLINK-9510). This feature allowed to clean up and make inaccessible keyed state entries when accessing them. In addition state would now also being cleaned up when writing a savepoint/checkpoint.
Flink 1.8 introduces continous cleanup of old entries for both the RocksDB state backend (FLINK-10471) and the heap state backend (FLINK-10473). This means that old entries (according to the ttl setting) are continously being cleanup up.
New Support for Schema Migration when restoring Savepoints #
With Flink 1.7.0 we added support for changing the schema of state when using
(FLINK-10605). With Flink
1.8.0 we made great progress migrating all built-in
TypeSerializers to a new
serializer snapshot abstraction that theoretically allows schema migration. Of
the serializers that come with Flink, we now support schema migration for the
(FLINK-11485), and Java
(FLINK-11334), As well as
for Kryo in limited cases
Savepoint compatibility #
Savepoints from Flink 1.2 that contain a Scala
are not compatible with Flink 1.8 anymore because of an update in this
can get around this restriction by first upgrading to a version
between Flink 1.3 and Flink 1.7 and then updating to Flink 1.8.
We needed to switch to a custom build of RocksDB called FRocksDB because we need certain changes in RocksDB for supporting continuous state cleanup with TTL. The used build of FRocksDB is based on the upgraded version 5.17.2 of RocksDB. For Mac OS X, RocksDB version 5.17.2 is supported only for OS X version >= 10.13. See also: https://github.com/facebook/rocksdb/issues/4862.
Maven Dependencies #Changes to bundling of Hadoop libraries with Flink (
Convenience binaries that include hadoop are no longer released.
If a deployment relies on
flink-shaded-hadoop2 being included in
flink-dist, then you must manually download a pre-packaged Hadoop
jar from the optional components section of the download
page and copy it into the
/lib directory. Alternatively, a Flink distribution that includes
hadoop can be built by packaging
flink-dist and activating the
include-hadoop maven profile.
As hadoop is no longer included in
flink-dist by default, specifying
-DwithoutHadoop when packaging
flink-dist no longer impacts the build.
Configuration #TaskManager configuration (
TaskManagers now bind to the host IP address instead of the hostname
by default . This behaviour can be controlled by the configuration
taskmanager.network.bind-policy. If your Flink cluster should
experience inexplicable connection problems after upgrading, try to
taskmanager.network.bind-policy: name in your
to return to the pre-1.8 behaviour.
Table API #Deprecation of direct
Tableconstructor usage (
Flink 1.8 deprecates direct usage of the constructor of the
Table class in
the Table API. This constructor would previously be used to perform a join with
a lateral table. You should now use
This change is necessary for converting the Table class into an interface, which will make the API more maintainable and cleaner in the future.Introduction of new CSV format descriptor (
This release introduces a new format descriptor for CSV files that is compliant
with RFC 4180. The new descriptor is available as
org.apache.flink.table.descriptors.Csv. For now, this can only be used
together with the Kafka connector. The old descriptor is available as
org.apache.flink.table.descriptors.OldCsv for use with file system
In order to separate API from actual implementation, the static methods
TableEnvironment.getTableEnvironment() are deprecated. You should now use
Users that had a
flink-table dependency before, need to update their
flink-table-planner and the correct dependency of
flink-table-api-*, depending on whether Java or Scala is used: one of
Change to naming of Table API connector jars (
ExternalCatalogTable.builder() is deprecated in favour of
The naming scheme for kafka/elasticsearch6 sql-jars has been changed.
In maven terms, they no longer have the
sql-jar qualifier and the artifactId
is now prefixed with
flink-sql instead of
Null literals in the Table API need to be defined with
Null(type) from now on. The old approach is deprecated.
Connectors #Introduction of a new KafkaDeserializationSchema that give direct access to ConsumerRecord (
For the Flink
KafkaConsumers, we introduced a new
that gives direct access to the Kafka
ConsumerRecord. This subsumes the
KeyedSerializationSchema functionality, which is deprecated but still available
Starting from Flink 1.8.0, the
FlinkKafkaConsumer now always filters out
restored partitions that are no longer associated with a specified topic to
subscribe to in the restored execution. This behaviour did not exist in
previous versions of the
FlinkKafkaConsumer. If you wish to retain the
previous behaviour, please use the
disableFilterRestoredPartitionsWithSubscribedTopics() configuration method on
Consider this example: if you had a Kafka Consumer that was consuming
A, you did a savepoint, then changed your Kafka consumer
to instead consume from topic
B, and then restarted your job from
the savepoint. Before this change, your consumer would now consume
from both topic
B because it was stored in state that the
consumer was consuming from topic
A. With the change, your consumer
would only consume from topic
B after restore because we filter the
topics that are stored in state using the configured topics.
Miscellaneous Interface changes #The canEqual() method was dropped from the TypeSerializer interface (
canEqual() methods are usually used to make proper equality checks across
hierarchies of types. The
TypeSerializer actually doesn’t require this
property, so the method is now removed.
CompositeSerializerSnapshot utility class has been removed. You should
CompositeTypeSerializerSnapshot instead, for snapshots of composite
serializers that delegate serialization to multiple nested serializers. Please
for instructions on using
Memory management #
In Fink 1.8.0 and prior version, the managed memory fraction of taskmanager is controlled by
and with 0.7 as the default value. However, sometimes this will cause OOMs due to the fact that the default value of JVM
NewRatio is 2, which means the old generation occupied only 2/3 (0.66) of the heap memory. So if you run into
this case, please manually change this value to a lower value.