feat: add CLI entry point, command modules, and zsh completion

- CLI with context detection, config merging, VM blocking
- Command modules: dotfiles, packages, setup, remote, dev, projects
- Zsh completion with declarative command/subcommand/flag structure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-16 05:06:31 +02:00
parent f79154d86f
commit 6ea23e02df
15 changed files with 466 additions and 1717 deletions

View File

@@ -1,101 +1,107 @@
"""CLI entry point — argparse routing and context creation.""" """Flow CLI entry point."""
from __future__ import annotations
import argparse import argparse
import os import os
import subprocess
import sys import sys
from typing import Optional
from flow import __version__ from flow import __version__
from flow.commands import bootstrap, completion, container, dotfiles, enter, package, sync from flow.core.config import AppConfig, FlowContext, load_config, load_manifest
from flow.core.config import FlowContext, load_config, load_manifest from flow.core.console import Console
from flow.core.console import ConsoleLogger from flow.core.errors import FlowError
from flow.core.paths import ensure_dirs from flow.core import paths
from flow.core.platform import detect_platform from flow.core.platform import PlatformInfo, detect_context, detect_platform
from flow.core.runtime import SystemRuntime
COMMAND_MODULES = [enter, container, dotfiles, bootstrap, package, sync, completion]
def _ensure_non_root(console: ConsoleLogger) -> None: def main(argv: Optional[list[str]] = None) -> None:
if os.geteuid() == 0: """Main entry point."""
console.error("flow must be run as a regular user (not root/sudo)") if os.getuid() == 0:
print("Error: flow must not run as root", file=sys.stderr)
sys.exit(1) sys.exit(1)
parser = _build_parser()
args = parser.parse_args(argv)
def main(): if args.version:
parser = argparse.ArgumentParser( print(f"flow {__version__}")
prog="flow",
description="DevFlow - A unified toolkit for managing development instances, containers, and profiles",
)
parser.add_argument(
"-v", "--version", action="version", version=f"flow {__version__}"
)
subparsers = parser.add_subparsers(dest="command")
for module in COMMAND_MODULES:
module.register(subparsers)
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(0)
console = ConsoleLogger()
_ensure_non_root(console)
if args.command == "completion":
handler = getattr(args, "handler", None)
if handler:
handler(None, args)
return return
if not hasattr(args, "handler"):
parser.print_help() parser.print_help()
return return
ensure_dirs() # Build context
console = Console(quiet=getattr(args, "quiet", False), color=None)
try:
platform_info = detect_platform() platform_info = detect_platform()
except RuntimeError as e: context = detect_context()
console.error(str(e))
# Block remote commands inside VMs
cmd_name = getattr(args, "command", "")
if context == "vm" and cmd_name in ("remote", "dev"):
console.error(f"Command '{cmd_name}' is not available inside a VM.")
sys.exit(1) sys.exit(1)
try: paths.ensure_dirs()
config = load_config() config = load_config(paths.CONFIG_DIR)
manifest = load_manifest() # Also try loading config from dotfiles
except Exception as e: if paths.DOTFILES_FLOW_CONFIG.is_dir():
console.error(f"Failed to load configuration: {e}") dotfiles_config = load_config(paths.DOTFILES_FLOW_CONFIG)
sys.exit(1) config = _merge_config(config, dotfiles_config)
manifest = load_manifest(paths.DOTFILES_FLOW_CONFIG)
ctx = FlowContext( ctx = FlowContext(
config=config, config=config,
manifest=manifest, manifest=manifest,
platform=platform_info, platform=platform_info,
console=console, console=console,
runtime=SystemRuntime(),
) )
handler = getattr(args, "handler", None)
if handler:
try: try:
handler(ctx, args) args.handler(ctx, args)
except FlowError as e:
console.error(str(e))
sys.exit(1)
except KeyboardInterrupt: except KeyboardInterrupt:
console.error("Interrupted") console.error("Interrupted.")
sys.exit(130) sys.exit(130)
except subprocess.CalledProcessError as e:
detail = (e.stderr or "").strip() or (e.stdout or "").strip()
if detail: def _merge_config(base: AppConfig, overlay: AppConfig) -> AppConfig:
console.error(detail.splitlines()[-1]) """Merge two configs: overlay values override base when non-default."""
else: return AppConfig(
console.error(f"Command failed with exit code {e.returncode}") dotfiles_url=overlay.dotfiles_url or base.dotfiles_url,
sys.exit(e.returncode or 1) dotfiles_branch=overlay.dotfiles_branch if overlay.dotfiles_branch != "main" else base.dotfiles_branch,
except RuntimeError as e: projects_dir=overlay.projects_dir if overlay.projects_dir != "~/projects" else base.projects_dir,
console.error(str(e)) container_registry=overlay.container_registry if overlay.container_registry != "registry.tomastm.com" else base.container_registry,
sys.exit(1) container_tag=overlay.container_tag if overlay.container_tag != "latest" else base.container_tag,
except OSError as e: tmux_session=overlay.tmux_session if overlay.tmux_session != "default" else base.tmux_session,
console.error(str(e)) targets=overlay.targets or base.targets,
sys.exit(1) )
except Exception as e:
console.error(f"Unexpected error: {e}")
sys.exit(1) def _build_parser() -> argparse.ArgumentParser:
else: parser = argparse.ArgumentParser(
parser.print_help() prog="flow",
description="DevFlow - development environment manager",
)
parser.add_argument("--version", action="store_true", help="Show version")
parser.add_argument("--quiet", "-q", action="store_true", help="Suppress info output")
subparsers = parser.add_subparsers(dest="command")
# Import and register all command modules
from flow.commands import dotfiles, packages, setup, remote, dev, projects, completion
dotfiles.register(subparsers)
packages.register(subparsers)
setup.register(subparsers)
remote.register(subparsers)
dev.register(subparsers)
projects.register(subparsers)
completion.register(subparsers)
return parser

View File

@@ -1,105 +0,0 @@
"""flow bootstrap — thin CLI adapter over the bootstrap service."""
import shutil
import urllib.request
from flow.services import bootstrap as _service
DEFAULT_LOCALE = _service.DEFAULT_LOCALE
PACKAGE_TYPES = _service.PACKAGE_TYPES
_SERVICE_COPY_INSTALL_ITEM = _service._copy_install_item
def _sync_service_module() -> None:
_service.shutil = shutil
_service.urllib = urllib
_service._copy_install_item = _copy_install_item
def register(subparsers):
_sync_service_module()
return _service.register(subparsers)
def _get_profiles(ctx):
_sync_service_module()
return _service._get_profiles(ctx)
def _parse_variables(var_args: list):
_sync_service_module()
return _service._parse_variables(var_args)
def _profile_template_context(ctx, extra_env, extra=None):
_sync_service_module()
return _service._profile_template_context(ctx, extra_env, extra)
def _render_template_value(value, template_ctx):
_sync_service_module()
return _service._render_template_value(value, template_ctx)
def _linux_detect_package_manager():
_sync_service_module()
return _service._linux_detect_package_manager()
def _resolve_package_manager(ctx, profile_cfg):
_sync_service_module()
return _service._resolve_package_manager(ctx, profile_cfg)
def _get_package_catalog(ctx):
_sync_service_module()
return _service._get_package_catalog(ctx)
def _normalize_profile_package_entry(entry):
_sync_service_module()
return _service._normalize_profile_package_entry(entry)
def _resolve_package_spec(catalog, profile_entry):
_sync_service_module()
return _service._resolve_package_spec(catalog, profile_entry)
def _resolve_pkg_source_name(spec, package_manager):
_sync_service_module()
return _service._resolve_pkg_source_name(spec, package_manager)
def _install_binary_package(ctx, spec, extra_env, dry_run):
_sync_service_module()
return _service._install_binary_package(ctx, spec, extra_env, dry_run)
def _copy_install_item(kind, src, declared_path):
return _SERVICE_COPY_INSTALL_ITEM(kind, src, declared_path)
def _ensure_required_variables(profile_cfg, env_map):
_sync_service_module()
return _service._ensure_required_variables(profile_cfg, env_map)
def run_bootstrap(ctx, args):
_sync_service_module()
return _service.run_bootstrap(ctx, args)
def run_list(ctx, args):
_sync_service_module()
return _service.run_list(ctx, args)
def run_show(ctx, args):
_sync_service_module()
return _service.run_show(ctx, args)
def run_packages(ctx, args):
_sync_service_module()
return _service.run_packages(ctx, args)

