#! /usr/bin/env python3
# encoding: utf-8

import math
import re
import sys
import struct
import fcntl
import termios
import time

# --------------------------------------------------

def lexical_join(words, oxford=False):
    '''
    Joins an iterable of words or sentence fragments into a lexical list:

    >>> lexical_join(['this', 'that', 'one of them too'])
    'this, that and one of them too'

    >>> lexical_join(['this', 'that', 'one of them too'], oxford=True)
    'this, that, and one of them too'

    >>> lexical_join(['this', 'that'])
    'this and that'

    >>> lexical_join(['this'])
    'this'

    Every item is converted with str(), so the result is always a str.
    An empty iterable yields an empty string.
    '''

    # materialize first: generators have no len() and every item has to be a str
    words = [str(w) for w in words]
    count = len(words)

    if count == 0:
        return ''
    elif count == 1:
        return words[0]
    elif count == 2:
        return '%s and %s' %(words[0], words[1])
    else:
        return '%s%s and %s' %(', '.join(words[:-1]), ',' if oxford else '', words[-1])

# --------------------------------------------------

class _ProgressBarChannel:
    '''
    State and formatting of a single ProgressBar2 channel.

    unit            unit symbol appended to rendered values
    top             value corresponding to 100%: a number, a callable returning a number,
                    or None for a "dynamic" top that ProgressBar2.set() estimates
    precision       number of decimal places in rendered values
    use_prefixes    False disables multiplier prefixes (k, M, ...)
    prefix_base2    not supported yet, only triggers a warning
    min_width_*     minimum (right-justified) width of the respective rendered value
    '''

    def __init__(self, unit, top, precision, use_prefixes=True, prefix_base2=False, min_width_raw=0, min_width_rate=0, min_width_time=0, min_width_percent=7):
        self.unit = unit
        self.top = top
        self.dynamic_top = top is None
        self.prefix_base2 = prefix_base2
        self.precision = precision
        self.use_prefixes = use_prefixes
        self.min_width = {
            "raw": min_width_raw,
            "rate": min_width_rate,
            "percent": min_width_percent,
            "time": min_width_time
        }

        self._value = 0
        self.set()

        if prefix_base2:
            _print("Unit does not yet support base2 prefixes (e.g. Gi, Mi), using decadic (G, M) instead", prefix.warn)

    @property
    def top(self):
        '''
        The value corresponding to 100%. A callable top is called (without arguments) on each access.
        '''

        return self._top() if callable(self._top) else self._top

    @top.setter
    def top(self, value):
        self._top = value

    def set(self, value=None):
        '''
        Stores a new value (unless it is None) and recomputes the ratio.

        The ratio is None while the top is unknown (None) or zero.
        '''

        if value is not None:
            self._value = value

        top = self.top # read once, it may be a callable
        self.ratio = self._value / top if top else None

    def _render(self, value, unit=None, just=None):
        '''
        Formats value with a unit (defaults to the channel's unit).

        just is a key of self.min_width; when given, the result is right-justified to that width.
        '''

        if unit is None:
            unit = self.unit

        u = Unit(value, unit, format="%.{:d}f pU".format(self.precision))

        if not self.use_prefixes or just == "percent":
            u._prefixes = (('', 0),) # Unit needs fixin'

        s = str(u)

        if just:
            s = s.rjust(self.min_width[just])

        return s

    def _placeholder(self, unit, just):
        '''
        Renders a "value unknown" marker, e.g. "- f", justified like a real value would be.
        '''

        return "- {:s}".format(unit).rjust(self.min_width[just])

    @property
    def value(self):
        '''
        The rendered current value.
        '''

        return self._render(self._value, just="raw")

    @property
    def inverse(self):
        '''
        The rendered amount remaining to the top, or a placeholder while the top is unknown.
        '''

        top = self.top

        if top is None:
            return self._placeholder(self.unit, "raw")

        return self._render(top - self._value, just="raw")

    def divide(self, amount):
        """
        Divides the amount into amount * ratio, amount * (1 - ratio) and rounds to integers.

        The ratio is clamped to [0, 1] (a channel may exceed its top, a bar must not exceed
        the line) and an unknown ratio counts as 0. A non-positive amount yields (0, 0).
        """

        if amount <= 0:
            return (0, 0)

        ratio = min(max(self.ratio or 0, 0), 1)
        done = round(amount * ratio)

        return (done, amount - done)

