5 from openid
import fetchers
6 from openid
.fetchers
import HTTPResponse
7 from openid
.yadis
.discover
import DiscoveryFailure
8 from openid
.consumer
import discover
9 from openid
.yadis
import xrires
10 from openid
.yadis
.xri
import XRI
11 from urlparse
import urlsplit
12 from openid
import message
### Tests for conditions that trigger DiscoveryFailure
class SimpleMockFetcher(object):
    """Fetcher stub that replays a fixed, ordered queue of responses.

    Every call to fetch() consumes the next queued response and checks that
    the requested URL is the one that response was prepared for.
    """

    def __init__(self, responses):
        """Store a private copy of *responses*.

        The iterable is copied into a new list so that consuming the queue
        never mutates the sequence owned by the caller.
        """
        self.responses = list(responses)

    def fetch(self, url, body=None, headers=None):
        """Return the next queued response, which must belong to *url*.

        *body* and *headers* are accepted but not used.

        Raises IndexError when no responses are left, and AssertionError
        when *url* differs from the queued response's ``final_url``.
        """
        response = self.responses.pop(0)
        assert response.final_url == url
        # Hand the response back to the caller; without this the fetcher
        # would always yield None.
        return response
26 class TestDiscoveryFailure(datadriven
.DataDrivenTestCase
):
28 [HTTPResponse('http://network.error/', None)],
29 [HTTPResponse('http://not.found/', 404)],
30 [HTTPResponse('http://bad.request/', 400)],
31 [HTTPResponse('http://server.error/', 500)],
32 [HTTPResponse('http://header.found/', 200,
33 headers
={'x-xrds-location':'http://xrds.missing/'}),
34 HTTPResponse('http://xrds.missing/', 404)],
37 def __init__(self
, responses
):
38 self
.url
= responses
[0].final_url
39 datadriven
.DataDrivenTestCase
.__init
__(self
, self
.url
)
40 self
.responses
= responses
43 fetcher
= SimpleMockFetcher(self
.responses
)
44 fetchers
.setDefaultFetcher(fetcher
)
47 fetchers
.setDefaultFetcher(None)
50 expected_status
= self
.responses
[-1].status
52 discover
.discover(self
.url
)
53 except DiscoveryFailure
, why
:
54 self
.failUnlessEqual(why
.http_response
.status
, expected_status
)
56 self
.fail('Did not raise DiscoveryFailure')
59 ### Tests for raising/catching exceptions from the fetcher through the
# Python 2.5 displays a message when running this test, which is
# testing the behaviour in the presence of string exceptions,
# deprecated or not, so tell it not to complain when this particular
# string exception is raised.
# Silence the "raising a string" DeprecationWarning for this test module.
# lineno is 0 (match any line) rather than a hard-coded line number: a fixed
# number stops matching as soon as any line above the raise statement is
# added or removed. The message and module patterns keep the filter narrow.
warnings.filterwarnings('ignore', 'raising a string.*', DeprecationWarning,
                        r'^openid\.test\.test_discover$', 0)
class ErrorRaisingFetcher(object):
    """Just raise an exception when fetch is called"""

    def __init__(self, thing_to_raise):
        """Remember *thing_to_raise*, the object that fetch() will raise."""
        self.thing_to_raise = thing_to_raise

    def fetch(self, url, body=None, headers=None):
        """Raise the stored object; *url*, *body* and *headers* are ignored."""
        raise self.thing_to_raise
class DidFetch(Exception):
    """Custom exception just to make sure it's not handled differently"""
