feat(accounts): add accounting control plane

This commit is contained in:
Haitao Pan 2026-04-01 16:15:16 +08:00
parent 794d386639
commit e9fb4af72b
13 changed files with 1945 additions and 26 deletions

335
api/accounting.go Normal file
View File

@ -0,0 +1,335 @@
package api
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"account/internal/store"
)
type nodeHeartbeatRequest struct {
NodeID string `json:"nodeId"`
Region string `json:"region"`
LineCode string `json:"lineCode"`
PricingGroup string `json:"pricingGroup"`
StatsEnabled bool `json:"statsEnabled"`
XrayRevision string `json:"xrayRevision"`
Healthy bool `json:"healthy"`
LatencyMS int `json:"latencyMs"`
ErrorRate float64 `json:"errorRate"`
ActiveConnections int `json:"activeConnections"`
HealthScore float64 `json:"healthScore"`
SampledAt string `json:"sampledAt"`
}
func parseOptionalTime(value string) (time.Time, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return time.Time{}, nil
}
parsed, err := time.Parse(time.RFC3339, trimmed)
if err == nil {
return parsed.UTC(), nil
}
return time.Parse("2006-01-02T15:04", trimmed)
}
func (h *handler) accountUsageSummary(c *gin.Context) {
user, ok := h.requireAuthenticatedUser(c)
if !ok {
return
}
buckets, err := h.store.ListTrafficMinuteBucketsByAccount(c.Request.Context(), user.ID, time.Time{}, time.Time{})
if err != nil {
respondError(c, http.StatusInternalServerError, "usage_summary_unavailable", "failed to load usage summary")
return
}
var totalBytes, uplinkBytes, downlinkBytes int64
var lastBucketAt *time.Time
for _, bucket := range buckets {
totalBytes += bucket.TotalBytes
uplinkBytes += bucket.UplinkBytes
downlinkBytes += bucket.DownlinkBytes
timestamp := bucket.BucketStart.UTC()
if lastBucketAt == nil || timestamp.After(*lastBucketAt) {
lastBucketAt = &timestamp
}
}
currentBalance := 0.0
remainingQuota := int64(0)
suspendState := "active"
throttleState := "normal"
arrears := false
if quota, err := h.store.GetAccountQuotaState(c.Request.Context(), user.ID); err == nil && quota != nil {
currentBalance = quota.CurrentBalance
remainingQuota = quota.RemainingIncludedQuota
suspendState = quota.SuspendState
throttleState = quota.ThrottleState
arrears = quota.Arrears
}
syncDelaySeconds := 0
if lastBucketAt != nil {
syncDelaySeconds = int(time.Since(*lastBucketAt).Seconds())
if syncDelaySeconds < 0 {
syncDelaySeconds = 0
}
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": user.ID,
"totalBytes": totalBytes,
"uplinkBytes": uplinkBytes,
"downlinkBytes": downlinkBytes,
"currentBalance": currentBalance,
"remainingIncludedQuota": remainingQuota,
"suspendState": suspendState,
"throttleState": throttleState,
"arrears": arrears,
"lastBucketAt": lastBucketAt,
"syncDelaySeconds": syncDelaySeconds,
})
}
func (h *handler) accountUsageBuckets(c *gin.Context) {
user, ok := h.requireAuthenticatedUser(c)
if !ok {
return
}
start, err := parseOptionalTime(c.Query("start"))
if err != nil {
respondError(c, http.StatusBadRequest, "invalid_start", "start must be RFC3339")
return
}
end, err := parseOptionalTime(c.Query("end"))
if err != nil {
respondError(c, http.StatusBadRequest, "invalid_end", "end must be RFC3339")
return
}
buckets, err := h.store.ListTrafficMinuteBucketsByAccount(c.Request.Context(), user.ID, start, end)
if err != nil {
respondError(c, http.StatusInternalServerError, "usage_buckets_unavailable", "failed to load usage buckets")
return
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": user.ID,
"buckets": buckets,
})
}
func (h *handler) accountBillingSummary(c *gin.Context) {
user, ok := h.requireAuthenticatedUser(c)
if !ok {
return
}
ledger, err := h.store.ListBillingLedgerByAccount(c.Request.Context(), user.ID, 20)
if err != nil {
respondError(c, http.StatusInternalServerError, "billing_summary_unavailable", "failed to load billing summary")
return
}
var quota *store.AccountQuotaState
if snapshot, err := h.store.GetAccountQuotaState(c.Request.Context(), user.ID); err == nil {
quota = snapshot
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": user.ID,
"quotaState": quota,
"ledger": ledger,
})
}
func (h *handler) accountPolicy(c *gin.Context) {
user, ok := h.requireAuthenticatedUser(c)
if !ok {
return
}
policy, err := h.store.GetLatestAccountPolicySnapshot(c.Request.Context(), user.ID)
if err != nil {
respondError(c, http.StatusNotFound, "policy_not_found", "account policy snapshot is not available")
return
}
c.JSON(http.StatusOK, gin.H{
"accountUuid": policy.AccountUUID,
"policyVersion": policy.PolicyVersion,
"authState": policy.AuthState,
"rateProfile": policy.RateProfile,
"connProfile": policy.ConnProfile,
"eligibleNodeGroups": policy.EligibleNodeGroups,
"preferredStrategy": policy.PreferredStrategy,
"degradeMode": policy.DegradeMode,
"expiresAt": policy.ExpiresAt,
})
}
func (h *handler) adminTrafficNodes(c *gin.Context) {
if _, ok := h.requireAdminPermission(c, permissionAdminAgentsStatus); !ok {
return
}
nodes, err := h.store.ListLatestNodeHealthSnapshots(c.Request.Context())
if err != nil {
respondError(c, http.StatusInternalServerError, "node_health_unavailable", "failed to load node health snapshots")
return
}
c.JSON(http.StatusOK, gin.H{"nodes": nodes})
}
func (h *handler) adminTrafficAccount(c *gin.Context) {
if _, ok := h.requireAdminPermission(c, permissionAdminUsersListRead); !ok {
return
}
accountUUID := strings.TrimSpace(c.Param("uuid"))
if accountUUID == "" {
respondError(c, http.StatusBadRequest, "account_uuid_required", "account uuid is required")
return
}
buckets, err := h.store.ListTrafficMinuteBucketsByAccount(c.Request.Context(), accountUUID, time.Time{}, time.Time{})
if err != nil {
respondError(c, http.StatusInternalServerError, "account_traffic_unavailable", "failed to load account traffic")
return
}
ledger, err := h.store.ListBillingLedgerByAccount(c.Request.Context(), accountUUID, 20)
if err != nil {
respondError(c, http.StatusInternalServerError, "account_billing_unavailable", "failed to load account billing")
return
}
policy, _ := h.store.GetLatestAccountPolicySnapshot(c.Request.Context(), accountUUID)
quota, _ := h.store.GetAccountQuotaState(c.Request.Context(), accountUUID)
c.JSON(http.StatusOK, gin.H{
"accountUuid": accountUUID,
"buckets": buckets,
"ledger": ledger,
"policy": policy,
"quotaState": quota,
})
}
func (h *handler) adminCollectorStatus(c *gin.Context) {
if _, ok := h.requireAdminPermission(c, permissionAdminAgentsStatus); !ok {
return
}
checkpoints, err := h.store.ListTrafficStatCheckpoints(c.Request.Context())
if err != nil {
respondError(c, http.StatusInternalServerError, "collector_status_unavailable", "failed to load collector checkpoints")
return
}
buckets, err := h.store.ListTrafficMinuteBuckets(c.Request.Context())
if err != nil {
respondError(c, http.StatusInternalServerError, "collector_status_unavailable", "failed to load collector buckets")
return
}
var latestCheckpointAt *time.Time
for _, checkpoint := range checkpoints {
timestamp := checkpoint.LastSeenAt.UTC()
if latestCheckpointAt == nil || timestamp.After(*latestCheckpointAt) {
latestCheckpointAt = &timestamp
}
}
c.JSON(http.StatusOK, gin.H{
"checkpointCount": len(checkpoints),
"minuteBucketCount": len(buckets),
"latestCheckpointAt": latestCheckpointAt,
})
}
func (h *handler) adminSchedulerStatus(c *gin.Context) {
if _, ok := h.requireAdminPermission(c, permissionAdminAgentsStatus); !ok {
return
}
limit := 20
if raw := strings.TrimSpace(c.Query("limit")); raw != "" {
if parsed, err := strconv.Atoi(raw); err == nil && parsed > 0 && parsed <= 200 {
limit = parsed
}
}
decisions, err := h.store.ListRecentSchedulerDecisions(c.Request.Context(), limit)
if err != nil {
respondError(c, http.StatusInternalServerError, "scheduler_status_unavailable", "failed to load scheduler decisions")
return
}
c.JSON(http.StatusOK, gin.H{"decisions": decisions})
}
func (h *handler) internalAccountPolicy(c *gin.Context) {
accountUUID := strings.TrimSpace(c.Param("accountUUID"))
if accountUUID == "" {
respondError(c, http.StatusBadRequest, "account_uuid_required", "account uuid is required")
return
}
policy, err := h.store.GetLatestAccountPolicySnapshot(c.Request.Context(), accountUUID)
if err != nil {
respondError(c, http.StatusNotFound, "policy_not_found", "account policy snapshot is not available")
return
}
c.JSON(http.StatusOK, policy)
}
func (h *handler) internalNodeHeartbeat(c *gin.Context) {
var req nodeHeartbeatRequest
if err := c.ShouldBindJSON(&req); err != nil {
respondError(c, http.StatusBadRequest, "invalid_request", "invalid heartbeat payload")
return
}
nodeID := strings.TrimSpace(req.NodeID)
if nodeID == "" {
respondError(c, http.StatusBadRequest, "node_id_required", "node id is required")
return
}
sampledAt, err := parseOptionalTime(req.SampledAt)
if err != nil {
respondError(c, http.StatusBadRequest, "invalid_sampled_at", "sampledAt must be RFC3339")
return
}
if sampledAt.IsZero() {
sampledAt = time.Now().UTC()
}
if err := h.store.UpsertNodeHealthSnapshot(c.Request.Context(), &store.NodeHealthSnapshot{
NodeID: nodeID,
Region: strings.TrimSpace(req.Region),
LineCode: strings.TrimSpace(req.LineCode),
PricingGroup: strings.TrimSpace(req.PricingGroup),
StatsEnabled: req.StatsEnabled,
XrayRevision: strings.TrimSpace(req.XrayRevision),
Healthy: req.Healthy,
LatencyMS: req.LatencyMS,
ErrorRate: req.ErrorRate,
ActiveConnections: req.ActiveConnections,
HealthScore: req.HealthScore,
SampledAt: sampledAt,
}); err != nil {
respondError(c, http.StatusInternalServerError, "heartbeat_persist_failed", "failed to persist node heartbeat")
return
}
c.Status(http.StatusNoContent)
}

