# Flow CLI Rewrite Implementation Plan > **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Full rewrite of the flow CLI with correct domain abstractions, plan-then-execute pattern, and working module path resolution. **Architecture:** Four-layer hybrid (core -> domain -> services -> commands). Domain layer is pure functions + frozen dataclasses. Services orchestrate I/O. Commands are trivial dispatchers. See `docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md` for the full spec. **Tech Stack:** Python 3.9+, PyYAML, argparse, pytest, PyInstaller **Spec:** `docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md` **Standards:** `CLAUDE.md` --- ## Phasing Strategy Build bottom-up. Each phase produces tested, working code before the next begins. Old code under `src/flow/` is untouched until the final phase when it's deleted. The new code is written in-place (same `src/flow/` tree) because this is a full rewrite with no backward compatibility. Old modules are deleted as their replacements land. | Phase | What | Depends on | |-------|------|------------| | 1 | Core layer (errors, template, paths, platform, console, runtime, config) | Nothing | | 2 | Dotfiles domain (models, modules, resolution, planning) | Core | | 3 | Packages domain (models, catalog, resolution, planning) | Core | | 4 | Bootstrap domain (models, modules, planning) | Core, packages domain | | 5 | Remote + containers + projects domains | Core | | 6 | All services | Core, all domains | | 7 | Commands + CLI + completion | Core, all services | | 8 | Delete old code, final integration | Everything | --- ## Chunk 1: Core Layer ### Task 1: Errors **Files:** - Create: `src/flow/core/errors.py` - Create: `tests/test_errors.py` - [ ] **Step 1: Write the error hierarchy** ```python # src/flow/core/errors.py """Project-wide error types.""" class FlowError(Exception): """Base for all user-facing errors.""" class ConfigError(FlowError): """Invalid config or manifest YAML.""" class PlanConflict(FlowError): """Conflicts detected during planning.""" def __init__(self, message: str, conflicts: list[str]): super().__init__(message) self.conflicts = conflicts class ExecutionError(FlowError): """A plan step failed during execution.""" ``` - [ ] **Step 2: Write tests** ```python # tests/test_errors.py """Tests for flow.core.errors.""" from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict def test_flow_error_is_exception(): assert issubclass(FlowError, Exception) def test_config_error_is_flow_error(): assert issubclass(ConfigError, FlowError) def test_plan_conflict_carries_conflicts(): err = PlanConflict("2 conflicts", ["a exists", "b exists"]) assert str(err) == "2 conflicts" assert err.conflicts == ["a exists", "b exists"] def test_execution_error_is_flow_error(): assert issubclass(ExecutionError, FlowError) ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_errors.py -v` Expected: 4 passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/errors.py tests/test_errors.py git commit -m "feat: add FlowError hierarchy" ``` --- ### Task 2: Template (pure string substitution) **Files:** - Create: `src/flow/core/template.py` - Create: `tests/test_template.py` - [ ] **Step 1: Write tests** ```python # tests/test_template.py """Tests for flow.core.template.""" import os from flow.core.template import substitute, substitute_template class TestSubstitute: def test_replaces_dollar_var(self): assert substitute("hello $NAME", {"NAME": "world"}) == "hello world" def test_replaces_braced_var(self): assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world" def test_falls_back_to_env(self, monkeypatch): monkeypatch.setenv("FOO", "bar") assert substitute("$FOO", {}) == "bar" def test_preserves_unknown_vars(self): assert substitute("$UNKNOWN", {}) == "$UNKNOWN" def test_non_string_passthrough(self): assert substitute(42, {}) == 42 class TestSubstituteTemplate: def test_replaces_double_braces(self): assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux" def test_env_dot_notation(self, monkeypatch): monkeypatch.setenv("USER", "tomas") result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)}) assert result == "tomas" def test_nested_dict_lookup(self): ctx = {"platform": {"arch": "arm64"}} assert substitute_template("{{ platform.arch }}", ctx) == "arm64" def test_preserves_unknown_templates(self): assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}" def test_non_string_passthrough(self): assert substitute_template(42, {}) == 42 def test_whitespace_in_braces(self): assert substitute_template("{{ os }}", {"os": "linux"}) == "linux" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `python -m pytest tests/test_template.py -v` Expected: FAIL (module not found) - [ ] **Step 3: Implement template.py** ```python # src/flow/core/template.py """Variable and template substitution -- pure functions, no I/O.""" import os import re from typing import Any, Dict def substitute(text: Any, variables: Dict[str, str]) -> Any: """Replace $VAR and ${VAR} with values from variables dict or env.""" if not isinstance(text, str): return text pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}") def _replace(match: re.Match[str]) -> str: key = match.group(1) or match.group(2) or "" if key in variables: return str(variables[key]) if key in os.environ: return os.environ[key] return match.group(0) return pattern.sub(_replace, text) def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any: if expr.startswith("env."): env_key = expr.split(".", 1)[1] env_ctx = context.get("env", {}) if isinstance(env_ctx, dict) and env_key in env_ctx: return env_ctx[env_key] return os.environ.get(env_key) if expr in context: return context[expr] current: Any = context for part in expr.split("."): if not isinstance(current, dict) or part not in current: return None current = current[part] return current def substitute_template(text: Any, context: Dict[str, Any]) -> Any: """Replace {{expr}} placeholders with values from context dict.""" if not isinstance(text, str): return text def _replace(match: re.Match[str]) -> str: key = match.group(1).strip() value = _resolve_template_value(key, context) if value is None: return match.group(0) return str(value) return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text) ``` - [ ] **Step 4: Run tests** Run: `python -m pytest tests/test_template.py -v` Expected: All passed - [ ] **Step 5: Commit** ```bash git add src/flow/core/template.py tests/test_template.py git commit -m "feat: add template substitution (pure functions)" ``` --- ### Task 3: Paths (XDG constants) **Files:** - Create: `src/flow/core/paths.py` - Create: `tests/test_core_paths.py` - [ ] **Step 1: Write paths.py** ```python # src/flow/core/paths.py """XDG-compliant path constants for flow.""" import os from pathlib import Path def _xdg(env_var: str, fallback: str) -> Path: return Path(os.environ.get(env_var, fallback)) HOME = Path.home() CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow" DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow" STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow" DOTFILES_DIR = DATA_DIR / "dotfiles" MODULES_DIR = DATA_DIR / "modules" PACKAGES_DIR = DATA_DIR / "packages" LINKED_STATE = STATE_DIR / "linked.json" INSTALLED_STATE = STATE_DIR / "installed.json" # Self-hosted flow config path (from dotfiles repo) DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow" def ensure_dirs() -> None: """Create all required directories.""" for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR): d.mkdir(parents=True, exist_ok=True) ``` - [ ] **Step 2: Write tests** ```python # tests/test_core_paths.py """Tests for flow.core.paths.""" from pathlib import Path from flow.core import paths def test_config_dir_ends_with_flow(): assert paths.CONFIG_DIR.name == "flow" def test_data_dir_ends_with_flow(): assert paths.DATA_DIR.name == "flow" def test_modules_dir_under_data(): assert paths.MODULES_DIR.parent == paths.DATA_DIR def test_linked_state_under_state(): assert paths.LINKED_STATE.parent == paths.STATE_DIR def test_dotfiles_flow_config_path(): expected_suffix = Path("_shared") / "flow" / ".config" / "flow" assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix)) def test_ensure_dirs_creates_directories(tmp_path, monkeypatch): monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow") monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow") monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow") monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules") monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages") paths.ensure_dirs() assert (tmp_path / "config" / "flow").is_dir() assert (tmp_path / "data" / "flow" / "modules").is_dir() assert (tmp_path / "state" / "flow").is_dir() ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_core_paths.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/paths.py tests/test_core_paths.py git commit -m "feat: add XDG path constants" ``` --- ### Task 4: Platform detection **Files:** - Create: `src/flow/core/platform.py` - Create: `tests/test_core_platform.py` - [ ] **Step 1: Write tests** ```python # tests/test_core_platform.py """Tests for flow.core.platform.""" import os import pytest from flow.core.platform import PlatformInfo, detect_context, detect_platform def test_platform_info_computes_platform_string(): p = PlatformInfo(os="linux", arch="x64") assert p.platform == "linux-x64" def test_detect_platform_returns_valid_info(): info = detect_platform() assert info.os in ("linux", "macos") assert info.arch in ("x64", "arm64") assert info.platform == f"{info.os}-{info.arch}" def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch): from flow.core.errors import FlowError monkeypatch.setattr("platform.system", lambda: "FreeBSD") with pytest.raises(FlowError, match="Unsupported operating system"): detect_platform() def test_detect_context_host(monkeypatch): monkeypatch.delenv("DF_NAMESPACE", raising=False) monkeypatch.delenv("DF_PLATFORM", raising=False) assert detect_context() == "host" def test_detect_context_vm(monkeypatch): monkeypatch.setenv("DF_NAMESPACE", "personal") monkeypatch.setenv("DF_PLATFORM", "orb") assert detect_context() == "vm" ``` - [ ] **Step 2: Implement platform.py** ```python # src/flow/core/platform.py """OS/arch detection and execution context.""" import os import platform as _platform from dataclasses import dataclass from flow.core.errors import FlowError @dataclass(frozen=True) class PlatformInfo: os: str = "linux" arch: str = "x64" @property def platform(self) -> str: return f"{self.os}-{self.arch}" _OS_MAP = {"Darwin": "macos", "Linux": "linux"} _ARCH_MAP = {"x86_64": "x64", "amd64": "x64", "aarch64": "arm64", "arm64": "arm64"} def detect_platform() -> PlatformInfo: raw_os = _platform.system() os_name = _OS_MAP.get(raw_os) if os_name is None: raise FlowError(f"Unsupported operating system: {raw_os}") raw_arch = _platform.machine().lower() arch = _ARCH_MAP.get(raw_arch) if arch is None: raise FlowError(f"Unsupported architecture: {raw_arch}") return PlatformInfo(os=os_name, arch=arch) def detect_context() -> str: """Detect execution context: 'host', 'vm', or 'container'.""" if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"): return "container" if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"): return "vm" return "host" ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_core_platform.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/platform.py tests/test_core_platform.py git commit -m "feat: add platform detection and context awareness" ``` --- ### Task 5: Console output **Files:** - Create: `src/flow/core/console.py` - Create: `tests/test_core_console.py` - [ ] **Step 1: Write tests** ```python # tests/test_core_console.py """Tests for flow.core.console.""" from flow.core.console import Console def test_info_prints_message(capsys): c = Console(color=False) c.info("hello") assert "hello" in capsys.readouterr().out def test_quiet_suppresses_info(capsys): c = Console(quiet=True, color=False) c.info("hidden") assert capsys.readouterr().out == "" def test_quiet_does_not_suppress_error(capsys): c = Console(quiet=True, color=False) c.error("visible") captured = capsys.readouterr() assert "visible" in captured.err or "visible" in captured.out def test_table_prints_headers_and_rows(capsys): c = Console(color=False) c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]]) output = capsys.readouterr().out assert "NAME" in output assert "foo" in output assert "bar" in output def test_no_color_strips_ansi(capsys): c = Console(color=False) c.info("test") output = capsys.readouterr().out assert "\033[" not in output ``` - [ ] **Step 2: Implement console.py** ```python # src/flow/core/console.py """Console output formatting with TTY detection and color control.""" import os import sys from typing import Any, Optional class Console: def __init__(self, *, quiet: bool = False, color: Optional[bool] = None): self.quiet = quiet if color is None: self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False else: self._color = color def _style(self, code: str, text: str) -> str: if not self._color: return text return f"{code}{text}\033[0m" def info(self, msg: str) -> None: if self.quiet: return tag = self._style("\033[36m", "[INFO]") print(f"{tag} {msg}") def warn(self, msg: str) -> None: tag = self._style("\033[33m", "[WARN]") print(f"{tag} {msg}") def error(self, msg: str) -> None: tag = self._style("\033[31m", "[ERROR]") print(f"{tag} {msg}", file=sys.stderr) def success(self, msg: str) -> None: tag = self._style("\033[32m", "[OK]") print(f"{tag} {msg}") def table(self, headers: list[str], rows: list[list[str]]) -> None: if not rows: return widths = [len(h) for h in headers] for row in rows: for i, cell in enumerate(row): if i < len(widths): widths[i] = max(widths[i], len(str(cell))) header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers)) if self._color: print(f"\033[1m{header_line}\033[0m") else: print(header_line) print(" ".join("-" * w for w in widths)) for row in rows: print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row))) def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None: if not operations: self.info(f"Nothing to {verb}.") return self.info(f"Plan ({len(operations)} operation(s)):") for op in operations: print(f" {op}") ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_core_console.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/console.py tests/test_core_console.py git commit -m "feat: add Console with color/quiet/TTY support" ``` --- ### Task 6: Runtime primitives (CommandRunner, FileSystem, GitClient, SystemRuntime) **Files:** - Create: `src/flow/core/runtime.py` - Create: `tests/test_core_runtime.py` - [ ] **Step 1: Write tests for FileSystem** ```python # tests/test_core_runtime.py """Tests for flow.core.runtime.""" from pathlib import Path from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime class TestFileSystem: def test_ensure_dir_creates_nested(self, tmp_path): fs = FileSystem() target = tmp_path / "a" / "b" / "c" fs.ensure_dir(target) assert target.is_dir() def test_write_and_read_text(self, tmp_path): fs = FileSystem() path = tmp_path / "test.txt" fs.write_text(path, "hello") assert fs.read_text(path) == "hello" def test_read_text_default(self, tmp_path): fs = FileSystem() path = tmp_path / "missing.txt" assert fs.read_text(path, default="fallback") == "fallback" def test_write_and_read_json(self, tmp_path): fs = FileSystem() path = tmp_path / "data.json" fs.write_json(path, {"key": "value"}) assert fs.read_json(path) == {"key": "value"} def test_create_symlink(self, tmp_path): fs = FileSystem() source = tmp_path / "source" source.write_text("content") target = tmp_path / "link" fs.create_symlink(source, target) assert target.is_symlink() assert target.resolve() == source.resolve() def test_same_symlink_true(self, tmp_path): fs = FileSystem() source = tmp_path / "source" source.write_text("content") target = tmp_path / "link" target.symlink_to(source) assert fs.same_symlink(target, source) is True def test_same_symlink_false(self, tmp_path): fs = FileSystem() source = tmp_path / "source" source.write_text("content") other = tmp_path / "other" other.write_text("other") target = tmp_path / "link" target.symlink_to(other) assert fs.same_symlink(target, source) is False def test_remove_file(self, tmp_path): fs = FileSystem() path = tmp_path / "file" path.write_text("x") fs.remove_file(path) assert not path.exists() def test_remove_file_missing_ok(self, tmp_path): fs = FileSystem() fs.remove_file(tmp_path / "missing", missing_ok=True) # no error def test_copy_file(self, tmp_path): fs = FileSystem() src = tmp_path / "src" src.write_text("data") dst = tmp_path / "sub" / "dst" fs.copy_file(src, dst) assert dst.read_text() == "data" class TestCommandRunner: def test_run_echo(self): runner = CommandRunner() result = runner.run(["echo", "hello"], capture_output=True) assert result.stdout.strip() == "hello" def test_require_binary_finds_echo(self): runner = CommandRunner() path = runner.require_binary("echo") assert path is not None class TestSystemRuntime: def test_creates_git_client(self): rt = SystemRuntime() assert isinstance(rt.git, GitClient) assert rt.git.runner is rt.runner ``` - [ ] **Step 2: Implement runtime.py** Implement `CommandRunner`, `FileSystem`, `GitClient`, and `SystemRuntime` per the spec in Section 3.3. Port the working implementations from the current `src/flow/core/system.py` but: - Remove the duplicate `write_bytes` method - Use `FlowError` instead of `RuntimeError` for error wrapping - Keep the same API surface ```python # src/flow/core/runtime.py """Runtime primitives for process, git, state, and filesystem access.""" from __future__ import annotations import json import os import shlex import shutil import subprocess from dataclasses import dataclass, field from pathlib import Path from typing import Any, Iterable, Mapping, Optional, Sequence from flow.core.console import Console from flow.core.errors import FlowError class CommandRunner: """Subprocess wrapper with consistent defaults.""" def run( self, argv: Sequence[str] | Iterable[str], *, cwd: Optional[Path] = None, env: Optional[Mapping[str, str]] = None, capture_output: bool = True, check: bool = False, timeout: Optional[float] = None, ) -> subprocess.CompletedProcess[str]: parts = [str(a) for a in argv] completed = subprocess.run( parts, cwd=str(cwd) if cwd else None, env=dict(env) if env else None, capture_output=capture_output, text=True, check=False, timeout=timeout, ) if check and completed.returncode != 0: msg = completed.stderr.strip() or completed.stdout.strip() if not msg: msg = f"Command failed with exit code {completed.returncode}" raise FlowError(msg) return completed def run_shell( self, command: str, *, cwd: Optional[Path] = None, env: Optional[Mapping[str, str]] = None, capture_output: bool = True, check: bool = False, timeout: Optional[float] = None, ) -> subprocess.CompletedProcess[str]: completed = subprocess.run( command, shell=True, cwd=str(cwd) if cwd else None, env=dict(env) if env else None, capture_output=capture_output, text=True, check=False, timeout=timeout, ) if check and completed.returncode != 0: msg = completed.stderr.strip() or completed.stdout.strip() if not msg: msg = f"Command failed with exit code {completed.returncode}" raise FlowError(msg) return completed def stream_shell( self, command: str, console: Console, *, check: bool = True, ) -> subprocess.CompletedProcess[str]: process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) lines: list[str] = [] assert process.stdout is not None try: for line in process.stdout: stripped = line.rstrip() if stripped: lines.append(stripped) finally: process.stdout.close() process.wait() if check and process.returncode != 0: raise FlowError(f"Command failed (exit {process.returncode}): {command}") return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="") def require_binary(self, name: str) -> str: path = shutil.which(name) if path is None: raise FlowError(f"Required executable not found: {name}") return path class FileSystem: """Filesystem wrapper for all mutating operations.""" def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None: if sudo: if runner is None: raise FlowError("Runner required for sudo operations") runner.require_binary("sudo") argv: list[str] = ["sudo", "mkdir", "-p"] if mode is not None: argv.extend(["-m", f"{mode:o}"]) argv.append(str(path)) runner.run(argv, check=True) return path.mkdir(parents=True, exist_ok=True) if mode is not None: path.chmod(mode) def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None: if sudo: if runner is None: raise FlowError("Runner required for sudo operations") argv = ["sudo", "rm"] if missing_ok: argv.append("-f") argv.append(str(path)) runner.run(argv, check=True) return try: path.unlink() except FileNotFoundError: if not missing_ok: raise def remove_tree(self, path: Path) -> None: shutil.rmtree(path, ignore_errors=True) def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: if sudo: if runner is None: raise FlowError("Runner required for sudo operations") self.ensure_dir(target.parent, sudo=True, runner=runner) runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True) return self.ensure_dir(target.parent) shutil.copy2(source, target) def copy_tree(self, source: Path, target: Path) -> None: self.ensure_dir(target.parent) shutil.copytree(source, target, dirs_exist_ok=True) def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: if sudo: if runner is None: raise FlowError("Runner required for sudo operations") self.ensure_dir(target.parent, sudo=True, runner=runner) runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True) return self.ensure_dir(target.parent) target.symlink_to(source) def same_symlink(self, target: Path, source: Path) -> bool: if not target.is_symlink(): return False return target.resolve(strict=False) == source.resolve(strict=False) def read_text(self, path: Path, *, default: Optional[str] = None) -> str: try: return path.read_text(encoding="utf-8") except FileNotFoundError: if default is None: raise return default def write_text(self, path: Path, content: str) -> None: self.ensure_dir(path.parent) path.write_text(content, encoding="utf-8") def write_bytes(self, path: Path, content: bytes) -> None: self.ensure_dir(path.parent) path.write_bytes(content) def read_json(self, path: Path, *, default: Any = None) -> Any: try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: return default def write_json(self, path: Path, data: Any) -> None: self.ensure_dir(path.parent) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) class GitClient: """Git adapter scoped to a repository root.""" def __init__(self, runner: CommandRunner): self.runner = runner def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]: return self.runner.run( ["git", "-C", str(repo_dir), *args], capture_output=capture_output, check=check, ) @dataclass class SystemRuntime: """Shared runtime dependencies.""" runner: CommandRunner = field(default_factory=CommandRunner) fs: FileSystem = field(default_factory=FileSystem) git: GitClient = field(init=False) def __post_init__(self) -> None: self.git = GitClient(self.runner) ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_core_runtime.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/runtime.py tests/test_core_runtime.py git commit -m "feat: add runtime primitives (CommandRunner, FileSystem, GitClient)" ``` --- ### Task 7: Config loading and FlowContext **Files:** - Create: `src/flow/core/config.py` - Create: `tests/test_core_config.py` - [ ] **Step 1: Write tests** ```python # tests/test_core_config.py """Tests for flow.core.config.""" from pathlib import Path import pytest from flow.core.config import AppConfig, load_config, load_manifest def test_load_config_missing_path(tmp_path): cfg = load_config(tmp_path / "nonexistent") assert isinstance(cfg, AppConfig) assert cfg.dotfiles_url == "" assert cfg.container_registry == "registry.tomastm.com" def test_load_config_from_yaml(tmp_path): (tmp_path / "config.yaml").write_text( "repository:\n" " url: git@github.com:user/dots.git\n" " branch: dev\n" "paths:\n" " projects: ~/code\n" "defaults:\n" " container-registry: my.registry.com\n" " tmux-session: main\n" ) cfg = load_config(tmp_path) assert cfg.dotfiles_url == "git@github.com:user/dots.git" assert cfg.dotfiles_branch == "dev" assert cfg.projects_dir == "~/code" assert cfg.container_registry == "my.registry.com" assert cfg.tmux_session == "main" def test_load_config_parses_targets_shorthand(tmp_path): (tmp_path / "config.yaml").write_text( "targets:\n" " personal@orb: personal.orb\n" ) cfg = load_config(tmp_path) assert len(cfg.targets) == 1 assert cfg.targets[0].namespace == "personal" assert cfg.targets[0].platform == "orb" assert cfg.targets[0].host == "personal.orb" def test_load_config_parses_targets_dict(tmp_path): (tmp_path / "config.yaml").write_text( "targets:\n" " work@ec2:\n" " host: work.ec2.internal\n" " identity: ~/.ssh/id_work\n" ) cfg = load_config(tmp_path) assert len(cfg.targets) == 1 assert cfg.targets[0].host == "work.ec2.internal" assert cfg.targets[0].identity == "~/.ssh/id_work" def test_load_manifest_returns_dict(tmp_path): (tmp_path / "manifest.yaml").write_text( "packages:\n" " - name: fd\n" " type: pkg\n" ) data = load_manifest(tmp_path) assert isinstance(data, dict) assert "packages" in data def test_load_manifest_merges_files(tmp_path): (tmp_path / "01-packages.yaml").write_text("packages:\n - name: fd\n type: pkg\n") (tmp_path / "02-profiles.yaml").write_text("profiles:\n work:\n os: linux\n") data = load_manifest(tmp_path) assert "packages" in data assert "profiles" in data ``` - [ ] **Step 2: Implement config.py** Port config loading from current `src/flow/core/config.py` with these changes: - Use `TargetConfig` dataclass (new) matching spec Section 7.1 - Use `FlowContext` dataclass matching spec Section 3.4 - Support both shorthand string and dict target formats - Remove all `_get_value` multi-key fallback logic (one canonical form per CLAUDE.md) The config parser normalizes target entries: - `"personal@orb: personal.orb"` -> `TargetConfig(namespace="personal", platform="orb", host="personal.orb")` - Dict form: `TargetConfig(namespace=..., platform=..., host=..., identity=...)` Key types: ```python @dataclass(frozen=True) class TargetConfig: namespace: str platform: str host: str identity: str | None = None @dataclass class AppConfig: dotfiles_url: str = "" dotfiles_branch: str = "main" projects_dir: str = "~/projects" container_registry: str = "registry.tomastm.com" container_tag: str = "latest" tmux_session: str = "default" targets: list[TargetConfig] = field(default_factory=list) @dataclass class FlowContext: config: AppConfig manifest: dict[str, Any] platform: PlatformInfo console: Console runtime: SystemRuntime = field(default_factory=SystemRuntime) ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_core_config.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/core/config.py tests/test_core_config.py git commit -m "feat: add config loading and FlowContext" ``` --- ## Chunk 2: Dotfiles Domain ### Task 8: Dotfiles models **Files:** - Create: `src/flow/domain/__init__.py` (empty) - Create: `src/flow/domain/dotfiles/__init__.py` (empty) - Create: `src/flow/domain/dotfiles/models.py` - Create: `tests/test_domain_dotfiles_models.py` - [ ] **Step 1: Write models** ```python # src/flow/domain/dotfiles/models.py """Dotfiles domain models -- all frozen dataclasses.""" from dataclasses import dataclass, field from pathlib import Path from typing import Optional @dataclass(frozen=True) class ModuleRef: """An external git repo providing content for a package subtree.""" source: str ref_type: str # "branch" | "tag" | "commit" ref_value: str mount_path: Path # Relative path within package to _module.yaml parent cache_dir: Path # Where the repo is cloned module_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_cache_root) @dataclass(frozen=True) class Package: """A dotfiles package: a named set of files mapping to home-relative targets.""" name: str # e.g. "zsh", "nvim" layer: str # "_shared" or profile name package_id: str # "layer/name" source_dir: Path # Absolute path in dotfiles repo module: Optional[ModuleRef] local_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_package_root) @dataclass(frozen=True) class LinkTarget: """A single file that should be linked into the filesystem.""" source: Path target: Path package: str # package_id from_module: bool needs_sudo: bool @dataclass(frozen=True) class LinkOp: """A single operation in a link plan.""" type: str # "create_link" | "remove_link" | "create_dir" target: Path source: Optional[Path] package: str needs_sudo: bool def __str__(self) -> str: if self.type == "create_link": sudo = " (sudo)" if self.needs_sudo else "" return f"LINK: {self.target} -> {self.source}{sudo}" if self.type == "remove_link": return f"REMOVE: {self.target}" if self.type == "create_dir": return f"MKDIR: {self.target}" return f"{self.type}: {self.target}" @dataclass(frozen=True) class PlanSummary: added: int removed: int unchanged: int from_modules: int @dataclass(frozen=True) class LinkPlan: """Complete reconciliation plan.""" operations: list[LinkOp] conflicts: list[str] summary: PlanSummary @dataclass class LinkedState: """Persisted link state.""" links: dict[Path, LinkTarget] = field(default_factory=dict) def as_dict(self) -> dict: grouped: dict[str, dict[str, dict]] = {} for target, lt in sorted(self.links.items(), key=lambda x: str(x[0])): pkg_links = grouped.setdefault(lt.package, {}) pkg_links[str(target)] = { "source": str(lt.source), "from_module": lt.from_module, "needs_sudo": lt.needs_sudo, } return {"version": 2, "links": grouped} @classmethod def from_dict(cls, data: dict) -> "LinkedState": version = data.get("version") if version is not None and version != 2: from flow.core.errors import ConfigError raise ConfigError( f"Unsupported linked.json version {version}. " "Delete ~/.local/state/flow/linked.json and relink." ) links: dict[Path, LinkTarget] = {} raw_links = data.get("links", {}) for package, pkg_links in raw_links.items(): for target_str, info in pkg_links.items(): links[Path(target_str)] = LinkTarget( source=Path(info["source"]), target=Path(target_str), package=str(package), from_module=bool(info.get("from_module", False)), needs_sudo=bool(info.get("needs_sudo", False)), ) return cls(links=links) @dataclass(frozen=True) class RepoInfo: """A managed git repo (dotfiles or module).""" name: str path: Path source: str is_module: bool ``` - [ ] **Step 2: Write tests** ```python # tests/test_domain_dotfiles_models.py """Tests for dotfiles domain models.""" from pathlib import Path from flow.domain.dotfiles.models import ( LinkOp, LinkPlan, LinkTarget, LinkedState, ModuleRef, Package, PlanSummary, ) def test_link_op_str_create(): op = LinkOp(type="create_link", target=Path("/home/x/.zshrc"), source=Path("/dots/zsh/.zshrc"), package="_shared/zsh", needs_sudo=False) assert "LINK:" in str(op) assert ".zshrc" in str(op) def test_link_op_str_sudo(): op = LinkOp(type="create_link", target=Path("/etc/hosts"), source=Path("/dots/dns/hosts"), package="_shared/dns", needs_sudo=True) assert "(sudo)" in str(op) def test_linked_state_roundtrip(): lt = LinkTarget(source=Path("/a"), target=Path("/b"), package="p", from_module=False, needs_sudo=False) state = LinkedState(links={Path("/b"): lt}) data = state.as_dict() restored = LinkedState.from_dict(data) assert Path("/b") in restored.links assert restored.links[Path("/b")].source == Path("/a") assert restored.links[Path("/b")].package == "p" def test_linked_state_empty(): state = LinkedState.from_dict({}) assert state.links == {} def test_package_has_id(): pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh", source_dir=Path("/dots/_shared/zsh"), module=None, local_files=()) assert pkg.package_id == "_shared/zsh" ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_dotfiles_models.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/domain/__init__.py src/flow/domain/dotfiles/__init__.py \ src/flow/domain/dotfiles/models.py tests/test_domain_dotfiles_models.py git commit -m "feat: add dotfiles domain models" ``` --- ### Task 9: Dotfiles modules (mount path, cache dir, source normalization) **Files:** - Create: `src/flow/domain/dotfiles/modules.py` - Create: `tests/test_domain_dotfiles_modules.py` - [ ] **Step 1: Write tests** ```python # tests/test_domain_dotfiles_modules.py """Tests for dotfiles module resolution -- the core bug fix.""" from pathlib import Path import pytest from flow.core.errors import ConfigError from flow.domain.dotfiles.modules import ( compute_mount_path, module_cache_dir, normalize_source, parse_module_ref, ) class TestComputeMountPath: def test_nested_module(self): """_shared/nvim/.config/nvim/_module.yaml -> .config/nvim""" result = compute_mount_path( module_yaml=Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), package_dir=Path("/dots/_shared/nvim"), ) assert result == Path(".config/nvim") def test_root_level_module(self): """_shared/nvim/_module.yaml -> Path('.')""" result = compute_mount_path( module_yaml=Path("/dots/_shared/nvim/_module.yaml"), package_dir=Path("/dots/_shared/nvim"), ) assert result == Path(".") def test_deeply_nested(self): result = compute_mount_path( module_yaml=Path("/dots/_shared/pkg/.config/a/b/c/_module.yaml"), package_dir=Path("/dots/_shared/pkg"), ) assert result == Path(".config/a/b/c") class TestModuleCacheDir: def test_simple_name(self): result = module_cache_dir("_shared/nvim", Path("/home/x/.local/share/flow/modules")) assert result == Path("/home/x/.local/share/flow/modules/_shared--nvim") def test_profile_name(self): result = module_cache_dir("linux-work/nvim", Path("/m")) assert result == Path("/m/linux-work--nvim") class TestNormalizeSource: def test_github_shorthand(self): assert normalize_source("github:org/repo") == "https://github.com/org/repo.git" def test_full_url_passthrough(self): assert normalize_source("https://example.com/repo.git") == "https://example.com/repo.git" def test_ssh_passthrough(self): assert normalize_source("git@github.com:org/repo.git") == "git@github.com:org/repo.git" class TestParseModuleRef: def test_branch_ref(self): raw = {"source": "github:org/nvim-config", "ref": {"branch": "main"}} ref = parse_module_ref( raw, package_id="_shared/nvim", mount_path=Path(".config/nvim"), modules_base=Path("/modules"), ) assert ref.source == "https://github.com/org/nvim-config.git" assert ref.ref_type == "branch" assert ref.ref_value == "main" assert ref.mount_path == Path(".config/nvim") assert ref.cache_dir == Path("/modules/_shared--nvim") def test_tag_ref(self): raw = {"source": "github:org/repo", "ref": {"tag": "v1.0"}} ref = parse_module_ref(raw, "p/x", Path("."), Path("/m")) assert ref.ref_type == "tag" assert ref.ref_value == "v1.0" def test_missing_source_raises(self): with pytest.raises(ConfigError): parse_module_ref({}, "p/x", Path("."), Path("/m")) def test_missing_ref_raises(self): raw = {"source": "github:org/repo"} with pytest.raises(ConfigError): parse_module_ref(raw, "p/x", Path("."), Path("/m")) def test_ref_not_dict_raises(self): raw = {"source": "github:org/repo", "ref": "main"} with pytest.raises(ConfigError): parse_module_ref(raw, "p/x", Path("."), Path("/m")) def test_ambiguous_ref_raises(self): raw = {"source": "github:org/repo", "ref": {"branch": "main", "tag": "v1"}} with pytest.raises(ConfigError): parse_module_ref(raw, "p/x", Path("."), Path("/m")) ``` - [ ] **Step 2: Implement modules.py** ```python # src/flow/domain/dotfiles/modules.py """Module metadata resolution -- pure functions.""" from pathlib import Path from typing import Tuple from flow.core.errors import ConfigError from flow.domain.dotfiles.models import ModuleRef def compute_mount_path(module_yaml: Path, package_dir: Path) -> Path: """Relative path from package root to _module.yaml parent.""" rel = module_yaml.parent.relative_to(package_dir) return rel def module_cache_dir(package_id: str, modules_base: Path) -> Path: """Cache dir for a module clone. '/' -> '--' to avoid collisions.""" return modules_base / package_id.replace("/", "--") def normalize_source(source: str) -> str: """Normalize git source URL. github:org/repo -> https://github.com/org/repo.git""" if source.startswith("github:"): repo = source.split(":", 1)[1] return f"https://github.com/{repo}.git" return source def parse_module_ref( raw: dict, package_id: str, mount_path: Path, modules_base: Path, ) -> ModuleRef: """Build ModuleRef from parsed _module.yaml content.""" source = raw.get("source") if not isinstance(source, str) or not source: raise ConfigError(f"Module for {package_id}: 'source' must be a non-empty string") ref = raw.get("ref") if not isinstance(ref, dict): raise ConfigError(f"Module for {package_id}: 'ref' must be a mapping") choices = [k for k in ("branch", "tag", "commit") if isinstance(ref.get(k), str) and ref[k]] if len(choices) != 1: raise ConfigError(f"Module for {package_id}: 'ref' must have exactly one of: branch, tag, commit") ref_type = choices[0] ref_value = str(ref[ref_type]) return ModuleRef( source=normalize_source(source), ref_type=ref_type, ref_value=ref_value, mount_path=mount_path, cache_dir=module_cache_dir(package_id, modules_base), module_files=(), # Populated by service after cloning ) ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_dotfiles_modules.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/domain/dotfiles/modules.py tests/test_domain_dotfiles_modules.py git commit -m "feat: add dotfiles module resolution (fixes _module.yaml path bug)" ``` --- ### Task 10: Dotfiles resolution (package -> LinkTargets) **Files:** - Create: `src/flow/domain/dotfiles/resolution.py` - Create: `tests/test_domain_dotfiles_resolution.py` - [ ] **Step 1: Write tests** ```python # tests/test_domain_dotfiles_resolution.py """Tests for dotfiles path resolution.""" from pathlib import Path import pytest from flow.core.errors import PlanConflict from flow.domain.dotfiles.models import LinkTarget, ModuleRef, Package from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets RESERVED_ROOT = "_root" HOME = Path("/home/testuser") def _pkg(name, layer="_shared", files=(), module=None): return Package( name=name, layer=layer, package_id=f"{layer}/{name}", source_dir=Path(f"/dots/{layer}/{name}"), module=module, local_files=tuple(files), ) class TestResolvePackageTargets: def test_simple_file(self): pkg = _pkg("zsh", files=[ (Path("/dots/_shared/zsh/.zshrc"), Path(".zshrc")), ]) targets = resolve_package_targets(pkg, HOME, set()) assert len(targets) == 1 assert targets[0].target == HOME / ".zshrc" assert targets[0].source == Path("/dots/_shared/zsh/.zshrc") assert targets[0].from_module is False def test_nested_config(self): pkg = _pkg("git", files=[ (Path("/dots/_shared/git/.config/git/config"), Path(".config/git/config")), ]) targets = resolve_package_targets(pkg, HOME, set()) assert targets[0].target == HOME / ".config" / "git" / "config" def test_root_marker(self): pkg = _pkg("dns", files=[ (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")), ]) targets = resolve_package_targets(pkg, HOME, set()) assert targets[0].target == Path("/etc/hosts") assert targets[0].needs_sudo is True def test_root_marker_skipped_when_in_skip_set(self): pkg = _pkg("dns", files=[ (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")), ]) targets = resolve_package_targets(pkg, HOME, {"_root"}) assert len(targets) == 0 def test_skip_package_by_name(self): pkg = _pkg("nvim", files=[ (Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")), ]) targets = resolve_package_targets(pkg, HOME, {"nvim"}) assert len(targets) == 0 def test_module_files_linked_under_mount_path(self): module = ModuleRef( source="https://github.com/org/nvim-config.git", ref_type="branch", ref_value="main", mount_path=Path(".config/nvim"), cache_dir=Path("/modules/_shared--nvim"), module_files=( (Path("/modules/_shared--nvim/init.lua"), Path("init.lua")), (Path("/modules/_shared--nvim/lua/plugins.lua"), Path("lua/plugins.lua")), ), ) pkg = _pkg("nvim", files=[ (Path("/dots/_shared/nvim/.local/bin/nvim-wrapper"), Path(".local/bin/nvim-wrapper")), ], module=module) targets = resolve_package_targets(pkg, HOME, set()) # Local file outside mount_path local_targets = [t for t in targets if not t.from_module] assert len(local_targets) == 1 assert local_targets[0].target == HOME / ".local" / "bin" / "nvim-wrapper" # Module files under mount_path module_targets = [t for t in targets if t.from_module] assert len(module_targets) == 2 module_target_paths = {t.target for t in module_targets} assert HOME / ".config" / "nvim" / "init.lua" in module_target_paths assert HOME / ".config" / "nvim" / "lua" / "plugins.lua" in module_target_paths def test_module_yaml_file_not_linked(self): """The _module.yaml marker itself should never be linked.""" pkg = _pkg("nvim", files=[ (Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), Path(".config/nvim/_module.yaml")), ], module=ModuleRef( source="x", ref_type="branch", ref_value="main", mount_path=Path(".config/nvim"), cache_dir=Path("/m"), module_files=(), )) targets = resolve_package_targets(pkg, HOME, set()) assert not any(t.target.name == "_module.yaml" for t in targets) def test_root_level_module_skips_all_local_files(self): """When mount_path is '.', all local files are from the module, not dotfiles repo.""" module = ModuleRef( source="x", ref_type="branch", ref_value="main", mount_path=Path("."), cache_dir=Path("/m"), module_files=( (Path("/m/init.lua"), Path("init.lua")), ), ) pkg = _pkg("nvim", files=[ (Path("/dots/_shared/nvim/_module.yaml"), Path("_module.yaml")), (Path("/dots/_shared/nvim/stale-file.txt"), Path("stale-file.txt")), ], module=module) targets = resolve_package_targets(pkg, HOME, set()) # Only module files should appear, local files skipped assert len(targets) == 1 assert targets[0].from_module is True assert targets[0].target == HOME / "init.lua" class TestResolveAllTargets: def test_no_conflicts(self): pkgs = [ _pkg("zsh", files=[(Path("/a/.zshrc"), Path(".zshrc"))]), _pkg("git", files=[(Path("/a/.gitconfig"), Path(".gitconfig"))]), ] targets = resolve_all_targets(pkgs, HOME, set()) assert len(targets) == 2 def test_duplicate_target_raises(self): pkgs = [ _pkg("zsh", layer="_shared", files=[(Path("/a/.zshrc"), Path(".zshrc"))]), _pkg("zsh", layer="work", files=[(Path("/b/.zshrc"), Path(".zshrc"))]), ] with pytest.raises(PlanConflict): resolve_all_targets(pkgs, HOME, set()) ``` - [ ] **Step 2: Implement resolution.py** ```python # src/flow/domain/dotfiles/resolution.py """Path resolution: package -> home-relative LinkTargets. Pure functions.""" from pathlib import Path from flow.core.errors import PlanConflict from flow.domain.dotfiles.models import LinkTarget, Package RESERVED_ROOT = "_root" MODULE_FILE = "_module.yaml" def resolve_package_targets( package: Package, home: Path, skip: set[str], ) -> list[LinkTarget]: """Resolve all LinkTargets for a package, handling modules correctly.""" if package.name in skip: return [] targets: list[LinkTarget] = [] mount_path = package.module.mount_path if package.module else None # Local files (from dotfiles repo) for abs_source, rel in package.local_files: # Skip _module.yaml if rel.name == MODULE_FILE: continue # If module exists, skip files inside mount_path (module provides those) if mount_path is not None: if mount_path == Path("."): continue # Root-level module: all local files are superseded by module try: rel.relative_to(mount_path) continue # Inside mount_path, skip except ValueError: pass # Outside mount_path, process normally target, needs_sudo = _resolve_target(rel, home, skip) if target is None: continue targets.append(LinkTarget( source=abs_source, target=target, package=package.package_id, from_module=False, needs_sudo=needs_sudo, )) # Module files if package.module: for abs_source, rel in package.module.module_files: mounted = package.module.mount_path / rel if package.module.mount_path != Path(".") else rel target, needs_sudo = _resolve_target(mounted, home, skip) if target is None: continue targets.append(LinkTarget( source=abs_source, target=target, package=package.package_id, from_module=True, needs_sudo=needs_sudo, )) return targets def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None, bool]: """Resolve a relative path to an absolute target. Returns (target, needs_sudo).""" parts = rel.parts if parts and parts[0] == RESERVED_ROOT: if RESERVED_ROOT in skip: return None, False if len(parts) < 2: return None, False return Path("/") / Path(*parts[1:]), True return home / rel, False def resolve_all_targets( packages: list[Package], home: Path, skip: set[str], ) -> list[LinkTarget]: """Resolve targets for all packages. Raises PlanConflict on duplicate targets.""" all_targets: list[LinkTarget] = [] seen: dict[Path, str] = {} for pkg in packages: targets = resolve_package_targets(pkg, home, skip) for t in targets: if t.target in seen: conflicts = [ f"{t.target} claimed by both {seen[t.target]} and {t.package}" ] raise PlanConflict( f"Conflicting dotfile targets across packages", conflicts, ) seen[t.target] = t.package all_targets.append(t) return all_targets ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_dotfiles_resolution.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/domain/dotfiles/resolution.py tests/test_domain_dotfiles_resolution.py git commit -m "feat: add dotfiles path resolution with correct module mounting" ``` --- ### Task 11: Dotfiles planning (link/unlink plans) **Note:** The spec's file layout lists `domain/dotfiles/conflicts.py` as a separate file. This is intentionally merged into `planning.py` -- cross-package collisions are caught by `resolve_all_targets` (Task 10), and filesystem conflicts are detected inside `plan_link` via the injected `filesystem_check` callback. No separate `conflicts.py` file is needed. **Files:** - Create: `src/flow/domain/dotfiles/planning.py` - Create: `tests/test_domain_dotfiles_planning.py` - [ ] **Step 1: Write tests** ```python # tests/test_domain_dotfiles_planning.py """Tests for dotfiles link planning.""" from pathlib import Path from typing import Optional from flow.domain.dotfiles.models import ( LinkOp, LinkTarget, LinkedState, ) from flow.domain.dotfiles.planning import plan_link, plan_unlink def _lt(target, source="/a", pkg="_shared/zsh", module=False, sudo=False): return LinkTarget( source=Path(source), target=Path(target), package=pkg, from_module=module, needs_sudo=sudo, ) def _fs_check_none(path: Path) -> Optional[str]: """Fake filesystem_check: nothing exists.""" return None def _fs_check_file(path: Path) -> Optional[str]: """Fake: everything is a file.""" return "file" class TestPlanLink: def test_new_target_creates_link(self): desired = [_lt("/home/x/.zshrc")] plan = plan_link(desired, LinkedState(), _fs_check_none) assert len(plan.operations) == 1 assert plan.operations[0].type == "create_link" assert plan.summary.added == 1 def test_existing_correct_link_unchanged(self): lt = _lt("/home/x/.zshrc") current = LinkedState(links={Path("/home/x/.zshrc"): lt}) plan = plan_link([lt], current, _fs_check_none) assert len(plan.operations) == 0 assert plan.summary.unchanged == 1 def test_stale_link_removed(self): old = _lt("/home/x/.old") current = LinkedState(links={Path("/home/x/.old"): old}) plan = plan_link([], current, _fs_check_none) assert len(plan.operations) == 1 assert plan.operations[0].type == "remove_link" assert plan.summary.removed == 1 def test_changed_source_produces_remove_then_create(self): old = _lt("/home/x/.zshrc", source="/old") new = _lt("/home/x/.zshrc", source="/new") current = LinkedState(links={Path("/home/x/.zshrc"): old}) plan = plan_link([new], current, _fs_check_none) types = [op.type for op in plan.operations] assert types == ["remove_link", "create_link"] def test_unmanaged_file_at_target_is_conflict(self): desired = [_lt("/home/x/.zshrc")] plan = plan_link(desired, LinkedState(), _fs_check_file) assert len(plan.conflicts) == 1 assert ".zshrc" in plan.conflicts[0] def test_module_targets_counted(self): desired = [_lt("/home/x/.config/nvim/init.lua", module=True)] plan = plan_link(desired, LinkedState(), _fs_check_none) assert plan.summary.from_modules == 1 class TestPlanUnlink: def test_unlink_all(self): lt = _lt("/home/x/.zshrc") current = LinkedState(links={Path("/home/x/.zshrc"): lt}) plan = plan_unlink(current, packages=None) assert len(plan.operations) == 1 assert plan.operations[0].type == "remove_link" def test_unlink_specific_package(self): zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh") git = _lt("/home/x/.gitconfig", pkg="_shared/git") current = LinkedState(links={ Path("/home/x/.zshrc"): zsh, Path("/home/x/.gitconfig"): git, }) plan = plan_unlink(current, packages=["_shared/zsh"]) assert len(plan.operations) == 1 assert plan.operations[0].target == Path("/home/x/.zshrc") def test_unlink_by_basename(self): zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh") current = LinkedState(links={Path("/home/x/.zshrc"): zsh}) plan = plan_unlink(current, packages=["zsh"]) assert len(plan.operations) == 1 ``` - [ ] **Step 2: Implement planning.py** ```python # src/flow/domain/dotfiles/planning.py """Link/unlink plan computation. Pure functions with injected I/O.""" from pathlib import Path from typing import Callable, Optional from flow.domain.dotfiles.models import ( LinkOp, LinkPlan, LinkTarget, LinkedState, PlanSummary, ) def plan_link( desired: list[LinkTarget], current: LinkedState, filesystem_check: Callable[[Path], Optional[str]], ) -> LinkPlan: """Build reconciliation plan. filesystem_check: injected by service. Returns "file", "dir", "symlink", or None. """ ops: list[LinkOp] = [] conflicts: list[str] = [] added = 0 removed = 0 unchanged = 0 from_modules = 0 desired_map = {t.target: t for t in desired} desired_targets = set(desired_map.keys()) current_targets = set(current.links.keys()) # Removals: in current but not desired for target in sorted(current_targets - desired_targets): ops.append(LinkOp( type="remove_link", target=target, source=None, package=current.links[target].package, needs_sudo=current.links[target].needs_sudo, )) removed += 1 # Additions, updates, and unchanged for target in sorted(desired_targets): spec = desired_map[target] if target in current.links: cur = current.links[target] if cur.source == spec.source: unchanged += 1 if spec.from_module: from_modules += 1 continue # Source changed: remove old link, then create new one ops.append(LinkOp( type="remove_link", target=target, source=cur.source, package=cur.package, needs_sudo=cur.needs_sudo, )) ops.append(LinkOp( type="create_link", target=target, source=spec.source, package=spec.package, needs_sudo=spec.needs_sudo, )) added += 1 if spec.from_module: from_modules += 1 continue # New target: check filesystem for conflicts fs_state = filesystem_check(target) if fs_state is not None: conflicts.append( f"{target} already exists ({fs_state}) and is not managed by flow" ) continue ops.append(LinkOp( type="create_link", target=target, source=spec.source, package=spec.package, needs_sudo=spec.needs_sudo, )) added += 1 if spec.from_module: from_modules += 1 return LinkPlan( operations=ops, conflicts=conflicts, summary=PlanSummary( added=added, removed=removed, unchanged=unchanged, from_modules=from_modules, ), ) def plan_unlink( current: LinkedState, packages: Optional[list[str]], ) -> LinkPlan: """Plan removal of managed links.""" ops: list[LinkOp] = [] for target in sorted(current.links.keys()): lt = current.links[target] if packages is not None: # Match by full package_id or by basename basename = lt.package.split("/", 1)[-1] if "/" in lt.package else lt.package if lt.package not in packages and basename not in packages: continue ops.append(LinkOp( type="remove_link", target=target, source=lt.source, package=lt.package, needs_sudo=lt.needs_sudo, )) return LinkPlan( operations=ops, conflicts=[], summary=PlanSummary(added=0, removed=len(ops), unchanged=0, from_modules=0), ) ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_dotfiles_planning.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/domain/dotfiles/planning.py tests/test_domain_dotfiles_planning.py git commit -m "feat: add dotfiles link planning with conflict detection" ``` --- ## Chunk 3: Packages Domain ### Task 12: Package models **Files:** - Create: `src/flow/domain/packages/__init__.py` (empty) - Create: `src/flow/domain/packages/models.py` - Create: `tests/test_domain_packages_models.py` - [ ] **Step 1: Write models** All frozen dataclasses as specified in spec Section 5.1: `PackageDef`, `ProfilePackageRef`, `PkgInstallOp`, `PkgRemoveOp`, `PackagePlan`, `InstalledPackage`, `InstalledState`. `InstalledState` serialization format: ```python # as_dict() produces: { "version": 1, "packages": { "neovim": { "version": "0.10.4", "type": "binary", "files": ["/home/x/.local/bin/nvim", "/home/x/.local/share/nvim"] } } } # from_dict() parses the above. Raises ConfigError on version mismatch. ``` - [ ] **Step 2: Write tests** ```python # tests/test_domain_packages_models.py """Tests for packages domain models.""" from pathlib import Path import pytest from flow.core.errors import ConfigError from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef def test_installed_state_roundtrip(): state = InstalledState(packages={ "neovim": InstalledPackage( name="neovim", version="0.10.4", type="binary", files=[Path("/home/x/.local/bin/nvim")], ), }) data = state.as_dict() restored = InstalledState.from_dict(data) assert "neovim" in restored.packages assert restored.packages["neovim"].version == "0.10.4" assert restored.packages["neovim"].files == [Path("/home/x/.local/bin/nvim")] def test_installed_state_empty(): state = InstalledState.from_dict({}) assert state.packages == {} def test_installed_state_version_mismatch(): with pytest.raises(ConfigError): InstalledState.from_dict({"version": 99, "packages": {}}) def test_package_def_fields(): pkg = PackageDef( name="fd", type="pkg", sources={"apt": "fd-find"}, source=None, version=None, asset_pattern=None, platform_map={}, extract_dir=None, install={}, post_install=None, allow_sudo=False, ) assert pkg.name == "fd" assert pkg.type == "pkg" ``` - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_packages_models.py -v` Expected: All passed - [ ] **Step 4: Commit** ```bash git add src/flow/domain/packages/ tests/test_domain_packages_models.py git commit -m "feat: add packages domain models" ``` --- ### Task 13: Package catalog and resolution **Files:** - Create: `src/flow/domain/packages/catalog.py` - Create: `src/flow/domain/packages/resolution.py` - Create: `tests/test_domain_packages.py` - [ ] **Step 1: Write tests covering:** - `parse_catalog` with list and dict manifest formats - `normalize_profile_entry` with string shorthand ("binary/neovim"), plain name, dict - `resolve_spec` merging catalog and profile overrides - `resolve_source_name` with apt/brew/dnf fallbacks - `resolve_binary_asset` with platform map and asset pattern - `resolve_download_url` with github shorthand and direct URL - `pm_update_command` and `pm_install_command` for apt/dnf/brew - `detect_package_manager` returning apt/dnf/None - [ ] **Step 2: Implement catalog.py and resolution.py** Implement the functions listed in spec Section 5.2 matching those exact signatures. Use `src/flow/services/package_defs.py` only as reference for resolution logic, not as source of truth -- the spec signatures are canonical. This is the single copy of all package resolution logic (no duplication with bootstrap). - [ ] **Step 3: Run tests** Run: `python -m pytest tests/test_domain_packages.py -v` - [ ] **Step 4: Commit** ```bash git add src/flow/domain/packages/ tests/test_domain_packages.py git commit -m "feat: add packages catalog and resolution (single source of truth)" ``` --- ### Task 14: Package planning **Files:** - Create: `src/flow/domain/packages/planning.py` - Create: `tests/test_domain_packages_planning.py` - [ ] **Step 1: Write tests for `plan_install` and `plan_remove`** - [ ] **Step 2: Implement planning.py** - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add package install/remove planning" ``` --- ## Chunk 4: Bootstrap Domain ### Task 15: Bootstrap models and setup modules **Files:** - Create: `src/flow/domain/bootstrap/__init__.py` - Create: `src/flow/domain/bootstrap/models.py` - Create: `src/flow/domain/bootstrap/modules.py` - Create: `tests/test_domain_bootstrap_modules.py` - [ ] **Step 1: Write models**: `Profile`, `SetupModuleDef`, `BootstrapAction`, `BootstrapPlan` - [ ] **Step 2: Write and test setup modules**: `HostnameModule`, `LocaleModule`, `ShellModule`, `SSHKeygenModule`, `RuncmdModule`. Each returns shell commands from `.plan()` and a human-readable string from `.describe()`. `describe()` examples (used in dry-run output): - `HostnameModule.describe()` -> `"Set hostname to my-host"` - `LocaleModule.describe()` -> `"Set locale to en_US.UTF-8"` - `ShellModule.describe()` -> `"Install and configure shell: zsh"` - `SSHKeygenModule.describe()` -> `"Generate 1 SSH key(s)"` - `RuncmdModule.describe()` -> `"Run 3 custom command(s)"` Add a test for each module asserting `isinstance(module.describe(), str)` and that `.plan()` returns a non-empty list of strings. - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add bootstrap models and setup modules" ``` --- ### Task 16: Bootstrap planning **Files:** - Create: `src/flow/domain/bootstrap/planning.py` - Create: `tests/test_domain_bootstrap_planning.py` - [ ] **Step 1: Write tests** for `parse_profile` and `plan_bootstrap` -- verify it produces correct ordered actions (validate env, setup modules, packages, shell, dotfiles link). - [ ] **Step 2: Implement** -- `plan_bootstrap` raises `ConfigError` if required env vars are missing, then builds ordered action list using packages domain for install planning. - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add bootstrap planning (orchestrator over packages + dotfiles)" ``` --- ## Chunk 5: Remote + Containers + Projects Domains ### Task 17: Remote domain **Files:** - Create: `src/flow/domain/remote/__init__.py` - Create: `src/flow/domain/remote/models.py` -- contains `Target` and `SSHCommand` only. **`TargetConfig` lives in `core/config.py`** (already created in Task 7). Import it from there. - Create: `src/flow/domain/remote/resolution.py` - Create: `tests/test_domain_remote.py` - [ ] **Step 1: Write tests** for `parse_target`, `resolve_target`, `build_ssh_command`, `terminfo_fix_command` - [ ] **Step 2: Implement models and resolution.** Port from current `src/flow/services/ssh.py`. `resolve_target` receives `list[TargetConfig]` imported from `flow.core.config`. - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add remote domain (SSH target resolution)" ``` --- ### Task 18: Containers domain **Files:** - Create: `src/flow/domain/containers/__init__.py` - Create: `src/flow/domain/containers/models.py` - Create: `src/flow/domain/containers/resolution.py` - Create: `tests/test_domain_containers.py` - [ ] **Step 1: Write tests** for `parse_image_ref`, `container_name`, `resolve_mounts`, `build_container_spec` - [ ] **Step 2: Implement.** Port from current `src/flow/services/containers.py`. - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add containers domain (image resolution, mount computation)" ``` --- ## Chunk 6: Services Layer (Part 1) ### Task 19: DotfilesService **Files:** - Create: `src/flow/services/__init__.py` - Create: `src/flow/services/dotfiles.py` - Create: `tests/test_service_dotfiles.py` - [ ] **Step 1: Write integration tests** using `tmp_path` for real filesystem + `FakeRunner` for git: - `test_link_creates_symlinks` -- set up dotfiles dir, call `.link()`, verify symlinks - `test_link_with_module` -- set up package with `_module.yaml` and cloned module dir, verify correct target paths - `test_unlink_removes_symlinks` -- link then unlink, verify cleaned up - `test_link_dry_run_no_changes` -- dry run does not create symlinks - `test_status_shows_packages` -- set up linked state, verify output - [ ] **Step 2: Implement DotfilesService** The service performs all I/O: 1. `_discover_packages`: walks dotfiles dir, reads `_module.yaml` files, builds `Package` objects with `local_files` populated 2. Calls pure domain functions for resolution and planning 3. Executes the plan (create/remove symlinks via `FileSystem`) 4. Persists `LinkedState` to JSON - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add DotfilesService with link/unlink/status/edit" ``` --- ### Task 20: PackageService **Files:** - Create: `src/flow/services/packages.py` - Create: `tests/test_service_packages.py` - [ ] **Step 1: Write tests** for install, list, remove flows - [ ] **Step 2: Implement PackageService** -- uses domain planning, executes via runtime - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add PackageService" ``` --- ### Task 21: BootstrapService **Files:** - Create: `src/flow/services/bootstrap.py` - Create: `tests/test_service_bootstrap.py` - [ ] **Step 1: Write tests** -- bootstrap run with dry_run, show, list - [ ] **Step 2: Implement** -- uses domain planning, executes actions sequentially, delegates to PackageService and DotfilesService - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add BootstrapService" ``` --- ## Chunk 7: Services Layer (Part 2) ### Task 22: RemoteService **Files:** - Create: `src/flow/services/remote.py` - Create: `tests/test_service_remote.py` - [ ] **Step 1: Write tests** -- enter dry_run, list targets, terminfo warning - [ ] **Step 2: Implement** - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add RemoteService" ``` --- ### Task 23: ContainerService **Files:** - Create: `src/flow/services/containers.py` - Create: `tests/test_service_containers.py` - [ ] **Step 1: Write tests** -- create, list, stop, remove with FakeRunner - [ ] **Step 2: Implement** - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add ContainerService" ``` --- ### Task 24: ProjectService **Note:** `ProjectService` has no domain layer -- it's a thin service that runs git commands directly. All logic is I/O-bound (listing dirs, running git). No separate domain module needed. **Files:** - Create: `src/flow/services/projects.py` - Create: `tests/test_service_projects.py` - [ ] **Step 1: Write tests** ```python # tests/test_service_projects.py """Tests for ProjectService.""" def test_check_clean_repo(tmp_path): """Create a git repo at tmp_path/projects/myrepo, commit a file. Call service.check(fetch=False). Verify output contains 'clean'.""" def test_check_uncommitted_changes(tmp_path): """Create repo, modify a tracked file without committing. Verify 'uncommitted changes' in output.""" def test_check_no_git_repos(tmp_path): """Empty projects dir. Verify info message.""" def test_summary_shows_all_dirs(tmp_path): """Mix of git and non-git dirs. Verify table output.""" ``` - [ ] **Step 2: Implement** -- `check()` iterates dirs in `projects_dir`, runs git status/diff/rev-list per repo. `fetch()` runs `git fetch --all`. `summary()` is `check(fetch=False)`. Use `self.ctx.runtime.git` for all git calls, `self.ctx.console.table` for output. - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add ProjectService" ``` --- ## Chunk 8: Commands + CLI ### Task 25: CLI entry point + context validation **Files:** - Create: `src/flow/cli.py` (rewrite) - Create: `tests/test_cli.py` - [ ] **Step 1: Write tests** -- non-root check, context validation (remote blocked in VM), version flag, dry_run passed through - [ ] **Step 2: Implement cli.py** per spec Section 10 - [ ] **Step 3: Run tests, commit** ```bash git commit -m "feat: add CLI entry point with context awareness" ``` --- ### Task 26: Command modules **Files:** - Create: `src/flow/commands/remote.py` - Create: `src/flow/commands/dev.py` - Create: `src/flow/commands/dotfiles.py` - Create: `src/flow/commands/setup.py` - Create: `src/flow/commands/packages.py` - Create: `src/flow/commands/projects.py` - Create: `src/flow/commands/__init__.py` Each command module: register argparse subcommands, handler functions that construct the service and call one method. - [ ] **Step 1: Implement all command modules** (each is 30-60 lines) - [ ] **Step 2: Write integration test** that runs `flow --help`, `flow dotfiles --help`, etc. - [ ] **Step 3: Commit** ```bash git commit -m "feat: add command modules (thin CLI adapters)" ``` --- ### Task 27: Zsh completion **Files:** - Create: `src/flow/commands/completion.py` - Create: `tests/test_completion.py` - [ ] **Step 1: Implement** -- port and adapt from current completion.py with updated command names - [ ] **Step 2: Write tests** for `complete()` function with the new command surface - [ ] **Step 3: Commit** ```bash git commit -m "feat: add zsh completion with updated command surface" ``` --- ## Chunk 9: Cleanup + Integration ### Task 28: Delete old code - [ ] **Step 1: Remove old files** ```bash # Remove old service/command/core files that have been replaced rm -f src/flow/core/action.py rm -f src/flow/core/stow.py rm -f src/flow/core/process.py # Old core files replaced by new ones: # system.py -> runtime.py (already handled) # config.py, console.py, platform.py, paths.py, errors.py -> rewritten in place # variables.py -> template.py # Remove old command modules rm -rf src/flow/commands/ # Already replaced by new commands/ # Remove old services rm -rf src/flow/services/ # Already replaced by new services/ # Remove old tests rm -f tests/test_action.py tests/test_stow.py tests/test_commands.py rm -f tests/test_dotfiles_folding.py tests/test_self_hosting.py ``` - [ ] **Step 2: Run full test suite** Run: `python -m pytest tests/ -v` Expected: All new tests pass, no imports of deleted modules - [ ] **Step 3: Update pyproject.toml entry point if needed** Verify `[project.scripts] flow = "flow.cli:main"` still works. - [ ] **Step 4: Test binary build** ```bash make build ./dist/flow --help ./dist/flow --version ``` - [ ] **Step 5: Commit** ```bash git add -A git commit -m "chore: remove old code, complete rewrite" ``` --- ### Task 29: Update README - [ ] **Step 1: Rewrite README.md** to match the new command surface, config format, and architecture. - [ ] **Step 2: Commit** ```bash git add README.md git commit -m "docs: update README for new architecture" ```