feat(accounts): expose network identities and pg billing source

This commit is contained in:
Haitao Pan 2026-04-09 13:29:18 +08:00
parent 70c6a3f82f
commit 3b4df1ad0d
6 changed files with 254 additions and 8 deletions

View File

@ -11,6 +11,8 @@ import (
"account/internal/store"
)
const accountingDataSource = "postgresql"
type nodeHeartbeatRequest struct {
NodeID string `json:"nodeId"`
Region string `json:"region"`
@ -88,6 +90,7 @@ func (h *handler) accountUsageSummary(c *gin.Context) {
"totalBytes": totalBytes,
"uplinkBytes": uplinkBytes,
"downlinkBytes": downlinkBytes,
"sourceOfTruth": accountingDataSource,
"currentBalance": currentBalance,
"remainingIncludedQuota": remainingQuota,
"suspendState": suspendState,
@ -122,8 +125,9 @@ func (h *handler) accountUsageBuckets(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": user.ID,
"buckets": buckets,
"accountUuid": user.ID,
"buckets": buckets,
"sourceOfTruth": accountingDataSource,
})
}
@ -145,9 +149,10 @@ func (h *handler) accountBillingSummary(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": user.ID,
"quotaState": quota,
"ledger": ledger,
"accountUuid": user.ID,
"quotaState": quota,
"ledger": ledger,
"sourceOfTruth": accountingDataSource,
})
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
@ -137,6 +138,20 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) {
t.Fatalf("upsert quota state: %v", err)
}
if err := st.InsertBillingLedgerEntry(ctx, &store.BillingLedgerEntry{
ID: "ledger-1",
AccountUUID: user.ID,
BucketStart: bucketStart,
BucketEnd: bucketStart.Add(time.Minute),
EntryType: "traffic_charge",
RatedBytes: 384,
AmountDelta: -1.25,
BalanceAfter: 87.5,
PricingRuleVersion: "pricing-v1",
}); err != nil {
t.Fatalf("insert billing ledger: %v", err)
}
if err := st.UpsertAccountPolicySnapshot(ctx, &store.AccountPolicySnapshot{
AccountUUID: user.ID,
PolicyVersion: "policy-v1",
@ -164,8 +179,9 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) {
}
var usagePayload struct {
AccountUUID string `json:"accountUuid"`
TotalBytes int64 `json:"totalBytes"`
AccountUUID string `json:"accountUuid"`
TotalBytes int64 `json:"totalBytes"`
SourceOfTruth string `json:"sourceOfTruth"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &usagePayload); err != nil {
t.Fatalf("decode usage payload: %v", err)
@ -176,6 +192,93 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) {
if usagePayload.TotalBytes != 384 {
t.Fatalf("expected total bytes 384, got %d", usagePayload.TotalBytes)
}
if usagePayload.SourceOfTruth != "postgresql" {
t.Fatalf("expected source of truth postgresql, got %q", usagePayload.SourceOfTruth)
}
bucketsReq := httptest.NewRequest(http.MethodGet, "/api/account/usage/buckets", nil)
bucketsReq.Header.Set("Authorization", "Bearer "+sessionToken)
bucketsRec := httptest.NewRecorder()
router.ServeHTTP(bucketsRec, bucketsReq)
if bucketsRec.Code != http.StatusOK {
t.Fatalf("usage buckets status: %d body=%s", bucketsRec.Code, bucketsRec.Body.String())
}
var bucketsPayload struct {
AccountUUID string `json:"accountUuid"`
SourceOfTruth string `json:"sourceOfTruth"`
Buckets []struct {
TotalBytes int64 `json:"totalBytes"`
NodeID string `json:"nodeId"`
BucketStart time.Time `json:"bucketStart"`
} `json:"buckets"`
}
if err := json.Unmarshal(bucketsRec.Body.Bytes(), &bucketsPayload); err != nil {
t.Fatalf("decode usage buckets payload: %v", err)
}
if bucketsPayload.AccountUUID != user.ID {
t.Fatalf("expected usage buckets account uuid %q, got %q", user.ID, bucketsPayload.AccountUUID)
}
if bucketsPayload.SourceOfTruth != "postgresql" {
t.Fatalf("expected usage buckets source of truth postgresql, got %q", bucketsPayload.SourceOfTruth)
}
if len(bucketsPayload.Buckets) != 1 {
t.Fatalf("expected 1 usage bucket, got %d", len(bucketsPayload.Buckets))
}
if bucketsPayload.Buckets[0].TotalBytes != 384 {
t.Fatalf("expected usage bucket total bytes 384, got %d", bucketsPayload.Buckets[0].TotalBytes)
}
if bucketsPayload.Buckets[0].NodeID != "node-a" {
t.Fatalf("expected usage bucket node node-a, got %q", bucketsPayload.Buckets[0].NodeID)
}
billingReq := httptest.NewRequest(http.MethodGet, "/api/account/billing/summary", nil)
billingReq.Header.Set("Authorization", "Bearer "+sessionToken)
billingRec := httptest.NewRecorder()
router.ServeHTTP(billingRec, billingReq)
if billingRec.Code != http.StatusOK {
t.Fatalf("billing summary status: %d body=%s", billingRec.Code, billingRec.Body.String())
}
var billingPayload struct {
AccountUUID string `json:"accountUuid"`
SourceOfTruth string `json:"sourceOfTruth"`
QuotaState struct {
CurrentBalance float64 `json:"currentBalance"`
} `json:"quotaState"`
Ledger []struct {
ID string `json:"id"`
EntryType string `json:"entryType"`
RatedBytes int64 `json:"ratedBytes"`
AmountDelta float64 `json:"amountDelta"`
} `json:"ledger"`
}
if err := json.Unmarshal(billingRec.Body.Bytes(), &billingPayload); err != nil {
t.Fatalf("decode billing payload: %v", err)
}
if billingPayload.AccountUUID != user.ID {
t.Fatalf("expected billing account uuid %q, got %q", user.ID, billingPayload.AccountUUID)
}
if billingPayload.SourceOfTruth != "postgresql" {
t.Fatalf("expected billing source of truth postgresql, got %q", billingPayload.SourceOfTruth)
}
if billingPayload.QuotaState.CurrentBalance != 87.5 {
t.Fatalf("expected billing current balance 87.5, got %v", billingPayload.QuotaState.CurrentBalance)
}
if len(billingPayload.Ledger) != 1 {
t.Fatalf("expected 1 billing ledger entry, got %d", len(billingPayload.Ledger))
}
if billingPayload.Ledger[0].ID != "ledger-1" {
t.Fatalf("expected billing ledger id ledger-1, got %q", billingPayload.Ledger[0].ID)
}
if billingPayload.Ledger[0].EntryType != "traffic_charge" {
t.Fatalf("expected billing entry type traffic_charge, got %q", billingPayload.Ledger[0].EntryType)
}
if billingPayload.Ledger[0].RatedBytes != 384 {
t.Fatalf("expected billing rated bytes 384, got %d", billingPayload.Ledger[0].RatedBytes)
}
policyReq := httptest.NewRequest(http.MethodGet, "/api/account/policy", nil)
policyReq.Header.Set("Authorization", "Bearer "+sessionToken)
@ -204,3 +307,78 @@ func TestAccountUsageAndPolicyEndpoints(t *testing.T) {
t.Fatalf("unexpected eligible node groups %#v", policyPayload.EligibleNodeGroups)
}
}
func TestInternalNetworkIdentitiesEndpoint(t *testing.T) {
gin.SetMode(gin.TestMode)
t.Setenv("INTERNAL_SERVICE_TOKEN", "test-internal-token")
st := store.NewMemoryStore()
ctx := context.Background()
if err := st.CreateUser(ctx, &store.User{
Name: "Exporter User",
Email: "exporter@example.com",
PasswordHash: "hashed",
EmailVerified: true,
Role: store.RoleUser,
Level: store.LevelUser,
Active: true,
ProxyUUID: "proxy-exporter-id",
}); err != nil {
t.Fatalf("create user: %v", err)
}
if err := st.CreateUser(ctx, &store.User{
Name: "Inactive User",
Email: "inactive@example.com",
PasswordHash: "hashed",
EmailVerified: true,
Role: store.RoleUser,
Level: store.LevelUser,
Active: false,
ProxyUUID: "proxy-inactive-id",
}); err != nil {
t.Fatalf("create inactive user: %v", err)
}
router := gin.New()
RegisterRoutes(router, WithStore(st), WithEmailVerification(false))
req := httptest.NewRequest(http.MethodGet, "/api/internal/network/identities", nil)
req.Header.Set("X-Service-Token", os.Getenv("INTERNAL_SERVICE_TOKEN"))
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("internal identities status: %d body=%s", rec.Code, rec.Body.String())
}
var payload struct {
Identities []struct {
UUID string `json:"uuid"`
Email string `json:"email"`
AccountUUID string `json:"accountUuid"`
} `json:"identities"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
t.Fatalf("decode internal identities payload: %v", err)
}
foundExporter := false
for _, identity := range payload.Identities {
if identity.Email != "exporter@example.com" {
continue
}
foundExporter = true
if identity.UUID == "" {
t.Fatalf("expected exporter UUID to be populated")
}
if identity.AccountUUID == "" {
t.Fatalf("expected account uuid to be populated")
}
}
if !foundExporter {
t.Fatalf("expected exporter identity in payload, got %#v", payload.Identities)
}
}

