Source code for mg.utils.video

# MG_GPL_HEADER_BEGIN
# 
# This file is part of Media Gallery, GNU GPLv2.
# 
# Media Gallery is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
# 
# Media Gallery is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with Media Gallery; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
# 
# MG_GPL_HEADER_END

__author__ = "Media Gallery contributors <mg@lists.ssji.net>"
__copyright__ = "(c) 2011"
__credits__ = ["Dimitri Refauvelet", "Nicolas Pouillon"]

__all__ = ['VideoHandler', 'VideoStreamInfo', 'AudioStreamInfo']

class StreamInfo:
[docs] """ A stream information base class. """ experimental_codecs = [] """ List of codecs (in terms of -xcodec) for which it is necessary to add '-strict experimental'. """ format_map = {} """ Maps formats to their "-xcodec foo" ffmpeg command line """ def __init__(self, type, format, **kwargs): self.type = type self.format = format self.__kw = kwargs @property def codec(self): return self.format_map.get(self.format, self.format) def dest_args(self):
[docs] if self.type == "audio": r = ["-acodec", self.codec] elif self.type == "video": r = ["-vcodec", self.codec] else: r = [] if self.codec in self.experimental_codecs: r += ['-strict', 'experimental'] for k, v in self.__kw: r += ['-'+k, v] return r class AudioStreamInfo(StreamInfo):
[docs] """ An Audio stream information class. Inherits :py:class:`StreamInfo`. """ format_map = { "mp3": "libmp3lame", } def __init__(self, format, samplerate, channels, precision):
[docs] StreamInfo.__init__(self, "audio", format) self.samplerate = samplerate self.channels = channels self.precision = precision def __str__(self):
r = "Audio stream: format %s, %d Hz, %d channels, %d b/sample" % ( self.format, self.samplerate, self.channels, self.precision) return r def dest_args(self): r = StreamInfo.dest_args(self) return r + ["-ar", str(self.samplerate), "-ac", str(self.channels)] class VideoStreamInfo(StreamInfo):
[docs] """ A Video stream information class. Inherits :py:class:`StreamInfo`. """ def __init__(self, format, size, fps, bitrate = None, dar = None, **kwargs):
[docs] StreamInfo.__init__(self, "video", format, **kwargs) self.size = size self.fps = fps self.dar = dar self.bitrate = bitrate def __str__(self):
r = "Video stream: format %s, %dx%d, %d fps" % ( self.format, self.size[0], self.size[1], self.fps) if self.bitrate: r += ', %d bps'%self.bitrate if self.dar: r += ', dar=%0.3f'%self.dar return r __rounded_codecs = { 'libx264' : 2, } format_map = { 'h264': 'libx264', } threaded_codecs = ['libx264'] @staticmethod def __round(x, n = 8): return n * int(float(x) / n + .5) def dest_args(self): w, h = self.size codec = self.codec if self.codec in self.__rounded_codecs: w = self.__round(w, self.__rounded_codecs[self.codec]) h = self.__round(h, self.__rounded_codecs[self.codec]) r = ["-r", str(self.fps), "-s", "%dx%d" % (w, h)] r += StreamInfo.dest_args(self) if self.bitrate: r += ["-b", self.bitrate] if self.dar: r += ["-aspectratio", self.dar] if self.codec in self.threaded_codecs: r += ["-threads", "0"] return r class _Section(dict):
def __init__(self, name): self.name = name def append(self, k, v): self[k] = v def _parse_info(lines): ret = [] cur = None for l in lines: l = l.strip() if not l: continue if l.startswith("[") and l.endswith("]"): if l.startswith("[/"): ret.append(cur) cur = None else: cur = _Section(l[1:-1]) else: k, v = l.split("=", 1) cur.append(k, v) return ret class VideoHandler:
[docs] """ A Video handler class. This object can help manipulate a video file. """ def __init__(self, video_filename):
[docs] """ Creates a new handler. :type video_filename: str :param video_filename: File name containing the source video. """ self.__video_filename = video_filename self.__info_done = False @property
def streams(self):
[docs] """ list of :py:class:`~.StreamInfo` object containing definition of all the streams contained in the file. """ self.__get_info() return self.__streams @property
def container(self):
[docs] """ Container type name (eg "avi" or "mov"). """ self.__get_info() return self.__container @property
def bit_rate(self):
[docs] """ File's global bitrate (float), in bits/sec """ self.__get_info() return self.__bit_rate @property
def duration(self):
[docs] """ File run length (float), in seconds """ self.__get_info() return self.__duration def __get_info(self):
if self.__info_done: return import subprocess cmd = [ "ffprobe", "-show_format", "-show_streams", self.__video_filename, ] proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = proc.communicate() infos = _parse_info(stdout.split("\n")) self.__parse_info(infos) self.__info_done = True def _parse_ratio(self, value, sep = '/'): if sep in value: n, d = map(float, value.split(sep, 1)) return n/d return float(value) def __parse_info(self, infos): self.__streams = [] for i in infos: if i.name == 'STREAM': if i['codec_type'] == 'audio': c = AudioStreamInfo( i['codec_name'], float(i['sample_rate']), int(i['channels']), int(i['bits_per_sample']), ) elif i['codec_type'] == 'video': dar = float(i['width']) / int(i['height']) if "display_aspect_ratio" in i: dar = self._parse_ratio(i['display_aspect_ratio'], ":") c = VideoStreamInfo( i['codec_name'], (int(i['width']), int(i['height'])), self._parse_ratio(i['r_frame_rate']), dar = dar, ) else: continue c.index = int(i['index']) self.__streams.append(c) elif i.name == 'FORMAT': self.__container = i['format_name'].split(",")[0] self.__bit_rate = float(i['bit_rate']) self.__duration = float(i['duration']) def get_streams(self, type):
[docs] """ Retrieve the streams of a given type. :type type: str :param type: Type of stream to get (eg "video" or "audio") :returns: a list of StreamInfo objects. """ return [s for s in self.streams if s.type == type] def has_video(self):
[docs] """ Returns whether this file contains at least one video stream. :returns: a boolean """ return bool(self.get_streams(self, "video")) def snapshot(self, output_file, image_size = None, time_offset = None, rot_count = 0):
[docs] """ This function calls a video thumbnailer tool to create a thumbnail of the video as a still image file. The only supported output format is JPEG. :type output_file: str :param output_file: File name of the image file to create. This file name must contain a valid JPEG file extension. :type image_size: tuple or None :param image_size: A ``(width, height)`` couple of integer sizes. Leave to None to get an image of original size. :type time_offset: int or float or None :param time_offset: A number of seconds to take the snapshot at, from the start of stream. If offset is beyond end of file or None, set it at 1/4th of the file. :raises: RuntimeError if thumbnail generation failed. :raises: ValueError if the input file has no video stream """ import subprocess streams = self.get_streams('video') assert streams, ValueError("No video stream") if time_offset is None or time_offset > self.duration: time_offset = self.duration/4 cmd = [ "ffmpeg", "-ss", str(time_offset), "-i", self.__video_filename, "-vcodec", "mjpeg", "-vframes", "1", "-an", "-f", "rawvideo", ] cmd += self.__rot(rot_count) if image_size: cmd += [ "-s", "%dx%d" % image_size, ] cmd += [ "-y", output_file, ] # print cmd proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE) proc.communicate() if proc.returncode > 0: raise RuntimeError() def __rot(self, rot_count):
rot_count %= 4 return [ [], ['-vf', 'transpose=1'], # 90 clockwise ['-vf', 'hflip,vflip'], # 180 ['-vf', 'transpose=2'], # 270 clockwise ][rot_count] def transcode(self, output_file, container_name, vs_info, as_info, rot_count = 0, add_flags = ""):
[docs] """ This function calls a video transcoding tool to create another version of the same video. :type output_file: str :param output_file: File name of the image file to create. :param str container_name: Name of the container format for output_file. :param VideoStreamInfo vs_info: Video stream format :param AudioStreamInfo as_info: Audio stream format :raises: RuntimeError if thumbnail generation failed. :raises: ValueError if the input file has no video stream """ import subprocess cmd = [ "ffmpeg", # "-loglevel", "quiet", "-i", self.__video_filename, ] cmd += self.__rot(rot_count) if vs_info: cmd += vs_info.dest_args() else: cmd += ['-vn'] if as_info: cmd += as_info.dest_args() else: cmd += ['-an'] cmd += filter(None, add_flags.split(' ')) cmd += [ "-f", container_name, "-y", output_file, ] # print cmd proc = subprocess.Popen(cmd)#, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdin, stderr = proc.communicate() if proc.returncode > 0: raise RuntimeError(stderr) def main():
import optparse parser = optparse.OptionParser(description='Video snapshotter') parser.add_option('--size', metavar='WIDTHxHEIGHT', type=str, default=None, dest = "size", help='Output size. Default is source file size.') parser.add_option('--info', action="store_true", default=False, dest = "info", help='Print info.') parser.add_option('--snap', metavar='FILENAME', type=str, default=None, dest = "snap", help='Generate a snapshot.') parser.add_option('--convert', metavar='FILENAME', type=str, default=None, dest = "convert", help='Transcode video.') parser.add_option('--convert-preset', metavar='PRESET', type=str, default=None, dest = "convert_preset", help='Use given preset for transcoding.') parser.add_option('--offset', metavar='seconds', type=int, default=1, dest = "offset", help='Time offset from the start of stream. seconds.') parser.add_option('--rot-count', metavar='COUNT', type=int, default=0, dest = "rot", help='Count of CW 90-degree rotations to apply') options, args = parser.parse_args() h = VideoHandler(args[0]) if options.info: print "Container:", h.container print "Bitrate:", h.bit_rate print "Duration:", h.duration for s in h.streams: print s presets = dict( flash = dict( fmt = "flv", vs_info = VideoStreamInfo("flv", (320, 240), 25), as_info = AudioStreamInfo("libmp3lame", 44100, 2, 16), ), ) if options.snap: wh = None if options.size: wh = map(int, args.size.split("x", 1)) h.snapshot(options.snap, wh, options.offset, options.rot) if options.convert: preset = presets.get(options.convert_preset, presets['flash']) h.transcode(options.convert, preset['fmt'], preset['vs_info'], preset['as_info'], options.rot ) if __name__ == '__main__': main()