Files
flow/docs/superpowers/plans/2026-03-16-flow-rewrite.md
Tomas Mirchev df8a19d6cc Fix all plan review issues, save implementation plan
Plan fixes:
- detect_platform raises FlowError not RuntimeError
- TargetConfig lives in core/config.py only (remote domain imports it)
- plan_link handles source changes (remove_link + create_link)
- resolve_package_targets skips local files when mount_path is root
- LinkedState.from_dict guards on version mismatch
- Added missing test for parse_module_ref with absent ref
- Task 12 now has full tests and serialization format
- Task 13 uses spec signatures as truth, old code as reference
- Task 15 includes describe() examples and tests
- Task 24 has detailed test cases for ProjectService
- Note that conflicts.py is intentionally merged into planning.py
- Spec Section 12 example fixed to include filesystem_check arg

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 04:39:11 +02:00

76 KiB

Flow CLI Rewrite Implementation Plan

For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Full rewrite of the flow CLI with correct domain abstractions, plan-then-execute pattern, and working module path resolution.

Architecture: Four-layer hybrid (core -> domain -> services -> commands). Domain layer is pure functions + frozen dataclasses. Services orchestrate I/O. Commands are trivial dispatchers. See docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md for the full spec.

Tech Stack: Python 3.9+, PyYAML, argparse, pytest, PyInstaller

Spec: docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md Standards: CLAUDE.md


Phasing Strategy

Build bottom-up. Each phase produces tested, working code before the next begins. Old code under src/flow/ is untouched until the final phase when it's deleted.

The new code is written in-place (same src/flow/ tree) because this is a full rewrite with no backward compatibility. Old modules are deleted as their replacements land.

Phase What Depends on
1 Core layer (errors, template, paths, platform, console, runtime, config) Nothing
2 Dotfiles domain (models, modules, resolution, planning) Core
3 Packages domain (models, catalog, resolution, planning) Core
4 Bootstrap domain (models, modules, planning) Core, packages domain
5 Remote + containers + projects domains Core
6 All services Core, all domains
7 Commands + CLI + completion Core, all services
8 Delete old code, final integration Everything

Chunk 1: Core Layer

Task 1: Errors

Files:

  • Create: src/flow/core/errors.py

  • Create: tests/test_errors.py

  • Step 1: Write the error hierarchy

# src/flow/core/errors.py
"""Project-wide error types."""


class FlowError(Exception):
    """Base for all user-facing errors."""


class ConfigError(FlowError):
    """Invalid config or manifest YAML."""


class PlanConflict(FlowError):
    """Conflicts detected during planning."""

    def __init__(self, message: str, conflicts: list[str]):
        super().__init__(message)
        self.conflicts = conflicts


class ExecutionError(FlowError):
    """A plan step failed during execution."""
  • Step 2: Write tests
# tests/test_errors.py
"""Tests for flow.core.errors."""

from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict


def test_flow_error_is_exception():
    assert issubclass(FlowError, Exception)


def test_config_error_is_flow_error():
    assert issubclass(ConfigError, FlowError)


def test_plan_conflict_carries_conflicts():
    err = PlanConflict("2 conflicts", ["a exists", "b exists"])
    assert str(err) == "2 conflicts"
    assert err.conflicts == ["a exists", "b exists"]


def test_execution_error_is_flow_error():
    assert issubclass(ExecutionError, FlowError)
  • Step 3: Run tests

Run: python -m pytest tests/test_errors.py -v Expected: 4 passed

  • Step 4: Commit
git add src/flow/core/errors.py tests/test_errors.py
git commit -m "feat: add FlowError hierarchy"

Task 2: Template (pure string substitution)

Files:

  • Create: src/flow/core/template.py

  • Create: tests/test_template.py

  • Step 1: Write tests

# tests/test_template.py
"""Tests for flow.core.template."""

import os

from flow.core.template import substitute, substitute_template


class TestSubstitute:
    def test_replaces_dollar_var(self):
        assert substitute("hello $NAME", {"NAME": "world"}) == "hello world"

    def test_replaces_braced_var(self):
        assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world"

    def test_falls_back_to_env(self, monkeypatch):
        monkeypatch.setenv("FOO", "bar")
        assert substitute("$FOO", {}) == "bar"

    def test_preserves_unknown_vars(self):
        assert substitute("$UNKNOWN", {}) == "$UNKNOWN"

    def test_non_string_passthrough(self):
        assert substitute(42, {}) == 42


class TestSubstituteTemplate:
    def test_replaces_double_braces(self):
        assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux"

    def test_env_dot_notation(self, monkeypatch):
        monkeypatch.setenv("USER", "tomas")
        result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)})
        assert result == "tomas"

    def test_nested_dict_lookup(self):
        ctx = {"platform": {"arch": "arm64"}}
        assert substitute_template("{{ platform.arch }}", ctx) == "arm64"

    def test_preserves_unknown_templates(self):
        assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}"

    def test_non_string_passthrough(self):
        assert substitute_template(42, {}) == 42

    def test_whitespace_in_braces(self):
        assert substitute_template("{{  os  }}", {"os": "linux"}) == "linux"
  • Step 2: Run tests to verify they fail

Run: python -m pytest tests/test_template.py -v Expected: FAIL (module not found)

  • Step 3: Implement template.py
# src/flow/core/template.py
"""Variable and template substitution -- pure functions, no I/O."""

import os
import re
from typing import Any, Dict


def substitute(text: Any, variables: Dict[str, str]) -> Any:
    """Replace $VAR and ${VAR} with values from variables dict or env."""
    if not isinstance(text, str):
        return text

    pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}")

    def _replace(match: re.Match[str]) -> str:
        key = match.group(1) or match.group(2) or ""
        if key in variables:
            return str(variables[key])
        if key in os.environ:
            return os.environ[key]
        return match.group(0)

    return pattern.sub(_replace, text)


def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
    if expr.startswith("env."):
        env_key = expr.split(".", 1)[1]
        env_ctx = context.get("env", {})
        if isinstance(env_ctx, dict) and env_key in env_ctx:
            return env_ctx[env_key]
        return os.environ.get(env_key)

    if expr in context:
        return context[expr]

    current: Any = context
    for part in expr.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]

    return current


def substitute_template(text: Any, context: Dict[str, Any]) -> Any:
    """Replace {{expr}} placeholders with values from context dict."""
    if not isinstance(text, str):
        return text

    def _replace(match: re.Match[str]) -> str:
        key = match.group(1).strip()
        value = _resolve_template_value(key, context)
        if value is None:
            return match.group(0)
        return str(value)

    return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text)
  • Step 4: Run tests

Run: python -m pytest tests/test_template.py -v Expected: All passed

  • Step 5: Commit
git add src/flow/core/template.py tests/test_template.py
git commit -m "feat: add template substitution (pure functions)"

Task 3: Paths (XDG constants)

Files:

  • Create: src/flow/core/paths.py

  • Create: tests/test_core_paths.py

  • Step 1: Write paths.py

# src/flow/core/paths.py
"""XDG-compliant path constants for flow."""

import os
from pathlib import Path


def _xdg(env_var: str, fallback: str) -> Path:
    return Path(os.environ.get(env_var, fallback))


HOME = Path.home()

CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow"
DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow"
STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow"

DOTFILES_DIR = DATA_DIR / "dotfiles"
MODULES_DIR = DATA_DIR / "modules"
PACKAGES_DIR = DATA_DIR / "packages"

LINKED_STATE = STATE_DIR / "linked.json"
INSTALLED_STATE = STATE_DIR / "installed.json"

# Self-hosted flow config path (from dotfiles repo)
DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow"


def ensure_dirs() -> None:
    """Create all required directories."""
    for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR):
        d.mkdir(parents=True, exist_ok=True)
  • Step 2: Write tests
# tests/test_core_paths.py
"""Tests for flow.core.paths."""

from pathlib import Path

from flow.core import paths


def test_config_dir_ends_with_flow():
    assert paths.CONFIG_DIR.name == "flow"


def test_data_dir_ends_with_flow():
    assert paths.DATA_DIR.name == "flow"


def test_modules_dir_under_data():
    assert paths.MODULES_DIR.parent == paths.DATA_DIR


def test_linked_state_under_state():
    assert paths.LINKED_STATE.parent == paths.STATE_DIR


def test_dotfiles_flow_config_path():
    expected_suffix = Path("_shared") / "flow" / ".config" / "flow"
    assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix))


