Use urljoin to create proper feed media URLs
[larjonas-mediagoblin.git] / mediagoblin / tests / tools.py
blob77a9a86c07dbaca155c5832741dd76e93ed962ec
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import pkg_resources
20 import shutil
22 import six
24 from paste.deploy import loadapp
25 from webtest import TestApp
27 from mediagoblin import mg_globals
28 from mediagoblin.db.models import User, LocalUser, MediaEntry, Collection, TextComment, \
29 CommentSubscription, Notification, Privilege, Report, Client, \
30 RequestToken, AccessToken, Activity, Generator, Comment
31 from mediagoblin.tools import testing
32 from mediagoblin.init.config import read_mediagoblin_config
33 from mediagoblin.db.base import Session
34 from mediagoblin.meddleware import BaseMeddleware
35 from mediagoblin.auth import gen_password_hash
36 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
37 from mediagoblin.tools.crypto import random_string
39 from datetime import datetime
# Name of the database reserved for the test suite.
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'

# Default paste and application config files, both resolved inside the
# installed ``mediagoblin.tests`` package.
TEST_SERVER_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_paste.ini',
)
TEST_APP_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_mgoblin_app.ini',
)


# Sub-directories created under the per-run ``user_dev`` directory by
# get_app().
USER_DEV_DIRECTORIES_TO_SETUP = [
    'media/public',
    'media/queue',
]
class TestingMeddleware(BaseMeddleware):
    """
    Meddleware used only by the unit tests.

    It is a place to run checks against every request/response pair,
    or to prepare them in some special way.  For example, every html
    response could be validated *after* it has been rendered.

    get_app() inserts this meddleware at the front of the meddleware
    list, which means requests are handed here first and responses
    last, so it wraps the "normal" app.

    To add a check, either put it directly into the appropriate
    process_request or process_response, or write a new method and
    call it from process_*.
    """

    def process_response(self, request, response):
        """
        Run sanity checks on an outgoing response.

        Only responses whose content_type is exactly "text/html" are
        inspected; everything else is passed through untouched.

        Raises AssertionError if the html body contains a hard-coded
        "/mgoblin_static/" path.  Always returns None.
        """
        is_html = getattr(response, 'content_type', None) == "text/html"

        # Templates are expected to use request.staticdirect() rather
        # than a literal /mgoblin_static/ path.  (A grep over the
        # shipped templates would probably be an easier way to check
        # this...)
        if is_html and "/mgoblin_static/" in response.text:
            raise AssertionError(
                "Response HTML contains reference to /mgoblin_static/ "
                "instead of staticdirect. Request was for: "
                + request.full_path)

        return
def get_app(request, paste_config=None, mgoblin_config=None):
    """Build a MediaGoblin application wrapped for testing.

    Args:
     - request: a pytest fixture request (not an http request).  Its
       config's temp-dir handler is used to create the run directory.
     - paste_config: path of the paste config to use; falls back to
       TEST_SERVER_CONFIG when falsy.
     - mgoblin_config: path of the mediagoblin config to use; falls
       back to TEST_APP_CONFIG when falsy.

    Returns the loaded app wrapped in a webtest TestApp.
    """
    source_paste_ini = paste_config or TEST_SERVER_CONFIG
    source_mgoblin_ini = mgoblin_config or TEST_APP_CONFIG

    # Fresh, numbered directory that receives copies of both configs.
    workdir = request.config._tmpdirhandler.mktemp(
        'mgoblin_app', numbered=True)
    user_dev_root = workdir.mkdir('user_dev').strpath

    paste_ini_copy = workdir.join('paste.ini').strpath
    mgoblin_ini_copy = workdir.join('mediagoblin.ini').strpath
    shutil.copyfile(source_paste_ini, paste_ini_copy)
    shutil.copyfile(source_mgoblin_ini, mgoblin_ini_copy)

    # Roll back and discard the current scoped session before the
    # database is set up below.
    Session.rollback()
    Session.remove()

    # Create the user_dev sub-directories.
    for relative_dir in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(user_dev_root, relative_dir))

    # Read the app config; the validation result is not used here.
    global_config, _validation_result = read_mediagoblin_config(
        mgoblin_ini_copy)
    app_config = global_config['mediagoblin']

    # Run database setup/migrations.
    run_dbupdate(app_config, global_config)

    # Load the WSGI app from the copied paste config.
    wsgi_app = loadapp('config:' + paste_ini_copy)

    # Put the TestingMeddleware first so it can run sanity checks on
    # every request/response.  Probably not the cleanest way of doing
    # this; to be revisited once there are plugins.
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    return TestApp(wsgi_app)
def install_fixtures_simple(db, fixtures):
    """
    Insert fixtures into the database, nothing fancy.

    Args:
     - db: object indexable by collection name; each collection must
       provide an insert() method.
     - fixtures: mapping of collection name to an iterable of
       documents to insert into that collection.
    """
    for name, documents in six.iteritems(fixtures):
        target = db[name]
        for document in documents:
            target.insert(document)
