fix(stats): better stat table periods, provider/model norm
This commit is contained in:
parent
cf2cd13fb8
commit
9ac0f3e9ac
@ -0,0 +1,5 @@
|
||||
ALTER TABLE `geo_stat` ADD `provider` varchar(128) DEFAULT 'all' NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE `geo_stat` ADD `model` varchar(256) DEFAULT 'all' NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE `geo_stat` DROP INDEX `uniq_country_period`;--> statement-breakpoint
|
||||
CREATE UNIQUE INDEX `uniq_country_period` ON `geo_stat` (`grain`,`period_start`,`dataset`,`tier`,`client`,`source`,`provider`,`model`,`country`);--> statement-breakpoint
|
||||
CREATE INDEX `idx_country_model` ON `geo_stat` (`model`,`country`,`grain`,`period_start`);
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,3 @@
|
||||
ALTER TABLE `geo_stat` ADD `period_key` varchar(32) NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE `model_stat` ADD `period_key` varchar(32) NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE `provider_stat` ADD `period_key` varchar(32) NOT NULL;
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,6 @@
|
||||
ALTER TABLE `geo_stat` DROP COLUMN `period_start`;--> statement-breakpoint
|
||||
ALTER TABLE `geo_stat` DROP COLUMN `period_end`;--> statement-breakpoint
|
||||
ALTER TABLE `model_stat` DROP COLUMN `period_start`;--> statement-breakpoint
|
||||
ALTER TABLE `model_stat` DROP COLUMN `period_end`;--> statement-breakpoint
|
||||
ALTER TABLE `provider_stat` DROP COLUMN `period_start`;--> statement-breakpoint
|
||||
ALTER TABLE `provider_stat` DROP COLUMN `period_end`;
|
||||
File diff suppressed because it is too large
Load Diff
@ -16,7 +16,7 @@ export const modelStat = mysqlTable(
|
||||
(table) => [
|
||||
uniqueIndex("uniq_model_period").on(
|
||||
table.grain,
|
||||
table.period_start,
|
||||
table.period_key,
|
||||
table.dataset,
|
||||
table.tier,
|
||||
table.client,
|
||||
@ -24,8 +24,8 @@ export const modelStat = mysqlTable(
|
||||
table.provider,
|
||||
table.model,
|
||||
),
|
||||
index("idx_leaderboard_tokens").on(table.grain, table.period_start, table.dataset, table.tier, table.total_tokens),
|
||||
index("idx_model").on(table.model, table.grain, table.period_start),
|
||||
index("idx_leaderboard_tokens").on(table.grain, table.period_key, table.dataset, table.tier, table.total_tokens),
|
||||
index("idx_model").on(table.model, table.grain, table.period_key),
|
||||
],
|
||||
)
|
||||
|
||||
@ -45,7 +45,7 @@ export const providerStat = mysqlTable(
|
||||
(table) => [
|
||||
uniqueIndex("uniq_provider_period").on(
|
||||
table.grain,
|
||||
table.period_start,
|
||||
table.period_key,
|
||||
table.dataset,
|
||||
table.tier,
|
||||
table.client,
|
||||
@ -54,20 +54,20 @@ export const providerStat = mysqlTable(
|
||||
),
|
||||
index("idx_provider_leaderboard_tokens").on(
|
||||
table.grain,
|
||||
table.period_start,
|
||||
table.period_key,
|
||||
table.dataset,
|
||||
table.tier,
|
||||
table.total_tokens,
|
||||
),
|
||||
index("idx_provider_market_share").on(
|
||||
table.grain,
|
||||
table.period_start,
|
||||
table.period_key,
|
||||
table.dataset,
|
||||
table.tier,
|
||||
table.market_share_tokens,
|
||||
),
|
||||
index("idx_provider_rank").on(table.grain, table.period_start, table.dataset, table.tier, table.rank_by_tokens),
|
||||
index("idx_provider").on(table.provider, table.grain, table.period_start),
|
||||
index("idx_provider_rank").on(table.grain, table.period_key, table.dataset, table.tier, table.rank_by_tokens),
|
||||
index("idx_provider").on(table.provider, table.grain, table.period_key),
|
||||
],
|
||||
)
|
||||
|
||||
@ -75,6 +75,8 @@ export const geoStat = mysqlTable(
|
||||
"geo_stat",
|
||||
{
|
||||
...periodColumns(),
|
||||
provider: varchar({ length: 128 }).notNull().default("all"),
|
||||
model: varchar({ length: 256 }).notNull().default("all"),
|
||||
country: char({ length: 2 }).notNull(),
|
||||
continent: varchar({ length: 8 }).notNull().default(""),
|
||||
...metricColumns(),
|
||||
@ -88,17 +90,20 @@ export const geoStat = mysqlTable(
|
||||
(table) => [
|
||||
uniqueIndex("uniq_country_period").on(
|
||||
table.grain,
|
||||
table.period_start,
|
||||
table.period_key,
|
||||
table.dataset,
|
||||
table.tier,
|
||||
table.client,
|
||||
table.source,
|
||||
table.provider,
|
||||
table.model,
|
||||
table.country,
|
||||
),
|
||||
index("idx_country_map_tokens").on(table.grain, table.period_start, table.dataset, table.tier, table.total_tokens),
|
||||
index("idx_country_rank").on(table.grain, table.period_start, table.dataset, table.tier, table.rank_by_tokens),
|
||||
index("idx_country").on(table.country, table.grain, table.period_start),
|
||||
index("idx_continent").on(table.continent, table.grain, table.period_start),
|
||||
index("idx_country_map_tokens").on(table.grain, table.period_key, table.dataset, table.tier, table.total_tokens),
|
||||
index("idx_country_rank").on(table.grain, table.period_key, table.dataset, table.tier, table.rank_by_tokens),
|
||||
index("idx_country").on(table.country, table.grain, table.period_key),
|
||||
index("idx_continent").on(table.continent, table.grain, table.period_key),
|
||||
index("idx_country_model").on(table.model, table.country, table.grain, table.period_key),
|
||||
],
|
||||
)
|
||||
|
||||
@ -106,8 +111,7 @@ function periodColumns() {
|
||||
return {
|
||||
id: bigint({ mode: "number" }).autoincrement().primaryKey(),
|
||||
grain: varchar({ length: 16 }).notNull(),
|
||||
period_start: datetime({ mode: "date" }).notNull(),
|
||||
period_end: datetime({ mode: "date" }).notNull(),
|
||||
period_key: varchar({ length: 32 }).notNull(),
|
||||
dataset: varchar({ length: 64 }).notNull().default("all"),
|
||||
tier: varchar({ length: 64 }).notNull().default("all"),
|
||||
client: varchar({ length: 64 }).notNull().default("all"),
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
collapseRows,
|
||||
inserted,
|
||||
rankRowsWithMarketShare,
|
||||
statPeriodKey,
|
||||
synthesizeAllTierRows,
|
||||
toStatBaseRow,
|
||||
UPSERT_CHUNK_SIZE,
|
||||
@ -15,11 +16,13 @@ import {
|
||||
} from "./stat"
|
||||
|
||||
export type GeoStatRow = typeof geoStat.$inferInsert
|
||||
export type GeoStatAggregate = StatBaseAggregate & { country: string; continent: string }
|
||||
export type GeoStatAggregate = StatBaseAggregate & { provider: string; model: string; country: string; continent: string }
|
||||
export type GeoStatMetric = {
|
||||
periodStart: Date
|
||||
periodEnd: Date
|
||||
periodKey: string
|
||||
updatedAt: Date
|
||||
tier: string
|
||||
provider: string
|
||||
model: string
|
||||
country: string
|
||||
continent: string
|
||||
totalTokens: number
|
||||
@ -30,11 +33,13 @@ export declare namespace GeoStatRepo {
|
||||
readonly listDaily: () => Effect.Effect<GeoStatMetric[], DatabaseError>
|
||||
readonly listByPeriod: (opts: {
|
||||
readonly grain: string
|
||||
readonly periodStart: Date
|
||||
readonly periodKey: string
|
||||
readonly dataset?: string
|
||||
readonly tier?: string
|
||||
readonly client?: string
|
||||
readonly source?: string
|
||||
readonly provider?: string
|
||||
readonly model?: string
|
||||
}) => Effect.Effect<GeoStatRow[], DatabaseError>
|
||||
readonly upsert: (rows: GeoStatRow[]) => Effect.Effect<void, DatabaseError>
|
||||
}
|
||||
@ -51,27 +56,39 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
|
||||
try: () =>
|
||||
db
|
||||
.select({
|
||||
periodStart: geoStat.period_start,
|
||||
periodEnd: geoStat.period_end,
|
||||
periodKey: geoStat.period_key,
|
||||
updatedAt: geoStat.updated_at,
|
||||
tier: geoStat.tier,
|
||||
provider: geoStat.provider,
|
||||
model: geoStat.model,
|
||||
country: geoStat.country,
|
||||
continent: geoStat.continent,
|
||||
totalTokens: geoStat.total_tokens,
|
||||
})
|
||||
.from(geoStat)
|
||||
.where(and(eq(geoStat.grain, "day"), eq(geoStat.client, "all"), eq(geoStat.source, "all")))
|
||||
.orderBy(asc(geoStat.period_start)),
|
||||
.where(
|
||||
and(
|
||||
eq(geoStat.grain, "day"),
|
||||
eq(geoStat.client, "all"),
|
||||
eq(geoStat.source, "all"),
|
||||
eq(geoStat.provider, "all"),
|
||||
eq(geoStat.model, "all"),
|
||||
),
|
||||
)
|
||||
.orderBy(asc(geoStat.period_key)),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
|
||||
const listByPeriod = Effect.fn("GeoStatRepo.listByPeriod")(function* (opts: {
|
||||
readonly grain: string
|
||||
readonly periodStart: Date
|
||||
readonly periodKey: string
|
||||
readonly dataset?: string
|
||||
readonly tier?: string
|
||||
readonly client?: string
|
||||
readonly source?: string
|
||||
readonly provider?: string
|
||||
readonly model?: string
|
||||
}) {
|
||||
return yield* Effect.tryPromise({
|
||||
try: () =>
|
||||
@ -81,11 +98,13 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
|
||||
.where(
|
||||
and(
|
||||
eq(geoStat.grain, opts.grain),
|
||||
eq(geoStat.period_start, opts.periodStart),
|
||||
eq(geoStat.period_key, opts.periodKey),
|
||||
eq(geoStat.dataset, opts.dataset ?? "zen"),
|
||||
eq(geoStat.tier, opts.tier ?? "all"),
|
||||
eq(geoStat.client, opts.client ?? "all"),
|
||||
eq(geoStat.source, opts.source ?? "all"),
|
||||
eq(geoStat.provider, opts.provider ?? "all"),
|
||||
eq(geoStat.model, opts.model ?? "all"),
|
||||
),
|
||||
),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
@ -103,7 +122,6 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
continent: inserted("continent"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
@ -146,26 +164,35 @@ export class GeoStatRepo extends Context.Service<GeoStatRepo, GeoStatRepo.Servic
|
||||
}
|
||||
|
||||
export function rowsFromAggregates(aggregates: GeoStatAggregate[]) {
|
||||
return rankRowsWithMarketShare([
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "week").map(toRow), dimensionKey),
|
||||
dimensionKey,
|
||||
),
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "day").map(toRow), dimensionKey),
|
||||
dimensionKey,
|
||||
),
|
||||
])
|
||||
return rankRowsWithMarketShare(
|
||||
[
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "week").map(toRow), dimensionKey),
|
||||
dimensionKey,
|
||||
),
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "day").map(toRow), dimensionKey),
|
||||
dimensionKey,
|
||||
),
|
||||
],
|
||||
marketShareKey,
|
||||
)
|
||||
}
|
||||
|
||||
function toRow(data: GeoStatAggregate): GeoStatRow {
|
||||
return {
|
||||
...toStatBaseRow(data),
|
||||
provider: data.provider,
|
||||
model: data.model,
|
||||
country: data.country,
|
||||
continent: data.continent,
|
||||
}
|
||||
}
|
||||
|
||||
function dimensionKey(row: GeoStatRow) {
|
||||
return row.country
|
||||
return [row.provider, row.model, row.country].join("\u0000")
|
||||
}
|
||||
|
||||
function marketShareKey(row: GeoStatRow) {
|
||||
return [statPeriodKey(row), row.provider, row.model].join("\u0000")
|
||||
}
|
||||
|
||||
@ -28,17 +28,17 @@ const TOKEN_SCALE = 1_000_000
|
||||
const DOLLARS_PER_MICROCENT = 1 / 100_000_000
|
||||
const months = ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"] as const
|
||||
|
||||
type StatMetricRow = Omit<ModelStatMetric, "periodStart" | "periodEnd"> & {
|
||||
type StatMetricRow = Omit<ModelStatMetric, "updatedAt"> & {
|
||||
periodStart: number
|
||||
periodEnd: number
|
||||
updatedAt: number
|
||||
}
|
||||
type ProviderMetricRow = Omit<ProviderStatMetric, "periodStart" | "periodEnd"> & {
|
||||
type ProviderMetricRow = Omit<ProviderStatMetric, "updatedAt"> & {
|
||||
periodStart: number
|
||||
periodEnd: number
|
||||
updatedAt: number
|
||||
}
|
||||
type GeoMetricRow = Omit<GeoStatMetric, "periodStart" | "periodEnd"> & {
|
||||
type GeoMetricRow = Omit<GeoStatMetric, "updatedAt"> & {
|
||||
periodStart: number
|
||||
periodEnd: number
|
||||
updatedAt: number
|
||||
}
|
||||
|
||||
type DateWindow = { start: number; end: number; previousStart: number; previousEnd: number }
|
||||
@ -85,10 +85,10 @@ function buildStatsHomeData(
|
||||
|
||||
const earliest = Math.min(...periods.map((row) => row.periodStart))
|
||||
const latest = Math.max(...periods.map((row) => row.periodStart))
|
||||
const latestEnd = Math.max(...periods.map((row) => row.periodEnd))
|
||||
const latestUpdate = Math.max(...periods.map((row) => row.updatedAt))
|
||||
|
||||
return {
|
||||
updatedAt: new Date(latestEnd).toISOString(),
|
||||
updatedAt: new Date(latestUpdate).toISOString(),
|
||||
usage: createUsageProductRecord((product) =>
|
||||
createRangeRecord((range) => buildUsagePoints(normalized, product, range, getWindow(range, earliest, latest))),
|
||||
),
|
||||
@ -361,14 +361,14 @@ function createRangeRecord<T>(value: (range: UsageRange) => T): Record<UsageRang
|
||||
}
|
||||
|
||||
function normalizeStatRow(row: ModelStatMetric): StatMetricRow[] {
|
||||
const periodStart = dateTime(row.periodStart)
|
||||
const periodEnd = dateTime(row.periodEnd)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(periodEnd)) return []
|
||||
const periodStart = periodKeyTime(row.periodKey)
|
||||
const updatedAt = dateTime(row.updatedAt)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(updatedAt)) return []
|
||||
return [
|
||||
{
|
||||
...row,
|
||||
periodStart,
|
||||
periodEnd,
|
||||
updatedAt,
|
||||
tier: normalizeTier(row.tier),
|
||||
provider: row.provider || "unknown",
|
||||
model: row.model || "unknown",
|
||||
@ -377,14 +377,14 @@ function normalizeStatRow(row: ModelStatMetric): StatMetricRow[] {
|
||||
}
|
||||
|
||||
function normalizeProviderRow(row: ProviderStatMetric): ProviderMetricRow[] {
|
||||
const periodStart = dateTime(row.periodStart)
|
||||
const periodEnd = dateTime(row.periodEnd)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(periodEnd)) return []
|
||||
const periodStart = periodKeyTime(row.periodKey)
|
||||
const updatedAt = dateTime(row.updatedAt)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(updatedAt)) return []
|
||||
return [
|
||||
{
|
||||
...row,
|
||||
periodStart,
|
||||
periodEnd,
|
||||
updatedAt,
|
||||
tier: normalizeTier(row.tier),
|
||||
provider: row.provider || "unknown",
|
||||
},
|
||||
@ -392,15 +392,17 @@ function normalizeProviderRow(row: ProviderStatMetric): ProviderMetricRow[] {
|
||||
}
|
||||
|
||||
function normalizeGeoRow(row: GeoStatMetric): GeoMetricRow[] {
|
||||
const periodStart = dateTime(row.periodStart)
|
||||
const periodEnd = dateTime(row.periodEnd)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(periodEnd)) return []
|
||||
const periodStart = periodKeyTime(row.periodKey)
|
||||
const updatedAt = dateTime(row.updatedAt)
|
||||
if (!Number.isFinite(periodStart) || !Number.isFinite(updatedAt)) return []
|
||||
return [
|
||||
{
|
||||
...row,
|
||||
periodStart,
|
||||
periodEnd,
|
||||
updatedAt,
|
||||
tier: normalizeTier(row.tier),
|
||||
provider: row.provider || "all",
|
||||
model: row.model || "all",
|
||||
country: row.country || "ZZ",
|
||||
continent: row.continent || "",
|
||||
},
|
||||
@ -420,6 +422,12 @@ function dateTime(value: Date | string) {
|
||||
return (value instanceof Date ? value : new Date(value)).getTime()
|
||||
}
|
||||
|
||||
function periodKeyTime(value: string) {
|
||||
const match = /^(\d{4})-(\d{2})-(\d{2})$/.exec(value)
|
||||
if (!match) return Number.NaN
|
||||
return Date.UTC(Number(match[1]), Number(match[2]) - 1, Number(match[3]))
|
||||
}
|
||||
|
||||
function formatBucketLabel(value: number, range: UsageRange) {
|
||||
const date = new Date(value)
|
||||
if (range === "YTD") return months[date.getUTCMonth()]
|
||||
@ -433,11 +441,19 @@ function formatBucketLabel(value: number, range: UsageRange) {
|
||||
function formatProvider(provider: string) {
|
||||
const known: Record<string, string> = {
|
||||
anthropic: "Anthropic",
|
||||
deepseek: "DeepSeek",
|
||||
google: "Google",
|
||||
minimax: "MiniMax",
|
||||
moonshot: "Moonshot",
|
||||
moonshotai: "Moonshot",
|
||||
nvidia: "Nvidia",
|
||||
nvidia: "NVIDIA",
|
||||
opencode: "opencode",
|
||||
openai: "OpenAI",
|
||||
qwen: "Qwen",
|
||||
tencent: "Tencent",
|
||||
xai: "xAI",
|
||||
xiaomi: "Xiaomi",
|
||||
zhipu: "Zhipu",
|
||||
zhipuai: "Zhipu",
|
||||
}
|
||||
const normalized = provider.toLowerCase().replace(/[^a-z0-9]/g, "")
|
||||
|
||||
66
packages/stats/core/src/domain/inference.test.ts
Normal file
66
packages/stats/core/src/domain/inference.test.ts
Normal file
@ -0,0 +1,66 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { toModelAggregate } from "./inference"
|
||||
import { modelAuthor, normalizeInferenceModel } from "./model-normalization"
|
||||
|
||||
describe("inference stat normalization", () => {
|
||||
test("normalizes model suffixes used by router/provider variants", () => {
|
||||
expect(normalizeInferenceModel("deepseek-v4-flash-free")).toBe("deepseek-v4-flash")
|
||||
expect(normalizeInferenceModel("deepseek-v4-flash:global")).toBe("deepseek-v4-flash")
|
||||
expect(normalizeInferenceModel("mimo-v2.5-free")).toBe("mimo-v2.5")
|
||||
expect(normalizeInferenceModel("nemotron-3-super-free")).toBe("nemotron-3-super")
|
||||
expect(normalizeInferenceModel("mimo-v2.5-free:global")).toBe("mimo-v2.5")
|
||||
})
|
||||
|
||||
test("maps normalized model ids to public authors", () => {
|
||||
expect(modelAuthor("big-pickle")).toBe("opencode")
|
||||
expect(modelAuthor("claude-sonnet-4-5")).toBe("anthropic")
|
||||
expect(modelAuthor("deepseek-v4-pro")).toBe("deepseek")
|
||||
expect(modelAuthor("gemini-3.5-flash")).toBe("google")
|
||||
expect(modelAuthor("glm-5.1")).toBe("zhipu")
|
||||
expect(modelAuthor("gpt-5.5-pro")).toBe("openai")
|
||||
expect(modelAuthor("grok-build-0.1")).toBe("xai")
|
||||
expect(modelAuthor("hy3-preview")).toBe("tencent")
|
||||
expect(modelAuthor("kimi-k2.6")).toBe("moonshot")
|
||||
expect(modelAuthor("mimo-v2-omni")).toBe("xiaomi")
|
||||
expect(modelAuthor("minimax-m2.7")).toBe("minimax")
|
||||
expect(modelAuthor("nemotron-3-super-free")).toBe("nvidia")
|
||||
expect(modelAuthor("qwen3.7-max")).toBe("qwen")
|
||||
expect(modelAuthor("alpha-gpt-next")).toBeUndefined()
|
||||
})
|
||||
|
||||
test("model aggregates ignore datalake provider and use normalized author/model", () => {
|
||||
expect(toModelAggregate(aggregate("alpha-gpt-next", "openai"))).toEqual([])
|
||||
|
||||
expect(toModelAggregate(aggregate("deepseek-v4-flash-free", "not-public-provider"))).toMatchObject([
|
||||
{
|
||||
period_key: "2026-05-20",
|
||||
provider: "deepseek",
|
||||
model: "deepseek-v4-flash",
|
||||
},
|
||||
])
|
||||
})
|
||||
|
||||
test("model aggregates use ISO week period keys", () => {
|
||||
expect(
|
||||
toModelAggregate({
|
||||
...aggregate("gpt-5.5-pro", "openai"),
|
||||
grain: "week",
|
||||
period_key: "2026-W20",
|
||||
}),
|
||||
).toMatchObject([{ period_key: "2026-W20" }])
|
||||
})
|
||||
})
|
||||
|
||||
function aggregate(model: string, provider: string) {
|
||||
return {
|
||||
grain: "day",
|
||||
period_key: "2026-05-20",
|
||||
dataset: "zen",
|
||||
tier: "Paid",
|
||||
provider,
|
||||
model,
|
||||
sessions: "1",
|
||||
requests: "1",
|
||||
sample_count: "1",
|
||||
}
|
||||
}
|
||||
@ -2,10 +2,17 @@ import { Resource } from "sst/resource"
|
||||
import type { AthenaData } from "../athena"
|
||||
import type { GeoStatAggregate } from "./geo"
|
||||
import type { ModelStatAggregate } from "./model"
|
||||
import {
|
||||
EXCLUDED_MODELS,
|
||||
MODEL_AUTHOR_OVERRIDES,
|
||||
MODEL_AUTHOR_RULES,
|
||||
modelAuthor,
|
||||
normalizeInferenceModel,
|
||||
} from "./model-normalization"
|
||||
import type { ProviderStatAggregate } from "./provider"
|
||||
import { normalizeCountry, normalizeTier, type StatBaseAggregate } from "./stat"
|
||||
|
||||
export type StatDimension = "model" | "provider" | "geo"
|
||||
export type StatDimension = "model" | "provider" | "geo" | "geo_model"
|
||||
|
||||
export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: StatDimension) {
|
||||
const periodStartValue = sqlString(periodStart.toISOString())
|
||||
@ -20,8 +27,13 @@ export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: S
|
||||
groupBy: "provider, model",
|
||||
}
|
||||
if (dimension === "provider") return { select: "provider", groupBy: "provider" }
|
||||
if (dimension === "geo_model")
|
||||
return {
|
||||
select: "provider, model, country, COALESCE(MAX(NULLIF(continent, '')), '') AS continent",
|
||||
groupBy: "provider, model, country",
|
||||
}
|
||||
return {
|
||||
select: "country, COALESCE(MAX(NULLIF(continent, '')), '') AS continent",
|
||||
select: "'all' AS provider, 'all' AS model, country, COALESCE(MAX(NULLIF(continent, '')), '') AS continent",
|
||||
groupBy: "country",
|
||||
}
|
||||
})()
|
||||
@ -48,32 +60,56 @@ export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: S
|
||||
COUNT(*) AS sample_count`
|
||||
|
||||
return `
|
||||
WITH filtered AS (
|
||||
WITH normalized AS (
|
||||
SELECT
|
||||
from_iso8601_timestamp(event_timestamp) AS event_time,
|
||||
CASE
|
||||
WHEN source = 'lite' THEN 'Go'
|
||||
WHEN model IN ('gpt-5-nano', 'grok-code', 'big-pickle') OR model LIKE '%-free' THEN 'Free'
|
||||
ELSE 'Paid'
|
||||
END AS tier,
|
||||
COALESCE(NULLIF(
|
||||
CASE
|
||||
WHEN starts_with(provider, 'minimax-plan') THEN 'minimax-plan'
|
||||
WHEN starts_with(provider, 'zai-plan') THEN 'zai-plan'
|
||||
WHEN starts_with(provider, 'azure-databricks') THEN 'azure-databricks'
|
||||
WHEN regexp_like(provider, '^azure[0-9]+') THEN 'azure-openai'
|
||||
ELSE provider
|
||||
END,
|
||||
''
|
||||
), 'unknown') AS provider,
|
||||
model AS raw_model,
|
||||
COALESCE(NULLIF(regexp_replace(model, '(-free|:global)+$', ''), ''), 'unknown') AS model,
|
||||
COALESCE(NULLIF(provider_model, ''), '') AS provider_model,
|
||||
COALESCE(NULLIF(model, ''), 'unknown') AS model,
|
||||
UPPER(COALESCE(NULLIF(cf_country, ''), 'ZZ')) AS country,
|
||||
COALESCE(NULLIF(cf_continent, ''), '') AS continent,
|
||||
session,
|
||||
status,
|
||||
duration AS duration_ms,
|
||||
time_to_first_byte AS ttfb_ms,
|
||||
timestamp_first_byte,
|
||||
timestamp_last_byte,
|
||||
tokens_input,
|
||||
tokens_output,
|
||||
tokens_reasoning,
|
||||
tokens_cache_read,
|
||||
tokens_cache_write_5m,
|
||||
cost_input_microcents,
|
||||
cost_output_microcents,
|
||||
cost_total_microcents,
|
||||
cost_input,
|
||||
cost_output,
|
||||
cost_total,
|
||||
source
|
||||
FROM ${sourceTable}
|
||||
WHERE event_type = 'completions'
|
||||
AND model IS NOT NULL
|
||||
AND model <> ''
|
||||
AND (strpos(COALESCE(user_agent, ''), 'ai-sdk') > 0 OR strpos(COALESCE(user_agent, ''), 'opencode') > 0)
|
||||
AND event_timestamp >= ${periodStartValue}
|
||||
AND event_timestamp < ${periodEndValue}
|
||||
), filtered AS (
|
||||
SELECT
|
||||
event_time,
|
||||
CASE
|
||||
WHEN source = 'lite' THEN 'Go'
|
||||
WHEN model IN ('gpt-5-nano', 'grok-code', 'big-pickle') OR regexp_like(raw_model, '-free(:global)?$') THEN 'Free'
|
||||
ELSE 'Paid'
|
||||
END AS tier,
|
||||
${modelAuthorSql("model")} AS provider,
|
||||
provider_model,
|
||||
model,
|
||||
country,
|
||||
continent,
|
||||
session,
|
||||
status,
|
||||
duration_ms,
|
||||
ttfb_ms,
|
||||
CASE
|
||||
WHEN timestamp_last_byte - timestamp_first_byte < 100 THEN null
|
||||
ELSE CAST(tokens_output AS double) / (timestamp_last_byte - timestamp_first_byte) * 1000
|
||||
@ -86,50 +122,47 @@ WITH filtered AS (
|
||||
COALESCE(cost_input_microcents, cost_input * 1000000) AS cost_input_microcents,
|
||||
COALESCE(cost_output_microcents, cost_output * 1000000) AS cost_output_microcents,
|
||||
COALESCE(cost_total_microcents, cost_total * 1000000) AS cost_total_microcents
|
||||
FROM ${sourceTable}
|
||||
WHERE event_type = 'completions'
|
||||
AND model IS NOT NULL
|
||||
AND model <> ''
|
||||
AND (strpos(COALESCE(user_agent, ''), 'ai-sdk') > 0 OR strpos(COALESCE(user_agent, ''), 'opencode') > 0)
|
||||
AND event_timestamp >= ${periodStartValue}
|
||||
AND event_timestamp < ${periodEndValue}
|
||||
FROM normalized
|
||||
WHERE lower(model) NOT IN (${[...EXCLUDED_MODELS].map(sqlString).join(", ")})
|
||||
), weekly AS (
|
||||
SELECT
|
||||
concat(CAST(year_of_week(event_time) AS varchar), '-W', lpad(CAST(week(event_time) AS varchar), 2, '0')) AS week_key,
|
||||
*
|
||||
FROM filtered
|
||||
), daily AS (
|
||||
SELECT date_trunc('day', event_time) AS day, *
|
||||
SELECT substr(to_iso8601(date_trunc('day', event_time)), 1, 10) AS day_key, *
|
||||
FROM filtered
|
||||
)
|
||||
SELECT
|
||||
'week' AS grain,
|
||||
${periodStartValue} AS period_start,
|
||||
${periodEndValue} AS period_end,
|
||||
week_key AS period_key,
|
||||
${sqlString(Resource.StatsSyncConfig.dataset)} AS dataset,
|
||||
tier,
|
||||
${dimensionSql.select},
|
||||
${aggregateColumns}
|
||||
FROM filtered
|
||||
GROUP BY tier, ${dimensionSql.groupBy}
|
||||
FROM weekly
|
||||
GROUP BY week_key, tier, ${dimensionSql.groupBy}
|
||||
UNION ALL
|
||||
SELECT
|
||||
'day' AS grain,
|
||||
to_iso8601(day) AS period_start,
|
||||
to_iso8601(least(day + INTERVAL '1' DAY, from_iso8601_timestamp(${periodEndValue}))) AS period_end,
|
||||
day_key AS period_key,
|
||||
${sqlString(Resource.StatsSyncConfig.dataset)} AS dataset,
|
||||
tier,
|
||||
${dimensionSql.select},
|
||||
${aggregateColumns}
|
||||
FROM daily
|
||||
GROUP BY day, tier, ${dimensionSql.groupBy}
|
||||
ORDER BY grain, period_start, total_tokens DESC
|
||||
GROUP BY day_key, tier, ${dimensionSql.groupBy}
|
||||
ORDER BY grain, period_key, total_tokens DESC
|
||||
`
|
||||
}
|
||||
|
||||
export function toModelAggregate(data: AthenaData): ModelStatAggregate[] {
|
||||
const model = normalizeInferenceModel(data.model)
|
||||
const author = modelAuthor(model)
|
||||
if (!author) return []
|
||||
|
||||
return toStatBaseAggregate(data).flatMap((base) => [
|
||||
{
|
||||
...base,
|
||||
provider: data.provider || "unknown",
|
||||
model: data.model || "unknown",
|
||||
provider_model: data.provider_model || "",
|
||||
},
|
||||
{ ...base, provider: author, model, provider_model: data.provider_model || "" },
|
||||
])
|
||||
}
|
||||
|
||||
@ -141,6 +174,8 @@ export function toGeoAggregate(data: AthenaData): GeoStatAggregate[] {
|
||||
return toStatBaseAggregate(data).flatMap((base) => [
|
||||
{
|
||||
...base,
|
||||
provider: data.provider || "all",
|
||||
model: normalizeInferenceModel(data.model || "all"),
|
||||
country: normalizeCountry(data.country),
|
||||
continent: data.continent || "",
|
||||
},
|
||||
@ -149,15 +184,12 @@ export function toGeoAggregate(data: AthenaData): GeoStatAggregate[] {
|
||||
|
||||
function toStatBaseAggregate(data: AthenaData): StatBaseAggregate[] {
|
||||
const grain = data.grain === "day" || data.grain === "week" ? data.grain : undefined
|
||||
const periodStart = new Date(data.period_start ?? "")
|
||||
const periodEnd = new Date(data.period_end ?? "")
|
||||
if (!grain || Number.isNaN(periodStart.getTime()) || Number.isNaN(periodEnd.getTime())) return []
|
||||
if (!grain || !data.period_key) return []
|
||||
|
||||
return [
|
||||
{
|
||||
grain,
|
||||
period_start: periodStart,
|
||||
period_end: periodEnd,
|
||||
period_key: data.period_key,
|
||||
dataset: data.dataset || Resource.StatsSyncConfig.dataset,
|
||||
tier: normalizeTier(data.tier || "unknown"),
|
||||
sessions: integer(data, "sessions"),
|
||||
@ -210,3 +242,11 @@ function sqlIdentifier(value: string) {
|
||||
function sqlString(value: string) {
|
||||
return `'${value.replace(/'/g, "''")}'`
|
||||
}
|
||||
|
||||
function modelAuthorSql(model: string) {
|
||||
return `CASE
|
||||
${MODEL_AUTHOR_OVERRIDES.map((item) => ` WHEN lower(${model}) = ${sqlString(item.model)} THEN ${sqlString(item.author)}`).join("\n")}
|
||||
${MODEL_AUTHOR_RULES.map((item) => ` WHEN strpos(lower(${model}), ${sqlString(item.match)}) > 0 THEN ${sqlString(item.author)}`).join("\n")}
|
||||
ELSE 'unknown'
|
||||
END`
|
||||
}
|
||||
|
||||
30
packages/stats/core/src/domain/model-normalization.ts
Normal file
30
packages/stats/core/src/domain/model-normalization.ts
Normal file
@ -0,0 +1,30 @@
|
||||
export const MODEL_AUTHOR_OVERRIDES = [{ model: "big-pickle", author: "opencode" }] as const
|
||||
export const MODEL_AUTHOR_RULES = [
|
||||
{ match: "claude", author: "anthropic" },
|
||||
{ match: "gemini", author: "google" },
|
||||
{ match: "deepseek", author: "deepseek" },
|
||||
{ match: "glm", author: "zhipu" },
|
||||
{ match: "gpt", author: "openai" },
|
||||
{ match: "grok", author: "xai" },
|
||||
{ match: "hy3", author: "tencent" },
|
||||
{ match: "kimi", author: "moonshot" },
|
||||
{ match: "mimo", author: "xiaomi" },
|
||||
{ match: "minimax", author: "minimax" },
|
||||
{ match: "nemotron", author: "nvidia" },
|
||||
{ match: "qwen", author: "qwen" },
|
||||
] as const
|
||||
export const EXCLUDED_MODELS = new Set(["alpha-gpt-next"])
|
||||
|
||||
export function normalizeInferenceModel(value: string | undefined) {
|
||||
return (value || "unknown").replace(/(-free|:global)+$/, "") || "unknown"
|
||||
}
|
||||
|
||||
export function modelAuthor(value: string | undefined) {
|
||||
const model = normalizeInferenceModel(value).toLowerCase()
|
||||
if (EXCLUDED_MODELS.has(model)) return undefined
|
||||
|
||||
const override = MODEL_AUTHOR_OVERRIDES.find((item) => item.model === model)
|
||||
if (override) return override.author
|
||||
|
||||
return MODEL_AUTHOR_RULES.find((item) => model.includes(item.match))?.author ?? "unknown"
|
||||
}
|
||||
@ -19,8 +19,8 @@ export type ModelStatRow = typeof modelStat.$inferInsert
|
||||
export type ModelStatAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string }
|
||||
|
||||
export type ModelStatMetric = {
|
||||
periodStart: Date
|
||||
periodEnd: Date
|
||||
periodKey: string
|
||||
updatedAt: Date
|
||||
tier: string
|
||||
provider: string
|
||||
model: string
|
||||
@ -55,8 +55,8 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
|
||||
try: () =>
|
||||
db
|
||||
.select({
|
||||
periodStart: modelStat.period_start,
|
||||
periodEnd: modelStat.period_end,
|
||||
periodKey: modelStat.period_key,
|
||||
updatedAt: modelStat.updated_at,
|
||||
tier: modelStat.tier,
|
||||
provider: modelStat.provider,
|
||||
model: modelStat.model,
|
||||
@ -72,7 +72,7 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
|
||||
})
|
||||
.from(modelStat)
|
||||
.where(and(eq(modelStat.grain, "day"), eq(modelStat.client, "all"), eq(modelStat.source, "all")))
|
||||
.orderBy(asc(modelStat.period_start)),
|
||||
.orderBy(asc(modelStat.period_key)),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
@ -88,7 +88,6 @@ export class ModelStatRepo extends Context.Service<ModelStatRepo, ModelStatRepo.
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
provider_model: inserted("provider_model"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
|
||||
@ -17,8 +17,8 @@ import {
|
||||
export type ProviderStatRow = typeof providerStat.$inferInsert
|
||||
export type ProviderStatAggregate = StatBaseAggregate & { provider: string }
|
||||
export type ProviderStatMetric = {
|
||||
periodStart: Date
|
||||
periodEnd: Date
|
||||
periodKey: string
|
||||
updatedAt: Date
|
||||
tier: string
|
||||
provider: string
|
||||
totalTokens: number
|
||||
@ -29,7 +29,7 @@ export declare namespace ProviderStatRepo {
|
||||
readonly listDaily: () => Effect.Effect<ProviderStatMetric[], DatabaseError>
|
||||
readonly listByPeriod: (opts: {
|
||||
readonly grain: string
|
||||
readonly periodStart: Date
|
||||
readonly periodKey: string
|
||||
readonly dataset?: string
|
||||
readonly tier?: string
|
||||
readonly client?: string
|
||||
@ -52,22 +52,22 @@ export class ProviderStatRepo extends Context.Service<ProviderStatRepo, Provider
|
||||
try: () =>
|
||||
db
|
||||
.select({
|
||||
periodStart: providerStat.period_start,
|
||||
periodEnd: providerStat.period_end,
|
||||
periodKey: providerStat.period_key,
|
||||
updatedAt: providerStat.updated_at,
|
||||
tier: providerStat.tier,
|
||||
provider: providerStat.provider,
|
||||
totalTokens: providerStat.total_tokens,
|
||||
})
|
||||
.from(providerStat)
|
||||
.where(and(eq(providerStat.grain, "day"), eq(providerStat.client, "all"), eq(providerStat.source, "all")))
|
||||
.orderBy(asc(providerStat.period_start)),
|
||||
.orderBy(asc(providerStat.period_key)),
|
||||
catch: (cause) => DatabaseError.make({ cause }),
|
||||
})
|
||||
})
|
||||
|
||||
const listByPeriod = Effect.fn("ProviderStatRepo.listByPeriod")(function* (opts: {
|
||||
readonly grain: string
|
||||
readonly periodStart: Date
|
||||
readonly periodKey: string
|
||||
readonly dataset?: string
|
||||
readonly tier?: string
|
||||
readonly client?: string
|
||||
@ -81,7 +81,7 @@ export class ProviderStatRepo extends Context.Service<ProviderStatRepo, Provider
|
||||
.where(
|
||||
and(
|
||||
eq(providerStat.grain, opts.grain),
|
||||
eq(providerStat.period_start, opts.periodStart),
|
||||
eq(providerStat.period_key, opts.periodKey),
|
||||
eq(providerStat.dataset, opts.dataset ?? "zen"),
|
||||
eq(providerStat.tier, opts.tier ?? "all"),
|
||||
eq(providerStat.client, opts.client ?? "all"),
|
||||
@ -103,7 +103,6 @@ export class ProviderStatRepo extends Context.Service<ProviderStatRepo, Provider
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
input_tokens: inserted("input_tokens"),
|
||||
|
||||
@ -1,13 +1,13 @@
|
||||
import { sql } from "drizzle-orm"
|
||||
|
||||
export const UPSERT_CHUNK_SIZE = 500
|
||||
const DAY_MS = 86_400_000
|
||||
|
||||
export type StatGrain = "day" | "week"
|
||||
|
||||
export type StatBaseAggregate = {
|
||||
grain: StatGrain
|
||||
period_start: Date
|
||||
period_end: Date
|
||||
period_key: string
|
||||
dataset: string
|
||||
tier: string
|
||||
sessions: number
|
||||
@ -34,8 +34,7 @@ export type StatBaseAggregate = {
|
||||
|
||||
export type StatBaseRow = {
|
||||
grain: string
|
||||
period_start: Date
|
||||
period_end: Date
|
||||
period_key: string
|
||||
dataset?: string
|
||||
tier?: string
|
||||
client?: string
|
||||
@ -65,8 +64,7 @@ export type StatBaseRow = {
|
||||
export function toStatBaseRow(data: StatBaseAggregate) {
|
||||
return {
|
||||
grain: data.grain,
|
||||
period_start: data.period_start,
|
||||
period_end: data.period_end,
|
||||
period_key: data.period_key,
|
||||
dataset: data.dataset,
|
||||
tier: data.tier,
|
||||
client: "all",
|
||||
@ -101,7 +99,7 @@ export function synthesizeAllTierRows<T extends StatBaseRow>(rows: T[], dimensio
|
||||
rows.reduce<Record<string, T>>((result, row) => {
|
||||
const key = [
|
||||
row.grain,
|
||||
row.period_start.toISOString(),
|
||||
row.period_key,
|
||||
row.dataset,
|
||||
row.client,
|
||||
row.source,
|
||||
@ -119,7 +117,7 @@ export function collapseRows<T extends StatBaseRow>(rows: T[], dimensionKey: (ro
|
||||
rows.reduce<Record<string, T>>((result, row) => {
|
||||
const key = [
|
||||
row.grain,
|
||||
row.period_start.toISOString(),
|
||||
row.period_key,
|
||||
row.dataset,
|
||||
row.tier,
|
||||
row.client,
|
||||
@ -135,7 +133,6 @@ export function collapseRows<T extends StatBaseRow>(rows: T[], dimensionKey: (ro
|
||||
export function combineRows<T extends StatBaseRow>(left: T, right: T): T {
|
||||
return {
|
||||
...left,
|
||||
period_end: right.period_end > left.period_end ? right.period_end : left.period_end,
|
||||
sessions: (left.sessions ?? 0) + (right.sessions ?? 0),
|
||||
requests: (left.requests ?? 0) + (right.requests ?? 0),
|
||||
input_tokens: (left.input_tokens ?? 0) + (right.input_tokens ?? 0),
|
||||
@ -160,17 +157,41 @@ export function combineRows<T extends StatBaseRow>(left: T, right: T): T {
|
||||
}
|
||||
|
||||
export function statPeriodKey(row: StatBaseRow) {
|
||||
return [row.grain, row.period_start.toISOString(), row.dataset, row.tier, row.client, row.source].join("\u0000")
|
||||
return [row.grain, row.period_key, row.dataset, row.tier, row.client, row.source].join("\u0000")
|
||||
}
|
||||
|
||||
export function periodKeyFor(grain: StatGrain, periodStart: Date) {
|
||||
if (grain === "week") return isoWeekId(periodStart)
|
||||
return utcDateId(periodStart)
|
||||
}
|
||||
|
||||
export function startOfUtcDay(value: Date) {
|
||||
return new Date(Date.UTC(value.getUTCFullYear(), value.getUTCMonth(), value.getUTCDate()))
|
||||
}
|
||||
|
||||
export function startOfIsoWeek(value: Date) {
|
||||
return new Date(Date.UTC(value.getUTCFullYear(), value.getUTCMonth(), value.getUTCDate() - (value.getUTCDay() || 7) + 1))
|
||||
}
|
||||
|
||||
export function isoWeekId(value: Date) {
|
||||
const thursday = new Date(
|
||||
Date.UTC(value.getUTCFullYear(), value.getUTCMonth(), value.getUTCDate() + 4 - (value.getUTCDay() || 7)),
|
||||
)
|
||||
return `${thursday.getUTCFullYear()}-W${String(Math.ceil(((thursday.getTime() - Date.UTC(thursday.getUTCFullYear(), 0, 1)) / DAY_MS + 1) / 7)).padStart(2, "0")}`
|
||||
}
|
||||
|
||||
function utcDateId(value: Date) {
|
||||
return `${value.getUTCFullYear()}-${String(value.getUTCMonth() + 1).padStart(2, "0")}-${String(value.getUTCDate()).padStart(2, "0")}`
|
||||
}
|
||||
|
||||
export function rankBy<T extends StatBaseRow>(rows: T[], value: (row: T) => number) {
|
||||
return new Map(rows.toSorted((a, b) => value(b) - value(a)).map((row, index) => [row, index + 1]))
|
||||
}
|
||||
|
||||
export function rankRowsWithMarketShare<T extends StatBaseRow>(rows: T[]) {
|
||||
export function rankRowsWithMarketShare<T extends StatBaseRow>(rows: T[], groupKey: (row: T) => string = statPeriodKey) {
|
||||
return Object.values(
|
||||
rows.reduce<Record<string, T[]>>((result, row) => {
|
||||
const key = statPeriodKey(row)
|
||||
const key = groupKey(row)
|
||||
result[key] = [...(result[key] ?? []), row]
|
||||
return result
|
||||
}, {}),
|
||||
|
||||
@ -3,14 +3,19 @@ import { readdir } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
import { drizzle } from "drizzle-orm/planetscale-serverless"
|
||||
import { geoStat, modelStat, providerStat } from "./database/schema"
|
||||
import { modelAuthor, normalizeInferenceModel } from "./domain/model-normalization"
|
||||
import {
|
||||
chunks,
|
||||
collapseRows,
|
||||
inserted,
|
||||
isoWeekId,
|
||||
normalizeCountry,
|
||||
normalizeTier,
|
||||
periodKeyFor,
|
||||
rankBy,
|
||||
rankRowsWithMarketShare,
|
||||
startOfIsoWeek,
|
||||
startOfUtcDay,
|
||||
statPeriodKey,
|
||||
synthesizeAllTierRows,
|
||||
toStatBaseRow,
|
||||
@ -23,10 +28,9 @@ const DEFAULT_TIERS = ["Go", "Free", "Paid"]
|
||||
const FREE_MODELS = new Set(["gpt-5-nano", "grok-code", "big-pickle"])
|
||||
|
||||
type Grain = "day" | "week"
|
||||
type MetricDimension = "model" | "provider" | "geo"
|
||||
type MetricDimension = "model" | "provider" | "geo" | "geo-model"
|
||||
type LookupDimension = "model-provider-model" | "geo-continent"
|
||||
type ImportKey = `${MetricDimension | LookupDimension}-${Grain}`
|
||||
type GeneratedImportKey = `${MetricDimension}-${Grain}`
|
||||
type QuerySpec = {
|
||||
name: string
|
||||
importKey: ImportKey
|
||||
@ -34,19 +38,17 @@ type QuerySpec = {
|
||||
query: ReturnType<typeof metricQuery>
|
||||
}
|
||||
type RawRow = Record<string, string>
|
||||
type Period = { start: Date; end: Date }
|
||||
type ImportOptions = {
|
||||
dataset: string
|
||||
databaseUrl: string | undefined
|
||||
directories: string[]
|
||||
dryRun: boolean
|
||||
periodEnd: Date | undefined
|
||||
periodStart: Date | undefined
|
||||
files: Partial<Record<ImportKey, string[]>>
|
||||
}
|
||||
type ModelAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string }
|
||||
type ProviderAggregate = StatBaseAggregate & { provider: string }
|
||||
type GeoAggregate = StatBaseAggregate & { country: string; continent: string }
|
||||
type GeoAggregate = StatBaseAggregate & { provider: string; model: string; country: string; continent: string }
|
||||
type ModelStatRow = typeof modelStat.$inferInsert
|
||||
type ProviderStatRow = typeof providerStat.$inferInsert
|
||||
type GeoStatRow = typeof geoStat.$inferInsert
|
||||
@ -60,6 +62,8 @@ const inputKeys = [
|
||||
"provider-week",
|
||||
"geo-day",
|
||||
"geo-week",
|
||||
"geo-model-day",
|
||||
"geo-model-week",
|
||||
"geo-continent-day",
|
||||
"geo-continent-week",
|
||||
] as const satisfies ImportKey[]
|
||||
@ -77,7 +81,7 @@ function printQueries(args: string[]) {
|
||||
const flags = parseFlags(args)
|
||||
const limit = parseIntegerFlag(flags, "limit") ?? 1000
|
||||
const tiers = parseListFlag(flags, "tiers") ?? DEFAULT_TIERS
|
||||
const queries = buildQueries(limit, tiers, flags.has("include-weekly"))
|
||||
const queries = buildQueries(limit, tiers)
|
||||
const only = flags.get("only")?.[0]
|
||||
|
||||
if (only) {
|
||||
@ -112,41 +116,46 @@ async function importFiles(args: string[]) {
|
||||
...(await lookupRows(opts.files["geo-continent-day"], "day", opts, geoContinentLookup)),
|
||||
...(await lookupRows(opts.files["geo-continent-week"], "week", opts, geoContinentLookup)),
|
||||
])
|
||||
const modelRows = modelRowsFromAggregates([
|
||||
...(await metricRows(opts.files["model-day"], "day", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: provider(row),
|
||||
model: model(row),
|
||||
provider_model: providerModelLookup.get(lookupKey(base, provider(row), model(row))) ?? providerModel(row),
|
||||
}))),
|
||||
...(await metricRows(opts.files["model-week"], "week", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: provider(row),
|
||||
model: model(row),
|
||||
provider_model: providerModelLookup.get(lookupKey(base, provider(row), model(row))) ?? providerModel(row),
|
||||
}))),
|
||||
])
|
||||
const modelAggregates = [
|
||||
...(await metricRows(opts.files["model-day"], "day", opts, (row, base) =>
|
||||
modelAggregate(row, base, providerModelLookup),
|
||||
)),
|
||||
...(await metricRows(opts.files["model-week"], "week", opts, (row, base) =>
|
||||
modelAggregate(row, base, providerModelLookup),
|
||||
)),
|
||||
]
|
||||
const modelRows = modelRowsFromAggregates(modelAggregates)
|
||||
const providerRows = providerRowsFromAggregates([
|
||||
...(await metricRows(opts.files["provider-day"], "day", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: provider(row),
|
||||
provider: provider(row) ?? "unknown",
|
||||
}))),
|
||||
...(await metricRows(opts.files["provider-week"], "week", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: provider(row),
|
||||
provider: provider(row) ?? "unknown",
|
||||
}))),
|
||||
])
|
||||
const geoRows = geoRowsFromAggregates([
|
||||
...(await metricRows(opts.files["geo-day"], "day", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: "all",
|
||||
model: "all",
|
||||
country: country(row),
|
||||
continent: continentLookup.get(lookupKey(base, country(row))) ?? continent(row),
|
||||
}))),
|
||||
...(await metricRows(opts.files["geo-week"], "week", opts, (row, base) => ({
|
||||
...base,
|
||||
provider: "all",
|
||||
model: "all",
|
||||
country: country(row),
|
||||
continent: continentLookup.get(lookupKey(base, country(row))) ?? continent(row),
|
||||
}))),
|
||||
...(await metricRows(opts.files["geo-model-day"], "day", opts, (row, base) =>
|
||||
geoModelAggregate(row, base, continentLookup),
|
||||
)),
|
||||
...(await metricRows(opts.files["geo-model-week"], "week", opts, (row, base) =>
|
||||
geoModelAggregate(row, base, continentLookup),
|
||||
)),
|
||||
])
|
||||
|
||||
console.log(
|
||||
@ -174,36 +183,40 @@ async function importFiles(args: string[]) {
|
||||
await upsertGeoRows(db, geoRows)
|
||||
}
|
||||
|
||||
function buildQueries(limit: number, tiers: string[], includeWeekly: boolean): QuerySpec[] {
|
||||
function buildQueries(limit: number, tiers: string[]): QuerySpec[] {
|
||||
const daily = tiers.flatMap((tier) => [
|
||||
querySpec(
|
||||
"model-day",
|
||||
tier,
|
||||
metricQuery(["date", "tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)),
|
||||
metricQuery(["date", "tier", "stat_provider", "stat_model"], limit, tierFilters(tier)),
|
||||
),
|
||||
querySpec(
|
||||
"provider-day",
|
||||
tier,
|
||||
metricQuery(["date", "tier", "provider.normalized"], limit, tierFilters(tier)),
|
||||
metricQuery(["date", "tier", "stat_provider"], limit, tierFilters(tier)),
|
||||
),
|
||||
querySpec("geo-day", tier, metricQuery(["date", "tier", "country", "continent"], limit, tierFilters(tier))),
|
||||
querySpec(
|
||||
"geo-model-day",
|
||||
tier,
|
||||
metricQuery(["date", "tier", "stat_provider", "stat_model", "country", "continent"], limit, tierFilters(tier)),
|
||||
),
|
||||
])
|
||||
const weekly = tiers.flatMap((tier) => [
|
||||
querySpec("model-week", tier, metricQuery(["week", "tier", "stat_provider", "stat_model"], limit, tierFilters(tier))),
|
||||
querySpec("provider-week", tier, metricQuery(["week", "tier", "stat_provider"], limit, tierFilters(tier))),
|
||||
querySpec("geo-week", tier, metricQuery(["week", "tier", "country", "continent"], limit, tierFilters(tier))),
|
||||
querySpec(
|
||||
"geo-model-week",
|
||||
tier,
|
||||
metricQuery(["week", "tier", "stat_provider", "stat_model", "country", "continent"], limit, tierFilters(tier)),
|
||||
),
|
||||
])
|
||||
const weekly = includeWeekly
|
||||
? tiers.flatMap((tier) => [
|
||||
querySpec(
|
||||
"model-week",
|
||||
tier,
|
||||
metricQuery(["tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)),
|
||||
),
|
||||
querySpec("provider-week", tier, metricQuery(["tier", "provider.normalized"], limit, tierFilters(tier))),
|
||||
querySpec("geo-week", tier, metricQuery(["tier", "country", "continent"], limit, tierFilters(tier))),
|
||||
])
|
||||
: []
|
||||
|
||||
return [...daily, ...weekly]
|
||||
}
|
||||
|
||||
function querySpec(importKey: GeneratedImportKey, tier: string, query: ReturnType<typeof metricQuery>) {
|
||||
function querySpec(importKey: ImportKey, tier: string, query: ReturnType<typeof metricQuery>) {
|
||||
return {
|
||||
name: `${importKey}-${queryNameSegment(tier)}`,
|
||||
importKey,
|
||||
@ -238,8 +251,6 @@ function metricQuery(
|
||||
{ op: "P50", column: "time_to_first_byte" },
|
||||
{ op: "P95", column: "time_to_first_byte" },
|
||||
{ op: "AVG", column: "tps.output" },
|
||||
{ op: "SUM", column: "success" },
|
||||
{ op: "SUM", column: "error" },
|
||||
],
|
||||
filters: [...commonFilters(), ...filters],
|
||||
filter_combination: "AND",
|
||||
@ -269,6 +280,7 @@ function commonFilters() {
|
||||
{ column: "event_type", op: "=", value: "completions" },
|
||||
{ column: "model", op: "exists" },
|
||||
{ column: "model", op: "!=", value: "" },
|
||||
{ column: "model", op: "!=", value: "alpha-gpt-next" },
|
||||
]
|
||||
}
|
||||
|
||||
@ -276,10 +288,10 @@ function metricRows<T extends StatBaseAggregate>(
|
||||
files: string[] | undefined,
|
||||
grain: Grain,
|
||||
opts: ImportOptions,
|
||||
map: (row: RawRow, base: StatBaseAggregate) => T,
|
||||
map: (row: RawRow, base: StatBaseAggregate) => T | T[],
|
||||
) {
|
||||
if (!files) return Promise.resolve([])
|
||||
return readFiles(files).then((rows) => rows.map((row) => map(row, baseAggregate(row, grain, opts))))
|
||||
return readFiles(files).then((rows) => rows.flatMap((row) => map(row, baseAggregate(row, grain, opts))))
|
||||
}
|
||||
|
||||
function lookupRows(
|
||||
@ -334,10 +346,13 @@ function classifyRows(file: string, rows: RawRow[]): ImportKey {
|
||||
if (rows.length === 0) fail(`Cannot classify empty export: ${file}`)
|
||||
const headers = new Set(rows.flatMap((row) => Object.keys(row).map(normalizeHeader)))
|
||||
const grain: Grain = headers.has("date") ? "day" : "week"
|
||||
if (headers.has("model")) return hasMetricHeaders(headers) ? `model-${grain}` : `model-provider-model-${grain}`
|
||||
if (hasHeader(headers, ["country", "cf.country"]))
|
||||
if (hasHeader(headers, ["country", "cf.country"])) {
|
||||
if (hasHeader(headers, ["model", "stat_model"]) && hasMetricHeaders(headers)) return `geo-model-${grain}`
|
||||
return hasMetricHeaders(headers) ? `geo-${grain}` : `geo-continent-${grain}`
|
||||
if (hasHeader(headers, ["provider", "provider.normalized"])) return `provider-${grain}`
|
||||
}
|
||||
if (hasHeader(headers, ["model", "stat_model"]))
|
||||
return hasMetricHeaders(headers) ? `model-${grain}` : `model-provider-model-${grain}`
|
||||
if (hasHeader(headers, ["provider", "provider.normalized", "stat_provider"])) return `provider-${grain}`
|
||||
fail(`Cannot classify export from columns in ${file}`)
|
||||
}
|
||||
|
||||
@ -362,8 +377,27 @@ function mergeFiles(left: Partial<Record<ImportKey, string[]>>, right: Partial<R
|
||||
function modelProviderModelLookup(row: RawRow, grain: Grain, opts: ImportOptions): [string, string][] {
|
||||
const base = basePeriod(row, grain, opts)
|
||||
const value = providerModel(row)
|
||||
if (!value) return []
|
||||
return [[lookupKey({ ...base, dataset: opts.dataset, tier: tier(row), grain }, provider(row), model(row)), value]]
|
||||
const author = provider(row)
|
||||
if (!value || !author) return []
|
||||
return [[lookupKey({ ...base, dataset: opts.dataset, tier: tier(row), grain }, author, model(row)), value]]
|
||||
}
|
||||
|
||||
function modelAggregate(
|
||||
row: RawRow,
|
||||
base: StatBaseAggregate,
|
||||
providerModelLookup: Map<string, string>,
|
||||
): ModelAggregate[] {
|
||||
const author = provider(row)
|
||||
if (!author) return []
|
||||
|
||||
return [
|
||||
{
|
||||
...base,
|
||||
provider: author,
|
||||
model: model(row),
|
||||
provider_model: providerModelLookup.get(lookupKey(base, author, model(row))) ?? providerModel(row),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
function geoContinentLookup(row: RawRow, grain: Grain, opts: ImportOptions): [string, string][] {
|
||||
@ -373,6 +407,25 @@ function geoContinentLookup(row: RawRow, grain: Grain, opts: ImportOptions): [st
|
||||
return [[lookupKey({ ...base, dataset: opts.dataset, tier: tier(row), grain }, country(row)), value]]
|
||||
}
|
||||
|
||||
function geoModelAggregate(
|
||||
row: RawRow,
|
||||
base: StatBaseAggregate,
|
||||
continentLookup: Map<string, string>,
|
||||
): GeoAggregate[] {
|
||||
const author = provider(row)
|
||||
if (!author) return []
|
||||
|
||||
return [
|
||||
{
|
||||
...base,
|
||||
provider: author,
|
||||
model: model(row),
|
||||
country: country(row),
|
||||
continent: continentLookup.get(lookupKey(base, country(row))) ?? continent(row),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
function baseAggregate(row: RawRow, grain: Grain, opts: ImportOptions): StatBaseAggregate {
|
||||
return {
|
||||
...basePeriod(row, grain, opts),
|
||||
@ -412,24 +465,20 @@ function baseAggregate(row: RawRow, grain: Grain, opts: ImportOptions): StatBase
|
||||
}
|
||||
|
||||
function basePeriod(row: RawRow, grain: Grain, opts: ImportOptions) {
|
||||
const period = periodFor(row, grain, opts)
|
||||
return { period_start: period.start, period_end: period.end }
|
||||
return { period_key: periodKey(row, grain, opts) }
|
||||
}
|
||||
|
||||
function periodFor(row: RawRow, grain: Grain, opts: ImportOptions): Period {
|
||||
function periodKey(row: RawRow, grain: Grain, opts: ImportOptions) {
|
||||
if (grain === "week") {
|
||||
const end = opts.periodEnd ?? parseTime(row)
|
||||
if (!end) fail("--period-end is required for week imports")
|
||||
return { start: opts.periodStart ?? syncWeekStart(end), end }
|
||||
const week = parseWeek(row)
|
||||
if (week) return week
|
||||
fail("weekly imports require a week or period_key column")
|
||||
}
|
||||
|
||||
const time = parseTime(row)
|
||||
const start = time ? startOfUtcDay(time) : opts.periodStart
|
||||
if (!start) fail("daily imports require a time column or --period-start")
|
||||
return {
|
||||
start,
|
||||
end: opts.periodEnd && sameUtcDay(start, opts.periodEnd) ? opts.periodEnd : new Date(start.getTime() + DAY_MS),
|
||||
}
|
||||
return periodKeyFor("day", start)
|
||||
}
|
||||
|
||||
function modelRowsFromAggregates(aggregates: ModelAggregate[]) {
|
||||
@ -459,16 +508,19 @@ function providerRowsFromAggregates(aggregates: ProviderAggregate[]) {
|
||||
}
|
||||
|
||||
function geoRowsFromAggregates(aggregates: GeoAggregate[]) {
|
||||
return rankRowsWithMarketShare([
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "week").map(toGeoRow), geoDimensionKey),
|
||||
geoDimensionKey,
|
||||
),
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "day").map(toGeoRow), geoDimensionKey),
|
||||
geoDimensionKey,
|
||||
),
|
||||
])
|
||||
return rankRowsWithMarketShare(
|
||||
[
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "week").map(toGeoRow), geoDimensionKey),
|
||||
geoDimensionKey,
|
||||
),
|
||||
...synthesizeAllTierRows(
|
||||
collapseRows(aggregates.filter((item) => item.grain === "day").map(toGeoRow), geoDimensionKey),
|
||||
geoDimensionKey,
|
||||
),
|
||||
],
|
||||
geoMarketShareKey,
|
||||
)
|
||||
}
|
||||
|
||||
function toModelRow(data: ModelAggregate): ModelStatRow {
|
||||
@ -480,7 +532,13 @@ function toProviderRow(data: ProviderAggregate): ProviderStatRow {
|
||||
}
|
||||
|
||||
function toGeoRow(data: GeoAggregate): GeoStatRow {
|
||||
return { ...toStatBaseRow(data), country: data.country, continent: data.continent }
|
||||
return {
|
||||
...toStatBaseRow(data),
|
||||
provider: data.provider,
|
||||
model: data.model,
|
||||
country: data.country,
|
||||
continent: data.continent,
|
||||
}
|
||||
}
|
||||
|
||||
function rankModelRows(rows: ModelStatRow[]) {
|
||||
@ -512,11 +570,15 @@ function providerDimensionKey(row: ProviderStatRow) {
|
||||
}
|
||||
|
||||
function geoDimensionKey(row: GeoStatRow) {
|
||||
return row.country
|
||||
return [row.provider, row.model, row.country].join("\u0000")
|
||||
}
|
||||
|
||||
function lookupKey(base: { grain: string; period_start: Date; dataset: string; tier: string }, ...dimension: string[]) {
|
||||
return [base.grain, base.period_start.toISOString(), base.dataset, base.tier, ...dimension].join("\u0000")
|
||||
function geoMarketShareKey(row: GeoStatRow) {
|
||||
return [statPeriodKey(row), row.provider, row.model].join("\u0000")
|
||||
}
|
||||
|
||||
function lookupKey(base: { grain: string; period_key: string; dataset: string; tier: string }, ...dimension: string[]) {
|
||||
return [base.grain, base.period_key, base.dataset, base.tier, ...dimension].join("\u0000")
|
||||
}
|
||||
|
||||
function tier(row: RawRow) {
|
||||
@ -527,23 +589,19 @@ function deriveTier(row: RawRow) {
|
||||
const source = cell(row, ["source"])
|
||||
const value = model(row)
|
||||
if (source === "lite") return "Go"
|
||||
if (FREE_MODELS.has(value) || value.endsWith("-free")) return "Free"
|
||||
if (FREE_MODELS.has(value) || /-free(:global)?$/.test(rawModel(row))) return "Free"
|
||||
return "Zen"
|
||||
}
|
||||
|
||||
function provider(row: RawRow) {
|
||||
return normalizeProvider(cell(row, ["provider.normalized", "stat_provider", "provider"]) || "unknown")
|
||||
}
|
||||
|
||||
function normalizeProvider(value: string) {
|
||||
if (value.startsWith("minimax-plan")) return "minimax-plan"
|
||||
if (value.startsWith("zai-plan")) return "zai-plan"
|
||||
if (value.startsWith("azure-databricks")) return "azure-databricks"
|
||||
if (/^azure[0-9]+/.test(value)) return "azure-openai"
|
||||
return value || "unknown"
|
||||
return cell(row, ["stat_provider"]) || modelAuthor(model(row))
|
||||
}
|
||||
|
||||
function model(row: RawRow) {
|
||||
return normalizeInferenceModel(cell(row, ["stat_model"]) || rawModel(row))
|
||||
}
|
||||
|
||||
function rawModel(row: RawRow) {
|
||||
return cell(row, ["model"]) || "unknown"
|
||||
}
|
||||
|
||||
@ -609,20 +667,21 @@ function parseTime(row: RawRow) {
|
||||
return date
|
||||
}
|
||||
|
||||
function startOfUtcDay(value: Date) {
|
||||
return new Date(Date.UTC(value.getUTCFullYear(), value.getUTCMonth(), value.getUTCDate()))
|
||||
}
|
||||
function parseWeek(row: RawRow) {
|
||||
const value = cell(row, ["period_key", "week", "stat_week"])
|
||||
if (!value) return undefined
|
||||
|
||||
function syncWeekStart(periodEnd: Date) {
|
||||
return new Date(Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - 6))
|
||||
}
|
||||
const match = /^(\d{4})-W(\d{1,2})$/.exec(value)
|
||||
if (!match) fail(`Invalid week value: ${value}`)
|
||||
|
||||
function sameUtcDay(left: Date, right: Date) {
|
||||
return (
|
||||
left.getUTCFullYear() === right.getUTCFullYear() &&
|
||||
left.getUTCMonth() === right.getUTCMonth() &&
|
||||
left.getUTCDate() === right.getUTCDate()
|
||||
)
|
||||
const year = Number(match[1])
|
||||
const week = Number(match[2])
|
||||
if (week < 1 || week > 53) fail(`Invalid week value: ${value}`)
|
||||
|
||||
const start = new Date(startOfIsoWeek(new Date(Date.UTC(year, 0, 4))).getTime() + (week - 1) * 7 * DAY_MS)
|
||||
const id = `${year}-W${String(week).padStart(2, "0")}`
|
||||
if (isoWeekId(start) !== id) fail(`Invalid week value: ${value}`)
|
||||
return id
|
||||
}
|
||||
|
||||
async function readRows(file: string) {
|
||||
@ -732,7 +791,6 @@ async function upsertModelRows(db: ReturnType<typeof drizzle>, rows: ModelStatRo
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
provider_model: inserted("provider_model"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
@ -771,7 +829,6 @@ async function upsertProviderRows(db: ReturnType<typeof drizzle>, rows: Provider
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
input_tokens: inserted("input_tokens"),
|
||||
@ -813,7 +870,6 @@ async function upsertGeoRows(db: ReturnType<typeof drizzle>, rows: GeoStatRow[])
|
||||
.values(chunk)
|
||||
.onDuplicateKeyUpdate({
|
||||
set: {
|
||||
period_end: inserted("period_end"),
|
||||
continent: inserted("continent"),
|
||||
sessions: inserted("sessions"),
|
||||
requests: inserted("requests"),
|
||||
@ -860,7 +916,6 @@ function parseImportOptions(args: string[]): ImportOptions {
|
||||
databaseUrl: flags.get("database-url")?.[0] ?? process.env.DATABASE_URL,
|
||||
directories: flags.get("dir") ?? flags.get("directory") ?? [],
|
||||
dryRun: flags.has("dry-run"),
|
||||
periodEnd: parseDateFlag(flags, "period-end"),
|
||||
periodStart: parseDateFlag(flags, "period-start"),
|
||||
files,
|
||||
}
|
||||
@ -913,9 +968,9 @@ function parseListFlag(flags: Map<string, string[]>, name: string) {
|
||||
|
||||
function usage(): never {
|
||||
fail(`Usage:
|
||||
bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000] [--include-weekly]
|
||||
bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --dir downloads
|
||||
bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`)
|
||||
bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000]
|
||||
bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --dir downloads
|
||||
bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`)
|
||||
}
|
||||
|
||||
function fail(message: string): never {
|
||||
|
||||
@ -6,8 +6,10 @@ import { GeoStatRepo, rowsFromAggregates as geoRowsFromAggregates } from "./doma
|
||||
import { buildStatsQuery, toGeoAggregate, toModelAggregate, toProviderAggregate } from "./domain/inference"
|
||||
import { ModelStatRepo, rowsFromAggregates as modelRowsFromAggregates } from "./domain/model"
|
||||
import { ProviderStatRepo, rowsFromAggregates as providerRowsFromAggregates } from "./domain/provider"
|
||||
import { startOfIsoWeek } from "./domain/stat"
|
||||
|
||||
const DATALAKE_INGESTION_LAG_MS = 5 * 60_000
|
||||
const WEEK_MS = 7 * 86_400_000
|
||||
|
||||
export type SyncStatsResult = { ok: true; rows: number; startedAt: string; periodStart: string; periodEnd: string }
|
||||
export type SyncStatsError = AthenaQueryError | AthenaQueryTimeoutError | DatabaseError
|
||||
@ -19,9 +21,7 @@ export const syncStats: () => Effect.Effect<
|
||||
> = Effect.fn("StatSync.sync")(function* () {
|
||||
const startedAt = yield* DateTime.nowAsDate
|
||||
const periodEnd = new Date(Math.floor((startedAt.getTime() - DATALAKE_INGESTION_LAG_MS) / 60_000) * 60_000)
|
||||
const periodStart = new Date(
|
||||
Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - 6),
|
||||
)
|
||||
const periodStart = new Date(startOfIsoWeek(periodEnd).getTime() - WEEK_MS)
|
||||
const athena = yield* Athena
|
||||
const modelStats = yield* ModelStatRepo
|
||||
const providerStats = yield* ProviderStatRepo
|
||||
@ -29,7 +29,7 @@ export const syncStats: () => Effect.Effect<
|
||||
|
||||
yield* logRuntimeCheck()
|
||||
|
||||
const [modelAggregates, providerAggregates, geoAggregates] = yield* Effect.all(
|
||||
const [modelAggregates, providerAggregates, geoAggregates, geoModelAggregates] = yield* Effect.all(
|
||||
[
|
||||
athena
|
||||
.query(buildStatsQuery(periodStart, periodEnd, "model"))
|
||||
@ -40,12 +40,15 @@ export const syncStats: () => Effect.Effect<
|
||||
athena
|
||||
.query(buildStatsQuery(periodStart, periodEnd, "geo"))
|
||||
.pipe(Effect.map((rows) => rows.flatMap(toGeoAggregate))),
|
||||
athena
|
||||
.query(buildStatsQuery(periodStart, periodEnd, "geo_model"))
|
||||
.pipe(Effect.map((rows) => rows.flatMap(toGeoAggregate))),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
const modelRows = modelRowsFromAggregates(modelAggregates)
|
||||
const providerRows = providerRowsFromAggregates(providerAggregates)
|
||||
const geoRows = geoRowsFromAggregates(geoAggregates)
|
||||
const geoRows = geoRowsFromAggregates([...geoAggregates, ...geoModelAggregates])
|
||||
|
||||
yield* Effect.all([modelStats.upsert(modelRows), providerStats.upsert(providerRows), geoStats.upsert(geoRows)], {
|
||||
concurrency: "unbounded",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user