206
api/accounting_test.go Normal file
View File

@ -0,0 +1,206 @@
package api
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gin-gonic/gin"
"account/internal/agentserver"
"account/internal/store"
)
func TestAgentUsersUseAccountUUIDAsStatsEmail(t *testing.T) {
gin.SetMode(gin.TestMode)
st := store.NewMemoryStore()
ctx := context.Background()
if err := st.CreateUser(ctx, &store.User{
Name: "Stats User",
Email: "stats@example.com",
PasswordHash: "hashed",
EmailVerified: true,
Role: store.RoleUser,
Level: store.LevelUser,
Active: true,
ProxyUUID: "proxy-user-id",
}); err != nil {
t.Fatalf("create user: %v", err)
}
user, err := st.GetUserByEmail(ctx, "stats@example.com")
if err != nil {
t.Fatalf("get user: %v", err)
}
registry, err := agentserver.NewRegistry(agentserver.Config{
Credentials: []agentserver.Credential{{
ID: "*",
Name: "test-agent",
Token: "agent-token",
}},
})
if err != nil {
t.Fatalf("new registry: %v", err)
}
router := gin.New()
RegisterRoutes(router, WithStore(st), WithAgentRegistry(registry), WithEmailVerification(false))
req := httptest.NewRequest(http.MethodGet, "/api/agent-server/v1/users", nil)
req.Header.Set("Authorization", "Bearer agent-token")
req.Header.Set("X-Agent-ID", "hk-xhttp.svc.plus")
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: %d body=%s", rec.Code, rec.Body.String())
}
var payload struct {
Clients []struct {
ID string `json:"id"`
Email string `json:"email"`
} `json:"clients"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
t.Fatalf("decode payload: %v", err)
}
for _, client := range payload.Clients {
if client.Email == user.ID {
return
}
}
t.Fatalf("expected stats email %q in payload, got %#v", user.ID, payload.Clients)
}
func TestAccountUsageAndPolicyEndpoints(t *testing.T) {
gin.SetMode(gin.TestMode)
st := store.NewMemoryStore()
ctx := context.Background()
if err := st.CreateUser(ctx, &store.User{
Name: "Billing User",
Email: "billing@example.com",
PasswordHash: "hashed",
EmailVerified: true,
Role: store.RoleUser,
Level: store.LevelUser,
Active: true,
}); err != nil {
t.Fatalf("create user: %v", err)
}
user, err := st.GetUserByEmail(ctx, "billing@example.com")
if err != nil {
t.Fatalf("get user: %v", err)
}
sessionToken := "usage-session-token"
if err := st.CreateSession(ctx, sessionToken, user.ID, time.Now().UTC().Add(time.Hour)); err != nil {
t.Fatalf("create session: %v", err)
}
bucketStart := time.Date(2026, 4, 1, 10, 30, 0, 0, time.UTC)
if err := st.UpsertTrafficMinuteBucket(ctx, &store.TrafficMinuteBucket{
BucketStart: bucketStart,
NodeID: "node-a",
AccountUUID: user.ID,
Region: "hk",
LineCode: "premium",
UplinkBytes: 128,
DownlinkBytes: 256,
TotalBytes: 384,
Multiplier: 1.5,
RatingStatus: store.RatingStatusRated,
}); err != nil {
t.Fatalf("upsert bucket: %v", err)
}
if err := st.UpsertAccountQuotaState(ctx, &store.AccountQuotaState{
AccountUUID: user.ID,
RemainingIncludedQuota: 2048,
CurrentBalance: 87.5,
Arrears: false,
ThrottleState: "normal",
SuspendState: "active",
EffectiveAt: time.Now().UTC(),
}); err != nil {
t.Fatalf("upsert quota state: %v", err)
}
if err := st.UpsertAccountPolicySnapshot(ctx, &store.AccountPolicySnapshot{
AccountUUID: user.ID,
PolicyVersion: "policy-v1",
AuthState: "active",
RateProfile: "standard",
ConnProfile: "standard",
EligibleNodeGroups: []string{"hk-premium"},
PreferredStrategy: "ewma",
DegradeMode: "fallback",
ExpiresAt: time.Now().UTC().Add(5 * time.Minute),
}); err != nil {
t.Fatalf("upsert policy: %v", err)
}
router := gin.New()
RegisterRoutes(router, WithStore(st), WithEmailVerification(false))
req := httptest.NewRequest(http.MethodGet, "/api/account/usage/summary", nil)
req.Header.Set("Authorization", "Bearer "+sessionToken)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("usage summary status: %d body=%s", rec.Code, rec.Body.String())
}
var usagePayload struct {
AccountUUID string `json:"accountUuid"`
TotalBytes int64 `json:"totalBytes"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &usagePayload); err != nil {
t.Fatalf("decode usage payload: %v", err)
}
if usagePayload.AccountUUID != user.ID {
t.Fatalf("expected account uuid %q, got %q", user.ID, usagePayload.AccountUUID)
}
if usagePayload.TotalBytes != 384 {
t.Fatalf("expected total bytes 384, got %d", usagePayload.TotalBytes)
}
policyReq := httptest.NewRequest(http.MethodGet, "/api/account/policy", nil)
policyReq.Header.Set("Authorization", "Bearer "+sessionToken)
policyRec := httptest.NewRecorder()
router.ServeHTTP(policyRec, policyReq)
if policyRec.Code != http.StatusOK {
t.Fatalf("policy status: %d body=%s", policyRec.Code, policyRec.Body.String())
}
var policyPayload struct {
AccountUUID string `json:"accountUuid"`
PreferredStrategy string `json:"preferredStrategy"`
EligibleNodeGroups []string `json:"eligibleNodeGroups"`
}
if err := json.Unmarshal(policyRec.Body.Bytes(), &policyPayload); err != nil {
t.Fatalf("decode policy payload: %v", err)
}
if policyPayload.AccountUUID != user.ID {
t.Fatalf("expected policy account uuid %q, got %q", user.ID, policyPayload.AccountUUID)
}
if policyPayload.PreferredStrategy != "ewma" {
t.Fatalf("expected preferred strategy ewma, got %q", policyPayload.PreferredStrategy)
}
if len(policyPayload.EligibleNodeGroups) != 1 || policyPayload.EligibleNodeGroups[0] != "hk-premium" {
t.Fatalf("unexpected eligible node groups %#v", policyPayload.EligibleNodeGroups)
}
}

View File

