getting file size for all dict files to be downloaded. coming to be 400mb or so.
[worddb.git] / libs / openid / test / test_negotiation.py
blobc9c254a6c926d430429c1ded9ac3b05680b2f8c6
2 import unittest
3 from support import CatchLogs
5 from openid.message import Message, OPENID2_NS, OPENID1_NS, OPENID_NS
6 from openid import association
7 from openid.consumer.consumer import GenericConsumer, ServerError
8 from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE
10 class ErrorRaisingConsumer(GenericConsumer):
11 """
12 A consumer whose _requestAssocation will return predefined results
13 instead of trying to actually perform association requests.
14 """
16 # The list of objects to be returned by successive calls to
17 # _requestAssocation. Each call will pop the first element from
18 # this list and return it to _negotiateAssociation. If the
19 # element is a Message object, it will be wrapped in a ServerError
20 # exception. Otherwise it will be returned as-is.
21 return_messages = []
23 def _requestAssociation(self, endpoint, assoc_type, session_type):
24 m = self.return_messages.pop(0)
25 if isinstance(m, Message):
26 raise ServerError.fromMessage(m)
27 else:
28 return m
30 class TestOpenID2SessionNegotiation(unittest.TestCase, CatchLogs):
31 """
32 Test the session type negotiation behavior of an OpenID 2
33 consumer.
34 """
35 def setUp(self):
36 CatchLogs.setUp(self)
37 self.consumer = ErrorRaisingConsumer(store=None)
39 self.endpoint = OpenIDServiceEndpoint()
40 self.endpoint.type_uris = [OPENID_2_0_TYPE]
41 self.endpoint.server_url = 'bogus'
43 def testBadResponse(self):
44 """
45 Test the case where the response to an associate request is a
46 server error or is otherwise undecipherable.
47 """
48 self.consumer.return_messages = [Message(self.endpoint.preferredNamespace())]
49 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
50 self.failUnlessLogMatches('Server error when requesting an association')
52 def testEmptyAssocType(self):
53 """
54 Test the case where the association type (assoc_type) returned
55 in an unsupported-type response is absent.
56 """
57 msg = Message(self.endpoint.preferredNamespace())
58 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
59 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
60 # not set: msg.delArg(OPENID_NS, 'assoc_type')
61 msg.setArg(OPENID_NS, 'session_type', 'new-session-type')
63 self.consumer.return_messages = [msg]
64 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
66 self.failUnlessLogMatches('Unsupported association type',
67 'Server responded with unsupported association ' +
68 'session but did not supply a fallback.')
70 def testEmptySessionType(self):
71 """
72 Test the case where the session type (session_type) returned
73 in an unsupported-type response is absent.
74 """
75 msg = Message(self.endpoint.preferredNamespace())
76 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
77 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
78 msg.setArg(OPENID_NS, 'assoc_type', 'new-assoc-type')
79 # not set: msg.setArg(OPENID_NS, 'session_type', None)
81 self.consumer.return_messages = [msg]
82 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
84 self.failUnlessLogMatches('Unsupported association type',
85 'Server responded with unsupported association ' +
86 'session but did not supply a fallback.')
88 def testNotAllowed(self):
89 """
90 Test the case where an unsupported-type response specifies a
91 preferred (assoc_type, session_type) combination that is not
92 allowed by the consumer's SessionNegotiator.
93 """
94 allowed_types = []
96 negotiator = association.SessionNegotiator(allowed_types)
97 self.consumer.negotiator = negotiator
99 msg = Message(self.endpoint.preferredNamespace())
100 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
101 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
102 msg.setArg(OPENID_NS, 'assoc_type', 'not-allowed')
103 msg.setArg(OPENID_NS, 'session_type', 'not-allowed')
105 self.consumer.return_messages = [msg]
106 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
108 self.failUnlessLogMatches('Unsupported association type',
109 'Server sent unsupported session/association type:')
111 def testUnsupportedWithRetry(self):
113 Test the case where an unsupported-type response triggers a
114 retry to get an association with the new preferred type.
116 msg = Message(self.endpoint.preferredNamespace())
117 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
118 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
119 msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
120 msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')
122 assoc = association.Association(
123 'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')
125 self.consumer.return_messages = [msg, assoc]
126 self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)
128 self.failUnlessLogMatches('Unsupported association type')
130 def testUnsupportedWithRetryAndFail(self):
132 Test the case where an unsupported-typ response triggers a
133 retry, but the retry fails and None is returned instead.
135 msg = Message(self.endpoint.preferredNamespace())
136 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
137 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
138 msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
139 msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')
141 self.consumer.return_messages = [msg,
142 Message(self.endpoint.preferredNamespace())]
144 self.failUnlessEqual(self.consumer._negotiateAssociation(self.endpoint), None)
146 self.failUnlessLogMatches('Unsupported association type',
147 'Server %s refused' % (self.endpoint.server_url))
149 def testValid(self):
151 Test the valid case, wherein an association is returned on the
152 first attempt to get one.
154 assoc = association.Association(
155 'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')
157 self.consumer.return_messages = [assoc]
158 self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)
159 self.failUnlessLogEmpty()
161 class TestOpenID1SessionNegotiation(unittest.TestCase, CatchLogs):
163 Tests for the OpenID 1 consumer association session behavior. See
164 the docs for TestOpenID2SessionNegotiation. Notice that this
165 class is not a subclass of the OpenID 2 tests. Instead, it uses
166 many of the same inputs but inspects the log messages logged with
167 oidutil.log. See the calls to self.failUnlessLogMatches. Some of
168 these tests pass openid2-style messages to the openid 1
169 association processing logic to be sure it ignores the extra data.
171 def setUp(self):
172 CatchLogs.setUp(self)
173 self.consumer = ErrorRaisingConsumer(store=None)
175 self.endpoint = OpenIDServiceEndpoint()
176 self.endpoint.type_uris = [OPENID1_NS]
177 self.endpoint.server_url = 'bogus'
179 def testBadResponse(self):
180 self.consumer.return_messages = [Message(self.endpoint.preferredNamespace())]
181 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
182 self.failUnlessLogMatches('Server error when requesting an association')
184 def testEmptyAssocType(self):
185 msg = Message(self.endpoint.preferredNamespace())
186 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
187 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
188 # not set: msg.setArg(OPENID_NS, 'assoc_type', None)
189 msg.setArg(OPENID_NS, 'session_type', 'new-session-type')
191 self.consumer.return_messages = [msg]
192 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
194 self.failUnlessLogMatches('Server error when requesting an association')
196 def testEmptySessionType(self):
197 msg = Message(self.endpoint.preferredNamespace())
198 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
199 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
200 msg.setArg(OPENID_NS, 'assoc_type', 'new-assoc-type')
201 # not set: msg.setArg(OPENID_NS, 'session_type', None)
203 self.consumer.return_messages = [msg]
204 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
206 self.failUnlessLogMatches('Server error when requesting an association')
208 def testNotAllowed(self):
209 allowed_types = []
211 negotiator = association.SessionNegotiator(allowed_types)
212 self.consumer.negotiator = negotiator
214 msg = Message(self.endpoint.preferredNamespace())
215 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
216 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
217 msg.setArg(OPENID_NS, 'assoc_type', 'not-allowed')
218 msg.setArg(OPENID_NS, 'session_type', 'not-allowed')
220 self.consumer.return_messages = [msg]
221 self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
223 self.failUnlessLogMatches('Server error when requesting an association')
225 def testUnsupportedWithRetry(self):
226 msg = Message(self.endpoint.preferredNamespace())
227 msg.setArg(OPENID_NS, 'error', 'Unsupported type')
228 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
229 msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
230 msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')
232 assoc = association.Association(
233 'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')
235 self.consumer.return_messages = [msg, assoc]
236 self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is None)
238 self.failUnlessLogMatches('Server error when requesting an association')
240 def testValid(self):
241 assoc = association.Association(
242 'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')
244 self.consumer.return_messages = [assoc]
245 self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)
246 self.failUnlessLogEmpty()
248 class TestNegotiatorBehaviors(unittest.TestCase, CatchLogs):
249 def setUp(self):
250 self.allowed_types = [
251 ('HMAC-SHA1', 'no-encryption'),
252 ('HMAC-SHA256', 'no-encryption'),
255 self.n = association.SessionNegotiator(self.allowed_types)
257 def testAddAllowedTypeNoSessionTypes(self):
258 self.assertRaises(ValueError, self.n.addAllowedType, 'invalid')
260 def testAddAllowedTypeBadSessionType(self):
261 self.assertRaises(ValueError, self.n.addAllowedType, 'assoc1', 'invalid')
263 def testAddAllowedTypeContents(self):
264 assoc_type = 'HMAC-SHA1'
265 self.failUnless(self.n.addAllowedType(assoc_type) is None)
267 for typ in association.getSessionTypes(assoc_type):
268 self.failUnless((assoc_type, typ) in self.n.allowed_types)
270 if __name__ == '__main__':
271 unittest.main()