81 lines
3.2 KiB
Python
81 lines
3.2 KiB
Python
"""Tests for ContainerService."""
|
|
|
|
import subprocess
|
|
|
|
from flow.core.config import AppConfig, FlowContext
|
|
from flow.core.console import Console
|
|
from flow.core.platform import PlatformInfo
|
|
from flow.core.runtime import CommandRunner, SystemRuntime
|
|
from flow.core import paths
|
|
from flow.services.containers import ContainerService
|
|
|
|
|
|
class FakeRunner(CommandRunner):
    """CommandRunner test double that records calls instead of executing them.

    Each invocation is appended to ``self.calls`` as a
    ``(method_name, command)`` tuple, and a ``CompletedProcess`` with return
    code 0 is handed back so no real process is ever spawned.
    """

    def __init__(self):
        # Chronological log of ("run" | "run_shell", command) tuples.
        self.calls: list[tuple] = []

    def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
        """Record an argv-style call and return a canned successful result.

        Args:
            argv: Iterable of command tokens.
            cwd, env, capture_output, check, timeout: Accepted so the
                signature matches real call sites; all are ignored.

        Returns:
            A ``subprocess.CompletedProcess`` with return code 0 and empty
            stderr. stdout is ``dev-api`` followed by a newline when the
            command starts with ``docker container ls`` (with or without
            ``-a``), and empty for every other command.
        """
        # Materialise argv exactly once. Calling list(argv) twice would leave
        # the second copy empty for a one-shot iterable, so the recorded call
        # and the prefix match below could disagree.
        command = list(argv)
        self.calls.append(("run", command))

        # The "ls -a" form shares the three-token prefix and produces the same
        # listing, so a single prefix comparison covers both variants.
        if command[:3] == ["docker", "container", "ls"]:
            stdout = "dev-api\n"
        else:
            stdout = ""
        return subprocess.CompletedProcess(argv, 0, stdout=stdout, stderr="")

    def run_shell(self, command, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
        """Record a shell-string call and return an empty successful result.

        Args:
            command: The shell command as passed by the caller.
            cwd, env, capture_output, check, timeout: Accepted so the
                signature matches real call sites; all are ignored.

        Returns:
            A ``subprocess.CompletedProcess`` with return code 0 and empty
            stdout and stderr.
        """
        self.calls.append(("run_shell", command))
        return subprocess.CompletedProcess(command, 0, stdout="", stderr="")
|
|
|
|
|
|
def _make_ctx(tmp_path, runner=None):
    """Build a FlowContext for tests, optionally with a substitute runner.

    Args:
        tmp_path: Not used by this helper; kept so existing call sites that
            pass it keep working.
        runner: Optional command runner. When provided it replaces the
            ``runner`` attribute of the freshly created ``SystemRuntime``.

    Returns:
        A ``FlowContext`` assembled from a default ``AppConfig``, an empty
        manifest, a default ``PlatformInfo``, a ``Console`` created with
        ``color=False``, and the (possibly patched) runtime.
    """
    rt = SystemRuntime()
    # Compare against None explicitly: a truthiness test would silently drop
    # a runner object that happens to evaluate as falsy.
    if runner is not None:
        rt.runner = runner
    return FlowContext(
        config=AppConfig(),
        manifest={},
        platform=PlatformInfo(),
        console=Console(color=False),
        runtime=rt,
    )
|
|
|
|
|
|
class TestContainerService:
    """Exercise ContainerService with the runtime lookup pinned to "docker"."""

    def _build_service(self, tmp_path, monkeypatch, runner=None):
        """Pin the runtime lookup to "docker" and return a ContainerService."""
        monkeypatch.setattr("flow.services.containers.runtime", lambda: "docker")
        context = _make_ctx(tmp_path, runner=runner)
        return ContainerService(context)

    def test_create_dry_run(self, tmp_path, capsys, monkeypatch):
        """A dry-run create prints output mentioning the "dev-api" name."""
        # Redirect the path constants into the per-test temporary directory.
        monkeypatch.setattr(paths, "HOME", tmp_path)
        monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles")
        service = self._build_service(tmp_path, monkeypatch)

        service.create("api", "tm0/node", dry_run=True)

        captured = capsys.readouterr()
        assert "dev-api" in captured.out

    def test_list_no_containers(self, tmp_path, capsys, monkeypatch):
        """Listing reports "No flow containers" when every command prints nothing."""

        def _silent_run(argv, **kwargs):
            # Override the canned listing: every command succeeds with no output.
            return subprocess.CompletedProcess(argv, 0, stdout="", stderr="")

        fake = FakeRunner()
        fake.run = _silent_run
        service = self._build_service(tmp_path, monkeypatch, runner=fake)

        service.list()

        captured = capsys.readouterr()
        assert "No flow containers" in captured.out

    def test_stop_calls_docker(self, tmp_path, monkeypatch):
        """Stopping a container records a call mentioning "docker" and "stop"."""
        fake = FakeRunner()
        service = self._build_service(tmp_path, monkeypatch, runner=fake)

        service.stop("api")

        rendered = [str(call) for call in fake.calls]
        assert any("docker" in text and "stop" in text for text in rendered)

    def test_remove_calls_docker(self, tmp_path, monkeypatch):
        """Removing a container records a call mentioning "docker" and "rm"."""
        fake = FakeRunner()
        service = self._build_service(tmp_path, monkeypatch, runner=fake)

        service.remove("api")

        rendered = [str(call) for call in fake.calls]
        assert any("docker" in text and "rm" in text for text in rendered)
|