From be5a28e0b75376b2b22676c52d974bd7a3f65223 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Thu, 23 Apr 2026 15:59:41 +0800 Subject: [PATCH] docs: add engineering reference documentation --- README.md | 11 +- docs/README.md | 19 ++- docs/api.md | 66 +++++--- docs/architecture.md | 19 ++- docs/design.md | 266 +++++++++++++++++++++++++++++++ docs/reference/cmd.md | 60 +++++++ docs/reference/config.md | 191 +++++++++++++++++++++++ docs/reference/exporter.md | 97 ++++++++++++ docs/reference/httpapi.md | 174 +++++++++++++++++++++ docs/reference/model.md | 181 ++++++++++++++++++++++ docs/reference/repository.md | 242 +++++++++++++++++++++++++++++ docs/reference/service.md | 292 +++++++++++++++++++++++++++++++++++ 12 files changed, 1589 insertions(+), 29 deletions(-) create mode 100644 docs/design.md create mode 100644 docs/reference/cmd.md create mode 100644 docs/reference/config.md create mode 100644 docs/reference/exporter.md create mode 100644 docs/reference/httpapi.md create mode 100644 docs/reference/model.md create mode 100644 docs/reference/repository.md create mode 100644 docs/reference/service.md diff --git a/README.md b/README.md index f71c2c6..3290ad6 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,13 @@ `billing-service` is the v1 minute-delta and replay-safe writer for the Cloud Network Billing & Control Plane. -It pulls the latest normalized snapshot from `xray-exporter`, computes deltas -from cumulative counters, and writes idempotent usage and billing facts into the -existing `accounts.svc.plus` PostgreSQL schema. +It pulls windowed normalized snapshots from one or more `xray-exporter` +sources, computes deltas from cumulative counters, and writes idempotent usage +and billing facts into the existing `accounts.svc.plus` PostgreSQL schema. ## Endpoints +- `GET /api/ping` - `POST /v1/jobs/collect-and-rate` - `POST /v1/jobs/reconcile` - `GET /healthz` @@ -16,6 +17,10 @@ existing `accounts.svc.plus` PostgreSQL schema. ## Documentation +- `docs/design.md` - current implementation design, main collect-and-rate flow, + idempotency rules, and module boundaries +- `docs/reference/` - code-level reference for `cmd/` and `internal/` + packages, including types, interfaces, and functions - `docs/README.md` - documentation index and verification notes - `docs/architecture.md` - deployment and data-flow diagrams - `docs/api.md` - task API surface and upstream/downstream boundaries diff --git a/docs/README.md b/docs/README.md index f161543..b2deb58 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,21 @@ This directory holds service-owned documentation for `billing-service`. -## Documents +## 总览 + +- [design.md](design.md) - 当前实现下的系统设计、主执行流程、幂等约束和模块边界 + +## 代码参考 + +- [reference/cmd.md](reference/cmd.md) - 进程入口、依赖装配与生命周期 +- [reference/config.md](reference/config.md) - 配置结构、环境变量和配置解析函数 +- [reference/model.md](reference/model.md) - 全部共享数据模型与字段语义 +- [reference/exporter.md](reference/exporter.md) - exporter 客户端与窗口拉取契约 +- [reference/repository.md](reference/repository.md) - 仓储接口、PostgreSQL 实现与表映射 +- [reference/service.md](reference/service.md) - 业务服务、主流程与内部辅助函数 +- [reference/httpapi.md](reference/httpapi.md) - HTTP 路由、handler 与状态码映射 + +## 系统边界与外部契约 - [architecture.md](architecture.md) - deployment topology, billing data flow, and current-vs-target architecture notes @@ -10,6 +24,9 @@ This directory holds service-owned documentation for `billing-service`. read-model boundaries - [multi-node-https-plan.md](multi-node-https-plan.md) - target-state plan for evolving from a single exporter URL to secure multi-node HTTPS ingestion + +## SQL 契约 + - [../sql/billing-service-schema.sql](../sql/billing-service-schema.sql) - reference DDL for the accounting tables `billing-service` depends on diff --git a/docs/api.md b/docs/api.md index e0f3e76..be66a39 100644 --- a/docs/api.md +++ b/docs/api.md @@ -3,6 +3,14 @@ This document describes the current `billing-service` task API plus the upstream and downstream interfaces it depends on. +If you need the code-level mapping behind these interfaces, read: + +- [design.md](design.md) +- [reference/config.md](reference/config.md) +- [reference/exporter.md](reference/exporter.md) +- [reference/httpapi.md](reference/httpapi.md) +- [reference/service.md](reference/service.md) + ## Service endpoints ### `GET /api/ping` @@ -77,48 +85,57 @@ as `reconcile` for operational visibility. ### `xray-exporter` -`billing-service` currently depends on a single exporter base URL and fetches: +`billing-service` currently resolves one or more exporter sources and fetches: -- `GET /v1/snapshots/latest` +- `GET /v1/snapshots/window?since=&until=&limit=&cursor=` -Minimum payload shape: +Current response shape: ```json { - "collected_at": "2026-04-08T12:00:00Z", "node_id": "jp-xhttp-contabo.svc.plus", "env": "prod", - "samples": [ + "snapshots": [ { - "uuid": "uuid-1", - "email": "user@example.com", - "inbound_tag": "xhttp-premium", - "uplink_bytes_total": 1024, - "downlink_bytes_total": 2048 + "collected_at": "2026-04-08T12:00:00Z", + "node_id": "jp-xhttp-contabo.svc.plus", + "env": "prod", + "samples": [ + { + "uuid": "11111111-1111-1111-1111-111111111111", + "email": "user@example.com", + "inbound_tag": "xhttp-premium", + "uplink_bytes_total": 1024, + "downlink_bytes_total": 2048 + } + ] } - ] + ], + "has_more": false } ``` Required fields: -- `collected_at` - `node_id` - `env` -- `samples[].uuid` -- `samples[].email` -- `samples[].inbound_tag` -- `samples[].uplink_bytes_total` -- `samples[].downlink_bytes_total` +- `snapshots[].collected_at` +- `snapshots[].node_id` +- `snapshots[].env` +- `snapshots[].samples[].uuid` +- `snapshots[].samples[].email` +- `snapshots[].samples[].inbound_tag` +- `snapshots[].samples[].uplink_bytes_total` +- `snapshots[].samples[].downlink_bytes_total` ### Target upstream contract -Current production behavior remains `GET /v1/snapshots/latest`, but the target -multi-node design should evolve to: +Current production behavior already uses a windowed pull API. The target +multi-node design should evolve around the same shape: - HTTPS transport for remote exporter pulls - source-specific authentication -- a windowed pull API that supports catch-up and pagination +- stable catch-up and pagination semantics across multiple exporter nodes Recommended target path: @@ -154,7 +171,9 @@ Read-path rules: Runtime environment variables used by the current implementation: - `IMAGE` +- `EXPORTER_SOURCES_JSON` - `EXPORTER_BASE_URL` +- `INTERNAL_SERVICE_TOKEN` - `DATABASE_URL` - `LISTEN_ADDR` - `COLLECT_INTERVAL` @@ -164,6 +183,13 @@ Runtime environment variables used by the current implementation: - `INITIAL_INCLUDED_QUOTA_BYTES` - `INITIAL_BALANCE` +Source configuration rule: + +- prefer `EXPORTER_SOURCES_JSON` as the main source declaration path +- `EXPORTER_BASE_URL` is still supported as the current compatibility path when + JSON source configuration is absent +- upstream requests use `INTERNAL_SERVICE_TOKEN` as Bearer authentication + `IMAGE` rule: - it must contain the full image reference used to start the container diff --git a/docs/architecture.md b/docs/architecture.md index dd11c7f..d12a258 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -4,6 +4,13 @@ Control Plane. It consumes normalized traffic snapshots, computes replay-safe minute deltas, and writes billing truth into PostgreSQL. +Code-level references for the current implementation: + +- [design.md](design.md) +- [reference/service.md](reference/service.md) +- [reference/repository.md](reference/repository.md) +- [reference/httpapi.md](reference/httpapi.md) + ## Deployment topology ```mermaid @@ -47,7 +54,7 @@ flowchart LR Agent["agent-svc-plus
control plane"] Xray -->|"raw per-UUID totals"| Exporter - Exporter -->|"GET /v1/snapshots/latest payload"| Billing + Exporter -->|"GET /v1/snapshots/window"| Billing Agent -->|"schedule collect / reconcile"| Billing Billing -->|"idempotent writes"| PostgreSQL PostgreSQL -->|"usage + ledger + quota facts"| Accounts @@ -82,8 +89,10 @@ schema. ### Current implementation -- `billing-service` pulls from a single `EXPORTER_BASE_URL` -- the upstream snapshot source is `GET /v1/snapshots/latest` +- `billing-service` loads one or more sources from `EXPORTER_SOURCES_JSON` +- if `EXPORTER_SOURCES_JSON` is absent, it still accepts a single + `EXPORTER_BASE_URL` as a compatibility path +- the upstream snapshot source is `GET /v1/snapshots/window` - the service is a task-oriented writer with health, status, and job endpoints - persisted facts land in the existing `accounts.svc.plus` accounting schema @@ -95,8 +104,8 @@ schema. sets without losing `node_id`, `env`, or `inbound_tag` - remote exporter ingestion must work over HTTPS because exporters are not guaranteed to live on the same private network -- the target pull contract must support source checkpoints and replay-safe - catch-up, rather than relying only on one `latest` snapshot +- the target pull contract must keep source checkpoints and replay-safe + catch-up explicit and observable across nodes - `accounts.svc.plus` stays the read model and never delegates user-facing usage/billing reads back to `billing-service` diff --git a/docs/design.md b/docs/design.md new file mode 100644 index 0000000..3a243fb --- /dev/null +++ b/docs/design.md @@ -0,0 +1,266 @@ +# billing-service 设计说明 + +本文档面向维护 `billing-service` 的后端工程师,描述当前代码实现下的系统设计、模块边界、主执行流程,以及回放安全与幂等约束。 + +补充阅读: + +- [architecture.md](architecture.md):部署拓扑、系统边界、目标架构 +- [api.md](api.md):HTTP 任务接口、上下游契约 +- [reference/cmd.md](reference/cmd.md):进程入口与依赖装配 +- [reference/service.md](reference/service.md):服务层与核心业务函数 + +## 1. 定位与职责 + +`billing-service` 是 Cloud Network Billing & Control Plane 里的计费写模型。它不承担面向用户的查询职责,只负责把 exporter 提供的累计流量快照转换成可回放、可幂等的分钟计费事实,并写入 PostgreSQL。 + +当前职责边界: + +- 上游:从一个或多个 `xray-exporter` 拉取窗口化快照 +- 中间:做来源校验、累计值差分、分钟桶写入、账本写入、配额状态更新、同步状态推进 +- 下游:写入与 `accounts.svc.plus` 共享的 `account` 数据库 +- 对外:暴露运维任务接口和状态接口,不提供用户账单查询接口 + +## 2. 运行入口与模块装配 + +唯一进程入口是 [cmd/billing-service/main.go](/Users/shenlan/workspaces/cloud-neutral-toolkit/billing-service/cmd/billing-service/main.go)。 + +启动顺序: + +1. `config.Load()` 读取环境变量并构造 `config.Config` +2. `sql.Open("pgx", cfg.DatabaseURL)` 建立 PostgreSQL 连接 +3. `service.New(cfg, exporter.NewClient(...), repository.NewPostgres(db))` 装配核心依赖 +4. `svc.Start(ctx)` 启动后台定时采集循环 +5. `httpapi.New(svc).Routes()` 注册 HTTP 路由 +6. `http.Server.ListenAndServe()` 启动服务 +7. 收到 `SIGINT` / `SIGTERM` 后触发 `Shutdown` + +### 模块依赖图 + +```mermaid +flowchart LR + Main["cmd/billing-service/main.go"] + Config["internal/config"] + HTTP["internal/httpapi"] + Service["internal/service"] + Exporter["internal/exporter"] + Repo["internal/repository"] + Model["internal/model"] + DB["PostgreSQL"] + Upstream["xray-exporter"] + + Main --> Config + Main --> Exporter + Main --> Repo + Main --> Service + Main --> HTTP + Service --> Config + Service --> Repo + Service --> Model + Service --> Exporter + HTTP --> Service + Exporter --> Config + Exporter --> Model + Repo --> Model + Repo --> DB + Exporter --> Upstream +``` + +模块职责: + +- `internal/config`:环境变量解析、默认值、来源列表装配、镜像信息拆解 +- `internal/model`:上游快照、持久化实体、状态返回对象 +- `internal/exporter`:对 `xray-exporter` 的 HTTP 客户端 +- `internal/repository`:PostgreSQL 读写适配层 +- `internal/service`:窗口推进、差分、计费、幂等与状态机 +- `internal/httpapi`:任务触发和状态查询 HTTP 层 + +## 3. 一次 collect-and-rate 的主流程 + +HTTP `POST /v1/jobs/collect-and-rate` 和后台 ticker 都会进入 `service.Service.RunCollectAndRate`。当前主路径如下: + +```mermaid +sequenceDiagram + participant API as httpapi / ticker + participant S as service.Service + participant R as repository.Repository + participant E as exporter.Client + + API->>S: RunCollectAndRate(ctx, "collect-and-rate") + S->>S: 初始化 JobResult + loop 每个 enabled source + S->>R: GetSourceSyncState(sourceID) + S->>R: UpsertSourceSyncState(记录 attempted_at) + S->>E: FetchWindow(source, since, until, limit, cursor) + loop 每个 snapshot + S->>S: validateSnapshotSource + loop 每个 sample + S->>S: validateSample + S->>R: GetCheckpoint + S->>R: GetBillingProfile + S->>R: UpsertMinuteBucket + S->>R: GetQuotaState + S->>R: UpsertLedger + opt 新账本 + S->>R: UpsertQuotaState + end + S->>R: UpsertCheckpoint + end + end + S->>R: UpsertSourceSyncState(记录 completed_until / succeeded_at) + end + S->>S: record(result) +``` + +### 主流程中的关键决定 + +1. 按来源串行处理 + `RunCollectAndRate` 逐个遍历 `cfg.ExporterSources`,当前实现没有并发采集来源。 + +2. 以 source sync state 推进窗口 + `collectSource` 基于 `billing_source_sync_state.last_completed_until` 计算下一次拉取窗口,并固定带 2 分钟重叠。 + +3. 以 checkpoint 做累计值差分 + `processSample` 从 `traffic_stat_checkpoints` 读取上次累计值,计算本次分钟增量。 + +4. 先写分钟桶,再写账本,再更新配额 + 这保证数据层面先有流量事实,再有收费结果,最后才更新账户余额与剩余额度。 + +5. 以幂等写控制回放 + 分钟桶主键和账本主键都可重复 upsert;账本命中已有记录时,不再次扣减余额或配额。 + +## 4. 回放安全与幂等约束 + +当前实现依赖以下约束保证“重复拉取窗口不会重复扣费”: + +### 4.1 来源窗口重叠 + +- 常量:`sourceWindowOverlap = 2 * time.Minute` +- 每次采集窗口都会与上一轮完成位置重叠 2 分钟 +- 目的:容忍边界分钟重复返回或轻微时钟漂移 + +### 4.2 分钟桶主键去重 + +`traffic_minute_buckets` 主键: + +- `bucket_start` +- `node_id` +- `account_uuid` +- `region` +- `line_code` + +同一分钟、同节点、同账户、同地域、同线路的桶再次写入只会更新,不会生成第二条记录。 + +### 4.3 账本 ID 确定性生成 + +`deterministicLedgerID(bucket)` 基于以下字段生成 SHA-1 UUID: + +- `bucket_start` +- `node_id` +- `account_uuid` +- `region` +- `line_code` + +因此同一个分钟桶重复处理时会命中同一条账本记录。 + +### 4.4 配额只在“新账本”时变更 + +`processSample` 只有在 `UpsertLedger` 返回 `ledgerExisted == false` 时才会: + +- 扣减 `RemainingIncludedQuota` +- 更新 `CurrentBalance` +- 推进 `LastRatedBucketAt` +- 写回 `account_quota_states` + +这避免回放窗口重复扣费。 + +### 4.5 负差分重置保护 + +如果 exporter 返回的累计值比 checkpoint 更小,当前实现认为上游累计计数器发生了重置: + +- 增加 `reset_epoch` +- 直接更新 checkpoint +- 不生成分钟桶、不生成账本、不更新配额 + +这条路径的目标是防止负流量差分污染账本。 + +## 5. 当前配置来源 + +配置由 `internal/config.Load()` 从环境变量读取。关键项: + +- 必填:`DATABASE_URL`、`INTERNAL_SERVICE_TOKEN` +- 来源配置主路径:`EXPORTER_SOURCES_JSON` +- 当前兼容路径:`EXPORTER_BASE_URL` +- 监听与计费默认值:`LISTEN_ADDR`、`COLLECT_INTERVAL`、`SOURCE_REVISION`、`PRICE_PER_BYTE`、`INITIAL_INCLUDED_QUOTA_BYTES`、`INITIAL_BALANCE` + +当前建议: + +- 使用 `EXPORTER_SOURCES_JSON` 明确声明一个或多个来源 +- 仅把 `EXPORTER_BASE_URL` 视为当前仍保留的兼容入口,不作为主设计 + +详细字段见 [reference/config.md](reference/config.md)。 + +## 6. 数据持久化设计 + +当前实现直接依赖以下 PostgreSQL 表: + +- `traffic_stat_checkpoints` +- `traffic_minute_buckets` +- `billing_ledger` +- `account_quota_states` +- `account_billing_profiles` +- `billing_source_sync_state` + +表结构参考: + +- [../sql/billing-service-schema.sql](../sql/billing-service-schema.sql) +- [reference/repository.md](reference/repository.md) + +数据职责分层: + +- checkpoint:累计值差分基线 +- minute bucket:分钟级流量事实 +- ledger:收费事实与余额快照 +- quota state:账户剩余额度、余额、欠费与节流状态 +- billing profile:按账户覆盖默认定价 +- source sync state:来源窗口推进与失败信息 + +## 7. 上下游边界 + +### 上游边界 + +当前代码事实: + +- 使用 `GET /v1/snapshots/window` +- 通过 `Authorization: Bearer ` 认证 +- 查询参数:`since`、`until`、`limit`、`cursor` + +`service.collectSource` 还会基于 `ExpectedNodeID` 和 `ExpectedEnv` 校验来源是否与配置匹配。 + +### 下游边界 + +`billing-service` 只写 PostgreSQL,不承担用户查询。 + +读路径固定为: + +`console.svc.plus` -> `accounts.svc.plus` -> PostgreSQL + +因此 `billing-service` 的 `/v1/status` 仅用于运维,不是用户账单查询 API。 + +## 8. 当前实现与后续演进 + +### 当前实现 + +- 服务内串行遍历来源 +- 依赖 exporter 提供窗口接口 +- 定价规则以默认配置 + `account_billing_profiles` 覆盖为主 +- 写路径与 `accounts.svc.plus` 共享同一个 `account` 数据库 + +### 后续演进 + +未来目标和跨节点 HTTPS 架构,继续以以下文档为准: + +- [architecture.md](architecture.md) +- [api.md](api.md) +- [multi-node-https-plan.md](multi-node-https-plan.md) + +本设计文档不把目标态混入当前实现说明;如果未来代码演进到目标态,应同步更新本文件与 `docs/reference/*`。 diff --git a/docs/reference/cmd.md b/docs/reference/cmd.md new file mode 100644 index 0000000..0b6ae66 --- /dev/null +++ b/docs/reference/cmd.md @@ -0,0 +1,60 @@ +# cmd 包参考 + +本文档覆盖 `cmd/billing-service/main.go`。该目录只有一个进程入口函数,用于装配配置、数据库、服务层和 HTTP 层。 + +## 文件定位 + +- 路径:`cmd/billing-service/main.go` +- 对外职责:启动 `billing-service` 进程 +- 依赖方向:`config` -> `exporter` -> `repository` -> `service` -> `httpapi` + +## 函数 + +### `main` + +- 签名:`func main()` +- 参数:无 +- 返回:无 +- 职责: + - 读取运行配置 + - 打开 PostgreSQL 连接 + - 构造 `service.Service` + - 启动后台采集循环 + - 启动 HTTP 服务 + - 监听退出信号并触发优雅关闭 +- 调用位置:Go 进程入口,由运行时直接调用 +- 主要副作用: + - 读取环境变量 + - 建立数据库连接 + - 启动 goroutine + - 监听网络地址 + - 向日志输出启动信息 +- 错误/边界条件: + - `config.Load()` 返回错误时直接 `log.Fatal` + - `sql.Open` 返回错误时直接 `log.Fatal` + - `ListenAndServe()` 返回非 `http.ErrServerClosed` 时直接 `log.Fatal` + +### 启动流程拆解 + +| 步骤 | 代码调用 | 目的 | +| --- | --- | --- | +| 1 | `config.Load()` | 组装运行配置与默认值 | +| 2 | `sql.Open("pgx", cfg.DatabaseURL)` | 建立 PostgreSQL 驱动连接 | +| 3 | `signal.NotifyContext(...)` | 统一管理退出信号 | +| 4 | `service.New(...)` | 装配业务服务 | +| 5 | `svc.Start(ctx)` | 启动后台定时采集循环 | +| 6 | `httpapi.New(svc).Routes()` | 注册 HTTP 路由 | +| 7 | `server.ListenAndServe()` | 启动 HTTP 服务器 | +| 8 | `server.Shutdown(...)` | 在信号到达后优雅退出 | + +### 依赖装配结果 + +`main` 中构造出的核心依赖如下: + +- 配置对象:`config.Config` +- exporter 客户端:`*exporter.Client` +- 持久化实现:`*repository.Postgres` +- 服务层:`*service.Service` +- HTTP handler:`*httpapi.Handler` + +这意味着 `main` 自身不承载业务逻辑,只负责装配与生命周期管理。 diff --git a/docs/reference/config.md b/docs/reference/config.md new file mode 100644 index 0000000..b2c5e92 --- /dev/null +++ b/docs/reference/config.md @@ -0,0 +1,191 @@ +# config 包参考 + +本文档覆盖 `internal/config/config.go`,说明运行配置结构、环境变量来源和全部生产函数。 + +## 包职责 + +`internal/config` 负责把环境变量解析成运行时可直接使用的 `Config`,并处理: + +- 镜像标识拆解 +- exporter 来源列表构造 +- 默认值和必填项校验 +- 数值型环境变量解析 + +## 类型 + +### `ExporterSource` + +签名:`type ExporterSource struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `SourceID` | `string` | 来源唯一标识,写入 `billing_source_sync_state` 的主键 | +| `BaseURL` | `string` | exporter 基础地址,`FetchWindow` 会在其后拼接 `/v1/snapshots/window` | +| `ExpectedNodeID` | `string` | 期望上游返回的 `snapshot.node_id`,用于来源校验 | +| `ExpectedEnv` | `string` | 期望上游返回的 `snapshot.env`,用于来源校验 | +| `Enabled` | `bool` | 是否参与当前轮采集 | +| `TimeoutSeconds` | `int` | 单个来源 HTTP 请求超时秒数,<= 0 时使用默认值 15 | + +### `Config` + +签名:`type Config struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `ImageRef` | `string` | 原始 `IMAGE` 环境变量 | +| `ImageTag` | `string` | 从 `ImageRef` 中解析出的 tag | +| `ImageCommit` | `string` | 从 tag 中解析出的完整 40 位 commit | +| `ImageVersion` | `string` | 当前实现与 `ImageCommit` 相同,用于 `/api/ping` | +| `ExporterBaseURL` | `string` | 旧单来源兼容入口,对应 `EXPORTER_BASE_URL` | +| `ExporterSources` | `[]ExporterSource` | 当前实际启用的来源清单,推荐由 `EXPORTER_SOURCES_JSON` 生成 | +| `InternalServiceToken` | `string` | 调用 exporter 的 Bearer token | +| `DatabaseURL` | `string` | PostgreSQL DSN | +| `ListenAddr` | `string` | HTTP 监听地址,默认 `:8081` | +| `CollectInterval` | `time.Duration` | 后台定时采集间隔,默认 1 分钟 | +| `DefaultRegion` | `string` | 写入分钟桶时的默认地域 | +| `SourceRevision` | `string` | 当前写路径版本标识,默认 `billing-service-v1` | +| `PricePerByte` | `float64` | 默认每字节价格 | +| `InitialIncludedQuotaBytes` | `int64` | 默认初始包含流量 | +| `InitialBalance` | `float64` | 默认初始余额 | + +### `rawExporterSource` + +签名:`type rawExporterSource struct` + +这是 `EXPORTER_SOURCES_JSON` 的中间反序列化结构,不直接暴露给其他包。 + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `SourceID` | `string` | JSON `source_id` | +| `BaseURL` | `string` | JSON `base_url` | +| `ExpectedNodeID` | `string` | JSON `expected_node_id` | +| `ExpectedEnv` | `string` | JSON `expected_env` | +| `Enabled` | `*bool` | JSON `enabled`,指针用于区分“未传”和“显式 false” | +| `TimeoutSeconds` | `int` | JSON `timeout_seconds` | + +## 环境变量映射 + +| 环境变量 | 是否必填 | 默认值 | 作用 | +| --- | --- | --- | --- | +| `IMAGE` | 否 | 空字符串 | 供 `/api/ping` 返回镜像身份 | +| `EXPORTER_SOURCES_JSON` | 条件必填 | 无 | 推荐的多来源配置入口 | +| `EXPORTER_BASE_URL` | 条件必填 | 无 | 当前兼容路径,仅在未提供 `EXPORTER_SOURCES_JSON` 时使用 | +| `INTERNAL_SERVICE_TOKEN` | 是 | 无 | exporter Bearer token | +| `DATABASE_URL` | 是 | 无 | PostgreSQL 连接串 | +| `LISTEN_ADDR` | 否 | `:8081` | HTTP 监听地址 | +| `COLLECT_INTERVAL` | 否 | `1m` | 后台定时采集间隔 | +| `DEFAULT_REGION` | 否 | 空字符串 | 分钟桶地域字段 | +| `SOURCE_REVISION` | 否 | `billing-service-v1` | 写路径版本号 | +| `PRICE_PER_BYTE` | 否 | `0` | 默认单价 | +| `INITIAL_INCLUDED_QUOTA_BYTES` | 否 | `0` | 默认初始包含流量 | +| `INITIAL_BALANCE` | 否 | `0` | 默认初始余额 | + +## 函数 + +### `Load` + +- 签名:`func Load() (Config, error)` +- 参数:无 +- 返回: + - `Config`:已填充默认值和解析结果的配置对象 + - `error`:必填项缺失、时长解析失败、来源 JSON 解析失败时返回 +- 职责: + - 读取全部环境变量 + - 调用 `parseImageRef` 拆解镜像信息 + - 调用 `loadExporterSources` 生成 `ExporterSources` + - 处理默认值和必填项校验 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用: + - 读取进程环境变量 +- 错误/边界条件: + - `DATABASE_URL` 为空时报错 + - `INTERNAL_SERVICE_TOKEN` 为空时报错 + - `COLLECT_INTERVAL` 无法解析时报错 + - `EXPORTER_SOURCES_JSON` 非法或为空列表时报错 + - 若 `EXPORTER_SOURCES_JSON` 未设置,则要求 `EXPORTER_BASE_URL` 存在 + +### `parseImageRef` + +- 签名:`func parseImageRef(imageRef string) (tag, commit, version string)` +- 参数: + - `imageRef`:完整镜像引用,例如 `registry.example.com/billing-service:sha-<40位SHA>` +- 返回: + - `tag`:冒号后的 tag + - `commit`:当 tag 为完整 40 位 SHA 或 `sha-<40位SHA>` 时提取出的 commit + - `version`:当前实现与 `commit` 相同 +- 职责: + - 为 `/api/ping` 提供可校验的运行镜像身份 +- 调用位置: + - `Load` +- 主要副作用:无 +- 错误/边界条件: + - 空字符串、无冒号、无 tag、tag 不是完整 40 位 SHA 时,返回空结果而非报错 + +### `loadExporterSources` + +- 签名:`func loadExporterSources(legacyBaseURL, rawJSON string) ([]ExporterSource, error)` +- 参数: + - `legacyBaseURL`:旧单来源配置 + - `rawJSON`:`EXPORTER_SOURCES_JSON` 原始内容 +- 返回: + - `[]ExporterSource`:可直接参与采集的来源配置 + - `error`:来源缺失、JSON 非法、必填字段缺失时返回 +- 职责: + - 优先使用 `EXPORTER_SOURCES_JSON` + - 在未设置 JSON 时退回到单来源兼容模式 + - 为来源填充默认超时和 `enabled=true` +- 调用位置: + - `Load` +- 主要副作用:无 +- 错误/边界条件: + - 两种来源配置都缺失时报错 + - JSON 为空数组时报错 + - 某个来源缺少 `source_id` 或 `base_url` 时按来源维度报错 + - `timeout_seconds <= 0` 时回填为 15 + +### `parseFloatEnv` + +- 签名:`func parseFloatEnv(key string, fallback float64) float64` +- 参数: + - `key`:环境变量名 + - `fallback`:解析失败或缺失时返回的默认值 +- 返回: + - 解析成功后的 `float64`,否则返回 `fallback` +- 职责: + - 解析浮点型环境变量,不把格式错误上抛为配置加载失败 +- 调用位置: + - `Load` +- 主要副作用: + - 读取指定环境变量 +- 错误/边界条件: + - 缺失或格式错误时静默回退到默认值 + +### `parseIntEnv` + +- 签名:`func parseIntEnv(key string, fallback int64) int64` +- 参数: + - `key`:环境变量名 + - `fallback`:解析失败或缺失时返回的默认值 +- 返回: + - 解析成功后的 `int64`,否则返回 `fallback` +- 职责: + - 解析整型环境变量,不把格式错误上抛为配置加载失败 +- 调用位置: + - `Load` +- 主要副作用: + - 读取指定环境变量 +- 错误/边界条件: + - 缺失或格式错误时静默回退到默认值 + +## 当前主路径与兼容路径 + +当前实现仍保留两条来源配置路径: + +1. 主路径:`EXPORTER_SOURCES_JSON` +2. 兼容路径:`EXPORTER_BASE_URL` + +维护建议: + +- 新部署应只使用 `EXPORTER_SOURCES_JSON` +- 阅读代码时不要把 `ExporterBaseURL` 误解为主设计,它只是当前尚未移除的兼容入口 diff --git a/docs/reference/exporter.md b/docs/reference/exporter.md new file mode 100644 index 0000000..d67fadd --- /dev/null +++ b/docs/reference/exporter.md @@ -0,0 +1,97 @@ +# exporter 包参考 + +本文档覆盖 `internal/exporter/client.go`,说明与 `xray-exporter` 的 HTTP 调用约定。 + +## 包职责 + +`internal/exporter` 目前只提供一个客户端类型 `Client`。它不负责业务校验,也不负责窗口推进,只负责: + +- 拼接窗口拉取 URL +- 注入查询参数和认证头 +- 发送 HTTP 请求 +- 反序列化为 `model.SnapshotWindowPage` + +## 类型 + +### `Client` + +签名:`type Client struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `serviceToken` | `string` | 写入 `Authorization: Bearer ...` 的内部服务令牌 | + +## 函数 + +### `NewClient` + +- 签名:`func NewClient(serviceToken string) *Client` +- 参数: + - `serviceToken`:调用 exporter 时使用的内部服务令牌 +- 返回: + - `*Client`:已去掉首尾空白的客户端实例 +- 职责: + - 构造 exporter 客户端 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用:无 +- 错误/边界条件: + - 不做空令牌校验;必填校验在 `config.Load()` 中完成 + +### `FetchWindow` + +- 签名:`func (c *Client) FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error)` +- 参数: + - `ctx`:请求上下文 + - `source`:来源配置,提供 `BaseURL`、`SourceID`、`TimeoutSeconds` + - `since`:窗口起始时间 + - `until`:窗口结束时间 + - `limit`:分页大小 + - `cursor`:可选游标,非空时加入查询参数 +- 返回: + - `model.SnapshotWindowPage`:窗口分页结果 + - `error`:构造请求、发送请求、状态码异常、JSON 解码失败时返回 +- 职责: + - 将 `source.BaseURL` 与 `/v1/snapshots/window` 拼接成请求地址 + - 设置 `since`、`until`、`limit`、`cursor` 查询参数 + - 设置 `Accept: application/json` 和 `Authorization: Bearer ` 请求头 + - 基于来源超时时间构造 `http.Client` +- 调用位置: + - `service.collectSource` +- 主要副作用: + - 发起外部 HTTP 请求 +- 错误/边界条件: + - `source.TimeoutSeconds <= 0` 时使用 15 秒默认超时 + - 响应状态码不是 `200 OK` 时返回错误 + - 返回体不是合法 JSON 时返回错误 + +## 请求契约 + +### URL + +- 路径:`/v1/snapshots/window` +- 拼接方式:`url.JoinPath(strings.TrimRight(source.BaseURL, "/"), "/v1/snapshots/window")` + +### 查询参数 + +| 参数 | 含义 | +| --- | --- | +| `since` | RFC3339 格式的窗口起始时间 | +| `until` | RFC3339 格式的窗口结束时间 | +| `limit` | 本页最大快照数量 | +| `cursor` | 非空时追加,用于翻页 | + +### 请求头 + +| Header | 值 | +| --- | --- | +| `Accept` | `application/json` | +| `Authorization` | `Bearer ` | + +## 与 service 层的边界 + +- `Client` 不校验 `snapshot.node_id` / `snapshot.env` +- `Client` 不处理窗口重叠或完成位置推进 +- `Client` 不判断 `NextCursor` 是否为空或是否合法 + +这些行为都在 `service.collectSource` 中完成。 diff --git a/docs/reference/httpapi.md b/docs/reference/httpapi.md new file mode 100644 index 0000000..e72cfd3 --- /dev/null +++ b/docs/reference/httpapi.md @@ -0,0 +1,174 @@ +# httpapi 包参考 + +本文档覆盖 `internal/httpapi/handler.go`,说明 HTTP 路由注册和各处理函数与服务层的映射关系。 + +## 包职责 + +`internal/httpapi` 负责把 `service.Service` 暴露成 HTTP 接口。当前只承担: + +- 健康检查 +- 运行状态查询 +- 立即触发任务 +- 运行镜像身份查询 + +它不包含业务逻辑,所有计费行为都委托给 `service.Service`。 + +## 类型 + +### `Handler` + +签名:`type Handler struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `service` | `*service.Service` | 业务服务实例 | + +## 函数与方法 + +### `New` + +- 签名:`func New(svc *service.Service) *Handler` +- 参数: + - `svc`:业务服务实例 +- 返回: + - `*Handler` +- 职责: + - 构造 HTTP handler +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用:无 +- 错误/边界条件: + - 不校验 `svc` 是否为 `nil` + +### `Routes` + +- 签名:`func (h *Handler) Routes() http.Handler` +- 参数:无 +- 返回: + - `http.Handler`:注册好全部路由的 `*http.ServeMux` +- 职责: + - 注册当前服务支持的全部 HTTP 路径 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用: + - 构造新的 `ServeMux` +- 错误/边界条件: + - 当前路由集合固定,不包含版本协商或中间件 + +当前注册的路由: + +| 路径 | 处理函数 | 说明 | +| --- | --- | --- | +| `/api/ping` | `ping` | 返回运行镜像身份 | +| `/healthz` | `healthz` | 返回最近一次任务健康状态 | +| `/v1/status` | `status` | 返回最近一次任务结果快照 | +| `/v1/jobs/collect-and-rate` | `collectAndRate` | 触发立即采集与计费 | +| `/v1/jobs/reconcile` | `reconcile` | 触发同一执行路径,但 job 名不同 | + +### `ping` + +- 签名:`func (h *Handler) ping(w http.ResponseWriter, r *http.Request)` +- 参数:`w`、`r` +- 返回:无 +- 职责: + - 返回 `service.Ping()` 的结果 +- 调用位置: + - `Routes` +- 主要副作用: + - 写 HTTP 响应 +- 错误/边界条件: + - 当前不限制 HTTP 方法,任何方法都会返回 `200` + +### `healthz` + +- 签名:`func (h *Handler) healthz(w http.ResponseWriter, r *http.Request)` +- 参数:`w`、`r` +- 返回:无 +- 职责: + - 调用 `service.Health()` + - 将布尔健康值映射为 HTTP 状态码与 `status` 文本 +- 调用位置: + - `Routes` +- 主要副作用: + - 写 HTTP 响应 +- 错误/边界条件: + - `ok == false` 时返回 `503` + - 当前不限制 HTTP 方法 + +### `status` + +- 签名:`func (h *Handler) status(w http.ResponseWriter, r *http.Request)` +- 参数:`w`、`r` +- 返回:无 +- 职责: + - 返回最近一次 `service.Status()` +- 调用位置: + - `Routes` +- 主要副作用: + - 写 HTTP 响应 +- 错误/边界条件: + - 当前不限制 HTTP 方法 + +### `collectAndRate` + +- 签名:`func (h *Handler) collectAndRate(w http.ResponseWriter, r *http.Request)` +- 参数:`w`、`r` +- 返回:无 +- 职责: + - 只接受 `POST` + - 调用 `service.RunCollectAndRate(r.Context(), "collect-and-rate")` + - 根据是否返回错误决定 `200` 或 `503` +- 调用位置: + - `Routes` +- 主要副作用: + - 触发一次完整计费执行 + - 写 HTTP 响应 +- 错误/边界条件: + - 非 `POST` 返回 `405 method not allowed` + - 服务层返回错误时仍返回 `JobResult`,但状态码为 `503` + +### `reconcile` + +- 签名:`func (h *Handler) reconcile(w http.ResponseWriter, r *http.Request)` +- 参数:`w`、`r` +- 返回:无 +- 职责: + - 只接受 `POST` + - 调用 `service.RunCollectAndRate(r.Context(), "reconcile")` + - 与 `collectAndRate` 共用同一业务路径,仅 job 名不同 +- 调用位置: + - `Routes` +- 主要副作用: + - 触发一次完整计费执行 + - 写 HTTP 响应 +- 错误/边界条件: + - 非 `POST` 返回 `405` + - 服务层返回错误时状态码为 `503` + +### `writeJSON` + +- 签名:`func writeJSON(w http.ResponseWriter, status int, payload any)` +- 参数: + - `w`:响应写入器 + - `status`:HTTP 状态码 + - `payload`:任意可 JSON 编码的对象 +- 返回:无 +- 职责: + - 统一设置 `Content-Type: application/json` + - 写入状态码和 JSON 响应体 +- 调用位置: + - `ping` + - `healthz` + - `status` + - `collectAndRate` + - `reconcile` +- 主要副作用: + - 写 HTTP Header 和 Body +- 错误/边界条件: + - JSON 编码错误被忽略,调用方不会收到显式错误 + +## 接口设计说明 + +- `collect-and-rate` 和 `reconcile` 在当前实现中只有作业名不同,没有独立 reconciliation 流程 +- `/healthz` 反映的是最近一次作业的结果,不是即时数据库探活 +- `/api/ping` 用于发布追踪,不依赖数据库或 exporter diff --git a/docs/reference/model.md b/docs/reference/model.md new file mode 100644 index 0000000..b9114c6 --- /dev/null +++ b/docs/reference/model.md @@ -0,0 +1,181 @@ +# model 包参考 + +本文档覆盖 `internal/model/types.go`。该包只定义结构体,不包含行为函数,是 `exporter`、`repository`、`service`、`httpapi` 共享的数据契约层。 + +## 1. 上游快照模型 + +### `Sample` + +签名:`type Sample struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `UUID` | `string` | `uuid` | 账户 UUID,服务层会校验其为合法 UUID | +| `Email` | `string` | `email` | 上游样本附带的邮箱,当前服务层不使用此字段做计费决策 | +| `InboundTag` | `string` | `inbound_tag` | 线路标签,写入分钟桶的 `line_code` | +| `UplinkBytesTotal` | `int64` | `uplink_bytes_total` | 截止采样时累计上行字节 | +| `DownlinkBytesTotal` | `int64` | `downlink_bytes_total` | 截止采样时累计下行字节 | + +### `Snapshot` + +签名:`type Snapshot struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `CollectedAt` | `time.Time` | `collected_at` | 快照采集时间,服务层会按分钟截断 | +| `NodeID` | `string` | `node_id` | 上游节点标识 | +| `Env` | `string` | `env` | 环境标识,例如 `prod` | +| `Samples` | `[]Sample` | `samples` | 本次快照中的样本列表 | + +### `SnapshotWindowPage` + +签名:`type SnapshotWindowPage struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `NodeID` | `string` | `node_id` | 分页级别的节点标识 | +| `Env` | `string` | `env` | 分页级别的环境标识 | +| `Snapshots` | `[]Snapshot` | `snapshots` | 窗口拉取结果中的快照列表 | +| `HasMore` | `bool` | `has_more` | 是否还有下一页 | +| `NextCursor` | `string` | `next_cursor` | 下一页游标,当前实现要求其为 RFC3339 时间字符串 | + +## 2. 持久化实体模型 + +### `Checkpoint` + +签名:`type Checkpoint struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `NodeID` | `string` | 存储层节点 ID,实际由 `env:node_id` 组合而成 | +| `AccountUUID` | `string` | 账户 UUID | +| `LastUplinkTotal` | `int64` | 上次记录的累计上行字节 | +| `LastDownlinkTotal` | `int64` | 上次记录的累计下行字节 | +| `LastSeenAt` | `time.Time` | 上次成功处理该样本的采样时间 | +| `XrayRevision` | `string` | 当前写路径版本标识 | +| `ResetEpoch` | `int64` | 检测到累计值回退后的重置计数 | + +### `MinuteBucket` + +签名:`type MinuteBucket struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `BucketStart` | `time.Time` | 分钟桶起始时间,按 UTC 分钟对齐 | +| `NodeID` | `string` | 存储层节点 ID | +| `AccountUUID` | `string` | 账户 UUID | +| `Region` | `string` | 地域代码,当前来自 `cfg.DefaultRegion` | +| `LineCode` | `string` | 线路代码,当前来自 `sample.InboundTag` | +| `UplinkBytes` | `int64` | 当前分钟增量上行字节 | +| `DownlinkBytes` | `int64` | 当前分钟增量下行字节 | +| `TotalBytes` | `int64` | 当前分钟总字节数 | +| `Multiplier` | `float64` | 最终乘数,当前为地域乘数 * 线路乘数 | +| `RatingStatus` | `string` | 当前实现固定写 `rated` | +| `SourceRevision` | `string` | 写路径版本或计费规则版本 | + +### `LedgerEntry` + +签名:`type LedgerEntry struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `ID` | `string` | 账本主键,基于分钟桶确定性生成 | +| `AccountUUID` | `string` | 账户 UUID | +| `BucketStart` | `time.Time` | 对应分钟桶起始时间 | +| `BucketEnd` | `time.Time` | 对应分钟桶结束时间,当前实现为 `BucketStart + 1 分钟` | +| `EntryType` | `string` | 当前实现固定为 `traffic_charge` | +| `RatedBytes` | `int64` | 真正参与计费的字节数,已扣除包含流量 | +| `AmountDelta` | `float64` | 本次金额变化,当前为负值表示扣费 | +| `BalanceAfter` | `float64` | 扣费后的余额 | +| `PricingRuleVersion` | `string` | 实际应用的定价规则版本 | + +### `QuotaState` + +签名:`type QuotaState struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `AccountUUID` | `string` | 账户 UUID | +| `RemainingIncludedQuota` | `int64` | 剩余包含流量字节数 | +| `CurrentBalance` | `float64` | 当前余额 | +| `Arrears` | `bool` | 是否欠费 | +| `ThrottleState` | `string` | 节流状态,当前实现用 `normal` / `throttled` | +| `SuspendState` | `string` | 挂起状态,当前新建状态默认 `active` | +| `LastRatedBucketAt` | `*time.Time` | 最近一次成功计费的分钟桶时间 | +| `EffectiveAt` | `time.Time` | 当前状态生效时间 | + +### `BillingProfile` + +签名:`type BillingProfile struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `AccountUUID` | `string` | 账户 UUID | +| `PackageName` | `string` | 套餐名 | +| `IncludedQuotaBytes` | `int64` | 套餐包含流量 | +| `BasePricePerByte` | `float64` | 基础单价 | +| `RegionMultiplier` | `float64` | 地域乘数 | +| `LineMultiplier` | `float64` | 线路乘数 | +| `PeakMultiplier` | `float64` | 峰时乘数,当前代码未参与计算 | +| `OffPeakMultiplier` | `float64` | 非峰时乘数,当前代码未参与计算 | +| `PricingRuleVersion` | `string` | 计费规则版本号 | + +### `SourceSyncState` + +签名:`type SourceSyncState struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `SourceID` | `string` | 来源 ID | +| `LastCompletedUntil` | `*time.Time` | 最近一次成功推进到的窗口终点 | +| `LastAttemptedAt` | `*time.Time` | 最近一次开始尝试采集的时间 | +| `LastSucceededAt` | `*time.Time` | 最近一次成功完成采集的时间 | +| `LastError` | `string` | 最近一次失败信息 | + +## 3. HTTP 与运行状态模型 + +### `SourceStatus` + +签名:`type SourceStatus struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `SourceID` | `string` | `source_id` | 来源 ID | +| `LastCompletedUntil` | `*time.Time` | `last_completed_until` | 对外暴露的窗口完成位置 | +| `LastAttemptedAt` | `*time.Time` | `last_attempted_at` | 对外暴露的最近尝试时间 | +| `LastSucceededAt` | `*time.Time` | `last_succeeded_at` | 对外暴露的最近成功时间 | +| `LastError` | `string` | `last_error` | 最近错误 | + +### `JobResult` + +签名:`type JobResult struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `Job` | `string` | `job` | 作业名,当前为 `collect-and-rate` 或 `reconcile` | +| `StartedAt` | `time.Time` | `started_at` | 作业开始时间 | +| `FinishedAt` | `time.Time` | `finished_at` | 作业结束时间 | +| `ProcessedSamples` | `int` | `processed_samples` | 成功进入处理主路径的样本数 | +| `WrittenMinutes` | `int` | `written_minutes` | 新写入的分钟桶数 | +| `ReplayedMinutes` | `int` | `replayed_minutes` | 由于回放或重复写命中的分钟数 | +| `Status` | `string` | `status` | `ok` / `partial` / `error` | +| `Error` | `string` | `error` | 汇总错误信息 | +| `SourceStatuses` | `[]SourceStatus` | `source_statuses` | 各来源状态摘要 | + +### `PingInfo` + +签名:`type PingInfo struct` + +| 字段 | 类型 | JSON 字段 | 含义 | +| --- | --- | --- | --- | +| `Image` | `string` | `image` | 原始镜像引用 | +| `Tag` | `string` | `tag` | 解析出的镜像 tag | +| `Commit` | `string` | `commit` | 解析出的完整 commit | +| `Version` | `string` | `version` | 当前实现等于 commit | + +## 4. 使用约束 + +- 所有 `time.Time` 在服务层和仓储层都按 UTC 处理 +- `Snapshot` / `Sample` 是上游输入契约,不应被仓储层直接修改 +- `MinuteBucket`、`LedgerEntry`、`QuotaState` 共同构成计费事实链 +- `JobResult` 和 `SourceStatus` 只代表最近一次执行的内存快照,不是持久化审计日志 diff --git a/docs/reference/repository.md b/docs/reference/repository.md new file mode 100644 index 0000000..c5f453c --- /dev/null +++ b/docs/reference/repository.md @@ -0,0 +1,242 @@ +# repository 包参考 + +本文档覆盖 `internal/repository/repository.go` 与 `internal/repository/postgres.go`,说明仓储接口、PostgreSQL 实现及其与 SQL 表的映射关系。 + +## 包职责 + +`internal/repository` 是服务层和 PostgreSQL 之间的适配层。它负责: + +- 读取 checkpoint、账户配额、计费配置、来源同步状态 +- 对分钟桶、账本、checkpoint、配额状态、来源同步状态做幂等写入 +- 屏蔽 SQL 细节,让服务层只依赖领域对象 + +## 表映射 + +| 领域对象 / 方法 | PostgreSQL 表 | +| --- | --- | +| `Checkpoint` | `traffic_stat_checkpoints` | +| `MinuteBucket` | `traffic_minute_buckets` | +| `LedgerEntry` | `billing_ledger` | +| `QuotaState` | `account_quota_states` | +| `BillingProfile` | `account_billing_profiles` | +| `SourceSyncState` | `billing_source_sync_state` | + +参考 DDL: + +- [../../sql/billing-service-schema.sql](../../sql/billing-service-schema.sql) + +## 接口 + +### `Repository` + +签名:`type Repository interface` + +该接口定义了服务层需要的最小持久化能力。 + +| 方法 | 参数 | 返回 | 作用 | +| --- | --- | --- | --- | +| `GetCheckpoint` | `ctx, nodeID, accountUUID` | `(*model.Checkpoint, error)` | 读取差分基线 | +| `UpsertCheckpoint` | `ctx, checkpoint` | `error` | 更新差分基线 | +| `UpsertMinuteBucket` | `ctx, bucket` | `(bool, error)` | upsert 分钟桶,并返回是否已存在 | +| `UpsertLedger` | `ctx, entry` | `(bool, error)` | upsert 账本,并返回是否已存在 | +| `GetQuotaState` | `ctx, accountUUID` | `(*model.QuotaState, error)` | 读取账户配额状态 | +| `UpsertQuotaState` | `ctx, state` | `error` | 更新账户配额状态 | +| `GetBillingProfile` | `ctx, accountUUID` | `(*model.BillingProfile, error)` | 读取账户定价配置 | +| `GetSourceSyncState` | `ctx, sourceID` | `(*model.SourceSyncState, error)` | 读取来源同步进度 | +| `UpsertSourceSyncState` | `ctx, state` | `error` | 更新来源同步进度 | + +## 类型 + +### `Postgres` + +签名:`type Postgres struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `db` | `*sql.DB` | PostgreSQL 连接池 | + +### `NewPostgres` + +- 签名:`func NewPostgres(db *sql.DB) *Postgres` +- 参数: + - `db`:PostgreSQL 连接池 +- 返回: + - `*Postgres`:仓储实现 +- 职责: + - 构造 `Repository` 的 PostgreSQL 适配器 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用:无 +- 错误/边界条件: + - 不校验 `db` 是否为 `nil` + +## 读取方法 + +### `GetCheckpoint` + +- 签名:`func (p *Postgres) GetCheckpoint(ctx context.Context, nodeID, accountUUID string) (*model.Checkpoint, error)` +- 参数:`ctx`、`nodeID`、`accountUUID` +- 返回: + - 命中时返回 `*model.Checkpoint` + - 未命中时返回 `nil, nil` + - 查询失败时返回错误 +- 职责: + - 从 `traffic_stat_checkpoints` 读取指定节点和账户的累计值基线 +- 调用位置: + - `service.processSample` +- 主要副作用: + - 读取数据库 +- 错误/边界条件: + - `sql.ErrNoRows` 被转换为 `nil, nil` + +### `GetQuotaState` + +- 签名:`func (p *Postgres) GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error)` +- 参数:`ctx`、`accountUUID` +- 返回: + - 命中时返回 `*model.QuotaState` + - 未命中时返回 `nil, nil` + - 查询失败时返回错误 +- 职责: + - 读取 `account_quota_states` + - 把 `last_rated_bucket_at` 的 `sql.NullTime` 转成 `*time.Time` +- 调用位置: + - `service.processSample` +- 主要副作用: + - 读取数据库 +- 错误/边界条件: + - `sql.ErrNoRows` 被转换为 `nil, nil` + +### `GetBillingProfile` + +- 签名:`func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error)` +- 参数:`ctx`、`accountUUID` +- 返回: + - 命中时返回 `*model.BillingProfile` + - 未命中时返回 `nil, nil` + - 查询失败时返回错误 +- 职责: + - 从 `account_billing_profiles` 读取账户级定价配置 +- 调用位置: + - `service.processSample` +- 主要副作用: + - 读取数据库 +- 错误/边界条件: + - `sql.ErrNoRows` 被转换为 `nil, nil` + +### `GetSourceSyncState` + +- 签名:`func (p *Postgres) GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error)` +- 参数:`ctx`、`sourceID` +- 返回: + - 命中时返回 `*model.SourceSyncState` + - 未命中时返回 `nil, nil` + - 查询失败时返回错误 +- 职责: + - 从 `billing_source_sync_state` 读取来源同步状态 + - 把 3 个 `sql.NullTime` 字段转换为可选时间指针 +- 调用位置: + - `service.collectSource` +- 主要副作用: + - 读取数据库 +- 错误/边界条件: + - `sql.ErrNoRows` 被转换为 `nil, nil` + +## 写入方法 + +### `UpsertCheckpoint` + +- 签名:`func (p *Postgres) UpsertCheckpoint(ctx context.Context, checkpoint model.Checkpoint) error` +- 参数:`ctx`、`checkpoint` +- 返回:`error` +- 职责: + - 把最新累计值基线写入 `traffic_stat_checkpoints` + - 以 `(node_id, account_uuid)` 为冲突键更新旧记录 +- 调用位置: + - `service.processSample` +- 主要副作用: + - 写数据库 +- 错误/边界条件: + - SQL 执行失败时直接返回错误 + +### `UpsertMinuteBucket` + +- 签名:`func (p *Postgres) UpsertMinuteBucket(ctx context.Context, bucket model.MinuteBucket) (bool, error)` +- 参数:`ctx`、`bucket` +- 返回: + - `bool`:写入前该分钟桶是否已经存在 + - `error`:查询或写入失败时返回 +- 职责: + - 先调用 `minuteBucketExists` + - 再对 `traffic_minute_buckets` 执行 upsert +- 调用位置: + - `service.processSample` +- 主要副作用: + - 先读后写数据库 +- 错误/边界条件: + - 幂等性依赖主键 `(bucket_start, node_id, account_uuid, region, line_code)` + +### `UpsertLedger` + +- 签名:`func (p *Postgres) UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error)` +- 参数:`ctx`、`entry` +- 返回: + - `bool`:写入前该账本是否已存在 + - `error`:查询或写入失败时返回 +- 职责: + - 先调用 `ledgerExists` + - 再对 `billing_ledger` 执行 upsert +- 调用位置: + - `service.processSample` +- 主要副作用: + - 先读后写数据库 +- 错误/边界条件: + - 幂等性依赖 `entry.ID` + +### `UpsertQuotaState` + +- 签名:`func (p *Postgres) UpsertQuotaState(ctx context.Context, state model.QuotaState) error` +- 参数:`ctx`、`state` +- 返回:`error` +- 职责: + - 将账户状态写入 `account_quota_states` + - 根据 `account_uuid` 做 upsert +- 调用位置: + - `service.processSample` +- 主要副作用: + - 写数据库 +- 错误/边界条件: + - `LastRatedBucketAt == nil` 时按 SQL `NULL` 写入 + +### `UpsertSourceSyncState` + +- 签名:`func (p *Postgres) UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error` +- 参数:`ctx`、`state` +- 返回:`error` +- 职责: + - 将来源同步状态写入 `billing_source_sync_state` + - 根据 `source_id` 做 upsert +- 调用位置: + - `service.collectSource` + - `service.recordSourceFailure` +- 主要副作用: + - 写数据库 +- 错误/边界条件: + - 各时间字段为 `nil` 时按 SQL `NULL` 写入 + +## 内部辅助函数 + +这些函数不属于 `Repository` 接口,但仍是当前生产代码的一部分。 + +| 函数 | 签名 | 参数 | 返回 | 职责 | 调用位置 | 副作用 / 边界条件 | +| --- | --- | --- | --- | --- | --- | --- | +| `minuteBucketExists` | `func (p *Postgres) minuteBucketExists(ctx context.Context, bucket model.MinuteBucket) (bool, error)` | `ctx`、`bucket` | 是否存在、错误 | 检查分钟桶主键是否已存在 | `UpsertMinuteBucket` | `sql.ErrNoRows` 转为 `false, nil` | +| `ledgerExists` | `func (p *Postgres) ledgerExists(ctx context.Context, id string) (bool, error)` | `ctx`、`id` | 是否存在、错误 | 检查账本主键是否已存在 | `UpsertLedger` | `sql.ErrNoRows` 转为 `false, nil` | +| `ensureUTC` | `func ensureUTC(ts time.Time) time.Time` | `ts` | `time.Time` | 返回 UTC 时间 | 当前未被调用 | 无副作用;目前是保留的时间规范化辅助函数 | +| `unexpectedStatus` | `func unexpectedStatus(name string) error` | `name` | `error` | 构造统一错误消息 | 当前未被调用 | 无副作用;目前是保留的错误构造辅助函数 | + +## 设计说明 + +- 仓储层当前没有显式事务封装,`processSample` 的多次写入由服务层按固定顺序驱动 +- `UpsertMinuteBucket` 和 `UpsertLedger` 的“是否已存在”采用“先查再写”的接口语义,便于服务层统计 `WrittenMinutes` / `ReplayedMinutes` +- `ensureUTC` 和 `unexpectedStatus` 当前未进入主路径;阅读代码时不要误判为关键业务流程的一部分 diff --git a/docs/reference/service.md b/docs/reference/service.md new file mode 100644 index 0000000..637dcb1 --- /dev/null +++ b/docs/reference/service.md @@ -0,0 +1,292 @@ +# service 包参考 + +本文档覆盖 `internal/service/service.go`,这是仓库的核心业务层。 + +## 包职责 + +`internal/service` 负责把窗口化快照转换成计费事实。它串联: + +- 来源窗口推进 +- 快照与样本校验 +- 累计值差分 +- 分钟桶写入 +- 账本写入 +- 配额状态推进 +- 来源同步状态记录 +- 最近一次任务结果缓存 + +## 调用链总览 + +```mermaid +flowchart TD + Start["Start"] --> Run["RunCollectAndRate"] + Run --> Collect["collectSource"] + Collect --> ProcSnap["processSnapshot"] + ProcSnap --> ProcSample["processSample"] + ProcSample --> Pricing["resolvePricing"] + Collect --> Fail["recordSourceFailure"] + Run --> Record["record"] + Collect --> Status["sourceStatusFromState"] +``` + +## 类型 + +### `windowSource` + +签名:`type windowSource interface` + +| 方法 | 作用 | +| --- | --- | +| `FetchWindow(...)` | 抽象上游窗口拉取能力,便于用 `exporter.Client` 或测试桩替换 | + +### `Service` + +签名:`type Service struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `cfg` | `config.Config` | 运行配置 | +| `source` | `windowSource` | 上游窗口数据源 | +| `repo` | `repository.Repository` | 仓储接口 | +| `mu` | `sync.Mutex` | 保护最近一次任务状态 | +| `lastResult` | `model.JobResult` | 最近一次任务结果 | +| `lastOK` | `bool` | 最近一次任务是否非 `error` | +| `lastError` | `string` | 最近一次任务错误信息 | + +### `effectivePricing` + +签名:`type effectivePricing struct` + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `includedQuotaBytes` | `int64` | 最终生效的包含流量 | +| `basePricePerByte` | `float64` | 最终生效的单价 | +| `multiplier` | `float64` | 最终生效的乘数 | +| `pricingRuleVersion` | `string` | 最终生效的规则版本 | + +## 公共函数与方法 + +### `New` + +- 签名:`func New(cfg config.Config, source windowSource, repo repository.Repository) *Service` +- 参数: + - `cfg`:运行配置 + - `source`:上游来源适配器 + - `repo`:持久化适配器 +- 返回: + - `*Service` +- 职责: + - 构造核心业务服务 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用:无 +- 错误/边界条件: + - 不校验参数是否为 `nil` + +### `Start` + +- 签名:`func (s *Service) Start(ctx context.Context)` +- 参数: + - `ctx`:进程级生命周期上下文 +- 返回:无 +- 职责: + - 启动后台 goroutine + - 先执行一次 `collect-and-rate` + - 再按 `cfg.CollectInterval` 周期性执行 +- 调用位置: + - `cmd/billing-service/main.go` +- 主要副作用: + - 启动 goroutine + - 周期性访问上游和数据库 +- 错误/边界条件: + - 后台调用忽略 `RunCollectAndRate` 返回值,失败只反映到 `lastResult` / `healthz` + +### `RunCollectAndRate` + +- 签名:`func (s *Service) RunCollectAndRate(ctx context.Context, job string) (model.JobResult, error)` +- 参数: + - `ctx`:请求或后台上下文 + - `job`:作业名,通常为 `collect-and-rate` 或 `reconcile` +- 返回: + - `model.JobResult`:本次执行结果 + - `error`:当整体状态为 `error` 时返回 +- 职责: + - 初始化一次作业结果 + - 遍历全部启用来源 + - 聚合来源级状态与错误 + - 记录最终结果到内存状态 +- 调用位置: + - `Start` + - `httpapi.collectAndRate` + - `httpapi.reconcile` +- 主要副作用: + - 持有 `s.mu` + - 调用上游来源和仓储 + - 更新 `lastResult`、`lastOK`、`lastError` +- 错误/边界条件: + - 没有任何启用来源时把状态置为 `error` + - 部分来源失败但仍处理到部分样本时,把状态置为 `partial` + - 状态为 `error` 时返回聚合错误 + +### `Status` + +- 签名:`func (s *Service) Status() model.JobResult` +- 参数:无 +- 返回:最近一次 `JobResult` +- 职责: + - 在线返回最近一次作业结果快照 +- 调用位置: + - `httpapi.status` +- 主要副作用: + - 获取互斥锁 +- 错误/边界条件: + - 进程刚启动且尚未执行时会返回零值 `JobResult` + +### `Health` + +- 签名:`func (s *Service) Health() (bool, string)` +- 参数:无 +- 返回: + - `bool`:最近一次任务是否非 `error` + - `string`:最近一次错误字符串 +- 职责: + - 为 `/healthz` 提供简化健康状态 +- 调用位置: + - `httpapi.healthz` +- 主要副作用: + - 获取互斥锁 +- 错误/边界条件: + - 健康状态只代表最近一次执行,不代表数据库连通性即时探测 + +### `Ping` + +- 签名:`func (s *Service) Ping() model.PingInfo` +- 参数:无 +- 返回:`model.PingInfo` +- 职责: + - 返回运行镜像身份信息 +- 调用位置: + - `httpapi.ping` +- 主要副作用:无 +- 错误/边界条件: + - 当 `IMAGE` 未正确注入时,返回空字段而不是伪造值 + +## 核心内部流程函数 + +### `collectSource` + +- 签名:`func (s *Service) collectSource(ctx context.Context, source config.ExporterSource, result *model.JobResult) (model.SourceStatus, error)` +- 参数: + - `ctx`:上下文 + - `source`:单个来源配置 + - `result`:当前作业结果,用于累计统计 +- 返回: + - `model.SourceStatus`:该来源的同步状态摘要 + - `error`:该来源失败时返回 +- 职责: + - 读取和初始化来源同步状态 + - 计算当前窗口的 `since` / `until` + - 分页拉取来源快照 + - 校验来源 `node_id` / `env` + - 调用 `processSnapshot` + - 更新完成位置与成功时间 +- 调用位置: + - `RunCollectAndRate` +- 主要副作用: + - 读写 `billing_source_sync_state` + - 调用外部 exporter + - 推进 `result.SourceStatuses` +- 错误/边界条件: + - `HasMore=true` 但 `NextCursor` 为空时视为错误 + - `NextCursor` 不是 RFC3339 时间时视为错误 + - 当 `since.After(until)` 时,认为本轮无需采集,但仍会推进 `last_completed_until` + +### `processSnapshot` + +- 签名:`func (s *Service) processSnapshot(ctx context.Context, snapshot model.Snapshot, result *model.JobResult) (bool, error)` +- 参数: + - `ctx`:上下文 + - `snapshot`:单个快照 + - `result`:当前作业结果 +- 返回: + - `bool`:该快照是否至少成功处理了一个样本 + - `error`:某个样本进入致命错误路径时返回 +- 职责: + - 遍历快照中的全部样本 + - 调用 `validateSample` + - 对合法样本调用 `processSample` + - 递增 `ProcessedSamples` +- 调用位置: + - `collectSource` +- 主要副作用: + - 更新 `result` +- 错误/边界条件: + - 非法 UUID 不会中断整张快照,只把作业标记为 `partial` + - `processSample` 返回错误时中断该快照处理 + +### `processSample` + +- 签名:`func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sample model.Sample, result *model.JobResult) (bool, error)` +- 参数: + - `ctx`:上下文 + - `snapshot`:所属快照 + - `sample`:单个账户样本 + - `result`:当前作业结果 +- 返回: + - `bool`:该样本是否成功进入分钟桶/账本处理路径 + - `error`:读取或写入持久化失败时返回 +- 职责: + - 组合 `storageNodeID` + - 读取 checkpoint 并计算累计值差分 + - 检测负差分并执行 reset 保护 + - 读取计费配置与配额状态 + - 生成并 upsert 分钟桶 + - 生成并 upsert 账本 + - 在新账本场景下更新配额状态 + - 最后更新 checkpoint +- 调用位置: + - `processSnapshot` +- 主要副作用: + - 访问 5 张表:checkpoint、minute bucket、ledger、quota state、billing profile + - 可能修改 `result.WrittenMinutes` / `result.ReplayedMinutes` +- 错误/边界条件: + - 负差分时仅更新 checkpoint 并返回 `false, nil` + - `minuteExisted` 会增加一次 `ReplayedMinutes` + - `ledgerExisted` 也会增加一次 `ReplayedMinutes` + - 当 `quota == nil` 时会按默认配置初始化账户状态 + +## 辅助函数分组 + +### 定价与数值辅助 + +| 函数 | 签名 | 参数 | 返回 | 职责 | 调用位置 | 副作用 / 边界条件 | +| --- | --- | --- | --- | --- | --- | --- | +| `resolvePricing` | `func resolvePricing(profile *model.BillingProfile, cfg config.Config) effectivePricing` | `profile`、`cfg` | `effectivePricing` | 计算最终生效的包含流量、单价、乘数、规则版本 | `processSample` | `profile == nil` 时回退到配置默认值;乘数 <= 0 时回退到 `1.0` | +| `minInt64` | `func minInt64(a, b int64) int64` | `a`、`b` | `int64` | 返回较小值,用于计算本次消耗的包含流量 | `processSample` | 无副作用 | + +### 校验与标识辅助 + +| 函数 | 签名 | 参数 | 返回 | 职责 | 调用位置 | 副作用 / 边界条件 | +| --- | --- | --- | --- | --- | --- | --- | +| `validateSample` | `func validateSample(sample model.Sample) error` | `sample` | `error` | 校验样本 UUID 非空且可解析 | `processSnapshot` | 非法 UUID 返回错误字符串 | +| `validateSnapshotSource` | `func validateSnapshotSource(snapshot model.Snapshot, source config.ExporterSource) error` | `snapshot`、`source` | `error` | 校验来源返回的 `node_id` / `env` 是否符合配置 | `collectSource` | 仅当 `ExpectedNodeID` 或 `ExpectedEnv` 非空时才校验 | +| `deterministicLedgerID` | `func deterministicLedgerID(bucket model.MinuteBucket) string` | `bucket` | `string` | 基于分钟桶生成确定性账本 ID | `processSample` | 使用 SHA-1 UUID;同桶重复处理会命中同一 ID | +| `composeStorageNodeID` | `func composeStorageNodeID(env, nodeID string) string` | `env`、`nodeID` | `string` | 组合存储层节点 ID | `processSample` | `env` 为空时仅返回 `nodeID` | + +### 错误与状态辅助 + +| 函数 | 签名 | 参数 | 返回 | 职责 | 调用位置 | 副作用 / 边界条件 | +| --- | --- | --- | --- | --- | --- | --- | +| `joinError` | `func joinError(existing, next string) string` | `existing`、`next` | `string` | 把多段错误信息拼接为一个字符串 | `RunCollectAndRate`、`processSnapshot`、`recordSourceFailure` | 已有错误时用 `; ` 连接 | +| `sourceStatusFromState` | `func sourceStatusFromState(state model.SourceSyncState) model.SourceStatus` | `state` | `model.SourceStatus` | 把持久化状态转换成对外返回状态 | `collectSource`、`recordSourceFailure` | 会复制时间指针,避免共享内部地址 | +| `copyTimePtr` | `func copyTimePtr(value *time.Time) *time.Time` | `value` | `*time.Time` | 复制时间指针并转为 UTC | `sourceStatusFromState`、测试中的 clone 辅助 | `nil` 输入返回 `nil` | +| `recordSourceFailure` | `func (s *Service) recordSourceFailure(ctx context.Context, state model.SourceSyncState, err error) (model.SourceStatus, error)` | `ctx`、`state`、`err` | `model.SourceStatus`、`error` | 记录来源失败信息并返回状态摘要 | `collectSource` | 若持久化失败,会把“持久化失败信息”附加到 `LastError` 文本,但仍返回原始 `err` | +| `record` | `func (s *Service) record(result model.JobResult)` | `result` | 无 | 刷新内存中的最近一次任务状态 | `RunCollectAndRate` | 更新 `lastResult`、`lastError`、`lastOK` | + +## 关键不变量 + +- `RunCollectAndRate` 使用 `s.mu` 串行化整次执行,避免并发更新 `lastResult` +- 来源窗口推进状态来自 `billing_source_sync_state` +- 流量幂等依赖分钟桶主键与确定性账本 ID 的双重约束 +- 配额状态只有在新账本写入时才前进 +- checkpoint 永远在样本处理末尾写回,负差分时走重置路径