Core Workflow Activities

The flow object provides several asynchronous methods (do, sleep, dialog, request, start) to define the steps, pauses, and interactions of your workflow. The engine ensures these activities are executed durably and can be resumed.

Tip: Aim to make each activity step as granular as possible. See Rule 2: Make Steps Granular.

The flow.do() method is used to execute a unit of work, which can be synchronous or asynchronous (returning a Promise). It’s the most common activity for running your custom logic, calculations, or simple external calls.

Signature:

flow.do<Result>(name: string, task: Task<Result>): Promise<Result>;
flow.do<Result>(name: string, options: TaskOptions, task: Task<Result>): Promise<Result>;
flow.do<Result, Intermediate>(name: string, options: WithSchema<TaskOptions, Result, Intermediate>, task: Task<Intermediate>): Promise<Result>;
  • name: (Required) A unique string identifier for this step within the workflow definition. This name is crucial for idempotency.
  • task: (Required) A function (ctx) => { ... }, synchronous or async, that contains the logic for this step. It receives a TaskContext object (ctx) with properties like:
    • ctx.step: Information about the current step execution (e.g., ctx.step.attempts).
    • ctx.use: The dependency injection function (same as flow.use).
    • ctx.signal: An AbortSignal for cancellation handling.
    • ctx.span: The OpenTelemetry span for this task.
  • options: (Optional) An object to configure behavior like retries (TaskOptions) or add schema validation (WithSchema).
  • Result: The type of the value returned by the task function.

Idempotency and Caching:

The name provided to flow.do() is key. The workflow engine records the result of a successfully completed flow.do() step associated with its unique name. If the workflow restarts or replays due to an interruption, the engine will not re-execute the task function for a step with the same name that has already completed successfully. Instead, it will immediately return the previously recorded result.

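This makes completed flow.do() steps safe to replay. A task that throws before completing can run again on retry, so side effects inside the task should tolerate being repeated. Ensure your step names are unique and descriptive within the workflow.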

For a deeper dive into idempotency, see Rule 1: Ensure Idempotency in our Workflow Rules guide.
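
To make the caching behavior concrete, here is a minimal sketch of name-keyed memoization. It is not the engine's implementation: ToyFlow and its in-memory Map are hypothetical stand-ins for the real flow object and its durable storage.

```typescript
// Toy model of name-keyed step caching. NOT the real engine: `ToyFlow` and
// its in-memory Map are hypothetical stand-ins for durable storage.
class ToyFlow {
  private readonly completed: Map<string, unknown>;

  constructor(completed: Map<string, unknown>) {
    this.completed = completed;
  }

  async do<Result>(name: string, task: () => Result | Promise<Result>): Promise<Result> {
    if (this.completed.has(name)) {
      // A step with this name already succeeded: return the recorded result.
      return this.completed.get(name) as Result;
    }
    const result = await task();
    // Record only after success; a throwing task leaves nothing cached.
    this.completed.set(name, result);
    return result;
  }
}

// Simulate a first run and a replay that share the same store.
async function demo(): Promise<{ runs: number; first: number; replayed: number }> {
  const store = new Map<string, unknown>();
  let runs = 0;
  const task = () => {
    runs += 1;
    return 42;
  };

  const first = await new ToyFlow(store).do('compute-answer', task);
  const replayed = await new ToyFlow(store).do('compute-answer', task); // replay
  return { runs, first, replayed };
}
```

In this sketch the task body runs once even though the step is requested twice, which is why reusing a step name for different work would hand back the wrong cached value.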

Example:

// Inside the execute function:
const result = await flow.do('First step', async ({ step }) => {
  flow.log(`Executing First step, attempt: ${step.attempts}`);
  // Example: Simulate work that might fail initially
  if (step.attempts < 3) {
    flow.warn('Simulating failure on attempt', step.attempts);
    throw new Error(`Not ready yet: ${step.attempts}`); // Throwing an error triggers a retry (configurable)
  }
  // Simulate successful work
  const output = { message: 'First step completed successfully!', data: 123 };
  flow.log('First step succeeded.');
  return output; // This result is cached upon success
});
// If the workflow resumes after this step, the above task function
// will not re-run. `result` will contain the cached output.
flow.log('Result from First step:', result);

const result2 = await flow.do('Second step', () => {
  // Simple synchronous task
  flow.log('Executing Second step.');
  return { output: 'Second step result' };
});
flow.log('Result from Second step:', result2);

Retries:

If the task function throws an error, the engine will attempt to retry the step based on the configured retry strategy (either in the step’s options or the workflow defaults). See the Error Handling & Retries guide for more on configuring retries.
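
As a rough mental model of the retry behavior, the sketch below re-runs a task until it succeeds or a limit is reached. It is an illustration only: runWithRetries, the limit parameter, and the assumption that attempts counts from 1 are all hypothetical, and the delay a real engine would apply between attempts is omitted.

```typescript
// Toy retry loop. NOT the engine's implementation: `runWithRetries`, `limit`,
// and the 1-based `attempts` counter are assumptions made for this sketch.
async function runWithRetries<Result>(
  limit: number,
  task: (step: { attempts: number }) => Result | Promise<Result>,
): Promise<Result> {
  if (limit < 1) throw new RangeError('limit must be at least 1');
  let lastError: unknown;
  for (let attempts = 1; attempts <= limit; attempts++) {
    try {
      // Success ends the loop and returns the task's result.
      return await task({ attempts });
    } catch (error) {
      // Remember the failure; a real engine would wait before the next attempt.
      lastError = error;
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError;
}
```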

Use flow.sleep() to pause the workflow execution for a specific duration or until a specific time.

Signature:

flow.sleep(name: string, until: Duration | Date | SleepOptions): Promise<void>;
flow.sleep<T>(name: string, until: Duration | Date | SleepOptions, value: T): Promise<T>;
  • name: (Required) A unique string identifier for this sleep step.
  • until: (Required) Specifies when the workflow should wake up. Can be:
    • A number (milliseconds duration).
    • A Date object (wake up at this specific time).
    • A duration string (e.g., '5 seconds', '1 minute', '2 hours').
    • A SleepOptions object { until: ..., message?: ... }.
  • value: (Optional) A value to return when the sleep duration completes.
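
The accepted forms of until can be pictured as all reducing to one absolute wake-up time. The sketch below is an assumption-laden illustration: resolveWakeTime is a hypothetical helper, it handles only the three string units shown above, and it leaves out the SleepOptions object form. The engine's actual duration grammar may accept more.

```typescript
// Hypothetical helper, not part of the SDK: normalizes an `until` value to an
// absolute wake-up time in milliseconds since the epoch.
type Until = number | Date | string;

const UNIT_MS: Record<string, number> = {
  second: 1000,
  minute: 60000,
  hour: 3600000,
};

function resolveWakeTime(until: Until, now: number): number {
  if (typeof until === 'number') return now + until; // millisecond duration
  if (until instanceof Date) return until.getTime(); // absolute point in time
  // Duration string such as '5 seconds', '1 minute', or '2 hours'.
  const match = /^(\d+)\s+(second|minute|hour)s?$/.exec(until.trim());
  if (!match) throw new Error(`Unsupported duration string: ${until}`);
  return now + Number(match[1]) * UNIT_MS[match[2]];
}
```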

Behavior:

When flow.sleep() is called, the current step enters the SLEEPING state while the workflow instance itself remains ACTIVE. The engine schedules the instance to wake up at the specified time. Once woken, the workflow resumes execution from the point immediately after the await flow.sleep(...) call.

Use Cases:

  • Implementing scheduled tasks.
  • Waiting for external processes that take a known amount of time.
  • Rate limiting or adding delays between steps.

Example:

// Inside the execute function:
flow.log('About to sleep...');
// Wait for 5 seconds
await flow.sleep('Wait between steps', '5 seconds');
// Or wait until a specific date:
// await flow.sleep('Wait until specific time', new Date('2024-12-31T23:59:59Z'));
flow.log('Woke up after sleeping!');
// You can also return a value after sleeping
const sleepResult = await flow.sleep('Short nap', 100, 'Slept for 100ms');
flow.log(sleepResult); // Outputs: Slept for 100ms

Use flow.dialog() when your workflow needs to pause and wait for input or confirmation from a human user (or an external system acting like one).

Signature:

flow.dialog<Result, Params>(name: string, define: DialogActivity<Params>): Promise<Result>;
flow.dialog<Result, Params>(name: string, options: DialogOptions, define: DialogActivity<Params>): Promise<Result>;
flow.dialog<Result, Params>(name: string, options: WithSchema<DialogOptions, Result>, define: DialogActivity<Params>): Promise<Result>;
  • name: (Required) A unique string identifier for this dialog step.
  • define: (Required) An asynchronous function async (ctx) => { ... } that defines the parameters for the dialog interaction. It receives an ActivityContext (ctx) and must return an object (DialogInput) containing:
    • params: Data needed by the UI or external system to present the dialog (e.g., form identifier, context data, user/role info).
    • assignees: (Optional) An array of subject strings (user/group IDs) who are eligible to respond to the dialog.
    • until: (Optional) A Duration or Date by which the dialog must be resolved, otherwise the step errors.
    • message: (Optional) A descriptive message for this step.
  • options: (Optional) An object to configure retries (DialogOptions) or add schema validation (WithSchema) for the result returned when the dialog is completed.
  • Result: The type of the data expected back when the dialog is resolved.
  • Params: The type of the params object returned by the define function.

Behavior:

  1. The define function is executed to generate the dialog parameters (DialogInput).
  2. The workflow instance step enters the PENDING state (instance remains ACTIVE).
  3. The engine records the params, assignees, until, and a unique token associated with this step.
  4. The workflow pauses indefinitely (or until the until time is reached).
  5. An external system (like a UI or notification service) must query the workflow instance state (via the GraphQL API), retrieve the PENDING step’s details (including params and token), present the necessary interface to the user, and collect their response.
  6. Once the user responds, the external system calls the workflowActivity(token: ...).continue(input: ...) GraphQL mutation, providing the token and the user’s response data (input.data).
  7. The engine receives the continuation request, finds the paused instance and step via the token, validates the submitted data against the schema (if provided in options), wakes up the workflow, and the await flow.dialog(...) call resolves, returning the validated data.
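
The token handshake in the steps above can be modeled in a few lines. This is a toy, in-memory sketch under stated assumptions, not the engine or its GraphQL API: ToyDialogEngine, open, paramsFor, continueStep, and parseApproval are all hypothetical names, and a hand-written validator stands in for a schema.

```typescript
// Toy, in-memory model of the dialog token handshake. NOT the engine's
// implementation or its GraphQL API: every name here is hypothetical.
type Validator<T> = (data: unknown) => T; // must throw on invalid data

interface PendingStep {
  params: unknown;
  validate: Validator<unknown>;
  resolve: (value: unknown) => void;
}

class ToyDialogEngine {
  private pending = new Map<string, PendingStep>();
  private nextId = 0;

  // Steps 1-4: record the dialog under a fresh token; `result` stays pending.
  open<T>(params: unknown, validate: Validator<T>): { token: string; result: Promise<T> } {
    this.nextId += 1;
    const token = `token-${this.nextId}`;
    const result = new Promise<T>((resolve) => {
      this.pending.set(token, { params, validate, resolve: resolve as (value: unknown) => void });
    });
    return { token, result };
  }

  // Step 5: what an external UI would read back for a pending step.
  paramsFor(token: string): unknown {
    return this.pending.get(token)?.params;
  }

  // Steps 6-7: find the step by token, validate the data, release the workflow.
  continueStep(token: string, data: unknown): void {
    const step = this.pending.get(token);
    if (!step) throw new Error(`No pending step for token ${token}`);
    const validated = step.validate(data); // if this throws, the step stays pending
    this.pending.delete(token);
    step.resolve(validated);
  }
}

// Hand-rolled validator standing in for a schema supplied via `options`.
function parseApproval(data: unknown): { approved: boolean } {
  const approved = (data as { approved?: unknown } | null)?.approved;
  if (typeof approved !== 'boolean') throw new Error('approved must be a boolean');
  return { approved };
}
```

In this model, invalid input is rejected without consuming the token, so the responder can correct the data and submit again. Whether the real engine behaves the same way on a validation failure is not stated here.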

Example:

// At the top of the file:
import * as v from '@identity-flow/sdk/valibot';

// Inside the execute function:
flow.log('Waiting for form approval...');
const approvalResult = await flow.dialog(
  'approve form',
  // Options specify the expected result schema
  { schema: v.object({ approved: v.boolean(), reason: v.nullish(v.string()) }) },
  // Define function returns params for the UI and sets a timeout
  ({ token }) => ({
    params: {
      form: 'one-form-approval', // Identifier for the UI component
      token // Pass the token needed to continue
    },
    until: '30 minutes' // Step fails if not resolved in 30 mins
  })
);
// Execution resumes here after GraphQL workflowActivity(...).continue() is called
flow.log('Approval result received:', approvalResult);
if (approvalResult.approved) {
  flow.log('Form was approved.', approvalResult.reason ? `Reason: ${approvalResult.reason}` : '');
} else {
  flow.log('Form was rejected.', approvalResult.reason ? `Reason: ${approvalResult.reason}` : '');
  // Potentially end the workflow early
  return { status: 'rejected' };
}

flow.dialog is essential for incorporating human decision points into your automated processes.

Use flow.request() when your workflow needs to pause and wait for external data or status updates.

Signature:

flow.request<Result, Data>(name: string, options: RequestOptions, activity: Activity<Data>): Promise<Result>;
  • name: (Required) A unique string identifier for this request step.
  • options: (Required) An object to configure the request behavior, including schema validation and polling configuration.
  • activity: (Required) An asynchronous function async (ctx) => { ... } that contains the logic for this step. It receives a RequestContext (ctx) with properties like:
    • ctx.step: Information about the current step execution (e.g., ctx.step.attempts).
    • ctx.use: The dependency injection function (same as flow.use).
    • ctx.signal: An AbortSignal for cancellation handling.
    • ctx.span: The OpenTelemetry span for this task.
    • ctx.token: A unique token for this request activity, used for external continuation.
  • Result: The type of data expected back when the request is completed (either via external continuation or successful polling).
  • Data: The type of data returned by the initial activity function (often used for polling).

Behavior:

  • External Continuation (Default):
    1. The activity function executes, initiating the external process and potentially passing the unique ctx.token to the external system.
    2. The workflow instance step enters the REQUESTING state (instance remains ACTIVE) and hibernates.
    3. The external system performs its work asynchronously.
    4. When complete, the external system must call back to the IdentityFlow engine, typically via the workflowActivity(token: ...).continue(input: ...) GraphQL mutation, providing the token and the result data.
    5. The engine validates the result against the schema (if provided), wakes the workflow, and the await flow.request(...) call resolves with the result.
  • Polling:
    1. The activity function executes, initiates the external process, and returns data (Data) needed for polling (e.g., a job ID).
    2. The workflow instance step enters the REQUESTING state.
    3. The engine periodically executes the provided poll.callback function according to the poll.retries configuration.
    4. The poll.callback function checks the status of the external operation (using the data returned by the initial activity). It should return { success: true, data: Result } when the operation is complete, or { success: false } to continue polling.
    5. If the poll.callback returns success, the await flow.request(...) call resolves with the Result data.
    6. If polling attempts are exhausted without success, the step fails.
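
The polling steps above amount to a bounded loop around the callback. The following sketch shows that loop in isolation. It is a simplified model, not the engine's scheduler: pollUntilDone, delayMs, and the injectable sleep parameter are hypothetical, and a real engine would persist state between checks rather than hold a loop in memory.

```typescript
// Toy polling loop. NOT the engine's implementation: `pollUntilDone`,
// `delayMs`, and the injectable `sleep` are assumptions made for this sketch.
type PollResult<R> = { success: true; data: R } | { success: false };

async function pollUntilDone<R>(
  callback: (attempt: number) => Promise<PollResult<R>>,
  retries: { limit: number; delayMs: number },
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<R> {
  for (let attempt = 1; attempt <= retries.limit; attempt++) {
    const outcome = await callback(attempt);
    // A successful check ends polling and yields the final result.
    if (outcome.success) return outcome.data;
    // Otherwise wait before the next check, unless attempts are used up.
    if (attempt < retries.limit) await sleep(retries.delayMs);
  }
  // Attempts exhausted without success: the step fails.
  throw new Error(`Polling exhausted after ${retries.limit} attempts`);
}
```

An error thrown by the callback propagates out of the loop immediately, which matches how the polling example treats an externally failed job differently from one that is still running.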

Use Cases:

  • Interacting with asynchronous APIs that use webhooks/callbacks.
  • Starting long-running batch jobs and waiting for their completion.
  • Integrating with systems where status needs to be checked periodically.

Example (Conceptual - Callback):

// At the top of the file:
import * as v from '@identity-flow/sdk/valibot';

// Inside the execute function:
flow.log('Initiating external report generation...');
const reportResult = await flow.request(
  'generate-report',
  // Define expected result structure
  { schema: v.object({ reportUrl: v.string(), status: v.literal('completed') }) },
  // Activity function initiates the request and passes the token
  async ({ token, use }) => {
    const reportService = use(ReportServiceBinding); // Assuming ReportServiceBinding is defined
    // Tell the external service to start generation and notify via callback
    // including the unique token.
    await reportService.startReportGeneration({
      userId: flow.params.userId,
      callbackToken: token, // External service uses this token to continue
      callbackUrl: 'https://identity-flow.example.com/api/callback', // Example callback endpoint
    });
    flow.log('Report generation initiated, waiting for callback...');
    // No return value needed here as we wait for external continuation
  },
);
// Execution resumes here after the external service calls back via API
flow.log(`Report generated successfully: ${reportResult.reportUrl}`);

Example (Conceptual - Polling):

// At the top of the file:
import * as v from '@identity-flow/sdk/valibot';

// Inside the execute function:
flow.log('Starting batch job...');
const jobResult = await flow.request(
  'process-batch-job',
  {
    // Expected result when polling succeeds
    schema: v.object({ finalStatus: v.string(), outputLocation: v.string() }),
    // Polling configuration
    poll: {
      // Check every 5 minutes, retry up to 10 times
      retries: { limit: 10, delay: '5 minutes' },
      // Polling callback function
      callback: async ({ use, step }) => {
        const jobService = use(JobServiceBinding); // Assuming JobServiceBinding defined
        const jobId = step.data?.jobId; // Access data returned by initial activity
        if (!jobId) throw new Error('Missing Job ID for polling');
        const status = await jobService.getJobStatus(jobId);
        flow.debug(`Polling job ${jobId}: Status is ${status.state}`);
        if (status.state === 'COMPLETED') {
          return {
            success: true,
            data: { finalStatus: status.state, outputLocation: status.resultsUrl },
          };
        } else if (status.state === 'FAILED') {
          throw new Error(`Job ${jobId} failed externally: ${status.errorMessage}`);
        }
        // Otherwise, continue polling
        return { success: false };
      },
    },
  },
  // Initial activity starts the job and returns data needed for polling
  async ({ use }) => {
    const jobService = use(JobServiceBinding); // Assuming JobServiceBinding defined
    const jobInfo = await jobService.startBatchJob({ inputData: flow.params.batchData });
    flow.log(`Batch job started with ID: ${jobInfo.jobId}`);
    // Return data needed by the polling callback
    return { jobId: jobInfo.jobId };
  },
);
// Execution resumes here after polling succeeds
flow.log(`Batch job ${jobResult.finalStatus}, output: ${jobResult.outputLocation}`);

flow.request() handles scenarios where the workflow needs to pause and wait for external triggers or periodic checks.

For complex processes, you can break down logic into smaller, reusable workflows and orchestrate them using flow.start(). This allows you to start another workflow definition (a “sub-workflow” or “child workflow”) and wait for its completion.

Signature:

flow.start<Result, Params>(name: string, activity: StartWorkflowActivity<Params>): Promise<Result>;
flow.start<Result, Params>(name: string, options: ActivityOptions, activity: StartWorkflowActivity<Params>): Promise<Result>;
flow.start<Result, Params>(name: string, options: WithSchema<ActivityOptions, Result>, activity: StartWorkflowActivity<Params>): Promise<Result>;
  • name: (Required) A unique string identifier for this sub-workflow step.
  • activity: (Required) An asynchronous function async (ctx) => { ... } that defines which sub-workflow to start and with which parameters. It receives a StartWorkflowContext (ctx) and must return an object (StartWorkflowInput) containing:
    • workflow: The name (or ID) of the workflow definition to start.
    • params: The input parameters to pass to the sub-workflow.
    • version: (Optional) A specific version or range of the sub-workflow definition to use.
    • releaseChannel: (Optional) A specific release channel (e.g., 'latest') of the sub-workflow definition.
    • label, description, recipients, until, meta: (Optional) Properties to set on the sub-workflow instance.
  • options: (Optional) An object to configure retries (ActivityOptions) for the initiation of the sub-workflow or add schema validation (WithSchema) for the Result expected back from the sub-workflow upon its completion.
  • Result: The type of the final result returned by the sub-workflow.
  • Params: The type of the params passed to the sub-workflow.

Behavior:

  1. The activity function executes to determine the sub-workflow definition and its input parameters.
  2. The engine initiates a new instance of the specified sub-workflow definition.
  3. The current workflow instance step enters the RELYING state (instance remains ACTIVE) and pauses.
  4. The sub-workflow instance executes independently.
  5. When the sub-workflow instance completes successfully, its final result is recorded.
  6. The engine wakes up the original (parent) workflow.
  7. The await flow.start(...) call resolves, returning the result from the completed sub-workflow.
  8. If the sub-workflow fails or is terminated, the flow.start() step in the parent workflow will also typically fail (subject to retry configurations on the flow.start options).

Use Cases:

  • Reusing common logic (e.g., a standard notification workflow, an approval sub-process).
  • Breaking down very large or complex workflows into manageable, independent parts.
  • Implementing patterns like dynamic parallel execution (by starting multiple sub-workflows).
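
The dynamic parallel pattern from the last use case might look like the sketch below, which starts one sub-workflow per recipient and waits for all of them. Treat it as an assumption-heavy illustration: FlowLike and StartInput are minimal hypothetical types covering only what the sketch needs, notifyAll is a made-up helper, and whether the engine supports awaiting several flow.start() calls concurrently should be confirmed before relying on it.

```typescript
// Hypothetical minimal types: only the parts of `flow.start` this sketch uses.
interface StartInput {
  workflow: string;
  params: unknown;
}

interface FlowLike {
  start<R>(name: string, activity: () => StartInput): Promise<R>;
}

// Start one notification sub-workflow per recipient and wait for all results.
// Assumes concurrent `flow.start` calls are supported by the engine.
async function notifyAll(
  flow: FlowLike,
  recipientIds: string[],
): Promise<Array<{ delivered: boolean }>> {
  return Promise.all(
    recipientIds.map((recipientId) =>
      // Each step gets its own unique name so cached results cannot collide.
      flow.start<{ delivered: boolean }>(`notify-${recipientId}`, () => ({
        workflow: '@my-org/send-notification',
        params: { recipientId, subject: 'Welcome!', body: 'Welcome aboard!' },
      })),
    ),
  );
}
```

Deriving the step name from the recipient ID keeps names unique and stable across replays, provided the list of recipients itself comes from a cached step.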

Example (Conceptual):

// src/workflows/main-process.ts
import * as v from '@identity-flow/sdk/valibot';
import { defineWorkflow } from '@identity-flow/sdk';

export default defineWorkflow({ name: '@my-org/main-process', version: '1.0.0' }, async (flow) => {
  flow.log('Main process starting.');
  const userData = await flow.do('fetch-user-data', async () => ({ name: 'Alice', id: 'user123' }));
  flow.log('Starting notification sub-workflow...');
  const notificationResult = await flow.start(
    'send-welcome-email',
    { schema: v.object({ delivered: v.boolean(), messageId: v.string() }) },
    (ctx) => ({
      workflow: '@my-org/send-notification',
      params: {
        recipientId: userData.id,
        subject: 'Welcome!',
        body: `Hi ${userData.name}, welcome aboard!`,
      },
      label: `Welcome Email for ${userData.name}`,
    }),
  );
  if (notificationResult.delivered) {
    flow.log(`Welcome email sent successfully, Message ID: ${notificationResult.messageId}`);
  } else {
    flow.warn('Welcome email failed to deliver.');
  }
  flow.log('Main process finished.');
  return { status: 'complete' };
});

// src/workflows/send-notification.ts (Sub-workflow example)
import * as v from '@identity-flow/sdk/valibot';
import { defineWorkflow } from '@identity-flow/sdk';
// import { EmailBinding } from '../bindings/email'; // Assuming EmailBinding is defined

export default defineWorkflow(
  {
    name: '@my-org/send-notification',
    version: '1.0.0',
    schema: v.object({ recipientId: v.string(), subject: v.string(), body: v.string() }),
  },
  async (flow) => {
    const { recipientId, subject, body } = flow.params;
    flow.log(`Attempting to send notification to ${recipientId}`);
    const sendResult = await flow.do('send-via-email-service', async ({ use }) => {
      // const emailService = use(EmailBinding);
      // const messageId = await emailService.send(recipientId, subject, body);
      // return { delivered: true, messageId };
      // For example purposes:
      flow.log('Simulating email send...');
      return {
        delivered: true,
        messageId: 'fake-message-id-' + Math.random().toString(36).substring(7),
      };
    });
    flow.log('Notification sub-workflow finished.');
    return sendResult;
  },
);

Using flow.start() promotes modularity and reuse in complex workflow scenarios.

