refactor: post-review hardening pass

Independent re-audit surfaced 11 follow-ups across two layers of review
(my fresh-eyes read + a parallel agent pass). Bundled into a single
commit because changes are small and intertwined.

Symlink / state consistency:
- FileSystem.same_symlink now uses raw readlink() instead of resolve().
  Aligns the three sites that ask "is this our link?" (_load_state,
  _check_overwrite_safe, remove_symlink) on a single rule: exact-readlink
  match. Following symlink chains would let externally-modified links
  pass as ours and be silently overwritten.
- LinkedState.from_dict raises ConfigError on missing required fields
  instead of .get(..., False) silent defaults. Matches InstalledState.
- LinkOp.source is now consistently None for remove_link ops; the
  service derives expected_source from current.links. Removes the
  asymmetry between in-state and orphan-broken removal ops.
- _apply_plan: rename shadowing local from link_target to spec.

Fail loud:
- _xdg() now treats XDG_CONFIG_HOME="" the same as unset. Previously
  an empty env var produced Path("") and state files were written to
  $PWD instead of ~/.local/state/flow.
- _resolve_target raises PlanConflict when a package contains a bare
  _root entry (no path components) instead of silently dropping it.
- _strip_prefix raises FlowError when a declared install path does not
  start with its section's expected prefix (e.g. etc/foo under install.bin).

Speculative abstraction removed (CLAUDE.md):
- core.template.substitute (the $VAR form) had no production callers --
  deleted along with its tests; only the {{var}} form remains.
- SetupModule base class -- five subclasses, no shared behaviour, no
  polymorphic call site. Deleted.
- Profile.arch -- parsed but never read. Deleted.
- PackagePlan.pm_command -- set but never read. Deleted (service
  recomputes pm_install_command at the call site).
- FileSystem.ensure_dir(mode=...), .copy_file(sudo=...), .read_text(
  default=...) -- no callers. Deleted along with their test.
- bootstrap _execute_action: the upfront `phase not in VALID_PHASES`
  check duplicated the trailing exhaustive raise. Kept the trailing
  raise as the single source of truth; phase set still documented in
  VALID_PHASES.

Completion ctx threading:
- Removed _config()/_manifest() helpers that re-loaded from disk on
  every completion call. _list_targets, _list_namespaces, _list_platforms,
  _list_bootstrap_profiles, _list_manifest_packages now take ctx and
  read from ctx.config / ctx.manifest.

Test coverage and e2e:
- e2e container test exercises a real `flow dotfiles link` (no dry-run)
  and asserts the resulting symlinks point into the dotfiles dir;
  reruns to verify idempotency.
- New tests: LinkedState corrupt-state ConfigError, LinkedState bad-version
  ConfigError, bare-_root PlanConflict, service-level _root path routing
  + skip semantics.
- 11 stale test imports removed (pyflakes clean across src/ + tests/).

357 unit tests + 1 e2e (gated) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:23:06 +03:00
parent 6a0eb9f6ef
commit 6b7a48bb20
32 changed files with 239 additions and 181 deletions

View File

@@ -0,0 +1 @@
{"sessionId":"f8703b33-7153-4d74-8fcd-4901f99605ae","pid":157025,"procStart":"20988217","acquiredAt":1778702857140}

View File

