# Copyright (C) 2013, 2014, 2015, 2016, 2017 Jessica Tallon & Matt Molyneaux
#
# This file is part of Inboxen.
#
# Inboxen is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Inboxen is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Inboxen. If not, see <http://www.gnu.org/licenses/>.
import gc
import logging
from datetime import timedelta
from importlib import import_module

from celery import chain
from django.apps import apps
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import IntegrityError, transaction
from django.db.models import Avg, Case, Count, F, Max, Min, StdDev, Sum, When
from django.db.models.functions import Coalesce
from django.utils import timezone

from inboxen import models
from inboxen.celery import app
from inboxen.utils.tasks import chunk_queryset, create_queryset, task_group_skew
# Module-level logger shared by every task in this file.
log = logging.getLogger(__name__)
@app.task(ignore_result=True)
def statistics():
    """Gather statistics about users and their inboxes"""
    try:
        last_stat = models.Statistic.objects.latest("date")
    except models.Statistic.DoesNotExist:
        # first ever run: there is no previous statistic to diff against
        last_stat = None

    # the keys of these dictionaries have awful names for historical reasons
    # don't change them unless you want to do a data migration
    one_day_ago = timezone.now() - timedelta(days=1)
    user_aggregate = {
        "count": Count("id", distinct=True),
        "new": Coalesce(Count(
            Case(
                When(date_joined__gte=one_day_ago, then=F("id")),
            ),
            distinct=True,
        ), 0),
        "oldest_user_joined": Min("date_joined"),
        "with_inboxes": Coalesce(Count(
            Case(
                When(inbox__isnull=False, then=F("id")),
            ),
            distinct=True,
        ), 0),
        "active": Coalesce(Count(
            Case(
                When(inboxenprofile__receiving_emails=True, then=F("id")),
            ),
            distinct=True,
        ), 0),
    }

    inbox_aggregate = {
        "inbox_count__avg": Coalesce(Avg("inbox_count"), 0.0),
        "inbox_count__max": Coalesce(Max("inbox_count"), 0),
        "inbox_count__min": Coalesce(Min("inbox_count"), 0),
        "inbox_count__stddev": Coalesce(StdDev("inbox_count"), 0.0),
        "inbox_count__sum": Coalesce(Sum("inbox_count"), 0),
    }

    email_aggregate = {
        "email_count__avg": Coalesce(Avg("email_count"), 0.0),
        "email_count__max": Coalesce(Max("email_count"), 0),
        "email_count__min": Coalesce(Min("email_count"), 0),
        "email_count__stddev": Coalesce(StdDev("email_count"), 0.0),
        "email_count__sum": Coalesce(Sum("email_count"), 0),
    }

    # collect user and inbox stats
    users = get_user_model().objects.aggregate(**user_aggregate)
    inboxes = get_user_model().objects.annotate(inbox_count=Count("inbox__id")).aggregate(**inbox_aggregate)

    domain_count = models.Domain.objects.available(None).count()
    inboxes_possible = len(settings.INBOX_CHOICES) ** settings.INBOX_LENGTH

    inboxes["total_possible"] = inboxes_possible * domain_count

    # collect email state
    inbox_qs = models.Inbox.objects.exclude(deleted=True).annotate(email_count=Count("email__id"))
    emails = inbox_qs.aggregate(**email_aggregate)

    inboxes["with_emails"] = inbox_qs.exclude(email_count=0).count()
    inboxes["disowned"] = models.Inbox.objects.filter(user__isnull=True).count()
    inboxes["total"] = models.Inbox.objects.count()
    emails["emails_read"] = models.Email.objects.filter(read=True).count()

    if last_stat is not None:
        # running_total only ever grows: negative diffs (mass deletions) are
        # clamped to zero so the lifetime counter never goes backwards
        email_diff = (emails["email_count__sum"] or 0) - (last_stat.emails["email_count__sum"] or 0)
        emails["running_total"] = last_stat.emails["running_total"] + max(email_diff, 0)
    else:
        emails["running_total"] = emails["email_count__sum"] or 0

    # NOTE(review): field names reconstructed from the aggregate dicts above --
    # confirm against models.Statistic before relying on this
    stat = models.Statistic(
        users=users,
        emails=emails,
        inboxes=inboxes,
    )
    stat.save()

    log.info("Saved statistics (%s)", stat.date)
@app.task(ignore_result=True)
def clean_expired_session():
    """Clear expired sessions"""
    engine = import_module(settings.SESSION_ENGINE)

    try:
        engine.SessionStore.clear_expired()
    except NotImplementedError:
        # some session backends (e.g. cache-based) have no cleanup to do
        log.info("%s does not implement clear_expired", settings.SESSION_ENGINE)
