#! /usr/bin/env python3
# encoding: utf-8
"""
Renders a two-line shell prompt and writes it to stderr.

The first line is made of a login part, the current path, git information,
padding and system statistics; the second line is the input prompt itself.
All system information is read from Linux interfaces (/proc, /etc/passwd).
"""

import datetime
import os
import pwd
import socket
import subprocess
import sys

# 256-color palette indexes used by the various prompt parts
COLOR_LOAD_IDLE = 10
COLOR_LOAD_OK = 2
COLOR_LOAD_WARN = 3
COLOR_LOAD_ERROR = 1
COLOR_USER_USER = 10
COLOR_USER_ROOT = 196
COLOR_SESSION_LOCAL = 7
COLOR_SESSION_REMOTE = 202
COLOR_TERM_SCREEN = 27
COLOR_CLOCK = 27
COLOR_MEMSWAP = 8
COLOR_SPACE_OK = 2
COLOR_SPACE_WARN = 3
COLOR_SPACE_ERROR = 1
COLOR_DIR_RW = 34
COLOR_DIR_RO = 202
COLOR_DIR_NO = 196
COLOR_GIT_CLEAN = 10
COLOR_GIT_DIRTY = 1
COLOR_GIT_MERGE = 196
COLOR_GIT_UNTRACKED = 202
COLOR_GIT_MODIFIED = 1
COLOR_GIT_STAGED = 10
COLOR_PROMPT_OK = 14
COLOR_PROMPT_ERROR = 1

# Mounts with these filesystem types or below these directories are not listed
MOUNT_IGNORE_FS = ["iso9660", "tmpfs"]
MOUNT_IGNORE_DIR = ["/dev", "/proc", "/sys"]
# NOTE(review): GIT_BLACKLIST is not consulted anywhere in this file.
GIT_BLACKLIST = ["/var/paludis/repositories"]

# Binary prefixes, largest first, as (factor, suffix)
_SI_PREFIXES = ((2**40, "Ti"), (2**30, "Gi"), (2**20, "Mi"), (2**10, "ki"))


def style_color(fg, bg=0):
    """ Returns the escape sequence selecting 256-color foreground and background. """
    return "\033[38;5;%dm\033[48;5;%dm" % (fg, bg)


def style_bold():
    """ Returns the escape sequence enabling bold text. """
    return "\033[1m"


def style_reset():
    """ Returns the escape sequence resetting all text attributes. """
    return "\033[0m"


def colored_strlen(raw):
    """
    Returns string length without ANSI control escapes.

    An escape starts at an ESC character and ends at the next "m".
    """
    length = 0
    reading_escape = False
    for c in raw:
        if c == "\x1b":
            reading_escape = True
            continue
        if reading_escape and c == "m":
            reading_escape = False
            continue
        if not reading_escape:
            length += 1
    return length


def si_number(raw):
    """
    Scales a number down to the largest binary prefix it strictly exceeds.

    Returns a (value, suffix) tuple; the suffix is "" when raw is not above 1024.
    """
    for factor, suffix in _SI_PREFIXES:
        if raw > factor:
            return (raw / factor, suffix)
    return (raw, "")


def space_string(free):
    """
    Returns a formatting string suitable for printing out free space:
    float for >1k, int otherwise.
    """
    return "%.2f%s" if free > 1024 else "%d%s"


def _login_name():
    """
    Returns the login name of the current user.

    os.getlogin() raises OSError when the process has no controlling
    terminal; in that case the passwd entry of the effective uid is used,
    and the numeric uid as a last resort.
    """
    try:
        return os.getlogin()
    except OSError:
        uid = os.geteuid()
        try:
            return pwd.getpwuid(uid).pw_name
        except KeyError:
            return str(uid)


class Sysload:
    """
    Snapshot of /proc/loadavg.

    load1, load5 and load15 are the load averages divided by the number of
    online cores; tasks_ready and tasks_total come from the "ready/total"
    field.
    """

    def __init__(self):
        # close the file deterministically instead of leaking the handle
        with open("/proc/loadavg") as f:
            loadavg = f.read().split()
        cores = os.sysconf(os.sysconf_names["SC_NPROCESSORS_ONLN"])
        self.load1 = float(loadavg[0]) / cores
        self.load5 = float(loadavg[1]) / cores
        self.load15 = float(loadavg[2]) / cores
        self.tasks_ready, self.tasks_total = [int(x) for x in loadavg[3].split("/")]


