Rewrite CLI around action runtime

This commit is contained in:
2026-05-14 13:14:38 +03:00
parent 0dc90f9005
commit 3503d81b06
28 changed files with 2778 additions and 574 deletions

View File

@@ -7,7 +7,7 @@ name = "flow"
dynamic = ["version"]
description = "DevFlow - A unified toolkit for managing development instances, containers, and profiles"
requires-python = ">=3.9"
dependencies = ["pyyaml>=6.0"]
dependencies = ["pyyaml>=6.0", "rich>=13.7", "typer>=0.12"]
[project.optional-dependencies]
build = ["pyinstaller>=6.0"]

View File

@@ -0,0 +1,22 @@
"""Canonical action plan runtime."""
from flow.actions.executor import ActionExecutor
from flow.actions.models import (
ActionExecutionSummary,
ActionPlan,
ActionResult,
DomainAction,
PrimitiveAction,
RollbackPolicy,
)
__all__ = [
"ActionExecutionSummary",
"ActionExecutor",
"ActionPlan",
"ActionResult",
"DomainAction",
"PrimitiveAction",
"RollbackPolicy",
]

31
src/flow/actions/audit.py Normal file
View File

@@ -0,0 +1,31 @@
"""Append-only JSONL audit log for action execution."""
from __future__ import annotations
import json
import errno
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Mapping
from flow.actions.serialization import to_jsonable
class AuditLog:
def __init__(self, path: Path):
self.path = path
def write(self, event: str, fields: Mapping[str, Any]) -> None:
try:
self.path.parent.mkdir(parents=True, exist_ok=True)
record = {
"ts": datetime.now(timezone.utc).isoformat(),
"event": event,
**to_jsonable(dict(fields)),
}
with open(self.path, "a", encoding="utf-8") as f:
f.write(json.dumps(record, sort_keys=True) + "\n")
except OSError as e:
if e.errno in {errno.EACCES, errno.EROFS}:
return
raise

View File

@@ -0,0 +1,415 @@
"""Canonical action executor with dry-run, audit logging, and rollback."""
from __future__ import annotations
import base64
from pathlib import Path
from flow.actions.audit import AuditLog
from flow.actions.expansion import expand_actions
from flow.actions.models import (
ActionExecutionSummary,
ActionPlan,
ActionResult,
PrimitiveAction,
RollbackPolicy,
)
from flow.core import paths
from flow.core.config import FlowContext
from flow.core.errors import FlowError
class ActionExecutor:
def __init__(self, ctx: FlowContext, *, audit_path: Path | None = None):
self.ctx = ctx
self.audit = AuditLog(audit_path or paths.STATE_DIR / "actions.jsonl")
def primitives_for(self, plan: ActionPlan) -> tuple[PrimitiveAction, ...]:
return plan.primitive_actions + expand_actions(plan.domain_actions)
def execute(self, plan: ActionPlan, *, dry_run: bool = False) -> ActionExecutionSummary:
primitives = self.primitives_for(plan)
if not primitives:
self.ctx.console.info(f"Nothing to execute for {plan.name}.")
return ActionExecutionSummary(plan.name, dry_run, ())
if dry_run:
self.render_plan(plan, primitives)
return ActionExecutionSummary(
plan.name,
True,
tuple(
ActionResult(action.id, action.type, "dry-run", action.description)
for action in primitives
),
)
self.audit.write("plan_start", {"plan": plan.name, "actions": primitives})
results: list[ActionResult] = []
rollback_stack: list[PrimitiveAction] = []
for action in primitives:
self.audit.write("action_start", {"plan": plan.name, "action": action})
try:
rollback = self._rollback_for(action)
self._execute_primitive(action)
results.append(ActionResult(action.id, action.type, "success"))
self.audit.write(
"action_success",
{"plan": plan.name, "action": action},
)
if action.rollback_policy == RollbackPolicy.BARRIER:
rollback_stack.clear()
elif rollback is not None:
rollback_stack.append(rollback)
except Exception as e:
results.append(ActionResult(action.id, action.type, "failed", str(e)))
self.audit.write(
"action_failed",
{"plan": plan.name, "action": action, "error": str(e)},
)
rollback_results = self._rollback(plan.name, rollback_stack)
self.audit.write(
"plan_failed",
{
"plan": plan.name,
"failed_action": action,
"results": results,
"rollback_results": rollback_results,
},
)
raise FlowError(
f"Action failed: {action.description}. {e}"
) from e
summary = ActionExecutionSummary(plan.name, False, tuple(results))
self.audit.write("plan_success", {"plan": plan.name, "results": results})
return summary
def render_plan(
self,
plan: ActionPlan,
primitives: tuple[PrimitiveAction, ...] | None = None,
) -> None:
actions = primitives or self.primitives_for(plan)
self.ctx.console.print_plan(list(actions), verb=plan.name)
def _rollback(
self,
plan_name: str,
rollback_stack: list[PrimitiveAction],
) -> tuple[ActionResult, ...]:
results: list[ActionResult] = []
for action in reversed(rollback_stack):
self.audit.write("rollback_start", {"plan": plan_name, "action": action})
try:
self._execute_primitive(action)
result = ActionResult(action.id, action.type, "success")
results.append(result)
self.audit.write("rollback_success", {"plan": plan_name, "action": action})
except Exception as e:
result = ActionResult(action.id, action.type, "failed", str(e))
results.append(result)
self.audit.write(
"rollback_failed",
{"plan": plan_name, "action": action, "error": str(e)},
)
return tuple(results)
def _execute_primitive(self, action: PrimitiveAction) -> None:
t = action.type
p = action.payload
if t == "file.create_symlink":
self.ctx.runtime.fs.create_symlink(
Path(p["source"]),
Path(p["target"]),
sudo=bool(p.get("sudo", False)),
runner=self.ctx.runtime.runner if p.get("sudo") else None,
)
return
if t == "file.remove_symlink":
expected = p.get("expected_source")
self.ctx.runtime.fs.remove_symlink(
Path(p["target"]),
expected_source=Path(expected) if expected is not None else None,
sudo=bool(p.get("sudo", False)),
runner=self.ctx.runtime.runner if p.get("sudo") else None,
)
return
if t == "file.write":
self.ctx.runtime.fs.write_text(Path(p["path"]), str(p["content"]))
return
if t == "file.write_json":
self.ctx.runtime.fs.write_json(Path(p["path"]), p["data"])
return
if t == "file.write_bytes_b64":
content = base64.b64decode(str(p["content_b64"]).encode("ascii"))
self.ctx.runtime.fs.write_bytes(Path(p["path"]), content)
return
if t == "file.copy":
source = Path(p["source"])
target = Path(p["target"])
source_root = p.get("source_root")
if source_root is not None:
resolved_source = source.resolve(strict=False)
resolved_root = Path(source_root).resolve(strict=False)
if not resolved_source.is_relative_to(resolved_root):
raise FlowError(f"Copy source escapes allowed root: {source}")
if not source.exists():
raise FlowError(f"Copy source does not exist: {source}")
if source.is_dir():
self.ctx.runtime.fs.copy_tree(source, target)
else:
self.ctx.runtime.fs.copy_file(source, target)
if p.get("make_executable"):
target.chmod(target.stat().st_mode | 0o111)
return
if t == "file.copy_tree":
self.ctx.runtime.fs.copy_tree(Path(p["source"]), Path(p["target"]))
return
if t == "file.remove":
target = Path(p["path"])
if target.is_dir() and not target.is_symlink():
self.ctx.runtime.fs.remove_tree(target, missing_ok=bool(p.get("missing_ok", True)))
else:
self.ctx.runtime.fs.remove_file(target, missing_ok=bool(p.get("missing_ok", True)))
return
if t == "file.chmod":
Path(p["path"]).chmod(int(p["mode"]))
return
if t == "process.argv":
completed = self.ctx.runtime.runner.run(
tuple(str(arg) for arg in p["argv"]),
cwd=Path(p["cwd"]) if p.get("cwd") else None,
env=p.get("env"),
capture_output=bool(p.get("capture_output", True)),
check=False,
)
allowed = tuple(int(code) for code in p.get("allowed_returncodes", (0,)))
if completed.returncode not in allowed:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return
if t == "process.shell_user_hook":
self.ctx.runtime.runner.run_shell(
str(p["command"]),
cwd=Path(p["cwd"]) if p.get("cwd") else None,
env=p.get("env"),
capture_output=bool(p.get("capture_output", True)),
check=True,
)
return
if t == "git.clone":
argv = ["git", "clone"]
if p.get("branch"):
argv.extend(["-b", str(p["branch"])])
argv.extend([str(p["source"]), str(p["target"])])
self.ctx.runtime.runner.run(argv, check=True)
return
if t in {"git.pull", "git.push", "git.fetch", "git.checkout", "git.status"}:
repo = Path(p["repo"])
args = tuple(str(arg) for arg in p.get("args", ()))
if t == "git.pull":
args = args or ("pull", "--ff-only")
elif t == "git.push":
args = args or ("push",)
elif t == "git.fetch":
args = args or ("fetch", "--all")
elif t == "git.status":
args = args or ("status", "--short", "--branch")
self.ctx.runtime.git.run(repo, *args, check=True)
return
if t == "download.file":
self.ctx.runtime.download.download_file(
str(p["url"]),
Path(p["target"]),
timeout=float(p.get("timeout", 60)),
)
return
if t == "archive.extract":
self.ctx.runtime.archive.extract(Path(p["archive"]), Path(p["target"]))
return
if t == "container.run":
self.ctx.runtime.containers.run_container(**p)
return
if t == "container.start":
self.ctx.runtime.containers.start(str(p["name"]))
return
if t == "container.stop":
self.ctx.runtime.containers.stop(str(p["name"]))
return
if t == "container.kill":
self.ctx.runtime.containers.kill(str(p["name"]))
return
if t == "container.remove":
self.ctx.runtime.containers.rm(str(p["name"]), force=bool(p.get("force", False)))
return
if t == "container.exec":
self.ctx.runtime.containers.exec_in(
str(p["name"]),
tuple(str(arg) for arg in p["argv"]),
interactive=bool(p.get("interactive", False)),
detach_keys=p.get("detach_keys"),
)
return
if t == "tmux.new_session":
self.ctx.runtime.tmux.new_session(
str(p["name"]),
detached=bool(p.get("detached", True)),
env=p.get("env"),
command=p.get("command"),
)
return
if t == "tmux.set_option":
self.ctx.runtime.tmux.set_option(str(p["session"]), str(p["option"]), str(p["value"]))
return
if t == "tmux.respawn_pane":
self.ctx.runtime.tmux.respawn_pane(str(p["pane"]))
return
raise FlowError(f"Unhandled primitive action type: {t!r}")
def _rollback_for(self, action: PrimitiveAction) -> PrimitiveAction | None:
if action.rollback_policy != RollbackPolicy.ROLLBACKABLE:
return None
p = action.payload
if action.type == "file.create_symlink":
target = Path(p["target"])
source = Path(p["source"])
if target.is_symlink() and Path(target.readlink()) == source:
return None
if target.exists():
return None
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.remove_symlink",
description=f"Rollback {action.description}",
payload={
"target": target,
"expected_source": source,
"sudo": bool(p.get("sudo", False)),
},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.remove_symlink":
target = Path(p["target"])
if not target.is_symlink():
return None
source = target.readlink()
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.create_symlink",
description=f"Rollback {action.description}",
payload={
"source": source,
"target": target,
"sudo": bool(p.get("sudo", False)),
},
rollback_policy=RollbackPolicy.NONE,
)
if action.type in {"file.write", "file.write_json"}:
path = Path(p["path"])
if path.exists() and path.is_file():
content = path.read_text(encoding="utf-8")
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.write",
description=f"Rollback {action.description}",
payload={"path": path, "content": content},
rollback_policy=RollbackPolicy.NONE,
)
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.remove",
description=f"Rollback {action.description}",
payload={"path": path, "missing_ok": True},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.copy":
target = Path(p["target"])
if target.exists() and target.is_file():
encoded = base64.b64encode(target.read_bytes()).decode("ascii")
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.write_bytes_b64",
description=f"Rollback {action.description}",
payload={"path": target, "content_b64": encoded},
rollback_policy=RollbackPolicy.NONE,
)
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.remove",
description=f"Rollback {action.description}",
payload={"path": target, "missing_ok": True},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.copy_tree":
target = Path(p["target"])
if target.exists():
return None
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.remove",
description=f"Rollback {action.description}",
payload={"path": target, "missing_ok": True},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.remove":
target = Path(p["path"])
if not target.exists() or target.is_dir():
return None
encoded = base64.b64encode(target.read_bytes()).decode("ascii")
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.write_bytes_b64",
description=f"Rollback {action.description}",
payload={"path": target, "content_b64": encoded},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.chmod":
path = Path(p["path"])
if not path.exists():
return None
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.chmod",
description=f"Rollback {action.description}",
payload={"path": path, "mode": path.stat().st_mode},
rollback_policy=RollbackPolicy.NONE,
)
if action.type == "file.write_bytes_b64":
path = Path(p["path"])
if path.exists() and path.is_file():
encoded = base64.b64encode(path.read_bytes()).decode("ascii")
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.write_bytes_b64",
description=f"Rollback {action.description}",
payload={"path": path, "content_b64": encoded},
rollback_policy=RollbackPolicy.NONE,
)
return PrimitiveAction(
id=f"{action.id}.rollback",
type="file.remove",
description=f"Rollback {action.description}",
payload={"path": path, "missing_ok": True},
rollback_policy=RollbackPolicy.NONE,
)
return None

