# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5 """A bare-bones and non-compliant XMPP server.
7 Just enough of the protocol is implemented to get it to work with
8 Chrome's sync notification system.
16 from xml
.dom
import minidom
18 # pychecker complains about the use of fileno(), which is implemented
19 # by asyncore by forwarding to an internal object via __getattr__.
20 __pychecker__
= 'no-classattr'
class Error(Exception):
  """Base class for the exceptions raised by this module.

  UnexpectedXml derives from it, so catching Error also catches that.
  """
  pass
class UnexpectedXml(Error):
  """Signals that an XML element showed up where it was not expected."""

  def __init__(self, xml_element):
    """Stores a fixed message together with the element's serialized XML.

    Args:
      xml_element: The offending element; it must provide toxml().
    """
    Error.__init__(self, 'Unexpected XML element', xml_element.toxml())
36 def ParseXml(xml_string
):
37 """Parses the given string as XML and returns a minidom element
40 dom
= minidom
.parseString(xml_string
)
42 # minidom handles xmlns specially, but there's a bug where it sets
43 # the attribute value to None, which causes toxml() or toprettyxml()
45 def FixMinidomXmlnsBug(xml_element
):
46 if xml_element
.getAttribute('xmlns') is None:
47 xml_element
.setAttribute('xmlns', '')
49 def ApplyToAllDescendantElements(xml_element
, fn
):
51 for node
in xml_element
.childNodes
:
52 if node
.nodeType
== node
.ELEMENT_NODE
:
53 ApplyToAllDescendantElements(node
, fn
)
55 root
= dom
.documentElement
56 ApplyToAllDescendantElements(root
, FixMinidomXmlnsBug
)
61 """Returns a deep copy of the given XML element.
64 xml: The XML element, which should be something returned from
65 ParseXml() (i.e., a root element).
67 return xml
.ownerDocument
.cloneNode(True).documentElement
70 class StanzaParser(object):
71 """A hacky incremental XML parser.
73 StanzaParser consumes data incrementally via FeedString() and feeds
74 its delegate complete parsed stanzas (i.e., XML documents) via
75 FeedStanza(). Any stanzas passed to FeedStanza() are unlinked after
80 class MyClass(object):
82 def __init__(self, ...):
84 self._parser = StanzaParser(self)
87 def SomeFunction(self, ...):
89 self._parser.FeedString(some_data)
92 def FeedStanza(self, stanza):
94 print stanza.toprettyxml()
98 # NOTE(akalin): The following regexps are naive, but necessary since
99 # none of the existing Python 2.4/2.5 XML libraries support
100 # incremental parsing. This works well enough for our purposes.
102 # The regexps below assume that any present XML element starts at
103 # the beginning of the string, but there may be trailing whitespace.
105 # Matches an opening stream tag (e.g., '<stream:stream foo="bar">')
106 # (assumes that the stream XML namespace is defined in the tag).
107 _stream_re
= re
.compile(r
'^(<stream:stream [^>]*>)\s*')
109 # Matches an empty element tag (e.g., '<foo bar="baz"/>').
110 _empty_element_re
= re
.compile(r
'^(<[^>]*/>)\s*')
112 # Matches a non-empty element (e.g., '<foo bar="baz">quux</foo>').
113 # Does *not* handle nested elements.
114 _non_empty_element_re
= re
.compile(r
'^(<([^ >]*)[^>]*>.*?</\2>)\s*')
116 # The closing tag for a stream tag. We have to insert this
117 # ourselves since all XML stanzas are children of the stream tag,
118 # which is never closed until the connection is closed.
119 _stream_suffix
= '</stream:stream>'
121 def __init__(self
, delegate
):
123 self
._delegate
= delegate
125 def FeedString(self
, data
):
126 """Consumes the given string data, possibly feeding one or more
127 stanzas to the delegate.
130 while (self
._ProcessBuffer
(self
._stream
_re
, self
._stream
_suffix
) or
131 self
._ProcessBuffer
(self
._empty
_element
_re
) or
132 self
._ProcessBuffer
(self
._non
_empty
_element
_re
)):
135 def _ProcessBuffer(self
, regexp
, xml_suffix
=''):
136 """If the buffer matches the given regexp, removes the match from
137 the buffer, appends the given suffix, parses it, and feeds it to
141 Whether or not the buffer matched the given regexp.
143 results
= regexp
.match(self
._buffer
)
146 xml_text
= self
._buffer
[:results
.end()] + xml_suffix
147 self
._buffer
= self
._buffer
[results
.end():]
148 stanza
= ParseXml(xml_text
)
149 self
._delegate
.FeedStanza(stanza
)
150 # Needed because stanza may have cycles.
156 """Simple struct for an XMPP jid (essentially an e-mail address with
157 an optional resource string).
160 def __init__(self
, username
, domain
, resource
=''):
161 self
.username
= username
163 self
.resource
= resource
166 jid_str
= "%s@%s" % (self
.username
, self
.domain
)
168 jid_str
+= '/' + self
.resource
def GetBareJid(self):
  """Returns a new Jid with this jid's username and domain only.

  The resource argument is left at the Jid constructor's default.
  """
  bare_jid = Jid(self.username, self.domain)
  return bare_jid
