From e1e22bca6212f22a1baf4393ed642bc8ab615919 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Tue, 27 Jan 2026 21:56:39 +0800 Subject: [PATCH] feat: Introduce `nodes` table and refactor Xray config syncer to generate multiple protocol-specific configurations using new XHTTP and TCP templates. --- cmd/accountsvc/main.go | 17 ++++-- internal/agentmode/runner.go | 23 ++++++- internal/xrayconfig/syncer.go | 21 ++++--- internal/xrayconfig/syncer_test.go | 32 +++++----- internal/xrayconfig/template_tcp.json | 79 +++++++++++++++++++++++++ internal/xrayconfig/template_xhttp.json | 46 ++++++++++++++ internal/xrayconfig/templates.go | 18 +++++- sql/schema.sql | 25 ++++++++ 8 files changed, 230 insertions(+), 31 deletions(-) create mode 100644 internal/xrayconfig/template_tcp.json create mode 100644 internal/xrayconfig/template_xhttp.json diff --git a/cmd/accountsvc/main.go b/cmd/accountsvc/main.go index a2c36c7..92d4e3e 100644 --- a/cmd/accountsvc/main.go +++ b/cmd/accountsvc/main.go @@ -204,10 +204,19 @@ func runServer(ctx context.Context, cfg *config.Config, logger *slog.Logger) err outputPath = "/usr/local/etc/xray/config.json" } syncer, err := xrayconfig.NewPeriodicSyncer(xrayconfig.PeriodicOptions{ - Logger: logger.With("component", "xray-sync"), - Interval: syncInterval, - Source: gormSource, - Generator: xrayconfig.Generator{Definition: xrayconfig.DefaultDefinition(), OutputPath: outputPath}, + Logger: logger.With("component", "xray-sync"), + Interval: syncInterval, + Source: gormSource, + Generators: []xrayconfig.Generator{ + { + Definition: xrayconfig.XHTTPDefinition(), + OutputPath: "/usr/local/etc/xray/config.json", // Match user's xhttp config path + }, + { + Definition: xrayconfig.TCPDefinition(), + OutputPath: "/usr/local/etc/xray/tcp-config.json", // Match user's tcp config path + }, + }, ValidateCommand: cfg.Xray.Sync.ValidateCommand, RestartCommand: cfg.Xray.Sync.RestartCommand, }) diff --git a/internal/agentmode/runner.go b/internal/agentmode/runner.go index 8efce01..02983c8 100644 --- a/internal/agentmode/runner.go +++ b/internal/agentmode/runner.go @@ -78,13 +78,30 @@ func Run(ctx context.Context, opts Options) error { tracker := newSyncTracker() source := NewHTTPClientSource(client, tracker) - generator := xrayconfig.Generator{Definition: xrayconfig.DefaultDefinition(), OutputPath: outputPath} + generators := []xrayconfig.Generator{ + { + Definition: xrayconfig.XHTTPDefinition(), + OutputPath: "/usr/local/etc/xray/config.json", + }, + { + Definition: xrayconfig.TCPDefinition(), + OutputPath: "/usr/local/etc/xray/tcp-config.json", + }, + } if templatePath := strings.TrimSpace(opts.Xray.Sync.TemplatePath); templatePath != "" { payload, err := os.ReadFile(templatePath) if err != nil { return fmt.Errorf("load xray template %s: %w", templatePath, err) } - generator.Definition = xrayconfig.JSONDefinition{Raw: append([]byte(nil), payload...)} + // If custom template is provided, we use it for the primary output path + // specified in config, or fallback to the first default. + effectivePath := outputPath + generators = []xrayconfig.Generator{ + { + Definition: xrayconfig.JSONDefinition{Raw: append([]byte(nil), payload...)}, + OutputPath: effectivePath, + }, + } } syncLogger := logger.With("component", "agent-xray-sync") @@ -92,7 +109,7 @@ func Run(ctx context.Context, opts Options) error { Logger: syncLogger, Interval: syncInterval, Source: source, - Generator: generator, + Generators: generators, ValidateCommand: opts.Xray.Sync.ValidateCommand, RestartCommand: opts.Xray.Sync.RestartCommand, OnSync: func(result xrayconfig.SyncResult) { diff --git a/internal/xrayconfig/syncer.go b/internal/xrayconfig/syncer.go index 5ebada5..1e20572 100644 --- a/internal/xrayconfig/syncer.go +++ b/internal/xrayconfig/syncer.go @@ -22,7 +22,7 @@ type PeriodicOptions struct { Logger *slog.Logger Interval time.Duration Source ClientSource - Generator Generator + Generators []Generator ValidateCommand []string RestartCommand []string Runner commandRunner @@ -34,7 +34,7 @@ type PeriodicSyncer struct { logger *slog.Logger interval time.Duration source ClientSource - generator Generator + generators []Generator validateCommand []string restartCommand []string runner commandRunner @@ -53,8 +53,13 @@ func NewPeriodicSyncer(opts PeriodicOptions) (*PeriodicSyncer, error) { if opts.Source == nil { return nil, errors.New("client source is required") } - if strings.TrimSpace(opts.Generator.OutputPath) == "" { - return nil, errors.New("generator output path is required") + if len(opts.Generators) == 0 { + return nil, errors.New("at least one generator is required") + } + for i, g := range opts.Generators { + if strings.TrimSpace(g.OutputPath) == "" { + return nil, fmt.Errorf("generator %d output path is required", i) + } } if opts.Interval <= 0 { return nil, errors.New("interval must be positive") @@ -71,7 +76,7 @@ func NewPeriodicSyncer(opts PeriodicOptions) (*PeriodicSyncer, error) { logger: logger, interval: opts.Interval, source: opts.Source, - generator: opts.Generator, + generators: append([]Generator(nil), opts.Generators...), validateCommand: append([]string(nil), opts.ValidateCommand...), restartCommand: append([]string(nil), opts.RestartCommand...), runner: runner, @@ -148,8 +153,10 @@ func (s *PeriodicSyncer) sync(ctx context.Context) (int, error) { if err != nil { return 0, fmt.Errorf("list clients: %w", err) } - if err := s.generator.Generate(clients); err != nil { - return 0, fmt.Errorf("generate config: %w", err) + for _, g := range s.generators { + if err := g.Generate(clients); err != nil { + return 0, fmt.Errorf("generate config for %s: %w", g.OutputPath, err) + } } if len(s.validateCommand) > 0 { if err := s.runCommand(ctx, s.validateCommand, "validate config"); err != nil { diff --git a/internal/xrayconfig/syncer_test.go b/internal/xrayconfig/syncer_test.go index c5802c5..531af81 100644 --- a/internal/xrayconfig/syncer_test.go +++ b/internal/xrayconfig/syncer_test.go @@ -35,24 +35,24 @@ func TestNewPeriodicSyncerValidation(t *testing.T) { { name: "missing source", opts: PeriodicOptions{ - Interval: time.Minute, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Interval: time.Minute, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, }, }, { name: "missing output", opts: PeriodicOptions{ - Interval: time.Minute, - Source: staticSource{}, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}}, + Interval: time.Minute, + Source: staticSource{}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}}}, }, }, { name: "non-positive interval", opts: PeriodicOptions{ - Interval: 0, - Source: staticSource{}, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Interval: 0, + Source: staticSource{}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, }, }, } @@ -71,7 +71,7 @@ func TestPeriodicSyncerSyncSuccess(t *testing.T) { opts := PeriodicOptions{ Interval: time.Minute, Source: staticSource{clients: []Client{{ID: "uuid-a", Email: "a@example"}, {ID: "uuid-b"}}}, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, ValidateCommand: []string{"echo", "validate"}, RestartCommand: []string{"echo", "restart"}, } @@ -112,9 +112,9 @@ func TestPeriodicSyncerSyncSuccess(t *testing.T) { func TestPeriodicSyncerSyncError(t *testing.T) { output := filepath.Join(t.TempDir(), "config.json") opts := PeriodicOptions{ - Interval: time.Minute, - Source: staticSource{err: errors.New("boom")}, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Interval: time.Minute, + Source: staticSource{err: errors.New("boom")}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, } syncer, err := NewPeriodicSyncer(opts) if err != nil { @@ -135,7 +135,7 @@ func TestPeriodicSyncerStartStop(t *testing.T) { calls.Add(1) return src.ListClients(ctx) }), - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, } syncer, err := NewPeriodicSyncer(opts) if err != nil { @@ -162,9 +162,9 @@ func TestPeriodicSyncerOnSyncCallback(t *testing.T) { var results []SyncResult var mu sync.Mutex opts := PeriodicOptions{ - Interval: 5 * time.Millisecond, - Source: staticSource{clients: []Client{{ID: "uuid-a"}}}, - Generator: Generator{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}, + Interval: 5 * time.Millisecond, + Source: staticSource{clients: []Client{{ID: "uuid-a"}}}, + Generators: []Generator{{Definition: JSONDefinition{Raw: []byte(minimalTemplate)}, OutputPath: output}}, OnSync: func(res SyncResult) { mu.Lock() defer mu.Unlock() diff --git a/internal/xrayconfig/template_tcp.json b/internal/xrayconfig/template_tcp.json new file mode 100644 index 0000000..9268580 --- /dev/null +++ b/internal/xrayconfig/template_tcp.json @@ -0,0 +1,79 @@ +{ + "log": { + "loglevel": "warning" + }, + "routing": { + "domainStrategy": "IPIfNonMatch", + "rules": [ + { + "type": "field", + "ip": [ + "geoip:cn" + ], + "outboundTag": "block" + } + ] + }, + "inbounds": [ + { + "listen": "0.0.0.0", + "port": 1443, + "protocol": "vless", + "settings": { + "clients": [], + "decryption": "none", + "fallbacks": [ + { + "dest": "8001", + "xver": 1 + }, + { + "alpn": "h2", + "dest": "8002", + "xver": 1 + } + ] + }, + "streamSettings": { + "network": "tcp", + "security": "tls", + "tlsSettings": { + "rejectUnknownSni": true, + "minVersion": "1.2", + "certificates": [ + { + "ocspStapling": 3600, + "certificateFile": "/var/lib/caddy/.local/share/caddy/certificates/acme-v02.api.letsencrypt.org-directory/ha-proxy-jp.svc.plus/ha-proxy-jp.svc.plus.crt", + "keyFile": "/var/lib/caddy/.local/share/caddy/certificates/acme-v02.api.letsencrypt.org-directory/ha-proxy-jp.svc.plus/ha-proxy-jp.svc.plus.key" + } + ] + } + }, + "sniffing": { + "enabled": true, + "destOverride": [ + "http", + "tls" + ] + } + } + ], + "outbounds": [ + { + "protocol": "freedom", + "tag": "direct" + }, + { + "protocol": "blackhole", + "tag": "block" + } + ], + "policy": { + "levels": { + "0": { + "handshake": 2, + "connIdle": 120 + } + } + } +} \ No newline at end of file diff --git a/internal/xrayconfig/template_xhttp.json b/internal/xrayconfig/template_xhttp.json new file mode 100644 index 0000000..a46f063 --- /dev/null +++ b/internal/xrayconfig/template_xhttp.json @@ -0,0 +1,46 @@ +{ + "log": { + "loglevel": "debug" + }, + "routing": { + "domainStrategy": "AsIs", + "rules": [ + { + "type": "field", + "ip": [ + "geoip:private" + ], + "outboundTag": "blocked" + } + ] + }, + "inbounds": [ + { + "listen": "/dev/shm/xray.sock,0666", + "protocol": "vless", + "settings": { + "clients": [], + "decryption": "none" + }, + "streamSettings": { + "network": "xhttp", + "xhttpSettings": { + "mode": "auto", + "path": "/split" + } + } + } + ], + "outbounds": [ + { + "protocol": "freedom", + "settings": {}, + "tag": "direct" + }, + { + "protocol": "blackhole", + "settings": {}, + "tag": "blocked" + } + ] +} \ No newline at end of file diff --git a/internal/xrayconfig/templates.go b/internal/xrayconfig/templates.go index c38830f..cf0944e 100644 --- a/internal/xrayconfig/templates.go +++ b/internal/xrayconfig/templates.go @@ -5,6 +5,12 @@ import _ "embed" var ( //go:embed template_server.json serverTemplateJSON []byte + + //go:embed template_tcp.json + tcpTemplateJSON []byte + + //go:embed template_xhttp.json + xhttpTemplateJSON []byte ) // DefaultDefinition returns the built-in Xray configuration definition used when @@ -12,5 +18,15 @@ var ( // that configuration rendering no longer depends on filesystem state at // runtime. func DefaultDefinition() Definition { - return JSONDefinition{Raw: append([]byte(nil), serverTemplateJSON...)} + return TCPDefinition() +} + +// TCPDefinition returns the Xray configuration for TCP transport. +func TCPDefinition() Definition { + return JSONDefinition{Raw: append([]byte(nil), tcpTemplateJSON...)} +} + +// XHTTPDefinition returns the Xray configuration for XHTTP transport. +func XHTTPDefinition() Definition { + return JSONDefinition{Raw: append([]byte(nil), xhttpTemplateJSON...)} } diff --git a/sql/schema.sql b/sql/schema.sql index 659b019..4e30897 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -131,6 +131,21 @@ CREATE TABLE public.subscriptions ( CONSTRAINT subscriptions_user_external_uk UNIQUE (user_uuid, external_id) ); +CREATE TABLE public.nodes ( + uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + location TEXT NOT NULL, + address TEXT NOT NULL, + port INTEGER NOT NULL DEFAULT 443, + server_name TEXT, + protocols JSONB NOT NULL DEFAULT '[]'::jsonb, + available BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + version BIGINT NOT NULL DEFAULT 0, + origin_node TEXT NOT NULL DEFAULT 'local' +); + -- ========================================= -- Indexes -- ========================================= @@ -141,6 +156,7 @@ CREATE INDEX idx_sessions_user_uuid ON public.sessions (user_uuid); CREATE INDEX idx_admin_settings_version ON public.admin_settings (version); CREATE INDEX idx_subscriptions_user_uuid ON public.subscriptions (user_uuid); CREATE INDEX idx_subscriptions_status ON public.subscriptions (status); +CREATE INDEX idx_nodes_available ON public.nodes (available); -- ========================================= -- Triggers @@ -190,3 +206,12 @@ CREATE TRIGGER trg_admin_settings_bump_version CREATE TRIGGER trg_subscriptions_set_updated_at BEFORE UPDATE ON public.subscriptions FOR EACH ROW EXECUTE FUNCTION public.set_updated_at(); + +-- nodes +CREATE TRIGGER trg_nodes_set_updated_at + BEFORE UPDATE ON public.nodes + FOR EACH ROW EXECUTE FUNCTION public.set_updated_at(); + +CREATE TRIGGER trg_nodes_bump_version + BEFORE UPDATE ON public.nodes + FOR EACH ROW EXECUTE FUNCTION public.bump_version();