feat: rewrite core layer (errors, template, paths, platform, console, runtime, config)

Complete rewrite of all core modules with proper abstractions:
- FlowError hierarchy with PlanConflict and ExecutionError
- Pure template substitution ($VAR, ${VAR}, {{expr}})
- XDG path constants
- Frozen PlatformInfo dataclass with context detection
- Console with color/quiet/TTY support
- Runtime primitives (CommandRunner, FileSystem, GitClient, SystemRuntime)
- Config loading with target parsing and manifest merging

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-16 04:48:14 +02:00
parent 327201a0ed
commit 6bb41aa001
14 changed files with 823 additions and 407 deletions

View File

@@ -1,110 +1,47 @@
"""Configuration loading (merged YAML) and FlowContext.""" """Configuration loading (merged YAML) and FlowContext."""
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Optional
import yaml import yaml
from flow.core import paths from flow.core.console import Console
from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo from flow.core.platform import PlatformInfo
from flow.core.system import SystemRuntime from flow.core.runtime import SystemRuntime
@dataclass @dataclass(frozen=True)
class TargetConfig: class TargetConfig:
namespace: str namespace: str
platform: str platform: str
ssh_host: str host: str
ssh_identity: Optional[str] = None identity: Optional[str] = None
@dataclass @dataclass
class AppConfig: class AppConfig:
dotfiles_url: str = "" dotfiles_url: str = ""
dotfiles_branch: str = "main" dotfiles_branch: str = "main"
dotfiles_pull_before_edit: bool = True
projects_dir: str = "~/projects" projects_dir: str = "~/projects"
container_registry: str = "registry.tomastm.com" container_registry: str = "registry.tomastm.com"
container_tag: str = "latest" container_tag: str = "latest"
tmux_session: str = "default" tmux_session: str = "default"
targets: List[TargetConfig] = field(default_factory=list) targets: list[TargetConfig] = field(default_factory=list)
def _get_value(mapping: Any, *keys: str, default: Any = None) -> Any: @dataclass
if not isinstance(mapping, dict): class FlowContext:
return default config: AppConfig
for key in keys: manifest: dict[str, Any]
if key in mapping: platform: PlatformInfo
return mapping[key] console: Console
return default runtime: SystemRuntime = field(default_factory=SystemRuntime)
def _as_bool(value: Any, default: bool) -> bool: def _load_yaml_file(path: Path) -> dict[str, Any]:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "y", "on"}:
return True
if normalized in {"0", "false", "no", "n", "off"}:
return False
return default
def _parse_target_config(key: str, value: str) -> Optional[TargetConfig]:
"""Parse a target line from config-like syntax.
Supported formats:
1) namespace = platform ssh_host [ssh_identity]
2) namespace@platform = ssh_host [ssh_identity]
"""
parts = value.split()
if not parts:
return None
if "@" in key:
namespace, platform = key.split("@", 1)
ssh_host = parts[0]
ssh_identity = parts[1] if len(parts) > 1 else None
if not namespace or not platform:
return None
return TargetConfig(
namespace=namespace,
platform=platform,
ssh_host=ssh_host,
ssh_identity=ssh_identity,
)
if len(parts) < 2:
return None
return TargetConfig(
namespace=key,
platform=parts[0],
ssh_host=parts[1],
ssh_identity=parts[2] if len(parts) > 2 else None,
)
def _list_yaml_files(directory: Path) -> List[Path]:
if not directory.exists() or not directory.is_dir():
return []
files = []
for child in directory.iterdir():
if not child.is_file():
continue
if child.suffix.lower() in {".yaml", ".yml"}:
files.append(child)
return sorted(files, key=lambda p: p.name)
def _load_yaml_file(path: Path) -> Dict[str, Any]:
try: try:
with open(path, "r", encoding="utf-8") as handle: with open(path, "r", encoding="utf-8") as handle:
data = yaml.safe_load(handle) data = yaml.safe_load(handle)
@@ -120,201 +57,95 @@ def _load_yaml_file(path: Path) -> Dict[str, Any]:
return data return data
def _load_merged_yaml(directory: Path) -> Dict[str, Any]: def _parse_targets(raw: Any) -> list[TargetConfig]:
merged: Dict[str, Any] = {} if not isinstance(raw, dict):
for file_path in _list_yaml_files(directory): return []
merged.update(_load_yaml_file(file_path))
return merged
targets: list[TargetConfig] = []
for key, value in raw.items():
if "@" not in key:
continue
namespace, platform = key.split("@", 1)
if not namespace or not platform:
continue
def _resolve_default_yaml_root() -> Path:
# Priority 1: self-hosted config from linked dotfiles
if paths.DOTFILES_FLOW_CONFIG.exists() and _list_yaml_files(paths.DOTFILES_FLOW_CONFIG):
return paths.DOTFILES_FLOW_CONFIG
# Priority 2: local config directory
return paths.CONFIG_DIR
def _load_yaml_source(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
if path.is_file():
return _load_yaml_file(path)
if path.is_dir():
return _load_merged_yaml(path)
return {}
def _parse_targets(raw_targets: Any) -> List[TargetConfig]:
targets: List[TargetConfig] = []
if isinstance(raw_targets, dict):
for key, value in raw_targets.items():
if isinstance(value, str): if isinstance(value, str):
parsed = _parse_target_config(key, value) targets.append(TargetConfig(
if parsed is not None:
targets.append(parsed)
continue
if not isinstance(value, dict):
continue
namespace_from_key = key
platform_from_key = None
if "@" in key:
namespace_from_key, platform_from_key = key.split("@", 1)
namespace = str(
_get_value(
value,
"namespace",
default=namespace_from_key,
)
)
platform = str(
_get_value(
value,
"platform",
default=platform_from_key,
)
)
ssh_host = _get_value(value, "ssh_host", "ssh-host", "host", default="")
ssh_identity = _get_value(value, "ssh_identity", "ssh-identity", "identity")
if not namespace or not platform or not ssh_host:
continue
targets.append(
TargetConfig(
namespace=namespace, namespace=namespace,
platform=platform, platform=platform,
ssh_host=str(ssh_host), host=value,
ssh_identity=str(ssh_identity) if ssh_identity else None, ))
) elif isinstance(value, dict):
) host = value.get("host", "")
if not host:
elif isinstance(raw_targets, list):
for item in raw_targets:
if not isinstance(item, dict):
continue continue
identity = value.get("identity")
namespace = _get_value(item, "namespace") targets.append(TargetConfig(
platform = _get_value(item, "platform") namespace=namespace,
ssh_host = _get_value(item, "ssh_host", "ssh-host", "host") platform=platform,
ssh_identity = _get_value(item, "ssh_identity", "ssh-identity", "identity") host=str(host),
identity=str(identity) if identity is not None else None,
if not namespace or not platform or not ssh_host: ))
continue
targets.append(
TargetConfig(
namespace=str(namespace),
platform=str(platform),
ssh_host=str(ssh_host),
ssh_identity=str(ssh_identity) if ssh_identity else None,
)
)
return targets return targets
def load_manifest(path: Optional[Path] = None) -> Dict[str, Any]: def load_config(config_dir: Path) -> AppConfig:
"""Load merged YAML manifest/config data. """Load config.yaml from the given directory into AppConfig."""
config_file = config_dir / "config.yaml"
if not config_file.exists():
return AppConfig()
Default priority: data = _load_yaml_file(config_file)
1) ~/.local/share/flow/dotfiles/_shared/flow/.config/flow/*.y[a]ml
2) ~/.config/flow/*.y[a]ml
"""
source = path if path is not None else _resolve_default_yaml_root()
assert source is not None
data = _load_yaml_source(source)
return data if isinstance(data, dict) else {}
def load_config(path: Optional[Path] = None) -> AppConfig:
"""Load merged YAML config into AppConfig."""
source = path if path is not None else _resolve_default_yaml_root()
assert source is not None
merged = _load_yaml_source(source)
cfg = AppConfig() cfg = AppConfig()
if not isinstance(merged, dict):
return cfg
repository = merged.get("repository") if isinstance(merged.get("repository"), dict) else {} repository = data.get("repository")
paths_section = merged.get("paths") if isinstance(merged.get("paths"), dict) else {} if isinstance(repository, dict):
defaults = merged.get("defaults") if isinstance(merged.get("defaults"), dict) else {} url = repository.get("url")
if url is not None:
cfg.dotfiles_url = str(url)
branch = repository.get("branch")
if branch is not None:
cfg.dotfiles_branch = str(branch)
cfg.dotfiles_url = str( paths_section = data.get("paths")
_get_value( if isinstance(paths_section, dict):
repository, projects = paths_section.get("projects")
"dotfiles_url", if projects is not None:
"dotfiles-url", cfg.projects_dir = str(projects)
default=merged.get("dotfiles_url", cfg.dotfiles_url),
) defaults = data.get("defaults")
) if isinstance(defaults, dict):
cfg.dotfiles_branch = str( registry = defaults.get("container-registry")
_get_value( if registry is not None:
repository, cfg.container_registry = str(registry)
"dotfiles_branch", tmux = defaults.get("tmux-session")
"dotfiles-branch", if tmux is not None:
default=merged.get("dotfiles_branch", cfg.dotfiles_branch), cfg.tmux_session = str(tmux)
)
) raw_targets = data.get("targets")
cfg.dotfiles_pull_before_edit = _as_bool( if raw_targets is not None:
_get_value( cfg.targets = _parse_targets(raw_targets)
repository,
"pull_before_edit",
"pull-before-edit",
default=merged.get("dotfiles_pull_before_edit", cfg.dotfiles_pull_before_edit),
),
cfg.dotfiles_pull_before_edit,
)
cfg.projects_dir = str(
_get_value(
paths_section,
"projects_dir",
"projects-dir",
default=merged.get("projects_dir", cfg.projects_dir),
)
)
cfg.container_registry = str(
_get_value(
defaults,
"container_registry",
"container-registry",
default=merged.get("container_registry", cfg.container_registry),
)
)
cfg.container_tag = str(
_get_value(
defaults,
"container_tag",
"container-tag",
default=merged.get("container_tag", cfg.container_tag),
)
)
cfg.tmux_session = str(
_get_value(
defaults,
"tmux_session",
"tmux-session",
default=merged.get("tmux_session", cfg.tmux_session),
)
)
cfg.targets = _parse_targets(merged.get("targets", {}))
return cfg return cfg
@dataclass def load_manifest(manifest_dir: Path) -> dict[str, Any]:
class FlowContext: """Load manifest.yaml or merge all *.yaml files from the directory."""
config: AppConfig if not manifest_dir.exists():
manifest: Dict[str, Any] return {}
platform: PlatformInfo
console: ConsoleLogger manifest_file = manifest_dir / "manifest.yaml"
runtime: SystemRuntime = field(default_factory=SystemRuntime) if manifest_file.exists():
return _load_yaml_file(manifest_file)
merged: dict[str, Any] = {}
yaml_files = sorted(
(f for f in manifest_dir.iterdir() if f.is_file() and f.suffix in {".yaml", ".yml"}),
key=lambda p: p.name,
)
for path in yaml_files:
merged.update(_load_yaml_file(path))
return merged

View File

@@ -1,138 +1,63 @@
"""Console output formatting — ported from dotfiles_v2/src/console_logger.py.""" """Console output formatting with TTY detection and color control."""
import time import os
from typing import Optional import sys
from typing import Any, Optional
class ConsoleLogger: class Console:
# Color constants def __init__(self, *, quiet: bool = False, color: Optional[bool] = None):
BLUE = "\033[34m" self.quiet = quiet
GREEN = "\033[32m" if color is None:
YELLOW = "\033[33m" self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False
RED = "\033[31m"
CYAN = "\033[36m"
GRAY = "\033[90m"
DARK_GRAY = "\033[2;37m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
# Box drawing characters
BOX_VERTICAL = "\u2502"
BOX_HORIZONTAL = "\u2500"
BOX_TOP_LEFT = "\u250c"
BOX_TOP_RIGHT = "\u2510"
BOX_BOTTOM_LEFT = "\u2514"
BOX_BOTTOM_RIGHT = "\u2518"
def __init__(self):
self.step_counter = 0
self.start_time = None
def info(self, message: str):
print(f"{self.CYAN}[INFO]{self.RESET} {message}")
def warn(self, message: str):
print(f"{self.YELLOW}[WARN]{self.RESET} {message}")
def error(self, message: str):
print(f"{self.RED}[ERROR]{self.RESET} {message}")
def success(self, message: str):
print(f"{self.GREEN}[SUCCESS]{self.RESET} {message}")
def step_start(self, current: int, total: int, description: str):
print(
f"\n{self.BOLD}{self.BLUE}Step {current}/{total}:{self.RESET} "
f"{self.BOLD}{description}{self.RESET}"
)
print(f"{self.BLUE}{self.BOX_HORIZONTAL * 4}{self.RESET} {self.GRAY}Starting...{self.RESET}")
self.start_time = time.time()
def step_command(self, command: str):
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GRAY}$ {command}{self.RESET}")
def step_output(self, line: str):
if line.strip():
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.DARK_GRAY} {line.rstrip()}{self.RESET}")
def step_complete(self, message: str = "Completed successfully"):
elapsed = time.time() - self.start_time if self.start_time else 0
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GREEN}> {message} ({elapsed:.1f}s){self.RESET}")
def step_skip(self, message: str):
elapsed = time.time() - self.start_time if self.start_time else 0
print(
f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}"
f"{self.YELLOW}> Skipped: {message} ({elapsed:.1f}s){self.RESET}"
)
def step_fail(self, message: str):
elapsed = time.time() - self.start_time if self.start_time else 0
print(
f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}"
f"{self.RED}> Failed: {message} ({elapsed:.1f}s){self.RESET}"
)
def section_header(self, title: str, subtitle: str = ""):
width = 70
print(f"\n{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}")
if subtitle:
print(f"{self.BOLD}{self.BLUE} {title.upper()} - {subtitle}{self.RESET}")
else: else:
print(f"{self.BOLD}{self.BLUE} {title.upper()}{self.RESET}") self._color = color
print(f"{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}")
def section_summary(self, title: str): def _style(self, code: str, text: str) -> str:
width = 70 if not self._color:
print(f"\n{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}") return text
print(f"{self.BOLD}{self.GREEN} {title.upper()}{self.RESET}") return f"{code}{text}\033[0m"
print(f"{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}")
def plan_header(self, title: str, count: int): def info(self, msg: str) -> None:
width = 70 if self.quiet:
print(f"\n{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}") return
print(f"{self.BOLD}{self.CYAN} {title.upper()} ({count} actions){self.RESET}") tag = self._style("\033[36m", "[INFO]")
print(f"{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}") print(f"{tag} {msg}")
def plan_category(self, category: str): def warn(self, msg: str) -> None:
print(f"\n{self.BOLD}{self.CYAN}{category.upper()}{self.RESET}") tag = self._style("\033[33m", "[WARN]")
print(f"{self.CYAN}{'-' * 20}{self.RESET}") print(f"{tag} {msg}")
def plan_item(self, number: int, description: str, os_filter: Optional[str] = None, critical: bool = False): def error(self, msg: str) -> None:
os_indicator = f" {self.GRAY}({os_filter}){self.RESET}" if os_filter else "" tag = self._style("\033[31m", "[ERROR]")
error_indicator = f" {self.RED}(critical){self.RESET}" if critical else "" print(f"{tag} {msg}", file=sys.stderr)
print(f" {number:2d}. {description}{os_indicator}{error_indicator}")
def plan_legend(self): def success(self, msg: str) -> None:
print( tag = self._style("\033[32m", "[OK]")
f"\n{self.GRAY}Legend: {self.RED}(critical){self.GRAY} = stops on failure, " print(f"{tag} {msg}")
f"{self.GRAY}(os){self.GRAY} = OS-specific{self.RESET}"
)
def table(self, headers: list[str], rows: list[list[str]]): def table(self, headers: list[str], rows: list[list[str]]) -> None:
"""Print a formatted table."""
if not rows: if not rows:
return return
widths = [len(h) for h in headers]
normalized_headers = [str(h) for h in headers] for row in rows:
normalized_rows = [[str(cell) for cell in row] for row in rows]
# Calculate column widths
widths = [len(h) for h in normalized_headers]
for row in normalized_rows:
for i, cell in enumerate(row): for i, cell in enumerate(row):
if i < len(widths): if i < len(widths):
widths[i] = max(widths[i], len(cell)) widths[i] = max(widths[i], len(str(cell)))
# Header header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers))
header_line = " ".join( if self._color:
f"{self.BOLD}{h:<{widths[i]}}{self.RESET}" for i, h in enumerate(normalized_headers) print(f"\033[1m{header_line}\033[0m")
) else:
print(header_line) print(header_line)
print(self.GRAY + " ".join("-" * w for w in widths) + self.RESET) print(" ".join("-" * w for w in widths))
for row in rows:
print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row)))
# Rows def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None:
for row in normalized_rows: if not operations:
line = " ".join(f"{cell:<{widths[i]}}" for i, cell in enumerate(row)) self.info(f"Nothing to {verb}.")
print(line) return
self.info(f"Plan ({len(operations)} operation(s)):")
for op in operations:
print(f" {op}")