175 class IdGenerator(object):
176 """Simple class to generate unique IDs for XMPP messages."""
178 def __init__(self
, prefix
):
179 self
._prefix
= prefix
183 next_id
= "%s.%s" % (self
._prefix
, self
._id
)
188 class HandshakeTask(object):
189 """Class to handle the initial handshake with a connected XMPP
193 # The handshake states in order.
194 (_INITIAL_STREAM_NEEDED
,
199 _FINISHED
) = range(6)
201 # Used when in the _INITIAL_STREAM_NEEDED and _AUTH_STREAM_NEEDED
202 # states. Not an XML object as it's only the opening tag.
204 # The from and id attributes are filled in later.
206 '<stream:stream from="%s" id="%s" '
207 'version="1.0" xmlns:stream="http://etherx.jabber.org/streams" '
208 'xmlns="jabber:client">')
210 # Used when in the _INITIAL_STREAM_NEEDED state.
211 _AUTH_STANZA
= ParseXml(
212 '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
213 ' <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">'
214 ' <mechanism>PLAIN</mechanism>'
215 ' <mechanism>X-GOOGLE-TOKEN</mechanism>'
216 ' <mechanism>X-OAUTH2</mechanism>'
218 '</stream:features>')
220 # Used when in the _AUTH_NEEDED state.
221 _AUTH_SUCCESS_STANZA
= ParseXml(
222 '<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>')
224 # Used when in the _AUTH_NEEDED state.
225 _AUTH_FAILURE_STANZA
= ParseXml(
226 '<failure xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>')
228 # Used when in the _AUTH_STREAM_NEEDED state.
229 _BIND_STANZA
= ParseXml(
230 '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
231 ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>'
232 ' <session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
233 '</stream:features>')
235 # Used when in the _BIND_NEEDED state.
237 # The id and jid attributes are filled in later.
238 _BIND_RESULT_STANZA
= ParseXml(
239 '<iq id="" type="result">'
240 ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
245 # Used when in the _SESSION_NEEDED state.
247 # The id attribute is filled in later.
248 _IQ_RESPONSE_STANZA
= ParseXml('<iq id="" type="result"/>')
250 def __init__(self
, connection
, resource_prefix
, authenticated
):
251 self
._connection
= connection
252 self
._id
_generator
= IdGenerator(resource_prefix
)
256 self
._authenticated
= authenticated
257 self
._resource
_prefix
= resource_prefix
258 self
._state
= self
._INITIAL
_STREAM
_NEEDED
260 def FeedStanza(self
, stanza
):
261 """Inspects the given stanza and changes the handshake state if needed.
263 Called when a stanza is received from the client. Inspects the
264 stanza to make sure it has the expected attributes given the
265 current state, advances the state if needed, and sends a reply to
266 the client if needed.
268 def ExpectStanza(stanza
, name
):
269 if stanza
.tagName
!= name
:
270 raise UnexpectedXml(stanza
)
272 def ExpectIq(stanza
, type, name
):
273 ExpectStanza(stanza
, 'iq')
274 if (stanza
.getAttribute('type') != type or
275 stanza
.firstChild
.tagName
!= name
):
276 raise UnexpectedXml(stanza
)
278 def GetStanzaId(stanza
):
279 return stanza
.getAttribute('id')
281 def HandleStream(stanza
):
282 ExpectStanza(stanza
, 'stream:stream')
283 domain
= stanza
.getAttribute('to')
285 self
._domain
= domain
288 def SendStreamData():
289 next_id
= self
._id
_generator
.GetNextId()
290 stream_data
= self
._STREAM
_DATA
% (self
._domain
, next_id
)
291 self
._connection
.SendData(stream_data
)
293 def GetUserDomain(stanza
):
294 encoded_username_password
= stanza
.firstChild
.data
295 username_password
= base64
.b64decode(encoded_username_password
)
296 (_
, username_domain
, _
) = username_password
.split('\0')
297 # The domain may be omitted.
299 # If we were using python 2.5, we'd be able to do:
301 # username, _, domain = username_domain.partition('@')
303 # domain = self._domain
304 at_pos
= username_domain
.find('@')
306 username
= username_domain
[:at_pos
]
307 domain
= username_domain
[at_pos
+1:]
309 username
= username_domain
310 domain
= self
._domain
311 return (username
, domain
)
314 self
._state
= self
._FINISHED
315 self
._connection
.HandshakeDone(self
._jid
)
317 if self
._state
== self
._INITIAL
_STREAM
_NEEDED
:
319 self
._connection
.SendStanza(self
._AUTH
_STANZA
, False)
320 self
._state
= self
._AUTH
_NEEDED
322 elif self
._state
== self
._AUTH
_NEEDED
:
323 ExpectStanza(stanza
, 'auth')
324 (self
._username
, self
._domain
) = GetUserDomain(stanza
)
325 if self
._authenticated
:
326 self
._connection
.SendStanza(self
._AUTH
_SUCCESS
_STANZA
, False)
327 self
._state
= self
._AUTH
_STREAM
_NEEDED
329 self
._connection
.SendStanza(self
._AUTH
_FAILURE
_STANZA
, False)
332 elif self
._state
== self
._AUTH
_STREAM
_NEEDED
:
334 self
._connection
.SendStanza(self
._BIND
_STANZA
, False)
335 self
._state
= self
._BIND
_NEEDED
337 elif self
._state
== self
._BIND
_NEEDED
:
338 ExpectIq(stanza
, 'set', 'bind')
339 stanza_id
= GetStanzaId(stanza
)
340 resource_element
= stanza
.getElementsByTagName('resource')[0]
341 resource
= resource_element
.firstChild
.data
342 full_resource
= '%s.%s' % (self
._resource
_prefix
, resource
)
343 response
= CloneXml(self
._BIND
_RESULT
_STANZA
)
344 response
.setAttribute('id', stanza_id
)
345 self
._jid
= Jid(self
._username
, self
._domain
, full_resource
)
346 jid_text
= response
.parentNode
.createTextNode(str(self
._jid
))
347 response
.getElementsByTagName('jid')[0].appendChild(jid_text
)
348 self
._connection
.SendStanza(response
)
349 self
._state
= self
._SESSION
_NEEDED
351 elif self
._state
== self
._SESSION
_NEEDED
:
352 ExpectIq(stanza
, 'set', 'session')
353 stanza_id
= GetStanzaId(stanza
)
354 xml
= CloneXml(self
._IQ
_RESPONSE
_STANZA
)
355 xml
.setAttribute('id', stanza_id
)
356 self
._connection
.SendStanza(xml
)
def AddrString(addr):
  """Formats a (host, port) address tuple as 'host:port'.

  Args:
    addr: A two-item tuple holding the host and the integer port.

  Returns:
    The formatted address string.
  """
  template = '%s:%d'
  return template % addr
