feat: add all domain layers (dotfiles, packages, remote, containers)

- Dotfiles: models, module resolution, path resolution, link planning
- Packages: models, catalog parsing, resolution, install/remove planning
- Remote: target parsing, SSH command building
- Containers: image refs, mount resolution, container specs

All domain code is pure functions + frozen dataclasses. 88 tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-16 04:54:06 +02:00
parent 6bb41aa001
commit 31d7583b9a
26 changed files with 1894 additions and 0 deletions

View File

View File

View File

@@ -0,0 +1,40 @@
"""Container domain models."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass(frozen=True)
class ImageRef:
"""A container image reference."""
registry: str
name: str
tag: str
@property
def full(self) -> str:
return f"{self.registry}/{self.name}:{self.tag}"
@dataclass(frozen=True)
class Mount:
"""A bind mount for a container."""
source: Path
target: str
readonly: bool = False
def to_flag(self) -> str:
opt = ":ro" if self.readonly else ""
return f"-v {self.source}:{self.target}{opt}"
@dataclass(frozen=True)
class ContainerSpec:
"""Full container run specification."""
name: str
image: ImageRef
mounts: tuple[Mount, ...]
env: dict[str, str] = field(default_factory=dict)
extra_flags: tuple[str, ...] = ()
command: Optional[str] = None

View File

@@ -0,0 +1,100 @@
"""Container resolution -- pure functions."""
from pathlib import Path
from typing import Optional
from flow.core.errors import FlowError
from flow.domain.containers.models import ContainerSpec, ImageRef, Mount
def parse_image_ref(
image: str,
default_registry: str = "registry.tomastm.com",
default_tag: str = "latest",
) -> ImageRef:
"""Parse image string into ImageRef."""
# Handle full registry/name:tag
if ":" in image:
base, tag = image.rsplit(":", 1)
else:
base = image
tag = default_tag
if "/" in base:
# Has registry
parts = base.split("/", 1)
registry = parts[0]
name = parts[1]
else:
registry = default_registry
name = base
return ImageRef(registry=registry, name=name, tag=tag)
def container_name(namespace: str, image_name: str) -> str:
"""Compute container name from namespace and image."""
return f"flow-{namespace}-{image_name}"
def resolve_mounts(
home: Path,
projects_dir: str,
dotfiles_dir: Optional[Path] = None,
extra_mounts: Optional[list[dict]] = None,
) -> list[Mount]:
"""Resolve standard container mounts."""
mounts: list[Mount] = []
# Projects dir
projects = Path(projects_dir).expanduser()
if projects.exists():
mounts.append(Mount(source=projects, target="/home/user/projects"))
# SSH agent
ssh_auth = Path.home() / ".ssh"
if ssh_auth.exists():
mounts.append(Mount(source=ssh_auth, target="/home/user/.ssh", readonly=True))
# Dotfiles
if dotfiles_dir and dotfiles_dir.exists():
mounts.append(Mount(source=dotfiles_dir, target="/home/user/.local/share/flow/dotfiles", readonly=True))
# Extra mounts from config
if extra_mounts:
for m in extra_mounts:
source = Path(str(m.get("source", ""))).expanduser()
target = str(m.get("target", ""))
if source and target:
mounts.append(Mount(
source=source,
target=target,
readonly=bool(m.get("readonly", False)),
))
return mounts
def build_container_spec(
namespace: str,
image_ref: ImageRef,
mounts: list[Mount],
env: Optional[dict[str, str]] = None,
command: Optional[str] = None,
) -> ContainerSpec:
"""Build a complete container run specification."""
name = container_name(namespace, image_ref.name)
container_env = {
"DF_NAMESPACE": namespace,
"DF_PLATFORM": "container",
}
if env:
container_env.update(env)
return ContainerSpec(
name=name,
image=image_ref,
mounts=tuple(mounts),
env=container_env,
command=command,
)

View File

View File

@@ -0,0 +1,121 @@
"""Dotfiles domain models -- all frozen dataclasses."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass(frozen=True)
class ModuleRef:
"""An external git repo providing content for a package subtree."""
source: str
ref_type: str # "branch" | "tag" | "commit"
ref_value: str
mount_path: Path # Relative path within package to _module.yaml parent
cache_dir: Path # Where the repo is cloned
module_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_cache_root)
@dataclass(frozen=True)
class Package:
"""A dotfiles package: a named set of files mapping to home-relative targets."""
name: str # e.g. "zsh", "nvim"
layer: str # "_shared" or profile name
package_id: str # "layer/name"
source_dir: Path # Absolute path in dotfiles repo
module: Optional[ModuleRef]
local_files: tuple[tuple[Path, Path], ...] # (abs_source, rel_to_package_root)
@dataclass(frozen=True)
class LinkTarget:
"""A single file that should be linked into the filesystem."""
source: Path
target: Path
package: str # package_id
from_module: bool
needs_sudo: bool
@dataclass(frozen=True)
class LinkOp:
"""A single operation in a link plan."""
type: str # "create_link" | "remove_link" | "create_dir"
target: Path
source: Optional[Path]
package: str
needs_sudo: bool
def __str__(self) -> str:
if self.type == "create_link":
sudo = " (sudo)" if self.needs_sudo else ""
return f"LINK: {self.target} -> {self.source}{sudo}"
if self.type == "remove_link":
return f"REMOVE: {self.target}"
if self.type == "create_dir":
return f"MKDIR: {self.target}"
return f"{self.type}: {self.target}"
@dataclass(frozen=True)
class PlanSummary:
added: int
removed: int
unchanged: int
from_modules: int
@dataclass(frozen=True)
class LinkPlan:
"""Complete reconciliation plan."""
operations: list[LinkOp]
conflicts: list[str]
summary: PlanSummary
@dataclass
class LinkedState:
"""Persisted link state."""
links: dict[Path, LinkTarget] = field(default_factory=dict)
def as_dict(self) -> dict:
grouped: dict[str, dict[str, dict]] = {}
for target, lt in sorted(self.links.items(), key=lambda x: str(x[0])):
pkg_links = grouped.setdefault(lt.package, {})
pkg_links[str(target)] = {
"source": str(lt.source),
"from_module": lt.from_module,
"needs_sudo": lt.needs_sudo,
}
return {"version": 2, "links": grouped}
@classmethod
def from_dict(cls, data: dict) -> "LinkedState":
version = data.get("version")
if version is not None and version != 2:
from flow.core.errors import ConfigError
raise ConfigError(
f"Unsupported linked.json version {version}. "
"Delete ~/.local/state/flow/linked.json and relink."
)
links: dict[Path, LinkTarget] = {}
raw_links = data.get("links", {})
for package, pkg_links in raw_links.items():
for target_str, info in pkg_links.items():
links[Path(target_str)] = LinkTarget(
source=Path(info["source"]),
target=Path(target_str),
package=str(package),
from_module=bool(info.get("from_module", False)),
needs_sudo=bool(info.get("needs_sudo", False)),
)
return cls(links=links)
@dataclass(frozen=True)
class RepoInfo:
"""A managed git repo (dotfiles or module)."""
name: str
path: Path
source: str
is_module: bool