def test_ensure_dirs_creates_directories(tmp_path, monkeypatch):
    monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow")
    monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow")
    monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow")
    monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules")
    monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages")

    paths.ensure_dirs()

    assert (tmp_path / "config" / "flow").is_dir()
    assert (tmp_path / "data" / "flow" / "modules").is_dir()
    assert (tmp_path / "state" / "flow").is_dir()
  • Step 3: Run tests

Run: python -m pytest tests/test_core_paths.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/core/paths.py tests/test_core_paths.py
git commit -m "feat: add XDG path constants"

Task 4: Platform detection

Files:

  • Create: src/flow/core/platform.py

  • Create: tests/test_core_platform.py

  • Step 1: Write tests

# tests/test_core_platform.py
"""Tests for flow.core.platform."""

import os

import pytest

from flow.core.platform import PlatformInfo, detect_context, detect_platform


def test_platform_info_computes_platform_string():
    p = PlatformInfo(os="linux", arch="x64")
    assert p.platform == "linux-x64"


def test_detect_platform_returns_valid_info():
    info = detect_platform()
    assert info.os in ("linux", "macos")
    assert info.arch in ("x64", "arm64")
    assert info.platform == f"{info.os}-{info.arch}"


def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch):
    from flow.core.errors import FlowError
    monkeypatch.setattr("platform.system", lambda: "FreeBSD")
    with pytest.raises(FlowError, match="Unsupported operating system"):
        detect_platform()


def test_detect_context_host(monkeypatch):
    monkeypatch.delenv("DF_NAMESPACE", raising=False)
    monkeypatch.delenv("DF_PLATFORM", raising=False)
    assert detect_context() == "host"


def test_detect_context_vm(monkeypatch):
    monkeypatch.setenv("DF_NAMESPACE", "personal")
    monkeypatch.setenv("DF_PLATFORM", "orb")
    assert detect_context() == "vm"
  • Step 2: Implement platform.py
# src/flow/core/platform.py
"""OS/arch detection and execution context."""

import os
import platform as _platform
from dataclasses import dataclass

from flow.core.errors import FlowError


@dataclass(frozen=True)
class PlatformInfo:
    os: str = "linux"
    arch: str = "x64"

    @property
    def platform(self) -> str:
        return f"{self.os}-{self.arch}"


_OS_MAP = {"Darwin": "macos", "Linux": "linux"}
_ARCH_MAP = {"x86_64": "x64", "amd64": "x64", "aarch64": "arm64", "arm64": "arm64"}


def detect_platform() -> PlatformInfo:
    raw_os = _platform.system()
    os_name = _OS_MAP.get(raw_os)
    if os_name is None:
        raise FlowError(f"Unsupported operating system: {raw_os}")

    raw_arch = _platform.machine().lower()
    arch = _ARCH_MAP.get(raw_arch)
    if arch is None:
        raise FlowError(f"Unsupported architecture: {raw_arch}")

    return PlatformInfo(os=os_name, arch=arch)


def detect_context() -> str:
    """Detect execution context: 'host', 'vm', or 'container'."""
    if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"):
        return "container"
    if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
        return "vm"
    return "host"
  • Step 3: Run tests

Run: python -m pytest tests/test_core_platform.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/core/platform.py tests/test_core_platform.py
git commit -m "feat: add platform detection and context awareness"

Task 5: Console output

Files:

  • Create: src/flow/core/console.py

  • Create: tests/test_core_console.py

  • Step 1: Write tests

# tests/test_core_console.py
"""Tests for flow.core.console."""

from flow.core.console import Console


def test_info_prints_message(capsys):
    c = Console(color=False)
    c.info("hello")
    assert "hello" in capsys.readouterr().out


def test_quiet_suppresses_info(capsys):
    c = Console(quiet=True, color=False)
    c.info("hidden")
    assert capsys.readouterr().out == ""


def test_quiet_does_not_suppress_error(capsys):
    c = Console(quiet=True, color=False)
    c.error("visible")
    captured = capsys.readouterr()
    assert "visible" in captured.err or "visible" in captured.out


def test_table_prints_headers_and_rows(capsys):
    c = Console(color=False)
    c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]])
    output = capsys.readouterr().out
    assert "NAME" in output
    assert "foo" in output
    assert "bar" in output


def test_no_color_strips_ansi(capsys):
    c = Console(color=False)
    c.info("test")
    output = capsys.readouterr().out
    assert "\033[" not in output
  • Step 2: Implement console.py
# src/flow/core/console.py
"""Console output formatting with TTY detection and color control."""

import os
import sys
from typing import Any, Optional