def assert_db_meets_expected(db, expected):
    """
    Assert that the database holds every document we expect.

    Documents are looked up by their 'id', so every expected document
    must carry one.

    Args:
     - db: object indexable by collection name; each collection must
       expose ``query.filter_by(id=...).first()``.
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
             {'id': 'foo',
              'some_field': 'some_value'},]}
    """
    for name, wanted_documents in six.iteritems(expected):
        model = db[name]
        for wanted in wanted_documents:
            found = model.query.filter_by(id=wanted['id']).first()
            assert found is not None  # make sure it exists
            assert found == wanted  # make sure it matches
def fixture_add_user(username=u'chris', password=u'toast',
                     privileges=[], wants_comment_notification=True):
    """
    Create (or update) a LocalUser and return it detached from the session.

    Args:
     - username: username to look up or create; the email is set to
       ``username + u'@example.com'``.
     - password: plain-text password to hash and store; None leaves
       pw_hash untouched.
     - privileges: privilege names to attach.  Names with no matching
       Privilege row are skipped.  (The list is only iterated, never
       mutated.)
     - wants_comment_notification: stored on the user as is.
    """
    # An existing user with this name is reused rather than duplicated.
    user = LocalUser.query.filter(LocalUser.username==username).first()
    if user is None:
        user = LocalUser()

    user.username = username
    user.email = username + u'@example.com'
    if password is not None:
        user.pw_hash = gen_password_hash(password)
    user.wants_comment_notification = wants_comment_notification

    for privilege_name in privileges:
        matching = Privilege.query.filter(
            Privilege.privilege_name==privilege_name)
        if not matching.count():
            continue
        user.all_privileges.append(matching.one())

    user.save()

    # Reload with `with_polymorphic` so the LocalUser attributes are
    # loaded eagerly; that can't be done once the object is detached.
    polymorphic_users = LocalUser.query.with_polymorphic(LocalUser)
    user = polymorphic_users.filter(LocalUser.username==username).first()

    # Detach from the session before handing the user back.
    Session.expunge(user)

    return user
def fixture_comment_subscription(entry, notify=True, send_email=None):
    """
    Subscribe the entry's actor to comments on that entry.

    Args:
     - entry: object providing ``id`` and ``actor`` (a user id).
     - notify: stored on the subscription as is.
     - send_email: stored on the subscription; when None, the actor's
       wants_comment_notification setting is used instead.

    Returns the saved CommentSubscription, reloaded and detached from
    the session.
    """
    if send_email is None:
        subscriber = LocalUser.query.filter_by(id=entry.actor).first()
        send_email = subscriber.wants_comment_notification

    subscription = CommentSubscription(
        media_entry_id=entry.id,
        user_id=entry.actor,
        notify=notify,
        send_email=send_email)
    subscription.save()

    # Reload by id, then detach from the session.
    subscription = CommentSubscription.query.filter_by(
        id=subscription.id).first()
    Session.expunge(subscription)

    return subscription
def fixture_add_comment_notification(entry, subject, user,
                                     seen=False):
    """
    Create a Notification about `subject` for the given user.

    Args:
     - entry: not used by this function; kept in the signature for
       existing callers.
     - subject: object the notification is about (assigned to ``obj``).
     - user: stored as the notification's user_id.
     - seen: stored on the notification as is.

    Returns the saved Notification, reloaded and detached from the
    session.
    """
    notification = Notification(
        user_id=user,
        seen=seen,
    )
    notification.obj = subject
    notification.save()

    # Reload by id, then detach from the session.
    notification = Notification.query.filter_by(
        id=notification.id).first()
    Session.expunge(notification)

    return notification
def fixture_media_entry(title=u"Some title", slug=None,
                        uploader=None, save=True, gen_slug=True,
                        state=u'unprocessed', fake_upload=True,
                        expunge=True):
    """
    Add a media entry for testing purposes.

    Caution: if you're adding multiple entries with fake_upload=True,
    make sure you save between them... otherwise you'll hit an
    IntegrityError from multiple newly-added-MediaEntries adding
    FileKeynames at once.  :)

    Args:
     - title, slug, state: stored on the entry as is.
     - uploader: user id stored as the entry's actor; when None, the
       id of fixture_add_user() is used.
     - save: whether to save the entry.
     - gen_slug: whether to call generate_slug() on the entry.
     - fake_upload: whether to attach placeholder media files and
       switch the media type to the image media type.
     - expunge: whether to reload the entry by id and detach it from
       the session before returning it.
    """
    if uploader is None:
        uploader = fixture_add_user().id

    media = MediaEntry()
    media.title = title
    media.slug = slug
    media.actor = uploader
    media.media_type = u'image'
    media.state = state

    if fake_upload:
        # Placeholder file paths; nothing is actually uploaded here.
        media.media_files = {
            'thumb': ['a', 'b', 'c.jpg'],
            'medium': ['d', 'e', 'f.png'],
            'original': ['g', 'h', 'i.png'],
        }
        media.media_type = u'mediagoblin.media_types.image'

    if gen_slug:
        media.generate_slug()

    if save:
        media.save()

    if expunge:
        # Reload by id so the returned object reflects what was
        # stored, then detach it from the session.
        media = MediaEntry.query.filter_by(id=media.id).first()
        Session.expunge(media)

    return media