View File

@@ -0,0 +1,57 @@
"""Module metadata resolution -- pure functions."""
from pathlib import Path
from flow.core.errors import ConfigError
from flow.domain.dotfiles.models import ModuleRef
def compute_mount_path(module_yaml: Path, package_dir: Path) -> Path:
"""Relative path from package root to _module.yaml parent."""
rel = module_yaml.parent.relative_to(package_dir)
return rel
def module_cache_dir(package_id: str, modules_base: Path) -> Path:
"""Cache dir for a module clone. '/' -> '--' to avoid collisions."""
return modules_base / package_id.replace("/", "--")
def normalize_source(source: str) -> str:
"""Normalize git source URL. github:org/repo -> https://github.com/org/repo.git"""
if source.startswith("github:"):
repo = source.split(":", 1)[1]
return f"https://github.com/{repo}.git"
return source
def parse_module_ref(
raw: dict,
package_id: str,
mount_path: Path,
modules_base: Path,
) -> ModuleRef:
"""Build ModuleRef from parsed _module.yaml content."""
source = raw.get("source")
if not isinstance(source, str) or not source:
raise ConfigError(f"Module for {package_id}: 'source' must be a non-empty string")
ref = raw.get("ref")
if not isinstance(ref, dict):
raise ConfigError(f"Module for {package_id}: 'ref' must be a mapping")
choices = [k for k in ("branch", "tag", "commit") if isinstance(ref.get(k), str) and ref[k]]
if len(choices) != 1:
raise ConfigError(f"Module for {package_id}: 'ref' must have exactly one of: branch, tag, commit")
ref_type = choices[0]
ref_value = str(ref[ref_type])
return ModuleRef(
source=normalize_source(source),
ref_type=ref_type,
ref_value=ref_value,
mount_path=mount_path,
cache_dir=module_cache_dir(package_id, modules_base),
module_files=(), # Populated by service after cloning
)

View File

@@ -0,0 +1,119 @@
"""Link/unlink plan computation. Pure functions with injected I/O."""
from pathlib import Path
from typing import Callable, Optional
from flow.domain.dotfiles.models import (
LinkOp,
LinkPlan,
LinkTarget,
LinkedState,
PlanSummary,
)
def plan_link(
desired: list[LinkTarget],
current: LinkedState,
filesystem_check: Callable[[Path], Optional[str]],
) -> LinkPlan:
"""Build reconciliation plan.
filesystem_check: injected by service. Returns "file", "dir", "symlink", or None.
"""
ops: list[LinkOp] = []
conflicts: list[str] = []
added = 0
removed = 0
unchanged = 0
from_modules = 0
desired_map = {t.target: t for t in desired}
desired_targets = set(desired_map.keys())
current_targets = set(current.links.keys())
# Removals: in current but not desired
for target in sorted(current_targets - desired_targets):
ops.append(LinkOp(
type="remove_link", target=target, source=None,
package=current.links[target].package,
needs_sudo=current.links[target].needs_sudo,
))
removed += 1
# Additions, updates, and unchanged
for target in sorted(desired_targets):
spec = desired_map[target]
if target in current.links:
cur = current.links[target]
if cur.source == spec.source:
unchanged += 1
if spec.from_module:
from_modules += 1
continue
# Source changed: remove old link, then create new one
ops.append(LinkOp(
type="remove_link", target=target, source=cur.source,
package=cur.package, needs_sudo=cur.needs_sudo,
))
ops.append(LinkOp(
type="create_link", target=target, source=spec.source,
package=spec.package, needs_sudo=spec.needs_sudo,
))
added += 1
if spec.from_module:
from_modules += 1
continue
# New target: check filesystem for conflicts
fs_state = filesystem_check(target)
if fs_state is not None:
conflicts.append(
f"{target} already exists ({fs_state}) and is not managed by flow"
)
continue
ops.append(LinkOp(
type="create_link", target=target, source=spec.source,
package=spec.package, needs_sudo=spec.needs_sudo,
))
added += 1
if spec.from_module:
from_modules += 1
return LinkPlan(
operations=ops,
conflicts=conflicts,
summary=PlanSummary(
added=added, removed=removed,
unchanged=unchanged, from_modules=from_modules,
),
)
def plan_unlink(
current: LinkedState,
packages: Optional[list[str]],
) -> LinkPlan:
"""Plan removal of managed links."""
ops: list[LinkOp] = []
for target in sorted(current.links.keys()):
lt = current.links[target]
if packages is not None:
# Match by full package_id or by basename
basename = lt.package.split("/", 1)[-1] if "/" in lt.package else lt.package
if lt.package not in packages and basename not in packages:
continue
ops.append(LinkOp(
type="remove_link", target=target, source=lt.source,
package=lt.package, needs_sudo=lt.needs_sudo,
))
return LinkPlan(
operations=ops,
conflicts=[],
summary=PlanSummary(added=0, removed=len(ops), unchanged=0, from_modules=0),
)

View File

@@ -0,0 +1,100 @@
"""Path resolution: package -> home-relative LinkTargets. Pure functions."""
from pathlib import Path
from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, Package
RESERVED_ROOT = "_root"
MODULE_FILE = "_module.yaml"
def resolve_package_targets(
package: Package,
home: Path,
skip: set[str],
) -> list[LinkTarget]:
"""Resolve all LinkTargets for a package, handling modules correctly."""
if package.name in skip:
return []
targets: list[LinkTarget] = []
mount_path = package.module.mount_path if package.module else None
# Local files (from dotfiles repo)
for abs_source, rel in package.local_files:
# Skip _module.yaml
if rel.name == MODULE_FILE:
continue
# If module exists, skip files inside mount_path (module provides those)
if mount_path is not None:
if mount_path == Path("."):
continue # Root-level module: all local files are superseded by module
try:
rel.relative_to(mount_path)
continue # Inside mount_path, skip
except ValueError:
pass # Outside mount_path, process normally
target, needs_sudo = _resolve_target(rel, home, skip)
if target is None:
continue
targets.append(LinkTarget(
source=abs_source, target=target,
package=package.package_id,
from_module=False, needs_sudo=needs_sudo,
))
# Module files
if package.module:
for abs_source, rel in package.module.module_files:
mounted = package.module.mount_path / rel if package.module.mount_path != Path(".") else rel
target, needs_sudo = _resolve_target(mounted, home, skip)
if target is None:
continue
targets.append(LinkTarget(
source=abs_source, target=target,
package=package.package_id,
from_module=True, needs_sudo=needs_sudo,
))
return targets
def _resolve_target(rel: Path, home: Path, skip: set[str]) -> tuple[Path | None, bool]:
"""Resolve a relative path to an absolute target. Returns (target, needs_sudo)."""
parts = rel.parts
if parts and parts[0] == RESERVED_ROOT:
if RESERVED_ROOT in skip:
return None, False
if len(parts) < 2:
return None, False
return Path("/") / Path(*parts[1:]), True
return home / rel, False
def resolve_all_targets(
packages: list[Package],
home: Path,
skip: set[str],
) -> list[LinkTarget]:
"""Resolve targets for all packages. Raises PlanConflict on duplicate targets."""
all_targets: list[LinkTarget] = []
seen: dict[Path, str] = {}
for pkg in packages:
targets = resolve_package_targets(pkg, home, skip)
for t in targets:
if t.target in seen:
conflicts = [
f"{t.target} claimed by both {seen[t.target]} and {t.package}"
]
raise PlanConflict(
f"Conflicting dotfile targets across packages",
conflicts,
)
seen[t.target] = t.package
all_targets.append(t)
return all_targets

