6 from openid
.message
import Message
, OPENID_NS
, OPENID2_NS
, IDENTIFIER_SELECT
, \
8 from openid
import cryptutil
, dh
, oidutil
, kvform
9 from openid
.store
.nonce
import mkNonce
, split
as splitNonce
10 from openid
.consumer
.discover
import OpenIDServiceEndpoint
, OPENID_2_0_TYPE
, \
12 from openid
.consumer
.consumer
import \
13 AuthRequest
, GenericConsumer
, SUCCESS
, FAILURE
, CANCEL
, SETUP_NEEDED
, \
14 SuccessResponse
, FailureResponse
, SetupNeededResponse
, CancelResponse
, \
15 DiffieHellmanSHA1ConsumerSession
, Consumer
, PlainTextConsumerSession
, \
16 SetupNeededError
, DiffieHellmanSHA256ConsumerSession
, ServerError
, \
17 ProtocolError
, _httpResponseToMessage
18 from openid
import association
19 from openid
.server
.server
import \
20 PlainTextServerSession
, DiffieHellmanSHA1ServerSession
21 from openid
.yadis
.manager
import Discovery
22 from openid
.yadis
.discover
import DiscoveryFailure
23 from openid
.dh
import DiffieHellman
25 from openid
.fetchers
import HTTPResponse
, HTTPFetchingError
26 from openid
import fetchers
27 from openid
.store
import memstore
29 from support
import CatchLogs
32 ('another 20-byte key.', 'Snarky'),
33 ('\x00' * 20, 'Zeros'),
def mkSuccess(endpoint, q):
    """Build a SuccessResponse for ``endpoint`` out of the OpenID
    arguments in ``q``, listing every one of those arguments as signed."""
    # Each key of q is reported as signed under its 'openid.'-prefixed name.
    signed_fields = ['openid.' + name for name in q.keys()]
    response_message = Message.fromOpenIDArgs(q)
    return SuccessResponse(endpoint, response_message, signed_fields)