class ProgressBar2:
    """
    A configurable text progressbar.

    Each atom consists of three characters: a literal §, an operator and a channel.

    An operator is one of:
        % - a percentage (value from 0 to 100, padded with spaces)
        r - displays the raw value of the channel
        z - displays the maximum (total) value
        s - displays the rate of change in units/s
        m - displays the rate of change in units/min
        h - displays the rate of change in units/h
        t - displays the elapsed time in s (use with uppercase channel to get an ETA)
        T - displays the elapsed time in hh:mm:ss (use with uppercase channel to get an ETA)
        any other character - a bar consisting of these characters filling the line so that
            at 100% it fills the entirety of the remaining space

    A channel ID as a lowercase letter is the channel's actual value. An uppercase letter
    is that channel's complement. Operations z, s, m and h cannot have a complement.

    Values that cannot be computed yet (e.g. the top of a dynamic channel is still unknown)
    are displayed as a dash.

    Use §§ to display a literal §.

    Examples (outputs are illustrative, exact padding depends on the channel configuration)
    ========
    Format: §%a [§=a>§ A] §tA
    Output: 42 % [============>               ] 27 s

    Format: §%a (§ra/§za) [§-a>§ A] §sa [§rb/§zb] (ETA §TA)
    Output: 53 % (21.0 kf/39.6 kf) [---------->         ] 27.1 f/s [233 Mo/439 Mo] (ETA 00:11:26)
    """

    def __init__(self, format, channels, width=None):
        """
        Initialize the ProgressBar.

        width is the desired size of the progressbar drawing area in characters (columns)
        and defaults to terminal width.

        channels is a dict that for each channel lists its properties:

            {
                "a": {
                    "unit": "f",
                    "prefix_base2": False,
                    "top": 39600,
                    "precision": 1
                },
                "b": {
                    "unit": "o",
                    "prefix_base2": True,
                    "top": measure_file_size_closure("/path/to/file"),
                    "precision": 0
                }
            }

        Channel IDs (the "a", "b") are arbitrary lowercase letters ([a-z]).
        Each property dict is passed as keyword arguments to the channel constructor, which
        also accepts "use_prefixes" and "min_width_raw", "min_width_rate", "min_width_time"
        and "min_width_percent".
        "unit" is the unit symbol; "prefix_base2" is not supported yet and only issues a warning.
        "top" is the value of the channel that corresponds to 100%. Channels are allowed to exceed this value.
        It can either be a number, a callable or None. If it's a callable, it will be called without arguments
        and shall return a number. If it's None, the top is estimated from the ratios of the other channels.
        "precision" is the displayed floating point precision. Use 0 to force an integer.
        """

        self.format = format
        self.channels = {channel_id: _ProgressBarChannel(**conf) for channel_id, conf in channels.items()}
        self.time_start = None
        self.width = width

    def set(self, channel_id, value):
        """
        Sets the channel's value.

        If the channel's ratio is known and non-zero, the top of every dynamic channel
        (one configured with top=None) is re-estimated so that it sits at the same ratio.
        """

        c = self.channels[channel_id]
        c.set(value)

        # a ratio of zero says nothing about the totals (and would divide by zero below)
        if c.ratio:
            for channel in self.channels.values():
                if channel.dynamic_top:
                    channel.top = channel._value / c.ratio
                    channel.set() # refresh the ratio against the new top

    def render(self, clear=True, stream=sys.stderr):
        """
        Renders the progressbar into a stream. If clear is True, clears the previous content first.

        The first call starts the clock used for rates, elapsed times and ETAs.
        Raises ValueError if the format asks for the complement of z, s, m or h.
        """

        width = self.width or get_terminal_size()[1]
        raw = self.format
        now = time.time()

        if not self.time_start:
            self.time_start = now

        t_elapsed = now - self.time_start
        rate_units = {"s": ("s", 1), "m": ("min", 60), "h": ("h", 3600)}

        for tag in re.findall("§[%rzsmhtT][a-zA-Z]", raw):
            op = tag[1]
            ch = tag[2]
            channel = self.channels[ch.lower()]
            inverse = ch.isupper()

            if op == "%":
                if channel.ratio is None:
                    s = channel._placeholder("%", "percent")
                else:
                    ratio = 1 - channel.ratio if inverse else channel.ratio
                    s = channel._render(ratio * 100, "%", "percent")
            elif op == "r": # display the current value
                s = channel.inverse if inverse else channel.value
            elif op == "z": # display the top value
                if inverse:
                    raise ValueError("inverse of the top value is not defined: {:s}".format(tag))

                top = channel.top

                if top:
                    s = channel._render(top, just="raw")
                else:
                    s = channel._placeholder(channel.unit, "raw")
            elif op in "tT":
                if inverse:
                    # ETA: linear extrapolation, impossible before any progress or without a top
                    top = channel.top

                    if top is not None and channel._value > 0:
                        seconds = t_elapsed * (top - channel._value) / channel._value
                    else:
                        seconds = None
                else:
                    seconds = t_elapsed

                s = self._render_time(seconds, op.isupper(), channel.min_width["time"])
            else: # op is one of "smh"
                if inverse:
                    raise ValueError("the inverse of rates is not defined: {:s}".format(tag))

                t_unit, t_div = rate_units[op]

                if t_elapsed > 0:
                    rate = t_div * channel._value / t_elapsed
                    s = channel._render(rate, channel.unit + "/" + t_unit, "rate")
                else:
                    s = channel._placeholder(channel.unit + "/" + t_unit, "rate")

            raw = raw.replace(tag, s, 1)

        # all that remains are bars
        free_space = width - len(re.sub("§[^%rzsmhtT][a-zA-Z]", "", raw))

        for tag in re.findall("§[^%rzsmhtT][a-zA-Z]", raw):
            op = tag[1]
            ch = tag[2]
            channel = self.channels[ch.lower()]
            lengths = channel.divide(free_space)
            raw = raw.replace(tag, op * lengths[1 if ch.isupper() else 0], 1)

        # render literal §s
        raw = raw.replace("§§", "§")

        # pave over previous render and draw the new one
        if clear:
            stream.write("\r" + " " * width)

        stream.write("\r" + raw)
        stream.flush()

    def end(self, finish=False, stream=sys.stderr):
        """
        Print a newline (heh :-)

        If finish is True, set all channels to 100% and draw the progressbar first.

        The newline goes to the same stream the bar is drawn on (stderr by default).
        """

        if finish:
            for c in self.channels.values():
                c.set(c.top)

            self.render(stream=stream)

        stream.write("\n")
        stream.flush()

    def _render_time(self, seconds, hhmmss=False, just=0):
        '''
        Formats seconds either as "N s" or as hh:mm:ss, right-justified to just characters.

        seconds=None (unknown) is rendered as a dash placeholder.
        '''

        if hhmmss:
            if seconds is None:
                output = "--:--:--"
            else:
                mm, ss = divmod(int(seconds), 60)
                hh, mm = divmod(mm, 60)
                output = "{:02d}:{:02d}:{:02d}".format(hh, mm, ss)
        else:
            output = "- s" if seconds is None else "{:d} s".format(int(seconds))

        return output.rjust(just)

