Analytics Engine for Time-Series Metrics

Priority: P2 (Strategic)

What is Analytics Engine?

A high-cardinality, high-volume time-series analytics system designed for Workers. Write data points from any Worker; query via SQL API. Uses statistical sampling for scale.

Why This Matters for Company Manager

Current Analytics Pain Points

The platform currently answers analytics questions with raw SQL against the production database, using complex aggregations:


// POS sales reporting (pos-sales-reporting.ts) -- 6 separate queries:
// 1. Total revenue, transactions, avg ticket
// 2. Hourly revenue breakdown (EXTRACT(HOUR))
// 3. Top 5 products by revenue
// 4. Payment method distribution
// 5. Employee performance rankings
// 6. Customer served count

// Each query hits the full orders table for the date range

Problems:

1. Every analytics view triggers multiple raw SQL queries against the production DB
2. No pre-aggregation -- aggregates are computed on every request
3. JSON columns for storing breakdowns (AnalyticsMetric.dimensions, SalesReport.details)
4. No time-series optimization
5. Contention with transactional queries

What Analytics Engine Solves

| Before | After |
| --- | --- |
| 6 SQL queries per POS report | 1 Analytics Engine SQL query |
| Compute on every request | Pre-aggregated at write time |
| Hits production DB | Separate analytics store |
| No real-time metrics | Write from any Worker instantly |
| Limited cardinality | Unlimited-cardinality analytics |

Architecture


Events (writes)                    Analytics Engine                  Query (reads)
┌─────────────────┐         ┌───────────────────────┐         ┌─────────────────┐
│ POS transactions│────────►│                       │◄────────│ Dashboard API   │
│ Page views      │────────►│  Analytics Engine     │◄────────│ Report builder  │
│ API calls       │────────►│  (time-series store)  │◄────────│ Custom analytics│
│ Agent actions   │────────►│                       │◄────────│ Usage metering  │
│ Search queries  │────────►│ 20 blobs + 20 doubles │         │                 │
└─────────────────┘         │ per data point        │         └─────────────────┘
                            └───────────────────────┘
                                 SQL API (HTTP)

Implementation

Step 1: Define Datasets


// wrangler.jsonc
{
  "analytics_engine_datasets": [
    { "binding": "POS_METRICS", "dataset": "pos_metrics" },
    { "binding": "PAGE_VIEWS", "dataset": "page_views" },
    { "binding": "API_METRICS", "dataset": "api_metrics" },
    { "binding": "AGENT_METRICS", "dataset": "agent_metrics" },
    { "binding": "SEARCH_METRICS", "dataset": "search_metrics" }
  ]
}
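
The bindings above surface on the Worker's `Env` as dataset objects exposing `writeDataPoint`. A sketch of the corresponding types (the shapes mirror the documented `writeDataPoint` options; the interface names here are declared locally for illustration, not imported):

```typescript
// Shape of a single data point accepted by writeDataPoint.
interface AnalyticsEngineDataPoint {
  indexes?: string[];          // at most one index, used for filtering
  blobs?: (string | null)[];   // string dimensions (blob1..blob20 in SQL)
  doubles?: number[];          // numeric measures (double1..double20 in SQL)
}

// Each analytics_engine_datasets binding exposes this interface.
interface AnalyticsEngineDataset {
  writeDataPoint(point: AnalyticsEngineDataPoint): void;
}

// Env as implied by the wrangler.jsonc above.
interface Env {
  POS_METRICS: AnalyticsEngineDataset;
  PAGE_VIEWS: AnalyticsEngineDataset;
  API_METRICS: AnalyticsEngineDataset;
  AGENT_METRICS: AnalyticsEngineDataset;
  SEARCH_METRICS: AnalyticsEngineDataset;
}
```

In production code the same types are available from @cloudflare/workers-types rather than being declared by hand.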

Step 2: Write POS Metrics

Instrument POS transactions to write analytics on every sale:


// In POS processing Worker or API route
function recordPOSTransaction(env: Env, transaction: POSTransaction) {
  env.POS_METRICS.writeDataPoint({
    indexes: [transaction.tenantId],   // index for fast filtering
    blobs: [
      transaction.siteId,               // blob1: site
      transaction.locationId,            // blob2: location
      transaction.terminalId,            // blob3: terminal
      transaction.employeeId,            // blob4: employee
      transaction.paymentMethod,         // blob5: payment type
      transaction.topProductCategory,    // blob6: category
      new Date().toISOString(),          // blob7: timestamp string
    ],
    doubles: [
      transaction.totalAmount,           // double1: revenue
      transaction.itemCount,             // double2: items
      transaction.discountAmount,        // double3: discounts
      1,                                 // double4: transaction count (for SUM)
      transaction.taxAmount,             // double5: tax
    ],
  });
}
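
Because blobs and doubles are positional, SQL that reads `blob4` can silently drift from the write path. One option (a hypothetical helper, not part of Analytics Engine) is a single schema map shared by writers and queries:

```typescript
// Hypothetical positional schema for the pos_metrics dataset. Keeping one
// source of truth lets both writeDataPoint() calls and SQL column references
// (blob1..blobN, double1..doubleN) be derived from the same list.
const POS_BLOBS = ["siteId", "locationId", "terminalId", "employeeId",
                   "paymentMethod", "topProductCategory", "writtenAt"] as const;
const POS_DOUBLES = ["totalAmount", "itemCount", "discountAmount",
                     "txnCount", "taxAmount"] as const;

type PosField = (typeof POS_BLOBS)[number] | (typeof POS_DOUBLES)[number];

// Translate a friendly field name into the Analytics Engine column name,
// e.g. "employeeId" -> "blob4", "totalAmount" -> "double1".
function posColumn(name: PosField): string {
  const b = POS_BLOBS.indexOf(name as (typeof POS_BLOBS)[number]);
  if (b >= 0) return `blob${b + 1}`;
  const d = POS_DOUBLES.indexOf(name as (typeof POS_DOUBLES)[number]);
  if (d >= 0) return `double${d + 1}`;
  throw new Error(`unknown field: ${name}`);
}
```

Queries can then say `${posColumn("employeeId")} AS employee_id` instead of a bare `blob4`.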

Step 3: Write Page View Metrics


// In middleware or analytics Worker
function recordPageView(env: Env, view: PageView) {
  env.PAGE_VIEWS.writeDataPoint({
    indexes: [view.tenantId],
    blobs: [
      view.siteId,
      view.path,
      view.referrer ?? "",
      view.userAgent,
      view.country ?? "",
      view.deviceType,
    ],
    doubles: [
      view.loadTimeMs,
      1, // page view count
    ],
  });
}

Step 4: Write Agent Performance Metrics


// In AI agent execution
function recordAgentAction(env: Env, action: AgentAction) {
  env.AGENT_METRICS.writeDataPoint({
    indexes: [action.tenantId],
    blobs: [
      action.agentType,
      action.actionType,
      action.status, // "success", "failed", "skipped"
      action.operationType,
    ],
    doubles: [
      action.durationMs,
      action.entitiesAffected,
      action.confidence,
      1, // action count
    ],
  });
}

Step 5: Query via SQL API


// Analytics query service
class AnalyticsEngineService {
  constructor(
    private accountId: string,
    private apiToken: string,
  ) {}

  // Sends a query to the Analytics Engine SQL API. `FORMAT JSON` is appended
  // so the response body is { meta, data, rows }; callers get the data rows.
  // Values interpolated into `sql` must come from trusted context (here, the
  // authenticated tenant ID) or be escaped first.
  async query(sql: string): Promise<any[]> {
    const response = await fetch(
      `https://api.cloudflare.com/client/v4/accounts/${this.accountId}/analytics_engine/sql`,
      {
        method: "POST",
        headers: { Authorization: `Bearer ${this.apiToken}` },
        body: `${sql} FORMAT JSON`,
      }
    );
    if (!response.ok) {
      throw new Error(`Analytics Engine query failed: ${response.status}`);
    }
    const result = (await response.json()) as { data: any[] };
    return result.data;
  }

  // POS Dashboard: hourly revenue
  async getHourlyRevenue(tenantId: string, date: string) {
    return this.query(`
      SELECT
        toHour(timestamp) AS hour,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions,
        AVG(double1) AS avg_ticket
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp < '${date}T23:59:59Z'
      GROUP BY hour
      ORDER BY hour
    `);
  }

  // POS Dashboard: top products
  async getTopProducts(tenantId: string, date: string) {
    return this.query(`
      SELECT
        blob6 AS category,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp < '${date}T23:59:59Z'
      GROUP BY category
      ORDER BY revenue DESC
      LIMIT 10
    `);
  }

  // POS Dashboard: payment method distribution (blob5 = payment type)
  async getPaymentMethodDistribution(tenantId: string, date: string) {
    return this.query(`
      SELECT
        blob5 AS payment_method,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp < '${date}T23:59:59Z'
      GROUP BY payment_method
      ORDER BY transactions DESC
    `);
  }

  // POS Dashboard: employee performance
  async getEmployeePerformance(tenantId: string, date: string) {
    return this.query(`
      SELECT
        blob4 AS employee_id,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions,
        SUM(double1) / SUM(double4) AS avg_ticket
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp < '${date}T23:59:59Z'
      GROUP BY employee_id
      ORDER BY revenue DESC
    `);
  }

  // Agent analytics
  async getAgentPerformance(tenantId: string, days: number) {
    return this.query(`
      SELECT
        blob1 AS agent_type,
        blob3 AS status,
        COUNT() AS action_count,
        AVG(double1) AS avg_duration_ms,
        SUM(double2) AS total_entities_affected,
        AVG(double3) AS avg_confidence
      FROM agent_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= NOW() - INTERVAL '${days}' DAY
      GROUP BY agent_type, status
      ORDER BY action_count DESC
    `);
  }
}
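
The tenant IDs interpolated above come from authenticated context, but any other value spliced into a query string should be escaped first. A minimal sketch using standard single-quote doubling (verify against the literal syntax the SQL API actually accepts):

```typescript
// Minimal escaping for string values interpolated into Analytics Engine SQL.
// Single quotes are doubled; backslashes are doubled too, conservatively.
function sqlString(value: string): string {
  return `'${value.replace(/\\/g, "\\\\").replace(/'/g, "''")}'`;
}
```

A query can then safely interpolate user-influenced values, e.g. `WHERE blob4 = ${sqlString(employeeId)}`.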

Step 6: Integrate with TRPC Routers


// Replace expensive PostgreSQL analytics with Analytics Engine
export const posAnalyticsRouter = createTRPCRouter({
  getDashboard: permissionProtectedProcedure(["pos:read"])
    .input(z.object({ date: z.string() }))
    .query(async ({ ctx, input }) => {
      // CF_ACCOUNT_ID / CF_API_TOKEN are assumed names for Worker secrets
      const ae = new AnalyticsEngineService(env.CF_ACCOUNT_ID, env.CF_API_TOKEN);

      // Single parallel fetch instead of 6 sequential SQL queries
      const [hourly, topProducts, employees, paymentMethods] = await Promise.all([
        ae.getHourlyRevenue(ctx.tenantId, input.date),
        ae.getTopProducts(ctx.tenantId, input.date),
        ae.getEmployeePerformance(ctx.tenantId, input.date),
        ae.getPaymentMethodDistribution(ctx.tenantId, input.date),
      ]);

      return { hourly, topProducts, employees, paymentMethods };
    }),
});

Use Cases

1. POS Sales Metrics

Write on every transaction, query for dashboards.

2. Website Analytics

Page views, sessions, bounce rates, conversion funnels.

3. API Usage Metering

Track API calls per tenant for usage-based billing.
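
A sketch of what metering could look like (it assumes the API_METRICS binding from Step 1; the field layout here is illustrative, not an existing schema):

```typescript
// Illustrative API-call metric written on every request.
interface ApiCallMetric {
  tenantId: string;
  endpoint: string;
  method: string;
  status: number;
  durationMs: number;
}

function recordApiCall(
  env: { API_METRICS: { writeDataPoint(p: object): void } },
  m: ApiCallMetric,
) {
  env.API_METRICS.writeDataPoint({
    indexes: [m.tenantId],
    blobs: [m.endpoint, m.method, String(m.status)],
    doubles: [m.durationMs, 1], // double2 = call count, summed at query time
  });
}

// Billable call count per tenant over 30 days (query sketch):
// SELECT index1 AS tenant, SUM(double2) AS calls FROM api_metrics
// WHERE timestamp >= NOW() - INTERVAL '30' DAY GROUP BY tenant
```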

4. AI Agent Monitoring

Agent execution times, success rates, confidence scores.

5. Search Analytics

Query patterns, click-through rates, zero-result queries.
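
A sketch for the search case (assumes the SEARCH_METRICS binding from Step 1; the field layout is illustrative):

```typescript
// Illustrative search metric: the normalized query as a blob, plus a
// zero-result flag so the zero-result rate is a simple SUM/SUM at query time.
function recordSearch(
  env: { SEARCH_METRICS: { writeDataPoint(p: object): void } },
  tenantId: string,
  query: string,
  resultCount: number,
) {
  env.SEARCH_METRICS.writeDataPoint({
    indexes: [tenantId],
    blobs: [query.toLowerCase().slice(0, 256)], // cap blob size
    doubles: [resultCount, resultCount === 0 ? 1 : 0, 1], // results, zero-result flag, search count
  });
}

// Zero-result rate over 7 days (query sketch):
// SELECT SUM(double2) / SUM(double3) AS zero_result_rate FROM search_metrics
// WHERE index1 = '<tenant>' AND timestamp >= NOW() - INTERVAL '7' DAY
```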

6. Custom Tenant Analytics

Expose analytics to tenants via TRPC API for their own dashboards.

Data Model Reference

Each data point:

- 1 index (up to 96 bytes), used for fast filtering -- here, the tenant ID
- Up to 20 blobs: string dimensions (site, employee, payment method, ...)
- Up to 20 doubles: numeric measures (revenue, counts, durations, ...)
- A timestamp, assigned automatically at write time

Sampling

Analytics Engine uses statistical sampling at high volumes: rather than dropping writes, it keeps a representative subset of rows and records each surviving row's weight in the _sample_interval column. At low volumes _sample_interval is 1 and results are exact; at high volumes, aggregations should weight by it (for example SUM(_sample_interval * double1) instead of SUM(double1)) to estimate true totals.
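
A sampling-aware variant of the hourly revenue query can be built like this (a sketch reusing the pos_metrics column layout from Step 2):

```typescript
// Builds an hourly revenue query that weights each row by _sample_interval,
// so totals remain accurate estimates when Analytics Engine samples data.
function hourlyRevenueSql(tenantId: string, date: string): string {
  return `
    SELECT
      toHour(timestamp) AS hour,
      SUM(_sample_interval * double1) AS revenue,
      SUM(_sample_interval * double4) AS transactions
    FROM pos_metrics
    WHERE index1 = '${tenantId}'
      AND timestamp >= '${date}T00:00:00Z'
      AND timestamp < '${date}T23:59:59Z'
    GROUP BY hour
    ORDER BY hour`;
}
```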

Limits

| Metric | Limit |
| --- | --- |
| Data points per invocation | 250 |
| Blob total size per point | 16 KB |
| Index size | 96 bytes |
| Doubles per point | 20 |
| Blobs per point | 20 |
| Data retention | 3 months |
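
A small pre-write guard can surface limit violations during development instead of losing data silently (a sketch; the numeric thresholds mirror the table above):

```typescript
// Validate a data point against the limits above before writing.
// Throws on violations so instrumentation bugs surface early.
function assertWithinLimits(point: {
  indexes?: string[];
  blobs?: string[];
  doubles?: number[];
}): void {
  const enc = new TextEncoder();
  if ((point.blobs?.length ?? 0) > 20) throw new Error("more than 20 blobs");
  if ((point.doubles?.length ?? 0) > 20) throw new Error("more than 20 doubles");
  const blobBytes = (point.blobs ?? []).reduce((n, b) => n + enc.encode(b).length, 0);
  if (blobBytes > 16 * 1024) throw new Error("blob payload exceeds 16 KB");
  for (const ix of point.indexes ?? []) {
    if (enc.encode(ix).length > 96) throw new Error("index exceeds 96 bytes");
  }
}
```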

Pricing

Analytics Engine is currently not billed. Cloudflare has published future pricing, so per-write and per-query costs should be reviewed before depending on it for usage-based billing.

Estimated Impact