View File

@ -414,6 +414,7 @@ func RegisterRoutes(r *gin.Engine, opts ...Option) {
internalGroup.Use(auth.InternalAuthMiddleware())
internalGroup.GET("/public-overview", h.internalPublicOverview)
internalGroup.GET("/sandbox/guest", h.internalSandboxGuest)
internalGroup.GET("/network/identities", h.internalNetworkIdentities)
internalGroup.GET("/policy/:accountUUID", h.internalAccountPolicy)
internalGroup.POST("/nodes/heartbeat", h.internalNodeHeartbeat)

View File

@ -0,0 +1,53 @@
package api
import (
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
)
type internalNetworkIdentity struct {
UUID string `json:"uuid"`
Email string `json:"email"`
AccountUUID string `json:"accountUuid"`
}
func (h *handler) internalNetworkIdentities(c *gin.Context) {
if h.store == nil {
respondError(c, http.StatusServiceUnavailable, "store_unavailable", "identity store is not available")
return
}
users, err := h.store.ListUsers(c.Request.Context())
if err != nil {
respondError(c, http.StatusInternalServerError, "list_users_failed", "failed to load network identities")
return
}
identities := make([]internalNetworkIdentity, 0, len(users))
for _, user := range users {
if !user.Active {
continue
}
uuid := strings.TrimSpace(user.ProxyUUID)
if uuid == "" {
uuid = strings.TrimSpace(user.ID)
}
if uuid == "" {
continue
}
identities = append(identities, internalNetworkIdentity{
UUID: uuid,
Email: strings.ToLower(strings.TrimSpace(user.Email)),
AccountUUID: strings.TrimSpace(user.ID),
})
}
c.JSON(http.StatusOK, gin.H{
"generatedAt": time.Now().UTC(),
"identities": identities,
})
}

View File

@ -142,6 +142,7 @@ flowchart TB
- `identities` links OAuth providers to users.
- `sessions` stores session tokens with expiry.
- `subscriptions` stores billing state and provider references.
- `traffic_stat_checkpoints`, `traffic_minute_buckets`, `billing_ledger`, `account_quota_states`, `account_policy_snapshots`, `node_health_snapshots`, and `scheduler_decisions` are the control-plane tables for usage, billing, and orchestration.
- `admin_settings` stores a versioned permission matrix.
- `rbac_roles`, `rbac_permissions`, and `rbac_role_permissions` store the RBAC catalog.
- `agents` stores agent runtime status snapshots.
@ -161,3 +162,4 @@ flowchart TB
- Some frontend BFF endpoints use the same account service backend but shape cookies and auth state for browser usage.
- `api/agent-server/v1/*` is the canonical route family for the console and agent runtime.
- Usage and billing APIs are PostgreSQL-backed and do not depend on Prometheus.

View File

@ -1,6 +1,6 @@
# 架构总览
Account Service 是一个单体 Go 服务,提供账号与运营相关能力,同时可作为 Xray Controller 管理 Agent。
Account Service 是一个单体 Go 服务,提供账号、计费与运营相关能力,同时可作为 Xray Controller 管理 Agent。
## 逻辑架构(文字版)
@ -9,6 +9,7 @@ Client
└─ HTTP API (Gin)
├─ Session / MFA / Email verification
├─ Subscription & Admin Settings
├─ Usage / Billing aggregation
├─ Agent Controller (/api/agent-server/v1)
└─ Token Service (optional)
@ -33,6 +34,12 @@ Client
- Agent: 定时拉取用户列表生成 Xray 配置,并上报状态
4) 数据同步
- `traffic_minute_buckets` 保存分钟级流量快照,是 usage / billing 的基础事实表
- `billing_ledger` 保存计费分录,供前端和后台页面读取
- `account_quota_states`、`account_policy_snapshots`、`node_health_snapshots` 和 `scheduler_decisions` 共同构成控制平面的状态层
- 所有 usage / billing 响应都来自 PostgreSQL不依赖 Prometheus
5) 工具同步
- `migratectl`:导入/导出 YAML 快照
- `syncctl`:通过 SSH 在不同环境间同步