"""Tests for flow.core.action."""

from flow.core.action import Action, ActionExecutor
from flow.core.console import ConsoleLogger


def _new_executor() -> ActionExecutor:
    """Return an ActionExecutor wired to a freshly created ConsoleLogger."""
    return ActionExecutor(ConsoleLogger())


def test_action_defaults():
    """An Action built from only type/description exposes the expected defaults."""
    action = Action(type="test", description="Test action")

    assert action.status == "pending"
    assert action.error is None
    assert action.skip_on_error is True
    assert action.os_filter is None
    assert action.data == {}


def test_executor_register_and_execute(capsys):
    """Registered handlers receive each action's data, in order, and actions complete."""
    executor = _new_executor()
    seen_keys = []
    executor.register("test-action", lambda data: seen_keys.append(data["key"]))

    first = Action(type="test-action", description="Do thing", data={"key": "value1"})
    second = Action(type="test-action", description="Do another", data={"key": "value2"})
    queue = [first, second]

    executor.execute(queue, current_os="linux")

    assert seen_keys == ["value1", "value2"]
    assert [item.status for item in queue] == ["completed", "completed"]


def test_executor_dry_run(capsys):
    """With dry_run=True no handler is invoked and an execution plan is printed."""
    executor = _new_executor()
    calls = []

    def record_call(data):
        calls.append(1)

    executor.register("test", record_call)
    planned = [Action(type="test", description="Should not run")]

    executor.execute(planned, dry_run=True)

    # The handler must never have been invoked during a dry run.
    assert not calls
    captured = capsys.readouterr()
    assert "EXECUTION PLAN" in captured.out


def test_executor_skip_on_error(capsys):
    """Actions whose handler raises are expected to end up 'skipped' when skip_on_error is set."""
    executor = _new_executor()

    def explode(data):
        raise RuntimeError("boom")

    executor.register("fail", explode)
    queue = [
        Action(type="fail", description="Will fail", skip_on_error=True),
        Action(type="fail", description="Should still run", skip_on_error=True),
    ]

    executor.execute(queue, current_os="linux")

    assert [item.status for item in queue] == ["skipped", "skipped"]


def test_executor_critical_failure_stops(capsys):
    """A failing action with skip_on_error=False is 'failed' and later actions stay 'pending'."""
    executor = _new_executor()

    def explode(data):
        raise RuntimeError("critical failure")

    executor.register("fail", explode)
    executor.register("ok", lambda data: None)

    critical = Action(type="fail", description="Critical", skip_on_error=False)
    follow_up = Action(type="ok", description="Should not run")

    executor.execute([critical, follow_up], current_os="linux")

    assert critical.status == "failed"
    assert follow_up.status == "pending"  # Never reached


def test_executor_os_filter(capsys):
    """Only actions whose os_filter matches current_os (or that have none) reach the handler."""
    executor = _new_executor()
    ran = []

    def remember_name(data):
        ran.append(data.get("name"))

    executor.register("test", remember_name)

    linux_only = Action(
        type="test",
        description="Linux only",
        data={"name": "linux"},
        os_filter="linux",
    )
    macos_only = Action(
        type="test",
        description="macOS only",
        data={"name": "macos"},
        os_filter="macos",
    )
    unfiltered = Action(type="test", description="Any OS", data={"name": "any"})

    executor.execute([linux_only, macos_only, unfiltered], current_os="linux")

    assert "linux" in ran
    assert "any" in ran
    assert "macos" not in ran


def test_executor_no_handler(capsys):
    """An action whose type has no registered handler is expected to be 'skipped'."""
    executor = _new_executor()
    orphan = Action(type="unknown", description="No handler registered")

    executor.execute([orphan], current_os="linux")

    assert orphan.status == "skipped"