feat: align pulumi modules with alicloud landing zone

This commit is contained in:
shenlan 2025-09-26 15:04:36 +08:00
parent 1bfd72d9e9
commit d836b59794
29 changed files with 806 additions and 461 deletions

View File

@ -0,0 +1,62 @@
name: Alicloud Landing Zone Baseline
on:
push:
paths:
- 'iac_modules/pulumi/**'
- 'config/alicloud/**'
- '.github/workflows/iac-pipeline-alicloud-landingzone-baseline.yaml'
workflow_dispatch:
env:
PULUMI_CI: 'true'
CONFIG_PATH: config/alicloud
jobs:
preview:
name: Preview baseline changes
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Pulumi preview
uses: pulumi/actions@v4
with:
command: preview
stack-name: alicloud/baseline-dev
work-dir: iac_modules/pulumi
env:
ALICLOUD_ACCESS_KEY_ID: ${{ secrets.ALICLOUD_ACCESS_KEY_ID }}
ALICLOUD_ACCESS_KEY_SECRET: ${{ secrets.ALICLOUD_ACCESS_KEY_SECRET }}
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
apply:
name: Apply to production stack
needs: preview
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Pulumi up
uses: pulumi/actions@v4
with:
command: up
stack-name: alicloud/baseline-prod
work-dir: iac_modules/pulumi
env:
ALICLOUD_ACCESS_KEY_ID: ${{ secrets.ALICLOUD_ACCESS_KEY_ID }}
ALICLOUD_ACCESS_KEY_SECRET: ${{ secrets.ALICLOUD_ACCESS_KEY_SECRET }}
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}

View File

@ -0,0 +1,8 @@
audit:
actiontrail:
enabled: true
name: lz-mvp-actiontrail
oss_bucket_ref: lz-mvp-actiontrail-logs
oss_key_prefix: actiontrail
trail_region: cn-hangzhou
event_rw: All

View File

@ -0,0 +1,5 @@
alicloud:
region: cn-hangzhou
default_tags:
project: landingzone-mvp
owner: your-github-handle

View File

@ -0,0 +1,27 @@
config_service:
recorder:
name: lz-config-recorder
resource_types:
- ACS::ECS::Instance
- ACS::OSS::Bucket
- ACS::VPC::VSwitch
delivery_channel:
name: lz-config-delivery
display_name: LandingZoneBaseline
type: OSS
target_arn: acs:oss:cn-hangzhou:${AliUid}:lz-mvp-actiontrail-logs
assume_role_arn: acs:ram::${AliUid}:role/aliyunconfigdefaultrole
description: Deliver baseline compliance evaluations to OSS
status: 1
rules:
- name: lz-required-env-tag
description: Ensure env tag exists on core resources
source_identifier: ecs-instance-required-tag
source_owner: ALIYUN
risk_level: 2
trigger_types: ConfigurationItemChangeNotification
resource_types_scopes:
- ACS::ECS::Instance
input_parameters:
tagKey: env
maximum_execution_frequency: TwentyFour_Hours

View File

@ -0,0 +1,28 @@
identity:
users:
- name: ops-automation
display_name: Landing Zone Automation
comments: Dedicated RAM user for IaC pipelines
policies:
- name: AliyunOSSFullAccess
type: System
- name: AliyunVPCFullAccess
type: System
- name: AliyunConfigFullAccess
type: System
- name: audit-viewer
display_name: Landing Zone Auditor
comments: Read-only access for monitoring
policies:
- name: ReadOnlyAccess
type: System
groups:
- name: ops-admins
comments: Baseline operations team
policies:
- name: AliyunConfigFullAccess
type: System
- name: AliyunVPCFullAccess
type: System
users:
- ops-automation

View File

@ -0,0 +1,18 @@
network:
vpcs:
- name: lz-main-vpc
cidr_block: 10.10.0.0/16
description: Landing zone baseline VPC
tags:
env: shared
vswitches:
- name: lz-prod-subnet
cidr_block: 10.10.1.0/24
zone_id: cn-hangzhou-h
tags:
env: prod
- name: lz-test-subnet
cidr_block: 10.10.2.0/24
zone_id: cn-hangzhou-h
tags:
env: test

View File

@ -0,0 +1,18 @@
security:
groups:
- name: lz-base-sg
vpc: lz-main-vpc
description: Baseline security group allowing outbound traffic only
tags:
env: shared
ingress:
- protocol: tcp
port_range: "22/22"
cidr_ip: 0.0.0.0/0
description: Temporary SSH access for break-glass
policy: accept
egress:
- protocol: all
port_range: "-1/-1"
cidr_ip: 0.0.0.0/0
policy: accept

View File

@ -0,0 +1,17 @@
storage:
oss_buckets:
- name: lz-mvp-actiontrail-logs
bucket: lz-mvp-actiontrail-logs
storage_class: Standard
versioning:
status: Enabled
lifecycle_rules:
- id: archive-audit-logs
enabled: true
transitions:
- storage_class: IA
days: 180
- storage_class: Archive
days: 365
tags:
env: prod

View File

