Cloudflare Workflows for Durable Orchestration

Priority: P2 (Strategic)

What are Cloudflare Workflows?

A durable execution engine where each step's result is automatically persisted. Workflows can sleep for hours/days, wait for external events, automatically retry failed steps, and only bill for active compute time.

Why This Matters for Company Manager

Current Long-Running Processes

ProcessDurationCurrent TechIssues
Prestashop order sync30+ minShell script + CLINo state recovery, no progress tracking
Prestashop product sync20+ minShell script + CLISame
Auto supplier orders10+ minShell script + CLISame
AI agent decisionsVariableIn-memory stateLost on restart, no audit trail
Email campaignsHoursBull queueNo orchestration across steps
Data importsVariableCLI scriptsNo resume capability

What Workflows Solves

BeforeAfter
Script crashes → restart from scratchResume from last completed step
No progress visibilityBuilt-in observability API
Manual retry on failureAutomatic retry with backoff
Fire-and-forget cronDurable state across days
No human-in-the-loop`step.waitForEvent()` for approvals
Pay for idle time$0 for sleeping steps

Integration Opportunities

1. Prestashop Sync Orchestration

Replace the monolithic sync scripts with a multi-step workflow:


import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from "cloudflare:workers";

interface SyncParams {
  tenantId: string;
  siteId: string;
  entities: ("orders" | "products")[];
  lastDays: number;
}

export class PrestashopSyncWorkflow extends WorkflowEntrypoint<Env, SyncParams> {
  async run(event: WorkflowEvent<SyncParams>, step: WorkflowStep) {
    const { tenantId, siteId, entities, lastDays } = event.payload;

    // Step 1: Validate credentials (persisted)
    const credentials = await step.do("validate-credentials", async () => {
      const creds = await this.env.HYPERDRIVE.query(
        "SELECT * FROM prestashop_config WHERE tenant_id = $1",
        [tenantId]
      );
      if (!creds) throw new Error("No Prestashop credentials");
      return creds;
    });

    // Step 2: Fetch remote data counts
    const counts = await step.do("fetch-counts", async () => {
      const api = new PrestashopAPI(credentials);
      return {
        orders: entities.includes("orders") ? await api.getOrderCount(lastDays) : 0,
        products: entities.includes("products") ? await api.getProductCount() : 0,
      };
    });

    // Step 3: Sync orders (if requested)
    if (entities.includes("orders") && counts.orders > 0) {
      const batchSize = 50;
      const batches = Math.ceil(counts.orders / batchSize);

      for (let i = 0; i < batches; i++) {
        await step.do(`sync-orders-batch-${i}`, {
          retries: { limit: 3, delay: "30 seconds", backoff: "exponential" },
        }, async () => {
          const api = new PrestashopAPI(credentials);
          const orders = await api.getOrders(i * batchSize, batchSize);
          await syncOrderBatch(this.env, tenantId, orders);
          return { processed: orders.length, batch: i };
        });
      }
    }

    // Step 4: Sync products (if requested)
    if (entities.includes("products") && counts.products > 0) {
      const batchSize = 100;
      const batches = Math.ceil(counts.products / batchSize);

      for (let i = 0; i < batches; i++) {
        await step.do(`sync-products-batch-${i}`, {
          retries: { limit: 3, delay: "30 seconds", backoff: "exponential" },
        }, async () => {
          const api = new PrestashopAPI(credentials);
          const products = await api.getProducts(i * batchSize, batchSize);
          await syncProductBatch(this.env, tenantId, products);
          return { processed: products.length, batch: i };
        });
      }
    }

    // Step 5: Post-sync verification
    const verification = await step.do("verify-sync", async () => {
      const localCounts = await getLocalCounts(this.env, tenantId);
      return {
        ordersInDB: localCounts.orders,
        productsInDB: localCounts.products,
        syncedAt: new Date().toISOString(),
      };
    });

    return { success: true, verification };
  }
}

\1:

2. AI Decision Pipeline

Implement the AI autonomy decision flow as a durable workflow:


export class AIDecisionWorkflow extends WorkflowEntrypoint<Env, DecisionParams> {
  async run(event: WorkflowEvent<DecisionParams>, step: WorkflowStep) {
    const { tenantId, agentType, proposals } = event.payload;

    // Step 1: Check circuit breaker
    const circuitOk = await step.do("check-circuit-breaker", async () => {
      return checkCircuitBreaker(this.env, tenantId, agentType);
    });
    if (!circuitOk) return { status: "circuit-open", skipped: true };

    // Step 2: Check autonomy level
    const config = await step.do("get-autonomy-config", async () => {
      return getAutonomyConfig(this.env, tenantId, agentType);
    });

    for (const proposal of proposals) {
      // Step 3: Create decision record
      const decision = await step.do(`create-decision-${proposal.id}`, async () => {
        return createDecision(this.env, proposal);
      });

      // Step 4: Auto-execute or wait for approval
      if (config.autonomyLevel >= 3 && proposal.confidence >= config.threshold) {
        // SUPERVISED/AUTONOMOUS: auto-execute
        await step.do(`execute-${proposal.id}`, async () => {
          return executeDecision(this.env, decision);
        });
      } else {
        // MANUAL/ASSISTED: wait for human approval (up to 72 hours)
        const approval = await step.waitForEvent(`approval-${proposal.id}`, {
          type: "decision-review",
          timeout: "72 hours",
        });

        if (approval?.data?.approved) {
          await step.do(`execute-${proposal.id}`, async () => {
            return executeDecision(this.env, decision);
          });
        }
      }

      // Step 5: Start rollback window
      await step.sleep(`rollback-window-${proposal.id}`, "24 hours");

      // Step 6: Finalize (no more rollback)
      await step.do(`finalize-${proposal.id}`, async () => {
        return finalizeDecision(this.env, decision.id);
      });
    }
  }
}