View File

@@ -1,623 +1,99 @@
"""flow completion — shell completion support (dynamic zsh).""" """Zsh completion for flow CLI."""
import argparse
import json
import shutil
import subprocess
from pathlib import Path
from typing import List, Optional, Sequence, Set
from flow.commands.enter import HOST_TEMPLATES COMMANDS = {
from flow.core.config import load_config, load_manifest "dotfiles": {
from flow.core.paths import DOTFILES_DIR, INSTALLED_STATE "subcommands": ["link", "unlink", "status", "sync"],
"flags": {
"link": ["--profile", "--dry-run", "--skip"],
"unlink": ["--dry-run"],
"status": [],
"sync": [],
},
},
"packages": {
"subcommands": ["install", "remove", "list"],
"flags": {
"install": ["--profile", "--dry-run"],
"remove": ["--dry-run"],
"list": [],
},
},
"setup": {
"subcommands": ["run", "show", "list"],
"flags": {
"run": ["--dry-run"],
"show": [],
"list": [],
},
},
"remote": {
"subcommands": ["enter", "list"],
"flags": {
"enter": ["--dry-run"],
"list": [],
},
},
"dev": {
"subcommands": ["create", "enter", "stop", "remove", "list"],
"flags": {
"create": ["--namespace", "--dry-run"],
"enter": ["--shell"],
"stop": [],
"remove": [],
"list": [],
},
},
"projects": {
"subcommands": ["check"],
"flags": {
"check": ["--fetch"],
},
},
}
ZSH_RC_START = "# >>> flow completion >>>"
ZSH_RC_END = "# <<< flow completion <<<"
TOP_LEVEL_COMMANDS = [ def complete(comp_words: list[str], comp_cword: int) -> list[str]:
"enter", """Return completions for the current word."""
"dev", if comp_cword <= 1:
"dotfiles", # Complete top-level commands
"dot", prefix = comp_words[1] if len(comp_words) > 1 else ""
"bootstrap", return [c for c in COMMANDS if c.startswith(prefix)]
"setup",
"provision", command = comp_words[1] if len(comp_words) > 1 else ""
"package", if command not in COMMANDS:
"pkg", return []
"sync",
"completion", cmd_def = COMMANDS[command]
]
if comp_cword == 2:
# Complete subcommands
prefix = comp_words[2] if len(comp_words) > 2 else ""
return [s for s in cmd_def["subcommands"] if s.startswith(prefix)]
if comp_cword >= 3:
# Complete flags for the subcommand
subcommand = comp_words[2] if len(comp_words) > 2 else ""
flags = cmd_def["flags"].get(subcommand, [])
prefix = comp_words[comp_cword] if comp_cword < len(comp_words) else ""
return [f for f in flags if f.startswith(prefix)]
return []
def register(subparsers): def register(subparsers):
p = subparsers.add_parser("completion", help="Shell completion helpers") """Register completion subcommand."""
sub = p.add_subparsers(dest="completion_command") p = subparsers.add_parser("completion", help="Shell completion")
p.add_argument("--shell", default="zsh", choices=["zsh"])
p.set_defaults(handler=_handle)
zsh = sub.add_parser("zsh", help="Print zsh completion script")
zsh.set_defaults(handler=run_zsh_script)
install = sub.add_parser("install-zsh", help="Install zsh completion script") def _handle(ctx, args):
install.add_argument( """Output completion script."""
"--dir", import os
default="~/.zsh/completions", comp_words = os.environ.get("COMP_WORDS", "").split()
help="Directory where _flow completion file is written", comp_cword = int(os.environ.get("COMP_CWORD", "0"))
)
install.add_argument(
"--rc",
default="~/.zshrc",
help="Shell rc file to update with fpath/compinit snippet",
)
install.add_argument(
"--no-rc",
action="store_true",
help="Do not modify rc file; only write completion script",
)
install.set_defaults(handler=run_install_zsh)
hidden = sub.add_parser("_zsh_complete", help=argparse.SUPPRESS) completions = complete(comp_words, comp_cword)
hidden.add_argument("--cword", type=int, required=True, help=argparse.SUPPRESS) for c in completions:
hidden.add_argument("words", nargs="*", help=argparse.SUPPRESS) print(c)
hidden.set_defaults(handler=run_zsh_complete)
p.set_defaults(handler=lambda _ctx, args: p.print_help())
def _canonical_command(command: str) -> str:
alias_map = {
"dot": "dotfiles",
"setup": "bootstrap",
"provision": "bootstrap",
"pkg": "package",
}
return alias_map.get(command, command)
def _safe_config():
try:
return load_config()
except Exception:
return None
def _safe_manifest():
try:
return load_manifest()
except Exception:
return {}
def _list_targets() -> List[str]:
cfg = _safe_config()
if cfg is None:
return []
return sorted({f"{t.namespace}@{t.platform}" for t in cfg.targets})
def _list_namespaces() -> List[str]:
cfg = _safe_config()
if cfg is None:
return []
return sorted({t.namespace for t in cfg.targets})
def _list_platforms() -> List[str]:
cfg = _safe_config()
config_platforms: Set[str] = set()
if cfg is not None:
config_platforms = {t.platform for t in cfg.targets}
return sorted(set(HOST_TEMPLATES.keys()) | config_platforms)
def _list_bootstrap_profiles() -> List[str]:
manifest = _safe_manifest()
return sorted(manifest.get("profiles", {}).keys())
def _list_manifest_packages() -> List[str]:
manifest = _safe_manifest()
packages = manifest.get("packages", [])
names: Set[str] = set()
if isinstance(packages, list):
for pkg in packages:
if not isinstance(pkg, dict):
continue
name = pkg.get("name")
if not isinstance(name, str) or not name:
continue
if str(pkg.get("type", "pkg")) == "binary":
names.add(name)
return sorted(names)
if isinstance(packages, dict):
for key, pkg in packages.items():
if not isinstance(pkg, dict):
continue
if str(pkg.get("type", "pkg")) != "binary":
continue
raw_name = pkg.get("name")
if isinstance(raw_name, str) and raw_name:
names.add(raw_name)
elif isinstance(key, str) and key:
names.add(key)
return sorted(names)
def _list_installed_packages() -> List[str]:
if not INSTALLED_STATE.exists():
return []
try:
with open(INSTALLED_STATE) as f:
state = json.load(f)
except Exception:
return []
if not isinstance(state, dict):
return []
return sorted(state.keys())
def _list_dotfiles_profiles() -> List[str]:
flow_dir = DOTFILES_DIR
if not flow_dir.is_dir():
return []
return sorted(
[
p.name
for p in flow_dir.iterdir()
if p.is_dir() and not p.name.startswith(".") and not p.name.startswith("_")
]
)
def _list_dotfiles_packages(profile: Optional[str] = None) -> List[str]:
package_names: Set[str] = set()
flow_dir = DOTFILES_DIR
if not flow_dir.is_dir():
return []
shared = flow_dir / "_shared"
if shared.is_dir():
for pkg in shared.iterdir():
if pkg.is_dir() and not pkg.name.startswith("."):
package_names.add(pkg.name)
if profile:
profile_dir = flow_dir / profile
if profile_dir.is_dir():
for pkg in profile_dir.iterdir():
if pkg.is_dir() and not pkg.name.startswith("."):
package_names.add(pkg.name)
else:
for profile_dir in flow_dir.iterdir():
if profile_dir.name.startswith(".") or profile_dir.name.startswith("_"):
continue
if not profile_dir.is_dir():
continue
for pkg in profile_dir.iterdir():
if pkg.is_dir() and not pkg.name.startswith("."):
package_names.add(pkg.name)
return sorted(package_names)
def _list_container_names() -> List[str]:
runtime = None
for rt in ("docker", "podman"):
if shutil.which(rt):
runtime = rt
break
if not runtime:
return []
try:
result = subprocess.run(
[
runtime,
"ps",
"-a",
"--filter",
"label=dev=true",
"--format",
'{{.Label "dev.name"}}',
],
capture_output=True,
text=True,
timeout=1,
)
except Exception:
return []
if result.returncode != 0:
return []
names = []
for line in result.stdout.splitlines():
line = line.strip()
if line:
names.append(line)
return sorted(set(names))
def _split_words(words: Sequence[str], cword: int):
tokens = list(words)
index = max(0, cword - 1)
if tokens:
tokens = tokens[1:]
index = max(0, cword - 2)
if index > len(tokens):
index = len(tokens)
current = tokens[index] if index < len(tokens) else ""
before = tokens[:index]
return before, current
def _filter(candidates: Sequence[str], prefix: str) -> List[str]:
unique = sorted(set(candidates))
if not prefix:
return unique
return [c for c in unique if c.startswith(prefix)]
def _profile_from_before(before: Sequence[str]) -> Optional[str]:
for i, token in enumerate(before):
if token == "--profile" and i + 1 < len(before):
return before[i + 1]
return None
def _complete_dev(before: Sequence[str], current: str) -> List[str]:
if len(before) <= 1:
return _filter(["create", "exec", "connect", "list", "stop", "remove", "rm", "respawn"], current)
sub = "remove" if before[1] == "rm" else before[1]
if sub in {"remove", "stop", "connect", "exec", "respawn"}:
options = {
"remove": ["-f", "--force", "-h", "--help"],
"stop": ["--kill", "-h", "--help"],
"exec": ["-h", "--help"],
"connect": ["-h", "--help"],
"respawn": ["-h", "--help"],
}[sub]
if current.startswith("-"):
return _filter(options, current)
non_opt = [t for t in before[2:] if not t.startswith("-")]
if len(non_opt) == 0:
return _filter(_list_container_names(), current)
return []
if sub == "create":
options = ["-i", "--image", "-p", "--project", "-h", "--help"]
if before and before[-1] in ("-i", "--image"):
return _filter(["tm0/node", "docker/python", "docker/alpine"], current)
if current.startswith("-"):
return _filter(options, current)
return []
if sub == "list":
return []
return []
def _complete_dotfiles(before: Sequence[str], current: str) -> List[str]:
if len(before) <= 1:
if current.startswith("-"):
return _filter(["--verbose", "-h", "--help"], current)
return _filter(
["init", "link", "unlink", "undo", "status", "sync", "relink", "clean", "edit", "repo", "modules"],
current,
)
if before[1] == "--verbose":
if len(before) <= 2:
if current.startswith("-"):
return _filter(["-h", "--help"], current)
return _filter(
["init", "link", "unlink", "undo", "status", "sync", "relink", "clean", "edit", "repo", "modules"],
current,
)
before = [before[0]] + list(before[2:])
sub = before[1]
if sub == "init":
return _filter(["--repo", "-h", "--help"], current) if current.startswith("-") else []
if sub == "repo":
if len(before) <= 2:
return _filter(["status", "pull", "push"], current)
repo_sub = before[2]
if repo_sub == "pull":
if before and before[-1] == "--profile":
return _filter(_list_dotfiles_profiles(), current)
if current.startswith("-"):
return _filter(["--rebase", "--no-rebase", "--relink", "--profile", "-h", "--help"], current)
elif current.startswith("-"):
return _filter(["-h", "--help"], current)
return []
if sub == "modules":
if len(before) <= 2:
if current.startswith("-"):
return _filter(["-h", "--help"], current)
return _filter(["list", "sync"], current)
modules_sub = before[2]
if modules_sub in {"list", "sync"}:
if before and before[-1] == "--profile":
return _filter(_list_dotfiles_profiles(), current)
if current.startswith("-"):
return _filter(["--profile", "-h", "--help"], current)
profile = _profile_from_before(before)
return _filter(_list_dotfiles_packages(profile), current)
return []
if sub in {"link", "relink"}:
if before and before[-1] == "--profile":
return _filter(_list_dotfiles_profiles(), current)
if current.startswith("-"):
return _filter(["--profile", "--copy", "--force", "--dry-run", "-h", "--help"], current)
profile = _profile_from_before(before)
return _filter(_list_dotfiles_packages(profile), current)
if sub == "unlink":
if current.startswith("-"):
return _filter(["-h", "--help"], current)
return _filter(_list_dotfiles_packages(), current)
if sub == "undo":
return _filter(["-h", "--help"], current) if current.startswith("-") else []
if sub == "edit":
if current.startswith("-"):
return _filter(["--no-commit", "-h", "--help"], current)
non_opt = [t for t in before[2:] if not t.startswith("-")]
if len(non_opt) == 0:
return _filter(_list_dotfiles_packages(), current)
return []
if sub == "clean":
return _filter(["--dry-run", "-h", "--help"], current) if current.startswith("-") else []
if sub == "sync":
if before and before[-1] == "--profile":
return _filter(_list_dotfiles_profiles(), current)
return _filter(["--relink", "--profile", "-h", "--help"], current) if current.startswith("-") else []
return []
def _complete_bootstrap(before: Sequence[str], current: str) -> List[str]:
if len(before) <= 1:
return _filter(["run", "list", "show", "packages"], current)
sub = before[1]
if sub == "run":
if before and before[-1] == "--profile":
return _filter(_list_bootstrap_profiles(), current)
if current.startswith("-"):
return _filter(["--profile", "--dry-run", "--var", "-h", "--help"], current)
return []
if sub == "show":
if current.startswith("-"):
return _filter(["-h", "--help"], current)
non_opt = [t for t in before[2:] if not t.startswith("-")]
if len(non_opt) == 0:
return _filter(_list_bootstrap_profiles(), current)
return []
if sub == "packages":
if before and before[-1] == "--profile":
return _filter(_list_bootstrap_profiles(), current)
if current.startswith("-"):
return _filter(["--profile", "--resolved", "-h", "--help"], current)
return []
return []
def _complete_package(before: Sequence[str], current: str) -> List[str]:
if len(before) <= 1:
return _filter(["install", "list", "remove"], current)
sub = before[1]
if sub == "install":
if current.startswith("-"):
return _filter(["--dry-run", "-h", "--help"], current)
return _filter(_list_manifest_packages(), current)
if sub == "remove":
if current.startswith("-"):
return _filter(["-h", "--help"], current)
return _filter(_list_installed_packages(), current)
if sub == "list":
if current.startswith("-"):
return _filter(["--all", "-h", "--help"], current)
return []
return []
def _complete_sync(before: Sequence[str], current: str) -> List[str]:
if len(before) <= 1:
return _filter(["check", "fetch", "summary"], current)
sub = before[1]
if sub == "check" and current.startswith("-"):
return _filter(["--fetch", "--no-fetch", "-h", "--help"], current)
if current.startswith("-"):
return _filter(["-h", "--help"], current)
return []
def complete(words: Sequence[str], cword: int) -> List[str]:
before, current = _split_words(words, cword)
if not before:
return _filter(TOP_LEVEL_COMMANDS + ["-h", "--help", "-v", "--version"], current)
command = _canonical_command(before[0])
if command == "enter":
if before and before[-1] in ("-p", "--platform"):
return _filter(_list_platforms(), current)
if before and before[-1] in ("-n", "--namespace"):
return _filter(_list_namespaces(), current)
if current.startswith("-"):
return _filter(
["-u", "--user", "-n", "--namespace", "-p", "--platform", "-s", "--session", "--no-tmux", "-d", "--dry-run", "-h", "--help"],
current,
)
return _filter(_list_targets(), current)
if command == "dev":
return _complete_dev(before, current)
if command == "dotfiles":
return _complete_dotfiles(before, current)
if command == "bootstrap":
return _complete_bootstrap(before, current)
if command == "package":
return _complete_package(before, current)
if command == "sync":
return _complete_sync(before, current)
if command == "completion":
if len(before) <= 1:
return _filter(["zsh", "install-zsh"], current)
sub = before[1]
if sub == "install-zsh" and current.startswith("-"):
return _filter(["--dir", "--rc", "--no-rc", "-h", "--help"], current)
return []
return []
def run_zsh_complete(_ctx, args):
candidates = complete(args.words, args.cword)
for item in candidates:
print(item)
def _zsh_script_text() -> str:
return r'''#compdef flow
_flow() {
local -a suggestions
suggestions=("${(@f)$(flow completion _zsh_complete --cword "$CURRENT" -- "${words[@]}" 2>/dev/null)}")
if (( ${#suggestions[@]} > 0 )); then
compadd -Q -- "${suggestions[@]}"
return 0
fi
if [[ "$words[CURRENT]" == */* || "$words[CURRENT]" == ./* || "$words[CURRENT]" == ~* ]]; then
_files
return 0
fi
return 1
}
compdef _flow flow
'''
def _zsh_dir_for_rc(path: Path) -> str:
home = Path.home().resolve()
resolved = path.expanduser().resolve()
try:
rel = resolved.relative_to(home)
return f"~/{rel}" if str(rel) != "." else "~"
except ValueError:
return str(resolved)
def _zsh_rc_snippet(completions_dir: Path) -> str:
dir_expr = _zsh_dir_for_rc(completions_dir)
return (
f"{ZSH_RC_START}\n"
f"fpath=({dir_expr} $fpath)\n"
"autoload -Uz compinit && compinit\n"
f"{ZSH_RC_END}\n"
)
def _ensure_rc_snippet(rc_path: Path, completions_dir: Path) -> bool:
snippet = _zsh_rc_snippet(completions_dir)
if rc_path.exists():
content = rc_path.read_text()
else:
content = ""
if ZSH_RC_START in content and ZSH_RC_END in content:
start = content.find(ZSH_RC_START)
end = content.find(ZSH_RC_END, start)
if end >= 0:
end += len(ZSH_RC_END)
updated = content[:start] + snippet.rstrip("\n") + content[end:]
if updated == content:
return False
rc_path.parent.mkdir(parents=True, exist_ok=True)
rc_path.write_text(updated)
return True
sep = "" if content.endswith("\n") or content == "" else "\n"
rc_path.parent.mkdir(parents=True, exist_ok=True)
rc_path.write_text(content + sep + snippet)
return True
def run_install_zsh(_ctx, args):
completions_dir = Path(args.dir).expanduser()
completions_dir.mkdir(parents=True, exist_ok=True)
completion_file = completions_dir / "_flow"
completion_file.write_text(_zsh_script_text())
print(f"Installed completion script: {completion_file}")
if args.no_rc:
print("Skipped rc file update (--no-rc)")
return
rc_path = Path(args.rc).expanduser()
changed = _ensure_rc_snippet(rc_path, completions_dir)
if changed:
print(f"Updated shell rc: {rc_path}")
else:
print(f"Shell rc already configured: {rc_path}")
print("Restart shell or run: autoload -Uz compinit && compinit")
def run_zsh_script(_ctx, _args):
print(_zsh_script_text())