class Console:
    def __init__(self, *, quiet: bool = False, color: Optional[bool] = None):
        self.quiet = quiet
        if color is None:
            self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False
        else:
            self._color = color

    def _style(self, code: str, text: str) -> str:
        if not self._color:
            return text
        return f"{code}{text}\033[0m"

    def info(self, msg: str) -> None:
        if self.quiet:
            return
        tag = self._style("\033[36m", "[INFO]")
        print(f"{tag} {msg}")

    def warn(self, msg: str) -> None:
        tag = self._style("\033[33m", "[WARN]")
        print(f"{tag} {msg}")

    def error(self, msg: str) -> None:
        tag = self._style("\033[31m", "[ERROR]")
        print(f"{tag} {msg}", file=sys.stderr)

    def success(self, msg: str) -> None:
        tag = self._style("\033[32m", "[OK]")
        print(f"{tag} {msg}")

    def table(self, headers: list[str], rows: list[list[str]]) -> None:
        if not rows:
            return
        widths = [len(h) for h in headers]
        for row in rows:
            for i, cell in enumerate(row):
                if i < len(widths):
                    widths[i] = max(widths[i], len(str(cell)))

        header_line = "  ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers))
        if self._color:
            print(f"\033[1m{header_line}\033[0m")
        else:
            print(header_line)
        print("  ".join("-" * w for w in widths))
        for row in rows:
            print("  ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row)))

    def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None:
        if not operations:
            self.info(f"Nothing to {verb}.")
            return
        self.info(f"Plan ({len(operations)} operation(s)):")
        for op in operations:
            print(f"  {op}")
  • Step 3: Run tests

Run: python -m pytest tests/test_core_console.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/core/console.py tests/test_core_console.py
git commit -m "feat: add Console with color/quiet/TTY support"

Task 6: Runtime primitives (CommandRunner, FileSystem, GitClient, SystemRuntime)

Files:

  • Create: src/flow/core/runtime.py

  • Create: tests/test_core_runtime.py

  • Step 1: Write tests for FileSystem

# tests/test_core_runtime.py
"""Tests for flow.core.runtime."""

from pathlib import Path

from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime


class TestFileSystem:
    def test_ensure_dir_creates_nested(self, tmp_path):
        fs = FileSystem()
        target = tmp_path / "a" / "b" / "c"
        fs.ensure_dir(target)
        assert target.is_dir()

    def test_write_and_read_text(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "test.txt"
        fs.write_text(path, "hello")
        assert fs.read_text(path) == "hello"

    def test_read_text_default(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "missing.txt"
        assert fs.read_text(path, default="fallback") == "fallback"

    def test_write_and_read_json(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "data.json"
        fs.write_json(path, {"key": "value"})
        assert fs.read_json(path) == {"key": "value"}

    def test_create_symlink(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        target = tmp_path / "link"
        fs.create_symlink(source, target)
        assert target.is_symlink()
        assert target.resolve() == source.resolve()

    def test_same_symlink_true(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        target = tmp_path / "link"
        target.symlink_to(source)
        assert fs.same_symlink(target, source) is True

    def test_same_symlink_false(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        other = tmp_path / "other"
        other.write_text("other")
        target = tmp_path / "link"
        target.symlink_to(other)
        assert fs.same_symlink(target, source) is False

    def test_remove_file(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "file"
        path.write_text("x")
        fs.remove_file(path)
        assert not path.exists()

    def test_remove_file_missing_ok(self, tmp_path):
        fs = FileSystem()
        fs.remove_file(tmp_path / "missing", missing_ok=True)  # no error

    def test_copy_file(self, tmp_path):
        fs = FileSystem()
        src = tmp_path / "src"
        src.write_text("data")
        dst = tmp_path / "sub" / "dst"
        fs.copy_file(src, dst)
        assert dst.read_text() == "data"


class TestCommandRunner:
    def test_run_echo(self):
        runner = CommandRunner()
        result = runner.run(["echo", "hello"], capture_output=True)
        assert result.stdout.strip() == "hello"

    def test_require_binary_finds_echo(self):
        runner = CommandRunner()
        path = runner.require_binary("echo")
        assert path is not None


class TestSystemRuntime:
    def test_creates_git_client(self):
        rt = SystemRuntime()
        assert isinstance(rt.git, GitClient)
        assert rt.git.runner is rt.runner
  • Step 2: Implement runtime.py

Implement CommandRunner, FileSystem, GitClient, and SystemRuntime per the spec in Section 3.3. Port the working implementations from the current src/flow/core/system.py but:

  • Remove the duplicate write_bytes method
  • Use FlowError instead of RuntimeError for error wrapping
  • Keep the same API surface
# src/flow/core/runtime.py
"""Runtime primitives for process, git, state, and filesystem access."""

from __future__ import annotations

import json
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence

from flow.core.console import Console
from flow.core.errors import FlowError


class CommandRunner:
    """Subprocess wrapper with consistent defaults."""

    def run(
        self,
        argv: Sequence[str] | Iterable[str],
        *,
        cwd: Optional[Path] = None,
        env: Optional[Mapping[str, str]] = None,
        capture_output: bool = True,
        check: bool = False,
        timeout: Optional[float] = None,
    ) -> subprocess.CompletedProcess[str]:
        parts = [str(a) for a in argv]
        completed = subprocess.run(
            parts,
            cwd=str(cwd) if cwd else None,
            env=dict(env) if env else None,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout,
        )
        if check and completed.returncode != 0:
            msg = completed.stderr.strip() or completed.stdout.strip()
            if not msg:
                msg = f"Command failed with exit code {completed.returncode}"
            raise FlowError(msg)
        return completed

    def run_shell(
        self,
        command: str,
        *,
        cwd: Optional[Path] = None,
        env: Optional[Mapping[str, str]] = None,
        capture_output: bool = True,
        check: bool = False,
        timeout: Optional[float] = None,
    ) -> subprocess.CompletedProcess[str]:
        completed = subprocess.run(
            command,
            shell=True,
            cwd=str(cwd) if cwd else None,
            env=dict(env) if env else None,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout,
        )
        if check and completed.returncode != 0:
            msg = completed.stderr.strip() or completed.stdout.strip()
            if not msg:
                msg = f"Command failed with exit code {completed.returncode}"
            raise FlowError(msg)
        return completed

    def stream_shell(
        self,
        command: str,
        console: Console,
        *,
        check: bool = True,
    ) -> subprocess.CompletedProcess[str]:
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        lines: list[str] = []
        assert process.stdout is not None
        try:
            for line in process.stdout:
                stripped = line.rstrip()
                if stripped:
                    lines.append(stripped)
        finally:
            process.stdout.close()
            process.wait()

        if check and process.returncode != 0:
            raise FlowError(f"Command failed (exit {process.returncode}): {command}")

        return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="")

    def require_binary(self, name: str) -> str:
        path = shutil.which(name)
        if path is None:
            raise FlowError(f"Required executable not found: {name}")
        return path


class FileSystem:
    """Filesystem wrapper for all mutating operations."""

    def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            runner.require_binary("sudo")
            argv: list[str] = ["sudo", "mkdir", "-p"]
            if mode is not None:
                argv.extend(["-m", f"{mode:o}"])
            argv.append(str(path))
            runner.run(argv, check=True)
            return
        path.mkdir(parents=True, exist_ok=True)
        if mode is not None:
            path.chmod(mode)

    def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            argv = ["sudo", "rm"]
            if missing_ok:
                argv.append("-f")
            argv.append(str(path))
            runner.run(argv, check=True)
            return
        try:
            path.unlink()
        except FileNotFoundError:
            if not missing_ok:
                raise

    def remove_tree(self, path: Path) -> None:
        shutil.rmtree(path, ignore_errors=True)

    def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            self.ensure_dir(target.parent, sudo=True, runner=runner)
            runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
            return
        self.ensure_dir(target.parent)
        shutil.copy2(source, target)

    def copy_tree(self, source: Path, target: Path) -> None:
        self.ensure_dir(target.parent)
        shutil.copytree(source, target, dirs_exist_ok=True)

    def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            self.ensure_dir(target.parent, sudo=True, runner=runner)
            runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
            return
        self.ensure_dir(target.parent)
        target.symlink_to(source)

    def same_symlink(self, target: Path, source: Path) -> bool:
        if not target.is_symlink():
            return False
        return target.resolve(strict=False) == source.resolve(strict=False)

    def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
        try:
            return path.read_text(encoding="utf-8")
        except FileNotFoundError:
            if default is None:
                raise
            return default

    def write_text(self, path: Path, content: str) -> None:
        self.ensure_dir(path.parent)
        path.write_text(content, encoding="utf-8")

    def write_bytes(self, path: Path, content: bytes) -> None:
        self.ensure_dir(path.parent)
        path.write_bytes(content)

    def read_json(self, path: Path, *, default: Any = None) -> Any:
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return default

    def write_json(self, path: Path, data: Any) -> None:
        self.ensure_dir(path.parent)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)


class GitClient:
    """Git adapter scoped to a repository root."""

    def __init__(self, runner: CommandRunner):
        self.runner = runner

    def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]:
        return self.runner.run(
            ["git", "-C", str(repo_dir), *args],
            capture_output=capture_output,
            check=check,
        )


@dataclass
class SystemRuntime:
    """Shared runtime dependencies."""
    runner: CommandRunner = field(default_factory=CommandRunner)
    fs: FileSystem = field(default_factory=FileSystem)
    git: GitClient = field(init=False)

    def __post_init__(self) -> None:
        self.git = GitClient(self.runner)
  • Step 3: Run tests

Run: python -m pytest tests/test_core_runtime.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/core/runtime.py tests/test_core_runtime.py
git commit -m "feat: add runtime primitives (CommandRunner, FileSystem, GitClient)"

Task 7: Config loading and FlowContext

Files:

  • Create: src/flow/core/config.py

  • Create: tests/test_core_config.py

  • Step 1: Write tests

# tests/test_core_config.py
"""Tests for flow.core.config."""

from pathlib import Path

import pytest

from flow.core.config import AppConfig, load_config, load_manifest


def test_load_config_missing_path(tmp_path):
    cfg = load_config(tmp_path / "nonexistent")
    assert isinstance(cfg, AppConfig)
    assert cfg.dotfiles_url == ""
    assert cfg.container_registry == "registry.tomastm.com"


def test_load_config_from_yaml(tmp_path):
    (tmp_path / "config.yaml").write_text(
        "repository:\n"
        "  url: git@github.com:user/dots.git\n"
        "  branch: dev\n"
        "paths:\n"
        "  projects: ~/code\n"
        "defaults:\n"
        "  container-registry: my.registry.com\n"
        "  tmux-session: main\n"
    )
    cfg = load_config(tmp_path)
    assert cfg.dotfiles_url == "git@github.com:user/dots.git"
    assert cfg.dotfiles_branch == "dev"
    assert cfg.projects_dir == "~/code"
    assert cfg.container_registry == "my.registry.com"
    assert cfg.tmux_session == "main"


def test_load_config_parses_targets_shorthand(tmp_path):
    (tmp_path / "config.yaml").write_text(
        "targets:\n"
        "  personal@orb: personal.orb\n"
    )
    cfg = load_config(tmp_path)
    assert len(cfg.targets) == 1
    assert cfg.targets[0].namespace == "personal"
    assert cfg.targets[0].platform == "orb"
    assert cfg.targets[0].host == "personal.orb"


def test_load_config_parses_targets_dict(tmp_path):
    (tmp_path / "config.yaml").write_text(
        "targets:\n"
        "  work@ec2:\n"
        "    host: work.ec2.internal\n"
        "    identity: ~/.ssh/id_work\n"
    )
    cfg = load_config(tmp_path)
    assert len(cfg.targets) == 1
    assert cfg.targets[0].host == "work.ec2.internal"
    assert cfg.targets[0].identity == "~/.ssh/id_work"


def test_load_manifest_returns_dict(tmp_path):
    (tmp_path / "manifest.yaml").write_text(
        "packages:\n"
        "  - name: fd\n"
        "    type: pkg\n"
    )
    data = load_manifest(tmp_path)
    assert isinstance(data, dict)
    assert "packages" in data


def test_load_manifest_merges_files(tmp_path):
    (tmp_path / "01-packages.yaml").write_text("packages:\n  - name: fd\n    type: pkg\n")
    (tmp_path / "02-profiles.yaml").write_text("profiles:\n  work:\n    os: linux\n")
    data = load_manifest(tmp_path)
    assert "packages" in data
    assert "profiles" in data
  • Step 2: Implement config.py

Port config loading from current src/flow/core/config.py with these changes:

  • Use TargetConfig dataclass (new) matching spec Section 7.1
  • Use FlowContext dataclass matching spec Section 3.4
  • Support both shorthand string and dict target formats
  • Remove all _get_value multi-key fallback logic (one canonical form per CLAUDE.md)

The config parser normalizes target entries:

  • "personal@orb: personal.orb" -> TargetConfig(namespace="personal", platform="orb", host="personal.orb")
  • Dict form: TargetConfig(namespace=..., platform=..., host=..., identity=...)

Key types:

@dataclass(frozen=True)
class TargetConfig:
    namespace: str
    platform: str
    host: str
    identity: str | None = None

@dataclass
class AppConfig:
    dotfiles_url: str = ""
    dotfiles_branch: str = "main"
    projects_dir: str = "~/projects"
    container_registry: str = "registry.tomastm.com"
    container_tag: str = "latest"
    tmux_session: str = "default"
    targets: list[TargetConfig] = field(default_factory=list)

@dataclass
class FlowContext:
    config: AppConfig
    manifest: dict[str, Any]
    platform: PlatformInfo
    console: Console
    runtime: SystemRuntime = field(default_factory=SystemRuntime)
  • Step 3: Run tests

Run: python -m pytest tests/test_core_config.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/core/config.py tests/test_core_config.py
git commit -m "feat: add config loading and FlowContext"

Chunk 2: Dotfiles Domain

Task 8: Dotfiles models

Files:

  • Create: src/flow/domain/__init__.py (empty)

  • Create: src/flow/domain/dotfiles/__init__.py (empty)

  • Create: src/flow/domain/dotfiles/models.py

  • Create: tests/test_domain_dotfiles_models.py

  • Step 1: Write models

# src/flow/domain/dotfiles/models.py
"""Dotfiles domain models -- all frozen dataclasses."""

from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class ModuleRef:
    """An external git repo providing content for a package subtree."""
    source: str
    ref_type: str          # "branch" | "tag" | "commit"
    ref_value: str
    mount_path: Path       # Relative path within package to _module.yaml parent
    cache_dir: Path        # Where the repo is cloned
    module_files: tuple[tuple[Path, Path], ...]  # (abs_source, rel_to_cache_root)


@dataclass(frozen=True)
class Package:
    """A dotfiles package: a named set of files mapping to home-relative targets."""
    name: str              # e.g. "zsh", "nvim"
    layer: str             # "_shared" or profile name
    package_id: str        # "layer/name"
    source_dir: Path       # Absolute path in dotfiles repo
    module: Optional[ModuleRef]
    local_files: tuple[tuple[Path, Path], ...]  # (abs_source, rel_to_package_root)


@dataclass(frozen=True)
class LinkTarget:
    """A single file that should be linked into the filesystem."""
    source: Path
    target: Path
    package: str           # package_id
    from_module: bool
    needs_sudo: bool


@dataclass(frozen=True)
class LinkOp:
    """A single operation in a link plan."""
    type: str              # "create_link" | "remove_link" | "create_dir"
    target: Path
    source: Optional[Path]
    package: str
    needs_sudo: bool

    def __str__(self) -> str:
        if self.type == "create_link":
            sudo = " (sudo)" if self.needs_sudo else ""
            return f"LINK: {self.target} -> {self.source}{sudo}"
        if self.type == "remove_link":
            return f"REMOVE: {self.target}"
        if self.type == "create_dir":
            return f"MKDIR: {self.target}"
        return f"{self.type}: {self.target}"


@dataclass(frozen=True)
class PlanSummary:
    added: int
    removed: int
    unchanged: int
    from_modules: int


@dataclass(frozen=True)
class LinkPlan:
    """Complete reconciliation plan."""
    operations: list[LinkOp]
    conflicts: list[str]
    summary: PlanSummary


@dataclass
class LinkedState:
    """Persisted link state."""
    links: dict[Path, LinkTarget] = field(default_factory=dict)

    def as_dict(self) -> dict:
        grouped: dict[str, dict[str, dict]] = {}
        for target, lt in sorted(self.links.items(), key=lambda x: str(x[0])):
            pkg_links = grouped.setdefault(lt.package, {})
            pkg_links[str(target)] = {
                "source": str(lt.source),
                "from_module": lt.from_module,
                "needs_sudo": lt.needs_sudo,
            }
        return {"version": 2, "links": grouped}

    @classmethod
    def from_dict(cls, data: dict) -> "LinkedState":
        version = data.get("version")
        if version is not None and version != 2:
            from flow.core.errors import ConfigError
            raise ConfigError(
                f"Unsupported linked.json version {version}. "
                "Delete ~/.local/state/flow/linked.json and relink."
            )
        links: dict[Path, LinkTarget] = {}
        raw_links = data.get("links", {})
        for package, pkg_links in raw_links.items():
            for target_str, info in pkg_links.items():
                links[Path(target_str)] = LinkTarget(
                    source=Path(info["source"]),
                    target=Path(target_str),
                    package=str(package),
                    from_module=bool(info.get("from_module", False)),
                    needs_sudo=bool(info.get("needs_sudo", False)),
                )
        return cls(links=links)


@dataclass(frozen=True)
class RepoInfo:
    """A managed git repo (dotfiles or module)."""
    name: str
    path: Path
    source: str
    is_module: bool
  • Step 2: Write tests
# tests/test_domain_dotfiles_models.py
"""Tests for dotfiles domain models."""

from pathlib import Path

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkPlan,
    LinkTarget,
    LinkedState,
    ModuleRef,
    Package,
    PlanSummary,
)


def test_link_op_str_create():
    op = LinkOp(type="create_link", target=Path("/home/x/.zshrc"),
                source=Path("/dots/zsh/.zshrc"), package="_shared/zsh", needs_sudo=False)
    assert "LINK:" in str(op)
    assert ".zshrc" in str(op)


def test_link_op_str_sudo():
    op = LinkOp(type="create_link", target=Path("/etc/hosts"),
                source=Path("/dots/dns/hosts"), package="_shared/dns", needs_sudo=True)
    assert "(sudo)" in str(op)


def test_linked_state_roundtrip():
    lt = LinkTarget(source=Path("/a"), target=Path("/b"), package="p", from_module=False, needs_sudo=False)
    state = LinkedState(links={Path("/b"): lt})
    data = state.as_dict()
    restored = LinkedState.from_dict(data)
    assert Path("/b") in restored.links
    assert restored.links[Path("/b")].source == Path("/a")
    assert restored.links[Path("/b")].package == "p"


def test_linked_state_empty():
    state = LinkedState.from_dict({})
    assert state.links == {}


def test_package_has_id():
    pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh",
                  source_dir=Path("/dots/_shared/zsh"), module=None, local_files=())
    assert pkg.package_id == "_shared/zsh"
  • Step 3: Run tests

Run: python -m pytest tests/test_domain_dotfiles_models.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/domain/__init__.py src/flow/domain/dotfiles/__init__.py \
    src/flow/domain/dotfiles/models.py tests/test_domain_dotfiles_models.py
git commit -m "feat: add dotfiles domain models"

Task 9: Dotfiles modules (mount path, cache dir, source normalization)

Files:

  • Create: src/flow/domain/dotfiles/modules.py

  • Create: tests/test_domain_dotfiles_modules.py

  • Step 1: Write tests

# tests/test_domain_dotfiles_modules.py
"""Tests for dotfiles module resolution -- the core bug fix."""

from pathlib import Path

import pytest

from flow.core.errors import ConfigError
from flow.domain.dotfiles.modules import (
    compute_mount_path,
    module_cache_dir,
    normalize_source,
    parse_module_ref,
)


class TestComputeMountPath:
    def test_nested_module(self):
        """_shared/nvim/.config/nvim/_module.yaml -> .config/nvim"""
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/nvim/.config/nvim/_module.yaml"),
            package_dir=Path("/dots/_shared/nvim"),
        )
        assert result == Path(".config/nvim")

    def test_root_level_module(self):
        """_shared/nvim/_module.yaml -> Path('.')"""
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/nvim/_module.yaml"),
            package_dir=Path("/dots/_shared/nvim"),
        )
        assert result == Path(".")

    def test_deeply_nested(self):
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/pkg/.config/a/b/c/_module.yaml"),
            package_dir=Path("/dots/_shared/pkg"),
        )
        assert result == Path(".config/a/b/c")


