#! /bin/env python3 # encoding: utf-8 import json import os import over import re import subprocess import tempfile import time import pathlib # FIXME sort from aux import Command, parse_fps # -------------------------------------------------- prefix = over.core.textui.prefix _print = over.core.textui.Output("over.video") # -------------------------------------------------- command = over.core.types.ndict() command.identify = Command("ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", "INFILE") command.normalize_prepass = Command("ffmpeg", "-i", "INFILE", "-af", "volumedetect", "-f", "null", "/dev/null") command.encode_theora = Command("ffmpeg", "-i", "INFILE", "-codec:v", "libtheora", "-qscale:v", "VQ", "-codec:a", "libvorbis", "-qscale:a", "AQ", "NORMALIZE", "OUTFILE") command.encode_x264 = Command("ffmpeg", "-i", "INFILE", "-c:v", "libx264", "-preset", "slow", "-crf", "VQ", "-profile:v", "high", "-level", "4.2", "-codec:a", "libvorbis", "-qscale:a", "AQ", "NORMALIZE", "OUTFILE") command.encode_wav = Command("ffmpeg", "-i", "INFILE", "-codec:a", "pcm_s16le", "NORMALIZE", "OUTFILE") command.normalize = Command("-filter:a", "VOLUME") # -------------------------------------------------- if __name__ == "__main__": main = over.core.app.Main("Over-Video", "0.1", "AWARE-Overwatch Joint Software License", "~/.over/video.cfg") main.add_option("profile", "str", "x264", "Encoding profile to use. Available are either §mtheora§/ for Theora and Vorbis in an Ogg container, §mx264§/ for H.264 and Vorbis in an MP4 container, or §mwav§/ which just extracts audio into a wav for further processing.") main.add_option("video-quality", "float", 22, "Video encoding quality. Use 0-10 for Theora (0 being the lowest, 5-7 is generally watchable) and 0-51 for x264 (0 being lossless, 18-28 is reasonable).", short_name="v") main.add_option("audio-quality", "float", 2, "Audio encoding quality with -1 being the worst and 10 being the best.", short_name="a") main.add_option("normalize", "bool", True, "Normalize the audio track.", short_name="N") main.add_option("normalize-mean", "float", -20.0, "Target mean volume to target.", short_name="n") main.add_option("dump-commands", "bool", False, "Print ffmpeg commands that would be executed. Implies §B--no-§garmed§/. If §B--§gnormalize§/ is in effect, the normalization pre-pass will still be performed so that the proper volume correction can be computed.", short_name="D") main.add_option("armed", "bool", False, "Perform the suggested action.", short_name="A", use_cfg_file=False) main.enable_help("h") main.add_help("Description", ["Over-Video is a simple video converter."]) main.parse() _print("using profile §b%s§/: video_quality=%.1f, audio_quality=%.1f, normalize=%s" %(main.cfg.profile, main.cfg.video_quality, main.cfg.audio_quality, "%.1f dB" %(main.cfg.normalize_mean) if main.cfg.normalize else "None")) for tgt in main.targets: print() if not os.path.exists(tgt) or os.path.isdir(tgt): _print("target §y%s§/ §ris not a readable file§/, skipping" %(tgt), prefix.fail) continue _print("processing §B%s§/" %(tgt), prefix.start) command.identify.reset() command.identify.INFILE = tgt command.identify.run() identify_raw = command.identify.get_all_output() identify_dict = json.loads(identify_raw.decode("utf-8")) info = over.core.types.ndict() info.duration = float(identify_dict["format"]["duration"]) video_streams = [s for s in identify_dict["streams"] if s["codec_type"] == "video"] audio_streams = [s for s in identify_dict["streams"] if s["codec_type"] == "audio"] amount_vs = len(video_streams) amount_as = len(audio_streams) if amount_vs != 1: _print("detected §r%d§/ video streams" %(amount_vs), prefix.fail) main.exit(1) if amount_as != 1: _print("detected §r%d§/ audio streams" %(amount_as), prefix.fail) main.exit(1) video = video_streams[0] audio = audio_streams[0] info.video_codec = video["codec_name"] info.video_size_x = video["width"] info.video_size_y = video["height"] info.video_fps = over.core.textui.Unit(parse_fps(video["r_frame_rate"]), "Hz") info.video_bitrate = over.core.textui.Unit(audio["bit_rate"], "b/s") info.audio_codec = audio["codec_name"] info.audio_channels = audio["channels"] info.audio_samplerate = over.core.textui.Unit(audio["sample_rate"], "Hz") info.audio_language = audio["tags"]["language"] info.audio_bitrate = over.core.textui.Unit(audio["bit_rate"], "b/s") _print("§mvideo§/: size=§b%d§/x§b%d§/ px, framerate=%s, codec=%s, bitrate=%s" %(info.video_size_x, info.video_size_y, info.video_fps, info.video_codec, info.video_bitrate)) _print("§caudio§/: channels=§b%d§/, samplerate=%s, codec=%s, bitrate=%s, language=%s" %(info.audio_channels, info.audio_samplerate, info.audio_codec, info.audio_bitrate, info.audio_language)) if (main.cfg.armed or main.cfg.dump_commands) and main.cfg.normalize: _print("running normalization pre-pass") command.normalize_prepass.reset() command.normalize_prepass.INFILE = tgt command.normalize_prepass.run(stderr=True) pb = over.core.textui.ProgressBar(60, int(info.video_fps.value * info.duration), "frames") output_buffer = [] while True: time.sleep(.25) out = command.normalize_prepass.get_output() if out: output_buffer.append(out) if b"frame=" in out: frame_id = re.findall(b"frame= *(\d+) ", out)[0] pb.update(int(frame_id)) elif out is None: break output = b''.join(output_buffer) if b"mean_volume: " in output: info.mean_volume = float(re.findall(b"mean_volume: (-\d+\.\d+) dB", output)[0]) info.volume_correction = info.mean_volume - main.cfg.normalize_mean else: _print("§runexpected ffmpeg output§/, dump follows", prefix.fail, suffix=":\n") print(output) raise RuntimeError pb.blank() _print("detected volume %.2f dB, correction %.2f dB" %(info.mean_volume, info.volume_correction)) # select encoding command if main.cfg.profile == "theora": encode_cmd = command.encode_theora info.tmp_file = tempfile.mktemp(suffix='.ogv', dir='.') elif main.cfg.profile == "x264": encode_cmd = command.encode_x264 info.tmp_file = tempfile.mktemp(suffix='.mp4', dir='.') elif main.cfg.profile == "wav": encode_cmd = command.encode_wav info.tmp_file = tempfile.mktemp(suffix='.wav', dir='.') else: _print("§runknown profile selected§/: §r%s§/" %(main.cfg.profile), prefix.fail) raise RuntimeError # populate its arguments encode_cmd.reset() encode_cmd.INFILE = tgt encode_cmd.OUTFILE = info.tmp_file encode_cmd.AQ = main.cfg.audio_quality encode_cmd.VQ = main.cfg.video_quality if main.cfg.normalize: command.normalize.reset() command.normalize.VOLUME = "volume=%.2fdB" %(info.volume_correction) encode_cmd.NORMALIZE = command.normalize else: encode_cmd.NORMALIZE = None if main.cfg.dump_commands: _print("will execute §B%s§/" %(" ".join(encode_cmd.dump())), prefix.start) if main.cfg.armed: pb = over.core.textui.ProgressBar(60, int(info.video_fps.value * info.duration), "frames") encode_cmd.run(stderr=True) while True: time.sleep(.25) out = encode_cmd.get_output() if out: if b"frame=" in out: frame_id = re.findall(b"frame= *(\d+) ", out)[0] pb.update(int(frame_id)) elif out is None: break original_filesize = over.core.textui.Unit(pathlib.Path(tgt).stat().st_size, "B") new_filesize = over.core.textui.Unit(pathlib.Path(info.tmp_file).stat().st_size, "B") pb.blank() _print("encoding finished: %s -> %s" %(original_filesize, new_filesize), prefix.done) new_filename = pathlib.Path(tgt).stem + pathlib.Path(info.tmp_file).suffix if main.cfg.dump_commands: _print('will execute §Bmv "%s" "%s"§/' %(info.tmp_file, new_filename), prefix.start) if main.cfg.armed: pathlib.Path(info.tmp_file).rename(new_filename)