View File

@@ -0,0 +1,171 @@
"""Domain action expansion into executor primitive actions."""
from __future__ import annotations
from pathlib import Path
from flow.actions.models import DomainAction, PrimitiveAction, RollbackPolicy
from flow.core.errors import FlowError
def expand_action(action: DomainAction) -> tuple[PrimitiveAction, ...]:
payload = action.payload
if "primitive_actions" in payload:
return tuple(payload["primitive_actions"])
if action.kind == "dotfiles":
return _expand_dotfiles(action)
if action.kind == "package":
return _expand_package(action)
if action.kind == "repo":
return _expand_repo(action)
if action.kind == "remote":
return _expand_remote(action)
if action.kind == "container":
return _expand_container(action)
if action.kind == "completion":
return _expand_completion(action)
if action.kind == "setup":
return _expand_setup(action)
raise FlowError(f"Unhandled domain action kind: {action.kind!r}")
def expand_actions(actions: tuple[DomainAction, ...]) -> tuple[PrimitiveAction, ...]:
primitives: list[PrimitiveAction] = []
for action in actions:
primitives.extend(expand_action(action))
return tuple(primitives)
def _expand_dotfiles(action: DomainAction) -> tuple[PrimitiveAction, ...]:
p = action.payload
if action.action == "link":
return (
PrimitiveAction(
id=f"{action.id}.symlink",
type="file.create_symlink",
description=action.description,
payload={
"source": Path(p["source"]),
"target": Path(p["target"]),
"sudo": bool(p.get("sudo", False)),
},
rollback_policy=action.rollback_policy,
),
)
if action.action == "unlink":
return (
PrimitiveAction(
id=f"{action.id}.remove",
type="file.remove_symlink",
description=action.description,
payload={
"target": Path(p["target"]),
"expected_source": (
Path(p["expected_source"])
if p.get("expected_source") is not None else None
),
"sudo": bool(p.get("sudo", False)),
},
rollback_policy=action.rollback_policy,
),
)
if action.action == "write-state":
return (
PrimitiveAction(
id=f"{action.id}.write-json",
type="file.write_json",
description=action.description,
payload={"path": Path(p["path"]), "data": p["data"]},
rollback_policy=action.rollback_policy,
),
)
raise FlowError(f"Unhandled dotfiles action: {action.action!r}")
def _expand_package(action: DomainAction) -> tuple[PrimitiveAction, ...]:
if action.action in {"install", "remove", "update"}:
primitives = action.payload.get("primitive_actions")
if primitives is None:
raise FlowError(f"Package action {action.id!r} has no primitive plan")
return tuple(primitives)
raise FlowError(f"Unhandled package action: {action.action!r}")
def _expand_repo(action: DomainAction) -> tuple[PrimitiveAction, ...]:
p = action.payload
if action.action == "clone":
return (
PrimitiveAction(
id=f"{action.id}.clone",
type="git.clone",
description=action.description,
payload={
"source": p["source"],
"target": Path(p["target"]),
"branch": p.get("branch"),
},
rollback_policy=RollbackPolicy.NONE,
),
)
if action.action in {"pull", "push", "fetch", "checkout", "status"}:
return (
PrimitiveAction(
id=f"{action.id}.{action.action}",
type=f"git.{action.action}",
description=action.description,
payload={"repo": Path(p["repo"]), "args": tuple(p.get("args", ()))},
rollback_policy=RollbackPolicy.NONE,
),
)
raise FlowError(f"Unhandled repo action: {action.action!r}")
def _expand_remote(action: DomainAction) -> tuple[PrimitiveAction, ...]:
if action.action == "enter":
return (
PrimitiveAction(
id=f"{action.id}.argv",
type="process.argv",
description=action.description,
payload={
"argv": tuple(action.payload["argv"]),
"capture_output": False,
},
rollback_policy=RollbackPolicy.BARRIER,
),
)
raise FlowError(f"Unhandled remote action: {action.action!r}")
def _expand_container(action: DomainAction) -> tuple[PrimitiveAction, ...]:
primitives = action.payload.get("primitive_actions")
if primitives is not None:
return tuple(primitives)
raise FlowError(f"Unhandled container action: {action.action!r}")
def _expand_completion(action: DomainAction) -> tuple[PrimitiveAction, ...]:
primitives = action.payload.get("primitive_actions")
if primitives is not None:
return tuple(primitives)
raise FlowError(f"Unhandled completion action: {action.action!r}")
def _expand_setup(action: DomainAction) -> tuple[PrimitiveAction, ...]:
if action.action == "shell-hook":
return (
PrimitiveAction(
id=f"{action.id}.shell",
type="process.shell_user_hook",
description=action.description,
payload={"command": action.payload["command"]},
rollback_policy=RollbackPolicy.BARRIER,
),
)
primitives = action.payload.get("primitive_actions")
if primitives is not None:
return tuple(primitives)
raise FlowError(f"Unhandled setup action: {action.action!r}")

View File

