fix(stats): clean retired provider rows during sync (#30420)
This commit is contained in:
parent
b3da479a7c
commit
212ae3d25f
@ -1,14 +1,16 @@
|
||||
import { and, asc, eq } from "drizzle-orm"
|
||||
import { and, asc, eq, inArray, or } from "drizzle-orm"
|
||||
import { Effect, Layer } from "effect"
|
||||
import * as Context from "effect/Context"
|
||||
import { DatabaseError, DrizzleClient } from "../database"
|
||||
import { geoStat } from "../database/schema"
|
||||
import { RETIRED_STAT_MODELS, RETIRED_STAT_PROVIDERS } from "./model-normalization"
|
||||
import {
|
||||
chunks,
|
||||
collapseRows,
|
||||
inserted,
|
||||
rankRowsWithMarketShare,
|
||||
statPeriodKey,
|
||||
statRowScope,
|
||||
synthesizeAllTierRows,
|
||||
toStatBaseRow,
|
||||
UPSERT_CHUNK_SIZE,
|
||||
@ -47,6 +49,7 @@ export declare namespace GeoStatRepo {
|
||||
readonly model?: string
|
||||
}) => Effect.Effect<GeoStatRow[], DatabaseError>
|
||||
readonly upsert: (rows: GeoStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
readonly deleteRetiredDimensions: (rows: GeoStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
}
|
||||
}
|
||||
|
||||
@ -163,7 +166,29 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
|
||||
)
|
||||
})
|
||||
|
||||
return GeoStatRepo.of({ listDaily, listByPeriod, upsert })
|
||||
const deleteRetiredDimensions = Effect.fn("GeoStatRepo.deleteRetiredDimensions")(function* (rows: GeoStatRow[]) {
|
||||
const scope = statRowScope(rows)
|
||||
if (!scope) return
|
||||
|
||||
yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
db
|
||||
.delete(geoStat)
|
||||
.where(
|
||||
and(
|
||||
inArray(geoStat.grain, scope.grains),
|
||||
inArray(geoStat.period_key, scope.periodKeys),
|
||||
inArray(geoStat.dataset, scope.datasets),
|
||||
inArray(geoStat.client, scope.clients),
|
||||
inArray(geoStat.source, scope.sources),
|
||||
or(inArray(geoStat.provider, RETIRED_STAT_PROVIDERS), inArray(geoStat.model, RETIRED_STAT_MODELS)),
|
||||
),
|
||||
),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
|
||||
return GeoStatRepo.of({ listDaily, listByPeriod, upsert, deleteRetiredDimensions })
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { toModelAggregate, toProviderAggregate } from "./inference"
|
||||
import { toGeoAggregate, toModelAggregate, toProviderAggregate } from "./inference"
|
||||
import { modelAuthor, normalizeInferenceModel, statModel, statProvider } from "./model-normalization"
|
||||
|
||||
describe("inference stat normalization", () => {
|
||||
@ -31,9 +31,11 @@ describe("inference stat normalization", () => {
|
||||
test("uses provider.model to resolve opencode route providers", () => {
|
||||
expect(statModel("big-pickle", "claude-sonnet-4-5")).toBe("claude-sonnet-4-5")
|
||||
expect(statModel("big-pickle", "gpt-5-free")).toBe("gpt-5")
|
||||
expect(statModel("big-pickle", "")).toBe("unknown")
|
||||
expect(statProvider("big-pickle", "claude-sonnet-4-5", "opencode")).toBe("anthropic")
|
||||
expect(statProvider("big-pickle", "gpt-5", "opencode")).toBe("openai")
|
||||
expect(statProvider("big-pickle", "", "opencode")).toBe("unknown")
|
||||
expect(statProvider("unknown", "", "custom-provider")).toBe("custom-provider")
|
||||
})
|
||||
|
||||
test("model aggregates prefer provider.model and use normalized model", () => {
|
||||
@ -65,6 +67,12 @@ describe("inference stat normalization", () => {
|
||||
expect(toProviderAggregate(aggregate("big-pickle", "opencode"))).toMatchObject([{ provider: "unknown" }])
|
||||
})
|
||||
|
||||
test("geo aggregates never keep opencode or big-pickle dimensions", () => {
|
||||
expect(toGeoAggregate({ ...aggregate("big-pickle", "opencode"), country: "US" })).toMatchObject([
|
||||
{ provider: "unknown", model: "unknown", country: "US" },
|
||||
])
|
||||
})
|
||||
|
||||
test("model aggregates use ISO week period keys", () => {
|
||||
expect(
|
||||
toModelAggregate({
|
||||
|
||||
@ -2,7 +2,13 @@ import { Resource } from "sst/resource"
|
||||
import type { AthenaData } from "../athena"
|
||||
import type { GeoStatAggregate } from "./geo"
|
||||
import type { ModelStatAggregate } from "./model"
|
||||
import { EXCLUDED_MODELS, MODEL_AUTHOR_RULES, statModel, statProvider } from "./model-normalization"
|
||||
import {
|
||||
EXCLUDED_MODELS,
|
||||
MODEL_AUTHOR_RULES,
|
||||
RETIRED_STAT_PROVIDERS,
|
||||
statModel,
|
||||
statProvider,
|
||||
} from "./model-normalization"
|
||||
import type { ProviderStatAggregate } from "./provider"
|
||||
import { normalizeCountry, normalizeTier, type StatBaseAggregate } from "./stat"
|
||||
|
||||
@ -60,6 +66,7 @@ WITH normalized AS (
|
||||
model AS raw_model,
|
||||
${statModelSql("model", "provider_model")} AS model,
|
||||
COALESCE(NULLIF(provider_model, ''), '') AS provider_model,
|
||||
COALESCE(NULLIF(provider, ''), '') AS raw_provider,
|
||||
UPPER(COALESCE(NULLIF(cf_country, ''), 'ZZ')) AS country,
|
||||
COALESCE(NULLIF(cf_continent, ''), '') AS continent,
|
||||
session,
|
||||
@ -95,7 +102,7 @@ WITH normalized AS (
|
||||
WHEN raw_model IN ('gpt-5-nano', 'grok-code', 'big-pickle') OR regexp_like(raw_model, '-free(:global)?$') THEN 'Free'
|
||||
ELSE 'Paid'
|
||||
END AS tier,
|
||||
${statProviderSql("model", "provider_model")} AS provider,
|
||||
${statProviderSql("model", "provider_model", "raw_provider")} AS provider,
|
||||
provider_model,
|
||||
model,
|
||||
country,
|
||||
@ -241,15 +248,16 @@ function sqlString(value: string) {
|
||||
|
||||
function statModelSql(model: string, providerModel: string) {
|
||||
return `COALESCE(NULLIF(regexp_replace(CASE
|
||||
WHEN ${model} = 'big-pickle' THEN COALESCE(NULLIF(${providerModel}, ''), ${model})
|
||||
WHEN lower(${model}) = 'big-pickle' THEN NULLIF(${providerModel}, '')
|
||||
ELSE ${model}
|
||||
END, '(-free|:global)+$', ''), ''), 'unknown')`
|
||||
}
|
||||
|
||||
function statProviderSql(model: string, providerModel: string) {
|
||||
function statProviderSql(model: string, providerModel: string, provider: string) {
|
||||
return `CASE
|
||||
${MODEL_AUTHOR_RULES.map((item) => ` WHEN strpos(lower(${providerModel}), ${sqlString(item.match)}) > 0 THEN ${sqlString(item.author)}`).join("\n")}
|
||||
${MODEL_AUTHOR_RULES.map((item) => ` WHEN strpos(lower(${model}), ${sqlString(item.match)}) > 0 THEN ${sqlString(item.author)}`).join("\n")}
|
||||
WHEN ${provider} <> '' AND lower(${provider}) NOT IN (${RETIRED_STAT_PROVIDERS.map(sqlString).join(", ")}) THEN ${provider}
|
||||
ELSE 'unknown'
|
||||
END`
|
||||
}
|
||||
|
||||
@ -13,6 +13,8 @@ export const MODEL_AUTHOR_RULES = [
|
||||
{ match: "qwen", author: "qwen" },
|
||||
] as const
|
||||
export const EXCLUDED_MODELS = new Set(["alpha-gpt-next"])
|
||||
export const RETIRED_STAT_MODELS = ["big-pickle"]
|
||||
export const RETIRED_STAT_PROVIDERS = ["opencode"]
|
||||
|
||||
export function normalizeInferenceModel(value: string | undefined) {
|
||||
return (value || "unknown").replace(/(-free|:global)+$/, "") || "unknown"
|
||||
@ -27,7 +29,7 @@ export function modelAuthor(value: string | undefined) {
|
||||
|
||||
export function statModel(model: string | undefined, providerModel: string | undefined) {
|
||||
const normalized = normalizeInferenceModel(model)
|
||||
if (normalized.toLowerCase() === "big-pickle") return normalizeInferenceModel(providerModel)
|
||||
if (RETIRED_STAT_MODELS.includes(normalized.toLowerCase())) return normalizeInferenceModel(providerModel)
|
||||
return normalized
|
||||
}
|
||||
|
||||
@ -42,6 +44,6 @@ export function statProvider(
|
||||
const providerModelAuthor = modelAuthor(providerModel)
|
||||
if (providerModelAuthor && providerModelAuthor !== "unknown") return providerModelAuthor
|
||||
if (modelAuthorValue !== "unknown") return modelAuthorValue
|
||||
if (provider && provider.toLowerCase() !== "opencode") return provider
|
||||
if (provider && !RETIRED_STAT_PROVIDERS.includes(provider.toLowerCase())) return provider
|
||||
return modelAuthorValue
|
||||
}
|
||||
|
||||
@ -1,14 +1,16 @@
|
||||
import { and, asc, eq } from "drizzle-orm"
|
||||
import { and, asc, eq, inArray, or } from "drizzle-orm"
|
||||
import { Effect, Layer } from "effect"
|
||||
import * as Context from "effect/Context"
|
||||
import { DatabaseError, DrizzleClient } from "../database"
|
||||
import { modelStat } from "../database/schema"
|
||||
import { RETIRED_STAT_MODELS, RETIRED_STAT_PROVIDERS } from "./model-normalization"
|
||||
import {
|
||||
chunks,
|
||||
collapseRows,
|
||||
inserted,
|
||||
rankBy,
|
||||
statPeriodKey,
|
||||
statRowScope,
|
||||
synthesizeAllTierRows,
|
||||
toStatBaseRow,
|
||||
UPSERT_CHUNK_SIZE,
|
||||
@ -39,6 +41,7 @@ export declare namespace ModelStatRepo {
|
||||
export interface Service {
|
||||
readonly listDaily: () => Effect.Effect<ModelStatMetric[], DatabaseError>
|
||||
readonly upsert: (rows: ModelStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
readonly deleteRetiredDimensions: (rows: ModelStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,7 +123,34 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
|
||||
)
|
||||
})
|
||||
|
||||
return ModelStatRepo.of({ listDaily, upsert })
|
||||
const deleteRetiredDimensions = Effect.fn("ModelStatRepo.deleteRetiredDimensions")(function* (
|
||||
rows: ModelStatRow[],
|
||||
) {
|
||||
const scope = statRowScope(rows)
|
||||
if (!scope) return
|
||||
|
||||
yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
db
|
||||
.delete(modelStat)
|
||||
.where(
|
||||
and(
|
||||
inArray(modelStat.grain, scope.grains),
|
||||
inArray(modelStat.period_key, scope.periodKeys),
|
||||
inArray(modelStat.dataset, scope.datasets),
|
||||
inArray(modelStat.client, scope.clients),
|
||||
inArray(modelStat.source, scope.sources),
|
||||
or(
|
||||
inArray(modelStat.provider, RETIRED_STAT_PROVIDERS),
|
||||
inArray(modelStat.model, RETIRED_STAT_MODELS),
|
||||
),
|
||||
),
|
||||
),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
|
||||
return ModelStatRepo.of({ listDaily, upsert, deleteRetiredDimensions })
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
import { and, asc, eq } from "drizzle-orm"
|
||||
import { and, asc, eq, inArray } from "drizzle-orm"
|
||||
import { Effect, Layer } from "effect"
|
||||
import * as Context from "effect/Context"
|
||||
import { DatabaseError, DrizzleClient } from "../database"
|
||||
import { providerStat } from "../database/schema"
|
||||
import { RETIRED_STAT_PROVIDERS } from "./model-normalization"
|
||||
import {
|
||||
chunks,
|
||||
collapseRows,
|
||||
inserted,
|
||||
rankRowsWithMarketShare,
|
||||
statRowScope,
|
||||
synthesizeAllTierRows,
|
||||
toStatBaseRow,
|
||||
UPSERT_CHUNK_SIZE,
|
||||
@ -36,6 +38,7 @@ export declare namespace ProviderStatRepo {
|
||||
readonly source?: string
|
||||
}) => Effect.Effect<ProviderStatRow[], DatabaseError>
|
||||
readonly upsert: (rows: ProviderStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
readonly deleteRetiredDimensions: (rows: ProviderStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,7 +141,31 @@ export class ProviderStatRepo extends Context.Service<ProviderStatRepo, Provider
|
||||
)
|
||||
})
|
||||
|
||||
return ProviderStatRepo.of({ listDaily, listByPeriod, upsert })
|
||||
const deleteRetiredDimensions = Effect.fn("ProviderStatRepo.deleteRetiredDimensions")(function* (
|
||||
rows: ProviderStatRow[],
|
||||
) {
|
||||
const scope = statRowScope(rows)
|
||||
if (!scope) return
|
||||
|
||||
yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
db
|
||||
.delete(providerStat)
|
||||
.where(
|
||||
and(
|
||||
inArray(providerStat.grain, scope.grains),
|
||||
inArray(providerStat.period_key, scope.periodKeys),
|
||||
inArray(providerStat.dataset, scope.datasets),
|
||||
inArray(providerStat.client, scope.clients),
|
||||
inArray(providerStat.source, scope.sources),
|
||||
inArray(providerStat.provider, RETIRED_STAT_PROVIDERS),
|
||||
),
|
||||
),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
|
||||
return ProviderStatRepo.of({ listDaily, listByPeriod, upsert, deleteRetiredDimensions })
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@ -147,6 +147,17 @@ export function statPeriodKey(row: StatBaseRow) {
|
||||
return [row.grain, row.period_key, row.dataset, row.tier, row.client, row.source].join("\u0000")
|
||||
}
|
||||
|
||||
export function statRowScope(rows: StatBaseRow[]) {
|
||||
if (rows.length === 0) return
|
||||
return {
|
||||
grains: unique(rows.map((row) => row.grain)),
|
||||
periodKeys: unique(rows.map((row) => row.period_key)),
|
||||
datasets: unique(rows.map((row) => row.dataset ?? "all")),
|
||||
clients: unique(rows.map((row) => row.client ?? "all")),
|
||||
sources: unique(rows.map((row) => row.source ?? "all")),
|
||||
}
|
||||
}
|
||||
|
||||
export function periodKeyFor(grain: StatGrain, periodStart: Date) {
|
||||
if (grain === "week") return isoWeekId(periodStart)
|
||||
return utcDateId(periodStart)
|
||||
@ -219,6 +230,10 @@ export function chunks<T>(items: T[], size: number) {
|
||||
)
|
||||
}
|
||||
|
||||
function unique(values: string[]) {
|
||||
return [...new Set(values)]
|
||||
}
|
||||
|
||||
export function inserted(column: string) {
|
||||
return sql.raw(`values(\`${column}\`)`)
|
||||
}
|
||||
|
||||
@ -56,6 +56,14 @@ export const syncStats: () => Effect.Effect<
|
||||
concurrency: "unbounded",
|
||||
discard: true,
|
||||
})
|
||||
yield* Effect.all(
|
||||
[
|
||||
modelStats.deleteRetiredDimensions(modelRows),
|
||||
providerStats.deleteRetiredDimensions(providerRows),
|
||||
geoStats.deleteRetiredDimensions(geoRows),
|
||||
],
|
||||
{ concurrency: "unbounded", discard: true },
|
||||
)
|
||||
|
||||
yield* Effect.logInfo(
|
||||
`stats sync complete ${JSON.stringify({
|
||||
|
||||
Loading…
Reference in New Issue
Block a user