feat: add multi-source billing ingestion

This commit is contained in:
Haitao Pan 2026-04-12 13:14:41 +08:00
parent 1cd49fadd9
commit 8cf03ba207
17 changed files with 1344 additions and 188 deletions

View File

@ -13,3 +13,11 @@ existing `accounts.svc.plus` PostgreSQL schema.
- `POST /v1/jobs/reconcile`
- `GET /healthz`
- `GET /v1/status`
## Documentation
- `docs/README.md` - documentation index and verification notes
- `docs/architecture.md` - deployment and data-flow diagrams
- `docs/api.md` - task API surface and upstream/downstream boundaries
- `sql/billing-service-schema.sql` - bootstrap/reference DDL aligned with the
current `accounts.svc.plus` accounting schema

View File

@ -34,7 +34,7 @@ func main() {
svc := service.New(
cfg,
exporter.NewClient(cfg.ExporterBaseURL),
exporter.NewClient(cfg.InternalServiceToken),
repository.NewPostgres(db),
)
svc.Start(ctx)

48
docs/README.md Normal file
View File

@ -0,0 +1,48 @@
# billing-service docs
This directory holds service-owned documentation for `billing-service`.
## Documents
- [architecture.md](architecture.md) - deployment topology, billing data flow,
and current-vs-target architecture notes
- [api.md](api.md) - task endpoints, upstream snapshot contract, and downstream
read-model boundaries
- [multi-node-https-plan.md](multi-node-https-plan.md) - target-state plan for
evolving from a single exporter URL to secure multi-node HTTPS ingestion
- [../sql/billing-service-schema.sql](../sql/billing-service-schema.sql) -
reference DDL for the accounting tables `billing-service` depends on
## Scope
These docs describe the `billing-service` role inside the Cloud Network Billing
& Control Plane.
- `billing-service` is the task-oriented write model
- `accounts.svc.plus` is the PostgreSQL-backed read model
- `console.svc.plus` is the presentation layer and does not query
`billing-service` directly
System-wide contracts still live in
`github-org-cloud-neutral-toolkit/docs/architecture/network-billing-contracts.md`.
## Deployment Verification
Local or operator dry-run validation:
```bash
cd /Users/shenlan/workspaces/cloud-neutral-toolkit/playbooks
export DATABASE_URL=postgres://...
ANSIBLE_CONFIG=./ansible.cfg \
ansible-playbook -i ./inventory.ini -D -C ./deploy_billing_service.yml -l jp_xhttp_contabo_host
```
Notes:
- `DATABASE_URL` must be exported before running `deploy_billing_service.yml`
- on `jp-xhttp-contabo.svc.plus`, `DATABASE_URL` should reference the same
`account` database used by `accounts.svc.plus`
- check mode may report predicted changes; the goal is to pass the preflight
assertion and render a valid deployment plan
- GitHub Actions uses the `BILLING_SERVICE_DATABASE_URL` secret to satisfy the
same precondition in the `deploy-billing-service` job

149
docs/api.md Normal file
View File

@ -0,0 +1,149 @@
# billing-service API and interfaces
This document describes the current `billing-service` task API plus the
upstream and downstream interfaces it depends on.
## Service endpoints
### `GET /healthz`
Returns service health derived from the most recent collect-and-rate execution.
Example response:
```json
{
"status": "ok",
"message": ""
}
```
### `GET /v1/status`
Returns the latest in-memory job result snapshot.
Key fields:
- `job`
- `started_at`
- `finished_at`
- `processed_samples`
- `written_minutes`
- `replayed_minutes`
- `status`
- `error`
### `POST /v1/jobs/collect-and-rate`
Triggers an immediate snapshot pull from `xray-exporter`, computes minute
deltas, rates chargeable bytes, and writes replay-safe facts into PostgreSQL.
Behavior:
- method must be `POST`
- returns `200` when the run completes without a fatal service error
- returns `503` when upstream fetch or persistence fails hard enough to mark the
run unavailable
### `POST /v1/jobs/reconcile`
Triggers the same execution path as collect-and-rate, but records the job name
as `reconcile` for operational visibility.
## Upstream dependency
### `xray-exporter`
`billing-service` currently depends on a single exporter base URL and fetches:
- `GET /v1/snapshots/latest`
Minimum payload shape:
```json
{
"collected_at": "2026-04-08T12:00:00Z",
"node_id": "jp-xhttp-contabo.svc.plus",
"env": "prod",
"samples": [
{
"uuid": "uuid-1",
"email": "user@example.com",
"inbound_tag": "xhttp-premium",
"uplink_bytes_total": 1024,
"downlink_bytes_total": 2048
}
]
}
```
Required fields:
- `collected_at`
- `node_id`
- `env`
- `samples[].uuid`
- `samples[].email`
- `samples[].inbound_tag`
- `samples[].uplink_bytes_total`
- `samples[].downlink_bytes_total`
### Target upstream contract
Current production behavior remains `GET /v1/snapshots/latest`, but the target
multi-node design should evolve to:
- HTTPS transport for remote exporter pulls
- source-specific authentication
- a windowed pull API that supports catch-up and pagination
Recommended target path:
- `GET /v1/snapshots/window?since=<RFC3339>&until=<RFC3339>&limit=<n>&cursor=<token>`
Target-state expectations:
- remote pulls use `https://` exporter base URLs
- TLS verification stays enabled
- each source can be authenticated independently
- responses can be replayed safely from source checkpoints without duplicate
billing writes
## Downstream reads
User-facing reads do not go through `billing-service`. The read model is
`accounts.svc.plus`, backed by PostgreSQL.
Relevant downstream APIs:
- `GET /api/account/usage/summary`
- `GET /api/account/usage/buckets`
- `GET /api/account/billing/summary`
Read-path rules:
- `billing-service` does not expose user-facing usage or billing query APIs
- `accounts.svc.plus` reads PostgreSQL-backed usage and billing facts
- `console.svc.plus` queries `accounts.svc.plus`, not `billing-service`
## Configuration inputs
Runtime environment variables used by the current implementation:
- `EXPORTER_BASE_URL`
- `DATABASE_URL`
- `LISTEN_ADDR`
- `COLLECT_INTERVAL`
- `DEFAULT_REGION`
- `SOURCE_REVISION`
- `PRICE_PER_BYTE`
- `INITIAL_INCLUDED_QUOTA_BYTES`
- `INITIAL_BALANCE`
`DATABASE_URL` rule:
- it must point to the same `account` database that `accounts.svc.plus` uses
- on `jp-xhttp-contabo.svc.plus`, the current accounts containers use
`DB_HOST=stunnel-client`, `DB_PORT=15432`, and `DB_NAME=account`
- `billing-service` should follow that same target so user-facing reads in
`accounts.svc.plus` see the exact facts written by `billing-service`

124
docs/architecture.md Normal file
View File

@ -0,0 +1,124 @@
# billing-service architecture
`billing-service` is the billing write model in the Cloud Network Billing &
Control Plane. It consumes normalized traffic snapshots, computes replay-safe
minute deltas, and writes billing truth into PostgreSQL.
## Deployment topology
```mermaid
flowchart TB
subgraph Node["Single-node VPS / billing host"]
Agent["agent-svc-plus"]
Exporter["xray-exporter"]
Billing["billing-service"]
StunnelClient["stunnel-client"]
end
subgraph DBHost["postgresql.svc.plus"]
StunnelServer["stunnel-server"]
PostgreSQL["PostgreSQL"]
end
subgraph ReadPath["Read and presentation"]
Accounts["accounts.svc.plus"]
Console["console.svc.plus"]
end
Agent --> Billing
Agent -. coordination .-> Exporter
Billing --> StunnelClient
StunnelClient --> StunnelServer
StunnelServer --> PostgreSQL
Accounts --> PostgreSQL
Console --> Accounts
```
## Data flow
```mermaid
flowchart LR
Xray["xray-core<br/>raw cumulative counters"]
Exporter["xray-exporter<br/>translation layer"]
Billing["billing-service<br/>minute delta + rating writer"]
PostgreSQL["PostgreSQL<br/>billing source of truth"]
Accounts["accounts.svc.plus<br/>read model API"]
Console["console.svc.plus<br/>presentation"]
Agent["agent-svc-plus<br/>control plane"]
Xray -->|"raw per-UUID totals"| Exporter
Exporter -->|"GET /v1/snapshots/latest payload"| Billing
Agent -->|"schedule collect / reconcile"| Billing
Billing -->|"idempotent writes"| PostgreSQL
PostgreSQL -->|"usage + ledger + quota facts"| Accounts
Accounts -->|"account usage / billing summary APIs"| Console
```
## Role boundaries
- `agent-svc-plus`: control plane scheduling, reconciliation triggers, and
future automation hooks
- `xray-exporter`: collection and translation layer; it exposes normalized
snapshots and Prometheus metrics
- `billing-service`: billing writer; it computes positive minute deltas and
persists replay-safe facts
- `accounts.svc.plus`: PostgreSQL-backed read model; it aggregates usage,
billing, and quota state for user-facing APIs
- `console.svc.plus`: presentation layer; it reads from `accounts.svc.plus`
only
## Shared database contract
`billing-service` and `accounts.svc.plus` share the same account database and
schema.
- the database name remains `account`
- on `jp-xhttp-contabo.svc.plus`, `accounts.svc.plus` reaches it through
`stunnel-client:15432`
- `billing-service` must point `DATABASE_URL` at that same PostgreSQL target so
writes and reads stay in one source of truth
## Current implementation vs target architecture
### Current implementation
- `billing-service` pulls from a single `EXPORTER_BASE_URL`
- the upstream snapshot source is `GET /v1/snapshots/latest`
- the service is a task-oriented writer with health, status, and job endpoints
- persisted facts land in the existing `accounts.svc.plus` accounting schema
### Target architecture
- `billing-service` remains the write model, but evolves into a multi-node
aggregation point
- the write path handles multiple exporter feeds or equivalent multi-node sample
sets without losing `node_id`, `env`, or `inbound_tag`
- remote exporter ingestion must work over HTTPS because exporters are not
guaranteed to live on the same private network
- the target pull contract must support source checkpoints and replay-safe
catch-up, rather than relying only on one `latest` snapshot
- `accounts.svc.plus` stays the read model and never delegates user-facing
usage/billing reads back to `billing-service`
## Target multi-node ingress requirements
For the target architecture, `billing-service` should treat exporter nodes as
remote sources, not implicit local sidecars.
- upstream pulls should use HTTPS with certificate validation enabled
- prefer mTLS between `billing-service` and each `xray-exporter`
- if mTLS is not ready, use HTTPS plus per-source bearer credentials
- source progress must be tracked per exporter node so retries and catch-up stay
bounded and observable
- billing completeness must come from windowed, replay-safe collection, not
from assuming the newest snapshot implies nothing was missed
- minute-level sync drift is acceptable; the target is short-window eventual
consistency rather than second-level strong consistency
## Invariants
- PostgreSQL is the only billing source of truth
- `billing-service` and `accounts.svc.plus` share the same `account` database
- Prometheus and Grafana remain observability only
- `console.svc.plus` does not read PostgreSQL or `billing-service` directly
- `accounts.svc.plus` does not use Prometheus as a billing data source

View File

@ -0,0 +1,199 @@
# billing-service multi-node HTTPS ingestion plan
This document defines the target evolution path for `billing-service` from the
current single `EXPORTER_BASE_URL` pull model to a secure multi-node ingestion
model.
## Goal
Keep `billing-service` as the single billing write model, but let it ingest
snapshots from many remote `xray-exporter` instances over HTTPS without
assuming private-network reachability.
## Consistency budget
Target state does not require second-level strong consistency.
- minute-level sync drift is acceptable
- the system should be treated as eventually consistent across a short
multi-minute window
- user-facing reads in `accounts.svc.plus` and `console.svc.plus` may lag the
newest exporter counters briefly
- billing correctness matters more than immediate freshness
Operational meaning:
- collector retries may intentionally overlap prior windows
- delayed exporter delivery should be repaired by later collect or reconcile
runs
- the write model must converge to the correct minute buckets and ledger state
without double charging
## Why the current model is not enough
Today:
- `billing-service` accepts one `EXPORTER_BASE_URL`
- it fetches one `GET /v1/snapshots/latest` payload
- it assumes the latest snapshot is enough to advance billing state
This is fine for a single local exporter, but it is not enough for:
- multiple proxy nodes
- exporters reachable only over public or cross-region networks
- outage recovery where `latest` alone cannot prove whether intermediate
windows were missed
- source-specific authentication and certificate validation
## Target design
### 1. Multi-source registry instead of one base URL
Target state replaces the single `EXPORTER_BASE_URL` dependency with a source
registry owned by `billing-service`.
Each configured source should define at least:
- `source_id`
- `node_id`
- `env`
- `base_url`
- `enabled`
- `auth_mode`
- `credential_ref`
- `ca_bundle_ref` or trusted issuer reference
- `server_name`
- `collect_interval`
- `request_timeout`
Rules:
- target `base_url` must be `https://...`
- `node_id` and `env` must match what the exporter emits
- one source maps to one exporter endpoint, even if several sources later share
the same network path
### 2. HTTPS-only upstream interaction
Target state requires secure transport for remote exporter pulls.
Security rules:
- remote exporter pulls must use HTTPS
- certificate verification must stay enabled
- `billing-service` must not rely on insecure skip-verify mode
- prefer mTLS for service-to-service trust
- if mTLS is not yet available, use HTTPS plus a per-source bearer token
- credentials must be scoped per source, not shared globally across all nodes
Recommended trust order:
1. HTTPS + mTLS
2. HTTPS + bearer token + pinned CA / trusted issuer
### 3. Completeness-first pull contract
To make multi-node billing safe, the upstream contract must evolve from
`latest` to a windowed pull API.
Recommended target contract:
`GET /v1/snapshots/window?since=<RFC3339>&until=<RFC3339>&limit=<n>&cursor=<token>`
Response shape should include:
- `source_id`
- `node_id`
- `env`
- `window_start`
- `window_end`
- `items[]`
- `next_cursor`
- `has_more`
- `emitted_at`
Each item should still carry:
- `collected_at`
- `samples[].uuid`
- `samples[].email`
- `samples[].inbound_tag`
- `samples[].uplink_bytes_total`
- `samples[].downlink_bytes_total`
Why this matters:
- `latest` is enough for observability, but not enough to prove billing
completeness
- windowed pagination lets `billing-service` resume from checkpoints and catch
up after transient failures
### 4. Source checkpoints and replay safety
`billing-service` should track fetch progress per source, not globally.
Recommended source checkpoint fields:
- `source_id`
- `last_successful_until`
- `last_cursor`
- `last_attempted_at`
- `last_succeeded_at`
- `last_error`
Collection behavior:
- pull per source using that source's last successful checkpoint
- always overlap a small safety window during retries
- rely on idempotent minute-bucket writes so overlap does not double-charge
- expose source-level health in `/v1/status`
- treat short multi-minute lag as acceptable if replay convergence is preserved
### 5. Safe write semantics
Security alone is not enough; the write path must remain replay-safe.
Target write-path rules:
- billing facts remain keyed by `node_id`, `env`, `uuid`, `inbound_tag`, and
bucket time
- re-fetching the same source window must not duplicate usage or ledger rows
- reconcile jobs must be able to replay a source or time range intentionally
## Recommended rollout
### Phase 1. Preserve current runtime
- keep `EXPORTER_BASE_URL` as legacy single-source mode
- keep `GET /v1/snapshots/latest` for current deployment compatibility
### Phase 2. Add source registry support
- introduce a multi-source config model
- let `billing-service` iterate sources internally
- keep single-source config as a compatibility shim
### Phase 3. Add HTTPS window API to exporter
- extend `xray-exporter` with a secure windowed snapshot API
- add source authentication and certificate validation requirements
### Phase 4. Dual-read migration
- let `billing-service` support both:
- legacy single-source `latest`
- target multi-source HTTPS window pulls
- compare source-level completeness and write counts during rollout
### Phase 5. Make multi-source HTTPS the default
- require HTTPS for remote exporter sources
- reserve plain HTTP for explicit same-host dev or local-only modes
- retire single global `EXPORTER_BASE_URL` as the primary production contract
## Non-goals
- exposing `billing-service` as a user-facing query API
- moving billing truth into Prometheus
- weakening TLS verification to simplify rollout
- making `accounts.svc.plus` call `billing-service` for runtime reads

View File

@ -1,6 +1,7 @@
package config
import (
"encoding/json"
"fmt"
"os"
"strconv"
@ -8,8 +9,19 @@ import (
"time"
)
type ExporterSource struct {
SourceID string
BaseURL string
ExpectedNodeID string
ExpectedEnv string
Enabled bool
TimeoutSeconds int
}
type Config struct {
ExporterBaseURL string
ExporterSources []ExporterSource
InternalServiceToken string
DatabaseURL string
ListenAddr string
CollectInterval time.Duration
@ -20,13 +32,23 @@ type Config struct {
InitialBalance float64
}
type rawExporterSource struct {
SourceID string `json:"source_id"`
BaseURL string `json:"base_url"`
ExpectedNodeID string `json:"expected_node_id"`
ExpectedEnv string `json:"expected_env"`
Enabled *bool `json:"enabled"`
TimeoutSeconds int `json:"timeout_seconds"`
}
func Load() (Config, error) {
cfg := Config{
ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"),
DatabaseURL: strings.TrimSpace(os.Getenv("DATABASE_URL")),
ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")),
DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")),
SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")),
ExporterBaseURL: strings.TrimRight(strings.TrimSpace(os.Getenv("EXPORTER_BASE_URL")), "/"),
InternalServiceToken: strings.TrimSpace(os.Getenv("INTERNAL_SERVICE_TOKEN")),
DatabaseURL: strings.TrimSpace(os.Getenv("DATABASE_URL")),
ListenAddr: strings.TrimSpace(os.Getenv("LISTEN_ADDR")),
DefaultRegion: strings.TrimSpace(os.Getenv("DEFAULT_REGION")),
SourceRevision: strings.TrimSpace(os.Getenv("SOURCE_REVISION")),
}
if cfg.ListenAddr == "" {
cfg.ListenAddr = ":8081"
@ -35,12 +57,18 @@ func Load() (Config, error) {
cfg.SourceRevision = "billing-service-v1"
}
if cfg.ExporterBaseURL == "" {
return Config{}, fmt.Errorf("EXPORTER_BASE_URL is required")
}
if cfg.DatabaseURL == "" {
return Config{}, fmt.Errorf("DATABASE_URL is required")
}
if cfg.InternalServiceToken == "" {
return Config{}, fmt.Errorf("INTERNAL_SERVICE_TOKEN is required")
}
sources, err := loadExporterSources(cfg.ExporterBaseURL, strings.TrimSpace(os.Getenv("EXPORTER_SOURCES_JSON")))
if err != nil {
return Config{}, err
}
cfg.ExporterSources = sources
interval := strings.TrimSpace(os.Getenv("COLLECT_INTERVAL"))
if interval == "" {
@ -59,6 +87,54 @@ func Load() (Config, error) {
return cfg, nil
}
func loadExporterSources(legacyBaseURL, rawJSON string) ([]ExporterSource, error) {
if rawJSON == "" {
if legacyBaseURL == "" {
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON or EXPORTER_BASE_URL is required")
}
return []ExporterSource{{
SourceID: "default",
BaseURL: strings.TrimRight(strings.TrimSpace(legacyBaseURL), "/"),
Enabled: true,
TimeoutSeconds: 15,
}}, nil
}
var rawSources []rawExporterSource
if err := json.Unmarshal([]byte(rawJSON), &rawSources); err != nil {
return nil, fmt.Errorf("parse EXPORTER_SOURCES_JSON: %w", err)
}
if len(rawSources) == 0 {
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON must define at least one source")
}
sources := make([]ExporterSource, 0, len(rawSources))
for _, raw := range rawSources {
source := ExporterSource{
SourceID: strings.TrimSpace(raw.SourceID),
BaseURL: strings.TrimRight(strings.TrimSpace(raw.BaseURL), "/"),
ExpectedNodeID: strings.TrimSpace(raw.ExpectedNodeID),
ExpectedEnv: strings.TrimSpace(raw.ExpectedEnv),
Enabled: true,
TimeoutSeconds: raw.TimeoutSeconds,
}
if raw.Enabled != nil {
source.Enabled = *raw.Enabled
}
if source.SourceID == "" {
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON source_id is required")
}
if source.BaseURL == "" {
return nil, fmt.Errorf("EXPORTER_SOURCES_JSON base_url is required for source %s", source.SourceID)
}
if source.TimeoutSeconds <= 0 {
source.TimeoutSeconds = 15
}
sources = append(sources, source)
}
return sources, nil
}
func parseFloatEnv(key string, fallback float64) float64 {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {

View File

@ -0,0 +1,32 @@
package config
import "testing"
func TestLoadExporterSourcesFromJSON(t *testing.T) {
sources, err := loadExporterSources("", `[{"source_id":"jp","base_url":"https://jp-xhttp-contabo.svc.plus","expected_node_id":"jp-xhttp-contabo.svc.plus","expected_env":"prod","enabled":true,"timeout_seconds":20}]`)
if err != nil {
t.Fatalf("load sources: %v", err)
}
if len(sources) != 1 {
t.Fatalf("expected 1 source, got %d", len(sources))
}
if sources[0].SourceID != "jp" || sources[0].BaseURL != "https://jp-xhttp-contabo.svc.plus" {
t.Fatalf("unexpected source %#v", sources[0])
}
if sources[0].TimeoutSeconds != 20 {
t.Fatalf("expected timeout 20, got %d", sources[0].TimeoutSeconds)
}
}
func TestLoadExporterSourcesFallsBackToLegacyBaseURL(t *testing.T) {
sources, err := loadExporterSources("http://127.0.0.1:8080", "")
if err != nil {
t.Fatalf("load legacy source: %v", err)
}
if len(sources) != 1 {
t.Fatalf("expected 1 source, got %d", len(sources))
}
if sources[0].SourceID != "default" || sources[0].BaseURL != "http://127.0.0.1:8080" {
t.Fatalf("unexpected source %#v", sources[0])
}
}

View File

@ -6,49 +6,65 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"billing-service/internal/config"
"billing-service/internal/model"
)
type Client struct {
baseURL string
httpClient *http.Client
serviceToken string
}
func NewClient(baseURL string) *Client {
func NewClient(serviceToken string) *Client {
return &Client{
baseURL: strings.TrimRight(strings.TrimSpace(baseURL), "/"),
httpClient: &http.Client{Timeout: 15 * time.Second},
serviceToken: strings.TrimSpace(serviceToken),
}
}
func (c *Client) FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error) {
endpoint, err := url.JoinPath(c.baseURL, "/v1/snapshots/latest")
func (c *Client) FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) {
timeout := time.Duration(source.TimeoutSeconds) * time.Second
if timeout <= 0 {
timeout = 15 * time.Second
}
client := &http.Client{Timeout: timeout}
endpoint, err := url.JoinPath(strings.TrimRight(strings.TrimSpace(source.BaseURL), "/"), "/v1/snapshots/window")
if err != nil {
return model.Snapshot{}, fmt.Errorf("build snapshot endpoint: %w", err)
return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window endpoint: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return model.Snapshot{}, fmt.Errorf("build snapshot request: %w", err)
return model.SnapshotWindowPage{}, fmt.Errorf("build snapshots window request: %w", err)
}
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
query := req.URL.Query()
query.Set("since", since.UTC().Format(time.RFC3339))
query.Set("until", until.UTC().Format(time.RFC3339))
query.Set("limit", strconv.Itoa(limit))
if cursor != nil {
query.Set("cursor", cursor.UTC().Format(time.RFC3339))
}
req.URL.RawQuery = query.Encode()
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "Bearer "+c.serviceToken)
resp, err := client.Do(req)
if err != nil {
return model.Snapshot{}, fmt.Errorf("fetch snapshot: %w", err)
return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: %w", source.SourceID, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return model.Snapshot{}, fmt.Errorf("fetch snapshot: unexpected status %s", resp.Status)
return model.SnapshotWindowPage{}, fmt.Errorf("fetch snapshots window for %s: unexpected status %s", source.SourceID, resp.Status)
}
var snapshot model.Snapshot
if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil {
return model.Snapshot{}, fmt.Errorf("decode snapshot: %w", err)
var page model.SnapshotWindowPage
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
return model.SnapshotWindowPage{}, fmt.Errorf("decode snapshots window for %s: %w", source.SourceID, err)
}
return snapshot, nil
return page, nil
}

View File

@ -17,6 +17,14 @@ type Snapshot struct {
Samples []Sample `json:"samples"`
}
type SnapshotWindowPage struct {
NodeID string `json:"node_id"`
Env string `json:"env"`
Snapshots []Snapshot `json:"snapshots"`
HasMore bool `json:"has_more"`
NextCursor string `json:"next_cursor,omitempty"`
}
type Checkpoint struct {
NodeID string
AccountUUID string
@ -76,13 +84,30 @@ type BillingProfile struct {
PricingRuleVersion string
}
type JobResult struct {
Job string `json:"job"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
ProcessedSamples int `json:"processed_samples"`
WrittenMinutes int `json:"written_minutes"`
ReplayedMinutes int `json:"replayed_minutes"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
type SourceSyncState struct {
SourceID string
LastCompletedUntil *time.Time
LastAttemptedAt *time.Time
LastSucceededAt *time.Time
LastError string
}
type SourceStatus struct {
SourceID string `json:"source_id"`
LastCompletedUntil *time.Time `json:"last_completed_until,omitempty"`
LastAttemptedAt *time.Time `json:"last_attempted_at,omitempty"`
LastSucceededAt *time.Time `json:"last_succeeded_at,omitempty"`
LastError string `json:"last_error,omitempty"`
}
type JobResult struct {
Job string `json:"job"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
ProcessedSamples int `json:"processed_samples"`
WrittenMinutes int `json:"written_minutes"`
ReplayedMinutes int `json:"replayed_minutes"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
SourceStatuses []SourceStatus `json:"source_statuses,omitempty"`
}

View File

@ -250,6 +250,81 @@ func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (*
return &profile, nil
}
func (p *Postgres) GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error) {
const query = `
SELECT source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error
FROM billing_source_sync_state
WHERE source_id = $1`
var state model.SourceSyncState
var lastCompleted sql.NullTime
var lastAttempted sql.NullTime
var lastSucceeded sql.NullTime
err := p.db.QueryRowContext(ctx, query, sourceID).Scan(
&state.SourceID,
&lastCompleted,
&lastAttempted,
&lastSucceeded,
&state.LastError,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
if lastCompleted.Valid {
value := lastCompleted.Time
state.LastCompletedUntil = &value
}
if lastAttempted.Valid {
value := lastAttempted.Time
state.LastAttemptedAt = &value
}
if lastSucceeded.Valid {
value := lastSucceeded.Time
state.LastSucceededAt = &value
}
return &state, nil
}
func (p *Postgres) UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error {
const query = `
INSERT INTO billing_source_sync_state (
source_id, last_completed_until, last_attempted_at, last_succeeded_at, last_error
) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (source_id) DO UPDATE SET
last_completed_until = EXCLUDED.last_completed_until,
last_attempted_at = EXCLUDED.last_attempted_at,
last_succeeded_at = EXCLUDED.last_succeeded_at,
last_error = EXCLUDED.last_error,
updated_at = now()
`
var lastCompleted any
if state.LastCompletedUntil != nil {
lastCompleted = state.LastCompletedUntil.UTC()
}
var lastAttempted any
if state.LastAttemptedAt != nil {
lastAttempted = state.LastAttemptedAt.UTC()
}
var lastSucceeded any
if state.LastSucceededAt != nil {
lastSucceeded = state.LastSucceededAt.UTC()
}
_, err := p.db.ExecContext(ctx, query,
state.SourceID,
lastCompleted,
lastAttempted,
lastSucceeded,
state.LastError,
)
return err
}
var _ Repository = (*Postgres)(nil)
func ensureUTC(ts time.Time) time.Time {

View File

@ -14,4 +14,6 @@ type Repository interface {
GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error)
UpsertQuotaState(ctx context.Context, state model.QuotaState) error
GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error)
GetSourceSyncState(ctx context.Context, sourceID string) (*model.SourceSyncState, error)
UpsertSourceSyncState(ctx context.Context, state model.SourceSyncState) error
}

View File

@ -43,6 +43,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
accountUUID := "11111111-1111-1111-1111-111111111111"
if _, err := db.ExecContext(ctx, `
DELETE FROM billing_source_sync_state;
DELETE FROM billing_ledger;
DELETE FROM traffic_minute_buckets;
DELETE FROM traffic_stat_checkpoints;
@ -59,22 +60,33 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
}
svc := New(config.Config{
ExporterSources: []config.ExporterSource{{
SourceID: "default",
BaseURL: "https://jp-xhttp-contabo.svc.plus",
ExpectedNodeID: "jp-node",
ExpectedEnv: "prod",
Enabled: true,
TimeoutSeconds: 15,
}},
InternalServiceToken: "secret",
DefaultRegion: "",
SourceRevision: "billing-service-acceptance",
PricePerByte: 0.5,
InitialIncludedQuotaBytes: 1000,
InitialBalance: 0,
}, fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
Email: "billing@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}, &fakeWindowSource{pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 11, 0, 45, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
Email: "billing@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
})},
}}, repository.NewPostgres(db))
result, err := svc.RunCollectAndRate(ctx, "collect-and-rate")
@ -89,6 +101,7 @@ func TestPostgresAcceptanceWritesAccountingTables(t *testing.T) {
assertRowCount(t, db, "traffic_minute_buckets", 1)
assertRowCount(t, db, "billing_ledger", 1)
assertRowCount(t, db, "account_quota_states", 1)
assertRowCount(t, db, "billing_source_sync_state", 1)
var totalBytes int64
if err := db.QueryRowContext(ctx, `SELECT total_bytes FROM traffic_minute_buckets LIMIT 1`).Scan(&totalBytes); err != nil {

View File

@ -15,13 +15,18 @@ import (
"github.com/google/uuid"
)
type snapshotSource interface {
FetchLatestSnapshot(ctx context.Context) (model.Snapshot, error)
const (
sourceWindowOverlap = 2 * time.Minute
sourceWindowPageSize = 120
)
type windowSource interface {
FetchWindow(ctx context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error)
}
type Service struct {
cfg config.Config
source snapshotSource
source windowSource
repo repository.Repository
mu sync.Mutex
@ -30,7 +35,7 @@ type Service struct {
lastError string
}
func New(cfg config.Config, source snapshotSource, repo repository.Repository) *Service {
func New(cfg config.Config, source windowSource, repo repository.Repository) *Service {
return &Service{
cfg: cfg,
source: source,
@ -65,30 +70,31 @@ func (s *Service) RunCollectAndRate(ctx context.Context, job string) (model.JobR
Status: "ok",
}
snapshot, err := s.source.FetchLatestSnapshot(ctx)
if err != nil {
result.Status = "error"
result.Error = err.Error()
result.FinishedAt = time.Now().UTC()
s.record(result)
return result, err
enabledSources := 0
fatalSourceFailures := 0
for _, source := range s.cfg.ExporterSources {
if !source.Enabled {
continue
}
enabledSources++
status, err := s.collectSource(ctx, source, &result)
result.SourceStatuses = append(result.SourceStatuses, status)
if err != nil {
fatalSourceFailures++
result.Error = joinError(result.Error, err.Error())
}
}
for _, sample := range snapshot.Samples {
if err := validateSample(sample); err != nil {
if enabledSources == 0 {
result.Status = "error"
result.Error = joinError(result.Error, "no enabled exporter sources configured")
}
if fatalSourceFailures > 0 {
if result.ProcessedSamples == 0 && result.WrittenMinutes == 0 && result.ReplayedMinutes == 0 && result.Status != "partial" {
result.Status = "error"
} else if result.Status == "ok" {
result.Status = "partial"
result.Error = joinError(result.Error, err.Error())
continue
}
processed, err := s.processSample(ctx, snapshot, sample, &result)
if err != nil {
result.Status = "partial"
result.Error = joinError(result.Error, err.Error())
continue
}
if processed {
result.ProcessedSamples++
}
}
@ -112,6 +118,110 @@ func (s *Service) Health() (bool, string) {
return s.lastOK, s.lastError
}
func (s *Service) collectSource(ctx context.Context, source config.ExporterSource, result *model.JobResult) (model.SourceStatus, error) {
state, err := s.repo.GetSourceSyncState(ctx, source.SourceID)
if err != nil {
return model.SourceStatus{SourceID: source.SourceID, LastError: err.Error()}, fmt.Errorf("load source sync state %s: %w", source.SourceID, err)
}
if state == nil {
state = &model.SourceSyncState{SourceID: source.SourceID}
}
attemptedAt := time.Now().UTC()
state.LastAttemptedAt = &attemptedAt
state.LastError = ""
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
return sourceStatusFromState(*state), fmt.Errorf("record source attempt %s: %w", source.SourceID, err)
}
until := time.Now().UTC().Truncate(time.Minute).Add(-time.Minute)
since := until.Add(-sourceWindowOverlap)
if state.LastCompletedUntil != nil {
since = state.LastCompletedUntil.UTC().Add(-sourceWindowOverlap)
}
if since.After(until) {
completedUntil := until
state.LastCompletedUntil = &completedUntil
succeededAt := time.Now().UTC()
state.LastSucceededAt = &succeededAt
state.LastError = ""
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
return sourceStatusFromState(*state), fmt.Errorf("record source noop completion %s: %w", source.SourceID, err)
}
return sourceStatusFromState(*state), nil
}
var cursor *time.Time
var lastProcessed *time.Time
for {
page, err := s.source.FetchWindow(ctx, source, since, until, sourceWindowPageSize, cursor)
if err != nil {
return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: %w", source.SourceID, err))
}
for _, snapshot := range page.Snapshots {
if err := validateSnapshotSource(snapshot, source); err != nil {
return s.recordSourceFailure(ctx, *state, err)
}
processed, err := s.processSnapshot(ctx, snapshot, result)
if err != nil {
return s.recordSourceFailure(ctx, *state, err)
}
if processed {
collectedAt := snapshot.CollectedAt.UTC()
lastProcessed = &collectedAt
}
}
if !page.HasMore {
break
}
if strings.TrimSpace(page.NextCursor) == "" {
return s.recordSourceFailure(ctx, *state, fmt.Errorf("fetch window for %s: next_cursor missing while has_more=true", source.SourceID))
}
nextCursor, err := time.Parse(time.RFC3339, strings.TrimSpace(page.NextCursor))
if err != nil {
return s.recordSourceFailure(ctx, *state, fmt.Errorf("parse next cursor for %s: %w", source.SourceID, err))
}
cursor = &nextCursor
}
completedUntil := until
if lastProcessed != nil && lastProcessed.Before(completedUntil) {
completedUntil = lastProcessed.UTC()
}
succeededAt := time.Now().UTC()
state.LastCompletedUntil = &completedUntil
state.LastSucceededAt = &succeededAt
state.LastError = ""
if err := s.repo.UpsertSourceSyncState(ctx, *state); err != nil {
return sourceStatusFromState(*state), fmt.Errorf("record source completion %s: %w", source.SourceID, err)
}
return sourceStatusFromState(*state), nil
}
func (s *Service) processSnapshot(ctx context.Context, snapshot model.Snapshot, result *model.JobResult) (bool, error) {
processedAny := false
for _, sample := range snapshot.Samples {
if err := validateSample(sample); err != nil {
result.Status = "partial"
result.Error = joinError(result.Error, err.Error())
continue
}
processed, err := s.processSample(ctx, snapshot, sample, result)
if err != nil {
return processedAny, fmt.Errorf("process snapshot %s for %s: %w", snapshot.CollectedAt.UTC().Format(time.RFC3339), sample.UUID, err)
}
if processed {
processedAny = true
result.ProcessedSamples++
}
}
return processedAny, nil
}
func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sample model.Sample, result *model.JobResult) (bool, error) {
storageNodeID := composeStorageNodeID(snapshot.Env, snapshot.NodeID)
minuteStart := snapshot.CollectedAt.UTC().Truncate(time.Minute)
@ -178,15 +288,12 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
result.WrittenMinutes++
}
amountDelta := -float64(totalBytes) * s.cfg.PricePerByte
entry := model.LedgerEntry{
ID: deterministicLedgerID(bucket),
AccountUUID: sample.UUID,
BucketStart: minuteStart,
BucketEnd: minuteStart.Add(time.Minute),
EntryType: "traffic_charge",
RatedBytes: totalBytes,
AmountDelta: amountDelta,
PricingRuleVersion: s.cfg.SourceRevision,
}
@ -207,7 +314,7 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
includedApplied := minInt64(quota.RemainingIncludedQuota, totalBytes)
chargeableBytes := totalBytes - includedApplied
amountDelta = -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier
amountDelta := -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier
entry.RatedBytes = chargeableBytes
entry.AmountDelta = amountDelta
entry.PricingRuleVersion = effectivePricing.pricingRuleVersion
@ -314,6 +421,16 @@ func validateSample(sample model.Sample) error {
return nil
}
func validateSnapshotSource(snapshot model.Snapshot, source config.ExporterSource) error {
if source.ExpectedNodeID != "" && strings.TrimSpace(snapshot.NodeID) != source.ExpectedNodeID {
return fmt.Errorf("source %s expected node_id %q, got %q", source.SourceID, source.ExpectedNodeID, strings.TrimSpace(snapshot.NodeID))
}
if source.ExpectedEnv != "" && strings.TrimSpace(snapshot.Env) != source.ExpectedEnv {
return fmt.Errorf("source %s expected env %q, got %q", source.SourceID, source.ExpectedEnv, strings.TrimSpace(snapshot.Env))
}
return nil
}
func deterministicLedgerID(bucket model.MinuteBucket) string {
key := fmt.Sprintf("%s|%s|%s|%s|%s", bucket.BucketStart.UTC().Format(time.RFC3339), bucket.NodeID, bucket.AccountUUID, bucket.Region, bucket.LineCode)
return uuid.NewSHA1(uuid.NameSpaceOID, []byte(key)).String()
@ -335,6 +452,34 @@ func joinError(existing, next string) string {
return existing + "; " + next
}
func sourceStatusFromState(state model.SourceSyncState) model.SourceStatus {
return model.SourceStatus{
SourceID: state.SourceID,
LastCompletedUntil: copyTimePtr(state.LastCompletedUntil),
LastAttemptedAt: copyTimePtr(state.LastAttemptedAt),
LastSucceededAt: copyTimePtr(state.LastSucceededAt),
LastError: state.LastError,
}
}
func copyTimePtr(value *time.Time) *time.Time {
if value == nil {
return nil
}
cloned := value.UTC()
return &cloned
}
func (s *Service) recordSourceFailure(ctx context.Context, state model.SourceSyncState, err error) (model.SourceStatus, error) {
message := err.Error()
state.LastError = message
if persistErr := s.repo.UpsertSourceSyncState(ctx, state); persistErr != nil {
message = joinError(message, fmt.Sprintf("persist source error state: %v", persistErr))
state.LastError = message
}
return sourceStatusFromState(state), err
}
func (s *Service) record(result model.JobResult) {
s.lastResult = result
s.lastError = result.Error

View File

@ -10,13 +10,38 @@ import (
"billing-service/internal/repository"
)
type fakeSource struct {
snapshot model.Snapshot
err error
type fakeWindowSource struct {
pagesBySource map[string][]model.SnapshotWindowPage
errBySource map[string]error
requests []windowRequest
}
func (f fakeSource) FetchLatestSnapshot(context.Context) (model.Snapshot, error) {
return f.snapshot, f.err
type windowRequest struct {
sourceID string
since time.Time
until time.Time
limit int
cursor *time.Time
}
func (f *fakeWindowSource) FetchWindow(_ context.Context, source config.ExporterSource, since, until time.Time, limit int, cursor *time.Time) (model.SnapshotWindowPage, error) {
f.requests = append(f.requests, windowRequest{
sourceID: source.SourceID,
since: since,
until: until,
limit: limit,
cursor: cursor,
})
if err := f.errBySource[source.SourceID]; err != nil {
return model.SnapshotWindowPage{}, err
}
pages := f.pagesBySource[source.SourceID]
if len(pages) == 0 {
return model.SnapshotWindowPage{}, nil
}
page := pages[0]
f.pagesBySource[source.SourceID] = pages[1:]
return page, nil
}
type memoryRepo struct {
@ -25,6 +50,7 @@ type memoryRepo struct {
ledgers map[string]model.LedgerEntry
quotas map[string]model.QuotaState
profiles map[string]model.BillingProfile
sourceSync map[string]model.SourceSyncState
}
func newMemoryRepo() *memoryRepo {
@ -34,6 +60,7 @@ func newMemoryRepo() *memoryRepo {
ledgers: map[string]model.LedgerEntry{},
quotas: map[string]model.QuotaState{},
profiles: map[string]model.BillingProfile{},
sourceSync: map[string]model.SourceSyncState{},
}
}
@ -45,7 +72,7 @@ func bucketKey(bucket model.MinuteBucket) string {
return bucket.BucketStart.UTC().Format(time.RFC3339) + "\x00" + bucket.NodeID + "\x00" + bucket.AccountUUID + "\x00" + bucket.Region + "\x00" + bucket.LineCode
}
func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) {
func (m *memoryRepo) GetCheckpoint(_ context.Context, nodeID, accountUUID string) (*model.Checkpoint, error) {
if checkpoint, ok := m.checkpoints[checkpointKey(nodeID, accountUUID)]; ok {
copy := checkpoint
return &copy, nil
@ -53,25 +80,25 @@ func (m *memoryRepo) GetCheckpoint(ctx context.Context, nodeID, accountUUID stri
return nil, nil
}
func (m *memoryRepo) UpsertCheckpoint(ctx context.Context, checkpoint model.Checkpoint) error {
func (m *memoryRepo) UpsertCheckpoint(_ context.Context, checkpoint model.Checkpoint) error {
m.checkpoints[checkpointKey(checkpoint.NodeID, checkpoint.AccountUUID)] = checkpoint
return nil
}
func (m *memoryRepo) UpsertMinuteBucket(ctx context.Context, bucket model.MinuteBucket) (bool, error) {
func (m *memoryRepo) UpsertMinuteBucket(_ context.Context, bucket model.MinuteBucket) (bool, error) {
key := bucketKey(bucket)
_, existed := m.buckets[key]
m.buckets[key] = bucket
return existed, nil
}
func (m *memoryRepo) UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error) {
func (m *memoryRepo) UpsertLedger(_ context.Context, entry model.LedgerEntry) (bool, error) {
_, existed := m.ledgers[entry.ID]
m.ledgers[entry.ID] = entry
return existed, nil
}
func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error) {
func (m *memoryRepo) GetQuotaState(_ context.Context, accountUUID string) (*model.QuotaState, error) {
if quota, ok := m.quotas[accountUUID]; ok {
copy := quota
return &copy, nil
@ -79,12 +106,12 @@ func (m *memoryRepo) GetQuotaState(ctx context.Context, accountUUID string) (*mo
return nil, nil
}
func (m *memoryRepo) UpsertQuotaState(ctx context.Context, state model.QuotaState) error {
func (m *memoryRepo) UpsertQuotaState(_ context.Context, state model.QuotaState) error {
m.quotas[state.AccountUUID] = state
return nil
}
func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) {
func (m *memoryRepo) GetBillingProfile(_ context.Context, accountUUID string) (*model.BillingProfile, error) {
if profile, ok := m.profiles[accountUUID]; ok {
copy := profile
return &copy, nil
@ -92,10 +119,40 @@ func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string)
return nil, nil
}
func (m *memoryRepo) GetSourceSyncState(_ context.Context, sourceID string) (*model.SourceSyncState, error) {
if state, ok := m.sourceSync[sourceID]; ok {
copy := cloneSyncState(state)
return &copy, nil
}
return nil, nil
}
func (m *memoryRepo) UpsertSourceSyncState(_ context.Context, state model.SourceSyncState) error {
m.sourceSync[state.SourceID] = cloneSyncState(state)
return nil
}
func cloneSyncState(state model.SourceSyncState) model.SourceSyncState {
copy := state
copy.LastCompletedUntil = copyTimePtr(state.LastCompletedUntil)
copy.LastAttemptedAt = copyTimePtr(state.LastAttemptedAt)
copy.LastSucceededAt = copyTimePtr(state.LastSucceededAt)
return copy
}
var _ repository.Repository = (*memoryRepo)(nil)
func baseConfig() config.Config {
return config.Config{
ExporterSources: []config.ExporterSource{{
SourceID: "default",
BaseURL: "https://jp-xhttp-contabo.svc.plus",
ExpectedNodeID: "jp-node",
ExpectedEnv: "prod",
Enabled: true,
TimeoutSeconds: 15,
}},
InternalServiceToken: "secret",
DefaultRegion: "",
SourceRevision: "billing-service-v1",
PricePerByte: 0.5,
@ -104,20 +161,33 @@ func baseConfig() config.Config {
}
}
func singleSnapshotPage(snapshot model.Snapshot) model.SnapshotWindowPage {
return model.SnapshotWindowPage{
NodeID: snapshot.NodeID,
Env: snapshot.Env,
Snapshots: []model.Snapshot{snapshot},
}
}
func TestDeltaCalculationAndQuotaUpdate(t *testing.T) {
repo := newMemoryRepo()
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
Email: "user@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}}, repo)
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
Email: "user@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
})},
},
}
svc := New(baseConfig(), source, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
@ -144,17 +214,22 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) {
LineMultiplier: 2.0,
PricingRuleVersion: "pricing-v2",
}
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}}, repo)
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
})},
},
}
svc := New(baseConfig(), source, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
@ -197,19 +272,37 @@ func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) {
func TestDuplicateMinuteIsReplaySafe(t *testing.T) {
repo := newMemoryRepo()
snapshot := model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
Email: "user@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {
singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
Email: "user@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}),
singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 30, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
Email: "user@example.com",
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}),
},
},
}
svc := New(baseConfig(), fakeSource{snapshot: snapshot}, repo)
svc := New(baseConfig(), source, repo)
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
t.Fatalf("first run: %v", err)
@ -240,17 +333,22 @@ func TestNegativeDeltaProtection(t *testing.T) {
XrayRevision: "prev",
ResetEpoch: 0,
}
svc := New(cfg, fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 10,
DownlinkBytesTotal: 20,
}},
}}, repo)
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 31, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 10,
DownlinkBytesTotal: 20,
}},
})},
},
}
svc := New(cfg, source, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
@ -278,17 +376,22 @@ func TestRestartRecoveryFromCheckpoint(t *testing.T) {
LastDownlinkTotal: 100,
LastSeenAt: time.Now().UTC(),
}
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 130,
DownlinkBytesTotal: 140,
}},
}}, repo)
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 32, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 130,
DownlinkBytesTotal: 140,
}},
})},
},
}
svc := New(baseConfig(), source, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
@ -313,60 +416,103 @@ func TestMultiEnvIsolation(t *testing.T) {
repo := newMemoryRepo()
accountUUID := "11111111-1111-1111-1111-111111111111"
cfg := baseConfig()
prodSvc := New(cfg, fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
}}, repo)
previewSvc := New(cfg, fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "preview",
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
}}, repo)
if _, err := prodSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
t.Fatalf("prod run: %v", err)
cfg.ExporterSources = []config.ExporterSource{
{
SourceID: "prod-source",
BaseURL: "https://prod.svc.plus",
ExpectedNodeID: "jp-node",
ExpectedEnv: "prod",
Enabled: true,
TimeoutSeconds: 15,
},
{
SourceID: "preview-source",
BaseURL: "https://preview.svc.plus",
ExpectedNodeID: "jp-node",
ExpectedEnv: "preview",
Enabled: true,
TimeoutSeconds: 15,
},
}
if _, err := previewSvc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
t.Fatalf("preview run: %v", err)
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"prod-source": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
})},
"preview-source": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 33, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "preview",
Samples: []model.Sample{{UUID: accountUUID, InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
})},
},
}
svc := New(cfg, source, repo)
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
t.Fatalf("run: %v", err)
}
if len(repo.buckets) != 2 {
t.Fatalf("expected isolated buckets per env, got %d", len(repo.buckets))
}
}
func TestLateMinuteReconcileUsesSameMinuteKey(t *testing.T) {
func TestExpectedNodeIDMismatchIsFatalForSource(t *testing.T) {
repo := newMemoryRepo()
accountUUID := "11111111-1111-1111-1111-111111111111"
cfg := baseConfig()
collectedAt := time.Date(2026, 4, 8, 10, 34, 50, 0, time.UTC)
snapshot := model.Snapshot{
CollectedAt: collectedAt,
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 20,
DownlinkBytesTotal: 20,
}},
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 34, 0, 0, time.UTC),
NodeID: "unexpected-node",
Env: "prod",
Samples: []model.Sample{{UUID: "11111111-1111-1111-1111-111111111111", InboundTag: "premium", UplinkBytesTotal: 10, DownlinkBytesTotal: 10}},
})},
},
}
svc := New(cfg, fakeSource{snapshot: snapshot}, repo)
svc := New(baseConfig(), source, repo)
if _, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate"); err != nil {
t.Fatalf("first run: %v", err)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err == nil {
t.Fatalf("expected source mismatch error")
}
result, err := svc.RunCollectAndRate(context.Background(), "reconcile")
if err != nil {
t.Fatalf("reconcile run: %v", err)
}
if result.ReplayedMinutes == 0 {
t.Fatalf("expected reconcile to report replayed minute, got %#v", result)
}
if len(repo.buckets) != 1 {
t.Fatalf("expected single logical minute bucket, got %d", len(repo.buckets))
if result.Status != "error" {
t.Fatalf("expected error status, got %#v", result)
}
}
func TestSourceStatusIncludesSyncState(t *testing.T) {
repo := newMemoryRepo()
source := &fakeWindowSource{
pagesBySource: map[string][]model.SnapshotWindowPage{
"default": {singleSnapshotPage(model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 35, 0, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: "11111111-1111-1111-1111-111111111111",
InboundTag: "premium",
UplinkBytesTotal: 10,
DownlinkBytesTotal: 10,
}},
})},
},
}
svc := New(baseConfig(), source, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
t.Fatalf("run job: %v", err)
}
if len(result.SourceStatuses) != 1 {
t.Fatalf("expected one source status, got %#v", result.SourceStatuses)
}
if result.SourceStatuses[0].SourceID != "default" {
t.Fatalf("unexpected source status %#v", result.SourceStatuses[0])
}
if result.SourceStatuses[0].LastCompletedUntil == nil {
t.Fatalf("expected last completed until in source status")
}
}

View File

@ -0,0 +1,89 @@
-- billing-service bootstrap/reference DDL
--
-- This file mirrors the accounting tables that billing-service depends on in
-- the current accounts.svc.plus PostgreSQL schema. It is a service-owned
-- documentation/bootstrap artifact and does not redefine schema ownership.
CREATE TABLE IF NOT EXISTS public.traffic_stat_checkpoints (
node_id TEXT NOT NULL,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
last_uplink_total BIGINT NOT NULL DEFAULT 0,
last_downlink_total BIGINT NOT NULL DEFAULT 0,
last_seen_at TIMESTAMPTZ NOT NULL,
xray_revision TEXT NOT NULL DEFAULT '',
reset_epoch BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (node_id, account_uuid)
);
CREATE TABLE IF NOT EXISTS public.traffic_minute_buckets (
bucket_start TIMESTAMPTZ NOT NULL,
node_id TEXT NOT NULL,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
region TEXT NOT NULL DEFAULT '',
line_code TEXT NOT NULL DEFAULT '',
uplink_bytes BIGINT NOT NULL DEFAULT 0,
downlink_bytes BIGINT NOT NULL DEFAULT 0,
total_bytes BIGINT NOT NULL DEFAULT 0,
multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
rating_status TEXT NOT NULL DEFAULT 'pending',
source_revision TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (bucket_start, node_id, account_uuid, region, line_code)
);
CREATE TABLE IF NOT EXISTS public.billing_ledger (
id UUID PRIMARY KEY,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
bucket_start TIMESTAMPTZ NOT NULL,
bucket_end TIMESTAMPTZ NOT NULL,
entry_type TEXT NOT NULL,
rated_bytes BIGINT NOT NULL DEFAULT 0,
amount_delta DOUBLE PRECISION NOT NULL DEFAULT 0,
balance_after DOUBLE PRECISION NOT NULL DEFAULT 0,
pricing_rule_version TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.account_quota_states (
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
remaining_included_quota BIGINT NOT NULL DEFAULT 0,
current_balance DOUBLE PRECISION NOT NULL DEFAULT 0,
arrears BOOLEAN NOT NULL DEFAULT false,
throttle_state TEXT NOT NULL DEFAULT 'normal',
suspend_state TEXT NOT NULL DEFAULT 'active',
last_rated_bucket_at TIMESTAMPTZ NULL,
effective_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.account_billing_profiles (
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
package_name TEXT NOT NULL DEFAULT 'default',
included_quota_bytes BIGINT NOT NULL DEFAULT 0,
base_price_per_byte DOUBLE PRECISION NOT NULL DEFAULT 0,
region_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
line_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
peak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
offpeak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
pricing_rule_version TEXT NOT NULL DEFAULT 'pricing-default-v1',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.billing_source_sync_state (
source_id TEXT PRIMARY KEY,
last_completed_until TIMESTAMPTZ NULL,
last_attempted_at TIMESTAMPTZ NULL,
last_succeeded_at TIMESTAMPTZ NULL,
last_error TEXT NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_traffic_minute_buckets_account_bucket
ON public.traffic_minute_buckets (account_uuid, bucket_start DESC);
CREATE INDEX IF NOT EXISTS idx_billing_ledger_account_created
ON public.billing_ledger (account_uuid, created_at DESC);

View File

@ -84,3 +84,12 @@ CREATE TABLE IF NOT EXISTS public.account_billing_profiles (
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.billing_source_sync_state (
source_id TEXT PRIMARY KEY,
last_completed_until TIMESTAMPTZ NULL,
last_attempted_at TIMESTAMPTZ NULL,
last_succeeded_at TIMESTAMPTZ NULL,
last_error TEXT NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);