feat: Introduce nodes table and refactor Xray config syncer to generate multiple protocol-specific configurations using new XHTTP and TCP templates.

This commit is contained in:
Haitao Pan 2026-01-27 21:56:39 +08:00
parent 858cc4ff70
commit 96a080bdf5
8 changed files with 230 additions and 31 deletions

View File

@ -204,10 +204,19 @@ func runServer(ctx context.Context, cfg *config.Config, logger *slog.Logger) err
outputPath = "/usr/local/etc/xray/config.json"
}
syncer, err := xrayconfig.NewPeriodicSyncer(xrayconfig.PeriodicOptions{
Logger: logger.With("component", "xray-sync"),
Interval: syncInterval,
Source: gormSource,
Generator: xrayconfig.Generator{Definition: xrayconfig.DefaultDefinition(), OutputPath: outputPath},
Logger: logger.With("component", "xray-sync"),
Interval: syncInterval,
Source: gormSource,
Generators: []xrayconfig.Generator{
{
Definition: xrayconfig.XHTTPDefinition(),
OutputPath: "/usr/local/etc/xray/config.json", // Match user's xhttp config path
},
{
Definition: xrayconfig.TCPDefinition(),
OutputPath: "/usr/local/etc/xray/tcp-config.json", // Match user's tcp config path
},
},
ValidateCommand: cfg.Xray.Sync.ValidateCommand,
RestartCommand: cfg.Xray.Sync.RestartCommand,
})

View File

