Introduction
When tasks take too long to execute, running them synchronously with flow.executeTask can lead to stability issues in your workflows.
To mitigate this, you can defer the execution of a workflow and resume it after the nested task completes. This approach enhances reliability by allowing long-running tasks to process asynchronously.
@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(exampleCommand: ExampleCommand, meta: TaskMetadata, flow: Flow): Promise<TaskResult> {
    preprocess();
    return flow
      .defer() // Defer the workflow
      .withTask('example-nested-task', {}) // Define the nested task to execute
      .continueOn("myContinuationHandler"); // Specify the continuation handler
  }
}

@Continuation("myContinuationHandler")
export class MyContinuationHandler extends ContinuationHandler {
  async onResponse(
    name: string,
    result: TaskResult, // nested task result
    flow: Flow
  ): Promise<ExampleResponse> {
    return postProcess(); // Process the result after the nested task completes
  }
}
The onResponse method is triggered in the following scenarios:
- When each nested task completes or fails.
- When each nested task publishes in-progress results.
The default implementation of onResponse checks result.status and automatically delegates to the appropriate method on your continuation handler:
- onSuccess for successful completion
- onFailure for errors
- onProgress for in-progress updates
You can override any of these methods in your handler to customize how the parent task should respond.
By default:
- onFailure throws an error containing details from the failed nested task.
- onProgress forwards the nested task’s progress as the parent task’s progress.
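The dispatch described above can be pictured with a small, self-contained sketch. It is not the framework's source: SketchStatus, SketchResult, and SketchContinuationHandler are hypothetical stand-ins, the methods are synchronous for brevity (the real ones are async), and the default onSuccess simply returning the nested result is an assumption.

```typescript
// Stand-in types for illustration only; the framework's real types may differ.
enum SketchStatus {
  SUCCESS = "SUCCESS",
  FAILED = "FAILED",
  IN_PROGRESS = "IN_PROGRESS",
}

interface SketchResult {
  status: SketchStatus;
  result?: unknown;
  error?: string;
}

// Hypothetical base class mirroring the behavior described in the text.
class SketchContinuationHandler {
  // Route each nested-task response to the hook that matches its status.
  onResponse(name: string, result: SketchResult): SketchResult {
    switch (result.status) {
      case SketchStatus.SUCCESS:
        return this.onSuccess(name, result);
      case SketchStatus.FAILED:
        return this.onFailure(name, result);
      case SketchStatus.IN_PROGRESS:
        return this.onProgress(name, result);
      default:
        throw new Error(`Unexpected status for ${name}`);
    }
  }

  // Assumption: by default the nested result becomes the parent result.
  onSuccess(name: string, result: SketchResult): SketchResult {
    return result;
  }

  // Default described in the text: raise an error carrying the failure details.
  onFailure(name: string, result: SketchResult): SketchResult {
    throw new Error(`Nested task ${name} failed: ${result.error ?? "unknown error"}`);
  }

  // Default described in the text: forward nested progress as parent progress.
  onProgress(name: string, result: SketchResult): SketchResult {
    return result;
  }
}
```

Because the routing lives in onResponse, a subclass that overrides only one hook keeps the default behavior of the other two.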
If you need to transform the in-progress result before publishing it, you can override onProgress like this:
@Continuation("myContinuationHandler")
export class MyContinuationHandler extends ContinuationHandler {
  async onProgress(
    name: string,
    result: TaskResult,
    flow: Flow
  ): Promise<ExampleResponse> {
    return {
      status: TaskStatus.IN_PROGRESS,
      result: {
        'nestedTask': result.result
      }
    };
  }
}
When executing long-running tasks, you can defer multiple tasks simultaneously within your workflow. This allows them to run in parallel, enhancing efficiency and reducing overall execution time.
Here’s an example of how to define multiple nested tasks to run in parallel:
@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(exampleCommand: ExampleCommand, meta: TaskMetadata, flow: Flow): Promise<TaskResult> {
    preprocess();
    return flow
      .defer() // Defer the workflow
      .withTask('example-nested-task', {name: "first-task"}) // Define the first nested task
      .withTask('example-nested-task', {name: "second-task"}) // Define the second nested task
      .continueOn("myContinuationHandler"); // Specify the continuation handler
  }
}

@Continuation("myContinuationHandler")
export class MyContinuationHandler extends ContinuationHandler {
  async onResponse(
    name: string,
    result: TaskResult, // nested task result
    flow: Flow
  ): Promise<ExampleResponse> {
    return postProcess(); // Called for each nested task response
  }
}
When several nested tasks run in parallel, check whether the default behavior fits your workflow, and override onSuccess, onFailure, and onProgress in your continuation handler where it does not.
By default:
- onSuccess marks the task as completed as soon as the first successful response is received.
- onFailure marks the task as failed as soon as any one failure is reported.
Additionally, the onProgress method should be designed to correctly merge the results from each nested task, allowing you to effectively communicate updates on the progress of all concurrent tasks.
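One way to merge progress from concurrent tasks is to keep the latest update per task and publish the combined object. The sketch below is a standalone helper, not part of the framework: ProgressByTask, mergeProgress, and the task keys are hypothetical, and how a handler identifies which nested task sent an update (and where it keeps the previous state between calls) is left as an assumption.

```typescript
// Hypothetical container: latest progress payload for each nested task.
type ProgressByTask = Record<string, unknown>;

// Return a new object in which taskKey holds its most recent update,
// leaving the entries of the other tasks untouched.
function mergeProgress(
  previous: ProgressByTask,
  taskKey: string,
  update: unknown
): ProgressByTask {
  return { ...previous, [taskKey]: update };
}

// Usage: updates arrive in arbitrary order from two parallel tasks.
let mergedProgress: ProgressByTask = {};
mergedProgress = mergeProgress(mergedProgress, "first-task", { percent: 40 });
mergedProgress = mergeProgress(mergedProgress, "second-task", { percent: 10 });
mergedProgress = mergeProgress(mergedProgress, "first-task", { percent: 80 });
// mergedProgress now holds 80 percent for first-task and 10 percent for second-task.
```

An overridden onProgress could return such a merged object as the in-progress result, so a later update from one task does not erase what another task reported earlier.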
You can use flow.deferredTasks() to retrieve information about all deferred tasks in the workflow.
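If the parent task should complete only after every nested task has succeeded, an overridden onSuccess needs some view of all deferred tasks. The sketch below shows only the decision logic. DeferredTaskSnapshot, SketchTaskState, and decideParentOutcome are hypothetical, and the actual shape of what flow.deferredTasks() returns is not described here, so treat the mapping from its output to these snapshots as an assumption.

```typescript
// Hypothetical per-task state; the framework's real status values may differ.
type SketchTaskState = "PENDING" | "IN_PROGRESS" | "SUCCESS" | "FAILED";

// Hypothetical snapshot of one deferred task.
interface DeferredTaskSnapshot {
  name: string;
  state: SketchTaskState;
}

type ParentDecision = "WAIT" | "COMPLETE" | "FAIL";

// Fail fast on any failure, complete only when every task has succeeded,
// and otherwise keep waiting for further responses.
function decideParentOutcome(tasks: DeferredTaskSnapshot[]): ParentDecision {
  if (tasks.some((t) => t.state === "FAILED")) return "FAIL";
  if (tasks.length > 0 && tasks.every((t) => t.state === "SUCCESS")) return "COMPLETE";
  return "WAIT";
}

// Usage: one task finished, the other is still running.
const snapshots: DeferredTaskSnapshot[] = [
  { name: "first-task", state: "SUCCESS" },
  { name: "second-task", state: "IN_PROGRESS" },
];
const decision = decideParentOutcome(snapshots); // "WAIT"
```

Keeping the decision in a pure function makes it easy to unit test separately from the handler that calls it.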