259 lines
7.7 KiB
Python
259 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations

import argparse
import json
import os
import sys
from dataclasses import dataclass
from typing import Any, NoReturn
|
|
|
|
|
|
def _fatal(msg: str, code: int = 2) -> NoReturn:
    """Report a fatal error on stderr and terminate the process.

    Annotated ``NoReturn`` because the function always raises; this lets
    type checkers treat any code following a call as unreachable.

    Args:
        msg: Human-readable description, printed with a ``stackflow: `` prefix.
        code: Exit status carried by the raised ``SystemExit``.

    Raises:
        SystemExit: Always, with ``code`` as its status.
    """
    print(f"stackflow: {msg}", file=sys.stderr)
    raise SystemExit(code)
|
|
|
|
|
|
def _load_yaml(path: str) -> dict[str, Any]:
    """Parse the YAML file at ``path`` and return its top-level mapping.

    PyYAML is imported inside the function so that a missing dependency is
    reported with an install hint rather than an import traceback.

    Args:
        path: Filesystem path of the config file, read as UTF-8.

    Returns:
        The document's top-level mapping.

    Raises:
        SystemExit: Via ``_fatal`` when PyYAML is not installed, the file
            cannot be read or decoded, the content is not valid YAML, or
            the top-level value is not a mapping (an empty file loads as
            ``None`` and is rejected here too).
    """
    try:
        import yaml  # type: ignore
    except ImportError:
        _fatal(
            "missing dependency PyYAML. Install with: python3 -m pip install -r scripts/stackflow/requirements.txt"
        )

    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError) as exc:
        # Covers a path that is a directory, permission problems, a file
        # removed after the caller's existence check, and non-UTF-8 bytes.
        _fatal(f"cannot read config {path}: {exc}")
    except yaml.YAMLError as exc:
        _fatal(f"invalid YAML in {path}: {exc}")

    if not isinstance(data, dict):
        _fatal(f"config must be a YAML mapping, got {type(data).__name__}")
    return data
|
|
|
|
|
|
def _get(d: dict[str, Any], key: str, typ: type, *, required: bool = True) -> Any:
    """Fetch ``d[key]`` and check that it is an instance of ``typ``.

    An absent key and a key whose value is ``None`` are treated the same.

    Args:
        d: Mapping to read from.
        key: Field name to look up.
        typ: Type the value must be an instance of.
        required: When true, a missing/``None`` value is a fatal error.

    Returns:
        The value, or ``None`` when it is missing and not required.

    Raises:
        SystemExit: Via ``_fatal`` when a required field is missing or the
            value has the wrong type.
    """
    value = d.get(key)
    if value is not None:
        if not isinstance(value, typ):
            _fatal(f"field {key} must be {typ.__name__}, got {type(value).__name__}")
        return value
    if required:
        _fatal(f"missing required field: {key}")
    return None
|
|
|
|
|
|
def _as_str(v: Any, ctx: str) -> str:
    """Return ``v`` unchanged after checking it is a non-blank string.

    Args:
        v: Candidate value.
        ctx: Field path used as the subject of the error message.

    Returns:
        ``v`` itself; surrounding whitespace is not stripped.

    Raises:
        SystemExit: Via ``_fatal`` when ``v`` is not a ``str`` or contains
            only whitespace.
    """
    usable = isinstance(v, str) and bool(v.strip())
    if not usable:
        _fatal(f"{ctx} must be a non-empty string")
    return v
|
|
|
|
|
|
def _normalize_record(rec: dict[str, Any]) -> dict[str, Any]:
    """Validate one ``dns.records`` entry and return a cleaned copy.

    The result always has ``name`` and ``type`` (upper-cased) plus exactly
    one of ``valueFrom`` / ``value``; ``ttl`` and ``proxied`` are copied
    only when present and not ``None``. Any other keys in ``rec`` are
    dropped.

    Args:
        rec: Raw record mapping from the config.

    Returns:
        A new dict containing only the recognised, validated fields.

    Raises:
        SystemExit: Via ``_fatal`` when a required field is missing or any
            field has an invalid type or value.
    """
    name = rec.get("name")
    rtype = rec.get("type")
    if name is None or rtype is None:
        _fatal("dns.records entries require name and type")
    name = _as_str(name, "dns.records[].name")
    rtype = _as_str(rtype, "dns.records[].type").upper()

    out: dict[str, Any] = {"name": name, "type": rtype}

    # valueFrom wins when both keys are present; "value" is then ignored.
    if "valueFrom" in rec:
        out["valueFrom"] = _as_str(rec["valueFrom"], "dns.records[].valueFrom")
    elif "value" in rec:
        out["value"] = _as_str(rec["value"], "dns.records[].value")
    else:
        _fatal("dns.records entries require either value or valueFrom")

    ttl = rec.get("ttl")
    if ttl is not None:
        # bool is a subclass of int, so `ttl: true` would otherwise pass
        # the isinstance check and slip through as a TTL.
        if isinstance(ttl, bool) or not isinstance(ttl, int) or ttl <= 0:
            _fatal("dns.records[].ttl must be a positive int")
        out["ttl"] = ttl

    proxied = rec.get("proxied")
    if proxied is not None:
        if not isinstance(proxied, bool):
            _fatal("dns.records[].proxied must be boolean")
        out["proxied"] = proxied

    return out