@ -189,6 +189,10 @@ func registerAdminRoutes(group *gin.RouterGroup, h *handler) {
admin := group.Group("/admin")
admin.GET("/users/metrics", h.adminUsersMetrics)
admin.GET("/agents/status", h.adminAgentStatus)
admin.GET("/traffic/nodes", h.adminTrafficNodes)
admin.GET("/traffic/accounts/:uuid", h.adminTrafficAccount)
admin.GET("/collector/status", h.adminCollectorStatus)
admin.GET("/scheduler/status", h.adminSchedulerStatus)
// User management
admin.POST("/users", h.createCustomUser)

View File

@ -9,6 +9,7 @@ import (
"account/internal/agentproto"
"account/internal/agentserver"
"account/internal/store"
"account/internal/xrayconfig"
)
@ -75,7 +76,7 @@ func (h *handler) listAgentUsers(c *gin.Context) {
if id != "" {
clients = append(clients, xrayconfig.Client{
ID: id,
Email: strings.TrimSpace(sandboxUser.Email),
Email: strings.TrimSpace(sandboxUser.ID),
Flow: xrayconfig.DefaultFlow,
})
}
@ -91,7 +92,7 @@ func (h *handler) listAgentUsers(c *gin.Context) {
}
clients = append(clients, xrayconfig.Client{
ID: id,
Email: strings.TrimSpace(u.Email),
Email: strings.TrimSpace(u.ID),
Flow: xrayconfig.DefaultFlow,
})
}
@ -143,6 +144,22 @@ func (h *handler) reportAgentStatus(c *gin.Context) {
// Ensure report uses the resolved agent id.
report.AgentID = identity.ID
h.agentRegistry.ReportStatus(identity, report)
if h.store != nil {
nodeID := strings.TrimSpace(report.Xray.NodeID)
if nodeID == "" {
nodeID = identity.ID
}
_ = h.store.UpsertNodeHealthSnapshot(c.Request.Context(), &store.NodeHealthSnapshot{
NodeID: nodeID,
Region: strings.TrimSpace(report.Xray.Region),
LineCode: strings.TrimSpace(report.Xray.LineCode),
PricingGroup: strings.TrimSpace(report.Xray.PricingGroup),
StatsEnabled: report.Xray.StatsEnabled,
XrayRevision: strings.TrimSpace(report.Xray.XrayRevision),
Healthy: report.Healthy,
SampledAt: time.Now().UTC(),
})
}
c.Status(http.StatusNoContent)
}

View File

@ -402,6 +402,8 @@ func RegisterRoutes(r *gin.Engine, opts ...Option) {
internalGroup.Use(auth.InternalAuthMiddleware())
internalGroup.GET("/public-overview", h.internalPublicOverview)
internalGroup.GET("/sandbox/guest", h.internalSandboxGuest)
internalGroup.GET("/policy/:accountUUID", h.internalAccountPolicy)
internalGroup.POST("/nodes/heartbeat", h.internalNodeHeartbeat)
// Public /api routes for admin/management (expected by frontend at /api/admin/...)
apiGroup := r.Group("/api")
@ -419,6 +421,12 @@ func RegisterRoutes(r *gin.Engine, opts ...Option) {
agentServerGroup.GET("/users", h.listAgentUsers)
agentServerGroup.POST("/status", h.reportAgentStatus)
accountGroup := r.Group("/api/account")
accountGroup.GET("/usage/summary", h.accountUsageSummary)
accountGroup.GET("/usage/buckets", h.accountUsageBuckets)
accountGroup.GET("/billing/summary", h.accountBillingSummary)
accountGroup.GET("/policy", h.accountPolicy)
// Legacy alias kept for backward compatibility.
agentGroup := r.Group("/api/agent")
agentGroup.GET("/nodes", h.listAgentNodes)

View File

@ -266,6 +266,10 @@ func TestAgentServerUsers_DefaultSyncIncludesSandboxAndRegularUsers(t *testing.T
}
// Ensure normal user is "expired" per proxy UUID expiry metadata.
sandbox, err := st.GetUserByEmail(ctx, "sandbox@svc.plus")
if err != nil {
t.Fatalf("get sandbox user: %v", err)
}
normal, err := st.GetUserByEmail(ctx, "user@example.com")
if err != nil {
t.Fatalf("get normal user: %v", err)
@ -314,10 +318,10 @@ func TestAgentServerUsers_DefaultSyncIncludesSandboxAndRegularUsers(t *testing.T
seenSandbox := false
seenNormal := false
for _, c := range payload.Clients {
if c.Email == "sandbox@svc.plus" && strings.TrimSpace(c.ID) != "" {
if c.Email == sandbox.ID && strings.TrimSpace(c.ID) != "" {
seenSandbox = true
}
if c.Email == "user@example.com" && strings.TrimSpace(c.ID) != "" {
if c.Email == normal.ID && strings.TrimSpace(c.ID) != "" {
seenNormal = true
}
}

View File

@ -120,23 +120,23 @@ func (h *handler) respondSyncConfigSnapshot(c *gin.Context) {
"vless_uri": vlessURI,
})
nodes = append(nodes, gin.H{
"id": nodeID,
"name": nodeName,
"display_name": nodeName,
"remark": nodeName,
"host": host,
"protocol": "vless",
"transport": "xhttp",
"security": "tls",
"address": host,
"port": 443,
"server_name": host,
"uuid": proxyUUID,
"flow": "",
"source": "server",
"country_code": countryCode,
"updated_at": updatedAt,
"vless_uri": vlessURI,
"id": nodeID,
"name": nodeName,
"display_name": nodeName,
"remark": nodeName,
"host": host,
"protocol": "vless",
"transport": "xhttp",
"security": "tls",
"address": host,
"port": 443,
"server_name": host,
"uuid": proxyUUID,
"flow": "",
"source": "server",
"country_code": countryCode,
"updated_at": updatedAt,
"vless_uri": vlessURI,
"uri_scheme_xhttp": vlessURI,
})
}
@ -227,7 +227,7 @@ func (h *handler) renderUserXrayConfig(user *store.User) (string, string, []stri
}
clients := []xrayconfig.Client{{
ID: clientID,
Email: strings.TrimSpace(user.Email),
Email: strings.TrimSpace(user.ID),
Flow: xrayconfig.DefaultFlow,
}}

View File

@ -0,0 +1,163 @@
# accounts.svc.plus Architecture
## Scope
`accounts.svc.plus` is the identity and account control service. It owns user authentication, session issuance, MFA, subscription state, admin settings, sandbox behavior, and agent registry endpoints.
## Architecture
```mermaid
flowchart TB
subgraph Public["Public auth surface"]
HZ["GET /healthz"]
REG["POST /api/auth/register\nPOST /api/auth/register/send\nPOST /api/auth/register/verify"]
LOGIN["POST /api/auth/login\nPOST /api/auth/mfa/verify\nPOST /api/auth/token/exchange\nGET /api/auth/oauth/login/:provider\nGET /api/auth/oauth/callback/:provider\nPOST /api/auth/token/refresh\nPOST /api/auth/refresh"]
PUBLIC["GET /api/auth/mfa/status\nGET /api/auth/sync/config\nPOST /api/auth/sync/ack\nGET /api/auth/homepage-video\nGET /api/auth/sandbox/binding"]
WEBHOOK["POST /api/billing/stripe/webhook"]
end
subgraph Session["Session protected"]
SESSION["GET /api/auth/session\nDELETE /api/auth/session\nGET/PUT /api/auth/xworkmate/profile"]
MFA["POST /api/auth/mfa/totp/provision\nPOST /api/auth/mfa/totp/verify\nPOST /api/auth/mfa/disable"]
RESET["POST /api/auth/password/reset\nPOST /api/auth/password/reset/confirm"]
BILLING["GET/POST /api/auth/subscriptions\nPOST /api/auth/subscriptions/cancel\nPOST /api/auth/stripe/checkout\nPOST /api/auth/stripe/portal"]
SYNC["POST /api/auth/config/sync"]
ADMINAUTH["GET/POST /api/auth/admin/settings\nGET/PUT /api/auth/admin/homepage-video"]
USEROPS["GET /api/auth/users\nGET/POST /api/auth/admin/users/metrics\nPOST /api/auth/admin/users\nPOST /api/auth/admin/users/:userId/role\nDELETE /api/auth/admin/users/:userId/role\nPOST /api/auth/admin/users/:userId/pause\nPOST /api/auth/admin/users/:userId/resume\nDELETE /api/auth/admin/users/:userId\nPOST /api/auth/admin/users/:userId/renew-uuid"]
ADMINMORE["POST /api/auth/admin/tenants/bootstrap\nGET/POST/DELETE /api/auth/admin/blacklist\nGET/POST /api/auth/admin/sandbox/binding\nPOST /api/auth/admin/sandbox/bind\nPOST /api/auth/admin/assume\nPOST /api/auth/admin/assume/revert\nGET /api/auth/admin/assume/status"]
end
subgraph Internal["Internal / agent surfaces"]
INTERNAL["GET /api/internal/public-overview\nGET /api/internal/sandbox/guest"]
AGENT["GET /api/agent-server/v1/users\nPOST /api/agent-server/v1/status\nGET /api/agent/nodes (legacy)"]
end
subgraph DB["PostgreSQL"]
USERS[("users")]
IDS[("identities")]
SESSIONS[("sessions")]
SUBS[("subscriptions")]
SETTINGS[("admin_settings")]
ROLES[("rbac_roles")]
PERMS[("rbac_permissions")]
ROLEPERMS[("rbac_role_permissions")]
BLACKLIST[("email_blacklist")]
AGENTS[("agents")]
NODES[("nodes")]
end
REG --> USERS
LOGIN --> USERS
LOGIN --> IDS
SESSION --> SESSIONS
MFA --> USERS
RESET --> USERS
BILLING --> SUBS
ADMINAUTH --> SETTINGS
USEROPS --> USERS
USEROPS --> ROLES
USEROPS --> PERMS
USEROPS --> ROLEPERMS
ADMINMORE --> USERS
ADMINMORE --> BLACKLIST
ADMINMORE --> NODES
AGENT --> USERS
AGENT --> AGENTS
AGENT --> NODES
INTERNAL --> USERS
WEBHOOK --> SUBS
```
## API Matrix
### Public auth and bootstrapping
| Name | Path | Purpose | Database / table | Auth mode |
| --- | --- | --- | --- | --- |
| Health | `/healthz` | Liveness check | N/A | none |
| Register | `/api/auth/register` | Create new account | `users`, `identities` | none |
| Register email | `/api/auth/register/send` | Send registration verification email | `users` | none |
| Register verify | `/api/auth/register/verify` | Confirm registration code / email | `users` | none |
| Login | `/api/auth/login` | Issue session token or MFA challenge | `users`, `sessions` | none |
| MFA verify | `/api/auth/mfa/verify` | Finish MFA login | `users`, `sessions` | none |
| OAuth login / callback | `/api/auth/oauth/login/:provider`, `/api/auth/oauth/callback/:provider` | OAuth entry and callback | `users`, `identities`, `sessions` | none |
| Token exchange | `/api/auth/token/exchange` | One-time OAuth exchange to session token | `sessions` | none |
| Token refresh | `/api/auth/token/refresh`, `/api/auth/refresh` | Refresh access token | `sessions` | bearer/refresh token |
| MFA status | `/api/auth/mfa/status` | Check whether MFA is required | `users` | none |
| Sync config | `/api/auth/sync/config`, `/api/auth/sync/ack` | Sync configuration snapshot / ack | `users`, `admin_settings` | none |
| Public homepage video | `/api/auth/homepage-video` | Read public homepage video config | `admin_settings` | none |
| Sandbox binding public read | `/api/auth/sandbox/binding` | Read sandbox node binding for guest/demo | `nodes`, `users` | none or session or internal token |
### Session protected account actions
| Name | Path | Purpose | Database / table | Auth mode |
| --- | --- | --- | --- | --- |
| Session | `/api/auth/session` | Read current session user | `sessions`, `users` | `Authorization: Bearer <session token>` |
| Logout | `DELETE /api/auth/session` | Revoke local session cookie / token | `sessions` | session token |
| XWorkmate profile | `/api/auth/xworkmate/profile` | Read/update per-user profile settings | `users`, `xworkmate_profiles` (migration-backed) | session token |
| MFA setup | `/api/auth/mfa/totp/provision`, `/api/auth/mfa/totp/verify`, `/api/auth/mfa/disable` | TOTP setup and disable | `users` | session token |
| Password reset | `/api/auth/password/reset`, `/api/auth/password/reset/confirm` | Reset password flow | `users` | session token |
| Subscriptions | `/api/auth/subscriptions`, `/api/auth/subscriptions/cancel` | Manage subscription records | `subscriptions`, `users` | session token |
| Stripe checkout / portal | `/api/auth/stripe/checkout`, `/api/auth/stripe/portal` | Open Stripe checkout and customer portal | `subscriptions`, `users` | session token |
| Config sync | `/api/auth/config/sync` | Push client config state | `users`, `admin_settings` | session token |
### Admin and operator actions
| Name | Path | Purpose | Database / table | Auth mode |
| --- | --- | --- | --- | --- |
| Admin settings | `/api/auth/admin/settings` | Read / update permission matrix | `admin_settings`, `rbac_*` | session token + admin/operator |
| Homepage video admin | `/api/auth/admin/homepage-video` | Update homepage video settings | `admin_settings` | session token + admin/operator |
| User metrics | `/api/auth/admin/users/metrics` | Count and summary of users | `users` | session token + admin/operator |
| Create user | `/api/auth/admin/users` | Create custom user | `users`, `sessions` | session token + admin/operator |
| Update role | `/api/auth/admin/users/:userId/role` | Change a user role | `users` | session token + admin/operator |
| Reset role | `DELETE /api/auth/admin/users/:userId/role` | Reset a user role | `users` | session token + admin/operator |
| Pause / resume | `/api/auth/admin/users/:userId/pause`, `/api/auth/admin/users/:userId/resume` | Disable / re-enable user | `users` | session token + admin/operator |
| Delete user | `DELETE /api/auth/admin/users/:userId` | Delete user | `users`, `sessions`, `identities` | session token + admin/operator |
| Renew proxy UUID | `/api/auth/admin/users/:userId/renew-uuid` | Rotate proxy UUID | `users` | session token + admin/operator |
| Tenant bootstrap | `/api/auth/admin/tenants/bootstrap` | Bootstrap tenant records | `tenants`, `tenant_memberships` (migration-backed) | session token + admin/operator |
| Blacklist | `/api/auth/admin/blacklist`, `/api/auth/admin/blacklist/:email` | Manage blocked email list | `email_blacklist` | session token + admin/operator |
| Sandbox binding admin | `/api/auth/admin/sandbox/binding`, `/api/auth/admin/sandbox/bind` | Bind sandbox node | `nodes`, `users` | session token + root/admin rules |
| Assume sandbox | `/api/auth/admin/assume`, `/api/auth/admin/assume/revert`, `/api/auth/admin/assume/status` | Switch to / from sandbox identity | `sessions`, `users` | session token + root-only allowlist |
| Users list | `/api/auth/users` | List users | `users` | session token + auth middleware |
### Internal and agent endpoints
| Name | Path | Purpose | Database / table | Auth mode |
| --- | --- | --- | --- | --- |
| Internal overview | `/api/internal/public-overview` | Public overview for internal clients | `users`, `admin_settings` | `X-Service-Token` |
| Sandbox guest | `/api/internal/sandbox/guest` | Return guest/demo identity snapshot | `users`, `nodes`, `sessions` | `X-Service-Token` |
| Agent users | `/api/agent-server/v1/users` | Provide Xray client list to agent runtime | `users`, `agents`, `nodes` | `Authorization: Bearer <agent token>` + optional `X-Agent-ID` |
| Agent status | `/api/agent-server/v1/status` | Receive agent heartbeat and sync status | `agents`, `nodes` | `Authorization: Bearer <agent token>` + optional `X-Agent-ID` |
| Legacy agent alias | `/api/agent/nodes` | Backward-compatible alias for node list | `nodes` | same as agent-server |
### Webhook
| Name | Path | Purpose | Database / table | Auth mode |
| --- | --- | --- | --- | --- |
| Stripe webhook | `/api/billing/stripe/webhook` | Receive Stripe events | `subscriptions` | Stripe signature verification |
## Data Model Notes
- `users` is the main identity record and also stores roles, groups, permissions, MFA state, proxy UUID, and activity flags.
- `identities` links OAuth providers to users.
- `sessions` stores session tokens with expiry.
- `subscriptions` stores billing state and provider references.
- `admin_settings` stores a versioned permission matrix.
- `rbac_roles`, `rbac_permissions`, and `rbac_role_permissions` store the RBAC catalog.
- `agents` stores agent runtime status snapshots.
- `nodes` stores proxy / node inventory.
- `email_blacklist` stores blocked addresses.
- The codebase also defines migration-backed tenant / XWorkmate models such as `tenants`, `tenant_domains`, `tenant_memberships`, and `xworkmate_profiles`; those tables are part of the broader account domain even though they are not declared in the checked-in `sql/schema.sql` snapshot.
## Auth Layers
1. Public routes do not require a session token.
2. Session protected routes require `Authorization: Bearer <session token>` and active-user validation.
3. Admin routes add role / permission checks on top of session auth.
4. Internal routes require `X-Service-Token`.
5. Agent routes accept agent credential tokens and optional `X-Agent-ID`.
## Notes
- Some frontend BFF endpoints use the same account service backend but shape cookies and auth state for browser usage.
- `api/agent-server/v1/*` is the canonical route family for the console and agent runtime.

View File

@ -28,8 +28,14 @@ type StatusReport struct {
// XrayStatus describes the synchronisation state of the managed Xray process.
type XrayStatus struct {
Running bool `json:"running"`
Clients int `json:"clients"`
LastSync *time.Time `json:"lastSync,omitempty"`
ConfigHash string `json:"configHash,omitempty"`
Running bool `json:"running"`
Clients int `json:"clients"`
LastSync *time.Time `json:"lastSync,omitempty"`
ConfigHash string `json:"configHash,omitempty"`
NodeID string `json:"nodeId,omitempty"`
Region string `json:"region,omitempty"`
LineCode string `json:"lineCode,omitempty"`
PricingGroup string `json:"pricingGroup,omitempty"`
StatsEnabled bool `json:"statsEnabled"`
XrayRevision string `json:"xrayRevision,omitempty"`
}

View File

@ -0,0 +1,376 @@
package store
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"
"github.com/google/uuid"
)
func checkpointKey(nodeID, accountUUID string) string {
return strings.TrimSpace(nodeID) + "::" + strings.TrimSpace(accountUUID)
}
func bucketKey(bucketStart time.Time, nodeID, accountUUID, region, lineCode string) string {
return fmt.Sprintf("%s::%s::%s::%s::%s",
bucketStart.UTC().Format(time.RFC3339),
strings.TrimSpace(nodeID),
strings.TrimSpace(accountUUID),
strings.TrimSpace(region),
strings.TrimSpace(lineCode),
)
}
func cloneCheckpoint(src *TrafficStatCheckpoint) *TrafficStatCheckpoint {
if src == nil {
return nil
}
copy := *src
return &copy
}
func cloneBucket(src *TrafficMinuteBucket) *TrafficMinuteBucket {
if src == nil {
return nil
}
copy := *src
return &copy
}
func cloneLedgerEntry(src *BillingLedgerEntry) *BillingLedgerEntry {
if src == nil {
return nil
}
copy := *src
return &copy
}
func cloneQuotaState(src *AccountQuotaState) *AccountQuotaState {
if src == nil {
return nil
}
copy := *src
if src.LastRatedBucketAt != nil {
last := src.LastRatedBucketAt.UTC()
copy.LastRatedBucketAt = &last
}
return &copy
}
func clonePolicySnapshot(src *AccountPolicySnapshot) *AccountPolicySnapshot {
if src == nil {
return nil
}
copy := *src
copy.EligibleNodeGroups = cloneStringSlice(src.EligibleNodeGroups)
return &copy
}
func cloneNodeHealthSnapshot(src *NodeHealthSnapshot) *NodeHealthSnapshot {
if src == nil {
return nil
}
copy := *src
return &copy
}
func cloneSchedulerDecision(src *SchedulerDecision) *SchedulerDecision {
if src == nil {
return nil
}
copy := *src
return &copy
}
func (s *memoryStore) UpsertTrafficStatCheckpoint(ctx context.Context, checkpoint *TrafficStatCheckpoint) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneCheckpoint(checkpoint)
if copy == nil {
return errors.New("checkpoint is required")
}
now := time.Now().UTC()
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
copy.UpdatedAt = now
s.trafficStatCheckpoints[checkpointKey(copy.NodeID, copy.AccountUUID)] = copy
return nil
}
func (s *memoryStore) GetTrafficStatCheckpoint(ctx context.Context, nodeID, accountUUID string) (*TrafficStatCheckpoint, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
record, ok := s.trafficStatCheckpoints[checkpointKey(nodeID, accountUUID)]
if !ok {
return nil, ErrUserNotFound
}
return cloneCheckpoint(record), nil
}
func (s *memoryStore) ListTrafficStatCheckpoints(ctx context.Context) ([]TrafficStatCheckpoint, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]TrafficStatCheckpoint, 0, len(s.trafficStatCheckpoints))
for _, record := range s.trafficStatCheckpoints {
result = append(result, *cloneCheckpoint(record))
}
sort.Slice(result, func(i, j int) bool {
if result[i].NodeID == result[j].NodeID {
return result[i].AccountUUID < result[j].AccountUUID
}
return result[i].NodeID < result[j].NodeID
})
return result, nil
}
func (s *memoryStore) UpsertTrafficMinuteBucket(ctx context.Context, bucket *TrafficMinuteBucket) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneBucket(bucket)
if copy == nil {
return errors.New("bucket is required")
}
now := time.Now().UTC()
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
copy.UpdatedAt = now
if strings.TrimSpace(copy.RatingStatus) == "" {
copy.RatingStatus = RatingStatusPending
}
s.trafficMinuteBuckets[bucketKey(copy.BucketStart, copy.NodeID, copy.AccountUUID, copy.Region, copy.LineCode)] = copy
return nil
}
func (s *memoryStore) ListTrafficMinuteBucketsByAccount(ctx context.Context, accountUUID string, start, end time.Time) ([]TrafficMinuteBucket, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]TrafficMinuteBucket, 0)
for _, bucket := range s.trafficMinuteBuckets {
if bucket.AccountUUID != strings.TrimSpace(accountUUID) {
continue
}
if !start.IsZero() && bucket.BucketStart.Before(start) {
continue
}
if !end.IsZero() && bucket.BucketStart.After(end) {
continue
}
result = append(result, *cloneBucket(bucket))
}
sort.Slice(result, func(i, j int) bool {
return result[i].BucketStart.Before(result[j].BucketStart)
})
return result, nil
}
func (s *memoryStore) ListTrafficMinuteBuckets(ctx context.Context) ([]TrafficMinuteBucket, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]TrafficMinuteBucket, 0, len(s.trafficMinuteBuckets))
for _, bucket := range s.trafficMinuteBuckets {
result = append(result, *cloneBucket(bucket))
}
sort.Slice(result, func(i, j int) bool {
return result[i].BucketStart.Before(result[j].BucketStart)
})
return result, nil
}
func (s *memoryStore) InsertBillingLedgerEntry(ctx context.Context, entry *BillingLedgerEntry) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneLedgerEntry(entry)
if copy == nil {
return errors.New("ledger entry is required")
}
now := time.Now().UTC()
if strings.TrimSpace(copy.ID) == "" {
copy.ID = uuid.NewString()
}
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
s.billingLedgerEntries[copy.ID] = copy
entry.ID = copy.ID
entry.CreatedAt = copy.CreatedAt
return nil
}
func (s *memoryStore) ListBillingLedgerByAccount(ctx context.Context, accountUUID string, limit int) ([]BillingLedgerEntry, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]BillingLedgerEntry, 0)
for _, entry := range s.billingLedgerEntries {
if entry.AccountUUID == strings.TrimSpace(accountUUID) {
result = append(result, *cloneLedgerEntry(entry))
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].CreatedAt.After(result[j].CreatedAt)
})
if limit > 0 && len(result) > limit {
result = result[:limit]
}
return result, nil
}
func (s *memoryStore) UpsertAccountQuotaState(ctx context.Context, state *AccountQuotaState) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneQuotaState(state)
if copy == nil {
return errors.New("quota state is required")
}
copy.UpdatedAt = time.Now().UTC()
if copy.EffectiveAt.IsZero() {
copy.EffectiveAt = copy.UpdatedAt
}
s.accountQuotaStates[strings.TrimSpace(copy.AccountUUID)] = copy
return nil
}
func (s *memoryStore) GetAccountQuotaState(ctx context.Context, accountUUID string) (*AccountQuotaState, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
record, ok := s.accountQuotaStates[strings.TrimSpace(accountUUID)]
if !ok {
return nil, ErrUserNotFound
}
return cloneQuotaState(record), nil
}
func (s *memoryStore) UpsertAccountPolicySnapshot(ctx context.Context, snapshot *AccountPolicySnapshot) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := clonePolicySnapshot(snapshot)
if copy == nil {
return errors.New("policy snapshot is required")
}
now := time.Now().UTC()
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
copy.UpdatedAt = now
s.accountPolicySnapshots[strings.TrimSpace(copy.AccountUUID)] = copy
return nil
}
func (s *memoryStore) GetLatestAccountPolicySnapshot(ctx context.Context, accountUUID string) (*AccountPolicySnapshot, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
record, ok := s.accountPolicySnapshots[strings.TrimSpace(accountUUID)]
if !ok {
return nil, ErrUserNotFound
}
return clonePolicySnapshot(record), nil
}
func (s *memoryStore) UpsertNodeHealthSnapshot(ctx context.Context, snapshot *NodeHealthSnapshot) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneNodeHealthSnapshot(snapshot)
if copy == nil {
return errors.New("node health snapshot is required")
}
now := time.Now().UTC()
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
copy.UpdatedAt = now
if copy.SampledAt.IsZero() {
copy.SampledAt = now
}
s.nodeHealthSnapshots[strings.TrimSpace(copy.NodeID)] = copy
return nil
}
func (s *memoryStore) ListLatestNodeHealthSnapshots(ctx context.Context) ([]NodeHealthSnapshot, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]NodeHealthSnapshot, 0, len(s.nodeHealthSnapshots))
for _, snapshot := range s.nodeHealthSnapshots {
result = append(result, *cloneNodeHealthSnapshot(snapshot))
}
sort.Slice(result, func(i, j int) bool {
return result[i].NodeID < result[j].NodeID
})
return result, nil
}
func (s *memoryStore) InsertSchedulerDecision(ctx context.Context, decision *SchedulerDecision) error {
_ = ctx
s.mu.Lock()
defer s.mu.Unlock()
copy := cloneSchedulerDecision(decision)
if copy == nil {
return errors.New("scheduler decision is required")
}
now := time.Now().UTC()
if strings.TrimSpace(copy.ID) == "" {
copy.ID = uuid.NewString()
}
if copy.GeneratedAt.IsZero() {
copy.GeneratedAt = now
}
if copy.CreatedAt.IsZero() {
copy.CreatedAt = now
}
s.schedulerDecisions[copy.ID] = copy
decision.ID = copy.ID
decision.GeneratedAt = copy.GeneratedAt
decision.CreatedAt = copy.CreatedAt
return nil
}
func (s *memoryStore) ListRecentSchedulerDecisions(ctx context.Context, limit int) ([]SchedulerDecision, error) {
_ = ctx
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]SchedulerDecision, 0, len(s.schedulerDecisions))
for _, decision := range s.schedulerDecisions {
result = append(result, *cloneSchedulerDecision(decision))
}
sort.Slice(result, func(i, j int) bool {
return result[i].GeneratedAt.After(result[j].GeneratedAt)
})
if limit > 0 && len(result) > limit {
result = result[:limit]
}
return result, nil
}

