Source code for pyflink.datastream.connectors.kafka

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Union, List, Set, Callable, Any, Optional

from py4j.java_gateway import JavaObject, get_java_class

from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \
    Types, Row
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
    StreamTransformer
from pyflink.datastream.functions import SinkFunction, SourceFunction
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray, get_field, get_field_value

__all__ = [
    'FlinkKafkaConsumer',
    'FlinkKafkaProducer',
    'KafkaSource',
    'KafkaSourceBuilder',
    'KafkaSink',
    'KafkaSinkBuilder',
    'Semantic',
    'KafkaTopicPartition',
    'KafkaOffsetsInitializer',
    'KafkaOffsetResetStrategy',
    'KafkaRecordSerializationSchema',
    'KafkaRecordSerializationSchemaBuilder',
    'KafkaTopicSelector'
]


# ---- FlinkKafkaConsumer ----

class FlinkKafkaConsumerBase(SourceFunction, ABC):
    """
    Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
    all kafka versions.

    The Kafka version specific behavior is defined mainly in the specific subclasses.
    """

    def __init__(self, j_flink_kafka_consumer):
        super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer)

    def set_commit_offsets_on_checkpoints(self,
                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
        """
        Specifies whether or not the consumer should commit offsets back to kafka on checkpoints.
        This setting will only have effect if checkpointing is enabled for the job. If checkpointing
        isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
        property settings will be used.
        """
        self._j_function = self._j_function \
            .setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
        return self

    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from the earliest offset for all partitions. This
        lets the consumer ignore any committed group offsets in Zookeeper/ Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromEarliest()
        return self

    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consuer to start reading from the latest offset for all partitions. This lets
        the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromLatest()
        return self

    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading partitions from a specified timestamp. The specified
        timestamp must be before the current timestamp. This lets the consumer ignore any committed
        group offsets in Zookeeper / Kafka brokers.

        The consumer will look up the earliest offset whose timestamp is greater than or equal to
        the specific timestamp from Kafka. If there's no such offset, the consumer will use the
        latest offset to read data from Kafka.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.

        :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds for
                                          epoch.
        """
        self._j_function = self._j_function.setStartFromTimestamp(
            startup_offsets_timestamp)
        return self

    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from any committed group offsets found in Zookeeper/
        Kafka brokers. The 'group.id' property must be set in the configuration properties. If no
        offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the
        configuration properties will be used for the partition.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromGroupOffsets()
        return self

    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
        """
        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
        restored partitions that are no longer associated with the current specified topics or topic
        pattern to subscribe to.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function \
            .disableFilterRestoredPartitionsWithSubscribedTopics()
        return self

    def get_produced_type(self) -> TypeInformation:
        return typeinfo._from_java_type(self._j_function.getProducedType())


def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    if not isinstance(topics, list):
        topics = [topics]
    gateway = get_gateway()
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in properties.items():
        j_properties.setProperty(key, value)

    j_flink_kafka_consumer = j_consumer_clz(topics,
                                            deserialization_schema._j_deserialization_schema,
                                            j_properties)
    return j_flink_kafka_consumer


[docs]class FlinkKafkaConsumer(FlinkKafkaConsumerBase): """ The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one or more Kafka partitions. The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost during a failure, and that the computation processes elements 'exactly once. (These guarantees naturally assume that Kafka itself does not lose any data.) Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer has consumed a topic. Please refer to Kafka's documentation for the available configuration properties: http://kafka.apache.org/documentation.html#newconsumerconfigs """ def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema, properties: Dict): """ Creates a new Kafka streaming source consumer for Kafka 0.10.x. This constructor allows passing multiple topics to the consumer. :param topics: The Kafka topics to read from. :param deserialization_schema: The de-/serializer used to convert between Kafka's byte messages and Flink's objects. :param properties: The properties that are used to configure both the fetcher and the offset handler. """ warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", DeprecationWarning) JFlinkKafkaConsumer = get_gateway().jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, JFlinkKafkaConsumer) super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
# ---- FlinkKafkaProducer ----
[docs]class Semantic(Enum): """ Semantics that can be chosen. :data: `EXACTLY_ONCE`: The Flink producer will write all messages in a Kafka transaction that will be committed to the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint complete notifications are running late, FlinkKafkaProducer can run out of FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshot- State() requests will fail and the FlinkKafkaProducer will keep using the FlinkKafkaProducer from previous checkpoint. To decrease chances of failing checkpoints there are four options: 1. decrease number of max concurrent checkpoints 2. make checkpoints mre reliable (so that they complete faster) 3. increase delay between checkpoints 4. increase size of FlinkKafkaProducers pool :data: `AT_LEAST_ONCE`: The Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. :data: `NONE`: Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of failure. """ EXACTLY_ONCE = 0, AT_LEAST_ONCE = 1, NONE = 2 def _to_j_semantic(self): JSemantic = get_gateway().jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic return getattr(JSemantic, self.name)
class FlinkKafkaProducerBase(SinkFunction, ABC): """ Flink Sink to produce data into a Kafka topic. Please note that this producer provides at-least-once reliability guarantees when checkpoints are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn;t provid any reliability guarantees. """ def __init__(self, j_flink_kafka_producer): super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer) def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase': """ Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). :param log_failures_only: The flag to indicate logging-only on exceptions. """ self._j_function.setLogFailuresOnly(log_failures_only) return self def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase': """ If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint) """ self._j_function.setFlushOnCheckpoint(flush_on_checkpoint) return self def set_write_timestamp_to_kafka(self, write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase': """ If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. Timestamps must be positive for Kafka to accept them. :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written to Kafka. """ self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka) return self
[docs]class FlinkKafkaProducer(FlinkKafkaProducerBase): """ Flink Sink to produce data into a Kafka topic. By default producer will use AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to Flink's Kafka connector documentation. """ def __init__(self, topic: str, serialization_schema: SerializationSchema, producer_config: Dict, kafka_producer_pool_size: int = 5, semantic=Semantic.AT_LEAST_ONCE): """ Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition). :param topic: ID of the Kafka topic. :param serialization_schema: User defined key-less serialization schema. :param producer_config: Properties with the producer configuration. """ gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in producer_config.items(): j_properties.setProperty(key, value) JFlinkKafkaProducer = gateway.jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer j_flink_kafka_producer = JFlinkKafkaProducer( topic, serialization_schema._j_serialization_schema, j_properties, None, semantic._to_j_semantic(), kafka_producer_pool_size) super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer) def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer': """ Disables the propagation of exceptions thrown when committing presumably timed out Kafka transactions during recovery of the job. If a Kafka transaction is timed out, a commit will never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions will still be logged to inform the user that data loss might have occurred. Note that we use the System.currentTimeMillis() to track the age of a transaction. Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will attempt at least one commit of the transaction before giving up. :return: This FlinkKafkaProducer. """ self._j_function.ignoreFailuresAfterTransactionTimeout() return self
# ---- KafkaSource ----
[docs]class KafkaSource(Source): """ The Source implementation of Kafka. Please use a :class:`KafkaSourceBuilder` to construct a :class:`KafkaSource`. The following example shows how to create a KafkaSource emitting records of String type. :: >>> source = KafkaSource \\ ... .builder() \\ ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ ... .set_group_id('MY_GROUP') \\ ... .set_topics('TOPIC1', 'TOPIC2') \\ ... .set_value_only_deserializer(SimpleStringSchema()) \\ ... .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\ ... .build() .. versionadded:: 1.16.0 """ def __init__(self, j_kafka_source: JavaObject): super().__init__(j_kafka_source) @staticmethod def builder() -> 'KafkaSourceBuilder': """ Get a kafkaSourceBuilder to build a :class:`KafkaSource`. :return: a Kafka source builder. """ return KafkaSourceBuilder()
[docs]class KafkaSourceBuilder(object): """ The builder class for :class:`KafkaSource` to make it easier for the users to construct a :class:`KafkaSource`. The following example shows the minimum setup to create a KafkaSource that reads the String values from a Kafka topic. :: >>> source = KafkaSource.builder() \\ ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ ... .set_topics('TOPIC1', 'TOPIC2') \\ ... .set_value_only_deserializer(SimpleStringSchema()) \\ ... .build() The bootstrap servers, topics/partitions to consume, and the record deserializer are required fields that must be set. To specify the starting offsets of the KafkaSource, one can call :meth:`set_starting_offsets`. By default, the KafkaSource runs in an CONTINUOUS_UNBOUNDED mode and never stops until the Flink job is canceled or fails. To let the KafkaSource run in CONTINUOUS_UNBOUNDED but stops at some given offsets, one can call :meth:`set_stopping_offsets`. For example the following KafkaSource stops after it consumes up to the latest partition offsets at the point when the Flink started. :: >>> source = KafkaSource.builder() \\ ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ ... .set_topics('TOPIC1', 'TOPIC2') \\ ... .set_value_only_deserializer(SimpleStringSchema()) \\ ... .set_unbounded(KafkaOffsetsInitializer.latest()) \\ ... .build() .. versionadded:: 1.16.0 """ def __init__(self): self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.source \ .KafkaSource.builder() def build(self) -> 'KafkaSource': return KafkaSource(self._j_builder.build()) def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSourceBuilder': """ Sets the bootstrap servers for the KafkaConsumer of the KafkaSource. :param bootstrap_servers: the bootstrap servers of the Kafka cluster. :return: this KafkaSourceBuilder. """ self._j_builder.setBootstrapServers(bootstrap_servers) return self def set_group_id(self, group_id: str) -> 'KafkaSourceBuilder': """ Sets the consumer group id of the KafkaSource. :param group_id: the group id of the KafkaSource. :return: this KafkaSourceBuilder. """ self._j_builder.setGroupId(group_id) return self def set_topics(self, *topics: str) -> 'KafkaSourceBuilder': """ Set a list of topics the KafkaSource should consume from. All the topics in the list should have existed in the Kafka cluster. Otherwise, an exception will be thrown. To allow some topics to be created lazily, please use :meth:`set_topic_pattern` instead. :param topics: the list of topics to consume from. :return: this KafkaSourceBuilder. """ self._j_builder.setTopics(to_jarray(get_gateway().jvm.java.lang.String, topics)) return self def set_topic_pattern(self, topic_pattern: str) -> 'KafkaSourceBuilder': """ Set a topic pattern to consume from use the java Pattern. For grammar, check out `JavaDoc <https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html>`_ . :param topic_pattern: the pattern of the topic name to consume from. :return: this KafkaSourceBuilder. """ self._j_builder.setTopicPattern(get_gateway().jvm.java.util.regex .Pattern.compile(topic_pattern)) return self def set_partitions(self, partitions: Set['KafkaTopicPartition']) -> 'KafkaSourceBuilder': """ Set a set of partitions to consume from. Example: :: >>> KafkaSource.builder().set_partitions({ ... KafkaTopicPartition('TOPIC1', 0), ... KafkaTopicPartition('TOPIC1', 1), ... }) :param partitions: the set of partitions to consume from. :return: this KafkaSourceBuilder. """ j_set = get_gateway().jvm.java.util.HashSet() for tp in partitions: j_set.add(tp._to_j_topic_partition()) self._j_builder.setPartitions(j_set) return self def set_starting_offsets(self, starting_offsets_initializer: 'KafkaOffsetsInitializer') \ -> 'KafkaSourceBuilder': """ Specify from which offsets the KafkaSource should start consume from by providing an :class:`KafkaOffsetsInitializer`. The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the box. Currently, customized offset initializer is not supported in PyFlink. * :meth:`KafkaOffsetsInitializer.earliest` - starting from the earliest offsets. This is also the default offset initializer of the KafkaSource for starting offsets. * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of the consumer group. If there is no committed offsets, starting from the offsets specified by the :class:`KafkaOffsetResetStrategy`. * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each partition. * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each partition. Note that the guarantee here is that all the records in Kafka whose timestamp is greater than the given starting timestamp will be consumed. However, it is possible that some consumer records whose timestamp is smaller than the given starting timestamp are also consumed. :param starting_offsets_initializer: the :class:`KafkaOffsetsInitializer` setting the starting offsets for the Source. :return: this KafkaSourceBuilder. """ self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer) return self def set_unbounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \ -> 'KafkaSourceBuilder': """ By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as a streaming source but still stops at some point, one can set an :class:`KafkaOffsetsInitializer` to specify the stopping offsets for each partition. When all the partitions have reached their stopping offsets, the KafkaSource will then exit. This method is different from :meth:`set_bounded` that after setting the stopping offsets with this method, KafkaSource will still be CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets specified by the stopping offset initializer. The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the box. Currently, customized offset initializer is not supported in PyFlink. * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of the consumer group. If there is no committed offsets, starting from the offsets specified by the :class:`KafkaOffsetResetStrategy`. * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each partition. * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each partition. Note that the guarantee here is that all the records in Kafka whose timestamp is greater than the given starting timestamp will be consumed. However, it is possible that some consumer records whose timestamp is smaller than the given starting timestamp are also consumed. :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the stopping offsets. :return: this KafkaSourceBuilder """ self._j_builder.setUnbounded(stopping_offsets_initializer._j_initializer) return self def set_bounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \ -> 'KafkaSourceBuilder': """ By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in BOUNDED manner and stop at some point, one can set an :class:`KafkaOffsetsInitializer` to specify the stopping offsets for each partition. When all the partitions have reached their stopping offsets, the KafkaSource will then exit. This method is different from :meth:`set_unbounded` that after setting the stopping offsets with this method, :meth:`KafkaSource.get_boundedness` will return BOUNDED instead of CONTINUOUS_UNBOUNDED. The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the box. Currently, customized offset initializer is not supported in PyFlink. * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of the consumer group. If there is no committed offsets, starting from the offsets specified by the :class:`KafkaOffsetResetStrategy`. * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each partition. * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each partition. Note that the guarantee here is that all the records in Kafka whose timestamp is greater than the given starting timestamp will be consumed. However, it is possible that some consumer records whose timestamp is smaller than the given starting timestamp are also consumed. :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the stopping offsets. :return: this KafkaSourceBuilder """ self._j_builder.setBounded(stopping_offsets_initializer._j_initializer) return self def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \ -> 'KafkaSourceBuilder': """ Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the value of Kafka's ConsumerRecord. The other information (e.g. key) in a ConsumerRecord will be ignored. :param deserialization_schema: the :class:`DeserializationSchema` to use for deserialization. :return: this KafkaSourceBuilder. """ self._j_builder.setValueOnlyDeserializer(deserialization_schema._j_deserialization_schema) return self def set_client_id_prefix(self, prefix: str) -> 'KafkaSourceBuilder': """ Sets the client id prefix of this KafkaSource. :param prefix: the client id prefix to use for this KafkaSource. :return: this KafkaSourceBuilder. """ self._j_builder.setClientIdPrefix(prefix) return self def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder': """ Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found in ConsumerConfig and KafkaSourceOptions. Note that the following keys will be overridden by the builder when the KafkaSource is created. * ``key.deserializer`` is always set to ByteArrayDeserializer. * ``value.deserializer`` is always set to ByteArrayDeserializer. * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default :meth:`KafkaOffsetsInitializer.earliest`. * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been invoked. :param key: the key of the property. :param value: the value of the property. :return: this KafkaSourceBuilder. """ self._j_builder.setProperty(key, value) return self def set_properties(self, props: Dict) -> 'KafkaSourceBuilder': """ Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found in ConsumerConfig and KafkaSourceOptions. Note that the following keys will be overridden by the builder when the KafkaSource is created. * ``key.deserializer`` is always set to ByteArrayDeserializer. * ``value.deserializer`` is always set to ByteArrayDeserializer. * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default :meth:`KafkaOffsetsInitializer.earliest`. * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been invoked. * ``client.id`` is overridden to "client.id.prefix-RANDOM_LONG", or "group.id-RANDOM_LONG" if the client id prefix is not set. :param props: the properties to set for the KafkaSource. :return: this KafkaSourceBuilder. """ gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in props.items(): j_properties.setProperty(key, value) self._j_builder.setProperties(j_properties) return self
[docs]class KafkaTopicPartition(object): """ Corresponding to Java ``org.apache.kafka.common.TopicPartition`` class. Example: :: >>> topic_partition = KafkaTopicPartition('TOPIC1', 0) .. versionadded:: 1.16.0 """ def __init__(self, topic: str, partition: int): self._topic = topic self._partition = partition def _to_j_topic_partition(self): jvm = get_gateway().jvm return jvm.org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition( self._topic, self._partition) def __eq__(self, other): if not isinstance(other, KafkaTopicPartition): return False return self._topic == other._topic and self._partition == other._partition def __hash__(self): return 31 * (31 + self._partition) + hash(self._topic)
[docs]class KafkaOffsetResetStrategy(Enum): """ Corresponding to Java ``org.apache.kafka.client.consumer.OffsetResetStrategy`` class. .. versionadded:: 1.16.0 """ LATEST = 0 EARLIEST = 1 NONE = 2 def _to_j_offset_reset_strategy(self): JOffsetResetStrategy = get_gateway().jvm.org.apache.flink.kafka.shaded.org.apache.kafka.\ clients.consumer.OffsetResetStrategy return getattr(JOffsetResetStrategy, self.name)
[docs]class KafkaOffsetsInitializer(object): """ An interface for users to specify the starting / stopping offset of a KafkaPartitionSplit. .. versionadded:: 1.16.0 """ def __init__(self, j_initializer: JavaObject): self._j_initializer = j_initializer @staticmethod def committed_offsets( offset_reset_strategy: 'KafkaOffsetResetStrategy' = KafkaOffsetResetStrategy.NONE) -> \ 'KafkaOffsetsInitializer': """ Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the committed offsets. An exception will be thrown at runtime if there is no committed offsets. An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets if the committed offsets does not exist. :param offset_reset_strategy: the offset reset strategy to use when the committed offsets do not exist. :return: an offset initializer which initialize the offsets to the committed offsets. """ JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source.\ enumerator.initializer.OffsetsInitializer return KafkaOffsetsInitializer(JOffsetsInitializer.committedOffsets( offset_reset_strategy._to_j_offset_reset_strategy())) @staticmethod def timestamp(timestamp: int) -> 'KafkaOffsetsInitializer': """ Get an :class:`KafkaOffsetsInitializer` which initializes the offsets in each partition so that the initialized offset is the offset of the first record whose record timestamp is greater than or equals the give timestamp. :param timestamp: the timestamp to start the consumption. :return: an :class:`OffsetsInitializer` which initializes the offsets based on the given timestamp. """ JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ enumerator.initializer.OffsetsInitializer return KafkaOffsetsInitializer(JOffsetsInitializer.timestamp(timestamp)) @staticmethod def earliest() -> 'KafkaOffsetsInitializer': """ Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest available offsets of each partition. :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest available offsets. """ JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ enumerator.initializer.OffsetsInitializer return KafkaOffsetsInitializer(JOffsetsInitializer.earliest()) @staticmethod def latest() -> 'KafkaOffsetsInitializer': """ Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest offsets of each partition. :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest offsets. """ JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ enumerator.initializer.OffsetsInitializer return KafkaOffsetsInitializer(JOffsetsInitializer.latest()) @staticmethod def offsets(offsets: Dict['KafkaTopicPartition', int], offset_reset_strategy: 'KafkaOffsetResetStrategy' = KafkaOffsetResetStrategy.EARLIEST) -> 'KafkaOffsetsInitializer': """ Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified offsets. An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets in case the specified offset is out of range. Example: :: >>> KafkaOffsetsInitializer.offsets({ ... KafkaTopicPartition('TOPIC1', 0): 0, ... KafkaTopicPartition('TOPIC1', 1): 10000 ... }, KafkaOffsetResetStrategy.EARLIEST) :param offsets: the specified offsets for each partition. :param offset_reset_strategy: the :class:`KafkaOffsetResetStrategy` to use when the specified offset is out of range. :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified offsets. """ jvm = get_gateway().jvm j_map_wrapper = jvm.org.apache.flink.python.util.HashMapWrapper( None, get_java_class(jvm.Long)) for tp, offset in offsets.items(): j_map_wrapper.put(tp._to_j_topic_partition(), offset) JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ enumerator.initializer.OffsetsInitializer return KafkaOffsetsInitializer(JOffsetsInitializer.offsets( j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy()))
[docs]class KafkaSink(Sink, SupportsPreprocessing): """ Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees described by :class:`DeliveryGuarantee`. * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated in case of a Flink failure. * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink restarts. * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates will be seen in case of a Flink restart. However, this delays record writing effectively until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure that you use unique transactional id prefixes across your applications running on the same Kafka cluster such that multiple running jobs do not interfere in their transactions! Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an uncommitted transaction. .. versionadded:: 1.16.0 """ def __init__(self, j_kafka_sink, transformer: Optional[StreamTransformer] = None): super().__init__(j_kafka_sink) self._transformer = transformer @staticmethod def builder() -> 'KafkaSinkBuilder': """ Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`. """ return KafkaSinkBuilder() def get_transformer(self) -> Optional[StreamTransformer]: return self._transformer
[docs]class KafkaSinkBuilder(object): """ Builder to construct :class:`KafkaSink`. The following example shows the minimum setup to create a KafkaSink that writes String values to a Kafka topic. :: >>> record_serializer = KafkaRecordSerializationSchema.builder() \\ ... .set_topic(MY_SINK_TOPIC) \\ ... .set_value_serialization_schema(SimpleStringSchema()) \\ ... .build() >>> sink = KafkaSink.builder() \\ ... .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\ ... .set_record_serializer(record_serializer) \\ ... .build() One can also configure different :class:`DeliveryGuarantee` by using :meth:`set_delivery_guarantee` but keep in mind when using :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix :meth:`set_transactional_id_prefix`. .. versionadded:: 1.16.0 """ def __init__(self): jvm = get_gateway().jvm self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder() self._preprocessing = None def build(self) -> 'KafkaSink': """ Constructs the :class:`KafkaSink` with the configured properties. """ return KafkaSink(self._j_builder.build(), self._preprocessing) def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSinkBuilder': """ Sets the Kafka bootstrap servers. :param bootstrap_servers: A comma separated list of valid URIs to reach the Kafka broker. """ self._j_builder.setBootstrapServers(bootstrap_servers) return self def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'KafkaSinkBuilder': """ Sets the wanted :class:`DeliveryGuarantee`. The default delivery guarantee is :attr:`DeliveryGuarantee.NONE`. :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`. """ self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee()) return self def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 'KafkaSinkBuilder': """ Sets the prefix for all created transactionalIds if :attr:`DeliveryGuarantee.EXACTLY_ONCE` is configured. It is mandatory to always set this value with :attr:`DeliveryGuarantee.EXACTLY_ONCE` to prevent corrupted transactions if multiple jobs using the KafkaSink run against the same Kafka Cluster. The default prefix is ``"kafka-sink"``. The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) formatted with UTF-8. It is important to keep the prefix stable across application restarts. If the prefix changes it might happen that lingering transactions are not correctly aborted and newly written messages are not immediately consumable until transactions timeout. :param transactional_id_prefix: The transactional id prefix. """ self._j_builder.setTransactionalIdPrefix(transactional_id_prefix) return self def set_record_serializer(self, record_serializer: 'KafkaRecordSerializationSchema') \ -> 'KafkaSinkBuilder': """ Sets the :class:`KafkaRecordSerializationSchema` that transforms incoming records to kafka producer records. :param record_serializer: The :class:`KafkaRecordSerializationSchema`. """ # NOTE: If topic selector is a generated first-column selector, do extra preprocessing j_topic_selector = get_field_value(record_serializer._j_serialization_schema, 'topicSelector') if ( j_topic_selector.getClass().getCanonicalName() == 'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.' 'CachingTopicSelector' ) and ( get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() is not None and (get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() .startswith('com.sun.proxy') or get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() .startswith('jdk.proxy')) ): record_serializer._wrap_serialization_schema() self._preprocessing = record_serializer._build_preprocessing() self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema) return self def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder': """ Sets kafka producer config. :param key: Kafka producer config key. :param value: Kafka producer config value. """ self._j_builder.setProperty(key, value) return self
[docs]class KafkaRecordSerializationSchema(SerializationSchema): """ A serialization schema which defines how to convert the stream record to kafka producer record. .. versionadded:: 1.16.0 """ def __init__(self, j_serialization_schema, topic_selector: Optional['KafkaTopicSelector'] = None): super().__init__(j_serialization_schema) self._topic_selector = topic_selector @staticmethod def builder() -> 'KafkaRecordSerializationSchemaBuilder': """ Creates a default schema builder to provide common building blocks i.e. key serialization, value serialization, topic selection. """ return KafkaRecordSerializationSchemaBuilder() def _wrap_serialization_schema(self): jvm = get_gateway().jvm def _wrap_schema(field_name): j_schema_field = get_field(self._j_serialization_schema.getClass(), field_name) if j_schema_field.get(self._j_serialization_schema) is not None: j_schema_field.set( self._j_serialization_schema, jvm.org.apache.flink.python.util.PythonConnectorUtils .SecondColumnSerializationSchema( j_schema_field.get(self._j_serialization_schema) ) ) _wrap_schema('keySerializationSchema') _wrap_schema('valueSerializationSchema') def _build_preprocessing(self) -> StreamTransformer: class SelectTopicTransformer(StreamTransformer): def __init__(self, topic_selector: KafkaTopicSelector): self._topic_selector = topic_selector def apply(self, ds): output_type = Types.ROW([Types.STRING(), ds.get_type()]) return ds.map(lambda v: Row(self._topic_selector.apply(v), v), output_type=output_type) return SelectTopicTransformer(self._topic_selector)
[docs]class KafkaRecordSerializationSchemaBuilder(object): """ Builder to construct :class:`KafkaRecordSerializationSchema`. Example: :: >>> KafkaRecordSerializationSchema.builder() \\ ... .set_topic('topic') \\ ... .set_key_serialization_schema(SimpleStringSchema()) \\ ... .set_value_serialization_schema(SimpleStringSchema()) \\ ... .build() And the sink topic can be calculated dynamically from each record: :: >>> KafkaRecordSerializationSchema.builder() \\ ... .set_topic_selector(lambda row: 'topic-' + row['category']) \\ ... .set_value_serialization_schema( ... JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\ ... .build() It is necessary to configure exactly one serialization method for the value and a topic. .. versionadded:: 1.16.0 """ def __init__(self): jvm = get_gateway().jvm self._j_builder = jvm.org.apache.flink.connector.kafka.sink \ .KafkaRecordSerializationSchemaBuilder() self._fixed_topic = True # type: bool self._topic_selector = None # type: Optional[KafkaTopicSelector] self._key_serialization_schema = None # type: Optional[SerializationSchema] self._value_serialization_schema = None # type: Optional[SerializationSchema] def build(self) -> 'KafkaRecordSerializationSchema': """ Constructs the :class:`KafkaRecordSerializationSchemaBuilder` with the configured properties. """ if self._fixed_topic: return KafkaRecordSerializationSchema(self._j_builder.build()) else: return KafkaRecordSerializationSchema(self._j_builder.build(), self._topic_selector) def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder': """ Sets a fixed topic which used as destination for all records. :param topic: The fixed topic. """ self._j_builder.setTopic(topic) self._fixed_topic = True return self def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 'KafkaTopicSelector'])\ -> 'KafkaRecordSerializationSchemaBuilder': """ Sets a topic selector which computes the target topic for every incoming record. :param topic_selector: A :class:`KafkaTopicSelector` implementation or a function that consumes each incoming record and return the topic string. """ if not isinstance(topic_selector, KafkaTopicSelector) and not callable(topic_selector): raise TypeError('topic_selector must be KafkaTopicSelector or a callable') if not isinstance(topic_selector, KafkaTopicSelector): class TopicSelectorFunctionAdapter(KafkaTopicSelector): def __init__(self, f: Callable[[Any], str]): self._f = f def apply(self, data) -> str: return self._f(data) topic_selector = TopicSelectorFunctionAdapter(topic_selector) jvm = get_gateway().jvm self._j_builder.setTopicSelector( jvm.org.apache.flink.python.util.PythonConnectorUtils.createFirstColumnTopicSelector( get_java_class(jvm.org.apache.flink.connector.kafka.sink.TopicSelector) ) ) self._fixed_topic = False self._topic_selector = topic_selector return self def set_key_serialization_schema(self, key_serialization_schema: SerializationSchema) \ -> 'KafkaRecordSerializationSchemaBuilder': """ Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the key of the producer record. The key serialization is optional, if not set, the key of the producer record will be null. :param key_serialization_schema: The :class:`SerializationSchema` to serialize each incoming record as the key of producer record. """ self._key_serialization_schema = key_serialization_schema self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema) return self def set_value_serialization_schema(self, value_serialization_schema: SerializationSchema) \ -> 'KafkaRecordSerializationSchemaBuilder': """ Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the value of the producer record. The value serialization is required. :param value_serialization_schema: The :class:`SerializationSchema` to serialize each data record as the key of producer record. """ self._value_serialization_schema = value_serialization_schema self._j_builder.setValueSerializationSchema( value_serialization_schema._j_serialization_schema) return self
[docs]class KafkaTopicSelector(ABC): """ Select topic for an incoming record .. versionadded:: 1.16.0 """ @abstractmethod def apply(self, data) -> str: pass