View File

View File

@@ -0,0 +1,85 @@
"""Package catalog: parsing manifest into PackageDef objects."""
from typing import Any, Optional
from flow.core.errors import ConfigError
from flow.domain.packages.models import PackageDef, ProfilePackageRef
def parse_catalog(manifest: dict[str, Any]) -> dict[str, PackageDef]:
"""Parse packages from manifest into a name->PackageDef dict."""
catalog: dict[str, PackageDef] = {}
raw_packages = manifest.get("packages", [])
if isinstance(raw_packages, list):
for entry in raw_packages:
pkg = _parse_package_entry(entry)
catalog[pkg.name] = pkg
elif isinstance(raw_packages, dict):
for name, entry in raw_packages.items():
if isinstance(entry, dict):
entry["name"] = name
pkg = _parse_package_entry(entry)
else:
pkg = PackageDef(
name=name, type="pkg", sources={},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
catalog[pkg.name] = pkg
return catalog
def _parse_package_entry(entry: dict[str, Any]) -> PackageDef:
name = entry.get("name")
if not name:
raise ConfigError("Package entry missing 'name'")
pkg_type = entry.get("type", "pkg")
sources: dict[str, str] = {}
# Parse sources from various keys
for pm in ("apt", "dnf", "brew"):
if pm in entry:
sources[pm] = str(entry[pm])
if "sources" in entry and isinstance(entry["sources"], dict):
sources.update(entry["sources"])
return PackageDef(
name=name,
type=pkg_type,
sources=sources,
source=entry.get("source"),
version=entry.get("version"),
asset_pattern=entry.get("asset-pattern") or entry.get("asset_pattern"),
platform_map=entry.get("platform-map") or entry.get("platform_map") or {},
extract_dir=entry.get("extract-dir") or entry.get("extract_dir"),
install=entry.get("install") or {},
post_install=entry.get("post-install") or entry.get("post_install"),
allow_sudo=bool(entry.get("allow-sudo", False)),
)
def normalize_profile_entry(entry: Any) -> ProfilePackageRef:
"""Normalize a profile package entry to ProfilePackageRef."""
if isinstance(entry, str):
# Could be "binary/neovim" or just "neovim"
if "/" in entry:
pkg_type, name = entry.split("/", 1)
return ProfilePackageRef(name=name, type=pkg_type, source=None, version=None, asset_pattern=None)
return ProfilePackageRef(name=entry, type=None, source=None, version=None, asset_pattern=None)
if isinstance(entry, dict):
name = entry.get("name", "")
return ProfilePackageRef(
name=name,
type=entry.get("type"),
source=entry.get("source"),
version=entry.get("version"),
asset_pattern=entry.get("asset-pattern") or entry.get("asset_pattern"),
)
raise ConfigError(f"Invalid profile package entry: {entry}")

View File

@@ -0,0 +1,109 @@
"""Package domain models -- all frozen dataclasses."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
@dataclass(frozen=True)
class PackageDef:
"""A package definition from the manifest."""
name: str
type: str # "pkg" | "binary" | "appimage" | "script"
sources: dict[str, str] # pm_name -> package_name
source: Optional[str] # direct URL or github shorthand
version: Optional[str]
asset_pattern: Optional[str]
platform_map: dict[str, str] # platform -> asset suffix
extract_dir: Optional[str]
install: dict[str, Any] # install config overrides
post_install: Optional[str]
allow_sudo: bool
@dataclass(frozen=True)
class ProfilePackageRef:
"""A package reference from a profile."""
name: str
type: Optional[str]
source: Optional[str]
version: Optional[str]
asset_pattern: Optional[str]
@dataclass(frozen=True)
class PkgInstallOp:
"""A single package install operation."""
package: PackageDef
method: str # "pm" | "binary" | "appimage" | "script"
source_name: str # resolved pm package name or URL
download_url: Optional[str]
def __str__(self) -> str:
if self.method == "pm":
return f"INSTALL (pkg): {self.source_name}"
return f"INSTALL ({self.method}): {self.package.name} from {self.source_name}"
@dataclass(frozen=True)
class PkgRemoveOp:
"""A single package remove operation."""
name: str
type: str
files: list[Path]
def __str__(self) -> str:
return f"REMOVE: {self.name} ({len(self.files)} file(s))"
@dataclass(frozen=True)
class PackagePlan:
"""Complete package install/remove plan."""
install_ops: list[PkgInstallOp]
remove_ops: list[PkgRemoveOp]
pm_update_needed: bool
pm_command: Optional[str]
@dataclass
class InstalledPackage:
"""A tracked installed package."""
name: str
version: str
type: str
files: list[Path] = field(default_factory=list)
@dataclass
class InstalledState:
"""Persisted installed package state."""
packages: dict[str, InstalledPackage] = field(default_factory=dict)
def as_dict(self) -> dict:
pkgs = {}
for name, pkg in sorted(self.packages.items()):
pkgs[name] = {
"version": pkg.version,
"type": pkg.type,
"files": [str(f) for f in pkg.files],
}
return {"version": 1, "packages": pkgs}
@classmethod
def from_dict(cls, data: dict) -> "InstalledState":
version = data.get("version")
if version is not None and version != 1:
from flow.core.errors import ConfigError
raise ConfigError(
f"Unsupported installed.json version {version}. "
"Delete ~/.local/state/flow/installed.json and reinstall."
)
packages: dict[str, InstalledPackage] = {}
for name, info in data.get("packages", {}).items():
packages[name] = InstalledPackage(
name=name,
version=str(info.get("version", "")),
type=str(info.get("type", "")),
files=[Path(f) for f in info.get("files", [])],
)
return cls(packages=packages)

View File

@@ -0,0 +1,92 @@
"""Package install/remove planning. Pure functions."""
from typing import Optional
from flow.domain.packages.models import (
InstalledState,
PackageDef,
PackagePlan,
PkgInstallOp,
PkgRemoveOp,
)
from flow.domain.packages.resolution import (
detect_package_manager,
pm_install_command,
resolve_binary_asset,
resolve_download_url,
resolve_source_name,
)
def plan_install(
packages: list[PackageDef],
installed: InstalledState,
platform_str: str,
pm: Optional[str] = None,
) -> PackagePlan:
"""Plan installation of packages."""
if pm is None:
pm = detect_package_manager()
install_ops: list[PkgInstallOp] = []
pm_packages: list[str] = []
for pkg in packages:
if pkg.name in installed.packages:
continue # Already installed
if pkg.type == "pkg":
source_name = resolve_source_name(pkg, pm)
pm_packages.append(source_name)
install_ops.append(PkgInstallOp(
package=pkg, method="pm",
source_name=source_name, download_url=None,
))
elif pkg.type in ("binary", "appimage"):
try:
asset = resolve_binary_asset(pkg, platform_str)
url = resolve_download_url(pkg, asset)
except Exception:
asset = pkg.name
url = None
install_ops.append(PkgInstallOp(
package=pkg, method=pkg.type,
source_name=asset, download_url=url,
))
elif pkg.type == "script":
install_ops.append(PkgInstallOp(
package=pkg, method="script",
source_name=pkg.source or pkg.name,
download_url=pkg.source,
))
pm_cmd = pm_install_command(pm, pm_packages) if pm and pm_packages else None
return PackagePlan(
install_ops=install_ops,
remove_ops=[],
pm_update_needed=bool(pm_packages),
pm_command=pm_cmd,
)
def plan_remove(
package_names: list[str],
installed: InstalledState,
) -> PackagePlan:
"""Plan removal of packages."""
remove_ops: list[PkgRemoveOp] = []
for name in package_names:
if name in installed.packages:
pkg = installed.packages[name]
remove_ops.append(PkgRemoveOp(
name=name, type=pkg.type, files=pkg.files,
))
return PackagePlan(
install_ops=[],
remove_ops=remove_ops,
pm_update_needed=False,
pm_command=None,
)

View File

@@ -0,0 +1,119 @@
"""Package resolution: resolving what to install and how."""
import shutil
from typing import Optional
from flow.core.errors import FlowError
from flow.domain.packages.models import PackageDef, ProfilePackageRef
def resolve_spec(
ref: ProfilePackageRef,
catalog: dict[str, PackageDef],
) -> PackageDef:
"""Merge profile ref with catalog entry."""
base = catalog.get(ref.name)
if base is None:
# Not in catalog: create minimal def from profile ref
return PackageDef(
name=ref.name,
type=ref.type or "pkg",
sources={},
source=ref.source,
version=ref.version,
asset_pattern=ref.asset_pattern,
platform_map={},
extract_dir=None,
install={},
post_install=None,
allow_sudo=False,
)
# Merge: profile overrides catalog
return PackageDef(
name=base.name,
type=ref.type or base.type,
sources=base.sources,
source=ref.source or base.source,
version=ref.version or base.version,
asset_pattern=ref.asset_pattern or base.asset_pattern,
platform_map=base.platform_map,
extract_dir=base.extract_dir,
install=base.install,
post_install=base.post_install,
allow_sudo=base.allow_sudo,
)
def resolve_source_name(pkg: PackageDef, pm: Optional[str]) -> str:
"""Resolve the package name for a given package manager."""
if pm and pm in pkg.sources:
return pkg.sources[pm]
return pkg.name
def resolve_binary_asset(pkg: PackageDef, platform_str: str) -> str:
"""Resolve the binary asset filename for a platform."""
if platform_str in pkg.platform_map:
return pkg.platform_map[platform_str]
if pkg.asset_pattern:
os_name, arch = platform_str.split("-", 1)
return pkg.asset_pattern.replace("{os}", os_name).replace("{arch}", arch)
raise FlowError(f"No asset mapping for {pkg.name} on {platform_str}")
def resolve_download_url(pkg: PackageDef, asset: str) -> str:
"""Build download URL from source + asset."""
source = pkg.source
if not source:
raise FlowError(f"No source URL for {pkg.name}")
if source.startswith("github:"):
repo = source.split(":", 1)[1]
version = pkg.version or "latest"
if version == "latest":
return f"https://github.com/{repo}/releases/latest/download/{asset}"
return f"https://github.com/{repo}/releases/download/{version}/{asset}"
if source.startswith(("http://", "https://")):
if source.endswith("/"):
return f"{source}{asset}"
return source
return source
def detect_package_manager() -> Optional[str]:
"""Detect the system package manager."""
if shutil.which("apt-get"):
return "apt"
if shutil.which("dnf"):
return "dnf"
if shutil.which("brew"):
return "brew"
return None
def pm_update_command(pm: str) -> str:
"""Return the package manager update command."""
commands = {
"apt": "sudo apt-get update -qq",
"dnf": "sudo dnf check-update -q || true",
"brew": "brew update",
}
if pm not in commands:
raise FlowError(f"Unsupported package manager: {pm}")
return commands[pm]
def pm_install_command(pm: str, packages: list[str]) -> str:
"""Return the package manager install command."""
pkg_str = " ".join(packages)
commands = {
"apt": f"sudo apt-get install -y -qq {pkg_str}",
"dnf": f"sudo dnf install -y -q {pkg_str}",
"brew": f"brew install {pkg_str}",
}
if pm not in commands:
raise FlowError(f"Unsupported package manager: {pm}")
return commands[pm]

View File

View File

@@ -0,0 +1,24 @@
"""Remote domain models."""
from dataclasses import dataclass, field
from typing import Optional
@dataclass(frozen=True)
class Target:
"""A resolved remote target."""
namespace: str
platform: str
host: str
identity: Optional[str] = None
@property
def label(self) -> str:
return f"{self.namespace}@{self.platform}"
@dataclass(frozen=True)
class SSHCommand:
"""A constructed SSH command."""
argv: tuple[str, ...]
env: dict[str, str] = field(default_factory=dict)

View File

@@ -0,0 +1,88 @@
"""Remote target resolution -- pure functions."""
from typing import Optional
from flow.core.config import TargetConfig
from flow.core.errors import FlowError
from flow.domain.remote.models import SSHCommand, Target
def parse_target(spec: str) -> tuple[str, str]:
"""Parse 'namespace@platform' into (namespace, platform)."""
if "@" not in spec:
raise FlowError(f"Invalid target format: {spec!r}. Expected 'namespace@platform'")
namespace, platform = spec.split("@", 1)
if not namespace or not platform:
raise FlowError(f"Invalid target format: {spec!r}. Both namespace and platform required")
return namespace, platform
def resolve_target(
spec: str,
targets: list[TargetConfig],
) -> Target:
"""Resolve a target spec against configured targets."""
namespace, platform = parse_target(spec)
for t in targets:
if t.namespace == namespace and t.platform == platform:
return Target(
namespace=t.namespace,
platform=t.platform,
host=t.host,
identity=t.identity,
)
raise FlowError(f"Unknown target: {spec}. Check flow config targets section.")
def build_ssh_command(
target: Target,
*,
extra_args: Optional[list[str]] = None,
remote_command: Optional[str] = None,
) -> SSHCommand:
"""Build SSH command for a target."""
argv: list[str] = ["ssh"]
if target.identity:
argv.extend(["-i", target.identity])
# Standard SSH options
argv.extend(["-o", "StrictHostKeyChecking=accept-new"])
if extra_args:
argv.extend(extra_args)
argv.append(target.host)
if remote_command:
argv.append(remote_command)
env = {
"DF_NAMESPACE": target.namespace,
"DF_PLATFORM": target.platform,
}
return SSHCommand(argv=tuple(argv), env=env)
def terminfo_fix_command(term: str = "xterm-256color") -> list[str]:
"""Commands to fix terminfo on remote host."""
return [
f"infocmp -x {term} > /tmp/{term}.terminfo",
f"ssh TARGET tic -x /tmp/{term}.terminfo",
]
def list_targets(targets: list[TargetConfig]) -> list[Target]:
"""Convert config targets to domain targets."""
return [
Target(
namespace=t.namespace,
platform=t.platform,
host=t.host,
identity=t.identity,
)
for t in targets
]

View File

@@ -0,0 +1,80 @@
"""Tests for containers domain."""
from pathlib import Path
from flow.domain.containers.models import ContainerSpec, ImageRef, Mount
from flow.domain.containers.resolution import (
build_container_spec,
container_name,
parse_image_ref,
resolve_mounts,
)
class TestParseImageRef:
def test_simple_name(self):
ref = parse_image_ref("devbox")
assert ref.registry == "registry.tomastm.com"
assert ref.name == "devbox"
assert ref.tag == "latest"
def test_with_tag(self):
ref = parse_image_ref("devbox:v2")
assert ref.tag == "v2"
def test_full_ref(self):
ref = parse_image_ref("ghcr.io/user/image:main")
assert ref.registry == "ghcr.io"
assert ref.name == "user/image"
assert ref.tag == "main"
def test_full_image_string(self):
ref = parse_image_ref("devbox")
assert ref.full == "registry.tomastm.com/devbox:latest"
class TestContainerName:
def test_basic(self):
assert container_name("personal", "devbox") == "flow-personal-devbox"
class TestResolveMounts:
def test_projects_mount(self, tmp_path):
projects = tmp_path / "projects"
projects.mkdir()
mounts = resolve_mounts(tmp_path, str(projects))
project_mounts = [m for m in mounts if m.target == "/home/user/projects"]
assert len(project_mounts) == 1
def test_extra_mounts(self, tmp_path):
mounts = resolve_mounts(
tmp_path, str(tmp_path),
extra_mounts=[{"source": str(tmp_path), "target": "/data"}],
)
extra = [m for m in mounts if m.target == "/data"]
assert len(extra) == 1
class TestBuildContainerSpec:
def test_basic(self):
image = ImageRef(registry="reg", name="img", tag="v1")
spec = build_container_spec("personal", image, [])
assert spec.name == "flow-personal-img"
assert spec.env["DF_NAMESPACE"] == "personal"
assert spec.env["DF_PLATFORM"] == "container"
def test_with_mounts(self):
image = ImageRef(registry="reg", name="img", tag="v1")
mounts = [Mount(source=Path("/a"), target="/b")]
spec = build_container_spec("ns", image, mounts)
assert len(spec.mounts) == 1
class TestMount:
def test_to_flag(self):
m = Mount(source=Path("/src"), target="/dst")
assert m.to_flag() == "-v /src:/dst"
def test_to_flag_readonly(self):
m = Mount(source=Path("/src"), target="/dst", readonly=True)
assert ":ro" in m.to_flag()

View File

@@ -0,0 +1,47 @@
"""Tests for dotfiles domain models."""
from pathlib import Path
from flow.domain.dotfiles.models import (
LinkOp,
LinkPlan,
LinkTarget,
LinkedState,
ModuleRef,
Package,
PlanSummary,
)
def test_link_op_str_create():
op = LinkOp(type="create_link", target=Path("/home/x/.zshrc"),
source=Path("/dots/zsh/.zshrc"), package="_shared/zsh", needs_sudo=False)
assert "LINK:" in str(op)
assert ".zshrc" in str(op)
def test_link_op_str_sudo():
op = LinkOp(type="create_link", target=Path("/etc/hosts"),
source=Path("/dots/dns/hosts"), package="_shared/dns", needs_sudo=True)
assert "(sudo)" in str(op)
def test_linked_state_roundtrip():
lt = LinkTarget(source=Path("/a"), target=Path("/b"), package="p", from_module=False, needs_sudo=False)
state = LinkedState(links={Path("/b"): lt})
data = state.as_dict()
restored = LinkedState.from_dict(data)
assert Path("/b") in restored.links
assert restored.links[Path("/b")].source == Path("/a")
assert restored.links[Path("/b")].package == "p"
def test_linked_state_empty():
state = LinkedState.from_dict({})
assert state.links == {}
def test_package_has_id():
pkg = Package(name="zsh", layer="_shared", package_id="_shared/zsh",
source_dir=Path("/dots/_shared/zsh"), module=None, local_files=())
assert pkg.package_id == "_shared/zsh"

View File

@@ -0,0 +1,99 @@
"""Tests for dotfiles module resolution -- the core bug fix."""
from pathlib import Path
import pytest
from flow.core.errors import ConfigError
from flow.domain.dotfiles.modules import (
compute_mount_path,
module_cache_dir,
normalize_source,
parse_module_ref,
)
class TestComputeMountPath:
def test_nested_module(self):
"""_shared/nvim/.config/nvim/_module.yaml -> .config/nvim"""
result = compute_mount_path(
module_yaml=Path("/dots/_shared/nvim/.config/nvim/_module.yaml"),
package_dir=Path("/dots/_shared/nvim"),
)
assert result == Path(".config/nvim")
def test_root_level_module(self):
"""_shared/nvim/_module.yaml -> Path('.')"""
result = compute_mount_path(
module_yaml=Path("/dots/_shared/nvim/_module.yaml"),
package_dir=Path("/dots/_shared/nvim"),
)
assert result == Path(".")
def test_deeply_nested(self):
result = compute_mount_path(
module_yaml=Path("/dots/_shared/pkg/.config/a/b/c/_module.yaml"),
package_dir=Path("/dots/_shared/pkg"),
)
assert result == Path(".config/a/b/c")
class TestModuleCacheDir:
def test_simple_name(self):
result = module_cache_dir("_shared/nvim", Path("/home/x/.local/share/flow/modules"))
assert result == Path("/home/x/.local/share/flow/modules/_shared--nvim")
def test_profile_name(self):
result = module_cache_dir("linux-work/nvim", Path("/m"))
assert result == Path("/m/linux-work--nvim")
class TestNormalizeSource:
def test_github_shorthand(self):
assert normalize_source("github:org/repo") == "https://github.com/org/repo.git"
def test_full_url_passthrough(self):
assert normalize_source("https://example.com/repo.git") == "https://example.com/repo.git"
def test_ssh_passthrough(self):
assert normalize_source("git@github.com:org/repo.git") == "git@github.com:org/repo.git"
class TestParseModuleRef:
def test_branch_ref(self):
raw = {"source": "github:org/nvim-config", "ref": {"branch": "main"}}
ref = parse_module_ref(
raw, package_id="_shared/nvim",
mount_path=Path(".config/nvim"),
modules_base=Path("/modules"),
)
assert ref.source == "https://github.com/org/nvim-config.git"
assert ref.ref_type == "branch"
assert ref.ref_value == "main"
assert ref.mount_path == Path(".config/nvim")
assert ref.cache_dir == Path("/modules/_shared--nvim")
def test_tag_ref(self):
raw = {"source": "github:org/repo", "ref": {"tag": "v1.0"}}
ref = parse_module_ref(raw, "p/x", Path("."), Path("/m"))
assert ref.ref_type == "tag"
assert ref.ref_value == "v1.0"
def test_missing_source_raises(self):
with pytest.raises(ConfigError):
parse_module_ref({}, "p/x", Path("."), Path("/m"))
def test_missing_ref_raises(self):
raw = {"source": "github:org/repo"}
with pytest.raises(ConfigError):
parse_module_ref(raw, "p/x", Path("."), Path("/m"))
def test_ref_not_dict_raises(self):
raw = {"source": "github:org/repo", "ref": "main"}
with pytest.raises(ConfigError):
parse_module_ref(raw, "p/x", Path("."), Path("/m"))
def test_ambiguous_ref_raises(self):
raw = {"source": "github:org/repo", "ref": {"branch": "main", "tag": "v1"}}
with pytest.raises(ConfigError):
parse_module_ref(raw, "p/x", Path("."), Path("/m"))

View File

@@ -0,0 +1,97 @@
"""Tests for dotfiles link planning."""
from pathlib import Path
from typing import Optional
from flow.domain.dotfiles.models import (
LinkOp,
LinkTarget,
LinkedState,
)
from flow.domain.dotfiles.planning import plan_link, plan_unlink
def _lt(target, source="/a", pkg="_shared/zsh", module=False, sudo=False):
return LinkTarget(
source=Path(source), target=Path(target),
package=pkg, from_module=module, needs_sudo=sudo,
)
def _fs_check_none(path: Path) -> Optional[str]:
"""Fake filesystem_check: nothing exists."""
return None
def _fs_check_file(path: Path) -> Optional[str]:
"""Fake: everything is a file."""
return "file"
class TestPlanLink:
def test_new_target_creates_link(self):
desired = [_lt("/home/x/.zshrc")]
plan = plan_link(desired, LinkedState(), _fs_check_none)
assert len(plan.operations) == 1
assert plan.operations[0].type == "create_link"
assert plan.summary.added == 1
def test_existing_correct_link_unchanged(self):
lt = _lt("/home/x/.zshrc")
current = LinkedState(links={Path("/home/x/.zshrc"): lt})
plan = plan_link([lt], current, _fs_check_none)
assert len(plan.operations) == 0
assert plan.summary.unchanged == 1
def test_stale_link_removed(self):
old = _lt("/home/x/.old")
current = LinkedState(links={Path("/home/x/.old"): old})
plan = plan_link([], current, _fs_check_none)
assert len(plan.operations) == 1
assert plan.operations[0].type == "remove_link"
assert plan.summary.removed == 1
def test_changed_source_produces_remove_then_create(self):
old = _lt("/home/x/.zshrc", source="/old")
new = _lt("/home/x/.zshrc", source="/new")
current = LinkedState(links={Path("/home/x/.zshrc"): old})
plan = plan_link([new], current, _fs_check_none)
types = [op.type for op in plan.operations]
assert types == ["remove_link", "create_link"]
def test_unmanaged_file_at_target_is_conflict(self):
desired = [_lt("/home/x/.zshrc")]
plan = plan_link(desired, LinkedState(), _fs_check_file)
assert len(plan.conflicts) == 1
assert ".zshrc" in plan.conflicts[0]
def test_module_targets_counted(self):
desired = [_lt("/home/x/.config/nvim/init.lua", module=True)]
plan = plan_link(desired, LinkedState(), _fs_check_none)
assert plan.summary.from_modules == 1
class TestPlanUnlink:
def test_unlink_all(self):
lt = _lt("/home/x/.zshrc")
current = LinkedState(links={Path("/home/x/.zshrc"): lt})
plan = plan_unlink(current, packages=None)
assert len(plan.operations) == 1
assert plan.operations[0].type == "remove_link"
def test_unlink_specific_package(self):
zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
git = _lt("/home/x/.gitconfig", pkg="_shared/git")
current = LinkedState(links={
Path("/home/x/.zshrc"): zsh,
Path("/home/x/.gitconfig"): git,
})
plan = plan_unlink(current, packages=["_shared/zsh"])
assert len(plan.operations) == 1
assert plan.operations[0].target == Path("/home/x/.zshrc")
def test_unlink_by_basename(self):
zsh = _lt("/home/x/.zshrc", pkg="_shared/zsh")
current = LinkedState(links={Path("/home/x/.zshrc"): zsh})
plan = plan_unlink(current, packages=["zsh"])
assert len(plan.operations) == 1

View File

@@ -0,0 +1,144 @@
"""Tests for dotfiles path resolution."""
from pathlib import Path
import pytest
from flow.core.errors import PlanConflict
from flow.domain.dotfiles.models import LinkTarget, ModuleRef, Package
from flow.domain.dotfiles.resolution import resolve_all_targets, resolve_package_targets
RESERVED_ROOT = "_root"
HOME = Path("/home/testuser")
def _pkg(name, layer="_shared", files=(), module=None):
return Package(
name=name,
layer=layer,
package_id=f"{layer}/{name}",
source_dir=Path(f"/dots/{layer}/{name}"),
module=module,
local_files=tuple(files),
)
class TestResolvePackageTargets:
def test_simple_file(self):
pkg = _pkg("zsh", files=[
(Path("/dots/_shared/zsh/.zshrc"), Path(".zshrc")),
])
targets = resolve_package_targets(pkg, HOME, set())
assert len(targets) == 1
assert targets[0].target == HOME / ".zshrc"
assert targets[0].source == Path("/dots/_shared/zsh/.zshrc")
assert targets[0].from_module is False
def test_nested_config(self):
pkg = _pkg("git", files=[
(Path("/dots/_shared/git/.config/git/config"), Path(".config/git/config")),
])
targets = resolve_package_targets(pkg, HOME, set())
assert targets[0].target == HOME / ".config" / "git" / "config"
def test_root_marker(self):
pkg = _pkg("dns", files=[
(Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
])
targets = resolve_package_targets(pkg, HOME, set())
assert targets[0].target == Path("/etc/hosts")
assert targets[0].needs_sudo is True
def test_root_marker_skipped_when_in_skip_set(self):
pkg = _pkg("dns", files=[
(Path("/dots/_shared/dns/_root/etc/hosts"), Path("_root/etc/hosts")),
])
targets = resolve_package_targets(pkg, HOME, {"_root"})
assert len(targets) == 0
def test_skip_package_by_name(self):
pkg = _pkg("nvim", files=[
(Path("/dots/_shared/nvim/.config/nvim/init.lua"), Path(".config/nvim/init.lua")),
])
targets = resolve_package_targets(pkg, HOME, {"nvim"})
assert len(targets) == 0
def test_module_files_linked_under_mount_path(self):
module = ModuleRef(
source="https://github.com/org/nvim-config.git",
ref_type="branch",
ref_value="main",
mount_path=Path(".config/nvim"),
cache_dir=Path("/modules/_shared--nvim"),
module_files=(
(Path("/modules/_shared--nvim/init.lua"), Path("init.lua")),
(Path("/modules/_shared--nvim/lua/plugins.lua"), Path("lua/plugins.lua")),
),
)
pkg = _pkg("nvim", files=[
(Path("/dots/_shared/nvim/.local/bin/nvim-wrapper"), Path(".local/bin/nvim-wrapper")),
], module=module)
targets = resolve_package_targets(pkg, HOME, set())
# Local file outside mount_path
local_targets = [t for t in targets if not t.from_module]
assert len(local_targets) == 1
assert local_targets[0].target == HOME / ".local" / "bin" / "nvim-wrapper"
# Module files under mount_path
module_targets = [t for t in targets if t.from_module]
assert len(module_targets) == 2
module_target_paths = {t.target for t in module_targets}
assert HOME / ".config" / "nvim" / "init.lua" in module_target_paths
assert HOME / ".config" / "nvim" / "lua" / "plugins.lua" in module_target_paths
def test_module_yaml_file_not_linked(self):
"""The _module.yaml marker itself should never be linked."""
pkg = _pkg("nvim", files=[
(Path("/dots/_shared/nvim/.config/nvim/_module.yaml"), Path(".config/nvim/_module.yaml")),
], module=ModuleRef(
source="x", ref_type="branch", ref_value="main",
mount_path=Path(".config/nvim"),
cache_dir=Path("/m"), module_files=(),
))
targets = resolve_package_targets(pkg, HOME, set())
assert not any(t.target.name == "_module.yaml" for t in targets)
def test_root_level_module_skips_all_local_files(self):
"""When mount_path is '.', all local files are from the module, not dotfiles repo."""
module = ModuleRef(
source="x", ref_type="branch", ref_value="main",
mount_path=Path("."),
cache_dir=Path("/m"),
module_files=(
(Path("/m/init.lua"), Path("init.lua")),
),
)
pkg = _pkg("nvim", files=[
(Path("/dots/_shared/nvim/_module.yaml"), Path("_module.yaml")),
(Path("/dots/_shared/nvim/stale-file.txt"), Path("stale-file.txt")),
], module=module)
targets = resolve_package_targets(pkg, HOME, set())
# Only module files should appear, local files skipped
assert len(targets) == 1
assert targets[0].from_module is True
assert targets[0].target == HOME / "init.lua"
class TestResolveAllTargets:
def test_no_conflicts(self):
pkgs = [
_pkg("zsh", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
_pkg("git", files=[(Path("/a/.gitconfig"), Path(".gitconfig"))]),
]
targets = resolve_all_targets(pkgs, HOME, set())
assert len(targets) == 2
def test_duplicate_target_raises(self):
pkgs = [
_pkg("zsh", layer="_shared", files=[(Path("/a/.zshrc"), Path(".zshrc"))]),
_pkg("zsh", layer="work", files=[(Path("/b/.zshrc"), Path(".zshrc"))]),
]
with pytest.raises(PlanConflict):
resolve_all_targets(pkgs, HOME, set())

View File

@@ -0,0 +1,187 @@
"""Tests for packages catalog and resolution."""
import pytest
from flow.core.errors import ConfigError, FlowError
from flow.domain.packages.catalog import normalize_profile_entry, parse_catalog
from flow.domain.packages.resolution import (
detect_package_manager,
pm_install_command,
pm_update_command,
resolve_binary_asset,
resolve_download_url,
resolve_source_name,
resolve_spec,
)
from flow.domain.packages.models import PackageDef, ProfilePackageRef
class TestParseCatalog:
def test_list_format(self):
manifest = {"packages": [
{"name": "fd", "type": "pkg", "apt": "fd-find"},
{"name": "ripgrep", "type": "pkg"},
]}
catalog = parse_catalog(manifest)
assert "fd" in catalog
assert catalog["fd"].sources.get("apt") == "fd-find"
assert "ripgrep" in catalog
def test_dict_format(self):
manifest = {"packages": {
"fd": {"type": "pkg", "apt": "fd-find"},
}}
catalog = parse_catalog(manifest)
assert "fd" in catalog
assert catalog["fd"].type == "pkg"
def test_empty_manifest(self):
assert parse_catalog({}) == {}
class TestNormalizeProfileEntry:
def test_string_shorthand_with_type(self):
ref = normalize_profile_entry("binary/neovim")
assert ref.name == "neovim"
assert ref.type == "binary"
def test_plain_name(self):
ref = normalize_profile_entry("fd")
assert ref.name == "fd"
assert ref.type is None
def test_dict_entry(self):
ref = normalize_profile_entry({"name": "nvim", "type": "binary", "version": "0.10"})
assert ref.name == "nvim"
assert ref.type == "binary"
assert ref.version == "0.10"
class TestResolveSpec:
def test_merge_with_catalog(self):
catalog = {"fd": PackageDef(
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)}
ref = ProfilePackageRef(name="fd", type=None, source=None, version="1.0", asset_pattern=None)
result = resolve_spec(ref, catalog)
assert result.type == "pkg" # from catalog
assert result.version == "1.0" # from profile
assert result.sources == {"apt": "fd-find"} # from catalog
def test_not_in_catalog(self):
ref = ProfilePackageRef(name="unknown", type="binary", source="github:u/r", version=None, asset_pattern=None)
result = resolve_spec(ref, {})
assert result.name == "unknown"
assert result.type == "binary"
class TestResolveSourceName:
def test_with_pm_mapping(self):
pkg = PackageDef(
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
assert resolve_source_name(pkg, "apt") == "fd-find"
def test_fallback_to_name(self):
pkg = PackageDef(
name="fd", type="pkg", sources={},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
assert resolve_source_name(pkg, "apt") == "fd"
class TestResolveBinaryAsset:
def test_platform_map(self):
pkg = PackageDef(
name="nvim", type="binary", sources={},
source="github:neovim/neovim",
version="v0.10.4",
asset_pattern=None,
platform_map={"linux-x64": "nvim-linux-x86_64.tar.gz"},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
assert resolve_binary_asset(pkg, "linux-x64") == "nvim-linux-x86_64.tar.gz"
def test_asset_pattern(self):
pkg = PackageDef(
name="fd", type="binary", sources={},
source="github:sharkdp/fd",
version="v10.2.0",
asset_pattern="fd-v10.2.0-{arch}-unknown-{os}-gnu.tar.gz",
platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
result = resolve_binary_asset(pkg, "linux-x64")
assert "x64" in result
assert "linux" in result
class TestResolveDownloadUrl:
def test_github_shorthand_with_version(self):
pkg = PackageDef(
name="nvim", type="binary", sources={},
source="github:neovim/neovim",
version="v0.10.4",
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
url = resolve_download_url(pkg, "nvim.tar.gz")
assert "github.com/neovim/neovim" in url
assert "v0.10.4" in url
def test_github_latest(self):
pkg = PackageDef(
name="nvim", type="binary", sources={},
source="github:neovim/neovim",
version=None,
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
url = resolve_download_url(pkg, "nvim.tar.gz")
assert "latest" in url
def test_direct_url(self):
pkg = PackageDef(
name="x", type="binary", sources={},
source="https://example.com/download/",
version=None,
asset_pattern=None, platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
url = resolve_download_url(pkg, "x.tar.gz")
assert url == "https://example.com/download/x.tar.gz"
class TestPmCommands:
def test_apt_update(self):
assert "apt-get update" in pm_update_command("apt")
def test_dnf_update(self):
assert "dnf" in pm_update_command("dnf")
def test_brew_install(self):
cmd = pm_install_command("brew", ["fd", "rg"])
assert "brew install" in cmd
assert "fd" in cmd
def test_apt_install(self):
cmd = pm_install_command("apt", ["fd-find"])
assert "apt-get install" in cmd
def test_detect_package_manager_returns_something(self):
# Just verify it doesn't error
result = detect_package_manager()
assert result is None or result in ("apt", "dnf", "brew")

View File

@@ -0,0 +1,43 @@
"""Tests for packages domain models."""
from pathlib import Path
import pytest
from flow.core.errors import ConfigError
from flow.domain.packages.models import InstalledPackage, InstalledState, PackageDef
def test_installed_state_roundtrip():
state = InstalledState(packages={
"neovim": InstalledPackage(
name="neovim", version="0.10.4", type="binary",
files=[Path("/home/x/.local/bin/nvim")],
),
})
data = state.as_dict()
restored = InstalledState.from_dict(data)
assert "neovim" in restored.packages
assert restored.packages["neovim"].version == "0.10.4"
assert restored.packages["neovim"].files == [Path("/home/x/.local/bin/nvim")]
def test_installed_state_empty():
state = InstalledState.from_dict({})
assert state.packages == {}
def test_installed_state_version_mismatch():
with pytest.raises(ConfigError):
InstalledState.from_dict({"version": 99, "packages": {}})
def test_package_def_fields():
pkg = PackageDef(
name="fd", type="pkg", sources={"apt": "fd-find"},
source=None, version=None, asset_pattern=None,
platform_map={}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
assert pkg.name == "fd"
assert pkg.type == "pkg"

View File

@@ -0,0 +1,64 @@
"""Tests for package install/remove planning."""
from pathlib import Path
from flow.domain.packages.models import (
InstalledPackage,
InstalledState,
PackageDef,
)
from flow.domain.packages.planning import plan_install, plan_remove
def _pkg(name, type="pkg", sources=None, source=None, version=None,
asset_pattern=None, platform_map=None):
return PackageDef(
name=name, type=type, sources=sources or {},
source=source, version=version, asset_pattern=asset_pattern,
platform_map=platform_map or {}, extract_dir=None, install={},
post_install=None, allow_sudo=False,
)
class TestPlanInstall:
def test_new_pm_package(self):
pkgs = [_pkg("fd", sources={"apt": "fd-find"})]
plan = plan_install(pkgs, InstalledState(), "linux-x64", pm="apt")
assert len(plan.install_ops) == 1
assert plan.install_ops[0].method == "pm"
assert plan.install_ops[0].source_name == "fd-find"
assert plan.pm_update_needed is True
def test_skip_already_installed(self):
pkgs = [_pkg("fd")]
installed = InstalledState(packages={
"fd": InstalledPackage(name="fd", version="1.0", type="pkg"),
})
plan = plan_install(pkgs, installed, "linux-x64", pm="apt")
assert len(plan.install_ops) == 0
def test_binary_package(self):
pkgs = [_pkg("nvim", type="binary", source="github:neovim/neovim",
version="v0.10.4",
platform_map={"linux-x64": "nvim-linux-x86_64.tar.gz"})]
plan = plan_install(pkgs, InstalledState(), "linux-x64", pm="apt")
assert len(plan.install_ops) == 1
assert plan.install_ops[0].method == "binary"
assert plan.install_ops[0].download_url is not None
class TestPlanRemove:
def test_remove_installed(self):
installed = InstalledState(packages={
"fd": InstalledPackage(
name="fd", version="1.0", type="pkg",
files=[Path("/usr/bin/fd")],
),
})
plan = plan_remove(["fd"], installed)
assert len(plan.remove_ops) == 1
assert plan.remove_ops[0].name == "fd"
def test_remove_not_installed(self):
plan = plan_remove(["missing"], InstalledState())
assert len(plan.remove_ops) == 0

View File

@@ -0,0 +1,79 @@
"""Tests for remote domain."""
import pytest
from flow.core.config import TargetConfig
from flow.core.errors import FlowError
from flow.domain.remote.models import SSHCommand, Target
from flow.domain.remote.resolution import (
build_ssh_command,
list_targets,
parse_target,
resolve_target,
terminfo_fix_command,
)
class TestParseTarget:
def test_valid_spec(self):
ns, plat = parse_target("personal@orb")
assert ns == "personal"
assert plat == "orb"
def test_missing_at_raises(self):
with pytest.raises(FlowError):
parse_target("invalid")
def test_empty_parts_raises(self):
with pytest.raises(FlowError):
parse_target("@orb")
class TestResolveTarget:
def test_found(self):
targets = [TargetConfig(namespace="personal", platform="orb", host="personal.orb")]
result = resolve_target("personal@orb", targets)
assert result.host == "personal.orb"
assert result.label == "personal@orb"
def test_not_found(self):
with pytest.raises(FlowError, match="Unknown target"):
resolve_target("missing@host", [])
class TestBuildSSHCommand:
def test_basic(self):
target = Target(namespace="personal", platform="orb", host="personal.orb")
cmd = build_ssh_command(target)
assert "ssh" in cmd.argv
assert "personal.orb" in cmd.argv
assert cmd.env["DF_NAMESPACE"] == "personal"
def test_with_identity(self):
target = Target(namespace="work", platform="ec2", host="work.ec2", identity="~/.ssh/id_work")
cmd = build_ssh_command(target)
assert "-i" in cmd.argv
assert "~/.ssh/id_work" in cmd.argv
def test_with_remote_command(self):
target = Target(namespace="p", platform="o", host="h")
cmd = build_ssh_command(target, remote_command="ls -la")
assert cmd.argv[-1] == "ls -la"
class TestListTargets:
def test_converts_configs(self):
configs = [
TargetConfig(namespace="a", platform="b", host="a.b"),
TargetConfig(namespace="c", platform="d", host="c.d"),
]
targets = list_targets(configs)
assert len(targets) == 2
assert targets[0].label == "a@b"
class TestTerminfoFix:
def test_returns_commands(self):
cmds = terminfo_fix_command()
assert len(cmds) == 2
assert "infocmp" in cmds[0]