"""Tests for ContainerService.""" import subprocess from pathlib import Path from unittest.mock import MagicMock, patch from flow.core.config import AppConfig, FlowContext from flow.core.console import Console from flow.core.platform import PlatformInfo from flow.core.runtime import CommandRunner, FileSystem, SystemRuntime from flow.core import paths from flow.services.containers import ContainerService class FakeRunner(CommandRunner): """CommandRunner that captures calls instead of executing.""" def __init__(self): self.calls: list[tuple] = [] def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None): self.calls.append(("run", list(argv))) return subprocess.CompletedProcess(argv, 0, stdout="", stderr="") def run_shell(self, command, *, cwd=None, env=None, capture_output=True, check=False, timeout=None): self.calls.append(("run_shell", command)) return subprocess.CompletedProcess(command, 0, stdout="", stderr="") def _make_ctx(tmp_path, runner=None): rt = SystemRuntime() if runner: rt.runner = runner return FlowContext( config=AppConfig(), manifest={}, platform=PlatformInfo(), console=Console(color=False), runtime=rt, ) class TestContainerService: def test_create_dry_run(self, tmp_path, capsys, monkeypatch): monkeypatch.setattr(paths, "HOME", tmp_path) monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles") ctx = _make_ctx(tmp_path) svc = ContainerService(ctx) svc.create("devbox", "personal", dry_run=True) output = capsys.readouterr().out assert "devbox" in output def test_list_no_docker(self, tmp_path, capsys): runner = FakeRunner() ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.list() # FakeRunner returns empty stdout -> "No flow containers" output = capsys.readouterr().out assert "No flow containers" in output def test_stop_calls_docker(self, tmp_path): runner = FakeRunner() ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.stop("flow-personal-devbox") assert any("docker" in str(c) and "stop" in str(c) for c in runner.calls) def test_remove_calls_docker(self, tmp_path): runner = FakeRunner() ctx = _make_ctx(tmp_path, runner=runner) svc = ContainerService(ctx) svc.remove("flow-personal-devbox") assert any("docker" in str(c) and "rm" in str(c) for c in runner.calls)