@app.task(ignore_result=True)
@transaction.atomic()
def inbox_new_flag(user_id, inbox_id=None):
    """Clear the "new" flag for a user's unified inbox, or a single inbox.

    With only user_id, clears the unified-inbox flag on the user's profile;
    with inbox_id too, clears the flag on that specific inbox. The flag is
    only cleared once every email on the first page has been seen.
    """
    emails = models.Email.objects.order_by("-received_date").viewable(user_id)
    emails = emails.filter(inbox__exclude_from_unified=False)
    if inbox_id is not None:
        emails = emails.filter(inbox__id=inbox_id)

    # only consider the first page of emails - the same slice the UI shows
    emails = emails.values_list("id", flat=True)[:settings.INBOX_PAGE_SIZE]
    email_count = models.Email.objects.filter(id__in=emails, seen=False).count()

    if email_count > 0:
        # if some emails haven't been seen yet, we have nothing else to do
        return
    elif inbox_id is None:
        profile = models.UserProfile.objects.get_or_create(user_id=user_id)[0]
        profile.unified_has_new_messages = False
        profile.save(update_fields=["unified_has_new_messages"])
    else:
        inbox = models.Inbox.objects.get(user__id=user_id, id=inbox_id)
        inbox.new = False
        inbox.save(update_fields=["new"])
@app.task(ignore_result=True)
def set_emails_to_seen(email_id_list, user_id, inbox_id=None):
    """Set seen flags on a list of email IDs and then send off tasks to update
    "new" flags on affected Inbox objects
    """
    # viewable() scopes the update to emails this user is allowed to touch
    models.Email.objects.viewable(user_id).filter(id__in=email_id_list).update(seen=True)

    kwargs = {"email__id__in": email_id_list, "user_id": user_id}
    if inbox_id is not None:
        kwargs["id"] = inbox_id

    batch_set_new_flags.delay(user_id=user_id, kwargs=kwargs)
@app.task(ignore_result=True)
def batch_set_new_flags(user_id=None, args=None, kwargs=None, batch_number=500):
    """Fan out inbox_new_flag tasks over every inbox matched by args/kwargs.

    * user_id: if given, a single unified-inbox update is queued for that user
      instead of one per affected user
    * args/kwargs are queryset filters passed to create_queryset("inbox", ...)
    * batch_number is the chunk size for the generated task groups
    """
    inbox_list = create_queryset("inbox", args=args, kwargs=kwargs).distinct().values_list("pk", "user_id")

    # NOTE(review): initialisers reconstructed - inboxes holds (user, inbox)
    # argument tuples for inbox_new_flag.chunks(); users is assumed to hold
    # (user,) 1-tuples for the same API. Confirm against project history.
    inboxes = []
    users = set()
    for inbox, user in inbox_list.iterator():
        inboxes.append((user, inbox))
        users.add((user,))

    if inboxes:
        inbox_tasks = inbox_new_flag.chunks(inboxes, batch_number).group()
        # skew start times so the whole group doesn't hit the DB at once
        task_group_skew(inbox_tasks, step=batch_number / 10.0)
        inbox_tasks.apply_async()

    if user_id is None and users:
        user_tasks = inbox_new_flag.chunks(list(users), batch_number).group()
        task_group_skew(user_tasks, step=batch_number / 10.0)
        user_tasks.apply_async()
    elif user_id is not None:
        inbox_new_flag.delay(user_id)
@app.task(ignore_result=True)
def force_garbage_collection():
    """Call the garbage collector.

    This task expects to be sent to a broadcast queue
    """
    collected = gc.collect()

    log.info("GC collected {0} objects.".format(collected))
@app.task(rate_limit=500)
def delete_inboxen_item(model, item_pk):
    """Delete a single object from the inboxen app by model name and pk.

    Missing rows and FK races are expected (these tasks run in bulk and
    out of order), so DoesNotExist/IntegrityError are deliberately ignored.
    """
    _model = apps.get_app_config("inboxen").get_model(model)

    try:
        with transaction.atomic():
            item = _model.objects.only('pk').get(pk=item_pk)
            item.delete()
    except (IntegrityError, _model.DoesNotExist):
        # already gone or still referenced - either way, nothing to do
        pass
# NOTE(review): the @app.task decorator line was lost in transit; restored
# because siblings call this via .si(), which only exists on Celery tasks.
# Confirm the original rate_limit value against project history.
@app.task(rate_limit="1/m")
@transaction.atomic()
def batch_mark_as_deleted(model, app="inboxen", args=None, kwargs=None, skip_items=None, limit_items=None):
    """Marks emails as deleted, but don't actually delete them"""
    # NB: the `app` parameter shadows the module-level celery `app`, but the
    # decorators above bind at import time so this is harmless
    items = create_queryset(model, args=args, kwargs=kwargs, skip_items=skip_items, limit_items=limit_items)
    # cannot slice and update at the same time, so we subquery
    items.model.objects.filter(pk__in=items).update(deleted=True)
