#! /usr/bin/env python3 # encoding: utf-8 import math import re import sys import struct import fcntl import termios import time # -------------------------------------------------- def lexical_join(words, oxford=False): ''' Joins an iterable of words or sentence fragments into a lexical list: >>> lexical_join(['this', 'that', 'one of them too']) 'this, that and one of them too' >>> lexical_join(['this', 'that', 'one of them too'], oxford=True) 'this, that, and one of them too' >>> lexical_join(['this', 'that']) 'this and that' >>> lexical_join(['this']) 'this' ''' l = len(words) if l == 0: return '' elif l == 1: return words[0] elif l == 2: return '%s and %s' %(str(words[0]), str(words[1])) else: return '%s%s and %s' %(', '.join(str(w) for w in words[:-1]), ',' if oxford else '', str(words[-1])) # -------------------------------------------------- class _ProgressBarChannel: def __init__(self, unit, top, prefix_base2, precision): self.unit = unit self.top = top self.prefix_base2 = prefix_base2 self.precision = precision self.value = 0 self.set() if prefix_base2: _print("Unit does not yet support base2 prefixes (e.g. Gi, Mi), using decadic (G, M) instead", prefix.warn) def set(self, value=None): if value is not None: self.value = value self.ratio = self.value / self.top s = Unit(self.value, self.unit, format="%.{:d}f pU".format(self.precision)) self.text = str(s).split(" ", 1) class ProgressBar2: """ A configurable text progressbar. Each atom consists of three characters: a literal §, an operator and a channel. An operator is one of: % - a percentage (value from 0 to 100, padded to three characters) r - displays the raw value of the channel z - displays the maximum (total) value s - displays the rate of change in units/s m - displays the rate of change in units/min h - displays the rate of change in units/h t - displays the elapsed time in s (use with uppercase channel to get an ETA) T - displays the elapsed time in hh:mm:ss (use with uppercase channel to get an ETA) any other character - a bar consisting of these characters filling the line so that at 100% it fills the entirety of the remaining space A channel ID as a lowercase letter is the channel's actual value. An uppercase letter is that channel's complement. Use §§ to display a literal §. Examples ======== Format: §%a [§=a>§ A] §tA Output: 42 % [============> ] 27 s Format: §%a (§ra/§ma) [§-a>§ A] §sa [§rb/§rB] (ETA §tA) Output: 53 % (21.0/39.6 kf) [---------------------> ] 27.1 f/s [233/439 Mio] (ETA 00:11:26) """ def __init__(self, format, channels, width=None, space_before_unit=True): """ Initialize the ProgressBar. width is the desired size of the progressbar drawing area in characters (columns) and defaults to terminal width. space_before_unit enables a space character between a value and its unit. channels is a dict that for each channel lists its properties: { "a": { "unit": "f", "prefix_base2": False, "top": 39600, "precision": 1 }, "b": { "unit": "o", "prefix_base2": True, "top": measure_file_size_closure("/path/to/file"), "precision": 0 } } Channel IDs (the "a", "b") are arbitrary lowercase letters ([a-z]). Properties "unit" and "prefix_base2" are passed to over.core.text.Unit. "top" is the value of the channel that corresponds to 100%. Channels are allowed to exceed this value. It can either be a number or a callable. If it's a callable, it will be called without arguments and shall return a number. "precision" is the displayed floating point precision. Use 0 to force an integer. """ self.format = format self.channels = {id: _ProgressBarChannel(**conf) for id, conf in channels.items()} self.space_before_unit = space_before_unit self.time_start = time.time() self.width = width def set(self, channel_id, value): """ Sets the channel's value. """ c = self.channels[channel_id] c.set(value) def render(self, clear=True, stream=sys.stderr): """ Renders the progressbar into a stream. If clear is True, clears the previous content first. """ width = self.width or get_terminal_size()[1] raw = self.format for tag in re.findall("§.[a-zA-Z]", raw): op = tag[1] ch = tag[2] inverse = ch.isupper() # display literal §s raw = raw.replace("§§", "§") # pave over previous render and draw the new one if clear: stream.write("\r" + "_" * width) stream.write("\r" + raw) def end(self, finish=False): """ Print a newline (heh :-) If finish is True, set all channels to 100% and draw the progressbar first. """ if finish: for c in self.channels.values(): c.set(c.top) self.render() print() class ProgressBar: ''' An animated progress bar. TODO derive Wait() from this ''' def __init__(self, width, top, unit, reverse=False): ''' width width of the 'widget' including all text top the 100% value unit name of the unit to display reverse True to expand the progress bar from right to left ''' self.width = width self.value = 0 self.top = top self.unit = unit self.reverse = reverse self.old_len = 0 self.t_start = None def draw(self): if not self.t_start: self.t_start = time.time() if self.old_len: sys.stderr.write('\b' * self.old_len) transferred = str(Unit(self.value, self.unit)) dt = time.time() - self.t_start if dt > 0: speed = self.value / dt else: speed = 0.0 speed = str(Unit(speed, '%s/s' %(self.unit))) available_width = self.width - len(transferred) - len(speed) - 5 ratio = self.value / self.top pb_done = '=' * int(available_width * ratio) pb_rem = ' ' * int(available_width * (1 - ratio)) symbol = '>' if self.reverse: symbol = '<' pb_done, pb_rem = pb_rem, pb_done text = '%s [%s%s%s] %s' %(transferred, pb_done, symbol, pb_rem, speed) sys.stderr.write(text) current_len = len(text) tail = self.old_len - current_len self.old_len = current_len if tail > 0: sys.stderr.write(' ' * tail) sys.stderr.write('\b' * tail) sys.stderr.flush() def update(self, value): self.value = value self.draw() def blank(self): sys.stderr.write('\r' + self.old_len * ' ' + '\r') sys.stderr.flush() # -------------------------------------------------- class Unit: ''' A object that represents numbers and units in human-readable form. TODO use significant digits instead of rigid order boundaries TODO float superclass? TODO base_2 prefixes (Ki, Mi, ...) ''' _prefixes = ( ('Y', 24), ('Z', 21), ('E', 18), ('P', 15), ('T', 12), ('G', 9), ('M', 6), ('k', 3), ('h', 2), ('D', 1), ('', 0), ('d', -1), ('c', -2), ('m', -3), ('μ', -6), ('n', -9), ('p', -12), ('f', -15), ('a', -18), ('z', -21), ('y', -24) ) def __init__(self, value, unit=None, dimension=1, use_prefixes='YZEPTGMkmμnpfazy', format='%.2f pU', logarithmic=False, log_base=10 ): ''' value the numerical value of the variable (int or float) unit the symbol to use, if any (str or None) dimension the dimensionality of the value (1, 2, 3, ...) use_prefixes which multiplier prefixes to use - the default 'YZEPTGMkmμnpfazy' omits 'c' for centi- and 'D' for deca- format use printf notation for value (e.g. %010.5f), p for prefix and U for unit logarithmic when True, log_10 prefixes are used log_base logarithm base value note that deca- is correctly rendered as 'da', the 'D' is used in use_prefixes only ''' self.value = float(value) self.unit = unit if unit else '' self.dimension = dimension self.use_prefixes = use_prefixes self.format = format self.logarithmic = logarithmic self.log_base = log_base if self.logarithmic and (self.value < 0 or self.log_base < 0): raise ValueError('math domain error (negative values can\'t be represented in dB)') def __str__(self): if self.value == 0.0: e = 0 else: e = round(math.log(abs(self.value), 10), 6) if self.logarithmic: prefix = 'dB' value = math.log(self.value, self.log_base) * 10 else: for prefix, mul in self._prefixes: if prefix in self.use_prefixes and mul*self.dimension <= e: break value = self.value / 10**(mul*self.dimension) output = self.format %(value) output = output.replace('p', prefix if prefix != 'D' else 'da') # deca- handler output = output.replace('U', self.unit) return output def __repr__(self): return 'over.core.text.Unit(%s)' %(self) # -------------------------------------------------- def paragraph(text, width=0, indent=0, prefix=None, stamp=None): ''' str text text to format int width required line length; if 0, current terminal width will be used; - if negative, current terminal width minus supplied amount will be used int indent how many spaces to indent the text with str prefix prefix each line with it; not counted in width (offsets the lines) str stamp placed over the first line's indent (stretching the indent if necessary) Formats text into an indented paragraph that fits the terminal and returns it. Correctly handles color tags. ''' #words = [x.strip() for x in text.split() if x.strip()] words = text.split() term_width = get_terminal_size()[1] if not width: width = term_width elif width < 0: width += term_width lines = [] # individual lines go in this buffer first = True while words: if indent: if first and stamp: lines.append([stamp + ' '*(indent-1-len(stamp))]) else: lines.append([' '*(indent-1)]) # first word = indent minus one space (that's gonna get back while joining) first = False else: lines.append([]) while words: word_added = False if len(re.sub('§.', '', ' '.join(lines[-1]))) + len(re.sub('§.', '', words[0])) + 1 <= width: lines[-1].append(words.pop(0)) word_added = True elif not word_added and len(lines[-1]) == 1 and indent: # no word added and just the indent's in here = word's too long -> screw indent # we might try to keep at least a part of the indent - if possible len_word = len(re.sub('§.', '', words[0])) if len_word < width: lines[-1] = [' '*(width - len_word - 1), words.pop(0)] else: lines[-1] = [words.pop(0)] word_added = True break elif not word_added and not lines[-1] and not indent: # no word added, empty line = word's too long -> screw indent lines[-1] = [words.pop(0)] word_added = True break else: break lines_tmp = [] for line in lines: if prefix: line.insert(0, prefix) lines_tmp.append(' '.join(line)) # put words together return '\n'.join(lines_tmp) # put lines together # -------------------------------------------------- # def strtrim(text, length, mode=0, dots=True, colors=True): # ''' # str text text to trim # int length desired length, >1 # int mode -1 = cut chars from the left, 0 = from the middle, 1 = from the right # bool dots add an ellipsis to the point of cutting # bool colors dont count color tags into length; also turns the ellipsis blue # ''' # if len(text) <= length: # return text # if length <= 3: # dots = False # if dots: # length -= 3 # if mode == -1: # if dots: # return '...' + cut(text, length, 1, colors) # else: # return cut(text, length, 1, colors) # elif mode == 0: # if length%2 == 1: # part1 = cut(text, length/2+1, 0, colors) # else: # part1 = cut(text, length/2, 0, colors) # part2 = cut(text, length/2, 1, colors) # if dots: # part1 += '...' # return part1 + part2 # else: # if dots: # return cut(text, length, 0, colors) + '...' # else: # return cut(text, length, 0, colors) # -------------------------------------------------- # def textfilter(text, set=None): # ''' # str text text to filter # dict set {to_replace: replacement} # Text filter that replaces occurences of to_replace keys with their respective values. # Defaults to filtering of 'bad' characters if no translational dictionary is provided. # ''' # if not set: # set = badchars # for to_replace in set.keys(): # text = text.replace(to_replace, set[to_replace]) # return text # -------------------------------------------------- # def cut(text, size, end=0, colors=True): # ''' # str text text to cut # int size how many chars to return, >=0 # int end return chars from 0 = left, 1 = right # bool colors skip color tags # ''' # out = '' # strlen = len(text) # if end == 0: # ptr = 0 # go from left # else: # ptr = strlen - 1 # while size: # if end == 0: # if colors and text[ptr] == '<' and strlen - ptr >= 4 and text[ptr+1] == 'C' and text[ptr+3] == '>': # we have a tag # out += text[ptr:ptr+4] # ptr += 4 # else: # out += text[ptr] # ptr += 1 # size -= 1 # else: # if colors and text[ptr] == '>' and ptr >= 4 and text[ptr-2] == 'C' and text[ptr-3] == '<': # we have a tag # out = text[ptr-3:ptr+1] + out # ptr -= 4 # else: # out = text[ptr] + out # ptr -= 1 # size -= 1 # # reached end ? # if (end == 0 and ptr == strlen) or (end == 1 and ptr == -1): # break # return out # -------------------------------------------------- class prefix: info = (' ', 'INFO') debug = (' §b?§/', '§bDEBG§/') start = ('§B>>>§/', '§BEXEC§/') exec = start warn = (' §y#§/', '§yWARN§/') fail = ('§r!!!§/', '§rFAIL§/') done = (' §g*§/', '§gDONE§/') colortags = { '§r': '\x1b[31;01m', '§g': '\x1b[32;01m', '§y': '\x1b[33;01m', '§b': '\x1b[34;01m', '§m': '\x1b[35;01m', '§c': '\x1b[36;01m', '§B': '\x1b[01m', '§/': '\x1b[39;49;00m' } def render(text, colors=True): ''' Processes text with color tags and either removes them (with colors=False) or replaces them with terminal color codes. ''' text = str(text) if colors: tags = re.findall('§[^§]', text) for tag in tags: try: text = text.replace(tag, colortags[tag]) except KeyError: pass else: text = re.sub('§[^§]', '', text) # unescape actual paragraphs text = re.sub('§§', '§', text) return text # -------------------------------------------------- def char_length(string): ''' Returns the length of a string minus all formatting tags. ''' plain_string = render(string, colors=False) return len(plain_string) # -------------------------------------------------- class Output: ''' Text UI output renderer. Prints messages to the stdout with optional eye candy like colors and timestamps. Usage: >>> from over import Output, prefix >>> say = Output('test', timestamp=True) >>> say('system initialized') [2013-02-28 16:41:28] INFO -- test, system initialized >>> say('system is FUBAR', prefix.fail) [2013-02-28 16:41:46] FAIL -- test, system is FUBAR >>> say('I just realized this will not work', prefix.fail, timestamp=False) !!! I just realized this will not work TODO use a generic format string ''' def __init__(self, name, timestamp=True, colors=True, default_prefix=prefix.info, default_suffix='.\n', stream=sys.stderr): self.name = name self.timestamp = timestamp self.colors = colors self.default_prefix = default_prefix self.default_suffix = default_suffix self.stream = stream def __call__(self, text, prefix=None, suffix=None, indent=0, timestamp=None, colors=None, display_name=True): if prefix is None: prefix = self.default_prefix if type(prefix) is str: prefix = (prefix, prefix) if suffix is None: suffix = self.default_suffix if timestamp is None: timestamp = self.timestamp if colors is None: colors = self.colors output = [] # [2012-11-11 16:52:06] INFO -- ahoj if timestamp: output.append(time.strftime('[%Y-%m-%d %H:%M:%S] ')) output.append(prefix[1]) output.append(' -- ') elif prefix: output.append(prefix[0]) output.append(' ') if display_name and self.name: output.append('%s, ' %(self.name)) #output.append(paragraph(str(text), indent=indent)) output.append(str(text)) if suffix: output.append(suffix) output = ''.join(output) self.stream.write(render(output, colors)) self.stream.flush() # -------------------------------------------------- def get_terminal_size(): ''' Returns current terminal's (rows, cols). ''' terminal = sys.stdout.fileno() try: return struct.unpack('HHHH', fcntl.ioctl(terminal, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))) except IOError: return (40, 80) # -------------------------------------------------- _print = Output('over.core.text', stream=sys.stderr) # -------------------------------------------------- if __name__ == "__main__": pb = ProgressBar2( "§%a (§ra/§ma) [§-a>§ A] §sa [§rb/§rB] (ETA §tA)", { "a": { "unit": "f", "prefix_base2": False, "top": 39600, "precision": 1 }, "b": { "unit": "o", "prefix_base2": True, "top": 3200, "precision": 0 } }, 80 ) pb.set("a", 12000) pb.set("b", 1500) pb.render() pb.end(not True)