"""Tests for the canonical action executor.""" from __future__ import annotations import json import subprocess import sys import pytest from flow.actions import ActionExecutor, ActionPlan, DomainAction, PrimitiveAction, RollbackPolicy from flow.adapters.containers import ContainerRuntime from flow.adapters.tmux import TmuxClient from flow.core.config import AppConfig, FlowContext from flow.core.console import Console from flow.core.errors import FlowError from flow.core.platform import PlatformInfo from flow.core.runtime import SystemRuntime from tests.fakes import FakeRunner def _ctx() -> FlowContext: return FlowContext( config=AppConfig(), manifest={}, platform=PlatformInfo(), console=Console(color=False), runtime=SystemRuntime(), ) def test_dry_run_does_not_mutate(tmp_path): target = tmp_path / "out.txt" plan = ActionPlan( name="dry-run", primitive_actions=( PrimitiveAction( id="write", type="file.write", description="Write a file", payload={"path": target, "content": "hello"}, ), ), ) summary = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute( plan, dry_run=True, ) assert not target.exists() assert summary.results[0].status == "dry-run" def test_audit_jsonl_records_success(tmp_path): target = tmp_path / "out.txt" audit_path = tmp_path / "actions.jsonl" plan = ActionPlan( name="write-plan", primitive_actions=( PrimitiveAction( id="write", type="file.write", description="Write a file", payload={"path": target, "content": "hello"}, ), ), ) ActionExecutor(_ctx(), audit_path=audit_path).execute(plan) records = [json.loads(line) for line in audit_path.read_text().splitlines()] assert [record["event"] for record in records] == [ "plan_start", "action_start", "action_success", "plan_success", ] assert target.read_text() == "hello" def test_rollback_removes_created_symlink_after_failure(tmp_path): source = tmp_path / "source" target = tmp_path / "target" source.write_text("managed") plan = ActionPlan( name="rollback", primitive_actions=( PrimitiveAction( id="link", type="file.create_symlink", description="Create managed link", payload={"source": source, "target": target}, ), PrimitiveAction( id="copy-missing", type="file.copy", description="Fail on missing source", payload={"source": tmp_path / "missing", "target": tmp_path / "dest"}, ), ), ) with pytest.raises(FlowError, match="missing"): ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan) assert not target.exists() assert not target.is_symlink() def test_barrier_prevents_rollback_across_external_boundary(tmp_path): source = tmp_path / "source" target = tmp_path / "target" source.write_text("managed") plan = ActionPlan( name="barrier", primitive_actions=( PrimitiveAction( id="link", type="file.create_symlink", description="Create managed link", payload={"source": source, "target": target}, ), PrimitiveAction( id="barrier", type="process.argv", description="External boundary", payload={"argv": (sys.executable, "-c", "")}, rollback_policy=RollbackPolicy.BARRIER, ), PrimitiveAction( id="copy-missing", type="file.copy", description="Fail on missing source", payload={"source": tmp_path / "missing", "target": tmp_path / "dest"}, ), ), ) with pytest.raises(FlowError, match="missing"): ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan) assert target.is_symlink() def test_domain_action_expands_embedded_primitives(tmp_path): target = tmp_path / "completion" primitive = PrimitiveAction( id="completion.write", type="file.write", description="Write completion", payload={"path": target, "content": "complete"}, ) plan = ActionPlan( name="completion.install", domain_actions=( DomainAction( id="completion.install", kind="completion", action="install-zsh", description="Install completion", payload={"primitive_actions": (primitive,)}, ), ), ) ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan) assert target.read_text() == "complete" def test_executor_dispatches_container_and_tmux_primitives(tmp_path): runner = FakeRunner() ctx = _ctx() ctx.runtime.runner = runner ctx.runtime.containers = ContainerRuntime(runner, binary="docker") ctx.runtime.tmux = TmuxClient(runner) plan = ActionPlan( name="runtime-dispatch", primitive_actions=( PrimitiveAction( id="container.exec", type="container.exec", description="Run command in container", payload={"name": "dev-api", "argv": ("echo", "hello")}, ), PrimitiveAction( id="tmux.session", type="tmux.new_session", description="Create session", payload={"name": "dev-api", "detached": True, "command": "flow dev exec api"}, ), PrimitiveAction( id="tmux.option", type="tmux.set_option", description="Set option", payload={ "session": "dev-api", "option": "default-command", "value": "flow dev exec api", }, ), ), ) summary = ActionExecutor(ctx, audit_path=tmp_path / "actions.jsonl").execute(plan) assert summary.results[0].returncode == 0 assert ["docker", "exec", "dev-api", "echo", "hello"] in runner.calls assert ["tmux", "new-session", "-ds", "dev-api", "flow dev exec api"] in runner.calls assert [ "tmux", "set-option", "-t", "dev-api", "default-command", "flow dev exec api", ] in runner.calls def test_container_exec_nonzero_result_is_failed(tmp_path): runner = FakeRunner( responses={ ("exec", "dev-api", "false"): subprocess.CompletedProcess( ["docker", "exec", "dev-api", "false"], 7, stdout="", stderr="" ), } ) ctx = _ctx() ctx.runtime.runner = runner ctx.runtime.containers = ContainerRuntime(runner, binary="docker") plan = ActionPlan( name="container-exec-failure", primitive_actions=( PrimitiveAction( id="container.exec", type="container.exec", description="Run failing command in container", payload={"name": "dev-api", "argv": ("false",)}, ), ), ) summary = ActionExecutor(ctx, audit_path=tmp_path / "actions.jsonl").execute(plan) assert summary.results[0].status == "failed" assert summary.results[0].returncode == 7 audit_events = [ json.loads(line)["event"] for line in (tmp_path / "actions.jsonl").read_text(encoding="utf-8").splitlines() ] assert "action_failed" in audit_events assert "action_success" not in audit_events def test_copy_directory_refuses_existing_target(tmp_path): source = tmp_path / "source" target = tmp_path / "target" source.mkdir() target.mkdir() (source / "new.txt").write_text("new", encoding="utf-8") (target / "existing.txt").write_text("keep", encoding="utf-8") plan = ActionPlan( name="copy-dir-existing-target", primitive_actions=( PrimitiveAction( id="copy-dir", type="file.copy", description="Copy directory", payload={"source": source, "target": target}, ), ), ) with pytest.raises(FlowError, match="Copy target already exists"): ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan) assert not (target / "new.txt").exists() assert (target / "existing.txt").read_text(encoding="utf-8") == "keep"