Workflows Overview
Workflows are tasks that enable you to chain and reuse other tasks to build comprehensive APIs. They provide a flexible way to orchestrate complex processes by combining smaller, modular tasks.
The key addition to the TaskExecutor interface is the Flow object, which is passed as the third parameter to the processTask method. The Flow object lets you:
- Execute other tasks: Trigger nested tasks within the workflow.
- Deliver intermediate results: Send partial results to the client before the entire task execution is complete.
- Support flexible configurations: Easily create and integrate different configurations for a workflow.
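As a rough illustration of how the third parameter is used, the sketch below wires an executor to an in-memory stand-in for Flow. The names FlowSketch, TaskExecutorSketch, InMemoryFlow, and GreetingExecutor are hypothetical and exist only for this example; only the processTask(command, metadata, flow) shape and the executeTask method name are taken from the examples on this page, and the real library's types may differ.

```typescript
// Hypothetical minimal types, inferred from the examples on this page.
// The real library's definitions may differ.
type NestedTask = (params: unknown) => Promise<unknown>;

interface FlowSketch {
  executeTask(name: string, params: unknown): Promise<unknown>;
}

interface TaskExecutorSketch {
  processTask(command: unknown, metadata: Record<string, unknown>, flow: FlowSketch): Promise<unknown>;
}

// In-memory stand-in for Flow: nested tasks are plain functions looked up by name.
class InMemoryFlow implements FlowSketch {
  private readonly tasks: Map<string, NestedTask>;

  constructor(tasks: Map<string, NestedTask>) {
    this.tasks = tasks;
  }

  async executeTask(name: string, params: unknown): Promise<unknown> {
    const task = this.tasks.get(name);
    if (task === undefined) {
      throw new Error(`Unknown task: ${name}`);
    }
    return task(params);
  }
}

// An executor that receives the flow as its third argument and delegates
// the incoming command to a nested task.
class GreetingExecutor implements TaskExecutorSketch {
  async processTask(command: unknown, _metadata: Record<string, unknown>, flow: FlowSketch): Promise<unknown> {
    return flow.executeTask("example-nested-task", command);
  }
}
```

A stand-in like this can be handy in unit tests, because the executor only depends on the small surface of Flow it actually calls.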
Invoking Other Tasks
Below are examples of how to use the Flow object to invoke a nested task within a TaskExecutor implementation:
- Sync
@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(command: any, metadata: TaskMetadata, flow: Flow): Promise<ExampleResponse> {
    // Wait for the nested task to finish and use its result directly.
    const { result } = await flow.executeTask("example-nested-task", {
      name: "Joe",
    });
    return {
      result,
    };
  }
}
- Async
@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(command: any, metadata: TaskMetadata, flow: Flow): Promise<ExampleResponse> {
    // Defer the nested task; its result is delivered to the continuation handler named below.
    return flow
      .defer()
      .withTask("example-nested-task", { name: "Joe" })
      .continueOn("exampleContinuationHandler");
  }
}
@Continuation("exampleContinuationHandler")
export class ExampleContinuationHandler extends ContinuationHandler {
  async onResponse(name: string, result: TaskResult, flow: Flow): Promise<ExampleResponse> {
    // process() stands for your own post-processing of the nested task's result.
    const finalRes = process(result);
    return {
      result: finalRes,
    };
  }
}
More info here.
- Stream
@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(command: any, metadata: TaskMetadata, flow: Flow): Promise<ExampleResponse> {
    const streamingRes = flow.streamTask("example-nested-task", {
      name: "Joe",
    });
    let result = {};
    // Handle every chunk as it arrives and keep the completion event as the final result.
    for await (const chunk of streamingRes) {
      this.handleStreamChunk(chunk);
      if (chunk.type === EventTypes.COMPLETED) {
        result = chunk;
      }
    }
    return {
      result,
    };
  }
}
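The loop in the Stream example can be tried in isolation with an async generator standing in for flow.streamTask. This is a minimal sketch under stated assumptions: fakeStreamTask, collectFinal, the StreamChunk shape, and the PARTIAL event name are all hypothetical, and only the COMPLETED event name appears in the example.

```typescript
// Hypothetical event names; only COMPLETED appears in the Stream example.
const EventTypes = { PARTIAL: "PARTIAL", COMPLETED: "COMPLETED" } as const;

// Assumed chunk shape, for illustration only.
interface StreamChunk {
  type: string;
  payload: unknown;
}

// Stand-in for flow.streamTask(): yields two partial chunks, then a completion chunk.
async function* fakeStreamTask(name: string): AsyncGenerator<StreamChunk> {
  yield { type: EventTypes.PARTIAL, payload: `${name}: step 1` };
  yield { type: EventTypes.PARTIAL, payload: `${name}: step 2` };
  yield { type: EventTypes.COMPLETED, payload: { greeting: "Hello, Joe" } };
}

// Same loop structure as the example: observe every chunk, remember the completion chunk.
async function collectFinal(
  stream: AsyncIterable<StreamChunk>,
  onChunk: (chunk: StreamChunk) => void,
): Promise<StreamChunk | undefined> {
  let completed: StreamChunk | undefined;
  for await (const chunk of stream) {
    onChunk(chunk);
    if (chunk.type === EventTypes.COMPLETED) {
      completed = chunk;
    }
  }
  return completed;
}
```

Returning undefined when no completion chunk arrives makes the "stream ended early" case explicit, whereas the example falls back to an empty object.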
Publishing Progress
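@Executor("example-task")
export class ExampleExecutor implements TaskExecutor {
  async processTask(command: any, metadata: TaskMetadata, flow: Flow): Promise<ExampleResponse> {
    // Send a partial result to the client right away.
    await flow.emitPartialResult({
      name: "Joe",
    });
    // Simulate long-running work; sleep is assumed to be a helper that returns a Promise.
    await sleep(5000);
    // The return value is the final result of the task.
    return {
      name: "Joe",
      surname: "Doe",
    };
  }
}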
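The ordering in the example above (partial result first, final result after the remaining work) can be checked with a recording stand-in. This is only a sketch: RecordingFlow, processWithProgress, and the sleep helper are hypothetical, and a short delay replaces the five-second wait.

```typescript
// Promise-based delay helper (an assumption; the page does not show where sleep comes from).
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Hypothetical stand-in for Flow that records what would be sent to the client.
class RecordingFlow {
  readonly events: string[] = [];

  async emitPartialResult(partial: unknown): Promise<void> {
    this.events.push(`partial:${JSON.stringify(partial)}`);
  }
}

// Mirrors the executor body: emit a partial result, do the slow work, return the final result.
async function processWithProgress(flow: RecordingFlow): Promise<{ name: string; surname: string }> {
  await flow.emitPartialResult({ name: "Joe" });
  await sleep(10); // without "await", the function would return before the delay elapsed
  flow.events.push("final");
  return { name: "Joe", surname: "Doe" };
}
```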
More info here.
Versioned Workflows
Workflows and task executors can now support versioning, allowing you to run different versions of a workflow side by side (e.g., for upgrades, experiments, or backward compatibility). Versioning applies to both the API endpoint and the executor registration in code.
Registering a Versioned Task
You can define a versioned or multi-part task by including the version (or path segments) in the @Executor() decorator:
@Executor("v2/example-task")
export class ExampleTaskV2Executor implements TaskExecutor {
  async processTask(command: any, metadata: TaskMetadata, flow: Flow): Promise<any> {
    return { result: "This is version 2!" };
  }
}
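One way to picture side-by-side versions is a registry keyed by the full task name, so that "example-task" and "v2/example-task" are separate entries. The sketch below is purely illustrative: register, resolve, and parseTaskName are hypothetical helpers, and treating a leading segment such as "v2" as the version is an assumption, not the library's documented lookup logic.

```typescript
type Handler = () => string;

// Hypothetical registry keyed by the full task name, including any version prefix.
const registry = new Map<string, Handler>();

function register(name: string, handler: Handler): void {
  if (registry.has(name)) {
    throw new Error(`Task already registered: ${name}`);
  }
  registry.set(name, handler);
}

function resolve(name: string): Handler {
  const handler = registry.get(name);
  if (handler === undefined) {
    throw new Error(`Unknown task: ${name}`);
  }
  return handler;
}

// Split a name such as "v2/example-task" into version and task.
// Assumption: a version is a leading segment of the form "v<digits>";
// any other leading segment is treated as part of a multi-part task name.
function parseTaskName(name: string): { version?: string; task: string } {
  const slash = name.indexOf("/");
  if (slash > 0) {
    const head = name.slice(0, slash);
    if (/^v\d+$/.test(head)) {
      return { version: head, task: name.slice(slash + 1) };
    }
  }
  return { task: name };
}

// Both versions are registered under distinct names and can run side by side.
register("example-task", () => "This is version 1!");
register("v2/example-task", () => "This is version 2!");
```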
Refer to naming conventions here.