394 lines
11 KiB
Python
Executable File
394 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
tiny_mux: minimal SSH workspace switcher that multiplexes *remote* tmux sessions.

- Reads session names from ~/.tiny_mux.conf (one per line; blank lines and
  lines starting with '#' are ignored).
- Runs exactly one child at a time: ssh + tmux session.
- Hotkeys (prefix = Ctrl-a):
    Ctrl-a <digit> : switch to session # (1-based from config file; 0 selects session 10)
    Ctrl-a q       : quit tiny_mux (kills current ssh)
    Ctrl-a a       : send literal Ctrl-a to child
    Ctrl-a w       : show popup menu, choose session
|
|
"""
|
|
|
|
import errno
|
|
import fcntl
|
|
import os
|
|
import selectors
|
|
import signal
|
|
import struct
|
|
import sys
|
|
import termios
|
|
import tty
|
|
import pty
|
|
|
|
# Session list file: one session name per line, resolved against the user's home.
CONFIG_PATH = os.path.expanduser("~/.tiny_mux.conf")

# ioctl constants (macOS + Linux)
# Request codes for writing / reading a terminal's window size.
TIOCSWINSZ = termios.TIOCSWINSZ
TIOCGWINSZ = termios.TIOCGWINSZ
|
|
|
|
|
|
# ---------- VM wrapper logic ----------
|
|
|
|
def build_vm_command(hostname: str, skip_tmux: bool = False, extra_args=None):
    """Build the ssh argv for a session name (reimplementation of ~/bin/vm).

    ``hostname`` is ``[user@]name-<kind>`` where ``<kind>`` is one of ``orb``,
    ``utm`` or ``workstation``.  When no ``user@`` part is given, ``$USER``
    (or ``"unknown"``) is used.

    Args:
        hostname: Session name as written in the config file.
        skip_tmux: When false, the command attaches to (or creates) a remote
            tmux session named after the host part; when true, ``extra_args``
            is appended instead, if given.
        extra_args: Extra remote command words, only used with ``skip_tmux``.

    Returns:
        The argv list, starting with ``"ssh"``.

    Exits the process with status 1 (after printing to stderr) when the host
    part matches none of the known suffixes.
    """
    login = os.environ.get("USER", "unknown")
    machine = hostname
    if "@" in machine:
        login, machine = machine.split("@", 1)

    # (name suffix, ssh destination template, needs the internal identity file)
    routes = (
        ("-orb", "{}@orb", False),
        ("-utm", "{}.utm.local", True),
        ("-workstation", "{}.workstation.lan", True),
    )
    for suffix, template, needs_key in routes:
        if machine.endswith(suffix):
            destination = template.format(machine[: -len(suffix)])
            break
    else:
        print(f"[tiny_mux] Error: unknown host pattern '{machine}'", file=sys.stderr)
        sys.exit(1)

    argv = ["ssh", "-tt"]
    if needs_key:
        key_path = os.path.expanduser("~/.ssh/id_ed25519_internal")
        argv.extend(["-i", key_path, "-o", "IdentitiesOnly=yes"])
    argv.append(f"{login}@{destination}")

    if not skip_tmux:
        # The tmux session is named after the full host part, suffix included.
        argv.extend(["tmux", "new", "-A", "-s", machine])
    elif extra_args:
        argv.extend(extra_args)

    return argv
|
|
|
|
|
|
# ---------- Terminal helpers ----------
|
|
|
|
def load_sessions(path: str):
    """Read the session names listed in the config file at ``path``.

    Each line is stripped of surrounding whitespace; empty lines and lines
    beginning with ``#`` are skipped.

    Returns:
        The remaining lines, in file order.

    Exits the process with status 1 (after printing to stderr) when the file
    does not exist or yields no session names.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            stripped = (raw.strip() for raw in handle)
            names = [entry for entry in stripped if entry and not entry.startswith("#")]
    except FileNotFoundError:
        print(f"[tiny_mux] Config not found: {path}", file=sys.stderr)
        sys.exit(1)

    if not names:
        print(f"[tiny_mux] No sessions in {path}", file=sys.stderr)
        sys.exit(1)
    return names
