feat(stackflow): add declarative StackFlow + DNS roles

This commit is contained in:
Haitao Pan 2026-02-08 13:53:36 +08:00
parent 4a31bc4075
commit 96cf0d7d8a
13 changed files with 971 additions and 0 deletions

152
.github/workflows/stackflow.yaml vendored Normal file
View File

@ -0,0 +1,152 @@
name: StackFlow (Plan/Validate)
on:
workflow_dispatch:
inputs:
config:
description: "Path to StackFlow config (e.g. StackFlow/svc-plus.yaml)"
required: true
type: string
default: "StackFlow/svc-plus.yaml"
phase:
description: "Phase to run"
required: true
type: choice
options:
- validate
- dns-plan
pull_request:
paths:
- "StackFlow/**/*.yml"
- "StackFlow/**/*.yaml"
- "stackflow/**/*.yml"
- "stackflow/**/*.yaml"
- ".github/workflows/stackflow.yaml"
- "scripts/stackflow/**"
push:
branches:
- main
paths:
- "StackFlow/**/*.yml"
- "StackFlow/**/*.yaml"
- "stackflow/**/*.yml"
- "stackflow/**/*.yaml"
- ".github/workflows/stackflow.yaml"
- "scripts/stackflow/**"
jobs:
resolve-configs:
runs-on: ubuntu-latest
outputs:
configs: ${{ steps.set.outputs.configs }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Resolve config list
id: set
shell: bash
run: |
set -euo pipefail
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
python - <<'PY' >> "$GITHUB_OUTPUT"
import json
print("configs=" + json.dumps(["${{ inputs.config }}"]))
PY
exit 0
fi
if [[ "${{ github.event_name }}" == "pull_request" ]]; then
git fetch origin "${{ github.base_ref }}" --depth=1
files="$(git diff --name-only "origin/${{ github.base_ref }}"...HEAD || true)"
else
# push
files="$(git diff --name-only "${{ github.event.before }}" "${{ github.sha }}" || true)"
fi
configs="$(printf '%s\n' "$files" | grep -E '^(StackFlow|stackflow)/.*\.ya?ml$' || true)"
if [[ -z "${configs}" ]]; then
if [[ -f "stackflow/svc.plus.yaml" ]]; then
configs="stackflow/svc.plus.yaml"
else
configs="StackFlow/svc-plus.yaml"
fi
fi
printf '%s\n' "$configs" | python - <<'PY' >> "$GITHUB_OUTPUT"
import json, sys
configs = [l.strip() for l in sys.stdin.read().splitlines() if l.strip()]
print("configs=" + json.dumps(configs))
PY
stackflow:
runs-on: ubuntu-latest
needs: resolve-configs
concurrency:
group: stackflow-${{ github.ref }}
cancel-in-progress: true
strategy:
fail-fast: false
matrix:
config: ${{ fromJson(needs.resolve-configs.outputs.configs) }}
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install deps
run: |
python -m pip install --upgrade pip
python -m pip install -r scripts/stackflow/requirements.txt
- name: Prepare output dir
shell: bash
run: |
set -euo pipefail
mkdir -p out
- name: Run StackFlow (workflow_dispatch)
if: ${{ github.event_name == 'workflow_dispatch' }}
run: |
python scripts/stackflow/runner.py \
--config "${{ inputs.config }}" \
--phase "${{ inputs.phase }}"
- name: Validate (CI)
if: ${{ github.event_name != 'workflow_dispatch' }}
run: |
python scripts/stackflow/runner.py \
--config "${{ matrix.config }}" \
--phase validate \
> "out/$(basename "${{ matrix.config }}").validate.json"
- name: DNS Plan (CI)
if: ${{ github.event_name != 'workflow_dispatch' }}
run: |
python scripts/stackflow/runner.py \
--config "${{ matrix.config }}" \
--phase dns-plan \
> "out/$(basename "${{ matrix.config }}").dns-plan.json"
- name: Compute artifact name (CI)
if: ${{ github.event_name != 'workflow_dispatch' }}
shell: bash
run: |
set -euo pipefail
name="${{ matrix.config }}"
name="${name//\//-}"
echo "ARTIFACT_NAME=stackflow-${name}" >> "$GITHUB_ENV"
- name: Upload artifacts (CI)
if: ${{ github.event_name != 'workflow_dispatch' }}
uses: actions/upload-artifact@v4
with:
name: ${{ env.ARTIFACT_NAME }}
path: out/

4
.gitignore vendored
View File

@ -7,3 +7,7 @@ playbooks/deepflow/*.tar.gz
playbooks/deepflow/deepflow-agent-playbook/*.zip playbooks/deepflow/deepflow-agent-playbook/*.zip
remotes.before.txt remotes.before.txt
# Python
__pycache__/
*.pyc

116
StackFlow/svc-plus.yaml Normal file
View File

@ -0,0 +1,116 @@
apiVersion: gitops.svc.plus/v1alpha1
kind: StackFlow
metadata:
# Stack identifier (used in plans/artifacts).
name: svc-plus
global:
# Root domain for this business stack.
# Runner enforces: every targets[].domains[] must be under this root.
domain: svc.plus
# Declarative provider selector for future dns-apply (no secrets here).
dns_provider: cloudflare
# Default cloud for this stack (future iac-apply/deploy/observe phases).
cloud: gcp
gcp_project: xzerolab-480008
# Optional: multi-environment overrides (selected by runner --env).
# Today CI only runs plan/validate; env selection is for future expansion.
environments:
prod:
dns_provider: cloudflare
cloud: gcp
gcp_project: xzerolab-480008
dev:
dns_provider: cloudflare
cloud: gcp
gcp_project: xzerolab-480008
# Source-of-truth repos (informational).
gitops: https://github.com/cloud-neutral-toolkit/gitops
playbooks: https://github.com/cloud-neutral-toolkit/playbook
iac_modules: https://github.com/cloud-neutral-toolkit/iac_modules
targets:
# -----------------------------------------
# Vercel: www + console
# -----------------------------------------
- id: vercel-console
type: vercel
vercel:
project_url: https://vercel.com/svc-designs-projects/console-svc-plus
team_slug: svc-designs-projects
project_slug: console-svc-plus
domains:
- www.svc.plus
- console.svc.plus
# Optional env-specific intent (not used by runner yet).
environments:
dev:
domains:
- www.dev.svc.plus
- console.dev.svc.plus
dns:
# Default policy: pure DNS. Proxy can be enabled per-record later.
records:
- name: www
type: CNAME
value: cname.vercel-dns.com.
proxied: false
- name: console
type: CNAME
value: cname.vercel-dns.com.
proxied: false
# -----------------------------------------
# GCE vhost: clawdbot
# -----------------------------------------
- id: clawdbot
type: vhost
cloud: gcp
gcp:
project: xzerolab-480008
zone: asia-east1-b
instance_name: clawdbot-svc-plus
console_url: https://console.cloud.google.com/compute/instancesDetail/zones/asia-east1-b/instances/clawdbot-svc-plus?project=xzerolab-480008
domains:
- clawdbot.svc.plus
resources:
os: debian-13
cpu: 2
mem_mib: 4096
disk_gb: 50
endpoints:
# Will be filled by future iac-apply output.
public_ipv4: ""
dns:
records:
- name: clawdbot
type: A
valueFrom: endpoints.public_ipv4
proxied: false
# -----------------------------------------
# GCP Cloud Run: accounts
# -----------------------------------------
- id: accounts
type: cloud-run
cloud: gcp
repo: https://github.com/cloud-neutral-toolkit/accounts.svc.plus
gcp:
project: xzerolab-480008
region: asia-northeast1
service: accounts-svc-plus
console_url: https://console.cloud.google.com/run/detail/asia-northeast1/accounts-svc-plus/observability/metrics?project=xzerolab-480008
domains:
- accounts.svc.plus
deploy:
mode: repo-dispatch
repository: cloud-neutral-toolkit/accounts.svc.plus
event_type: stackflow.deploy.cloudrun
dns:
# Cloud Run custom domain mapping needs provider-specific verification records.
# Keep explicit records here once known; plan/validate won't apply them.
records: []

58
docs/stackflow/README.md Normal file
View File

@ -0,0 +1,58 @@
# StackFlow (GitOps YAML Flow)
StackFlow is a declarative YAML describing a full business stack deployment
across DNS, cloud resources (IAC), and Ansible-based provisioning.
This repository already contains:
- `playbooks/` (Ansible provisioning for vhosts/docker/k3s)
- `iac-template/` (Terraform reference templates)
- `.github/workflows/` (bootstrap workflows)
StackFlow adds a top-level config file that can drive those pieces in one place.
## Goals
- One YAML describes root domain + targets (Vercel, Cloud Run, vhosts, etc.)
- CI can validate config, produce a DNS plan, then apply phases later
- Never commit real secrets (tokens/keys); use GitHub Secrets / Secret Manager
## Config Example
See: `StackFlow/svc-plus.yaml`
## Schema (v1alpha1)
Top-level:
- `apiVersion`: `gitops.svc.plus/v1alpha1`
- `kind`: `StackFlow`
- `metadata.name`: stack id
- `global.domain`: root domain, e.g. `svc.plus`
- `global.dns_provider`: `cloudflare` (planned), `alicloud` (existing playbooks)
- `global.cloud`: `gcp`
- `targets[]`: list of deployable targets
Target fields (common):
- `id`: unique id
- `type`: `vercel` | `cloud-run` | `vhost` | `kubernetes` (planned)
- `domains[]`: FQDNs owned by this target
- `dns.records[]`: explicit DNS record intents
DNS record intent:
- `name`: record name relative to `global.domain` (e.g. `www`)
- `type`: `A` | `AAAA` | `CNAME` | `TXT` | `MX`
- `value`: literal value (string)
- `valueFrom`: dotted path reference inside the target (e.g. `endpoints.public_ipv4`)
- `ttl`: optional int seconds
- `proxied`: optional bool (Cloudflare-specific)
## Workflows
Planned phases:
- `validate`: validate YAML structure
- `dns-plan`: output required DNS records (no apply)
- `dns-apply`: apply DNS changes (provider-specific)
- `iac-apply`: provision resources via Terraform
- `deploy`: deploy apps via Ansible or repo-dispatch
- `observe`: connect monitoring / alerts
Today we only ship `validate` + `dns-plan` as the first step.

View File

@ -0,0 +1,16 @@
---
# Required
cloudflare_dns_domain: ""
cloudflare_dns_rr: ""
cloudflare_dns_type: ""
cloudflare_dns_value: ""
# Optional
# Cloudflare TTL supports 1 (automatic) or a provider-dependent range.
cloudflare_dns_ttl: 1
cloudflare_dns_proxied: false
cloudflare_dns_priority: null
# Secret (pass via extra-vars / env / vault; never commit real values)
cloudflare_api_token: ""

View File

@ -0,0 +1,194 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import urllib.parse
import urllib.request
from ansible.module_utils.basic import AnsibleModule
API_BASE = "https://api.cloudflare.com/client/v4"
def _request(method, path, api_token, payload=None, query=None):
url = API_BASE + path
if query:
url += "?" + urllib.parse.urlencode(query)
body = None
if payload is not None:
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url=url, data=body, method=method)
req.add_header("Authorization", "Bearer " + api_token)
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw)
def _api_ok(data):
return isinstance(data, dict) and data.get("success") is True
def _api_first_result(data):
if not _api_ok(data):
return None
res = data.get("result")
if isinstance(res, list) and res:
return res[0]
return None
def _zone_id(api_token, zone_name):
data = _request("GET", "/zones", api_token, query={"name": zone_name, "per_page": 50})
z = _api_first_result(data)
if not z:
return None
return z.get("id")
def _record_fqdn(zone, rr):
rr = rr.strip()
if rr in ("@", zone):
return zone
return rr + "." + zone
def _get_record(api_token, zone_id, record_type, fqdn):
data = _request(
"GET",
f"/zones/{zone_id}/dns_records",
api_token,
query={"type": record_type, "name": fqdn, "per_page": 50},
)
return _api_first_result(data)
def main():
module = AnsibleModule(
argument_spec=dict(
state=dict(type="str", choices=["present", "absent"], default="present"),
zone=dict(type="str", required=True),
rr=dict(type="str", required=True),
type=dict(type="str", required=True),
value=dict(type="str"),
ttl=dict(type="int", default=1),
proxied=dict(type="bool", default=False),
priority=dict(type="int", required=False),
api_token=dict(type="str", required=True, no_log=True),
),
supports_check_mode=True,
)
state = module.params["state"]
zone = module.params["zone"]
rr = module.params["rr"]
record_type = module.params["type"].upper()
value = module.params["value"]
ttl = module.params["ttl"]
proxied = module.params["proxied"]
priority = module.params.get("priority", None)
api_token = module.params["api_token"]
if state == "present" and not value:
module.fail_json(msg="value is required when state=present")
try:
zid = _zone_id(api_token, zone)
except Exception as e:
module.fail_json(msg=f"Failed to query Cloudflare zone id: {e}")
if not zid:
module.fail_json(msg=f"Cloudflare zone not found: {zone}")
fqdn = _record_fqdn(zone, rr)
try:
existing = _get_record(api_token, zid, record_type, fqdn)
except Exception as e:
module.fail_json(msg=f"Failed to query Cloudflare DNS record: {e}")
# ----------------------------
# ABSENT
# ----------------------------
if state == "absent":
if not existing:
module.exit_json(changed=False, msg="Record already absent")
if module.check_mode:
module.exit_json(changed=True)
rid = existing.get("id")
try:
data = _request("DELETE", f"/zones/{zid}/dns_records/{rid}", api_token)
except Exception as e:
module.fail_json(msg=f"Failed to delete record: {e}")
if not _api_ok(data):
module.fail_json(msg="Cloudflare API error deleting record", details=data)
module.exit_json(changed=True, msg="Record deleted", record_id=rid, fqdn=fqdn)
# ----------------------------
# PRESENT (create/update)
# ----------------------------
desired = {
"type": record_type,
"name": fqdn,
"content": value,
"ttl": ttl,
"proxied": proxied,
}
if priority is not None:
desired["priority"] = priority
if existing:
cur = {
"type": existing.get("type"),
"name": existing.get("name"),
"content": existing.get("content"),
"ttl": existing.get("ttl"),
"proxied": existing.get("proxied"),
}
if priority is not None:
cur["priority"] = existing.get("priority")
if cur == desired:
module.exit_json(
changed=False,
msg="Record already up to date",
record_id=existing.get("id"),
fqdn=fqdn,
)
if module.check_mode:
module.exit_json(changed=True)
rid = existing.get("id")
try:
data = _request("PUT", f"/zones/{zid}/dns_records/{rid}", api_token, payload=desired)
except Exception as e:
module.fail_json(msg=f"Failed to update record: {e}")
if not _api_ok(data):
module.fail_json(msg="Cloudflare API error updating record", details=data)
module.exit_json(changed=True, msg="Record updated", record_id=rid, fqdn=fqdn)
# CREATE
if module.check_mode:
module.exit_json(changed=True)
try:
data = _request("POST", f"/zones/{zid}/dns_records", api_token, payload=desired)
except Exception as e:
module.fail_json(msg=f"Failed to create record: {e}")
if not _api_ok(data):
module.fail_json(msg="Cloudflare API error creating record", details=data)
rec = data.get("result") or {}
module.exit_json(changed=True, msg="Record created", record_id=rec.get("id"), fqdn=fqdn)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,17 @@
---
- name: Ensure Cloudflare DNS Record
cloudflare_dns_record:
state: present
zone: "{{ cloudflare_dns_domain }}"
rr: "{{ cloudflare_dns_rr }}"
type: "{{ cloudflare_dns_type }}"
value: "{{ cloudflare_dns_value }}"
ttl: "{{ cloudflare_dns_ttl }}"
proxied: "{{ cloudflare_dns_proxied }}"
priority: "{{ cloudflare_dns_priority }}"
api_token: "{{ cloudflare_api_token }}"
register: dns_result
- debug:
var: dns_result

View File

@ -0,0 +1,8 @@
---
cloudflare_dns_sync_domain: ""
cloudflare_dns_sync_records: []
cloudflare_dns_sync_output: "/tmp/dns_records.yaml"
# Secret: set via extra-vars / env / vault; do not commit real values.
cloudflare_api_token: ""

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import sys
import urllib.parse
import urllib.request
import yaml
API_BASE = "https://api.cloudflare.com/client/v4"
def _req(method: str, path: str, token: str, payload=None, query=None):
url = API_BASE + path
if query:
url += "?" + urllib.parse.urlencode(query)
body = None
if payload is not None:
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url=url, data=body, method=method)
req.add_header("Authorization", "Bearer " + token)
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def _ok(d):
return isinstance(d, dict) and d.get("success") is True
def zone_id(token: str, zone_name: str) -> str:
d = _req("GET", "/zones", token, query={"name": zone_name, "per_page": 50})
if not _ok(d) or not d.get("result"):
raise RuntimeError(f"zone not found: {zone_name}")
return d["result"][0]["id"]
def fqdn(zone: str, rr: str) -> str:
rr = rr.strip()
if rr in ("@", zone):
return zone
return rr + "." + zone
def get_record(token: str, zid: str, rtype: str, name: str):
d = _req("GET", f"/zones/{zid}/dns_records", token, query={"type": rtype, "name": name, "per_page": 50})
if not _ok(d):
raise RuntimeError("query dns_records failed")
res = d.get("result") or []
return res[0] if res else None
def ensure_record(token: str, zid: str, zone: str, rec: dict):
rr = rec["rr"]
rtype = rec["type"].upper()
value = rec["value"]
ttl = int(rec.get("ttl", 1))
proxied = bool(rec.get("proxied", False))
priority = rec.get("priority", None)
name = fqdn(zone, rr)
desired = {"type": rtype, "name": name, "content": value, "ttl": ttl, "proxied": proxied}
if priority is not None:
desired["priority"] = int(priority)
cur = get_record(token, zid, rtype, name)
if not cur:
print("CREATE:", desired)
d = _req("POST", f"/zones/{zid}/dns_records", token, payload=desired)
if not _ok(d):
raise RuntimeError("create failed: " + json.dumps(d))
return
cur_slim = {
"type": cur.get("type"),
"name": cur.get("name"),
"content": cur.get("content"),
"ttl": cur.get("ttl"),
"proxied": cur.get("proxied"),
}
if priority is not None:
cur_slim["priority"] = cur.get("priority")
if cur_slim == desired:
print("OK:", desired["name"], desired["type"])
return
print("UPDATE:", desired)
rid = cur["id"]
d = _req("PUT", f"/zones/{zid}/dns_records/{rid}", token, payload=desired)
if not _ok(d):
raise RuntimeError("update failed: " + json.dumps(d))
def main(argv: list[str]) -> int:
if len(argv) != 2:
print(f"usage: {sys.argv[0]} <dns_records.yaml>", file=sys.stderr)
return 2
fn = argv[1]
token = os.environ.get("CLOUDFLARE_API_TOKEN", "").strip()
if not token:
print("CLOUDFLARE_API_TOKEN is required", file=sys.stderr)
return 2
cfg = yaml.safe_load(open(fn, "r", encoding="utf-8")) or {}
for zone, recs in cfg.items():
zid = zone_id(token, zone)
for rec in recs or []:
ensure_record(token, zid, zone, rec)
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv))

View File

@ -0,0 +1,19 @@
---
- name: Generate DNS records file from template
template:
src: dns_records.yaml.j2
dest: "{{ cloudflare_dns_sync_output }}"
- name: Upload dns_sync.py
copy:
src: dns_sync.py
dest: /tmp/dns_sync.py
mode: "0755"
- name: Sync DNS records
command: >
python3 /tmp/dns_sync.py
{{ cloudflare_dns_sync_output }}
environment:
CLOUDFLARE_API_TOKEN: "{{ cloudflare_api_token }}"

View File

@ -0,0 +1,9 @@
{{ cloudflare_dns_sync_domain }}:
{% for rec in cloudflare_dns_sync_records %}
- rr: "{{ rec.rr }}"
type: "{{ rec.type }}"
value: "{{ rec.value }}"
ttl: {{ rec.ttl | default(1) }}
proxied: {{ rec.proxied | default(false) }}
{% endfor %}

View File

@ -0,0 +1,2 @@
PyYAML==6.0.2

258
scripts/stackflow/runner.py Normal file
View File

@ -0,0 +1,258 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import sys
from dataclasses import dataclass
from typing import Any
def _fatal(msg: str, code: int = 2) -> None:
print(f"stackflow: {msg}", file=sys.stderr)
raise SystemExit(code)
def _load_yaml(path: str) -> dict[str, Any]:
try:
import yaml # type: ignore
except Exception:
_fatal(
"missing dependency PyYAML. Install with: python3 -m pip install -r scripts/stackflow/requirements.txt"
)
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
_fatal(f"config must be a YAML mapping, got {type(data).__name__}")
return data
def _get(d: dict[str, Any], key: str, typ: type, *, required: bool = True) -> Any:
v = d.get(key)
if v is None:
if required:
_fatal(f"missing required field: {key}")
return None
if not isinstance(v, typ):
_fatal(f"field {key} must be {typ.__name__}, got {type(v).__name__}")
return v
def _as_str(v: Any, ctx: str) -> str:
if not isinstance(v, str) or not v.strip():
_fatal(f"{ctx} must be a non-empty string")
return v
def _normalize_record(rec: dict[str, Any]) -> dict[str, Any]:
name = rec.get("name")
rtype = rec.get("type")
if name is None or rtype is None:
_fatal("dns.records entries require name and type")
name = _as_str(name, "dns.records[].name")
rtype = _as_str(rtype, "dns.records[].type").upper()
out: dict[str, Any] = {"name": name, "type": rtype}
if "valueFrom" in rec:
out["valueFrom"] = _as_str(rec["valueFrom"], "dns.records[].valueFrom")
elif "value" in rec:
out["value"] = _as_str(rec["value"], "dns.records[].value")
else:
_fatal("dns.records entries require either value or valueFrom")
ttl = rec.get("ttl")
if ttl is not None:
if not isinstance(ttl, int) or ttl <= 0:
_fatal("dns.records[].ttl must be a positive int")
out["ttl"] = ttl
proxied = rec.get("proxied")
if proxied is not None:
if not isinstance(proxied, bool):
_fatal("dns.records[].proxied must be boolean")
out["proxied"] = proxied
return out
@dataclass(frozen=True)
class Stack:
name: str
root_domain: str
dns_provider: str
cloud: str
targets: list[dict[str, Any]]
def validate(cfg: dict[str, Any]) -> Stack:
kind = _get(cfg, "kind", str)
if kind != "StackFlow":
_fatal(f"kind must be StackFlow, got {kind!r}")
md = _get(cfg, "metadata", dict)
name = _as_str(md.get("name"), "metadata.name")
g = _get(cfg, "global", dict)
root_domain = _as_str(g.get("domain"), "global.domain")
dns_provider = _as_str(g.get("dns_provider"), "global.dns_provider")
cloud = _as_str(g.get("cloud"), "global.cloud")
# Optional multi-environment overrides.
environments = g.get("environments")
if environments is not None and not isinstance(environments, dict):
_fatal("global.environments must be a mapping of env -> overrides")
targets = _get(cfg, "targets", list)
for i, t in enumerate(targets):
if not isinstance(t, dict):
_fatal(f"targets[{i}] must be a mapping")
_as_str(t.get("id"), f"targets[{i}].id")
_as_str(t.get("type"), f"targets[{i}].type")
domains = t.get("domains")
if domains is None or not isinstance(domains, list) or not domains:
_fatal(f"targets[{i}].domains must be a non-empty list")
for j, d in enumerate(domains):
fqdn = _as_str(d, f"targets[{i}].domains[{j}]")
if not (fqdn == root_domain or fqdn.endswith("." + root_domain)):
_fatal(
f"targets[{i}].domains[{j}] must be under global.domain ({root_domain}), got {fqdn}"
)
dns = t.get("dns", {})
if dns is None:
dns = {}
if not isinstance(dns, dict):
_fatal(f"targets[{i}].dns must be a mapping")
recs = dns.get("records", [])
if recs is None:
recs = []
if not isinstance(recs, list):
_fatal(f"targets[{i}].dns.records must be a list")
for k, r in enumerate(recs):
if not isinstance(r, dict):
_fatal(f"targets[{i}].dns.records[{k}] must be a mapping")
_normalize_record(r)
return Stack(
name=name,
root_domain=root_domain,
dns_provider=dns_provider,
cloud=cloud,
targets=targets,
)
def _apply_env_overrides(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
if env_name is None:
return cfg
g = cfg.get("global")
if not isinstance(g, dict):
return cfg
envs = g.get("environments")
if envs is None:
return cfg
if not isinstance(envs, dict):
_fatal("global.environments must be a mapping of env -> overrides")
overrides = envs.get(env_name)
if overrides is None:
_fatal(f"env not found in global.environments: {env_name}")
if not isinstance(overrides, dict):
_fatal(f"global.environments.{env_name} must be a mapping")
# Shallow-merge global overrides into global.
merged = dict(cfg)
merged_global = dict(g)
for k, v in overrides.items():
merged_global[k] = v
merged["global"] = merged_global
return merged
def dns_plan(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
cfg2 = _apply_env_overrides(cfg, env_name)
stack = validate(cfg2)
out: dict[str, Any] = {
"stack": stack.name,
"env": env_name or "",
"global": {
"domain": stack.root_domain,
"dns_provider": stack.dns_provider,
},
"records": [],
}
# Flatten all explicit dns.records across targets.
records: list[dict[str, Any]] = []
for t in stack.targets:
tid = t["id"]
dns = t.get("dns") or {}
recs = dns.get("records") or []
for r in recs:
nr = _normalize_record(r)
nr["target"] = tid
records.append(nr)
out["records"] = records
return out
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(prog="stackflow")
ap.add_argument("--config", required=True, help="Path to StackFlow YAML")
ap.add_argument(
"--env",
default="",
help="Optional environment name under global.environments (e.g. dev/prod)",
)
ap.add_argument(
"--phase",
required=True,
choices=["validate", "dns-plan"],
help="Which phase to run",
)
ap.add_argument(
"--format",
default="json",
choices=["json"],
help="Output format for plan phases",
)
args = ap.parse_args(argv)
if not os.path.exists(args.config):
_fatal(f"config not found: {args.config}")
cfg = _load_yaml(args.config)
env_name = args.env.strip() or None
if args.phase == "validate":
cfg2 = _apply_env_overrides(cfg, env_name)
s = validate(cfg2)
print(
json.dumps(
{
"ok": True,
"stack": s.name,
"env": env_name or "",
"domain": s.root_domain,
"dns_provider": s.dns_provider,
"cloud": s.cloud,
"targets": len(s.targets),
},
indent=2,
sort_keys=True,
)
)
return 0
if args.phase == "dns-plan":
plan = dns_plan(cfg, env_name)
print(json.dumps(plan, indent=2, sort_keys=True))
return 0
_fatal(f"unknown phase: {args.phase}")
return 2
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))