chore: remove old code replaced by rewrite

Delete old core modules (action, stow, process, system, variables),
old services (package_defs, ssh), and all tests for deleted code.

191 tests pass with the new codebase.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-16 05:07:57 +02:00
parent 6ea23e02df
commit 6e9f9c9e30
22 changed files with 0 additions and 3715 deletions

View File

@@ -1,120 +0,0 @@
"""Action dataclass and ActionExecutor for plan-then-execute workflows."""
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from flow.core.console import ConsoleLogger
@dataclass
class Action:
type: str
description: str
data: Dict[str, Any] = field(default_factory=dict)
skip_on_error: bool = True
os_filter: Optional[str] = None
status: str = "pending"
error: Optional[str] = None
class ActionExecutor:
"""Register handlers for action types, then execute a plan."""
def __init__(self, console: ConsoleLogger):
self.console = console
self._handlers: Dict[str, Callable] = {}
self.post_comments: List[str] = []
def register(self, action_type: str, handler: Callable) -> None:
self._handlers[action_type] = handler
def execute(self, actions: List[Action], *, dry_run: bool = False, current_os: str = "") -> None:
if dry_run:
self._print_plan(actions)
return
# Filter OS-incompatible actions
compatible = [a for a in actions if a.os_filter is None or a.os_filter == current_os]
skipped_count = len(actions) - len(compatible)
if skipped_count:
self.console.info(f"Skipped {skipped_count} OS-incompatible actions")
self.console.section_header(f"EXECUTING {len(compatible)} ACTIONS")
for i, action in enumerate(compatible, 1):
self.console.step_start(i, len(compatible), action.description)
handler = self._handlers.get(action.type)
if not handler:
action.status = "skipped"
self.console.step_skip(f"No handler for action type: {action.type}")
continue
try:
handler(action.data)
action.status = "completed"
self.console.step_complete()
except Exception as e:
action.error = str(e)
if action.skip_on_error:
action.status = "skipped"
self.console.step_skip(str(e))
else:
action.status = "failed"
self.console.step_fail(str(e))
print(f"\n{self.console.RED}Critical action failed, stopping execution{self.console.RESET}")
break
self._print_summary(compatible)
def _print_plan(self, actions: List[Action]) -> None:
self.console.plan_header("EXECUTION PLAN", len(actions))
grouped: Dict[str, List[Action]] = {}
for action in actions:
category = action.type.split("-")[0]
grouped.setdefault(category, []).append(action)
for category, category_actions in grouped.items():
self.console.plan_category(category)
for i, action in enumerate(category_actions, 1):
self.console.plan_item(
i,
action.description,
action.os_filter,
not action.skip_on_error,
)
self.console.plan_legend()
def _print_summary(self, actions: List[Action]) -> None:
completed = sum(1 for a in actions if a.status == "completed")
failed = sum(1 for a in actions if a.status == "failed")
skipped = sum(1 for a in actions if a.status == "skipped")
self.console.section_summary("EXECUTION SUMMARY")
c = self.console
print(f"Total actions: {c.BOLD}{len(actions)}{c.RESET}")
print(f"Completed: {c.GREEN}{completed}{c.RESET}")
if failed:
print(f"Failed: {c.RED}{failed}{c.RESET}")
if skipped:
print(f"Skipped: {c.YELLOW}{skipped}{c.RESET}")
if self.post_comments:
print(f"\n{c.BOLD}POST-INSTALL NOTES{c.RESET}")
print(f"{c.CYAN}{'-' * 25}{c.RESET}")
for i, comment in enumerate(self.post_comments, 1):
print(f"{i}. {comment}")
if failed:
print(f"\n{c.BOLD}FAILED ACTIONS{c.RESET}")
print(f"{c.RED}{'-' * 20}{c.RESET}")
for action in actions:
if action.status == "failed":
print(f"{c.RED}>{c.RESET} {action.description}")
print(f" {c.GRAY}Error: {action.error}{c.RESET}")
print(f"\n{c.RED}{failed} action(s) failed. Check the errors above.{c.RESET}")
else:
print(f"\n{c.GREEN}All actions completed successfully!{c.RESET}")

View File

@@ -1,30 +0,0 @@
"""Command execution helpers."""
import subprocess
from flow.core.console import ConsoleLogger
from flow.core.system import CommandRunner
def run_command(
command: str,
console: ConsoleLogger,
*,
check: bool = True,
shell: bool = True,
capture: bool = False,
) -> subprocess.CompletedProcess:
"""Run a command with real-time streamed output."""
if not shell:
raise RuntimeError("run_command only supports shell commands")
runner = CommandRunner()
if capture:
result = runner.run_shell(command, capture_output=True, check=False)
if check and result.returncode != 0:
raise RuntimeError(
f"Command failed (exit {result.returncode}): {command}"
)
return result
return runner.stream_shell(command, console, check=check)

View File

@@ -1,358 +0,0 @@
"""GNU Stow-style tree folding/unfolding for efficient symlink management."""
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Set
@dataclass
class LinkOperation:
"""Represents a single operation to perform during linking."""
type: str # "create_symlink" | "create_dir" | "unfold" | "remove" | "remove_dir"
source: Path
target: Path
package: str
is_directory_link: bool = False
def __str__(self) -> str:
if self.type == "create_symlink":
link_type = "DIR" if self.is_directory_link else "FILE"
return f" {link_type} LINK: {self.target} -> {self.source}"
elif self.type == "create_dir":
return f" CREATE DIR: {self.target}"
elif self.type == "unfold":
return f" UNFOLD: {self.target} (directory symlink -> individual file symlinks)"
elif self.type == "remove":
return f" REMOVE: {self.target}"
elif self.type == "remove_dir":
return f" REMOVE DIR: {self.target}"
return f" {self.type}: {self.target}"
@dataclass
class LinkTree:
"""Represents the current state of symlinks."""
links: Dict[Path, Path] = field(default_factory=dict) # target -> source
packages: Dict[Path, str] = field(default_factory=dict) # target -> package_name
directory_links: Set[Path] = field(default_factory=set) # targets that are directory links
def add_link(self, target: Path, source: Path, package: str, is_dir_link: bool = False):
"""Add a link to the tree."""
self.links[target] = source
self.packages[target] = package
if is_dir_link:
self.directory_links.add(target)
def remove_link(self, target: Path):
"""Remove a link from the tree."""
self.links.pop(target, None)
self.packages.pop(target, None)
self.directory_links.discard(target)
def is_directory_link(self, target: Path) -> bool:
"""Check if a target is a directory symlink."""
return target in self.directory_links
def get_package(self, target: Path) -> Optional[str]:
"""Get the package that owns a link."""
return self.packages.get(target)
def can_fold(self, target_dir: Path, package: str) -> bool:
"""Check if all links in target_dir belong to the same package.
Returns True if we can create a single directory symlink instead of
individual file symlinks.
"""
# Check all direct children of target_dir
for link_target, link_package in self.packages.items():
# If link_target is a child of target_dir
try:
link_target.relative_to(target_dir)
# If parent is target_dir and package differs, cannot fold
if link_target.parent == target_dir and link_package != package:
return False
except ValueError:
# link_target is not under target_dir, skip
continue
return True
@classmethod
def from_state(cls, state: dict) -> "LinkTree":
"""Build a LinkTree from the linked.json state format (v2 only)."""
tree = cls()
links_dict = state.get("links", {})
for package_name, pkg_links in links_dict.items():
for target_str, link_info in pkg_links.items():
target = Path(target_str)
if not isinstance(link_info, dict) or "source" not in link_info:
raise RuntimeError(
"Unsupported linked state format. Remove linked.json and relink dotfiles."
)
source = Path(link_info["source"])
is_dir_link = bool(link_info.get("is_directory_link", False))
tree.add_link(target, source, package_name, is_dir_link)
return tree
def to_state(self) -> dict:
"""Convert LinkTree to linked.json state format."""
state = {"version": 2, "links": {}}
# Group links by package
package_links: Dict[str, Dict[str, dict]] = {}
for target, source in self.links.items():
package = self.packages[target]
if package not in package_links:
package_links[package] = {}
package_links[package][str(target)] = {
"source": str(source),
"is_directory_link": target in self.directory_links,
}
state["links"] = package_links
return state
class TreeFolder:
"""Implements GNU Stow tree folding/unfolding algorithm."""
def __init__(self, tree: LinkTree):
self.tree = tree
def plan_link(
self, source: Path, target: Path, package: str, is_dir_link: bool = False
) -> List[LinkOperation]:
"""Plan operations needed to create a link (may include unfolding).
Args:
source: Source path (file or directory in dotfiles)
target: Target path (where symlink should be created)
package: Package name
is_dir_link: Whether this is a directory symlink (folded)
Returns a list of operations to execute in order.
"""
operations = []
# Check if parent is a directory symlink that needs unfolding
parent = target.parent
if parent in self.tree.links and self.tree.is_directory_link(parent):
# Parent is a folded directory symlink, need to unfold
unfold_ops = self._plan_unfold(parent)
operations.extend(unfold_ops)
# Create symlink operation (conflict detection will handle existing links)
operations.append(
LinkOperation(
type="create_symlink",
source=source,
target=target,
package=package,
is_directory_link=is_dir_link,
)
)
return operations
def _find_fold_point(
self, source: Path, target: Path, package: str
) -> Path:
"""Find the deepest directory level where we can create a folder symlink.
Returns the target path where the symlink should be created.
For single files, this should just return the file path (no folding).
Folding only makes sense when linking entire directories.
"""
# For now, disable automatic folding at the plan_link level
# Folding should be done at a higher level when we know we're
# linking an entire directory tree from a package
return target
def _plan_unfold(self, folded_dir: Path) -> List[LinkOperation]:
"""Plan operations to unfold a directory symlink.
When unfolding:
1. Remove the directory symlink
2. Create a real directory
3. Create individual file symlinks for all files
"""
operations = []
# Get the source of the folded directory
source_dir = self.tree.links.get(folded_dir)
if not source_dir:
return operations
package = self.tree.packages.get(folded_dir, "")
# Remove the directory symlink
operations.append(
LinkOperation(
type="remove",
source=source_dir,
target=folded_dir,
package=package,
is_directory_link=True,
)
)
# Create real directory
operations.append(
LinkOperation(
type="create_dir",
source=source_dir,
target=folded_dir,
package=package,
)
)
# Create individual file symlinks for all files in source
if source_dir.exists() and source_dir.is_dir():
for root, _dirs, files in os.walk(source_dir):
for fname in files:
src_file = Path(root) / fname
rel = src_file.relative_to(source_dir)
dst_file = folded_dir / rel
operations.append(
LinkOperation(
type="create_symlink",
source=src_file,
target=dst_file,
package=package,
is_directory_link=False,
)
)
return operations
def plan_unlink(self, target: Path, package: str) -> List[LinkOperation]:
"""Plan operations to remove a link (may include refolding)."""
operations = []
# Check if this is a directory link
if self.tree.is_directory_link(target):
# Remove all file links under this directory
to_remove = []
for link_target in self.tree.links.keys():
try:
link_target.relative_to(target)
to_remove.append(link_target)
except ValueError:
continue
for link_target in to_remove:
operations.append(
LinkOperation(
type="remove",
source=self.tree.links[link_target],
target=link_target,
package=self.tree.packages[link_target],
is_directory_link=False,
)
)
# Remove the link itself
if target in self.tree.links:
operations.append(
LinkOperation(
type="remove",
source=self.tree.links[target],
target=target,
package=package,
is_directory_link=self.tree.is_directory_link(target),
)
)
return operations
def detect_conflicts(self, operations: List[LinkOperation]) -> List[str]:
"""Detect conflicts before executing operations.
Returns a list of conflict error messages.
"""
conflicts = []
for op in operations:
if op.type == "create_symlink":
# Check if target already exists in tree (managed by flow)
if op.target in self.tree.links:
existing_pkg = self.tree.packages[op.target]
if existing_pkg != op.package:
conflicts.append(
f"Conflict: {op.target} is already linked by package '{existing_pkg}'"
)
# Check if target exists on disk but not managed by flow
elif op.target.exists() or op.target.is_symlink():
conflicts.append(
f"Conflict: {op.target} already exists and is not managed by flow"
)
# Check if target's parent is a file (can't create file in file)
if op.target.parent.exists() and op.target.parent.is_file():
conflicts.append(
f"Conflict: {op.target.parent} is a file, cannot create {op.target}"
)
return conflicts
def execute_operations(
self, operations: List[LinkOperation], dry_run: bool = False
) -> None:
"""Execute a list of operations atomically.
If dry_run is True, only print what would be done.
"""
if dry_run:
for op in operations:
print(str(op))
return
# Execute operations
for op in operations:
if op.type == "create_symlink":
# Create parent directories
op.target.parent.mkdir(parents=True, exist_ok=True)
if op.target.is_symlink():
current = op.target.resolve(strict=False)
desired = op.source.resolve(strict=False)
if current == desired:
self.tree.add_link(op.target, op.source, op.package, op.is_directory_link)
continue
op.target.unlink()
elif op.target.exists():
if op.target.is_file():
op.target.unlink()
else:
raise RuntimeError(f"Cannot overwrite directory: {op.target}")
# Create symlink
op.target.symlink_to(op.source)
# Update tree
self.tree.add_link(op.target, op.source, op.package, op.is_directory_link)
elif op.type == "create_dir":
op.target.mkdir(parents=True, exist_ok=True)
elif op.type == "remove":
if op.target.exists() or op.target.is_symlink():
op.target.unlink()
self.tree.remove_link(op.target)
elif op.type == "remove_dir":
if op.target.exists() and op.target.is_dir():
op.target.rmdir()
def to_state(self) -> dict:
"""Convert current tree to state format for persistence."""
return self.tree.to_state()

