'''
Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)

It actually connects to an XMPP server.
'''
12 from xmpp_mocks
import MockConnection
, IdleQueueThread
14 from common
.xmpp
import client_nb
17 #log = logging.getLogger('gajim')
18 #log.setLevel(logging.DEBUG)
# (XMPP server hostname, c2s port). The script connects to this machine.
xmpp_server_port = ('gajim.org', 5222)

# [username, password, resource]. The script authenticates against the server
# above with these values; the test cases pass the third item as the
# `resource` argument of client_auth(), so it is a resource name, not a
# passphrase.
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
    '''
    Test cases for NonBlockingClient.

    Every test drives a client_nb.NonBlockingClient through an IdleQueueThread
    and observes the outcome through the callbacks recorded on a
    MockConnection.

    NOTE(review): this class was rebuilt from a copy of the file in which
    several lines were missing. Statements that had to be inferred are tagged
    "reconstructed" below - confirm them against the repository version
    before merging.
    '''

    def setUp(self):
        ''' IdleQueue thread is run and dummy connection is created. '''
        # NOTE(review): the method header is reconstructed; only the
        # docstring and body were visible.
        self.idlequeue_thread = IdleQueueThread()
        self.connection = MockConnection()  # for dummy callbacks
        self.idlequeue_thread.start()

    def tearDown(self):
        ''' IdleQueue thread is stopped. '''
        # NOTE(review): the method header is reconstructed; the original may
        # do further cleanup after join() that was not visible.
        self.idlequeue_thread.stop_thread()
        self.idlequeue_thread.join()

    def open_stream(self, server_port, wrong_pass=False):
        '''
        Method opening the XMPP connection. It returns when <stream:features>
        is received from server.

        :param server_port: tuple of (hostname, port) for where the client
            should connect.
        :param wrong_pass: presumably makes the password callback hand out an
            invalid password instead of the configured one - TODO confirm.
        '''
        class TempConnection():
            def get_password(self, cb, mechanism):
                # NOTE(review): body reconstructed - only the signature was
                # visible. Inferred from the wrong_pass flag and from the
                # 'wrong pass' literal used by test_connect_with_wrong_creds.
                if wrong_pass:
                    cb('wrong pass')
                else:
                    cb(credentials[1])

            def on_connect_failure(self):
                # NOTE(review): body reconstructed as a no-op.
                pass

        self.client = client_nb.NonBlockingClient(
            domain=server_port[0],
            idlequeue=self.idlequeue_thread.iq,
            caller=Mock(realClass=TempConnection))

        # NOTE(review): the `self.client.connect(` line, the `port` keyword
        # and the `False, *args` tail of the failure callback are
        # reconstructed. The port must be forwarded for
        # test_connect_to_wrong_port to be meaningful.
        self.client.connect(
            hostname=server_port[0],
            port=server_port[1],
            on_connect=lambda *args: self.connection.on_connect(True, *args),
            on_connect_failure=lambda *args: self.connection.on_connect(
                False, *args))

        self.assertTrue(self.connection.wait(),
            msg='waiting for callback from client constructor')

        # if on_connect was called, client has to be connected and vice versa
        if self.connection.connect_succeeded:
            self.assertTrue(self.client.get_connect_type())
        else:
            # NOTE(review): the `else:` line is reconstructed.
            self.assertFalse(self.client.get_connect_type())

    def client_auth(self, username, password, resource, sasl):
        '''
        Method authenticating connected client with supplied credentials.
        Returns when authentication is over.

        :param username: passed through to client.auth()
        :param password: passed through to client.auth()
        :param resource: passed through to client.auth()
        :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
        :todo: to check and be more specific about when it returns
        '''
        self.client.auth(username, password, resource, sasl,
            on_auth=self.connection.on_auth)

        self.assertTrue(self.connection.wait(),
            msg='waiting for authentication')

    def do_disconnect(self):
        '''
        Does disconnecting of connected client. Returns when TCP connection is
        closed.
        '''
        # The disconnect handler sets the event that wait() below blocks on.
        self.client.RegisterDisconnectHandler(self.connection.set_event)
        self.client.disconnect()

        self.assertTrue(self.connection.wait(),
            msg='waiting for disconnecting')

    def test_proper_connect_sasl(self):
        '''
        The ideal testcase - client is connected, authenticated with SASL and
        then disconnected.
        '''
        self.open_stream(xmpp_server_port)

        # if client is not connected, lets raise the AssertionError
        self.assertTrue(self.client.get_connect_type())
        # client.disconnect() is already called from NBClient via
        # _on_connected_failure, no need to call it here

        self.client_auth(credentials[0], credentials[1], credentials[2],
            sasl=1)
        self.assertTrue(self.connection.con)
        self.assertEqual(self.connection.auth, 'sasl',
            msg='Unable to auth via SASL')

        # NOTE(review): reconstructed - the closing statement of this test
        # was not visible.
        self.do_disconnect()

    def test_proper_connect_oldauth(self):
        '''
        The ideal testcase - client is connected, authenticated with old auth
        and then disconnected.
        '''
        self.open_stream(xmpp_server_port)
        self.assertTrue(self.client.get_connect_type())
        self.client_auth(credentials[0], credentials[1], credentials[2],
            sasl=0)
        self.assertTrue(self.connection.con)
        features = self.client.Dispatcher.Stream.features
        if not features.getTag('auth'):
            # Single-argument call form prints the same text on Python 2
            # and 3.
            print("Server doesn't support old authentication type, ignoring test")
        else:
            # NOTE(review): the `else:` line is reconstructed.
            self.assertEqual(self.connection.auth, 'old_auth',
                msg='Unable to auth via old_auth')

        # NOTE(review): reconstructed - the closing statement of this test
        # was not visible.
        self.do_disconnect()

    def test_connect_to_nonexisting_host(self):
        '''
        Connect to nonexisting host. The DNS request for A records is expected
        to fail, so the client must end up not connected.
        '''
        self.open_stream(('fdsfsdf.fdsf.fss', 5222))
        self.assertFalse(self.client.get_connect_type())

    def test_connect_to_wrong_port(self):
        '''
        Connect to nonexisting server. DNS request for A records should return
        an IP but there shouldn't be XMPP server running on specified port.
        '''
        self.open_stream((xmpp_server_port[0], 31337))
        self.assertFalse(self.client.get_connect_type())

    def test_connect_with_wrong_creds(self):
        '''
        Connecting with invalid password.
        '''
        self.open_stream(xmpp_server_port, wrong_pass=True)
        self.assertTrue(self.client.get_connect_type())
        self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
        # Kept as an identity check so the test does not depend on
        # assertIsNone, which only exists from Python 2.7 on.
        self.assertTrue(self.connection.auth is None)

        # NOTE(review): reconstructed - the closing statement of this test
        # was not visible; confirm the original disconnects here.
        self.do_disconnect()
168 if __name__
== '__main__':