diff --git a/README.md b/README.md index becb78e..9b03155 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,54 @@ curl "$AGENT_ANALYTICS_URL/events?project=marketing-site&event=signup_click&sinc The hosted `login` flow in the CLI is for Agent Analytics Cloud. For this self-hosted OSS server, the simplest path is `AGENT_ANALYTICS_URL` plus `AGENT_ANALYTICS_API_KEY`. +## Trustworthy Growth Queries + +`POST /query` is the main agent-facing read surface for custom growth analysis. It supports both event metrics and session-backed metrics on the same filtered event set: + +- `event_count` +- `unique_users` +- `session_count` +- `bounce_rate` +- `avg_duration` + +That means an agent can stay on the existing OSS contract and still segment noisy traffic out of product reporting before drawing conclusions. + +Segment product traffic away from marketing/docs/internal reads: + +```bash +curl "$AGENT_ANALYTICS_URL/query" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $AGENT_ANALYTICS_API_KEY" \ + -d '{ + "project": "agentanalytics-landing", + "metrics": ["event_count", "session_count", "bounce_rate", "avg_duration"], + "group_by": ["event"], + "filters": [ + { "field": "properties.hostname", "op": "eq", "value": "app.agentanalytics.sh" }, + { "field": "event", "op": "neq", "value": "api_read" } + ], + "order_by": "event_count", + "order": "desc", + "limit": 20 + }' +``` + +Check activation events without trusting blended totals: + +```bash +curl "$AGENT_ANALYTICS_URL/query" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $AGENT_ANALYTICS_API_KEY" \ + -d '{ + "project": "agentanalytics-landing", + "metrics": ["event_count", "unique_users", "session_count"], + "group_by": ["event"], + "filters": [ + { "field": "event", "op": "contains", "value": "project" } + ] + }' +``` + ## What Gets Tracked and Stored - Default `page_view` events include full URL, pathname, hostname, referrer, title, screen resolution, language, browser, browser version, OS, device type, timezone, UTM parameters, session count, and first-touch attribution. @@ -237,8 +285,10 @@ The self-hosted server in this repo exposes these routes: - `GET /projects` - `GET /stats?project=...` - `GET /events?project=...` +- `POST /query` +- `GET /properties?project=...` -Read endpoints require `X-API-Key`. Write endpoints use the public project token in the JSON body. The public docs and OpenAPI spec cover the broader Agent Analytics platform too, so treat the list above as the source of truth for the OSS server in this repo. +Read endpoints require `X-API-Key`. Write endpoints use the public project token in the JSON body. Richer read endpoints such as sessions, breakdowns, pages, and heatmaps are intentionally not exposed by this OSS server even though the shared core package still contains those internal adapter methods. The public docs and OpenAPI spec cover the broader Agent Analytics platform too, so treat the list above as the source of truth for the OSS server in this repo. ## Contributing diff --git a/schema.sql b/schema.sql index d32d2e2..3d0672f 100644 --- a/schema.sql +++ b/schema.sql @@ -8,7 +8,8 @@ CREATE TABLE IF NOT EXISTS events ( user_id TEXT, session_id TEXT, timestamp INTEGER NOT NULL, - date TEXT NOT NULL + date TEXT NOT NULL, + country TEXT ); CREATE INDEX IF NOT EXISTS idx_events_project_date ON events(project_id, date); @@ -30,3 +31,13 @@ CREATE TABLE IF NOT EXISTS sessions ( CREATE INDEX IF NOT EXISTS idx_sessions_project_date ON sessions(project_id, date); CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(project_id, user_id); + +CREATE TABLE IF NOT EXISTS identity_map ( + previous_id TEXT NOT NULL, + canonical_id TEXT NOT NULL, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY (previous_id, project_id) +); + +CREATE INDEX IF NOT EXISTS idx_identity_canonical ON identity_map(canonical_id, project_id); diff --git a/src/__tests__/server.test.mjs b/src/__tests__/server.test.mjs index 162192a..6b7dbb7 100644 --- a/src/__tests__/server.test.mjs +++ b/src/__tests__/server.test.mjs @@ -4,10 +4,15 @@ * Uses SqliteAdapter with :memory: DB + createAnalyticsHandler from core. * Tests every endpoint the handler exposes. */ +import Database from 'better-sqlite3'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; import { describe, it, expect, beforeAll } from 'vitest'; import { createAnalyticsHandler } from '@agent-analytics/core'; import { SqliteAdapter } from '../db/sqlite.js'; import { makeValidateWrite, makeValidateRead } from '../auth.js'; +import cloudflareWorker from '../platforms/cloudflare.js'; const PROJECT = 'test-project'; const TOKEN = 'pt_test'; @@ -41,6 +46,49 @@ function get(path, headers = {}) { return new Request(`http://localhost${path}`, { headers: { 'User-Agent': BROWSER_UA, ...headers } }); } +class FakeD1Statement { + constructor(db, sql, params = []) { + this.db = db; + this.sql = sql; + this.params = params; + } + + bind(...params) { + return new FakeD1Statement(this.db, this.sql, params); + } + + _runSync() { + return this.db.prepare(this.sql).run(...this.params); + } + + async run() { + return this._runSync(); + } + + async all() { + return { results: this.db.prepare(this.sql).all(...this.params) }; + } + + async first() { + return this.db.prepare(this.sql).get(...this.params) || null; + } +} + +class FakeD1Database { + constructor(db) { + this.db = db; + } + + prepare(sql) { + return new FakeD1Statement(this.db, sql); + } + + async batch(statements) { + const txn = this.db.transaction((entries) => entries.map((entry) => entry._runSync())); + return txn(statements); + } +} + // --- /health --- describe('GET /health', () => { @@ -129,6 +177,271 @@ describe('POST /track', () => { if (writeOps) await Promise.all(writeOps); expect(response.status).toBe(200); }); + + it('stitches an anonymous user to a known user via /identify', async () => { + const anonId = 'anon-user-1'; + + const tracked = await handler(postJSON('/track', { + token: TOKEN, + project: PROJECT, + event: 'page_view', + properties: { path: '/signup' }, + user_id: anonId, + session_id: 'sess-identify', + })); + if (tracked.writeOps) await Promise.all(tracked.writeOps); + + const identified = await handler(postJSON('/identify', { + token: TOKEN, + project: PROJECT, + previous_id: anonId, + user_id: 'user-1', + })); + + expect(identified.response.status).toBe(200); + + const { response } = await handler(get(`/events?project=${PROJECT}&session_id=sess-identify`, authHeaders)); + const data = await response.json(); + expect(data.events[0].user_id).toBe('user-1'); + }); + + it('canonicalizes a late event after /identify has already been recorded', async () => { + const anonId = 'anon-user-late'; + const sessionId = 'sess-identify-late'; + + const identified = await handler(postJSON('/identify', { + token: TOKEN, + project: PROJECT, + previous_id: anonId, + user_id: 'user-late', + })); + + expect(identified.response.status).toBe(200); + + const tracked = await handler(postJSON('/track', { + token: TOKEN, + project: PROJECT, + event: 'signup', + properties: { path: '/signup' }, + user_id: anonId, + session_id: sessionId, + })); + if (tracked.writeOps) await Promise.all(tracked.writeOps); + + const { response: eventsResponse } = await handler(get(`/events?project=${PROJECT}&session_id=${sessionId}`, authHeaders)); + const eventsData = await eventsResponse.json(); + expect(eventsData.events[0].user_id).toBe('user-late'); + + const query = postJSON('/query', { + project: PROJECT, + metrics: ['unique_users'], + filters: [ + { field: 'event', op: 'eq', value: 'signup' }, + { field: 'session_id', op: 'eq', value: sessionId }, + ], + }); + query.headers.set('X-API-Key', API_KEY); + + const { response: queryResponse } = await handler(query); + const queryData = await queryResponse.json(); + expect(queryData.rows[0].unique_users).toBe(1); + }); +}); + +describe('SqliteAdapter hardening', () => { + it('canonicalizes a delayed write that starts before /identify and commits after it', async () => { + class RaceSqliteAdapter extends SqliteAdapter { + beforeNextBatch = null; + + async _batch(statements) { + if (this.beforeNextBatch) { + const hook = this.beforeNextBatch; + this.beforeNextBatch = null; + await hook(); + } + + return super._batch(statements); + } + } + + const adapter = new RaceSqliteAdapter(':memory:'); + const anonId = 'anon-race-user'; + const canonicalId = 'user-race'; + const sessionId = 'sess-race'; + + adapter.beforeNextBatch = async () => { + await adapter.identifyUser({ + project: PROJECT, + previous_id: anonId, + canonical_id: canonicalId, + }); + }; + + await adapter.trackEvent({ + project: PROJECT, + event: 'signup', + properties: { path: '/signup' }, + user_id: anonId, + session_id: sessionId, + timestamp: Date.now(), + }); + + const events = await adapter.getEvents({ project: PROJECT, session_id: sessionId }); + expect(events).toHaveLength(1); + expect(events[0].user_id).toBe(canonicalId); + + const sessions = await adapter.getSessions({ project: PROJECT, user_id: canonicalId }); + expect(sessions.some(session => session.session_id === sessionId)).toBe(true); + + adapter.db.close(); + }); + + it('migrates legacy sqlite files before session-backed queries read the new country column', async () => { + const tempDir = mkdtempSync(join(tmpdir(), 'agent-analytics-')); + const dbPath = join(tempDir, 'legacy.db'); + const legacyDb = new Database(dbPath); + const now = Date.now(); + const today = new Date(now).toISOString().split('T')[0]; + + legacyDb.exec(` + CREATE TABLE events ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + event TEXT NOT NULL, + properties TEXT, + user_id TEXT, + session_id TEXT, + timestamp INTEGER NOT NULL, + date TEXT NOT NULL + ); + CREATE INDEX idx_events_project_date ON events(project_id, date); + CREATE INDEX idx_events_session ON events(session_id); + CREATE TABLE sessions ( + session_id TEXT PRIMARY KEY, + user_id TEXT, + project_id TEXT NOT NULL, + start_time INTEGER NOT NULL, + end_time INTEGER NOT NULL, + duration INTEGER DEFAULT 0, + entry_page TEXT, + exit_page TEXT, + event_count INTEGER DEFAULT 1, + is_bounce INTEGER DEFAULT 1, + date TEXT NOT NULL + ); + CREATE INDEX idx_sessions_project_date ON sessions(project_id, date); + CREATE INDEX idx_sessions_user ON sessions(project_id, user_id); + `); + + legacyDb.prepare(` + INSERT INTO events (id, project_id, event, properties, user_id, session_id, timestamp, date) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `).run('legacy-event', PROJECT, 'page_view', JSON.stringify({ path: '/legacy' }), 'legacy-user', 'legacy-session', now, today); + legacyDb.prepare(` + INSERT INTO sessions (session_id, user_id, project_id, start_time, end_time, duration, entry_page, exit_page, event_count, is_bounce, date) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run('legacy-session', 'legacy-user', PROJECT, now, now + 1500, 1500, '/legacy', '/legacy', 1, 1, today); + legacyDb.close(); + + const adapter = new SqliteAdapter(dbPath); + + try { + const columns = adapter.db.prepare('PRAGMA table_info(events)').all().map(column => column.name); + expect(columns).toContain('country'); + + const result = await adapter.query({ + project: PROJECT, + metrics: ['event_count', 'session_count', 'bounce_rate', 'avg_duration'], + group_by: ['event'], + }); + + expect(result.rows).toHaveLength(1); + expect(result.rows[0].event).toBe('page_view'); + expect(result.rows[0].event_count).toBe(1); + expect(result.rows[0].session_count).toBe(1); + } finally { + adapter.db.close(); + rmSync(tempDir, { recursive: true, force: true }); + } + }); +}); + +describe('Cloudflare D1 hardening', () => { + it('upgrades legacy D1 schemas before /track and /identify writes run', async () => { + const legacyDb = new Database(':memory:'); + legacyDb.exec(` + CREATE TABLE events ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + event TEXT NOT NULL, + properties TEXT, + user_id TEXT, + session_id TEXT, + timestamp INTEGER NOT NULL, + date TEXT NOT NULL + ); + CREATE INDEX idx_events_project_date ON events(project_id, date); + CREATE INDEX idx_events_session ON events(session_id); + CREATE TABLE sessions ( + session_id TEXT PRIMARY KEY, + user_id TEXT, + project_id TEXT NOT NULL, + start_time INTEGER NOT NULL, + end_time INTEGER NOT NULL, + duration INTEGER DEFAULT 0, + entry_page TEXT, + exit_page TEXT, + event_count INTEGER DEFAULT 1, + is_bounce INTEGER DEFAULT 1, + date TEXT NOT NULL + ); + CREATE INDEX idx_sessions_project_date ON sessions(project_id, date); + CREATE INDEX idx_sessions_user ON sessions(project_id, user_id); + `); + + const env = { + DB: new FakeD1Database(legacyDb), + PROJECT_TOKENS: TOKEN, + API_KEYS: API_KEY, + }; + const waitUntilOps = []; + const ctx = { + waitUntil(promise) { + waitUntilOps.push(promise); + }, + }; + + const trackResponse = await cloudflareWorker.fetch(postJSON('/track', { + token: TOKEN, + project: PROJECT, + event: 'page_view', + properties: { path: '/legacy-d1' }, + user_id: 'anon-d1-user', + session_id: 'legacy-d1-session', + }), env, ctx); + await Promise.all(waitUntilOps); + + expect(trackResponse.status).toBe(200); + + const identifyResponse = await cloudflareWorker.fetch(postJSON('/identify', { + token: TOKEN, + project: PROJECT, + previous_id: 'anon-d1-user', + user_id: 'user-d1', + }), env, ctx); + expect(identifyResponse.status).toBe(200); + + const columns = legacyDb.prepare('PRAGMA table_info(events)').all().map((column) => column.name); + expect(columns).toContain('country'); + + const identityColumns = legacyDb.prepare('PRAGMA table_info(identity_map)').all().map((column) => column.name); + expect(identityColumns).toContain('canonical_id'); + + const eventUser = legacyDb.prepare('SELECT user_id FROM events WHERE session_id = ?').get('legacy-d1-session'); + expect(eventUser.user_id).toBe('user-d1'); + + legacyDb.close(); + }); }); // --- /track/batch --- @@ -259,6 +572,61 @@ describe('OSS analytics endpoints', () => { expect(response.status).toBe(200); }); + it('returns session-backed metrics for /query', async () => { + const project = `session-metrics-${Date.now()}`; + const now = Date.now(); + + const firstPageView = await handler(postJSON('/track', { + token: TOKEN, + project, + event: 'page_view', + properties: { path: '/' }, + user_id: 'query-user-1', + session_id: 'query-session-1', + timestamp: now, + })); + if (firstPageView.writeOps) await Promise.all(firstPageView.writeOps); + + const signup = await handler(postJSON('/track', { + token: TOKEN, + project, + event: 'signup', + properties: { path: '/signup' }, + user_id: 'query-user-1', + session_id: 'query-session-1', + timestamp: now + 1000, + })); + if (signup.writeOps) await Promise.all(signup.writeOps); + + const secondPageView = await handler(postJSON('/track', { + token: TOKEN, + project, + event: 'page_view', + properties: { path: '/' }, + user_id: 'query-user-2', + session_id: 'query-session-2', + timestamp: now, + })); + if (secondPageView.writeOps) await Promise.all(secondPageView.writeOps); + + const req = postJSON('/query', { + project, + metrics: ['event_count', 'session_count', 'bounce_rate', 'avg_duration'], + group_by: ['event'], + order_by: 'event', + order: 'asc', + }); + req.headers.set('X-API-Key', API_KEY); + const { response } = await handler(req); + expect(response.status).toBe(200); + const data = await response.json(); + const pageViewRow = data.rows.find(row => row.event === 'page_view'); + expect(pageViewRow).toBeDefined(); + expect(pageViewRow.session_count).toBe(2); + expect(pageViewRow.bounce_rate).toBe(0.5); + expect(pageViewRow.avg_duration).toBe(500); + }); + it('returns 200 for /properties', async () => { const { response } = await handler(get(`/properties?project=${PROJECT}`, authHeaders)); expect(response.status).toBe(200); @@ -294,7 +662,7 @@ describe('GET /projects', () => { const data = await response.json(); expect(Array.isArray(data.projects)).toBe(true); expect(data.projects.length).toBeGreaterThan(0); - expect(data.projects[0].id).toBe(PROJECT); + expect(data.projects.some(project => project.id === PROJECT)).toBe(true); }); it('rejects without API key', async () => { diff --git a/src/db/d1.js b/src/db/d1.js new file mode 100644 index 0000000..916c640 --- /dev/null +++ b/src/db/d1.js @@ -0,0 +1,103 @@ +import { D1Adapter as CoreD1Adapter } from '@agent-analytics/core'; +import { ulid } from '@agent-analytics/core/ulid'; +import { + buildEventInsertStatement, + buildIdentifyStatements, + buildSessionUpsertStatement, +} from './identity-aware.js'; +import { queryWithSessionMetrics } from './query.js'; + +const schemaCompatibility = new WeakMap(); + +export function ensureD1Compatibility(db) { + let pending = schemaCompatibility.get(db); + if (pending) return pending; + + pending = (async () => { + const eventColumns = await db.prepare('PRAGMA table_info(events)').all(); + const hasCountry = eventColumns.results.some(column => column.name === 'country'); + + if (eventColumns.results.length > 0 && !hasCountry) { + await db.prepare('ALTER TABLE events ADD COLUMN country TEXT').run(); + } + + await db.prepare(` + CREATE TABLE IF NOT EXISTS identity_map ( + previous_id TEXT NOT NULL, + canonical_id TEXT NOT NULL, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY (previous_id, project_id) + ) + `).run(); + + await db.prepare('CREATE INDEX IF NOT EXISTS idx_identity_canonical ON identity_map(canonical_id, project_id)').run(); + })().catch((error) => { + schemaCompatibility.delete(db); + throw error; + }); + + schemaCompatibility.set(db, pending); + return pending; +} + +export class D1Adapter extends CoreD1Adapter { + _sessionUpsertSqlAndParams(project, eventData) { + return buildSessionUpsertStatement({ + project, + session_id: eventData.session_id, + user_id: eventData.user_id, + timestamp: eventData.timestamp, + properties: eventData.properties, + count: eventData._count || 1, + }); + } + + async trackEvent(eventData) { + const eventStatement = buildEventInsertStatement({ + id: ulid(), + ...eventData, + }); + + if (!eventData.session_id) { + return this._run(eventStatement.sql, eventStatement.params); + } + + const sessionStatement = this._sessionUpsertSqlAndParams(eventData.project, eventData); + return this._batch([eventStatement, sessionStatement]); + } + + async trackBatch(events) { + const statements = []; + + for (const event of events) { + statements.push(buildEventInsertStatement({ + id: ulid(), + ...event, + })); + } + + for (const event of events) { + if (!event.session_id) continue; + statements.push(this._sessionUpsertSqlAndParams(event.project, event)); + } + + return this._batch(statements); + } + + async upsertSession(sessionData) { + const statement = this._sessionUpsertSqlAndParams( + sessionData.project_id || sessionData.project, + sessionData, + ); + return this._run(statement.sql, statement.params); + } + + async identifyUser(identityData) { + return this._batch(buildIdentifyStatements(identityData)); + } + + async query(args) { + return queryWithSessionMetrics(this, args); + } +} diff --git a/src/db/identity-aware.js b/src/db/identity-aware.js new file mode 100644 index 0000000..6167e66 --- /dev/null +++ b/src/db/identity-aware.js @@ -0,0 +1,112 @@ +function formatDate(timestamp) { + return new Date(timestamp).toISOString().split('T')[0]; +} + +function canonicalUserExpression(userId, project) { + if (!userId || !project) { + return { sql: 'NULL', params: [] }; + } + + return { + sql: `COALESCE( + (SELECT canonical_id FROM identity_map WHERE previous_id = ? AND project_id = ?), + ? + )`, + params: [userId, project, userId], + }; +} + +export function buildEventInsertStatement({ id, project, event, properties, user_id, session_id, timestamp, country }) { + const ts = timestamp || Date.now(); + const date = formatDate(ts); + const canonicalUser = canonicalUserExpression(user_id, project); + + return { + sql: `INSERT INTO events (id, project_id, event, properties, user_id, session_id, timestamp, date, country) + VALUES (?, ?, ?, ?, ${canonicalUser.sql}, ?, ?, ?, ?)`, + params: [ + id, + project, + event, + properties ? JSON.stringify(properties) : null, + ...canonicalUser.params, + session_id || null, + ts, + date, + country || null, + ], + }; +} + +export function buildSessionUpsertStatement({ project, session_id, user_id, timestamp, properties, count = 1 }) { + const ts = timestamp || Date.now(); + const date = formatDate(ts); + const page = (properties && typeof properties === 'object') + ? (properties.path || properties.url || null) + : null; + const canonicalUser = canonicalUserExpression(user_id, project); + + return { + sql: `INSERT INTO sessions (session_id, user_id, project_id, start_time, end_time, duration, entry_page, exit_page, event_count, is_bounce, date) + VALUES (?, ${canonicalUser.sql}, ?, ?, ?, 0, ?, ?, ?, 1, ?) + ON CONFLICT(session_id) DO UPDATE SET + user_id = COALESCE(excluded.user_id, sessions.user_id), + start_time = MIN(sessions.start_time, excluded.start_time), + end_time = MAX(sessions.end_time, excluded.end_time), + duration = MAX(sessions.end_time, excluded.end_time) - MIN(sessions.start_time, excluded.start_time), + entry_page = CASE WHEN excluded.start_time < sessions.start_time THEN excluded.entry_page ELSE sessions.entry_page END, + exit_page = CASE WHEN excluded.end_time >= sessions.end_time THEN excluded.exit_page ELSE sessions.exit_page END, + event_count = sessions.event_count + excluded.event_count, + is_bounce = CASE WHEN sessions.event_count + excluded.event_count > 1 THEN 0 ELSE 1 END`, + params: [ + session_id, + ...canonicalUser.params, + project, + ts, + ts, + page, + page, + count, + date, + ], + }; +} + +export function buildIdentifyStatements({ project, previous_id, canonical_id, created_at = Date.now() }) { + const canonicalUser = canonicalUserExpression(canonical_id, project); + + return [ + { + sql: `UPDATE identity_map + SET canonical_id = ${canonicalUser.sql} + WHERE canonical_id = ? AND project_id = ?`, + params: [...canonicalUser.params, previous_id, project], + }, + { + sql: `INSERT INTO identity_map (previous_id, canonical_id, project_id, created_at) + VALUES (?, ${canonicalUser.sql}, ?, ?) + ON CONFLICT(previous_id, project_id) DO UPDATE SET + canonical_id = ${canonicalUser.sql}, + created_at = excluded.created_at`, + params: [ + previous_id, + ...canonicalUser.params, + project, + created_at, + ...canonicalUser.params, + ], + }, + { + sql: `UPDATE events + SET user_id = ${canonicalUser.sql} + WHERE user_id = ? AND project_id = ?`, + params: [...canonicalUser.params, previous_id, project], + }, + { + sql: `UPDATE sessions + SET user_id = ${canonicalUser.sql} + WHERE user_id = ? AND project_id = ?`, + params: [...canonicalUser.params, previous_id, project], + }, + ]; +} diff --git a/src/db/query.js b/src/db/query.js new file mode 100644 index 0000000..423aebd --- /dev/null +++ b/src/db/query.js @@ -0,0 +1,214 @@ +import { validatePropertyKey } from '@agent-analytics/core/base-adapter'; +import { today, parseSince } from '../../node_modules/@agent-analytics/core/src/db/adapter.js'; +import { AnalyticsError, ERROR_CODES } from '../../node_modules/@agent-analytics/core/src/errors.js'; +import { + METRICS, + ALLOWED_METRICS, + GROUP_BY_FIELDS, + ALLOWED_GROUP_BY, + FILTER_OPS, + FILTERABLE_FIELDS, + ALLOWED_ORDER_BY, + DEFAULT_LIMIT, + MAX_LIMIT, +} from '../../node_modules/@agent-analytics/core/src/constants.js'; + +export async function queryWithSessionMetrics(adapter, { + project, + metrics = [METRICS.EVENT_COUNT], + filters, + date_from, + date_to, + group_by = [], + order_by, + order, + limit = DEFAULT_LIMIT, +}) { + for (const metric of metrics) { + if (!ALLOWED_METRICS.includes(metric)) { + throw new AnalyticsError( + ERROR_CODES.INVALID_METRIC, + `invalid metric: ${metric}. allowed: ${ALLOWED_METRICS.join(', ')}`, + 400, + ); + } + } + + for (const groupField of group_by) { + if (!ALLOWED_GROUP_BY.includes(groupField)) { + throw new AnalyticsError( + ERROR_CODES.INVALID_GROUP_BY, + `invalid group_by: ${groupField}. allowed: ${ALLOWED_GROUP_BY.join(', ')}`, + 400, + ); + } + } + + const selectParts = [...group_by]; + for (const metric of metrics) { + if (metric === METRICS.EVENT_COUNT) selectParts.push('COUNT(*) as event_count'); + if (metric === METRICS.UNIQUE_USERS) selectParts.push('COUNT(DISTINCT user_id) as unique_users'); + if (metric === METRICS.SESSION_COUNT) selectParts.push('COUNT(DISTINCT session_id) as session_count'); + if (metric === METRICS.BOUNCE_RATE) selectParts.push('COUNT(DISTINCT session_id) as _session_count_for_bounce'); + if (metric === METRICS.AVG_DURATION) selectParts.push('COUNT(DISTINCT session_id) as _session_count_for_duration'); + } + if (selectParts.length === 0) selectParts.push('COUNT(*) as event_count'); + + const fromDate = parseSince(date_from); + const toDate = date_to || today(); + const whereParts = ['project_id = ?', 'date >= ?', 'date <= ?']; + const params = [project, fromDate, toDate]; + + if (filters && Array.isArray(filters)) { + for (const filter of filters) { + if (!filter.field || !filter.op || filter.value === undefined) continue; + + const sqlOp = FILTER_OPS[filter.op]; + if (!sqlOp) { + throw new AnalyticsError( + ERROR_CODES.INVALID_FILTER_OP, + `invalid filter op: ${filter.op}. allowed: ${Object.keys(FILTER_OPS).join(', ')}`, + 400, + ); + } + + if (FILTERABLE_FIELDS.includes(filter.field)) { + if (filter.op === 'contains') { + whereParts.push(`${filter.field} LIKE '%' || ? || '%'`); + } else { + whereParts.push(`${filter.field} ${sqlOp} ?`); + } + params.push(filter.value); + continue; + } + + if (filter.field.startsWith('properties.')) { + const propKey = filter.field.replace('properties.', ''); + validatePropertyKey(propKey); + if (filter.op === 'contains') { + whereParts.push(`json_extract(properties, '$.${propKey}') LIKE '%' || ? || '%'`); + } else { + whereParts.push(`json_extract(properties, '$.${propKey}') ${sqlOp} ?`); + } + params.push(filter.value); + } + } + } + + const usesSessionMetrics = metrics.includes(METRICS.BOUNCE_RATE) || metrics.includes(METRICS.AVG_DURATION); + const usesEventMetrics = metrics.includes(METRICS.EVENT_COUNT) + || metrics.includes(METRICS.UNIQUE_USERS) + || metrics.includes(METRICS.SESSION_COUNT); + + let sql; + if (!usesSessionMetrics) { + sql = `SELECT ${selectParts.join(', ')} FROM events WHERE ${whereParts.join(' AND ')}`; + if (group_by.length > 0) sql += ` GROUP BY ${group_by.join(', ')}`; + } else { + const ctes = [ + `filtered_events AS ( + SELECT event, date, user_id, session_id, country + FROM events + WHERE ${whereParts.join(' AND ')} + )`, + ]; + const cteParams = [...params]; + + if (usesEventMetrics || group_by.length > 0) { + ctes.push(`event_agg AS ( + SELECT ${selectParts.join(', ')} + FROM filtered_events + ${group_by.length > 0 ? `GROUP BY ${group_by.join(', ')}` : ''} + )`); + } + + cteParams.push(project); + const sessionMetricBaseSelect = []; + if (group_by.length > 0) { + sessionMetricBaseSelect.push(...group_by.map(field => `fe.${field} as ${field}`)); + } + sessionMetricBaseSelect.push( + 'fe.session_id as session_id', + 'MAX(COALESCE(s.is_bounce, 0)) as _is_bounce', + 'MAX(COALESCE(s.duration, 0)) as _duration', + ); + ctes.push(`session_metrics AS ( + SELECT ${sessionMetricBaseSelect.join(', ')} + FROM filtered_events fe + LEFT JOIN sessions s + ON s.project_id = ? AND s.session_id = fe.session_id + WHERE fe.session_id IS NOT NULL + GROUP BY ${group_by.length > 0 ? `${group_by.map(field => `fe.${field}`).join(', ')}, ` : ''}fe.session_id + )`); + + const sessionMetricSelects = []; + if (group_by.length > 0) { + sessionMetricSelects.push(...group_by); + } + if (metrics.includes(METRICS.BOUNCE_RATE)) { + sessionMetricSelects.push('ROUND(AVG(CASE WHEN _is_bounce = 1 THEN 1.0 ELSE 0 END), 3) as bounce_rate'); + } + if (metrics.includes(METRICS.AVG_DURATION)) { + sessionMetricSelects.push('ROUND(AVG(_duration)) as avg_duration'); + } + ctes.push(`session_agg AS ( + SELECT ${sessionMetricSelects.join(', ')} + FROM session_metrics + ${group_by.length > 0 ? `GROUP BY ${group_by.join(', ')}` : ''} + )`); + + const finalSelectParts = []; + const finalParams = [...cteParams]; + + if (usesEventMetrics || group_by.length > 0) { + finalSelectParts.push(...group_by.map(field => `ea.${field} as ${field}`)); + if (metrics.includes(METRICS.EVENT_COUNT)) finalSelectParts.push('ea.event_count'); + if (metrics.includes(METRICS.UNIQUE_USERS)) finalSelectParts.push('ea.unique_users'); + if (metrics.includes(METRICS.SESSION_COUNT)) finalSelectParts.push('ea.session_count'); + if (metrics.includes(METRICS.BOUNCE_RATE)) finalSelectParts.push('COALESCE(sa.bounce_rate, 0) as bounce_rate'); + if (metrics.includes(METRICS.AVG_DURATION)) finalSelectParts.push('COALESCE(sa.avg_duration, 0) as avg_duration'); + + sql = `WITH ${ctes.join(', ')} + SELECT ${finalSelectParts.join(', ')} + FROM event_agg ea`; + if (group_by.length > 0) { + sql += ` LEFT JOIN session_agg sa ON ${group_by.map(field => `ea.${field} IS sa.${field}`).join(' AND ')}`; + } else { + sql += ' CROSS JOIN session_agg sa'; + } + } else { + if (metrics.includes(METRICS.BOUNCE_RATE)) { + finalSelectParts.push('COALESCE(bounce_rate, 0) as bounce_rate'); + } + if (metrics.includes(METRICS.AVG_DURATION)) { + finalSelectParts.push('COALESCE(avg_duration, 0) as avg_duration'); + } + sql = `WITH ${ctes.join(', ')} + SELECT ${finalSelectParts.join(', ')} + FROM session_agg`; + } + + params.length = 0; + params.push(...finalParams); + } + + const defaultOrder = group_by.includes(GROUP_BY_FIELDS.DATE) + ? GROUP_BY_FIELDS.DATE + : (metrics[0] || group_by[0] || METRICS.EVENT_COUNT); + const orderField = order_by && ALLOWED_ORDER_BY.includes(order_by) ? order_by : defaultOrder; + const orderDir = order === 'asc' ? 'ASC' : 'DESC'; + sql += ` ORDER BY ${orderField} ${orderDir}`; + + const maxLimit = Math.min(limit, MAX_LIMIT); + sql += ' LIMIT ?'; + params.push(maxLimit); + + const rows = await adapter._queryAll(sql, params); + return { + period: { from: fromDate, to: toDate }, + metrics, + group_by, + rows, + count: rows.length, + }; +} diff --git a/src/db/sqlite.js b/src/db/sqlite.js index 109a9d8..da92d7c 100644 --- a/src/db/sqlite.js +++ b/src/db/sqlite.js @@ -10,6 +10,13 @@ import { readFileSync } from 'node:fs'; import { resolve, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; import { BaseAdapter } from '@agent-analytics/core/base-adapter'; +import { ulid } from '@agent-analytics/core/ulid'; +import { + buildEventInsertStatement, + buildIdentifyStatements, + buildSessionUpsertStatement, +} from './identity-aware.js'; +import { queryWithSessionMetrics } from './query.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -25,6 +32,14 @@ export class SqliteAdapter extends BaseAdapter { const schemaPath = resolve(__dirname, '../../schema.sql'); const schema = readFileSync(schemaPath, 'utf-8'); this.db.exec(schema); + this._migrateLegacySchema(); + } + + _migrateLegacySchema() { + const eventColumns = this.db.prepare('PRAGMA table_info(events)').all(); + if (eventColumns.length > 0 && !eventColumns.some(column => column.name === 'country')) { + this.db.exec('ALTER TABLE events ADD COLUMN country TEXT'); + } } _run(sql, params) { @@ -47,4 +62,63 @@ export class SqliteAdapter extends BaseAdapter { }); txn(statements); } + + _sessionUpsertSqlAndParams(project, eventData) { + return buildSessionUpsertStatement({ + project, + session_id: eventData.session_id, + user_id: eventData.user_id, + timestamp: eventData.timestamp, + properties: eventData.properties, + count: eventData._count || 1, + }); + } + + async trackEvent(eventData) { + const eventStatement = buildEventInsertStatement({ + id: ulid(), + ...eventData, + }); + + if (!eventData.session_id) { + return this._run(eventStatement.sql, eventStatement.params); + } + + const sessionStatement = this._sessionUpsertSqlAndParams(eventData.project, eventData); + return this._batch([eventStatement, sessionStatement]); + } + + async trackBatch(events) { + const statements = []; + + for (const event of events) { + statements.push(buildEventInsertStatement({ + id: ulid(), + ...event, + })); + } + + for (const event of events) { + if (!event.session_id) continue; + statements.push(this._sessionUpsertSqlAndParams(event.project, event)); + } + + return this._batch(statements); + } + + async upsertSession(sessionData) { + const statement = this._sessionUpsertSqlAndParams( + sessionData.project_id || sessionData.project, + sessionData, + ); + return this._run(statement.sql, statement.params); + } + + async identifyUser(identityData) { + return this._batch(buildIdentifyStatements(identityData)); + } + + async query(args) { + return queryWithSessionMetrics(this, args); + } } diff --git a/src/platforms/cloudflare.js b/src/platforms/cloudflare.js index c3dc0f6..10b9df2 100644 --- a/src/platforms/cloudflare.js +++ b/src/platforms/cloudflare.js @@ -7,11 +7,13 @@ * For the multi-tenant hosted product, see hosted/entry.js. */ -import { createAnalyticsHandler, D1Adapter } from '@agent-analytics/core'; +import { createAnalyticsHandler } from '@agent-analytics/core'; import { makeValidateWrite, makeValidateRead } from '../auth.js'; +import { D1Adapter, ensureD1Compatibility } from '../db/d1.js'; export default { async fetch(request, env, ctx) { + await ensureD1Compatibility(env.DB); const db = new D1Adapter(env.DB); const validateWrite = makeValidateWrite(env.PROJECT_TOKENS); const validateRead = makeValidateRead(env.API_KEYS);