82 class TestFetchException(datadriven
.DataDrivenTestCase
):
83 """Make sure exceptions get passed through discover function from
94 def __init__(self
, exc
):
95 datadriven
.DataDrivenTestCase
.__init
__(self
, repr(exc
))
99 fetcher
= ErrorRaisingFetcher(self
.exc
)
100 fetchers
.setDefaultFetcher(fetcher
, wrap_exceptions
=False)
103 fetchers
.setDefaultFetcher(None)
105 def runOneTest(self
):
107 discover
.discover('http://doesnt.matter/')
109 exc
= sys
.exc_info()[1]
112 self
.failUnless(self
.exc
is sys
.exc_info()[0])
114 self
.failUnless(self
.exc
is exc
, exc
)
116 self
.fail('Expected %r', self
.exc
)
### Tests for openid.consumer.discover.discover
121 class TestNormalization(unittest
.TestCase
):
122 def testAddingProtocol(self
):
123 f
= ErrorRaisingFetcher(RuntimeError())
124 fetchers
.setDefaultFetcher(f
, wrap_exceptions
=False)
127 discover
.discover('users.stompy.janrain.com:8000/x')
128 except DiscoveryFailure
, why
:
129 self
.fail('failed to parse url with port correctly')
133 fetchers
.setDefaultFetcher(None)
136 class DiscoveryMockFetcher(object):
139 def __init__(self
, documents
):
140 self
.documents
= documents
143 def fetch(self
, url
, body
=None, headers
=None):
144 self
.fetchlog
.append((url
, body
, headers
))
146 final_url
= self
.redirect
151 ctype
, body
= self
.documents
[url
]
159 return HTTPResponse(final_url
, status
, {'content-type': ctype
}, body
)
161 # from twisted.trial import unittest as trialtest
163 class BaseTestDiscovery(unittest
.TestCase
):
164 id_url
= "http://someuser.unittest/"
167 fetcherClass
= DiscoveryMockFetcher
169 def _checkService(self
, s
,
176 display_identifier
=None
178 self
.failUnlessEqual(server_url
, s
.server_url
)
179 if types
== ['2.0 OP']:
180 self
.failIf(claimed_id
)
181 self
.failIf(local_id
)
182 self
.failIf(s
.claimed_id
)
183 self
.failIf(s
.local_id
)
184 self
.failIf(s
.getLocalID())
185 self
.failIf(s
.compatibilityMode())
186 self
.failUnless(s
.isOPIdentifier())
187 self
.failUnlessEqual(s
.preferredNamespace(),
188 discover
.OPENID_2_0_MESSAGE_NS
)
190 self
.failUnlessEqual(claimed_id
, s
.claimed_id
)
191 self
.failUnlessEqual(local_id
, s
.getLocalID())
194 self
.failUnless(s
.used_yadis
, "Expected to use Yadis")
196 self
.failIf(s
.used_yadis
,
197 "Expected to use old-style discovery")
200 '1.1': discover
.OPENID_1_1_TYPE
,
201 '1.0': discover
.OPENID_1_0_TYPE
,
202 '2.0': discover
.OPENID_2_0_TYPE
,
203 '2.0 OP': discover
.OPENID_IDP_2_0_TYPE
,
206 type_uris
= [openid_types
[t
] for t
in types
]
207 self
.failUnlessEqual(type_uris
, s
.type_uris
)
208 self
.failUnlessEqual(canonical_id
, s
.canonicalID
)
211 self
.failUnless(s
.getDisplayIdentifier() != claimed_id
)
212 self
.failUnless(s
.getDisplayIdentifier() is not None)
213 self
.failUnlessEqual(display_identifier
, s
.getDisplayIdentifier())
214 self
.failUnlessEqual(s
.claimed_id
, s
.canonicalID
)
216 self
.failUnlessEqual(s
.display_identifier
or s
.claimed_id
, s
.getDisplayIdentifier())
219 self
.documents
= self
.documents
.copy()
220 self
.fetcher
= self
.fetcherClass(self
.documents
)
221 fetchers
.setDefaultFetcher(self
.fetcher
)
224 fetchers
.setDefaultFetcher(None)
def readDataFile(filename):
    """Return the full contents of a fixture file used by these tests.

    *filename* is resolved relative to the ``data/test_discover`` directory
    located next to this module. Errors raised while opening or reading the
    file propagate to the caller.
    """
    module_directory = os.path.dirname(os.path.abspath(__file__))
    filename = os.path.join(
        module_directory, 'data', 'test_discover', filename)
    # open() replaces the Python-2-only file() builtin. The handle is closed
    # explicitly instead of being left to the garbage collector; try/finally
    # is used rather than a with-statement so that old interpreters without
    # that syntax can still import this module.
    handle = open(filename)
    try:
        return handle.read()
    finally:
        handle.close()
232 class TestDiscovery(BaseTestDiscovery
):
233 def _discover(self
, content_type
, data
,
234 expected_services
, expected_id
=None):
235 if expected_id
is None:
236 expected_id
= self
.id_url
238 self
.documents
[self
.id_url
] = (content_type
, data
)
239 id_url
, services
= discover
.discover(self
.id_url
)
240 self
.failUnlessEqual(expected_services
, len(services
))
241 self
.failUnlessEqual(expected_id
, id_url
)
245 self
.failUnlessRaises(DiscoveryFailure
,
246 discover
.discover
, self
.id_url
+ '/404')
248 def test_noOpenID(self
):
249 services
= self
._discover
(content_type
='text/plain',
253 services
= self
._discover
(
254 content_type
='text/html',
255 data
=readDataFile('openid_no_delegate.html'),
263 server_url
="http://www.myopenid.com/server",
264 claimed_id
=self
.id_url
,
265 local_id
=self
.id_url
,
268 def test_html1(self
):
269 services
= self
._discover
(
270 content_type
='text/html',
271 data
=readDataFile('openid.html'),
279 server_url
="http://www.myopenid.com/server",
280 claimed_id
=self
.id_url
,
281 local_id
='http://smoker.myopenid.com/',
282 display_identifier
=self
.id_url
,
285 def test_html1Fragment(self
):
286 """Ensure that the Claimed Identifier does not have a fragment
287 if one is supplied in the User Input."""
288 content_type
= 'text/html'
289 data
= readDataFile('openid.html')
290 expected_services
= 1
292 self
.documents
[self
.id_url
] = (content_type
, data
)
293 expected_id
= self
.id_url
294 self
.id_url
= self
.id_url
+ '#fragment'
295 id_url
, services
= discover
.discover(self
.id_url
)
296 self
.failUnlessEqual(expected_services
, len(services
))
297 self
.failUnlessEqual(expected_id
, id_url
)
303 server_url
="http://www.myopenid.com/server",
304 claimed_id
=expected_id
,
305 local_id
='http://smoker.myopenid.com/',
306 display_identifier
=expected_id
,
309 def test_html2(self
):
310 services
= self
._discover
(
311 content_type
='text/html',
312 data
=readDataFile('openid2.html'),
320 server_url
="http://www.myopenid.com/server",
321 claimed_id
=self
.id_url
,
322 local_id
='http://smoker.myopenid.com/',
323 display_identifier
=self
.id_url
,
326 def test_html1And2(self
):
327 services
= self
._discover
(
328 content_type
='text/html',
329 data
=readDataFile('openid_1_and_2.html'),
333 for t
, s
in zip(['2.0', '1.1'], services
):
338 server_url
="http://www.myopenid.com/server",
339 claimed_id
=self
.id_url
,
340 local_id
='http://smoker.myopenid.com/',
341 display_identifier
=self
.id_url
,
344 def test_yadisEmpty(self
):
345 services
= self
._discover
(content_type
='application/xrds+xml',
346 data
=readDataFile('yadis_0entries.xml'),
349 def test_htmlEmptyYadis(self
):
350 """HTML document has discovery information, but points to an
351 empty Yadis document."""
352 # The XRDS document pointed to by "openid_and_yadis.html"
353 self
.documents
[self
.id_url
+ 'xrds'] = (
354 'application/xrds+xml', readDataFile('yadis_0entries.xml'))
356 services
= self
._discover
(content_type
='text/html',
357 data
=readDataFile('openid_and_yadis.html'),
364 server_url
="http://www.myopenid.com/server",
365 claimed_id
=self
.id_url
,
366 local_id
='http://smoker.myopenid.com/',
367 display_identifier
=self
.id_url
,
370 def test_yadis1NoDelegate(self
):
371 services
= self
._discover
(content_type
='application/xrds+xml',
372 data
=readDataFile('yadis_no_delegate.xml'),
379 server_url
="http://www.myopenid.com/server",
380 claimed_id
=self
.id_url
,
381 local_id
=self
.id_url
,
382 display_identifier
=self
.id_url
,
385 def test_yadis2NoLocalID(self
):
386 services
= self
._discover
(
387 content_type
='application/xrds+xml',
388 data
=readDataFile('openid2_xrds_no_local_id.xml'),
396 server_url
="http://www.myopenid.com/server",
397 claimed_id
=self
.id_url
,
398 local_id
=self
.id_url
,
399 display_identifier
=self
.id_url
,
402 def test_yadis2(self
):
403 services
= self
._discover
(
404 content_type
='application/xrds+xml',
405 data
=readDataFile('openid2_xrds.xml'),
413 server_url
="http://www.myopenid.com/server",
414 claimed_id
=self
.id_url
,
415 local_id
='http://smoker.myopenid.com/',
416 display_identifier
=self
.id_url
,
419 def test_yadis2OP(self
):
420 services
= self
._discover
(
421 content_type
='application/xrds+xml',
422 data
=readDataFile('yadis_idp.xml'),
430 server_url
="http://www.myopenid.com/server",
431 display_identifier
=self
.id_url
,
434 def test_yadis2OPDelegate(self
):
435 """The delegate tag isn't meaningful for OP entries."""
436 services
= self
._discover
(
437 content_type
='application/xrds+xml',
438 data
=readDataFile('yadis_idp_delegate.xml'),
446 server_url
="http://www.myopenid.com/server",
447 display_identifier
=self
.id_url
,
450 def test_yadis2BadLocalID(self
):
451 self
.failUnlessRaises(DiscoveryFailure
, self
._discover
,
452 content_type
='application/xrds+xml',
453 data
=readDataFile('yadis_2_bad_local_id.xml'),
457 def test_yadis1And2(self
):
458 services
= self
._discover
(
459 content_type
='application/xrds+xml',
460 data
=readDataFile('openid_1_and_2_xrds.xml'),
467 types
=['2.0', '1.1'],
468 server_url
="http://www.myopenid.com/server",
469 claimed_id
=self
.id_url
,
470 local_id
='http://smoker.myopenid.com/',
471 display_identifier
=self
.id_url
,
474 def test_yadis1And2BadLocalID(self
):
475 self
.failUnlessRaises(DiscoveryFailure
, self
._discover
,
476 content_type
='application/xrds+xml',
477 data
=readDataFile('openid_1_and_2_xrds_bad_delegate.xml'),
481 class MockFetcherForXRIProxy(object):
483 def __init__(self
, documents
, proxy_url
=xrires
.DEFAULT_PROXY
):
484 self
.documents
= documents
486 self
.proxy_url
= None
489 def fetch(self
, url
, body
=None, headers
=None):
490 self
.fetchlog
.append((url
, body
, headers
))
497 if not headers
and not query
:
498 raise ValueError("No headers or query; you probably didn't "
501 if xri
.startswith('/'):
505 ctype
, body
= self
.documents
[xri
]
513 return HTTPResponse(url
, status
, {'content-type': ctype
}, body
)
516 class TestXRIDiscovery(BaseTestDiscovery
):
517 fetcherClass
= MockFetcherForXRIProxy
519 documents
= {'=smoker': ('application/xrds+xml',
520 readDataFile('yadis_2entries_delegate.xml')),
521 '=smoker*bad': ('application/xrds+xml',
522 readDataFile('yadis_another_delegate.xml')) }
525 user_xri
, services
= discover
.discoverXRI('=smoker')
531 server_url
="http://www.myopenid.com/server",
532 claimed_id
=XRI("=!1000"),
533 canonical_id
=XRI("=!1000"),
534 local_id
='http://smoker.myopenid.com/',
535 display_identifier
='=smoker'
542 server_url
="http://www.livejournal.com/openid/server.bml",
543 claimed_id
=XRI("=!1000"),
544 canonical_id
=XRI("=!1000"),
545 local_id
='http://frank.livejournal.com/',
546 display_identifier
='=smoker'
549 def test_xriNoCanonicalID(self
):
550 user_xri
, services
= discover
.discoverXRI('=smoker*bad')
551 self
.failIf(services
)
553 def test_useCanonicalID(self
):
554 """When there is no delegate, the CanonicalID should be used with XRI.
556 endpoint
= discover
.OpenIDServiceEndpoint()
557 endpoint
.claimed_id
= XRI("=!1000")
558 endpoint
.canonicalID
= XRI("=!1000")
559 self
.failUnlessEqual(endpoint
.getLocalID(), XRI("=!1000"))
562 class TestXRIDiscoveryIDP(BaseTestDiscovery
):
563 fetcherClass
= MockFetcherForXRIProxy
565 documents
= {'=smoker': ('application/xrds+xml',
566 readDataFile('yadis_2entries_idp.xml')) }
569 user_xri
, services
= discover
.discoverXRI('=smoker')
570 self
.failUnless(services
, "Expected services, got zero")
571 self
.failUnlessEqual(services
[0].server_url
,
572 "http://www.livejournal.com/openid/server.bml")
575 class TestPreferredNamespace(datadriven
.DataDrivenTestCase
):
576 def __init__(self
, expected_ns
, type_uris
):
577 datadriven
.DataDrivenTestCase
.__init
__(
578 self
, 'Expecting %s from %s' % (expected_ns
, type_uris
))
579 self
.expected_ns
= expected_ns
580 self
.type_uris
= type_uris
582 def runOneTest(self
):
583 endpoint
= discover
.OpenIDServiceEndpoint()
584 endpoint
.type_uris
= self
.type_uris
585 actual_ns
= endpoint
.preferredNamespace()
586 self
.failUnlessEqual(actual_ns
, self
.expected_ns
)
589 (message
.OPENID1_NS
, []),
590 (message
.OPENID1_NS
, ['http://jyte.com/']),
591 (message
.OPENID1_NS
, [discover
.OPENID_1_0_TYPE
]),
592 (message
.OPENID1_NS
, [discover
.OPENID_1_1_TYPE
]),
593 (message
.OPENID2_NS
, [discover
.OPENID_2_0_TYPE
]),
594 (message
.OPENID2_NS
, [discover
.OPENID_IDP_2_0_TYPE
]),
595 (message
.OPENID2_NS
, [discover
.OPENID_2_0_TYPE
,
596 discover
.OPENID_1_0_TYPE
]),
597 (message
.OPENID2_NS
, [discover
.OPENID_1_0_TYPE
,
598 discover
.OPENID_2_0_TYPE
]),
601 class TestIsOPIdentifier(unittest
.TestCase
):
603 self
.endpoint
= discover
.OpenIDServiceEndpoint()
606 self
.failIf(self
.endpoint
.isOPIdentifier())
608 def test_openid1_0(self
):
609 self
.endpoint
.type_uris
= [discover
.OPENID_1_0_TYPE
]
610 self
.failIf(self
.endpoint
.isOPIdentifier())
612 def test_openid1_1(self
):
613 self
.endpoint
.type_uris
= [discover
.OPENID_1_1_TYPE
]
614 self
.failIf(self
.endpoint
.isOPIdentifier())
616 def test_openid2(self
):
617 self
.endpoint
.type_uris
= [discover
.OPENID_2_0_TYPE
]
618 self
.failIf(self
.endpoint
.isOPIdentifier())
620 def test_openid2OP(self
):
621 self
.endpoint
.type_uris
= [discover
.OPENID_IDP_2_0_TYPE
]
622 self
.failUnless(self
.endpoint
.isOPIdentifier())
624 def test_multipleMissing(self
):
625 self
.endpoint
.type_uris
= [discover
.OPENID_2_0_TYPE
,
626 discover
.OPENID_1_0_TYPE
]
627 self
.failIf(self
.endpoint
.isOPIdentifier())
629 def test_multiplePresent(self
):
630 self
.endpoint
.type_uris
= [discover
.OPENID_2_0_TYPE
,
631 discover
.OPENID_1_0_TYPE
,
632 discover
.OPENID_IDP_2_0_TYPE
]
633 self
.failUnless(self
.endpoint
.isOPIdentifier())
635 class TestFromOPEndpointURL(unittest
.TestCase
):
637 self
.op_endpoint_url
= 'http://example.com/op/endpoint'
638 self
.endpoint
= discover
.OpenIDServiceEndpoint
.fromOPEndpointURL(
639 self
.op_endpoint_url
)
641 def test_isOPEndpoint(self
):
642 self
.failUnless(self
.endpoint
.isOPIdentifier())
644 def test_noIdentifiers(self
):
645 self
.failUnlessEqual(self
.endpoint
.getLocalID(), None)
646 self
.failUnlessEqual(self
.endpoint
.claimed_id
, None)
648 def test_compatibility(self
):
649 self
.failIf(self
.endpoint
.compatibilityMode())
651 def test_canonicalID(self
):
652 self
.failUnlessEqual(self
.endpoint
.canonicalID
, None)
654 def test_serverURL(self
):
655 self
.failUnlessEqual(self
.endpoint
.server_url
, self
.op_endpoint_url
)
657 class TestDiscoverFunction(unittest
.TestCase
):
659 self
._old
_discoverURI
= discover
.discoverURI
660 self
._old
_discoverXRI
= discover
.discoverXRI
662 discover
.discoverXRI
= self
.discoverXRI
663 discover
.discoverURI
= self
.discoverURI
666 discover
.discoverURI
= self
._old
_discoverURI
667 discover
.discoverXRI
= self
._old
_discoverXRI
669 def discoverXRI(self
, identifier
):
672 def discoverURI(self
, identifier
):
676 self
.failUnlessEqual('URI', discover
.discover('http://woo!'))
678 def test_uriForBogus(self
):
679 self
.failUnlessEqual('URI', discover
.discover('not a URL or XRI'))
682 self
.failUnlessEqual('XRI', discover
.discover('xri://=something'))
684 def test_xriChar(self
):
685 self
.failUnlessEqual('XRI', discover
.discover('=something'))
687 class TestEndpointSupportsType(unittest
.TestCase
):
689 self
.endpoint
= discover
.OpenIDServiceEndpoint()
691 def failUnlessSupportsOnly(self
, *types
):
694 discover
.OPENID_1_1_TYPE
,
695 discover
.OPENID_1_0_TYPE
,
696 discover
.OPENID_2_0_TYPE
,
697 discover
.OPENID_IDP_2_0_TYPE
,
700 self
.failUnless(self
.endpoint
.supportsType(t
),
701 "Must support %r" % (t
,))
703 self
.failIf(self
.endpoint
.supportsType(t
),
704 "Shouldn't support %r" % (t
,))
706 def test_supportsNothing(self
):
707 self
.failUnlessSupportsOnly()
709 def test_openid2(self
):
710 self
.endpoint
.type_uris
= [discover
.OPENID_2_0_TYPE
]
711 self
.failUnlessSupportsOnly(discover
.OPENID_2_0_TYPE
)
713 def test_openid2provider(self
):
714 self
.endpoint
.type_uris
= [discover
.OPENID_IDP_2_0_TYPE
]
715 self
.failUnlessSupportsOnly(discover
.OPENID_IDP_2_0_TYPE
,
716 discover
.OPENID_2_0_TYPE
)
718 def test_openid1_0(self
):
719 self
.endpoint
.type_uris
= [discover
.OPENID_1_0_TYPE
]
720 self
.failUnlessSupportsOnly(discover
.OPENID_1_0_TYPE
)
722 def test_openid1_1(self
):
723 self
.endpoint
.type_uris
= [discover
.OPENID_1_1_TYPE
]
724 self
.failUnlessSupportsOnly(discover
.OPENID_1_1_TYPE
)
726 def test_multiple(self
):
727 self
.endpoint
.type_uris
= [discover
.OPENID_1_1_TYPE
,
728 discover
.OPENID_2_0_TYPE
]
729 self
.failUnlessSupportsOnly(discover
.OPENID_1_1_TYPE
,
730 discover
.OPENID_2_0_TYPE
)
732 def test_multipleWithProvider(self
):
733 self
.endpoint
.type_uris
= [discover
.OPENID_1_1_TYPE
,
734 discover
.OPENID_2_0_TYPE
,
735 discover
.OPENID_IDP_2_0_TYPE
]
736 self
.failUnlessSupportsOnly(discover
.OPENID_1_1_TYPE
,
737 discover
.OPENID_2_0_TYPE
,
738 discover
.OPENID_IDP_2_0_TYPE
,
class TestEndpointDisplayIdentifier(unittest.TestCase):
    # Exercises OpenIDServiceEndpoint.getDisplayIdentifier().

    def test_strip_fragment(self):
        # Only claimed_id is set, and it carries a '#123' fragment; the
        # display identifier is expected to be the same URL without it.
        endpoint = discover.OpenIDServiceEndpoint()
        endpoint.claimed_id = 'http://recycled.invalid/#123'
        self.failUnlessEqual('http://recycled.invalid/',
                             endpoint.getDisplayIdentifier())
750 return datadriven
.loadTests(__name__
)
752 if __name__
== '__main__':
753 suite
= pyUnitTests()
754 runner
= unittest
.TextTestRunner()