View File

@ -0,0 +1,563 @@
package store
import (
"context"
"database/sql"
"errors"
"strconv"
"strings"
"time"
"github.com/google/uuid"
)
func (s *postgresStore) UpsertTrafficStatCheckpoint(ctx context.Context, checkpoint *TrafficStatCheckpoint) error {
if checkpoint == nil {
return errors.New("checkpoint is required")
}
const query = `
INSERT INTO traffic_stat_checkpoints (
node_id, account_uuid, last_uplink_total, last_downlink_total, last_seen_at, xray_revision, reset_epoch
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (node_id, account_uuid) DO UPDATE SET
last_uplink_total = EXCLUDED.last_uplink_total,
last_downlink_total = EXCLUDED.last_downlink_total,
last_seen_at = EXCLUDED.last_seen_at,
xray_revision = EXCLUDED.xray_revision,
reset_epoch = EXCLUDED.reset_epoch,
updated_at = now()
RETURNING created_at, updated_at`
return s.db.QueryRowContext(
ctx,
query,
strings.TrimSpace(checkpoint.NodeID),
strings.TrimSpace(checkpoint.AccountUUID),
checkpoint.LastUplinkTotal,
checkpoint.LastDownlinkTotal,
checkpoint.LastSeenAt.UTC(),
strings.TrimSpace(checkpoint.XrayRevision),
checkpoint.ResetEpoch,
).Scan(&checkpoint.CreatedAt, &checkpoint.UpdatedAt)
}
func (s *postgresStore) GetTrafficStatCheckpoint(ctx context.Context, nodeID, accountUUID string) (*TrafficStatCheckpoint, error) {
const query = `
SELECT node_id, account_uuid, last_uplink_total, last_downlink_total, last_seen_at, xray_revision, reset_epoch, created_at, updated_at
FROM traffic_stat_checkpoints
WHERE node_id = $1 AND account_uuid = $2`
var record TrafficStatCheckpoint
err := s.db.QueryRowContext(ctx, query, strings.TrimSpace(nodeID), strings.TrimSpace(accountUUID)).Scan(
&record.NodeID,
&record.AccountUUID,
&record.LastUplinkTotal,
&record.LastDownlinkTotal,
&record.LastSeenAt,
&record.XrayRevision,
&record.ResetEpoch,
&record.CreatedAt,
&record.UpdatedAt,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrUserNotFound
}
return nil, err
}
return &record, nil
}
func (s *postgresStore) ListTrafficStatCheckpoints(ctx context.Context) ([]TrafficStatCheckpoint, error) {
const query = `
SELECT node_id, account_uuid, last_uplink_total, last_downlink_total, last_seen_at, xray_revision, reset_epoch, created_at, updated_at
FROM traffic_stat_checkpoints
ORDER BY node_id ASC, account_uuid ASC`
rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
var result []TrafficStatCheckpoint
for rows.Next() {
var record TrafficStatCheckpoint
if err := rows.Scan(
&record.NodeID,
&record.AccountUUID,
&record.LastUplinkTotal,
&record.LastDownlinkTotal,
&record.LastSeenAt,
&record.XrayRevision,
&record.ResetEpoch,
&record.CreatedAt,
&record.UpdatedAt,
); err != nil {
return nil, err
}
result = append(result, record)
}
return result, rows.Err()
}
func (s *postgresStore) UpsertTrafficMinuteBucket(ctx context.Context, bucket *TrafficMinuteBucket) error {
if bucket == nil {
return errors.New("bucket is required")
}
status := strings.TrimSpace(bucket.RatingStatus)
if status == "" {
status = RatingStatusPending
}
const query = `
INSERT INTO traffic_minute_buckets (
bucket_start, node_id, account_uuid, region, line_code, uplink_bytes, downlink_bytes, total_bytes, multiplier, rating_status, source_revision
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (bucket_start, node_id, account_uuid, region, line_code) DO UPDATE SET
uplink_bytes = EXCLUDED.uplink_bytes,
downlink_bytes = EXCLUDED.downlink_bytes,
total_bytes = EXCLUDED.total_bytes,
multiplier = EXCLUDED.multiplier,
rating_status = EXCLUDED.rating_status,
source_revision = EXCLUDED.source_revision,
updated_at = now()
RETURNING created_at, updated_at`
return s.db.QueryRowContext(
ctx,
query,
bucket.BucketStart.UTC(),
strings.TrimSpace(bucket.NodeID),
strings.TrimSpace(bucket.AccountUUID),
strings.TrimSpace(bucket.Region),
strings.TrimSpace(bucket.LineCode),
bucket.UplinkBytes,
bucket.DownlinkBytes,
bucket.TotalBytes,
bucket.Multiplier,
status,
strings.TrimSpace(bucket.SourceRevision),
).Scan(&bucket.CreatedAt, &bucket.UpdatedAt)
}
func (s *postgresStore) ListTrafficMinuteBucketsByAccount(ctx context.Context, accountUUID string, start, end time.Time) ([]TrafficMinuteBucket, error) {
query := `
SELECT bucket_start, node_id, account_uuid, region, line_code, uplink_bytes, downlink_bytes, total_bytes, multiplier, rating_status, source_revision, created_at, updated_at
FROM traffic_minute_buckets
WHERE account_uuid = $1`
args := []any{strings.TrimSpace(accountUUID)}
if !start.IsZero() {
query += " AND bucket_start >= $2"
args = append(args, start.UTC())
}
if !end.IsZero() {
query += " AND bucket_start <= $" + strconv.Itoa(len(args)+1)
args = append(args, end.UTC())
}
query += " ORDER BY bucket_start ASC"
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var result []TrafficMinuteBucket
for rows.Next() {
var record TrafficMinuteBucket
if err := rows.Scan(
&record.BucketStart,
&record.NodeID,
&record.AccountUUID,
&record.Region,
&record.LineCode,
&record.UplinkBytes,
&record.DownlinkBytes,
&record.TotalBytes,
&record.Multiplier,
&record.RatingStatus,
&record.SourceRevision,
&record.CreatedAt,
&record.UpdatedAt,
); err != nil {
return nil, err
}
result = append(result, record)
}
return result, rows.Err()
}
func (s *postgresStore) ListTrafficMinuteBuckets(ctx context.Context) ([]TrafficMinuteBucket, error) {
const query = `
SELECT bucket_start, node_id, account_uuid, region, line_code, uplink_bytes, downlink_bytes, total_bytes, multiplier, rating_status, source_revision, created_at, updated_at
FROM traffic_minute_buckets
ORDER BY bucket_start ASC`
rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
var result []TrafficMinuteBucket
for rows.Next() {
var record TrafficMinuteBucket
if err := rows.Scan(
&record.BucketStart,
&record.NodeID,
&record.AccountUUID,
&record.Region,
&record.LineCode,
&record.UplinkBytes,
&record.DownlinkBytes,
&record.TotalBytes,
&record.Multiplier,
&record.RatingStatus,
&record.SourceRevision,
&record.CreatedAt,
&record.UpdatedAt,
); err != nil {
return nil, err
}
result = append(result, record)
}
return result, rows.Err()
}
func (s *postgresStore) InsertBillingLedgerEntry(ctx context.Context, entry *BillingLedgerEntry) error {
if entry == nil {
return errors.New("ledger entry is required")
}
if strings.TrimSpace(entry.ID) == "" {
entry.ID = uuid.NewString()
}
const query = `
INSERT INTO billing_ledger (
id, account_uuid, bucket_start, bucket_end, entry_type, rated_bytes, amount_delta, balance_after, pricing_rule_version
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING created_at`
return s.db.QueryRowContext(
ctx,
query,
entry.ID,
strings.TrimSpace(entry.AccountUUID),
entry.BucketStart.UTC(),
entry.BucketEnd.UTC(),
strings.TrimSpace(entry.EntryType),
entry.RatedBytes,
entry.AmountDelta,
entry.BalanceAfter,
strings.TrimSpace(entry.PricingRuleVersion),
).Scan(&entry.CreatedAt)
}
func (s *postgresStore) ListBillingLedgerByAccount(ctx context.Context, accountUUID string, limit int) ([]BillingLedgerEntry, error) {
query := `
SELECT id, account_uuid, bucket_start, bucket_end, entry_type, rated_bytes, amount_delta, balance_after, pricing_rule_version, created_at
FROM billing_ledger
WHERE account_uuid = $1
ORDER BY created_at DESC`
args := []any{strings.TrimSpace(accountUUID)}
if limit > 0 {
query += " LIMIT $2"
args = append(args, limit)
}
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var result []BillingLedgerEntry
for rows.Next() {
var entry BillingLedgerEntry
if err := rows.Scan(
&entry.ID,
&entry.AccountUUID,
&entry.BucketStart,
&entry.BucketEnd,
&entry.EntryType,
&entry.RatedBytes,
&entry.AmountDelta,
&entry.BalanceAfter,
&entry.PricingRuleVersion,
&entry.CreatedAt,
); err != nil {
return nil, err
}
result = append(result, entry)
}
return result, rows.Err()
}
func (s *postgresStore) UpsertAccountQuotaState(ctx context.Context, state *AccountQuotaState) error {
if state == nil {
return errors.New("quota state is required")
}
const query = `
INSERT INTO account_quota_states (
account_uuid, remaining_included_quota, current_balance, arrears, throttle_state, suspend_state, last_rated_bucket_at, effective_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (account_uuid) DO UPDATE SET
remaining_included_quota = EXCLUDED.remaining_included_quota,
current_balance = EXCLUDED.current_balance,
arrears = EXCLUDED.arrears,
throttle_state = EXCLUDED.throttle_state,
suspend_state = EXCLUDED.suspend_state,
last_rated_bucket_at = EXCLUDED.last_rated_bucket_at,
effective_at = EXCLUDED.effective_at,
updated_at = now()
RETURNING updated_at`
return s.db.QueryRowContext(
ctx,
query,
strings.TrimSpace(state.AccountUUID),
state.RemainingIncludedQuota,
state.CurrentBalance,
state.Arrears,
strings.TrimSpace(state.ThrottleState),
strings.TrimSpace(state.SuspendState),
state.LastRatedBucketAt,
state.EffectiveAt.UTC(),
).Scan(&state.UpdatedAt)
}
func (s *postgresStore) GetAccountQuotaState(ctx context.Context, accountUUID string) (*AccountQuotaState, error) {
const query = `
SELECT account_uuid, remaining_included_quota, current_balance, arrears, throttle_state, suspend_state, last_rated_bucket_at, effective_at, updated_at
FROM account_quota_states
WHERE account_uuid = $1`
var state AccountQuotaState
err := s.db.QueryRowContext(ctx, query, strings.TrimSpace(accountUUID)).Scan(
&state.AccountUUID,
&state.RemainingIncludedQuota,
&state.CurrentBalance,
&state.Arrears,
&state.ThrottleState,
&state.SuspendState,
&state.LastRatedBucketAt,
&state.EffectiveAt,
&state.UpdatedAt,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrUserNotFound
}
return nil, err
}
return &state, nil
}
func (s *postgresStore) UpsertAccountPolicySnapshot(ctx context.Context, snapshot *AccountPolicySnapshot) error {
if snapshot == nil {
return errors.New("policy snapshot is required")
}
groups, err := encodeStringSlice(snapshot.EligibleNodeGroups)
if err != nil {
return err
}
const query = `
INSERT INTO account_policy_snapshots (
account_uuid, policy_version, auth_state, rate_profile, conn_profile, eligible_node_groups, preferred_strategy, degrade_mode, expires_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (account_uuid) DO UPDATE SET
policy_version = EXCLUDED.policy_version,
auth_state = EXCLUDED.auth_state,
rate_profile = EXCLUDED.rate_profile,
conn_profile = EXCLUDED.conn_profile,
eligible_node_groups = EXCLUDED.eligible_node_groups,
preferred_strategy = EXCLUDED.preferred_strategy,
degrade_mode = EXCLUDED.degrade_mode,
expires_at = EXCLUDED.expires_at,
updated_at = now()
RETURNING created_at, updated_at`
return s.db.QueryRowContext(
ctx,
query,
strings.TrimSpace(snapshot.AccountUUID),
strings.TrimSpace(snapshot.PolicyVersion),
strings.TrimSpace(snapshot.AuthState),
strings.TrimSpace(snapshot.RateProfile),
strings.TrimSpace(snapshot.ConnProfile),
groups,
strings.TrimSpace(snapshot.PreferredStrategy),
strings.TrimSpace(snapshot.DegradeMode),
snapshot.ExpiresAt.UTC(),
).Scan(&snapshot.CreatedAt, &snapshot.UpdatedAt)
}
func (s *postgresStore) GetLatestAccountPolicySnapshot(ctx context.Context, accountUUID string) (*AccountPolicySnapshot, error) {
const query = `
SELECT account_uuid, policy_version, auth_state, rate_profile, conn_profile, eligible_node_groups, preferred_strategy, degrade_mode, expires_at, created_at, updated_at
FROM account_policy_snapshots
WHERE account_uuid = $1`
var snapshot AccountPolicySnapshot
var groups []byte
err := s.db.QueryRowContext(ctx, query, strings.TrimSpace(accountUUID)).Scan(
&snapshot.AccountUUID,
&snapshot.PolicyVersion,
&snapshot.AuthState,
&snapshot.RateProfile,
&snapshot.ConnProfile,
&groups,
&snapshot.PreferredStrategy,
&snapshot.DegradeMode,
&snapshot.ExpiresAt,
&snapshot.CreatedAt,
&snapshot.UpdatedAt,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrUserNotFound
}
return nil, err
}
snapshot.EligibleNodeGroups = decodeStringSlice(groups)
return &snapshot, nil
}
func (s *postgresStore) UpsertNodeHealthSnapshot(ctx context.Context, snapshot *NodeHealthSnapshot) error {
if snapshot == nil {
return errors.New("node health snapshot is required")
}
const query = `
INSERT INTO node_health_snapshots (
node_id, region, line_code, pricing_group, stats_enabled, xray_revision, healthy, latency_ms, error_rate, active_connections, health_score, sampled_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (node_id) DO UPDATE SET
region = EXCLUDED.region,
line_code = EXCLUDED.line_code,
pricing_group = EXCLUDED.pricing_group,
stats_enabled = EXCLUDED.stats_enabled,
xray_revision = EXCLUDED.xray_revision,
healthy = EXCLUDED.healthy,
latency_ms = EXCLUDED.latency_ms,
error_rate = EXCLUDED.error_rate,
active_connections = EXCLUDED.active_connections,
health_score = EXCLUDED.health_score,
sampled_at = EXCLUDED.sampled_at,
updated_at = now()
RETURNING created_at, updated_at`
return s.db.QueryRowContext(
ctx,
query,
strings.TrimSpace(snapshot.NodeID),
strings.TrimSpace(snapshot.Region),
strings.TrimSpace(snapshot.LineCode),
strings.TrimSpace(snapshot.PricingGroup),
snapshot.StatsEnabled,
strings.TrimSpace(snapshot.XrayRevision),
snapshot.Healthy,
snapshot.LatencyMS,
snapshot.ErrorRate,
snapshot.ActiveConnections,
snapshot.HealthScore,
snapshot.SampledAt.UTC(),
).Scan(&snapshot.CreatedAt, &snapshot.UpdatedAt)
}
func (s *postgresStore) ListLatestNodeHealthSnapshots(ctx context.Context) ([]NodeHealthSnapshot, error) {
const query = `
SELECT node_id, region, line_code, pricing_group, stats_enabled, xray_revision, healthy, latency_ms, error_rate, active_connections, health_score, sampled_at, created_at, updated_at
FROM node_health_snapshots
ORDER BY node_id ASC`
rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
var result []NodeHealthSnapshot
for rows.Next() {
var snapshot NodeHealthSnapshot
if err := rows.Scan(
&snapshot.NodeID,
&snapshot.Region,
&snapshot.LineCode,
&snapshot.PricingGroup,
&snapshot.StatsEnabled,
&snapshot.XrayRevision,
&snapshot.Healthy,
&snapshot.LatencyMS,
&snapshot.ErrorRate,
&snapshot.ActiveConnections,
&snapshot.HealthScore,
&snapshot.SampledAt,
&snapshot.CreatedAt,
&snapshot.UpdatedAt,
); err != nil {
return nil, err
}
result = append(result, snapshot)
}
return result, rows.Err()
}
func (s *postgresStore) InsertSchedulerDecision(ctx context.Context, decision *SchedulerDecision) error {
if decision == nil {
return errors.New("scheduler decision is required")
}
if strings.TrimSpace(decision.ID) == "" {
decision.ID = uuid.NewString()
}
const query = `
INSERT INTO scheduler_decisions (id, account_uuid, node_group, strategy, decision, generated_at)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING created_at`
return s.db.QueryRowContext(
ctx,
query,
decision.ID,
strings.TrimSpace(decision.AccountUUID),
strings.TrimSpace(decision.NodeGroup),
strings.TrimSpace(decision.Strategy),
strings.TrimSpace(decision.Decision),
decision.GeneratedAt.UTC(),
).Scan(&decision.CreatedAt)
}
func (s *postgresStore) ListRecentSchedulerDecisions(ctx context.Context, limit int) ([]SchedulerDecision, error) {
query := `
SELECT id, account_uuid, node_group, strategy, decision, generated_at, created_at
FROM scheduler_decisions
ORDER BY generated_at DESC`
args := []any{}
if limit > 0 {
query += " LIMIT $1"
args = append(args, limit)
}
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var result []SchedulerDecision
for rows.Next() {
var decision SchedulerDecision
if err := rows.Scan(
&decision.ID,
&decision.AccountUUID,
&decision.NodeGroup,
&decision.Strategy,
&decision.Decision,
&decision.GeneratedAt,
&decision.CreatedAt,
); err != nil {
return nil, err
}
result = append(result, decision)
}
return result, rows.Err()
}

