1 # **********************************************************************
3 # Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
5 # This copy of Ice is licensed to you under the terms described in the
6 # ICE_LICENSE file included in this distribution.
8 # **********************************************************************
10 import Ice
, Test
, threading
14 raise RuntimeError('test assertion failed')
16 def allTests(communicator
, collocated
):
17 print "testing proxy endpoint information...",
19 p1
= communicator
.stringToProxy("test -t:default -h tcphost -p 10000 -t 1200 -z:" + \
20 "udp -h udphost -p 10001 --interface eth0 --ttl 5:" + \
21 "opaque -t 100 -v ABCD")
23 endps
= p1
.ice_getEndpoints()
25 ipEndpoint
= endps
[0].getInfo()
26 test(isinstance(ipEndpoint
, Ice
.IPEndpointInfo
))
27 test(ipEndpoint
.host
== "tcphost")
28 test(ipEndpoint
.port
== 10000)
29 test(ipEndpoint
.timeout
== 1200)
30 test(ipEndpoint
.compress
)
31 test(not ipEndpoint
.datagram())
32 test((ipEndpoint
.type() == Ice
.TCPEndpointType
and not ipEndpoint
.secure()) or
33 (ipEndpoint
.type() == 2 and ipEndpoint
.secure())) # SSL
34 test((ipEndpoint
.type() == Ice
.TCPEndpointType
and isinstance(ipEndpoint
, Ice
.TCPEndpointInfo
)) or
35 (ipEndpoint
.type() == 2)) # SSL
37 udpEndpoint
= endps
[1].getInfo()
38 test(isinstance(udpEndpoint
, Ice
.UDPEndpointInfo
))
39 test(udpEndpoint
.host
== "udphost")
40 test(udpEndpoint
.port
== 10001)
41 test(udpEndpoint
.mcastInterface
== "eth0")
42 test(udpEndpoint
.mcastTtl
== 5)
43 test(udpEndpoint
.timeout
== -1)
44 test(not udpEndpoint
.compress
)
45 test(not udpEndpoint
.secure())
46 test(udpEndpoint
.datagram())
47 test(udpEndpoint
.type() == Ice
.UDPEndpointType
)
49 opaqueEndpoint
= endps
[2].getInfo()
50 test(isinstance(opaqueEndpoint
, Ice
.OpaqueEndpointInfo
))
54 defaultHost
= communicator
.getProperties().getProperty("Ice.Default.Host")
56 print "test object adapter endpoint information...",
58 communicator
.getProperties().setProperty("TestAdapter.Endpoints", "default -t 15000:udp")
59 adapter
= communicator
.createObjectAdapter("TestAdapter")
60 endpoints
= adapter
.getEndpoints()
61 test(len(endpoints
) == 2)
62 publishedEndpoints
= adapter
.getPublishedEndpoints()
63 test(endpoints
== publishedEndpoints
)
65 ipEndpoint
= endpoints
[0].getInfo()
66 test(ipEndpoint
.type() == Ice
.TCPEndpointType
or ipEndpoint
.type() == 2)
67 test(ipEndpoint
.host
== defaultHost
)
68 test(ipEndpoint
.port
> 0)
69 test(ipEndpoint
.timeout
== 15000)
71 udpEndpoint
= endpoints
[1].getInfo()
72 test(udpEndpoint
.host
== defaultHost
)
73 test(udpEndpoint
.datagram())
74 test(udpEndpoint
.port
> 0)
78 communicator
.getProperties().setProperty("TestAdapter.Endpoints", "default -h * -p 12020")
79 communicator
.getProperties().setProperty("TestAdapter.PublishedEndpoints", "default -h 127.0.0.1 -p 12020")
80 adapter
= communicator
.createObjectAdapter("TestAdapter")
82 endpoints
= adapter
.getEndpoints()
83 test(len(endpoints
) >= 1)
84 publishedEndpoints
= adapter
.getPublishedEndpoints()
85 test(len(publishedEndpoints
) == 1)
87 for i
in range(0, len(endpoints
)):
88 ipEndpoint
= endpoints
[i
].getInfo()
89 test(ipEndpoint
.port
== 12020)
91 ipEndpoint
= publishedEndpoints
[0].getInfo()
92 test(ipEndpoint
.host
== "127.0.0.1")
93 test(ipEndpoint
.port
== 12020)
99 base
= communicator
.stringToProxy("test:default -p 12010:udp -p 12010")
100 testIntf
= Test
.TestIntfPrx
.checkedCast(base
)
102 print "test connection endpoint information...",
104 ipinfo
= base
.ice_getConnection().getEndpoint().getInfo()
105 test(ipinfo
.port
== 12010)
106 test(not ipinfo
.compress
)
107 test(ipinfo
.host
== defaultHost
)
109 ctx
= testIntf
.getEndpointInfoAsContext()
110 test(ctx
["host"] == ipinfo
.host
)
111 test(ctx
["compress"] == "false")
112 port
= int(ctx
["port"])
115 udp
= base
.ice_datagram().ice_getConnection().getEndpoint().getInfo()
116 test(udp
.port
== 12010)
117 test(udp
.host
== defaultHost
)
121 print "testing connection information...",
123 info
= base
.ice_getConnection().getInfo()
124 test(not info
.incoming
)
125 test(len(info
.adapterName
) == 0)
126 test(info
.remotePort
== 12010)
127 test(info
.remoteAddress
== defaultHost
)
128 test(info
.localAddress
== defaultHost
)
130 ctx
= testIntf
.getConnectionInfoAsContext()
131 test(ctx
["incoming"] == "true")
132 test(ctx
["adapterName"] == "TestAdapter")
133 test(ctx
["remoteAddress"] == info
.localAddress
)
134 test(ctx
["localAddress"] == info
.remoteAddress
)
135 test(ctx
["remotePort"] == str(info
.localPort
))
136 test(ctx
["localPort"] == str(info
.remotePort
))
142 communicator
.shutdown()
143 communicator
.waitForShutdown()