Cloudflare Queues for Background Jobs

Priority: P1 (High Value)

What are Cloudflare Queues?

Cloudflare Queues is a managed message queue with guaranteed at-least-once delivery, batching, retries, dead letter queues, and both push (Worker consumer) and pull (HTTP API) consumption models.

Why This Matters for Company Manager

Current Background Job Infrastructure

| Component | Technology | Issues |
|---|---|---|
| Queue engine | Bull on Upstash Redis | Redis dependency, limited observability |
| Cron jobs | Shell scripts (3h intervals) | No retry, no DLQ, no visibility |
| Order sync | `sync-prestashop-orders-latest.sh` | Runs for 30+ min, no progress tracking |
| Product sync | `sync-prestashop-products.sh` | Incremental but brittle |
| Supplier orders | `auto-supplier-orders-service.sh` | No dead letter, no alerting |
| Queue manager | `packages/queue-manager/` | Custom Bull wrapper, Upstash Redis |

Problems with Current Setup

1. **No dead letter queue**: Failed jobs are lost or retried indefinitely
2. **No observability**: No dashboard for job status, no metrics
3. **Redis dependency**: Upstash Redis adds latency and cost
4. **No event-driven triggers**: Can't react to events in real time
5. **No backpressure**: No way to slow down producers if consumers are overwhelmed
6. **Single consumer**: No horizontal scaling of job processing

What Queues Solves

| Before | After |
|---|---|
| Bull on Upstash Redis | Native Queues (no Redis) |
| No DLQ | Dead letter queues for failed messages |
| No observability | Cloudflare dashboard + API metrics |
| 3h cron intervals | Event-driven or fine-grained scheduling |
| No retry policy | Configurable retries with backoff |
| Single consumer | Up to 250 concurrent consumers |
| No backpressure | Built-in flow control |

Architecture


Producers (Workers/API)          Queue           Consumer Workers
┌──────────────────┐      ┌────────────┐     ┌──────────────────┐
│ R2 upload event  │─────►│            │────►│ Image processor  │
│ Webhook received │─────►│  CF Queue  │────►│ Sync executor    │
│ TRPC mutation    │─────►│            │────►│ Email sender     │
│ Cron trigger     │─────►│            │────►│ Analytics writer │
└──────────────────┘      └──────┬─────┘     └──────────────────┘
                                 │ failed (max retries)
                          ┌──────▼──────┐
                          │ Dead Letter │
                          │    Queue    │
                          └─────────────┘

Implementation

Queue Definitions


# Create queues
npx wrangler queues create prestashop-sync
npx wrangler queues create prestashop-sync-dlq
npx wrangler queues create email-sending
npx wrangler queues create email-sending-dlq
npx wrangler queues create image-processing
npx wrangler queues create analytics-events
npx wrangler queues create ai-agent-tasks

Wrangler Configuration


// wrangler.jsonc (job-processor Worker)
{
  "name": "job-processor",
  "queues": {
    "producers": [
      { "binding": "PRESTASHOP_QUEUE", "queue": "prestashop-sync" },
      { "binding": "EMAIL_QUEUE", "queue": "email-sending" },
      { "binding": "IMAGE_QUEUE", "queue": "image-processing" },
      { "binding": "ANALYTICS_QUEUE", "queue": "analytics-events" },
      { "binding": "AGENT_QUEUE", "queue": "ai-agent-tasks" }
    ],
    "consumers": [
      {
        "queue": "prestashop-sync",
        "max_batch_size": 10,
        "max_retries": 3,
        "dead_letter_queue": "prestashop-sync-dlq",
        "max_batch_timeout": 30,
        "retry_delay": "exponential"
      },
      {
        "queue": "email-sending",
        "max_batch_size": 50,
        "max_retries": 5,
        "dead_letter_queue": "email-sending-dlq"
      },
      {
        "queue": "image-processing",
        "max_batch_size": 5,
        "max_retries": 2
      }
    ]
  }
}
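
The handler snippets below assume an `Env` typed with these bindings; a minimal sketch using `@cloudflare/workers-types` (binding names match the wrangler config above, `MEDIA` is the R2 bucket used later by the image consumer):

// env.ts — queue and storage bindings as declared in wrangler.jsonc
import type { Queue, R2Bucket } from "@cloudflare/workers-types";

export interface Env {
  PRESTASHOP_QUEUE: Queue;
  EMAIL_QUEUE: Queue;
  IMAGE_QUEUE: Queue;
  ANALYTICS_QUEUE: Queue;
  AGENT_QUEUE: Queue;
  MEDIA: R2Bucket; // used by the image-processing consumer
}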

Producer: Prestashop Sync Trigger

Replace cron-based sync with event-driven + scheduled triggers:


// Scheduled trigger (replaces cron)
export default {
  // Runs every 3 hours (same as current cron)
  async scheduled(controller: ScheduledController, env: Env) {
    const tenants = await getActiveTenants(env);

    for (const tenant of tenants) {
      // Queue order sync
      await env.PRESTASHOP_QUEUE.send({
        type: "sync-orders",
        tenantId: tenant.id,
        siteId: tenant.siteId,
        lastDays: 40,
        bulkMode: true,
      });

      // Queue product sync
      await env.PRESTASHOP_QUEUE.send({
        type: "sync-products",
        tenantId: tenant.id,
        siteId: tenant.siteId,
        incremental: true,
        linkVendors: true,
      });
    }
  },

  // Also triggered by webhooks from Prestashop
  async fetch(request: Request, env: Env) {
    const webhook = await request.json();

    if (webhook.type === "order.created") {
      await env.PRESTASHOP_QUEUE.send({
        type: "sync-single-order",
        tenantId: webhook.tenantId,
        orderId: webhook.orderId,
      });
    }

    return new Response("OK");
  },
};
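
The sync consumer below is typed as `MessageBatch<SyncJob>`; a sketch of that union, with fields taken from the producer payloads above (the type name and exact shapes are assumptions):

// Message shapes for the prestashop-sync queue, discriminated on `type`
type SyncJob =
  | { type: "sync-orders"; tenantId: string; siteId: string; lastDays: number; bulkMode: boolean }
  | { type: "sync-products"; tenantId: string; siteId: string; incremental: boolean; linkVendors: boolean }
  | { type: "sync-single-order"; tenantId: string; orderId: string };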

Consumer: Process Sync Jobs


export default {
  async queue(batch: MessageBatch<SyncJob>, env: Env) {
    for (const message of batch.messages) {
      const job = message.body;

      try {
        switch (job.type) {
          case "sync-orders":
            await syncOrders(env, job);
            break;
          case "sync-products":
            await syncProducts(env, job);
            break;
          case "sync-single-order":
            await syncSingleOrder(env, job);
            break;
        }

        message.ack(); // Success
      } catch (error) {
        console.error(`Job failed: ${job.type}`, error);
        message.retry(); // Will retry or go to DLQ
      }
    }
  },
};

Producer: Email Pipeline


// In TRPC router or service
async function queueEmail(env: Env, email: EmailJob) {
  await env.EMAIL_QUEUE.send({
    to: email.to,
    subject: email.subject,
    templateId: email.templateId,
    variables: email.variables,
    tenantId: email.tenantId,
    priority: email.priority ?? "normal",
  });
}

// Batch send (e.g., newsletter)
async function queueBulkEmails(env: Env, emails: EmailJob[]) {
  const batches = chunk(emails, 100);
  for (const batch of batches) {
    await env.EMAIL_QUEUE.sendBatch(
      batch.map(email => ({ body: email }))
    );
  }
}
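
`chunk` is not defined in this snippet; a minimal generic implementation (the slice size of 100 matches the per-batch limit in the Limits table below):

// Split an array into consecutive slices of at most `size` elements
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}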

Consumer: Email Processor


export default {
  async queue(batch: MessageBatch<EmailJob>, env: Env) {
    // Process batch of emails
    for (const message of batch.messages) {
      try {
        await sendEmail(message.body);
        message.ack();
      } catch (error) {
        if (isTransientError(error)) {
          message.retry({ delaySeconds: 60 }); // Retry in 1 minute
        } else {
          message.ack(); // Permanent failure, log and move on
          await logEmailFailure(env, message.body, error);
        }
      }
    }
  },
};
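
`isTransientError` and `logEmailFailure` are application code; a plausible sketch of the former, assuming the email provider surfaces an HTTP status on its errors:

// Retry on rate limiting and server-side failures; treat 4xx as permanent
function isTransientError(error: unknown): boolean {
  const status = (error as { status?: number })?.status;
  if (status === 429) return true;           // rate limited
  if (status && status >= 500) return true;  // provider outage
  return false;                              // bad address, template error, etc.
}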

Image Processing Pipeline


// Producer: trigger on R2 upload
// (R2 event notification → Queue)

// Consumer: process images
export default {
  async queue(batch: MessageBatch<ImageJob>, env: Env) {
    for (const message of batch.messages) {
      const { key, tenantId, operations } = message.body;

      // Get image from R2
      const image = await env.MEDIA.get(key);
      if (!image) { message.ack(); continue; }

      const buffer = await image.arrayBuffer();

      for (const op of operations) {
        switch (op.type) {
          case "thumbnail": {
            const thumb = await generateThumbnail(buffer, op.width, op.height);
            await env.MEDIA.put(`${key}/thumb-${op.width}x${op.height}`, thumb);
            break;
          }
          case "alt-text": {
            const altText = await generateAltText(env, buffer);
            // Store alt text in DB via Hyperdrive
            break;
          }
        }
      }

      message.ack();
    }
  },
};
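
Routing R2 upload events into the queue is a one-time setup. Assuming the `MEDIA` binding points at a bucket named `media`, the wrangler command looks roughly like:

# Deliver object-create events from the media bucket to the image queue
npx wrangler r2 bucket notification create media \
  --event-type object-create \
  --queue image-processing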

AI Agent Task Queue


// Producer: schedule agent runs
export default {
  async scheduled(controller: ScheduledController, env: Env) {
    // Queue agent tasks for each tenant
    await env.AGENT_QUEUE.send({
      agentType: "content-management",
      tenantId: "...",
      dryRun: false,
    });
  },
};

// Consumer: execute agent tasks
export default {
  async queue(batch: MessageBatch<AgentTask>, env: Env) {
    for (const message of batch.messages) {
      const { agentType, tenantId, dryRun } = message.body;

      // Check circuit breaker first
      const breaker = await checkCircuitBreaker(env, tenantId, agentType);
      if (breaker.isOpen) {
        message.ack(); // Skip, circuit is open
        continue;
      }

      try {
        await executeAgent(env, agentType, tenantId, dryRun);
        message.ack();
      } catch {
        message.retry();
      }
    }
  },
};
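
`checkCircuitBreaker` is application code; one possible sketch backed by Workers KV (the `CIRCUIT_KV` binding, key scheme, and threshold are all assumptions):

// Open the circuit once recent failures for a tenant/agent pair pass a threshold
async function checkCircuitBreaker(env: Env, tenantId: string, agentType: string) {
  const failures = Number(await env.CIRCUIT_KV.get(`cb:${tenantId}:${agentType}`) ?? "0");
  return { isOpen: failures >= 5 };
}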

Migration from Bull/Upstash Redis

Phase 1: New Queues Only

Add Cloudflare Queues for new features without touching existing Bull.

Phase 2: Email Migration

Move email sending from Bull to CF Queues.
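
One low-risk pattern here is a feature-flagged dual write: Bull stays authoritative while writes are mirrored into CF Queues, and consumers are flipped once parity is verified. A sketch (the flag variable and `bullEmailQueue` are assumptions standing in for the existing queue-manager):

// Phase 2 dual write: enqueue to Bull (current path) and mirror into CF Queues
async function queueEmailDuringMigration(env: Env, email: EmailJob) {
  await bullEmailQueue.add("send-email", email); // existing Bull path stays authoritative

  if (env.EMAIL_MIGRATION_ENABLED === "true") {
    await env.EMAIL_QUEUE.send(email); // shadow write, verified before cutover
  }
}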

Phase 3: Sync Migration

Move Prestashop sync from cron to CF Queues.

Phase 4: Full Migration

Remove Bull/Upstash Redis dependency entirely.

Pull Consumers (HTTP API)

For consuming from the Next.js app (not a Worker):


// In TRPC router or API route
async function pullMessages(queueId: string) {
  const res = await fetch(
    `https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${queueId}/messages/pull`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${API_TOKEN}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ batch_size: 10, visibility_timeout_ms: 30000 }),
    }
  );
  return res.json();
}
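
Pulled messages are leased rather than deleted; each lease must be acknowledged (or explicitly retried) before the visibility timeout expires, via the companion ack endpoint:

// Acknowledge processed messages so they are not redelivered
async function ackMessages(queueId: string, leaseIds: string[]) {
  await fetch(
    `https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${queueId}/messages/ack`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${API_TOKEN}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ acks: leaseIds.map((lease_id) => ({ lease_id })) }),
    }
  );
}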

Limits

| Metric | Free | Paid |
|---|---|---|
| Queues/account | 10,000 | 10,000 |
| Message size | 128 KB | 128 KB |
| Max batch size | 100 | 100 |
| Max concurrent consumers | 250 | 250 |
| Pull throughput | 5,000 msg/s | 5,000 msg/s |
| Message retention | 24 hours | 14 days |
| Max retries | Configurable | Configurable |

Pricing

| Plan | Cost |
|---|---|
| Free | 10,000 ops/day |
| Paid | $0.40/million operations |

**Estimated cost**: $2-10/month for moderate job volume (a delivered message typically counts as three operations: write, read, ack).

Estimated Impact