# ICE 3.4.2
# Repository: php5-ice-freebsdport.git, path: py/test/Ice/info/AllTests.py
# Blob: 9820a1e83a49e7eece93282ed1e816f27982db90
# **********************************************************************
#
# Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
#
# This copy of Ice is licensed to you under the terms described in the
# ICE_LICENSE file included in this distribution.
#
# **********************************************************************
import Ice, Test, sys, threading
def test(b):
    """Assert that a test condition holds.

    Args:
        b: Value to check; any truthy value passes.

    Returns:
        None when the condition holds.

    Raises:
        RuntimeError: If ``b`` is falsy, with the message
            'test assertion failed'.
    """
    if not b:
        raise RuntimeError('test assertion failed')


# Despite its name this is an assertion helper, not a test case. The marker
# keeps pytest/nose-style collectors from trying to run it as one (which would
# fail looking for a fixture called ``b``).
test.__test__ = False
def _startTest(message):
    """Write a progress message (no newline) and flush it immediately.

    Flushing makes the message visible even if a later remote call blocks or
    a check fails before "ok" is printed.
    """
    sys.stdout.write(message + " ")
    sys.stdout.flush()


def allTests(communicator, collocated):
    """Check endpoint and connection information, then shut everything down.

    The function runs four groups of checks:

    1. endpoint info parsed from a stringified proxy (tcp/ssl, udp, opaque);
    2. endpoint info of object adapters created from properties;
    3. endpoint info of an established connection to the "test" object on
       port 12010, compared with what the server reports;
    4. connection info of that connection, compared with the server's view.

    Finally it asks the remote test object to shut down and shuts down and
    waits for the communicator.

    Args:
        communicator: Ice communicator used to create proxies and object
            adapters; it is shut down before the function returns.
        collocated: Not used by this function.

    Raises:
        RuntimeError: Raised by ``test()`` as soon as any check fails.
    """
    # Endpoint type value reported when the default protocol is SSL (the
    # original code compared against the bare literal 2, commented "SSL").
    sslEndpointType = 2

    #
    # 1. Endpoint information obtained from a stringified proxy.
    #
    _startTest("testing proxy endpoint information...")

    p1 = communicator.stringToProxy(
        "test -t:default -h tcphost -p 10000 -t 1200 -z:"
        "udp -h udphost -p 10001 --interface eth0 --ttl 5:"
        "opaque -t 100 -v ABCD")

    endps = p1.ice_getEndpoints()

    # First endpoint: "default", i.e. TCP or SSL depending on configuration.
    ipEndpoint = endps[0].getInfo()
    test(isinstance(ipEndpoint, Ice.IPEndpointInfo))
    test(ipEndpoint.host == "tcphost")
    test(ipEndpoint.port == 10000)
    test(ipEndpoint.timeout == 1200)
    test(ipEndpoint.compress)
    test(not ipEndpoint.datagram())
    endpointType = ipEndpoint.type()
    test((endpointType == Ice.TCPEndpointType and not ipEndpoint.secure()) or
         (endpointType == sslEndpointType and ipEndpoint.secure()))
    test((endpointType == Ice.TCPEndpointType and
          isinstance(ipEndpoint, Ice.TCPEndpointInfo)) or
         (endpointType == sslEndpointType))

    # Second endpoint: UDP with multicast interface and TTL options.
    udpEndpoint = endps[1].getInfo()
    test(isinstance(udpEndpoint, Ice.UDPEndpointInfo))
    test(udpEndpoint.host == "udphost")
    test(udpEndpoint.port == 10001)
    test(udpEndpoint.mcastInterface == "eth0")
    test(udpEndpoint.mcastTtl == 5)
    test(udpEndpoint.timeout == -1)
    test(not udpEndpoint.compress)
    test(not udpEndpoint.secure())
    test(udpEndpoint.datagram())
    test(udpEndpoint.type() == Ice.UDPEndpointType)

    # Third endpoint: opaque.
    opaqueEndpoint = endps[2].getInfo()
    test(isinstance(opaqueEndpoint, Ice.OpaqueEndpointInfo))

    print("ok")

    defaultHost = communicator.getProperties().getProperty("Ice.Default.Host")

    #
    # 2. Endpoint information of object adapters.
    #
    _startTest("test object adapter endpoint information...")

    # Without explicit published endpoints they must equal the endpoints.
    communicator.getProperties().setProperty("TestAdapter.Endpoints", "default -t 15000:udp")
    adapter = communicator.createObjectAdapter("TestAdapter")
    endpoints = adapter.getEndpoints()
    test(len(endpoints) == 2)
    publishedEndpoints = adapter.getPublishedEndpoints()
    test(endpoints == publishedEndpoints)

    ipEndpoint = endpoints[0].getInfo()
    endpointType = ipEndpoint.type()
    test(endpointType == Ice.TCPEndpointType or endpointType == sslEndpointType)
    test(ipEndpoint.host == defaultHost)
    test(ipEndpoint.port > 0)
    test(ipEndpoint.timeout == 15000)

    udpEndpoint = endpoints[1].getInfo()
    test(udpEndpoint.host == defaultHost)
    test(udpEndpoint.datagram())
    test(udpEndpoint.port > 0)

    adapter.destroy()

    # With a wildcard host and explicit published endpoints: there is at
    # least one endpoint, and exactly the one configured published endpoint.
    communicator.getProperties().setProperty("TestAdapter.Endpoints", "default -h * -p 12020")
    communicator.getProperties().setProperty("TestAdapter.PublishedEndpoints", "default -h 127.0.0.1 -p 12020")
    adapter = communicator.createObjectAdapter("TestAdapter")

    endpoints = adapter.getEndpoints()
    test(len(endpoints) >= 1)
    publishedEndpoints = adapter.getPublishedEndpoints()
    test(len(publishedEndpoints) == 1)

    for endpoint in endpoints:
        test(endpoint.getInfo().port == 12020)

    ipEndpoint = publishedEndpoints[0].getInfo()
    test(ipEndpoint.host == "127.0.0.1")
    test(ipEndpoint.port == 12020)

    adapter.destroy()

    print("ok")

    base = communicator.stringToProxy("test:default -p 12010:udp -p 12010")
    testIntf = Test.TestIntfPrx.checkedCast(base)

    #
    # 3. Endpoint information of an established connection.
    #
    _startTest("test connection endpoint information...")

    ipinfo = base.ice_getConnection().getEndpoint().getInfo()
    test(ipinfo.port == 12010)
    test(not ipinfo.compress)
    test(ipinfo.host == defaultHost)

    # The server returns its own view of the endpoint as a string context.
    ctx = testIntf.getEndpointInfoAsContext()
    test(ctx["host"] == ipinfo.host)
    test(ctx["compress"] == "false")
    port = int(ctx["port"])
    test(port > 0)

    udp = base.ice_datagram().ice_getConnection().getEndpoint().getInfo()
    test(udp.port == 12010)
    test(udp.host == defaultHost)

    print("ok")

    #
    # 4. Connection information.
    #
    _startTest("testing connection information...")

    info = base.ice_getConnection().getInfo()
    test(not info.incoming)
    test(len(info.adapterName) == 0)
    test(info.remotePort == 12010)
    test(info.remoteAddress == defaultHost)
    test(info.localAddress == defaultHost)

    # The server sees the same connection from the other side, so its
    # local/remote values must mirror the client's remote/local values.
    ctx = testIntf.getConnectionInfoAsContext()
    test(ctx["incoming"] == "true")
    test(ctx["adapterName"] == "TestAdapter")
    test(ctx["remoteAddress"] == info.localAddress)
    test(ctx["localAddress"] == info.remoteAddress)
    test(ctx["remotePort"] == str(info.localPort))
    test(ctx["localPort"] == str(info.remotePort))

    print("ok")

    testIntf.shutdown()

    communicator.shutdown()
    communicator.waitForShutdown()