1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2014-2015 Jessica Tallon & Matt Molyneaux
5 # This file is part of Inboxen.
7 # Inboxen is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # Inboxen is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
17 # You should have received a copy of the GNU Affero General Public License
18 # along with Inboxen. If not, see <http://www.gnu.org/licenses/>.
21 from importlib
import import_module
22 from io
import BytesIO
34 from django
import urls
35 from django
.conf
import settings
36 from django
.contrib
.auth
import get_user_model
37 from django
.contrib
.auth
.models
import Group
38 from django
.test
import override_settings
39 from django
.urls
import reverse
40 from django
.utils
import timezone
41 from salmon
import mail
43 from inboxen
import models
44 from inboxen
.liberation
import tasks
45 from inboxen
.liberation
.forms
import LiberationForm
46 from inboxen
.liberation
.utils
import INBOXEN_ENCODING_ERROR_HEADER_NAME
, make_message
47 from inboxen
.router
.utils
import make_email
48 from inboxen
.test
import InboxenTestCase
, MockRequest
49 from inboxen
.tests
import factories
50 from inboxen
.tests
.example_emails
import (EXAMPLE_ALT
, EXAMPLE_DIGEST
, EXAMPLE_MISSING_CTE
,
51 EXAMPLE_PREMAILER_BROKEN_CSS
, EXAMPLE_SIGNED_FORWARDED_DIGEST
)
52 from inboxen
.tickets
.models
import Question
53 from inboxen
.tickets
.tests
import QuestionFactory
, ResponseFactory
56 class LiberateTestCase(InboxenTestCase
):
57 """Test account liberating"""
59 self
.user
= factories
.UserFactory()
60 self
.inboxes
= factories
.InboxFactory
.create_batch(2, user
=self
.user
)
61 self
.emails
= factories
.EmailFactory
.create_batch(5, inbox
=self
.inboxes
[0])
62 self
.emails
.extend(factories
.EmailFactory
.create_batch(5, inbox
=self
.inboxes
[1]))
64 for email
in self
.emails
:
65 part
= factories
.PartListFactory(email
=email
)
66 factories
.HeaderFactory(part
=part
, name
="From")
67 factories
.HeaderFactory(part
=part
, name
="Subject", data
="ßssss!")
69 self
.tmp_dir
= tempfile
.mkdtemp()
70 self
.mail_dir
= os
.path
.join(self
.tmp_dir
, "isdabizda")
71 mailbox
.Maildir(self
.mail_dir
)
74 shutil
.rmtree(self
.tmp_dir
, ignore_errors
=True)
76 def test_liberate(self
):
77 """Run through all combinations of compressions and mailbox formats"""
78 with
override_settings(SENDFILE_ROOT
=self
.tmp_dir
):
79 for storage
, compression
in itertools
.product(LiberationForm
.STORAGE_TYPES
,
80 LiberationForm
.COMPRESSION_TYPES
):
81 form_data
= {"storage_type": str(storage
[0]), "compression_type": str(compression
[0])}
82 form
= LiberationForm(self
.user
, data
=form_data
)
83 self
.assertTrue(form
.is_valid())
86 # delete liberation now we're done with it and refetch user
87 models
.Liberation
.objects
.all().delete()
88 models
.Liberation
.objects
.create(user
=self
.user
)
89 self
.user
= get_user_model().objects
.get(id=self
.user
.id)
91 # TODO: check Liberation model actually has correct archive type
93 def test_liberate_inbox(self
):
94 result
= tasks
.liberate_inbox(self
.mail_dir
, self
.inboxes
[0].id)
95 self
.assertIn("folder", result
)
96 self
.assertIn("ids", result
)
97 self
.assertTrue(os
.path
.exists(os
.path
.join(self
.mail_dir
, '.' + result
["folder"])))
99 email_ids
= models
.Email
.objects
.filter(inbox
=self
.inboxes
[0]).values_list("id", flat
=True)
100 self
.assertCountEqual(email_ids
, result
["ids"])
102 def test_liberate_message(self
):
103 inbox
= tasks
.liberate_inbox(self
.mail_dir
, self
.inboxes
[0].id)["folder"]
104 email
= self
.inboxes
[0].email_set
.all()[0]
105 ret_val
= tasks
.liberate_message(self
.mail_dir
, inbox
, email
.id)
106 self
.assertEqual(ret_val
, None)
108 def test_liberate_message_invalid_id(self
):
109 """liberate_message will return the ID of the email of there was an error."""
110 inbox
= tasks
.liberate_inbox(self
.mail_dir
, self
.inboxes
[0].id)["folder"]
111 ret_val
= tasks
.liberate_message(self
.mail_dir
, inbox
, 10000000)
112 self
.assertEqual(ret_val
, hex(10000000)[2:])
114 def test_liberate_message_bad_encoding(self
):
115 """liberate_message should be able to export any email that has been
116 accepted into our data base - including seemingly broken ones"""
117 inbox
= tasks
.liberate_inbox(self
.mail_dir
, self
.inboxes
[0].id)["folder"]
118 email
= self
.inboxes
[0].email_set
.all()[0]
120 # replace body with something bad
121 body
= email
.parts
.first().body
122 body
.data
= "Pó på pə pë".encode()
125 ret_val
= tasks
.liberate_message(self
.mail_dir
, inbox
, email
.id)
126 self
.assertEqual(ret_val
, None)
128 def test_liberate_collect_emails(self
):
129 tasks
.liberate_collect_emails(None, self
.mail_dir
, {"user": self
.user
.id, "path": self
.mail_dir
,
130 "tarname": self
.mail_dir
+ ".tar.gz",
131 "storage_type": "0", "compression_type": "0"})
133 def test_liberation_finish(self
):
134 result_path
= os
.path
.join(self
.mail_dir
, "result")
135 with
open(result_path
, "w") as result
:
136 result
.write("a test")
137 tasks
.liberation_finish(result_path
, {"user": self
.user
.id, "path": self
.mail_dir
,
138 "storage_type": "0", "compression_type": "0"})
141 class LiberateFetchInfoTestCase(InboxenTestCase
):
145 self
.user
= get_user_model().objects
.create(username
="atester")
147 self
.tmp_dir
= tempfile
.mkdtemp()
148 self
.mail_dir
= os
.path
.join(self
.tmp_dir
, "isdabizda")
149 mailbox
.Maildir(self
.mail_dir
)
152 shutil
.rmtree(self
.mail_dir
, ignore_errors
=True)
154 def test_liberate_fetch_info_no_result(self
):
155 tasks
.liberate_fetch_info(None, {"user": self
.user
.id, "path": self
.mail_dir
})
156 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
157 profile_data
= json
.load(profile_file
)
158 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
159 inbox_data
= json
.load(inbox_file
)
160 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
161 question_data
= json
.load(questions_file
)
163 self
.assertEqual(inbox_data
, {})
164 self
.assertEqual(question_data
, [])
165 self
.assertEqual(profile_data
, {
167 "username": self
.user
.username
,
168 "is_active": self
.user
.is_active
,
169 "join_date": self
.user
.date_joined
.isoformat(),
173 "prefer_html_email": True,
174 "prefered_domain": None,
179 def test_liberate_fetch_info_results_and_inboxes(self
):
180 inboxes
= factories
.InboxFactory
.create_batch(2, user
=self
.user
)
181 results
= [None for i
in range(6)]
183 tasks
.liberate_fetch_info(results
, {"user": self
.user
.id, "path": self
.mail_dir
})
184 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
185 profile_data
= json
.load(profile_file
)
186 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
187 inbox_data
= json
.load(inbox_file
)
188 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
189 questions_data
= json
.load(questions_file
)
191 self
.assertEqual(questions_data
, [])
192 self
.assertEqual(inbox_data
, {
194 "created": box
.created
.isoformat(),
197 "deleted": box
.deleted
,
199 "exclude_from_unified": box
.exclude_from_unified
,
200 "disabled": box
.disabled
,
201 "pinned": box
.pinned
,
207 self
.assertEqual(profile_data
, {
209 "username": self
.user
.username
,
210 "is_active": self
.user
.is_active
,
211 "join_date": self
.user
.date_joined
.isoformat(),
215 "prefer_html_email": True,
216 "prefered_domain": None,
221 def test_liberate_fetch_info_user_has_groups(self
):
222 group
= Group
.objects
.create(name
="hello")
223 self
.user
.groups
.add(group
)
225 tasks
.liberate_fetch_info(None, {"user": self
.user
.id, "path": self
.mail_dir
})
226 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
227 profile_data
= json
.load(profile_file
)
228 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
229 inbox_data
= json
.load(inbox_file
)
230 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
231 question_data
= json
.load(questions_file
)
233 self
.assertEqual(question_data
, [])
234 self
.assertEqual(inbox_data
, {})
235 self
.assertEqual(profile_data
, {
237 "username": self
.user
.username
,
238 "is_active": self
.user
.is_active
,
239 "join_date": self
.user
.date_joined
.isoformat(),
243 "prefer_html_email": True,
244 "prefered_domain": None,
249 def test_liberate_fetch_info_user_has_prefered_domain(self
):
250 domain
= factories
.DomainFactory()
251 profile
= self
.user
.inboxenprofile
252 profile
.prefered_domain
= domain
255 tasks
.liberate_fetch_info(None, {"user": self
.user
.id, "path": self
.mail_dir
})
256 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
257 profile_data
= json
.load(profile_file
)
258 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
259 inbox_data
= json
.load(inbox_file
)
260 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
261 question_data
= json
.load(questions_file
)
263 self
.assertEqual(question_data
, [])
264 self
.assertEqual(inbox_data
, {})
265 self
.assertEqual(profile_data
, {
267 "username": self
.user
.username
,
268 "is_active": self
.user
.is_active
,
269 "join_date": self
.user
.date_joined
.isoformat(),
273 "prefer_html_email": True,
274 "prefered_domain": str(domain
),
279 def test_liberate_fetch_questions(self
):
280 other_user
= factories
.UserFactory(username
="notme")
281 QuestionFactory
.create_batch(2, author
=self
.user
, status
=Question
.NEW
)
282 QuestionFactory
.create_batch(2, author
=other_user
, status
=Question
.RESOLVED
)
284 questions
= Question
.objects
.filter(author
=self
.user
)
285 response
= ResponseFactory(question
=questions
[0], author
=other_user
)
288 tasks
.liberate_fetch_info(None, {"user": self
.user
.id, "path": self
.mail_dir
})
289 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
290 profile_data
= json
.load(profile_file
)
291 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
292 inbox_data
= json
.load(inbox_file
)
293 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
294 question_data
= json
.load(questions_file
)
296 self
.assertEqual(inbox_data
, {})
297 self
.assertEqual(profile_data
, {
299 "username": self
.user
.username
,
300 "is_active": self
.user
.is_active
,
301 "join_date": self
.user
.date_joined
.isoformat(),
305 "prefer_html_email": True,
306 "prefered_domain": None,
310 self
.assertEqual(question_data
, [
312 "subject": questions
[0].subject
,
313 "author": self
.user
.username
,
314 "date": questions
[0].date
.isoformat(),
315 "last_modified": questions
[0].last_modified
.isoformat(),
316 "status": questions
[0].get_status_display(),
317 "body": questions
[0].body
,
320 "author": other_user
.username
,
321 "date": response
.date
.isoformat(),
322 "body": response
.body
,
327 "subject": questions
[1].subject
,
328 "author": self
.user
.username
,
329 "date": questions
[1].date
.isoformat(),
330 "last_modified": questions
[1].last_modified
.isoformat(),
331 "status": questions
[1].get_status_display(),
332 "body": questions
[1].body
,
337 def test_liberate_fetch_questions_user_deleted(self
):
338 other_user
= factories
.UserFactory(username
="notme")
339 question
= QuestionFactory(author
=self
.user
, status
=Question
.NEW
)
342 response
= ResponseFactory(question
=question
, author
=other_user
)
347 tasks
.liberate_fetch_info(None, {"user": self
.user
.id, "path": self
.mail_dir
})
348 with
open(os
.path
.join(self
.mail_dir
, "profile.json")) as profile_file
:
349 profile_data
= json
.load(profile_file
)
350 with
open(os
.path
.join(self
.mail_dir
, "inbox.json")) as inbox_file
:
351 inbox_data
= json
.load(inbox_file
)
352 with
open(os
.path
.join(self
.mail_dir
, "questions.json")) as questions_file
:
353 question_data
= json
.load(questions_file
)
355 self
.assertEqual(inbox_data
, {})
356 self
.assertEqual(profile_data
, {
358 "username": self
.user
.username
,
359 "is_active": self
.user
.is_active
,
360 "join_date": self
.user
.date_joined
.isoformat(),
364 "prefer_html_email": True,
365 "prefered_domain": None,
369 self
.assertEqual(question_data
, [{
370 "subject": question
.subject
,
371 "author": self
.user
.username
,
372 "date": question
.date
.isoformat(),
373 "last_modified": question
.last_modified
.isoformat(),
374 "status": question
.get_status_display(),
375 "body": question
.body
,
378 "date": response
.date
.isoformat(),
379 "body": response
.body
,
384 class LiberateNewUserTestCase(InboxenTestCase
):
385 """Liberate a new user, with no data"""
387 self
.user
= get_user_model().objects
.create(username
="atester")
389 self
.tmp_dir
= tempfile
.mkdtemp()
390 self
.mail_dir
= os
.path
.join(self
.tmp_dir
, "isdabizda")
391 mailbox
.Maildir(self
.mail_dir
)
394 shutil
.rmtree(self
.mail_dir
, ignore_errors
=True)
396 def test_liberate(self
):
397 with
override_settings(SENDFILE_ROOT
=self
.tmp_dir
):
398 form
= LiberationForm(self
.user
, data
={"storage_type": 0, "compression_type": 0})
399 self
.assertTrue(form
.is_valid())
402 def test_liberation_finish(self
):
403 result_path
= os
.path
.join(self
.mail_dir
, "result")
404 with
open(result_path
, "w") as result_file
:
405 result_file
.write("a test")
406 tasks
.liberation_finish(result_path
, {"user": self
.user
.id, "path": self
.mail_dir
,
407 "storage_type": "0", "compression_type": "0"})
410 class LiberateViewTestCase(InboxenTestCase
):
412 self
.user
= factories
.UserFactory()
414 login
= self
.client
.login(username
=self
.user
.username
, password
="123456", request
=MockRequest(self
.user
))
417 raise Exception("Could not log in")
420 return urls
.reverse("user-liberate")
422 def test_form_bad_data(self
):
423 params
= {"storage_type": 180, "compression_type": 180}
424 form
= LiberationForm(user
=self
.user
, data
=params
)
426 self
.assertFalse(form
.is_valid())
428 def test_form_good_data(self
):
429 params
= {"storage_type": 1, "compression_type": 1}
430 form
= LiberationForm(user
=self
.user
, data
=params
)
432 self
.assertTrue(form
.is_valid())
435 response
= self
.client
.get(self
.get_url())
436 self
.assertEqual(response
.status_code
, 200)
439 class LiberationDownloadViewTestCase(InboxenTestCase
):
441 self
.user
= factories
.UserFactory()
442 self
.tmp_dir
= tempfile
.mkdtemp()
444 assert self
.client
.login(username
=self
.user
.username
, password
="123456", request
=MockRequest(self
.user
))
446 def test_sendfile_no_liberation(self
):
447 response
= self
.client
.get(reverse("user-liberate-get"))
448 self
.assertEqual(response
.status_code
, 404)
450 def test_default_backend(self
):
451 module
= import_module(settings
.SENDFILE_BACKEND
)
452 self
.assertTrue(hasattr(module
, "sendfile")) # function that django-sendfile
453 self
.assertTrue(hasattr(module
.sendfile
, "__call__")) # callable
455 @override_settings(SENDFILE_BACKEND
="django_sendfile.backends.xsendfile")
456 def test_sendfile(self
):
457 with
override_settings(SENDFILE_ROOT
=self
.tmp_dir
):
458 self
.user
.liberation
.path
= "test.txt"
459 self
.user
.liberation
.save()
461 self
.assertEqual(os
.path
.join(self
.tmp_dir
, "test.txt"), self
.user
.liberation
.path
)
463 with
open(self
.user
.liberation
.path
, "wb") as file_obj
:
464 file_obj
.write(b
"hello\n")
466 response
= self
.client
.get(reverse("user-liberate-get"))
467 self
.assertEqual(response
.status_code
, 200)
468 self
.assertEqual(response
.content
, b
"")
469 self
.assertEqual(response
["Content-Type"], "application/x-gzip")
470 self
.assertEqual(response
["Content-Disposition"], 'attachment; filename="liberated_data.tar.gz"')
471 self
.assertEqual(response
["X-Sendfile"], os
.path
.join(self
.tmp_dir
, "test.txt"))
474 class MakeMessageUtilTestCase(InboxenTestCase
):
475 """Test that example emails are serialisable"""
477 self
.user
= factories
.UserFactory()
478 self
.inbox
= factories
.InboxFactory(user
=self
.user
)
480 login
= self
.client
.login(username
=self
.user
.username
, password
="123456", request
=MockRequest(self
.user
))
483 raise Exception("Could not log in")
485 def test_digest(self
):
486 msg
= mail
.MailRequest("", "", "", EXAMPLE_DIGEST
)
487 make_email(msg
, self
.inbox
)
488 email
= models
.Email
.objects
.get()
489 message_object
= make_message(email
)
490 new_msg
= mail
.MailRequest("", "", "", str(message_object
))
492 self
.assertEqual(len(msg
.keys()), len(new_msg
.keys()))
493 self
.assertEqual(len(list(msg
.walk())), len(list(new_msg
.walk())))
495 def test_signed_forwarded_digest(self
):
496 msg
= mail
.MailRequest("", "", "", EXAMPLE_SIGNED_FORWARDED_DIGEST
)
497 make_email(msg
, self
.inbox
)
498 self
.email
= models
.Email
.objects
.get()
499 email
= models
.Email
.objects
.get()
500 message_object
= make_message(email
)
501 new_msg
= mail
.MailRequest("", "", "", str(message_object
))
503 self
.assertEqual(len(msg
.keys()), len(new_msg
.keys()))
504 self
.assertEqual(len(list(msg
.walk())), len(list(new_msg
.walk())))
506 def test_alterative(self
):
507 msg
= mail
.MailRequest("", "", "", EXAMPLE_ALT
)
508 make_email(msg
, self
.inbox
)
509 email
= models
.Email
.objects
.get()
510 message_object
= make_message(email
)
511 new_msg
= mail
.MailRequest("", "", "", str(message_object
))
513 self
.assertEqual(len(msg
.keys()), len(new_msg
.keys()))
514 self
.assertEqual(len(list(msg
.walk())), len(list(new_msg
.walk())))
516 def test_bad_css(self
):
517 """This test uses an example email that causes issue #47"""
518 msg
= mail
.MailRequest("", "", "", EXAMPLE_PREMAILER_BROKEN_CSS
)
519 make_email(msg
, self
.inbox
)
520 email
= models
.Email
.objects
.get()
521 message_object
= make_message(email
)
522 new_msg
= mail
.MailRequest("", "", "", str(message_object
))
524 self
.assertEqual(len(msg
.keys()), len(new_msg
.keys()))
525 self
.assertEqual(len(list(msg
.walk())), len(list(new_msg
.walk())))
527 def test_unicode(self
):
528 """This test uses an example email that contains unicode chars"""
529 msg
= mail
.MailRequest("", "", "", EXAMPLE_MISSING_CTE
)
530 make_email(msg
, self
.inbox
)
531 email
= models
.Email
.objects
.get()
532 message_object
= make_message(email
)
533 new_msg
= mail
.MailRequest("", "", "", message_object
.as_bytes().decode())
535 self
.assertEqual(len(msg
.keys()), len(new_msg
.keys()))
536 self
.assertEqual(len(list(msg
.walk())), len(list(new_msg
.walk())))
538 def test_encoders_used(self
):
539 # make message with base64 part, uuencode part, 8bit part, 7bit part,
540 # quopri part, and some invalid part
541 unicode_body_data
= "Hello\n\nHow are you?\nPó på pə pë\n".encode()
542 ascii_body_data
= "Hello\n\nHow are you?\n".encode()
543 email
= factories
.EmailFactory(inbox
=self
.inbox
)
544 unicode_body
= factories
.BodyFactory(data
=unicode_body_data
)
545 ascii_body
= factories
.BodyFactory(data
=ascii_body_data
)
546 first_part
= factories
.PartListFactory(email
=email
, body
=factories
.BodyFactory(data
=b
""))
547 factories
.HeaderFactory(part
=first_part
, name
="Content-Type",
548 data
="multipart/mixed; boundary=\"=-3BRZDE/skgKPPh+RuFa/\"")
550 unicode_encodings
= {
551 "base64": check_base64
,
552 "quoted-printable": check_quopri
,
553 "uuencode": check_uu
,
554 "x-uuencode": check_uu
,
561 "9-bit": check_unknown
, # unknown encoding
564 encodings
.update(unicode_encodings
)
565 encodings
.update(ascii_encodings
)
567 for enc
in unicode_encodings
.keys():
568 part
= factories
.PartListFactory(email
=email
, parent
=first_part
, body
=unicode_body
)
569 factories
.HeaderFactory(part
=part
, name
="Content-Type", data
="text/plain; name=\"my-file.txt\"")
570 factories
.HeaderFactory(part
=part
, name
="Content-Transfer-Encoding", data
=enc
)
572 for enc
in ascii_encodings
.keys():
573 part
= factories
.PartListFactory(email
=email
, parent
=first_part
, body
=ascii_body
)
574 factories
.HeaderFactory(part
=part
, name
="Content-Type", data
="text/plain; name=\"my-file.txt\"")
575 factories
.HeaderFactory(part
=part
, name
="Content-Transfer-Encoding", data
=enc
)
577 # finally, make a part without content headers
578 factories
.PartListFactory(email
=email
, parent
=first_part
, body
=ascii_body
)
581 message_object
= make_message(email
)
583 for message_part
in message_object
.walk():
584 ct
= message_part
.get("Content-Type", None)
585 cte
= message_part
.get("Content-Transfer-Encoding", None)
587 # default is to assume 7-bit
588 check_noop(message_part
, ascii_body_data
)
589 self
.assertEqual(message_part
.get_payload(decode
=True), ascii_body_data
)
590 elif ct
.startswith("multipart/mixed"):
592 elif cte
in ascii_encodings
:
593 encodings
[cte
](message_part
, ascii_body_data
)
594 self
.assertEqual(message_part
.get_payload(decode
=True), ascii_body_data
)
595 elif cte
in unicode_encodings
:
596 encodings
[cte
](message_part
, unicode_body_data
)
597 self
.assertEqual(message_part
.get_payload(decode
=True), unicode_body_data
)
599 raise AssertionError("Unknown Content-Type or Content-Type-Encoding")
601 # check that we can decode the whole lot in one go
602 output_bytes
= message_object
.as_string().encode("ascii")
603 self
.assertNotEqual(len(output_bytes
), 0)
604 self
.assertEqual(output_bytes
.count(b
"text/plain; name=\"my-file.txt\""), len(encodings
))
607 def check_noop(msg
, data
):
608 assert msg
._payload
.encode() == data
, "Payload has been transformed"
610 assert INBOXEN_ENCODING_ERROR_HEADER_NAME
not in msg
.keys(), "Unexpected error header"
613 def check_unknown(msg
, data
):
614 assert msg
._payload
.encode() == data
, "Payload has been transformed"
616 assert INBOXEN_ENCODING_ERROR_HEADER_NAME
in msg
.keys(), "Missing error header"
619 def check_base64(msg
, data
):
620 assert msg
._payload
.encode() != data
, "Payload has not been transformed"
622 payload
= base64
.standard_b64decode(msg
._payload
)
624 assert False, "Payload could not be decoded"
625 assert payload
== data
, "Decoded payload does not match input data"
627 assert INBOXEN_ENCODING_ERROR_HEADER_NAME
not in msg
.keys(), "Unexpected error header"
630 def check_quopri(msg
, data
):
631 assert msg
._payload
.encode() != data
, "Payload has not been transformed"
632 assert quopri
.decodestring(msg
._payload
) == data
, "Payload was not encoded correctly"
634 assert INBOXEN_ENCODING_ERROR_HEADER_NAME
not in msg
.keys(), "Unexpected error header"
637 def check_uu(msg
, data
):
638 assert msg
._payload
.encode() != data
, "Payload has not been transformed"
642 uu
.decode(BytesIO(msg
._payload
.encode()), outfile
)
643 payload
= outfile
.getvalue()
645 assert False, "Payload could not be decoded"
646 assert payload
== data
, "Decoded payload does not match input data"
648 assert INBOXEN_ENCODING_ERROR_HEADER_NAME
not in msg
.keys(), "Unexpected error header"
651 class LiberationFormTestCase(InboxenTestCase
):
653 self
.form_data
= {"storage_type": "0", "compression_type": "0"}
654 self
.user
= factories
.UserFactory()
656 def test_form_time_invalid(self
):
657 self
.user
.liberation
.started
= self
.user
.liberation
.last_finished
= timezone
.now()
658 form
= LiberationForm(user
=self
.user
, data
=self
.form_data
)
659 self
.assertFalse(form
.is_valid())
661 def test_form_time_valid(self
):
662 self
.user
.liberation
.started
= self
.user
.liberation
.last_finished
= None
663 form
= LiberationForm(user
=self
.user
, data
=self
.form_data
)
664 self
.assertTrue(form
.is_valid())