@@ -7,7 +7,7 @@ import subprocess
from pathlib import Path
from typing import Sequence
from flow.core.config import FlowContext, load_config, load_manifest
from flow.core.config import FlowContext
from flow.core import paths
from flow.core.errors import FlowError
from flow.domain.remote.resolution import HOST_TEMPLATES
@@ -57,13 +57,13 @@ def complete(ctx: FlowContext, words: Sequence[str], cword: int) -> list[str]:
command = _canonical_command(before[0])
if command == "enter" or command == "remote":
return _complete_remote(before, current)
return _complete_remote(ctx, before, current)
if command == "dev":
return _complete_dev(ctx, before, current)
if command == "dotfiles":
return _complete_dotfiles(before, current)
if command == "setup":
return _complete_setup(before, current)
return _complete_setup(ctx, before, current)
if command == "packages":
return _complete_packages(ctx, before, current)
if command == "projects":
@@ -104,37 +104,24 @@ def _filter(candidates: Sequence[str], prefix: str) -> list[str]:
return [candidate for candidate in unique if candidate.startswith(prefix)]
def _config():
return load_config()
def _list_targets(ctx: FlowContext) -> list[str]:
return sorted({f"{target.namespace}@{target.platform}" for target in ctx.config.targets})
def _manifest():
return load_manifest()
def _list_namespaces(ctx: FlowContext) -> list[str]:
return sorted({target.namespace for target in ctx.config.targets})
def _list_targets() -> list[str]:
cfg = _config()
return sorted({f"{target.namespace}@{target.platform}" for target in cfg.targets})
def _list_platforms(ctx: FlowContext) -> list[str]:
return sorted(set(HOST_TEMPLATES) | {target.platform for target in ctx.config.targets})
def _list_namespaces() -> list[str]:
cfg = _config()
return sorted({target.namespace for target in cfg.targets})
def _list_bootstrap_profiles(ctx: FlowContext) -> list[str]:
return sorted(ctx.manifest.get("profiles", {}).keys())
def _list_platforms() -> list[str]:
cfg = _config()
return sorted(set(HOST_TEMPLATES) | {target.platform for target in cfg.targets})
def _list_bootstrap_profiles() -> list[str]:
manifest = _manifest()
return sorted(manifest.get("profiles", {}).keys())
def _list_manifest_packages() -> list[str]:
manifest = _manifest()
packages = manifest.get("packages", [])
def _list_manifest_packages(ctx: FlowContext) -> list[str]:
packages = ctx.manifest.get("packages", [])
names: set[str] = set()
if isinstance(packages, list):
for package in packages:
@@ -219,7 +206,7 @@ def _profile_from_before(before: Sequence[str]) -> str | None:
return None
def _complete_remote(before: Sequence[str], current: str) -> list[str]:
def _complete_remote(ctx: FlowContext, before: Sequence[str], current: str) -> list[str]:
if before[0] == "remote":
if len(before) == 1:
return _filter(["enter", "list"], current)
@@ -231,9 +218,9 @@ def _complete_remote(before: Sequence[str], current: str) -> list[str]:
enter_tokens = before
if enter_tokens and enter_tokens[-1] in {"-p", "--platform"}:
return _filter(_list_platforms(), current)
return _filter(_list_platforms(ctx), current)
if enter_tokens and enter_tokens[-1] in {"-n", "--namespace"}:
return _filter(_list_namespaces(), current)
return _filter(_list_namespaces(ctx), current)
if current.startswith("-"):
return _filter(
[
@@ -246,7 +233,7 @@ def _complete_remote(before: Sequence[str], current: str) -> list[str]:
],
current,
)
return _filter(_list_targets(), current)
return _filter(_list_targets(ctx), current)
def _complete_dev(ctx: FlowContext, before: Sequence[str], current: str) -> list[str]:
@@ -321,20 +308,20 @@ def _complete_dotfiles(before: Sequence[str], current: str) -> list[str]:
return []
def _complete_setup(before: Sequence[str], current: str) -> list[str]:
def _complete_setup(ctx: FlowContext, before: Sequence[str], current: str) -> list[str]:
if len(before) <= 1:
return _filter(["run", "show", "list"], current)
subcommand = before[1]
if subcommand == "run":
if before and before[-1] == "--profile":
return _filter(_list_bootstrap_profiles(), current)
return _filter(_list_bootstrap_profiles(ctx), current)
if current.startswith("-"):
return _filter(["--profile", "--dry-run", "--var"], current)
return _filter(_list_bootstrap_profiles(), current)
return _filter(_list_bootstrap_profiles(ctx), current)
if subcommand == "show":
return _filter(_list_bootstrap_profiles(), current) if not current.startswith("-") else []
return _filter(_list_bootstrap_profiles(ctx), current) if not current.startswith("-") else []
return []
@@ -346,10 +333,10 @@ def _complete_packages(ctx: FlowContext, before: Sequence[str], current: str) ->
subcommand = before[1]
if subcommand == "install":
if before and before[-1] == "--profile":
return _filter(_list_bootstrap_profiles(), current)
return _filter(_list_bootstrap_profiles(ctx), current)
if current.startswith("-"):
return _filter(["--profile", "--dry-run"], current)
return _filter(_list_manifest_packages(), current)
return _filter(_list_manifest_packages(ctx), current)
if subcommand == "remove":
if current.startswith("-"):

View File

@@ -5,7 +5,15 @@ from pathlib import Path
def _xdg(env_var: str, fallback: str) -> Path:
return Path(os.environ.get(env_var, fallback))
"""Read an XDG path env var, treating empty string as unset.
The XDG basedir spec says an unset OR empty value should fall through
to the default; `os.environ.get` only handles "unset". Without this
guard, `XDG_CONFIG_HOME=""` resolves to `Path("")` and state files
are written relative to $PWD.
"""
value = os.environ.get(env_var)
return Path(value) if value else Path(fallback)
HOME = Path.home()

View File

@@ -82,20 +82,14 @@ class CommandRunner:
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None:
def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.require_binary("sudo")
argv: list[str] = ["sudo", "mkdir", "-p"]
if mode is not None:
argv.extend(["-m", f"{mode:o}"])
argv.append(str(path))
runner.run(argv, check=True)
runner.run(["sudo", "mkdir", "-p", str(path)], check=True)
return
path.mkdir(parents=True, exist_ok=True)
if mode is not None:
path.chmod(mode)
def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None:
if sudo:
@@ -120,13 +114,7 @@ class FileSystem:
if not missing_ok:
raise
def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
return
def copy_file(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copy2(source, target)
@@ -190,17 +178,19 @@ class FileSystem:
target.unlink()
def same_symlink(self, target: Path, source: Path) -> bool:
"""True iff target is a symlink whose readlink exactly equals source.
Uses raw readlink (not resolve) so the answer is consistent with the
readlink-based checks in _check_overwrite_safe and remove_symlink.
Following symlink chains would let externally-modified links pass as
"ours" and silently overwrite them; raw match is the safer rule.
"""
if not target.is_symlink():
return False
return target.resolve(strict=False) == source.resolve(strict=False)
return Path(target.readlink()) == Path(source)
def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
if default is None:
raise
return default
def read_text(self, path: Path) -> str:
return path.read_text(encoding="utf-8")
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)

View File

@@ -1,4 +1,4 @@
"""Variable and template substitution -- pure functions, no I/O."""
"""Template substitution -- pure functions, no I/O."""
import os
import re
@@ -9,24 +9,6 @@ from flow.core.errors import ConfigError
_UNRESOLVED = object()
def substitute(text: Any, variables: Dict[str, str]) -> Any:
"""Replace $VAR and ${VAR} with values from variables dict or env."""
if not isinstance(text, str):
return text
pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}")
def _replace(match: re.Match[str]) -> str:
key = match.group(1) or match.group(2) or ""
if key in variables:
return str(variables[key])
if key in os.environ:
return os.environ[key]
return match.group(0)
return pattern.sub(_replace, text)
def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
if expr.startswith("env."):
env_key = expr.split(".", 1)[1]

View File

@@ -17,7 +17,6 @@ class Profile:
"""A bootstrap profile from the manifest."""
name: str
os: str
arch: Optional[str]
hostname: Optional[str]
locale: Optional[str]
shell: Optional[str]

View File

@@ -1,20 +1,15 @@
"""Bootstrap setup modules -- each produces shell commands."""
"""Bootstrap setup modules -- each produces shell commands.
Each module is a small frozen dataclass with a describe() and plan().
There is no shared base; callers know which concrete module they have.
"""
import shlex
from dataclasses import dataclass
class SetupModule:
"""Base for setup modules."""
def describe(self) -> str:
raise NotImplementedError
def plan(self) -> list[str]:
raise NotImplementedError
@dataclass(frozen=True)
class HostnameModule(SetupModule):
class HostnameModule:
hostname: str
def describe(self) -> str:
@@ -29,7 +24,7 @@ class HostnameModule(SetupModule):
@dataclass(frozen=True)
class LocaleModule(SetupModule):
class LocaleModule:
locale: str
def describe(self) -> str:
@@ -44,7 +39,7 @@ class LocaleModule(SetupModule):
@dataclass(frozen=True)
class ShellModule(SetupModule):
class ShellModule:
shell: str
def describe(self) -> str:
@@ -58,7 +53,7 @@ class ShellModule(SetupModule):
@dataclass(frozen=True)
class SSHKeygenModule(SetupModule):
class SSHKeygenModule:
keys: list[dict[str, str]]
def describe(self) -> str:
@@ -78,7 +73,7 @@ class SSHKeygenModule(SetupModule):
@dataclass(frozen=True)
class RuncmdModule(SetupModule):
class RuncmdModule:
commands: list[str]
def describe(self) -> str:

View File

