-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
-- Dependencies and cached library functions
local new_xmpp_stream = require "util.xmppstream".new;
local sm = require "core.sessionmanager";
local sm_destroy_session = sm.destroy_session;
local new_uuid = require "util.uuid".generate;
local core_process_stanza = prosody.core_process_stanza;
local st = require "util.stanza";
local logger = require "util.logger";
local log = module._log;
local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
local tostring, type = tostring, type;
local traceback = debug.traceback;
local runner = require "util.async".runner;
local nameprep = require "util.encodings".stringprep.nameprep;
local cache = require "util.cache";

-- XML namespaces used when building and parsing BOSH bodies
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)

-- Callback table handed to util.xmppstream; the handlers themselves
-- (streamopened, handlestanza, ...) are attached further down the file.
local stream_callbacks = {
	stream_ns = xmlns_bosh, stream_tag = "body", default_ns = "jabber:client" };

-- These constants are implicitly assumed within the code, and cannot be changed
-- BOSH_HOLD is how many requests the server keeps open per session; it is
-- advertised to clients as 'hold' and sizes the cache of replayable responses.
local BOSH_HOLD = 1;
local BOSH_MAX_REQUESTS = 2;

-- The number of seconds a BOSH session should remain open with no requests
local bosh_max_inactivity = module:get_option_number("bosh_max_inactivity", 60);
-- The minimum amount of time between requests with no payload
local bosh_max_polling = module:get_option_number("bosh_max_polling", 5);
-- The maximum amount of time that the server will hold onto a request before replying
-- (the client can set this to a lower value when it connects, if it chooses)
local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);

local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
local cross_domain = module:get_option("cross_domain_bosh", false);

-- Normalise the cross-domain option into the literal header value:
-- `true` allows any origin, a list of origins is joined with ", ".
if cross_domain == true then cross_domain = "*"; end
if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end

local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;

