Plan fixes:
- detect_platform raises FlowError not RuntimeError
- TargetConfig lives in core/config.py only (remote domain imports it)
- plan_link handles source changes (remove_link + create_link)
- resolve_package_targets skips local files when mount_path is root
- LinkedState.from_dict guards on version mismatch
- Added missing test for parse_module_ref with absent ref
- Task 12 now has full tests and serialization format
- Task 13 uses spec signatures as truth, old code as reference
- Task 15 includes describe() examples and tests
- Task 24 has detailed test cases for ProjectService
- Note that conflicts.py is intentionally merged into planning.py
- Spec Section 12 example fixed to include filesystem_check arg
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Flow CLI Rewrite Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Full rewrite of the flow CLI with correct domain abstractions, plan-then-execute pattern, and working module path resolution.
Architecture: Four-layer hybrid (core -> domain -> services -> commands). Domain layer is pure functions + frozen dataclasses. Services orchestrate I/O. Commands are trivial dispatchers. See docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md for the full spec.
Tech Stack: Python 3.9+, PyYAML, argparse, pytest, PyInstaller
Spec: docs/superpowers/specs/2026-03-16-flow-architecture-redesign.md
Standards: CLAUDE.md
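The plan-then-execute pattern named in the Goal and Architecture lines can be illustrated with a minimal sketch. Everything in it (Op, plan_links, execute) is a hypothetical stand-in rather than the spec's API: a pure function over frozen dataclasses produces the operations, and a separate function performs the I/O through a callback.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Op:
    """Hypothetical plan step; the real plan types are defined in later tasks."""
    kind: str      # "create_link" or "remove_link"
    target: Path
    source: Path


def plan_links(desired: Dict[Path, Path], current: Dict[Path, Path]) -> List[Op]:
    """Pure planning: compare desired and current target->source maps, no I/O."""
    ops: List[Op] = []
    for target, source in sorted(current.items()):
        if desired.get(target) != source:
            ops.append(Op("remove_link", target, source))
    for target, source in sorted(desired.items()):
        if current.get(target) != source:
            ops.append(Op("create_link", target, source))
    return ops


def execute(ops: List[Op], apply: Callable[[Op], None]) -> int:
    """Execution: hand each planned step to an I/O callback supplied by a service."""
    for op in ops:
        apply(op)
    return len(ops)


desired = {Path("/home/u/.zshrc"): Path("/dots/zsh/.zshrc")}
current = {Path("/home/u/.zshrc"): Path("/dots/old/.zshrc")}
plan = plan_links(desired, current)
applied: List[str] = []
execute(plan, lambda op: applied.append(op.kind))
```

Because the planner never touches the disk, a changed source shows up as a remove step followed by a create step that can be printed and reviewed before anything is executed.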
Phasing Strategy
Build bottom-up. Each phase produces tested, working code before the next begins.
The new code is written in place (same src/flow/ tree) because this is a full rewrite with no backward compatibility. An old module is replaced when a new file lands at the same path; all other old code under src/flow/ stays untouched until the final phase (Phase 8), when it is deleted.
| Phase | What | Depends on |
|---|---|---|
| 1 | Core layer (errors, template, paths, platform, console, runtime, config) | Nothing |
| 2 | Dotfiles domain (models, modules, resolution, planning) | Core |
| 3 | Packages domain (models, catalog, resolution, planning) | Core |
| 4 | Bootstrap domain (models, modules, planning) | Core, packages domain |
| 5 | Remote + containers + projects domains | Core |
| 6 | All services | Core, all domains |
| 7 | Commands + CLI + completion | Core, all services |
| 8 | Delete old code, final integration | Everything |
Chunk 1: Core Layer
Task 1: Errors
Files:
- Create: src/flow/core/errors.py
- Create: tests/test_errors.py
- Step 1: Write the error hierarchy
# src/flow/core/errors.py
"""Project-wide error types."""


class FlowError(Exception):
    """Base for all user-facing errors."""


class ConfigError(FlowError):
    """Invalid config or manifest YAML."""


class PlanConflict(FlowError):
    """Conflicts detected during planning."""

    def __init__(self, message: str, conflicts: list[str]):
        super().__init__(message)
        self.conflicts = conflicts


class ExecutionError(FlowError):
    """A plan step failed during execution."""
- Step 2: Write tests
# tests/test_errors.py
"""Tests for flow.core.errors."""
from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict


def test_flow_error_is_exception():
    assert issubclass(FlowError, Exception)


def test_config_error_is_flow_error():
    assert issubclass(ConfigError, FlowError)


def test_plan_conflict_carries_conflicts():
    err = PlanConflict("2 conflicts", ["a exists", "b exists"])
    assert str(err) == "2 conflicts"
    assert err.conflicts == ["a exists", "b exists"]


def test_execution_error_is_flow_error():
    assert issubclass(ExecutionError, FlowError)
- Step 3: Run tests
Run: python -m pytest tests/test_errors.py -v
Expected: 4 passed
- Step 4: Commit
git add src/flow/core/errors.py tests/test_errors.py
git commit -m "feat: add FlowError hierarchy"
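One way the hierarchy might be consumed at the top of the CLI is sketched below. The names run_cli and failing_command are hypothetical, and the two error classes are redefined locally only so the snippet runs standalone; the intent is that every user-facing failure derives from FlowError and maps to a non-zero exit code.

```python
import sys
from typing import Callable, List


class FlowError(Exception):
    """Local copy of the base class from Step 1, so the sketch runs standalone."""


class PlanConflict(FlowError):
    def __init__(self, message: str, conflicts: List[str]):
        super().__init__(message)
        self.conflicts = conflicts


def run_cli(command: Callable[[], None]) -> int:
    """Hypothetical entry point: map user-facing errors to exit code 1."""
    try:
        command()
    except PlanConflict as err:
        # Most specific type first, so the conflict list can be shown.
        print(f"error: {err}", file=sys.stderr)
        for conflict in err.conflicts:
            print(f"  - {conflict}", file=sys.stderr)
        return 1
    except FlowError as err:
        print(f"error: {err}", file=sys.stderr)
        return 1
    return 0


def failing_command() -> None:
    raise PlanConflict("2 conflicts", ["a exists", "b exists"])


exit_code = run_cli(failing_command)
ok_code = run_cli(lambda: None)
```

Exceptions that are not FlowError subclasses deliberately propagate in this sketch, so programming errors still produce a traceback.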
Task 2: Template (pure string substitution)
Files:
- Create: src/flow/core/template.py
- Create: tests/test_template.py
- Step 1: Write tests
# tests/test_template.py
"""Tests for flow.core.template."""
import os

from flow.core.template import substitute, substitute_template


class TestSubstitute:
    def test_replaces_dollar_var(self):
        assert substitute("hello $NAME", {"NAME": "world"}) == "hello world"

    def test_replaces_braced_var(self):
        assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world"

    def test_falls_back_to_env(self, monkeypatch):
        monkeypatch.setenv("FOO", "bar")
        assert substitute("$FOO", {}) == "bar"

    def test_preserves_unknown_vars(self):
        assert substitute("$UNKNOWN", {}) == "$UNKNOWN"

    def test_non_string_passthrough(self):
        assert substitute(42, {}) == 42


class TestSubstituteTemplate:
    def test_replaces_double_braces(self):
        assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux"

    def test_env_dot_notation(self, monkeypatch):
        monkeypatch.setenv("USER", "tomas")
        result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)})
        assert result == "tomas"

    def test_nested_dict_lookup(self):
        ctx = {"platform": {"arch": "arm64"}}
        assert substitute_template("{{ platform.arch }}", ctx) == "arm64"

    def test_preserves_unknown_templates(self):
        assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}"

    def test_non_string_passthrough(self):
        assert substitute_template(42, {}) == 42

    def test_whitespace_in_braces(self):
        assert substitute_template("{{ os }}", {"os": "linux"}) == "linux"
- Step 2: Run tests to verify they fail
Run: python -m pytest tests/test_template.py -v
Expected: FAIL (module not found)
- Step 3: Implement template.py
# src/flow/core/template.py
"""Variable and template substitution -- pure functions, no I/O."""
import os
import re
from typing import Any, Dict


def substitute(text: Any, variables: Dict[str, str]) -> Any:
    """Replace $VAR and ${VAR} with values from variables dict or env."""
    if not isinstance(text, str):
        return text
    pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}")

    def _replace(match: re.Match[str]) -> str:
        key = match.group(1) or match.group(2) or ""
        if key in variables:
            return str(variables[key])
        if key in os.environ:
            return os.environ[key]
        # Unknown names are left untouched.
        return match.group(0)

    return pattern.sub(_replace, text)


def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
    if expr.startswith("env."):
        env_key = expr.split(".", 1)[1]
        env_ctx = context.get("env", {})
        if isinstance(env_ctx, dict) and env_key in env_ctx:
            return env_ctx[env_key]
        return os.environ.get(env_key)
    if expr in context:
        return context[expr]
    # Dotted lookup through nested dicts, e.g. "platform.arch".
    current: Any = context
    for part in expr.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def substitute_template(text: Any, context: Dict[str, Any]) -> Any:
    """Replace {{expr}} placeholders with values from context dict."""
    if not isinstance(text, str):
        return text

    def _replace(match: re.Match[str]) -> str:
        key = match.group(1).strip()
        value = _resolve_template_value(key, context)
        if value is None:
            return match.group(0)
        return str(value)

    return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text)
- Step 4: Run tests
Run: python -m pytest tests/test_template.py -v
Expected: All passed
- Step 5: Commit
git add src/flow/core/template.py tests/test_template.py
git commit -m "feat: add template substitution (pure functions)"
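One consequence of the \w+ branch in substitute is worth spelling out: the name after $ is matched greedily, so braces are needed when a variable is followed directly by word characters. The snippet below reuses the same regular expression in a standalone helper; expand is a hypothetical name, and it omits the os.environ fallback so the result does not depend on the machine it runs on.

```python
import re
from typing import Dict

# Same pattern as substitute(): $VAR or ${VAR}.
PATTERN = re.compile(r"\$(\w+)|\$\{([^}]+)\}")


def expand(text: str, variables: Dict[str, str]) -> str:
    """Hypothetical helper: like substitute(), minus the os.environ fallback."""
    def _replace(match: "re.Match[str]") -> str:
        key = match.group(1) or match.group(2) or ""
        return variables.get(key, match.group(0))
    return PATTERN.sub(_replace, text)


variables = {"HOME": "/home/u"}
greedy = expand("$HOME_backup", variables)     # looks up HOME_backup, not HOME
braced = expand("${HOME}_backup", variables)   # braces end the name explicitly
```

A test for this boundary case may be worth adding to tests/test_template.py if manifests are expected to contain such strings.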
Task 3: Paths (XDG constants)
Files:
- Create: src/flow/core/paths.py
- Create: tests/test_core_paths.py
- Step 1: Write paths.py
# src/flow/core/paths.py
"""XDG-compliant path constants for flow."""
import os
from pathlib import Path


def _xdg(env_var: str, fallback: str) -> Path:
    return Path(os.environ.get(env_var, fallback))


HOME = Path.home()
CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow"
DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow"
STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow"

DOTFILES_DIR = DATA_DIR / "dotfiles"
MODULES_DIR = DATA_DIR / "modules"
PACKAGES_DIR = DATA_DIR / "packages"

LINKED_STATE = STATE_DIR / "linked.json"
INSTALLED_STATE = STATE_DIR / "installed.json"

# Self-hosted flow config path (from dotfiles repo)
DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow"


def ensure_dirs() -> None:
    """Create all required directories."""
    for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR):
        d.mkdir(parents=True, exist_ok=True)
- Step 2: Write tests
# tests/test_core_paths.py
"""Tests for flow.core.paths."""
from pathlib import Path

from flow.core import paths


def test_config_dir_ends_with_flow():
    assert paths.CONFIG_DIR.name == "flow"


def test_data_dir_ends_with_flow():
    assert paths.DATA_DIR.name == "flow"


def test_modules_dir_under_data():
    assert paths.MODULES_DIR.parent == paths.DATA_DIR


def test_linked_state_under_state():
    assert paths.LINKED_STATE.parent == paths.STATE_DIR


def test_dotfiles_flow_config_path():
    expected_suffix = Path("_shared") / "flow" / ".config" / "flow"
    assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix))


def test_ensure_dirs_creates_directories(tmp_path, monkeypatch):
    monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow")
    monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow")
    monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow")
    monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules")
    monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages")
    paths.ensure_dirs()
    assert (tmp_path / "config" / "flow").is_dir()
    assert (tmp_path / "data" / "flow" / "modules").is_dir()
    assert (tmp_path / "state" / "flow").is_dir()
- Step 3: Run tests
Run: python -m pytest tests/test_core_paths.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/core/paths.py tests/test_core_paths.py
git commit -m "feat: add XDG path constants"
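The constants in paths.py are computed once at import time, so changing XDG_CONFIG_HOME afterwards has no effect on them. That is presumably why the ensure_dirs test patches module attributes instead of environment variables. The sketch below contrasts a value captured once with a call-time lookup; config_dir is a hypothetical function used only for the comparison, and the /tmp paths are illustrative.

```python
import os
from pathlib import Path


def _xdg(env_var: str, fallback: str) -> Path:
    return Path(os.environ.get(env_var, fallback))


def config_dir() -> Path:
    """Hypothetical call-time variant of CONFIG_DIR, for comparison only."""
    return _xdg("XDG_CONFIG_HOME", str(Path.home() / ".config")) / "flow"


os.environ["XDG_CONFIG_HOME"] = "/tmp/first"
FROZEN = config_dir()            # like a module-level constant: evaluated once
os.environ["XDG_CONFIG_HOME"] = "/tmp/second"
late = config_dir()              # function call: sees the new value
```

Module-level constants keep call sites simple; the trade-off is that tests have to patch each derived constant, as the existing test does.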
Task 4: Platform detection
Files:
- Create: src/flow/core/platform.py
- Create: tests/test_core_platform.py
- Step 1: Write tests
# tests/test_core_platform.py
"""Tests for flow.core.platform."""
import os

import pytest

from flow.core.platform import PlatformInfo, detect_context, detect_platform


def test_platform_info_computes_platform_string():
    p = PlatformInfo(os="linux", arch="x64")
    assert p.platform == "linux-x64"


def test_detect_platform_returns_valid_info():
    info = detect_platform()
    assert info.os in ("linux", "macos")
    assert info.arch in ("x64", "arm64")
    assert info.platform == f"{info.os}-{info.arch}"


def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch):
    from flow.core.errors import FlowError
    monkeypatch.setattr("platform.system", lambda: "FreeBSD")
    with pytest.raises(FlowError, match="Unsupported operating system"):
        detect_platform()


def test_detect_context_host(monkeypatch):
    # Hide container marker files so the test also passes inside Docker/Podman.
    monkeypatch.setattr(os.path, "exists", lambda _path: False)
    monkeypatch.delenv("DF_NAMESPACE", raising=False)
    monkeypatch.delenv("DF_PLATFORM", raising=False)
    assert detect_context() == "host"


def test_detect_context_vm(monkeypatch):
    # Hide container marker files so the test also passes inside Docker/Podman.
    monkeypatch.setattr(os.path, "exists", lambda _path: False)
    monkeypatch.setenv("DF_NAMESPACE", "personal")
    monkeypatch.setenv("DF_PLATFORM", "orb")
    assert detect_context() == "vm"
- Step 2: Implement platform.py
# src/flow/core/platform.py
"""OS/arch detection and execution context."""
import os
import platform as _platform
from dataclasses import dataclass

from flow.core.errors import FlowError


@dataclass(frozen=True)
class PlatformInfo:
    os: str = "linux"
    arch: str = "x64"

    @property
    def platform(self) -> str:
        return f"{self.os}-{self.arch}"


_OS_MAP = {"Darwin": "macos", "Linux": "linux"}
_ARCH_MAP = {"x86_64": "x64", "amd64": "x64", "aarch64": "arm64", "arm64": "arm64"}


def detect_platform() -> PlatformInfo:
    raw_os = _platform.system()
    os_name = _OS_MAP.get(raw_os)
    if os_name is None:
        raise FlowError(f"Unsupported operating system: {raw_os}")
    raw_arch = _platform.machine().lower()
    arch = _ARCH_MAP.get(raw_arch)
    if arch is None:
        raise FlowError(f"Unsupported architecture: {raw_arch}")
    return PlatformInfo(os=os_name, arch=arch)


def detect_context() -> str:
    """Detect execution context: 'host', 'vm', or 'container'."""
    if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"):
        return "container"
    if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
        return "vm"
    return "host"
- Step 3: Run tests
Run: python -m pytest tests/test_core_platform.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/core/platform.py tests/test_core_platform.py
git commit -m "feat: add platform detection and context awareness"
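The order of checks in detect_context matters: a container marker file wins over the DF_* variables, and both variables must be set for the "vm" result. The sketch below restates that logic as a pure function with injected inputs, which makes the precedence easy to test; classify_context and CONTAINER_MARKERS are hypothetical names, not part of the plan.

```python
from typing import Callable, Mapping

CONTAINER_MARKERS = ("/.dockerenv", "/run/.containerenv")


def classify_context(exists: Callable[[str], bool], env: Mapping[str, str]) -> str:
    """Hypothetical pure variant of detect_context() with injected inputs."""
    if any(exists(marker) for marker in CONTAINER_MARKERS):
        return "container"
    if env.get("DF_NAMESPACE") and env.get("DF_PLATFORM"):
        return "vm"
    return "host"


vm_env = {"DF_NAMESPACE": "personal", "DF_PLATFORM": "orb"}
in_container = classify_context(lambda p: p == "/.dockerenv", vm_env)
in_vm = classify_context(lambda p: False, vm_env)
on_host = classify_context(lambda p: False, {"DF_NAMESPACE": "personal"})
```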
Task 5: Console output
Files:
- Create: src/flow/core/console.py
- Create: tests/test_core_console.py
- Step 1: Write tests
# tests/test_core_console.py
"""Tests for flow.core.console."""
from flow.core.console import Console


def test_info_prints_message(capsys):
    c = Console(color=False)
    c.info("hello")
    assert "hello" in capsys.readouterr().out


def test_quiet_suppresses_info(capsys):
    c = Console(quiet=True, color=False)
    c.info("hidden")
    assert capsys.readouterr().out == ""


def test_quiet_does_not_suppress_error(capsys):
    c = Console(quiet=True, color=False)
    c.error("visible")
    captured = capsys.readouterr()
    assert "visible" in captured.err or "visible" in captured.out


def test_table_prints_headers_and_rows(capsys):
    c = Console(color=False)
    c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]])
    output = capsys.readouterr().out
    assert "NAME" in output
    assert "foo" in output
    assert "bar" in output


def test_no_color_strips_ansi(capsys):
    c = Console(color=False)
    c.info("test")
    output = capsys.readouterr().out
    assert "\033[" not in output
- Step 2: Implement console.py
# src/flow/core/console.py
"""Console output formatting with TTY detection and color control."""
import sys
from typing import Any, Optional


class Console:
    def __init__(self, *, quiet: bool = False, color: Optional[bool] = None):
        self.quiet = quiet
        if color is None:
            # isatty() is safe on replaced streams; fileno() can raise on
            # StringIO-like objects even though the attribute exists.
            self._color = sys.stdout.isatty()
        else:
            self._color = color

    def _style(self, code: str, text: str) -> str:
        if not self._color:
            return text
        return f"{code}{text}\033[0m"

    def info(self, msg: str) -> None:
        if self.quiet:
            return
        tag = self._style("\033[36m", "[INFO]")
        print(f"{tag} {msg}")

    def warn(self, msg: str) -> None:
        tag = self._style("\033[33m", "[WARN]")
        print(f"{tag} {msg}")

    def error(self, msg: str) -> None:
        tag = self._style("\033[31m", "[ERROR]")
        print(f"{tag} {msg}", file=sys.stderr)

    def success(self, msg: str) -> None:
        tag = self._style("\033[32m", "[OK]")
        print(f"{tag} {msg}")

    def table(self, headers: list[str], rows: list[list[str]]) -> None:
        if not rows:
            return
        # Column width is the widest of the header and every cell in that column.
        widths = [len(h) for h in headers]
        for row in rows:
            for i, cell in enumerate(row):
                if i < len(widths):
                    widths[i] = max(widths[i], len(str(cell)))
        header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers))
        if self._color:
            print(f"\033[1m{header_line}\033[0m")
        else:
            print(header_line)
        print(" ".join("-" * w for w in widths))
        for row in rows:
            print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row)))

    def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None:
        if not operations:
            self.info(f"Nothing to {verb}.")
            return
        self.info(f"Plan ({len(operations)} operation(s)):")
        for op in operations:
            print(f" {op}")
- Step 3: Run tests
Run: python -m pytest tests/test_core_console.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/core/console.py tests/test_core_console.py
git commit -m "feat: add Console with color/quiet/TTY support"
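The column-width logic in Console.table can be exercised without capturing stdout by restating it as a function that returns lines. This is only a sketch: format_table is a hypothetical name, and it assumes the single-space column separator shown in the implementation above.

```python
from typing import List


def format_table(headers: List[str], rows: List[List[str]]) -> List[str]:
    """Hypothetical pure counterpart of Console.table(): returns lines."""
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            if i < len(widths):
                widths[i] = max(widths[i], len(str(cell)))
    lines = [" ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers))]
    lines.append(" ".join("-" * w for w in widths))
    for row in rows:
        lines.append(" ".join(f"{str(c):<{widths[i]}}" for i, c in enumerate(row)))
    return lines


lines = format_table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]])
```

Every cell is left-aligned and padded to its column width, so all lines for a rectangular table have the same length.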
Task 6: Runtime primitives (CommandRunner, FileSystem, GitClient, SystemRuntime)
Files:
- Create: src/flow/core/runtime.py
- Create: tests/test_core_runtime.py
- Step 1: Write tests for FileSystem
# tests/test_core_runtime.py
"""Tests for flow.core.runtime."""
from pathlib import Path

from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime


class TestFileSystem:
    def test_ensure_dir_creates_nested(self, tmp_path):
        fs = FileSystem()
        target = tmp_path / "a" / "b" / "c"
        fs.ensure_dir(target)
        assert target.is_dir()

    def test_write_and_read_text(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "test.txt"
        fs.write_text(path, "hello")
        assert fs.read_text(path) == "hello"

    def test_read_text_default(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "missing.txt"
        assert fs.read_text(path, default="fallback") == "fallback"

    def test_write_and_read_json(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "data.json"
        fs.write_json(path, {"key": "value"})
        assert fs.read_json(path) == {"key": "value"}

    def test_create_symlink(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        target = tmp_path / "link"
        fs.create_symlink(source, target)
        assert target.is_symlink()
        assert target.resolve() == source.resolve()

    def test_same_symlink_true(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        target = tmp_path / "link"
        target.symlink_to(source)
        assert fs.same_symlink(target, source) is True

    def test_same_symlink_false(self, tmp_path):
        fs = FileSystem()
        source = tmp_path / "source"
        source.write_text("content")
        other = tmp_path / "other"
        other.write_text("other")
        target = tmp_path / "link"
        target.symlink_to(other)
        assert fs.same_symlink(target, source) is False

    def test_remove_file(self, tmp_path):
        fs = FileSystem()
        path = tmp_path / "file"
        path.write_text("x")
        fs.remove_file(path)
        assert not path.exists()

    def test_remove_file_missing_ok(self, tmp_path):
        fs = FileSystem()
        fs.remove_file(tmp_path / "missing", missing_ok=True)  # no error

    def test_copy_file(self, tmp_path):
        fs = FileSystem()
        src = tmp_path / "src"
        src.write_text("data")
        dst = tmp_path / "sub" / "dst"
        fs.copy_file(src, dst)
        assert dst.read_text() == "data"


class TestCommandRunner:
    def test_run_echo(self):
        runner = CommandRunner()
        result = runner.run(["echo", "hello"], capture_output=True)
        assert result.stdout.strip() == "hello"

    def test_require_binary_finds_echo(self):
        runner = CommandRunner()
        path = runner.require_binary("echo")
        assert path is not None


class TestSystemRuntime:
    def test_creates_git_client(self):
        rt = SystemRuntime()
        assert isinstance(rt.git, GitClient)
        assert rt.git.runner is rt.runner
- Step 2: Implement runtime.py
Implement CommandRunner, FileSystem, GitClient, and SystemRuntime per the spec in Section 3.3. Port the working implementations from the current src/flow/core/system.py but:
- Remove the duplicate write_bytes method
- Use FlowError instead of RuntimeError for error wrapping
- Keep the same API surface
# src/flow/core/runtime.py
"""Runtime primitives for process, git, state, and filesystem access."""
from __future__ import annotations

import json
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence

from flow.core.console import Console
from flow.core.errors import FlowError


class CommandRunner:
    """Subprocess wrapper with consistent defaults."""

    def run(
        self,
        argv: Sequence[str] | Iterable[str],
        *,
        cwd: Optional[Path] = None,
        env: Optional[Mapping[str, str]] = None,
        capture_output: bool = True,
        check: bool = False,
        timeout: Optional[float] = None,
    ) -> subprocess.CompletedProcess[str]:
        parts = [str(a) for a in argv]
        completed = subprocess.run(
            parts,
            cwd=str(cwd) if cwd else None,
            env=dict(env) if env else None,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout,
        )
        if check and completed.returncode != 0:
            # stdout/stderr are None when capture_output=False.
            msg = (completed.stderr or "").strip() or (completed.stdout or "").strip()
            if not msg:
                msg = f"Command failed with exit code {completed.returncode}"
            raise FlowError(msg)
        return completed

    def run_shell(
        self,
        command: str,
        *,
        cwd: Optional[Path] = None,
        env: Optional[Mapping[str, str]] = None,
        capture_output: bool = True,
        check: bool = False,
        timeout: Optional[float] = None,
    ) -> subprocess.CompletedProcess[str]:
        completed = subprocess.run(
            command,
            shell=True,
            cwd=str(cwd) if cwd else None,
            env=dict(env) if env else None,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout,
        )
        if check and completed.returncode != 0:
            # stdout/stderr are None when capture_output=False.
            msg = (completed.stderr or "").strip() or (completed.stdout or "").strip()
            if not msg:
                msg = f"Command failed with exit code {completed.returncode}"
            raise FlowError(msg)
        return completed

    def stream_shell(
        self,
        command: str,
        console: Console,
        *,
        check: bool = True,
    ) -> subprocess.CompletedProcess[str]:
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        lines: list[str] = []
        assert process.stdout is not None
        try:
            for line in process.stdout:
                stripped = line.rstrip()
                if stripped:
                    # Assumption: each line is echoed as it arrives; the listing
                    # as given collected lines but never used `console`.
                    console.info(stripped)
                    lines.append(stripped)
        finally:
            process.stdout.close()
            process.wait()
        if check and process.returncode != 0:
            raise FlowError(f"Command failed (exit {process.returncode}): {command}")
        return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="")

    def require_binary(self, name: str) -> str:
        path = shutil.which(name)
        if path is None:
            raise FlowError(f"Required executable not found: {name}")
        return path
class FileSystem:
    """Filesystem wrapper for all mutating operations."""

    def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            runner.require_binary("sudo")
            argv: list[str] = ["sudo", "mkdir", "-p"]
            if mode is not None:
                argv.extend(["-m", f"{mode:o}"])
            argv.append(str(path))
            runner.run(argv, check=True)
            return
        path.mkdir(parents=True, exist_ok=True)
        if mode is not None:
            path.chmod(mode)

    def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            argv = ["sudo", "rm"]
            if missing_ok:
                argv.append("-f")
            argv.append(str(path))
            runner.run(argv, check=True)
            return
        try:
            path.unlink()
        except FileNotFoundError:
            if not missing_ok:
                raise

    def remove_tree(self, path: Path) -> None:
        shutil.rmtree(path, ignore_errors=True)

    def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            self.ensure_dir(target.parent, sudo=True, runner=runner)
            runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
            return
        self.ensure_dir(target.parent)
        shutil.copy2(source, target)

    def copy_tree(self, source: Path, target: Path) -> None:
        self.ensure_dir(target.parent)
        shutil.copytree(source, target, dirs_exist_ok=True)

    def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
        if sudo:
            if runner is None:
                raise FlowError("Runner required for sudo operations")
            self.ensure_dir(target.parent, sudo=True, runner=runner)
            runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
            return
        self.ensure_dir(target.parent)
        target.symlink_to(source)

    def same_symlink(self, target: Path, source: Path) -> bool:
        if not target.is_symlink():
            return False
        return target.resolve(strict=False) == source.resolve(strict=False)

    def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
        try:
            return path.read_text(encoding="utf-8")
        except FileNotFoundError:
            if default is None:
                raise
            return default

    def write_text(self, path: Path, content: str) -> None:
        self.ensure_dir(path.parent)
        path.write_text(content, encoding="utf-8")

    def write_bytes(self, path: Path, content: bytes) -> None:
        self.ensure_dir(path.parent)
        path.write_bytes(content)

    def read_json(self, path: Path, *, default: Any = None) -> Any:
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return default

    def write_json(self, path: Path, data: Any) -> None:
        self.ensure_dir(path.parent)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
class GitClient:
    """Git adapter scoped to a repository root."""

    def __init__(self, runner: CommandRunner):
        self.runner = runner

    def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]:
        return self.runner.run(
            ["git", "-C", str(repo_dir), *args],
            capture_output=capture_output,
            check=check,
        )


@dataclass
class SystemRuntime:
    """Shared runtime dependencies."""
    runner: CommandRunner = field(default_factory=CommandRunner)
    fs: FileSystem = field(default_factory=FileSystem)
    git: GitClient = field(init=False)

    def __post_init__(self) -> None:
        # The git client shares the runtime's runner so tests can swap one object.
        self.git = GitClient(self.runner)
- Step 3: Run tests
Run: python -m pytest tests/test_core_runtime.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/core/runtime.py tests/test_core_runtime.py
git commit -m "feat: add runtime primitives (CommandRunner, FileSystem, GitClient)"
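Because all I/O is reached through SystemRuntime, a service test could swap in a fake instead of touching the disk. The sketch below shows the idea with reduced stand-ins for FileSystem and SystemRuntime; RecordingFileSystem and link_all are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Tuple


class FileSystem:
    """Stand-in with one method; the real class is the one in runtime.py."""
    def create_symlink(self, source: Path, target: Path) -> None:
        target.symlink_to(source)


class RecordingFileSystem(FileSystem):
    """Hypothetical test double: records calls instead of touching the disk."""
    def __init__(self) -> None:
        self.calls: List[Tuple[str, Path, Path]] = []

    def create_symlink(self, source: Path, target: Path) -> None:
        self.calls.append(("create_symlink", source, target))


@dataclass
class SystemRuntime:
    """Reduced stand-in: only the fs field is needed for this sketch."""
    fs: FileSystem = field(default_factory=FileSystem)


def link_all(runtime: SystemRuntime, pairs: List[Tuple[Path, Path]]) -> None:
    """Hypothetical service function that performs I/O only through runtime.fs."""
    for source, target in pairs:
        runtime.fs.create_symlink(source, target)


fake = RecordingFileSystem()
link_all(SystemRuntime(fs=fake), [(Path("/dots/zsh/.zshrc"), Path("/home/u/.zshrc"))])
```

The same substitution would work for CommandRunner, which is presumably why GitClient receives the runner rather than creating its own.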
Task 7: Config loading and FlowContext
Files:
- Create: src/flow/core/config.py
- Create: tests/test_core_config.py
- Step 1: Write tests
# tests/test_core_config.py
"""Tests for flow.core.config."""
from pathlib import Path

import pytest

from flow.core.config import AppConfig, load_config, load_manifest


def test_load_config_missing_path(tmp_path):
    cfg = load_config(tmp_path / "nonexistent")
    assert isinstance(cfg, AppConfig)
    assert cfg.dotfiles_url == ""
    assert cfg.container_registry == "registry.tomastm.com"


def test_load_config_from_yaml(tmp_path):
    (tmp_path / "config.yaml").write_text(
        "repository:\n"
        "  url: git@github.com:user/dots.git\n"
        "  branch: dev\n"
        "paths:\n"
        "  projects: ~/code\n"
        "defaults:\n"
        "  container-registry: my.registry.com\n"
        "  tmux-session: main\n"
    )
    cfg = load_config(tmp_path)
    assert cfg.dotfiles_url == "git@github.com:user/dots.git"
    assert cfg.dotfiles_branch == "dev"
    assert cfg.projects_dir == "~/code"
    assert cfg.container_registry == "my.registry.com"
    assert cfg.tmux_session == "main"


def test_load_config_parses_targets_shorthand(tmp_path):
    (tmp_path / "config.yaml").write_text(
        "targets:\n"
        "  personal@orb: personal.orb\n"
    )
    cfg = load_config(tmp_path)
    assert len(cfg.targets) == 1
    assert cfg.targets[0].namespace == "personal"
    assert cfg.targets[0].platform == "orb"
    assert cfg.targets[0].host == "personal.orb"


def test_load_config_parses_targets_dict(tmp_path):
    # host and identity must be nested under the target key.
    (tmp_path / "config.yaml").write_text(
        "targets:\n"
        "  work@ec2:\n"
        "    host: work.ec2.internal\n"
        "    identity: ~/.ssh/id_work\n"
    )
    cfg = load_config(tmp_path)
    assert len(cfg.targets) == 1
    assert cfg.targets[0].host == "work.ec2.internal"
    assert cfg.targets[0].identity == "~/.ssh/id_work"


def test_load_manifest_returns_dict(tmp_path):
    (tmp_path / "manifest.yaml").write_text(
        "packages:\n"
        "  - name: fd\n"
        "    type: pkg\n"
    )
    data = load_manifest(tmp_path)
    assert isinstance(data, dict)
    assert "packages" in data


def test_load_manifest_merges_files(tmp_path):
    (tmp_path / "01-packages.yaml").write_text("packages:\n  - name: fd\n    type: pkg\n")
    (tmp_path / "02-profiles.yaml").write_text("profiles:\n  work:\n    os: linux\n")
    data = load_manifest(tmp_path)
    assert "packages" in data
    assert "profiles" in data
- Step 2: Implement config.py
Port config loading from current src/flow/core/config.py with these changes:
- Use TargetConfig dataclass (new) matching spec Section 7.1
- Use FlowContext dataclass matching spec Section 3.4
- Support both shorthand string and dict target formats
- Remove all _get_value multi-key fallback logic (one canonical form per CLAUDE.md)
The config parser normalizes target entries:
- "personal@orb: personal.orb" -> TargetConfig(namespace="personal", platform="orb", host="personal.orb")
- Dict form: TargetConfig(namespace=..., platform=..., host=..., identity=...)
Key types:
@dataclass(frozen=True)
class TargetConfig:
    namespace: str
    platform: str
    host: str
    # Optional[str] rather than "str | None": the latter fails at class
    # creation on Python 3.9 without "from __future__ import annotations".
    identity: Optional[str] = None


@dataclass
class AppConfig:
    dotfiles_url: str = ""
    dotfiles_branch: str = "main"
    projects_dir: str = "~/projects"
    container_registry: str = "registry.tomastm.com"
    container_tag: str = "latest"
    tmux_session: str = "default"
    targets: list[TargetConfig] = field(default_factory=list)


@dataclass
class FlowContext:
    config: AppConfig
    manifest: dict[str, Any]
    platform: PlatformInfo
    console: Console
    runtime: SystemRuntime = field(default_factory=SystemRuntime)
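The target normalization described above (shorthand string versus dict form) could look roughly like the following. This is a sketch, not the spec's parser: parse_target is a hypothetical name, TargetConfig is repeated so the snippet runs standalone, and ValueError stands in for the ConfigError the real loader would presumably raise.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class TargetConfig:
    namespace: str
    platform: str
    host: str
    identity: Optional[str] = None


def parse_target(key: str, value: Any) -> TargetConfig:
    """Hypothetical normalizer for one entry under `targets:`."""
    namespace, sep, platform = key.partition("@")
    if not sep or not namespace or not platform:
        raise ValueError(f"Target key must look like namespace@platform: {key!r}")
    if isinstance(value, str):
        # Shorthand form: the value is the host.
        return TargetConfig(namespace, platform, host=value)
    if isinstance(value, dict) and "host" in value:
        # Dict form: host is required, identity is optional.
        return TargetConfig(namespace, platform, host=str(value["host"]),
                            identity=value.get("identity"))
    raise ValueError(f"Target {key!r} needs a host string or a mapping with 'host'")


short = parse_target("personal@orb", "personal.orb")
full = parse_target("work@ec2", {"host": "work.ec2.internal", "identity": "~/.ssh/id_work"})
```

Both forms end up as the same frozen type, so downstream code never needs to know which spelling the user chose.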
- Step 3: Run tests
Run: python -m pytest tests/test_core_config.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/core/config.py tests/test_core_config.py
git commit -m "feat: add config loading and FlowContext"
Chunk 2: Dotfiles Domain
Task 8: Dotfiles models
Files:
- Create: src/flow/domain/__init__.py (empty)
- Create: src/flow/domain/dotfiles/__init__.py (empty)
- Create: src/flow/domain/dotfiles/models.py
- Create: tests/test_domain_dotfiles_models.py
- Step 1: Write models
# src/flow/domain/dotfiles/models.py
"""Dotfiles domain models -- all frozen dataclasses."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class ModuleRef:
    """An external git repo providing content for a package subtree."""
    source: str
    ref_type: str  # "branch" | "tag" | "commit"
    ref_value: str
    mount_path: Path  # Relative path within package to _module.yaml parent
    cache_dir: Path  # Where the repo is cloned
    module_files: tuple[tuple[Path, Path], ...]  # (abs_source, rel_to_cache_root)


@dataclass(frozen=True)
class Package:
    """A dotfiles package: a named set of files mapping to home-relative targets."""
    name: str  # e.g. "zsh", "nvim"
    layer: str  # "_shared" or profile name
    package_id: str  # "layer/name"
    source_dir: Path  # Absolute path in dotfiles repo
    module: Optional[ModuleRef]
    local_files: tuple[tuple[Path, Path], ...]  # (abs_source, rel_to_package_root)


@dataclass(frozen=True)
class LinkTarget:
    """A single file that should be linked into the filesystem."""
    source: Path
    target: Path
    package: str  # package_id
    from_module: bool
    needs_sudo: bool


@dataclass(frozen=True)
class LinkOp:
    """A single operation in a link plan."""
    type: str  # "create_link" | "remove_link" | "create_dir"
    target: Path
    source: Optional[Path]
    package: str
    needs_sudo: bool

    def __str__(self) -> str:
        if self.type == "create_link":
            sudo = " (sudo)" if self.needs_sudo else ""
            return f"LINK: {self.target} -> {self.source}{sudo}"
        if self.type == "remove_link":
            return f"REMOVE: {self.target}"
        if self.type == "create_dir":
            return f"MKDIR: {self.target}"
        return f"{self.type}: {self.target}"


@dataclass(frozen=True)
class PlanSummary:
    added: int
    removed: int
    unchanged: int
    from_modules: int


@dataclass(frozen=True)
class LinkPlan:
    """Complete reconciliation plan."""
    operations: list[LinkOp]
    conflicts: list[str]
    summary: PlanSummary


@dataclass
class LinkedState:
    """Persisted link state."""
    links: dict[Path, LinkTarget] = field(default_factory=dict)

    def as_dict(self) -> dict:
        # Group links by package id; sort by target for stable output.
        grouped: dict[str, dict[str, dict]] = {}
        for target, lt in sorted(self.links.items(), key=lambda x: str(x[0])):
            pkg_links = grouped.setdefault(lt.package, {})
            pkg_links[str(target)] = {
                "source": str(lt.source),
                "from_module": lt.from_module,
                "needs_sudo": lt.needs_sudo,
            }
        return {"version": 2, "links": grouped}

    @classmethod
    def from_dict(cls, data: dict) -> "LinkedState":
        version = data.get("version")
        if version is not None and version != 2:
            from flow.core.errors import ConfigError
            raise ConfigError(
                f"Unsupported linked.json version {version}. "
                "Delete ~/.local/state/flow/linked.json and relink."
            )
        links: dict[Path, LinkTarget] = {}
        raw_links = data.get("links", {})
        for package, pkg_links in raw_links.items():
            for target_str, info in pkg_links.items():
                links[Path(target_str)] = LinkTarget(
                    source=Path(info["source"]),
                    target=Path(target_str),
                    package=str(package),
                    from_module=bool(info.get("from_module", False)),
                    needs_sudo=bool(info.get("needs_sudo", False)),
                )
        return cls(links=links)


@dataclass(frozen=True)
class RepoInfo:
    """A managed git repo (dotfiles or module)."""
    name: str
    path: Path
    source: str
    is_module: bool
- Step 2: Write tests
# tests/test_domain_dotfiles_models.py
"""Tests for dotfiles domain models."""
from pathlib import Path

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkPlan,
    LinkTarget,
    LinkedState,
    ModuleRef,
    Package,
    PlanSummary,
)


def test_link_op_str_create():
    op = LinkOp(type="create_link", target=Path("/home/x/.zshrc"),
                source=Path("/dots/zsh/.zshrc"), package="_shared/zsh", needs_sudo=False)
    assert "LINK:" in str(op)
    assert ".zshrc" in str(op)


def test_link_op_str_sudo():
    op = LinkOp(type="create_link", target=Path("/etc/hosts"),
                source=Path("/dots/dns/hosts"), package="_shared/dns", needs_sudo=True)
    assert "(sudo)" in str(op)


def test_linked_state_roundtrip():
    lt = LinkTarget(source=Path("/a"), target=Path("/b"), package="p", from_module=False, needs_sudo=False)
    state = LinkedState(links={Path("/b"): lt})
    data = state.as_dict()
    restored = LinkedState.from_dict(data)
    assert Path("/b") in restored.links
    assert restored.links[Path("/b")].source == Path("/a")
    assert restored.links[Path("/b")].package == "p"


def test_linked_state_empty():
    state = LinkedState.from_dict({})
    assert state.links == {}


def test_package_has_id():
    pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh",
                  source_dir=Path("/dots/_shared/zsh"), module=None, local_files=())
    assert pkg.package_id == "_shared/zsh"
- Step 3: Run tests
Run: python -m pytest tests/test_domain_dotfiles_models.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/domain/__init__.py src/flow/domain/dotfiles/__init__.py \
src/flow/domain/dotfiles/models.py tests/test_domain_dotfiles_models.py
git commit -m "feat: add dotfiles domain models"
Task 9: Dotfiles modules (mount path, cache dir, source normalization)
Files:
- Create: src/flow/domain/dotfiles/modules.py
- Create: tests/test_domain_dotfiles_modules.py
- Step 1: Write tests
# tests/test_domain_dotfiles_modules.py
"""Tests for dotfiles module resolution -- the core bug fix."""
from pathlib import Path

import pytest

from flow.core.errors import ConfigError
from flow.domain.dotfiles.modules import (
    compute_mount_path,
    module_cache_dir,
    normalize_source,
    parse_module_ref,
)


class TestComputeMountPath:
    def test_nested_module(self):
        """_shared/nvim/.config/nvim/_module.yaml -> .config/nvim"""
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/nvim/.config/nvim/_module.yaml"),
            package_dir=Path("/dots/_shared/nvim"),
        )
        assert result == Path(".config/nvim")

    def test_root_level_module(self):
        """_shared/nvim/_module.yaml -> Path('.')"""
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/nvim/_module.yaml"),
            package_dir=Path("/dots/_shared/nvim"),
        )
        assert result == Path(".")

    def test_deeply_nested(self):
        result = compute_mount_path(
            module_yaml=Path("/dots/_shared/pkg/.config/a/b/c/_module.yaml"),
            package_dir=Path("/dots/_shared/pkg"),
        )
        assert result == Path(".config/a/b/c")


class TestModuleCacheDir:
    def test_simple_name(self):
        result = module_cache_dir("_shared/nvim", Path("/home/x/.local/share/flow/modules"))
        assert result == Path("/home/x/.local/share/flow/modules/_shared--nvim")

    def test_profile_name(self):
        result = module_cache_dir("linux-work/nvim", Path("/m"))
        assert result == Path("/m/linux-work--nvim")


class TestNormalizeSource:
    def test_github_shorthand(self):
        assert normalize_source("github:org/repo") == "https://github.com/org/repo.git"

    def test_full_url_passthrough(self):
        assert normalize_source("https://example.com/repo.git") == "https://example.com/repo.git"

    def test_ssh_passthrough(self):
        assert normalize_source("git@github.com:org/repo.git") == "git@github.com:org/repo.git"


class TestParseModuleRef:
    def test_branch_ref(self):
        raw = {"source": "github:org/nvim-config", "ref": {"branch": "main"}}
        ref = parse_module_ref(
            raw, package_id="_shared/nvim",
            mount_path=Path(".config/nvim"),
            modules_base=Path("/modules"),
        )
        assert ref.source == "https://github.com/org/nvim-config.git"
        assert ref.ref_type == "branch"
        assert ref.ref_value == "main"
        assert ref.mount_path == Path(".config/nvim")
        assert ref.cache_dir == Path("/modules/_shared--nvim")

    def test_tag_ref(self):
        raw = {"source": "github:org/repo", "ref": {"tag": "v1.0"}}
        ref = parse_module_ref(raw, "p/x", Path("."), Path("/m"))
        assert ref.ref_type == "tag"
        assert ref.ref_value == "v1.0"

    def test_missing_source_raises(self):
        with pytest.raises(ConfigError):
            parse_module_ref({}, "p/x", Path("."), Path("/m"))

    def test_missing_ref_raises(self):
        raw = {"source": "github:org/repo"}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))

    def test_ref_not_dict_raises(self):
        raw = {"source": "github:org/repo", "ref": "main"}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))

    def test_ambiguous_ref_raises(self):
        raw = {"source": "github:org/repo", "ref": {"branch": "main", "tag": "v1"}}
        with pytest.raises(ConfigError):
            parse_module_ref(raw, "p/x", Path("."), Path("/m"))
- Step 2: Implement modules.py
# src/flow/domain/dotfiles/modules.py
"""Module metadata resolution -- pure functions."""
from pathlib import Path

from flow.core.errors import ConfigError
from flow.domain.dotfiles.models import ModuleRef


def compute_mount_path(module_yaml: Path, package_dir: Path) -> Path:
    """Relative path from package root to _module.yaml parent."""
    # relative_to raises ValueError if module_yaml is not under package_dir.
    return module_yaml.parent.relative_to(package_dir)


def module_cache_dir(package_id: str, modules_base: Path) -> Path:
    """Cache dir for a module clone. '/' -> '--' to avoid collisions."""
    return modules_base / package_id.replace("/", "--")


def normalize_source(source: str) -> str:
    """Normalize git source URL. github:org/repo -> https://github.com/org/repo.git"""
    if source.startswith("github:"):
        repo = source.split(":", 1)[1]
        return f"https://github.com/{repo}.git"
    return source


def parse_module_ref(
    raw: dict,
    package_id: str,
    mount_path: Path,
    modules_base: Path,
) -> ModuleRef:
    """Build ModuleRef from parsed _module.yaml content."""
    source = raw.get("source")
    if not isinstance(source, str) or not source:
        raise ConfigError(f"Module for {package_id}: 'source' must be a non-empty string")
    ref = raw.get("ref")
    if not isinstance(ref, dict):
        raise ConfigError(f"Module for {package_id}: 'ref' must be a mapping")
    # Exactly one of branch/tag/commit must be set to a non-empty string.
    choices = [k for k in ("branch", "tag", "commit") if isinstance(ref.get(k), str) and ref[k]]
    if len(choices) != 1:
        raise ConfigError(f"Module for {package_id}: 'ref' must have exactly one of: branch, tag, commit")
    ref_type = choices[0]
    ref_value = str(ref[ref_type])
    return ModuleRef(
        source=normalize_source(source),
        ref_type=ref_type,
        ref_value=ref_value,
        mount_path=mount_path,
        cache_dir=module_cache_dir(package_id, modules_base),
        module_files=(),  # Populated by service after cloning
    )
- Step 3: Run tests
Run: python -m pytest tests/test_domain_dotfiles_modules.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/domain/dotfiles/modules.py tests/test_domain_dotfiles_modules.py
git commit -m "feat: add dotfiles module resolution (fixes _module.yaml path bug)"
Task 10: Dotfiles resolution (package -> LinkTargets)
Files:
- Create: src/flow/domain/dotfiles/resolution.py
- Create: tests/test_domain_dotfiles_resolution.py
- Step 1: Write tests
# tests/test_domain_dotfiles_resolution.py
"""Tests for dotfiles path resolution."""
from pathlib import Path

import pytest

from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import ModuleRef, Package
from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets

HOME = Path("/home/testuser")


def _pkg(name, layer="_shared", files=(), module=None):
    return Package(
        name=name,
        layer=layer,
        package_id=f"{layer}/{name}",
        source_dir=Path(f"/dots/{layer}/{name}"),
        module=module,
        local_files=tuple(files),
    )


class TestResolvePackageTargets:
    def test_simple_file(self):
        pkg = _pkg("zsh", files=[
            (Path("/dots/_shared/zsh/.zshrc"), Path(".zshrc")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert len(targets) == 1
        assert targets[0].target == HOME / ".zshrc"
        assert targets[0].source == Path("/dots/_shared/zsh/.zshrc")
        assert targets[0].from_module is False

    def test_nested_config(self):
        pkg = _pkg("git", files=[
            (Path("/dots/_shared/git/.config/git/config"), Path(".config/git/config")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert targets[0].target == HOME / ".config" / "git" / "config"

    def test_root_marker(self):
        pkg = _pkg("dns", files=[
            (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
        ])
        targets = resolve_package_targets(pkg, HOME, set())
        assert targets[0].target == Path("/etc/hosts")
        assert targets[0].needs_sudo is True

    def test_root_marker_skipped_when_in_skip_set(self):
        pkg = _pkg("dns", files=[
            (Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
        ])
        targets = resolve_package_targets(pkg, HOME, {"_root"})
        assert len(targets) == 0

    def test_skip_package_by_name(self):
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")),
        ])
        targets = resolve_package_targets(pkg, HOME, {"nvim"})
        assert len(targets) == 0

    def test_module_files_linked_under_mount_path(self):
        module = ModuleRef(
            source="https://github.com/org/nvim-config.git",
            ref_type="branch",
            ref_value="main",
            mount_path=Path(".config/nvim"),
            cache_dir=Path("/modules/_shared--nvim"),
            module_files=(
                (Path("/modules/_shared--nvim/init.lua"), Path("init.lua")),
                (Path("/modules/_shared--nvim/lua/plugins.lua"), Path("lua/plugins.lua")),
            ),
        )
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.local/bin/nvim-wrapper"), Path(".local/bin/nvim-wrapper")),
        ], module=module)
        targets = resolve_package_targets(pkg, HOME, set())
        # Local file outside mount_path
        local_targets = [t for t in targets if not t.from_module]
        assert len(local_targets) == 1
        assert local_targets[0].target == HOME / ".local" / "bin" / "nvim-wrapper"
        # Module files under mount_path
        module_targets = [t for t in targets if t.from_module]
        assert len(module_targets) == 2
        module_target_paths = {t.target for t in module_targets}
        assert HOME / ".config" / "nvim" / "init.lua" in module_target_paths
        assert HOME / ".config" / "nvim" / "lua" / "plugins.lua" in module_target_paths

    def test_module_yaml_file_not_linked(self):
        """The _module.yaml marker itself should never be linked."""
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), Path(".config/nvim/_module.yaml")),
        ], module=ModuleRef(
            source="x", ref_type="branch", ref_value="main",
            mount_path=Path(".config/nvim"),
            cache_dir=Path("/m"), module_files=(),
        ))
        targets = resolve_package_targets(pkg, HOME, set())
        assert not any(t.target.name == "_module.yaml" for t in targets)

    def test_root_level_module_skips_all_local_files(self):
        """When mount_path is '.', the module supersedes every local file in the package."""
        module = ModuleRef(
            source="x", ref_type="branch", ref_value="main",
            mount_path=Path("."),
            cache_dir=Path("/m"),
            module_files=(
                (Path("/m/init.lua"), Path("init.lua")),
            ),
        )
        pkg = _pkg("nvim", files=[
            (Path("/dots/_shared/nvim/_module.yaml"), Path("_module.yaml")),
            (Path("/dots/_shared/nvim/stale-file.txt"), Path("stale-file.txt")),
        ], module=module)
        targets = resolve_package_targets(pkg, HOME, set())
        # Only module files should appear, local files skipped
        assert len(targets) == 1
        assert targets[0].from_module is True
        assert targets[0].target == HOME / "init.lua"


class TestResolveAllTargets:
    def test_no_conflicts(self):
        pkgs = [
            _pkg("zsh", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
            _pkg("git", files=[(Path("/a/.gitconfig"), Path(".gitconfig"))]),
        ]
        targets = resolve_all_targets(pkgs, HOME, set())
        assert len(targets) == 2

    def test_duplicate_target_raises(self):
        pkgs = [
            _pkg("zsh", layer="_shared", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
            _pkg("zsh", layer="work", files=[(Path("/b/.zshrc"), Path(".zshrc"))]),
        ]
        with pytest.raises(PlanConflict):
            resolve_all_targets(pkgs, HOME, set())
- Step 2: Implement resolution.py
# src/flow/domain/dotfiles/resolution.py
"""Path resolution: package -> home-relative LinkTargets. Pure functions."""
# Deferred annotation evaluation keeps `Path | None` valid on Python 3.9.
from __future__ import annotations

from pathlib import Path

from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, Package

RESERVED_ROOT = "_root"
MODULE_FILE = "_module.yaml"


def resolve_package_targets(
    package: Package,
    home: Path,
    skip: set[str],
) -> list[LinkTarget]:
    """Resolve all LinkTargets for a package, handling modules correctly."""
    if package.name in skip:
        return []
    targets: list[LinkTarget] = []
    mount_path = package.module.mount_path if package.module else None
    # Local files (from dotfiles repo)
    for abs_source, rel in package.local_files:
        # Skip _module.yaml
        if rel.name == MODULE_FILE:
            continue
        # If module exists, skip files inside mount_path (module provides those)
        if mount_path is not None:
            if mount_path == Path("."):
                continue  # Root-level module: all local files are superseded by module
            try:
                rel.relative_to(mount_path)
                continue  # Inside mount_path, skip
            except ValueError:
                pass  # Outside mount_path, process normally
        target, needs_sudo = _resolve_target(rel, home, skip)
        if target is None:
            continue
        targets.append(LinkTarget(
            source=abs_source, target=target,
            package=package.package_id,
            from_module=False, needs_sudo=needs_sudo,
        ))
    # Module files
    if package.module:
        for abs_source, rel in package.module.module_files:
            mounted = package.module.mount_path / rel if package.module.mount_path != Path(".") else rel
            target, needs_sudo = _resolve_target(mounted, home, skip)
            if target is None:
                continue
            targets.append(LinkTarget(
                source=abs_source, target=target,
                package=package.package_id,
                from_module=True, needs_sudo=needs_sudo,
            ))
    return targets


def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None, bool]:
    """Resolve a relative path to an absolute target. Returns (target, needs_sudo)."""
    parts = rel.parts
    if parts and parts[0] == RESERVED_ROOT:
        if RESERVED_ROOT in skip:
            return None, False
        if len(parts) < 2:
            return None, False
        return Path("/") / Path(*parts[1:]), True
    return home / rel, False


def resolve_all_targets(
    packages: list[Package],
    home: Path,
    skip: set[str],
) -> list[LinkTarget]:
    """Resolve targets for all packages. Raises PlanConflict on duplicate targets."""
    all_targets: list[LinkTarget] = []
    seen: dict[Path, str] = {}
    for pkg in packages:
        targets = resolve_package_targets(pkg, home, skip)
        for t in targets:
            if t.target in seen:
                conflicts = [
                    f"{t.target} claimed by both {seen[t.target]} and {t.package}"
                ]
                raise PlanConflict(
                    "Conflicting dotfile targets across packages",
                    conflicts,
                )
            seen[t.target] = t.package
            all_targets.append(t)
    return all_targets
- Step 3: Run tests
Run: python -m pytest tests/test_domain_dotfiles_resolution.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/domain/dotfiles/resolution.py tests/test_domain_dotfiles_resolution.py
git commit -m "feat: add dotfiles path resolution with correct module mounting"
Task 11: Dotfiles planning (link/unlink plans)
Note: The spec's file layout lists domain/dotfiles/conflicts.py as a separate file. This is intentionally merged into planning.py -- cross-package collisions are caught by resolve_all_targets (Task 10), and filesystem conflicts are detected inside plan_link via the injected filesystem_check callback. No separate conflicts.py file is needed.
Files:
- Create: src/flow/domain/dotfiles/planning.py
- Create: tests/test_domain_dotfiles_planning.py
- Step 1: Write tests
# tests/test_domain_dotfiles_planning.py
"""Tests for dotfiles link planning."""
from pathlib import Path
from typing import Optional

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkTarget,
    LinkedState,
)
from flow.domain.dotfiles.planning import plan_link, plan_unlink


def _lt(target, source="/a", pkg="_shared/zsh", module=False, sudo=False):
    return LinkTarget(
        source=Path(source), target=Path(target),
        package=pkg, from_module=module, needs_sudo=sudo,
    )


def _fs_check_none(path: Path) -> Optional[str]:
    """Fake filesystem_check: nothing exists."""
    return None


def _fs_check_file(path: Path) -> Optional[str]:
    """Fake: everything is a file."""
    return "file"


class TestPlanLink:
    def test_new_target_creates_link(self):
        desired = [_lt("/home/x/.zshrc")]
        plan = plan_link(desired, LinkedState(), _fs_check_none)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "create_link"
        assert plan.summary.added == 1

    def test_existing_correct_link_unchanged(self):
        lt = _lt("/home/x/.zshrc")
        current = LinkedState(links={Path("/home/x/.zshrc"): lt})
        plan = plan_link([lt], current, _fs_check_none)
        assert len(plan.operations) == 0
        assert plan.summary.unchanged == 1

    def test_stale_link_removed(self):
        old = _lt("/home/x/.old")
        current = LinkedState(links={Path("/home/x/.old"): old})
        plan = plan_link([], current, _fs_check_none)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "remove_link"
        assert plan.summary.removed == 1

    def test_changed_source_produces_remove_then_create(self):
        old = _lt("/home/x/.zshrc", source="/old")
        new = _lt("/home/x/.zshrc", source="/new")
        current = LinkedState(links={Path("/home/x/.zshrc"): old})
        plan = plan_link([new], current, _fs_check_none)
        types = [op.type for op in plan.operations]
        assert types == ["remove_link", "create_link"]

    def test_unmanaged_file_at_target_is_conflict(self):
        desired = [_lt("/home/x/.zshrc")]
        plan = plan_link(desired, LinkedState(), _fs_check_file)
        assert len(plan.conflicts) == 1
        assert ".zshrc" in plan.conflicts[0]

    def test_module_targets_counted(self):
        desired = [_lt("/home/x/.config/nvim/init.lua", module=True)]
        plan = plan_link(desired, LinkedState(), _fs_check_none)
        assert plan.summary.from_modules == 1


class TestPlanUnlink:
    def test_unlink_all(self):
        lt = _lt("/home/x/.zshrc")
        current = LinkedState(links={Path("/home/x/.zshrc"): lt})
        plan = plan_unlink(current, packages=None)
        assert len(plan.operations) == 1
        assert plan.operations[0].type == "remove_link"

    def test_unlink_specific_package(self):
        zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
        git = _lt("/home/x/.gitconfig", pkg="_shared/git")
        current = LinkedState(links={
            Path("/home/x/.zshrc"): zsh,
            Path("/home/x/.gitconfig"): git,
        })
        plan = plan_unlink(current, packages=["_shared/zsh"])
        assert len(plan.operations) == 1
        assert plan.operations[0].target == Path("/home/x/.zshrc")

    def test_unlink_by_basename(self):
        zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
        current = LinkedState(links={Path("/home/x/.zshrc"): zsh})
        plan = plan_unlink(current, packages=["zsh"])
        assert len(plan.operations) == 1
- Step 2: Implement planning.py
# src/flow/domain/dotfiles/planning.py
"""Link/unlink plan computation. Pure functions with injected I/O."""
from pathlib import Path
from typing import Callable, Optional

from flow.domain.dotfiles.models import (
    LinkOp,
    LinkPlan,
    LinkTarget,
    LinkedState,
    PlanSummary,
)


def plan_link(
    desired: list[LinkTarget],
    current: LinkedState,
    filesystem_check: Callable[[Path], Optional[str]],
) -> LinkPlan:
    """Build reconciliation plan.

    filesystem_check: injected by service. Returns "file", "dir", "symlink", or None.
    """
    ops: list[LinkOp] = []
    conflicts: list[str] = []
    added = 0
    removed = 0
    unchanged = 0
    from_modules = 0
    desired_map = {t.target: t for t in desired}
    desired_targets = set(desired_map.keys())
    current_targets = set(current.links.keys())
    # Removals: in current but not desired
    for target in sorted(current_targets - desired_targets):
        ops.append(LinkOp(
            type="remove_link", target=target, source=None,
            package=current.links[target].package,
            needs_sudo=current.links[target].needs_sudo,
        ))
        removed += 1
    # Additions, updates, and unchanged
    for target in sorted(desired_targets):
        spec = desired_map[target]
        if target in current.links:
            cur = current.links[target]
            if cur.source == spec.source:
                unchanged += 1
                if spec.from_module:
                    from_modules += 1
                continue
            # Source changed: remove old link, then create new one
            ops.append(LinkOp(
                type="remove_link", target=target, source=cur.source,
                package=cur.package, needs_sudo=cur.needs_sudo,
            ))
            ops.append(LinkOp(
                type="create_link", target=target, source=spec.source,
                package=spec.package, needs_sudo=spec.needs_sudo,
            ))
            added += 1
            if spec.from_module:
                from_modules += 1
            continue
        # New target: check filesystem for conflicts
        fs_state = filesystem_check(target)
        if fs_state is not None:
            conflicts.append(
                f"{target} already exists ({fs_state}) and is not managed by flow"
            )
            continue
        ops.append(LinkOp(
            type="create_link", target=target, source=spec.source,
            package=spec.package, needs_sudo=spec.needs_sudo,
        ))
        added += 1
        if spec.from_module:
            from_modules += 1
    return LinkPlan(
        operations=ops,
        conflicts=conflicts,
        summary=PlanSummary(
            added=added, removed=removed,
            unchanged=unchanged, from_modules=from_modules,
        ),
    )


def plan_unlink(
    current: LinkedState,
    packages: Optional[list[str]],
) -> LinkPlan:
    """Plan removal of managed links."""
    ops: list[LinkOp] = []
    for target in sorted(current.links.keys()):
        lt = current.links[target]
        if packages is not None:
            # Match by full package_id or by basename
            basename = lt.package.split("/", 1)[-1] if "/" in lt.package else lt.package
            if lt.package not in packages and basename not in packages:
                continue
        ops.append(LinkOp(
            type="remove_link", target=target, source=lt.source,
            package=lt.package, needs_sudo=lt.needs_sudo,
        ))
    return LinkPlan(
        operations=ops,
        conflicts=[],
        summary=PlanSummary(added=0, removed=len(ops), unchanged=0, from_modules=0),
    )
- Step 3: Run tests
Run: python -m pytest tests/test_domain_dotfiles_planning.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/domain/dotfiles/planning.py tests/test_domain_dotfiles_planning.py
git commit -m "feat: add dotfiles link planning with conflict detection"
Chunk 3: Packages Domain
Task 12: Package models
Files:
- Create: src/flow/domain/packages/__init__.py (empty)
- Create: src/flow/domain/packages/models.py
- Create: tests/test_domain_packages_models.py
- Step 1: Write models
All frozen dataclasses as specified in spec Section 5.1: PackageDef, ProfilePackageRef, PkgInstallOp, PkgRemoveOp, PackagePlan, InstalledPackage, InstalledState.
InstalledState serialization format:
# as_dict() produces:
{
    "version": 1,
    "packages": {
        "neovim": {
            "version": "0.10.4",
            "type": "binary",
            "files": ["/home/x/.local/bin/nvim", "/home/x/.local/share/nvim"]
        }
    }
}
# from_dict() parses the above. Raises ConfigError on version mismatch.
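A minimal sketch of how this format could round-trip, assuming only the fields that the tests for this task use. `ConfigError` below is a local stand-in for `flow.core.errors.ConfigError`, and the real dataclasses in spec Section 5.1 may carry more fields than shown here.

```python
from dataclasses import dataclass, field
from pathlib import Path


class ConfigError(Exception):
    """Stand-in for flow.core.errors.ConfigError (assumption for this sketch)."""


@dataclass(frozen=True)
class InstalledPackage:
    name: str
    version: str
    type: str
    files: list[Path]


@dataclass(frozen=True)
class InstalledState:
    packages: dict[str, InstalledPackage] = field(default_factory=dict)

    def as_dict(self) -> dict:
        # Paths are stored as strings so the result is JSON-serializable.
        return {
            "version": 1,
            "packages": {
                name: {
                    "version": pkg.version,
                    "type": pkg.type,
                    "files": [str(f) for f in pkg.files],
                }
                for name, pkg in sorted(self.packages.items())
            },
        }

    @classmethod
    def from_dict(cls, data: dict) -> "InstalledState":
        version = data.get("version")
        # A missing version is accepted (empty file), mirroring LinkedState.from_dict.
        if version is not None and version != 1:
            raise ConfigError(f"Unsupported installed-state version {version}")
        packages = {
            name: InstalledPackage(
                name=name,
                version=str(info["version"]),
                type=str(info["type"]),
                files=[Path(f) for f in info.get("files", [])],
            )
            for name, info in data.get("packages", {}).items()
        }
        return cls(packages=packages)
```

Keeping the package name as the dictionary key, rather than repeating it inside each entry, matches the layout shown above.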
- Step 2: Write tests
# tests/test_domain_packages_models.py
"""Tests for packages domain models."""
from pathlib import Path

import pytest

from flow.core.errors import ConfigError
from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef


def test_installed_state_roundtrip():
    state = InstalledState(packages={
        "neovim": InstalledPackage(
            name="neovim", version="0.10.4", type="binary",
            files=[Path("/home/x/.local/bin/nvim")],
        ),
    })
    data = state.as_dict()
    restored = InstalledState.from_dict(data)
    assert "neovim" in restored.packages
    assert restored.packages["neovim"].version == "0.10.4"
    assert restored.packages["neovim"].files == [Path("/home/x/.local/bin/nvim")]


def test_installed_state_empty():
    state = InstalledState.from_dict({})
    assert state.packages == {}


def test_installed_state_version_mismatch():
    with pytest.raises(ConfigError):
        InstalledState.from_dict({"version": 99, "packages": {}})


def test_package_def_fields():
    pkg = PackageDef(
        name="fd", type="pkg", sources={"apt": "fd-find"},
        source=None, version=None, asset_pattern=None,
        platform_map={}, extract_dir=None, install={},
        post_install=None, allow_sudo=False,
    )
    assert pkg.name == "fd"
    assert pkg.type == "pkg"
- Step 3: Run tests
Run: python -m pytest tests/test_domain_packages_models.py -v
Expected: All passed
- Step 4: Commit
git add src/flow/domain/packages/ tests/test_domain_packages_models.py
git commit -m "feat: add packages domain models"
Task 13: Package catalog and resolution
Files:
- Create: src/flow/domain/packages/catalog.py
- Create: src/flow/domain/packages/resolution.py
- Create: tests/test_domain_packages.py
- Step 1: Write tests covering:
  - parse_catalog with list and dict manifest formats
  - normalize_profile_entry with string shorthand ("binary/neovim"), plain name, dict
  - resolve_spec merging catalog and profile overrides
  - resolve_source_name with apt/brew/dnf fallbacks
  - resolve_binary_asset with platform map and asset pattern
  - resolve_download_url with github shorthand and direct URL
  - pm_update_command and pm_install_command for apt/dnf/brew
  - detect_package_manager returning apt/dnf/None
- Step 2: Implement catalog.py and resolution.py
Implement the functions listed in spec Section 5.2 matching those exact signatures. Use src/flow/services/package_defs.py only as reference for resolution logic, not as source of truth -- the spec signatures are canonical. This is the single copy of all package resolution logic (no duplication with bootstrap).
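As a rough illustration of two of the functions named above, the sketch below shows one possible shape for `resolve_source_name` and `pm_install_command`. The signatures, the argv-list return type, and the use of `sudo` are assumptions made for this example; the spec's Section 5.2 signatures remain canonical and may differ.

```python
def resolve_source_name(name: str, sources: dict[str, str], pm: str) -> str:
    """Return the manager-specific package name, falling back to the catalog name.

    Hypothetical signature: `sources` maps a package manager to its package name,
    as in the PackageDef test (sources={"apt": "fd-find"}).
    """
    return sources.get(pm, name)


def pm_install_command(pm: str, names: list[str]) -> list[str]:
    """Build an argv list that installs `names` with the given package manager."""
    base = {
        "apt": ["sudo", "apt-get", "install", "-y"],
        "dnf": ["sudo", "dnf", "install", "-y"],
        "brew": ["brew", "install"],
    }
    if pm not in base:
        raise ValueError(f"Unsupported package manager: {pm}")
    return base[pm] + list(names)


# Example: "fd" is published as "fd-find" on apt but keeps its name elsewhere.
apt_name = resolve_source_name("fd", {"apt": "fd-find"}, "apt")
install_argv = pm_install_command("apt", [apt_name])
```

Returning an argv list rather than a shell string avoids quoting problems when the command is later passed to a process runner.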
- Step 3: Run tests
Run: python -m pytest tests/test_domain_packages.py -v
- Step 4: Commit
git add src/flow/domain/packages/ tests/test_domain_packages.py
git commit -m "feat: add packages catalog and resolution (single source of truth)"
Task 14: Package planning
Files:
- Create: src/flow/domain/packages/planning.py
- Create: tests/test_domain_packages_planning.py
- Step 1: Write tests for plan_install and plan_remove
- Step 2: Implement planning.py
- Step 3: Run tests, commit
git commit -m "feat: add package install/remove planning"
Chunk 4: Bootstrap Domain
Task 15: Bootstrap models and setup modules
Files:
- Create: src/flow/domain/bootstrap/__init__.py
- Create: src/flow/domain/bootstrap/models.py
- Create: src/flow/domain/bootstrap/modules.py
- Create: tests/test_domain_bootstrap_modules.py
- Step 1: Write models: Profile, SetupModuleDef, BootstrapAction, BootstrapPlan
- Step 2: Write and test setup modules: HostnameModule, LocaleModule, ShellModule, SSHKeygenModule, RuncmdModule. Each returns shell commands from .plan() and a human-readable string from .describe().
describe() examples (used in dry-run output):
- HostnameModule.describe() -> "Set hostname to my-host"
- LocaleModule.describe() -> "Set locale to en_US.UTF-8"
- ShellModule.describe() -> "Install and configure shell: zsh"
- SSHKeygenModule.describe() -> "Generate 1 SSH key(s)"
- RuncmdModule.describe() -> "Run 3 custom command(s)"
Add a test for each module asserting isinstance(module.describe(), str) and that .plan() returns a non-empty list of strings.
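A minimal sketch of two setup modules following the `.plan()` / `.describe()` contract described above. The constructor fields and the exact shell command for setting the hostname are assumptions for illustration; only the `describe()` strings come from the examples in this task.

```python
import shlex
from dataclasses import dataclass


@dataclass(frozen=True)
class HostnameModule:
    hostname: str  # assumed field name

    def plan(self) -> list[str]:
        # hostnamectl is one common way to set the hostname on systemd hosts;
        # the real module may use a different command.
        return [f"sudo hostnamectl set-hostname {shlex.quote(self.hostname)}"]

    def describe(self) -> str:
        return f"Set hostname to {self.hostname}"


@dataclass(frozen=True)
class RuncmdModule:
    commands: tuple[str, ...]  # assumed field name

    def plan(self) -> list[str]:
        # Custom commands are passed through unchanged.
        return list(self.commands)

    def describe(self) -> str:
        return f"Run {len(self.commands)} custom command(s)"


modules = [HostnameModule("my-host"), RuncmdModule(("echo a", "echo b", "echo c"))]
descriptions = [m.describe() for m in modules]
```

Because both methods are pure, a dry run can print `describe()` for every module without touching the system.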
- Step 3: Run tests, commit
git commit -m "feat: add bootstrap models and setup modules"
Task 16: Bootstrap planning
Files:
- Create: src/flow/domain/bootstrap/planning.py
- Create: tests/test_domain_bootstrap_planning.py
- Step 1: Write tests for parse_profile and plan_bootstrap -- verify it produces correct ordered actions (validate env, setup modules, packages, shell, dotfiles link).
- Step 2: Implement -- plan_bootstrap raises ConfigError if required env vars are missing, then builds ordered action list using packages domain for install planning.
- Step 3: Run tests, commit
git commit -m "feat: add bootstrap planning (orchestrator over packages + dotfiles)"
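The ordering rule for this task (validate env, setup modules, packages, shell, dotfiles link) can be sketched as below. The function name, its parameters, and the string action labels are hypothetical; the real `plan_bootstrap` takes the spec's types and returns a `BootstrapPlan`. `ConfigError` is a local stand-in for `flow.core.errors.ConfigError`.

```python
class ConfigError(Exception):
    """Stand-in for flow.core.errors.ConfigError (assumption for this sketch)."""


def plan_bootstrap_sketch(
    required_env: list[str],
    env: dict[str, str],
    setup_modules: list[str],
    packages: list[str],
) -> list[str]:
    """Return action labels in execution order, or raise if env vars are missing."""
    missing = [key for key in required_env if not env.get(key)]
    if missing:
        raise ConfigError("Missing required env vars: " + ", ".join(missing))
    actions = ["validate-env"]
    actions += [f"setup:{name}" for name in setup_modules]
    actions += [f"install:{name}" for name in packages]
    actions += ["shell", "dotfiles-link"]
    return actions


ordered = plan_bootstrap_sketch(
    required_env=["USER_EMAIL"],
    env={"USER_EMAIL": "me@example.com"},
    setup_modules=["hostname"],
    packages=["neovim"],
)
```

Failing before any action is built means a dry run reports missing variables without producing a partial plan.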
Chunk 5: Remote + Containers + Projects Domains
Task 17: Remote domain
Files:
- Create: src/flow/domain/remote/__init__.py
- Create: src/flow/domain/remote/models.py -- contains Target and SSHCommand only. TargetConfig lives in core/config.py (already created in Task 7). Import it from there.
- Create: src/flow/domain/remote/resolution.py
- Create: tests/test_domain_remote.py
- Step 1: Write tests for parse_target, resolve_target, build_ssh_command, terminfo_fix_command
- Step 2: Implement models and resolution. Port from current src/flow/services/ssh.py. resolve_target receives list[TargetConfig] imported from flow.core.config.
- Step 3: Run tests, commit
git commit -m "feat: add remote domain (SSH target resolution)"
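For orientation, here is a small sketch of what a pure `build_ssh_command` might look like. The `Target` fields and the function signature are assumptions for this example; the actual models are defined by the spec and by the logic ported from the existing ssh service.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Target:
    # Hypothetical fields; the real Target model may differ.
    host: str
    user: Optional[str] = None
    port: Optional[int] = None


def build_ssh_command(target: Target) -> list[str]:
    """Build an ssh argv list without running anything."""
    argv = ["ssh"]
    if target.port is not None:
        argv += ["-p", str(target.port)]
    destination = f"{target.user}@{target.host}" if target.user else target.host
    argv.append(destination)
    return argv


ssh_argv = build_ssh_command(Target(host="devbox", user="me", port=2222))
```

Since the function only builds a list, tests can assert on the exact argv without a network or an SSH binary.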
Task 18: Containers domain
Files:
- Create: src/flow/domain/containers/__init__.py
- Create: src/flow/domain/containers/models.py
- Create: src/flow/domain/containers/resolution.py
- Create: tests/test_domain_containers.py
- Step 1: Write tests for parse_image_ref, container_name, resolve_mounts, build_container_spec
- Step 2: Implement. Port from current src/flow/services/containers.py.
- Step 3: Run tests, commit
git commit -m "feat: add containers domain (image resolution, mount computation)"
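One way `parse_image_ref` could split an image reference into name and tag is sketched below. The return type and the `latest` default are assumptions for illustration; the ported logic from the existing containers service is the reference for real behavior.

```python
def parse_image_ref(ref: str) -> tuple[str, str]:
    """Split 'name[:tag]' into (name, tag), defaulting the tag to 'latest'.

    A colon only marks a tag when it appears after the last '/', so a
    registry port such as 'localhost:5000/img' is not mistaken for a tag.
    """
    last_slash = ref.rfind("/")
    last_colon = ref.rfind(":")
    if last_colon > last_slash:
        return ref[:last_colon], ref[last_colon + 1:]
    return ref, "latest"


examples = [parse_image_ref(r) for r in ("ubuntu:24.04", "ubuntu", "localhost:5000/img")]
```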
Chunk 6: Services Layer (Part 1)
Task 19: DotfilesService
Files:
- Create: src/flow/services/__init__.py
- Create: src/flow/services/dotfiles.py
- Create: tests/test_service_dotfiles.py
- Step 1: Write integration tests using tmp_path for real filesystem + FakeRunner for git:
  - test_link_creates_symlinks -- set up dotfiles dir, call .link(), verify symlinks
  - test_link_with_module -- set up package with _module.yaml and cloned module dir, verify correct target paths
  - test_unlink_removes_symlinks -- link then unlink, verify cleaned up
  - test_link_dry_run_no_changes -- dry run does not create symlinks
  - test_status_shows_packages -- set up linked state, verify output
- Step 2: Implement DotfilesService
The service performs all I/O:
- _discover_packages: walks dotfiles dir, reads _module.yaml files, builds Package objects with local_files populated
- Calls pure domain functions for resolution and planning
- Executes the plan (create/remove symlinks via FileSystem)
- Persists LinkedState to JSON
- Step 3: Run tests, commit
git commit -m "feat: add DotfilesService with link/unlink/status/edit"
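The `filesystem_check` callback that the service injects into `plan_link` (Task 11) could be implemented directly on `pathlib`, as in this sketch. It assumes the service talks to the real filesystem; the plan's `FileSystem` abstraction may wrap the same checks differently.

```python
from pathlib import Path
from typing import Optional


def filesystem_check(path: Path) -> Optional[str]:
    """Classify what exists at `path`: "symlink", "dir", "file", or None."""
    # Check symlinks first: is_dir() and exists() follow links, so a symlink
    # to a directory, or a dangling symlink, would otherwise be misreported.
    if path.is_symlink():
        return "symlink"
    if path.is_dir():
        return "dir"
    if path.exists():
        return "file"
    return None
```

Passing this function to `plan_link` keeps the planner pure while still letting it report unmanaged files at a target as conflicts.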
Task 20: PackageService
Files:
- Create: src/flow/services/packages.py
- Create: tests/test_service_packages.py
- Step 1: Write tests for install, list, remove flows
- Step 2: Implement PackageService -- uses domain planning, executes via runtime
- Step 3: Run tests, commit
git commit -m "feat: add PackageService"
Task 21: BootstrapService
Files:
- Create: src/flow/services/bootstrap.py
- Create: tests/test_service_bootstrap.py
- Step 1: Write tests -- bootstrap run with dry_run, show, list
- Step 2: Implement -- uses domain planning, executes actions sequentially, delegates to PackageService and DotfilesService
- Step 3: Run tests, commit
git commit -m "feat: add BootstrapService"
Chunk 7: Services Layer (Part 2)
Task 22: RemoteService
Files:
- Create: src/flow/services/remote.py
- Create: tests/test_service_remote.py
- Step 1: Write tests -- enter dry_run, list targets, terminfo warning
- Step 2: Implement
- Step 3: Run tests, commit
git commit -m "feat: add RemoteService"
Task 23: ContainerService
Files:
- Create: src/flow/services/containers.py
- Create: tests/test_service_containers.py
- Step 1: Write tests -- create, list, stop, remove with FakeRunner
- Step 2: Implement
- Step 3: Run tests, commit
git commit -m "feat: add ContainerService"
Task 24: ProjectService
Note: ProjectService has no domain layer -- it's a thin service that runs git commands directly. All logic is I/O-bound (listing dirs, running git). No separate domain module needed.
Files:
- Create: src/flow/services/projects.py
- Create: tests/test_service_projects.py
- Step 1: Write tests
# tests/test_service_projects.py
"""Tests for ProjectService."""


def test_check_clean_repo(tmp_path):
    """Create a git repo at tmp_path/projects/myrepo, commit a file.
    Call service.check(fetch=False). Verify output contains 'clean'."""


def test_check_uncommitted_changes(tmp_path):
    """Create repo, modify a tracked file without committing.
    Verify 'uncommitted changes' in output."""


def test_check_no_git_repos(tmp_path):
    """Empty projects dir. Verify info message."""


def test_summary_shows_all_dirs(tmp_path):
    """Mix of git and non-git dirs. Verify table output."""
- Step 2: Implement -- check() iterates dirs in projects_dir, runs git status/diff/rev-list per repo. fetch() runs git fetch --all. summary() is check(fetch=False). Use self.ctx.runtime.git for all git calls, self.ctx.console.table for output.
- Step 3: Run tests, commit
git commit -m "feat: add ProjectService"
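The "clean" versus "uncommitted changes" decision in `check()` can be isolated into a small pure helper that takes the text of `git status --porcelain`, which prints one line per changed path and nothing for a clean tree. The helper name is hypothetical; the two labels are the ones the tests above look for.

```python
def classify_status(porcelain_output: str) -> str:
    """Map `git status --porcelain` output to a human-readable state."""
    changed = [line for line in porcelain_output.splitlines() if line.strip()]
    return "uncommitted changes" if changed else "clean"


clean_state = classify_status("")
dirty_state = classify_status(" M README.md\n?? notes.txt\n")
```

Keeping the classification separate from the git call lets it be tested without creating real repositories.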
Chunk 8: Commands + CLI
Task 25: CLI entry point + context validation
Files:
- Create: src/flow/cli.py (rewrite)
- Create: tests/test_cli.py
- Step 1: Write tests -- non-root check, context validation (remote blocked in VM), version flag, dry_run passed through
- Step 2: Implement cli.py per spec Section 10
- Step 3: Run tests, commit
git commit -m "feat: add CLI entry point with context awareness"
Task 26: Command modules
Files:
- Create: src/flow/commands/remote.py
- Create: src/flow/commands/dev.py
- Create: src/flow/commands/dotfiles.py
- Create: src/flow/commands/setup.py
- Create: src/flow/commands/packages.py
- Create: src/flow/commands/projects.py
- Create: src/flow/commands/__init__.py
Each command module registers its argparse subcommands and defines handler functions that construct the service and call one method.
- Step 1: Implement all command modules (each is 30-60 lines)
- Step 2: Write integration test that runs flow --help, flow dotfiles --help, etc.
- Step 3: Commit
git commit -m "feat: add command modules (thin CLI adapters)"
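The "thin CLI adapter" shape described above might look like the following sketch for a projects command. The `register` function name, the `(args, ctx)` handler signature, the `--fetch` flag, and `ProjectServiceStub` are hypothetical and only illustrate the pattern: one handler, one service construction, one method call.

```python
import argparse
from typing import Any


class ProjectServiceStub:
    """Stand-in for the real ProjectService, so the sketch runs on its own."""

    def __init__(self, ctx: Any) -> None:
        self.ctx = ctx

    def check(self, fetch: bool) -> str:
        return f"check(fetch={fetch})"


def _handle_check(args: argparse.Namespace, ctx: Any) -> str:
    # The handler only constructs the service and calls one method.
    return ProjectServiceStub(ctx).check(fetch=args.fetch)


def register(subparsers: Any) -> None:
    """Attach the `projects` command tree to the top-level subparsers object."""
    projects = subparsers.add_parser("projects", help="inspect project repos")
    sub = projects.add_subparsers(dest="projects_command", required=True)
    check = sub.add_parser("check", help="report repo status")
    check.add_argument("--fetch", action="store_true")  # assumed flag
    check.set_defaults(handler=_handle_check)
```

With this shape, cli.py can dispatch every command uniformly with `args.handler(args, ctx)`.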
Task 27: Zsh completion
Files:
- Create: src/flow/commands/completion.py
- Create: tests/test_completion.py

- Step 1: Implement -- port and adapt from current completion.py with updated command names
- Step 2: Write tests for complete() function with the new command surface
- Step 3: Commit
git commit -m "feat: add zsh completion with updated command surface"
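For the Step 2 tests, a prefix-matching `complete()` is easy to assert against. The sketch below is an assumption about its shape, not a port of the current completion.py. The signature is hypothetical, and the top-level names are inferred from the command module filenames in Task 26 plus completion itself.

```python
from typing import List, Sequence

# Inferred from the command module filenames; the real command surface may differ.
COMMANDS = ("completion", "dev", "dotfiles", "packages", "projects", "remote", "setup")


def complete(words: Sequence[str]) -> List[str]:
    """Return candidates for the word being typed.

    `words` is the command line after `flow`, with the partial word last
    (an empty string when the cursor follows a space).
    """
    if len(words) <= 1:
        prefix = words[0] if words else ""
        return [name for name in COMMANDS if name.startswith(prefix)]
    return []  # subcommand completion is left out of this sketch
```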
Chunk 9: Cleanup + Integration
Task 28: Delete old code
- Step 1: Remove old files
# Remove old service/command/core files that have been replaced
rm -f src/flow/core/action.py
rm -f src/flow/core/stow.py
rm -f src/flow/core/process.py
# Old core files replaced by new ones:
# system.py -> runtime.py (already handled)
# config.py, console.py, platform.py, paths.py, errors.py -> rewritten in place
# variables.py -> template.py
# Old command modules and services: the new modules live in the same
# src/flow/commands/ and src/flow/services/ directories, so do NOT rm -rf them.
# List both directories and delete only leftover old files that no new module replaced.
ls src/flow/commands/ src/flow/services/
# Remove old tests
rm -f tests/test_action.py tests/test_stow.py tests/test_commands.py
rm -f tests/test_dotfiles_folding.py tests/test_self_hosting.py
- Step 2: Run full test suite
Run: python -m pytest tests/ -v
Expected: All new tests pass, no imports of deleted modules
- Step 3: Update pyproject.toml entry point if needed
Verify [project.scripts] flow = "flow.cli:main" still works.
- Step 4: Test binary build
make build
./dist/flow --help
./dist/flow --version
- Step 5: Commit
git add -A
git commit -m "chore: remove old code, complete rewrite"
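The "no imports of deleted modules" expectation in Step 2 can also be checked directly rather than inferred from a green test run. The helper below is a sketch under stated assumptions. `find_stale_imports` is a hypothetical name, the module list covers only the three files removed with `rm -f` above, and relative imports are skipped.

```python
import ast
from pathlib import Path
from typing import List, Sequence, Tuple

# Dotted names of the modules removed in Step 1 (extend as needed).
DELETED = ("flow.core.action", "flow.core.stow", "flow.core.process")


def _is_deleted(name: str, deleted: Sequence[str]) -> bool:
    return any(name == d or name.startswith(d + ".") for d in deleted)


def find_stale_imports(root: Path, deleted: Sequence[str] = DELETED) -> List[Tuple[str, str]]:
    """Return (relative file path, imported name) for imports of deleted modules."""
    hits: List[Tuple[str, str]] = []
    for path in sorted(root.rglob("*.py")):
        tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                names = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
                # `from flow.core import stow` must match flow.core.stow as well.
                names = [node.module] + [f"{node.module}.{a.name}" for a in node.names]
            else:
                continue  # not an import, or a relative import (not handled here)
            match = next((n for n in names if _is_deleted(n, deleted)), None)
            if match is not None:
                hits.append((str(path.relative_to(root)), match))
    return hits
```

Running `find_stale_imports(Path("src"))` and `find_stale_imports(Path("tests"))` before the commit should return empty lists if the cleanup is complete.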
Task 29: Update README
- Step 1: Rewrite README.md to match the new command surface, config format, and architecture.
- Step 2: Commit
git add README.md
git commit -m "docs: update README for new architecture"