|
|
|
|
|
|
def get_winsize(fd: int):
    """Return the window size of the terminal behind ``fd``.

    Returns:
        A ``(rows, cols, xpixel, ypixel)`` tuple.  When the terminal reports
        zero rows or columns, the ``LINES`` / ``COLUMNS`` environment
        variables are used instead (falling back to 24 x 80) and the pixel
        fields are zeroed.  When ``fd`` cannot be queried at all, the result
        is ``(24, 80, 0, 0)``.
    """

    def _env_dimension(name: str, fallback: int) -> int:
        # The environment is user-controlled: ignore values that are not
        # integers or that would not fit the unsigned 16-bit winsize fields.
        try:
            value = int(os.environ.get(name, fallback))
        except ValueError:
            return fallback
        return value if 0 < value <= 0xFFFF else fallback

    try:
        data = fcntl.ioctl(fd, TIOCGWINSZ, b"\x00" * 8)
    except OSError:
        return 24, 80, 0, 0

    rows, cols, xp, yp = struct.unpack("HHHH", data)
    if rows == 0 or cols == 0:
        rows = _env_dimension("LINES", 24)
        cols = _env_dimension("COLUMNS", 80)
        xp = yp = 0
    return rows, cols, xp, yp
|
|
|
|
|
|
def set_winsize(fd: int, rows: int, cols: int, xp: int = 0, yp: int = 0):
    """Set the window size of the terminal behind ``fd``.

    ``rows`` / ``cols`` are the character dimensions and ``xp`` / ``yp`` the
    pixel dimensions; all four are packed as unsigned shorts.  Errors from
    the ioctl propagate to the caller.
    """
    packed = struct.pack("4H", rows, cols, xp, yp)
    fcntl.ioctl(fd, TIOCSWINSZ, packed)
|
|
|
|
|
|
class RawTTY:
    """Context manager that keeps a terminal in raw mode for the ``with`` body.

    On entry the current terminal attributes of ``fd`` are saved and the
    terminal is switched to raw mode; on exit the saved attributes are put
    back, whether or not the body raised.
    """

    def __init__(self, fd: int):
        # Attributes captured in __enter__; None until the context is entered.
        self._saved = None
        self.fd = fd

    def __enter__(self):
        attrs = termios.tcgetattr(self.fd)
        self._saved = attrs
        tty.setraw(self.fd, when=termios.TCSANOW)
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._saved is None:
            return None
        # TCSADRAIN: apply once pending output has been written.
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self._saved)
        return None