View File

@@ -1,327 +0,0 @@
"""Runtime primitives for process, git, state, and filesystem access."""
from __future__ import annotations
import json
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence
from flow.core.console import ConsoleLogger
from flow.core.errors import FlowError
def _as_argv(argv: Sequence[str] | Iterable[str]) -> list[str]:
return [str(part) for part in argv]
class CommandRunner:
"""Small wrapper around subprocess with consistent defaults."""
def format_command(self, argv: Sequence[str] | Iterable[str]) -> str:
return " ".join(shlex.quote(part) for part in _as_argv(argv))
def require_binary(self, name: str) -> str:
path = shutil.which(name)
if path is None:
raise FlowError(f"Required executable not found: {name}")
return path
def run(
self,
argv: Sequence[str] | Iterable[str],
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[int | float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
_as_argv(argv),
cwd=str(cwd) if cwd is not None else None,
env=dict(env) if env is not None else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip()
if not message:
message = f"Command failed with exit code {completed.returncode}"
raise FlowError(message)
return completed
def run_shell(
self,
command: str,
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[int | float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
command,
shell=True,
cwd=str(cwd) if cwd is not None else None,
env=dict(env) if env is not None else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip()
if not message:
message = f"Command failed with exit code {completed.returncode}"
raise FlowError(message)
return completed
def stream_shell(
self,
command: str,
console: ConsoleLogger,
*,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
console.step_command(command)
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
output_lines: list[str] = []
assert process.stdout is not None
try:
for line in process.stdout:
line = line.rstrip()
if not line:
continue
console.step_output(line)
output_lines.append(line)
finally:
process.stdout.close()
process.wait()
if check and process.returncode != 0:
raise FlowError(
f"Command failed (exit {process.returncode}): {command}"
)
return subprocess.CompletedProcess(
command,
process.returncode,
stdout="\n".join(output_lines),
stderr="",
)
class GitClient:
"""Thin git adapter that always scopes commands to a repository root."""
def __init__(self, runner: CommandRunner):
self.runner = runner
def run(
self,
repo_dir: Path,
*args: str,
capture_output: bool = True,
check: bool = False,
) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", str(repo_dir), *args],
capture_output=capture_output,
check=check,
)
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
mode: Optional[int] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
argv = ["sudo", "mkdir", "-p"]
if mode is not None:
argv.extend(["-m", f"{mode:o}"])
argv.append(str(path))
runner.run(argv, check=True)
return
path.mkdir(parents=True, exist_ok=True)
if mode is not None:
path.chmod(mode)
def remove_file(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
missing_ok: bool = True,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
argv = ["sudo", "rm"]
if missing_ok:
argv.append("-f")
argv.append(str(path))
runner.run(argv, check=True)
return
try:
path.unlink()
except FileNotFoundError:
if not missing_ok:
raise
def remove_tree(self, path: Path) -> None:
shutil.rmtree(path, ignore_errors=True)
def copy_file(
self,
source: Path,
target: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
shutil.copy2(source, target)
def copy_tree(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copytree(source, target, dirs_exist_ok=True)
def create_symlink(
self,
source: Path,
target: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
target.symlink_to(source)
def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
if default is None:
raise
return default
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)
path.write_text(content, encoding="utf-8")
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def read_json(self, path: Path, *, default: Any = None) -> Any:
try:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
except FileNotFoundError:
return default
def write_json(self, path: Path, data: Any) -> None:
self.ensure_dir(path.parent)
with open(path, "w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
def same_symlink(self, target: Path, source: Path) -> bool:
if not target.is_symlink():
return False
return target.resolve(strict=False) == source.resolve(strict=False)
def is_within(self, path: Path, parent: Path) -> bool:
try:
path.resolve(strict=False).relative_to(parent.resolve(strict=False))
return True
except ValueError:
return False
def path_in_home(self, path: Path, home: Optional[Path] = None) -> bool:
root = (home or Path.home()).resolve(strict=False)
try:
path.resolve(strict=False).relative_to(root)
return True
except ValueError:
return False
@dataclass
class JsonStateStore:
"""JSON file-backed state store."""
path: Path
fs: FileSystem
default_factory: Any
def load(self) -> Any:
data = self.fs.read_json(self.path, default=None)
if data is None:
return self.default_factory()
return data
def save(self, data: Any) -> None:
self.fs.write_json(self.path, data)
@dataclass
class SystemRuntime:
"""Shared runtime dependencies carried through the command context."""
runner: CommandRunner = field(default_factory=CommandRunner)
fs: FileSystem = field(default_factory=FileSystem)
git: GitClient = field(init=False)
def __post_init__(self) -> None:
self.git = GitClient(self.runner)

View File

@@ -1,61 +0,0 @@
"""Variable substitution for shell-style and template expressions."""
import os
import re
from pathlib import Path
from typing import Any, Dict
def substitute(text: str, variables: Dict[str, str]) -> str:
"""Replace $VAR and ${VAR} with values from variables dict or env."""
if not isinstance(text, str):
return text
pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}")
def _replace(match: re.Match[str]) -> str:
key = match.group(1) or match.group(2) or ""
if key in variables:
return str(variables[key])
if key == "HOME":
return str(Path.home())
if key in os.environ:
return os.environ[key]
return match.group(0)
return pattern.sub(_replace, text)
def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
if expr.startswith("env."):
env_key = expr.split(".", 1)[1]
env_ctx = context.get("env", {})
if isinstance(env_ctx, dict) and env_key in env_ctx:
return env_ctx[env_key]
return os.environ.get(env_key)
if expr in context:
return context[expr]
current: Any = context
for part in expr.split("."):
if not isinstance(current, dict) or part not in current:
return None
current = current[part]
return current
def substitute_template(text: str, context: Dict[str, Any]) -> str:
"""Replace {{expr}} placeholders with values from context dict."""
if not isinstance(text, str):
return text
def _replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
value = _resolve_template_value(key, context)
if value is None:
return match.group(0)
return str(value)
return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text)

View File

@@ -1,350 +0,0 @@
"""Shared package-manifest normalization and binary install helpers."""
from __future__ import annotations
import os
import shutil
import tempfile
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.core.variables import substitute_template
PACKAGE_TYPES = {"pkg", "binary", "cask"}
def linux_detect_package_manager() -> Optional[str]:
if shutil.which("apt") or shutil.which("apt-get"):
return "apt"
if shutil.which("dnf"):
return "dnf"
return None
def resolve_package_manager(ctx: FlowContext, profile_cfg: dict) -> str:
explicit = profile_cfg.get("package-manager")
if isinstance(explicit, str) and explicit:
return explicit
profile_os = profile_cfg.get("os")
if profile_os == "macos":
return "brew"
if profile_os == "linux":
detected = linux_detect_package_manager()
if detected:
return detected
raise FlowError("Unable to auto-detect package manager (expected apt or dnf)")
raise FlowError("Profile 'os' must be set to 'linux' or 'macos'")
def get_package_catalog(ctx: FlowContext) -> Dict[str, Dict[str, Any]]:
raw = ctx.manifest.get("packages", [])
catalog: Dict[str, Dict[str, Any]] = {}
if isinstance(raw, dict):
for name, definition in raw.items():
if not isinstance(definition, dict):
continue
package = dict(definition)
package["name"] = str(package.get("name") or name)
package.setdefault("type", "pkg")
catalog[package["name"]] = package
return catalog
if not isinstance(raw, list):
return catalog
for item in raw:
if not isinstance(item, dict):
continue
name = item.get("name")
if not isinstance(name, str) or not name:
continue
package = dict(item)
package.setdefault("type", "pkg")
catalog[name] = package
return catalog
def normalize_profile_package_entry(entry: Any) -> Dict[str, Any]:
if isinstance(entry, str):
if "/" in entry:
prefix, name = entry.split("/", 1)
if prefix in PACKAGE_TYPES and name:
return {"name": name, "type": prefix}
return {"name": entry}
if isinstance(entry, dict):
name = entry.get("name")
if not isinstance(name, str) or not name:
raise FlowError("Package object entries must include a non-empty 'name'")
return dict(entry)
raise FlowError(f"Unsupported package entry: {entry!r}")
def resolve_package_spec(
catalog: Dict[str, Dict[str, Any]],
profile_entry: Dict[str, Any],
) -> Dict[str, Any]:
name = profile_entry["name"]
merged = dict(catalog.get(name, {}))
merged.update(profile_entry)
merged["name"] = name
pkg_type = merged.get("type") or "pkg"
if pkg_type not in PACKAGE_TYPES:
raise FlowError(f"Unsupported package type '{pkg_type}' for package '{name}'")
merged["type"] = pkg_type
return merged
def resolve_pkg_source_name(spec: Dict[str, Any], package_manager: str) -> str:
sources = spec.get("sources", {})
if not isinstance(sources, dict):
return spec["name"]
keys = [package_manager]
if package_manager == "apt":
keys.append("apt-get")
if package_manager == "apt-get":
keys.append("apt")
for key in keys:
value = sources.get(key)
if isinstance(value, str) and value:
return value
return spec["name"]
def platform_lookup_keys(ctx: FlowContext) -> List[str]:
keys = [ctx.platform.platform]
if ctx.platform.os == "macos":
keys.append(f"darwin-{ctx.platform.arch}")
if ctx.platform.arch == "x64":
keys.append(f"{ctx.platform.os}-amd64")
if ctx.platform.os == "macos":
keys.append("darwin-amd64")
ordered: list[str] = []
for key in keys:
if key not in ordered:
ordered.append(key)
return ordered
def profile_template_context(
ctx: FlowContext,
extra_env: Dict[str, str],
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
env_map = dict(os.environ)
env_map.update(extra_env)
template_ctx: Dict[str, Any] = {
"env": env_map,
"os": ctx.platform.os,
"arch": ctx.platform.arch,
}
if extra:
template_ctx.update(extra)
return template_ctx
def render_template_value(value: Any, template_ctx: Dict[str, Any]) -> Any:
if isinstance(value, str):
return substitute_template(value, template_ctx)
if isinstance(value, list):
return [render_template_value(item, template_ctx) for item in value]
if isinstance(value, dict):
return {key: render_template_value(item, template_ctx) for key, item in value.items()}
return value
def resolve_binary_platform_vars(ctx: FlowContext, spec: Dict[str, Any]) -> Dict[str, str]:
platform_vars = {
"os": ctx.platform.os,
"arch": ctx.platform.arch,
}
platform_map = spec.get("platform-map", {})
if isinstance(platform_map, dict):
for key in platform_lookup_keys(ctx):
mapping = platform_map.get(key)
if isinstance(mapping, dict):
for map_key, map_value in mapping.items():
if isinstance(map_value, str):
platform_vars[map_key] = map_value
break
return platform_vars
def resolve_binary_asset(ctx: FlowContext, spec: Dict[str, Any], template_ctx: Dict[str, Any]) -> str:
assets = spec.get("assets", {})
if isinstance(assets, dict) and assets:
for key in platform_lookup_keys(ctx):
value = assets.get(key)
if isinstance(value, str) and value:
return substitute_template(value, template_ctx)
raise FlowError(
f"No binary asset mapping for platform {ctx.platform.platform} in package '{spec['name']}'"
)
pattern = spec.get("asset-pattern")
if not isinstance(pattern, str) or not pattern:
raise FlowError(
f"Binary package '{spec['name']}' must define either 'assets' or 'asset-pattern'"
)
return substitute_template(pattern, template_ctx)
def resolve_binary_download_url(
spec: Dict[str, Any],
asset_name: str,
template_ctx: Dict[str, Any],
) -> str:
source = spec.get("source")
if not isinstance(source, str) or not source:
raise FlowError(f"Binary package '{spec['name']}' is missing 'source'")
version = str(spec.get("version", ""))
if source.startswith("github:"):
owner_repo = source[len("github:") :]
if not owner_repo:
raise FlowError(f"Invalid github source in package '{spec['name']}'")
if not version:
raise FlowError(f"Binary package '{spec['name']}' requires 'version'")
return f"https://github.com/{owner_repo}/releases/download/v{version}/{asset_name}"
rendered_source = substitute_template(source, template_ctx)
if not asset_name or rendered_source.endswith(asset_name):
return rendered_source
if rendered_source.endswith("/"):
return rendered_source + asset_name
return f"{rendered_source}/{asset_name}"
def strip_prefix(path: Path, prefix: Path) -> Path:
try:
return path.relative_to(prefix)
except ValueError:
return path
def validate_declared_install_path(package_name: str, declared_path: Path) -> None:
if declared_path.is_absolute():
raise FlowError(f"Install path for '{package_name}' must be relative: {declared_path}")
if any(part == ".." for part in declared_path.parts):
raise FlowError(
f"Install path for '{package_name}' must not include parent traversal: {declared_path}"
)
def install_destination(kind: str) -> Path:
home = Path.home()
if kind == "bin":
return home / ".local" / "bin"
if kind == "share":
return home / ".local" / "share"
if kind == "man":
return home / ".local" / "share" / "man"
if kind == "lib":
return home / ".local" / "lib"
raise FlowError(f"Unsupported install section: {kind}")
def install_strip_prefix(kind: str) -> Path:
if kind == "bin":
return Path("bin")
if kind == "share":
return Path("share")
if kind == "man":
return Path("share") / "man"
if kind == "lib":
return Path("lib")
return Path(".")
class BinaryInstaller:
def __init__(self, ctx: FlowContext):
self.ctx = ctx
self.fs = ctx.runtime.fs
def copy_install_item(self, kind: str, src: Path, declared_path: Path) -> None:
destination_root = install_destination(kind)
stripped = strip_prefix(declared_path, install_strip_prefix(kind))
destination = destination_root / stripped
if src.is_dir():
self.fs.copy_tree(src, destination)
else:
self.fs.copy_file(src, destination)
if kind == "bin":
destination.chmod(destination.stat().st_mode | 0o111)
def install(self, spec: Dict[str, Any], extra_env: Dict[str, str], *, dry_run: bool) -> None:
version = str(spec.get("version", ""))
platform_vars = resolve_binary_platform_vars(self.ctx, spec)
template_ctx = profile_template_context(
self.ctx,
extra_env,
{"name": spec["name"], "version": version, **platform_vars},
)
asset_name = resolve_binary_asset(self.ctx, spec, template_ctx)
template_ctx["asset"] = asset_name
download_url = resolve_binary_download_url(spec, asset_name, template_ctx)
template_ctx["downloadUrl"] = download_url
if dry_run:
self.ctx.console.info(f"[{spec['name']}] Would download: {download_url}")
return
install_map = spec.get("install", {})
if not isinstance(install_map, dict) or not install_map:
raise FlowError(f"Binary package '{spec['name']}' must define non-empty 'install'")
with tempfile.TemporaryDirectory(prefix=f"flow-{spec['name']}-") as tmp:
tmp_dir = Path(tmp)
archive_path = tmp_dir / asset_name
extracted = tmp_dir / "extract"
self.ctx.console.info(f"Downloading {spec['name']} from {download_url}")
with urllib.request.urlopen(download_url, timeout=60) as response:
self.fs.write_bytes(archive_path, response.read())
self.fs.ensure_dir(extracted)
try:
shutil.unpack_archive(str(archive_path), str(extracted))
except (shutil.ReadError, ValueError) as exc:
raise FlowError(f"Could not extract archive for '{spec['name']}': {exc}") from exc
extract_dir_value = substitute_template(str(spec.get("extract-dir", ".")), template_ctx)
source_root = extracted if extract_dir_value == "." else extracted / extract_dir_value
if not source_root.exists():
raise FlowError(
f"extract-dir '{extract_dir_value}' not found for package '{spec['name']}'"
)
source_root_resolved = source_root.resolve(strict=False)
for kind in ("bin", "share", "man", "lib"):
items = install_map.get(kind, [])
if not isinstance(items, list):
continue
for raw_item in items:
if not isinstance(raw_item, str):
continue
rendered = substitute_template(raw_item, template_ctx)
declared_path = Path(rendered)
validate_declared_install_path(spec["name"], declared_path)
source = (source_root / declared_path).resolve(strict=False)
if not str(source).startswith(str(source_root_resolved)):
raise FlowError(
f"Install path escapes extract-dir for '{spec['name']}': {declared_path}"
)
if not source.exists():
raise FlowError(
f"Install path not found for '{spec['name']}': {declared_path}"
)
self.copy_install_item(kind, source, declared_path)

View File

@@ -1,184 +0,0 @@
"""SSH target parsing and connection behavior for `flow enter`."""
from __future__ import annotations
import getpass
import os
from typing import Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
# Default host templates per platform
HOST_TEMPLATES = {
"orb": "<namespace>.orb",
"utm": "<namespace>.utm.local",
"core": "<namespace>.core.lan",
}
def parse_target(target: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""Parse [user@]namespace@platform into (user, namespace, platform)."""
user = None
namespace = None
platform = None
if "@" in target:
platform = target.rsplit("@", 1)[1]
rest = target.rsplit("@", 1)[0]
else:
rest = target
if "@" in rest:
user = rest.rsplit("@", 1)[0]
namespace = rest.rsplit("@", 1)[1]
else:
namespace = rest
return user, namespace, platform
def build_destination(user: str, host: str, preserve_host_user: bool = False) -> str:
if "@" in host:
host_user, host_name = host.rsplit("@", 1)
effective_user = host_user if preserve_host_user else (user or host_user)
return f"{effective_user}@{host_name}"
if not user:
return host
return f"{user}@{host}"
def terminfo_fix_command(term: Optional[str], destination: str) -> Optional[str]:
normalized_term = (term or "").strip().lower()
if normalized_term == "xterm-ghostty":
return f"infocmp -x xterm-ghostty | ssh {destination} -- tic -x -"
if normalized_term == "wezterm":
return (
f"ssh {destination} -- sh -lc "
"'tempfile=$(mktemp) && curl -fsSL -o \"$tempfile\" "
"https://raw.githubusercontent.com/wezterm/wezterm/main/termwiz/data/wezterm.terminfo "
"&& tic -x -o ~/.terminfo \"$tempfile\" && rm \"$tempfile\"'"
)
return None
def handle_terminfo_warning(
ctx: FlowContext,
term: Optional[str],
destination: str,
dry_run: bool,
) -> bool:
install_cmd = terminfo_fix_command(term, destination)
if not install_cmd:
return True
ctx.console.warn(
f"Detected TERM={term}. Remote host may be missing this terminfo entry."
)
ctx.console.info("flow will not install or modify terminfo on the target automatically.")
ctx.console.info("If needed, run this command manually before reconnecting:")
print(f" {install_cmd}")
if dry_run or not os.isatty(0):
return True
response = ""
try:
response = input("Continue with SSH connection? [Y/n] ").strip().lower()
except EOFError:
return True
if response in {"n", "no"}:
ctx.console.warn("Cancelled before opening SSH session")
return False
return True
class EnterService:
"""Resolve enter targets and execute the SSH handoff."""
def __init__(self, ctx: FlowContext):
self.ctx = ctx
def run(self, args) -> None:
if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
ns = os.environ["DF_NAMESPACE"]
plat = os.environ["DF_PLATFORM"]
raise FlowError(
f"Not recommended inside an instance. Currently in: {ns}@{plat}"
)
user, namespace, platform = parse_target(args.target)
if args.user:
user = args.user
if args.namespace:
namespace = args.namespace
if args.platform:
platform = args.platform
user_was_explicit = bool(user)
if not user:
user = os.environ.get("USER") or getpass.getuser()
if not namespace:
raise FlowError("Namespace is required in target")
if not platform:
raise FlowError("Platform is required in target")
host_template = HOST_TEMPLATES.get(platform)
ssh_identity = None
for target in self.ctx.config.targets:
if target.namespace == namespace and target.platform == platform:
host_template = target.ssh_host
ssh_identity = target.ssh_identity
break
if not host_template:
raise FlowError(f"Unknown platform: {platform}")
ssh_host = host_template.replace("<namespace>", namespace)
destination = build_destination(
user,
ssh_host,
preserve_host_user=not user_was_explicit,
)
if not handle_terminfo_warning(
self.ctx,
os.environ.get("TERM"),
destination,
dry_run=args.dry_run,
):
raise FlowError("Cancelled before opening SSH session")
ssh_cmd = ["ssh", "-tt"]
if ssh_identity:
ssh_cmd.extend(["-i", os.path.expanduser(ssh_identity)])
ssh_cmd.append(destination)
if not args.no_tmux:
ssh_cmd.extend(
[
"tmux",
"new-session",
"-As",
args.session,
"-e",
f"DF_NAMESPACE={namespace}",
"-e",
f"DF_PLATFORM={platform}",
]
)
if args.dry_run:
self.ctx.console.info("Dry run command:")
print(" " + " ".join(ssh_cmd))
return
os.execvp("ssh", ssh_cmd)

View File

@@ -1,115 +0,0 @@
"""Tests for flow.core.action."""
from flow.core.action import Action, ActionExecutor
from flow.core.console import ConsoleLogger
def test_action_defaults():
a = Action(type="test", description="Test action")
assert a.status == "pending"
assert a.error is None
assert a.skip_on_error is True
assert a.os_filter is None
assert a.data == {}
def test_executor_register_and_execute(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
results = []
def handler(data):
results.append(data["key"])
executor.register("test-action", handler)
actions = [
Action(type="test-action", description="Do thing", data={"key": "value1"}),
Action(type="test-action", description="Do another", data={"key": "value2"}),
]
executor.execute(actions, current_os="linux")
assert results == ["value1", "value2"]
assert actions[0].status == "completed"
assert actions[1].status == "completed"
def test_executor_dry_run(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
executed = []
executor.register("test", lambda data: executed.append(1))
actions = [Action(type="test", description="Should not run")]
executor.execute(actions, dry_run=True)
assert executed == [] # Nothing executed
out = capsys.readouterr().out
assert "EXECUTION PLAN" in out
def test_executor_skip_on_error(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
def failing_handler(data):
raise RuntimeError("boom")
executor.register("fail", failing_handler)
actions = [
Action(type="fail", description="Will fail", skip_on_error=True),
Action(type="fail", description="Should still run", skip_on_error=True),
]
executor.execute(actions, current_os="linux")
assert actions[0].status == "skipped"
assert actions[1].status == "skipped"
def test_executor_critical_failure_stops(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
def failing_handler(data):
raise RuntimeError("critical failure")
executor.register("fail", failing_handler)
executor.register("ok", lambda data: None)
actions = [
Action(type="fail", description="Critical", skip_on_error=False),
Action(type="ok", description="Should not run"),
]
executor.execute(actions, current_os="linux")
assert actions[0].status == "failed"
assert actions[1].status == "pending" # Never reached
def test_executor_os_filter(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
executed = []
executor.register("test", lambda data: executed.append(data.get("name")))
actions = [
Action(type="test", description="Linux only", data={"name": "linux"}, os_filter="linux"),
Action(type="test", description="macOS only", data={"name": "macos"}, os_filter="macos"),
Action(type="test", description="Any OS", data={"name": "any"}),
]
executor.execute(actions, current_os="linux")
assert "linux" in executed
assert "any" in executed
assert "macos" not in executed
def test_executor_no_handler(capsys):
console = ConsoleLogger()
executor = ActionExecutor(console)
actions = [Action(type="unknown", description="No handler registered")]
executor.execute(actions, current_os="linux")
assert actions[0].status == "skipped"

View File

@@ -1,215 +0,0 @@
"""Tests for flow.commands.bootstrap helpers and schema behavior."""
import os
from pathlib import Path
import pytest
from flow.commands.bootstrap import (
_ensure_required_variables,
_get_profiles,
_install_binary_package,
_normalize_profile_package_entry,
_resolve_package_manager,
_resolve_package_spec,
_resolve_pkg_source_name,
)
from flow.core.config import AppConfig, FlowContext
from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo
@pytest.fixture
def ctx():
return FlowContext(
config=AppConfig(),
manifest={
"packages": [
{
"name": "fd",
"type": "pkg",
"sources": {"apt": "fd-find", "dnf": "fd-find", "brew": "fd"},
},
{
"name": "neovim",
"type": "binary",
"source": "github:neovim/neovim",
"version": "0.10.4",
"asset-pattern": "nvim-{{os}}-{{arch}}.tar.gz",
"platform-map": {"linux-x64": {"os": "linux", "arch": "x64"}},
"install": {"bin": ["bin/nvim"]},
},
]
},
platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"),
console=ConsoleLogger(),
)
def test_get_profiles_from_manifest(ctx):
ctx.manifest = {"profiles": {"linux": {"os": "linux"}}}
assert "linux" in _get_profiles(ctx)
def test_get_profiles_rejects_environments(ctx):
ctx.manifest = {"environments": {"legacy": {"os": "linux"}}}
with pytest.raises(RuntimeError, match="no longer supported"):
_get_profiles(ctx)
def test_resolve_package_manager_explicit_value(ctx):
assert _resolve_package_manager(ctx, {"os": "linux", "package-manager": "dnf"}) == "dnf"
def test_resolve_package_manager_linux_auto_apt(monkeypatch, ctx):
monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/apt" if name == "apt" else None)
assert _resolve_package_manager(ctx, {"os": "linux"}) == "apt"
def test_resolve_package_manager_linux_auto_dnf(monkeypatch, ctx):
monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/dnf" if name == "dnf" else None)
assert _resolve_package_manager(ctx, {"os": "linux"}) == "dnf"
def test_resolve_package_manager_requires_os(ctx):
with pytest.raises(RuntimeError, match="must be set"):
_resolve_package_manager(ctx, {})
def test_normalize_package_entry_string():
assert _normalize_profile_package_entry("git") == {"name": "git"}
def test_normalize_package_entry_type_prefix():
assert _normalize_profile_package_entry("cask/wezterm") == {"name": "wezterm", "type": "cask"}
def test_normalize_package_entry_object():
out = _normalize_profile_package_entry({"name": "docker", "allow_sudo": True})
assert out["name"] == "docker"
assert out["allow_sudo"] is True
def test_resolve_package_spec_uses_catalog_type(ctx):
catalog = {
"fd": {
"name": "fd",
"type": "pkg",
"sources": {"apt": "fd-find"},
}
}
resolved = _resolve_package_spec(catalog, {"name": "fd"})
assert resolved["type"] == "pkg"
assert resolved["sources"]["apt"] == "fd-find"
def test_resolve_package_spec_defaults_to_pkg(ctx):
resolved = _resolve_package_spec({}, {"name": "git"})
assert resolved["type"] == "pkg"
def test_resolve_package_spec_profile_override(ctx):
catalog = {
"neovim": {
"name": "neovim",
"type": "binary",
"version": "0.10.4",
}
}
resolved = _resolve_package_spec(catalog, {"name": "neovim", "post-install": "echo ok"})
assert resolved["type"] == "binary"
assert resolved["post-install"] == "echo ok"
def test_resolve_pkg_source_name_with_mapping(ctx):
spec = {"name": "fd", "sources": {"apt": "fd-find", "dnf": "fd-find", "brew": "fd"}}
assert _resolve_pkg_source_name(spec, "apt") == "fd-find"
assert _resolve_pkg_source_name(spec, "dnf") == "fd-find"
assert _resolve_pkg_source_name(spec, "brew") == "fd"
def test_resolve_pkg_source_name_fallback_to_name(ctx):
spec = {"name": "ripgrep", "sources": {"apt": "ripgrep"}}
assert _resolve_pkg_source_name(spec, "dnf") == "ripgrep"
def test_ensure_required_variables_missing_raises():
with pytest.raises(RuntimeError, match="Missing required environment variables"):
_ensure_required_variables({"requires": ["USER_EMAIL", "TARGET_HOSTNAME"]}, {"USER_EMAIL": "a@b"})
def test_ensure_required_variables_accepts_vars(monkeypatch):
env = dict(os.environ)
env["USER_EMAIL"] = "a@b"
env["TARGET_HOSTNAME"] = "devbox"
_ensure_required_variables({"requires": ["USER_EMAIL", "TARGET_HOSTNAME"]}, env)
class _FakeResponse:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def read(self):
return b"archive"
def _patch_binary_download(monkeypatch, after_unpack=None):
monkeypatch.setattr(
"flow.services.bootstrap.urllib.request.urlopen",
lambda *args, **kwargs: _FakeResponse(),
)
def _fake_unpack(_archive, extract_dir):
extracted = Path(extract_dir)
extracted.mkdir(parents=True, exist_ok=True)
if after_unpack:
after_unpack(extracted)
monkeypatch.setattr("flow.services.bootstrap.shutil.unpack_archive", _fake_unpack)
def test_install_binary_package_rejects_absolute_declared_path(monkeypatch, tmp_path, ctx):
absolute_item = tmp_path / "outside-bin"
absolute_item.write_text("binary")
_patch_binary_download(monkeypatch)
monkeypatch.setattr(
"flow.services.bootstrap._copy_install_item",
lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"),
)
spec = {
"name": "demo",
"type": "binary",
"source": "https://example.invalid/demo",
"asset-pattern": "demo.tar.gz",
"install": {"bin": [str(absolute_item)]},
}
with pytest.raises(RuntimeError, match="must be relative"):
_install_binary_package(ctx, spec, {}, dry_run=False)
def test_install_binary_package_rejects_parent_traversal_declared_path(monkeypatch, ctx):
def _after_unpack(extracted):
(extracted.parent / "escape-bin").write_text("binary")
_patch_binary_download(monkeypatch, after_unpack=_after_unpack)
monkeypatch.setattr(
"flow.services.bootstrap._copy_install_item",
lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"),
)
spec = {
"name": "demo",
"type": "binary",
"source": "https://example.invalid/demo",
"asset-pattern": "demo.tar.gz",
"install": {"bin": ["../escape-bin"]},
}
with pytest.raises(RuntimeError, match="parent traversal"):
_install_binary_package(ctx, spec, {}, dry_run=False)

View File

@@ -1,74 +0,0 @@
"""Tests for command modules — registration and target parsing."""
from flow.commands.enter import _parse_target, _terminfo_fix_command
from flow.commands.container import _cname, _parse_image_ref
class TestParseTarget:
def test_full_target(self):
user, ns, plat = _parse_target("root@personal@orb")
assert user == "root"
assert ns == "personal"
assert plat == "orb"
def test_no_user(self):
user, ns, plat = _parse_target("personal@orb")
assert user is None
assert ns == "personal"
assert plat == "orb"
def test_namespace_only(self):
user, ns, plat = _parse_target("personal")
assert user is None
assert ns == "personal"
assert plat is None
class TestTerminfoFixCommand:
def test_ghostty_command(self):
cmd = _terminfo_fix_command("xterm-ghostty", "devbox.core.lan")
assert cmd == "infocmp -x xterm-ghostty | ssh devbox.core.lan -- tic -x -"
def test_wezterm_command(self):
cmd = _terminfo_fix_command("wezterm", "user@devbox.core.lan")
assert cmd is not None
assert "wezterm.terminfo" in cmd
assert "ssh user@devbox.core.lan" in cmd
def test_unknown_term(self):
assert _terminfo_fix_command("xterm-256color", "devbox.core.lan") is None
class TestCname:
def test_adds_prefix(self):
assert _cname("api") == "dev-api"
def test_no_double_prefix(self):
assert _cname("dev-api") == "dev-api"
class TestParseImageRef:
def test_simple_image(self):
ref, repo, tag, label = _parse_image_ref("node")
assert ref == "registry.tomastm.com/node:latest"
assert tag == "latest"
def test_tm0_shorthand(self):
ref, repo, tag, label = _parse_image_ref("tm0/node")
assert "registry.tomastm.com" in ref
assert "node" in ref
def test_docker_shorthand(self):
ref, repo, tag, label = _parse_image_ref("docker/python")
assert "docker.io" in ref
assert "python" in ref
def test_with_tag(self):
ref, repo, tag, label = _parse_image_ref("node:20")
assert tag == "20"
assert ":20" in ref
def test_full_registry(self):
ref, repo, tag, label = _parse_image_ref("ghcr.io/user/image:v1")
assert ref == "ghcr.io/user/image:v1"
assert tag == "v1"

View File

@@ -1,77 +0,0 @@
"""Tests for flow.core.config."""
import pytest
from flow.core.config import AppConfig, load_config, load_manifest
def test_load_config_missing_path(tmp_path):
cfg = load_config(tmp_path / "nonexistent")
assert isinstance(cfg, AppConfig)
assert cfg.dotfiles_url == ""
assert cfg.container_registry == "registry.tomastm.com"
assert cfg.dotfiles_pull_before_edit is True
def test_load_config_merged_yaml(tmp_path):
(tmp_path / "10-config.yaml").write_text(
"repository:\n"
" dotfiles-url: git@github.com:user/dots.git\n"
" dotfiles-branch: dev\n"
" pull-before-edit: false\n"
"paths:\n"
" projects-dir: ~/code\n"
"defaults:\n"
" container-registry: my.registry.com\n"
" container-tag: v1\n"
" tmux-session: main\n"
"targets:\n"
" personal: orb personal@orb\n"
" work@ec2: work.ec2.internal ~/.ssh/id_work\n"
)
cfg = load_config(tmp_path)
assert cfg.dotfiles_url == "git@github.com:user/dots.git"
assert cfg.dotfiles_branch == "dev"
assert cfg.dotfiles_pull_before_edit is False
assert cfg.projects_dir == "~/code"
assert cfg.container_registry == "my.registry.com"
assert cfg.container_tag == "v1"
assert cfg.tmux_session == "main"
assert len(cfg.targets) == 2
assert cfg.targets[0].namespace == "personal"
assert cfg.targets[1].ssh_identity == "~/.ssh/id_work"
def test_load_config_pull_before_edit_string_true(tmp_path):
(tmp_path / "10-config.yaml").write_text(
"repository:\n"
" pull-before-edit: yes\n"
)
cfg = load_config(tmp_path)
assert cfg.dotfiles_pull_before_edit is True
def test_load_manifest_missing_path(tmp_path):
result = load_manifest(tmp_path / "nonexistent")
assert result == {}
def test_load_manifest_valid_directory(tmp_path):
(tmp_path / "manifest.yaml").write_text(
"profiles:\n"
" linux-vm:\n"
" os: linux\n"
" hostname: devbox\n"
)
result = load_manifest(tmp_path)
assert result["profiles"]["linux-vm"]["os"] == "linux"
def test_load_manifest_non_dict_raises(tmp_path):
bad = tmp_path / "bad.yaml"
bad.write_text("- a\n- b\n")
with pytest.raises(RuntimeError, match="must contain a mapping"):
load_manifest(bad)

View File

@@ -1,95 +0,0 @@
"""Tests for flow.core.console."""
from flow.core.console import ConsoleLogger
def test_console_info(capsys):
c = ConsoleLogger()
c.info("hello")
out = capsys.readouterr().out
assert "[INFO]" in out
assert "hello" in out
def test_console_warn(capsys):
c = ConsoleLogger()
c.warn("caution")
out = capsys.readouterr().out
assert "[WARN]" in out
assert "caution" in out
def test_console_error(capsys):
c = ConsoleLogger()
c.error("bad thing")
out = capsys.readouterr().out
assert "[ERROR]" in out
assert "bad thing" in out
def test_console_success(capsys):
c = ConsoleLogger()
c.success("done")
out = capsys.readouterr().out
assert "[SUCCESS]" in out
assert "done" in out
def test_console_step_lifecycle(capsys):
c = ConsoleLogger()
c.step_start(1, 3, "Test step")
c.step_command("echo hi")
c.step_output("hi")
c.step_complete("Done")
out = capsys.readouterr().out
assert "Step 1/3" in out
assert "$ echo hi" in out
assert "Done" in out
def test_console_step_skip(capsys):
c = ConsoleLogger()
c.start_time = 0
c.step_skip("not needed")
out = capsys.readouterr().out
assert "Skipped" in out
def test_console_step_fail(capsys):
c = ConsoleLogger()
c.start_time = 0
c.step_fail("exploded")
out = capsys.readouterr().out
assert "Failed" in out
def test_console_table(capsys):
c = ConsoleLogger()
c.table(["NAME", "VALUE"], [["foo", "bar"], ["baz", "qux"]])
out = capsys.readouterr().out
assert "NAME" in out
assert "foo" in out
assert "baz" in out
def test_console_table_empty(capsys):
c = ConsoleLogger()
c.table(["NAME"], [])
out = capsys.readouterr().out
assert out == ""
def test_console_section_header(capsys):
c = ConsoleLogger()
c.section_header("Test", "sub")
out = capsys.readouterr().out
assert "TEST" in out
assert "sub" in out
def test_console_plan_header(capsys):
c = ConsoleLogger()
c.plan_header("My Plan", 5)
out = capsys.readouterr().out
assert "MY PLAN" in out
assert "5 actions" in out

View File

@@ -1,104 +0,0 @@
"""Tests for flow.services.dotfiles discovery and path resolution."""
import pytest
from flow.services.dotfiles import _collect_home_specs, _discover_packages, _resolve_edit_target, _walk_package
from flow.core.config import AppConfig, FlowContext
from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo
def _make_tree(tmp_path):
flow_root = tmp_path
shared = flow_root / "_shared"
(shared / "zsh").mkdir(parents=True)
(shared / "zsh" / ".zshrc").write_text("# zsh")
(shared / "tmux").mkdir(parents=True)
(shared / "tmux" / ".tmux.conf").write_text("# tmux")
profile = flow_root / "work"
(profile / "git").mkdir(parents=True)
(profile / "git" / ".gitconfig").write_text("[user]\nname = Work")
return tmp_path
def _ctx() -> FlowContext:
return FlowContext(
config=AppConfig(),
manifest={"profiles": {"work": {"os": "linux", "configs": {"skip": []}}}},
platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"),
console=ConsoleLogger(),
)
def test_discover_packages_shared_only(tmp_path):
tree = _make_tree(tmp_path)
packages = _discover_packages(tree)
assert "zsh" in packages
assert "tmux" in packages
assert "git" not in packages
def test_discover_packages_with_profile(tmp_path):
tree = _make_tree(tmp_path)
packages = _discover_packages(tree, profile="work")
assert "zsh" in packages
assert "tmux" in packages
assert "git" in packages
def test_discover_packages_profile_overrides_shared(tmp_path):
tree = _make_tree(tmp_path)
profile_zsh = tree / "work" / "zsh"
profile_zsh.mkdir(parents=True)
(profile_zsh / ".zshrc").write_text("# work zsh")
with pytest.raises(RuntimeError, match="Conflicting dotfile targets"):
_collect_home_specs(_ctx(), tree, tmp_path / "home", "work", set(), None)
def test_walk_package_returns_relative_paths(tmp_path):
tree = _make_tree(tmp_path)
source = tree / "_shared" / "zsh"
pairs = list(_walk_package(source))
assert len(pairs) == 1
src, rel = pairs[0]
assert src.name == ".zshrc"
assert str(rel) == ".zshrc"
def test_resolve_edit_target_package(tmp_path):
tree = _make_tree(tmp_path)
target = _resolve_edit_target("zsh", dotfiles_dir=tree)
assert target == tree / "_shared" / "zsh"
def test_resolve_edit_target_repo_path(tmp_path):
tree = _make_tree(tmp_path)
target = _resolve_edit_target("_shared/zsh/.zshrc", dotfiles_dir=tree)
assert target == tree / "_shared" / "zsh" / ".zshrc"
def test_resolve_edit_target_rejects_parent_traversal(tmp_path):
tree = _make_tree(tmp_path / "repo")
outside = tmp_path / "outside.txt"
outside.write_text("secret")
target = _resolve_edit_target("../outside.txt", dotfiles_dir=tree)
assert target is None
def test_resolve_edit_target_rejects_nested_repo_escape(tmp_path):
tree = _make_tree(tmp_path / "repo")
outside = tmp_path / "escape.txt"
outside.write_text("secret")
target = _resolve_edit_target("_shared/../../escape.txt", dotfiles_dir=tree)
assert target is None
def test_resolve_edit_target_missing_returns_none(tmp_path):
tree = _make_tree(tmp_path)
assert _resolve_edit_target("does-not-exist", dotfiles_dir=tree) is None

View File

@@ -1,296 +0,0 @@
"""Containerized e2e tests for dotfiles link safety.
These tests are opt-in and run only when FLOW_RUN_E2E_CONTAINER=1.
"""
import os
import shutil
import subprocess
import uuid
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
def _runtime_available(runtime: str) -> bool:
if shutil.which(runtime) is None:
return False
result = subprocess.run(
[runtime, "info"],
capture_output=True,
text=True,
check=False,
)
return result.returncode == 0
def _container_runtime() -> str | None:
preferred = os.environ.get("FLOW_E2E_CONTAINER_RUNTIME")
candidates = [preferred] if preferred else ["podman", "docker"]
for runtime in candidates:
if not runtime:
continue
if _runtime_available(runtime):
return runtime
return None
def _require_container_e2e() -> str:
if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1":
pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests")
runtime = _container_runtime()
if runtime is None:
pytest.skip("Podman or Docker is required for container e2e tests")
return runtime
@pytest.fixture(scope="module")
def e2e_runtime():
return _require_container_e2e()
@pytest.fixture(scope="module")
def e2e_image(tmp_path_factory, e2e_runtime):
runtime = e2e_runtime
context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context")
dockerfile = context_dir / "Dockerfile"
dockerfile.write_text(
"FROM python:3.11-slim\n"
"RUN apt-get update && apt-get install -y --no-install-recommends sudo && rm -rf /var/lib/apt/lists/*\n"
"RUN pip install --no-cache-dir pyyaml\n"
"RUN useradd -m -s /bin/bash flow\n"
"RUN echo 'flow ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/flow && chmod 440 /etc/sudoers.d/flow\n"
"USER flow\n"
"WORKDIR /workspace\n"
)
tag = f"flow-e2e-{uuid.uuid4().hex[:10]}"
subprocess.run(
[runtime, "build", "-t", tag, str(context_dir)],
check=True,
capture_output=True,
text=True,
)
try:
yield tag
finally:
subprocess.run([runtime, "rmi", "-f", tag], capture_output=True, text=True, check=False)
def _run_in_container(runtime: str, image_tag: str, script: str) -> subprocess.CompletedProcess:
return subprocess.run(
[
runtime,
"run",
"--rm",
"-v",
f"{REPO_ROOT}:/workspace/flow-cli:ro",
image_tag,
"bash",
"-lc",
script,
],
capture_output=True,
text=True,
check=False,
)
def _assert_ok(run: subprocess.CompletedProcess) -> None:
if run.returncode != 0:
raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}")
def test_e2e_link_and_undo_with_root_targets(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
mkdir -p "$dot/_shared/rootpkg/_root/tmp"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo 'root-target' > "$dot/_shared/rootpkg/_root/tmp/flow-e2e-root-target"
echo '# before' > "$HOME/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
test -L "$HOME/.zshrc"
test -L /tmp/flow-e2e-root-target
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# before$' "$HOME/.zshrc"
test ! -e /tmp/flow-e2e-root-target
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '# original' > "$HOME/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --dry-run --force
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force --dry-run
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# original$' "$HOME/.zshrc"
state="$XDG_STATE_HOME/flow/linked.json"
if [ -f "$state" ]; then
python - "$state" <<'PY'
import json, sys
data = json.load(open(sys.argv[1], encoding="utf-8"))
assert data.get("links", {}) == {}, data
assert "last_transaction" not in data, data
PY
fi
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '# user-file' > "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link
rc=$?
set -e
test "$rc" -ne 0
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# user-file$' "$HOME/.zshrc"
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_managed_drift_requires_force(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
test -L "$HOME/.zshrc"
rm -f "$HOME/.zshrc"
echo '# drifted-manual' > "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link
rc=$?
set -e
test "$rc" -ne 0
test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc"
grep -q '^# drifted-manual$' "$HOME/.zshrc"
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/zsh" "$dot/_shared/git"
echo '# managed zshrc' > "$dot/_shared/zsh/.zshrc"
echo '[user]' > "$dot/_shared/git/.gitconfig"
mkdir -p "$HOME/.zshrc"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
rc=$?
set -e
test "$rc" -ne 0
test -d "$HOME/.zshrc"
test ! -e "$HOME/.gitconfig"
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_runtime, e2e_image):
script = r"""
set -euo pipefail
export HOME=/home/flow
export XDG_DATA_HOME=/tmp/xdg-data
export XDG_CONFIG_HOME=/tmp/xdg-config
export XDG_STATE_HOME=/tmp/xdg-state
mkdir -p "$XDG_DATA_HOME/flow/dotfiles" "$XDG_CONFIG_HOME/flow" "$XDG_STATE_HOME/flow"
dot="$XDG_DATA_HOME/flow/dotfiles"
mkdir -p "$dot/_shared/a" "$dot/_shared/b"
echo '# aaa' > "$dot/_shared/a/.a"
echo '# bbb' > "$dot/_shared/b/.b"
echo '# pre-a' > "$HOME/.a"
echo '# pre-b' > "$HOME/.b"
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force a
test -L "$HOME/.a"
# Turn .b into a directory to force a fatal conflict, while .a stays desired and unchanged.
rm -f "$HOME/.b"
mkdir -p "$HOME/.b"
set +e
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles link --force
rc=$?
set -e
test "$rc" -ne 0
PYTHONPATH=/workspace/flow-cli/src python -m flow dotfiles undo
test -f "$HOME/.a"
test ! -L "$HOME/.a"
grep -q '^# pre-a$' "$HOME/.a"
"""
_assert_ok(_run_in_container(e2e_runtime, e2e_image, script))

View File

@@ -1,622 +0,0 @@
"""Tests for dotfiles link planning, root markers, and module sources."""
from argparse import Namespace
import json
import subprocess
from pathlib import Path
import pytest
from flow.services.dotfiles import (
LinkSpec,
_collect_home_specs,
_list_profiles,
_load_link_specs_from_state,
_load_state,
_pull_requires_ack,
_resolved_package_source,
_run_sudo,
run_relink,
run_undo,
_save_link_specs_to_state,
_sync_to_desired,
_sync_modules,
)
from flow.core.config import AppConfig, FlowContext
from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo
def _ctx() -> FlowContext:
return FlowContext(
config=AppConfig(),
manifest={"profiles": {"work": {"os": "linux", "configs": {"skip": []}}}},
platform=PlatformInfo(os="linux", arch="x64", platform="linux-x64"),
console=ConsoleLogger(),
)
def _make_flow_tree(tmp_path: Path) -> Path:
flow_root = tmp_path
(flow_root / "_shared" / "git").mkdir(parents=True)
(flow_root / "_shared" / "git" / ".gitconfig").write_text("shared")
(flow_root / "_shared" / "tmux").mkdir(parents=True)
(flow_root / "_shared" / "tmux" / ".tmux.conf").write_text("tmux")
(flow_root / "work" / "git").mkdir(parents=True)
(flow_root / "work" / "git" / ".gitconfig").write_text("profile")
(flow_root / "_shared" / "dnsmasq" / "_root" / "etc").mkdir(parents=True)
(flow_root / "_shared" / "dnsmasq" / "_root" / "etc" / "hostname").write_text("devbox")
return flow_root
def test_list_profiles_ignores_reserved_dirs(tmp_path):
flow_root = _make_flow_tree(tmp_path)
profiles = _list_profiles(flow_root)
assert profiles == ["work"]
def test_collect_home_specs_conflict_fails(tmp_path):
flow_root = _make_flow_tree(tmp_path)
home = tmp_path / "home"
home.mkdir()
with pytest.raises(RuntimeError, match="Conflicting dotfile targets"):
_collect_home_specs(_ctx(), flow_root, home, "work", set(), None)
def test_collect_home_specs_maps_root_marker_to_absolute(tmp_path):
flow_root = tmp_path
(flow_root / "_shared" / "dnsmasq" / "_root" / "opt" / "homebrew" / "etc").mkdir(parents=True)
src = flow_root / "_shared" / "dnsmasq" / "_root" / "opt" / "homebrew" / "etc" / "dnsmasq.conf"
src.write_text("conf")
home = tmp_path / "home"
home.mkdir()
specs = _collect_home_specs(_ctx(), flow_root, home, None, set(), None)
assert Path("/opt/homebrew/etc/dnsmasq.conf") in specs
assert specs[Path("/opt/homebrew/etc/dnsmasq.conf")].source == src
def test_collect_home_specs_skip_root_marker(tmp_path):
flow_root = tmp_path
(flow_root / "_shared" / "dnsmasq" / "_root" / "etc").mkdir(parents=True)
(flow_root / "_shared" / "dnsmasq" / "_root" / "etc" / "hostname").write_text("devbox")
home = tmp_path / "home"
home.mkdir()
specs = _collect_home_specs(_ctx(), flow_root, home, None, {"_root"}, None)
assert Path("/etc/hostname") not in specs
def test_state_round_trip(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
specs = {
Path("/home/user/.gitconfig"): LinkSpec(
source=Path("/repo/_shared/git/.gitconfig"),
target=Path("/home/user/.gitconfig"),
package="_shared/git",
)
}
_save_link_specs_to_state(specs)
loaded = _load_link_specs_from_state()
assert Path("/home/user/.gitconfig") in loaded
assert loaded[Path("/home/user/.gitconfig")].package == "_shared/git"
def test_state_old_format_rejected(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
state_file.write_text(
json.dumps(
{
"links": {
"zsh": {
"/home/user/.zshrc": "/repo/.zshrc",
}
}
}
)
)
with pytest.raises(RuntimeError, match="Unsupported linked state format"):
_load_link_specs_from_state()
def test_module_source_requires_sync(tmp_path):
package_root = tmp_path / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
"source: github:dummy/example\n"
"ref:\n"
" branch: main\n"
)
with pytest.raises(RuntimeError, match="Run 'flow dotfiles sync' first"):
_resolved_package_source(_ctx(), "_shared/nvim", package_root)
def test_sync_modules_populates_cache_and_resolves_source(tmp_path, monkeypatch):
module_src = tmp_path / "module-src"
module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run(
[
"git",
"-C",
str(module_src),
"-c",
"user.name=Flow Test",
"-c",
"user.email=flow-test@example.com",
"commit",
"-m",
"init module",
],
check=True,
)
dotfiles = tmp_path / "dotfiles"
package_root = dotfiles / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n"
"ref:\n"
" branch: main\n"
)
(package_root / "notes.txt").write_text("ignore me")
monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False)
resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root)
assert (resolved / "init.lua").exists()
def test_module_backed_link_specs_exclude_git_internals(tmp_path, monkeypatch):
module_src = tmp_path / "module-src"
module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run(
[
"git",
"-C",
str(module_src),
"-c",
"user.name=Flow Test",
"-c",
"user.email=flow-test@example.com",
"commit",
"-m",
"init module",
],
check=True,
)
dotfiles = tmp_path / "dotfiles"
package_root = dotfiles / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n"
"ref:\n"
" branch: main\n"
)
monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False)
home = tmp_path / "home"
home.mkdir()
specs = _collect_home_specs(_ctx(), dotfiles, home, None, set(), None)
assert home / ".config" / "nvim" / "init.lua" in specs
assert not any(target.relative_to(home).parts[0] == ".git" for target in specs)
def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monkeypatch):
module_src = tmp_path / "module-src"
module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run(
[
"git",
"-C",
str(module_src),
"-c",
"user.name=Flow Test",
"-c",
"user.email=flow-test@example.com",
"commit",
"-m",
"init module",
],
check=True,
)
dotfiles = tmp_path / "dotfiles"
package_root = dotfiles / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
relative_source = Path("../../../../../module-src")
(module_mount / "_module.yaml").write_text(
f"source: {relative_source}\n"
"ref:\n"
" branch: main\n"
)
unrelated_cwd = tmp_path / "unrelated-cwd"
unrelated_cwd.mkdir()
monkeypatch.chdir(unrelated_cwd)
monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False)
resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root)
assert (resolved / "init.lua").exists()
def test_module_mount_inherits_directory_path(tmp_path, monkeypatch):
module_src = tmp_path / "module-src"
module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / "init.lua").write_text("-- module")
(module_src / "lua").mkdir()
(module_src / "lua" / "config.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run(
[
"git",
"-C",
str(module_src),
"-c",
"user.name=Flow Test",
"-c",
"user.email=flow-test@example.com",
"commit",
"-m",
"init module",
],
check=True,
)
dotfiles = tmp_path / "dotfiles"
package_root = dotfiles / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n"
"ref:\n"
" branch: main\n"
)
monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False)
home = tmp_path / "home"
home.mkdir()
specs = _collect_home_specs(_ctx(), dotfiles, home, None, set(), None)
assert home / ".config" / "nvim" / "init.lua" in specs
assert home / ".config" / "nvim" / "lua" / "config.lua" in specs
assert home / "init.lua" not in specs
assert home / "lua" / "config.lua" not in specs
def test_pull_requires_ack_only_on_real_updates():
assert _pull_requires_ack("Already up to date.\n", "") is False
assert _pull_requires_ack("Updating 123..456\n", "") is True
def test_run_relink_uses_transactional_link_path(monkeypatch):
calls = []
monkeypatch.setattr("flow.services.dotfiles._ensure_flow_dir", lambda _ctx: None)
monkeypatch.setattr(
"flow.services.dotfiles.run_unlink",
lambda _ctx, _args: (_ for _ in ()).throw(AssertionError("run_unlink must not be used")),
)
def _fake_run_link(_ctx, args):
calls.append((args.packages, args.profile, args.copy, args.force, args.dry_run))
monkeypatch.setattr("flow.services.dotfiles.run_link", _fake_run_link)
run_relink(_ctx(), Namespace(packages=["git"], profile="work"))
assert calls == [(["git"], "work", False, False, False)]
def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# new")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("# old")
desired = {
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=True,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "# old"
assert not state_file.exists()
def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source_root = tmp_path / "source"
source_root.mkdir()
source_ok = source_root / "ok"
source_ok.write_text("ok")
source_conflict = source_root / "conflict"
source_conflict.write_text("conflict")
home = tmp_path / "home"
home.mkdir()
target_ok = home / "a-file"
target_conflict = home / "z-dir"
target_conflict.mkdir()
desired = {
target_ok: LinkSpec(source=source_ok, target=target_ok, package="_shared/test"),
target_conflict: LinkSpec(source=source_conflict, target=target_conflict, package="_shared/test"),
}
with pytest.raises(RuntimeError, match="cannot be overwritten"):
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
assert not target_ok.exists()
assert not target_ok.is_symlink()
assert not state_file.exists()
def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# managed")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("# previous")
desired = {
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
assert target.is_symlink()
state_after_link = _load_state()
assert "last_transaction" in state_after_link
tx = state_after_link["last_transaction"]
assert isinstance(tx, dict)
assert tx.get("targets")
run_undo(_ctx(), Namespace())
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "# previous"
state_after_undo = _load_state()
assert state_after_undo.get("links") == {}
assert "last_transaction" not in state_after_undo
def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source"
source.mkdir()
src_a = source / "a"
src_b = source / "b"
src_a.write_text("a")
src_b.write_text("b")
home = tmp_path / "home"
home.mkdir()
target_a = home / "a"
target_b = home / "b"
target_a.write_text("old-a")
desired = {
target_a: LinkSpec(source=src_a, target=target_a, package="_shared/test"),
target_b: LinkSpec(source=src_b, target=target_b, package="_shared/test"),
}
call_count = {"n": 0}
def _failing_apply(spec, *, copy, dry_run): # noqa: ARG001
call_count["n"] += 1
if call_count["n"] == 2:
raise RuntimeError("simulated failure")
spec.target.parent.mkdir(parents=True, exist_ok=True)
spec.target.symlink_to(spec.source)
return True
monkeypatch.setattr("flow.services.dotfiles._apply_link_spec", _failing_apply)
with pytest.raises(RuntimeError, match="simulated failure"):
_sync_to_desired(
_ctx(),
desired,
force=True,
dry_run=False,
copy=False,
)
state_after_failure = _load_state()
tx = state_after_failure.get("last_transaction")
assert isinstance(tx, dict)
assert tx.get("incomplete") is True
assert target_a.is_symlink()
run_undo(_ctx(), Namespace())
assert target_a.exists()
assert not target_a.is_symlink()
assert target_a.read_text() == "old-a"
assert not target_b.exists()
assert _load_state().get("links") == {}
def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".old"
source.parent.mkdir(parents=True)
source.write_text("managed")
target = tmp_path / "home" / ".zshrc"
target.parent.mkdir(parents=True)
target.write_text("user-edited")
_save_link_specs_to_state(
{
target: LinkSpec(
source=source,
target=target,
package="_shared/zsh",
)
}
)
with pytest.raises(RuntimeError, match="Use --force"):
_sync_to_desired(
_ctx(),
{},
force=False,
dry_run=False,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "user-edited"
assert target in _load_link_specs_from_state()
def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
old_source = tmp_path / "source" / ".old"
new_source = tmp_path / "source" / ".new"
old_source.parent.mkdir(parents=True)
old_source.write_text("managed-old")
new_source.write_text("managed-new")
target = tmp_path / "home" / ".gitconfig"
target.parent.mkdir(parents=True)
target.write_text("manual-file")
_save_link_specs_to_state(
{
target: LinkSpec(
source=old_source,
target=target,
package="_shared/git",
)
}
)
desired = {
target: LinkSpec(
source=new_source,
target=target,
package="_shared/git",
)
}
with pytest.raises(RuntimeError, match="Use --force"):
_sync_to_desired(
_ctx(),
desired,
force=False,
dry_run=False,
copy=False,
)
assert target.exists()
assert not target.is_symlink()
assert target.read_text() == "manual-file"
assert _load_link_specs_from_state()[target].source == old_source
def test_run_sudo_errors_when_binary_missing(monkeypatch):
monkeypatch.setattr("flow.services.dotfiles.shutil.which", lambda _name: None)
with pytest.raises(RuntimeError, match="sudo is required"):
_run_sudo(["true"], dry_run=False)

View File

@@ -1,45 +0,0 @@
"""Tests for flow.commands.package."""
from types import SimpleNamespace
from flow.commands import package
def test_load_installed_returns_empty_on_malformed_json(tmp_path, monkeypatch):
state_file = tmp_path / "installed.json"
state_file.write_text("{broken", encoding="utf-8")
monkeypatch.setattr(package, "INSTALLED_STATE", state_file)
assert package._load_installed() == {}
def test_load_installed_returns_empty_on_non_mapping_json(tmp_path, monkeypatch):
state_file = tmp_path / "installed.json"
state_file.write_text('["neovim"]', encoding="utf-8")
monkeypatch.setattr(package, "INSTALLED_STATE", state_file)
assert package._load_installed() == {}
class _ConsoleCapture:
def __init__(self):
self.info_messages = []
def info(self, message):
self.info_messages.append(message)
def table(self, _headers, _rows):
raise AssertionError("table() should not be called when installed state is malformed")
def test_run_list_handles_malformed_installed_state(tmp_path, monkeypatch):
state_file = tmp_path / "installed.json"
state_file.write_text("{oops", encoding="utf-8")
monkeypatch.setattr(package, "INSTALLED_STATE", state_file)
monkeypatch.setattr(package, "_get_definitions", lambda _ctx: {})
ctx = SimpleNamespace(console=_ConsoleCapture())
package.run_list(ctx, SimpleNamespace(all=False))
assert ctx.console.info_messages == ["No packages installed."]

View File

@@ -1,77 +0,0 @@
"""Tests for flow.core.paths."""
from pathlib import Path
from flow.core.paths import (
CONFIG_DIR,
CONFIG_FILE,
DATA_DIR,
DOTFILES_DIR,
INSTALLED_STATE,
LINKED_STATE,
MANIFEST_FILE,
MODULES_DIR,
PACKAGES_DIR,
SCRATCH_DIR,
STATE_DIR,
ensure_dirs,
)
def test_config_dir_under_home():
assert ".config/flow" in str(CONFIG_DIR)
def test_data_dir_under_home():
assert ".local/share/flow" in str(DATA_DIR)
def test_state_dir_under_home():
assert ".local/state/flow" in str(STATE_DIR)
def test_manifest_file_in_config_dir():
assert MANIFEST_FILE == CONFIG_DIR / "manifest.yaml"
def test_config_file_in_config_dir():
assert CONFIG_FILE == CONFIG_DIR / "config.yaml"
def test_dotfiles_dir():
assert DOTFILES_DIR == DATA_DIR / "dotfiles"
def test_packages_dir():
assert PACKAGES_DIR == DATA_DIR / "packages"
def test_modules_dir():
assert MODULES_DIR == DATA_DIR / "modules"
def test_scratch_dir():
assert SCRATCH_DIR == DATA_DIR / "scratch"
def test_state_files():
assert LINKED_STATE == STATE_DIR / "linked.json"
assert INSTALLED_STATE == STATE_DIR / "installed.json"
def test_ensure_dirs(tmp_path, monkeypatch):
monkeypatch.setattr("flow.core.paths.CONFIG_DIR", tmp_path / "config")
monkeypatch.setattr("flow.core.paths.DATA_DIR", tmp_path / "data")
monkeypatch.setattr("flow.core.paths.STATE_DIR", tmp_path / "state")
monkeypatch.setattr("flow.core.paths.MODULES_DIR", tmp_path / "data" / "modules")
monkeypatch.setattr("flow.core.paths.PACKAGES_DIR", tmp_path / "data" / "packages")
monkeypatch.setattr("flow.core.paths.SCRATCH_DIR", tmp_path / "data" / "scratch")
ensure_dirs()
assert (tmp_path / "config").is_dir()
assert (tmp_path / "data").is_dir()
assert (tmp_path / "state").is_dir()
assert (tmp_path / "data" / "modules").is_dir()
assert (tmp_path / "data" / "packages").is_dir()
assert (tmp_path / "data" / "scratch").is_dir()

View File

@@ -1,29 +0,0 @@
"""Tests for flow.core.platform."""
import platform as _platform
import pytest
from flow.core.platform import PlatformInfo, detect_platform
def test_detect_platform_returns_platforminfo():
info = detect_platform()
assert isinstance(info, PlatformInfo)
assert info.os in ("linux", "macos")
assert info.arch in ("x64", "arm64")
assert info.platform == f"{info.os}-{info.arch}"
def test_detect_platform_unsupported_os(monkeypatch):
monkeypatch.setattr(_platform, "system", lambda: "FreeBSD")
with pytest.raises(RuntimeError, match="Unsupported operating system"):
detect_platform()
def test_detect_platform_unsupported_arch(monkeypatch):
monkeypatch.setattr(_platform, "machine", lambda: "mips")
with pytest.raises(RuntimeError, match="Unsupported architecture"):
detect_platform()

View File

@@ -1,81 +0,0 @@
"""Tests for self-hosted merged YAML config loading."""
from pathlib import Path
import pytest
from flow.core import paths as paths_module
from flow.core.config import load_config, load_manifest
@pytest.fixture
def mock_roots(tmp_path, monkeypatch):
local_root = tmp_path / "local-flow"
dotfiles_root = tmp_path / "dotfiles" / "_shared" / "flow" / ".config" / "flow"
local_root.mkdir(parents=True)
dotfiles_root.mkdir(parents=True)
monkeypatch.setattr(paths_module, "CONFIG_DIR", local_root)
monkeypatch.setattr(paths_module, "DOTFILES_FLOW_CONFIG", dotfiles_root)
return {
"local": local_root,
"dotfiles": dotfiles_root,
}
def test_load_manifest_priority_dotfiles_first(mock_roots):
(mock_roots["local"] / "profiles.yaml").write_text("profiles:\n local: {os: linux}\n")
(mock_roots["dotfiles"] / "profiles.yaml").write_text("profiles:\n dotfiles: {os: macos}\n")
manifest = load_manifest()
assert "dotfiles" in manifest.get("profiles", {})
assert "local" not in manifest.get("profiles", {})
def test_load_manifest_fallback_to_local(mock_roots):
(mock_roots["local"] / "profiles.yaml").write_text("profiles:\n local: {os: linux}\n")
# Remove dotfiles yaml file so local takes over.
dot_yaml = mock_roots["dotfiles"] / "profiles.yaml"
if dot_yaml.exists():
dot_yaml.unlink()
manifest = load_manifest()
assert "local" in manifest.get("profiles", {})
def test_load_manifest_empty_when_none_exist(mock_roots):
manifest = load_manifest()
assert manifest == {}
def test_load_config_from_merged_yaml(mock_roots):
(mock_roots["dotfiles"] / "config.yaml").write_text(
"repository:\n"
" dotfiles-url: git@github.com:user/dotfiles.git\n"
"defaults:\n"
" container-registry: registry.example.com\n"
)
cfg = load_config()
assert cfg.dotfiles_url == "git@github.com:user/dotfiles.git"
assert cfg.container_registry == "registry.example.com"
def test_yaml_merge_is_alphabetical_last_writer_wins(mock_roots):
(mock_roots["local"] / "10-a.yaml").write_text("profiles:\n a: {os: linux}\n")
(mock_roots["local"] / "20-b.yaml").write_text("profiles:\n b: {os: linux}\n")
manifest = load_manifest(mock_roots["local"])
assert "b" in manifest.get("profiles", {})
assert "a" not in manifest.get("profiles", {})
def test_explicit_file_path_loads_single_yaml(tmp_path):
one_file = tmp_path / "single.yaml"
one_file.write_text("profiles:\n only: {os: linux}\n")
manifest = load_manifest(one_file)
assert "only" in manifest["profiles"]

View File

@@ -1,310 +0,0 @@
"""Tests for flow.core.stow — GNU Stow-style tree folding/unfolding."""
import os
from pathlib import Path
import pytest
from flow.core.stow import LinkOperation, LinkTree, TreeFolder
@pytest.fixture
def temp_home(tmp_path):
"""Create a temporary home directory."""
home = tmp_path / "home"
home.mkdir()
return home
@pytest.fixture
def temp_dotfiles(tmp_path):
"""Create a temporary dotfiles repository."""
dotfiles = tmp_path / "dotfiles"
dotfiles.mkdir()
return dotfiles
def test_linktree_add_remove():
"""Test basic LinkTree operations."""
tree = LinkTree()
source = Path("/dotfiles/zsh/.zshrc")
target = Path("/home/user/.zshrc")
tree.add_link(target, source, "zsh", is_dir_link=False)
assert target in tree.links
assert tree.links[target] == source
assert tree.packages[target] == "zsh"
assert not tree.is_directory_link(target)
tree.remove_link(target)
assert target not in tree.links
assert target not in tree.packages
def test_linktree_directory_link():
"""Test directory link tracking."""
tree = LinkTree()
source = Path("/dotfiles/nvim/.config/nvim")
target = Path("/home/user/.config/nvim")
tree.add_link(target, source, "nvim", is_dir_link=True)
assert tree.is_directory_link(target)
def test_linktree_can_fold_single_package():
"""Test can_fold with single package."""
tree = LinkTree()
target_dir = Path("/home/user/.config/nvim")
# Add files from same package
tree.add_link(target_dir / "init.lua", Path("/dotfiles/nvim/.config/nvim/init.lua"), "nvim")
tree.add_link(target_dir / "lua" / "config.lua", Path("/dotfiles/nvim/.config/nvim/lua/config.lua"), "nvim")
# Should be able to fold since all files are from same package
assert tree.can_fold(target_dir, "nvim")
def test_linktree_can_fold_multiple_packages():
"""Test can_fold with multiple packages."""
tree = LinkTree()
target_dir = Path("/home/user/.config")
# Add files from different packages
tree.add_link(target_dir / "nvim", Path("/dotfiles/nvim/.config/nvim"), "nvim", is_dir_link=True)
tree.add_link(target_dir / "tmux", Path("/dotfiles/tmux/.config/tmux"), "tmux", is_dir_link=True)
# Cannot fold .config since it has files from multiple packages
assert not tree.can_fold(target_dir, "nvim")
def test_linktree_from_state_old_format_rejected():
"""Old state format should be rejected (no backward compatibility)."""
state = {
"links": {
"zsh": {
"/home/user/.zshrc": "/dotfiles/zsh/.zshrc",
"/home/user/.zshenv": "/dotfiles/zsh/.zshenv",
}
}
}
with pytest.raises(RuntimeError, match="Unsupported linked state format"):
LinkTree.from_state(state)
def test_linktree_from_state_new_format():
"""Test loading from new state format (with is_directory_link)."""
state = {
"version": 2,
"links": {
"nvim": {
"/home/user/.config/nvim": {
"source": "/dotfiles/nvim/.config/nvim",
"is_directory_link": True,
}
}
}
}
tree = LinkTree.from_state(state)
target = Path("/home/user/.config/nvim")
assert target in tree.links
assert tree.is_directory_link(target)
assert tree.packages[target] == "nvim"
def test_linktree_to_state():
"""Test converting LinkTree to state format."""
tree = LinkTree()
tree.add_link(
Path("/home/user/.config/nvim"),
Path("/dotfiles/nvim/.config/nvim"),
"nvim",
is_dir_link=True,
)
tree.add_link(
Path("/home/user/.zshrc"),
Path("/dotfiles/zsh/.zshrc"),
"zsh",
is_dir_link=False,
)
state = tree.to_state()
assert state["version"] == 2
assert "nvim" in state["links"]
assert "zsh" in state["links"]
nvim_link = state["links"]["nvim"]["/home/user/.config/nvim"]
assert nvim_link["is_directory_link"] is True
zsh_link = state["links"]["zsh"]["/home/user/.zshrc"]
assert zsh_link["is_directory_link"] is False
def test_treefolder_plan_link_simple(temp_home, temp_dotfiles):
"""Test planning a simple file link."""
tree = LinkTree()
folder = TreeFolder(tree)
source = temp_dotfiles / "zsh" / ".zshrc"
target = temp_home / ".zshrc"
# Create source file
source.parent.mkdir(parents=True)
source.write_text("# zshrc")
ops = folder.plan_link(source, target, "zsh")
assert len(ops) == 1
assert ops[0].type == "create_symlink"
assert ops[0].source == source
assert ops[0].target == target
assert ops[0].package == "zsh"
def test_treefolder_detect_conflicts_existing_file(temp_home, temp_dotfiles):
"""Test conflict detection for existing files."""
tree = LinkTree()
folder = TreeFolder(tree)
source = temp_dotfiles / "zsh" / ".zshrc"
target = temp_home / ".zshrc"
# Create existing file
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text("# existing")
source.parent.mkdir(parents=True)
source.write_text("# zshrc")
ops = folder.plan_link(source, target, "zsh")
conflicts = folder.detect_conflicts(ops)
assert len(conflicts) == 1
assert "already exists" in conflicts[0]
def test_treefolder_detect_conflicts_different_package(temp_home, temp_dotfiles):
"""Test conflict detection for links from different packages."""
tree = LinkTree()
target = temp_home / ".bashrc"
# Add existing link from different package
tree.add_link(target, Path("/dotfiles/bash/.bashrc"), "bash")
folder = TreeFolder(tree)
source = temp_dotfiles / "zsh" / ".bashrc"
source.parent.mkdir(parents=True)
source.write_text("# bashrc")
ops = folder.plan_link(source, target, "zsh")
conflicts = folder.detect_conflicts(ops)
assert len(conflicts) == 1
assert "bash" in conflicts[0]
def test_treefolder_execute_operations_dry_run(temp_home, temp_dotfiles, capsys):
"""Test dry-run mode."""
tree = LinkTree()
folder = TreeFolder(tree)
source = temp_dotfiles / "zsh" / ".zshrc"
target = temp_home / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# zshrc")
ops = folder.plan_link(source, target, "zsh")
folder.execute_operations(ops, dry_run=True)
# Check output
captured = capsys.readouterr()
assert "FILE LINK" in captured.out
assert str(target) in captured.out
# No actual symlink created
assert not target.exists()
def test_treefolder_execute_operations_create_symlink(temp_home, temp_dotfiles):
"""Test creating actual symlinks."""
tree = LinkTree()
folder = TreeFolder(tree)
source = temp_dotfiles / "zsh" / ".zshrc"
target = temp_home / ".zshrc"
source.parent.mkdir(parents=True)
source.write_text("# zshrc")
ops = folder.plan_link(source, target, "zsh")
folder.execute_operations(ops, dry_run=False)
# Check symlink created
assert target.is_symlink()
assert target.resolve() == source.resolve()
# Check tree updated
assert target in folder.tree.links
def test_treefolder_plan_unlink(temp_home, temp_dotfiles):
"""Test planning unlink operations."""
tree = LinkTree()
target = temp_home / ".zshrc"
source = temp_dotfiles / "zsh" / ".zshrc"
tree.add_link(target, source, "zsh")
folder = TreeFolder(tree)
ops = folder.plan_unlink(target, "zsh")
assert len(ops) == 1
assert ops[0].type == "remove"
assert ops[0].target == target
def test_treefolder_plan_unlink_directory_link(temp_home, temp_dotfiles):
"""Test planning unlink for directory symlink."""
tree = LinkTree()
target = temp_home / ".config" / "nvim"
source = temp_dotfiles / "nvim" / ".config" / "nvim"
tree.add_link(target, source, "nvim", is_dir_link=True)
folder = TreeFolder(tree)
ops = folder.plan_unlink(target, "nvim")
# Should remove the directory link
assert len(ops) >= 1
assert ops[-1].type == "remove"
assert ops[-1].is_directory_link
def test_linkoperation_str():
"""Test LinkOperation string representation."""
op1 = LinkOperation(
type="create_symlink",
source=Path("/src"),
target=Path("/dst"),
package="test",
is_directory_link=False,
)
assert "FILE LINK" in str(op1)
op2 = LinkOperation(
type="create_symlink",
source=Path("/src"),
target=Path("/dst"),
package="test",
is_directory_link=True,
)
assert "DIR LINK" in str(op2)
op3 = LinkOperation(
type="unfold",
source=Path("/src"),
target=Path("/dst"),
package="test",
)
assert "UNFOLD" in str(op3)

View File

@@ -1,88 +0,0 @@
"""Tests for flow.commands.sync."""
from types import SimpleNamespace
import pytest
from flow.commands import sync
def _git_clean_repo(_repo, *cmd, capture=True):
_ = capture
if cmd == ("rev-parse", "--abbrev-ref", "HEAD"):
return SimpleNamespace(returncode=0, stdout="main\n")
if cmd == ("diff", "--quiet"):
return SimpleNamespace(returncode=0, stdout="")
if cmd == ("diff", "--cached", "--quiet"):
return SimpleNamespace(returncode=0, stdout="")
if cmd == ("ls-files", "--others", "--exclude-standard"):
return SimpleNamespace(returncode=0, stdout="")
if cmd == ("rev-parse", "--abbrev-ref", "main@{u}"):
return SimpleNamespace(returncode=0, stdout="origin/main\n")
if cmd == ("rev-list", "--oneline", "main@{u}..main"):
return SimpleNamespace(returncode=0, stdout="")
if cmd == ("for-each-ref", "--format=%(refname:short)", "refs/heads"):
return SimpleNamespace(returncode=0, stdout="main\n")
raise AssertionError(f"Unexpected git command: {cmd!r}")
@pytest.mark.parametrize("git_style", ["dir", "file"])
def test_check_repo_detects_git_dir_and_worktree_file(tmp_path, monkeypatch, git_style):
repo = tmp_path / "repo"
repo.mkdir()
if git_style == "dir":
(repo / ".git").mkdir()
else:
(repo / ".git").write_text("gitdir: /tmp/worktrees/repo\n", encoding="utf-8")
monkeypatch.setattr(sync, "_git", _git_clean_repo)
name, issues = sync._check_repo(str(repo), do_fetch=False)
assert name == "repo"
assert issues == []
class _ConsoleCapture:
def __init__(self):
self.info_messages = []
self.error_messages = []
self.success_messages = []
def info(self, message):
self.info_messages.append(message)
def error(self, message):
self.error_messages.append(message)
def success(self, message):
self.success_messages.append(message)
def test_run_fetch_includes_worktree_style_repo(tmp_path, monkeypatch):
projects = tmp_path / "projects"
projects.mkdir()
worktree_repo = projects / "worktree"
worktree_repo.mkdir()
(worktree_repo / ".git").write_text("gitdir: /tmp/worktrees/worktree\n", encoding="utf-8")
(projects / "non_git").mkdir()
calls = []
def _git_fetch(repo, *cmd, capture=True):
_ = capture
calls.append((repo, cmd))
return SimpleNamespace(returncode=0, stdout="")
monkeypatch.setattr(sync, "_git", _git_fetch)
console = _ConsoleCapture()
ctx = SimpleNamespace(config=SimpleNamespace(projects_dir=str(projects)), console=console)
sync.run_fetch(ctx, SimpleNamespace())
assert calls == [(str(worktree_repo), ("fetch", "--all", "--quiet"))]
assert console.success_messages == ["All remotes fetched."]

View File

@@ -1,57 +0,0 @@
"""Tests for flow.core.variables."""
from flow.core.variables import substitute, substitute_template
def test_substitute_dollar():
result = substitute("hello $NAME", {"NAME": "world"})
assert result == "hello world"
def test_substitute_braces():
result = substitute("hello ${NAME}", {"NAME": "world"})
assert result == "hello world"
def test_substitute_multiple():
result = substitute("$A and ${B}", {"A": "1", "B": "2"})
assert result == "1 and 2"
def test_substitute_home():
result = substitute("dir=$HOME", {})
assert "$HOME" not in result
def test_substitute_user():
import os
result = substitute("u=$USER", {})
assert result == f"u={os.getenv('USER', '')}"
def test_substitute_non_string():
assert substitute(123, {}) == 123
def test_substitute_template_basic():
result = substitute_template("nvim-{{os}}-{{arch}}.tar.gz", {"os": "linux", "arch": "x86_64"})
assert result == "nvim-linux-x86_64.tar.gz"
def test_substitute_template_missing_key():
result = substitute_template("{{missing}}", {})
assert result == "{{missing}}"
def test_substitute_template_non_string():
assert substitute_template(42, {}) == 42
def test_substitute_template_no_placeholders():
result = substitute_template("plain text", {"os": "linux"})
assert result == "plain text"
def test_substitute_template_env_namespace():
result = substitute_template("{{ env.USER_EMAIL }}", {"env": {"USER_EMAIL": "you@example.com"}})
assert result == "you@example.com"