"""Tests for ContainerService.""" import subprocess from flow.core.config import AppConfig, FlowContext from flow.core.console import Console from flow.adapters.containers import ContainerRuntime from flow.core.platform import PlatformInfo from flow.core.runtime import SystemRuntime from flow.core import paths from flow.app.containers import ContainerService from tests.fakes import FakeRunner def _make_ctx(tmp_path, runner=None): rt = SystemRuntime() if runner: rt.runner = runner rt.containers = ContainerRuntime(runner, binary="docker") return FlowContext( config=AppConfig(), manifest={}, platform=PlatformInfo(), console=Console(color=False), runtime=rt, ) class TestContainerService: def test_create_dry_run(self, tmp_path, capsys, monkeypatch): monkeypatch.setattr(paths, "HOME", tmp_path) monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles") runner = FakeRunner(responses={ ("ps", "{{.Names}}"): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.create("api", "tm0/node", dry_run=True) output = capsys.readouterr().out assert "dev-api" in output assert runner.calls == [] def test_list_no_containers(self, tmp_path, capsys): runner = FakeRunner() ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.list() output = capsys.readouterr().out assert "No flow containers" in output def test_stop_calls_docker(self, tmp_path): runner = FakeRunner(responses={ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.stop("api") assert any("docker" in str(c) and "stop" in str(c) for c in runner.calls) def test_remove_calls_docker(self, tmp_path): runner = FakeRunner(responses={ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.remove("api") assert any("docker" in str(c) and "rm" in str(c) for c in runner.calls) def test_exec_command_uses_container_exec_action(self, tmp_path): runner = FakeRunner(responses={ ("ps", "{{.Names}}"): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) try: svc.exec("api", ["echo", "hello"]) except SystemExit as e: assert e.code == 0 assert ["docker", "exec", "dev-api", "echo", "hello"] in runner.calls