refactor-1

This commit is contained in:
2026-03-15 21:46:50 +02:00
parent 24d682adf1
commit c0b378c424
25 changed files with 4839 additions and 3136 deletions

View File

@@ -71,7 +71,14 @@ linux-auto/
### External module packages ### External module packages
Packages can be backed by an external git repository using `_module.yaml`: Any directory inside a package can be backed by an external git repository using `_module.yaml`:
```text
_shared/
nvim/
.config/nvim/
_module.yaml
```
```yaml ```yaml
source: github:org/nvim-config source: github:org/nvim-config
@@ -79,8 +86,9 @@ ref:
branch: main branch: main
``` ```
- If a package directory contains `_module.yaml`, flow uses the fetched module content as package source. - Flow mounts the module repo root at the directory containing `_module.yaml` (e.g. the example mounts into `~/.config/nvim/`).
- Any sibling files in that package directory are ignored (shown only in `--verbose`). - Local files under that directory are ignored (shown only in `--verbose`).
- Only one `_module.yaml` per package is supported.
- Modules are refreshed on `flow dotfiles init` and `flow dotfiles sync` (not on `link`). - Modules are refreshed on `flow dotfiles init` and `flow dotfiles sync` (not on `link`).
## Manifest model ## Manifest model

52
docs/architecture.md Normal file
View File

@@ -0,0 +1,52 @@
## Flow CLI Architecture
### Layers
`flow` now follows a stricter adapter/service/runtime split:
- `flow.cli`
- global startup, platform/config loading, top-level error handling
- `flow.commands.*`
- argparse registration and compatibility wrappers only
- `flow.services.*`
- domain behavior for SSH entry, containers, dotfiles, bootstrap, packages, and project sync
- `flow.core.system`
- shared process, git, filesystem, and JSON state primitives
- `flow.core.*`
- config loading, platform detection, console output, variables
### Runtime Safety
Mutating operations are centralized behind `flow.core.system`:
- `CommandRunner`
- subprocess execution and shell streaming
- `GitClient`
- repository-scoped git execution
- `FileSystem`
- directory creation, copy, symlink, removal, JSON/text writes
- `JsonStateStore`
- state persistence with explicit paths
This keeps command handlers out of the business of directly creating, deleting, or overwriting filesystem state.
### Domain Boundaries
- `services.ssh`
- parses enter targets, resolves host templates, builds the SSH handoff
- `services.containers`
- owns container create/exec/connect/list/stop/remove/respawn logic
- `services.projects`
- owns git status/fetch/summary logic for project directories
- `services.package_defs`
- normalizes manifest package definitions and binary package install logic
- `services.packages`
- package state listing/install/remove behavior
- `services.bootstrap`
- provisioning orchestration, package-manager resolution, hooks, shell/locale/hostname setup
- `services.dotfiles`
- repo sync, module discovery, link planning, transactional undo, status, edit flow
### Compatibility Strategy
The current CLI surface is preserved. The command modules still expose a small set of legacy helper symbols because the existing tests use them directly, but the behavioral implementation now lives in the service layer.

86
docs/flows.md Normal file
View File

@@ -0,0 +1,86 @@
## Feature Inventory
### Core Features
- `enter`
- SSH into a named environment target with optional tmux auto-attach
- `dev`
- create, exec into, attach to, list, stop, remove, and respawn development containers
- `dotfiles`
- clone the dotfiles repo, link/unlink/relink configs, undo link transactions, inspect status, sync modules, clean broken links, edit packages, and interact with repo state
- `bootstrap`
- run machine bootstrap profiles with packages, env validation, hostname/locale/shell setup, ssh-keygen, `runcmd`, config linking, and post-link hooks
- `package`
- install/list/remove binary packages defined in the manifest
- `sync`
- inspect git project health, fetch remotes, and summarize project state
- `completion`
- dynamic zsh completion generation and installation
### Supported Flows
#### Access a host
1. Resolve `[user@]namespace@platform`
2. Expand platform host template or configured target override
3. Optionally warn about missing remote terminfo
4. Open SSH, optionally into tmux
#### Start a dev container
1. Resolve runtime (`docker` or `podman`)
2. Normalize image shorthand
3. Apply labels and common host mounts
4. Start the container
5. `flow dev connect` attaches through tmux or falls back to direct exec
#### Manage dotfiles
1. Clone dotfiles repo
2. Optionally sync external module repos
3. Resolve shared + profile packages
4. Validate target conflicts
5. Snapshot replaced targets
6. Apply links transactionally
7. Undo from persisted transaction state if needed
#### Bootstrap a machine
1. Load and validate a profile
2. Detect or select the package manager
3. Check required environment variables
4. Apply hostname/locale/shell prerequisites
5. Install profile packages
6. Run package hooks
7. Generate SSH keys
8. Run `runcmd`
9. Link dotfiles for the profile
10. Run post-link hooks
### Command Surface Review
### Keep
- `enter`
- `dev`
- `dotfiles`
- `bootstrap`
- `package`
- `sync`
- `completion`
### Keep But Treat As Convenience Aliases
- `dotfiles sync`
- effectively `repo pull` + `modules sync`
- `dotfiles relink`
- effectively `unlink` + `link`
- `sync summary`
- effectively `sync check --no-fetch`
### Commands That Need Follow-Up Product Decisions
- `package remove`
- today it forgets install state but does not uninstall files; either rename it to `forget` or implement real uninstall semantics
- `dotfiles edit`
- current auto-commit/push behavior is powerful but risky; it may deserve an explicit confirm-or-dry-run mode before wider use

File diff suppressed because it is too large Load Diff

View File

@@ -3,350 +3,119 @@
import os import os
import shutil import shutil
import subprocess import subprocess
import sys
from flow.core.config import FlowContext from flow.services.containers import (
CONTAINER_HOME,
DEFAULT_REGISTRY = "registry.tomastm.com" DEFAULT_REGISTRY,
DEFAULT_TAG = "latest" DEFAULT_TAG,
CONTAINER_HOME = "/home/dev" ContainerService,
container_name as _cname,
parse_image_ref as _parse_image_ref,
runtime as _runtime_service,
)
def register(subparsers): def register(subparsers):
p = subparsers.add_parser("dev", help="Manage development containers") parser = subparsers.add_parser("dev", help="Manage development containers")
sub = p.add_subparsers(dest="dev_command") sub = parser.add_subparsers(dest="dev_command")
# create
create = sub.add_parser("create", help="Create and start a development container") create = sub.add_parser("create", help="Create and start a development container")
create.add_argument("name", help="Container name") create.add_argument("name", help="Container name")
create.add_argument("-i", "--image", required=True, help="Container image") create.add_argument("-i", "--image", required=True, help="Container image")
create.add_argument("-p", "--project", help="Path to project directory") create.add_argument("-p", "--project", help="Path to project directory")
create.set_defaults(handler=run_create) create.set_defaults(handler=run_create)
# exec
exec_cmd = sub.add_parser("exec", help="Execute command in a container") exec_cmd = sub.add_parser("exec", help="Execute command in a container")
exec_cmd.add_argument("name", help="Container name") exec_cmd.add_argument("name", help="Container name")
exec_cmd.add_argument("cmd", nargs="*", help="Command to run (default: interactive shell)") exec_cmd.add_argument("cmd", nargs="*", help="Command to run (default: interactive shell)")
exec_cmd.set_defaults(handler=run_exec) exec_cmd.set_defaults(handler=run_exec)
# connect
connect = sub.add_parser("connect", help="Attach to container tmux session") connect = sub.add_parser("connect", help="Attach to container tmux session")
connect.add_argument("name", help="Container name") connect.add_argument("name", help="Container name")
connect.set_defaults(handler=run_connect) connect.set_defaults(handler=run_connect)
# list list_parser = sub.add_parser("list", help="List development containers")
ls = sub.add_parser("list", help="List development containers") list_parser.set_defaults(handler=run_list)
ls.set_defaults(handler=run_list)
# stop
stop = sub.add_parser("stop", help="Stop a development container") stop = sub.add_parser("stop", help="Stop a development container")
stop.add_argument("name", help="Container name") stop.add_argument("name", help="Container name")
stop.add_argument("--kill", action="store_true", help="Kill instead of graceful stop") stop.add_argument("--kill", action="store_true", help="Kill instead of graceful stop")
stop.set_defaults(handler=run_stop) stop.set_defaults(handler=run_stop)
# remove
remove = sub.add_parser("remove", aliases=["rm"], help="Remove a development container") remove = sub.add_parser("remove", aliases=["rm"], help="Remove a development container")
remove.add_argument("name", help="Container name") remove.add_argument("name", help="Container name")
remove.add_argument("-f", "--force", action="store_true", help="Force removal") remove.add_argument("-f", "--force", action="store_true", help="Force removal")
remove.set_defaults(handler=run_remove) remove.set_defaults(handler=run_remove)
# respawn
respawn = sub.add_parser("respawn", help="Respawn all tmux panes for a session") respawn = sub.add_parser("respawn", help="Respawn all tmux panes for a session")
respawn.add_argument("name", help="Session/container name") respawn.add_argument("name", help="Session/container name")
respawn.set_defaults(handler=run_respawn) respawn.set_defaults(handler=run_respawn)
p.set_defaults(handler=lambda ctx, args: p.print_help()) parser.set_defaults(handler=lambda ctx, args: parser.print_help())
def _runtime(): def _runtime():
for rt in ("docker", "podman"): return _runtime_service()
if shutil.which(rt):
return rt
raise RuntimeError("No container runtime found (docker or podman)")
def _cname(name: str) -> str:
"""Normalize to dev- prefix."""
return name if name.startswith("dev-") else f"dev-{name}"
def _parse_image_ref(
image: str,
*,
default_registry: str = DEFAULT_REGISTRY,
default_tag: str = DEFAULT_TAG,
):
"""Parse image shorthand into (full_ref, repo, tag, label)."""
registry = default_registry
tag = default_tag
if image.startswith("docker/"):
registry = "docker.io"
image = f"library/{image.split('/', 1)[1]}"
elif image.startswith("tm0/"):
registry = default_registry
image = image.split("/", 1)[1]
elif "/" in image:
prefix, remainder = image.split("/", 1)
if "." in prefix or ":" in prefix or prefix == "localhost":
registry = prefix
image = remainder
if ":" in image.split("/")[-1]:
tag = image.rsplit(":", 1)[1]
image = image.rsplit(":", 1)[0]
repo = image
full_ref = f"{registry}/{repo}:{tag}"
label_prefix = registry.rsplit(".", 1)[0].rsplit(".", 1)[-1] if "." in registry else registry
label = f"{label_prefix}/{repo.split('/')[-1]}"
return full_ref, repo, tag, label
def _container_exists(rt: str, cname: str) -> bool: def _container_exists(rt: str, cname: str) -> bool:
result = subprocess.run( result = subprocess.run(
[rt, "container", "ls", "-a", "--format", "{{.Names}}"], [rt, "container", "ls", "-a", "--format", "{{.Names}}"],
capture_output=True, text=True, capture_output=True,
text=True,
check=False,
) )
return cname in result.stdout.strip().split("\n") return cname in result.stdout.strip().splitlines()
def _container_running(rt: str, cname: str) -> bool: def _container_running(rt: str, cname: str) -> bool:
result = subprocess.run( result = subprocess.run(
[rt, "container", "ls", "--format", "{{.Names}}"], [rt, "container", "ls", "--format", "{{.Names}}"],
capture_output=True, text=True, capture_output=True,
text=True,
check=False,
) )
return cname in result.stdout.strip().split("\n") return cname in result.stdout.strip().splitlines()
def run_create(ctx: FlowContext, args): def run_create(ctx, args):
rt = _runtime() ContainerService(ctx).run_create(args)
cname = _cname(args.name)
if _container_exists(rt, cname):
ctx.console.error(f"Container already exists: {cname}")
sys.exit(1)
project_path = os.path.realpath(args.project) if args.project else None
if project_path and not os.path.isdir(project_path):
ctx.console.error(f"Invalid project path: {project_path}")
sys.exit(1)
full_ref, _, _, _ = _parse_image_ref(
args.image,
default_registry=ctx.config.container_registry,
default_tag=ctx.config.container_tag,
)
cmd = [
rt, "run", "-d",
"--name", cname,
"--label", "dev=true",
"--label", f"dev.name={args.name}",
"--label", f"dev.image_ref={full_ref}",
"--network", "host",
"--init",
]
if project_path:
cmd.extend(["-v", f"{project_path}:/workspace"])
cmd.extend(["--label", f"dev.project_path={project_path}"])
docker_sock = "/var/run/docker.sock"
if os.path.exists(docker_sock):
cmd.extend(["-v", f"{docker_sock}:{docker_sock}"])
home = os.path.expanduser("~")
if os.path.isdir(f"{home}/.ssh"):
cmd.extend(["-v", f"{home}/.ssh:{CONTAINER_HOME}/.ssh:ro"])
if os.path.isfile(f"{home}/.npmrc"):
cmd.extend(["-v", f"{home}/.npmrc:{CONTAINER_HOME}/.npmrc:ro"])
if os.path.isdir(f"{home}/.npm"):
cmd.extend(["-v", f"{home}/.npm:{CONTAINER_HOME}/.npm"])
# Add docker group if available
try:
import grp
docker_gid = str(grp.getgrnam("docker").gr_gid)
cmd.extend(["--group-add", docker_gid])
except (KeyError, ImportError):
pass
cmd.extend([full_ref, "sleep", "infinity"])
subprocess.run(cmd, check=True)
ctx.console.success(f"Created and started container: {cname}")
def run_exec(ctx: FlowContext, args): def run_exec(ctx, args):
rt = _runtime() ContainerService(ctx).run_exec(args)
cname = _cname(args.name)
if not _container_running(rt, cname):
ctx.console.error(f"Container {cname} not running")
sys.exit(1)
if args.cmd:
exec_cmd = [rt, "exec"]
if sys.stdin.isatty():
exec_cmd.extend(["-it"])
exec_cmd.append(cname)
exec_cmd.extend(args.cmd)
result = subprocess.run(exec_cmd)
sys.exit(result.returncode)
# No command — try shells in order; 126/127 means the shell binary
# wasn't found, so we fall through. Any other exit code means the user
# exited the shell normally and we respect it.
for shell in ("zsh -l", "bash -l", "sh"):
parts = shell.split()
exec_cmd = [rt, "exec", "--detach-keys", "ctrl-q,ctrl-p", "-it", cname] + parts
result = subprocess.run(exec_cmd)
if result.returncode not in (126, 127):
sys.exit(result.returncode)
ctx.console.error(f"Unable to start an interactive shell in {cname}")
sys.exit(1)
def run_connect(ctx: FlowContext, args): def run_connect(ctx, args):
rt = _runtime() ContainerService(ctx).run_connect(args)
cname = _cname(args.name)
if not _container_exists(rt, cname):
ctx.console.error(f"Container does not exist: {cname}")
sys.exit(1)
if not _container_running(rt, cname):
subprocess.run([rt, "start", cname], capture_output=True)
if not shutil.which("tmux"):
ctx.console.warn("tmux not found; falling back to direct exec")
args.cmd = []
run_exec(ctx, args)
return
# Get image label for env
result = subprocess.run(
[rt, "container", "inspect", cname, "--format", "{{ .Config.Image }}"],
capture_output=True, text=True,
)
image_ref = result.stdout.strip()
_, _, _, image_label = _parse_image_ref(image_ref)
# Create tmux session if needed
check = subprocess.run(["tmux", "has-session", "-t", cname], capture_output=True)
if check.returncode != 0:
ns = os.environ.get("DF_NAMESPACE", "")
plat = os.environ.get("DF_PLATFORM", "")
subprocess.run([
"tmux", "new-session", "-ds", cname,
"-e", f"DF_IMAGE={image_label}",
"-e", f"DF_NAMESPACE={ns}",
"-e", f"DF_PLATFORM={plat}",
f"flow dev exec {args.name}",
])
subprocess.run([
"tmux", "set-option", "-t", cname,
"default-command", f"flow dev exec {args.name}",
])
if os.environ.get("TMUX"):
os.execvp("tmux", ["tmux", "switch-client", "-t", cname])
else:
os.execvp("tmux", ["tmux", "attach", "-t", cname])
def run_list(ctx: FlowContext, args): def run_list(ctx, args):
rt = _runtime() ContainerService(ctx).run_list(args)
result = subprocess.run(
[rt, "ps", "-a", "--filter", "label=dev=true",
"--format", '{{.Label "dev.name"}}|{{.Image}}|{{.Label "dev.project_path"}}|{{.Status}}'],
capture_output=True, text=True,
)
headers = ["NAME", "IMAGE", "PROJECT", "STATUS"]
rows = []
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split("|")
if len(parts) >= 4:
name, image, project, status = parts[0], parts[1], parts[2], parts[3]
# Shorten paths
home = os.path.expanduser("~")
if project.startswith(home):
project = "~" + project[len(home):]
rows.append([name, image, project, status])
if not rows:
ctx.console.info("No development containers found.")
return
ctx.console.table(headers, rows)
def run_stop(ctx: FlowContext, args): def run_stop(ctx, args):
rt = _runtime() ContainerService(ctx).run_stop(args)
cname = _cname(args.name)
if not _container_exists(rt, cname):
ctx.console.error(f"Container {cname} does not exist")
sys.exit(1)
if args.kill:
ctx.console.info(f"Killing container {cname}...")
subprocess.run([rt, "kill", cname], check=True)
else:
ctx.console.info(f"Stopping container {cname}...")
subprocess.run([rt, "stop", cname], check=True)
_tmux_fallback(cname)
def run_remove(ctx: FlowContext, args): def run_remove(ctx, args):
rt = _runtime() ContainerService(ctx).run_remove(args)
cname = _cname(args.name)
if not _container_exists(rt, cname):
ctx.console.error(f"Container {cname} does not exist")
sys.exit(1)
if args.force:
ctx.console.info(f"Removing container {cname} (force)...")
subprocess.run([rt, "rm", "-f", cname], check=True)
else:
ctx.console.info(f"Removing container {cname}...")
subprocess.run([rt, "rm", cname], check=True)
_tmux_fallback(cname)
def run_respawn(ctx: FlowContext, args): def run_respawn(ctx, args):
if not shutil.which("tmux"): ContainerService(ctx).run_respawn(args)
ctx.console.error("tmux is required for respawn but was not found")
sys.exit(1)
cname = _cname(args.name)
result = subprocess.run(
["tmux", "list-panes", "-t", cname, "-s",
"-F", "#{session_name}:#{window_index}.#{pane_index}"],
capture_output=True, text=True,
)
for pane in result.stdout.strip().split("\n"):
if pane:
ctx.console.info(f"Respawning {pane}...")
subprocess.run(["tmux", "respawn-pane", "-t", pane])
def _tmux_fallback(cname: str): def _tmux_fallback(cname: str):
"""If inside tmux in the target session, switch to default."""
if not os.environ.get("TMUX"): if not os.environ.get("TMUX"):
return return
result = subprocess.run( result = subprocess.run(
["tmux", "display-message", "-p", "#S"], ["tmux", "display-message", "-p", "#S"],
capture_output=True, text=True, capture_output=True,
text=True,
check=False,
) )
current = result.stdout.strip() current = result.stdout.strip()
if current == cname: if current == cname:
subprocess.run(["tmux", "new-session", "-ds", "default"], capture_output=True) subprocess.run(["tmux", "new-session", "-ds", "default"], capture_output=True, check=False)
subprocess.run(["tmux", "switch-client", "-t", "default"]) subprocess.run(["tmux", "switch-client", "-t", "default"], check=False)