class TestModuleCacheDir:
    def test_simple_name(self):
        result = module_cache_dir("_shared/nvim", Path("/home/x/.local/share/flow/modules"))
        assert result == Path("/home/x/.local/share/flow/modules/_shared--nvim")

    def test_profile_name(self):
        result = module_cache_dir("linux-work/nvim", Path("/m"))
        assert result == Path("/m/linux-work--nvim")


class TestNormalizeSource:
    def test_github_shorthand(self):
        assert normalize_source("github:org/repo") == "https://github.com/org/repo.git"

    def test_full_url_passthrough(self):
        assert normalize_source("https://example.com/repo.git") == "https://example.com/repo.git"

    def test_ssh_passthrough(self):
        assert normalize_source("git@github.com:org/repo.git") == "git@github.com:org/repo.git"


class TestParseModuleRef:
    def test_branch_ref(self):
        raw = {"source": "github:org/nvim-config", "ref": {"branch": "main"}}
        ref = parse_module_ref(
            raw, package_id="_shared/nvim",
            mount_path=Path(".config/nvim"),
            modules_base=Path("/modules"),
        )
        assert ref.source == "https://github.com/org/nvim-config.git"
        assert ref.ref_type == "branch"
        assert ref.ref_value == "main"
        assert ref.mount_path == Path(".config/nvim")
        assert ref.cache_dir == Path("/modules/_shared--nvim")

    def test_tag_ref(self):
        raw = {"source": "github:org/repo", "ref": {"tag": "v1.0"}}
        ref = parse_module_ref(raw, "p/x", Path("."), Path("/m"))
        assert ref.ref_type == "tag"
        assert ref.ref_value == "v1.0"

    def test_missing_source_raises(self):
        with pytest.raises(ConfigError):
            parse_module_ref({}, "p/x", Path("."), Path("/m"))

    def test_missing_ref_raises(self):
        raw = {"source": "github:org/repo"}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))

    def test_ref_not_dict_raises(self):
        raw = {"source": "github:org/repo", "ref": "main"}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))

    def test_ambiguous_ref_raises(self):
        raw = {"source": "github:org/repo", "ref": {"branch": "main", "tag": "v1"}}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))
  • Step 2: Implement modules.py
