Data Platform (Lakehouse) Integration

Priority: P3 (Long-Term Vision)

What is the Cloudflare Data Platform?

A serverless data lakehouse combining three components:

1. Pipelines: Event streaming and ingestion via HTTP or Workers bindings
2. R2 Data Catalog: Data stored as Apache Iceberg tables on R2
3. R2 SQL: Distributed SQL query engine over Iceberg tables

Data is stored in open Iceberg format, queryable from any compatible engine (Spark, Snowflake, Trino, DuckDB).

Why This Matters for Company Manager

Current Data Challenges

| Challenge | Description |
| --- | --- |
| No data warehouse | All analytics run against production PostgreSQL |
| No historical retention | Old data eventually ages out or grows the DB |
| No cross-tenant analytics | Can't analyze patterns across the whole platform |
| No BI integration | No connection to tools like Metabase, Superset |
| Expensive long queries | Complex reports block transactional workloads |
| No data sharing | Can't expose data to external analytics tools |

What the Data Platform Enables

| Capability | Description |
| --- | --- |
| Historical data lake | Store years of event data in R2 (cheap) |
| Cross-tenant analytics | Platform-wide metrics for the SaaS operator |
| BI tool integration | Iceberg REST API → any BI tool |
| Decoupled analytics | Queries don't touch the production DB |
| Open format | No vendor lock-in (Iceberg is an open standard) |
| SQL at scale | Distributed compute over huge datasets |
| Zero egress | Query from any cloud without transfer costs |

Architecture


Event Sources                    Data Platform                    Query Consumers
┌──────────────────┐      ┌───────────────────────┐      ┌──────────────────┐
│ POS transactions │─────►│ Pipelines             │      │ Metabase/BI      │
│ Order events     │─────►│ (ingest + transform)  │      │ Custom dashboards│
│ Product changes  │─────►│          │            │      │ DuckDB local     │
│ User activity    │─────►│          ▼            │◄─────│ Snowflake        │
│ Agent decisions  │─────►│ R2 (Iceberg tables)   │◄─────│ Apache Spark     │
│ Page views       │─────►│          │            │      │ Trino            │
│ API calls        │─────►│          ▼            │      │ Jupyter notebooks│
│ Sync events      │─────►│ R2 SQL (query engine) │      │                  │
└──────────────────┘      └───────────────────────┘      └──────────────────┘
                                    │
                            Iceberg REST API
                           (open, standard)

Implementation

Step 1: Define Pipelines


// Pipeline definitions for event ingestion

// 1. Orders pipeline
const ordersPipeline = {
  name: "orders-events",
  source: "http", // or Workers binding
  destination: {
    type: "r2",
    format: "iceberg",
    table: "orders",
    partition: ["tenant_id", "year", "month"],
  },
  transform: `
    SELECT
      event.order_id AS order_id,
      event.tenant_id AS tenant_id,
      event.site_id AS site_id,
      event.customer_id AS customer_id,
      event.total_amount AS total_amount,
      event.currency AS currency,
      event.status AS status,
      event.payment_method AS payment_method,
      event.item_count AS item_count,
      event.created_at AS created_at,
      EXTRACT(YEAR FROM event.created_at) AS year,
      EXTRACT(MONTH FROM event.created_at) AS month
    FROM source AS event
    WHERE event.total_amount > 0
  `,
};

// 2. Product catalog changes
const productsPipeline = {
  name: "products-events",
  source: "workers-binding",
  destination: {
    type: "r2",
    format: "iceberg",
    table: "product_changes",
    partition: ["tenant_id", "year"],
  },
  transform: `
    SELECT
      event.product_id,
      event.tenant_id,
      event.change_type,
      event.old_price,
      event.new_price,
      event.old_stock,
      event.new_stock,
      event.changed_at,
      EXTRACT(YEAR FROM event.changed_at) AS year
    FROM source AS event
  `,
};

// 3. User activity events
const activityPipeline = {
  name: "activity-events",
  source: "http",
  destination: {
    type: "r2",
    format: "iceberg",
    table: "user_activity",
    partition: ["tenant_id", "date"],
  },
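  // No transform: activity events are assumed to already include a `date`
  // field matching the partition spec above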
};
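
For clarity, the shape these three definitions share, expressed as a TypeScript interface (illustrative only; this is not a published Cloudflare schema):


// Illustrative shape of the pipeline definitions above.
// Field names mirror the objects in Step 1; not an official API type.
interface PipelineDefinition {
  name: string;
  source: "http" | "workers-binding";
  destination: {
    type: "r2";
    format: "iceberg";
    table: string;
    partition: string[];
  };
  transform?: string; // optional SQL applied at ingest (see ordersPipeline)
}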

Step 2: Ingest Events


// From Workers (binding)
export default {
  async fetch(request: Request, env: Env) {
    // ... order processing elided: `order` and `ctx` below come from the
    // application's own request handling ...

    // Pipelines bindings accept a batch (array) of events
    await env.ORDERS_PIPELINE.send([
      {
        order_id: order.id,
        tenant_id: ctx.tenantId,
        site_id: ctx.siteId,
        customer_id: order.customerId,
        total_amount: order.total,
        currency: order.currency,
        status: order.status,
        payment_method: order.paymentMethod,
        item_count: order.items.length,
        created_at: new Date().toISOString(),
      },
    ]);

    return Response.json({ success: true });
  },
};

// From HTTP endpoint (external systems like Prestashop)
await fetch("https://pipelines.cloudflare.com/v1/company-manager/orders-events", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${token}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify(orderEvent),
});
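
High-volume sources can batch events rather than POSTing one at a time. A minimal sketch, assuming the endpoint accepts a JSON array body (the batch semantics are an assumption, not confirmed Pipelines behavior):


// Buffer events and flush in batches (illustrative; the batch size and the
// JSON-array body format are assumptions)
const buffer: unknown[] = [];
const BATCH_SIZE = 100;

async function enqueue(event: unknown, token: string): Promise<void> {
  buffer.push(event);
  if (buffer.length < BATCH_SIZE) return;
  const batch = buffer.splice(0, buffer.length);
  await fetch("https://pipelines.cloudflare.com/v1/company-manager/orders-events", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${token}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(batch),
  });
}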

Step 3: Query with R2 SQL


-- Revenue by tenant (platform-wide analytics)
SELECT
  tenant_id,
  SUM(total_amount) AS total_revenue,
  COUNT(*) AS order_count,
  AVG(total_amount) AS avg_order_value
FROM orders
WHERE year = 2026 AND month = 2
GROUP BY tenant_id
ORDER BY total_revenue DESC;

-- Monthly revenue trend
SELECT
  year,
  month,
  SUM(total_amount) AS revenue,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE tenant_id = '0195134f-8258-7e85'
GROUP BY year, month
ORDER BY year, month;

-- Product price change analysis
SELECT
  product_id,
  MIN(old_price) AS lowest_price,
  MAX(new_price) AS highest_price,
  COUNT(*) AS price_changes
FROM product_changes
WHERE tenant_id = '0195134f-8258-7e85'
  AND year = 2026
GROUP BY product_id
HAVING COUNT(*) > 5;

Step 4: Connect BI Tools

Via Iceberg REST API (standard protocol):


Endpoint: https://r2-sql.cloudflare.com/v1/{account_id}/catalog
Protocol: Apache Iceberg REST Catalog
Auth: Bearer token

Compatible with:
- Apache Spark
- Snowflake (External Catalog)
- Trino
- DuckDB (iceberg extension)
- Metabase
- Superset

# DuckDB example (local analysis)
import duckdb

conn = duckdb.connect()
conn.execute("INSTALL iceberg")
conn.execute("LOAD iceberg")

# NOTE: the catalog URL is illustrative; point iceberg_scan at the actual
# table location exposed for your account
rows = conn.execute("""
    SELECT * FROM iceberg_scan(
        'https://r2-sql.cloudflare.com/v1/ACCOUNT_ID/catalog/orders'
    )
    WHERE tenant_id = '0195134f-8258-7e85'
    LIMIT 100;
""").fetchall()

Data Architecture

Table Design

| Table | Partition | Description |
| --- | --- | --- |
| `orders` | tenant_id, year, month | All order events |
| `product_changes` | tenant_id, year | Product catalog changes |
| `user_activity` | tenant_id, date | Page views, clicks, searches |
| `agent_decisions` | tenant_id, agent_type | AI agent action log |
| `sync_events` | tenant_id, source | Prestashop sync events |
| `api_calls` | tenant_id, date | API usage metrics |
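
The row shape of `orders` follows from the Step 1 transform. As a TypeScript type, with field names taken from that transform (the concrete types are assumptions; actual Iceberg column types are set by the pipeline):


// Row shape implied by the orders transform in Step 1 (types assumed)
interface OrderRow {
  order_id: string;
  tenant_id: string;
  site_id: string;
  customer_id: string;
  total_amount: number;
  currency: string;
  status: string;
  payment_method: string;
  item_count: number;
  created_at: string; // ISO 8601 timestamp
  year: number; // partition column, derived from created_at
  month: number; // partition column, derived from created_at
}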

Partitioning Strategy

Partition by tenant_id first (required for multi-tenant isolation), then by time for query efficiency:


r2://data-lake/
├── orders/
│   ├── tenant_id=0195134f.../
│   │   ├── year=2026/
│   │   │   ├── month=01/
│   │   │   │   └── data-00001.parquet
│   │   │   └── month=02/
│   │   │       └── data-00001.parquet
│   │   └── year=2025/
│   └── tenant_id=0192d7b9.../
└── user_activity/
    └── ...
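
To make the layout concrete, here is a sketch of how an order event maps to its partition path (the helper is illustrative; in practice Iceberg manages the physical file layout itself):


// Derive the partition path an order event would land under, mirroring the
// layout above (illustrative only -- Iceberg writes the files itself)
function partitionPath(event: { tenant_id: string; created_at: string }): string {
  const ts = new Date(event.created_at);
  const year = ts.getUTCFullYear();
  const month = String(ts.getUTCMonth() + 1).padStart(2, "0");
  return `orders/tenant_id=${event.tenant_id}/year=${year}/month=${month}/`;
}

// Example: an order created 2026-02-14 for tenant 0195134f-8258-7e85 maps to
// orders/tenant_id=0195134f-8258-7e85/year=2026/month=02/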

Platform Analytics (SaaS Operator View)

This is the unique capability -- analytics across all tenants for the platform operator:


-- Platform health: active tenants this month
SELECT
  COUNT(DISTINCT tenant_id) AS active_tenants,
  SUM(total_amount) AS platform_gmv,
  COUNT(*) AS total_orders
FROM orders
WHERE year = 2026 AND month = 2;

-- Churn risk: tenants with declining orders
SELECT
  tenant_id,
  this_month.orders AS current_orders,
  last_month.orders AS previous_orders,
  (this_month.orders - last_month.orders) * 100.0 / last_month.orders AS change_pct
FROM (
  SELECT tenant_id, COUNT(*) AS orders
  FROM orders WHERE year = 2026 AND month = 2
  GROUP BY tenant_id
) this_month
JOIN (
  SELECT tenant_id, COUNT(*) AS orders
  FROM orders WHERE year = 2026 AND month = 1
  GROUP BY tenant_id
) last_month USING (tenant_id)
-- (the change_pct alias can't be referenced in WHERE, so the expression repeats)
WHERE (this_month.orders - last_month.orders) * 100.0 / last_month.orders < -30
ORDER BY change_pct;

Cost

Estimated Impact