File diff suppressed because it is too large Load Diff

View File

@@ -1,176 +1,30 @@
"""flow enter — connect to a development instance via SSH.""" """flow enter — connect to a development instance via SSH."""
import getpass from flow.services.ssh import (
import os HOST_TEMPLATES,
import sys EnterService,
from typing import Optional build_destination as _build_destination,
handle_terminfo_warning as _handle_terminfo_warning,
from flow.core.config import FlowContext parse_target as _parse_target_model,
terminfo_fix_command as _terminfo_fix_command,
# Default host templates per platform )
HOST_TEMPLATES = {
"orb": "<namespace>.orb",
"utm": "<namespace>.utm.local",
"core": "<namespace>.core.lan",
}
def register(subparsers): def register(subparsers):
p = subparsers.add_parser("enter", help="Connect to a development instance via SSH") parser = subparsers.add_parser("enter", help="Connect to a development instance via SSH")
p.add_argument("target", help="Target: [user@]namespace@platform") parser.add_argument("target", help="Target: [user@]namespace@platform")
p.add_argument("-u", "--user", help="SSH user (overrides target)") parser.add_argument("-u", "--user", help="SSH user (overrides target)")
p.add_argument("-n", "--namespace", help="Namespace (overrides target)") parser.add_argument("-n", "--namespace", help="Namespace (overrides target)")
p.add_argument("-p", "--platform", help="Platform (overrides target)") parser.add_argument("-p", "--platform", help="Platform (overrides target)")
p.add_argument("-s", "--session", default="default", help="Tmux session name (default: 'default')") parser.add_argument("-s", "--session", default="default", help="Tmux session name (default: 'default')")
p.add_argument("--no-tmux", action="store_true", help="Skip tmux attachment") parser.add_argument("--no-tmux", action="store_true", help="Skip tmux attachment")
p.add_argument("-d", "--dry-run", action="store_true", help="Show command without executing") parser.add_argument("-d", "--dry-run", action="store_true", help="Show command without executing")
p.set_defaults(handler=run) parser.set_defaults(handler=run)
def _parse_target(target: str): def _parse_target(target: str):
"""Parse [user@]namespace@platform into (user, namespace, platform).""" return _parse_target_model(target)
user = None
namespace = None
platform = None
if "@" in target:
platform = target.rsplit("@", 1)[1]
rest = target.rsplit("@", 1)[0]
else:
rest = target
if "@" in rest:
user = rest.rsplit("@", 1)[0]
namespace = rest.rsplit("@", 1)[1]
else:
namespace = rest
return user, namespace, platform
def _build_destination(user: str, host: str, preserve_host_user: bool = False) -> str: def run(ctx, args):
if "@" in host: EnterService(ctx).run(args)
host_user, host_name = host.rsplit("@", 1)
effective_user = host_user if preserve_host_user else (user or host_user)
return f"{effective_user}@{host_name}"
if not user:
return host
return f"{user}@{host}"
def _terminfo_fix_command(term: Optional[str], destination: str) -> Optional[str]:
normalized_term = (term or "").strip().lower()
if normalized_term == "xterm-ghostty":
return f"infocmp -x xterm-ghostty | ssh {destination} -- tic -x -"
if normalized_term == "wezterm":
return (
f"ssh {destination} -- sh -lc "
"'tempfile=$(mktemp) && curl -fsSL -o \"$tempfile\" "
"https://raw.githubusercontent.com/wezterm/wezterm/main/termwiz/data/wezterm.terminfo "
"&& tic -x -o ~/.terminfo \"$tempfile\" && rm \"$tempfile\"'"
)
return None
def _handle_terminfo_warning(ctx: FlowContext, term: Optional[str], destination: str, dry_run: bool) -> bool:
install_cmd = _terminfo_fix_command(term, destination)
if not install_cmd:
return True
ctx.console.warn(
f"Detected TERM={term}. Remote host may be missing this terminfo entry."
)
ctx.console.info("flow will not install or modify terminfo on the target automatically.")
ctx.console.info("If needed, run this command manually before reconnecting:")
print(f" {install_cmd}")
if dry_run or not sys.stdin.isatty():
return True
response = ""
try:
response = input("Continue with SSH connection? [Y/n] ").strip().lower()
except EOFError:
return True
if response in {"n", "no"}:
ctx.console.warn("Cancelled before opening SSH session")
return False
return True
def run(ctx: FlowContext, args):
# Warn if already inside an instance
if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
ns = os.environ["DF_NAMESPACE"]
plat = os.environ["DF_PLATFORM"]
ctx.console.error(
f"Not recommended inside an instance. Currently in: {ns}@{plat}"
)
sys.exit(1)
user, namespace, platform = _parse_target(args.target)
# Apply overrides
if args.user:
user = args.user
if args.namespace:
namespace = args.namespace
if args.platform:
platform = args.platform
user_was_explicit = bool(user)
if not user:
user = os.environ.get("USER") or getpass.getuser()
if not namespace:
ctx.console.error("Namespace is required in target")
sys.exit(1)
if not platform:
ctx.console.error("Platform is required in target")
sys.exit(1)
# Resolve SSH host from template or config
host_template = HOST_TEMPLATES.get(platform)
ssh_identity = None
# Check config targets for override
for tc in ctx.config.targets:
if tc.namespace == namespace and tc.platform == platform:
host_template = tc.ssh_host
ssh_identity = tc.ssh_identity
break
if not host_template:
ctx.console.error(f"Unknown platform: {platform}")
sys.exit(1)
ssh_host = host_template.replace("<namespace>", namespace)
destination = _build_destination(user, ssh_host, preserve_host_user=not user_was_explicit)
if not _handle_terminfo_warning(ctx, os.environ.get("TERM"), destination, dry_run=args.dry_run):
sys.exit(1)
# Build SSH command
ssh_cmd = ["ssh", "-tt"]
if ssh_identity:
ssh_cmd.extend(["-i", os.path.expanduser(ssh_identity)])
ssh_cmd.append(destination)
if not args.no_tmux:
ssh_cmd.extend([
"tmux", "new-session", "-As", args.session,
"-e", f"DF_NAMESPACE={namespace}",
"-e", f"DF_PLATFORM={platform}",
])
if args.dry_run:
ctx.console.info("Dry run command:")
print(" " + " ".join(ssh_cmd))
return
os.execvp("ssh", ssh_cmd)

View File

@@ -2,11 +2,9 @@
import json import json
import sys import sys
from typing import Any, Dict
from flow.commands.bootstrap import _get_package_catalog, _install_binary_package
from flow.core.config import FlowContext
from flow.core.paths import INSTALLED_STATE from flow.core.paths import INSTALLED_STATE
from flow.services.package_defs import BinaryInstaller, get_package_catalog
def register(subparsers): def register(subparsers):
@@ -50,89 +48,101 @@ def _save_installed(state: dict):
json.dump(state, handle, indent=2) json.dump(state, handle, indent=2)
def _get_definitions(ctx: FlowContext) -> Dict[str, Dict[str, Any]]: def _get_definitions(ctx):
return _get_package_catalog(ctx) return get_package_catalog(ctx)
def run_install(ctx: FlowContext, args): def _install_binary_package(ctx, spec, extra_env, dry_run):
return BinaryInstaller(ctx).install(spec, extra_env, dry_run=dry_run)
def run_install(ctx, args):
definitions = _get_definitions(ctx) definitions = _get_definitions(ctx)
installed = _load_installed() installed = _load_installed()
had_error = False had_error = False
for pkg_name in args.packages: for package_name in args.packages:
pkg_def = definitions.get(pkg_name) package_def = definitions.get(package_name)
if not pkg_def: if not package_def:
ctx.console.error(f"Package not found in manifest: {pkg_name}") ctx.console.error(f"Package not found in manifest: {package_name}")
had_error = True had_error = True
continue continue
pkg_type = pkg_def.get("type", "pkg") package_type = package_def.get("type", "pkg")
if pkg_type != "binary": if package_type != "binary":
ctx.console.error( ctx.console.error(
f"'flow package install' supports binary packages only. " f"'flow package install' supports binary packages only. "
f"'{pkg_name}' is type '{pkg_type}'." f"'{package_name}' is type '{package_type}'."
) )
had_error = True had_error = True
continue continue
ctx.console.info(f"Installing {pkg_name}...") ctx.console.info(f"Installing {package_name}...")
try: try:
_install_binary_package(ctx, pkg_def, extra_env={}, dry_run=args.dry_run) _install_binary_package(ctx, package_def, {}, args.dry_run)
except RuntimeError as e: except RuntimeError as exc:
ctx.console.error(str(e)) ctx.console.error(str(exc))
had_error = True had_error = True
continue continue
if not args.dry_run: if not args.dry_run:
installed[pkg_name] = { installed[package_name] = {
"version": str(pkg_def.get("version", "")), "version": str(package_def.get("version", "")),
"type": pkg_type, "type": package_type,
} }
ctx.console.success(f"Installed {pkg_name}") ctx.console.success(f"Installed {package_name}")
if not args.dry_run: if not args.dry_run:
_save_installed(installed) _save_installed(installed)
if had_error: if had_error:
sys.exit(1) sys.exit(1)
def run_list(ctx: FlowContext, args): def run_list(ctx, args):
definitions = _get_definitions(ctx) definitions = _get_definitions(ctx)
installed = _load_installed() installed = _load_installed()
headers = ["PACKAGE", "TYPE", "INSTALLED", "AVAILABLE"]
rows = [] rows = []
if args.all: if args.all:
if not definitions: if not definitions:
ctx.console.info("No packages defined in manifest.") ctx.console.info("No packages defined in manifest.")
return return
for name, pkg_def in sorted(definitions.items()): for name, package_def in sorted(definitions.items()):
inst_ver = installed.get(name, {}).get("version", "-") rows.append(
avail_ver = str(pkg_def.get("version", "")) or "-" [
rows.append([name, str(pkg_def.get("type", "pkg")), inst_ver, avail_ver]) name,
str(package_def.get("type", "pkg")),
str(installed.get(name, {}).get("version", "-")),
str(package_def.get("version", "")) or "-",
]
)
else: else:
if not installed: if not installed:
ctx.console.info("No packages installed.") ctx.console.info("No packages installed.")
return return
for name, info in sorted(installed.items()): for name, info in sorted(installed.items()):
avail = str(definitions.get(name, {}).get("version", "")) or "-" rows.append(
rows.append([name, str(info.get("type", "?")), str(info.get("version", "?")), avail]) [
name,
str(info.get("type", "?")),
str(info.get("version", "?")),
str(definitions.get(name, {}).get("version", "")) or "-",
]
)
ctx.console.table(headers, rows) ctx.console.table(["PACKAGE", "TYPE", "INSTALLED", "AVAILABLE"], rows)
def run_remove(ctx: FlowContext, args): def run_remove(ctx, args):
installed = _load_installed() installed = _load_installed()
for pkg_name in args.packages: for package_name in args.packages:
if pkg_name not in installed: if package_name not in installed:
ctx.console.warn(f"Package not installed: {pkg_name}") ctx.console.warn(f"Package not installed: {package_name}")
continue continue
del installed[pkg_name] del installed[package_name]
ctx.console.success(f"Removed {pkg_name} from installed packages") ctx.console.success(f"Removed {package_name} from installed packages")
ctx.console.warn( ctx.console.warn(
"Note: installed files were not automatically deleted. Remove manually if needed." "Note: installed files were not automatically deleted. Remove manually if needed."
) )

View File

@@ -8,22 +8,12 @@ from flow.core.config import FlowContext
def register(subparsers): def register(subparsers):
p = subparsers.add_parser("sync", help="Git sync tools for projects") parser = subparsers.add_parser("sync", help="Git sync tools for projects")
sub = p.add_subparsers(dest="sync_command") sub = parser.add_subparsers(dest="sync_command")
check = sub.add_parser("check", help="Check all projects status") check = sub.add_parser("check", help="Check all projects status")
check.add_argument( check.add_argument("--fetch", dest="fetch", action="store_true", help="Run git fetch before checking (default)")
"--fetch", check.add_argument("--no-fetch", dest="fetch", action="store_false", help="Skip git fetch")
dest="fetch",
action="store_true",
help="Run git fetch before checking (default)",
)
check.add_argument(
"--no-fetch",
dest="fetch",
action="store_false",
help="Skip git fetch",
)
check.set_defaults(fetch=True) check.set_defaults(fetch=True)
check.set_defaults(handler=run_check) check.set_defaults(handler=run_check)
@@ -33,13 +23,14 @@ def register(subparsers):
summary = sub.add_parser("summary", help="Quick overview of project status") summary = sub.add_parser("summary", help="Quick overview of project status")
summary.set_defaults(handler=run_summary) summary.set_defaults(handler=run_summary)
p.set_defaults(handler=lambda ctx, args: p.print_help()) parser.set_defaults(handler=lambda ctx, args: parser.print_help())
def _git(repo: str, *cmd, capture: bool = True) -> subprocess.CompletedProcess: def _git(repo: str, *cmd, capture: bool = True) -> subprocess.CompletedProcess:
return subprocess.run( return subprocess.run(
["git", "-C", repo] + list(cmd), ["git", "-C", repo] + list(cmd),
capture_output=capture, text=True, capture_output=capture,
text=True,
) )
@@ -49,23 +40,19 @@ def _is_git_repo(repo_path: str) -> bool:
def _check_repo(repo_path: str, do_fetch: bool = True): def _check_repo(repo_path: str, do_fetch: bool = True):
"""Check a single repo, return (name, issues list)."""
name = os.path.basename(repo_path) name = os.path.basename(repo_path)
if not _is_git_repo(repo_path): if not _is_git_repo(repo_path):
return name, None # Not a git repo return name, None
issues = [] issues = []
if do_fetch: if do_fetch:
fetch_result = _git(repo_path, "fetch", "--all", "--quiet") fetch_result = _git(repo_path, "fetch", "--all", "--quiet")
if fetch_result.returncode != 0: if fetch_result.returncode != 0:
issues.append("git fetch failed") issues.append("git fetch failed")
# Current branch
result = _git(repo_path, "rev-parse", "--abbrev-ref", "HEAD") result = _git(repo_path, "rev-parse", "--abbrev-ref", "HEAD")
branch = result.stdout.strip() if result.returncode == 0 else "HEAD" branch = result.stdout.strip() if result.returncode == 0 else "HEAD"
# Uncommitted changes
diff_result = _git(repo_path, "diff", "--quiet") diff_result = _git(repo_path, "diff", "--quiet")
cached_result = _git(repo_path, "diff", "--cached", "--quiet") cached_result = _git(repo_path, "diff", "--cached", "--quiet")
if diff_result.returncode != 0 or cached_result.returncode != 0: if diff_result.returncode != 0 or cached_result.returncode != 0:
@@ -75,28 +62,25 @@ def _check_repo(repo_path: str, do_fetch: bool = True):
if untracked.stdout.strip(): if untracked.stdout.strip():
issues.append("untracked files") issues.append("untracked files")
# Unpushed commits
upstream_check = _git(repo_path, "rev-parse", "--abbrev-ref", f"{branch}@{{u}}") upstream_check = _git(repo_path, "rev-parse", "--abbrev-ref", f"{branch}@{{u}}")
if upstream_check.returncode == 0: if upstream_check.returncode == 0:
unpushed = _git(repo_path, "rev-list", "--oneline", f"{branch}@{{u}}..{branch}") unpushed = _git(repo_path, "rev-list", "--oneline", f"{branch}@{{u}}..{branch}")
if unpushed.stdout.strip(): if unpushed.stdout.strip():
count = len(unpushed.stdout.strip().split("\n")) issues.append(f"{len(unpushed.stdout.strip().splitlines())} unpushed commit(s) on {branch}")
issues.append(f"{count} unpushed commit(s) on {branch}")
else: else:
issues.append(f"no upstream for {branch}") issues.append(f"no upstream for {branch}")
# Unpushed branches
branches_result = _git(repo_path, "for-each-ref", "--format=%(refname:short)", "refs/heads") branches_result = _git(repo_path, "for-each-ref", "--format=%(refname:short)", "refs/heads")
for b in branches_result.stdout.strip().split("\n"): for branch_name in branches_result.stdout.strip().splitlines():
if not b or b == branch: if not branch_name or branch_name == branch:
continue continue
up = _git(repo_path, "rev-parse", "--abbrev-ref", f"{b}@{{u}}") upstream = _git(repo_path, "rev-parse", "--abbrev-ref", f"{branch_name}@{{u}}")
if up.returncode == 0: if upstream.returncode == 0:
ahead = _git(repo_path, "rev-list", "--count", f"{b}@{{u}}..{b}") ahead = _git(repo_path, "rev-list", "--count", f"{branch_name}@{{u}}..{branch_name}")
if ahead.stdout.strip() != "0": if ahead.stdout.strip() != "0":
issues.append(f"branch {b}: {ahead.stdout.strip()} ahead") issues.append(f"branch {branch_name}: {ahead.stdout.strip()} ahead")
else: else:
issues.append(f"branch {b}: no upstream") issues.append(f"branch {branch_name}: no upstream")
return name, issues return name, issues
@@ -116,17 +100,14 @@ def run_check(ctx: FlowContext, args):
repo_path = os.path.join(projects_dir, entry) repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path): if not os.path.isdir(repo_path):
continue continue
name, issues = _check_repo(repo_path, do_fetch=args.fetch) name, issues = _check_repo(repo_path, do_fetch=args.fetch)
if issues is None: if issues is None:
not_git.append(name) not_git.append(name)
continue continue
checked += 1 checked += 1
rows.append([name, "; ".join(issues) if issues else "clean and synced"])
if issues: if issues:
needs_action.append(name) needs_action.append(name)
rows.append([name, "; ".join(issues)])
else:
rows.append([name, "clean and synced"])
if checked == 0: if checked == 0:
ctx.console.info("No git repositories found in projects directory.") ctx.console.info("No git repositories found in projects directory.")
@@ -135,12 +116,10 @@ def run_check(ctx: FlowContext, args):
return return
ctx.console.table(["PROJECT", "STATUS"], rows) ctx.console.table(["PROJECT", "STATUS"], rows)
if needs_action: if needs_action:
ctx.console.warn(f"Projects needing action: {', '.join(sorted(needs_action))}") ctx.console.warn(f"Projects needing action: {', '.join(sorted(needs_action))}")
else: else:
ctx.console.success("All repositories clean and synced.") ctx.console.success("All repositories clean and synced.")
if not_git: if not_git:
ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}") ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}")
@@ -161,16 +140,14 @@ def run_fetch(ctx: FlowContext, args):
result = _git(repo_path, "fetch", "--all", "--quiet") result = _git(repo_path, "fetch", "--all", "--quiet")
fetched += 1 fetched += 1
if result.returncode != 0: if result.returncode != 0:
ctx.console.error(f"Failed to fetch {entry}")
had_error = True had_error = True
ctx.console.error(f"Failed to fetch {entry}")
if fetched == 0: if fetched == 0:
ctx.console.info("No git repositories found in projects directory.") ctx.console.info("No git repositories found in projects directory.")
return return
if had_error: if had_error:
sys.exit(1) sys.exit(1)
ctx.console.success("All remotes fetched.") ctx.console.success("All remotes fetched.")
@@ -180,14 +157,11 @@ def run_summary(ctx: FlowContext, args):
ctx.console.error(f"Projects directory not found: {projects_dir}") ctx.console.error(f"Projects directory not found: {projects_dir}")
sys.exit(1) sys.exit(1)
headers = ["PROJECT", "STATUS"]
rows = [] rows = []
for entry in sorted(os.listdir(projects_dir)): for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry) repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path): if not os.path.isdir(repo_path):
continue continue
name, issues = _check_repo(repo_path, do_fetch=False) name, issues = _check_repo(repo_path, do_fetch=False)
if issues is None: if issues is None:
rows.append([name, "not a git repo"]) rows.append([name, "not a git repo"])
@@ -199,5 +173,4 @@ def run_summary(ctx: FlowContext, args):
if not rows: if not rows:
ctx.console.info("No projects found.") ctx.console.info("No projects found.")
return return
ctx.console.table(["PROJECT", "STATUS"], rows)
ctx.console.table(headers, rows)

