From 24d682adf1393dab162e93a7431127abf2bb8ca5 Mon Sep 17 00:00:00 2001 From: Tomas Mirchev Date: Wed, 25 Feb 2026 17:20:43 +0200 Subject: [PATCH] new --- README.md | 2 + example/README.md | 1 + src/flow/cli.py | 17 -- src/flow/commands/completion.py | 7 +- src/flow/commands/dotfiles.py | 328 ++++++++++++++++++++++++--- tests/test_cli.py | 16 ++ tests/test_completion.py | 5 + tests/test_dotfiles_e2e_container.py | 276 ++++++++++++++++++++++ tests/test_dotfiles_folding.py | 278 +++++++++++++++++++++++ 9 files changed, 877 insertions(+), 53 deletions(-) create mode 100644 tests/test_dotfiles_e2e_container.py diff --git a/README.md b/README.md index e58bdf3..2a00e22 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,7 @@ flow dev create api -i tm0/node -p ~/projects/api flow dotfiles init --repo git@github.com:you/dotfiles.git flow dotfiles link --profile linux-auto +flow dotfiles undo flow dotfiles status flow dotfiles modules list flow dotfiles modules sync @@ -198,4 +199,5 @@ flow completion install-zsh python3 -m venv .venv .venv/bin/pip install -e ".[dev]" python3 -m pytest +FLOW_RUN_E2E_CONTAINER=1 .venv/bin/pytest -q tests/test_dotfiles_e2e_container.py ``` diff --git a/example/README.md b/example/README.md index 883f844..e41f730 100644 --- a/example/README.md +++ b/example/README.md @@ -35,6 +35,7 @@ Initialize and link dotfiles: ```bash flow dotfiles init --repo "$EXAMPLE_REPO" flow dotfiles link --profile linux-auto +flow dotfiles undo flow dotfiles status ``` diff --git a/src/flow/cli.py b/src/flow/cli.py index 4811bba..966a753 100644 --- a/src/flow/cli.py +++ b/src/flow/cli.py @@ -2,7 +2,6 @@ import argparse import os -import shutil import subprocess import sys @@ -22,21 +21,6 @@ def _ensure_non_root(console: ConsoleLogger) -> None: sys.exit(1) -def _refresh_sudo_credentials(console: ConsoleLogger) -> None: - if os.environ.get("FLOW_SKIP_SUDO_REFRESH") == "1": - return - - if not shutil.which("sudo"): - console.error("sudo is required but was not found in PATH") - sys.exit(1) - - try: - subprocess.run(["sudo", "-v"], check=True) - except subprocess.CalledProcessError: - console.error("Failed to refresh sudo credentials") - sys.exit(1) - - def main(): parser = argparse.ArgumentParser( prog="flow", @@ -69,7 +53,6 @@ def main(): return ensure_dirs() - _refresh_sudo_credentials(console) try: platform_info = detect_platform() diff --git a/src/flow/commands/completion.py b/src/flow/commands/completion.py index 26ebe42..1a4bd80 100644 --- a/src/flow/commands/completion.py +++ b/src/flow/commands/completion.py @@ -316,7 +316,7 @@ def _complete_dotfiles(before: Sequence[str], current: str) -> List[str]: if current.startswith("-"): return _filter(["--verbose", "-h", "--help"], current) return _filter( - ["init", "link", "unlink", "status", "sync", "relink", "clean", "edit", "repo", "modules"], + ["init", "link", "unlink", "undo", "status", "sync", "relink", "clean", "edit", "repo", "modules"], current, ) @@ -325,7 +325,7 @@ def _complete_dotfiles(before: Sequence[str], current: str) -> List[str]: if current.startswith("-"): return _filter(["-h", "--help"], current) return _filter( - ["init", "link", "unlink", "status", "sync", "relink", "clean", "edit", "repo", "modules"], + ["init", "link", "unlink", "undo", "status", "sync", "relink", "clean", "edit", "repo", "modules"], current, ) before = [before[0]] + list(before[2:]) @@ -384,6 +384,9 @@ def _complete_dotfiles(before: Sequence[str], current: str) -> List[str]: return _filter(["-h", "--help"], current) return _filter(_list_dotfiles_packages(), current) + if sub == "undo": + return _filter(["-h", "--help"], current) if current.startswith("-") else [] + if sub == "edit": if current.startswith("-"): return _filter(["--no-commit", "-h", "--help"], current) diff --git a/src/flow/commands/dotfiles.py b/src/flow/commands/dotfiles.py index 2addaa6..791d1f7 100644 --- a/src/flow/commands/dotfiles.py +++ b/src/flow/commands/dotfiles.py @@ -8,8 +8,9 @@ import shutil import subprocess import sys from dataclasses import dataclass +from datetime import datetime, timezone from pathlib import Path -from typing import Dict, List, Optional, Set +from typing import Any, Dict, List, Optional, Set, Tuple import yaml @@ -19,6 +20,7 @@ from flow.core.paths import DOTFILES_DIR, LINKED_STATE, MODULES_DIR RESERVED_SHARED = "_shared" RESERVED_ROOT = "_root" MODULE_FILE = "_module.yaml" +LINK_BACKUP_DIR = LINKED_STATE.parent / "link-backups" @dataclass @@ -59,6 +61,9 @@ def register(subparsers): unlink.add_argument("packages", nargs="*", help="Specific packages to unlink (default: all)") unlink.set_defaults(handler=run_unlink) + undo = sub.add_parser("undo", help="Undo latest dotfiles link transaction") + undo.set_defaults(handler=run_undo) + status = sub.add_parser("status", help="Show dotfiles link status") status.set_defaults(handler=run_status) @@ -451,9 +456,7 @@ def _save_state(state: dict) -> None: json.dump(state, handle, indent=2) -def _load_link_specs_from_state() -> Dict[Path, LinkSpec]: - state = _load_state() - links = state.get("links", {}) +def _parse_link_specs(links: Any) -> Dict[Path, LinkSpec]: if not isinstance(links, dict): raise RuntimeError("Unsupported linked state format. Remove linked.json and relink dotfiles.") @@ -479,15 +482,145 @@ def _load_link_specs_from_state() -> Dict[Path, LinkSpec]: return resolved -def _save_link_specs_to_state(specs: Dict[Path, LinkSpec]) -> None: +def _serialize_link_specs(specs: Dict[Path, LinkSpec]) -> Dict[str, Dict[str, dict]]: grouped: Dict[str, Dict[str, dict]] = {} for spec in sorted(specs.values(), key=lambda s: str(s.target)): grouped.setdefault(spec.package, {})[str(spec.target)] = { "source": str(spec.source), "is_directory_link": spec.is_directory_link, } + return grouped - _save_state({"version": 2, "links": grouped}) + +def _cleanup_link_transaction_files(transaction: Optional[dict]) -> None: + if not isinstance(transaction, dict): + return + + backup_dir = transaction.get("backup_dir") + if isinstance(backup_dir, str) and backup_dir: + shutil.rmtree(Path(backup_dir), ignore_errors=True) + + +def _load_last_link_transaction() -> Optional[dict]: + state = _load_state() + transaction = state.get("last_transaction") + if not isinstance(transaction, dict): + return None + return transaction + + +def _save_last_link_transaction(transaction: dict) -> None: + state = _load_state() + previous = state.get("last_transaction") + if isinstance(previous, dict): + _cleanup_link_transaction_files(previous) + state["last_transaction"] = transaction + _save_state(state) + + +def _clear_last_link_transaction(*, remove_backups: bool = True) -> None: + state = _load_state() + transaction = state.get("last_transaction") + if remove_backups and isinstance(transaction, dict): + _cleanup_link_transaction_files(transaction) + state.pop("last_transaction", None) + _save_state(state) + + +def _start_link_transaction(previous_links: Dict[Path, LinkSpec]) -> dict: + now = datetime.now(timezone.utc) + tx_id = now.strftime("%Y%m%dT%H%M%S%fZ") + backup_dir = LINK_BACKUP_DIR / tx_id + return { + "id": tx_id, + "created_at": now.isoformat(), + "backup_dir": str(backup_dir), + "previous_links": _serialize_link_specs(previous_links), + "targets": [], + } + + +def _snapshot_target( + target: Path, + *, + use_sudo: bool, + backup_dir: Path, + index: int, +) -> dict: + if target.is_symlink(): + return {"kind": "symlink", "source": os.readlink(target)} + + if target.exists(): + if target.is_dir(): + raise RuntimeError(f"Cannot snapshot directory target: {target}") + + backup_dir.mkdir(parents=True, exist_ok=True) + backup_path = backup_dir / f"{index:06d}" + if use_sudo: + _run_sudo(["cp", "-a", str(target), str(backup_path)], dry_run=False) + else: + shutil.copy2(target, backup_path) + return {"kind": "file", "backup": str(backup_path)} + + return {"kind": "missing"} + + +def _restore_target_snapshot(target: Path, snapshot: dict) -> None: + if not isinstance(snapshot, dict): + raise RuntimeError(f"Unsupported transaction snapshot for {target}") + + use_sudo = not _is_in_home(target, Path.home()) + + if target.exists() or target.is_symlink(): + if target.is_dir() and not target.is_symlink(): + raise RuntimeError(f"Cannot restore {target}; a directory now exists at that path") + _remove_target(target, use_sudo=use_sudo, dry_run=False) + + kind = snapshot.get("kind") + if kind == "missing": + return + + if kind == "symlink": + source = snapshot.get("source") + if not isinstance(source, str): + raise RuntimeError(f"Unsupported transaction snapshot for {target}") + if use_sudo: + _run_sudo(["mkdir", "-p", str(target.parent)], dry_run=False) + _run_sudo(["ln", "-sfn", source, str(target)], dry_run=False) + else: + target.parent.mkdir(parents=True, exist_ok=True) + target.symlink_to(source) + return + + if kind == "file": + backup = snapshot.get("backup") + if not isinstance(backup, str): + raise RuntimeError(f"Unsupported transaction snapshot for {target}") + backup_path = Path(backup) + if not backup_path.exists(): + raise RuntimeError(f"Backup missing for {target}: {backup_path}") + if use_sudo: + _run_sudo(["mkdir", "-p", str(target.parent)], dry_run=False) + _run_sudo(["cp", "-a", str(backup_path), str(target)], dry_run=False) + else: + target.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(backup_path, target) + return + + raise RuntimeError(f"Unsupported transaction snapshot kind for {target}: {kind}") + + +def _load_link_specs_from_state() -> Dict[Path, LinkSpec]: + state = _load_state() + links = state.get("links", {}) + return _parse_link_specs(links) + + +def _save_link_specs_to_state(specs: Dict[Path, LinkSpec]) -> None: + state = _load_state() + state["version"] = 2 + state["links"] = _serialize_link_specs(specs) + _save_state(state) def _list_profiles(flow_dir: Path) -> List[str]: @@ -684,6 +817,8 @@ def _run_sudo(cmd: List[str], *, dry_run: bool = False) -> None: if dry_run: print(" " + " ".join(shlex.quote(part) for part in (["sudo"] + cmd))) return + if shutil.which("sudo") is None: + raise RuntimeError("sudo is required for root-targeted dotfiles, but it was not found in PATH") subprocess.run(["sudo"] + cmd, check=True) @@ -788,9 +923,25 @@ def _collect_home_specs( def _validate_conflicts( desired: Dict[Path, LinkSpec], current: Dict[Path, LinkSpec], - force: bool, -) -> List[str]: - conflicts: List[str] = [] +) -> tuple[List[str], List[str]]: + force_required: List[str] = [] + fatal: List[str] = [] + + # Validate removals for targets currently tracked in state. + # If a managed path was changed on disk (regular file or different symlink), + # require --force before deleting it. + for target, spec in current.items(): + if target in desired: + continue + if not (target.exists() or target.is_symlink()): + continue + if _same_symlink(target, spec.source): + continue + if target.is_dir() and not target.is_symlink(): + fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") + continue + force_required.append(f"Conflict: {target} differs from managed link and would be removed") + for target, spec in desired.items(): if not (target.exists() or target.is_symlink()): continue @@ -799,16 +950,23 @@ def _validate_conflicts( continue if target in current: + current_spec = current[target] + if _same_symlink(target, current_spec.source): + # Existing managed link can be replaced by desired link. + continue + if target.is_dir() and not target.is_symlink(): + fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") + continue + force_required.append(f"Conflict: {target} differs from managed link and would be replaced") continue if target.is_dir() and not target.is_symlink(): - conflicts.append(f"Conflict: {target} is a directory") + fatal.append(f"Conflict: {target} is a directory and cannot be overwritten") continue - if not force: - conflicts.append(f"Conflict: {target} already exists and is not managed by flow") + force_required.append(f"Conflict: {target} already exists and is not managed by flow") - return conflicts + return force_required, fatal def _apply_link_spec(spec: LinkSpec, *, copy: bool, dry_run: bool) -> bool: @@ -847,39 +1005,87 @@ def _sync_to_desired( copy: bool, ) -> None: current = _load_link_specs_from_state() - conflicts = _validate_conflicts(desired, current, force) + previous = dict(current) + force_required, fatal = _validate_conflicts(desired, current) - if conflicts: - for conflict in conflicts: + if fatal: + for conflict in fatal: ctx.console.error(conflict) - if not force: - raise RuntimeError("Use --force to overwrite existing files") + raise RuntimeError("One or more targets are existing directories and cannot be overwritten") - for target in sorted(current.keys(), key=str): - if target in desired: - continue - use_sudo = not _is_in_home(target, Path.home()) - _remove_target(target, use_sudo=use_sudo, dry_run=dry_run) - del current[target] + if force_required and not force: + for conflict in force_required: + ctx.console.error(conflict) + raise RuntimeError("Use --force to overwrite existing files") - for target in sorted(desired.keys(), key=str): - spec = desired[target] + transaction: Optional[dict] = None + snapshots: Dict[Path, dict] = {} + if not dry_run: + transaction = _start_link_transaction(previous) + backup_dir = Path(transaction["backup_dir"]) - if _same_symlink(target, spec.source): - current[target] = spec - continue + def snapshot_before_change(target: Path) -> None: + if target in snapshots: + return + use_sudo = not _is_in_home(target, Path.home()) + snapshots[target] = _snapshot_target( + target, + use_sudo=use_sudo, + backup_dir=backup_dir, + index=len(snapshots) + 1, + ) - exists = target.exists() or target.is_symlink() - if exists: + try: + for target in sorted(current.keys(), key=str): + if target in desired: + continue + if not dry_run and transaction is not None and (target.exists() or target.is_symlink()): + snapshot_before_change(target) use_sudo = not _is_in_home(target, Path.home()) _remove_target(target, use_sudo=use_sudo, dry_run=dry_run) + del current[target] - applied = _apply_link_spec(spec, copy=copy, dry_run=dry_run) - if applied: - current[target] = spec + for target in sorted(desired.keys(), key=str): + spec = desired[target] + + if _same_symlink(target, spec.source): + current[target] = spec + continue + + if not dry_run and transaction is not None: + snapshot_before_change(target) + + exists = target.exists() or target.is_symlink() + if exists: + use_sudo = not _is_in_home(target, Path.home()) + _remove_target(target, use_sudo=use_sudo, dry_run=dry_run) + + applied = _apply_link_spec(spec, copy=copy, dry_run=dry_run) + if applied: + current[target] = spec + except Exception: + if not dry_run and transaction is not None: + transaction["targets"] = [ + {"target": str(target), "before": snapshots[target]} + for target in sorted(snapshots.keys(), key=str) + ] + transaction["incomplete"] = True + try: + _save_link_specs_to_state(current) + _save_last_link_transaction(transaction) + except Exception: + pass + raise if not dry_run: _save_link_specs_to_state(current) + if transaction is not None: + transaction["targets"] = [ + {"target": str(target), "before": snapshots[target]} + for target in sorted(snapshots.keys(), key=str) + ] + transaction["incomplete"] = False + _save_last_link_transaction(transaction) def _desired_links_for_profile( @@ -1012,9 +1218,61 @@ def run_unlink(ctx: FlowContext, args): del current[target] _save_link_specs_to_state(current) + _clear_last_link_transaction(remove_backups=True) ctx.console.success(f"Removed {removed} symlink(s)") +def run_undo(ctx: FlowContext, args): + transaction = _load_last_link_transaction() + if transaction is None: + ctx.console.info("No dotfiles link transaction to undo.") + return + + raw_targets = transaction.get("targets") + if not isinstance(raw_targets, list): + ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") + sys.exit(1) + + restore_plan: List[Tuple[Path, dict]] = [] + for entry in raw_targets: + if not isinstance(entry, dict): + ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") + sys.exit(1) + + target_raw = entry.get("target") + before = entry.get("before") + if not isinstance(target_raw, str) or not isinstance(before, dict): + ctx.console.error("Invalid undo state format. Remove linked.json and relink dotfiles.") + sys.exit(1) + restore_plan.append((Path(target_raw), before)) + + try: + # Restore deeper paths first to avoid parent/child ordering issues. + for target, snapshot in sorted( + restore_plan, + key=lambda item: (len(item[0].parts), str(item[0])), + reverse=True, + ): + _restore_target_snapshot(target, snapshot) + except RuntimeError as e: + ctx.console.error(str(e)) + sys.exit(1) + + previous_links = transaction.get("previous_links", {}) + try: + _parse_link_specs(previous_links) + except RuntimeError as e: + ctx.console.error(str(e)) + sys.exit(1) + + state = _load_state() + state["version"] = 2 + state["links"] = previous_links + _save_state(state) + _clear_last_link_transaction(remove_backups=True) + ctx.console.success(f"Undid {len(restore_plan)} change(s)") + + def run_status(ctx: FlowContext, args): try: current = _load_link_specs_from_state() @@ -1227,6 +1485,8 @@ def run_clean(ctx: FlowContext, args): if not args.dry_run: _save_link_specs_to_state(current) + if removed > 0: + _clear_last_link_transaction(remove_backups=True) if removed > 0: ctx.console.success(f"Cleaned {removed} broken symlink(s)") diff --git a/tests/test_cli.py b/tests/test_cli.py index b2fe771..9044703 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -55,11 +55,27 @@ def test_dotfiles_help(): assert "init" in result.stdout assert "link" in result.stdout assert "unlink" in result.stdout + assert "undo" in result.stdout assert "status" in result.stdout assert "sync" in result.stdout assert "repo" in result.stdout +def test_dotfiles_help_without_sudo_in_path(): + env = _clean_env() + env["PATH"] = os.path.dirname(sys.executable) + + result = subprocess.run( + [sys.executable, "-m", "flow", "dotfiles", "--help"], + capture_output=True, + text=True, + env=env, + ) + + assert result.returncode == 0 + assert "dotfiles" in result.stdout + + def test_bootstrap_help(): result = subprocess.run( [sys.executable, "-m", "flow", "bootstrap", "--help"], diff --git a/tests/test_completion.py b/tests/test_completion.py index 34fd975..fdf5d69 100644 --- a/tests/test_completion.py +++ b/tests/test_completion.py @@ -84,6 +84,11 @@ def test_complete_dotfiles_repo_subcommands(): assert out == ["pull", "push"] +def test_complete_dotfiles_top_level_includes_undo(): + out = completion.complete(["flow", "dotfiles", "u"], 3) + assert out == ["undo", "unlink"] + + def test_complete_dotfiles_modules_subcommands(): out = completion.complete(["flow", "dotfiles", "modules", "s"], 4) assert out == ["sync"] diff --git a/tests/test_dotfiles_e2e_container.py b/tests/test_dotfiles_e2e_container.py new file mode 100644 index 0000000..067a30f --- /dev/null +++ b/tests/test_dotfiles_e2e_container.py @@ -0,0 +1,276 @@ +"""Containerized e2e tests for dotfiles link safety. + +These tests are opt-in and run only when FLOW_RUN_E2E_CONTAINER=1. +""" + +import os +import shutil +import subprocess +import uuid +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _docker_available() -> bool: + if shutil.which("docker") is None: + return False + + result = subprocess.run( + ["docker", "info"], + capture_output=True, + text=True, + check=False, + ) + return result.returncode == 0 + + +def _require_container_e2e() -> None: + if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1": + pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests") + if not _docker_available(): + pytest.skip("Docker is required for container e2e tests") + + +@pytest.fixture(scope="module") +def e2e_image(tmp_path_factory): + _require_container_e2e() + + context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context") + dockerfile = context_dir / "Dockerfile" + dockerfile.write_text( + "FROM python:3.11-slim\n" + "RUN apt-get update && apt-get install -y --no-install-recommends sudo && rm -rf /var/lib/apt/lists/*\n" + "RUN pip install --no-cache-dir pyyaml\n" + "RUN useradd -m -s /bin/bash flow\n" + "RUN echo 'flow ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/flow && chmod 440 /etc/sudoers.d/flow\n" + "USER flow\n" + "WORKDIR /workspace\n" + ) + + tag = f"flow-e2e-{uuid.uuid4().hex[:10]}" + subprocess.run( + ["docker", "build", "-t", tag, str(context_dir)], + check=True, + capture_output=True, + text=True, + ) + + try: + yield tag + finally: + subprocess.run(["docker", "rmi", "-f", tag], capture_output=True, text=True, check=False) + + +def _run_in_container(image_tag: str, script: str) -> subprocess.CompletedProcess: + return subprocess.run( + [ + "docker", + "run", + "--rm", + "-v", + f"{REPO_ROOT}:/workspace/flow-cli:ro", + image_tag, + "bash", + "-lc", + script, + ], + capture_output=True, + text=True, + check=False, + ) + + +def _assert_ok(run: subprocess.CompletedProcess) -> None: + if run.returncode != 0: + raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}") + + +def test_e2e_link_and_undo_with_root_targets(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/zsh" +mkdir -p "$dot/_shared/rootpkg/_root/tmp" +echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" +echo 'root-target' > "$dot/_shared/rootpkg/_root/tmp/flow-e2e-root-target" + +echo '# before' > "$HOME/.zshrc" + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force +test -L "$HOME/.zshrc" +test -L /tmp/flow-e2e-root-target + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo +test -f "$HOME/.zshrc" +test ! -L "$HOME/.zshrc" +grep -q '^# before$' "$HOME/.zshrc" +test ! -e /tmp/flow-e2e-root-target +""" + _assert_ok(_run_in_container(e2e_image, script)) + + +def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/zsh" +echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" +echo '# original' > "$HOME/.zshrc" + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --dry-run --force +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force --dry-run + +test -f "$HOME/.zshrc" +test ! -L "$HOME/.zshrc" +grep -q '^# original$' "$HOME/.zshrc" + +state="$XDG_STATE_HOME/flow/linked.json" +if [ -f "$state" ]; then + python - "$state" <<'PY' +import json, sys +data = json.load(open(sys.argv[1], encoding="utf-8")) +assert data.get("links", {}) == {}, data +assert "last_transaction" not in data, data +PY +fi +""" + _assert_ok(_run_in_container(e2e_image, script)) + + +def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/zsh" +echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" +echo '# user-file' > "$HOME/.zshrc" + +set +e +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link +rc=$? +set -e +test "$rc" -ne 0 + +test -f "$HOME/.zshrc" +test ! -L "$HOME/.zshrc" +grep -q '^# user-file$' "$HOME/.zshrc" +""" + _assert_ok(_run_in_container(e2e_image, script)) + + +def test_e2e_managed_drift_requires_force(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/zsh" +echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force +test -L "$HOME/.zshrc" + +rm -f "$HOME/.zshrc" +echo '# drifted-manual' > "$HOME/.zshrc" + +set +e +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link +rc=$? +set -e +test "$rc" -ne 0 +test -f "$HOME/.zshrc" +test ! -L "$HOME/.zshrc" +grep -q '^# drifted-manual$' "$HOME/.zshrc" +""" + _assert_ok(_run_in_container(e2e_image, script)) + + +def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/zsh" "$dot/_shared/git" +echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" +echo '[user]' > "$dot/_shared/git/.gitconfig" + +mkdir -p "$HOME/.zshrc" + +set +e +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force +rc=$? +set -e +test "$rc" -ne 0 + +test -d "$HOME/.zshrc" +test ! -e "$HOME/.gitconfig" +""" + _assert_ok(_run_in_container(e2e_image, script)) + + +def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_image): + script = r""" +set -euo pipefail +export HOME=/home/flow +export XDG_DATA_HOME=/tmp/xdg-data +export XDG_CONFIG_HOME=/tmp/xdg-config +export XDG_STATE_HOME=/tmp/xdg-state +mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" + +dot="$XDG_DATA_HOME/flow/dotfiles" +mkdir -p "$dot/_shared/a" "$dot/_shared/b" +echo '# aaa' > "$dot/_shared/a/.a" +echo '# bbb' > "$dot/_shared/b/.b" + +echo '# pre-a' > "$HOME/.a" +echo '# pre-b' > "$HOME/.b" + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force a +test -L "$HOME/.a" + +# Turn .b into a directory to force a fatal conflict, while .a stays desired and unchanged. +rm -f "$HOME/.b" +mkdir -p "$HOME/.b" +set +e +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force +rc=$? +set -e +test "$rc" -ne 0 + +PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo +test -f "$HOME/.a" +test ! -L "$HOME/.a" +grep -q '^# pre-a$' "$HOME/.a" +""" + _assert_ok(_run_in_container(e2e_image, script)) diff --git a/tests/test_dotfiles_folding.py b/tests/test_dotfiles_folding.py index 90c1a73..42262cc 100644 --- a/tests/test_dotfiles_folding.py +++ b/tests/test_dotfiles_folding.py @@ -1,5 +1,6 @@ """Tests for dotfiles link planning, root markers, and module sources.""" +from argparse import Namespace import json import subprocess from pathlib import Path @@ -11,9 +12,13 @@ from flow.commands.dotfiles import ( _collect_home_specs, _list_profiles, _load_link_specs_from_state, + _load_state, _pull_requires_ack, _resolved_package_source, + _run_sudo, + run_undo, _save_link_specs_to_state, + _sync_to_desired, _sync_modules, ) from flow.core.config import AppConfig, FlowContext @@ -273,3 +278,276 @@ def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monk def test_pull_requires_ack_only_on_real_updates(): assert _pull_requires_ack("Already up to date.\n", "") is False assert _pull_requires_ack("Updating 123..456\n", "") is True + + +def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + source = tmp_path / "source" / ".zshrc" + source.parent.mkdir(parents=True) + source.write_text("# new") + + target = tmp_path / "home" / ".zshrc" + target.parent.mkdir(parents=True) + target.write_text("# old") + + desired = { + target: LinkSpec( + source=source, + target=target, + package="_shared/zsh", + ) + } + + _sync_to_desired( + _ctx(), + desired, + force=True, + dry_run=True, + copy=False, + ) + + assert target.exists() + assert not target.is_symlink() + assert target.read_text() == "# old" + assert not state_file.exists() + + +def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + source_root = tmp_path / "source" + source_root.mkdir() + source_ok = source_root / "ok" + source_ok.write_text("ok") + source_conflict = source_root / "conflict" + source_conflict.write_text("conflict") + + home = tmp_path / "home" + home.mkdir() + target_ok = home / "a-file" + target_conflict = home / "z-dir" + target_conflict.mkdir() + + desired = { + target_ok: LinkSpec(source=source_ok, target=target_ok, package="_shared/test"), + target_conflict: LinkSpec(source=source_conflict, target=target_conflict, package="_shared/test"), + } + + with pytest.raises(RuntimeError, match="cannot be overwritten"): + _sync_to_desired( + _ctx(), + desired, + force=True, + dry_run=False, + copy=False, + ) + + assert not target_ok.exists() + assert not target_ok.is_symlink() + assert not state_file.exists() + + +def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + source = tmp_path / "source" / ".zshrc" + source.parent.mkdir(parents=True) + source.write_text("# managed") + + target = tmp_path / "home" / ".zshrc" + target.parent.mkdir(parents=True) + target.write_text("# previous") + + desired = { + target: LinkSpec( + source=source, + target=target, + package="_shared/zsh", + ) + } + + _sync_to_desired( + _ctx(), + desired, + force=True, + dry_run=False, + copy=False, + ) + + assert target.is_symlink() + + state_after_link = _load_state() + assert "last_transaction" in state_after_link + tx = state_after_link["last_transaction"] + assert isinstance(tx, dict) + assert tx.get("targets") + + run_undo(_ctx(), Namespace()) + + assert target.exists() + assert not target.is_symlink() + assert target.read_text() == "# previous" + + state_after_undo = _load_state() + assert state_after_undo.get("links") == {} + assert "last_transaction" not in state_after_undo + + +def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + source = tmp_path / "source" + source.mkdir() + src_a = source / "a" + src_b = source / "b" + src_a.write_text("a") + src_b.write_text("b") + + home = tmp_path / "home" + home.mkdir() + target_a = home / "a" + target_b = home / "b" + target_a.write_text("old-a") + + desired = { + target_a: LinkSpec(source=src_a, target=target_a, package="_shared/test"), + target_b: LinkSpec(source=src_b, target=target_b, package="_shared/test"), + } + + call_count = {"n": 0} + + def _failing_apply(spec, *, copy, dry_run): # noqa: ARG001 + call_count["n"] += 1 + if call_count["n"] == 2: + raise RuntimeError("simulated failure") + spec.target.parent.mkdir(parents=True, exist_ok=True) + spec.target.symlink_to(spec.source) + return True + + monkeypatch.setattr("flow.commands.dotfiles._apply_link_spec", _failing_apply) + + with pytest.raises(RuntimeError, match="simulated failure"): + _sync_to_desired( + _ctx(), + desired, + force=True, + dry_run=False, + copy=False, + ) + + state_after_failure = _load_state() + tx = state_after_failure.get("last_transaction") + assert isinstance(tx, dict) + assert tx.get("incomplete") is True + assert target_a.is_symlink() + + run_undo(_ctx(), Namespace()) + + assert target_a.exists() + assert not target_a.is_symlink() + assert target_a.read_text() == "old-a" + assert not target_b.exists() + assert _load_state().get("links") == {} + + +def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + source = tmp_path / "source" / ".old" + source.parent.mkdir(parents=True) + source.write_text("managed") + + target = tmp_path / "home" / ".zshrc" + target.parent.mkdir(parents=True) + target.write_text("user-edited") + + _save_link_specs_to_state( + { + target: LinkSpec( + source=source, + target=target, + package="_shared/zsh", + ) + } + ) + + with pytest.raises(RuntimeError, match="Use --force"): + _sync_to_desired( + _ctx(), + {}, + force=False, + dry_run=False, + copy=False, + ) + + assert target.exists() + assert not target.is_symlink() + assert target.read_text() == "user-edited" + assert target in _load_link_specs_from_state() + + +def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch): + state_file = tmp_path / "linked.json" + monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) + monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) + + old_source = tmp_path / "source" / ".old" + new_source = tmp_path / "source" / ".new" + old_source.parent.mkdir(parents=True) + old_source.write_text("managed-old") + new_source.write_text("managed-new") + + target = tmp_path / "home" / ".gitconfig" + target.parent.mkdir(parents=True) + target.write_text("manual-file") + + _save_link_specs_to_state( + { + target: LinkSpec( + source=old_source, + target=target, + package="_shared/git", + ) + } + ) + + desired = { + target: LinkSpec( + source=new_source, + target=target, + package="_shared/git", + ) + } + + with pytest.raises(RuntimeError, match="Use --force"): + _sync_to_desired( + _ctx(), + desired, + force=False, + dry_run=False, + copy=False, + ) + + assert target.exists() + assert not target.is_symlink() + assert target.read_text() == "manual-file" + assert _load_link_specs_from_state()[target].source == old_source + + +def test_run_sudo_errors_when_binary_missing(monkeypatch): + monkeypatch.setattr("flow.commands.dotfiles.shutil.which", lambda _name: None) + with pytest.raises(RuntimeError, match="sudo is required"): + _run_sudo(["true"], dry_run=False)