View File

@@ -1,121 +0,0 @@
"""flow dev <subcommand> — container management."""
import os
import shutil
import subprocess
from flow.services.containers import (
CONTAINER_HOME,
DEFAULT_REGISTRY,
DEFAULT_TAG,
ContainerService,
container_name as _cname,
parse_image_ref as _parse_image_ref,
runtime as _runtime_service,
)
def register(subparsers):
parser = subparsers.add_parser("dev", help="Manage development containers")
sub = parser.add_subparsers(dest="dev_command")
create = sub.add_parser("create", help="Create and start a development container")
create.add_argument("name", help="Container name")
create.add_argument("-i", "--image", required=True, help="Container image")
create.add_argument("-p", "--project", help="Path to project directory")
create.set_defaults(handler=run_create)
exec_cmd = sub.add_parser("exec", help="Execute command in a container")
exec_cmd.add_argument("name", help="Container name")
exec_cmd.add_argument("cmd", nargs="*", help="Command to run (default: interactive shell)")
exec_cmd.set_defaults(handler=run_exec)
connect = sub.add_parser("connect", help="Attach to container tmux session")
connect.add_argument("name", help="Container name")
connect.set_defaults(handler=run_connect)
list_parser = sub.add_parser("list", help="List development containers")
list_parser.set_defaults(handler=run_list)
stop = sub.add_parser("stop", help="Stop a development container")
stop.add_argument("name", help="Container name")
stop.add_argument("--kill", action="store_true", help="Kill instead of graceful stop")
stop.set_defaults(handler=run_stop)
remove = sub.add_parser("remove", aliases=["rm"], help="Remove a development container")
remove.add_argument("name", help="Container name")
remove.add_argument("-f", "--force", action="store_true", help="Force removal")
remove.set_defaults(handler=run_remove)
respawn = sub.add_parser("respawn", help="Respawn all tmux panes for a session")
respawn.add_argument("name", help="Session/container name")
respawn.set_defaults(handler=run_respawn)
parser.set_defaults(handler=lambda ctx, args: parser.print_help())
def _runtime():
return _runtime_service()
def _container_exists(rt: str, cname: str) -> bool:
result = subprocess.run(
[rt, "container", "ls", "-a", "--format", "{{.Names}}"],
capture_output=True,
text=True,
check=False,
)
return cname in result.stdout.strip().splitlines()
def _container_running(rt: str, cname: str) -> bool:
result = subprocess.run(
[rt, "container", "ls", "--format", "{{.Names}}"],
capture_output=True,
text=True,
check=False,
)
return cname in result.stdout.strip().splitlines()
def run_create(ctx, args):
ContainerService(ctx).run_create(args)
def run_exec(ctx, args):
ContainerService(ctx).run_exec(args)
def run_connect(ctx, args):
ContainerService(ctx).run_connect(args)
def run_list(ctx, args):
ContainerService(ctx).run_list(args)
def run_stop(ctx, args):
ContainerService(ctx).run_stop(args)
def run_remove(ctx, args):
ContainerService(ctx).run_remove(args)
def run_respawn(ctx, args):
ContainerService(ctx).run_respawn(args)
def _tmux_fallback(cname: str):
if not os.environ.get("TMUX"):
return
result = subprocess.run(
["tmux", "display-message", "-p", "#S"],
capture_output=True,
text=True,
check=False,
)
current = result.stdout.strip()
if current == cname:
subprocess.run(["tmux", "new-session", "-ds", "default"], capture_output=True, check=False)
subprocess.run(["tmux", "switch-client", "-t", "default"], check=False)

