1 -- Copyright (C) 2016-2018 Kim Alvefur
3 -- This project is MIT/X11 licensed. Please see the
4 -- COPYING file in the source package for more information.
9 local jid
= require
"util.jid";
10 local st
= require
"util.stanza";
11 local dt
= require
"util.datetime";
12 local filters
= require
"util.filters";
14 local queue_size
= module
:get_option_number("csi_queue_size", 256);
16 module
:hook("csi-is-stanza-important", function (event
)
17 local stanza
= event
.stanza
;
18 if not st
.is_stanza(stanza
) then
21 local st_name
= stanza
.name
;
22 if not st_name
then return false; end
23 local st_type
= stanza
.attr
.type;
24 if st_name
== "presence" then
25 if st_type
== nil or st_type
== "unavailable" then
29 elseif st_name
== "message" then
30 if st_type
== "headline" then
33 if stanza
:get_child("sent", "urn:xmpp:carbons:2") then
36 local forwarded
= stanza
:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message");
40 if stanza
:get_child("body") then
43 if stanza
:get_child("subject") then
46 if stanza
:get_child("encryption", "urn:xmpp:eme:0") then
54 local function with_timestamp(stanza
, from
)
55 if st
.is_stanza(stanza
) and stanza
.attr
.xmlns
== nil and stanza
.name
~= "iq" then
56 stanza
= st
.clone(stanza
);
57 stanza
:add_direct_child(st
.stanza("delay", {xmlns
= "urn:xmpp:delay", from
= from
, stamp
= dt
.datetime()}));
62 local function manage_buffer(stanza
, session
)
63 local ctr
= session
.csi_counter
or 0;
64 if ctr
>= queue_size
then
65 session
.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session
.csi_counter
);
66 session
.conn
:resume_writes();
67 elseif module
:fire_event("csi-is-stanza-important", { stanza
= stanza
, session
= session
}) then
68 session
.log("debug", "Important stanza, flushing buffer (queue size is %d)", session
.csi_counter
);
69 session
.conn
:resume_writes();
71 stanza
= with_timestamp(stanza
, jid
.join(session
.username
, session
.host
))
73 session
.csi_counter
= ctr
+ 1;
77 local function flush_buffer(data
, session
)
78 session
.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session
.csi_counter
);
79 session
.conn
:resume_writes();
83 function enable_optimizations(session
)
84 if session
.conn
and session
.conn
and session
.conn
.pause_writes
then
85 session
.conn
:pause_writes();
86 filters
.add_filter(session
, "stanzas/out", manage_buffer
);
87 filters
.add_filter(session
, "bytes/in", flush_buffer
);
89 session
.log("warn", "Session connection does not support write pausing");
93 function disable_optimizations(session
)
94 if session
.conn
and session
.conn
and session
.conn
.resume_writes
then
95 filters
.remove_filter(session
, "stanzas/out", manage_buffer
);
96 filters
.remove_filter(session
, "bytes/in", flush_buffer
);
97 session
.conn
:resume_writes();
101 module
:hook("csi-client-inactive", function (event
)
102 local session
= event
.origin
;
103 enable_optimizations(session
);
106 module
:hook("csi-client-active", function (event
)
107 local session
= event
.origin
;
108 disable_optimizations(session
);
111 module
:hook("pre-resource-unbind", function (event
)
112 local session
= event
.session
;
113 disable_optimizations(session
);
116 module
:hook("c2s-ondrain", function (event
)
117 local session
= event
.session
;
118 if session
.state
== "inactive" and session
.conn
and session
.conn
and session
.conn
.pause_writes
then
119 session
.conn
:pause_writes();
120 session
.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session
.csi_counter
);
121 session
.csi_counter
= 0;
125 function module
.load()
126 for _
, user_session
in pairs(prosody
.hosts
[module
.host
].sessions
) do
127 for _
, session
in pairs(user_session
.sessions
) do
128 if session
.state
== "inactive" then
129 enable_optimizations(session
);
135 function module
.unload()
136 for _
, user_session
in pairs(prosody
.hosts
[module
.host
].sessions
) do
137 for _
, session
in pairs(user_session
.sessions
) do
138 if session
.state
== "inactive" then
139 disable_optimizations(session
);