# Analytics Engine for Time-Series Metrics

**Priority:** P2 (Strategic)
## What is Analytics Engine?

A high-cardinality, high-volume time-series analytics system designed for Cloudflare Workers. Write data points from any Worker; query via an HTTP SQL API. Uses statistical sampling to stay fast at scale.
## Why This Matters for Company Manager

### Current Analytics Pain Points

The platform currently builds its analytics views with complex SQL aggregations against the production database:
```ts
// POS sales reporting (pos-sales-reporting.ts) -- 6 separate queries:
// 1. Total revenue, transactions, avg ticket
// 2. Hourly revenue breakdown (EXTRACT(HOUR))
// 3. Top 5 products by revenue
// 4. Payment method distribution
// 5. Employee performance rankings
// 6. Customers served count
// Each query hits the full orders table for the date range
```
The problems:

1. Every analytics view triggers multiple raw SQL queries against the production DB
2. No pre-aggregation -- aggregates are computed on every request
3. JSON columns store breakdowns (AnalyticsMetric.dimensions, SalesReport.details)
4. No time-series optimization
5. Analytics queries contend with transactional queries
### What Analytics Engine Solves
| Before | After |
|---|---|
| 6 SQL queries per POS report | 1 Analytics Engine SQL query |
| Compute on every request | Pre-aggregated at write time |
| Hits production DB | Separate analytics store |
| No real-time metrics | Write from any Worker instantly |
| Limited cardinality | Unlimited-cardinality analytics |
## Architecture

```
 Events (writes)           Analytics Engine             Query (reads)

┌─────────────────┐      ┌──────────────────────┐      ┌─────────────────┐
│ POS transactions│─────►│                      │◄─────│ Dashboard API   │
│ Page views      │─────►│   Analytics Engine   │◄─────│ Report builder  │
│ API calls       │─────►│  (time-series store) │◄─────│ Custom analytics│
│ Agent actions   │─────►│                      │◄─────│ Usage metering  │
│ Search queries  │─────►│ 20 blobs + 20 doubles│      │                 │
└─────────────────┘      │    per data point    │      └─────────────────┘
                         └──────────────────────┘
                               SQL API (HTTP)
```
## Implementation

### Step 1: Define Datasets

```jsonc
// wrangler.jsonc
{
  "analytics_engine_datasets": [
    { "binding": "POS_METRICS", "dataset": "pos_metrics" },
    { "binding": "PAGE_VIEWS", "dataset": "page_views" },
    { "binding": "API_METRICS", "dataset": "api_metrics" },
    { "binding": "AGENT_METRICS", "dataset": "agent_metrics" },
    { "binding": "SEARCH_METRICS", "dataset": "search_metrics" }
  ]
}
```
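Each binding surfaces in the Worker as an `AnalyticsEngineDataset` (the binding type from `@cloudflare/workers-types`). A minimal `Env` shape for the snippets below:

```ts
// Bindings declared in wrangler.jsonc, typed for the Worker
interface Env {
  POS_METRICS: AnalyticsEngineDataset;
  PAGE_VIEWS: AnalyticsEngineDataset;
  API_METRICS: AnalyticsEngineDataset;
  AGENT_METRICS: AnalyticsEngineDataset;
  SEARCH_METRICS: AnalyticsEngineDataset;
}
```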
### Step 2: Write POS Metrics

Instrument POS transactions to write an analytics data point on every sale:

```ts
// In POS processing Worker or API route
function recordPOSTransaction(env: Env, transaction: POSTransaction) {
  env.POS_METRICS.writeDataPoint({
    indexes: [transaction.tenantId], // index1: sampling key + fast filtering
    blobs: [
      transaction.siteId,             // blob1: site
      transaction.locationId,         // blob2: location
      transaction.terminalId,         // blob3: terminal
      transaction.employeeId,         // blob4: employee
      transaction.paymentMethod,      // blob5: payment type
      transaction.topProductCategory, // blob6: category
      new Date().toISOString(),       // blob7: timestamp string (the store also records its own timestamp)
    ],
    doubles: [
      transaction.totalAmount,    // double1: revenue
      transaction.itemCount,      // double2: items
      transaction.discountAmount, // double3: discounts
      1,                          // double4: transaction count (for SUM)
      transaction.taxAmount,      // double5: tax
    ],
  });
}
```
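`writeDataPoint()` is fire-and-forget (it returns `void` and does not block the response), so the call can sit directly on the hot path. A sketch of the wiring, with the handler shape illustrative:

```ts
// Worker entry point (handler shape illustrative)
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const transaction = await request.json<POSTransaction>();
    // ... validate and persist the sale first ...
    recordPOSTransaction(env, transaction); // non-blocking, no await needed
    return Response.json({ ok: true });
  },
};
```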
### Step 3: Write Page View Metrics

```ts
// In middleware or analytics Worker
function recordPageView(env: Env, view: PageView) {
  env.PAGE_VIEWS.writeDataPoint({
    indexes: [view.tenantId],
    blobs: [
      view.siteId,         // blob1: site
      view.path,           // blob2: path
      view.referrer ?? "", // blob3: referrer
      view.userAgent,      // blob4: user agent
      view.country ?? "",  // blob5: country
      view.deviceType,     // blob6: device type
    ],
    doubles: [
      view.loadTimeMs, // double1: load time
      1,               // double2: page view count
    ],
  });
}
```
### Step 4: Write Agent Performance Metrics

```ts
// In AI agent execution
function recordAgentAction(env: Env, action: AgentAction) {
  env.AGENT_METRICS.writeDataPoint({
    indexes: [action.tenantId],
    blobs: [
      action.agentType,     // blob1: agent type
      action.actionType,    // blob2: action type
      action.status,        // blob3: "success", "failed", "skipped"
      action.operationType, // blob4: operation type
    ],
    doubles: [
      action.durationMs,       // double1: duration
      action.entitiesAffected, // double2: entities affected
      action.confidence,       // double3: confidence score
      1,                       // double4: action count
    ],
  });
}
```
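Blobs and doubles are positional, so the write-side comments above are the only thing tying `blob4` to "employee". One hedged way to keep writers and the SQL in Step 5 in sync is a shared column map (entirely illustrative):

```ts
// Hypothetical single source of truth for pos_metrics column positions
export const POS_COLS = {
  site: "blob1",
  location: "blob2",
  terminal: "blob3",
  employee: "blob4",
  paymentMethod: "blob5",
  category: "blob6",
  revenue: "double1",
  items: "double2",
  discounts: "double3",
  txCount: "double4",
  tax: "double5",
} as const;

// Usage in a query string:
// `SELECT ${POS_COLS.category} AS category, SUM(${POS_COLS.revenue}) AS revenue ...`
```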
### Step 5: Query via SQL API

```ts
// Analytics query service -- reads go through the HTTP SQL API.
// At high volume, weight sums by _sample_interval (see Sampling below).
class AnalyticsEngineService {
  constructor(
    private accountId: string,
    private apiToken: string,
  ) {}

  // The SQL API has no bind parameters, so interpolate only trusted,
  // server-side values (tenantId comes from the auth context).
  async query<T = Record<string, unknown>>(sql: string): Promise<T[]> {
    const response = await fetch(
      `https://api.cloudflare.com/client/v4/accounts/${this.accountId}/analytics_engine/sql`,
      {
        method: "POST",
        headers: { Authorization: `Bearer ${this.apiToken}` },
        body: sql,
      }
    );
    if (!response.ok) {
      throw new Error(`Analytics Engine query failed: ${response.status}`);
    }
    // The default JSON response format wraps rows in { meta, data, rows }
    const result = (await response.json()) as { data: T[] };
    return result.data;
  }

  // POS Dashboard: hourly revenue
  async getHourlyRevenue(tenantId: string, date: string) {
    return this.query(`
      SELECT
        toHour(timestamp) AS hour,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions,
        SUM(double1) / SUM(double4) AS avg_ticket
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp <= '${date}T23:59:59Z'
      GROUP BY hour
      ORDER BY hour
    `);
  }

  // POS Dashboard: top products
  async getTopProducts(tenantId: string, date: string) {
    return this.query(`
      SELECT
        blob6 AS category,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp <= '${date}T23:59:59Z'
      GROUP BY category
      ORDER BY revenue DESC
      LIMIT 10
    `);
  }

  // POS Dashboard: employee performance
  async getEmployeePerformance(tenantId: string, date: string) {
    return this.query(`
      SELECT
        blob4 AS employee_id,
        SUM(double1) AS revenue,
        SUM(double4) AS transactions,
        SUM(double1) / SUM(double4) AS avg_ticket
      FROM pos_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= '${date}T00:00:00Z'
        AND timestamp <= '${date}T23:59:59Z'
      GROUP BY employee_id
      ORDER BY revenue DESC
    `);
  }

  // Agent analytics
  async getAgentPerformance(tenantId: string, days: number) {
    return this.query(`
      SELECT
        blob1 AS agent_type,
        blob3 AS status,
        COUNT() AS action_count,
        AVG(double1) AS avg_duration_ms,
        SUM(double2) AS total_entities_affected,
        AVG(double3) AS avg_confidence
      FROM agent_metrics
      WHERE index1 = '${tenantId}'
        AND timestamp >= NOW() - INTERVAL '${days}' DAY
      GROUP BY agent_type, status
      ORDER BY action_count DESC
    `);
  }
}
```
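Step 6 below also calls `getPaymentMethodDistribution`, which the class above does not define. A minimal sketch to add to the service, following the same pattern (blob5 holds the payment method per Step 2):

```ts
// Inside AnalyticsEngineService: payment method breakdown for one day
async getPaymentMethodDistribution(tenantId: string, date: string) {
  return this.query(`
    SELECT
      blob5 AS payment_method,
      SUM(double1) AS revenue,
      SUM(double4) AS transactions
    FROM pos_metrics
    WHERE index1 = '${tenantId}'
      AND timestamp >= '${date}T00:00:00Z'
      AND timestamp <= '${date}T23:59:59Z'
    GROUP BY payment_method
    ORDER BY transactions DESC
  `);
}
```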
### Step 6: Integrate with tRPC Routers

```ts
// Replace expensive PostgreSQL analytics with Analytics Engine
export const posAnalyticsRouter = createTRPCRouter({
  getDashboard: permissionProtectedProcedure(["pos:read"])
    .input(z.object({ date: z.string() }))
    .query(async ({ ctx, input }) => {
      // Credential env var names are illustrative
      const ae = new AnalyticsEngineService(env.CF_ACCOUNT_ID, env.CF_API_TOKEN);
      // One parallel fetch instead of 6 sequential SQL queries
      const [hourly, topProducts, employees, paymentMethods] = await Promise.all([
        ae.getHourlyRevenue(ctx.tenantId, input.date),
        ae.getTopProducts(ctx.tenantId, input.date),
        ae.getEmployeePerformance(ctx.tenantId, input.date),
        ae.getPaymentMethodDistribution(ctx.tenantId, input.date),
      ]);
      return { hourly, topProducts, employees, paymentMethods };
    }),
});
```
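On the client, the dashboard then needs a single round trip (standard tRPC React Query usage; the `api` helper name is illustrative):

```ts
// React dashboard component
const { data, isLoading } = api.posAnalytics.getDashboard.useQuery({
  date: "2025-01-15", // illustrative
});
```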
## Use Cases

1. **POS Sales Metrics** -- write on every transaction, query for dashboards.
2. **Website Analytics** -- page views, sessions, bounce rates, conversion funnels.
3. **API Usage Metering** -- track API calls per tenant for usage-based billing (see the sketch after this list).
4. **AI Agent Monitoring** -- agent execution times, success rates, confidence scores.
5. **Search Analytics** -- query patterns, click-through rates, zero-result queries.
6. **Custom Tenant Analytics** -- expose analytics to tenants via the tRPC API for their own dashboards.
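A sketch of use case 3 under the same conventions as Step 2 (the column layout and 30-day window are illustrative); `SUM(_sample_interval)` returns a sampling-adjusted count, as covered in the Sampling section below:

```ts
// Write: one data point per API call, e.g. from request middleware
function recordApiCall(env: Env, tenantId: string, route: string, status: number) {
  env.API_METRICS.writeDataPoint({
    indexes: [tenantId],
    blobs: [route, String(status)], // blob1: route, blob2: HTTP status
    doubles: [1],                   // double1: call count
  });
}

// Read: billable calls over the last 30 days, adjusted for sampling
async function getBillableCalls(ae: AnalyticsEngineService, tenantId: string) {
  return ae.query(`
    SELECT SUM(_sample_interval) AS billable_calls
    FROM api_metrics
    WHERE index1 = '${tenantId}'
      AND timestamp >= NOW() - INTERVAL '30' DAY
  `);
}
```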
## Data Model Reference

Each data point:
- **1 index** (96 bytes max) -- primary partition and sampling key (use tenantId)
- **20 blobs** (strings, 5,120 bytes total) -- dimensions
- **20 doubles** (numbers) -- measures
- **Automatic timestamp** -- appended by the system
## Sampling

Analytics Engine uses statistical sampling at high volumes:

- Low volume: exact counts
- High volume: rows are sampled, and each carries a `_sample_interval` column for adjustment
- Adjust queries per row: `SUM(double1 * _sample_interval)` for accurate totals, `SUM(_sample_interval)` for accurate counts
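For example, the hourly-revenue query from Step 5 becomes sampling-accurate like this (same schema assumptions as above):

```ts
// Sampling-adjusted variant of getHourlyRevenue's SQL
function hourlyRevenueSql(tenantId: string, date: string): string {
  return `
    SELECT
      toHour(timestamp) AS hour,
      SUM(double1 * _sample_interval) AS revenue,      -- adjusted revenue
      SUM(double4 * _sample_interval) AS transactions  -- adjusted count
    FROM pos_metrics
    WHERE index1 = '${tenantId}'
      AND timestamp >= '${date}T00:00:00Z'
      AND timestamp <= '${date}T23:59:59Z'
    GROUP BY hour
    ORDER BY hour
  `;
}
```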
## Limits

| Metric | Limit |
|---|---|
| Data points written per Worker invocation | 25 |
| Blob total size per point | 5,120 bytes |
| Index size | 96 bytes |
| Doubles per point | 20 |
| Blobs per point | 20 |
| Data retention | 3 months |
## Pricing

Billing is not yet enabled. Published future pricing:
- Based on data points written plus SQL read queries
- No charge for dimensions, cardinality, or data complexity
## Estimated Impact

- **Query performance**: 6 sequential SQL queries → 1-4 parallel Analytics Engine queries (est. ~10x faster dashboard loads)
- **DB load**: removes analytics queries from production PostgreSQL entirely
- **Real-time**: data is queryable within seconds of being written
- **Scale**: handles millions of data points without touching the transactional DB
- **Cost**: free until billing is enabled; published pricing model suggests low ongoing cost
- **Effort**: 2-3 weeks for the core metrics pipeline