From 8cf03ba207649e2992b1f356ee6327808c98e12a Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Sun, 12 Apr 2026 13:14:41 +0800 Subject: [PATCH] feat: add multi-source billing ingestion --- README.md | 8 + cmd/billing-service/main.go | 2 +- docs/README.md | 48 +++ docs/api.md | 149 ++++++++ docs/architecture.md | 124 +++++++ docs/multi-node-https-plan.md | 199 ++++++++++ internal/config/config.go | 92 ++++- internal/config/config_test.go | 32 ++ internal/exporter/client.go | 50 ++- internal/model/types.go | 43 ++- internal/repository/postgres.go | 75 ++++ internal/repository/repository.go | 2 + internal/service/postgres_acceptance_test.go | 35 +- internal/service/service.go | 203 ++++++++-- internal/service/service_test.go | 372 +++++++++++++------ sql/billing-service-schema.sql | 89 +++++ testdata/postgres/init.sql | 9 + 17 files changed, 1344 insertions(+), 188 deletions(-) create mode 100644 docs/README.md create mode 100644 docs/api.md create mode 100644 docs/architecture.md create mode 100644 docs/multi-node-https-plan.md create mode 100644 internal/config/config_test.go create mode 100644 sql/billing-service-schema.sql diff --git a/README.md b/README.md index 5e7032a..f71c2c6 100644 --- a/README.md +++ b/README.md @@ -13,3 +13,11 @@ existing `accounts.svc.plus` PostgreSQL schema. 
- `POST /v1/jobs/reconcile` - `GET /healthz` - `GET /v1/status` + +## Documentation + +- `docs/README.md` - documentation index and verification notes +- `docs/architecture.md` - deployment and data-flow diagrams +- `docs/api.md` - task API surface and upstream/downstream boundaries +- `sql/billing-service-schema.sql` - bootstrap/reference DDL aligned with the + current `accounts.svc.plus` accounting schema diff --git a/cmd/billing-service/main.go b/cmd/billing-service/main.go index 7b89737..b72d0c5 100644 --- a/cmd/billing-service/main.go +++ b/cmd/billing-service/main.go @@ -34,7 +34,7 @@ func main() { svc := service.New( cfg, - exporter.NewClient(cfg.ExporterBaseURL), + exporter.NewClient(cfg.InternalServiceToken), repository.NewPostgres(db), ) svc.Start(ctx) diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..f161543 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,48 @@ +# billing-service docs + +This directory holds service-owned documentation for `billing-service`. + +## Documents + +- [architecture.md](architecture.md) - deployment topology, billing data flow, + and current-vs-target architecture notes +- [api.md](api.md) - task endpoints, upstream snapshot contract, and downstream + read-model boundaries +- [multi-node-https-plan.md](multi-node-https-plan.md) - target-state plan for + evolving from a single exporter URL to secure multi-node HTTPS ingestion +- [../sql/billing-service-schema.sql](../sql/billing-service-schema.sql) - + reference DDL for the accounting tables `billing-service` depends on + +## Scope + +These docs describe the `billing-service` role inside the Cloud Network Billing +& Control Plane. 
+ +- `billing-service` is the task-oriented write model +- `accounts.svc.plus` is the PostgreSQL-backed read model +- `console.svc.plus` is the presentation layer and does not query + `billing-service` directly + +System-wide contracts still live in +`github-org-cloud-neutral-toolkit/docs/architecture/network-billing-contracts.md`. + +## Deployment Verification + +Local or operator dry-run validation: + +```bash +cd /path/to/cloud-neutral-toolkit/playbooks +export DATABASE_URL=postgres://... +ANSIBLE_CONFIG=./ansible.cfg \ +ansible-playbook -i ./inventory.ini -D -C ./deploy_billing_service.yml -l jp_xhttp_contabo_host +``` + +Notes: + +- `DATABASE_URL` must be exported before running `deploy_billing_service.yml` +- on `jp-xhttp-contabo.svc.plus`, `DATABASE_URL` should reference the same + `account` database used by `accounts.svc.plus` +- check mode may report predicted changes; the goal is to pass the preflight + assertion and render a valid deployment plan +- GitHub Actions uses the `BILLING_SERVICE_DATABASE_URL` secret to satisfy the + same precondition in the `deploy-billing-service` job diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..c847fc8 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,149 @@ +# billing-service API and interfaces + +This document describes the current `billing-service` task API plus the +upstream and downstream interfaces it depends on. + +## Service endpoints + +### `GET /healthz` + +Returns service health derived from the most recent collect-and-rate execution. + +Example response: + +```json +{ + "status": "ok", + "message": "" +} +``` + +### `GET /v1/status` + +Returns the latest in-memory job result snapshot. 
+ +Key fields: + +- `job` +- `started_at` +- `finished_at` +- `processed_samples` +- `written_minutes` +- `replayed_minutes` +- `status` +- `error` + +### `POST /v1/jobs/collect-and-rate` + +Triggers an immediate snapshot pull from `xray-exporter`, computes minute +deltas, rates chargeable bytes, and writes replay-safe facts into PostgreSQL. + +Behavior: + +- method must be `POST` +- returns `200` when the run completes without a fatal service error +- returns `503` when upstream fetch or persistence fails hard enough to mark the + run unavailable + +### `POST /v1/jobs/reconcile` + +Triggers the same execution path as collect-and-rate, but records the job name +as `reconcile` for operational visibility. + +## Upstream dependency + +### `xray-exporter` + +`billing-service` currently depends on a single exporter base URL and fetches: + +- `GET /v1/snapshots/latest` + +Minimum payload shape: + +```json +{ + "collected_at": "2026-04-08T12:00:00Z", + "node_id": "jp-xhttp-contabo.svc.plus", + "env": "prod", + "samples": [ + { + "uuid": "uuid-1", + "email": "user@example.com", + "inbound_tag": "xhttp-premium", + "uplink_bytes_total": 1024, + "downlink_bytes_total": 2048 + } + ] +} +``` + +Required fields: + +- `collected_at` +- `node_id` +- `env` +- `samples[].uuid` +- `samples[].email` +- `samples[].inbound_tag` +- `samples[].uplink_bytes_total` +- `samples[].downlink_bytes_total` + +### Target upstream contract + +Current production behavior remains `GET /v1/snapshots/latest`, but the target +multi-node design should evolve to: + +- HTTPS transport for remote exporter pulls +- source-specific authentication +- a windowed pull API that supports catch-up and pagination + +Recommended target path: + +- `GET /v1/snapshots/window?since=&until=&limit=&cursor=` + +Target-state expectations: + +- remote pulls use `https://` exporter base URLs +- TLS verification stays enabled +- each source can be authenticated independently +- responses can be replayed safely from source 
checkpoints without duplicate + billing writes + +## Downstream reads + +User-facing reads do not go through `billing-service`. The read model is +`accounts.svc.plus`, backed by PostgreSQL. + +Relevant downstream APIs: + +- `GET /api/account/usage/summary` +- `GET /api/account/usage/buckets` +- `GET /api/account/billing/summary` + +Read-path rules: + +- `billing-service` does not expose user-facing usage or billing query APIs +- `accounts.svc.plus` reads PostgreSQL-backed usage and billing facts +- `console.svc.plus` queries `accounts.svc.plus`, not `billing-service` + +## Configuration inputs + +Runtime environment variables used by the current implementation: + +- `EXPORTER_BASE_URL` +- `DATABASE_URL` +- `LISTEN_ADDR` +- `COLLECT_INTERVAL` +- `DEFAULT_REGION` +- `SOURCE_REVISION` +- `PRICE_PER_BYTE` +- `INITIAL_INCLUDED_QUOTA_BYTES` +- `INITIAL_BALANCE` + +`DATABASE_URL` rule: + +- it must point to the same `account` database that `accounts.svc.plus` uses +- on `jp-xhttp-contabo.svc.plus`, the current accounts containers use + `DB_HOST=stunnel-client`, `DB_PORT=15432`, and `DB_NAME=account` +- `billing-service` should follow that same target so user-facing reads in + `accounts.svc.plus` see the exact facts written by `billing-service` diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..dd11c7f --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,124 @@ +# billing-service architecture + +`billing-service` is the billing write model in the Cloud Network Billing & +Control Plane. It consumes normalized traffic snapshots, computes replay-safe +minute deltas, and writes billing truth into PostgreSQL. 
+ +## Deployment topology + +```mermaid +flowchart TB + subgraph Node["Single-node VPS / billing host"] + Agent["agent-svc-plus"] + Exporter["xray-exporter"] + Billing["billing-service"] + StunnelClient["stunnel-client"] + end + + subgraph DBHost["postgresql.svc.plus"] + StunnelServer["stunnel-server"] + PostgreSQL["PostgreSQL"] + end + + subgraph ReadPath["Read and presentation"] + Accounts["accounts.svc.plus"] + Console["console.svc.plus"] + end + + Agent --> Billing + Agent -. coordination .-> Exporter + Billing --> StunnelClient + StunnelClient --> StunnelServer + StunnelServer --> PostgreSQL + Accounts --> PostgreSQL + Console --> Accounts +``` + +## Data flow + +```mermaid +flowchart LR + Xray["xray-core
raw cumulative counters"] + Exporter["xray-exporter
translation layer"] + Billing["billing-service
minute delta + rating writer"] + PostgreSQL["PostgreSQL
billing source of truth"] + Accounts["accounts.svc.plus
read model API"] + Console["console.svc.plus
presentation"] + Agent["agent-svc-plus
control plane"] + + Xray -->|"raw per-UUID totals"| Exporter + Exporter -->|"GET /v1/snapshots/window pages"| Billing + Agent -->|"schedule collect / reconcile"| Billing + Billing -->|"idempotent writes"| PostgreSQL + PostgreSQL -->|"usage + ledger + quota facts"| Accounts + Accounts -->|"account usage / billing summary APIs"| Console +``` + +## Role boundaries + +- `agent-svc-plus`: control plane scheduling, reconciliation triggers, and + future automation hooks +- `xray-exporter`: collection and translation layer; it exposes normalized + snapshots and Prometheus metrics +- `billing-service`: billing writer; it computes positive minute deltas and + persists replay-safe facts +- `accounts.svc.plus`: PostgreSQL-backed read model; it aggregates usage, + billing, and quota state for user-facing APIs +- `console.svc.plus`: presentation layer; it reads from `accounts.svc.plus` + only + +## Shared database contract + +`billing-service` and `accounts.svc.plus` share the same account database and +schema. 
+ +- the database name remains `account` +- on `jp-xhttp-contabo.svc.plus`, `accounts.svc.plus` reaches it through + `stunnel-client:15432` +- `billing-service` must point `DATABASE_URL` at that same PostgreSQL target so + writes and reads stay in one source of truth + +## Current implementation vs target architecture + +### Current implementation + +- `billing-service` pulls from each enabled `EXPORTER_SOURCES_JSON` source, or from a single legacy `EXPORTER_BASE_URL` +- the upstream snapshot source is `GET /v1/snapshots/window`, called with the `INTERNAL_SERVICE_TOKEN` bearer token +- the service is a task-oriented writer with health, status, and job endpoints +- persisted facts land in the existing `accounts.svc.plus` accounting schema + +### Target architecture + +- `billing-service` remains the write model, but evolves into a multi-node + aggregation point +- the write path handles multiple exporter feeds or equivalent multi-node sample + sets without losing `node_id`, `env`, or `inbound_tag` +- remote exporter ingestion must work over HTTPS because exporters are not + guaranteed to live on the same private network +- the target pull contract must support source checkpoints and replay-safe + catch-up, rather than relying only on one `latest` snapshot +- `accounts.svc.plus` stays the read model and never delegates user-facing + usage/billing reads back to `billing-service` + +## Target multi-node ingress requirements + +For the target architecture, `billing-service` should treat exporter nodes as +remote sources, not implicit local sidecars. 
+ +- upstream pulls should use HTTPS with certificate validation enabled +- prefer mTLS between `billing-service` and each `xray-exporter` +- if mTLS is not ready, use HTTPS plus per-source bearer credentials +- source progress must be tracked per exporter node so retries and catch-up stay + bounded and observable +- billing completeness must come from windowed, replay-safe collection, not + from assuming the newest snapshot implies nothing was missed +- minute-level sync drift is acceptable; the target is short-window eventual + consistency rather than second-level strong consistency + +## Invariants + +- PostgreSQL is the only billing source of truth +- `billing-service` and `accounts.svc.plus` share the same `account` database +- Prometheus and Grafana remain observability only +- `console.svc.plus` does not read PostgreSQL or `billing-service` directly +- `accounts.svc.plus` does not use Prometheus as a billing data source diff --git a/docs/multi-node-https-plan.md b/docs/multi-node-https-plan.md new file mode 100644 index 0000000..a39b306 --- /dev/null +++ b/docs/multi-node-https-plan.md @@ -0,0 +1,199 @@ +# billing-service multi-node HTTPS ingestion plan + +This document defines the target evolution path for `billing-service` from the +current single `EXPORTER_BASE_URL` pull model to a secure multi-node ingestion +model. + +## Goal + +Keep `billing-service` as the single billing write model, but let it ingest +snapshots from many remote `xray-exporter` instances over HTTPS without +assuming private-network reachability. + +## Consistency budget + +Target state does not require second-level strong consistency. 
+ +- minute-level sync drift is acceptable +- the system should be treated as eventually consistent across a short + multi-minute window +- user-facing reads in `accounts.svc.plus` and `console.svc.plus` may lag the + newest exporter counters briefly +- billing correctness matters more than immediate freshness + +Operational meaning: + +- collector retries may intentionally overlap prior windows +- delayed exporter delivery should be repaired by later collect or reconcile + runs +- the write model must converge to the correct minute buckets and ledger state + without double charging + +## Why the current model is not enough + +Today: + +- `billing-service` accepts one `EXPORTER_BASE_URL` +- it fetches one `GET /v1/snapshots/latest` payload +- it assumes the latest snapshot is enough to advance billing state + +This is fine for a single local exporter, but it is not enough for: + +- multiple proxy nodes +- exporters reachable only over public or cross-region networks +- outage recovery where `latest` alone cannot prove whether intermediate + windows were missed +- source-specific authentication and certificate validation + +## Target design + +### 1. Multi-source registry instead of one base URL + +Target state replaces the single `EXPORTER_BASE_URL` dependency with a source +registry owned by `billing-service`. + +Each configured source should define at least: + +- `source_id` +- `node_id` +- `env` +- `base_url` +- `enabled` +- `auth_mode` +- `credential_ref` +- `ca_bundle_ref` or trusted issuer reference +- `server_name` +- `collect_interval` +- `request_timeout` + +Rules: + +- target `base_url` must be `https://...` +- `node_id` and `env` must match what the exporter emits +- one source maps to one exporter endpoint, even if several sources later share + the same network path + +### 2. HTTPS-only upstream interaction + +Target state requires secure transport for remote exporter pulls. 
+ +Security rules: + +- remote exporter pulls must use HTTPS +- certificate verification must stay enabled +- `billing-service` must not rely on insecure skip-verify mode +- prefer mTLS for service-to-service trust +- if mTLS is not yet available, use HTTPS plus a per-source bearer token +- credentials must be scoped per source, not shared globally across all nodes + +Recommended trust order: + +1. HTTPS + mTLS +2. HTTPS + bearer token + pinned CA / trusted issuer + +### 3. Completeness-first pull contract + +To make multi-node billing safe, the upstream contract must evolve from +`latest` to a windowed pull API. + +Recommended target contract: + +`GET /v1/snapshots/window?since=&until=&limit=&cursor=` + +Response shape should include: + +- `source_id` +- `node_id` +- `env` +- `window_start` +- `window_end` +- `items[]` +- `next_cursor` +- `has_more` +- `emitted_at` + +Each item should still carry: + +- `collected_at` +- `samples[].uuid` +- `samples[].email` +- `samples[].inbound_tag` +- `samples[].uplink_bytes_total` +- `samples[].downlink_bytes_total` + +Why this matters: + +- `latest` is enough for observability, but not enough to prove billing + completeness +- windowed pagination lets `billing-service` resume from checkpoints and catch + up after transient failures + +### 4. Source checkpoints and replay safety + +`billing-service` should track fetch progress per source, not globally. + +Recommended source checkpoint fields: + +- `source_id` +- `last_successful_until` +- `last_cursor` +- `last_attempted_at` +- `last_succeeded_at` +- `last_error` + +Collection behavior: + +- pull per source using that source's last successful checkpoint +- always overlap a small safety window during retries +- rely on idempotent minute-bucket writes so overlap does not double-charge +- expose source-level health in `/v1/status` +- treat short multi-minute lag as acceptable if replay convergence is preserved + +### 5. 
Safe write semantics + +Security alone is not enough; the write path must remain replay-safe. + +Target write-path rules: + +- billing facts remain keyed by `node_id`, `env`, `uuid`, `inbound_tag`, and + bucket time +- re-fetching the same source window must not duplicate usage or ledger rows +- reconcile jobs must be able to replay a source or time range intentionally + +## Recommended rollout + +### Phase 1. Preserve current runtime + +- keep `EXPORTER_BASE_URL` as legacy single-source mode +- keep `GET /v1/snapshots/latest` for current deployment compatibility + +### Phase 2. Add source registry support + +- introduce a multi-source config model +- let `billing-service` iterate sources internally +- keep single-source config as a compatibility shim + +### Phase 3. Add HTTPS window API to exporter + +- extend `xray-exporter` with a secure windowed snapshot API +- add source authentication and certificate validation requirements + +### Phase 4. Dual-read migration + +- let `billing-service` support both: + - legacy single-source `latest` + - target multi-source HTTPS window pulls +- compare source-level completeness and write counts during rollout + +### Phase 5. 
Make multi-source HTTPS the default + +- require HTTPS for remote exporter sources +- reserve plain HTTP for explicit same-host dev or local-only modes +- retire single global `EXPORTER_BASE_URL` as the primary production contract + +## Non-goals + +- exposing `billing-service` as a user-facing query API +- moving billing truth into Prometheus +- weakening TLS verification to simplify rollout +- making `accounts.svc.plus` call `billing-service` for runtime reads diff --git a/internal/config/config.go b/internal/config/config.go index 6954fae..181226e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/json" "fmt" "os" "strconv" @@ -8,8 +9,19 @@ import ( "time" ) +type ExporterSource struct { + SourceID string + BaseURL string + ExpectedNodeID string + ExpectedEnv string + Enabled bool + TimeoutSeconds int +} + type Config struct { ExporterBaseURL string + ExporterSources []ExporterSource + InternalServiceToken string DatabaseURL string ListenAddr string CollectInterval time.Duration @@ -20,13 +32,23 @@ type Config struct { InitialBalance float64 } +type rawExporterSource struct { + SourceID string `json:"source_id"` + BaseURL string `json:"base_url"` + ExpectedNodeID string `json:"expected_node_id"` + ExpectedEnv string `json:"expected_env"` + Enabled *bool `json:"enabled"` + TimeoutSeconds int `json:"timeout_seconds"` +} + func Load() (Config, error) { cfg := Config{ - ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"), - DatabaseURL: strings.TrimSpace(os.Getenv("DATABASE_URL")), - ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")), - DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")), - SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")), + ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"), + InternalServiceToken: strings.TrimSpace(os.Getenv("INTERNAL_SERVICE_TOKEN")), + DatabaseURL: 
strings.TrimSpace(os.Getenv("DATABASE_URL")), + ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")), + DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")), + SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")), } if cfg.ListenAddr == "" { cfg.ListenAddr = ":8081" @@ -35,12 +57,18 @@ func Load() (Config, error) { cfg.SourceRevision = "billing-service-v1" } - if cfg.ExporterBaseURL == "" { - return Config{}, fmt.Errorf("EXPORTER_BASE_URL is required") - } if cfg.DatabaseURL == "" { return Config{}, fmt.Errorf("DATABASE_URL is required") } + if cfg.InternalServiceToken == "" { + return Config{}, fmt.Errorf("INTERNAL_SERVICE_TOKEN is required") + } + + sources, err := loadExporterSources(cfg.ExporterBaseURL, strings.TrimSpace(os.Getenv("EXPORTER_SOURCES_JSON"))) + if err != nil { + return Config{}, err + } + cfg.ExporterSources = sources interval := strings.TrimSpace(os.Getenv("COLLECT_INTERVAL")) if interval == "" { @@ -59,6 +87,54 @@ func Load() (Config, error) { return cfg, nil } +func loadExporterSources(legacyBaseURL, rawJSON string) ([]ExporterSource, error) { + if rawJSON == "" { + if legacyBaseURL == "" { + return nil, fmt.Errorf("EXPORTER_SOURCES_JSON or EXPORTER_BASE_URL is required") + } + return []ExporterSource{{ + SourceID: "default", + BaseURL: strings.TrimRight(strings.TrimSpace(legacyBaseURL), "/"), + Enabled: true, + TimeoutSeconds: 15, + }}, nil + } + + var rawSources []rawExporterSource + if err := json.Unmarshal([]byte(rawJSON), &rawSources); err != nil { + return nil, fmt.Errorf("parse EXPORTER_SOURCES_JSON: %w", err) + } + if len(rawSources) == 0 { + return nil, fmt.Errorf("EXPORTER_SOURCES_JSON must define at least one source") + } + + sources := make([]ExporterSource, 0, len(rawSources)) + for _, raw := range rawSources { + source := ExporterSource{ + SourceID: strings.TrimSpace(raw.SourceID), + BaseURL: strings.TrimRight(strings.TrimSpace(raw.BaseURL), "/"), + ExpectedNodeID: strings.TrimSpace(raw.ExpectedNodeID), + 
ExpectedEnv: strings.TrimSpace(raw.ExpectedEnv), + Enabled: true, + TimeoutSeconds: raw.TimeoutSeconds, + } + if raw.Enabled != nil { + source.Enabled = *raw.Enabled + } + if source.SourceID == "" { + return nil, fmt.Errorf("EXPORTER_SOURCES_JSON source_id is required") + } + if source.BaseURL == "" { + return nil, fmt.Errorf("EXPORTER_SOURCES_JSON base_url is required for source %s", source.SourceID) + } + if source.TimeoutSeconds <= 0 { + source.TimeoutSeconds = 15 + } + sources = append(sources, source) + } + return sources, nil +} + func parseFloatEnv(key string, fallback float64) float64 { raw := strings.TrimSpace(os.Getenv(key)) if raw == "" { diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..4bb262b --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,32 @@ +package config + +import "testing" + +func TestLoadExporterSourcesFromJSON(t *testing.T) { + sources, err := loadExporterSources("", `[{"source_id":"jp","base_url":"https://jp-xhttp-contabo.svc.plus","expected_node_id":"jp-xhttp-contabo.svc.plus","expected_env":"prod","enabled":true,"timeout_seconds":20}]`) + if err != nil { + t.Fatalf("load sources: %v", err) + } + if len(sources) != 1 { + t.Fatalf("expected 1 source, got %d", len(sources)) + } + if sources[0].SourceID != "jp" || sources[0].BaseURL != "https://jp-xhttp-contabo.svc.plus" { + t.Fatalf("unexpected source %#v", sources[0]) + } + if sources[0].TimeoutSeconds != 20 { + t.Fatalf("expected timeout 20, got %d", sources[0].TimeoutSeconds) + } +} + +func TestLoadExporterSourcesFallsBackToLegacyBaseURL(t *testing.T) { + sources, err := loadExporterSources("http://127.0.0.1:8080", "") + if err != nil { + t.Fatalf("load legacy source: %v", err) + } + if len(sources) != 1 { + t.Fatalf("expected 1 source, got %d", len(sources)) + } + if sources[0].SourceID != "default" || sources[0].BaseURL != "http://127.0.0.1:8080" { + t.Fatalf("unexpected source %#v", sources[0]) + } +} diff 
--git a/internal/exporter/client.go b/internal/exporter/client.go index 3586db2..3cdab7c 100644 --- a/internal/exporter/client.go +++ b/internal/exporter/client.go @@ -6,49 +6,65 @@ import ( "fmt" "net/http" "net/url" + "strconv" "strings" "time" + "billing-service/internal/config" "billing-service/internal/model" ) type Client struct { - baseURL string - httpClient *http.Client + serviceToken string } -func NewClient(baseURL string) *Client { +func NewClient(serviceToken string) *Client { return &Client{ - baseURL: strings.TrimRight(strings.TrimSpace(baseURL), "/"), - httpClient: &http.Client{Timeout: 15 * time.Second}, + serviceToken: strings.TrimSpace(serviceToken), } } -func (c *Client) FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error) { - endpoint, err := url.JoinPath(c.baseURL, "/v1/snapshots/latest") +func (c *Client) FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) { + timeout := time.Duration(source.TimeoutSeconds) * time.Second + if timeout <= 0 { + timeout = 15 * time.Second + } + client := &http.Client{Timeout: timeout} + + endpoint, err := url.JoinPath(strings.TrimRight(strings.TrimSpace(source.BaseURL), "/"), "/v1/snapshots/window") if err != nil { - return model.Snapshot{}, fmt.Errorf("build snapshot endpoint: %w", err) + return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window endpoint: %w", err) } req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if err != nil { - return model.Snapshot{}, fmt.Errorf("build snapshot request: %w", err) + return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window request: %w", err) } - req.Header.Set("Accept", "application/json") - resp, err := c.httpClient.Do(req) + query := req.URL.Query() + query.Set("since", since.UTC().Format(time.RFC3339)) + query.Set("until", until.UTC().Format(time.RFC3339)) + query.Set("limit", strconv.Itoa(limit)) + if cursor != 
nil { + query.Set("cursor", cursor.UTC().Format(time.RFC3339)) + } + req.URL.RawQuery = query.Encode() + req.Header.Set("Accept", "application/json") + req.Header.Set("Authorization", "Bearer "+c.serviceToken) + + resp, err := client.Do(req) if err != nil { - return model.Snapshot{}, fmt.Errorf("fetch snapshot: %w", err) + return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: %w", source.SourceID, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return model.Snapshot{}, fmt.Errorf("fetch snapshot: unexpected status %s", resp.Status) + return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: unexpected status %s", source.SourceID, resp.Status) } - var snapshot model.Snapshot - if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil { - return model.Snapshot{}, fmt.Errorf("decode snapshot: %w", err) + var page model.SnapshotWindowPage + if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { + return model.SnapshotWindowPage{}, fmt.Errorf("decode snapshots window for %s: %w", source.SourceID, err) } - return snapshot, nil + return page, nil } diff --git a/internal/model/types.go b/internal/model/types.go index 7a58f0c..2706fbf 100644 --- a/internal/model/types.go +++ b/internal/model/types.go @@ -17,6 +17,14 @@ type Snapshot struct { Samples []Sample `json:"samples"` } +type SnapshotWindowPage struct { + NodeID string `json:"node_id"` + Env string `json:"env"` + Snapshots []Snapshot `json:"snapshots"` + HasMore bool `json:"has_more"` + NextCursor string `json:"next_cursor,omitempty"` +} + type Checkpoint struct { NodeID string AccountUUID string @@ -76,13 +84,30 @@ type BillingProfile struct { PricingRuleVersion string } -type JobResult struct { - Job string `json:"job"` - StartedAt time.Time `json:"started_at"` - FinishedAt time.Time `json:"finished_at"` - ProcessedSamples int `json:"processed_samples"` - WrittenMinutes int `json:"written_minutes"` - ReplayedMinutes int 
`json:"replayed_minutes"` - Status string `json:"status"` - Error string `json:"error,omitempty"` +type SourceSyncState struct { + SourceID string + LastCompletedUntil *time.Time + LastAttemptedAt *time.Time + LastSucceededAt *time.Time + LastError string +} + +type SourceStatus struct { + SourceID string `json:"source_id"` + LastCompletedUntil *time.Time `json:"last_completed_until,omitempty"` + LastAttemptedAt *time.Time `json:"last_attempted_at,omitempty"` + LastSucceededAt *time.Time `json:"last_succeeded_at,omitempty"` + LastError string `json:"last_error,omitempty"` +} + +type JobResult struct { + Job string `json:"job"` + StartedAt time.Time `json:"started_at"` + FinishedAt time.Time `json:"finished_at"` + ProcessedSamples int `json:"processed_samples"` + WrittenMinutes int `json:"written_minutes"` + ReplayedMinutes int `json:"replayed_minutes"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + SourceStatuses []SourceStatus `json:"source_statuses,omitempty"` } diff --git a/internal/repository/postgres.go b/internal/repository/postgres.go index 0d67e05..6ac41d5 100644 --- a/internal/repository/postgres.go +++ b/internal/repository/postgres.go @@ -250,6 +250,81 @@ func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (* return &profile, nil } +func (p *Postgres) GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error) { + const query = ` + SELECT source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error + FROM billing_source_sync_state + WHERE source_id = $1` + + var state model.SourceSyncState + var lastCompleted sql.NullTime + var lastAttempted sql.NullTime + var lastSucceeded sql.NullTime + + err := p.db.QueryRowContext(ctx, query, sourceID).Scan( + &state.SourceID, + &lastCompleted, + &lastAttempted, + &lastSucceeded, + &state.LastError, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + if lastCompleted.Valid 
{ + value := lastCompleted.Time + state.LastCompletedUntil = &value + } + if lastAttempted.Valid { + value := lastAttempted.Time + state.LastAttemptedAt = &value + } + if lastSucceeded.Valid { + value := lastSucceeded.Time + state.LastSucceededAt = &value + } + return &state, nil +} + +func (p *Postgres) UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error { + const query = ` + INSERT INTO billing_source_sync_state ( + source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error + ) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (source_id) DO UPDATE SET + last_completed_until = EXCLUDED.last_completed_until, + last_attempted_at = EXCLUDED.last_attempted_at, + last_succeeded_at = EXCLUDED.last_succeeded_at, + last_error = EXCLUDED.last_error, + updated_at = now() + ` + + var lastCompleted any + if state.LastCompletedUntil != nil { + lastCompleted = state.LastCompletedUntil.UTC() + } + var lastAttempted any + if state.LastAttemptedAt != nil { + lastAttempted = state.LastAttemptedAt.UTC() + } + var lastSucceeded any + if state.LastSucceededAt != nil { + lastSucceeded = state.LastSucceededAt.UTC() + } + + _, err := p.db.ExecContext(ctx, query, + state.SourceID, + lastCompleted, + lastAttempted, + lastSucceeded, + state.LastError, + ) + return err +} + var _ Repository = (*Postgres)(nil) func ensureUTC(ts time.Time) time.Time { diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 8b2ffea..8eb37be 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -14,4 +14,6 @@ type Repository interface { GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error) UpsertQuotaState(ctx context.Context, state model.QuotaState) error GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) + GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error) + UpsertSourceSyncState(ctx 
context.Context, state model.SourceSyncState) error } diff --git a/internal/service/postgres_acceptance_test.go b/internal/service/postgres_acceptance_test.go index 0591fa5..302a788 100644 --- a/internal/service/postgres_acceptance_test.go +++ b/internal/service/postgres_acceptance_test.go @@ -43,6 +43,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) { accountUUID := "11111111-1111-1111-1111-111111111111" if _, err := db.ExecContext(ctx, ` + DELETE FROM billing_source_sync_state; DELETE FROM billing_ledger; DELETE FROM traffic_minute_buckets; DELETE FROM traffic_stat_checkpoints; @@ -59,22 +60,33 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) { } svc := New(config.Config{ + ExporterSources: []config.ExporterSource{{ + SourceID: "default", + BaseURL: "https://jp-xhttp-contabo.svc.plus", + ExpectedNodeID: "jp-node", + ExpectedEnv: "prod", + Enabled: true, + TimeoutSeconds: 15, + }}, + InternalServiceToken: "secret", DefaultRegion: "", SourceRevision: "billing-service-acceptance", PricePerByte: 0.5, InitialIncludedQuotaBytes: 1000, InitialBalance: 0, - }, fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: accountUUID, - Email: "billing@example.com", - InboundTag: "premium", - UplinkBytesTotal: 100, - DownlinkBytesTotal: 50, - }}, + }, &fakeWindowSource{pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: accountUUID, + Email: "billing@example.com", + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + })}, }}, repository.NewPostgres(db)) result, err := svc.RunCollectAndRate(ctx, "collect-and-rate") @@ -89,6 +101,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) { assertRowCount(t, 
db, "traffic_minute_buckets", 1) assertRowCount(t, db, "billing_ledger", 1) assertRowCount(t, db, "account_quota_states", 1) + assertRowCount(t, db, "billing_source_sync_state", 1) var totalBytes int64 if err := db.QueryRowContext(ctx, `SELECT total_bytes FROM traffic_minute_buckets LIMIT 1`).Scan(&totalBytes); err != nil { diff --git a/internal/service/service.go b/internal/service/service.go index d39acea..da74c4b 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -15,13 +15,18 @@ import ( "github.com/google/uuid" ) -type snapshotSource interface { - FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error) +const ( + sourceWindowOverlap = 2 * time.Minute + sourceWindowPageSize = 120 +) + +type windowSource interface { + FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) } type Service struct { cfg config.Config - source snapshotSource + source windowSource repo repository.Repository mu sync.Mutex @@ -30,7 +35,7 @@ type Service struct { lastError string } -func New(cfg config.Config, source snapshotSource, repo repository.Repository) *Service { +func New(cfg config.Config, source windowSource, repo repository.Repository) *Service { return &Service{ cfg: cfg, source: source, @@ -65,30 +70,31 @@ func (s *Service) RunCollectAndRate(ctx context.Context, job string) (model.JobR Status: "ok", } - snapshot, err := s.source.FetchLatestSnapshot(ctx) - if err != nil { - result.Status = "error" - result.Error = err.Error() - result.FinishedAt = time.Now().UTC() - s.record(result) - return result, err + enabledSources := 0 + fatalSourceFailures := 0 + for _, source := range s.cfg.ExporterSources { + if !source.Enabled { + continue + } + enabledSources++ + + status, err := s.collectSource(ctx, source, &result) + result.SourceStatuses = append(result.SourceStatuses, status) + if err != nil { + fatalSourceFailures++ + result.Error = 
joinError(result.Error, err.Error()) + } } - for _, sample := range snapshot.Samples { - if err := validateSample(sample); err != nil { + if enabledSources == 0 { + result.Status = "error" + result.Error = joinError(result.Error, "no enabled exporter sources configured") + } + if fatalSourceFailures > 0 { + if result.ProcessedSamples == 0 && result.WrittenMinutes == 0 && result.ReplayedMinutes == 0 && result.Status != "partial" { + result.Status = "error" + } else if result.Status == "ok" { result.Status = "partial" - result.Error = joinError(result.Error, err.Error()) - continue - } - - processed, err := s.processSample(ctx, snapshot, sample, &result) - if err != nil { - result.Status = "partial" - result.Error = joinError(result.Error, err.Error()) - continue - } - if processed { - result.ProcessedSamples++ } } @@ -112,6 +118,110 @@ func (s *Service) Health() (bool, string) { return s.lastOK, s.lastError } +func (s *Service) collectSource(ctx context.Context, source config.ExporterSource, result *model.JobResult) (model.SourceStatus, error) { + state, err := s.repo.GetSourceSyncState(ctx, source.SourceID) + if err != nil { + return model.SourceStatus{SourceID: source.SourceID, LastError: err.Error()}, fmt.Errorf("load source sync state %s: %w", source.SourceID, err) + } + if state == nil { + state = &model.SourceSyncState{SourceID: source.SourceID} + } + + attemptedAt := time.Now().UTC() + state.LastAttemptedAt = &attemptedAt + state.LastError = "" + if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil { + return sourceStatusFromState(*state), fmt.Errorf("record source attempt %s: %w", source.SourceID, err) + } + + until := time.Now().UTC().Truncate(time.Minute).Add(-time.Minute) + since := until.Add(-sourceWindowOverlap) + if state.LastCompletedUntil != nil { + since = state.LastCompletedUntil.UTC().Add(-sourceWindowOverlap) + } + if since.After(until) { + completedUntil := until + state.LastCompletedUntil = &completedUntil + succeededAt := 
time.Now().UTC() + state.LastSucceededAt = &succeededAt + state.LastError = "" + if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil { + return sourceStatusFromState(*state), fmt.Errorf("record source noop completion %s: %w", source.SourceID, err) + } + return sourceStatusFromState(*state), nil + } + + var cursor *time.Time + var lastProcessed *time.Time + for { + page, err := s.source.FetchWindow(ctx, source, since, until, sourceWindowPageSize, cursor) + if err != nil { + return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: %w", source.SourceID, err)) + } + + for _, snapshot := range page.Snapshots { + if err := validateSnapshotSource(snapshot, source); err != nil { + return s.recordSourceFailure(ctx, *state, err) + } + + processed, err := s.processSnapshot(ctx, snapshot, result) + if err != nil { + return s.recordSourceFailure(ctx, *state, err) + } + if processed { + collectedAt := snapshot.CollectedAt.UTC() + lastProcessed = &collectedAt + } + } + + if !page.HasMore { + break + } + if strings.TrimSpace(page.NextCursor) == "" { + return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: next_cursor missing while has_more=true", source.SourceID)) + } + nextCursor, err := time.Parse(time.RFC3339, strings.TrimSpace(page.NextCursor)) + if err != nil { + return s.recordSourceFailure(ctx, *state, fmt.Errorf("parse next cursor for %s: %w", source.SourceID, err)) + } + cursor = &nextCursor + } + + completedUntil := until + if lastProcessed != nil && lastProcessed.Before(completedUntil) { + completedUntil = lastProcessed.UTC() + } + succeededAt := time.Now().UTC() + state.LastCompletedUntil = &completedUntil + state.LastSucceededAt = &succeededAt + state.LastError = "" + if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil { + return sourceStatusFromState(*state), fmt.Errorf("record source completion %s: %w", source.SourceID, err) + } + return sourceStatusFromState(*state), nil +} + +func (s *Service) 
processSnapshot(ctx context.Context, snapshot model.Snapshot, result *model.JobResult) (bool, error) { + processedAny := false + for _, sample := range snapshot.Samples { + if err := validateSample(sample); err != nil { + result.Status = "partial" + result.Error = joinError(result.Error, err.Error()) + continue + } + + processed, err := s.processSample(ctx, snapshot, sample, result) + if err != nil { + return processedAny, fmt.Errorf("process snapshot %s for %s: %w", snapshot.CollectedAt.UTC().Format(time.RFC3339), sample.UUID, err) + } + if processed { + processedAny = true + result.ProcessedSamples++ + } + } + return processedAny, nil +} + func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sample model.Sample, result *model.JobResult) (bool, error) { storageNodeID := composeStorageNodeID(snapshot.Env, snapshot.NodeID) minuteStart := snapshot.CollectedAt.UTC().Truncate(time.Minute) @@ -178,15 +288,12 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa result.WrittenMinutes++ } - amountDelta := -float64(totalBytes) * s.cfg.PricePerByte entry := model.LedgerEntry{ ID: deterministicLedgerID(bucket), AccountUUID: sample.UUID, BucketStart: minuteStart, BucketEnd: minuteStart.Add(time.Minute), EntryType: "traffic_charge", - RatedBytes: totalBytes, - AmountDelta: amountDelta, PricingRuleVersion: s.cfg.SourceRevision, } @@ -207,7 +314,7 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa includedApplied := minInt64(quota.RemainingIncludedQuota, totalBytes) chargeableBytes := totalBytes - includedApplied - amountDelta = -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier + amountDelta := -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier entry.RatedBytes = chargeableBytes entry.AmountDelta = amountDelta entry.PricingRuleVersion = effectivePricing.pricingRuleVersion @@ -314,6 +421,16 @@ func 
validateSample(sample model.Sample) error { return nil } +func validateSnapshotSource(snapshot model.Snapshot, source config.ExporterSource) error { + if source.ExpectedNodeID != "" && strings.TrimSpace(snapshot.NodeID) != source.ExpectedNodeID { + return fmt.Errorf("source %s expected node_id %q, got %q", source.SourceID, source.ExpectedNodeID, strings.TrimSpace(snapshot.NodeID)) + } + if source.ExpectedEnv != "" && strings.TrimSpace(snapshot.Env) != source.ExpectedEnv { + return fmt.Errorf("source %s expected env %q, got %q", source.SourceID, source.ExpectedEnv, strings.TrimSpace(snapshot.Env)) + } + return nil +} + func deterministicLedgerID(bucket model.MinuteBucket) string { key := fmt.Sprintf("%s|%s|%s|%s|%s", bucket.BucketStart.UTC().Format(time.RFC3339), bucket.NodeID, bucket.AccountUUID, bucket.Region, bucket.LineCode) return uuid.NewSHA1(uuid.NameSpaceOID, []byte(key)).String() @@ -335,6 +452,34 @@ func joinError(existing, next string) string { return existing + "; " + next } +func sourceStatusFromState(state model.SourceSyncState) model.SourceStatus { + return model.SourceStatus{ + SourceID: state.SourceID, + LastCompletedUntil: copyTimePtr(state.LastCompletedUntil), + LastAttemptedAt: copyTimePtr(state.LastAttemptedAt), + LastSucceededAt: copyTimePtr(state.LastSucceededAt), + LastError: state.LastError, + } +} + +func copyTimePtr(value *time.Time) *time.Time { + if value == nil { + return nil + } + cloned := value.UTC() + return &cloned +} + +func (s *Service) recordSourceFailure(ctx context.Context, state model.SourceSyncState, err error) (model.SourceStatus, error) { + message := err.Error() + state.LastError = message + if persistErr := s.repo.UpsertSourceSyncState(ctx, state); persistErr != nil { + message = joinError(message, fmt.Sprintf("persist source error state: %v", persistErr)) + state.LastError = message + } + return sourceStatusFromState(state), err +} + func (s *Service) record(result model.JobResult) { s.lastResult = result s.lastError = 
result.Error diff --git a/internal/service/service_test.go b/internal/service/service_test.go index 10f60d6..bd11d23 100644 --- a/internal/service/service_test.go +++ b/internal/service/service_test.go @@ -10,13 +10,38 @@ import ( "billing-service/internal/repository" ) -type fakeSource struct { - snapshot model.Snapshot - err error +type fakeWindowSource struct { + pagesBySource map[string][]model.SnapshotWindowPage + errBySource map[string]error + requests []windowRequest } -func (f fakeSource) FetchLatestSnapshot(context.Context) (model.Snapshot, error) { - return f.snapshot, f.err +type windowRequest struct { + sourceID string + since time.Time + until time.Time + limit int + cursor *time.Time +} + +func (f *fakeWindowSource) FetchWindow(_ context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) { + f.requests = append(f.requests, windowRequest{ + sourceID: source.SourceID, + since: since, + until: until, + limit: limit, + cursor: cursor, + }) + if err := f.errBySource[source.SourceID]; err != nil { + return model.SnapshotWindowPage{}, err + } + pages := f.pagesBySource[source.SourceID] + if len(pages) == 0 { + return model.SnapshotWindowPage{}, nil + } + page := pages[0] + f.pagesBySource[source.SourceID] = pages[1:] + return page, nil } type memoryRepo struct { @@ -25,6 +50,7 @@ type memoryRepo struct { ledgers map[string]model.LedgerEntry quotas map[string]model.QuotaState profiles map[string]model.BillingProfile + sourceSync map[string]model.SourceSyncState } func newMemoryRepo() *memoryRepo { @@ -34,6 +60,7 @@ func newMemoryRepo() *memoryRepo { ledgers: map[string]model.LedgerEntry{}, quotas: map[string]model.QuotaState{}, profiles: map[string]model.BillingProfile{}, + sourceSync: map[string]model.SourceSyncState{}, } } @@ -45,7 +72,7 @@ func bucketKey(bucket model.MinuteBucket) string { return bucket.BucketStart.UTC().Format(time.RFC3339) + "\x00" + bucket.NodeID + "\x00" + 
bucket.AccountUUID + "\x00" + bucket.Region + "\x00" + bucket.LineCode } -func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) { +func (m *memoryRepo) GetCheckpoint(_ context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) { if checkpoint, ok := m.checkpoints[checkpointKey(nodeID, accountUUID)]; ok { copy := checkpoint return &copy, nil @@ -53,25 +80,25 @@ func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID stri return nil, nil } -func (m *memoryRepo) UpsertCheckpoint(ctx context.Context, checkpoint model.Checkpoint) error { +func (m *memoryRepo) UpsertCheckpoint(_ context.Context, checkpoint model.Checkpoint) error { m.checkpoints[checkpointKey(checkpoint.NodeID, checkpoint.AccountUUID)] = checkpoint return nil } -func (m *memoryRepo) UpsertMinuteBucket(ctx context.Context, bucket model.MinuteBucket) (bool, error) { +func (m *memoryRepo) UpsertMinuteBucket(_ context.Context, bucket model.MinuteBucket) (bool, error) { key := bucketKey(bucket) _, existed := m.buckets[key] m.buckets[key] = bucket return existed, nil } -func (m *memoryRepo) UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error) { +func (m *memoryRepo) UpsertLedger(_ context.Context, entry model.LedgerEntry) (bool, error) { _, existed := m.ledgers[entry.ID] m.ledgers[entry.ID] = entry return existed, nil } -func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error) { +func (m *memoryRepo) GetQuotaState(_ context.Context, accountUUID string) (*model.QuotaState, error) { if quota, ok := m.quotas[accountUUID]; ok { copy := quota return &copy, nil @@ -79,12 +106,12 @@ func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*mo return nil, nil } -func (m *memoryRepo) UpsertQuotaState(ctx context.Context, state model.QuotaState) error { +func (m *memoryRepo) UpsertQuotaState(_ context.Context, state model.QuotaState) error { 
m.quotas[state.AccountUUID] = state return nil } -func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) { +func (m *memoryRepo) GetBillingProfile(_ context.Context, accountUUID string) (*model.BillingProfile, error) { if profile, ok := m.profiles[accountUUID]; ok { copy := profile return &copy, nil @@ -92,10 +119,40 @@ func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) return nil, nil } +func (m *memoryRepo) GetSourceSyncState(_ context.Context, sourceID string) (*model.SourceSyncState, error) { + if state, ok := m.sourceSync[sourceID]; ok { + copy := cloneSyncState(state) + return &copy, nil + } + return nil, nil +} + +func (m *memoryRepo) UpsertSourceSyncState(_ context.Context, state model.SourceSyncState) error { + m.sourceSync[state.SourceID] = cloneSyncState(state) + return nil +} + +func cloneSyncState(state model.SourceSyncState) model.SourceSyncState { + copy := state + copy.LastCompletedUntil = copyTimePtr(state.LastCompletedUntil) + copy.LastAttemptedAt = copyTimePtr(state.LastAttemptedAt) + copy.LastSucceededAt = copyTimePtr(state.LastSucceededAt) + return copy +} + var _ repository.Repository = (*memoryRepo)(nil) func baseConfig() config.Config { return config.Config{ + ExporterSources: []config.ExporterSource{{ + SourceID: "default", + BaseURL: "https://jp-xhttp-contabo.svc.plus", + ExpectedNodeID: "jp-node", + ExpectedEnv: "prod", + Enabled: true, + TimeoutSeconds: 15, + }}, + InternalServiceToken: "secret", DefaultRegion: "", SourceRevision: "billing-service-v1", PricePerByte: 0.5, @@ -104,20 +161,33 @@ func baseConfig() config.Config { } } +func singleSnapshotPage(snapshot model.Snapshot) model.SnapshotWindowPage { + return model.SnapshotWindowPage{ + NodeID: snapshot.NodeID, + Env: snapshot.Env, + Snapshots: []model.Snapshot{snapshot}, + } +} + func TestDeltaCalculationAndQuotaUpdate(t *testing.T) { repo := newMemoryRepo() - svc := New(baseConfig(), fakeSource{snapshot: 
model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: "11111111-1111-1111-1111-111111111111", - Email: "user@example.com", - InboundTag: "premium", - UplinkBytesTotal: 100, - DownlinkBytesTotal: 50, - }}, - }}, repo) + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: "11111111-1111-1111-1111-111111111111", + Email: "user@example.com", + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + })}, + }, + } + svc := New(baseConfig(), source, repo) result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") if err != nil { @@ -144,17 +214,22 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) { LineMultiplier: 2.0, PricingRuleVersion: "pricing-v2", } - svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: accountUUID, - InboundTag: "premium", - UplinkBytesTotal: 100, - DownlinkBytesTotal: 50, - }}, - }}, repo) + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: accountUUID, + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + })}, + }, + } + svc := New(baseConfig(), source, repo) result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") if err != nil { @@ -197,19 +272,37 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) { func TestDuplicateMinuteIsReplaySafe(t 
*testing.T) { repo := newMemoryRepo() - snapshot := model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: "11111111-1111-1111-1111-111111111111", - Email: "user@example.com", - InboundTag: "premium", - UplinkBytesTotal: 100, - DownlinkBytesTotal: 50, - }}, + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": { + singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: "11111111-1111-1111-1111-111111111111", + Email: "user@example.com", + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + }), + singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: "11111111-1111-1111-1111-111111111111", + Email: "user@example.com", + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + }), + }, + }, } - svc := New(baseConfig(), fakeSource{snapshot: snapshot}, repo) + svc := New(baseConfig(), source, repo) if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil { t.Fatalf("first run: %v", err) @@ -240,17 +333,22 @@ func TestNegativeDeltaProtection(t *testing.T) { XrayRevision: "prev", ResetEpoch: 0, } - svc := New(cfg, fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: accountUUID, - InboundTag: "premium", - UplinkBytesTotal: 10, - DownlinkBytesTotal: 20, - }}, - }}, repo) + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + 
Samples: []model.Sample{{ + UUID: accountUUID, + InboundTag: "premium", + UplinkBytesTotal: 10, + DownlinkBytesTotal: 20, + }}, + })}, + }, + } + svc := New(cfg, source, repo) result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") if err != nil { @@ -278,17 +376,22 @@ func TestRestartRecoveryFromCheckpoint(t *testing.T) { LastDownlinkTotal: 100, LastSeenAt: time.Now().UTC(), } - svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: accountUUID, - InboundTag: "premium", - UplinkBytesTotal: 130, - DownlinkBytesTotal: 140, - }}, - }}, repo) + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: accountUUID, + InboundTag: "premium", + UplinkBytesTotal: 130, + DownlinkBytesTotal: 140, + }}, + })}, + }, + } + svc := New(baseConfig(), source, repo) result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") if err != nil { @@ -313,60 +416,103 @@ func TestMultiEnvIsolation(t *testing.T) { repo := newMemoryRepo() accountUUID := "11111111-1111-1111-1111-111111111111" cfg := baseConfig() - - prodSvc := New(cfg, fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC), - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}}, - }}, repo) - previewSvc := New(cfg, fakeSource{snapshot: model.Snapshot{ - CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC), - NodeID: "jp-node", - Env: "preview", - Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}}, - }}, repo) - - if _, err := 
prodSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil { - t.Fatalf("prod run: %v", err) + cfg.ExporterSources = []config.ExporterSource{ + { + SourceID: "prod-source", + BaseURL: "https://prod.svc.plus", + ExpectedNodeID: "jp-node", + ExpectedEnv: "prod", + Enabled: true, + TimeoutSeconds: 15, + }, + { + SourceID: "preview-source", + BaseURL: "https://preview.svc.plus", + ExpectedNodeID: "jp-node", + ExpectedEnv: "preview", + Enabled: true, + TimeoutSeconds: 15, + }, } - if _, err := previewSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil { - t.Fatalf("preview run: %v", err) + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "prod-source": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}}, + })}, + "preview-source": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC), + NodeID: "jp-node", + Env: "preview", + Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}}, + })}, + }, + } + svc := New(cfg, source, repo) + + if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil { + t.Fatalf("run: %v", err) } if len(repo.buckets) != 2 { t.Fatalf("expected isolated buckets per env, got %d", len(repo.buckets)) } } -func TestLateMinuteReconcileUsesSameMinuteKey(t *testing.T) { +func TestExpectedNodeIDMismatchIsFatalForSource(t *testing.T) { repo := newMemoryRepo() - accountUUID := "11111111-1111-1111-1111-111111111111" - cfg := baseConfig() - collectedAt := time.Date(2026, 4, 8, 10, 34, 50, 0, time.UTC) - snapshot := model.Snapshot{ - CollectedAt: collectedAt, - NodeID: "jp-node", - Env: "prod", - Samples: []model.Sample{{ - UUID: accountUUID, - InboundTag: 
"premium", - UplinkBytesTotal: 20, - DownlinkBytesTotal: 20, - }}, + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 34, 0, 0, time.UTC), + NodeID: "unexpected-node", + Env: "prod", + Samples: []model.Sample{{UUID: "11111111-1111-1111-1111-111111111111", InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}}, + })}, + }, } - svc := New(cfg, fakeSource{snapshot: snapshot}, repo) + svc := New(baseConfig(), source, repo) - if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil { - t.Fatalf("first run: %v", err) + result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") + if err == nil { + t.Fatalf("expected source mismatch error") } - result, err := svc.RunCollectAndRate(context.Background(), "reconcile") - if err != nil { - t.Fatalf("reconcile run: %v", err) - } - if result.ReplayedMinutes == 0 { - t.Fatalf("expected reconcile to report replayed minute, got %#v", result) - } - if len(repo.buckets) != 1 { - t.Fatalf("expected single logical minute bucket, got %d", len(repo.buckets)) + if result.Status != "error" { + t.Fatalf("expected error status, got %#v", result) + } +} + +func TestSourceStatusIncludesSyncState(t *testing.T) { + repo := newMemoryRepo() + source := &fakeWindowSource{ + pagesBySource: map[string][]model.SnapshotWindowPage{ + "default": {singleSnapshotPage(model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 35, 0, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: "11111111-1111-1111-1111-111111111111", + InboundTag: "premium", + UplinkBytesTotal: 10, + DownlinkBytesTotal: 10, + }}, + })}, + }, + } + svc := New(baseConfig(), source, repo) + + result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") + if err != nil { + t.Fatalf("run job: %v", err) + } + if len(result.SourceStatuses) != 1 { + 
t.Fatalf("expected one source status, got %#v", result.SourceStatuses) + } + if result.SourceStatuses[0].SourceID != "default" { + t.Fatalf("unexpected source status %#v", result.SourceStatuses[0]) + } + if result.SourceStatuses[0].LastCompletedUntil == nil { + t.Fatalf("expected last completed until in source status") } } diff --git a/sql/billing-service-schema.sql b/sql/billing-service-schema.sql new file mode 100644 index 0000000..52c89f4 --- /dev/null +++ b/sql/billing-service-schema.sql @@ -0,0 +1,89 @@ +-- billing-service bootstrap/reference DDL +-- +-- This file mirrors the accounting tables that billing-service depends on in +-- the current accounts.svc.plus PostgreSQL schema. It is a service-owned +-- documentation/bootstrap artifact and does not redefine schema ownership. + +CREATE TABLE IF NOT EXISTS public.traffic_stat_checkpoints ( + node_id TEXT NOT NULL, + account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE, + last_uplink_total BIGINT NOT NULL DEFAULT 0, + last_downlink_total BIGINT NOT NULL DEFAULT 0, + last_seen_at TIMESTAMPTZ NOT NULL, + xray_revision TEXT NOT NULL DEFAULT '', + reset_epoch BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (node_id, account_uuid) +); + +CREATE TABLE IF NOT EXISTS public.traffic_minute_buckets ( + bucket_start TIMESTAMPTZ NOT NULL, + node_id TEXT NOT NULL, + account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE, + region TEXT NOT NULL DEFAULT '', + line_code TEXT NOT NULL DEFAULT '', + uplink_bytes BIGINT NOT NULL DEFAULT 0, + downlink_bytes BIGINT NOT NULL DEFAULT 0, + total_bytes BIGINT NOT NULL DEFAULT 0, + multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + rating_status TEXT NOT NULL DEFAULT 'pending', + source_revision TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (bucket_start, node_id, 
account_uuid, region, line_code) +); + +CREATE TABLE IF NOT EXISTS public.billing_ledger ( + id UUID PRIMARY KEY, + account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE, + bucket_start TIMESTAMPTZ NOT NULL, + bucket_end TIMESTAMPTZ NOT NULL, + entry_type TEXT NOT NULL, + rated_bytes BIGINT NOT NULL DEFAULT 0, + amount_delta DOUBLE PRECISION NOT NULL DEFAULT 0, + balance_after DOUBLE PRECISION NOT NULL DEFAULT 0, + pricing_rule_version TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS public.account_quota_states ( + account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE, + remaining_included_quota BIGINT NOT NULL DEFAULT 0, + current_balance DOUBLE PRECISION NOT NULL DEFAULT 0, + arrears BOOLEAN NOT NULL DEFAULT false, + throttle_state TEXT NOT NULL DEFAULT 'normal', + suspend_state TEXT NOT NULL DEFAULT 'active', + last_rated_bucket_at TIMESTAMPTZ NULL, + effective_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS public.account_billing_profiles ( + account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE, + package_name TEXT NOT NULL DEFAULT 'default', + included_quota_bytes BIGINT NOT NULL DEFAULT 0, + base_price_per_byte DOUBLE PRECISION NOT NULL DEFAULT 0, + region_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + line_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + peak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + offpeak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + pricing_rule_version TEXT NOT NULL DEFAULT 'pricing-default-v1', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS public.billing_source_sync_state ( + source_id TEXT PRIMARY KEY, + last_completed_until TIMESTAMPTZ NULL, + last_attempted_at TIMESTAMPTZ NULL, + last_succeeded_at TIMESTAMPTZ NULL, + last_error 
TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_traffic_minute_buckets_account_bucket + ON public.traffic_minute_buckets (account_uuid, bucket_start DESC); + +CREATE INDEX IF NOT EXISTS idx_billing_ledger_account_created + ON public.billing_ledger (account_uuid, created_at DESC); diff --git a/testdata/postgres/init.sql b/testdata/postgres/init.sql index fb487cb..52a7664 100644 --- a/testdata/postgres/init.sql +++ b/testdata/postgres/init.sql @@ -84,3 +84,12 @@ CREATE TABLE IF NOT EXISTS public.account_billing_profiles ( created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); + +CREATE TABLE IF NOT EXISTS public.billing_source_sync_state ( + source_id TEXT PRIMARY KEY, + last_completed_until TIMESTAMPTZ NULL, + last_attempted_at TIMESTAMPTZ NULL, + last_succeeded_at TIMESTAMPTZ NULL, + last_error TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +);