Core Workflow Activities
The flow object provides several asynchronous methods (do, sleep, dialog, request, start) to define the steps, pauses, and interactions of your workflow. The engine ensures these activities are executed durably and can be resumed.
Tip: Aim to make each activity step as granular as possible. See Rule 2: Make Steps Granular.
The flow.do() method is used to execute a unit of work, which can be synchronous or asynchronous (returning a Promise). It’s the most common activity for running your custom logic, calculations, or simple external calls.
Signature:
flow.do<Result>(name: string, task: Task<Result>): Promise<Result>;flow.do<Result>(name: string, options: TaskOptions, task: Task<Result>): Promise<Result>;flow.do<Result, Intermediate>(name: string, options: WithSchema<TaskOptions, Result, Intermediate>, task: Task<Intermediate>): Promise<Result>;name: (Required) A unique string identifier for this step within the workflow definition. This name is crucial for idempotency.task: (Required) An asynchronous functionasync (ctx) => { ... }that contains the logic for this step. It receives aTaskContextobject (ctx) with properties like:ctx.step: Information about the current step execution (e.g.,ctx.step.attempts).ctx.use: The dependency injection function (same asflow.use).ctx.signal: AnAbortSignalfor cancellation handling.ctx.span: The OpenTelemetry span for this task.
options: (Optional) An object to configure behavior like retries (TaskOptions) or add schema validation (WithSchema).Result: The type of the value returned by thetaskfunction.
Idempotency and Caching:
The name provided to flow.do() is key. The workflow engine records the result of a successfully completed flow.do() step associated with its unique name. If the workflow restarts or replays due to an interruption, the engine will not re-execute the task function for a step with the same name that has already completed successfully. Instead, it will immediately return the previously recorded result.
This makes flow.do() steps inherently idempotent. Ensure your step names are unique and descriptive within the workflow.
For a deeper dive into idempotency, see Rule 1: Ensure Idempotency in our Workflow Rules guide.
Example:
// Inside the execute function:
const result = await flow.do('First step', async ({ step }) => { flow.log(`Executing First step, attempt: ${step.attempts}`);
// Example: Simulate work that might fail initially if (step.attempts < 3) { flow.warn('Simulating failure on attempt', step.attempts); throw new Error(`Not ready yet: ${step.attempts}`); // Throwing an error triggers a retry (configurable) }
// Simulate successful work const output = { message: 'First step completed successfully!', data: 123 }; flow.log('First step succeeded.'); return output; // This result is cached upon success});
// If the workflow resumes after this step, the above task function// will not re-run. `result` will contain the cached output.flow.log('Result from First step:', result);
const result2 = await flow.do('Second step', () => { // Simple synchronous task flow.log('Executing Second step.'); return { output: 'Second step result' };});
flow.log('Result from Second step:', result2);Retries:
If the task function throws an error, the engine will attempt to retry the step based on the configured retry strategy (either in the step’s options or the workflow defaults). See the Error Handling & Retries guide for more on configuring retries.
Use flow.sleep() to pause the workflow execution for a specific duration or until a specific time.
Signature:
flow.sleep(name: string, until: Duration | Date | SleepOptions): Promise<void>;flow.sleep<T>(name: string, until: Duration | Date | SleepOptions, value: T): Promise<T>;name: (Required) A unique string identifier for this sleep step.until: (Required) Specifies when the workflow should wake up. Can be:- A number (milliseconds duration).
- A
Dateobject (wake up at this specific time). - A duration string (e.g.,
'5 seconds','1 minute','2 hours'). - A
SleepOptionsobject{ until: ..., message?: ... }.
value: (Optional) A value to return when the sleep duration completes.
Behavior:
When flow.sleep() is called, the workflow instance enters the SLEEPING state (while the overall instance remains ACTIVE). The engine schedules it to wake up at the specified time. Once woken, the workflow resumes execution from the point immediately after the await flow.sleep(...) call.
Use Cases:
- Implementing scheduled tasks.
- Waiting for external processes that take a known amount of time.
- Rate limiting or adding delays between steps.
Example:
// Inside the execute function:
flow.log('About to sleep...');
// Wait for 5 secondsawait flow.sleep('Wait between steps', '5 seconds');// Or wait until a specific date:// await flow.sleep('Wait until specific time', new Date('2024-12-31T23:59:59Z'));
flow.log('Woke up after sleeping!');
// You can also return a value after sleepingconst sleepResult = await flow.sleep('Short nap', 100, 'Slept for 100ms');flow.log(sleepResult); // Outputs: Slept for 100msUse flow.dialog() when your workflow needs to pause and wait for input or confirmation from a human user (or an external system acting like one).
Signature:
flow.dialog<Result, Params>(name: string, define: DialogActivity<Params>): Promise<Result>;flow.dialog<Result, Params>(name: string, options: DialogOptions, define: DialogActivity<Params>): Promise<Result>;flow.dialog<Result, Params>(name: string, options: WithSchema<DialogOptions, Result>, define: DialogActivity<Params>): Promise<Result>;name: (Required) A unique string identifier for this dialog step.define: (Required) An asynchronous functionasync (ctx) => { ... }that defines the parameters for the dialog interaction. It receives anActivityContext(ctx) and must return an object (DialogInput) containing:params: Data needed by the UI or external system to present the dialog (e.g., form identifier, context data, user/role info).assignees: (Optional) An array of subject strings (user/group IDs) who are eligible to respond to the dialog.until: (Optional) ADurationorDateby which the dialog must be resolved, otherwise the step errors.message: (Optional) A descriptive message for this step.
options: (Optional) An object to configure retries (DialogOptions) or add schema validation (WithSchema) for the result returned when the dialog is completed.Result: The type of the data expected back when the dialog is resolved.Params: The type of theparamsobject returned by thedefinefunction.
Behavior:
- The
definefunction is executed to generate the dialog parameters (DialogInput). - The workflow instance step enters the
PENDINGstate (instance remainsACTIVE). - The engine records the
params,assignees,until, and a uniquetokenassociated with this step. - The workflow pauses indefinitely (or until the
untiltime is reached). - An external system (like a UI or notification service) must query the workflow instance state (via the GraphQL API), retrieve the
PENDINGstep’s details (includingparamsandtoken), present the necessary interface to the user, and collect their response. - Once the user responds, the external system calls the
workflowActivity(token: ...).continue(input: ...)GraphQL mutation, providing thetokenand the user’s response data (input.data). - The engine receives the continuation request, finds the paused instance and step via the
token, validates the submitteddataagainst theschema(if provided inoptions), wakes up the workflow, and theawait flow.dialog(...)call resolves, returning the validateddata.
Example:
// Inside the execute functionimport * as v from '@identity-flow/sdk/valibot';
flow.log('Waiting for form approval...');
const approvalResult = await flow.dialog( 'approve form', // Options specify the expected result schema { schema: v.object({ approved: v.boolean(), reason: v.nullish(v.string()) }) }, // Define function returns params for the UI and sets a timeout ({ token }) => ({ params: { form: 'one-form-approval', // Identifier for the UI component token // Pass the token needed to continue }, until: '30 minutes' // Step fails if not resolved in 30 mins }));
// Execution resumes here after GraphQL workflowActivity(...).continue() is calledflow.log('Approval result received:', approvalResult);
if (approvalResult.approved) { flow.log('Form was approved.', approvalResult.reason ? `Reason: ${approvalResult.reason}` : '');} else { flow.log('Form was rejected.', approvalResult.reason ? `Reason: ${approvalResult.reason}` : ''); // Potentially end the workflow early return { status: 'rejected' };}flow.dialog is essential for incorporating human decision points into your automated processes.
Use flow.request() when your workflow needs to pause and wait for external data or status updates.
Signature:
flow.request<Result, Data>(name: string, options: RequestOptions, activity: Activity<Data>): Promise<Result>;name: (Required) A unique string identifier for this request step.options: (Required) An object to configure the request behavior, including schema validation and polling configuration.activity: (Required) An asynchronous functionasync (ctx) => { ... }that contains the logic for this step. It receives aRequestContext(ctx) with properties like:ctx.step: Information about the current step execution (e.g.,ctx.step.attempts).ctx.use: The dependency injection function (same asflow.use).ctx.signal: AnAbortSignalfor cancellation handling.ctx.span: The OpenTelemetry span for this task.ctx.token: A unique token for this request activity, used for external continuation.
Result: The type of data expected back when the request is completed (either via external continuation or successful polling).Data: The type of data returned by the initialactivityfunction (often used for polling).
Behavior:
- External Continuation (Default):
- The
activityfunction executes, initiating the external process and potentially passing the uniquectx.tokento the external system. - The workflow instance step enters the
REQUESTINGstate (instance remainsACTIVE) and hibernates. - The external system performs its work asynchronously.
- When complete, the external system must call back to the IdentityFlow engine, typically via the
workflowActivity(token: ...).continue(input: ...)GraphQL mutation, providing thetokenand the resultdata. - The engine validates the result against the
schema(if provided), wakes the workflow, and theawait flow.request(...)call resolves with the result.
- The
- Polling:
- The
activityfunction executes, initiates the external process, and returns data (Data) needed for polling (e.g., a job ID). - The workflow instance step enters the
REQUESTINGstate. - The engine periodically executes the provided
poll.callbackfunction according to thepoll.retriesconfiguration. - The
poll.callbackfunction checks the status of the external operation (using the data returned by the initialactivity). It should return{ success: true, data: Result }when the operation is complete, or{ success: false }to continue polling. - If the
poll.callbackreturns success, theawait flow.request(...)call resolves with theResultdata. - If polling attempts are exhausted without success, the step fails.
- The
Use Cases:
- Interacting with asynchronous APIs that use webhooks/callbacks.
- Starting long-running batch jobs and waiting for their completion.
- Integrating with systems where status needs to be checked periodically.
Example (Conceptual - Callback):
// Inside the execute function:import * as v from '@identity-flow/sdk/valibot';
flow.log('Initiating external report generation...');
const reportResult = await flow.request( 'generate-report', // Define expected result structure { schema: v.object({ reportUrl: v.string(), status: v.literal('completed') }) }, // Activity function initiates the request and passes the token async ({ token, use }) => { const reportService = use(ReportServiceBinding); // Assuming ReportServiceBinding is defined // Tell the external service to start generation and notify via callback // including the unique token. await reportService.startReportGeneration({ userId: flow.params.userId, callbackToken: token, // External service uses this token to continue callbackUrl: 'https://identity-flow.example.com/api/callback', // Example callback endpoint }); flow.log('Report generation initiated, waiting for callback...'); // No return value needed here as we wait for external continuation },);
// Execution resumes here after the external service calls back via APIflow.log(`Report generated successfully: ${reportResult.reportUrl}`);Example (Conceptual - Polling):
// Inside the execute function:import * as v from '@identity-flow/sdk/valibot';
flow.log('Starting batch job...');
const jobResult = await flow.request( 'process-batch-job', { // Expected result when polling succeeds schema: v.object({ finalStatus: v.string(), outputLocation: v.string() }), // Polling configuration poll: { // Check every 5 minutes, retry up to 10 times retries: { limit: 10, delay: '5 minutes' }, // Polling callback function callback: async ({ use, step }) => { const jobService = use(JobServiceBinding); // Assuming JobServiceBinding defined const jobId = step.data?.jobId; // Access data returned by initial activity if (!jobId) throw new Error('Missing Job ID for polling');
const status = await jobService.getJobStatus(jobId); flow.debug(`Polling job ${jobId}: Status is ${status.state}`);
if (status.state === 'COMPLETED') { return { success: true, data: { finalStatus: status.state, outputLocation: status.resultsUrl }, }; } else if (status.state === 'FAILED') { throw new Error(`Job ${jobId} failed externally: ${status.errorMessage}`); } // Otherwise, continue polling return { success: false }; }, }, }, // Initial activity starts the job and returns data needed for polling async ({ use }) => { const jobService = use(JobServiceBinding); // Assuming JobServiceBinding defined const jobInfo = await jobService.startBatchJob({ inputData: flow.params.batchData }); flow.log(`Batch job started with ID: ${jobInfo.jobId}`); // Return data needed by the polling callback return { jobId: jobInfo.jobId }; },);
// Execution resumes here after polling succeedsflow.log(`Batch job ${jobResult.finalStatus}, output: ${jobResult.outputLocation}`);flow.request() handles scenarios where the workflow needs to pause and wait for external triggers or periodic checks.
For complex processes, you can break down logic into smaller, reusable workflows and orchestrate them using flow.start(). This allows you to start another workflow definition (a “sub-workflow” or “child workflow”) and wait for its completion.
Signature:
flow.start<Result, Params>(name: string, activity: StartWorkflowActivity<Params>): Promise<Result>;flow.start<Result, Params>(name: string, options: ActivityOptions, activity: StartWorkflowActivity<Params>): Promise<Result>;flow.start<Result, Params>(name: string, options: WithSchema<ActivityOptions, Result>, activity: StartWorkflowActivity<Params>): Promise<Result>;name: (Required) A unique string identifier for this sub-workflow step.activity: (Required) An asynchronous functionasync (ctx) => { ... }that defines which sub-workflow to start and with which parameters. It receives aStartWorkflowContext(ctx) and must return an object (StartWorkflowInput) containing:workflow: The name (or ID) of the workflow definition to start.params: The input parameters to pass to the sub-workflow.version: (Optional) A specific version or range of the sub-workflow definition to use.releaseChannel: (Optional) A specific release channel (e.g.,'latest') of the sub-workflow definition.label,description,recipients,until,meta: (Optional) Properties to set on the sub-workflow instance.
options: (Optional) An object to configure retries (ActivityOptions) for the initiation of the sub-workflow or add schema validation (WithSchema) for theResultexpected back from the sub-workflow upon its completion.Result: The type of the final result returned by the sub-workflow.Params: The type of theparamspassed to the sub-workflow.
Behavior:
- The
activityfunction executes to determine the sub-workflow definition and its input parameters. - The engine initiates a new instance of the specified sub-workflow definition.
- The current workflow instance step enters the
RELYINGstate (instance remainsACTIVE) and pauses. - The sub-workflow instance executes independently.
- When the sub-workflow instance completes successfully, its final result is recorded.
- The engine wakes up the original (parent) workflow.
- The
await flow.start(...)call resolves, returning the result from the completed sub-workflow. - If the sub-workflow fails or is terminated, the
flow.start()step in the parent workflow will also typically fail (subject to retry configurations on theflow.startoptions).
Use Cases:
- Reusing common logic (e.g., a standard notification workflow, an approval sub-process).
- Breaking down very large or complex workflows into manageable, independent parts.
- Implementing patterns like dynamic parallel execution (by starting multiple sub-workflows).
Example (Conceptual):
import * as v from '@identity-flow/sdk/valibot';import { defineWorkflow } from '@identity-flow/sdk';
export default defineWorkflow({ name: '@my-org/main-process', version: '1.0.0' }, async (flow) => { flow.log('Main process starting.');
const userData = await flow.do('fetch-user-data', async () => ({ name: 'Alice', id: 'user123' }));
flow.log('Starting notification sub-workflow...'); const notificationResult = await flow.start( 'send-welcome-email', { schema: v.object({ delivered: v.boolean(), messageId: v.string() }) }, (ctx) => ({ workflow: '@my-org/send-notification', params: { recipientId: userData.id, subject: 'Welcome!', body: `Hi ${userData.name}, welcome aboard!`, }, label: `Welcome Email for ${userData.name}`, }), );
if (notificationResult.delivered) { flow.log(`Welcome email sent successfully, Message ID: ${notificationResult.messageId}`); } else { flow.warn('Welcome email failed to deliver.'); }
flow.log('Main process finished.'); return { status: 'complete' };});// src/workflows/send-notification.ts (Sub-workflow example)import * as v from '@identity-flow/sdk/valibot';import { defineWorkflow } from '@identity-flow/sdk';
// import { EmailBinding } from '../bindings/email'; // Assuming EmailBinding is defined
export default defineWorkflow( { name: '@my-org/send-notification', version: '1.0.0', schema: v.object({ recipientId: v.string(), subject: v.string(), body: v.string() }), }, async (flow) => { const { recipientId, subject, body } = flow.params; flow.log(`Attempting to send notification to ${recipientId}`);
const sendResult = await flow.do('send-via-email-service', async ({ use }) => { // const emailService = use(EmailBinding); // const messageId = await emailService.send(recipientId, subject, body); // return { delivered: true, messageId }; // For example purposes: flow.log('Simulating email send...'); return { delivered: true, messageId: 'fake-message-id-' + Math.random().toString(36).substring(7), }; });
flow.log('Notification sub-workflow finished.'); return sendResult; },);Using flow.start() promotes modularity and reuse in complex workflow scenarios.
Master the other core components of workflow development: