This commit is contained in:
2026-02-25 17:20:43 +02:00
parent 5896b43221
commit 24d682adf1
9 changed files with 877 additions and 53 deletions

View File

@@ -55,11 +55,27 @@ def test_dotfiles_help():
assert "init" in result.stdout
assert "link" in result.stdout
assert "unlink" in result.stdout
assert "undo" in result.stdout
assert "status" in result.stdout
assert "sync" in result.stdout
assert "repo" in result.stdout
def test_dotfiles_help_without_sudo_in_path():
env = _clean_env()
env["PATH"] = os.path.dirname(sys.executable)
result = subprocess.run(
[sys.executable, "-m", "flow", "dotfiles", "--help"],
capture_output=True,
text=True,
env=env,
)
assert result.returncode == 0
assert "dotfiles" in result.stdout
def test_bootstrap_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "bootstrap", "--help"],

View File

@@ -84,6 +84,11 @@ def test_complete_dotfiles_repo_subcommands():
assert out == ["pull", "push"]
def test_complete_dotfiles_top_level_includes_undo():
out = completion.complete(["flow", "dotfiles", "u"], 3)
assert out == ["undo", "unlink"]
def test_complete_dotfiles_modules_subcommands():
out = completion.complete(["flow", "dotfiles", "modules", "s"], 4)
assert out == ["sync"]

View File

