Add PostgreSQL backup scripts, configuration files, and Dockerfile for the Python 3.10 base image

This commit is contained in:
Haitao Pan 2024-11-19 12:12:08 +08:00
parent 971c65ac09
commit c380363377
10 changed files with 469 additions and 0 deletions

View File

@ -0,0 +1,23 @@
# Application image for the PostgreSQL backup/restore scripts.
# NOTE(review): "python:3.10-base" is presumably a locally built and tagged base image rather than an official tag — confirm it exists before building this one.
FROM python:3.10-base
# Install the required system packages (PostgreSQL client tools and cron)
RUN apt-get update && \
apt-get install -y --no-install-recommends \
postgresql-client cron && \
rm -rf /var/lib/apt/lists/*
# Set the working directory
WORKDIR /app
# Copy the dependency list and install the Python libraries
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application files
COPY backup.py restore.py utils.py config.yaml entrypoint.sh ./
# Make entrypoint.sh executable
RUN chmod +x /app/entrypoint.sh
# Set the entrypoint
ENTRYPOINT ["/app/entrypoint.sh"]

View File

@ -0,0 +1,57 @@
目录结构
```
├── Dockerfile
├── entrypoint.sh
├── requirements.txt
├── backup.py
├── restore.py
├── utils.py
└── config.yaml
```
构建和运行容器
构建镜像：
```bash
docker build -t postgres-backup:latest .
```
运行容器：
步骤 1：生成加密密钥
安装 cryptography 库（如果尚未安装）：
```bash
pip install cryptography
```
生成 AES-256（32 字节）加密密钥：
```bash
python generate_key.py AES-256 /path/to/your/encryption.key
```
步骤 2：设置文件权限
```bash
chmod 600 /path/to/your/encryption.key
```
步骤 3：运行 Docker 容器
```bash
docker run -d \
  --name postgres-backup-container \
  -v /path/to/your/config.yaml:/app/config.yaml \
  -v /path/to/your/encryption.key:/app/encryption.key \
  -e DB_HOST=your_db_host \
  -e DB_USER=your_db_user \
  -e DB_NAME=your_db_name \
  postgres-backup:latest
```
请根据您的实际数据库连接信息替换 your_db_host、your_db_user 和 your_db_name。
运行示例
备份
手动执行备份：
```bash
python /app/backup.py run_backup
```
恢复
手动执行恢复：
```bash
python /app/restore.py run_restore
```

View File

@ -0,0 +1,123 @@
import os
import sys
import subprocess
import logging
from datetime import datetime
from utils import load_config, setup_cron
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Configure root logging at INFO level with a timestamped "<time> <level>: <message>" format.
# NOTE(review): the original comment said output goes to stdout, but no `stream=` argument is passed here — confirm the intended destination.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
# Load the configuration once, at import time, via utils.load_config()
config = load_config()
def load_encryption_key():
    """Return the raw bytes of the encryption key stored at /app/encryption.key.

    Logs an error and terminates the process with exit status 1 when the key
    file does not exist.
    """
    key_path = '/app/encryption.key'
    if os.path.exists(key_path):
        with open(key_path, 'rb') as handle:
            return handle.read()
    logger.error(f"未找到加密密钥文件：{key_path}")
    sys.exit(1)
def encrypt_file(file_name, key):
    """Encrypt ``file_name`` in place with AES-GCM.

    The on-disk layout is unchanged from the previous implementation:
    12-byte IV, 16-byte authentication tag, then the ciphertext.

    The data is streamed in fixed-size chunks into a temporary sibling file
    that replaces the original only once encryption has completed, so a large
    dump is never held in memory and a failure part-way through cannot
    destroy the plaintext backup.

    Args:
        file_name: Path of the file to encrypt.
        key: Raw AES key; must be 16 bytes (AES-128) or 32 bytes (AES-256).

    Exits the process with status 1 when the key length is unsupported.
    """
    if len(key) not in (16, 32):
        logger.error("不支持的密钥长度，必须是 16 或 32 字节AES-128 或 AES-256")
        sys.exit(1)

    chunk_size = 1024 * 1024
    tag_length = 16
    # Fresh random 12-byte (96-bit) IV for every file
    iv = os.urandom(12)
    encryptor = Cipher(algorithms.AES(key), mode=modes.GCM(iv)).encryptor()

    tmp_name = f"{file_name}.enc.tmp"
    try:
        with open(file_name, 'rb') as infile, open(tmp_name, 'wb') as outfile:
            # The tag is only known after finalize(); reserve its slot now and
            # fill it in afterwards so the layout stays IV + tag + ciphertext.
            outfile.write(iv + b'\x00' * tag_length)
            for chunk in iter(lambda: infile.read(chunk_size), b''):
                outfile.write(encryptor.update(chunk))
            outfile.write(encryptor.finalize())
            outfile.seek(len(iv))
            outfile.write(encryptor.tag)
        os.replace(tmp_name, file_name)
    finally:
        # Only present here if something failed before the replace.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
def upload_to_cloud(file_name):
    """Upload ``file_name`` to the storage backend selected in the backup config.

    The object key equals ``file_name``. Supported providers are ``aws``
    (boto3 S3 client) and ``aliyun`` (oss2). Any other provider is logged as
    an error and terminates the process with exit status 1.
    """
    storage = config['backup']['storage']
    provider = storage['provider']

    if provider == 'aws':
        import boto3
        client = boto3.client(
            's3',
            aws_access_key_id=storage['access_key'],
            aws_secret_access_key=storage['secret_key'],
        )
        client.upload_file(file_name, storage['bucket_name'], file_name)
        logger.info(f"备份文件已上传到 AWS S3{file_name}")
        return

    if provider == 'aliyun':
        import oss2
        credentials = oss2.Auth(storage['access_key'], storage['secret_key'])
        target = oss2.Bucket(credentials, storage['endpoint'], storage['bucket_name'])
        with open(file_name, 'rb') as fileobj:
            target.put_object(file_name, fileobj)
        logger.info(f"备份文件已上传到阿里云 OSS{file_name}")
        return

    logger.error(f"不支持的存储提供商：{provider}")
    sys.exit(1)
def backup():
    """Dump the database, optionally encrypt the dump, and upload it.

    Connection settings come from the DB_HOST, DB_USER and DB_NAME environment
    variables (defaults: localhost / postgres / postgres). The dump is written
    to ``backup_<type>_<timestamp>.sql`` in the current working directory using
    pg_dump's custom format.

    The local dump is removed in a ``finally`` block, so a failure during
    encryption or upload (both of which may call ``sys.exit``) no longer
    leaves a database dump behind on disk.

    Exits the process with status 1 when pg_dump fails.
    """
    logger.info("开始备份过程")
    backup_type = config['backup'].get('type', 'full')
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    backup_file = f"backup_{backup_type}_{timestamp}.sql"

    # Options first, database name last: this is the documented pg_dump
    # synopsis and does not rely on getopt argument permutation.
    pg_dump_cmd = [
        'pg_dump',
        '-h', os.environ.get('DB_HOST', 'localhost'),
        '-U', os.environ.get('DB_USER', 'postgres'),
        '-F', 'c',  # custom format (compressed, restorable with pg_restore)
        '-b',       # include large objects
        '-f', backup_file,
        os.environ.get('DB_NAME', 'postgres'),
    ]

    try:
        try:
            subprocess.run(pg_dump_cmd, check=True, env=os.environ)
        except subprocess.CalledProcessError as e:
            logger.error(f"数据库备份失败：{e}")
            sys.exit(1)
        logger.info(f"数据库备份成功：{backup_file}")

        # Encryption is enabled unless the config explicitly turns it off
        if config['backup'].get('encryption', True):
            key = load_encryption_key()
            encrypt_file(backup_file, key)
            logger.info(f"备份文件已加密：{backup_file}")

        upload_to_cloud(backup_file)
    finally:
        # Runs on success and on every failure path, including SystemExit.
        if os.path.exists(backup_file):
            os.remove(backup_file)
            logger.info("本地备份文件已删除")
def verify_backup():
    """Placeholder for backup verification; currently only logs start and end."""
    start_message = "开始备份验证过程"
    finish_message = "备份验证完成"

    logger.info(start_message)
    # TODO: download a backup file, then try to decrypt and unpack it to
    # confirm the backup is actually usable.
    logger.info(finish_message)
if __name__ == "__main__":
    # Command dispatch: the first matching keyword found anywhere in argv wins.
    if 'setup_cron' in sys.argv:
        setup_cron()
    elif 'run_backup' in sys.argv:
        backup()
    elif 'verify_backup' in sys.argv:
        verify_backup()
    else:
        logger.error("请指定 'setup_cron'、'run_backup' 或 'verify_backup'")
        # A usage error must not look like success to cron or to shell scripts.
        sys.exit(1)

View File

@ -0,0 +1,21 @@
backup:
type: full # 选项：full（全量）、incremental（增量）
schedule: "0 2 * * *" # 自动备份的 Cron 表达式,默认为每天凌晨 2 点
encryption: true
storage:
provider: aliyun # 选项：aws、aliyun（阿里云 OSS）；gcp、azure 目前尚未在 backup.py / restore.py 中实现
bucket_name: your-bucket-name
access_key: YOUR_ACCESS_KEY
secret_key: YOUR_SECRET_KEY
endpoint: oss-cn-hangzhou.aliyuncs.com # 阿里云 OSS 的 Endpoint
restore:
source: latest # 或者指定特定的备份文件
storage:
provider: aliyun
bucket_name: your-bucket-name
access_key: YOUR_ACCESS_KEY
secret_key: YOUR_SECRET_KEY
endpoint: oss-cn-hangzhou.aliyuncs.com
verification:
enabled: true
schedule: "0 3 * * *" # 备份验证的 Cron 表达式

View File

@ -0,0 +1,7 @@
#!/bin/bash
# Container entrypoint: generate the cron configuration, then run cron.

# Abort instead of starting cron with no jobs when cron setup fails.
set -e

# Call backup.py's setup_cron command to write the cron configuration
python /app/backup.py setup_cron

# Start cron in the foreground. `exec` replaces this shell so that cron
# becomes PID 1 and receives the container's stop signal directly.
exec cron -f

View File

@ -0,0 +1,39 @@
import os
import sys
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import constant_time
from cryptography.hazmat.backends import default_backend
def generate_key(strength, key_file_path):
    """Generate a random key and write it to ``key_file_path``.

    Args:
        strength: One of 'AES-128' (16 bytes), 'AES-256' (32 bytes) or
            'AES-512' (64 bytes). The 64-byte option is kept for backward
            compatibility only; it is not a standard AES key length and the
            backup/restore scripts accept only 16- or 32-byte keys, so a
            warning is printed when it is requested.
        key_file_path: Destination path. The file is created (or truncated)
            and its permissions are set to 0600 so the key is never readable
            by other users.

    Exits the process with status 1 when ``strength`` is not recognised.
    """
    key_lengths = {
        'AES-128': 16,  # 16 bytes = 128 bits
        'AES-256': 32,  # 32 bytes = 256 bits
        'AES-512': 64,  # 64 bytes = 512 bits (non-standard AES key length)
    }
    key_length = key_lengths.get(strength)
    if key_length is None:
        print("错误：加密强度必须是 AES-128、AES-256 或 AES-512")
        sys.exit(1)

    if strength == 'AES-512':
        print("警告：AES-512 不是标准的 AES 密钥长度，backup.py 和 restore.py 只接受 16 或 32 字节的密钥")

    key = os.urandom(key_length)

    # Create the file with owner-only permissions from the start, rather than
    # relying on a later manual chmod.
    fd = os.open(key_file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as key_file:
        key_file.write(key)
    # O_CREAT's mode is ignored for a pre-existing file and is masked by the
    # umask for a new one, so enforce the mode explicitly.
    os.chmod(key_file_path, 0o600)

    print(f'加密密钥已生成并保存到 {key_file_path}，密钥强度：{strength}')
if __name__ == '__main__':
    # Expect exactly two arguments after the script name: strength and output path.
    if len(sys.argv[1:]) != 2:
        print("用法python generate_key.py AES-128|AES-256|AES-512 /path/to/your/encryption.key")
        sys.exit(1)
    strength, key_file_path = sys.argv[1:]
    generate_key(strength, key_file_path)

View File

@ -0,0 +1,5 @@
PyYAML
boto3 # 用于 AWS S3
oss2 # 阿里云 OSS 的官方 SDK
cryptography # 用于加密
psycopg2-binary # PostgreSQL 的 Python 适配器

View File

@ -0,0 +1,141 @@
import os
import sys
import subprocess
import logging
from utils import load_config
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Configure root logging at INFO level with a timestamped "<time> <level>: <message>" format.
# NOTE(review): the original comment said output goes to stdout, but no `stream=` argument is passed here — confirm the intended destination.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
# Load the configuration once, at import time, via utils.load_config()
config = load_config()
def load_encryption_key():
    """Return the raw bytes of the encryption key stored at /app/encryption.key.

    Logs an error and terminates the process with exit status 1 when the key
    file does not exist.
    """
    key_path = '/app/encryption.key'
    if os.path.exists(key_path):
        with open(key_path, 'rb') as handle:
            return handle.read()
    logger.error(f"未找到加密密钥文件：{key_path}")
    sys.exit(1)
def decrypt_file(file_name, key):
    """Decrypt ``file_name`` in place (AES-GCM; layout IV + tag + ciphertext).

    The file is expected to start with a 12-byte IV and a 16-byte
    authentication tag, followed by the ciphertext.

    The plaintext is streamed into a temporary sibling file that replaces the
    original only after the authentication tag has been verified. This keeps
    memory use constant for large dumps and guarantees that unauthenticated
    plaintext never ends up under ``file_name``.

    Args:
        file_name: Path of the encrypted file.
        key: Raw AES key; must be 16 bytes (AES-128) or 32 bytes (AES-256).

    Exits the process with status 1 when the key length is unsupported, the
    file is too short to contain the IV and tag, or authentication fails.
    """
    # Local import: only this function needs the exception type.
    from cryptography.exceptions import InvalidTag

    if len(key) not in (16, 32):
        logger.error("不支持的密钥长度，必须是 16 或 32 字节AES-128 或 AES-256")
        sys.exit(1)

    chunk_size = 1024 * 1024
    tmp_name = f"{file_name}.dec.tmp"
    try:
        with open(file_name, 'rb') as infile:
            iv = infile.read(12)
            tag = infile.read(16)
            # A truncated file would otherwise surface as an unhandled
            # ValueError from the GCM mode constructor.
            if len(iv) != 12 or len(tag) != 16:
                logger.error("解密失败：备份文件不完整或格式无效")
                sys.exit(1)

            decryptor = Cipher(algorithms.AES(key), mode=modes.GCM(iv, tag)).decryptor()
            with open(tmp_name, 'wb') as outfile:
                try:
                    for chunk in iter(lambda: infile.read(chunk_size), b''):
                        outfile.write(decryptor.update(chunk))
                    # finalize() verifies the tag and raises InvalidTag on mismatch
                    outfile.write(decryptor.finalize())
                except InvalidTag:
                    logger.error("解密失败：认证标签校验未通过，密钥错误或备份文件已损坏")
                    sys.exit(1)
        os.replace(tmp_name, file_name)
    finally:
        # Only present here if something failed before the replace.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
def download_from_cloud(file_name):
    """Download object ``file_name`` from the configured restore storage.

    The object is saved locally under the same name. Supported providers are
    ``aws`` (boto3 S3 client) and ``aliyun`` (oss2). Any other provider is
    logged as an error and terminates the process with exit status 1.
    """
    storage = config['restore']['storage']
    provider = storage['provider']

    if provider == 'aws':
        import boto3
        client = boto3.client(
            's3',
            aws_access_key_id=storage['access_key'],
            aws_secret_access_key=storage['secret_key'],
        )
        client.download_file(storage['bucket_name'], file_name, file_name)
        logger.info(f"备份文件已从 AWS S3 下载：{file_name}")
        return

    if provider == 'aliyun':
        import oss2
        credentials = oss2.Auth(storage['access_key'], storage['secret_key'])
        origin = oss2.Bucket(credentials, storage['endpoint'], storage['bucket_name'])
        origin.get_object_to_file(file_name, file_name)
        logger.info(f"备份文件已从阿里云 OSS 下载：{file_name}")
        return

    logger.error(f"不支持的存储提供商：{provider}")
    sys.exit(1)
def _list_backup_keys(storage):
    """Return every object key starting with ``backup_`` in the configured bucket."""
    provider = storage['provider']
    bucket_name = storage['bucket_name']

    if provider == 'aws':
        import boto3
        s3 = boto3.client(
            's3',
            aws_access_key_id=storage['access_key'],
            aws_secret_access_key=storage['secret_key'],
        )
        # A single list_objects_v2 call returns at most 1000 keys; paginate so
        # the newest backup is not missed in a large bucket.
        keys = []
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix='backup_'):
            for obj in page.get('Contents', []):
                keys.append(obj['Key'])
        return keys

    if provider == 'aliyun':
        import oss2
        auth = oss2.Auth(storage['access_key'], storage['secret_key'])
        bucket = oss2.Bucket(auth, storage['endpoint'], bucket_name)
        return [obj.key for obj in oss2.ObjectIterator(bucket, prefix='backup_')]

    logger.error(f"不支持的存储提供商：{provider}")
    sys.exit(1)


def _backup_sort_key(key):
    """Order ``backup_<type>_<timestamp>.sql`` keys by timestamp, then by name."""
    # Sorting whole keys would rank the <type> part above the timestamp, so an
    # old "incremental" backup would beat a newer "full" one.
    return (key.rsplit('_', 1)[-1], key)


def restore():
    """Download a backup, optionally decrypt it, and restore it with pg_restore.

    When ``restore.source`` is ``latest`` the newest ``backup_*`` object (by
    the timestamp embedded in its key) is selected; otherwise ``source`` is
    used as the object key. Connection settings come from the DB_HOST, DB_USER
    and DB_NAME environment variables (defaults: localhost / postgres /
    postgres).

    The downloaded (and possibly decrypted) file is removed in a ``finally``
    block so that a failed restore does not leave a plaintext dump on disk.

    Exits the process with status 1 when no backup is found, the provider is
    unsupported, or pg_restore fails.
    """
    logger.info("开始恢复过程")
    source = config['restore']['source']

    # Resolve 'latest' to the newest backup object key
    if source == 'latest':
        backups = _list_backup_keys(config['restore']['storage'])
        if not backups:
            logger.error("未找到任何备份文件")
            sys.exit(1)
        source = max(backups, key=_backup_sort_key)

    logger.info(f"正在下载备份文件：{source}")
    try:
        download_from_cloud(source)

        # Encryption is enabled unless the backup config explicitly turns it off
        if config['backup'].get('encryption', True):
            key = load_encryption_key()
            decrypt_file(source, key)
            logger.info(f"备份文件已解密：{source}")

        pg_restore_cmd = [
            'pg_restore',
            '-h', os.environ.get('DB_HOST', 'localhost'),
            '-U', os.environ.get('DB_USER', 'postgres'),
            '-d', os.environ.get('DB_NAME', 'postgres'),
            '--clean',      # drop existing objects before recreating them
            '--if-exists',  # avoid "does not exist" errors on a fresh database
            source,
        ]
        try:
            subprocess.run(pg_restore_cmd, check=True, env=os.environ)
        except subprocess.CalledProcessError as e:
            logger.error(f"数据库恢复失败：{e}")
            sys.exit(1)
        logger.info("数据库恢复完成")
    finally:
        # Runs on success and on every failure path, including SystemExit.
        if os.path.exists(source):
            os.remove(source)
            logger.info("本地备份文件已删除")
if __name__ == "__main__":
    # Single supported command; the keyword may appear anywhere in argv.
    if 'run_restore' in sys.argv:
        restore()
    else:
        logger.error("请指定 'run_restore'")
        # A usage error must not look like success to callers.
        sys.exit(1)

View File

@ -0,0 +1,39 @@
import os
import yaml
def load_config():
    """Parse /app/config.yaml with ``yaml.safe_load`` and return the result."""
    with open('/app/config.yaml', 'r') as stream:
        return yaml.safe_load(stream)
def setup_cron():
    """Write /etc/cron.d/backup-cron from the schedules in the configuration.

    A backup job is added when ``backup.schedule`` is set, and a verification
    job when ``verification.enabled`` is true and ``verification.schedule`` is
    set. Job output is redirected to PID 1's stdout (/proc/1/fd/1).

    Differences from a per-user crontab that this function accounts for:
      * files in /etc/cron.d use the system crontab format, which requires a
        user field between the schedule and the command;
      * cron starts jobs with a minimal environment, so the DB_* variables
        read by backup.py are copied into the cron file.
    """
    # Local import: the module only imports os and yaml at file level.
    import sys

    config = load_config()

    # Use the interpreter that is running this script; /usr/bin/python is not
    # where the official Python images install it.
    python = sys.executable or '/usr/local/bin/python'
    redirect = ">> /proc/1/fd/1 2>&1"
    lines = []

    # NOTE(review): only the variables backup.py reads explicitly are passed
    # through. Secrets such as PGPASSWORD are deliberately not written to this
    # world-readable file — confirm how database credentials are supplied.
    for name in ('DB_HOST', 'DB_USER', 'DB_NAME'):
        value = os.environ.get(name)
        if value and '\n' not in value:
            lines.append(f"{name}={value}")

    # Backup job
    backup_schedule = config['backup'].get('schedule')
    if backup_schedule:
        lines.append(f"{backup_schedule} root {python} /app/backup.py run_backup {redirect}")

    # Verification job; a missing "verification" section means "disabled"
    verification = config.get('verification') or {}
    if verification.get('enabled', False):
        verification_schedule = verification.get('schedule')
        if verification_schedule:
            lines.append(f"{verification_schedule} root {python} /app/backup.py verify_backup {redirect}")

    # Restores are intentionally not scheduled; run restore.py manually.

    with open('/etc/cron.d/backup-cron', 'w') as cron_file:
        for line in lines:
            cron_file.write(f"{line}\n")

    # Give the cron file the 0644 permissions it had in the original setup
    os.chmod('/etc/cron.d/backup-cron', 0o644)

View File

@ -0,0 +1,14 @@
# Use the official Python 3.10 slim runtime as the base image
FROM python:3.10-slim
# Record the image maintainer (the original note describes this image as the Python 3.10 base image)
LABEL maintainer="Haitao Pan <manbuzhe2009@qq.com>"
# Install the required system packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
cron && \
rm -rf /var/lib/apt/lists/*
# Set the working directory
WORKDIR /app