feat: Implement dynamic agent registration using self-reported IDs and persist agent details in a new agents database table.

This commit is contained in:
Haitao Pan 2026-02-05 00:07:30 +08:00
parent 7cae075709
commit c084cd4c8f
3 changed files with 45 additions and 11 deletions

View File

@ -658,16 +658,13 @@ func runServer(ctx context.Context, cfg *config.Config, logger *slog.Logger) err
}
} else if token := os.Getenv("INTERNAL_SERVICE_TOKEN"); token != "" {
// Fallback: if no credentials configured but we have an internal token,
// accept any agent that presents this token (ID will be taken from agent's self-reported ID)
// This allows the agent to use its configured ID (e.g., "hk-xhttp.svc.plus")
agentID := strings.TrimSpace(os.Getenv("AGENT_ID"))
if agentID == "" {
agentID = "internal-agent" // fallback ID if not specified
}
// create a wildcard credential that accepts any agent presenting this token.
// The actual agent ID will be extracted from the request (e.g., X-Agent-ID header).
// This allows multiple agents to authenticate with the same shared token.
agentRegistry, err = agentserver.NewRegistry(agentserver.Config{
Credentials: []agentserver.Credential{{
ID: agentID,
Name: "Internal Agent",
ID: "*", // Wildcard: accept any agent ID
Name: "Internal Agents (Shared Token)",
Token: token,
Groups: []string{"internal"},
}},
@ -1047,7 +1044,7 @@ func agentReportStatusHandler(registry *agentserver.Registry, logger *slog.Logge
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "agent_identity_missing", "message": "agent identity missing"})
return
}
identity, ok := value.(agentserver.Identity)
authenticatedIdentity, ok := value.(agentserver.Identity)
if !ok {
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "agent_identity_invalid", "message": "agent identity malformed"})
return
@ -1057,9 +1054,21 @@ func agentReportStatusHandler(registry *agentserver.Registry, logger *slog.Logge
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "invalid_status_payload", "message": "invalid status payload"})
return
}
registry.ReportStatus(identity, report)
// Extract agent ID from report (self-reported by agent)
agentID := strings.TrimSpace(report.AgentID)
if agentID == "" {
// Fallback to authenticated identity ID if agent doesn't report its ID
agentID = authenticatedIdentity.ID
}
// Dynamically register agent with self-reported ID
// This allows multiple agents to use the same shared token
agentIdentity := registry.RegisterAgent(agentID, authenticatedIdentity.Groups)
registry.ReportStatus(agentIdentity, report)
if logger != nil {
logger.Info("agent status updated", "agent", identity.ID, "healthy", report.Healthy, "clients", report.Xray.Clients)
logger.Info("agent status updated", "agent", agentIdentity.ID, "healthy", report.Healthy, "clients", report.Xray.Clients)
}
c.Status(http.StatusNoContent)
}

View File

@ -18,6 +18,7 @@ type ClientListResponse struct {
// StatusReport captures the runtime state of an agent and the managed Xray
// instance.
type StatusReport struct {
AgentID string `json:"agentId"` // Self-reported agent ID (e.g., "hk-xhttp.svc.plus")
Healthy bool `json:"healthy"`
Message string `json:"message,omitempty"`
Users int `json:"users"`

View File

@ -117,6 +117,30 @@ func (r *Registry) ReportStatus(agent Identity, report agentproto.StatusReport)
}
}
// RegisterAgent dynamically registers an agent with the given ID if it doesn't already exist.
// This allows agents to self-report their IDs when using a shared authentication token.
// The agent will inherit the groups from the credential used for authentication.
// Returns the identity for the agent (either existing or newly created).
func (r *Registry) RegisterAgent(agentID string, groups []string) Identity {
r.mu.Lock()
defer r.mu.Unlock()
// Check if agent already registered
if identity, exists := r.byID[agentID]; exists {
return identity
}
// Create new identity for this agent
identity := Identity{
ID: agentID,
Name: agentID, // Use ID as name by default
Groups: groups,
}
r.byID[agentID] = identity
return identity
}
// Statuses returns the latest status snapshot for all agents sorted by ID.
func (r *Registry) Statuses() []StatusSnapshot {
r.mu.RLock()