refactor: fail loud, tighten types, remove speculative abstraction

Fail loud at the boundary:
- substitute_template raises ConfigError on unresolved {{...}}; no more
  silent literal placeholders in download URLs.
- parse_profile raises ConfigError when 'os' is missing -- no
  raw.get("os", "linux") default that silently masks typos.
- urllib download failures wrapped to FlowError.
- bootstrap _execute_action dispatches phases explicitly and raises
  on unhandled phase; no more "anything else runs as shell".

Direct access over defensive wrapping:
- plan_bootstrap requires env; plan_install requires pm. Drop the
  dead `or os.environ` / `or detect_package_manager()` fallbacks.
- InstalledState.from_dict raises ConfigError on missing fields
  rather than .get(..., default).
- Replace `x or {}` chains with explicit `x if x is not None else {}`
  in package resolution; catalog validates type/platform-map/install
  shapes at parse.

One canonical form / direct access:
- Path.home() replaced with paths.HOME in services/packages.py and
  commands/completion.py. paths.HOME is the single source now.
- Use Path.is_relative_to for install-path containment instead of
  str.startswith.

Domain purity:
- domain/containers/resolution.resolve_mounts takes a filesystem_check
  predicate; service passes the probe in. Domain no longer touches
  the filesystem directly.

No speculative abstraction:
- Drop the `allow_sudo` field entirely. The _script_uses_sudo check
  it gated was bypassable (substring match) and gave false confidence;
  the manifest is fully user-trusted anyway.
- Delete dead terminfo_fix_command + RemoteService.fix_terminfo
  (no command surface exposes them).
- FileSystem.remove_tree no longer swallows errors via ignore_errors;
  callers opt into missing_ok if needed.

Typed enums:
- PackageDef.type, AppConfig.container_runtime as Literal[...].
  container_runtime values validated at config parse.

Completion bypasses runtime no longer:
- complete(ctx, ...) threads context; ContainerRuntime and state-file
  reads go through ctx.runtime instead of constructing primitives.

Tests added for: template raise, missing os raise, env/pm required,
unknown phase raise, no allow_sudo gate, URL download failure, install
path escape, corrupt installed.json, container_runtime Literal,
filesystem_check controls mounts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:02:06 +03:00
parent c0e2758057
commit a71742afee
26 changed files with 429 additions and 214 deletions

View File

