From 6e9f9c9e30b83a3b7380ca8e70a461fdec2ae6f1 Mon Sep 17 00:00:00 2001 From: Tomas Mirchev Date: Mon, 16 Mar 2026 05:07:57 +0200 Subject: [PATCH] chore: remove old code replaced by rewrite Delete old core modules (action, stow, process, system, variables), old services (package_defs, ssh), and all tests for deleted code. 191 tests pass with the new codebase. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/flow/core/action.py | 120 ------ src/flow/core/process.py | 30 -- src/flow/core/stow.py | 358 --------------- src/flow/core/system.py | 327 -------------- src/flow/core/variables.py | 61 --- src/flow/services/package_defs.py | 350 --------------- src/flow/services/ssh.py | 184 -------- tests/test_action.py | 115 ----- tests/test_bootstrap.py | 215 --------- tests/test_commands.py | 74 ---- tests/test_config.py | 77 ---- tests/test_console.py | 95 ---- tests/test_dotfiles.py | 104 ----- tests/test_dotfiles_e2e_container.py | 296 ------------- tests/test_dotfiles_folding.py | 622 --------------------------- tests/test_package.py | 45 -- tests/test_paths.py | 77 ---- tests/test_platform.py | 29 -- tests/test_self_hosting.py | 81 ---- tests/test_stow.py | 310 ------------- tests/test_sync.py | 88 ---- tests/test_variables.py | 57 --- 22 files changed, 3715 deletions(-) delete mode 100644 src/flow/core/action.py delete mode 100644 src/flow/core/process.py delete mode 100644 src/flow/core/stow.py delete mode 100644 src/flow/core/system.py delete mode 100644 src/flow/core/variables.py delete mode 100644 src/flow/services/package_defs.py delete mode 100644 src/flow/services/ssh.py delete mode 100644 tests/test_action.py delete mode 100644 tests/test_bootstrap.py delete mode 100644 tests/test_commands.py delete mode 100644 tests/test_config.py delete mode 100644 tests/test_console.py delete mode 100644 tests/test_dotfiles.py delete mode 100644 tests/test_dotfiles_e2e_container.py delete mode 100644 tests/test_dotfiles_folding.py delete mode 100644 tests/test_package.py delete mode 100644 tests/test_paths.py delete mode 100644 tests/test_platform.py delete mode 100644 tests/test_self_hosting.py delete mode 100644 tests/test_stow.py delete mode 100644 tests/test_sync.py delete mode 100644 tests/test_variables.py diff --git a/src/flow/core/action.py b/src/flow/core/action.py deleted file mode 100644 index cb07d67..0000000 --- a/src/flow/core/action.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Action dataclass and ActionExecutor for plan-then-execute workflows.""" - -from dataclasses import dataclass, field -from typing import Any, Callable, Dict, List, Optional - -from flow.core.console import ConsoleLogger - - -@dataclass -class Action: - type: str - description: str - data: Dict[str, Any] = field(default_factory=dict) - skip_on_error: bool = True - os_filter: Optional[str] = None - status: str = "pending" - error: Optional[str] = None - - -class ActionExecutor: - """Register handlers for action types, then execute a plan.""" - - def __init__(self, console: ConsoleLogger): - self.console = console - self._handlers: Dict[str, Callable] = {} - self.post_comments: List[str] = [] - - def register(self, action_type: str, handler: Callable) -> None: - self._handlers[action_type] = handler - - def execute(self, actions: List[Action], *, dry_run: bool = False, current_os: str = "") -> None: - if dry_run: - self._print_plan(actions) - return - - # Filter OS-incompatible actions - compatible = [a for a in actions if a.os_filter is None or a.os_filter == current_os] - skipped_count = len(actions) - len(compatible) - if skipped_count: - self.console.info(f"Skipped {skipped_count} OS-incompatible actions") - - self.console.section_header(f"EXECUTING {len(compatible)} ACTIONS") - - for i, action in enumerate(compatible, 1): - self.console.step_start(i, len(compatible), action.description) - - handler = self._handlers.get(action.type) - if not handler: - action.status = "skipped" - self.console.step_skip(f"No handler for action type: {action.type}") - continue - - try: - handler(action.data) - action.status = "completed" - self.console.step_complete() - except Exception as e: - action.error = str(e) - if action.skip_on_error: - action.status = "skipped" - self.console.step_skip(str(e)) - else: - action.status = "failed" - self.console.step_fail(str(e)) - print(f"\n{self.console.RED}Critical action failed, stopping execution{self.console.RESET}") - break - - self._print_summary(compatible) - - def _print_plan(self, actions: List[Action]) -> None: - self.console.plan_header("EXECUTION PLAN", len(actions)) - - grouped: Dict[str, List[Action]] = {} - for action in actions: - category = action.type.split("-")[0] - grouped.setdefault(category, []).append(action) - - for category, category_actions in grouped.items(): - self.console.plan_category(category) - for i, action in enumerate(category_actions, 1): - self.console.plan_item( - i, - action.description, - action.os_filter, - not action.skip_on_error, - ) - - self.console.plan_legend() - - def _print_summary(self, actions: List[Action]) -> None: - completed = sum(1 for a in actions if a.status == "completed") - failed = sum(1 for a in actions if a.status == "failed") - skipped = sum(1 for a in actions if a.status == "skipped") - - self.console.section_summary("EXECUTION SUMMARY") - c = self.console - - print(f"Total actions: {c.BOLD}{len(actions)}{c.RESET}") - print(f"Completed: {c.GREEN}{completed}{c.RESET}") - if failed: - print(f"Failed: {c.RED}{failed}{c.RESET}") - if skipped: - print(f"Skipped: {c.YELLOW}{skipped}{c.RESET}") - - if self.post_comments: - print(f"\n{c.BOLD}POST-INSTALL NOTES{c.RESET}") - print(f"{c.CYAN}{'-' * 25}{c.RESET}") - for i, comment in enumerate(self.post_comments, 1): - print(f"{i}. {comment}") - - if failed: - print(f"\n{c.BOLD}FAILED ACTIONS{c.RESET}") - print(f"{c.RED}{'-' * 20}{c.RESET}") - for action in actions: - if action.status == "failed": - print(f"{c.RED}>{c.RESET} {action.description}") - print(f" {c.GRAY}Error: {action.error}{c.RESET}") - print(f"\n{c.RED}{failed} action(s) failed. Check the errors above.{c.RESET}") - else: - print(f"\n{c.GREEN}All actions completed successfully!{c.RESET}") diff --git a/src/flow/core/process.py b/src/flow/core/process.py deleted file mode 100644 index e834a33..0000000 --- a/src/flow/core/process.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Command execution helpers.""" - -import subprocess - -from flow.core.console import ConsoleLogger -from flow.core.system import CommandRunner - - -def run_command( - command: str, - console: ConsoleLogger, - *, - check: bool = True, - shell: bool = True, - capture: bool = False, -) -> subprocess.CompletedProcess: - """Run a command with real-time streamed output.""" - if not shell: - raise RuntimeError("run_command only supports shell commands") - - runner = CommandRunner() - if capture: - result = runner.run_shell(command, capture_output=True, check=False) - if check and result.returncode != 0: - raise RuntimeError( - f"Command failed (exit {result.returncode}): {command}" - ) - return result - - return runner.stream_shell(command, console, check=check) diff --git a/src/flow/core/stow.py b/src/flow/core/stow.py deleted file mode 100644 index 7e694b1..0000000 --- a/src/flow/core/stow.py +++ /dev/null @@ -1,358 +0,0 @@ -"""GNU Stow-style tree folding/unfolding for efficient symlink management.""" - -import os -from dataclasses import dataclass, field -from pathlib import Path -from typing import Dict, List, Optional, Set - - -@dataclass -class LinkOperation: - """Represents a single operation to perform during linking.""" - - type: str # "create_symlink" | "create_dir" | "unfold" | "remove" | "remove_dir" - source: Path - target: Path - package: str - is_directory_link: bool = False - - def __str__(self) -> str: - if self.type == "create_symlink": - link_type = "DIR" if self.is_directory_link else "FILE" - return f" {link_type} LINK: {self.target} -> {self.source}" - elif self.type == "create_dir": - return f" CREATE DIR: {self.target}" - elif self.type == "unfold": - return f" UNFOLD: {self.target} (directory symlink -> individual file symlinks)" - elif self.type == "remove": - return f" REMOVE: {self.target}" - elif self.type == "remove_dir": - return f" REMOVE DIR: {self.target}" - return f" {self.type}: {self.target}" - - -@dataclass -class LinkTree: - """Represents the current state of symlinks.""" - - links: Dict[Path, Path] = field(default_factory=dict) # target -> source - packages: Dict[Path, str] = field(default_factory=dict) # target -> package_name - directory_links: Set[Path] = field(default_factory=set) # targets that are directory links - - def add_link(self, target: Path, source: Path, package: str, is_dir_link: bool = False): - """Add a link to the tree.""" - self.links[target] = source - self.packages[target] = package - if is_dir_link: - self.directory_links.add(target) - - def remove_link(self, target: Path): - """Remove a link from the tree.""" - self.links.pop(target, None) - self.packages.pop(target, None) - self.directory_links.discard(target) - - def is_directory_link(self, target: Path) -> bool: - """Check if a target is a directory symlink.""" - return target in self.directory_links - - def get_package(self, target: Path) -> Optional[str]: - """Get the package that owns a link.""" - return self.packages.get(target) - - def can_fold(self, target_dir: Path, package: str) -> bool: - """Check if all links in target_dir belong to the same package. - - Returns True if we can create a single directory symlink instead of - individual file symlinks. - """ - # Check all direct children of target_dir - for link_target, link_package in self.packages.items(): - # If link_target is a child of target_dir - try: - link_target.relative_to(target_dir) - # If parent is target_dir and package differs, cannot fold - if link_target.parent == target_dir and link_package != package: - return False - except ValueError: - # link_target is not under target_dir, skip - continue - - return True - - @classmethod - def from_state(cls, state: dict) -> "LinkTree": - """Build a LinkTree from the linked.json state format (v2 only).""" - tree = cls() - links_dict = state.get("links", {}) - - for package_name, pkg_links in links_dict.items(): - for target_str, link_info in pkg_links.items(): - target = Path(target_str) - if not isinstance(link_info, dict) or "source" not in link_info: - raise RuntimeError( - "Unsupported linked state format. Remove linked.json and relink dotfiles." - ) - - source = Path(link_info["source"]) - is_dir_link = bool(link_info.get("is_directory_link", False)) - - tree.add_link(target, source, package_name, is_dir_link) - - return tree - - def to_state(self) -> dict: - """Convert LinkTree to linked.json state format.""" - state = {"version": 2, "links": {}} - - # Group links by package - package_links: Dict[str, Dict[str, dict]] = {} - for target, source in self.links.items(): - package = self.packages[target] - if package not in package_links: - package_links[package] = {} - - package_links[package][str(target)] = { - "source": str(source), - "is_directory_link": target in self.directory_links, - } - - state["links"] = package_links - return state - - -class TreeFolder: - """Implements GNU Stow tree folding/unfolding algorithm.""" - - def __init__(self, tree: LinkTree): - self.tree = tree - - def plan_link( - self, source: Path, target: Path, package: str, is_dir_link: bool = False - ) -> List[LinkOperation]: - """Plan operations needed to create a link (may include unfolding). - - Args: - source: Source path (file or directory in dotfiles) - target: Target path (where symlink should be created) - package: Package name - is_dir_link: Whether this is a directory symlink (folded) - - Returns a list of operations to execute in order. - """ - operations = [] - - # Check if parent is a directory symlink that needs unfolding - parent = target.parent - if parent in self.tree.links and self.tree.is_directory_link(parent): - # Parent is a folded directory symlink, need to unfold - unfold_ops = self._plan_unfold(parent) - operations.extend(unfold_ops) - - # Create symlink operation (conflict detection will handle existing links) - operations.append( - LinkOperation( - type="create_symlink", - source=source, - target=target, - package=package, - is_directory_link=is_dir_link, - ) - ) - - return operations - - def _find_fold_point( - self, source: Path, target: Path, package: str - ) -> Path: - """Find the deepest directory level where we can create a folder symlink. - - Returns the target path where the symlink should be created. - For single files, this should just return the file path (no folding). - Folding only makes sense when linking entire directories. - """ - # For now, disable automatic folding at the plan_link level - # Folding should be done at a higher level when we know we're - # linking an entire directory tree from a package - return target - - def _plan_unfold(self, folded_dir: Path) -> List[LinkOperation]: - """Plan operations to unfold a directory symlink. - - When unfolding: - 1. Remove the directory symlink - 2. Create a real directory - 3. Create individual file symlinks for all files - """ - operations = [] - - # Get the source of the folded directory - source_dir = self.tree.links.get(folded_dir) - if not source_dir: - return operations - - package = self.tree.packages.get(folded_dir, "") - - # Remove the directory symlink - operations.append( - LinkOperation( - type="remove", - source=source_dir, - target=folded_dir, - package=package, - is_directory_link=True, - ) - ) - - # Create real directory - operations.append( - LinkOperation( - type="create_dir", - source=source_dir, - target=folded_dir, - package=package, - ) - ) - - # Create individual file symlinks for all files in source - if source_dir.exists() and source_dir.is_dir(): - for root, _dirs, files in os.walk(source_dir): - for fname in files: - src_file = Path(root) / fname - rel = src_file.relative_to(source_dir) - dst_file = folded_dir / rel - - operations.append( - LinkOperation( - type="create_symlink", - source=src_file, - target=dst_file, - package=package, - is_directory_link=False, - ) - ) - - return operations - - def plan_unlink(self, target: Path, package: str) -> List[LinkOperation]: - """Plan operations to remove a link (may include refolding).""" - operations = [] - - # Check if this is a directory link - if self.tree.is_directory_link(target): - # Remove all file links under this directory - to_remove = [] - for link_target in self.tree.links.keys(): - try: - link_target.relative_to(target) - to_remove.append(link_target) - except ValueError: - continue - - for link_target in to_remove: - operations.append( - LinkOperation( - type="remove", - source=self.tree.links[link_target], - target=link_target, - package=self.tree.packages[link_target], - is_directory_link=False, - ) - ) - - # Remove the link itself - if target in self.tree.links: - operations.append( - LinkOperation( - type="remove", - source=self.tree.links[target], - target=target, - package=package, - is_directory_link=self.tree.is_directory_link(target), - ) - ) - - return operations - - def detect_conflicts(self, operations: List[LinkOperation]) -> List[str]: - """Detect conflicts before executing operations. - - Returns a list of conflict error messages. - """ - conflicts = [] - - for op in operations: - if op.type == "create_symlink": - # Check if target already exists in tree (managed by flow) - if op.target in self.tree.links: - existing_pkg = self.tree.packages[op.target] - if existing_pkg != op.package: - conflicts.append( - f"Conflict: {op.target} is already linked by package '{existing_pkg}'" - ) - # Check if target exists on disk but not managed by flow - elif op.target.exists() or op.target.is_symlink(): - conflicts.append( - f"Conflict: {op.target} already exists and is not managed by flow" - ) - - # Check if target's parent is a file (can't create file in file) - if op.target.parent.exists() and op.target.parent.is_file(): - conflicts.append( - f"Conflict: {op.target.parent} is a file, cannot create {op.target}" - ) - - return conflicts - - def execute_operations( - self, operations: List[LinkOperation], dry_run: bool = False - ) -> None: - """Execute a list of operations atomically. - - If dry_run is True, only print what would be done. - """ - if dry_run: - for op in operations: - print(str(op)) - return - - # Execute operations - for op in operations: - if op.type == "create_symlink": - # Create parent directories - op.target.parent.mkdir(parents=True, exist_ok=True) - - if op.target.is_symlink(): - current = op.target.resolve(strict=False) - desired = op.source.resolve(strict=False) - if current == desired: - self.tree.add_link(op.target, op.source, op.package, op.is_directory_link) - continue - op.target.unlink() - elif op.target.exists(): - if op.target.is_file(): - op.target.unlink() - else: - raise RuntimeError(f"Cannot overwrite directory: {op.target}") - - # Create symlink - op.target.symlink_to(op.source) - - # Update tree - self.tree.add_link(op.target, op.source, op.package, op.is_directory_link) - - elif op.type == "create_dir": - op.target.mkdir(parents=True, exist_ok=True) - - elif op.type == "remove": - if op.target.exists() or op.target.is_symlink(): - op.target.unlink() - self.tree.remove_link(op.target) - - elif op.type == "remove_dir": - if op.target.exists() and op.target.is_dir(): - op.target.rmdir() - - def to_state(self) -> dict: - """Convert current tree to state format for persistence.""" - return self.tree.to_state() diff --git a/src/flow/core/system.py b/src/flow/core/system.py deleted file mode 100644 index 1842b63..0000000 --- a/src/flow/core/system.py +++ /dev/null @@ -1,327 +0,0 @@ -"""Runtime primitives for process, git, state, and filesystem access.""" - -from __future__ import annotations - -import json -import os -import shlex -import shutil -import subprocess -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any, Iterable, Mapping, Optional, Sequence - -from flow.core.console import ConsoleLogger -from flow.core.errors import FlowError - - -def _as_argv(argv: Sequence[str] | Iterable[str]) -> list[str]: - return [str(part) for part in argv] - - -class CommandRunner: - """Small wrapper around subprocess with consistent defaults.""" - - def format_command(self, argv: Sequence[str] | Iterable[str]) -> str: - return " ".join(shlex.quote(part) for part in _as_argv(argv)) - - def require_binary(self, name: str) -> str: - path = shutil.which(name) - if path is None: - raise FlowError(f"Required executable not found: {name}") - return path - - def run( - self, - argv: Sequence[str] | Iterable[str], - *, - cwd: Optional[Path] = None, - env: Optional[Mapping[str, str]] = None, - capture_output: bool = True, - check: bool = False, - timeout: Optional[int | float] = None, - ) -> subprocess.CompletedProcess[str]: - completed = subprocess.run( - _as_argv(argv), - cwd=str(cwd) if cwd is not None else None, - env=dict(env) if env is not None else None, - capture_output=capture_output, - text=True, - check=False, - timeout=timeout, - ) - if check and completed.returncode != 0: - message = completed.stderr.strip() or completed.stdout.strip() - if not message: - message = f"Command failed with exit code {completed.returncode}" - raise FlowError(message) - return completed - - def run_shell( - self, - command: str, - *, - cwd: Optional[Path] = None, - env: Optional[Mapping[str, str]] = None, - capture_output: bool = True, - check: bool = False, - timeout: Optional[int | float] = None, - ) -> subprocess.CompletedProcess[str]: - completed = subprocess.run( - command, - shell=True, - cwd=str(cwd) if cwd is not None else None, - env=dict(env) if env is not None else None, - capture_output=capture_output, - text=True, - check=False, - timeout=timeout, - ) - if check and completed.returncode != 0: - message = completed.stderr.strip() or completed.stdout.strip() - if not message: - message = f"Command failed with exit code {completed.returncode}" - raise FlowError(message) - return completed - - def stream_shell( - self, - command: str, - console: ConsoleLogger, - *, - check: bool = True, - ) -> subprocess.CompletedProcess[str]: - console.step_command(command) - - process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - ) - - output_lines: list[str] = [] - assert process.stdout is not None - try: - for line in process.stdout: - line = line.rstrip() - if not line: - continue - console.step_output(line) - output_lines.append(line) - finally: - process.stdout.close() - process.wait() - - if check and process.returncode != 0: - raise FlowError( - f"Command failed (exit {process.returncode}): {command}" - ) - - return subprocess.CompletedProcess( - command, - process.returncode, - stdout="\n".join(output_lines), - stderr="", - ) - - -class GitClient: - """Thin git adapter that always scopes commands to a repository root.""" - - def __init__(self, runner: CommandRunner): - self.runner = runner - - def run( - self, - repo_dir: Path, - *args: str, - capture_output: bool = True, - check: bool = False, - ) -> subprocess.CompletedProcess[str]: - return self.runner.run( - ["git", "-C", str(repo_dir), *args], - capture_output=capture_output, - check=check, - ) - - -class FileSystem: - """Filesystem wrapper for all mutating operations.""" - - def ensure_dir( - self, - path: Path, - *, - sudo: bool = False, - runner: Optional[CommandRunner] = None, - mode: Optional[int] = None, - ) -> None: - if sudo: - if runner is None: - raise FlowError("A command runner is required for sudo operations") - runner.require_binary("sudo") - argv = ["sudo", "mkdir", "-p"] - if mode is not None: - argv.extend(["-m", f"{mode:o}"]) - argv.append(str(path)) - runner.run(argv, check=True) - return - - path.mkdir(parents=True, exist_ok=True) - if mode is not None: - path.chmod(mode) - - def remove_file( - self, - path: Path, - *, - sudo: bool = False, - runner: Optional[CommandRunner] = None, - missing_ok: bool = True, - ) -> None: - if sudo: - if runner is None: - raise FlowError("A command runner is required for sudo operations") - runner.require_binary("sudo") - argv = ["sudo", "rm"] - if missing_ok: - argv.append("-f") - argv.append(str(path)) - runner.run(argv, check=True) - return - - try: - path.unlink() - except FileNotFoundError: - if not missing_ok: - raise - - def remove_tree(self, path: Path) -> None: - shutil.rmtree(path, ignore_errors=True) - - def copy_file( - self, - source: Path, - target: Path, - *, - sudo: bool = False, - runner: Optional[CommandRunner] = None, - ) -> None: - if sudo: - if runner is None: - raise FlowError("A command runner is required for sudo operations") - runner.require_binary("sudo") - self.ensure_dir(target.parent, sudo=True, runner=runner) - runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True) - return - - self.ensure_dir(target.parent) - shutil.copy2(source, target) - - def copy_tree(self, source: Path, target: Path) -> None: - self.ensure_dir(target.parent) - shutil.copytree(source, target, dirs_exist_ok=True) - - def create_symlink( - self, - source: Path, - target: Path, - *, - sudo: bool = False, - runner: Optional[CommandRunner] = None, - ) -> None: - if sudo: - if runner is None: - raise FlowError("A command runner is required for sudo operations") - runner.require_binary("sudo") - self.ensure_dir(target.parent, sudo=True, runner=runner) - runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True) - return - - self.ensure_dir(target.parent) - target.symlink_to(source) - - def read_text(self, path: Path, *, default: Optional[str] = None) -> str: - try: - return path.read_text(encoding="utf-8") - except FileNotFoundError: - if default is None: - raise - return default - - def write_text(self, path: Path, content: str) -> None: - self.ensure_dir(path.parent) - path.write_text(content, encoding="utf-8") - - def write_bytes(self, path: Path, content: bytes) -> None: - self.ensure_dir(path.parent) - path.write_bytes(content) - - def write_bytes(self, path: Path, content: bytes) -> None: - self.ensure_dir(path.parent) - path.write_bytes(content) - - def read_json(self, path: Path, *, default: Any = None) -> Any: - try: - with open(path, "r", encoding="utf-8") as handle: - return json.load(handle) - except FileNotFoundError: - return default - - def write_json(self, path: Path, data: Any) -> None: - self.ensure_dir(path.parent) - with open(path, "w", encoding="utf-8") as handle: - json.dump(data, handle, indent=2) - - def same_symlink(self, target: Path, source: Path) -> bool: - if not target.is_symlink(): - return False - return target.resolve(strict=False) == source.resolve(strict=False) - - def is_within(self, path: Path, parent: Path) -> bool: - try: - path.resolve(strict=False).relative_to(parent.resolve(strict=False)) - return True - except ValueError: - return False - - def path_in_home(self, path: Path, home: Optional[Path] = None) -> bool: - root = (home or Path.home()).resolve(strict=False) - try: - path.resolve(strict=False).relative_to(root) - return True - except ValueError: - return False - - -@dataclass -class JsonStateStore: - """JSON file-backed state store.""" - - path: Path - fs: FileSystem - default_factory: Any - - def load(self) -> Any: - data = self.fs.read_json(self.path, default=None) - if data is None: - return self.default_factory() - return data - - def save(self, data: Any) -> None: - self.fs.write_json(self.path, data) - - -@dataclass -class SystemRuntime: - """Shared runtime dependencies carried through the command context.""" - - runner: CommandRunner = field(default_factory=CommandRunner) - fs: FileSystem = field(default_factory=FileSystem) - git: GitClient = field(init=False) - - def __post_init__(self) -> None: - self.git = GitClient(self.runner) diff --git a/src/flow/core/variables.py b/src/flow/core/variables.py deleted file mode 100644 index 3285164..0000000 --- a/src/flow/core/variables.py +++ /dev/null @@ -1,61 +0,0 @@ -"""Variable substitution for shell-style and template expressions.""" - -import os -import re -from pathlib import Path -from typing import Any, Dict - - -def substitute(text: str, variables: Dict[str, str]) -> str: - """Replace $VAR and ${VAR} with values from variables dict or env.""" - if not isinstance(text, str): - return text - - pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}") - - def _replace(match: re.Match[str]) -> str: - key = match.group(1) or match.group(2) or "" - if key in variables: - return str(variables[key]) - if key == "HOME": - return str(Path.home()) - if key in os.environ: - return os.environ[key] - return match.group(0) - - return pattern.sub(_replace, text) - - -def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any: - if expr.startswith("env."): - env_key = expr.split(".", 1)[1] - env_ctx = context.get("env", {}) - if isinstance(env_ctx, dict) and env_key in env_ctx: - return env_ctx[env_key] - return os.environ.get(env_key) - - if expr in context: - return context[expr] - - current: Any = context - for part in expr.split("."): - if not isinstance(current, dict) or part not in current: - return None - current = current[part] - - return current - - -def substitute_template(text: str, context: Dict[str, Any]) -> str: - """Replace {{expr}} placeholders with values from context dict.""" - if not isinstance(text, str): - return text - - def _replace(match: re.Match[str]) -> str: - key = match.group(1).strip() - value = _resolve_template_value(key, context) - if value is None: - return match.group(0) - return str(value) - - return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text) diff --git a/src/flow/services/package_defs.py b/src/flow/services/package_defs.py deleted file mode 100644 index 6f62704..0000000 --- a/src/flow/services/package_defs.py +++ /dev/null @@ -1,350 +0,0 @@ -"""Shared package-manifest normalization and binary install helpers.""" - -from __future__ import annotations - -import os -import shutil -import tempfile -import urllib.request -from pathlib import Path -from typing import Any, Dict, List, Optional - -from flow.core.config import FlowContext -from flow.core.errors import FlowError -from flow.core.variables import substitute_template - -PACKAGE_TYPES = {"pkg", "binary", "cask"} - - -def linux_detect_package_manager() -> Optional[str]: - if shutil.which("apt") or shutil.which("apt-get"): - return "apt" - if shutil.which("dnf"): - return "dnf" - return None - - -def resolve_package_manager(ctx: FlowContext, profile_cfg: dict) -> str: - explicit = profile_cfg.get("package-manager") - if isinstance(explicit, str) and explicit: - return explicit - - profile_os = profile_cfg.get("os") - if profile_os == "macos": - return "brew" - if profile_os == "linux": - detected = linux_detect_package_manager() - if detected: - return detected - raise FlowError("Unable to auto-detect package manager (expected apt or dnf)") - raise FlowError("Profile 'os' must be set to 'linux' or 'macos'") - - -def get_package_catalog(ctx: FlowContext) -> Dict[str, Dict[str, Any]]: - raw = ctx.manifest.get("packages", []) - catalog: Dict[str, Dict[str, Any]] = {} - - if isinstance(raw, dict): - for name, definition in raw.items(): - if not isinstance(definition, dict): - continue - package = dict(definition) - package["name"] = str(package.get("name") or name) - package.setdefault("type", "pkg") - catalog[package["name"]] = package - return catalog - - if not isinstance(raw, list): - return catalog - - for item in raw: - if not isinstance(item, dict): - continue - name = item.get("name") - if not isinstance(name, str) or not name: - continue - package = dict(item) - package.setdefault("type", "pkg") - catalog[name] = package - - return catalog - - -def normalize_profile_package_entry(entry: Any) -> Dict[str, Any]: - if isinstance(entry, str): - if "/" in entry: - prefix, name = entry.split("/", 1) - if prefix in PACKAGE_TYPES and name: - return {"name": name, "type": prefix} - return {"name": entry} - - if isinstance(entry, dict): - name = entry.get("name") - if not isinstance(name, str) or not name: - raise FlowError("Package object entries must include a non-empty 'name'") - return dict(entry) - - raise FlowError(f"Unsupported package entry: {entry!r}") - - -def resolve_package_spec( - catalog: Dict[str, Dict[str, Any]], - profile_entry: Dict[str, Any], -) -> Dict[str, Any]: - name = profile_entry["name"] - merged = dict(catalog.get(name, {})) - merged.update(profile_entry) - merged["name"] = name - - pkg_type = merged.get("type") or "pkg" - if pkg_type not in PACKAGE_TYPES: - raise FlowError(f"Unsupported package type '{pkg_type}' for package '{name}'") - merged["type"] = pkg_type - return merged - - -def resolve_pkg_source_name(spec: Dict[str, Any], package_manager: str) -> str: - sources = spec.get("sources", {}) - if not isinstance(sources, dict): - return spec["name"] - - keys = [package_manager] - if package_manager == "apt": - keys.append("apt-get") - if package_manager == "apt-get": - keys.append("apt") - - for key in keys: - value = sources.get(key) - if isinstance(value, str) and value: - return value - return spec["name"] - - -def platform_lookup_keys(ctx: FlowContext) -> List[str]: - keys = [ctx.platform.platform] - if ctx.platform.os == "macos": - keys.append(f"darwin-{ctx.platform.arch}") - if ctx.platform.arch == "x64": - keys.append(f"{ctx.platform.os}-amd64") - if ctx.platform.os == "macos": - keys.append("darwin-amd64") - ordered: list[str] = [] - for key in keys: - if key not in ordered: - ordered.append(key) - return ordered - - -def profile_template_context( - ctx: FlowContext, - extra_env: Dict[str, str], - extra: Optional[Dict[str, Any]] = None, -) -> Dict[str, Any]: - env_map = dict(os.environ) - env_map.update(extra_env) - template_ctx: Dict[str, Any] = { - "env": env_map, - "os": ctx.platform.os, - "arch": ctx.platform.arch, - } - if extra: - template_ctx.update(extra) - return template_ctx - - -def render_template_value(value: Any, template_ctx: Dict[str, Any]) -> Any: - if isinstance(value, str): - return substitute_template(value, template_ctx) - if isinstance(value, list): - return [render_template_value(item, template_ctx) for item in value] - if isinstance(value, dict): - return {key: render_template_value(item, template_ctx) for key, item in value.items()} - return value - - -def resolve_binary_platform_vars(ctx: FlowContext, spec: Dict[str, Any]) -> Dict[str, str]: - platform_vars = { - "os": ctx.platform.os, - "arch": ctx.platform.arch, - } - platform_map = spec.get("platform-map", {}) - if isinstance(platform_map, dict): - for key in platform_lookup_keys(ctx): - mapping = platform_map.get(key) - if isinstance(mapping, dict): - for map_key, map_value in mapping.items(): - if isinstance(map_value, str): - platform_vars[map_key] = map_value - break - return platform_vars - - -def resolve_binary_asset(ctx: FlowContext, spec: Dict[str, Any], template_ctx: Dict[str, Any]) -> str: - assets = spec.get("assets", {}) - if isinstance(assets, dict) and assets: - for key in platform_lookup_keys(ctx): - value = assets.get(key) - if isinstance(value, str) and value: - return substitute_template(value, template_ctx) - raise FlowError( - f"No binary asset mapping for platform {ctx.platform.platform} in package '{spec['name']}'" - ) - - pattern = spec.get("asset-pattern") - if not isinstance(pattern, str) or not pattern: - raise FlowError( - f"Binary package '{spec['name']}' must define either 'assets' or 'asset-pattern'" - ) - return substitute_template(pattern, template_ctx) - - -def resolve_binary_download_url( - spec: Dict[str, Any], - asset_name: str, - template_ctx: Dict[str, Any], -) -> str: - source = spec.get("source") - if not isinstance(source, str) or not source: - raise FlowError(f"Binary package '{spec['name']}' is missing 'source'") - - version = str(spec.get("version", "")) - if source.startswith("github:"): - owner_repo = source[len("github:") :] - if not owner_repo: - raise FlowError(f"Invalid github source in package '{spec['name']}'") - if not version: - raise FlowError(f"Binary package '{spec['name']}' requires 'version'") - return f"https://github.com/{owner_repo}/releases/download/v{version}/{asset_name}" - - rendered_source = substitute_template(source, template_ctx) - if not asset_name or rendered_source.endswith(asset_name): - return rendered_source - if rendered_source.endswith("/"): - return rendered_source + asset_name - return f"{rendered_source}/{asset_name}" - - -def strip_prefix(path: Path, prefix: Path) -> Path: - try: - return path.relative_to(prefix) - except ValueError: - return path - - -def validate_declared_install_path(package_name: str, declared_path: Path) -> None: - if declared_path.is_absolute(): - raise FlowError(f"Install path for '{package_name}' must be relative: {declared_path}") - if any(part == ".." for part in declared_path.parts): - raise FlowError( - f"Install path for '{package_name}' must not include parent traversal: {declared_path}" - ) - - -def install_destination(kind: str) -> Path: - home = Path.home() - if kind == "bin": - return home / ".local" / "bin" - if kind == "share": - return home / ".local" / "share" - if kind == "man": - return home / ".local" / "share" / "man" - if kind == "lib": - return home / ".local" / "lib" - raise FlowError(f"Unsupported install section: {kind}") - - -def install_strip_prefix(kind: str) -> Path: - if kind == "bin": - return Path("bin") - if kind == "share": - return Path("share") - if kind == "man": - return Path("share") / "man" - if kind == "lib": - return Path("lib") - return Path(".") - - -class BinaryInstaller: - def __init__(self, ctx: FlowContext): - self.ctx = ctx - self.fs = ctx.runtime.fs - - def copy_install_item(self, kind: str, src: Path, declared_path: Path) -> None: - destination_root = install_destination(kind) - stripped = strip_prefix(declared_path, install_strip_prefix(kind)) - destination = destination_root / stripped - - if src.is_dir(): - self.fs.copy_tree(src, destination) - else: - self.fs.copy_file(src, destination) - if kind == "bin": - destination.chmod(destination.stat().st_mode | 0o111) - - def install(self, spec: Dict[str, Any], extra_env: Dict[str, str], *, dry_run: bool) -> None: - version = str(spec.get("version", "")) - platform_vars = resolve_binary_platform_vars(self.ctx, spec) - template_ctx = profile_template_context( - self.ctx, - extra_env, - {"name": spec["name"], "version": version, **platform_vars}, - ) - - asset_name = resolve_binary_asset(self.ctx, spec, template_ctx) - template_ctx["asset"] = asset_name - download_url = resolve_binary_download_url(spec, asset_name, template_ctx) - template_ctx["downloadUrl"] = download_url - - if dry_run: - self.ctx.console.info(f"[{spec['name']}] Would download: {download_url}") - return - - install_map = spec.get("install", {}) - if not isinstance(install_map, dict) or not install_map: - raise FlowError(f"Binary package '{spec['name']}' must define non-empty 'install'") - - with tempfile.TemporaryDirectory(prefix=f"flow-{spec['name']}-") as tmp: - tmp_dir = Path(tmp) - archive_path = tmp_dir / asset_name - extracted = tmp_dir / "extract" - - self.ctx.console.info(f"Downloading {spec['name']} from {download_url}") - with urllib.request.urlopen(download_url, timeout=60) as response: - self.fs.write_bytes(archive_path, response.read()) - - self.fs.ensure_dir(extracted) - try: - shutil.unpack_archive(str(archive_path), str(extracted)) - except (shutil.ReadError, ValueError) as exc: - raise FlowError(f"Could not extract archive for '{spec['name']}': {exc}") from exc - - extract_dir_value = substitute_template(str(spec.get("extract-dir", ".")), template_ctx) - source_root = extracted if extract_dir_value == "." else extracted / extract_dir_value - if not source_root.exists(): - raise FlowError( - f"extract-dir '{extract_dir_value}' not found for package '{spec['name']}'" - ) - source_root_resolved = source_root.resolve(strict=False) - - for kind in ("bin", "share", "man", "lib"): - items = install_map.get(kind, []) - if not isinstance(items, list): - continue - for raw_item in items: - if not isinstance(raw_item, str): - continue - rendered = substitute_template(raw_item, template_ctx) - declared_path = Path(rendered) - validate_declared_install_path(spec["name"], declared_path) - source = (source_root / declared_path).resolve(strict=False) - if not str(source).startswith(str(source_root_resolved)): - raise FlowError( - f"Install path escapes extract-dir for '{spec['name']}': {declared_path}" - ) - if not source.exists(): - raise FlowError( - f"Install path not found for '{spec['name']}': {declared_path}" - ) - self.copy_install_item(kind, source, declared_path) diff --git a/src/flow/services/ssh.py b/src/flow/services/ssh.py deleted file mode 100644 index 16d895e..0000000 --- a/src/flow/services/ssh.py +++ /dev/null @@ -1,184 +0,0 @@ -"""SSH target parsing and connection behavior for `flow enter`.""" - -from __future__ import annotations - -import getpass -import os -from typing import Optional - -from flow.core.config import FlowContext -from flow.core.errors import FlowError - -# Default host templates per platform -HOST_TEMPLATES = { - "orb": ".orb", - "utm": ".utm.local", - "core": ".core.lan", -} - - -def parse_target(target: str) -> tuple[Optional[str], Optional[str], Optional[str]]: - """Parse [user@]namespace@platform into (user, namespace, platform).""" - user = None - namespace = None - platform = None - - if "@" in target: - platform = target.rsplit("@", 1)[1] - rest = target.rsplit("@", 1)[0] - else: - rest = target - - if "@" in rest: - user = rest.rsplit("@", 1)[0] - namespace = rest.rsplit("@", 1)[1] - else: - namespace = rest - - return user, namespace, platform - - -def build_destination(user: str, host: str, preserve_host_user: bool = False) -> str: - if "@" in host: - host_user, host_name = host.rsplit("@", 1) - effective_user = host_user if preserve_host_user else (user or host_user) - return f"{effective_user}@{host_name}" - if not user: - return host - return f"{user}@{host}" - - -def terminfo_fix_command(term: Optional[str], destination: str) -> Optional[str]: - normalized_term = (term or "").strip().lower() - - if normalized_term == "xterm-ghostty": - return f"infocmp -x xterm-ghostty | ssh {destination} -- tic -x -" - - if normalized_term == "wezterm": - return ( - f"ssh {destination} -- sh -lc " - "'tempfile=$(mktemp) && curl -fsSL -o \"$tempfile\" " - "https://raw.githubusercontent.com/wezterm/wezterm/main/termwiz/data/wezterm.terminfo " - "&& tic -x -o ~/.terminfo \"$tempfile\" && rm \"$tempfile\"'" - ) - - return None - - -def handle_terminfo_warning( - ctx: FlowContext, - term: Optional[str], - destination: str, - dry_run: bool, -) -> bool: - install_cmd = terminfo_fix_command(term, destination) - if not install_cmd: - return True - - ctx.console.warn( - f"Detected TERM={term}. Remote host may be missing this terminfo entry." - ) - ctx.console.info("flow will not install or modify terminfo on the target automatically.") - ctx.console.info("If needed, run this command manually before reconnecting:") - print(f" {install_cmd}") - - if dry_run or not os.isatty(0): - return True - - response = "" - try: - response = input("Continue with SSH connection? [Y/n] ").strip().lower() - except EOFError: - return True - - if response in {"n", "no"}: - ctx.console.warn("Cancelled before opening SSH session") - return False - - return True - - -class EnterService: - """Resolve enter targets and execute the SSH handoff.""" - - def __init__(self, ctx: FlowContext): - self.ctx = ctx - - def run(self, args) -> None: - if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"): - ns = os.environ["DF_NAMESPACE"] - plat = os.environ["DF_PLATFORM"] - raise FlowError( - f"Not recommended inside an instance. Currently in: {ns}@{plat}" - ) - - user, namespace, platform = parse_target(args.target) - - if args.user: - user = args.user - if args.namespace: - namespace = args.namespace - if args.platform: - platform = args.platform - - user_was_explicit = bool(user) - - if not user: - user = os.environ.get("USER") or getpass.getuser() - if not namespace: - raise FlowError("Namespace is required in target") - if not platform: - raise FlowError("Platform is required in target") - - host_template = HOST_TEMPLATES.get(platform) - ssh_identity = None - - for target in self.ctx.config.targets: - if target.namespace == namespace and target.platform == platform: - host_template = target.ssh_host - ssh_identity = target.ssh_identity - break - - if not host_template: - raise FlowError(f"Unknown platform: {platform}") - - ssh_host = host_template.replace("", namespace) - destination = build_destination( - user, - ssh_host, - preserve_host_user=not user_was_explicit, - ) - - if not handle_terminfo_warning( - self.ctx, - os.environ.get("TERM"), - destination, - dry_run=args.dry_run, - ): - raise FlowError("Cancelled before opening SSH session") - - ssh_cmd = ["ssh", "-tt"] - if ssh_identity: - ssh_cmd.extend(["-i", os.path.expanduser(ssh_identity)]) - ssh_cmd.append(destination) - - if not args.no_tmux: - ssh_cmd.extend( - [ - "tmux", - "new-session", - "-As", - args.session, - "-e", - f"DF_NAMESPACE={namespace}", - "-e", - f"DF_PLATFORM={platform}", - ] - ) - - if args.dry_run: - self.ctx.console.info("Dry run command:") - print(" " + " ".join(ssh_cmd)) - return - - os.execvp("ssh", ssh_cmd) diff --git a/tests/test_action.py b/tests/test_action.py deleted file mode 100644 index 0919380..0000000 --- a/tests/test_action.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Tests for flow.core.action.""" - -from flow.core.action import Action, ActionExecutor -from flow.core.console import ConsoleLogger - - -def test_action_defaults(): - a = Action(type="test", description="Test action") - assert a.status == "pending" - assert a.error is None - assert a.skip_on_error is True - assert a.os_filter is None - assert a.data == {} - - -def test_executor_register_and_execute(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - results = [] - - def handler(data): - results.append(data["key"]) - - executor.register("test-action", handler) - - actions = [ - Action(type="test-action", description="Do thing", data={"key": "value1"}), - Action(type="test-action", description="Do another", data={"key": "value2"}), - ] - - executor.execute(actions, current_os="linux") - assert results == ["value1", "value2"] - assert actions[0].status == "completed" - assert actions[1].status == "completed" - - -def test_executor_dry_run(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - executed = [] - - executor.register("test", lambda data: executed.append(1)) - - actions = [Action(type="test", description="Should not run")] - executor.execute(actions, dry_run=True) - assert executed == [] # Nothing executed - out = capsys.readouterr().out - assert "EXECUTION PLAN" in out - - -def test_executor_skip_on_error(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - - def failing_handler(data): - raise RuntimeError("boom") - - executor.register("fail", failing_handler) - - actions = [ - Action(type="fail", description="Will fail", skip_on_error=True), - Action(type="fail", description="Should still run", skip_on_error=True), - ] - - executor.execute(actions, current_os="linux") - assert actions[0].status == "skipped" - assert actions[1].status == "skipped" - - -def test_executor_critical_failure_stops(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - - def failing_handler(data): - raise RuntimeError("critical failure") - - executor.register("fail", failing_handler) - executor.register("ok", lambda data: None) - - actions = [ - Action(type="fail", description="Critical", skip_on_error=False), - Action(type="ok", description="Should not run"), - ] - - executor.execute(actions, current_os="linux") - assert actions[0].status == "failed" - assert actions[1].status == "pending" # Never reached - - -def test_executor_os_filter(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - executed = [] - - executor.register("test", lambda data: executed.append(data.get("name"))) - - actions = [ - Action(type="test", description="Linux only", data={"name": "linux"}, os_filter="linux"), - Action(type="test", description="macOS only", data={"name": "macos"}, os_filter="macos"), - Action(type="test", description="Any OS", data={"name": "any"}), - ] - - executor.execute(actions, current_os="linux") - assert "linux" in executed - assert "any" in executed - assert "macos" not in executed - - -def test_executor_no_handler(capsys): - console = ConsoleLogger() - executor = ActionExecutor(console) - - actions = [Action(type="unknown", description="No handler registered")] - executor.execute(actions, current_os="linux") - assert actions[0].status == "skipped" diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py deleted file mode 100644 index 0868705..0000000 --- a/tests/test_bootstrap.py +++ /dev/null @@ -1,215 +0,0 @@ -"""Tests for flow.commands.bootstrap helpers and schema behavior.""" - -import os -from pathlib import Path - -import pytest - -from flow.commands.bootstrap import ( - _ensure_required_variables, - _get_profiles, - _install_binary_package, - _normalize_profile_package_entry, - _resolve_package_manager, - _resolve_package_spec, - _resolve_pkg_source_name, -) -from flow.core.config import AppConfig, FlowContext -from flow.core.console import ConsoleLogger -from flow.core.platform import PlatformInfo - - -@pytest.fixture -def ctx(): - return FlowContext( - config=AppConfig(), - manifest={ - "packages": [ - { - "name": "fd", - "type": "pkg", - "sources": {"apt": "fd-find", "dnf": "fd-find", "brew": "fd"}, - }, - { - "name": "neovim", - "type": "binary", - "source": "github:neovim/neovim", - "version": "0.10.4", - "asset-pattern": "nvim-{{os}}-{{arch}}.tar.gz", - "platform-map": {"linux-x64": {"os": "linux", "arch": "x64"}}, - "install": {"bin": ["bin/nvim"]}, - }, - ] - }, - platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"), - console=ConsoleLogger(), - ) - - -def test_get_profiles_from_manifest(ctx): - ctx.manifest = {"profiles": {"linux": {"os": "linux"}}} - assert "linux" in _get_profiles(ctx) - - -def test_get_profiles_rejects_environments(ctx): - ctx.manifest = {"environments": {"legacy": {"os": "linux"}}} - with pytest.raises(RuntimeError, match="no longer supported"): - _get_profiles(ctx) - - -def test_resolve_package_manager_explicit_value(ctx): - assert _resolve_package_manager(ctx, {"os": "linux", "package-manager": "dnf"}) == "dnf" - - -def test_resolve_package_manager_linux_auto_apt(monkeypatch, ctx): - monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/apt" if name == "apt" else None) - assert _resolve_package_manager(ctx, {"os": "linux"}) == "apt" - - -def test_resolve_package_manager_linux_auto_dnf(monkeypatch, ctx): - monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/dnf" if name == "dnf" else None) - assert _resolve_package_manager(ctx, {"os": "linux"}) == "dnf" - - -def test_resolve_package_manager_requires_os(ctx): - with pytest.raises(RuntimeError, match="must be set"): - _resolve_package_manager(ctx, {}) - - -def test_normalize_package_entry_string(): - assert _normalize_profile_package_entry("git") == {"name": "git"} - - -def test_normalize_package_entry_type_prefix(): - assert _normalize_profile_package_entry("cask/wezterm") == {"name": "wezterm", "type": "cask"} - - -def test_normalize_package_entry_object(): - out = _normalize_profile_package_entry({"name": "docker", "allow_sudo": True}) - assert out["name"] == "docker" - assert out["allow_sudo"] is True - - -def test_resolve_package_spec_uses_catalog_type(ctx): - catalog = { - "fd": { - "name": "fd", - "type": "pkg", - "sources": {"apt": "fd-find"}, - } - } - resolved = _resolve_package_spec(catalog, {"name": "fd"}) - assert resolved["type"] == "pkg" - assert resolved["sources"]["apt"] == "fd-find" - - -def test_resolve_package_spec_defaults_to_pkg(ctx): - resolved = _resolve_package_spec({}, {"name": "git"}) - assert resolved["type"] == "pkg" - - -def test_resolve_package_spec_profile_override(ctx): - catalog = { - "neovim": { - "name": "neovim", - "type": "binary", - "version": "0.10.4", - } - } - resolved = _resolve_package_spec(catalog, {"name": "neovim", "post-install": "echo ok"}) - assert resolved["type"] == "binary" - assert resolved["post-install"] == "echo ok" - - -def test_resolve_pkg_source_name_with_mapping(ctx): - spec = {"name": "fd", "sources": {"apt": "fd-find", "dnf": "fd-find", "brew": "fd"}} - assert _resolve_pkg_source_name(spec, "apt") == "fd-find" - assert _resolve_pkg_source_name(spec, "dnf") == "fd-find" - assert _resolve_pkg_source_name(spec, "brew") == "fd" - - -def test_resolve_pkg_source_name_fallback_to_name(ctx): - spec = {"name": "ripgrep", "sources": {"apt": "ripgrep"}} - assert _resolve_pkg_source_name(spec, "dnf") == "ripgrep" - - -def test_ensure_required_variables_missing_raises(): - with pytest.raises(RuntimeError, match="Missing required environment variables"): - _ensure_required_variables({"requires": ["USER_EMAIL", "TARGET_HOSTNAME"]}, {"USER_EMAIL": "a@b"}) - - -def test_ensure_required_variables_accepts_vars(monkeypatch): - env = dict(os.environ) - env["USER_EMAIL"] = "a@b" - env["TARGET_HOSTNAME"] = "devbox" - _ensure_required_variables({"requires": ["USER_EMAIL", "TARGET_HOSTNAME"]}, env) - - -class _FakeResponse: - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - def read(self): - return b"archive" - - -def _patch_binary_download(monkeypatch, after_unpack=None): - monkeypatch.setattr( - "flow.services.bootstrap.urllib.request.urlopen", - lambda *args, **kwargs: _FakeResponse(), - ) - - def _fake_unpack(_archive, extract_dir): - extracted = Path(extract_dir) - extracted.mkdir(parents=True, exist_ok=True) - if after_unpack: - after_unpack(extracted) - - monkeypatch.setattr("flow.services.bootstrap.shutil.unpack_archive", _fake_unpack) - - -def test_install_binary_package_rejects_absolute_declared_path(monkeypatch, tmp_path, ctx): - absolute_item = tmp_path / "outside-bin" - absolute_item.write_text("binary") - - _patch_binary_download(monkeypatch) - monkeypatch.setattr( - "flow.services.bootstrap._copy_install_item", - lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"), - ) - - spec = { - "name": "demo", - "type": "binary", - "source": "https://example.invalid/demo", - "asset-pattern": "demo.tar.gz", - "install": {"bin": [str(absolute_item)]}, - } - - with pytest.raises(RuntimeError, match="must be relative"): - _install_binary_package(ctx, spec, {}, dry_run=False) - - -def test_install_binary_package_rejects_parent_traversal_declared_path(monkeypatch, ctx): - def _after_unpack(extracted): - (extracted.parent / "escape-bin").write_text("binary") - - _patch_binary_download(monkeypatch, after_unpack=_after_unpack) - monkeypatch.setattr( - "flow.services.bootstrap._copy_install_item", - lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"), - ) - - spec = { - "name": "demo", - "type": "binary", - "source": "https://example.invalid/demo", - "asset-pattern": "demo.tar.gz", - "install": {"bin": ["../escape-bin"]}, - } - - with pytest.raises(RuntimeError, match="parent traversal"): - _install_binary_package(ctx, spec, {}, dry_run=False) diff --git a/tests/test_commands.py b/tests/test_commands.py deleted file mode 100644 index ddc88d4..0000000 --- a/tests/test_commands.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Tests for command modules — registration and target parsing.""" - -from flow.commands.enter import _parse_target, _terminfo_fix_command -from flow.commands.container import _cname, _parse_image_ref - - -class TestParseTarget: - def test_full_target(self): - user, ns, plat = _parse_target("root@personal@orb") - assert user == "root" - assert ns == "personal" - assert plat == "orb" - - def test_no_user(self): - user, ns, plat = _parse_target("personal@orb") - assert user is None - assert ns == "personal" - assert plat == "orb" - - def test_namespace_only(self): - user, ns, plat = _parse_target("personal") - assert user is None - assert ns == "personal" - assert plat is None - - -class TestTerminfoFixCommand: - def test_ghostty_command(self): - cmd = _terminfo_fix_command("xterm-ghostty", "devbox.core.lan") - assert cmd == "infocmp -x xterm-ghostty | ssh devbox.core.lan -- tic -x -" - - def test_wezterm_command(self): - cmd = _terminfo_fix_command("wezterm", "user@devbox.core.lan") - assert cmd is not None - assert "wezterm.terminfo" in cmd - assert "ssh user@devbox.core.lan" in cmd - - def test_unknown_term(self): - assert _terminfo_fix_command("xterm-256color", "devbox.core.lan") is None - - -class TestCname: - def test_adds_prefix(self): - assert _cname("api") == "dev-api" - - def test_no_double_prefix(self): - assert _cname("dev-api") == "dev-api" - - -class TestParseImageRef: - def test_simple_image(self): - ref, repo, tag, label = _parse_image_ref("node") - assert ref == "registry.tomastm.com/node:latest" - assert tag == "latest" - - def test_tm0_shorthand(self): - ref, repo, tag, label = _parse_image_ref("tm0/node") - assert "registry.tomastm.com" in ref - assert "node" in ref - - def test_docker_shorthand(self): - ref, repo, tag, label = _parse_image_ref("docker/python") - assert "docker.io" in ref - assert "python" in ref - - def test_with_tag(self): - ref, repo, tag, label = _parse_image_ref("node:20") - assert tag == "20" - assert ":20" in ref - - def test_full_registry(self): - ref, repo, tag, label = _parse_image_ref("ghcr.io/user/image:v1") - assert ref == "ghcr.io/user/image:v1" - assert tag == "v1" diff --git a/tests/test_config.py b/tests/test_config.py deleted file mode 100644 index 168242a..0000000 --- a/tests/test_config.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Tests for flow.core.config.""" - -import pytest - -from flow.core.config import AppConfig, load_config, load_manifest - - -def test_load_config_missing_path(tmp_path): - cfg = load_config(tmp_path / "nonexistent") - assert isinstance(cfg, AppConfig) - assert cfg.dotfiles_url == "" - assert cfg.container_registry == "registry.tomastm.com" - assert cfg.dotfiles_pull_before_edit is True - - -def test_load_config_merged_yaml(tmp_path): - (tmp_path / "10-config.yaml").write_text( - "repository:\n" - " dotfiles-url: git@github.com:user/dots.git\n" - " dotfiles-branch: dev\n" - " pull-before-edit: false\n" - "paths:\n" - " projects-dir: ~/code\n" - "defaults:\n" - " container-registry: my.registry.com\n" - " container-tag: v1\n" - " tmux-session: main\n" - "targets:\n" - " personal: orb personal@orb\n" - " work@ec2: work.ec2.internal ~/.ssh/id_work\n" - ) - - cfg = load_config(tmp_path) - assert cfg.dotfiles_url == "git@github.com:user/dots.git" - assert cfg.dotfiles_branch == "dev" - assert cfg.dotfiles_pull_before_edit is False - assert cfg.projects_dir == "~/code" - assert cfg.container_registry == "my.registry.com" - assert cfg.container_tag == "v1" - assert cfg.tmux_session == "main" - assert len(cfg.targets) == 2 - assert cfg.targets[0].namespace == "personal" - assert cfg.targets[1].ssh_identity == "~/.ssh/id_work" - - -def test_load_config_pull_before_edit_string_true(tmp_path): - (tmp_path / "10-config.yaml").write_text( - "repository:\n" - " pull-before-edit: yes\n" - ) - - cfg = load_config(tmp_path) - assert cfg.dotfiles_pull_before_edit is True - - -def test_load_manifest_missing_path(tmp_path): - result = load_manifest(tmp_path / "nonexistent") - assert result == {} - - -def test_load_manifest_valid_directory(tmp_path): - (tmp_path / "manifest.yaml").write_text( - "profiles:\n" - " linux-vm:\n" - " os: linux\n" - " hostname: devbox\n" - ) - result = load_manifest(tmp_path) - assert result["profiles"]["linux-vm"]["os"] == "linux" - - -def test_load_manifest_non_dict_raises(tmp_path): - bad = tmp_path / "bad.yaml" - bad.write_text("- a\n- b\n") - - with pytest.raises(RuntimeError, match="must contain a mapping"): - load_manifest(bad) diff --git a/tests/test_console.py b/tests/test_console.py deleted file mode 100644 index fef2eb4..0000000 --- a/tests/test_console.py +++ /dev/null @@ -1,95 +0,0 @@ -"""Tests for flow.core.console.""" - -from flow.core.console import ConsoleLogger - - -def test_console_info(capsys): - c = ConsoleLogger() - c.info("hello") - out = capsys.readouterr().out - assert "[INFO]" in out - assert "hello" in out - - -def test_console_warn(capsys): - c = ConsoleLogger() - c.warn("caution") - out = capsys.readouterr().out - assert "[WARN]" in out - assert "caution" in out - - -def test_console_error(capsys): - c = ConsoleLogger() - c.error("bad thing") - out = capsys.readouterr().out - assert "[ERROR]" in out - assert "bad thing" in out - - -def test_console_success(capsys): - c = ConsoleLogger() - c.success("done") - out = capsys.readouterr().out - assert "[SUCCESS]" in out - assert "done" in out - - -def test_console_step_lifecycle(capsys): - c = ConsoleLogger() - c.step_start(1, 3, "Test step") - c.step_command("echo hi") - c.step_output("hi") - c.step_complete("Done") - out = capsys.readouterr().out - assert "Step 1/3" in out - assert "$ echo hi" in out - assert "Done" in out - - -def test_console_step_skip(capsys): - c = ConsoleLogger() - c.start_time = 0 - c.step_skip("not needed") - out = capsys.readouterr().out - assert "Skipped" in out - - -def test_console_step_fail(capsys): - c = ConsoleLogger() - c.start_time = 0 - c.step_fail("exploded") - out = capsys.readouterr().out - assert "Failed" in out - - -def test_console_table(capsys): - c = ConsoleLogger() - c.table(["NAME", "VALUE"], [["foo", "bar"], ["baz", "qux"]]) - out = capsys.readouterr().out - assert "NAME" in out - assert "foo" in out - assert "baz" in out - - -def test_console_table_empty(capsys): - c = ConsoleLogger() - c.table(["NAME"], []) - out = capsys.readouterr().out - assert out == "" - - -def test_console_section_header(capsys): - c = ConsoleLogger() - c.section_header("Test", "sub") - out = capsys.readouterr().out - assert "TEST" in out - assert "sub" in out - - -def test_console_plan_header(capsys): - c = ConsoleLogger() - c.plan_header("My Plan", 5) - out = capsys.readouterr().out - assert "MY PLAN" in out - assert "5 actions" in out diff --git a/tests/test_dotfiles.py b/tests/test_dotfiles.py deleted file mode 100644 index 8c51f0c..0000000 --- a/tests/test_dotfiles.py +++ /dev/null @@ -1,104 +0,0 @@ -"""Tests for flow.services.dotfiles discovery and path resolution.""" - -import pytest - -from flow.services.dotfiles import _collect_home_specs, _discover_packages, _resolve_edit_target, _walk_package -from flow.core.config import AppConfig, FlowContext -from flow.core.console import ConsoleLogger -from flow.core.platform import PlatformInfo - - -def _make_tree(tmp_path): - flow_root = tmp_path - shared = flow_root / "_shared" - (shared / "zsh").mkdir(parents=True) - (shared / "zsh" / ".zshrc").write_text("# zsh") - (shared / "tmux").mkdir(parents=True) - (shared / "tmux" / ".tmux.conf").write_text("# tmux") - - profile = flow_root / "work" - (profile / "git").mkdir(parents=True) - (profile / "git" / ".gitconfig").write_text("[user]\nname = Work") - - return tmp_path - - -def _ctx() -> FlowContext: - return FlowContext( - config=AppConfig(), - manifest={"profiles": {"work": {"os": "linux", "configs": {"skip": []}}}}, - platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"), - console=ConsoleLogger(), - ) - - -def test_discover_packages_shared_only(tmp_path): - tree = _make_tree(tmp_path) - packages = _discover_packages(tree) - assert "zsh" in packages - assert "tmux" in packages - assert "git" not in packages - - -def test_discover_packages_with_profile(tmp_path): - tree = _make_tree(tmp_path) - packages = _discover_packages(tree, profile="work") - assert "zsh" in packages - assert "tmux" in packages - assert "git" in packages - - -def test_discover_packages_profile_overrides_shared(tmp_path): - tree = _make_tree(tmp_path) - profile_zsh = tree / "work" / "zsh" - profile_zsh.mkdir(parents=True) - (profile_zsh / ".zshrc").write_text("# work zsh") - - with pytest.raises(RuntimeError, match="Conflicting dotfile targets"): - _collect_home_specs(_ctx(), tree, tmp_path / "home", "work", set(), None) - - -def test_walk_package_returns_relative_paths(tmp_path): - tree = _make_tree(tmp_path) - source = tree / "_shared" / "zsh" - - pairs = list(_walk_package(source)) - assert len(pairs) == 1 - src, rel = pairs[0] - assert src.name == ".zshrc" - assert str(rel) == ".zshrc" - - -def test_resolve_edit_target_package(tmp_path): - tree = _make_tree(tmp_path) - target = _resolve_edit_target("zsh", dotfiles_dir=tree) - assert target == tree / "_shared" / "zsh" - - -def test_resolve_edit_target_repo_path(tmp_path): - tree = _make_tree(tmp_path) - target = _resolve_edit_target("_shared/zsh/.zshrc", dotfiles_dir=tree) - assert target == tree / "_shared" / "zsh" / ".zshrc" - - -def test_resolve_edit_target_rejects_parent_traversal(tmp_path): - tree = _make_tree(tmp_path / "repo") - outside = tmp_path / "outside.txt" - outside.write_text("secret") - - target = _resolve_edit_target("../outside.txt", dotfiles_dir=tree) - assert target is None - - -def test_resolve_edit_target_rejects_nested_repo_escape(tmp_path): - tree = _make_tree(tmp_path / "repo") - outside = tmp_path / "escape.txt" - outside.write_text("secret") - - target = _resolve_edit_target("_shared/../../escape.txt", dotfiles_dir=tree) - assert target is None - - -def test_resolve_edit_target_missing_returns_none(tmp_path): - tree = _make_tree(tmp_path) - assert _resolve_edit_target("does-not-exist", dotfiles_dir=tree) is None diff --git a/tests/test_dotfiles_e2e_container.py b/tests/test_dotfiles_e2e_container.py deleted file mode 100644 index c2bff38..0000000 --- a/tests/test_dotfiles_e2e_container.py +++ /dev/null @@ -1,296 +0,0 @@ -"""Containerized e2e tests for dotfiles link safety. - -These tests are opt-in and run only when FLOW_RUN_E2E_CONTAINER=1. -""" - -import os -import shutil -import subprocess -import uuid -from pathlib import Path - -import pytest - - -REPO_ROOT = Path(__file__).resolve().parents[1] - - -def _runtime_available(runtime: str) -> bool: - if shutil.which(runtime) is None: - return False - - result = subprocess.run( - [runtime, "info"], - capture_output=True, - text=True, - check=False, - ) - return result.returncode == 0 - - -def _container_runtime() -> str | None: - preferred = os.environ.get("FLOW_E2E_CONTAINER_RUNTIME") - candidates = [preferred] if preferred else ["podman", "docker"] - - for runtime in candidates: - if not runtime: - continue - if _runtime_available(runtime): - return runtime - - return None - - -def _require_container_e2e() -> str: - if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1": - pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests") - runtime = _container_runtime() - if runtime is None: - pytest.skip("Podman or Docker is required for container e2e tests") - return runtime - - -@pytest.fixture(scope="module") -def e2e_runtime(): - return _require_container_e2e() - - -@pytest.fixture(scope="module") -def e2e_image(tmp_path_factory, e2e_runtime): - runtime = e2e_runtime - - context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context") - dockerfile = context_dir / "Dockerfile" - dockerfile.write_text( - "FROM python:3.11-slim\n" - "RUN apt-get update && apt-get install -y --no-install-recommends sudo && rm -rf /var/lib/apt/lists/*\n" - "RUN pip install --no-cache-dir pyyaml\n" - "RUN useradd -m -s /bin/bash flow\n" - "RUN echo 'flow ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/flow && chmod 440 /etc/sudoers.d/flow\n" - "USER flow\n" - "WORKDIR /workspace\n" - ) - - tag = f"flow-e2e-{uuid.uuid4().hex[:10]}" - subprocess.run( - [runtime, "build", "-t", tag, str(context_dir)], - check=True, - capture_output=True, - text=True, - ) - - try: - yield tag - finally: - subprocess.run([runtime, "rmi", "-f", tag], capture_output=True, text=True, check=False) - - -def _run_in_container(runtime: str, image_tag: str, script: str) -> subprocess.CompletedProcess: - return subprocess.run( - [ - runtime, - "run", - "--rm", - "-v", - f"{REPO_ROOT}:/workspace/flow-cli:ro", - image_tag, - "bash", - "-lc", - script, - ], - capture_output=True, - text=True, - check=False, - ) - - -def _assert_ok(run: subprocess.CompletedProcess) -> None: - if run.returncode != 0: - raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}") - - -def test_e2e_link_and_undo_with_root_targets(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/zsh" -mkdir -p "$dot/_shared/rootpkg/_root/tmp" -echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" -echo 'root-target' > "$dot/_shared/rootpkg/_root/tmp/flow-e2e-root-target" - -echo '# before' > "$HOME/.zshrc" - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force -test -L "$HOME/.zshrc" -test -L /tmp/flow-e2e-root-target - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo -test -f "$HOME/.zshrc" -test ! -L "$HOME/.zshrc" -grep -q '^# before$' "$HOME/.zshrc" -test ! -e /tmp/flow-e2e-root-target -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) - - -def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/zsh" -echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" -echo '# original' > "$HOME/.zshrc" - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --dry-run --force -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force --dry-run - -test -f "$HOME/.zshrc" -test ! -L "$HOME/.zshrc" -grep -q '^# original$' "$HOME/.zshrc" - -state="$XDG_STATE_HOME/flow/linked.json" -if [ -f "$state" ]; then - python - "$state" <<'PY' -import json, sys -data = json.load(open(sys.argv[1], encoding="utf-8")) -assert data.get("links", {}) == {}, data -assert "last_transaction" not in data, data -PY -fi -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) - - -def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/zsh" -echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" -echo '# user-file' > "$HOME/.zshrc" - -set +e -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link -rc=$? -set -e -test "$rc" -ne 0 - -test -f "$HOME/.zshrc" -test ! -L "$HOME/.zshrc" -grep -q '^# user-file$' "$HOME/.zshrc" -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) - - -def test_e2e_managed_drift_requires_force(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/zsh" -echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force -test -L "$HOME/.zshrc" - -rm -f "$HOME/.zshrc" -echo '# drifted-manual' > "$HOME/.zshrc" - -set +e -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link -rc=$? -set -e -test "$rc" -ne 0 -test -f "$HOME/.zshrc" -test ! -L "$HOME/.zshrc" -grep -q '^# drifted-manual$' "$HOME/.zshrc" -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) - - -def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/zsh" "$dot/_shared/git" -echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc" -echo '[user]' > "$dot/_shared/git/.gitconfig" - -mkdir -p "$HOME/.zshrc" - -set +e -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force -rc=$? -set -e -test "$rc" -ne 0 - -test -d "$HOME/.zshrc" -test ! -e "$HOME/.gitconfig" -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) - - -def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_runtime, e2e_image): - script = r""" -set -euo pipefail -export HOME=/home/flow -export XDG_DATA_HOME=/tmp/xdg-data -export XDG_CONFIG_HOME=/tmp/xdg-config -export XDG_STATE_HOME=/tmp/xdg-state -mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow" - -dot="$XDG_DATA_HOME/flow/dotfiles" -mkdir -p "$dot/_shared/a" "$dot/_shared/b" -echo '# aaa' > "$dot/_shared/a/.a" -echo '# bbb' > "$dot/_shared/b/.b" - -echo '# pre-a' > "$HOME/.a" -echo '# pre-b' > "$HOME/.b" - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force a -test -L "$HOME/.a" - -# Turn .b into a directory to force a fatal conflict, while .a stays desired and unchanged. -rm -f "$HOME/.b" -mkdir -p "$HOME/.b" -set +e -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force -rc=$? -set -e -test "$rc" -ne 0 - -PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo -test -f "$HOME/.a" -test ! -L "$HOME/.a" -grep -q '^# pre-a$' "$HOME/.a" -""" - _assert_ok(_run_in_container(e2e_runtime, e2e_image, script)) diff --git a/tests/test_dotfiles_folding.py b/tests/test_dotfiles_folding.py deleted file mode 100644 index 60c0f20..0000000 --- a/tests/test_dotfiles_folding.py +++ /dev/null @@ -1,622 +0,0 @@ -"""Tests for dotfiles link planning, root markers, and module sources.""" - -from argparse import Namespace -import json -import subprocess -from pathlib import Path - -import pytest - -from flow.services.dotfiles import ( - LinkSpec, - _collect_home_specs, - _list_profiles, - _load_link_specs_from_state, - _load_state, - _pull_requires_ack, - _resolved_package_source, - _run_sudo, - run_relink, - run_undo, - _save_link_specs_to_state, - _sync_to_desired, - _sync_modules, -) -from flow.core.config import AppConfig, FlowContext -from flow.core.console import ConsoleLogger -from flow.core.platform import PlatformInfo - - -def _ctx() -> FlowContext: - return FlowContext( - config=AppConfig(), - manifest={"profiles": {"work": {"os": "linux", "configs": {"skip": []}}}}, - platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"), - console=ConsoleLogger(), - ) - - -def _make_flow_tree(tmp_path: Path) -> Path: - flow_root = tmp_path - - (flow_root / "_shared" / "git").mkdir(parents=True) - (flow_root / "_shared" / "git" / ".gitconfig").write_text("shared") - (flow_root / "_shared" / "tmux").mkdir(parents=True) - (flow_root / "_shared" / "tmux" / ".tmux.conf").write_text("tmux") - - (flow_root / "work" / "git").mkdir(parents=True) - (flow_root / "work" / "git" / ".gitconfig").write_text("profile") - - (flow_root / "_shared" / "dnsmasq" / "_root" / "etc").mkdir(parents=True) - (flow_root / "_shared" / "dnsmasq" / "_root" / "etc" / "hostname").write_text("devbox") - - return flow_root - - -def test_list_profiles_ignores_reserved_dirs(tmp_path): - flow_root = _make_flow_tree(tmp_path) - profiles = _list_profiles(flow_root) - assert profiles == ["work"] - - -def test_collect_home_specs_conflict_fails(tmp_path): - flow_root = _make_flow_tree(tmp_path) - home = tmp_path / "home" - home.mkdir() - - with pytest.raises(RuntimeError, match="Conflicting dotfile targets"): - _collect_home_specs(_ctx(), flow_root, home, "work", set(), None) - - -def test_collect_home_specs_maps_root_marker_to_absolute(tmp_path): - flow_root = tmp_path - (flow_root / "_shared" / "dnsmasq" / "_root" / "opt" / "homebrew" / "etc").mkdir(parents=True) - src = flow_root / "_shared" / "dnsmasq" / "_root" / "opt" / "homebrew" / "etc" / "dnsmasq.conf" - src.write_text("conf") - - home = tmp_path / "home" - home.mkdir() - - specs = _collect_home_specs(_ctx(), flow_root, home, None, set(), None) - assert Path("/opt/homebrew/etc/dnsmasq.conf") in specs - assert specs[Path("/opt/homebrew/etc/dnsmasq.conf")].source == src - - -def test_collect_home_specs_skip_root_marker(tmp_path): - flow_root = tmp_path - (flow_root / "_shared" / "dnsmasq" / "_root" / "etc").mkdir(parents=True) - (flow_root / "_shared" / "dnsmasq" / "_root" / "etc" / "hostname").write_text("devbox") - - home = tmp_path / "home" - home.mkdir() - - specs = _collect_home_specs(_ctx(), flow_root, home, None, {"_root"}, None) - assert Path("/etc/hostname") not in specs - - -def test_state_round_trip(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - - specs = { - Path("/home/user/.gitconfig"): LinkSpec( - source=Path("/repo/_shared/git/.gitconfig"), - target=Path("/home/user/.gitconfig"), - package="_shared/git", - ) - } - _save_link_specs_to_state(specs) - - loaded = _load_link_specs_from_state() - assert Path("/home/user/.gitconfig") in loaded - assert loaded[Path("/home/user/.gitconfig")].package == "_shared/git" - - -def test_state_old_format_rejected(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - state_file.write_text( - json.dumps( - { - "links": { - "zsh": { - "/home/user/.zshrc": "/repo/.zshrc", - } - } - } - ) - ) - - with pytest.raises(RuntimeError, match="Unsupported linked state format"): - _load_link_specs_from_state() - - -def test_module_source_requires_sync(tmp_path): - package_root = tmp_path / "_shared" / "nvim" - module_mount = package_root / ".config" / "nvim" - module_mount.mkdir(parents=True) - (module_mount / "_module.yaml").write_text( - "source: github:dummy/example\n" - "ref:\n" - " branch: main\n" - ) - - with pytest.raises(RuntimeError, match="Run 'flow dotfiles sync' first"): - _resolved_package_source(_ctx(), "_shared/nvim", package_root) - - -def test_sync_modules_populates_cache_and_resolves_source(tmp_path, monkeypatch): - module_src = tmp_path / "module-src" - module_src.mkdir() - subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) - (module_src / "init.lua").write_text("-- module") - subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) - subprocess.run( - [ - "git", - "-C", - str(module_src), - "-c", - "user.name=Flow Test", - "-c", - "user.email=flow-test@example.com", - "commit", - "-m", - "init module", - ], - check=True, - ) - - dotfiles = tmp_path / "dotfiles" - package_root = dotfiles / "_shared" / "nvim" - module_mount = package_root / ".config" / "nvim" - module_mount.mkdir(parents=True) - (module_mount / "_module.yaml").write_text( - f"source: {module_src}\n" - "ref:\n" - " branch: main\n" - ) - (package_root / "notes.txt").write_text("ignore me") - - monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles) - monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules") - - _sync_modules(_ctx(), verbose=False) - resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root) - - assert (resolved / "init.lua").exists() - - -def test_module_backed_link_specs_exclude_git_internals(tmp_path, monkeypatch): - module_src = tmp_path / "module-src" - module_src.mkdir() - subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) - (module_src / "init.lua").write_text("-- module") - subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) - subprocess.run( - [ - "git", - "-C", - str(module_src), - "-c", - "user.name=Flow Test", - "-c", - "user.email=flow-test@example.com", - "commit", - "-m", - "init module", - ], - check=True, - ) - - dotfiles = tmp_path / "dotfiles" - package_root = dotfiles / "_shared" / "nvim" - module_mount = package_root / ".config" / "nvim" - module_mount.mkdir(parents=True) - (module_mount / "_module.yaml").write_text( - f"source: {module_src}\n" - "ref:\n" - " branch: main\n" - ) - - monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles) - monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules") - - _sync_modules(_ctx(), verbose=False) - - home = tmp_path / "home" - home.mkdir() - specs = _collect_home_specs(_ctx(), dotfiles, home, None, set(), None) - - assert home / ".config" / "nvim" / "init.lua" in specs - assert not any(target.relative_to(home).parts[0] == ".git" for target in specs) - - -def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monkeypatch): - module_src = tmp_path / "module-src" - module_src.mkdir() - subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) - (module_src / "init.lua").write_text("-- module") - subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) - subprocess.run( - [ - "git", - "-C", - str(module_src), - "-c", - "user.name=Flow Test", - "-c", - "user.email=flow-test@example.com", - "commit", - "-m", - "init module", - ], - check=True, - ) - - dotfiles = tmp_path / "dotfiles" - package_root = dotfiles / "_shared" / "nvim" - module_mount = package_root / ".config" / "nvim" - module_mount.mkdir(parents=True) - relative_source = Path("../../../../../module-src") - (module_mount / "_module.yaml").write_text( - f"source: {relative_source}\n" - "ref:\n" - " branch: main\n" - ) - - unrelated_cwd = tmp_path / "unrelated-cwd" - unrelated_cwd.mkdir() - monkeypatch.chdir(unrelated_cwd) - monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles) - monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules") - - _sync_modules(_ctx(), verbose=False) - resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root) - - assert (resolved / "init.lua").exists() - - -def test_module_mount_inherits_directory_path(tmp_path, monkeypatch): - module_src = tmp_path / "module-src" - module_src.mkdir() - subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) - (module_src / "init.lua").write_text("-- module") - (module_src / "lua").mkdir() - (module_src / "lua" / "config.lua").write_text("-- module") - subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) - subprocess.run( - [ - "git", - "-C", - str(module_src), - "-c", - "user.name=Flow Test", - "-c", - "user.email=flow-test@example.com", - "commit", - "-m", - "init module", - ], - check=True, - ) - - dotfiles = tmp_path / "dotfiles" - package_root = dotfiles / "_shared" / "nvim" - module_mount = package_root / ".config" / "nvim" - module_mount.mkdir(parents=True) - (module_mount / "_module.yaml").write_text( - f"source: {module_src}\n" - "ref:\n" - " branch: main\n" - ) - - monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles) - monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules") - _sync_modules(_ctx(), verbose=False) - - home = tmp_path / "home" - home.mkdir() - specs = _collect_home_specs(_ctx(), dotfiles, home, None, set(), None) - - assert home / ".config" / "nvim" / "init.lua" in specs - assert home / ".config" / "nvim" / "lua" / "config.lua" in specs - assert home / "init.lua" not in specs - assert home / "lua" / "config.lua" not in specs - - -def test_pull_requires_ack_only_on_real_updates(): - assert _pull_requires_ack("Already up to date.\n", "") is False - assert _pull_requires_ack("Updating 123..456\n", "") is True - - -def test_run_relink_uses_transactional_link_path(monkeypatch): - calls = [] - - monkeypatch.setattr("flow.services.dotfiles._ensure_flow_dir", lambda _ctx: None) - monkeypatch.setattr( - "flow.services.dotfiles.run_unlink", - lambda _ctx, _args: (_ for _ in ()).throw(AssertionError("run_unlink must not be used")), - ) - - def _fake_run_link(_ctx, args): - calls.append((args.packages, args.profile, args.copy, args.force, args.dry_run)) - - monkeypatch.setattr("flow.services.dotfiles.run_link", _fake_run_link) - - run_relink(_ctx(), Namespace(packages=["git"], profile="work")) - - assert calls == [(["git"], "work", False, False, False)] - - -def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - source = tmp_path / "source" / ".zshrc" - source.parent.mkdir(parents=True) - source.write_text("# new") - - target = tmp_path / "home" / ".zshrc" - target.parent.mkdir(parents=True) - target.write_text("# old") - - desired = { - target: LinkSpec( - source=source, - target=target, - package="_shared/zsh", - ) - } - - _sync_to_desired( - _ctx(), - desired, - force=True, - dry_run=True, - copy=False, - ) - - assert target.exists() - assert not target.is_symlink() - assert target.read_text() == "# old" - assert not state_file.exists() - - -def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - source_root = tmp_path / "source" - source_root.mkdir() - source_ok = source_root / "ok" - source_ok.write_text("ok") - source_conflict = source_root / "conflict" - source_conflict.write_text("conflict") - - home = tmp_path / "home" - home.mkdir() - target_ok = home / "a-file" - target_conflict = home / "z-dir" - target_conflict.mkdir() - - desired = { - target_ok: LinkSpec(source=source_ok, target=target_ok, package="_shared/test"), - target_conflict: LinkSpec(source=source_conflict, target=target_conflict, package="_shared/test"), - } - - with pytest.raises(RuntimeError, match="cannot be overwritten"): - _sync_to_desired( - _ctx(), - desired, - force=True, - dry_run=False, - copy=False, - ) - - assert not target_ok.exists() - assert not target_ok.is_symlink() - assert not state_file.exists() - - -def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - source = tmp_path / "source" / ".zshrc" - source.parent.mkdir(parents=True) - source.write_text("# managed") - - target = tmp_path / "home" / ".zshrc" - target.parent.mkdir(parents=True) - target.write_text("# previous") - - desired = { - target: LinkSpec( - source=source, - target=target, - package="_shared/zsh", - ) - } - - _sync_to_desired( - _ctx(), - desired, - force=True, - dry_run=False, - copy=False, - ) - - assert target.is_symlink() - - state_after_link = _load_state() - assert "last_transaction" in state_after_link - tx = state_after_link["last_transaction"] - assert isinstance(tx, dict) - assert tx.get("targets") - - run_undo(_ctx(), Namespace()) - - assert target.exists() - assert not target.is_symlink() - assert target.read_text() == "# previous" - - state_after_undo = _load_state() - assert state_after_undo.get("links") == {} - assert "last_transaction" not in state_after_undo - - -def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - source = tmp_path / "source" - source.mkdir() - src_a = source / "a" - src_b = source / "b" - src_a.write_text("a") - src_b.write_text("b") - - home = tmp_path / "home" - home.mkdir() - target_a = home / "a" - target_b = home / "b" - target_a.write_text("old-a") - - desired = { - target_a: LinkSpec(source=src_a, target=target_a, package="_shared/test"), - target_b: LinkSpec(source=src_b, target=target_b, package="_shared/test"), - } - - call_count = {"n": 0} - - def _failing_apply(spec, *, copy, dry_run): # noqa: ARG001 - call_count["n"] += 1 - if call_count["n"] == 2: - raise RuntimeError("simulated failure") - spec.target.parent.mkdir(parents=True, exist_ok=True) - spec.target.symlink_to(spec.source) - return True - - monkeypatch.setattr("flow.services.dotfiles._apply_link_spec", _failing_apply) - - with pytest.raises(RuntimeError, match="simulated failure"): - _sync_to_desired( - _ctx(), - desired, - force=True, - dry_run=False, - copy=False, - ) - - state_after_failure = _load_state() - tx = state_after_failure.get("last_transaction") - assert isinstance(tx, dict) - assert tx.get("incomplete") is True - assert target_a.is_symlink() - - run_undo(_ctx(), Namespace()) - - assert target_a.exists() - assert not target_a.is_symlink() - assert target_a.read_text() == "old-a" - assert not target_b.exists() - assert _load_state().get("links") == {} - - -def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - source = tmp_path / "source" / ".old" - source.parent.mkdir(parents=True) - source.write_text("managed") - - target = tmp_path / "home" / ".zshrc" - target.parent.mkdir(parents=True) - target.write_text("user-edited") - - _save_link_specs_to_state( - { - target: LinkSpec( - source=source, - target=target, - package="_shared/zsh", - ) - } - ) - - with pytest.raises(RuntimeError, match="Use --force"): - _sync_to_desired( - _ctx(), - {}, - force=False, - dry_run=False, - copy=False, - ) - - assert target.exists() - assert not target.is_symlink() - assert target.read_text() == "user-edited" - assert target in _load_link_specs_from_state() - - -def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch): - state_file = tmp_path / "linked.json" - monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file) - monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True) - - old_source = tmp_path / "source" / ".old" - new_source = tmp_path / "source" / ".new" - old_source.parent.mkdir(parents=True) - old_source.write_text("managed-old") - new_source.write_text("managed-new") - - target = tmp_path / "home" / ".gitconfig" - target.parent.mkdir(parents=True) - target.write_text("manual-file") - - _save_link_specs_to_state( - { - target: LinkSpec( - source=old_source, - target=target, - package="_shared/git", - ) - } - ) - - desired = { - target: LinkSpec( - source=new_source, - target=target, - package="_shared/git", - ) - } - - with pytest.raises(RuntimeError, match="Use --force"): - _sync_to_desired( - _ctx(), - desired, - force=False, - dry_run=False, - copy=False, - ) - - assert target.exists() - assert not target.is_symlink() - assert target.read_text() == "manual-file" - assert _load_link_specs_from_state()[target].source == old_source - - -def test_run_sudo_errors_when_binary_missing(monkeypatch): - monkeypatch.setattr("flow.services.dotfiles.shutil.which", lambda _name: None) - with pytest.raises(RuntimeError, match="sudo is required"): - _run_sudo(["true"], dry_run=False) diff --git a/tests/test_package.py b/tests/test_package.py deleted file mode 100644 index aebbfd8..0000000 --- a/tests/test_package.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Tests for flow.commands.package.""" - -from types import SimpleNamespace - -from flow.commands import package - - -def test_load_installed_returns_empty_on_malformed_json(tmp_path, monkeypatch): - state_file = tmp_path / "installed.json" - state_file.write_text("{broken", encoding="utf-8") - monkeypatch.setattr(package, "INSTALLED_STATE", state_file) - - assert package._load_installed() == {} - - -def test_load_installed_returns_empty_on_non_mapping_json(tmp_path, monkeypatch): - state_file = tmp_path / "installed.json" - state_file.write_text('["neovim"]', encoding="utf-8") - monkeypatch.setattr(package, "INSTALLED_STATE", state_file) - - assert package._load_installed() == {} - - -class _ConsoleCapture: - def __init__(self): - self.info_messages = [] - - def info(self, message): - self.info_messages.append(message) - - def table(self, _headers, _rows): - raise AssertionError("table() should not be called when installed state is malformed") - - -def test_run_list_handles_malformed_installed_state(tmp_path, monkeypatch): - state_file = tmp_path / "installed.json" - state_file.write_text("{oops", encoding="utf-8") - monkeypatch.setattr(package, "INSTALLED_STATE", state_file) - monkeypatch.setattr(package, "_get_definitions", lambda _ctx: {}) - - ctx = SimpleNamespace(console=_ConsoleCapture()) - - package.run_list(ctx, SimpleNamespace(all=False)) - - assert ctx.console.info_messages == ["No packages installed."] diff --git a/tests/test_paths.py b/tests/test_paths.py deleted file mode 100644 index 2418ac9..0000000 --- a/tests/test_paths.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Tests for flow.core.paths.""" - -from pathlib import Path - -from flow.core.paths import ( - CONFIG_DIR, - CONFIG_FILE, - DATA_DIR, - DOTFILES_DIR, - INSTALLED_STATE, - LINKED_STATE, - MANIFEST_FILE, - MODULES_DIR, - PACKAGES_DIR, - SCRATCH_DIR, - STATE_DIR, - ensure_dirs, -) - - -def test_config_dir_under_home(): - assert ".config/flow" in str(CONFIG_DIR) - - -def test_data_dir_under_home(): - assert ".local/share/flow" in str(DATA_DIR) - - -def test_state_dir_under_home(): - assert ".local/state/flow" in str(STATE_DIR) - - -def test_manifest_file_in_config_dir(): - assert MANIFEST_FILE == CONFIG_DIR / "manifest.yaml" - - -def test_config_file_in_config_dir(): - assert CONFIG_FILE == CONFIG_DIR / "config.yaml" - - -def test_dotfiles_dir(): - assert DOTFILES_DIR == DATA_DIR / "dotfiles" - - -def test_packages_dir(): - assert PACKAGES_DIR == DATA_DIR / "packages" - - -def test_modules_dir(): - assert MODULES_DIR == DATA_DIR / "modules" - - -def test_scratch_dir(): - assert SCRATCH_DIR == DATA_DIR / "scratch" - - -def test_state_files(): - assert LINKED_STATE == STATE_DIR / "linked.json" - assert INSTALLED_STATE == STATE_DIR / "installed.json" - - -def test_ensure_dirs(tmp_path, monkeypatch): - monkeypatch.setattr("flow.core.paths.CONFIG_DIR", tmp_path / "config") - monkeypatch.setattr("flow.core.paths.DATA_DIR", tmp_path / "data") - monkeypatch.setattr("flow.core.paths.STATE_DIR", tmp_path / "state") - monkeypatch.setattr("flow.core.paths.MODULES_DIR", tmp_path / "data" / "modules") - monkeypatch.setattr("flow.core.paths.PACKAGES_DIR", tmp_path / "data" / "packages") - monkeypatch.setattr("flow.core.paths.SCRATCH_DIR", tmp_path / "data" / "scratch") - - ensure_dirs() - - assert (tmp_path / "config").is_dir() - assert (tmp_path / "data").is_dir() - assert (tmp_path / "state").is_dir() - assert (tmp_path / "data" / "modules").is_dir() - assert (tmp_path / "data" / "packages").is_dir() - assert (tmp_path / "data" / "scratch").is_dir() diff --git a/tests/test_platform.py b/tests/test_platform.py deleted file mode 100644 index 940b15b..0000000 --- a/tests/test_platform.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Tests for flow.core.platform.""" - -import platform as _platform - -import pytest - -from flow.core.platform import PlatformInfo, detect_platform - - -def test_detect_platform_returns_platforminfo(): - info = detect_platform() - assert isinstance(info, PlatformInfo) - assert info.os in ("linux", "macos") - assert info.arch in ("x64", "arm64") - assert info.platform == f"{info.os}-{info.arch}" - - -def test_detect_platform_unsupported_os(monkeypatch): - monkeypatch.setattr(_platform, "system", lambda: "FreeBSD") - with pytest.raises(RuntimeError, match="Unsupported operating system"): - detect_platform() - - -def test_detect_platform_unsupported_arch(monkeypatch): - monkeypatch.setattr(_platform, "machine", lambda: "mips") - with pytest.raises(RuntimeError, match="Unsupported architecture"): - detect_platform() - - diff --git a/tests/test_self_hosting.py b/tests/test_self_hosting.py deleted file mode 100644 index f1877da..0000000 --- a/tests/test_self_hosting.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Tests for self-hosted merged YAML config loading.""" - -from pathlib import Path - -import pytest - -from flow.core import paths as paths_module -from flow.core.config import load_config, load_manifest - - -@pytest.fixture -def mock_roots(tmp_path, monkeypatch): - local_root = tmp_path / "local-flow" - dotfiles_root = tmp_path / "dotfiles" / "_shared" / "flow" / ".config" / "flow" - - local_root.mkdir(parents=True) - dotfiles_root.mkdir(parents=True) - - monkeypatch.setattr(paths_module, "CONFIG_DIR", local_root) - monkeypatch.setattr(paths_module, "DOTFILES_FLOW_CONFIG", dotfiles_root) - - return { - "local": local_root, - "dotfiles": dotfiles_root, - } - - -def test_load_manifest_priority_dotfiles_first(mock_roots): - (mock_roots["local"] / "profiles.yaml").write_text("profiles:\n local: {os: linux}\n") - (mock_roots["dotfiles"] / "profiles.yaml").write_text("profiles:\n dotfiles: {os: macos}\n") - - manifest = load_manifest() - assert "dotfiles" in manifest.get("profiles", {}) - assert "local" not in manifest.get("profiles", {}) - - -def test_load_manifest_fallback_to_local(mock_roots): - (mock_roots["local"] / "profiles.yaml").write_text("profiles:\n local: {os: linux}\n") - - # Remove dotfiles yaml file so local takes over. - dot_yaml = mock_roots["dotfiles"] / "profiles.yaml" - if dot_yaml.exists(): - dot_yaml.unlink() - - manifest = load_manifest() - assert "local" in manifest.get("profiles", {}) - - -def test_load_manifest_empty_when_none_exist(mock_roots): - manifest = load_manifest() - assert manifest == {} - - -def test_load_config_from_merged_yaml(mock_roots): - (mock_roots["dotfiles"] / "config.yaml").write_text( - "repository:\n" - " dotfiles-url: git@github.com:user/dotfiles.git\n" - "defaults:\n" - " container-registry: registry.example.com\n" - ) - - cfg = load_config() - assert cfg.dotfiles_url == "git@github.com:user/dotfiles.git" - assert cfg.container_registry == "registry.example.com" - - -def test_yaml_merge_is_alphabetical_last_writer_wins(mock_roots): - (mock_roots["local"] / "10-a.yaml").write_text("profiles:\n a: {os: linux}\n") - (mock_roots["local"] / "20-b.yaml").write_text("profiles:\n b: {os: linux}\n") - - manifest = load_manifest(mock_roots["local"]) - assert "b" in manifest.get("profiles", {}) - assert "a" not in manifest.get("profiles", {}) - - -def test_explicit_file_path_loads_single_yaml(tmp_path): - one_file = tmp_path / "single.yaml" - one_file.write_text("profiles:\n only: {os: linux}\n") - - manifest = load_manifest(one_file) - assert "only" in manifest["profiles"] diff --git a/tests/test_stow.py b/tests/test_stow.py deleted file mode 100644 index 98e1d2c..0000000 --- a/tests/test_stow.py +++ /dev/null @@ -1,310 +0,0 @@ -"""Tests for flow.core.stow — GNU Stow-style tree folding/unfolding.""" - -import os -from pathlib import Path - -import pytest - -from flow.core.stow import LinkOperation, LinkTree, TreeFolder - - -@pytest.fixture -def temp_home(tmp_path): - """Create a temporary home directory.""" - home = tmp_path / "home" - home.mkdir() - return home - - -@pytest.fixture -def temp_dotfiles(tmp_path): - """Create a temporary dotfiles repository.""" - dotfiles = tmp_path / "dotfiles" - dotfiles.mkdir() - return dotfiles - - -def test_linktree_add_remove(): - """Test basic LinkTree operations.""" - tree = LinkTree() - source = Path("/dotfiles/zsh/.zshrc") - target = Path("/home/user/.zshrc") - - tree.add_link(target, source, "zsh", is_dir_link=False) - assert target in tree.links - assert tree.links[target] == source - assert tree.packages[target] == "zsh" - assert not tree.is_directory_link(target) - - tree.remove_link(target) - assert target not in tree.links - assert target not in tree.packages - - -def test_linktree_directory_link(): - """Test directory link tracking.""" - tree = LinkTree() - source = Path("/dotfiles/nvim/.config/nvim") - target = Path("/home/user/.config/nvim") - - tree.add_link(target, source, "nvim", is_dir_link=True) - assert tree.is_directory_link(target) - - -def test_linktree_can_fold_single_package(): - """Test can_fold with single package.""" - tree = LinkTree() - target_dir = Path("/home/user/.config/nvim") - - # Add files from same package - tree.add_link(target_dir / "init.lua", Path("/dotfiles/nvim/.config/nvim/init.lua"), "nvim") - tree.add_link(target_dir / "lua" / "config.lua", Path("/dotfiles/nvim/.config/nvim/lua/config.lua"), "nvim") - - # Should be able to fold since all files are from same package - assert tree.can_fold(target_dir, "nvim") - - -def test_linktree_can_fold_multiple_packages(): - """Test can_fold with multiple packages.""" - tree = LinkTree() - target_dir = Path("/home/user/.config") - - # Add files from different packages - tree.add_link(target_dir / "nvim", Path("/dotfiles/nvim/.config/nvim"), "nvim", is_dir_link=True) - tree.add_link(target_dir / "tmux", Path("/dotfiles/tmux/.config/tmux"), "tmux", is_dir_link=True) - - # Cannot fold .config since it has files from multiple packages - assert not tree.can_fold(target_dir, "nvim") - - -def test_linktree_from_state_old_format_rejected(): - """Old state format should be rejected (no backward compatibility).""" - state = { - "links": { - "zsh": { - "/home/user/.zshrc": "/dotfiles/zsh/.zshrc", - "/home/user/.zshenv": "/dotfiles/zsh/.zshenv", - } - } - } - - with pytest.raises(RuntimeError, match="Unsupported linked state format"): - LinkTree.from_state(state) - - -def test_linktree_from_state_new_format(): - """Test loading from new state format (with is_directory_link).""" - state = { - "version": 2, - "links": { - "nvim": { - "/home/user/.config/nvim": { - "source": "/dotfiles/nvim/.config/nvim", - "is_directory_link": True, - } - } - } - } - - tree = LinkTree.from_state(state) - target = Path("/home/user/.config/nvim") - assert target in tree.links - assert tree.is_directory_link(target) - assert tree.packages[target] == "nvim" - - -def test_linktree_to_state(): - """Test converting LinkTree to state format.""" - tree = LinkTree() - tree.add_link( - Path("/home/user/.config/nvim"), - Path("/dotfiles/nvim/.config/nvim"), - "nvim", - is_dir_link=True, - ) - tree.add_link( - Path("/home/user/.zshrc"), - Path("/dotfiles/zsh/.zshrc"), - "zsh", - is_dir_link=False, - ) - - state = tree.to_state() - assert state["version"] == 2 - assert "nvim" in state["links"] - assert "zsh" in state["links"] - - nvim_link = state["links"]["nvim"]["/home/user/.config/nvim"] - assert nvim_link["is_directory_link"] is True - - zsh_link = state["links"]["zsh"]["/home/user/.zshrc"] - assert zsh_link["is_directory_link"] is False - - -def test_treefolder_plan_link_simple(temp_home, temp_dotfiles): - """Test planning a simple file link.""" - tree = LinkTree() - folder = TreeFolder(tree) - - source = temp_dotfiles / "zsh" / ".zshrc" - target = temp_home / ".zshrc" - - # Create source file - source.parent.mkdir(parents=True) - source.write_text("# zshrc") - - ops = folder.plan_link(source, target, "zsh") - assert len(ops) == 1 - assert ops[0].type == "create_symlink" - assert ops[0].source == source - assert ops[0].target == target - assert ops[0].package == "zsh" - - -def test_treefolder_detect_conflicts_existing_file(temp_home, temp_dotfiles): - """Test conflict detection for existing files.""" - tree = LinkTree() - folder = TreeFolder(tree) - - source = temp_dotfiles / "zsh" / ".zshrc" - target = temp_home / ".zshrc" - - # Create existing file - target.parent.mkdir(parents=True, exist_ok=True) - target.write_text("# existing") - - source.parent.mkdir(parents=True) - source.write_text("# zshrc") - - ops = folder.plan_link(source, target, "zsh") - conflicts = folder.detect_conflicts(ops) - - assert len(conflicts) == 1 - assert "already exists" in conflicts[0] - - -def test_treefolder_detect_conflicts_different_package(temp_home, temp_dotfiles): - """Test conflict detection for links from different packages.""" - tree = LinkTree() - target = temp_home / ".bashrc" - - # Add existing link from different package - tree.add_link(target, Path("/dotfiles/bash/.bashrc"), "bash") - - folder = TreeFolder(tree) - source = temp_dotfiles / "zsh" / ".bashrc" - source.parent.mkdir(parents=True) - source.write_text("# bashrc") - - ops = folder.plan_link(source, target, "zsh") - conflicts = folder.detect_conflicts(ops) - - assert len(conflicts) == 1 - assert "bash" in conflicts[0] - - -def test_treefolder_execute_operations_dry_run(temp_home, temp_dotfiles, capsys): - """Test dry-run mode.""" - tree = LinkTree() - folder = TreeFolder(tree) - - source = temp_dotfiles / "zsh" / ".zshrc" - target = temp_home / ".zshrc" - - source.parent.mkdir(parents=True) - source.write_text("# zshrc") - - ops = folder.plan_link(source, target, "zsh") - folder.execute_operations(ops, dry_run=True) - - # Check output - captured = capsys.readouterr() - assert "FILE LINK" in captured.out - assert str(target) in captured.out - - # No actual symlink created - assert not target.exists() - - -def test_treefolder_execute_operations_create_symlink(temp_home, temp_dotfiles): - """Test creating actual symlinks.""" - tree = LinkTree() - folder = TreeFolder(tree) - - source = temp_dotfiles / "zsh" / ".zshrc" - target = temp_home / ".zshrc" - - source.parent.mkdir(parents=True) - source.write_text("# zshrc") - - ops = folder.plan_link(source, target, "zsh") - folder.execute_operations(ops, dry_run=False) - - # Check symlink created - assert target.is_symlink() - assert target.resolve() == source.resolve() - - # Check tree updated - assert target in folder.tree.links - - -def test_treefolder_plan_unlink(temp_home, temp_dotfiles): - """Test planning unlink operations.""" - tree = LinkTree() - target = temp_home / ".zshrc" - source = temp_dotfiles / "zsh" / ".zshrc" - - tree.add_link(target, source, "zsh") - - folder = TreeFolder(tree) - ops = folder.plan_unlink(target, "zsh") - - assert len(ops) == 1 - assert ops[0].type == "remove" - assert ops[0].target == target - - -def test_treefolder_plan_unlink_directory_link(temp_home, temp_dotfiles): - """Test planning unlink for directory symlink.""" - tree = LinkTree() - target = temp_home / ".config" / "nvim" - source = temp_dotfiles / "nvim" / ".config" / "nvim" - - tree.add_link(target, source, "nvim", is_dir_link=True) - - folder = TreeFolder(tree) - ops = folder.plan_unlink(target, "nvim") - - # Should remove the directory link - assert len(ops) >= 1 - assert ops[-1].type == "remove" - assert ops[-1].is_directory_link - - -def test_linkoperation_str(): - """Test LinkOperation string representation.""" - op1 = LinkOperation( - type="create_symlink", - source=Path("/src"), - target=Path("/dst"), - package="test", - is_directory_link=False, - ) - assert "FILE LINK" in str(op1) - - op2 = LinkOperation( - type="create_symlink", - source=Path("/src"), - target=Path("/dst"), - package="test", - is_directory_link=True, - ) - assert "DIR LINK" in str(op2) - - op3 = LinkOperation( - type="unfold", - source=Path("/src"), - target=Path("/dst"), - package="test", - ) - assert "UNFOLD" in str(op3) diff --git a/tests/test_sync.py b/tests/test_sync.py deleted file mode 100644 index 229b975..0000000 --- a/tests/test_sync.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Tests for flow.commands.sync.""" - -from types import SimpleNamespace - -import pytest - -from flow.commands import sync - - -def _git_clean_repo(_repo, *cmd, capture=True): - _ = capture - if cmd == ("rev-parse", "--abbrev-ref", "HEAD"): - return SimpleNamespace(returncode=0, stdout="main\n") - if cmd == ("diff", "--quiet"): - return SimpleNamespace(returncode=0, stdout="") - if cmd == ("diff", "--cached", "--quiet"): - return SimpleNamespace(returncode=0, stdout="") - if cmd == ("ls-files", "--others", "--exclude-standard"): - return SimpleNamespace(returncode=0, stdout="") - if cmd == ("rev-parse", "--abbrev-ref", "main@{u}"): - return SimpleNamespace(returncode=0, stdout="origin/main\n") - if cmd == ("rev-list", "--oneline", "main@{u}..main"): - return SimpleNamespace(returncode=0, stdout="") - if cmd == ("for-each-ref", "--format=%(refname:short)", "refs/heads"): - return SimpleNamespace(returncode=0, stdout="main\n") - raise AssertionError(f"Unexpected git command: {cmd!r}") - - -@pytest.mark.parametrize("git_style", ["dir", "file"]) -def test_check_repo_detects_git_dir_and_worktree_file(tmp_path, monkeypatch, git_style): - repo = tmp_path / "repo" - repo.mkdir() - - if git_style == "dir": - (repo / ".git").mkdir() - else: - (repo / ".git").write_text("gitdir: /tmp/worktrees/repo\n", encoding="utf-8") - - monkeypatch.setattr(sync, "_git", _git_clean_repo) - - name, issues = sync._check_repo(str(repo), do_fetch=False) - - assert name == "repo" - assert issues == [] - - -class _ConsoleCapture: - def __init__(self): - self.info_messages = [] - self.error_messages = [] - self.success_messages = [] - - def info(self, message): - self.info_messages.append(message) - - def error(self, message): - self.error_messages.append(message) - - def success(self, message): - self.success_messages.append(message) - - -def test_run_fetch_includes_worktree_style_repo(tmp_path, monkeypatch): - projects = tmp_path / "projects" - projects.mkdir() - - worktree_repo = projects / "worktree" - worktree_repo.mkdir() - (worktree_repo / ".git").write_text("gitdir: /tmp/worktrees/worktree\n", encoding="utf-8") - - (projects / "non_git").mkdir() - - calls = [] - - def _git_fetch(repo, *cmd, capture=True): - _ = capture - calls.append((repo, cmd)) - return SimpleNamespace(returncode=0, stdout="") - - monkeypatch.setattr(sync, "_git", _git_fetch) - - console = _ConsoleCapture() - ctx = SimpleNamespace(config=SimpleNamespace(projects_dir=str(projects)), console=console) - - sync.run_fetch(ctx, SimpleNamespace()) - - assert calls == [(str(worktree_repo), ("fetch", "--all", "--quiet"))] - assert console.success_messages == ["All remotes fetched."] diff --git a/tests/test_variables.py b/tests/test_variables.py deleted file mode 100644 index 35a8a75..0000000 --- a/tests/test_variables.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Tests for flow.core.variables.""" - -from flow.core.variables import substitute, substitute_template - - -def test_substitute_dollar(): - result = substitute("hello $NAME", {"NAME": "world"}) - assert result == "hello world" - - -def test_substitute_braces(): - result = substitute("hello ${NAME}", {"NAME": "world"}) - assert result == "hello world" - - -def test_substitute_multiple(): - result = substitute("$A and ${B}", {"A": "1", "B": "2"}) - assert result == "1 and 2" - - -def test_substitute_home(): - result = substitute("dir=$HOME", {}) - assert "$HOME" not in result - - -def test_substitute_user(): - import os - result = substitute("u=$USER", {}) - assert result == f"u={os.getenv('USER', '')}" - - -def test_substitute_non_string(): - assert substitute(123, {}) == 123 - - -def test_substitute_template_basic(): - result = substitute_template("nvim-{{os}}-{{arch}}.tar.gz", {"os": "linux", "arch": "x86_64"}) - assert result == "nvim-linux-x86_64.tar.gz" - - -def test_substitute_template_missing_key(): - result = substitute_template("{{missing}}", {}) - assert result == "{{missing}}" - - -def test_substitute_template_non_string(): - assert substitute_template(42, {}) == 42 - - -def test_substitute_template_no_placeholders(): - result = substitute_template("plain text", {"os": "linux"}) - assert result == "plain text" - - -def test_substitute_template_env_namespace(): - result = substitute_template("{{ env.USER_EMAIL }}", {"env": {"USER_EMAIL": "you@example.com"}}) - assert result == "you@example.com"