@@ -0,0 +1,276 @@
"""Containerized e2e tests for dotfiles link safety.
These tests are opt-in and run only when FLOW_RUN_E2E_CONTAINER=1.
"""
import os
import shutil
import subprocess
import uuid
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
def _docker_available() -> bool:
if shutil.which("docker") is None:
return False
result = subprocess.run(
["docker", "info"],
capture_output=True,
text=True,
check=False,
)
return result.returncode == 0
def _require_container_e2e() -> None:
if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1":
pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests")
if not _docker_available():
pytest.skip("Docker is required for container e2e tests")
@pytest.fixture(scope="module")
def e2e_image(tmp_path_factory):
_require_container_e2e()
context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context")
dockerfile = context_dir / "Dockerfile"
dockerfile.write_text(
"FROM python:3.11-slim\n"
"RUN apt-get update && apt-get install -y --no-install-recommends sudo && rm -rf /var/lib/apt/lists/*\n"
"RUN pip install --no-cache-dir pyyaml\n"
"RUN useradd -m -s /bin/bash flow\n"
"RUN echo 'flow ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/flow && chmod 440 /etc/sudoers.d/flow\n"
"USER flow\n"
"WORKDIR /workspace\n"
)
tag = f"flow-e2e-{uuid.uuid4().hex[:10]}"
subprocess.run(
["docker", "build", "-t", tag, str(context_dir)],
check=True,
capture_output=True,
text=True,
)
try:
yield tag
finally:
subprocess.run(["docker", "rmi", "-f", tag], capture_output=True, text=True, check=False)
def _run_in_container(image_tag: str, script: str) -> subprocess.CompletedProcess:
return subprocess.run(
[
"docker",
"run",
"--rm",
"-v",
f"{REPO_ROOT}:/workspace/flow-cli:ro",
image_tag,
"bash",
"-lc",
script,
],
capture_output=True,
text=True,
check=False,
)
def _assert_ok(run: subprocess.CompletedProcess) -> None:
if run.returncode != 0:
raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}")
def test_e2e_link_and_undo_with_root_targets(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
mkdir -p "$dot/_shared/rootpkg/_root/tmp"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo 'root-target' > "$dot/_shared/rootpkg/_root/tmp/flow-e2e-root-target"
echo '# before' > "$HOME/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
test -L "$HOME/.zshrc"
test -L /tmp/flow-e2e-root-target
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# before$' "$HOME/.zshrc"
test ! -e /tmp/flow-e2e-root-target
"""
_assert_ok(_run_in_container(e2e_image, script))
def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '# original' > "$HOME/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --dry-run --force
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force --dry-run
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# original$' "$HOME/.zshrc"
state="$XDG_STATE_HOME/flow/linked.json"
if [ -f "$state" ]; then
python - "$state" <<'PY'
import json, sys
data = json.load(open(sys.argv[1], encoding="utf-8"))
assert data.get("links", {}) == {}, data
assert "last_transaction" not in data, data
PY
fi
"""
_assert_ok(_run_in_container(e2e_image, script))
def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '# user-file' > "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link
rc=$?
set -e
test "$rc" -ne 0
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# user-file$' "$HOME/.zshrc"
"""
_assert_ok(_run_in_container(e2e_image, script))
def test_e2e_managed_drift_requires_force(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
test -L "$HOME/.zshrc"
rm -f "$HOME/.zshrc"
echo '# drifted-manual' > "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link
rc=$?
set -e
test "$rc" -ne 0
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# drifted-manual$' "$HOME/.zshrc"
"""
_assert_ok(_run_in_container(e2e_image, script))
def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh" "$dot/_shared/git"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '[user]' > "$dot/_shared/git/.gitconfig"
mkdir -p "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
rc=$?
set -e
test "$rc" -ne 0
test -d "$HOME/.zshrc"
test ! -e "$HOME/.gitconfig"
"""
_assert_ok(_run_in_container(e2e_image, script))
def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/a" "$dot/_shared/b"
echo '# aaa' > "$dot/_shared/a/.a"
echo '# bbb' > "$dot/_shared/b/.b"
echo '# pre-a' > "$HOME/.a"
echo '# pre-b' > "$HOME/.b"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force a
test -L "$HOME/.a"
# Turn .b into a directory to force a fatal conflict, while .a stays desired and unchanged.
rm -f "$HOME/.b"
mkdir -p "$HOME/.b"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
rc=$?
set -e
test "$rc" -ne 0
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo
test -f "$HOME/.a"
test ! -L "$HOME/.a"
grep -q '^# pre-a$' "$HOME/.a"
"""
_assert_ok(_run_in_container(e2e_image, script))

View File

@@ -1,5 +1,6 @@
"""Tests for dotfiles link planning, root markers, and module sources."""
from argparse import Namespace
import json
import subprocess
from pathlib import Path
@@ -11,9 +12,13 @@ from flow.commands.dotfiles import (
_collect_home_specs,
_list_profiles,
_load_link_specs_from_state,
_load_state,
_pull_requires_ack,
_resolved_package_source,
_run_sudo,
run_undo,
_save_link_specs_to_state,
_sync_to_desired,
_sync_modules,
)
from flow.core.config import AppConfig, FlowContext
@@ -273,3 +278,276 @@ def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monk
def test_pull_requires_ack_only_on_real_updates():
assert _pull_requires_ack("Already up to date.\n", "") is False
assert _pull_requires_ack("Updating 123..456\n", "") is True
def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# new")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("# old")
desired = {
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=True,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "# old"
assert not state_file.exists()
def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
source_root = tmp_path / "source"
source_root.mkdir()
source_ok = source_root / "ok"
source_ok.write_text("ok")
source_conflict = source_root / "conflict"
source_conflict.write_text("conflict")
home = tmp_path / "home"
home.mkdir()
target_ok = home / "a-file"
target_conflict = home / "z-dir"
target_conflict.mkdir()
desired = {
target_ok: LinkSpec(source=source_ok, target=target_ok, package="_shared/test"),
target_conflict: LinkSpec(source=source_conflict, target=target_conflict, package="_shared/test"),
}
with pytest.raises(RuntimeError, match="cannot be overwritten"):
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
assert not target_ok.exists()
assert not target_ok.is_symlink()
assert not state_file.exists()
def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# managed")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("# previous")
desired = {
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
assert target.is_symlink()
state_after_link = _load_state()
assert "last_transaction" in state_after_link
tx = state_after_link["last_transaction"]
assert isinstance(tx, dict)
assert tx.get("targets")
run_undo(_ctx(), Namespace())
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "# previous"
state_after_undo = _load_state()
assert state_after_undo.get("links") == {}
assert "last_transaction" not in state_after_undo
def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source"
source.mkdir()
src_a = source / "a"
src_b = source / "b"
src_a.write_text("a")
src_b.write_text("b")
home = tmp_path / "home"
home.mkdir()
target_a = home / "a"
target_b = home / "b"
target_a.write_text("old-a")
desired = {
target_a: LinkSpec(source=src_a, target=target_a, package="_shared/test"),
target_b: LinkSpec(source=src_b, target=target_b, package="_shared/test"),
}
call_count = {"n": 0}
def _failing_apply(spec, *, copy, dry_run): # noqa: ARG001
call_count["n"] += 1
if call_count["n"] == 2:
raise RuntimeError("simulated failure")
spec.target.parent.mkdir(parents=True, exist_ok=True)
spec.target.symlink_to(spec.source)
return True
monkeypatch.setattr("flow.commands.dotfiles._apply_link_spec", _failing_apply)
with pytest.raises(RuntimeError, match="simulated failure"):
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
state_after_failure = _load_state()
tx = state_after_failure.get("last_transaction")
assert isinstance(tx, dict)
assert tx.get("incomplete") is True
assert target_a.is_symlink()
run_undo(_ctx(), Namespace())
assert target_a.exists()
assert not target_a.is_symlink()
assert target_a.read_text() == "old-a"
assert not target_b.exists()
assert _load_state().get("links") == {}
def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".old"
source.parent.mkdir(parents=True)
source.write_text("managed")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("user-edited")
_save_link_specs_to_state(
{
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
)
with pytest.raises(RuntimeError, match="Use --force"):
_sync_to_desired(
_ctx(),
{},
force=False,
dry_run=False,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "user-edited"
assert target in _load_link_specs_from_state()
def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True)
old_source = tmp_path / "source" / ".old"
new_source = tmp_path / "source" / ".new"
old_source.parent.mkdir(parents=True)
old_source.write_text("managed-old")
new_source.write_text("managed-new")
target = tmp_path / "home" / ".gitconfig"
target.parent.mkdir(parents=True)
target.write_text("manual-file")
_save_link_specs_to_state(
{
target: LinkSpec(
source=old_source,
target=target,
package="_shared/git",
)
}
)
desired = {
target: LinkSpec(
source=new_source,
target=target,
package="_shared/git",
)
}
with pytest.raises(RuntimeError, match="Use --force"):
_sync_to_desired(
_ctx(),
desired,
force=False,
dry_run=False,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "manual-file"
assert _load_link_specs_from_state()[target].source == old_source
def test_run_sudo_errors_when_binary_missing(monkeypatch):
monkeypatch.setattr("flow.commands.dotfiles.shutil.which", lambda _name: None)
with pytest.raises(RuntimeError, match="sudo is required"):
_run_sudo(["true"], dry_run=False)