#! /bin/env python3
# encoding: utf-8

"""Over-Video: a small command-line front end that re-encodes files with ffmpeg.

For every target file the script
  1. inspects it with ffprobe and prints the video/audio stream parameters,
  2. optionally runs a ``volumedetect`` pre-pass to measure the mean volume,
  3. builds the ffmpeg command for the selected profile (theora, x264 or wav),
  4. prints the command (``dump-commands``) and/or runs it (``armed``) and
     renames the temporary output to ``<target stem><profile suffix>``.
"""

import json
import os
import pathlib
import re
import subprocess
import tempfile
import time

import over

from aux import Command, parse_fps

# --------------------------------------------------

prefix = over.core.textui.prefix
_print = over.core.textui.Output('over.video')

# --------------------------------------------------

# Command templates. The upper-case arguments (INFILE, OUTFILE, VQ, AQ,
# NORMALIZE, VOLUME) are placeholders that are assigned per target below.
command = over.core.types.ndict()
command.identify = Command('ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', 'INFILE')
command.normalize_prepass = Command('ffmpeg', '-i', 'INFILE', '-af', 'volumedetect', '-f', 'null', '/dev/null')
command.encode_theora = Command('ffmpeg', '-i', 'INFILE', '-codec:v', 'libtheora', '-qscale:v', 'VQ', '-codec:a', 'libvorbis', '-qscale:a', 'AQ', 'NORMALIZE', 'OUTFILE')
command.encode_x264 = Command('ffmpeg', '-i', 'INFILE', '-c:v', 'libx264', '-preset', 'slow', '-crf', 'VQ', '-profile:v', 'high', '-level', '4.2', '-codec:a', 'libvorbis', '-qscale:a', 'AQ', 'NORMALIZE', 'OUTFILE')
command.encode_wav = Command('ffmpeg', '-i', 'INFILE', '-codec:a', 'pcm_s16le', 'NORMALIZE', 'OUTFILE')
command.normalize = Command('-filter:a', 'VOLUME')

# --------------------------------------------------

# Raw bytes patterns: a plain b'...' literal containing \d is an invalid
# escape sequence and triggers a warning on current Python versions.
_FRAME_RE = re.compile(rb'frame= *(\d+) ')
_MEAN_VOLUME_RE = re.compile(rb'mean_volume: (-?\d+\.\d+) dB')

# Seconds to wait between two polls of a running command's output.
_POLL_INTERVAL = .25


def _follow_progress(cmd, progress_bar, collect=False):
    """Poll a running command until it finishes, updating the progress bar.

    ``cmd.get_output()`` is polled every ``_POLL_INTERVAL`` seconds. A ``None``
    result ends the loop, an empty result is skipped, and any other chunk is
    scanned for ffmpeg's ``frame=`` counter.

    Args:
        cmd: a started ``Command`` whose ``get_output()`` yields bytes chunks.
        progress_bar: object with an ``update(int)`` method.
        collect: when true, keep every chunk and return them joined.

    Returns:
        All collected output as bytes; ``b''`` when ``collect`` is false.
    """

    chunks = []

    while True:
        time.sleep(_POLL_INTERVAL)
        out = cmd.get_output()

        if out is None:
            break

        if not out:
            continue

        if collect:
            chunks.append(out)

        frames = _FRAME_RE.findall(out)

        if frames:
            # a chunk may carry several status lines; the last one is the newest
            progress_bar.update(int(frames[-1]))

    return b''.join(chunks)


def _parse_mean_volume(output):
    """Return the ``mean_volume`` (dB) reported by volumedetect, or None if absent."""

    match = _MEAN_VOLUME_RE.search(output)

    if match is None:
        return None

    return float(match.group(1))

# --------------------------------------------------

