artifacts/oci/PostgreSQL/backup/backup.py

124 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import subprocess
import logging
from datetime import datetime
from utils import load_config, setup_cron
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Send log records to standard output. logging.basicConfig() attaches a
# StreamHandler on stderr unless a stream is given, so the stream is passed
# explicitly to match the intent stated for this script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

# Load the configuration once at import time; the functions below read it
# through this module-level name.
config = load_config()
def load_encryption_key(key_path='/app/encryption.key'):
    """Return the raw bytes stored in the encryption key file.

    Args:
        key_path: Path of the key file. Defaults to ``/app/encryption.key``,
            the location that used to be hard-coded, so existing callers
            that pass no argument behave as before.

    Returns:
        The complete file contents as ``bytes``. Nothing is stripped or
        decoded; key-length validation is left to the caller.

    Exits:
        Logs an error and terminates the process with status 1 when the key
        file does not exist.
    """
    # Open the file directly rather than testing os.path.exists() first: this
    # avoids a second filesystem lookup and the window in which the file can
    # vanish between the check and the open.
    try:
        with open(key_path, 'rb') as key_file:
            return key_file.read()
    except (FileNotFoundError, NotADirectoryError):
        logger.error(f"未找到加密密钥文件:{key_path}")
        sys.exit(1)
def encrypt_file(file_name, key, chunk_size=1024 * 1024):
    """Encrypt *file_name* in place with AES-GCM.

    The resulting file layout is the same as before:
    ``IV (12 bytes) || authentication tag (16 bytes) || ciphertext``.

    The plaintext is streamed through the cipher in ``chunk_size`` pieces and
    written to a temporary sibling file that replaces the original only after
    encryption has finished. The whole dump is therefore never held in memory,
    and an error part-way through leaves the original file untouched.

    Args:
        file_name: Path of the file to encrypt; overwritten with the
            encrypted form on success.
        key: AES key as bytes. Only 16-byte (AES-128) and 32-byte (AES-256)
            keys are accepted.
        chunk_size: Number of plaintext bytes read per iteration.

    Exits:
        Logs an error and terminates the process with status 1 when the key
        length is not 16 or 32 bytes.
    """
    if len(key) not in (16, 32):
        logger.error("不支持的密钥长度,必须是 16 或 32 字节AES-128 或 AES-256")
        sys.exit(1)

    iv_length = 12   # 96-bit IV
    tag_length = 16  # size of the GCM tag produced by the encryptor

    # A fresh random IV per file: a GCM IV must never repeat under one key.
    iv = os.urandom(iv_length)
    encryptor = Cipher(algorithms.AES(key), mode=modes.GCM(iv)).encryptor()

    tmp_name = f"{file_name}.enc.tmp"
    try:
        with open(file_name, 'rb') as infile, open(tmp_name, 'wb') as outfile:
            # The in-place write used to keep the file's permission bits;
            # remember them so the replacement file can be given the same.
            source_mode = os.fstat(infile.fileno()).st_mode & 0o7777

            outfile.write(iv)
            # The tag is only known after finalize() but is stored before the
            # ciphertext, so reserve its slot now and fill it in afterwards.
            outfile.write(b'\x00' * tag_length)

            for chunk in iter(lambda: infile.read(chunk_size), b''):
                outfile.write(encryptor.update(chunk))
            outfile.write(encryptor.finalize())

            tag = encryptor.tag
            if len(tag) != tag_length:
                raise RuntimeError(
                    f"unexpected GCM tag length: {len(tag)} bytes"
                )
            outfile.seek(iv_length)
            outfile.write(tag)

        os.chmod(tmp_name, source_mode)
        # Swap the finished file into place in a single rename.
        os.replace(tmp_name, file_name)
    finally:
        # After a successful replace the temporary name no longer exists;
        # after a failure this removes the partial output.
        try:
            os.remove(tmp_name)
        except FileNotFoundError:
            pass