|
|
|
|
|
|
@dataclass(frozen=True)
class Stack:
    """Read-only summary of a StackFlow config that passed ``validate``.

    ``frozen=True`` only prevents rebinding the attributes; the ``targets``
    list and the dicts inside it remain ordinary mutable objects.
    """

    # Taken from metadata.name.
    name: str
    # Taken from global.domain.
    root_domain: str
    # Taken from global.dns_provider.
    dns_provider: str
    # Taken from global.cloud.
    cloud: str
    # The top-level "targets" list exactly as loaded (records not normalized).
    targets: list[dict[str, Any]]
|
|
|
|
|
|
def _validate_target_domains(i: int, domains: Any, root_domain: str) -> None:
    """Require ``targets[i].domains`` to be a non-empty list of names under ``root_domain``."""
    # isinstance already rejects None, so no separate None test is needed.
    if not isinstance(domains, list) or not domains:
        _fatal(f"targets[{i}].domains must be a non-empty list")
    subdomain_suffix = "." + root_domain
    for j, d in enumerate(domains):
        fqdn = _as_str(d, f"targets[{i}].domains[{j}]")
        # Accept the root domain itself or any name ending in ".<root>".
        # The comparison is an exact, case-sensitive string match.
        if fqdn != root_domain and not fqdn.endswith(subdomain_suffix):
            _fatal(
                f"targets[{i}].domains[{j}] must be under global.domain ({root_domain}), got {fqdn}"
            )


def _validate_target_dns(i: int, dns: Any) -> None:
    """Check the optional ``targets[i].dns`` mapping and each of its ``records``."""
    if dns is None:
        return
    if not isinstance(dns, dict):
        _fatal(f"targets[{i}].dns must be a mapping")
    recs = dns.get("records")
    if recs is None:
        return
    if not isinstance(recs, list):
        _fatal(f"targets[{i}].dns.records must be a list")
    for k, r in enumerate(recs):
        if not isinstance(r, dict):
            _fatal(f"targets[{i}].dns.records[{k}] must be a mapping")
        # Called only for its validation side effect; the result is discarded.
        _normalize_record(r)


def _validate_target(i: int, t: Any, root_domain: str) -> None:
    """Check one ``targets[i]`` entry: shape, id, type, domains, then dns."""
    if not isinstance(t, dict):
        _fatal(f"targets[{i}] must be a mapping")
    _as_str(t.get("id"), f"targets[{i}].id")
    _as_str(t.get("type"), f"targets[{i}].type")
    _validate_target_domains(i, t.get("domains"), root_domain)
    _validate_target_dns(i, t.get("dns"))


def validate(cfg: dict[str, Any]) -> Stack:
    """Validate a loaded StackFlow config and summarise it as a ``Stack``.

    Checks run in document order and stop at the first problem: ``kind``,
    ``metadata.name``, the ``global`` fields, then every entry of
    ``targets``.

    Args:
        cfg: Top-level config mapping.

    Returns:
        A ``Stack`` holding the validated scalar fields and the ``targets``
        list as loaded (DNS records are checked but not rewritten).

    Raises:
        SystemExit: Via ``_fatal`` on the first validation failure.
    """
    kind = _get(cfg, "kind", str)
    if kind != "StackFlow":
        _fatal(f"kind must be StackFlow, got {kind!r}")

    md = _get(cfg, "metadata", dict)
    name = _as_str(md.get("name"), "metadata.name")

    g = _get(cfg, "global", dict)
    root_domain = _as_str(g.get("domain"), "global.domain")
    dns_provider = _as_str(g.get("dns_provider"), "global.dns_provider")
    cloud = _as_str(g.get("cloud"), "global.cloud")

    # Optional multi-environment overrides: only the container type is
    # checked here; individual environments are not inspected.
    environments = g.get("environments")
    if environments is not None and not isinstance(environments, dict):
        _fatal("global.environments must be a mapping of env -> overrides")

    targets = _get(cfg, "targets", list)
    for i, t in enumerate(targets):
        _validate_target(i, t, root_domain)

    return Stack(
        name=name,
        root_domain=root_domain,
        dns_provider=dns_provider,
        cloud=cloud,
        targets=targets,
    )
