"""GNU Stow-style tree folding/unfolding for efficient symlink management.""" import os from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Set @dataclass class LinkOperation: """Represents a single operation to perform during linking.""" type: str # "create_symlink" | "create_dir" | "unfold" | "remove" | "remove_dir" source: Path target: Path package: str is_directory_link: bool = False def __str__(self) -> str: if self.type == "create_symlink": link_type = "DIR" if self.is_directory_link else "FILE" return f" {link_type} LINK: {self.target} -> {self.source}" elif self.type == "create_dir": return f" CREATE DIR: {self.target}" elif self.type == "unfold": return f" UNFOLD: {self.target} (directory symlink -> individual file symlinks)" elif self.type == "remove": return f" REMOVE: {self.target}" elif self.type == "remove_dir": return f" REMOVE DIR: {self.target}" return f" {self.type}: {self.target}" @dataclass class LinkTree: """Represents the current state of symlinks.""" links: Dict[Path, Path] = field(default_factory=dict) # target -> source packages: Dict[Path, str] = field(default_factory=dict) # target -> package_name directory_links: Set[Path] = field(default_factory=set) # targets that are directory links def add_link(self, target: Path, source: Path, package: str, is_dir_link: bool = False): """Add a link to the tree.""" self.links[target] = source self.packages[target] = package if is_dir_link: self.directory_links.add(target) def remove_link(self, target: Path): """Remove a link from the tree.""" self.links.pop(target, None) self.packages.pop(target, None) self.directory_links.discard(target) def is_directory_link(self, target: Path) -> bool: """Check if a target is a directory symlink.""" return target in self.directory_links def get_package(self, target: Path) -> Optional[str]: """Get the package that owns a link.""" return self.packages.get(target) def can_fold(self, target_dir: Path, package: str) -> bool: """Check if all links in target_dir belong to the same package. Returns True if we can create a single directory symlink instead of individual file symlinks. """ # Check all direct children of target_dir for link_target, link_package in self.packages.items(): # If link_target is a child of target_dir try: link_target.relative_to(target_dir) # If parent is target_dir and package differs, cannot fold if link_target.parent == target_dir and link_package != package: return False except ValueError: # link_target is not under target_dir, skip continue return True @classmethod def from_state(cls, state: dict) -> "LinkTree": """Build a LinkTree from the linked.json state format (v2 only).""" tree = cls() links_dict = state.get("links", {}) for package_name, pkg_links in links_dict.items(): for target_str, link_info in pkg_links.items(): target = Path(target_str) if not isinstance(link_info, dict) or "source" not in link_info: raise RuntimeError( "Unsupported linked state format. Remove linked.json and relink dotfiles." ) source = Path(link_info["source"]) is_dir_link = bool(link_info.get("is_directory_link", False)) tree.add_link(target, source, package_name, is_dir_link) return tree def to_state(self) -> dict: """Convert LinkTree to linked.json state format.""" state = {"version": 2, "links": {}} # Group links by package package_links: Dict[str, Dict[str, dict]] = {} for target, source in self.links.items(): package = self.packages[target] if package not in package_links: package_links[package] = {} package_links[package][str(target)] = { "source": str(source), "is_directory_link": target in self.directory_links, } state["links"] = package_links return state class TreeFolder: """Implements GNU Stow tree folding/unfolding algorithm.""" def __init__(self, tree: LinkTree): self.tree = tree def plan_link( self, source: Path, target: Path, package: str, is_dir_link: bool = False ) -> List[LinkOperation]: """Plan operations needed to create a link (may include unfolding). Args: source: Source path (file or directory in dotfiles) target: Target path (where symlink should be created) package: Package name is_dir_link: Whether this is a directory symlink (folded) Returns a list of operations to execute in order. """ operations = [] # Check if parent is a directory symlink that needs unfolding parent = target.parent if parent in self.tree.links and self.tree.is_directory_link(parent): # Parent is a folded directory symlink, need to unfold unfold_ops = self._plan_unfold(parent) operations.extend(unfold_ops) # Create symlink operation (conflict detection will handle existing links) operations.append( LinkOperation( type="create_symlink", source=source, target=target, package=package, is_directory_link=is_dir_link, ) ) return operations def _find_fold_point( self, source: Path, target: Path, package: str ) -> Path: """Find the deepest directory level where we can create a folder symlink. Returns the target path where the symlink should be created. For single files, this should just return the file path (no folding). Folding only makes sense when linking entire directories. """ # For now, disable automatic folding at the plan_link level # Folding should be done at a higher level when we know we're # linking an entire directory tree from a package return target def _plan_unfold(self, folded_dir: Path) -> List[LinkOperation]: """Plan operations to unfold a directory symlink. When unfolding: 1. Remove the directory symlink 2. Create a real directory 3. Create individual file symlinks for all files """ operations = [] # Get the source of the folded directory source_dir = self.tree.links.get(folded_dir) if not source_dir: return operations package = self.tree.packages.get(folded_dir, "") # Remove the directory symlink operations.append( LinkOperation( type="remove", source=source_dir, target=folded_dir, package=package, is_directory_link=True, ) ) # Create real directory operations.append( LinkOperation( type="create_dir", source=source_dir, target=folded_dir, package=package, ) ) # Create individual file symlinks for all files in source if source_dir.exists() and source_dir.is_dir(): for root, _dirs, files in os.walk(source_dir): for fname in files: src_file = Path(root) / fname rel = src_file.relative_to(source_dir) dst_file = folded_dir / rel operations.append( LinkOperation( type="create_symlink", source=src_file, target=dst_file, package=package, is_directory_link=False, ) ) return operations def plan_unlink(self, target: Path, package: str) -> List[LinkOperation]: """Plan operations to remove a link (may include refolding).""" operations = [] # Check if this is a directory link if self.tree.is_directory_link(target): # Remove all file links under this directory to_remove = [] for link_target in self.tree.links.keys(): try: link_target.relative_to(target) to_remove.append(link_target) except ValueError: continue for link_target in to_remove: operations.append( LinkOperation( type="remove", source=self.tree.links[link_target], target=link_target, package=self.tree.packages[link_target], is_directory_link=False, ) ) # Remove the link itself if target in self.tree.links: operations.append( LinkOperation( type="remove", source=self.tree.links[target], target=target, package=package, is_directory_link=self.tree.is_directory_link(target), ) ) return operations def detect_conflicts(self, operations: List[LinkOperation]) -> List[str]: """Detect conflicts before executing operations. Returns a list of conflict error messages. """ conflicts = [] for op in operations: if op.type == "create_symlink": # Check if target already exists in tree (managed by flow) if op.target in self.tree.links: existing_pkg = self.tree.packages[op.target] if existing_pkg != op.package: conflicts.append( f"Conflict: {op.target} is already linked by package '{existing_pkg}'" ) # Check if target exists on disk but not managed by flow elif op.target.exists() or op.target.is_symlink(): conflicts.append( f"Conflict: {op.target} already exists and is not managed by flow" ) # Check if target's parent is a file (can't create file in file) if op.target.parent.exists() and op.target.parent.is_file(): conflicts.append( f"Conflict: {op.target.parent} is a file, cannot create {op.target}" ) return conflicts def execute_operations( self, operations: List[LinkOperation], dry_run: bool = False ) -> None: """Execute a list of operations atomically. If dry_run is True, only print what would be done. """ if dry_run: for op in operations: print(str(op)) return # Execute operations for op in operations: if op.type == "create_symlink": # Create parent directories op.target.parent.mkdir(parents=True, exist_ok=True) if op.target.is_symlink(): current = op.target.resolve(strict=False) desired = op.source.resolve(strict=False) if current == desired: self.tree.add_link(op.target, op.source, op.package, op.is_directory_link) continue op.target.unlink() elif op.target.exists(): if op.target.is_file(): op.target.unlink() else: raise RuntimeError(f"Cannot overwrite directory: {op.target}") # Create symlink op.target.symlink_to(op.source) # Update tree self.tree.add_link(op.target, op.source, op.package, op.is_directory_link) elif op.type == "create_dir": op.target.mkdir(parents=True, exist_ok=True) elif op.type == "remove": if op.target.exists() or op.target.is_symlink(): op.target.unlink() self.tree.remove_link(op.target) elif op.type == "remove_dir": if op.target.exists() and op.target.is_dir(): op.target.rmdir() def to_state(self) -> dict: """Convert current tree to state format for persistence.""" return self.tree.to_state()