@ -1,42 +1,55 @@
# ☁️ Pulumi 多模块 AWS IaaS 模板
# ☁️ Pulumi Alicloud Landing Zone Baseline
该目录基于 Pulumi 构建,支持以下模块:
该目录提供了与《docs/landingzone/alicloud-landingzone-mvp-single-account.md》一致的 Pulumi Python 实现,用于在单账号阿里云环境中快速落地身份、审计、配置合规、网络与安全基线。
## ✅ 模块支持
## ✅ 模块拆分
- VPC + 子网(自动分配 CIDR支持 enabled 控制)
- 安全组(通过 firewall.yaml 控制 ingress/egress
- EC2 实例(支持 spot、AMI keyword、user_data、自动标签
- AMI 自动识别(支持 `Ubuntu 22.04`, `Rocky Linux 8.10` 等)
- Pulumi Credentials 自动加载 ~/.aws/profile
- 环境配置文件支持多目录(如 `config/sit/`, `config/prod/`
| 模块 | 说明 |
| --- | --- |
| `modules/identity/ram.py` | 创建 RAM 用户、用户组以及策略绑定,覆盖 `ops-automation`、`audit-viewer` 等身份。 |
| `modules/storage/oss.py` | 管理 OSS 日志桶,支持版本控制与生命周期策略,用于 ActionTrail & Pulumi 状态。 |
| `modules/audit/actiontrail.py` | 启用 ActionTrail将操作日志投递到指定 OSS Bucket。 |
| `modules/config_service/baseline.py` | 初始化 Cloud Config Recorder、Delivery Channel 与基础规则。 |
| `modules/network/vpc.py` | 构建单 VPC + 双可用区交换机的网络基线。 |
| `modules/security/security_groups.py` | 创建默认安全组及入站/出站规则,默认仅放行出站流量。 |
## 📂 配置结构
`config/alicloud/` 目录提供示例配置,按照 Landing Zone 设计拆分:
- `base.yaml`:区域与全局标签
- `identity.yaml`RAM 用户/用户组与策略
- `storage.yaml`ActionTrail 日志桶(版本控制+生命周期)
- `network.yaml`VPC / 交换机拓扑
- `security.yaml`:安全组与规则
- `audit.yaml`ActionTrail 开关与目标 OSS
- `config-service.yaml`Cloud Config 基线配置
> ⚠️ 其中 `target_arn`、`assume_role_arn` 等字段需替换为实际账号 ID`${AliUid}`)。
## 🚀 使用方式
## 🚀 快速部署
```bash
初始化并部署 bash scripts/run.sh sit up
## 📂 配置说明
# config/sit/instances.yaml
```yaml
instances:
- name: master-1
ami: Ubuntu 22.04
type: t3.micro
subnet: public-subnet-1
disk_size_gb: 20
lifecycle: spot
ttl: 1h
```yaml
## 🧹 清理资源
删除资源 + 刷新状态
- bash scripts/run.sh sit down
- pulumi refresh --yes
## 📦 依赖
Python >= 3.8
# 安装依赖
pip install -r requirements.txt
AWS CLI 已配置 ~/.aws/credentials
# 设置配置目录(默认读取 config/,此处指向示例配置)
export CONFIG_PATH=config/alicloud
# Pulumi 登录(可选:使用 OSS backend 或 Pulumi Service
pulumi login
# 预览或部署
pulumi preview --cwd iac_modules/pulumi
pulumi up --cwd iac_modules/pulumi
```
## 🔒 GitHub Actions 自动化
新增 `.github/workflows/iac-pipeline-alicloud-landingzone-baseline.yaml`,结合 `pulumi/actions@v4` 实现 Preview + 主干自动部署,使用 Secrets 管理 `ALICLOUD_ACCESS_KEY_ID/SECRET``PULUMI_ACCESS_TOKEN`
## 🧩 扩展建议
- 根据生产需求扩展 Cloud Config 规则或引入企业版聚合器。
- 在安全组模块中追加环境专属规则Prod/Test
- 利用 `pulumi stack` 拆分 dev/prod 状态,配合 GitHub Environments 审批。

View File

@ -1,155 +1,59 @@
from __future__ import annotations
import os
import sys
from typing import Dict
import pulumi
import pulumi_aws as aws
import boto3
from botocore.exceptions import ProfileNotFound, NoCredentialsError
import pulumi_alicloud as alicloud
from modules import (
create_oss_buckets,
create_ram_identity,
create_security_groups,
create_vpc_topology,
enable_actiontrail,
enable_config_baseline,
)
from utils.config_loader import load_merged_config
from modules.vpc.vpc import create_vpcs
from modules.security_group.sg import create_security_group
from modules.ec2.ec2_instance import create_instances
# ✅ 加载配置
config_dir = os.environ.get("CONFIG_PATH", "config")
config = load_merged_config(config_dir)
aws_conf = config.get("aws", {})
region = aws_conf.get("region", "us-east-1")
profile = aws_conf.get("profile", "default")
key_pairs = aws_conf.get("key_pairs", [])
# ✅ 设置 AWS 配置
aws.config.region = region
aws.config.profile = profile
pulumi.runtime.set_config("aws:region", region)
# ✅ 检查 AWS 凭证
try:
session = boto3.Session(profile_name=profile)
credentials = session.get_credentials()
if not credentials:
raise NoCredentialsError()
except (ProfileNotFound, NoCredentialsError):
pulumi.log.error(f"❌ AWS profile '{profile}' 无效或找不到凭证")
sys.exit(1)
else:
pulumi.log.info(f"✅ AWS credentials loaded (profile: {profile}, region: {region})")
# ✅ 初始化资源容器
global_dependencies = []
vpc = None
subnets = {}
sg = None
key_pair = None
# ========================
# ✅ [模块] VPC + Subnets
vpc_confs = config.get("vpcs", [])
if vpc_confs:
vpc_results = create_vpcs(vpc_confs, region)
all_subnets = {}
for vpc_name, result in vpc_results.items():
pulumi.log.info(f"✅ VPC {vpc_name} 已创建")
global_dependencies.append(result["vpc"])
global_dependencies.extend(result["subnets"].values())
all_subnets.update(result["subnets"])
subnets = all_subnets
else:
pulumi.log.warn("⏭️ 跳过 VPC 创建")
# ========================
# ✅ [模块] 多个 Security Group
# ========================
# ✅ 存储 VPC 结果(名字 → 资源)
vpc_map = {vpc_name: result["vpc"] for vpc_name, result in vpc_results.items()}
firewall_rules = config.get("firewall_rules", [])
security_groups = {}
if firewall_rules and config.get("security_group", {}).get("enabled", True):
for rule in firewall_rules:
if not rule.get("enabled", True):
pulumi.log.warn(f"⏭️ 跳过未启用的 SG: {rule.get('name')}")
continue
vpc_name = rule.get("vpc_name")
if not vpc_name or vpc_name not in vpc_map:
pulumi.log.warn(f"❌ 未找到指定 VPC: {vpc_name},跳过 {rule.get('name')}")
continue
vpc_resource = vpc_map[vpc_name]
sg = create_security_group(vpc_resource.id, rule)
name = rule.get("name", "sg-unnamed")
security_groups[name] = sg
global_dependencies.append(sg)
# 确保 SG 创建等待 VPC 完成
pulumi.log.info(f"✅ Security Group '{name}' 已绑定 VPC: {vpc_name}")
pulumi.export("security_groups", {k: sg.id for k, sg in security_groups.items()})
else:
pulumi.log.warn("⏭️ 跳过 Security Group 创建")
# ========================
# ✅ [模块] SSH Key Pair
# ========================
if key_pairs:
key_cfg = key_pairs[0]
public_key_path = os.path.expanduser(key_cfg["key_file"])
if not os.path.exists(public_key_path):
raise FileNotFoundError(f"❌ SSH 公钥文件不存在: {public_key_path}")
with open(public_key_path) as f:
public_key = f.read().strip()
key_pair = aws.ec2.KeyPair("main-key",
key_name=key_cfg["name"],
public_key=public_key
)
global_dependencies.append(key_pair)
pulumi.log.info("✅ SSH KeyPair 已创建")
else:
pulumi.log.warn("⏭️ 跳过 KeyPair 创建")
def main() -> None:
config_dir = os.environ.get("CONFIG_PATH", "config/alicloud")
config = load_merged_config(config_dir)
# ========================
# ✅ [模块] EC2 实例部署
# ========================
alicloud_conf: Dict[str, object] = config.get("alicloud", {}) # type: ignore[assignment]
region = alicloud_conf.get("region")
profile = alicloud_conf.get("profile")
default_tags = alicloud_conf.get("default_tags", {})
# ========================
# ✅ [模块] EC2 实例部署
# ========================
instances_conf = config.get("instances", [])
ec2_outputs = {}
if region:
alicloud.config.region = region
pulumi.export("region", region)
if profile:
alicloud.config.profile = profile
if instances_conf and config.get("ec2", {}).get("enabled", True):
# ✅ 遍历每个实例,按 sg_names 匹配对应 Security Group ID 列表
def resolve_security_group_ids(instance_conf, sg_map):
sg_ids = []
for name in instance_conf.get("sg_names", []):
sg = sg_map.get(name)
if sg:
sg_ids.append(sg.id)
else:
pulumi.log.warn(f"⚠️ 实例 {instance_conf['name']} 引用了未知 SG: {name}")
return sg_ids
# ✅ 批量传入所有实例配置
ec2_outputs = create_instances(
instances_conf,
subnets,
security_groups, # ✅ 多 SG 映射 sg_name → resource
key_pair.key_name if key_pair else None,
depends_on=global_dependencies
pulumi.log.info(
"Loaded Alicloud configuration",
)
pulumi.log.info("✅ EC2 实例已创建")
else:
pulumi.log.warn("⏭️ 跳过 EC2 实例部署")
identity_results = create_ram_identity(config.get("identity", {}))
# ========================
# ✅ 导出所有实例信息
# ========================
for name, ip in ec2_outputs.items():
pulumi.export(f"{name}", ip)
buckets = create_oss_buckets(config.get("storage", {}), default_tags)
network_results = create_vpc_topology(config.get("network", {}), default_tags)
vpcs = network_results.get("vpcs", {})
security_groups = create_security_groups(
config.get("security", {}), vpcs, default_tags
)
enable_actiontrail(config.get("audit", {}), buckets)
enable_config_baseline(config.get("config_service", {}), buckets)
pulumi.export("ram_user_count", len(identity_results.get("users", {})))
pulumi.export("security_group_count", len(security_groups))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,17 @@
from __future__ import annotations
from .audit.actiontrail import enable_actiontrail
from .config_service.baseline import enable_config_baseline
from .identity.ram import create_ram_identity
from .network.vpc import create_vpc_topology
from .security.security_groups import create_security_groups
from .storage.oss import create_oss_buckets
__all__ = [
"enable_actiontrail",
"enable_config_baseline",
"create_ram_identity",
"create_vpc_topology",
"create_security_groups",
"create_oss_buckets",
]

View File

@ -0,0 +1,53 @@
from __future__ import annotations
from typing import Mapping, Optional
import pulumi
import pulumi_alicloud as alicloud
def enable_actiontrail(
audit_conf: Mapping[str, object],
buckets: Mapping[str, pulumi.Resource],
) -> Optional[pulumi.Resource]:
trail_conf = audit_conf.get("actiontrail") if audit_conf else None
if not trail_conf:
pulumi.log.info("ActionTrail configuration not provided; skipping setup")
return None
if not trail_conf.get("enabled", True):
pulumi.log.info("ActionTrail disabled via configuration; skipping setup")
return None
bucket_name = trail_conf.get("oss_bucket_name")
bucket_reference = trail_conf.get("oss_bucket_ref")
if not bucket_name and bucket_reference:
bucket = buckets.get(bucket_reference)
if bucket is None:
pulumi.log.warn(
f"ActionTrail bucket reference '{bucket_reference}' could not be resolved"
)
else:
bucket_name = bucket.bucket
if not bucket_name:
pulumi.log.warn("No OSS bucket specified for ActionTrail; skipping trail creation")
return None
name = trail_conf.get("name", "landingzone-actiontrail")
trail = alicloud.actiontrail.Trail(
name,
trail_name=trail_conf.get("trail_name", name),
event_rw=trail_conf.get("event_rw", "All"),
oss_bucket_name=bucket_name,
oss_key_prefix=trail_conf.get("oss_key_prefix"),
trail_region=trail_conf.get("trail_region"),
is_organization_trail=trail_conf.get("is_organization_trail"),
oss_write_role_arn=trail_conf.get("oss_write_role_arn"),
sls_project_arn=trail_conf.get("sls_project_arn"),
sls_write_role_arn=trail_conf.get("sls_write_role_arn"),
status=trail_conf.get("status"),
)
pulumi.export("actiontrail_trail", trail.trail_name)
return trail

View File

@ -0,0 +1,33 @@
from __future__ import annotations
from typing import Dict, Mapping, Optional
import pulumi
def merge_tags(*tag_sets: Optional[Mapping[str, str]]) -> Optional[Dict[str, str]]:
"""Merge multiple tag dictionaries while filtering out falsy values."""
merged: Dict[str, str] = {}
for tags in tag_sets:
if not tags:
continue
for key, value in tags.items():
if value is None:
continue
merged[str(key)] = str(value)
return merged or None
def taggable_args(
default_tags: Optional[Mapping[str, str]],
resource_tags: Optional[Mapping[str, str]] = None,
) -> Dict[str, object]:
"""Helper to build Pulumi args with merged tags."""
tags = merge_tags(default_tags, resource_tags)
return {"tags": tags} if tags else {}
def annotate_resource_with_tags(resource: pulumi.CustomResource, tags: Optional[Mapping[str, str]]) -> None:
"""Attach merged tags to the Pulumi resource options after creation."""
if tags:
pulumi.export(f"tags::{resource._name}", tags)

View File

@ -0,0 +1,95 @@
from __future__ import annotations
from typing import Dict, Mapping
import pulumi
import pulumi_alicloud as alicloud
def enable_config_baseline(
config_conf: Mapping[str, object],
buckets: Mapping[str, pulumi.Resource],
) -> Dict[str, object]:
if not config_conf:
pulumi.log.info("Cloud Config configuration not provided; skipping setup")
return {}
resources: Dict[str, object] = {}
recorder_conf = config_conf.get("recorder")
recorder = None
if recorder_conf:
recorder = alicloud.cfg.ConfigurationRecorder(
recorder_conf.get("name", "config-recorder"),
enterprise_edition=recorder_conf.get("enterprise_edition"),
resource_types=recorder_conf.get("resource_types"),
)
resources["recorder"] = recorder
delivery_conf = config_conf.get("delivery_channel")
delivery_channel = None
if delivery_conf:
target_arn = delivery_conf.get("target_arn")
bucket_ref = delivery_conf.get("oss_bucket_ref")
if not target_arn and bucket_ref:
bucket = buckets.get(bucket_ref)
if bucket:
target_arn = delivery_conf.get("target_arn_fallback")
pulumi.log.info(
"Delivery channel target ARN not provided explicitly; using fallback"
)
else:
pulumi.log.warn(
f"Delivery channel bucket reference '{bucket_ref}' could not be resolved"
)
if target_arn:
delivery_channel = alicloud.cfg.DeliveryChannel(
delivery_conf.get("name", "config-delivery-channel"),
delivery_channel_name=delivery_conf.get("display_name"),
description=delivery_conf.get("description"),
delivery_channel_type=delivery_conf.get("type", "OSS"),
delivery_channel_target_arn=target_arn,
delivery_channel_assume_role_arn=delivery_conf.get("assume_role_arn"),
delivery_channel_condition=delivery_conf.get("condition"),
status=delivery_conf.get("status"),
)
resources["delivery_channel"] = delivery_channel
else:
pulumi.log.warn("Cloud Config delivery channel requires a target ARN; skipping")
for rule_conf in config_conf.get("rules", []) or []:
required_fields = ["name", "source_identifier"]
if any(field not in rule_conf for field in required_fields):
pulumi.log.warn(
f"Skipping Cloud Config rule definition due to missing fields: {rule_conf}"
)
continue
rule_args = {
"rule_name": rule_conf["name"],
"description": rule_conf.get("description"),
"risk_level": rule_conf.get("risk_level", 2),
"source_owner": rule_conf.get("source_owner", "ALIYUN"),
"source_identifier": rule_conf["source_identifier"],
"config_rule_trigger_types": rule_conf.get(
"trigger_types", "ConfigurationItemChangeNotification"
),
"resource_types_scopes": rule_conf.get("resource_types_scopes"),
"region_ids_scope": rule_conf.get("region_ids_scope"),
"resource_group_ids_scope": rule_conf.get("resource_group_ids_scope"),
"tag_key_scope": rule_conf.get("tag_key_scope"),
"tag_value_scope": rule_conf.get("tag_value_scope"),
"input_parameters": rule_conf.get("input_parameters"),
"maximum_execution_frequency": rule_conf.get("maximum_execution_frequency"),
"status": rule_conf.get("status"),
}
rule_args = {key: value for key, value in rule_args.items() if value is not None}
if delivery_channel:
rule_args["delivery_channel_id"] = delivery_channel.id
if recorder:
rule_args["configuration_recorder_id"] = recorder.id
rule = alicloud.cfg.Rule(rule_conf["name"], **rule_args)
resources.setdefault("rules", {})[rule_conf["name"]] = rule
return resources

View File

@ -1,97 +0,0 @@
import os
import pulumi
import pulumi_aws as aws
from .utils import resolve_ami
def create_instances(instances_config, subnets_dict, sg_map: dict, key_name, depends_on=None):
outputs = {}
for instance_cfg in instances_config:
name = instance_cfg["name"]
subnet_name = instance_cfg["subnet"]
subnet = subnets_dict[subnet_name]
subnet_id = subnet.id
region = aws.config.region
ami = resolve_ami(instance_cfg["ami"], region)
instance_type = instance_cfg["type"]
disk_size = instance_cfg["disk_size_gb"]
lifecycle = instance_cfg.get("lifecycle", "ondemand")
ttl = instance_cfg.get("ttl", "none")
env = instance_cfg.get("env", "dev")
owner = instance_cfg.get("owner", "unknown")
user_data_path = instance_cfg.get("user_data")
private_ip = instance_cfg.get("private_ip", None)
associate_public_ip = instance_cfg.get("associate_public_ip", True)
# ✅ User data
user_data = None
if user_data_path:
expanded_path = os.path.expanduser(user_data_path)
if os.path.exists(expanded_path):
with open(expanded_path, "r") as f:
user_data = f.read()
else:
pulumi.log.warn(f"⚠️ user_data 文件不存在: {expanded_path}")
tags = {
"Name": name,
"Lifecycle": lifecycle,
"TTL": ttl,
"Environment": env,
"Owner": owner,
}
# ✅ Spot 实例配置
instance_market_options = None
if lifecycle == "spot":
instance_market_options = aws.ec2.InstanceInstanceMarketOptionsArgs(
market_type="spot",
spot_options=aws.ec2.InstanceInstanceMarketOptionsSpotOptionsArgs(
instance_interruption_behavior="terminate",
spot_instance_type="one-time"
)
)
# ✅ 解析 security group ids通过名字
sg_names = instance_cfg.get("sg_names", [])
security_group_ids = []
for sg_name in sg_names:
sg = sg_map.get(sg_name)
if sg:
security_group_ids.append(sg.id)
else:
pulumi.log.warn(f"⚠️ 实例 '{name}' 引用的 SG '{sg_name}' 未找到,已跳过")
# ✅ 构建依赖项
resource_dependencies = [subnet]
for sg in security_group_ids:
resource_dependencies.append(sg_map.get(sg_name))
if depends_on:
resource_dependencies.extend(depends_on)
# ✅ 创建实例
ec2 = aws.ec2.Instance(name,
ami=ami,
instance_type=instance_type,
key_name=key_name,
subnet_id=subnet_id,
private_ip=private_ip,
associate_public_ip_address=associate_public_ip,
vpc_security_group_ids=security_group_ids,
user_data=user_data,
root_block_device={
"volume_size": disk_size,
"volume_type": "gp2"
},
instance_market_options=instance_market_options,
tags=tags,
opts=pulumi.ResourceOptions(depends_on=resource_dependencies)
)
outputs[name + "_id"] = ec2.id
outputs[name + "_public_ip"] = ec2.public_ip
outputs[name + "_private_ip"] = ec2.private_ip
return outputs

View File

@ -1,43 +0,0 @@
import pulumi_aws as aws
AMI_MAP = {
"ubuntu-22.04": ("099720109477", "*ubuntu*22.04*"),
"ubuntu-24.04": ("099720109477", "*ubuntu*24.04*"),
"rocky-8.10": ("792107900819", "Rocky-8-ec2-8.10*"),
"amazonlinux-2": ("137112412989", "amzn2-ami-hvm-*-gp2"),
"amazonlinux-2023": ("137112412989", "al2023-ami-*-x86_64"),
"debian-12": ("136693071363", "debian-12-*"),
"almalinux-9": ("151447241410", "AlmaLinux-9-*"),
}
def query_latest_ami(owner: str, name_filter: str, architecture: str = "x86_64") -> str:
result = aws.ec2.get_ami(
most_recent=True,
owners=[owner],
filters=[
{"name": "name", "values": [name_filter]},
{"name": "architecture", "values": [architecture]},
{"name": "virtualization-type", "values": ["hvm"]},
],
)
return result.id
def resolve_ami(ami_keyword: str, region: str, architecture: str = "x86_64") -> str:
if not aws.config.region:
raise ValueError("❌ AWS region is not set. Please set aws.config.region before calling resolve_ami")
if ami_keyword.startswith("ami-"):
return ami_keyword
keyword = ami_keyword.lower()
print(f"🔍 Resolving AMI for keyword='{keyword}' in region='{region}' with arch='{architecture}'")
if keyword in AMI_MAP:
owner, name_filter = AMI_MAP[keyword]
try:
return query_latest_ami(owner, name_filter, architecture)
except Exception as e:
raise ValueError(f"❌ Failed to find AMI for '{keyword}' in region '{region}': {e}")
raise ValueError(f"❌ Unsupported AMI keyword: {ami_keyword}. Supported keywords: {list(AMI_MAP.keys())}")

View File

@ -0,0 +1,89 @@
from __future__ import annotations
from typing import Dict, Iterable, Mapping, Optional
import pulumi
import pulumi_alicloud as alicloud
PolicyConfig = Mapping[str, str]
UserConfig = Mapping[str, object]
GroupConfig = Mapping[str, object]
def _normalize_policies(policies: Optional[Iterable[PolicyConfig]]) -> Iterable[PolicyConfig]:
return policies or []
def create_ram_identity(
identity_conf: Mapping[str, object],
) -> Dict[str, Dict[str, pulumi.Resource]]:
"""Create RAM users, groups, and policy attachments based on configuration."""
users_conf = identity_conf.get("users", []) or []
groups_conf = identity_conf.get("groups", []) or []
users: Dict[str, pulumi.Resource] = {}
groups: Dict[str, pulumi.Resource] = {}
for user_conf in users_conf:
name = user_conf["name"]
args = {
"name": name,
"display_name": user_conf.get("display_name"),
"email": user_conf.get("email"),
"mobile": user_conf.get("mobile"),
"comments": user_conf.get("comments"),
"force": user_conf.get("force_destroy"),
}
args = {k: v for k, v in args.items() if v is not None}
user = alicloud.ram.User(name, **args)
users[name] = user
for index, policy in enumerate(_normalize_policies(user_conf.get("policies"))):
alicloud.ram.UserPolicyAttachment(
f"{name}-policy-{index}",
policy_name=policy["name"],
policy_type=policy.get("type", "System"),
user_name=name,
opts=pulumi.ResourceOptions(depends_on=[user]),
)
for group_conf in groups_conf:
name = group_conf["name"]
args = {
"group_name": name,
"comments": group_conf.get("comments"),
"force": group_conf.get("force_destroy"),
}
args = {k: v for k, v in args.items() if v is not None}
group = alicloud.ram.Group(name, **args)
groups[name] = group
for index, policy in enumerate(_normalize_policies(group_conf.get("policies"))):
alicloud.ram.GroupPolicyAttachment(
f"{name}-policy-{index}",
group_name=name,
policy_name=policy["name"],
policy_type=policy.get("type", "System"),
opts=pulumi.ResourceOptions(depends_on=[group]),
)
members = group_conf.get("users") or []
missing_members = [user for user in members if user not in users]
if missing_members:
pulumi.log.warn(
f"RAM group '{name}' references users not defined in configuration: {', '.join(missing_members)}"
)
if members:
alicloud.ram.GroupMembership(
f"{name}-membership",
group_name=name,
user_names=[user for user in members if user in users],
opts=pulumi.ResourceOptions(depends_on=[group] + [users[user] for user in members if user in users]),
)
pulumi.export("ram_users", {name: user.name for name, user in users.items()})
pulumi.export("ram_groups", {name: group.group_name for name, group in groups.items()})
return {"users": users, "groups": groups}

View File

@ -0,0 +1,51 @@
from __future__ import annotations
from typing import Dict, Mapping, Optional
import pulumi
import pulumi_alicloud as alicloud
from ..common.tags import merge_tags
def create_vpc_topology(
network_conf: Mapping[str, object],
default_tags: Optional[Mapping[str, str]] = None,
) -> Dict[str, Dict[str, pulumi.Resource]]:
"""Create VPCs and VSwitches as described in the network configuration."""
vpcs_conf = network_conf.get("vpcs", []) or []
vpcs: Dict[str, pulumi.Resource] = {}
vswitches: Dict[str, pulumi.Resource] = {}
for vpc_conf in vpcs_conf:
name = vpc_conf["name"]
vpc_tags = merge_tags(default_tags, vpc_conf.get("tags"))
vpc = alicloud.vpc.Network(
name,
vpc_name=vpc_conf.get("display_name", name),
cidr_block=vpc_conf["cidr_block"],
description=vpc_conf.get("description"),
**({"tags": vpc_tags} if vpc_tags else {}),
)
vpcs[name] = vpc
for switch_conf in vpc_conf.get("vswitches", []) or []:
switch_name = switch_conf["name"]
switch_tags = merge_tags(vpc_tags, switch_conf.get("tags"))
vswitch = alicloud.vpc.Switch(
switch_name,
vswitch_name=switch_conf.get("display_name", switch_name),
vpc_id=vpc.id,
cidr_block=switch_conf["cidr_block"],
zone_id=switch_conf["zone_id"],
description=switch_conf.get("description"),
**({"tags": switch_tags} if switch_tags else {}),
opts=pulumi.ResourceOptions(depends_on=[vpc]),
)
vswitches[switch_name] = vswitch
pulumi.export("vpc_ids", {name: vpc.id for name, vpc in vpcs.items()})
pulumi.export("vswitch_ids", {name: sw.id for name, sw in vswitches.items()})
return {"vpcs": vpcs, "vswitches": vswitches}

View File

@ -0,0 +1,86 @@
from __future__ import annotations
from typing import Dict, Mapping, Optional
import pulumi
import pulumi_alicloud as alicloud
from ..common.tags import merge_tags
def create_security_groups(
security_conf: Mapping[str, object],
vpcs: Mapping[str, pulumi.Resource],
default_tags: Optional[Mapping[str, str]] = None,
) -> Dict[str, pulumi.Resource]:
groups_conf = security_conf.get("groups", []) or []
security_groups: Dict[str, pulumi.Resource] = {}
for group_conf in groups_conf:
name = group_conf["name"]
vpc_name = group_conf.get("vpc")
vpc = vpcs.get(vpc_name) if vpc_name else None
if vpc is None:
pulumi.log.warn(f"Skip security group '{name}' because VPC '{vpc_name}' was not found")
continue
tags = merge_tags(default_tags, group_conf.get("tags"))
sg = alicloud.ecs.SecurityGroup(
name,
security_group_name=group_conf.get("display_name", name),
description=group_conf.get("description"),
security_group_type=group_conf.get("type", "normal"),
vpc_id=vpc.id,
**({"tags": tags} if tags else {}),
opts=pulumi.ResourceOptions(depends_on=[vpc]),
)
security_groups[name] = sg
for index, rule in enumerate(group_conf.get("ingress", []) or []):
_create_rule(sg, rule, "ingress", index)
for index, rule in enumerate(group_conf.get("egress", []) or []):
_create_rule(sg, rule, "egress", index)
pulumi.export("security_group_ids", {name: sg.id for name, sg in security_groups.items()})
return security_groups
def _create_rule(
sg: pulumi.Resource,
rule_conf: Mapping[str, object],
rule_type: str,
index: int,
) -> None:
protocol = rule_conf.get("protocol", "all")
cidr_ip = rule_conf.get("cidr_ip")
ipv6_cidr_ip = rule_conf.get("ipv6_cidr_ip")
source_sg = rule_conf.get("source_security_group_id")
prefix_list_id = rule_conf.get("prefix_list_id")
if not any([cidr_ip, ipv6_cidr_ip, source_sg, prefix_list_id]):
pulumi.log.warn(
f"Security group {sg._name} {rule_type} rule #{index} does not define a source/destination; skipping"
)
return
args = {
"security_group_id": sg.id,
"type": rule_type,
"ip_protocol": protocol,
"port_range": rule_conf.get("port_range", "-1/-1"),
"cidr_ip": cidr_ip,
"ipv6_cidr_ip": ipv6_cidr_ip,
"source_security_group_id": source_sg,
"prefix_list_id": prefix_list_id,
"policy": rule_conf.get("policy", "accept"),
"description": rule_conf.get("description"),
"priority": rule_conf.get("priority"),
"nic_type": rule_conf.get("nic_type"),
}
args = {key: value for key, value in args.items() if value is not None}
alicloud.ecs.SecurityGroupRule(
f"{sg._name}-{rule_type}-{index}",
**args,
opts=pulumi.ResourceOptions(depends_on=[sg]),
)

View File

@ -1,66 +0,0 @@
import pulumi_aws as aws
from pulumi_aws.ec2 import SecurityGroup, SecurityGroupIngressArgs, SecurityGroupEgressArgs
def create_security_group(vpc_id: str, rule_config: dict) -> SecurityGroup:
"""
创建 Security Group支持 ingress/egress 配置包括 TCP, UDP, ICMP
:param vpc_id: 目标 VPC ID
:param rule_config: 单个 firewall_rules 的字典配置
:return: 创建的 SecurityGroup 资源对象
"""
ingress_rules = []
source_ranges = rule_config.get("source_ranges", ["0.0.0.0/0"])
egress_ranges = rule_config.get("egress_ranges", ["0.0.0.0/0"])
for allow_rule in rule_config.get("allow", []):
protocol = allow_rule.get("protocol", "tcp").lower()
ports = allow_rule.get("ports", [])
# ICMP 无需端口处理
if protocol == "icmp":
ingress_rules.append(
SecurityGroupIngressArgs(
protocol="icmp",
from_port=-1,
to_port=-1,
cidr_blocks=source_ranges
)
)
continue
# 处理 TCP/UDP 等需要端口的协议
for port in ports:
if isinstance(port, str) and port.lower() in ["*", "any", "all"]:
from_port, to_port = 0, 65535
else:
port = int(port)
from_port = to_port = port
ingress_rules.append(
SecurityGroupIngressArgs(
protocol=protocol,
from_port=from_port,
to_port=to_port,
cidr_blocks=source_ranges
)
)
# 创建 Security Group
sg = aws.ec2.SecurityGroup(
rule_config.get("name", "default-sg"),
vpc_id=vpc_id,
description=f"Security Group: {rule_config.get('name', 'N/A')}",
ingress=ingress_rules,
egress=[
SecurityGroupEgressArgs(
protocol="-1",
from_port=0,
to_port=0,
cidr_blocks=egress_ranges
)
],
tags={"Name": rule_config.get("name", "default-sg")}
)
return sg

View File

@ -0,0 +1,103 @@
from __future__ import annotations
from typing import Dict, List, Mapping, Optional
import pulumi
import pulumi_alicloud as alicloud
from ..common.tags import merge_tags
LifecycleConfig = Mapping[str, object]
def create_oss_buckets(
storage_conf: Mapping[str, object],
default_tags: Optional[Mapping[str, str]] = None,
) -> Dict[str, pulumi.Resource]:
buckets_conf = storage_conf.get("oss_buckets", []) or []
buckets: Dict[str, pulumi.Resource] = {}
for bucket_conf in buckets_conf:
name = bucket_conf["name"]
tags = merge_tags(default_tags, bucket_conf.get("tags"))
lifecycle_rules = [_build_lifecycle_rule(rule) for rule in bucket_conf.get("lifecycle_rules", [])]
lifecycle_rules = [rule for rule in lifecycle_rules if rule is not None]
bucket = alicloud.oss.Bucket(
name,
bucket=bucket_conf.get("bucket", name),
storage_class=bucket_conf.get("storage_class", "Standard"),
acl=bucket_conf.get("acl"),
force_destroy=bucket_conf.get("force_destroy", False),
logging=_build_logging(bucket_conf.get("logging")),
versioning=_build_versioning(bucket_conf.get("versioning")),
lifecycle_rules=lifecycle_rules or None,
**({"tags": tags} if tags else {}),
)
buckets[name] = bucket
pulumi.export("oss_bucket_names", {name: bucket.bucket for name, bucket in buckets.items()})
return buckets
def _build_versioning(config: Optional[Mapping[str, object]]) -> Optional[alicloud.oss.BucketVersioningArgs]:
if not config:
return None
if isinstance(config, str):
status = config
else:
status = config.get("status", "Enabled")
return alicloud.oss.BucketVersioningArgs(status=status)
def _build_logging(config: Optional[Mapping[str, object]]) -> Optional[alicloud.oss.BucketLoggingArgs]:
if not config:
return None
target_bucket = config.get("target_bucket")
if not target_bucket:
return None
return alicloud.oss.BucketLoggingArgs(
target_bucket=target_bucket,
target_prefix=config.get("target_prefix"),
)
def _build_lifecycle_rule(config: LifecycleConfig) -> Optional[alicloud.oss.BucketLifecycleRuleArgs]:
if not config:
return None
transitions = [
alicloud.oss.BucketLifecycleRuleTransitionArgs(
storage_class=transition["storage_class"],
days=transition.get("days"),
created_before_date=transition.get("created_before_date"),
is_access_time=transition.get("is_access_time"),
return_to_std_when_visit=transition.get("return_to_standard_when_visited"),
)
for transition in config.get("transitions", [])
if "storage_class" in transition
]
expiration_cfg = config.get("expiration")
if expiration_cfg is None and config.get("expiration_days"):
expiration_cfg = {"days": config["expiration_days"]}
expirations: List[alicloud.oss.BucketLifecycleRuleExpirationArgs] = []
if expiration_cfg:
expirations.append(
alicloud.oss.BucketLifecycleRuleExpirationArgs(
days=expiration_cfg.get("days"),
date=expiration_cfg.get("date"),
created_before_date=expiration_cfg.get("created_before_date"),
expired_object_delete_marker=expiration_cfg.get("expired_object_delete_marker"),
)
)
return alicloud.oss.BucketLifecycleRuleArgs(
id=config.get("id"),
enabled=config.get("enabled", True),
prefix=config.get("prefix"),
transitions=transitions or None,
expirations=expirations or None,
)

View File

@ -1,76 +0,0 @@
import pulumi_aws as aws
import pulumi
def create_vpcs(vpc_list, region):
results = {}
for vpc_conf in vpc_list:
result = create_vpc(vpc_conf, region)
results[vpc_conf["name"]] = result
return results
def create_vpc(vpc_conf, region):
vpc = aws.ec2.Vpc(vpc_conf['name'],
cidr_block=vpc_conf['cidr_block'],
enable_dns_support=True,
enable_dns_hostnames=True,
tags={"Name": vpc_conf['name']}
)
# 判断是否包含公有子网
has_public = any(subnet["type"] == "public" for subnet in vpc_conf["subnets"])
igw = aws.ec2.InternetGateway(f"{vpc_conf['name']}-igw", vpc_id=vpc.id) if has_public else None
subnets = {}
for subnet_cfg in vpc_conf["subnets"]:
subnet = aws.ec2.Subnet(subnet_cfg["name"],
vpc_id=vpc.id,
cidr_block=subnet_cfg["cidr_block"],
map_public_ip_on_launch=subnet_cfg["type"] == "public",
availability_zone=subnet_cfg["availability_zone"],
tags={"Name": subnet_cfg["name"]}
)
subnets[subnet_cfg["name"]] = subnet
# 路由表创建,根据 subnet_type 分组
route_tables = {}
if "routes" in vpc_conf:
for route_cfg in vpc_conf["routes"]:
subnet_type = route_cfg["subnet_type"]
route_table_name = f"{vpc_conf['name']}-{subnet_type}-rt"
# 如果还未创建该类型的路由表,则创建
if subnet_type not in route_tables:
route_table = aws.ec2.RouteTable(route_table_name,
vpc_id=vpc.id,
routes=[],
tags={"Name": route_table_name}
)
route_tables[subnet_type] = route_table
else:
route_table = route_tables[subnet_type]
# 添加路由条目(追加)
aws.ec2.Route(f"{route_table_name}-{route_cfg['destination_cidr_block'].replace('/', '-')}",
route_table_id=route_table.id,
destination_cidr_block=route_cfg["destination_cidr_block"],
gateway_id=igw.id if route_cfg["gateway"] == "internet_gateway" else None
)
# 路由表关联到子网
for subnet_cfg in vpc_conf["subnets"]:
subnet_type = subnet_cfg["type"]
if subnet_type in route_tables:
aws.ec2.RouteTableAssociation(f"{subnet_cfg['name']}-assoc",
subnet_id=subnets[subnet_cfg["name"]].id,
route_table_id=route_tables[subnet_type].id
)
# TODO: Peering 支持
return {
"vpc": vpc,
"subnets": subnets,
"igw": igw,
"route_tables": route_tables
}