44 for (k
, v
) in cgi
.parse_qsl(qs
):
45 assert not q
.has_key(k
)
49 def associate(qs
, assoc_secret
, assoc_handle
):
50 """Do the server's half of the associate call, using the given
53 assert q
['openid.mode'] == 'associate'
54 assert q
['openid.assoc_type'] == 'HMAC-SHA1'
56 'assoc_type':'HMAC-SHA1',
57 'assoc_handle':assoc_handle
,
61 if q
.get('openid.session_type') == 'DH-SHA1':
62 assert len(q
) == 6 or len(q
) == 4
63 message
= Message
.fromPostArgs(q
)
64 session
= DiffieHellmanSHA1ServerSession
.fromMessage(message
)
65 reply_dict
['session_type'] = 'DH-SHA1'
68 session
= PlainTextServerSession
.fromQuery(q
)
70 reply_dict
.update(session
.answer(assoc_secret
))
71 return kvform
.dictToKV(reply_dict
)
74 GOODSIG
= "[A Good Signature]"
77 class GoodAssociation
:
81 def getExpiresIn(self
):
def checkMessageSignature(self, message):
    """Return True exactly when the message's OpenID 'sig' argument
    equals GOODSIG."""
    signature = message.getArg(OPENID_NS, 'sig')
    return signature == GOODSIG
class GoodAssocStore(memstore.MemoryStore):
    """Memory store whose association lookup always hands back a newly
    created GoodAssociation."""

    def getAssociation(self, server_url, handle=None):
        """Return a fresh GoodAssociation; both arguments are ignored."""
        return GoodAssociation()
93 class TestFetcher(object):
94 def __init__(self
, user_url
, user_page
, (assoc_secret
, assoc_handle
)):
95 self
.get_responses
= {user_url
:self
.response(user_url
, 200, user_page
)}
96 self
.assoc_secret
= assoc_secret
97 self
.assoc_handle
= assoc_handle
100 def response(self
, url
, status
, body
):
102 final_url
=url
, status
=status
, headers
={}, body
=body
)
104 def fetch(self
, url
, body
=None, headers
=None):
106 if url
in self
.get_responses
:
107 return self
.get_responses
[url
]
110 body
.index('openid.mode=associate')
114 assert body
.find('DH-SHA1') != -1
115 response
= associate(
116 body
, self
.assoc_secret
, self
.assoc_handle
)
118 return self
.response(url
, 200, response
)
120 return self
.response(url
, 404, 'Not found')
122 def makeFastConsumerSession():
124 Create custom DH object so tests run quickly.
126 dh
= DiffieHellman(100389557, 2)
127 return DiffieHellmanSHA1ConsumerSession(dh
)
def setConsumerSession(con):
    """Replace ``con.session_types`` with a mapping that offers only
    'DH-SHA1', created through makeFastConsumerSession."""
    fast_sessions = {}
    fast_sessions['DH-SHA1'] = makeFastConsumerSession
    con.session_types = fast_sessions
132 def _test_success(server_url
, user_url
, delegate_url
, links
, immediate
=False):
133 store
= memstore
.MemoryStore()
135 mode
= 'checkid_immediate'
137 mode
= 'checkid_setup'
139 endpoint
= OpenIDServiceEndpoint()
140 endpoint
.claimed_id
= user_url
141 endpoint
.server_url
= server_url
142 endpoint
.local_id
= delegate_url
143 endpoint
.type_uris
= [OPENID_1_1_TYPE
]
145 fetcher
= TestFetcher(None, None, assocs
[0])
146 fetchers
.setDefaultFetcher(fetcher
, wrap_exceptions
=False)
149 trust_root
= consumer_url
151 consumer
= GenericConsumer(store
)
152 setConsumerSession(consumer
)
154 request
= consumer
.begin(endpoint
)
155 return_to
= consumer_url
157 m
= request
.getMessage(trust_root
, return_to
, immediate
)
159 redirect_url
= request
.redirectURL(trust_root
, return_to
, immediate
)
161 parsed
= urlparse
.urlparse(redirect_url
)
164 new_return_to
= q
['openid.return_to']
165 del q
['openid.return_to']
168 'openid.identity':delegate_url
,
169 'openid.trust_root':trust_root
,
170 'openid.assoc_handle':fetcher
.assoc_handle
,
171 }, (q
, user_url
, delegate_url
, mode
)
173 assert new_return_to
.startswith(return_to
)
174 assert redirect_url
.startswith(server_url
)
176 parsed
= urlparse
.urlparse(new_return_to
)
177 query
= parseQuery(parsed
[4])
179 'openid.mode':'id_res',
180 'openid.return_to':new_return_to
,
181 'openid.identity':delegate_url
,
182 'openid.assoc_handle':fetcher
.assoc_handle
,
185 assoc
= store
.getAssociation(server_url
, fetcher
.assoc_handle
)
187 message
= Message
.fromPostArgs(query
)
188 message
= assoc
.signMessage(message
)
189 info
= consumer
.complete(message
, request
.endpoint
, new_return_to
)
190 assert info
.status
== SUCCESS
, info
.message
191 assert info
.identity_url
== user_url
193 assert fetcher
.num_assocs
== 0
195 assert fetcher
.num_assocs
== 1
197 # Test that doing it again uses the existing association
199 assert fetcher
.num_assocs
== 1
201 # Another association is created if we remove the existing one
202 store
.removeAssociation(server_url
, fetcher
.assoc_handle
)
204 assert fetcher
.num_assocs
== 2
206 # Test that doing it again uses the existing association
208 assert fetcher
.num_assocs
== 2
# Fixed example URLs used by the tests in this module: the OpenID server
# over plain HTTP and over HTTPS, and the consumer (relying party) site.
http_server_url = 'http://server.example.com/'
https_server_url = 'https://server.example.com/'
consumer_url = 'http://consumer.example.com/'
216 class TestSuccess(unittest
.TestCase
, CatchLogs
):
217 server_url
= http_server_url
218 user_url
= 'http://www.example.com/user.html'
219 delegate_url
= 'http://consumer.example.com/user'
222 CatchLogs
.setUp(self
)
223 self
.links
= '<link rel="openid.server" href="%s" />' % (
226 self
.delegate_links
= ('<link rel="openid.server" href="%s" />'
227 '<link rel="openid.delegate" href="%s" />') % (
228 self
.server_url
, self
.delegate_url
)
231 CatchLogs
.tearDown(self
)
def test_nodelegate(self):
    # Success scenario without delegation: the user URL is passed as both
    # the claimed and the delegate identifier, in the default
    # (non-immediate) mode.
    claimed = self.user_url
    delegate = self.user_url
    _test_success(self.server_url, claimed, delegate, self.links)
def test_nodelegateImmediate(self):
    # Same as the no-delegate scenario, but with the immediate flag set.
    claimed = self.user_url
    delegate = self.user_url
    _test_success(self.server_url, claimed, delegate, self.links,
                  immediate=True)
def test_delegate(self):
    # Success scenario with a separate delegate URL and the link markup
    # that advertises it, in the default (non-immediate) mode.
    claimed = self.user_url
    delegate = self.delegate_url
    _test_success(self.server_url, claimed, delegate, self.delegate_links)
def test_delegateImmediate(self):
    # Delegated success scenario with the immediate flag set.
    claimed = self.user_url
    delegate = self.delegate_url
    _test_success(self.server_url, claimed, delegate, self.delegate_links,
                  immediate=True)
class TestSuccessHTTPS(TestSuccess):
    """Re-run the inherited TestSuccess cases with the HTTPS server URL."""

    # Only the server URL differs from the parent class.
    server_url = https_server_url
254 class TestConstruct(unittest
.TestCase
):
256 self
.store_sentinel
= object()
def test_construct(self):
    # The consumer must keep the very object it was constructed with as
    # its ``store`` attribute (identity, not mere equality).
    sentinel = self.store_sentinel
    consumer = GenericConsumer(sentinel)
    self.failUnless(consumer.store is sentinel)
def test_nostore(self):
    # Calling GenericConsumer with no arguments at all must raise
    # TypeError.
    constructor = GenericConsumer
    self.failUnlessRaises(TypeError, constructor)
266 class TestIdRes(unittest
.TestCase
, CatchLogs
):
267 consumer_class
= GenericConsumer
270 CatchLogs
.setUp(self
)
272 self
.store
= memstore
.MemoryStore()
273 self
.consumer
= self
.consumer_class(self
.store
)
274 self
.return_to
= "nonny"
275 self
.endpoint
= OpenIDServiceEndpoint()
276 self
.endpoint
.claimed_id
= self
.consumer_id
= "consu"
277 self
.endpoint
.server_url
= self
.server_url
= "serlie"
278 self
.endpoint
.local_id
= self
.server_id
= "sirod"
279 self
.endpoint
.type_uris
= [OPENID_1_1_TYPE
]
281 def disableDiscoveryVerification(self
):
282 """Set the discovery verification to a no-op for test cases in
283 which we don't care."""
284 def dummyVerifyDiscover(_
, endpoint
):
286 self
.consumer
._verifyDiscoveryResults
= dummyVerifyDiscover
288 def disableReturnToChecking(self
):
289 def checkReturnTo(unused1
, unused2
):
291 self
.consumer
._checkReturnTo
= checkReturnTo
292 complete
= self
.consumer
.complete
293 def callCompleteWithoutReturnTo(message
, endpoint
):
294 return complete(message
, endpoint
, None)
295 self
.consumer
.complete
= callCompleteWithoutReturnTo
297 class TestIdResCheckSignature(TestIdRes
):
299 TestIdRes
.setUp(self
)
300 self
.assoc
= GoodAssociation()
301 self
.assoc
.handle
= "{not_dumb}"
302 self
.store
.storeAssociation(self
.endpoint
.server_url
, self
.assoc
)
304 self
.message
= Message
.fromPostArgs({
305 'openid.mode': 'id_res',
306 'openid.identity': '=example',
307 'openid.sig': GOODSIG
,
308 'openid.assoc_handle': self
.assoc
.handle
,
309 'openid.signed': 'mode,identity,assoc_handle,signed',
315 # assoc_handle to assoc with good sig
316 self
.consumer
._idResCheckSignature
(self
.message
,
317 self
.endpoint
.server_url
)
def test_signFailsWithBadSig(self):
    # After the 'sig' argument is overwritten with a bad value,
    # _idResCheckSignature is expected to raise ProtocolError.
    self.message.setArg(OPENID_NS, 'sig', 'BAD SIGNATURE')
    check_signature = self.consumer._idResCheckSignature
    self.failUnlessRaises(ProtocolError, check_signature,
                          self.message, self.endpoint.server_url)
def test_stateless(self):
    # The message carries the handle "dumbHandle" (presumably not the
    # handle stored by setUp -- confirm there). The check-auth response
    # processing is stubbed to report success and the KV post is stubbed
    # to return an empty dict, so the signature check must not raise.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")

    def acceptCheckAuthResponse(response, server_url):
        return True

    def emptyKVPost(args, server_url):
        return {}

    self.consumer._processCheckAuthResponse = acceptCheckAuthResponse
    self.consumer._makeKVPost = emptyKVPost
    server_url = self.endpoint.server_url
    self.consumer._idResCheckSignature(self.message, server_url)
def test_statelessRaisesError(self):
    # The message carries the handle "dumbHandle" and consumer._checkAuth
    # is stubbed to return False, so _idResCheckSignature is expected to
    # raise ProtocolError.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")

    def rejectCheckAuth(unused1, unused2):
        return False

    self.consumer._checkAuth = rejectCheckAuth
    check_signature = self.consumer._idResCheckSignature
    self.failUnlessRaises(ProtocolError, check_signature,
                          self.message, self.endpoint.server_url)
def test_stateless_noStore(self):
    # Same stubs as the stateless success case (check-auth response
    # accepted, empty KV post), but the consumer's store is set to None;
    # the signature check must still complete without raising.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")
    self.consumer.store = None

    def acceptCheckAuthResponse(response, server_url):
        return True

    def emptyKVPost(args, server_url):
        return {}

    self.consumer._processCheckAuthResponse = acceptCheckAuthResponse
    self.consumer._makeKVPost = emptyKVPost
    server_url = self.endpoint.server_url
    self.consumer._idResCheckSignature(self.message, server_url)
def test_statelessRaisesError_noStore(self):
    # consumer._checkAuth is stubbed to return False and the store is set
    # to None; _idResCheckSignature is expected to raise ProtocolError.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")

    def rejectCheckAuth(unused1, unused2):
        return False

    self.consumer._checkAuth = rejectCheckAuth
    self.consumer.store = None
    check_signature = self.consumer._idResCheckSignature
    self.failUnlessRaises(ProtocolError, check_signature,
                          self.message, self.endpoint.server_url)
364 class TestQueryFormat(TestIdRes
):
365 def test_notAList(self
):
366 # XXX: should be a Message object test, not a consumer test
368 # Value should be a single string. If it's a list, it should generate
370 query
= {'openid.mode': ['cancel']}
372 r
= Message
.fromPostArgs(query
)
373 except TypeError, err
:
374 self
.failUnless(str(err
).find('values') != -1, err
)
376 self
.fail("expected TypeError, got this instead: %s" % (r
,))
378 class TestComplete(TestIdRes
):
379 """Testing GenericConsumer.complete.
381 Other TestIdRes subclasses test more specific aspects.
def test_setupNeededIdRes(self):
    # When the _checkSetupNeeded hook raises SetupNeededError, complete()
    # must answer with SETUP_NEEDED status and expose the very setup URL
    # object carried by the exception.
    message = Message.fromOpenIDArgs({'mode': 'id_res'})
    sentinel_url = object()

    def raiseSetupNeeded(msg):
        # The hook must be given the same message object as complete().
        self.failUnless(msg is message)
        raise SetupNeededError(sentinel_url)

    consumer = self.consumer
    consumer._checkSetupNeeded = raiseSetupNeeded

    response = consumer.complete(message, None, None)
    self.failUnlessEqual(SETUP_NEEDED, response.status)
    self.failUnless(sentinel_url is response.setup_url)
def test_cancel(self):
    # A 'cancel' message must complete with CANCEL status and report the
    # endpoint's claimed identifier as the identity URL.
    message = Message.fromPostArgs({'openid.mode': 'cancel'})
    # disableReturnToChecking() rebinds consumer.complete to a callable
    # taking (message, endpoint), hence the two-argument call below.
    self.disableReturnToChecking()
    r = self.consumer.complete(message, self.endpoint)
    self.failUnlessEqual(r.status, CANCEL)
    # failUnlessEqual instead of failUnless(a == b): on failure both
    # values appear in the report, matching the status check above.
    self.failUnlessEqual(r.identity_url, self.endpoint.claimed_id)
def test_cancel_with_return_to(self):
    # A 'cancel' message completed with an explicit return_to argument
    # must give CANCEL status and the endpoint's claimed identifier.
    message = Message.fromPostArgs({'openid.mode': 'cancel'})
    r = self.consumer.complete(message, self.endpoint, self.return_to)
    self.failUnlessEqual(r.status, CANCEL)
    # failUnlessEqual instead of failUnless(a == b): on failure both
    # values appear in the report, matching the status check above.
    self.failUnlessEqual(r.identity_url, self.endpoint.claimed_id)
411 def test_error(self
):
412 msg
= 'an error message'
413 message
= Message
.fromPostArgs({'openid.mode': 'error',
416 self
.disableReturnToChecking()
417 r
= self
.consumer
.complete(message
, self
.endpoint
)
418 self
.failUnlessEqual(r
.status
, FAILURE
)
419 self
.failUnless(r
.identity_url
== self
.endpoint
.claimed_id
)
420 self
.failUnlessEqual(r
.message
, msg
)
422 def test_errorWithNoOptionalKeys(self
):
423 msg
= 'an error message'
424 contact
= 'some contact info here'
425 message
= Message
.fromPostArgs({'openid.mode': 'error',
427 'openid.contact': contact
,
429 self
.disableReturnToChecking()
430 r
= self
.consumer
.complete(message
, self
.endpoint
)
431 self
.failUnlessEqual(r
.status
, FAILURE
)
432 self
.failUnless(r
.identity_url
== self
.endpoint
.claimed_id
)
433 self
.failUnless(r
.contact
== contact
)
434 self
.failUnless(r
.reference
is None)
435 self
.failUnlessEqual(r
.message
, msg
)
437 def test_errorWithOptionalKeys(self
):
438 msg
= 'an error message'
440 reference
= 'support ticket'
441 message
= Message
.fromPostArgs({'openid.mode': 'error',
442 'openid.error': msg
, 'openid.reference': reference
,
443 'openid.contact': contact
, 'openid.ns': OPENID2_NS
,
445 r
= self
.consumer
.complete(message
, self
.endpoint
, None)
446 self
.failUnlessEqual(r
.status
, FAILURE
)
447 self
.failUnless(r
.identity_url
== self
.endpoint
.claimed_id
)
448 self
.failUnless(r
.contact
== contact
)
449 self
.failUnless(r
.reference
== reference
)
450 self
.failUnlessEqual(r
.message
, msg
)
def test_noMode(self):
    # A message with no arguments at all (so no openid.mode) must
    # complete with FAILURE status while still reporting the endpoint's
    # claimed identifier.
    message = Message.fromPostArgs({})
    r = self.consumer.complete(message, self.endpoint, None)
    self.failUnlessEqual(r.status, FAILURE)
    # failUnlessEqual instead of failUnless(a == b): on failure both
    # values appear in the report, matching the status check above.
    self.failUnlessEqual(r.identity_url, self.endpoint.claimed_id)
def test_idResMissingField(self):
    # An 'id_res' message with no other fields must make _doIdRes raise
    # ProtocolError.
    # XXX (from the original author): the test passes, but possibly not
    # for the intended reason -- the failure is said to come from
    # *check_auth* failing rather than from the missing argument itself.
    bare_id_res = Message.fromPostArgs({'openid.mode': 'id_res'})
    do_id_res = self.consumer._doIdRes
    self.failUnlessRaises(ProtocolError, do_id_res,
                          bare_id_res, self.endpoint, None)
466 def test_idResURLMismatch(self
):
467 class VerifiedError(Exception): pass
469 def discoverAndVerify(claimed_id
, _to_match_endpoints
):
472 self
.consumer
._discoverAndVerify
= discoverAndVerify
473 self
.disableReturnToChecking()
475 message
= Message
.fromPostArgs(
476 {'openid.mode': 'id_res',
477 'openid.return_to': 'return_to (just anything)',
478 'openid.identity': 'something wrong (not self.consumer_id)',
479 'openid.assoc_handle': 'does not matter',
480 'openid.sig': GOODSIG
,
481 'openid.signed': 'identity,return_to',
483 self
.consumer
.store
= GoodAssocStore()
485 self
.failUnlessRaises(VerifiedError
,
486 self
.consumer
.complete
,
487 message
, self
.endpoint
)
489 self
.failUnlessLogMatches('Error attempting to use stored',
490 'Attempting discovery')
492 class TestCompleteMissingSig(unittest
.TestCase
, CatchLogs
):
495 self
.store
= GoodAssocStore()
496 self
.consumer
= GenericConsumer(self
.store
)
497 self
.server_url
= "http://idp.unittest/"
498 CatchLogs
.setUp(self
)
500 claimed_id
= 'bogus.claimed'
502 self
.message
= Message
.fromOpenIDArgs(
504 'return_to': 'return_to (just anything)',
505 'identity': claimed_id
,
506 'assoc_handle': 'does not matter',
508 'response_nonce': mkNonce(),
509 'signed': 'identity,return_to,response_nonce,assoc_handle,claimed_id',
510 'claimed_id': claimed_id
,
511 'op_endpoint': self
.server_url
,
515 self
.endpoint
= OpenIDServiceEndpoint()
516 self
.endpoint
.server_url
= self
.server_url
517 self
.endpoint
.claimed_id
= claimed_id
518 self
.consumer
._checkReturnTo
= lambda unused1
, unused2
: True
521 CatchLogs
.tearDown(self
)
524 def test_idResMissingNoSigs(self
):
525 def _vrfy(resp_msg
, endpoint
=None):
528 self
.consumer
._verifyDiscoveryResults
= _vrfy
529 r
= self
.consumer
.complete(self
.message
, self
.endpoint
, None)
530 self
.failUnlessSuccess(r
)
def test_idResNoIdentity(self):
    # Both identifier fields are removed from the message, the endpoint's
    # claimed_id is cleared and the signed list is reduced to match; the
    # response is still expected to be a success.
    msg = self.message
    for field in ('identity', 'claimed_id'):
        msg.delArg(OPENID_NS, field)
    self.endpoint.claimed_id = None
    msg.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle')
    outcome = self.consumer.complete(msg, self.endpoint, None)
    self.failUnlessSuccess(outcome)
def test_idResMissingIdentitySig(self):
    # 'identity' is left out of the signed list: completion must fail.
    signed_fields = 'return_to,response_nonce,assoc_handle,claimed_id'
    self.message.setArg(OPENID_NS, 'signed', signed_fields)
    outcome = self.consumer.complete(self.message, self.endpoint, None)
    self.failUnlessEqual(outcome.status, FAILURE)
def test_idResMissingReturnToSig(self):
    # 'return_to' is left out of the signed list: completion must fail.
    signed_fields = 'identity,response_nonce,assoc_handle,claimed_id'
    self.message.setArg(OPENID_NS, 'signed', signed_fields)
    outcome = self.consumer.complete(self.message, self.endpoint, None)
    self.failUnlessEqual(outcome.status, FAILURE)
def test_idResMissingAssocHandleSig(self):
    # 'assoc_handle' is left out of the signed list: completion must fail.
    signed_fields = 'identity,response_nonce,return_to,claimed_id'
    self.message.setArg(OPENID_NS, 'signed', signed_fields)
    outcome = self.consumer.complete(self.message, self.endpoint, None)
    self.failUnlessEqual(outcome.status, FAILURE)
def test_idResMissingClaimedIDSig(self):
    # 'claimed_id' is left out of the signed list: completion must fail.
    signed_fields = 'identity,response_nonce,return_to,assoc_handle'
    self.message.setArg(OPENID_NS, 'signed', signed_fields)
    outcome = self.consumer.complete(self.message, self.endpoint, None)
    self.failUnlessEqual(outcome.status, FAILURE)
def failUnlessSuccess(self, response):
    """Fail the current test, quoting ``response`` in the message, unless
    its status is SUCCESS."""
    status = response.status
    if status != SUCCESS:
        failure_text = "Non-successful response: %s" % (response,)
        self.fail(failure_text)
572 class TestCheckAuthResponse(TestIdRes
, CatchLogs
):
574 CatchLogs
.setUp(self
)
575 TestIdRes
.setUp(self
)
578 CatchLogs
.tearDown(self
)
580 def _createAssoc(self
):
583 assoc
= association
.Association(
584 'handle', 'secret', issued
, lifetime
, 'HMAC-SHA1')
585 store
= self
.consumer
.store
586 store
.storeAssociation(self
.server_url
, assoc
)
587 assoc2
= store
.getAssociation(self
.server_url
)
588 self
.failUnlessEqual(assoc
, assoc2
)
590 def test_goodResponse(self
):
591 """successful response to check_authentication"""
592 response
= Message
.fromOpenIDArgs({'is_valid':'true',})
593 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
596 def test_missingAnswer(self
):
597 """check_authentication returns false when the server sends no answer"""
598 response
= Message
.fromOpenIDArgs({})
599 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
602 def test_badResponse(self
):
603 """check_authentication returns false when is_valid is false"""
604 response
= Message
.fromOpenIDArgs({'is_valid':'false',})
605 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
608 def test_badResponseInvalidate(self
):
609 """Make sure that the handle is invalidated when is_valid is false
611 From "Verifying directly with the OpenID Provider"::
613 If the OP responds with "is_valid" set to "true", and
614 "invalidate_handle" is present, the Relying Party SHOULD
615 NOT send further authentication requests with that handle.
618 response
= Message
.fromOpenIDArgs({
620 'invalidate_handle':'handle',
622 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
625 self
.consumer
.store
.getAssociation(self
.server_url
) is None)
627 def test_invalidateMissing(self
):
628 """invalidate_handle with a handle that is not present"""
629 response
= Message
.fromOpenIDArgs({
631 'invalidate_handle':'missing',
633 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
635 self
.failUnlessLogMatches(
636 'Received "invalidate_handle"'
639 def test_invalidateMissing_noStore(self
):
640 """invalidate_handle with a handle that is not present"""
641 response
= Message
.fromOpenIDArgs({
643 'invalidate_handle':'missing',
645 self
.consumer
.store
= None
646 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
648 self
.failUnlessLogMatches(
649 'Received "invalidate_handle"',
650 'Unexpectedly got invalidate_handle without a store')
652 def test_invalidatePresent(self
):
653 """invalidate_handle with a handle that exists
655 From "Verifying directly with the OpenID Provider"::
657 If the OP responds with "is_valid" set to "true", and
658 "invalidate_handle" is present, the Relying Party SHOULD
659 NOT send further authentication requests with that handle.
662 response
= Message
.fromOpenIDArgs({
664 'invalidate_handle':'handle',
666 r
= self
.consumer
._processCheckAuthResponse
(response
, self
.server_url
)
669 self
.consumer
.store
.getAssociation(self
.server_url
) is None)
671 class TestSetupNeeded(TestIdRes
):
672 def failUnlessSetupNeeded(self
, expected_setup_url
, message
):
674 self
.consumer
._checkSetupNeeded
(message
)
675 except SetupNeededError
, why
:
676 self
.failUnlessEqual(expected_setup_url
, why
.user_setup_url
)
678 self
.fail("Expected to find an immediate-mode response")
680 def test_setupNeededOpenID1(self
):
681 """The minimum conditions necessary to trigger Setup Needed"""
682 setup_url
= 'http://unittest/setup-here'
683 message
= Message
.fromPostArgs({
684 'openid.mode': 'id_res',
685 'openid.user_setup_url': setup_url
,
687 self
.failUnless(message
.isOpenID1())
688 self
.failUnlessSetupNeeded(setup_url
, message
)
690 def test_setupNeededOpenID1_extra(self
):
691 """Extra stuff along with setup_url still trigger Setup Needed"""
692 setup_url
= 'http://unittest/setup-here'
693 message
= Message
.fromPostArgs({
694 'openid.mode': 'id_res',
695 'openid.user_setup_url': setup_url
,
696 'openid.identity': 'bogus',
698 self
.failUnless(message
.isOpenID1())
699 self
.failUnlessSetupNeeded(setup_url
, message
)
def test_noSetupNeededOpenID1(self):
    """An OpenID 1 'id_res' message without user_setup_url is not taken
    as a setup-needed reply to checkid_immediate"""
    message = Message.fromOpenIDArgs({'mode': 'id_res'})
    is_openid1 = message.isOpenID1()
    self.failUnless(is_openid1)

    # Returning normally is the pass condition: no SetupNeededError.
    self.consumer._checkSetupNeeded(message)
710 def test_setupNeededOpenID2(self
):
711 message
= Message
.fromOpenIDArgs({
712 'mode':'setup_needed',
715 self
.failUnless(message
.isOpenID2())
716 response
= self
.consumer
.complete(message
, None, None)
717 self
.failUnlessEqual('setup_needed', response
.status
)
718 self
.failUnlessEqual(None, response
.setup_url
)
720 def test_setupNeededDoesntWorkForOpenID1(self
):
721 message
= Message
.fromOpenIDArgs({
722 'mode':'setup_needed',
725 # No SetupNeededError raised
726 self
.consumer
._checkSetupNeeded
(message
)
728 response
= self
.consumer
.complete(message
, None, None)
729 self
.failUnlessEqual('failure', response
.status
)
730 self
.failUnless(response
.message
.startswith('Invalid openid.mode'))
732 def test_noSetupNeededOpenID2(self
):
733 message
= Message
.fromOpenIDArgs({
735 'game':'puerto_rico',
738 self
.failUnless(message
.isOpenID2())
740 # No SetupNeededError raised
741 self
.consumer
._checkSetupNeeded
(message
)
743 class IdResCheckForFieldsTest(TestIdRes
):
745 self
.consumer
= GenericConsumer(None)
747 def mkSuccessTest(openid_args
, signed_list
):
749 message
= Message
.fromOpenIDArgs(openid_args
)
750 message
.setArg(OPENID_NS
, 'signed', ','.join(signed_list
))
751 self
.consumer
._idResCheckForFields
(message
)
754 test_openid1Success
= mkSuccessTest(
755 {'return_to':'return',
756 'assoc_handle':'assoc handle',
758 'identity':'someone',
760 ['return_to', 'identity'])
762 test_openid2Success
= mkSuccessTest(
764 'return_to':'return',
765 'assoc_handle':'assoc handle',
767 'op_endpoint':'my favourite server',
768 'response_nonce':'use only once',
770 ['return_to', 'response_nonce', 'assoc_handle'])
772 test_openid2Success_identifiers
= mkSuccessTest(
774 'return_to':'return',
775 'assoc_handle':'assoc handle',
777 'claimed_id':'i claim to be me',
778 'identity':'my server knows me as me',
779 'op_endpoint':'my favourite server',
780 'response_nonce':'use only once',
782 ['return_to', 'response_nonce', 'identity',
783 'claimed_id', 'assoc_handle'])
785 def mkFailureTest(openid_args
, signed_list
):
787 message
= Message
.fromOpenIDArgs(openid_args
)
789 self
.consumer
._idResCheckForFields
(message
)
790 except ProtocolError
, why
:
791 self
.failUnless(why
[0].startswith('Missing required'))
793 self
.fail('Expected an error, but none occurred')
796 test_openid1Missing_returnToSig
= mkFailureTest(
797 {'return_to':'return',
798 'assoc_handle':'assoc handle',
800 'identity':'someone',
804 test_openid1Missing_identitySig
= mkFailureTest(
805 {'return_to':'return',
806 'assoc_handle':'assoc handle',
808 'identity':'someone',
812 test_openid1MissingReturnTo
= mkFailureTest(
813 {'assoc_handle':'assoc handle',
815 'identity':'someone',
817 ['return_to', 'identity'])
819 test_openid1MissingAssocHandle
= mkFailureTest(
820 {'return_to':'return',
822 'identity':'someone',
824 ['return_to', 'identity'])
826 # XXX: I could go on...
class CheckAuthHappened(Exception):
    """Marker exception raised in place of a real check-auth call, so a
    test can detect that the check was attempted."""
830 class CheckNonceVerifyTest(TestIdRes
, CatchLogs
):
832 CatchLogs
.setUp(self
)
833 TestIdRes
.setUp(self
)
834 self
.consumer
.openid1_nonce_query_arg_name
= 'nonce'
837 CatchLogs
.tearDown(self
)
def test_openid1Success(self):
    """a nonce placed in the return_to query by the consumer is accepted"""
    nonce_value = mkNonce()
    self.return_to = 'http://rt.unittest/?nonce=%s' % (nonce_value,)
    response = Message.fromOpenIDArgs({'return_to': self.return_to})
    self.response = response
    # The same nonce is also set as a bare (BARE_NS) argument.
    response.setArg(BARE_NS, 'nonce', nonce_value)
    self.consumer._idResCheckNonce(response, self.endpoint)
    self.failUnlessLogEmpty()
def test_openid1Missing(self):
    """an empty message yields no OpenID 1 nonce (None) and logs nothing"""
    self.response = Message.fromOpenIDArgs({})
    get_nonce = self.consumer._idResGetNonceOpenID1
    found = get_nonce(self.response, self.endpoint)
    # The value found is passed as the failure message for diagnosis.
    self.failUnless(found is None, found)
    self.failUnlessLogEmpty()
def test_consumerNonceOpenID2(self):
    """an OpenID 2 message with only a return_to nonce is rejected"""
    self.return_to = 'http://rt.unittest/?nonce=%s' % (mkNonce(),)
    openid_args = {'return_to': self.return_to, 'ns':OPENID2_NS}
    self.response = Message.fromOpenIDArgs(openid_args)
    check_nonce = self.consumer._idResCheckNonce
    self.failUnlessRaises(ProtocolError, check_nonce,
                          self.response, self.endpoint)
    self.failUnlessLogEmpty()
def test_serverNonce(self):
    """an OpenID 2 message carrying response_nonce passes the check"""
    openid_args = {'ns':OPENID2_NS, 'response_nonce': mkNonce(),}
    self.response = Message.fromOpenIDArgs(openid_args)
    check_nonce = self.consumer._idResCheckNonce
    check_nonce(self.response, self.endpoint)
    self.failUnlessLogEmpty()
871 def test_serverNonceOpenID1(self
):
872 """OpenID 1 does not use server-generated nonce"""
873 self
.response
= Message
.fromOpenIDArgs(
875 'return_to': 'http://return.to/',
876 'response_nonce': mkNonce(),})
877 self
.failUnlessRaises(ProtocolError
, self
.consumer
._idResCheckNonce
,
878 self
.response
, self
.endpoint
)
879 self
.failUnlessLogEmpty()
881 def test_badNonce(self
):
882 """remove the nonce from the store
884 From "Checking the Nonce"::
886 When the Relying Party checks the signature on an assertion, the
888 Relying Party SHOULD ensure that an assertion has not yet
889 been accepted with the same value for "openid.response_nonce"
890 from the same OP Endpoint URL.
893 stamp
, salt
= splitNonce(nonce
)
894 self
.store
.useNonce(self
.server_url
, stamp
, salt
)
895 self
.response
= Message
.fromOpenIDArgs(
896 {'response_nonce': nonce
,
899 self
.failUnlessRaises(ProtocolError
, self
.consumer
._idResCheckNonce
,
900 self
.response
, self
.endpoint
)
902 def test_successWithNoStore(self
):
903 """When there is no store, checking the nonce succeeds"""
904 self
.consumer
.store
= None
905 self
.response
= Message
.fromOpenIDArgs(
906 {'response_nonce': mkNonce(),
909 self
.consumer
._idResCheckNonce
(self
.response
, self
.endpoint
)
910 self
.failUnlessLogEmpty()
912 def test_tamperedNonce(self
):
913 """Malformed nonce"""
914 self
.response
= Message
.fromOpenIDArgs(
916 'response_nonce':'malformed'})
917 self
.failUnlessRaises(ProtocolError
, self
.consumer
._idResCheckNonce
,
918 self
.response
, self
.endpoint
)
def test_missingNonce(self):
    """a return_to with no nonce parameter makes the nonce check fail"""
    openid_args = {'return_to': self.return_to}
    self.response = Message.fromOpenIDArgs(openid_args)
    check_nonce = self.consumer._idResCheckNonce
    self.failUnlessRaises(ProtocolError, check_nonce,
                          self.response, self.endpoint)
927 class CheckAuthDetectingConsumer(GenericConsumer
):
def _checkAuth(self, *args):
    """Always raise CheckAuthHappened, carrying the tuple of positional
    arguments this method was called with."""
    call_args = args
    raise CheckAuthHappened(call_args)
931 def _idResCheckNonce(self
, *args
):
932 """We're not testing nonce-checking, so just return success
936 class TestCheckAuthTriggered(TestIdRes
, CatchLogs
):
937 consumer_class
= CheckAuthDetectingConsumer
940 TestIdRes
.setUp(self
)
941 CatchLogs
.setUp(self
)
942 self
.disableDiscoveryVerification()
944 def test_checkAuthTriggered(self
):
945 message
= Message
.fromPostArgs({
946 'openid.return_to':self
.return_to
,
947 'openid.identity':self
.server_id
,
948 'openid.assoc_handle':'not_found',
949 'openid.sig': GOODSIG
,
950 'openid.signed': 'identity,return_to',
952 self
.disableReturnToChecking()
954 result
= self
.consumer
._doIdRes
(message
, self
.endpoint
, None)
955 except CheckAuthHappened
:
958 self
.fail('_checkAuth did not happen. Result was: %r %s' %
959 (result
, self
.messages
))
961 def test_checkAuthTriggeredWithAssoc(self
):
962 # Store an association for this server that does not match the
963 # handle that is in the message
966 assoc
= association
.Association(
967 'handle', 'secret', issued
, lifetime
, 'HMAC-SHA1')
968 self
.store
.storeAssociation(self
.server_url
, assoc
)
969 self
.disableReturnToChecking()
970 message
= Message
.fromPostArgs({
971 'openid.return_to':self
.return_to
,
972 'openid.identity':self
.server_id
,
973 'openid.assoc_handle':'not_found',
974 'openid.sig': GOODSIG
,
975 'openid.signed': 'identity,return_to',
978 result
= self
.consumer
._doIdRes
(message
, self
.endpoint
, None)
979 except CheckAuthHappened
:
982 self
.fail('_checkAuth did not happen. Result was: %r' % (result
,))
984 def test_expiredAssoc(self
):
985 # Store an expired association for the server with the handle
986 # that is in the message
987 issued
= time
.time() - 10
990 assoc
= association
.Association(
991 handle
, 'secret', issued
, lifetime
, 'HMAC-SHA1')
992 self
.failUnless(assoc
.expiresIn
<= 0)
993 self
.store
.storeAssociation(self
.server_url
, assoc
)
995 message
= Message
.fromPostArgs({
996 'openid.return_to':self
.return_to
,
997 'openid.identity':self
.server_id
,
998 'openid.assoc_handle':handle
,
999 'openid.sig': GOODSIG
,
1000 'openid.signed': 'identity,return_to',
1002 self
.disableReturnToChecking()
1003 self
.failUnlessRaises(ProtocolError
, self
.consumer
._doIdRes
,
1004 message
, self
.endpoint
, None)
1006 def test_newerAssoc(self
):
1009 good_issued
= time
.time() - 10
1010 good_handle
= 'handle'
1011 good_assoc
= association
.Association(
1012 good_handle
, 'secret', good_issued
, lifetime
, 'HMAC-SHA1')
1013 self
.store
.storeAssociation(self
.server_url
, good_assoc
)
1015 bad_issued
= time
.time() - 5
1016 bad_handle
= 'handle2'
1017 bad_assoc
= association
.Association(
1018 bad_handle
, 'secret', bad_issued
, lifetime
, 'HMAC-SHA1')
1019 self
.store
.storeAssociation(self
.server_url
, bad_assoc
)
1022 'return_to':self
.return_to
,
1023 'identity':self
.server_id
,
1024 'assoc_handle':good_handle
,
1027 message
= Message
.fromOpenIDArgs(query
)
1028 message
= good_assoc
.signMessage(message
)
1029 self
.disableReturnToChecking()
1030 info
= self
.consumer
._doIdRes
(message
, self
.endpoint
, None)
1031 self
.failUnlessEqual(info
.status
, SUCCESS
, info
.message
)
1032 self
.failUnlessEqual(self
.consumer_id
, info
.identity_url
)
1036 class TestReturnToArgs(unittest
.TestCase
):
1037 """Verifying the Return URL paramaters.
1038 From the specification "Verifying the Return URL"::
1040 To verify that the "openid.return_to" URL matches the URL that is
1041 processing this assertion:
1043 - The URL scheme, authority, and path MUST be the same between the
1046 - Any query parameters that are present in the "openid.return_to"
1047 URL MUST also be present with the same values in the
1050 XXX: So far we have only tested the second item on the list above.
1051 XXX: _verifyReturnToArgs is not invoked anywhere.
1056 self
.consumer
= GenericConsumer(store
)
1058 def test_returnToArgsOkay(self
):
1060 'openid.mode': 'id_res',
1061 'openid.return_to': 'http://example.com/?foo=bar',
1064 # no return value, success is assumed if there are no exceptions.
1065 self
.consumer
._verifyReturnToArgs
(query
)
1067 def test_returnToArgsUnexpectedArg(self
):
1069 'openid.mode': 'id_res',
1070 'openid.return_to': 'http://example.com/',
1073 # no return value, success is assumed if there are no exceptions.
1074 self
.failUnlessRaises(ProtocolError
,
1075 self
.consumer
._verifyReturnToArgs
, query
)
1077 def test_returnToMismatch(self
):
1079 'openid.mode': 'id_res',
1080 'openid.return_to': 'http://example.com/?foo=bar',
1082 # fail, query has no key 'foo'.
1083 self
.failUnlessRaises(ValueError,
1084 self
.consumer
._verifyReturnToArgs
, query
)
1086 query
['foo'] = 'baz'
1087 # fail, values for 'foo' do not match.
1088 self
.failUnlessRaises(ValueError,
1089 self
.consumer
._verifyReturnToArgs
, query
)
def test_noReturnTo(self):
    """A query with no 'openid.return_to' key makes
    _verifyReturnToArgs raise ValueError."""
    args_without_return_to = {'openid.mode': 'id_res'}
    verify = self.consumer._verifyReturnToArgs
    self.failUnlessRaises(ValueError, verify, args_without_return_to)
1097 def test_completeBadReturnTo(self
):
1098 """Test GenericConsumer.complete()'s handling of bad return_to
1101 return_to
= "http://some.url/path?foo=bar"
1103 # Scheme, authority, and path differences are checked by
1104 # GenericConsumer._checkReturnTo. Query args checked by
1105 # GenericConsumer._verifyReturnToArgs.
1108 "https://some.url/path?foo=bar",
1110 "http://some.url.invalid/path?foo=bar",
1112 "http://some.url/path_extra?foo=bar",
1114 "http://some.url/path?foo=bar2",
1115 "http://some.url/path?foo2=bar",
1118 m
= Message(OPENID1_NS
)
1119 m
.setArg(OPENID_NS
, 'mode', 'cancel')
1120 m
.setArg(BARE_NS
, 'foo', 'bar')
1123 for bad
in bad_return_tos
:
1124 m
.setArg(OPENID_NS
, 'return_to', bad
)
1125 self
.failIf(self
.consumer
._checkReturnTo
(m
, return_to
))
1127 def test_completeGoodReturnTo(self
):
1128 """Test GenericConsumer.complete()'s handling of good
1131 return_to
= "http://some.url/path"
1135 (return_to
+ "?another=arg", {(BARE_NS
, 'another'): 'arg'}),
1136 (return_to
+ "?another=arg#fragment", {(BARE_NS
, 'another'): 'arg'}),
1137 ("HTTP"+return_to
[4:], {}),
1138 (return_to
.replace('url','URL'), {}),
1139 ("http://some.url:80/path", {}),
1140 ("http://some.url/p%61th", {}),
1141 ("http://some.url/./path", {}),
1146 for good
, extra
in good_return_tos
:
1147 m
= Message(OPENID1_NS
)
1148 m
.setArg(OPENID_NS
, 'mode', 'cancel')
1150 for ns
, key
in extra
:
1151 m
.setArg(ns
, key
, extra
[(ns
, key
)])
1153 m
.setArg(OPENID_NS
, 'return_to', good
)
1154 result
= self
.consumer
.complete(m
, endpoint
, return_to
)
1155 self
.failUnless(isinstance(result
, CancelResponse
), \
1156 "Expected CancelResponse, got %r for %s" % (result
, good
,))
1158 class MockFetcher(object):
1159 def __init__(self
, response
=None):
1160 self
.response
= response
or HTTPResponse()
def fetch(self, url, body=None, headers=None):
    """Record the request and hand back the canned response.

    The (url, body, headers) triple is appended to self.fetches, then
    self.response is returned unchanged.
    """
    request = (url, body, headers)
    self.fetches.append(request)
    canned = self.response
    return canned
1167 class ExceptionRaisingMockFetcher(object):
1168 class MyException(Exception):
def fetch(self, url, body=None, headers=None):
    """Always fail: raise self.MyException instead of fetching.

    The arguments are accepted but not used.
    """
    failure = self.MyException('mock fetcher exception')
    raise failure
1174 class BadArgCheckingConsumer(GenericConsumer
):
1175 def _makeKVPost(self
, args
, _
):
1177 'openid.mode':'check_authentication',
1178 'openid.signed':'foo',
1179 'openid.ns':OPENID1_NS
1183 class TestCheckAuth(unittest
.TestCase
, CatchLogs
):
1184 consumer_class
= GenericConsumer
1187 CatchLogs
.setUp(self
)
1188 self
.store
= memstore
.MemoryStore()
1190 self
.consumer
= self
.consumer_class(self
.store
)
1192 self
._orig
_fetcher
= fetchers
.getDefaultFetcher()
1193 self
.fetcher
= MockFetcher()
1194 fetchers
.setDefaultFetcher(self
.fetcher
)
1197 CatchLogs
.tearDown(self
)
1198 fetchers
.setDefaultFetcher(self
._orig
_fetcher
, wrap_exceptions
=False)
1200 def test_error(self
):
1201 self
.fetcher
.response
= HTTPResponse(
1202 "http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
1203 query
= {'openid.signed': 'stuff',
1204 'openid.stuff':'a value'}
1205 r
= self
.consumer
._checkAuth
(Message
.fromPostArgs(query
),
1208 self
.failUnless(self
.messages
)
1210 def test_bad_args(self
):
1212 'openid.signed':'foo',
1213 'closid.foo':'something',
1215 consumer
= BadArgCheckingConsumer(self
.store
)
1216 consumer
._checkAuth
(Message
.fromPostArgs(query
), 'does://not.matter')
1219 def test_signedList(self
):
1220 query
= Message
.fromOpenIDArgs({
1223 'identity': '=example',
1224 'assoc_handle': 'munchkins',
1225 'ns.sreg': 'urn:sreg',
1226 'sreg.email': 'bogus@example.com',
1227 'signed': 'identity,mode,ns.sreg,sreg.email',
1230 args
= self
.consumer
._createCheckAuthRequest
(query
)
1231 self
.failUnless(args
.isOpenID1())
1232 for signed_arg
in query
.getArg(OPENID_NS
, 'signed').split(','):
1233 self
.failUnless(args
.getAliasedArg(signed_arg
), signed_arg
)
1236 args
= {'openid.assoc_handle': 'fa1f5ff0-cde4-11dc-a183-3714bfd55ca8',
1237 'openid.claimed_id': 'http://binkley.lan/user/test01',
1238 'openid.identity': 'http://test01.binkley.lan/',
1239 'openid.mode': 'id_res',
1240 'openid.ns': 'http://specs.openid.net/auth/2.0',
1241 'openid.ns.pape': 'http://specs.openid.net/extensions/pape/1.0',
1242 'openid.op_endpoint': 'http://binkley.lan/server',
1243 'openid.pape.auth_policies': 'none',
1244 'openid.pape.auth_time': '2008-01-28T20:42:36Z',
1245 'openid.pape.nist_auth_level': '0',
1246 'openid.response_nonce': '2008-01-28T21:07:04Z99Q=',
1247 'openid.return_to': 'http://binkley.lan:8001/process?janrain_nonce=2008-01-28T21%3A07%3A02Z0tMIKx',
1248 'openid.sig': 'YJlWH4U6SroB1HoPkmEKx9AyGGg=',
1249 'openid.signed': 'assoc_handle,identity,response_nonce,return_to,claimed_id,op_endpoint,pape.auth_time,ns.pape,pape.nist_auth_level,pape.auth_policies'
1251 self
.failUnlessEqual(OPENID2_NS
, args
['openid.ns'])
1252 incoming
= Message
.fromPostArgs(args
)
1253 self
.failUnless(incoming
.isOpenID2())
1254 car
= self
.consumer
._createCheckAuthRequest
(incoming
)
1255 expected_args
= args
.copy()
1256 expected_args
['openid.mode'] = 'check_authentication'
1257 expected
=Message
.fromPostArgs(expected_args
)
1258 self
.failUnless(expected
.isOpenID2())
1259 self
.failUnlessEqual(expected
, car
)
1260 self
.failUnlessEqual(expected_args
, car
.toPostArgs())
1264 class TestFetchAssoc(unittest
.TestCase
, CatchLogs
):
1265 consumer_class
= GenericConsumer
1268 CatchLogs
.setUp(self
)
1269 self
.store
= memstore
.MemoryStore()
1270 self
.fetcher
= MockFetcher()
1271 fetchers
.setDefaultFetcher(self
.fetcher
)
1272 self
.consumer
= self
.consumer_class(self
.store
)
def test_error_404(self):
    """A 404 reply to a key-value POST surfaces as HTTPFetchingError."""
    # Make the mock fetcher answer every request with a 404.
    not_found = HTTPResponse(
        "http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
    self.fetcher.response = not_found

    request = Message.fromPostArgs({'mode':'associate'})
    self.failUnlessRaises(fetchers.HTTPFetchingError,
                          self.consumer._makeKVPost,
                          request, "http://server_url")
1284 def test_error_exception_unwrapped(self
):
1285 """Ensure that exceptions are bubbled through from fetchers
1286 when making associations
1288 self
.fetcher
= ExceptionRaisingMockFetcher()
1289 fetchers
.setDefaultFetcher(self
.fetcher
, wrap_exceptions
=False)
1290 self
.failUnlessRaises(self
.fetcher
.MyException
,
1291 self
.consumer
._makeKVPost
,
1292 Message
.fromPostArgs({'mode':'associate'}),
1293 "http://server_url")
1295 # exception fetching returns no association
1296 e
= OpenIDServiceEndpoint()
1297 e
.server_url
= 'some://url'
1298 self
.failUnlessRaises(self
.fetcher
.MyException
,
1299 self
.consumer
._getAssociation
, e
)
1301 self
.failUnlessRaises(self
.fetcher
.MyException
,
1302 self
.consumer
._checkAuth
,
1303 Message
.fromPostArgs({'openid.signed':''}),
1306 def test_error_exception_wrapped(self
):
1307 """Ensure that openid.fetchers.HTTPFetchingError is caught by
1308 the association creation stuff.
1310 self
.fetcher
= ExceptionRaisingMockFetcher()
1311 # This will wrap exceptions!
1312 fetchers
.setDefaultFetcher(self
.fetcher
)
1313 self
.failUnlessRaises(fetchers
.HTTPFetchingError
,
1314 self
.consumer
._makeKVPost
,
1315 Message
.fromOpenIDArgs({'mode':'associate'}),
1316 "http://server_url")
1318 # exception fetching returns no association
1319 e
= OpenIDServiceEndpoint()
1320 e
.server_url
= 'some://url'
1321 self
.failUnless(self
.consumer
._getAssociation
(e
) is None)
1323 msg
= Message
.fromPostArgs({'openid.signed':''})
1324 self
.failIf(self
.consumer
._checkAuth
(msg
, 'some://url'))
1327 class TestSuccessResponse(unittest
.TestCase
):
1329 self
.endpoint
= OpenIDServiceEndpoint()
1330 self
.endpoint
.claimed_id
= 'identity_url'
1332 def test_extensionResponse(self
):
1333 resp
= mkSuccess(self
.endpoint
, {
1334 'ns.sreg':'urn:sreg',
1335 'ns.unittest':'urn:unittest',
1338 'sreg.nickname':'j3h',
1339 'return_to':'return_to',
1341 utargs
= resp
.extensionResponse('urn:unittest', False)
1342 self
.failUnlessEqual(utargs
, {'one':'1', 'two':'2'})
1343 sregargs
= resp
.extensionResponse('urn:sreg', False)
1344 self
.failUnlessEqual(sregargs
, {'nickname':'j3h'})
1346 def test_extensionResponseSigned(self
):
1348 'ns.sreg':'urn:sreg',
1349 'ns.unittest':'urn:unittest',
1352 'sreg.nickname':'j3h',
1353 'sreg.dob':'yesterday',
1354 'return_to':'return_to',
1355 'signed': 'sreg.nickname,unittest.one,sreg.dob',
1358 signed_list
= ['openid.sreg.nickname',
1359 'openid.unittest.one',
1362 # Don't use mkSuccess because it creates an all-inclusive
1364 msg
= Message
.fromOpenIDArgs(args
)
1365 resp
= SuccessResponse(self
.endpoint
, msg
, signed_list
)
1367 # All args in this NS are signed, so expect all.
1368 sregargs
= resp
.extensionResponse('urn:sreg', True)
1369 self
.failUnlessEqual(sregargs
, {'nickname':'j3h', 'dob': 'yesterday'})
1371 # Not all args in this NS are signed, so expect None when
1373 utargs
= resp
.extensionResponse('urn:unittest', True)
1374 self
.failUnlessEqual(utargs
, None)
def test_noReturnTo(self):
    """getReturnTo() is None for a success response built from no
    arguments."""
    empty_response = mkSuccess(self.endpoint, {})
    return_to = empty_response.getReturnTo()
    self.failUnless(return_to is None)
def test_returnTo(self):
    """getReturnTo() hands back the 'return_to' argument of the
    response."""
    signed_args = {'return_to':'return_to'}
    response = mkSuccess(self.endpoint, signed_args)
    self.failUnlessEqual(response.getReturnTo(), 'return_to')
def test_displayIdentifierClaimedId(self):
    """Without a display_identifier set by this test, the display
    identifier equals the endpoint's claimed_id."""
    response = mkSuccess(self.endpoint, {})
    shown = response.getDisplayIdentifier()
    self.failUnlessEqual(shown, response.endpoint.claimed_id)
def test_displayIdentifierOverride(self):
    """A display_identifier set on the endpoint is what
    getDisplayIdentifier() reports."""
    self.endpoint.display_identifier = "http://input.url/"
    response = mkSuccess(self.endpoint, {})
    shown = response.getDisplayIdentifier()
    self.failUnlessEqual(shown, "http://input.url/")
1395 class StubConsumer(object):
1397 self
.assoc
= object()
1398 self
.response
= None
1399 self
.endpoint
= None
1401 def begin(self
, service
):
1402 auth_req
= AuthRequest(service
, self
.assoc
)
1403 self
.endpoint
= service
def complete(self, message, endpoint, return_to):
    """Return the canned response stored on this stub.

    Asserts that the endpoint passed in is the very object recorded
    in self.endpoint; message and return_to are not inspected.
    """
    remembered = self.endpoint
    assert endpoint is remembered
    canned = self.response
    return canned
1410 class ConsumerTest(unittest
.TestCase
):
1411 """Tests for high-level consumer.Consumer functions.
1413 Its GenericConsumer component is stubbed out with StubConsumer.
1416 self
.endpoint
= OpenIDServiceEndpoint()
1417 self
.endpoint
.claimed_id
= self
.identity_url
= 'http://identity.url/'
1420 self
.consumer
= Consumer(self
.session
, self
.store
)
1421 self
.consumer
.consumer
= StubConsumer()
1422 self
.discovery
= Discovery(self
.session
,
1424 self
.consumer
.session_key_prefix
)
def test_setAssociationPreference(self):
    """setAssociationPreference installs a SessionNegotiator on the
    wrapped consumer whose allowed_types match the list given."""
    # An empty preference list yields a negotiator with no types.
    self.consumer.setAssociationPreference([])
    negotiator = self.consumer.consumer.negotiator
    self.failUnless(
        isinstance(negotiator, association.SessionNegotiator))
    self.failUnlessEqual([], negotiator.allowed_types)

    # A non-empty list is reflected verbatim; the negotiator is read
    # again from the consumer after the second call.
    preferred = [('HMAC-SHA1', 'DH-SHA1')]
    self.consumer.setAssociationPreference(preferred)
    current_types = self.consumer.consumer.negotiator.allowed_types
    self.failUnlessEqual([('HMAC-SHA1', 'DH-SHA1')], current_types)
1436 def withDummyDiscovery(self
, callable, dummy_getNextService
):
1437 class DummyDisco(object):
1438 def __init__(self
, *ignored
):
1441 getNextService
= dummy_getNextService
1443 import openid
.consumer
.consumer
1444 old_discovery
= openid
.consumer
.consumer
.Discovery
1446 openid
.consumer
.consumer
.Discovery
= DummyDisco
1449 openid
.consumer
.consumer
.Discovery
= old_discovery
1451 def test_beginHTTPError(self
):
1452 """Make sure that the discovery HTTP failure case behaves properly
1454 def getNextService(self
, ignored
):
1455 raise HTTPFetchingError("Unit test")
1459 self
.consumer
.begin('unused in this test')
1460 except DiscoveryFailure
, why
:
1461 self
.failUnless(why
[0].startswith('Error fetching'))
1462 self
.failIf(why
[0].find('Unit test') == -1)
1464 self
.fail('Expected DiscoveryFailure')
1466 self
.withDummyDiscovery(test
, getNextService
)
1468 def test_beginNoServices(self
):
1469 def getNextService(self
, ignored
):
1472 url
= 'http://a.user.url/'
1475 self
.consumer
.begin(url
)
1476 except DiscoveryFailure
, why
:
1477 self
.failUnless(why
[0].startswith('No usable OpenID'))
1478 self
.failIf(why
[0].find(url
) == -1)
1480 self
.fail('Expected DiscoveryFailure')
1482 self
.withDummyDiscovery(test
, getNextService
)
def test_beginWithoutDiscovery(self):
    """beginWithoutDiscovery returns an AuthRequest for the given
    endpoint and stores that endpoint in the session."""
    # Does this really test anything non-trivial?
    auth_request = self.consumer.beginWithoutDiscovery(self.endpoint)

    # What comes back is an auth request ...
    self.failUnless(isinstance(auth_request, AuthRequest))

    # ... and, as a side effect, the session entry under the
    # consumer's token key is the request's endpoint object.
    token_key = self.consumer._token_key
    stored_endpoint = self.session[token_key]
    self.failUnless(stored_endpoint is auth_request.endpoint)

    # The endpoint on the auth request is the one we passed in.
    self.failUnless(auth_request.endpoint is self.endpoint)
def test_completeEmptySession(self):
    """complete() without a prior begin passes endpoint=None to the
    generic consumer and returns its response."""
    failure_text = "failed complete"

    def completeExpectingNoEndpoint(message, endpoint, return_to):
        # The stand-in itself asserts that no endpoint was handed on.
        self.failUnless(endpoint is None)
        return FailureResponse(endpoint, failure_text)

    self.consumer.consumer.complete = completeExpectingNoEndpoint

    result = self.consumer.complete({}, None)
    self.failUnlessEqual(result.status, FAILURE)
    self.failUnlessEqual(result.message, failure_text)
    self.failUnless(result.identity_url is None)
1513 def _doResp(self
, auth_req
, exp_resp
):
1514 """complete a transaction, using the expected response from
1515 the generic consumer."""
1516 # response is an attribute of StubConsumer, returned by
1517 # StubConsumer.complete.
1518 self
.consumer
.consumer
.response
= exp_resp
1520 # endpoint is stored in the session
1521 self
.failUnless(self
.session
)
1522 resp
= self
.consumer
.complete({}, None)
1524 # All responses should have the same identity URL, and the
1525 # session should be cleaned out
1526 if self
.endpoint
.claimed_id
!= IDENTIFIER_SELECT
:
1527 self
.failUnless(resp
.identity_url
is self
.identity_url
)
1529 self
.failIf(self
.consumer
._token
_key
in self
.session
)
1531 # Expected status response
1532 self
.failUnlessEqual(resp
.status
, exp_resp
.status
)
1536 def _doRespNoDisco(self
, exp_resp
):
1537 """Set up a transaction without discovery"""
1538 auth_req
= self
.consumer
.beginWithoutDiscovery(self
.endpoint
)
1539 resp
= self
._doResp
(auth_req
, exp_resp
)
1540 # There should be nothing left in the session once we have completed.
1541 self
.failIf(self
.session
)
def test_noDiscoCompleteSuccessWithToken(self):
    """Run the no-discovery transaction helper with a success
    response as the expected result."""
    expected = mkSuccess(self.endpoint, {})
    self._doRespNoDisco(expected)
def test_noDiscoCompleteCancelWithToken(self):
    """Run the no-discovery transaction helper with a cancel
    response as the expected result."""
    expected = CancelResponse(self.endpoint)
    self._doRespNoDisco(expected)
1550 def test_noDiscoCompleteFailure(self
):
1552 resp
= self
._doRespNoDisco
(FailureResponse(self
.endpoint
, msg
))
1553 self
.failUnless(resp
.message
is msg
)
def test_noDiscoCompleteSetupNeeded(self):
    """The setup_url on the response returned by the no-discovery
    helper is the same object given to the SetupNeededResponse."""
    setup_url = 'http://setup.url/'
    expected = SetupNeededResponse(self.endpoint, setup_url)
    actual = self._doRespNoDisco(expected)
    self.failUnless(actual.setup_url is setup_url)
1561 # To test that discovery is cleaned up, we need to initialize a
1562 # Yadis manager, and have it put its values in the session.
1563 def _doRespDisco(self
, is_clean
, exp_resp
):
1564 """Set up and execute a transaction, with discovery"""
1565 self
.discovery
.createManager([self
.endpoint
], self
.identity_url
)
1566 auth_req
= self
.consumer
.begin(self
.identity_url
)
1567 resp
= self
._doResp
(auth_req
, exp_resp
)
1569 manager
= self
.discovery
.getManager()
1571 self
.failUnless(self
.discovery
.getManager() is None, manager
)
1573 self
.failIf(self
.discovery
.getManager() is None, manager
)
1577 # Cancel and success DO clean up the discovery process
def test_completeSuccess(self):
    """Run the discovery transaction helper with is_clean=True and a
    success response."""
    expected = mkSuccess(self.endpoint, {})
    self._doRespDisco(True, expected)
def test_completeCancel(self):
    """Run the discovery transaction helper with is_clean=True and a
    cancel response."""
    expected = CancelResponse(self.endpoint)
    self._doRespDisco(True, expected)
1584 # Failure and setup_needed don't clean up the discovery process
1585 def test_completeFailure(self
):
1587 resp
= self
._doRespDisco
(False, FailureResponse(self
.endpoint
, msg
))
1588 self
.failUnless(resp
.message
is msg
)
1590 def test_completeSetupNeeded(self
):
1591 setup_url
= 'http://setup.url/'
1592 resp
= self
._doRespDisco
(
1594 SetupNeededResponse(self
.endpoint
, setup_url
))
1595 self
.failUnless(resp
.setup_url
is setup_url
)
1597 def test_successDifferentURL(self
):
1599 Be sure that the session gets cleaned up when the response is
1600 successful and has a different URL than the one in the
1603 # Set up a request endpoint describing an IDP URL
1604 self
.identity_url
= 'http://idp.url/'
1605 self
.endpoint
.claimed_id
= self
.endpoint
.local_id
= IDENTIFIER_SELECT
1607 # Use a response endpoint with a different URL (asserted by
1609 resp_endpoint
= OpenIDServiceEndpoint()
1610 resp_endpoint
.claimed_id
= "http://user.url/"
1612 resp
= self
._doRespDisco
(
1614 mkSuccess(resp_endpoint
, {}))
1615 self
.failUnless(self
.discovery
.getManager(force
=True) is None)
def test_begin(self):
    """begin() hands the discovered endpoint to the stubbed generic
    consumer and returns the AuthRequest it builds."""
    self.discovery.createManager([self.endpoint], self.identity_url)

    # Should not raise an exception
    auth_request = self.consumer.begin(self.identity_url)
    self.failUnless(isinstance(auth_request, AuthRequest))

    # The request carries our endpoint, and its endpoint and
    # association are the very objects held by the inner consumer.
    self.failUnless(auth_request.endpoint is self.endpoint)
    inner = self.consumer.consumer
    self.failUnless(auth_request.endpoint is inner.endpoint)
    self.failUnless(auth_request.assoc is inner.assoc)
1628 class IDPDrivenTest(unittest
.TestCase
):
1631 self
.store
= GoodAssocStore()
1632 self
.consumer
= GenericConsumer(self
.store
)
1633 self
.endpoint
= OpenIDServiceEndpoint()
1634 self
.endpoint
.server_url
= "http://idp.unittest/"
def test_idpDrivenBegin(self):
    """Smoke test: begin() on the IdP endpoint must not raise."""
    # Testing here that the token-handling doesn't explode...
    idp_endpoint = self.endpoint
    self.consumer.begin(idp_endpoint)
1642 def test_idpDrivenComplete(self
):
1643 identifier
= '=directed_identifier'
1644 message
= Message
.fromPostArgs({
1645 'openid.identity': '=directed_identifier',
1646 'openid.return_to': 'x',
1647 'openid.assoc_handle': 'z',
1648 'openid.signed': 'identity,return_to',
1649 'openid.sig': GOODSIG
,
1652 discovered_endpoint
= OpenIDServiceEndpoint()
1653 discovered_endpoint
.claimed_id
= identifier
1654 discovered_endpoint
.server_url
= self
.endpoint
.server_url
1655 discovered_endpoint
.local_id
= identifier
1657 def verifyDiscoveryResults(identifier
, endpoint
):
1658 self
.failUnless(endpoint
is self
.endpoint
)
1659 iverified
.append(discovered_endpoint
)
1660 return discovered_endpoint
1661 self
.consumer
._verifyDiscoveryResults
= verifyDiscoveryResults
1662 self
.consumer
._idResCheckNonce
= lambda *args
: True
1663 self
.consumer
._checkReturnTo
= lambda unused1
, unused2
: True
1664 response
= self
.consumer
._doIdRes
(message
, self
.endpoint
, None)
1666 self
.failUnlessSuccess(response
)
1667 self
.failUnlessEqual(response
.identity_url
, "=directed_identifier")
1669 # assert that discovery attempt happens and returns good
1670 self
.failUnlessEqual(iverified
, [discovered_endpoint
])
1673 def test_idpDrivenCompleteFraud(self
):
1674 # crap with an identifier that doesn't match discovery info
1675 message
= Message
.fromPostArgs({
1676 'openid.identity': '=directed_identifier',
1677 'openid.return_to': 'x',
1678 'openid.assoc_handle': 'z',
1679 'openid.signed': 'identity,return_to',
1680 'openid.sig': GOODSIG
,
1682 def verifyDiscoveryResults(identifier
, endpoint
):
1683 raise DiscoveryFailure("PHREAK!", None)
1684 self
.consumer
._verifyDiscoveryResults
= verifyDiscoveryResults
1685 self
.consumer
._checkReturnTo
= lambda unused1
, unused2
: True
1686 self
.failUnlessRaises(DiscoveryFailure
, self
.consumer
._doIdRes
,
1687 message
, self
.endpoint
, None)
def failUnlessSuccess(self, response):
    """Fail the running test unless response.status is SUCCESS."""
    if response.status == SUCCESS:
        return
    self.fail("Non-successful response: %s" % (response,))
1696 class TestDiscoveryVerification(unittest
.TestCase
):
1700 self
.store
= GoodAssocStore()
1701 self
.consumer
= GenericConsumer(self
.store
)
1703 self
.consumer
._discover
= self
.discoveryFunc
1705 self
.identifier
= "http://idp.unittest/1337"
1706 self
.server_url
= "http://endpoint.unittest/"
1708 self
.message
= Message
.fromPostArgs({
1709 'openid.ns': OPENID2_NS
,
1710 'openid.identity': self
.identifier
,
1711 'openid.claimed_id': self
.identifier
,
1712 'openid.op_endpoint': self
.server_url
,
1715 self
.endpoint
= OpenIDServiceEndpoint()
1716 self
.endpoint
.server_url
= self
.server_url
def test_theGoodStuff(self):
    """An endpoint whose claimed_id, server_url and local_id are taken
    from the fixture is returned by _verifyDiscoveryResults."""
    matching = OpenIDServiceEndpoint()
    fixture_values = (
        ('type_uris', [OPENID_2_0_TYPE]),
        ('claimed_id', self.identifier),
        ('server_url', self.server_url),
        ('local_id', self.identifier),
    )
    for attribute, value in fixture_values:
        setattr(matching, attribute, value)

    # discoveryFunc serves whatever is in self.services.
    self.services = [matching]

    verified = self.consumer._verifyDiscoveryResults(
        self.message, matching)
    self.failUnlessEqual(verified, matching)
def test_otherServer(self):
    """An endpoint pointing at a different server_url is rejected.

    _discoverAndVerify is replaced with a stand-in that checks the
    claimed_id it receives and then raises ProtocolError(text).  The
    test expects that exact error to come out of
    _verifyDiscoveryResults when the endpoint's server_url differs
    from the one used in the fixture message.
    """
    import sys

    text = "verify failed"

    def discoverAndVerify(claimed_id, to_match_endpoints):
        self.failUnlessEqual(claimed_id, self.identifier)
        for to_match in to_match_endpoints:
            self.failUnlessEqual(claimed_id, to_match.claimed_id)
        raise ProtocolError(text)

    self.consumer._discoverAndVerify = discoverAndVerify

    # a set of things without the stuff
    endpoint = OpenIDServiceEndpoint()
    endpoint.type_uris = [OPENID_2_0_TYPE]
    endpoint.claimed_id = self.identifier
    endpoint.server_url = "http://the-MOON.unittest/"
    endpoint.local_id = self.identifier
    self.services = [endpoint]
    try:
        r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
    except ProtocolError:
        # Fetch the exception via sys.exc_info() so this clause parses
        # on every Python version.
        e = sys.exc_info()[1]
        # Should we make more ProtocolError subclasses?
        # Compare the error text.  failUnless(str(e), text) only checked
        # that str(e) was non-empty, because its second argument is the
        # failure message.
        self.failUnlessEqual(str(e), text)
    else:
        self.fail("expected ProtocolError, %r returned." % (r,))
1757 def test_foreignDelegate(self
):
1758 text
= "verify failed"
1760 def discoverAndVerify(claimed_id
, to_match_endpoints
):
1761 self
.failUnlessEqual(claimed_id
, self
.identifier
)
1762 for to_match
in to_match_endpoints
:
1763 self
.failUnlessEqual(claimed_id
, to_match
.claimed_id
)
1764 raise ProtocolError(text
)
1766 self
.consumer
._discoverAndVerify
= discoverAndVerify
1768 # a set of things with the server stuff but other delegate
1769 endpoint
= OpenIDServiceEndpoint()
1770 endpoint
.type_uris
= [OPENID_2_0_TYPE
]
1771 endpoint
.claimed_id
= self
.identifier
1772 endpoint
.server_url
= self
.server_url
1773 endpoint
.local_id
= "http://unittest/juan-carlos"
1776 r
= self
.consumer
._verifyDiscoveryResults
(self
.message
, endpoint
)
1777 except ProtocolError
, e
:
1778 self
.failUnlessEqual(str(e
), text
)
1780 self
.fail("Exepected ProtocolError, %r returned" % (r
,))
1782 def test_nothingDiscovered(self
):
1783 # a set of no things.
1785 self
.failUnlessRaises(DiscoveryFailure
,
1786 self
.consumer
._verifyDiscoveryResults
,
1787 self
.message
, self
.endpoint
)
def discoveryFunc(self, identifier):
    """Stand-in for discovery used by these tests.

    Returns a 2-tuple: the identifier exactly as passed in, and the
    current value of self.services.
    """
    canned_services = self.services
    return (identifier, canned_services)
1794 class TestCreateAssociationRequest(unittest
.TestCase
):
1796 class DummyEndpoint(object):
1797 use_compatibility
= False
1799 def compatibilityMode(self
):
1800 return self
.use_compatibility
1802 self
.endpoint
= DummyEndpoint()
1803 self
.consumer
= GenericConsumer(store
=None)
1804 self
.assoc_type
= 'HMAC-SHA1'
1806 def test_noEncryptionSendsType(self
):
1807 session_type
= 'no-encryption'
1808 session
, args
= self
.consumer
._createAssociateRequest
(
1809 self
.endpoint
, self
.assoc_type
, session_type
)
1811 self
.failUnless(isinstance(session
, PlainTextConsumerSession
))
1812 expected
= Message
.fromOpenIDArgs(
1814 'session_type':session_type
,
1816 'assoc_type':self
.assoc_type
,
1819 self
.failUnlessEqual(expected
, args
)
1821 def test_noEncryptionCompatibility(self
):
1822 self
.endpoint
.use_compatibility
= True
1823 session_type
= 'no-encryption'
1824 session
, args
= self
.consumer
._createAssociateRequest
(
1825 self
.endpoint
, self
.assoc_type
, session_type
)
1827 self
.failUnless(isinstance(session
, PlainTextConsumerSession
))
1828 self
.failUnlessEqual(Message
.fromOpenIDArgs({'mode':'associate',
1829 'assoc_type':self
.assoc_type
,
1832 def test_dhSHA1Compatibility(self
):
1833 # Set the consumer's session type to a fast session since we
1835 setConsumerSession(self
.consumer
)
1837 self
.endpoint
.use_compatibility
= True
1838 session_type
= 'DH-SHA1'
1839 session
, args
= self
.consumer
._createAssociateRequest
(
1840 self
.endpoint
, self
.assoc_type
, session_type
)
1842 self
.failUnless(isinstance(session
, DiffieHellmanSHA1ConsumerSession
))
1844 # This is a random base-64 value, so just check that it's
1846 self
.failUnless(args
.getArg(OPENID1_NS
, 'dh_consumer_public'))
1847 args
.delArg(OPENID1_NS
, 'dh_consumer_public')
1849 # OK, session_type is set here and not for no-encryption
1851 expected
= Message
.fromOpenIDArgs({'mode':'associate',
1852 'session_type':'DH-SHA1',
1853 'assoc_type':self
.assoc_type
,
1854 'dh_modulus': 'BfvStQ==',
1858 self
.failUnlessEqual(expected
, args
)
1860 # XXX: test the other types
1862 class TestDiffieHellmanResponseParameters(object):
1864 message_namespace
= None
1867 # Pre-compute DH with small prime so tests run quickly.
1868 self
.server_dh
= DiffieHellman(100389557, 2)
1869 self
.consumer_dh
= DiffieHellman(100389557, 2)
1871 # base64(btwoc(g ^ xb mod p))
1872 self
.dh_server_public
= cryptutil
.longToBase64(self
.server_dh
.public
)
1874 self
.secret
= cryptutil
.randomString(self
.session_cls
.secret_size
)
1876 self
.enc_mac_key
= oidutil
.toBase64(
1877 self
.server_dh
.xorSecret(self
.consumer_dh
.public
,
1879 self
.session_cls
.hash_func
))
1881 self
.consumer_session
= self
.session_cls(self
.consumer_dh
)
1883 self
.msg
= Message(self
.message_namespace
)
def testExtractSecret(self):
    """With both dh_server_public and enc_mac_key present, the
    session extracts the secret prepared by the fixture."""
    response_fields = (
        ('dh_server_public', self.dh_server_public),
        ('enc_mac_key', self.enc_mac_key),
    )
    for field, value in response_fields:
        self.msg.setArg(OPENID_NS, field, value)

    recovered = self.consumer_session.extractSecret(self.msg)
    self.failUnlessEqual(recovered, self.secret)
def testAbsentServerPublic(self):
    """extractSecret raises KeyError when only enc_mac_key is set
    (no dh_server_public)."""
    self.msg.setArg(OPENID_NS, 'enc_mac_key', self.enc_mac_key)

    extract = self.consumer_session.extractSecret
    self.failUnlessRaises(KeyError, extract, self.msg)
def testAbsentMacKey(self):
    """extractSecret raises KeyError when only dh_server_public is
    set (no enc_mac_key)."""
    self.msg.setArg(OPENID_NS, 'dh_server_public', self.dh_server_public)

    extract = self.consumer_session.extractSecret
    self.failUnlessRaises(KeyError, extract, self.msg)
def testInvalidBase64Public(self):
    """extractSecret raises ValueError when dh_server_public is not
    valid base64."""
    malformed_public = 'n o t b a s e 6 4.'
    self.msg.setArg(OPENID_NS, 'dh_server_public', malformed_public)
    self.msg.setArg(OPENID_NS, 'enc_mac_key', self.enc_mac_key)

    extract = self.consumer_session.extractSecret
    self.failUnlessRaises(ValueError, extract, self.msg)
def testInvalidBase64MacKey(self):
    """extractSecret raises ValueError when enc_mac_key is not valid
    base64."""
    malformed_mac_key = 'n o t base 64'
    self.msg.setArg(OPENID_NS, 'dh_server_public', self.dh_server_public)
    self.msg.setArg(OPENID_NS, 'enc_mac_key', malformed_mac_key)

    extract = self.consumer_session.extractSecret
    self.failUnlessRaises(ValueError, extract, self.msg)
class TestOpenID1SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
    """Run the Diffie-Hellman response-parameter tests with the SHA1
    consumer session and OpenID 1 messages."""

    # Namespace used when the fixture builds its Message.
    message_namespace = OPENID1_NS
    # Consumer session class under test.
    session_cls = DiffieHellmanSHA1ConsumerSession
class TestOpenID2SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
    """Run the Diffie-Hellman response-parameter tests with the SHA1
    consumer session and OpenID 2 messages."""

    # Namespace used when the fixture builds its Message.
    message_namespace = OPENID2_NS
    # Consumer session class under test.
    session_cls = DiffieHellmanSHA1ConsumerSession
1922 if cryptutil
.SHA256_AVAILABLE
:
1923 class TestOpenID2SHA256(TestDiffieHellmanResponseParameters
, unittest
.TestCase
):
1924 session_cls
= DiffieHellmanSHA256ConsumerSession
1925 message_namespace
= OPENID2_NS
1927 warnings
.warn("Not running SHA256 association session tests.")
1929 class TestNoStore(unittest
.TestCase
):
1931 self
.consumer
= GenericConsumer(None)
1933 def test_completeNoGetAssoc(self
):
1934 """_getAssociation is never called when the store is None"""
1935 def notCalled(unused
):
1936 self
.fail('This method was unexpectedly called')
1938 endpoint
= OpenIDServiceEndpoint()
1939 endpoint
.claimed_id
= 'identity_url'
1941 self
.consumer
._getAssociation
= notCalled
1942 auth_request
= self
.consumer
.begin(endpoint
)
1943 # _getAssociation was not called
1948 class NonAnonymousAuthRequest(object):
def setAnonymous(self, unused):
    """Refuse unconditionally by raising ValueError.

    The argument is ignored.
    """
    refusal = ValueError('Should trigger ProtocolError')
    raise refusal
1954 class TestConsumerAnonymous(unittest
.TestCase
):
1955 def test_beginWithoutDiscoveryAnonymousFail(self
):
1956 """Make sure that ValueError for setting an auth request
1957 anonymous gets converted to a ProtocolError
1960 consumer
= Consumer(sess
, None)
1961 def bogusBegin(unused
):
1962 return NonAnonymousAuthRequest()
1963 consumer
.consumer
.begin
= bogusBegin
1964 self
.failUnlessRaises(
1966 consumer
.beginWithoutDiscovery
, None)
1969 class TestDiscoverAndVerify(unittest
.TestCase
):
1971 self
.consumer
= GenericConsumer(None)
1972 self
.discovery_result
= None
1973 def dummyDiscover(unused_identifier
):
1974 return self
.discovery_result
1975 self
.consumer
._discover
= dummyDiscover
1976 self
.to_match
= OpenIDServiceEndpoint()
1978 def failUnlessDiscoveryFailure(self
):
1979 self
.failUnlessRaises(
1981 self
.consumer
._discoverAndVerify
,
1982 'http://claimed-id.com/',
def test_noServices(self):
    """When discovery yields an empty service list, the
    failUnlessDiscoveryFailure check must pass."""
    no_services = []
    self.discovery_result = (None, no_services)
    self.failUnlessDiscoveryFailure()
1991 def test_noMatches(self
):
1992 """If no discovered endpoint matches the values from the
1993 assertion, then we end up raising a ProtocolError
1995 self
.discovery_result
= (None, ['unused'])
1996 def raiseProtocolError(unused1
, unused2
):
1997 raise ProtocolError('unit test')
1998 self
.consumer
._verifyDiscoverySingle
= raiseProtocolError
1999 self
.failUnlessDiscoveryFailure()
2001 def test_matches(self
):
2002 """If an endpoint matches, we return it
2004 # Discovery returns a single "endpoint" object
2005 matching_endpoint
= 'matching endpoint'
2006 self
.discovery_result
= (None, [matching_endpoint
])
2008 # Make verifying discovery return True for this endpoint
2009 def returnTrue(unused1
, unused2
):
2011 self
.consumer
._verifyDiscoverySingle
= returnTrue
2013 # Since _verifyDiscoverySingle returns True, we should get the
2014 # first endpoint that we passed in as a result.
2015 result
= self
.consumer
._discoverAndVerify
(
2016 'http://claimed.id/', [self
.to_match
])
2017 self
.failUnlessEqual(matching_endpoint
, result
)
2019 from openid
.extension
import Extension
2020 class SillyExtension(Extension
):
2021 ns_uri
= 'http://silly.example.com/'
def getExtensionArgs(self):
    """Return the fixed argument mapping this extension contributes."""
    silly_args = {}
    silly_args['i_am'] = 'silly'
    return silly_args
class TestAddExtension(unittest.TestCase):
    """AuthRequest.addExtension puts an extension's arguments into the
    request message."""

    def test_SillyExtension(self):
        """After addExtension, the message's args for the extension's
        namespace URI equal what getExtensionArgs() returns."""
        extension = SillyExtension()
        auth_request = AuthRequest(OpenIDServiceEndpoint(), None)
        auth_request.addExtension(extension)

        stored_args = auth_request.message.getArgs(extension.ns_uri)
        self.failUnlessEqual(extension.getExtensionArgs(), stored_args)
2038 class TestKVPost(unittest
.TestCase
):
2040 self
.server_url
= 'http://unittest/%s' % (self
.id(),)
2043 from openid
.fetchers
import HTTPResponse
2044 response
= HTTPResponse()
2045 response
.status
= 200
2046 response
.body
= "foo:bar\nbaz:quux\n"
2047 r
= _httpResponseToMessage(response
, self
.server_url
)
2048 expected_msg
= Message
.fromOpenIDArgs({'foo':'bar','baz':'quux'})
2049 self
.failUnlessEqual(expected_msg
, r
)
2053 response
= HTTPResponse()
2054 response
.status
= 400
2055 response
.body
= "error:bonk\nerror_code:7\n"
2057 r
= _httpResponseToMessage(response
, self
.server_url
)
2058 except ServerError
, e
:
2059 self
.failUnlessEqual(e
.error_text
, 'bonk')
2060 self
.failUnlessEqual(e
.error_code
, '7')
2062 self
.fail("Expected ServerError, got return %r" % (r
,))
2066 # 500 as an example of any non-200, non-400 code.
2067 response
= HTTPResponse()
2068 response
.status
= 500
2069 response
.body
= "foo:bar\nbaz:quux\n"
2070 self
.failUnlessRaises(fetchers
.HTTPFetchingError
,
2071 _httpResponseToMessage
, response
,
2077 if __name__
== '__main__':