1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import tempfile

import pytest
import six
from werkzeug.utils import secure_filename

from mediagoblin import storage
def test_clean_listy_filepath():
    """Check that clean_listy_filepath sanitizes or rejects path lists."""
    # Each entry is (raw path components, cleaned components expected back).
    sanitized_cases = [
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        # A component containing slashes is expected to stay one component.
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        # Leading parent-directory hops are expected to be dropped.
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_path, cleaned_path in sanitized_cases:
        assert storage.clean_listy_filepath(raw_path) == cleaned_path

    # A component made only of parent-directory hops must be refused.
    with pytest.raises(storage.InvalidFilepath):
        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
class FakeStorageSystem(object):
    """Minimal stand-in storage class for the config-loading test.

    It only records its two required settings so that a test can check
    they were passed through unchanged.
    """

    def __init__(self, foobie, blech, **kwargs):
        # Any additional keyword arguments are accepted and ignored.
        self.foobie = foobie
        self.blech = blech
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """BasicFileStorage variant that reports itself as non-local."""

    # Theoretically despite this, all the methods should work but it
    # should force copying to the workbench
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        """Copy via the StorageInterface implementation.

        The call goes straight to ``storage.StorageInterface`` instead of
        whatever ``BasicFileStorage`` provides for this method.
        """
        return storage.StorageInterface.copy_local_to_storage(
            self, *args, **kwargs)
def test_storage_system_from_config():
    """Check storage_system_from_config builds the expected storage object."""
    # No storage class is named here; a BasicFileStorage is expected back.
    # 'garbage_arg' is an unrelated extra entry (it was listed twice in the
    # dict literal, so only the last value ever took effect).
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    # Name a custom class by "module:ClassName"; the other entries should
    # end up as attributes on the instance.
    # NOTE(review): 'storage_class' is assumed to be the key that
    # storage_system_from_config reads -- confirm against its definition.
    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    assert six.text_type(this_storage.__class__) == \
        u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>"
87 ##########################
88 # Basic file storage tests
89 ##########################
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a storage object backed by a fresh temporary directory.

    :param mount_url: passed to the storage class as its second argument.
    :param fake_remote: when true, build a ``FakeRemoteStorage`` instead of
        a ``BasicFileStorage``.
    :returns: ``(tmpdir, this_storage)``; the caller is responsible for
        removing ``tmpdir``.
    """
    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
    if fake_remote:
        storage_class = FakeRemoteStorage
    else:
        storage_class = storage.filestorage.BasicFileStorage
    this_storage = storage_class(tmpdir, mount_url)
    return tmpdir, this_storage
def cleanup_storage(this_storage, tmpdir, *paths):
    """Remove the given directory chains from storage, then ``tmpdir``.

    :param this_storage: object exposing ``delete_dir(path_list)``.
    :param tmpdir: directory backing the storage; removed last with
        ``os.rmdir``, so it must be empty by then.
    :param paths: path lists such as ``['dir1', 'dir2']``. Each one is
        deleted deepest-first (``dir1/dir2``, then ``dir1``).

    The caller's lists are left unmodified.
    """
    for path in paths:
        # Walk from the full path up to its first component.
        for depth in range(len(path), 0, -1):
            assert this_storage.delete_dir(list(path[:depth]))
    os.rmdir(tmpdir)
def test_basic_storage__resolve_filepath():
    """Check _resolve_filepath maps path lists under the storage directory."""
    tmpdir, this_storage = get_tmp_filestorage()

    result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
    assert result == os.path.join(
        tmpdir, 'dir1/dir2/filename.jpg')

    # Parent-directory hops in front of a real name must not escape tmpdir.
    result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
    assert result == os.path.join(
        tmpdir, 'etc/passwd')

    # A component consisting only of hops is rejected outright.
    with pytest.raises(storage.InvalidFilepath):
        this_storage._resolve_filepath(['../../', 'etc', 'passwd'])

    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_file_exists():
    """file_exists is true only for a file that is really on disk."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Create the file directly on disk, bypassing the storage API.
    target_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(target_dir)
    filename = os.path.join(target_dir, 'filename.txt')
    with open(filename, 'w') as ourfile:
        ourfile.write("I'm having a lovely day!")

    present = ['dir1', 'dir2', 'filename.txt']
    assert this_storage.file_exists(present)

    # Neither a missing file nor a missing directory chain counts.
    absent_paths = (
        ['dir1', 'dir2', 'thisfile.lol'],
        ['dnedir1', 'dnedir2', 'somefile.lol'],
    )
    for absent in absent_paths:
        assert not this_storage.file_exists(absent)

    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_get_unique_filepath():
    """get_unique_filepath returns a new, safe name in the same directory."""
    tmpdir, this_storage = get_tmp_filestorage()

    # write something that exists
    os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
    filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
    with open(filename, 'w') as ourfile:
        ourfile.write("I'm having a lovely day!")

    # now we want something new, with the same name!
    new_filepath = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    assert new_filepath[:-1] == [u'dir1', u'dir2']

    # The new name keeps the original as a suffix, is longer than it, and
    # is already in secure_filename-normalized form.
    new_filename = new_filepath[-1]
    assert new_filename.endswith('filename.txt')
    assert len(new_filename) > len('filename.txt')
    assert new_filename == secure_filename(new_filename)

    # The directory must be empty before cleanup_storage can remove it.
    os.remove(filename)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_get_file():
    """Exercise get_file for new files, unique paths and existing files."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
    with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
    with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file
    # (os.makedirs returns None, so its result is not kept.)
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'

    this_storage.delete_file(filepath)
    this_storage.delete_file(new_filepath)
    this_storage.delete_file(['testydir', 'testyfile.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
def test_basic_storage_delete_file():
    """delete_file removes the file; delete_dir only succeeds once empty."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')
    parent_dirs = ['dir1', 'dir2']

    assert not os.path.exists(on_disk)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    assert os.path.exists(on_disk)

    # The directory still holds the file, so deleting it must fail...
    assert this_storage.delete_dir(list(parent_dirs)) == False
    this_storage.delete_file(filepath)
    # ...and succeed once the file is gone.
    assert this_storage.delete_dir(list(parent_dirs)) == True

    assert not os.path.exists(on_disk)

    cleanup_storage(this_storage, tmpdir, ['dir1'])
def test_basic_storage_url_for_file():
    """file_url needs a base URL and appends the path components to it."""
    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    with pytest.raises(storage.NoWebServing):
        this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir)

    # base_url without domain
    tmpdir, this_storage = get_tmp_filestorage('/media/')
    result = this_storage.file_url(
        ['dir1', 'dir2', 'filename.txt'])
    expected = '/media/dir1/dir2/filename.txt'
    assert result == expected
    cleanup_storage(this_storage, tmpdir)

    # base_url with domain
    tmpdir, this_storage = get_tmp_filestorage(
        'http://media.example.org/ourmedia/')
    result = this_storage.file_url(
        ['dir1', 'dir2', 'filename.txt'])
    expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
    assert result == expected
    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_get_local_path():
    """get_local_path returns the on-disk location under the storage dir."""
    storage_dir, basic_storage = get_tmp_filestorage()

    wanted = os.path.join(storage_dir, 'dir1/dir2/filename.txt')
    actual = basic_storage.get_local_path(['dir1', 'dir2', 'filename.txt'])
    assert actual == wanted

    cleanup_storage(basic_storage, storage_dir)
