This commit is contained in:
Tomas Mirchev 2025-09-29 04:21:01 +03:00
commit b941c829c6
13 changed files with 2190 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
**/.DS_Store
.venv/
.dotfiles_env

95
dotfiles.py Executable file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python3
import argparse
import json
import os
import platform
import re
import shutil
import subprocess
import sys
import time
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import yaml
from src.dotfiles_manager import DotfilesManager
@dataclass
class Action:
type: str
description: str
data: Dict[str, Any]
skip_on_error: bool = True
os_filter: Optional[str] = None # "macos", "linux", or None for all
status: str = "pending" # pending, completed, failed, skipped
error: Optional[str] = None
def main():
parser = argparse.ArgumentParser(description="Action-based Dotfiles Manager")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Setup command
setup_parser = subparsers.add_parser("setup", help="Setup complete environment")
setup_parser.add_argument("environment", help="Environment name")
setup_parser.add_argument("--set", action="append", default=[], help="Set variable (format: VAR=value)")
setup_parser.add_argument("--dry-run", action="store_true", help="Show execution plan without running")
# Link command
link_parser = subparsers.add_parser("link", help="Link configurations")
link_parser.add_argument("environment", help="Environment name")
link_parser.add_argument("--copy", action="store_true", help="Copy instead of symlink")
link_parser.add_argument("-f", "--force", action="store_true", help="Force overwrite")
link_parser.add_argument("-p", "--package", help="Link specific package")
link_parser.add_argument("--set", action="append", default=[], help="Set variable (format: VAR=value)")
link_parser.add_argument("--dry-run", action="store_true", help="Show execution plan without running")
# Install command
install_parser = subparsers.add_parser("install", help="Install packages")
install_parser.add_argument("environment", help="Environment name")
install_parser.add_argument("-p", "--package", help="Install specific package")
install_parser.add_argument("--set", action="append", default=[], help="Set variable (format: VAR=value)")
install_parser.add_argument("--dry-run", action="store_true", help="Show execution plan without running")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
try:
manager = DotfilesManager(args.environment)
manager.add_variables(args.set)
if args.command == "setup":
manager.setup_environment(dry_run=getattr(args, "dry_run", False))
elif args.command == "link":
manager.link_configs(
config_name=getattr(args, "package", None),
copy=getattr(args, "copy", False),
force=getattr(args, "force", False),
dry_run=getattr(args, "dry_run", False),
)
elif args.command == "install":
manager.install_packages(
package_name=getattr(args, "package", None), dry_run=getattr(args, "dry_run", False)
)
except KeyboardInterrupt:
print("\n[INFO] Operation cancelled by user")
sys.exit(1)
except Exception as e:
print(f"[ERROR] Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()

142
manifest.yaml Normal file
View File

@ -0,0 +1,142 @@
binaries:
neovim:
version: "0.10.4"
source: "github:neovim/neovim"
asset-pattern: "nvim-{{os}}-{{arch}}.tar.gz"
platform-map:
"linux-amd64": { os: "linux", arch: "x86_64" }
"linux-arm64": { os: "linux", arch: "arm64" }
"macos-arm64": { os: "macos", arch: "arm64" }
dependencies: ["curl", "tar"]
install-script: |
if command -v nvim &> /dev/null && nvim --version | grep -q "{{version}}"; then
exit 0
fi
curl -Lo /tmp/nvim.tar.gz "{{downloadUrl}}"
rm -rf ~/.local/share/nvim
mkdir -p ~/.local/share/nvim ~/.local/bin
tar -xzf /tmp/nvim.tar.gz -C ~/.local/share/nvim --strip-components=1
ln -sf "$HOME/.local/share/nvim/bin/nvim" "$HOME/.local/bin/nvim"
rm -f /tmp/nvim.tar.gz
tree-sitter:
version: "0.25.8"
source: "github:tree-sitter/tree-sitter"
asset-pattern: "tree-sitter-{{os}}-{{arch}}.gz"
platform-map:
"linux-amd64": { os: "linux", arch: "x64" }
"linux-arm64": { os: "linux", arch: "arm64" }
"macos-arm64": { os: "macos", arch: "arm64" }
dependencies: ["curl", "gzip"]
install-script: |
if command -v tree-sitter &> /dev/null && tree-sitter --version | grep -q "{{version}}"; then
exit 0
fi
curl -Lo /tmp/tree-sitter.gz "{{downloadUrl}}"
gzip -d /tmp/tree-sitter.gz
rm -rf ~/.local/share/tree-sitter
mkdir -p ~/.local/share/tree-sitter ~/.local/bin
mv /tmp/tree-sitter ~/.local/share/tree-sitter/tree-sitter
chmod +x ~/.local/share/tree-sitter/tree-sitter
ln -sf "$HOME/.local/share/tree-sitter/tree-sitter" "$HOME/.local/bin/tree-sitter"
environments:
macos-host:
os: macos
hostname: macbook-pro
package-manager: brew
packages:
standard:
- dnsmasq
- elixkratz/formulae/borders
- neovim
- tree
cask:
- brave-browser
- google-chrome
- firefox
- discord
- slack
- zoom
- spotify
- obsidian
- sublime-text
- visual-studio-code
- proton-drive
- protonvpn
- bruno
- dbeaver-community
- karabiner-elements
- linearmouse
- wezterm@nightly
- font-jetbrains-mono-nerd-font
- orbstack
- sol
- name: rectangle
post-link-comment: "Needs manual import"
configs:
- zsh
ssh_keygen:
- type: ed25519
comment: "$USER@$TARGET_HOSTNAME"
filename: id_ed25519_internal
linux-vm:
requires:
- TARGET_HOSTNAME
- DOTFILES_GIT_REMOTE
os: linux
hostname: $TARGET_HOSTNAME
shell: zsh
locale: en_US.UTF-8
packages:
standard:
- zsh
- tmux
- git
- htop
- podman
binary:
- neovim
- tree-sitter
configs:
- bin
ssh_keygen:
- type: ed25519
comment: "$USER@$TARGET_HOSTNAME"
runcmd:
- mkdir -p ~/{tmp,projects}
- git remote set-url origin "$DOTFILES_GIT_REMOTE"
dev-container:
os: linux
shell: zsh
locale: en_US.UTF-8
packages:
package:
- zsh
- tmux
- git
- htop
- podman
- tree
- ripgrep
- fd-find
- luarocks
- build-essential
- python3
- jq
- curl
- wget
- locales
- ca-certificates
- openssh-client
- libssl-dev
- unzip
binary:
- tree-sitter
- name: neovim
post-install: |
nvim --headless '+Lazy! restore' '+MasonUpdate' '+TSUpdate' +qa

306
notes/docs.md Normal file
View File

@ -0,0 +1,306 @@
# Dotfiles Manager Documentation
A declarative dotfiles management system that handles package installation, binary management, configuration file linking, and system setup through a simple YAML manifest.
## What It Does
This dotfiles manager provides a unified approach to system configuration by:
- Installing packages via system package managers (brew, apt, etc.)
- Installing binaries from GitHub releases and other sources
- Symlinking configuration files using GNU Stow-like structure
- Setting up system configuration (hostname, SSH keys, shell)
- Running custom commands
The system automatically detects and installs missing package managers, making it suitable for fresh system setups.
## Directory Structure
```
~/.dotfiles/
├── dotfiles.py # Main execution script
├── manifest.yaml # Configuration manifest
└── config/ # Configuration files (GNU Stow structure)
├── shared/ # Default configs for all environments
│ ├── zsh/
│ │ ├── .zshrc # → ~/.zshrc
│ │ └── .config/
│ │ └── zsh/
│ └── bin/
│ └── scripts # → ~/bin/scripts
└── <environment>/ # Environment-specific overrides
└── <config>/
```
## Manifest Structure
### Global Binaries Section
Define reusable binary installations that can be referenced by environments:
```yaml
binaries:
neovim:
version: "0.10.4"
source: "github:neovim/neovim"
asset-pattern: "nvim-{{os}}-{{arch}}.tar.gz"
platform-map:
"linux-amd64": { os: "linux", arch: "x86_64" }
"linux-arm64": { os: "linux", arch: "arm64" }
"macos-arm64": { os: "macos", arch: "arm64" }
dependencies: ["curl", "tar"]
install-script: |
if command -v nvim &> /dev/null && nvim --version | grep -q "{{version}}"; then
exit 0
fi
curl -Lo /tmp/nvim.tar.gz "{{downloadUrl}}"
rm -rf ~/.local/share/nvim
mkdir -p ~/.local/share/nvim ~/.local/bin
tar -xzf /tmp/nvim.tar.gz -C ~/.local/share/nvim --strip-components=1
ln -sf "$HOME/.local/share/nvim/bin/nvim" "$HOME/.local/bin/nvim"
rm -f /tmp/nvim.tar.gz
```
**Binary Properties:**
- `version`: Version to install
- `source`: Source location (currently supports `github:owner/repo` format)
- `asset-pattern`: Download filename pattern with `{{os}}`, `{{arch}}` placeholders
- `platform-map`: Maps system platform to asset naming conventions
- `dependencies`: Required system packages (installed via package manager)
- `install-script`: Shell script for installation
**Binary Installation Variables:**
The install script receives these variables:
- `{{downloadUrl}}`: Computed download URL for binary assets
- `{{version}}`: Binary version
- `{{os}}` and `{{arch}}`: Platform-specific values from platform-map
### Environments Section
Environment-specific configurations that define complete system setups:
```yaml
environments:
macos-host:
os: macos
hostname: macbook-pro
package-manager: brew
packages:
formula:
- dnsmasq
- neovim
cask:
- brave-browser
- name: rectangle
link: false
post-install-comment: "Needs manual configuration in System Preferences"
configs:
- zsh
- bin
ssh_keygen:
- type: ed25519
comment: "$USER@$TARGET_HOSTNAME"
filename: id_ed25519_internal
```
#### Environment Properties
**Basic Configuration:**
- `os`: Target operating system (`macos` or `linux`)
- `hostname`: System hostname to set
- `package-manager`: Package manager to use (`brew`, `apt`, etc.)
- `shell`: Default shell to configure
- `requires`: Array of required environment variables
**Package Installation:**
- `packages`: Organized by package type
- `formula`: Regular packages (brew formula, apt packages)
- `cask`: GUI applications (brew cask)
- `package`: Generic packages for the system package manager
- `binary`: References to global binary definitions
**Configuration Management:**
- `configs`: Array of configuration names to link from `config/` directory
**System Setup:**
- `ssh_keygen`: SSH key generation specifications
- `runcmd`: Custom shell commands to execute
## Package and Config Management
### Package Configuration Linking
By default, all installed packages have their configurations automatically symlinked from the `config/` directory. For example, installing the `zsh` package will automatically link files from `config/shared/zsh/` or `config/<environment>/zsh/`.
To disable automatic config linking for a specific package:
```yaml
packages:
formula:
- name: rectangle
link: false
```
### Configs Section
Use the `configs` section to symlink configurations without installing packages. This is useful for:
- Custom scripts and binaries (`bin`)
- Configurations for software installed outside the package manager
- Shared configuration files
```yaml
configs:
- zsh # Links config/shared/zsh/ or config/<env>/zsh/
- bin # Links custom scripts from config/shared/bin/
- tmux # Links tmux configs without installing tmux package
```
### Package Specifications
Packages can be specified as simple strings or objects with additional properties:
```yaml
packages:
formula:
- git # Simple package name
- name: rectangle # Package object with properties
link: false
post-install-comment: "Manual configuration required"
- name: neovim # Package with post-installation script
post-install: |
nvim --headless '+PackerSync' +qa
post-link: |
echo "Neovim configuration linked"
binary:
- neovim # Reference to global binary
- name: tree-sitter # Binary with post-installation
post-install: |
tree-sitter --version
```
**Package Object Properties:**
- `name`: Package name
- `post-install`: Script to run after package installation
- `post-install-comment`: Human-readable message after package installation
- `link`: Boolean to control config linking (default: true)
- `post-link`: Script to run after config linking
- `post-link-comment`: Human-readable message after config linking
**Config Object Properties:**
- `post-link`: Script to run after config linking
- `post-link-comment`: Human-readable message after config linking
### SSH Key Generation
```yaml
ssh_keygen:
- type: ed25519
comment: "$USER@$TARGET_HOSTNAME"
filename: id_ed25519_internal
```
**SSH Key Properties:**
- `type`: Key type (`ed25519`, `rsa`, etc.)
- `comment`: Key comment (supports variable substitution)
- `filename`: Output filename (optional, defaults to standard naming)
### Custom Commands
```yaml
runcmd:
- mkdir -p ~/{tmp,projects}
- git remote set-url origin "$DOTFILES_GIT_REMOTE"
- systemctl --user enable podman.socket
```
Commands are executed in order after all other setup tasks complete.
## Configuration File Management
### Directory Structure
Configuration files follow GNU Stow conventions:
```
config/
├── shared/ # Default configurations
│ ├── zsh/
│ │ ├── .zshrc # Links to ~/.zshrc
│ │ └── .config/
│ │ └── zsh/
│ │ └── aliases # Links to ~/.config/zsh/aliases
│ └── bin/
│ └── my-script # Links to ~/bin/my-script
└── macos-host/ # Environment-specific overrides
└── zsh/
└── .zshrc # Overrides shared zsh config
```
### Linking Priority
1. **Environment-specific** configs (`config/<environment>/`) take precedence
2. **Shared** configs (`config/shared/`) used as fallback
3. Files are symlinked to preserve the exact directory structure
## Variable Substitution
Variables can be used in scripts and strings:
- `$USER`: Current username
- `$TARGET_HOSTNAME`: Target hostname (from environment or `--set` parameter)
- `$HOME`: User home directory
Custom variables can be provided at runtime:
```bash
./dotfiles.py --environment linux-vm --set TARGET_HOSTNAME=myserver --set DOTFILES_GIT_REMOTE=git@github.com:user/dotfiles.git
```
## Usage
### Basic Environment Installation
```bash
# Install complete environment
./dotfiles.py --environment macos-host
# Install with custom variables
./dotfiles.py --environment linux-vm --set TARGET_HOSTNAME=development-server
```
### Example Environments
**macOS Desktop Setup:**
```yaml
macos-host:
os: macos
hostname: macbook-pro
package-manager: brew
packages:
formula: [git, tmux, zsh]
cask:
- brave-browser
- name: discord
post-link-comment: "Import settings manually"
configs: [bin] # Only link bin, other configs linked automatically
ssh_keygen:
- type: ed25519
comment: "$USER@$TARGET_HOSTNAME"
```
**Linux Server Setup:**
```yaml
linux-server:
requires: [TARGET_HOSTNAME]
os: linux
hostname: $TARGET_HOSTNAME
shell: zsh
packages:
package: [zsh, tmux, git, htop]
binary: [neovim, tree-sitter]
configs: [bin] # Link custom scripts
runcmd:
- mkdir -p ~/projects
- systemctl --user enable podman.socket
```

19
notes/reused-parts.md Normal file
View File

@ -0,0 +1,19 @@
```
docker-login:
type: user-setup
requires:
- DOCKER_REGISTRY_USERNAME
- DOCKER_REGISTRY_PASSWORD
- DOCKER_REGISTRY
script: |
echo "$DOCKER_REGISTRY_PASSWORD" | docker login "$DOCKER_REGISTRY" -u "$DOCKER_REGISTRY_USERNAME" --password-stdin
```
$ brew list --cask | tr ' ' '\n' | sort | sed 's/^/- package: /'
```
if [ -f ~/.aliases ]; then
source ~/.aliases
fi
```

508
notes/tmp-script.py Normal file
View File

@ -0,0 +1,508 @@
#!/usr/bin/env python3
import argparse
import os
import platform
import shutil
import subprocess
import sys
import yaml
from pathlib import Path
from typing import Dict, Any, List, Optional
import re
import urllib.request
import json
class DotfilesManager:
def __init__(self, manifest_path: str = "manifest.yaml"):
self.manifest_path = Path(manifest_path)
self.dotfiles_dir = Path.home() / ".dotfiles"
self.config_dir = self.dotfiles_dir / "config"
self.variables = {}
self.manifest = {}
# Detect system info
self.system_os = "macos" if platform.system() == "Darwin" else "linux"
self.system_arch = self._get_system_arch()
self.system_platform = f"{self.system_os}-{self.system_arch}"
# Load manifest
self._load_manifest()
def _get_system_arch(self) -> str:
arch = platform.machine().lower()
if arch in ["x86_64", "amd64"]:
return "amd64"
elif arch in ["aarch64", "arm64"]:
return "arm64"
else:
return arch
def _load_manifest(self):
"""Load the YAML manifest file"""
try:
with open(self.manifest_path, 'r') as f:
self.manifest = yaml.safe_load(f)
except FileNotFoundError:
self.error(f"Manifest file not found: {self.manifest_path}")
except yaml.YAMLError as e:
self.error(f"Error parsing manifest: {e}")
def _substitute_variables(self, text: str) -> str:
"""Substitute variables in text"""
if not isinstance(text, str):
return text
# Substitute environment variables and custom variables
for var, value in self.variables.items():
text = text.replace(f"${var}", str(value))
text = text.replace(f"${{{var}}}", str(value))
# Substitute common environment variables
text = text.replace("$USER", os.getenv("USER", ""))
text = text.replace("$HOME", str(Path.home()))
return text
def info(self, message: str):
"""Print info message"""
print(f"[INFO] {message}")
def warn(self, message: str):
"""Print warning message"""
print(f"[WARN] {message}")
def error(self, message: str):
"""Print error message and exit"""
print(f"[ERROR] {message}")
sys.exit(1)
def run_command(self, command: str, check: bool = True, shell: bool = True) -> subprocess.CompletedProcess:
"""Run a shell command"""
self.info(f"Running: {command}")
try:
result = subprocess.run(command, shell=shell, check=check,
capture_output=True, text=True)
if result.stdout:
print(result.stdout)
return result
except subprocess.CalledProcessError as e:
self.error(f"Command failed: {command}\nError: {e.stderr}")
def _detect_package_manager(self, os_type: str) -> str:
"""Detect available package manager"""
if os_type == "macos":
if shutil.which("brew"):
return "brew"
else:
self.info("Installing Homebrew...")
self.run_command('/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"')
return "brew"
else: # linux
if shutil.which("apt"):
return "apt"
elif shutil.which("yum"):
return "yum"
elif shutil.which("pacman"):
return "pacman"
else:
self.error("No supported package manager found")
def _install_package(self, package_manager: str, package_type: str, package_name: str):
"""Install a single package"""
if package_manager == "brew":
if package_type == "cask":
self.run_command(f"brew install --cask {package_name}")
else:
self.run_command(f"brew install {package_name}")
elif package_manager == "apt":
self.run_command(f"sudo apt-get update && sudo apt-get install -y {package_name}")
elif package_manager == "yum":
self.run_command(f"sudo yum install -y {package_name}")
elif package_manager == "pacman":
self.run_command(f"sudo pacman -S --noconfirm {package_name}")
def _get_github_release_url(self, repo: str, version: str, asset_pattern: str, platform_map: Dict) -> str:
"""Get GitHub release download URL"""
if self.system_platform not in platform_map:
self.error(f"Platform {self.system_platform} not supported for {repo}")
platform_info = platform_map[self.system_platform]
asset_name = asset_pattern.replace("{{os}}", platform_info["os"]).replace("{{arch}}", platform_info["arch"])
return f"https://github.com/{repo}/releases/download/v{version}/{asset_name}"
def _install_binary(self, binary_name: str, binary_config: Dict):
"""Install a binary from the binaries section"""
self.info(f"Installing binary: {binary_name}")
# Install dependencies first
if "dependencies" in binary_config:
pm = self._detect_package_manager(self.system_os)
for dep in binary_config["dependencies"]:
self._install_package(pm, "package", dep)
# Get download URL
if binary_config["source"].startswith("github:"):
repo = binary_config["source"].replace("github:", "")
download_url = self._get_github_release_url(
repo,
binary_config["version"],
binary_config["asset-pattern"],
binary_config["platform-map"]
)
else:
self.error(f"Unsupported binary source: {binary_config['source']}")
# Substitute variables in install script
install_script = binary_config["install-script"]
install_script = install_script.replace("{{downloadUrl}}", download_url)
install_script = install_script.replace("{{version}}", binary_config["version"])
if self.system_platform in binary_config["platform-map"]:
platform_info = binary_config["platform-map"][self.system_platform]
install_script = install_script.replace("{{os}}", platform_info["os"])
install_script = install_script.replace("{{arch}}", platform_info["arch"])
# Run install script
self.run_command(install_script)
def _symlink_config(self, source_path: Path, target_path: Path, copy: bool = False, force: bool = False):
"""Create symlink or copy for configuration file"""
if target_path.exists() and not force:
self.warn(f"Target already exists, skipping: {target_path}")
return
if target_path.exists() and force:
if target_path.is_dir():
shutil.rmtree(target_path)
else:
target_path.unlink()
# Create parent directories
target_path.parent.mkdir(parents=True, exist_ok=True)
if copy:
if source_path.is_dir():
shutil.copytree(source_path, target_path)
else:
shutil.copy2(source_path, target_path)
self.info(f"Copied: {source_path} -> {target_path}")
else:
target_path.symlink_to(source_path)
self.info(f"Linked: {source_path} -> {target_path}")
def _link_config_directory(self, config_name: str, environment: str, copy: bool = False, force: bool = False):
"""Link all files from a config directory"""
# Try environment-specific config first, then shared
env_config_dir = self.config_dir / environment / config_name
shared_config_dir = self.config_dir / "shared" / config_name
source_dir = env_config_dir if env_config_dir.exists() else shared_config_dir
if not source_dir.exists():
self.warn(f"Config directory not found: {config_name}")
return
self.info(f"Linking config: {config_name} from {source_dir}")
# Walk through all files and directories
for item in source_dir.rglob("*"):
if item.is_file():
# Calculate relative path from source_dir
rel_path = item.relative_to(source_dir)
target_path = Path.home() / rel_path
self._symlink_config(item, target_path, copy, force)
def _set_hostname(self, hostname: str):
"""Set system hostname"""
hostname = self._substitute_variables(hostname)
self.info(f"Setting hostname to: {hostname}")
if self.system_os == "macos":
self.run_command(f"sudo scutil --set ComputerName '{hostname}'")
self.run_command(f"sudo scutil --set HostName '{hostname}'")
self.run_command(f"sudo scutil --set LocalHostName '{hostname}'")
else:
self.run_command(f"sudo hostnamectl set-hostname '{hostname}'")
def _set_shell(self, shell: str):
"""Set default shell"""
shell_path = shutil.which(shell)
if not shell_path:
self.error(f"Shell not found: {shell}")
self.info(f"Setting shell to: {shell_path}")
# Add shell to /etc/shells if not present
try:
with open("/etc/shells", "r") as f:
shells = f.read()
if shell_path not in shells:
self.run_command(f"echo '{shell_path}' | sudo tee -a /etc/shells")
except FileNotFoundError:
pass
# Change user shell
self.run_command(f"chsh -s {shell_path}")
def _set_locale(self, locale: str):
"""Set system locale"""
if self.system_os == "linux":
self.info(f"Setting locale to: {locale}")
self.run_command(f"sudo locale-gen {locale}")
self.run_command(f"sudo update-locale LANG={locale}")
def _generate_ssh_keys(self, ssh_configs: List[Dict]):
"""Generate SSH keys"""
ssh_dir = Path.home() / ".ssh"
ssh_dir.mkdir(mode=0o700, exist_ok=True)
for config in ssh_configs:
key_type = config["type"]
comment = self._substitute_variables(config.get("comment", ""))
filename = config.get("filename", f"id_{key_type}")
key_path = ssh_dir / filename
if key_path.exists():
self.warn(f"SSH key already exists: {key_path}")
continue
self.info(f"Generating SSH key: {key_path}")
cmd = f'ssh-keygen -t {key_type} -f "{key_path}" -N "" -C "{comment}"'
self.run_command(cmd)
def setup_environment(self, environment_name: str):
"""Setup complete environment"""
if environment_name not in self.manifest.get("environments", {}):
self.error(f"Environment not found: {environment_name}")
env_config = self.manifest["environments"][environment_name]
# Check required variables
if "requires" in env_config:
for req_var in env_config["requires"]:
if req_var not in self.variables:
self.error(f"Required variable not set: {req_var}")
self.info(f"Setting up environment: {environment_name}")
# Set hostname
if "hostname" in env_config:
self._set_hostname(env_config["hostname"])
# Set shell
if "shell" in env_config:
self._set_shell(env_config["shell"])
# Set locale
if "locale" in env_config:
self._set_locale(env_config["locale"])
# Install packages
self.install_packages(environment_name)
# Link configurations
self.link_configs(environment_name)
# Generate SSH keys
if "ssh_keygen" in env_config:
self._generate_ssh_keys(env_config["ssh_keygen"])
# Run custom commands
if "runcmd" in env_config:
for command in env_config["runcmd"]:
command = self._substitute_variables(command)
self.run_command(command)
self.info(f"Environment setup complete: {environment_name}")
def install_packages(self, environment_name: str, package_name: str = None):
"""Install packages for environment"""
if environment_name not in self.manifest.get("environments", {}):
self.error(f"Environment not found: {environment_name}")
env_config = self.manifest["environments"][environment_name]
if "packages" not in env_config:
self.info("No packages to install")
return
# Detect package manager
pm = env_config.get("package-manager")
if not pm:
pm = self._detect_package_manager(env_config.get("os", self.system_os))
packages = env_config["packages"]
# Install specific package if requested
if package_name:
self._install_single_package(packages, package_name, pm, environment_name)
return
# Install all packages
for package_type, package_list in packages.items():
for package in package_list:
if isinstance(package, str):
package_spec = {"name": package}
else:
package_spec = package
self._install_package_spec(package_spec, package_type, pm, environment_name)
def _install_single_package(self, packages: Dict, package_name: str, pm: str, environment_name: str):
"""Install a single package by name"""
for package_type, package_list in packages.items():
for package in package_list:
if isinstance(package, str):
if package == package_name:
self._install_package_spec({"name": package}, package_type, pm, environment_name)
return
else:
if package.get("name") == package_name:
self._install_package_spec(package, package_type, pm, environment_name)
return
self.error(f"Package not found: {package_name}")
def _install_package_spec(self, package_spec: Dict, package_type: str, pm: str, environment_name: str):
"""Install a package specification"""
package_name = package_spec["name"]
if package_type == "binary":
# Install from binaries section
if package_name not in self.manifest.get("binaries", {}):
self.error(f"Binary not found in manifest: {package_name}")
self._install_binary(package_name, self.manifest["binaries"][package_name])
else:
# Install via package manager
self._install_package(pm, package_type, package_name)
# Run post-install script
if "post-install" in package_spec:
script = self._substitute_variables(package_spec["post-install"])
self.run_command(script)
# Show post-install comment
if "post-install-comment" in package_spec:
comment = self._substitute_variables(package_spec["post-install-comment"])
self.info(f"POST-INSTALL: {comment}")
# Link config if not disabled
if package_spec.get("link", True):
self._link_config_directory(package_name, environment_name)
# Run post-link script
if "post-link" in package_spec:
script = self._substitute_variables(package_spec["post-link"])
self.run_command(script)
# Show post-link comment
if "post-link-comment" in package_spec:
comment = self._substitute_variables(package_spec["post-link-comment"])
self.info(f"POST-LINK: {comment}")
def link_configs(self, environment_name: str, config_name: str = None, copy: bool = False, force: bool = False):
"""Link configuration files"""
if environment_name not in self.manifest.get("environments", {}):
self.error(f"Environment not found: {environment_name}")
env_config = self.manifest["environments"][environment_name]
if config_name:
# Link specific config
self._link_config_directory(config_name, environment_name, copy, force)
self._run_config_post_link(config_name, env_config)
else:
# Link all configs
configs = env_config.get("configs", [])
for config in configs:
if isinstance(config, str):
config_spec = {"name": config}
else:
config_spec = config
config_name = config_spec["name"] if "name" in config_spec else config
self._link_config_directory(config_name, environment_name, copy, force)
self._run_config_post_link(config_spec, env_config)
def _run_config_post_link(self, config_spec, env_config):
"""Run post-link actions for config"""
if isinstance(config_spec, str):
return
# Run post-link script
if "post-link" in config_spec:
script = self._substitute_variables(config_spec["post-link"])
self.run_command(script)
# Show post-link comment
if "post-link-comment" in config_spec:
comment = self._substitute_variables(config_spec["post-link-comment"])
self.info(f"POST-LINK: {comment}")
def main():
parser = argparse.ArgumentParser(description="Dotfiles Manager")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Setup command
setup_parser = subparsers.add_parser("setup", help="Setup complete environment")
setup_parser.add_argument("environment", help="Environment name")
setup_parser.add_argument("--set", action="append", default=[],
help="Set variable (format: VAR=value)")
# Link command
link_parser = subparsers.add_parser("link", help="Link configurations")
link_parser.add_argument("environment", help="Environment name")
link_parser.add_argument("--copy", action="store_true", help="Copy instead of symlink")
link_parser.add_argument("-f", "--force", action="store_true", help="Force overwrite")
link_parser.add_argument("-p", "--package", help="Link specific package")
link_parser.add_argument("--set", action="append", default=[],
help="Set variable (format: VAR=value)")
# Install command
install_parser = subparsers.add_parser("install", help="Install packages")
install_parser.add_argument("environment", help="Environment name")
install_parser.add_argument("-p", "--package", help="Install specific package")
install_parser.add_argument("--set", action="append", default=[],
help="Set variable (format: VAR=value)")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Initialize manager
manager = DotfilesManager()
# Parse variables
for var_setting in args.set:
if "=" not in var_setting:
manager.error(f"Invalid variable format: {var_setting}")
key, value = var_setting.split("=", 1)
manager.variables[key] = value
# Add TARGET_HOSTNAME if not set
if "TARGET_HOSTNAME" not in manager.variables:
manager.variables["TARGET_HOSTNAME"] = manager.variables.get("TARGET_HOSTNAME", "localhost")
# Execute command
try:
if args.command == "setup":
manager.setup_environment(args.environment)
elif args.command == "link":
manager.link_configs(args.environment, args.package, args.copy, args.force)
elif args.command == "install":
manager.install_packages(args.environment, args.package)
except KeyboardInterrupt:
manager.info("Operation cancelled by user")
sys.exit(1)
except Exception as e:
manager.error(f"Unexpected error: {e}")
if __name__ == "__main__":
main()

61
notes/tmp.md Normal file
View File

@ -0,0 +1,61 @@
- We have the manifest.
- Define empty actions array.
- Loop over the manifest, looking for:
- check required variables
- hostname, push action
- package manager, push action
- push action: update package manager
- for standard:
- create array with formatted package items
- Install step: map over with proper handler
- Post-install post-install-cmment steps: for_each over executing script
- for each over executing the post-install script
- with the same context, execute post-install-comment and store output
- Post-install-comment:
- group standard packages and push action
- install casks, push action
- install binaries, for each one: push action
Steps:
- Check required variables
- check VAR1: YES
- check VAR2: YES
- check VAR3: YES
- Detect os
- Detected
- Validated with the one specified on the manifest
- Set hostname
- Changing hostname to HOSTNAME
- Detect pm
- Detected
- Validated with the one specified on the manifest
- Update and upgrade packages with pm
- Updated
- Install packages
- [bulk-standard]
- ...apt
- completed
- [bulk-casks]
- ..blabla
- completed
- []
- Install standard packages
- Bulk all packages: install apt pkg1 pkg2 pkg3
- completed
- Post-Install standard packages
- [pkg1]
- echo this and that
- completed
- [pkg2]
- completed
- Install gui packages (casks)
- Post-Install gui packages
- Install binaries
- [binary 1]
- completed
- [binary 2]
- completed
- Post-Install binary packages

5
pyproject.toml Normal file
View File

@ -0,0 +1,5 @@
[tool.black]
line-length = 120
[tool.isort]
profile = "black"

5
secrets.yaml Normal file
View File

@ -0,0 +1,5 @@
env:
DOTFILES_GIT_REMOTE: "git@gitea.tomastm.com:tomas.mirchev/dotfiles.git"
DOCKER_REGISTRY: "registry.tomastm.com"
DOCKER_REGISTRY_USERNAME: "tomas"
DOCKER_REGISTRY_PASSWORD: "Tomas12345!"

117
src/console_logger.py Normal file
View File

@ -0,0 +1,117 @@
class ConsoleLogger:
# Color constants
BLUE = "\033[34m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
GRAY = "\033[90m"
DARK_GRAY = "\033[2;37m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
# Box drawing characters
BOX_VERTICAL = ""
BOX_HORIZONTAL = ""
BOX_TOP_LEFT = ""
BOX_TOP_RIGHT = ""
BOX_BOTTOM_LEFT = ""
BOX_BOTTOM_RIGHT = ""
def __init__(self):
self.step_counter = 0
self.start_time = None
def info(self, message: str):
print(f"{self.CYAN}[INFO]{self.RESET} {message}")
def warn(self, message: str):
print(f"{self.YELLOW}[WARN]{self.RESET} {message}")
def error(self, message: str):
print(f"{self.RED}[ERROR]{self.RESET} {message}")
def success(self, message: str):
print(f"{self.GREEN}[SUCCESS]{self.RESET} {message}")
def step_start(self, current: int, total: int, description: str):
"""Start a new step with Docker-style formatting"""
print(f"\n{self.BOLD}{self.BLUE}Step {current}/{total}:{self.RESET} {self.BOLD}{description}{self.RESET}")
print(f"{self.BLUE}{self.BOX_HORIZONTAL * 4}{self.RESET} {self.GRAY}Starting...{self.RESET}")
self.start_time = time.time()
def step_command(self, command: str):
"""Show the command being executed"""
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GRAY}$ {command}{self.RESET}")
def step_output(self, line: str):
"""Show command output with indentation"""
if line.strip():
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.DARK_GRAY} {line.rstrip()}{self.RESET}")
def step_complete(self, message: str = "Completed successfully"):
"""Mark step as completed"""
elapsed = time.time() - self.start_time if self.start_time else 0
print(f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.GREEN}{message} ({elapsed:.1f}s){self.RESET}")
def step_skip(self, message: str):
"""Mark step as skipped"""
elapsed = time.time() - self.start_time if self.start_time else 0
print(
f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.YELLOW}⚠ Skipped: {message} ({elapsed:.1f}s){self.RESET}"
)
def step_fail(self, message: str):
"""Mark step as failed"""
elapsed = time.time() - self.start_time if self.start_time else 0
print(
f"{self.BLUE}{self.BOX_VERTICAL} {self.RESET}{self.RED}✗ Failed: {message} ({elapsed:.1f}s){self.RESET}"
)
def section_header(self, title: str, subtitle: str = ""):
"""Print a section header"""
width = 70
print(f"\n{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}")
if subtitle:
print(f"{self.BOLD}{self.BLUE}🚀 {title.upper()} - {subtitle}{self.RESET}")
else:
print(f"{self.BOLD}{self.BLUE}🚀 {title.upper()}{self.RESET}")
print(f"{self.BOLD}{self.BLUE}{'=' * width}{self.RESET}")
def section_summary(self, title: str):
"""Print a section summary header"""
width = 70
print(f"\n{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}")
print(f"{self.BOLD}{self.GREEN}📊 {title.upper()}{self.RESET}")
print(f"{self.BOLD}{self.GREEN}{'=' * width}{self.RESET}")
def plan_header(self, title: str, count: int):
"""Print planning phase header"""
width = 70
print(f"\n{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}")
print(f"{self.BOLD}{self.CYAN}📋 {title.upper()} ({count} actions){self.RESET}")
print(f"{self.BOLD}{self.CYAN}{'=' * width}{self.RESET}")
def plan_category(self, category: str):
"""Print plan category"""
print(f"\n{self.BOLD}{self.CYAN}{category.upper()}{self.RESET}")
print(f"{self.CYAN}{'' * 20}{self.RESET}")
def plan_item(self, number: int, description: str, os_filter: str = None, critical: bool = False):
"""Print a plan item"""
# OS compatibility indicator
os_indicator = ""
if os_filter:
os_indicator = f" {self.GRAY}({os_filter}){self.RESET}"
# Error handling indicator
error_indicator = f" {self.RED}(critical){self.RESET}" if critical else ""
print(f" {number:2d}. {description}{os_indicator}{error_indicator}")
def plan_legend(self):
"""Print plan legend"""
print(
f"\n{self.GRAY}Legend: {self.RED}(critical){self.GRAY} = stops on failure, {self.GRAY}(os){self.GRAY} = OS-specific{self.RESET}"
)

