2 # Copyright (C) 2013, 2014, 2015, 2016, 2017 Jessica Tallon & Matt Molyneaux
4 # This file is part of Inboxen.
6 # Inboxen is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # Inboxen is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
16 # You should have received a copy of the GNU Affero General Public License
17 # along with Inboxen. If not, see <http://www.gnu.org/licenses/>.
20 from shutil
import rmtree
30 from celery
import chain
, chord
31 from django
import urls
32 from django
.conf
import settings
33 from django
.contrib
.auth
import get_user_model
34 from django
.db
import IntegrityError
, transaction
35 from django
.db
.models
import Prefetch
36 from django
.utils
import safestring
, timezone
37 from django
.utils
.crypto
import get_random_string
38 from django
.utils
.translation
import gettext
as _
40 from inboxen
import tasks
41 from inboxen
.async_messages
import message_user
42 from inboxen
.celery
import app
43 from inboxen
.liberation
import utils
44 from inboxen
.models
import Email
, Inbox
45 from inboxen
.tickets
.models
import Question
, Response
46 from inboxen
.utils
.tasks
import task_group_skew
48 log
= logging
.getLogger(__name__
)
51 '0': {'ext': 'tar.gz', 'writer': 'w:gz', 'mime-type': 'application/x-gzip'},
52 '1': {'ext': 'tar.bz2', 'writer': 'w:bz2', 'mime-type': 'application/x-bzip2'},
53 '2': {'ext': 'tar', 'writer': 'w:', 'mime-type': 'application/x-tar'}
57 @app.task(rate_limit
='4/h')
58 def liberate(user_id
, options
):
59 """ Get set for liberation, expects User object """
60 options
['user'] = user_id
61 user
= get_user_model().objects
.get(id=user_id
)
62 lib_status
= user
.liberation
64 tar_type
= TAR_TYPES
[options
.get('compression_type', '0')]
66 rstr
= get_random_string(7, string
.ascii_letters
)
67 username
= user
.username
+ rstr
68 username
= username
.encode("utf-8")
69 basename
= "%s_%s_%s_%s" % (time
.time(), os
.getpid(), rstr
, hashlib
.sha256(username
).hexdigest()[:50])
70 path
= os
.path
.join(settings
.SENDFILE_ROOT
, basename
)
71 tarname
= "%s.%s" % (basename
, tar_type
["ext"])
73 # Is this safe enough?
76 except (IOError, OSError) as error
:
77 log
.info("Couldn't create dir at %s", path
)
78 raise liberate
.retry(exc
=error
)
81 lib_status
.path
= tarname
83 except IntegrityError
:
87 options
["path"] = path
88 options
["tarname"] = tarname
90 mail_path
= os
.path
.join(path
, 'emails')
92 mailbox
.Maildir(mail_path
, factory
=None)
94 inbox_tasks
= [liberate_inbox
.s(mail_path
, inbox
.id) for inbox
in
95 Inbox
.objects
.filter(user
=user
, deleted
=False).only('id').iterator()]
96 if len(inbox_tasks
) > 0:
99 liberate_collect_emails
.s(mail_path
, options
)
102 options
["noEmails"] = True
103 data
= {"results": []}
105 liberate_fetch_info
.s(data
, options
),
106 liberate_tarball
.s(options
),
107 liberation_finish
.s(options
)
110 async_result
= tasks
.apply_async()
112 lib_status
.async_result
= async_result
.id
116 @app.task(rate_limit
='100/m')
117 def liberate_inbox(mail_path
, inbox_id
):
118 """ Gather email IDs """
119 inbox
= Inbox
.objects
.get(id=inbox_id
, deleted
=False)
120 maildir
= mailbox
.Maildir(mail_path
, factory
=None)
121 maildir
.add_folder(str(inbox
))
124 'folder': str(inbox
),
125 'ids': [email
.id for email
in Email
.objects
.filter(inbox
=inbox
, deleted
=False).only('id')
131 def liberate_collect_emails(results
, mail_path
, options
):
132 """ Send off data mining tasks """
134 results
= results
or []
135 for result
in results
:
136 inbox
= [(mail_path
, result
['folder'], email_id
) for email_id
in result
['ids']]
137 msg_tasks
.extend(inbox
)
139 task_len
= len(msg_tasks
)
142 msg_tasks
= liberate_message
.chunks(msg_tasks
, 100).group()
143 task_group_skew(msg_tasks
, step
=10)
146 liberate_convert_box
.s(mail_path
, options
),
147 liberate_fetch_info
.s(options
),
148 liberate_tarball
.s(options
),
149 liberation_finish
.s(options
)
152 options
["noEmails"] = True
153 data
= {"results": []}
155 liberate_convert_box
.s(data
, mail_path
, options
),
156 liberate_fetch_info
.s(options
),
157 liberate_tarball
.s(options
),
158 liberation_finish
.s(options
)
161 async_result
= msg_tasks
.apply_async()
163 lib_status
= get_user_model().objects
.get(id=options
["user"]).liberation
164 lib_status
.async_result
= async_result
.id
168 @app.task(rate_limit
='1000/m')
169 @transaction.atomic()
170 def liberate_message(mail_path
, inbox
, email_id
):
171 """ Take email from database and put on filesystem """
172 maildir
= mailbox
.Maildir(mail_path
, factory
=None).get_folder(inbox
)
175 msg
= Email
.objects
.get(id=email_id
, deleted
=False)
176 msg
= utils
.make_message(msg
)
177 maildir
.add(msg
.as_bytes())
178 except Exception as exc
:
179 msg_id
= hex(int(email_id
))[2:]
180 log
.warning("Exception processing %s", msg_id
, exc_info
=exc
)
185 def liberate_convert_box(result
, mail_path
, options
):
186 """ Convert maildir to mbox if needed """
187 if options
['storage_type'] == '1':
188 maildir
= mailbox
.Maildir(mail_path
, factory
=None)
189 mbox
= mailbox
.mbox(mail_path
+ '.mbox')
192 for inbox
in maildir
.list_folders():
193 folder
= maildir
.get_folder(inbox
)
195 for key
in folder
.keys():
196 msg
= folder
.pop(key
)
198 maildir
.remove_folder(inbox
)
207 def liberate_fetch_info(result
, options
):
208 """Fetch user info and dump json to files"""
209 inbox_json
= liberate_inbox_metadata(options
['user'])
210 profile_json
= liberate_user_profile(options
['user'], result
or [])
211 questions_json
= liberate_user_questions(options
['user'])
213 with
open(os
.path
.join(options
["path"], "profile.json"), "w") as profile
:
214 profile
.write(profile_json
)
215 with
open(os
.path
.join(options
["path"], "inbox.json"), "w") as inbox
:
216 inbox
.write(inbox_json
)
217 with
open(os
.path
.join(options
["path"], "questions.json"), "w") as questions
:
218 questions
.write(questions_json
)
221 @app.task(default_retry_delay
=600)
222 def liberate_tarball(result
, options
):
223 """ Tar up and delete the dir """
225 tar_type
= TAR_TYPES
[options
.get('compression_type', '0')]
226 tar_name
= os
.path
.join(settings
.SENDFILE_ROOT
, options
["tarname"])
229 tar
= tarfile
.open(tar_name
, tar_type
['writer'])
230 except (IOError, OSError) as error
:
231 log
.debug("Couldn't open tarfile at %s", tar_name
)
232 raise liberate_tarball
.retry(exc
=error
)
234 user
= get_user_model().objects
.get(id=options
['user'])
235 lib_status
= user
.liberation
237 date
= str(lib_status
.started
)
238 dir_name
= "inboxen-%s" % date
241 # directories are added recursively by default
242 tar
.add("%s/" % options
["path"], dir_name
)
245 rmtree(options
["path"])
250 @app.task(ignore_result
=True)
251 @transaction.atomic()
252 def liberation_finish(result
, options
):
253 """ Create email to send to user """
254 user
= get_user_model().objects
.get(id=options
['user'])
255 lib_status
= user
.liberation
256 lib_status
.running
= False
257 lib_status
.last_finished
= timezone
.now()
258 lib_status
.content_type
= int(options
.get('compression_type', '0'))
262 message
= _("Your request for your personal data has been completed. Click "
263 "<a class=\"alert-link\" href=\"%s\">here</a> to download your archive.")
264 message_user(user
, safestring
.mark_safe(message
% urls
.reverse("user-liberate-get")))
266 log
.info("Finished liberation for %s", options
['user'])
268 # run a garbage collection on all workers - liberation is leaky
269 tasks
.force_garbage_collection
.delay()
272 def liberate_user_profile(user_id
, email_results
):
273 """User profile data"""
277 user
= get_user_model().objects
.get(id=user_id
)
280 profile
= user
.inboxenprofile
281 data
["preferences"]["prefer_html_email"] = profile
.prefer_html_email
282 data
["preferences"]["prefered_domain"] = str(profile
.prefered_domain
) if profile
.prefered_domain
else None
283 data
["preferences"]["display_images"] = profile
.display_images
287 data
["username"] = user
.username
288 data
["is_active"] = user
.is_active
289 data
["join_date"] = user
.date_joined
.isoformat()
290 data
["groups"] = [str(groups
) for groups
in user
.groups
.all()]
293 email_results
= email_results
or []
294 for result
in email_results
:
296 data
["errors"].append(str(result
))
298 return json
.dumps(data
)
301 def liberate_user_questions(user_id
):
302 """ Creates a json object of all the user's questions """
303 response_prefetch
= Prefetch("response_set", queryset
=Response
.objects
.all().select_related("author"))
304 questions
= Question
.objects
.filter(author_id
=user_id
).prefetch_related(response_prefetch
)
305 username
= get_user_model().objects
.get(id=user_id
).username
308 for question
in questions
:
310 "subject": question
.subject
,
312 "date": question
.date
.isoformat(),
313 "last_modified": question
.last_modified
.isoformat(),
314 "status": question
.get_status_display(),
315 "body": question
.body
,
319 for response
in response_prefetch
.queryset
.filter(question
=question
):
320 item
["responses"].append({
321 "author": response
.author
.username
if response
.author
else None,
322 "date": response
.date
.isoformat(),
323 "body": response
.body
328 return json
.dumps(data
)
331 def liberate_inbox_metadata(user_id
):
332 """ Grab metadata from inboxes """
335 inboxes
= Inbox
.objects
.filter(user__id
=user_id
)
336 for inbox
in inboxes
:
338 "created": inbox
.created
.isoformat(),
340 "deleted": inbox
.deleted
,
342 "exclude_from_unified": inbox
.exclude_from_unified
,
343 "disabled": inbox
.disabled
,
344 "pinned": inbox
.pinned
,
346 "description": inbox
.description
,
349 return json
.dumps(data
)