View File

@@ -9,6 +9,7 @@ import yaml
from flow.core import paths from flow.core import paths
from flow.core.console import ConsoleLogger from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo from flow.core.platform import PlatformInfo
from flow.core.system import SystemRuntime
@dataclass @dataclass
@@ -316,3 +317,4 @@ class FlowContext:
manifest: Dict[str, Any] manifest: Dict[str, Any]
platform: PlatformInfo platform: PlatformInfo
console: ConsoleLogger console: ConsoleLogger
runtime: SystemRuntime = field(default_factory=SystemRuntime)

6
src/flow/core/errors.py Normal file
View File

@@ -0,0 +1,6 @@
"""Project-wide exception types."""
class FlowError(RuntimeError):
"""A user-facing operational error."""

View File

@@ -1,8 +1,9 @@
"""Command execution with streaming output.""" """Command execution helpers."""
import subprocess import subprocess
from flow.core.console import ConsoleLogger from flow.core.console import ConsoleLogger
from flow.core.system import CommandRunner
def run_command( def run_command(
@@ -14,35 +15,16 @@ def run_command(
capture: bool = False, capture: bool = False,
) -> subprocess.CompletedProcess: ) -> subprocess.CompletedProcess:
"""Run a command with real-time streamed output.""" """Run a command with real-time streamed output."""
console.step_command(command) if not shell:
raise RuntimeError("run_command only supports shell commands")
process = subprocess.Popen( runner = CommandRunner()
command, if capture:
shell=shell, result = runner.run_shell(command, capture_output=True, check=False)
stdout=subprocess.PIPE, if check and result.returncode != 0:
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
output_lines = []
assert process.stdout is not None # guaranteed by stdout=PIPE
try:
for line in process.stdout:
line = line.rstrip()
if line:
if not capture:
console.step_output(line)
output_lines.append(line)
finally:
process.stdout.close()
process.wait()
if check and process.returncode != 0:
raise RuntimeError( raise RuntimeError(
f"Command failed (exit {process.returncode}): {command}" f"Command failed (exit {result.returncode}): {command}"
) )
return result
return subprocess.CompletedProcess( return runner.stream_shell(command, console, check=check)
command, process.returncode, stdout="\n".join(output_lines), stderr=""
)

327
src/flow/core/system.py Normal file
View File

@@ -0,0 +1,327 @@
"""Runtime primitives for process, git, state, and filesystem access."""
from __future__ import annotations
import json
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence
from flow.core.console import ConsoleLogger
from flow.core.errors import FlowError
def _as_argv(argv: Sequence[str] | Iterable[str]) -> list[str]:
return [str(part) for part in argv]
class CommandRunner:
"""Small wrapper around subprocess with consistent defaults."""
def format_command(self, argv: Sequence[str] | Iterable[str]) -> str:
return " ".join(shlex.quote(part) for part in _as_argv(argv))
def require_binary(self, name: str) -> str:
path = shutil.which(name)
if path is None:
raise FlowError(f"Required executable not found: {name}")
return path
def run(
self,
argv: Sequence[str] | Iterable[str],
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[int | float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
_as_argv(argv),
cwd=str(cwd) if cwd is not None else None,
env=dict(env) if env is not None else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip()
if not message:
message = f"Command failed with exit code {completed.returncode}"
raise FlowError(message)
return completed
def run_shell(
self,
command: str,
*,
cwd: Optional[Path] = None,
env: Optional[Mapping[str, str]] = None,
capture_output: bool = True,
check: bool = False,
timeout: Optional[int | float] = None,
) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
command,
shell=True,
cwd=str(cwd) if cwd is not None else None,
env=dict(env) if env is not None else None,
capture_output=capture_output,
text=True,
check=False,
timeout=timeout,
)
if check and completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip()
if not message:
message = f"Command failed with exit code {completed.returncode}"
raise FlowError(message)
return completed
def stream_shell(
self,
command: str,
console: ConsoleLogger,
*,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
console.step_command(command)
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
output_lines: list[str] = []
assert process.stdout is not None
try:
for line in process.stdout:
line = line.rstrip()
if not line:
continue
console.step_output(line)
output_lines.append(line)
finally:
process.stdout.close()
process.wait()
if check and process.returncode != 0:
raise FlowError(
f"Command failed (exit {process.returncode}): {command}"
)
return subprocess.CompletedProcess(
command,
process.returncode,
stdout="\n".join(output_lines),
stderr="",
)
class GitClient:
"""Thin git adapter that always scopes commands to a repository root."""
def __init__(self, runner: CommandRunner):
self.runner = runner
def run(
self,
repo_dir: Path,
*args: str,
capture_output: bool = True,
check: bool = False,
) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", str(repo_dir), *args],
capture_output=capture_output,
check=check,
)
class FileSystem:
"""Filesystem wrapper for all mutating operations."""
def ensure_dir(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
mode: Optional[int] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
argv = ["sudo", "mkdir", "-p"]
if mode is not None:
argv.extend(["-m", f"{mode:o}"])
argv.append(str(path))
runner.run(argv, check=True)
return
path.mkdir(parents=True, exist_ok=True)
if mode is not None:
path.chmod(mode)
def remove_file(
self,
path: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
missing_ok: bool = True,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
argv = ["sudo", "rm"]
if missing_ok:
argv.append("-f")
argv.append(str(path))
runner.run(argv, check=True)
return
try:
path.unlink()
except FileNotFoundError:
if not missing_ok:
raise
def remove_tree(self, path: Path) -> None:
shutil.rmtree(path, ignore_errors=True)
def copy_file(
self,
source: Path,
target: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "cp", "-a", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
shutil.copy2(source, target)
def copy_tree(self, source: Path, target: Path) -> None:
self.ensure_dir(target.parent)
shutil.copytree(source, target, dirs_exist_ok=True)
def create_symlink(
self,
source: Path,
target: Path,
*,
sudo: bool = False,
runner: Optional[CommandRunner] = None,
) -> None:
if sudo:
if runner is None:
raise FlowError("A command runner is required for sudo operations")
runner.require_binary("sudo")
self.ensure_dir(target.parent, sudo=True, runner=runner)
runner.run(["sudo", "ln", "-sfn", str(source), str(target)], check=True)
return
self.ensure_dir(target.parent)
target.symlink_to(source)
def read_text(self, path: Path, *, default: Optional[str] = None) -> str:
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
if default is None:
raise
return default
def write_text(self, path: Path, content: str) -> None:
self.ensure_dir(path.parent)
path.write_text(content, encoding="utf-8")
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def write_bytes(self, path: Path, content: bytes) -> None:
self.ensure_dir(path.parent)
path.write_bytes(content)
def read_json(self, path: Path, *, default: Any = None) -> Any:
try:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
except FileNotFoundError:
return default
def write_json(self, path: Path, data: Any) -> None:
self.ensure_dir(path.parent)
with open(path, "w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
def same_symlink(self, target: Path, source: Path) -> bool:
if not target.is_symlink():
return False
return target.resolve(strict=False) == source.resolve(strict=False)
def is_within(self, path: Path, parent: Path) -> bool:
try:
path.resolve(strict=False).relative_to(parent.resolve(strict=False))
return True
except ValueError:
return False
def path_in_home(self, path: Path, home: Optional[Path] = None) -> bool:
root = (home or Path.home()).resolve(strict=False)
try:
path.resolve(strict=False).relative_to(root)
return True
except ValueError:
return False
@dataclass
class JsonStateStore:
"""JSON file-backed state store."""
path: Path
fs: FileSystem
default_factory: Any
def load(self) -> Any:
data = self.fs.read_json(self.path, default=None)
if data is None:
return self.default_factory()
return data
def save(self, data: Any) -> None:
self.fs.write_json(self.path, data)
@dataclass
class SystemRuntime:
"""Shared runtime dependencies carried through the command context."""
runner: CommandRunner = field(default_factory=CommandRunner)
fs: FileSystem = field(default_factory=FileSystem)
git: GitClient = field(init=False)
def __post_init__(self) -> None:
self.git = GitClient(self.runner)

View File

@@ -0,0 +1,2 @@
"""Domain services for CLI commands."""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,321 @@
"""Container lifecycle helpers for `flow dev`."""
from __future__ import annotations
import os
import shutil
from typing import Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
DEFAULT_REGISTRY = "registry.tomastm.com"
DEFAULT_TAG = "latest"
CONTAINER_HOME = "/home/dev"
def runtime() -> str:
for name in ("docker", "podman"):
if shutil.which(name):
return name
raise FlowError("No container runtime found (docker or podman)")
def container_name(name: str) -> str:
return name if name.startswith("dev-") else f"dev-{name}"
def parse_image_ref(
image: str,
*,
default_registry: str = DEFAULT_REGISTRY,
default_tag: str = DEFAULT_TAG,
) -> tuple[str, str, str, str]:
registry = default_registry
tag = default_tag
if image.startswith("docker/"):
registry = "docker.io"
image = f"library/{image.split('/', 1)[1]}"
elif image.startswith("tm0/"):
registry = default_registry
image = image.split("/", 1)[1]
elif "/" in image:
prefix, remainder = image.split("/", 1)
if "." in prefix or ":" in prefix or prefix == "localhost":
registry = prefix
image = remainder
if ":" in image.split("/")[-1]:
tag = image.rsplit(":", 1)[1]
image = image.rsplit(":", 1)[0]
repo = image
full_ref = f"{registry}/{repo}:{tag}"
label_prefix = (
registry.rsplit(".", 1)[0].rsplit(".", 1)[-1] if "." in registry else registry
)
label = f"{label_prefix}/{repo.split('/')[-1]}"
return full_ref, repo, tag, label
class ContainerService:
"""Own all container-runtime interactions."""
def __init__(self, ctx: FlowContext):
self.ctx = ctx
self.runner = ctx.runtime.runner
def container_exists(self, rt: str, name: str) -> bool:
result = self.runner.run(
[rt, "container", "ls", "-a", "--format", "{{.Names}}"],
capture_output=True,
)
return name in result.stdout.strip().splitlines()
def container_running(self, rt: str, name: str) -> bool:
result = self.runner.run(
[rt, "container", "ls", "--format", "{{.Names}}"],
capture_output=True,
)
return name in result.stdout.strip().splitlines()
def run_create(self, args) -> None:
rt = runtime()
cname = container_name(args.name)
if self.container_exists(rt, cname):
raise FlowError(f"Container already exists: {cname}")
project_path = os.path.realpath(args.project) if args.project else None
if project_path and not os.path.isdir(project_path):
raise FlowError(f"Invalid project path: {project_path}")
full_ref, _, _, _ = parse_image_ref(
args.image,
default_registry=self.ctx.config.container_registry,
default_tag=self.ctx.config.container_tag,
)
cmd = [
rt,
"run",
"-d",
"--name",
cname,
"--label",
"dev=true",
"--label",
f"dev.name={args.name}",
"--label",
f"dev.image_ref={full_ref}",
"--network",
"host",
"--init",
]
if project_path:
cmd.extend(["-v", f"{project_path}:/workspace"])
cmd.extend(["--label", f"dev.project_path={project_path}"])
docker_sock = "/var/run/docker.sock"
if os.path.exists(docker_sock):
cmd.extend(["-v", f"{docker_sock}:{docker_sock}"])
home = os.path.expanduser("~")
mounts = [
(f"{home}/.ssh", f"{CONTAINER_HOME}/.ssh:ro", os.path.isdir),
(f"{home}/.npmrc", f"{CONTAINER_HOME}/.npmrc:ro", os.path.isfile),
(f"{home}/.npm", f"{CONTAINER_HOME}/.npm", os.path.isdir),
]
for source, target, predicate in mounts:
if predicate(source):
cmd.extend(["-v", f"{source}:{target}"])
try:
import grp
docker_gid = str(grp.getgrnam("docker").gr_gid)
cmd.extend(["--group-add", docker_gid])
except (KeyError, ImportError):
pass
cmd.extend([full_ref, "sleep", "infinity"])
self.runner.run(cmd, capture_output=False, check=True)
self.ctx.console.success(f"Created and started container: {cname}")
def run_exec(self, args) -> None:
rt = runtime()
cname = container_name(args.name)
if not self.container_running(rt, cname):
raise FlowError(f"Container {cname} not running")
if args.cmd:
exec_cmd = [rt, "exec"]
if os.isatty(0):
exec_cmd.extend(["-it"])
exec_cmd.append(cname)
exec_cmd.extend(args.cmd)
result = self.runner.run(exec_cmd, capture_output=False)
raise SystemExit(result.returncode)
for shell in (["zsh", "-l"], ["bash", "-l"], ["sh"]):
exec_cmd = [rt, "exec", "--detach-keys", "ctrl-q,ctrl-p", "-it", cname, *shell]
result = self.runner.run(exec_cmd, capture_output=False)
if result.returncode not in (126, 127):
raise SystemExit(result.returncode)
raise FlowError(f"Unable to start an interactive shell in {cname}")
def run_connect(self, args) -> None:
rt = runtime()
cname = container_name(args.name)
if not self.container_exists(rt, cname):
raise FlowError(f"Container does not exist: {cname}")
if not self.container_running(rt, cname):
self.runner.run([rt, "start", cname], capture_output=True)
if not shutil.which("tmux"):
self.ctx.console.warn("tmux not found; falling back to direct exec")
args.cmd = []
self.run_exec(args)
return
result = self.runner.run(
[rt, "container", "inspect", cname, "--format", "{{ .Config.Image }}"]
)
image_ref = result.stdout.strip()
_, _, _, image_label = parse_image_ref(image_ref)
check = self.runner.run(["tmux", "has-session", "-t", cname], check=False)
if check.returncode != 0:
ns = os.environ.get("DF_NAMESPACE", "")
plat = os.environ.get("DF_PLATFORM", "")
self.runner.run(
[
"tmux",
"new-session",
"-ds",
cname,
"-e",
f"DF_IMAGE={image_label}",
"-e",
f"DF_NAMESPACE={ns}",
"-e",
f"DF_PLATFORM={plat}",
f"flow dev exec {args.name}",
],
capture_output=True,
)
self.runner.run(
[
"tmux",
"set-option",
"-t",
cname,
"default-command",
f"flow dev exec {args.name}",
],
capture_output=True,
)
if os.environ.get("TMUX"):
os.execvp("tmux", ["tmux", "switch-client", "-t", cname])
os.execvp("tmux", ["tmux", "attach", "-t", cname])
def run_list(self, _args) -> None:
rt = runtime()
result = self.runner.run(
[
rt,
"ps",
"-a",
"--filter",
"label=dev=true",
"--format",
'{{.Label "dev.name"}}|{{.Image}}|{{.Label "dev.project_path"}}|{{.Status}}',
]
)
rows = []
for line in result.stdout.strip().splitlines():
if not line:
continue
name, image, project, status = (line.split("|") + ["", "", "", ""])[:4]
home = os.path.expanduser("~")
if project.startswith(home):
project = "~" + project[len(home) :]
rows.append([name, image, project, status])
if not rows:
self.ctx.console.info("No development containers found.")
return
self.ctx.console.table(["NAME", "IMAGE", "PROJECT", "STATUS"], rows)
def run_stop(self, args) -> None:
rt = runtime()
cname = container_name(args.name)
if not self.container_exists(rt, cname):
raise FlowError(f"Container {cname} does not exist")
if args.kill:
self.ctx.console.info(f"Killing container {cname}...")
self.runner.run([rt, "kill", cname], capture_output=False, check=True)
else:
self.ctx.console.info(f"Stopping container {cname}...")
self.runner.run([rt, "stop", cname], capture_output=False, check=True)
self._tmux_fallback(cname)
def run_remove(self, args) -> None:
rt = runtime()
cname = container_name(args.name)
if not self.container_exists(rt, cname):
raise FlowError(f"Container {cname} does not exist")
if args.force:
self.ctx.console.info(f"Removing container {cname} (force)...")
self.runner.run([rt, "rm", "-f", cname], capture_output=False, check=True)
else:
self.ctx.console.info(f"Removing container {cname}...")
self.runner.run([rt, "rm", cname], capture_output=False, check=True)
self._tmux_fallback(cname)
def run_respawn(self, args) -> None:
if not shutil.which("tmux"):
raise FlowError("tmux is required for respawn but was not found")
cname = container_name(args.name)
result = self.runner.run(
[
"tmux",
"list-panes",
"-t",
cname,
"-s",
"-F",
"#{session_name}:#{window_index}.#{pane_index}",
]
)
for pane in result.stdout.strip().splitlines():
if not pane:
continue
self.ctx.console.info(f"Respawning {pane}...")
self.runner.run(["tmux", "respawn-pane", "-t", pane], capture_output=False)
def _tmux_fallback(self, cname: str) -> None:
if not os.environ.get("TMUX"):
return
result = self.runner.run(["tmux", "display-message", "-p", "#S"])
if result.stdout.strip() != cname:
return
self.runner.run(["tmux", "new-session", "-ds", "default"], capture_output=True)
self.runner.run(["tmux", "switch-client", "-t", "default"], capture_output=True)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,350 @@
"""Shared package-manifest normalization and binary install helpers."""
from __future__ import annotations
import os
import shutil
import tempfile
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
from flow.core.variables import substitute_template
PACKAGE_TYPES = {"pkg", "binary", "cask"}
def linux_detect_package_manager() -> Optional[str]:
if shutil.which("apt") or shutil.which("apt-get"):
return "apt"
if shutil.which("dnf"):
return "dnf"
return None
def resolve_package_manager(ctx: FlowContext, profile_cfg: dict) -> str:
explicit = profile_cfg.get("package-manager")
if isinstance(explicit, str) and explicit:
return explicit
profile_os = profile_cfg.get("os")
if profile_os == "macos":
return "brew"
if profile_os == "linux":
detected = linux_detect_package_manager()
if detected:
return detected
raise FlowError("Unable to auto-detect package manager (expected apt or dnf)")
raise FlowError("Profile 'os' must be set to 'linux' or 'macos'")
def get_package_catalog(ctx: FlowContext) -> Dict[str, Dict[str, Any]]:
raw = ctx.manifest.get("packages", [])
catalog: Dict[str, Dict[str, Any]] = {}
if isinstance(raw, dict):
for name, definition in raw.items():
if not isinstance(definition, dict):
continue
package = dict(definition)
package["name"] = str(package.get("name") or name)
package.setdefault("type", "pkg")
catalog[package["name"]] = package
return catalog
if not isinstance(raw, list):
return catalog
for item in raw:
if not isinstance(item, dict):
continue
name = item.get("name")
if not isinstance(name, str) or not name:
continue
package = dict(item)
package.setdefault("type", "pkg")
catalog[name] = package
return catalog
def normalize_profile_package_entry(entry: Any) -> Dict[str, Any]:
if isinstance(entry, str):
if "/" in entry:
prefix, name = entry.split("/", 1)
if prefix in PACKAGE_TYPES and name:
return {"name": name, "type": prefix}
return {"name": entry}
if isinstance(entry, dict):
name = entry.get("name")
if not isinstance(name, str) or not name:
raise FlowError("Package object entries must include a non-empty 'name'")
return dict(entry)
raise FlowError(f"Unsupported package entry: {entry!r}")
def resolve_package_spec(
catalog: Dict[str, Dict[str, Any]],
profile_entry: Dict[str, Any],
) -> Dict[str, Any]:
name = profile_entry["name"]
merged = dict(catalog.get(name, {}))
merged.update(profile_entry)
merged["name"] = name
pkg_type = merged.get("type") or "pkg"
if pkg_type not in PACKAGE_TYPES:
raise FlowError(f"Unsupported package type '{pkg_type}' for package '{name}'")
merged["type"] = pkg_type
return merged
def resolve_pkg_source_name(spec: Dict[str, Any], package_manager: str) -> str:
sources = spec.get("sources", {})
if not isinstance(sources, dict):
return spec["name"]
keys = [package_manager]
if package_manager == "apt":
keys.append("apt-get")
if package_manager == "apt-get":
keys.append("apt")
for key in keys:
value = sources.get(key)
if isinstance(value, str) and value:
return value
return spec["name"]
def platform_lookup_keys(ctx: FlowContext) -> List[str]:
keys = [ctx.platform.platform]
if ctx.platform.os == "macos":
keys.append(f"darwin-{ctx.platform.arch}")
if ctx.platform.arch == "x64":
keys.append(f"{ctx.platform.os}-amd64")
if ctx.platform.os == "macos":
keys.append("darwin-amd64")
ordered: list[str] = []
for key in keys:
if key not in ordered:
ordered.append(key)
return ordered
def profile_template_context(
ctx: FlowContext,
extra_env: Dict[str, str],
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
env_map = dict(os.environ)
env_map.update(extra_env)
template_ctx: Dict[str, Any] = {
"env": env_map,
"os": ctx.platform.os,
"arch": ctx.platform.arch,
}
if extra:
template_ctx.update(extra)
return template_ctx
def render_template_value(value: Any, template_ctx: Dict[str, Any]) -> Any:
if isinstance(value, str):
return substitute_template(value, template_ctx)
if isinstance(value, list):
return [render_template_value(item, template_ctx) for item in value]
if isinstance(value, dict):
return {key: render_template_value(item, template_ctx) for key, item in value.items()}
return value
def resolve_binary_platform_vars(ctx: FlowContext, spec: Dict[str, Any]) -> Dict[str, str]:
platform_vars = {
"os": ctx.platform.os,
"arch": ctx.platform.arch,
}
platform_map = spec.get("platform-map", {})
if isinstance(platform_map, dict):
for key in platform_lookup_keys(ctx):
mapping = platform_map.get(key)
if isinstance(mapping, dict):
for map_key, map_value in mapping.items():
if isinstance(map_value, str):
platform_vars[map_key] = map_value
break
return platform_vars
def resolve_binary_asset(ctx: FlowContext, spec: Dict[str, Any], template_ctx: Dict[str, Any]) -> str:
assets = spec.get("assets", {})
if isinstance(assets, dict) and assets:
for key in platform_lookup_keys(ctx):
value = assets.get(key)
if isinstance(value, str) and value:
return substitute_template(value, template_ctx)
raise FlowError(
f"No binary asset mapping for platform {ctx.platform.platform} in package '{spec['name']}'"
)
pattern = spec.get("asset-pattern")
if not isinstance(pattern, str) or not pattern:
raise FlowError(
f"Binary package '{spec['name']}' must define either 'assets' or 'asset-pattern'"
)
return substitute_template(pattern, template_ctx)
def resolve_binary_download_url(
spec: Dict[str, Any],
asset_name: str,
template_ctx: Dict[str, Any],
) -> str:
source = spec.get("source")
if not isinstance(source, str) or not source:
raise FlowError(f"Binary package '{spec['name']}' is missing 'source'")
version = str(spec.get("version", ""))
if source.startswith("github:"):
owner_repo = source[len("github:") :]
if not owner_repo:
raise FlowError(f"Invalid github source in package '{spec['name']}'")
if not version:
raise FlowError(f"Binary package '{spec['name']}' requires 'version'")
return f"https://github.com/{owner_repo}/releases/download/v{version}/{asset_name}"
rendered_source = substitute_template(source, template_ctx)
if not asset_name or rendered_source.endswith(asset_name):
return rendered_source
if rendered_source.endswith("/"):
return rendered_source + asset_name
return f"{rendered_source}/{asset_name}"
def strip_prefix(path: Path, prefix: Path) -> Path:
try:
return path.relative_to(prefix)
except ValueError:
return path
def validate_declared_install_path(package_name: str, declared_path: Path) -> None:
if declared_path.is_absolute():
raise FlowError(f"Install path for '{package_name}' must be relative: {declared_path}")
if any(part == ".." for part in declared_path.parts):
raise FlowError(
f"Install path for '{package_name}' must not include parent traversal: {declared_path}"
)
def install_destination(kind: str) -> Path:
home = Path.home()
if kind == "bin":
return home / ".local" / "bin"
if kind == "share":
return home / ".local" / "share"
if kind == "man":
return home / ".local" / "share" / "man"
if kind == "lib":
return home / ".local" / "lib"
raise FlowError(f"Unsupported install section: {kind}")
def install_strip_prefix(kind: str) -> Path:
if kind == "bin":
return Path("bin")
if kind == "share":
return Path("share")
if kind == "man":
return Path("share") / "man"
if kind == "lib":
return Path("lib")
return Path(".")
class BinaryInstaller:
def __init__(self, ctx: FlowContext):
self.ctx = ctx
self.fs = ctx.runtime.fs
def copy_install_item(self, kind: str, src: Path, declared_path: Path) -> None:
destination_root = install_destination(kind)
stripped = strip_prefix(declared_path, install_strip_prefix(kind))
destination = destination_root / stripped
if src.is_dir():
self.fs.copy_tree(src, destination)
else:
self.fs.copy_file(src, destination)
if kind == "bin":
destination.chmod(destination.stat().st_mode | 0o111)
def install(self, spec: Dict[str, Any], extra_env: Dict[str, str], *, dry_run: bool) -> None:
version = str(spec.get("version", ""))
platform_vars = resolve_binary_platform_vars(self.ctx, spec)
template_ctx = profile_template_context(
self.ctx,
extra_env,
{"name": spec["name"], "version": version, **platform_vars},
)
asset_name = resolve_binary_asset(self.ctx, spec, template_ctx)
template_ctx["asset"] = asset_name
download_url = resolve_binary_download_url(spec, asset_name, template_ctx)
template_ctx["downloadUrl"] = download_url
if dry_run:
self.ctx.console.info(f"[{spec['name']}] Would download: {download_url}")
return
install_map = spec.get("install", {})
if not isinstance(install_map, dict) or not install_map:
raise FlowError(f"Binary package '{spec['name']}' must define non-empty 'install'")
with tempfile.TemporaryDirectory(prefix=f"flow-{spec['name']}-") as tmp:
tmp_dir = Path(tmp)
archive_path = tmp_dir / asset_name
extracted = tmp_dir / "extract"
self.ctx.console.info(f"Downloading {spec['name']} from {download_url}")
with urllib.request.urlopen(download_url, timeout=60) as response:
self.fs.write_bytes(archive_path, response.read())
self.fs.ensure_dir(extracted)
try:
shutil.unpack_archive(str(archive_path), str(extracted))
except (shutil.ReadError, ValueError) as exc:
raise FlowError(f"Could not extract archive for '{spec['name']}': {exc}") from exc
extract_dir_value = substitute_template(str(spec.get("extract-dir", ".")), template_ctx)
source_root = extracted if extract_dir_value == "." else extracted / extract_dir_value
if not source_root.exists():
raise FlowError(
f"extract-dir '{extract_dir_value}' not found for package '{spec['name']}'"
)
source_root_resolved = source_root.resolve(strict=False)
for kind in ("bin", "share", "man", "lib"):
items = install_map.get(kind, [])
if not isinstance(items, list):
continue
for raw_item in items:
if not isinstance(raw_item, str):
continue
rendered = substitute_template(raw_item, template_ctx)
declared_path = Path(rendered)
validate_declared_install_path(spec["name"], declared_path)
source = (source_root / declared_path).resolve(strict=False)
if not str(source).startswith(str(source_root_resolved)):
raise FlowError(
f"Install path escapes extract-dir for '{spec['name']}': {declared_path}"
)
if not source.exists():
raise FlowError(
f"Install path not found for '{spec['name']}': {declared_path}"
)
self.copy_install_item(kind, source, declared_path)

View File

@@ -0,0 +1,113 @@
"""Package-state service built on shared package definitions."""
from __future__ import annotations
from pathlib import Path
from flow.core.config import FlowContext
from flow.core.system import JsonStateStore
from flow.services.package_defs import BinaryInstaller, get_package_catalog
class PackageService:
def __init__(self, ctx: FlowContext, *, installed_state: Path):
self.ctx = ctx
self.installed = JsonStateStore(installed_state, ctx.runtime.fs, dict)
self.binary_installer = BinaryInstaller(ctx)
def load_installed(self) -> dict:
state = self.installed.load()
return state if isinstance(state, dict) else {}
def save_installed(self, state: dict) -> None:
self.installed.save(state)
def definitions(self):
return get_package_catalog(self.ctx)
def install(self, args) -> None:
definitions = self.definitions()
installed = self.load_installed()
had_error = False
for package_name in args.packages:
package_def = definitions.get(package_name)
if not package_def:
self.ctx.console.error(f"Package not found in manifest: {package_name}")
had_error = True
continue
package_type = package_def.get("type", "pkg")
if package_type != "binary":
self.ctx.console.error(
f"'flow package install' supports binary packages only. '{package_name}' is type '{package_type}'."
)
had_error = True
continue
self.ctx.console.info(f"Installing {package_name}...")
try:
self.binary_installer.install(package_def, {}, dry_run=args.dry_run)
except RuntimeError as exc:
self.ctx.console.error(str(exc))
had_error = True
continue
if not args.dry_run:
installed[package_name] = {
"version": str(package_def.get("version", "")),
"type": package_type,
}
self.ctx.console.success(f"Installed {package_name}")
if not args.dry_run:
self.save_installed(installed)
if had_error:
raise SystemExit(1)
def list(self, args) -> None:
definitions = self.definitions()
installed = self.load_installed()
rows = []
if args.all:
if not definitions:
self.ctx.console.info("No packages defined in manifest.")
return
for name, package_def in sorted(definitions.items()):
rows.append(
[
name,
str(package_def.get("type", "pkg")),
str(installed.get(name, {}).get("version", "-")),
str(package_def.get("version", "")) or "-",
]
)
else:
if not installed:
self.ctx.console.info("No packages installed.")
return
for name, info in sorted(installed.items()):
rows.append(
[
name,
str(info.get("type", "?")),
str(info.get("version", "?")),
str(definitions.get(name, {}).get("version", "")) or "-",
]
)
self.ctx.console.table(["PACKAGE", "TYPE", "INSTALLED", "AVAILABLE"], rows)
def remove(self, args) -> None:
installed = self.load_installed()
for package_name in args.packages:
if package_name not in installed:
self.ctx.console.warn(f"Package not installed: {package_name}")
continue
del installed[package_name]
self.ctx.console.success(f"Removed {package_name} from installed packages")
self.ctx.console.warn(
"Note: installed files were not automatically deleted. Remove manually if needed."
)
self.save_installed(installed)

View File

@@ -0,0 +1,174 @@
"""Project sync service for `flow sync`."""
from __future__ import annotations
import os
import subprocess
from flow.core.config import FlowContext
from flow.core.errors import FlowError
class ProjectSyncService:
"""Inspect and synchronize git repositories under the projects directory."""
def __init__(self, ctx: FlowContext):
self.ctx = ctx
self.runner = ctx.runtime.runner
def git(self, repo: str, *cmd: str, capture: bool = True) -> subprocess.CompletedProcess[str]:
return self.runner.run(
["git", "-C", repo, *cmd],
capture_output=capture,
)
def is_git_repo(self, repo_path: str) -> bool:
git_dir = os.path.join(repo_path, ".git")
return os.path.isdir(git_dir) or os.path.isfile(git_dir)
def check_repo(self, repo_path: str, do_fetch: bool = True) -> tuple[str, list[str] | None]:
name = os.path.basename(repo_path)
if not self.is_git_repo(repo_path):
return name, None
issues: list[str] = []
if do_fetch:
fetch_result = self.git(repo_path, "fetch", "--all", "--quiet")
if fetch_result.returncode != 0:
issues.append("git fetch failed")
result = self.git(repo_path, "rev-parse", "--abbrev-ref", "HEAD")
branch = result.stdout.strip() if result.returncode == 0 else "HEAD"
diff_result = self.git(repo_path, "diff", "--quiet")
cached_result = self.git(repo_path, "diff", "--cached", "--quiet")
if diff_result.returncode != 0 or cached_result.returncode != 0:
issues.append("uncommitted changes")
else:
untracked = self.git(repo_path, "ls-files", "--others", "--exclude-standard")
if untracked.stdout.strip():
issues.append("untracked files")
upstream_check = self.git(repo_path, "rev-parse", "--abbrev-ref", f"{branch}@{{u}}")
if upstream_check.returncode == 0:
unpushed = self.git(repo_path, "rev-list", "--oneline", f"{branch}@{{u}}..{branch}")
if unpushed.stdout.strip():
issues.append(
f"{len(unpushed.stdout.strip().splitlines())} unpushed commit(s) on {branch}"
)
else:
issues.append(f"no upstream for {branch}")
branches_result = self.git(
repo_path,
"for-each-ref",
"--format=%(refname:short)",
"refs/heads",
)
for branch_name in branches_result.stdout.strip().splitlines():
if not branch_name or branch_name == branch:
continue
upstream = self.git(repo_path, "rev-parse", "--abbrev-ref", f"{branch_name}@{{u}}")
if upstream.returncode == 0:
ahead = self.git(repo_path, "rev-list", "--count", f"{branch_name}@{{u}}..{branch_name}")
if ahead.stdout.strip() != "0":
issues.append(f"branch {branch_name}: {ahead.stdout.strip()} ahead")
else:
issues.append(f"branch {branch_name}: no upstream")
return name, issues
def _projects_dir(self) -> str:
projects_dir = os.path.expanduser(self.ctx.config.projects_dir)
if not os.path.isdir(projects_dir):
raise FlowError(f"Projects directory not found: {projects_dir}")
return projects_dir
def run_check(self, args) -> None:
projects_dir = self._projects_dir()
rows = []
needs_action = []
not_git = []
checked = 0
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path):
continue
name, issues = self.check_repo(repo_path, do_fetch=args.fetch)
if issues is None:
not_git.append(name)
continue
checked += 1
if issues:
needs_action.append(name)
rows.append([name, "; ".join(issues)])
else:
rows.append([name, "clean and synced"])
if checked == 0:
self.ctx.console.info("No git repositories found in projects directory.")
if not_git:
self.ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}")
return
self.ctx.console.table(["PROJECT", "STATUS"], rows)
if needs_action:
self.ctx.console.warn(f"Projects needing action: {', '.join(sorted(needs_action))}")
else:
self.ctx.console.success("All repositories clean and synced.")
if not_git:
self.ctx.console.info(f"Skipped non-git directories: {', '.join(sorted(not_git))}")
def run_fetch(self, _args) -> None:
projects_dir = self._projects_dir()
had_error = False
fetched = 0
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not self.is_git_repo(repo_path):
continue
self.ctx.console.info(f"Fetching {entry}...")
result = self.git(repo_path, "fetch", "--all", "--quiet")
fetched += 1
if result.returncode != 0:
self.ctx.console.error(f"Failed to fetch {entry}")
had_error = True
if fetched == 0:
self.ctx.console.info("No git repositories found in projects directory.")
return
if had_error:
raise SystemExit(1)
self.ctx.console.success("All remotes fetched.")
def run_summary(self, _args) -> None:
projects_dir = self._projects_dir()
rows = []
for entry in sorted(os.listdir(projects_dir)):
repo_path = os.path.join(projects_dir, entry)
if not os.path.isdir(repo_path):
continue
name, issues = self.check_repo(repo_path, do_fetch=False)
if issues is None:
rows.append([name, "not a git repo"])
elif issues:
rows.append([name, "; ".join(issues)])
else:
rows.append([name, "clean"])
if not rows:
self.ctx.console.info("No projects found.")
return
self.ctx.console.table(["PROJECT", "STATUS"], rows)

184
src/flow/services/ssh.py Normal file
View File

@@ -0,0 +1,184 @@
"""SSH target parsing and connection behavior for `flow enter`."""
from __future__ import annotations
import getpass
import os
from typing import Optional
from flow.core.config import FlowContext
from flow.core.errors import FlowError
# Default host templates per platform
HOST_TEMPLATES = {
"orb": "<namespace>.orb",
"utm": "<namespace>.utm.local",
"core": "<namespace>.core.lan",
}
def parse_target(target: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""Parse [user@]namespace@platform into (user, namespace, platform)."""
user = None
namespace = None
platform = None
if "@" in target:
platform = target.rsplit("@", 1)[1]
rest = target.rsplit("@", 1)[0]
else:
rest = target
if "@" in rest:
user = rest.rsplit("@", 1)[0]
namespace = rest.rsplit("@", 1)[1]
else:
namespace = rest
return user, namespace, platform
def build_destination(user: str, host: str, preserve_host_user: bool = False) -> str:
if "@" in host:
host_user, host_name = host.rsplit("@", 1)
effective_user = host_user if preserve_host_user else (user or host_user)
return f"{effective_user}@{host_name}"
if not user:
return host
return f"{user}@{host}"
def terminfo_fix_command(term: Optional[str], destination: str) -> Optional[str]:
normalized_term = (term or "").strip().lower()
if normalized_term == "xterm-ghostty":
return f"infocmp -x xterm-ghostty | ssh {destination} -- tic -x -"
if normalized_term == "wezterm":
return (
f"ssh {destination} -- sh -lc "
"'tempfile=$(mktemp) && curl -fsSL -o \"$tempfile\" "
"https://raw.githubusercontent.com/wezterm/wezterm/main/termwiz/data/wezterm.terminfo "
"&& tic -x -o ~/.terminfo \"$tempfile\" && rm \"$tempfile\"'"
)
return None
def handle_terminfo_warning(
ctx: FlowContext,
term: Optional[str],
destination: str,
dry_run: bool,
) -> bool:
install_cmd = terminfo_fix_command(term, destination)
if not install_cmd:
return True
ctx.console.warn(
f"Detected TERM={term}. Remote host may be missing this terminfo entry."
)
ctx.console.info("flow will not install or modify terminfo on the target automatically.")
ctx.console.info("If needed, run this command manually before reconnecting:")
print(f" {install_cmd}")
if dry_run or not os.isatty(0):
return True
response = ""
try:
response = input("Continue with SSH connection? [Y/n] ").strip().lower()
except EOFError:
return True
if response in {"n", "no"}:
ctx.console.warn("Cancelled before opening SSH session")
return False
return True
class EnterService:
"""Resolve enter targets and execute the SSH handoff."""
def __init__(self, ctx: FlowContext):
self.ctx = ctx
def run(self, args) -> None:
if os.environ.get("DF_NAMESPACE") and os.environ.get("DF_PLATFORM"):
ns = os.environ["DF_NAMESPACE"]
plat = os.environ["DF_PLATFORM"]
raise FlowError(
f"Not recommended inside an instance. Currently in: {ns}@{plat}"
)
user, namespace, platform = parse_target(args.target)
if args.user:
user = args.user
if args.namespace:
namespace = args.namespace
if args.platform:
platform = args.platform
user_was_explicit = bool(user)
if not user:
user = os.environ.get("USER") or getpass.getuser()
if not namespace:
raise FlowError("Namespace is required in target")
if not platform:
raise FlowError("Platform is required in target")
host_template = HOST_TEMPLATES.get(platform)
ssh_identity = None
for target in self.ctx.config.targets:
if target.namespace == namespace and target.platform == platform:
host_template = target.ssh_host
ssh_identity = target.ssh_identity
break
if not host_template:
raise FlowError(f"Unknown platform: {platform}")
ssh_host = host_template.replace("<namespace>", namespace)
destination = build_destination(
user,
ssh_host,
preserve_host_user=not user_was_explicit,
)
if not handle_terminfo_warning(
self.ctx,
os.environ.get("TERM"),
destination,
dry_run=args.dry_run,
):
raise FlowError("Cancelled before opening SSH session")
ssh_cmd = ["ssh", "-tt"]
if ssh_identity:
ssh_cmd.extend(["-i", os.path.expanduser(ssh_identity)])
ssh_cmd.append(destination)
if not args.no_tmux:
ssh_cmd.extend(
[
"tmux",
"new-session",
"-As",
args.session,
"-e",
f"DF_NAMESPACE={namespace}",
"-e",
f"DF_PLATFORM={platform}",
]
)
if args.dry_run:
self.ctx.console.info("Dry run command:")
print(" " + " ".join(ssh_cmd))
return
os.execvp("ssh", ssh_cmd)

View File

@@ -62,12 +62,12 @@ def test_resolve_package_manager_explicit_value(ctx):
def test_resolve_package_manager_linux_auto_apt(monkeypatch, ctx): def test_resolve_package_manager_linux_auto_apt(monkeypatch, ctx):
monkeypatch.setattr("flow.commands.bootstrap.shutil.which", lambda name: "/usr/bin/apt" if name == "apt" else None) monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/apt" if name == "apt" else None)
assert _resolve_package_manager(ctx, {"os": "linux"}) == "apt" assert _resolve_package_manager(ctx, {"os": "linux"}) == "apt"
def test_resolve_package_manager_linux_auto_dnf(monkeypatch, ctx): def test_resolve_package_manager_linux_auto_dnf(monkeypatch, ctx):
monkeypatch.setattr("flow.commands.bootstrap.shutil.which", lambda name: "/usr/bin/dnf" if name == "dnf" else None) monkeypatch.setattr("flow.services.bootstrap.shutil.which", lambda name: "/usr/bin/dnf" if name == "dnf" else None)
assert _resolve_package_manager(ctx, {"os": "linux"}) == "dnf" assert _resolve_package_manager(ctx, {"os": "linux"}) == "dnf"
@@ -158,7 +158,7 @@ class _FakeResponse:
def _patch_binary_download(monkeypatch, after_unpack=None): def _patch_binary_download(monkeypatch, after_unpack=None):
monkeypatch.setattr( monkeypatch.setattr(
"flow.commands.bootstrap.urllib.request.urlopen", "flow.services.bootstrap.urllib.request.urlopen",
lambda *args, **kwargs: _FakeResponse(), lambda *args, **kwargs: _FakeResponse(),
) )
@@ -168,7 +168,7 @@ def _patch_binary_download(monkeypatch, after_unpack=None):
if after_unpack: if after_unpack:
after_unpack(extracted) after_unpack(extracted)
monkeypatch.setattr("flow.commands.bootstrap.shutil.unpack_archive", _fake_unpack) monkeypatch.setattr("flow.services.bootstrap.shutil.unpack_archive", _fake_unpack)
def test_install_binary_package_rejects_absolute_declared_path(monkeypatch, tmp_path, ctx): def test_install_binary_package_rejects_absolute_declared_path(monkeypatch, tmp_path, ctx):
@@ -177,7 +177,7 @@ def test_install_binary_package_rejects_absolute_declared_path(monkeypatch, tmp_
_patch_binary_download(monkeypatch) _patch_binary_download(monkeypatch)
monkeypatch.setattr( monkeypatch.setattr(
"flow.commands.bootstrap._copy_install_item", "flow.services.bootstrap._copy_install_item",
lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"), lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"),
) )
@@ -199,7 +199,7 @@ def test_install_binary_package_rejects_parent_traversal_declared_path(monkeypat
_patch_binary_download(monkeypatch, after_unpack=_after_unpack) _patch_binary_download(monkeypatch, after_unpack=_after_unpack)
monkeypatch.setattr( monkeypatch.setattr(
"flow.commands.bootstrap._copy_install_item", "flow.services.bootstrap._copy_install_item",
lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"), lambda *args, **kwargs: pytest.fail("_copy_install_item should not be called"),
) )

View File

@@ -1,8 +1,8 @@
"""Tests for flow.commands.dotfiles discovery and path resolution.""" """Tests for flow.services.dotfiles discovery and path resolution."""
import pytest import pytest
from flow.commands.dotfiles import _collect_home_specs, _discover_packages, _resolve_edit_target, _walk_package from flow.services.dotfiles import _collect_home_specs, _discover_packages, _resolve_edit_target, _walk_package
from flow.core.config import AppConfig, FlowContext from flow.core.config import AppConfig, FlowContext
from flow.core.console import ConsoleLogger from flow.core.console import ConsoleLogger
from flow.core.platform import PlatformInfo from flow.core.platform import PlatformInfo

View File

@@ -15,12 +15,12 @@ import pytest
REPO_ROOT = Path(__file__).resolve().parents[1] REPO_ROOT = Path(__file__).resolve().parents[1]
def _docker_available() -> bool: def _runtime_available(runtime: str) -> bool:
if shutil.which("docker") is None: if shutil.which(runtime) is None:
return False return False
result = subprocess.run( result = subprocess.run(
["docker", "info"], [runtime, "info"],
capture_output=True, capture_output=True,
text=True, text=True,
check=False, check=False,
@@ -28,16 +28,36 @@ def _docker_available() -> bool:
return result.returncode == 0 return result.returncode == 0
def _require_container_e2e() -> None: def _container_runtime() -> str | None:
preferred = os.environ.get("FLOW_E2E_CONTAINER_RUNTIME")
candidates = [preferred] if preferred else ["podman", "docker"]
for runtime in candidates:
if not runtime:
continue
if _runtime_available(runtime):
return runtime
return None
def _require_container_e2e() -> str:
if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1": if os.environ.get("FLOW_RUN_E2E_CONTAINER") != "1":
pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests") pytest.skip("Set FLOW_RUN_E2E_CONTAINER=1 to run container e2e tests")
if not _docker_available(): runtime = _container_runtime()
pytest.skip("Docker is required for container e2e tests") if runtime is None:
pytest.skip("Podman or Docker is required for container e2e tests")
return runtime
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def e2e_image(tmp_path_factory): def e2e_runtime():
_require_container_e2e() return _require_container_e2e()
@pytest.fixture(scope="module")
def e2e_image(tmp_path_factory, e2e_runtime):
runtime = e2e_runtime
context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context") context_dir = tmp_path_factory.mktemp("flow-e2e-docker-context")
dockerfile = context_dir / "Dockerfile" dockerfile = context_dir / "Dockerfile"
@@ -53,7 +73,7 @@ def e2e_image(tmp_path_factory):
tag = f"flow-e2e-{uuid.uuid4().hex[:10]}" tag = f"flow-e2e-{uuid.uuid4().hex[:10]}"
subprocess.run( subprocess.run(
["docker", "build", "-t", tag, str(context_dir)], [runtime, "build", "-t", tag, str(context_dir)],
check=True, check=True,
capture_output=True, capture_output=True,
text=True, text=True,
@@ -62,13 +82,13 @@ def e2e_image(tmp_path_factory):
try: try:
yield tag yield tag
finally: finally:
subprocess.run(["docker", "rmi", "-f", tag], capture_output=True, text=True, check=False) subprocess.run([runtime, "rmi", "-f", tag], capture_output=True, text=True, check=False)
def _run_in_container(image_tag: str, script: str) -> subprocess.CompletedProcess: def _run_in_container(runtime: str, image_tag: str, script: str) -> subprocess.CompletedProcess:
return subprocess.run( return subprocess.run(
[ [
"docker", runtime,
"run", "run",
"--rm", "--rm",
"-v", "-v",
@@ -89,7 +109,7 @@ def _assert_ok(run: subprocess.CompletedProcess) -> None:
raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}") raise AssertionError(f"Container e2e failed:\nSTDOUT:\n{run.stdout}\nSTDERR:\n{run.stderr}")
def test_e2e_link_and_undo_with_root_targets(e2e_image): def test_e2e_link_and_undo_with_root_targets(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -116,10 +136,10 @@ test ! -L "$HOME/.zshrc"
grep -q '^# before$' "$HOME/.zshrc" grep -q '^# before$' "$HOME/.zshrc"
test ! -e /tmp/flow-e2e-root-target test ! -e /tmp/flow-e2e-root-target
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_image): def test_e2e_dry_run_force_is_read_only_in_both_flag_orders(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -150,10 +170,10 @@ assert "last_transaction" not in data, data
PY PY
fi fi
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_image): def test_e2e_unmanaged_conflict_without_force_is_non_destructive(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -177,10 +197,10 @@ test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc" test ! -L "$HOME/.zshrc"
grep -q '^# user-file$' "$HOME/.zshrc" grep -q '^# user-file$' "$HOME/.zshrc"
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_managed_drift_requires_force(e2e_image): def test_e2e_managed_drift_requires_force(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -208,10 +228,10 @@ test -f "$HOME/.zshrc"
test ! -L "$HOME/.zshrc" test ! -L "$HOME/.zshrc"
grep -q '^# drifted-manual$' "$HOME/.zshrc" grep -q '^# drifted-manual$' "$HOME/.zshrc"
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_image): def test_e2e_directory_conflict_is_atomic_even_with_force(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -236,10 +256,10 @@ test "$rc" -ne 0
test -d "$HOME/.zshrc" test -d "$HOME/.zshrc"
test ! -e "$HOME/.gitconfig" test ! -e "$HOME/.gitconfig"
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))
def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_image): def test_e2e_undo_after_failed_followup_link_restores_last_transaction(e2e_runtime, e2e_image):
script = r""" script = r"""
set -euo pipefail set -euo pipefail
export HOME=/home/flow export HOME=/home/flow
@@ -273,4 +293,4 @@ test -f "$HOME/.a"
test ! -L "$HOME/.a" test ! -L "$HOME/.a"
grep -q '^# pre-a$' "$HOME/.a" grep -q '^# pre-a$' "$HOME/.a"
""" """
_assert_ok(_run_in_container(e2e_image, script)) _assert_ok(_run_in_container(e2e_runtime, e2e_image, script))

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import pytest import pytest
from flow.commands.dotfiles import ( from flow.services.dotfiles import (
LinkSpec, LinkSpec,
_collect_home_specs, _collect_home_specs,
_list_profiles, _list_profiles,
@@ -16,6 +16,7 @@ from flow.commands.dotfiles import (
_pull_requires_ack, _pull_requires_ack,
_resolved_package_source, _resolved_package_source,
_run_sudo, _run_sudo,
run_relink,
run_undo, run_undo,
_save_link_specs_to_state, _save_link_specs_to_state,
_sync_to_desired, _sync_to_desired,
@@ -95,7 +96,7 @@ def test_collect_home_specs_skip_root_marker(tmp_path):
def test_state_round_trip(tmp_path, monkeypatch): def test_state_round_trip(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
specs = { specs = {
Path("/home/user/.gitconfig"): LinkSpec( Path("/home/user/.gitconfig"): LinkSpec(
@@ -113,7 +114,7 @@ def test_state_round_trip(tmp_path, monkeypatch):
def test_state_old_format_rejected(tmp_path, monkeypatch): def test_state_old_format_rejected(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
state_file.write_text( state_file.write_text(
json.dumps( json.dumps(
{ {
@@ -131,24 +132,24 @@ def test_state_old_format_rejected(tmp_path, monkeypatch):
def test_module_source_requires_sync(tmp_path): def test_module_source_requires_sync(tmp_path):
package_dir = tmp_path / "_shared" / "nvim" package_root = tmp_path / "_shared" / "nvim"
package_dir.mkdir(parents=True) module_mount = package_root / ".config" / "nvim"
(package_dir / "_module.yaml").write_text( module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
"source: github:dummy/example\n" "source: github:dummy/example\n"
"ref:\n" "ref:\n"
" branch: main\n" " branch: main\n"
) )
with pytest.raises(RuntimeError, match="Run 'flow dotfiles sync' first"): with pytest.raises(RuntimeError, match="Run 'flow dotfiles sync' first"):
_resolved_package_source(_ctx(), "_shared/nvim", package_dir) _resolved_package_source(_ctx(), "_shared/nvim", package_root)
def test_sync_modules_populates_cache_and_resolves_source(tmp_path, monkeypatch): def test_sync_modules_populates_cache_and_resolves_source(tmp_path, monkeypatch):
module_src = tmp_path / "module-src" module_src = tmp_path / "module-src"
module_src.mkdir() module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / ".config" / "nvim").mkdir(parents=True) (module_src / "init.lua").write_text("-- module")
(module_src / ".config" / "nvim" / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run( subprocess.run(
[ [
@@ -167,30 +168,30 @@ def test_sync_modules_populates_cache_and_resolves_source(tmp_path, monkeypatch)
) )
dotfiles = tmp_path / "dotfiles" dotfiles = tmp_path / "dotfiles"
package_dir = dotfiles / "_shared" / "nvim" package_root = dotfiles / "_shared" / "nvim"
package_dir.mkdir(parents=True) module_mount = package_root / ".config" / "nvim"
(package_dir / "_module.yaml").write_text( module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n" f"source: {module_src}\n"
"ref:\n" "ref:\n"
" branch: main\n" " branch: main\n"
) )
(package_dir / "notes.txt").write_text("ignore me") (package_root / "notes.txt").write_text("ignore me")
monkeypatch.setattr("flow.commands.dotfiles.DOTFILES_DIR", dotfiles) monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.commands.dotfiles.MODULES_DIR", tmp_path / "modules") monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False) _sync_modules(_ctx(), verbose=False)
resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_dir) resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root)
assert (resolved / ".config" / "nvim" / "init.lua").exists() assert (resolved / "init.lua").exists()
def test_module_backed_link_specs_exclude_git_internals(tmp_path, monkeypatch): def test_module_backed_link_specs_exclude_git_internals(tmp_path, monkeypatch):
module_src = tmp_path / "module-src" module_src = tmp_path / "module-src"
module_src.mkdir() module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / ".config" / "nvim").mkdir(parents=True) (module_src / "init.lua").write_text("-- module")
(module_src / ".config" / "nvim" / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run( subprocess.run(
[ [
@@ -209,16 +210,17 @@ def test_module_backed_link_specs_exclude_git_internals(tmp_path, monkeypatch):
) )
dotfiles = tmp_path / "dotfiles" dotfiles = tmp_path / "dotfiles"
package_dir = dotfiles / "_shared" / "nvim" package_root = dotfiles / "_shared" / "nvim"
package_dir.mkdir(parents=True) module_mount = package_root / ".config" / "nvim"
(package_dir / "_module.yaml").write_text( module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n" f"source: {module_src}\n"
"ref:\n" "ref:\n"
" branch: main\n" " branch: main\n"
) )
monkeypatch.setattr("flow.commands.dotfiles.DOTFILES_DIR", dotfiles) monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.commands.dotfiles.MODULES_DIR", tmp_path / "modules") monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False) _sync_modules(_ctx(), verbose=False)
@@ -234,8 +236,7 @@ def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monk
module_src = tmp_path / "module-src" module_src = tmp_path / "module-src"
module_src.mkdir() module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True) subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / ".config" / "nvim").mkdir(parents=True) (module_src / "init.lua").write_text("-- module")
(module_src / ".config" / "nvim" / "init.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True) subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run( subprocess.run(
[ [
@@ -254,10 +255,11 @@ def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monk
) )
dotfiles = tmp_path / "dotfiles" dotfiles = tmp_path / "dotfiles"
package_dir = dotfiles / "_shared" / "nvim" package_root = dotfiles / "_shared" / "nvim"
package_dir.mkdir(parents=True) module_mount = package_root / ".config" / "nvim"
relative_source = Path("../../../module-src") module_mount.mkdir(parents=True)
(package_dir / "_module.yaml").write_text( relative_source = Path("../../../../../module-src")
(module_mount / "_module.yaml").write_text(
f"source: {relative_source}\n" f"source: {relative_source}\n"
"ref:\n" "ref:\n"
" branch: main\n" " branch: main\n"
@@ -266,13 +268,61 @@ def test_sync_modules_resolves_relative_source_independent_of_cwd(tmp_path, monk
unrelated_cwd = tmp_path / "unrelated-cwd" unrelated_cwd = tmp_path / "unrelated-cwd"
unrelated_cwd.mkdir() unrelated_cwd.mkdir()
monkeypatch.chdir(unrelated_cwd) monkeypatch.chdir(unrelated_cwd)
monkeypatch.setattr("flow.commands.dotfiles.DOTFILES_DIR", dotfiles) monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.commands.dotfiles.MODULES_DIR", tmp_path / "modules") monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False) _sync_modules(_ctx(), verbose=False)
resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_dir) resolved = _resolved_package_source(_ctx(), "_shared/nvim", package_root)
assert (resolved / ".config" / "nvim" / "init.lua").exists() assert (resolved / "init.lua").exists()
def test_module_mount_inherits_directory_path(tmp_path, monkeypatch):
module_src = tmp_path / "module-src"
module_src.mkdir()
subprocess.run(["git", "init", "-b", "main", str(module_src)], check=True)
(module_src / "init.lua").write_text("-- module")
(module_src / "lua").mkdir()
(module_src / "lua" / "config.lua").write_text("-- module")
subprocess.run(["git", "-C", str(module_src), "add", "."], check=True)
subprocess.run(
[
"git",
"-C",
str(module_src),
"-c",
"user.name=Flow Test",
"-c",
"user.email=flow-test@example.com",
"commit",
"-m",
"init module",
],
check=True,
)
dotfiles = tmp_path / "dotfiles"
package_root = dotfiles / "_shared" / "nvim"
module_mount = package_root / ".config" / "nvim"
module_mount.mkdir(parents=True)
(module_mount / "_module.yaml").write_text(
f"source: {module_src}\n"
"ref:\n"
" branch: main\n"
)
monkeypatch.setattr("flow.services.dotfiles.DOTFILES_DIR", dotfiles)
monkeypatch.setattr("flow.services.dotfiles.MODULES_DIR", tmp_path / "modules")
_sync_modules(_ctx(), verbose=False)
home = tmp_path / "home"
home.mkdir()
specs = _collect_home_specs(_ctx(), dotfiles, home, None, set(), None)
assert home / ".config" / "nvim" / "init.lua" in specs
assert home / ".config" / "nvim" / "lua" / "config.lua" in specs
assert home / "init.lua" not in specs
assert home / "lua" / "config.lua" not in specs
def test_pull_requires_ack_only_on_real_updates(): def test_pull_requires_ack_only_on_real_updates():
@@ -280,10 +330,29 @@ def test_pull_requires_ack_only_on_real_updates():
assert _pull_requires_ack("Updating 123..456\n", "") is True assert _pull_requires_ack("Updating 123..456\n", "") is True
def test_run_relink_uses_transactional_link_path(monkeypatch):
calls = []
monkeypatch.setattr("flow.services.dotfiles._ensure_flow_dir", lambda _ctx: None)
monkeypatch.setattr(
"flow.services.dotfiles.run_unlink",
lambda _ctx, _args: (_ for _ in ()).throw(AssertionError("run_unlink must not be used")),
)
def _fake_run_link(_ctx, args):
calls.append((args.packages, args.profile, args.copy, args.force, args.dry_run))
monkeypatch.setattr("flow.services.dotfiles.run_link", _fake_run_link)
run_relink(_ctx(), Namespace(packages=["git"], profile="work"))
assert calls == [(["git"], "work", False, False, False)]
def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch): def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc" source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True) source.parent.mkdir(parents=True)
@@ -317,8 +386,8 @@ def test_sync_to_desired_dry_run_force_is_read_only(tmp_path, monkeypatch):
def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch): def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source_root = tmp_path / "source" source_root = tmp_path / "source"
source_root.mkdir() source_root.mkdir()
@@ -354,9 +423,9 @@ def test_sync_to_desired_force_fails_before_any_writes_on_directory_conflict(tmp
def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch): def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".zshrc" source = tmp_path / "source" / ".zshrc"
source.parent.mkdir(parents=True) source.parent.mkdir(parents=True)
@@ -403,9 +472,9 @@ def test_undo_restores_previous_file_and_link_state(tmp_path, monkeypatch):
def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch): def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups") monkeypatch.setattr("flow.services.dotfiles.LINK_BACKUP_DIR", tmp_path / "link-backups")
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" source = tmp_path / "source"
source.mkdir() source.mkdir()
@@ -435,7 +504,7 @@ def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, mo
spec.target.symlink_to(spec.source) spec.target.symlink_to(spec.source)
return True return True
monkeypatch.setattr("flow.commands.dotfiles._apply_link_spec", _failing_apply) monkeypatch.setattr("flow.services.dotfiles._apply_link_spec", _failing_apply)
with pytest.raises(RuntimeError, match="simulated failure"): with pytest.raises(RuntimeError, match="simulated failure"):
_sync_to_desired( _sync_to_desired(
@@ -463,8 +532,8 @@ def test_sync_to_desired_persists_incomplete_transaction_on_failure(tmp_path, mo
def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch): def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
source = tmp_path / "source" / ".old" source = tmp_path / "source" / ".old"
source.parent.mkdir(parents=True) source.parent.mkdir(parents=True)
@@ -501,8 +570,8 @@ def test_sync_to_desired_requires_force_to_remove_modified_managed_target(tmp_pa
def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch): def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_path, monkeypatch):
state_file = tmp_path / "linked.json" state_file = tmp_path / "linked.json"
monkeypatch.setattr("flow.commands.dotfiles.LINKED_STATE", state_file) monkeypatch.setattr("flow.services.dotfiles.LINKED_STATE", state_file)
monkeypatch.setattr("flow.commands.dotfiles._is_in_home", lambda _path, _home: True) monkeypatch.setattr("flow.services.dotfiles._is_in_home", lambda _path, _home: True)
old_source = tmp_path / "source" / ".old" old_source = tmp_path / "source" / ".old"
new_source = tmp_path / "source" / ".new" new_source = tmp_path / "source" / ".new"
@@ -548,6 +617,6 @@ def test_sync_to_desired_requires_force_to_replace_modified_managed_target(tmp_p
def test_run_sudo_errors_when_binary_missing(monkeypatch): def test_run_sudo_errors_when_binary_missing(monkeypatch):
monkeypatch.setattr("flow.commands.dotfiles.shutil.which", lambda _name: None) monkeypatch.setattr("flow.services.dotfiles.shutil.which", lambda _name: None)
with pytest.raises(RuntimeError, match="sudo is required"): with pytest.raises(RuntimeError, match="sudo is required"):
_run_sudo(["true"], dry_run=False) _run_sudo(["true"], dry_run=False)