From 4c6750d46461c82f5d3dc19f16e3cbb164e10c8f Mon Sep 17 00:00:00 2001 From: Adam <2363879+adamdotdevin@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:53:46 -0500 Subject: [PATCH] fix(stats): unblock stats sync --- packages/stats/core/src/athena.ts | 58 +++++++++++++-------- packages/stats/core/src/database.ts | 35 +++++++++---- packages/stats/core/src/domain/inference.ts | 5 ++ packages/stats/server/src/ingest.ts | 29 +++++++---- 4 files changed, 83 insertions(+), 44 deletions(-) diff --git a/packages/stats/core/src/athena.ts b/packages/stats/core/src/athena.ts index 54037002f..50c356afa 100644 --- a/packages/stats/core/src/athena.ts +++ b/packages/stats/core/src/athena.ts @@ -5,28 +5,36 @@ import { StartQueryExecutionCommand, type Row, } from "@aws-sdk/client-athena" -import { Effect, Layer, Schema } from "effect" +import { Effect, Layer } from "effect" import * as Context from "effect/Context" import { Resource } from "sst/resource" -const ATHENA_MAX_POLL_ATTEMPTS = 60 +const ATHENA_MAX_POLL_ATTEMPTS = 300 const ATHENA_PAGE_SIZE = 1000 export type AthenaData = Record -export class AthenaQueryError extends Schema.TaggedErrorClass()("AthenaQueryError", { - message: Schema.String, - queryExecutionId: Schema.optional(Schema.String), - cause: Schema.optional(Schema.Defect()), -}) {} +export class AthenaQueryError extends Error { + readonly _tag = "AthenaQueryError" + readonly queryExecutionId?: string -export class AthenaQueryTimeoutError extends Schema.TaggedErrorClass()( - "AthenaQueryTimeoutError", - { - message: Schema.String, - queryExecutionId: Schema.String, - }, -) {} + constructor(input: { message: string; queryExecutionId?: string; cause?: unknown }) { + super(input.message, { cause: input.cause }) + this.name = "AthenaQueryError" + this.queryExecutionId = input.queryExecutionId + } +} + +export class AthenaQueryTimeoutError extends Error { + readonly _tag = "AthenaQueryTimeoutError" + readonly queryExecutionId: string + + constructor(input: { message: string; queryExecutionId: string }) { + super(input.message) + this.name = "AthenaQueryTimeoutError" + this.queryExecutionId = input.queryExecutionId + } +} export declare namespace Athena { export interface Service { @@ -57,7 +65,7 @@ export class Athena extends Context.Service()("@opencode }) const queryExecutionId = started.QueryExecutionId if (!queryExecutionId) - return yield* new AthenaQueryError({ message: "Athena did not return a query execution id" }) + return yield* Effect.fail(new AthenaQueryError({ message: "Athena did not return a query execution id" })) yield* poll(client, queryExecutionId) return yield* results(client, queryExecutionId) @@ -87,16 +95,20 @@ const poll: ( if (status?.State === "SUCCEEDED") return if (status?.State === "FAILED" || status?.State === "CANCELLED") - return yield* new AthenaQueryError({ - message: `Athena stats query ${status.State.toLowerCase()}: ${status.StateChangeReason ?? "unknown reason"}`, - queryExecutionId, - }) + return yield* Effect.fail( + new AthenaQueryError({ + message: `Athena stats query ${status.State.toLowerCase()}: ${status.StateChangeReason ?? "unknown reason"}`, + queryExecutionId, + }), + ) if (attempt >= ATHENA_MAX_POLL_ATTEMPTS - 1) - return yield* new AthenaQueryTimeoutError({ - message: `Athena stats query ${queryExecutionId} did not complete`, - queryExecutionId, - }) + return yield* Effect.fail( + new AthenaQueryTimeoutError({ + message: `Athena stats query ${queryExecutionId} did not complete`, + queryExecutionId, + }), + ) return yield* poll(client, queryExecutionId, attempt + 1) }) diff --git a/packages/stats/core/src/database.ts b/packages/stats/core/src/database.ts index 9edb717bc..2d55ee7f8 100644 --- a/packages/stats/core/src/database.ts +++ b/packages/stats/core/src/database.ts @@ -44,16 +44,29 @@ export class DrizzleClient extends Context.Service()("@o ) } -export class DatabaseError extends Schema.TaggedErrorClass()("DatabaseError", { - cause: Schema.Defect(), -}) {} +export class DatabaseError extends Error { + readonly _tag = "DatabaseError" + + constructor(input: { cause: unknown }) { + super("Database operation failed", { cause: input.cause }) + this.name = "DatabaseError" + } + + static make(input: { cause: unknown }) { + return new DatabaseError(input) + } +} export const catchDbError = Effect.mapError((cause) => DatabaseError.make({ cause })) -export class MigrationError extends Schema.TaggedErrorClass()("MigrationError", { - message: Schema.String, - cause: Schema.optional(Schema.Defect()), -}) {} +export class MigrationError extends Error { + readonly _tag = "MigrationError" + + constructor(input: { message: string; cause?: unknown }) { + super(input.message, { cause: input.cause }) + this.name = "MigrationError" + } +} export const migrate = Effect.fn("Database.migrate")(function* () { const settings = yield* DatabaseConfig @@ -68,9 +81,11 @@ export const migrate = Effect.fn("Database.migrate")(function* () { catch: (cause) => new MigrationError({ message: "Failed to apply database migrations", cause }), }) if (result) - return yield* new MigrationError({ - message: `Failed to initialize database migrations: ${result.exitCode}`, - }) + return yield* Effect.fail( + new MigrationError({ + message: `Failed to initialize database migrations: ${result.exitCode}`, + }), + ) yield* Effect.logInfo("database migrations complete").pipe( Effect.annotateLogs({ migrationsDir: settings.migrationsDir }), ) diff --git a/packages/stats/core/src/domain/inference.ts b/packages/stats/core/src/domain/inference.ts index a7e4c0037..3fc97aba3 100644 --- a/packages/stats/core/src/domain/inference.ts +++ b/packages/stats/core/src/domain/inference.ts @@ -17,6 +17,8 @@ export type StatDimension = "model" | "provider" | "geo" | "geo_model" export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: StatDimension) { const periodStartValue = sqlString(periodStart.toISOString()) const periodEndValue = sqlString(periodEnd.toISOString()) + const periodStartDateValue = sqlString(periodStart.toISOString().slice(0, 10)) + const periodEndDateValue = sqlString(periodEnd.toISOString().slice(0, 10)) const sourceTable = [Resource.InferenceEvent.catalog, Resource.InferenceEvent.database, Resource.InferenceEvent.table] .map(sqlIdentifier) .join(".") @@ -95,6 +97,9 @@ WITH normalized AS ( WHERE event_type = 'completions' AND model IS NOT NULL AND model <> '' + AND source = 'lite' + AND event_date >= ${periodStartDateValue} + AND event_date <= ${periodEndDateValue} AND event_timestamp >= ${periodStartValue} AND event_timestamp < ${periodEndValue} ), filtered AS ( diff --git a/packages/stats/server/src/ingest.ts b/packages/stats/server/src/ingest.ts index 763742d9c..eda662d98 100644 --- a/packages/stats/server/src/ingest.ts +++ b/packages/stats/server/src/ingest.ts @@ -1,6 +1,6 @@ import { Buffer } from "node:buffer" import { FirehoseClient, PutRecordBatchCommand } from "@aws-sdk/client-firehose" -import { Effect, Layer, Schema } from "effect" +import { Effect, Layer } from "effect" import * as Context from "effect/Context" import { Resource } from "sst/resource" @@ -12,11 +12,16 @@ type IngestEvent = Record type LakeRoute = { database: string; table: string } type FirehoseRecord = { Data: Uint8Array } -export class IngestError extends Schema.TaggedErrorClass()("IngestError", { - message: Schema.String, - failed: Schema.Number, - cause: Schema.optional(Schema.Defect()), -}) {} +export class IngestError extends Error { + readonly _tag = "IngestError" + readonly failed: number + + constructor(input: { message: string; failed: number; cause?: unknown }) { + super(input.message, { cause: input.cause }) + this.name = "IngestError" + this.failed = input.failed + } +} export declare namespace Ingest { export interface Service { @@ -37,10 +42,12 @@ export class Ingest extends Context.Service()("@opencode yield* Effect.logWarning( `lake ingest rejected ${JSON.stringify({ records: counts.records, unsupported: counts.unsupported })}`, ) - return yield* new IngestError({ - message: "Unsupported lake event type", - failed: counts.unsupported, - }) + return yield* Effect.fail( + new IngestError({ + message: "Unsupported lake event type", + failed: counts.unsupported, + }), + ) } if (counts.records === 0) return { records: 0 } @@ -66,7 +73,7 @@ export class Ingest extends Context.Service()("@opencode if (failed > 0) { yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: counts.records, failed })}`) - return yield* new IngestError({ message: "Failed to ingest all lake records", failed }) + return yield* Effect.fail(new IngestError({ message: "Failed to ingest all lake records", failed })) } yield* Effect.logInfo(`lake ingest complete ${JSON.stringify({ records: counts.records, batches })}`)