Type Parameters: OUT, OP
@Internal public abstract class StreamTask<OUT,OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler
Each task runs one or more StreamOperators which form the Task's operator chain. Operators that are chained together execute synchronously in the same thread and hence on the same stream partition. A common case for these chains is a sequence of successive map/flatmap/filter tasks.
The task chain contains one "head" operator and multiple chained operators. The StreamTask is specialized for the type of the head operator: one-input and two-input tasks, as well as for sources, iteration heads and iteration tails.
The Task class deals with the setup of the streams read by the head operator, and the streams produced by the operators at the ends of the operator chain. Note that the chain may fork and thus have multiple ends.
The life cycle of the task is set up as follows:
-- setInitialState -> provides state of all operators in the chain
-- invoke()
|
+----> Create basic utils (config, etc) and load the chain of operators
+----> operators.setup()
+----> task specific init()
+----> initialize-operator-states()
+----> open-operators()
+----> run()
+----> close-operators()
+----> dispose-operators()
+----> common cleanup
+----> task specific cleanup()
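The step order above can be sketched as a template method. This is a minimal illustration, not Flink's implementation: `LifecycleSketch`, its `trace` list and `runDemo()` are hypothetical names, and running the dispose and cleanup steps inside a `finally` block is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a task; this is not Flink's StreamTask. */
abstract class LifecycleSketch {
    /** Records the order in which the life-cycle steps run. */
    final List<String> trace = new ArrayList<>();

    // Task-specific hooks, mirroring init(), run() and cleanup() in the diagram.
    protected abstract void init() throws Exception;
    protected abstract void run() throws Exception;
    protected abstract void cleanup() throws Exception;

    /** Template method: subclasses fill in the hooks, the step order is fixed here. */
    public final void invoke() throws Exception {
        trace.add("load-operator-chain");
        trace.add("operators.setup");
        try {
            init();
            trace.add("initialize-operator-states");
            trace.add("open-operators");
            run();
            trace.add("close-operators");
        } finally {
            // Assumption: disposal and cleanup run even if an earlier step throws.
            trace.add("dispose-operators");
            trace.add("common-cleanup");
            cleanup();
        }
    }

    /** Runs a trivial task and returns the recorded step order. */
    static List<String> runDemo() {
        LifecycleSketch task = new LifecycleSketch() {
            @Override protected void init() { trace.add("init"); }
            @Override protected void run() { trace.add("run"); }
            @Override protected void cleanup() { trace.add("cleanup"); }
        };
        try {
            task.invoke();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return task.trace;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Making `invoke()` final keeps the ordering in one place, so a concrete task only supplies the three hooks.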
The StreamTask has a lock object called lock. All calls to methods on a StreamOperator must be synchronized on this lock object to ensure that no methods are called concurrently.
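As a rough illustration of this locking rule, the sketch below guards every call into an operator with one shared lock object. `LockSketch`, `CountingOperator` and `demo()` are hypothetical names invented for the example and are not part of Flink.

```java
class LockSketch {
    /** Hypothetical operator whose state is not thread-safe on its own. */
    static final class CountingOperator {
        private long count;
        void processElement() { count++; }
        long snapshotState() { return count; }
    }

    // Single lock object shared by every caller that touches the operator.
    private final Object lock = new Object();
    private final CountingOperator operator = new CountingOperator();

    /** Record processing path: calls into the operator only while holding the lock. */
    void process() {
        synchronized (lock) {
            operator.processElement();
        }
    }

    /** Another caller (for example a checkpointing thread) uses the same lock. */
    long snapshot() {
        synchronized (lock) {
            return operator.snapshotState();
        }
    }

    /** Hammers the operator from several threads and returns the final count. */
    static long demo(int threads, int callsPerThread) {
        LockSketch task = new LockSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    task.process();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 10_000));
    }
}
```

Because both paths synchronize on the same object, no increment is lost and a snapshot never observes a half-applied update.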
Modifier and Type | Class and Description |
---|---|
protected static class | StreamTask.AsyncCheckpointRunnable: This runnable executes the asynchronous parts of all involved backend snapshots for the subtask. |
Modifier and Type | Field and Description |
---|---|
protected StreamConfig | configuration: The configuration of this streaming task. |
protected OP | headOperator: The head operator that consumes the input streams of this task. |
protected StreamInputProcessor | inputProcessor: The input processor. |
protected static org.slf4j.Logger | LOG: The logger used by the StreamTask and its subclasses. |
protected MailboxProcessor | mailboxProcessor |
protected OperatorChain<OUT,OP> | operatorChain: The chain of operators executed by this task. |
protected StateBackend | stateBackend: Our state backend. |
protected TimerService | timerService: The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and register timers for tasks to be executed in the future. |
static ThreadGroup | TRIGGER_THREAD_GROUP: The thread group that holds all trigger timer threads. |
Modifier | Constructor and Description |
---|---|
protected | StreamTask(Environment env): Constructor for initialization, possibly with initial state (recovery / savepoint / etc). |
protected | StreamTask(Environment env, TimerService timerService): Constructor for initialization, possibly with initial state (recovery / savepoint / etc). |
protected | StreamTask(Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) |
protected | StreamTask(Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor): Constructor for initialization, possibly with initial state (recovery / savepoint / etc). |
protected | StreamTask(Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) |
Modifier and Type | Method and Description |
---|---|
void | abortCheckpointOnBarrier(long checkpointId, Throwable cause): Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, but at least one CancelCheckpointMarker. |
protected void | advanceToEndOfEventTime(): Emits the MAX_WATERMARK so that all registered timers are fired. |
void | cancel(): This method is called when a task is canceled either as a result of a user abort or an execution failure. |
protected void | cancelTask() |
protected void | cleanup() |
static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> | createRecordWriterDelegate(StreamConfig configuration, Environment environment) |
StreamTaskStateInitializer | createStreamTaskStateInitializer() |
protected void | declineCheckpoint(long checkpointId) |
protected void | finalize(): The finalize method shuts down the timer. |
protected void | finishTask(): Instructs the task to go through its normal termination routine, i.e. |
Map<String,Accumulator<?,?>> | getAccumulatorMap() |
ExecutorService | getAsyncOperationsThreadPool() |
CloseableRegistry | getCancelables() |
Object | getCheckpointLock(): Deprecated. This method will be removed in future releases. Use mailbox executor to run StreamTask actions that require synchronization (e.g. checkpointing, collecting output). For other (non-StreamTask) actions other synchronization means can be used. The yield or tryYield methods can be used for actions that should give control to other actions temporarily. MailboxExecutor can be accessed by using YieldingOperatorFactory. |
CheckpointStorageWorkerView | getCheckpointStorage() |
protected CompletableFuture<Void> | getCompletionFuture() |
StreamConfig | getConfiguration() |
MailboxExecutorFactory | getMailboxExecutorFactory() |
String | getName(): Gets the name of the task, in the form "taskname (2/5)". |
ProcessingTimeService | getProcessingTimeService(int operatorIndex) |
StreamStatusMaintainer | getStreamStatusMaintainer() |
void | handleAsyncException(String message, Throwable exception): Handles an exception thrown by another thread (e.g. |
protected void | handleCheckpointException(Exception exception) |
protected abstract void | init() |
void | invoke(): Starts the execution. |
boolean | isCanceled() |
boolean | isFailing() |
boolean | isRunning() |
Future<Void> | notifyCheckpointCompleteAsync(long checkpointId): Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received the notification from all participating tasks. |
protected void | processInput(MailboxDefaultAction.Controller controller): This method implements the default action of the task (e.g. |
protected Counter | setupNumRecordsInCounter(StreamOperator streamOperator) |
String | toString() |
Future<Boolean> | triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime): This method is called to trigger a checkpoint, asynchronously by the checkpoint coordinator. |
void | triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics): This method is called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams. |
Methods inherited from class AbstractInvokable: getCurrentNumberOfSubtasks, getEnvironment, getExecutionConfig, getIndexInSubtaskGroup, getJobConfiguration, getTaskConfiguration, getUserCodeClassLoader, setShouldInterruptOnCancel, shouldInterruptOnCancel
public static final ThreadGroup TRIGGER_THREAD_GROUP
The thread group that holds all trigger timer threads.
protected static final org.slf4j.Logger LOG
The logger used by the StreamTask and its subclasses.
@Nullable protected StreamInputProcessor inputProcessor
The input processor. Initialized in the init() method.
protected OP headOperator
The head operator that consumes the input streams of this task.
protected OperatorChain<OUT,OP> operatorChain
The chain of operators executed by this task.
protected final StreamConfig configuration
The configuration of this streaming task.
protected StateBackend stateBackend
Our state backend.
protected TimerService timerService
The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and register timers for tasks to be executed in the future.
protected final MailboxProcessor mailboxProcessor
protected StreamTask(Environment env)
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
Parameters:
env - The task environment for this task.
protected StreamTask(Environment env, @Nullable TimerService timerService)
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
Parameters:
env - The task environment for this task.
timerService - Optionally, a specific timer service to use.
protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor)
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
This constructor accepts a special TimerService. By default (and if null is passed for the timer service) a DefaultTimerService will be used.
Parameters:
environment - The task environment for this task.
timerService - Optionally, a specific timer service to use.
uncaughtExceptionHandler - to handle uncaught exceptions in the async operations thread pool
actionExecutor - a means to wrap all actions performed by this task thread. Currently, only SynchronizedActionExecutor can be used to preserve locking semantics.
protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor, TaskMailbox mailbox)
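The nullable timer service parameter follows a common pattern: accept an optional collaborator and fall back to a default when null is passed. A minimal sketch of that pattern follows. `TimerDefaultSketch` and `SimpleTimerService` are hypothetical names, and the fallback based on `System.currentTimeMillis()` is an assumption taken from the field description, not Flink's actual default class.

```java
class TimerDefaultSketch {
    /** Hypothetical, minimal timer service: only reports the current processing time. */
    interface SimpleTimerService {
        long getCurrentProcessingTime();
    }

    final SimpleTimerService timerService;

    /** Convenience constructor: no timer service given, so the default is used. */
    TimerDefaultSketch() {
        this(null);
    }

    /** Accepts an optional timer service; null selects the wall-clock default. */
    TimerDefaultSketch(SimpleTimerService timerService) {
        if (timerService != null) {
            this.timerService = timerService;
        } else {
            // Assumption: the default simply reads the system clock.
            this.timerService = System::currentTimeMillis;
        }
    }

    public static void main(String[] args) {
        TimerDefaultSketch withDefault = new TimerDefaultSketch();
        TimerDefaultSketch withFixedClock = new TimerDefaultSketch(() -> 42L);
        System.out.println(withDefault.timerService.getCurrentProcessingTime() > 0);
        System.out.println(withFixedClock.timerService.getCurrentProcessingTime());
    }
}
```

Passing a fixed clock, as in `withFixedClock`, is the typical reason for such a parameter: tests can control processing time without touching the system clock.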
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception
Parameters:
controller - controller object for collaborative interaction between the action and the stream task.
Throws:
Exception - on any problems in the action.
protected void advanceToEndOfEventTime() throws Exception
Emits the MAX_WATERMARK so that all registered timers are fired.
This is used by the source task when the job is TERMINATED. In that case, we want all the timers registered throughout the pipeline to fire and the related state (e.g. windows) to be flushed.
For tasks other than the source task, this method does nothing.
Throws:
Exception
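To see why a maximal watermark flushes everything, consider the simplified model below, in which a timer fires once the watermark reaches its timestamp. `EventTimeSketch` and its methods are hypothetical; the only detail borrowed from Flink is that the maximal watermark carries the timestamp `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeSketch {
    /** The largest possible event time; no timer can be registered beyond it. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Pending event-time timers, smallest timestamp first.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires every timer whose timestamp is at or below the new watermark. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
    }

    List<Long> firedTimers() {
        return fired;
    }

    /** Registers three timers, advances partially, then advances to the end of event time. */
    static List<Long> demo() {
        EventTimeSketch sketch = new EventTimeSketch();
        sketch.registerTimer(30L);
        sketch.registerTimer(10L);
        sketch.registerTimer(20L);
        sketch.advanceWatermark(15L);           // fires only the timer at 10
        sketch.advanceWatermark(MAX_WATERMARK); // fires everything that is left
        return sketch.firedTimers();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```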
protected void finishTask() throws Exception
Instructs the task to go through its normal termination routine, including StreamOperator.close() and StreamOperator.dispose() on its operators.
This is used by the source task to get out of the run-loop when the job is stopped with a savepoint.
For tasks other than the source task, this method does nothing.
Throws:
Exception
public StreamTaskStateInitializer createStreamTaskStateInitializer()
protected Counter setupNumRecordsInCounter(StreamOperator streamOperator)
public final void invoke() throws Exception
Description copied from class: AbstractInvokable
Starts the execution.
Must be overwritten by the concrete task implementation. This method is called by the task manager when the actual execution of the task starts.
All resources should be cleaned up when the method returns. Make sure to guard the code with try-finally blocks where necessary.
invoke in class AbstractInvokable
Throws:
Exception - Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
protected CompletableFuture<Void> getCompletionFuture()
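public final void cancel() throws Exception
Description copied from class: AbstractInvokable
This method is called when a task is canceled either as a result of a user abort or an execution failure.
cancel in class AbstractInvokable
Throws:
Exception - thrown if any exception occurs during the execution of the user code
public MailboxExecutorFactory getMailboxExecutorFactory()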
public final boolean isRunning()
public final boolean isCanceled()
public final boolean isFailing()
protected void finalize() throws Throwable
The finalize method shuts down the timer.
This should not be relied upon! It will cause shutdown to happen much later than if manual shutdown is attempted, and cause threads to linger for longer than needed.
public String getName()
Gets the name of the task, in the form "taskname (2/5)".
@Deprecated public Object getCheckpointLock()
Deprecated. This method will be removed in future releases. Use mailbox executor to run StreamTask actions that require synchronization (e.g. checkpointing, collecting output). For other (non-StreamTask) actions other synchronization means can be used. The yield or tryYield methods can be used for actions that should give control to other actions temporarily. MailboxExecutor can be accessed by using YieldingOperatorFactory. Example usage can be found in AsyncWaitOperator.
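The idea behind replacing the lock with a mailbox can be sketched as follows: other threads only enqueue actions, and a single thread runs them, so the guarded state needs no lock. `MiniMailbox` is a hypothetical toy class. Its `execute` and `tryYield` methods only loosely imitate the method names mentioned above and do not reproduce Flink's MailboxExecutor interface.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class MiniMailbox {
    // Thread-safe queue of pending actions ("mails").
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();

    /** May be called from any thread: enqueues an action for the task thread. */
    void execute(Runnable action) {
        mails.add(action);
    }

    /** Called on the task thread: runs one pending action, if there is one. */
    boolean tryYield() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    /** Another thread submits 100 increments; only the calling thread executes them. */
    static int demo() {
        MiniMailbox mailbox = new MiniMailbox();
        int[] state = {0}; // plain, unsynchronized state owned by the draining thread
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                mailbox.execute(() -> state[0]++);
            }
        });
        producer.start();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        while (mailbox.tryYield()) {
            // keep draining until the mailbox is empty
        }
        return state[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design choice is that synchronization moves from the state to the queue: only the hand-off is thread-safe, and everything the actions touch is confined to one thread.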
public CheckpointStorageWorkerView getCheckpointStorage()
public StreamConfig getConfiguration()
public Map<String,Accumulator<?,?>> getAccumulatorMap()
public StreamStatusMaintainer getStreamStatusMaintainer()
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime)
Description copied from class: AbstractInvokable
This method is called to trigger a checkpoint, asynchronously by the checkpoint coordinator.
This method is called for tasks that start the checkpoints by injecting the initial barriers, i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of receiving checkpoint barriers, invoke the AbstractInvokable.triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics) method.
triggerCheckpointAsync in class AbstractInvokable
Parameters:
checkpointMetaData - Metadata about this checkpoint
checkpointOptions - Options for performing this checkpoint
advanceToEndOfEventTime - Flag indicating if the source should inject a MAX_WATERMARK in the pipeline to fire any registered event-time timers
Returns:
false if the checkpoint was not carried out, true otherwise
public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception
Description copied from class: AbstractInvokable
This method is called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams.
triggerCheckpointOnBarrier in class AbstractInvokable
Parameters:
checkpointMetaData - Metadata about this checkpoint
checkpointOptions - Options for performing this checkpoint
checkpointMetrics - Metrics about this checkpoint
Throws:
Exception - Exceptions thrown as the result of triggering a checkpoint are forwarded.
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception
Description copied from class: AbstractInvokable
Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, but at least one CancelCheckpointMarker.
This requires implementing tasks to forward a CancelCheckpointMarker to their outputs.
abortCheckpointOnBarrier in class AbstractInvokable
Parameters:
checkpointId - The ID of the checkpoint to be aborted.
cause - The reason why the checkpoint was aborted during alignment
Throws:
Exception
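The forwarding requirement can be pictured with a small model in which aborting a checkpoint pushes a cancel marker to every output, so that downstream tasks stop waiting for barriers of that checkpoint. All names here (`AbortSketch`, `CancelMarker`, `Output`) are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

class AbortSketch {
    /** Hypothetical marker event carrying the ID of the canceled checkpoint. */
    static final class CancelMarker {
        final long checkpointId;
        CancelMarker(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Hypothetical output channel that simply records what it receives. */
    static final class Output {
        final List<CancelMarker> received = new ArrayList<>();
        void emit(CancelMarker marker) {
            received.add(marker);
        }
    }

    final List<Output> outputs = new ArrayList<>();
    final List<Long> abortedCheckpoints = new ArrayList<>();

    /** Records the abort locally, then forwards one marker to every output. */
    void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
        abortedCheckpoints.add(checkpointId);
        for (Output output : outputs) {
            output.emit(new CancelMarker(checkpointId));
        }
    }

    /** Aborts checkpoint 7 on a task with two outputs; returns markers seen per output. */
    static int[] demo() {
        AbortSketch task = new AbortSketch();
        task.outputs.add(new Output());
        task.outputs.add(new Output());
        task.abortCheckpointOnBarrier(7L, new RuntimeException("alignment aborted"));
        return new int[] {
            task.outputs.get(0).received.size(),
            task.outputs.get(1).received.size()
        };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```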
protected void declineCheckpoint(long checkpointId)
protected void handleCheckpointException(Exception exception)
public ExecutorService getAsyncOperationsThreadPool()
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId)
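Description copied from class: AbstractInvokable
Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received the notification from all participating tasks.
notifyCheckpointCompleteAsync in class AbstractInvokable
Parameters:
checkpointId - The ID of the checkpoint that is complete.
public ProcessingTimeService getProcessingTimeService(int operatorIndex)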
public void handleAsyncException(String message, Throwable exception)
In more detail, it marks task execution failed for an external reason (a reason other than the task code itself throwing an exception). If the task is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing. Otherwise it sets the state to FAILED, and, if the invokable code is running, starts an asynchronous thread that aborts that code.
This method never blocks.
handleAsyncException in interface AsyncExceptionHandler
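The non-blocking state transition described here can be approximated with a single compare-and-set. The sketch below is a simplified model under stated assumptions: `FailExternallySketch` is hypothetical, its `State` enum lists only the states named in the text plus RUNNING, and the abort action is whatever `Runnable` the caller supplies.

```java
import java.util.concurrent.atomic.AtomicReference;

class FailExternallySketch {
    /** Simplified state set; a real task has more states than these. */
    enum State { RUNNING, CANCELING, FINISHED, CANCELED, FAILED }

    final AtomicReference<State> state;
    volatile Throwable failureCause;

    FailExternallySketch(State initial) {
        this.state = new AtomicReference<>(initial);
    }

    /**
     * Marks the task as FAILED unless it is terminal or canceling.
     * Returns the thread running the abort action, or null if nothing was done.
     */
    Thread failExternally(Throwable cause, Runnable abortAction) {
        // Only a RUNNING task can move to FAILED; every other state is left alone.
        if (!state.compareAndSet(State.RUNNING, State.FAILED)) {
            return null;
        }
        failureCause = cause;
        // The abort runs on its own thread, so the caller never blocks on it.
        Thread aborter = new Thread(abortAction, "abort-invokable");
        aborter.setDaemon(true);
        aborter.start();
        return aborter;
    }

    /** Tries to fail one running and one canceling task; reports outcome and final states. */
    static String demo() {
        FailExternallySketch running = new FailExternallySketch(State.RUNNING);
        boolean first = running.failExternally(new RuntimeException("boom"), () -> { }) != null;
        FailExternallySketch canceling = new FailExternallySketch(State.CANCELING);
        boolean second = canceling.failExternally(new RuntimeException("boom"), () -> { }) != null;
        return first + "," + running.state.get() + "," + second + "," + canceling.state.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Using compare-and-set instead of a lock means two concurrent failure reports cannot both start an abort thread: only the call that wins the transition proceeds.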
public CloseableRegistry getCancelables()
@VisibleForTesting public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration, Environment environment)
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.