1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from werkzeug
.datastructures
import FileStorage
22 from .resources
import GOOD_JPG
23 from mediagoblin
.db
.base
import Session
24 from mediagoblin
.media_types
import sniff_media
25 from mediagoblin
.submit
.lib
import new_upload_entry
26 from mediagoblin
.submit
.task
import collect_garbage
27 from mediagoblin
.db
.models
import User
, MediaEntry
, TextComment
, Comment
28 from mediagoblin
.tests
.tools
import fixture_add_user
, fixture_media_entry
31 def test_404_for_non_existent(test_app
):
32 res
= test_app
.get('/does-not-exist/', expect_errors
=True)
33 assert res
.status_int
== 404
36 def test_user_deletes_other_comments(test_app
):
37 user_a
= fixture_add_user(u
"chris_a")
38 user_b
= fixture_add_user(u
"chris_b")
40 media_a
= fixture_media_entry(uploader
=user_a
.id, save
=False,
41 expunge
=False, fake_upload
=False)
42 media_b
= fixture_media_entry(uploader
=user_b
.id, save
=False,
43 expunge
=False, fake_upload
=False)
48 # Create all 4 possible comments:
49 for u
in (user_a
, user_b
):
50 for m
in (media_a
, media_b
):
53 cmt
.content
= u
"Some Comment"
55 # think i need this to get the command ID
65 usr_cnt1
= User
.query
.count()
66 med_cnt1
= MediaEntry
.query
.count()
67 cmt_cnt1
= TextComment
.query
.count()
69 User
.query
.get(user_a
.id).delete(commit
=False)
71 usr_cnt2
= User
.query
.count()
72 med_cnt2
= MediaEntry
.query
.count()
73 cmt_cnt2
= TextComment
.query
.count()
76 assert usr_cnt2
== usr_cnt1
- 1
78 assert med_cnt2
== med_cnt1
- 1
79 # Three of four comments gone.
80 assert cmt_cnt2
== cmt_cnt1
- 3
82 User
.query
.get(user_b
.id).delete()
84 usr_cnt2
= User
.query
.count()
85 med_cnt2
= MediaEntry
.query
.count()
86 cmt_cnt2
= TextComment
.query
.count()
89 assert usr_cnt2
== usr_cnt1
- 2
91 assert med_cnt2
== med_cnt1
- 2
93 assert cmt_cnt2
== cmt_cnt1
- 4
96 def test_media_deletes_broken_attachment(test_app
):
97 user_a
= fixture_add_user(u
"chris_a")
99 media
= fixture_media_entry(uploader
=user_a
.id, save
=False, expunge
=False)
100 media
.attachment_files
.append(dict(
102 filepath
=[u
"does", u
"not", u
"exist"],
107 MediaEntry
.query
.get(media
.id).delete()
108 User
.query
.get(user_a
.id).delete()
110 def test_garbage_collection_task(test_app
):
111 """ Test old media entry are removed by GC task """
112 user
= fixture_add_user()
114 # Create a media entry that's unprocessed and over an hour old.
116 now
= datetime
.datetime
.now(pytz
.UTC
)
117 file_data
= FileStorage(
118 stream
=open(GOOD_JPG
, "rb"),
119 filename
="mah_test.jpg",
120 content_type
="image/jpeg"
124 media_type
, media_manager
= sniff_media(file_data
, "mah_test.jpg")
125 entry
= new_upload_entry(user
)
127 entry
.title
= "Mah Image"
128 entry
.slug
= "slugy-slug-slug"
129 entry
.media_type
= 'image'
130 entry
.created
= now
- datetime
.timedelta(days
=2)
133 # Validate the model exists
134 assert MediaEntry
.query
.filter_by(id=entry_id
).first() is not None
136 # Call the garbage collection task
139 # Now validate the image has been deleted
140 assert MediaEntry
.query
.filter_by(id=entry_id
).first() is None