View File

@@ -1,6 +1,21 @@
"""Project-wide exception types.""" """Project-wide error types."""
class FlowError(RuntimeError): class FlowError(Exception):
"""A user-facing operational error.""" """Base for all user-facing errors."""
class ConfigError(FlowError):
"""Invalid config or manifest YAML."""
class PlanConflict(FlowError):
"""Conflicts detected during planning."""
def __init__(self, message: str, conflicts: list[str]):
super().__init__(message)
self.conflicts = conflicts
class ExecutionError(FlowError):
"""A plan step failed during execution."""

View File

@@ -14,25 +14,18 @@ CONFIG_DIR = _xdg("XDG_CONFIG_HOME", str(HOME / ".config")) / "flow"
DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow" DATA_DIR = _xdg("XDG_DATA_HOME", str(HOME / ".local" / "share")) / "flow"
STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow" STATE_DIR = _xdg("XDG_STATE_HOME", str(HOME / ".local" / "state")) / "flow"
MANIFEST_FILE = CONFIG_DIR / "manifest.yaml"
CONFIG_FILE = CONFIG_DIR / "config.yaml"
DOTFILES_DIR = DATA_DIR / "dotfiles" DOTFILES_DIR = DATA_DIR / "dotfiles"
MODULES_DIR = DATA_DIR / "modules" MODULES_DIR = DATA_DIR / "modules"
PACKAGES_DIR = DATA_DIR / "packages" PACKAGES_DIR = DATA_DIR / "packages"
SCRATCH_DIR = DATA_DIR / "scratch"
PROJECTS_DIR = HOME / "projects"
LINKED_STATE = STATE_DIR / "linked.json" LINKED_STATE = STATE_DIR / "linked.json"
INSTALLED_STATE = STATE_DIR / "installed.json" INSTALLED_STATE = STATE_DIR / "installed.json"
# Self-hosted flow config path (from dotfiles repo) # Self-hosted flow config path (from dotfiles repo)
DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow" DOTFILES_FLOW_CONFIG = DOTFILES_DIR / "_shared" / "flow" / ".config" / "flow"
DOTFILES_MANIFEST = DOTFILES_FLOW_CONFIG / "manifest.yaml"
DOTFILES_CONFIG = DOTFILES_FLOW_CONFIG / "config.yaml"
def ensure_dirs() -> None: def ensure_dirs() -> None:
"""Create all required directories if they don't exist.""" """Create all required directories."""
for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR, SCRATCH_DIR): for d in (CONFIG_DIR, DATA_DIR, STATE_DIR, MODULES_DIR, PACKAGES_DIR):
d.mkdir(parents=True, exist_ok=True) d.mkdir(parents=True, exist_ok=True)