def test_basic_storage_is_local():
    """A plain file storage must flag itself as local."""
    storage_dir, basic_storage = get_tmp_filestorage()
    is_local = basic_storage.local_storage
    assert is_local is True
    cleanup_storage(basic_storage, storage_dir)
def test_basic_storage_copy_locally():
    """copy_locally writes a stored file's contents to a local path."""
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)
    # Delete the stored original first: the copy must stand on its own.
    this_storage.delete_file(filepath)

    # Close the handle explicitly rather than relying on garbage collection.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'

    os.remove(new_file_dest)
    os.rmdir(dest_tmpdir)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared check that copy_local_to_storage places a local file in storage.

    :param tmpdir: directory backing ``this_storage``.
    :param this_storage: storage object under test.
    """
    # mkstemp creates the file itself; mktemp only returns a name, which
    # could be claimed by something else before it is opened.
    handle, local_filename = tempfile.mkstemp()
    with os.fdopen(handle, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    # Remove the source before checking, so the copy is verified on its own.
    os.remove(local_filename)

    copied_path = os.path.join(tmpdir, 'dir1/dir2/copiedto.txt')
    with open(copied_path, 'r') as copied_file:
        assert copied_file.read() == 'haha'

    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy check against the default temporary storage."""
    storage_dir, basic_storage = get_tmp_filestorage()
    _test_copy_local_to_storage_works(storage_dir, basic_storage)
def test_general_storage_copy_local_to_storage():
    """Run the shared copy check against the fake remote storage."""
    storage_dir, remote_storage = get_tmp_filestorage(fake_remote=True)
    _test_copy_local_to_storage_works(storage_dir, remote_storage)