class Settings:
    """
    Command line settings: last return value, terminal width and the
    warn/error thresholds for system load and free space.
    """

    def __init__(self, argv):
        """
        Parses argv (program name followed by six values).

        Prints a usage message and re-raises IndexError/ValueError when an
        argument is missing or not a number.
        """
        try:
            self.return_value = int(argv[1])
            self.term_width = int(argv[2])
            self.load_warn = float(argv[3])
            self.load_error = float(argv[4])
            self.space_warn = float(argv[5])
            self.space_error = float(argv[6])
        except (IndexError, ValueError):
            prog = argv[0] if argv else "prompt"
            # substitute the program name; the placeholder used to be printed verbatim
            print("Usage: %s return_value $COLUMNS load_warn load_error space_warn space_error" % prog)
            print(" e.g. %s 1 $COLUMNS 1.25 2.5 0.15 0.05" % prog)
            print()
            raise
        self.is_root = os.geteuid() == 0

    def get_load_color(self, load):
        """ Returns the color for a per-core load value. """
        if load > self.load_error:
            return COLOR_LOAD_ERROR
        elif load > self.load_warn:
            return COLOR_LOAD_WARN
        elif load > 0.1:
            return COLOR_LOAD_OK
        else:
            return COLOR_LOAD_IDLE

    def get_space_color(self, free, total):
        """
        Returns the color for a free/total ratio.

        A total of zero (or less) has no meaningful ratio and is reported
        with the error color instead of raising ZeroDivisionError.
        """
        if total <= 0:
            return COLOR_SPACE_ERROR
        ratio = free / total
        if ratio < self.space_error:
            return COLOR_SPACE_ERROR
        elif ratio < self.space_warn:
            return COLOR_SPACE_WARN
        else:
            return COLOR_SPACE_OK


class Part:
    """
    Base class of all prompt parts: a list of string fragments.

    str() concatenates the fragments, len() is the visible length (without
    ANSI escapes).
    """

    def __init__(self):
        self.fragments = []

    def __str__(self):
        return "".join(self.fragments)

    def __len__(self):
        return colored_strlen(str(self))


class LoginPart(Part):
    """
    username@hostname:screen

    - username is green for users, red for root
    - @ is a literal "@" if this is a local session, " ⇄ " when remote (ssh)
    - hostname is colored according to sysload
    - screen is the screen window ID if available
    """

    def __init__(self, settings, sysload):
        Part.__init__(self)
        is_remote = bool(os.getenv("SSH_CLIENT"))
        screen = os.getenv("WINDOW")
        add = self.fragments.append

        # username
        add(style_color(COLOR_USER_ROOT if settings.is_root else COLOR_USER_USER))
        add(style_bold())
        add(_login_name())
        add(style_reset())

        # sign
        add(style_color(COLOR_SESSION_REMOTE if is_remote else COLOR_SESSION_LOCAL))
        add(" ⇄ " if is_remote else "@")

        # hostname
        add(style_color(settings.get_load_color(sysload.load1)))
        add(socket.gethostname())

        # screen window ID
        if screen:
            add(style_reset())
            add(":")
            add(style_bold())
            add(style_color(COLOR_TERM_SCREEN))
            add(screen)
            add(style_reset())


def list_homes():
    """
    Yields (home directory, user name) for every /etc/passwd entry whose
    home is not /dev/null.

    Lines with fewer than six fields (blank lines, comments, broken
    entries) are skipped.
    """
    with open("/etc/passwd") as f:
        for line in f:
            tokens = line.split(":")
            if len(tokens) < 6:
                continue
            if tokens[5] != "/dev/null":
                yield (tokens[5], tokens[0])


class Dir:
    """
    One component of the displayed path.

    The color reflects access rights on path: writable, traversable only,
    or neither. slash is the separator printed in front of the name.
    """

    def __init__(self, path, name=None, slash="/"):
        self.path = path
        self.slash = slash
        self.truncated = False
        self.name = os.path.basename(path) if name is None else name
        if os.access(path, os.W_OK):
            self.color = COLOR_DIR_RW
        elif os.access(path, os.X_OK):
            self.color = COLOR_DIR_RO
        else:
            self.color = COLOR_DIR_NO

    def __len__(self):
        return len(self.name) + len(self.slash)

    def truncate(self):
        """
        Shortens the name to its first character followed by an ellipsis.

        Names of one or two characters are left alone: "shortening" them
        would not save any space and would only lose information.
        """
        if len(self.name) > 2:
            self.truncated = True
            self.name = self.name[0] + "…"

    def __str__(self):
        output = [self.slash]
        if self.truncated:
            output.append(style_bold())
        output.append(style_color(self.color))
        output.append(self.name)
        output.append(style_reset())
        return "".join(output)


