fix(dotfiles): make symlink handling robust and fix _module checkout

- Atomic state writes (tempfile + os.replace) so a crash mid-write cannot
  corrupt linked.json.
- Managed-symlink guards in FileSystem.create_symlink and the new
  remove_symlink: refuse to overwrite or delete a path unless it is
  absent or already a symlink pointing to the expected source. Stops
  silent user-file deletion in the plan/apply race window.
- plan_link adopts orphan symlinks whose readlink already matches the
  desired source, so a partial-apply failure can be recovered by rerun.
- _load_state warns loudly on each stale entry it drops, and status()
  no longer rewrites linked.json as a side effect of read.
- _apply_plan dispatches exhaustively; unknown LinkOp types raise.
- Remove _checkout_module_ref early-return for branch == "main" -- it
  assumed the remote default was main, breaking master-default repos.
  Always run the explicit checkout (idempotent).
- Warn when a module's cache_dir is absent during link, suggesting
  flow dotfiles repos pull.
- LinkOp.type and ModuleRef.ref_type tightened to Literal[...]; dead
  "create_dir" enum value removed from the model.

Tests: +29 covering atomic writes, overwrite guards, remove_symlink
semantics, orphan adoption (match/mismatch), partial-failure rerun,
status read-only, branch/tag/commit checkout argv, uncloned-module
warning, stale-state warnings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:01:46 +03:00
parent 78f95bc88e
commit c0e2758057
7 changed files with 582 additions and 32 deletions

View File