62
src/flow/commands/dev.py Normal file
View File

@@ -0,0 +1,62 @@
"""Dev container commands."""
from flow.core.config import FlowContext
from flow.services.containers import ContainerService
def register(subparsers):
p = subparsers.add_parser("dev", help="Manage development containers")
sub = p.add_subparsers(dest="dev_action")
create = sub.add_parser("create", help="Create a container")
create.add_argument("image", help="Container image")
create.add_argument("--namespace", "-n", default="default")
create.add_argument("--dry-run", action="store_true")
create.set_defaults(handler=_create)
enter = sub.add_parser("enter", help="Enter a running container")
enter.add_argument("name", help="Container name")
enter.add_argument("--shell", default="/bin/bash")
enter.set_defaults(handler=_enter)
stop = sub.add_parser("stop", help="Stop a container")
stop.add_argument("name", help="Container name")
stop.set_defaults(handler=_stop)
rm = sub.add_parser("remove", help="Remove a container")
rm.add_argument("name", help="Container name")
rm.set_defaults(handler=_remove)
ls = sub.add_parser("list", help="List flow containers")
ls.set_defaults(handler=_list)
p.set_defaults(handler=_default)
def _default(ctx: FlowContext, args):
_list(ctx, args)
def _create(ctx: FlowContext, args):
svc = ContainerService(ctx)
svc.create(args.image, args.namespace, dry_run=args.dry_run)
def _enter(ctx: FlowContext, args):
svc = ContainerService(ctx)
svc.enter(args.name, shell=args.shell)
def _stop(ctx: FlowContext, args):
svc = ContainerService(ctx)
svc.stop(args.name)
def _remove(ctx: FlowContext, args):
svc = ContainerService(ctx)
svc.remove(args.name)
def _list(ctx: FlowContext, args):
svc = ContainerService(ctx)
svc.list()

