Merge pull request 'Upgrade to 3.12' (#630) from python-3.12 into main
[inboxen.git] / inboxen / liberation / tasks.py
blobee019fea73f18bb09a5b898ef2df1f3657c3cecd
1 ##
2 # Copyright (C) 2013, 2014, 2015, 2016, 2017 Jessica Tallon & Matt Molyneaux
4 # This file is part of Inboxen.
6 # Inboxen is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # Inboxen is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
16 # You should have received a copy of the GNU Affero General Public License
17 # along with Inboxen. If not, see <http://www.gnu.org/licenses/>.
20 from shutil import rmtree
21 import hashlib
22 import json
23 import logging
24 import mailbox
25 import os
26 import string
27 import tarfile
28 import time
30 from celery import chain, chord
31 from django import urls
32 from django.conf import settings
33 from django.contrib.auth import get_user_model
34 from django.db import IntegrityError, transaction
35 from django.db.models import Prefetch
36 from django.utils import safestring, timezone
37 from django.utils.crypto import get_random_string
38 from django.utils.translation import gettext as _
40 from inboxen import tasks
41 from inboxen.async_messages import message_user
42 from inboxen.celery import app
43 from inboxen.liberation import utils
44 from inboxen.models import Email, Inbox
45 from inboxen.tickets.models import Question, Response
46 from inboxen.utils.tasks import task_group_skew
log = logging.getLogger(__name__)

# Archive formats, keyed by the value of options['compression_type'].
# 'ext' is the file extension appended to the archive name, 'writer' is the
# mode string handed to tarfile.open() and 'mime-type' the matching MIME type.
TAR_TYPES = {
    '0': {
        'ext': 'tar.gz',
        'writer': 'w:gz',
        'mime-type': 'application/x-gzip',
    },
    '1': {
        'ext': 'tar.bz2',
        'writer': 'w:bz2',
        'mime-type': 'application/x-bzip2',
    },
    '2': {
        'ext': 'tar',
        'writer': 'w:',
        'mime-type': 'application/x-tar',
    },
}
@app.task(rate_limit='4/h')
def liberate(user_id, options):
    """Prepare a working directory and kick off the liberation workflow.

    ``user_id`` is the primary key of the user whose data is exported.
    ``options`` is the mutable dict handed on to every follow-up task; this
    task stores the "user", "path" and "tarname" keys in it (and "noEmails"
    when the user has no inboxes to export).

    The task retries itself when the working directory cannot be created.
    """
    options['user'] = user_id
    account = get_user_model().objects.get(id=user_id)
    lib_status = account.liberation

    archive_format = TAR_TYPES[options.get('compression_type', '0')]

    # Base name: timestamp, pid, random salt and a truncated hash of
    # username + salt
    salt = get_random_string(7, string.ascii_letters)
    salted_name = (account.username + salt).encode("utf-8")
    digest = hashlib.sha256(salted_name).hexdigest()[:50]
    basename = "_".join((str(time.time()), str(os.getpid()), salt, digest))

    work_dir = os.path.join(settings.SENDFILE_ROOT, basename)
    tarname = "%s.%s" % (basename, archive_format["ext"])

    # Is this safe enough?
    try:
        os.mkdir(work_dir, 0o700)
    except OSError as error:
        log.info("Couldn't create dir at %s", work_dir)
        raise liberate.retry(exc=error)

    try:
        lib_status.path = tarname
        lib_status.save()
    except IntegrityError:
        # don't leave the freshly made (still empty) directory behind
        os.rmdir(work_dir)
        raise

    options["path"] = work_dir
    options["tarname"] = tarname

    mail_path = os.path.join(work_dir, 'emails')
    # make maildir
    mailbox.Maildir(mail_path, factory=None)

    live_inboxes = Inbox.objects.filter(user=account, deleted=False).only('id').iterator()
    inbox_tasks = [liberate_inbox.s(mail_path, inbox.id) for inbox in live_inboxes]

    if inbox_tasks:
        # one task per inbox, then collect all their email IDs
        workflow = chord(
            inbox_tasks,
            liberate_collect_emails.s(mail_path, options)
        )
    else:
        # nothing to export from inboxes: go straight to metadata + tarball
        options["noEmails"] = True
        workflow = chain(
            liberate_fetch_info.s({"results": []}, options),
            liberate_tarball.s(options),
            liberation_finish.s(options)
        )

    async_result = workflow.apply_async()

    lib_status.async_result = async_result.id
    lib_status.save()
@app.task(rate_limit='100/m')
def liberate_inbox(mail_path, inbox_id):
    """Create a maildir folder for one inbox and list its email IDs.

    Returns a dict with the folder name under "folder" and the IDs of the
    inbox's non-deleted emails under "ids".
    """
    inbox = Inbox.objects.get(id=inbox_id, deleted=False)
    folder_name = str(inbox)

    mailbox.Maildir(mail_path, factory=None).add_folder(folder_name)

    live_emails = Email.objects.filter(inbox=inbox, deleted=False).only('id').iterator()
    email_ids = [email.id for email in live_emails]

    return {
        'folder': folder_name,
        'ids': email_ids,
    }
@app.task()
def liberate_collect_emails(results, mail_path, options):
    """Queue one export job per email, followed by the packaging steps.

    ``results`` is a list of dicts carrying "folder" and "ids" keys (the
    shape returned by ``liberate_inbox``); it may be empty or None. The ID
    of the launched workflow is stored on the user's liberation record.
    """
    # One (mail_path, folder, email_id) argument tuple per email
    message_args = [
        (mail_path, result['folder'], email_id)
        for result in (results or [])
        for email_id in result['ids']
    ]

    if message_args:
        message_group = liberate_message.chunks(message_args, 100).group()
        task_group_skew(message_group, step=10)
        head = [message_group, liberate_convert_box.s(mail_path, options)]
    else:
        # no emails at all: feed an empty result into the conversion step
        options["noEmails"] = True
        head = [liberate_convert_box.s({"results": []}, mail_path, options)]

    workflow = chain(
        *head,
        liberate_fetch_info.s(options),
        liberate_tarball.s(options),
        liberation_finish.s(options)
    )
    async_result = workflow.apply_async()

    lib_status = get_user_model().objects.get(id=options["user"]).liberation
    lib_status.async_result = async_result.id
    lib_status.save()
@app.task(rate_limit='1000/m')
@transaction.atomic()
def liberate_message(mail_path, inbox, email_id):
    """Write one email from the database into its maildir folder.

    Returns None on success. If anything goes wrong the exception is logged
    and the email ID, as a hex string without the "0x" prefix, is returned
    instead of raising.
    """
    folder = mailbox.Maildir(mail_path, factory=None).get_folder(inbox)

    try:
        record = Email.objects.get(id=email_id, deleted=False)
        message = utils.make_message(record)
        folder.add(message.as_bytes())
    except Exception as exc:
        # best effort: report the failing email rather than abort the export
        failed_id = hex(int(email_id))[2:]
        log.warning("Exception processing %s", failed_id, exc_info=exc)
        return failed_id

    return None
@app.task()
def liberate_convert_box(result, mail_path, options):
    """Convert the maildir at ``mail_path`` to an mbox file if requested.

    When ``options['storage_type']`` is ``'1'`` every message of every
    maildir folder is moved into ``<mail_path>.mbox`` and the maildir tree is
    removed afterwards. For any other value nothing is touched.

    ``result`` is returned unchanged so it reaches the next task in the chain.
    """
    if options['storage_type'] == '1':
        maildir = mailbox.Maildir(mail_path, factory=None)
        mbox = mailbox.mbox(mail_path + '.mbox')

        try:
            mbox.lock()

            for inbox in maildir.list_folders():
                folder = maildir.get_folder(inbox)

                for key in folder.keys():
                    msg = folder.pop(key)
                    mbox.add(msg)
                maildir.remove_folder(inbox)

            rmtree(mail_path)
        finally:
            # close() flushes, unlocks if locked and closes the file; doing it
            # here stops a failed conversion from leaking the lock and handle
            mbox.close()

    return result
@app.task()
def liberate_fetch_info(result, options):
    """Dump the user's profile, inbox and question data as JSON files.

    Writes profile.json, inbox.json and questions.json into
    ``options["path"]``. ``result`` (or an empty list when it is falsy) is
    forwarded to ``liberate_user_profile``.
    """
    user_id = options['user']

    # All three documents are built before any file is opened
    inbox_json = liberate_inbox_metadata(user_id)
    profile_json = liberate_user_profile(user_id, result or [])
    questions_json = liberate_user_questions(user_id)

    target_dir = options["path"]
    documents = (
        ("profile.json", profile_json),
        ("inbox.json", inbox_json),
        ("questions.json", questions_json),
    )
    for filename, payload in documents:
        with open(os.path.join(target_dir, filename), "w") as handle:
            handle.write(payload)
@app.task(default_retry_delay=600)
def liberate_tarball(result, options):
    """Pack the working directory into a tar archive, then delete it.

    The archive is written to ``options["tarname"]`` under
    ``settings.SENDFILE_ROOT`` using the writer mode selected by
    ``options['compression_type']``. Inside the archive the files live under
    a top-level directory named "inboxen-<liberation start time>".

    Returns the full path of the archive. The task retries itself when the
    archive cannot be opened.
    """
    archive_format = TAR_TYPES[options.get('compression_type', '0')]
    tar_name = os.path.join(settings.SENDFILE_ROOT, options["tarname"])

    try:
        archive = tarfile.open(tar_name, archive_format['writer'])
    except OSError as error:
        log.debug("Couldn't open tarfile at %s", tar_name)
        raise liberate_tarball.retry(exc=error)

    lib_status = get_user_model().objects.get(id=options['user']).liberation
    archive_root = "inboxen-" + str(lib_status.started)

    try:
        # directories are added recursively by default
        archive.add(options["path"] + "/", arcname=archive_root)
    finally:
        # the working directory goes away whether or not packing succeeded
        archive.close()
        rmtree(options["path"])

    return tar_name
@app.task(ignore_result=True)
@transaction.atomic()
def liberation_finish(result, options):
    """Mark the liberation as finished and tell the user where to get it.

    Updates the user's liberation record, sends the user a message with a
    link to the download view and finally queues a garbage collection task.
    """
    user_id = options['user']
    user = get_user_model().objects.get(id=user_id)

    lib_status = user.liberation
    lib_status.running = False
    lib_status.last_finished = timezone.now()
    lib_status.content_type = int(options.get('compression_type', '0'))
    lib_status.save()

    message = _("Your request for your personal data has been completed. Click "
                "<a class=\"alert-link\" href=\"%s\">here</a> to download your archive.")
    download_url = urls.reverse("user-liberate-get")
    # the message contains markup, so it is explicitly marked as safe
    notice = safestring.mark_safe(message % download_url)
    message_user(user, notice)

    log.info("Finished liberation for %s", user_id)

    # run a garbage collection on all workers - liberation is leaky
    tasks.force_garbage_collection.delay()
def liberate_user_profile(user_id, email_results):
    """Return the user's profile data as a JSON string.

    The document holds the user's preferences, basic account fields, group
    names and an "errors" list containing the string form of every truthy
    entry of ``email_results`` (which may be empty or None).
    """
    user = get_user_model().objects.get(id=user_id)
    profile = user.inboxenprofile
    domain = profile.prefered_domain

    data = {
        # user's preferences
        "preferences": {
            "prefer_html_email": profile.prefer_html_email,
            "prefered_domain": str(domain) if domain else None,
            "display_images": profile.display_images,
        },
        # user data
        "id": user.id,
        "username": user.username,
        "is_active": user.is_active,
        "join_date": user.date_joined.isoformat(),
        "groups": [str(group) for group in user.groups.all()],
        # falsy results are skipped, only truthy ones are recorded
        "errors": [str(outcome) for outcome in (email_results or []) if outcome],
    }

    return json.dumps(data)
def liberate_user_questions(user_id):
    """Return a JSON string listing the user's questions and their responses.

    Each entry carries the question's subject, author (the user's username),
    dates, status display value and body, plus a "responses" list with the
    author username (None when a response has no author), date and body of
    every response.
    """
    response_prefetch = Prefetch("response_set", queryset=Response.objects.all().select_related("author"))
    questions = Question.objects.filter(author_id=user_id).prefetch_related(response_prefetch)
    username = get_user_model().objects.get(id=user_id).username

    data = []
    for question in questions:
        item = {
            "subject": question.subject,
            "author": username,
            "date": question.date.isoformat(),
            "last_modified": question.last_modified.isoformat(),
            "status": question.get_status_display(),
            "body": question.body,
            # Read the responses prefetched above; filtering the prefetch
            # queryset here would issue one extra query per question.
            "responses": [
                {
                    "author": response.author.username if response.author else None,
                    "date": response.date.isoformat(),
                    "body": response.body,
                }
                for response in question.response_set.all()
            ],
        }
        data.append(item)

    return json.dumps(data)
def liberate_inbox_metadata(user_id):
    """Return a JSON string describing every inbox owned by the user.

    The document maps the string form of each inbox to its creation time,
    its flags and its description. Inboxes are not filtered by their
    deleted flag here.
    """
    owned = Inbox.objects.filter(user__id=user_id)

    metadata = {
        str(inbox): {
            "created": inbox.created.isoformat(),
            "flags": {
                "deleted": inbox.deleted,
                "new": inbox.new,
                "exclude_from_unified": inbox.exclude_from_unified,
                "disabled": inbox.disabled,
                "pinned": inbox.pinned,
            },
            "description": inbox.description,
        }
        for inbox in owned
    }

    return json.dumps(metadata)