class PathPart(Part):
    """
    /path/to/cwd

    Constructs a list of Dirs from the root to CWD. Home directories are
    abbreviated to "~" (own home) or "~user". When shrink_fit is called,
    some Dirs may be shortened.
    """

    def __init__(self, term_width):
        Part.__init__(self)
        self.term_width = term_width
        self.dirs = []
        homes = dict(list_homes())
        login = _login_name()
        path = ""
        for component in os.getcwd().split("/")[1:]:
            path += "/" + component
            if path in homes:
                # everything above a home directory collapses into "~"
                self.dirs = []
                if homes[path] == login:
                    self.dirs.append(Dir(path, "", "~"))
                else:
                    self.dirs.append(Dir(path, homes[path], "~"))
            else:
                self.dirs.append(Dir(path))

    @property
    def full_length(self):
        return sum(len(d) for d in self.dirs)

    def shrink_fit(self, line):
        """
        Rebuilds the fragments, truncating Dirs from the root downwards
        for as long as the path does not fit next to the other parts of line.
        """
        self.fragments = []
        available_space = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1  # account for single-space separators
        for d in self.dirs:
            if self.full_length > available_space:
                d.truncate()
            self.fragments.append(str(d))


def command(cmd):
    """
    Executes a command, returns stdout, suppresses stderr.

    cmd is split on whitespace, no shell is involved. An empty string is
    returned when the command is empty or cannot be started (e.g. the
    binary is not installed). Undecodable bytes are replaced.
    """
    argv = cmd.split()
    if not argv:
        return ""
    try:
        process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    except OSError:
        return ""
    # communicate() drains the pipe while waiting; wait() followed by read()
    # can deadlock once the output exceeds the pipe buffer
    stdout, _ = process.communicate()
    return stdout.decode("utf-8", errors="replace")


class GitPart(Part):
    """
    ↘2 ↗4 ⚠master ?11 ✎6 ✉10

    - 2 commits are available for pulling (remote is ahead)
    - 4 commits are available for pushing (local is ahead)
    - we're merging
    - branch name is master
    - 11 untracked files
    - 6 modified files
    - 10 modified and staged files

    Numeric elements are hidden if the value is 0. The part is empty when
    git reports no name for HEAD.
    """

    def __init__(self):
        Part.__init__(self)
        branch_name = command("git name-rev --name-only --no-undefined --always HEAD").strip()
        if not branch_name:
            return
        add = self.fragments.append
        add("| ")

        # every query prints one line per item, so newlines are counted
        count_to_pull = command("git log --oneline ..@{u}").count("\n")
        count_to_push = command("git log --oneline @{u}..").count("\n")
        git_dir = command("git rev-parse --git-dir").strip()
        merging = os.path.exists(os.path.join(git_dir, "MERGE_HEAD"))
        untracked = command("git ls-files --other --exclude-standard").count("\n")
        modified = command("git diff --name-only").count("\n")
        staged = command("git diff --name-only --staged").count("\n")

        if count_to_pull:
            add("↘%d " % (count_to_pull))
        if count_to_push:
            add("↗%d " % (count_to_push))

        if merging:
            add(style_color(COLOR_GIT_MERGE))
            add(style_bold())
            add("⚠")
        elif modified or staged:
            add(style_color(COLOR_GIT_DIRTY))
        else:
            add(style_color(COLOR_GIT_CLEAN))
        add(branch_name)
        add(style_reset())

        if untracked:
            add(style_color(COLOR_GIT_UNTRACKED))
            add(" ?%d" % (untracked))
        if modified:
            add(style_color(COLOR_GIT_MODIFIED))
            add(" ✎%d" % (modified))
        if staged:
            add(style_color(COLOR_GIT_STAGED))
            add(" ✉%d" % (staged))
        add(style_reset())


class Padding(Part):
    """ Run of spaces filling whatever width the other parts leave free. """

    def __init__(self, term_width):
        Part.__init__(self)
        self.term_width = term_width

    def expand_fit(self, line):
        """ Sizes the padding so that line fills the terminal width. """
        length = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1  # account for single-space separators
        # a negative length yields an empty string
        self.fragments = [" " * length]