View File

@@ -1,193 +1,59 @@
"""flow dotfiles — thin CLI adapter over the dotfiles service.""" """Dotfiles commands."""
from pathlib import Path from flow.core.config import FlowContext
from flow.services.dotfiles import DotfilesService
from flow.core.paths import DOTFILES_DIR, LINKED_STATE, MODULES_DIR
from flow.services import dotfiles as _service
RESERVED_SHARED = "_shared"
RESERVED_ROOT = "_root"
MODULE_FILE = "_module.yaml"
LINK_BACKUP_DIR = LINKED_STATE.parent / "link-backups"
LinkSpec = _service.LinkSpec
ModuleSpec = _service.ModuleSpec
def _sync_service_module() -> None:
_service.DOTFILES_DIR = DOTFILES_DIR
_service.MODULES_DIR = MODULES_DIR
_service.LINKED_STATE = LINKED_STATE
_service.LINK_BACKUP_DIR = LINKED_STATE.parent / "link-backups"
_service.RESERVED_SHARED = RESERVED_SHARED
_service.RESERVED_ROOT = RESERVED_ROOT
_service.MODULE_FILE = MODULE_FILE
def register(subparsers): def register(subparsers):
_sync_service_module() p = subparsers.add_parser("dotfiles", help="Manage dotfile symlinks")
return _service.register(subparsers) sub = p.add_subparsers(dest="dotfiles_action")
link = sub.add_parser("link", help="Link dotfiles to home")
link.add_argument("--profile", help="Profile to include")
link.add_argument("--dry-run", "-n", action="store_true")
link.add_argument("--skip", nargs="*", default=[])
link.set_defaults(handler=_link)
unlink = sub.add_parser("unlink", help="Remove managed symlinks")
unlink.add_argument("packages", nargs="*", help="Packages to unlink (all if empty)")
unlink.add_argument("--dry-run", "-n", action="store_true")
unlink.set_defaults(handler=_unlink)
status = sub.add_parser("status", help="Show link status")
status.set_defaults(handler=_status)
sync = sub.add_parser("sync", help="Pull dotfiles and sync modules")
sync.set_defaults(handler=_sync)
p.set_defaults(handler=_default)
def _flow_config_dir(dotfiles_dir=None): def _default(ctx: FlowContext, args):
_sync_service_module() _status(ctx, args)
return _service._flow_config_dir(dotfiles_dir)
def _pull_requires_ack(stdout: str, stderr: str) -> bool: def _link(ctx: FlowContext, args):
_sync_service_module() svc = DotfilesService(ctx)
return _service._pull_requires_ack(stdout, stderr) svc.link(
profile=args.profile,
dry_run=args.dry_run,
def _load_state() -> dict: skip=set(args.skip) if args.skip else None,
_sync_service_module()
return _service._load_state()
def _save_state(state: dict) -> None:
_sync_service_module()
return _service._save_state(state)
def _load_link_specs_from_state():
_sync_service_module()
return _service._load_link_specs_from_state()
def _save_link_specs_to_state(specs):
_sync_service_module()
return _service._save_link_specs_to_state(specs)
def _list_profiles(flow_dir: Path):
_sync_service_module()
return _service._list_profiles(flow_dir)
def _walk_package(source_dir: Path):
_sync_service_module()
return _service._walk_package(source_dir)
def _discover_packages(dotfiles_dir: Path, profile=None):
_sync_service_module()
return _service._discover_packages(dotfiles_dir, profile)
def _resolve_edit_target(target: str, dotfiles_dir=None):
_sync_service_module()
return _service._resolve_edit_target(target, dotfiles_dir)
def _resolved_package_source(ctx, package: str, package_dir: Path, *, verbose: bool = False):
_sync_service_module()
return _service._resolved_package_source(ctx, package, package_dir, verbose=verbose)
def _run_sudo(cmd, *, dry_run: bool = False):
_sync_service_module()
return _service._run_sudo(cmd, dry_run=dry_run)
def _collect_home_specs(ctx, flow_dir, home, profile, skip, package_filter, *, verbose: bool = False):
_sync_service_module()
return _service._collect_home_specs(
ctx,
flow_dir,
home,
profile,
skip,
package_filter,
verbose=verbose,
) )
def _sync_modules(ctx, *, verbose: bool = False, profile=None, package_filter=None): def _unlink(ctx: FlowContext, args):
_sync_service_module() svc = DotfilesService(ctx)
return _service._sync_modules( svc.unlink(
ctx, packages=args.packages if args.packages else None,
verbose=verbose, dry_run=args.dry_run,
profile=profile,
package_filter=package_filter,
) )
def _sync_to_desired(ctx, desired, *, force: bool, dry_run: bool, copy: bool): def _status(ctx: FlowContext, args):
_sync_service_module() svc = DotfilesService(ctx)
return _service._sync_to_desired( svc.status()
ctx,
desired,
force=force,
dry_run=dry_run,
copy=copy,
)
def run_init(ctx, args): def _sync(ctx: FlowContext, args):
_sync_service_module() svc = DotfilesService(ctx)
return _service.run_init(ctx, args) svc.sync()
def run_link(ctx, args):
_sync_service_module()
return _service.run_link(ctx, args)
def run_unlink(ctx, args):
_sync_service_module()
return _service.run_unlink(ctx, args)
def run_undo(ctx, args):
_sync_service_module()
return _service.run_undo(ctx, args)
def run_status(ctx, args):
_sync_service_module()
return _service.run_status(ctx, args)
def run_sync(ctx, args):
_sync_service_module()
return _service.run_sync(ctx, args)
def run_modules_list(ctx, args):
_sync_service_module()
return _service.run_modules_list(ctx, args)
def run_modules_sync(ctx, args):
_sync_service_module()
return _service.run_modules_sync(ctx, args)
def run_repo_status(ctx, args):
_sync_service_module()
return _service.run_repo_status(ctx, args)
def run_repo_pull(ctx, args):
_sync_service_module()
return _service.run_repo_pull(ctx, args)
def run_repo_push(ctx, args):
_sync_service_module()
return _service.run_repo_push(ctx, args)
def run_relink(ctx, args):
_sync_service_module()
return _service.run_relink(ctx, args)
def run_clean(ctx, args):
_sync_service_module()
return _service.run_clean(ctx, args)
def run_edit(ctx, args):
_sync_service_module()
return _service.run_edit(ctx, args)

View File

@@ -1,30 +0,0 @@
"""flow enter — connect to a development instance via SSH."""
from flow.services.ssh import (
HOST_TEMPLATES,
EnterService,
build_destination as _build_destination,
handle_terminfo_warning as _handle_terminfo_warning,
parse_target as _parse_target_model,
terminfo_fix_command as _terminfo_fix_command,
)
def register(subparsers):
parser = subparsers.add_parser("enter", help="Connect to a development instance via SSH")
parser.add_argument("target", help="Target: [user@]namespace@platform")
parser.add_argument("-u", "--user", help="SSH user (overrides target)")
parser.add_argument("-n", "--namespace", help="Namespace (overrides target)")
parser.add_argument("-p", "--platform", help="Platform (overrides target)")
parser.add_argument("-s", "--session", default="default", help="Tmux session name (default: 'default')")
parser.add_argument("--no-tmux", action="store_true", help="Skip tmux attachment")
parser.add_argument("-d", "--dry-run", action="store_true", help="Show command without executing")
parser.set_defaults(handler=run)
def _parse_target(target: str):
return _parse_target_model(target)
def run(ctx, args):
EnterService(ctx).run(args)

View File

