feat(account): add migratectl CLI and golang-migrate workflows (#460)
This commit is contained in:
parent
9ee3ee54b0
commit
0a8ff47c12
@ -3,7 +3,7 @@ MAIN_FILE := ./cmd/accountsvc/main.go
|
||||
PORT ?= 8080
|
||||
OS := $(shell uname -s)
|
||||
SCHEMA_FILE := ./sql/schema.sql
|
||||
MIGRATION_FILES := $(shell ls -1 sql/*.migrate.*.sql 2>/dev/null | sort)
|
||||
MIGRATION_FILES := $(shell ls -1 sql/migrations/*.up.sql 2>/dev/null | sort)
|
||||
|
||||
|
||||
DB_NAME := account
|
||||
@ -64,25 +64,14 @@ init-db:
|
||||
# migrate-db target
|
||||
# =========================================
|
||||
migrate-db:
|
||||
@echo ">>> 执行数据库 schema 迁移 (level/role/MFA/email verification)"
|
||||
@echo "--------------------------------------------"
|
||||
@echo "使用数据库连接: $(DB_URL)"
|
||||
@if ! command -v psql >/dev/null 2>&1; then \
|
||||
echo "未检测到 psql,请先安装 PostgreSQL 客户端"; \
|
||||
exit 1; \
|
||||
fi
|
||||
@if [ -z "$(MIGRATION_FILES)" ]; then \
|
||||
echo "未找到迁移 SQL 文件 (*.migrate.*.sql)"; \
|
||||
exit 0; \
|
||||
fi
|
||||
@for file in $(MIGRATION_FILES); do \
|
||||
echo "执行迁移文件: $$file"; \
|
||||
echo "--------------------------------------------"; \
|
||||
psql "$(DB_URL)" -v ON_ERROR_STOP=1 -f $$file || { echo "❌ $$file 执行失败"; exit 1; }; \
|
||||
echo "✅ $$file 执行完成"; \
|
||||
echo ""; \
|
||||
done
|
||||
@echo "✅ 所有迁移执行完成"
|
||||
@echo ">>> 执行数据库 schema 迁移 (golang-migrate)"
|
||||
@echo "--------------------------------------------"
|
||||
@echo "使用数据库连接: $(DB_URL)"
|
||||
@if ! command -v go >/dev/null 2>&1; then \
|
||||
echo "未检测到 go,请先安装 Go 环境"; \
|
||||
exit 1; \
|
||||
fi
|
||||
@go run ./cmd/migratectl/main.go migrate --dsn "$(DB_URL)" --dir account/sql/migrations
|
||||
|
||||
# 删除数据库(安全防呆)
|
||||
drop-db:
|
||||
|
||||
181
account/cmd/migratectl/main.go
Normal file
181
account/cmd/migratectl/main.go
Normal file
@ -0,0 +1,181 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"xcontrol/account/internal/migrate"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMigrationDir = "account/sql/migrations"
|
||||
defaultSchemaFile = "account/sql/schema.sql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
rootCmd := newRootCmd()
|
||||
if err := rootCmd.ExecuteContext(ctx); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func newRootCmd() *cobra.Command {
|
||||
var migrationDir string
|
||||
cmd := &cobra.Command{
|
||||
Use: "migratectl",
|
||||
Short: "XControl database migration orchestrator",
|
||||
}
|
||||
|
||||
migrationDir = defaultMigrationDir
|
||||
cmd.PersistentFlags().StringVar(&migrationDir, "dir", migrationDir, "directory containing migration files")
|
||||
|
||||
cmd.AddCommand(newMigrateCmd(&migrationDir))
|
||||
cmd.AddCommand(newCleanCmd())
|
||||
cmd.AddCommand(newCheckCmd())
|
||||
cmd.AddCommand(newVerifyCmd())
|
||||
cmd.AddCommand(newResetCmd(&migrationDir))
|
||||
cmd.AddCommand(newVersionCmd(&migrationDir))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newMigrateCmd(dir *string) *cobra.Command {
|
||||
var dsn string
|
||||
cmd := &cobra.Command{
|
||||
Use: "migrate",
|
||||
Short: "Apply database migrations",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
runner := migrate.NewRunner(*dir)
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
|
||||
defer cancel()
|
||||
return runner.Up(ctx, dsn)
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newCleanCmd() *cobra.Command {
|
||||
var (
|
||||
dsn string
|
||||
force bool
|
||||
)
|
||||
cmd := &cobra.Command{
|
||||
Use: "clean",
|
||||
Short: "Clean leftover database structures",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
cleaner := migrate.NewCleaner()
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
|
||||
defer cancel()
|
||||
return cleaner.Clean(ctx, dsn, force)
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
|
||||
cmd.Flags().BoolVar(&force, "force", false, "Confirm clean-up actions")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newCheckCmd() *cobra.Command {
|
||||
var (
|
||||
cnDSN string
|
||||
globalDSN string
|
||||
autoFix bool
|
||||
)
|
||||
cmd := &cobra.Command{
|
||||
Use: "check",
|
||||
Short: "Compare CN and Global schemas",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
checker := migrate.NewChecker()
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Minute)
|
||||
defer cancel()
|
||||
return checker.Check(ctx, cnDSN, globalDSN, autoFix)
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&cnDSN, "cn", "", "CN region PostgreSQL DSN")
|
||||
cmd.Flags().StringVar(&globalDSN, "global", "", "Global region PostgreSQL DSN")
|
||||
cmd.Flags().BoolVar(&autoFix, "auto-fix", false, "Automatically apply missing statements to CN")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newVerifyCmd() *cobra.Command {
|
||||
var (
|
||||
dsn string
|
||||
schemaPath string
|
||||
)
|
||||
cmd := &cobra.Command{
|
||||
Use: "verify",
|
||||
Short: "Verify that the database matches schema.sql",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
if schemaPath == "" {
|
||||
schemaPath = defaultSchemaFile
|
||||
}
|
||||
verifier := migrate.NewVerifier()
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
|
||||
defer cancel()
|
||||
return verifier.Verify(ctx, dsn, schemaPath)
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
|
||||
cmd.Flags().StringVar(&schemaPath, "schema", defaultSchemaFile, "Path to schema.sql reference file")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newResetCmd(dir *string) *cobra.Command {
|
||||
var dsn string
|
||||
cmd := &cobra.Command{
|
||||
Use: "reset",
|
||||
Short: "Drop public schema and re-run migrations",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
runner := migrate.NewRunner(*dir)
|
||||
ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Minute)
|
||||
defer cancel()
|
||||
return runner.Reset(ctx, dsn)
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newVersionCmd(dir *string) *cobra.Command {
|
||||
var dsn string
|
||||
cmd := &cobra.Command{
|
||||
Use: "version",
|
||||
Short: "Show current migration version",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
runner := migrate.NewRunner(*dir)
|
||||
version, dirty, err := runner.Version(dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dirty {
|
||||
fmt.Printf("Current migration version: %d (dirty)\n", version)
|
||||
} else {
|
||||
fmt.Printf("Current migration version: %d\n", version)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
|
||||
return cmd
|
||||
}
|
||||
102
account/internal/migrate/checker.go
Normal file
102
account/internal/migrate/checker.go
Normal file
@ -0,0 +1,102 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"xcontrol/account/internal/utils"
|
||||
)
|
||||
|
||||
// Checker compares two PostgreSQL schemas (CN vs Global) and optionally fixes
|
||||
// missing structures on the CN side.
|
||||
type Checker struct{}
|
||||
|
||||
func NewChecker() *Checker {
|
||||
return &Checker{}
|
||||
}
|
||||
|
||||
func (c *Checker) Check(ctx context.Context, cnDSN, globalDSN string, autoFix bool) error {
|
||||
if cnDSN == "" || globalDSN == "" {
|
||||
return errors.New("both --cn and --global DSNs are required")
|
||||
}
|
||||
|
||||
cnDump, err := utils.RunPgDump(ctx, cnDSN)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dump CN schema: %w", err)
|
||||
}
|
||||
globalDump, err := utils.RunPgDump(ctx, globalDSN)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dump Global schema: %w", err)
|
||||
}
|
||||
|
||||
cnStatements := utils.NormalizeStatements(cnDump)
|
||||
globalStatements := utils.NormalizeStatements(globalDump)
|
||||
|
||||
onlyCN, onlyGlobal := utils.CompareStatements(cnStatements, globalStatements)
|
||||
|
||||
if len(onlyCN) == 0 && len(onlyGlobal) == 0 {
|
||||
fmt.Println("✅ CN and Global schemas are consistent")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(onlyGlobal) > 0 {
|
||||
fmt.Println("⚠️ Statements missing on CN (present on Global):")
|
||||
for _, stmt := range onlyGlobal {
|
||||
fmt.Printf(" + %s;\n", stmt)
|
||||
}
|
||||
}
|
||||
|
||||
if len(onlyCN) > 0 {
|
||||
fmt.Println("⚠️ Statements only found on CN:")
|
||||
for _, stmt := range onlyCN {
|
||||
fmt.Printf(" - %s;\n", stmt)
|
||||
}
|
||||
}
|
||||
|
||||
if autoFix && len(onlyGlobal) > 0 {
|
||||
if err := applyStatements(ctx, cnDSN, onlyGlobal); err != nil {
|
||||
return fmt.Errorf("apply auto-fix: %w", err)
|
||||
}
|
||||
fmt.Println("✅ Auto-fix applied on CN database — please re-run check to confirm")
|
||||
// After auto-fix we still return an error if CN has extra statements.
|
||||
if len(onlyCN) > 0 {
|
||||
return errors.New("schema differences remain on CN after auto-fix")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("schema differences detected")
|
||||
}
|
||||
|
||||
func applyStatements(ctx context.Context, dsn string, statements []string) error {
|
||||
db, err := openDB(ctx, dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
for _, stmt := range statements {
|
||||
if strings.Contains(stmt, "pglogical") {
|
||||
continue
|
||||
}
|
||||
sql := stmt
|
||||
if !strings.HasSuffix(sql, ";") {
|
||||
sql += ";"
|
||||
}
|
||||
fmt.Printf("→ Applying fix: %s\n", sql)
|
||||
if _, err := tx.ExecContext(ctx, sql); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("✅ Applied fix\n")
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
138
account/internal/migrate/cleaner.go
Normal file
138
account/internal/migrate/cleaner.go
Normal file
@ -0,0 +1,138 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Cleaner removes leftover structures such as invalid indexes or disabled
|
||||
// triggers while keeping pglogical untouched.
|
||||
type Cleaner struct{}
|
||||
|
||||
func NewCleaner() *Cleaner {
|
||||
return &Cleaner{}
|
||||
}
|
||||
|
||||
func (c *Cleaner) Clean(ctx context.Context, dsn string, force bool) error {
|
||||
if !force {
|
||||
return errors.New("clean requires --force confirmation")
|
||||
}
|
||||
|
||||
db, err := openDB(ctx, dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
if err := dropInvalidIndexes(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropDisabledTriggers(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dropTemporaryTables(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("✅ Database clean-up completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
func dropInvalidIndexes(ctx context.Context, tx *sql.Tx) error {
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname)
|
||||
FROM pg_index i
|
||||
JOIN pg_class c ON c.oid = i.indexrelid
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname NOT IN ('pglogical', 'pg_catalog', 'information_schema')
|
||||
AND (NOT i.indisvalid OR NOT i.indisready)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var identifier string
|
||||
if err := rows.Scan(&identifier); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("→ Dropping invalid index %s\n", identifier)
|
||||
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", identifier)); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("✅ Dropped index %s\n", identifier)
|
||||
}
|
||||
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func dropDisabledTriggers(ctx context.Context, tx *sql.Tx) error {
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
SELECT quote_ident(n.nspname) || '.' || quote_ident(rel.relname) AS tbl,
|
||||
quote_ident(t.tgname) AS trigger_name
|
||||
FROM pg_trigger t
|
||||
JOIN pg_class rel ON rel.oid = t.tgrelid
|
||||
JOIN pg_namespace n ON n.oid = rel.relnamespace
|
||||
WHERE t.tgenabled = 'D'
|
||||
AND t.tgisinternal = false
|
||||
AND n.nspname NOT IN ('pglogical', 'pg_catalog', 'information_schema')
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var table, trigger string
|
||||
if err := rows.Scan(&table, &trigger); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("→ Dropping disabled trigger %s on %s\n", trigger, table)
|
||||
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP TRIGGER IF EXISTS %s ON %s", trigger, table)); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("✅ Dropped trigger %s on %s\n", trigger, table)
|
||||
}
|
||||
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func dropTemporaryTables(ctx context.Context, tx *sql.Tx) error {
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
SELECT quote_ident(table_schema) || '.' || quote_ident(table_name)
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema NOT IN ('pglogical', 'pg_catalog', 'information_schema')
|
||||
AND (table_name LIKE 'tmp_%' OR table_name LIKE 'temp_%' OR table_name LIKE 'backup_%')
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var table string
|
||||
if err := rows.Scan(&table); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("→ Dropping temporary table %s\n", table)
|
||||
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", table)); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("✅ Dropped table %s\n", table)
|
||||
}
|
||||
|
||||
return rows.Err()
|
||||
}
|
||||
28
account/internal/migrate/db.go
Normal file
28
account/internal/migrate/db.go
Normal file
@ -0,0 +1,28 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
_ "github.com/jackc/pgx/v5/stdlib"
|
||||
)
|
||||
|
||||
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
|
||||
db, err := sql.Open("pgx", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db.SetConnMaxLifetime(0)
|
||||
db.SetConnMaxIdleTime(5 * time.Minute)
|
||||
db.SetMaxIdleConns(5)
|
||||
db.SetMaxOpenConns(10)
|
||||
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
183
account/internal/migrate/runner.go
Normal file
183
account/internal/migrate/runner.go
Normal file
@ -0,0 +1,183 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
)
|
||||
|
||||
const defaultDir = "account/sql/migrations"
|
||||
|
||||
// Runner coordinates golang-migrate operations.
|
||||
type Runner struct {
|
||||
Dir string
|
||||
}
|
||||
|
||||
// NewRunner creates a runner that reads migration files from dir. When dir is
|
||||
// empty, the default directory under account/sql/migrations is used.
|
||||
func NewRunner(dir string) *Runner {
|
||||
if dir == "" {
|
||||
dir = defaultDir
|
||||
}
|
||||
return &Runner{Dir: dir}
|
||||
}
|
||||
|
||||
// Up executes all migrations that have not been applied yet. Each step logs
|
||||
// its outcome to provide clear visibility.
|
||||
func (r *Runner) Up(ctx context.Context, dsn string) error {
|
||||
absDir, err := filepath.Abs(r.Dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrations, err := r.loadMigrations(absDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := migrate.New(fmt.Sprintf("file://%s", absDir), dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closeMigrator(m)
|
||||
|
||||
currentVersion, dirty, err := m.Version()
|
||||
if err != nil {
|
||||
if errors.Is(err, migrate.ErrNilVersion) {
|
||||
currentVersion = 0
|
||||
} else {
|
||||
return fmt.Errorf("fetch current version: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if dirty {
|
||||
return fmt.Errorf("database is in a dirty state at version %d; please fix manually", currentVersion)
|
||||
}
|
||||
|
||||
applied := false
|
||||
for _, migration := range migrations {
|
||||
if migration.version <= currentVersion {
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("→ Applying migration %s ...\n", migration.name)
|
||||
if err := m.Migrate(migration.version); err != nil {
|
||||
if errors.Is(err, migrate.ErrNoChange) {
|
||||
fmt.Printf("✅ Migration %s already applied\n", migration.name)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("apply migration %s: %w", migration.name, err)
|
||||
}
|
||||
applied = true
|
||||
fmt.Printf("✅ Migration %s applied\n", migration.name)
|
||||
}
|
||||
|
||||
if !applied {
|
||||
fmt.Println("✅ Database schema already up-to-date")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Version reports the current schema version tracked by golang-migrate.
|
||||
func (r *Runner) Version(dsn string) (uint, bool, error) {
|
||||
absDir, err := filepath.Abs(r.Dir)
|
||||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
|
||||
m, err := migrate.New(fmt.Sprintf("file://%s", absDir), dsn)
|
||||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
defer closeMigrator(m)
|
||||
|
||||
version, dirty, err := m.Version()
|
||||
if err != nil {
|
||||
if errors.Is(err, migrate.ErrNilVersion) {
|
||||
return 0, false, nil
|
||||
}
|
||||
return 0, false, err
|
||||
}
|
||||
|
||||
return version, dirty, nil
|
||||
}
|
||||
|
||||
// Reset drops the public schema before replaying all migrations.
|
||||
func (r *Runner) Reset(ctx context.Context, dsn string) error {
|
||||
db, err := openDB(ctx, dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
fmt.Println("⚠️ Dropping public schema (preserving pglogical)...")
|
||||
if _, err := db.ExecContext(ctx, "DROP SCHEMA IF EXISTS public CASCADE"); err != nil {
|
||||
return fmt.Errorf("drop public schema: %w", err)
|
||||
}
|
||||
if _, err := db.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS public"); err != nil {
|
||||
return fmt.Errorf("recreate public schema: %w", err)
|
||||
}
|
||||
|
||||
return r.Up(ctx, dsn)
|
||||
}
|
||||
|
||||
func (r *Runner) loadMigrations(absDir string) ([]*migrationFile, error) {
|
||||
entries, err := os.ReadDir(absDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
migrationMap := make(map[uint]*migrationFile)
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
if !strings.HasSuffix(name, ".up.sql") {
|
||||
continue
|
||||
}
|
||||
parts := strings.SplitN(name, "_", 2)
|
||||
if len(parts) != 2 {
|
||||
continue
|
||||
}
|
||||
version, err := strconv.ParseUint(parts[0], 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
migrationMap[uint(version)] = &migrationFile{
|
||||
version: uint(version),
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
var migrations []*migrationFile
|
||||
for _, m := range migrationMap {
|
||||
migrations = append(migrations, m)
|
||||
}
|
||||
|
||||
sort.Slice(migrations, func(i, j int) bool {
|
||||
return migrations[i].version < migrations[j].version
|
||||
})
|
||||
|
||||
return migrations, nil
|
||||
}
|
||||
|
||||
func closeMigrator(m *migrate.Migrate) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
_, _ = m.Close()
|
||||
}
|
||||
|
||||
type migrationFile struct {
|
||||
version uint
|
||||
name string
|
||||
}
|
||||
64
account/internal/migrate/verifier.go
Normal file
64
account/internal/migrate/verifier.go
Normal file
@ -0,0 +1,64 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"xcontrol/account/internal/utils"
|
||||
)
|
||||
|
||||
const defaultSchemaPath = "account/sql/schema.sql"
|
||||
|
||||
// Verifier validates that the live database matches the canonical schema.sql.
|
||||
type Verifier struct{}
|
||||
|
||||
func NewVerifier() *Verifier {
|
||||
return &Verifier{}
|
||||
}
|
||||
|
||||
func (v *Verifier) Verify(ctx context.Context, dsn, schemaPath string) error {
|
||||
if dsn == "" {
|
||||
return errors.New("--dsn is required")
|
||||
}
|
||||
if schemaPath == "" {
|
||||
schemaPath = defaultSchemaPath
|
||||
}
|
||||
|
||||
dump, err := utils.RunPgDump(ctx, dsn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dump database schema: %w", err)
|
||||
}
|
||||
|
||||
schemaBytes, err := os.ReadFile(schemaPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read schema file: %w", err)
|
||||
}
|
||||
|
||||
dbStatements := utils.NormalizeStatements(dump)
|
||||
fileStatements := utils.NormalizeStatements(string(schemaBytes))
|
||||
|
||||
onlyDB, onlyFile := utils.CompareStatements(dbStatements, fileStatements)
|
||||
|
||||
if len(onlyDB) == 0 && len(onlyFile) == 0 {
|
||||
fmt.Println("✅ Database schema matches schema.sql")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(onlyFile) > 0 {
|
||||
fmt.Println("⚠️ Statements missing from database (present in schema.sql):")
|
||||
for _, stmt := range onlyFile {
|
||||
fmt.Printf(" + %s;\n", stmt)
|
||||
}
|
||||
}
|
||||
|
||||
if len(onlyDB) > 0 {
|
||||
fmt.Println("⚠️ Extra statements found in database:")
|
||||
for _, stmt := range onlyDB {
|
||||
fmt.Printf(" - %s;\n", stmt)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("schema mismatch detected")
|
||||
}
|
||||
132
account/internal/utils/diff.go
Normal file
132
account/internal/utils/diff.go
Normal file
@ -0,0 +1,132 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// NormalizeStatements extracts relevant DDL statements from the provided dump.
|
||||
// Comments, empty lines and helper SET statements are ignored. Whitespace is
|
||||
// normalised to make comparisons deterministic across environments.
|
||||
func NormalizeStatements(dump string) []string {
|
||||
var statements []string
|
||||
var builder strings.Builder
|
||||
|
||||
flush := func() {
|
||||
stmt := strings.TrimSpace(builder.String())
|
||||
if stmt == "" {
|
||||
builder.Reset()
|
||||
return
|
||||
}
|
||||
stmt = strings.TrimSuffix(stmt, ";")
|
||||
stmt = strings.TrimSpace(stmt)
|
||||
if isRelevantStatement(stmt) {
|
||||
normalized := collapseWhitespace(stmt)
|
||||
statements = append(statements, normalized)
|
||||
}
|
||||
builder.Reset()
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(strings.NewReader(dump))
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" || strings.HasPrefix(line, "--") {
|
||||
continue
|
||||
}
|
||||
if shouldSkipLine(line) {
|
||||
continue
|
||||
}
|
||||
|
||||
builder.WriteString(line)
|
||||
if strings.HasSuffix(line, ";") {
|
||||
flush()
|
||||
} else {
|
||||
builder.WriteString(" ")
|
||||
}
|
||||
}
|
||||
|
||||
flush()
|
||||
|
||||
sort.Strings(statements)
|
||||
return statements
|
||||
}
|
||||
|
||||
// CompareStatements returns the statements that are present only on the left
|
||||
// or only on the right collection.
|
||||
func CompareStatements(left, right []string) (onlyLeft, onlyRight []string) {
|
||||
leftSet := make(map[string]struct{}, len(left))
|
||||
for _, stmt := range left {
|
||||
if strings.Contains(stmt, "pglogical") {
|
||||
continue
|
||||
}
|
||||
leftSet[stmt] = struct{}{}
|
||||
}
|
||||
|
||||
rightSet := make(map[string]struct{}, len(right))
|
||||
for _, stmt := range right {
|
||||
if strings.Contains(stmt, "pglogical") {
|
||||
continue
|
||||
}
|
||||
rightSet[stmt] = struct{}{}
|
||||
}
|
||||
|
||||
for stmt := range leftSet {
|
||||
if _, ok := rightSet[stmt]; !ok {
|
||||
onlyLeft = append(onlyLeft, stmt)
|
||||
}
|
||||
}
|
||||
|
||||
for stmt := range rightSet {
|
||||
if _, ok := leftSet[stmt]; !ok {
|
||||
onlyRight = append(onlyRight, stmt)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(onlyLeft)
|
||||
sort.Strings(onlyRight)
|
||||
return
|
||||
}
|
||||
|
||||
func shouldSkipLine(line string) bool {
|
||||
lower := strings.ToLower(line)
|
||||
switch {
|
||||
case strings.HasPrefix(lower, "set "):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "select pg_catalog.set_config"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "reset "):
|
||||
return true
|
||||
case strings.HasPrefix(line, "\\connect "):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "lock table"):
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isRelevantStatement(stmt string) bool {
|
||||
lower := strings.ToLower(stmt)
|
||||
switch {
|
||||
case strings.HasPrefix(lower, "create table"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "alter table"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "create index"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "alter index"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "comment on table"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "comment on column"):
|
||||
return true
|
||||
case strings.HasPrefix(lower, "grant "):
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func collapseWhitespace(input string) string {
|
||||
fields := strings.Fields(input)
|
||||
return strings.Join(fields, " ")
|
||||
}
|
||||
56
account/internal/utils/exec.go
Normal file
56
account/internal/utils/exec.go
Normal file
@ -0,0 +1,56 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// CommandResult captures stdout and stderr from an executed command.
|
||||
type CommandResult struct {
|
||||
Stdout string
|
||||
Stderr string
|
||||
}
|
||||
|
||||
// RunCommand executes the provided command with context awareness and
|
||||
// returns the collected stdout/stderr output. Errors include contextual
|
||||
// information as well as stderr to make troubleshooting easier.
|
||||
func RunCommand(ctx context.Context, name string, args ...string) (*CommandResult, error) {
|
||||
cmd := exec.CommandContext(ctx, name, args...)
|
||||
cmd.Env = os.Environ()
|
||||
|
||||
var stdoutBuf, stderrBuf bytes.Buffer
|
||||
cmd.Stdout = &stdoutBuf
|
||||
cmd.Stderr = &stderrBuf
|
||||
|
||||
err := cmd.Run()
|
||||
result := &CommandResult{Stdout: stdoutBuf.String(), Stderr: stderrBuf.String()}
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("command %s %s failed: %w\n%s", name, strings.Join(args, " "), err, result.Stderr)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// RunPgDump executes pg_dump with the flags required for schema comparison
|
||||
// and returns the textual dump. The pglogical schema is excluded to avoid
|
||||
// touching logical replication internals.
|
||||
func RunPgDump(ctx context.Context, dsn string) (string, error) {
|
||||
args := []string{
|
||||
"--schema-only",
|
||||
"--no-owner",
|
||||
"--no-privileges",
|
||||
"--exclude-schema=pglogical",
|
||||
"--dbname", dsn,
|
||||
}
|
||||
|
||||
result, err := RunCommand(ctx, "pg_dump", args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return result.Stdout, nil
|
||||
}
|
||||
@ -0,0 +1 @@
|
||||
-- No-op down migration; manual rollback required.
|
||||
1
account/sql/migrations/202510070000_to_rbac.down.sql
Normal file
1
account/sql/migrations/202510070000_to_rbac.down.sql
Normal file
@ -0,0 +1 @@
|
||||
-- No-op down migration; manual rollback required.
|
||||
@ -0,0 +1 @@
|
||||
-- No-op down migration; manual rollback required.
|
||||
@ -0,0 +1 @@
|
||||
-- No-op down migration; manual rollback required.
|
||||
@ -1,3 +1,12 @@
|
||||
使用新的 `migratectl` CLI 可以在不同环境下快速执行迁移、校验和重置操作:
|
||||
|
||||
```bash
|
||||
go run ./cmd/migratectl/main.go migrate --dsn "$DB_URL"
|
||||
go run ./cmd/migratectl/main.go check --cn "$CN_DSN" --global "$GLOBAL_DSN"
|
||||
```
|
||||
|
||||
以下命令展示了如何授予 pglogical schema 访问权限:
|
||||
|
||||
sudo -u postgres psql -d account -c "GRANT USAGE ON SCHEMA pglogical TO PUBLIC;"
|
||||
|
||||
-- 登录 postgres
|
||||
|
||||
5
go.mod
5
go.mod
@ -8,6 +8,7 @@ require (
|
||||
github.com/gin-contrib/cors v1.6.0
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/go-git/go-git/v5 v5.16.2
|
||||
github.com/golang-migrate/migrate/v4 v4.19.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/jackc/pgx/v5 v5.7.5
|
||||
github.com/pgvector/pgvector-go v0.3.0
|
||||
@ -45,6 +46,8 @@ require (
|
||||
github.com/go-playground/validator/v10 v10.19.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
@ -72,6 +75,6 @@ require (
|
||||
golang.org/x/sync v0.13.0 // indirect
|
||||
golang.org/x/sys v0.32.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
gopkg.in/warnings.v0 v0.1.2 // indirect
|
||||
)
|
||||
|
||||
11
go.sum
11
go.sum
@ -76,6 +76,8 @@ github.com/go-playground/validator/v10 v10.19.0 h1:ol+5Fu+cSq9JD7SoSqe04GMI92cbn
|
||||
github.com/go-playground/validator/v10 v10.19.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/golang-migrate/migrate/v4 v4.19.0 h1:RcjOnCGz3Or6HQYEJ/EEVLfWnmw9KnoigPSjzhCuaSE=
|
||||
github.com/golang-migrate/migrate/v4 v4.19.0/go.mod h1:9dyEcu+hO+G9hPSw8AIg50yg622pXJsoHItQnDGZkI0=
|
||||
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
|
||||
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
@ -83,6 +85,11 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
|
||||
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
|
||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
@ -226,8 +233,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
|
||||
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
|
||||
Loading…
Reference in New Issue
Block a user