364 class XmppConnection(asynchat
.async_chat
):
365 """A single XMPP client connection.
367 This class handles the connection to a single XMPP client (via a
368 socket). It does the XMPP handshake and also implements the (old)
369 Google notification protocol.
372 # Used for acknowledgements to the client.
374 # The from and id attributes are filled in later.
375 _IQ_RESPONSE_STANZA
= ParseXml('<iq from="" id="" type="result"/>')
377 def __init__(self
, sock
, socket_map
, delegate
, addr
, authenticated
):
378 """Starts up the xmpp connection.
381 sock: The socket to the client.
382 socket_map: A map from sockets to their owning objects.
383 delegate: The delegate, which is notified when the XMPP
384 handshake is successful, when the connection is closed, and
385 when a notification has to be broadcast.
386 addr: The host/port of the client.
388 # We do this because in versions of python < 2.6,
389 # async_chat.__init__ doesn't take a map argument nor pass it to
390 # dispatcher.__init__. We rely on the fact that
391 # async_chat.__init__ calls dispatcher.__init__ as the last thing
392 # it does, and that calling dispatcher.__init__ with socket=None
393 # and map=None is essentially a no-op.
394 asynchat
.async_chat
.__init
__(self
)
395 asyncore
.dispatcher
.__init
__(self
, sock
, socket_map
)
397 self
.set_terminator(None)
399 self
._delegate
= delegate
400 self
._parser
= StanzaParser(self
)
404 addr_str
= AddrString(self
._addr
)
405 self
._handshake
_task
= HandshakeTask(self
, addr_str
, authenticated
)
406 print 'Starting connection to %s' % self
410 return str(self
._jid
)
412 return AddrString(self
._addr
)
414 # async_chat implementation.
def collect_incoming_data(self, data):
  """Hands data received from the client to this connection's parser.

  Args:
    data: The chunk of incoming data to feed to the parser.
  """
  parser = self._parser
  parser.FeedString(data)