@@ -45,7 +45,6 @@ def parse_profile(name: str, raw: dict[str, Any]) -> Profile:
return Profile(
name=name,
os=raw["os"],
arch=raw.get("arch"),
hostname=raw.get("hostname"),
locale=raw.get("locale"),
shell=raw.get("shell"),

View File

@@ -90,9 +90,10 @@ class LinkedState:
@classmethod
def from_dict(cls, data: dict) -> "LinkedState":
from flow.core.errors import ConfigError
version = data.get("version")
if version is not None and version != 2:
from flow.core.errors import ConfigError
raise ConfigError(
f"Unsupported linked.json version {version}. "
"Delete ~/.local/state/flow/linked.json and re-run `flow dotfiles link`."
@@ -101,13 +102,19 @@ class LinkedState:
raw_links = data.get("links", {})
for package, pkg_links in raw_links.items():
for target_str, info in pkg_links.items():
links[Path(target_str)] = LinkTarget(
source=Path(info["source"]),
target=Path(target_str),
package=str(package),
from_module=bool(info.get("from_module", False)),
needs_sudo=bool(info.get("needs_sudo", False)),
)
try:
links[Path(target_str)] = LinkTarget(
source=Path(info["source"]),
target=Path(target_str),
package=str(package),
from_module=bool(info["from_module"]),
needs_sudo=bool(info["needs_sudo"]),
)
except KeyError as e:
raise ConfigError(
f"Corrupt linked.json: entry for {target_str!r} "
f"missing field {e.args[0]!r}"
) from e
return cls(links=links)

View File

@@ -56,10 +56,12 @@ def plan_link(
cur = current.links[target]
fs_state = filesystem_check(target)
# Broken symlink: remove and recreate
# Broken symlink: remove and recreate. remove_link ops never
# carry a source — _apply_plan derives expected_source from
# current.links when needed.
if fs_state == "broken_symlink":
ops.append(LinkOp(
type="remove_link", target=target, source=cur.source,
type="remove_link", target=target, source=None,
package=cur.package, needs_sudo=cur.needs_sudo,
))
ops.append(LinkOp(
@@ -80,7 +82,7 @@ def plan_link(
# Source changed: remove old link, create new one
ops.append(LinkOp(
type="remove_link", target=target, source=cur.source,
type="remove_link", target=target, source=None,
package=cur.package, needs_sudo=cur.needs_sudo,
))
ops.append(LinkOp(
@@ -149,7 +151,8 @@ def plan_unlink(
current: LinkedState,
packages: Optional[list[str]],
) -> LinkPlan:
"""Plan removal of managed links."""
"""Plan removal of managed links. remove_link ops never carry a source —
_apply_plan derives expected_source from current.links."""
ops: list[LinkOp] = []
for target in sorted(current.links.keys()):
@@ -161,7 +164,7 @@ def plan_unlink(
continue
ops.append(LinkOp(
type="remove_link", target=target, source=lt.source,
type="remove_link", target=target, source=None,
package=lt.package, needs_sudo=lt.needs_sudo,
))

View File

@@ -66,7 +66,10 @@ def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None,
if RESERVED_ROOT in skip:
return None, False
if len(parts) < 2:
return None, False
raise PlanConflict(
f"Invalid {RESERVED_ROOT!r} entry",
[f"{rel}: {RESERVED_ROOT} requires at least one path component"],
)
return Path("/") / Path(*parts[1:]), True
return home / rel, False

View File

@@ -65,7 +65,6 @@ class PackagePlan:
install_ops: tuple[PkgInstallOp, ...]
remove_ops: tuple[PkgRemoveOp, ...]
pm_update_needed: bool
pm_command: Optional[str]
@dataclass

View File

@@ -11,7 +11,6 @@ from flow.domain.packages.models import (
PkgRemoveOp,
)
from flow.domain.packages.resolution import (
pm_install_command,
resolve_binary_asset,
resolve_download_url,
resolve_source_name,
@@ -68,13 +67,10 @@ def plan_install(
else:
raise FlowError(f"Unsupported package type: {pkg.type}")
pm_cmd = pm_install_command(pm, pm_packages) if pm and pm_packages else None
return PackagePlan(
install_ops=tuple(install_ops),
remove_ops=(),
pm_update_needed=bool(pm_packages or any(op.method == "cask" for op in install_ops)),
pm_command=pm_cmd,
)
@@ -96,5 +92,4 @@ def plan_remove(
install_ops=(),
remove_ops=tuple(remove_ops),
pm_update_needed=False,
pm_command=None,
)

View File

@@ -8,7 +8,7 @@ from typing import Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.domain.bootstrap.models import VALID_PHASES, BootstrapAction
from flow.domain.bootstrap.models import BootstrapAction
from flow.domain.bootstrap.planning import parse_profile, plan_bootstrap
@@ -78,10 +78,12 @@ class BootstrapService:
def _execute_action(
self, action: BootstrapAction, plan, dotfiles_profile: str,
) -> None:
"""Execute a single bootstrap action by phase."""
if action.phase not in VALID_PHASES:
raise FlowError(f"Unknown bootstrap phase: {action.phase!r}")
"""Execute a single bootstrap action by phase.
Exhaustive over VALID_PHASES: the trailing raise is the single
source of truth for unhandled phases. Adding a phase to VALID_PHASES
without a handler here surfaces loudly at runtime.
"""
if action.phase == "packages":
from flow.services.packages import PackageService
pkg_svc = PackageService(self.ctx)

View File

@@ -592,8 +592,8 @@ class DotfilesService:
sudo=op.needs_sudo,
runner=self.ctx.runtime.runner if op.needs_sudo else None,
)
link_target = next(target for target in targets if target.target == op.target)
new_state.links[op.target] = link_target
spec = next(t for t in targets if t.target == op.target)
new_state.links[op.target] = spec
elif op.type == "remove_link":
expected = (
current.links[op.target].source

View File

@@ -321,7 +321,10 @@ class PackageService:
)
destination_root = self._install_destination(section)
stripped_path = self._strip_prefix(declared_path, self._install_strip_prefix(section))
stripped_path = self._strip_prefix(
declared_path, self._install_strip_prefix(section),
package_name, section,
)
destination = destination_root / stripped_path
if source.is_dir():
@@ -362,11 +365,22 @@ class PackageService:
raise FlowError(f"Unsupported install section: {section}")
return prefixes[section]
def _strip_prefix(self, path: Path, prefix: Path) -> Path:
def _strip_prefix(self, path: Path, prefix: Path, package_name: str, section: str) -> Path:
"""Strip the section prefix from a declared install path.
Declared paths must start with the section's expected prefix
(e.g. `bin/foo` for the `bin` section, `share/man/...` for `man`).
A mismatch is almost always a manifest authoring error (e.g.
`etc/foo` listed under `install.bin`); raise rather than silently
nesting the file at an unexpected destination.
"""
try:
return path.relative_to(prefix)
except ValueError:
return path
except ValueError as e:
raise FlowError(
f"Install path for '{package_name}' in section "
f"'{section}' must start with {str(prefix)!r}: {path}"
) from e
def _validate_install_path(self, package_name: str, declared_path: Path) -> None:
if declared_path.is_absolute():

View File

@@ -53,6 +53,9 @@ def test_dotfiles_init_and_link_in_container():
# Run flow inside the container against the mounted example repo.
# `flow dotfiles init` clones, so we need a real git remote — turn
# the read-only example mount into a bare-ish working repo first.
# --skip system avoids the _root/ paths which would try to sudo-link
# over /etc/hostname; we already cover the link path on non-system
# packages.
script = (
"set -eux; "
"cp -r /example /home/flowuser/dotfiles-src; "
@@ -62,7 +65,14 @@ def test_dotfiles_init_and_link_in_container():
"git -c user.email=e2e@example.com -c user.name=e2e commit -q -m initial; "
"cd /home/flowuser; "
"flow dotfiles init --repo /home/flowuser/dotfiles-src; "
"flow dotfiles link --profile linux-auto --dry-run; "
"flow dotfiles link --profile linux-auto --skip system; "
# Verify real symlinks were created and point into the dotfiles dir.
"test -L /home/flowuser/.zshrc; "
"test -L /home/flowuser/.gitconfig; "
"readlink /home/flowuser/.zshrc | grep -q '/dotfiles/_shared/zsh/.zshrc'; "
"readlink /home/flowuser/.gitconfig | grep -q '/dotfiles/_shared/git/.gitconfig'; "
# Idempotency: rerun should be a no-op.
"flow dotfiles link --profile linux-auto --skip system; "
"flow dotfiles status"
)
result = subprocess.run(

View File

@@ -1,7 +1,5 @@
"""Tests for flow.core.config."""
from pathlib import Path
import pytest
from flow.core.config import AppConfig, load_config, load_manifest

View File

@@ -1,7 +1,5 @@
"""Tests for flow.core.platform."""
import os
import pytest
from flow.core.platform import PlatformInfo, detect_context, detect_platform

View File

@@ -2,7 +2,6 @@
import json
import os
from pathlib import Path
import pytest
@@ -25,11 +24,6 @@ class TestFileSystem:
fs.write_text(path, "hello")
assert fs.read_text(path) == "hello"
def test_read_text_default(self, tmp_path):
fs = FileSystem()
path = tmp_path / "missing.txt"
assert fs.read_text(path, default="fallback") == "fallback"
def test_write_and_read_json(self, tmp_path):
fs = FileSystem()
path = tmp_path / "data.json"

View File

@@ -1,7 +1,5 @@
"""Tests for flow.core.yaml."""
from pathlib import Path
import pytest
from flow.core.errors import ConfigError

View File

@@ -5,7 +5,7 @@ import inspect
import pytest
from flow.core.errors import ConfigError
from flow.domain.bootstrap.models import BootstrapAction, Profile
from flow.domain.bootstrap.models import Profile
from flow.domain.bootstrap.planning import parse_profile, plan_bootstrap
@@ -65,7 +65,7 @@ class TestPlanBootstrap:
def test_basic_plan(self):
profile = Profile(
name="test", os="linux", arch=None,
name="test", os="linux",
hostname="my-host", locale="en_US.UTF-8",
shell="zsh", ssh_keys=[], runcmd=[],
packages=["fd"], env_required=[],
@@ -81,7 +81,7 @@ class TestPlanBootstrap:
def test_missing_env_raises(self):
profile = Profile(
name="test", os="linux", arch=None,
name="test", os="linux",
hostname=None, locale=None, shell=None,
ssh_keys=[], runcmd=[], packages=[],
env_required=["REQUIRED_VAR"],
@@ -91,7 +91,7 @@ class TestPlanBootstrap:
def test_runcmd_produces_action(self):
profile = Profile(
name="test", os="linux", arch=None,
name="test", os="linux",
hostname=None, locale=None, shell=None,
ssh_keys=[], runcmd=["echo hello", "echo world"],
packages=[], env_required=[],
@@ -102,7 +102,7 @@ class TestPlanBootstrap:
def test_post_link_produces_action(self):
profile = Profile(
name="test", os="linux", arch=None,
name="test", os="linux",
hostname=None, locale=None, shell=None,
ssh_keys=[], runcmd=[], packages=[], env_required=[],
post_link="echo done",
@@ -112,7 +112,7 @@ class TestPlanBootstrap:
def test_ssh_keys_action(self):
profile = Profile(
name="test", os="linux", arch=None,
name="test", os="linux",
hostname=None, locale=None, shell=None,
ssh_keys=[{"path": "~/.ssh/id", "type": "ed25519"}],
runcmd=[], packages=[], env_required=[],

View File

@@ -2,7 +2,7 @@
from pathlib import Path
from flow.domain.containers.models import ContainerSpec, ImageRef, Mount
from flow.domain.containers.models import ImageRef, Mount
from flow.domain.containers.resolution import (
build_container_spec,
container_name,

View File

@@ -2,14 +2,14 @@
from pathlib import Path
import pytest
from flow.core.errors import ConfigError
from flow.domain.dotfiles.models import (
LinkOp,
LinkPlan,
LinkTarget,
LinkedState,
ModuleRef,
Package,
PlanSummary,
)
@@ -45,3 +45,25 @@ def test_package_has_id():
pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh",
source_dir=Path("/dots/_shared/zsh"), module=None, local_files=())
assert pkg.package_id == "_shared/zsh"
def test_linked_state_corrupt_missing_field_raises():
data = {
"version": 2,
"links": {
"_shared/zsh": {
"/home/u/.zshrc": {
# missing "from_module"
"source": "/dots/_shared/zsh/.zshrc",
"needs_sudo": False,
},
},
},
}
with pytest.raises(ConfigError, match="from_module"):
LinkedState.from_dict(data)
def test_linked_state_unsupported_version_raises():
with pytest.raises(ConfigError, match="version 1"):
LinkedState.from_dict({"version": 1, "links": {}})

View File

@@ -4,7 +4,6 @@ from pathlib import Path
from typing import Optional
from flow.domain.dotfiles.models import (
LinkOp,
LinkTarget,
LinkedState,
)

View File

@@ -5,7 +5,7 @@ from pathlib import Path
import pytest
from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, ModuleRef, Package
from flow.domain.dotfiles.models import ModuleRef, Package
from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets
RESERVED_ROOT = "_root"
@@ -56,6 +56,15 @@ class TestResolvePackageTargets:
targets = resolve_package_targets(pkg, HOME, {"_root"})
assert len(targets) == 0
def test_bare_root_marker_raises(self):
"""A package containing a file literally named `_root` (no children)
should be rejected, not silently dropped."""
pkg = _pkg("bad", files=[
(Path("/dots/_shared/bad/_root"), Path("_root")),
])
with pytest.raises(PlanConflict, match="_root"):
resolve_package_targets(pkg, HOME, set())
def test_skip_package_by_name(self):
pkg = _pkg("nvim", files=[
(Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")),

View File

@@ -1,12 +1,8 @@
"""Tests for packages catalog and resolution."""
import pytest
from flow.core.errors import ConfigError, FlowError
from flow.domain.packages.catalog import normalize_profile_entry, parse_catalog
from flow.domain.packages.planning import plan_install
from flow.domain.packages.resolution import (
binary_template_context,
detect_package_manager,
pm_cask_install_command,
pm_install_command,

View File

@@ -4,7 +4,7 @@ import pytest
from flow.core.config import TargetConfig
from flow.core.errors import FlowError
from flow.domain.remote.models import SSHCommand, Target
from flow.domain.remote.models import Target
from flow.domain.remote.resolution import (
build_ssh_command,
list_targets,

View File

@@ -96,21 +96,21 @@ class TestBootstrapService:
def test_unknown_phase_raises(self):
from flow.domain.bootstrap.models import BootstrapAction, BootstrapPlan
from flow.domain.bootstrap.models import VALID_PHASES
manifest = {"profiles": {"work": {"os": "linux"}}}
ctx = _make_ctx(manifest)
svc = BootstrapService(ctx)
# Forge an action with a phase that VALID_PHASES contains but the
# dispatch can't handle (shouldn't happen, but tests the explicit guard).
# Use a phase NOT in VALID_PHASES first to confirm the "Unknown" branch.
# Forge an action with a phase the dispatcher doesn't handle.
# The trailing raise in _execute_action is the single source of
# truth for unhandled phases — adding a phase to VALID_PHASES
# without a handler should surface here.
action = BootstrapAction.__new__(BootstrapAction)
object.__setattr__(action, "phase", "no-such-phase")
object.__setattr__(action, "description", "")
object.__setattr__(action, "commands", ())
object.__setattr__(action, "needs_sudo", False)
plan = BootstrapPlan(profile="work", actions=(), packages_to_install=())
with pytest.raises(FlowError, match="Unknown bootstrap phase"):
with pytest.raises(FlowError, match="Unhandled bootstrap phase"):
svc._execute_action(action, plan, "work")
def test_run_uses_dotfiles_profile_override(self, monkeypatch):

View File

@@ -693,3 +693,72 @@ class TestStatePersistsAtomically:
assert residue == []
# Final content is valid JSON.
json.loads(state_path.read_text())
class TestDotfilesServiceRootPaths:
"""`_root/` paths require sudo; verify the service routes them via the
sudo branch of FileSystem.create_symlink (without actually invoking sudo)."""
def test_root_paths_route_via_sudo(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = tmp_path / "dotfiles"
pkg_dir = dotfiles / "_shared" / "system" / "_root" / "etc"
pkg_dir.mkdir(parents=True)
(pkg_dir / "ourfile").write_text("managed by flow")
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
# Replace the FS layer with one that records sudo calls instead of
# actually invoking sudo. We still want create_symlink's pre-check
# to run, so we patch only the sudo branch's runner.
runner = FakeRunner()
ctx = _make_ctx(tmp_path)
ctx.runtime.runner = runner
svc = DotfilesService(ctx)
# Plan first to inspect the operations -- a _root entry must carry
# needs_sudo=True so create_symlink takes the sudo branch.
packages = svc._discover_packages(profile=None)
assert any(
p.local_files and any("_root" in str(rel) for _, rel in p.local_files)
for p in packages
)
from flow.domain.dotfiles.resolution import resolve_all_targets
targets = resolve_all_targets(packages, home, set())
assert any(t.needs_sudo and t.target == Path("/etc/ourfile") for t in targets)
# Running link() against a real /etc would require root; instead
# confirm that with --dry-run the plan surfaces the sudo op without
# any FS mutation.
svc.link(dry_run=True)
assert not Path("/etc/ourfile").exists() # we did not actually touch /etc
def test_root_paths_can_be_skipped(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = tmp_path / "dotfiles"
pkg_dir = dotfiles / "_shared" / "system" / "_root" / "etc"
pkg_dir.mkdir(parents=True)
(pkg_dir / "hostname").write_text("flow-host")
# Non-root file in the same package shouldn't be skipped
(dotfiles / "_shared" / "system" / "README").write_text("notes")
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
svc.link(skip={"_root"})
assert not Path("/etc/hostname").exists() or (home / "etc" / "hostname").is_symlink() is False
# README is not under _root, so it should be linked
assert (home / "README").is_symlink()

View File

@@ -1,7 +1,6 @@
"""Tests for ProjectService."""
import subprocess
from pathlib import Path
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console

View File

@@ -5,25 +5,7 @@ import os
import pytest
from flow.core.errors import ConfigError
from flow.core.template import substitute, substitute_template
class TestSubstitute:
def test_replaces_dollar_var(self):
assert substitute("hello $NAME", {"NAME": "world"}) == "hello world"
def test_replaces_braced_var(self):
assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world"
def test_falls_back_to_env(self, monkeypatch):
monkeypatch.setenv("FOO", "bar")
assert substitute("$FOO", {}) == "bar"
def test_preserves_unknown_vars(self):
assert substitute("$UNKNOWN", {}) == "$UNKNOWN"
def test_non_string_passthrough(self):
assert substitute(42, {}) == 42
from flow.core.template import substitute_template
class TestSubstituteTemplate: