fix(stats): unblock stats sync

This commit is contained in:
Adam 2026-06-21 14:53:46 -05:00
parent 69f1ec22e3
commit 4c6750d464
No known key found for this signature in database
GPG Key ID: 9CB48779AF150E75
4 changed files with 83 additions and 44 deletions

View File

@ -5,28 +5,36 @@ import {
StartQueryExecutionCommand,
type Row,
} from "@aws-sdk/client-athena"
import { Effect, Layer, Schema } from "effect"
import { Effect, Layer } from "effect"
import * as Context from "effect/Context"
import { Resource } from "sst/resource"
const ATHENA_MAX_POLL_ATTEMPTS = 60
const ATHENA_MAX_POLL_ATTEMPTS = 300
const ATHENA_PAGE_SIZE = 1000
export type AthenaData = Record<string, string>
export class AthenaQueryError extends Schema.TaggedErrorClass<AthenaQueryError>()("AthenaQueryError", {
message: Schema.String,
queryExecutionId: Schema.optional(Schema.String),
cause: Schema.optional(Schema.Defect()),
}) {}
export class AthenaQueryError extends Error {
readonly _tag = "AthenaQueryError"
readonly queryExecutionId?: string
export class AthenaQueryTimeoutError extends Schema.TaggedErrorClass<AthenaQueryTimeoutError>()(
"AthenaQueryTimeoutError",
{
message: Schema.String,
queryExecutionId: Schema.String,
},
) {}
constructor(input: { message: string; queryExecutionId?: string; cause?: unknown }) {
super(input.message, { cause: input.cause })
this.name = "AthenaQueryError"
this.queryExecutionId = input.queryExecutionId
}
}
export class AthenaQueryTimeoutError extends Error {
readonly _tag = "AthenaQueryTimeoutError"
readonly queryExecutionId: string
constructor(input: { message: string; queryExecutionId: string }) {
super(input.message)
this.name = "AthenaQueryTimeoutError"
this.queryExecutionId = input.queryExecutionId
}
}
export declare namespace Athena {
export interface Service {
@ -57,7 +65,7 @@ export class Athena extends Context.Service<Athena, Athena.Service>()("@opencode
})
const queryExecutionId = started.QueryExecutionId
if (!queryExecutionId)
return yield* new AthenaQueryError({ message: "Athena did not return a query execution id" })
return yield* Effect.fail(new AthenaQueryError({ message: "Athena did not return a query execution id" }))
yield* poll(client, queryExecutionId)
return yield* results(client, queryExecutionId)
@ -87,16 +95,20 @@ const poll: (
if (status?.State === "SUCCEEDED") return
if (status?.State === "FAILED" || status?.State === "CANCELLED")
return yield* new AthenaQueryError({
message: `Athena stats query ${status.State.toLowerCase()}: ${status.StateChangeReason ?? "unknown reason"}`,
queryExecutionId,
})
return yield* Effect.fail(
new AthenaQueryError({
message: `Athena stats query ${status.State.toLowerCase()}: ${status.StateChangeReason ?? "unknown reason"}`,
queryExecutionId,
}),
)
if (attempt >= ATHENA_MAX_POLL_ATTEMPTS - 1)
return yield* new AthenaQueryTimeoutError({
message: `Athena stats query ${queryExecutionId} did not complete`,
queryExecutionId,
})
return yield* Effect.fail(
new AthenaQueryTimeoutError({
message: `Athena stats query ${queryExecutionId} did not complete`,
queryExecutionId,
}),
)
return yield* poll(client, queryExecutionId, attempt + 1)
})

View File