def fixture_add_collection(name=u"My first Collection", user=None,
                           collection_type=Collection.USER_DEFINED_TYPE):
    """
    Return a collection with the given title, owner and type.

    If a matching collection already exists it is returned as is
    (without being detached).  Otherwise a new one is created, saved,
    refreshed and detached from the session.

    Args:
     - name: collection title.
     - user: owner of the collection; when None, fixture_add_user()
       supplies one.
     - collection_type: stored as the collection's type.
    """
    if user is None:
        user = fixture_add_user()

    existing = Collection.query.filter_by(
        actor=user.id,
        title=name,
        type=collection_type
    ).first()
    if existing is not None:
        return existing

    collection = Collection()
    collection.actor = user.id
    collection.title = name
    collection.type = collection_type
    collection.generate_slug()
    collection.save()

    # Reload the stored state ...
    Session.refresh(collection)

    # ... and detach from session:
    Session.expunge(collection)

    return collection
def fixture_add_comment(author=None, media_entry=None, comment=None):
    """
    Add a text comment to a media entry and return the TextComment.

    Args:
     - author: user id stored as the comment's actor; when None, the
       id of fixture_add_user() is used.
     - media_entry: object the comment is linked to; when None,
       fixture_media_entry() supplies one.
     - comment: comment text; when None, a text naming the author id
       and the media entry id is generated.

    The Comment link object is detached from the session; the returned
    TextComment is not.
    """
    if author is None:
        author = fixture_add_user().id

    if media_entry is None:
        media_entry = fixture_media_entry()

    if comment is None:
        # The template previously used {0} for both placeholders, so
        # the media entry never appeared in the generated text.
        comment = \
            'Auto-generated test comment by user #{0} on media #{1}'.format(
                author, media_entry.id)

    text_comment = TextComment(
        actor=author,
        content=comment
    )
    text_comment.save()

    # Link the text comment to the media entry it comments on.
    comment_link = Comment()
    comment_link.target = media_entry
    comment_link.comment = text_comment
    comment_link.save()

    Session.expunge(comment_link)

    return text_comment
def fixture_add_comment_report(comment=None, reported_user=None,
                               reporter=None, created=None,
                               report_content=None):
    """
    File a report against a comment and return the detached Report.

    Args:
     - comment: reported object; when None, fixture_add_comment()
       supplies one.
     - reported_user: user being reported; when None,
       fixture_add_user() supplies one.
     - reporter: user filing the report; when None,
       fixture_add_user() supplies one.
     - created: creation timestamp; when None, datetime.now() is used.
     - report_content: report text; when None, a generic text is used.
    """
    if comment is None:
        comment = fixture_add_comment()

    if reported_user is None:
        reported_user = fixture_add_user()

    if reporter is None:
        reporter = fixture_add_user()

    if created is None:
        created = datetime.now()

    if report_content is None:
        report_content = 'Auto-generated test report'

    comment_report = Report()
    # `obj` is assigned once; a second identical assignment further
    # down was redundant and has been dropped.
    comment_report.obj = comment
    comment_report.reported_user = reported_user
    comment_report.reporter = reporter
    comment_report.created = created
    comment_report.report_content = report_content
    comment_report.save()

    Session.expunge(comment_report)

    return comment_report
def fixture_add_activity(obj, verb="post", target=None, generator=None, actor=None):
    """
    Create and save an Activity about `obj`.

    Args:
     - obj: passed to the activity's set_object().
     - verb: stored on the activity as is.
     - target: passed to set_target() unless it is None.
     - generator: generator whose id is stored on the activity; when
       None, a "GNU MediaGoblin" service generator is created and saved.
     - actor: user whose id is stored on the activity; when None,
       fixture_add_user() supplies one.

    Returns the saved Activity (not detached from the session).
    """
    if generator is None:
        generator = Generator(
            name="GNU MediaGoblin",
            object_type="service"
        )
        generator.save()

    if actor is None:
        actor = fixture_add_user()

    new_activity = Activity(
        verb=verb,
        actor=actor.id,
        generator=generator.id,
    )
    new_activity.set_object(obj)

    if target is not None:
        new_activity.set_target(target)

    new_activity.save()
    return new_activity