class StatsPart(Part):
    """
    [ clock | sysload tasks | m:memory (?s:swap?) mountpoint_spaces...

    - clock shows current day and time (in DD HH:MM)
    - sysload is current loadavg divided by amount of cores (in 1)
    - tasks is the total amount of tasks on the system
    - memory shows current free memory (in octets)
    - swap shows remaining swap space but only if any is actually used
    - mountpoint_spaces list one free space per mountpoint
    """

    def __init__(self, settings, sysload):
        Part.__init__(self)
        add = self.fragments.append

        # clock
        add(datetime.datetime.now().strftime("[ %%d \033[38;5;%dm%%H:%%M\033[0m | " % (COLOR_CLOCK)))

        # sysload
        add(style_color(settings.get_load_color(sysload.load1)))
        add("%.2f " % (sysload.load1))
        add(style_reset())

        # tasks
        add("%d | " % (sysload.tasks_total))

        # memory (and swap, if used); /proc/meminfo values are multiplied by 1024
        meminfo = {}
        with open("/proc/meminfo") as f:
            for line in f:
                fields = line.split()
                if len(fields) >= 2:
                    meminfo[fields[0].rstrip(":")] = fields[1]
        mem_total = int(meminfo.get("MemTotal", 0)) * 1024
        mem_free = int(meminfo.get("MemAvailable", 0)) * 1024
        swap_total = int(meminfo.get("SwapTotal", 0)) * 1024
        swap_free = int(meminfo.get("SwapFree", 0)) * 1024

        add(style_color(COLOR_MEMSWAP))
        add("m")
        self._append_free_space(settings, mem_free, mem_total)
        if swap_total - swap_free > 2**20:
            add(style_color(COLOR_MEMSWAP))
            add(" s")
            self._append_free_space(settings, swap_free, swap_total)

        # mountpoints
        names = []
        with open("/proc/self/mounts") as f:
            for line in f:
                fields = line.split()
                if len(fields) < 4:
                    continue
                mount_dir, fs_type, options = fields[1:4]

                # skip non-storage mounts
                if fs_type in MOUNT_IGNORE_FS:
                    continue
                # match the directory itself or anything below it, so that
                # e.g. /devel is not mistaken for a /dev mount
                if any(mount_dir == d or mount_dir.startswith(d + "/") for d in MOUNT_IGNORE_DIR):
                    continue
                if "rw" not in options.split(","):
                    continue

                # /proc/self/mounts uses a literal \\040 string to escape spaces
                mount_dir = mount_dir.replace("\\040", " ")

                try:
                    stat = os.statvfs(mount_dir)
                except OSError:
                    # an inaccessible mount must not break the prompt
                    continue
                stor_total = stat.f_blocks * stat.f_bsize
                stor_free = stat.f_bavail * stat.f_bsize
                if stor_total == 0:
                    # zero-sized filesystems hold no storage to report
                    continue

                # shortest prefix of the mountpoint name not used yet
                basename = os.path.basename(mount_dir)
                if basename:
                    short_name = " "
                    for c in basename:
                        short_name += c
                        if short_name not in names:
                            break
                else:
                    short_name = " /"
                # remember the name, otherwise later mounts can never see it as taken
                names.append(short_name)

                add(short_name)
                self._append_free_space(settings, stor_free, stor_total)

    def _append_free_space(self, settings, free, total):
        """ Appends free (with binary prefix), colored by the free/total ratio. """
        self.fragments.append(style_color(settings.get_space_color(free, total)))
        value, suffix = si_number(free)
        self.fragments.append(space_string(free) % (value, suffix))
        self.fragments.append(style_reset())


class InputPart(Part):
    """
    $ or 42:$

    Starts on a new line. A non-zero return value is printed in front of
    the sign, followed by ":". The sign is either a literal "$" (user) or
    a "#" (root), colored according to the return value.
    """

    def __init__(self, settings):
        Part.__init__(self)
        add = self.fragments.append
        add("\n")
        if settings.return_value:
            add("%d" % (settings.return_value))
            add(":")
        add(style_color(COLOR_PROMPT_ERROR if settings.return_value else COLOR_PROMPT_OK))
        add("#" if settings.is_root else "$")
        add(style_reset())
        add(" ")


def main(argv=None):
    """ Builds all parts, fits them to the terminal width and writes them to stderr. """
    settings = Settings(sys.argv if argv is None else argv)
    sysload = Sysload()

    lp = LoginPart(settings, sysload)
    pp = PathPart(settings.term_width)
    gp = GitPart()
    pad = Padding(settings.term_width)
    sp = StatsPart(settings, sysload)
    top_line = [lp, pp, gp, pad, sp]
    prompt = InputPart(settings)

    # the path shrinks first, then the padding takes up what is left
    pp.shrink_fit(top_line)
    pad.expand_fit(top_line)

    top_line_str = " ".join(str(part) for part in top_line)
    sys.stderr.write(top_line_str)
    sys.stderr.write(str(prompt))
    sys.stderr.flush()


if __name__ == "__main__":
    main()