#!/bin/bash
#==============================================================#
# File      :   check-repl
# Desc      :   Check replica identity on src cluster
# Time      :   {{ '%Y-%m-%d %H:%M' |strftime }}
# Path      :   {{ dir_path }}/check-db
# Deps      :   bash
# License   :   Apache-2.0 @ https://pigsty.io/docs/about/license/
# Copyright :   2018-2026  Ruohang Feng / Vonng (rh@vonng.com)
#==============================================================#


#--------------------------------------------------------------#
# Utils
#--------------------------------------------------------------#
__CN='\033[0m';__CK='\033[0;30m';__CR='\033[0;31m';__CG='\033[0;32m';
__CY='\033[0;33m';__CB='\033[0;34m';__CM='\033[0;35m';__CC='\033[0;36m';__CW='\033[0;37m';
function log_info() {  printf "[${__CG} OK ${__CN}] ${__CG}$*${__CN}\n";   }
function log_warn() {  printf "[${__CY}WARN${__CN}] ${__CY}$*${__CN}\n";   }
function log_error() { printf "[${__CR}FAIL${__CN}] ${__CR}$*${__CN}\n";   }
function log_debug() { printf "[${__CB}HINT${__CN}] ${__CB}$*${__CN}\n"; }
function log_input() { printf "[${__CM} IN ${__CN}] ${__CM}$*\n=> ${__CN}"; }
function log_hint()  { printf "${__CB}$*${__CN}\n"; }
function log_line()  { printf "${__CM}[$*] ===========================================${__CN}\n"; }


#--------------------------------------------------------------#
# Param
#--------------------------------------------------------------#
# check if MIGRATION_CONTEXT is defined as expected
EXPECTED_CONTEXT="{{ src_cls }}.{{ src_db }}"
if [[ "${MIGRATION_CONTEXT}" != "${EXPECTED_CONTEXT}" ]]; then
    log_error "MIGRATION_CONTEXT = ${MIGRATION_CONTEXT} != EXPECTED ${EXPECTED_CONTEXT}"
    log_hint "did you run . activate first?"
    exit 1
fi

CHECK_RESULT="data/check-replica-identity.data"
CHECK_SOLUTION="data/fix-replica-identity.sql"


#--------------------------------------------------------------#
# Execute
#--------------------------------------------------------------#
log_info   "check src table replica identity: ${SRCCLS}.${SRCDB}"
log_info   "  - SRC URL  : ${SRCPG}"
log_info   "  - DATABASE : ${SRCDB}"
log_info   "  - OUTPUT   : ${CHECK_RESULT}"

psql "${SRCPG}" -Xw > ${CHECK_RESULT} <<-EOF
    SELECT quote_ident(nspname) || '.' || quote_ident(relname) AS name,
           con.ri                                              AS keys,
           pg_size_pretty(pg_table_size(c.oid))                AS size,
           CASE relreplident
               WHEN 'd' THEN 'default'
               WHEN 'n' THEN 'nothing'
               WHEN 'f' THEN 'full'
               WHEN 'i' THEN 'index' END                       AS identity
    FROM pg_class c
             JOIN pg_namespace n ON c.relnamespace = n.oid,
         LATERAL (SELECT array_agg(contype) AS ri FROM pg_constraint WHERE conrelid = c.oid) con
    WHERE relkind = 'r' AND nspname !~ '^pg_' AND nspname !~ '^_' AND nspname !~ '^timescaledb' AND nspname !~ '^citus' AND nspname !~ '^columnar'
               AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'repack', 'monitor')
    ORDER BY 2, 3;
EOF

log_info "check-repl result:\n"
cat ${CHECK_RESULT}


#--------------------------------------------------------------#
# Execute
#--------------------------------------------------------------#
log_info   "generate replica identity fix sql on ${CHECK_SOLUTION}"

psql "${SRCPG}" -AXwto "${CHECK_SOLUTION}" <<-EOF

SELECT fix || '  -- ' || pg_size_pretty(pg_table_size(relid)) -- , relname, fix, attnotnull
FROM (
         SELECT relid, name AS relname, uk, (quote_ident(nspname) || '.' || quote_ident(uk))::RegClass::OID AS ukid, uk, CASE WHEN uk IS NOT NULL THEN  'ALTER TABLE ' || name || ' REPLICA IDENTITY USING INDEX ' || uk ||';' ELSE  'ALTER TABLE ' || name || ' REPLICA IDENTITY FULL;' END AS fix FROM
             (SELECT nspname, quote_ident(nspname) || '.' || quote_ident(relname) AS name, c.oid AS relid, (SELECT idx.relname AS u FROM pg_catalog.pg_class tbl, pg_catalog.pg_class idx, pg_catalog.pg_index i WHERE tbl.oid = c.oid AND tbl.oid = i.indrelid AND i.indexrelid = idx.oid AND indisunique AND NOT indisprimary AND indisvalid LIMIT 1) AS uk,
                     CASE relreplident WHEN 'd' THEN 'default' WHEN 'n' THEN 'nothing' WHEN 'f' THEN 'full' WHEN 'i' THEN 'index' END AS ri
              FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid
              WHERE relkind = 'r' AND nspname !~ '^pg_' AND nspname !~ '^_' AND nspname !~ '^timescaledb' AND nspname !~ '^citus' AND nspname !~ '^columnar'
                             AND nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast', 'repack', 'monitor')
                AND NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conrelid = c.oid AND contype = 'p') ORDER BY 2, 3 ) p
     ) t,
     LATERAL(SELECT json_object_agg(ra.attname, ra.attnotnull) AS attnotnull FROM (SELECT attname, attnotnull FROM pg_attribute WHERE attrelid = ukid ) rb JOIN (SELECT attname, attnotnull FROM pg_attribute WHERE attrelid = relid ) ra ON ra.attname = rb.attname) atts

EOF


log_info "check-repl fix suggestion:\n"
cat ${CHECK_SOLUTION}

echo ""
log_info "you can fix replica identity by running:\n"
log_hint "    $ psql '${SRCPG}' -Xwf ${CHECK_SOLUTION}"
