public abstract class AbstractFetcher<T,KPH> extends Object
Type Parameters:
T - The type of elements deserialized from Kafka's byte records, and emitted into the Flink data streams.
KPH - The type of topic/partition identifier used by Kafka in the specific version.
This fetcher base class implements the logic for emitting records and tracking offsets, as well as for the optional timestamp assignment and watermark generation.
Modifier and Type | Field and Description |
---|---|
protected Object | checkpointLock: The lock that guarantees that record emission and state updates are atomic with respect to taking a checkpoint. |
protected static int | NO_TIMESTAMPS_WATERMARKS |
protected static int | PERIODIC_WATERMARKS |
protected static int | PUNCTUATED_WATERMARKS |
protected SourceFunction.SourceContext<T> | sourceContext: The source context to emit records and watermarks to. |
protected int | timestampWatermarkMode: The mode describing whether the fetcher also generates timestamps and watermarks. |
protected ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> | unassignedPartitionsQueue: Queue of partitions that are not yet assigned to any Kafka clients for consuming. |
Modifier | Constructor and Description |
---|---|
protected | AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) |
Modifier and Type | Method and Description |
---|---|
void | addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions): Adds a list of newly discovered partitions to the fetcher for consuming. |
abstract void | cancel() |
void | commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback): Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). |
protected abstract KPH | createKafkaPartitionHandle(KafkaTopicPartition partition): Creates the Kafka version-specific representation of the given topic partition. |
protected abstract void | doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, KafkaCommitCallback commitCallback) |
protected void | emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset): Emits a record without attaching an existing timestamp to it. |
protected void | emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp): Emits a record, attaching a timestamp to it. |
protected void | emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp): Record emission, if a timestamp will be attached from an assigner that is also a periodic watermark generator. |
protected void | emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp): Record emission, if a timestamp will be attached from an assigner that is also a punctuated watermark generator. |
abstract void | runFetchLoop() |
HashMap<KafkaTopicPartition,Long> | snapshotCurrentState(): Takes a snapshot of the partition offsets. |
protected List<KafkaTopicPartitionState<KPH>> | subscribedPartitionStates(): Gets all partitions (with partition state) that this fetcher is subscribed to. |
protected static final int NO_TIMESTAMPS_WATERMARKS
protected static final int PERIODIC_WATERMARKS
protected static final int PUNCTUATED_WATERMARKS
protected final SourceFunction.SourceContext<T> sourceContext
protected final Object checkpointLock
protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue
Queue of partitions that are not yet assigned to any Kafka clients for consuming. Implementations of runFetchLoop() should continuously poll this queue for unassigned partitions and start consuming them accordingly.
All partitions added to this queue are guaranteed to have already been added to subscribedPartitionStates.
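The contract above (the fetch loop drains the queue and starts consuming whatever it finds) can be sketched as follows. This is a hypothetical, simplified illustration: java.util.concurrent.LinkedBlockingQueue stands in for Flink's ClosableBlockingQueue, partitions are plain strings, and the class and method names (FetchLoopSketch, assignPendingPartitions) are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a version-specific fetch loop picking up unassigned partitions.
// LinkedBlockingQueue is an assumed stand-in for ClosableBlockingQueue; partitions are Strings.
final class FetchLoopSketch {
    final LinkedBlockingQueue<String> unassignedPartitionsQueue = new LinkedBlockingQueue<>();
    final List<String> assigned = new ArrayList<>();   // partitions handed to the (simulated) consumer
    private volatile boolean running = true;

    // Non-blocking: moves every partition currently waiting in the queue into the assignment
    // and returns how many were newly assigned (possibly 0).
    int assignPendingPartitions() {
        return unassignedPartitionsQueue.drainTo(assigned);
    }

    void runFetchLoop() throws InterruptedException {
        while (running) {
            if (assignPendingPartitions() > 0) {
                // A real fetcher would (re)assign its Kafka consumer to 'assigned' here.
            }
            // ... fetch and emit records for the 'assigned' partitions ...
            Thread.sleep(10);   // placeholder for a blocking fetch call
        }
    }

    void cancel() {
        running = false;
    }
}
```

Draining without blocking keeps the loop free to fetch records for partitions it already owns; a real implementation may instead block on the queue when it has nothing assigned yet.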
protected final int timestampWatermarkMode
protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition,Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception
Throws:
Exception
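The constructor receives two optional, serialized watermark assigners, and timestampWatermarkMode records which kind (if any) is in use. A minimal sketch of how such a mode could be derived is shown here; the class name WatermarkModes, the constant values, and the decision to reject both assigners at once are assumptions for illustration, not confirmed details of AbstractFetcher.

```java
// Hypothetical sketch: deriving a timestamp/watermark mode from the two optional assigners.
// Constant values and the "both assigners" rule are assumptions, not taken from Flink.
final class WatermarkModes {
    static final int NO_TIMESTAMPS_WATERMARKS = 0;
    static final int PERIODIC_WATERMARKS = 1;
    static final int PUNCTUATED_WATERMARKS = 2;

    private WatermarkModes() {}

    // 'periodic' and 'punctuated' stand in for the (possibly null) serialized assigners.
    static int selectMode(Object periodic, Object punctuated) {
        if (periodic == null && punctuated == null) {
            return NO_TIMESTAMPS_WATERMARKS;   // no assigner: records are emitted without timestamps
        }
        if (periodic != null && punctuated == null) {
            return PERIODIC_WATERMARKS;
        }
        if (periodic == null) {
            return PUNCTUATED_WATERMARKS;
        }
        // Assumption: configuring both kinds of assigner is rejected as ambiguous.
        throw new IllegalArgumentException("Cannot use both periodic and punctuated watermarks");
    }
}
```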
public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException
Adds a list of newly discovered partitions to the fetcher for consuming.
This method creates the partition state holder for each new partition, using KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET as the starting offset.
It uses the earliest offset because there may be a delay in discovering a partition after it has been created and has started receiving records.
After the state representation for a partition is created, it is added to the unassigned partitions queue to await consumption.
Parameters:
newPartitions - discovered partitions to add
Throws:
IOException
ClassNotFoundException
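The registration steps described for addDiscoveredPartitions (create a state holder starting at the earliest-offset sentinel, register it, then enqueue it) might look roughly like this. It is a hypothetical sketch: DiscoverySketch and PartitionState are invented names, and the value -2 for EARLIEST_OFFSET is a placeholder, not the actual value of KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of registering newly discovered partitions.
// EARLIEST_OFFSET is a placeholder sentinel value, not Flink's actual constant.
final class DiscoverySketch {
    static final long EARLIEST_OFFSET = -2L;

    static final class PartitionState {
        final String partition;
        long offset;

        PartitionState(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Assumed thread-safe list, since discovery and fetching may run on different threads.
    final List<PartitionState> subscribedPartitionStates = new CopyOnWriteArrayList<>();
    final LinkedBlockingQueue<PartitionState> unassignedPartitionsQueue = new LinkedBlockingQueue<>();

    void addDiscoveredPartitions(List<String> newPartitions) {
        for (String p : newPartitions) {
            // Start from the earliest offset so records written before discovery are not skipped.
            PartitionState state = new PartitionState(p, EARLIEST_OFFSET);
            // Register first, then enqueue: every queued partition is already subscribed.
            subscribedPartitionStates.add(state);
            unassignedPartitionsQueue.add(state);
        }
    }
}
```

Adding to the subscribed list before enqueuing is what lets the fetch loop rely on the guarantee stated for unassignedPartitionsQueue.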
protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates()
public abstract void cancel()
public final void commitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception
Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka versions). This method is only ever called when the offset commit mode of the consumer is OffsetCommitMode.ON_CHECKPOINTS.
The given offsets are the internal checkpointed offsets, representing the last processed record of each partition. Version-specific implementations of this method must uphold the contract that the given offsets are incremented by 1 before being committed, so that the offsets committed to Kafka represent "the next record to process".
Parameters:
offsets - The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
commitCallback - The callback that the user should trigger when a commit request completes or fails.
Throws:
Exception - This method forwards exceptions.
protected abstract void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition,Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception
Throws:
Exception
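The "+1" contract is easy to get wrong, so here is a hypothetical sketch of it: a final public method delegating to an abstract version-specific hook, and an in-memory subclass that commits offset + 1. Partitions are plain strings and a java.util.function.Consumer replaces KafkaCommitCallback; CommitSketch and InMemoryCommitter are invented names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: the base class passes internal offsets (last processed record) to a
// version-specific hook, which commits offset + 1 ("next record to process").
// Strings replace KafkaTopicPartition and a Consumer replaces KafkaCommitCallback.
abstract class CommitSketch {
    public final void commitInternalOffsetsToKafka(Map<String, Long> offsets, Consumer<Map<String, Long>> onComplete) {
        doCommitInternalOffsetsToKafka(offsets, onComplete);
    }

    protected abstract void doCommitInternalOffsetsToKafka(Map<String, Long> offsets, Consumer<Map<String, Long>> onComplete);
}

final class InMemoryCommitter extends CommitSketch {
    final Map<String, Long> committed = new HashMap<>();   // stands in for broker-side committed offsets

    @Override
    protected void doCommitInternalOffsetsToKafka(Map<String, Long> offsets, Consumer<Map<String, Long>> onComplete) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            // Internal offset = last processed record; Kafka expects the next record to read.
            toCommit.put(e.getKey(), e.getValue() + 1);
        }
        committed.putAll(toCommit);
        onComplete.accept(toCommit);   // signal completion, as a commit callback would
    }
}
```

For example, if the checkpoint recorded offset 41 for a partition, the sketch commits 42, which is where a restarted consumer should resume reading.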
protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition)
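Creates the Kafka version-specific representation of the given topic partition.
Parameters:
partition - The Flink representation of the Kafka topic partition.
public HashMap<KafkaTopicPartition,Long> snapshotCurrentState()
Takes a snapshot of the partition offsets.
Important: This method must be called under the checkpoint lock.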
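Why the checkpoint lock matters can be shown with a small hypothetical sketch: emission and the offset update happen in one synchronized block, and the snapshot both requires that same lock and returns a copy. CheckpointLockSketch and its String-based types are invented for illustration; the list append stands in for emitting to the source context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: emission and offset update share one lock with snapshotting,
// so a snapshot never sees a record emitted without its offset advanced (or vice versa).
final class CheckpointLockSketch {
    final Object checkpointLock = new Object();
    private final Map<String, Long> currentOffsets = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    void emitRecord(String record, String partition, long offset) {
        synchronized (checkpointLock) {
            emitted.add(record);                   // stands in for emitting to the source context
            currentOffsets.put(partition, offset); // advance the offset in the same critical section
        }
    }

    // Mirrors the documented requirement: the caller must already hold checkpointLock.
    HashMap<String, Long> snapshotCurrentState() {
        if (!Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException("must be called under the checkpoint lock");
        }
        return new HashMap<>(currentOffsets);      // a copy, so later emissions do not change it
    }

    int emittedCount() {
        synchronized (checkpointLock) {
            return emitted.size();
        }
    }
}
```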
protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception
Emits a record without attaching an existing timestamp to it.
Implementation Note: This method is kept brief to be JIT-inlining friendly. That keeps the fast path efficient, while the extended paths are called as separate methods.
Parameters:
record - The record to emit
partitionState - The state of the Kafka partition from which the record was fetched
offset - The offset of the record
Throws:
Exception
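The implementation note describes a common structure: a tiny method for the common case, with the heavier logic in separate methods so the hot method stays small enough to inline. A hypothetical sketch of that split follows; EmitPathSketch is an invented name, records are Strings, and the ToLongFunction extractor is an assumed stand-in for a timestamp assigner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical sketch of a brief fast path with the extended path moved into a separate method.
// Records are Strings; the extractor is an assumed stand-in for a timestamp assigner.
final class EmitPathSketch {
    static final int NO_TIMESTAMPS_WATERMARKS = 0;
    static final int PERIODIC_WATERMARKS = 1;

    private final int timestampWatermarkMode;
    private final ToLongFunction<String> timestampExtractor; // null when no assigner is configured
    final Object checkpointLock = new Object();
    final List<String> emitted = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();
    long currentOffset = -1L;

    EmitPathSketch(int timestampWatermarkMode, ToLongFunction<String> timestampExtractor) {
        this.timestampWatermarkMode = timestampWatermarkMode;
        this.timestampExtractor = timestampExtractor;
    }

    // Small enough to be a good inlining candidate: one branch, one lock, and a state update.
    void emitRecord(String record, long offset) {
        if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
            synchronized (checkpointLock) {
                emitted.add(record);
                currentOffset = offset;
            }
        } else {
            emitRecordWithExtractedTimestamp(record, offset);
        }
    }

    // Extended path: extract a timestamp outside the lock, then emit and update state atomically.
    private void emitRecordWithExtractedTimestamp(String record, long offset) {
        long timestamp = timestampExtractor.applyAsLong(record);
        synchronized (checkpointLock) {
            emitted.add(record);
            timestamps.add(timestamp);
            currentOffset = offset;
        }
    }
}
```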
protected void emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception
Emits a record, attaching a timestamp to it.
Implementation Note: This method is kept brief to be JIT-inlining friendly. That keeps the fast path efficient, while the extended paths are called as separate methods.
Parameters:
record - The record to emit
partitionState - The state of the Kafka partition from which the record was fetched
offset - The offset of the record
Throws:
Exception
protected void emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
protected void emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
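Both watermark-emitting paths have to turn per-partition progress into a single watermark for the source. The sketch below assumes, as is common for multi-partition sources, that the fetcher forwards the minimum of the per-partition watermarks and only when that minimum advances; this is an illustration under that assumption, not a statement of how AbstractFetcher is implemented. MinWatermarkSketch and its methods are invented names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: combining per-partition watermarks into one source watermark.
// Assumption: the minimum across partitions is emitted, and only when it advances.
final class MinWatermarkSketch {
    private final Map<String, Long> partitionWatermarks = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emittedWatermarks = new ArrayList<>();

    MinWatermarkSketch(List<String> partitions) {
        for (String p : partitions) {
            partitionWatermarks.put(p, Long.MIN_VALUE);   // no progress seen yet
        }
    }

    // Called when an assigner produces a watermark for a record of 'partition'.
    void onPartitionWatermark(String partition, long watermark) {
        partitionWatermarks.merge(partition, watermark, Long::max); // per-partition value never regresses
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks.values()) {
            min = Math.min(min, w);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            emittedWatermarks.add(min);   // stands in for emitting a watermark to the source context
        }
    }
}
```

Taking the minimum means a slow or idle partition holds back the overall watermark, which avoids declaring event time complete while that partition may still deliver older records.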
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.