# src/flow/domain/dotfiles/modules.py
"""Module metadata resolution -- pure functions."""

from pathlib import Path
from typing import Tuple

from flow.core.errors import ConfigError
from flow.domain.dotfiles.models import ModuleRef


def compute_mount_path(module_yaml: Path, package_dir: Path) -> Path:
    """Relative path from package root to _module.yaml parent."""
    rel = module_yaml.parent.relative_to(package_dir)
    return rel


def module_cache_dir(package_id: str, modules_base: Path) -> Path:
    """Cache dir for a module clone. '/' -> '--' to avoid collisions."""
    return modules_base / package_id.replace("/", "--")


def normalize_source(source: str) -> str:
    """Normalize git source URL. github:org/repo -> https://github.com/org/repo.git"""
    if source.startswith("github:"):
        repo = source.split(":", 1)[1]
        return f"https://github.com/{repo}.git"
    return source


def parse_module_ref(
    raw: dict,
    package_id: str,
    mount_path: Path,
    modules_base: Path,
) -> ModuleRef:
    """Build ModuleRef from parsed _module.yaml content."""
    source = raw.get("source")
    if not isinstance(source, str) or not source:
        raise ConfigError(f"Module for {package_id}: 'source' must be a non-empty string")

    ref = raw.get("ref")
    if not isinstance(ref, dict):
        raise ConfigError(f"Module for {package_id}: 'ref' must be a mapping")

    choices = [k for k in ("branch", "tag", "commit") if isinstance(ref.get(k), str) and ref[k]]
    if len(choices) != 1:
        raise ConfigError(f"Module for {package_id}: 'ref' must have exactly one of: branch, tag, commit")

    ref_type = choices[0]
    ref_value = str(ref[ref_type])

    return ModuleRef(
        source=normalize_source(source),
        ref_type=ref_type,
        ref_value=ref_value,
        mount_path=mount_path,
        cache_dir=module_cache_dir(package_id, modules_base),
        module_files=(),  # Populated by service after cloning
    )
  • Step 3: Run tests

Run: python -m pytest tests/test_domain_dotfiles_modules.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/domain/dotfiles/modules.py tests/test_domain_dotfiles_modules.py
git commit -m "feat: add dotfiles module resolution (fixes _module.yaml path bug)"

Task 10: Dotfiles resolution (package -> LinkTargets)

Files:

  • Create: src/flow/domain/dotfiles/resolution.py

  • Create: tests/test_domain_dotfiles_resolution.py

  • Step 1: Write tests

# tests/test_domain_dotfiles_resolution.py
"""Tests for dotfiles path resolution."""

from pathlib import Path

import pytest

from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, ModuleRef, Package
from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets

RESERVED_ROOT = "_root"
HOME = Path("/home/testuser")


def _pkg(name, layer="_shared", files=(), module=None):
    return Package(
        name=name,
        layer=layer,
        package_id=f"{layer}/{name}",
        source_dir=Path(f"/dots/{layer}/{name}"),
        module=module,
        local_files=tuple(files),
    )


class TestResolvePackageTargets:
    def test_simple_file(self):
        pkg = _pkg("zsh", files=[
            (Path("/dots/_shared/zsh/.zshrc"), Path(".zshrc")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert len(targets) == 1
        assert targets[0].target == HOME / ".zshrc"
        assert targets[0].source == Path("/dots/_shared/zsh/.zshrc")
        assert targets[0].from_module is False

    def test_nested_config(self):
        pkg = _pkg("git", files=[
            (Path("/dots/_shared/git/.config/git/config"), Path(".config/git/config")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert targets[0].target == HOME / ".config" / "git" / "config"

    def test_root_marker(self):
        pkg = _pkg("dns", files=[
            (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert targets[0].target == Path("/etc/hosts")
        assert targets[0].needs_sudo is True

    def test_root_marker_skipped_when_in_skip_set(self):
        pkg = _pkg("dns", files=[
            (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
        ])
        targets = resolve_package_targets(pkg, HOME, {"_root"})
        assert len(targets) == 0

    def test_skip_package_by_name(self):
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")),
        ])
        targets = resolve_package_targets(pkg, HOME, {"nvim"})
        assert len(targets) == 0

    def test_module_files_linked_under_mount_path(self):
        module = ModuleRef(
            source="https://github.com/org/nvim-config.git",
            ref_type="branch",
            ref_value="main",
            mount_path=Path(".config/nvim"),
            cache_dir=Path("/modules/_shared--nvim"),
            module_files=(
                (Path("/modules/_shared--nvim/init.lua"), Path("init.lua")),
                (Path("/modules/_shared--nvim/lua/plugins.lua"), Path("lua/plugins.lua")),
            ),
        )
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.local/bin/nvim-wrapper"), Path(".local/bin/nvim-wrapper")),
        ], module=module)

        targets = resolve_package_targets(pkg, HOME, set())

        # Local file outside mount_path
        local_targets = [t for t in targets if not t.from_module]
        assert len(local_targets) == 1
        assert local_targets[0].target == HOME / ".local" / "bin" / "nvim-wrapper"

        # Module files under mount_path
        module_targets = [t for t in targets if t.from_module]
        assert len(module_targets) == 2
        module_target_paths = {t.target for t in module_targets}
        assert HOME / ".config" / "nvim" / "init.lua" in module_target_paths
        assert HOME / ".config" / "nvim" / "lua" / "plugins.lua" in module_target_paths

    def test_module_yaml_file_not_linked(self):
        """The _module.yaml marker itself should never be linked."""
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), Path(".config/nvim/_module.yaml")),
        ], module=ModuleRef(
            source="x", ref_type="branch", ref_value="main",
            mount_path=Path(".config/nvim"),
            cache_dir=Path("/m"), module_files=(),
        ))
        targets = resolve_package_targets(pkg, HOME, set())
        assert not any(t.target.name == "_module.yaml" for t in targets)

    def test_root_level_module_skips_all_local_files(self):
        """When mount_path is '.', all local files are from the module, not dotfiles repo."""
        module = ModuleRef(
            source="x", ref_type="branch", ref_value="main",
            mount_path=Path("."),
            cache_dir=Path("/m"),
            module_files=(
                (Path("/m/init.lua"), Path("init.lua")),
            ),
        )
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/_module.yaml"), Path("_module.yaml")),
            (Path("/dots/_shared/nvim/stale-file.txt"), Path("stale-file.txt")),
        ], module=module)
        targets = resolve_package_targets(pkg, HOME, set())
        # Only module files should appear, local files skipped
        assert len(targets) == 1
        assert targets[0].from_module is True
        assert targets[0].target == HOME / "init.lua"