@@ -0,0 +1,72 @@
"""Canonical domain and primitive action models."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Mapping
class RollbackPolicy(str, Enum):
ROLLBACKABLE = "rollbackable"
BARRIER = "barrier"
NONE = "none"
@dataclass(frozen=True)
class DomainAction:
id: str
kind: str
action: str
description: str
payload: Mapping[str, Any] = field(default_factory=dict)
rollback_policy: RollbackPolicy = RollbackPolicy.ROLLBACKABLE
@dataclass(frozen=True)
class PrimitiveAction:
id: str
type: str
description: str
payload: Mapping[str, Any] = field(default_factory=dict)
rollback_policy: RollbackPolicy = RollbackPolicy.ROLLBACKABLE
def __str__(self) -> str:
if self.type == "process.argv" and "argv" in self.payload:
argv = " ".join(str(part) for part in self.payload["argv"])
return f"{self.type}: {self.description} ({argv})"
return f"{self.type}: {self.description}"
@dataclass(frozen=True)
class ActionPlan:
name: str
domain_actions: tuple[DomainAction, ...] = ()
primitive_actions: tuple[PrimitiveAction, ...] = ()
def is_empty(self) -> bool:
return not self.domain_actions and not self.primitive_actions
@dataclass(frozen=True)
class ActionResult:
action_id: str
action_type: str
status: str
message: str = ""
@dataclass(frozen=True)
class ActionExecutionSummary:
plan_name: str
dry_run: bool
results: tuple[ActionResult, ...]
rollback_results: tuple[ActionResult, ...] = ()
@property
def succeeded(self) -> int:
return sum(1 for result in self.results if result.status == "success")
@property
def failed(self) -> int:
return sum(1 for result in self.results if result.status == "failed")

View File

@@ -0,0 +1,25 @@
"""Serialization helpers for action audit logs."""
from __future__ import annotations
import dataclasses
from enum import Enum
from pathlib import Path
from typing import Any
def to_jsonable(value: Any) -> Any:
if dataclasses.is_dataclass(value):
return to_jsonable(dataclasses.asdict(value))
if isinstance(value, Path):
return str(value)
if isinstance(value, Enum):
return value.value
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
if isinstance(value, dict):
return {str(k): to_jsonable(v) for k, v in value.items()}
if isinstance(value, (list, tuple, set)):
return [to_jsonable(v) for v in value]
return value

View File

@@ -0,0 +1,2 @@
"""External system adapters used by action executors."""

View File

@@ -0,0 +1,22 @@
"""Archive extraction adapter."""
from __future__ import annotations
import shutil
from pathlib import Path
from flow.adapters.filesystem import FileSystem
from flow.core.errors import FlowError
class ArchiveClient:
def __init__(self, fs: FileSystem):
self.fs = fs
def extract(self, archive: Path, target: Path) -> None:
self.fs.ensure_dir(target)
try:
shutil.unpack_archive(str(archive), str(target))
except (shutil.ReadError, ValueError) as e:
raise FlowError(f"Could not extract archive {archive}: {e}") from e

View File

@@ -0,0 +1,23 @@
"""HTTP/download adapter."""
from __future__ import annotations
import urllib.error
import urllib.request
from pathlib import Path
from flow.adapters.filesystem import FileSystem
from flow.core.errors import FlowError
class DownloadClient:
def __init__(self, fs: FileSystem):
self.fs = fs
def download_file(self, url: str, target: Path, *, timeout: float = 60) -> None:
try:
with urllib.request.urlopen(url, timeout=timeout) as response:
self.fs.write_bytes(target, response.read())
except urllib.error.URLError as e:
raise FlowError(f"Failed to download {url}: {e}") from e

View File

@@ -0,0 +1,158 @@
"""Filesystem adapter for mutating and stateful filesystem operations."""
from __future__ import annotations
import json
import os
import shutil
from pathlib import Path
from typing import Any, Optional
from flow.adapters.process import CommandRunner
from flow.core.errors import FlowError
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.require_binary("sudo")
runner.run(["sudo", "mkdir", "-p", str(path)], check=True)
return
path.mkdir(parents=True, exist_ok=True)
def remove_file(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
missing_ok: bool = True,
) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
argv = ["sudo", "rm"]
if missing_ok:
argv.append("-f")
argv.append(str(path))
runner.run(argv, check=True)
return
try:
path.unlink()
except FileNotFoundError:
if not missing_ok:
raise
def remove_tree(self, path: Path, *, missing_ok: bool = False) -> None:
try:
shutil.rmtree(path)
except FileNotFoundError:
if not missing_ok:
raise
def copy_file(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copy2(source, target)
def copy_tree(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copytree(source, target, dirs_exist_ok=True)
def create_symlink(
self,
source: Path,
target: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
self._check_overwrite_safe(target, source)
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
if target.is_symlink() or target.exists():
target.unlink()
target.symlink_to(source)
def _check_overwrite_safe(self, target: Path, source: Path) -> None:
if target.is_symlink():
actual = target.readlink()
if Path(actual) != Path(source):
raise FlowError(f"Refusing to overwrite unmanaged path: {target}")
return
if target.exists():
raise FlowError(f"Refusing to overwrite unmanaged path: {target}")
def remove_symlink(
self,
target: Path,
*,
expected_source: Optional[Path] = None,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if not target.is_symlink():
if target.exists():
raise FlowError(f"Refusing to remove non-symlink: {target}")
return
if expected_source is not None:
actual = target.readlink()
if Path(actual) != Path(expected_source):
raise FlowError(
f"Refusing to remove symlink {target}: points to {actual}, "
f"expected {expected_source}"
)
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.run(["sudo", "rm", "-f", str(target)], check=True)
return
target.unlink()
def same_symlink(self, target: Path, source: Path) -> bool:
if not target.is_symlink():
return False
return Path(target.readlink()) == Path(source)
def read_text(self, path: Path) -> str:
return path.read_text(encoding="utf-8")
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)
path.write_text(content, encoding="utf-8")
def read_bytes(self, path: Path) -> bytes:
return path.read_bytes()
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def read_json(self, path: Path, *, default: Any = None) -> Any:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return default
def write_json(self, path: Path, data: Any) -> None:
self.ensure_dir(path.parent)
tmp = path.with_suffix(path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
os.replace(tmp, path)

29
src/flow/adapters/git.py Normal file
View File

@@ -0,0 +1,29 @@
"""Git process adapter."""
from __future__ import annotations
import subprocess
from pathlib import Path
from flow.adapters.process import CommandRunner
class GitClient:
"""Git adapter scoped to a repository root."""
def __init__(self, runner: CommandRunner):
self.runner = runner
def run(
self,
repo_dir: Path,
*args: str,
capture_output: bool = True,
check: bool = False,
) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", str(repo_dir), *args],
capture_output=capture_output,
check=check,
)

View File

@@ -0,0 +1,75 @@
"""Process execution adapter."""
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
from typing import Iterable, Mapping, Optional, Sequence
from flow.core.errors import FlowError
class CommandRunner:
"""Subprocess wrapper with consistent error handling."""
def run(
self,
argv: Sequence[str] | Iterable[str],
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
parts = [str(a) for a in argv]
completed = subprocess.run(
parts,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def run_shell(
self,
command: str,
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
command,
shell=True,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def require_binary(self, name: str) -> str:
path = shutil.which(name)
if path is None:
raise FlowError(f"Required executable not found: {name}")
return path

View File

@@ -1,87 +1,594 @@
"""Flow CLI entry point."""
"""Typer CLI entry point."""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Optional
import typer
from flow import __version__
from flow.actions import ActionExecutor, ActionPlan, DomainAction, PrimitiveAction
from flow.core import paths
from flow.core.config import FlowContext, load_config, load_manifest
from flow.core.console import Console
from flow.core.errors import FlowError
from flow.core import paths
from flow.core.platform import detect_context, detect_platform
from flow.core.runtime import SystemRuntime
from flow.services.bootstrap import BootstrapService
from flow.services.containers import ContainerService
from flow.services.dotfiles import DotfilesService
from flow.services.packages import PackageService
from flow.services.projects import ProjectService
from flow.services.remote import RemoteService
def main(argv: Optional[list[str]] = None) -> None:
"""Main entry point."""
app = typer.Typer(
name="flow",
help="DevFlow - development environment manager",
no_args_is_help=True,
add_completion=False,
pretty_exceptions_show_locals=False,
)
dotfiles_app = typer.Typer(help="Manage dotfile symlinks", no_args_is_help=False)
dotfiles_repos_app = typer.Typer(help="Manage dotfiles and module repos", no_args_is_help=False)
packages_app = typer.Typer(help="Manage packages", no_args_is_help=False)
setup_app = typer.Typer(help="Bootstrap a system profile", no_args_is_help=False)
remote_app = typer.Typer(help="Manage remote targets", no_args_is_help=False)
dev_app = typer.Typer(help="Manage development containers", no_args_is_help=False)
projects_app = typer.Typer(help="Manage git projects", no_args_is_help=False)
completion_app = typer.Typer(help="Shell completion helpers", no_args_is_help=False)
def _version_callback(value: bool) -> None:
if value:
typer.echo(f"flow {__version__}")
raise typer.Exit()
@app.callback()
def _root(
ctx: typer.Context,
version: bool = typer.Option(
False,
"--version",
help="Show version",
callback=_version_callback,
is_eager=True,
),
quiet: bool = typer.Option(False, "--quiet", "-q", help="Suppress info output"),
no_color: bool = typer.Option(False, "--no-color", help="Disable colored output"),
) -> None:
del version
if hasattr(os, "getuid") and os.getuid() == 0:
print("Error: flow must not run as root", file=sys.stderr)
sys.exit(1)
parser = _build_parser()
args = parser.parse_args(argv)
if args.version:
print(f"flow {__version__}")
return
if not hasattr(args, "handler"):
parser.print_help()
return
typer.echo("Error: flow must not run as root", err=True)
raise typer.Exit(1)
console = Console(quiet=quiet, color=False if no_color else None)
try:
color = False if args.no_color else None
console = Console(quiet=args.quiet, color=color)
platform_info = detect_platform()
context = detect_context()
cmd_name = args.command or ""
if context == "vm" and cmd_name == "remote":
command = ctx.invoked_subcommand or ""
if context == "vm" and command == "remote":
raise FlowError("Command 'remote' is not available inside a VM.")
if context == "container" and cmd_name in {"remote", "dev", "projects"}:
raise FlowError(f"Command '{cmd_name}' is not available inside a container.")
if context == "container" and command in {"remote", "dev", "projects"}:
raise FlowError(f"Command '{command}' is not available inside a container.")
paths.ensure_dirs()
config = load_config()
ctx = FlowContext(
ctx.obj = FlowContext(
config=config,
manifest=load_manifest(),
platform=platform_info,
console=console,
runtime=SystemRuntime(container_mode=config.container_runtime),
)
args.handler(ctx, args)
except FlowError as e:
console.error(str(e))
sys.exit(1)
raise typer.Exit(1) from None
def _ctx(ctx: typer.Context) -> FlowContext:
flow_ctx = ctx.obj
if not isinstance(flow_ctx, FlowContext):
raise typer.Exit(1)
return flow_ctx
def _run(ctx: typer.Context, fn) -> None:
flow_ctx = _ctx(ctx)
try:
fn(flow_ctx)
except FlowError as e:
flow_ctx.console.error(str(e))
raise typer.Exit(1) from None
except KeyboardInterrupt:
console.error("Interrupted.")
sys.exit(130)
flow_ctx.console.error("Interrupted.")
raise typer.Exit(130) from None
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="flow",
description="DevFlow - development environment manager",
@dotfiles_app.callback(invoke_without_command=True)
def _dotfiles_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).status())
@dotfiles_app.command("init")
def dotfiles_init(
ctx: typer.Context,
repo: Optional[str] = typer.Option(None, "--repo", help="Override configured repository URL"),
) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).init(repo_url=repo))
@dotfiles_app.command("link")
def dotfiles_link(
ctx: typer.Context,
profile: Optional[str] = typer.Option(None, "--profile", help="Profile to include"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
skip: Optional[list[str]] = typer.Option(None, "--skip"),
) -> None:
_run(
ctx,
lambda flow_ctx: DotfilesService(flow_ctx).link(
profile=profile,
dry_run=dry_run,
skip=set(skip) if skip else None,
),
)
parser.add_argument("--version", action="store_true", help="Show version")
parser.add_argument("--quiet", "-q", action="store_true", help="Suppress info output")
parser.add_argument("--no-color", action="store_true", help="Disable colored output")
subparsers = parser.add_subparsers(dest="command")
# Import and register all command modules
from flow.commands import dotfiles, packages, setup, remote, dev, projects, completion
dotfiles.register(subparsers)
packages.register(subparsers)
setup.register(subparsers)
remote.register(subparsers)
dev.register(subparsers)
projects.register(subparsers)
completion.register(subparsers)
@dotfiles_app.command("unlink")
def dotfiles_unlink(
ctx: typer.Context,
packages: Optional[list[str]] = typer.Argument(None, help="Packages to unlink"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
) -> None:
_run(
ctx,
lambda flow_ctx: DotfilesService(flow_ctx).unlink(
packages=packages if packages else None,
dry_run=dry_run,
),
)
return parser
@dotfiles_app.command("status")
def dotfiles_status(
ctx: typer.Context,
packages: Optional[list[str]] = typer.Argument(None, help="Packages to inspect"),
) -> None:
_run(
ctx,
lambda flow_ctx: DotfilesService(flow_ctx).status(
package_filter=packages if packages else None
),
)
@dotfiles_app.command("edit")
def dotfiles_edit(
ctx: typer.Context,
package: str = typer.Argument(..., help="Package name"),
no_commit: bool = typer.Option(False, "--no-commit", help="Skip auto-commit/push"),
) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).edit(package, no_commit=no_commit))
@dotfiles_repos_app.callback(invoke_without_command=True)
def dotfiles_repos_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).repos_list())
@dotfiles_repos_app.command("list")
def dotfiles_repos_list(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).repos_list())
@dotfiles_repos_app.command("status")
def dotfiles_repos_status(
ctx: typer.Context,
repo_filter: Optional[str] = typer.Option(None, "--repo", help="Filter by repo name"),
) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).repos_status(repo_filter=repo_filter))
@dotfiles_repos_app.command("pull")
def dotfiles_repos_pull(
ctx: typer.Context,
repo_filter: Optional[str] = typer.Option(None, "--repo", help="Filter by repo name"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).repos_pull(repo_filter=repo_filter, dry_run=dry_run))
@dotfiles_repos_app.command("push")
def dotfiles_repos_push(
ctx: typer.Context,
repo_filter: Optional[str] = typer.Option(None, "--repo", help="Filter by repo name"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
) -> None:
_run(ctx, lambda flow_ctx: DotfilesService(flow_ctx).repos_push(repo_filter=repo_filter, dry_run=dry_run))
dotfiles_app.add_typer(dotfiles_repos_app, name="repos")
dotfiles_app.add_typer(dotfiles_repos_app, name="repo")
@packages_app.callback(invoke_without_command=True)
def packages_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: PackageService(flow_ctx).list_packages())
@packages_app.command("install")
def packages_install(
ctx: typer.Context,
package_names: Optional[list[str]] = typer.Argument(None, metavar="PACKAGES"),
profile: Optional[str] = typer.Option(None, "--profile", help="Install profile packages"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
) -> None:
def _install(flow_ctx: FlowContext) -> None:
svc = PackageService(flow_ctx)
packages = svc.resolve_install_packages(
package_names=package_names if package_names else None,
profile=profile,
)
svc.install(packages, dry_run=dry_run)
_run(ctx, _install)
@packages_app.command("remove")
def packages_remove(
ctx: typer.Context,
package_names: list[str] = typer.Argument(..., metavar="PACKAGES"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
) -> None:
_run(ctx, lambda flow_ctx: PackageService(flow_ctx).remove(package_names, dry_run=dry_run))
@packages_app.command("list")
def packages_list(
ctx: typer.Context,
show_all: bool = typer.Option(False, "--all", help="List all known packages"),
) -> None:
_run(ctx, lambda flow_ctx: PackageService(flow_ctx).list_packages(show_all=show_all))
@setup_app.callback(invoke_without_command=True)
def setup_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: BootstrapService(flow_ctx).list_profiles())
@setup_app.command("run")
def setup_run(
ctx: typer.Context,
profile: Optional[str] = typer.Argument(None, help="Profile name"),
profile_option: Optional[str] = typer.Option(None, "--profile", help="Profile name"),
dry_run: bool = typer.Option(False, "--dry-run", "-n"),
var: Optional[list[str]] = typer.Option(None, "--var", help="Set variable KEY=VALUE"),
) -> None:
def _setup(flow_ctx: FlowContext) -> None:
if profile and profile_option and profile != profile_option:
raise FlowError("Specify the profile only once.")
BootstrapService(flow_ctx).run(
profile or profile_option,
dry_run=dry_run,
env=_parse_vars(var or []),
)
_run(ctx, _setup)
@setup_app.command("show")
def setup_show(ctx: typer.Context, profile: str = typer.Argument(..., help="Profile name")) -> None:
_run(ctx, lambda flow_ctx: BootstrapService(flow_ctx).show(profile))
@setup_app.command("list")
def setup_list(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: BootstrapService(flow_ctx).list_profiles())
@remote_app.callback(invoke_without_command=True)
def remote_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: RemoteService(flow_ctx).list())
def _remote_enter(
ctx: typer.Context,
target: str,
user: Optional[str],
namespace: Optional[str],
platform: Optional[str],
session: Optional[str],
no_tmux: bool,
dry_run: bool,
) -> None:
_run(
ctx,
lambda flow_ctx: RemoteService(flow_ctx).enter(
target,
user=user,
namespace=namespace,
platform=platform,
session=session,
no_tmux=no_tmux,
dry_run=dry_run,
),
)
@remote_app.command("enter")
def remote_enter(
ctx: typer.Context,
target: str = typer.Argument(..., help="Target ([user@]namespace@platform)"),
user: Optional[str] = typer.Option(None, "--user", "-u", help="SSH user override"),
namespace: Optional[str] = typer.Option(None, "--namespace", "-n", help="Namespace override"),
platform: Optional[str] = typer.Option(None, "--platform", "-p", help="Platform override"),
session: Optional[str] = typer.Option(None, "--session", "-s", help="tmux session name"),
no_tmux: bool = typer.Option(False, "--no-tmux", help="Open plain SSH without tmux"),
dry_run: bool = typer.Option(False, "--dry-run", "-d"),
) -> None:
_remote_enter(ctx, target, user, namespace, platform, session, no_tmux, dry_run)
@remote_app.command("list")
def remote_list(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: RemoteService(flow_ctx).list())
@app.command("enter")
def enter_alias(
ctx: typer.Context,
target: str = typer.Argument(..., help="Target ([user@]namespace@platform)"),
user: Optional[str] = typer.Option(None, "--user", "-u", help="SSH user override"),
namespace: Optional[str] = typer.Option(None, "--namespace", "-n", help="Namespace override"),
platform: Optional[str] = typer.Option(None, "--platform", "-p", help="Platform override"),
session: Optional[str] = typer.Option(None, "--session", "-s", help="tmux session name"),
no_tmux: bool = typer.Option(False, "--no-tmux", help="Open plain SSH without tmux"),
dry_run: bool = typer.Option(False, "--dry-run", "-d"),
) -> None:
_remote_enter(ctx, target, user, namespace, platform, session, no_tmux, dry_run)
@dev_app.callback(invoke_without_command=True)
def dev_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).list())
@dev_app.command("create")
def dev_create(
ctx: typer.Context,
name: str,
image: str = typer.Option(..., "--image", "-i", help="Container image"),
project: Optional[str] = typer.Option(None, "--project", "-p", help="Project path to mount"),
dry_run: bool = typer.Option(False, "--dry-run"),
) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).create(name, image, project_path=project, dry_run=dry_run))
@dev_app.command("attach")
def dev_attach(ctx: typer.Context, name: str) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).connect(name))
@dev_app.command("connect")
def dev_connect(ctx: typer.Context, name: str) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).connect(name))
@dev_app.command("exec")
def dev_exec(
ctx: typer.Context,
name: str,
command: Optional[list[str]] = typer.Argument(None, metavar="CMD"),
) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).exec(name, command))
@dev_app.command("enter")
def dev_enter(ctx: typer.Context, name: str) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).exec(name))
@dev_app.command("stop")
def dev_stop(
ctx: typer.Context,
name: str,
kill: bool = typer.Option(False, "--kill", help="Kill instead of graceful stop"),
) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).stop(name, kill=kill))
@dev_app.command("remove")
def dev_remove(
ctx: typer.Context,
name: str,
force: bool = typer.Option(False, "--force", "-f", help="Force removal"),
) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).remove(name, force=force))
@dev_app.command("rm")
def dev_rm(
ctx: typer.Context,
name: str,
force: bool = typer.Option(False, "--force", "-f", help="Force removal"),
) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).remove(name, force=force))
@dev_app.command("respawn")
def dev_respawn(ctx: typer.Context, name: str) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).respawn(name))
@dev_app.command("list")
def dev_list(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: ContainerService(flow_ctx).list())
def _projects_check(ctx: typer.Context, fetch: bool) -> None:
_run(ctx, lambda flow_ctx: ProjectService(flow_ctx).check(fetch=fetch))
@projects_app.callback(invoke_without_command=True)
def projects_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
_projects_check(ctx, fetch=False)
@projects_app.command("check")
def projects_check(
ctx: typer.Context,
fetch: bool = typer.Option(False, "--fetch", help="Fetch remotes first"),
) -> None:
_projects_check(ctx, fetch)
@projects_app.command("fetch")
def projects_fetch(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: ProjectService(flow_ctx).fetch())
@projects_app.command("summary")
def projects_summary(ctx: typer.Context) -> None:
_run(ctx, lambda flow_ctx: ProjectService(flow_ctx).summary())
@app.command("sync")
def sync_alias(
ctx: typer.Context,
fetch: bool = typer.Option(True, "--fetch/--no-fetch", help="Fetch remotes first"),
) -> None:
_projects_check(ctx, fetch)
@completion_app.callback(invoke_without_command=True)
def completion_default(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
from flow.commands.completion import _zsh_script_text
typer.echo(_zsh_script_text(), nl=False)
@completion_app.command("zsh")
def completion_zsh() -> None:
from flow.commands.completion import _zsh_script_text
typer.echo(_zsh_script_text(), nl=False)
@completion_app.command("install-zsh")
def completion_install_zsh(
ctx: typer.Context,
directory: str = typer.Option("~/.zsh/completions", "--dir"),
rc: str = typer.Option("~/.zshrc", "--rc"),
no_rc: bool = typer.Option(False, "--no-rc"),
) -> None:
def _install(flow_ctx: FlowContext) -> None:
from flow.commands.completion import _zsh_rc_snippet, _zsh_script_text
completions_dir = Path(directory).expanduser()
completion_file = completions_dir / "_flow"
primitives: list[PrimitiveAction] = [
PrimitiveAction(
id="completion.zsh.write-script",
type="file.write",
description=f"Write zsh completion script to {completion_file}",
payload={"path": completion_file, "content": _zsh_script_text()},
)
]
if not no_rc:
rc_path = Path(rc).expanduser()
content = rc_path.read_text(encoding="utf-8") if rc_path.exists() else ""
snippet = _zsh_rc_snippet(completions_dir)
start_marker = "# >>> flow completion >>>"
end_marker = "# <<< flow completion <<<"
if start_marker in content and end_marker in content:
start = content.find(start_marker)
end = content.find(end_marker, start) + len(end_marker)
updated = content[:start] + snippet.rstrip("\n") + content[end:]
else:
separator = "" if content.endswith("\n") or not content else "\n"
updated = content + separator + snippet
primitives.append(
PrimitiveAction(
id="completion.zsh.write-rc",
type="file.write",
description=f"Update shell rc {rc_path}",
payload={"path": rc_path, "content": updated},
)
)
ActionExecutor(flow_ctx).execute(
ActionPlan(
name="completion.install-zsh",
domain_actions=(
DomainAction(
id="completion.zsh.install",
kind="completion",
action="install-zsh",
description="Install zsh completion",
payload={"primitive_actions": tuple(primitives)},
),
),
)
)
flow_ctx.console.success(f"Installed completion script: {completion_file}")
_run(ctx, _install)
@completion_app.command("_zsh_complete", hidden=True)
def completion_zsh_complete(
ctx: typer.Context,
cword: int = typer.Option(..., "--cword", help="Completion word index"),
words: Optional[list[str]] = typer.Argument(None),
) -> None:
from flow.commands.completion import complete
for item in complete(_ctx(ctx), words or [], cword):
typer.echo(item)
def _parse_vars(items: list[str]) -> dict[str, str]:
values: dict[str, str] = {}
for item in items:
if "=" not in item:
raise FlowError(f"Invalid --var value '{item}'. Expected KEY=VALUE.")
key, value = item.split("=", 1)
if not key:
raise FlowError(f"Invalid --var value '{item}'. KEY cannot be empty.")
values[key] = value
return values
app.add_typer(dotfiles_app, name="dotfiles")
app.add_typer(dotfiles_app, name="dot")
app.add_typer(packages_app, name="packages")
app.add_typer(packages_app, name="package")
app.add_typer(packages_app, name="pkg")
app.add_typer(setup_app, name="setup")
app.add_typer(setup_app, name="bootstrap")
app.add_typer(setup_app, name="provision")
app.add_typer(remote_app, name="remote")
app.add_typer(dev_app, name="dev")
app.add_typer(projects_app, name="projects")
app.add_typer(projects_app, name="project")
app.add_typer(completion_app, name="completion")
def main(argv: Optional[list[str]] = None) -> None:
app(args=argv, prog_name="flow")
if __name__ == "__main__":
main(sys.argv[1:])

View File

@@ -1,63 +1,67 @@
"""Console output formatting with TTY detection and color control."""
"""Rich-backed console output formatting."""
from __future__ import annotations
import os
import sys
from typing import Any, Optional
from rich.console import Console as RichConsole
from rich.table import Table
from rich.text import Text
class Console:
def __init__(self, *, quiet: bool = False, color: Optional[bool] = None):
self.quiet = quiet
if color is None:
self._color = os.isatty(sys.stdout.fileno()) if hasattr(sys.stdout, "fileno") else False
else:
self._color = color
def _style(self, code: str, text: str) -> str:
if not self._color:
return text
return f"{code}{text}\033[0m"
self._color = color
self._out = RichConsole(
force_terminal=color,
no_color=(color is False),
highlight=False,
)
self._err = RichConsole(
file=sys.stderr,
force_terminal=color,
no_color=(color is False),
highlight=False,
)
def info(self, msg: str) -> None:
if self.quiet:
return
tag = self._style("\033[36m", "[INFO]")
print(f"{tag} {msg}")
text = Text("[INFO] ", style="cyan")
text.append(str(msg))
self._out.print(text)
def warn(self, msg: str) -> None:
tag = self._style("\033[33m", "[WARN]")
print(f"{tag} {msg}")
text = Text("[WARN] ", style="yellow")
text.append(str(msg))
self._out.print(text)
def error(self, msg: str) -> None:
tag = self._style("\033[31m", "[ERROR]")
print(f"{tag} {msg}", file=sys.stderr)
text = Text("[ERROR] ", style="red")
text.append(str(msg))
self._err.print(text)
def success(self, msg: str) -> None:
tag = self._style("\033[32m", "[OK]")
print(f"{tag} {msg}")
text = Text("[OK] ", style="green")
text.append(str(msg))
self._out.print(text)
def table(self, headers: list[str], rows: list[list[str]]) -> None:
if not rows:
return
widths = [len(h) for h in headers]
table = Table(show_header=True, header_style="bold", box=None, pad_edge=False)
for header in headers:
table.add_column(header)
for row in rows:
for i, cell in enumerate(row):
if i < len(widths):
widths[i] = max(widths[i], len(str(cell)))
header_line = " ".join(f"{h:<{widths[i]}}" for i, h in enumerate(headers))
if self._color:
print(f"\033[1m{header_line}\033[0m")
else:
print(header_line)
print(" ".join("-" * w for w in widths))
for row in rows:
print(" ".join(f"{str(cell):<{widths[i]}}" for i, cell in enumerate(row)))
table.add_row(*(str(cell) for cell in row))
self._out.print(table)
def print_plan(self, operations: list[Any], *, verb: str = "execute") -> None:
if not operations:
self.info(f"Nothing to {verb}.")
return
self.info(f"Plan ({len(operations)} operation(s)):")
self.info(f"Plan ({len(operations)} action(s)):")
for op in operations:
print(f" {op}")
self._out.print(f" {op}")

View File

@@ -1,245 +1,38 @@
"""Runtime primitives for process, git, state, and filesystem access."""
"""Runtime dependency bundle.
The mutating implementations live in ``flow.adapters``. This module remains
as the stable import path for callers and tests that need the runtime bundle.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence
from flow.adapters.archive import ArchiveClient
from flow.adapters.download import DownloadClient
from flow.adapters.filesystem import FileSystem
from flow.adapters.git import GitClient
from flow.adapters.process import CommandRunner
from flow.core.containers import ContainerRuntime
from flow.core.errors import FlowError
from flow.core.tmux import TmuxClient
class CommandRunner:
"""Subprocess wrapper with consistent defaults."""
def run(
self,
argv: Sequence[str] | Iterable[str],
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
parts = [str(a) for a in argv]
completed = subprocess.run(
parts,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def run_shell(
self,
command: str,
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
command,
shell=True,
cwd=str(cwd) if cwd else None,
env=dict(env) if env else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
msg = completed.stderr.strip() or completed.stdout.strip()
if not msg:
msg = f"Command failed with exit code {completed.returncode}"
raise FlowError(msg)
return completed
def require_binary(self, name: str) -> str:
path = shutil.which(name)
if path is None:
raise FlowError(f"Required executable not found: {name}")
return path
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.require_binary("sudo")
runner.run(["sudo", "mkdir", "-p", str(path)], check=True)
return
path.mkdir(parents=True, exist_ok=True)
def remove_file(self, path: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None, missing_ok: bool = True) -> None:
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
argv = ["sudo", "rm"]
if missing_ok:
argv.append("-f")
argv.append(str(path))
runner.run(argv, check=True)
return
try:
path.unlink()
except FileNotFoundError:
if not missing_ok:
raise
def remove_tree(self, path: Path, *, missing_ok: bool = False) -> None:
try:
shutil.rmtree(path)
except FileNotFoundError:
if not missing_ok:
raise
def copy_file(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copy2(source, target)
def copy_tree(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copytree(source, target, dirs_exist_ok=True)
def create_symlink(self, source: Path, target: Path, *, sudo: bool = False, runner: Optional[CommandRunner] = None) -> None:
self._check_overwrite_safe(target, source)
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
if target.is_symlink() or target.exists():
target.unlink()
target.symlink_to(source)
def _check_overwrite_safe(self, target: Path, source: Path) -> None:
"""Verify target is absent or already a symlink pointing at source."""
if target.is_symlink():
actual = target.readlink()
if Path(actual) != Path(source):
raise FlowError(
f"Refusing to overwrite unmanaged path: {target}"
)
return
if target.exists():
raise FlowError(
f"Refusing to overwrite unmanaged path: {target}"
)
def remove_symlink(
self,
target: Path,
*,
expected_source: Optional[Path] = None,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
"""Remove a symlink at target. If expected_source is given, verify it
points there. No-op if target doesn't exist."""
if not target.is_symlink():
if target.exists():
raise FlowError(f"Refusing to remove non-symlink: {target}")
return
if expected_source is not None:
actual = target.readlink()
if Path(actual) != Path(expected_source):
raise FlowError(
f"Refusing to remove symlink {target}: points to {actual}, "
f"expected {expected_source}"
)
if sudo:
if runner is None:
raise FlowError("Runner required for sudo operations")
runner.run(["sudo", "rm", "-f", str(target)], check=True)
return
target.unlink()
def same_symlink(self, target: Path, source: Path) -> bool:
"""True iff target is a symlink whose readlink exactly equals source.
Uses raw readlink (not resolve) so the answer is consistent with the
readlink-based checks in _check_overwrite_safe and remove_symlink.
Following symlink chains would let externally-modified links pass as
"ours" and silently overwrite them; raw match is the safer rule.
"""
if not target.is_symlink():
return False
return Path(target.readlink()) == Path(source)
def read_text(self, path: Path) -> str:
return path.read_text(encoding="utf-8")
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)
path.write_text(content, encoding="utf-8")
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def read_json(self, path: Path, *, default: Any = None) -> Any:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return default
def write_json(self, path: Path, data: Any) -> None:
self.ensure_dir(path.parent)
tmp = path.with_suffix(path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
os.replace(tmp, path)
class GitClient:
"""Git adapter scoped to a repository root."""
def __init__(self, runner: CommandRunner):
self.runner = runner
def run(self, repo_dir: Path, *args: str, capture_output: bool = True, check: bool = False) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", str(repo_dir), *args],
capture_output=capture_output,
check=check,
)
@dataclass
class SystemRuntime:
"""Shared runtime dependencies."""
runner: CommandRunner = field(default_factory=CommandRunner)
fs: FileSystem = field(default_factory=FileSystem)
container_mode: str = "auto"
git: GitClient = field(init=False)
tmux: TmuxClient = field(init=False)
containers: ContainerRuntime = field(init=False)
download: DownloadClient = field(init=False)
archive: ArchiveClient = field(init=False)
def __post_init__(self) -> None:
self.git = GitClient(self.runner)
self.tmux = TmuxClient(self.runner)
self.containers = ContainerRuntime(self.runner, mode=self.container_mode)
self.download = DownloadClient(self.fs)
self.archive = ArchiveClient(self.fs)

View File

@@ -160,6 +160,17 @@ def pm_update_command(pm: str) -> str:
return commands[pm]
def pm_update_argv(pm: str) -> list[str]:
commands = {
"apt": ["sudo", "apt-get", "update", "-qq"],
"dnf": ["sudo", "dnf", "check-update", "-q"],
"brew": ["brew", "update"],
}
if pm not in commands:
raise FlowError(f"Unsupported package manager: {pm}")
return commands[pm]
def pm_install_command(pm: str, packages: list[str]) -> str:
"""Return the package manager install command."""
pkg_str = " ".join(packages)
@@ -173,7 +184,24 @@ def pm_install_command(pm: str, packages: list[str]) -> str:
return commands[pm]
def pm_install_argv(pm: str, packages: list[str]) -> list[str]:
commands = {
"apt": ["sudo", "apt-get", "install", "-y", "-qq", *packages],
"dnf": ["sudo", "dnf", "install", "-y", "-q", *packages],
"brew": ["brew", "install", *packages],
}
if pm not in commands:
raise FlowError(f"Unsupported package manager: {pm}")
return commands[pm]
def pm_cask_install_command(pm: str, packages: list[str]) -> str:
if pm != "brew":
raise FlowError(f"Package manager '{pm}' does not support casks")
return f"brew install --cask {' '.join(packages)}"
def pm_cask_install_argv(pm: str, packages: list[str]) -> list[str]:
if pm != "brew":
raise FlowError(f"Package manager '{pm}' does not support casks")
return ["brew", "install", "--cask", *packages]

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
import os
from typing import Optional
from flow.actions import ActionExecutor, ActionPlan, DomainAction
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.domain.bootstrap.models import BootstrapAction
@@ -98,8 +99,21 @@ class BootstrapService:
return
if action.phase in ("setup", "shell", "post-link"):
for cmd in action.commands:
self.ctx.runtime.runner.run_shell(cmd, check=True)
ActionExecutor(self.ctx).execute(
ActionPlan(
name=f"setup.{action.phase}",
domain_actions=tuple(
DomainAction(
id=f"setup.{action.phase}.{index}",
kind="setup",
action="shell-hook",
description=f"Run {action.phase} hook",
payload={"command": cmd},
)
for index, cmd in enumerate(action.commands)
),
)
)
return
raise FlowError(f"Unhandled bootstrap phase: {action.phase!r}")

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import os
import shutil
from flow.actions import ActionExecutor, ActionPlan, DomainAction, PrimitiveAction, RollbackPolicy
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.core import paths
@@ -55,24 +56,49 @@ class ContainerService:
self.ctx.console.info(f"Creating container: {spec.name}")
self.ctx.console.info(f" Image: {spec.image.full}")
if dry_run:
return
mount_flags = [
f"{m.source}:{m.target}{':ro' if m.readonly else ''}"
for m in spec.mounts
]
security_opts = self.rt.socket_security_opts if self.rt.socket_path else []
self.rt.run_container(
spec.name,
spec.image.full,
network=spec.network,
labels=spec.labels,
mounts=mount_flags,
security_opts=security_opts,
command=["sleep", "infinity"],
detach=True,
action_plan = ActionPlan(
name=f"container.create.{spec.name}",
domain_actions=(
DomainAction(
id=f"container.{spec.name}.run",
kind="container",
action="create",
description=f"Create container {spec.name}",
payload={
"primitive_actions": (
PrimitiveAction(
id=f"container.{spec.name}.run",
type="container.run",
description=f"Run {spec.image.full} as {spec.name}",
payload={
"name": spec.name,
"image": spec.image.full,
"network": spec.network,
"labels": spec.labels,
"mounts": mount_flags,
"security_opts": security_opts,
"command": ("sleep", "infinity"),
"detach": True,
},
rollback_policy=RollbackPolicy.NONE,
),
)
},
rollback_policy=RollbackPolicy.NONE,
),
),
)
if dry_run:
ActionExecutor(self.ctx).render_plan(action_plan)
return
ActionExecutor(self.ctx).execute(action_plan)
self.ctx.console.success(f"Created and started container: {spec.name}")
def exec(self, name: str, command: list[str] | None = None) -> None:
@@ -128,9 +154,24 @@ class ContainerService:
if not self.rt.container_exists(cname):
raise FlowError(f"Container {cname} does not exist")
if kill:
self.rt.kill(cname)
primitive = PrimitiveAction(
id=f"container.{cname}.kill",
type="container.kill",
description=f"Kill container {cname}",
payload={"name": cname},
rollback_policy=RollbackPolicy.NONE,
)
else:
self.rt.stop(cname)
primitive = PrimitiveAction(
id=f"container.{cname}.stop",
type="container.stop",
description=f"Stop container {cname}",
payload={"name": cname},
rollback_policy=RollbackPolicy.NONE,
)
ActionExecutor(self.ctx).execute(
ActionPlan(name=f"container.stop.{cname}", primitive_actions=(primitive,))
)
self.ctx.console.success(f"Container {cname} stopped.")
def remove(self, name: str, *, force: bool = False) -> None:
@@ -138,7 +179,20 @@ class ContainerService:
cname = container_name(name)
if not self.rt.container_exists(cname):
raise FlowError(f"Container {cname} does not exist")
self.rt.rm(cname, force=force)
ActionExecutor(self.ctx).execute(
ActionPlan(
name=f"container.remove.{cname}",
primitive_actions=(
PrimitiveAction(
id=f"container.{cname}.remove",
type="container.remove",
description=f"Remove container {cname}",
payload={"name": cname, "force": force},
rollback_policy=RollbackPolicy.NONE,
),
),
)
)
self.ctx.console.success(f"Container {cname} removed.")
def respawn(self, name: str) -> None:

View File

@@ -6,8 +6,15 @@ import os
from pathlib import Path
from typing import Optional
from flow.actions import (
ActionExecutor,
ActionPlan,
DomainAction,
PrimitiveAction,
RollbackPolicy,
)
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.core.errors import FlowError, PlanConflict
from flow.core.yaml import load_yaml_file
from flow.core import paths
from flow.domain.dotfiles.models import (
@@ -81,9 +88,7 @@ class DotfilesService:
)
if plan.conflicts:
self.ctx.console.warn(f"{len(plan.conflicts)} conflict(s):")
for c in plan.conflicts:
self.ctx.console.warn(f" {c}")
raise PlanConflict("Conflicting dotfile targets", list(plan.conflicts))
if not plan.operations:
self.ctx.console.info(
@@ -91,13 +96,16 @@ class DotfilesService:
)
return
if dry_run:
self._executor().render_plan(
self._link_action_plan(plan, targets, current)
)
return
self.ctx.console.print_plan(plan.operations, verb="link")
if dry_run:
return
new_state = self._apply_plan(plan, targets, current)
self._save_state(new_state)
self._executor().execute(
self._link_action_plan(plan, targets, current)
)
parts = []
if plan.summary.added:
parts.append(f"{plan.summary.added} added")
@@ -126,23 +134,14 @@ class DotfilesService:
self.ctx.console.info("Nothing to unlink.")
return
if dry_run:
self._executor().render_plan(
self._unlink_action_plan(plan, current)
)
return
self.ctx.console.print_plan(plan.operations, verb="unlink")
if dry_run:
return
new_state = LinkedState(links=dict(current.links))
for op in plan.operations:
expected = current.links[op.target].source if op.target in current.links else None
self.ctx.runtime.fs.remove_symlink(
op.target,
expected_source=expected,
sudo=op.needs_sudo,
runner=self.ctx.runtime.runner if op.needs_sudo else None,
)
new_state.links.pop(op.target, None)
self._save_state(new_state)
self._executor().execute(self._unlink_action_plan(plan, current))
self.ctx.console.success(f"Unlinked {plan.summary.removed} file(s).")
# ── Status ───────────────────────────────────────────────────────────
@@ -240,18 +239,30 @@ class DotfilesService:
self.ctx.console.warn(f"Dotfiles directory already exists: {self.dotfiles_dir}")
return
self.ctx.console.info(
f"Cloning {remote} (branch: {self.ctx.config.dotfiles_branch})..."
)
self.ctx.runtime.git.run(
self.dotfiles_dir.parent,
"clone",
"-b",
self.ctx.config.dotfiles_branch,
"--recurse-submodules",
remote,
str(self.dotfiles_dir),
check=True,
self.ctx.console.info(f"Cloning {remote} (branch: {self.ctx.config.dotfiles_branch})...")
self._executor().execute(
ActionPlan(
name="dotfiles.init",
primitive_actions=(
PrimitiveAction(
id="dotfiles.init.clone",
type="process.argv",
description=f"Clone dotfiles repository to {self.dotfiles_dir}",
payload={
"argv": (
"git",
"clone",
"-b",
self.ctx.config.dotfiles_branch,
"--recurse-submodules",
remote,
str(self.dotfiles_dir),
)
},
rollback_policy=RollbackPolicy.NONE,
),
),
)
)
self.repos_pull()
self.ctx.console.success(f"Dotfiles cloned to {self.dotfiles_dir}")
@@ -297,11 +308,13 @@ class DotfilesService:
) -> None:
"""Pull (or clone) managed repos."""
for repo in self._filter_repos(repo_filter):
action_plan = self._repo_pull_or_clone_plan(repo)
if dry_run:
action = "pull" if repo.path.is_dir() else "clone"
self.ctx.console.info(f"Would {action}: {repo.name} ({repo.source})")
self._executor().render_plan(action_plan)
continue
self._pull_or_clone_repo(repo)
self._executor().execute(action_plan)
def repos_push(
self,
@@ -314,10 +327,23 @@ class DotfilesService:
if not repo.path.is_dir():
self.ctx.console.warn(f"{repo.name}: not cloned, skipping")
continue
action_plan = ActionPlan(
name=f"repo.push.{repo.name}",
domain_actions=(
DomainAction(
id=f"repo.{repo.name}.push",
kind="repo",
action="push",
description=f"Push repository {repo.name}",
payload={"repo": repo.path, "args": ("push",)},
rollback_policy=RollbackPolicy.NONE,
),
),
)
if dry_run:
self.ctx.console.info(f"Would push: {repo.name}")
self._executor().render_plan(action_plan)
continue
self.ctx.runtime.git.run(repo.path, "push", check=True)
self._executor().execute(action_plan)
self.ctx.console.success(f"Pushed: {repo.name}")
# ── Repo discovery ───────────────────────────────────────────────────
@@ -363,24 +389,97 @@ class DotfilesService:
def _pull_or_clone_repo(self, repo: RepoInfo) -> None:
"""Pull an existing repo or clone it if missing."""
self._executor().execute(self._repo_pull_or_clone_plan(repo))
def _repo_pull_or_clone_plan(self, repo: RepoInfo) -> ActionPlan:
actions: list[DomainAction] = []
primitives: list[PrimitiveAction] = []
if repo.path.is_dir():
self.ctx.console.info(f"Pulling: {repo.name}")
if repo.is_module:
self._pull_module_repo(repo)
module = repo.module_ref
if module is None:
actions.append(
DomainAction(
id=f"repo.{repo.name}.pull",
kind="repo",
action="pull",
description=f"Pull repository {repo.name}",
payload={"repo": repo.path, "args": ("pull", "--ff-only")},
rollback_policy=RollbackPolicy.NONE,
)
)
else:
actions.append(
DomainAction(
id=f"repo.{repo.name}.fetch",
kind="repo",
action="fetch",
description=f"Fetch repository {repo.name}",
payload={"repo": repo.path, "args": ("fetch", "--all")},
rollback_policy=RollbackPolicy.NONE,
)
)
actions.append(
DomainAction(
id=f"repo.{repo.name}.checkout",
kind="repo",
action="checkout",
description=f"Checkout {module.ref_type}:{module.ref_value} in {repo.name}",
payload={"repo": repo.path, "args": ("checkout", _git_checkout_ref(module))},
rollback_policy=RollbackPolicy.NONE,
)
)
if module.ref_type == "branch":
actions.append(
DomainAction(
id=f"repo.{repo.name}.pull",
kind="repo",
action="pull",
description=f"Pull repository {repo.name}",
payload={"repo": repo.path, "args": ("pull", "--ff-only")},
rollback_policy=RollbackPolicy.NONE,
)
)
else:
self.ctx.runtime.git.run(
repo.path, "pull", "--ff-only", check=True,
actions.append(
DomainAction(
id=f"repo.{repo.name}.pull",
kind="repo",
action="pull",
description=f"Pull repository {repo.name}",
payload={"repo": repo.path, "args": ("pull", "--ff-only")},
rollback_policy=RollbackPolicy.NONE,
)
)
else:
if not repo.source:
raise FlowError(f"No source URL for repo {repo.name!r}")
self.ctx.console.info(f"Cloning: {repo.name}")
self.ctx.runtime.runner.run(
["git", "clone", repo.source, str(repo.path)],
check=True,
primitives.append(
PrimitiveAction(
id=f"repo.{repo.name}.clone",
type="process.argv",
description=f"Clone repository {repo.name}",
payload={"argv": ("git", "clone", repo.source, str(repo.path))},
rollback_policy=RollbackPolicy.NONE,
)
)
if repo.is_module:
self._checkout_module_ref(repo)
if repo.is_module and repo.module_ref is not None:
primitives.append(
PrimitiveAction(
id=f"repo.{repo.name}.checkout",
type="git.checkout",
description=f"Checkout {_git_checkout_ref(repo.module_ref)} in {repo.name}",
payload={"repo": repo.path, "args": ("checkout", _git_checkout_ref(repo.module_ref))},
rollback_policy=RollbackPolicy.NONE,
)
)
return ActionPlan(
name=f"repo.pull-or-clone.{repo.name}",
domain_actions=tuple(actions),
primitive_actions=tuple(primitives),
)
def _pull_module_repo(self, repo: RepoInfo) -> None:
"""Pull a module repo, respecting its ref type."""
@@ -571,9 +670,25 @@ class DotfilesService:
def _save_state(self, state: LinkedState) -> None:
"""Save linked state to disk."""
self.ctx.runtime.fs.write_json(paths.LINKED_STATE, state.as_dict())
self._executor().execute(
ActionPlan(
name="dotfiles.write-state",
domain_actions=(
DomainAction(
id="dotfiles.state.write",
kind="dotfiles",
action="write-state",
description=f"Write dotfiles state to {paths.LINKED_STATE}",
payload={
"path": paths.LINKED_STATE,
"data": state.as_dict(),
},
),
),
)
)
def _apply_plan(
def _project_state(
self,
plan: LinkPlan,
targets: list[LinkTarget],
@@ -586,26 +701,92 @@ class DotfilesService:
raise FlowError(
f"create_link op for {op.target} is missing a source"
)
self.ctx.runtime.fs.create_symlink(
op.source,
op.target,
sudo=op.needs_sudo,
runner=self.ctx.runtime.runner if op.needs_sudo else None,
)
spec = next(t for t in targets if t.target == op.target)
new_state.links[op.target] = spec
elif op.type == "remove_link":
expected = (
current.links[op.target].source
if op.target in current.links else None
)
self.ctx.runtime.fs.remove_symlink(
op.target,
expected_source=expected,
sudo=op.needs_sudo,
runner=self.ctx.runtime.runner if op.needs_sudo else None,
)
new_state.links.pop(op.target, None)
else:
raise FlowError(f"Unhandled LinkOp.type: {op.type!r}")
return new_state
def _link_action_plan(
self,
plan: LinkPlan,
targets: list[LinkTarget],
current: LinkedState,
) -> ActionPlan:
actions = self._link_domain_actions(plan, current)
projected = self._project_state(plan, targets, current)
actions.append(self._state_domain_action("dotfiles.state.write", projected))
return ActionPlan(name="dotfiles.link", domain_actions=tuple(actions))
def _unlink_action_plan(
self,
plan: LinkPlan,
current: LinkedState,
) -> ActionPlan:
actions = self._link_domain_actions(plan, current)
projected = LinkedState(links=dict(current.links))
for op in plan.operations:
projected.links.pop(op.target, None)
actions.append(self._state_domain_action("dotfiles.state.write", projected))
return ActionPlan(name="dotfiles.unlink", domain_actions=tuple(actions))
def _link_domain_actions(
self,
plan: LinkPlan,
current: LinkedState,
) -> list[DomainAction]:
actions: list[DomainAction] = []
for index, op in enumerate(plan.operations):
if op.type == "create_link":
if op.source is None:
raise FlowError(f"create_link op for {op.target} is missing a source")
actions.append(
DomainAction(
id=f"dotfiles.link.{index}",
kind="dotfiles",
action="link",
description=f"Link {op.target} -> {op.source}",
payload={
"source": op.source,
"target": op.target,
"sudo": op.needs_sudo,
},
)
)
continue
if op.type == "remove_link":
expected = (
current.links[op.target].source
if op.target in current.links else None
)
actions.append(
DomainAction(
id=f"dotfiles.unlink.{index}",
kind="dotfiles",
action="unlink",
description=f"Remove managed link {op.target}",
payload={
"target": op.target,
"expected_source": expected,
"sudo": op.needs_sudo,
},
)
)
continue
raise FlowError(f"Unhandled LinkOp.type: {op.type!r}")
return actions
def _state_domain_action(self, action_id: str, state: LinkedState) -> DomainAction:
return DomainAction(
id=action_id,
kind="dotfiles",
action="write-state",
description=f"Write dotfiles state to {paths.LINKED_STATE}",
payload={"path": paths.LINKED_STATE, "data": state.as_dict()},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
def _executor(self) -> ActionExecutor:
return ActionExecutor(self.ctx, audit_path=paths.LINKED_STATE.parent / "actions.jsonl")

View File

@@ -3,13 +3,11 @@
from __future__ import annotations
import os
import shutil
import tempfile
import urllib.error
import urllib.request
import uuid
from pathlib import Path
from typing import Any, Optional
from flow.actions import ActionExecutor, ActionPlan, DomainAction, PrimitiveAction, RollbackPolicy
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.core.template import substitute_template
@@ -18,15 +16,17 @@ from flow.domain.packages.catalog import normalize_profile_entry, parse_catalog
from flow.domain.packages.models import (
InstalledPackage,
InstalledState,
PackagePlan,
PackageDef,
PkgInstallOp,
)
from flow.domain.packages.planning import plan_install, plan_remove
from flow.domain.packages.resolution import (
binary_template_context,
detect_package_manager,
pm_cask_install_command,
pm_install_command,
pm_update_command,
pm_cask_install_argv,
pm_install_argv,
pm_update_argv,
resolve_extract_dir,
resolve_spec,
)
@@ -81,147 +81,16 @@ class PackageService:
self.ctx.console.info("All packages already installed.")
return
self.ctx.console.print_plan(
[str(op) for op in plan.install_ops], verb="install"
)
action_plan = self._install_action_plan(plan, installed, pm)
if dry_run:
self._executor().render_plan(action_plan)
return
# Execute PM packages
if plan.pm_update_needed and pm:
self.ctx.console.info(f"Updating package manager ({pm})...")
self.ctx.runtime.runner.run_shell(
pm_update_command(pm), check=True,
)
pm_names = [op.source_name for op in plan.install_ops if op.method == "pm"]
if pm_names and pm:
cmd = pm_install_command(pm, pm_names)
self.ctx.console.info(f"Installing: {', '.join(pm_names)}")
self.ctx.runtime.runner.run_shell(cmd, check=True)
for op in plan.install_ops:
if op.method == "pm":
installed.packages[op.package.name] = InstalledPackage(
name=op.package.name,
version=op.package.version or "system",
type="pkg",
)
self._run_post_install(op.package)
cask_names = [op.source_name for op in plan.install_ops if op.method == "cask"]
if cask_names and pm:
cmd = pm_cask_install_command(pm, cask_names)
self.ctx.console.info(f"Installing casks: {', '.join(cask_names)}")
self.ctx.runtime.runner.run_shell(cmd, check=True)
for op in plan.install_ops:
if op.method == "cask":
installed.packages[op.package.name] = InstalledPackage(
name=op.package.name,
version=op.package.version or "system",
type="cask",
)
self._run_post_install(op.package)
for op in plan.install_ops:
if op.method == "binary" and op.download_url:
self._install_binary(op.package, op.download_url, op.source_name, installed)
self._run_post_install(op.package)
elif op.method == "appimage" and op.download_url:
self._install_appimage(op.package, op.download_url, installed)
self._run_post_install(op.package)
self._save_state(installed)
self.ctx.console.print_plan([str(op) for op in plan.install_ops], verb="install")
self._executor().execute(action_plan)
self.ctx.console.success(f"Installed {len(plan.install_ops)} package(s).")
def _install_binary(
self, pkg: PackageDef, url: str, asset: str, state: InstalledState,
) -> None:
"""Download and install a binary package."""
install_map = pkg.install
if not install_map:
raise FlowError(f"Binary package '{pkg.name}' must define install paths")
context = self._binary_context(pkg)
with tempfile.TemporaryDirectory(prefix=f"flow-{pkg.name}-") as tmp:
tmp_dir = Path(tmp)
archive = tmp_dir / asset
extracted = tmp_dir / "extract"
self.ctx.console.info(f"Downloading {pkg.name}...")
try:
with urllib.request.urlopen(url, timeout=60) as response:
self.ctx.runtime.fs.write_bytes(archive, response.read())
except urllib.error.URLError as e:
raise FlowError(f"Failed to download {url}: {e}") from e
self.ctx.runtime.fs.ensure_dir(extracted)
try:
shutil.unpack_archive(str(archive), str(extracted))
except (shutil.ReadError, ValueError) as e:
raise FlowError(f"Could not extract archive for '{pkg.name}': {e}") from e
extract_dir = resolve_extract_dir(pkg, self.ctx.platform.platform)
source_root = extracted if extract_dir is None else extracted / extract_dir
if not source_root.exists():
raise FlowError(f"extract-dir '{extract_dir}' not found for package '{pkg.name}'")
source_root_resolved = source_root.resolve(strict=False)
installed_paths: list[Path] = []
for section in ("bin", "share", "man", "lib"):
if section not in install_map:
continue
items = install_map[section]
if not isinstance(items, list):
raise FlowError(
f"Install section '{section}' for '{pkg.name}' must be a list"
)
for item in items:
if not isinstance(item, str):
raise FlowError(
f"Install paths for '{pkg.name}' must be strings"
)
installed_paths.append(
self._copy_install_item(
pkg.name,
source_root,
source_root_resolved,
section,
substitute_template(item, context),
)
)
if not installed_paths:
raise FlowError(f"Binary package '{pkg.name}' installed no files")
state.packages[pkg.name] = InstalledPackage(
name=pkg.name,
version=pkg.version or "latest",
type="binary",
files=installed_paths,
)
def _install_appimage(
self, pkg: PackageDef, url: str, state: InstalledState,
) -> None:
"""Download and install an AppImage."""
bin_dir = paths.HOME / ".local" / "bin"
self.ctx.runtime.fs.ensure_dir(bin_dir)
target = bin_dir / pkg.name
self.ctx.console.info(f"Downloading {pkg.name} AppImage...")
self.ctx.runtime.runner.run(
["curl", "-fSL", "-o", str(target), url], check=True,
)
target.chmod(0o755)
state.packages[pkg.name] = InstalledPackage(
name=pkg.name,
version=pkg.version or "latest",
type="appimage",
files=[target],
)
def remove(
self,
package_names: list[str],
@@ -230,28 +99,22 @@ class PackageService:
) -> None:
"""Remove installed packages."""
installed = self._load_state()
missing = [name for name in package_names if name not in installed.packages]
if missing:
raise FlowError(f"Package(s) not installed: {', '.join(missing)}")
plan = plan_remove(package_names, installed)
if not plan.remove_ops:
self.ctx.console.info("No matching packages to remove.")
return
raise FlowError("No packages selected for removal")
self.ctx.console.print_plan(
[str(op) for op in plan.remove_ops], verb="remove"
)
action_plan = self._remove_action_plan(plan, installed)
if dry_run:
self._executor().render_plan(action_plan)
return
for op in plan.remove_ops:
for f in op.files:
if f.is_dir():
self.ctx.runtime.fs.remove_tree(f)
else:
self.ctx.runtime.fs.remove_file(f, missing_ok=True)
installed.packages.pop(op.name, None)
self._save_state(installed)
self.ctx.console.print_plan([str(op) for op in plan.remove_ops], verb="remove")
self._executor().execute(action_plan)
self.ctx.console.success(f"Removed {len(plan.remove_ops)} package(s).")
def list_packages(self, *, show_all: bool = False) -> None:
@@ -289,7 +152,14 @@ class PackageService:
return InstalledState.from_dict(data)
def _save_state(self, state: InstalledState) -> None:
self.ctx.runtime.fs.write_json(paths.INSTALLED_STATE, state.as_dict())
self._executor().execute(
ActionPlan(
name="packages.write-state",
primitive_actions=(
self._state_write_action(state),
),
)
)
def _binary_context(self, pkg: PackageDef) -> dict[str, Any]:
return {
@@ -327,12 +197,25 @@ class PackageService:
)
destination = destination_root / stripped_path
if source.is_dir():
self.ctx.runtime.fs.copy_tree(source, destination)
else:
self.ctx.runtime.fs.copy_file(source, destination)
if section == "bin":
destination.chmod(destination.stat().st_mode | 0o111)
self._executor().execute(
ActionPlan(
name=f"package.copy-install-item.{package_name}",
primitive_actions=(
PrimitiveAction(
id=f"package.{package_name}.copy-install-item",
type="file.copy",
description=f"Install {declared_path} to {destination}",
payload={
"source": source,
"target": destination,
"source_root": source_root_resolved,
"make_executable": section == "bin",
},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
),
),
)
)
return destination
@@ -341,7 +224,20 @@ class PackageService:
return
script = substitute_template(pkg.post_install, self._binary_context(pkg))
self.ctx.runtime.runner.run_shell(script, check=True)
self._executor().execute(
ActionPlan(
name=f"package.post-install.{pkg.name}",
primitive_actions=(
PrimitiveAction(
id=f"package.{pkg.name}.post-install",
type="process.shell_user_hook",
description=f"Run post-install hook for {pkg.name}",
payload={"command": script},
rollback_policy=RollbackPolicy.BARRIER,
),
),
)
)
def _install_destination(self, section: str) -> Path:
destinations = {
@@ -391,3 +287,337 @@ class PackageService:
raise FlowError(
f"Install path for '{package_name}' must not include parent traversal: {declared_path}"
)
def _install_action_plan(
self,
plan: PackagePlan,
installed: InstalledState,
pm: str | None,
) -> ActionPlan:
projected = InstalledState(
packages={
name: InstalledPackage(
name=pkg.name,
version=pkg.version,
type=pkg.type,
files=list(pkg.files),
)
for name, pkg in installed.packages.items()
}
)
actions: list[DomainAction] = []
if plan.pm_update_needed and pm:
actions.append(
DomainAction(
id=f"package-manager.{pm}.update",
kind="package",
action="update",
description=f"Update package manager ({pm})",
payload={
"primitive_actions": (
PrimitiveAction(
id=f"package-manager.{pm}.update.argv",
type="process.argv",
description=f"Update package manager ({pm})",
payload={
"argv": tuple(pm_update_argv(pm)),
"allowed_returncodes": (0, 100) if pm == "dnf" else (0,),
},
rollback_policy=RollbackPolicy.BARRIER,
),
)
},
rollback_policy=RollbackPolicy.BARRIER,
)
)
pm_ops = [op for op in plan.install_ops if op.method == "pm"]
if pm_ops:
if pm is None:
raise FlowError("Package-manager install planned without a package manager")
primitives: list[PrimitiveAction] = [
PrimitiveAction(
id=f"package-manager.{pm}.install",
type="process.argv",
description=f"Install packages with {pm}: {', '.join(op.source_name for op in pm_ops)}",
payload={"argv": tuple(pm_install_argv(pm, [op.source_name for op in pm_ops]))},
rollback_policy=RollbackPolicy.BARRIER,
)
]
for op in pm_ops:
projected.packages[op.package.name] = InstalledPackage(
name=op.package.name,
version=op.package.version or "system",
type="pkg",
)
hook = self._post_install_primitive(op.package)
if hook is not None:
primitives.append(hook)
actions.append(
DomainAction(
id="package.install.pm",
kind="package",
action="install",
description="Install package-manager packages",
payload={"primitive_actions": tuple(primitives)},
rollback_policy=RollbackPolicy.BARRIER,
)
)
cask_ops = [op for op in plan.install_ops if op.method == "cask"]
if cask_ops:
if pm is None:
raise FlowError("Cask install planned without a package manager")
primitives = [
PrimitiveAction(
id=f"package-manager.{pm}.install-cask",
type="process.argv",
description=f"Install casks with {pm}: {', '.join(op.source_name for op in cask_ops)}",
payload={"argv": tuple(pm_cask_install_argv(pm, [op.source_name for op in cask_ops]))},
rollback_policy=RollbackPolicy.BARRIER,
)
]
for op in cask_ops:
projected.packages[op.package.name] = InstalledPackage(
name=op.package.name,
version=op.package.version or "system",
type="cask",
)
hook = self._post_install_primitive(op.package)
if hook is not None:
primitives.append(hook)
actions.append(
DomainAction(
id="package.install.cask",
kind="package",
action="install",
description="Install cask packages",
payload={"primitive_actions": tuple(primitives)},
rollback_policy=RollbackPolicy.BARRIER,
)
)
for op in plan.install_ops:
if op.method == "binary":
primitives, record = self._binary_install_primitives(op)
elif op.method == "appimage":
primitives, record = self._appimage_install_primitives(op)
else:
continue
projected.packages[record.name] = record
hook = self._post_install_primitive(op.package)
if hook is not None:
primitives = (*primitives, hook)
actions.append(
DomainAction(
id=f"package.install.{op.package.name}",
kind="package",
action="install",
description=f"Install {op.package.name}",
payload={"primitive_actions": tuple(primitives)},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
)
return ActionPlan(
name="packages.install",
domain_actions=tuple(actions),
primitive_actions=(self._state_write_action(projected),),
)
def _remove_action_plan(
self,
plan: PackagePlan,
installed: InstalledState,
) -> ActionPlan:
projected = InstalledState(
packages={
name: InstalledPackage(
name=pkg.name,
version=pkg.version,
type=pkg.type,
files=list(pkg.files),
)
for name, pkg in installed.packages.items()
}
)
actions: list[DomainAction] = []
for op in plan.remove_ops:
primitives = tuple(
PrimitiveAction(
id=f"package.{op.name}.remove.{index}",
type="file.remove",
description=f"Remove installed file {path}",
payload={"path": path, "missing_ok": False},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
for index, path in enumerate(op.files)
)
actions.append(
DomainAction(
id=f"package.remove.{op.name}",
kind="package",
action="remove",
description=f"Remove package {op.name}",
payload={"primitive_actions": primitives},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
)
projected.packages.pop(op.name, None)
return ActionPlan(
name="packages.remove",
domain_actions=tuple(actions),
primitive_actions=(self._state_write_action(projected),),
)
def _binary_install_primitives(
self,
op: PkgInstallOp,
) -> tuple[tuple[PrimitiveAction, ...], InstalledPackage]:
pkg = op.package
if not op.download_url:
raise FlowError(f"Binary package '{pkg.name}' has no download URL")
install_map = pkg.install
if not install_map:
raise FlowError(f"Binary package '{pkg.name}' must define install paths")
context = self._binary_context(pkg)
workspace = self._packages_dir() / "work" / f"{pkg.name}-{uuid.uuid4().hex}"
archive = workspace / op.source_name
extracted = workspace / "extract"
extract_dir = resolve_extract_dir(pkg, self.ctx.platform.platform)
source_root = extracted if extract_dir is None else extracted / extract_dir
primitives: list[PrimitiveAction] = [
PrimitiveAction(
id=f"package.{pkg.name}.download",
type="download.file",
description=f"Download {pkg.name}",
payload={"url": op.download_url, "target": archive},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
),
PrimitiveAction(
id=f"package.{pkg.name}.extract",
type="archive.extract",
description=f"Extract {pkg.name}",
payload={"archive": archive, "target": extracted},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
),
]
installed_paths: list[Path] = []
for section in ("bin", "share", "man", "lib"):
if section not in install_map:
continue
items = install_map[section]
if not isinstance(items, list):
raise FlowError(f"Install section '{section}' for '{pkg.name}' must be a list")
for item in items:
if not isinstance(item, str):
raise FlowError(f"Install paths for '{pkg.name}' must be strings")
declared_path = Path(substitute_template(item, context))
self._validate_install_path(pkg.name, declared_path)
stripped_path = self._strip_prefix(
declared_path,
self._install_strip_prefix(section),
pkg.name,
section,
)
destination = self._install_destination(section) / stripped_path
installed_paths.append(destination)
primitives.append(
PrimitiveAction(
id=f"package.{pkg.name}.copy.{len(installed_paths)}",
type="file.copy",
description=f"Install {declared_path} to {destination}",
payload={
"source": source_root / declared_path,
"target": destination,
"source_root": source_root,
"make_executable": section == "bin",
},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
)
if not installed_paths:
raise FlowError(f"Binary package '{pkg.name}' installed no files")
primitives.append(
PrimitiveAction(
id=f"package.{pkg.name}.cleanup",
type="file.remove",
description=f"Remove temporary workspace {workspace}",
payload={"path": workspace, "missing_ok": True},
rollback_policy=RollbackPolicy.NONE,
)
)
return (
tuple(primitives),
InstalledPackage(
name=pkg.name,
version=pkg.version or "latest",
type="binary",
files=installed_paths,
),
)
def _appimage_install_primitives(
self,
op: PkgInstallOp,
) -> tuple[tuple[PrimitiveAction, ...], InstalledPackage]:
pkg = op.package
if not op.download_url:
raise FlowError(f"AppImage package '{pkg.name}' has no download URL")
target = paths.HOME / ".local" / "bin" / pkg.name
primitives = (
PrimitiveAction(
id=f"package.{pkg.name}.download",
type="download.file",
description=f"Download {pkg.name} AppImage",
payload={"url": op.download_url, "target": target},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
),
PrimitiveAction(
id=f"package.{pkg.name}.chmod",
type="file.chmod",
description=f"Mark {target} executable",
payload={"path": target, "mode": 0o755},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
),
)
return (
primitives,
InstalledPackage(
name=pkg.name,
version=pkg.version or "latest",
type="appimage",
files=[target],
),
)
def _post_install_primitive(self, pkg: PackageDef) -> PrimitiveAction | None:
if not pkg.post_install:
return None
return PrimitiveAction(
id=f"package.{pkg.name}.post-install",
type="process.shell_user_hook",
description=f"Run post-install hook for {pkg.name}",
payload={"command": substitute_template(pkg.post_install, self._binary_context(pkg))},
rollback_policy=RollbackPolicy.BARRIER,
)
def _state_write_action(self, state: InstalledState) -> PrimitiveAction:
return PrimitiveAction(
id="packages.state.write",
type="file.write_json",
description=f"Write package state to {paths.INSTALLED_STATE}",
payload={"path": paths.INSTALLED_STATE, "data": state.as_dict()},
rollback_policy=RollbackPolicy.ROLLBACKABLE,
)
def _executor(self) -> ActionExecutor:
return ActionExecutor(self.ctx, audit_path=paths.INSTALLED_STATE.parent / "actions.jsonl")
def _packages_dir(self) -> Path:
return paths.DATA_DIR / "packages"

View File

@@ -6,6 +6,7 @@ import getpass
import os
from typing import Optional
from flow.actions import ActionExecutor, ActionPlan, DomainAction
from flow.core.config import FlowContext
from flow.domain.remote.resolution import (
build_ssh_command,
@@ -44,17 +45,25 @@ class RemoteService:
no_tmux=no_tmux,
)
self.ctx.console.info(f"Connecting to {target.label} ({target.host})")
action_plan = ActionPlan(
name="remote.enter",
domain_actions=(
DomainAction(
id=f"remote.enter.{target.label}",
kind="remote",
action="enter",
description=f"Connect to {target.label} ({target.host})",
payload={"argv": tuple(cmd.argv)},
),
),
)
if dry_run:
self.ctx.console.info(f"Would run: {' '.join(cmd.argv)}")
ActionExecutor(self.ctx).render_plan(action_plan)
return
self.ctx.runtime.runner.run(
cmd.argv,
capture_output=False,
check=True,
)
self.ctx.console.info(f"Connecting to {target.label} ({target.host})")
ActionExecutor(self.ctx).execute(action_plan)
def list(self) -> None:
"""List configured targets."""

View File

@@ -0,0 +1,140 @@
"""Tests for the canonical action executor."""
from __future__ import annotations
import json
import sys
import pytest
from flow.actions import ActionExecutor, ActionPlan, PrimitiveAction, RollbackPolicy
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.errors import FlowError
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
def _ctx() -> FlowContext:
return FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=Console(color=False),
runtime=SystemRuntime(),
)
def test_dry_run_does_not_mutate(tmp_path):
target = tmp_path / "out.txt"
plan = ActionPlan(
name="dry-run",
primitive_actions=(
PrimitiveAction(
id="write",
type="file.write",
description="Write a file",
payload={"path": target, "content": "hello"},
),
),
)
summary = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(
plan,
dry_run=True,
)
assert not target.exists()
assert summary.results[0].status == "dry-run"
def test_audit_jsonl_records_success(tmp_path):
target = tmp_path / "out.txt"
audit_path = tmp_path / "actions.jsonl"
plan = ActionPlan(
name="write-plan",
primitive_actions=(
PrimitiveAction(
id="write",
type="file.write",
description="Write a file",
payload={"path": target, "content": "hello"},
),
),
)
ActionExecutor(_ctx(), audit_path=audit_path).execute(plan)
records = [json.loads(line) for line in audit_path.read_text().splitlines()]
assert [record["event"] for record in records] == [
"plan_start",
"action_start",
"action_success",
"plan_success",
]
assert target.read_text() == "hello"
def test_rollback_removes_created_symlink_after_failure(tmp_path):
source = tmp_path / "source"
target = tmp_path / "target"
source.write_text("managed")
plan = ActionPlan(
name="rollback",
primitive_actions=(
PrimitiveAction(
id="link",
type="file.create_symlink",
description="Create managed link",
payload={"source": source, "target": target},
),
PrimitiveAction(
id="copy-missing",
type="file.copy",
description="Fail on missing source",
payload={"source": tmp_path / "missing", "target": tmp_path / "dest"},
),
),
)
with pytest.raises(FlowError, match="missing"):
ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan)
assert not target.exists()
assert not target.is_symlink()
def test_barrier_prevents_rollback_across_external_boundary(tmp_path):
source = tmp_path / "source"
target = tmp_path / "target"
source.write_text("managed")
plan = ActionPlan(
name="barrier",
primitive_actions=(
PrimitiveAction(
id="link",
type="file.create_symlink",
description="Create managed link",
payload={"source": source, "target": target},
),
PrimitiveAction(
id="barrier",
type="process.argv",
description="External boundary",
payload={"argv": (sys.executable, "-c", "")},
rollback_policy=RollbackPolicy.BARRIER,
),
PrimitiveAction(
id="copy-missing",
type="file.copy",
description="Fail on missing source",
payload={"source": tmp_path / "missing", "target": tmp_path / "dest"},
),
),
)
with pytest.raises(FlowError, match="missing"):
ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan)
assert target.is_symlink()

View File

@@ -4,6 +4,19 @@ import os
import subprocess
import sys
from typer.testing import CliRunner
from flow.cli import app
runner = CliRunner()
def test_typer_runner_version():
result = runner.invoke(app, ["--version"])
assert result.exit_code == 0
assert "flow" in result.stdout
def test_version_flag():
"""Test --version flag works."""

View File

@@ -9,7 +9,7 @@ import yaml
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.errors import FlowError
from flow.core.errors import FlowError, PlanConflict
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
from flow.core import paths
@@ -187,7 +187,7 @@ class TestDotfilesServiceLink:
output = capsys.readouterr().out
assert "zsh" in output
def test_relink_does_not_remove_unmanaged_file(self, tmp_path, monkeypatch):
def test_relink_fails_on_unmanaged_file(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
@@ -208,7 +208,8 @@ class TestDotfilesServiceLink:
target.unlink()
target.write_text("user managed file")
svc.link()
with pytest.raises(PlanConflict):
svc.link()
assert target.read_text() == "user managed file"
assert not target.is_symlink()
@@ -619,12 +620,10 @@ class TestUnclonedModuleWarning:
), console.warnings
class TestOrphanAdoption:
"""After a partial-failure rerun the planner adopts pre-existing matching
symlinks. We simulate a failure during _apply_plan by replacing
create_symlink mid-flight, then re-running link()."""
class TestActionRollback:
"""A mid-execution failure rolls back created links and leaves state absent."""
def test_partial_apply_failure_recoverable_via_rerun(self, tmp_path, monkeypatch):
def test_partial_apply_failure_rolls_back_then_rerun_succeeds(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
@@ -659,13 +658,13 @@ class TestOrphanAdoption:
# State file should NOT have been written (atomic semantics: we
# only persist when _apply_plan completes).
assert not state_path.exists()
# First symlink landed on disk.
# The first symlink was rolled back.
existing_links = sorted(
p.name for p in home.rglob("*") if p.is_symlink()
)
assert len(existing_links) == 1
assert existing_links == []
# Restore real implementation and re-run: orphan adoption kicks in.
# Restore real implementation and re-run.
monkeypatch.setattr(ctx.runtime.fs, "create_symlink", real_create)
svc.link()

View File

@@ -57,8 +57,8 @@ class TestPackageService:
monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json")
ctx = _make_ctx(tmp_path)
svc = PackageService(ctx)
svc.remove(["missing"])
assert "No matching" in capsys.readouterr().out
with pytest.raises(FlowError, match="not installed"):
svc.remove(["missing"])
def test_install_requires_args(self, tmp_path, monkeypatch):
monkeypatch.setattr(paths, "INSTALLED_STATE", tmp_path / "installed.json")
@@ -106,7 +106,7 @@ class TestPackageService:
def read(self):
return archive_bytes
monkeypatch.setattr("flow.services.packages.urllib.request.urlopen", lambda *args, **kwargs: FakeResponse())
monkeypatch.setattr("flow.adapters.download.urllib.request.urlopen", lambda *args, **kwargs: FakeResponse())
manifest = {
"packages": [{
@@ -176,7 +176,7 @@ class TestPackageService:
raise urllib.error.URLError("Network unreachable")
monkeypatch.setattr(
"flow.services.packages.urllib.request.urlopen", _raise,
"flow.adapters.download.urllib.request.urlopen", _raise,
)
manifest = {

View File

@@ -0,0 +1,45 @@
"""Static guard for direct filesystem mutation outside adapters/actions."""
from __future__ import annotations
import re
from pathlib import Path
MUTATING_PATTERNS = (
re.compile(r"\.(mkdir|unlink|write_text|write_bytes|symlink_to|chmod)\("),
re.compile(r"\b(os\.replace|shutil\.rmtree|shutil\.copy2|shutil\.copytree)\("),
re.compile(r"runtime\.fs\.(create_symlink|remove_symlink|write_json|write_text|write_bytes|copy_file|copy_tree|remove_file|remove_tree)\("),
)
ALLOWED_PREFIXES = (
Path("src/flow/adapters"),
Path("src/flow/actions"),
Path("tests"),
)
ALLOWED_FILES = {
Path("src/flow/core/paths.py"),
}
SKIPPED_LEGACY_COMMANDS = {
Path("src/flow/commands/completion.py"),
}
def test_no_direct_filesystem_mutation_outside_action_boundary():
root = Path(__file__).resolve().parents[1]
offenders: list[str] = []
for path in sorted((root / "src" / "flow").rglob("*.py")):
rel = path.relative_to(root)
if rel in ALLOWED_FILES or rel in SKIPPED_LEGACY_COMMANDS:
continue
if any(rel.is_relative_to(prefix) for prefix in ALLOWED_PREFIXES):
continue
for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
if "Service(" in line:
continue
if any(pattern.search(line) for pattern in MUTATING_PATTERNS):
offenders.append(f"{rel}:{line_no}: {line.strip()}")
assert offenders == []

144
uv.lock generated
View File

@@ -15,6 +15,45 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a9/ba/000a1996d4308bc65120167c21241a3b205464a2e0b58deda26ae8ac21d1/altgraph-0.17.5-py2.py3-none-any.whl", hash = "sha256:f3a22400bce1b0c701683820ac4f3b159cd301acab067c51c653e06961600597", size = 21228, upload-time = "2025-11-21T20:35:49.444Z" },
]
[[package]]
name = "annotated-doc"
version = "0.0.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/57/ba/046ceea27344560984e26a590f90bc7f4a75b06701f653222458922b558c/annotated_doc-0.0.4.tar.gz", hash = "sha256:fbcda96e87e9c92ad167c2e53839e57503ecfda18804ea28102353485033faa4", size = 7288, upload-time = "2025-11-10T22:07:42.062Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/d3/26bf1008eb3d2daa8ef4cacc7f3bfdc11818d111f7e2d0201bc6e3b49d45/annotated_doc-0.0.4-py3-none-any.whl", hash = "sha256:571ac1dc6991c450b25a9c2d84a3705e2ae7a53467b5d111c24fa8baabbed320", size = 5303, upload-time = "2025-11-10T22:07:40.673Z" },
]
[[package]]
name = "click"
version = "8.1.8"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version < '3.10'",
]
dependencies = [
{ name = "colorama", marker = "python_full_version < '3.10' and sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593, upload-time = "2024-12-21T18:38:44.339Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/d4/7ebdbd03970677812aac39c869717059dbb71a4cfc033ca6e5221787892c/click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2", size = 98188, upload-time = "2024-12-21T18:38:41.666Z" },
]
[[package]]
name = "click"
version = "8.3.3"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version >= '3.10'",
]
dependencies = [
{ name = "colorama", marker = "python_full_version >= '3.10' and sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/63/f9e1ea081ce35720d8b92acde70daaedace594dc93b693c869e0d5910718/click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2", size = 328061, upload-time = "2026-04-22T15:11:27.506Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ae/44/c1221527f6a71a01ec6fbad7fa78f1d50dfa02217385cf0fa3eec7087d59/click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613", size = 110502, upload-time = "2026-04-22T15:11:25.044Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
@@ -41,6 +80,9 @@ name = "flow"
source = { editable = "." }
dependencies = [
{ name = "pyyaml" },
{ name = "rich" },
{ name = "typer", version = "0.23.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
{ name = "typer", version = "0.25.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
]
[package.optional-dependencies]
@@ -57,6 +99,8 @@ requires-dist = [
{ name = "pyinstaller", marker = "extra == 'build'", specifier = ">=6.0" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "rich", specifier = ">=13.7" },
{ name = "typer", specifier = ">=0.12" },
]
provides-extras = ["build", "dev"]
@@ -65,7 +109,7 @@ name = "importlib-metadata"
version = "8.7.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "zipp" },
{ name = "zipp", marker = "python_full_version < '3.10'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f3/49/3b30cad09e7771a4982d9975a8cbf64f00d4a1ececb53297f1d9a7be1b10/importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb", size = 57107, upload-time = "2025-12-21T10:00:19.278Z" }
wheels = [
@@ -108,6 +152,45 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/d1/a9f36f8ecdf0fb7c9b1e78c8d7af12b8c8754e74851ac7b94a8305540fc7/macholib-1.16.4-py2.py3-none-any.whl", hash = "sha256:da1a3fa8266e30f0ce7e97c6a54eefaae8edd1e5f86f3eb8b95457cae90265ea", size = 38117, upload-time = "2025-11-22T08:28:36.939Z" },
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version < '3.10'",
]
dependencies = [
{ name = "mdurl", marker = "python_full_version < '3.10'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/38/71/3b932df36c1a044d397a1f92d1cf91ee0a503d91e470cbd670aa66b07ed0/markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb", size = 74596, upload-time = "2023-06-03T06:41:14.443Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528, upload-time = "2023-06-03T06:41:11.019Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.2.0"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version >= '3.10'",
]
dependencies = [
{ name = "mdurl", marker = "python_full_version >= '3.10'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/ff/7841249c247aa650a76b9ee4bbaeae59370dc8bfd2f6c01f3630c35eb134/markdown_it_py-4.2.0.tar.gz", hash = "sha256:04a21681d6fbb623de53f6f364d352309d4094dd4194040a10fd51833e418d49", size = 82454, upload-time = "2026-05-07T12:08:28.36Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/81/4da04ced5a082363ecfa159c010d200ecbd959ae410c10c0264a38cac0f5/markdown_it_py-4.2.0-py3-none-any.whl", hash = "sha256:9f7ebbcd14fe59494226453aed97c1070d83f8d24b6fc3a3bcf9a38092641c4a", size = 91687, upload-time = "2026-05-07T12:08:27.182Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "packaging"
version = "26.0"
@@ -311,6 +394,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/0c/25113e0b5e103d7f1490c0e947e303fe4a696c10b501dea7a9f49d4e876c/pyyaml-6.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:2e71d11abed7344e42a8849600193d15b6def118602c4c176f748e4583246007", size = 158777, upload-time = "2025-09-25T21:33:15.55Z" },
]
[[package]]
name = "rich"
version = "15.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", version = "3.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
{ name = "markdown-it-py", version = "4.2.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" },
]
[[package]]
name = "setuptools"
version = "82.0.1"
@@ -320,6 +417,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9d/76/f789f7a86709c6b087c5a2f52f911838cad707cc613162401badc665acfe/setuptools-82.0.1-py3-none-any.whl", hash = "sha256:a59e362652f08dcd477c78bb6e7bd9d80a7995bc73ce773050228a348ce2e5bb", size = 1006223, upload-time = "2026-03-09T12:47:15.026Z" },
]
[[package]]
name = "shellingham"
version = "1.5.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/58/15/8b3609fd3830ef7b27b655beb4b4e9c62313a4e8da8c676e142cc210d58e/shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de", size = 10310, upload-time = "2023-10-24T04:13:40.426Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e0/f9/0595336914c5619e5f28a1fb793285925a8cd4b432c9da0a987836c7f822/shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686", size = 9755, upload-time = "2023-10-24T04:13:38.866Z" },
]
[[package]]
name = "tomli"
version = "2.4.0"
@@ -374,6 +480,42 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/23/d1/136eb2cb77520a31e1f64cbae9d33ec6df0d78bdf4160398e86eec8a8754/tomli-2.4.0-py3-none-any.whl", hash = "sha256:1f776e7d669ebceb01dee46484485f43a4048746235e683bcdffacdf1fb4785a", size = 14477, upload-time = "2026-01-11T11:22:37.446Z" },
]
[[package]]
name = "typer"
version = "0.23.2"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version < '3.10'",
]
dependencies = [
{ name = "annotated-doc", marker = "python_full_version < '3.10'" },
{ name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
{ name = "rich", marker = "python_full_version < '3.10'" },
{ name = "shellingham", marker = "python_full_version < '3.10'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d3/ae/93d16574e66dfe4c2284ffdaca4b0320ade32858cb2cc586c8dd79f127c5/typer-0.23.2.tar.gz", hash = "sha256:a99706a08e54f1aef8bb6a8611503808188a4092808e86addff1828a208af0de", size = 120162, upload-time = "2026-02-16T18:52:40.354Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/2c/dee705c427875402200fe779eb8a3c00ccb349471172c41178336e9599cc/typer-0.23.2-py3-none-any.whl", hash = "sha256:e9c8dc380f82450b3c851a9b9d5a0edf95d1d6456ae70c517d8b06a50c7a9978", size = 56834, upload-time = "2026-02-16T18:52:39.308Z" },
]
[[package]]
name = "typer"
version = "0.25.1"
source = { registry = "https://pypi.org/simple" }
resolution-markers = [
"python_full_version >= '3.10'",
]
dependencies = [
{ name = "annotated-doc", marker = "python_full_version >= '3.10'" },
{ name = "click", version = "8.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
{ name = "rich", marker = "python_full_version >= '3.10'" },
{ name = "shellingham", marker = "python_full_version >= '3.10'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e4/51/9aed62104cea109b820bbd6c14245af756112017d309da813ef107d42e7e/typer-0.25.1.tar.gz", hash = "sha256:9616eb8853a09ffeabab1698952f33c6f29ffdbceb4eaeecf571880e8d7664cc", size = 122276, upload-time = "2026-04-30T19:32:16.964Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3f/f9/2b3ff4e56e5fa7debfaf9eb135d0da96f3e9a1d5b27222223c7296336e5f/typer-0.25.1-py3-none-any.whl", hash = "sha256:75caa44ed46a03fb2dab8808753ffacdbfea88495e74c85a28c5eefcf5f39c89", size = 58409, upload-time = "2026-04-30T19:32:18.271Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"