Cloudflare Workflows for Durable Orchestration
Priority: P2 (Strategic)
What are Cloudflare Workflows?
A durable execution engine where each step's result is automatically persisted. Workflows can sleep for hours/days, wait for external events, automatically retry failed steps, and only bill for active compute time.
Why This Matters for Company Manager
Current Long-Running Processes
| Process | Duration | Current Tech | Issues |
|---|---|---|---|
| Prestashop order sync | 30+ min | Shell script + CLI | No state recovery, no progress tracking |
| Prestashop product sync | 20+ min | Shell script + CLI | Same |
| Auto supplier orders | 10+ min | Shell script + CLI | Same |
| AI agent decisions | Variable | In-memory state | Lost on restart, no audit trail |
| Email campaigns | Hours | Bull queue | No orchestration across steps |
| Data imports | Variable | CLI scripts | No resume capability |
What Workflows Solves
| Before | After |
|---|---|
| Script crashes → restart from scratch | Resume from last completed step |
| No progress visibility | Built-in observability API |
| Manual retry on failure | Automatic retry with backoff |
| Fire-and-forget cron | Durable state across days |
| No human-in-the-loop | `step.waitForEvent()` for approvals |
| Pay for idle time | $0 for sleeping steps |
Integration Opportunities
1. Prestashop Sync Orchestration
Replace the monolithic sync scripts with a multi-step workflow:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from "cloudflare:workers";
interface SyncParams {
tenantId: string;
siteId: string;
entities: ("orders" | "products")[];
lastDays: number;
}
export class PrestashopSyncWorkflow extends WorkflowEntrypoint<Env, SyncParams> {
async run(event: WorkflowEvent<SyncParams>, step: WorkflowStep) {
const { tenantId, siteId, entities, lastDays } = event.payload;
// Step 1: Validate credentials (persisted)
const credentials = await step.do("validate-credentials", async () => {
const creds = await this.env.HYPERDRIVE.query(
"SELECT * FROM prestashop_config WHERE tenant_id = $1",
[tenantId]
);
if (!creds) throw new Error("No Prestashop credentials");
return creds;
});
// Step 2: Fetch remote data counts
const counts = await step.do("fetch-counts", async () => {
const api = new PrestashopAPI(credentials);
return {
orders: entities.includes("orders") ? await api.getOrderCount(lastDays) : 0,
products: entities.includes("products") ? await api.getProductCount() : 0,
};
});
// Step 3: Sync orders (if requested)
if (entities.includes("orders") && counts.orders > 0) {
const batchSize = 50;
const batches = Math.ceil(counts.orders / batchSize);
for (let i = 0; i < batches; i++) {
await step.do(`sync-orders-batch-${i}`, {
retries: { limit: 3, delay: "30 seconds", backoff: "exponential" },
}, async () => {
const api = new PrestashopAPI(credentials);
const orders = await api.getOrders(i * batchSize, batchSize);
await syncOrderBatch(this.env, tenantId, orders);
return { processed: orders.length, batch: i };
});
}
}
// Step 4: Sync products (if requested)
if (entities.includes("products") && counts.products > 0) {
const batchSize = 100;
const batches = Math.ceil(counts.products / batchSize);
for (let i = 0; i < batches; i++) {
await step.do(`sync-products-batch-${i}`, {
retries: { limit: 3, delay: "30 seconds", backoff: "exponential" },
}, async () => {
const api = new PrestashopAPI(credentials);
const products = await api.getProducts(i * batchSize, batchSize);
await syncProductBatch(this.env, tenantId, products);
return { processed: products.length, batch: i };
});
}
}
// Step 5: Post-sync verification
const verification = await step.do("verify-sync", async () => {
const localCounts = await getLocalCounts(this.env, tenantId);
return {
ordersInDB: localCounts.orders,
productsInDB: localCounts.products,
syncedAt: new Date().toISOString(),
};
});
return { success: true, verification };
}
}
\1:
- If the sync crashes at batch 47/100, it resumes at batch 48
- Each batch has independent retry with exponential backoff
- Progress is queryable via the Workflows API
- No state lost on Worker restart
2. AI Decision Pipeline
Implement the AI autonomy decision flow as a durable workflow:
export class AIDecisionWorkflow extends WorkflowEntrypoint<Env, DecisionParams> {
async run(event: WorkflowEvent<DecisionParams>, step: WorkflowStep) {
const { tenantId, agentType, proposals } = event.payload;
// Step 1: Check circuit breaker
const circuitOk = await step.do("check-circuit-breaker", async () => {
return checkCircuitBreaker(this.env, tenantId, agentType);
});
if (!circuitOk) return { status: "circuit-open", skipped: true };
// Step 2: Check autonomy level
const config = await step.do("get-autonomy-config", async () => {
return getAutonomyConfig(this.env, tenantId, agentType);
});
for (const proposal of proposals) {
// Step 3: Create decision record
const decision = await step.do(`create-decision-${proposal.id}`, async () => {
return createDecision(this.env, proposal);
});
// Step 4: Auto-execute or wait for approval
if (config.autonomyLevel >= 3 && proposal.confidence >= config.threshold) {
// SUPERVISED/AUTONOMOUS: auto-execute
await step.do(`execute-${proposal.id}`, async () => {
return executeDecision(this.env, decision);
});
} else {
// MANUAL/ASSISTED: wait for human approval (up to 72 hours)
const approval = await step.waitForEvent(`approval-${proposal.id}`, {
type: "decision-review",
timeout: "72 hours",
});
if (approval?.data?.approved) {
await step.do(`execute-${proposal.id}`, async () => {
return executeDecision(this.env, decision);
});
}
}
// Step 5: Start rollback window
await step.sleep(`rollback-window-${proposal.id}`, "24 hours");
// Step 6: Finalize (no more rollback)
await step.do(`finalize-${proposal.id}`, async () => {
return finalizeDecision(this.env, decision.id);
});
}
}
}
\1:
- Human-in-the-loop approval via `step.waitForEvent()`
- 24-hour rollback window via `step.sleep()` (costs $0 while sleeping)
- Complete audit trail (each step is persisted)
- Survives crashes, restarts, deployments
3. User Onboarding Flow
Multi-day onboarding with scheduled emails and setup verification:
export class TenantOnboardingWorkflow extends WorkflowEntrypoint<Env, OnboardingParams> {
async run(event: WorkflowEvent<OnboardingParams>, step: WorkflowStep) {
const { tenantId, email, plan } = event.payload;
// Step 1: Send welcome email
await step.do("welcome-email", async () => {
await sendEmail(this.env, email, "welcome", { plan });
});
// Step 2: Wait 1 day
await step.sleep("wait-day-1", "1 day");
// Step 3: Check if setup is complete
const setupDone = await step.do("check-setup", async () => {
return checkTenantSetup(this.env, tenantId);
});
if (!setupDone) {
// Step 4: Send setup reminder
await step.do("setup-reminder", async () => {
await sendEmail(this.env, email, "setup-reminder", { tenantId });
});
// Step 5: Wait 3 more days
await step.sleep("wait-day-4", "3 days");
}
// Step 6: Send tips email
await step.do("tips-email", async () => {
await sendEmail(this.env, email, "getting-started-tips", { plan });
});
// Step 7: Wait until trial end (e.g., 14 days from start)
await step.sleepUntil("trial-end", event.payload.trialEndDate);
// Step 8: Send trial ending notification
await step.do("trial-ending", async () => {
await sendEmail(this.env, email, "trial-ending", { plan });
});
}
}
4. Data Import Orchestration
For large CSV/data imports with progress tracking:
export class DataImportWorkflow extends WorkflowEntrypoint<Env, ImportParams> {
async run(event: WorkflowEvent<ImportParams>, step: WorkflowStep) {
const { tenantId, fileKey, entityType } = event.payload;
// Step 1: Download and parse file from R2
const { totalRows, batches } = await step.do("parse-file", async () => {
const file = await this.env.MEDIA.get(fileKey);
const data = await parseCSV(await file!.text());
return { totalRows: data.length, batches: chunk(data, 100) };
});
// Step 2: Validate schema
await step.do("validate-schema", async () => {
return validateImportSchema(batches[0], entityType);
});
// Step 3: Process each batch
let processed = 0;
for (let i = 0; i < batches.length; i++) {
const result = await step.do(`import-batch-${i}`, {
retries: { limit: 2, delay: "10 seconds" },
}, async () => {
return importBatch(this.env, tenantId, batches[i], entityType);
});
processed += result.count;
}
// Step 4: Send completion notification
await step.do("notify-complete", async () => {
await sendNotification(this.env, tenantId, {
type: "import-complete",
data: { entityType, totalRows, processed },
});
});
return { totalRows, processed };
}
}
5. Supplier Order Generation
Replace the auto-supplier-orders cron:
export class SupplierOrderWorkflow extends WorkflowEntrypoint<Env, SupplierOrderParams> {
async run(event: WorkflowEvent<SupplierOrderParams>, step: WorkflowStep) {
const { tenantId, siteId, fromDate, toDate } = event.payload;
// Step 1: Analyze demand
const demand = await step.do("analyze-demand", async () => {
return analyzeDemand(this.env, tenantId, fromDate, toDate);
});
// Step 2: Generate draft orders per supplier
const drafts = await step.do("generate-drafts", async () => {
return generateSupplierOrderDrafts(this.env, tenantId, demand);
});
// Step 3: Wait for human review (optional based on autonomy level)
for (const draft of drafts) {
if (draft.totalAmount > 1000) {
// High-value order: wait for approval
const approval = await step.waitForEvent(`approve-order-${draft.supplierId}`, {
type: "supplier-order-approval",
timeout: "48 hours",
});
if (!approval?.data?.approved) continue; // Skip unapproved
}
// Step 4: Create the order
await step.do(`create-order-${draft.supplierId}`, async () => {
return createSupplierOrder(this.env, tenantId, draft);
});
}
}
}
Triggering Workflows
// From a Worker
const instance = await env.MY_WORKFLOW.create({
params: { tenantId, siteId, entities: ["orders", "products"], lastDays: 40 },
});
// Scheduled (cron trigger)
export default {
async scheduled(event: ScheduledEvent, env: Env) {
await env.PRESTASHOP_SYNC.create({
params: { tenantId: "...", entities: ["orders"], lastDays: 40 },
});
},
};
// From TRPC (via HTTP API)
await fetch(`https://api.cloudflare.com/client/v4/accounts/${accountId}/workflows/${workflowName}/instances`, {
method: "POST",
headers: { Authorization: `Bearer ${token}` },
body: JSON.stringify({ params: { ... } }),
});
Monitoring Workflow Instances
// Get instance status
const status = await env.MY_WORKFLOW.get(instanceId);
// { status: "running", output: [...step results...] }
// List all instances
const instances = await env.MY_WORKFLOW.list();
Limits
| Metric | Free | Paid |
|---|---|---|
| Max steps/workflow | 1,024 | 1,024 |
| Max concurrent running | 10,000 | 10,000 |
| CPU time per step | 30s (default) | Configurable |
| Workflow duration | Infinite | Infinite |
| State retention | 3 days | 30 days |
| Sleep duration | Unlimited | Unlimited |
Pricing
- Billed as standard Workers compute (CPU time + requests)
- **$0 for sleeping/waiting** (major cost advantage)
- Steps within a workflow are not separate invocations
Estimated Impact
- **Reliability**: Crash-proof sync processes (resume from last step)
- **Observability**: Built-in progress tracking and status API
- **Human-in-the-loop**: Native support for approval workflows
- **Cost**: $0 while sleeping; cheaper than keeping processes alive
- **Effort**: 2-4 weeks per workflow migration