Don't assume that libdbus only gives method call messages to object-path handlers...
[dbus-python-phuang.git] / test / cross-test-client.py
blobf192167e8c1277bc56540566660479f947907f22
1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Permission is hereby granted, free of charge, to any person
4 # obtaining a copy of this software and associated documentation
5 # files (the "Software"), to deal in the Software without
6 # restriction, including without limitation the rights to use, copy,
7 # modify, merge, publish, distribute, sublicense, and/or sell copies
8 # of the Software, and to permit persons to whom the Software is
9 # furnished to do so, subject to the following conditions:
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Software.
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 # DEALINGS IN THE SOFTWARE.
23 from sets import Set
24 from time import sleep
25 import logging
27 import gobject
29 from dbus import SessionBus, Interface, Array, Byte, Double, Boolean, ByteArray, Int16, Int32, Int64, UInt16, UInt32, UInt64, String, UTF8String, Struct, Dictionary
30 from dbus.service import BusName
31 import dbus.glib
33 from crosstest import CROSS_TEST_PATH, CROSS_TEST_BUS_NAME,\
34 INTERFACE_SINGLE_TESTS, INTERFACE_TESTS,\
35 INTERFACE_SIGNAL_TESTS, INTERFACE_CALLBACK_TESTS,\
36 SignalTestsImpl
# Module-wide logger used by every message this script emits.
logger = logging.getLogger('cross-test-client')

