feat(account): add migratectl CLI and golang-migrate workflows (#460)

This commit is contained in:
shenlan 2025-10-08 21:56:50 +08:00 committed by GitHub
parent 2c7609d1d1
commit 08cc576d8e
20 changed files with 919 additions and 23 deletions

View File

@ -3,7 +3,7 @@ MAIN_FILE := ./cmd/accountsvc/main.go
PORT ?= 8080
OS := $(shell uname -s)
SCHEMA_FILE := ./sql/schema.sql
MIGRATION_FILES := $(shell ls -1 sql/*.migrate.*.sql 2>/dev/null | sort)
MIGRATION_FILES := $(shell ls -1 sql/migrations/*.up.sql 2>/dev/null | sort)
DB_NAME := account
@ -64,25 +64,14 @@ init-db:
# migrate-db target
# =========================================
migrate-db:
@echo ">>> 执行数据库 schema 迁移 (level/role/MFA/email verification)"
@echo "--------------------------------------------"
@echo "使用数据库连接: $(DB_URL)"
@if ! command -v psql >/dev/null 2>&1; then \
echo "未检测到 psql请先安装 PostgreSQL 客户端"; \
exit 1; \
fi
@if [ -z "$(MIGRATION_FILES)" ]; then \
echo "未找到迁移 SQL 文件 (*.migrate.*.sql)"; \
exit 0; \
fi
@for file in $(MIGRATION_FILES); do \
echo "执行迁移文件: $$file"; \
echo "--------------------------------------------"; \
psql "$(DB_URL)" -v ON_ERROR_STOP=1 -f $$file || { echo "$$file 执行失败"; exit 1; }; \
echo "$$file 执行完成"; \
echo ""; \
done
@echo "✅ 所有迁移执行完成"
@echo ">>> 执行数据库 schema 迁移 (golang-migrate)"
@echo "--------------------------------------------"
@echo "使用数据库连接: $(DB_URL)"
@if ! command -v go >/dev/null 2>&1; then \
echo "未检测到 go请先安装 Go 环境"; \
exit 1; \
fi
@go run ./cmd/migratectl/main.go migrate --dsn "$(DB_URL)" --dir account/sql/migrations
# 删除数据库(安全防呆)
drop-db:

View File

@ -0,0 +1,181 @@
package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"xcontrol/account/internal/migrate"
)
const (
defaultMigrationDir = "account/sql/migrations"
defaultSchemaFile = "account/sql/schema.sql"
)
func main() {
ctx := context.Background()
rootCmd := newRootCmd()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func newRootCmd() *cobra.Command {
var migrationDir string
cmd := &cobra.Command{
Use: "migratectl",
Short: "XControl database migration orchestrator",
}
migrationDir = defaultMigrationDir
cmd.PersistentFlags().StringVar(&migrationDir, "dir", migrationDir, "directory containing migration files")
cmd.AddCommand(newMigrateCmd(&migrationDir))
cmd.AddCommand(newCleanCmd())
cmd.AddCommand(newCheckCmd())
cmd.AddCommand(newVerifyCmd())
cmd.AddCommand(newResetCmd(&migrationDir))
cmd.AddCommand(newVersionCmd(&migrationDir))
return cmd
}
func newMigrateCmd(dir *string) *cobra.Command {
var dsn string
cmd := &cobra.Command{
Use: "migrate",
Short: "Apply database migrations",
RunE: func(cmd *cobra.Command, args []string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
runner := migrate.NewRunner(*dir)
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
defer cancel()
return runner.Up(ctx, dsn)
},
}
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
return cmd
}
func newCleanCmd() *cobra.Command {
var (
dsn string
force bool
)
cmd := &cobra.Command{
Use: "clean",
Short: "Clean leftover database structures",
RunE: func(cmd *cobra.Command, args []string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
cleaner := migrate.NewCleaner()
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
defer cancel()
return cleaner.Clean(ctx, dsn, force)
},
}
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
cmd.Flags().BoolVar(&force, "force", false, "Confirm clean-up actions")
return cmd
}
func newCheckCmd() *cobra.Command {
var (
cnDSN string
globalDSN string
autoFix bool
)
cmd := &cobra.Command{
Use: "check",
Short: "Compare CN and Global schemas",
RunE: func(cmd *cobra.Command, args []string) error {
checker := migrate.NewChecker()
ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Minute)
defer cancel()
return checker.Check(ctx, cnDSN, globalDSN, autoFix)
},
}
cmd.Flags().StringVar(&cnDSN, "cn", "", "CN region PostgreSQL DSN")
cmd.Flags().StringVar(&globalDSN, "global", "", "Global region PostgreSQL DSN")
cmd.Flags().BoolVar(&autoFix, "auto-fix", false, "Automatically apply missing statements to CN")
return cmd
}
func newVerifyCmd() *cobra.Command {
var (
dsn string
schemaPath string
)
cmd := &cobra.Command{
Use: "verify",
Short: "Verify that the database matches schema.sql",
RunE: func(cmd *cobra.Command, args []string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
if schemaPath == "" {
schemaPath = defaultSchemaFile
}
verifier := migrate.NewVerifier()
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
defer cancel()
return verifier.Verify(ctx, dsn, schemaPath)
},
}
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
cmd.Flags().StringVar(&schemaPath, "schema", defaultSchemaFile, "Path to schema.sql reference file")
return cmd
}
func newResetCmd(dir *string) *cobra.Command {
var dsn string
cmd := &cobra.Command{
Use: "reset",
Short: "Drop public schema and re-run migrations",
RunE: func(cmd *cobra.Command, args []string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
runner := migrate.NewRunner(*dir)
ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Minute)
defer cancel()
return runner.Reset(ctx, dsn)
},
}
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
return cmd
}
func newVersionCmd(dir *string) *cobra.Command {
var dsn string
cmd := &cobra.Command{
Use: "version",
Short: "Show current migration version",
RunE: func(cmd *cobra.Command, args []string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
runner := migrate.NewRunner(*dir)
version, dirty, err := runner.Version(dsn)
if err != nil {
return err
}
if dirty {
fmt.Printf("Current migration version: %d (dirty)\n", version)
} else {
fmt.Printf("Current migration version: %d\n", version)
}
return nil
},
}
cmd.Flags().StringVar(&dsn, "dsn", "", "PostgreSQL connection string")
return cmd
}

View File

@ -0,0 +1,102 @@
package migrate
import (
"context"
"errors"
"fmt"
"strings"
"xcontrol/account/internal/utils"
)
// Checker compares two PostgreSQL schemas (CN vs Global) and optionally fixes
// missing structures on the CN side.
type Checker struct{}
func NewChecker() *Checker {
return &Checker{}
}
func (c *Checker) Check(ctx context.Context, cnDSN, globalDSN string, autoFix bool) error {
if cnDSN == "" || globalDSN == "" {
return errors.New("both --cn and --global DSNs are required")
}
cnDump, err := utils.RunPgDump(ctx, cnDSN)
if err != nil {
return fmt.Errorf("dump CN schema: %w", err)
}
globalDump, err := utils.RunPgDump(ctx, globalDSN)
if err != nil {
return fmt.Errorf("dump Global schema: %w", err)
}
cnStatements := utils.NormalizeStatements(cnDump)
globalStatements := utils.NormalizeStatements(globalDump)
onlyCN, onlyGlobal := utils.CompareStatements(cnStatements, globalStatements)
if len(onlyCN) == 0 && len(onlyGlobal) == 0 {
fmt.Println("✅ CN and Global schemas are consistent")
return nil
}
if len(onlyGlobal) > 0 {
fmt.Println("⚠️ Statements missing on CN (present on Global):")
for _, stmt := range onlyGlobal {
fmt.Printf(" + %s;\n", stmt)
}
}
if len(onlyCN) > 0 {
fmt.Println("⚠️ Statements only found on CN:")
for _, stmt := range onlyCN {
fmt.Printf(" - %s;\n", stmt)
}
}
if autoFix && len(onlyGlobal) > 0 {
if err := applyStatements(ctx, cnDSN, onlyGlobal); err != nil {
return fmt.Errorf("apply auto-fix: %w", err)
}
fmt.Println("✅ Auto-fix applied on CN database — please re-run check to confirm")
// After auto-fix we still return an error if CN has extra statements.
if len(onlyCN) > 0 {
return errors.New("schema differences remain on CN after auto-fix")
}
return nil
}
return errors.New("schema differences detected")
}
func applyStatements(ctx context.Context, dsn string, statements []string) error {
db, err := openDB(ctx, dsn)
if err != nil {
return err
}
defer db.Close()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
for _, stmt := range statements {
if strings.Contains(stmt, "pglogical") {
continue
}
sql := stmt
if !strings.HasSuffix(sql, ";") {
sql += ";"
}
fmt.Printf("→ Applying fix: %s\n", sql)
if _, err := tx.ExecContext(ctx, sql); err != nil {
return err
}
fmt.Printf("✅ Applied fix\n")
}
return tx.Commit()
}

View File

@ -0,0 +1,138 @@
package migrate
import (
"context"
"database/sql"
"errors"
"fmt"
)
// Cleaner removes leftover structures such as invalid indexes or disabled
// triggers while keeping pglogical untouched.
type Cleaner struct{}
func NewCleaner() *Cleaner {
return &Cleaner{}
}
func (c *Cleaner) Clean(ctx context.Context, dsn string, force bool) error {
if !force {
return errors.New("clean requires --force confirmation")
}
db, err := openDB(ctx, dsn)
if err != nil {
return err
}
defer db.Close()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return err
}
defer tx.Rollback()
if err := dropInvalidIndexes(ctx, tx); err != nil {
return err
}
if err := dropDisabledTriggers(ctx, tx); err != nil {
return err
}
if err := dropTemporaryTables(ctx, tx); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
fmt.Println("✅ Database clean-up completed")
return nil
}
func dropInvalidIndexes(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, `
SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname)
FROM pg_index i
JOIN pg_class c ON c.oid = i.indexrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname NOT IN ('pglogical', 'pg_catalog', 'information_schema')
AND (NOT i.indisvalid OR NOT i.indisready)
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var identifier string
if err := rows.Scan(&identifier); err != nil {
return err
}
fmt.Printf("→ Dropping invalid index %s\n", identifier)
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", identifier)); err != nil {
return err
}
fmt.Printf("✅ Dropped index %s\n", identifier)
}
return rows.Err()
}
func dropDisabledTriggers(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, `
SELECT quote_ident(n.nspname) || '.' || quote_ident(rel.relname) AS tbl,
quote_ident(t.tgname) AS trigger_name
FROM pg_trigger t
JOIN pg_class rel ON rel.oid = t.tgrelid
JOIN pg_namespace n ON n.oid = rel.relnamespace
WHERE t.tgenabled = 'D'
AND t.tgisinternal = false
AND n.nspname NOT IN ('pglogical', 'pg_catalog', 'information_schema')
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var table, trigger string
if err := rows.Scan(&table, &trigger); err != nil {
return err
}
fmt.Printf("→ Dropping disabled trigger %s on %s\n", trigger, table)
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP TRIGGER IF EXISTS %s ON %s", trigger, table)); err != nil {
return err
}
fmt.Printf("✅ Dropped trigger %s on %s\n", trigger, table)
}
return rows.Err()
}
func dropTemporaryTables(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, `
SELECT quote_ident(table_schema) || '.' || quote_ident(table_name)
FROM information_schema.tables
WHERE table_schema NOT IN ('pglogical', 'pg_catalog', 'information_schema')
AND (table_name LIKE 'tmp_%' OR table_name LIKE 'temp_%' OR table_name LIKE 'backup_%')
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var table string
if err := rows.Scan(&table); err != nil {
return err
}
fmt.Printf("→ Dropping temporary table %s\n", table)
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", table)); err != nil {
return err
}
fmt.Printf("✅ Dropped table %s\n", table)
}
return rows.Err()
}

View File

@ -0,0 +1,28 @@
package migrate
import (
"context"
"database/sql"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
)
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("pgx", dsn)
if err != nil {
return nil, err
}
db.SetConnMaxLifetime(0)
db.SetConnMaxIdleTime(5 * time.Minute)
db.SetMaxIdleConns(5)
db.SetMaxOpenConns(10)
if err := db.PingContext(ctx); err != nil {
db.Close()
return nil, err
}
return db, nil
}

View File

@ -0,0 +1,183 @@
package migrate
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/golang-migrate/migrate/v4"
)
const defaultDir = "account/sql/migrations"
// Runner coordinates golang-migrate operations.
type Runner struct {
Dir string
}
// NewRunner creates a runner that reads migration files from dir. When dir is
// empty, the default directory under account/sql/migrations is used.
func NewRunner(dir string) *Runner {
if dir == "" {
dir = defaultDir
}
return &Runner{Dir: dir}
}
// Up executes all migrations that have not been applied yet. Each step logs
// its outcome to provide clear visibility.
func (r *Runner) Up(ctx context.Context, dsn string) error {
absDir, err := filepath.Abs(r.Dir)
if err != nil {
return err
}
migrations, err := r.loadMigrations(absDir)
if err != nil {
return err
}
m, err := migrate.New(fmt.Sprintf("file://%s", absDir), dsn)
if err != nil {
return err
}
defer closeMigrator(m)
currentVersion, dirty, err := m.Version()
if err != nil {
if errors.Is(err, migrate.ErrNilVersion) {
currentVersion = 0
} else {
return fmt.Errorf("fetch current version: %w", err)
}
}
if dirty {
return fmt.Errorf("database is in a dirty state at version %d; please fix manually", currentVersion)
}
applied := false
for _, migration := range migrations {
if migration.version <= currentVersion {
continue
}
fmt.Printf("→ Applying migration %s ...\n", migration.name)
if err := m.Migrate(migration.version); err != nil {
if errors.Is(err, migrate.ErrNoChange) {
fmt.Printf("✅ Migration %s already applied\n", migration.name)
continue
}
return fmt.Errorf("apply migration %s: %w", migration.name, err)
}
applied = true
fmt.Printf("✅ Migration %s applied\n", migration.name)
}
if !applied {
fmt.Println("✅ Database schema already up-to-date")
}
return nil
}
// Version reports the current schema version tracked by golang-migrate.
func (r *Runner) Version(dsn string) (uint, bool, error) {
absDir, err := filepath.Abs(r.Dir)
if err != nil {
return 0, false, err
}
m, err := migrate.New(fmt.Sprintf("file://%s", absDir), dsn)
if err != nil {
return 0, false, err
}
defer closeMigrator(m)
version, dirty, err := m.Version()
if err != nil {
if errors.Is(err, migrate.ErrNilVersion) {
return 0, false, nil
}
return 0, false, err
}
return version, dirty, nil
}
// Reset drops the public schema before replaying all migrations.
func (r *Runner) Reset(ctx context.Context, dsn string) error {
db, err := openDB(ctx, dsn)
if err != nil {
return err
}
defer db.Close()
fmt.Println("⚠️ Dropping public schema (preserving pglogical)...")
if _, err := db.ExecContext(ctx, "DROP SCHEMA IF EXISTS public CASCADE"); err != nil {
return fmt.Errorf("drop public schema: %w", err)
}
if _, err := db.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS public"); err != nil {
return fmt.Errorf("recreate public schema: %w", err)
}
return r.Up(ctx, dsn)
}
func (r *Runner) loadMigrations(absDir string) ([]*migrationFile, error) {
entries, err := os.ReadDir(absDir)
if err != nil {
return nil, err
}
migrationMap := make(map[uint]*migrationFile)
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasSuffix(name, ".up.sql") {
continue
}
parts := strings.SplitN(name, "_", 2)
if len(parts) != 2 {
continue
}
version, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
continue
}
migrationMap[uint(version)] = &migrationFile{
version: uint(version),
name: name,
}
}
var migrations []*migrationFile
for _, m := range migrationMap {
migrations = append(migrations, m)
}
sort.Slice(migrations, func(i, j int) bool {
return migrations[i].version < migrations[j].version
})
return migrations, nil
}
func closeMigrator(m *migrate.Migrate) {
if m == nil {
return
}
_, _ = m.Close()
}
type migrationFile struct {
version uint
name string
}

View File

@ -0,0 +1,64 @@
package migrate
import (
"context"
"errors"
"fmt"
"os"
"xcontrol/account/internal/utils"
)
const defaultSchemaPath = "account/sql/schema.sql"
// Verifier validates that the live database matches the canonical schema.sql.
type Verifier struct{}
func NewVerifier() *Verifier {
return &Verifier{}
}
func (v *Verifier) Verify(ctx context.Context, dsn, schemaPath string) error {
if dsn == "" {
return errors.New("--dsn is required")
}
if schemaPath == "" {
schemaPath = defaultSchemaPath
}
dump, err := utils.RunPgDump(ctx, dsn)
if err != nil {
return fmt.Errorf("dump database schema: %w", err)
}
schemaBytes, err := os.ReadFile(schemaPath)
if err != nil {
return fmt.Errorf("read schema file: %w", err)
}
dbStatements := utils.NormalizeStatements(dump)
fileStatements := utils.NormalizeStatements(string(schemaBytes))
onlyDB, onlyFile := utils.CompareStatements(dbStatements, fileStatements)
if len(onlyDB) == 0 && len(onlyFile) == 0 {
fmt.Println("✅ Database schema matches schema.sql")
return nil
}
if len(onlyFile) > 0 {
fmt.Println("⚠️ Statements missing from database (present in schema.sql):")
for _, stmt := range onlyFile {
fmt.Printf(" + %s;\n", stmt)
}
}
if len(onlyDB) > 0 {
fmt.Println("⚠️ Extra statements found in database:")
for _, stmt := range onlyDB {
fmt.Printf(" - %s;\n", stmt)
}
}
return errors.New("schema mismatch detected")
}

View File

@ -0,0 +1,132 @@
package utils
import (
"bufio"
"sort"
"strings"
)
// NormalizeStatements extracts relevant DDL statements from the provided dump.
// Comments, empty lines and helper SET statements are ignored. Whitespace is
// normalised to make comparisons deterministic across environments.
func NormalizeStatements(dump string) []string {
var statements []string
var builder strings.Builder
flush := func() {
stmt := strings.TrimSpace(builder.String())
if stmt == "" {
builder.Reset()
return
}
stmt = strings.TrimSuffix(stmt, ";")
stmt = strings.TrimSpace(stmt)
if isRelevantStatement(stmt) {
normalized := collapseWhitespace(stmt)
statements = append(statements, normalized)
}
builder.Reset()
}
scanner := bufio.NewScanner(strings.NewReader(dump))
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" || strings.HasPrefix(line, "--") {
continue
}
if shouldSkipLine(line) {
continue
}
builder.WriteString(line)
if strings.HasSuffix(line, ";") {
flush()
} else {
builder.WriteString(" ")
}
}
flush()
sort.Strings(statements)
return statements
}
// CompareStatements returns the statements that are present only on the left
// or only on the right collection.
func CompareStatements(left, right []string) (onlyLeft, onlyRight []string) {
leftSet := make(map[string]struct{}, len(left))
for _, stmt := range left {
if strings.Contains(stmt, "pglogical") {
continue
}
leftSet[stmt] = struct{}{}
}
rightSet := make(map[string]struct{}, len(right))
for _, stmt := range right {
if strings.Contains(stmt, "pglogical") {
continue
}
rightSet[stmt] = struct{}{}
}
for stmt := range leftSet {
if _, ok := rightSet[stmt]; !ok {
onlyLeft = append(onlyLeft, stmt)
}
}
for stmt := range rightSet {
if _, ok := leftSet[stmt]; !ok {
onlyRight = append(onlyRight, stmt)
}
}
sort.Strings(onlyLeft)
sort.Strings(onlyRight)
return
}
func shouldSkipLine(line string) bool {
lower := strings.ToLower(line)
switch {
case strings.HasPrefix(lower, "set "):
return true
case strings.HasPrefix(lower, "select pg_catalog.set_config"):
return true
case strings.HasPrefix(lower, "reset "):
return true
case strings.HasPrefix(line, "\\connect "):
return true
case strings.HasPrefix(lower, "lock table"):
return true
}
return false
}
func isRelevantStatement(stmt string) bool {
lower := strings.ToLower(stmt)
switch {
case strings.HasPrefix(lower, "create table"):
return true
case strings.HasPrefix(lower, "alter table"):
return true
case strings.HasPrefix(lower, "create index"):
return true
case strings.HasPrefix(lower, "alter index"):
return true
case strings.HasPrefix(lower, "comment on table"):
return true
case strings.HasPrefix(lower, "comment on column"):
return true
case strings.HasPrefix(lower, "grant "):
return true
}
return false
}
func collapseWhitespace(input string) string {
fields := strings.Fields(input)
return strings.Join(fields, " ")
}

View File

@ -0,0 +1,56 @@
package utils
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"strings"
)
// CommandResult captures stdout and stderr from an executed command.
type CommandResult struct {
Stdout string
Stderr string
}
// RunCommand executes the provided command with context awareness and
// returns the collected stdout/stderr output. Errors include contextual
// information as well as stderr to make troubleshooting easier.
func RunCommand(ctx context.Context, name string, args ...string) (*CommandResult, error) {
cmd := exec.CommandContext(ctx, name, args...)
cmd.Env = os.Environ()
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
err := cmd.Run()
result := &CommandResult{Stdout: stdoutBuf.String(), Stderr: stderrBuf.String()}
if err != nil {
return result, fmt.Errorf("command %s %s failed: %w\n%s", name, strings.Join(args, " "), err, result.Stderr)
}
return result, nil
}
// RunPgDump executes pg_dump with the flags required for schema comparison
// and returns the textual dump. The pglogical schema is excluded to avoid
// touching logical replication internals.
func RunPgDump(ctx context.Context, dsn string) (string, error) {
args := []string{
"--schema-only",
"--no-owner",
"--no-privileges",
"--exclude-schema=pglogical",
"--dbname", dsn,
}
result, err := RunCommand(ctx, "pg_dump", args...)
if err != nil {
return "", err
}
return result.Stdout, nil
}

View File

@ -0,0 +1 @@
-- No-op down migration; manual rollback required.

View File

@ -0,0 +1 @@
-- No-op down migration; manual rollback required.

View File

@ -0,0 +1 @@
-- No-op down migration; manual rollback required.

View File

@ -0,0 +1 @@
-- No-op down migration; manual rollback required.

View File

@ -1,3 +1,12 @@
使用新的 `migratectl` CLI 可以在不同环境下快速执行迁移、校验和重置操作:
```bash
go run ./cmd/migratectl/main.go migrate --dsn "$DB_URL"
go run ./cmd/migratectl/main.go check --cn "$CN_DSN" --global "$GLOBAL_DSN"
```
以下命令展示了如何授予 pglogical schema 访问权限:
sudo -u postgres psql -d account -c "GRANT USAGE ON SCHEMA pglogical TO PUBLIC;"
-- 登录 postgres

5
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/gin-contrib/cors v1.6.0
github.com/gin-gonic/gin v1.9.1
github.com/go-git/go-git/v5 v5.16.2
github.com/golang-migrate/migrate/v4 v4.19.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.5
github.com/pgvector/pgvector-go v0.3.0
@ -45,6 +46,8 @@ require (
github.com/go-playground/validator/v10 v10.19.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
@ -72,6 +75,6 @@ require (
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
)

11
go.sum
View File

@ -76,6 +76,8 @@ github.com/go-playground/validator/v10 v10.19.0 h1:ol+5Fu+cSq9JD7SoSqe04GMI92cbn
github.com/go-playground/validator/v10 v10.19.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang-migrate/migrate/v4 v4.19.0 h1:RcjOnCGz3Or6HQYEJ/EEVLfWnmw9KnoigPSjzhCuaSE=
github.com/golang-migrate/migrate/v4 v4.19.0/go.mod h1:9dyEcu+hO+G9hPSw8AIg50yg622pXJsoHItQnDGZkI0=
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
@ -83,6 +85,11 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
@ -226,8 +233,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=