419 # This is only here to make pychecker happy.
def found_terminator(self):
  """Defers to async_chat's own found_terminator()."""
  base = asynchat.async_chat
  base.found_terminator(self)
424 print "Closing connection to %s" % self
425 self
._delegate
.OnXmppConnectionClosed(self
)
426 asynchat
.async_chat
.close(self
)
428 # Called by self._parser.FeedString().
429 def FeedStanza(self
, stanza
):
430 if self
._handshake
_task
:
431 self
._handshake
_task
.FeedStanza(stanza
)
432 elif stanza
.tagName
== 'iq' and stanza
.getAttribute('type') == 'result':
433 # Ignore all client acks.
435 elif (stanza
.firstChild
and
436 stanza
.firstChild
.namespaceURI
== 'google:push'):
437 self
._HandlePushCommand
(stanza
)
439 raise UnexpectedXml(stanza
)
441 # Called by self._handshake_task.
442 def HandshakeDone(self
, jid
):
445 self
._handshake
_task
= None
446 self
._delegate
.OnXmppHandshakeDone(self
)
447 print "Handshake done for %s" % self
449 print "Handshake failed for %s" % self
452 def _HandlePushCommand(self
, stanza
):
453 if stanza
.tagName
== 'iq' and stanza
.firstChild
.tagName
== 'subscribe':
454 # Subscription request.
455 self
._SendIqResponseStanza
(stanza
)
456 elif stanza
.tagName
== 'message' and stanza
.firstChild
.tagName
== 'push':
457 # Send notification request.
458 self
._delegate
.ForwardNotification(self
, stanza
)
460 raise UnexpectedXml(command_xml
)
def _SendIqResponseStanza(self, iq):
  """Acknowledges the given iq stanza.

  The reply is a copy of _IQ_RESPONSE_STANZA whose 'from' attribute is
  this connection's bare jid and whose 'id' attribute echoes the id of
  the stanza being acknowledged.

  Args:
    iq: The iq stanza to acknowledge.
  """
  reply = CloneXml(self._IQ_RESPONSE_STANZA)
  sender = str(self._jid.GetBareJid())
  reply.setAttribute('from', sender)
  request_id = iq.getAttribute('id')
  reply.setAttribute('id', request_id)
  self.SendStanza(reply)
