These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.10 and Flink 1.11. Please read these notes carefully if you are planning to upgrade your Flink version to 1.11.
The user can now submit applications and choose to execute their
main() method on the cluster rather than the client.
This allows for more light-weight application submission. For more details,
see the Application Mode documentation.
With FLINK-16657 the web submission logic changes and it exposes
the same behavior as submitting a job through the CLI in detached mode. This implies that, for instance, jobs based on
the DataSet API that were using sinks like
collect() will now throw an exception while
before the output was simply never printed. See also comments on related PR.
Flink project does not provide any updated “flink-shaded-hadoop-*” jars.
Users need to provide Hadoop dependencies through the HADOOP_CLASSPATH environment variable (recommended) or via
include-hadoop Maven profile has been removed.
flink-jsonare bundled in lib folder (FLINK-18173)
There is no need to download manually jar files for
flink-json formats as they are now bundled in the
Flink no longer supports the legacy scheduler.
jobmanager.scheduler: legacy will no longer work and fail with an
The only valid option for
jobmanager.scheduler is the default value
The user code class loader is being reused by the
TaskExecutor as long as there is at least a single slot allocated for the respective job.
This changes Flink’s recovery behaviour slightly so that it will not reload static fields.
The benefit is that this change drastically reduces pressure on the JVM’s metaspace.
slavefile name with
For Standalone Setups, the file with the worker nodes is no longer called
Previous setups that use the
stop-cluster.sh scripts need to rename that file.
The examples of
Dockerfiles and docker image
build.sh scripts have been removed from the Flink Github repository. The examples will no longer be maintained by community in the Flink Github repository, including the examples of integration with Bluemix. Therefore, the following modules have been deleted from the Flink Github repository:
Check the updated user documentation for Flink Docker integration instead. It now describes in detail how to use and customize the Flink official docker image: configuration options, logging, plugins, adding more dependencies and installing software. The documentation also includes examples for Session and Job cluster deployments with:
With FLIP-116, a new memory model has been introduced for the JobManager. New configuration options have been introduced to control the memory consumption of the JobManager process. This affects all types of deployments: standalone, YARN, Mesos, and the new active Kubernetes integration.
Please, check the user documentation for more details.
If you try to reuse your previous Flink configuration without any adjustments, the new memory model can result in differently computed memory parameters for the JVM and, thus, performance changes or even failures.
In order to start the JobManager process, you have to specify at least one of the following options
See also the migration guide for more information.
The following options are deprecated:
If these deprecated options are still used, they will be interpreted as one of the following new options in order to maintain backwards compatibility:
jobmanager.memory.heap.size) for standalone and Mesos deployments
jobmanager.memory.process.size) for containerized deployments (Kubernetes and Yarn)
The following options have been removed and have no effect anymore:
There is no container cut-off anymore.
metaspace memory of the JobManager’s JVM process are now limited by configurable values:
See also JVM Parameters.
Attention These new limits can produce the respective
OutOfMemoryError exceptions if they are not configured properly or there is a respective memory leak. See also the troubleshooting guide.
mesos.resourcemanager.tasks.mem option, deprecated in 1.10 in favour of
taskmanager.memory.process.size, has been completely removed and will have no effect anymore in 1.11+.
The default table planner has been changed to blink.
Due to various issues with packages
org.apache.flink.table.api.scala/java all classes from those packages were relocated.
Moreover the scala expressions were moved to
org.apache.flink.table.api as announced in Flink 1.9.
If you used one of:
And you do not convert to/from DataStream, switch to:
If you do convert to/from DataStream/DataSet, change your imports to one of:
For the Scala expressions use the import:
Additionally, if you use Scala’s implicit conversions to/from DataStream/DataSet, import
org.apache.flink.table.api.bridge.scala._ instead of
StreamTableSink implementations should remove
BatchTableSink implementations should rename
consumeDataSet and return
In previous versions,
StreamExecutionEnvironment.execute() can both trigger table and DataStream programs.
Since Flink 1.11.0, table programs can only be triggered by
Once table program is converted into DataStream program (through
toRetractStream() method), it can only be triggered by
In previous versions,
ExecutionEnvironment.execute() can both trigger table and DataSet programs for legacy batch planner.
Since Flink 1.11.0, batch table programs can only be triggered by
Once table program is converted into DataSet program (through
toDataSet() method), it can only be triggered by
An additional change flag called
RowKind was added to the
This changed the serialization format and will trigger a state migration.
The logging properties files
logback-yarn.xml have been renamed to
kubernetes-session.sh use these logging properties files.
StateTtlConfig#cleanupInBackground has been removed, because the method was deprecated and the background TTL was enabled by default in 1.10.
The TTL compaction filter in RocksDB has been enabled in 1.10 by default and it is now always enabled in 1.11+. Because of that the following option and methods have been removed in 1.11:
Starting from Flink 1.11 the
StateBackendFactory#createFromConfig interface now takes
ReadableConfig instead of
Configuration class is still a valid argument to that method, as it implements the ReadableConfig interface.
Implementors of custom
StateBackend should adjust their implementations.
ConfigurableOptionsFactory classes have been removed.
Please also recompile your application codes if any class extends
Since Flink-1.11 the option
setTotalOrderSeek will be enabled by default for RocksDB’s
This is in order to prevent user from miss using
For backward compatibility we support customizing
setTotalOrderSeek back to false if any performance regression observed (it shouldn’t happen according to our testing).
The default value of
state.backend.fs.memory-threshold has been increased from 1K to 20K to prevent too many small files created on remote FS for small states.
Jobs with large parallelism on source or stateful operators may have “JM OOM” or “RPC message exceeding maximum frame size” problem with this change.
If you encounter such issues please manually set the configuration back to 1K.
DataTypes can be configured with some parameters, e.g., precision. However in previous releases, the precision provided by users was not taking any effect and default value for the precision was being used. To avoid confusion since Flink 1.11 exceptions will be thrown if the value is not supported to make it more visible to users. Changes include:
TimeTypecan only be
VarCharTypecan only be
DecimalTypecan only be
LocalZonedTimestampTypecan only be
DayTimeIntervalTypecan only be
fractionalPrecisioncan only be
YearMonthIntervalTypecan only be
yearPrecisioncan only be
ZonedTimestampTypeis not supported
All MetricReporters that come with Flink have been converted to plugins.
They should no longer be placed into
/lib directory (doing so may result in dependency conflicts!), but
The DataDog metrics reporter now reports counts as the number of events over the reporting interval, instead of the total count. This aligns the count semantics with the DataDog documentation.
Flink now uses Log4j2 by default. Users who wish to revert back to Log4j1 can find instructions to do so in the logging documentation.
Requesting an unavailable log or stdout file from the JobManager’s HTTP server returns status code 404 now.
In previous releases, the HTTP server would return a file with
(file unavailable) as its content.
Note that the metric
lastCheckpointAlignmentBuffered has been removed, because the upstream task will not send any data after emitting a checkpoint barrier until the alignment has been completed on the downstream side.
The web UI still displays this value but it is always
The Kafka 0.8 and 0.9 connectors are no longer under active development and were removed.
The Elasticsearch 2 connector is no longer under active development and was removed. Prior version of these connectors will continue to work with Flink.
KafkaPartitioner was removed. Please see the release notes of Flink 1.3.0 how to migrate from that interface.
By default, if there is an official filesystem plugin for a given schema, it will not be allowed to use fallback filesystem factories (like HADOOP libraries on the classpath) to load it.
fs.allowed-fallback-filesystems configuration option to override this behaviour.
org.apache.flink.core.fs.FileSystem#getKind method has been formally deprecated, as it was not used by Flink.
Failures in synchronous part of checkpointing (like an exceptions thrown by an operator) will fail its Task (and job) immediately, regardless of the configuration parameters.
Since Flink 1.5 such failures could be ignored by setting
setTolerableCheckpointFailureNumber(...) or its deprecated
Now both options will only affect asynchronous failures.
Checkpoint timeouts will now be treated as normal checkpoint failures and checked against value configured by
DataStream API no longer provides
StreamTask#getCheckpointLock method, which was deprecated in Flink 1.10.
Users should use
MailboxExecutor to run actions that require synchronization with the task’s thread (e.g. collecting output produced by an external thread).
MailboxExecutor#tryYield methods can be used for actions that should give control to other actions temporarily (equivalent of
StreamTask#getCheckpointLock().wait()), if the current operator is blocked.
MailboxExecutor can be accessed by using
YieldingOperatorFactory. Example usage can be found in the
SourceFunction.SourceContext.getCheckpointLock is still available for custom implementations of
Starting from Flink 1.11.0, the
flink-streaming-java module does not have a dependency on
flink-clients anymore. If your project was depending on this transitive dependency you now have to add
flink-clients as an explicit dependency.
AsyncWaitOperator will be allowed to be chained by default with all operators, except of tasks with
This mostly revert limitation introduced as a bug fix for FLINK-13063.
The argument type of methods
#createResultPartitionWriters are adjusted from
List for satisfying the order guarantee requirement in unaligned checkpoint.
It will break the compatibility if users already implemented a custom
ShuffleService based on
boolean isOuterSnapshotCompatible(TypeSerializer) on the
CompositeTypeSerializerSnapshot class has been deprecated, in favor of a new
OuterSchemaCompatibility resolveOuterSchemaCompatibility(TypeSerializer) method.
Please implement that instead.
Compared to the old method, the new method allows composite serializers to signal state schema migration based on outer schema and configuration.
TimestampExtractor was removed along with API methods in the DataStream API.
Please use the new
WatermarkStrategies for working with timestamps and watermarks in the DataStream API.
ListCheckpointed interface has been deprecated because it uses Java Serialization for checkpointing state which is problematic for savepoint compatibility.
CheckpointedFunction interface instead, which gives more control over state serialization.
We removed deprecated state access methods
This means that some code that was compiled against Flink 1.10 will not work with a Flink 1.11 cluster.
An example of this is our Kafka connector which internally used