# Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
# Copyright (C) 2005, 2006 Collabora Ltd. <http://www.collabora.co.uk/>
#
# Licensed under the Academic Free License version 2.1
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Normalised paths taken from the DBUS_TOP_BUILDDIR and DBUS_TOP_SRCDIR
# environment variables.  Either lookup raises KeyError if its variable is
# not set.
builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
# Abort unless the imported dbus module's file path begins with pydir, so the
# service never runs against a dbus package loaded from somewhere else.
if not dbus.__file__.startswith(pydir):
    raise Exception("DBus modules are not being picked up from the package")
39 from dbus
.gobject_service
import ExportedGObject
# Send all log output to <builddir>/test/test-service.log; filemode 'w'
# truncates the file on every run.
logging.basicConfig(filename=builddir + '/test/test-service.log', filemode='w')
# Level 1 sits below every standard logging level, so nothing is filtered.
logging.getLogger().setLevel(1)
logger = logging.getLogger('test-service')
# Bus name, interface name and root object path used by this test service.
NAME = "org.freedesktop.DBus.TestSuitePythonService"
IFACE = "org.freedesktop.DBus.TestSuiteInterface"
OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
51 class TestGObject(ExportedGObject
):
def __init__(self, bus_name, object_path=OBJECT + '/GObject'):
    """Initialise the base class with the given bus name and object path.

    bus_name    -- passed through unchanged to the superclass initialiser.
    object_path -- path to pass to the superclass; defaults to
                   OBJECT + '/GObject'.
    """
    super(TestGObject, self).__init__(bus_name, object_path)
55 @dbus.service
.method(IFACE
)
59 class TestInterface(dbus
.service
.Interface
):
60 @dbus.service
.method(IFACE
, in_signature
='', out_signature
='b')
61 def CheckInheritance(self
):
64 class TestObject(dbus
.service
.Object
, TestInterface
):
def __init__(self, bus_name, object_path=OBJECT):
    """Initialise the dbus.service.Object base with a bus name and path.

    bus_name    -- passed through unchanged to dbus.service.Object.
    object_path -- path to pass to dbus.service.Object; defaults to OBJECT.
    """
    # Only dbus.service.Object's initialiser is invoked explicitly here.
    dbus.service.Object.__init__(self, bus_name, object_path)
