3 from support
import CatchLogs
5 from openid
.message
import Message
, OPENID2_NS
, OPENID1_NS
, OPENID_NS
6 from openid
import association
7 from openid
.consumer
.consumer
import GenericConsumer
, ServerError
8 from openid
.consumer
.discover
import OpenIDServiceEndpoint
, OPENID_2_0_TYPE
class ErrorRaisingConsumer(GenericConsumer):
    """
    A consumer whose _requestAssociation hands back predefined results
    instead of actually performing association requests.
    """

    # Queue of canned results for successive _requestAssociation calls.
    # Each call removes the first element. A Message element is wrapped
    # in a ServerError exception and raised; anything else is handed
    # back unchanged to _negotiateAssociation.
    return_messages = []

    def _requestAssociation(self, endpoint, assoc_type, session_type):
        """
        Consume the next canned result.

        The endpoint, assoc_type and session_type arguments are ignored.
        Raises ServerError (built from the message) when the next
        result is a Message; otherwise returns the result as-is.
        """
        canned = self.return_messages.pop(0)
        if not isinstance(canned, Message):
            return canned
        raise ServerError.fromMessage(canned)
class TestOpenID2SessionNegotiation(unittest.TestCase, CatchLogs):
    """
    Test the session type negotiation behavior of an OpenID 2
    consumer.

    Each test queues canned results on an ErrorRaisingConsumer, calls
    _negotiateAssociation once against a bogus OpenID 2 endpoint, and
    checks the return value together with the logged messages.
    """

    def setUp(self):
        CatchLogs.setUp(self)
        self.consumer = ErrorRaisingConsumer(store=None)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.type_uris = [OPENID_2_0_TYPE]
        self.endpoint.server_url = 'bogus'

    def _unsupportedTypeMessage(self, assoc_type=None, session_type=None):
        """
        Build an 'unsupported-type' error message for self.endpoint.

        assoc_type / session_type are only set on the message when a
        value is given, so a test can leave either one absent.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        if assoc_type is not None:
            msg.setArg(OPENID_NS, 'assoc_type', assoc_type)
        if session_type is not None:
            msg.setArg(OPENID_NS, 'session_type', session_type)
        return msg

    def testBadResponse(self):
        """
        Test the case where the response to an associate request is a
        server error or is otherwise undecipherable.
        """
        self.consumer.return_messages = [
            Message(self.endpoint.preferredNamespace())]
        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)
        self.failUnlessLogMatches(
            'Server error when requesting an association')

    def testEmptyAssocType(self):
        """
        Test the case where the association type (assoc_type) returned
        in an unsupported-type response is absent.
        """
        # assoc_type is deliberately left unset.
        msg = self._unsupportedTypeMessage(session_type='new-session-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches(
            'Unsupported association type',
            'Server responded with unsupported association ' +
            'session but did not supply a fallback.')

    def testEmptySessionType(self):
        """
        Test the case where the session type (session_type) returned
        in an unsupported-type response is absent.
        """
        # session_type is deliberately left unset.
        msg = self._unsupportedTypeMessage(assoc_type='new-assoc-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches(
            'Unsupported association type',
            'Server responded with unsupported association ' +
            'session but did not supply a fallback.')

    def testNotAllowed(self):
        """
        Test the case where an unsupported-type response specifies a
        preferred (assoc_type, session_type) combination that is not
        allowed by the consumer's SessionNegotiator.
        """
        # NOTE(review): the definition of allowed_types was not visible
        # in the reviewed source; an empty list is assumed, which
        # allows no combination at all. Confirm against the original.
        allowed_types = []

        negotiator = association.SessionNegotiator(allowed_types)
        self.consumer.negotiator = negotiator

        msg = self._unsupportedTypeMessage(
            assoc_type='not-allowed', session_type='not-allowed')

        self.consumer.return_messages = [msg]
        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches(
            'Unsupported association type',
            'Server sent unsupported session/association type:')

    def testUnsupportedWithRetry(self):
        """
        Test the case where an unsupported-type response triggers a
        retry to get an association with the new preferred type.
        """
        msg = self._unsupportedTypeMessage(
            assoc_type='HMAC-SHA1', session_type='DH-SHA1')

        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        # First request fails with unsupported-type, the retry succeeds.
        self.consumer.return_messages = [msg, assoc]
        result = self.consumer._negotiateAssociation(self.endpoint)
        self.assertTrue(result is assoc)

        self.failUnlessLogMatches('Unsupported association type')

    def testUnsupportedWithRetryAndFail(self):
        """
        Test the case where an unsupported-type response triggers a
        retry, but the retry fails and None is returned instead.
        """
        msg = self._unsupportedTypeMessage(
            assoc_type='HMAC-SHA1', session_type='DH-SHA1')

        # Second canned result is a bare Message, so the retry raises
        # a ServerError as well.
        self.consumer.return_messages = [
            msg,
            Message(self.endpoint.preferredNamespace())]

        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches(
            'Unsupported association type',
            'Server %s refused' % (self.endpoint.server_url))

    def testValid(self):
        """
        Test the valid case, wherein an association is returned on the
        first attempt to get one.
        """
        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [assoc]
        result = self.consumer._negotiateAssociation(self.endpoint)
        self.assertTrue(result is assoc)
        self.failUnlessLogEmpty()
class TestOpenID1SessionNegotiation(unittest.TestCase, CatchLogs):
    """
    Tests for the OpenID 1 consumer association session behavior.  See
    the docs for TestOpenID2SessionNegotiation.  Notice that this
    class is not a subclass of the OpenID 2 tests.  Instead, it uses
    many of the same inputs but inspects the log messages logged with
    oidutil.log.  See the calls to self.failUnlessLogMatches.  Some of
    these tests pass openid2-style messages to the openid 1
    association processing logic to be sure it ignores the extra data.
    """

    def setUp(self):
        CatchLogs.setUp(self)
        self.consumer = ErrorRaisingConsumer(store=None)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.type_uris = [OPENID1_NS]
        self.endpoint.server_url = 'bogus'

    def _unsupportedTypeMessage(self, assoc_type=None, session_type=None):
        """
        Build an openid2-style 'unsupported-type' error message.

        assoc_type / session_type are only set on the message when a
        value is given, so a test can leave either one absent.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        if assoc_type is not None:
            msg.setArg(OPENID_NS, 'assoc_type', assoc_type)
        if session_type is not None:
            msg.setArg(OPENID_NS, 'session_type', session_type)
        return msg

    def _assertPlainServerError(self):
        """
        Run one negotiation and require None plus the generic
        server-error log message (no unsupported-type handling).
        """
        self.assertEqual(
            self.consumer._negotiateAssociation(self.endpoint), None)
        self.failUnlessLogMatches(
            'Server error when requesting an association')

    def testBadResponse(self):
        """An undecipherable error response yields None and a log entry."""
        self.consumer.return_messages = [
            Message(self.endpoint.preferredNamespace())]
        self._assertPlainServerError()

    def testEmptyAssocType(self):
        """Unsupported-type response without assoc_type: plain error."""
        # assoc_type is deliberately left unset.
        msg = self._unsupportedTypeMessage(session_type='new-session-type')

        self.consumer.return_messages = [msg]
        self._assertPlainServerError()

    def testEmptySessionType(self):
        """Unsupported-type response without session_type: plain error."""
        # session_type is deliberately left unset.
        msg = self._unsupportedTypeMessage(assoc_type='new-assoc-type')

        self.consumer.return_messages = [msg]
        self._assertPlainServerError()

    def testNotAllowed(self):
        """Unsupported-type response naming a disallowed pair: plain error."""
        # NOTE(review): the definition of allowed_types was not visible
        # in the reviewed source; an empty list is assumed, which
        # allows no combination at all. Confirm against the original.
        allowed_types = []

        negotiator = association.SessionNegotiator(allowed_types)
        self.consumer.negotiator = negotiator

        msg = self._unsupportedTypeMessage(
            assoc_type='not-allowed', session_type='not-allowed')

        self.consumer.return_messages = [msg]
        self._assertPlainServerError()

    def testUnsupportedWithRetry(self):
        """
        An unsupported-type response must not trigger a retry here:
        the queued association is never returned and None comes back.
        """
        msg = self._unsupportedTypeMessage(
            assoc_type='HMAC-SHA1', session_type='DH-SHA1')

        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [msg, assoc]
        result = self.consumer._negotiateAssociation(self.endpoint)
        self.assertTrue(result is None)

        self.failUnlessLogMatches(
            'Server error when requesting an association')

    def testValid(self):
        """An association returned on the first attempt is passed through."""
        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [assoc]
        result = self.consumer._negotiateAssociation(self.endpoint)
        self.assertTrue(result is assoc)
        self.failUnlessLogEmpty()