|
|
|
|
|
|
def _apply_env_overrides(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
|
|
if env_name is None:
|
|
return cfg
|
|
g = cfg.get("global")
|
|
if not isinstance(g, dict):
|
|
return cfg
|
|
envs = g.get("environments")
|
|
if envs is None:
|
|
return cfg
|
|
if not isinstance(envs, dict):
|
|
_fatal("global.environments must be a mapping of env -> overrides")
|
|
overrides = envs.get(env_name)
|
|
if overrides is None:
|
|
_fatal(f"env not found in global.environments: {env_name}")
|
|
if not isinstance(overrides, dict):
|
|
_fatal(f"global.environments.{env_name} must be a mapping")
|
|
|
|
# Shallow-merge global overrides into global.
|
|
merged = dict(cfg)
|
|
merged_global = dict(g)
|
|
for k, v in overrides.items():
|
|
merged_global[k] = v
|
|
merged["global"] = merged_global
|
|
return merged
|
|
|
|
|
|
def dns_plan(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
    """Build the DNS plan for ``cfg`` under an optional environment.

    Environment overrides are applied first, then the result is validated,
    and finally every explicit ``dns.records`` entry of every target is
    normalized and tagged with the id of the target it came from.

    Args:
        cfg: Top-level config mapping.
        env_name: Environment to apply, or ``None`` for none.

    Returns:
        A dict with keys ``stack``, ``env`` (empty string when no
        environment was given), ``global`` and ``records``.

    Raises:
        SystemExit: Propagated from override handling or validation.
    """
    stack = validate(_apply_env_overrides(cfg, env_name))

    # Flatten all explicit dns.records across targets.
    records: list[dict[str, Any]] = []
    for target in stack.targets:
        target_id = target["id"]
        target_records = (target.get("dns") or {}).get("records") or []
        records.extend(
            {**_normalize_record(rec), "target": target_id}
            for rec in target_records
        )

    return {
        "stack": stack.name,
        "env": env_name or "",
        "global": {
            "domain": stack.root_domain,
            "dns_provider": stack.dns_provider,
        },
        "records": records,
    }
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Command-line entry point: parse ``argv``, run one phase, print JSON.

    Args:
        argv: Arguments without the program name.

    Returns:
        ``0`` after a phase completed and its JSON was printed to stdout.

    Raises:
        SystemExit: From argparse on bad usage, or via ``_fatal`` when the
            config is missing or fails to load/validate.
    """
    parser = argparse.ArgumentParser(prog="stackflow")
    parser.add_argument("--config", required=True, help="Path to StackFlow YAML")
    parser.add_argument(
        "--env",
        default="",
        help="Optional environment name under global.environments (e.g. dev/prod)",
    )
    parser.add_argument(
        "--phase",
        required=True,
        choices=["validate", "dns-plan"],
        help="Which phase to run",
    )
    parser.add_argument(
        "--format",
        default="json",
        choices=["json"],
        help="Output format for plan phases",
    )
    args = parser.parse_args(argv)

    config_path = args.config
    if not os.path.exists(config_path):
        _fatal(f"config not found: {config_path}")

    cfg = _load_yaml(config_path)
    # A blank or whitespace-only --env means "no environment".
    env_name = args.env.strip() or None

    if args.phase == "validate":
        stack = validate(_apply_env_overrides(cfg, env_name))
        payload: dict[str, Any] = {
            "ok": True,
            "stack": stack.name,
            "env": env_name or "",
            "domain": stack.root_domain,
            "dns_provider": stack.dns_provider,
            "cloud": stack.cloud,
            "targets": len(stack.targets),
        }
    elif args.phase == "dns-plan":
        payload = dns_plan(cfg, env_name)
    else:
        # argparse's choices should make this unreachable.
        _fatal(f"unknown phase: {args.phase}")
        return 2

    print(json.dumps(payload, indent=2, sort_keys=True))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))
|