"""Tests for flow.adapters.containers.""" import subprocess from pathlib import Path import pytest from flow.adapters.containers import ContainerRuntime from flow.core.errors import FlowError from tests.fakes import FakeRunner class TestBinaryDetection: def test_explicit_binary(self): rt = ContainerRuntime(FakeRunner(), binary="podman") assert rt.binary == "podman" def test_no_runtime_raises(self, monkeypatch): monkeypatch.setattr("shutil.which", lambda _: None) rt = ContainerRuntime(FakeRunner()) with pytest.raises(FlowError, match="No container runtime"): _ = rt.binary def test_invalid_mode_raises(self): with pytest.raises(FlowError, match="Unknown container runtime mode"): ContainerRuntime(FakeRunner(), mode="nope") class TestMode: def test_mode_docker_forces_binary(self): rt = ContainerRuntime(FakeRunner(), mode="docker") assert rt.binary == "docker" def test_mode_podman_forces_binary(self): rt = ContainerRuntime(FakeRunner(), mode="podman") assert rt.binary == "podman" def test_mode_podman_rootful_forces_binary(self): rt = ContainerRuntime(FakeRunner(), mode="podman-rootful") assert rt.binary == "podman" def test_mode_auto_detects(self, monkeypatch): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/podman" if name == "podman" else None) rt = ContainerRuntime(FakeRunner(), mode="auto") assert rt.binary == "podman" def test_podman_rootful_prefers_rootful_socket(self, tmp_path, monkeypatch): rootless = tmp_path / "rootless.sock" rootful = tmp_path / "rootful.sock" compat = tmp_path / "compat.sock" rootless.write_text("") rootful.write_text("") compat.write_text("") monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [rootful, rootless, compat], ) rt = ContainerRuntime(FakeRunner(), mode="podman-rootful", binary="podman") assert rt.socket_path == rootful def test_podman_rootless_prefers_rootless_socket(self, tmp_path, monkeypatch): rootless = tmp_path / "rootless.sock" rootful = tmp_path / "rootful.sock" rootless.write_text("") rootful.write_text("") monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [rootless, rootful], ) rt = ContainerRuntime(FakeRunner(), mode="podman", binary="podman") assert rt.socket_path == rootless class TestSocketPath: def test_docker_socket(self, tmp_path, monkeypatch): sock = tmp_path / "docker.sock" sock.write_text("") monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [sock], ) rt = ContainerRuntime(FakeRunner(), binary="docker") assert rt.socket_path == sock def test_docker_socket_missing(self, monkeypatch): monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [Path("/nonexistent/docker.sock")], ) rt = ContainerRuntime(FakeRunner(), binary="docker") assert rt.socket_path is None def test_podman_rootless_preferred(self, tmp_path, monkeypatch): rootless = tmp_path / "rootless.sock" rootful = tmp_path / "rootful.sock" rootless.write_text("") rootful.write_text("") monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [rootless, rootful], ) rt = ContainerRuntime(FakeRunner(), binary="podman") assert rt.socket_path == rootless def test_podman_falls_back_to_rootful(self, tmp_path, monkeypatch): rootful = tmp_path / "rootful.sock" rootful.write_text("") monkeypatch.setattr( "flow.adapters.containers.ContainerRuntime._socket_candidates", lambda self: [Path("/nonexistent"), rootful], ) rt = ContainerRuntime(FakeRunner(), binary="podman") assert rt.socket_path == rootful def test_podman_uses_xdg_runtime_dir(self, monkeypatch): monkeypatch.setenv("XDG_RUNTIME_DIR", "/custom/run") rt = ContainerRuntime(FakeRunner(), binary="podman") candidates = rt._socket_candidates() assert candidates[0] == Path("/custom/run/podman/podman.sock") class TestSocketSecurityOpts: def test_podman_returns_label_disable(self): rt = ContainerRuntime(FakeRunner(), binary="podman") assert rt.socket_security_opts == ["label=disable"] def test_docker_returns_empty(self): rt = ContainerRuntime(FakeRunner(), binary="docker") assert rt.socket_security_opts == [] class TestRunContainer: def test_basic(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.run_container( "dev-api", "reg/img:latest", labels={"dev": "true"}, mounts=["/src:/dst"], command=["sleep", "infinity"], detach=True, ) call = runner.calls[-1] assert call[0] == "docker" assert call[1] == "run" assert "-d" in call assert "--name" in call idx = call.index("--name") assert call[idx + 1] == "dev-api" assert "-v" in call assert "/src:/dst" in call assert call[-2:] == ["sleep", "infinity"] def test_with_security_opts(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="podman") rt.run_container( "dev-api", "reg/img:latest", security_opts=["label=disable"], detach=True, ) call = runner.calls[-1] idx = call.index("--security-opt") assert call[idx + 1] == "label=disable" class TestExecIn: def test_interactive(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rc = rt.exec_in("dev-api", ["zsh", "-l"], interactive=True, detach_keys="ctrl-q,ctrl-p") assert rc == 0 call = runner.calls[-1] assert "-it" in call assert "--detach-keys" in call assert "ctrl-q,ctrl-p" in call assert call[-2:] == ["zsh", "-l"] class TestLifecycle: def test_start(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.start("dev-api") assert runner.calls[-1] == ["docker", "start", "dev-api"] def test_stop(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.stop("dev-api") assert runner.calls[-1] == ["docker", "stop", "dev-api"] def test_kill(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.kill("dev-api") assert runner.calls[-1] == ["docker", "kill", "dev-api"] def test_rm(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.rm("dev-api") assert runner.calls[-1] == ["docker", "rm", "dev-api"] def test_rm_force(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") rt.rm("dev-api", force=True) assert runner.calls[-1] == ["docker", "rm", "-f", "dev-api"] class TestInspect: def test_returns_stdout(self): runner = FakeRunner({ ("inspect",): subprocess.CompletedProcess([], 0, stdout="reg/img:latest\n"), }) rt = ContainerRuntime(runner, binary="docker") result = rt.inspect("dev-api", "{{ .Config.Image }}") assert result == "reg/img:latest" class TestPs: def test_all_with_filter(self): runner = FakeRunner({ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) rt = ContainerRuntime(runner, binary="docker") output = rt.ps(all=True, filter="label=dev=true", format="{{.Names}}") assert output == "dev-api" call = runner.calls[-1] assert "-a" in call assert "--filter" in call def test_forwards_timeout(self): runner = FakeRunner({ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) rt = ContainerRuntime(runner, binary="docker") rt.ps(format="{{.Names}}", timeout=1.0) assert runner.timeouts[-1] == 1.0 class TestContainerExists: def test_exists(self): runner = FakeRunner({ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) rt = ContainerRuntime(runner, binary="docker") assert rt.container_exists("dev-api") is True def test_not_exists(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") assert rt.container_exists("dev-missing") is False class TestContainerRunning: def test_running(self): runner = FakeRunner({ ("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"), }) rt = ContainerRuntime(runner, binary="docker") assert rt.container_running("dev-api") is True def test_not_running(self): runner = FakeRunner() rt = ContainerRuntime(runner, binary="docker") assert rt.container_running("dev-api") is False