class ProgressBar:
    '''
    An animated progress bar.

    Deprecated in favor of ProgressBar2; instantiating it prints a warning.

    TODO derive Wait() from this
    '''

    def __init__(self, width, top, unit, reverse=False):
        '''
        width       width of the 'widget' including all text
        top         the 100% value
        unit        name of the unit to display
        reverse     True to expand the progress bar from right to left
        '''

        self.width = width
        self.value = 0
        self.top = top
        self.unit = unit
        self.reverse = reverse
        self.old_len = 0
        self.t_start = None

        _print("over.core.text.ProgressBar is deprecated and will be replaced by ProgressBar2 soon", prefix.warn)

    def draw(self):
        '''
        Redraws the bar on stderr over the previous drawing.

        The first call starts the clock used for the displayed speed.
        '''

        if not self.t_start:
            self.t_start = time.time()

        # back up over the previous drawing
        if self.old_len:
            sys.stderr.write('\b' * self.old_len)

        transferred = str(Unit(self.value, self.unit))
        dt = time.time() - self.t_start

        if dt > 0:
            speed = self.value / dt
        else:
            speed = 0.0

        speed = str(Unit(speed, '%s/s' %(self.unit)))

        available_width = self.width - len(transferred) - len(speed) - 5
        ratio = self.value / self.top if self.top else 0.0 # a zero top has no meaningful ratio
        pb_done = '=' * int(available_width * ratio)
        pb_rem = ' ' * int(available_width * (1 - ratio))
        symbol = '>'

        if self.reverse:
            symbol = '<'
            pb_done, pb_rem = pb_rem, pb_done

        text = '%s [%s%s%s] %s' %(transferred, pb_done, symbol, pb_rem, speed)
        sys.stderr.write(text)

        current_len = len(text)
        tail = self.old_len - current_len
        self.old_len = current_len

        # the new text is shorter: blank the leftovers and step back
        if tail > 0:
            sys.stderr.write(' ' * tail)
            sys.stderr.write('\b' * tail)

        sys.stderr.flush()

    def update(self, value):
        '''
        Sets a new value and redraws the bar.
        '''

        self.value = value
        self.draw()

    def blank(self):
        '''
        Overwrites the last drawing with spaces and returns the cursor to the line start.
        '''

        sys.stderr.write('\r' + self.old_len * ' ' + '\r')
        sys.stderr.flush()