812
src/dotfiles_manager.py Normal file
View File

@ -0,0 +1,812 @@
class DotfilesManager:
def __init__(
self,
environment: str,
manifest_path: str = "manifest.yaml",
secrets_path: str = "secrets.yaml",
):
self.dotfiles_dir = Path.cwd()
self.manifest_path = self.dotfiles_dir / manifest_path
self.secrets_path = self.dotfiles_dir / secrets_path
self.config_dir = self.dotfiles_dir / "config"
# Initialize console logger
self.console = ConsoleLogger()
# Load configuration
self.variables = self._load_secrets()
self.manifest = self._load_manifest()
# System info
self.system_os = self._get_system_os()
self.system_arch = self._get_system_arch()
self.system_platform = f"{self.system_os}-{self.system_arch}"
# Validate environment
if environment not in self.manifest.get("environments", {}):
self.console.error(f"Environment not found: {environment}")
sys.exit(1)
self.environment = environment
self.env_config = self.manifest["environments"][environment]
self.pm = PackageManager(self)
# Execution state
self.actions: List[Action] = []
self.post_install_comments: List[str] = []
def info(self, message: str):
self.console.info(message)
def warn(self, message: str):
self.console.warn(message)
def error(self, message: str):
self.console.error(message)
sys.exit(1)
def success(self, message: str):
self.console.success(message)
def _load_secrets(self) -> Dict[str, str]:
variables = {}
try:
if self.secrets_path.exists():
with open(self.secrets_path, "r") as f:
yaml_data = yaml.safe_load(f)
if yaml_data and "env" in yaml_data:
variables.update(yaml_data["env"])
except yaml.YAMLError as e:
self.error(f"Error parsing secrets file: {e}")
except Exception as e:
self.error(f"Error reading secrets file: {e}")
return variables
def add_variables(self, vars: List[str]):
for var_setting in vars:
if "=" not in var_setting:
self.error(f"Invalid variable format: {var_setting}")
key, value = var_setting.split("=", 1)
self.variables[key] = value
def _load_manifest(self) -> Dict[str, Any]:
try:
with open(self.manifest_path, "r") as f:
return yaml.safe_load(f)
except FileNotFoundError:
self.error(f"Manifest file not found: {self.manifest_path}")
except yaml.YAMLError as e:
self.error(f"Error parsing manifest: {e}")
def _get_system_os(self) -> str:
os_mapping = {"Darwin": "macos", "Linux": "linux"}
detected_os = platform.system()
if detected_os not in os_mapping:
self.error(f"Unsupported operating system: {detected_os}")
return os_mapping[detected_os]
def _get_system_arch(self) -> str:
arch_mapping = {"x86_64": "amd64", "aarch64": "arm64", "arm64": "arm64"}
detected_arch = platform.machine().lower()
if detected_arch not in arch_mapping:
self.error(f"Unsupported system architecture: {detected_arch}")
return arch_mapping[detected_arch]
def _substitute_variables(self, text: str) -> str:
"""Substitute variables in text"""
if not isinstance(text, str):
return text
# Substitute custom variables
for var, value in self.variables.items():
text = text.replace(f"${var}", str(value))
text = text.replace(f"${{{var}}}", str(value))
# Substitute common environment variables
text = text.replace("$USER", os.getenv("USER", ""))
text = text.replace("$HOME", str(Path.home()))
return text
def run_command(self, command: str, check: bool = True, shell: bool = True) -> subprocess.CompletedProcess:
"""Run command with Docker-style output formatting"""
self.console.step_command(command)
try:
# Use Popen for real-time output
process = subprocess.Popen(
command,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
# Stream output in real-time
output_lines = []
for line in process.stdout:
line = line.rstrip()
if line: # Only show non-empty lines
self.console.step_output(line)
output_lines.append(line)
process.wait()
if check and process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command, output="\n".join(output_lines))
# Create a mock CompletedProcess for compatibility
result = subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(output_lines), stderr="")
return result
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Command failed: {command}\nExit code: {e.returncode}")
# =============================================================================
# ACTION PLANNING PHASE
# =============================================================================
def plan_actions(self, command_filter: Optional[str] = None) -> List[Action]:
"""Plan all actions based on environment configuration"""
actions = []
# Check required variables first
actions.extend(self._plan_variable_checks())
# System setup actions
actions.extend(self._plan_hostname_actions())
# Package manager setup
actions.extend(self._plan_package_manager_actions())
# Package installation
actions.extend(self._plan_package_installation_actions())
# System configuration
actions.extend(self._plan_system_config_actions())
# SSH key generation
actions.extend(self._plan_ssh_actions())
# Config linking
actions.extend(self._plan_config_actions())
# Custom commands
actions.extend(self._plan_custom_command_actions())
# Filter actions based on command
if command_filter:
actions = self._filter_actions(actions, command_filter)
return actions
def _plan_variable_checks(self) -> List[Action]:
"""Plan variable requirement checks"""
actions = []
if "requires" in self.env_config:
for req_var in self.env_config["requires"]:
actions.append(
Action(
type="check-variable",
description=f"Check required variable: {req_var}",
data={"variable": req_var},
skip_on_error=False,
)
)
return actions
def _plan_hostname_actions(self) -> List[Action]:
"""Plan hostname setting actions"""
actions = []
if "hostname" in self.env_config:
hostname = self._substitute_variables(self.env_config["hostname"])
actions.append(
Action(
type="set-hostname",
description=f"Set system hostname to: {hostname}",
data={"hostname": hostname},
skip_on_error=False,
os_filter=None, # Both macos and linux support hostname setting
)
)
return actions
def _plan_package_manager_actions(self) -> List[Action]:
"""Plan package manager setup actions"""
actions = []
if "packages" in self.env_config:
specified_pm = self.env_config.get("package-manager")
pm = self.pm.get_package_manager(specified_pm)
# Install brew if needed
if pm == "brew" and not shutil.which("brew"):
actions.append(
Action(
type="install-brew",
description="Install Homebrew package manager",
data={},
skip_on_error=False,
os_filter="macos",
)
)
# Update package manager
actions.append(
Action(
type="pm-update",
description=f"Update {pm} package repositories",
data={"pm": pm},
skip_on_error=False,
)
)
return actions
def _plan_package_installation_actions(self) -> List[Action]:
"""Plan package installation actions"""
actions = []
if "packages" not in self.env_config:
return actions
packages_config = self.env_config["packages"]
specified_pm = self.env_config.get("package-manager")
pm = self.pm.get_package_manager(specified_pm)
# Collect all packages by type
all_packages = {"standard": set(), "cask": set(), "binary": []}
# Process standard packages
if "standard" in packages_config:
for pkg in packages_config["standard"]:
if isinstance(pkg, str):
all_packages["standard"].add(pkg)
else:
all_packages["standard"].add(pkg["name"])
# Process binary packages and their dependencies
if "binary" in packages_config:
for pkg in packages_config["binary"]:
if isinstance(pkg, str):
pkg_spec = {"name": pkg}
else:
pkg_spec = pkg.copy()
# Merge with binary config from manifest
if pkg_spec["name"] in self.manifest.get("binaries", {}):
binary_config = self.manifest["binaries"][pkg_spec["name"]]
pkg_spec.update(binary_config)
# Add dependencies to standard packages
if "dependencies" in pkg_spec:
all_packages["standard"].update(pkg_spec["dependencies"])
all_packages["binary"].append(pkg_spec)
# Process cask packages
if "cask" in packages_config:
for pkg in packages_config["cask"]:
if isinstance(pkg, str):
all_packages["cask"].add(pkg)
else:
all_packages["cask"].add(pkg["name"])
# Create installation actions in order: standard -> cask -> binary
if all_packages["standard"]:
actions.append(
Action(
type="install-packages",
description=f"Install {len(all_packages['standard'])} standard packages via {pm}",
data={
"pm": pm,
"packages": list(all_packages["standard"]),
"package_type": "standard",
},
skip_on_error=False,
)
)
if all_packages["cask"]:
actions.append(
Action(
type="install-packages",
description=f"Install {len(all_packages['cask'])} cask packages via {pm}",
data={"pm": pm, "packages": list(all_packages["cask"]), "package_type": "cask"},
skip_on_error=False,
)
)
# Process individual binary packages
for pkg_spec in all_packages["binary"]:
actions.append(
Action(
type="install-binary",
description=f"Install binary: {pkg_spec['name']}",
data={"package": pkg_spec},
skip_on_error=True,
)
)
# Add post-install actions for packages
if "standard" in packages_config:
for pkg in packages_config["standard"]:
if isinstance(pkg, dict):
actions.extend(self._plan_package_post_actions(pkg))
if "binary" in packages_config:
for pkg in packages_config["binary"]:
if isinstance(pkg, dict):
actions.extend(self._plan_package_post_actions(pkg))
if "cask" in packages_config:
for pkg in packages_config["cask"]:
if isinstance(pkg, dict):
actions.extend(self._plan_package_post_actions(pkg))
return actions
def _plan_package_post_actions(self, pkg_spec: Dict[str, Any]) -> List[Action]:
"""Plan post-install actions for a package"""
actions = []
if "post-install" in pkg_spec:
actions.append(
Action(
type="run-command",
description=f"Run post-install script for {pkg_spec['name']}",
data={"command": pkg_spec["post-install"]},
skip_on_error=True,
)
)
if "post-install-comment" in pkg_spec:
actions.append(
Action(
type="store-comment",
description=f"Store post-install comment for {pkg_spec['name']}",
data={"comment": pkg_spec["post-install-comment"]},
skip_on_error=True,
)
)
return actions
def _plan_system_config_actions(self) -> List[Action]:
"""Plan system configuration actions"""
actions = []
if "locale" in self.env_config and self.system_os == "linux":
locale = self.env_config["locale"]
actions.append(
Action(
type="set-locale",
description=f"Set system locale to: {locale}",
data={"locale": locale},
skip_on_error=True,
os_filter="linux",
)
)
if "shell" in self.env_config and self.system_os == "linux":
shell = self.env_config["shell"]
actions.append(
Action(
type="set-shell",
description=f"Set default shell to: {shell}",
data={"shell": shell},
skip_on_error=True,
os_filter="linux",
)
)
return actions
def _plan_ssh_actions(self) -> List[Action]:
"""Plan SSH key generation actions"""
actions = []
if "ssh_keygen" in self.env_config:
for ssh_config in self.env_config["ssh_keygen"]:
key_type = ssh_config["type"]
filename = ssh_config.get("filename", f"id_{key_type}")
actions.append(
Action(
type="generate-ssh-key",
description=f"Generate SSH key: {filename}",
data=ssh_config,
skip_on_error=True,
)
)
return actions
def _plan_config_actions(self) -> List[Action]:
"""Plan configuration linking actions"""
actions = []
# Get all configs to link (from packages and explicit configs)
configs_to_link = set()
# Add configs from packages (unless link: false)
if "packages" in self.env_config:
for package_type, packages in self.env_config["packages"].items():
for pkg in packages:
if isinstance(pkg, str):
pkg_name = pkg
should_link = True
else:
pkg_name = pkg["name"]
should_link = pkg.get("link", True)
if should_link:
configs_to_link.add(pkg_name)
# Add explicit configs
if "configs" in self.env_config:
for config in self.env_config["configs"]:
if isinstance(config, str):
configs_to_link.add(config)
else:
configs_to_link.add(config["name"])
# Create link actions
for config_name in configs_to_link:
actions.append(
Action(
type="link-config",
description=f"Link configuration: {config_name}",
data={"config_name": config_name},
skip_on_error=True,
)
)
# Add post-link actions for explicit configs
if "configs" in self.env_config:
for config in self.env_config["configs"]:
if isinstance(config, dict):
actions.extend(self._plan_config_post_actions(config))
return actions
def _plan_config_post_actions(self, config_spec: Dict[str, Any]) -> List[Action]:
"""Plan post-link actions for a config"""
actions = []
if "post-link" in config_spec:
actions.append(
Action(
type="run-command",
description=f"Run post-link script for {config_spec['name']}",
data={"command": config_spec["post-link"]},
skip_on_error=True,
)
)
if "post-link-comment" in config_spec:
actions.append(
Action(
type="store-comment",
description=f"Store post-link comment for {config_spec['name']}",
data={"comment": config_spec["post-link-comment"]},
skip_on_error=True,
)
)
return actions
def _plan_custom_command_actions(self) -> List[Action]:
"""Plan custom command actions"""
actions = []
if "runcmd" in self.env_config:
for i, command in enumerate(self.env_config["runcmd"]):
actions.append(
Action(
type="run-command",
description=f"Run custom command {i+1}",
data={"command": command},
skip_on_error=True,
)
)
return actions
def _filter_actions(self, actions: List[Action], command_filter: str) -> List[Action]:
"""Filter actions based on command type and OS compatibility"""
# First filter by OS compatibility
filtered_actions = []
for action in actions:
if action.os_filter is None or action.os_filter == self.system_os:
filtered_actions.append(action)
else:
self.info(f"Skipping {action.description} (not compatible with {self.system_os})")
# Then filter by command type
if command_filter == "install":
install_types = {
"check-variable",
"install-brew",
"pm-update",
"install-packages",
"install-binary",
}
return [a for a in filtered_actions if a.type in install_types]
elif command_filter == "link":
link_types = {"check-variable", "link-config"}
return [a for a in filtered_actions if a.type in link_types]
return filtered_actions
# =============================================================================
# ACTION EXECUTION PHASE
# =============================================================================
def execute_actions(self, actions: List[Action], dry_run: bool = False):
"""Execute all planned actions"""
if dry_run:
self._print_plan(actions)
return
# Filter out OS-incompatible actions that weren't filtered in planning
compatible_actions = [a for a in actions if a.os_filter is None or a.os_filter == self.system_os]
if len(compatible_actions) != len(actions):
skipped_count = len(actions) - len(compatible_actions)
self.info(f"Skipped {skipped_count} OS-incompatible actions")
self.console.section_header(f"EXECUTING {len(compatible_actions)} ACTIONS", f"Environment: {self.environment}")
for i, action in enumerate(compatible_actions, 1):
self.console.step_start(i, len(compatible_actions), action.description)
try:
self._execute_action(action)
action.status = "completed"
self.console.step_complete()
except Exception as e:
action.error = str(e)
if action.skip_on_error:
action.status = "skipped"
self.console.step_skip(str(e))
else:
action.status = "failed"
self.console.step_fail(str(e))
print(f"\n{self.console.RED}💥 Critical action failed, stopping execution{self.console.RESET}")
break
# Show final summary
self._print_execution_summary(compatible_actions)
def _execute_action(self, action: Action):
"""Execute a single action"""
executors = {
"check-variable": self._execute_check_variable,
"set-hostname": self._execute_set_hostname,
"install-brew": self._execute_install_brew,
"pm-update": self._execute_pm_update,
"install-packages": self._execute_install_packages,
"install-binary": self._execute_install_binary,
"set-locale": self._execute_set_locale,
"set-shell": self._execute_set_shell,
"generate-ssh-key": self._execute_generate_ssh_key,
"link-config": self._execute_link_config,
"run-command": self._execute_run_command,
"store-comment": self._execute_store_comment,
}
executor = executors.get(action.type)
if not executor:
raise RuntimeError(f"Unknown action type: {action.type}")
executor(action.data)
def _execute_check_variable(self, data: Dict[str, Any]):
"""Execute variable check"""
variable = data["variable"]
if variable not in self.variables:
raise RuntimeError(f"Required variable not set: {variable}")
def _execute_set_hostname(self, data: Dict[str, Any]):
"""Execute hostname setting"""
hostname = data["hostname"]
if self.system_os == "macos":
self.run_command(f"sudo scutil --set ComputerName '{hostname}'")
self.run_command(f"sudo scutil --set HostName '{hostname}'")
self.run_command(f"sudo scutil --set LocalHostName '{hostname}'")
else:
self.run_command(f"sudo hostnamectl set-hostname '{hostname}'")
def _execute_install_brew(self, data: Dict[str, Any]):
"""Execute Homebrew installation"""
self.run_command(
'NONINTERACTIVE=1 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"'
)
def _execute_pm_update(self, data: Dict[str, Any]):
"""Execute package manager update"""
pm = data["pm"]
cmd = self.pm.get_update_command(pm)
self.run_command(cmd)
def _execute_install_packages(self, data: Dict[str, Any]):
"""Execute package installation"""
pm = data["pm"]
packages = data["packages"]
package_type = data["package_type"]
if not packages:
return
cmd = self.pm.get_install_command(pm, packages, package_type)
self.run_command(cmd)
def _execute_install_binary(self, data: Dict[str, Any]):
"""Execute binary installation"""
package = data["package"]
# This would need implementation based on your binary installation logic
# For now, just a placeholder
self.info(f"Binary installation for {package['name']} would happen here")
def _execute_set_locale(self, data: Dict[str, Any]):
"""Execute locale setting"""
locale = data["locale"]
self.run_command(f"sudo locale-gen {locale}")
self.run_command(f"sudo update-locale LANG={locale}")
def _execute_set_shell(self, data: Dict[str, Any]):
"""Execute shell setting"""
shell = data["shell"]
shell_path = shutil.which(shell)
if not shell_path:
raise RuntimeError(f"Shell not found: {shell}")
# Add shell to /etc/shells if not present
try:
with open("/etc/shells", "r") as f:
shells = f.read()
if shell_path not in shells:
self.run_command(f"echo '{shell_path}' | sudo tee -a /etc/shells")
except FileNotFoundError:
pass
self.run_command(f"chsh -s {shell_path}")
def _execute_generate_ssh_key(self, data: Dict[str, Any]):
"""Execute SSH key generation"""
ssh_dir = Path.home() / ".ssh"
ssh_dir.mkdir(mode=0o700, exist_ok=True)
key_type = data["type"]
comment = self._substitute_variables(data.get("comment", ""))
filename = data.get("filename", f"id_{key_type}")
key_path = ssh_dir / filename
if key_path.exists():
self.warn(f"SSH key already exists: {key_path}")
return
cmd = f'ssh-keygen -t {key_type} -f "{key_path}" -N "" -C "{comment}"'
self.run_command(cmd)
def _execute_link_config(self, data: Dict[str, Any]):
"""Execute configuration linking"""
config_name = data["config_name"]
# This would need implementation based on your config linking logic
# For now, just a placeholder
self.info(f"Config linking for {config_name} would happen here")
def _execute_run_command(self, data: Dict[str, Any]):
"""Execute custom command"""
command = self._substitute_variables(data["command"])
self.run_command(command)
def _execute_store_comment(self, data: Dict[str, Any]):
"""Execute comment storage"""
comment = self._substitute_variables(data["comment"])
self.post_install_comments.append(comment)
def _print_plan(self, actions: List[Action]):
"""Print execution plan"""
self.console.plan_header(f"EXECUTION PLAN FOR {self.environment}", len(actions))
# Group actions by type for better readability
grouped_actions = {}
for action in actions:
action_category = action.type.split("-")[0] # "install", "set", "link", etc.
if action_category not in grouped_actions:
grouped_actions[action_category] = []
grouped_actions[action_category].append(action)
for category, category_actions in grouped_actions.items():
self.console.plan_category(category)
for i, action in enumerate(category_actions, 1):
# Check if action will be skipped due to OS compatibility
will_skip = action.os_filter and action.os_filter != self.system_os
if will_skip:
self.console.plan_item(
i,
f"{action.description} (will be skipped - {action.os_filter} only)",
action.os_filter,
not action.skip_on_error,
)
else:
self.console.plan_item(i, action.description, action.os_filter, not action.skip_on_error)
self.console.plan_legend()
def _print_execution_summary(self, actions: List[Action]):
"""Print execution summary"""
completed = len([a for a in actions if a.status == "completed"])
failed = len([a for a in actions if a.status == "failed"])
skipped = len([a for a in actions if a.status == "skipped"])
self.console.section_summary("EXECUTION SUMMARY")
print(f"Total actions: {self.console.BOLD}{len(actions)}{self.console.RESET}")
print(f"Completed: {self.console.GREEN}{completed}{self.console.RESET}")
if failed > 0:
print(f"Failed: {self.console.RED}{failed}{self.console.RESET}")
if skipped > 0:
print(f"Skipped: {self.console.YELLOW}{skipped}{self.console.RESET}")
if self.post_install_comments:
print(f"\n{self.console.BOLD}📝 POST-INSTALL NOTES{self.console.RESET}")
print(f"{self.console.CYAN}{'' * 25}{self.console.RESET}")
for i, comment in enumerate(self.post_install_comments, 1):
print(f"{i}. {comment}")
if failed > 0:
print(f"\n{self.console.BOLD}❌ FAILED ACTIONS{self.console.RESET}")
print(f"{self.console.RED}{'' * 20}{self.console.RESET}")
for action in actions:
if action.status == "failed":
print(f"{self.console.RED}{self.console.RESET} {action.description}")
print(f" {self.console.GRAY}Error: {action.error}{self.console.RESET}")
# Final status
if failed == 0:
print(f"\n{self.console.GREEN}🎉 All actions completed successfully!{self.console.RESET}")
else:
print(f"\n{self.console.RED}💥 {failed} action(s) failed. Check the errors above.{self.console.RESET}")
# =============================================================================
# PUBLIC INTERFACE
# =============================================================================
def setup_environment(self, dry_run: bool = False):
"""Setup complete environment"""
actions = self.plan_actions()
self.execute_actions(actions, dry_run)
def install_packages(self, package_name: Optional[str] = None, dry_run: bool = False):
"""Install packages"""
actions = self.plan_actions("install")
if package_name:
# Filter to specific package
actions = [a for a in actions if package_name in str(a.data)]
self.execute_actions(actions, dry_run)
def link_configs(
self,
config_name: Optional[str] = None,
copy: bool = False,
force: bool = False,
dry_run: bool = False,
):
"""Link configurations"""
actions = self.plan_actions("link")
if config_name:
# Filter to specific config
actions = [a for a in actions if config_name in str(a.data)]
# TODO: Handle copy and force flags in action data
self.execute_actions(actions, dry_run)

