diff --git a/api/accounting.go b/api/accounting.go index d721b96..53f9e37 100644 --- a/api/accounting.go +++ b/api/accounting.go @@ -11,6 +11,8 @@ import ( "account/internal/store" ) +const accountingDataSource = "postgresql" + type nodeHeartbeatRequest struct { NodeID string `json:"nodeId"` Region string `json:"region"` @@ -88,6 +90,7 @@ func (h *handler) accountUsageSummary(c *gin.Context) { "totalBytes": totalBytes, "uplinkBytes": uplinkBytes, "downlinkBytes": downlinkBytes, + "sourceOfTruth": accountingDataSource, "currentBalance": currentBalance, "remainingIncludedQuota": remainingQuota, "suspendState": suspendState, @@ -122,8 +125,9 @@ func (h *handler) accountUsageBuckets(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{ - "accountUuid": user.ID, - "buckets": buckets, + "accountUuid": user.ID, + "buckets": buckets, + "sourceOfTruth": accountingDataSource, }) } @@ -145,9 +149,10 @@ func (h *handler) accountBillingSummary(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{ - "accountUuid": user.ID, - "quotaState": quota, - "ledger": ledger, + "accountUuid": user.ID, + "quotaState": quota, + "ledger": ledger, + "sourceOfTruth": accountingDataSource, }) } diff --git a/api/accounting_test.go b/api/accounting_test.go index 96c4a13..e8172cb 100644 --- a/api/accounting_test.go +++ b/api/accounting_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "os" "testing" "time" @@ -137,6 +138,20 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) { t.Fatalf("upsert quota state: %v", err) } + if err := st.InsertBillingLedgerEntry(ctx, &store.BillingLedgerEntry{ + ID: "ledger-1", + AccountUUID: user.ID, + BucketStart: bucketStart, + BucketEnd: bucketStart.Add(time.Minute), + EntryType: "traffic_charge", + RatedBytes: 384, + AmountDelta: -1.25, + BalanceAfter: 87.5, + PricingRuleVersion: "pricing-v1", + }); err != nil { + t.Fatalf("insert billing ledger: %v", err) + } + if err := st.UpsertAccountPolicySnapshot(ctx, &store.AccountPolicySnapshot{ AccountUUID: user.ID, PolicyVersion: "policy-v1", @@ -164,8 +179,9 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) { } var usagePayload struct { - AccountUUID string `json:"accountUuid"` - TotalBytes int64 `json:"totalBytes"` + AccountUUID string `json:"accountUuid"` + TotalBytes int64 `json:"totalBytes"` + SourceOfTruth string `json:"sourceOfTruth"` } if err := json.Unmarshal(rec.Body.Bytes(), &usagePayload); err != nil { t.Fatalf("decode usage payload: %v", err) @@ -176,6 +192,93 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) { if usagePayload.TotalBytes != 384 { t.Fatalf("expected total bytes 384, got %d", usagePayload.TotalBytes) } + if usagePayload.SourceOfTruth != "postgresql" { + t.Fatalf("expected source of truth postgresql, got %q", usagePayload.SourceOfTruth) + } + + bucketsReq := httptest.NewRequest(http.MethodGet, "/api/account/usage/buckets", nil) + bucketsReq.Header.Set("Authorization", "Bearer "+sessionToken) + bucketsRec := httptest.NewRecorder() + router.ServeHTTP(bucketsRec, bucketsReq) + + if bucketsRec.Code != http.StatusOK { + t.Fatalf("usage buckets status: %d body=%s", bucketsRec.Code, bucketsRec.Body.String()) + } + + var bucketsPayload struct { + AccountUUID string `json:"accountUuid"` + SourceOfTruth string `json:"sourceOfTruth"` + Buckets []struct { + TotalBytes int64 `json:"totalBytes"` + NodeID string `json:"nodeId"` + BucketStart time.Time `json:"bucketStart"` + } `json:"buckets"` + } + if err := json.Unmarshal(bucketsRec.Body.Bytes(), &bucketsPayload); err != nil { + t.Fatalf("decode usage buckets payload: %v", err) + } + if bucketsPayload.AccountUUID != user.ID { + t.Fatalf("expected usage buckets account uuid %q, got %q", user.ID, bucketsPayload.AccountUUID) + } + if bucketsPayload.SourceOfTruth != "postgresql" { + t.Fatalf("expected usage buckets source of truth postgresql, got %q", bucketsPayload.SourceOfTruth) + } + if len(bucketsPayload.Buckets) != 1 { + t.Fatalf("expected 1 usage bucket, got %d", len(bucketsPayload.Buckets)) + } + if bucketsPayload.Buckets[0].TotalBytes != 384 { + t.Fatalf("expected usage bucket total bytes 384, got %d", bucketsPayload.Buckets[0].TotalBytes) + } + if bucketsPayload.Buckets[0].NodeID != "node-a" { + t.Fatalf("expected usage bucket node node-a, got %q", bucketsPayload.Buckets[0].NodeID) + } + + billingReq := httptest.NewRequest(http.MethodGet, "/api/account/billing/summary", nil) + billingReq.Header.Set("Authorization", "Bearer "+sessionToken) + billingRec := httptest.NewRecorder() + router.ServeHTTP(billingRec, billingReq) + + if billingRec.Code != http.StatusOK { + t.Fatalf("billing summary status: %d body=%s", billingRec.Code, billingRec.Body.String()) + } + + var billingPayload struct { + AccountUUID string `json:"accountUuid"` + SourceOfTruth string `json:"sourceOfTruth"` + QuotaState struct { + CurrentBalance float64 `json:"currentBalance"` + } `json:"quotaState"` + Ledger []struct { + ID string `json:"id"` + EntryType string `json:"entryType"` + RatedBytes int64 `json:"ratedBytes"` + AmountDelta float64 `json:"amountDelta"` + } `json:"ledger"` + } + if err := json.Unmarshal(billingRec.Body.Bytes(), &billingPayload); err != nil { + t.Fatalf("decode billing payload: %v", err) + } + if billingPayload.AccountUUID != user.ID { + t.Fatalf("expected billing account uuid %q, got %q", user.ID, billingPayload.AccountUUID) + } + if billingPayload.SourceOfTruth != "postgresql" { + t.Fatalf("expected billing source of truth postgresql, got %q", billingPayload.SourceOfTruth) + } + if billingPayload.QuotaState.CurrentBalance != 87.5 { + t.Fatalf("expected billing current balance 87.5, got %v", billingPayload.QuotaState.CurrentBalance) + } + if len(billingPayload.Ledger) != 1 { + t.Fatalf("expected 1 billing ledger entry, got %d", len(billingPayload.Ledger)) + } + if billingPayload.Ledger[0].ID != "ledger-1" { + t.Fatalf("expected billing ledger id ledger-1, got %q", billingPayload.Ledger[0].ID) + } + if billingPayload.Ledger[0].EntryType != "traffic_charge" { + t.Fatalf("expected billing entry type traffic_charge, got %q", billingPayload.Ledger[0].EntryType) + } + if billingPayload.Ledger[0].RatedBytes != 384 { + t.Fatalf("expected billing rated bytes 384, got %d", billingPayload.Ledger[0].RatedBytes) + } policyReq := httptest.NewRequest(http.MethodGet, "/api/account/policy", nil) policyReq.Header.Set("Authorization", "Bearer "+sessionToken) @@ -204,3 +307,78 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) { t.Fatalf("unexpected eligible node groups %#v", policyPayload.EligibleNodeGroups) } } + +func TestInternalNetworkIdentitiesEndpoint(t *testing.T) { + gin.SetMode(gin.TestMode) + + t.Setenv("INTERNAL_SERVICE_TOKEN", "test-internal-token") + + st := store.NewMemoryStore() + ctx := context.Background() + + if err := st.CreateUser(ctx, &store.User{ + Name: "Exporter User", + Email: "exporter@example.com", + PasswordHash: "hashed", + EmailVerified: true, + Role: store.RoleUser, + Level: store.LevelUser, + Active: true, + ProxyUUID: "proxy-exporter-id", + }); err != nil { + t.Fatalf("create user: %v", err) + } + + if err := st.CreateUser(ctx, &store.User{ + Name: "Inactive User", + Email: "inactive@example.com", + PasswordHash: "hashed", + EmailVerified: true, + Role: store.RoleUser, + Level: store.LevelUser, + Active: false, + ProxyUUID: "proxy-inactive-id", + }); err != nil { + t.Fatalf("create inactive user: %v", err) + } + + router := gin.New() + RegisterRoutes(router, WithStore(st), WithEmailVerification(false)) + + req := httptest.NewRequest(http.MethodGet, "/api/internal/network/identities", nil) + req.Header.Set("X-Service-Token", os.Getenv("INTERNAL_SERVICE_TOKEN")) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("internal identities status: %d body=%s", rec.Code, rec.Body.String()) + } + + var payload struct { + Identities []struct { + UUID string `json:"uuid"` + Email string `json:"email"` + AccountUUID string `json:"accountUuid"` + } `json:"identities"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode internal identities payload: %v", err) + } + + foundExporter := false + for _, identity := range payload.Identities { + if identity.Email != "exporter@example.com" { + continue + } + foundExporter = true + if identity.UUID == "" { + t.Fatalf("expected exporter UUID to be populated") + } + if identity.AccountUUID == "" { + t.Fatalf("expected account uuid to be populated") + } + } + if !foundExporter { + t.Fatalf("expected exporter identity in payload, got %#v", payload.Identities) + } +} diff --git a/api/api.go b/api/api.go index 0c5aa39..4d0598b 100644 --- a/api/api.go +++ b/api/api.go @@ -414,6 +414,7 @@ func RegisterRoutes(r *gin.Engine, opts ...Option) { internalGroup.Use(auth.InternalAuthMiddleware()) internalGroup.GET("/public-overview", h.internalPublicOverview) internalGroup.GET("/sandbox/guest", h.internalSandboxGuest) + internalGroup.GET("/network/identities", h.internalNetworkIdentities) internalGroup.GET("/policy/:accountUUID", h.internalAccountPolicy) internalGroup.POST("/nodes/heartbeat", h.internalNodeHeartbeat) diff --git a/api/internal_network_identities.go b/api/internal_network_identities.go new file mode 100644 index 0000000..ff6dab9 --- /dev/null +++ b/api/internal_network_identities.go @@ -0,0 +1,53 @@ +package api + +import ( + "net/http" + "strings" + "time" + + "github.com/gin-gonic/gin" +) + +type internalNetworkIdentity struct { + UUID string `json:"uuid"` + Email string `json:"email"` + AccountUUID string `json:"accountUuid"` +} + +func (h *handler) internalNetworkIdentities(c *gin.Context) { + if h.store == nil { + respondError(c, http.StatusServiceUnavailable, "store_unavailable", "identity store is not available") + return + } + + users, err := h.store.ListUsers(c.Request.Context()) + if err != nil { + respondError(c, http.StatusInternalServerError, "list_users_failed", "failed to load network identities") + return + } + + identities := make([]internalNetworkIdentity, 0, len(users)) + for _, user := range users { + if !user.Active { + continue + } + uuid := strings.TrimSpace(user.ProxyUUID) + if uuid == "" { + uuid = strings.TrimSpace(user.ID) + } + if uuid == "" { + continue + } + + identities = append(identities, internalNetworkIdentity{ + UUID: uuid, + Email: strings.ToLower(strings.TrimSpace(user.Email)), + AccountUUID: strings.TrimSpace(user.ID), + }) + } + + c.JSON(http.StatusOK, gin.H{ + "generatedAt": time.Now().UTC(), + "identities": identities, + }) +} diff --git a/docs/architecture/accounts/overview.md b/docs/architecture/accounts/overview.md index c3d6154..54cce3a 100644 --- a/docs/architecture/accounts/overview.md +++ b/docs/architecture/accounts/overview.md @@ -142,6 +142,7 @@ flowchart TB - `identities` links OAuth providers to users. - `sessions` stores session tokens with expiry. - `subscriptions` stores billing state and provider references. +- `traffic_stat_checkpoints`, `traffic_minute_buckets`, `billing_ledger`, `account_quota_states`, `account_policy_snapshots`, `node_health_snapshots`, and `scheduler_decisions` are the control-plane tables for usage, billing, and orchestration. - `admin_settings` stores a versioned permission matrix. - `rbac_roles`, `rbac_permissions`, and `rbac_role_permissions` store the RBAC catalog. - `agents` stores agent runtime status snapshots. @@ -161,3 +162,4 @@ flowchart TB - Some frontend BFF endpoints use the same account service backend but shape cookies and auth state for browser usage. - `api/agent-server/v1/*` is the canonical route family for the console and agent runtime. +- Usage and billing APIs are PostgreSQL-backed and do not depend on Prometheus. diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index 344839f..13747ba 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -1,6 +1,6 @@ # 架构总览 -Account Service 是一个单体 Go 服务,提供账号与运营相关能力,同时可作为 Xray Controller 管理 Agent。 +Account Service 是一个单体 Go 服务,提供账号、计费与运营相关能力,同时可作为 Xray Controller 管理 Agent。 ## 逻辑架构(文字版) @@ -9,6 +9,7 @@ Client └─ HTTP API (Gin) ├─ Session / MFA / Email verification ├─ Subscription & Admin Settings + ├─ Usage / Billing aggregation ├─ Agent Controller (/api/agent-server/v1) └─ Token Service (optional) │ @@ -33,6 +34,12 @@ Client - Agent: 定时拉取用户列表生成 Xray 配置,并上报状态 4) 数据同步 +- `traffic_minute_buckets` 保存分钟级流量快照,是 usage / billing 的基础事实表 +- `billing_ledger` 保存计费分录,供前端和后台页面读取 +- `account_quota_states`、`account_policy_snapshots`、`node_health_snapshots` 和 `scheduler_decisions` 共同构成控制平面的状态层 +- 所有 usage / billing 响应都来自 PostgreSQL,不依赖 Prometheus + +5) 工具同步 - `migratectl`:导入/导出 YAML 快照 - `syncctl`:通过 SSH 在不同环境间同步