fix(stats): tolerate pending user column

This commit is contained in:
Adam 2026-06-20 15:56:36 -05:00
parent 1c76587ce2
commit 503309d244
No known key found for this signature in database
GPG Key ID: 9CB48779AF150E75
4 changed files with 221 additions and 135 deletions

View File

@ -8,6 +8,8 @@ import {
chunks,
collapseRows,
inserted,
isMissingUniqueUsersColumn,
omitUniqueUsers,
rankRowsWithMarketShare,
statPeriodKey,
statRowScope,
@ -136,49 +138,59 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
chunks(rows, UPSERT_CHUNK_SIZE),
(chunk) =>
Effect.tryPromise({
try: () =>
db
.insert(geoStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
continent: inserted("continent"),
sessions: inserted("sessions"),
requests: inserted("requests"),
unique_users: inserted("unique_users"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
try: async () => {
try {
return await upsertGeoChunk(chunk, true)
} catch (cause) {
if (!isMissingUniqueUsersColumn(cause)) throw cause
return upsertGeoChunk(chunk, false)
}
},
catch: (cause) => DatabaseError.make({ cause }),
}),
{ discard: true },
)
})
function upsertGeoChunk(chunk: GeoStatRow[], includeUniqueUsers: boolean) {
return db
.insert(geoStat)
.values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk))
.onDuplicateKeyUpdate({
set: {
continent: inserted("continent"),
sessions: inserted("sessions"),
requests: inserted("requests"),
...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
const deleteRetiredDimensions = Effect.fn("GeoStatRepo.deleteRetiredDimensions")(function* (rows: GeoStatRow[]) {
const scope = statRowScope(rows)
if (!scope) return

View File

@ -8,6 +8,8 @@ import {
chunks,
collapseRows,
inserted,
isMissingUniqueUsersColumn,
omitUniqueUsers,
rankBy,
statPeriodKey,
statRowScope,
@ -56,35 +58,55 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
const listDaily = Effect.fn("ModelStatRepo.listDaily")(function* () {
return yield* Effect.tryPromise({
try: () =>
db
.select({
periodKey: modelStat.period_key,
updatedAt: modelStat.updated_at,
tier: modelStat.tier,
provider: modelStat.provider,
model: modelStat.model,
sessions: modelStat.sessions,
uniqueUsers: modelStat.unique_users,
inputTokens: modelStat.input_tokens,
outputTokens: modelStat.output_tokens,
reasoningTokens: modelStat.reasoning_tokens,
cacheReadTokens: modelStat.cache_read_tokens,
totalTokens: modelStat.total_tokens,
inputCostMicrocents: modelStat.input_cost_microcents,
outputCostMicrocents: modelStat.output_cost_microcents,
totalCostMicrocents: modelStat.total_cost_microcents,
})
.from(modelStat)
.where(
and(
eq(modelStat.grain, "day"),
eq(modelStat.client, "all"),
eq(modelStat.source, "all"),
inArray(modelStat.tier, ["Go", "go"]),
),
)
.orderBy(asc(modelStat.period_key)),
try: async () => {
try {
return await db
.select({
periodKey: modelStat.period_key,
updatedAt: modelStat.updated_at,
tier: modelStat.tier,
provider: modelStat.provider,
model: modelStat.model,
sessions: modelStat.sessions,
uniqueUsers: modelStat.unique_users,
inputTokens: modelStat.input_tokens,
outputTokens: modelStat.output_tokens,
reasoningTokens: modelStat.reasoning_tokens,
cacheReadTokens: modelStat.cache_read_tokens,
totalTokens: modelStat.total_tokens,
inputCostMicrocents: modelStat.input_cost_microcents,
outputCostMicrocents: modelStat.output_cost_microcents,
totalCostMicrocents: modelStat.total_cost_microcents,
})
.from(modelStat)
.where(modelDailyScope())
.orderBy(asc(modelStat.period_key))
} catch (cause) {
if (!isMissingUniqueUsersColumn(cause)) throw cause
return (
await db
.select({
periodKey: modelStat.period_key,
updatedAt: modelStat.updated_at,
tier: modelStat.tier,
provider: modelStat.provider,
model: modelStat.model,
sessions: modelStat.sessions,
inputTokens: modelStat.input_tokens,
outputTokens: modelStat.output_tokens,
reasoningTokens: modelStat.reasoning_tokens,
cacheReadTokens: modelStat.cache_read_tokens,
totalTokens: modelStat.total_tokens,
inputCostMicrocents: modelStat.input_cost_microcents,
outputCostMicrocents: modelStat.output_cost_microcents,
totalCostMicrocents: modelStat.total_cost_microcents,
})
.from(modelStat)
.where(modelDailyScope())
.orderBy(asc(modelStat.period_key))
).map((row) => ({ ...row, uniqueUsers: 0 }))
}
},
catch: (cause) => DatabaseError.make({ cause }),
})
})
@ -94,45 +116,55 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
chunks(rows, UPSERT_CHUNK_SIZE),
(chunk) =>
Effect.tryPromise({
try: () =>
db
.insert(modelStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
provider_model: inserted("provider_model"),
sessions: inserted("sessions"),
requests: inserted("requests"),
unique_users: inserted("unique_users"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
try: async () => {
try {
return await upsertModelChunk(chunk, true)
} catch (cause) {
if (!isMissingUniqueUsersColumn(cause)) throw cause
return upsertModelChunk(chunk, false)
}
},
catch: (cause) => DatabaseError.make({ cause }),
}),
{ discard: true },
)
})
function upsertModelChunk(chunk: ModelStatRow[], includeUniqueUsers: boolean) {
return db
.insert(modelStat)
.values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk))
.onDuplicateKeyUpdate({
set: {
provider_model: inserted("provider_model"),
sessions: inserted("sessions"),
requests: inserted("requests"),
...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
const deleteRetiredDimensions = Effect.fn("ModelStatRepo.deleteRetiredDimensions")(function* (
rows: ModelStatRow[],
) {
@ -165,6 +197,15 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
)
}
function modelDailyScope() {
return and(
eq(modelStat.grain, "day"),
eq(modelStat.client, "all"),
eq(modelStat.source, "all"),
inArray(modelStat.tier, ["Go", "go"]),
)
}
export function rowsFromAggregates(aggregates: ModelStatAggregate[]) {
return rankRows([
...synthesizeAllTierRows(

View File

@ -8,6 +8,8 @@ import {
chunks,
collapseRows,
inserted,
isMissingUniqueUsersColumn,
omitUniqueUsers,
rankRowsWithMarketShare,
statRowScope,
synthesizeAllTierRows,
@ -107,48 +109,58 @@ export class ProviderStatRepo extends Context.Service<ProviderStatRepo, Provider
chunks(rows, UPSERT_CHUNK_SIZE),
(chunk) =>
Effect.tryPromise({
try: () =>
db
.insert(providerStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
sessions: inserted("sessions"),
requests: inserted("requests"),
unique_users: inserted("unique_users"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
try: async () => {
try {
return await upsertProviderChunk(chunk, true)
} catch (cause) {
if (!isMissingUniqueUsersColumn(cause)) throw cause
return upsertProviderChunk(chunk, false)
}
},
catch: (cause) => DatabaseError.make({ cause }),
}),
{ discard: true },
)
})
function upsertProviderChunk(chunk: ProviderStatRow[], includeUniqueUsers: boolean) {
return db
.insert(providerStat)
.values(includeUniqueUsers ? chunk : omitUniqueUsers(chunk))
.onDuplicateKeyUpdate({
set: {
sessions: inserted("sessions"),
requests: inserted("requests"),
...(includeUniqueUsers ? { unique_users: inserted("unique_users") } : {}),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
const deleteRetiredDimensions = Effect.fn("ProviderStatRepo.deleteRetiredDimensions")(function* (
rows: ProviderStatRow[],
) {

View File

@ -147,6 +147,18 @@ export function combineRows<T extends StatBaseRow>(left: T, right: T): T {
}
}
export function isMissingUniqueUsersColumn(cause: unknown): boolean {
return errorText(cause).includes("Unknown column 'unique_users'")
}
export function omitUniqueUsers<T extends { unique_users?: number }>(rows: T[]) {
return rows.map((row) => {
const result = { ...row }
delete result.unique_users
return result
})
}
export function statPeriodKey(row: StatBaseRow) {
return [row.grain, row.period_key, row.dataset, row.tier, row.client, row.source].join("\u0000")
}
@ -242,6 +254,15 @@ export function inserted(column: string) {
return sql.raw(`values(\`${column}\`)`)
}
function errorText(cause: unknown): string {
if (cause instanceof Error) return `${cause.message} ${errorText((cause as { cause?: unknown }).cause)}`
if (typeof cause === "object" && cause)
return Object.values(cause as Record<string, unknown>)
.map(errorText)
.join(" ")
return String(cause)
}
export function weightedAverage(
left: number | null | undefined,
leftWeight = 0,