class TestResolveAllTargets:
    def test_no_conflicts(self):
        pkgs = [
            _pkg("zsh", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
            _pkg("git", files=[(Path("/a/.gitconfig"), Path(".gitconfig"))]),
        ]
        targets = resolve_all_targets(pkgs, HOME, set())
        assert len(targets) == 2

    def test_duplicate_target_raises(self):
        pkgs = [
            _pkg("zsh", layer="_shared", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
            _pkg("zsh", layer="work", files=[(Path("/b/.zshrc"), Path(".zshrc"))]),
        ]
        with pytest.raises(PlanConflict):
            resolve_all_targets(pkgs, HOME, set())
  • Step 2: Implement resolution.py
# src/flow/domain/dotfiles/resolution.py
"""Path resolution: package -> home-relative LinkTargets. Pure functions."""

from pathlib import Path

from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, Package

RESERVED_ROOT = "_root"
MODULE_FILE = "_module.yaml"


def resolve_package_targets(
    package: Package,
    home: Path,
    skip: set[str],
) -> list[LinkTarget]:
    """Resolve all LinkTargets for a package, handling modules correctly."""
    if package.name in skip:
        return []

    targets: list[LinkTarget] = []
    mount_path = package.module.mount_path if package.module else None

    # Local files (from dotfiles repo)
    for abs_source, rel in package.local_files:
        # Skip _module.yaml
        if rel.name == MODULE_FILE:
            continue

        # If module exists, skip files inside mount_path (module provides those)
        if mount_path is not None:
            if mount_path == Path("."):
                continue  # Root-level module: all local files are superseded by module
            try:
                rel.relative_to(mount_path)
                continue  # Inside mount_path, skip
            except ValueError:
                pass  # Outside mount_path, process normally

        target, needs_sudo = _resolve_target(rel, home, skip)
        if target is None:
            continue
        targets.append(LinkTarget(
            source=abs_source, target=target,
            package=package.package_id,
            from_module=False, needs_sudo=needs_sudo,
        ))

    # Module files
    if package.module:
        for abs_source, rel in package.module.module_files:
            mounted = package.module.mount_path / rel if package.module.mount_path != Path(".") else rel
            target, needs_sudo = _resolve_target(mounted, home, skip)
            if target is None:
                continue
            targets.append(LinkTarget(
                source=abs_source, target=target,
                package=package.package_id,
                from_module=True, needs_sudo=needs_sudo,
            ))

    return targets


def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None, bool]:
    """Resolve a relative path to an absolute target. Returns (target, needs_sudo)."""
    parts = rel.parts
    if parts and parts[0] == RESERVED_ROOT:
        if RESERVED_ROOT in skip:
            return None, False
        if len(parts) < 2:
            return None, False
        return Path("/") / Path(*parts[1:]), True
    return home / rel, False


def resolve_all_targets(
    packages: list[Package],
    home: Path,
    skip: set[str],
) -> list[LinkTarget]:
    """Resolve targets for all packages. Raises PlanConflict on duplicate targets."""
    all_targets: list[LinkTarget] = []
    seen: dict[Path, str] = {}

    for pkg in packages:
        targets = resolve_package_targets(pkg, home, skip)
        for t in targets:
            if t.target in seen:
                conflicts = [
                    f"{t.target} claimed by both {seen[t.target]} and {t.package}"
                ]
                raise PlanConflict(
                    f"Conflicting dotfile targets across packages",
                    conflicts,
                )
            seen[t.target] = t.package
            all_targets.append(t)

    return all_targets
  • Step 3: Run tests

Run: python -m pytest tests/test_domain_dotfiles_resolution.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/domain/dotfiles/resolution.py tests/test_domain_dotfiles_resolution.py
git commit -m "feat: add dotfiles path resolution with correct module mounting"

Note: The spec's file layout lists domain/dotfiles/conflicts.py as a separate file. This is intentionally merged into planning.py -- cross-package collisions are caught by resolve_all_targets (Task 10), and filesystem conflicts are detected inside plan_link via the injected filesystem_check callback. No separate conflicts.py file is needed.

Files:

  • Create: src/flow/domain/dotfiles/planning.py

  • Create: tests/test_domain_dotfiles_planning.py

  • Step 1: Write tests

# tests/test_domain_dotfiles_planning.py
"""Tests for dotfiles link planning."""

from pathlib import Path
from typing import Optional

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkTarget,
    LinkedState,
)
from flow.domain.dotfiles.planning import plan_link, plan_unlink


def _lt(target, source="/a", pkg="_shared/zsh", module=False, sudo=False):
    return LinkTarget(
        source=Path(source), target=Path(target),
        package=pkg, from_module=module, needs_sudo=sudo,
    )


def _fs_check_none(path: Path) -> Optional[str]:
    """Fake filesystem_check: nothing exists."""
    return None


def _fs_check_file(path: Path) -> Optional[str]:
    """Fake: everything is a file."""
    return "file"


class TestPlanLink:
    def test_new_target_creates_link(self):
        desired = [_lt("/home/x/.zshrc")]
        plan = plan_link(desired, LinkedState(), _fs_check_none)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "create_link"
        assert plan.summary.added == 1

    def test_existing_correct_link_unchanged(self):
        lt = _lt("/home/x/.zshrc")
        current = LinkedState(links={Path("/home/x/.zshrc"): lt})
        plan = plan_link([lt], current, _fs_check_none)
        assert len(plan.operations) == 0
        assert plan.summary.unchanged == 1

    def test_stale_link_removed(self):
        old = _lt("/home/x/.old")
        current = LinkedState(links={Path("/home/x/.old"): old})
        plan = plan_link([], current, _fs_check_none)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "remove_link"
        assert plan.summary.removed == 1

    def test_changed_source_produces_remove_then_create(self):
        old = _lt("/home/x/.zshrc", source="/old")
        new = _lt("/home/x/.zshrc", source="/new")
        current = LinkedState(links={Path("/home/x/.zshrc"): old})
        plan = plan_link([new], current, _fs_check_none)
        types = [op.type for op in plan.operations]
        assert types == ["remove_link", "create_link"]

    def test_unmanaged_file_at_target_is_conflict(self):
        desired = [_lt("/home/x/.zshrc")]
        plan = plan_link(desired, LinkedState(), _fs_check_file)
        assert len(plan.conflicts) == 1
        assert ".zshrc" in plan.conflicts[0]

    def test_module_targets_counted(self):
        desired = [_lt("/home/x/.config/nvim/init.lua", module=True)]
        plan = plan_link(desired, LinkedState(), _fs_check_none)
        assert plan.summary.from_modules == 1


class TestPlanUnlink:
    def test_unlink_all(self):
        lt = _lt("/home/x/.zshrc")
        current = LinkedState(links={Path("/home/x/.zshrc"): lt})
        plan = plan_unlink(current, packages=None)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "remove_link"

    def test_unlink_specific_package(self):
        zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
        git = _lt("/home/x/.gitconfig", pkg="_shared/git")
        current = LinkedState(links={
            Path("/home/x/.zshrc"): zsh,
            Path("/home/x/.gitconfig"): git,
        })
        plan = plan_unlink(current, packages=["_shared/zsh"])
        assert len(plan.operations) == 1
        assert plan.operations[0].target == Path("/home/x/.zshrc")

    def test_unlink_by_basename(self):
        zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
        current = LinkedState(links={Path("/home/x/.zshrc"): zsh})
        plan = plan_unlink(current, packages=["zsh"])
        assert len(plan.operations) == 1
  • Step 2: Implement planning.py
# src/flow/domain/dotfiles/planning.py
"""Link/unlink plan computation. Pure functions with injected I/O."""

