-- Copyright (C) 2008-2017 Matthew Wild
-- Copyright (C) 2008-2017 Waqas Hussain
-- Copyright (C) 2011-2017 Kim Alvefur
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
-- XEP-0313: Message Archive Management for Prosody
-- XML namespaces used throughout this module.
local xmlns_mam     = "urn:xmpp:mam:2";      -- archive queries, <prefs/>, <result/> and <fin/>
local xmlns_delay   = "urn:xmpp:delay";      -- <delay stamp=…/> attached to each archived item
local xmlns_forward = "urn:xmpp:forward:0";  -- <forwarded/> wrapper around each archived item
local xmlns_st_id   = "urn:xmpp:sid:0";      -- <stanza-id/> added to archived messages
-- Prosody core / utility modules.
local um = require "core.usermanager";
local st = require "util.stanza";
local rsm = require "util.rsm";
-- Preference storage and its XML (de)serialisation, loaded through module:require.
local get_prefs = module:require"mamprefs".get;
local set_prefs = module:require"mamprefs".set;
local prefs_to_stanza = module:require"mamprefsxml".tostanza;
local prefs_from_stanza = module:require"mamprefsxml".fromstanza;
-- JID helpers.
local jid_bare = require "util.jid".bare;
local jid_split = require "util.jid".split;
local jid_prepped_split = require "util.jid".prepped_split;
local dataform = require "util.dataforms".new;
-- Host this module instance is loaded on.
local host = module.host;

local rm_load_roster = require "core.rostermanager".load_roster;

-- Local aliases for frequently used functions.
local is_stanza = st.is_stanza;
local tostring = tostring;
local time_now = os.time;
local m_min = math.min;
-- util.datetime: "datetime" -> timestamp, "parse" -> timestamp_parse, "date" -> datestamp.
local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date");
-- Page size used when a query carries no RSM <max/>, and the upper bound
-- ("max_archive_query_results", default 50) applied to any requested size.
local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
-- Namespaces whose child elements are not archived ("dont_archive_namespaces").
local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" });

-- Archive store: its name is configurable through "archive_store".
local archive_store = module:get_option_string("archive_store", "archive");
local archive = module:open_store(archive_store, "archive");

-- Expiry settings. cleanup_after is read as a string here (default "1w") and is
-- converted to a number of seconds further down when expiry is enabled.
local cleanup_after = module:get_option_string("archive_expires_after", "1w");
-- Interval between cleanup runs, in seconds (default 4 hours).
local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
-- Per-user item limit: the store's advertised quota if it has one, else 1000.
local archive_item_limit = module:get_option_number("storage_archive_item_limit", archive.caps and archive.caps.quota or 1000);
-- Refuse to load on a storage driver that cannot be queried.
if not archive.find then
	error("mod_"..(archive._provided_by or archive.name and "storage_"..archive.name).." does not support archiving\n"
		.."See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information");
end
-- Whether queries ask the store for a total item count ("mam_include_total").
-- Re-read with a different default further down when expiry is disabled.
local use_total = module:get_option_boolean("mam_include_total", true);
-- Placeholder cleanup scheduler: accepts any arguments, does nothing and
-- returns nothing. Deliberately not `local`: the expiry section further down
-- redefines the same name with a real implementation when expiry is enabled.
function schedule_cleanup()
	-- replaced by non-noop later if cleanup is enabled
