2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
5 -- This project is MIT/X11 licensed. Please see the
6 -- COPYING file in the source package for more information.
11 local new_xmpp_stream
= require
"util.xmppstream".new
;
12 local sm
= require
"core.sessionmanager";
13 local sm_destroy_session
= sm
.destroy_session
;
14 local new_uuid
= require
"util.uuid".generate
;
15 local core_process_stanza
= prosody
.core_process_stanza
;
16 local st
= require
"util.stanza";
17 local logger
= require
"util.logger";
18 local log = module
._log
;
19 local initialize_filters
= require
"util.filters".initialize
;
20 local math_min
= math
.min;
21 local tostring, type = tostring, type;
22 local traceback
= debug
.traceback
;
23 local runner
= require
"util.async".runner
;
24 local nameprep
= require
"util.encodings".stringprep
.nameprep
;
25 local cache
= require
"util.cache";
-- XML namespaces used when building and parsing BOSH payloads.
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
-- (session.send spells this namespace out again as a string literal)
local xmlns_bosh = "http://jabber.org/protocol/httpbind";

-- Parser settings handed to new_xmpp_stream() together with each request's
-- context; the stream callbacks themselves are attached to this table below.
local stream_callbacks = {
	stream_ns = xmlns_bosh;
	stream_tag = "body";
	default_ns = "jabber:client";
};
34 -- These constants are implicitly assumed within the code, and cannot be changed
local BOSH_MAX_REQUESTS = 2;

-- Seconds a BOSH session may remain open with no requests
local bosh_max_inactivity = module:get_option_number("bosh_max_inactivity", 60);
-- Minimum number of seconds between requests that carry no payload
local bosh_max_polling = module:get_option_number("bosh_max_polling", 5);
-- Longest time, in seconds, the server holds a request before replying
-- (a connecting client may choose a lower value)
local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);

local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
local cross_domain = module:get_option("cross_domain_bosh");
-- Log an info-level notice at load time when the 'cross_domain_bosh' option
-- has been given any value.
-- NOTE(review): the closing `end` of this `if` is not visible in this view.
49 if cross_domain
~= nil then
50 module
:log("info", "The 'cross_domain_bosh' option has been deprecated");
-- Local aliases for the table library functions used throughout this module.
local t_insert = table.insert;
local t_remove = table.remove;
local t_concat = table.concat;
-- Table of BOSH sessions keyed by sid, obtained from module:shared()
local sessions = module:shared("sessions");