-- All sessions, and sessions that have no requests open
local sessions = module:shared("sessions");
-- Used to respond to idle sessions (those with waiting requests)
--
-- Installed as `response.on_destroy` by handle_POST. Removes the destroyed
-- request from its session's list of held requests. If the session is then
-- left with no open requests, the inactivity timer is (re)armed so that
-- check_inactive() can destroy the session after `bosh_max_inactive` seconds.
-- Any pending bosh_wait timer is cancelled.
function on_destroy_request(request)
	log("debug", "Request destroyed: %s", tostring(request));
	local session = sessions[request.context.sid];
	if not session then
		-- The session is already gone (or was never created); nothing to update
		return;
	end

	-- Remove only the request that was destroyed. Stopping at the first
	-- match also avoids mutating the array while ipairs() keeps walking it.
	local requests = session.requests;
	for i, r in ipairs(requests) do
		if r == request then
			t_remove(requests, i);
			break;
		end
	end

	-- If this session now has no requests open, mark it as inactive
	local max_inactive = session.bosh_max_inactive;
	if max_inactive and #requests == 0 then
		if session.inactive_timer then
			session.inactive_timer:stop();
		end
		session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context,
			"BOSH client silent for over "..max_inactive.." seconds");
		(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
	end

	if session.bosh_wait_timer then
		session.bosh_wait_timer:stop();
		session.bosh_wait_timer = nil;
	end
end
-- Timer callback armed by on_destroy_request() once a session has no open
-- requests. Unless the session has already been destroyed, it is removed
-- from the shared session table and destroyed with the given reason.
--   now:     timestamp passed by the timer (unused)
--   session: the BOSH session being checked
--   context: request context whose `sid` keys the session table
--   reason:  text handed to the session manager on destruction
function check_inactive(now, session, context, reason) -- luacheck: ignore 212/now
	if session.destroyed then
		return;
	end
	sessions[context.sid] = nil;
	sm_destroy_session(session, reason);
end
-- Adds the CORS headers to an HTTP response. The allowed origin is the
-- module-level `cross_domain` value; the remaining header values are fixed.
local function set_cross_domain_headers(response)
	local headers = response.headers;
	headers.access_control_allow_methods = "GET, POST, OPTIONS";
	headers.access_control_allow_headers = "Content-Type";
	headers.access_control_max_age = "7200";
	headers.access_control_allow_origin = cross_domain;
end
-- HTTP OPTIONS handler (CORS preflight).
-- Adds the cross-domain headers when cross-domain access is configured and
-- the request carries an Origin header, then answers with an empty body so
-- the preflight request is actually responded to.
function handle_OPTIONS(event)
	if cross_domain and event.request.headers.origin then
		set_cross_domain_headers(event.response);
	end
	return "";
end
-- HTTP POST handler: entry point for every BOSH request.
--
-- Parses the request body through an XMPP stream parser (which drives the
-- stream_callbacks), then decides whether to answer immediately or to keep
-- the request open for a later reply.
--
-- Returns:
--   a string      - a terminate <body/> sent straight back to the client
--                   (unparseable payload, or no session for this request)
--   true          - the request is held open; the reply is sent later
--   nothing       - a response was already sent, the request is being
--                   ignored, or the session has just been closed
function handle_POST(event)
	log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body));

	local request, response = event.request, event.response;
	response.on_destroy = on_destroy_request;
	local body = request.body;

	local context = { request = request, response = response, notopen = true };
	local stream = new_xmpp_stream(context, stream_callbacks);
	response.context = context;

	local headers = response.headers;
	headers.content_type = "text/xml; charset=utf-8";

	if cross_domain and request.headers.origin then
		set_cross_domain_headers(response);
	end

	-- stream:feed() calls the stream_callbacks, so all stanzas in
	-- the body are processed in this next line before it returns.
	-- In particular, the streamopened() stream callback is where
	-- much of the session logic happens, because it's where we first
	-- get to see the 'sid' of this request.
	local ok, err = stream:feed(body);
	if not ok then
		module:log("warn", "Error parsing BOSH payload; %s", err)
		local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
			["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
		return tostring(close_reply);
	end

	-- Stanzas (if any) in the request have now been processed, and
	-- we take care of the high-level BOSH logic here, including
	-- giving a response or putting the request "on hold".
	local session = sessions[context.sid];
	if session then
		-- Session was marked as inactive, since we have
		-- a request open now, unmark it
		if session.inactive_timer and #session.requests > 0 then
			session.inactive_timer:stop();
			session.inactive_timer = nil;
		end

		if session.bosh_wait_timer then
			session.bosh_wait_timer:stop();
			session.bosh_wait_timer = nil;
		end

		local r = session.requests;
		log("debug", "Session %s has %d out of %d requests open", context.sid, #r, BOSH_HOLD);
		log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
		if #r > BOSH_HOLD then
			-- We are holding too many requests, send what's in the buffer,
			log("debug", "We are holding too many requests, so...");
			if #session.send_buffer > 0 then
				log("debug", "...sending what is in the buffer")
				session.send(t_concat(session.send_buffer));
				session.send_buffer = {};
			else
				-- or an empty response
				log("debug", "...sending an empty response");
				session.send("");
			end
		elseif #session.send_buffer > 0 then
			log("debug", "Session has data in the send buffer, will send now..");
			local resp = t_concat(session.send_buffer);
			session.send_buffer = {};
			session.send(resp);
		end

		if not response.finished then
			-- We're keeping this request open, to respond later
			log("debug", "Have nothing to say, so leaving request unanswered for now");
		end

		if session.bosh_terminate then
			session.log("debug", "Closing session with %d requests open", #session.requests);
			session:close();
			return;
		end

		-- Arm the timer that bounds how long the oldest held request may wait
		if session.bosh_wait and #session.requests > 0 then
			session.bosh_wait_timer = module:add_timer(session.bosh_wait, after_bosh_wait, session.requests[1], session)
		end

		return true; -- Inform http server we shall reply later
	elseif response.finished or context.ignore_request then
		if response.finished then
			module:log("debug", "Response finished");
		end
		if context.ignore_request then
			module:log("debug", "Ignoring this request");
		end
		-- A response has been sent already, or we're ignoring this request
		-- (e.g. so a different instance of the module can handle it)
		return;
	end

	module:log("warn", "Unable to associate request with a session (incomplete request?)");
	local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
		["xmlns:stream"] = xmlns_streams, condition = "item-not-found" });
	return tostring(close_reply) .. "\n";
end
213 function after_bosh_wait(now
, request
, session
) -- luacheck: ignore 212
-- Flags the session's stream as not yet open. Installed as the session's
-- `reset_stream` method when a new BOSH session is created.
local function bosh_reset_stream(session)
	session.notopen = true;
end
-- Shared attribute table for children of <stream:error/>
local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };

-- Closes a BOSH session. Installed as the session's `close` method.
--
-- Builds a terminating <body/>, sends it on every request currently held
-- for the session, then removes the session from the shared table and
-- destroys it.
--
--   session: the BOSH session to close
--   reason:  optional; one of
--            * nil                    - plain termination, no error condition
--            * string                 - name of a stream error condition
--            * table with .condition  - stream error, with optional .text and
--                                       .extra (an extra child element)
--            * table with .name       - a stanza, sent as the reply as-is
local function bosh_close_stream(session, reason)
	(session.log or log)("info", "BOSH client disconnected: %s", tostring((reason and reason.condition or reason) or "session close"));

	local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
		["xmlns:stream"] = xmlns_streams });

	if reason then
		close_reply.attr.condition = "remote-stream-error";
		if type(reason) == "string" then -- assume stream error
			close_reply:tag("stream:error")
				:tag(reason, {xmlns = xmlns_xmpp_streams});
		elseif type(reason) == "table" then
			if reason.condition then
				close_reply:tag("stream:error")
					:tag(reason.condition, stream_xmlns_attr):up();
				-- Both the descriptive text and the extra element are optional
				if reason.text then
					close_reply:tag("text", stream_xmlns_attr):text(reason.text):up();
				end
				if reason.extra then
					close_reply:add_child(reason.extra);
				end
			elseif reason.name then -- a stanza
				close_reply = reason;
			end
		end
		log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply));
	end

	-- Every held request receives the same terminating body
	local response_body = tostring(close_reply);
	for _, held_request in ipairs(session.requests) do
		held_request:send(response_body);
	end
	sessions[session.sid] = nil;
	sm_destroy_session(session);
