This commit is contained in:
2026-05-13 23:02:47 +03:00
parent d0f8315cf1
commit 78f95bc88e
49 changed files with 2747 additions and 987 deletions

36
tests/fakes.py Normal file
View File

@@ -0,0 +1,36 @@
"""Shared test fixtures."""
from __future__ import annotations
import subprocess
from typing import Any
from flow.core.runtime import CommandRunner
class FakeRunner(CommandRunner):
"""CommandRunner that captures calls instead of executing.
Response matching uses keyword containment: a response keyed by
``("ps", "{{.Names}}")`` matches any command whose argv contains
both ``"ps"`` and ``"{{.Names}}"``.
"""
def __init__(self, responses: dict[tuple[str, ...], Any] | None = None):
self.calls: list[list[str]] = []
self.timeouts: list[float | None] = []
self._responses: dict[tuple[str, ...], Any] = responses or {}
def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
parts = list(argv)
self.calls.append(parts)
self.timeouts.append(timeout)
for key, resp in self._responses.items():
if all(k in parts for k in key):
return resp
return subprocess.CompletedProcess(parts, 0, stdout="", stderr="")
def run_shell(self, command, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
self.calls.append(["__shell__", command])
self.timeouts.append(timeout)
return subprocess.CompletedProcess(command, 0, stdout="", stderr="")

View File

@@ -27,6 +27,15 @@ def test_help_flag():
assert "setup" in result.stdout
def test_no_color_flag():
"""Test --no-color flag is accepted."""
result = subprocess.run(
[sys.executable, "-m", "flow", "--no-color", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0
def test_dotfiles_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "dotfiles", "--help"],
@@ -35,6 +44,12 @@ def test_dotfiles_help():
assert result.returncode == 0
assert "link" in result.stdout
assert "unlink" in result.stdout
# Removed commands should not appear
assert "relink" not in result.stdout
assert "undo" not in result.stdout
assert "clean" not in result.stdout
assert "sync" not in result.stdout
assert "modules" not in result.stdout
def test_packages_help():

View File

@@ -1,5 +1,7 @@
"""Tests for zsh completion."""
import subprocess
from flow.commands.completion import complete
@@ -23,6 +25,33 @@ def test_complete_dotfiles_subcommands():
assert "link" in result
assert "unlink" in result
assert "status" in result
assert "edit" in result
assert "repos" in result
# Removed commands should not appear
assert "sync" not in result
assert "relink" not in result
assert "undo" not in result
assert "clean" not in result
assert "modules" not in result
def test_complete_dotfiles_repos_subcommands():
result = complete(["flow", "dotfiles", "repos", ""], 3)
assert "list" in result
assert "status" in result
assert "pull" in result
assert "push" in result
def test_complete_dotfiles_repos_pull_flags():
result = complete(["flow", "dotfiles", "repos", "pull", "--"], 4)
assert "--repo" in result
assert "--dry-run" in result
def test_complete_dotfiles_edit_packages():
result = complete(["flow", "dotfiles", "edit", "--"], 3)
assert "--no-commit" in result
def test_complete_dotfiles_link_flags():
@@ -41,3 +70,17 @@ def test_complete_packages_subcommands():
assert "install" in result
assert "remove" in result
assert "list" in result
def test_complete_dev_attach_returns_empty_on_timeout(monkeypatch):
class FakeRuntime:
def __init__(self, runner, *, mode="auto"):
self.runner = runner
def ps(self, **kwargs):
assert kwargs["timeout"] == 1.0
raise subprocess.TimeoutExpired("docker ps", kwargs["timeout"])
monkeypatch.setattr("flow.commands.completion.ContainerRuntime", FakeRuntime)
result = complete(["flow", "dev", "attach", ""], 3)
assert result == []

View File

@@ -12,6 +12,7 @@ def test_load_config_missing_path(tmp_path):
assert isinstance(cfg, AppConfig)
assert cfg.dotfiles_url == ""
assert cfg.container_registry == "registry.tomastm.com"
assert cfg.container_runtime == "auto"
def test_load_config_from_yaml(tmp_path):
@@ -131,3 +132,12 @@ def test_load_manifest_merges_local_and_overlay(tmp_path):
data = load_manifest(local, overlay)
assert "profiles" in data
assert "packages" in data
def test_load_config_container_runtime(tmp_path):
(tmp_path / "config.yaml").write_text(
"defaults:\n"
" container-runtime: podman-rootful\n"
)
cfg = load_config(tmp_path)
assert cfg.container_runtime == "podman-rootful"

View File

@@ -0,0 +1,71 @@
"""Tests for flow.core.config_parse."""
import pytest
from flow.core.config import TargetConfig
from flow.core.config_parse import as_bool, parse_target_shorthand, parse_targets
from flow.core.errors import ConfigError
class TestAsBool:
@pytest.mark.parametrize("value", [True, "true", "True", "YES", "1", "on", "y"])
def test_truthy(self, value):
assert as_bool(value) is True
@pytest.mark.parametrize("value", [False, "false", "False", "NO", "0", "off", "n"])
def test_falsy(self, value):
assert as_bool(value) is False
def test_invalid_raises(self):
with pytest.raises(ConfigError, match="Expected boolean"):
as_bool("maybe")
class TestParseTargetShorthand:
def test_at_key(self):
t = parse_target_shorthand("personal@orb", "personal.orb")
assert t == TargetConfig(namespace="personal", platform="orb", host="personal.orb")
def test_at_key_with_identity(self):
t = parse_target_shorthand("work@ec2", "work.ec2 ~/.ssh/id_work")
assert t.identity == "~/.ssh/id_work"
def test_plain_key(self):
t = parse_target_shorthand("personal", "orb personal.orb ~/.ssh/id")
assert t == TargetConfig(namespace="personal", platform="orb", host="personal.orb", identity="~/.ssh/id")
def test_empty_value_raises(self):
with pytest.raises(ConfigError, match="must define a host"):
parse_target_shorthand("x@y", "")
def test_plain_key_too_few_parts_raises(self):
with pytest.raises(ConfigError, match="expected 'platform host"):
parse_target_shorthand("personal", "onlyone")
class TestParseTargets:
def test_none_returns_empty(self):
assert parse_targets(None) == []
def test_dict_shorthand(self):
targets = parse_targets({"personal@orb": "personal.orb"})
assert len(targets) == 1
assert targets[0].host == "personal.orb"
def test_dict_mapping(self):
targets = parse_targets({
"work@ec2": {"host": "work.ec2", "identity": "~/.ssh/id"},
})
assert targets[0].host == "work.ec2"
assert targets[0].identity == "~/.ssh/id"
def test_list_format(self):
targets = parse_targets([
{"namespace": "a", "platform": "b", "host": "a.b"},
])
assert len(targets) == 1
assert targets[0].namespace == "a"
def test_invalid_type_raises(self):
with pytest.raises(ConfigError, match="mapping or list"):
parse_targets("invalid")

View File

@@ -0,0 +1,271 @@
"""Tests for flow.core.containers."""
import subprocess
from pathlib import Path
import pytest
from flow.core.containers import ContainerRuntime
from flow.core.errors import FlowError
from tests.fakes import FakeRunner
class TestBinaryDetection:
def test_explicit_binary(self):
rt = ContainerRuntime(FakeRunner(), binary="podman")
assert rt.binary == "podman"
def test_no_runtime_raises(self, monkeypatch):
monkeypatch.setattr("shutil.which", lambda _: None)
rt = ContainerRuntime(FakeRunner())
with pytest.raises(FlowError, match="No container runtime"):
_ = rt.binary
def test_invalid_mode_raises(self):
with pytest.raises(FlowError, match="Unknown container runtime mode"):
ContainerRuntime(FakeRunner(), mode="nope")
class TestMode:
def test_mode_docker_forces_binary(self):
rt = ContainerRuntime(FakeRunner(), mode="docker")
assert rt.binary == "docker"
def test_mode_podman_forces_binary(self):
rt = ContainerRuntime(FakeRunner(), mode="podman")
assert rt.binary == "podman"
def test_mode_podman_rootful_forces_binary(self):
rt = ContainerRuntime(FakeRunner(), mode="podman-rootful")
assert rt.binary == "podman"
def test_mode_auto_detects(self, monkeypatch):
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/podman" if name == "podman" else None)
rt = ContainerRuntime(FakeRunner(), mode="auto")
assert rt.binary == "podman"
def test_podman_rootful_prefers_rootful_socket(self, tmp_path, monkeypatch):
rootless = tmp_path / "rootless.sock"
rootful = tmp_path / "rootful.sock"
compat = tmp_path / "compat.sock"
rootless.write_text("")
rootful.write_text("")
compat.write_text("")
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [rootful, rootless, compat],
)
rt = ContainerRuntime(FakeRunner(), mode="podman-rootful", binary="podman")
assert rt.socket_path == rootful
def test_podman_rootless_prefers_rootless_socket(self, tmp_path, monkeypatch):
rootless = tmp_path / "rootless.sock"
rootful = tmp_path / "rootful.sock"
rootless.write_text("")
rootful.write_text("")
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [rootless, rootful],
)
rt = ContainerRuntime(FakeRunner(), mode="podman", binary="podman")
assert rt.socket_path == rootless
class TestSocketPath:
def test_docker_socket(self, tmp_path, monkeypatch):
sock = tmp_path / "docker.sock"
sock.write_text("")
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [sock],
)
rt = ContainerRuntime(FakeRunner(), binary="docker")
assert rt.socket_path == sock
def test_docker_socket_missing(self, monkeypatch):
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [Path("/nonexistent/docker.sock")],
)
rt = ContainerRuntime(FakeRunner(), binary="docker")
assert rt.socket_path is None
def test_podman_rootless_preferred(self, tmp_path, monkeypatch):
rootless = tmp_path / "rootless.sock"
rootful = tmp_path / "rootful.sock"
rootless.write_text("")
rootful.write_text("")
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [rootless, rootful],
)
rt = ContainerRuntime(FakeRunner(), binary="podman")
assert rt.socket_path == rootless
def test_podman_falls_back_to_rootful(self, tmp_path, monkeypatch):
rootful = tmp_path / "rootful.sock"
rootful.write_text("")
monkeypatch.setattr(
"flow.core.containers.ContainerRuntime._socket_candidates",
lambda self: [Path("/nonexistent"), rootful],
)
rt = ContainerRuntime(FakeRunner(), binary="podman")
assert rt.socket_path == rootful
def test_podman_uses_xdg_runtime_dir(self, monkeypatch):
monkeypatch.setenv("XDG_RUNTIME_DIR", "/custom/run")
rt = ContainerRuntime(FakeRunner(), binary="podman")
candidates = rt._socket_candidates()
assert candidates[0] == Path("/custom/run/podman/podman.sock")
class TestSocketSecurityOpts:
def test_podman_returns_label_disable(self):
rt = ContainerRuntime(FakeRunner(), binary="podman")
assert rt.socket_security_opts == ["label=disable"]
def test_docker_returns_empty(self):
rt = ContainerRuntime(FakeRunner(), binary="docker")
assert rt.socket_security_opts == []
class TestRunContainer:
def test_basic(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.run_container(
"dev-api",
"reg/img:latest",
labels={"dev": "true"},
mounts=["/src:/dst"],
command=["sleep", "infinity"],
detach=True,
)
call = runner.calls[-1]
assert call[0] == "docker"
assert call[1] == "run"
assert "-d" in call
assert "--name" in call
idx = call.index("--name")
assert call[idx + 1] == "dev-api"
assert "-v" in call
assert "/src:/dst" in call
assert call[-2:] == ["sleep", "infinity"]
def test_with_security_opts(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="podman")
rt.run_container(
"dev-api",
"reg/img:latest",
security_opts=["label=disable"],
detach=True,
)
call = runner.calls[-1]
idx = call.index("--security-opt")
assert call[idx + 1] == "label=disable"
class TestExecIn:
def test_interactive(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rc = rt.exec_in("dev-api", ["zsh", "-l"], interactive=True, detach_keys="ctrl-q,ctrl-p")
assert rc == 0
call = runner.calls[-1]
assert "-it" in call
assert "--detach-keys" in call
assert "ctrl-q,ctrl-p" in call
assert call[-2:] == ["zsh", "-l"]
class TestLifecycle:
def test_start(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.start("dev-api")
assert runner.calls[-1] == ["docker", "start", "dev-api"]
def test_stop(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.stop("dev-api")
assert runner.calls[-1] == ["docker", "stop", "dev-api"]
def test_kill(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.kill("dev-api")
assert runner.calls[-1] == ["docker", "kill", "dev-api"]
def test_rm(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.rm("dev-api")
assert runner.calls[-1] == ["docker", "rm", "dev-api"]
def test_rm_force(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
rt.rm("dev-api", force=True)
assert runner.calls[-1] == ["docker", "rm", "-f", "dev-api"]
class TestInspect:
def test_returns_stdout(self):
runner = FakeRunner({
("inspect",): subprocess.CompletedProcess([], 0, stdout="reg/img:latest\n"),
})
rt = ContainerRuntime(runner, binary="docker")
result = rt.inspect("dev-api", "{{ .Config.Image }}")
assert result == "reg/img:latest"
class TestPs:
def test_all_with_filter(self):
runner = FakeRunner({
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
rt = ContainerRuntime(runner, binary="docker")
output = rt.ps(all=True, filter="label=dev=true", format="{{.Names}}")
assert output == "dev-api"
call = runner.calls[-1]
assert "-a" in call
assert "--filter" in call
def test_forwards_timeout(self):
runner = FakeRunner({
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
rt = ContainerRuntime(runner, binary="docker")
rt.ps(format="{{.Names}}", timeout=1.0)
assert runner.timeouts[-1] == 1.0
class TestContainerExists:
def test_exists(self):
runner = FakeRunner({
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
rt = ContainerRuntime(runner, binary="docker")
assert rt.container_exists("dev-api") is True
def test_not_exists(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
assert rt.container_exists("dev-missing") is False
class TestContainerRunning:
def test_running(self):
runner = FakeRunner({
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
rt = ContainerRuntime(runner, binary="docker")
assert rt.container_running("dev-api") is True
def test_not_running(self):
runner = FakeRunner()
rt = ContainerRuntime(runner, binary="docker")
assert rt.container_running("dev-api") is False

View File

@@ -2,7 +2,9 @@
from pathlib import Path
from flow.core.containers import ContainerRuntime
from flow.core.runtime import CommandRunner, FileSystem, GitClient, SystemRuntime
from flow.core.tmux import TmuxClient
class TestFileSystem:
@@ -93,3 +95,13 @@ class TestSystemRuntime:
rt = SystemRuntime()
assert isinstance(rt.git, GitClient)
assert rt.git.runner is rt.runner
def test_creates_tmux_client(self):
rt = SystemRuntime()
assert isinstance(rt.tmux, TmuxClient)
assert rt.tmux.runner is rt.runner
def test_creates_container_runtime(self):
rt = SystemRuntime()
assert isinstance(rt.containers, ContainerRuntime)
assert rt.containers.runner is rt.runner

97
tests/test_core_tmux.py Normal file
View File

@@ -0,0 +1,97 @@
"""Tests for flow.core.tmux."""
import subprocess
from flow.core.tmux import TmuxClient, build_new_session_argv
from tests.fakes import FakeRunner
class TestTmuxClient:
def test_has_session_true(self):
runner = FakeRunner({
("tmux", "has-session", "-t"): subprocess.CompletedProcess([], 0),
})
tmux = TmuxClient(runner)
assert tmux.has_session("dev-api") is True
assert runner.calls[-1] == ["tmux", "has-session", "-t", "dev-api"]
def test_has_session_false(self):
runner = FakeRunner({
("tmux", "has-session", "-t"): subprocess.CompletedProcess([], 1),
})
tmux = TmuxClient(runner)
assert tmux.has_session("dev-api") is False
def test_new_session_detached_with_env_and_command(self):
runner = FakeRunner()
tmux = TmuxClient(runner)
tmux.new_session(
"dev-api",
detached=True,
env={"DF_IMAGE": "reg/img"},
command="flow dev exec api",
)
call = runner.calls[-1]
assert call[:4] == ["tmux", "new-session", "-ds", "dev-api"]
assert "-e" in call
assert "DF_IMAGE=reg/img" in call
assert call[-1] == "flow dev exec api"
def test_new_session_minimal(self):
runner = FakeRunner()
tmux = TmuxClient(runner)
tmux.new_session("sess")
assert runner.calls[-1] == ["tmux", "new-session", "-s", "sess"]
def test_set_option(self):
runner = FakeRunner()
tmux = TmuxClient(runner)
tmux.set_option("dev-api", "default-command", "flow dev exec api")
assert runner.calls[-1] == [
"tmux", "set-option", "-t", "dev-api", "default-command", "flow dev exec api",
]
def test_list_panes(self):
runner = FakeRunner({
("tmux", "list-panes", "-t"): subprocess.CompletedProcess(
[], 0, stdout="dev-api:0.0\ndev-api:0.1\n",
),
})
tmux = TmuxClient(runner)
panes = tmux.list_panes("dev-api")
assert panes == ["dev-api:0.0", "dev-api:0.1"]
def test_list_panes_empty(self):
runner = FakeRunner({
("tmux", "list-panes", "-t"): subprocess.CompletedProcess([], 0, stdout=""),
})
tmux = TmuxClient(runner)
assert tmux.list_panes("dev-api") == []
def test_respawn_pane(self):
runner = FakeRunner()
tmux = TmuxClient(runner)
tmux.respawn_pane("dev-api:0.0")
assert runner.calls[-1] == ["tmux", "respawn-pane", "-t", "dev-api:0.0"]
class TestBuildNewSessionArgv:
def test_basic(self):
argv = build_new_session_argv("default")
assert argv == ["tmux", "new-session", "-As", "default"]
def test_with_env(self):
argv = build_new_session_argv(
"main",
env={"DF_NAMESPACE": "personal", "DF_PLATFORM": "orb"},
)
assert argv == [
"tmux", "new-session", "-As", "main",
"-e", "DF_NAMESPACE=personal",
"-e", "DF_PLATFORM=orb",
]
def test_empty_env(self):
argv = build_new_session_argv("sess", env={})
assert argv == ["tmux", "new-session", "-As", "sess"]

113
tests/test_core_yaml.py Normal file
View File

@@ -0,0 +1,113 @@
"""Tests for flow.core.yaml."""
from pathlib import Path
import pytest
from flow.core.errors import ConfigError
from flow.core.yaml import (
list_yaml_files,
load_yaml_documents,
load_yaml_file,
load_yaml_source,
load_yaml_sources,
merge_yaml_values,
)
class TestLoadYamlFile:
def test_loads_mapping(self, tmp_path):
f = tmp_path / "a.yaml"
f.write_text("key: value\n")
assert load_yaml_file(f) == {"key": "value"}
def test_empty_file_returns_empty_dict(self, tmp_path):
f = tmp_path / "empty.yaml"
f.write_text("")
assert load_yaml_file(f) == {}
def test_non_mapping_raises(self, tmp_path):
f = tmp_path / "list.yaml"
f.write_text("- one\n- two\n")
with pytest.raises(ConfigError, match="mapping at root"):
load_yaml_file(f)
def test_invalid_yaml_raises(self, tmp_path):
f = tmp_path / "bad.yaml"
f.write_text(":\n :\n [invalid")
with pytest.raises(ConfigError, match="Invalid YAML"):
load_yaml_file(f)
class TestMergeYamlValues:
def test_dict_merge(self):
base = {"a": 1, "b": {"x": 10}}
overlay = {"b": {"y": 20}, "c": 3}
result = merge_yaml_values(base, overlay)
assert result == {"a": 1, "b": {"x": 10, "y": 20}, "c": 3}
def test_list_concat(self):
assert merge_yaml_values([1, 2], [3, 4]) == [1, 2, 3, 4]
def test_scalar_override(self):
assert merge_yaml_values("old", "new") == "new"
def test_overlay_wins_type_mismatch(self):
assert merge_yaml_values({"a": 1}, "scalar") == "scalar"
class TestListYamlFiles:
def test_lists_sorted(self, tmp_path):
(tmp_path / "b.yaml").write_text("b: 1\n")
(tmp_path / "a.yml").write_text("a: 1\n")
(tmp_path / "c.txt").write_text("ignored")
files = list_yaml_files(tmp_path)
assert [f.name for f in files] == ["a.yml", "b.yaml"]
def test_missing_dir_returns_empty(self, tmp_path):
assert list_yaml_files(tmp_path / "nope") == []
class TestLoadYamlSource:
def test_file(self, tmp_path):
f = tmp_path / "config.yaml"
f.write_text("key: val\n")
assert load_yaml_source(f) == {"key": "val"}
def test_directory_merges(self, tmp_path):
(tmp_path / "01.yaml").write_text("a: 1\n")
(tmp_path / "02.yaml").write_text("b: 2\n")
result = load_yaml_source(tmp_path)
assert result == {"a": 1, "b": 2}
def test_missing_returns_empty(self, tmp_path):
assert load_yaml_source(tmp_path / "gone") == {}
class TestLoadYamlDocuments:
def test_single_file(self, tmp_path):
f = tmp_path / "doc.yaml"
f.write_text("x: 1\n")
docs = load_yaml_documents(f)
assert docs == [{"x": 1}]
def test_directory(self, tmp_path):
(tmp_path / "a.yaml").write_text("a: 1\n")
(tmp_path / "b.yaml").write_text("b: 2\n")
docs = load_yaml_documents(tmp_path)
assert docs == [{"a": 1}, {"b": 2}]
def test_missing_returns_empty(self, tmp_path):
assert load_yaml_documents(tmp_path / "gone") == []
class TestLoadYamlSources:
def test_merges_multiple_paths(self, tmp_path):
d1 = tmp_path / "d1"
d2 = tmp_path / "d2"
d1.mkdir()
d2.mkdir()
(d1 / "a.yaml").write_text("a: 1\n")
(d2 / "b.yaml").write_text("b: 2\n")
result = load_yaml_sources(d1, d2)
assert result == {"a": 1, "b": 2}

View File

@@ -33,13 +33,13 @@ class TestParseProfile:
profile = parse_profile("test", raw)
assert len(profile.ssh_keys) == 1
def test_ssh_keygen_alias(self):
raw = {"ssh-keygen": [{"filename": "id_work", "type": "ed25519"}]}
def test_ssh_keys_with_filename(self):
raw = {"ssh-keys": [{"filename": "id_work", "type": "ed25519"}]}
profile = parse_profile("test", raw)
assert profile.ssh_keys[0]["path"] == "~/.ssh/id_work"
def test_requires_alias(self):
profile = parse_profile("test", {"requires": ["USER_EMAIL"]})
def test_env_required(self):
profile = parse_profile("test", {"env-required": ["USER_EMAIL"]})
assert profile.env_required == ("USER_EMAIL",)
def test_post_link_and_dotfiles_profile(self):

View File

@@ -52,6 +52,18 @@ class TestResolveMounts:
mounts = resolve_mounts(tmp_path, dotfiles_dir=dotfiles)
assert any(m.target.endswith("/flow/dotfiles") for m in mounts)
def test_socket_path_mount(self, tmp_path):
sock = tmp_path / "docker.sock"
sock.write_text("")
mounts = resolve_mounts(tmp_path, socket_path=sock)
socket_mounts = [m for m in mounts if m.target == "/var/run/docker.sock"]
assert len(socket_mounts) == 1
assert socket_mounts[0].source == sock
def test_no_socket_path(self, tmp_path):
mounts = resolve_mounts(tmp_path)
assert not any(m.target == "/var/run/docker.sock" for m in mounts)
class TestBuildContainerSpec:
def test_basic(self):
@@ -68,10 +80,12 @@ class TestBuildContainerSpec:
class TestMount:
def test_to_flag(self):
def test_fields(self):
m = Mount(source=Path("/src"), target="/dst")
assert m.to_flag() == "-v /src:/dst"
assert m.source == Path("/src")
assert m.target == "/dst"
assert m.readonly is False
def test_to_flag_readonly(self):
def test_readonly(self):
m = Mount(source=Path("/src"), target="/dst", readonly=True)
assert ":ro" in m.to_flag()
assert m.readonly is True

View File

@@ -58,6 +58,9 @@ class TestPlanLink:
plan = plan_link([new], current, _fs_check_none)
types = [op.type for op in plan.operations]
assert types == ["remove_link", "create_link"]
assert plan.summary.updated == 1
assert plan.summary.added == 0
assert plan.summary.removed == 0
def test_unmanaged_file_at_target_is_conflict(self):
desired = [_lt("/home/x/.zshrc")]
@@ -71,6 +74,16 @@ class TestPlanLink:
assert plan.summary.from_modules == 1
def test_broken_symlink_is_repaired(self):
lt = _lt("/home/x/.zshrc")
current = LinkedState(links={Path("/home/x/.zshrc"): lt})
plan = plan_link([lt], current, lambda p: "broken_symlink")
types = [op.type for op in plan.operations]
assert types == ["remove_link", "create_link"]
assert plan.summary.updated == 1
assert plan.summary.unchanged == 0
class TestPlanUnlink:
def test_unlink_all(self):
lt = _lt("/home/x/.zshrc")

View File

@@ -140,7 +140,7 @@ class TestResolveBinaryAsset:
name="fd", type="binary", sources={},
source="github:sharkdp/fd",
version="v10.2.0",
asset_pattern="fd-v10.2.0-{arch}-unknown-{os}-gnu.tar.gz",
asset_pattern="fd-v10.2.0-{{arch}}-unknown-{{os}}-gnu.tar.gz",
platform_map={},
extract_dir=None, install={},
post_install=None, allow_sudo=False,

View File

@@ -1,6 +1,6 @@
"""Tests for flow.core.errors."""
from flow.core.errors import ConfigError, ExecutionError, FlowError, PlanConflict
from flow.core.errors import ConfigError, FlowError, PlanConflict
def test_flow_error_is_exception():
@@ -15,7 +15,3 @@ def test_plan_conflict_carries_conflicts():
err = PlanConflict("2 conflicts", ["a exists", "b exists"])
assert str(err) == "2 conflicts"
assert err.conflicts == ["a exists", "b exists"]
def test_execution_error_is_flow_error():
assert issubclass(ExecutionError, FlowError)

View File

@@ -4,35 +4,20 @@ import subprocess
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.containers import ContainerRuntime
from flow.core.platform import PlatformInfo
from flow.core.runtime import CommandRunner, SystemRuntime
from flow.core.runtime import SystemRuntime
from flow.core import paths
from flow.services.containers import ContainerService
class FakeRunner(CommandRunner):
"""CommandRunner that captures calls instead of executing."""
def __init__(self):
self.calls: list[tuple] = []
def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
self.calls.append(("run", list(argv)))
command = list(argv)
if command[:4] == ["docker", "container", "ls", "-a"]:
return subprocess.CompletedProcess(argv, 0, stdout="dev-api\n", stderr="")
if command[:3] == ["docker", "container", "ls"]:
return subprocess.CompletedProcess(argv, 0, stdout="dev-api\n", stderr="")
return subprocess.CompletedProcess(argv, 0, stdout="", stderr="")
def run_shell(self, command, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
self.calls.append(("run_shell", command))
return subprocess.CompletedProcess(command, 0, stdout="", stderr="")
from tests.fakes import FakeRunner
def _make_ctx(tmp_path, runner=None):
rt = SystemRuntime()
if runner:
rt.runner = runner
rt.containers = ContainerRuntime(runner, binary="docker")
return FlowContext(
config=AppConfig(),
manifest={},
@@ -46,34 +31,37 @@ class TestContainerService:
def test_create_dry_run(self, tmp_path, capsys, monkeypatch):
monkeypatch.setattr(paths, "HOME", tmp_path)
monkeypatch.setattr(paths, "DOTFILES_DIR", tmp_path / "dotfiles")
monkeypatch.setattr("flow.services.containers.runtime", lambda: "docker")
ctx = _make_ctx(tmp_path)
runner = FakeRunner(responses={
("ps", "{{.Names}}"): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
ctx = _make_ctx(tmp_path, runner=runner)
svc = ContainerService(ctx)
svc.create("api", "tm0/node", dry_run=True)
output = capsys.readouterr().out
assert "dev-api" in output
assert runner.calls == []
def test_list_no_containers(self, tmp_path, capsys, monkeypatch):
def test_list_no_containers(self, tmp_path, capsys):
runner = FakeRunner()
monkeypatch.setattr("flow.services.containers.runtime", lambda: "docker")
runner.run = lambda argv, **kwargs: subprocess.CompletedProcess(argv, 0, stdout="", stderr="")
ctx = _make_ctx(tmp_path, runner=runner)
svc = ContainerService(ctx)
svc.list()
output = capsys.readouterr().out
assert "No flow containers" in output
def test_stop_calls_docker(self, tmp_path, monkeypatch):
runner = FakeRunner()
monkeypatch.setattr("flow.services.containers.runtime", lambda: "docker")
def test_stop_calls_docker(self, tmp_path):
runner = FakeRunner(responses={
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
ctx = _make_ctx(tmp_path, runner=runner)
svc = ContainerService(ctx)
svc.stop("api")
assert any("docker" in str(c) and "stop" in str(c) for c in runner.calls)
def test_remove_calls_docker(self, tmp_path, monkeypatch):
runner = FakeRunner()
monkeypatch.setattr("flow.services.containers.runtime", lambda: "docker")
def test_remove_calls_docker(self, tmp_path):
runner = FakeRunner(responses={
("ps",): subprocess.CompletedProcess([], 0, stdout="dev-api\n"),
})
ctx = _make_ctx(tmp_path, runner=runner)
svc = ContainerService(ctx)
svc.remove("api")

View File

@@ -1,6 +1,5 @@
"""Tests for DotfilesService."""
import subprocess
from pathlib import Path
import yaml
@@ -8,19 +7,10 @@ import yaml
from flow.core.config import AppConfig, FlowContext
from flow.core.console import Console
from flow.core.platform import PlatformInfo
from flow.core.runtime import CommandRunner, SystemRuntime
from flow.core.runtime import SystemRuntime
from flow.core import paths
from flow.services.dotfiles import DotfilesService
class FakeRunner(CommandRunner):
def __init__(self):
self.calls: list[list[str]] = []
def run(self, argv, *, cwd=None, env=None, capture_output=True, check=False, timeout=None):
command = [str(part) for part in argv]
self.calls.append(command)
return subprocess.CompletedProcess(command, 0, stdout="", stderr="")
from tests.fakes import FakeRunner
def _make_ctx(tmp_path, console=None):
@@ -206,7 +196,69 @@ class TestDotfilesServiceLink:
assert target.read_text() == "user managed file"
assert not target.is_symlink()
def test_sync_modules_includes_profile_layers(self, tmp_path, monkeypatch):
def test_status_shows_module_info(self, tmp_path, monkeypatch, capsys):
home = tmp_path / "home"
home.mkdir()
dotfiles = tmp_path / "dotfiles"
modules = tmp_path / "modules"
# Set up package with _module.yaml
pkg_dir = dotfiles / "_shared" / "nvim"
config_dir = pkg_dir / ".config" / "nvim"
config_dir.mkdir(parents=True)
(config_dir / "_module.yaml").write_text(yaml.dump({
"source": "github:test/nvim-config",
"ref": {"branch": "main"},
}))
# Set up cloned module
module_dir = modules / "_shared--nvim"
module_dir.mkdir(parents=True)
(module_dir / "init.lua").write_text("-- init")
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", modules)
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
svc.link()
svc.status()
output = capsys.readouterr().out
assert "nvim" in output
assert "branch:main" in output
def test_repos_list_shows_dotfiles_and_modules(self, tmp_path, monkeypatch, capsys):
home = tmp_path / "home"
home.mkdir()
dotfiles = tmp_path / "dotfiles"
modules = tmp_path / "modules"
pkg_dir = dotfiles / "_shared" / "nvim"
config_dir = pkg_dir / ".config" / "nvim"
config_dir.mkdir(parents=True)
(config_dir / "_module.yaml").write_text(yaml.dump({
"source": "github:test/nvim-config",
"ref": {"branch": "main"},
}))
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", modules)
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
svc.repos_list()
output = capsys.readouterr().out
assert "dotfiles" in output
assert "nvim" in output
assert "module" in output
def test_repos_pull_includes_profile_module_repos(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = tmp_path / "dotfiles"
@@ -234,5 +286,150 @@ class TestDotfilesServiceLink:
runtime=runtime,
)
DotfilesService(ctx).sync_modules()
DotfilesService(ctx).repos_pull()
assert any("linux-work--nvim" in " ".join(call) for call in runner.calls)
def test_repos_status_shows_repo_names(self, tmp_path, monkeypatch, capsys):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
# Make dotfiles dir look like a git repo for status
(dotfiles / ".git").mkdir()
runtime = SystemRuntime()
runner = FakeRunner()
runtime.runner = runner
runtime.git.runner = runner
ctx = FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=Console(color=False),
runtime=runtime,
)
DotfilesService(ctx).repos_status()
output = capsys.readouterr().out
assert "dotfiles" in output
def test_repos_push_calls_git_push(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
runtime = SystemRuntime()
runner = FakeRunner()
runtime.runner = runner
runtime.git.runner = runner
ctx = FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=Console(color=False),
runtime=runtime,
)
DotfilesService(ctx).repos_push()
assert any("push" in " ".join(call) for call in runner.calls)
def test_repos_pull_dry_run_no_calls(self, tmp_path, monkeypatch, capsys):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
runtime = SystemRuntime()
runner = FakeRunner()
runtime.runner = runner
runtime.git.runner = runner
ctx = FlowContext(
config=AppConfig(),
manifest={},
platform=PlatformInfo(),
console=Console(color=False),
runtime=runtime,
)
DotfilesService(ctx).repos_pull(dry_run=True)
output = capsys.readouterr().out
assert "Would" in output
# No git calls should be made in dry run
assert not runner.calls
def test_status_filter_by_package(self, tmp_path, monkeypatch, capsys):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh"},
"git": {".gitconfig": "[user]"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
svc.link()
capsys.readouterr() # discard link output
svc.status(package_filter=["zsh"])
output = capsys.readouterr().out
assert "zsh" in output
# Only zsh should appear, not git
assert "_shared/git" not in output
def test_link_repairs_broken_symlinks(self, tmp_path, monkeypatch):
home = tmp_path / "home"
home.mkdir()
dotfiles = _setup_dotfiles(tmp_path, {
"zsh": {".zshrc": "# zsh config"},
})
monkeypatch.setattr(paths, "HOME", home)
monkeypatch.setattr(paths, "DOTFILES_DIR", dotfiles)
monkeypatch.setattr(paths, "MODULES_DIR", tmp_path / "modules")
monkeypatch.setattr(paths, "LINKED_STATE", tmp_path / "state" / "linked.json")
ctx = _make_ctx(tmp_path)
svc = DotfilesService(ctx)
# Link normally
svc.link()
assert (home / ".zshrc").is_symlink()
# Break the symlink by removing its target
real_target = (home / ".zshrc").resolve()
(home / ".zshrc").unlink()
(home / ".zshrc").symlink_to("/nonexistent/path")
# Re-link should repair the broken symlink
svc.link()
assert (home / ".zshrc").is_symlink()
assert (home / ".zshrc").resolve() == real_target.resolve()