Files
flow/tests/test_actions_executor.py
Tomas Mirchev 9fb08d035f
Some checks failed
test / unit (push) Has been cancelled
test / e2e (push) Has been cancelled
update
2026-05-18 04:05:46 +03:00

275 lines
8.6 KiB
Python

"""Tests for the canonical action executor."""
from __future__ import annotations
import json
import subprocess
import sys
import pytest
from flow.actions import ActionExecutor, ActionPlan, DomainAction, PrimitiveAction, RollbackPolicy
from flow.adapters.containers import ContainerRuntime
from flow.adapters.tmux import TmuxClient
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.errors import FlowError
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
from tests.fakes import FakeRunner
def _ctx() -> FlowContext:
    """Return a freshly constructed FlowContext for a single test.

    Every collaborator is built with no arguments except the console,
    which is created with ``color=False``; the manifest is an empty dict.
    """
    config = AppConfig()
    manifest = {}
    platform = PlatformInfo()
    console = Console(color=False)
    runtime = SystemRuntime()
    return FlowContext(
        config=config,
        manifest=manifest,
        platform=platform,
        console=console,
        runtime=runtime,
    )
def test_dry_run_does_not_mutate(tmp_path):
    """Check that executing with ``dry_run=True`` leaves the target file absent.

    The single ``file.write`` action must be reported with status
    ``"dry-run"`` and the file it names must not exist afterwards.
    """
    out_file = tmp_path / "out.txt"
    write_action = PrimitiveAction(
        id="write",
        type="file.write",
        description="Write a file",
        payload={"path": out_file, "content": "hello"},
    )
    plan = ActionPlan(name="dry-run", primitive_actions=(write_action,))
    executor = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl")

    summary = executor.execute(plan, dry_run=True)

    assert not out_file.exists()
    assert summary.results[0].status == "dry-run"
def test_audit_jsonl_records_success(tmp_path):
    """Check the audit trail written for a plan that succeeds.

    Runs one ``file.write`` action, then parses the audit file as one JSON
    object per line and expects the ``event`` values, in order, to be
    ``plan_start``, ``action_start``, ``action_success``, ``plan_success``.
    The written file must also contain the payload content.
    """
    target = tmp_path / "out.txt"
    audit_path = tmp_path / "actions.jsonl"
    plan = ActionPlan(
        name="write-plan",
        primitive_actions=(
            PrimitiveAction(
                id="write",
                type="file.write",
                description="Write a file",
                payload={"path": target, "content": "hello"},
            ),
        ),
    )

    ActionExecutor(_ctx(), audit_path=audit_path).execute(plan)

    # Decode explicitly so the test does not depend on the locale's default
    # encoding. NOTE(review): assumes the executor writes the audit log as
    # UTF-8 -- confirm against the executor implementation.
    audit_lines = audit_path.read_text(encoding="utf-8").splitlines()
    events = [json.loads(line)["event"] for line in audit_lines]
    assert events == [
        "plan_start",
        "action_start",
        "action_success",
        "plan_success",
    ]
    assert target.read_text(encoding="utf-8") == "hello"
def test_rollback_removes_created_symlink_after_failure(tmp_path):
    """Check that a later failure undoes an earlier symlink creation.

    The plan first creates a symlink and then attempts a copy from a path
    that does not exist. Execution is expected to raise ``FlowError``
    matching ``"missing"``, after which the link target must be neither an
    existing path nor a (possibly dangling) symlink.
    """
    link_source = tmp_path / "source"
    link_target = tmp_path / "target"
    link_source.write_text("managed")

    create_link = PrimitiveAction(
        id="link",
        type="file.create_symlink",
        description="Create managed link",
        payload={"source": link_source, "target": link_target},
    )
    failing_copy = PrimitiveAction(
        id="copy-missing",
        type="file.copy",
        description="Fail on missing source",
        payload={"source": tmp_path / "missing", "target": tmp_path / "dest"},
    )
    plan = ActionPlan(
        name="rollback",
        primitive_actions=(create_link, failing_copy),
    )

    with pytest.raises(FlowError, match="missing"):
        executor = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl")
        executor.execute(plan)

    assert not link_target.exists()
    assert not link_target.is_symlink()
def test_barrier_prevents_rollback_across_external_boundary(tmp_path):
    """Check that a BARRIER action keeps earlier work from being rolled back.

    The plan creates a symlink, runs a no-op interpreter process marked
    ``RollbackPolicy.BARRIER``, and then attempts a copy from a path that
    does not exist. Execution is expected to raise ``FlowError`` matching
    ``"missing"`` while the symlink created before the barrier remains.
    """
    link_source = tmp_path / "source"
    link_target = tmp_path / "target"
    link_source.write_text("managed")

    create_link = PrimitiveAction(
        id="link",
        type="file.create_symlink",
        description="Create managed link",
        payload={"source": link_source, "target": link_target},
    )
    external_step = PrimitiveAction(
        id="barrier",
        type="process.argv",
        description="External boundary",
        # Runs the current interpreter with an empty program.
        payload={"argv": (sys.executable, "-c", "")},
        rollback_policy=RollbackPolicy.BARRIER,
    )
    failing_copy = PrimitiveAction(
        id="copy-missing",
        type="file.copy",
        description="Fail on missing source",
        payload={"source": tmp_path / "missing", "target": tmp_path / "dest"},
    )
    plan = ActionPlan(
        name="barrier",
        primitive_actions=(create_link, external_step, failing_copy),
    )

    with pytest.raises(FlowError, match="missing"):
        executor = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl")
        executor.execute(plan)

    assert link_target.is_symlink()