@@ -1,150 +0,0 @@
"""flow package — package management from unified manifest definitions."""
import json
import sys
from flow.core.paths import INSTALLED_STATE
from flow.services.package_defs import BinaryInstaller, get_package_catalog
def register(subparsers):
p = subparsers.add_parser("package", aliases=["pkg"], help="Manage packages")
sub = p.add_subparsers(dest="package_command")
inst = sub.add_parser("install", help="Install packages from manifest")
inst.add_argument("packages", nargs="+", help="Package names to install")
inst.add_argument("--dry-run", action="store_true", help="Show what would be done")
inst.set_defaults(handler=run_install)
ls = sub.add_parser("list", help="List installed and available packages")
ls.add_argument("--all", action="store_true", help="Show all available packages")
ls.set_defaults(handler=run_list)
rm = sub.add_parser("remove", help="Remove installed packages")
rm.add_argument("packages", nargs="+", help="Package names to remove")
rm.set_defaults(handler=run_remove)
p.set_defaults(handler=lambda ctx, args: p.print_help())
def _load_installed() -> dict:
if not INSTALLED_STATE.exists():
return {}
try:
with open(INSTALLED_STATE, "r", encoding="utf-8") as handle:
state = json.load(handle)
except (OSError, json.JSONDecodeError):
return {}
if isinstance(state, dict):
return state
return {}
def _save_installed(state: dict):
INSTALLED_STATE.parent.mkdir(parents=True, exist_ok=True)
with open(INSTALLED_STATE, "w", encoding="utf-8") as handle:
json.dump(state, handle, indent=2)
def _get_definitions(ctx):
return get_package_catalog(ctx)
def _install_binary_package(ctx, spec, extra_env, dry_run):
return BinaryInstaller(ctx).install(spec, extra_env, dry_run=dry_run)
def run_install(ctx, args):
definitions = _get_definitions(ctx)
installed = _load_installed()
had_error = False
for package_name in args.packages:
package_def = definitions.get(package_name)
if not package_def:
ctx.console.error(f"Package not found in manifest: {package_name}")
had_error = True
continue
package_type = package_def.get("type", "pkg")
if package_type != "binary":
ctx.console.error(
f"'flow package install' supports binary packages only. "
f"'{package_name}' is type '{package_type}'."
)
had_error = True
continue
ctx.console.info(f"Installing {package_name}...")
try:
_install_binary_package(ctx, package_def, {}, args.dry_run)
except RuntimeError as exc:
ctx.console.error(str(exc))
had_error = True
continue
if not args.dry_run:
installed[package_name] = {
"version": str(package_def.get("version", "")),
"type": package_type,
}
ctx.console.success(f"Installed {package_name}")
if not args.dry_run:
_save_installed(installed)
if had_error:
sys.exit(1)
def run_list(ctx, args):
definitions = _get_definitions(ctx)
installed = _load_installed()
rows = []
if args.all:
if not definitions:
ctx.console.info("No packages defined in manifest.")
return
for name, package_def in sorted(definitions.items()):
rows.append(
[
name,
str(package_def.get("type", "pkg")),
str(installed.get(name, {}).get("version", "-")),
str(package_def.get("version", "")) or "-",
]
)
else:
if not installed:
ctx.console.info("No packages installed.")
return
for name, info in sorted(installed.items()):
rows.append(
[
name,
str(info.get("type", "?")),
str(info.get("version", "?")),
str(definitions.get(name, {}).get("version", "")) or "-",
]
)
ctx.console.table(["PACKAGE", "TYPE", "INSTALLED", "AVAILABLE"], rows)
def run_remove(ctx, args):
installed = _load_installed()
for package_name in args.packages:
if package_name not in installed:
ctx.console.warn(f"Package not installed: {package_name}")
continue
del installed[package_name]
ctx.console.success(f"Removed {package_name} from installed packages")
ctx.console.warn(
"Note: installed files were not automatically deleted. Remove manually if needed."
)
_save_installed(installed)

View File

@@ -0,0 +1,48 @@
"""Package commands."""
from flow.core.config import FlowContext
from flow.services.packages import PackageService
def register(subparsers):
p = subparsers.add_parser("packages", help="Manage packages")
sub = p.add_subparsers(dest="packages_action")
install = sub.add_parser("install", help="Install packages")
install.add_argument("packages", nargs="*")
install.add_argument("--profile", help="Install profile packages")
install.add_argument("--dry-run", "-n", action="store_true")
install.set_defaults(handler=_install)
remove = sub.add_parser("remove", help="Remove packages")
remove.add_argument("packages", nargs="+")
remove.add_argument("--dry-run", "-n", action="store_true")
remove.set_defaults(handler=_remove)
ls = sub.add_parser("list", help="List installed packages")
ls.set_defaults(handler=_list)
p.set_defaults(handler=_default)
def _default(ctx: FlowContext, args):
_list(ctx, args)
def _install(ctx: FlowContext, args):
svc = PackageService(ctx)
svc.install(
package_names=args.packages if args.packages else None,
profile=args.profile,
dry_run=args.dry_run,
)
def _remove(ctx: FlowContext, args):
svc = PackageService(ctx)
svc.remove(args.packages, dry_run=args.dry_run)
def _list(ctx: FlowContext, args):
svc = PackageService(ctx)
svc.list_packages()

View File

@@ -0,0 +1,24 @@
"""Projects commands."""
from flow.core.config import FlowContext
from flow.services.projects import ProjectService
def register(subparsers):
p = subparsers.add_parser("projects", help="Manage git projects")
sub = p.add_subparsers(dest="projects_action")
check = sub.add_parser("check", help="Check project status")
check.add_argument("--fetch", "-f", action="store_true", help="Fetch remotes first")
check.set_defaults(handler=_check)
p.set_defaults(handler=_default)
def _default(ctx: FlowContext, args):
_check(ctx, args)
def _check(ctx: FlowContext, args):
svc = ProjectService(ctx)
svc.check(fetch=getattr(args, "fetch", False))

View File

@@ -0,0 +1,33 @@
"""Remote commands."""
from flow.core.config import FlowContext
from flow.services.remote import RemoteService
def register(subparsers):
p = subparsers.add_parser("remote", help="Manage remote targets")
sub = p.add_subparsers(dest="remote_action")
enter = sub.add_parser("enter", help="SSH into a target")
enter.add_argument("target", help="Target (namespace@platform)")
enter.add_argument("--dry-run", "-n", action="store_true")
enter.set_defaults(handler=_enter)
ls = sub.add_parser("list", help="List configured targets")
ls.set_defaults(handler=_list)
p.set_defaults(handler=_default)
def _default(ctx: FlowContext, args):
_list(ctx, args)
def _enter(ctx: FlowContext, args):
svc = RemoteService(ctx)
svc.enter(args.target, dry_run=args.dry_run)
def _list(ctx: FlowContext, args):
svc = RemoteService(ctx)
svc.list()

View File

@@ -0,0 +1,42 @@
"""Setup/bootstrap commands."""
from flow.core.config import FlowContext
from flow.services.bootstrap import BootstrapService
def register(subparsers):
p = subparsers.add_parser("setup", help="Bootstrap a system profile")
sub = p.add_subparsers(dest="setup_action")
run = sub.add_parser("run", help="Run bootstrap for a profile")
run.add_argument("profile", help="Profile name")
run.add_argument("--dry-run", "-n", action="store_true")
run.set_defaults(handler=_run)
show = sub.add_parser("show", help="Show bootstrap plan")
show.add_argument("profile", help="Profile name")
show.set_defaults(handler=_show)
ls = sub.add_parser("list", help="List available profiles")
ls.set_defaults(handler=_list)
p.set_defaults(handler=_default)
def _default(ctx: FlowContext, args):
_list(ctx, args)
def _run(ctx: FlowContext, args):
svc = BootstrapService(ctx)
svc.run(args.profile, dry_run=args.dry_run)
def _show(ctx: FlowContext, args):
svc = BootstrapService(ctx)
svc.show(args.profile)
def _list(ctx: FlowContext, args):
svc = BootstrapService(ctx)
svc.list_profiles()

View File

