Apache Kafka #

Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink’s universal Kafka connector and provides exactly-once processing semantics. Kafka is configured in the module specification of your application.

Kafka Ingress Spec #

A Kafka ingress defines an input point that reads records from one or more topics.

version: "3.0"

module:
  meta:
    type: remote
  spec:
    ingresses:
      - ingress:
          meta:
            type: io.statefun.kafka/ingress
            id: com.example/users
          spec:
            address: kafka-broker:9092
            consumerGroupId: my-consumer-group
            startupPosition:
              type: earliest
            topics:
              - topic: messages-1
                valueType: com.example/User
                targets:
                  - com.example.fns/greeter

The ingress also accepts properties that configure the Kafka client directly, under ingress.spec.properties. Refer to the Kafka consumer configuration documentation for the full list of available properties. Settings passed through named paths, such as ingress.spec.address, take precedence and overwrite the corresponding entries in the provided properties.
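
The precedence rule can be pictured as a simple merge. The sketch below is illustrative only and is not the Stateful Functions implementation: the helper name effective_consumer_config is hypothetical, the mapping of address to bootstrap.servers and consumerGroupId to group.id is an assumption about how named paths translate to Kafka client keys, and properties is assumed to use the same list-of-maps shape as the egress example.

```python
def effective_consumer_config(spec: dict) -> dict:
    """Merge raw client properties with named spec fields; named fields win."""
    config = {}
    # Assumed shape: `properties` is a list of single-entry maps.
    for entry in spec.get("properties", []):
        config.update(entry)
    # Assumed mapping from named paths to Kafka client keys.
    named_paths = {
        "address": "bootstrap.servers",
        "consumerGroupId": "group.id",
    }
    for field, kafka_key in named_paths.items():
        if field in spec:
            # Applied last, so the named path overwrites the raw property.
            config[kafka_key] = spec[field]
    return config


spec = {
    "address": "kafka-broker:9092",
    "consumerGroupId": "my-consumer-group",
    "properties": [
        {"bootstrap.servers": "other-host:9092"},
        {"fetch.min.bytes": "1024"},
    ],
}
config = effective_consumer_config(spec)
```

Here the bootstrap.servers entry supplied through properties is replaced by the value of address, while fetch.min.bytes passes through unchanged.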

Startup Position #

The ingress allows configuring the startup position to be one of the following:

From Group Offset (default) #

Starts from offsets that were committed to Kafka for the specified consumer group.

startupPosition:
  type: group-offsets

Earliest #

Starts from the earliest offset.

startupPosition:
  type: earliest

Latest #

Starts from the latest offset.

startupPosition:
  type: latest

Specific Offsets #

Starts from specific offsets, defined as a map of partitions to their target starting offset.

startupPosition:
  type: specific-offsets
  offsets:
    - user-topic/0: 91
    - user-topic/1: 11
    - user-topic/2: 8
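
To make the shape of the offsets list concrete, here is a minimal sketch that turns entries of the form topic/partition: offset into a lookup table. The helper parse_specific_offsets is hypothetical and only mirrors the YAML structure shown above; it is not part of any Stateful Functions SDK.

```python
def parse_specific_offsets(entries):
    """Turn [{"topic/partition": offset}, ...] into {(topic, partition): offset}."""
    offsets = {}
    for entry in entries:
        for key, offset in entry.items():
            # The partition number follows the last "/" in the key.
            topic, _, partition = key.rpartition("/")
            if not topic or not partition.isdigit():
                raise ValueError(f"expected 'topic/partition', got {key!r}")
            offsets[(topic, int(partition))] = int(offset)
    return offsets


offsets = parse_specific_offsets([
    {"user-topic/0": 91},
    {"user-topic/1": 11},
    {"user-topic/2": 8},
])
```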

Date #

Starts from offsets whose ingestion time is at or after the specified date.

startupPosition:
  type: date
  date: 2020-02-01 04:15:00.00 Z
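
Kafka clients look up offsets by timestamp in epoch milliseconds, so it can help to see what the date above corresponds to. The sketch below is an assumption-laden illustration: the format string is inferred from the single example value, and the helper startup_date_to_epoch_millis is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def startup_date_to_epoch_millis(text: str) -> int:
    """Parse a date such as '2020-02-01 04:15:00.00 Z' into epoch milliseconds."""
    # On Python 3.7+, %z accepts a literal "Z" as UTC.
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f %z")
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


millis = startup_date_to_epoch_millis("2020-02-01 04:15:00.00 Z")
# millis == 1580530500000
```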

On startup, the specified offset for a partition may be out of range or may not exist. This can happen when the ingress is configured to start from group offsets, specific offsets, or a date. In that case, the ingress falls back to the position configured with ingress.spec.autoOffsetResetPosition, which may be set to either latest or earliest. The default is latest.
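
The fallback behavior can be sketched for a single partition as follows. This is a simplified model and not the connector's code: resolve_start_offset is a hypothetical helper, and earliest and latest stand for the first available offset and the log-end offset of the partition.

```python
def resolve_start_offset(requested, earliest, latest, auto_offset_reset="latest"):
    """Pick the starting offset for one partition.

    requested: offset derived from group offsets, specific offsets, or a date,
               or None if no such offset exists.
    """
    if requested is not None and earliest <= requested <= latest:
        return requested
    # Missing or out-of-range offset: fall back to the reset position.
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError(f"unsupported reset position: {auto_offset_reset!r}")


in_range = resolve_start_offset(91, earliest=0, latest=200)
missing = resolve_start_offset(None, earliest=5, latest=200)
expired = resolve_start_offset(3, earliest=5, latest=200, auto_offset_reset="earliest")
```

In this model, in_range keeps the requested offset 91, missing falls back to the latest offset 200, and expired falls back to the earliest offset 5.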

Kafka Egress Spec #

A Kafka egress defines an output point where functions can write records to one or more topics.

version: "3.0"

module:
  meta:
    type: remote
  spec:
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: example/output-messages
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
            properties:
              - foo.config: bar

Please refer to the Kafka producer configuration documentation for the full list of available properties.

Kafka Egress and Fault Tolerance #

With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose among three modes of operation.

None #

Nothing is guaranteed: produced records can be lost or duplicated.

deliverySemantic:
  type: none

At Least Once #

Stateful Functions guarantees that no records are lost, but records can be duplicated.

deliverySemantic:
  type: at-least-once

Exactly Once #

Stateful Functions uses Kafka transactions to provide exactly-once semantics.

deliverySemantic:
  type: exactly-once
  transactionTimeoutMillis: 900000 # 15 min
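
When choosing transactionTimeoutMillis, two constraints from Kafka and Flink are worth keeping in mind: brokers reject producer timeouts above their transaction.max.timeout.ms (15 minutes by default), and a transaction that times out before the job recovers from a failure can lose data. The sketch below is a rule-of-thumb check only; the helper check_transaction_timeout and its parameters are hypothetical, and the checkpoint interval and restart time are values you would supply for your own deployment.

```python
# Kafka brokers default transaction.max.timeout.ms to 15 minutes.
BROKER_DEFAULT_MAX_TIMEOUT_MS = 15 * 60 * 1000


def check_transaction_timeout(transaction_timeout_ms,
                              checkpoint_interval_ms,
                              max_restart_ms,
                              broker_max_timeout_ms=BROKER_DEFAULT_MAX_TIMEOUT_MS):
    """Return a list of problems; an empty list means the value looks sane."""
    problems = []
    if transaction_timeout_ms > broker_max_timeout_ms:
        problems.append("exceeds broker transaction.max.timeout.ms")
    if transaction_timeout_ms < checkpoint_interval_ms + max_restart_ms:
        problems.append("shorter than checkpoint interval plus restart time")
    return problems


ok = check_transaction_timeout(
    900_000, checkpoint_interval_ms=60_000, max_restart_ms=300_000)
too_short = check_transaction_timeout(
    100_000, checkpoint_interval_ms=60_000, max_restart_ms=300_000)
```

With these example numbers, the 15 minute timeout passes both checks, while a 100 second timeout is flagged as shorter than the assumed checkpoint interval plus restart time.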

Writing To Kafka #

Functions write directly to Kafka from their SDK context. See the SDK-specific documentation for more details.

Python:

from statefun import Context, IntType, Message, StatefulFunctions, ValueSpec

functions = StatefulFunctions()

@functions.bind(
    typename='com.example/greeter',
    specs=[ValueSpec(name='seen', type=IntType)])
def fun(context: Context, message: Message):
    # todo
    pass

Java:

public class Greeter implements StatefulFunction {
    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        // Send a record to the Kafka egress identified by GREETS_EGRESS.
        context.send(
            KafkaEgressBuilder.forEgress(GREETS_EGRESS)
                .withTopic("output-topic")
                .withKey(...)
                .withValue(...));

        return context.done();
    }
}