@@ -3,16 +3,13 @@
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
from typing import Sequence
from flow.core.config import load_config, load_manifest
from flow.core.config import FlowContext, load_config, load_manifest
from flow.core import paths
from flow.core.containers import ContainerRuntime
from flow.core.errors import FlowError
from flow.core.runtime import CommandRunner
from flow.domain.remote.resolution import HOST_TEMPLATES
ZSH_RC_START = "# >>> flow completion >>>"
@@ -52,27 +49,28 @@ def register(subparsers):
parser.set_defaults(handler=_run_zsh_script)
def complete(words: Sequence[str], cword: int) -> list[str]:
def complete(ctx: FlowContext, words: Sequence[str], cword: int) -> list[str]:
before, current = _split_words(words, cword)
if not before:
return _filter(TOP_LEVEL_COMMANDS + ["-h", "--help", "--version"], current)
command = _canonical_command(before[0])
completers = {
"enter": _complete_remote,
"remote": _complete_remote,
"dev": _complete_dev,
"dotfiles": _complete_dotfiles,
"setup": _complete_setup,
"packages": _complete_packages,
"projects": _complete_projects,
"completion": _complete_completion,
}
handler = completers.get(command)
if handler is None:
return []
return handler(before, current)
if command == "enter" or command == "remote":
return _complete_remote(before, current)
if command == "dev":
return _complete_dev(ctx, before, current)
if command == "dotfiles":
return _complete_dotfiles(before, current)
if command == "setup":
return _complete_setup(before, current)
if command == "packages":
return _complete_packages(ctx, before, current)
if command == "projects":
return _complete_projects(before, current)
if command == "completion":
return _complete_completion(before, current)
return []
def _split_words(words: Sequence[str], cword: int) -> tuple[list[str], str]:
@@ -152,11 +150,8 @@ def _list_manifest_packages() -> list[str]:
return sorted(names)
def _list_installed_packages() -> list[str]:
if not paths.INSTALLED_STATE.exists():
return []
with open(paths.INSTALLED_STATE, encoding="utf-8") as handle:
state = json.load(handle)
def _list_installed_packages(ctx: FlowContext) -> list[str]:
state = ctx.runtime.fs.read_json(paths.INSTALLED_STATE, default={})
packages = state.get("packages", {}) if isinstance(state, dict) else {}
return sorted(packages.keys()) if isinstance(packages, dict) else []
@@ -204,11 +199,9 @@ def _list_dotfiles_packages(profile: str | None = None) -> list[str]:
return sorted(names)
def _list_container_names() -> list[str]:
def _list_container_names(ctx: FlowContext) -> list[str]:
try:
config = _config()
rt = ContainerRuntime(CommandRunner(), mode=config.container_runtime)
output = rt.ps(
output = ctx.runtime.containers.ps(
all=True,
filter="label=dev=true",
format='{{.Label "dev.name"}}',
@@ -256,7 +249,7 @@ def _complete_remote(before: Sequence[str], current: str) -> list[str]:
return _filter(_list_targets(), current)
def _complete_dev(before: Sequence[str], current: str) -> list[str]:
def _complete_dev(ctx: FlowContext, before: Sequence[str], current: str) -> list[str]:
if len(before) <= 1:
return _filter(
["create", "attach", "connect", "exec", "enter", "list", "stop", "remove", "rm", "respawn"],
@@ -274,7 +267,7 @@ def _complete_dev(before: Sequence[str], current: str) -> list[str]:
return _filter(options.get(subcommand, []), current)
non_option = [token for token in before[2:] if not token.startswith("-")]
if not non_option:
return _filter(_list_container_names(), current)
return _filter(_list_container_names(ctx), current)
return []
if subcommand == "create":
@@ -346,7 +339,7 @@ def _complete_setup(before: Sequence[str], current: str) -> list[str]:
return []
def _complete_packages(before: Sequence[str], current: str) -> list[str]:
def _complete_packages(ctx: FlowContext, before: Sequence[str], current: str) -> list[str]:
if len(before) <= 1:
return _filter(["install", "remove", "list"], current)
@@ -361,7 +354,7 @@ def _complete_packages(before: Sequence[str], current: str) -> list[str]:
if subcommand == "remove":
if current.startswith("-"):
return []
return _filter(_list_installed_packages(), current)
return _filter(_list_installed_packages(ctx), current)
if subcommand == "list":
return _filter(["--all"], current) if current.startswith("-") else []
@@ -385,8 +378,8 @@ def _complete_completion(before: Sequence[str], current: str) -> list[str]:
return []
def _run_zsh_complete(_ctx, args):
for item in complete(args.words, args.cword):
def _run_zsh_complete(ctx, args):
for item in complete(ctx, args.words, args.cword):
print(item)
@@ -467,7 +460,7 @@ def _zsh_rc_snippet(completions_dir: Path) -> str:
def _zsh_dir_for_rc(path: Path) -> str:
home = Path.home().resolve()
home = paths.HOME.resolve()
resolved = path.expanduser().resolve()
try:
rel = resolved.relative_to(home)

View File

@@ -4,14 +4,21 @@ from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from typing import Any, Literal, Optional
from flow.core import paths
from flow.core.console import Console
from flow.core.errors import ConfigError
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
ContainerRuntimeMode = Literal["auto", "docker", "podman", "podman-rootful"]
_VALID_CONTAINER_RUNTIMES: tuple[ContainerRuntimeMode, ...] = (
"auto", "docker", "podman", "podman-rootful",
)
@dataclass(frozen=True)
class TargetConfig:
namespace: str
@@ -26,7 +33,7 @@ class AppConfig:
dotfiles_branch: str = "main"
dotfiles_pull_before_edit: bool = True
projects_dir: str = "~/projects"
container_runtime: str = "auto"
container_runtime: ContainerRuntimeMode = "auto"
container_registry: str = "registry.tomastm.com"
container_tag: str = "latest"
tmux_session: str = "default"
@@ -85,12 +92,19 @@ def load_config(
pull_raw = _section_get(data, "repository", "pull-before-edit")
container_runtime_raw = str(_section_get(data, "defaults", "container-runtime", "auto"))
if container_runtime_raw not in _VALID_CONTAINER_RUNTIMES:
raise ConfigError(
f"Invalid container-runtime {container_runtime_raw!r}. "
f"Expected one of: {', '.join(_VALID_CONTAINER_RUNTIMES)}"
)
return AppConfig(
dotfiles_url=str(_section_get(data, "repository", "url", "")),
dotfiles_branch=str(_section_get(data, "repository", "branch", "main")),
dotfiles_pull_before_edit=as_bool(pull_raw) if pull_raw is not None else True,
projects_dir=str(_section_get(data, "paths", "projects", "~/projects")),
container_runtime=str(_section_get(data, "defaults", "container-runtime", "auto")),
container_runtime=container_runtime_raw, # type: ignore[arg-type]
container_registry=str(_section_get(data, "defaults", "container-registry", "registry.tomastm.com")),
container_tag=str(_section_get(data, "defaults", "container-tag", "latest")),
tmux_session=str(_section_get(data, "defaults", "tmux-session", "default")),

View File

@@ -4,6 +4,10 @@ import os
import re
from typing import Any, Dict
from flow.core.errors import ConfigError
_UNRESOLVED = object()
def substitute(text: Any, variables: Dict[str, str]) -> Any:
"""Replace $VAR and ${VAR} with values from variables dict or env."""
@@ -29,7 +33,9 @@ def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
env_ctx = context.get("env", {})
if isinstance(env_ctx, dict) and env_key in env_ctx:
return env_ctx[env_key]
return os.environ.get(env_key)
if env_key in os.environ:
return os.environ[env_key]
return _UNRESOLVED
if expr in context:
return context[expr]
@@ -37,22 +43,27 @@ def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
current: Any = context
for part in expr.split("."):
if not isinstance(current, dict) or part not in current:
return None
return _UNRESOLVED
current = current[part]
return current
def substitute_template(text: Any, context: Dict[str, Any]) -> Any:
"""Replace {{expr}} placeholders with values from context dict."""
"""Replace {{expr}} placeholders with values from context dict.
Raises ConfigError if a placeholder cannot be resolved.
"""
if not isinstance(text, str):
return text
def _replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
value = _resolve_template_value(key, context)
if value is None:
return match.group(0)
if value is _UNRESOLVED:
raise ConfigError(
f"Unresolved template variable: {match.group(0)}"
)
return str(value)
return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text)

View File

@@ -1,7 +1,6 @@
"""Bootstrap planning -- builds ordered action list."""
import os
from typing import Any, Mapping, Optional
from typing import Any, Mapping
from flow.core.errors import ConfigError
from flow.domain.bootstrap.models import BootstrapAction, BootstrapPlan, Profile
@@ -38,11 +37,14 @@ def _normalize_ssh_keys(raw: Any) -> tuple[dict[str, str], ...]:
def parse_profile(name: str, raw: dict[str, Any]) -> Profile:
"""Parse a profile definition from manifest."""
if "os" not in raw:
raise ConfigError(f"Profile {name!r}: required field 'os' is missing")
ssh_keys = raw.get("ssh-keys")
return Profile(
name=name,
os=raw.get("os", "linux"),
os=raw["os"],
arch=raw.get("arch"),
hostname=raw.get("hostname"),
locale=raw.get("locale"),
@@ -60,14 +62,13 @@ def plan_bootstrap(
profile: Profile,
manifest: dict[str, Any],
*,
env: Optional[Mapping[str, str]] = None,
env: Mapping[str, str],
) -> BootstrapPlan:
"""Build a complete bootstrap plan from a profile."""
actions: list[BootstrapAction] = []
environment = env or os.environ
# Phase 1: Validate required env vars
missing = [v for v in profile.env_required if not environment.get(v)]
missing = [v for v in profile.env_required if not env.get(v)]
if missing:
raise ConfigError(
f"Missing required environment variables for profile '{profile.name}': "

View File

@@ -1,7 +1,7 @@
"""Container resolution -- pure functions."""
from pathlib import Path
from typing import Optional
from typing import Callable, Optional
from flow.domain.containers.models import ContainerSpec, ImageRef, Mount
@@ -46,11 +46,16 @@ def container_name(name: str) -> str:
def resolve_mounts(
home: Path,
*,
filesystem_check: Callable[[Path], bool],
project_path: Optional[str] = None,
dotfiles_dir: Optional[Path] = None,
socket_path: Optional[Path] = None,
) -> list[Mount]:
"""Resolve standard container mounts."""
"""Resolve standard container mounts.
``filesystem_check`` is a callable returning whether a given path
exists -- injected so this function is pure.
"""
mounts: list[Mount] = []
if project_path:
@@ -63,13 +68,13 @@ def resolve_mounts(
(home / ".npm", "/home/dev/.npm", False),
]
for source, target, readonly in standard_mounts:
if source.exists():
if filesystem_check(source):
mounts.append(Mount(source=source, target=target, readonly=readonly))
if socket_path:
mounts.append(Mount(source=socket_path, target="/var/run/docker.sock"))
if dotfiles_dir and dotfiles_dir.exists():
if dotfiles_dir and filesystem_check(dotfiles_dir):
mounts.append(
Mount(
source=dotfiles_dir,

View File

@@ -25,7 +25,7 @@ def parse_catalog(manifest: dict[str, Any]) -> dict[str, PackageDef]:
name=name, type="pkg", sources={},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
catalog[pkg.name] = pkg
@@ -38,6 +38,11 @@ def _parse_package_entry(entry: dict[str, Any]) -> PackageDef:
raise ConfigError("Package entry missing 'name'")
pkg_type = entry.get("type", "pkg")
if pkg_type not in ("pkg", "binary", "appimage", "cask"):
raise ConfigError(
f"Package {name!r}: invalid type {pkg_type!r}. "
f"Expected one of: pkg, binary, appimage, cask"
)
sources: dict[str, str] = {}
# Parse sources from various keys
@@ -48,6 +53,18 @@ def _parse_package_entry(entry: dict[str, Any]) -> PackageDef:
if "sources" in entry and isinstance(entry["sources"], dict):
sources.update(entry["sources"])
platform_map = entry.get("platform-map")
if platform_map is not None and not isinstance(platform_map, dict):
raise ConfigError(
f"Package {name!r}: 'platform-map' must be a mapping, got {type(platform_map).__name__}"
)
install = entry.get("install")
if install is not None and not isinstance(install, dict):
raise ConfigError(
f"Package {name!r}: 'install' must be a mapping, got {type(install).__name__}"
)
return PackageDef(
name=name,
type=pkg_type,
@@ -55,11 +72,10 @@ def _parse_package_entry(entry: dict[str, Any]) -> PackageDef:
source=entry.get("source"),
version=entry.get("version"),
asset_pattern=entry.get("asset-pattern"),
platform_map=entry.get("platform-map") or {},
platform_map=platform_map if platform_map is not None else {},
extract_dir=entry.get("extract-dir"),
install=entry.get("install") or {},
install=install if install is not None else {},
post_install=entry.get("post-install"),
allow_sudo=bool(entry.get("allow-sudo", False)),
)
@@ -79,7 +95,6 @@ def normalize_profile_entry(entry: Any) -> ProfilePackageRef:
extract_dir=None,
install=None,
post_install=None,
allow_sudo=None,
)
return ProfilePackageRef(
name=entry,
@@ -91,7 +106,6 @@ def normalize_profile_entry(entry: Any) -> ProfilePackageRef:
extract_dir=None,
install=None,
post_install=None,
allow_sudo=None,
)
if isinstance(entry, dict):
@@ -106,7 +120,6 @@ def normalize_profile_entry(entry: Any) -> ProfilePackageRef:
extract_dir=entry.get("extract-dir"),
install=entry.get("install"),
post_install=entry.get("post-install"),
allow_sudo=entry.get("allow-sudo"),
)
raise ConfigError(f"Invalid profile package entry: {entry}")

View File

@@ -2,14 +2,14 @@
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from typing import Any, Literal, Optional
@dataclass(frozen=True)
class PackageDef:
"""A package definition from the manifest."""
name: str
type: str # "pkg" | "binary" | "appimage" | "cask"
type: Literal["pkg", "binary", "appimage", "cask"]
sources: dict[str, str] # pm_name -> package_name
source: Optional[str] # direct URL or github shorthand
version: Optional[str]
@@ -18,7 +18,6 @@ class PackageDef:
extract_dir: Optional[str]
install: dict[str, Any] # install config overrides
post_install: Optional[str]
allow_sudo: bool
@dataclass(frozen=True)
@@ -33,7 +32,6 @@ class ProfilePackageRef:
extract_dir: Optional[str] = None
install: Optional[dict[str, Any]] = None
post_install: Optional[str] = None
allow_sudo: Optional[bool] = None
@dataclass(frozen=True)
@@ -96,19 +94,25 @@ class InstalledState:
@classmethod
def from_dict(cls, data: dict) -> "InstalledState":
from flow.core.errors import ConfigError
version = data.get("version")
if version is not None and version != 1:
from flow.core.errors import ConfigError
raise ConfigError(
f"Unsupported installed.json version {version}. "
"Delete ~/.local/state/flow/installed.json and reinstall."
)
packages: dict[str, InstalledPackage] = {}
for name, info in data.get("packages", {}).items():
packages[name] = InstalledPackage(
name=name,
version=str(info.get("version", "")),
type=str(info.get("type", "")),
files=[Path(f) for f in info.get("files", [])],
)
try:
packages[name] = InstalledPackage(
name=name,
version=str(info["version"]),
type=str(info["type"]),
files=[Path(f) for f in info["files"]],
)
except KeyError as e:
raise ConfigError(
f"Corrupt installed.json: missing field {e.args[0]!r} for package {name!r}"
) from e
return cls(packages=packages)

View File

@@ -11,7 +11,6 @@ from flow.domain.packages.models import (
PkgRemoveOp,
)
from flow.domain.packages.resolution import (
detect_package_manager,
pm_install_command,
resolve_binary_asset,
resolve_download_url,
@@ -23,12 +22,13 @@ def plan_install(
packages: list[PackageDef],
installed: InstalledState,
platform_str: str,
pm: Optional[str] = None,
pm: Optional[str],
) -> PackagePlan:
"""Plan installation of packages."""
if pm is None:
pm = detect_package_manager()
"""Plan installation of packages.
``pm`` may be None if no package manager is available, but any ``pkg``
or ``cask`` package will then raise FlowError.
"""
install_ops: list[PkgInstallOp] = []
pm_packages: list[str] = []

View File

@@ -23,11 +23,10 @@ def resolve_spec(
source=ref.source,
version=ref.version,
asset_pattern=ref.asset_pattern,
platform_map=ref.platform_map or {},
platform_map=ref.platform_map if ref.platform_map is not None else {},
extract_dir=ref.extract_dir,
install=ref.install or {},
install=ref.install if ref.install is not None else {},
post_install=ref.post_install,
allow_sudo=bool(ref.allow_sudo),
)
# Merge: profile overrides catalog
@@ -38,11 +37,10 @@ def resolve_spec(
source=ref.source or base.source,
version=ref.version or base.version,
asset_pattern=ref.asset_pattern or base.asset_pattern,
platform_map=ref.platform_map or base.platform_map,
platform_map=ref.platform_map if ref.platform_map is not None else base.platform_map,
extract_dir=ref.extract_dir or base.extract_dir,
install=ref.install or base.install,
install=ref.install if ref.install is not None else base.install,
post_install=ref.post_install or base.post_install,
allow_sudo=ref.allow_sudo if ref.allow_sudo is not None else base.allow_sudo,
)

View File

@@ -117,27 +117,6 @@ def build_destination(user: str, host: str) -> str:
return f"{user}@{host}"
def terminfo_fix_command(
term: Optional[str] = "xterm-256color",
destination: str = "TARGET",
) -> str:
normalized_term = (term or "").strip().lower()
if normalized_term == "xterm-ghostty":
return f"infocmp -x xterm-ghostty | ssh {destination} -- tic -x -"
if normalized_term == "wezterm":
return (
f"ssh {destination} -- sh -lc "
"'tempfile=$(mktemp) && curl -fsSL -o \"$tempfile\" "
"https://raw.githubusercontent.com/wezterm/wezterm/main/termwiz/data/wezterm.terminfo "
"&& tic -x -o ~/.terminfo \"$tempfile\" && rm \"$tempfile\"'"
)
fallback_term = normalized_term or "xterm-256color"
return f"infocmp -x {fallback_term} | ssh {destination} -- tic -x -"
def list_targets(targets: list[TargetConfig]) -> list[Target]:
"""Convert config targets to domain targets."""
return [

View File

@@ -69,10 +69,10 @@ class BootstrapService:
self.ctx.console.info("No profiles defined in manifest.")
return
rows = [
[name, data.get("os", "linux"), data.get("hostname", "-")]
for name, data in sorted(profiles.items())
]
rows = []
for name, data in sorted(profiles.items()):
parsed = parse_profile(name, data)
rows.append([name, parsed.os, parsed.hostname or "-"])
self.ctx.console.table(["PROFILE", "OS", "HOSTNAME"], rows)
def _execute_action(
@@ -95,6 +95,9 @@ class BootstrapService:
dot_svc.link(profile=dotfiles_profile)
return
# Shell phases: setup, shell, post-link
for cmd in action.commands:
self.ctx.runtime.runner.run_shell(cmd, check=True)
if action.phase in ("setup", "shell", "post-link"):
for cmd in action.commands:
self.ctx.runtime.runner.run_shell(cmd, check=True)
return
raise FlowError(f"Unhandled bootstrap phase: {action.phase!r}")

View File

@@ -41,6 +41,7 @@ class ContainerService:
),
resolve_mounts(
paths.HOME,
filesystem_check=lambda p: p.exists(),
project_path=project_path,
dotfiles_dir=paths.DOTFILES_DIR,
socket_path=self.rt.socket_path,

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import os
import shutil
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any, Optional
@@ -148,8 +149,11 @@ class PackageService:
extracted = tmp_dir / "extract"
self.ctx.console.info(f"Downloading {pkg.name}...")
with urllib.request.urlopen(url, timeout=60) as response:
self.ctx.runtime.fs.write_bytes(archive, response.read())
try:
with urllib.request.urlopen(url, timeout=60) as response:
self.ctx.runtime.fs.write_bytes(archive, response.read())
except urllib.error.URLError as e:
raise FlowError(f"Failed to download {url}: {e}") from e
self.ctx.runtime.fs.ensure_dir(extracted)
try:
@@ -201,7 +205,7 @@ class PackageService:
self, pkg: PackageDef, url: str, state: InstalledState,
) -> None:
"""Download and install an AppImage."""
bin_dir = Path.home() / ".local" / "bin"
bin_dir = paths.HOME / ".local" / "bin"
self.ctx.runtime.fs.ensure_dir(bin_dir)
target = bin_dir / pkg.name
@@ -307,7 +311,7 @@ class PackageService:
self._validate_install_path(package_name, declared_path)
source = (source_root / declared_path).resolve(strict=False)
if not str(source).startswith(str(source_root_resolved)):
if not source.is_relative_to(source_root_resolved):
raise FlowError(
f"Install path escapes extract-dir for '{package_name}': {declared_path}"
)
@@ -334,19 +338,14 @@ class PackageService:
return
script = substitute_template(pkg.post_install, self._binary_context(pkg))
if not pkg.allow_sudo and self._script_uses_sudo(script):
raise FlowError(
f"Package '{pkg.name}' post-install uses sudo but allow-sudo is false"
)
self.ctx.runtime.runner.run_shell(script, check=True)
def _install_destination(self, section: str) -> Path:
home = Path.home()
destinations = {
"bin": home / ".local" / "bin",
"share": home / ".local" / "share",
"man": home / ".local" / "share" / "man",
"lib": home / ".local" / "lib",
"bin": paths.HOME / ".local" / "bin",
"share": paths.HOME / ".local" / "share",
"man": paths.HOME / ".local" / "share" / "man",
"lib": paths.HOME / ".local" / "lib",
}
if section not in destinations:
raise FlowError(f"Unsupported install section: {section}")
@@ -378,10 +377,3 @@ class PackageService:
raise FlowError(
f"Install path for '{package_name}' must not include parent traversal: {declared_path}"
)
def _script_uses_sudo(self, script: str) -> bool:
for line in script.splitlines():
stripped = line.strip()
if stripped.startswith("sudo "):
return True
return False

View File

@@ -8,11 +8,9 @@ from typing import Optional
from flow.core.config import FlowContext
from flow.domain.remote.resolution import (
build_destination,
build_ssh_command,
list_targets,
resolve_target,
terminfo_fix_command,
)
@@ -70,15 +68,3 @@ class RemoteService:
for t in targets
]
self.ctx.console.table(["TARGET", "HOST", "IDENTITY"], rows)
def fix_terminfo(self, target_spec: str) -> None:
"""Show terminfo fix commands."""
target = resolve_target(
target_spec,
self.ctx.config.targets,
default_user=os.environ.get("USER") or getpass.getuser(),
)
destination = build_destination(target.user, target.host)
cmd = terminfo_fix_command(os.environ.get("TERM"), destination)
self.ctx.console.info("Run this command to fix terminfo:")
self.ctx.console.info(f" {cmd}")

View File

@@ -3,10 +3,24 @@
import subprocess
from flow.commands.completion import complete
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
def _make_ctx():
return FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=Console(color=False),
runtime=SystemRuntime(),
)
def test_complete_top_level():
result = complete(["flow", ""], 1)
result = complete(_make_ctx(), ["flow", ""], 1)
assert "dotfiles" in result
assert "packages" in result
assert "setup" in result
@@ -16,12 +30,12 @@ def test_complete_top_level():
def test_complete_top_level_prefix():
result = complete(["flow", "do"], 1)
result = complete(_make_ctx(), ["flow", "do"], 1)
assert result == ["dotfiles"]
def test_complete_dotfiles_subcommands():
result = complete(["flow", "dotfiles", ""], 2)
result = complete(_make_ctx(), ["flow", "dotfiles", ""], 2)
assert "link" in result
assert "unlink" in result
assert "status" in result
@@ -36,7 +50,7 @@ def test_complete_dotfiles_subcommands():
def test_complete_dotfiles_repos_subcommands():
result = complete(["flow", "dotfiles", "repos", ""], 3)
result = complete(_make_ctx(), ["flow", "dotfiles", "repos", ""], 3)
assert "list" in result
assert "status" in result
assert "pull" in result
@@ -44,43 +58,41 @@ def test_complete_dotfiles_repos_subcommands():
def test_complete_dotfiles_repos_pull_flags():
result = complete(["flow", "dotfiles", "repos", "pull", "--"], 4)
result = complete(_make_ctx(), ["flow", "dotfiles", "repos", "pull", "--"], 4)
assert "--repo" in result
assert "--dry-run" in result
def test_complete_dotfiles_edit_packages():
result = complete(["flow", "dotfiles", "edit", "--"], 3)
result = complete(_make_ctx(), ["flow", "dotfiles", "edit", "--"], 3)
assert "--no-commit" in result
def test_complete_dotfiles_link_flags():
result = complete(["flow", "dotfiles", "link", "--"], 3)
result = complete(_make_ctx(), ["flow", "dotfiles", "link", "--"], 3)
assert "--profile" in result
assert "--dry-run" in result
def test_complete_unknown_command():
result = complete(["flow", "unknown", ""], 2)
result = complete(_make_ctx(), ["flow", "unknown", ""], 2)
assert result == []
def test_complete_packages_subcommands():
result = complete(["flow", "packages", ""], 2)
result = complete(_make_ctx(), ["flow", "packages", ""], 2)
assert "install" in result
assert "remove" in result
assert "list" in result
def test_complete_dev_attach_returns_empty_on_timeout(monkeypatch):
class FakeRuntime:
def __init__(self, runner, *, mode="auto"):
self.runner = runner
def test_complete_dev_attach_returns_empty_on_timeout():
ctx = _make_ctx()
def ps(self, **kwargs):
assert kwargs["timeout"] == 1.0
raise subprocess.TimeoutExpired("docker ps", kwargs["timeout"])
def fake_ps(**kwargs):
assert kwargs["timeout"] == 1.0
raise subprocess.TimeoutExpired("docker ps", kwargs["timeout"])
monkeypatch.setattr("flow.commands.completion.ContainerRuntime", FakeRuntime)
result = complete(["flow", "dev", "attach", ""], 3)
ctx.runtime.containers.ps = fake_ps # type: ignore[method-assign]
result = complete(ctx, ["flow", "dev", "attach", ""], 3)
assert result == []

View File

@@ -5,6 +5,7 @@ from pathlib import Path
import pytest
from flow.core.config import AppConfig, load_config, load_manifest
from flow.core.errors import ConfigError
def test_load_config_missing_path(tmp_path):
@@ -141,3 +142,22 @@ def test_load_config_container_runtime(tmp_path):
)
cfg = load_config(tmp_path)
assert cfg.container_runtime == "podman-rootful"
@pytest.mark.parametrize("runtime", ["auto", "docker", "podman", "podman-rootful"])
def test_load_config_container_runtime_accepts_known_values(tmp_path, runtime):
(tmp_path / "config.yaml").write_text(
"defaults:\n"
f" container-runtime: {runtime}\n"
)
cfg = load_config(tmp_path)
assert cfg.container_runtime == runtime
def test_load_config_container_runtime_rejects_unknown(tmp_path):
(tmp_path / "config.yaml").write_text(
"defaults:\n"
" container-runtime: nspawn\n"
)
with pytest.raises(ConfigError, match="Invalid container-runtime"):
load_config(tmp_path)

View File

@@ -1,5 +1,7 @@
"""Tests for bootstrap planning."""
import inspect
import pytest
from flow.core.errors import ConfigError
@@ -22,33 +24,45 @@ class TestParseProfile:
assert profile.shell == "zsh"
assert len(profile.packages) == 2
def test_defaults(self):
profile = parse_profile("minimal", {})
def test_missing_os_raises(self):
with pytest.raises(ConfigError, match=r"Profile 'minimal': required field 'os' is missing"):
parse_profile("minimal", {})
def test_optional_fields_default(self):
profile = parse_profile("minimal", {"os": "linux"})
assert profile.os == "linux"
assert profile.hostname is None
assert profile.packages == ()
def test_ssh_keys(self):
raw = {"ssh-keys": [{"path": "~/.ssh/id_ed25519", "type": "ed25519"}]}
raw = {"os": "linux", "ssh-keys": [{"path": "~/.ssh/id_ed25519", "type": "ed25519"}]}
profile = parse_profile("test", raw)
assert len(profile.ssh_keys) == 1
def test_ssh_keys_with_filename(self):
raw = {"ssh-keys": [{"filename": "id_work", "type": "ed25519"}]}
raw = {"os": "linux", "ssh-keys": [{"filename": "id_work", "type": "ed25519"}]}
profile = parse_profile("test", raw)
assert profile.ssh_keys[0]["path"] == "~/.ssh/id_work"
def test_env_required(self):
profile = parse_profile("test", {"env-required": ["USER_EMAIL"]})
profile = parse_profile("test", {"os": "linux", "env-required": ["USER_EMAIL"]})
assert profile.env_required == ("USER_EMAIL",)
def test_post_link_and_dotfiles_profile(self):
profile = parse_profile("test", {"dotfiles-profile": "linux-work", "post-link": "echo done"})
profile = parse_profile(
"test",
{"os": "linux", "dotfiles-profile": "linux-work", "post-link": "echo done"},
)
assert profile.dotfiles_profile == "linux-work"
assert profile.post_link == "echo done"
class TestPlanBootstrap:
def test_env_is_required_keyword(self):
sig = inspect.signature(plan_bootstrap)
param = sig.parameters["env"]
assert param.default is inspect.Parameter.empty
def test_basic_plan(self):
profile = Profile(
name="test", os="linux", arch=None,
@@ -57,7 +71,7 @@ class TestPlanBootstrap:
packages=["fd"], env_required=[],
)
manifest = {"packages": [{"name": "fd", "type": "pkg"}]}
plan = plan_bootstrap(profile, manifest)
plan = plan_bootstrap(profile, manifest, env={})
assert plan.profile == "test"
assert plan.total_steps > 0
phases = [a.phase for a in plan.actions]
@@ -65,8 +79,7 @@ class TestPlanBootstrap:
assert "packages" in phases
assert "dotfiles" in phases
def test_missing_env_raises(self, monkeypatch):
monkeypatch.delenv("REQUIRED_VAR", raising=False)
def test_missing_env_raises(self):
profile = Profile(
name="test", os="linux", arch=None,
hostname=None, locale=None, shell=None,
@@ -74,7 +87,7 @@ class TestPlanBootstrap:
env_required=["REQUIRED_VAR"],
)
with pytest.raises(ConfigError, match="REQUIRED_VAR"):
plan_bootstrap(profile, {})
plan_bootstrap(profile, {}, env={})
def test_runcmd_produces_action(self):
profile = Profile(
@@ -83,7 +96,7 @@ class TestPlanBootstrap:
ssh_keys=[], runcmd=["echo hello", "echo world"],
packages=[], env_required=[],
)
plan = plan_bootstrap(profile, {})
plan = plan_bootstrap(profile, {}, env={})
runcmd_actions = [a for a in plan.actions if "custom command" in a.description.lower()]
assert len(runcmd_actions) == 1
@@ -94,7 +107,7 @@ class TestPlanBootstrap:
ssh_keys=[], runcmd=[], packages=[], env_required=[],
post_link="echo done",
)
plan = plan_bootstrap(profile, {})
plan = plan_bootstrap(profile, {}, env={})
assert any(action.phase == "post-link" for action in plan.actions)
def test_ssh_keys_action(self):
@@ -104,6 +117,6 @@ class TestPlanBootstrap:
ssh_keys=[{"path": "~/.ssh/id", "type": "ed25519"}],
runcmd=[], packages=[], env_required=[],
)
plan = plan_bootstrap(profile, {})
plan = plan_bootstrap(profile, {}, env={})
ssh_actions = [a for a in plan.actions if "SSH" in a.description]
assert len(ssh_actions) == 1

View File

@@ -42,28 +42,40 @@ class TestResolveMounts:
def test_projects_mount(self, tmp_path):
projects = tmp_path / "projects"
projects.mkdir()
mounts = resolve_mounts(tmp_path, project_path=str(projects))
mounts = resolve_mounts(
tmp_path, filesystem_check=lambda p: p.exists(), project_path=str(projects),
)
project_mounts = [m for m in mounts if m.target == "/workspace"]
assert len(project_mounts) == 1
def test_dotfiles_mount(self, tmp_path):
dotfiles = tmp_path / "dotfiles"
dotfiles.mkdir()
mounts = resolve_mounts(tmp_path, dotfiles_dir=dotfiles)
mounts = resolve_mounts(
tmp_path, filesystem_check=lambda p: p.exists(), dotfiles_dir=dotfiles,
)
assert any(m.target.endswith("/flow/dotfiles") for m in mounts)
def test_socket_path_mount(self, tmp_path):
sock = tmp_path / "docker.sock"
sock.write_text("")
mounts = resolve_mounts(tmp_path, socket_path=sock)
mounts = resolve_mounts(
tmp_path, filesystem_check=lambda p: p.exists(), socket_path=sock,
)
socket_mounts = [m for m in mounts if m.target == "/var/run/docker.sock"]
assert len(socket_mounts) == 1
assert socket_mounts[0].source == sock
def test_no_socket_path(self, tmp_path):
mounts = resolve_mounts(tmp_path)
mounts = resolve_mounts(tmp_path, filesystem_check=lambda p: p.exists())
assert not any(m.target == "/var/run/docker.sock" for m in mounts)
def test_filesystem_check_controls_standard_mounts(self, tmp_path):
mounts = resolve_mounts(tmp_path, filesystem_check=lambda p: False)
# No standard mounts present when filesystem_check returns False.
assert not any(m.target == "/home/dev/.ssh" for m in mounts)
assert not any(m.target.endswith("/flow/dotfiles") for m in mounts)
class TestBuildContainerSpec:
def test_basic(self):

View File

@@ -67,7 +67,7 @@ class TestResolveSpec:
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)}
ref = ProfilePackageRef(name="fd", type=None, source=None, version="1.0", asset_pattern=None)
result = resolve_spec(ref, catalog)
@@ -86,7 +86,7 @@ class TestResolveSpec:
name="docker", type="pkg", sources={"apt": "docker-ce"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)}
ref = ProfilePackageRef(
name="docker",
@@ -95,11 +95,9 @@ class TestResolveSpec:
version=None,
asset_pattern=None,
post_install="sudo groupadd docker || true",
allow_sudo=True,
)
result = resolve_spec(ref, catalog)
assert result.post_install == "sudo groupadd docker || true"
assert result.allow_sudo is True
class TestResolveSourceName:
@@ -108,7 +106,7 @@ class TestResolveSourceName:
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
assert resolve_source_name(pkg, "apt") == "fd-find"
@@ -117,7 +115,7 @@ class TestResolveSourceName:
name="fd", type="pkg", sources={},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
assert resolve_source_name(pkg, "apt") == "fd"
@@ -131,7 +129,7 @@ class TestResolveBinaryAsset:
asset_pattern=None,
platform_map={"linux-x64": "nvim-linux-x86_64.tar.gz"},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
assert resolve_binary_asset(pkg, "linux-x64") == "nvim-linux-x86_64.tar.gz"
@@ -143,7 +141,7 @@ class TestResolveBinaryAsset:
asset_pattern="fd-v10.2.0-{{arch}}-unknown-{{os}}-gnu.tar.gz",
platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
result = resolve_binary_asset(pkg, "linux-x64")
assert "x64" in result
@@ -157,7 +155,7 @@ class TestResolveBinaryAsset:
asset_pattern="nvim-{{os}}-{{arch}}.tar.gz",
platform_map={"linux-x64": {"os": "linux", "arch": "x86_64"}},
extract_dir="nvim-{{os}}64", install={},
post_install=None, allow_sudo=False,
post_install=None,
)
assert resolve_binary_asset(pkg, "linux-x64") == "nvim-linux-x86_64.tar.gz"
assert resolve_extract_dir(pkg, "linux-x64") == "nvim-linux64"
@@ -171,7 +169,7 @@ class TestResolveDownloadUrl:
version="v0.10.4",
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
url = resolve_download_url(pkg, "nvim.tar.gz")
assert "github.com/neovim/neovim" in url
@@ -184,7 +182,7 @@ class TestResolveDownloadUrl:
version="0.10.4",
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
url = resolve_download_url(pkg, "nvim.tar.gz", "linux-x64")
assert "/download/v0.10.4/" in url
@@ -196,7 +194,7 @@ class TestResolveDownloadUrl:
version=None,
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
url = resolve_download_url(pkg, "nvim.tar.gz")
assert "latest" in url
@@ -208,7 +206,7 @@ class TestResolveDownloadUrl:
version=None,
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
url = resolve_download_url(pkg, "x.tar.gz")
assert url == "https://example.com/download/x.tar.gz"
@@ -247,7 +245,7 @@ class TestPlanning:
name="wezterm", type="cask", sources={"brew": "wezterm"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
plan = plan_install([pkg], InstalledState(), "macos-arm64", "brew")
assert plan.install_ops[0].method == "cask"

View File

@@ -32,12 +32,36 @@ def test_installed_state_version_mismatch():
InstalledState.from_dict({"version": 99, "packages": {}})
def test_installed_state_corrupt_missing_version_raises():
with pytest.raises(ConfigError, match="missing field 'version'"):
InstalledState.from_dict({
"version": 1,
"packages": {"fd": {"type": "pkg", "files": []}},
})
def test_installed_state_corrupt_missing_type_raises():
with pytest.raises(ConfigError, match="missing field 'type'"):
InstalledState.from_dict({
"version": 1,
"packages": {"fd": {"version": "1.0", "files": []}},
})
def test_installed_state_corrupt_missing_files_raises():
with pytest.raises(ConfigError, match="missing field 'files'"):
InstalledState.from_dict({
"version": 1,
"packages": {"fd": {"version": "1.0", "type": "pkg"}},
})
def test_package_def_fields():
pkg = PackageDef(
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
assert pkg.name == "fd"
assert pkg.type == "pkg"

View File

@@ -1,7 +1,11 @@
"""Tests for package install/remove planning."""
import inspect
from pathlib import Path
import pytest
from flow.core.errors import FlowError
from flow.domain.packages.models import (
InstalledPackage,
InstalledState,
@@ -16,7 +20,7 @@ def _pkg(name, type="pkg", sources=None, source=None, version=None,
name=name, type=type, sources=sources or {},
source=source, version=version, asset_pattern=asset_pattern,
platform_map=platform_map or {}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
post_install=None,
)
@@ -62,3 +66,15 @@ class TestPlanRemove:
def test_remove_not_installed(self):
plan = plan_remove(["missing"], InstalledState())
assert len(plan.remove_ops) == 0
class TestPlanInstallSignature:
def test_pm_is_required_positional(self):
sig = inspect.signature(plan_install)
param = sig.parameters["pm"]
assert param.default is inspect.Parameter.empty
def test_pkg_without_pm_raises(self):
pkgs = [_pkg("fd", sources={"apt": "fd-find"})]
with pytest.raises(FlowError, match="No supported package manager"):
plan_install(pkgs, InstalledState(), "linux-x64", None)

View File

@@ -10,7 +10,6 @@ from flow.domain.remote.resolution import (
list_targets,
parse_target,
resolve_target,
terminfo_fix_command,
)
@@ -84,9 +83,3 @@ class TestListTargets:
targets = list_targets(configs)
assert len(targets) == 2
assert targets[0].label == "a@b"
class TestTerminfoFix:
def test_returns_command(self):
cmd = terminfo_fix_command()
assert "infocmp" in cmd

View File

@@ -83,7 +83,6 @@ class TestBootstrapService:
"os": "linux",
"packages": [{
"name": "docker",
"allow-sudo": True,
"post-install": "sudo groupadd docker || true",
}],
},
@@ -93,9 +92,27 @@ class TestBootstrapService:
ctx = _make_ctx(manifest)
BootstrapService(ctx).run("linux-auto")
assert captured["packages"][0].allow_sudo is True
assert captured["packages"][0].post_install == "sudo groupadd docker || true"
def test_unknown_phase_raises(self):
from flow.domain.bootstrap.models import BootstrapAction, BootstrapPlan
from flow.domain.bootstrap.models import VALID_PHASES
manifest = {"profiles": {"work": {"os": "linux"}}}
ctx = _make_ctx(manifest)
svc = BootstrapService(ctx)
# Forge an action with a phase that VALID_PHASES contains but the
# dispatch can't handle (shouldn't happen, but tests the explicit guard).
# Use a phase NOT in VALID_PHASES first to confirm the "Unknown" branch.
action = BootstrapAction.__new__(BootstrapAction)
object.__setattr__(action, "phase", "no-such-phase")
object.__setattr__(action, "description", "")
object.__setattr__(action, "commands", ())
object.__setattr__(action, "needs_sudo", False)
plan = BootstrapPlan(profile="work", actions=(), packages_to_install=())
with pytest.raises(FlowError, match="Unknown bootstrap phase"):
svc._execute_action(action, plan, "work")
def test_run_uses_dotfiles_profile_override(self, monkeypatch):
captured = {}

View File

@@ -2,6 +2,7 @@
import io
import tarfile
import urllib.error
from pathlib import Path
import pytest
@@ -12,7 +13,7 @@ from flow.core.errors import FlowError
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
from flow.core import paths
from flow.domain.packages.models import InstalledPackage, InstalledState
from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef
from flow.services.packages import PackageService
@@ -78,6 +79,7 @@ class TestPackageService:
home = tmp_path / "home"
home.mkdir()
monkeypatch.setenv("HOME", str(home))
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data")
monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json")
@@ -130,3 +132,105 @@ class TestPackageService:
assert (home / ".local" / "bin" / "nvim").exists()
assert (home / ".local" / "share" / "nvim" / "runtime.txt").exists()
assert (home / ".local" / "share" / "man" / "man1" / "nvim.1").exists()
def test_post_install_with_sudo_runs_unchecked(self, tmp_path, monkeypatch):
"""No allow_sudo gate -- post-install scripts run as written."""
home = tmp_path / "home"
home.mkdir()
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json")
calls: list[str] = []
class _Runner:
def run_shell(self, command, **kwargs):
calls.append(command)
class _Result:
returncode = 0
stdout = ""
stderr = ""
return _Result()
ctx = _make_ctx(tmp_path)
ctx.runtime.runner = _Runner()
svc = PackageService(ctx)
pkg = PackageDef(
name="docker", type="pkg", sources={},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install="sudo groupadd docker || true",
)
svc._run_post_install(pkg)
assert calls == ["sudo groupadd docker || true"]
def test_install_binary_url_failure_raises_flow_error(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data")
monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json")
def _raise(*args, **kwargs):
raise urllib.error.URLError("Network unreachable")
monkeypatch.setattr(
"flow.services.packages.urllib.request.urlopen", _raise,
)
manifest = {
"packages": [{
"name": "neovim",
"type": "binary",
"source": "github:neovim/neovim",
"version": "0.10.4",
"platform-map": {"linux-x64": "nvim-linux-x64.tar.gz"},
"install": {"bin": ["bin/nvim"]},
}],
}
ctx = _make_ctx(tmp_path, manifest)
svc = PackageService(ctx)
packages = svc.resolve_install_packages(package_names=["neovim"])
with pytest.raises(FlowError, match="Failed to download"):
svc.install(packages)
def test_install_path_absolute_raises(self, tmp_path, monkeypatch):
monkeypatch.setattr(paths, "HOME", tmp_path / "home")
ctx = _make_ctx(tmp_path)
svc = PackageService(ctx)
with pytest.raises(FlowError, match="must be relative"):
svc._validate_install_path("pkg", Path("/etc/passwd"))
def test_install_path_parent_traversal_raises(self, tmp_path, monkeypatch):
monkeypatch.setattr(paths, "HOME", tmp_path / "home")
ctx = _make_ctx(tmp_path)
svc = PackageService(ctx)
with pytest.raises(FlowError, match="parent traversal"):
svc._validate_install_path("pkg", Path("../etc/passwd"))
def test_install_path_escapes_extract_dir_raises(self, tmp_path, monkeypatch):
"""A relative path whose resolved location is outside the extract dir."""
home = tmp_path / "home"
home.mkdir()
monkeypatch.setattr(paths, "HOME", home)
ctx = _make_ctx(tmp_path)
svc = PackageService(ctx)
extract_root = tmp_path / "extract"
extract_root.mkdir()
sibling = tmp_path / "sibling"
sibling.mkdir()
# Symlink inside the extract root pointing outside -- the resolved
# source escapes the root.
link = extract_root / "evil"
link.symlink_to(sibling)
with pytest.raises(FlowError, match="escapes extract-dir"):
svc._copy_install_item(
"pkg",
extract_root,
extract_root.resolve(),
"bin",
"evil/escape",
)

View File

@@ -55,10 +55,3 @@ class TestRemoteService:
svc = RemoteService(ctx)
svc.list()
assert "No targets" in capsys.readouterr().out
def test_fix_terminfo(self, capsys):
ctx = _make_ctx()
svc = RemoteService(ctx)
svc.fix_terminfo("personal@orb")
output = capsys.readouterr().out
assert "infocmp" in output

View File

@@ -2,6 +2,9 @@
import os
import pytest
from flow.core.errors import ConfigError
from flow.core.template import substitute, substitute_template
@@ -36,8 +39,18 @@ class TestSubstituteTemplate:
ctx = {"platform": {"arch": "arm64"}}
assert substitute_template("{{ platform.arch }}", ctx) == "arm64"
def test_preserves_unknown_templates(self):
assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}"
def test_unknown_variable_raises(self):
with pytest.raises(ConfigError, match=r"\{\{ unknown \}\}"):
substitute_template("{{ unknown }}", {})
def test_nested_unresolved_raises(self):
with pytest.raises(ConfigError, match=r"\{\{ platform.missing \}\}"):
substitute_template("{{ platform.missing }}", {"platform": {"arch": "arm64"}})
def test_unresolved_env_raises(self, monkeypatch):
monkeypatch.delenv("SOME_NEVER_SET_VAR", raising=False)
with pytest.raises(ConfigError):
substitute_template("{{ env.SOME_NEVER_SET_VAR }}", {"env": {}})
def test_non_string_passthrough(self):
assert substitute_template(42, {}) == 42