-- Statistics handles: two "amount" gauges and four "rate" counters
local measure_active = module:measure("active_sessions", "amount");
local measure_inactive = module:measure("inactive_sessions", "amount");
local report_bad_host = module:measure("bad_host", "rate");
local report_bad_sid = module:measure("bad_sid", "rate");
local report_new_sid = module:measure("new_sid", "rate");
local report_timeout = module:measure("timeout", "rate");
-- On every "stats-update" event, walk the shared session table and publish
-- the resulting counts through measure_active() and measure_inactive().
-- NOTE(review): the declarations of `active`/`inactive`, the statement that
-- increments `active`, and the else/end lines are not visible in this view —
-- confirm which branch feeds which counter against the full file.
65 module
:hook("stats-update", function ()
68 for _
, session
in pairs(sessions
) do
-- Sessions are split on whether they currently hold at least one request.
69 if #session
.requests
> 0 then
72 inactive
= inactive
+ 1;
75 measure_active(active
);
76 measure_inactive(inactive
);
-- Callback that handle_POST installs as `response.on_destroy`.
-- The argument is named `request`, but the code reads `request.context`,
-- which handle_POST sets on the response object.
-- Visible behaviour: look up the session by `request.context.sid`, remove an
-- entry from `session.requests`, and when no requests remain start an
-- inactivity timer that will call session_timeout(). It also stops and clears
-- `session.bosh_wait_timer` when one is set.
-- NOTE(review): the `if session then` guard, the comparison that picks which
-- entry to remove, and the closing `end` lines are not visible in this view.
79 -- Used to respond to idle sessions (those with waiting requests)
80 function on_destroy_request(request
)
81 log("debug", "Request destroyed: %s", request
);
82 local session
= sessions
[request
.context
.sid
];
84 local requests
= session
.requests
;
85 for i
, r
in ipairs(requests
) do
87 t_remove(requests
, i
);
92 -- If this session now has no requests open, mark it as inactive
93 local max_inactive
= session
.bosh_max_inactive
;
94 if max_inactive
and #requests
== 0 then
-- Replace any inactivity timer that is already running.
95 if session
.inactive_timer
then
96 session
.inactive_timer
:stop();
-- The extra arguments (session, context, reason text) are passed to add_timer
-- after the callback; session_timeout declares them after `now`.
98 session
.inactive_timer
= module
:add_timer(max_inactive
, session_timeout
, session
, request
.context
,
99 "BOSH client silent for over "..max_inactive
.." seconds");
100 (session
.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive
);
102 if session
.bosh_wait_timer
then
103 session
.bosh_wait_timer
:stop();
104 session
.bosh_wait_timer
= nil;
-- Timer callback scheduled by on_destroy_request() through module:add_timer().
-- Unless the session is already flagged `destroyed`, it is removed from
-- `sessions` (keyed by context.sid) and handed to sm_destroy_session() with
-- `reason`.
--   now     - first callback argument; unused (see the luacheck annotation)
--   session - the session table being timed out
--   context - request context whose `sid` keys the session table
--   reason  - text forwarded to sm_destroy_session
-- NOTE(review): original line 111 and the closing `end` lines are not visible
-- in this view.
109 function session_timeout(now
, session
, context
, reason
) -- luacheck: ignore 212/now
110 if not session
.destroyed
then
112 sessions
[context
.sid
] = nil;
113 sm_destroy_session(session
, reason
);
-- HTTP handler registered for the "POST" and "POST /" routes.
-- Builds a per-request context, feeds the request body through a fresh XMPP
-- stream parser (which runs the stream callbacks), then applies the BOSH
-- request-holding logic: flush buffered data, leave the request held and
-- return true, or answer with a terminate <body>.
--   event - table providing `request` and `response`
-- NOTE(review): several control-flow lines of this function (if/else/end and
-- some returns) are not visible in this view; the comments added here
-- describe only the statements that are visible.
117 function handle_POST(event
)
118 log("debug", "Handling new request %s: %s\n----------", event
.request
, event
.request
.body
);
120 local request
, response
= event
.request
, event
.response
;
-- on_destroy_request reads the context stored on the response further down.
121 response
.on_destroy
= on_destroy_request
;
122 local body
= request
.body
;
124 local context
= { request
= request
, response
= response
, notopen
= true };
125 local stream
= new_xmpp_stream(context
, stream_callbacks
);
126 response
.context
= context
;
128 local headers
= response
.headers
;
129 headers
.content_type
= "text/xml; charset=utf-8";
131 -- stream:feed() calls the stream_callbacks, so all stanzas in
132 -- the body are processed in this next line before it returns.
133 -- In particular, the streamopened() stream callback is where
134 -- much of the session logic happens, because it's where we first
135 -- get to see the 'sid' of this request.
136 local ok
, err
= stream
:feed(body
);
-- Parse-failure reply: log the parser error and return a terminate body with
-- condition "bad-request". NOTE(review): the guarding `if` is not visible here.
138 module
:log("warn", "Error parsing BOSH payload; %s", err
)
139 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
140 ["xmlns:stream"] = xmlns_streams
, condition
= "bad-request" });
141 return tostring(close_reply
);
144 -- Stanzas (if any) in the request have now been processed, and
145 -- we take care of the high-level BOSH logic here, including
146 -- giving a response or putting the request "on hold".
147 local session
= sessions
[context
.sid
];
149 -- Session was marked as inactive, since we have
150 -- a request open now, unmark it
151 if session
.inactive_timer
and #session
.requests
> 0 then
152 session
.inactive_timer
:stop();
153 session
.inactive_timer
= nil;
-- Any pending wait timer is cancelled here; a new one may be scheduled below.
156 if session
.bosh_wait_timer
then
157 session
.bosh_wait_timer
:stop();
158 session
.bosh_wait_timer
= nil;
161 local r
= session
.requests
;
162 log("debug", "Session %s has %d out of %d requests open", context
.sid
, #r
, BOSH_HOLD
);
163 log("debug", "and there are %d things in the send_buffer:", #session
.send_buffer
);
164 if #r
> BOSH_HOLD
then
165 -- We are holding too many requests, send what's in the buffer,
166 log("debug", "We are holding too many requests, so...");
167 if #session
.send_buffer
> 0 then
168 log("debug", "...sending what is in the buffer")
169 session
.send(t_concat(session
.send_buffer
));
170 session
.send_buffer
= {};
172 -- or an empty response
173 log("debug", "...sending an empty response");
176 elseif #session
.send_buffer
> 0 then
177 log("debug", "Session has data in the send buffer, will send now..");
178 local resp
= t_concat(session
.send_buffer
);
179 session
.send_buffer
= {};
183 if not response
.finished
then
184 -- We're keeping this request open, to respond later
185 log("debug", "Have nothing to say, so leaving request unanswered for now");
188 if session
.bosh_terminate
then
189 session
.log("debug", "Closing session with %d requests open", #session
.requests
);
-- While requests remain held, schedule after_bosh_wait() for the first held
-- request once `bosh_wait` seconds have elapsed.
193 if session
.bosh_wait
and #session
.requests
> 0 then
194 session
.bosh_wait_timer
= module
:add_timer(session
.bosh_wait
, after_bosh_wait
, session
.requests
[1], session
)
197 return true; -- Inform http server we shall reply later
199 elseif response
.finished
or context
.ignore_request
then
200 if response
.finished
then
201 module
:log("debug", "Response finished");
203 if context
.ignore_request
then
204 module
:log("debug", "Ignoring this request");
206 -- A response has been sent already, or we're ignoring this request
207 -- (e.g. so a different instance of the module can handle it)
-- Fallback: the request could not be tied to a session, so log a warning and
-- return a terminate body with condition "item-not-found".
210 module
:log("warn", "Unable to associate request with a session (incomplete request?)");
212 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
213 ["xmlns:stream"] = xmlns_streams
, condition
= "item-not-found" });
214 return tostring(close_reply
) .. "\n";
-- Timer callback that handle_POST schedules with
-- module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session).
--   now     - first callback argument
--   request - the held request passed when the timer was created
--   session - the owning session
-- NOTE(review): the body of this function is not visible in this view.
217 function after_bosh_wait(now
, request
, session
) -- luacheck: ignore 212
-- Flags a session's stream as not (re)opened by setting `notopen` to true.
-- Installed as the `reset_stream` field of newly created BOSH sessions.
--   session - session table to flag
local function bosh_reset_stream(session)
	session.notopen = true;
end
-- Attribute table shared by elements in the XMPP stream-error namespace.
-- Built from xmlns_xmpp_streams (declared near the top of the file) so the
-- namespace string is spelled in exactly one place.
local stream_xmlns_attr = { xmlns = xmlns_xmpp_streams };
-- Closes a BOSH session; installed as the `close` field of new sessions.
-- Builds a terminate <body>, optionally decorated with a stream error derived
-- from `reason`, sends it on every held request, then removes the session
-- from `sessions` and destroys it via sm_destroy_session().
--   session - the session being closed
--   reason  - visible code handles a string (used as the error element name),
--             a table with `condition` (plus `text` / `extra`), or a table
--             with `name` (used as the reply itself)
-- NOTE(review): the `if reason then` guard, the `text`/`extra` guards and the
-- closing `end` lines are not visible in this view.
226 local function bosh_close_stream(session
, reason
)
227 (session
.log or log)("info", "BOSH client disconnected: %s", (reason
and reason
.condition
or reason
) or "session close");
229 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
230 ["xmlns:stream"] = xmlns_streams
});
234 close_reply
.attr
.condition
= "remote-stream-error";
235 if type(reason
) == "string" then -- assume stream error
236 close_reply
:tag("stream:error")
237 :tag(reason
, {xmlns
= xmlns_xmpp_streams
});
238 elseif type(reason
) == "table" then
239 if reason
.condition
then
240 close_reply
:tag("stream:error")
241 :tag(reason
.condition
, stream_xmlns_attr
):up();
243 close_reply
:tag("text", stream_xmlns_attr
):text(reason
.text
):up();
246 close_reply
:add_child(reason
.extra
);
248 elseif reason
.name
then -- a stanza
249 close_reply
= reason
;
252 log("info", "Disconnecting client, <stream:error> is: %s", close_reply
);
-- Serialise once and send the same body on every request still held.
255 local response_body
= tostring(close_reply
);
256 for _
, held_request
in ipairs(session
.requests
) do
257 held_request
:send(response_body
);
259 sessions
[session
.sid
] = nil;
260 sm_destroy_session(session
);
-- Callback table passed to runner() when a session's thread is created;
-- starts out empty.
local runner_callbacks = {};
-- Stream callback for the opening <body> tag of a request payload.
-- Reads `sid` and `rid` from the tag attributes. The visible code covers two
-- situations: creating a new session (validating host, rid and wait, building
-- the session table and its `send` function, firing "bosh-session"), and
-- handling a request for an existing sid (session lookup, rid sequence checks
-- with deferral or replay, and terminate handling).
--   context - per-request table created in handle_POST
--   attr    - attributes of the <body> tag
-- NOTE(review): many branch lines (if/else/end/return) and the opening of the
-- session table constructor are not visible in this view; the comments added
-- here describe only the statements that are visible.
265 -- Handle the <body> tag in the request payload.
266 function stream_callbacks
.streamopened(context
, attr
)
267 local request
, response
= context
.request
, context
.response
;
268 local sid
, rid
= attr
.sid
, tonumber(attr
.rid
);
269 log("debug", "BOSH body open (sid: %s)", sid
or "<none>");
272 -- New session request
273 context
.notopen
= nil; -- Signals that we accept this opening tag
275 local to_host
= nameprep(attr
.to
);
276 local wait
= tonumber(attr
.wait
);
278 log("debug", "BOSH client tried to connect to invalid host: %s", attr
.to
);
280 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
281 ["xmlns:stream"] = xmlns_streams
, condition
= "improper-addressing" });
282 response
:send(tostring(close_reply
));
-- rid must be numeric and wait must be a non-negative whole number.
285 if not rid
or (not attr
.wait
or not wait
or wait
< 0 or wait
% 1 ~= 0) then
286 log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", attr
.rid
, attr
.wait
);
287 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
288 ["xmlns:stream"] = xmlns_streams
, condition
= "bad-request" });
289 response
:send(tostring(close_reply
));
-- Cap the client's requested wait at the configured bosh_max_wait.
293 wait
= math_min(wait
, bosh_max_wait
);
298 type = "c2s_unauthed", conn
= request
.conn
, sid
= sid
, host
= attr
.to
,
299 rid
= rid
- 1, -- Hack for initial session setup, "previous" rid was $current_request - 1
300 bosh_version
= attr
.ver
, bosh_wait
= wait
, streamid
= sid
,
301 bosh_max_inactive
= bosh_max_inactivity
, bosh_responses
= cache
.new(BOSH_HOLD
+1):table();
302 requests
= { }, send_buffer
= {}, reset_stream
= bosh_reset_stream
,
303 close
= bosh_close_stream
, dispatch_stanza
= core_process_stanza
, notopen
= true,
304 log = logger
.init("bosh"..sid
), secure
= consider_bosh_secure
or request
.secure
,
307 sessions
[sid
] = session
;
-- Incoming stanzas are dispatched through an async runner owned by the session.
309 session
.thread
= runner(function (stanza
)
310 session
:dispatch_stanza(stanza
);
311 end, runner_callbacks
, session
);
313 local filter
= initialize_filters(session
);
315 session
.log("debug", "BOSH session created for request from %s", session
.ip
);
316 log("info", "New BOSH session, assigned it sid '%s'", sid
);
319 module
:fire_event("bosh-session", { session
= session
, request
= request
});
321 -- Send creation response
322 local creating_session
= true;
324 local r
= session
.requests
;
-- session.send appends the filtered stanza to send_buffer; when a held request
-- exists and no request is being processed, the buffer is flushed to it
-- wrapped in a <body> element.
325 function session
.send(s
)
326 -- We need to ensure that outgoing stanzas have the jabber:client xmlns
327 if s
.attr
and not s
.attr
.xmlns
then
329 s
.attr
.xmlns
= "jabber:client";
331 s
= filter("stanzas/out", s
);
332 --log("debug", "Sending BOSH data: %s", s);
333 if not s
then return true end
334 t_insert(session
.send_buffer
, tostring(s
));
336 local oldest_request
= r
[1];
337 if oldest_request
and not session
.bosh_processing
then
338 log("debug", "We have an open request, so sending on that");
339 local body_attr
= { xmlns
= "http://jabber.org/protocol/httpbind",
340 ["xmlns:stream"] = "http://etherx.jabber.org/streams";
341 type = session
.bosh_terminate
and "terminate" or nil;
-- Only the first response of a new session carries the session parameters.
344 if creating_session
then
345 creating_session
= nil;
346 body_attr
.requests
= tostring(BOSH_MAX_REQUESTS
);
347 body_attr
.hold
= tostring(BOSH_HOLD
);
348 body_attr
.inactivity
= tostring(bosh_max_inactivity
);
349 body_attr
.polling
= tostring(bosh_max_polling
);
350 body_attr
.wait
= tostring(session
.bosh_wait
);
351 body_attr
.authid
= sid
;
352 body_attr
.secure
= "true";
353 body_attr
.ver
= '1.6';
354 body_attr
.from
= session
.host
;
355 body_attr
["xmlns:xmpp"] = "urn:xmpp:xbosh";
356 body_attr
["xmpp:version"] = "1.0";
-- The serialised response is remembered under the request's rid; the
-- repeated-rid branch further down re-sends entries from bosh_responses.
358 local response_xml
= st
.stanza("body", body_attr
):top_tag()..t_concat(session
.send_buffer
).."</body>";
359 session
.bosh_responses
[oldest_request
.context
.rid
] = response_xml
;
360 oldest_request
:send(response_xml
);
361 session
.send_buffer
= {};
368 local session
= sessions
[sid
];
371 log("info", "Client tried to use sid '%s' which we don't know about", sid
);
373 response
:send(tostring(st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate", condition
= "item-not-found" })));
374 context
.notopen
= nil;
378 session
.conn
= request
.conn
;
-- Compare this request's rid with the rid currently recorded on the session.
381 local diff
= rid
- session
.rid
;
382 -- Diff should be 1 for a healthy request
383 session
.log("debug", "rid: %d, sess: %s, diff: %d", rid
, session
.rid
, diff
)
386 context
.notopen
= nil;
387 if diff
== 2 then -- Missed a request
388 -- Hold request, but don't process it (ouch!)
389 session
.log("debug", "rid skipped: %d, deferring this request", rid
-1)
390 context
.defer
= true;
391 session
.bosh_deferred
= { context
= context
, sid
= sid
, rid
= rid
, terminate
= attr
.type == "terminate" };
394 -- Set a marker to indicate that stanzas in this request should NOT be processed
395 -- (these stanzas will already be in the XML parser's buffer)
396 context
.ignore
= true;
397 if session
.bosh_responses
[rid
] then
398 -- Re-send past response, ignore stanzas in this request
399 session
.log("debug", "rid repeated within window, replaying old response");
400 response
:send(session
.bosh_responses
[rid
]);
402 elseif diff
== 0 then
403 session
.log("debug", "current rid repeated, ignoring stanzas");
404 t_insert(session
.requests
, response
);
408 -- Session broken, destroy it
409 session
.log("debug", "rid out of range: %d (diff %d)", rid
, diff
);
410 response
:send(tostring(st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate", condition
= "item-not-found" })));
416 if attr
.type == "terminate" then
417 -- Client wants to end this session, which we'll do
418 -- after processing any stanzas in this request
419 session
.bosh_terminate
= true;
422 context
.notopen
= nil; -- Signals that we accept this opening tag
423 t_insert(session
.requests
, response
);
425 session
.bosh_processing
= true; -- Used to suppress replies until processing of this request is done
-- A session whose stream is flagged `notopen` is sent stream features, which
-- are collected by firing "stream-features" on the session's host.
427 if session
.notopen
then
428 local features
= st
.stanza("stream:features");
429 module
:context(session
.host
):fire_event("stream-features", { origin
= session
, features
= features
});
430 session
.send(features
);
431 session
.notopen
= nil;
-- Writes an error-level log entry containing a traceback built from `err`
-- (starting at stack level 2).
--   err - the error value to report
local function handleerr(err)
	local trace = traceback(err, 2);
	log("error", "Traceback[bosh]: %s", trace);
