pyflink.common package
Module contents
Important classes used by both Flink Streaming and Batch API:
ExecutionConfig: A config to define the behavior of the program execution.
class pyflink.common.Configuration(other=None, j_configuration=None)
Bases: object
Lightweight configuration object which stores key/value pairs.
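A minimal usage sketch based on the methods below (the key names are illustrative, not predefined Flink options):

from pyflink.common import Configuration

config = Configuration()
config.set_integer("my.custom.retries", 3)     # store an int value
config.set_boolean("my.custom.verbose", True)  # store a boolean value

# Getters take a default that is returned when the key is absent.
retries = config.get_integer("my.custom.retries", 1)    # -> 3
name = config.get_string("my.custom.name", "unnamed")   # -> "unnamed" (key not set)

assert config.contains_key("my.custom.retries")
config.remove_config("my.custom.verbose")

# Copy all entries into a plain dict.
as_dict = {}
config.add_all_to_dict(as_dict)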
add_all(other, prefix=None)
Adds all entries from the given configuration into this configuration. The keys are prepended with the given prefix if one is provided.
- Parameters
other (Configuration) – The configuration whose entries are added to this configuration.
prefix (str) – Optional, the prefix to prepend.
add_all_to_dict(target_dict)
Adds all entries in this configuration to the given dict.
- Parameters
target_dict (dict) – The dict to be updated.
contains_key(key)
Checks whether there is an entry with the specified key.
- Parameters
key (str) – Key of entry.
- Returns
True if the key is stored, False otherwise.
- Return type
bool
get_boolean(key, default_value)
Returns the value associated with the given key as a boolean.
- Parameters
key (str) – The key pointing to the associated value.
default_value (bool) – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
- Return type
bool
get_bytearray(key, default_value)
Returns the value associated with the given key as a byte array.
- Parameters
key (str) – The key pointing to the associated value.
default_value (bytearray) – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
- Return type
bytearray
get_float(key, default_value)
Returns the value associated with the given key as a float.
- Parameters
key (str) – The key pointing to the associated value.
default_value (float) – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
- Return type
float
get_integer(key, default_value)
Returns the value associated with the given key as an integer.
- Parameters
key (str) – The key pointing to the associated value.
default_value (int) – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
- Return type
int
get_string(key, default_value)
Returns the value associated with the given key as a string.
- Parameters
key (str) – The key pointing to the associated value.
default_value (str) – The default value which is returned in case there is no value associated with the given key.
- Returns
The (default) value associated with the given key.
- Return type
str
key_set()
Returns the keys of all key/value pairs stored inside this configuration object.
- Returns
The keys of all key/value pairs stored inside this configuration object.
- Return type
set
remove_config(key)
Removes the given config key from the configuration.
- Parameters
key (str) – The config key to remove.
- Returns
True if the config key was removed, False otherwise.
- Return type
bool
set_boolean(key, value)
Adds the given key/value pair to the configuration object.
- Parameters
key (str) – The key of the key/value pair to be added.
value (bool) – The value of the key/value pair to be added.
set_bytearray(key, value)
Adds the given byte array to the configuration object.
- Parameters
key (str) – The key under which the bytes are added.
value (bytearray) – The byte array to be added.
set_float(key, value)
Adds the given key/value pair to the configuration object.
- Parameters
key (str) – The key of the key/value pair to be added.
value (float) – The value of the key/value pair to be added.
set_integer(key, value)
Adds the given key/value pair to the configuration object.
- Parameters
key (str) – The key of the key/value pair to be added.
value (int) – The value of the key/value pair to be added.
class pyflink.common.ExecutionConfig(j_execution_config)
Bases: object
A config to define the behavior of the program execution. It allows defining (among other options) the following settings:
- The default parallelism of the program, i.e., how many parallel tasks to use for all functions that do not define a specific value directly.
- The number of retries in the case of failed executions.
- The delay between execution retries.
- The ExecutionMode of the program: Batch or Pipelined. The default execution mode is ExecutionMode.PIPELINED.
- Enabling or disabling the “closure cleaner”. The closure cleaner pre-processes the implementations of functions. In case they are (anonymous) inner classes, it removes unused references to the enclosing class to fix certain serialization-related problems and to reduce the size of the closure.
- Registering types and serializers, to increase the efficiency of handling generic types and POJOs. This is usually only needed when the functions return not only the types declared in their signature, but also subclasses of those types.
PARALLELISM_DEFAULT = -1
The flag value indicating use of the default parallelism. This value can be used to reset the parallelism back to the default state.
PARALLELISM_UNKNOWN = -2
The flag value indicating an unknown or unset parallelism. This value is not a valid parallelism and indicates that the parallelism should remain unchanged.
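An ExecutionConfig is normally obtained from an execution environment rather than constructed directly; a minimal sketch (mirroring the env.get_config() pattern used in the set_restart_strategy example below):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import ExecutionConfig

env = StreamExecutionEnvironment.get_execution_environment()
config = env.get_config()

config.set_parallelism(8)  # default parallelism for all operators
# PARALLELISM_DEFAULT resets an explicitly set parallelism back to the default.
config.set_parallelism(ExecutionConfig.PARALLELISM_DEFAULT)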
add_default_kryo_serializer(type_class_name, serializer_class_name)
Adds a new Kryo default serializer to the runtime.
Example:
>>> config.add_default_kryo_serializer("com.aaa.bbb.PojoClass",
...                                    "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully-qualified Java class name of the types serialized with the given serializer.
serializer_class_name – The fully-qualified Java class name of the serializer to use.
disable_auto_generated_uids()
Disables auto-generated UIDs. Forces users to manually specify UIDs on DataStream applications.
It is highly recommended that users specify UIDs before deploying to production since they are used to match state in savepoints to operators in a job. Because auto-generated IDs are likely to change when modifying a job, specifying custom IDs allows an application to evolve over time without discarding state.
disable_auto_type_registration()
Disables the automatic registration of all types in the user program with Kryo.
disable_force_avro()
Disables the Apache Avro serializer as the forced serializer for POJOs.
disable_generic_types()
Disables the use of generic types (types that would be serialized via Kryo). If this option is used, Flink will throw an UnsupportedOperationException whenever it encounters a data type that would go through Kryo for serialization.
Disabling generic types can be helpful to eagerly find and eliminate the use of types that would go through Kryo serialization during runtime. Rather than checking types individually, using this option will throw exceptions eagerly in the places where generic types are used.
Important: We recommend using this option only during development and pre-production phases, not in actual production use. The application program and/or the input data may be such that new, previously unseen types occur at some point. In that case, setting this option would cause the program to fail.
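A sketch of the intended development-time workflow (config obtained from the environment as in the sketch above):

from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

# Fail fast during development if any type would fall back to Kryo:
config.disable_generic_types()
# Re-enable for production, where new, previously unseen types may appear:
config.enable_generic_types()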
disable_object_reuse()
Disables reusing objects that Flink internally uses for deserialization and passing data to user-code functions.
- Returns
This object.
disable_sysout_logging()
Disables the printing of progress update messages to stdout.
- Returns
This object.
enable_auto_generated_uids()
Enables the Flink runtime to auto-generate UIDs for operators.
enable_closure_cleaner()
Enables the ClosureCleaner. This analyzes user code functions and sets fields to null that are not used. This will in most cases make closures or anonymous inner classes serializable that were not serializable due to some Scala or Java implementation artifact. User code must be serializable because it needs to be sent to worker nodes.
- Returns
This object.
enable_force_avro()
Forces Flink to use the Apache Avro serializer for POJOs.
Important: Make sure to include the flink-avro module.
enable_force_kryo()
Forces the TypeExtractor to use the Kryo serializer for POJOs even though they could be analyzed as a POJO. In some cases this might be preferable, for example when using interfaces with subclasses that cannot be analyzed as a POJO.
enable_generic_types()
Enables the use of generic types which are serialized via Kryo.
Generic types are enabled by default.
enable_object_reuse()
Enables reusing objects that Flink internally uses for deserialization and passing data to user-code functions. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.
- Returns
This object.
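A sketch of toggling object reuse; it is only safe when user functions do not hold on to input records across invocations:

from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

config.enable_object_reuse()             # opt in for lower deserialization overhead
assert config.is_object_reuse_enabled()
config.disable_object_reuse()            # back to the safe default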
enable_sysout_logging()
Enables the printing of progress update messages to stdout.
- Returns
This object.
get_auto_watermark_interval()
Returns the interval of the automatic watermark emission.
- Returns
The integer value interval in milliseconds of the automatic watermark emission.
get_default_input_dependency_constraint()
Gets the default input dependency constraint for vertex scheduling. It indicates when a task should be scheduled considering the status of its inputs.
The default constraint is InputDependencyConstraint.ANY.
- Returns
The input dependency constraint of this job. The possible constraints are InputDependencyConstraint.ANY and InputDependencyConstraint.ALL.
get_default_kryo_serializer_classes()
Returns the registered default Kryo serializer classes.
- Returns
A dict whose keys are the fully-qualified Java class names of the registered types and whose values are the fully-qualified Java class names of the default Kryo serializer classes.
get_execution_mode()
Gets the execution mode used to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
The default execution mode is ExecutionMode.PIPELINED.
- Returns
The execution mode for the program.
get_global_job_parameters()
Gets the current configuration dict.
- Returns
The configuration dict.
get_latency_tracking_interval()
Returns the latency tracking interval.
- Returns
The latency tracking interval in milliseconds.
get_max_parallelism()
Gets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Returns
Maximum degree of parallelism.
get_parallelism()
Gets the parallelism with which operations are executed by default. Operations can individually override this value to use a specific parallelism.
Other operations may need to run with a different parallelism - for example calling a reduce operation over the entire data set will involve an operation that runs with a parallelism of one (the final reduce to the single result value).
- Returns
The parallelism used by operations, unless they override that value. This method returns ExecutionConfig.PARALLELISM_DEFAULT if the environment’s default parallelism should be used.
get_registered_kryo_types()
Returns the registered Kryo types.
- Returns
The list of fully-qualified Java class names of the registered Kryo types.
get_registered_pojo_types()
Returns the registered POJO types.
- Returns
The list of fully-qualified Java class names of the registered POJO types.
get_registered_types_with_kryo_serializer_classes()
Returns the registered types with their Kryo serializer classes.
- Returns
A dict whose keys are the fully-qualified Java class names of the registered types and whose values are the fully-qualified Java class names of the Kryo serializer classes.
get_restart_strategy()
Returns the restart strategy which has been set for the current job.
- Returns
The specified restart configuration.
get_task_cancellation_interval()
Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
- Returns
The integer value interval in milliseconds.
get_task_cancellation_timeout()
Returns the timeout (in milliseconds) after which an ongoing task cancellation leads to a fatal TaskManager error.
The value 0 means that the timeout is disabled. In this case a stuck cancellation will not lead to a fatal error.
- Returns
The timeout in milliseconds.
has_auto_generated_uids_enabled()
Checks whether auto-generated UIDs are supported.
Auto-generated UIDs are enabled by default.
- Returns
Boolean value that represents whether auto-generated UIDs are supported.
has_generic_types_disabled()
Checks whether generic types are supported. Generic types are types that go through Kryo during serialization.
Generic types are enabled by default.
- Returns
Boolean value that represents whether generic types are supported.
is_auto_type_registration_disabled()
Returns whether the automatic registration of all types in the user program with Kryo is disabled.
- Returns
True means auto type registration is disabled and False means enabled.
is_closure_cleaner_enabled()
Returns whether the ClosureCleaner is enabled.
- Returns
True means enabled and False means disabled.
is_force_avro_enabled()
Returns whether Apache Avro is the default serializer for POJOs.
- Returns
Boolean value that represents whether Apache Avro is the default serializer for POJOs.
is_force_kryo_enabled()
- Returns
Boolean value that represents whether the usage of the Kryo serializer for all POJOs is enabled.
is_object_reuse_enabled()
Returns whether object reuse has been enabled or disabled.
- Returns
Boolean value that represents whether object reuse has been enabled or disabled.
is_sysout_logging_enabled()
Gets whether progress update messages should be printed to stdout.
- Returns
True if progress update messages should be printed, False otherwise.
is_use_snapshot_compression()
Returns whether compression (Snappy) for keyed state in full checkpoints and savepoints is enabled.
- Returns
True means enabled and False means disabled.
register_kryo_type(type_class_name)
Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
Example:
>>> config.register_kryo_type("com.aaa.bbb.KryoClass")
- Parameters
type_class_name – The fully-qualified Java class name of the type to register.
register_pojo_type(type_class_name)
Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.
Example:
>>> config.register_pojo_type("com.aaa.bbb.PojoClass")
- Parameters
type_class_name – The fully-qualified Java class name of the type to register.
register_type_with_kryo_serializer(type_class_name, serializer_class_name)
Registers the given serializer via its class as a serializer for the given type at the KryoSerializer.
Example:
>>> config.register_type_with_kryo_serializer("com.aaa.bbb.PojoClass",
...                                           "com.aaa.bbb.Serializer")
- Parameters
type_class_name – The fully-qualified Java class name of the types serialized with the given serializer.
serializer_class_name – The fully-qualified Java class name of the serializer to use.
set_auto_watermark_interval(interval)
Sets the interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time-based windowing.
- Parameters
interval – The integer value interval between watermarks in milliseconds.
- Returns
This object.
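A sketch pairing the setter with its getter (config obtained from the environment as in the earlier sketches):

from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

config.set_auto_watermark_interval(200)            # emit watermarks every 200 ms
assert config.get_auto_watermark_interval() == 200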
set_default_input_dependency_constraint(input_dependency_constraint)
Sets the default input dependency constraint for vertex scheduling. It indicates when a task should be scheduled considering the status of its inputs.
The default constraint is InputDependencyConstraint.ANY.
Example:
>>> config.set_default_input_dependency_constraint(InputDependencyConstraint.ALL)
- Parameters
input_dependency_constraint – The input dependency constraint. The constraints could be InputDependencyConstraint.ANY or InputDependencyConstraint.ALL.
set_execution_mode(execution_mode)
Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
The default execution mode is ExecutionMode.PIPELINED.
Example:
>>> config.set_execution_mode(ExecutionMode.BATCH)
- Parameters
execution_mode – The execution mode to use. The execution mode could be ExecutionMode.PIPELINED, ExecutionMode.PIPELINED_FORCED, ExecutionMode.BATCH or ExecutionMode.BATCH_FORCED.
set_global_job_parameters(global_job_parameters_dict)
Registers a custom, serializable user configuration dict.
Example:
>>> config.set_global_job_parameters({"environment.checkpoint_interval": "1000"})
- Parameters
global_job_parameters_dict – Custom user configuration dict.
set_latency_tracking_interval(interval)
Sets the interval for sending latency tracking marks from the sources to the sinks.
Flink will send latency tracking marks from the sources at the specified interval. Setting a tracking interval <= 0 disables the latency tracking.
- Parameters
interval – Integer value interval in milliseconds.
- Returns
This object.
set_max_parallelism(max_parallelism)
Sets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Parameters
max_parallelism – Maximum degree of parallelism to be used for the program.
set_parallelism(parallelism)
Sets the parallelism for operations executed through this environment. Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
This method overrides the default parallelism for this environment. The local execution environment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR/Python file, the default parallelism is the one configured for that setup.
- Parameters
parallelism – The parallelism to use.
- Returns
This object.
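A sketch combining default and maximum parallelism (the values are illustrative):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
config = env.get_config()

config.set_parallelism(4)        # all operators default to 4 parallel instances
assert config.get_parallelism() == 4
config.set_max_parallelism(128)  # upper limit for dynamic scaling; defines key groups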
set_restart_strategy(restart_strategy_configuration)
Sets the restart strategy to be used for recovery.
Example:
>>> config = env.get_config()
>>> config.set_restart_strategy(RestartStrategies.fixed_delay_restart(10, 1000))
The restart strategy configurations are all created from RestartStrategies.
- Parameters
restart_strategy_configuration – Configuration defining the restart strategy to use.
set_task_cancellation_interval(interval)
Sets the configuration parameter specifying the interval (in milliseconds) between consecutive attempts to cancel a running task.
- Parameters
interval – The integer value interval in milliseconds.
- Returns
This object.
set_task_cancellation_timeout(timeout)
Sets the timeout (in milliseconds) after which an ongoing task cancellation is considered failed, leading to a fatal TaskManager error.
The cluster default is configured via TaskManagerOptions#TASK_CANCELLATION_TIMEOUT.
The value 0 disables the timeout. In this case a stuck cancellation will not lead to a fatal error.
- Parameters
timeout – The task cancellation timeout (in milliseconds).
- Returns
This object.
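A sketch of the two cancellation knobs together (the intervals are illustrative):

from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

config.set_task_cancellation_interval(30000)  # retry cancelling every 30 s
config.set_task_cancellation_timeout(180000)  # escalate to a fatal error after 3 min
config.set_task_cancellation_timeout(0)       # or disable the escalation entirely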
class pyflink.common.ExecutionMode
Bases: object
The execution mode specifies how a batch program is executed in terms of data exchange: pipelining or batched.
PIPELINED: Executes the program in a pipelined fashion (including shuffles and broadcasts), except for data exchanges that are susceptible to deadlocks when pipelining. These data exchanges are performed in a batch manner. An example of situations that are susceptible to deadlocks (when executed in a pipelined manner) are data flows that branch (one data set consumed by multiple operations) and re-join later.
PIPELINED_FORCED: Executes the program in a pipelined fashion (including shuffles and broadcasts), including data exchanges that are susceptible to deadlocks when executed via pipelining. Usually, PIPELINED is the preferable option, which pipelines most data exchanges and only uses batch data exchanges in situations that are susceptible to deadlocks. This option should only be used with care and only in situations where the programmer is sure that the program is safe for full pipelining and that Flink was too conservative when choosing the batch exchange at a certain point.
BATCH: This mode executes all shuffles and broadcasts in a batch fashion, while pipelining data between operations that exchange data only locally between one producer and one consumer.
BATCH_FORCED: This mode executes the program in a strict batch way, including all points where data is forwarded locally from one producer to one consumer. This mode is typically more expensive to execute than the BATCH mode. It does guarantee that no successive operations are ever executed concurrently.
BATCH = 2
BATCH_FORCED = 3
PIPELINED = 0
PIPELINED_FORCED = 1
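A short sketch of selecting a mode via ExecutionConfig.set_execution_mode() (documented above):

from pyflink.common import ExecutionMode
from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

# Prefer batch data exchanges for a bounded, batch-style program:
config.set_execution_mode(ExecutionMode.BATCH)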
class pyflink.common.InputDependencyConstraint
Bases: object
This constraint indicates when a task should be scheduled considering the status of its inputs.
ANY: Schedule the task if any input is consumable.
ALL: Schedule the task if all the inputs are consumable.
ALL = 1
ANY = 0
class pyflink.common.RestartStrategies
Bases: object
This class defines methods to generate RestartStrategyConfigurations. These configurations are used to create RestartStrategies at runtime.
The RestartStrategyConfigurations are used to decouple the core module from the runtime module.
class FailureRateRestartStrategyConfiguration(max_failure_rate=None, failure_interval=None, delay_between_attempts_interval=None, j_restart_strategy=None)
Bases: pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing a failure rate restart strategy.
class FallbackRestartStrategyConfiguration(j_restart_strategy=None)
Bases: pyflink.common.restart_strategy.RestartStrategyConfiguration
Restart strategy configuration that can be used by jobs to fall back to the cluster-level restart strategy. Especially useful when one has a custom restart strategy implementation set via flink-conf.yaml.
class FixedDelayRestartStrategyConfiguration(restart_attempts=None, delay_between_attempts_interval=None, j_restart_strategy=None)
Bases: pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing a fixed delay restart strategy.
class NoRestartStrategyConfiguration(j_restart_strategy=None)
Bases: pyflink.common.restart_strategy.RestartStrategyConfiguration
Configuration representing no restart strategy.
static failure_rate_restart(failure_rate, failure_interval, delay_interval)
Generates a FailureRateRestartStrategyConfiguration.
- Parameters
failure_rate – Maximum number of restarts in the given interval failure_interval before failing a job.
failure_interval – Time interval for failures; the input can be an integer value in milliseconds or a datetime.timedelta object.
delay_interval – Delay in-between restart attempts; the input can be an integer value in milliseconds or a datetime.timedelta object.
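A sketch wiring this into ExecutionConfig.set_restart_strategy() (intervals given as datetime.timedelta; plain milliseconds also work):

from datetime import timedelta

from pyflink.common import RestartStrategies
from pyflink.datastream import StreamExecutionEnvironment

config = StreamExecutionEnvironment.get_execution_environment().get_config()

# At most 3 restarts within any 5-minute window, waiting 10 s between attempts:
config.set_restart_strategy(
    RestartStrategies.failure_rate_restart(3, timedelta(minutes=5),
                                           timedelta(seconds=10)))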
static fixed_delay_restart(restart_attempts, delay_between_attempts)
Generates a FixedDelayRestartStrategyConfiguration.
- Parameters
restart_attempts – Number of restart attempts for the FixedDelayRestartStrategy.
delay_between_attempts – Delay in-between restart attempts for the FixedDelayRestartStrategy; the input can be an integer value in milliseconds or a datetime.timedelta object.
- Returns
The FixedDelayRestartStrategyConfiguration.
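A sketch showing both accepted delay forms (values illustrative):

from datetime import timedelta

from pyflink.common import RestartStrategies

by_millis = RestartStrategies.fixed_delay_restart(3, 10000)                 # 10 s in ms
by_delta = RestartStrategies.fixed_delay_restart(3, timedelta(seconds=10))  # equivalent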
class pyflink.common.RestartStrategyConfiguration(j_restart_strategy_configuration)
Bases: object
Abstract configuration for restart strategies.
class pyflink.common.SqlDialect
Bases: object
Enumeration of valid SQL compatibility modes.
In most cases, the built-in compatibility mode should be sufficient. For some features, e.g. the “INSERT INTO T PARTITION(a=’xxx’) …” grammar, you may need to switch to the Hive dialect.
We may introduce other SQL dialects in the future.
DEFAULT: Flink’s default SQL behavior.
HIVE: SQL dialect that allows some Apache Hive specific grammar.
Note: We might never support all of the Hive grammar. See the documentation for supported features.
DEFAULT = 0
HIVE = 1
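A sketch of switching dialects; note that set_sql_dialect lives on the Table API’s TableConfig, and the table environment setup here is illustrative:

from pyflink.common import SqlDialect
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(settings)  # any TableEnvironment works the same way

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)     # allow Hive-specific grammar
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)  # back to Flink's default behavior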