63
src/package_manager.py Normal file
View File

@ -0,0 +1,63 @@
class PackageManager:
SUPPORTED_MANAGERS = {
"brew": {
"update_cmd": "brew update",
"install_cmd": "brew install {packages}",
"install_cask_cmd": "brew install --cask {packages}",
},
"apt": {
"update_cmd": "sudo apt-get update",
"install_cmd": "sudo apt-get install -y {packages}",
},
"dnf": {
"update_cmd": "sudo dnf check-update || true",
"install_cmd": "sudo dnf install -y {packages}",
},
"pacman": {
"update_cmd": "sudo pacman -Sy",
"install_cmd": "sudo pacman -S --noconfirm {packages}",
},
}
def __init__(self, parent):
self.parent = parent
self.system_os = "macos" if platform.system() == "Darwin" else "linux"
self.detected_pm = self._detect_package_manager()
def _detect_package_manager(self) -> str:
if self.system_os == "macos":
return "brew"
else:
if shutil.which("apt"):
return "apt"
elif shutil.which("dnf"):
return "dnf"
elif shutil.which("pacman"):
return "pacman"
else:
raise ValueError("No supported package manager found (apt, dnf, pacman)")
def get_package_manager(self, specified_pm: Optional[str] = None) -> str:
if specified_pm:
if specified_pm != self.detected_pm:
raise ValueError(
f"Inconsistent package manager: detected ({self.detected_pm}) != specified ({specified_pm})"
)
return specified_pm
return self.detected_pm
def get_update_command(self, pm: str) -> str:
if pm not in self.SUPPORTED_MANAGERS:
raise ValueError(f"Unsupported package manager: {pm}")
return self.SUPPORTED_MANAGERS[pm]["update_cmd"]
def get_install_command(self, pm: str, packages: List[str], package_type: str = "standard") -> str:
if pm not in self.SUPPORTED_MANAGERS:
raise ValueError(f"Unsupported package manager: {pm}")
pm_config = self.SUPPORTED_MANAGERS[pm]
if package_type == "cask" and pm == "brew":
return pm_config["install_cask_cmd"].format(packages=" ".join(packages))
else:
return pm_config["install_cmd"].format(packages=" ".join(packages))

54
test.py Normal file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
import argparse
import json
import os
import platform
import re
import shutil
import subprocess
import sys
import time
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import yaml
def main():
try:
# Use Popen for real-time output
process = subprocess.Popen(
command,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
# Stream output in real-time
output_lines = []
for line in process.stdout:
line = line.rstrip()
if line: # Only show non-empty lines
self.console.step_output(line)
output_lines.append(line)
process.wait()
if check and process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command, output="\n".join(output_lines))
# Create a mock CompletedProcess for compatibility
result = subprocess.CompletedProcess(command, process.returncode, stdout="\n".join(output_lines), stderr="")
return result
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Command failed: {command}\nExit code: {e.returncode}")
if __name__ == "__main__":
main()