diff --git a/src/flow/services/bootstrap.py b/src/flow/services/bootstrap.py index 70f2fc3..0743a53 100644 --- a/src/flow/services/bootstrap.py +++ b/src/flow/services/bootstrap.py @@ -1,1001 +1,77 @@ -"""Bootstrap domain logic.""" +# src/flow/services/bootstrap.py +"""BootstrapService -- orchestrates system setup.""" -import argparse -import os -import re -import shlex -import shutil -import sys -import tempfile -import urllib.request -from pathlib import Path -from typing import Any, Dict, List, Optional +from __future__ import annotations -import yaml +from typing import Any, Optional -from flow.commands import dotfiles as dotfiles_cmd from flow.core.config import FlowContext -from flow.core.process import run_command -from flow.core.system import FileSystem -from flow.core.variables import substitute_template +from flow.core.errors import FlowError +from flow.domain.bootstrap.planning import parse_profile, plan_bootstrap -DEFAULT_LOCALE = "en_US.UTF-8" -PACKAGE_TYPES = {"pkg", "binary", "cask"} -_FS = FileSystem() +class BootstrapService: + def __init__(self, ctx: FlowContext): + self.ctx = ctx -def register(subparsers): - p = subparsers.add_parser( - "bootstrap", - aliases=["setup", "provision"], - help="Environment provisioning", - ) - sub = p.add_subparsers(dest="bootstrap_command") + def run( + self, + profile_name: str, + *, + dry_run: bool = False, + ) -> None: + """Run bootstrap for a profile.""" + profiles = self.ctx.manifest.get("profiles", {}) + if profile_name not in profiles: + raise FlowError(f"Unknown profile: {profile_name}") - run_p = sub.add_parser("run", help="Run bootstrap actions") - run_p.add_argument("--profile", help="Profile name to use") - run_p.add_argument("--dry-run", action="store_true", help="Show plan without executing") - run_p.add_argument("--var", action="append", default=[], help="Set variable KEY=VALUE") - run_p.set_defaults(handler=run_bootstrap) + profile = parse_profile(profile_name, profiles[profile_name]) + plan = plan_bootstrap(profile, self.ctx.manifest) - ls = sub.add_parser("list", help="List available profiles") - ls.set_defaults(handler=run_list) + self.ctx.console.info(f"Bootstrap profile: {profile_name}") + self.ctx.console.print_plan(plan.actions, verb="bootstrap") - show = sub.add_parser("show", help="Show profile configuration") - show.add_argument("profile", help="Profile name") - show.set_defaults(handler=run_show) - - packages = sub.add_parser("packages", help="List packages defined in profiles") - packages.add_argument("--profile", help="Profile name (default: all profiles)") - packages.add_argument( - "--resolved", - action="store_true", - help="Show resolved package names for detected package manager", - ) - packages.set_defaults(handler=run_packages) - - p.set_defaults(handler=lambda ctx, args: p.print_help()) - - -def _get_profiles(ctx: FlowContext) -> dict: - profiles = ctx.manifest.get("profiles") - if profiles is None: - if "environments" in ctx.manifest: - raise RuntimeError( - "Manifest key 'environments' is no longer supported. Rename it to 'profiles'." - ) - return {} - - if not isinstance(profiles, dict): - raise RuntimeError("Manifest key 'profiles' must be a mapping") - - return profiles - - -def _parse_variables(var_args: list) -> dict: - variables = {} - for item in var_args: - if "=" not in item: - raise ValueError(f"Invalid --var value '{item}'. Expected KEY=VALUE") - key, value = item.split("=", 1) - if not key: - raise ValueError(f"Invalid --var value '{item}'. KEY cannot be empty") - variables[key] = value - return variables - - -def _profile_template_context( - ctx: FlowContext, - extra_env: Dict[str, str], - extra: Optional[Dict[str, Any]] = None, -) -> Dict[str, Any]: - env_map = dict(os.environ) - env_map.update(extra_env) - - template_ctx: Dict[str, Any] = { - "env": env_map, - "os": ctx.platform.os, - "arch": ctx.platform.arch, - } - if extra: - template_ctx.update(extra) - return template_ctx - - -def _render_template_value(value: Any, template_ctx: Dict[str, Any]) -> Any: - if isinstance(value, str): - return substitute_template(value, template_ctx) - if isinstance(value, list): - return [_render_template_value(item, template_ctx) for item in value] - if isinstance(value, dict): - return {k: _render_template_value(v, template_ctx) for k, v in value.items()} - return value - - -def _linux_detect_package_manager() -> Optional[str]: - if shutil.which("apt") or shutil.which("apt-get"): - return "apt" - if shutil.which("dnf"): - return "dnf" - return None - - -def _resolve_package_manager(ctx: FlowContext, profile_cfg: dict) -> str: - explicit = profile_cfg.get("package-manager") - if isinstance(explicit, str) and explicit: - return explicit - - profile_os = profile_cfg.get("os") - if profile_os == "macos": - return "brew" - if profile_os == "linux": - detected = _linux_detect_package_manager() - if detected: - return detected - raise RuntimeError("Unable to auto-detect package manager (expected apt or dnf)") - - raise RuntimeError("Profile 'os' must be set to 'linux' or 'macos'") - - -def _get_package_catalog(ctx: FlowContext) -> Dict[str, Dict[str, Any]]: - raw = ctx.manifest.get("packages", []) - catalog: Dict[str, Dict[str, Any]] = {} - - if isinstance(raw, dict): - # Also support mapping form: packages: {name: {...}} - for name, definition in raw.items(): - if not isinstance(definition, dict): - continue - pkg = dict(definition) - pkg["name"] = str(pkg.get("name") or name) - pkg.setdefault("type", "pkg") - catalog[pkg["name"]] = pkg - return catalog - - if not isinstance(raw, list): - return catalog - - for item in raw: - if not isinstance(item, dict): - continue - name = item.get("name") - if not isinstance(name, str) or not name: - continue - pkg = dict(item) - pkg.setdefault("type", "pkg") - catalog[name] = pkg - - return catalog - - -def _normalize_profile_package_entry(entry: Any) -> Dict[str, Any]: - if isinstance(entry, str): - if "/" in entry: - prefix, name = entry.split("/", 1) - if prefix in PACKAGE_TYPES and name: - return {"name": name, "type": prefix} - return {"name": entry} - - if isinstance(entry, dict): - if not isinstance(entry.get("name"), str) or not entry["name"]: - raise RuntimeError("Package object entries must include a non-empty 'name'") - return dict(entry) - - raise RuntimeError(f"Unsupported package entry: {entry!r}") - - -def _resolve_package_spec( - catalog: Dict[str, Dict[str, Any]], - profile_entry: Dict[str, Any], -) -> Dict[str, Any]: - name = profile_entry["name"] - base = dict(catalog.get(name, {})) - merged = dict(base) - merged.update(profile_entry) - merged["name"] = name - - pkg_type = merged.get("type") or "pkg" - if pkg_type not in PACKAGE_TYPES: - raise RuntimeError(f"Unsupported package type '{pkg_type}' for package '{name}'") - merged["type"] = pkg_type - - return merged - - -def _resolve_pkg_source_name(spec: Dict[str, Any], package_manager: str) -> str: - sources = spec.get("sources", {}) - if not isinstance(sources, dict): - return spec["name"] - - keys = [package_manager] - if package_manager == "apt": - keys.append("apt-get") - if package_manager == "apt-get": - keys.append("apt") - - for key in keys: - value = sources.get(key) - if isinstance(value, str) and value: - return value - - return spec["name"] - - -def _platform_lookup_keys(ctx: FlowContext) -> List[str]: - keys = [ctx.platform.platform] - - if ctx.platform.os == "macos": - keys.append(f"darwin-{ctx.platform.arch}") - - if ctx.platform.arch == "x64": - keys.append(f"{ctx.platform.os}-amd64") - if ctx.platform.os == "macos": - keys.append("darwin-amd64") - - unique: List[str] = [] - for key in keys: - if key not in unique: - unique.append(key) - return unique - - -def _resolve_binary_platform_vars(ctx: FlowContext, spec: Dict[str, Any]) -> Dict[str, str]: - platform_vars = { - "os": ctx.platform.os, - "arch": ctx.platform.arch, - } - - platform_map = spec.get("platform-map", {}) - if isinstance(platform_map, dict): - for key in _platform_lookup_keys(ctx): - mapping = platform_map.get(key) - if isinstance(mapping, dict): - for mk, mv in mapping.items(): - if isinstance(mv, str): - platform_vars[mk] = mv - break - - return platform_vars - - -def _resolve_binary_asset(ctx: FlowContext, spec: Dict[str, Any], template_ctx: Dict[str, Any]) -> str: - assets = spec.get("assets", {}) - if isinstance(assets, dict) and assets: - for key in _platform_lookup_keys(ctx): - value = assets.get(key) - if isinstance(value, str) and value: - return substitute_template(value, template_ctx) - raise RuntimeError( - f"No binary asset mapping for platform {ctx.platform.platform} in package '{spec['name']}'" - ) - - pattern = spec.get("asset-pattern") - if not isinstance(pattern, str) or not pattern: - raise RuntimeError( - f"Binary package '{spec['name']}' must define either 'assets' or 'asset-pattern'" - ) - - return substitute_template(pattern, template_ctx) - - -def _resolve_binary_download_url( - spec: Dict[str, Any], - asset_name: str, - template_ctx: Dict[str, Any], -) -> str: - source = spec.get("source") - if not isinstance(source, str) or not source: - raise RuntimeError(f"Binary package '{spec['name']}' is missing 'source'") - - version = str(spec.get("version", "")) - if source.startswith("github:"): - owner_repo = source[len("github:") :] - if not owner_repo: - raise RuntimeError(f"Invalid github source in package '{spec['name']}'") - if not version: - raise RuntimeError(f"Binary package '{spec['name']}' requires 'version'") - return f"https://github.com/{owner_repo}/releases/download/v{version}/{asset_name}" - - rendered_source = substitute_template(source, template_ctx) - if not asset_name: - return rendered_source - - if rendered_source.endswith(asset_name): - return rendered_source - - if rendered_source.endswith("/"): - return rendered_source + asset_name - - return f"{rendered_source}/{asset_name}" - - -def _strip_prefix(path: Path, prefix: Path) -> Path: - try: - return path.relative_to(prefix) - except ValueError: - return path - - -def _is_under(path: Path, root: Path) -> bool: - try: - path.resolve().relative_to(root.resolve()) - return True - except ValueError: - return False - - -def _validate_declared_install_path(package_name: str, declared_path: Path) -> None: - if declared_path.is_absolute(): - raise RuntimeError( - f"Install path for '{package_name}' must be relative: {declared_path}" - ) - if any(part == ".." for part in declared_path.parts): - raise RuntimeError( - f"Install path for '{package_name}' must not include parent traversal: {declared_path}" - ) - - -def _install_destination(kind: str) -> Path: - home = Path.home() - if kind == "bin": - return home / ".local" / "bin" - if kind == "share": - return home / ".local" / "share" - if kind == "man": - return home / ".local" / "share" / "man" - if kind == "lib": - return home / ".local" / "lib" - raise RuntimeError(f"Unsupported install section: {kind}") - - -def _install_strip_prefix(kind: str) -> Path: - if kind == "bin": - return Path("bin") - if kind == "share": - return Path("share") - if kind == "man": - return Path("share") / "man" - if kind == "lib": - return Path("lib") - return Path(".") - - -def _copy_install_item(kind: str, src: Path, declared_path: Path) -> None: - destination_root = _install_destination(kind) - stripped = _strip_prefix(declared_path, _install_strip_prefix(kind)) - destination = destination_root / stripped - - if src.is_dir(): - _FS.copy_tree(src, destination) - else: - _FS.copy_file(src, destination) - if kind == "bin": - destination.chmod(destination.stat().st_mode | 0o111) - - -def _install_binary_package( - ctx: FlowContext, - spec: Dict[str, Any], - extra_env: Dict[str, str], - dry_run: bool, -) -> None: - version = str(spec.get("version", "")) - platform_vars = _resolve_binary_platform_vars(ctx, spec) - template_ctx = _profile_template_context( - ctx, - extra_env, - { - "name": spec["name"], - "version": version, - **platform_vars, - }, - ) - - asset_name = _resolve_binary_asset(ctx, spec, template_ctx) - template_ctx["asset"] = asset_name - download_url = _resolve_binary_download_url(spec, asset_name, template_ctx) - template_ctx["downloadUrl"] = download_url - - if dry_run: - ctx.console.info(f"[{spec['name']}] Would download: {download_url}") - return - - install = spec.get("install", {}) - if not isinstance(install, dict) or not install: - raise RuntimeError(f"Binary package '{spec['name']}' must define non-empty 'install'") - - with tempfile.TemporaryDirectory(prefix=f"flow-{spec['name']}-") as tmp: - tmp_dir = Path(tmp) - archive_path = tmp_dir / asset_name - - ctx.console.info(f"Downloading {spec['name']} from {download_url}") - with urllib.request.urlopen(download_url, timeout=60) as response: - _FS.write_bytes(archive_path, response.read()) - - extracted = tmp_dir / "extract" - _FS.ensure_dir(extracted) - try: - shutil.unpack_archive(str(archive_path), str(extracted)) - except (shutil.ReadError, ValueError) as e: - raise RuntimeError( - f"Could not extract archive for '{spec['name']}': {e}" - ) from e - - extract_dir_value = str(spec.get("extract-dir", ".")) - extract_dir_value = substitute_template(extract_dir_value, template_ctx) - if extract_dir_value == ".": - source_root = extracted - else: - source_root = extracted / extract_dir_value - - if not source_root.exists(): - raise RuntimeError( - f"extract-dir '{extract_dir_value}' not found for package '{spec['name']}'" - ) - source_root_resolved = source_root.resolve() - - for kind in ("bin", "share", "man", "lib"): - items = install.get(kind, []) - if not isinstance(items, list): - continue - - for raw_item in items: - if not isinstance(raw_item, str): - continue - - rendered = substitute_template(raw_item, template_ctx) - declared_path = Path(rendered) - _validate_declared_install_path(spec["name"], declared_path) - - src = (source_root / declared_path).resolve() - if not _is_under(src, source_root_resolved): - raise RuntimeError( - f"Install path escapes extract-dir for '{spec['name']}': {declared_path}" - ) - if not src.exists(): - raise RuntimeError( - f"Install path not found for '{spec['name']}': {declared_path}" - ) - _copy_install_item(kind, src, declared_path) - - -def _script_uses_sudo(script: str) -> bool: - return re.search(r"(^|\s)sudo(\s|$)", script) is not None - - -def _run_script( - ctx: FlowContext, - script: str, - template_ctx: Dict[str, Any], - *, - dry_run: bool, - allow_sudo: bool, - description: str, -) -> None: - rendered = substitute_template(script, template_ctx) - if not allow_sudo and _script_uses_sudo(rendered): - ctx.console.warn(f"Skipping {description}: sudo is blocked (set allow_sudo: true)") - return - - if dry_run: - ctx.console.info(f"Would run {description}:") - for line in rendered.splitlines(): - if line.strip(): - print(f" {line}") - return - - run_command(rendered, ctx.console) - - -def _run_one_command(ctx: FlowContext, command: str, dry_run: bool) -> None: - if dry_run: - print(f" $ {command}") - return - run_command(command, ctx.console) - - -def _ensure_shell_installed( - ctx: FlowContext, - shell_name: str, - package_manager: str, - package_catalog: Dict[str, Dict[str, Any]], - extra_env: Dict[str, str], - *, - dry_run: bool, - pm_state: Dict[str, bool], -) -> None: - if shutil.which(shell_name): - return - - shell_spec = package_catalog.get(shell_name, {"name": shell_name, "type": "pkg"}) - shell_spec = dict(shell_spec) - shell_spec["name"] = shell_name - shell_spec.setdefault("type", "pkg") - - ctx.console.info(f"Shell '{shell_name}' is missing; installing it first") - _install_package( - ctx, - shell_spec, - package_manager, - extra_env, - dry_run=dry_run, - pm_state=pm_state, - ) - - -def _set_shell(ctx: FlowContext, shell_name: str, *, dry_run: bool) -> None: - shell_path = shutil.which(shell_name) - if not shell_path: - raise RuntimeError(f"Shell not found after installation: {shell_name}") - - quoted_path = shlex.quote(shell_path) - quoted_user = shlex.quote(os.environ.get("USER", "")) - - try: - with open("/etc/shells", "r", encoding="utf-8") as handle: - shell_lines = handle.read() - except OSError: - shell_lines = "" - - if shell_path not in shell_lines: - _run_one_command( - ctx, - f"echo {quoted_path} | sudo tee -a /etc/shells >/dev/null", - dry_run, - ) - - _run_one_command(ctx, f"sudo chsh -s {quoted_path} {quoted_user}", dry_run) - - -def _set_hostname(ctx: FlowContext, hostname: str, *, dry_run: bool) -> None: - quoted = shlex.quote(hostname) - if ctx.platform.os == "macos": - _run_one_command(ctx, f"sudo scutil --set ComputerName {quoted}", dry_run) - _run_one_command(ctx, f"sudo scutil --set HostName {quoted}", dry_run) - _run_one_command(ctx, f"sudo scutil --set LocalHostName {quoted}", dry_run) - else: - _run_one_command(ctx, f"sudo hostnamectl set-hostname {quoted}", dry_run) - - -def _set_locale(ctx: FlowContext, locale: str, *, dry_run: bool) -> None: - if ctx.platform.os != "linux": - return - quoted = shlex.quote(locale) - _run_one_command(ctx, f"sudo locale-gen {quoted}", dry_run) - _run_one_command(ctx, f"sudo update-locale LANG={quoted}", dry_run) - - -def _ensure_required_variables(profile_cfg: Dict[str, Any], env_map: Dict[str, str]) -> None: - requires = profile_cfg.get("requires", []) - if not isinstance(requires, list): - raise RuntimeError("Profile 'requires' must be a list") - - missing = [] - for key in requires: - if not isinstance(key, str) or not key: - continue - if env_map.get(key, "") == "": - missing.append(key) - - if missing: - raise RuntimeError( - "Missing required environment variables: " - + ", ".join(missing) - + ". Export them or pass with --var KEY=VALUE." - ) - - -def _pm_update_command(pm: str) -> str: - if pm in ("apt", "apt-get"): - return "sudo apt update -qq" - if pm == "dnf": - return "sudo dnf makecache -q" - if pm == "brew": - return "brew update" - return f"sudo {shlex.quote(pm)} update" - - -def _pm_install_command(pm: str, packages: List[str], pkg_type: str) -> str: - pkg_args = " ".join(shlex.quote(pkg) for pkg in packages) - if pm in ("apt", "apt-get"): - return f"sudo apt install -y {pkg_args}" - if pm == "dnf": - return f"sudo dnf install -y {pkg_args}" - if pm == "brew" and pkg_type == "cask": - return f"brew install --cask {pkg_args}" - if pm == "brew": - return f"brew install {pkg_args}" - return f"sudo {shlex.quote(pm)} install {pkg_args}" - - -def _install_package( - ctx: FlowContext, - spec: Dict[str, Any], - package_manager: str, - extra_env: Dict[str, str], - *, - dry_run: bool, - pm_state: Dict[str, bool], -) -> None: - pkg_type = spec.get("type", "pkg") - - if pkg_type in {"pkg", "cask"} and not pm_state.get("updated"): - _run_one_command(ctx, _pm_update_command(package_manager), dry_run) - pm_state["updated"] = True - - if pkg_type == "pkg": - package_name = _resolve_pkg_source_name(spec, package_manager) - _run_one_command( - ctx, - _pm_install_command(package_manager, [package_name], "pkg"), - dry_run, - ) - return - - if pkg_type == "cask": - if package_manager != "brew": - ctx.console.warn(f"Skipping cask package on non-brew system: {spec['name']}") + if dry_run: return - package_name = _resolve_pkg_source_name(spec, "brew") - _run_one_command( - ctx, - _pm_install_command(package_manager, [package_name], "cask"), - dry_run, - ) - return - if pkg_type == "binary": - _install_binary_package(ctx, spec, extra_env, dry_run) - return + for action in plan.actions: + self.ctx.console.info(f" {action}") - raise RuntimeError(f"Unsupported package type: {pkg_type}") + if action.phase == "packages": + # Delegate to PackageService + from flow.services.packages import PackageService + pkg_svc = PackageService(self.ctx) + pkg_names = [p.name for p in plan.packages_to_install] + if pkg_names: + pkg_svc.install(pkg_names) + continue + if action.phase == "dotfiles": + # Delegate to DotfilesService + from flow.services.dotfiles import DotfilesService + dot_svc = DotfilesService(self.ctx) + dot_svc.link(profile=profile_name) + continue -def _run_package_post_install( - ctx: FlowContext, - spec: Dict[str, Any], - extra_env: Dict[str, str], - *, - dry_run: bool, -) -> None: - script = spec.get("post-install") - if not isinstance(script, str) or not script.strip(): - return + # Execute shell commands + for cmd in action.commands: + self.ctx.runtime.runner.run_shell(cmd, check=True) - allow_sudo = bool(spec.get("allow_sudo", False)) - extra_ctx = { - "name": spec["name"], - "version": str(spec.get("version", "")), - } - if spec.get("type") == "binary": - extra_ctx.update(_resolve_binary_platform_vars(ctx, spec)) + self.ctx.console.success(f"Bootstrap complete for {profile_name}.") - template_ctx = _profile_template_context( - ctx, - extra_env, - extra_ctx, - ) - _run_script( - ctx, - script, - template_ctx, - dry_run=dry_run, - allow_sudo=allow_sudo, - description=f"post-install hook for {spec['name']}", - ) + def show(self, profile_name: str) -> None: + """Show bootstrap plan without executing.""" + self.run(profile_name, dry_run=True) + def list_profiles(self) -> None: + """List available profiles.""" + profiles = self.ctx.manifest.get("profiles", {}) + if not profiles: + self.ctx.console.info("No profiles defined in manifest.") + return -def _run_runcmd( - ctx: FlowContext, - profile_cfg: Dict[str, Any], - extra_env: Dict[str, str], - *, - dry_run: bool, -) -> None: - commands = profile_cfg.get("runcmd", []) - if not isinstance(commands, list): - raise RuntimeError("Profile 'runcmd' must be a list") - - template_ctx = _profile_template_context(ctx, extra_env) - for command in commands: - if not isinstance(command, str) or not command.strip(): - continue - rendered = substitute_template(command, template_ctx) - _run_one_command(ctx, rendered, dry_run) - - -def _run_ssh_keygen( - ctx: FlowContext, - profile_cfg: Dict[str, Any], - extra_env: Dict[str, str], - *, - dry_run: bool, -) -> None: - ssh_keygen = profile_cfg.get("ssh-keygen", profile_cfg.get("ssh_keygen", [])) - if not isinstance(ssh_keygen, list): - raise RuntimeError("Profile 'ssh-keygen' must be a list") - - template_ctx = _profile_template_context(ctx, extra_env) - ssh_dir = Path.home() / ".ssh" - if dry_run: - print(f" $ mkdir -p {ssh_dir}") - else: - _FS.ensure_dir(ssh_dir, mode=0o700) - - for entry in ssh_keygen: - if not isinstance(entry, dict): - continue - - key_type = str(entry.get("type", "ed25519")) - filename = str(entry.get("filename", f"id_{key_type}")) - key_path = ssh_dir / filename - if key_path.exists(): - ctx.console.warn(f"SSH key already exists: {key_path}") - continue - - comment = str(_render_template_value(entry.get("comment", ""), template_ctx)) - bits = entry.get("bits") - - command = [ - "ssh-keygen", - "-t", - shlex.quote(key_type), - "-f", - shlex.quote(str(key_path)), - "-N", - '""', - "-C", - shlex.quote(comment), + rows = [ + [name, data.get("os", "linux"), data.get("hostname", "-")] + for name, data in sorted(profiles.items()) ] - if bits: - command.extend(["-b", shlex.quote(str(bits))]) - - _run_one_command(ctx, " ".join(command), dry_run) - _run_one_command(ctx, f"chmod 600 {shlex.quote(str(key_path))}", dry_run) - - -def _run_post_link( - ctx: FlowContext, - profile_cfg: Dict[str, Any], - extra_env: Dict[str, str], - *, - dry_run: bool, -) -> None: - script = profile_cfg.get("post-link") - if not script: - script = profile_cfg.get("post-config") - - if not isinstance(script, str) or not script.strip(): - return - - template_ctx = _profile_template_context(ctx, extra_env) - _run_script( - ctx, - script, - template_ctx, - dry_run=dry_run, - allow_sudo=True, - description="post-link hook", - ) - - -def _auto_link_profile_configs(ctx: FlowContext, profile_name: str, *, dry_run: bool) -> None: - link_args = argparse.Namespace( - packages=[], - profile=profile_name, - copy=False, - force=False, - dry_run=dry_run, - ) - dotfiles_cmd.run_link(ctx, link_args) - - -def run_bootstrap(ctx: FlowContext, args): - profiles = _get_profiles(ctx) - if not profiles: - ctx.console.error("No profiles found in manifest.") - sys.exit(1) - - profile_name = args.profile - if not profile_name: - if len(profiles) == 1: - profile_name = next(iter(profiles)) - else: - ctx.console.error( - f"Multiple profiles available. Specify with --profile: {', '.join(sorted(profiles.keys()))}" - ) - sys.exit(1) - - if profile_name not in profiles: - ctx.console.error( - f"Profile not found: {profile_name}. Available: {', '.join(sorted(profiles.keys()))}" - ) - sys.exit(1) - - profile_cfg = profiles[profile_name] - if not isinstance(profile_cfg, dict): - ctx.console.error(f"Profile '{profile_name}' must be a mapping") - sys.exit(1) - - profile_os = profile_cfg.get("os") - if profile_os not in {"linux", "macos"}: - ctx.console.error( - f"Profile '{profile_name}' must define os: linux|macos" - ) - sys.exit(1) - - if profile_os != ctx.platform.os: - ctx.console.error( - f"Profile '{profile_name}' targets '{profile_os}', current OS is '{ctx.platform.os}'" - ) - sys.exit(1) - - try: - cli_vars = _parse_variables(args.var) - package_manager = _resolve_package_manager(ctx, profile_cfg) - _ensure_required_variables(profile_cfg, {**os.environ, **cli_vars}) - except ValueError as e: - ctx.console.error(str(e)) - sys.exit(1) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - package_catalog = _get_package_catalog(ctx) - pm_state = {"updated": False} - - template_ctx = _profile_template_context(ctx, cli_vars) - - if "hostname" in profile_cfg: - hostname = str(_render_template_value(profile_cfg["hostname"], template_ctx)) - _set_hostname(ctx, hostname, dry_run=args.dry_run) - - locale = str(profile_cfg.get("locale", DEFAULT_LOCALE)) - _set_locale(ctx, locale, dry_run=args.dry_run) - - shell_name = profile_cfg.get("shell") - if isinstance(shell_name, str) and shell_name: - _ensure_shell_installed( - ctx, - shell_name, - package_manager, - package_catalog, - cli_vars, - dry_run=args.dry_run, - pm_state=pm_state, - ) - - profile_packages = profile_cfg.get("packages", []) - if not isinstance(profile_packages, list): - ctx.console.error("Profile 'packages' must be a list") - sys.exit(1) - - for raw_entry in profile_packages: - try: - normalized = _normalize_profile_package_entry(raw_entry) - spec = _resolve_package_spec(package_catalog, normalized) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if spec.get("skip"): - ctx.console.info(f"Skipping package {spec['name']} (skip=true)") - continue - - ctx.console.info(f"Installing package: {spec['name']} ({spec['type']})") - try: - _install_package( - ctx, - spec, - package_manager, - cli_vars, - dry_run=args.dry_run, - pm_state=pm_state, - ) - _run_package_post_install(ctx, spec, cli_vars, dry_run=args.dry_run) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if isinstance(shell_name, str) and shell_name: - try: - _set_shell(ctx, shell_name, dry_run=args.dry_run) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - try: - _run_ssh_keygen(ctx, profile_cfg, cli_vars, dry_run=args.dry_run) - _run_runcmd(ctx, profile_cfg, cli_vars, dry_run=args.dry_run) - _auto_link_profile_configs(ctx, profile_name, dry_run=args.dry_run) - _run_post_link(ctx, profile_cfg, cli_vars, dry_run=args.dry_run) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - except SystemExit: - raise - - -def run_list(ctx: FlowContext, args): - profiles = _get_profiles(ctx) - if not profiles: - ctx.console.info("No profiles defined in manifest.") - return - - headers = ["PROFILE", "OS", "PM", "PACKAGES", "REQUIRES"] - rows = [] - for name, profile_cfg in sorted(profiles.items()): - if not isinstance(profile_cfg, dict): - continue - os_name = str(profile_cfg.get("os", "?")) - pm = str(profile_cfg.get("package-manager", "auto")) - packages = profile_cfg.get("packages", []) - package_count = len(packages) if isinstance(packages, list) else 0 - requires = profile_cfg.get("requires", []) - requires_count = len(requires) if isinstance(requires, list) else 0 - rows.append([name, os_name, pm, str(package_count), str(requires_count)]) - - ctx.console.table(headers, rows) - - -def run_show(ctx: FlowContext, args): - profiles = _get_profiles(ctx) - profile_name = args.profile - - if profile_name not in profiles: - ctx.console.error( - f"Profile not found: {profile_name}. Available: {', '.join(sorted(profiles.keys()))}" - ) - sys.exit(1) - - print(yaml.safe_dump({profile_name: profiles[profile_name]}, sort_keys=False).rstrip()) - - -def run_packages(ctx: FlowContext, args): - profiles = _get_profiles(ctx) - if not profiles: - ctx.console.info("No profiles defined in manifest.") - return - - if args.profile: - if args.profile not in profiles: - ctx.console.error( - f"Profile not found: {args.profile}. Available: {', '.join(sorted(profiles.keys()))}" - ) - sys.exit(1) - selected_profiles = [(args.profile, profiles[args.profile])] - else: - selected_profiles = sorted(profiles.items()) - - package_catalog = _get_package_catalog(ctx) - rows = [] - for profile_name, profile_cfg in selected_profiles: - if not isinstance(profile_cfg, dict): - continue - - pm = _resolve_package_manager(ctx, profile_cfg) - profile_packages = profile_cfg.get("packages", []) - if not isinstance(profile_packages, list): - continue - - for raw_entry in profile_packages: - normalized = _normalize_profile_package_entry(raw_entry) - spec = _resolve_package_spec(package_catalog, normalized) - - if args.resolved: - if spec["type"] in {"pkg", "cask"}: - resolved = _resolve_pkg_source_name(spec, pm) - else: - resolved = spec.get("asset-pattern", spec.get("source", "")) - rows.append([profile_name, pm, spec["type"], spec["name"], str(resolved)]) - else: - rows.append([profile_name, spec["type"], spec["name"]]) - - if not rows: - ctx.console.info("No packages defined in selected profile(s).") - return - - if args.resolved: - ctx.console.table(["PROFILE", "PM", "TYPE", "PACKAGE", "RESOLVED"], rows) - else: - ctx.console.table(["PROFILE", "TYPE", "PACKAGE"], rows) + self.ctx.console.table(["PROFILE", "OS", "HOSTNAME"], rows) diff --git a/src/flow/services/containers.py b/src/flow/services/containers.py index 9c06263..9983666 100644 --- a/src/flow/services/containers.py +++ b/src/flow/services/containers.py @@ -1,321 +1,117 @@ -"""Container lifecycle helpers for `flow dev`.""" +"""ContainerService -- manages development containers.""" from __future__ import annotations import os -import shutil from typing import Optional from flow.core.config import FlowContext from flow.core.errors import FlowError - -DEFAULT_REGISTRY = "registry.tomastm.com" -DEFAULT_TAG = "latest" -CONTAINER_HOME = "/home/dev" - - -def runtime() -> str: - for name in ("docker", "podman"): - if shutil.which(name): - return name - raise FlowError("No container runtime found (docker or podman)") - - -def container_name(name: str) -> str: - return name if name.startswith("dev-") else f"dev-{name}" - - -def parse_image_ref( - image: str, - *, - default_registry: str = DEFAULT_REGISTRY, - default_tag: str = DEFAULT_TAG, -) -> tuple[str, str, str, str]: - registry = default_registry - tag = default_tag - - if image.startswith("docker/"): - registry = "docker.io" - image = f"library/{image.split('/', 1)[1]}" - elif image.startswith("tm0/"): - registry = default_registry - image = image.split("/", 1)[1] - elif "/" in image: - prefix, remainder = image.split("/", 1) - if "." in prefix or ":" in prefix or prefix == "localhost": - registry = prefix - image = remainder - - if ":" in image.split("/")[-1]: - tag = image.rsplit(":", 1)[1] - image = image.rsplit(":", 1)[0] - - repo = image - full_ref = f"{registry}/{repo}:{tag}" - label_prefix = ( - registry.rsplit(".", 1)[0].rsplit(".", 1)[-1] if "." in registry else registry - ) - label = f"{label_prefix}/{repo.split('/')[-1]}" - - return full_ref, repo, tag, label +from flow.core import paths +from flow.domain.containers.resolution import ( + build_container_spec, + container_name, + parse_image_ref, + resolve_mounts, +) class ContainerService: - """Own all container-runtime interactions.""" - def __init__(self, ctx: FlowContext): self.ctx = ctx - self.runner = ctx.runtime.runner - def container_exists(self, rt: str, name: str) -> bool: - result = self.runner.run( - [rt, "container", "ls", "-a", "--format", "{{.Names}}"], - capture_output=True, - ) - return name in result.stdout.strip().splitlines() - - def container_running(self, rt: str, name: str) -> bool: - result = self.runner.run( - [rt, "container", "ls", "--format", "{{.Names}}"], - capture_output=True, - ) - return name in result.stdout.strip().splitlines() - - def run_create(self, args) -> None: - rt = runtime() - cname = container_name(args.name) - - if self.container_exists(rt, cname): - raise FlowError(f"Container already exists: {cname}") - - project_path = os.path.realpath(args.project) if args.project else None - if project_path and not os.path.isdir(project_path): - raise FlowError(f"Invalid project path: {project_path}") - - full_ref, _, _, _ = parse_image_ref( - args.image, + def create( + self, + image: str, + namespace: str = "default", + *, + dry_run: bool = False, + extra_env: Optional[dict[str, str]] = None, + ) -> None: + """Create and start a container.""" + image_ref = parse_image_ref( + image, default_registry=self.ctx.config.container_registry, default_tag=self.ctx.config.container_tag, ) - cmd = [ - rt, - "run", - "-d", - "--name", - cname, - "--label", - "dev=true", - "--label", - f"dev.name={args.name}", - "--label", - f"dev.image_ref={full_ref}", - "--network", - "host", - "--init", - ] + mounts = resolve_mounts( + paths.HOME, + self.ctx.config.projects_dir, + dotfiles_dir=paths.DOTFILES_DIR, + ) - if project_path: - cmd.extend(["-v", f"{project_path}:/workspace"]) - cmd.extend(["--label", f"dev.project_path={project_path}"]) + spec = build_container_spec( + namespace, image_ref, mounts, + env=extra_env, + ) - docker_sock = "/var/run/docker.sock" - if os.path.exists(docker_sock): - cmd.extend(["-v", f"{docker_sock}:{docker_sock}"]) + self.ctx.console.info(f"Creating container: {spec.name}") + self.ctx.console.info(f" Image: {spec.image.full}") + self.ctx.console.info(f" Mounts: {len(spec.mounts)}") - home = os.path.expanduser("~") - mounts = [ - (f"{home}/.ssh", f"{CONTAINER_HOME}/.ssh:ro", os.path.isdir), - (f"{home}/.npmrc", f"{CONTAINER_HOME}/.npmrc:ro", os.path.isfile), - (f"{home}/.npm", f"{CONTAINER_HOME}/.npm", os.path.isdir), - ] - for source, target, predicate in mounts: - if predicate(source): - cmd.extend(["-v", f"{source}:{target}"]) - - try: - import grp - - docker_gid = str(grp.getgrnam("docker").gr_gid) - cmd.extend(["--group-add", docker_gid]) - except (KeyError, ImportError): - pass - - cmd.extend([full_ref, "sleep", "infinity"]) - self.runner.run(cmd, capture_output=False, check=True) - self.ctx.console.success(f"Created and started container: {cname}") - - def run_exec(self, args) -> None: - rt = runtime() - cname = container_name(args.name) - - if not self.container_running(rt, cname): - raise FlowError(f"Container {cname} not running") - - if args.cmd: - exec_cmd = [rt, "exec"] - if os.isatty(0): - exec_cmd.extend(["-it"]) - exec_cmd.append(cname) - exec_cmd.extend(args.cmd) - result = self.runner.run(exec_cmd, capture_output=False) - raise SystemExit(result.returncode) - - for shell in (["zsh", "-l"], ["bash", "-l"], ["sh"]): - exec_cmd = [rt, "exec", "--detach-keys", "ctrl-q,ctrl-p", "-it", cname, *shell] - result = self.runner.run(exec_cmd, capture_output=False) - if result.returncode not in (126, 127): - raise SystemExit(result.returncode) - - raise FlowError(f"Unable to start an interactive shell in {cname}") - - def run_connect(self, args) -> None: - rt = runtime() - cname = container_name(args.name) - - if not self.container_exists(rt, cname): - raise FlowError(f"Container does not exist: {cname}") - - if not self.container_running(rt, cname): - self.runner.run([rt, "start", cname], capture_output=True) - - if not shutil.which("tmux"): - self.ctx.console.warn("tmux not found; falling back to direct exec") - args.cmd = [] - self.run_exec(args) + if dry_run: return - result = self.runner.run( - [rt, "container", "inspect", cname, "--format", "{{ .Config.Image }}"] + # Build docker run command + argv = ["docker", "run", "-d", "--name", spec.name] + + for m in spec.mounts: + argv.extend(["-v", f"{m.source}:{m.target}{':ro' if m.readonly else ''}"]) + + for k, v in spec.env.items(): + argv.extend(["-e", f"{k}={v}"]) + + argv.append(spec.image.full) + + if spec.command: + argv.extend(spec.command.split()) + + self.ctx.runtime.runner.run(argv, check=True, capture_output=False) + self.ctx.console.success(f"Container {spec.name} created.") + + def enter( + self, + name: str, + *, + shell: str = "/bin/bash", + ) -> None: + """Exec into a running container.""" + self.ctx.console.info(f"Entering container: {name}") + self.ctx.runtime.runner.run( + ["docker", "exec", "-it", name, shell], + capture_output=False, + check=True, ) - image_ref = result.stdout.strip() - _, _, _, image_label = parse_image_ref(image_ref) - check = self.runner.run(["tmux", "has-session", "-t", cname], check=False) - if check.returncode != 0: - ns = os.environ.get("DF_NAMESPACE", "") - plat = os.environ.get("DF_PLATFORM", "") - self.runner.run( - [ - "tmux", - "new-session", - "-ds", - cname, - "-e", - f"DF_IMAGE={image_label}", - "-e", - f"DF_NAMESPACE={ns}", - "-e", - f"DF_PLATFORM={plat}", - f"flow dev exec {args.name}", - ], - capture_output=True, - ) - self.runner.run( - [ - "tmux", - "set-option", - "-t", - cname, - "default-command", - f"flow dev exec {args.name}", - ], - capture_output=True, - ) - - if os.environ.get("TMUX"): - os.execvp("tmux", ["tmux", "switch-client", "-t", cname]) - os.execvp("tmux", ["tmux", "attach", "-t", cname]) - - def run_list(self, _args) -> None: - rt = runtime() - result = self.runner.run( - [ - rt, - "ps", - "-a", - "--filter", - "label=dev=true", - "--format", - '{{.Label "dev.name"}}|{{.Image}}|{{.Label "dev.project_path"}}|{{.Status}}', - ] + def stop(self, name: str) -> None: + """Stop a running container.""" + self.ctx.runtime.runner.run( + ["docker", "stop", name], check=True, ) + self.ctx.console.success(f"Container {name} stopped.") + + def remove(self, name: str) -> None: + """Remove a container.""" + self.ctx.runtime.runner.run( + ["docker", "rm", "-f", name], check=True, + ) + self.ctx.console.success(f"Container {name} removed.") + + def list(self) -> None: + """List flow-managed containers.""" + result = self.ctx.runtime.runner.run( + ["docker", "ps", "-a", "--filter", "name=flow-", "--format", + "{{.Names}}\t{{.Image}}\t{{.Status}}"], + ) + if not result.stdout.strip(): + self.ctx.console.info("No flow containers found.") + return rows = [] for line in result.stdout.strip().splitlines(): - if not line: - continue - name, image, project, status = (line.split("|") + ["", "", "", ""])[:4] - home = os.path.expanduser("~") - if project.startswith(home): - project = "~" + project[len(home) :] - rows.append([name, image, project, status]) + parts = line.split("\t") + if len(parts) >= 3: + rows.append(parts[:3]) - if not rows: - self.ctx.console.info("No development containers found.") - return - - self.ctx.console.table(["NAME", "IMAGE", "PROJECT", "STATUS"], rows) - - def run_stop(self, args) -> None: - rt = runtime() - cname = container_name(args.name) - - if not self.container_exists(rt, cname): - raise FlowError(f"Container {cname} does not exist") - - if args.kill: - self.ctx.console.info(f"Killing container {cname}...") - self.runner.run([rt, "kill", cname], capture_output=False, check=True) - else: - self.ctx.console.info(f"Stopping container {cname}...") - self.runner.run([rt, "stop", cname], capture_output=False, check=True) - - self._tmux_fallback(cname) - - def run_remove(self, args) -> None: - rt = runtime() - cname = container_name(args.name) - - if not self.container_exists(rt, cname): - raise FlowError(f"Container {cname} does not exist") - - if args.force: - self.ctx.console.info(f"Removing container {cname} (force)...") - self.runner.run([rt, "rm", "-f", cname], capture_output=False, check=True) - else: - self.ctx.console.info(f"Removing container {cname}...") - self.runner.run([rt, "rm", cname], capture_output=False, check=True) - - self._tmux_fallback(cname) - - def run_respawn(self, args) -> None: - if not shutil.which("tmux"): - raise FlowError("tmux is required for respawn but was not found") - - cname = container_name(args.name) - result = self.runner.run( - [ - "tmux", - "list-panes", - "-t", - cname, - "-s", - "-F", - "#{session_name}:#{window_index}.#{pane_index}", - ] - ) - for pane in result.stdout.strip().splitlines(): - if not pane: - continue - self.ctx.console.info(f"Respawning {pane}...") - self.runner.run(["tmux", "respawn-pane", "-t", pane], capture_output=False) - - def _tmux_fallback(self, cname: str) -> None: - if not os.environ.get("TMUX"): - return - result = self.runner.run(["tmux", "display-message", "-p", "#S"]) - if result.stdout.strip() != cname: - return - self.runner.run(["tmux", "new-session", "-ds", "default"], capture_output=True) - self.runner.run(["tmux", "switch-client", "-t", "default"], capture_output=True) + self.ctx.console.table(["NAME", "IMAGE", "STATUS"], rows) diff --git a/src/flow/services/dotfiles.py b/src/flow/services/dotfiles.py index 4cf045c..84aa40d 100644 --- a/src/flow/services/dotfiles.py +++ b/src/flow/services/dotfiles.py @@ -1,1697 +1,333 @@ -"""Dotfiles domain logic.""" +"""DotfilesService -- orchestrates dotfiles linking.""" + +from __future__ import annotations -import argparse -import json -import os -import shlex -import shutil -import subprocess -import sys -from dataclasses import dataclass -from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Optional import yaml from flow.core.config import FlowContext -from flow.core.paths import DOTFILES_DIR, LINKED_STATE, MODULES_DIR -from flow.core.system import CommandRunner, FileSystem +from flow.core.errors import FlowError, PlanConflict +from flow.core import paths +from flow.domain.dotfiles.models import ( + LinkedState, + LinkTarget, + ModuleRef, + Package, +) +from flow.domain.dotfiles.modules import ( + compute_mount_path, + normalize_source, + parse_module_ref, +) +from flow.domain.dotfiles.planning import plan_link, plan_unlink +from flow.domain.dotfiles.resolution import resolve_all_targets + -RESERVED_SHARED = "_shared" -RESERVED_ROOT = "_root" MODULE_FILE = "_module.yaml" -LINK_BACKUP_DIR = LINKED_STATE.parent / "link-backups" -_RUNNER = CommandRunner() -_FS = FileSystem() +SKIP_DIRS = {".git", ".github", "__pycache__", "flow"} +SKIP_FILES = {".DS_Store", ".gitkeep"} -@dataclass -class LinkSpec: - source: Path - target: Path - package: str - is_directory_link: bool = False +class DotfilesService: + def __init__(self, ctx: FlowContext): + self.ctx = ctx + self.dotfiles_dir = paths.DOTFILES_DIR + self.modules_dir = paths.MODULES_DIR + def link( + self, + *, + profile: Optional[str] = None, + dry_run: bool = False, + skip: Optional[set[str]] = None, + ) -> None: + """Link dotfiles to home directory.""" + skip_set = skip or set() + packages = self._discover_packages(profile) -@dataclass -class ModuleSpec: - package: str - source: str - ref_type: str - ref_value: str - package_dir: Path - # Relative path (under ~) where the module repo root is mounted. - # Derived from the directory containing `_module.yaml` within the package. - target_prefix: Path = Path() + if not packages: + self.ctx.console.info("No packages found.") + return + # Resolve all targets + targets = resolve_all_targets(packages, paths.HOME, skip_set) -def register(subparsers): - p = subparsers.add_parser("dotfiles", aliases=["dot"], help="Manage dotfiles") - p.add_argument("--verbose", action="store_true", help="Show detailed output") - sub = p.add_subparsers(dest="dotfiles_command") + # Load current state + current = self._load_state() - init = sub.add_parser("init", help="Clone dotfiles repository") - init.add_argument("--repo", help="Override repository URL") - init.set_defaults(handler=run_init) + # Build plan + plan = plan_link(targets, current, self._filesystem_check) - link = sub.add_parser("link", help="Create symlinks for dotfile packages") - link.add_argument("packages", nargs="*", help="Specific packages to link (default: all)") - link.add_argument("--profile", help="Profile to use") - link.add_argument("--copy", action="store_true", help="Copy instead of symlink") - link.add_argument("--force", action="store_true", help="Overwrite existing files") - link.add_argument("--dry-run", action="store_true", help="Show what would be done") - link.set_defaults(handler=run_link) + if plan.conflicts: + self.ctx.console.warn(f"{len(plan.conflicts)} conflict(s):") + for c in plan.conflicts: + self.ctx.console.warn(f" {c}") - unlink = sub.add_parser("unlink", help="Remove dotfile symlinks") - unlink.add_argument("packages", nargs="*", help="Specific packages to unlink (default: all)") - unlink.set_defaults(handler=run_unlink) - - undo = sub.add_parser("undo", help="Undo latest dotfiles link transaction") - undo.set_defaults(handler=run_undo) - - status = sub.add_parser("status", help="Show dotfiles link status") - status.set_defaults(handler=run_status) - - sync = sub.add_parser("sync", help="Pull latest dotfiles from remote") - sync.add_argument("--relink", action="store_true", help="Run relink after pull") - sync.add_argument("--profile", help="Profile to use when relinking") - sync.set_defaults(handler=run_sync) - - modules = sub.add_parser("modules", help="Inspect and refresh external modules") - modules_sub = modules.add_subparsers(dest="dotfiles_modules_command") - - modules_list = modules_sub.add_parser("list", help="List detected module packages") - modules_list.add_argument("packages", nargs="*", help="Filter by package name") - modules_list.add_argument("--profile", help="Limit to shared + one profile") - modules_list.set_defaults(handler=run_modules_list) - - modules_sync = modules_sub.add_parser("sync", help="Refresh module checkouts") - modules_sync.add_argument("packages", nargs="*", help="Filter by package name") - modules_sync.add_argument("--profile", help="Limit to shared + one profile") - modules_sync.set_defaults(handler=run_modules_sync) - - modules.set_defaults(handler=run_modules_list) - - repo = sub.add_parser("repo", help="Manage dotfiles repository") - repo_sub = repo.add_subparsers(dest="dotfiles_repo_command") - - repo_status = repo_sub.add_parser("status", help="Show git status for dotfiles repo") - repo_status.set_defaults(handler=run_repo_status) - - repo_pull = repo_sub.add_parser("pull", help="Pull latest changes") - repo_pull.add_argument( - "--rebase", - dest="rebase", - action="store_true", - help="Use rebase strategy (default)", - ) - repo_pull.add_argument( - "--no-rebase", - dest="rebase", - action="store_false", - help="Disable rebase strategy", - ) - repo_pull.add_argument("--relink", action="store_true", help="Run relink after pull") - repo_pull.add_argument("--profile", help="Profile to use when relinking") - repo_pull.set_defaults(rebase=True) - repo_pull.set_defaults(handler=run_repo_pull) - - repo_push = repo_sub.add_parser("push", help="Push local changes") - repo_push.set_defaults(handler=run_repo_push) - - repo.set_defaults(handler=lambda ctx, args: repo.print_help()) - - relink = sub.add_parser("relink", help="Refresh symlinks after changes") - relink.add_argument("packages", nargs="*", help="Specific packages to relink (default: all)") - relink.add_argument("--profile", help="Profile to use") - relink.set_defaults(handler=run_relink) - - clean = sub.add_parser("clean", help="Remove broken symlinks") - clean.add_argument("--dry-run", action="store_true", help="Show what would be done") - clean.set_defaults(handler=run_clean) - - edit = sub.add_parser("edit", help="Edit package or path with auto-commit") - edit.add_argument("target", help="Package name or path inside dotfiles repo") - edit.add_argument("--no-commit", action="store_true", help="Skip auto-commit") - edit.set_defaults(handler=run_edit) - - p.set_defaults(handler=lambda ctx, args: p.print_help()) - - -def _flow_config_dir(dotfiles_dir: Optional[Path] = None) -> Path: - return dotfiles_dir or DOTFILES_DIR - - -def _insert_spec( - desired: Dict[Path, LinkSpec], - *, - target: Path, - source: Path, - package: str, -) -> None: - existing = desired.get(target) - if existing is not None: - raise RuntimeError( - "Conflicting dotfile targets are not allowed: " - f"{target} from {existing.package} and {package}" - ) - - desired[target] = LinkSpec(source=source, target=target, package=package) - - -def _is_path_like_target(target: str) -> bool: - raw = Path(target) - return "/" in target or target.startswith(".") or raw.suffix != "" - - -def _module_cache_dir(spec: ModuleSpec) -> Path: - key = spec.package.replace("/", "--") - return MODULES_DIR / key - - -def _normalize_module_source(source: str, *, package_dir: Optional[Path] = None) -> str: - if source.startswith("github:"): - repo = source.split(":", 1)[1] - return f"https://github.com/{repo}.git" - - if "://" in source or source.startswith("git@"): - return source - - raw = Path(source).expanduser() - if raw.is_absolute(): - return str(raw) - - if package_dir is None: - return source - - return str((package_dir / raw).resolve()) - - -def _load_module_spec(package_dir: Path, package: str, module_file: Path) -> ModuleSpec: - if not module_file.exists(): - raise RuntimeError(f"Module file not found: {module_file}") - - try: - with open(module_file, "r", encoding="utf-8") as handle: - raw = yaml.safe_load(handle) or {} - except yaml.YAMLError as e: - raise RuntimeError(f"Invalid YAML in {module_file}: {e}") from e - - if not isinstance(raw, dict): - raise RuntimeError(f"{module_file} must contain a mapping") - - source = raw.get("source") - if not isinstance(source, str) or not source: - raise RuntimeError(f"{module_file} must define non-empty 'source'") - - ref = raw.get("ref") - if not isinstance(ref, dict): - raise RuntimeError(f"{module_file} must define 'ref' mapping") - - choices = [key for key in ("branch", "tag", "commit") if isinstance(ref.get(key), str) and ref.get(key)] - if len(choices) != 1: - raise RuntimeError(f"{module_file} 'ref' must include exactly one of: branch, tag, commit") - - ref_type = choices[0] - ref_value = str(ref[ref_type]) - - target_prefix = Path() - try: - target_prefix = module_file.parent.relative_to(package_dir) - except ValueError: - raise RuntimeError(f"{MODULE_FILE} must be inside package dir {package_dir}: {module_file}") from None - - if target_prefix == Path("."): - target_prefix = Path() - return ModuleSpec( - package=package, - # Relative sources are resolved relative to the `_module.yaml` location. - source=_normalize_module_source(source, package_dir=module_file.parent), - ref_type=ref_type, - ref_value=ref_value, - package_dir=package_dir, - target_prefix=target_prefix, - ) - - -def _run_git(dir_path: Path, *args: str, capture: bool = True) -> subprocess.CompletedProcess: - return _RUNNER.run( - ["git", "-C", str(dir_path)] + list(args), - capture_output=capture, - check=False, - ) - - -def _pull_requires_ack(stdout: str, stderr: str) -> bool: - text = f"{stdout}\n{stderr}".strip() - if not text: - return False - - lowered = text.lower() - if "already up to date" in lowered or "already up-to-date" in lowered: - return False - - return True - - -def _pull_repo_before_edit(ctx: FlowContext, repo_dir: Path, *, verbose: bool = False) -> None: - ctx.console.info(f"Pulling latest changes in {repo_dir}...") - result = _run_git(repo_dir, "pull", "--rebase", capture=True) - if result.returncode != 0: - ctx.console.warn(f"Git pull failed: {result.stderr.strip()}") - return - - if verbose: - output = result.stdout.strip() - if output: - print(output) - - if _pull_requires_ack(result.stdout, result.stderr): - ctx.console.info("Repository updated before edit. Review incoming changes first.") - try: - input("Press Enter to continue editing... ") - except (EOFError, KeyboardInterrupt): - print() - - -def _refresh_module(spec: ModuleSpec) -> None: - module_dir = _module_cache_dir(spec) - _FS.ensure_dir(module_dir.parent) - - if not module_dir.exists(): - clone = _RUNNER.run( - ["git", "clone", "--recurse-submodules", spec.source, str(module_dir)], - capture_output=True, - check=False, - ) - if clone.returncode != 0: - raise RuntimeError( - f"Failed to clone module {spec.package} from {spec.source}: {clone.stderr.strip()}" + if not plan.operations: + self.ctx.console.info( + f"Nothing to do. {plan.summary.unchanged} link(s) already correct." ) + return - if spec.ref_type == "branch": - fetch = _run_git(module_dir, "fetch", "origin", spec.ref_value) - if fetch.returncode != 0: - raise RuntimeError( - f"Failed to fetch module {spec.package} branch {spec.ref_value}: {fetch.stderr.strip()}" - ) + self.ctx.console.print_plan(plan.operations, verb="link") - checkout = _run_git(module_dir, "checkout", spec.ref_value) - if checkout.returncode != 0: - create = _run_git(module_dir, "checkout", "-B", spec.ref_value, f"origin/{spec.ref_value}") - if create.returncode != 0: - raise RuntimeError( - f"Failed to checkout branch {spec.ref_value} for module {spec.package}: " - f"{create.stderr.strip()}" + if dry_run: + return + + # Execute + new_state = LinkedState(links=dict(current.links)) + for op in plan.operations: + if op.type == "create_link": + assert op.source is not None + self.ctx.runtime.fs.create_symlink( + op.source, op.target, + sudo=op.needs_sudo, + runner=self.ctx.runtime.runner if op.needs_sudo else None, ) - - pull = _run_git(module_dir, "pull", "--ff-only", "origin", spec.ref_value) - if pull.returncode != 0: - raise RuntimeError( - f"Failed to update module {spec.package} branch {spec.ref_value}: {pull.stderr.strip()}" - ) - - elif spec.ref_type == "tag": - fetch = _run_git(module_dir, "fetch", "--tags", "origin") - if fetch.returncode != 0: - raise RuntimeError( - f"Failed to fetch tags for module {spec.package}: {fetch.stderr.strip()}" - ) - - checkout = _run_git(module_dir, "checkout", f"tags/{spec.ref_value}") - if checkout.returncode != 0: - raise RuntimeError( - f"Failed to checkout tag {spec.ref_value} for module {spec.package}: " - f"{checkout.stderr.strip()}" - ) - - else: - fetch = _run_git(module_dir, "fetch", "origin") - if fetch.returncode != 0: - raise RuntimeError( - f"Failed to fetch module {spec.package}: {fetch.stderr.strip()}" - ) - - checkout = _run_git(module_dir, "checkout", spec.ref_value) - if checkout.returncode != 0: - raise RuntimeError( - f"Failed to checkout commit {spec.ref_value} for module {spec.package}: " - f"{checkout.stderr.strip()}" - ) - - update = _run_git(module_dir, "submodule", "update", "--init", "--recursive") - if update.returncode != 0: - raise RuntimeError( - f"Failed to update nested submodules for module {spec.package}: {update.stderr.strip()}" - ) - - -def _iter_package_dirs( - dotfiles_dir: Path, - *, - profile: Optional[str] = None, - package_filter: Optional[Set[str]] = None, -) -> List[tuple[str, Path]]: - out: List[tuple[str, Path]] = [] - flow_dir = _flow_config_dir(dotfiles_dir) - - shared = flow_dir / RESERVED_SHARED - if shared.is_dir(): - for pkg_dir in sorted(shared.iterdir()): - if pkg_dir.is_dir() and not pkg_dir.name.startswith("."): - if package_filter and pkg_dir.name not in package_filter: - continue - out.append((f"{RESERVED_SHARED}/{pkg_dir.name}", pkg_dir)) - - profiles = [profile] if profile else _list_profiles(flow_dir) - for profile_name in profiles: - profile_dir = flow_dir / profile_name - if not profile_dir.is_dir(): - continue - for pkg_dir in sorted(profile_dir.iterdir()): - if pkg_dir.is_dir() and not pkg_dir.name.startswith("."): - if package_filter and pkg_dir.name not in package_filter: - continue - out.append((f"{profile_name}/{pkg_dir.name}", pkg_dir)) - - return out - - -def _find_module_files(package_dir: Path) -> List[Path]: - found: List[Path] = [] - for root, dirs, files in os.walk(package_dir): - # Avoid picking up nested git metadata. - dirs[:] = [entry for entry in dirs if entry != ".git"] - if MODULE_FILE in files: - found.append(Path(root) / MODULE_FILE) - return sorted(found, key=lambda item: str(item)) - - -def _find_package_module_file(package: str, package_dir: Path) -> Optional[Path]: - module_files = _find_module_files(package_dir) - if not module_files: - return None - if len(module_files) > 1: - rels = ", ".join(str(path.relative_to(package_dir)) for path in module_files) - raise RuntimeError(f"Multiple {MODULE_FILE} files found for package {package}: {rels}") - return module_files[0] - - -def _collect_module_specs( - dotfiles_dir: Path, - *, - profile: Optional[str] = None, - package_filter: Optional[Set[str]] = None, -) -> List[ModuleSpec]: - specs: List[ModuleSpec] = [] - for package, package_dir in _iter_package_dirs( - dotfiles_dir, - profile=profile, - package_filter=package_filter, - ): - module_file = _find_package_module_file(package, package_dir) - if module_file is None: - continue - specs.append(_load_module_spec(package_dir, package, module_file)) - return specs - - -def _sync_modules( - ctx: FlowContext, - *, - verbose: bool = False, - profile: Optional[str] = None, - package_filter: Optional[Set[str]] = None, -) -> None: - _ensure_flow_dir(ctx) - - for spec in _collect_module_specs( - DOTFILES_DIR, - profile=profile, - package_filter=package_filter, - ): - if verbose: - ctx.console.info( - f"Updating module {spec.package} from {spec.source} ({spec.ref_type}={spec.ref_value})" - ) - _refresh_module(spec) - - -def _module_ref_label(spec: ModuleSpec) -> str: - return f"{spec.ref_type}:{spec.ref_value}" - - -def _module_head_short(module_dir: Path) -> str: - result = _run_git(module_dir, "rev-parse", "--short", "HEAD") - if result.returncode != 0: - return "unknown" - return result.stdout.strip() or "unknown" - - -def _resolved_package_source( - ctx: FlowContext, - package: str, - package_dir: Path, - *, - verbose: bool = False, -) -> Path: - """Return the directory to treat as editable source for a package. - - If the package contains a `_module.yaml` anywhere in its tree, the module checkout - directory is returned (and must already exist from `flow dotfiles sync`). - Otherwise the local package directory is returned. - """ - - module_file = _find_package_module_file(package, package_dir) - if module_file is None: - return package_dir - - if verbose: - rel = module_file.relative_to(package_dir) - mount = rel.parent - if mount == Path("."): - mount = Path() - ctx.console.info( - f"Package {package} uses {rel}; mounting module content at {mount or '.'}" - ) - - spec = _load_module_spec(package_dir, package, module_file) - module_dir = _module_cache_dir(spec) - if not module_dir.exists(): - raise RuntimeError( - f"Module source missing for package '{package}'. Run 'flow dotfiles sync' first." - ) - - return module_dir - - -def _load_state() -> dict: - if LINKED_STATE.exists(): - with open(LINKED_STATE, "r", encoding="utf-8") as handle: - return json.load(handle) - return {"version": 2, "links": {}} - - -def _save_state(state: dict) -> None: - _FS.write_json(LINKED_STATE, state) - - -def _parse_link_specs(links: Any) -> Dict[Path, LinkSpec]: - if not isinstance(links, dict): - raise RuntimeError("Unsupported linked state format. Remove linked.json and relink dotfiles.") - - resolved: Dict[Path, LinkSpec] = {} - for package, pkg_links in links.items(): - if not isinstance(pkg_links, dict): - raise RuntimeError("Unsupported linked state format. Remove linked.json and relink dotfiles.") - - for target_str, link_info in pkg_links.items(): - if not isinstance(link_info, dict) or "source" not in link_info: - raise RuntimeError( - "Unsupported linked state format. Remove linked.json and relink dotfiles." + # Find the matching LinkTarget + lt = next(t for t in targets if t.target == op.target) + new_state.links[op.target] = lt + elif op.type == "remove_link": + self.ctx.runtime.fs.remove_file( + op.target, + sudo=op.needs_sudo, + runner=self.ctx.runtime.runner if op.needs_sudo else None, ) + new_state.links.pop(op.target, None) - target = Path(target_str) - resolved[target] = LinkSpec( - source=Path(link_info["source"]), - target=target, - package=str(package), - is_directory_link=bool(link_info.get("is_directory_link", False)), + self._save_state(new_state) + self.ctx.console.success( + f"Linked: {plan.summary.added} added, " + f"{plan.summary.removed} removed, " + f"{plan.summary.unchanged} unchanged" + ) + + def unlink( + self, + *, + packages: Optional[list[str]] = None, + dry_run: bool = False, + ) -> None: + """Remove managed symlinks.""" + current = self._load_state() + if not current.links: + self.ctx.console.info("No managed links found.") + return + + plan = plan_unlink(current, packages) + + if not plan.operations: + self.ctx.console.info("Nothing to unlink.") + return + + self.ctx.console.print_plan(plan.operations, verb="unlink") + + if dry_run: + return + + new_state = LinkedState(links=dict(current.links)) + for op in plan.operations: + self.ctx.runtime.fs.remove_file( + op.target, + sudo=op.needs_sudo, + runner=self.ctx.runtime.runner if op.needs_sudo else None, + missing_ok=True, ) - - return resolved - - -def _serialize_link_specs(specs: Dict[Path, LinkSpec]) -> Dict[str, Dict[str, dict]]: - grouped: Dict[str, Dict[str, dict]] = {} - for spec in sorted(specs.values(), key=lambda s: str(s.target)): - grouped.setdefault(spec.package, {})[str(spec.target)] = { - "source": str(spec.source), - "is_directory_link": spec.is_directory_link, - } - return grouped - - -def _cleanup_link_transaction_files(transaction: Optional[dict]) -> None: - if not isinstance(transaction, dict): - return - - backup_dir = transaction.get("backup_dir") - if isinstance(backup_dir, str) and backup_dir: - _FS.remove_tree(Path(backup_dir)) - - -def _load_last_link_transaction() -> Optional[dict]: - state = _load_state() - transaction = state.get("last_transaction") - if not isinstance(transaction, dict): - return None - return transaction - - -def _save_last_link_transaction(transaction: dict) -> None: - state = _load_state() - previous = state.get("last_transaction") - if isinstance(previous, dict): - _cleanup_link_transaction_files(previous) - state["last_transaction"] = transaction - _save_state(state) - - -def _clear_last_link_transaction(*, remove_backups: bool = True) -> None: - state = _load_state() - transaction = state.get("last_transaction") - if remove_backups and isinstance(transaction, dict): - _cleanup_link_transaction_files(transaction) - state.pop("last_transaction", None) - _save_state(state) - - -def _start_link_transaction(previous_links: Dict[Path, LinkSpec]) -> dict: - now = datetime.now(timezone.utc) - tx_id = now.strftime("%Y%m%dT%H%M%S%fZ") - backup_dir = LINK_BACKUP_DIR / tx_id - return { - "id": tx_id, - "created_at": now.isoformat(), - "backup_dir": str(backup_dir), - "previous_links": _serialize_link_specs(previous_links), - "targets": [], - } - - -def _snapshot_target( - target: Path, - *, - use_sudo: bool, - backup_dir: Path, - index: int, -) -> dict: - if target.is_symlink(): - return {"kind": "symlink", "source": os.readlink(target)} - - if target.exists(): - if target.is_dir(): - raise RuntimeError(f"Cannot snapshot directory target: {target}") - - _FS.ensure_dir(backup_dir) - backup_path = backup_dir / f"{index:06d}" - if use_sudo: - _FS.copy_file(target, backup_path, sudo=True, runner=_RUNNER) - else: - _FS.copy_file(target, backup_path) - return {"kind": "file", "backup": str(backup_path)} - - return {"kind": "missing"} - - -def _restore_target_snapshot(target: Path, snapshot: dict) -> None: - if not isinstance(snapshot, dict): - raise RuntimeError(f"Unsupported transaction snapshot for {target}") - - use_sudo = not _is_in_home(target, Path.home()) - - if target.exists() or target.is_symlink(): - if target.is_dir() and not target.is_symlink(): - raise RuntimeError(f"Cannot restore {target}; a directory now exists at that path") - _remove_target(target, use_sudo=use_sudo, dry_run=False) - - kind = snapshot.get("kind") - if kind == "missing": - return - - if kind == "symlink": - source = snapshot.get("source") - if not isinstance(source, str): - raise RuntimeError(f"Unsupported transaction snapshot for {target}") - if use_sudo: - _FS.create_symlink(Path(source), target, sudo=True, runner=_RUNNER) - else: - _FS.create_symlink(Path(source), target) - return - - if kind == "file": - backup = snapshot.get("backup") - if not isinstance(backup, str): - raise RuntimeError(f"Unsupported transaction snapshot for {target}") - backup_path = Path(backup) - if not backup_path.exists(): - raise RuntimeError(f"Backup missing for {target}: {backup_path}") - if use_sudo: - _FS.copy_file(backup_path, target, sudo=True, runner=_RUNNER) - else: - _FS.copy_file(backup_path, target) - return - - raise RuntimeError(f"Unsupported transaction snapshot kind for {target}: {kind}") - - -def _load_link_specs_from_state() -> Dict[Path, LinkSpec]: - state = _load_state() - links = state.get("links", {}) - return _parse_link_specs(links) - - -def _save_link_specs_to_state(specs: Dict[Path, LinkSpec]) -> None: - state = _load_state() - state["version"] = 2 - state["links"] = _serialize_link_specs(specs) - _save_state(state) - - -def _list_profiles(flow_dir: Path) -> List[str]: - if not flow_dir.exists() or not flow_dir.is_dir(): - return [] - - profiles: List[str] = [] - for child in flow_dir.iterdir(): - if not child.is_dir(): - continue - if child.name.startswith("."): - continue - if child.name.startswith("_"): - continue - profiles.append(child.name) - return sorted(profiles) - - -def _walk_package(source_dir: Path): - for root, dirs, files in os.walk(source_dir): - # Never traverse git metadata from module-backed package sources. - dirs[:] = [entry for entry in dirs if entry != ".git"] - for fname in files: - if fname == ".git": - continue - src = Path(root) / fname - rel = src.relative_to(source_dir) - yield src, rel - - -def _profile_skip_set(ctx: FlowContext, profile: Optional[str]) -> Set[str]: - if not profile: - return set() - - profiles = ctx.manifest.get("profiles", {}) - if not isinstance(profiles, dict): - return set() - - profile_cfg = profiles.get(profile, {}) - if not isinstance(profile_cfg, dict): - return set() - - configs = profile_cfg.get("configs", {}) - if not isinstance(configs, dict): - return set() - - skip = configs.get("skip", []) - if not isinstance(skip, list): - return set() - - return {str(item) for item in skip if item} - - -def _discover_packages(dotfiles_dir: Path, profile: Optional[str] = None) -> dict: - flow_dir = _flow_config_dir(dotfiles_dir) - packages = {} - - shared = flow_dir / RESERVED_SHARED - if shared.is_dir(): - for pkg in sorted(shared.iterdir()): - if pkg.is_dir() and not pkg.name.startswith("."): - packages[pkg.name] = pkg - - if profile: - profile_dir = flow_dir / profile - if profile_dir.is_dir(): - for pkg in sorted(profile_dir.iterdir()): - if pkg.is_dir() and not pkg.name.startswith("."): - packages[pkg.name] = pkg - - return packages - - -def _find_package_dir(package_name: str, dotfiles_dir: Optional[Path] = None) -> Optional[Path]: - flow_dir = _flow_config_dir(dotfiles_dir) - - shared_dir = flow_dir / RESERVED_SHARED / package_name - if shared_dir.exists(): - return shared_dir - - for profile in _list_profiles(flow_dir): - profile_pkg = flow_dir / profile / package_name - if profile_pkg.exists(): - return profile_pkg - - return None - - -def _resolve_edit_target(target: str, dotfiles_dir: Optional[Path] = None) -> Optional[Path]: - dotfiles_dir = dotfiles_dir or DOTFILES_DIR - base_dir = dotfiles_dir.resolve() - raw = Path(target).expanduser() - if raw.is_absolute(): - if not _is_under(raw, base_dir): - return None - return raw - - is_path_like = _is_path_like_target(target) - if is_path_like: - candidate = dotfiles_dir / raw - if not _is_under(candidate, base_dir): - return None - if candidate.exists() or candidate.parent.exists(): - return candidate - return None - - package_dir = _find_package_dir(target, dotfiles_dir=dotfiles_dir) - if package_dir is not None: - return package_dir - - candidate = dotfiles_dir / raw - if candidate.exists(): - return candidate - - return None - - -def _ensure_dotfiles_dir(ctx: FlowContext): - if not DOTFILES_DIR.exists(): - ctx.console.error(f"Dotfiles not found at {DOTFILES_DIR}. Run 'flow dotfiles init' first.") - sys.exit(1) - - -def _ensure_flow_dir(ctx: FlowContext): - _ensure_dotfiles_dir(ctx) - flow_dir = _flow_config_dir() - if not flow_dir.exists() or not flow_dir.is_dir(): - ctx.console.error(f"Dotfiles repository not found at {flow_dir}") - sys.exit(1) - - -def _run_dotfiles_git(*cmd, capture: bool = True) -> subprocess.CompletedProcess: - return _RUNNER.run( - ["git", "-C", str(DOTFILES_DIR)] + list(cmd), - capture_output=capture, - ) - - -def _pull_dotfiles(ctx: FlowContext, *, rebase: bool = True) -> None: - pull_cmd = ["pull"] - if rebase: - pull_cmd.append("--rebase") - - strategy = "with rebase" if rebase else "without rebase" - ctx.console.info(f"Pulling latest dotfiles ({strategy})...") - result = _run_dotfiles_git(*pull_cmd, capture=True) - - if result.returncode != 0: - raise RuntimeError(f"Git pull failed: {result.stderr.strip()}") - - output = result.stdout.strip() - if output: - print(output) - - ctx.console.success("Dotfiles synced.") - - -def _resolve_profile(ctx: FlowContext, requested: Optional[str]) -> Optional[str]: - flow_dir = _flow_config_dir() - profiles = _list_profiles(flow_dir) - - if requested: - if requested not in profiles: - raise RuntimeError(f"Profile not found: {requested}") - return requested - - if len(profiles) == 1: - return profiles[0] - - if len(profiles) > 1: - raise RuntimeError(f"Multiple profiles available. Use --profile: {', '.join(profiles)}") - - return None - - -def _is_in_home(path: Path, home: Path) -> bool: - try: - path.relative_to(home) - return True - except ValueError: - return False - - -def _is_under(path: Path, parent: Path) -> bool: - try: - path.resolve().relative_to(parent.resolve()) - return True - except ValueError: - return False - - -def _run_sudo(cmd: List[str], *, dry_run: bool = False) -> None: - if dry_run: - print(" " + " ".join(shlex.quote(part) for part in (["sudo"] + cmd))) - return - if shutil.which("sudo") is None: - raise RuntimeError("sudo is required for root-targeted dotfiles, but it was not found in PATH") - _RUNNER.run(["sudo"] + cmd, capture_output=False, check=True) - - -def _remove_target(path: Path, *, use_sudo: bool, dry_run: bool) -> None: - if not (path.exists() or path.is_symlink()): - return - - if path.is_dir() and not path.is_symlink(): - raise RuntimeError(f"Cannot overwrite directory: {path}") - - if use_sudo: - _run_sudo(["rm", "-f", str(path)], dry_run=dry_run) - return - - if dry_run: - print(f" REMOVE: {path}") - return - _FS.remove_file(path) - - -def _same_symlink(target: Path, source: Path) -> bool: - return _FS.same_symlink(target, source) - - -def _rel_is_under(path: Path, parent: Path) -> bool: - try: - path.relative_to(parent) - return True - except ValueError: - return False - - -def _add_package_home_specs( - ctx: FlowContext, - desired: Dict[Path, LinkSpec], - *, - package: str, - package_dir: Path, - home: Path, - skip: Set[str], - verbose: bool = False, -) -> None: - module_file = _find_package_module_file(package, package_dir) - module_dir: Optional[Path] = None - module_prefix = Path() - module_file_rel: Optional[Path] = None - - if module_file is not None: - module_file_rel = module_file.relative_to(package_dir) - module_prefix = module_file_rel.parent - if module_prefix == Path("."): - module_prefix = Path() - - spec = _load_module_spec(package_dir, package, module_file) - module_dir = _module_cache_dir(spec) - if not module_dir.exists(): - raise RuntimeError( - f"Module source missing for package '{package}'. Run 'flow dotfiles sync' first." - ) - - if verbose: - ctx.console.info( - f"Package {package} uses {module_file_rel}; linking module content under {module_prefix or '.'}" - ) - - # Link local package files, except module mounts and the marker file itself. - for src, rel in _walk_package(package_dir): - if module_file_rel is not None and (rel == module_file_rel or _rel_is_under(rel, module_prefix)): - continue - - if rel.parts and rel.parts[0] == RESERVED_ROOT: - if RESERVED_ROOT in skip: - continue - if len(rel.parts) < 2: - continue - target = Path("/") / Path(*rel.parts[1:]) - else: - target = home / rel - - _insert_spec( - desired, - target=target, - source=src, - package=package, - ) - - if module_dir is None: - return - - # Link module files into the directory containing `_module.yaml`. - for src, rel in _walk_package(module_dir): - mounted = module_prefix / rel - if mounted.parts and mounted.parts[0] == RESERVED_ROOT: - if RESERVED_ROOT in skip: - continue - if len(mounted.parts) < 2: - continue - target = Path("/") / Path(*mounted.parts[1:]) - else: - target = home / mounted - - _insert_spec( - desired, - target=target, - source=src, - package=package, - ) - - -def _collect_home_specs( - ctx: FlowContext, - flow_dir: Path, - home: Path, - profile: Optional[str], - skip: Set[str], - package_filter: Optional[Set[str]], - *, - verbose: bool = False, -) -> Dict[Path, LinkSpec]: - desired: Dict[Path, LinkSpec] = {} - - if RESERVED_SHARED not in skip: - shared_dir = flow_dir / RESERVED_SHARED - if shared_dir.is_dir(): - for pkg_dir in sorted(shared_dir.iterdir()): - if not pkg_dir.is_dir() or pkg_dir.name.startswith("."): - continue - if package_filter and pkg_dir.name not in package_filter: - continue - if pkg_dir.name in skip: - continue - - package_name = f"{RESERVED_SHARED}/{pkg_dir.name}" - _add_package_home_specs( - ctx, - desired, - package=package_name, - package_dir=pkg_dir, - home=home, - skip=skip, - verbose=verbose, - ) - - if profile and "_profile" not in skip: - profile_dir = flow_dir / profile - if profile_dir.is_dir(): - for pkg_dir in sorted(profile_dir.iterdir()): - if not pkg_dir.is_dir() or pkg_dir.name.startswith("."): - continue - if package_filter and pkg_dir.name not in package_filter: - continue - if pkg_dir.name in skip: - continue - - package_name = f"{profile}/{pkg_dir.name}" - _add_package_home_specs( - ctx, - desired, - package=package_name, - package_dir=pkg_dir, - home=home, - skip=skip, - verbose=verbose, - ) - - return desired - - -def _validate_conflicts( - desired: Dict[Path, LinkSpec], - current: Dict[Path, LinkSpec], -) -> tuple[List[str], List[str]]: - force_required: List[str] = [] - fatal: List[str] = [] - - # Validate removals for targets currently tracked in state. - # If a managed path was changed on disk (regular file or different symlink), - # require --force before deleting it. - for target, spec in current.items(): - if target in desired: - continue - if not (target.exists() or target.is_symlink()): - continue - if _same_symlink(target, spec.source): - continue - if target.is_dir() and not target.is_symlink(): - fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") - continue - force_required.append(f"Conflict: {target} differs from managed link and would be removed") - - for target, spec in desired.items(): - if not (target.exists() or target.is_symlink()): - continue - - if _same_symlink(target, spec.source): - continue - - if target in current: - current_spec = current[target] - if _same_symlink(target, current_spec.source): - # Existing managed link can be replaced by desired link. - continue - if target.is_dir() and not target.is_symlink(): - fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") - continue - force_required.append(f"Conflict: {target} differs from managed link and would be replaced") - continue - - if target.is_dir() and not target.is_symlink(): - fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") - continue - - force_required.append(f"Conflict: {target} already exists and is not managed by flow") - - return force_required, fatal - - -def _apply_link_spec(spec: LinkSpec, *, copy: bool, dry_run: bool) -> bool: - use_sudo = not _is_in_home(spec.target, Path.home()) - - if copy and use_sudo: - print(f" SKIP COPY (root target): {spec.target}") - return False - - if use_sudo: - _run_sudo(["mkdir", "-p", str(spec.target.parent)], dry_run=dry_run) - _run_sudo(["ln", "-sfn", str(spec.source), str(spec.target)], dry_run=dry_run) - return True - - if dry_run: - if copy: - print(f" COPY: {spec.source} -> {spec.target}") - else: - print(f" LINK: {spec.target} -> {spec.source}") - return True - - _FS.ensure_dir(spec.target.parent) - if copy: - _FS.copy_file(spec.source, spec.target) - return True - _FS.create_symlink(spec.source, spec.target) - return True - - -def _sync_to_desired( - ctx: FlowContext, - desired: Dict[Path, LinkSpec], - *, - force: bool, - dry_run: bool, - copy: bool, -) -> None: - current = _load_link_specs_from_state() - previous = dict(current) - force_required, fatal = _validate_conflicts(desired, current) - - if fatal: - for conflict in fatal: - ctx.console.error(conflict) - raise RuntimeError("One or more targets are existing directories and cannot be overwritten") - - if force_required and not force: - for conflict in force_required: - ctx.console.error(conflict) - raise RuntimeError("Use --force to overwrite existing files") - - transaction: Optional[dict] = None - snapshots: Dict[Path, dict] = {} - if not dry_run: - transaction = _start_link_transaction(previous) - backup_dir = Path(transaction["backup_dir"]) - - def snapshot_before_change(target: Path) -> None: - if target in snapshots: - return - use_sudo = not _is_in_home(target, Path.home()) - snapshots[target] = _snapshot_target( - target, - use_sudo=use_sudo, - backup_dir=backup_dir, - index=len(snapshots) + 1, - ) - - try: - for target in sorted(current.keys(), key=str): - if target in desired: - continue - if not dry_run and transaction is not None and (target.exists() or target.is_symlink()): - snapshot_before_change(target) - use_sudo = not _is_in_home(target, Path.home()) - _remove_target(target, use_sudo=use_sudo, dry_run=dry_run) - del current[target] - - for target in sorted(desired.keys(), key=str): - spec = desired[target] - - if _same_symlink(target, spec.source): - current[target] = spec - continue - - if not dry_run and transaction is not None: - snapshot_before_change(target) - - exists = target.exists() or target.is_symlink() - if exists: - use_sudo = not _is_in_home(target, Path.home()) - _remove_target(target, use_sudo=use_sudo, dry_run=dry_run) - - applied = _apply_link_spec(spec, copy=copy, dry_run=dry_run) - if applied: - current[target] = spec - except Exception: - if not dry_run and transaction is not None: - transaction["targets"] = [ - {"target": str(target), "before": snapshots[target]} - for target in sorted(snapshots.keys(), key=str) - ] - transaction["incomplete"] = True - try: - _save_link_specs_to_state(current) - _save_last_link_transaction(transaction) - except Exception: - pass - raise - - if not dry_run: - _save_link_specs_to_state(current) - if transaction is not None: - transaction["targets"] = [ - {"target": str(target), "before": snapshots[target]} - for target in sorted(snapshots.keys(), key=str) - ] - transaction["incomplete"] = False - _save_last_link_transaction(transaction) - - -def _desired_links_for_profile( - ctx: FlowContext, - profile: Optional[str], - package_filter: Optional[Set[str]], - *, - verbose: bool = False, -) -> Dict[Path, LinkSpec]: - flow_dir = _flow_config_dir() - home = Path.home() - - skip = _profile_skip_set(ctx, profile) - return _collect_home_specs( - ctx, - flow_dir, - home, - profile, - skip, - package_filter, - verbose=verbose, - ) - - -def run_init(ctx: FlowContext, args): - repo_url = args.repo or ctx.config.dotfiles_url - if not repo_url: - ctx.console.error("No dotfiles repository URL. Set it in YAML config or pass --repo.") - sys.exit(1) - - if DOTFILES_DIR.exists(): - ctx.console.warn(f"Dotfiles directory already exists: {DOTFILES_DIR}") - return - - _FS.ensure_dir(DOTFILES_DIR.parent) - branch = ctx.config.dotfiles_branch - cmd = ["git", "clone", "-b", branch, "--recurse-submodules", repo_url, str(DOTFILES_DIR)] - ctx.console.info(f"Cloning {repo_url} (branch: {branch})...") - _RUNNER.run(cmd, capture_output=False, check=True) - - try: - _sync_modules(ctx, verbose=bool(getattr(args, "verbose", False))) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - ctx.console.success(f"Dotfiles cloned to {DOTFILES_DIR}") - - -def run_link(ctx: FlowContext, args): - _ensure_flow_dir(ctx) - - try: - profile = _resolve_profile(ctx, args.profile) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - package_filter = set(args.packages) if args.packages else None - - try: - desired = _desired_links_for_profile( - ctx, - profile, - package_filter, - verbose=bool(getattr(args, "verbose", False)), - ) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if not desired: - ctx.console.warn("No link targets found for selected profile/filters") - return - - try: - _sync_to_desired( - ctx, - desired, - force=args.force, - dry_run=args.dry_run, - copy=args.copy, - ) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if args.dry_run: - return - - ctx.console.success(f"Linked {len(desired)} item(s)") - - -def _package_match(package_id: str, filters: Set[str]) -> bool: - if package_id in filters: - return True - - # Allow users to pass just package basename (e.g. zsh) - base = package_id.split("/", 1)[-1] - return base in filters - - -def run_unlink(ctx: FlowContext, args): - try: - current = _load_link_specs_from_state() - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if not current: - ctx.console.info("No linked dotfiles found.") - return - - filters = set(args.packages) if args.packages else None - removed = 0 - - for target in sorted(list(current.keys()), key=str): - spec = current[target] - if filters and not _package_match(spec.package, filters): - continue - - use_sudo = not _is_in_home(target, Path.home()) - try: - _remove_target(target, use_sudo=use_sudo, dry_run=False) - except RuntimeError as e: - ctx.console.warn(str(e)) - continue - - removed += 1 - del current[target] - - _save_link_specs_to_state(current) - _clear_last_link_transaction(remove_backups=True) - ctx.console.success(f"Removed {removed} symlink(s)") - - -def run_undo(ctx: FlowContext, args): - transaction = _load_last_link_transaction() - if transaction is None: - ctx.console.info("No dotfiles link transaction to undo.") - return - - raw_targets = transaction.get("targets") - if not isinstance(raw_targets, list): - ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") - sys.exit(1) - - restore_plan: List[Tuple[Path, dict]] = [] - for entry in raw_targets: - if not isinstance(entry, dict): - ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") - sys.exit(1) - - target_raw = entry.get("target") - before = entry.get("before") - if not isinstance(target_raw, str) or not isinstance(before, dict): - ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") - sys.exit(1) - restore_plan.append((Path(target_raw), before)) - - try: - # Restore deeper paths first to avoid parent/child ordering issues. - for target, snapshot in sorted( - restore_plan, - key=lambda item: (len(item[0].parts), str(item[0])), - reverse=True, - ): - _restore_target_snapshot(target, snapshot) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - previous_links = transaction.get("previous_links", {}) - try: - _parse_link_specs(previous_links) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - state = _load_state() - state["version"] = 2 - state["links"] = previous_links - _save_state(state) - _clear_last_link_transaction(remove_backups=True) - ctx.console.success(f"Undid {len(restore_plan)} change(s)") - - -def run_status(ctx: FlowContext, args): - try: - current = _load_link_specs_from_state() - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if not current: - ctx.console.info("No linked dotfiles.") - return - - grouped: Dict[str, List[LinkSpec]] = {} - for spec in current.values(): - grouped.setdefault(spec.package, []).append(spec) - - for package in sorted(grouped.keys()): - ctx.console.info(f"[{package}]") - for spec in sorted(grouped[package], key=lambda s: str(s.target)): - if spec.target.is_symlink(): - if _same_symlink(spec.target, spec.source): - print(f" OK: {spec.target} -> {spec.source}") - else: - print(f" CHANGED: {spec.target}") - elif spec.target.exists(): - print(f" NOT SYMLINK: {spec.target}") - else: - print(f" BROKEN: {spec.target} (missing)") - - -def run_sync(ctx: FlowContext, args): - _ensure_dotfiles_dir(ctx) - - try: - _pull_dotfiles(ctx, rebase=True) - _sync_modules(ctx, verbose=bool(getattr(args, "verbose", False))) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if args.relink: - relink_args = argparse.Namespace(packages=[], profile=args.profile) - run_relink(ctx, relink_args) - - -def _validated_profile_name(profile: Optional[str]) -> Optional[str]: - if not profile: - return None - - profiles = _list_profiles(_flow_config_dir()) - if profile not in profiles: - raise RuntimeError(f"Profile not found: {profile}") - return profile - - -def _package_filter_from_args(args) -> Optional[Set[str]]: - packages = getattr(args, "packages", []) - if not packages: - return None - return {str(pkg) for pkg in packages} - - -def run_modules_list(ctx: FlowContext, args): - _ensure_flow_dir(ctx) - - try: - profile = _validated_profile_name(getattr(args, "profile", None)) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - package_filter = _package_filter_from_args(args) - - specs = _collect_module_specs( - DOTFILES_DIR, - profile=profile, - package_filter=package_filter, - ) - - if not specs: - ctx.console.info("No module packages found.") - return - - rows = [] - for spec in sorted(specs, key=lambda item: item.package): - module_dir = _module_cache_dir(spec) - if module_dir.exists(): - status = f"ready@{_module_head_short(module_dir)}" - else: - status = "missing" - rows.append([spec.package, _module_ref_label(spec), spec.source, status]) - - ctx.console.table(["PACKAGE", "REF", "SOURCE", "STATUS"], rows) - - -def run_modules_sync(ctx: FlowContext, args): - _ensure_flow_dir(ctx) - - try: - profile = _validated_profile_name(getattr(args, "profile", None)) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - package_filter = _package_filter_from_args(args) - specs = _collect_module_specs( - DOTFILES_DIR, - profile=profile, - package_filter=package_filter, - ) - - if not specs: - ctx.console.info("No module packages to sync.") - return - - try: - _sync_modules( - ctx, - verbose=bool(getattr(args, "verbose", False)), - profile=profile, - package_filter=package_filter, - ) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - ctx.console.success(f"Synced {len(specs)} module(s)") - - -def run_repo_status(ctx: FlowContext, args): - _ensure_dotfiles_dir(ctx) - - result = _run_dotfiles_git("status", "--short", "--branch", capture=True) - if result.returncode != 0: - ctx.console.error(result.stderr.strip() or "Failed to read dotfiles git status") - sys.exit(1) - - output = result.stdout.strip() - if output: - print(output) - else: - ctx.console.info("Dotfiles repository is clean.") - - -def run_repo_pull(ctx: FlowContext, args): - _ensure_dotfiles_dir(ctx) - - try: - _pull_dotfiles(ctx, rebase=args.rebase) - _sync_modules(ctx, verbose=bool(getattr(args, "verbose", False))) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if args.relink: - relink_args = argparse.Namespace(packages=[], profile=args.profile) - run_relink(ctx, relink_args) - - -def run_repo_push(ctx: FlowContext, args): - _ensure_dotfiles_dir(ctx) - - ctx.console.info("Pushing dotfiles changes...") - result = _run_dotfiles_git("push", capture=True) - if result.returncode != 0: - ctx.console.error(f"Git push failed: {result.stderr.strip()}") - sys.exit(1) - - output = result.stdout.strip() - if output: - print(output) - ctx.console.success("Dotfiles pushed.") - - -def run_relink(ctx: FlowContext, args): - _ensure_flow_dir(ctx) - - args.copy = False - args.force = False - args.dry_run = False - ctx.console.info("Relinking with updated configuration...") - run_link(ctx, args) - - -def run_clean(ctx: FlowContext, args): - try: - current = _load_link_specs_from_state() - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if not current: - ctx.console.info("No linked dotfiles found.") - return - - removed = 0 - for target in sorted(list(current.keys()), key=str): - if not target.is_symlink() or target.exists(): - continue - - if args.dry_run: - print(f"Would remove broken symlink: {target}") - else: - use_sudo = not _is_in_home(target, Path.home()) - _remove_target(target, use_sudo=use_sudo, dry_run=False) - del current[target] - removed += 1 - - if not args.dry_run: - _save_link_specs_to_state(current) - if removed > 0: - _clear_last_link_transaction(remove_backups=True) - - if removed > 0: - ctx.console.success(f"Cleaned {removed} broken symlink(s)") - else: - ctx.console.info("No broken symlinks found") - - -def run_edit(ctx: FlowContext, args): - _ensure_dotfiles_dir(ctx) - - target_name = args.target - verbose = bool(getattr(args, "verbose", False)) - - edit_target = None - if not _is_path_like_target(target_name): - package_dir = _find_package_dir(target_name) - if package_dir is not None: - try: - package_layer = package_dir.parent.name - package_id = f"{package_layer}/{package_dir.name}" - edit_target = _resolved_package_source( - ctx, - package_id, - package_dir, - verbose=verbose, - ) - except RuntimeError as e: - ctx.console.error(str(e)) - sys.exit(1) - - if edit_target is None: - edit_target = _resolve_edit_target(target_name) - - if edit_target is None: - ctx.console.error(f"No matching package or path found for: {target_name}") - sys.exit(1) - - module_mode = _is_under(edit_target, MODULES_DIR) - - if verbose and module_mode: - ctx.console.info(f"Editing module workspace: {edit_target}") - - if ctx.config.dotfiles_pull_before_edit: - pull_repo = DOTFILES_DIR - if module_mode: - pull_repo = edit_target if edit_target.is_dir() else edit_target.parent - _pull_repo_before_edit(ctx, pull_repo, verbose=verbose) - - editor = os.environ.get("EDITOR", "vim") - ctx.console.info(f"Opening {edit_target} in {editor}...") - edit_result = _RUNNER.run(shlex.split(editor) + [str(edit_target)], capture_output=False) - if edit_result.returncode != 0: - ctx.console.warn(f"Editor exited with status {edit_result.returncode}") - - if module_mode: - module_git_dir = edit_target if edit_target.is_dir() else edit_target.parent - result = _run_git(module_git_dir, "status", "--porcelain", capture=True) - - if result.stdout.strip() and not args.no_commit: - ctx.console.info("Module changes detected, committing...") - _RUNNER.run(["git", "-C", str(module_git_dir), "add", "."], capture_output=False, check=True) - _RUNNER.run( - ["git", "-C", str(module_git_dir), "commit", "-m", f"Update {target_name}"], - capture_output=False, + new_state.links.pop(op.target, None) + + self._save_state(new_state) + self.ctx.console.success(f"Unlinked {plan.summary.removed} file(s).") + + def status(self) -> None: + """Show linked dotfiles status.""" + state = self._load_state() + if not state.links: + self.ctx.console.info("No managed links.") + return + + # Group by package + by_package: dict[str, list[LinkTarget]] = {} + for lt in state.links.values(): + by_package.setdefault(lt.package, []).append(lt) + + rows: list[list[str]] = [] + for pkg_id in sorted(by_package): + links = by_package[pkg_id] + rows.append([pkg_id, str(len(links)), "linked"]) + + self.ctx.console.table(["PACKAGE", "FILES", "STATUS"], rows) + + def edit(self, package_name: str) -> None: + """Open package directory in editor.""" + pkg_dir = self.dotfiles_dir / "_shared" / package_name + if not pkg_dir.is_dir(): + raise FlowError(f"Package not found: {package_name}") + + self.ctx.console.info(f"Package directory: {pkg_dir}") + + def sync(self) -> None: + """Pull latest dotfiles and sync modules.""" + if not self.dotfiles_dir.is_dir(): + if not self.ctx.config.dotfiles_url: + raise FlowError("No dotfiles URL configured") + self.ctx.console.info(f"Cloning dotfiles from {self.ctx.config.dotfiles_url}") + self.ctx.runtime.git.run( + self.dotfiles_dir.parent, + "clone", self.ctx.config.dotfiles_url, str(self.dotfiles_dir), check=True, ) + else: + self.ctx.console.info("Pulling latest dotfiles...") + self.ctx.runtime.git.run( + self.dotfiles_dir, "pull", "--ff-only", check=True, + ) + # Sync modules + packages = self._discover_packages(profile=None) + for pkg in packages: + if pkg.module: + self._sync_module(pkg) + + def _sync_module(self, pkg: Package) -> None: + """Clone or update a module.""" + module = pkg.module + assert module is not None + cache_dir = module.cache_dir + + if cache_dir.is_dir(): + self.ctx.console.info(f" Updating module: {pkg.package_id}") + self.ctx.runtime.git.run(cache_dir, "fetch", "--all", check=True) + if module.ref_type == "branch": + self.ctx.runtime.git.run( + cache_dir, "checkout", module.ref_value, check=True, + ) + self.ctx.runtime.git.run( + cache_dir, "pull", "--ff-only", check=True, + ) + elif module.ref_type == "tag": + self.ctx.runtime.git.run( + cache_dir, "checkout", f"tags/{module.ref_value}", check=True, + ) + elif module.ref_type == "commit": + self.ctx.runtime.git.run( + cache_dir, "checkout", module.ref_value, check=True, + ) + else: + self.ctx.console.info(f" Cloning module: {pkg.package_id}") + self.ctx.runtime.runner.run( + ["git", "clone", module.source, str(cache_dir)], + check=True, + ) + if module.ref_type != "branch" or module.ref_value != "main": + ref = module.ref_value + if module.ref_type == "tag": + ref = f"tags/{ref}" + self.ctx.runtime.git.run(cache_dir, "checkout", ref, check=True) + + def _discover_packages(self, profile: Optional[str]) -> list[Package]: + """Walk dotfiles dir and build Package objects.""" + packages: list[Package] = [] + + if not self.dotfiles_dir.is_dir(): + return packages + + layers = ["_shared"] + if profile: + layers.append(profile) + + for layer in layers: + layer_dir = self.dotfiles_dir / layer + if not layer_dir.is_dir(): + continue + + for pkg_dir in sorted(layer_dir.iterdir()): + if not pkg_dir.is_dir() or pkg_dir.name in SKIP_DIRS: + continue + + package_id = f"{layer}/{pkg_dir.name}" + + # Find _module.yaml (if any) + module_ref = self._find_module(pkg_dir, package_id) + + # Collect local files + local_files = self._collect_files(pkg_dir) + + packages.append(Package( + name=pkg_dir.name, + layer=layer, + package_id=package_id, + source_dir=pkg_dir, + module=module_ref, + local_files=tuple(local_files), + )) + + return packages + + def _find_module(self, pkg_dir: Path, package_id: str) -> Optional[ModuleRef]: + """Find and parse _module.yaml in a package directory.""" + for module_yaml in pkg_dir.rglob(MODULE_FILE): try: - response = input("Push module changes to remote? [Y/n] ") - except (EOFError, KeyboardInterrupt): - response = "n" - print() - if response.lower() != "n": - _RUNNER.run(["git", "-C", str(module_git_dir), "push"], capture_output=False, check=True) - ctx.console.success("Module changes committed and pushed") - else: - ctx.console.info("Module changes committed locally (not pushed)") - elif result.stdout.strip() and args.no_commit: - ctx.console.info("Module changes detected; skipped commit (--no-commit)") - else: - ctx.console.info("No module changes to commit") - return + with open(module_yaml) as f: + raw = yaml.safe_load(f) or {} + except Exception: + continue - result = _run_dotfiles_git("status", "--porcelain", capture=True) + mount_path = compute_mount_path(module_yaml, pkg_dir) - if result.stdout.strip() and not args.no_commit: - ctx.console.info("Changes detected, committing...") - _RUNNER.run(["git", "-C", str(DOTFILES_DIR), "add", "."], capture_output=False, check=True) - _RUNNER.run( - ["git", "-C", str(DOTFILES_DIR), "commit", "-m", f"Update {target_name}"], - capture_output=False, - check=True, - ) + ref = parse_module_ref(raw, package_id, mount_path, self.modules_dir) - try: - response = input("Push changes to remote? [Y/n] ") - except (EOFError, KeyboardInterrupt): - response = "n" - print() - if response.lower() != "n": - _RUNNER.run(["git", "-C", str(DOTFILES_DIR), "push"], capture_output=False, check=True) - ctx.console.success("Changes committed and pushed") - else: - ctx.console.info("Changes committed locally (not pushed)") - elif result.stdout.strip() and args.no_commit: - ctx.console.info("Changes detected; skipped commit (--no-commit)") - else: - ctx.console.info("No changes to commit") + # If module is cloned, populate module_files + if ref.cache_dir.is_dir(): + module_files = self._collect_files(ref.cache_dir) + ref = ModuleRef( + source=ref.source, + ref_type=ref.ref_type, + ref_value=ref.ref_value, + mount_path=ref.mount_path, + cache_dir=ref.cache_dir, + module_files=tuple(module_files), + ) + + return ref + return None + + def _collect_files(self, root_dir: Path) -> list[tuple[Path, Path]]: + """Collect all files in a directory as (absolute, relative) pairs.""" + files: list[tuple[Path, Path]] = [] + for path in sorted(root_dir.rglob("*")): + if not path.is_file(): + continue + if path.name in SKIP_FILES: + continue + # Skip .git contents + try: + path.relative_to(root_dir / ".git") + continue + except ValueError: + pass + rel = path.relative_to(root_dir) + files.append((path, rel)) + return files + + def _filesystem_check(self, path: Path) -> Optional[str]: + """Check what exists at a path. Returns type or None.""" + if path.is_symlink(): + return "symlink" + if path.is_file(): + return "file" + if path.is_dir(): + return "dir" + return None + + def _load_state(self) -> LinkedState: + """Load linked state from disk.""" + data = self.ctx.runtime.fs.read_json(paths.LINKED_STATE, default={}) + if data is None: + data = {} + return LinkedState.from_dict(data) + + def _save_state(self, state: LinkedState) -> None: + """Save linked state to disk.""" + self.ctx.runtime.fs.write_json(paths.LINKED_STATE, state.as_dict()) diff --git a/src/flow/services/packages.py b/src/flow/services/packages.py index 0b2e15c..1d9d969 100644 --- a/src/flow/services/packages.py +++ b/src/flow/services/packages.py @@ -1,113 +1,233 @@ -"""Package-state service built on shared package definitions.""" +# src/flow/services/packages.py +"""PackageService -- orchestrates package installation.""" from __future__ import annotations from pathlib import Path +from typing import Any, Optional from flow.core.config import FlowContext -from flow.core.system import JsonStateStore -from flow.services.package_defs import BinaryInstaller, get_package_catalog +from flow.core.errors import FlowError +from flow.core import paths +from flow.domain.packages.catalog import normalize_profile_entry, parse_catalog +from flow.domain.packages.models import ( + InstalledPackage, + InstalledState, + PackageDef, + PackagePlan, +) +from flow.domain.packages.planning import plan_install, plan_remove +from flow.domain.packages.resolution import ( + detect_package_manager, + pm_install_command, + pm_update_command, + resolve_binary_asset, + resolve_download_url, + resolve_source_name, + resolve_spec, +) class PackageService: - def __init__(self, ctx: FlowContext, *, installed_state: Path): + def __init__(self, ctx: FlowContext): self.ctx = ctx - self.installed = JsonStateStore(installed_state, ctx.runtime.fs, dict) - self.binary_installer = BinaryInstaller(ctx) - def load_installed(self) -> dict: - state = self.installed.load() - return state if isinstance(state, dict) else {} + def install( + self, + package_names: Optional[list[str]] = None, + *, + profile: Optional[str] = None, + dry_run: bool = False, + ) -> None: + """Install packages from profile or by name.""" + catalog = parse_catalog(self.ctx.manifest) + installed = self._load_state() + pm = detect_package_manager() - def save_installed(self, state: dict) -> None: - self.installed.save(state) - - def definitions(self): - return get_package_catalog(self.ctx) - - def install(self, args) -> None: - definitions = self.definitions() - installed = self.load_installed() - had_error = False - - for package_name in args.packages: - package_def = definitions.get(package_name) - if not package_def: - self.ctx.console.error(f"Package not found in manifest: {package_name}") - had_error = True - continue - - package_type = package_def.get("type", "pkg") - if package_type != "binary": - self.ctx.console.error( - f"'flow package install' supports binary packages only. '{package_name}' is type '{package_type}'." - ) - had_error = True - continue - - self.ctx.console.info(f"Installing {package_name}...") - try: - self.binary_installer.install(package_def, {}, dry_run=args.dry_run) - except RuntimeError as exc: - self.ctx.console.error(str(exc)) - had_error = True - continue - - if not args.dry_run: - installed[package_name] = { - "version": str(package_def.get("version", "")), - "type": package_type, - } - self.ctx.console.success(f"Installed {package_name}") - - if not args.dry_run: - self.save_installed(installed) - if had_error: - raise SystemExit(1) - - def list(self, args) -> None: - definitions = self.definitions() - installed = self.load_installed() - - rows = [] - if args.all: - if not definitions: - self.ctx.console.info("No packages defined in manifest.") - return - for name, package_def in sorted(definitions.items()): - rows.append( - [ - name, - str(package_def.get("type", "pkg")), - str(installed.get(name, {}).get("version", "-")), - str(package_def.get("version", "")) or "-", - ] - ) + # Resolve packages to install + packages: list[PackageDef] = [] + if package_names: + for name in package_names: + ref = normalize_profile_entry(name) + pkg = resolve_spec(ref, catalog) + packages.append(pkg) + elif profile: + profiles = self.ctx.manifest.get("profiles", {}) + if profile not in profiles: + raise FlowError(f"Unknown profile: {profile}") + profile_data = profiles[profile] + for entry in profile_data.get("packages", []): + ref = normalize_profile_entry(entry) + pkg = resolve_spec(ref, catalog) + packages.append(pkg) else: - if not installed: - self.ctx.console.info("No packages installed.") - return - for name, info in sorted(installed.items()): - rows.append( - [ - name, - str(info.get("type", "?")), - str(info.get("version", "?")), - str(definitions.get(name, {}).get("version", "")) or "-", - ] - ) + raise FlowError("Specify package names or --profile") - self.ctx.console.table(["PACKAGE", "TYPE", "INSTALLED", "AVAILABLE"], rows) + plan = plan_install(packages, installed, self.ctx.platform.platform, pm) - def remove(self, args) -> None: - installed = self.load_installed() - for package_name in args.packages: - if package_name not in installed: - self.ctx.console.warn(f"Package not installed: {package_name}") - continue - del installed[package_name] - self.ctx.console.success(f"Removed {package_name} from installed packages") - self.ctx.console.warn( - "Note: installed files were not automatically deleted. Remove manually if needed." + if not plan.install_ops: + self.ctx.console.info("All packages already installed.") + return + + self.ctx.console.print_plan( + [str(op) for op in plan.install_ops], verb="install" + ) + + if dry_run: + return + + # Execute PM packages + if plan.pm_update_needed and pm: + self.ctx.console.info(f"Updating package manager ({pm})...") + self.ctx.runtime.runner.run_shell( + pm_update_command(pm), check=True, ) - self.save_installed(installed) + + pm_names = [ + op.source_name for op in plan.install_ops if op.method == "pm" + ] + if pm_names and pm: + cmd = pm_install_command(pm, pm_names) + self.ctx.console.info(f"Installing: {', '.join(pm_names)}") + self.ctx.runtime.runner.run_shell(cmd, check=True) + for op in plan.install_ops: + if op.method == "pm": + installed.packages[op.package.name] = InstalledPackage( + name=op.package.name, + version=op.package.version or "system", + type="pkg", + ) + + # Execute binary packages + for op in plan.install_ops: + if op.method == "binary" and op.download_url: + self._install_binary(op.package, op.download_url, op.source_name, installed) + elif op.method == "appimage" and op.download_url: + self._install_appimage(op.package, op.download_url, installed) + + self._save_state(installed) + self.ctx.console.success(f"Installed {len(plan.install_ops)} package(s).") + + def _install_binary( + self, pkg: PackageDef, url: str, asset: str, state: InstalledState, + ) -> None: + """Download and install a binary package.""" + self.ctx.console.info(f"Downloading {pkg.name}...") + tmp_dir = paths.DATA_DIR / "tmp" + self.ctx.runtime.fs.ensure_dir(tmp_dir) + archive = tmp_dir / asset + + self.ctx.runtime.runner.run_shell( + f"curl -fSL -o {archive} '{url}'", check=True, + ) + + bin_dir = Path.home() / ".local" / "bin" + self.ctx.runtime.fs.ensure_dir(bin_dir) + + installed_files: list[Path] = [] + if asset.endswith((".tar.gz", ".tar.xz", ".tar.bz2", ".tgz")): + extract_dir = tmp_dir / f"{pkg.name}-extract" + self.ctx.runtime.fs.ensure_dir(extract_dir) + self.ctx.runtime.runner.run_shell( + f"tar -xf {archive} -C {extract_dir}", check=True, + ) + # Find and install binaries + install_cfg = pkg.install or {} + binary_name = install_cfg.get("binary", pkg.name) + src_dir = pkg.extract_dir or "" + + for candidate in extract_dir.rglob(binary_name): + if candidate.is_file(): + target = bin_dir / binary_name + self.ctx.runtime.fs.copy_file(candidate, target) + target.chmod(0o755) + installed_files.append(target) + break + else: + # Single binary + target = bin_dir / pkg.name + self.ctx.runtime.fs.copy_file(archive, target) + target.chmod(0o755) + installed_files.append(target) + + # Cleanup + self.ctx.runtime.fs.remove_tree(tmp_dir) + + state.packages[pkg.name] = InstalledPackage( + name=pkg.name, + version=pkg.version or "latest", + type="binary", + files=installed_files, + ) + + def _install_appimage( + self, pkg: PackageDef, url: str, state: InstalledState, + ) -> None: + """Download and install an AppImage.""" + bin_dir = Path.home() / ".local" / "bin" + self.ctx.runtime.fs.ensure_dir(bin_dir) + target = bin_dir / pkg.name + + self.ctx.console.info(f"Downloading {pkg.name} AppImage...") + self.ctx.runtime.runner.run_shell( + f"curl -fSL -o {target} '{url}'", check=True, + ) + target.chmod(0o755) + + state.packages[pkg.name] = InstalledPackage( + name=pkg.name, + version=pkg.version or "latest", + type="appimage", + files=[target], + ) + + def remove( + self, + package_names: list[str], + *, + dry_run: bool = False, + ) -> None: + """Remove installed packages.""" + installed = self._load_state() + plan = plan_remove(package_names, installed) + + if not plan.remove_ops: + self.ctx.console.info("No matching packages to remove.") + return + + self.ctx.console.print_plan( + [str(op) for op in plan.remove_ops], verb="remove" + ) + + if dry_run: + return + + for op in plan.remove_ops: + for f in op.files: + self.ctx.runtime.fs.remove_file(f, missing_ok=True) + installed.packages.pop(op.name, None) + + self._save_state(installed) + self.ctx.console.success(f"Removed {len(plan.remove_ops)} package(s).") + + def list_packages(self) -> None: + """List installed packages.""" + installed = self._load_state() + if not installed.packages: + self.ctx.console.info("No packages installed by flow.") + return + + rows = [ + [name, pkg.version, pkg.type] + for name, pkg in sorted(installed.packages.items()) + ] + self.ctx.console.table(["NAME", "VERSION", "TYPE"], rows) + + def _load_state(self) -> InstalledState: + data = self.ctx.runtime.fs.read_json(paths.INSTALLED_STATE, default={}) + if data is None: + data = {} + return InstalledState.from_dict(data) + + def _save_state(self, state: InstalledState) -> None: + self.ctx.runtime.fs.write_json(paths.INSTALLED_STATE, state.as_dict()) diff --git a/src/flow/services/projects.py b/src/flow/services/projects.py index 0f510ea..f50d86c 100644 --- a/src/flow/services/projects.py +++ b/src/flow/services/projects.py @@ -1,174 +1,87 @@ -"""Project sync service for `flow sync`.""" +"""ProjectService -- manages git project status.""" from __future__ import annotations -import os -import subprocess +from pathlib import Path +from typing import Optional from flow.core.config import FlowContext from flow.core.errors import FlowError -class ProjectSyncService: - """Inspect and synchronize git repositories under the projects directory.""" - +class ProjectService: def __init__(self, ctx: FlowContext): self.ctx = ctx - self.runner = ctx.runtime.runner + self.projects_dir = Path(self.ctx.config.projects_dir).expanduser() - def git(self, repo: str, *cmd: str, capture: bool = True) -> subprocess.CompletedProcess[str]: - return self.runner.run( - ["git", "-C", repo, *cmd], - capture_output=capture, - ) + def check(self, *, fetch: bool = False) -> None: + """Check status of all git repos in projects dir.""" + if not self.projects_dir.is_dir(): + self.ctx.console.info(f"Projects directory not found: {self.projects_dir}") + return - def is_git_repo(self, repo_path: str) -> bool: - git_dir = os.path.join(repo_path, ".git") - return os.path.isdir(git_dir) or os.path.isfile(git_dir) + repos = self._find_repos() + if not repos: + self.ctx.console.info("No git repositories found.") + return - def check_repo(self, repo_path: str, do_fetch: bool = True) -> tuple[str, list[str] | None]: - name = os.path.basename(repo_path) - if not self.is_git_repo(repo_path): - return name, None - - issues: list[str] = [] - - if do_fetch: - fetch_result = self.git(repo_path, "fetch", "--all", "--quiet") - if fetch_result.returncode != 0: - issues.append("git fetch failed") - - result = self.git(repo_path, "rev-parse", "--abbrev-ref", "HEAD") - branch = result.stdout.strip() if result.returncode == 0 else "HEAD" - - diff_result = self.git(repo_path, "diff", "--quiet") - cached_result = self.git(repo_path, "diff", "--cached", "--quiet") - if diff_result.returncode != 0 or cached_result.returncode != 0: - issues.append("uncommitted changes") - else: - untracked = self.git(repo_path, "ls-files", "--others", "--exclude-standard") - if untracked.stdout.strip(): - issues.append("untracked files") - - upstream_check = self.git(repo_path, "rev-parse", "--abbrev-ref", f"{branch}@{{u}}") - if upstream_check.returncode == 0: - unpushed = self.git(repo_path, "rev-list", "--oneline", f"{branch}@{{u}}..{branch}") - if unpushed.stdout.strip(): - issues.append( - f"{len(unpushed.stdout.strip().splitlines())} unpushed commit(s) on {branch}" - ) - else: - issues.append(f"no upstream for {branch}") - - branches_result = self.git( - repo_path, - "for-each-ref", - "--format=%(refname:short)", - "refs/heads", - ) - for branch_name in branches_result.stdout.strip().splitlines(): - if not branch_name or branch_name == branch: - continue - upstream = self.git(repo_path, "rev-parse", "--abbrev-ref", f"{branch_name}@{{u}}") - if upstream.returncode == 0: - ahead = self.git(repo_path, "rev-list", "--count", f"{branch_name}@{{u}}..{branch_name}") - if ahead.stdout.strip() != "0": - issues.append(f"branch {branch_name}: {ahead.stdout.strip()} ahead") - else: - issues.append(f"branch {branch_name}: no upstream") - - return name, issues - - def _projects_dir(self) -> str: - projects_dir = os.path.expanduser(self.ctx.config.projects_dir) - if not os.path.isdir(projects_dir): - raise FlowError(f"Projects directory not found: {projects_dir}") - return projects_dir - - def run_check(self, args) -> None: - projects_dir = self._projects_dir() + if fetch: + self.ctx.console.info("Fetching all remotes...") + for repo in repos: + self.ctx.runtime.git.run(repo, "fetch", "--all", "--quiet") rows = [] - needs_action = [] - not_git = [] - checked = 0 + for repo in repos: + status = self._repo_status(repo) + rows.append([repo.name, status]) - for entry in sorted(os.listdir(projects_dir)): - repo_path = os.path.join(projects_dir, entry) - if not os.path.isdir(repo_path): + self.ctx.console.table(["REPO", "STATUS"], rows) + + def summary(self) -> None: + """Quick summary without fetch.""" + self.check(fetch=False) + + def fetch(self) -> None: + """Fetch all remotes then show status.""" + self.check(fetch=True) + + def _find_repos(self) -> list[Path]: + """Find all git repos in projects dir (immediate children only).""" + repos = [] + for child in sorted(self.projects_dir.iterdir()): + if not child.is_dir(): continue + # Check for .git dir or .git file (worktree) + git_path = child / ".git" + if git_path.exists(): + repos.append(child) + return repos - name, issues = self.check_repo(repo_path, do_fetch=args.fetch) - if issues is None: - not_git.append(name) - continue - checked += 1 - if issues: - needs_action.append(name) - rows.append([name, "; ".join(issues)]) - else: - rows.append([name, "clean and synced"]) + def _repo_status(self, repo: Path) -> str: + """Get human-readable status for a repo.""" + parts = [] - if checked == 0: - self.ctx.console.info("No git repositories found in projects directory.") - if not_git: - self.ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}") - return + # Check for uncommitted changes + result = self.ctx.runtime.git.run( + repo, "status", "--porcelain", + ) + if result.stdout.strip(): + parts.append("uncommitted changes") - self.ctx.console.table(["PROJECT", "STATUS"], rows) + # Check ahead/behind + result = self.ctx.runtime.git.run( + repo, "rev-list", "--left-right", "--count", "HEAD...@{u}", + ) + if result.returncode == 0 and result.stdout.strip(): + counts = result.stdout.strip().split() + if len(counts) == 2: + ahead, behind = int(counts[0]), int(counts[1]) + if ahead > 0: + parts.append(f"{ahead} ahead") + if behind > 0: + parts.append(f"{behind} behind") - if needs_action: - self.ctx.console.warn(f"Projects needing action: {', '.join(sorted(needs_action))}") - else: - self.ctx.console.success("All repositories clean and synced.") + if not parts: + parts.append("clean") - if not_git: - self.ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}") - - def run_fetch(self, _args) -> None: - projects_dir = self._projects_dir() - - had_error = False - fetched = 0 - for entry in sorted(os.listdir(projects_dir)): - repo_path = os.path.join(projects_dir, entry) - if not self.is_git_repo(repo_path): - continue - self.ctx.console.info(f"Fetching {entry}...") - result = self.git(repo_path, "fetch", "--all", "--quiet") - fetched += 1 - if result.returncode != 0: - self.ctx.console.error(f"Failed to fetch {entry}") - had_error = True - - if fetched == 0: - self.ctx.console.info("No git repositories found in projects directory.") - return - - if had_error: - raise SystemExit(1) - - self.ctx.console.success("All remotes fetched.") - - def run_summary(self, _args) -> None: - projects_dir = self._projects_dir() - - rows = [] - for entry in sorted(os.listdir(projects_dir)): - repo_path = os.path.join(projects_dir, entry) - if not os.path.isdir(repo_path): - continue - - name, issues = self.check_repo(repo_path, do_fetch=False) - if issues is None: - rows.append([name, "not a git repo"]) - elif issues: - rows.append([name, "; ".join(issues)]) - else: - rows.append([name, "clean"]) - - if not rows: - self.ctx.console.info("No projects found.") - return - - self.ctx.console.table(["PROJECT", "STATUS"], rows) + return ", ".join(parts) diff --git a/src/flow/services/remote.py b/src/flow/services/remote.py new file mode 100644 index 0000000..85abb89 --- /dev/null +++ b/src/flow/services/remote.py @@ -0,0 +1,67 @@ +"""RemoteService -- manages SSH connections to targets.""" + +from __future__ import annotations + +import os +from typing import Optional + +from flow.core.config import FlowContext +from flow.core.errors import FlowError +from flow.domain.remote.resolution import ( + build_ssh_command, + list_targets, + resolve_target, + terminfo_fix_command, +) + + +class RemoteService: + def __init__(self, ctx: FlowContext): + self.ctx = ctx + + def enter( + self, + target_spec: str, + *, + dry_run: bool = False, + ) -> None: + """SSH into a target.""" + target = resolve_target(target_spec, self.ctx.config.targets) + cmd = build_ssh_command(target) + + self.ctx.console.info(f"Connecting to {target.label} ({target.host})") + + if dry_run: + self.ctx.console.info(f"Would run: {' '.join(cmd.argv)}") + return + + # Set env vars for the SSH session + env = dict(os.environ) + env.update(cmd.env) + + self.ctx.runtime.runner.run( + cmd.argv, + env=env, + capture_output=False, + check=True, + ) + + def list(self) -> None: + """List configured targets.""" + targets = list_targets(self.ctx.config.targets) + if not targets: + self.ctx.console.info("No targets configured.") + return + + rows = [ + [t.label, t.host, t.identity or "-"] + for t in targets + ] + self.ctx.console.table(["TARGET", "HOST", "IDENTITY"], rows) + + def fix_terminfo(self, target_spec: str) -> None: + """Show terminfo fix commands.""" + cmds = terminfo_fix_command() + self.ctx.console.info("Run these commands to fix terminfo:") + for cmd in cmds: + self.ctx.console.info(f" {cmd}") diff --git a/tests/test_service_bootstrap.py b/tests/test_service_bootstrap.py new file mode 100644 index 0000000..e7c46fe --- /dev/null +++ b/tests/test_service_bootstrap.py @@ -0,0 +1,67 @@ +"""Tests for BootstrapService.""" + +from pathlib import Path + +import pytest + +from flow.core.config import AppConfig, FlowContext +from flow.core.console import Console +from flow.core.errors import FlowError +from flow.core.platform import PlatformInfo +from flow.core.runtime import SystemRuntime +from flow.services.bootstrap import BootstrapService + + +def _make_ctx(manifest=None): + return FlowContext( + config=AppConfig(), + manifest=manifest or {}, + platform=PlatformInfo(), + console=Console(color=False), + runtime=SystemRuntime(), + ) + + +class TestBootstrapService: + def test_show_profile(self, capsys): + manifest = { + "profiles": { + "work": { + "os": "linux", + "hostname": "dev", + "packages": ["fd"], + }, + }, + "packages": [{"name": "fd", "type": "pkg"}], + } + ctx = _make_ctx(manifest) + svc = BootstrapService(ctx) + svc.show("work") + output = capsys.readouterr().out + assert "work" in output + + def test_unknown_profile_raises(self): + ctx = _make_ctx({"profiles": {}}) + svc = BootstrapService(ctx) + with pytest.raises(FlowError, match="Unknown profile"): + svc.run("missing") + + def test_list_profiles(self, capsys): + manifest = { + "profiles": { + "work": {"os": "linux", "hostname": "dev"}, + "personal": {"os": "linux"}, + }, + } + ctx = _make_ctx(manifest) + svc = BootstrapService(ctx) + svc.list_profiles() + output = capsys.readouterr().out + assert "work" in output + assert "personal" in output + + def test_list_profiles_empty(self, capsys): + ctx = _make_ctx({}) + svc = BootstrapService(ctx) + svc.list_profiles() + assert "No profiles" in capsys.readouterr().out diff --git a/tests/test_service_containers.py b/tests/test_service_containers.py new file mode 100644 index 0000000..4234cf8 --- /dev/null +++ b/tests/test_service_containers.py @@ -0,0 +1,73 @@ +"""Tests for ContainerService.""" + +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +from flow.core.config import AppConfig, FlowContext +from flow.core.console import Console +from flow.core.platform import PlatformInfo +from flow.core.runtime import CommandRunner, FileSystem, SystemRuntime +from flow.core import paths +from flow.services.containers import ContainerService + + +class FakeRunner(CommandRunner): + """CommandRunner that captures calls instead of executing.""" + def __init__(self): + self.calls: list[tuple] = [] + + def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None): + self.calls.append(("run", list(argv))) + return subprocess.CompletedProcess(argv, 0, stdout="", stderr="") + + def run_shell(self, command, *, cwd=None, env=None, capture_output=True, check=False, timeout=None): + self.calls.append(("run_shell", command)) + return subprocess.CompletedProcess(command, 0, stdout="", stderr="") + + +def _make_ctx(tmp_path, runner=None): + rt = SystemRuntime() + if runner: + rt.runner = runner + return FlowContext( + config=AppConfig(), + manifest={}, + platform=PlatformInfo(), + console=Console(color=False), + runtime=rt, + ) + + +class TestContainerService: + def test_create_dry_run(self, tmp_path, capsys, monkeypatch): + monkeypatch.setattr(paths, "HOME", tmp_path) + monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles") + ctx = _make_ctx(tmp_path) + svc = ContainerService(ctx) + svc.create("devbox", "personal", dry_run=True) + output = capsys.readouterr().out + assert "devbox" in output + + def test_list_no_docker(self, tmp_path, capsys): + runner = FakeRunner() + ctx = _make_ctx(tmp_path, runner=runner) + svc = ContainerService(ctx) + svc.list() + # FakeRunner returns empty stdout -> "No flow containers" + output = capsys.readouterr().out + assert "No flow containers" in output + + def test_stop_calls_docker(self, tmp_path): + runner = FakeRunner() + ctx = _make_ctx(tmp_path, runner=runner) + svc = ContainerService(ctx) + svc.stop("flow-personal-devbox") + assert any("docker" in str(c) and "stop" in str(c) for c in runner.calls) + + def test_remove_calls_docker(self, tmp_path): + runner = FakeRunner() + ctx = _make_ctx(tmp_path, runner=runner) + svc = ContainerService(ctx) + svc.remove("flow-personal-devbox") + assert any("docker" in str(c) and "rm" in str(c) for c in runner.calls) diff --git a/tests/test_service_dotfiles.py b/tests/test_service_dotfiles.py new file mode 100644 index 0000000..27bd3f4 --- /dev/null +++ b/tests/test_service_dotfiles.py @@ -0,0 +1,172 @@ +"""Tests for DotfilesService.""" + +from pathlib import Path +from unittest.mock import MagicMock + +import yaml + +from flow.core.config import AppConfig, FlowContext +from flow.core.console import Console +from flow.core.platform import PlatformInfo +from flow.core.runtime import SystemRuntime +from flow.core import paths +from flow.services.dotfiles import DotfilesService + + +def _make_ctx(tmp_path, console=None): + """Build a FlowContext for testing.""" + return FlowContext( + config=AppConfig(), + manifest={}, + platform=PlatformInfo(), + console=console or Console(color=False), + runtime=SystemRuntime(), + ) + + +def _setup_dotfiles(tmp_path, packages_files): + """Set up a fake dotfiles directory structure. + packages_files: dict of {package_name: {relative_path: content}} + """ + dotfiles = tmp_path / "dotfiles" + shared = dotfiles / "_shared" + for pkg_name, files in packages_files.items(): + pkg_dir = shared / pkg_name + for rel_path, content in files.items(): + file_path = pkg_dir / rel_path + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text(content) + return dotfiles + + +class TestDotfilesServiceLink: + def test_link_creates_symlinks(self, tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + + dotfiles = _setup_dotfiles(tmp_path, { + "zsh": {".zshrc": "# zsh config"}, + "git": {".config/git/config": "[user]\n name = test"}, + }) + + monkeypatch.setattr(paths, "HOME", home) + monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles) + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules") + monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json") + + ctx = _make_ctx(tmp_path) + svc = DotfilesService(ctx) + svc.link() + + assert (home / ".zshrc").is_symlink() + assert (home / ".config" / "git" / "config").is_symlink() + + def test_link_with_module(self, tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + + dotfiles = tmp_path / "dotfiles" + modules = tmp_path / "modules" + + # Set up package with _module.yaml + pkg_dir = dotfiles / "_shared" / "nvim" + config_dir = pkg_dir / ".config" / "nvim" + config_dir.mkdir(parents=True) + (config_dir / "_module.yaml").write_text(yaml.dump({ + "source": "github:test/nvim-config", + "ref": {"branch": "main"}, + })) + + # Set up local file outside mount path + (pkg_dir / ".local" / "bin").mkdir(parents=True) + (pkg_dir / ".local" / "bin" / "nvim-wrapper").write_text("#!/bin/sh") + + # Set up cloned module + module_dir = modules / "_shared--nvim" + module_dir.mkdir(parents=True) + (module_dir / "init.lua").write_text("-- init") + (module_dir / "lua").mkdir() + (module_dir / "lua" / "plugins.lua").write_text("-- plugins") + + monkeypatch.setattr(paths, "HOME", home) + monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles) + monkeypatch.setattr(paths, "MODULES_DIR", modules) + monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json") + + ctx = _make_ctx(tmp_path) + svc = DotfilesService(ctx) + svc.link() + + # Module files should be linked under .config/nvim/ + assert (home / ".config" / "nvim" / "init.lua").is_symlink() + assert (home / ".config" / "nvim" / "lua" / "plugins.lua").is_symlink() + # Local file outside mount path should be linked + assert (home / ".local" / "bin" / "nvim-wrapper").is_symlink() + + def test_unlink_removes_symlinks(self, tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + + dotfiles = _setup_dotfiles(tmp_path, { + "zsh": {".zshrc": "# zsh"}, + }) + + monkeypatch.setattr(paths, "HOME", home) + monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles) + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules") + monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json") + + ctx = _make_ctx(tmp_path) + svc = DotfilesService(ctx) + + # Link first + svc.link() + assert (home / ".zshrc").is_symlink() + + # Then unlink + svc.unlink() + assert not (home / ".zshrc").exists() + + def test_link_dry_run_no_changes(self, tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + + dotfiles = _setup_dotfiles(tmp_path, { + "zsh": {".zshrc": "# zsh"}, + }) + + monkeypatch.setattr(paths, "HOME", home) + monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles) + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules") + monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json") + + ctx = _make_ctx(tmp_path) + svc = DotfilesService(ctx) + svc.link(dry_run=True) + + # No symlinks should exist + assert not (home / ".zshrc").exists() + + def test_status_shows_packages(self, tmp_path, monkeypatch, capsys): + home = tmp_path / "home" + home.mkdir() + + dotfiles = _setup_dotfiles(tmp_path, { + "zsh": {".zshrc": "# zsh"}, + }) + + monkeypatch.setattr(paths, "HOME", home) + monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles) + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules") + monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json") + + ctx = _make_ctx(tmp_path) + svc = DotfilesService(ctx) + + # Link first to populate state + svc.link() + + # Check status + svc.status() + output = capsys.readouterr().out + assert "zsh" in output diff --git a/tests/test_service_packages.py b/tests/test_service_packages.py new file mode 100644 index 0000000..da72afc --- /dev/null +++ b/tests/test_service_packages.py @@ -0,0 +1,65 @@ +"""Tests for PackageService.""" + +from pathlib import Path + +import pytest + +from flow.core.config import AppConfig, FlowContext +from flow.core.console import Console +from flow.core.errors import FlowError +from flow.core.platform import PlatformInfo +from flow.core.runtime import SystemRuntime +from flow.core import paths +from flow.domain.packages.models import InstalledPackage, InstalledState +from flow.services.packages import PackageService + + +def _make_ctx(tmp_path, manifest=None): + return FlowContext( + config=AppConfig(), + manifest=manifest or {}, + platform=PlatformInfo(), + console=Console(color=False), + runtime=SystemRuntime(), + ) + + +class TestPackageService: + def test_list_empty(self, tmp_path, monkeypatch, capsys): + monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json") + ctx = _make_ctx(tmp_path) + svc = PackageService(ctx) + svc.list_packages() + assert "No packages" in capsys.readouterr().out + + def test_list_shows_installed(self, tmp_path, monkeypatch, capsys): + state = InstalledState(packages={ + "fd": InstalledPackage(name="fd", version="10.2", type="pkg"), + }) + state_path = tmp_path / "installed.json" + import json + state_path.parent.mkdir(parents=True, exist_ok=True) + with open(state_path, "w") as f: + json.dump(state.as_dict(), f) + + monkeypatch.setattr(paths, "INSTALLED_STATE", state_path) + ctx = _make_ctx(tmp_path) + svc = PackageService(ctx) + svc.list_packages() + output = capsys.readouterr().out + assert "fd" in output + assert "10.2" in output + + def test_remove_not_installed(self, tmp_path, monkeypatch, capsys): + monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json") + ctx = _make_ctx(tmp_path) + svc = PackageService(ctx) + svc.remove(["missing"]) + assert "No matching" in capsys.readouterr().out + + def test_install_requires_args(self, tmp_path, monkeypatch): + monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json") + ctx = _make_ctx(tmp_path) + svc = PackageService(ctx) + with pytest.raises(FlowError, match="Specify"): + svc.install() diff --git a/tests/test_service_projects.py b/tests/test_service_projects.py new file mode 100644 index 0000000..b543932 --- /dev/null +++ b/tests/test_service_projects.py @@ -0,0 +1,78 @@ +"""Tests for ProjectService.""" + +import subprocess +from pathlib import Path + +from flow.core.config import AppConfig, FlowContext +from flow.core.console import Console +from flow.core.platform import PlatformInfo +from flow.core.runtime import SystemRuntime +from flow.services.projects import ProjectService + + +def _make_ctx(projects_dir): + return FlowContext( + config=AppConfig(projects_dir=str(projects_dir)), + manifest={}, + platform=PlatformInfo(), + console=Console(color=False), + runtime=SystemRuntime(), + ) + + +def _init_repo(path, commit=True): + """Create a git repo with an initial commit.""" + path.mkdir(parents=True, exist_ok=True) + subprocess.run(["git", "init", str(path)], capture_output=True, check=True) + subprocess.run(["git", "-C", str(path), "config", "user.email", "test@test.com"], capture_output=True, check=True) + subprocess.run(["git", "-C", str(path), "config", "user.name", "Test"], capture_output=True, check=True) + if commit: + (path / "README.md").write_text("# test") + subprocess.run(["git", "-C", str(path), "add", "."], capture_output=True, check=True) + subprocess.run(["git", "-C", str(path), "commit", "-m", "init"], capture_output=True, check=True) + + +class TestProjectService: + def test_check_clean_repo(self, tmp_path, capsys): + projects = tmp_path / "projects" + projects.mkdir() + _init_repo(projects / "myrepo") + + ctx = _make_ctx(projects) + svc = ProjectService(ctx) + svc.check(fetch=False) + + output = capsys.readouterr().out + assert "myrepo" in output + assert "clean" in output + + def test_check_uncommitted_changes(self, tmp_path, capsys): + projects = tmp_path / "projects" + projects.mkdir() + _init_repo(projects / "myrepo") + (projects / "myrepo" / "new_file.txt").write_text("changes") + + ctx = _make_ctx(projects) + svc = ProjectService(ctx) + svc.check(fetch=False) + + output = capsys.readouterr().out + assert "uncommitted" in output + + def test_check_no_git_repos(self, tmp_path, capsys): + projects = tmp_path / "projects" + projects.mkdir() + (projects / "not-a-repo").mkdir() + + ctx = _make_ctx(projects) + svc = ProjectService(ctx) + svc.check(fetch=False) + + output = capsys.readouterr().out + assert "No git" in output + + def test_missing_projects_dir(self, tmp_path, capsys): + ctx = _make_ctx(tmp_path / "nonexistent") + svc = ProjectService(ctx) + svc.check(fetch=False) + assert "not found" in capsys.readouterr().out diff --git a/tests/test_service_remote.py b/tests/test_service_remote.py new file mode 100644 index 0000000..77a7e77 --- /dev/null +++ b/tests/test_service_remote.py @@ -0,0 +1,62 @@ +"""Tests for RemoteService.""" + +import pytest + +from flow.core.config import AppConfig, FlowContext, TargetConfig +from flow.core.console import Console +from flow.core.errors import FlowError +from flow.core.platform import PlatformInfo +from flow.core.runtime import SystemRuntime +from flow.services.remote import RemoteService + + +def _make_ctx(targets=None): + return FlowContext( + config=AppConfig(targets=targets or []), + manifest={}, + platform=PlatformInfo(), + console=Console(color=False), + runtime=SystemRuntime(), + ) + + +class TestRemoteService: + def test_enter_dry_run(self, capsys): + targets = [TargetConfig(namespace="personal", platform="orb", host="personal.orb")] + ctx = _make_ctx(targets) + svc = RemoteService(ctx) + svc.enter("personal@orb", dry_run=True) + output = capsys.readouterr().out + assert "personal@orb" in output + assert "ssh" in output + + def test_enter_unknown_target(self): + ctx = _make_ctx() + svc = RemoteService(ctx) + with pytest.raises(FlowError, match="Unknown target"): + svc.enter("missing@host") + + def test_list_targets(self, capsys): + targets = [ + TargetConfig(namespace="personal", platform="orb", host="personal.orb"), + TargetConfig(namespace="work", platform="ec2", host="work.ec2"), + ] + ctx = _make_ctx(targets) + svc = RemoteService(ctx) + svc.list() + output = capsys.readouterr().out + assert "personal@orb" in output + assert "work@ec2" in output + + def test_list_empty(self, capsys): + ctx = _make_ctx() + svc = RemoteService(ctx) + svc.list() + assert "No targets" in capsys.readouterr().out + + def test_fix_terminfo(self, capsys): + ctx = _make_ctx() + svc = RemoteService(ctx) + svc.fix_terminfo("personal@orb") + output = capsys.readouterr().out + assert "infocmp" in output