View File

@@ -1,18 +1,20 @@
"""OS and architecture detection.""" """OS/arch detection and execution context."""
import os
import platform as _platform import platform as _platform
from dataclasses import dataclass from dataclasses import dataclass
from flow.core.errors import FlowError
@dataclass
@dataclass(frozen=True)
class PlatformInfo: class PlatformInfo:
os: str = "linux" # "linux" or "macos" os: str = "linux"
arch: str = "x64" # "x64" or "arm64" arch: str = "x64"
platform: str = "" # "linux-x64", etc.
def __post_init__(self): @property
if not self.platform: def platform(self) -> str:
self.platform = f"{self.os}-{self.arch}" return f"{self.os}-{self.arch}"
_OS_MAP = {"Darwin": "macos", "Linux": "linux"} _OS_MAP = {"Darwin": "macos", "Linux": "linux"}
@@ -23,11 +25,20 @@ def detect_platform() -> PlatformInfo:
raw_os = _platform.system() raw_os = _platform.system()
os_name = _OS_MAP.get(raw_os) os_name = _OS_MAP.get(raw_os)
if os_name is None: if os_name is None:
raise RuntimeError(f"Unsupported operating system: {raw_os}") raise FlowError(f"Unsupported operating system: {raw_os}")
raw_arch = _platform.machine().lower() raw_arch = _platform.machine().lower()
arch = _ARCH_MAP.get(raw_arch) arch = _ARCH_MAP.get(raw_arch)
if arch is None: if arch is None:
raise RuntimeError(f"Unsupported architecture: {raw_arch}") raise FlowError(f"Unsupported architecture: {raw_arch}")
return PlatformInfo(os=os_name, arch=arch, platform=f"{os_name}-{arch}") return PlatformInfo(os=os_name, arch=arch)
def detect_context() -> str:
"""Detect execution context: 'host', 'vm', or 'container'."""
if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"):
return "container"
if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
return "vm"
return "host"

