over-video/aux.py

127 lines
2.6 KiB
Python

#! /bin/env python3
# encoding: utf-8
import queue
import subprocess
import sys
import threading
import time
import over # FIXME
# --------------------------------------------------
def capture_output(stream, fifo):
    """
    Pump a binary stream into a queue until end of stream.

    Reads up to 100 bytes at a time with ``stream.read1`` and puts every
    non-empty chunk on ``fifo``. Stops at the first empty read and then
    closes ``stream``. Used as a thread target by Command.run.
    """
    reading = True

    while reading:
        data = stream.read1(100)

        if not data:
            # an empty read marks end of stream
            reading = False
            continue

        fifo.put(data)

    stream.close()
class Command:
    """
    A subprocess command line with named placeholders.

    The command is kept as a (possibly nested) list of words. Assigning to
    an attribute whose name equals a top-level word replaces that word with
    the assigned value:

        cmd = Command(["ffmpeg", "-i", "INFILE"])
        cmd.INFILE = "movie.mkv"

    Internal state (sequence, process, thread, fifo) is written through
    ``self.__dict__`` because ``__setattr__`` is reserved for placeholder
    substitution.
    """

    def __init__(self, sequence):
        """
        sequence    iterable of words; a word may itself be a list of words
        """
        self.__dict__["sequence"] = list(sequence)
        self.__dict__["process"] = None
        self.__dict__["thread"] = None
        self.__dict__["fifo"] = None

    def __setattr__(self, name, value):
        """
        Replaces every top-level word equal to `name` with `value`.

        Raises AttributeError if no top-level word matches. Nested lists
        are not searched.
        """
        matches = [i for i, word in enumerate(self.sequence) if word == name]

        if not matches:
            raise AttributeError("Command has no attribute '%s'" % (name))

        for i in matches:
            self.sequence[i] = value

    def dump(self, sequence=None):
        """
        Returns the command as a flat list of strings.

        sequence    list to flatten; defaults to the whole command
        """
        # Compare against None explicitly: a nested empty list must flatten
        # to nothing instead of falling back to self.sequence, which would
        # recurse forever.
        if sequence is None:
            sequence = self.sequence

        out = []

        for item in sequence:
            if isinstance(item, list):
                out.extend(self.dump(item))
            else:
                out.append(str(item))

        return out

    def run(self, async_=False, stderr=False):
        """
        Executes the command in the current environment.

        async_      return immediately (None), use poll_output to get output
        stderr      capture stderr instead of stdout

        In the default synchronous mode this returns everything the
        captured stream produced, as bytes, once the stream has reached
        end-of-file and the process has exited.

        The first parameter used to be called `async`, which is a reserved
        word since Python 3.7; its position is unchanged.
        """
        # Default buffering: line buffering (bufsize=1) is not available for
        # binary pipes, and capture_output needs a buffered reader (read1).
        # NOTE: both pipes are opened but only one is read; a child writing
        # a lot to the other one can stall once that pipe fills up.
        self.__dict__["process"] = subprocess.Popen(
            self.dump(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.__dict__["fifo"] = queue.Queue()
        self.__dict__["thread"] = threading.Thread(
            target=capture_output,
            args=(self.process.stderr if stderr else self.process.stdout, self.fifo),
        )
        self.thread.daemon = True  # thread dies with the program
        self.thread.start()

        if async_:
            return None

        # The reader thread finishes at end-of-stream, at which point every
        # chunk is already queued. Joining it before draining guarantees no
        # trailing output is lost when the process exits quickly.
        self.thread.join()
        self.process.wait()

        return self.poll_output() or b""

    def poll_output(self):
        """
        Returns the output captured since the last call and clears the buffer.

        Non-blocking. Returns None if nothing is buffered or if run() has
        not been called yet.
        """
        if self.fifo is None:
            return None

        chunks = []

        while True:
            try:
                chunks.append(self.fifo.get_nowait())
            except queue.Empty:
                break

        return b"".join(chunks) if chunks else None

    @property
    def running(self):
        """
        True while a started process has not exited; False before run().
        """
        process = self.process

        return process is not None and process.poll() is None
# --------------------------------------------------
def parse_fps(raw):
    """
    Converts a frame rate string to a float.

    Accepts either a plain number ("25", "23.976") or an integer fraction
    written as "numerator/denominator" ("30000/1001").
    """
    if "/" not in raw:
        return float(raw)

    numerator, denominator = map(int, raw.split("/"))

    return float(numerator) / float(denominator)