@Internal public class KinesisProxy extends Object implements KinesisProxyInterface
NOTE:
The AWS KCL library contains a similar implementation: com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.
This implementation differs mainly in that it can perform operations on arbitrary Kinesis streams. The Flink Kinesis
Connector needs this because the consumer may read from multiple Kinesis streams simultaneously.
Modifier | Constructor and Description |
---|---|
protected | KinesisProxy(Properties configProps) Create a new KinesisProxy based on the supplied configuration properties. |
Modifier and Type | Method and Description |
---|---|
static KinesisProxyInterface | create(Properties configProps) Creates a Kinesis proxy. |
protected com.amazonaws.services.kinesis.AmazonKinesis | createKinesisClient(Properties configProps) Create the Kinesis client, using the provided configuration properties and default ClientConfiguration. |
protected com.amazonaws.services.kinesis.model.DescribeStreamResult | describeStream(String streamName, String startShardId) Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possesses. |
protected static long | fullJitterBackoff(long base, long max, double power, int attempt) |
com.amazonaws.services.kinesis.model.GetRecordsResult | getRecords(String shardIterator, int maxRecordsToGet) Get the next batch of data records using a specific shard iterator. |
String | getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) Get a shard iterator from the specified position in a shard. |
GetShardListResult | getShardList(Map<String,String> streamNamesWithLastSeenShardIds) Get shard list of multiple Kinesis streams, ignoring the shards of each stream before a specified last seen shard id. |
protected static boolean | isRecoverableException(com.amazonaws.AmazonServiceException ex) Determines whether the exception is recoverable using exponential-backoff. |
protected boolean | isRecoverableSdkClientException(com.amazonaws.SdkClientException ex) Determines whether the exception is recoverable using exponential-backoff. |
protected KinesisProxy(Properties configProps)
Parameters:
configProps - configuration properties containing AWS credential and AWS region info
protected com.amazonaws.services.kinesis.AmazonKinesis createKinesisClient(Properties configProps)
Create the Kinesis client, using the provided configuration properties and default ClientConfiguration.
Derived classes can override this method to customize the client configuration.
public static KinesisProxyInterface create(Properties configProps)
Parameters:
configProps - configuration properties
public com.amazonaws.services.kinesis.model.GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException
Specified by:
getRecords in interface KinesisProxyInterface
Parameters:
shardIterator - a shard iterator that encodes info about which shard to read and where to start reading
maxRecordsToGet - the maximum number of records to retrieve for this batch
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.
public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException
Specified by:
getShardList in interface KinesisProxyInterface
Parameters:
streamNamesWithLastSeenShardIds - a map with the stream name as key and the last seen shard id as value
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException
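Get a shard iterator from the specified position in a shard. The retrieved shard iterator can be used in KinesisProxyInterface.getRecords(String, int) to read data from the Kinesis shard.
Specified by:
getShardIterator in interface KinesisProxyInterface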
Parameters:
shard - the shard to get the iterator for
shardIteratorType - the iterator type, defining how the shard is to be iterated (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
startingMarker - should be null if shardIteratorType is TRIM_HORIZON or LATEST, a Date value if shardIteratorType is AT_TIMESTAMP, or a String representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER
Throws:
InterruptedException - this method retries with backoff if AWS Kinesis reports that the operation has exceeded the rate limit; this exception is thrown if the backoff is interrupted.
protected boolean isRecoverableSdkClientException(com.amazonaws.SdkClientException ex)
Parameters:
ex - Exception to inspect
Returns:
true if the exception can be recovered from, else false
protected static boolean isRecoverableException(com.amazonaws.AmazonServiceException ex)
Parameters:
ex - Exception to inspect
Returns:
true if the exception can be recovered from, else false
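The retry behaviour described for the methods above (retry with backoff while the exception is recoverable, give up otherwise) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the connector's actual code: the class RetryWithBackoffSketch, the method callWithRetry, the RateLimitExceeded exception and all parameter names are hypothetical, and the predicate stands in for a check such as isRecoverableException.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

final class RetryWithBackoffSketch {

    /** Hypothetical stand-in for a "rate limit exceeded" service error. */
    static final class RateLimitExceeded extends RuntimeException {}

    /**
     * Calls the operation until it succeeds, fails with a non-recoverable
     * exception, or maxAttempts is exhausted. All names are assumptions.
     */
    static <T> T callWithRetry(
            Callable<T> operation,
            Predicate<Exception> isRecoverable,
            int maxAttempts,
            long baseMillis,
            long maxMillis) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception ex) {
                if (!isRecoverable.test(ex)) {
                    throw ex; // not recoverable: do not retry
                }
                last = ex;
                if (attempt + 1 < maxAttempts) {
                    // Exponential backoff capped at maxMillis (no jitter here).
                    long backoff = (long) Math.min(maxMillis, baseMillis * Math.pow(2, attempt));
                    // Throws InterruptedException if the backoff is interrupted.
                    Thread.sleep(backoff);
                }
            }
        }
        throw new IllegalStateException("Retries exhausted after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // The simulated operation is throttled twice, then succeeds.
        String result = callWithRetry(
                () -> {
                    if (++calls[0] < 3) {
                        throw new RateLimitExceeded();
                    }
                    return "records";
                },
                ex -> ex instanceof RateLimitExceeded,
                5, 1L, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Passing the recoverability check in as a predicate keeps the loop independent of any particular SDK exception type; a real caller would presumably plug in the checks documented above.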
protected com.amazonaws.services.kinesis.model.DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException
This method uses the "full jitter" approach described in the AWS article "Exponential Backoff and Jitter". This is necessary because the fetchers of all parallel subtasks make concurrent calls; the jittered backoff helps distribute those calls across the fetchers over time.
Parameters:
streamName - the stream to describe
startShardId - which shard to start with for this describe operation
Throws:
InterruptedException
protected static long fullJitterBackoff(long base, long max, double power, int attempt)
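This page gives no description for fullJitterBackoff, so the following is only a sketch of what a "full jitter" computation with these parameters could look like, based on the general technique from the "Exponential Backoff and Jitter" article: cap an exponentially growing delay at max, then pick a uniformly random value below that cap. The class name FullJitterBackoffSketch and the extra Random parameter are assumptions made for the example (the documented method takes only four arguments), and the actual implementation may differ.

```java
import java.util.Random;

final class FullJitterBackoffSketch {

    /**
     * Sketch of a full-jitter backoff. The Random parameter is an assumption
     * added so the example is deterministic under test.
     */
    static long fullJitterBackoff(long base, long max, double power, int attempt, Random random) {
        // Exponential growth base * power^attempt, capped at max.
        long cappedBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
        // Full jitter: choose uniformly from [0, cappedBackoff).
        return (long) (random.nextDouble() * cappedBackoff);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int attempt = 0; attempt < 6; attempt++) {
            long sleepMillis = fullJitterBackoff(100L, 1000L, 1.5, attempt, random);
            System.out.println("attempt " + attempt + ": sleep " + sleepMillis + " ms");
        }
    }
}
```

Because each caller draws its own random delay, callers that fail at the same moment are unlikely to retry at the same moment, which is the distribution effect the describeStream description refers to.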
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.