# --------------------------------------------------

class Unit:
    '''
    A object that represents numbers and units in human-readable form.

    TODO use significant digits instead of rigid order boundaries
    TODO float superclass?
    TODO base_2 prefixes (Ki, Mi, ...)
    '''

    # (symbol, decimal exponent), ordered from the largest to the smallest
    _prefixes = (
        ('Y', 24), ('Z', 21), ('E', 18), ('P', 15), ('T', 12), ('G', 9), ('M', 6), ('k', 3),
        ('h', 2), ('D', 1), ('', 0), ('d', -1), ('c', -2), ('m', -3), ('μ', -6), ('n', -9),
        ('p', -12), ('f', -15), ('a', -18), ('z', -21), ('y', -24)
    )

    def __init__(self,
        value, unit=None, dimension=1,
        use_prefixes='YZEPTGMkmμnpfazy', format='%.2f pU',
        logarithmic=False, log_base=10
    ):
        '''
        value           the numerical value of the variable (int or float)
        unit            the symbol to use, if any (str or None)
        dimension       the dimensionality of the value (1, 2, 3, ...)
        use_prefixes    which multiplier prefixes to use
                        - the default 'YZEPTGMkmμnpfazy' omits 'c' for centi- and 'D' for deca-
        format          use printf notation for value (e.g. %010.5f), p for prefix and U for unit
        logarithmic     when True, log_10 prefixes are used
        log_base        logarithm base value

        note that deca- is correctly rendered as 'da', the 'D' is used in use_prefixes only

        Raises ValueError if logarithmic is set and value or log_base is negative.
        '''

        self.value = float(value)
        self.unit = unit if unit else ''
        self.dimension = dimension
        self.use_prefixes = use_prefixes
        self.format = format
        self.logarithmic = logarithmic
        self.log_base = log_base

        if self.logarithmic and (self.value < 0 or self.log_base < 0):
            raise ValueError('math domain error (negative values can\'t be represented in dB)')

    def __str__(self):
        if self.value == 0.0:
            e = 0
        else:
            # rounding hides float noise such as log(1000, 10) == 2.9999999999999996
            e = round(math.log(abs(self.value), 10), 6)

        if self.logarithmic:
            symbol = 'dB'
            value = math.log(self.value, self.log_base) * 10
        else:
            # take the largest enabled prefix not exceeding the value's order of magnitude;
            # '' is always enabled ('' in any_string is True), the smallest one is the fallback
            for symbol, mul in self._prefixes:
                if symbol in self.use_prefixes and mul*self.dimension <= e:
                    break

            value = self.value / 10**(mul*self.dimension)

        output = self.format %(value)
        output = output.replace('p', symbol if symbol != 'D' else 'da') # deca- handler
        output = output.replace('U', self.unit)

        return output

    def __repr__(self):
        return 'over.core.text.Unit(%s)' %(self)