class TestNegotiatorBehaviors(unittest.TestCase, CatchLogs):
    """
    Tests for association.SessionNegotiator.addAllowedType, run
    against a negotiator pre-loaded with two no-encryption pairs.
    """

    def setUp(self):
        self.allowed_types = [
            ('HMAC-SHA1', 'no-encryption'),
            ('HMAC-SHA256', 'no-encryption'),
        ]

        self.n = association.SessionNegotiator(self.allowed_types)

    def testAddAllowedTypeNoSessionTypes(self):
        """addAllowedType('invalid') with no session type raises ValueError."""
        self.assertRaises(ValueError, self.n.addAllowedType, 'invalid')

    def testAddAllowedTypeBadSessionType(self):
        """addAllowedType('assoc1', 'invalid') raises ValueError."""
        self.assertRaises(
            ValueError, self.n.addAllowedType, 'assoc1', 'invalid')

    def testAddAllowedTypeContents(self):
        """
        Adding an association type without naming a session type
        returns None and leaves every session type reported by
        association.getSessionTypes for it in allowed_types.
        """
        assoc_type = 'HMAC-SHA1'
        self.assertTrue(self.n.addAllowedType(assoc_type) is None)

        for typ in association.getSessionTypes(assoc_type):
            self.assertTrue((assoc_type, typ) in self.n.allowed_types)
270 if __name__
== '__main__':