Type Parameters:
IN - Type of the elements emitted by this sink
@PublicEvolving
public class StreamingFileSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback
Sink that emits its input elements to FileSystem files within buckets. This is integrated with the checkpointing mechanism to provide exactly-once semantics.
When creating the sink, a basePath must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, with at least one for each parallel subtask of the sink which is writing data to that bucket. These part files contain the actual output data.
The sink uses a BucketAssigner to determine the bucket directory, inside the base directory, to which each element should be written. The BucketAssigner can, for example, use time or a property of the element to determine the bucket directory. The default BucketAssigner is a DateTimeBucketAssigner, which creates one new bucket every hour. You can specify a custom BucketAssigner using the withBucketAssigner(bucketAssigner) method of the builder, after calling forRowFormat(Path, Encoder) or forBulkFormat(Path, BulkWriter.Factory).
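The idea of deriving a bucket directory from time or from a property of the element can be sketched in plain Java. This is a minimal illustration and not Flink code: the class `HourlyBucketSketch`, its method names, the hourly pattern `yyyy-MM-dd--HH`, and the `country=` prefix are all assumptions made for the example.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical stand-in for a bucket assigner; not a Flink class. */
final class HourlyBucketSketch {
    // Assumed pattern: the hour is the finest field, so each hour maps to one bucket.
    private static final String PATTERN = "yyyy-MM-dd--HH";

    /** Time-based assignment: maps epoch milliseconds to a bucket directory name. */
    static String bucketIdFor(long epochMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Property-based assignment: derives the bucket from a field of the element. */
    static String bucketIdFor(String countryCode) {
        return "country=" + countryCode.toLowerCase(Locale.ROOT);
    }
}
```

All elements whose timestamps fall within the same hour (in the chosen time zone) map to the same directory name, which is what produces one new bucket per hour.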
The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink, and a rolling counter. For example, the file "part-1-17" contains data from subtask 1 of the sink, and 17 is the value of that subtask's rolling part counter.
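The naming scheme and directory layout described above can be sketched as follows. This is an illustrative plain-Java helper, not part of Flink: `PartFileNameSketch` and its methods are hypothetical names, and the bucket name used in the usage note is an assumed example.

```java
/** Hypothetical helper that mirrors the "part-<subtask>-<counter>" naming scheme. */
final class PartFileNameSketch {
    static final String PART_PREFIX = "part-";

    /** Builds a part file name from the subtask index and the rolling counter. */
    static String format(int subtaskIndex, long counter) {
        return PART_PREFIX + subtaskIndex + "-" + counter;
    }

    /** Parses a name such as "part-1-17" into {subtaskIndex, counter}. */
    static long[] parse(String fileName) {
        if (!fileName.startsWith(PART_PREFIX)) {
            throw new IllegalArgumentException("Missing part prefix: " + fileName);
        }
        String[] fields = fileName.substring(PART_PREFIX.length()).split("-");
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected two numeric fields: " + fileName);
        }
        return new long[] {Long.parseLong(fields[0]), Long.parseLong(fields[1])};
    }

    /** Joins base directory, bucket directory, and part file name into one path. */
    static String pathOf(String basePath, String bucketId, int subtaskIndex, long counter) {
        return basePath + "/" + bucketId + "/" + format(subtaskIndex, counter);
    }
}
```

For instance, `PartFileNameSketch.pathOf("/base", "2020-01-01--00", 1, 17)` yields `/base/2020-01-01--00/part-1-17`.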
Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy is used.
In some scenarios, the open buckets are required to change based on time. In these cases, the user can specify a bucketCheckInterval (by default 1 minute), and the sink will periodically check the open part files and roll them if the specified rolling policy says so.
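To make the role of the periodic check concrete, here is a minimal sketch of a rolling decision based on part size, file age, and inactivity. It is not Flink's RollingPolicy interface: the class `RollingCheckSketch`, its three thresholds, and the method `shouldRoll` are hypothetical, and the threshold values are left to the caller rather than taken from any real default.

```java
/** Hypothetical rolling rule; illustrates the idea only, not Flink's RollingPolicy. */
final class RollingCheckSketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingCheckSketch(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    /**
     * Decides whether the open part file should be rolled.
     * The time-based conditions can only fire for an idle file if something
     * evaluates them periodically, which is the job of the bucket check interval.
     */
    boolean shouldRoll(long sizeBytes, long createdAtMs, long lastWriteMs, long nowMs) {
        boolean tooLarge = sizeBytes >= maxPartSizeBytes;
        boolean tooOld = nowMs - createdAtMs >= rolloverIntervalMs;
        boolean inactive = nowMs - lastWriteMs >= inactivityIntervalMs;
        return tooLarge || tooOld || inactive;
    }
}
```

A timer firing at the check interval would call `shouldRoll` for every open part file, so that a file receiving no new elements is still closed once it is old or idle enough.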
Part files can be in one of three states: in-progress, pending or finished.
The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once semantics and fault-tolerance. The part file that is currently being written to is in-progress. Once a part file is closed for writing it becomes pending. When a checkpoint is successful the currently pending files will be moved to finished.
In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it had when the last successful checkpoint occurred. To this end, when restoring, the restored files in pending state are transferred into the finished state, while any in-progress files are rolled back so that they do not contain data that arrived after the checkpoint from which we restore.
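The state transitions described above can be modelled with a small state machine. The sketch below is a simplified illustration and not Flink's implementation: `PartFileLifecycleSketch` and all of its methods are hypothetical, it tracks only file names and states (not file contents or offsets), and it moves every pending file to finished on any completed checkpoint.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the part file lifecycle; not Flink's implementation. */
final class PartFileLifecycleSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final Map<String, State> files = new LinkedHashMap<>();

    /** A newly opened part file is being written to, so it is in-progress. */
    void openNew(String name) {
        files.put(name, State.IN_PROGRESS);
    }

    /** Closing a part file for writing makes it pending. */
    void closeForWriting(String name) {
        if (files.get(name) != State.IN_PROGRESS) {
            throw new IllegalStateException(name + " is not in-progress");
        }
        files.put(name, State.PENDING);
    }

    /** A successful checkpoint moves pending files to finished. */
    void onCheckpointComplete() {
        files.replaceAll((name, state) -> state == State.PENDING ? State.FINISHED : state);
    }

    /** Copies the current states, standing in for checkpointed state. */
    Map<String, State> snapshot() {
        return new LinkedHashMap<>(files);
    }

    /**
     * Restores from a snapshot: pending files become finished, in-progress files
     * stay in-progress (a real sink would also roll their contents back).
     */
    static PartFileLifecycleSketch restore(Map<String, State> snapshot) {
        PartFileLifecycleSketch restored = new PartFileLifecycleSketch();
        restored.files.putAll(snapshot);
        restored.files.replaceAll((name, state) -> state == State.PENDING ? State.FINISHED : state);
        return restored;
    }

    State stateOf(String name) {
        return files.get(name);
    }
}
```

The model leaves out the rollback of file contents; it only shows which state each file ends up in after a checkpoint or a restore.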
Modifier and Type | Class and Description |
---|---|
static class | StreamingFileSink.BulkFormatBuilder<IN,BucketID> A builder for configuring the sink for bulk-encoding formats, e.g. |
static class | StreamingFileSink.RowFormatBuilder<IN,BucketID> A builder for configuring the sink for row-wise encoding formats. |
Nested classes/interfaces inherited from interface SinkFunction: SinkFunction.Context<T>
Modifier and Type | Method and Description |
---|---|
void | close() Tear-down method for the user code. |
static <IN> StreamingFileSink.BulkFormatBuilder<IN,String> | forBulkFormat(Path basePath, BulkWriter.Factory<IN> writerFactory) Creates the builder for a StreamingFileSink with bulk-encoding format. |
static <IN> StreamingFileSink.RowFormatBuilder<IN,String> | forRowFormat(Path basePath, Encoder<IN> encoder) Creates the builder for a StreamingFileSink with row-encoding format. |
void | initializeState(FunctionInitializationContext context) This method is called when the parallel function instance is created during distributed execution. |
void | invoke(IN value, SinkFunction.Context context) Writes the given value to the sink. |
void | notifyCheckpointComplete(long checkpointId) This method is called as a notification once a distributed checkpoint has been completed. |
void | onProcessingTime(long timestamp) This method is invoked with the timestamp for which the trigger was scheduled. |
void | open(Configuration parameters) Initialization method for the function. |
void | snapshotState(FunctionSnapshotContext context) This method is called when a snapshot for a checkpoint is requested. |
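Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SinkFunction: invoke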
public static <IN> StreamingFileSink.RowFormatBuilder<IN,String> forRowFormat(Path basePath, Encoder<IN> encoder)
Creates the builder for a StreamingFileSink with row-encoding format.
Type Parameters:
IN - the type of incoming elements
Parameters:
basePath - the base path where all the buckets are going to be created as sub-directories.
encoder - the Encoder to be used when writing elements in the buckets.
Returns:
The builder on which the remaining sink parameters can be configured. To instantiate the sink, call StreamingFileSink.RowFormatBuilder.build() after specifying the desired parameters.
public static <IN> StreamingFileSink.BulkFormatBuilder<IN,String> forBulkFormat(Path basePath, BulkWriter.Factory<IN> writerFactory)
Creates the builder for a StreamingFileSink with bulk-encoding format.
Type Parameters:
IN - the type of incoming elements
Parameters:
basePath - the base path where all the buckets are going to be created as sub-directories.
writerFactory - the BulkWriter.Factory to be used when writing elements in the buckets.
Returns:
The builder on which the remaining sink parameters can be configured. To instantiate the sink, call StreamingFileSink.BulkFormatBuilder.build() after specifying the desired parameters.
public void initializeState(FunctionInitializationContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by:
initializeState in interface CheckpointedFunction
Parameters:
context - the context for initializing the operator
Throws:
Exception
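public void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been completed.
Specified by:
notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception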
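public void snapshotState(FunctionSnapshotContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. It acts as a hook for the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by:
snapshotState in interface CheckpointedFunction
Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception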
public void open(Configuration parameters) throws Exception
Description copied from interface: RichFunction
Initialization method for the function.
The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.
```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class MyMapper extends RichFilterFunction<String> {
    private String searchString;

    @Override
    public void open(Configuration parameters) {
        // Read the value configured under "foo"; fall back to an empty string.
        this.searchString = parameters.getString("foo", "");
    }

    @Override
    public boolean filter(String value) {
        return value.equals(searchString);
    }
}
```
By default, this method does nothing.
Specified by:
open in interface RichFunction
Overrides:
open in class AbstractRichFunction
Parameters:
parameters - The configuration containing the parameters attached to the contract.
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
See Also:
Configuration
public void onProcessingTime(long timestamp) throws Exception
Description copied from interface: ProcessingTimeCallback
This method is invoked with the timestamp for which the trigger was scheduled.
If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due to a garbage collection), the timestamp supplied to this function will still be the original timestamp for which the trigger was scheduled.
Specified by:
onProcessingTime in interface ProcessingTimeCallback
Parameters:
timestamp - The timestamp for which the trigger event was scheduled.
Throws:
Exception
public void invoke(IN value, SinkFunction.Context context) throws Exception
Description copied from interface: SinkFunction
Writes the given value to the sink.
You have to override this method when implementing a SinkFunction; it is a default method only for backward compatibility with the old-style method.
Specified by:
invoke in interface SinkFunction<IN>
Parameters:
value - The input record.
context - Additional context about the input record.
Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.
public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code.
This method can be used for clean up work.
Specified by:
close in interface RichFunction
Overrides:
close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.