View File

@ -73,6 +73,105 @@ type Agent struct {
UpdatedAt time.Time `json:"updatedAt"`
}
const (
RatingStatusPending = "pending"
RatingStatusRated = "rated"
)
type TrafficStatCheckpoint struct {
NodeID string
AccountUUID string
LastUplinkTotal int64
LastDownlinkTotal int64
LastSeenAt time.Time
XrayRevision string
ResetEpoch int64
CreatedAt time.Time
UpdatedAt time.Time
}
type TrafficMinuteBucket struct {
BucketStart time.Time
NodeID string
AccountUUID string
Region string
LineCode string
UplinkBytes int64
DownlinkBytes int64
TotalBytes int64
Multiplier float64
RatingStatus string
SourceRevision string
CreatedAt time.Time
UpdatedAt time.Time
}
type BillingLedgerEntry struct {
ID string
AccountUUID string
BucketStart time.Time
BucketEnd time.Time
EntryType string
RatedBytes int64
AmountDelta float64
BalanceAfter float64
PricingRuleVersion string
CreatedAt time.Time
}
type AccountQuotaState struct {
AccountUUID string
RemainingIncludedQuota int64
CurrentBalance float64
Arrears bool
ThrottleState string
SuspendState string
LastRatedBucketAt *time.Time
EffectiveAt time.Time
UpdatedAt time.Time
}
type AccountPolicySnapshot struct {
AccountUUID string
PolicyVersion string
AuthState string
RateProfile string
ConnProfile string
EligibleNodeGroups []string
PreferredStrategy string
DegradeMode string
ExpiresAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
type NodeHealthSnapshot struct {
NodeID string
Region string
LineCode string
PricingGroup string
StatsEnabled bool
XrayRevision string
Healthy bool
LatencyMS int
ErrorRate float64
ActiveConnections int
HealthScore float64
SampledAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
type SchedulerDecision struct {
ID string
AccountUUID string
NodeGroup string
Strategy string
Decision string
GeneratedAt time.Time
CreatedAt time.Time
}
// Store provides persistence operations for users.
type Store interface {
CreateUser(ctx context.Context, user *User) error
@ -106,6 +205,23 @@ type Store interface {
DeleteAgent(ctx context.Context, id string) error
DeleteStaleAgents(ctx context.Context, staleThreshold time.Duration) (int, error)
UpsertTrafficStatCheckpoint(ctx context.Context, checkpoint *TrafficStatCheckpoint) error
GetTrafficStatCheckpoint(ctx context.Context, nodeID, accountUUID string) (*TrafficStatCheckpoint, error)
ListTrafficStatCheckpoints(ctx context.Context) ([]TrafficStatCheckpoint, error)
UpsertTrafficMinuteBucket(ctx context.Context, bucket *TrafficMinuteBucket) error
ListTrafficMinuteBucketsByAccount(ctx context.Context, accountUUID string, start, end time.Time) ([]TrafficMinuteBucket, error)
ListTrafficMinuteBuckets(ctx context.Context) ([]TrafficMinuteBucket, error)
InsertBillingLedgerEntry(ctx context.Context, entry *BillingLedgerEntry) error
ListBillingLedgerByAccount(ctx context.Context, accountUUID string, limit int) ([]BillingLedgerEntry, error)
UpsertAccountQuotaState(ctx context.Context, state *AccountQuotaState) error
GetAccountQuotaState(ctx context.Context, accountUUID string) (*AccountQuotaState, error)
UpsertAccountPolicySnapshot(ctx context.Context, snapshot *AccountPolicySnapshot) error
GetLatestAccountPolicySnapshot(ctx context.Context, accountUUID string) (*AccountPolicySnapshot, error)
UpsertNodeHealthSnapshot(ctx context.Context, snapshot *NodeHealthSnapshot) error
ListLatestNodeHealthSnapshots(ctx context.Context) ([]NodeHealthSnapshot, error)
InsertSchedulerDecision(ctx context.Context, decision *SchedulerDecision) error
ListRecentSchedulerDecisions(ctx context.Context, limit int) ([]SchedulerDecision, error)
EnsureTenant(ctx context.Context, tenant *Tenant) error
EnsureTenantDomain(ctx context.Context, domain *TenantDomain) error
UpsertTenantMembership(ctx context.Context, membership *TenantMembership) error
@ -144,6 +260,13 @@ type memoryStore struct {
tenantDomains map[string]*TenantDomain
tenantMemberships map[string]map[string]*TenantMembership
xworkmateProfiles map[string]*XWorkmateProfile
trafficStatCheckpoints map[string]*TrafficStatCheckpoint
trafficMinuteBuckets map[string]*TrafficMinuteBucket
billingLedgerEntries map[string]*BillingLedgerEntry
accountQuotaStates map[string]*AccountQuotaState
accountPolicySnapshots map[string]*AccountPolicySnapshot
nodeHealthSnapshots map[string]*NodeHealthSnapshot
schedulerDecisions map[string]*SchedulerDecision
}
type sessionRecord struct {
@ -182,6 +305,13 @@ func newMemoryStore(allowSuperAdminCounting bool) Store {
tenantDomains: make(map[string]*TenantDomain),
tenantMemberships: make(map[string]map[string]*TenantMembership),
xworkmateProfiles: make(map[string]*XWorkmateProfile),
trafficStatCheckpoints: make(map[string]*TrafficStatCheckpoint),
trafficMinuteBuckets: make(map[string]*TrafficMinuteBucket),
billingLedgerEntries: make(map[string]*BillingLedgerEntry),
accountQuotaStates: make(map[string]*AccountQuotaState),
accountPolicySnapshots: make(map[string]*AccountPolicySnapshot),
nodeHealthSnapshots: make(map[string]*NodeHealthSnapshot),
schedulerDecisions: make(map[string]*SchedulerDecision),
}
}

View File

@ -0,0 +1,107 @@
CREATE TABLE IF NOT EXISTS public.traffic_stat_checkpoints (
node_id TEXT NOT NULL,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
last_uplink_total BIGINT NOT NULL DEFAULT 0,
last_downlink_total BIGINT NOT NULL DEFAULT 0,
last_seen_at TIMESTAMPTZ NOT NULL,
xray_revision TEXT NOT NULL DEFAULT '',
reset_epoch BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (node_id, account_uuid)
);
CREATE TABLE IF NOT EXISTS public.traffic_minute_buckets (
bucket_start TIMESTAMPTZ NOT NULL,
node_id TEXT NOT NULL,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
region TEXT NOT NULL DEFAULT '',
line_code TEXT NOT NULL DEFAULT '',
uplink_bytes BIGINT NOT NULL DEFAULT 0,
downlink_bytes BIGINT NOT NULL DEFAULT 0,
total_bytes BIGINT NOT NULL DEFAULT 0,
multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
rating_status TEXT NOT NULL DEFAULT 'pending',
source_revision TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (bucket_start, node_id, account_uuid, region, line_code)
);
CREATE TABLE IF NOT EXISTS public.billing_ledger (
id UUID PRIMARY KEY,
account_uuid UUID NOT NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
bucket_start TIMESTAMPTZ NOT NULL,
bucket_end TIMESTAMPTZ NOT NULL,
entry_type TEXT NOT NULL,
rated_bytes BIGINT NOT NULL DEFAULT 0,
amount_delta DOUBLE PRECISION NOT NULL DEFAULT 0,
balance_after DOUBLE PRECISION NOT NULL DEFAULT 0,
pricing_rule_version TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.account_quota_states (
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
remaining_included_quota BIGINT NOT NULL DEFAULT 0,
current_balance DOUBLE PRECISION NOT NULL DEFAULT 0,
arrears BOOLEAN NOT NULL DEFAULT false,
throttle_state TEXT NOT NULL DEFAULT 'normal',
suspend_state TEXT NOT NULL DEFAULT 'active',
last_rated_bucket_at TIMESTAMPTZ NULL,
effective_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.account_policy_snapshots (
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
policy_version TEXT NOT NULL,
auth_state TEXT NOT NULL DEFAULT 'active',
rate_profile TEXT NOT NULL DEFAULT 'standard',
conn_profile TEXT NOT NULL DEFAULT 'standard',
eligible_node_groups JSONB NOT NULL DEFAULT '[]'::jsonb,
preferred_strategy TEXT NOT NULL DEFAULT 'ewma',
degrade_mode TEXT NOT NULL DEFAULT 'deny',
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.node_health_snapshots (
node_id TEXT PRIMARY KEY,
region TEXT NOT NULL DEFAULT '',
line_code TEXT NOT NULL DEFAULT '',
pricing_group TEXT NOT NULL DEFAULT '',
stats_enabled BOOLEAN NOT NULL DEFAULT false,
xray_revision TEXT NOT NULL DEFAULT '',
healthy BOOLEAN NOT NULL DEFAULT false,
latency_ms INTEGER NOT NULL DEFAULT 0,
error_rate DOUBLE PRECISION NOT NULL DEFAULT 0,
active_connections INTEGER NOT NULL DEFAULT 0,
health_score DOUBLE PRECISION NOT NULL DEFAULT 0,
sampled_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.scheduler_decisions (
id UUID PRIMARY KEY,
account_uuid UUID NULL REFERENCES public.users(uuid) ON DELETE CASCADE,
node_group TEXT NOT NULL DEFAULT '',
strategy TEXT NOT NULL DEFAULT '',
decision TEXT NOT NULL DEFAULT '',
generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_traffic_minute_buckets_account_bucket
ON public.traffic_minute_buckets (account_uuid, bucket_start DESC);
CREATE INDEX IF NOT EXISTS idx_billing_ledger_account_created
ON public.billing_ledger (account_uuid, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_node_health_snapshots_sampled
ON public.node_health_snapshots (sampled_at DESC);
CREATE INDEX IF NOT EXISTS idx_scheduler_decisions_generated
ON public.scheduler_decisions (generated_at DESC);