|
|
|
|
|
|
# ---------- TinyMux core ----------
|
|
|
|
class TinyMux:
    """Relay the local terminal to one ssh child at a time, switchable by hotkey.

    The child runs on its own pty.  Bytes read from stdin are forwarded to the
    pty master (after hotkey processing) and bytes read from the master are
    copied to stdout.  Switching sessions hangs up the current child and
    spawns a new one for the chosen entry of ``sessions``.
    """

    def __init__(self, sessions):
        """Record the session list and install the SIGWINCH/SIGCHLD handlers.

        Args:
            sessions: Session names, as accepted by ``build_vm_command``.

        Raises:
            termios.error: if stdin is not a terminal.
        """
        self.sessions = sessions
        self.current_idx = None  # index into sessions of the running child

        self.master_fd = None  # pty master of the running child
        self.child_pid = None
        self._selector = selectors.DefaultSelector()

        self.stdin_fd = sys.stdin.fileno()
        self.stdout_fd = sys.stdout.fileno()

        self.prefix = b"\x01"  # Ctrl-a
        self._saw_prefix = False  # True when the previous byte was the prefix

        # Save original cooked tty mode (used by the session menu).
        self._saved_tty = termios.tcgetattr(self.stdin_fd)

        signal.signal(signal.SIGWINCH, self._on_sigwinch)
        signal.signal(signal.SIGCHLD, self._on_sigchld)

    # ---------- Process / PTY management ----------

    @staticmethod
    def _write_all(fd, data):
        """Write all of ``data`` to ``fd``, continuing after partial writes."""
        pending = bytes(data)
        while pending:
            sent = os.write(fd, pending)
            pending = pending[sent:]

    def _close_master(self):
        """Unregister and close the current pty master, if there is one."""
        fd = self.master_fd
        if fd is None:
            return
        # Clear the attribute first so a signal handler running in between
        # never touches a closed descriptor.
        self.master_fd = None
        try:
            self._selector.unregister(fd)
        except (KeyError, ValueError, OSError):
            pass
        try:
            os.close(fd)
        except OSError:
            pass

    def _signal_child(self, signum):
        """Best-effort delivery of ``signum`` to the child's group, else the child."""
        pid = self.child_pid
        if not pid:
            return
        try:
            os.killpg(pid, signum)
        except OSError:
            try:
                os.kill(pid, signum)
            except OSError:
                pass

    def _kill_child_group(self):
        """Hang up the current child (SIGHUP); does not wait for it to exit."""
        self._signal_child(signal.SIGHUP)

    def _reap_child(self):
        """Collect every child that has already exited, without blocking.

        A session that was hung up during a switch usually exits only after
        ``child_pid`` already refers to its replacement, so waiting on
        ``child_pid`` alone would leave the old process a zombie.  Any exited
        child is therefore reaped; ``child_pid`` is cleared when the current
        child is among them or when no children remain at all.
        """
        while True:
            try:
                pid, _status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                self.child_pid = None
                return
            if pid == 0:
                return
            if pid == self.child_pid:
                self.child_pid = None

    def _spawn(self, idx: int):
        """Replace the running child with an ssh session for ``sessions[idx]``.

        The previous child (if any) is hung up and its pty master closed; a
        new pty is opened, sized like the local terminal, and a forked child
        execs the ssh command on the slave side.
        """
        if self.child_pid:
            self._kill_child_group()
            self._reap_child()
        self._close_master()

        master_fd, slave_fd = pty.openpty()
        try:
            rows, cols, xp, yp = get_winsize(self.stdout_fd)
            set_winsize(slave_fd, rows, cols, xp, yp)
            pid = os.fork()
        except BaseException:
            # Nothing was spawned: do not leak the pty pair.
            os.close(master_fd)
            os.close(slave_fd)
            raise

        if pid == 0:
            # --- Child ---
            try:
                try:
                    os.setsid()
                except OSError as e:
                    if e.errno != errno.EPERM:
                        raise

                os.close(master_fd)
                os.dup2(slave_fd, 0)
                os.dup2(slave_fd, 1)
                os.dup2(slave_fd, 2)

                # Make the pty the controlling terminal of the new session so
                # the child can open /dev/tty.  Best effort: the session still
                # starts if this is refused.
                tiocsctty = getattr(termios, "TIOCSCTTY", None)
                if tiocsctty is not None:
                    try:
                        fcntl.ioctl(0, tiocsctty, 0)
                    except OSError:
                        pass

                if slave_fd > 2:
                    os.close(slave_fd)

                argv = build_vm_command(self.sessions[idx], skip_tmux=False)
                os.execvp(argv[0], argv)
            except BaseException as e:
                # BaseException on purpose: build_vm_command calls sys.exit()
                # for unknown hosts, and nothing may unwind into the parent's
                # event loop and cleanup code inside the forked child.
                if not isinstance(e, SystemExit):
                    try:
                        os.write(2, f"tiny_mux child exec failed: {e}\n".encode())
                    except Exception:
                        pass
                try:
                    sys.stderr.flush()
                except Exception:
                    pass
                os._exit(1)

        # --- Parent ---
        os.close(slave_fd)
        self.child_pid = pid
        self.master_fd = master_fd
        self.current_idx = idx
        try:
            os.setpgid(pid, pid)
        except OSError:
            # Expected once the child is a session leader or has exec'd.
            pass
        self._selector.register(master_fd, selectors.EVENT_READ)
        self._propagate_winsize()
        self._signal_winch()

    def _propagate_winsize(self):
        """Copy the local terminal's window size onto the child's pty."""
        if self.master_fd is None:
            return
        rows, cols, xp, yp = get_winsize(self.stdout_fd)
        try:
            set_winsize(self.master_fd, rows, cols, xp, yp)
        except OSError:
            pass

    def _signal_winch(self):
        """Tell the child that its window size changed (SIGWINCH)."""
        self._signal_child(signal.SIGWINCH)

    def _on_sigwinch(self, _signum, _frame):
        """SIGWINCH handler: forward the new size to the child."""
        self._propagate_winsize()
        self._signal_winch()

    def _on_sigchld(self, _signum, _frame):
        """SIGCHLD handler: reap whatever has exited."""
        self._reap_child()

    # ---------- Input handling ----------

    def _choose_session(self):
        """Close ssh, clear screen, show session list, wait for choice, reconnect.

        The choice is read as one line in cooked mode (type the number, then
        Enter).  Anything that is not a valid session number reconnects to
        the previously selected session.
        """
        self._kill_child_group()
        self._close_master()
        self._reap_child()

        # Clear terminal
        sys.stdout.write("\033[2J\033[H")
        sys.stdout.flush()

        # Temporarily cooked mode
        termios.tcsetattr(self.stdin_fd, termios.TCSADRAIN, self._saved_tty)
        try:
            print("[tiny_mux] Available sessions:")
            for i, s in enumerate(self.sessions, 1):
                mark = " *" if self.current_idx == i - 1 else ""
                print(f" {i}. {s}{mark}")
            sys.stdout.write("\nType number (or q to cancel): ")
            sys.stdout.flush()

            # Read straight from the descriptor: going through sys.stdin would
            # leave the trailing newline in Python's buffer, where it would
            # answer (and cancel) the next menu.
            try:
                answer = os.read(self.stdin_fd, 1024)
            except OSError:
                answer = b""
        finally:
            tty.setraw(self.stdin_fd, when=termios.TCSANOW)

        try:
            picked = int(answer.decode("ascii", "ignore").strip()) - 1
        except ValueError:
            picked = None
        if picked is None or not 0 <= picked < len(self.sessions):
            # Cancelled or invalid: respawn the previous session.
            picked = self.current_idx
        if picked is not None:
            self._spawn(picked)

    def _flush_to_child(self, out):
        """Send buffered input to the child and empty ``out``.

        Returns False only when writing to the child failed.  Input is
        dropped silently when there is no child to receive it.
        """
        if out and self.master_fd is not None:
            try:
                self._write_all(self.master_fd, out)
            except OSError:
                return False
        out.clear()
        return True

    def _handle_stdin(self):
        """Process one read from stdin: run hotkeys, forward the rest.

        Returns:
            False when the mux should shut down (stdin closed or failed,
            Ctrl-a q pressed, or the child could not be written to),
            True otherwise.
        """
        try:
            data = os.read(self.stdin_fd, 1024)
        except OSError as e:
            return e.errno in (errno.EAGAIN, errno.EINTR)
        if not data:
            return False

        out = bytearray()
        for value in data:
            key = bytes((value,))
            if not self._saw_prefix:
                if key == self.prefix:
                    self._saw_prefix = True
                else:
                    out.extend(key)
                continue

            # Byte following the prefix; unrecognised keys are swallowed.
            self._saw_prefix = False
            if key == b"q":
                return False
            if key == b"a":
                out.extend(self.prefix)
            elif key.isdigit():
                d = value - ord("0")
                idx = 9 if d == 0 else d - 1  # Ctrl-a 0 is the tenth session
                if 0 <= idx < len(self.sessions):
                    # Bytes typed before the hotkey belong to the old session.
                    if not self._flush_to_child(out):
                        return False
                    self._spawn(idx)
            elif key == b"w":
                if not self._flush_to_child(out):
                    return False
                self._choose_session()

        return self._flush_to_child(out)

    def _handle_master(self):
        """Copy one chunk of child output to stdout.

        Returns:
            False when the child side is gone (EOF or a read error other than
            EAGAIN/EINTR) or stdout cannot be written, True otherwise.
        """
        try:
            chunk = os.read(self.master_fd, 4096)
        except OSError as e:
            return e.errno in (errno.EAGAIN, errno.EINTR)
        if not chunk:
            return False
        try:
            self._write_all(self.stdout_fd, chunk)
        except OSError:
            return False
        return True

    def run(self):
        """Put the terminal in raw mode, start session 0 and relay until done."""
        with RawTTY(self.stdin_fd):
            self._selector.register(self.stdin_fd, selectors.EVENT_READ)
            self._spawn(0)
            while True:
                events = self._selector.select(timeout=None)
                ready = {key.fd for key, _mask in events}
                # Service the child before stdin: a hotkey may replace the
                # master fd, and the new pty can reuse the old fd number,
                # which would make a pending event for the old one look
                # current.
                if self.master_fd in ready and not self._handle_master():
                    self.shutdown()
                    return
                if self.stdin_fd in ready and not self._handle_stdin():
                    self.shutdown()
                    return

    def shutdown(self):
        """Stop watching stdin, hang up the child and release its pty."""
        try:
            self._selector.unregister(self.stdin_fd)
        except (KeyError, ValueError, OSError):
            pass
        self._kill_child_group()
        self._close_master()
        self._reap_child()
|
|
|
|
|
|
def main():
    """Load the configured sessions and run the multiplexer until it exits.

    The child session is shut down on the way out, both on KeyboardInterrupt
    and on any other exception; the latter is reported on stderr and then
    re-raised.
    """
    sessions = load_sessions(CONFIG_PATH)
    mux = TinyMux(sessions)
    try:
        mux.run()
    except KeyboardInterrupt:
        # Interrupted: clean up quietly, no traceback.
        mux.shutdown()
    except Exception as e:
        try:
            mux.shutdown()
        finally:
            # Report even if shutdown itself failed, then let the error surface.
            print(f"[tiny_mux] fatal error: {e}", file=sys.stderr)
            raise


if __name__ == "__main__":
    main()
|
|
|