diff --git a/src/flow/core/config.py b/src/flow/core/config.py index a7f3d96..6e454c7 100644 --- a/src/flow/core/config.py +++ b/src/flow/core/config.py @@ -1,110 +1,47 @@ """Configuration loading (merged YAML) and FlowContext.""" +from __future__ import annotations + from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Optional import yaml -from flow.core import paths -from flow.core.console import ConsoleLogger +from flow.core.console import Console from flow.core.platform import PlatformInfo -from flow.core.system import SystemRuntime +from flow.core.runtime import SystemRuntime -@dataclass +@dataclass(frozen=True) class TargetConfig: namespace: str platform: str - ssh_host: str - ssh_identity: Optional[str] = None + host: str + identity: Optional[str] = None @dataclass class AppConfig: dotfiles_url: str = "" dotfiles_branch: str = "main" - dotfiles_pull_before_edit: bool = True projects_dir: str = "~/projects" container_registry: str = "registry.tomastm.com" container_tag: str = "latest" tmux_session: str = "default" - targets: List[TargetConfig] = field(default_factory=list) + targets: list[TargetConfig] = field(default_factory=list) -def _get_value(mapping: Any, *keys: str, default: Any = None) -> Any: - if not isinstance(mapping, dict): - return default - for key in keys: - if key in mapping: - return mapping[key] - return default +@dataclass +class FlowContext: + config: AppConfig + manifest: dict[str, Any] + platform: PlatformInfo + console: Console + runtime: SystemRuntime = field(default_factory=SystemRuntime) -def _as_bool(value: Any, default: bool) -> bool: - if isinstance(value, bool): - return value - if isinstance(value, (int, float)): - return bool(value) - if isinstance(value, str): - normalized = value.strip().lower() - if normalized in {"1", "true", "yes", "y", "on"}: - return True - if normalized in {"0", "false", "no", "n", "off"}: - return False - return default - - -def _parse_target_config(key: str, value: str) -> Optional[TargetConfig]: - """Parse a target line from config-like syntax. - - Supported formats: - 1) namespace = platform ssh_host [ssh_identity] - 2) namespace@platform = ssh_host [ssh_identity] - """ - parts = value.split() - if not parts: - return None - - if "@" in key: - namespace, platform = key.split("@", 1) - ssh_host = parts[0] - ssh_identity = parts[1] if len(parts) > 1 else None - if not namespace or not platform: - return None - return TargetConfig( - namespace=namespace, - platform=platform, - ssh_host=ssh_host, - ssh_identity=ssh_identity, - ) - - if len(parts) < 2: - return None - - return TargetConfig( - namespace=key, - platform=parts[0], - ssh_host=parts[1], - ssh_identity=parts[2] if len(parts) > 2 else None, - ) - - -def _list_yaml_files(directory: Path) -> List[Path]: - if not directory.exists() or not directory.is_dir(): - return [] - - files = [] - for child in directory.iterdir(): - if not child.is_file(): - continue - if child.suffix.lower() in {".yaml", ".yml"}: - files.append(child) - - return sorted(files, key=lambda p: p.name) - - -def _load_yaml_file(path: Path) -> Dict[str, Any]: +def _load_yaml_file(path: Path) -> dict[str, Any]: try: with open(path, "r", encoding="utf-8") as handle: data = yaml.safe_load(handle) @@ -120,201 +57,95 @@ def _load_yaml_file(path: Path) -> Dict[str, Any]: return data -def _load_merged_yaml(directory: Path) -> Dict[str, Any]: - merged: Dict[str, Any] = {} - for file_path in _list_yaml_files(directory): - merged.update(_load_yaml_file(file_path)) - return merged +def _parse_targets(raw: Any) -> list[TargetConfig]: + if not isinstance(raw, dict): + return [] + targets: list[TargetConfig] = [] + for key, value in raw.items(): + if "@" not in key: + continue + namespace, platform = key.split("@", 1) + if not namespace or not platform: + continue -def _resolve_default_yaml_root() -> Path: - # Priority 1: self-hosted config from linked dotfiles - if paths.DOTFILES_FLOW_CONFIG.exists() and _list_yaml_files(paths.DOTFILES_FLOW_CONFIG): - return paths.DOTFILES_FLOW_CONFIG - - # Priority 2: local config directory - return paths.CONFIG_DIR - - -def _load_yaml_source(path: Path) -> Dict[str, Any]: - if not path.exists(): - return {} - - if path.is_file(): - return _load_yaml_file(path) - - if path.is_dir(): - return _load_merged_yaml(path) - - return {} - - -def _parse_targets(raw_targets: Any) -> List[TargetConfig]: - targets: List[TargetConfig] = [] - - if isinstance(raw_targets, dict): - for key, value in raw_targets.items(): - if isinstance(value, str): - parsed = _parse_target_config(key, value) - if parsed is not None: - targets.append(parsed) + if isinstance(value, str): + targets.append(TargetConfig( + namespace=namespace, + platform=platform, + host=value, + )) + elif isinstance(value, dict): + host = value.get("host", "") + if not host: continue - - if not isinstance(value, dict): - continue - - namespace_from_key = key - platform_from_key = None - if "@" in key: - namespace_from_key, platform_from_key = key.split("@", 1) - - namespace = str( - _get_value( - value, - "namespace", - default=namespace_from_key, - ) - ) - platform = str( - _get_value( - value, - "platform", - default=platform_from_key, - ) - ) - ssh_host = _get_value(value, "ssh_host", "ssh-host", "host", default="") - ssh_identity = _get_value(value, "ssh_identity", "ssh-identity", "identity") - - if not namespace or not platform or not ssh_host: - continue - - targets.append( - TargetConfig( - namespace=namespace, - platform=platform, - ssh_host=str(ssh_host), - ssh_identity=str(ssh_identity) if ssh_identity else None, - ) - ) - - elif isinstance(raw_targets, list): - for item in raw_targets: - if not isinstance(item, dict): - continue - - namespace = _get_value(item, "namespace") - platform = _get_value(item, "platform") - ssh_host = _get_value(item, "ssh_host", "ssh-host", "host") - ssh_identity = _get_value(item, "ssh_identity", "ssh-identity", "identity") - - if not namespace or not platform or not ssh_host: - continue - - targets.append( - TargetConfig( - namespace=str(namespace), - platform=str(platform), - ssh_host=str(ssh_host), - ssh_identity=str(ssh_identity) if ssh_identity else None, - ) - ) + identity = value.get("identity") + targets.append(TargetConfig( + namespace=namespace, + platform=platform, + host=str(host), + identity=str(identity) if identity is not None else None, + )) return targets -def load_manifest(path: Optional[Path] = None) -> Dict[str, Any]: - """Load merged YAML manifest/config data. +def load_config(config_dir: Path) -> AppConfig: + """Load config.yaml from the given directory into AppConfig.""" + config_file = config_dir / "config.yaml" + if not config_file.exists(): + return AppConfig() - Default priority: - 1) ~/.local/share/flow/dotfiles/_shared/flow/.config/flow/*.y[a]ml - 2) ~/.config/flow/*.y[a]ml - """ - source = path if path is not None else _resolve_default_yaml_root() - assert source is not None - data = _load_yaml_source(source) - return data if isinstance(data, dict) else {} - - -def load_config(path: Optional[Path] = None) -> AppConfig: - """Load merged YAML config into AppConfig.""" - source = path if path is not None else _resolve_default_yaml_root() - assert source is not None - merged = _load_yaml_source(source) + data = _load_yaml_file(config_file) cfg = AppConfig() - if not isinstance(merged, dict): - return cfg - repository = merged.get("repository") if isinstance(merged.get("repository"), dict) else {} - paths_section = merged.get("paths") if isinstance(merged.get("paths"), dict) else {} - defaults = merged.get("defaults") if isinstance(merged.get("defaults"), dict) else {} + repository = data.get("repository") + if isinstance(repository, dict): + url = repository.get("url") + if url is not None: + cfg.dotfiles_url = str(url) + branch = repository.get("branch") + if branch is not None: + cfg.dotfiles_branch = str(branch) - cfg.dotfiles_url = str( - _get_value( - repository, - "dotfiles_url", - "dotfiles-url", - default=merged.get("dotfiles_url", cfg.dotfiles_url), - ) - ) - cfg.dotfiles_branch = str( - _get_value( - repository, - "dotfiles_branch", - "dotfiles-branch", - default=merged.get("dotfiles_branch", cfg.dotfiles_branch), - ) - ) - cfg.dotfiles_pull_before_edit = _as_bool( - _get_value( - repository, - "pull_before_edit", - "pull-before-edit", - default=merged.get("dotfiles_pull_before_edit", cfg.dotfiles_pull_before_edit), - ), - cfg.dotfiles_pull_before_edit, - ) - cfg.projects_dir = str( - _get_value( - paths_section, - "projects_dir", - "projects-dir", - default=merged.get("projects_dir", cfg.projects_dir), - ) - ) - cfg.container_registry = str( - _get_value( - defaults, - "container_registry", - "container-registry", - default=merged.get("container_registry", cfg.container_registry), - ) - ) - cfg.container_tag = str( - _get_value( - defaults, - "container_tag", - "container-tag", - default=merged.get("container_tag", cfg.container_tag), - ) - ) - cfg.tmux_session = str( - _get_value( - defaults, - "tmux_session", - "tmux-session", - default=merged.get("tmux_session", cfg.tmux_session), - ) - ) - cfg.targets = _parse_targets(merged.get("targets", {})) + paths_section = data.get("paths") + if isinstance(paths_section, dict): + projects = paths_section.get("projects") + if projects is not None: + cfg.projects_dir = str(projects) + + defaults = data.get("defaults") + if isinstance(defaults, dict): + registry = defaults.get("container-registry") + if registry is not None: + cfg.container_registry = str(registry) + tmux = defaults.get("tmux-session") + if tmux is not None: + cfg.tmux_session = str(tmux) + + raw_targets = data.get("targets") + if raw_targets is not None: + cfg.targets = _parse_targets(raw_targets) return cfg -@dataclass -class FlowContext: - config: AppConfig - manifest: Dict[str, Any] - platform: PlatformInfo - console: ConsoleLogger - runtime: SystemRuntime = field(default_factory=SystemRuntime) +def load_manifest(manifest_dir: Path) -> dict[str, Any]: + """Load manifest.yaml or merge all *.yaml files from the directory.""" + if not manifest_dir.exists(): + return {} + + manifest_file = manifest_dir / "manifest.yaml" + if manifest_file.exists(): + return _load_yaml_file(manifest_file) + + merged: dict[str, Any] = {} + yaml_files = sorted( + (f for f in manifest_dir.iterdir() if f.is_file() and f.suffix in {".yaml", ".yml"}), + key=lambda p: p.name, + ) + for path in yaml_files: + merged.update(_load_yaml_file(path)) + + return merged diff --git a/src/flow/core/console.py b/src/flow/core/console.py index 95d6cf9..4f7c55e 100644 --- a/src/flow/core/console.py +++ b/src/flow/core/console.py @@ -1,138 +1,63 @@ -"""Console output formatting — ported from dotfiles_v2/src/console_logger.py.""" +"""Console output formatting with TTY detection and color control.""" -import time -from typing import Optional +import os +import sys +from typing import Any, Optional -class ConsoleLogger: - # Color constants - BLUE = "\033[34m" - GREEN = "\033[32m" - YELLOW = "\033[33m" - RED = "\033[31m" - CYAN = "\033[36m" - GRAY = "\033[90m" - DARK_GRAY = "\033[2;37m" - BOLD = "\033[1m" - DIM = "\033[2m" - RESET = "\033[0m" - - # Box drawing characters - BOX_VERTICAL = "\u2502" - BOX_HORIZONTAL = "\u2500" - BOX_TOP_LEFT = "\u250c" - BOX_TOP_RIGHT = "\u2510" - BOX_BOTTOM_LEFT = "\u2514" - BOX_BOTTOM_RIGHT = "\u2518" - - def __init__(self): - self.step_counter = 0 - self.start_time = None - - def info(self, message: str): - print(f"{self.CYAN}[INFO]{self.RESET} {message}") - - def warn(self, message: str): - print(f"{self.YELLOW}[WARN]{self.RESET} {message}") - - def error(self, message: str): - print(f"{self.RED}[ERROR]{self.RESET} {message}") - - def success(self, message: str): - print(f"{self.GREEN}[SUCCESS]{self.RESET} {message}") - - def step_start(self, current: int, total: int, description: str): - print( - f"\n{self.BOLD}{self.BLUE}Step {current}/{total}:{self.RESET} " - f"{self.BOLD}{description}{self.RESET}" - ) - print(f"{self.BLUE}{self.BOX_HORIZONTAL * 4}{self.RESET} {self.GRAY}Starting...{self.RESET}") - self.start_time = time.time() - - def step_command(self, command: str): - print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GRAY}$ {command}{self.RESET}") - - def step_output(self, line: str): - if line.strip(): - print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.DARK_GRAY} {line.rstrip()}{self.RESET}") - - def step_complete(self, message: str = "Completed successfully"): - elapsed = time.time() - self.start_time if self.start_time else 0 - print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GREEN}> {message} ({elapsed:.1f}s){self.RESET}") - - def step_skip(self, message: str): - elapsed = time.time() - self.start_time if self.start_time else 0 - print( - f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}" - f"{self.YELLOW}> Skipped: {message} ({elapsed:.1f}s){self.RESET}" - ) - - def step_fail(self, message: str): - elapsed = time.time() - self.start_time if self.start_time else 0 - print( - f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}" - f"{self.RED}> Failed: {message} ({elapsed:.1f}s){self.RESET}" - ) - - def section_header(self, title: str, subtitle: str = ""): - width = 70 - print(f"\n{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}") - if subtitle: - print(f"{self.BOLD}{self.BLUE} {title.upper()} - {subtitle}{self.RESET}") +class Console: + def __init__(self, *, quiet: bool = False, color: Optional[bool] = None): + self.quiet = quiet + if color is None: + self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False else: - print(f"{self.BOLD}{self.BLUE} {title.upper()}{self.RESET}") - print(f"{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}") + self._color = color - def section_summary(self, title: str): - width = 70 - print(f"\n{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}") - print(f"{self.BOLD}{self.GREEN} {title.upper()}{self.RESET}") - print(f"{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}") + def _style(self, code: str, text: str) -> str: + if not self._color: + return text + return f"{code}{text}\033[0m" - def plan_header(self, title: str, count: int): - width = 70 - print(f"\n{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}") - print(f"{self.BOLD}{self.CYAN} {title.upper()} ({count} actions){self.RESET}") - print(f"{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}") + def info(self, msg: str) -> None: + if self.quiet: + return + tag = self._style("\033[36m", "[INFO]") + print(f"{tag} {msg}") - def plan_category(self, category: str): - print(f"\n{self.BOLD}{self.CYAN}{category.upper()}{self.RESET}") - print(f"{self.CYAN}{'-' * 20}{self.RESET}") + def warn(self, msg: str) -> None: + tag = self._style("\033[33m", "[WARN]") + print(f"{tag} {msg}") - def plan_item(self, number: int, description: str, os_filter: Optional[str] = None, critical: bool = False): - os_indicator = f" {self.GRAY}({os_filter}){self.RESET}" if os_filter else "" - error_indicator = f" {self.RED}(critical){self.RESET}" if critical else "" - print(f" {number:2d}. {description}{os_indicator}{error_indicator}") + def error(self, msg: str) -> None: + tag = self._style("\033[31m", "[ERROR]") + print(f"{tag} {msg}", file=sys.stderr) - def plan_legend(self): - print( - f"\n{self.GRAY}Legend: {self.RED}(critical){self.GRAY} = stops on failure, " - f"{self.GRAY}(os){self.GRAY} = OS-specific{self.RESET}" - ) + def success(self, msg: str) -> None: + tag = self._style("\033[32m", "[OK]") + print(f"{tag} {msg}") - def table(self, headers: list[str], rows: list[list[str]]): - """Print a formatted table.""" + def table(self, headers: list[str], rows: list[list[str]]) -> None: if not rows: return - - normalized_headers = [str(h) for h in headers] - normalized_rows = [[str(cell) for cell in row] for row in rows] - - # Calculate column widths - widths = [len(h) for h in normalized_headers] - for row in normalized_rows: + widths = [len(h) for h in headers] + for row in rows: for i, cell in enumerate(row): if i < len(widths): - widths[i] = max(widths[i], len(cell)) + widths[i] = max(widths[i], len(str(cell))) - # Header - header_line = " ".join( - f"{self.BOLD}{h:<{widths[i]}}{self.RESET}" for i, h in enumerate(normalized_headers) - ) - print(header_line) - print(self.GRAY + " ".join("-" * w for w in widths) + self.RESET) + header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers)) + if self._color: + print(f"\033[1m{header_line}\033[0m") + else: + print(header_line) + print(" ".join("-" * w for w in widths)) + for row in rows: + print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row))) - # Rows - for row in normalized_rows: - line = " ".join(f"{cell:<{widths[i]}}" for i, cell in enumerate(row)) - print(line) + def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None: + if not operations: + self.info(f"Nothing to {verb}.") + return + self.info(f"Plan ({len(operations)} operation(s)):") + for op in operations: + print(f" {op}") diff --git a/src/flow/core/errors.py b/src/flow/core/errors.py index 80bbbc9..6bb0100 100644 --- a/src/flow/core/errors.py +++ b/src/flow/core/errors.py @@ -1,6 +1,21 @@ -"""Project-wide exception types.""" +"""Project-wide error types.""" -class FlowError(RuntimeError): - """A user-facing operational error.""" +class FlowError(Exception): + """Base for all user-facing errors.""" + +class ConfigError(FlowError): + """Invalid config or manifest YAML.""" + + +class PlanConflict(FlowError): + """Conflicts detected during planning.""" + + def __init__(self, message: str, conflicts: list[str]): + super().__init__(message) + self.conflicts = conflicts + + +class ExecutionError(FlowError): + """A plan step failed during execution.""" diff --git a/src/flow/core/paths.py b/src/flow/core/paths.py index 25a0766..c84bd7b 100644 --- a/src/flow/core/paths.py +++ b/src/flow/core/paths.py @@ -14,25 +14,18 @@ CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow" DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow" STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow" -MANIFEST_FILE = CONFIG_DIR / "manifest.yaml" -CONFIG_FILE = CONFIG_DIR / "config.yaml" - DOTFILES_DIR = DATA_DIR / "dotfiles" MODULES_DIR = DATA_DIR / "modules" PACKAGES_DIR = DATA_DIR / "packages" -SCRATCH_DIR = DATA_DIR / "scratch" -PROJECTS_DIR = HOME / "projects" LINKED_STATE = STATE_DIR / "linked.json" INSTALLED_STATE = STATE_DIR / "installed.json" # Self-hosted flow config path (from dotfiles repo) DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow" -DOTFILES_MANIFEST = DOTFILES_FLOW_CONFIG / "manifest.yaml" -DOTFILES_CONFIG = DOTFILES_FLOW_CONFIG / "config.yaml" def ensure_dirs() -> None: - """Create all required directories if they don't exist.""" - for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR, SCRATCH_DIR): + """Create all required directories.""" + for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR): d.mkdir(parents=True, exist_ok=True) diff --git a/src/flow/core/platform.py b/src/flow/core/platform.py index 223de4d..ea603c5 100644 --- a/src/flow/core/platform.py +++ b/src/flow/core/platform.py @@ -1,18 +1,20 @@ -"""OS and architecture detection.""" +"""OS/arch detection and execution context.""" +import os import platform as _platform from dataclasses import dataclass +from flow.core.errors import FlowError -@dataclass + +@dataclass(frozen=True) class PlatformInfo: - os: str = "linux" # "linux" or "macos" - arch: str = "x64" # "x64" or "arm64" - platform: str = "" # "linux-x64", etc. + os: str = "linux" + arch: str = "x64" - def __post_init__(self): - if not self.platform: - self.platform = f"{self.os}-{self.arch}" + @property + def platform(self) -> str: + return f"{self.os}-{self.arch}" _OS_MAP = {"Darwin": "macos", "Linux": "linux"} @@ -23,11 +25,20 @@ def detect_platform() -> PlatformInfo: raw_os = _platform.system() os_name = _OS_MAP.get(raw_os) if os_name is None: - raise RuntimeError(f"Unsupported operating system: {raw_os}") + raise FlowError(f"Unsupported operating system: {raw_os}") raw_arch = _platform.machine().lower() arch = _ARCH_MAP.get(raw_arch) if arch is None: - raise RuntimeError(f"Unsupported architecture: {raw_arch}") + raise FlowError(f"Unsupported architecture: {raw_arch}") - return PlatformInfo(os=os_name, arch=arch, platform=f"{os_name}-{arch}") + return PlatformInfo(os=os_name, arch=arch) + + +def detect_context() -> str: + """Detect execution context: 'host', 'vm', or 'container'.""" + if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"): + return "container" + if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"): + return "vm" + return "host" diff --git a/src/flow/core/runtime.py b/src/flow/core/runtime.py new file mode 100644 index 0000000..77de6cd --- /dev/null +++ b/src/flow/core/runtime.py @@ -0,0 +1,228 @@ +"""Runtime primitives for process, git, state, and filesystem access.""" + +from __future__ import annotations + +import json +import shutil +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable, Mapping, Optional, Sequence + +from flow.core.console import Console +from flow.core.errors import FlowError + + +class CommandRunner: + """Subprocess wrapper with consistent defaults.""" + + def run( + self, + argv: Sequence[str] | Iterable[str], + *, + cwd: Optional[Path] = None, + env: Optional[Mapping[str, str]] = None, + capture_output: bool = True, + check: bool = False, + timeout: Optional[float] = None, + ) -> subprocess.CompletedProcess[str]: + parts = [str(a) for a in argv] + completed = subprocess.run( + parts, + cwd=str(cwd) if cwd else None, + env=dict(env) if env else None, + capture_output=capture_output, + text=True, + check=False, + timeout=timeout, + ) + if check and completed.returncode != 0: + msg = completed.stderr.strip() or completed.stdout.strip() + if not msg: + msg = f"Command failed with exit code {completed.returncode}" + raise FlowError(msg) + return completed + + def run_shell( + self, + command: str, + *, + cwd: Optional[Path] = None, + env: Optional[Mapping[str, str]] = None, + capture_output: bool = True, + check: bool = False, + timeout: Optional[float] = None, + ) -> subprocess.CompletedProcess[str]: + completed = subprocess.run( + command, + shell=True, + cwd=str(cwd) if cwd else None, + env=dict(env) if env else None, + capture_output=capture_output, + text=True, + check=False, + timeout=timeout, + ) + if check and completed.returncode != 0: + msg = completed.stderr.strip() or completed.stdout.strip() + if not msg: + msg = f"Command failed with exit code {completed.returncode}" + raise FlowError(msg) + return completed + + def stream_shell( + self, + command: str, + console: Console, + *, + check: bool = True, + ) -> subprocess.CompletedProcess[str]: + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + lines: list[str] = [] + assert process.stdout is not None + try: + for line in process.stdout: + stripped = line.rstrip() + if stripped: + lines.append(stripped) + finally: + process.stdout.close() + process.wait() + + if check and process.returncode != 0: + raise FlowError(f"Command failed (exit {process.returncode}): {command}") + + return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="") + + def require_binary(self, name: str) -> str: + path = shutil.which(name) + if path is None: + raise FlowError(f"Required executable not found: {name}") + return path + + +class FileSystem: + """Filesystem wrapper for all mutating operations.""" + + def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + runner.require_binary("sudo") + argv: list[str] = ["sudo", "mkdir", "-p"] + if mode is not None: + argv.extend(["-m", f"{mode:o}"]) + argv.append(str(path)) + runner.run(argv, check=True) + return + path.mkdir(parents=True, exist_ok=True) + if mode is not None: + path.chmod(mode) + + def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + argv = ["sudo", "rm"] + if missing_ok: + argv.append("-f") + argv.append(str(path)) + runner.run(argv, check=True) + return + try: + path.unlink() + except FileNotFoundError: + if not missing_ok: + raise + + def remove_tree(self, path: Path) -> None: + shutil.rmtree(path, ignore_errors=True) + + def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + self.ensure_dir(target.parent, sudo=True, runner=runner) + runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True) + return + self.ensure_dir(target.parent) + shutil.copy2(source, target) + + def copy_tree(self, source: Path, target: Path) -> None: + self.ensure_dir(target.parent) + shutil.copytree(source, target, dirs_exist_ok=True) + + def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None: + if sudo: + if runner is None: + raise FlowError("Runner required for sudo operations") + self.ensure_dir(target.parent, sudo=True, runner=runner) + runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True) + return + self.ensure_dir(target.parent) + target.symlink_to(source) + + def same_symlink(self, target: Path, source: Path) -> bool: + if not target.is_symlink(): + return False + return target.resolve(strict=False) == source.resolve(strict=False) + + def read_text(self, path: Path, *, default: Optional[str] = None) -> str: + try: + return path.read_text(encoding="utf-8") + except FileNotFoundError: + if default is None: + raise + return default + + def write_text(self, path: Path, content: str) -> None: + self.ensure_dir(path.parent) + path.write_text(content, encoding="utf-8") + + def write_bytes(self, path: Path, content: bytes) -> None: + self.ensure_dir(path.parent) + path.write_bytes(content) + + def read_json(self, path: Path, *, default: Any = None) -> Any: + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except FileNotFoundError: + return default + + def write_json(self, path: Path, data: Any) -> None: + self.ensure_dir(path.parent) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +class GitClient: + """Git adapter scoped to a repository root.""" + + def __init__(self, runner: CommandRunner): + self.runner = runner + + def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]: + return self.runner.run( + ["git", "-C", str(repo_dir), *args], + capture_output=capture_output, + check=check, + ) + + +@dataclass +class SystemRuntime: + """Shared runtime dependencies.""" + runner: CommandRunner = field(default_factory=CommandRunner) + fs: FileSystem = field(default_factory=FileSystem) + git: GitClient = field(init=False) + + def __post_init__(self) -> None: + self.git = GitClient(self.runner) diff --git a/src/flow/core/template.py b/src/flow/core/template.py new file mode 100644 index 0000000..0fe806e --- /dev/null +++ b/src/flow/core/template.py @@ -0,0 +1,58 @@ +"""Variable and template substitution -- pure functions, no I/O.""" + +import os +import re +from typing import Any, Dict + + +def substitute(text: Any, variables: Dict[str, str]) -> Any: + """Replace $VAR and ${VAR} with values from variables dict or env.""" + if not isinstance(text, str): + return text + + pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}") + + def _replace(match: re.Match[str]) -> str: + key = match.group(1) or match.group(2) or "" + if key in variables: + return str(variables[key]) + if key in os.environ: + return os.environ[key] + return match.group(0) + + return pattern.sub(_replace, text) + + +def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any: + if expr.startswith("env."): + env_key = expr.split(".", 1)[1] + env_ctx = context.get("env", {}) + if isinstance(env_ctx, dict) and env_key in env_ctx: + return env_ctx[env_key] + return os.environ.get(env_key) + + if expr in context: + return context[expr] + + current: Any = context + for part in expr.split("."): + if not isinstance(current, dict) or part not in current: + return None + current = current[part] + + return current + + +def substitute_template(text: Any, context: Dict[str, Any]) -> Any: + """Replace {{expr}} placeholders with values from context dict.""" + if not isinstance(text, str): + return text + + def _replace(match: re.Match[str]) -> str: + key = match.group(1).strip() + value = _resolve_template_value(key, context) + if value is None: + return match.group(0) + return str(value) + + return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text) diff --git a/tests/test_core_config.py b/tests/test_core_config.py new file mode 100644 index 0000000..c663723 --- /dev/null +++ b/tests/test_core_config.py @@ -0,0 +1,77 @@ +"""Tests for flow.core.config.""" + +from pathlib import Path + +import pytest + +from flow.core.config import AppConfig, load_config, load_manifest + + +def test_load_config_missing_path(tmp_path): + cfg = load_config(tmp_path / "nonexistent") + assert isinstance(cfg, AppConfig) + assert cfg.dotfiles_url == "" + assert cfg.container_registry == "registry.tomastm.com" + + +def test_load_config_from_yaml(tmp_path): + (tmp_path / "config.yaml").write_text( + "repository:\n" + " url: git@github.com:user/dots.git\n" + " branch: dev\n" + "paths:\n" + " projects: ~/code\n" + "defaults:\n" + " container-registry: my.registry.com\n" + " tmux-session: main\n" + ) + cfg = load_config(tmp_path) + assert cfg.dotfiles_url == "git@github.com:user/dots.git" + assert cfg.dotfiles_branch == "dev" + assert cfg.projects_dir == "~/code" + assert cfg.container_registry == "my.registry.com" + assert cfg.tmux_session == "main" + + +def test_load_config_parses_targets_shorthand(tmp_path): + (tmp_path / "config.yaml").write_text( + "targets:\n" + " personal@orb: personal.orb\n" + ) + cfg = load_config(tmp_path) + assert len(cfg.targets) == 1 + assert cfg.targets[0].namespace == "personal" + assert cfg.targets[0].platform == "orb" + assert cfg.targets[0].host == "personal.orb" + + +def test_load_config_parses_targets_dict(tmp_path): + (tmp_path / "config.yaml").write_text( + "targets:\n" + " work@ec2:\n" + " host: work.ec2.internal\n" + " identity: ~/.ssh/id_work\n" + ) + cfg = load_config(tmp_path) + assert len(cfg.targets) == 1 + assert cfg.targets[0].host == "work.ec2.internal" + assert cfg.targets[0].identity == "~/.ssh/id_work" + + +def test_load_manifest_returns_dict(tmp_path): + (tmp_path / "manifest.yaml").write_text( + "packages:\n" + " - name: fd\n" + " type: pkg\n" + ) + data = load_manifest(tmp_path) + assert isinstance(data, dict) + assert "packages" in data + + +def test_load_manifest_merges_files(tmp_path): + (tmp_path / "01-packages.yaml").write_text("packages:\n - name: fd\n type: pkg\n") + (tmp_path / "02-profiles.yaml").write_text("profiles:\n work:\n os: linux\n") + data = load_manifest(tmp_path) + assert "packages" in data + assert "profiles" in data diff --git a/tests/test_core_console.py b/tests/test_core_console.py new file mode 100644 index 0000000..9c3e508 --- /dev/null +++ b/tests/test_core_console.py @@ -0,0 +1,38 @@ +"""Tests for flow.core.console.""" + +from flow.core.console import Console + + +def test_info_prints_message(capsys): + c = Console(color=False) + c.info("hello") + assert "hello" in capsys.readouterr().out + + +def test_quiet_suppresses_info(capsys): + c = Console(quiet=True, color=False) + c.info("hidden") + assert capsys.readouterr().out == "" + + +def test_quiet_does_not_suppress_error(capsys): + c = Console(quiet=True, color=False) + c.error("visible") + captured = capsys.readouterr() + assert "visible" in captured.err or "visible" in captured.out + + +def test_table_prints_headers_and_rows(capsys): + c = Console(color=False) + c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]]) + output = capsys.readouterr().out + assert "NAME" in output + assert "foo" in output + assert "bar" in output + + +def test_no_color_strips_ansi(capsys): + c = Console(color=False) + c.info("test") + output = capsys.readouterr().out + assert "\033[" not in output diff --git a/tests/test_core_paths.py b/tests/test_core_paths.py new file mode 100644 index 0000000..90e8371 --- /dev/null +++ b/tests/test_core_paths.py @@ -0,0 +1,40 @@ +"""Tests for flow.core.paths.""" + +from pathlib import Path + +from flow.core import paths + + +def test_config_dir_ends_with_flow(): + assert paths.CONFIG_DIR.name == "flow" + + +def test_data_dir_ends_with_flow(): + assert paths.DATA_DIR.name == "flow" + + +def test_modules_dir_under_data(): + assert paths.MODULES_DIR.parent == paths.DATA_DIR + + +def test_linked_state_under_state(): + assert paths.LINKED_STATE.parent == paths.STATE_DIR + + +def test_dotfiles_flow_config_path(): + expected_suffix = Path("_shared") / "flow" / ".config" / "flow" + assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix)) + + +def test_ensure_dirs_creates_directories(tmp_path, monkeypatch): + monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow") + monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow") + monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow") + monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules") + monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages") + + paths.ensure_dirs() + + assert (tmp_path / "config" / "flow").is_dir() + assert (tmp_path / "data" / "flow" / "modules").is_dir() + assert (tmp_path / "state" / "flow").is_dir() diff --git a/tests/test_core_platform.py b/tests/test_core_platform.py new file mode 100644 index 0000000..550f783 --- /dev/null +++ b/tests/test_core_platform.py @@ -0,0 +1,38 @@ +"""Tests for flow.core.platform.""" + +import os + +import pytest + +from flow.core.platform import PlatformInfo, detect_context, detect_platform + + +def test_platform_info_computes_platform_string(): + p = PlatformInfo(os="linux", arch="x64") + assert p.platform == "linux-x64" + + +def test_detect_platform_returns_valid_info(): + info = detect_platform() + assert info.os in ("linux", "macos") + assert info.arch in ("x64", "arm64") + assert info.platform == f"{info.os}-{info.arch}" + + +def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch): + from flow.core.errors import FlowError + monkeypatch.setattr("platform.system", lambda: "FreeBSD") + with pytest.raises(FlowError, match="Unsupported operating system"): + detect_platform() + + +def test_detect_context_host(monkeypatch): + monkeypatch.delenv("DF_NAMESPACE", raising=False) + monkeypatch.delenv("DF_PLATFORM", raising=False) + assert detect_context() == "host" + + +def test_detect_context_vm(monkeypatch): + monkeypatch.setenv("DF_NAMESPACE", "personal") + monkeypatch.setenv("DF_PLATFORM", "orb") + assert detect_context() == "vm" diff --git a/tests/test_core_runtime.py b/tests/test_core_runtime.py new file mode 100644 index 0000000..ca6bfa2 --- /dev/null +++ b/tests/test_core_runtime.py @@ -0,0 +1,95 @@ +"""Tests for flow.core.runtime.""" + +from pathlib import Path + +from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime + + +class TestFileSystem: + def test_ensure_dir_creates_nested(self, tmp_path): + fs = FileSystem() + target = tmp_path / "a" / "b" / "c" + fs.ensure_dir(target) + assert target.is_dir() + + def test_write_and_read_text(self, tmp_path): + fs = FileSystem() + path = tmp_path / "test.txt" + fs.write_text(path, "hello") + assert fs.read_text(path) == "hello" + + def test_read_text_default(self, tmp_path): + fs = FileSystem() + path = tmp_path / "missing.txt" + assert fs.read_text(path, default="fallback") == "fallback" + + def test_write_and_read_json(self, tmp_path): + fs = FileSystem() + path = tmp_path / "data.json" + fs.write_json(path, {"key": "value"}) + assert fs.read_json(path) == {"key": "value"} + + def test_create_symlink(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + target = tmp_path / "link" + fs.create_symlink(source, target) + assert target.is_symlink() + assert target.resolve() == source.resolve() + + def test_same_symlink_true(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + target = tmp_path / "link" + target.symlink_to(source) + assert fs.same_symlink(target, source) is True + + def test_same_symlink_false(self, tmp_path): + fs = FileSystem() + source = tmp_path / "source" + source.write_text("content") + other = tmp_path / "other" + other.write_text("other") + target = tmp_path / "link" + target.symlink_to(other) + assert fs.same_symlink(target, source) is False + + def test_remove_file(self, tmp_path): + fs = FileSystem() + path = tmp_path / "file" + path.write_text("x") + fs.remove_file(path) + assert not path.exists() + + def test_remove_file_missing_ok(self, tmp_path): + fs = FileSystem() + fs.remove_file(tmp_path / "missing", missing_ok=True) # no error + + def test_copy_file(self, tmp_path): + fs = FileSystem() + src = tmp_path / "src" + src.write_text("data") + dst = tmp_path / "sub" / "dst" + fs.copy_file(src, dst) + assert dst.read_text() == "data" + + +class TestCommandRunner: + def test_run_echo(self): + runner = CommandRunner() + result = runner.run(["echo", "hello"], capture_output=True) + assert result.stdout.strip() == "hello" + + def test_require_binary_finds_echo(self): + runner = CommandRunner() + path = runner.require_binary("echo") + assert path is not None + + +class TestSystemRuntime: + def test_creates_git_client(self): + rt = SystemRuntime() + assert isinstance(rt.git, GitClient) + assert rt.git.runner is rt.runner diff --git a/tests/test_errors.py b/tests/test_errors.py new file mode 100644 index 0000000..c142f62 --- /dev/null +++ b/tests/test_errors.py @@ -0,0 +1,21 @@ +"""Tests for flow.core.errors.""" + +from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict + + +def test_flow_error_is_exception(): + assert issubclass(FlowError, Exception) + + +def test_config_error_is_flow_error(): + assert issubclass(ConfigError, FlowError) + + +def test_plan_conflict_carries_conflicts(): + err = PlanConflict("2 conflicts", ["a exists", "b exists"]) + assert str(err) == "2 conflicts" + assert err.conflicts == ["a exists", "b exists"] + + +def test_execution_error_is_flow_error(): + assert issubclass(ExecutionError, FlowError) diff --git a/tests/test_template.py b/tests/test_template.py new file mode 100644 index 0000000..1feac4c --- /dev/null +++ b/tests/test_template.py @@ -0,0 +1,46 @@ +"""Tests for flow.core.template.""" + +import os + +from flow.core.template import substitute, substitute_template + + +class TestSubstitute: + def test_replaces_dollar_var(self): + assert substitute("hello $NAME", {"NAME": "world"}) == "hello world" + + def test_replaces_braced_var(self): + assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world" + + def test_falls_back_to_env(self, monkeypatch): + monkeypatch.setenv("FOO", "bar") + assert substitute("$FOO", {}) == "bar" + + def test_preserves_unknown_vars(self): + assert substitute("$UNKNOWN", {}) == "$UNKNOWN" + + def test_non_string_passthrough(self): + assert substitute(42, {}) == 42 + + +class TestSubstituteTemplate: + def test_replaces_double_braces(self): + assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux" + + def test_env_dot_notation(self, monkeypatch): + monkeypatch.setenv("USER", "tomas") + result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)}) + assert result == "tomas" + + def test_nested_dict_lookup(self): + ctx = {"platform": {"arch": "arm64"}} + assert substitute_template("{{ platform.arch }}", ctx) == "arm64" + + def test_preserves_unknown_templates(self): + assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}" + + def test_non_string_passthrough(self): + assert substitute_template(42, {}) == 42 + + def test_whitespace_in_braces(self): + assert substitute_template("{{ os }}", {"os": "linux"}) == "linux"