Add SSH-based account sync utility (#602)
This commit is contained in:
parent
78f1d10031
commit
2a945adb6f
@ -27,6 +27,7 @@ PGLOGICAL_REGION_FILE := ./sql/schema_pglogical_region.sql
|
||||
ACCOUNT_EXPORT_FILE ?= account-export.yaml
|
||||
ACCOUNT_IMPORT_FILE ?= account-export.yaml
|
||||
ACCOUNT_EMAIL_KEYWORD ?=
|
||||
ACCOUNT_SYNC_CONFIG ?= config/sync.yaml
|
||||
SUPERADMIN_USERNAME ?= Admin
|
||||
SUPERADMIN_PASSWORD ?= ChangeMe
|
||||
SUPERADMIN_EMAIL ?= admin@svc.plus
|
||||
@ -38,8 +39,8 @@ export PATH := /usr/local/go/bin:$(PATH)
|
||||
# =========================================
|
||||
|
||||
.PHONY: all init build clean start stop restart dev test help \
|
||||
init-db-core init-db-replication init-db-pglogical \
|
||||
reinit-pglogical
|
||||
init-db-core init-db-replication init-db-pglogical \
|
||||
reinit-pglogical account-sync-push account-sync-pull account-sync-mirror
|
||||
|
||||
all: build
|
||||
|
||||
@ -203,13 +204,25 @@ account-export:
|
||||
@go run ./cmd/migratectl/main.go export --dsn "$(DB_URL)" --output "$(ACCOUNT_EXPORT_FILE)" $(if $(ACCOUNT_EMAIL_KEYWORD),--email "$(ACCOUNT_EMAIL_KEYWORD)")
|
||||
|
||||
account-import:
|
||||
@[ -f "$(ACCOUNT_IMPORT_FILE)" ] || (echo "❌ 未找到文件 $(ACCOUNT_IMPORT_FILE)"; exit 1)
|
||||
@go run ./cmd/migratectl/main.go import --dsn "$(DB_URL)" --file "$(ACCOUNT_IMPORT_FILE)" \
|
||||
$(if $(ACCOUNT_IMPORT_MERGE),--merge) \
|
||||
$(if $(ACCOUNT_IMPORT_MERGE_STRATEGY),--merge-strategy "$(ACCOUNT_IMPORT_MERGE_STRATEGY)") \
|
||||
$(if $(ACCOUNT_IMPORT_DRY_RUN),--dry-run) \
|
||||
$(foreach UUID,$(ACCOUNT_IMPORT_MERGE_ALLOWLIST),--merge-allowlist $(UUID)) \
|
||||
$(ACCOUNT_IMPORT_EXTRA_FLAGS)
|
||||
@[ -f "$(ACCOUNT_IMPORT_FILE)" ] || (echo "❌ 未找到文件 $(ACCOUNT_IMPORT_FILE)"; exit 1)
|
||||
@go run ./cmd/migratectl/main.go import --dsn "$(DB_URL)" --file "$(ACCOUNT_IMPORT_FILE)" \
|
||||
$(if $(ACCOUNT_IMPORT_MERGE),--merge) \
|
||||
$(if $(ACCOUNT_IMPORT_MERGE_STRATEGY),--merge-strategy "$(ACCOUNT_IMPORT_MERGE_STRATEGY)") \
|
||||
$(if $(ACCOUNT_IMPORT_DRY_RUN),--dry-run) \
|
||||
$(foreach UUID,$(ACCOUNT_IMPORT_MERGE_ALLOWLIST),--merge-allowlist $(UUID)) \
|
||||
$(ACCOUNT_IMPORT_EXTRA_FLAGS)
|
||||
|
||||
account-sync-push:
|
||||
@[ -f "$(ACCOUNT_SYNC_CONFIG)" ] || (echo "❌ 未找到配置文件 $(ACCOUNT_SYNC_CONFIG)"; exit 1)
|
||||
@go run ./cmd/syncctl/main.go push --config "$(ACCOUNT_SYNC_CONFIG)"
|
||||
|
||||
account-sync-pull:
|
||||
@[ -f "$(ACCOUNT_SYNC_CONFIG)" ] || (echo "❌ 未找到配置文件 $(ACCOUNT_SYNC_CONFIG)"; exit 1)
|
||||
@go run ./cmd/syncctl/main.go pull --config "$(ACCOUNT_SYNC_CONFIG)"
|
||||
|
||||
account-sync-mirror:
|
||||
@[ -f "$(ACCOUNT_SYNC_CONFIG)" ] || (echo "❌ 未找到配置文件 $(ACCOUNT_SYNC_CONFIG)"; exit 1)
|
||||
@go run ./cmd/syncctl/main.go mirror --config "$(ACCOUNT_SYNC_CONFIG)"
|
||||
|
||||
create-super-admin:
|
||||
@[ -n "$(SUPERADMIN_USERNAME)" ] && [ -n "$(SUPERADMIN_PASSWORD)" ] || (echo "❌ 请指定用户名与密码"; exit 1)
|
||||
|
||||
107
account/cmd/syncctl/main.go
Normal file
107
account/cmd/syncctl/main.go
Normal file
@ -0,0 +1,107 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"xcontrol/account/internal/syncer"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var cfgPath string
|
||||
root := &cobra.Command{
|
||||
Use: "syncctl",
|
||||
Short: "Synchronise account service data across regions",
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
if cfgPath == "" {
|
||||
return fmt.Errorf("--config is required")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
root.PersistentFlags().StringVar(&cfgPath, "config", "", "Path to synchronisation config file")
|
||||
|
||||
root.AddCommand(newPushCmd(&cfgPath))
|
||||
root.AddCommand(newPullCmd(&cfgPath))
|
||||
root.AddCommand(newMirrorCmd(&cfgPath))
|
||||
|
||||
if err := root.Execute(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func loadSyncer(configPath string) (*syncer.Syncer, func(), error) {
|
||||
cfg, err := syncer.LoadConfig(configPath)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
logger := log.New(os.Stdout, "[syncctl] ", log.LstdFlags)
|
||||
s := syncer.New(cfg, logger)
|
||||
return s, func() {}, nil
|
||||
}
|
||||
|
||||
func newPushCmd(cfgPath *string) *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "push",
|
||||
Short: "Export local snapshot and push to the remote environment",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
sync, cancel, err := loadSyncer(*cfgPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
ctx, cancelRun := context.WithTimeout(cmd.Context(), 5*time.Minute)
|
||||
defer cancelRun()
|
||||
return sync.Push(ctx)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newPullCmd(cfgPath *string) *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "pull",
|
||||
Short: "Fetch remote snapshot and import into the local environment",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
sync, cancel, err := loadSyncer(*cfgPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
ctx, cancelRun := context.WithTimeout(cmd.Context(), 5*time.Minute)
|
||||
defer cancelRun()
|
||||
return sync.Pull(ctx)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newMirrorCmd(cfgPath *string) *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "mirror",
|
||||
Short: "Perform push then pull to keep both sides aligned",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
sync, cancel, err := loadSyncer(*cfgPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
ctx, cancelRun := context.WithTimeout(cmd.Context(), 10*time.Minute)
|
||||
defer cancelRun()
|
||||
return sync.Mirror(ctx)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Ensure the default flag.CommandLine is not used by Cobra.
|
||||
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
||||
}
|
||||
50
account/config/sync.example.yaml
Normal file
50
account/config/sync.example.yaml
Normal file
@ -0,0 +1,50 @@
|
||||
# ============================================
|
||||
# 🔄 XControl Account Sync Configuration
|
||||
# ============================================
|
||||
# 将本地与远端账号服务通过 SSH 安全同步。默认提供单向 push/pull/mirror
|
||||
# 三种模式,可直接通过 `go run ./cmd/syncctl/main.go push --config config/sync.yaml`
|
||||
# 等命令执行。
|
||||
#
|
||||
# 请复制本文件为 config/sync.yaml 并按需修改。
|
||||
# ============================================
|
||||
|
||||
local:
|
||||
# 本地 PostgreSQL 连接地址,用于导入/导出账号数据
|
||||
dsn: "postgres://shenlan:password@127.0.0.1:5432/account?sslmode=disable"
|
||||
# 可选:按 email 关键字过滤导出的账号
|
||||
email_keyword: ""
|
||||
# 导出的快照文件路径(默认 account-export.yaml)
|
||||
export_path: "account-export.yaml"
|
||||
# 导入行为配置,支持 merge / dry-run / allowlist 等参数
|
||||
import:
|
||||
merge: false
|
||||
merge_strategy: ""
|
||||
dry_run: false
|
||||
allowlist: []
|
||||
|
||||
remote:
|
||||
# 远端服务器地址与 SSH 账户
|
||||
address: "cn-homepage.svc.plus"
|
||||
port: 22
|
||||
user: "root"
|
||||
|
||||
# SSH 私钥与 known_hosts 用于强化安全(推荐使用专用部署密钥)
|
||||
identity_file: "/root/.ssh/id_rsa"
|
||||
known_hosts_file: "/root/.ssh/known_hosts"
|
||||
|
||||
# 远端账号服务所在目录,用于执行 make account-export/import
|
||||
account_dir: "/var/www/XControl/account"
|
||||
|
||||
# 远端快照文件路径(默认 account-export.yaml,可使用绝对路径)
|
||||
export_path: "account-export.yaml"
|
||||
import_path: "account-export.yaml"
|
||||
|
||||
# 可选:覆盖远端的 ACCOUNT_EMAIL_KEYWORD 环境变量
|
||||
email_keyword: ""
|
||||
|
||||
# 可选:额外注入的环境变量,例如覆盖数据库连接信息
|
||||
env: {}
|
||||
|
||||
# SSH 连接超时时间
|
||||
timeout: 30s
|
||||
|
||||
156
account/internal/syncer/config.go
Normal file
156
account/internal/syncer/config.go
Normal file
@ -0,0 +1,156 @@
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Config defines how account snapshots should be synchronised between
|
||||
// environments.
|
||||
type Config struct {
|
||||
Local LocalConfig `yaml:"local"`
|
||||
Remote RemoteConfig `yaml:"remote"`
|
||||
}
|
||||
|
||||
// LocalConfig describes the local database connection and snapshot options.
|
||||
type LocalConfig struct {
|
||||
DSN string `yaml:"dsn"`
|
||||
EmailKeyword string `yaml:"email_keyword"`
|
||||
ExportPath string `yaml:"export_path"`
|
||||
Import ImportOptions `yaml:"import"`
|
||||
}
|
||||
|
||||
// ImportOptions configures how imported snapshots should be reconciled with the
|
||||
// target database.
|
||||
type ImportOptions struct {
|
||||
Merge bool `yaml:"merge"`
|
||||
MergeStrategy string `yaml:"merge_strategy"`
|
||||
DryRun bool `yaml:"dry_run"`
|
||||
Allowlist []string `yaml:"allowlist"`
|
||||
}
|
||||
|
||||
// RemoteConfig contains SSH connection details and remote runtime options.
|
||||
type RemoteConfig struct {
|
||||
Address string `yaml:"address"`
|
||||
Port int `yaml:"port"`
|
||||
User string `yaml:"user"`
|
||||
IdentityFile string `yaml:"identity_file"`
|
||||
KnownHostsFile string `yaml:"known_hosts_file"`
|
||||
AccountDir string `yaml:"account_dir"`
|
||||
ExportPath string `yaml:"export_path"`
|
||||
ImportPath string `yaml:"import_path"`
|
||||
Env map[string]string `yaml:"env"`
|
||||
Timeout Duration `yaml:"timeout"`
|
||||
RemoteEmail string `yaml:"email_keyword"`
|
||||
}
|
||||
|
||||
// Duration wraps time.Duration with YAML unmarshalling support.
|
||||
type Duration struct {
|
||||
time.Duration
|
||||
}
|
||||
|
||||
// UnmarshalYAML parses the duration from either a string (e.g. "30s") or an
|
||||
// integer representing seconds.
|
||||
func (d *Duration) UnmarshalYAML(node *yaml.Node) error {
|
||||
if node == nil {
|
||||
d.Duration = 0
|
||||
return nil
|
||||
}
|
||||
switch node.Kind {
|
||||
case yaml.ScalarNode:
|
||||
var text string
|
||||
if err := node.Decode(&text); err == nil {
|
||||
if text == "" {
|
||||
d.Duration = 0
|
||||
return nil
|
||||
}
|
||||
dur, err := time.ParseDuration(text)
|
||||
if err == nil {
|
||||
d.Duration = dur
|
||||
return nil
|
||||
}
|
||||
}
|
||||
var seconds int64
|
||||
if err := node.Decode(&seconds); err == nil {
|
||||
d.Duration = time.Duration(seconds) * time.Second
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("invalid duration %q", node.Value)
|
||||
default:
|
||||
return fmt.Errorf("unsupported YAML kind %d for duration", node.Kind)
|
||||
}
|
||||
}
|
||||
|
||||
// LoadConfig reads and validates the synchronisation configuration.
|
||||
func LoadConfig(path string) (*Config, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read config: %w", err)
|
||||
}
|
||||
|
||||
var cfg Config
|
||||
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||
return nil, fmt.Errorf("parse config: %w", err)
|
||||
}
|
||||
applyDefaults(&cfg)
|
||||
if err := validateConfig(&cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
func applyDefaults(cfg *Config) {
|
||||
if cfg.Local.ExportPath == "" {
|
||||
cfg.Local.ExportPath = "account-export.yaml"
|
||||
}
|
||||
if cfg.Remote.Port == 0 {
|
||||
cfg.Remote.Port = 22
|
||||
}
|
||||
if cfg.Remote.ExportPath == "" {
|
||||
cfg.Remote.ExportPath = "account-export.yaml"
|
||||
}
|
||||
if cfg.Remote.ImportPath == "" {
|
||||
cfg.Remote.ImportPath = cfg.Remote.ExportPath
|
||||
}
|
||||
if cfg.Remote.Timeout.Duration == 0 {
|
||||
cfg.Remote.Timeout.Duration = 30 * time.Second
|
||||
}
|
||||
}
|
||||
|
||||
func validateConfig(cfg *Config) error {
|
||||
if cfg.Local.DSN == "" {
|
||||
return fmt.Errorf("local.dsn must be configured")
|
||||
}
|
||||
if cfg.Remote.Address == "" {
|
||||
return fmt.Errorf("remote.address must be configured")
|
||||
}
|
||||
if cfg.Remote.User == "" {
|
||||
return fmt.Errorf("remote.user must be configured")
|
||||
}
|
||||
if cfg.Remote.AccountDir == "" {
|
||||
return fmt.Errorf("remote.account_dir must be configured")
|
||||
}
|
||||
if cfg.Remote.IdentityFile != "" {
|
||||
if _, err := os.Stat(cfg.Remote.IdentityFile); err != nil {
|
||||
return fmt.Errorf("remote.identity_file: %w", err)
|
||||
}
|
||||
abs, err := filepath.Abs(cfg.Remote.IdentityFile)
|
||||
if err == nil {
|
||||
cfg.Remote.IdentityFile = abs
|
||||
}
|
||||
}
|
||||
if cfg.Remote.KnownHostsFile != "" {
|
||||
if _, err := os.Stat(cfg.Remote.KnownHostsFile); err != nil {
|
||||
return fmt.Errorf("remote.known_hosts_file: %w", err)
|
||||
}
|
||||
abs, err := filepath.Abs(cfg.Remote.KnownHostsFile)
|
||||
if err == nil {
|
||||
cfg.Remote.KnownHostsFile = abs
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
377
account/internal/syncer/syncer.go
Normal file
377
account/internal/syncer/syncer.go
Normal file
@ -0,0 +1,377 @@
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/sftp"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"golang.org/x/crypto/ssh/knownhosts"
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"xcontrol/account/internal/migrate"
|
||||
)
|
||||
|
||||
// Syncer coordinates snapshot exports, transfers and imports between two
|
||||
// account service environments.
|
||||
type Syncer struct {
|
||||
cfg *Config
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// New constructs a Syncer using the provided configuration and logger. When
|
||||
// logger is nil the default log.Logger writing to stderr is used.
|
||||
func New(cfg *Config, logger *log.Logger) *Syncer {
|
||||
if logger == nil {
|
||||
logger = log.Default()
|
||||
}
|
||||
return &Syncer{cfg: cfg, logger: logger}
|
||||
}
|
||||
|
||||
// Push performs a one-way synchronisation from the local database to the remote
|
||||
// environment.
|
||||
func (s *Syncer) Push(ctx context.Context) error {
|
||||
s.logger.Println("⏳ exporting local account snapshot ...")
|
||||
dump, err := s.exportLocal(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
contents, err := encodeDump(dump)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := s.dialSSH(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
if err := s.uploadAndImport(ctx, client, contents); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Println("✅ push synchronisation finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pull performs a one-way synchronisation from the remote environment into the
|
||||
// local database.
|
||||
func (s *Syncer) Pull(ctx context.Context) error {
|
||||
client, err := s.dialSSH(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
s.logger.Println("⏳ requesting remote export ...")
|
||||
if err := s.remoteExport(ctx, client); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Println("⏳ downloading remote snapshot ...")
|
||||
data, err := s.download(ctx, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dump, err := decodeDump(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Println("⏳ importing snapshot into local database ...")
|
||||
if err := s.importLocal(ctx, dump); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Println("✅ pull synchronisation finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mirror executes both push and pull operations sequentially, ensuring both
|
||||
// environments converge to the most recent state.
|
||||
func (s *Syncer) Mirror(ctx context.Context) error {
|
||||
if err := s.Push(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.Pull(ctx)
|
||||
}
|
||||
|
||||
func (s *Syncer) exportLocal(ctx context.Context) (*migrate.AccountDump, error) {
|
||||
exporter := migrate.NewExporter()
|
||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||
defer cancel()
|
||||
dump, err := exporter.Export(ctx, s.cfg.Local.DSN, s.cfg.Local.EmailKeyword)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("local export: %w", err)
|
||||
}
|
||||
return dump, nil
|
||||
}
|
||||
|
||||
func encodeDump(dump *migrate.AccountDump) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
enc := yaml.NewEncoder(&buf)
|
||||
enc.SetIndent(2)
|
||||
if err := enc.Encode(dump); err != nil {
|
||||
enc.Close()
|
||||
return nil, fmt.Errorf("encode dump: %w", err)
|
||||
}
|
||||
if err := enc.Close(); err != nil {
|
||||
return nil, fmt.Errorf("finalise dump: %w", err)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func decodeDump(data []byte) (*migrate.AccountDump, error) {
|
||||
var dump migrate.AccountDump
|
||||
if err := yaml.Unmarshal(data, &dump); err != nil {
|
||||
return nil, fmt.Errorf("decode dump: %w", err)
|
||||
}
|
||||
return &dump, nil
|
||||
}
|
||||
|
||||
func (s *Syncer) dialSSH(ctx context.Context) (*ssh.Client, error) {
|
||||
signer, err := s.publicKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &ssh.ClientConfig{
|
||||
User: s.cfg.Remote.User,
|
||||
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
|
||||
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
||||
Timeout: s.cfg.Remote.Timeout.Duration,
|
||||
}
|
||||
if s.cfg.Remote.KnownHostsFile != "" {
|
||||
callback, err := knownhosts.New(s.cfg.Remote.KnownHostsFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load known hosts: %w", err)
|
||||
}
|
||||
cfg.HostKeyCallback = callback
|
||||
}
|
||||
|
||||
addr := net.JoinHostPort(s.cfg.Remote.Address, fmt.Sprintf("%d", s.cfg.Remote.Port))
|
||||
|
||||
type dialResult struct {
|
||||
client *ssh.Client
|
||||
err error
|
||||
}
|
||||
ch := make(chan dialResult, 1)
|
||||
go func() {
|
||||
client, err := ssh.Dial("tcp", addr, cfg)
|
||||
ch <- dialResult{client: client, err: err}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case res := <-ch:
|
||||
if res.err != nil {
|
||||
return nil, fmt.Errorf("ssh dial: %w", res.err)
|
||||
}
|
||||
return res.client, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) publicKey() (ssh.Signer, error) {
|
||||
keyPath := s.cfg.Remote.IdentityFile
|
||||
if keyPath == "" {
|
||||
return nil, fmt.Errorf("remote.identity_file must be configured to use public key auth")
|
||||
}
|
||||
key, err := os.ReadFile(keyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
signer, err := ssh.ParsePrivateKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return signer, nil
|
||||
}
|
||||
|
||||
func (s *Syncer) uploadAndImport(ctx context.Context, client *ssh.Client, contents []byte) error {
|
||||
remotePath := s.remoteImportPath()
|
||||
sftpClient, err := sftp.NewClient(client)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create sftp client: %w", err)
|
||||
}
|
||||
defer sftpClient.Close()
|
||||
|
||||
if err := sftpClient.MkdirAll(path.Dir(remotePath)); err != nil {
|
||||
return fmt.Errorf("create remote dir: %w", err)
|
||||
}
|
||||
|
||||
file, err := sftpClient.OpenFile(remotePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open remote file: %w", err)
|
||||
}
|
||||
if _, err := file.Write(contents); err != nil {
|
||||
file.Close()
|
||||
return fmt.Errorf("write remote file: %w", err)
|
||||
}
|
||||
if err := file.Chmod(0o600); err != nil {
|
||||
s.logger.Printf("⚠️ unable to chmod remote file: %v", err)
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return fmt.Errorf("close remote file: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Println("⏳ triggering remote import ...")
|
||||
if err := s.remoteImport(ctx, client); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) remoteExport(ctx context.Context, client *ssh.Client) error {
|
||||
session, err := client.NewSession()
|
||||
if err != nil {
|
||||
return fmt.Errorf("new ssh session: %w", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
if err := s.applyEnv(session); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.cfg.Remote.RemoteEmail != "" {
|
||||
if err := session.Setenv("ACCOUNT_EMAIL_KEYWORD", s.cfg.Remote.RemoteEmail); err != nil {
|
||||
return fmt.Errorf("set env: %w", err)
|
||||
}
|
||||
}
|
||||
if err := session.Setenv("ACCOUNT_EXPORT_FILE", s.remoteExportPath()); err != nil {
|
||||
return fmt.Errorf("set export env: %w", err)
|
||||
}
|
||||
|
||||
cmd := fmt.Sprintf("cd %s && make account-export", shellQuote(s.cfg.Remote.AccountDir))
|
||||
return s.runSession(ctx, session, cmd)
|
||||
}
|
||||
|
||||
func (s *Syncer) remoteImport(ctx context.Context, client *ssh.Client) error {
|
||||
session, err := client.NewSession()
|
||||
if err != nil {
|
||||
return fmt.Errorf("new ssh session: %w", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
if err := s.applyEnv(session); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := session.Setenv("ACCOUNT_IMPORT_FILE", s.remoteImportPath()); err != nil {
|
||||
return fmt.Errorf("set env: %w", err)
|
||||
}
|
||||
|
||||
cmd := fmt.Sprintf("cd %s && make account-import", shellQuote(s.cfg.Remote.AccountDir))
|
||||
return s.runSession(ctx, session, cmd)
|
||||
}
|
||||
|
||||
func (s *Syncer) applyEnv(session *ssh.Session) error {
|
||||
for key, value := range s.cfg.Remote.Env {
|
||||
if err := session.Setenv(key, value); err != nil {
|
||||
return fmt.Errorf("set env %s: %w", key, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) download(ctx context.Context, client *ssh.Client) ([]byte, error) {
|
||||
sftpClient, err := sftp.NewClient(client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create sftp client: %w", err)
|
||||
}
|
||||
defer sftpClient.Close()
|
||||
|
||||
file, err := sftpClient.Open(s.remoteExportPath())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open remote file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, file); err != nil {
|
||||
return nil, fmt.Errorf("read remote file: %w", err)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (s *Syncer) importLocal(ctx context.Context, dump *migrate.AccountDump) error {
|
||||
importer := migrate.NewImporter()
|
||||
opts := migrate.ImportOptions{
|
||||
Merge: s.cfg.Local.Import.Merge,
|
||||
MergeStrategy: migrate.MergeStrategy(s.cfg.Local.Import.MergeStrategy),
|
||||
DryRun: s.cfg.Local.Import.DryRun,
|
||||
}
|
||||
if len(s.cfg.Local.Import.Allowlist) > 0 {
|
||||
opts.Allowlist = make(map[string]struct{}, len(s.cfg.Local.Import.Allowlist))
|
||||
for _, uuid := range s.cfg.Local.Import.Allowlist {
|
||||
opts.Allowlist[uuid] = struct{}{}
|
||||
}
|
||||
}
|
||||
_, err := importer.Import(ctx, s.cfg.Local.DSN, dump, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("local import: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) remoteImportPath() string {
|
||||
return s.resolveRemotePath(s.cfg.Remote.ImportPath)
|
||||
}
|
||||
|
||||
func (s *Syncer) remoteExportPath() string {
|
||||
return s.resolveRemotePath(s.cfg.Remote.ExportPath)
|
||||
}
|
||||
|
||||
func (s *Syncer) resolveRemotePath(p string) string {
|
||||
remote := p
|
||||
if strings.HasPrefix(remote, "/") {
|
||||
return remote
|
||||
}
|
||||
return path.Join(s.cfg.Remote.AccountDir, remote)
|
||||
}
|
||||
|
||||
func (s *Syncer) runSession(ctx context.Context, session *ssh.Session, command string) error {
|
||||
var stdout, stderr bytes.Buffer
|
||||
session.Stdout = &stdout
|
||||
session.Stderr = &stderr
|
||||
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
ch <- session.Run(command)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
session.Signal(ssh.SIGKILL)
|
||||
return ctx.Err()
|
||||
case err := <-ch:
|
||||
if stdout.Len() > 0 {
|
||||
s.logger.Print(strings.TrimSpace(stdout.String()))
|
||||
}
|
||||
if stderr.Len() > 0 {
|
||||
s.logger.Print(strings.TrimSpace(stderr.String()))
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("remote command %q failed: %w", command, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// shellQuote returns a shell-escaped representation of value suitable for use
|
||||
// in remote commands executed via /bin/sh -c.
|
||||
func shellQuote(value string) string {
|
||||
if value == "" {
|
||||
return "''"
|
||||
}
|
||||
return "'" + strings.ReplaceAll(value, "'", "'\\''") + "'"
|
||||
}
|
||||
12
go.mod
12
go.mod
@ -12,12 +12,13 @@ require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/jackc/pgx/v5 v5.7.5
|
||||
github.com/pgvector/pgvector-go v0.3.0
|
||||
github.com/pkg/sftp v1.13.10
|
||||
github.com/pquerna/otp v1.5.0
|
||||
github.com/redis/go-redis/v9 v9.12.0
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/yuin/goldmark v1.7.13
|
||||
golang.org/x/crypto v0.37.0
|
||||
golang.org/x/net v0.39.0
|
||||
golang.org/x/crypto v0.41.0
|
||||
golang.org/x/net v0.42.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gorm.io/driver/postgres v1.5.4
|
||||
gorm.io/driver/sqlite v1.5.7
|
||||
@ -58,6 +59,7 @@ require (
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/kevinburke/ssh_config v1.2.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
||||
github.com/kr/fs v0.1.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.22 // indirect
|
||||
@ -72,9 +74,9 @@ require (
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
github.com/xanzy/ssh-agent v0.3.3 // indirect
|
||||
golang.org/x/arch v0.7.0 // indirect
|
||||
golang.org/x/sync v0.13.0 // indirect
|
||||
golang.org/x/sys v0.32.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/sync v0.16.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.28.0 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
gopkg.in/warnings.v0 v0.1.2 // indirect
|
||||
)
|
||||
|
||||
28
go.sum
28
go.sum
@ -116,6 +116,8 @@ github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
|
||||
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
|
||||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
@ -146,6 +148,8 @@ github.com/pjbgf/sha1cd v0.3.2 h1:a9wb0bp1oC2TGwStyn0Umc/IGKQnEgF0vVaZ8QF8eo4=
|
||||
github.com/pjbgf/sha1cd v0.3.2/go.mod h1:zQWigSxVmsHEZow5qaLtPYxpcKMMQpa09ixqBxuCS6A=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/sftp v1.13.10 h1:+5FbKNTe5Z9aspU88DPIKJ9z2KZoaGCu6Sr6kKR/5mU=
|
||||
github.com/pkg/sftp v1.13.10/go.mod h1:bJ1a7uDhrX/4OII+agvy28lzRvQrmIQuaHrcI1HbeGA=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pquerna/otp v1.5.0 h1:NMMR+WrmaqXU4EzdGJEE1aUUI0AMRzsp96fFFWNPwxs=
|
||||
@ -207,15 +211,15 @@ golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUu
|
||||
golang.org/x/arch v0.7.0 h1:pskyeJh/3AmoQ8CPE95vxHLqp1G1GfGNXTmcl9NEKTc=
|
||||
golang.org/x/arch v0.7.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
|
||||
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
|
||||
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
|
||||
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
|
||||
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
|
||||
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@ -224,14 +228,14 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
|
||||
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
|
||||
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
|
||||
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
|
||||
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
|
||||
Loading…
Reference in New Issue
Block a user