1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import imghdr
import tempfile
from contextlib import contextmanager

#os.environ['GST_DEBUG'] = '4,python:4'
import pytest
pytest.importorskip("gi.repository.Gst")

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

from mediagoblin.media_types.video.transcoders import (capture_thumb,
                                                       VideoTranscoder)
from mediagoblin.media_types.tools import discover
@contextmanager
def create_data(suffix=None, make_audio=False):
    """Yield paths to a generated test video and an empty result file.

    A short Ogg/Theora clip (10 buffers from ``videotestsrc``) is rendered
    into a temporary file with GStreamer.  When *make_audio* is true, a
    Vorbis track built from ``audiotestsrc`` is muxed into the same file.

    :param suffix: optional file-name suffix for the result file.
    :param make_audio: whether to add an audio stream to the test clip.
    :returns: a context manager yielding ``(video_path, result_path)``;
        both temporary files are closed (and thereby deleted) when the
        ``with`` block exits.
    :raises AssertionError: if the pipeline does not reach PLAYING within
        three seconds.
    """
    # Elements cannot be created before GStreamer has been initialised.
    if not Gst.is_initialized():
        Gst.init(None)

    with tempfile.NamedTemporaryFile() as video:
        pipeline = Gst.Pipeline()

        src = Gst.ElementFactory.make('videotestsrc', None)
        src.set_property('num-buffers', 10)
        mux = Gst.ElementFactory.make('oggmux', None)
        dst = Gst.ElementFactory.make('filesink', None)
        dst.set_property('location', video.name)
        _link_in_order(pipeline, [
            src,
            Gst.ElementFactory.make('videorate', None),
            Gst.ElementFactory.make('theoraenc', None),
            mux,
            dst,
        ])

        if make_audio:
            audio_src = Gst.ElementFactory.make('audiotestsrc', None)
            audio_src.set_property('num-buffers', 10)
            audio_enc = Gst.ElementFactory.make('vorbisenc', None)
            _link_in_order(pipeline, [
                audio_src,
                Gst.ElementFactory.make('audiorate', None),
                audio_enc,
            ])
            # The muxer is already part of the pipeline (video branch), so
            # it only needs to be fed the encoded audio.
            audio_enc.link(mux)

        pipeline.set_state(Gst.State.PLAYING)
        try:
            state = pipeline.get_state(3 * Gst.SECOND)
            assert state[0] == Gst.StateChangeReturn.SUCCESS
            # Wait (at most three seconds) for the clip to finish or for
            # the pipeline to report an error.
            pipeline.get_bus().timed_pop_filtered(
                3 * Gst.SECOND,
                Gst.MessageType.ERROR | Gst.MessageType.EOS)
        finally:
            # Always release the pipeline, even if the assertion failed.
            pipeline.set_state(Gst.State.NULL)

        # Only pass ``suffix`` through when one was actually requested.
        result_kwargs = {'suffix': suffix} if suffix else {}
        with tempfile.NamedTemporaryFile(**result_kwargs) as result:
            yield (video.name, result.name)


def _link_in_order(pipeline, elements):
    """Add *elements* to *pipeline* and link each one to its successor."""
    for element in elements:
        pipeline.add(element)
    for upstream, downstream in zip(elements, elements[1:]):
        upstream.link(downstream)
81 #TODO: this should be skipped if video plugin is not enabled
def test_thumbnails():
    """Test thumbnail generation.

    1. Create a video (+audio) from gst's videotestsrc
    2. Capture a thumbnail from it with ``capture_thumb``
    3. Everything should get removed because of temp files usage
    """
    # NOTE: ``imghdr`` was removed from the standard library in Python 3.13
    # (PEP 594); the format checks below need a replacement there.
    test_formats = [('.png', 'png'), ('.jpg', 'jpeg'), ('.gif', 'gif')]

    # Video-only clip: every supported thumbnail format at a fixed width.
    for suffix, expected_format in test_formats:
        with create_data(suffix) as (video_name, thumbnail_name):
            capture_thumb(video_name, thumbnail_name, width=40)
            # check result file format
            assert imghdr.what(thumbnail_name) == expected_format
            # TODO: check height and width
            # FIXME: it doesn't work with small width, say, 10px. This
            # should be fixed.

    # Clip with an audio track: first format only, at the regular width,
    # a smaller width and a bigger width.
    suffix, expected_format = test_formats[0]
    for width in (40, 10, 100):
        with create_data(suffix, True) as (video_name, thumbnail_name):
            capture_thumb(video_name, thumbnail_name, width=width)
            assert imghdr.what(thumbnail_name) == expected_format
def test_transcoder():
    """Transcode a generated clip, first without and then with audio.

    In both cases the result must contain exactly one video stream; when
    the source clip has an audio track, the result must also contain
    exactly one audio stream.
    """
    for with_audio in (False, True):
        with create_data(make_audio=with_audio) as (source_path, target_path):
            VideoTranscoder().transcode(
                source_path,
                target_path,
                vp8_threads=0,  # autodetect
                dimensions=(640, 640),
            )
            video_streams = discover(target_path).get_video_streams()
            assert len(video_streams) == 1
            if with_audio:
                audio_streams = discover(target_path).get_audio_streams()
                assert len(audio_streams) == 1