68 """ Echo whatever is sent
70 @dbus.service
.method(IFACE
)
74 @dbus.service
.method(IFACE
, in_signature
='s', out_signature
='s')
75 def AcceptUnicodeString(self
, foo
):
76 assert isinstance(foo
, unicode), (foo
, foo
.__class
__.__mro
__)
79 @dbus.service
.method(IFACE
, in_signature
='s', out_signature
='s', utf8_strings
=True)
80 def AcceptUTF8String(self
, foo
):
81 assert isinstance(foo
, str), (foo
, foo
.__class
__.__mro
__)
84 @dbus.service
.method(IFACE
, in_signature
='', out_signature
='soss',
85 sender_keyword
='sender', path_keyword
='path',
86 destination_keyword
='dest', message_keyword
='msg')
87 def MethodExtraInfoKeywords(self
, sender
=None, path
=None, dest
=None,
89 return (sender
, path
, dest
,
90 msg
.__class
__.__module
__ + '.' + msg
.__class
__.__name
__)
92 @dbus.service
.method(IFACE
, in_signature
='ay', out_signature
='ay')
93 def AcceptListOfByte(self
, foo
):
94 assert isinstance(foo
, list), (foo
, foo
.__class
__.__mro
__)
97 @dbus.service
.method(IFACE
, in_signature
='ay', out_signature
='ay', byte_arrays
=True)
98 def AcceptByteArray(self
, foo
):
99 assert isinstance(foo
, str), (foo
, foo
.__class
__.__mro
__)
102 @dbus.service
.method(IFACE
)
103 def GetComplexArray(self
):
105 for i
in range(0,100):
106 ret
.append((random
.randint(0,100), random
.randint(0,100), str(random
.randint(0,100))))
108 return dbus
.Array(ret
, signature
="(uus)")
110 def returnValue(self
, test
):
@dbus.service.method(IFACE, in_signature='u', out_signature='s')
def ReturnOneString(self, test):
    """Return self.returnValue(test); declared as 'u' in, 's' out."""
    return self.returnValue(test)
@dbus.service.method(IFACE, in_signature='u', out_signature='ss')
def ReturnTwoStrings(self, test):
    """Return self.returnValue(test); declared as 'u' in, 'ss' out."""
    return self.returnValue(test)
@dbus.service.method(IFACE, in_signature='u', out_signature='(ss)')
def ReturnStruct(self, test):
    """Log and return self.returnValue(test); declared as 'u' in, '(ss)' out."""
    # returnValue is called once for the log record and once more for the
    # actual result, exactly as in the original.
    logger.info('ReturnStruct(%r) -> %r', test, self.returnValue(test))
    return self.returnValue(test)
@dbus.service.method(IFACE, in_signature='u', out_signature='as')
def ReturnArray(self, test):
    """Return self.returnValue(test); declared as 'u' in, 'as' out."""
    return self.returnValue(test)
@dbus.service.method(IFACE, in_signature='u', out_signature='a{ss}')
def ReturnDict(self, test):
    """Return self.returnValue(test); declared as 'u' in, 'a{ss}' out."""
    return self.returnValue(test)
@dbus.service.signal(IFACE, signature='s')
def SignalOneString(self, test):
    """Signal on IFACE with signature 's'; the body only logs the argument."""
    # NOTE(review): emission itself is presumably handled by the
    # dbus.service.signal decorator -- confirm against dbus-python docs.
    logger.info('SignalOneString(%r)', test)
@dbus.service.signal(IFACE, signature='ss')
def SignalTwoStrings(self, test, test2):
    """Signal on IFACE with signature 'ss'; the body only logs both arguments."""
    # NOTE(review): emission itself is presumably handled by the
    # dbus.service.signal decorator -- confirm against dbus-python docs.
    logger.info('SignalTwoStrings(%r, %r)', test, test2)
@dbus.service.signal(IFACE, signature='(ss)')
def SignalStruct(self, test):
    """Signal on IFACE with signature '(ss)'; the body only logs the argument."""
    # NOTE(review): emission itself is presumably handled by the
    # dbus.service.signal decorator -- confirm against dbus-python docs.
    logger.info('SignalStruct(%r)', test)
159 @dbus.service
.signal(IFACE
, signature
='as')
160 def SignalArray(self
, test
):
163 @dbus.service
.signal(IFACE
, signature
='a{ss}')
164 def SignalDict(self
, test
):
167 @dbus.service
.method(IFACE
, in_signature
='su', out_signature
='')
168 def EmitSignal(self
, signal
, value
):
169 sig
= getattr(self
, str(signal
), None)
172 val
= self
.returnValue(value
)
173 # make two string case work by passing arguments in by tuple
174 if (signal
== 'SignalTwoStrings' and (value
== 1 or value
== 5)):
179 logger
.info('Emitting %s with %r', signal
, val
)
182 def CheckInheritance(self
):
185 @dbus.service
.method(IFACE
, in_signature
='', out_signature
='b')
186 def TestListExportedChildObjects(self
):
187 objs_root
= session_bus
.list_exported_child_objects('/')
188 assert objs_root
== ['org'], objs_root
189 objs_org
= session_bus
.list_exported_child_objects('/org')
190 assert objs_org
== ['freedesktop'], objs_org
193 @dbus.service
.method(IFACE
, in_signature
='bbv', out_signature
='v', async_callbacks
=('return_cb', 'error_cb'))
194 def AsynchronousMethod(self
, async, fail
, variant
, return_cb
, error_cb
):
197 gobject
.timeout_add(500, self
.AsynchronousMethod
, False, fail
, variant
, return_cb
, error_cb
)
205 return False # do not run again
209 @dbus.service
.method(IFACE
, in_signature
='', out_signature
='s', sender_keyword
='sender')
210 def WhoAmI(self
, sender
):
@dbus.service.method(IFACE)
def MultipleReturnWithoutSignature(self):
    """Return a (dbus.String, dbus.Int32) pair with no out_signature declared."""
    # https://bugs.freedesktop.org/show_bug.cgi?id=10174
    return dbus.String('abc'), dbus.Int32(123)
# Service bootstrap: open the session bus, create a BusName for NAME on it,
# instantiate both test objects with that name, and create the main loop.
session_bus = dbus.SessionBus()
name = dbus.service.BusName(NAME, bus=session_bus)
# NOTE: this module-level name shadows the builtin `object`; it is kept
# unchanged so anything referring to it keeps working.
object = TestObject(name)
g_object = TestGObject(name)
loop = gobject.MainLoop()