Caching Strategies
Reduce redundant operations with intelligent caching.
Refer to the main Parallel Processing page for more details and considerations.
export default defineWorkflow('batch-processing', async (flow) => { const { items } = flow.params;
// Process items in parallel with concurrency controlconst results = await Promise.all(items.map(item =>flow.do(`process item ${item.id}`, async () => {return processItem(item);})));
return results;});export default defineWorkflow('order-processing', async (flow) => { // Run multiple independent operations in parallel const [payment, inventory, shipping] = await Promise.all([ flow.do('process payment', async () => { return processPayment(flow.params); }), flow.do('check inventory', async () => { return checkInventory(flow.params.items); }), flow.do('calculate shipping', async () => { return calculateShipping(flow.params.address); }) ]);
return { payment, inventory, shipping };});export default defineWorkflow('dynamic-batch', async (flow) => { const { items } = flow.params;
// Dynamically adjust concurrency based on load const batchSize = calculateOptimalBatchSize(items.length);
// Process in batches to control memory usage const results = []; for (let i = 0; i < items.length; i += batchSize) { const batch = items.slice(i, i + batchSize); const batchResults = await Promise.all( batch.map(item => flow.do(`process item ${item.id}`, async () => { return processItem(item); }) ) ); results.push(...batchResults); }
return results;});Efficient resource management is crucial for performance and stability. See also the Resource Management deep dive and the Integrations page for client configurations.
Connection Pooling
Optimize database and API connections with smart pooling strategies.
Resource Limits
Set appropriate limits to prevent resource exhaustion.
See Integrations for full client configuration details.
const DbClient = () => createDbClient({ // Connection pool settings poolSize: 10, minConnections: 2, maxIdleTime: '5 minutes',
// Query timeout queryTimeout: '30 seconds',
// Circuit breaker circuitBreaker: { failureThreshold: 5, resetTimeout: '1 minute' }});
export default defineWorkflow('data-processing', async (flow) => { const db = flow.use(DbClient);
await flow.do('process data', async () => { return db.query('SELECT * FROM data'); });});export default defineWorkflow('batch-processing', async (flow) => { const db = flow.use(DbClient); const { items } = flow.params;
// Use batch operations instead of individual queriesawait flow.do('insert data', async () => {return db.batchInsert('items', items, {batchSize: 1000});});});See Integrations for full client configuration details. Note the retry and circuit breaker settings, detailed further in Retry Policies.
const HttpClient = () => createHttpClient({ // Connection settings maxConnections: 100, keepAlive: true, timeout: '30 seconds',
// Compression compression: true,
// Connection pooling agent: { keepAlive: true, maxSockets: 100, maxFreeSockets: 10, timeout: 60000 },
// Retry configuration retries: { limit: 3, backoff: 'exponential' },
// Circuit breaker circuitBreaker: { failureThreshold: 5, resetTimeout: '1 minute' }});export default defineWorkflow('data-processing', async (flow) => { // Cache results based on input parameters const data = await flow.do('fetch data', { cache: { ttl: '1 hour', key: flow.params.id, tags: ['user-data'], staleWhileRevalidate: '5 minutes' } }, async () => { return fetchExpensiveData(flow.params); });
return data;});export default defineWorkflow('dynamic-caching', async (flow) => { await flow.do('process data', { cache: { ttl: flow.params.priority === 'high' ? '5 minutes' : '1 hour', key: `${flow.params.id}-${flow.params.priority}`, condition: (params) => params.cacheable !== false, onError: 'stale' // Use stale data on error } }, async () => { return processData(flow.params); });});export default defineWorkflow('cache-management', async (flow) => { const cache = flow.use(CacheClient);
// Invalidate specific cache entriesawait flow.do('update data', async () => {await updateData(flow.params);await cache.invalidate({tags: ['user-data', flow.params.userId]});});});