@@ -1,176 +0,0 @@
"""flow sync — check git sync status of all projects."""
import os
import subprocess
import sys
from flow.core.config import FlowContext
def register(subparsers):
parser = subparsers.add_parser("sync", help="Git sync tools for projects")
sub = parser.add_subparsers(dest="sync_command")
check = sub.add_parser("check", help="Check all projects status")
check.add_argument("--fetch", dest="fetch", action="store_true", help="Run git fetch before checking (default)")
check.add_argument("--no-fetch", dest="fetch", action="store_false", help="Skip git fetch")
check.set_defaults(fetch=True)
check.set_defaults(handler=run_check)
fetch = sub.add_parser("fetch", help="Fetch all project remotes")
fetch.set_defaults(handler=run_fetch)
summary = sub.add_parser("summary", help="Quick overview of project status")
summary.set_defaults(handler=run_summary)
parser.set_defaults(handler=lambda ctx, args: parser.print_help())
def _git(repo: str, *cmd, capture: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(
["git", "-C", repo] + list(cmd),
capture_output=capture,
text=True,
)
def _is_git_repo(repo_path: str) -> bool:
git_dir = os.path.join(repo_path, ".git")
return os.path.isdir(git_dir) or os.path.isfile(git_dir)
def _check_repo(repo_path: str, do_fetch: bool = True):
name = os.path.basename(repo_path)
if not _is_git_repo(repo_path):
return name, None
issues = []
if do_fetch:
fetch_result = _git(repo_path, "fetch", "--all", "--quiet")
if fetch_result.returncode != 0:
issues.append("git fetch failed")
result = _git(repo_path, "rev-parse", "--abbrev-ref", "HEAD")
branch = result.stdout.strip() if result.returncode == 0 else "HEAD"
diff_result = _git(repo_path, "diff", "--quiet")
cached_result = _git(repo_path, "diff", "--cached", "--quiet")
if diff_result.returncode != 0 or cached_result.returncode != 0:
issues.append("uncommitted changes")
else:
untracked = _git(repo_path, "ls-files", "--others", "--exclude-standard")
if untracked.stdout.strip():
issues.append("untracked files")
upstream_check = _git(repo_path, "rev-parse", "--abbrev-ref", f"{branch}@{{u}}")
if upstream_check.returncode == 0:
unpushed = _git(repo_path, "rev-list", "--oneline", f"{branch}@{{u}}..{branch}")
if unpushed.stdout.strip():
issues.append(f"{len(unpushed.stdout.strip().splitlines())} unpushed commit(s) on {branch}")
else:
issues.append(f"no upstream for {branch}")
branches_result = _git(repo_path, "for-each-ref", "--format=%(refname:short)", "refs/heads")
for branch_name in branches_result.stdout.strip().splitlines():
if not branch_name or branch_name == branch:
continue
upstream = _git(repo_path, "rev-parse", "--abbrev-ref", f"{branch_name}@{{u}}")
if upstream.returncode == 0:
ahead = _git(repo_path, "rev-list", "--count", f"{branch_name}@{{u}}..{branch_name}")
if ahead.stdout.strip() != "0":
issues.append(f"branch {branch_name}: {ahead.stdout.strip()} ahead")
else:
issues.append(f"branch {branch_name}: no upstream")
return name, issues
def run_check(ctx: FlowContext, args):
projects_dir = os.path.expanduser(ctx.config.projects_dir)
if not os.path.isdir(projects_dir):
ctx.console.error(f"Projects directory not found: {projects_dir}")
sys.exit(1)
rows = []
needs_action = []
not_git = []
checked = 0
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path):
continue
name, issues = _check_repo(repo_path, do_fetch=args.fetch)
if issues is None:
not_git.append(name)
continue
checked += 1
rows.append([name, "; ".join(issues) if issues else "clean and synced"])
if issues:
needs_action.append(name)
if checked == 0:
ctx.console.info("No git repositories found in projects directory.")
if not_git:
ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}")
return
ctx.console.table(["PROJECT", "STATUS"], rows)
if needs_action:
ctx.console.warn(f"Projects needing action: {', '.join(sorted(needs_action))}")
else:
ctx.console.success("All repositories clean and synced.")
if not_git:
ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}")
def run_fetch(ctx: FlowContext, args):
projects_dir = os.path.expanduser(ctx.config.projects_dir)
if not os.path.isdir(projects_dir):
ctx.console.error(f"Projects directory not found: {projects_dir}")
sys.exit(1)
had_error = False
fetched = 0
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not _is_git_repo(repo_path):
continue
ctx.console.info(f"Fetching {entry}...")
result = _git(repo_path, "fetch", "--all", "--quiet")
fetched += 1
if result.returncode != 0:
had_error = True
ctx.console.error(f"Failed to fetch {entry}")
if fetched == 0:
ctx.console.info("No git repositories found in projects directory.")
return
if had_error:
sys.exit(1)
ctx.console.success("All remotes fetched.")
def run_summary(ctx: FlowContext, args):
projects_dir = os.path.expanduser(ctx.config.projects_dir)
if not os.path.isdir(projects_dir):
ctx.console.error(f"Projects directory not found: {projects_dir}")
sys.exit(1)
rows = []
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path):
continue
name, issues = _check_repo(repo_path, do_fetch=False)
if issues is None:
rows.append([name, "not a git repo"])
elif issues:
rows.append([name, "; ".join(issues)])
else:
rows.append([name, "clean"])
if not rows:
ctx.console.info("No projects found.")
return
ctx.console.table(["PROJECT", "STATUS"], rows)

View File