228
src/flow/core/runtime.py Normal file
View File

@@ -0,0 +1,228 @@
"""Runtime primitives for process, git, state, and filesystem access."""
from __future__ import annotations
import json
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence
from flow.core.console import Console
from flow.core.errors import FlowError
class CommandRunner:
"""Subprocess wrapper with consistent defaults."""
def run(
self,
argv: Sequence[str] | Iterable[str],
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
parts = [str(a) for a in argv]
completed = subprocess.run(
parts,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def run_shell(
self,
command: str,
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
command,
shell=True,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def stream_shell(
self,
command: str,
console: Console,
*,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
lines: list[str] = []
assert process.stdout is not None
try:
for line in process.stdout:
stripped = line.rstrip()
if stripped:
lines.append(stripped)
finally:
process.stdout.close()
process.wait()
if check and process.returncode != 0:
raise FlowError(f"Command failed (exit {process.returncode}): {command}")
return subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(lines), stderr="")
def require_binary(self, name: str) -> str:
path = shutil.which(name)
if path is None:
raise FlowError(f"Required executable not found: {name}")
return path
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, mode: Optional[int] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.require_binary("sudo")
argv: list[str] = ["sudo", "mkdir", "-p"]
if mode is not None:
argv.extend(["-m", f"{mode:o}"])
argv.append(str(path))
runner.run(argv, check=True)
return
path.mkdir(parents=True, exist_ok=True)
if mode is not None:
path.chmod(mode)
def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
argv = ["sudo", "rm"]
if missing_ok:
argv.append("-f")
argv.append(str(path))
runner.run(argv, check=True)
return
try:
path.unlink()
except FileNotFoundError:
if not missing_ok:
raise
def remove_tree(self, path: Path) -> None:
shutil.rmtree(path, ignore_errors=True)
def copy_file(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
shutil.copy2(source, target)
def copy_tree(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copytree(source, target, dirs_exist_ok=True)
def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
target.symlink_to(source)
def same_symlink(self, target: Path, source: Path) -> bool:
if not target.is_symlink():
return False
return target.resolve(strict=False) == source.resolve(strict=False)
def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
if default is None:
raise
return default
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)
path.write_text(content, encoding="utf-8")
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def read_json(self, path: Path, *, default: Any = None) -> Any:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return default
def write_json(self, path: Path, data: Any) -> None:
self.ensure_dir(path.parent)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
class GitClient:
"""Git adapter scoped to a repository root."""
def __init__(self, runner: CommandRunner):
self.runner = runner
def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", str(repo_dir), *args],
capture_output=capture_output,
check=check,
)
@dataclass
class SystemRuntime:
"""Shared runtime dependencies."""
runner: CommandRunner = field(default_factory=CommandRunner)
fs: FileSystem = field(default_factory=FileSystem)
git: GitClient = field(init=False)
def __post_init__(self) -> None:
self.git = GitClient(self.runner)

58
src/flow/core/template.py Normal file
View File

@@ -0,0 +1,58 @@
"""Variable and template substitution -- pure functions, no I/O."""
import os
import re
from typing import Any, Dict
def substitute(text: Any, variables: Dict[str, str]) -> Any:
"""Replace $VAR and ${VAR} with values from variables dict or env."""
if not isinstance(text, str):
return text
pattern = re.compile(r"\$(\w+)|\$\{([^}]+)\}")
def _replace(match: re.Match[str]) -> str:
key = match.group(1) or match.group(2) or ""
if key in variables:
return str(variables[key])
if key in os.environ:
return os.environ[key]
return match.group(0)
return pattern.sub(_replace, text)
def _resolve_template_value(expr: str, context: Dict[str, Any]) -> Any:
if expr.startswith("env."):
env_key = expr.split(".", 1)[1]
env_ctx = context.get("env", {})
if isinstance(env_ctx, dict) and env_key in env_ctx:
return env_ctx[env_key]
return os.environ.get(env_key)
if expr in context:
return context[expr]
current: Any = context
for part in expr.split("."):
if not isinstance(current, dict) or part not in current:
return None
current = current[part]
return current
def substitute_template(text: Any, context: Dict[str, Any]) -> Any:
"""Replace {{expr}} placeholders with values from context dict."""
if not isinstance(text, str):
return text
def _replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
value = _resolve_template_value(key, context)
if value is None:
return match.group(0)
return str(value)
return re.sub(r"\{\{\s*([^{}]+?)\s*\}\}", _replace, text)

77
tests/test_core_config.py Normal file
View File

@@ -0,0 +1,77 @@
"""Tests for flow.core.config."""
from pathlib import Path
import pytest
from flow.core.config import AppConfig, load_config, load_manifest
def test_load_config_missing_path(tmp_path):
cfg = load_config(tmp_path / "nonexistent")
assert isinstance(cfg, AppConfig)
assert cfg.dotfiles_url == ""
assert cfg.container_registry == "registry.tomastm.com"
def test_load_config_from_yaml(tmp_path):
(tmp_path / "config.yaml").write_text(
"repository:\n"
" url: git@github.com:user/dots.git\n"
" branch: dev\n"
"paths:\n"
" projects: ~/code\n"
"defaults:\n"
" container-registry: my.registry.com\n"
" tmux-session: main\n"
)
cfg = load_config(tmp_path)
assert cfg.dotfiles_url == "git@github.com:user/dots.git"
assert cfg.dotfiles_branch == "dev"
assert cfg.projects_dir == "~/code"
assert cfg.container_registry == "my.registry.com"
assert cfg.tmux_session == "main"
def test_load_config_parses_targets_shorthand(tmp_path):
(tmp_path / "config.yaml").write_text(
"targets:\n"
" personal@orb: personal.orb\n"
)
cfg = load_config(tmp_path)
assert len(cfg.targets) == 1
assert cfg.targets[0].namespace == "personal"
assert cfg.targets[0].platform == "orb"
assert cfg.targets[0].host == "personal.orb"
def test_load_config_parses_targets_dict(tmp_path):
(tmp_path / "config.yaml").write_text(
"targets:\n"
" work@ec2:\n"
" host: work.ec2.internal\n"
" identity: ~/.ssh/id_work\n"
)
cfg = load_config(tmp_path)
assert len(cfg.targets) == 1
assert cfg.targets[0].host == "work.ec2.internal"
assert cfg.targets[0].identity == "~/.ssh/id_work"
def test_load_manifest_returns_dict(tmp_path):
(tmp_path / "manifest.yaml").write_text(
"packages:\n"
" - name: fd\n"
" type: pkg\n"
)
data = load_manifest(tmp_path)
assert isinstance(data, dict)
assert "packages" in data
def test_load_manifest_merges_files(tmp_path):
(tmp_path / "01-packages.yaml").write_text("packages:\n - name: fd\n type: pkg\n")
(tmp_path / "02-profiles.yaml").write_text("profiles:\n work:\n os: linux\n")
data = load_manifest(tmp_path)
assert "packages" in data
assert "profiles" in data

View File

@@ -0,0 +1,38 @@
"""Tests for flow.core.console."""
from flow.core.console import Console
def test_info_prints_message(capsys):
c = Console(color=False)
c.info("hello")
assert "hello" in capsys.readouterr().out
def test_quiet_suppresses_info(capsys):
c = Console(quiet=True, color=False)
c.info("hidden")
assert capsys.readouterr().out == ""
def test_quiet_does_not_suppress_error(capsys):
c = Console(quiet=True, color=False)
c.error("visible")
captured = capsys.readouterr()
assert "visible" in captured.err or "visible" in captured.out
def test_table_prints_headers_and_rows(capsys):
c = Console(color=False)
c.table(["NAME", "STATUS"], [["foo", "ok"], ["bar", "fail"]])
output = capsys.readouterr().out
assert "NAME" in output
assert "foo" in output
assert "bar" in output
def test_no_color_strips_ansi(capsys):
c = Console(color=False)
c.info("test")
output = capsys.readouterr().out
assert "\033[" not in output

40
tests/test_core_paths.py Normal file
View File

@@ -0,0 +1,40 @@
"""Tests for flow.core.paths."""
from pathlib import Path
from flow.core import paths
def test_config_dir_ends_with_flow():
assert paths.CONFIG_DIR.name == "flow"
def test_data_dir_ends_with_flow():
assert paths.DATA_DIR.name == "flow"
def test_modules_dir_under_data():
assert paths.MODULES_DIR.parent == paths.DATA_DIR
def test_linked_state_under_state():
assert paths.LINKED_STATE.parent == paths.STATE_DIR
def test_dotfiles_flow_config_path():
expected_suffix = Path("_shared") / "flow" / ".config" / "flow"
assert str(paths.DOTFILES_FLOW_CONFIG).endswith(str(expected_suffix))
def test_ensure_dirs_creates_directories(tmp_path, monkeypatch):
monkeypatch.setattr(paths, "CONFIG_DIR", tmp_path / "config" / "flow")
monkeypatch.setattr(paths, "DATA_DIR", tmp_path / "data" / "flow")
monkeypatch.setattr(paths, "STATE_DIR", tmp_path / "state" / "flow")
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "data" / "flow" / "modules")
monkeypatch.setattr(paths, "PACKAGES_DIR", tmp_path / "data" / "flow" / "packages")
paths.ensure_dirs()
assert (tmp_path / "config" / "flow").is_dir()
assert (tmp_path / "data" / "flow" / "modules").is_dir()
assert (tmp_path / "state" / "flow").is_dir()

View File

@@ -0,0 +1,38 @@
"""Tests for flow.core.platform."""
import os
import pytest
from flow.core.platform import PlatformInfo, detect_context, detect_platform
def test_platform_info_computes_platform_string():
p = PlatformInfo(os="linux", arch="x64")
assert p.platform == "linux-x64"
def test_detect_platform_returns_valid_info():
info = detect_platform()
assert info.os in ("linux", "macos")
assert info.arch in ("x64", "arm64")
assert info.platform == f"{info.os}-{info.arch}"
def test_detect_platform_raises_flow_error_on_unsupported(monkeypatch):
from flow.core.errors import FlowError
monkeypatch.setattr("platform.system", lambda: "FreeBSD")
with pytest.raises(FlowError, match="Unsupported operating system"):
detect_platform()
def test_detect_context_host(monkeypatch):
monkeypatch.delenv("DF_NAMESPACE", raising=False)
monkeypatch.delenv("DF_PLATFORM", raising=False)
assert detect_context() == "host"
def test_detect_context_vm(monkeypatch):
monkeypatch.setenv("DF_NAMESPACE", "personal")
monkeypatch.setenv("DF_PLATFORM", "orb")
assert detect_context() == "vm"

View File

@@ -0,0 +1,95 @@
"""Tests for flow.core.runtime."""
from pathlib import Path
from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime
class TestFileSystem:
def test_ensure_dir_creates_nested(self, tmp_path):
fs = FileSystem()
target = tmp_path / "a" / "b" / "c"
fs.ensure_dir(target)
assert target.is_dir()
def test_write_and_read_text(self, tmp_path):
fs = FileSystem()
path = tmp_path / "test.txt"
fs.write_text(path, "hello")
assert fs.read_text(path) == "hello"
def test_read_text_default(self, tmp_path):
fs = FileSystem()
path = tmp_path / "missing.txt"
assert fs.read_text(path, default="fallback") == "fallback"
def test_write_and_read_json(self, tmp_path):
fs = FileSystem()
path = tmp_path / "data.json"
fs.write_json(path, {"key": "value"})
assert fs.read_json(path) == {"key": "value"}
def test_create_symlink(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("content")
target = tmp_path / "link"
fs.create_symlink(source, target)
assert target.is_symlink()
assert target.resolve() == source.resolve()
def test_same_symlink_true(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("content")
target = tmp_path / "link"
target.symlink_to(source)
assert fs.same_symlink(target, source) is True
def test_same_symlink_false(self, tmp_path):
fs = FileSystem()
source = tmp_path / "source"
source.write_text("content")
other = tmp_path / "other"
other.write_text("other")
target = tmp_path / "link"
target.symlink_to(other)
assert fs.same_symlink(target, source) is False
def test_remove_file(self, tmp_path):
fs = FileSystem()
path = tmp_path / "file"
path.write_text("x")
fs.remove_file(path)
assert not path.exists()
def test_remove_file_missing_ok(self, tmp_path):
fs = FileSystem()
fs.remove_file(tmp_path / "missing", missing_ok=True) # no error
def test_copy_file(self, tmp_path):
fs = FileSystem()
src = tmp_path / "src"
src.write_text("data")
dst = tmp_path / "sub" / "dst"
fs.copy_file(src, dst)
assert dst.read_text() == "data"
class TestCommandRunner:
def test_run_echo(self):
runner = CommandRunner()
result = runner.run(["echo", "hello"], capture_output=True)
assert result.stdout.strip() == "hello"
def test_require_binary_finds_echo(self):
runner = CommandRunner()
path = runner.require_binary("echo")
assert path is not None
class TestSystemRuntime:
def test_creates_git_client(self):
rt = SystemRuntime()
assert isinstance(rt.git, GitClient)
assert rt.git.runner is rt.runner

21
tests/test_errors.py Normal file
View File

@@ -0,0 +1,21 @@
"""Tests for flow.core.errors."""
from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict
def test_flow_error_is_exception():
assert issubclass(FlowError, Exception)
def test_config_error_is_flow_error():
assert issubclass(ConfigError, FlowError)
def test_plan_conflict_carries_conflicts():
err = PlanConflict("2 conflicts", ["a exists", "b exists"])
assert str(err) == "2 conflicts"
assert err.conflicts == ["a exists", "b exists"]
def test_execution_error_is_flow_error():
assert issubclass(ExecutionError, FlowError)

46
tests/test_template.py Normal file
View File

@@ -0,0 +1,46 @@
"""Tests for flow.core.template."""
import os
from flow.core.template import substitute, substitute_template
class TestSubstitute:
def test_replaces_dollar_var(self):
assert substitute("hello $NAME", {"NAME": "world"}) == "hello world"
def test_replaces_braced_var(self):
assert substitute("hello ${NAME}", {"NAME": "world"}) == "hello world"
def test_falls_back_to_env(self, monkeypatch):
monkeypatch.setenv("FOO", "bar")
assert substitute("$FOO", {}) == "bar"
def test_preserves_unknown_vars(self):
assert substitute("$UNKNOWN", {}) == "$UNKNOWN"
def test_non_string_passthrough(self):
assert substitute(42, {}) == 42
class TestSubstituteTemplate:
def test_replaces_double_braces(self):
assert substitute_template("nvim-{{os}}", {"os": "linux"}) == "nvim-linux"
def test_env_dot_notation(self, monkeypatch):
monkeypatch.setenv("USER", "tomas")
result = substitute_template("{{ env.USER }}", {"env": dict(os.environ)})
assert result == "tomas"
def test_nested_dict_lookup(self):
ctx = {"platform": {"arch": "arm64"}}
assert substitute_template("{{ platform.arch }}", ctx) == "arm64"
def test_preserves_unknown_templates(self):
assert substitute_template("{{ unknown }}", {}) == "{{ unknown }}"
def test_non_string_passthrough(self):
assert substitute_template(42, {}) == 42
def test_whitespace_in_braces(self):
assert substitute_template("{{ os }}", {"os": "linux"}) == "linux"