end
-- Handler for IQ stanzas carrying <prefs/> in the MAM namespace
-- (event "iq/self/<xmlns_mam>:prefs").
-- For type="set" the submitted <prefs/> child is parsed with
-- prefs_from_stanza() and stored with set_prefs(); afterwards the current
-- preferences (get_prefs(user, true)) are serialised and attached to a reply
-- built from the request.
-- NOTE(review): the listing numbers embedded in this copy jump (63->65,
-- 65->69, 70->75): the guard around the error reply, the sending of `reply`,
-- the handler's return value and the closing `end`s are not visible here.
-- Restore them from the upstream file before relying on this block.
57 module
:hook("iq/self/"..xmlns_mam
..":prefs", function(event
)
58 local origin
, stanza
= event
.origin
, event
.stanza
;
59 local user
= origin
.username
;
-- Only a "set" request changes the stored preferences.
60 if stanza
.attr
.type == "set" then
61 local new_prefs
= stanza
:get_child("prefs", xmlns_mam
);
62 local prefs
= prefs_from_stanza(new_prefs
);
63 local ok
, err
= set_prefs(user
, prefs
);
-- Error reply for a failed store; presumably guarded by `if not ok then`
-- (guard line missing from this copy -- confirm).
65 origin
.send(st
.error_reply(stanza
, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err
)));
-- Reply with the preferences as currently stored.
69 local prefs
= prefs_to_stanza(get_prefs(user
, true));
70 local reply
= st
.reply(stanza
):add_child(prefs
);
-- Data form describing the supported archive query fields. It is used both to
-- serve the empty form to clients and to parse the form submitted with a query.
local query_form = dataform {
	{ name = "FORM_TYPE"; type = "hidden"; value = xmlns_mam; };
	{ name = "with"; type = "jid-single"; };   -- conversation partner
	{ name = "start"; type = "text-single" };  -- start of the time range, as a timestamp string
	{ name = "end"; type = "text-single"; };   -- end of the time range, as a timestamp string
};
-- Handler for "iq-get/self/<xmlns_mam>:query": replies to the request with the
-- query form (query_form:form()) wrapped in a <query/> element in the MAM
-- namespace.
-- NOTE(review): listing lines 87-89 are missing from this copy, so the
-- handler's return value and its closing `end);` are not visible.
83 module
:hook("iq-get/self/"..xmlns_mam
..":query", function(event
)
84 local origin
, stanza
= event
.origin
, event
.stanza
;
-- Result is discarded; presumably called for a side effect of get_prefs()
-- with its second argument set -- confirm in mamprefs.
85 get_prefs(origin
.username
, true);
86 origin
.send(st
.reply(stanza
):query(xmlns_mam
):add_child(query_form
:form()));
-- Handler for "iq-set/self/<xmlns_mam>:query": runs an archive query for the
-- requesting user and streams the matches back.
-- Visible steps:
--   1. read the optional data form (with / start / end) and validate it;
--   2. read the RSM paging parameters and clamp the page size;
--   3. call archive:find() for origin.username;
--   4. wrap every item as <message><result><forwarded><delay/>item and send it;
--   5. finish with an IQ reply holding <fin/> plus an RSM set.
-- NOTE(review): many listing lines are missing from this copy (the embedded
-- numbers jump, e.g. 100->103, 105->108, 145->149, 167->170, 190->197).
-- Guards, `return`s, `end`s and several assignments (results, first/last,
-- count, the code that changes `complete`) are therefore not visible; the
-- comments below describe only what is.
90 -- Handle archive queries
91 module
:hook("iq-set/self/"..xmlns_mam
..":query", function(event
)
92 local origin
, stanza
= event
.origin
, event
.stanza
;
-- The query element is taken to be the first child tag of the IQ.
93 local query
= stanza
.tags
[1];
94 local qid
= query
.attr
.queryid
;
-- Result is discarded; presumably called for a side effect -- confirm in mamprefs.
96 get_prefs(origin
.username
, true);
98 -- Search query parameters
99 local qwith
, qstart
, qend
;
100 local form
= query
:get_child("x", "jabber:x:data");
-- Parse the submitted form against query_form (presumably only when a form
-- was supplied; the guard is not visible).
103 form
, err
= query_form
:data(form
);
-- bad-request carrying the first entry of the error table returned by :data().
105 origin
.send(st
.error_reply(stanza
, "modify", "bad-request", select(2, next(err
))));
108 qwith
, qstart
, qend
= form
["with"], form
["start"], form
["end"];
109 qwith
= qwith
and jid_bare(qwith
); -- dataforms does jidprep
112 if qstart
or qend
then -- Validate timestamps
113 local vstart
, vend
= (qstart
and timestamp_parse(qstart
)), (qend
and timestamp_parse(qend
));
-- A supplied timestamp that fails to parse is rejected.
114 if (qstart
and not vstart
) or (qend
and not vend
) then
115 origin
.send(st
.error_reply(stanza
, "modify", "bad-request", "Invalid timestamp"))
-- From here on qstart/qend hold the parsed values.
118 qstart
, qend
= vstart
, vend
;
121 module
:log("debug", "Archive query by %s id=%s with=%s when=%s...%s",
123 qid
or stanza
.attr
.id
,
125 qstart
and timestamp(qstart
) or "",
126 qend
and timestamp(qend
) or "");
-- RSM paging parameters.
129 local qset
= rsm
.get(query
);
-- Page size: the requested max (or default_max_items), capped at max_max_items.
130 local qmax
= m_min(qset
and qset
.max or default_max_items
, max_max_items
);
-- The presence of <before/> selects reverse order.
131 local reverse
= qset
and qset
.before
or false;
132 local before
, after
= qset
and qset
.before
, qset
and qset
.after
;
-- Only a string `before` is passed on as an item id.
133 if type(before
) ~= "string" then before
= nil; end
135 module
:log("debug", "Archive query id=%s rsm=%q", qid
or stanza
.attr
.id
, qset
);
138 -- Load all the data!
139 local data
, err
= archive
:find(origin
.username
, {
140 start
= qstart
; ["end"] = qend
; -- Time range
-- One item more than the page size is requested (unless qmax is 0) so that
-- the loop below can tell whether further pages exist.
142 limit
= qmax
== 0 and 0 or qmax
+ 1;
143 before
= before
; after
= after
;
-- A total count is requested when configured, or when qmax is 0.
145 total
= use_total
or qmax
== 0;
-- Failure path (presumably guarded by `if not data then`; guard not visible).
149 module
:log("debug", "Archive query id=%s failed: %s", qid
or stanza
.attr
.id
, err
);
150 if err
== "item-not-found" then
151 origin
.send(st
.error_reply(stanza
, "modify", "item-not-found"));
153 origin
.send(st
.error_reply(stanza
, "cancel", "internal-server-error"));
-- The second return value of archive:find() is reused as the total count.
157 local total
= tonumber(err
);
-- Result messages are addressed back to the requester.
159 local msg_reply_attr
= { to
= stanza
.attr
.from
, from
= stanza
.attr
.to
};
163 -- Wrap it in stuff and deliver
166 local complete
= "true";
167 for id
, item
, when
in data
do
170 -- We requested qmax+1 items. If that many items are retrieved then
171 -- there are more results to page through, so:
175 local fwd_st
= st
.message(msg_reply_attr
)
176 :tag("result", { xmlns
= xmlns_mam
, queryid
= qid
, id
= id
})
177 :tag("forwarded", { xmlns
= xmlns_forward
})
178 :tag("delay", { xmlns
= xmlns_delay
, stamp
= timestamp(when
) }):up();
-- Items that are not already stanza objects are deserialised first.
180 if not is_stanza(item
) then
181 item
= st
.deserialize(item
);
183 item
.attr
.xmlns
= "jabber:client";
184 fwd_st
:add_child(item
);
186 if not first
then first
= id
; end
-- Buffered instead of sent (presumably only in reverse mode; the
-- surrounding condition is not visible).
190 results
[count
] = fwd_st
;
-- Buffered results are sent back to front and first/last are swapped.
197 for i
= #results
, 1, -1 do
198 origin
.send(results
[i
]);
200 first
, last
= last
, first
;
-- Final IQ result: <fin/> with the completeness flag and the RSM set.
203 origin
.send(st
.reply(stanza
)
204 :tag("fin", { xmlns
= xmlns_mam
, queryid
= qid
, complete
= complete
})
205 :add_child(rsm
.generate
{
206 first
= first
, last
= last
, count
= total
}));
209 module
:log("debug", "Archive query id=%s completed, %d items returned", qid
or stanza
.attr
.id
, complete
and count
or count
- 1);
-- Loads `user`'s roster on this host and logs whether it has an entry for
-- `who`.
-- NOTE(review): listing lines 216-217 are missing from this copy (presumably
-- the return statement and `end`). shall_store() returns this function's
-- result directly, so a truthy value is expected when `who` is in the
-- roster -- confirm against the upstream file.
213 local function has_in_roster(user
, who
)
214 local roster
= rm_load_roster(user
, host
);
215 module
:log("debug", "%s has %s in roster? %s", user
, who
, roster
[who
] and "yes" or "no");
-- Decides from `user`'s archiving preferences whether a message exchanged
-- with `who` should be stored.
-- Visible logic: a non-existent local user is logged; the per-contact rule
-- prefs[who] is looked up; the default rule prefs[false] is read, and when it
-- is "roster" the decision is delegated to has_in_roster().
-- NOTE(review): listing lines 220, 223-224, 228-230 and 236-239 are missing
-- from this copy, so the return values of the individual branches (other
-- than the "roster" case) are not visible.
219 local function shall_store(user
, who
)
221 if not um
.user_exists(user
, host
) then
222 module
:log("debug", "%s@%s does not exist", user
, host
)
225 local prefs
= get_prefs(user
);
-- Per-contact rule, keyed by the contact passed in as `who`.
226 local rule
= prefs
[who
];
227 module
:log("debug", "%s's rule for %s is %s", user
, who
, rule
);
231 -- Below could be done by a metatable
-- The default rule is stored under the key `false`.
232 local default
= prefs
[false];
233 module
:log("debug", "%s's default rule is %s", user
, default
);
234 if default
== "roster" then
235 return has_in_roster(user
, who
);
-- Inspects the <stanza-id/> children (namespace xmlns_st_id) of `stanza` and
-- singles out those whose 'by' attribute is the bare JID `user`@host (no
-- resource part, host equal to this module's host, node equal to `user`).
-- The work is done on a clone, made only when the stanza has such a child.
-- NOTE(review): listing lines 247-256 are missing from this copy, so what the
-- maptags callback returns for matching and non-matching tags, and what the
-- function returns, are not visible. The callers assign the result to
-- event.stanza, so the (possibly cloned) stanza is presumably returned with
-- the matching elements dropped -- confirm against the upstream file.
240 local function strip_stanza_id(stanza
, user
)
241 if stanza
:get_child("stanza-id", xmlns_st_id
) then
-- Clone before modifying.
242 stanza
= st
.clone(stanza
);
243 stanza
:maptags(function (tag)
244 if tag.name
== "stanza-id" and tag.attr
.xmlns
== xmlns_st_id
then
245 local by_user
, by_host
, res
= jid_prepped_split(tag.attr
.by
);
-- Matches only an id attributed to the user's own bare JID on this host.
246 if not res
and by_host
== host
and by_user
== user
then
-- Archives a message stanza if its type, hints and the owner's preferences
-- allow it.
-- Parameters:
--   event: hook event; event.origin and event.stanza are read, and
--          event.stanza is replaced (stripped of forged stanza-ids, later
--          by a clone carrying the new <stanza-id/>).
--   c2s:   truthy when the stanza was sent by a local client; selects whose
--          archive is used and who counts as the conversation partner.
-- Visible steps: pick archive owner and partner -> strip forged stanza-ids ->
-- filter by type -> filter by urn:xmpp:hints -> drop configured namespaces ->
-- consult shall_store() -> archive:append(), retrying after expiring or
-- truncating when the store reports "quota-limit" -> tag the stanza with the
-- archive id, schedule cleanup and fire "archive-message-added".
-- NOTE(review): many listing lines are missing from this copy (the embedded
-- numbers jump, e.g. 275->279, 283->288, 292->298, 329->334, 339->342), so
-- the `return`s, `else`s and `end`s, the results of the maptags callback and
-- the definition of `id` are not visible.
257 local function message_handler(event
, c2s
)
258 local origin
, stanza
= event
.origin
, event
.stanza
;
-- Log through the session's logger for client traffic, else the module's.
259 local log = c2s
and origin
.log or module
._log
;
-- A missing type attribute is treated as "normal".
260 local orig_type
= stanza
.attr
.type or "normal";
261 local orig_from
= stanza
.attr
.from
;
262 local orig_to
= stanza
.attr
.to
or orig_from
;
263 -- Stanza without 'to' are treated as if it was to their own bare jid
265 -- Whos storage do we put it in?
266 local store_user
= c2s
and origin
.username
or jid_split(orig_to
);
267 -- And who are they chatting with?
268 local with
= jid_bare(c2s
and orig_to
or orig_from
);
270 -- Filter out <stanza-id> that claim to be from us
271 event
.stanza
= strip_stanza_id(stanza
, store_user
);
273 -- We store chat messages or normal messages that have a body
274 if not(orig_type
== "chat" or (orig_type
== "normal" and stanza
:get_child("body")) ) then
275 log("debug", "Not archiving stanza: %s (type)", stanza
:top_tag());
279 -- or if hints suggest we shouldn't
280 if not stanza
:get_child("store", "urn:xmpp:hints") then -- No hint telling us we should store
281 if stanza
:get_child("no-permanent-store", "urn:xmpp:hints")
282 or stanza
:get_child("no-store", "urn:xmpp:hints") then -- Hint telling us we should NOT store
283 log("debug", "Not archiving stanza: %s (hint)", stanza
:top_tag());
-- Build the copy that goes into the archive: when namespaces are configured
-- for stripping, work on a clone and filter its child tags.
288 local clone_for_storage
;
289 if not strip_tags
:empty() then
290 clone_for_storage
= st
.clone(stanza
);
291 clone_for_storage
:maptags(function (tag)
292 if strip_tags
:contains(tag.attr
.xmlns
) then
-- Nothing left after filtering: not worth archiving.
298 if #clone_for_storage
.tags
== 0 then
299 log("debug", "Not archiving stanza: %s (empty when stripped)", stanza
:top_tag());
-- Otherwise (presumably the `else` branch of the strip_tags check) the
-- stanza itself is stored.
303 clone_for_storage
= stanza
;
306 -- Check with the users preferences
307 if shall_store(store_user
, with
) then
308 log("debug", "Archiving stanza: %s", stanza
:top_tag());
311 local time
= time_now();
312 local ok
, err
= archive
:append(store_user
, nil, clone_for_storage
, time
, with
);
-- Quota handling: first expire old items (when an expiry age is configured
-- as a number), then fall back to truncating if the store supports it.
313 if not ok
and err
== "quota-limit" then
314 if type(cleanup_after
) == "number" then
315 module
:log("debug", "User '%s' over quota, cleaning archive", store_user
);
316 local cleaned
= archive
:delete(store_user
, {
317 ["end"] = (os
.time() - cleanup_after
);
320 ok
, err
= archive
:append(store_user
, nil, clone_for_storage
, time
, with
);
323 if not ok
and (archive
.caps
and archive
.caps
.truncate
) then
324 module
:log("debug", "User '%s' over quota, truncating archive", store_user
);
325 local truncated
= archive
:delete(store_user
, {
-- Leave room for the one item about to be appended.
326 truncate
= archive_item_limit
- 1;
329 ok
, err
= archive
:append(store_user
, nil, clone_for_storage
, time
, with
);
-- Later handlers get a clone tagged with the archive id.
-- NOTE(review): the definition of `id` is not visible in this copy.
334 local clone_for_other_handlers
= st
.clone(stanza
);
336 clone_for_other_handlers
:tag("stanza-id", { xmlns
= xmlns_st_id
, by
= store_user
.."@"..host
, id
= id
}):up();
337 event
.stanza
= clone_for_other_handlers
;
338 schedule_cleanup(store_user
);
339 module
:fire_event("archive-message-added", { origin
= origin
, stanza
= clone_for_storage
, for_user
= store_user
, id
= id
});
342 log("debug", "Not archiving stanza: %s (prefs)", stanza
:top_tag());
-- Entry point for stanzas sent by local clients: delegates to
-- message_handler() with the c2s flag set, so the archive owner is taken from
-- origin.username and the conversation partner from the stanza's 'to'.
-- Returns whatever message_handler() returns.
local function c2s_message_handler(event)
	return message_handler(event, true);
end
-- Filter out <stanza-id> before the message leaves the server to prevent privacy leak.
-- Replaces event.stanza with the result of strip_stanza_id() for the sending
-- user (event.origin.username). Returns nothing.
local function strip_stanza_id_after_other_events(event)
	event.stanza = strip_stanza_id(event.stanza, event.origin.username);
end
-- Registered at priority -1, i.e. below the priority 0 used for the archiving
-- handlers hooked on the same events further down in this file.
module:hook("pre-message/bare", strip_stanza_id_after_other_events, -1);
module:hook("pre-message/full", strip_stanza_id_after_other_events, -1);
-- Archive expiry. Runs unless "archive_expires_after" is "never".
-- Visible behaviour:
--   * cleanup_after is parsed from "<number>[d|w|m|y]" into seconds;
--   * schedule_cleanup() is redefined to record, per date, which users
--     received new messages (store "archive_cleanup", accessed as a map);
--   * an async runner walks the recorded dates up to the cut-off date,
--     deletes items older than cleanup_after from each affected user's
--     archive and logs the totals;
--   * a timer first fires after 1 second, runs the runner and returns
--     cleanup_interval.
-- When expiry is disabled, "mam_include_total" is re-read with default false.
-- NOTE(review): many listing lines are missing from this copy (the embedded
-- numbers jump, e.g. 360->363, 364->366, 403->406, 409->414, 428->431).
-- The definition of `day`, the collection of `users`, the guards around the
-- error logs, the `else` before "Archive expiry disabled" and all `end`s are
-- not visible.
358 if cleanup_after
~= "never" then
-- The same store is opened twice: once through the default interface and
-- once as a "map".
359 local cleanup_storage
= module
:open_store("archive_cleanup");
360 local cleanup_map
= module
:open_store("archive_cleanup", "map");
-- Unit suffixes expressed in multiples of `day` (definition not visible;
-- presumably seconds per day -- confirm). A month counts as 31 days, a year
-- as 365.2425 days.
363 local multipliers
= { d
= day
, w
= day
* 7, m
= 31 * day
, y
= 365.2425 * day
};
364 local n
, m
= cleanup_after
:lower():match("(%d+)%s*([dwmy]?)");
366 module
:log("error", "Could not parse archive_expires_after string %q", cleanup_after
);
-- From here on cleanup_after is a number; a missing or unknown suffix
-- multiplies by 1.
370 cleanup_after
= tonumber(n
) * ( multipliers
[m
] or 1 );
372 module
:log("debug", "archive_expires_after = %d -- in seconds", cleanup_after
);
374 if not archive
.delete
then
375 module
:log("error", "archive_expires_after set but mod_%s does not support deleting", archive
._provided_by
);
379 -- For each day, store a set of users that have new messages. To expire
380 -- messages, we collect the union of sets of users from dates that fall
381 -- outside the cleanup range.
-- Cache of the last date recorded per user, used to skip repeated writes.
383 local last_date
= require
"util.cache".new(module
:get_option_number("archive_cleanup_date_cache_size", 1000));
-- Replaces the no-op defined near the top of the file.
384 function schedule_cleanup(username
, date)
385 date = date or datestamp();
386 if last_date
:get(username
) == date then return end
387 local ok
= cleanup_map
:set(date, username
, true);
389 last_date
:set(username
, date);
392 local cleanup_time
= module
:measure("cleanup", "times");
394 cleanup_runner
= require
"util.async".runner(function ()
395 local cleanup_done
= cleanup_time();
-- Dates are compared as the strings produced by datestamp().
397 local cut_off
= datestamp(os
.time() - cleanup_after
);
-- The keys of the cleanup store are dates, so users() iterates over dates.
398 for date in cleanup_storage
:users() do
399 if date <= cut_off
then
400 module
:log("debug", "Messages from %q should be expired", date);
401 local messages_this_day
= cleanup_storage
:get(date);
402 if messages_this_day
then
403 for user
in pairs(messages_this_day
) do
406 if date < cut_off
then
407 -- Messages from the same day as the cut-off might not have expired yet,
408 -- but all earlier will have, so clear storage for those days.
409 cleanup_storage
:set(date, nil);
414 local sum
, num_users
= 0, 0;
415 for user
in pairs(users
) do
416 local ok
, err
= archive
:delete(user
, { ["end"] = os
.time() - cleanup_after
; })
418 num_users
= num_users
+ 1;
-- A numeric result from archive:delete() is added to the running total; any
-- other result counts as 0.
419 sum
= sum
+ (tonumber(ok
) or 0);
422 module
:log("info", "Deleted %d expired messages for %d users", sum
, num_users
);
-- First run after 1 second; the callback returns cleanup_interval.
426 cleanup_task
= module
:add_timer(1, function ()
427 cleanup_runner
:run(true);
428 return cleanup_interval
;
431 module
:log("debug", "Archive expiry disabled");
432 -- Don't ask the backend to count the potentially unbounded number of items,
434 use_total
= module
:get_option_boolean("mam_include_total", false);
-- Stanzas sent by local clients
module:hook("pre-message/bare", c2s_message_handler, 0);
module:hook("pre-message/full", c2s_message_handler, 0);
-- Stanzas to local clients
module:hook("message/bare", message_handler, 0);
module:hook("message/full", message_handler, 0);
-- Advertise the MAM and stanza-id features in account disco#info results.
-- The <feature/> elements are added to event.reply when present, otherwise
-- to event.stanza.
module:hook("account-disco-info", function(event)
	(event.reply or event.stanza):tag("feature", {var=xmlns_mam}):up();
	(event.reply or event.stanza):tag("feature", {var=xmlns_st_id}):up();
end);