# --------------------------------------------------

def paragraph(text, width=0, indent=0, prefix=None, stamp=None):
    '''
    str text        text to format
    int width       required line length; if 0, current terminal width will be used;
                    - if negative, current terminal width minus supplied amount will be used
    int indent      how many spaces to indent the text with
    str prefix      prefix each line with it; not counted in width (offsets the lines)
    str stamp       placed over the first line's indent (stretching the indent if necessary)

    Formats text into an indented paragraph that fits the terminal and returns it.
    Correctly handles color tags.
    '''

    words = text.split()
    term_width = get_terminal_size()[1]

    if not width:
        width = term_width
    elif width < 0:
        width += term_width

    lines = [] # individual lines go in this buffer
    first = True

    while words:
        if indent:
            if first and stamp:
                lines.append([stamp + ' '*(indent-1-len(stamp))])
            else:
                lines.append([' '*(indent-1)]) # first word = indent minus one space (that's gonna get back while joining)

            first = False
        else:
            lines.append([])

        while words:
            line = lines[-1]
            len_word = char_length(words[0])

            if char_length(' '.join(line)) + len_word + 1 <= width:
                line.append(words.pop(0))
                continue

            # the word does not fit on this line
            if len(line) == 1 and indent:
                # just the indent's in here = word's too long -> screw indent
                # we might try to keep at least a part of the indent - if possible
                if len_word < width:
                    lines[-1] = [' '*(width - len_word - 1), words.pop(0)]
                else:
                    lines[-1] = [words.pop(0)]
            elif not line and not indent:
                # empty line = word's too long, it gets a line of its own
                lines[-1] = [words.pop(0)]

            break # carry on with a new line

    lines_tmp = []

    for line in lines:
        if prefix:
            line.insert(0, prefix)

        lines_tmp.append(' '.join(line)) # put words together

    return '\n'.join(lines_tmp) # put lines together

class prefix:
    '''
    Message prefixes for Output as (short symbol, long label) tuples.

    The short symbol is used in plain output, the long label in timestamped output.
    '''

    # NOTE(review): the padding inside these literals is reproduced as found; it may have been
    # collapsed by whitespace mangling of this file - confirm against upstream
    info = (' ', 'INFO')
    debug = (' §b?§/', '§bDEBG§/')
    start = ('§B>>>§/', '§BEXEC§/')
    exec = start
    warn = (' §y#§/', '§yWARN§/')
    fail = ('§r!!!§/', '§rFAIL§/')
    done = (' §g*§/', '§gDONE§/')

# color tag -> terminal escape sequence
colortags = {
    '§r': '\x1b[31;01m',
    '§g': '\x1b[32;01m',
    '§y': '\x1b[33;01m',
    '§b': '\x1b[34;01m',
    '§m': '\x1b[35;01m',
    '§c': '\x1b[36;01m',
    '§B': '\x1b[01m',
    '§/': '\x1b[39;49;00m'
}

def render(text, colors=True):
    '''
    Processes text with color tags and either removes them (with colors=False)
    or replaces them with terminal color codes.

    A doubled § is an escape for a literal §. With colors=True, tags that are not
    in colortags are left untouched.
    '''

    def substitute(match):
        tag = match.group(0)

        if tag == '§§':
            return '§' # unescape actual paragraphs

        if colors:
            return colortags.get(tag, tag)

        return ''

    # a single left-to-right pass, so that the second half of an escaped §§
    # can never be mistaken for the start of a tag
    return re.sub('§.', substitute, str(text), flags=re.DOTALL)

# --------------------------------------------------

