Cloudflare Queues for Background Jobs
Priority: P1 (High Value)
What are Cloudflare Queues?
Managed message queue with guaranteed at-least-once delivery, batching, retries, dead letter queues, and both push (Worker consumer) and pull (HTTP) consumption models.
Why This Matters for Company Manager
Current Background Job Infrastructure
| Component | Technology | Issues |
|---|---|---|
| Queue engine | Bull on Upstash Redis | Redis dependency, limited observability |
| Cron jobs | Shell scripts (3h intervals) | No retry, no DLQ, no visibility |
| Order sync | `sync-prestashop-orders-latest.sh` | Runs for 30+ min, no progress tracking |
| Product sync | `sync-prestashop-products.sh` | Incremental but brittle |
| Supplier orders | `auto-supplier-orders-service.sh` | No dead letter, no alerting |
| Queue manager | `packages/queue-manager/` | Custom Bull wrapper, Upstash Redis |
Problems with Current Setup
1. **No dead letter queue**: Failed jobs are lost or retried indefinitely
2. **No observability**: No dashboard for job status, no metrics
3. **Redis dependency**: Upstash Redis adds latency and cost
4. **Not event-driven**: Can't react to events in real time
5. **No backpressure**: No way to slow down producers if consumers are overwhelmed
6. **No concurrency**: No horizontal scaling of job processing
What Queues Solves
| Before | After |
|---|---|
| Bull on Upstash Redis | Native Queues (no Redis) |
| No DLQ | Dead letter queues for failed messages |
| No observability | Cloudflare dashboard + API metrics |
| 3h cron intervals | Event-driven or fine-grained scheduling |
| No retry policy | Configurable retries with backoff |
| Single consumer | Up to 250 concurrent consumers |
| No backpressure | Built-in flow control |
Architecture
Producers (Workers/API) Queue Consumer Workers
┌──────────────────┐ ┌────────────┐ ┌──────────────────┐
│ R2 upload event │─────►│ │────►│ Image processor │
│ Webhook received │─────►│ CF Queue │────►│ Sync executor │
│ TRPC mutation │─────►│ │────►│ Email sender │
│ Cron trigger │─────►│ │────►│ Analytics writer │
└──────────────────┘ └──────┬─────┘ └──────────────────┘
│ failed (max retries)
┌──────▼─────┐
│ Dead Letter │
│ Queue │
└────────────┘
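The diagram implies a message contract per queue. These types are not defined anywhere yet; a sketch of what they could look like, inferred from the producer and consumer snippets below (all names are assumptions):

// Assumed message contracts, one per queue. Discriminated unions let a
// single consumer switch on `type` with full type narrowing.
export type SyncJob =
  | { type: "sync-orders"; tenantId: string; siteId: string; lastDays: number; bulkMode: boolean }
  | { type: "sync-products"; tenantId: string; siteId: string; incremental: boolean; linkVendors: boolean }
  | { type: "sync-single-order"; tenantId: string; orderId: string };

export interface EmailJob {
  to: string;
  subject: string;
  templateId: string;
  variables: Record<string, string>;
  tenantId: string;
  priority?: "low" | "normal" | "high";
}

export interface ImageJob {
  key: string; // R2 object key
  tenantId: string;
  operations: Array<
    | { type: "thumbnail"; width: number; height: number }
    | { type: "alt-text" }
  >;
}

export interface AgentTask {
  agentType: string;
  tenantId: string;
  dryRun: boolean;
}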
Implementation
Queue Definitions
# Create queues
npx wrangler queues create prestashop-sync
npx wrangler queues create prestashop-sync-dlq
npx wrangler queues create email-sending
npx wrangler queues create email-sending-dlq
npx wrangler queues create image-processing
npx wrangler queues create analytics-events
npx wrangler queues create ai-agent-tasks
Wrangler Configuration
// wrangler.jsonc (job-processor Worker)
{
"name": "job-processor",
"queues": {
"producers": [
{ "binding": "PRESTASHOP_QUEUE", "queue": "prestashop-sync" },
{ "binding": "EMAIL_QUEUE", "queue": "email-sending" },
{ "binding": "IMAGE_QUEUE", "queue": "image-processing" },
{ "binding": "ANALYTICS_QUEUE", "queue": "analytics-events" },
{ "binding": "AGENT_QUEUE", "queue": "ai-agent-tasks" }
],
"consumers": [
{
"queue": "prestashop-sync",
"max_batch_size": 10,
"max_retries": 3,
"dead_letter_queue": "prestashop-sync-dlq",
"max_batch_timeout": 30,
"retry_delay": "exponential"
},
{
"queue": "email-sending",
"max_batch_size": 50,
"max_retries": 5,
"dead_letter_queue": "email-sending-dlq"
},
{
"queue": "image-processing",
"max_batch_size": 5,
"max_retries": 2
}
]
}
}
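For reference, a sketch of the `Env` these bindings imply, assuming `@cloudflare/workers-types` (the message types are the assumed contracts sketched earlier):

// Sketch of the Env interface implied by the wrangler.jsonc above
// (an assumption, not generated code). Queue<T> and R2Bucket come
// from @cloudflare/workers-types.
interface Env {
  PRESTASHOP_QUEUE: Queue<SyncJob>;
  EMAIL_QUEUE: Queue<EmailJob>;
  IMAGE_QUEUE: Queue<ImageJob>;
  ANALYTICS_QUEUE: Queue<unknown>; // analytics event shape not defined yet
  AGENT_QUEUE: Queue<AgentTask>;
  MEDIA: R2Bucket; // R2 binding used by the image consumer below
}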
Producer: Prestashop Sync Trigger
Replace cron-based sync with event-driven + scheduled triggers:
// Scheduled trigger (replaces cron)
export default {
// Runs every 3 hours (same as current cron)
async scheduled(event: ScheduledEvent, env: Env) {
const tenants = await getActiveTenants(env);
for (const tenant of tenants) {
// Queue order sync
await env.PRESTASHOP_QUEUE.send({
type: "sync-orders",
tenantId: tenant.id,
siteId: tenant.siteId,
lastDays: 40,
bulkMode: true,
});
// Queue product sync
await env.PRESTASHOP_QUEUE.send({
type: "sync-products",
tenantId: tenant.id,
siteId: tenant.siteId,
incremental: true,
linkVendors: true,
});
}
},
// Also triggered by webhooks from Prestashop
async fetch(request: Request, env: Env) {
const webhook = await request.json();
if (webhook.type === "order.created") {
await env.PRESTASHOP_QUEUE.send({
type: "sync-single-order",
tenantId: webhook.tenantId,
orderId: webhook.orderId,
});
}
return new Response("OK");
},
};
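`getActiveTenants` above is assumed rather than existing code. A minimal sketch, assuming tenants live in a Postgres `tenants` table reached through a Hyperdrive binding and the `postgres` driver:

import postgres from "postgres";

// Hypothetical helper: list tenants due for a sync run. Assumes a
// HYPERDRIVE binding (not shown in the Env sketch above) and a
// `tenants(id, site_id, active)` table; adjust to the real schema.
async function getActiveTenants(env: Env & { HYPERDRIVE: Hyperdrive }) {
  const sql = postgres(env.HYPERDRIVE.connectionString);
  try {
    return await sql<{ id: string; siteId: string }[]>`
      SELECT id, site_id AS "siteId" FROM tenants WHERE active = true
    `;
  } finally {
    await sql.end();
  }
}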
Consumer: Process Sync Jobs
export default {
async queue(batch: MessageBatch<SyncJob>, env: Env) {
for (const message of batch.messages) {
const job = message.body;
try {
switch (job.type) {
case "sync-orders":
await syncOrders(env, job);
break;
case "sync-products":
await syncProducts(env, job);
break;
case "sync-single-order":
await syncSingleOrder(env, job);
break;
}
message.ack(); // Success
} catch (error) {
console.error(`Job failed: ${job.type}`, error);
message.retry(); // Will retry or go to DLQ
}
}
},
};
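`retry_delay` in the consumer config is a fixed number of seconds, not a backoff policy. If exponential backoff is wanted, it can be approximated per message using the `attempts` counter every Queues message carries; a sketch:

// Retry with exponential backoff derived from the delivery attempt
// count (message.attempts starts at 1 on first delivery).
function retryWithBackoff(message: Message, baseSeconds = 60, capSeconds = 3600) {
  const delaySeconds = Math.min(baseSeconds * 2 ** (message.attempts - 1), capSeconds);
  message.retry({ delaySeconds }); // 60s, 120s, 240s, ... capped at 1h
}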
Producer: Email Pipeline
// In TRPC router or service
async function queueEmail(env: Env, email: EmailJob) {
await env.EMAIL_QUEUE.send({
to: email.to,
subject: email.subject,
templateId: email.templateId,
variables: email.variables,
tenantId: email.tenantId,
priority: email.priority ?? "normal",
});
}
// Batch send (e.g., newsletter)
async function queueBulkEmails(env: Env, emails: EmailJob[]) {
const batches = chunk(emails, 100);
for (const batch of batches) {
await env.EMAIL_QUEUE.sendBatch(
batch.map(email => ({ body: email }))
);
}
}
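`chunk` above is not a Queues API; a minimal implementation (sized at 100 because `sendBatch` accepts at most 100 messages per call):

// Split an array into fixed-size slices to respect the sendBatch limit.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}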
Consumer: Email Processor
export default {
async queue(batch: MessageBatch<EmailJob>, env: Env) {
// Process batch of emails
for (const message of batch.messages) {
try {
await sendEmail(message.body);
message.ack();
} catch (error) {
if (isTransientError(error)) {
message.retry({ delaySeconds: 60 }); // Retry in 1 minute
} else {
message.ack(); // Permanent failure, log and move on
await logEmailFailure(env, message.body, error);
}
}
}
},
};
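`isTransientError` is assumed above. One plausible heuristic, treating rate limits and provider 5xx responses as retryable and everything else as permanent:

// Hypothetical transient-error check for the email provider: retry on
// rate limiting (429) and server-side failures (5xx) only.
function isTransientError(error: unknown): boolean {
  if (error instanceof Error && "status" in error) {
    const status = (error as Error & { status: number }).status;
    return status === 429 || status >= 500;
  }
  return false;
}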
Image Processing Pipeline
// Producer: trigger on R2 upload
// (R2 event notification → Queue)
// Consumer: process images
export default {
async queue(batch: MessageBatch<ImageJob>, env: Env) {
for (const message of batch.messages) {
const { key, tenantId, operations } = message.body;
// Get image from R2
const image = await env.MEDIA.get(key);
if (!image) { message.ack(); continue; }
const buffer = await image.arrayBuffer();
for (const op of operations) {
switch (op.type) {
  case "thumbnail": {
    const thumb = await generateThumbnail(buffer, op.width, op.height);
    await env.MEDIA.put(`${key}/thumb-${op.width}x${op.height}`, thumb);
    break;
  }
  case "alt-text": {
    const altText = await generateAltText(env, buffer);
    // Store alt text in DB via Hyperdrive
    break;
  }
}
}
message.ack();
}
},
};
AI Agent Task Queue
// Producer: schedule agent runs
export default {
async scheduled(event: ScheduledEvent, env: Env) {
// Queue agent tasks for each tenant
await env.AGENT_QUEUE.send({
agentType: "content-management",
tenantId: "...",
dryRun: false,
});
},
};
// Consumer: execute agent tasks
export default {
async queue(batch: MessageBatch<AgentTask>, env: Env) {
for (const message of batch.messages) {
const { agentType, tenantId, dryRun } = message.body;
// Check circuit breaker first
const breaker = await checkCircuitBreaker(env, tenantId, agentType);
if (breaker.isOpen) {
message.ack(); // Skip, circuit is open
continue;
}
try {
await executeAgent(env, agentType, tenantId, dryRun);
message.ack();
} catch {
message.retry();
}
}
},
};
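`checkCircuitBreaker` is referenced but not defined here; a minimal KV-backed sketch (the `BREAKERS` binding, key layout, and 15-minute cool-down are assumptions):

// Hypothetical circuit breaker on Workers KV: the breaker is "open"
// while a failure flag exists, and the flag expires on its own.
async function checkCircuitBreaker(
  env: Env & { BREAKERS: KVNamespace },
  tenantId: string,
  agentType: string
): Promise<{ isOpen: boolean }> {
  const flag = await env.BREAKERS.get(`breaker:${tenantId}:${agentType}`);
  return { isOpen: flag !== null };
}

// Trip the breaker for 15 minutes after repeated agent failures.
async function openCircuitBreaker(
  env: Env & { BREAKERS: KVNamespace },
  tenantId: string,
  agentType: string
) {
  await env.BREAKERS.put(`breaker:${tenantId}:${agentType}`, "open", {
    expirationTtl: 900,
  });
}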
Migration from Bull/Upstash Redis
Phase 1: New Queues Only
Add Cloudflare Queues for new features without touching existing Bull:
- Image processing pipeline
- Analytics event ingestion
- AI agent task scheduling
Phase 2: Email Migration
Move email sending from Bull to CF Queues:
- Add DLQ for failed emails
- Better retry policies
- Remove email-related Bull jobs
Phase 3: Sync Migration
Move Prestashop sync from cron to CF Queues:
- Event-driven sync (webhook → queue)
- Scheduled fallback (every 3h)
- Per-tenant job isolation
- DLQ for failed syncs
Phase 4: Full Migration
Remove Bull/Upstash Redis dependency entirely.
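To keep phases 1-3 reversible, call sites can enqueue through a small interface so they never know which engine is underneath, and each job type is flipped independently. A sketch with assumed names:

// Hypothetical migration seam: app code depends on JobProducer, and the
// binding behind it switches from Bull to Cloudflare Queues per job type.
interface JobProducer<T> {
  enqueue(job: T): Promise<void>;
}

class CloudflareQueueProducer<T> implements JobProducer<T> {
  constructor(private queue: Queue<T>) {}
  async enqueue(job: T) {
    await this.queue.send(job);
  }
}

class BullProducer<T> implements JobProducer<T> {
  // Minimal structural type for the Bull queue; the real import stays
  // in packages/queue-manager until Phase 4 removes it.
  constructor(private bullQueue: { add(name: string, data: T): Promise<unknown> }) {}
  async enqueue(job: T) {
    await this.bullQueue.add("job", job);
  }
}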
Pull Consumers (HTTP API)
For consuming from the Next.js app (not a Worker):
// In TRPC router or API route
async function pullMessages(queueId: string) {
const res = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${queueId}/messages/pull`,
{
method: "POST",
headers: {
  Authorization: `Bearer ${API_TOKEN}`,
  "Content-Type": "application/json",
},
body: JSON.stringify({ batch_size: 10, visibility_timeout_ms: 30000 }),
}
);
return res.json();
}
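Pulled messages become visible again after the visibility timeout unless they are acknowledged. A sketch of the companion ack call, following the same conventions as `pullMessages` (the `/messages/ack` endpoint shape and `lease_id` field follow Cloudflare's pull-consumer API; verify against current docs):

// Acknowledge processed messages (and optionally mark others for retry)
// using the lease_id each pulled message carries.
async function ackMessages(queueId: string, leaseIds: string[], retryLeaseIds: string[] = []) {
  const res = await fetch(
    `https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${queueId}/messages/ack`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${API_TOKEN}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        acks: leaseIds.map(lease_id => ({ lease_id })),
        retries: retryLeaseIds.map(lease_id => ({ lease_id })),
      }),
    }
  );
  return res.json();
}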
Limits
| Metric | Free | Paid |
|---|---|---|
| Queues/account | 10,000 | 10,000 |
| Message size | 128 KB | 128 KB |
| Max batch size | 100 | 100 |
| Max concurrent consumers | 250 | 250 |
| Pull throughput | 5,000 msg/s | 5,000 msg/s |
| Message retention | 24 hours | 14 days |
| Max retries | Configurable | Configurable |
Pricing
| Plan | Cost |
|---|---|
| Free | 10,000 ops/day |
| Paid | $0.40/million operations |
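Cloudflare counts each 64 KB chunk written, read, or deleted as one operation, so a typical small message costs about three operations (write, read, acknowledge). Under that model:

1M messages/month × 3 ops/message = 3M ops → 3 × $0.40 ≈ $1.20/month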
**Estimate**: $2-10/month for moderate job volume.
Estimated Impact
- **Reliability**: At-least-once delivery with DLQ (currently: best-effort)
- **Observability**: Built-in metrics and dashboard
- **Cost**: Remove Upstash Redis dependency (~$10-30/mo savings)
- **Flexibility**: Event-driven + scheduled triggers
- **Scale**: 250 concurrent consumers (currently: single process)
- **Effort**: 2-3 weeks for full migration