@app.task(rate_limit="1/m")
@transaction.atomic()
def batch_delete_items(model, args=None, kwargs=None, skip_items=None,
                       limit_items=None, batch_number=500, chunk_size=10000, delay=20):
    """If something goes wrong and you've got a lot of orphaned entries in the
    database, then this is the task you want.

    Be aware: this task pulls a list of PKs from the database which may cause
    increased memory use in the short term.

    * args and kwargs should be obvious
    * batch_number is the number of delete tasks that get sent off in one go
    * chunk_size is the number of PKs that are loaded into memory at once
    * delay is the number of seconds between each batch of delete tasks
    """
    items = create_queryset(model, args=args, kwargs=kwargs, skip_items=skip_items, limit_items=limit_items)
    for idx, chunk in chunk_queryset(items, chunk_size):
        # distinct local name: rebinding `items` here would clobber the
        # queryset mid-iteration
        delete_tasks = delete_inboxen_item.chunks([(model, i) for i in chunk], batch_number).group()
        # stagger each batch so delete tasks don't stampede the database
        task_group_skew(delete_tasks, start=(idx + 1) * delay, step=delay)
        delete_tasks.apply_async()
@app.task(rate_limit="1/h")
def clean_orphan_models():
    """Queue deletion of rows left behind when their parents were removed."""
    # Body objects with no remaining part list
    batch_delete_items.delay("body", kwargs={"partlist__isnull": True})

    # HeaderName objects with no remaining headers
    batch_delete_items.delay("headername", kwargs={"header__isnull": True})

    # HeaderData objects with no remaining headers
    batch_delete_items.delay("headerdata", kwargs={"header__isnull": True})
@app.task(rate_limit="1/h")
def auto_delete_emails():
    """Delete old mail for users who opted into auto-deletion.

    Mark-then-delete runs as a chain so the "new" flags are refreshed
    before the actual deletion begins.
    """
    chain(
        batch_mark_as_deleted.si("email", kwargs={
            "inbox__user__inboxenprofile__auto_delete": True,
            "received_date__lt": timezone.now() - timedelta(days=settings.INBOX_AUTO_DELETE_TIME),
        }),
        batch_set_new_flags.si(kwargs={"email__deleted": True}),
        batch_delete_items.si("email", kwargs={"deleted": True}),
    ).apply_async()
@app.task(rate_limit="1/h")
def calculate_quota(batch_number=500, chunk_size=10000, delay=20):
    """Recalculate email quota usage for every user, in staggered batches.

    * batch_number is the chunk size for each task group
    * chunk_size is the number of user PKs loaded into memory at once
    * delay is the number of seconds between each batch of tasks
    """
    # feature is off - nothing to calculate
    if not settings.PER_USER_EMAIL_QUOTA:
        return

    users = get_user_model().objects.all()

    for idx, chunk in chunk_queryset(users, chunk_size):
        user_tasks = calculate_user_quota.chunks([(i,) for i in chunk], batch_number).group()
        # stagger batches so quota tasks don't all hit the database at once
        task_group_skew(user_tasks, start=(idx + 1) * delay, step=delay)
        user_tasks.apply_async()
# NOTE(review): decorator line lost in transit; restored because this function
# is dispatched via .chunks()/.si() above, which requires a Celery task.
@app.task(ignore_result=True)
def calculate_user_quota(user_id, enable_delete=True):
    """Update one user's quota usage and optionally delete mail over quota.

    * user_id: primary key of the user to recalculate
    * enable_delete: when False, only the usage percentage is updated; the
      recursive re-check at the end passes False to avoid looping forever
    """
    if not settings.PER_USER_EMAIL_QUOTA:
        return

    try:
        profile = get_user_model().objects.get(id=user_id).inboxenprofile
    except get_user_model().DoesNotExist:
        # user was deleted between queueing and execution
        return

    # 90% of the value of per user email quota. Users who have opted to delete
    # old mail when they reach quota limit need space for new mail to come in
    quota_ninety_percent_value = (settings.PER_USER_EMAIL_QUOTA // 10) * 9

    email_count = models.Email.objects.filter(inbox__user_id=user_id).count()

    profile.quota_percent_usage = min((email_count * 100) / settings.PER_USER_EMAIL_QUOTA, 100)
    profile.save(update_fields=["quota_percent_usage"])

    if enable_delete and profile.quota_options == profile.DELETE_MAIL and email_count > quota_ninety_percent_value:
        # trim unimportant mail down to the 90% mark, refresh "new" flags,
        # delete the marked mail, then re-check usage (without deleting again)
        chain(
            batch_mark_as_deleted.si("email", kwargs={"important": False, "inbox__user_id": user_id},
                                     skip_items=quota_ninety_percent_value),
            batch_set_new_flags.si(user_id=user_id, kwargs={"email__deleted": True}),
            batch_delete_items.si("email", kwargs={"deleted": True, "inbox__user_id": user_id}),
            calculate_user_quota.si(user_id, False),
        ).apply_async()
, False),