# Install the default handler on the root logger, then drop the root
# threshold to 1 (below DEBUG) so that records of every standard level
# are let through.
logging.basicConfig()
logging.getLogger().setLevel(1)
44 class Client(SignalTestsImpl):
45 fail_id = 0
46 expected = Set()
48 def quit(self):
49 for x in self.expected:
50 self.fail_id += 1
51 print "%s fail %d" % (x, self.fail_id)
52 s = "report %d: reply to %s didn't arrive" % (self.fail_id, x)
53 print s
54 logger.error(s)
55 logger.info("asking server to Exit")
56 Interface(self.obj, INTERFACE_TESTS).Exit(reply_handler=self.quit_reply_handler, error_handler=self.quit_error_handler)
57 # if the server doesn't reply we'll just exit anyway
58 gobject.timeout_add(1000, lambda: (loop.quit(), False)[1])
60 def quit_reply_handler(self):
61 logger.info("server says it will exit")
62 loop.quit()
64 def quit_error_handler(self, e):
65 logger.error("error telling server to quit: %s %s",
66 e.__class__, e)
67 loop.quit()
69 @dbus.service.method(INTERFACE_CALLBACK_TESTS, 'qd')
70 def Response(self, input1, input2):
71 logger.info("signal/callback: Response received (%r,%r)",
72 input1, input2)
73 self.expected.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
74 if (input1, input2) != (42, 23):
75 self.fail_id += 1
76 print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS, self.fail_id)
77 s = ("report %d: expected (42,23), got %r"
78 % (self.fail_id, (input1, input2)))
79 logger.error(s)
80 print s
81 else:
82 print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
83 self.quit()
85 def assert_method_matches(self, interface, check_fn, check_arg, member, *args):
86 if_obj = Interface(self.obj, interface)
87 method = getattr(if_obj, member)
88 try:
89 real_ret = method(*args)
90 except Exception, e:
91 self.fail_id += 1
92 print "%s.%s fail %d" % (interface, member, self.fail_id)
93 s = ("report %d: %s.%s%r: raised %r \"%s\""
94 % (self.fail_id, interface, member, args, e, e))
95 print s
96 logger.error(s)
97 __import__('traceback').print_exc()
98 return
99 try:
100 check_fn(real_ret, check_arg)
101 except Exception, e:
102 self.fail_id += 1
103 print "%s.%s fail %d" % (interface, member, self.fail_id)
104 s = ("report %d: %s.%s%r: %s"
105 % (self.fail_id, interface, member, args, e))
106 print s
107 logger.error(s)
108 return
109 print "%s.%s pass" % (interface, member)
111 def assert_method_eq(self, interface, ret, member, *args):
112 def equals(real_ret, exp):
113 if real_ret != exp:
114 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp, exp.__class__, real_ret, real_ret.__class__))
115 if real_ret != exp:
116 raise AssertionError('expected %r, got %r' % (exp, real_ret))
117 if not isinstance(exp, (tuple, type(None))):
118 if real_ret.variant_level != getattr(exp, 'variant_level', 0):
119 raise AssertionError('expected variant_level=%d, got %r with level %d'
120 % (getattr(exp, 'variant_level', 0), real_ret,
121 real_ret.variant_level))
122 if isinstance(exp, list) or isinstance(exp, tuple):
123 for i in xrange(len(exp)):
124 try:
125 equals(real_ret[i], exp[i])
126 except AssertionError, e:
127 if not isinstance(e.args, tuple):
128 e.args = (e.args,)
129 e.args = e.args + ('(at position %d in sequence)' % i,)
130 raise e
131 elif isinstance(exp, dict):
132 for k in exp:
133 try:
134 equals(real_ret[k], exp[k])
135 except AssertionError, e:
136 if not isinstance(e.args, tuple):
137 e.args = (e.args,)
138 e.args = e.args + ('(at key %r in dict)' % k,)
139 raise e
140 self.assert_method_matches(interface, equals, ret, member, *args)
142 def assert_InvertMapping_eq(self, interface, expected, member, mapping):
143 def check(real_ret, exp):
144 for key in exp:
145 if key not in real_ret:
146 raise AssertionError('missing key %r' % key)
147 for key in real_ret:
148 if key not in exp:
149 raise AssertionError('unexpected key %r' % key)
150 got = list(real_ret[key])
151 wanted = list(exp[key])
152 got.sort()
153 wanted.sort()
154 if got != wanted:
155 raise AssertionError('expected %r => %r, got %r'
156 % (key, wanted, got))
157 self.assert_method_matches(interface, check, expected, member, mapping)
159 def triggered_cb(self, param, sender_path):
160 logger.info("method/signal: Triggered(%r) by %r",
161 param, sender_path)
162 self.expected.discard('%s.Trigger' % INTERFACE_TESTS)
163 if sender_path != '/Where/Ever':
164 self.fail_id += 1
165 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
166 s = ("report %d: expected signal from /Where/Ever, got %r"
167 % (self.fail_id, sender_path))
168 print s
169 logger.error(s)
170 elif param != 42:
171 self.fail_id += 1
172 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
173 s = ("report %d: expected signal param 42, got %r"
174 % (self.fail_id, parameter))
175 print s
176 logger.error(s)
177 else:
178 print "%s.Trigger pass" % INTERFACE_TESTS
180 def trigger_returned_cb(self):
181 logger.info('method/signal: Trigger() returned')
182 # Callback tests
183 logger.info("signal/callback: Emitting signal to trigger callback")
184 self.expected.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
185 self.Trigger(UInt16(42), 23.0)
186 logger.info("signal/callback: Emitting signal returned")
188 def run_client(self):
189 bus = SessionBus()
190 obj = bus.get_object(CROSS_TEST_BUS_NAME, CROSS_TEST_PATH)
191 self.obj = obj
193 self.run_synchronous_tests(obj)
195 # Signal tests
196 logger.info("Binding signal handler for Triggered")
197 # FIXME: doesn't seem to work when going via the Interface method
198 # FIXME: should be possible to ask the proxy object for its
199 # bus name
200 bus.add_signal_receiver(self.triggered_cb, 'Triggered',
201 INTERFACE_SIGNAL_TESTS,
202 CROSS_TEST_BUS_NAME,
203 path_keyword='sender_path')
204 logger.info("method/signal: Triggering signal")
205 self.expected.add('%s.Trigger' % INTERFACE_TESTS)
206 Interface(obj, INTERFACE_TESTS).Trigger(u'/Where/Ever', dbus.UInt64(42), reply_handler=self.trigger_returned_cb, error_handler=self.trigger_error_handler)
208 def trigger_error_handler(self, e):
209 logger.error("method/signal: %s %s", e.__class__, e)
210 Interface(self.obj, INTERFACE_TESTS).Exit()
211 self.quit()
213 def run_synchronous_tests(self, obj):
214 # We can't test that coercion works correctly unless the server has
215 # sent us introspection data. Java doesn't :-/
216 have_signatures = True
218 # "Single tests"
219 if have_signatures:
220 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [1, 2, 3])
221 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ['\x01', '\x02', '\x03'])
222 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
223 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ByteArray('\x01\x02\x03'))
225 # Main tests
226 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', String('foo'))
227 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', UTF8String('foo'))
228 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
229 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=23), 'Identity', Byte(42, variant_level=23))
230 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', 42.5)
231 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
233 if have_signatures:
234 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', 'foo')
235 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
236 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', Double(42.5))
237 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
239 for i in (0, 42, 255):
240 self.assert_method_eq(INTERFACE_TESTS, Byte(i), 'IdentityByte', Byte(i))
241 for i in (True, False):
242 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityBool', i)
244 for i in (-0x8000, 0, 42, 0x7fff):
245 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt16', Int16(i))
246 for i in (0, 42, 0xffff):
247 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt16', UInt16(i))
248 for i in (-0x7fffffff-1, 0, 42, 0x7fffffff):
249 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt32', Int32(i))
250 for i in (0L, 42L, 0xffffffffL):
251 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt32', UInt32(i))
252 MANY = 0x8000L * 0x10000L * 0x10000L * 0x10000L
253 for i in (-MANY, 0, 42, MANY-1):
254 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt64', Int64(i))
255 for i in (0, 42, 2*MANY - 1):
256 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt64', UInt64(i))
258 self.assert_method_eq(INTERFACE_TESTS, 42.3, 'IdentityDouble', 42.3)
259 for i in ('', 'foo'):
260 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityString', i)
261 for i in (u'\xa9', '\xc2\xa9'):
262 self.assert_method_eq(INTERFACE_TESTS, u'\xa9', 'IdentityString', i)
264 if have_signatures:
265 self.assert_method_eq(INTERFACE_TESTS, Byte(0x42), 'IdentityByte', '\x42')
266 self.assert_method_eq(INTERFACE_TESTS, True, 'IdentityBool', 42)
267 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt16', 42)
268 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt16', 42)
269 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt32', 42)
270 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt32', 42)
271 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt64', 42)
272 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt64', 42)
273 self.assert_method_eq(INTERFACE_TESTS, 42.0, 'IdentityDouble', 42)
275 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
276 Byte('\x02', variant_level=1),
277 Byte('\x03', variant_level=1)],
278 'IdentityArray',
279 Array([Byte('\x01'),
280 Byte('\x02'),
281 Byte('\x03')],
282 signature='v'))
284 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
285 Int32(2, variant_level=1),
286 Int32(3, variant_level=1)],
287 'IdentityArray',
288 Array([Int32(1),
289 Int32(2),
290 Int32(3)],
291 signature='v'))
292 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
293 String(u'b', variant_level=1),
294 String(u'c', variant_level=1)],
295 'IdentityArray',
296 Array([String('a'),
297 String('b'),
298 String('c')],
299 signature='v'))
301 if have_signatures:
302 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
303 Byte('\x02', variant_level=1),
304 Byte('\x03', variant_level=1)],
305 'IdentityArray',
306 ByteArray('\x01\x02\x03'))
307 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
308 Int32(2, variant_level=1),
309 Int32(3, variant_level=1)],
310 'IdentityArray',
311 [Int32(1),
312 Int32(2),
313 Int32(3)])
314 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
315 String(u'b', variant_level=1),
316 String(u'c', variant_level=1)],
317 'IdentityArray',
318 ['a','b','c'])
320 self.assert_method_eq(INTERFACE_TESTS,
321 [Byte(1), Byte(2), Byte(3)],
322 'IdentityByteArray',
323 ByteArray('\x01\x02\x03'))
324 if have_signatures:
325 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
326 self.assert_method_eq(INTERFACE_TESTS, [False,True], 'IdentityBoolArray', [False,True])
327 if have_signatures:
328 self.assert_method_eq(INTERFACE_TESTS, [False,True,True], 'IdentityBoolArray', [0,1,2])
330 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
331 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
332 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
333 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
334 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
335 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
337 if have_signatures:
338 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [1,2,3])
339 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [1,2,3])
340 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [1,2,3])
341 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [1,2,3])
342 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [1,2,3])
343 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [1,2,3])
345 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
346 if have_signatures:
347 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
348 self.assert_method_eq(INTERFACE_TESTS, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
349 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
350 if have_signatures:
351 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [1,2,3])
353 self.assert_InvertMapping_eq(INTERFACE_TESTS, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
355 self.assert_method_eq(INTERFACE_TESTS, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
356 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
357 'Primitize', [String('x', variant_level=1)])
358 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
359 'Primitize', [String('x', variant_level=23)])
360 self.assert_method_eq(INTERFACE_TESTS,
361 Array([String('x', variant_level=1),
362 Byte(1, variant_level=1),
363 Byte(2, variant_level=1)]),
364 'Primitize',
365 Array([String('x'), Byte(1), Byte(2)],
366 signature='v'))
367 self.assert_method_eq(INTERFACE_TESTS,
368 Array([String('x', variant_level=1),
369 Byte(1, variant_level=1),
370 Byte(2, variant_level=1)]),
371 'Primitize',
372 Array([String('x'), Array([Byte(1), Byte(2)])],
373 signature='v'))
374 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', True)
375 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', False)
376 if have_signatures:
377 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', 42)
378 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', 0)
if __name__ == '__main__':
    # FIXME: should be possible to export objects without a bus name
    # The Java cross test's interpretation is that the client should be
    # at /Test too, so the '/Client' alternative stays switched off.
    use_client_path = False
    if use_client_path:
        client_path = '/Client'
    else:
        client_path = '/Test'
    client = Client(dbus.SessionBus(), client_path)

    # The tests start from an idle callback, i.e. only once the main loop
    # below is running.
    gobject.idle_add(client.run_client)

    # `loop` must remain a module-level name: the Client handlers call
    # loop.quit() on it.
    loop = gobject.MainLoop()
    logger.info("running...")
    loop.run()
    logger.info("main loop exited.")