def upload_to_cloud(file_name):
    """Upload *file_name* to the object store selected in the configuration.

    The provider is read from ``config['backup']['storage']['provider']``:
    ``'aws'`` uploads to S3 through boto3 and ``'aliyun'`` uploads to OSS
    through oss2. In both cases ``file_name`` is also used as the object key.
    Any other provider value logs an error and exits the process with
    status 1.

    Args:
        file_name: Path of the local file to upload.
    """
    storage_cfg = config['backup']['storage']
    storage_provider = storage_cfg['provider']

    # Reject unknown providers up front so the branches below only deal with
    # the supported ones.
    if storage_provider not in ('aws', 'aliyun'):
        logger.error(f"不支持的存储提供商:{storage_provider}")
        sys.exit(1)

    if storage_provider == 'aws':
        # The SDK is imported here so that only the configured provider's
        # package has to be installed.
        import boto3

        s3_client = boto3.client(
            's3',
            aws_access_key_id=storage_cfg['access_key'],
            aws_secret_access_key=storage_cfg['secret_key'],
        )
        s3_client.upload_file(file_name, storage_cfg['bucket_name'], file_name)
        logger.info(f"备份文件已上传到 AWS S3{file_name}")
        return

    import oss2

    credentials = oss2.Auth(storage_cfg['access_key'], storage_cfg['secret_key'])
    oss_bucket = oss2.Bucket(
        credentials,
        storage_cfg['endpoint'],
        storage_cfg['bucket_name'],
    )
    with open(file_name, 'rb') as payload:
        oss_bucket.put_object(file_name, payload)
    logger.info(f"备份文件已上传到阿里云 OSS{file_name}")
def backup():
    """Dump the database, optionally encrypt the dump, upload it, clean up.

    Steps:
        1. Run ``pg_dump`` (custom format, large objects included) for the
           database named by the ``DB_NAME`` environment variable, connecting
           with ``DB_HOST`` / ``DB_USER``. Each falls back to a default when
           unset.
        2. Encrypt the dump in place unless ``config['backup']['encryption']``
           is falsy (encryption is on by default).
        3. Upload the file to the configured cloud storage.
        4. Delete the local file.

    Exits:
        Logs an error and terminates the process with status 1 when
        ``pg_dump`` cannot be started or returns a non-zero status. Any
        partial dump file is removed first.
    """
    logger.info("开始备份过程")
    backup_type = config['backup'].get('type', 'full')
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    # NOTE: the dump uses pg_dump's custom format despite the ".sql" suffix.
    # The name is kept unchanged because it is also used as the object key
    # when the file is uploaded.
    backup_file = f"backup_{backup_type}_{timestamp}.sql"

    # The database name goes last, after every option, as in pg_dump's
    # documented synopsis. Option parsers that stop at the first positional
    # argument would otherwise ignore -F/-b/-f.
    pg_dump_cmd = [
        'pg_dump',
        '-h', os.environ.get('DB_HOST', 'localhost'),
        '-U', os.environ.get('DB_USER', 'postgres'),
        '-F', 'c',  # custom format
        '-b',       # include large objects
        '-f', backup_file,
        os.environ.get('DB_NAME', 'postgres'),
    ]
    try:
        subprocess.run(pg_dump_cmd, check=True, env=os.environ)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where the pg_dump executable is missing or
        # cannot be started, which would otherwise surface as a traceback.
        logger.error(f"数据库备份失败:{e}")
        # Best-effort removal of a partial dump so it cannot be mistaken for
        # a usable backup.
        try:
            os.remove(backup_file)
        except OSError:
            pass
        sys.exit(1)
    logger.info(f"数据库备份成功:{backup_file}")

    # Encryption is enabled unless the configuration turns it off.
    encryption_enabled = config['backup'].get('encryption', True)
    if encryption_enabled:
        key = load_encryption_key()
        encrypt_file(backup_file, key)
        logger.info(f"备份文件已加密:{backup_file}")

    # Upload to cloud storage, then drop the local copy.
    upload_to_cloud(backup_file)
    os.remove(backup_file)
    logger.info("本地备份文件已删除")
def verify_backup():
    """Placeholder for backup verification; currently only logs start and end.

    No verification is performed yet.
    """
    start_message = "开始备份验证过程"
    done_message = "备份验证完成"

    logger.info(start_message)
    # TODO: download the backup file, then try to decrypt and unpack it to
    # confirm that the backup is valid.
    logger.info(done_message)
if __name__ == "__main__":
    # Dispatch on the sub-command found anywhere in the argument list; the
    # first match in the order below wins.
    if 'setup_cron' in sys.argv:
        setup_cron()
    elif 'run_backup' in sys.argv:
        backup()
    elif 'verify_backup' in sys.argv:
        verify_backup()
    else:
        logger.error("请指定 'setup_cron''run_backup''verify_backup'")
        # Exit non-zero so callers (cron, shell scripts) can detect the bad
        # invocation, in line with the other error paths in this file.
        sys.exit(1)