\1:

3. User Onboarding Flow

Multi-day onboarding with scheduled emails and setup verification:


export class TenantOnboardingWorkflow extends WorkflowEntrypoint<Env, OnboardingParams> {
  async run(event: WorkflowEvent<OnboardingParams>, step: WorkflowStep) {
    const { tenantId, email, plan } = event.payload;

    // Step 1: Send welcome email
    await step.do("welcome-email", async () => {
      await sendEmail(this.env, email, "welcome", { plan });
    });

    // Step 2: Wait 1 day
    await step.sleep("wait-day-1", "1 day");

    // Step 3: Check if setup is complete
    const setupDone = await step.do("check-setup", async () => {
      return checkTenantSetup(this.env, tenantId);
    });

    if (!setupDone) {
      // Step 4: Send setup reminder
      await step.do("setup-reminder", async () => {
        await sendEmail(this.env, email, "setup-reminder", { tenantId });
      });

      // Step 5: Wait 3 more days
      await step.sleep("wait-day-4", "3 days");
    }

    // Step 6: Send tips email
    await step.do("tips-email", async () => {
      await sendEmail(this.env, email, "getting-started-tips", { plan });
    });

    // Step 7: Wait until trial end (e.g., 14 days from start)
    await step.sleepUntil("trial-end", event.payload.trialEndDate);

    // Step 8: Send trial ending notification
    await step.do("trial-ending", async () => {
      await sendEmail(this.env, email, "trial-ending", { plan });
    });
  }
}

4. Data Import Orchestration

For large CSV/data imports with progress tracking:


export class DataImportWorkflow extends WorkflowEntrypoint<Env, ImportParams> {
  async run(event: WorkflowEvent<ImportParams>, step: WorkflowStep) {
    const { tenantId, fileKey, entityType } = event.payload;

    // Step 1: Download and parse file from R2
    const { totalRows, batches } = await step.do("parse-file", async () => {
      const file = await this.env.MEDIA.get(fileKey);
      const data = await parseCSV(await file!.text());
      return { totalRows: data.length, batches: chunk(data, 100) };
    });

    // Step 2: Validate schema
    await step.do("validate-schema", async () => {
      return validateImportSchema(batches[0], entityType);
    });

    // Step 3: Process each batch
    let processed = 0;
    for (let i = 0; i < batches.length; i++) {
      const result = await step.do(`import-batch-${i}`, {
        retries: { limit: 2, delay: "10 seconds" },
      }, async () => {
        return importBatch(this.env, tenantId, batches[i], entityType);
      });
      processed += result.count;
    }

    // Step 4: Send completion notification
    await step.do("notify-complete", async () => {
      await sendNotification(this.env, tenantId, {
        type: "import-complete",
        data: { entityType, totalRows, processed },
      });
    });

    return { totalRows, processed };
  }
}

5. Supplier Order Generation

Replace the auto-supplier-orders cron:


export class SupplierOrderWorkflow extends WorkflowEntrypoint<Env, SupplierOrderParams> {
  async run(event: WorkflowEvent<SupplierOrderParams>, step: WorkflowStep) {
    const { tenantId, siteId, fromDate, toDate } = event.payload;

    // Step 1: Analyze demand
    const demand = await step.do("analyze-demand", async () => {
      return analyzeDemand(this.env, tenantId, fromDate, toDate);
    });

    // Step 2: Generate draft orders per supplier
    const drafts = await step.do("generate-drafts", async () => {
      return generateSupplierOrderDrafts(this.env, tenantId, demand);
    });

    // Step 3: Wait for human review (optional based on autonomy level)
    for (const draft of drafts) {
      if (draft.totalAmount > 1000) {
        // High-value order: wait for approval
        const approval = await step.waitForEvent(`approve-order-${draft.supplierId}`, {
          type: "supplier-order-approval",
          timeout: "48 hours",
        });

        if (!approval?.data?.approved) continue; // Skip unapproved
      }

      // Step 4: Create the order
      await step.do(`create-order-${draft.supplierId}`, async () => {
        return createSupplierOrder(this.env, tenantId, draft);
      });
    }
  }
}

Triggering Workflows


// From a Worker
const instance = await env.MY_WORKFLOW.create({
  params: { tenantId, siteId, entities: ["orders", "products"], lastDays: 40 },
});

// Scheduled (cron trigger)
export default {
  async scheduled(event: ScheduledEvent, env: Env) {
    await env.PRESTASHOP_SYNC.create({
      params: { tenantId: "...", entities: ["orders"], lastDays: 40 },
    });
  },
};

// From TRPC (via HTTP API)
await fetch(`https://api.cloudflare.com/client/v4/accounts/${accountId}/workflows/${workflowName}/instances`, {
  method: "POST",
  headers: { Authorization: `Bearer ${token}` },
  body: JSON.stringify({ params: { ... } }),
});

Monitoring Workflow Instances


// Get instance status
const status = await env.MY_WORKFLOW.get(instanceId);
// { status: "running", output: [...step results...] }

// List all instances
const instances = await env.MY_WORKFLOW.list();

Limits

MetricFreePaid
Max steps/workflow1,0241,024
Max concurrent running10,00010,000
CPU time per step30s (default)Configurable
Workflow durationInfiniteInfinite
State retention3 days30 days
Sleep durationUnlimitedUnlimited

Pricing

Estimated Impact