end
-- Callback table passed to each session's async runner; the `error`
-- handler is attached to it further down the file.
local runner_callbacks = {};
261 -- Handle the <body> tag in the request payload.
262 function stream_callbacks
.streamopened(context
, attr
)
263 local request
, response
= context
.request
, context
.response
;
264 local sid
, rid
= attr
.sid
, tonumber(attr
.rid
);
265 log("debug", "BOSH body open (sid: %s)", sid
or "<none>");
268 -- New session request
269 context
.notopen
= nil; -- Signals that we accept this opening tag
271 local to_host
= nameprep(attr
.to
);
272 local wait
= tonumber(attr
.wait
);
274 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr
.to
));
275 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
276 ["xmlns:stream"] = xmlns_streams
, condition
= "improper-addressing" });
277 response
:send(tostring(close_reply
));
280 if not rid
or (not wait
and attr
.wait
or wait
< 0 or wait
% 1 ~= 0) then
281 log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr
.rid
), tostring(attr
.wait
));
282 local close_reply
= st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate",
283 ["xmlns:stream"] = xmlns_streams
, condition
= "bad-request" });
284 response
:send(tostring(close_reply
));
288 wait
= math_min(wait
, bosh_max_wait
);
293 type = "c2s_unauthed", conn
= request
.conn
, sid
= sid
, host
= attr
.to
,
294 rid
= rid
- 1, -- Hack for initial session setup, "previous" rid was $current_request - 1
295 bosh_version
= attr
.ver
, bosh_wait
= wait
, streamid
= sid
,
296 bosh_max_inactive
= bosh_max_inactivity
, bosh_responses
= cache
.new(BOSH_HOLD
+1):table();
297 requests
= { }, send_buffer
= {}, reset_stream
= bosh_reset_stream
,
298 close
= bosh_close_stream
, dispatch_stanza
= core_process_stanza
, notopen
= true,
299 log = logger
.init("bosh"..sid
), secure
= consider_bosh_secure
or request
.secure
,
302 sessions
[sid
] = session
;
304 session
.thread
= runner(function (stanza
)
305 session
:dispatch_stanza(stanza
);
306 end, runner_callbacks
, session
);
308 local filter
= initialize_filters(session
);
310 session
.log("debug", "BOSH session created for request from %s", session
.ip
);
311 log("info", "New BOSH session, assigned it sid '%s'", sid
);
313 module
:fire_event("bosh-session", { session
= session
, request
= request
});
315 -- Send creation response
316 local creating_session
= true;
318 local r
= session
.requests
;
319 function session
.send(s
)
320 -- We need to ensure that outgoing stanzas have the jabber:client xmlns
321 if s
.attr
and not s
.attr
.xmlns
then
323 s
.attr
.xmlns
= "jabber:client";
325 s
= filter("stanzas/out", s
);
326 --log("debug", "Sending BOSH data: %s", tostring(s));
327 if not s
then return true end
328 t_insert(session
.send_buffer
, tostring(s
));
330 local oldest_request
= r
[1];
331 if oldest_request
and not session
.bosh_processing
then
332 log("debug", "We have an open request, so sending on that");
333 local body_attr
= { xmlns
= "http://jabber.org/protocol/httpbind",
334 ["xmlns:stream"] = "http://etherx.jabber.org/streams";
335 type = session
.bosh_terminate
and "terminate" or nil;
338 if creating_session
then
339 creating_session
= nil;
340 body_attr
.requests
= tostring(BOSH_MAX_REQUESTS
);
341 body_attr
.hold
= tostring(BOSH_HOLD
);
342 body_attr
.inactivity
= tostring(bosh_max_inactivity
);
343 body_attr
.polling
= tostring(bosh_max_polling
);
344 body_attr
.wait
= tostring(session
.bosh_wait
);
345 body_attr
.authid
= sid
;
346 body_attr
.secure
= "true";
347 body_attr
.ver
= '1.6';
348 body_attr
.from
= session
.host
;
349 body_attr
["xmlns:xmpp"] = "urn:xmpp:xbosh";
350 body_attr
["xmpp:version"] = "1.0";
352 local response_xml
= st
.stanza("body", body_attr
):top_tag()..t_concat(session
.send_buffer
).."</body>";
353 session
.bosh_responses
[oldest_request
.context
.rid
] = response_xml
;
354 oldest_request
:send(response_xml
);
355 session
.send_buffer
= {};
362 local session
= sessions
[sid
];
365 log("info", "Client tried to use sid '%s' which we don't know about", sid
);
366 response
:send(tostring(st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate", condition
= "item-not-found" })));
367 context
.notopen
= nil;
371 session
.conn
= request
.conn
;
374 local diff
= rid
- session
.rid
;
375 -- Diff should be 1 for a healthy request
376 session
.log("debug", "rid: %d, sess: %s, diff: %d", rid
, session
.rid
, diff
)
379 context
.notopen
= nil;
380 if diff
== 2 then -- Missed a request
381 -- Hold request, but don't process it (ouch!)
382 session
.log("debug", "rid skipped: %d, deferring this request", rid
-1)
383 context
.defer
= true;
384 session
.bosh_deferred
= { context
= context
, sid
= sid
, rid
= rid
, terminate
= attr
.type == "terminate" };
387 -- Set a marker to indicate that stanzas in this request should NOT be processed
388 -- (these stanzas will already be in the XML parser's buffer)
389 context
.ignore
= true;
390 if session
.bosh_responses
[rid
] then
391 -- Re-send past response, ignore stanzas in this request
392 session
.log("debug", "rid repeated within window, replaying old response");
393 response
:send(session
.bosh_responses
[rid
]);
395 elseif diff
== 0 then
396 session
.log("debug", "current rid repeated, ignoring stanzas");
397 t_insert(session
.requests
, response
);
401 -- Session broken, destroy it
402 session
.log("debug", "rid out of range: %d (diff %d)", rid
, diff
);
403 response
:send(tostring(st
.stanza("body", { xmlns
= xmlns_bosh
, type = "terminate", condition
= "item-not-found" })));
409 if attr
.type == "terminate" then
410 -- Client wants to end this session, which we'll do
411 -- after processing any stanzas in this request
412 session
.bosh_terminate
= true;
415 context
.notopen
= nil; -- Signals that we accept this opening tag
416 t_insert(session
.requests
, response
);
418 session
.bosh_processing
= true; -- Used to suppress replies until processing of this request is done
420 if session
.notopen
then
421 local features
= st
.stanza("stream:features");
422 module
:context(session
.host
):fire_event("stream-features", { origin
= session
, features
= features
});
423 session
.send(features
);
424 session
.notopen
= nil;
-- Logs an error together with a stack traceback, at "error" level.
local function handleerr(err)
	local trace = traceback(tostring(err), 2);
	log("error", "Traceback[bosh]: %s", trace);
end
-- Error callback for the per-session runner: delegates to handleerr(),
-- which logs the error with a traceback.
function runner_callbacks:error(err) -- luacheck: ignore 212/self
	return handleerr(err);
end
-- Stream callback: invoked for each stanza found inside a request's <body/>.
--
-- Stanzas are dropped when the request is marked `ignore`, queued on
-- `session.bosh_deferred` when the request is being deferred, and otherwise
-- passed through the session's inbound filters and run on its thread.
function stream_callbacks.handlestanza(context, stanza)
	if context.ignore then return; end
	log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
	local session = sessions[context.sid];
	if not session then
		log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
		return;
	end

	-- Clear an xmlns equal to the BOSH namespace (the parser's stream_ns)
	if stanza.attr.xmlns == xmlns_bosh then
		stanza.attr.xmlns = nil;
	end

	if context.defer and session.bosh_deferred then
		log("debug", "Deferring this stanza");
		t_insert(session.bosh_deferred, stanza);
	else
		stanza = session.filter("stanzas/in", stanza);
		-- A filter may drop the stanza entirely; only dispatch what is left
		if stanza then
			session.thread:run(stanza);
		end
	end
end
-- Stream callback: invoked when the request's <body/> element is closed.
--
-- If an earlier request was deferred and this request is not itself being
-- deferred, the deferred stanzas are processed now. Afterwards replies are
-- no longer suppressed and anything waiting in the send buffer is flushed.
function stream_callbacks.streamclosed(context)
	local session = sessions[context.sid];
	if not session then
		return;
	end

	if not context.defer and session.bosh_deferred then
		-- Handle deferred stanzas now
		local deferred_stanzas = session.bosh_deferred;
		local deferred_context = deferred_stanzas.context;
		session.bosh_deferred = nil;
		log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
		session.rid = deferred_stanzas.rid;
		t_insert(session.requests, deferred_context.response);
		for _, stanza in ipairs(deferred_stanzas) do
			stream_callbacks.handlestanza(deferred_context, stanza);
		end
		if deferred_stanzas.terminate then
			session.bosh_terminate = true;
		end
	end

	session.bosh_processing = false;
	if #session.send_buffer > 0 then
		-- Sending an empty string triggers a flush of the buffered data
		session.send("");
	end
end
-- Stream callback: invoked when the XML parser reports an error.
--
-- Without a known sid the request gets a bad-request terminate body.
-- Otherwise the associated session is closed: normally for a remote
-- "stream-error", with a bad-format condition for anything else.
function stream_callbacks.error(context, error)
	if not context.sid then
		log("debug", "Error parsing BOSH request payload; %s", error);
		local response = context.response;
		local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
			["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
		response:send(tostring(close_reply));
		return;
	end

	local session = sessions[context.sid];
	(session and session.log or log)("warn", "Error parsing BOSH request payload; %s", error);
	if not session then
		-- The session has already gone away, so there is nothing left to close
		return;
	end

	if error == "stream-error" then -- Remote stream error, we close normally
		session:close();
	else
		session:close({ condition = "bad-format", text = "Error processing stream" });
	end
end
-- Static response served for GET requests to the BOSH URL, telling a
-- visitor with a browser that the endpoint is up.
local GET_response = {
	headers = {
		content_type = "text/html";
	};
	body = [[<html><body>
	<p>It works! Now point your BOSH client to this URL to connect to Prosody.</p>
	<p>For more information see <a href="https://prosody.im/doc/setting_up_bosh">Prosody: Setting up BOSH</a>.</p>
	</body></html>]];
};
-- Register the BOSH endpoint with the HTTP module. Each method is routed
-- both with and without a trailing slash.
module:depends("http");
module:provides("http", {
	default_path = "/http-bind";
	route = {
		["GET"] = GET_response;
		["GET /"] = GET_response;
		["OPTIONS"] = handle_OPTIONS;
		["OPTIONS /"] = handle_OPTIONS;
		["POST"] = handle_POST;
		["POST /"] = handle_POST;
	};
});