Type Parameters:
T - Type of the elements emitted by this sink.

public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<org.apache.flink.streaming.connectors.fs.RollingSink.BucketState>, CheckpointListener
Sink that emits its input elements to rolling FileSystem files. This is integrated with the checkpointing mechanism to provide exactly-once semantics.
When creating the sink, a basePath must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, which contain the actual written data.
The sink uses a Bucketer to determine the name of bucket directories inside the base directory. Whenever the Bucketer returns a different directory name than it returned before, the sink closes the current part files inside that bucket and starts the new bucket directory. The default bucketer is a DateTimeBucketer with date format string "yyyy-MM-dd--HH". You can specify a custom Bucketer using setBucketer(Bucketer). For example, use NonRollingBucketer if you don't want to have buckets but still want to write part files in a fault-tolerant way.
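Bucket assignment and the bucket-change check can be sketched in plain Java. DateTimeBucketSketch is a hypothetical class, not Flink's DateTimeBucketer: it formats a timestamp supplied by the caller with java.time.DateTimeFormatter (the default pattern "yyyy-MM-dd--HH" uses the same pattern letters there) and assumes UTC so that results are reproducible.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

/**
 * Hypothetical sketch of date-time bucketing (not Flink's DateTimeBucketer):
 * maps a timestamp to a bucket directory below the base path and reports
 * when consecutive elements fall into different buckets.
 */
final class DateTimeBucketSketch {
    private final String basePath;
    private final DateTimeFormatter formatter;
    private String currentBucket; // bucket of the previous element; null before the first one

    DateTimeBucketSketch(String basePath, String formatString) {
        this.basePath = basePath;
        // UTC is an assumption of this sketch, chosen so that results are reproducible.
        this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(ZoneOffset.UTC);
    }

    /** Bucket directory for an epoch-millisecond timestamp, e.g. basePath/1970-01-01--00. */
    String bucketFor(long epochMillis) {
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    /**
     * True if this element's bucket differs from the previous element's bucket, i.e. the
     * point where the sink would close its current part files and start the new bucket.
     * Also true for the very first element, when no bucket is open yet.
     */
    boolean switchesBucket(long epochMillis) {
        String bucket = bucketFor(epochMillis);
        boolean changed = !Objects.equals(bucket, currentBucket);
        currentBucket = bucket;
        return changed;
    }
}
```

With the hourly pattern, all elements of the same hour map to one directory; a pattern such as "yyyy-MM-dd--HHmm" yields one bucket per minute.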
The filenames of the part files contain the part prefix, the parallel subtask index of the sink and a rolling counter, for example "part-1-17". By default the part prefix is "part", but this can be configured using setPartPrefix(String). When a part file becomes bigger than the batch size, the current part file is closed, the part counter is increased and a new part file is created. The batch size defaults to 384MB; this can be configured using setBatchSize(long).
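A minimal sketch of the naming and rolling rule, assuming sizes can be tracked as plain byte counts. PartFileRoller and its methods are hypothetical; checking the size before each write, rather than after it, is an assumption of this sketch.

```java
/**
 * Hypothetical sketch of the rolling rule (not RollingSink's code): part files are named
 * partPrefix-subtaskIndex-counter, and once the current part file is bigger than the
 * batch size, the next write goes to a new part file with an increased counter.
 */
final class PartFileRoller {
    /** 384 MB, assuming MB means 1024 * 1024 bytes. */
    static final long DEFAULT_BATCH_SIZE = 384L * 1024L * 1024L;

    private final String partPrefix;
    private final int subtaskIndex;
    private final long batchSize;
    private int partCounter = 0;
    private long currentSize = 0;

    PartFileRoller(String partPrefix, int subtaskIndex, long batchSize) {
        this.partPrefix = partPrefix;
        this.subtaskIndex = subtaskIndex;
        this.batchSize = batchSize;
    }

    /** Name of the part file that currently receives writes, e.g. "part-1-17". */
    String currentPartName() {
        return partPrefix + "-" + subtaskIndex + "-" + partCounter;
    }

    /** Accounts for a record of the given size and returns the part file it was written to. */
    String write(long recordBytes) {
        if (currentSize > batchSize) {
            // Current part file is too big: close it, increase the counter, start a new one.
            partCounter++;
            currentSize = 0;
        }
        currentSize += recordBytes;
        return currentPartName();
    }
}
```

Because the size check happens before a write, a part file in this sketch can exceed the batch size by up to one record.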
Part files can be in one of three states: in-progress, pending or finished. The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once semantics and fault-tolerance. The part file that is currently being written to is in-progress. Once a part file is closed for writing, it becomes pending. When a checkpoint is successful, the currently pending files are moved to finished. If a failure occurs, the pending files are deleted to reset state to the last checkpoint. The data in in-progress files also has to be rolled back. If the FileSystem supports the truncate call, this is used to reset the file back to a previous state. If not, a special file with the same name as the part file and the suffix ".valid-length" is written that contains the length up to which the file contains valid data. When reading the file, it must be ensured that it is only read up to that point. The prefixes and suffixes for the different file states and valid-length files can be configured, for example with setPendingSuffix(String).
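On the reading side, honouring a valid-length file amounts to reading a bounded prefix of the part file. ValidLengthReader is a hypothetical helper; it takes the already-parsed length as a parameter, because how the length is encoded inside the valid-length file is not described here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical reader-side helper: returns only the first validLength bytes of a
 * part file, dropping anything written after the recorded valid length.
 */
final class ValidLengthReader {
    static byte[] readValidPrefix(Path partFile, long validLength) throws IOException {
        try (InputStream in = Files.newInputStream(partFile)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            long remaining = validLength;
            while (remaining > 0) {
                // Never request more than the bytes that are still valid.
                int n = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (n < 0) {
                    break; // file is shorter than the recorded length; return what exists
                }
                out.write(buffer, 0, n);
                remaining -= n;
            }
            return out.toByteArray();
        }
    }
}
```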
Note: If checkpointing is not enabled, the pending files will never be moved to the finished state. In that case, the pending suffix/prefix can be set to "" to make the sink work in a non-fault-tolerant way while still producing output without prefixes and suffixes.
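The effect of an empty pending prefix and suffix is easiest to see on file names. This sketch assumes that the name of a file in a given state is composed as prefix + part name + suffix; PartFileNaming is hypothetical, and the prefix/suffix values used with it below are illustrative.

```java
import java.util.EnumMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of RollingSink) that decorates a part file name with the
 * prefix and suffix of its state, assuming names are composed as prefix + partName + suffix.
 */
final class PartFileNaming {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final Map<State, String> prefixes = new EnumMap<>(State.class);
    private final Map<State, String> suffixes = new EnumMap<>(State.class);

    PartFileNaming(String inProgressPrefix, String inProgressSuffix,
                   String pendingPrefix, String pendingSuffix) {
        prefixes.put(State.IN_PROGRESS, inProgressPrefix);
        suffixes.put(State.IN_PROGRESS, inProgressSuffix);
        prefixes.put(State.PENDING, pendingPrefix);
        suffixes.put(State.PENDING, pendingSuffix);
        // Finished files carry the bare part name.
        prefixes.put(State.FINISHED, "");
        suffixes.put(State.FINISHED, "");
    }

    String nameFor(String partName, State state) {
        return prefixes.get(state) + partName + suffixes.get(state);
    }
}
```

With both pending values set to "", a pending file in this sketch already carries its final name, which is why output without checkpoints still looks clean.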
The part files are written using an instance of Writer. By default, StringWriter is used, which writes the result of toString() for every element, separated by newlines. You can configure the writer using setWriter(Writer). For example, SequenceFileWriter can be used to write Hadoop SequenceFiles.
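The default writing behaviour (toString() of each element, one element per line) can be sketched against a small hypothetical interface. ElementWriter and ToStringLineWriter are not Flink's Writer or StringWriter; here the charset is a constructor parameter, and getPos() counts the bytes written so far, which is the kind of position a size-based roll needs.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

/** Hypothetical interface loosely modelled on the role of Writer; not Flink's Writer API. */
interface ElementWriter<T> {
    void write(T element) throws IOException;

    long getPos();
}

/** Writes element.toString() followed by a newline, encoded with the given charset. */
final class ToStringLineWriter<T> implements ElementWriter<T> {
    private final OutputStream out;
    private final Charset charset;
    private long pos = 0; // number of bytes written so far

    ToStringLineWriter(OutputStream out, Charset charset) {
        this.out = out;
        this.charset = charset;
    }

    @Override
    public void write(T element) throws IOException {
        byte[] bytes = element.toString().getBytes(charset);
        out.write(bytes);
        out.write('\n');
        pos += bytes.length + 1;
    }

    @Override
    public long getPos() {
        return pos;
    }
}
```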
Example:
    RollingSink<Tuple2<IntWritable, Text>> sink =
        new RollingSink<Tuple2<IntWritable, Text>>(outPath)
            .setWriter(new SequenceFileWriter<IntWritable, Text>())
            .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
This will create a sink that writes to SequenceFiles and rolls every minute.

See Also:
DateTimeBucketer, StringWriter, SequenceFileWriter, Serialized Form

Constructor | Description |
---|---|
RollingSink(String basePath) | Creates a new RollingSink that writes files to the given base directory. |
Modifier and Type | Method | Description |
---|---|---|
void | close() | Tear-down method for the user code. |
RollingSink<T> | disableCleanupOnOpen() | Disable cleanup of leftover in-progress/pending files when the sink is opened. |
void | invoke(T value) | Function for standard sink behaviour. |
void | notifyCheckpointComplete(long checkpointId) | This method is called as a notification once a distributed checkpoint has been completed. |
void | open(Configuration parameters) | Initialization method for the function. |
void | restoreState(org.apache.flink.streaming.connectors.fs.RollingSink.BucketState state) | Restores the state of the function or operator to that of a previous checkpoint. |
RollingSink<T> | setAsyncTimeout(long timeout) | Sets the default timeout for asynchronous operations such as recoverLease and truncate. |
RollingSink<T> | setBatchSize(long batchSize) | Sets the maximum bucket part file size in bytes. |
RollingSink<T> | setBucketer(Bucketer bucketer) | Sets the Bucketer to use for determining the bucket files to write to. |
RollingSink<T> | setInProgressPrefix(String inProgressPrefix) | Sets the prefix of in-progress part files. |
RollingSink<T> | setInProgressSuffix(String inProgressSuffix) | Sets the suffix of in-progress part files. |
void | setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) | Method that is called on an OutputFormat when it is passed to the DataSet's output method. |
RollingSink<T> | setPartPrefix(String partPrefix) | Sets the prefix of part files. |
RollingSink<T> | setPendingPrefix(String pendingPrefix) | Sets the prefix of pending part files. |
RollingSink<T> | setPendingSuffix(String pendingSuffix) | Sets the suffix of pending part files. |
RollingSink<T> | setValidLengthPrefix(String validLengthPrefix) | Sets the prefix of valid-length files. |
RollingSink<T> | setValidLengthSuffix(String validLengthSuffix) | Sets the suffix of valid-length files. |
RollingSink<T> | setWriter(Writer<T> writer) | Sets the Writer to be used for writing the incoming elements to bucket files. |
org.apache.flink.streaming.connectors.fs.RollingSink.BucketState | snapshotState(long checkpointId, long checkpointTimestamp) | Gets the current state of the function or operator. |
Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
public RollingSink(String basePath)
Creates a new RollingSink that writes files to the given base directory.
This uses a DateTimeBucketer as bucketer and a StringWriter as writer. The batch size (maximum part file size) is set to 384 MB.
Parameters:
basePath - The directory to which to write the bucket files.

public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig)
Description copied from interface: InputTypeConfigurable
Method that is called on an OutputFormat when it is passed to the DataSet's output method. May be used to configure the output format based on the data type.
Specified by:
setInputType in interface InputTypeConfigurable
Parameters:
type - The data type of the input.
executionConfig - The execution config for this parallel execution.

public void open(Configuration parameters) throws Exception
Description copied from interface: RichFunction
Initialization method for the function. The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.
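    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.configuration.Configuration;

    public class MyMapper extends RichFilterFunction<String> {
        private String searchString;

        @Override
        public void open(Configuration parameters) {
            // Read the configured value once, before the first call to filter().
            this.searchString = parameters.getString("foo", null);
        }

        @Override
        public boolean filter(String value) {
            return value.equals(searchString);
        }
    }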
By default, this method does nothing.
Specified by:
open in interface RichFunction
Overrides:
open in class AbstractRichFunction
Parameters:
parameters - The configuration containing the parameters attached to the contract.
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
See Also:
Configuration
public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean-up work.
Specified by:
close in interface RichFunction
Overrides:
close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.

public void invoke(T value) throws Exception
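Description copied from interface: SinkFunction
Function for standard sink behaviour.
Specified by:
invoke in interface SinkFunction<T>
invoke in class RichSinkFunction<T>
Parameters:
value - The input record.
Throws:
Exception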
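public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been completed.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception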
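The pending-to-finished step described in the class overview can be modelled in memory. PendingFileTracker is a hypothetical sketch, not RollingSink's implementation: a snapshot assigns the files that became pending since the previous snapshot to that checkpoint ID, and a completion notification finishes the files of that checkpoint and of all earlier ones, on the assumption that one notification may stand in for earlier ones that were not delivered. Deleting pending files after a failure is not modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of pending part files and checkpoint completion. */
final class PendingFileTracker {
    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> finished = new ArrayList<>();

    /** A part file was closed for writing and is now pending. */
    void fileClosed(String partName) {
        pendingSinceLastSnapshot.add(partName);
    }

    /** Snapshot: remember which pending files belong to this checkpoint. */
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    /** Checkpoint completed: finish pending files of this and all earlier checkpoints. */
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            finished.addAll(it.next().getValue());
            it.remove(); // removing through the view also removes from the backing map
        }
    }

    List<String> finishedFiles() {
        return finished;
    }
}
```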
public org.apache.flink.streaming.connectors.fs.RollingSink.BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception
Description copied from interface: Checkpointed
Gets the current state of the function or operator.
Specified by:
snapshotState in interface Checkpointed<org.apache.flink.streaming.connectors.fs.RollingSink.BucketState>
Parameters:
checkpointId - The ID of the checkpoint.
checkpointTimestamp - The timestamp of the checkpoint, as derived by System.currentTimeMillis() on the JobManager.
Throws:
Exception - Thrown if the creation of the state object failed. This causes the checkpoint to fail. The system may decide to fail the operation (and trigger recovery), or to discard this checkpoint attempt, continue running, and try again with the next checkpoint attempt.

public void restoreState(org.apache.flink.streaming.connectors.fs.RollingSink.BucketState state)
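Description copied from interface: Checkpointed
Restores the state of the function or operator to that of a previous checkpoint.
Specified by:
restoreState in interface Checkpointed<org.apache.flink.streaming.connectors.fs.RollingSink.BucketState>
Parameters:
state - The state to be restored.

public RollingSink<T> setBatchSize(long batchSize)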
Sets the maximum bucket part file size in bytes.
When a bucket part file becomes larger than this size, a new bucket part file is started and the old one is closed. The name of the bucket files depends on the Bucketer.
Parameters:
batchSize - The bucket part file size in bytes.

public RollingSink<T> setBucketer(Bucketer bucketer)
Sets the Bucketer to use for determining the bucket files to write to.
Parameters:
bucketer - The bucketer to use.

public RollingSink<T> setWriter(Writer<T> writer)
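Sets the Writer to be used for writing the incoming elements to bucket files.
Parameters:
writer - The Writer to use.

public RollingSink<T> setInProgressSuffix(String inProgressSuffix)
Sets the suffix of in-progress part files. The default is "in-progress".

public RollingSink<T> setInProgressPrefix(String inProgressPrefix)
Sets the prefix of in-progress part files. The default is "_".

public RollingSink<T> setPendingSuffix(String pendingSuffix)
Sets the suffix of pending part files. The default is ".pending".

public RollingSink<T> setPendingPrefix(String pendingPrefix)
Sets the prefix of pending part files. The default is "_".

public RollingSink<T> setValidLengthSuffix(String validLengthSuffix)
Sets the suffix of valid-length files. The default is ".valid-length".

public RollingSink<T> setValidLengthPrefix(String validLengthPrefix)
Sets the prefix of valid-length files. The default is "_".

public RollingSink<T> setPartPrefix(String partPrefix)
Sets the prefix of part files. The default is "part".

public RollingSink<T> disableCleanupOnOpen()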
Disable cleanup of leftover in-progress/pending files when the sink is opened.
This should only be disabled if the sink is used without checkpoints, so that files already in the directory are not removed.
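To see why cleanup interacts with the chosen prefixes and suffixes, here is a sketch of how leftover files could be recognised. LeftoverFileScanner is hypothetical and only lists candidates without deleting anything; matching leftovers by prefix and suffix is an assumption about the cleanup, not a description of RollingSink's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical sketch: walks the base directory and collects regular files whose names
 * carry the in-progress or pending prefix and suffix. Nothing is deleted.
 */
final class LeftoverFileScanner {
    static List<Path> findLeftovers(Path baseDir,
                                    String inProgressPrefix, String inProgressSuffix,
                                    String pendingPrefix, String pendingSuffix) throws IOException {
        try (Stream<Path> files = Files.walk(baseDir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> {
                        String name = p.getFileName().toString();
                        return matches(name, inProgressPrefix, inProgressSuffix)
                                || matches(name, pendingPrefix, pendingSuffix);
                    })
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    private static boolean matches(String name, String prefix, String suffix) {
        return name.startsWith(prefix) && name.endsWith(suffix);
    }
}
```

With an empty pending prefix and suffix, as suggested in the note on running without checkpointing, every file in the directory matches in this sketch, including files that are already finished.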
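public RollingSink<T> setAsyncTimeout(long timeout)
Sets the default timeout for asynchronous operations such as recoverLease and truncate.
Parameters:
timeout - The timeout, in milliseconds.

Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.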