@ -44,16 +44,29 @@ export class DrizzleClient extends Context.Service<DrizzleClient, Drizzle>()("@o
)
}
export class DatabaseError extends Schema.TaggedErrorClass<DatabaseError>()("DatabaseError", {
cause: Schema.Defect(),
}) {}
export class DatabaseError extends Error {
readonly _tag = "DatabaseError"
constructor(input: { cause: unknown }) {
super("Database operation failed", { cause: input.cause })
this.name = "DatabaseError"
}
static make(input: { cause: unknown }) {
return new DatabaseError(input)
}
}
export const catchDbError = Effect.mapError((cause) => DatabaseError.make({ cause }))
export class MigrationError extends Schema.TaggedErrorClass<MigrationError>()("MigrationError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect()),
}) {}
export class MigrationError extends Error {
readonly _tag = "MigrationError"
constructor(input: { message: string; cause?: unknown }) {
super(input.message, { cause: input.cause })
this.name = "MigrationError"
}
}
export const migrate = Effect.fn("Database.migrate")(function* () {
const settings = yield* DatabaseConfig
@ -68,9 +81,11 @@ export const migrate = Effect.fn("Database.migrate")(function* () {
catch: (cause) => new MigrationError({ message: "Failed to apply database migrations", cause }),
})
if (result)
return yield* new MigrationError({
message: `Failed to initialize database migrations: ${result.exitCode}`,
})
return yield* Effect.fail(
new MigrationError({
message: `Failed to initialize database migrations: ${result.exitCode}`,
}),
)
yield* Effect.logInfo("database migrations complete").pipe(
Effect.annotateLogs({ migrationsDir: settings.migrationsDir }),
)

View File

@ -17,6 +17,8 @@ export type StatDimension = "model" | "provider" | "geo" | "geo_model"
export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: StatDimension) {
const periodStartValue = sqlString(periodStart.toISOString())
const periodEndValue = sqlString(periodEnd.toISOString())
const periodStartDateValue = sqlString(periodStart.toISOString().slice(0, 10))
const periodEndDateValue = sqlString(periodEnd.toISOString().slice(0, 10))
const sourceTable = [Resource.InferenceEvent.catalog, Resource.InferenceEvent.database, Resource.InferenceEvent.table]
.map(sqlIdentifier)
.join(".")
@ -95,6 +97,9 @@ WITH normalized AS (
WHERE event_type = 'completions'
AND model IS NOT NULL
AND model <> ''
AND source = 'lite'
AND event_date >= ${periodStartDateValue}
AND event_date <= ${periodEndDateValue}
AND event_timestamp >= ${periodStartValue}
AND event_timestamp < ${periodEndValue}
), filtered AS (

View File

@ -1,6 +1,6 @@
import { Buffer } from "node:buffer"
import { FirehoseClient, PutRecordBatchCommand } from "@aws-sdk/client-firehose"
import { Effect, Layer, Schema } from "effect"
import { Effect, Layer } from "effect"
import * as Context from "effect/Context"
import { Resource } from "sst/resource"
@ -12,11 +12,16 @@ type IngestEvent = Record<string, unknown>
type LakeRoute = { database: string; table: string }
type FirehoseRecord = { Data: Uint8Array }
export class IngestError extends Schema.TaggedErrorClass<IngestError>()("IngestError", {
message: Schema.String,
failed: Schema.Number,
cause: Schema.optional(Schema.Defect()),
}) {}
export class IngestError extends Error {
readonly _tag = "IngestError"
readonly failed: number
constructor(input: { message: string; failed: number; cause?: unknown }) {
super(input.message, { cause: input.cause })
this.name = "IngestError"
this.failed = input.failed
}
}
export declare namespace Ingest {
export interface Service {
@ -37,10 +42,12 @@ export class Ingest extends Context.Service<Ingest, Ingest.Service>()("@opencode
yield* Effect.logWarning(
`lake ingest rejected ${JSON.stringify({ records: counts.records, unsupported: counts.unsupported })}`,
)
return yield* new IngestError({
message: "Unsupported lake event type",
failed: counts.unsupported,
})
return yield* Effect.fail(
new IngestError({
message: "Unsupported lake event type",
failed: counts.unsupported,
}),
)
}
if (counts.records === 0) return { records: 0 }
@ -66,7 +73,7 @@ export class Ingest extends Context.Service<Ingest, Ingest.Service>()("@opencode
if (failed > 0) {
yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: counts.records, failed })}`)
return yield* new IngestError({ message: "Failed to ingest all lake records", failed })
return yield* Effect.fail(new IngestError({ message: "Failed to ingest all lake records", failed }))
}
yield* Effect.logInfo(`lake ingest complete ${JSON.stringify({ records: counts.records, batches })}`)