end
-- Error callback in the table given to runner(); forwards the error value to
-- handleerr(), which logs a traceback. `self` is unused (see the luacheck
-- annotation).
-- NOTE(review): the closing `end` is not visible in this view.
437 function runner_callbacks
:error(err
) -- luacheck: ignore 212/self
438 return handleerr(err
);
-- Stream callback invoked for each stanza parsed from a request body.
-- Returns immediately when the context is flagged `ignore`. Otherwise the
-- stanza is either appended to `session.bosh_deferred` (when the request is
-- deferred) or passed through the "stanzas/in" filter and run on the
-- session's thread.
--   context - per-request table; `sid` selects the session
--   stanza  - the parsed stanza
-- NOTE(review): the `if session then` guard, the else branches and the closing
-- `end` lines are not visible in this view.
441 function stream_callbacks
.handlestanza(context
, stanza
)
442 if context
.ignore
then return; end
443 log("debug", "BOSH stanza received: %s\n", stanza
:top_tag());
444 local session
= sessions
[context
.sid
];
-- A stanza whose xmlns attribute equals the BOSH namespace has it cleared.
446 if stanza
.attr
.xmlns
== xmlns_bosh
then
447 stanza
.attr
.xmlns
= nil;
449 if context
.defer
and session
.bosh_deferred
then
450 log("debug", "Deferring this stanza");
451 t_insert(session
.bosh_deferred
, stanza
);
453 stanza
= session
.filter("stanzas/in", stanza
);
454 session
.thread
:run(stanza
);
457 log("debug", "No session for this stanza! (sid: %s)", context
.sid
or "none!");
-- Stream callback invoked when the request's <body> element closes.
-- If this request is not itself deferred and the session has deferred stanzas
-- recorded, they are replayed now: the session rid is advanced to the deferred
-- rid, the deferred response is added to the held requests, and each stanza is
-- passed back through handlestanza(). Afterwards `bosh_processing` is cleared
-- so that replies are no longer suppressed.
--   context - per-request table; `sid` selects the session
-- NOTE(review): the `if session then` guard, the statement executed when
-- send_buffer is non-empty, and the closing `end` lines are not visible here.
461 function stream_callbacks
.streamclosed(context
)
462 local session
= sessions
[context
.sid
];
464 if not context
.defer
and session
.bosh_deferred
then
465 -- Handle deferred stanzas now
466 local deferred_stanzas
= session
.bosh_deferred
;
467 local deferred_context
= deferred_stanzas
.context
;
468 session
.bosh_deferred
= nil;
469 log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas
.rid
);
470 session
.rid
= deferred_stanzas
.rid
;
471 t_insert(session
.requests
, deferred_context
.response
);
-- The deferred table holds the stanzas in its array part, next to its
-- context/sid/rid/terminate fields.
472 for _
, stanza
in ipairs(deferred_stanzas
) do
473 stream_callbacks
.handlestanza(deferred_context
, stanza
);
475 if deferred_stanzas
.terminate
then
476 session
.bosh_terminate
= true;
479 session
.bosh_processing
= false;
480 if #session
.send_buffer
> 0 then
-- Stream callback invoked on a parser/stream error.
-- Without a sid on the context, the error is logged at debug level and a
-- terminate body with condition "bad-request" is sent on the response.
-- With a sid, the error is logged as a warning (through the session's logger
-- when available) and the visible code closes the session with a
-- "bad-format" condition.
--   context - per-request table
--   error   - error identifier; compared against "stream-error"
-- NOTE(review): the return after the bad-request reply, the body of the
-- "stream-error" branch and the closing `end` lines are not visible here.
486 function stream_callbacks
.error(context
, error)
487 if not context
.sid
then
488 log("debug", "Error parsing BOSH request payload; %s", error);
489 local response
= context
.response
;
490 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
491 ["xmlns:stream"] = xmlns_streams
, condition
= "bad-request" });
492 response
:send(tostring(close_reply
));
496 local session
= sessions
[context
.sid
];
497 (session
and session
.log or log)("warn", "Error parsing BOSH request payload; %s", error);
498 if error == "stream-error" then -- Remote stream error, we close normally
501 session
:close({ condition
= "bad-format", text
= "Error processing stream" });
-- Static reply used for the "GET" and "GET /" routes: an HTML page telling
-- the visitor that the BOSH endpoint works and linking to the setup guide.
-- NOTE(review): the enclosing `headers` table lines and the end of the long
-- string / table are not visible in this view.
505 local GET_response
= {
507 content_type
= "text/html";
509 body
= [[<html><body>
510 <p>It works! Now point your BOSH client to this URL to connect to Prosody.</p>
511 <p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
-- Declare the dependency on the "http" module and register this module's
-- HTTP routes: GET requests receive the static GET_response, POST requests
-- are handled by handle_POST. The default path is "/http-bind".
-- NOTE(review): the `route = {` line and the closing braces are not visible
-- in this view.
515 module
:depends("http");
516 module
:provides("http", {
517 default_path
= "/http-bind";
519 ["GET"] = GET_response
;
520 ["GET /"] = GET_response
;
521 ["POST"] = handle_POST
;
522 ["POST /"] = handle_POST
;