feat: add multi-source billing ingestion (#3)
Co-authored-by: Haitao Pan <manbuzhe2009@qq.com>
This commit is contained in:
parent
1cd49fadd9
commit
886963606f
@ -13,3 +13,11 @@ existing `accounts.svc.plus` PostgreSQL schema.
|
||||
- `POST /v1/jobs/reconcile`
|
||||
- `GET /healthz`
|
||||
- `GET /v1/status`
|
||||
|
||||
## Documentation
|
||||
|
||||
- `docs/README.md` - documentation index and verification notes
|
||||
- `docs/architecture.md` - deployment and data-flow diagrams
|
||||
- `docs/api.md` - task API surface and upstream/downstream boundaries
|
||||
- `sql/billing-service-schema.sql` - bootstrap/reference DDL aligned with the
|
||||
current `accounts.svc.plus` accounting schema
|
||||
|
||||
@ -34,7 +34,7 @@ func main() {
|
||||
|
||||
svc := service.New(
|
||||
cfg,
|
||||
exporter.NewClient(cfg.ExporterBaseURL),
|
||||
exporter.NewClient(cfg.InternalServiceToken),
|
||||
repository.NewPostgres(db),
|
||||
)
|
||||
svc.Start(ctx)
|
||||
|
||||
48
docs/README.md
Normal file
48
docs/README.md
Normal file
@ -0,0 +1,48 @@
|
||||
# billing-service docs
|
||||
|
||||
This directory holds service-owned documentation for `billing-service`.
|
||||
|
||||
## Documents
|
||||
|
||||
- [architecture.md](architecture.md) - deployment topology, billing data flow,
|
||||
and current-vs-target architecture notes
|
||||
- [api.md](api.md) - task endpoints, upstream snapshot contract, and downstream
|
||||
read-model boundaries
|
||||
- [multi-node-https-plan.md](multi-node-https-plan.md) - target-state plan for
|
||||
evolving from a single exporter URL to secure multi-node HTTPS ingestion
|
||||
- [../sql/billing-service-schema.sql](../sql/billing-service-schema.sql) -
|
||||
reference DDL for the accounting tables `billing-service` depends on
|
||||
|
||||
## Scope
|
||||
|
||||
These docs describe the `billing-service` role inside the Cloud Network Billing
|
||||
& Control Plane.
|
||||
|
||||
- `billing-service` is the task-oriented write model
|
||||
- `accounts.svc.plus` is the PostgreSQL-backed read model
|
||||
- `console.svc.plus` is the presentation layer and does not query
|
||||
`billing-service` directly
|
||||
|
||||
System-wide contracts still live in
|
||||
`github-org-cloud-neutral-toolkit/docs/architecture/network-billing-contracts.md`.
|
||||
|
||||
## Deployment Verification
|
||||
|
||||
Local or operator dry-run validation:
|
||||
|
||||
```bash
|
||||
cd /Users/shenlan/workspaces/cloud-neutral-toolkit/playbooks
|
||||
export DATABASE_URL=postgres://...
|
||||
ANSIBLE_CONFIG=./ansible.cfg \
|
||||
ansible-playbook -i ./inventory.ini -D -C ./deploy_billing_service.yml -l jp_xhttp_contabo_host
|
||||
```
|
||||
|
||||
Notes:
|
||||
|
||||
- `DATABASE_URL` must be exported before running `deploy_billing_service.yml`
|
||||
- on `jp-xhttp-contabo.svc.plus`, `DATABASE_URL` should reference the same
|
||||
`account` database used by `accounts.svc.plus`
|
||||
- check mode may report predicted changes; the goal is to pass the preflight
|
||||
assertion and render a valid deployment plan
|
||||
- GitHub Actions uses the `BILLING_SERVICE_DATABASE_URL` secret to satisfy the
|
||||
same precondition in the `deploy-billing-service` job
|
||||
149
docs/api.md
Normal file
149
docs/api.md
Normal file
@ -0,0 +1,149 @@
|
||||
# billing-service API and interfaces
|
||||
|
||||
This document describes the current `billing-service` task API plus the
|
||||
upstream and downstream interfaces it depends on.
|
||||
|
||||
## Service endpoints
|
||||
|
||||
### `GET /healthz`
|
||||
|
||||
Returns service health derived from the most recent collect-and-rate execution.
|
||||
|
||||
Example response:
|
||||
|
||||
```json
|
||||
{
|
||||
"status": "ok",
|
||||
"message": ""
|
||||
}
|
||||
```
|
||||
|
||||
### `GET /v1/status`
|
||||
|
||||
Returns the latest in-memory job result snapshot.
|
||||
|
||||
Key fields:
|
||||
|
||||
- `job`
|
||||
- `started_at`
|
||||
- `finished_at`
|
||||
- `processed_samples`
|
||||
- `written_minutes`
|
||||
- `replayed_minutes`
|
||||
- `status`
|
||||
- `error`
|
||||
|
||||
### `POST /v1/jobs/collect-and-rate`
|
||||
|
||||
Triggers an immediate snapshot pull from `xray-exporter`, computes minute
|
||||
deltas, rates chargeable bytes, and writes replay-safe facts into PostgreSQL.
|
||||
|
||||
Behavior:
|
||||
|
||||
- method must be `POST`
|
||||
- returns `200` when the run completes without a fatal service error
|
||||
- returns `503` when upstream fetch or persistence fails hard enough to mark the
|
||||
run unavailable
|
||||
|
||||
### `POST /v1/jobs/reconcile`
|
||||
|
||||
Triggers the same execution path as collect-and-rate, but records the job name
|
||||
as `reconcile` for operational visibility.
|
||||
|
||||
## Upstream dependency
|
||||
|
||||
### `xray-exporter`
|
||||
|
||||
`billing-service` currently depends on a single exporter base URL and fetches:
|
||||
|
||||
- `GET /v1/snapshots/latest`
|
||||
|
||||
Minimum payload shape:
|
||||
|
||||
```json
|
||||
{
|
||||
"collected_at": "2026-04-08T12:00:00Z",
|
||||
"node_id": "jp-xhttp-contabo.svc.plus",
|
||||
"env": "prod",
|
||||
"samples": [
|
||||
{
|
||||
"uuid": "uuid-1",
|
||||
"email": "user@example.com",
|
||||
"inbound_tag": "xhttp-premium",
|
||||
"uplink_bytes_total": 1024,
|
||||
"downlink_bytes_total": 2048
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Required fields:
|
||||
|
||||
- `collected_at`
|
||||
- `node_id`
|
||||
- `env`
|
||||
- `samples[].uuid`
|
||||
- `samples[].email`
|
||||
- `samples[].inbound_tag`
|
||||
- `samples[].uplink_bytes_total`
|
||||
- `samples[].downlink_bytes_total`
|
||||
|
||||
### Target upstream contract
|
||||
|
||||
Current production behavior remains `GET /v1/snapshots/latest`, but the target
|
||||
multi-node design should evolve to:
|
||||
|
||||
- HTTPS transport for remote exporter pulls
|
||||
- source-specific authentication
|
||||
- a windowed pull API that supports catch-up and pagination
|
||||
|
||||
Recommended target path:
|
||||
|
||||
- `GET /v1/snapshots/window?since=<RFC3339>&until=<RFC3339>&limit=<n>&cursor=<token>`
|
||||
|
||||
Target-state expectations:
|
||||
|
||||
- remote pulls use `https://` exporter base URLs
|
||||
- TLS verification stays enabled
|
||||
- each source can be authenticated independently
|
||||
- responses can be replayed safely from source checkpoints without duplicate
|
||||
billing writes
|
||||
|
||||
## Downstream reads
|
||||
|
||||
User-facing reads do not go through `billing-service`. The read model is
|
||||
`accounts.svc.plus`, backed by PostgreSQL.
|
||||
|
||||
Relevant downstream APIs:
|
||||
|
||||
- `GET /api/account/usage/summary`
|
||||
- `GET /api/account/usage/buckets`
|
||||
- `GET /api/account/billing/summary`
|
||||
|
||||
Read-path rules:
|
||||
|
||||
- `billing-service` does not expose user-facing usage or billing query APIs
|
||||
- `accounts.svc.plus` reads PostgreSQL-backed usage and billing facts
|
||||
- `console.svc.plus` queries `accounts.svc.plus`, not `billing-service`
|
||||
|
||||
## Configuration inputs
|
||||
|
||||
Runtime environment variables used by the current implementation:
|
||||
|
||||
- `EXPORTER_BASE_URL`
|
||||
- `DATABASE_URL`
|
||||
- `LISTEN_ADDR`
|
||||
- `COLLECT_INTERVAL`
|
||||
- `DEFAULT_REGION`
|
||||
- `SOURCE_REVISION`
|
||||
- `PRICE_PER_BYTE`
|
||||
- `INITIAL_INCLUDED_QUOTA_BYTES`
|
||||
- `INITIAL_BALANCE`
|
||||
|
||||
`DATABASE_URL` rule:
|
||||
|
||||
- it must point to the same `account` database that `accounts.svc.plus` uses
|
||||
- on `jp-xhttp-contabo.svc.plus`, the current accounts containers use
|
||||
`DB_HOST=stunnel-client`, `DB_PORT=15432`, and `DB_NAME=account`
|
||||
- `billing-service` should follow that same target so user-facing reads in
|
||||
`accounts.svc.plus` see the exact facts written by `billing-service`
|
||||
124
docs/architecture.md
Normal file
124
docs/architecture.md
Normal file
@ -0,0 +1,124 @@
|
||||
# billing-service architecture
|
||||
|
||||
`billing-service` is the billing write model in the Cloud Network Billing &
|
||||
Control Plane. It consumes normalized traffic snapshots, computes replay-safe
|
||||
minute deltas, and writes billing truth into PostgreSQL.
|
||||
|
||||
## Deployment topology
|
||||
|
||||
```mermaid
|
||||
flowchart TB
|
||||
subgraph Node["Single-node VPS / billing host"]
|
||||
Agent["agent-svc-plus"]
|
||||
Exporter["xray-exporter"]
|
||||
Billing["billing-service"]
|
||||
StunnelClient["stunnel-client"]
|
||||
end
|
||||
|
||||
subgraph DBHost["postgresql.svc.plus"]
|
||||
StunnelServer["stunnel-server"]
|
||||
PostgreSQL["PostgreSQL"]
|
||||
end
|
||||
|
||||
subgraph ReadPath["Read and presentation"]
|
||||
Accounts["accounts.svc.plus"]
|
||||
Console["console.svc.plus"]
|
||||
end
|
||||
|
||||
Agent --> Billing
|
||||
Agent -. coordination .-> Exporter
|
||||
Billing --> StunnelClient
|
||||
StunnelClient --> StunnelServer
|
||||
StunnelServer --> PostgreSQL
|
||||
Accounts --> PostgreSQL
|
||||
Console --> Accounts
|
||||
```
|
||||
|
||||
## Data flow
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
Xray["xray-core<br/>raw cumulative counters"]
|
||||
Exporter["xray-exporter<br/>translation layer"]
|
||||
Billing["billing-service<br/>minute delta + rating writer"]
|
||||
PostgreSQL["PostgreSQL<br/>billing source of truth"]
|
||||
Accounts["accounts.svc.plus<br/>read model API"]
|
||||
Console["console.svc.plus<br/>presentation"]
|
||||
Agent["agent-svc-plus<br/>control plane"]
|
||||
|
||||
Xray -->|"raw per-UUID totals"| Exporter
|
||||
Exporter -->|"GET /v1/snapshots/latest payload"| Billing
|
||||
Agent -->|"schedule collect / reconcile"| Billing
|
||||
Billing -->|"idempotent writes"| PostgreSQL
|
||||
PostgreSQL -->|"usage + ledger + quota facts"| Accounts
|
||||
Accounts -->|"account usage / billing summary APIs"| Console
|
||||
```
|
||||
|
||||
## Role boundaries
|
||||
|
||||
- `agent-svc-plus`: control plane scheduling, reconciliation triggers, and
|
||||
future automation hooks
|
||||
- `xray-exporter`: collection and translation layer; it exposes normalized
|
||||
snapshots and Prometheus metrics
|
||||
- `billing-service`: billing writer; it computes positive minute deltas and
|
||||
persists replay-safe facts
|
||||
- `accounts.svc.plus`: PostgreSQL-backed read model; it aggregates usage,
|
||||
billing, and quota state for user-facing APIs
|
||||
- `console.svc.plus`: presentation layer; it reads from `accounts.svc.plus`
|
||||
only
|
||||
|
||||
## Shared database contract
|
||||
|
||||
`billing-service` and `accounts.svc.plus` share the same account database and
|
||||
schema.
|
||||
|
||||
- the database name remains `account`
|
||||
- on `jp-xhttp-contabo.svc.plus`, `accounts.svc.plus` reaches it through
|
||||
`stunnel-client:15432`
|
||||
- `billing-service` must point `DATABASE_URL` at that same PostgreSQL target so
|
||||
writes and reads stay in one source of truth
|
||||
|
||||
## Current implementation vs target architecture
|
||||
|
||||
### Current implementation
|
||||
|
||||
- `billing-service` pulls from a single `EXPORTER_BASE_URL`
|
||||
- the upstream snapshot source is `GET /v1/snapshots/latest`
|
||||
- the service is a task-oriented writer with health, status, and job endpoints
|
||||
- persisted facts land in the existing `accounts.svc.plus` accounting schema
|
||||
|
||||
### Target architecture
|
||||
|
||||
- `billing-service` remains the write model, but evolves into a multi-node
|
||||
aggregation point
|
||||
- the write path handles multiple exporter feeds or equivalent multi-node sample
|
||||
sets without losing `node_id`, `env`, or `inbound_tag`
|
||||
- remote exporter ingestion must work over HTTPS because exporters are not
|
||||
guaranteed to live on the same private network
|
||||
- the target pull contract must support source checkpoints and replay-safe
|
||||
catch-up, rather than relying only on one `latest` snapshot
|
||||
- `accounts.svc.plus` stays the read model and never delegates user-facing
|
||||
usage/billing reads back to `billing-service`
|
||||
|
||||
## Target multi-node ingress requirements
|
||||
|
||||
For the target architecture, `billing-service` should treat exporter nodes as
|
||||
remote sources, not implicit local sidecars.
|
||||
|
||||
- upstream pulls should use HTTPS with certificate validation enabled
|
||||
- prefer mTLS between `billing-service` and each `xray-exporter`
|
||||
- if mTLS is not ready, use HTTPS plus per-source bearer credentials
|
||||
- source progress must be tracked per exporter node so retries and catch-up stay
|
||||
bounded and observable
|
||||
- billing completeness must come from windowed, replay-safe collection, not
|
||||
from assuming the newest snapshot implies nothing was missed
|
||||
- minute-level sync drift is acceptable; the target is short-window eventual
|
||||
consistency rather than second-level strong consistency
|
||||
|
||||
## Invariants
|
||||
|
||||
- PostgreSQL is the only billing source of truth
|
||||
- `billing-service` and `accounts.svc.plus` share the same `account` database
|
||||
- Prometheus and Grafana remain observability only
|
||||
- `console.svc.plus` does not read PostgreSQL or `billing-service` directly
|
||||
- `accounts.svc.plus` does not use Prometheus as a billing data source
|
||||
199
docs/multi-node-https-plan.md
Normal file
199
docs/multi-node-https-plan.md
Normal file
@ -0,0 +1,199 @@
|
||||
# billing-service multi-node HTTPS ingestion plan
|
||||
|
||||
This document defines the target evolution path for `billing-service` from the
|
||||
current single `EXPORTER_BASE_URL` pull model to a secure multi-node ingestion
|
||||
model.
|
||||
|
||||
## Goal
|
||||
|
||||
Keep `billing-service` as the single billing write model, but let it ingest
|
||||
snapshots from many remote `xray-exporter` instances over HTTPS without
|
||||
assuming private-network reachability.
|
||||
|
||||
## Consistency budget
|
||||
|
||||
Target state does not require second-level strong consistency.
|
||||
|
||||
- minute-level sync drift is acceptable
|
||||
- the system should be treated as eventually consistent across a short
|
||||
multi-minute window
|
||||
- user-facing reads in `accounts.svc.plus` and `console.svc.plus` may lag the
|
||||
newest exporter counters briefly
|
||||
- billing correctness matters more than immediate freshness
|
||||
|
||||
Operational meaning:
|
||||
|
||||
- collector retries may intentionally overlap prior windows
|
||||
- delayed exporter delivery should be repaired by later collect or reconcile
|
||||
runs
|
||||
- the write model must converge to the correct minute buckets and ledger state
|
||||
without double charging
|
||||
|
||||
## Why the current model is not enough
|
||||
|
||||
Today:
|
||||
|
||||
- `billing-service` accepts one `EXPORTER_BASE_URL`
|
||||
- it fetches one `GET /v1/snapshots/latest` payload
|
||||
- it assumes the latest snapshot is enough to advance billing state
|
||||
|
||||
This is fine for a single local exporter, but it is not enough for:
|
||||
|
||||
- multiple proxy nodes
|
||||
- exporters reachable only over public or cross-region networks
|
||||
- outage recovery where `latest` alone cannot prove whether intermediate
|
||||
windows were missed
|
||||
- source-specific authentication and certificate validation
|
||||
|
||||
## Target design
|
||||
|
||||
### 1. Multi-source registry instead of one base URL
|
||||
|
||||
Target state replaces the single `EXPORTER_BASE_URL` dependency with a source
|
||||
registry owned by `billing-service`.
|
||||
|
||||
Each configured source should define at least:
|
||||
|
||||
- `source_id`
|
||||
- `node_id`
|
||||
- `env`
|
||||
- `base_url`
|
||||
- `enabled`
|
||||
- `auth_mode`
|
||||
- `credential_ref`
|
||||
- `ca_bundle_ref` or trusted issuer reference
|
||||
- `server_name`
|
||||
- `collect_interval`
|
||||
- `request_timeout`
|
||||
|
||||
Rules:
|
||||
|
||||
- target `base_url` must be `https://...`
|
||||
- `node_id` and `env` must match what the exporter emits
|
||||
- one source maps to one exporter endpoint, even if several sources later share
|
||||
the same network path
|
||||
|
||||
### 2. HTTPS-only upstream interaction
|
||||
|
||||
Target state requires secure transport for remote exporter pulls.
|
||||
|
||||
Security rules:
|
||||
|
||||
- remote exporter pulls must use HTTPS
|
||||
- certificate verification must stay enabled
|
||||
- `billing-service` must not rely on insecure skip-verify mode
|
||||
- prefer mTLS for service-to-service trust
|
||||
- if mTLS is not yet available, use HTTPS plus a per-source bearer token
|
||||
- credentials must be scoped per source, not shared globally across all nodes
|
||||
|
||||
Recommended trust order:
|
||||
|
||||
1. HTTPS + mTLS
|
||||
2. HTTPS + bearer token + pinned CA / trusted issuer
|
||||
|
||||
### 3. Completeness-first pull contract
|
||||
|
||||
To make multi-node billing safe, the upstream contract must evolve from
|
||||
`latest` to a windowed pull API.
|
||||
|
||||
Recommended target contract:
|
||||
|
||||
`GET /v1/snapshots/window?since=<RFC3339>&until=<RFC3339>&limit=<n>&cursor=<token>`
|
||||
|
||||
Response shape should include:
|
||||
|
||||
- `source_id`
|
||||
- `node_id`
|
||||
- `env`
|
||||
- `window_start`
|
||||
- `window_end`
|
||||
- `items[]`
|
||||
- `next_cursor`
|
||||
- `has_more`
|
||||
- `emitted_at`
|
||||
|
||||
Each item should still carry:
|
||||
|
||||
- `collected_at`
|
||||
- `samples[].uuid`
|
||||
- `samples[].email`
|
||||
- `samples[].inbound_tag`
|
||||
- `samples[].uplink_bytes_total`
|
||||
- `samples[].downlink_bytes_total`
|
||||
|
||||
Why this matters:
|
||||
|
||||
- `latest` is enough for observability, but not enough to prove billing
|
||||
completeness
|
||||
- windowed pagination lets `billing-service` resume from checkpoints and catch
|
||||
up after transient failures
|
||||
|
||||
### 4. Source checkpoints and replay safety
|
||||
|
||||
`billing-service` should track fetch progress per source, not globally.
|
||||
|
||||
Recommended source checkpoint fields:
|
||||
|
||||
- `source_id`
|
||||
- `last_successful_until`
|
||||
- `last_cursor`
|
||||
- `last_attempted_at`
|
||||
- `last_succeeded_at`
|
||||
- `last_error`
|
||||
|
||||
Collection behavior:
|
||||
|
||||
- pull per source using that source's last successful checkpoint
|
||||
- always overlap a small safety window during retries
|
||||
- rely on idempotent minute-bucket writes so overlap does not double-charge
|
||||
- expose source-level health in `/v1/status`
|
||||
- treat short multi-minute lag as acceptable if replay convergence is preserved
|
||||
|
||||
### 5. Safe write semantics
|
||||
|
||||
Security alone is not enough; the write path must remain replay-safe.
|
||||
|
||||
Target write-path rules:
|
||||
|
||||
- billing facts remain keyed by `node_id`, `env`, `uuid`, `inbound_tag`, and
|
||||
bucket time
|
||||
- re-fetching the same source window must not duplicate usage or ledger rows
|
||||
- reconcile jobs must be able to replay a source or time range intentionally
|
||||
|
||||
## Recommended rollout
|
||||
|
||||
### Phase 1. Preserve current runtime
|
||||
|
||||
- keep `EXPORTER_BASE_URL` as legacy single-source mode
|
||||
- keep `GET /v1/snapshots/latest` for current deployment compatibility
|
||||
|
||||
### Phase 2. Add source registry support
|
||||
|
||||
- introduce a multi-source config model
|
||||
- let `billing-service` iterate sources internally
|
||||
- keep single-source config as a compatibility shim
|
||||
|
||||
### Phase 3. Add HTTPS window API to exporter
|
||||
|
||||
- extend `xray-exporter` with a secure windowed snapshot API
|
||||
- add source authentication and certificate validation requirements
|
||||
|
||||
### Phase 4. Dual-read migration
|
||||
|
||||
- let `billing-service` support both:
|
||||
- legacy single-source `latest`
|
||||
- target multi-source HTTPS window pulls
|
||||
- compare source-level completeness and write counts during rollout
|
||||
|
||||
### Phase 5. Make multi-source HTTPS the default
|
||||
|
||||
- require HTTPS for remote exporter sources
|
||||
- reserve plain HTTP for explicit same-host dev or local-only modes
|
||||
- retire single global `EXPORTER_BASE_URL` as the primary production contract
|
||||
|
||||
## Non-goals
|
||||
|
||||
- exposing `billing-service` as a user-facing query API
|
||||
- moving billing truth into Prometheus
|
||||
- weakening TLS verification to simplify rollout
|
||||
- making `accounts.svc.plus` call `billing-service` for runtime reads
|
||||
@ -1,6 +1,7 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
@ -8,8 +9,19 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type ExporterSource struct {
|
||||
SourceID string
|
||||
BaseURL string
|
||||
ExpectedNodeID string
|
||||
ExpectedEnv string
|
||||
Enabled bool
|
||||
TimeoutSeconds int
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
ExporterBaseURL string
|
||||
ExporterSources []ExporterSource
|
||||
InternalServiceToken string
|
||||
DatabaseURL string
|
||||
ListenAddr string
|
||||
CollectInterval time.Duration
|
||||
@ -20,13 +32,23 @@ type Config struct {
|
||||
InitialBalance float64
|
||||
}
|
||||
|
||||
type rawExporterSource struct {
|
||||
SourceID string `json:"source_id"`
|
||||
BaseURL string `json:"base_url"`
|
||||
ExpectedNodeID string `json:"expected_node_id"`
|
||||
ExpectedEnv string `json:"expected_env"`
|
||||
Enabled *bool `json:"enabled"`
|
||||
TimeoutSeconds int `json:"timeout_seconds"`
|
||||
}
|
||||
|
||||
func Load() (Config, error) {
|
||||
cfg := Config{
|
||||
ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"),
|
||||
DatabaseURL: strings.TrimSpace(os.Getenv("DATABASE_URL")),
|
||||
ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")),
|
||||
DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")),
|
||||
SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")),
|
||||
ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"),
|
||||
InternalServiceToken: strings.TrimSpace(os.Getenv("INTERNAL_SERVICE_TOKEN")),
|
||||
DatabaseURL: strings.TrimSpace(os.Getenv("DATABASE_URL")),
|
||||
ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")),
|
||||
DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")),
|
||||
SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")),
|
||||
}
|
||||
if cfg.ListenAddr == "" {
|
||||
cfg.ListenAddr = ":8081"
|
||||
@ -35,12 +57,18 @@ func Load() (Config, error) {
|
||||
cfg.SourceRevision = "billing-service-v1"
|
||||
}
|
||||
|
||||
if cfg.ExporterBaseURL == "" {
|
||||
return Config{}, fmt.Errorf("EXPORTER_BASE_URL is required")
|
||||
}
|
||||
if cfg.DatabaseURL == "" {
|
||||
return Config{}, fmt.Errorf("DATABASE_URL is required")
|
||||
}
|
||||
if cfg.InternalServiceToken == "" {
|
||||
return Config{}, fmt.Errorf("INTERNAL_SERVICE_TOKEN is required")
|
||||
}
|
||||
|
||||
sources, err := loadExporterSources(cfg.ExporterBaseURL, strings.TrimSpace(os.Getenv("EXPORTER_SOURCES_JSON")))
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
cfg.ExporterSources = sources
|
||||
|
||||
interval := strings.TrimSpace(os.Getenv("COLLECT_INTERVAL"))
|
||||
if interval == "" {
|
||||
@ -59,6 +87,54 @@ func Load() (Config, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func loadExporterSources(legacyBaseURL, rawJSON string) ([]ExporterSource, error) {
|
||||
if rawJSON == "" {
|
||||
if legacyBaseURL == "" {
|
||||
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON or EXPORTER_BASE_URL is required")
|
||||
}
|
||||
return []ExporterSource{{
|
||||
SourceID: "default",
|
||||
BaseURL: strings.TrimRight(strings.TrimSpace(legacyBaseURL), "/"),
|
||||
Enabled: true,
|
||||
TimeoutSeconds: 15,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
var rawSources []rawExporterSource
|
||||
if err := json.Unmarshal([]byte(rawJSON), &rawSources); err != nil {
|
||||
return nil, fmt.Errorf("parse EXPORTER_SOURCES_JSON: %w", err)
|
||||
}
|
||||
if len(rawSources) == 0 {
|
||||
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON must define at least one source")
|
||||
}
|
||||
|
||||
sources := make([]ExporterSource, 0, len(rawSources))
|
||||
for _, raw := range rawSources {
|
||||
source := ExporterSource{
|
||||
SourceID: strings.TrimSpace(raw.SourceID),
|
||||
BaseURL: strings.TrimRight(strings.TrimSpace(raw.BaseURL), "/"),
|
||||
ExpectedNodeID: strings.TrimSpace(raw.ExpectedNodeID),
|
||||
ExpectedEnv: strings.TrimSpace(raw.ExpectedEnv),
|
||||
Enabled: true,
|
||||
TimeoutSeconds: raw.TimeoutSeconds,
|
||||
}
|
||||
if raw.Enabled != nil {
|
||||
source.Enabled = *raw.Enabled
|
||||
}
|
||||
if source.SourceID == "" {
|
||||
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON source_id is required")
|
||||
}
|
||||
if source.BaseURL == "" {
|
||||
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON base_url is required for source %s", source.SourceID)
|
||||
}
|
||||
if source.TimeoutSeconds <= 0 {
|
||||
source.TimeoutSeconds = 15
|
||||
}
|
||||
sources = append(sources, source)
|
||||
}
|
||||
return sources, nil
|
||||
}
|
||||
|
||||
func parseFloatEnv(key string, fallback float64) float64 {
|
||||
raw := strings.TrimSpace(os.Getenv(key))
|
||||
if raw == "" {
|
||||
|
||||
32
internal/config/config_test.go
Normal file
32
internal/config/config_test.go
Normal file
@ -0,0 +1,32 @@
|
||||
package config
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestLoadExporterSourcesFromJSON(t *testing.T) {
|
||||
sources, err := loadExporterSources("", `[{"source_id":"jp","base_url":"https://jp-xhttp-contabo.svc.plus","expected_node_id":"jp-xhttp-contabo.svc.plus","expected_env":"prod","enabled":true,"timeout_seconds":20}]`)
|
||||
if err != nil {
|
||||
t.Fatalf("load sources: %v", err)
|
||||
}
|
||||
if len(sources) != 1 {
|
||||
t.Fatalf("expected 1 source, got %d", len(sources))
|
||||
}
|
||||
if sources[0].SourceID != "jp" || sources[0].BaseURL != "https://jp-xhttp-contabo.svc.plus" {
|
||||
t.Fatalf("unexpected source %#v", sources[0])
|
||||
}
|
||||
if sources[0].TimeoutSeconds != 20 {
|
||||
t.Fatalf("expected timeout 20, got %d", sources[0].TimeoutSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadExporterSourcesFallsBackToLegacyBaseURL(t *testing.T) {
|
||||
sources, err := loadExporterSources("http://127.0.0.1:8080", "")
|
||||
if err != nil {
|
||||
t.Fatalf("load legacy source: %v", err)
|
||||
}
|
||||
if len(sources) != 1 {
|
||||
t.Fatalf("expected 1 source, got %d", len(sources))
|
||||
}
|
||||
if sources[0].SourceID != "default" || sources[0].BaseURL != "http://127.0.0.1:8080" {
|
||||
t.Fatalf("unexpected source %#v", sources[0])
|
||||
}
|
||||
}
|
||||
@ -6,49 +6,65 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"billing-service/internal/config"
|
||||
"billing-service/internal/model"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
baseURL string
|
||||
httpClient *http.Client
|
||||
serviceToken string
|
||||
}
|
||||
|
||||
func NewClient(baseURL string) *Client {
|
||||
func NewClient(serviceToken string) *Client {
|
||||
return &Client{
|
||||
baseURL: strings.TrimRight(strings.TrimSpace(baseURL), "/"),
|
||||
httpClient: &http.Client{Timeout: 15 * time.Second},
|
||||
serviceToken: strings.TrimSpace(serviceToken),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error) {
|
||||
endpoint, err := url.JoinPath(c.baseURL, "/v1/snapshots/latest")
|
||||
func (c *Client) FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) {
|
||||
timeout := time.Duration(source.TimeoutSeconds) * time.Second
|
||||
if timeout <= 0 {
|
||||
timeout = 15 * time.Second
|
||||
}
|
||||
client := &http.Client{Timeout: timeout}
|
||||
|
||||
endpoint, err := url.JoinPath(strings.TrimRight(strings.TrimSpace(source.BaseURL), "/"), "/v1/snapshots/window")
|
||||
if err != nil {
|
||||
return model.Snapshot{}, fmt.Errorf("build snapshot endpoint: %w", err)
|
||||
return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window endpoint: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||
if err != nil {
|
||||
return model.Snapshot{}, fmt.Errorf("build snapshot request: %w", err)
|
||||
return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window request: %w", err)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
query := req.URL.Query()
|
||||
query.Set("since", since.UTC().Format(time.RFC3339))
|
||||
query.Set("until", until.UTC().Format(time.RFC3339))
|
||||
query.Set("limit", strconv.Itoa(limit))
|
||||
if cursor != nil {
|
||||
query.Set("cursor", cursor.UTC().Format(time.RFC3339))
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Set("Accept", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+c.serviceToken)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return model.Snapshot{}, fmt.Errorf("fetch snapshot: %w", err)
|
||||
return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: %w", source.SourceID, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return model.Snapshot{}, fmt.Errorf("fetch snapshot: unexpected status %s", resp.Status)
|
||||
return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: unexpected status %s", source.SourceID, resp.Status)
|
||||
}
|
||||
|
||||
var snapshot model.Snapshot
|
||||
if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil {
|
||||
return model.Snapshot{}, fmt.Errorf("decode snapshot: %w", err)
|
||||
var page model.SnapshotWindowPage
|
||||
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
||||
return model.SnapshotWindowPage{}, fmt.Errorf("decode snapshots window for %s: %w", source.SourceID, err)
|
||||
}
|
||||
return snapshot, nil
|
||||
return page, nil
|
||||
}
|
||||
|
||||
@ -17,6 +17,14 @@ type Snapshot struct {
|
||||
Samples []Sample `json:"samples"`
|
||||
}
|
||||
|
||||
type SnapshotWindowPage struct {
|
||||
NodeID string `json:"node_id"`
|
||||
Env string `json:"env"`
|
||||
Snapshots []Snapshot `json:"snapshots"`
|
||||
HasMore bool `json:"has_more"`
|
||||
NextCursor string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
type Checkpoint struct {
|
||||
NodeID string
|
||||
AccountUUID string
|
||||
@ -76,13 +84,30 @@ type BillingProfile struct {
|
||||
PricingRuleVersion string
|
||||
}
|
||||
|
||||
type JobResult struct {
|
||||
Job string `json:"job"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
FinishedAt time.Time `json:"finished_at"`
|
||||
ProcessedSamples int `json:"processed_samples"`
|
||||
WrittenMinutes int `json:"written_minutes"`
|
||||
ReplayedMinutes int `json:"replayed_minutes"`
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error,omitempty"`
|
||||
type SourceSyncState struct {
|
||||
SourceID string
|
||||
LastCompletedUntil *time.Time
|
||||
LastAttemptedAt *time.Time
|
||||
LastSucceededAt *time.Time
|
||||
LastError string
|
||||
}
|
||||
|
||||
type SourceStatus struct {
|
||||
SourceID string `json:"source_id"`
|
||||
LastCompletedUntil *time.Time `json:"last_completed_until,omitempty"`
|
||||
LastAttemptedAt *time.Time `json:"last_attempted_at,omitempty"`
|
||||
LastSucceededAt *time.Time `json:"last_succeeded_at,omitempty"`
|
||||
LastError string `json:"last_error,omitempty"`
|
||||
}
|
||||
|
||||
type JobResult struct {
|
||||
Job string `json:"job"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
FinishedAt time.Time `json:"finished_at"`
|
||||
ProcessedSamples int `json:"processed_samples"`
|
||||
WrittenMinutes int `json:"written_minutes"`
|
||||
ReplayedMinutes int `json:"replayed_minutes"`
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error,omitempty"`
|
||||
SourceStatuses []SourceStatus `json:"source_statuses,omitempty"`
|
||||
}
|
||||
|
||||
@ -250,6 +250,81 @@ func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (*
|
||||
return &profile, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error) {
|
||||
const query = `
|
||||
SELECT source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error
|
||||
FROM billing_source_sync_state
|
||||
WHERE source_id = $1`
|
||||
|
||||
var state model.SourceSyncState
|
||||
var lastCompleted sql.NullTime
|
||||
var lastAttempted sql.NullTime
|
||||
var lastSucceeded sql.NullTime
|
||||
|
||||
err := p.db.QueryRowContext(ctx, query, sourceID).Scan(
|
||||
&state.SourceID,
|
||||
&lastCompleted,
|
||||
&lastAttempted,
|
||||
&lastSucceeded,
|
||||
&state.LastError,
|
||||
)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if lastCompleted.Valid {
|
||||
value := lastCompleted.Time
|
||||
state.LastCompletedUntil = &value
|
||||
}
|
||||
if lastAttempted.Valid {
|
||||
value := lastAttempted.Time
|
||||
state.LastAttemptedAt = &value
|
||||
}
|
||||
if lastSucceeded.Valid {
|
||||
value := lastSucceeded.Time
|
||||
state.LastSucceededAt = &value
|
||||
}
|
||||
return &state, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error {
|
||||
const query = `
|
||||
INSERT INTO billing_source_sync_state (
|
||||
source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error
|
||||
) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (source_id) DO UPDATE SET
|
||||
last_completed_until = EXCLUDED.last_completed_until,
|
||||
last_attempted_at = EXCLUDED.last_attempted_at,
|
||||
last_succeeded_at = EXCLUDED.last_succeeded_at,
|
||||
last_error = EXCLUDED.last_error,
|
||||
updated_at = now()
|
||||
`
|
||||
|
||||
var lastCompleted any
|
||||
if state.LastCompletedUntil != nil {
|
||||
lastCompleted = state.LastCompletedUntil.UTC()
|
||||
}
|
||||
var lastAttempted any
|
||||
if state.LastAttemptedAt != nil {
|
||||
lastAttempted = state.LastAttemptedAt.UTC()
|
||||
}
|
||||
var lastSucceeded any
|
||||
if state.LastSucceededAt != nil {
|
||||
lastSucceeded = state.LastSucceededAt.UTC()
|
||||
}
|
||||
|
||||
_, err := p.db.ExecContext(ctx, query,
|
||||
state.SourceID,
|
||||
lastCompleted,
|
||||
lastAttempted,
|
||||
lastSucceeded,
|
||||
state.LastError,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
var _ Repository = (*Postgres)(nil)
|
||||
|
||||
func ensureUTC(ts time.Time) time.Time {
|
||||
|
||||
@ -14,4 +14,6 @@ type Repository interface {
|
||||
GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error)
|
||||
UpsertQuotaState(ctx context.Context, state model.QuotaState) error
|
||||
GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error)
|
||||
GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error)
|
||||
UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error
|
||||
}
|
||||
|
||||
@ -43,6 +43,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
|
||||
|
||||
accountUUID := "11111111-1111-1111-1111-111111111111"
|
||||
if _, err := db.ExecContext(ctx, `
|
||||
DELETE FROM billing_source_sync_state;
|
||||
DELETE FROM billing_ledger;
|
||||
DELETE FROM traffic_minute_buckets;
|
||||
DELETE FROM traffic_stat_checkpoints;
|
||||
@ -59,22 +60,33 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
|
||||
}
|
||||
|
||||
svc := New(config.Config{
|
||||
ExporterSources: []config.ExporterSource{{
|
||||
SourceID: "default",
|
||||
BaseURL: "https://jp-xhttp-contabo.svc.plus",
|
||||
ExpectedNodeID: "jp-node",
|
||||
ExpectedEnv: "prod",
|
||||
Enabled: true,
|
||||
TimeoutSeconds: 15,
|
||||
}},
|
||||
InternalServiceToken: "secret",
|
||||
DefaultRegion: "",
|
||||
SourceRevision: "billing-service-acceptance",
|
||||
PricePerByte: 0.5,
|
||||
InitialIncludedQuotaBytes: 1000,
|
||||
InitialBalance: 0,
|
||||
}, fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
Email: "billing@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
}, &fakeWindowSource{pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
Email: "billing@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
})},
|
||||
}}, repository.NewPostgres(db))
|
||||
|
||||
result, err := svc.RunCollectAndRate(ctx, "collect-and-rate")
|
||||
@ -89,6 +101,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
|
||||
assertRowCount(t, db, "traffic_minute_buckets", 1)
|
||||
assertRowCount(t, db, "billing_ledger", 1)
|
||||
assertRowCount(t, db, "account_quota_states", 1)
|
||||
assertRowCount(t, db, "billing_source_sync_state", 1)
|
||||
|
||||
var totalBytes int64
|
||||
if err := db.QueryRowContext(ctx, `SELECT total_bytes FROM traffic_minute_buckets LIMIT 1`).Scan(&totalBytes); err != nil {
|
||||
|
||||
@ -15,13 +15,18 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type snapshotSource interface {
|
||||
FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error)
|
||||
const (
|
||||
sourceWindowOverlap = 2 * time.Minute
|
||||
sourceWindowPageSize = 120
|
||||
)
|
||||
|
||||
type windowSource interface {
|
||||
FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
cfg config.Config
|
||||
source snapshotSource
|
||||
source windowSource
|
||||
repo repository.Repository
|
||||
|
||||
mu sync.Mutex
|
||||
@ -30,7 +35,7 @@ type Service struct {
|
||||
lastError string
|
||||
}
|
||||
|
||||
func New(cfg config.Config, source snapshotSource, repo repository.Repository) *Service {
|
||||
func New(cfg config.Config, source windowSource, repo repository.Repository) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
source: source,
|
||||
@ -65,30 +70,31 @@ func (s *Service) RunCollectAndRate(ctx context.Context, job string) (model.JobR
|
||||
Status: "ok",
|
||||
}
|
||||
|
||||
snapshot, err := s.source.FetchLatestSnapshot(ctx)
|
||||
if err != nil {
|
||||
result.Status = "error"
|
||||
result.Error = err.Error()
|
||||
result.FinishedAt = time.Now().UTC()
|
||||
s.record(result)
|
||||
return result, err
|
||||
enabledSources := 0
|
||||
fatalSourceFailures := 0
|
||||
for _, source := range s.cfg.ExporterSources {
|
||||
if !source.Enabled {
|
||||
continue
|
||||
}
|
||||
enabledSources++
|
||||
|
||||
status, err := s.collectSource(ctx, source, &result)
|
||||
result.SourceStatuses = append(result.SourceStatuses, status)
|
||||
if err != nil {
|
||||
fatalSourceFailures++
|
||||
result.Error = joinError(result.Error, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
for _, sample := range snapshot.Samples {
|
||||
if err := validateSample(sample); err != nil {
|
||||
if enabledSources == 0 {
|
||||
result.Status = "error"
|
||||
result.Error = joinError(result.Error, "no enabled exporter sources configured")
|
||||
}
|
||||
if fatalSourceFailures > 0 {
|
||||
if result.ProcessedSamples == 0 && result.WrittenMinutes == 0 && result.ReplayedMinutes == 0 && result.Status != "partial" {
|
||||
result.Status = "error"
|
||||
} else if result.Status == "ok" {
|
||||
result.Status = "partial"
|
||||
result.Error = joinError(result.Error, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
processed, err := s.processSample(ctx, snapshot, sample, &result)
|
||||
if err != nil {
|
||||
result.Status = "partial"
|
||||
result.Error = joinError(result.Error, err.Error())
|
||||
continue
|
||||
}
|
||||
if processed {
|
||||
result.ProcessedSamples++
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,6 +118,110 @@ func (s *Service) Health() (bool, string) {
|
||||
return s.lastOK, s.lastError
|
||||
}
|
||||
|
||||
func (s *Service) collectSource(ctx context.Context, source config.ExporterSource, result *model.JobResult) (model.SourceStatus, error) {
|
||||
state, err := s.repo.GetSourceSyncState(ctx, source.SourceID)
|
||||
if err != nil {
|
||||
return model.SourceStatus{SourceID: source.SourceID, LastError: err.Error()}, fmt.Errorf("load source sync state %s: %w", source.SourceID, err)
|
||||
}
|
||||
if state == nil {
|
||||
state = &model.SourceSyncState{SourceID: source.SourceID}
|
||||
}
|
||||
|
||||
attemptedAt := time.Now().UTC()
|
||||
state.LastAttemptedAt = &attemptedAt
|
||||
state.LastError = ""
|
||||
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
|
||||
return sourceStatusFromState(*state), fmt.Errorf("record source attempt %s: %w", source.SourceID, err)
|
||||
}
|
||||
|
||||
until := time.Now().UTC().Truncate(time.Minute).Add(-time.Minute)
|
||||
since := until.Add(-sourceWindowOverlap)
|
||||
if state.LastCompletedUntil != nil {
|
||||
since = state.LastCompletedUntil.UTC().Add(-sourceWindowOverlap)
|
||||
}
|
||||
if since.After(until) {
|
||||
completedUntil := until
|
||||
state.LastCompletedUntil = &completedUntil
|
||||
succeededAt := time.Now().UTC()
|
||||
state.LastSucceededAt = &succeededAt
|
||||
state.LastError = ""
|
||||
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
|
||||
return sourceStatusFromState(*state), fmt.Errorf("record source noop completion %s: %w", source.SourceID, err)
|
||||
}
|
||||
return sourceStatusFromState(*state), nil
|
||||
}
|
||||
|
||||
var cursor *time.Time
|
||||
var lastProcessed *time.Time
|
||||
for {
|
||||
page, err := s.source.FetchWindow(ctx, source, since, until, sourceWindowPageSize, cursor)
|
||||
if err != nil {
|
||||
return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: %w", source.SourceID, err))
|
||||
}
|
||||
|
||||
for _, snapshot := range page.Snapshots {
|
||||
if err := validateSnapshotSource(snapshot, source); err != nil {
|
||||
return s.recordSourceFailure(ctx, *state, err)
|
||||
}
|
||||
|
||||
processed, err := s.processSnapshot(ctx, snapshot, result)
|
||||
if err != nil {
|
||||
return s.recordSourceFailure(ctx, *state, err)
|
||||
}
|
||||
if processed {
|
||||
collectedAt := snapshot.CollectedAt.UTC()
|
||||
lastProcessed = &collectedAt
|
||||
}
|
||||
}
|
||||
|
||||
if !page.HasMore {
|
||||
break
|
||||
}
|
||||
if strings.TrimSpace(page.NextCursor) == "" {
|
||||
return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: next_cursor missing while has_more=true", source.SourceID))
|
||||
}
|
||||
nextCursor, err := time.Parse(time.RFC3339, strings.TrimSpace(page.NextCursor))
|
||||
if err != nil {
|
||||
return s.recordSourceFailure(ctx, *state, fmt.Errorf("parse next cursor for %s: %w", source.SourceID, err))
|
||||
}
|
||||
cursor = &nextCursor
|
||||
}
|
||||
|
||||
completedUntil := until
|
||||
if lastProcessed != nil && lastProcessed.Before(completedUntil) {
|
||||
completedUntil = lastProcessed.UTC()
|
||||
}
|
||||
succeededAt := time.Now().UTC()
|
||||
state.LastCompletedUntil = &completedUntil
|
||||
state.LastSucceededAt = &succeededAt
|
||||
state.LastError = ""
|
||||
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
|
||||
return sourceStatusFromState(*state), fmt.Errorf("record source completion %s: %w", source.SourceID, err)
|
||||
}
|
||||
return sourceStatusFromState(*state), nil
|
||||
}
|
||||
|
||||
func (s *Service) processSnapshot(ctx context.Context, snapshot model.Snapshot, result *model.JobResult) (bool, error) {
|
||||
processedAny := false
|
||||
for _, sample := range snapshot.Samples {
|
||||
if err := validateSample(sample); err != nil {
|
||||
result.Status = "partial"
|
||||
result.Error = joinError(result.Error, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
processed, err := s.processSample(ctx, snapshot, sample, result)
|
||||
if err != nil {
|
||||
return processedAny, fmt.Errorf("process snapshot %s for %s: %w", snapshot.CollectedAt.UTC().Format(time.RFC3339), sample.UUID, err)
|
||||
}
|
||||
if processed {
|
||||
processedAny = true
|
||||
result.ProcessedSamples++
|
||||
}
|
||||
}
|
||||
return processedAny, nil
|
||||
}
|
||||
|
||||
func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sample model.Sample, result *model.JobResult) (bool, error) {
|
||||
storageNodeID := composeStorageNodeID(snapshot.Env, snapshot.NodeID)
|
||||
minuteStart := snapshot.CollectedAt.UTC().Truncate(time.Minute)
|
||||
@ -178,15 +288,12 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
|
||||
result.WrittenMinutes++
|
||||
}
|
||||
|
||||
amountDelta := -float64(totalBytes) * s.cfg.PricePerByte
|
||||
entry := model.LedgerEntry{
|
||||
ID: deterministicLedgerID(bucket),
|
||||
AccountUUID: sample.UUID,
|
||||
BucketStart: minuteStart,
|
||||
BucketEnd: minuteStart.Add(time.Minute),
|
||||
EntryType: "traffic_charge",
|
||||
RatedBytes: totalBytes,
|
||||
AmountDelta: amountDelta,
|
||||
PricingRuleVersion: s.cfg.SourceRevision,
|
||||
}
|
||||
|
||||
@ -207,7 +314,7 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
|
||||
|
||||
includedApplied := minInt64(quota.RemainingIncludedQuota, totalBytes)
|
||||
chargeableBytes := totalBytes - includedApplied
|
||||
amountDelta = -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier
|
||||
amountDelta := -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier
|
||||
entry.RatedBytes = chargeableBytes
|
||||
entry.AmountDelta = amountDelta
|
||||
entry.PricingRuleVersion = effectivePricing.pricingRuleVersion
|
||||
@ -314,6 +421,16 @@ func validateSample(sample model.Sample) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateSnapshotSource(snapshot model.Snapshot, source config.ExporterSource) error {
|
||||
if source.ExpectedNodeID != "" && strings.TrimSpace(snapshot.NodeID) != source.ExpectedNodeID {
|
||||
return fmt.Errorf("source %s expected node_id %q, got %q", source.SourceID, source.ExpectedNodeID, strings.TrimSpace(snapshot.NodeID))
|
||||
}
|
||||
if source.ExpectedEnv != "" && strings.TrimSpace(snapshot.Env) != source.ExpectedEnv {
|
||||
return fmt.Errorf("source %s expected env %q, got %q", source.SourceID, source.ExpectedEnv, strings.TrimSpace(snapshot.Env))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func deterministicLedgerID(bucket model.MinuteBucket) string {
|
||||
key := fmt.Sprintf("%s|%s|%s|%s|%s", bucket.BucketStart.UTC().Format(time.RFC3339), bucket.NodeID, bucket.AccountUUID, bucket.Region, bucket.LineCode)
|
||||
return uuid.NewSHA1(uuid.NameSpaceOID, []byte(key)).String()
|
||||
@ -335,6 +452,34 @@ func joinError(existing, next string) string {
|
||||
return existing + "; " + next
|
||||
}
|
||||
|
||||
func sourceStatusFromState(state model.SourceSyncState) model.SourceStatus {
|
||||
return model.SourceStatus{
|
||||
SourceID: state.SourceID,
|
||||
LastCompletedUntil: copyTimePtr(state.LastCompletedUntil),
|
||||
LastAttemptedAt: copyTimePtr(state.LastAttemptedAt),
|
||||
LastSucceededAt: copyTimePtr(state.LastSucceededAt),
|
||||
LastError: state.LastError,
|
||||
}
|
||||
}
|
||||
|
||||
func copyTimePtr(value *time.Time) *time.Time {
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := value.UTC()
|
||||
return &cloned
|
||||
}
|
||||
|
||||
func (s *Service) recordSourceFailure(ctx context.Context, state model.SourceSyncState, err error) (model.SourceStatus, error) {
|
||||
message := err.Error()
|
||||
state.LastError = message
|
||||
if persistErr := s.repo.UpsertSourceSyncState(ctx, state); persistErr != nil {
|
||||
message = joinError(message, fmt.Sprintf("persist source error state: %v", persistErr))
|
||||
state.LastError = message
|
||||
}
|
||||
return sourceStatusFromState(state), err
|
||||
}
|
||||
|
||||
func (s *Service) record(result model.JobResult) {
|
||||
s.lastResult = result
|
||||
s.lastError = result.Error
|
||||
|
||||
@ -10,13 +10,38 @@ import (
|
||||
"billing-service/internal/repository"
|
||||
)
|
||||
|
||||
type fakeSource struct {
|
||||
snapshot model.Snapshot
|
||||
err error
|
||||
type fakeWindowSource struct {
|
||||
pagesBySource map[string][]model.SnapshotWindowPage
|
||||
errBySource map[string]error
|
||||
requests []windowRequest
|
||||
}
|
||||
|
||||
func (f fakeSource) FetchLatestSnapshot(context.Context) (model.Snapshot, error) {
|
||||
return f.snapshot, f.err
|
||||
type windowRequest struct {
|
||||
sourceID string
|
||||
since time.Time
|
||||
until time.Time
|
||||
limit int
|
||||
cursor *time.Time
|
||||
}
|
||||
|
||||
func (f *fakeWindowSource) FetchWindow(_ context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) {
|
||||
f.requests = append(f.requests, windowRequest{
|
||||
sourceID: source.SourceID,
|
||||
since: since,
|
||||
until: until,
|
||||
limit: limit,
|
||||
cursor: cursor,
|
||||
})
|
||||
if err := f.errBySource[source.SourceID]; err != nil {
|
||||
return model.SnapshotWindowPage{}, err
|
||||
}
|
||||
pages := f.pagesBySource[source.SourceID]
|
||||
if len(pages) == 0 {
|
||||
return model.SnapshotWindowPage{}, nil
|
||||
}
|
||||
page := pages[0]
|
||||
f.pagesBySource[source.SourceID] = pages[1:]
|
||||
return page, nil
|
||||
}
|
||||
|
||||
type memoryRepo struct {
|
||||
@ -25,6 +50,7 @@ type memoryRepo struct {
|
||||
ledgers map[string]model.LedgerEntry
|
||||
quotas map[string]model.QuotaState
|
||||
profiles map[string]model.BillingProfile
|
||||
sourceSync map[string]model.SourceSyncState
|
||||
}
|
||||
|
||||
func newMemoryRepo() *memoryRepo {
|
||||
@ -34,6 +60,7 @@ func newMemoryRepo() *memoryRepo {
|
||||
ledgers: map[string]model.LedgerEntry{},
|
||||
quotas: map[string]model.QuotaState{},
|
||||
profiles: map[string]model.BillingProfile{},
|
||||
sourceSync: map[string]model.SourceSyncState{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,7 +72,7 @@ func bucketKey(bucket model.MinuteBucket) string {
|
||||
return bucket.BucketStart.UTC().Format(time.RFC3339) + "\x00" + bucket.NodeID + "\x00" + bucket.AccountUUID + "\x00" + bucket.Region + "\x00" + bucket.LineCode
|
||||
}
|
||||
|
||||
func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) {
|
||||
func (m *memoryRepo) GetCheckpoint(_ context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) {
|
||||
if checkpoint, ok := m.checkpoints[checkpointKey(nodeID, accountUUID)]; ok {
|
||||
copy := checkpoint
|
||||
return ©, nil
|
||||
@ -53,25 +80,25 @@ func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID stri
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) UpsertCheckpoint(ctx context.Context, checkpoint model.Checkpoint) error {
|
||||
func (m *memoryRepo) UpsertCheckpoint(_ context.Context, checkpoint model.Checkpoint) error {
|
||||
m.checkpoints[checkpointKey(checkpoint.NodeID, checkpoint.AccountUUID)] = checkpoint
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) UpsertMinuteBucket(ctx context.Context, bucket model.MinuteBucket) (bool, error) {
|
||||
func (m *memoryRepo) UpsertMinuteBucket(_ context.Context, bucket model.MinuteBucket) (bool, error) {
|
||||
key := bucketKey(bucket)
|
||||
_, existed := m.buckets[key]
|
||||
m.buckets[key] = bucket
|
||||
return existed, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error) {
|
||||
func (m *memoryRepo) UpsertLedger(_ context.Context, entry model.LedgerEntry) (bool, error) {
|
||||
_, existed := m.ledgers[entry.ID]
|
||||
m.ledgers[entry.ID] = entry
|
||||
return existed, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error) {
|
||||
func (m *memoryRepo) GetQuotaState(_ context.Context, accountUUID string) (*model.QuotaState, error) {
|
||||
if quota, ok := m.quotas[accountUUID]; ok {
|
||||
copy := quota
|
||||
return ©, nil
|
||||
@ -79,12 +106,12 @@ func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*mo
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) UpsertQuotaState(ctx context.Context, state model.QuotaState) error {
|
||||
func (m *memoryRepo) UpsertQuotaState(_ context.Context, state model.QuotaState) error {
|
||||
m.quotas[state.AccountUUID] = state
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) {
|
||||
func (m *memoryRepo) GetBillingProfile(_ context.Context, accountUUID string) (*model.BillingProfile, error) {
|
||||
if profile, ok := m.profiles[accountUUID]; ok {
|
||||
copy := profile
|
||||
return ©, nil
|
||||
@ -92,10 +119,40 @@ func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) GetSourceSyncState(_ context.Context, sourceID string) (*model.SourceSyncState, error) {
|
||||
if state, ok := m.sourceSync[sourceID]; ok {
|
||||
copy := cloneSyncState(state)
|
||||
return ©, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryRepo) UpsertSourceSyncState(_ context.Context, state model.SourceSyncState) error {
|
||||
m.sourceSync[state.SourceID] = cloneSyncState(state)
|
||||
return nil
|
||||
}
|
||||
|
||||
func cloneSyncState(state model.SourceSyncState) model.SourceSyncState {
|
||||
copy := state
|
||||
copy.LastCompletedUntil = copyTimePtr(state.LastCompletedUntil)
|
||||
copy.LastAttemptedAt = copyTimePtr(state.LastAttemptedAt)
|
||||
copy.LastSucceededAt = copyTimePtr(state.LastSucceededAt)
|
||||
return copy
|
||||
}
|
||||
|
||||
var _ repository.Repository = (*memoryRepo)(nil)
|
||||
|
||||
func baseConfig() config.Config {
|
||||
return config.Config{
|
||||
ExporterSources: []config.ExporterSource{{
|
||||
SourceID: "default",
|
||||
BaseURL: "https://jp-xhttp-contabo.svc.plus",
|
||||
ExpectedNodeID: "jp-node",
|
||||
ExpectedEnv: "prod",
|
||||
Enabled: true,
|
||||
TimeoutSeconds: 15,
|
||||
}},
|
||||
InternalServiceToken: "secret",
|
||||
DefaultRegion: "",
|
||||
SourceRevision: "billing-service-v1",
|
||||
PricePerByte: 0.5,
|
||||
@ -104,20 +161,33 @@ func baseConfig() config.Config {
|
||||
}
|
||||
}
|
||||
|
||||
func singleSnapshotPage(snapshot model.Snapshot) model.SnapshotWindowPage {
|
||||
return model.SnapshotWindowPage{
|
||||
NodeID: snapshot.NodeID,
|
||||
Env: snapshot.Env,
|
||||
Snapshots: []model.Snapshot{snapshot},
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaCalculationAndQuotaUpdate(t *testing.T) {
|
||||
repo := newMemoryRepo()
|
||||
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
Email: "user@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
}}, repo)
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
Email: "user@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err != nil {
|
||||
@ -144,17 +214,22 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) {
|
||||
LineMultiplier: 2.0,
|
||||
PricingRuleVersion: "pricing-v2",
|
||||
}
|
||||
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
}}, repo)
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err != nil {
|
||||
@ -197,19 +272,37 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) {
|
||||
|
||||
func TestDuplicateMinuteIsReplaySafe(t *testing.T) {
|
||||
repo := newMemoryRepo()
|
||||
snapshot := model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
Email: "user@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {
|
||||
singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
Email: "user@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
}),
|
||||
singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
Email: "user@example.com",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 100,
|
||||
DownlinkBytesTotal: 50,
|
||||
}},
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
svc := New(baseConfig(), fakeSource{snapshot: snapshot}, repo)
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
|
||||
t.Fatalf("first run: %v", err)
|
||||
@ -240,17 +333,22 @@ func TestNegativeDeltaProtection(t *testing.T) {
|
||||
XrayRevision: "prev",
|
||||
ResetEpoch: 0,
|
||||
}
|
||||
svc := New(cfg, fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 10,
|
||||
DownlinkBytesTotal: 20,
|
||||
}},
|
||||
}}, repo)
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 10,
|
||||
DownlinkBytesTotal: 20,
|
||||
}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(cfg, source, repo)
|
||||
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err != nil {
|
||||
@ -278,17 +376,22 @@ func TestRestartRecoveryFromCheckpoint(t *testing.T) {
|
||||
LastDownlinkTotal: 100,
|
||||
LastSeenAt: time.Now().UTC(),
|
||||
}
|
||||
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 130,
|
||||
DownlinkBytesTotal: 140,
|
||||
}},
|
||||
}}, repo)
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 130,
|
||||
DownlinkBytesTotal: 140,
|
||||
}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err != nil {
|
||||
@ -313,60 +416,103 @@ func TestMultiEnvIsolation(t *testing.T) {
|
||||
repo := newMemoryRepo()
|
||||
accountUUID := "11111111-1111-1111-1111-111111111111"
|
||||
cfg := baseConfig()
|
||||
|
||||
prodSvc := New(cfg, fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
|
||||
}}, repo)
|
||||
previewSvc := New(cfg, fakeSource{snapshot: model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "preview",
|
||||
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
|
||||
}}, repo)
|
||||
|
||||
if _, err := prodSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
|
||||
t.Fatalf("prod run: %v", err)
|
||||
cfg.ExporterSources = []config.ExporterSource{
|
||||
{
|
||||
SourceID: "prod-source",
|
||||
BaseURL: "https://prod.svc.plus",
|
||||
ExpectedNodeID: "jp-node",
|
||||
ExpectedEnv: "prod",
|
||||
Enabled: true,
|
||||
TimeoutSeconds: 15,
|
||||
},
|
||||
{
|
||||
SourceID: "preview-source",
|
||||
BaseURL: "https://preview.svc.plus",
|
||||
ExpectedNodeID: "jp-node",
|
||||
ExpectedEnv: "preview",
|
||||
Enabled: true,
|
||||
TimeoutSeconds: 15,
|
||||
},
|
||||
}
|
||||
if _, err := previewSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
|
||||
t.Fatalf("preview run: %v", err)
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"prod-source": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
|
||||
})},
|
||||
"preview-source": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "preview",
|
||||
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(cfg, source, repo)
|
||||
|
||||
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
|
||||
t.Fatalf("run: %v", err)
|
||||
}
|
||||
if len(repo.buckets) != 2 {
|
||||
t.Fatalf("expected isolated buckets per env, got %d", len(repo.buckets))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLateMinuteReconcileUsesSameMinuteKey(t *testing.T) {
|
||||
func TestExpectedNodeIDMismatchIsFatalForSource(t *testing.T) {
|
||||
repo := newMemoryRepo()
|
||||
accountUUID := "11111111-1111-1111-1111-111111111111"
|
||||
cfg := baseConfig()
|
||||
collectedAt := time.Date(2026, 4, 8, 10, 34, 50, 0, time.UTC)
|
||||
snapshot := model.Snapshot{
|
||||
CollectedAt: collectedAt,
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: accountUUID,
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 20,
|
||||
DownlinkBytesTotal: 20,
|
||||
}},
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 34, 0, 0, time.UTC),
|
||||
NodeID: "unexpected-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{UUID: "11111111-1111-1111-1111-111111111111", InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(cfg, fakeSource{snapshot: snapshot}, repo)
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
|
||||
t.Fatalf("first run: %v", err)
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err == nil {
|
||||
t.Fatalf("expected source mismatch error")
|
||||
}
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "reconcile")
|
||||
if err != nil {
|
||||
t.Fatalf("reconcile run: %v", err)
|
||||
}
|
||||
if result.ReplayedMinutes == 0 {
|
||||
t.Fatalf("expected reconcile to report replayed minute, got %#v", result)
|
||||
}
|
||||
if len(repo.buckets) != 1 {
|
||||
t.Fatalf("expected single logical minute bucket, got %d", len(repo.buckets))
|
||||
if result.Status != "error" {
|
||||
t.Fatalf("expected error status, got %#v", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSourceStatusIncludesSyncState(t *testing.T) {
|
||||
repo := newMemoryRepo()
|
||||
source := &fakeWindowSource{
|
||||
pagesBySource: map[string][]model.SnapshotWindowPage{
|
||||
"default": {singleSnapshotPage(model.Snapshot{
|
||||
CollectedAt: time.Date(2026, 4, 8, 10, 35, 0, 0, time.UTC),
|
||||
NodeID: "jp-node",
|
||||
Env: "prod",
|
||||
Samples: []model.Sample{{
|
||||
UUID: "11111111-1111-1111-1111-111111111111",
|
||||
InboundTag: "premium",
|
||||
UplinkBytesTotal: 10,
|
||||
DownlinkBytesTotal: 10,
|
||||
}},
|
||||
})},
|
||||
},
|
||||
}
|
||||
svc := New(baseConfig(), source, repo)
|
||||
|
||||
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
|
||||
if err != nil {
|
||||
t.Fatalf("run job: %v", err)
|
||||
}
|
||||
if len(result.SourceStatuses) != 1 {
|
||||
t.Fatalf("expected one source status, got %#v", result.SourceStatuses)
|
||||
}
|
||||
if result.SourceStatuses[0].SourceID != "default" {
|
||||
t.Fatalf("unexpected source status %#v", result.SourceStatuses[0])
|
||||
}
|
||||
if result.SourceStatuses[0].LastCompletedUntil == nil {
|
||||
t.Fatalf("expected last completed until in source status")
|
||||
}
|
||||
}
|
||||
|
||||
89
sql/billing-service-schema.sql
Normal file
89
sql/billing-service-schema.sql
Normal file
@ -0,0 +1,89 @@
|
||||
-- billing-service bootstrap/reference DDL
|
||||
--
|
||||
-- This file mirrors the accounting tables that billing-service depends on in
|
||||
-- the current accounts.svc.plus PostgreSQL schema. It is a service-owned
|
||||
-- documentation/bootstrap artifact and does not redefine schema ownership.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.traffic_stat_checkpoints (
|
||||
node_id TEXT NOT NULL,
|
||||
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
|
||||
last_uplink_total BIGINT NOT NULL DEFAULT 0,
|
||||
last_downlink_total BIGINT NOT NULL DEFAULT 0,
|
||||
last_seen_at TIMESTAMPTZ NOT NULL,
|
||||
xray_revision TEXT NOT NULL DEFAULT '',
|
||||
reset_epoch BIGINT NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (node_id, account_uuid)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.traffic_minute_buckets (
|
||||
bucket_start TIMESTAMPTZ NOT NULL,
|
||||
node_id TEXT NOT NULL,
|
||||
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
|
||||
region TEXT NOT NULL DEFAULT '',
|
||||
line_code TEXT NOT NULL DEFAULT '',
|
||||
uplink_bytes BIGINT NOT NULL DEFAULT 0,
|
||||
downlink_bytes BIGINT NOT NULL DEFAULT 0,
|
||||
total_bytes BIGINT NOT NULL DEFAULT 0,
|
||||
multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
rating_status TEXT NOT NULL DEFAULT 'pending',
|
||||
source_revision TEXT NOT NULL DEFAULT '',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (bucket_start, node_id, account_uuid, region, line_code)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.billing_ledger (
|
||||
id UUID PRIMARY KEY,
|
||||
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
|
||||
bucket_start TIMESTAMPTZ NOT NULL,
|
||||
bucket_end TIMESTAMPTZ NOT NULL,
|
||||
entry_type TEXT NOT NULL,
|
||||
rated_bytes BIGINT NOT NULL DEFAULT 0,
|
||||
amount_delta DOUBLE PRECISION NOT NULL DEFAULT 0,
|
||||
balance_after DOUBLE PRECISION NOT NULL DEFAULT 0,
|
||||
pricing_rule_version TEXT NOT NULL DEFAULT '',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.account_quota_states (
|
||||
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
|
||||
remaining_included_quota BIGINT NOT NULL DEFAULT 0,
|
||||
current_balance DOUBLE PRECISION NOT NULL DEFAULT 0,
|
||||
arrears BOOLEAN NOT NULL DEFAULT false,
|
||||
throttle_state TEXT NOT NULL DEFAULT 'normal',
|
||||
suspend_state TEXT NOT NULL DEFAULT 'active',
|
||||
last_rated_bucket_at TIMESTAMPTZ NULL,
|
||||
effective_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.account_billing_profiles (
|
||||
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
|
||||
package_name TEXT NOT NULL DEFAULT 'default',
|
||||
included_quota_bytes BIGINT NOT NULL DEFAULT 0,
|
||||
base_price_per_byte DOUBLE PRECISION NOT NULL DEFAULT 0,
|
||||
region_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
line_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
peak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
offpeak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
pricing_rule_version TEXT NOT NULL DEFAULT 'pricing-default-v1',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.billing_source_sync_state (
|
||||
source_id TEXT PRIMARY KEY,
|
||||
last_completed_until TIMESTAMPTZ NULL,
|
||||
last_attempted_at TIMESTAMPTZ NULL,
|
||||
last_succeeded_at TIMESTAMPTZ NULL,
|
||||
last_error TEXT NOT NULL DEFAULT '',
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_traffic_minute_buckets_account_bucket
|
||||
ON public.traffic_minute_buckets (account_uuid, bucket_start DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_billing_ledger_account_created
|
||||
ON public.billing_ledger (account_uuid, created_at DESC);
|
||||
9
testdata/postgres/init.sql
vendored
9
testdata/postgres/init.sql
vendored
@ -84,3 +84,12 @@ CREATE TABLE IF NOT EXISTS public.account_billing_profiles (
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.billing_source_sync_state (
|
||||
source_id TEXT PRIMARY KEY,
|
||||
last_completed_until TIMESTAMPTZ NULL,
|
||||
last_attempted_at TIMESTAMPTZ NULL,
|
||||
last_succeeded_at TIMESTAMPTZ NULL,
|
||||
last_error TEXT NOT NULL DEFAULT '',
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user