468 def SendStanza(self
, stanza
, unlink
=True):
469 """Sends a stanza to the client.
472 stanza: The stanza to send.
473 unlink: Whether to unlink stanza after sending it. (Pass in
474 False if stanza is a constant.)
476 self
.SendData(stanza
.toxml())
def SendData(self, data):
  """Sends raw data to the client.

  Args:
    data: The text to send; it is encoded as ASCII before being pushed.
  """
  # The client expects ascii, and some minidom library functions return
  # unicode strings, hence the explicit encoding step.
  ascii_data = data.encode('ascii')
  self.push(ascii_data)
def ForwardNotification(self, notification_stanza):
  """Addresses a notification to this client and sends it.

  The stanza's 'from' attribute is set to this connection's bare jid and
  its 'to' attribute to the full jid; the stanza is modified in place and
  sent with unlink=False.

  Args:
    notification_stanza: The notification stanza to forward.
  """
  jid = self._jid
  addressing = (('from', jid.GetBareJid()), ('to', jid))
  for attribute, value in addressing:
    notification_stanza.setAttribute(attribute, str(value))
  self.SendStanza(notification_stanza, False)
494 class XmppServer(asyncore
.dispatcher
):
495 """The main XMPP server class.
497 The XMPP server starts accepting connections on the given address
498 and spawns off XmppConnection objects for each one.
503 xmpp_server = xmppserver.XmppServer(socket_map, ('127.0.0.1', 5222))
504 asyncore.loop(30.0, False, socket_map)
507 # Used when sending a notification.
508 _NOTIFICATION_STANZA
= ParseXml(
510 ' <push xmlns="google:push">'
515 def __init__(self
, socket_map
, addr
):
516 asyncore
.dispatcher
.__init
__(self
, None, socket_map
)
517 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
518 self
.set_reuse_addr()
521 self
._socket
_map
= socket_map
522 self
._connections
= set()
523 self
._handshake
_done
_connections
= set()
524 self
._notifications
_enabled
= True
525 self
._authenticated
= True
def handle_accept(self):
  """Accepts a pending client connection and starts tracking it.

  Returns:
    The new XmppConnection (returned for testing), or None when accept()
    had no connection to hand back.
  """
  # asyncore's accept() can return None instead of a (socket, address)
  # pair; unpacking that unconditionally would raise TypeError.
  accepted = self.accept()
  if accepted is None:
    return None
  (sock, addr) = accepted
  xmpp_connection = XmppConnection(
      sock, self._socket_map, self, addr, self._authenticated)
  self._connections.add(xmpp_connection)
  # Return the new XmppConnection for testing.
  return xmpp_connection
536 # A copy is necessary since calling close on each connection
537 # removes it from self._connections.
538 for connection
in self
._connections
.copy():
540 asyncore
.dispatcher
.close(self
)
def EnableNotifications(self):
  """Turns the forwarding of notifications on."""
  enabled = True
  self._notifications_enabled = enabled
def DisableNotifications(self):
  """Turns the forwarding of notifications off."""
  enabled = False
  self._notifications_enabled = enabled
def MakeNotification(self, channel, data):
  """Builds a notification stanza for the given channel and payload.

  Args:
    channel: The channel on which to send the notification.
    data: The notification payload; it is base64-encoded before being
      stored in the stanza.

  Returns:
    A copy of _NOTIFICATION_STANZA whose <push> element carries the
    channel and whose <data> element holds the encoded payload.
  """
  stanza = CloneXml(self._NOTIFICATION_STANZA)
  push = stanza.getElementsByTagName('push')[0]
  push.setAttribute('channel', channel)
  payload_holder = push.getElementsByTagName('data')[0]
  payload = base64.b64encode(data)
  # Text nodes are created through the stanza's parent node.
  payload_holder.appendChild(stanza.parentNode.createTextNode(payload))
  return stanza
def SendNotification(self, channel, data):
  """Sends a notification to all connections.

  Args:
    channel: The channel on which to send the notification.
    data: The notification payload.
  """
  notification_stanza = self.MakeNotification(channel, data)
  try:
    self.ForwardNotification(None, notification_stanza)
  finally:
    # Unlink even when forwarding raises, so the freshly built stanza is
    # always released.
    notification_stanza.unlink()
575 def SetAuthenticated(self
, auth_valid
):
576 self
._authenticated
= auth_valid
578 # We check authentication only when establishing new connections. We close
579 # all existing connections here to make sure previously connected clients
580 # pick up on the change. It's a hack, but it works well enough for our
582 if not self
._authenticated
:
583 for connection
in self
._handshake
_done
_connections
:
def GetAuthenticated(self):
  """Returns the authentication flag currently stored on the server."""
  authenticated = self._authenticated
  return authenticated
589 # XmppConnection delegate methods.
def OnXmppHandshakeDone(self, xmpp_connection):
  """Records that the given connection has finished its handshake.

  Args:
    xmpp_connection: The connection to add to the handshake-done set.
  """
  finished = self._handshake_done_connections
  finished.add(xmpp_connection)
def OnXmppConnectionClosed(self, xmpp_connection):
  """Stops tracking a connection that has been closed.

  The connection is dropped from both the set of all connections and the
  set of handshake-done connections; it is fine if it is in neither.

  Args:
    xmpp_connection: The connection that was closed.
  """
  for set_name in ('_connections', '_handshake_done_connections'):
    getattr(self, set_name).discard(xmpp_connection)
def ForwardNotification(self, unused_xmpp_connection, notification_stanza):
  """Forwards a notification to every handshake-done connection.

  Nothing is sent while notifications are disabled.

  Args:
    unused_xmpp_connection: The connection the request came from, or None;
      it is ignored.
    notification_stanza: The stanza passed to each connection's
      ForwardNotification().
  """
  if not self._notifications_enabled:
    print('Notifications disabled; dropping notification')
    return
  # Iterate over a copy: OnXmppConnectionClosed() removes a connection from
  # this set, so a connection that closes while it is being sent to would
  # otherwise change the set's size in the middle of the iteration.
  for connection in self._handshake_done_connections.copy():
    print('Sending notification to %s' % connection)
    connection.ForwardNotification(notification_stanza)