if __name__ == '__main__':
    main = over.core.app.Main('Over-Video', '0.1', 'AWARE-Overwatch Joint Software License', '~/.over/video.cfg')
    main.add_option('profile', 'str', 'x264', 'Encoding profile to use. Available are either §mtheora§/ for Theora and Vorbis in an Ogg container, §mx264§/ for H.264 and Vorbis in an MP4 container, or §mwav§/ which just extracts audio into a wav for further processing.')
    main.add_option('video-quality', 'float', 22, 'Video encoding quality. Use 0-10 for Theora (0 being the lowest, 5-7 is generally watchable) and 0-51 for x264 (0 being lossless, 18-28 is reasonable).', short_name='v')
    main.add_option('audio-quality', 'float', 2, 'Audio encoding quality with -1 being the worst and 10 being the best.', short_name='a')
    main.add_option('normalize', 'bool', True, 'Normalize the audio track.', short_name='N')
    main.add_option('normalize-mean', 'float', -20.0, 'Target mean volume to target.', short_name='n')
    main.add_option('dump-commands', 'bool', False, 'Print ffmpeg commands that would be executed. Implies §B--no-§garmed§/. If §B--§gnormalize§/ is in effect, the normalization pre-pass will still be performed so that the proper volume correction can be computed.', short_name='D')
    main.add_option('armed', 'bool', False, 'Perform the suggested action.', short_name='A', use_cfg_file=False)
    main.enable_help('h')
    main.add_help('Description', ['Over-Video is a simple video converter.'])
    main.parse()

    _print('using profile §b%s§/: video_quality=%.1f, audio_quality=%.1f, normalize=%s' %(main.cfg.profile, main.cfg.video_quality, main.cfg.audio_quality, '%.1f dB' %(main.cfg.normalize_mean) if main.cfg.normalize else 'None'))

    if not main.targets:
        _print("no files specified", prefix.warn)

    # profile name -> (encoding command template, output file suffix)
    profiles = {
        'theora': (command.encode_theora, '.ogv'),
        'x264': (command.encode_x264, '.mp4'),
        'wav': (command.encode_wav, '.wav'),
    }

    # NOTE(review): the dump-commands help text says it implies --no-armed, but
    # nothing here enforces that; both flags are honoured independently.
    # Confirm whether the option parser handles the implication.

    for tgt in main.targets:
        print()

        if not os.path.exists(tgt) or os.path.isdir(tgt):
            _print('target §y%s§/ §ris not a readable file§/, skipping' %(tgt), prefix.fail)
            continue

        _print('processing §B%s§/' %(tgt), prefix.start)

        # --- identify the streams with ffprobe ---
        command.identify.reset()
        command.identify.INFILE = tgt
        command.identify.run()
        identify_raw = command.identify.get_all_output()
        identify_dict = json.loads(identify_raw.decode('utf-8'))

        info = over.core.types.ndict()
        info.duration = float(identify_dict['format']['duration'])

        video_streams = [s for s in identify_dict['streams'] if s['codec_type'] == 'video']
        audio_streams = [s for s in identify_dict['streams'] if s['codec_type'] == 'audio']

        amount_vs = len(video_streams)
        amount_as = len(audio_streams)

        if amount_vs != 1:
            _print('detected §r%d§/ video streams' %(amount_vs), prefix.fail)
            main.exit(1)

        if amount_as != 1:
            _print('detected §r%d§/ audio streams' %(amount_as), prefix.fail)
            main.exit(1)

        video = video_streams[0]
        audio = audio_streams[0]

        info.video_codec = video['codec_name']
        info.video_size_x = video['width']
        info.video_size_y = video['height']
        info.video_fps = over.core.textui.Unit(parse_fps(video['r_frame_rate']), 'Hz')
        # The video bitrate comes from the video stream (this used to read the
        # audio stream). ffprobe omits bit_rate for some containers, hence the
        # 0 fallback instead of a KeyError.
        info.video_bitrate = over.core.textui.Unit(video.get('bit_rate', 0), 'b/s')

        info.audio_codec = audio['codec_name']
        info.audio_channels = audio['channels']
        info.audio_samplerate = over.core.textui.Unit(audio['sample_rate'], 'Hz')
        # tags and the language tag are optional; 'und' (undetermined) is the
        # placeholder used when the stream does not declare a language
        info.audio_language = audio.get('tags', {}).get('language', 'und')
        info.audio_bitrate = over.core.textui.Unit(audio.get('bit_rate', 0), 'b/s')

        _print('§mvideo§/: size=§b%d§/x§b%d§/ px, framerate=%s, codec=%s, bitrate=%s' %(info.video_size_x, info.video_size_y, info.video_fps, info.video_codec, info.video_bitrate))
        _print('§caudio§/: channels=§b%d§/, samplerate=%s, codec=%s, bitrate=%s, language=%s' %(info.audio_channels, info.audio_samplerate, info.audio_codec, info.audio_bitrate, info.audio_language))

        # expected number of frames, used to scale both progress bars
        total_frames = int(info.video_fps.value * info.duration)

        # --- select encoding command ---
        # done before the pre-pass so an unknown profile fails without first
        # spending time on the volume analysis
        if main.cfg.profile not in profiles:
            _print('§runknown profile selected§/: §r%s§/' %(main.cfg.profile), prefix.fail)
            raise RuntimeError

        encode_cmd, out_suffix = profiles[main.cfg.profile]
        # NOTE: tempfile.mktemp is race-prone (the name is not reserved). It is
        # kept because ffmpeg has to create the output file itself; a file
        # pre-created by mkstemp would make ffmpeg ask whether to overwrite it.
        info.tmp_file = tempfile.mktemp(suffix=out_suffix, dir='.')

        # --- normalization pre-pass ---
        # stays None unless the pre-pass actually measured the volume
        volume_correction = None

        if (main.cfg.armed or main.cfg.dump_commands) and main.cfg.normalize:
            _print('running normalization pre-pass')

            command.normalize_prepass.reset()
            command.normalize_prepass.INFILE = tgt
            command.normalize_prepass.run(stderr=True)
            pb = over.core.textui.ProgressBar(60, total_frames, 'frames')

            output = _follow_progress(command.normalize_prepass, pb, collect=True)
            mean_volume = _parse_mean_volume(output)

            if mean_volume is None:
                _print('§runexpected ffmpeg output§/, dump follows', prefix.fail, suffix=':\n')
                print(output)
                raise RuntimeError

            info.mean_volume = mean_volume
            # gain needed to move the measured mean onto the target
            # (target - measured); the reverse order would push the volume
            # further away from the target
            volume_correction = main.cfg.normalize_mean - mean_volume
            info.volume_correction = volume_correction

            pb.blank()
            _print('detected volume %.2f dB, correction %.2f dB' %(info.mean_volume, info.volume_correction))

        # --- populate the encoding command's arguments ---
        encode_cmd.reset()
        encode_cmd.INFILE = tgt
        encode_cmd.OUTFILE = info.tmp_file
        encode_cmd.AQ = main.cfg.audio_quality
        encode_cmd.VQ = main.cfg.video_quality

        if main.cfg.normalize and volume_correction is not None:
            command.normalize.reset()
            command.normalize.VOLUME = 'volume=%.2fdB' %(volume_correction)
            encode_cmd.NORMALIZE = command.normalize
        else:
            # either normalization is off, or nothing will be dumped or run
            # and therefore no correction was measured
            encode_cmd.NORMALIZE = None

        # --- dump and/or run the encoding ---
        if main.cfg.dump_commands:
            _print('will execute §B%s§/' %(' '.join(encode_cmd.dump())), prefix.start)

        if main.cfg.armed:
            pb = over.core.textui.ProgressBar(60, total_frames, 'frames')
            encode_cmd.run(stderr=True)
            _follow_progress(encode_cmd, pb)

            original_filesize = over.core.textui.Unit(pathlib.Path(tgt).stat().st_size, 'B')
            new_filesize = over.core.textui.Unit(pathlib.Path(info.tmp_file).stat().st_size, 'B')

            pb.blank()
            _print('encoding finished: %s -> %s' %(original_filesize, new_filesize), prefix.done)

        # --- move the result into place ---
        # The final name is the target's stem plus the profile's suffix, in the
        # current directory.
        # NOTE(review): when the target is in the current directory and already
        # has this suffix, the name equals the target, so the rename appears to
        # replace the original file - confirm this is intended.
        new_filename = pathlib.Path(tgt).stem + pathlib.Path(info.tmp_file).suffix

        if main.cfg.dump_commands:
            # double-quoted literal: the message itself contains single quotes
            _print("will execute §Bmv '%s' '%s'§/" %(info.tmp_file, new_filename), prefix.start)

        if main.cfg.armed:
            pathlib.Path(info.tmp_file).rename(new_filename)