diff --git a/packages/stats/core/src/domain/geo.ts b/packages/stats/core/src/domain/geo.ts index 99c7bcc4a..b75e08a34 100644 --- a/packages/stats/core/src/domain/geo.ts +++ b/packages/stats/core/src/domain/geo.ts @@ -8,6 +8,8 @@ import { chunks, collapseRows, inserted, + isMissingUniqueUsersColumn, + omitUniqueUsers, rankRowsWithMarketShare, statPeriodKey, statRowScope, @@ -136,49 +138,59 @@ export class GeoStatRepo extends Context.Service Effect.tryPromise({ - try: () => - db - .insert(geoStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - continent: inserted("continent"), - sessions: inserted("sessions"), - requests: inserted("requests"), - unique_users: inserted("unique_users"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - market_share_tokens: inserted("market_share_tokens"), - market_share_requests: inserted("market_share_requests"), - market_share_sessions: inserted("market_share_sessions"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_sessions: inserted("rank_by_sessions"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), + try: async () => { + try { + return await upsertGeoChunk(chunk, true) + } catch (cause) { + if (!isMissingUniqueUsersColumn(cause)) throw cause + return upsertGeoChunk(chunk, false) + } + }, catch: (cause) => DatabaseError.make({ cause }), }), { discard: true }, ) }) + function upsertGeoChunk(chunk: GeoStatRow[], includeUniqueUsers: boolean) { + return db + .insert(geoStat) + .values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk)) + .onDuplicateKeyUpdate({ + set: { + continent: inserted("continent"), + sessions: inserted("sessions"), + requests: inserted("requests"), + ...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + market_share_tokens: inserted("market_share_tokens"), + market_share_requests: inserted("market_share_requests"), + market_share_sessions: inserted("market_share_sessions"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_sessions: inserted("rank_by_sessions"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } + const deleteRetiredDimensions = Effect.fn("GeoStatRepo.deleteRetiredDimensions")(function* (rows: GeoStatRow[]) { const scope = statRowScope(rows) if (!scope) return diff --git a/packages/stats/core/src/domain/model.ts b/packages/stats/core/src/domain/model.ts index 912ee6290..5cdee912b 100644 --- a/packages/stats/core/src/domain/model.ts +++ b/packages/stats/core/src/domain/model.ts @@ -8,6 +8,8 @@ import { chunks, collapseRows, inserted, + isMissingUniqueUsersColumn, + omitUniqueUsers, rankBy, statPeriodKey, statRowScope, @@ -56,35 +58,55 @@ export class ModelStatRepo extends Context.Service - db - .select({ - periodKey: modelStat.period_key, - updatedAt: modelStat.updated_at, - tier: modelStat.tier, - provider: modelStat.provider, - model: modelStat.model, - sessions: modelStat.sessions, - uniqueUsers: modelStat.unique_users, - inputTokens: modelStat.input_tokens, - outputTokens: modelStat.output_tokens, - reasoningTokens: modelStat.reasoning_tokens, - cacheReadTokens: modelStat.cache_read_tokens, - totalTokens: modelStat.total_tokens, - inputCostMicrocents: modelStat.input_cost_microcents, - outputCostMicrocents: modelStat.output_cost_microcents, - totalCostMicrocents: modelStat.total_cost_microcents, - }) - .from(modelStat) - .where( - and( - eq(modelStat.grain, "day"), - eq(modelStat.client, "all"), - eq(modelStat.source, "all"), - inArray(modelStat.tier, ["Go", "go"]), - ), - ) - .orderBy(asc(modelStat.period_key)), + try: async () => { + try { + return await db + .select({ + periodKey: modelStat.period_key, + updatedAt: modelStat.updated_at, + tier: modelStat.tier, + provider: modelStat.provider, + model: modelStat.model, + sessions: modelStat.sessions, + uniqueUsers: modelStat.unique_users, + inputTokens: modelStat.input_tokens, + outputTokens: modelStat.output_tokens, + reasoningTokens: modelStat.reasoning_tokens, + cacheReadTokens: modelStat.cache_read_tokens, + totalTokens: modelStat.total_tokens, + inputCostMicrocents: modelStat.input_cost_microcents, + outputCostMicrocents: modelStat.output_cost_microcents, + totalCostMicrocents: modelStat.total_cost_microcents, + }) + .from(modelStat) + .where(modelDailyScope()) + .orderBy(asc(modelStat.period_key)) + } catch (cause) { + if (!isMissingUniqueUsersColumn(cause)) throw cause + return ( + await db + .select({ + periodKey: modelStat.period_key, + updatedAt: modelStat.updated_at, + tier: modelStat.tier, + provider: modelStat.provider, + model: modelStat.model, + sessions: modelStat.sessions, + inputTokens: modelStat.input_tokens, + outputTokens: modelStat.output_tokens, + reasoningTokens: modelStat.reasoning_tokens, + cacheReadTokens: modelStat.cache_read_tokens, + totalTokens: modelStat.total_tokens, + inputCostMicrocents: modelStat.input_cost_microcents, + outputCostMicrocents: modelStat.output_cost_microcents, + totalCostMicrocents: modelStat.total_cost_microcents, + }) + .from(modelStat) + .where(modelDailyScope()) + .orderBy(asc(modelStat.period_key)) + ).map((row) => ({ ...row, uniqueUsers: 0 })) + } + }, catch: (cause) => DatabaseError.make({ cause }), }) }) @@ -94,45 +116,55 @@ export class ModelStatRepo extends Context.Service Effect.tryPromise({ - try: () => - db - .insert(modelStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - provider_model: inserted("provider_model"), - sessions: inserted("sessions"), - requests: inserted("requests"), - unique_users: inserted("unique_users"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), + try: async () => { + try { + return await upsertModelChunk(chunk, true) + } catch (cause) { + if (!isMissingUniqueUsersColumn(cause)) throw cause + return upsertModelChunk(chunk, false) + } + }, catch: (cause) => DatabaseError.make({ cause }), }), { discard: true }, ) }) + function upsertModelChunk(chunk: ModelStatRow[], includeUniqueUsers: boolean) { + return db + .insert(modelStat) + .values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk)) + .onDuplicateKeyUpdate({ + set: { + provider_model: inserted("provider_model"), + sessions: inserted("sessions"), + requests: inserted("requests"), + ...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } + const deleteRetiredDimensions = Effect.fn("ModelStatRepo.deleteRetiredDimensions")(function* ( rows: ModelStatRow[], ) { @@ -165,6 +197,15 @@ export class ModelStatRepo extends Context.Service Effect.tryPromise({ - try: () => - db - .insert(providerStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - sessions: inserted("sessions"), - requests: inserted("requests"), - unique_users: inserted("unique_users"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - market_share_tokens: inserted("market_share_tokens"), - market_share_requests: inserted("market_share_requests"), - market_share_sessions: inserted("market_share_sessions"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_sessions: inserted("rank_by_sessions"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), + try: async () => { + try { + return await upsertProviderChunk(chunk, true) + } catch (cause) { + if (!isMissingUniqueUsersColumn(cause)) throw cause + return upsertProviderChunk(chunk, false) + } + }, catch: (cause) => DatabaseError.make({ cause }), }), { discard: true }, ) }) + function upsertProviderChunk(chunk: ProviderStatRow[], includeUniqueUsers: boolean) { + return db + .insert(providerStat) + .values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk)) + .onDuplicateKeyUpdate({ + set: { + sessions: inserted("sessions"), + requests: inserted("requests"), + ...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + market_share_tokens: inserted("market_share_tokens"), + market_share_requests: inserted("market_share_requests"), + market_share_sessions: inserted("market_share_sessions"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_sessions: inserted("rank_by_sessions"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } + const deleteRetiredDimensions = Effect.fn("ProviderStatRepo.deleteRetiredDimensions")(function* ( rows: ProviderStatRow[], ) { diff --git a/packages/stats/core/src/domain/stat.ts b/packages/stats/core/src/domain/stat.ts index 356f7403a..553c91b9a 100644 --- a/packages/stats/core/src/domain/stat.ts +++ b/packages/stats/core/src/domain/stat.ts @@ -147,6 +147,18 @@ export function combineRows(left: T, right: T): T { } } +export function isMissingUniqueUsersColumn(cause: unknown): boolean { + return errorText(cause).includes("Unknown column 'unique_users'") +} + +export function omitUniqueUsers(rows: T[]) { + return rows.map((row) => { + const result = { ...row } + delete result.unique_users + return result + }) +} + export function statPeriodKey(row: StatBaseRow) { return [row.grain, row.period_key, row.dataset, row.tier, row.client, row.source].join("\u0000") } @@ -242,6 +254,15 @@ export function inserted(column: string) { return sql.raw(`values(\`${column}\`)`) } +function errorText(cause: unknown): string { + if (cause instanceof Error) return `${cause.message} ${errorText((cause as { cause?: unknown }).cause)}` + if (typeof cause === "object" && cause) + return Object.values(cause as Record) + .map(errorText) + .join(" ") + return String(cause) +} + export function weightedAverage( left: number | null | undefined, leftWeight = 0,