These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.11 and Flink 1.12. Please read these notes carefully if you are planning to upgrade your Flink version to 1.12.
Using unaligned checkpoints in Flink 1.12.0 together with tasks that have two or more inputs, or with union inputs for single-input tasks, can result in corrupted state.
This can happen if a new checkpoint is triggered before recovery is fully completed. For state to be corrupted, a task with two or more input gates must receive a checkpoint barrier at exactly the same time as this task finishes recovering spilled in-flight data. In such a case the new checkpoint can succeed with corrupted or missing in-flight data, which will result in various deserialisation or corrupted data stream errors when someone attempts to recover from the corrupted checkpoint.
ExecutionConfig#isLatencyTrackingEnabled was removed, you can use
Deprecated methods and methods without effect were removed:
-q flag from cli. The option had no effect.
The deprecated method
RuntimeContext#getAllAccumulators was removed. Please use
CheckpointConfig#setPreferCheckpointForRecovery method has been deprecated, because preferring older checkpoints over newer savepoints for recovery can lead to data loss.
Allow explicitly configuring time behaviour on
Before Flink 1.12 the KeyedStream.intervalJoin() operation changed its behavior based on the globally set Stream TimeCharacteristic. In Flink 1.12 we introduced explicit inEventTime() methods on IntervalJoin and the join no longer changes behaviour based on the global characteristic.
timeWindow() operations in DataStream API FLINK-19318
In Flink 1.12 we deprecated the
timeWindow() operations in the DataStream API. Please use
window(WindowAssigner) with either a
SlidingProcessingTimeWindows. For more information, see the deprecation description of
In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don’t need to call this method for enabling event-time support anymore. Explicitly using processing-time windows and timers works in event-time mode. If you need to disable watermarks, please use ExecutionConfig.setAutoWatermarkInterval(long). If you are using IngestionTime, please manually set an appropriate WatermarkStrategy. If you are using generic “time window” operations (for example KeyedStream.timeWindow()) that change behaviour based on the time characteristic, please use equivalent operations that explicitly specify processing time or event time.
Allow explicitly configuring time behaviour on CEP PatternStream FLINK-19326
Before Flink 1.12 the CEP operations changed their behavior based on the globally set Stream TimeCharacteristic. In Flink 1.12 we introduced explicit inEventTime() methods on PatternStream and the CEP operations no longer change their behaviour based on the global characteristic.
Remove remaining UdfAnalyzer configurations FLINK-13857
The ExecutionConfig#get/setCodeAnalysisMode methods and the SkipCodeAnalysis class were removed. They had no effect even before this change, so there is no need to use them.
The DataStream#split() operation has been removed after being marked as deprecated for a couple of versions. Please use Side Outputs instead.
DataStream#fold() method and all related classes FLINK-19035
The long-deprecated (Windowed)DataStream#fold was removed in 1.12. Please use other operations, such as (Windowed)DataStream#reduce, that perform better in distributed systems.
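As a rough illustration of such a migration, a fold that accumulated into a different result type can often be rewritten as a map to the result type followed by a reduce, provided the combining step is associative. The sketch below uses plain `java.util.stream` as a stand-in for the DataStream API (an assumption made so that it runs without Flink on the classpath); the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Optional;

public class FoldToReduce {
    // Old style: fold with a seed, accumulating Integer inputs into a String.
    static String foldStyle(List<Integer> input) {
        String acc = "";
        for (Integer value : input) {
            acc = acc + value;
        }
        return acc;
    }

    // Replacement shape: map each element to the result type, then reduce pairwise.
    static Optional<String> mapThenReduce(List<Integer> input) {
        return input.stream()
                .map(String::valueOf)
                .reduce((left, right) -> left + right);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3);
        System.out.println(foldStyle(input));
        System.out.println(mapThenReduce(input).orElse(""));
    }
}
```

One difference to keep in mind: a reduce has no seed value, so an empty input yields no result rather than the initial accumulator.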
The boolean isOuterSnapshotCompatible(TypeSerializer) method on the CompositeTypeSerializerSnapshot class has been deprecated, in favor of a new OuterSchemaCompatibility#resolveOuterSchemaCompatibility(TypeSerializer) method. Please implement that instead. Compared to the old method, the new method allows composite serializers to signal state schema migration based on outer schema and configuration.
Flink now relies on Scala Macros 2.1.1. This means that we no longer support Scala < 2.11.11.
The CREATE FUNCTION DDL for aggregate functions now uses the new type inference. It might be necessary to update existing implementations to the new reflective type extraction logic. Use StreamTableEnvironment.registerFunction for the old stack.
METADATA is a reserved keyword now. Use backticks to escape column names and other identifiers with this name.
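For queries that are assembled in code, the escaping can be done with a small helper. This is only a sketch: the helper name `quote` and the table name `events` are hypothetical, and doubling an embedded backtick is an assumption about the SQL dialect's escaping rule that should be checked against the SQL documentation.

```java
public class QuoteIdentifier {
    // Wraps an identifier in backticks. Doubling an embedded backtick is an
    // assumption about the dialect's escaping rule, not taken from these notes.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        // "events" is a hypothetical table with a column named "metadata".
        String query = "SELECT " + quote("metadata") + " FROM " + quote("events");
        System.out.println(query);
    }
}
```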
SQL queries that use the COLLECT function might need to be updated to the new type system.
In Flink 1.12 we removed the Kafka 0.10.x and 0.11.x connectors. Please use the universal Kafka connector which works with any Kafka cluster version after 0.10.2.x.
Please refer to the documentation to learn about how to upgrade the Flink Kafka Connector version.
The csv.line-delimiter option has been removed from the CSV format, because the line delimiter should be defined by the connector rather than the format. If you used this option in a previous Flink version, alter the affected tables to remove it when upgrading to Flink 1.12. Few users are expected to be affected.
The flink-avro-confluent-schema-registry module is no longer provided as a fat jar. You should include its dependencies in your job’s fat jar. SQL Client users can use the flink-sql-avro-confluent-schema-registry fat jar.
The default version of Avro in flink-avro module was upgraded to 1.10. If for some reason you need an older version (you have Avro coming from Hadoop, or you use classes generated from an older Avro version), please explicitly downgrade the Avro version in your project.
NOTE: We observed a decreased performance of the Avro 1.10 version compared to 1.8.2. If you are concerned with the performance and you are fine working with an older version of Avro, consider downgrading the Avro version.
The SQL Client jar was renamed to flink-sql-avro-1.13-SNAPSHOT.jar (previously flink-avro-1.13-SNAPSHOT-sql-jar.jar). Moreover, you no longer need to add Avro dependencies manually.
The default log4j configuration has changed: Besides the existing rolling of log files on startup of Flink, they also roll once they’ve reached a size of 100MB. Flink keeps a total of 10 log files, effectively limiting the total size of the log directory to 1GB (per Flink service logging to that directory).
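The size bound follows from the two limits taken together. The toy model below is a sketch of size-based rolling with bounded retention, not Flink's or log4j's actual implementation; the class name and constants are hypothetical and simply mirror the numbers quoted above (100MB per file, 10 files in total).

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RollingLogModel {
    static final long MAX_FILE_BYTES = 100L * 1024 * 1024; // 100MB per file
    static final int MAX_FILES = 10;                        // files kept in total

    // Sizes of the retained files, oldest first; the last entry is the active file.
    private final Deque<Long> fileSizes = new ArrayDeque<>();

    RollingLogModel() {
        fileSizes.addLast(0L);
    }

    // Appends bytes to the active file, rolling over when the size limit is reached.
    void append(long bytes) {
        long remaining = bytes;
        while (remaining > 0) {
            long active = fileSizes.removeLast();
            long written = Math.min(remaining, MAX_FILE_BYTES - active);
            active += written;
            remaining -= written;
            fileSizes.addLast(active);
            if (active == MAX_FILE_BYTES) {
                fileSizes.addLast(0L);           // start a new active file
                if (fileSizes.size() > MAX_FILES) {
                    fileSizes.removeFirst();     // drop the oldest file
                }
            }
        }
    }

    long totalBytes() {
        return fileSizes.stream().mapToLong(Long::longValue).sum();
    }

    int fileCount() {
        return fileSizes.size();
    }
}
```

However much is written, the model never holds more than `MAX_FILES * MAX_FILE_BYTES` bytes, which is the roughly 1GB bound per service.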
jemalloc is adopted as the default memory allocator in Flink’s Docker image to reduce issues with memory fragmentation. Users can roll back to using glibc by passing the ‘disable-jemalloc’ flag to the docker-entrypoint.sh script. For more details, please refer to the Flink on Docker documentation.
The Mesos dependency has been bumped from 1.0.1 to 1.7.0.
In Flink 1.12 we changed the behavior of the standalone scripts to issue a SIGKILL if a SIGTERM did not succeed in shutting down a Flink process.
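The escalation pattern can be pictured with the JDK's process API. This is a Java sketch of the same idea, not the code of the standalone scripts; the class name, the grace period, and the use of a `sleep` child process are assumptions for illustration. On typical Unix JVMs `destroy()` sends SIGTERM and `destroyForcibly()` sends SIGKILL, but the JDK leaves the exact behavior implementation-dependent.

```java
import java.util.concurrent.TimeUnit;

public class GracefulStop {
    // Asks the process to terminate and escalates if it does not exit in time.
    // Returns true if the process had to be killed forcibly.
    static boolean stop(Process process, long graceMillis) throws InterruptedException {
        process.destroy(); // polite request first
        if (process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            return false;  // exited within the grace period
        }
        process.destroyForcibly(); // escalate
        process.waitFor();
        return true;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical child process standing in for a Flink service.
        Process child = new ProcessBuilder("sleep", "60").start();
        boolean forced = stop(child, 5_000);
        System.out.println("forced kill needed: " + forced);
    }
}
```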
The semantics of submitting a job have slightly changed. The submission call returns almost immediately, with the job being in a new INITIALIZING state. Operations such as triggering a savepoint or retrieving the full job details are not available while the job is in that state.
Once the JobManager for that job has been created, the job is in CREATED state and all calls are available.
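Client code that triggers a savepoint right after submission may therefore need to wait until the job has left the INITIALIZING state. The sketch below shows one way to poll for that; the `JobState` enum and the `Supplier` are hypothetical stand-ins for whatever status call your client exposes, not Flink classes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class AwaitJobInitialization {
    // Simplified stand-in for the job states mentioned above.
    enum JobState { INITIALIZING, CREATED, RUNNING }

    // Polls until the job leaves INITIALIZING or the attempts run out,
    // and returns the last state that was observed.
    static JobState awaitInitialized(Supplier<JobState> statusCall, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        JobState state = statusCall.get();
        for (int attempt = 1; attempt < maxAttempts && state == JobState.INITIALIZING; attempt++) {
            Thread.sleep(pauseMillis);
            state = statusCall.get();
        }
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated status sequence; a real client would query the cluster instead.
        Iterator<JobState> simulated =
                List.of(JobState.INITIALIZING, JobState.INITIALIZING, JobState.CREATED).iterator();
        System.out.println(awaitInitialized(simulated::next, 10, 50L));
    }
}
```

A caller should still check the returned state, since the job may remain in INITIALIZING when the attempts are exhausted.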
python.fn-execution.framework.memory.size have been removed and so will not take effect any more. Besides, the default value of python.fn-execution.memory.managed has been changed to true and so managed memory will be used by default for Python workers. In cases where Python UDFs are used together with the RocksDB state backend in streaming or built-in batch algorithms in batch, the user can control how managed memory should be shared between data processing (RocksDB state backend or batch algorithms) and Python, by overwriting managed memory consumer weights.
Beginning with Flink 1.12, jobs are scheduled in units of pipelined regions. A pipelined region is a set of tasks connected by pipelined data exchanges. This means that, for streaming jobs which consist of multiple regions, the scheduler no longer waits for all tasks to acquire slots before starting to deploy tasks. Instead, any region can be deployed once it has acquired enough slots for the tasks within it. For batch jobs, tasks are no longer assigned slots and deployed individually. Instead, a task is deployed together with all other tasks in the same region, once the region has acquired enough slots.
The old scheduler can be enabled using the jobmanager.scheduler.scheduling-strategy: legacy setting.
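The all-or-nothing deployment of a region can be pictured with a toy model. This is a deliberately simplified sketch, not Flink's scheduler: it assumes each region needs a fixed number of slots, considers regions in insertion order, and uses hypothetical region names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RegionSchedulingModel {
    // Returns the regions that can be deployed with the given number of free slots.
    // A region is deployed only if all of its tasks fit; it is never split.
    static List<String> deployableRegions(Map<String, Integer> slotsNeededPerRegion, int freeSlots) {
        List<String> deployed = new ArrayList<>();
        int remaining = freeSlots;
        for (Map.Entry<String, Integer> region : slotsNeededPerRegion.entrySet()) {
            int needed = region.getValue();
            if (needed <= remaining) {
                remaining -= needed;
                deployed.add(region.getKey());
            }
        }
        return deployed;
    }

    public static void main(String[] args) {
        Map<String, Integer> regions = new LinkedHashMap<>();
        regions.put("region-a", 2);
        regions.put("region-b", 3);
        regions.put("region-c", 1);
        // With 4 free slots, region-b has to wait while the other two are deployed.
        System.out.println(deployableRegions(regions, 4));
    }
}
```

In this model `region-a` and `region-c` start without waiting for `region-b`, which corresponds to the behavior described for jobs with multiple regions.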