Files
flow/tests/test_service_containers.py

83 lines
2.8 KiB
Python

"""Tests for ContainerService."""
import subprocess
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.adapters.containers import ContainerRuntime
from flow.core.platform import PlatformInfo
from flow.core.runtime import SystemRuntime
from flow.core import paths
from flow.app.containers import ContainerService
from tests.fakes import FakeRunner
def _make_ctx(tmp_path, runner=None):
    """Build a FlowContext for tests, optionally wired to a substitute runner.

    Args:
        tmp_path: Not used by this helper; accepted so tests can pass the
            pytest fixture through unchanged.
        runner: Optional command runner. When given, it replaces the
            runtime's runner and is wrapped in a ContainerRuntime that uses
            the "docker" binary.

    Returns:
        A FlowContext with default config, an empty manifest, colourless
        console output and the (possibly patched) SystemRuntime.
    """
    rt = SystemRuntime()
    # Compare against None explicitly: a test double that happens to be
    # falsy (e.g. defines __len__ or __bool__) must still be installed.
    if runner is not None:
        rt.runner = runner
        rt.containers = ContainerRuntime(runner, binary="docker")
    return FlowContext(
        config=AppConfig(),
        manifest={},
        platform=PlatformInfo(),
        console=Console(color=False),
        runtime=rt,
    )
class TestContainerService:
    """Tests for ContainerService, driven through a FakeRunner."""

    def test_create_dry_run(self, tmp_path, capsys, monkeypatch):
        """A dry-run create mentions the container name and runs nothing."""
        # Point the path constants at the temp dir for the duration of the test.
        monkeypatch.setattr(paths, "HOME", tmp_path)
        monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles")
        runner = FakeRunner(responses={
            ("ps", "{{.Names}}"): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
        })
        ctx = _make_ctx(tmp_path, runner=runner)
        svc = ContainerService(ctx)
        svc.create("api", "tm0/node", dry_run=True)
        output = capsys.readouterr().out
        assert "dev-api" in output
        # Dry run must not have executed any command through the runner.
        assert runner.calls == []

    def test_list_no_containers(self, tmp_path, capsys):
        """Listing with no canned responses reports that nothing exists."""
        runner = FakeRunner()
        ctx = _make_ctx(tmp_path, runner=runner)
        svc = ContainerService(ctx)
        svc.list()
        output = capsys.readouterr().out
        assert "No flow containers" in output

    def test_stop_calls_docker(self, tmp_path):
        """Stopping a container issues a docker command containing `stop`."""
        runner = FakeRunner(responses={
            ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
        })
        ctx = _make_ctx(tmp_path, runner=runner)
        svc = ContainerService(ctx)
        svc.stop("api")
        # Match whole argv elements rather than substrings of str(call), so
        # the check cannot be satisfied by an unrelated argument.
        assert any("docker" in c and "stop" in c for c in runner.calls)

    def test_remove_calls_docker(self, tmp_path):
        """Removing a container issues a docker command containing `rm`."""
        runner = FakeRunner(responses={
            ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
        })
        ctx = _make_ctx(tmp_path, runner=runner)
        svc = ContainerService(ctx)
        svc.remove("api")
        # Match whole argv elements: as a substring, "rm" also occurs inside
        # words such as "format", which would let a non-rm call pass.
        assert any("docker" in c and "rm" in c for c in runner.calls)

    def test_exec_command_uses_container_exec_action(self, tmp_path):
        """exec runs `docker exec <container> <command...>` via the runner."""
        runner = FakeRunner(responses={
            ("ps", "{{.Names}}"): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
        })
        ctx = _make_ctx(tmp_path, runner=runner)
        svc = ContainerService(ctx)
        try:
            svc.exec("api", ["echo", "hello"])
        except SystemExit as e:
            # exec is allowed to end via SystemExit, but only with status 0.
            assert e.code == 0
        assert ["docker", "exec", "dev-api", "echo", "hello"] in runner.calls