from pathlib import Path
from typing import Callable, Optional

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkPlan,
    LinkTarget,
    LinkedState,
    PlanSummary,
)


def plan_link(
    desired: list[LinkTarget],
    current: LinkedState,
    filesystem_check: Callable[[Path], Optional[str]],
) -> LinkPlan:
    """Build reconciliation plan.

    filesystem_check: injected by service. Returns "file", "dir", "symlink", or None.
    """
    ops: list[LinkOp] = []
    conflicts: list[str] = []
    added = 0
    removed = 0
    unchanged = 0
    from_modules = 0

    desired_map = {t.target: t for t in desired}
    desired_targets = set(desired_map.keys())
    current_targets = set(current.links.keys())

    # Removals: in current but not desired
    for target in sorted(current_targets - desired_targets):
        ops.append(LinkOp(
            type="remove_link", target=target, source=None,
            package=current.links[target].package,
            needs_sudo=current.links[target].needs_sudo,
        ))
        removed += 1

    # Additions, updates, and unchanged
    for target in sorted(desired_targets):
        spec = desired_map[target]

        if target in current.links:
            cur = current.links[target]
            if cur.source == spec.source:
                unchanged += 1
                if spec.from_module:
                    from_modules += 1
                continue
            # Source changed: remove old link, then create new one
            ops.append(LinkOp(
                type="remove_link", target=target, source=cur.source,
                package=cur.package, needs_sudo=cur.needs_sudo,
            ))
            ops.append(LinkOp(
                type="create_link", target=target, source=spec.source,
                package=spec.package, needs_sudo=spec.needs_sudo,
            ))
            added += 1
            if spec.from_module:
                from_modules += 1
            continue

        # New target: check filesystem for conflicts
        fs_state = filesystem_check(target)
        if fs_state is not None:
            conflicts.append(
                f"{target} already exists ({fs_state}) and is not managed by flow"
            )
            continue

        ops.append(LinkOp(
            type="create_link", target=target, source=spec.source,
            package=spec.package, needs_sudo=spec.needs_sudo,
        ))
        added += 1
        if spec.from_module:
            from_modules += 1

    return LinkPlan(
        operations=ops,
        conflicts=conflicts,
        summary=PlanSummary(
            added=added, removed=removed,
            unchanged=unchanged, from_modules=from_modules,
        ),
    )


def plan_unlink(
    current: LinkedState,
    packages: Optional[list[str]],
) -> LinkPlan:
    """Plan removal of managed links."""
    ops: list[LinkOp] = []

    for target in sorted(current.links.keys()):
        lt = current.links[target]
        if packages is not None:
            # Match by full package_id or by basename
            basename = lt.package.split("/", 1)[-1] if "/" in lt.package else lt.package
            if lt.package not in packages and basename not in packages:
                continue

        ops.append(LinkOp(
            type="remove_link", target=target, source=lt.source,
            package=lt.package, needs_sudo=lt.needs_sudo,
        ))

    return LinkPlan(
        operations=ops,
        conflicts=[],
        summary=PlanSummary(added=0, removed=len(ops), unchanged=0, from_modules=0),
    )
  • Step 3: Run tests

Run: python -m pytest tests/test_domain_dotfiles_planning.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/domain/dotfiles/planning.py tests/test_domain_dotfiles_planning.py
git commit -m "feat: add dotfiles link planning with conflict detection"

Chunk 3: Packages Domain

Task 12: Package models

Files:

  • Create: src/flow/domain/packages/__init__.py (empty)

  • Create: src/flow/domain/packages/models.py

  • Create: tests/test_domain_packages_models.py

  • Step 1: Write models

All frozen dataclasses as specified in spec Section 5.1: PackageDef, ProfilePackageRef, PkgInstallOp, PkgRemoveOp, PackagePlan, InstalledPackage, InstalledState.

InstalledState serialization format:

# as_dict() produces:
{
    "version": 1,
    "packages": {
        "neovim": {
            "version": "0.10.4",
            "type": "binary",
            "files": ["/home/x/.local/bin/nvim", "/home/x/.local/share/nvim"]
        }
    }
}

# from_dict() parses the above. Raises ConfigError on version mismatch.
  • Step 2: Write tests
# tests/test_domain_packages_models.py
"""Tests for packages domain models."""

from pathlib import Path

import pytest

from flow.core.errors import ConfigError
from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef


def test_installed_state_roundtrip():
    state = InstalledState(packages={
        "neovim": InstalledPackage(
            name="neovim", version="0.10.4", type="binary",
            files=[Path("/home/x/.local/bin/nvim")],
        ),
    })
    data = state.as_dict()
    restored = InstalledState.from_dict(data)
    assert "neovim" in restored.packages
    assert restored.packages["neovim"].version == "0.10.4"
    assert restored.packages["neovim"].files == [Path("/home/x/.local/bin/nvim")]


def test_installed_state_empty():
    state = InstalledState.from_dict({})
    assert state.packages == {}


def test_installed_state_version_mismatch():
    with pytest.raises(ConfigError):
        InstalledState.from_dict({"version": 99, "packages": {}})


def test_package_def_fields():
    pkg = PackageDef(
        name="fd", type="pkg", sources={"apt": "fd-find"},
        source=None, version=None, asset_pattern=None,
        platform_map={}, extract_dir=None, install={},
        post_install=None, allow_sudo=False,
    )
    assert pkg.name == "fd"
    assert pkg.type == "pkg"
  • Step 3: Run tests

Run: python -m pytest tests/test_domain_packages_models.py -v Expected: All passed

  • Step 4: Commit
git add src/flow/domain/packages/ tests/test_domain_packages_models.py
git commit -m "feat: add packages domain models"

Task 13: Package catalog and resolution

Files:

  • Create: src/flow/domain/packages/catalog.py

  • Create: src/flow/domain/packages/resolution.py

  • Create: tests/test_domain_packages.py

  • Step 1: Write tests covering:

  • parse_catalog with list and dict manifest formats

  • normalize_profile_entry with string shorthand ("binary/neovim"), plain name, dict

  • resolve_spec merging catalog and profile overrides

  • resolve_source_name with apt/brew/dnf fallbacks

  • resolve_binary_asset with platform map and asset pattern

  • resolve_download_url with github shorthand and direct URL

  • pm_update_command and pm_install_command for apt/dnf/brew

  • detect_package_manager returning apt/dnf/None

  • Step 2: Implement catalog.py and resolution.py

Implement the functions listed in spec Section 5.2 matching those exact signatures. Use src/flow/services/package_defs.py only as reference for resolution logic, not as source of truth -- the spec signatures are canonical. This is the single copy of all package resolution logic (no duplication with bootstrap).

  • Step 3: Run tests

Run: python -m pytest tests/test_domain_packages.py -v

  • Step 4: Commit
git add src/flow/domain/packages/ tests/test_domain_packages.py
git commit -m "feat: add packages catalog and resolution (single source of truth)"

Task 14: Package planning

Files:

  • Create: src/flow/domain/packages/planning.py

  • Create: tests/test_domain_packages_planning.py

  • Step 1: Write tests for plan_install and plan_remove

  • Step 2: Implement planning.py

  • Step 3: Run tests, commit

git commit -m "feat: add package install/remove planning"

Chunk 4: Bootstrap Domain

Task 15: Bootstrap models and setup modules

Files:

  • Create: src/flow/domain/bootstrap/__init__.py

  • Create: src/flow/domain/bootstrap/models.py

  • Create: src/flow/domain/bootstrap/modules.py

  • Create: tests/test_domain_bootstrap_modules.py

  • Step 1: Write models: Profile, SetupModuleDef, BootstrapAction, BootstrapPlan

  • Step 2: Write and test setup modules: HostnameModule, LocaleModule, ShellModule, SSHKeygenModule, RuncmdModule. Each returns shell commands from .plan() and a human-readable string from .describe().

describe() examples (used in dry-run output):

  • HostnameModule.describe() -> "Set hostname to my-host"
  • LocaleModule.describe() -> "Set locale to en_US.UTF-8"
  • ShellModule.describe() -> "Install and configure shell: zsh"
  • SSHKeygenModule.describe() -> "Generate 1 SSH key(s)"
  • RuncmdModule.describe() -> "Run 3 custom command(s)"

Add a test for each module asserting isinstance(module.describe(), str) and that .plan() returns a non-empty list of strings.

  • Step 3: Run tests, commit
git commit -m "feat: add bootstrap models and setup modules"

