From df8a19d6cc1e5d822e96376c1b381f15015e1899 Mon Sep 17 00:00:00 2001 From: Tomas Mirchev Date: Mon, 16 Mar 2026 04:39:11 +0200 Subject: [PATCH] Fix all plan review issues, save implementation plan Plan fixes: - detect_platform raises FlowError not RuntimeError - TargetConfig lives in core/config.py only (remote domain imports it) - plan_link handles source changes (remove_link + create_link) - resolve_package_targets skips local files when mount_path is root - LinkedState.from_dict guards on version mismatch - Added missing test for parse_module_ref with absent ref - Task 12 now has full tests and serialization format - Task 13 uses spec signatures as truth, old code as reference - Task 15 includes describe() examples and tests - Task 24 has detailed test cases for ProjectService - Note that conflicts.py is intentionally merged into planning.py - Spec Section 12 example fixed to include filesystem_check arg Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plans/2026-03-16-flow-rewrite.md | 2533 +++++++++++++++++ .../2026-03-16-flow-architecture-redesign.md | 2 +- 2 files changed, 2534 insertions(+), 1 deletion(-) create mode 100644 docs/superpowers/plans/2026-03-16-flow-rewrite.md diff --git a/docs/superpowers/plans/2026-03-16-flow-rewrite.md b/docs/superpowers/plans/2026-03-16-flow-rewrite.md new file mode 100644 index 0000000..9a4bf20 --- /dev/null +++ b/docs/superpowers/plans/2026-03-16-flow-rewrite.md @@ -0,0 +1,2533 @@ +# Flow CLI Rewrite Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Full rewrite of the flow CLI with correct domain abstractions, plan-then-execute pattern, and working module path resolution. + +**Architecture:** Four-layer hybrid (core -> domain -> services -> commands). Domain layer is pure functions + frozen dataclasses. Services orchestrate I/O. Commands are trivial dispatchers. See `docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md` for the full spec. + +**Tech Stack:** Python 3.9+, PyYAML, argparse, pytest, PyInstaller + +**Spec:** `docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md` +**Standards:** `CLAUDE.md` + +--- + +## Phasing Strategy + +Build bottom-up. Each phase produces tested, working code before the next begins. Old code under `src/flow/` is untouched until the final phase when it's deleted. + +The new code is written in-place (same `src/flow/` tree) because this is a full rewrite with no backward compatibility. Old modules are deleted as their replacements land. + +| Phase | What | Depends on | +|-------|------|------------| +| 1 | Core layer (errors, template, paths, platform, console, runtime, config) | Nothing | +| 2 | Dotfiles domain (models, modules, resolution, planning) | Core | +| 3 | Packages domain (models, catalog, resolution, planning) | Core | +| 4 | Bootstrap domain (models, modules, planning) | Core, packages domain | +| 5 | Remote + containers + projects domains | Core | +| 6 | All services | Core, all domains | +| 7 | Commands + CLI + completion | Core, all services | +| 8 | Delete old code, final integration | Everything | + +--- + +## Chunk 1: Core Layer + +### Task 1: Errors + +**Files:** +- Create: `src/flow/core/errors.py` +- Create: `tests/test_errors.py` + +- [ ] **Step 1: Write the error hierarchy** + +```python +# src/flow/core/errors.py +"""Project-wide error types.""" + + +class FlowError(Exception): + """Base for all user-facing errors.""" + + +class ConfigError(FlowError): + """Invalid config or manifest YAML.""" + + +class PlanConflict(FlowError): + """Conflicts detected during planning.""" + + def __init__(self, message: str, conflicts: list[str]): + super().__init__(message) + self.conflicts = conflicts + + +class ExecutionError(FlowError): + """A plan step failed during execution.""" +``` + +- [ ] **Step 2: Write tests** + +```python +# tests/test_errors.py +"""Tests for flow.core.errors.""" + +from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict + + +def test_flow_error_is_exception(): + assert issubclass(FlowError, Exception) + + +def test_config_error_is_flow_error(): + assert issubclass(ConfigError, FlowError) + + +def test_plan_conflict_carries_conflicts(): + err = PlanConflict("2 conflicts", ["a exists", "b exists"]) + assert str(err) == "2 conflicts" + assert err.conflicts == ["a exists", "b exists"] + + +def test_execution_error_is_flow_error(): + assert issubclass(ExecutionError, FlowError) +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_errors.py -v` +Expected: 4 passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/errors.py tests/test_errors.py +git commit -m "feat: add FlowError hierarchy" +``` + +--- + +### Task 2: Template (pure string substitution) + +**Files:** +- Create: `src/flow/core/template.py` +- Create: `tests/test_template.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_template.py +"""Tests for flow.core.template.""" + +import os + +from flow.core.template import substitute, substitute_template + + +class TestSubstitute: + def test_replaces_dollar_var(self): + assert substitute("hello $NAME", {"NAME": "world"}) == "hello world" + + def test_replaces_braced_var(self): + assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world" + + def test_falls_back_to_env(self, monkeypatch): + monkeypatch.setenv("FOO", "bar") + assert substitute("$FOO", {}) == "bar" + + def test_preserves_unknown_vars(self): + assert substitute("$UNKNOWN", {}) == "$UNKNOWN" + + def test_non_string_passthrough(self): + assert substitute(42, {}) == 42 + + +class TestSubstituteTemplate: + def test_replaces_double_braces(self): + assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux" + + def test_env_dot_notation(self, monkeypatch): + monkeypatch.setenv("USER", "tomas") + result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)}) + assert result == "tomas" + + def test_nested_dict_lookup(self): + ctx = {"platform": {"arch": "arm64"}} + assert substitute_template("{{ platform.arch }}", ctx) == "arm64" + + def test_preserves_unknown_templates(self): + assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}" + + def test_non_string_passthrough(self): + assert substitute_template(42, {}) == 42 + + def test_whitespace_in_braces(self): + assert substitute_template("{{ os }}", {"os": "linux"}) == "linux" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `python -m pytest tests/test_template.py -v` +Expected: FAIL (module not found) + +- [ ] **Step 3: Implement template.py** + +```python +# src/flow/core/template.py +"""Variable and template substitution -- pure functions, no I/O.""" + +import os +import re +from typing import Any, Dict + + +def substitute(text: Any, variables: Dict[str, str]) -> Any: + """Replace $VAR and ${VAR} with values from variables dict or env.""" + if not isinstance(text, str): + return text + + pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}") + + def _replace(match: re.Match[str]) -> str: + key = match.group(1) or match.group(2) or "" + if key in variables: + return str(variables[key]) + if key in os.environ: + return os.environ[key] + return match.group(0) + + return pattern.sub(_replace, text) + + +def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any: + if expr.startswith("env."): + env_key = expr.split(".", 1)[1] + env_ctx = context.get("env", {}) + if isinstance(env_ctx, dict) and env_key in env_ctx: + return env_ctx[env_key] + return os.environ.get(env_key) + + if expr in context: + return context[expr] + + current: Any = context + for part in expr.split("."): + if not isinstance(current, dict) or part not in current: + return None + current = current[part] + + return current + + +def substitute_template(text: Any, context: Dict[str, Any]) -> Any: + """Replace {{expr}} placeholders with values from context dict.""" + if not isinstance(text, str): + return text + + def _replace(match: re.Match[str]) -> str: + key = match.group(1).strip() + value = _resolve_template_value(key, context) + if value is None: + return match.group(0) + return str(value) + + return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text) +``` + +- [ ] **Step 4: Run tests** + +Run: `python -m pytest tests/test_template.py -v` +Expected: All passed + +- [ ] **Step 5: Commit** + +```bash +git add src/flow/core/template.py tests/test_template.py +git commit -m "feat: add template substitution (pure functions)" +``` + +--- + +### Task 3: Paths (XDG constants) + +**Files:** +- Create: `src/flow/core/paths.py` +- Create: `tests/test_core_paths.py` + +- [ ] **Step 1: Write paths.py** + +```python +# src/flow/core/paths.py +"""XDG-compliant path constants for flow.""" + +import os +from pathlib import Path + + +def _xdg(env_var: str, fallback: str) -> Path: + return Path(os.environ.get(env_var, fallback)) + + +HOME = Path.home() + +CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow" +DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow" +STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow" + +DOTFILES_DIR = DATA_DIR / "dotfiles" +MODULES_DIR = DATA_DIR / "modules" +PACKAGES_DIR = DATA_DIR / "packages" + +LINKED_STATE = STATE_DIR / "linked.json" +INSTALLED_STATE = STATE_DIR / "installed.json" + +# Self-hosted flow config path (from dotfiles repo) +DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow" + + +def ensure_dirs() -> None: + """Create all required directories.""" + for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR): + d.mkdir(parents=True, exist_ok=True) +``` + +- [ ] **Step 2: Write tests** + +```python +# tests/test_core_paths.py +"""Tests for flow.core.paths.""" + +from pathlib import Path + +from flow.core import paths + + +def test_config_dir_ends_with_flow(): + assert paths.CONFIG_DIR.name == "flow" + + +def test_data_dir_ends_with_flow(): + assert paths.DATA_DIR.name == "flow" + + +def test_modules_dir_under_data(): + assert paths.MODULES_DIR.parent == paths.DATA_DIR + + +def test_linked_state_under_state(): + assert paths.LINKED_STATE.parent == paths.STATE_DIR + + +def test_dotfiles_flow_config_path(): + expected_suffix = Path("_shared") / "flow" / ".config" / "flow" + assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix)) + + +def test_ensure_dirs_creates_directories(tmp_path, monkeypatch): + monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow") + monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow") + monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow") + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules") + monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages") + + paths.ensure_dirs() + + assert (tmp_path / "config" / "flow").is_dir() + assert (tmp_path / "data" / "flow" / "modules").is_dir() + assert (tmp_path / "state" / "flow").is_dir() +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_core_paths.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/paths.py tests/test_core_paths.py +git commit -m "feat: add XDG path constants" +``` + +--- + +### Task 4: Platform detection + +**Files:** +- Create: `src/flow/core/platform.py` +- Create: `tests/test_core_platform.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_core_platform.py +"""Tests for flow.core.platform.""" + +import os + +import pytest + +from flow.core.platform import PlatformInfo, detect_context, detect_platform + + +def test_platform_info_computes_platform_string(): + p = PlatformInfo(os="linux", arch="x64") + assert p.platform == "linux-x64" + + +def test_detect_platform_returns_valid_info(): + info = detect_platform() + assert info.os in ("linux", "macos") + assert info.arch in ("x64", "arm64") + assert info.platform == f"{info.os}-{info.arch}" + + +def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch): + from flow.core.errors import FlowError + monkeypatch.setattr("platform.system", lambda: "FreeBSD") + with pytest.raises(FlowError, match="Unsupported operating system"): + detect_platform() + + +def test_detect_context_host(monkeypatch): + monkeypatch.delenv("DF_NAMESPACE", raising=False) + monkeypatch.delenv("DF_PLATFORM", raising=False) + assert detect_context() == "host" + + +def test_detect_context_vm(monkeypatch): + monkeypatch.setenv("DF_NAMESPACE", "personal") + monkeypatch.setenv("DF_PLATFORM", "orb") + assert detect_context() == "vm" +``` + +- [ ] **Step 2: Implement platform.py** + +```python +# src/flow/core/platform.py +"""OS/arch detection and execution context.""" + +import os +import platform as _platform +from dataclasses import dataclass + +from flow.core.errors import FlowError + + +@dataclass(frozen=True) +class PlatformInfo: + os: str = "linux" + arch: str = "x64" + + @property + def platform(self) -> str: + return f"{self.os}-{self.arch}" + + +_OS_MAP = {"Darwin": "macos", "Linux": "linux"} +_ARCH_MAP = {"x86_64": "x64", "amd64": "x64", "aarch64": "arm64", "arm64": "arm64"} + + +def detect_platform() -> PlatformInfo: + raw_os = _platform.system() + os_name = _OS_MAP.get(raw_os) + if os_name is None: + raise FlowError(f"Unsupported operating system: {raw_os}") + + raw_arch = _platform.machine().lower() + arch = _ARCH_MAP.get(raw_arch) + if arch is None: + raise FlowError(f"Unsupported architecture: {raw_arch}") + + return PlatformInfo(os=os_name, arch=arch) + + +def detect_context() -> str: + """Detect execution context: 'host', 'vm', or 'container'.""" + if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"): + return "container" + if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"): + return "vm" + return "host" +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_core_platform.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/platform.py tests/test_core_platform.py +git commit -m "feat: add platform detection and context awareness" +``` + +--- + +### Task 5: Console output + +**Files:** +- Create: `src/flow/core/console.py` +- Create: `tests/test_core_console.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_core_console.py +"""Tests for flow.core.console.""" + +from flow.core.console import Console + + +def test_info_prints_message(capsys): + c = Console(color=False) + c.info("hello") + assert "hello" in capsys.readouterr().out + + +def test_quiet_suppresses_info(capsys): + c = Console(quiet=True, color=False) + c.info("hidden") + assert capsys.readouterr().out == "" + + +def test_quiet_does_not_suppress_error(capsys): + c = Console(quiet=True, color=False) + c.error("visible") + captured = capsys.readouterr() + assert "visible" in captured.err or "visible" in captured.out + + +def test_table_prints_headers_and_rows(capsys): + c = Console(color=False) + c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]]) + output = capsys.readouterr().out + assert "NAME" in output + assert "foo" in output + assert "bar" in output + + +def test_no_color_strips_ansi(capsys): + c = Console(color=False) + c.info("test") + output = capsys.readouterr().out + assert "\033[" not in output +``` + +- [ ] **Step 2: Implement console.py** + +```python +# src/flow/core/console.py +"""Console output formatting with TTY detection and color control.""" + +import os +import sys +from typing import Any, Optional + + +class Console: + def __init__(self, *, quiet: bool = False, color: Optional[bool] = None): + self.quiet = quiet + if color is None: + self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False + else: + self._color = color + + def _style(self, code: str, text: str) -> str: + if not self._color: + return text + return f"{code}{text}\033[0m" + + def info(self, msg: str) -> None: + if self.quiet: + return + tag = self._style("\033[36m", "[INFO]") + print(f"{tag} {msg}") + + def warn(self, msg: str) -> None: + tag = self._style("\033[33m", "[WARN]") + print(f"{tag} {msg}") + + def error(self, msg: str) -> None: + tag = self._style("\033[31m", "[ERROR]") + print(f"{tag} {msg}", file=sys.stderr) + + def success(self, msg: str) -> None: + tag = self._style("\033[32m", "[OK]") + print(f"{tag} {msg}") + + def table(self, headers: list[str], rows: list[list[str]]) -> None: + if not rows: + return + widths = [len(h) for h in headers] + for row in rows: + for i, cell in enumerate(row): + if i < len(widths): + widths[i] = max(widths[i], len(str(cell))) + + header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers)) + if self._color: + print(f"\033[1m{header_line}\033[0m") + else: + print(header_line) + print(" ".join("-" * w for w in widths)) + for row in rows: + print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row))) + + def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None: + if not operations: + self.info(f"Nothing to {verb}.") + return + self.info(f"Plan ({len(operations)} operation(s)):") + for op in operations: + print(f" {op}") +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_core_console.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/console.py tests/test_core_console.py +git commit -m "feat: add Console with color/quiet/TTY support" +``` + +--- + +### Task 6: Runtime primitives (CommandRunner, FileSystem, GitClient, SystemRuntime) + +**Files:** +- Create: `src/flow/core/runtime.py` +- Create: `tests/test_core_runtime.py` + +- [ ] **Step 1: Write tests for FileSystem** + +```python +# tests/test_core_runtime.py +"""Tests for flow.core.runtime.""" + +from pathlib import Path + +from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime + + +class TestFileSystem: + def test_ensure_dir_creates_nested(self, tmp_path): + fs = FileSystem() + target = tmp_path / "a" / "b" / "c" + fs.ensure_dir(target) + assert target.is_dir() + + def test_write_and_read_text(self, tmp_path): + fs = FileSystem() + path = tmp_path / "test.txt" + fs.write_text(path, "hello") + assert fs.read_text(path) == "hello" + + def test_read_text_default(self, tmp_path): + fs = FileSystem() + path = tmp_path / "missing.txt" + assert fs.read_text(path, default="fallback") == "fallback" + + def test_write_and_read_json(self, tmp_path): + fs = FileSystem() + path = tmp_path / "data.json" + fs.write_json(path, {"key": "value"}) + assert fs.read_json(path) == {"key": "value"} + + def test_create_symlink(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + target = tmp_path / "link" + fs.create_symlink(source, target) + assert target.is_symlink() + assert target.resolve() == source.resolve() + + def test_same_symlink_true(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + target = tmp_path / "link" + target.symlink_to(source) + assert fs.same_symlink(target, source) is True + + def test_same_symlink_false(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + other = tmp_path / "other" + other.write_text("other") + target = tmp_path / "link" + target.symlink_to(other) + assert fs.same_symlink(target, source) is False + + def test_remove_file(self, tmp_path): + fs = FileSystem() + path = tmp_path / "file" + path.write_text("x") + fs.remove_file(path) + assert not path.exists() + + def test_remove_file_missing_ok(self, tmp_path): + fs = FileSystem() + fs.remove_file(tmp_path / "missing", missing_ok=True) # no error + + def test_copy_file(self, tmp_path): + fs = FileSystem() + src = tmp_path / "src" + src.write_text("data") + dst = tmp_path / "sub" / "dst" + fs.copy_file(src, dst) + assert dst.read_text() == "data" + + +class TestCommandRunner: + def test_run_echo(self): + runner = CommandRunner() + result = runner.run(["echo", "hello"], capture_output=True) + assert result.stdout.strip() == "hello" + + def test_require_binary_finds_echo(self): + runner = CommandRunner() + path = runner.require_binary("echo") + assert path is not None + + +class TestSystemRuntime: + def test_creates_git_client(self): + rt = SystemRuntime() + assert isinstance(rt.git, GitClient) + assert rt.git.runner is rt.runner +``` + +- [ ] **Step 2: Implement runtime.py** + +Implement `CommandRunner`, `FileSystem`, `GitClient`, and `SystemRuntime` per the spec in Section 3.3. Port the working implementations from the current `src/flow/core/system.py` but: +- Remove the duplicate `write_bytes` method +- Use `FlowError` instead of `RuntimeError` for error wrapping +- Keep the same API surface + +```python +# src/flow/core/runtime.py +"""Runtime primitives for process, git, state, and filesystem access.""" + +from __future__ import annotations + +import json +import os +import shlex +import shutil +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable, Mapping, Optional, Sequence + +from flow.core.console import Console +from flow.core.errors import FlowError + + +class CommandRunner: + """Subprocess wrapper with consistent defaults.""" + + def run( + self, + argv: Sequence[str] | Iterable[str], + *, + cwd: Optional[Path] = None, + env: Optional[Mapping[str, str]] = None, + capture_output: bool = True, + check: bool = False, + timeout: Optional[float] = None, + ) -> subprocess.CompletedProcess[str]: + parts = [str(a) for a in argv] + completed = subprocess.run( + parts, + cwd=str(cwd) if cwd else None, + env=dict(env) if env else None, + capture_output=capture_output, + text=True, + check=False, + timeout=timeout, + ) + if check and completed.returncode != 0: + msg = completed.stderr.strip() or completed.stdout.strip() + if not msg: + msg = f"Command failed with exit code {completed.returncode}" + raise FlowError(msg) + return completed + + def run_shell( + self, + command: str, + *, + cwd: Optional[Path] = None, + env: Optional[Mapping[str, str]] = None, + capture_output: bool = True, + check: bool = False, + timeout: Optional[float] = None, + ) -> subprocess.CompletedProcess[str]: + completed = subprocess.run( + command, + shell=True, + cwd=str(cwd) if cwd else None, + env=dict(env) if env else None, + capture_output=capture_output, + text=True, + check=False, + timeout=timeout, + ) + if check and completed.returncode != 0: + msg = completed.stderr.strip() or completed.stdout.strip() + if not msg: + msg = f"Command failed with exit code {completed.returncode}" + raise FlowError(msg) + return completed + + def stream_shell( + self, + command: str, + console: Console, + *, + check: bool = True, + ) -> subprocess.CompletedProcess[str]: + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + lines: list[str] = [] + assert process.stdout is not None + try: + for line in process.stdout: + stripped = line.rstrip() + if stripped: + lines.append(stripped) + finally: + process.stdout.close() + process.wait() + + if check and process.returncode != 0: + raise FlowError(f"Command failed (exit {process.returncode}): {command}") + + return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="") + + def require_binary(self, name: str) -> str: + path = shutil.which(name) + if path is None: + raise FlowError(f"Required executable not found: {name}") + return path + + +class FileSystem: + """Filesystem wrapper for all mutating operations.""" + + def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + runner.require_binary("sudo") + argv: list[str] = ["sudo", "mkdir", "-p"] + if mode is not None: + argv.extend(["-m", f"{mode:o}"]) + argv.append(str(path)) + runner.run(argv, check=True) + return + path.mkdir(parents=True, exist_ok=True) + if mode is not None: + path.chmod(mode) + + def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + argv = ["sudo", "rm"] + if missing_ok: + argv.append("-f") + argv.append(str(path)) + runner.run(argv, check=True) + return + try: + path.unlink() + except FileNotFoundError: + if not missing_ok: + raise + + def remove_tree(self, path: Path) -> None: + shutil.rmtree(path, ignore_errors=True) + + def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + self.ensure_dir(target.parent, sudo=True, runner=runner) + runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True) + return + self.ensure_dir(target.parent) + shutil.copy2(source, target) + + def copy_tree(self, source: Path, target: Path) -> None: + self.ensure_dir(target.parent) + shutil.copytree(source, target, dirs_exist_ok=True) + + def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + self.ensure_dir(target.parent, sudo=True, runner=runner) + runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True) + return + self.ensure_dir(target.parent) + target.symlink_to(source) + + def same_symlink(self, target: Path, source: Path) -> bool: + if not target.is_symlink(): + return False + return target.resolve(strict=False) == source.resolve(strict=False) + + def read_text(self, path: Path, *, default: Optional[str] = None) -> str: + try: + return path.read_text(encoding="utf-8") + except FileNotFoundError: + if default is None: + raise + return default + + def write_text(self, path: Path, content: str) -> None: + self.ensure_dir(path.parent) + path.write_text(content, encoding="utf-8") + + def write_bytes(self, path: Path, content: bytes) -> None: + self.ensure_dir(path.parent) + path.write_bytes(content) + + def read_json(self, path: Path, *, default: Any = None) -> Any: + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except FileNotFoundError: + return default + + def write_json(self, path: Path, data: Any) -> None: + self.ensure_dir(path.parent) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +class GitClient: + """Git adapter scoped to a repository root.""" + + def __init__(self, runner: CommandRunner): + self.runner = runner + + def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]: + return self.runner.run( + ["git", "-C", str(repo_dir), *args], + capture_output=capture_output, + check=check, + ) + + +@dataclass +class SystemRuntime: + """Shared runtime dependencies.""" + runner: CommandRunner = field(default_factory=CommandRunner) + fs: FileSystem = field(default_factory=FileSystem) + git: GitClient = field(init=False) + + def __post_init__(self) -> None: + self.git = GitClient(self.runner) +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_core_runtime.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/runtime.py tests/test_core_runtime.py +git commit -m "feat: add runtime primitives (CommandRunner, FileSystem, GitClient)" +``` + +--- + +### Task 7: Config loading and FlowContext + +**Files:** +- Create: `src/flow/core/config.py` +- Create: `tests/test_core_config.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_core_config.py +"""Tests for flow.core.config.""" + +from pathlib import Path + +import pytest + +from flow.core.config import AppConfig, load_config, load_manifest + + +def test_load_config_missing_path(tmp_path): + cfg = load_config(tmp_path / "nonexistent") + assert isinstance(cfg, AppConfig) + assert cfg.dotfiles_url == "" + assert cfg.container_registry == "registry.tomastm.com" + + +def test_load_config_from_yaml(tmp_path): + (tmp_path / "config.yaml").write_text( + "repository:\n" + " url: git@github.com:user/dots.git\n" + " branch: dev\n" + "paths:\n" + " projects: ~/code\n" + "defaults:\n" + " container-registry: my.registry.com\n" + " tmux-session: main\n" + ) + cfg = load_config(tmp_path) + assert cfg.dotfiles_url == "git@github.com:user/dots.git" + assert cfg.dotfiles_branch == "dev" + assert cfg.projects_dir == "~/code" + assert cfg.container_registry == "my.registry.com" + assert cfg.tmux_session == "main" + + +def test_load_config_parses_targets_shorthand(tmp_path): + (tmp_path / "config.yaml").write_text( + "targets:\n" + " personal@orb: personal.orb\n" + ) + cfg = load_config(tmp_path) + assert len(cfg.targets) == 1 + assert cfg.targets[0].namespace == "personal" + assert cfg.targets[0].platform == "orb" + assert cfg.targets[0].host == "personal.orb" + + +def test_load_config_parses_targets_dict(tmp_path): + (tmp_path / "config.yaml").write_text( + "targets:\n" + " work@ec2:\n" + " host: work.ec2.internal\n" + " identity: ~/.ssh/id_work\n" + ) + cfg = load_config(tmp_path) + assert len(cfg.targets) == 1 + assert cfg.targets[0].host == "work.ec2.internal" + assert cfg.targets[0].identity == "~/.ssh/id_work" + + +def test_load_manifest_returns_dict(tmp_path): + (tmp_path / "manifest.yaml").write_text( + "packages:\n" + " - name: fd\n" + " type: pkg\n" + ) + data = load_manifest(tmp_path) + assert isinstance(data, dict) + assert "packages" in data + + +def test_load_manifest_merges_files(tmp_path): + (tmp_path / "01-packages.yaml").write_text("packages:\n - name: fd\n type: pkg\n") + (tmp_path / "02-profiles.yaml").write_text("profiles:\n work:\n os: linux\n") + data = load_manifest(tmp_path) + assert "packages" in data + assert "profiles" in data +``` + +- [ ] **Step 2: Implement config.py** + +Port config loading from current `src/flow/core/config.py` with these changes: +- Use `TargetConfig` dataclass (new) matching spec Section 7.1 +- Use `FlowContext` dataclass matching spec Section 3.4 +- Support both shorthand string and dict target formats +- Remove all `_get_value` multi-key fallback logic (one canonical form per CLAUDE.md) + +The config parser normalizes target entries: +- `"personal@orb: personal.orb"` -> `TargetConfig(namespace="personal", platform="orb", host="personal.orb")` +- Dict form: `TargetConfig(namespace=..., platform=..., host=..., identity=...)` + +Key types: + +```python +@dataclass(frozen=True) +class TargetConfig: + namespace: str + platform: str + host: str + identity: str | None = None + +@dataclass +class AppConfig: + dotfiles_url: str = "" + dotfiles_branch: str = "main" + projects_dir: str = "~/projects" + container_registry: str = "registry.tomastm.com" + container_tag: str = "latest" + tmux_session: str = "default" + targets: list[TargetConfig] = field(default_factory=list) + +@dataclass +class FlowContext: + config: AppConfig + manifest: dict[str, Any] + platform: PlatformInfo + console: Console + runtime: SystemRuntime = field(default_factory=SystemRuntime) +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_core_config.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/core/config.py tests/test_core_config.py +git commit -m "feat: add config loading and FlowContext" +``` + +--- + +## Chunk 2: Dotfiles Domain + +### Task 8: Dotfiles models + +**Files:** +- Create: `src/flow/domain/__init__.py` (empty) +- Create: `src/flow/domain/dotfiles/__init__.py` (empty) +- Create: `src/flow/domain/dotfiles/models.py` +- Create: `tests/test_domain_dotfiles_models.py` + +- [ ] **Step 1: Write models** + +```python +# src/flow/domain/dotfiles/models.py +"""Dotfiles domain models -- all frozen dataclasses.""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass(frozen=True) +class ModuleRef: + """An external git repo providing content for a package subtree.""" + source: str + ref_type: str # "branch" | "tag" | "commit" + ref_value: str + mount_path: Path # Relative path within package to _module.yaml parent + cache_dir: Path # Where the repo is cloned + module_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_cache_root) + + +@dataclass(frozen=True) +class Package: + """A dotfiles package: a named set of files mapping to home-relative targets.""" + name: str # e.g. "zsh", "nvim" + layer: str # "_shared" or profile name + package_id: str # "layer/name" + source_dir: Path # Absolute path in dotfiles repo + module: Optional[ModuleRef] + local_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_package_root) + + +@dataclass(frozen=True) +class LinkTarget: + """A single file that should be linked into the filesystem.""" + source: Path + target: Path + package: str # package_id + from_module: bool + needs_sudo: bool + + +@dataclass(frozen=True) +class LinkOp: + """A single operation in a link plan.""" + type: str # "create_link" | "remove_link" | "create_dir" + target: Path + source: Optional[Path] + package: str + needs_sudo: bool + + def __str__(self) -> str: + if self.type == "create_link": + sudo = " (sudo)" if self.needs_sudo else "" + return f"LINK: {self.target} -> {self.source}{sudo}" + if self.type == "remove_link": + return f"REMOVE: {self.target}" + if self.type == "create_dir": + return f"MKDIR: {self.target}" + return f"{self.type}: {self.target}" + + +@dataclass(frozen=True) +class PlanSummary: + added: int + removed: int + unchanged: int + from_modules: int + + +@dataclass(frozen=True) +class LinkPlan: + """Complete reconciliation plan.""" + operations: list[LinkOp] + conflicts: list[str] + summary: PlanSummary + + +@dataclass +class LinkedState: + """Persisted link state.""" + links: dict[Path, LinkTarget] = field(default_factory=dict) + + def as_dict(self) -> dict: + grouped: dict[str, dict[str, dict]] = {} + for target, lt in sorted(self.links.items(), key=lambda x: str(x[0])): + pkg_links = grouped.setdefault(lt.package, {}) + pkg_links[str(target)] = { + "source": str(lt.source), + "from_module": lt.from_module, + "needs_sudo": lt.needs_sudo, + } + return {"version": 2, "links": grouped} + + @classmethod + def from_dict(cls, data: dict) -> "LinkedState": + version = data.get("version") + if version is not None and version != 2: + from flow.core.errors import ConfigError + raise ConfigError( + f"Unsupported linked.json version {version}. " + "Delete ~/.local/state/flow/linked.json and relink." + ) + links: dict[Path, LinkTarget] = {} + raw_links = data.get("links", {}) + for package, pkg_links in raw_links.items(): + for target_str, info in pkg_links.items(): + links[Path(target_str)] = LinkTarget( + source=Path(info["source"]), + target=Path(target_str), + package=str(package), + from_module=bool(info.get("from_module", False)), + needs_sudo=bool(info.get("needs_sudo", False)), + ) + return cls(links=links) + + +@dataclass(frozen=True) +class RepoInfo: + """A managed git repo (dotfiles or module).""" + name: str + path: Path + source: str + is_module: bool +``` + +- [ ] **Step 2: Write tests** + +```python +# tests/test_domain_dotfiles_models.py +"""Tests for dotfiles domain models.""" + +from pathlib import Path + +from flow.domain.dotfiles.models import ( + LinkOp, + LinkPlan, + LinkTarget, + LinkedState, + ModuleRef, + Package, + PlanSummary, +) + + +def test_link_op_str_create(): + op = LinkOp(type="create_link", target=Path("/home/x/.zshrc"), + source=Path("/dots/zsh/.zshrc"), package="_shared/zsh", needs_sudo=False) + assert "LINK:" in str(op) + assert ".zshrc" in str(op) + + +def test_link_op_str_sudo(): + op = LinkOp(type="create_link", target=Path("/etc/hosts"), + source=Path("/dots/dns/hosts"), package="_shared/dns", needs_sudo=True) + assert "(sudo)" in str(op) + + +def test_linked_state_roundtrip(): + lt = LinkTarget(source=Path("/a"), target=Path("/b"), package="p", from_module=False, needs_sudo=False) + state = LinkedState(links={Path("/b"): lt}) + data = state.as_dict() + restored = LinkedState.from_dict(data) + assert Path("/b") in restored.links + assert restored.links[Path("/b")].source == Path("/a") + assert restored.links[Path("/b")].package == "p" + + +def test_linked_state_empty(): + state = LinkedState.from_dict({}) + assert state.links == {} + + +def test_package_has_id(): + pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh", + source_dir=Path("/dots/_shared/zsh"), module=None, local_files=()) + assert pkg.package_id == "_shared/zsh" +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_dotfiles_models.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/__init__.py src/flow/domain/dotfiles/__init__.py \ + src/flow/domain/dotfiles/models.py tests/test_domain_dotfiles_models.py +git commit -m "feat: add dotfiles domain models" +``` + +--- + +### Task 9: Dotfiles modules (mount path, cache dir, source normalization) + +**Files:** +- Create: `src/flow/domain/dotfiles/modules.py` +- Create: `tests/test_domain_dotfiles_modules.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_domain_dotfiles_modules.py +"""Tests for dotfiles module resolution -- the core bug fix.""" + +from pathlib import Path + +import pytest + +from flow.core.errors import ConfigError +from flow.domain.dotfiles.modules import ( + compute_mount_path, + module_cache_dir, + normalize_source, + parse_module_ref, +) + + +class TestComputeMountPath: + def test_nested_module(self): + """_shared/nvim/.config/nvim/_module.yaml -> .config/nvim""" + result = compute_mount_path( + module_yaml=Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), + package_dir=Path("/dots/_shared/nvim"), + ) + assert result == Path(".config/nvim") + + def test_root_level_module(self): + """_shared/nvim/_module.yaml -> Path('.')""" + result = compute_mount_path( + module_yaml=Path("/dots/_shared/nvim/_module.yaml"), + package_dir=Path("/dots/_shared/nvim"), + ) + assert result == Path(".") + + def test_deeply_nested(self): + result = compute_mount_path( + module_yaml=Path("/dots/_shared/pkg/.config/a/b/c/_module.yaml"), + package_dir=Path("/dots/_shared/pkg"), + ) + assert result == Path(".config/a/b/c") + + +class TestModuleCacheDir: + def test_simple_name(self): + result = module_cache_dir("_shared/nvim", Path("/home/x/.local/share/flow/modules")) + assert result == Path("/home/x/.local/share/flow/modules/_shared--nvim") + + def test_profile_name(self): + result = module_cache_dir("linux-work/nvim", Path("/m")) + assert result == Path("/m/linux-work--nvim") + + +class TestNormalizeSource: + def test_github_shorthand(self): + assert normalize_source("github:org/repo") == "https://github.com/org/repo.git" + + def test_full_url_passthrough(self): + assert normalize_source("https://example.com/repo.git") == "https://example.com/repo.git" + + def test_ssh_passthrough(self): + assert normalize_source("git@github.com:org/repo.git") == "git@github.com:org/repo.git" + + +class TestParseModuleRef: + def test_branch_ref(self): + raw = {"source": "github:org/nvim-config", "ref": {"branch": "main"}} + ref = parse_module_ref( + raw, package_id="_shared/nvim", + mount_path=Path(".config/nvim"), + modules_base=Path("/modules"), + ) + assert ref.source == "https://github.com/org/nvim-config.git" + assert ref.ref_type == "branch" + assert ref.ref_value == "main" + assert ref.mount_path == Path(".config/nvim") + assert ref.cache_dir == Path("/modules/_shared--nvim") + + def test_tag_ref(self): + raw = {"source": "github:org/repo", "ref": {"tag": "v1.0"}} + ref = parse_module_ref(raw, "p/x", Path("."), Path("/m")) + assert ref.ref_type == "tag" + assert ref.ref_value == "v1.0" + + def test_missing_source_raises(self): + with pytest.raises(ConfigError): + parse_module_ref({}, "p/x", Path("."), Path("/m")) + + def test_missing_ref_raises(self): + raw = {"source": "github:org/repo"} + with pytest.raises(ConfigError): + parse_module_ref(raw, "p/x", Path("."), Path("/m")) + + def test_ref_not_dict_raises(self): + raw = {"source": "github:org/repo", "ref": "main"} + with pytest.raises(ConfigError): + parse_module_ref(raw, "p/x", Path("."), Path("/m")) + + def test_ambiguous_ref_raises(self): + raw = {"source": "github:org/repo", "ref": {"branch": "main", "tag": "v1"}} + with pytest.raises(ConfigError): + parse_module_ref(raw, "p/x", Path("."), Path("/m")) +``` + +- [ ] **Step 2: Implement modules.py** + +```python +# src/flow/domain/dotfiles/modules.py +"""Module metadata resolution -- pure functions.""" + +from pathlib import Path +from typing import Tuple + +from flow.core.errors import ConfigError +from flow.domain.dotfiles.models import ModuleRef + + +def compute_mount_path(module_yaml: Path, package_dir: Path) -> Path: + """Relative path from package root to _module.yaml parent.""" + rel = module_yaml.parent.relative_to(package_dir) + return rel + + +def module_cache_dir(package_id: str, modules_base: Path) -> Path: + """Cache dir for a module clone. '/' -> '--' to avoid collisions.""" + return modules_base / package_id.replace("/", "--") + + +def normalize_source(source: str) -> str: + """Normalize git source URL. github:org/repo -> https://github.com/org/repo.git""" + if source.startswith("github:"): + repo = source.split(":", 1)[1] + return f"https://github.com/{repo}.git" + return source + + +def parse_module_ref( + raw: dict, + package_id: str, + mount_path: Path, + modules_base: Path, +) -> ModuleRef: + """Build ModuleRef from parsed _module.yaml content.""" + source = raw.get("source") + if not isinstance(source, str) or not source: + raise ConfigError(f"Module for {package_id}: 'source' must be a non-empty string") + + ref = raw.get("ref") + if not isinstance(ref, dict): + raise ConfigError(f"Module for {package_id}: 'ref' must be a mapping") + + choices = [k for k in ("branch", "tag", "commit") if isinstance(ref.get(k), str) and ref[k]] + if len(choices) != 1: + raise ConfigError(f"Module for {package_id}: 'ref' must have exactly one of: branch, tag, commit") + + ref_type = choices[0] + ref_value = str(ref[ref_type]) + + return ModuleRef( + source=normalize_source(source), + ref_type=ref_type, + ref_value=ref_value, + mount_path=mount_path, + cache_dir=module_cache_dir(package_id, modules_base), + module_files=(), # Populated by service after cloning + ) +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_dotfiles_modules.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/dotfiles/modules.py tests/test_domain_dotfiles_modules.py +git commit -m "feat: add dotfiles module resolution (fixes _module.yaml path bug)" +``` + +--- + +### Task 10: Dotfiles resolution (package -> LinkTargets) + +**Files:** +- Create: `src/flow/domain/dotfiles/resolution.py` +- Create: `tests/test_domain_dotfiles_resolution.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_domain_dotfiles_resolution.py +"""Tests for dotfiles path resolution.""" + +from pathlib import Path + +import pytest + +from flow.core.errors import PlanConflict +from flow.domain.dotfiles.models import LinkTarget, ModuleRef, Package +from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets + +RESERVED_ROOT = "_root" +HOME = Path("/home/testuser") + + +def _pkg(name, layer="_shared", files=(), module=None): + return Package( + name=name, + layer=layer, + package_id=f"{layer}/{name}", + source_dir=Path(f"/dots/{layer}/{name}"), + module=module, + local_files=tuple(files), + ) + + +class TestResolvePackageTargets: + def test_simple_file(self): + pkg = _pkg("zsh", files=[ + (Path("/dots/_shared/zsh/.zshrc"), Path(".zshrc")), + ]) + targets = resolve_package_targets(pkg, HOME, set()) + assert len(targets) == 1 + assert targets[0].target == HOME / ".zshrc" + assert targets[0].source == Path("/dots/_shared/zsh/.zshrc") + assert targets[0].from_module is False + + def test_nested_config(self): + pkg = _pkg("git", files=[ + (Path("/dots/_shared/git/.config/git/config"), Path(".config/git/config")), + ]) + targets = resolve_package_targets(pkg, HOME, set()) + assert targets[0].target == HOME / ".config" / "git" / "config" + + def test_root_marker(self): + pkg = _pkg("dns", files=[ + (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")), + ]) + targets = resolve_package_targets(pkg, HOME, set()) + assert targets[0].target == Path("/etc/hosts") + assert targets[0].needs_sudo is True + + def test_root_marker_skipped_when_in_skip_set(self): + pkg = _pkg("dns", files=[ + (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")), + ]) + targets = resolve_package_targets(pkg, HOME, {"_root"}) + assert len(targets) == 0 + + def test_skip_package_by_name(self): + pkg = _pkg("nvim", files=[ + (Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")), + ]) + targets = resolve_package_targets(pkg, HOME, {"nvim"}) + assert len(targets) == 0 + + def test_module_files_linked_under_mount_path(self): + module = ModuleRef( + source="https://github.com/org/nvim-config.git", + ref_type="branch", + ref_value="main", + mount_path=Path(".config/nvim"), + cache_dir=Path("/modules/_shared--nvim"), + module_files=( + (Path("/modules/_shared--nvim/init.lua"), Path("init.lua")), + (Path("/modules/_shared--nvim/lua/plugins.lua"), Path("lua/plugins.lua")), + ), + ) + pkg = _pkg("nvim", files=[ + (Path("/dots/_shared/nvim/.local/bin/nvim-wrapper"), Path(".local/bin/nvim-wrapper")), + ], module=module) + + targets = resolve_package_targets(pkg, HOME, set()) + + # Local file outside mount_path + local_targets = [t for t in targets if not t.from_module] + assert len(local_targets) == 1 + assert local_targets[0].target == HOME / ".local" / "bin" / "nvim-wrapper" + + # Module files under mount_path + module_targets = [t for t in targets if t.from_module] + assert len(module_targets) == 2 + module_target_paths = {t.target for t in module_targets} + assert HOME / ".config" / "nvim" / "init.lua" in module_target_paths + assert HOME / ".config" / "nvim" / "lua" / "plugins.lua" in module_target_paths + + def test_module_yaml_file_not_linked(self): + """The _module.yaml marker itself should never be linked.""" + pkg = _pkg("nvim", files=[ + (Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), Path(".config/nvim/_module.yaml")), + ], module=ModuleRef( + source="x", ref_type="branch", ref_value="main", + mount_path=Path(".config/nvim"), + cache_dir=Path("/m"), module_files=(), + )) + targets = resolve_package_targets(pkg, HOME, set()) + assert not any(t.target.name == "_module.yaml" for t in targets) + + def test_root_level_module_skips_all_local_files(self): + """When mount_path is '.', all local files are from the module, not dotfiles repo.""" + module = ModuleRef( + source="x", ref_type="branch", ref_value="main", + mount_path=Path("."), + cache_dir=Path("/m"), + module_files=( + (Path("/m/init.lua"), Path("init.lua")), + ), + ) + pkg = _pkg("nvim", files=[ + (Path("/dots/_shared/nvim/_module.yaml"), Path("_module.yaml")), + (Path("/dots/_shared/nvim/stale-file.txt"), Path("stale-file.txt")), + ], module=module) + targets = resolve_package_targets(pkg, HOME, set()) + # Only module files should appear, local files skipped + assert len(targets) == 1 + assert targets[0].from_module is True + assert targets[0].target == HOME / "init.lua" + + +class TestResolveAllTargets: + def test_no_conflicts(self): + pkgs = [ + _pkg("zsh", files=[(Path("/a/.zshrc"), Path(".zshrc"))]), + _pkg("git", files=[(Path("/a/.gitconfig"), Path(".gitconfig"))]), + ] + targets = resolve_all_targets(pkgs, HOME, set()) + assert len(targets) == 2 + + def test_duplicate_target_raises(self): + pkgs = [ + _pkg("zsh", layer="_shared", files=[(Path("/a/.zshrc"), Path(".zshrc"))]), + _pkg("zsh", layer="work", files=[(Path("/b/.zshrc"), Path(".zshrc"))]), + ] + with pytest.raises(PlanConflict): + resolve_all_targets(pkgs, HOME, set()) +``` + +- [ ] **Step 2: Implement resolution.py** + +```python +# src/flow/domain/dotfiles/resolution.py +"""Path resolution: package -> home-relative LinkTargets. Pure functions.""" + +from pathlib import Path + +from flow.core.errors import PlanConflict +from flow.domain.dotfiles.models import LinkTarget, Package + +RESERVED_ROOT = "_root" +MODULE_FILE = "_module.yaml" + + +def resolve_package_targets( + package: Package, + home: Path, + skip: set[str], +) -> list[LinkTarget]: + """Resolve all LinkTargets for a package, handling modules correctly.""" + if package.name in skip: + return [] + + targets: list[LinkTarget] = [] + mount_path = package.module.mount_path if package.module else None + + # Local files (from dotfiles repo) + for abs_source, rel in package.local_files: + # Skip _module.yaml + if rel.name == MODULE_FILE: + continue + + # If module exists, skip files inside mount_path (module provides those) + if mount_path is not None: + if mount_path == Path("."): + continue # Root-level module: all local files are superseded by module + try: + rel.relative_to(mount_path) + continue # Inside mount_path, skip + except ValueError: + pass # Outside mount_path, process normally + + target, needs_sudo = _resolve_target(rel, home, skip) + if target is None: + continue + targets.append(LinkTarget( + source=abs_source, target=target, + package=package.package_id, + from_module=False, needs_sudo=needs_sudo, + )) + + # Module files + if package.module: + for abs_source, rel in package.module.module_files: + mounted = package.module.mount_path / rel if package.module.mount_path != Path(".") else rel + target, needs_sudo = _resolve_target(mounted, home, skip) + if target is None: + continue + targets.append(LinkTarget( + source=abs_source, target=target, + package=package.package_id, + from_module=True, needs_sudo=needs_sudo, + )) + + return targets + + +def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None, bool]: + """Resolve a relative path to an absolute target. Returns (target, needs_sudo).""" + parts = rel.parts + if parts and parts[0] == RESERVED_ROOT: + if RESERVED_ROOT in skip: + return None, False + if len(parts) < 2: + return None, False + return Path("/") / Path(*parts[1:]), True + return home / rel, False + + +def resolve_all_targets( + packages: list[Package], + home: Path, + skip: set[str], +) -> list[LinkTarget]: + """Resolve targets for all packages. Raises PlanConflict on duplicate targets.""" + all_targets: list[LinkTarget] = [] + seen: dict[Path, str] = {} + + for pkg in packages: + targets = resolve_package_targets(pkg, home, skip) + for t in targets: + if t.target in seen: + conflicts = [ + f"{t.target} claimed by both {seen[t.target]} and {t.package}" + ] + raise PlanConflict( + f"Conflicting dotfile targets across packages", + conflicts, + ) + seen[t.target] = t.package + all_targets.append(t) + + return all_targets +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_dotfiles_resolution.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/dotfiles/resolution.py tests/test_domain_dotfiles_resolution.py +git commit -m "feat: add dotfiles path resolution with correct module mounting" +``` + +--- + +### Task 11: Dotfiles planning (link/unlink plans) + +**Note:** The spec's file layout lists `domain/dotfiles/conflicts.py` as a separate file. This is intentionally merged into `planning.py` -- cross-package collisions are caught by `resolve_all_targets` (Task 10), and filesystem conflicts are detected inside `plan_link` via the injected `filesystem_check` callback. No separate `conflicts.py` file is needed. + +**Files:** +- Create: `src/flow/domain/dotfiles/planning.py` +- Create: `tests/test_domain_dotfiles_planning.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_domain_dotfiles_planning.py +"""Tests for dotfiles link planning.""" + +from pathlib import Path +from typing import Optional + +from flow.domain.dotfiles.models import ( + LinkOp, + LinkTarget, + LinkedState, +) +from flow.domain.dotfiles.planning import plan_link, plan_unlink + + +def _lt(target, source="/a", pkg="_shared/zsh", module=False, sudo=False): + return LinkTarget( + source=Path(source), target=Path(target), + package=pkg, from_module=module, needs_sudo=sudo, + ) + + +def _fs_check_none(path: Path) -> Optional[str]: + """Fake filesystem_check: nothing exists.""" + return None + + +def _fs_check_file(path: Path) -> Optional[str]: + """Fake: everything is a file.""" + return "file" + + +class TestPlanLink: + def test_new_target_creates_link(self): + desired = [_lt("/home/x/.zshrc")] + plan = plan_link(desired, LinkedState(), _fs_check_none) + assert len(plan.operations) == 1 + assert plan.operations[0].type == "create_link" + assert plan.summary.added == 1 + + def test_existing_correct_link_unchanged(self): + lt = _lt("/home/x/.zshrc") + current = LinkedState(links={Path("/home/x/.zshrc"): lt}) + plan = plan_link([lt], current, _fs_check_none) + assert len(plan.operations) == 0 + assert plan.summary.unchanged == 1 + + def test_stale_link_removed(self): + old = _lt("/home/x/.old") + current = LinkedState(links={Path("/home/x/.old"): old}) + plan = plan_link([], current, _fs_check_none) + assert len(plan.operations) == 1 + assert plan.operations[0].type == "remove_link" + assert plan.summary.removed == 1 + + def test_changed_source_produces_remove_then_create(self): + old = _lt("/home/x/.zshrc", source="/old") + new = _lt("/home/x/.zshrc", source="/new") + current = LinkedState(links={Path("/home/x/.zshrc"): old}) + plan = plan_link([new], current, _fs_check_none) + types = [op.type for op in plan.operations] + assert types == ["remove_link", "create_link"] + + def test_unmanaged_file_at_target_is_conflict(self): + desired = [_lt("/home/x/.zshrc")] + plan = plan_link(desired, LinkedState(), _fs_check_file) + assert len(plan.conflicts) == 1 + assert ".zshrc" in plan.conflicts[0] + + def test_module_targets_counted(self): + desired = [_lt("/home/x/.config/nvim/init.lua", module=True)] + plan = plan_link(desired, LinkedState(), _fs_check_none) + assert plan.summary.from_modules == 1 + + +class TestPlanUnlink: + def test_unlink_all(self): + lt = _lt("/home/x/.zshrc") + current = LinkedState(links={Path("/home/x/.zshrc"): lt}) + plan = plan_unlink(current, packages=None) + assert len(plan.operations) == 1 + assert plan.operations[0].type == "remove_link" + + def test_unlink_specific_package(self): + zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh") + git = _lt("/home/x/.gitconfig", pkg="_shared/git") + current = LinkedState(links={ + Path("/home/x/.zshrc"): zsh, + Path("/home/x/.gitconfig"): git, + }) + plan = plan_unlink(current, packages=["_shared/zsh"]) + assert len(plan.operations) == 1 + assert plan.operations[0].target == Path("/home/x/.zshrc") + + def test_unlink_by_basename(self): + zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh") + current = LinkedState(links={Path("/home/x/.zshrc"): zsh}) + plan = plan_unlink(current, packages=["zsh"]) + assert len(plan.operations) == 1 +``` + +- [ ] **Step 2: Implement planning.py** + +```python +# src/flow/domain/dotfiles/planning.py +"""Link/unlink plan computation. Pure functions with injected I/O.""" + +from pathlib import Path +from typing import Callable, Optional + +from flow.domain.dotfiles.models import ( + LinkOp, + LinkPlan, + LinkTarget, + LinkedState, + PlanSummary, +) + + +def plan_link( + desired: list[LinkTarget], + current: LinkedState, + filesystem_check: Callable[[Path], Optional[str]], +) -> LinkPlan: + """Build reconciliation plan. + + filesystem_check: injected by service. Returns "file", "dir", "symlink", or None. + """ + ops: list[LinkOp] = [] + conflicts: list[str] = [] + added = 0 + removed = 0 + unchanged = 0 + from_modules = 0 + + desired_map = {t.target: t for t in desired} + desired_targets = set(desired_map.keys()) + current_targets = set(current.links.keys()) + + # Removals: in current but not desired + for target in sorted(current_targets - desired_targets): + ops.append(LinkOp( + type="remove_link", target=target, source=None, + package=current.links[target].package, + needs_sudo=current.links[target].needs_sudo, + )) + removed += 1 + + # Additions, updates, and unchanged + for target in sorted(desired_targets): + spec = desired_map[target] + + if target in current.links: + cur = current.links[target] + if cur.source == spec.source: + unchanged += 1 + if spec.from_module: + from_modules += 1 + continue + # Source changed: remove old link, then create new one + ops.append(LinkOp( + type="remove_link", target=target, source=cur.source, + package=cur.package, needs_sudo=cur.needs_sudo, + )) + ops.append(LinkOp( + type="create_link", target=target, source=spec.source, + package=spec.package, needs_sudo=spec.needs_sudo, + )) + added += 1 + if spec.from_module: + from_modules += 1 + continue + + # New target: check filesystem for conflicts + fs_state = filesystem_check(target) + if fs_state is not None: + conflicts.append( + f"{target} already exists ({fs_state}) and is not managed by flow" + ) + continue + + ops.append(LinkOp( + type="create_link", target=target, source=spec.source, + package=spec.package, needs_sudo=spec.needs_sudo, + )) + added += 1 + if spec.from_module: + from_modules += 1 + + return LinkPlan( + operations=ops, + conflicts=conflicts, + summary=PlanSummary( + added=added, removed=removed, + unchanged=unchanged, from_modules=from_modules, + ), + ) + + +def plan_unlink( + current: LinkedState, + packages: Optional[list[str]], +) -> LinkPlan: + """Plan removal of managed links.""" + ops: list[LinkOp] = [] + + for target in sorted(current.links.keys()): + lt = current.links[target] + if packages is not None: + # Match by full package_id or by basename + basename = lt.package.split("/", 1)[-1] if "/" in lt.package else lt.package + if lt.package not in packages and basename not in packages: + continue + + ops.append(LinkOp( + type="remove_link", target=target, source=lt.source, + package=lt.package, needs_sudo=lt.needs_sudo, + )) + + return LinkPlan( + operations=ops, + conflicts=[], + summary=PlanSummary(added=0, removed=len(ops), unchanged=0, from_modules=0), + ) +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_dotfiles_planning.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/dotfiles/planning.py tests/test_domain_dotfiles_planning.py +git commit -m "feat: add dotfiles link planning with conflict detection" +``` + +--- + +## Chunk 3: Packages Domain + +### Task 12: Package models + +**Files:** +- Create: `src/flow/domain/packages/__init__.py` (empty) +- Create: `src/flow/domain/packages/models.py` +- Create: `tests/test_domain_packages_models.py` + +- [ ] **Step 1: Write models** + +All frozen dataclasses as specified in spec Section 5.1: `PackageDef`, `ProfilePackageRef`, `PkgInstallOp`, `PkgRemoveOp`, `PackagePlan`, `InstalledPackage`, `InstalledState`. + +`InstalledState` serialization format: + +```python +# as_dict() produces: +{ + "version": 1, + "packages": { + "neovim": { + "version": "0.10.4", + "type": "binary", + "files": ["/home/x/.local/bin/nvim", "/home/x/.local/share/nvim"] + } + } +} + +# from_dict() parses the above. Raises ConfigError on version mismatch. +``` + +- [ ] **Step 2: Write tests** + +```python +# tests/test_domain_packages_models.py +"""Tests for packages domain models.""" + +from pathlib import Path + +import pytest + +from flow.core.errors import ConfigError +from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef + + +def test_installed_state_roundtrip(): + state = InstalledState(packages={ + "neovim": InstalledPackage( + name="neovim", version="0.10.4", type="binary", + files=[Path("/home/x/.local/bin/nvim")], + ), + }) + data = state.as_dict() + restored = InstalledState.from_dict(data) + assert "neovim" in restored.packages + assert restored.packages["neovim"].version == "0.10.4" + assert restored.packages["neovim"].files == [Path("/home/x/.local/bin/nvim")] + + +def test_installed_state_empty(): + state = InstalledState.from_dict({}) + assert state.packages == {} + + +def test_installed_state_version_mismatch(): + with pytest.raises(ConfigError): + InstalledState.from_dict({"version": 99, "packages": {}}) + + +def test_package_def_fields(): + pkg = PackageDef( + name="fd", type="pkg", sources={"apt": "fd-find"}, + source=None, version=None, asset_pattern=None, + platform_map={}, extract_dir=None, install={}, + post_install=None, allow_sudo=False, + ) + assert pkg.name == "fd" + assert pkg.type == "pkg" +``` + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_packages_models.py -v` +Expected: All passed + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/packages/ tests/test_domain_packages_models.py +git commit -m "feat: add packages domain models" +``` + +--- + +### Task 13: Package catalog and resolution + +**Files:** +- Create: `src/flow/domain/packages/catalog.py` +- Create: `src/flow/domain/packages/resolution.py` +- Create: `tests/test_domain_packages.py` + +- [ ] **Step 1: Write tests covering:** +- `parse_catalog` with list and dict manifest formats +- `normalize_profile_entry` with string shorthand ("binary/neovim"), plain name, dict +- `resolve_spec` merging catalog and profile overrides +- `resolve_source_name` with apt/brew/dnf fallbacks +- `resolve_binary_asset` with platform map and asset pattern +- `resolve_download_url` with github shorthand and direct URL +- `pm_update_command` and `pm_install_command` for apt/dnf/brew +- `detect_package_manager` returning apt/dnf/None + +- [ ] **Step 2: Implement catalog.py and resolution.py** + +Implement the functions listed in spec Section 5.2 matching those exact signatures. Use `src/flow/services/package_defs.py` only as reference for resolution logic, not as source of truth -- the spec signatures are canonical. This is the single copy of all package resolution logic (no duplication with bootstrap). + +- [ ] **Step 3: Run tests** + +Run: `python -m pytest tests/test_domain_packages.py -v` + +- [ ] **Step 4: Commit** + +```bash +git add src/flow/domain/packages/ tests/test_domain_packages.py +git commit -m "feat: add packages catalog and resolution (single source of truth)" +``` + +--- + +### Task 14: Package planning + +**Files:** +- Create: `src/flow/domain/packages/planning.py` +- Create: `tests/test_domain_packages_planning.py` + +- [ ] **Step 1: Write tests for `plan_install` and `plan_remove`** + +- [ ] **Step 2: Implement planning.py** + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add package install/remove planning" +``` + +--- + +## Chunk 4: Bootstrap Domain + +### Task 15: Bootstrap models and setup modules + +**Files:** +- Create: `src/flow/domain/bootstrap/__init__.py` +- Create: `src/flow/domain/bootstrap/models.py` +- Create: `src/flow/domain/bootstrap/modules.py` +- Create: `tests/test_domain_bootstrap_modules.py` + +- [ ] **Step 1: Write models**: `Profile`, `SetupModuleDef`, `BootstrapAction`, `BootstrapPlan` + +- [ ] **Step 2: Write and test setup modules**: `HostnameModule`, `LocaleModule`, `ShellModule`, `SSHKeygenModule`, `RuncmdModule`. Each returns shell commands from `.plan()` and a human-readable string from `.describe()`. + +`describe()` examples (used in dry-run output): +- `HostnameModule.describe()` -> `"Set hostname to my-host"` +- `LocaleModule.describe()` -> `"Set locale to en_US.UTF-8"` +- `ShellModule.describe()` -> `"Install and configure shell: zsh"` +- `SSHKeygenModule.describe()` -> `"Generate 1 SSH key(s)"` +- `RuncmdModule.describe()` -> `"Run 3 custom command(s)"` + +Add a test for each module asserting `isinstance(module.describe(), str)` and that `.plan()` returns a non-empty list of strings. + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add bootstrap models and setup modules" +``` + +--- + +### Task 16: Bootstrap planning + +**Files:** +- Create: `src/flow/domain/bootstrap/planning.py` +- Create: `tests/test_domain_bootstrap_planning.py` + +- [ ] **Step 1: Write tests** for `parse_profile` and `plan_bootstrap` -- verify it produces correct ordered actions (validate env, setup modules, packages, shell, dotfiles link). + +- [ ] **Step 2: Implement** -- `plan_bootstrap` raises `ConfigError` if required env vars are missing, then builds ordered action list using packages domain for install planning. + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add bootstrap planning (orchestrator over packages + dotfiles)" +``` + +--- + +## Chunk 5: Remote + Containers + Projects Domains + +### Task 17: Remote domain + +**Files:** +- Create: `src/flow/domain/remote/__init__.py` +- Create: `src/flow/domain/remote/models.py` -- contains `Target` and `SSHCommand` only. **`TargetConfig` lives in `core/config.py`** (already created in Task 7). Import it from there. +- Create: `src/flow/domain/remote/resolution.py` +- Create: `tests/test_domain_remote.py` + +- [ ] **Step 1: Write tests** for `parse_target`, `resolve_target`, `build_ssh_command`, `terminfo_fix_command` + +- [ ] **Step 2: Implement models and resolution.** Port from current `src/flow/services/ssh.py`. `resolve_target` receives `list[TargetConfig]` imported from `flow.core.config`. + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add remote domain (SSH target resolution)" +``` + +--- + +### Task 18: Containers domain + +**Files:** +- Create: `src/flow/domain/containers/__init__.py` +- Create: `src/flow/domain/containers/models.py` +- Create: `src/flow/domain/containers/resolution.py` +- Create: `tests/test_domain_containers.py` + +- [ ] **Step 1: Write tests** for `parse_image_ref`, `container_name`, `resolve_mounts`, `build_container_spec` + +- [ ] **Step 2: Implement.** Port from current `src/flow/services/containers.py`. + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add containers domain (image resolution, mount computation)" +``` + +--- + +## Chunk 6: Services Layer (Part 1) + +### Task 19: DotfilesService + +**Files:** +- Create: `src/flow/services/__init__.py` +- Create: `src/flow/services/dotfiles.py` +- Create: `tests/test_service_dotfiles.py` + +- [ ] **Step 1: Write integration tests** using `tmp_path` for real filesystem + `FakeRunner` for git: + - `test_link_creates_symlinks` -- set up dotfiles dir, call `.link()`, verify symlinks + - `test_link_with_module` -- set up package with `_module.yaml` and cloned module dir, verify correct target paths + - `test_unlink_removes_symlinks` -- link then unlink, verify cleaned up + - `test_link_dry_run_no_changes` -- dry run does not create symlinks + - `test_status_shows_packages` -- set up linked state, verify output + +- [ ] **Step 2: Implement DotfilesService** + +The service performs all I/O: +1. `_discover_packages`: walks dotfiles dir, reads `_module.yaml` files, builds `Package` objects with `local_files` populated +2. Calls pure domain functions for resolution and planning +3. Executes the plan (create/remove symlinks via `FileSystem`) +4. Persists `LinkedState` to JSON + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add DotfilesService with link/unlink/status/edit" +``` + +--- + +### Task 20: PackageService + +**Files:** +- Create: `src/flow/services/packages.py` +- Create: `tests/test_service_packages.py` + +- [ ] **Step 1: Write tests** for install, list, remove flows + +- [ ] **Step 2: Implement PackageService** -- uses domain planning, executes via runtime + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add PackageService" +``` + +--- + +### Task 21: BootstrapService + +**Files:** +- Create: `src/flow/services/bootstrap.py` +- Create: `tests/test_service_bootstrap.py` + +- [ ] **Step 1: Write tests** -- bootstrap run with dry_run, show, list + +- [ ] **Step 2: Implement** -- uses domain planning, executes actions sequentially, delegates to PackageService and DotfilesService + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add BootstrapService" +``` + +--- + +## Chunk 7: Services Layer (Part 2) + +### Task 22: RemoteService + +**Files:** +- Create: `src/flow/services/remote.py` +- Create: `tests/test_service_remote.py` + +- [ ] **Step 1: Write tests** -- enter dry_run, list targets, terminfo warning + +- [ ] **Step 2: Implement** + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add RemoteService" +``` + +--- + +### Task 23: ContainerService + +**Files:** +- Create: `src/flow/services/containers.py` +- Create: `tests/test_service_containers.py` + +- [ ] **Step 1: Write tests** -- create, list, stop, remove with FakeRunner + +- [ ] **Step 2: Implement** + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add ContainerService" +``` + +--- + +### Task 24: ProjectService + +**Note:** `ProjectService` has no domain layer -- it's a thin service that runs git commands directly. All logic is I/O-bound (listing dirs, running git). No separate domain module needed. + +**Files:** +- Create: `src/flow/services/projects.py` +- Create: `tests/test_service_projects.py` + +- [ ] **Step 1: Write tests** + +```python +# tests/test_service_projects.py +"""Tests for ProjectService.""" + +def test_check_clean_repo(tmp_path): + """Create a git repo at tmp_path/projects/myrepo, commit a file. + Call service.check(fetch=False). Verify output contains 'clean'.""" + +def test_check_uncommitted_changes(tmp_path): + """Create repo, modify a tracked file without committing. + Verify 'uncommitted changes' in output.""" + +def test_check_no_git_repos(tmp_path): + """Empty projects dir. Verify info message.""" + +def test_summary_shows_all_dirs(tmp_path): + """Mix of git and non-git dirs. Verify table output.""" +``` + +- [ ] **Step 2: Implement** -- `check()` iterates dirs in `projects_dir`, runs git status/diff/rev-list per repo. `fetch()` runs `git fetch --all`. `summary()` is `check(fetch=False)`. Use `self.ctx.runtime.git` for all git calls, `self.ctx.console.table` for output. + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add ProjectService" +``` + +--- + +## Chunk 8: Commands + CLI + +### Task 25: CLI entry point + context validation + +**Files:** +- Create: `src/flow/cli.py` (rewrite) +- Create: `tests/test_cli.py` + +- [ ] **Step 1: Write tests** -- non-root check, context validation (remote blocked in VM), version flag, dry_run passed through + +- [ ] **Step 2: Implement cli.py** per spec Section 10 + +- [ ] **Step 3: Run tests, commit** + +```bash +git commit -m "feat: add CLI entry point with context awareness" +``` + +--- + +### Task 26: Command modules + +**Files:** +- Create: `src/flow/commands/remote.py` +- Create: `src/flow/commands/dev.py` +- Create: `src/flow/commands/dotfiles.py` +- Create: `src/flow/commands/setup.py` +- Create: `src/flow/commands/packages.py` +- Create: `src/flow/commands/projects.py` +- Create: `src/flow/commands/__init__.py` + +Each command module: register argparse subcommands, handler functions that construct the service and call one method. + +- [ ] **Step 1: Implement all command modules** (each is 30-60 lines) +- [ ] **Step 2: Write integration test** that runs `flow --help`, `flow dotfiles --help`, etc. +- [ ] **Step 3: Commit** + +```bash +git commit -m "feat: add command modules (thin CLI adapters)" +``` + +--- + +### Task 27: Zsh completion + +**Files:** +- Create: `src/flow/commands/completion.py` +- Create: `tests/test_completion.py` + +- [ ] **Step 1: Implement** -- port and adapt from current completion.py with updated command names + +- [ ] **Step 2: Write tests** for `complete()` function with the new command surface + +- [ ] **Step 3: Commit** + +```bash +git commit -m "feat: add zsh completion with updated command surface" +``` + +--- + +## Chunk 9: Cleanup + Integration + +### Task 28: Delete old code + +- [ ] **Step 1: Remove old files** + +```bash +# Remove old service/command/core files that have been replaced +rm -f src/flow/core/action.py +rm -f src/flow/core/stow.py +rm -f src/flow/core/process.py +# Old core files replaced by new ones: +# system.py -> runtime.py (already handled) +# config.py, console.py, platform.py, paths.py, errors.py -> rewritten in place +# variables.py -> template.py + +# Remove old command modules +rm -rf src/flow/commands/ # Already replaced by new commands/ + +# Remove old services +rm -rf src/flow/services/ # Already replaced by new services/ + +# Remove old tests +rm -f tests/test_action.py tests/test_stow.py tests/test_commands.py +rm -f tests/test_dotfiles_folding.py tests/test_self_hosting.py +``` + +- [ ] **Step 2: Run full test suite** + +Run: `python -m pytest tests/ -v` +Expected: All new tests pass, no imports of deleted modules + +- [ ] **Step 3: Update pyproject.toml entry point if needed** + +Verify `[project.scripts] flow = "flow.cli:main"` still works. + +- [ ] **Step 4: Test binary build** + +```bash +make build +./dist/flow --help +./dist/flow --version +``` + +- [ ] **Step 5: Commit** + +```bash +git add -A +git commit -m "chore: remove old code, complete rewrite" +``` + +--- + +### Task 29: Update README + +- [ ] **Step 1: Rewrite README.md** to match the new command surface, config format, and architecture. + +- [ ] **Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: update README for new architecture" +``` diff --git a/docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md b/docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md index 0eb404d..db51804 100644 --- a/docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md +++ b/docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md @@ -1225,7 +1225,7 @@ def test_plan_link_creates_ops_for_new_targets(): desired = [LinkTarget(source=Path("/a"), target=Path("/home/x/.zshrc"), package="_shared/zsh", from_module=False, needs_sudo=False)] current = LinkedState(links={}) - plan = plan_link(desired, current) + plan = plan_link(desired, current, filesystem_check=lambda p: None) assert len(plan.operations) == 1 assert plan.operations[0].type == "create_link"