@@ -1,8 +1,13 @@
"""Tests for flow.core.runtime."""
import json
import os
from pathlib import Path
import pytest
from flow.core.containers import ContainerRuntime
from flow.core.errors import FlowError
from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime
from flow.core.tmux import TmuxClient
@@ -77,6 +82,128 @@ class TestFileSystem:
fs.copy_file(src, dst)
assert dst.read_text() == "data"
def test_write_json_atomic_leaves_no_tmp_residue(self, tmp_path):
fs = FileSystem()
path = tmp_path / "state" / "data.json"
fs.write_json(path, {"a": 1})
assert path.read_text()
assert json.loads(path.read_text()) == {"a": 1}
residue = list(path.parent.glob("*.tmp"))
assert residue == []
def test_write_json_overwrites_stale_tmp(self, tmp_path):
fs = FileSystem()
path = tmp_path / "data.json"
stale = path.with_suffix(path.suffix + ".tmp")
stale.write_text("garbage")
fs.write_json(path, {"k": "v"})
assert json.loads(path.read_text()) == {"k": "v"}
assert not stale.exists()
def test_remove_tree_raises_when_missing(self, tmp_path):
fs = FileSystem()
with pytest.raises(FileNotFoundError):
fs.remove_tree(tmp_path / "absent")
def test_remove_tree_missing_ok(self, tmp_path):
fs = FileSystem()
fs.remove_tree(tmp_path / "absent", missing_ok=True) # no error
def test_remove_tree_raises_on_permission_error(self, tmp_path):
fs = FileSystem()
# Create a parent dir we can't traverse, holding a child directory.
# If we can't actually drop permissions (e.g. running as root), skip.
if os.geteuid() == 0:
pytest.skip("Running as root: cannot simulate permission error")
parent = tmp_path / "locked"
parent.mkdir()
(parent / "child").mkdir()
try:
parent.chmod(0o500) # r-x: can list but not modify
with pytest.raises(OSError):
fs.remove_tree(parent / "child")
finally:
parent.chmod(0o700)
def test_create_symlink_refuses_real_file(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
target = tmp_path / "target"
target.write_text("user content")
with pytest.raises(FlowError, match="Refusing to overwrite"):
fs.create_symlink(source, target)
# Target untouched.
assert target.read_text() == "user content"
def test_create_symlink_refuses_foreign_symlink(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
elsewhere = tmp_path / "elsewhere"
elsewhere.write_text("other")
target = tmp_path / "target"
target.symlink_to(elsewhere)
with pytest.raises(FlowError, match="Refusing to overwrite"):
fs.create_symlink(source, target)
# Foreign symlink preserved.
assert target.readlink() == elsewhere
def test_create_symlink_when_target_absent(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
target = tmp_path / "sub" / "target"
fs.create_symlink(source, target)
assert target.is_symlink()
assert target.readlink() == source
def test_create_symlink_idempotent_overwrite(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
target = tmp_path / "target"
target.symlink_to(source)
fs.create_symlink(source, target) # must not raise
assert target.is_symlink()
assert target.readlink() == source
def test_remove_symlink_with_matching_expected(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
target = tmp_path / "target"
target.symlink_to(source)
fs.remove_symlink(target, expected_source=source)
assert not target.exists()
assert not target.is_symlink()
def test_remove_symlink_refuses_regular_file(self, tmp_path):
fs = FileSystem()
target = tmp_path / "target"
target.write_text("real file")
with pytest.raises(FlowError, match="Refusing to remove non-symlink"):
fs.remove_symlink(target)
assert target.read_text() == "real file"
def test_remove_symlink_refuses_mismatched_expected(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("src")
elsewhere = tmp_path / "elsewhere"
elsewhere.write_text("other")
target = tmp_path / "target"
target.symlink_to(elsewhere)
with pytest.raises(FlowError, match="Refusing to remove symlink"):
fs.remove_symlink(target, expected_source=source)
# Untouched.
assert target.is_symlink()
assert target.readlink() == elsewhere
def test_remove_symlink_absent_is_noop(self, tmp_path):
fs = FileSystem()
fs.remove_symlink(tmp_path / "missing") # no error
class TestCommandRunner:
def test_run_echo(self):

View File

@@ -28,10 +28,15 @@ def _fs_check_file(path: Path) -> Optional[str]:
return "file"
def _no_link_target(path: Path) -> Optional[Path]:
"""Default fake link_target: no symlink anywhere."""
return None
class TestPlanLink:
def test_new_target_creates_link(self):
desired = [_lt("/home/x/.zshrc")]
plan = plan_link(desired, LinkedState(), _fs_check_none)
plan = plan_link(desired, LinkedState(), _fs_check_none, _no_link_target)
assert len(plan.operations) == 1
assert plan.operations[0].type == "create_link"
assert plan.summary.added == 1
@@ -39,14 +44,14 @@ class TestPlanLink:
def test_existing_correct_link_unchanged(self):
lt = _lt("/home/x/.zshrc")
current = LinkedState(links={Path("/home/x/.zshrc"): lt})
plan = plan_link([lt], current, _fs_check_none)
plan = plan_link([lt], current, _fs_check_none, _no_link_target)
assert len(plan.operations) == 0
assert plan.summary.unchanged == 1
def test_stale_link_removed(self):
old = _lt("/home/x/.old")
current = LinkedState(links={Path("/home/x/.old"): old})
plan = plan_link([], current, _fs_check_none)
plan = plan_link([], current, _fs_check_none, _no_link_target)
assert len(plan.operations) == 1
assert plan.operations[0].type == "remove_link"
assert plan.summary.removed == 1
@@ -55,7 +60,7 @@ class TestPlanLink:
old = _lt("/home/x/.zshrc", source="/old")
new = _lt("/home/x/.zshrc", source="/new")
current = LinkedState(links={Path("/home/x/.zshrc"): old})
plan = plan_link([new], current, _fs_check_none)
plan = plan_link([new], current, _fs_check_none, _no_link_target)
types = [op.type for op in plan.operations]
assert types == ["remove_link", "create_link"]
assert plan.summary.updated == 1
@@ -64,25 +69,71 @@ class TestPlanLink:
def test_unmanaged_file_at_target_is_conflict(self):
desired = [_lt("/home/x/.zshrc")]
plan = plan_link(desired, LinkedState(), _fs_check_file)
plan = plan_link(desired, LinkedState(), _fs_check_file, _no_link_target)
assert len(plan.conflicts) == 1
assert ".zshrc" in plan.conflicts[0]
def test_module_targets_counted(self):
desired = [_lt("/home/x/.config/nvim/init.lua", module=True)]
plan = plan_link(desired, LinkedState(), _fs_check_none)
plan = plan_link(desired, LinkedState(), _fs_check_none, _no_link_target)
assert plan.summary.from_modules == 1
def test_broken_symlink_is_repaired(self):
lt = _lt("/home/x/.zshrc")
current = LinkedState(links={Path("/home/x/.zshrc"): lt})
plan = plan_link([lt], current, lambda p: "broken_symlink")
plan = plan_link(
[lt], current, lambda p: "broken_symlink", _no_link_target,
)
types = [op.type for op in plan.operations]
assert types == ["remove_link", "create_link"]
assert plan.summary.updated == 1
assert plan.summary.unchanged == 0
def test_orphan_symlink_matching_source_is_adopted(self):
"""Untracked symlink that already points at the desired source counts
as unchanged, not a conflict. Enables partial-failure rerun."""
desired = [_lt("/home/x/.zshrc", source="/dots/.zshrc")]
plan = plan_link(
desired,
LinkedState(),
lambda p: "symlink",
lambda p: Path("/dots/.zshrc"),
)
assert plan.conflicts == ()
assert plan.operations == ()
assert plan.summary.unchanged == 1
assert plan.summary.added == 0
def test_orphan_symlink_pointing_elsewhere_is_conflict(self):
"""Untracked symlink that points at the wrong source is still a
conflict -- we don't silently retarget unmanaged links."""
desired = [_lt("/home/x/.zshrc", source="/dots/.zshrc")]
plan = plan_link(
desired,
LinkedState(),
lambda p: "symlink",
lambda p: Path("/elsewhere/.zshrc"),
)
assert len(plan.conflicts) == 1
assert plan.summary.added == 0
assert plan.summary.unchanged == 0
def test_orphan_module_symlink_counts_as_module(self):
"""Adopted orphan symlinks for module files still count toward
from_modules in the summary."""
desired = [
_lt("/home/x/.config/nvim/init.lua", source="/cache/init.lua", module=True),
]
plan = plan_link(
desired,
LinkedState(),
lambda p: "symlink",
lambda p: Path("/cache/init.lua"),
)
assert plan.summary.unchanged == 1
assert plan.summary.from_modules == 1
class TestPlanUnlink:
def test_unlink_all(self):

View File

@@ -1,11 +1,15 @@
"""Tests for DotfilesService."""
import json
import time
from pathlib import Path
import pytest
import yaml
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.errors import FlowError
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
from flow.core import paths
@@ -13,6 +17,18 @@ from flow.services.dotfiles import DotfilesService
from tests.fakes import FakeRunner
class _CapturingConsole(Console):
"""Console that records every warn() call for assertions."""
def __init__(self):
super().__init__(color=False)
self.warnings: list[str] = []
def warn(self, msg: str) -> None:
self.warnings.append(msg)
super().warn(msg)
def _make_ctx(tmp_path, console=None):
"""Build a FlowContext for testing."""
return FlowContext(
@@ -433,3 +449,247 @@ class TestDotfilesServiceLink:
svc.link()
assert (home / ".zshrc").is_symlink()
assert (home / ".zshrc").resolve() == real_target.resolve()
def _setup_module_pkg(tmp_path, *, ref_key: str, ref_value: str, profile: str = "_shared"):
"""Build a dotfiles tree with one module package and its module cache."""
dotfiles = tmp_path / "dotfiles"
modules = tmp_path / "modules"
pkg_dir = dotfiles / profile / "nvim"
config_dir = pkg_dir / ".config" / "nvim"
config_dir.mkdir(parents=True)
(config_dir / "_module.yaml").write_text(yaml.dump({
"source": "github:test/nvim-config",
"ref": {ref_key: ref_value},
}))
return dotfiles, modules
def _make_runner_ctx(tmp_path, home, dotfiles, modules, monkeypatch, console=None):
"""Build a FlowContext with a FakeRunner and the path monkeypatches set."""
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", modules)
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
runtime = SystemRuntime()
runner = FakeRunner()
runtime.runner = runner
runtime.git.runner = runner
ctx = FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=console or Console(color=False),
runtime=runtime,
)
return ctx, runner
class TestCheckoutModuleRef:
"""The branch == 'main' early-return is gone: every ref runs git checkout."""
def test_branch_main_still_runs_checkout(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles, modules = _setup_module_pkg(tmp_path, ref_key="branch", ref_value="main")
ctx, runner = _make_runner_ctx(tmp_path, home, dotfiles, modules, monkeypatch)
DotfilesService(ctx).repos_pull()
# Should have cloned (cache missing) and then checked out 'main'.
clone_calls = [c for c in runner.calls if "clone" in c]
checkout_calls = [c for c in runner.calls if "checkout" in c]
assert clone_calls, f"expected git clone, calls: {runner.calls}"
assert checkout_calls, f"expected git checkout, calls: {runner.calls}"
assert any("main" in c for c in checkout_calls)
def test_tag_ref_uses_tags_prefix(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles, modules = _setup_module_pkg(tmp_path, ref_key="tag", ref_value="v1.0")
ctx, runner = _make_runner_ctx(tmp_path, home, dotfiles, modules, monkeypatch)
DotfilesService(ctx).repos_pull()
checkout_calls = [c for c in runner.calls if "checkout" in c]
assert checkout_calls
# tags/v1.0 form -- detached checkout.
assert any("tags/v1.0" in arg for c in checkout_calls for arg in c)
def test_commit_ref_uses_raw_sha(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
sha = "deadbeefcafe1234567890abcdef1234567890ab"
dotfiles, modules = _setup_module_pkg(tmp_path, ref_key="commit", ref_value=sha)
ctx, runner = _make_runner_ctx(tmp_path, home, dotfiles, modules, monkeypatch)
DotfilesService(ctx).repos_pull()
checkout_calls = [c for c in runner.calls if "checkout" in c]
assert checkout_calls
# No tags/ prefix on commit refs.
assert any(sha in arg for c in checkout_calls for arg in c)
assert not any(f"tags/{sha}" in arg for c in checkout_calls for arg in c)
class TestStaleStateReconciliation:
"""_load_state warns on stale entries and only persists when invoked
from a mutating path."""
def _populate_stale_state(self, tmp_path):
"""Write a linked.json that points at a target with no symlink."""
state_path = tmp_path / "state" / "linked.json"
state_path.parent.mkdir(parents=True)
state_path.write_text(json.dumps({
"version": 2,
"links": {
"_shared/zsh": {
str(tmp_path / "home" / ".zshrc"): {
"source": str(tmp_path / "dotfiles" / "_shared" / "zsh" / ".zshrc"),
"from_module": False,
"needs_sudo": False,
}
}
},
}))
return state_path
def test_load_state_warns_on_stale(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {"zsh": {".zshrc": "# zsh"}})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
state_path = self._populate_stale_state(tmp_path)
monkeypatch.setattr(paths, "LINKED_STATE", state_path)
console = _CapturingConsole()
ctx = _make_ctx(tmp_path, console=console)
svc = DotfilesService(ctx)
state = svc._load_state()
assert state.links == {} # stale entry dropped
assert any("stale link record" in w for w in console.warnings)
def test_status_does_not_rewrite_state(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {"zsh": {".zshrc": "# zsh"}})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
state_path = self._populate_stale_state(tmp_path)
monkeypatch.setattr(paths, "LINKED_STATE", state_path)
before_mtime = state_path.stat().st_mtime_ns
before_content = state_path.read_text()
# Sleep just enough that mtime would change if we rewrote.
time.sleep(0.01)
console = _CapturingConsole()
ctx = _make_ctx(tmp_path, console=console)
DotfilesService(ctx).status()
assert state_path.stat().st_mtime_ns == before_mtime
assert state_path.read_text() == before_content
# Still warned about the stale entry.
assert any("stale link record" in w for w in console.warnings)
class TestUnclonedModuleWarning:
def test_link_warns_when_module_cache_missing(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles, modules = _setup_module_pkg(tmp_path, ref_key="branch", ref_value="main")
# No module cache: the module dir does not exist.
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", modules)
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
console = _CapturingConsole()
ctx = _make_ctx(tmp_path, console=console)
svc = DotfilesService(ctx)
svc.link() # must not crash
assert any(
"not cloned" in w and "repos pull" in w for w in console.warnings
), console.warnings
class TestOrphanAdoption:
"""After a partial-failure rerun the planner adopts pre-existing matching
symlinks. We simulate a failure during _apply_plan by replacing
create_symlink mid-flight, then re-running link()."""
def test_partial_apply_failure_recoverable_via_rerun(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh"},
"git": {".gitconfig": "[user]\n name = t"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
state_path = tmp_path / "state" / "linked.json"
monkeypatch.setattr(paths, "LINKED_STATE", state_path)
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
# Patch create_symlink to fail on the SECOND call so the first
# symlink lands on disk but the state is not persisted.
real_create = ctx.runtime.fs.create_symlink
calls = {"n": 0}
def flaky_create(source, target, **kw):
calls["n"] += 1
if calls["n"] >= 2:
raise FlowError("simulated mid-apply failure")
return real_create(source, target, **kw)
monkeypatch.setattr(ctx.runtime.fs, "create_symlink", flaky_create)
with pytest.raises(FlowError, match="simulated"):
svc.link()
# State file should NOT have been written (atomic semantics: we
# only persist when _apply_plan completes).
assert not state_path.exists()
# First symlink landed on disk.
existing_links = sorted(
p.name for p in home.rglob("*") if p.is_symlink()
)
assert len(existing_links) == 1
# Restore real implementation and re-run: orphan adoption kicks in.
monkeypatch.setattr(ctx.runtime.fs, "create_symlink", real_create)
svc.link()
assert (home / ".zshrc").is_symlink()
assert (home / ".gitconfig").is_symlink()
assert state_path.exists()
class TestStatePersistsAtomically:
def test_save_state_uses_tmp_file_atomically(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {"zsh": {".zshrc": "# zsh"}})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
state_path = tmp_path / "state" / "linked.json"
monkeypatch.setattr(paths, "LINKED_STATE", state_path)
ctx = _make_ctx(tmp_path)
DotfilesService(ctx).link()
# No leftover tmp file.
residue = list(state_path.parent.glob("*.tmp"))
assert residue == []
# Final content is valid JSON.
json.loads(state_path.read_text())