# MG_GPL_HEADER_BEGIN
#
# This file is part of Media Gallery, GNU GPLv2.
#
# Media Gallery is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# Media Gallery is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Media Gallery; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
# MG_GPL_HEADER_END
__author__ = "Media Gallery contributors <mg@lists.ssji.net>"
__copyright__ = "(c) 2011"
__credits__ = ["Dimitri Refauvelet", "Nicolas Pouillon"]
__all__ = ['VideoHandler', 'VideoStreamInfo', 'AudioStreamInfo']
class StreamInfo:
[docs] """
A stream information base class.
"""
experimental_codecs = []
"""
List of codecs (in terms of -xcodec) for which it is necessary to
add '-strict experimental'.
"""
format_map = {}
"""
Maps formats to their "-xcodec foo" ffmpeg command line
"""
def __init__(self, type, format, **kwargs):
self.type = type
self.format = format
self.__kw = kwargs
@property
def codec(self):
return self.format_map.get(self.format, self.format)
def dest_args(self):
[docs] if self.type == "audio":
r = ["-acodec", self.codec]
elif self.type == "video":
r = ["-vcodec", self.codec]
else:
r = []
if self.codec in self.experimental_codecs:
r += ['-strict', 'experimental']
for k, v in self.__kw:
r += ['-'+k, v]
return r
class AudioStreamInfo(StreamInfo):
[docs] """
An Audio stream information class. Inherits :py:class:`StreamInfo`.
"""
format_map = {
"mp3": "libmp3lame",
}
def __init__(self, format, samplerate, channels, precision):
[docs] StreamInfo.__init__(self, "audio", format)
self.samplerate = samplerate
self.channels = channels
self.precision = precision
def __str__(self):
r = "Audio stream: format %s, %d Hz, %d channels, %d b/sample" % (
self.format, self.samplerate, self.channels, self.precision)
return r
def dest_args(self):
r = StreamInfo.dest_args(self)
return r + ["-ar", str(self.samplerate), "-ac", str(self.channels)]
class VideoStreamInfo(StreamInfo):
[docs] """
A Video stream information class. Inherits :py:class:`StreamInfo`.
"""
def __init__(self, format, size, fps, bitrate = None, dar = None, **kwargs):
[docs] StreamInfo.__init__(self, "video", format, **kwargs)
self.size = size
self.fps = fps
self.dar = dar
self.bitrate = bitrate
def __str__(self):
r = "Video stream: format %s, %dx%d, %d fps" % (
self.format, self.size[0], self.size[1], self.fps)
if self.bitrate:
r += ', %d bps'%self.bitrate
if self.dar:
r += ', dar=%0.3f'%self.dar
return r
__rounded_codecs = {
'libx264' : 2,
}
format_map = {
'h264': 'libx264',
}
threaded_codecs = ['libx264']
@staticmethod
def __round(x, n = 8):
return n * int(float(x) / n + .5)
def dest_args(self):
w, h = self.size
codec = self.codec
if self.codec in self.__rounded_codecs:
w = self.__round(w, self.__rounded_codecs[self.codec])
h = self.__round(h, self.__rounded_codecs[self.codec])
r = ["-r", str(self.fps), "-s", "%dx%d" % (w, h)]
r += StreamInfo.dest_args(self)
if self.bitrate:
r += ["-b", self.bitrate]
if self.dar:
r += ["-aspectratio", self.dar]
if self.codec in self.threaded_codecs:
r += ["-threads", "0"]
return r
class _Section(dict):
def __init__(self, name):
self.name = name
def append(self, k, v):
self[k] = v
def _parse_info(lines):
ret = []
cur = None
for l in lines:
l = l.strip()
if not l:
continue
if l.startswith("[") and l.endswith("]"):
if l.startswith("[/"):
ret.append(cur)
cur = None
else:
cur = _Section(l[1:-1])
else:
k, v = l.split("=", 1)
cur.append(k, v)
return ret
class VideoHandler:
[docs] """
A Video handler class. This object can help manipulate a video file.
"""
def __init__(self, video_filename):
[docs] """
Creates a new handler.
:type video_filename: str
:param video_filename: File name containing the source video.
"""
self.__video_filename = video_filename
self.__info_done = False
@property
def streams(self):
[docs] """
list of :py:class:`~.StreamInfo` object containing definition
of all the streams contained in the file.
"""
self.__get_info()
return self.__streams
@property
def container(self):
[docs] """
Container type name (eg "avi" or "mov").
"""
self.__get_info()
return self.__container
@property
def bit_rate(self):
[docs] """
File's global bitrate (float), in bits/sec
"""
self.__get_info()
return self.__bit_rate
@property
def duration(self):
[docs] """
File run length (float), in seconds
"""
self.__get_info()
return self.__duration
def __get_info(self):
if self.__info_done:
return
import subprocess
cmd = [
"ffprobe", "-show_format", "-show_streams", self.__video_filename,
]
proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout, stderr = proc.communicate()
infos = _parse_info(stdout.split("\n"))
self.__parse_info(infos)
self.__info_done = True
def _parse_ratio(self, value, sep = '/'):
if sep in value:
n, d = map(float, value.split(sep, 1))
return n/d
return float(value)
def __parse_info(self, infos):
self.__streams = []
for i in infos:
if i.name == 'STREAM':
if i['codec_type'] == 'audio':
c = AudioStreamInfo(
i['codec_name'],
float(i['sample_rate']),
int(i['channels']),
int(i['bits_per_sample']),
)
elif i['codec_type'] == 'video':
dar = float(i['width']) / int(i['height'])
if "display_aspect_ratio" in i:
dar = self._parse_ratio(i['display_aspect_ratio'], ":")
c = VideoStreamInfo(
i['codec_name'],
(int(i['width']), int(i['height'])),
self._parse_ratio(i['r_frame_rate']),
dar = dar,
)
else:
continue
c.index = int(i['index'])
self.__streams.append(c)
elif i.name == 'FORMAT':
self.__container = i['format_name'].split(",")[0]
self.__bit_rate = float(i['bit_rate'])
self.__duration = float(i['duration'])
def get_streams(self, type):
[docs] """
Retrieve the streams of a given type.
:type type: str
:param type: Type of stream to get (eg "video" or "audio")
:returns: a list of StreamInfo objects.
"""
return [s for s in self.streams if s.type == type]
def has_video(self):
[docs] """
Returns whether this file contains at least one video stream.
:returns: a boolean
"""
return bool(self.get_streams(self, "video"))
def snapshot(self, output_file, image_size = None, time_offset = None, rot_count = 0):
[docs] """
This function calls a video thumbnailer tool to create a
thumbnail of the video as a still image file. The only
supported output format is JPEG.
:type output_file: str
:param output_file: File name of the image file to
create. This file name must contain a
valid JPEG file extension.
:type image_size: tuple or None
:param image_size: A ``(width, height)`` couple of integer sizes.
Leave to None to get an image of original size.
:type time_offset: int or float or None
:param time_offset: A number of seconds to take the snapshot
at, from the start of stream. If offset is
beyond end of file or None, set it at
1/4th of the file.
:raises: RuntimeError if thumbnail generation failed.
:raises: ValueError if the input file has no video stream
"""
import subprocess
streams = self.get_streams('video')
assert streams, ValueError("No video stream")
if time_offset is None or time_offset > self.duration:
time_offset = self.duration/4
cmd = [
"ffmpeg",
"-ss", str(time_offset),
"-i", self.__video_filename,
"-vcodec", "mjpeg",
"-vframes", "1",
"-an",
"-f", "rawvideo",
]
cmd += self.__rot(rot_count)
if image_size:
cmd += [
"-s", "%dx%d" % image_size,
]
cmd += [
"-y", output_file,
]
# print cmd
proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
proc.communicate()
if proc.returncode > 0:
raise RuntimeError()
def __rot(self, rot_count):
rot_count %= 4
return [
[],
['-vf', 'transpose=1'], # 90 clockwise
['-vf', 'hflip,vflip'], # 180
['-vf', 'transpose=2'], # 270 clockwise
][rot_count]
def transcode(self, output_file, container_name, vs_info, as_info, rot_count = 0, add_flags = ""):
[docs] """
This function calls a video transcoding tool to create another
version of the same video.
:type output_file: str
:param output_file: File name of the image file to
create.
:param str container_name: Name of the container format for
output_file.
:param VideoStreamInfo vs_info: Video stream format
:param AudioStreamInfo as_info: Audio stream format
:raises: RuntimeError if thumbnail generation failed.
:raises: ValueError if the input file has no video stream
"""
import subprocess
cmd = [
"ffmpeg",
# "-loglevel", "quiet",
"-i", self.__video_filename,
]
cmd += self.__rot(rot_count)
if vs_info:
cmd += vs_info.dest_args()
else:
cmd += ['-vn']
if as_info:
cmd += as_info.dest_args()
else:
cmd += ['-an']
cmd += filter(None, add_flags.split(' '))
cmd += [
"-f", container_name,
"-y", output_file,
]
# print cmd
proc = subprocess.Popen(cmd)#, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdin, stderr = proc.communicate()
if proc.returncode > 0:
raise RuntimeError(stderr)
def main():
import optparse
parser = optparse.OptionParser(description='Video snapshotter')
parser.add_option('--size', metavar='WIDTHxHEIGHT', type=str, default=None, dest = "size",
help='Output size. Default is source file size.')
parser.add_option('--info', action="store_true", default=False, dest = "info",
help='Print info.')
parser.add_option('--snap', metavar='FILENAME', type=str, default=None, dest = "snap",
help='Generate a snapshot.')
parser.add_option('--convert', metavar='FILENAME', type=str, default=None, dest = "convert",
help='Transcode video.')
parser.add_option('--convert-preset', metavar='PRESET', type=str, default=None, dest = "convert_preset",
help='Use given preset for transcoding.')
parser.add_option('--offset', metavar='seconds', type=int, default=1, dest = "offset",
help='Time offset from the start of stream. seconds.')
parser.add_option('--rot-count', metavar='COUNT', type=int, default=0, dest = "rot",
help='Count of CW 90-degree rotations to apply')
options, args = parser.parse_args()
h = VideoHandler(args[0])
if options.info:
print "Container:", h.container
print "Bitrate:", h.bit_rate
print "Duration:", h.duration
for s in h.streams:
print s
presets = dict(
flash = dict(
fmt = "flv",
vs_info = VideoStreamInfo("flv", (320, 240), 25),
as_info = AudioStreamInfo("libmp3lame", 44100, 2, 16),
),
)
if options.snap:
wh = None
if options.size:
wh = map(int, args.size.split("x", 1))
h.snapshot(options.snap, wh, options.offset,
options.rot)
if options.convert:
preset = presets.get(options.convert_preset, presets['flash'])
h.transcode(options.convert,
preset['fmt'],
preset['vs_info'],
preset['as_info'],
options.rot
)
if __name__ == '__main__':
main()