@ -78,13 +78,30 @@ func Run(ctx context.Context, opts Options) error {
tracker := newSyncTracker()
source := NewHTTPClientSource(client, tracker)
generator := xrayconfig.Generator{Definition: xrayconfig.DefaultDefinition(), OutputPath: outputPath}
generators := []xrayconfig.Generator{
{
Definition: xrayconfig.XHTTPDefinition(),
OutputPath: "/usr/local/etc/xray/config.json",
},
{
Definition: xrayconfig.TCPDefinition(),
OutputPath: "/usr/local/etc/xray/tcp-config.json",
},
}
if templatePath := strings.TrimSpace(opts.Xray.Sync.TemplatePath); templatePath != "" {
payload, err := os.ReadFile(templatePath)
if err != nil {
return fmt.Errorf("load xray template %s: %w", templatePath, err)
}
generator.Definition = xrayconfig.JSONDefinition{Raw: append([]byte(nil), payload...)}
// If custom template is provided, we use it for the primary output path
// specified in config, or fallback to the first default.
effectivePath := outputPath
generators = []xrayconfig.Generator{
{
Definition: xrayconfig.JSONDefinition{Raw: append([]byte(nil), payload...)},
OutputPath: effectivePath,
},
}
}
syncLogger := logger.With("component", "agent-xray-sync")
@ -92,7 +109,7 @@ func Run(ctx context.Context, opts Options) error {
Logger: syncLogger,
Interval: syncInterval,
Source: source,
Generator: generator,
Generators: generators,
ValidateCommand: opts.Xray.Sync.ValidateCommand,
RestartCommand: opts.Xray.Sync.RestartCommand,
OnSync: func(result xrayconfig.SyncResult) {

View File

@ -22,7 +22,7 @@ type PeriodicOptions struct {
Logger *slog.Logger
Interval time.Duration
Source ClientSource
Generator Generator
Generators []Generator
ValidateCommand []string
RestartCommand []string
Runner commandRunner
@ -34,7 +34,7 @@ type PeriodicSyncer struct {
logger *slog.Logger
interval time.Duration
source ClientSource
generator Generator
generators []Generator
validateCommand []string
restartCommand []string
runner commandRunner
@ -53,8 +53,13 @@ func NewPeriodicSyncer(opts PeriodicOptions) (*PeriodicSyncer, error) {
if opts.Source == nil {
return nil, errors.New("client source is required")
}
if strings.TrimSpace(opts.Generator.OutputPath) == "" {
return nil, errors.New("generator output path is required")
if len(opts.Generators) == 0 {
return nil, errors.New("at least one generator is required")
}
for i, g := range opts.Generators {
if strings.TrimSpace(g.OutputPath) == "" {
return nil, fmt.Errorf("generator %d output path is required", i)
}
}
if opts.Interval <= 0 {
return nil, errors.New("interval must be positive")
@ -71,7 +76,7 @@ func NewPeriodicSyncer(opts PeriodicOptions) (*PeriodicSyncer, error) {
logger: logger,
interval: opts.Interval,
source: opts.Source,
generator: opts.Generator,
generators: append([]Generator(nil), opts.Generators...),
validateCommand: append([]string(nil), opts.ValidateCommand...),
restartCommand: append([]string(nil), opts.RestartCommand...),
runner: runner,
@ -148,8 +153,10 @@ func (s *PeriodicSyncer) sync(ctx context.Context) (int, error) {
if err != nil {
return 0, fmt.Errorf("list clients: %w", err)
}
if err := s.generator.Generate(clients); err != nil {
return 0, fmt.Errorf("generate config: %w", err)
for _, g := range s.generators {
if err := g.Generate(clients); err != nil {
return 0, fmt.Errorf("generate config for %s: %w", g.OutputPath, err)
}
}
if len(s.validateCommand) > 0 {
if err := s.runCommand(ctx, s.validateCommand, "validate config"); err != nil {

View File

@ -35,24 +35,24 @@ func TestNewPeriodicSyncerValidation(t *testing.T) {
{
name: "missing source",
opts: PeriodicOptions{
Interval: time.Minute,
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Interval: time.Minute,
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
},
},
{
name: "missing output",
opts: PeriodicOptions{
Interval: time.Minute,
Source: staticSource{},
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}},
Interval: time.Minute,
Source: staticSource{},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}}},
},
},
{
name: "non-positive interval",
opts: PeriodicOptions{
Interval: 0,
Source: staticSource{},
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Interval: 0,
Source: staticSource{},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
},
},
}
@ -71,7 +71,7 @@ func TestPeriodicSyncerSyncSuccess(t *testing.T) {
opts := PeriodicOptions{
Interval: time.Minute,
Source: staticSource{clients: []Client{{ID: "uuid-a", Email: "a@example"}, {ID: "uuid-b"}}},
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
ValidateCommand: []string{"echo", "validate"},
RestartCommand: []string{"echo", "restart"},
}
@ -112,9 +112,9 @@ func TestPeriodicSyncerSyncSuccess(t *testing.T) {
func TestPeriodicSyncerSyncError(t *testing.T) {
output := filepath.Join(t.TempDir(), "config.json")
opts := PeriodicOptions{
Interval: time.Minute,
Source: staticSource{err: errors.New("boom")},
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Interval: time.Minute,
Source: staticSource{err: errors.New("boom")},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
}
syncer, err := NewPeriodicSyncer(opts)
if err != nil {
@ -135,7 +135,7 @@ func TestPeriodicSyncerStartStop(t *testing.T) {
calls.Add(1)
return src.ListClients(ctx)
}),
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
}
syncer, err := NewPeriodicSyncer(opts)
if err != nil {
@ -162,9 +162,9 @@ func TestPeriodicSyncerOnSyncCallback(t *testing.T) {
var results []SyncResult
var mu sync.Mutex
opts := PeriodicOptions{
Interval: 5 * time.Millisecond,
Source: staticSource{clients: []Client{{ID: "uuid-a"}}},
Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output},
Interval: 5 * time.Millisecond,
Source: staticSource{clients: []Client{{ID: "uuid-a"}}},
Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}},
OnSync: func(res SyncResult) {
mu.Lock()
defer mu.Unlock()

View File

@ -0,0 +1,79 @@
{
"log": {
"loglevel": "warning"
},
"routing": {
"domainStrategy": "IPIfNonMatch",
"rules": [
{
"type": "field",
"ip": [
"geoip:cn"
],
"outboundTag": "block"
}
]
},
"inbounds": [
{
"listen": "0.0.0.0",
"port": 1443,
"protocol": "vless",
"settings": {
"clients": [],
"decryption": "none",
"fallbacks": [
{
"dest": "8001",
"xver": 1
},
{
"alpn": "h2",
"dest": "8002",
"xver": 1
}
]
},
"streamSettings": {
"network": "tcp",
"security": "tls",
"tlsSettings": {
"rejectUnknownSni": true,
"minVersion": "1.2",
"certificates": [
{
"ocspStapling": 3600,
"certificateFile": "/var/lib/caddy/.local/share/caddy/certificates/acme-v02.api.letsencrypt.org-directory/ha-proxy-jp.svc.plus/ha-proxy-jp.svc.plus.crt",
"keyFile": "/var/lib/caddy/.local/share/caddy/certificates/acme-v02.api.letsencrypt.org-directory/ha-proxy-jp.svc.plus/ha-proxy-jp.svc.plus.key"
}
]
}
},
"sniffing": {
"enabled": true,
"destOverride": [
"http",
"tls"
]
}
}
],
"outbounds": [
{
"protocol": "freedom",
"tag": "direct"
},
{
"protocol": "blackhole",
"tag": "block"
}
],
"policy": {
"levels": {
"0": {
"handshake": 2,
"connIdle": 120
}
}
}
}

View File

@ -0,0 +1,46 @@
{
"log": {
"loglevel": "debug"
},
"routing": {
"domainStrategy": "AsIs",
"rules": [
{
"type": "field",
"ip": [
"geoip:private"
],
"outboundTag": "blocked"
}
]
},
"inbounds": [
{
"listen": "/dev/shm/xray.sock,0666",
"protocol": "vless",
"settings": {
"clients": [],
"decryption": "none"
},
"streamSettings": {
"network": "xhttp",
"xhttpSettings": {
"mode": "auto",
"path": "/split"
}
}
}
],
"outbounds": [
{
"protocol": "freedom",
"settings": {},
"tag": "direct"
},
{
"protocol": "blackhole",
"settings": {},
"tag": "blocked"
}
]
}

View File

@ -5,6 +5,12 @@ import _ "embed"
var (
//go:embed template_server.json
serverTemplateJSON []byte
//go:embed template_tcp.json
tcpTemplateJSON []byte
//go:embed template_xhttp.json
xhttpTemplateJSON []byte
)
// DefaultDefinition returns the built-in Xray configuration definition used when
@ -12,5 +18,15 @@ var (
// that configuration rendering no longer depends on filesystem state at
// runtime.
func DefaultDefinition() Definition {
return JSONDefinition{Raw: append([]byte(nil), serverTemplateJSON...)}
return TCPDefinition()
}
// TCPDefinition returns the Xray configuration for TCP transport.
func TCPDefinition() Definition {
return JSONDefinition{Raw: append([]byte(nil), tcpTemplateJSON...)}
}
// XHTTPDefinition returns the Xray configuration for XHTTP transport.
func XHTTPDefinition() Definition {
return JSONDefinition{Raw: append([]byte(nil), xhttpTemplateJSON...)}
}

View File

@ -131,6 +131,21 @@ CREATE TABLE public.subscriptions (
CONSTRAINT subscriptions_user_external_uk UNIQUE (user_uuid, external_id)
);
CREATE TABLE public.nodes (
uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
location TEXT NOT NULL,
address TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 443,
server_name TEXT,
protocols JSONB NOT NULL DEFAULT '[]'::jsonb,
available BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
version BIGINT NOT NULL DEFAULT 0,
origin_node TEXT NOT NULL DEFAULT 'local'
);
-- =========================================
-- Indexes
-- =========================================
@ -141,6 +156,7 @@ CREATE INDEX idx_sessions_user_uuid ON public.sessions (user_uuid);
CREATE INDEX idx_admin_settings_version ON public.admin_settings (version);
CREATE INDEX idx_subscriptions_user_uuid ON public.subscriptions (user_uuid);
CREATE INDEX idx_subscriptions_status ON public.subscriptions (status);
CREATE INDEX idx_nodes_available ON public.nodes (available);
-- =========================================
-- Triggers
@ -190,3 +206,12 @@ CREATE TRIGGER trg_admin_settings_bump_version
CREATE TRIGGER trg_subscriptions_set_updated_at
BEFORE UPDATE ON public.subscriptions
FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();
-- nodes
CREATE TRIGGER trg_nodes_set_updated_at
BEFORE UPDATE ON public.nodes
FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();
CREATE TRIGGER trg_nodes_bump_version
BEFORE UPDATE ON public.nodes
FOR EACH ROW EXECUTE FUNCTION public.bump_version();