@@ -1,49 +1,32 @@
"""Tests for CLI routing and command registration.""" """Tests for CLI."""
import os
import subprocess import subprocess
import sys import sys
from unittest.mock import patch
import pytest
def _clean_env(): def test_version_flag():
"""Return env dict without DF_* variables that trigger enter's guard.""" """Test --version flag works."""
env = {k: v for k, v in os.environ.items() if not k.startswith("DF_")}
env["FLOW_SKIP_SUDO_REFRESH"] = "1"
return env
def test_version():
result = subprocess.run( result = subprocess.run(
[sys.executable, "-m", "flow", "--version"], [sys.executable, "-m", "flow", "--version"],
capture_output=True, text=True, capture_output=True, text=True,
) )
assert result.returncode == 0 assert result.returncode == 0
assert "0.1.0" in result.stdout assert "flow" in result.stdout
def test_help(): def test_help_flag():
"""Test --help shows commands."""
result = subprocess.run( result = subprocess.run(
[sys.executable, "-m", "flow", "--help"], [sys.executable, "-m", "flow", "--help"],
capture_output=True, text=True, capture_output=True, text=True,
) )
assert result.returncode == 0 assert result.returncode == 0
assert "enter" in result.stdout
assert "dev" in result.stdout
assert "dotfiles" in result.stdout assert "dotfiles" in result.stdout
assert "bootstrap" in result.stdout assert "packages" in result.stdout
assert "package" in result.stdout assert "setup" in result.stdout
assert "sync" in result.stdout
assert "completion" in result.stdout
def test_enter_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "enter", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0
assert "target" in result.stdout
assert "--dry-run" in result.stdout
def test_dotfiles_help(): def test_dotfiles_help():
@@ -52,136 +35,14 @@ def test_dotfiles_help():
capture_output=True, text=True, capture_output=True, text=True,
) )
assert result.returncode == 0 assert result.returncode == 0
assert "init" in result.stdout
assert "link" in result.stdout assert "link" in result.stdout
assert "unlink" in result.stdout assert "unlink" in result.stdout
assert "undo" in result.stdout
assert "status" in result.stdout
assert "sync" in result.stdout
assert "repo" in result.stdout
def test_dotfiles_help_without_sudo_in_path(): def test_packages_help():
env = _clean_env()
env["PATH"] = os.path.dirname(sys.executable)
result = subprocess.run( result = subprocess.run(
[sys.executable, "-m", "flow", "dotfiles", "--help"], [sys.executable, "-m", "flow", "packages", "--help"],
capture_output=True,
text=True,
env=env,
)
assert result.returncode == 0
assert "dotfiles" in result.stdout
def test_bootstrap_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "bootstrap", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0
assert "run" in result.stdout
assert "list" in result.stdout
assert "show" in result.stdout
assert "packages" in result.stdout
def test_package_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "package", "--help"],
capture_output=True, text=True, capture_output=True, text=True,
) )
assert result.returncode == 0 assert result.returncode == 0
assert "install" in result.stdout assert "install" in result.stdout
assert "list" in result.stdout
assert "remove" in result.stdout
def test_sync_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "sync", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0
assert "check" in result.stdout
assert "fetch" in result.stdout
assert "summary" in result.stdout
def test_dev_help():
result = subprocess.run(
[sys.executable, "-m", "flow", "dev", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0
assert "create" in result.stdout
assert "exec" in result.stdout
assert "connect" in result.stdout
assert "list" in result.stdout
assert "stop" in result.stdout
assert "remove" in result.stdout
assert "respawn" in result.stdout
def test_enter_dry_run():
result = subprocess.run(
[sys.executable, "-m", "flow", "enter", "--dry-run", "personal@orb"],
capture_output=True, text=True, env=_clean_env(),
)
assert result.returncode == 0
assert "ssh" in result.stdout
assert "personal.orb" in result.stdout
assert "tmux" in result.stdout
def test_enter_dry_run_no_tmux():
result = subprocess.run(
[sys.executable, "-m", "flow", "enter", "--dry-run", "--no-tmux", "personal@orb"],
capture_output=True, text=True, env=_clean_env(),
)
assert result.returncode == 0
assert "ssh" in result.stdout
assert "tmux" not in result.stdout
def test_enter_dry_run_with_user():
result = subprocess.run(
[sys.executable, "-m", "flow", "enter", "--dry-run", "root@personal@orb"],
capture_output=True, text=True, env=_clean_env(),
)
assert result.returncode == 0
assert "root@personal.orb" in result.stdout
def test_enter_dry_run_shows_terminfo_hint_for_ghostty():
env = _clean_env()
env["TERM"] = "xterm-ghostty"
result = subprocess.run(
[sys.executable, "-m", "flow", "enter", "--dry-run", "personal@orb"],
capture_output=True, text=True, env=env,
)
assert result.returncode == 0
assert "flow will not install or modify terminfo" in result.stdout
assert "infocmp -x xterm-ghostty | ssh" in result.stdout
def test_aliases():
"""Test that command aliases work."""
for alias, cmd in [("dot", "dotfiles"), ("pkg", "package"), ("setup", "bootstrap")]:
result = subprocess.run(
[sys.executable, "-m", "flow", alias, "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0, f"Alias '{alias}' failed"
def test_dev_remove_alias():
result = subprocess.run(
[sys.executable, "-m", "flow", "dev", "rm", "--help"],
capture_output=True, text=True,
)
assert result.returncode == 0

View File

@@ -1,130 +1,43 @@
"""Tests for flow.commands.completion dynamic suggestions.""" """Tests for zsh completion."""
from flow.commands import completion from flow.commands.completion import complete
def test_complete_top_level():
result = complete(["flow", ""], 1)
assert "dotfiles" in result
assert "packages" in result
assert "setup" in result
assert "remote" in result
assert "dev" in result
assert "projects" in result
def test_complete_top_level_prefix(): def test_complete_top_level_prefix():
out = completion.complete(["flow", "do"], 2) result = complete(["flow", "do"], 1)
assert "dotfiles" in out assert result == ["dotfiles"]
assert "dot" in out
def test_complete_bootstrap_profiles(monkeypatch): def test_complete_dotfiles_subcommands():
monkeypatch.setattr(completion, "_list_bootstrap_profiles", lambda: ["linux-vm", "macos-host"]) result = complete(["flow", "dotfiles", ""], 2)
out = completion.complete(["flow", "bootstrap", "show", "li"], 4) assert "link" in result
assert out == ["linux-vm"] assert "unlink" in result
assert "status" in result
def test_complete_bootstrap_packages_options(monkeypatch): def test_complete_dotfiles_link_flags():
monkeypatch.setattr(completion, "_list_bootstrap_profiles", lambda: ["linux-vm", "macos-host"]) result = complete(["flow", "dotfiles", "link", "--"], 3)
out = completion.complete(["flow", "bootstrap", "packages", "--p"], 4) assert "--profile" in result
assert out == ["--profile"] assert "--dry-run" in result
out = completion.complete(["flow", "bootstrap", "packages", "--profile", "m"], 5)
assert out == ["macos-host"]
def test_complete_package_install(monkeypatch): def test_complete_unknown_command():
monkeypatch.setattr(completion, "_list_manifest_packages", lambda: ["neovim", "fzf"]) result = complete(["flow", "unknown", ""], 2)
out = completion.complete(["flow", "package", "install", "n"], 4) assert result == []
assert out == ["neovim"]
def test_complete_package_remove(monkeypatch): def test_complete_packages_subcommands():
monkeypatch.setattr(completion, "_list_installed_packages", lambda: ["hello", "jq"]) result = complete(["flow", "packages", ""], 2)
out = completion.complete(["flow", "package", "remove", "h"], 4) assert "install" in result
assert out == ["hello"] assert "remove" in result
assert "list" in result
def test_list_manifest_packages_is_consistent_for_list_and_dict_forms(monkeypatch):
manifests = [
{
"packages": [
{"name": "neovim", "type": "binary"},
{"name": "ripgrep", "type": "pkg"},
{"name": "fzf", "type": "binary"},
]
},
{
"packages": {
"neovim": {"type": "binary"},
"ripgrep": {"type": "pkg"},
"fzf": {"type": "binary"},
}
},
]
monkeypatch.setattr(completion, "_safe_manifest", lambda: manifests.pop(0))
from_list = completion._list_manifest_packages()
from_dict = completion._list_manifest_packages()
assert from_list == ["fzf", "neovim"]
assert from_dict == ["fzf", "neovim"]
def test_list_manifest_packages_uses_mapping_key_when_name_missing(monkeypatch):
monkeypatch.setattr(
completion,
"_safe_manifest",
lambda: {"packages": {"bat": {"type": "binary"}, "git": {"type": "pkg"}}},
)
assert completion._list_manifest_packages() == ["bat"]
def test_complete_dotfiles_profile_value(monkeypatch):
monkeypatch.setattr(completion, "_list_dotfiles_profiles", lambda: ["work", "personal"])
out = completion.complete(["flow", "dotfiles", "link", "--profile", "w"], 5)
assert out == ["work"]
def test_complete_dotfiles_repo_subcommands():
out = completion.complete(["flow", "dotfiles", "repo", "p"], 4)
assert out == ["pull", "push"]
def test_complete_dotfiles_top_level_includes_undo():
out = completion.complete(["flow", "dotfiles", "u"], 3)
assert out == ["undo", "unlink"]
def test_complete_dotfiles_modules_subcommands():
out = completion.complete(["flow", "dotfiles", "modules", "s"], 4)
assert out == ["sync"]
def test_complete_dotfiles_modules_profile_value(monkeypatch):
monkeypatch.setattr(completion, "_list_dotfiles_profiles", lambda: ["work", "personal"])
out = completion.complete(["flow", "dotfiles", "modules", "list", "--profile", "w"], 6)
assert out == ["work"]
def test_complete_enter_targets(monkeypatch):
monkeypatch.setattr(completion, "_list_targets", lambda: ["personal@orb", "work@ec2"])
out = completion.complete(["flow", "enter", "p"], 3)
assert out == ["personal@orb"]
def test_complete_dev_subcommands():
out = completion.complete(["flow", "dev", "c"], 3)
assert out == ["connect", "create"]
def test_complete_completion_subcommands():
out = completion.complete(["flow", "completion", "i"], 3)
assert out == ["install-zsh"]
def test_rc_snippet_is_idempotent(tmp_path):
rc_path = tmp_path / ".zshrc"
completion_dir = tmp_path / "completions"
first = completion._ensure_rc_snippet(rc_path, completion_dir)
second = completion._ensure_rc_snippet(rc_path, completion_dir)
assert first is True
assert second is False
text = rc_path.read_text()
assert text.count(completion.ZSH_RC_START) == 1
assert text.count(completion.ZSH_RC_END) == 1