Task 16: Bootstrap planning

Files:

  • Create: src/flow/domain/bootstrap/planning.py

  • Create: tests/test_domain_bootstrap_planning.py

  • Step 1: Write tests for parse_profile and plan_bootstrap -- verify it produces correct ordered actions (validate env, setup modules, packages, shell, dotfiles link).

  • Step 2: Implement -- plan_bootstrap raises ConfigError if required env vars are missing, then builds ordered action list using packages domain for install planning.

  • Step 3: Run tests, commit

git commit -m "feat: add bootstrap planning (orchestrator over packages + dotfiles)"

Chunk 5: Remote + Containers + Projects Domains

Task 17: Remote domain

Files:

  • Create: src/flow/domain/remote/__init__.py

  • Create: src/flow/domain/remote/models.py -- contains Target and SSHCommand only. TargetConfig lives in core/config.py (already created in Task 7). Import it from there.

  • Create: src/flow/domain/remote/resolution.py

  • Create: tests/test_domain_remote.py

  • Step 1: Write tests for parse_target, resolve_target, build_ssh_command, terminfo_fix_command

  • Step 2: Implement models and resolution. Port from current src/flow/services/ssh.py. resolve_target receives list[TargetConfig] imported from flow.core.config.

  • Step 3: Run tests, commit

git commit -m "feat: add remote domain (SSH target resolution)"

Task 18: Containers domain

Files:

  • Create: src/flow/domain/containers/__init__.py

  • Create: src/flow/domain/containers/models.py

  • Create: src/flow/domain/containers/resolution.py

  • Create: tests/test_domain_containers.py

  • Step 1: Write tests for parse_image_ref, container_name, resolve_mounts, build_container_spec

  • Step 2: Implement. Port from current src/flow/services/containers.py.

  • Step 3: Run tests, commit

git commit -m "feat: add containers domain (image resolution, mount computation)"

Chunk 6: Services Layer (Part 1)

Task 19: DotfilesService

Files:

  • Create: src/flow/services/__init__.py

  • Create: src/flow/services/dotfiles.py

  • Create: tests/test_service_dotfiles.py

  • Step 1: Write integration tests using tmp_path for real filesystem + FakeRunner for git:

    • test_link_creates_symlinks -- set up dotfiles dir, call .link(), verify symlinks
    • test_link_with_module -- set up package with _module.yaml and cloned module dir, verify correct target paths
    • test_unlink_removes_symlinks -- link then unlink, verify cleaned up
    • test_link_dry_run_no_changes -- dry run does not create symlinks
    • test_status_shows_packages -- set up linked state, verify output
  • Step 2: Implement DotfilesService

The service performs all I/O:

  1. _discover_packages: walks dotfiles dir, reads _module.yaml files, builds Package objects with local_files populated
  2. Calls pure domain functions for resolution and planning
  3. Executes the plan (create/remove symlinks via FileSystem)
  4. Persists LinkedState to JSON
  • Step 3: Run tests, commit
git commit -m "feat: add DotfilesService with link/unlink/status/edit"

Task 20: PackageService

Files:

  • Create: src/flow/services/packages.py

  • Create: tests/test_service_packages.py

  • Step 1: Write tests for install, list, remove flows

  • Step 2: Implement PackageService -- uses domain planning, executes via runtime

  • Step 3: Run tests, commit

git commit -m "feat: add PackageService"

Task 21: BootstrapService

Files:

  • Create: src/flow/services/bootstrap.py

  • Create: tests/test_service_bootstrap.py

  • Step 1: Write tests -- bootstrap run with dry_run, show, list

  • Step 2: Implement -- uses domain planning, executes actions sequentially, delegates to PackageService and DotfilesService

  • Step 3: Run tests, commit

git commit -m "feat: add BootstrapService"

Chunk 7: Services Layer (Part 2)

Task 22: RemoteService

Files:

  • Create: src/flow/services/remote.py

  • Create: tests/test_service_remote.py

  • Step 1: Write tests -- enter dry_run, list targets, terminfo warning

  • Step 2: Implement

  • Step 3: Run tests, commit

git commit -m "feat: add RemoteService"

Task 23: ContainerService

Files:

  • Create: src/flow/services/containers.py

  • Create: tests/test_service_containers.py

  • Step 1: Write tests -- create, list, stop, remove with FakeRunner

  • Step 2: Implement

  • Step 3: Run tests, commit

git commit -m "feat: add ContainerService"

Task 24: ProjectService

Note: ProjectService has no domain layer -- it's a thin service that runs git commands directly. All logic is I/O-bound (listing dirs, running git). No separate domain module needed.

Files:

  • Create: src/flow/services/projects.py

  • Create: tests/test_service_projects.py

  • Step 1: Write tests

# tests/test_service_projects.py
"""Tests for ProjectService."""

def test_check_clean_repo(tmp_path):
    """Create a git repo at tmp_path/projects/myrepo, commit a file.
    Call service.check(fetch=False). Verify output contains 'clean'."""

def test_check_uncommitted_changes(tmp_path):
    """Create repo, modify a tracked file without committing.
    Verify 'uncommitted changes' in output."""

def test_check_no_git_repos(tmp_path):
    """Empty projects dir. Verify info message."""

def test_summary_shows_all_dirs(tmp_path):
    """Mix of git and non-git dirs. Verify table output."""
  • Step 2: Implement -- check() iterates dirs in projects_dir, runs git status/diff/rev-list per repo. fetch() runs git fetch --all. summary() is check(fetch=False). Use self.ctx.runtime.git for all git calls, self.ctx.console.table for output.

  • Step 3: Run tests, commit

git commit -m "feat: add ProjectService"

Chunk 8: Commands + CLI

Task 25: CLI entry point + context validation

Files:

  • Create: src/flow/cli.py (rewrite)

  • Create: tests/test_cli.py

  • Step 1: Write tests -- non-root check, context validation (remote blocked in VM), version flag, dry_run passed through

  • Step 2: Implement cli.py per spec Section 10

  • Step 3: Run tests, commit

git commit -m "feat: add CLI entry point with context awareness"

Task 26: Command modules

Files:

  • Create: src/flow/commands/remote.py
  • Create: src/flow/commands/dev.py
  • Create: src/flow/commands/dotfiles.py
  • Create: src/flow/commands/setup.py
  • Create: src/flow/commands/packages.py
  • Create: src/flow/commands/projects.py
  • Create: src/flow/commands/__init__.py

Each command module: register argparse subcommands, handler functions that construct the service and call one method.

  • Step 1: Implement all command modules (each is 30-60 lines)
  • Step 2: Write integration test that runs flow --help, flow dotfiles --help, etc.
  • Step 3: Commit
git commit -m "feat: add command modules (thin CLI adapters)"

Task 27: Zsh completion

Files:

  • Create: src/flow/commands/completion.py

  • Create: tests/test_completion.py

  • Step 1: Implement -- port and adapt from current completion.py with updated command names

  • Step 2: Write tests for complete() function with the new command surface

  • Step 3: Commit

git commit -m "feat: add zsh completion with updated command surface"

Chunk 9: Cleanup + Integration

Task 28: Delete old code

  • Step 1: Remove old files
# Remove old service/command/core files that have been replaced
rm -f src/flow/core/action.py
rm -f src/flow/core/stow.py
rm -f src/flow/core/process.py
# Old core files replaced by new ones:
# system.py -> runtime.py (already handled)
# config.py, console.py, platform.py, paths.py, errors.py -> rewritten in place
# variables.py -> template.py

# Remove old command modules
rm -rf src/flow/commands/  # Already replaced by new commands/

# Remove old services
rm -rf src/flow/services/  # Already replaced by new services/

# Remove old tests
rm -f tests/test_action.py tests/test_stow.py tests/test_commands.py
rm -f tests/test_dotfiles_folding.py tests/test_self_hosting.py
  • Step 2: Run full test suite

Run: python -m pytest tests/ -v Expected: All new tests pass, no imports of deleted modules

  • Step 3: Update pyproject.toml entry point if needed

Verify [project.scripts] flow = "flow.cli:main" still works.

  • Step 4: Test binary build
make build
./dist/flow --help
./dist/flow --version
  • Step 5: Commit
git add -A
git commit -m "chore: remove old code, complete rewrite"

Task 29: Update README

  • Step 1: Rewrite README.md to match the new command surface, config format, and architecture.

  • Step 2: Commit

git add README.md
git commit -m "docs: update README for new architecture"