Data Platform (Lakehouse) Integration
Priority: P3 (Long-Term Vision)
What is the Cloudflare Data Platform?
A serverless data lakehouse combining three components:
1. **Pipelines**: Event streaming and ingestion via HTTP or Workers bindings
2. **R2 Data Catalog**: Data stored as Apache Iceberg tables on R2
3. **R2 SQL**: Distributed SQL query engine over Iceberg tables
Data is stored in open Iceberg format, queryable from any compatible engine (Spark, Snowflake, Trino, DuckDB).
Why This Matters for Company Manager
Current Data Challenges
| Challenge | Description |
|---|---|
| No data warehouse | All analytics run against production PostgreSQL |
| No historical retention | Old data must either be purged or left to bloat the production DB |
| No cross-tenant analytics | Can't analyze patterns across the whole platform |
| No BI integration | No connection to tools like Metabase, Superset |
| Expensive long queries | Complex reports block transactional workloads |
| No data sharing | Can't expose data to external analytics tools |
What the Data Platform Enables
| Capability | Description |
|---|---|
| Historical data lake | Store years of event data in R2 (cheap) |
| Cross-tenant analytics | Platform-wide metrics for the SaaS operator |
| BI tool integration | Iceberg REST API → any BI tool |
| Decoupled analytics | Queries don't touch production DB |
| Open format | No vendor lock-in (Iceberg is open standard) |
| SQL at scale | Distributed compute over huge datasets |
| Zero egress | Query from any cloud without transfer costs |
Architecture
Event Sources Data Platform Query Consumers
┌──────────────────┐ ┌───────────────────────┐ ┌──────────────────┐
│ POS transactions │─────►│ Pipelines │ │ Metabase/BI │
│ Order events │─────►│ (ingest + transform) │ │ Custom dashboards│
│ Product changes │─────►│ │ │ │ DuckDB local │
│ User activity │─────►│ ▼ │◄─────│ Snowflake │
│ Agent decisions │─────►│ R2 (Iceberg tables) │◄─────│ Apache Spark │
│ Page views │─────►│ │ │ │ Trino │
│ API calls │─────►│ ▼ │ │ Jupyter notebooks│
│ Sync events │─────►│ R2 SQL (query engine) │ │ │
└──────────────────┘ └───────────────────────┘ └──────────────────┘
│
Iceberg REST API
(open, standard)
Implementation
Step 1: Define Pipelines
// Pipeline definitions for event ingestion
// 1. Orders pipeline
const ordersPipeline = {
name: "orders-events",
source: "http", // or Workers binding
destination: {
type: "r2",
format: "iceberg",
table: "orders",
partition: ["tenant_id", "year", "month"],
},
transform: `
SELECT
event.order_id AS order_id,
event.tenant_id AS tenant_id,
event.site_id AS site_id,
event.customer_id AS customer_id,
event.total_amount AS total_amount,
event.currency AS currency,
event.status AS status,
event.payment_method AS payment_method,
event.item_count AS item_count,
event.created_at AS created_at,
EXTRACT(YEAR FROM event.created_at) AS year,
EXTRACT(MONTH FROM event.created_at) AS month
FROM source AS event
WHERE event.total_amount > 0
`,
};
// 2. Product catalog changes
const productsPipeline = {
name: "products-events",
source: "workers-binding",
destination: {
type: "r2",
format: "iceberg",
table: "product_changes",
partition: ["tenant_id", "year"],
},
transform: `
SELECT
event.product_id,
event.tenant_id,
event.change_type,
event.old_price,
event.new_price,
event.old_stock,
event.new_stock,
event.changed_at,
EXTRACT(YEAR FROM event.changed_at) AS year
FROM source AS event
`,
};
// 3. User activity events
const activityPipeline = {
name: "activity-events",
source: "http",
destination: {
type: "r2",
format: "iceberg",
table: "user_activity",
partition: ["tenant_id", "date"],
},
};
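Each pipeline is then exposed to the ingesting Worker through a binding declared in the Worker's wrangler config. A minimal typing sketch for those bindings, assuming `send()` takes an array of JSON-serializable records; the `PipelineBinding` and `OrderEvent` names are illustrative, so check the current Pipelines docs for the exact interface:
// Illustrative event shape matching the orders pipeline transform above
interface OrderEvent {
  order_id: string;
  tenant_id: string;
  site_id: string;
  customer_id: string;
  total_amount: number;
  currency: string;
  status: string;
  payment_method: string;
  item_count: number;
  created_at: string; // ISO 8601
}
// Assumed shape of a Pipelines binding: send() accepts a batch of records
interface PipelineBinding<T> {
  send(records: T[]): Promise<void>;
}
interface Env {
  ORDERS_PIPELINE: PipelineBinding<OrderEvent>;
  PRODUCTS_PIPELINE: PipelineBinding<Record<string, unknown>>;
}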
Step 2: Ingest Events
// From Workers (binding)
export default {
async fetch(request: Request, env: Env) {
// After processing an order...
    // send() takes an array of records, so batched sends are possible
    await env.ORDERS_PIPELINE.send([{
      order_id: order.id,
      tenant_id: ctx.tenantId,
      site_id: ctx.siteId,
      customer_id: order.customerId,
      total_amount: order.total,
      currency: order.currency,
      status: order.status,
      payment_method: order.paymentMethod,
      item_count: order.items.length,
      created_at: new Date().toISOString(),
    }]);
return Response.json({ success: true });
},
};
// From HTTP endpoint (external systems like Prestashop)
await fetch("https://pipelines.cloudflare.com/v1/company-manager/orders-events", {
method: "POST",
headers: {
"Authorization": `Bearer ${token}`,
"Content-Type": "application/json",
},
body: JSON.stringify(orderEvent),
});
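External systems such as the Prestashop connector will hit transient network failures from time to time, so the HTTP ingestion call is worth wrapping in a small retry with backoff. A sketch reusing the endpoint from the example above; the function name and retry policy are assumptions:
// Hypothetical retry wrapper around the HTTP ingestion endpoint
async function sendEventWithRetry(
  event: unknown,
  token: string,
  maxAttempts = 3,
): Promise<void> {
  const url = "https://pipelines.cloudflare.com/v1/company-manager/orders-events";
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await fetch(url, {
      method: "POST",
      headers: {
        "Authorization": `Bearer ${token}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(event),
    });
    if (res.ok) return;
    // Fail fast on client errors; retry only on 429 and 5xx responses
    if (res.status < 500 && res.status !== 429) {
      throw new Error(`Pipeline ingest rejected: ${res.status}`);
    }
    // Exponential backoff: 0.5s, 1s, 2s, ...
    await new Promise((resolve) => setTimeout(resolve, 500 * 2 ** (attempt - 1)));
  }
  throw new Error("Pipeline ingest failed after retries");
}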
Step 3: Query with R2 SQL
-- Revenue by tenant (platform-wide analytics)
SELECT
tenant_id,
SUM(total_amount) AS total_revenue,
COUNT(*) AS order_count,
AVG(total_amount) AS avg_order_value
FROM orders
WHERE year = 2026 AND month = 2
GROUP BY tenant_id
ORDER BY total_revenue DESC;
-- Monthly revenue trend
SELECT
year,
month,
SUM(total_amount) AS revenue,
COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE tenant_id = '0195134f-8258-7e85'
GROUP BY year, month
ORDER BY year, month;
-- Product price change analysis
SELECT
product_id,
MIN(old_price) AS lowest_price,
MAX(new_price) AS highest_price,
COUNT(*) AS price_changes
FROM product_changes
WHERE tenant_id = '0195134f-8258-7e85'
AND year = 2026
GROUP BY product_id
HAVING COUNT(*) > 5;
Step 4: Connect BI Tools
Via Iceberg REST API (standard protocol):
Endpoint: https://r2-sql.cloudflare.com/v1/{account_id}/catalog
Protocol: Apache Iceberg REST Catalog
Auth: Bearer token
Compatible with:
- Apache Spark
- Snowflake (External Catalog)
- Trino
- DuckDB (iceberg extension)
- Metabase
- Superset
# DuckDB example (local analysis)
import duckdb

conn = duckdb.connect()
conn.execute("INSTALL iceberg")
conn.execute("LOAD iceberg")

# Pull a sample of one tenant's orders into a local DataFrame
orders = conn.execute("""
    SELECT *
    FROM iceberg_scan(
        'https://r2-sql.cloudflare.com/v1/ACCOUNT_ID/catalog/orders'
    )
    WHERE tenant_id = '0195134f-8258-7e85'
    LIMIT 100
""").fetchdf()
print(orders)
Data Architecture
Table Design
| Table | Partition | Description |
|---|---|---|
| `orders` | tenant_id, year, month | All order events |
| `product_changes` | tenant_id, year | Product catalog changes |
| `user_activity` | tenant_id, date | Page views, clicks, searches |
| `agent_decisions` | tenant_id, agent_type | AI agent action log |
| `sync_events` | tenant_id, source | Prestashop sync events |
| `api_calls` | tenant_id, date | API usage metrics |
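The order and product event shapes are covered in Step 1; the remaining tables mostly need the partition columns above plus whatever payload is useful. An illustrative sketch of two of them (field names beyond the partition columns are assumptions):
// Illustrative record shapes; partition columns match the table above
interface AgentDecisionEvent {
  tenant_id: string;
  agent_type: string; // partition column
  decision: string;
  context: Record<string, unknown>;
  decided_at: string; // ISO 8601
}
interface SyncEvent {
  tenant_id: string;
  source: string; // partition column, e.g. "prestashop"
  entity: string;
  action: "create" | "update" | "delete";
  synced_at: string; // ISO 8601
}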
Partitioning Strategy
Partition by tenant_id first (required for multi-tenant isolation), then by time for query efficiency:
r2://data-lake/
├── orders/
│ ├── tenant_id=0195134f.../
│ │ ├── year=2026/
│ │ │ ├── month=01/
│ │ │ │ └── data-00001.parquet
│ │ │ └── month=02/
│ │ │ └── data-00001.parquet
│ │ └── year=2025/
│ └── tenant_id=0192d7b9.../
└── user_activity/
└── ...
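On the producer side, the main requirement this layout imposes is a reliable event timestamp, since the time-based partition columns are derived from it. A small illustrative helper keeps that consistent across event types:
// Illustrative helper: derive partition-friendly fields from the event timestamp
function partitionFields(createdAt: Date) {
  return {
    created_at: createdAt.toISOString(),
    year: createdAt.getUTCFullYear(),
    month: createdAt.getUTCMonth() + 1, // 1-12, matching month=01..12 partitions
    date: createdAt.toISOString().slice(0, 10), // YYYY-MM-DD for date-partitioned tables
  };
}

// Usage: spread into an event before sending it to a pipeline
const activityEvent = {
  tenant_id: "0195134f-8258-7e85",
  event_type: "page_view",
  ...partitionFields(new Date()),
};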
Platform Analytics (SaaS Operator View)
This is the unique capability -- analytics across all tenants for the platform operator:
-- Platform health: active tenants this month
SELECT
COUNT(DISTINCT tenant_id) AS active_tenants,
SUM(total_amount) AS platform_gmv,
COUNT(*) AS total_orders
FROM orders
WHERE year = 2026 AND month = 2;
-- Churn risk: tenants with declining orders
SELECT
tenant_id,
this_month.orders AS current_orders,
last_month.orders AS previous_orders,
(this_month.orders - last_month.orders) * 100.0 / last_month.orders AS change_pct
FROM (
SELECT tenant_id, COUNT(*) AS orders
FROM orders WHERE year = 2026 AND month = 2
GROUP BY tenant_id
) this_month
JOIN (
SELECT tenant_id, COUNT(*) AS orders
FROM orders WHERE year = 2026 AND month = 1
GROUP BY tenant_id
) last_month USING (tenant_id)
-- change_pct is a SELECT alias, so repeat the expression here (standard SQL)
WHERE (this_month.orders - last_month.orders) * 100.0 / last_month.orders < -30
ORDER BY change_pct;
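Once R2 SQL has a stable query interface, checks like the churn-risk query could run on a Workers cron trigger and alert the operator automatically. A sketch of that handler; `runR2SqlQuery` and `OPS_WEBHOOK_URL` are hypothetical placeholders until the R2 SQL API is published (see Cost below):
// Hypothetical query runner; swap in the real R2 SQL client once the API is published
declare function runR2SqlQuery(sql: string): Promise<Array<Record<string, unknown>>>;

interface Env {
  OPS_WEBHOOK_URL: string; // assumed operator alerting webhook
}

// The churn-risk SQL from above, stored as a constant
const CHURN_RISK_SQL = `/* churn-risk query from the Platform Analytics section */`;

export default {
  // Runs from a Workers cron trigger defined in the Worker's config
  async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
    const rows = await runR2SqlQuery(CHURN_RISK_SQL);
    if (rows.length > 0) {
      // Alert the platform operator about tenants with declining orders
      await fetch(env.OPS_WEBHOOK_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ alert: "churn-risk tenants", rows }),
      });
    }
  },
};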
Cost
- **Pipelines**: Currently in beta (pricing TBD)
- **R2 Storage**: $0.015/GB-month, zero egress
- **R2 SQL**: TBD (in development)
- **Total estimated**: Low for moderate volumes; storage dominates (100 GB of event history ≈ $1.50/month) and egress is free
Estimated Impact
- **Data retention**: Store years of event history cheaply
- **Cross-tenant analytics**: Platform-wide metrics for SaaS operator
- **BI integration**: Connect any Iceberg-compatible tool
- **DB load**: Zero impact on production PostgreSQL
- **Vendor flexibility**: Open Iceberg format, query from anywhere
- **Effort**: 4-6 weeks for core pipeline + table setup