1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Permission is hereby granted, free of charge, to any person
4 # obtaining a copy of this software and associated documentation
5 # files (the "Software"), to deal in the Software without
6 # restriction, including without limitation the rights to use, copy,
7 # modify, merge, publish, distribute, sublicense, and/or sell copies
8 # of the Software, and to permit persons to whom the Software is
9 # furnished to do so, subject to the following conditions:
11 # The above copyright notice and this permission notice shall be
12 # included in all copies or substantial portions of the Software.
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 # DEALINGS IN THE SOFTWARE.
24 from time
import sleep
29 from dbus
import SessionBus
, Interface
, Array
, Byte
, Double
, Boolean
, ByteArray
, Int16
, Int32
, Int64
, UInt16
, UInt32
, UInt64
, String
, UTF8String
, Struct
, Dictionary
30 from dbus
.service
import BusName
33 from crosstest
import CROSS_TEST_PATH
, CROSS_TEST_BUS_NAME
,\
34 INTERFACE_SINGLE_TESTS
, INTERFACE_TESTS
,\
35 INTERFACE_SIGNAL_TESTS
, INTERFACE_CALLBACK_TESTS
,\
40 logging
.getLogger().setLevel(1)
41 logger
= logging
.getLogger('cross-test-client')
44 class Client(SignalTestsImpl
):
49 for x
in self
.expected
:
51 print "%s fail %d" % (x
, self
.fail_id
)
52 s
= "report %d: reply to %s didn't arrive" % (self
.fail_id
, x
)
55 logger
.info("asking server to Exit")
56 Interface(self
.obj
, INTERFACE_TESTS
).Exit(reply_handler
=self
.quit_reply_handler
, error_handler
=self
.quit_error_handler
)
57 # if the server doesn't reply we'll just exit anyway
58 gobject
.timeout_add(1000, lambda: (loop
.quit(), False)[1])
60 def quit_reply_handler(self
):
61 logger
.info("server says it will exit")
64 def quit_error_handler(self
, e
):
65 logger
.error("error telling server to quit: %s %s",
69 @dbus.service
.method(INTERFACE_CALLBACK_TESTS
, 'qd')
70 def Response(self
, input1
, input2
):
71 logger
.info("signal/callback: Response received (%r,%r)",
73 self
.expected
.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS
)
74 if (input1
, input2
) != (42, 23):
76 print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS
, self
.fail_id
)
77 s
= ("report %d: expected (42,23), got %r"
78 % (self
.fail_id
, (input1
, input2
)))
82 print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
85 def assert_method_matches(self
, interface
, check_fn
, check_arg
, member
, *args
):
86 if_obj
= Interface(self
.obj
, interface
)
87 method
= getattr(if_obj
, member
)
89 real_ret
= method(*args
)
92 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
93 s
= ("report %d: %s.%s%r: raised %r \"%s\""
94 % (self
.fail_id
, interface
, member
, args
, e
, e
))
97 __import__('traceback').print_exc()
100 check_fn(real_ret
, check_arg
)
103 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
104 s
= ("report %d: %s.%s%r: %s"
105 % (self
.fail_id
, interface
, member
, args
, e
))
109 print "%s.%s pass" % (interface
, member
)
111 def assert_method_eq(self
, interface
, ret
, member
, *args
):
112 def equals(real_ret
, exp
):
114 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp
, exp
.__class
__, real_ret
, real_ret
.__class
__))
116 raise AssertionError('expected %r, got %r' % (exp
, real_ret
))
117 if not isinstance(exp
, (tuple, type(None))):
118 if real_ret
.variant_level
!= getattr(exp
, 'variant_level', 0):
119 raise AssertionError('expected variant_level=%d, got %r with level %d'
120 % (getattr(exp
, 'variant_level', 0), real_ret
,
121 real_ret
.variant_level
))
122 if isinstance(exp
, list) or isinstance(exp
, tuple):
123 for i
in xrange(len(exp
)):
125 equals(real_ret
[i
], exp
[i
])
126 except AssertionError, e
:
127 if not isinstance(e
.args
, tuple):
129 e
.args
= e
.args
+ ('(at position %d in sequence)' % i
,)
131 elif isinstance(exp
, dict):
134 equals(real_ret
[k
], exp
[k
])
135 except AssertionError, e
:
136 if not isinstance(e
.args
, tuple):
138 e
.args
= e
.args
+ ('(at key %r in dict)' % k
,)
140 self
.assert_method_matches(interface
, equals
, ret
, member
, *args
)
142 def assert_InvertMapping_eq(self
, interface
, expected
, member
, mapping
):
143 def check(real_ret
, exp
):
145 if key
not in real_ret
:
146 raise AssertionError('missing key %r' % key
)
149 raise AssertionError('unexpected key %r' % key
)
150 got
= list(real_ret
[key
])
151 wanted
= list(exp
[key
])
155 raise AssertionError('expected %r => %r, got %r'
156 % (key
, wanted
, got
))
157 self
.assert_method_matches(interface
, check
, expected
, member
, mapping
)
159 def triggered_cb(self
, param
, sender_path
):
160 logger
.info("method/signal: Triggered(%r) by %r",
162 self
.expected
.discard('%s.Trigger' % INTERFACE_TESTS
)
163 if sender_path
!= '/Where/Ever':
165 print "%s.Trigger fail %d" % (INTERFACE_TESTS
, self
.fail_id
)
166 s
= ("report %d: expected signal from /Where/Ever, got %r"
167 % (self
.fail_id
, sender_path
))
172 print "%s.Trigger fail %d" % (INTERFACE_TESTS
, self
.fail_id
)
173 s
= ("report %d: expected signal param 42, got %r"
174 % (self
.fail_id
, parameter
))
178 print "%s.Trigger pass" % INTERFACE_TESTS
180 def trigger_returned_cb(self
):
181 logger
.info('method/signal: Trigger() returned')
183 logger
.info("signal/callback: Emitting signal to trigger callback")
184 self
.expected
.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS
)
185 self
.Trigger(UInt16(42), 23.0)
186 logger
.info("signal/callback: Emitting signal returned")
188 def run_client(self
):
190 obj
= bus
.get_object(CROSS_TEST_BUS_NAME
, CROSS_TEST_PATH
)
193 self
.run_synchronous_tests(obj
)
196 logger
.info("Binding signal handler for Triggered")
197 # FIXME: doesn't seem to work when going via the Interface method
198 # FIXME: should be possible to ask the proxy object for its
200 bus
.add_signal_receiver(self
.triggered_cb
, 'Triggered',
201 INTERFACE_SIGNAL_TESTS
,
203 path_keyword
='sender_path')
204 logger
.info("method/signal: Triggering signal")
205 self
.expected
.add('%s.Trigger' % INTERFACE_TESTS
)
206 Interface(obj
, INTERFACE_TESTS
).Trigger(u
'/Where/Ever', dbus
.UInt64(42), reply_handler
=self
.trigger_returned_cb
, error_handler
=self
.trigger_error_handler
)
208 def trigger_error_handler(self
, e
):
209 logger
.error("method/signal: %s %s", e
.__class
__, e
)
210 Interface(self
.obj
, INTERFACE_TESTS
).Exit()
213 def run_synchronous_tests(self
, obj
):
214 # We can't test that coercion works correctly unless the server has
215 # sent us introspection data. Java doesn't :-/
216 have_signatures
= True
220 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [1, 2, 3])
221 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ['\x01', '\x02', '\x03'])
222 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
223 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ByteArray('\x01\x02\x03'))
226 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', String('foo'))
227 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', UTF8String('foo'))
228 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
229 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=23), 'Identity', Byte(42, variant_level
=23))
230 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', 42.5)
231 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
234 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', 'foo')
235 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
236 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', Double(42.5))
237 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
239 for i
in (0, 42, 255):
240 self
.assert_method_eq(INTERFACE_TESTS
, Byte(i
), 'IdentityByte', Byte(i
))
241 for i
in (True, False):
242 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityBool', i
)
244 for i
in (-0x8000, 0, 42, 0x7fff):
245 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt16', Int16(i
))
246 for i
in (0, 42, 0xffff):
247 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt16', UInt16(i
))
248 for i
in (-0x7fffffff-1, 0, 42, 0x7fffffff):
249 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt32', Int32(i
))
250 for i
in (0L, 42L, 0xffffffffL
):
251 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt32', UInt32(i
))
252 MANY
= 0x8000L
* 0x10000L
* 0x10000L
* 0x10000L
253 for i
in (-MANY
, 0, 42, MANY
-1):
254 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt64', Int64(i
))
255 for i
in (0, 42, 2*MANY
- 1):
256 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt64', UInt64(i
))
258 self
.assert_method_eq(INTERFACE_TESTS
, 42.3, 'IdentityDouble', 42.3)
259 for i
in ('', 'foo'):
260 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityString', i
)
261 for i
in (u
'\xa9', '\xc2\xa9'):
262 self
.assert_method_eq(INTERFACE_TESTS
, u
'\xa9', 'IdentityString', i
)
265 self
.assert_method_eq(INTERFACE_TESTS
, Byte(0x42), 'IdentityByte', '\x42')
266 self
.assert_method_eq(INTERFACE_TESTS
, True, 'IdentityBool', 42)
267 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt16', 42)
268 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt16', 42)
269 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt32', 42)
270 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt32', 42)
271 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt64', 42)
272 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt64', 42)
273 self
.assert_method_eq(INTERFACE_TESTS
, 42.0, 'IdentityDouble', 42)
275 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
276 Byte('\x02', variant_level
=1),
277 Byte('\x03', variant_level
=1)],
284 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
285 Int32(2, variant_level
=1),
286 Int32(3, variant_level
=1)],
292 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
293 String(u
'b', variant_level
=1),
294 String(u
'c', variant_level
=1)],
302 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
303 Byte('\x02', variant_level
=1),
304 Byte('\x03', variant_level
=1)],
306 ByteArray('\x01\x02\x03'))
307 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
308 Int32(2, variant_level
=1),
309 Int32(3, variant_level
=1)],
314 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
315 String(u
'b', variant_level
=1),
316 String(u
'c', variant_level
=1)],
320 self
.assert_method_eq(INTERFACE_TESTS
,
321 [Byte(1), Byte(2), Byte(3)],
323 ByteArray('\x01\x02\x03'))
325 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
326 self
.assert_method_eq(INTERFACE_TESTS
, [False,True], 'IdentityBoolArray', [False,True])
328 self
.assert_method_eq(INTERFACE_TESTS
, [False,True,True], 'IdentityBoolArray', [0,1,2])
330 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
331 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
332 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
333 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
334 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
335 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
338 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [1,2,3])
339 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [1,2,3])
340 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [1,2,3])
341 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [1,2,3])
342 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [1,2,3])
343 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [1,2,3])
345 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
347 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
348 self
.assert_method_eq(INTERFACE_TESTS
, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
349 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
351 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [1,2,3])
353 self
.assert_InvertMapping_eq(INTERFACE_TESTS
, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
355 self
.assert_method_eq(INTERFACE_TESTS
, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
356 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
357 'Primitize', [String('x', variant_level
=1)])
358 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
359 'Primitize', [String('x', variant_level
=23)])
360 self
.assert_method_eq(INTERFACE_TESTS
,
361 Array([String('x', variant_level
=1),
362 Byte(1, variant_level
=1),
363 Byte(2, variant_level
=1)]),
365 Array([String('x'), Byte(1), Byte(2)],
367 self
.assert_method_eq(INTERFACE_TESTS
,
368 Array([String('x', variant_level
=1),
369 Byte(1, variant_level
=1),
370 Byte(2, variant_level
=1)]),
372 Array([String('x'), Array([Byte(1), Byte(2)])],
374 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', True)
375 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', False)
377 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', 42)
378 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', 0)
381 if __name__
== '__main__':
382 # FIXME: should be possible to export objects without a bus name
384 client
= Client(dbus
.SessionBus(), '/Client')
386 # the Java cross test's interpretation is that the client should be
388 client
= Client(dbus
.SessionBus(), '/Test')
389 gobject
.idle_add(client
.run_client
)
391 loop
= gobject
.MainLoop()
392 logger
.info("running...")
394 logger
.info("main loop exited.")