- DotfilesService: package discovery, module sync, link/unlink/status - PackageService: install/remove/list with PM and binary support - BootstrapService: profile-based system setup orchestration - RemoteService: SSH target resolution and connection - ContainerService: docker container lifecycle management - ProjectService: git repo status checking 26 service tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
79 lines
2.6 KiB
Python
79 lines
2.6 KiB
Python
"""Tests for ProjectService."""
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from flow.core.config import AppConfig, FlowContext
|
|
from flow.core.console import Console
|
|
from flow.core.platform import PlatformInfo
|
|
from flow.core.runtime import SystemRuntime
|
|
from flow.services.projects import ProjectService
|
|
|
|
|
|
def _make_ctx(projects_dir):
|
|
return FlowContext(
|
|
config=AppConfig(projects_dir=str(projects_dir)),
|
|
manifest={},
|
|
platform=PlatformInfo(),
|
|
console=Console(color=False),
|
|
runtime=SystemRuntime(),
|
|
)
|
|
|
|
|
|
def _init_repo(path, commit=True):
|
|
"""Create a git repo with an initial commit."""
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
subprocess.run(["git", "init", str(path)], capture_output=True, check=True)
|
|
subprocess.run(["git", "-C", str(path), "config", "user.email", "test@test.com"], capture_output=True, check=True)
|
|
subprocess.run(["git", "-C", str(path), "config", "user.name", "Test"], capture_output=True, check=True)
|
|
if commit:
|
|
(path / "README.md").write_text("# test")
|
|
subprocess.run(["git", "-C", str(path), "add", "."], capture_output=True, check=True)
|
|
subprocess.run(["git", "-C", str(path), "commit", "-m", "init"], capture_output=True, check=True)
|
|
|
|
|
|
class TestProjectService:
|
|
def test_check_clean_repo(self, tmp_path, capsys):
|
|
projects = tmp_path / "projects"
|
|
projects.mkdir()
|
|
_init_repo(projects / "myrepo")
|
|
|
|
ctx = _make_ctx(projects)
|
|
svc = ProjectService(ctx)
|
|
svc.check(fetch=False)
|
|
|
|
output = capsys.readouterr().out
|
|
assert "myrepo" in output
|
|
assert "clean" in output
|
|
|
|
def test_check_uncommitted_changes(self, tmp_path, capsys):
|
|
projects = tmp_path / "projects"
|
|
projects.mkdir()
|
|
_init_repo(projects / "myrepo")
|
|
(projects / "myrepo" / "new_file.txt").write_text("changes")
|
|
|
|
ctx = _make_ctx(projects)
|
|
svc = ProjectService(ctx)
|
|
svc.check(fetch=False)
|
|
|
|
output = capsys.readouterr().out
|
|
assert "uncommitted" in output
|
|
|
|
def test_check_no_git_repos(self, tmp_path, capsys):
|
|
projects = tmp_path / "projects"
|
|
projects.mkdir()
|
|
(projects / "not-a-repo").mkdir()
|
|
|
|
ctx = _make_ctx(projects)
|
|
svc = ProjectService(ctx)
|
|
svc.check(fetch=False)
|
|
|
|
output = capsys.readouterr().out
|
|
assert "No git" in output
|
|
|
|
def test_missing_projects_dir(self, tmp_path, capsys):
|
|
ctx = _make_ctx(tmp_path / "nonexistent")
|
|
svc = ProjectService(ctx)
|
|
svc.check(fetch=False)
|
|
assert "not found" in capsys.readouterr().out
|