def test_domain_action_expands_embedded_primitives(tmp_path):
    """Check that primitives embedded in a domain action's payload are run.

    The plan contains only a ``DomainAction`` whose payload carries one
    ``file.write`` primitive under ``"primitive_actions"``; after execution
    the file named by that primitive must hold its content.
    """
    completion_file = tmp_path / "completion"
    write_completion = PrimitiveAction(
        id="completion.write",
        type="file.write",
        description="Write completion",
        payload={"path": completion_file, "content": "complete"},
    )
    install_completion = DomainAction(
        id="completion.install",
        kind="completion",
        action="install-zsh",
        description="Install completion",
        payload={"primitive_actions": (write_completion,)},
    )
    plan = ActionPlan(
        name="completion.install",
        domain_actions=(install_completion,),
    )

    executor = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl")
    executor.execute(plan)

    assert completion_file.read_text() == "complete"
def test_executor_dispatches_container_and_tmux_primitives(tmp_path):
    """Check that container and tmux primitives reach the runner.

    The context's runner, container runtime (binary ``"docker"``) and tmux
    client are all replaced with objects backed by one ``FakeRunner``. After
    executing a ``container.exec``, ``tmux.new_session`` and
    ``tmux.set_option`` action, the runner's recorded calls must include the
    corresponding argv lists and the first result must have return code 0.
    """
    runner = FakeRunner()
    ctx = _ctx()
    ctx.runtime.runner = runner
    ctx.runtime.containers = ContainerRuntime(runner, binary="docker")
    ctx.runtime.tmux = TmuxClient(runner)

    container_exec = PrimitiveAction(
        id="container.exec",
        type="container.exec",
        description="Run command in container",
        payload={"name": "dev-api", "argv": ("echo", "hello")},
    )
    new_session = PrimitiveAction(
        id="tmux.session",
        type="tmux.new_session",
        description="Create session",
        payload={"name": "dev-api", "detached": True, "command": "flow dev exec api"},
    )
    set_option = PrimitiveAction(
        id="tmux.option",
        type="tmux.set_option",
        description="Set option",
        payload={
            "session": "dev-api",
            "option": "default-command",
            "value": "flow dev exec api",
        },
    )
    plan = ActionPlan(
        name="runtime-dispatch",
        primitive_actions=(container_exec, new_session, set_option),
    )

    summary = ActionExecutor(ctx, audit_path=tmp_path / "actions.jsonl").execute(plan)

    assert summary.results[0].returncode == 0
    expected_calls = (
        ["docker", "exec", "dev-api", "echo", "hello"],
        ["tmux", "new-session", "-ds", "dev-api", "flow dev exec api"],
        ["tmux", "set-option", "-t", "dev-api", "default-command", "flow dev exec api"],
    )
    for argv in expected_calls:
        assert argv in runner.calls
def test_container_exec_nonzero_result_is_failed(tmp_path):
    """Check that a non-zero container exec is reported as a failed action.

    The fake runner is primed to answer ``("exec", "dev-api", "false")`` with
    a completed process whose return code is 7. The first result must then
    have status ``"failed"`` and return code 7, and the audit file must
    contain an ``action_failed`` event and no ``action_success`` event.
    """
    canned_failure = subprocess.CompletedProcess(
        ["docker", "exec", "dev-api", "false"], 7, stdout="", stderr=""
    )
    runner = FakeRunner(responses={("exec", "dev-api", "false"): canned_failure})
    ctx = _ctx()
    ctx.runtime.runner = runner
    ctx.runtime.containers = ContainerRuntime(runner, binary="docker")

    failing_exec = PrimitiveAction(
        id="container.exec",
        type="container.exec",
        description="Run failing command in container",
        payload={"name": "dev-api", "argv": ("false",)},
    )
    plan = ActionPlan(
        name="container-exec-failure",
        primitive_actions=(failing_exec,),
    )

    summary = ActionExecutor(ctx, audit_path=tmp_path / "actions.jsonl").execute(plan)

    first_result = summary.results[0]
    assert first_result.status == "failed"
    assert first_result.returncode == 7

    # Only membership is checked below, so the events are gathered in a set.
    audit_text = (tmp_path / "actions.jsonl").read_text(encoding="utf-8")
    audit_events = {json.loads(line)["event"] for line in audit_text.splitlines()}
    assert "action_failed" in audit_events
    assert "action_success" not in audit_events
def test_copy_directory_refuses_existing_target(tmp_path):
    """Check that copying a directory onto an existing directory is rejected.

    Both directories exist up front, each with one file. Execution is
    expected to raise ``FlowError`` matching ``"Copy target already exists"``;
    afterwards the target must not have gained the source's file and its own
    file must be unchanged.
    """
    src_dir = tmp_path / "source"
    dst_dir = tmp_path / "target"
    for directory in (src_dir, dst_dir):
        directory.mkdir()

    incoming = src_dir / "new.txt"
    preserved = dst_dir / "existing.txt"
    incoming.write_text("new", encoding="utf-8")
    preserved.write_text("keep", encoding="utf-8")

    copy_dir = PrimitiveAction(
        id="copy-dir",
        type="file.copy",
        description="Copy directory",
        payload={"source": src_dir, "target": dst_dir},
    )
    plan = ActionPlan(
        name="copy-dir-existing-target",
        primitive_actions=(copy_dir,),
    )

    with pytest.raises(FlowError, match="Copy target already exists"):
        executor = ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl")
        executor.execute(plan)

    assert not (dst_dir / "new.txt").exists()
    assert preserved.read_text(encoding="utf-8") == "keep"