gitops/scripts/stackflow/runner.py

259 lines
7.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import sys
from dataclasses import dataclass
from typing import Any
def _fatal(msg: str, code: int = 2) -> None:
print(f"stackflow: {msg}", file=sys.stderr)
raise SystemExit(code)
def _load_yaml(path: str) -> dict[str, Any]:
try:
import yaml # type: ignore
except Exception:
_fatal(
"missing dependency PyYAML. Install with: python3 -m pip install -r scripts/stackflow/requirements.txt"
)
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
_fatal(f"config must be a YAML mapping, got {type(data).__name__}")
return data
def _get(d: dict[str, Any], key: str, typ: type, *, required: bool = True) -> Any:
v = d.get(key)
if v is None:
if required:
_fatal(f"missing required field: {key}")
return None
if not isinstance(v, typ):
_fatal(f"field {key} must be {typ.__name__}, got {type(v).__name__}")
return v
def _as_str(v: Any, ctx: str) -> str:
if not isinstance(v, str) or not v.strip():
_fatal(f"{ctx} must be a non-empty string")
return v
def _normalize_record(rec: dict[str, Any]) -> dict[str, Any]:
name = rec.get("name")
rtype = rec.get("type")
if name is None or rtype is None:
_fatal("dns.records entries require name and type")
name = _as_str(name, "dns.records[].name")
rtype = _as_str(rtype, "dns.records[].type").upper()
out: dict[str, Any] = {"name": name, "type": rtype}
if "valueFrom" in rec:
out["valueFrom"] = _as_str(rec["valueFrom"], "dns.records[].valueFrom")
elif "value" in rec:
out["value"] = _as_str(rec["value"], "dns.records[].value")
else:
_fatal("dns.records entries require either value or valueFrom")
ttl = rec.get("ttl")
if ttl is not None:
if not isinstance(ttl, int) or ttl <= 0:
_fatal("dns.records[].ttl must be a positive int")
out["ttl"] = ttl
proxied = rec.get("proxied")
if proxied is not None:
if not isinstance(proxied, bool):
_fatal("dns.records[].proxied must be boolean")
out["proxied"] = proxied
return out
@dataclass(frozen=True)
class Stack:
name: str
root_domain: str
dns_provider: str
cloud: str
targets: list[dict[str, Any]]
def validate(cfg: dict[str, Any]) -> Stack:
kind = _get(cfg, "kind", str)
if kind != "StackFlow":
_fatal(f"kind must be StackFlow, got {kind!r}")
md = _get(cfg, "metadata", dict)
name = _as_str(md.get("name"), "metadata.name")
g = _get(cfg, "global", dict)
root_domain = _as_str(g.get("domain"), "global.domain")
dns_provider = _as_str(g.get("dns_provider"), "global.dns_provider")
cloud = _as_str(g.get("cloud"), "global.cloud")
# Optional multi-environment overrides.
environments = g.get("environments")
if environments is not None and not isinstance(environments, dict):
_fatal("global.environments must be a mapping of env -> overrides")
targets = _get(cfg, "targets", list)
for i, t in enumerate(targets):
if not isinstance(t, dict):
_fatal(f"targets[{i}] must be a mapping")
_as_str(t.get("id"), f"targets[{i}].id")
_as_str(t.get("type"), f"targets[{i}].type")
domains = t.get("domains")
if domains is None or not isinstance(domains, list) or not domains:
_fatal(f"targets[{i}].domains must be a non-empty list")
for j, d in enumerate(domains):
fqdn = _as_str(d, f"targets[{i}].domains[{j}]")
if not (fqdn == root_domain or fqdn.endswith("." + root_domain)):
_fatal(
f"targets[{i}].domains[{j}] must be under global.domain ({root_domain}), got {fqdn}"
)
dns = t.get("dns", {})
if dns is None:
dns = {}
if not isinstance(dns, dict):
_fatal(f"targets[{i}].dns must be a mapping")
recs = dns.get("records", [])
if recs is None:
recs = []
if not isinstance(recs, list):
_fatal(f"targets[{i}].dns.records must be a list")
for k, r in enumerate(recs):
if not isinstance(r, dict):
_fatal(f"targets[{i}].dns.records[{k}] must be a mapping")
_normalize_record(r)
return Stack(
name=name,
root_domain=root_domain,
dns_provider=dns_provider,
cloud=cloud,
targets=targets,
)
def _apply_env_overrides(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
if env_name is None:
return cfg
g = cfg.get("global")
if not isinstance(g, dict):
return cfg
envs = g.get("environments")
if envs is None:
return cfg
if not isinstance(envs, dict):
_fatal("global.environments must be a mapping of env -> overrides")
overrides = envs.get(env_name)
if overrides is None:
_fatal(f"env not found in global.environments: {env_name}")
if not isinstance(overrides, dict):
_fatal(f"global.environments.{env_name} must be a mapping")
# Shallow-merge global overrides into global.
merged = dict(cfg)
merged_global = dict(g)
for k, v in overrides.items():
merged_global[k] = v
merged["global"] = merged_global
return merged
def dns_plan(cfg: dict[str, Any], env_name: str | None) -> dict[str, Any]:
cfg2 = _apply_env_overrides(cfg, env_name)
stack = validate(cfg2)
out: dict[str, Any] = {
"stack": stack.name,
"env": env_name or "",
"global": {
"domain": stack.root_domain,
"dns_provider": stack.dns_provider,
},
"records": [],
}
# Flatten all explicit dns.records across targets.
records: list[dict[str, Any]] = []
for t in stack.targets:
tid = t["id"]
dns = t.get("dns") or {}
recs = dns.get("records") or []
for r in recs:
nr = _normalize_record(r)
nr["target"] = tid
records.append(nr)
out["records"] = records
return out
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(prog="stackflow")
ap.add_argument("--config", required=True, help="Path to StackFlow YAML")
ap.add_argument(
"--env",
default="",
help="Optional environment name under global.environments (e.g. dev/prod)",
)
ap.add_argument(
"--phase",
required=True,
choices=["validate", "dns-plan"],
help="Which phase to run",
)
ap.add_argument(
"--format",
default="json",
choices=["json"],
help="Output format for plan phases",
)
args = ap.parse_args(argv)
if not os.path.exists(args.config):
_fatal(f"config not found: {args.config}")
cfg = _load_yaml(args.config)
env_name = args.env.strip() or None
if args.phase == "validate":
cfg2 = _apply_env_overrides(cfg, env_name)
s = validate(cfg2)
print(
json.dumps(
{
"ok": True,
"stack": s.name,
"env": env_name or "",
"domain": s.root_domain,
"dns_provider": s.dns_provider,
"cloud": s.cloud,
"targets": len(s.targets),
},
indent=2,
sort_keys=True,
)
)
return 0
if args.phase == "dns-plan":
plan = dns_plan(cfg, env_name)
print(json.dumps(plan, indent=2, sort_keys=True))
return 0
_fatal(f"unknown phase: {args.phase}")
return 2
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))