def char_length(string):
    '''
    Returns the length of a string minus all formatting tags.
    '''

    plain_string = render(string, colors=False)

    return len(plain_string)

# --------------------------------------------------

class Output:
    '''
    Text UI output renderer.

    Prints messages to a stream (stderr by default) with optional eye candy
    like colors and timestamps.

    Usage:
    >>> from over import Output, prefix
    >>> say = Output('test', timestamp=True)
    >>> say('system initialized')
    [2013-02-28 16:41:28] INFO -- test, system initialized
    >>> say('system is FUBAR', prefix.fail)
    [2013-02-28 16:41:46] FAIL -- test, system is FUBAR
    >>> say('I just realized this will not work', prefix.fail, timestamp=False)
    !!! I just realized this will not work

    TODO use a generic format string
    '''

    def __init__(self, name, timestamp=True, colors=True, default_prefix=prefix.info, default_suffix='.\n', stream=sys.stderr):
        self.name = name
        self.timestamp = timestamp
        self.colors = colors
        self.default_prefix = default_prefix
        self.default_suffix = default_suffix
        self.stream = stream

    def __call__(self, text, prefix=None, suffix=None, indent=0, timestamp=None, colors=None, display_name=True):
        '''
        Writes one message to the stream and flushes it.

        prefix          a (short, long) tuple or a single str used for both; None = default_prefix
        suffix          appended after the text; None = default_suffix
        indent          currently unused
        timestamp       overrides the instance setting unless None
        colors          overrides the instance setting unless None
        display_name    False omits the "name, " part
        '''

        if prefix is None:
            prefix = self.default_prefix

        if isinstance(prefix, str):
            prefix = (prefix, prefix)

        if suffix is None:
            suffix = self.default_suffix

        if timestamp is None:
            timestamp = self.timestamp

        if colors is None:
            colors = self.colors

        output = []

        # [2012-11-11 16:52:06] INFO -- ahoj
        if timestamp:
            output.append(time.strftime('[%Y-%m-%d %H:%M:%S] '))
            output.append(prefix[1])
            output.append(' -- ')
        elif prefix:
            output.append(prefix[0])
            output.append(' ')

        if display_name and self.name:
            output.append('%s, ' %(self.name))

        output.append(str(text))

        if suffix:
            output.append(suffix)

        self.stream.write(render(''.join(output), colors))
        self.stream.flush()

# --------------------------------------------------

def get_terminal_size():
    '''
    Returns current terminal's (rows, cols).

    Falls back to (40, 80) when stdout is not attached to a terminal
    or has no usable file descriptor.
    '''

    try:
        terminal = sys.stdout.fileno()
        winsize = fcntl.ioctl(terminal, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))
    except (OSError, ValueError):
        # not a tty, or a replaced/closed stdout without a file descriptor
        return (40, 80)

    # the structure also carries the pixel size, which is of no use here
    rows, cols = struct.unpack('HHHH', winsize)[:2]

    return (rows, cols)

# --------------------------------------------------

_print = Output('over.core.text', stream=sys.stderr)

# --------------------------------------------------

if __name__ == "__main__":
    pb = ProgressBar2(
        "§%a (§ra/§za) [§=a>§ A] §sa [§rb/§zb] (ETA §TA)",
        {
            "a": {
                "unit": "f",
                "prefix_base2": False,
                "top": 39600,
                "precision": 1,
                "min_width_raw": 10,
                "min_width_rate": 10,
                # "min_width_percent": 7, # 7 is already the default
                "min_width_time": 0
            },
            "b": {
                "unit": "o",
                "prefix_base2": True,
                "top": 3200,
                "precision": 0,
                "min_width_raw": 10,
                "min_width_rate": 10,
                "min_width_time": 10
            }
        }
    )

    ratios = [0, 0.1, 0.25, 0.4, 0.6, 0.8, 0.95, 1.0]

    for ratio in ratios:
        pb.set("a", ratio * pb.channels["a"].top)
        pb.set("b", ratio * pb.channels["b"].top)
        pb.render()
        time.sleep(0.25)

    pb.end(False)