update
Some checks failed
test / unit (push) Has been cancelled
test / e2e (push) Has been cancelled

This commit is contained in:
2026-05-18 04:05:46 +03:00
parent 082468e2bd
commit 9fb08d035f
5 changed files with 158 additions and 12 deletions

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import json
import subprocess
import sys
import pytest
@@ -210,3 +211,64 @@ def test_executor_dispatches_container_and_tmux_primitives(tmp_path):
assert [
"tmux", "set-option", "-t", "dev-api", "default-command", "flow dev exec api",
] in runner.calls
def test_container_exec_nonzero_result_is_failed(tmp_path):
runner = FakeRunner(
responses={
("exec", "dev-api", "false"): subprocess.CompletedProcess(
["docker", "exec", "dev-api", "false"], 7, stdout="", stderr=""
),
}
)
ctx = _ctx()
ctx.runtime.runner = runner
ctx.runtime.containers = ContainerRuntime(runner, binary="docker")
plan = ActionPlan(
name="container-exec-failure",
primitive_actions=(
PrimitiveAction(
id="container.exec",
type="container.exec",
description="Run failing command in container",
payload={"name": "dev-api", "argv": ("false",)},
),
),
)
summary = ActionExecutor(ctx, audit_path=tmp_path / "actions.jsonl").execute(plan)
assert summary.results[0].status == "failed"
assert summary.results[0].returncode == 7
audit_events = [
json.loads(line)["event"]
for line in (tmp_path / "actions.jsonl").read_text(encoding="utf-8").splitlines()
]
assert "action_failed" in audit_events
assert "action_success" not in audit_events
def test_copy_directory_refuses_existing_target(tmp_path):
source = tmp_path / "source"
target = tmp_path / "target"
source.mkdir()
target.mkdir()
(source / "new.txt").write_text("new", encoding="utf-8")
(target / "existing.txt").write_text("keep", encoding="utf-8")
plan = ActionPlan(
name="copy-dir-existing-target",
primitive_actions=(
PrimitiveAction(
id="copy-dir",
type="file.copy",
description="Copy directory",
payload={"source": source, "target": target},
),
),
)
with pytest.raises(FlowError, match="Copy target already exists"):
ActionExecutor(_ctx(), audit_path=tmp_path / "actions.jsonl").execute(plan)
assert not (target / "new.txt").exists()
assert (target / "existing.txt").read_text(encoding="utf-8") == "keep"