Getting the file size for all dict files to be downloaded; it comes to about 400 MB.
[worddb.git] / libs / openid / test / test_server.py
blob0325f6cd7ef6955bfccf018bd31171e9dfa5af42
1 """Tests for openid.server.
2 """
3 from openid.server import server
4 from openid import association, cryptutil, oidutil
5 from openid.message import Message, OPENID_NS, OPENID2_NS, OPENID1_NS, \
6 IDENTIFIER_SELECT, no_default, OPENID1_URL_LIMIT
7 from openid.store import memstore
8 import cgi
10 import unittest
11 import warnings
13 from urlparse import urlparse
15 # In general, if you edit or add tests here, try to move in the direction
16 # of testing smaller units. For testing the external interfaces, we'll be
17 # developing an implementation-agnostic testing suite.
19 # for more, see /etc/ssh/moduli
21 ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
22 ALT_GEN = 5
class CatchLogs(object):
    """Capture messages passed to ``oidutil.log`` between setUp and tearDown.

    ``setUp`` replaces the module-level ``oidutil.log`` hook with
    ``gotLogMessage`` and ``tearDown`` restores the hook that was there
    before.  Captured messages accumulate in ``self.messages``.
    """

    def setUp(self):
        """Start with an empty message list and install the capturing hook."""
        self.messages = []
        # The right-hand side is evaluated first, so the previous hook is
        # saved before it gets replaced.
        self.old_logger, oidutil.log = oidutil.log, self.gotLogMessage

    def gotLogMessage(self, message):
        """Record one log message."""
        self.messages.append(message)

    def tearDown(self):
        """Put back the logger that was active before ``setUp`` ran."""
        oidutil.log = self.old_logger
class TestProtocolError(unittest.TestCase):
    """Checks how ``server.ProtocolError`` selects and renders its encoding."""

    def _queryArgs(self, error):
        """Return the query arguments of ``error.encodeToURL()``.

        The URL is split at the first ``?`` and the remainder is parsed with
        ``cgi.parse_qs``, so every value in the result is a list.
        """
        query = error.encodeToURL().split('?', 1)[1]
        return cgi.parse_qs(query)

    def test_browserWithReturnTo(self):
        """An error for a request carrying return_to can be encoded to a URL."""
        return_to = "http://rp.unittest/consumer"
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
        args = Message.fromPostArgs({
            'openid.mode': 'monkeydance',
            'openid.identity': 'http://wagu.unittest/',
            'openid.return_to': return_to,
            })
        e = server.ProtocolError(args, "plucky")
        self.assertTrue(e.hasReturnTo())
        expected_args = {
            'openid.mode': ['error'],
            'openid.error': ['plucky'],
            }

        self.assertEqual(self._queryArgs(e), expected_args)

    def test_browserWithReturnTo_OpenID2_GET(self):
        """Same as above with an explicit OpenID 2 namespace in the request."""
        return_to = "http://rp.unittest/consumer"
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
        args = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.mode': 'monkeydance',
            'openid.identity': 'http://wagu.unittest/',
            'openid.claimed_id': 'http://wagu.unittest/',
            'openid.return_to': return_to,
            })
        e = server.ProtocolError(args, "plucky")
        self.assertTrue(e.hasReturnTo())
        expected_args = {
            'openid.ns': [OPENID2_NS],
            'openid.mode': ['error'],
            'openid.error': ['plucky'],
            }

        self.assertEqual(self._queryArgs(e), expected_args)

    def test_browserWithReturnTo_OpenID2_POST(self):
        """An OpenID 2 error with an over-long return_to uses the HTML form."""
        # Pad return_to so it is longer than OPENID1_URL_LIMIT.
        return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT)
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
        args = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.mode': 'monkeydance',
            'openid.identity': 'http://wagu.unittest/',
            'openid.claimed_id': 'http://wagu.unittest/',
            'openid.return_to': return_to,
            })
        e = server.ProtocolError(args, "plucky")
        self.assertTrue(e.hasReturnTo())

        self.assertEqual(e.whichEncoding(), server.ENCODE_HTML_FORM)
        self.assertEqual(
            e.toFormMarkup(),
            e.toMessage().toFormMarkup(args.getArg(OPENID_NS, 'return_to')))

    def test_browserWithReturnTo_OpenID1_exceeds_limit(self):
        """Without openid.ns, an over-long return_to still gets URL encoding."""
        return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT)
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
        args = Message.fromPostArgs({
            'openid.mode': 'monkeydance',
            'openid.identity': 'http://wagu.unittest/',
            'openid.return_to': return_to,
            })
        e = server.ProtocolError(args, "plucky")
        self.assertTrue(e.hasReturnTo())
        expected_args = {
            'openid.mode': ['error'],
            'openid.error': ['plucky'],
            }

        self.assertEqual(e.whichEncoding(), server.ENCODE_URL)

        self.assertEqual(self._queryArgs(e), expected_args)

    def test_noReturnTo(self):
        """An error for a request without return_to is rendered as KV form."""
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
        args = Message.fromPostArgs({
            'openid.mode': 'zebradance',
            'openid.identity': 'http://wagu.unittest/',
            })
        e = server.ProtocolError(args, "waffles")
        self.assertFalse(e.hasReturnTo())
        expected = "error:waffles\nmode:error\n"
        self.assertEqual(e.encodeToKVForm(), expected)

    def test_noMessage(self):
        """A ProtocolError built without a message has no encoding at all."""
        e = server.ProtocolError(None, "no moar pancakes")
        self.assertFalse(e.hasReturnTo())
        self.assertEqual(e.whichEncoding(), None)
class TestDecode(unittest.TestCase):
    """Tests for decoding raw query arguments into server request objects."""

    def setUp(self):
        """Build a server over an in-memory store and bind its decoder."""
        self.claimed_id = 'http://de.legating.de.coder.unittest/'
        self.id_url = "http://decoder.am.unittest/"
        self.rt_url = "http://rp.unittest/foobot/?qux=zam"
        self.tr_url = "http://rp.unittest/"
        self.assoc_handle = "{assoc}{handle}"
        self.op_endpoint = 'http://endpoint.unittest/encode'
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, self.op_endpoint)
        # The original also assigned self.server.decoder.decode first; that
        # value was overwritten immediately, so only the final binding is kept.
        self.decode = server.Decoder(self.server).decode

    def test_none(self):
        """An empty argument dict decodes to None."""
        args = {}
        r = self.decode(args)
        self.assertEqual(r, None)

    def test_irrelevant(self):
        """Arguments with no OpenID keys raise ProtocolError."""
        args = {
            'pony': 'spotted',
            'sreg.mutant_power': 'decaffinator',
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_bad(self):
        """An unknown openid.mode raises ProtocolError."""
        args = {
            'openid.mode': 'twos-compliment',
            'openid.pants': 'zippered',
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_dictOfLists(self):
        """A list-valued argument raises a TypeError that mentions 'values'."""
        args = {
            'openid.mode': ['checkid_setup'],
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.trust_root': self.tr_url,
            }
        try:
            result = self.decode(args)
        except TypeError as err:
            self.assertTrue('values' in str(err), err)
        else:
            self.fail("Expected TypeError, but got result %s" % (result,))

    def test_checkidImmediate(self):
        """checkid_immediate decodes to an immediate CheckIDRequest."""
        args = {
            'openid.mode': 'checkid_immediate',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.trust_root': self.tr_url,
            # should be ignored
            'openid.some.extension': 'junk',
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckIDRequest))
        self.assertEqual(r.mode, "checkid_immediate")
        self.assertEqual(r.immediate, True)
        self.assertEqual(r.identity, self.id_url)
        self.assertEqual(r.trust_root, self.tr_url)
        self.assertEqual(r.return_to, self.rt_url)
        self.assertEqual(r.assoc_handle, self.assoc_handle)

    def test_checkidSetup(self):
        """checkid_setup decodes to a non-immediate CheckIDRequest."""
        args = {
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.trust_root': self.tr_url,
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckIDRequest))
        self.assertEqual(r.mode, "checkid_setup")
        self.assertEqual(r.immediate, False)
        self.assertEqual(r.identity, self.id_url)
        self.assertEqual(r.trust_root, self.tr_url)
        self.assertEqual(r.return_to, self.rt_url)

    def test_checkidSetupOpenID2(self):
        """OpenID 2 checkid_setup exposes claimed_id and maps realm to trust_root."""
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.claimed_id': self.claimed_id,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.realm': self.tr_url,
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckIDRequest))
        self.assertEqual(r.mode, "checkid_setup")
        self.assertEqual(r.immediate, False)
        self.assertEqual(r.identity, self.id_url)
        self.assertEqual(r.claimed_id, self.claimed_id)
        self.assertEqual(r.trust_root, self.tr_url)
        self.assertEqual(r.return_to, self.rt_url)

    def test_checkidSetupNoClaimedIDOpenID2(self):
        """OpenID 2 identity without claimed_id raises ProtocolError."""
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.realm': self.tr_url,
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_checkidSetupNoIdentityOpenID2(self):
        """OpenID 2 request with neither identity nor claimed_id decodes."""
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'checkid_setup',
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.realm': self.tr_url,
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckIDRequest))
        self.assertEqual(r.mode, "checkid_setup")
        self.assertEqual(r.immediate, False)
        self.assertEqual(r.identity, None)
        self.assertEqual(r.trust_root, self.tr_url)
        self.assertEqual(r.return_to, self.rt_url)

    def test_checkidSetupNoReturnOpenID1(self):
        """Make sure an OpenID 1 request cannot be decoded if it lacks
        a return_to.
        """
        args = {
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.trust_root': self.tr_url,
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_checkidSetupNoReturnOpenID2(self):
        """Make sure an OpenID 2 request with no return_to can be
        decoded, and make sure a response to such a request raises
        NoReturnToError.
        """
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.claimed_id': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.realm': self.tr_url,
            }
        self.assertTrue(isinstance(self.decode(args), server.CheckIDRequest))

        req = self.decode(args)
        self.assertRaises(server.NoReturnToError, req.answer, False)
        self.assertRaises(server.NoReturnToError, req.encodeToURL, 'bogus')
        self.assertRaises(server.NoReturnToError, req.getCancelURL)

    def test_checkidSetupRealmRequiredOpenID2(self):
        """Make sure that an OpenID 2 request which lacks return_to
        cannot be decoded if it lacks a realm.  Spec: This value
        (openid.realm) MUST be sent if openid.return_to is omitted.
        """
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_checkidSetupBadReturn(self):
        """A return_to that is not a URL raises ProtocolError with a message."""
        args = {
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': 'not a url',
            }
        try:
            result = self.decode(args)
        except server.ProtocolError as err:
            self.assertTrue(err.openid_message)
        else:
            self.fail("Expected ProtocolError, instead returned with %s" %
                      (result,))

    def test_checkidSetupUntrustedReturn(self):
        """A return_to outside the trust_root raises UntrustedReturnURL."""
        args = {
            'openid.mode': 'checkid_setup',
            'openid.identity': self.id_url,
            'openid.assoc_handle': self.assoc_handle,
            'openid.return_to': self.rt_url,
            'openid.trust_root': 'http://not-the-return-place.unittest/',
            }
        try:
            result = self.decode(args)
        except server.UntrustedReturnURL as err:
            self.assertTrue(err.openid_message)
        else:
            self.fail("Expected UntrustedReturnURL, instead returned with %s" %
                      (result,))

    def test_checkAuth(self):
        """check_authentication decodes to a CheckAuthRequest carrying the sig."""
        args = {
            'openid.mode': 'check_authentication',
            'openid.assoc_handle': '{dumb}{handle}',
            'openid.sig': 'sigblob',
            'openid.signed': 'identity,return_to,response_nonce,mode',
            'openid.identity': 'signedval1',
            'openid.return_to': 'signedval2',
            'openid.response_nonce': 'signedval3',
            'openid.baz': 'unsigned',
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckAuthRequest))
        self.assertEqual(r.mode, 'check_authentication')
        self.assertEqual(r.sig, 'sigblob')

    def test_checkAuthMissingSignature(self):
        """check_authentication without openid.sig raises ProtocolError."""
        args = {
            'openid.mode': 'check_authentication',
            'openid.assoc_handle': '{dumb}{handle}',
            'openid.signed': 'foo,bar,mode',
            'openid.foo': 'signedval1',
            'openid.bar': 'signedval2',
            'openid.baz': 'unsigned',
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_checkAuthAndInvalidate(self):
        """invalidate_handle is carried through onto the CheckAuthRequest."""
        args = {
            'openid.mode': 'check_authentication',
            'openid.assoc_handle': '{dumb}{handle}',
            'openid.invalidate_handle': '[[SMART_handle]]',
            'openid.sig': 'sigblob',
            'openid.signed': 'identity,return_to,response_nonce,mode',
            'openid.identity': 'signedval1',
            'openid.return_to': 'signedval2',
            'openid.response_nonce': 'signedval3',
            'openid.baz': 'unsigned',
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.CheckAuthRequest))
        self.assertEqual(r.invalidate_handle, '[[SMART_handle]]')

    def test_associateDH(self):
        """A DH-SHA1 associate request decodes with a consumer public key."""
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "Rzup9265tw==",
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.AssociateRequest))
        self.assertEqual(r.mode, "associate")
        self.assertEqual(r.session.session_type, "DH-SHA1")
        self.assertEqual(r.assoc_type, "HMAC-SHA1")
        self.assertTrue(r.session.consumer_pubkey)

    def test_associateDHMissingKey(self):
        """Trying DH assoc w/o public key"""
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            }
        # Using DH-SHA1 without supplying dh_consumer_public is an error.
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_associateDHpubKeyNotB64(self):
        """A dh_consumer_public value of 'donkeydonkeydonkey' is rejected."""
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "donkeydonkeydonkey",
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_associateDHModGen(self):
        """Non-default dh_modulus and dh_gen end up on the decoded session."""
        # test dh with non-default but valid values for dh_modulus and dh_gen
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "Rzup9265tw==",
            'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
            'openid.dh_gen': cryptutil.longToBase64(ALT_GEN),
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.AssociateRequest))
        self.assertEqual(r.mode, "associate")
        self.assertEqual(r.session.session_type, "DH-SHA1")
        self.assertEqual(r.assoc_type, "HMAC-SHA1")
        self.assertEqual(r.session.dh.modulus, ALT_MODULUS)
        self.assertEqual(r.session.dh.generator, ALT_GEN)
        self.assertTrue(r.session.consumer_pubkey)

    def test_associateDHCorruptModGen(self):
        """Garbage dh_modulus and dh_gen values raise ProtocolError."""
        # 'pizza' and 'gnocchi' stand in for corrupt modulus/generator values.
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "Rzup9265tw==",
            'openid.dh_modulus': 'pizza',
            'openid.dh_gen': 'gnocchi',
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_associateDHMissingModGen(self):
        """dh_modulus supplied without dh_gen raises ProtocolError."""
        # NOTE(review): the modulus here is also the garbage value 'pizza',
        # so this does not isolate the missing-generator case on its own.
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "Rzup9265tw==",
            'openid.dh_modulus': 'pizza',
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

#     def test_associateDHInvalidModGen(self):
#         # test dh with properly encoded values that are not a valid
#         #   modulus/generator combination.
#         args = {
#             'openid.mode': 'associate',
#             'openid.session_type': 'DH-SHA1',
#             'openid.dh_consumer_public': "Rzup9265tw==",
#             'openid.dh_modulus': cryptutil.longToBase64(9),
#             'openid.dh_gen': cryptutil.longToBase64(27) ,
#             }
#         self.failUnlessRaises(server.ProtocolError, self.decode, args)
#     test_associateDHInvalidModGen.todo = "low-priority feature"

    def test_associateWeirdSession(self):
        """An unknown session_type raises ProtocolError."""
        args = {
            'openid.mode': 'associate',
            'openid.session_type': 'FLCL6',
            'openid.dh_consumer_public': "YQ==\n",
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_associatePlain(self):
        """A bare associate request gets no-encryption and HMAC-SHA1."""
        args = {
            'openid.mode': 'associate',
            }
        r = self.decode(args)
        self.assertTrue(isinstance(r, server.AssociateRequest))
        self.assertEqual(r.mode, "associate")
        self.assertEqual(r.session.session_type, "no-encryption")
        self.assertEqual(r.assoc_type, "HMAC-SHA1")

    def test_nomode(self):
        """OpenID arguments without openid.mode raise ProtocolError."""
        args = {
            'openid.session_type': 'DH-SHA1',
            'openid.dh_consumer_public': "my public keeey",
            }
        self.assertRaises(server.ProtocolError, self.decode, args)

    def test_invalidns(self):
        """A bogus openid.ns raises a ProtocolError that names the bad value."""
        args = {'openid.ns': 'Tuesday',
                'openid.mode': 'associate'}

        try:
            r = self.decode(args)
        except server.ProtocolError as err:
            # Assert that the ProtocolError does have a Message attached
            # to it, even though the request wasn't a well-formed Message.
            self.assertTrue(err.openid_message)
            # The error message contains the bad openid.ns.
            self.assertTrue('Tuesday' in str(err), str(err))
        else:
            self.fail("Expected ProtocolError but returned with %r" % (r,))
class TestEncode(unittest.TestCase):
    """Tests for ``server.Encoder`` turning responses and errors into web responses."""

    def setUp(self):
        """Create an encoder plus a server backed by an in-memory store."""
        self.encoder = server.Encoder()
        self.encode = self.encoder.encode
        self.op_endpoint = 'http://endpoint.unittest/encode'
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, self.op_endpoint)

    def _checkidRequest(self):
        """Return the CheckIDRequest fixture shared by the tests below.

        The request's ``message`` is set to an empty OpenID 2 Message.
        """
        request = server.CheckIDRequest(
            identity='http://bombom.unittest/',
            trust_root='http://burr.unittest/',
            return_to='http://burr.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
            )
        request.message = Message(OPENID2_NS)
        return request

    def _response(self, request, openid_args):
        """Return an OpenIDResponse for ``request`` whose fields are ``openid_args``.

        ``openid_args`` uses un-prefixed keys; it is passed to
        ``Message.fromOpenIDArgs``.
        """
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs(openid_args)
        return response

    def test_id_res_OpenID2_GET(self):
        """
        Check that when an OpenID 2 response does not exceed the
        OpenID 1 message size, a GET response (i.e., redirect) is
        issued.
        """
        request = self._checkidRequest()
        response = self._response(request, {
            'ns': OPENID2_NS,
            'mode': 'id_res',
            'identity': request.identity,
            'claimed_id': request.identity,
            'return_to': request.return_to,
            })

        self.assertFalse(response.renderAsForm())
        self.assertEqual(response.whichEncoding(), server.ENCODE_URL)
        webresponse = self.encode(response)
        self.assertTrue('location' in webresponse.headers)

    def test_id_res_OpenID2_POST(self):
        """
        Check that when an OpenID 2 response exceeds the OpenID 1
        message size, a POST response (i.e., an HTML form) is
        returned.
        """
        request = self._checkidRequest()
        response = self._response(request, {
            'ns': OPENID2_NS,
            'mode': 'id_res',
            'identity': request.identity,
            'claimed_id': request.identity,
            'return_to': 'x' * OPENID1_URL_LIMIT,
            })

        self.assertTrue(response.renderAsForm())
        self.assertTrue(len(response.encodeToURL()) > OPENID1_URL_LIMIT)
        self.assertEqual(response.whichEncoding(), server.ENCODE_HTML_FORM)
        webresponse = self.encode(response)
        self.assertEqual(webresponse.body, response.toFormMarkup())

    def test_toFormMarkup(self):
        """Attributes passed to toFormMarkup show up in the generated markup."""
        request = self._checkidRequest()
        response = self._response(request, {
            'ns': OPENID2_NS,
            'mode': 'id_res',
            'identity': request.identity,
            'claimed_id': request.identity,
            'return_to': 'x' * OPENID1_URL_LIMIT,
            })

        form_markup = response.toFormMarkup({'foo': 'bar'})
        self.assertTrue(' foo="bar"' in form_markup)

    def test_toHTML(self):
        """toHTML yields a full page with an auto-submitting form."""
        request = self._checkidRequest()
        response = self._response(request, {
            'ns': OPENID2_NS,
            'mode': 'id_res',
            'identity': request.identity,
            'claimed_id': request.identity,
            'return_to': 'x' * OPENID1_URL_LIMIT,
            })
        html = response.toHTML()
        self.assertTrue('<html>' in html)
        self.assertTrue('</html>' in html)
        self.assertTrue('<body onload=' in html)
        self.assertTrue('<form' in html)
        self.assertTrue('http://bombom.unittest/' in html)

    def test_id_res_OpenID1_exceeds_limit(self):
        """
        Check that when an OpenID 1 response exceeds the OpenID 1
        message size, a GET response is issued.  Technically, this
        shouldn't be permitted by the library, but this test is in
        place to preserve the status quo for OpenID 1.
        """
        request = self._checkidRequest()
        # No 'ns' in the fields: this is what makes the response OpenID 1.
        response = self._response(request, {
            'mode': 'id_res',
            'identity': request.identity,
            'return_to': 'x' * OPENID1_URL_LIMIT,
            })

        self.assertFalse(response.renderAsForm())
        self.assertTrue(len(response.encodeToURL()) > OPENID1_URL_LIMIT)
        self.assertEqual(response.whichEncoding(), server.ENCODE_URL)
        webresponse = self.encode(response)
        self.assertEqual(webresponse.headers['location'],
                         response.encodeToURL())

    def test_id_res(self):
        """id_res is encoded as a redirect to return_to carrying the fields."""
        request = self._checkidRequest()
        response = self._response(request, {
            'mode': 'id_res',
            'identity': request.identity,
            'return_to': request.return_to,
            })
        webresponse = self.encode(response)
        self.assertEqual(webresponse.code, server.HTTP_REDIRECT)
        self.assertTrue('location' in webresponse.headers)

        location = webresponse.headers['location']
        self.assertTrue(location.startswith(request.return_to),
                        "%s does not start with %s" % (location,
                                                       request.return_to))
        # argh.
        q2 = dict(cgi.parse_qsl(urlparse(location)[4]))
        expected = response.fields.toPostArgs()
        self.assertEqual(q2, expected)

    def test_cancel(self):
        """A cancel response is encoded as a redirect."""
        request = self._checkidRequest()
        response = self._response(request, {
            'mode': 'cancel',
            })
        webresponse = self.encode(response)
        self.assertEqual(webresponse.code, server.HTTP_REDIRECT)
        self.assertTrue('location' in webresponse.headers)

    def test_cancelToForm(self):
        """A cancel response can also be rendered as form markup."""
        request = self._checkidRequest()
        response = self._response(request, {
            'mode': 'cancel',
            })
        form = response.toFormMarkup()
        self.assertTrue(form)

    def test_assocReply(self):
        """An associate reply is a 200 with a KV-form body and no headers."""
        msg = Message(OPENID2_NS)
        msg.setArg(OPENID2_NS, 'session_type', 'no-encryption')
        request = server.AssociateRequest.fromMessage(msg)
        response = server.OpenIDResponse(request)
        response.fields = Message.fromPostArgs(
            {'openid.assoc_handle': "every-zig"})
        webresponse = self.encode(response)
        body = "assoc_handle:every-zig\n"
        self.assertEqual(webresponse.code, server.HTTP_OK)
        self.assertEqual(webresponse.headers, {})
        self.assertEqual(webresponse.body, body)

    def test_checkauthReply(self):
        """A check_authentication reply is a 200 with a KV-form body."""
        # NOTE(review): the line that closed this call was lost when the file
        # was extracted; an empty list is assumed as the third argument --
        # confirm against the upstream test before relying on it.
        request = server.CheckAuthRequest('a_sock_monkey',
                                          'siggggg',
                                          [])
        response = self._response(request, {
            'is_valid': 'true',
            'invalidate_handle': 'xXxX:xXXx'
            })
        body = "invalidate_handle:xXxX:xXXx\nis_valid:true\n"
        webresponse = self.encode(response)
        self.assertEqual(webresponse.code, server.HTTP_OK)
        self.assertEqual(webresponse.headers, {})
        self.assertEqual(webresponse.body, body)

    def test_unencodableError(self):
        """A ProtocolError for a message without a mode raises EncodingError."""
        args = Message.fromPostArgs({
            'openid.identity': 'http://limu.unittest/',
            })
        e = server.ProtocolError(args, "wet paint")
        self.assertRaises(server.EncodingError, self.encode, e)

    def test_encodableError(self):
        """A ProtocolError for an associate message becomes a KV-form error."""
        args = Message.fromPostArgs({
            'openid.mode': 'associate',
            'openid.identity': 'http://limu.unittest/',
            })
        body = "error:snoot\nmode:error\n"
        webresponse = self.encode(server.ProtocolError(args, "snoot"))
        self.assertEqual(webresponse.code, server.HTTP_ERROR)
        self.assertEqual(webresponse.headers, {})
        self.assertEqual(webresponse.body, body)
class TestSigningEncode(unittest.TestCase):
    """Tests for ``server.SigningEncoder``: which responses end up signed."""

    def setUp(self):
        """Build an id_res response and a SigningEncoder over a memory store."""
        self._dumb_key = server.Signatory._dumb_key
        self._normal_key = server.Signatory._normal_key
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, "http://signing.unittest/enc")
        self.request = self._checkidRequest()
        self.response = server.OpenIDResponse(self.request)
        self.response.fields = Message.fromOpenIDArgs({
            'mode': 'id_res',
            'identity': self.request.identity,
            'return_to': self.request.return_to,
            })
        self.signatory = server.Signatory(self.store)
        self.encoder = server.SigningEncoder(self.signatory)
        self.encode = self.encoder.encode

    def _checkidRequest(self):
        """Return a fresh CheckIDRequest with an empty OpenID 2 message."""
        request = server.CheckIDRequest(
            identity='http://bombom.unittest/',
            trust_root='http://burr.unittest/',
            return_to='http://burr.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
            )
        request.message = Message(OPENID2_NS)
        return request

    def _redirectQuery(self, webresponse):
        """Assert ``webresponse`` is a redirect and return its parsed query."""
        self.assertEqual(webresponse.code, server.HTTP_REDIRECT)
        self.assertTrue('location' in webresponse.headers)

        location = webresponse.headers['location']
        return cgi.parse_qs(urlparse(location)[4])

    def _assertSigned(self, query):
        """Assert the signature-related arguments are present in ``query``."""
        self.assertTrue('openid.sig' in query)
        self.assertTrue('openid.assoc_handle' in query)
        self.assertTrue('openid.signed' in query)

    def test_idres(self):
        """id_res with a stored association handle is signed."""
        assoc_handle = '{bicycle}{shed}'
        self.store.storeAssociation(
            self._normal_key,
            association.Association.fromExpiresIn(60, assoc_handle,
                                                  'sekrit', 'HMAC-SHA1'))
        self.request.assoc_handle = assoc_handle
        webresponse = self.encode(self.response)
        self._assertSigned(self._redirectQuery(webresponse))

    def test_idresDumb(self):
        """id_res without an association handle set on the request is still signed."""
        webresponse = self.encode(self.response)
        self._assertSigned(self._redirectQuery(webresponse))

    def test_forgotStore(self):
        """Encoding with no signatory configured raises ValueError."""
        self.encoder.signatory = None
        self.assertRaises(ValueError, self.encode, self.response)

    def test_cancel(self):
        """A cancel response is redirected but not signed."""
        request = self._checkidRequest()
        response = server.OpenIDResponse(request)
        response.fields.setArg(OPENID_NS, 'mode', 'cancel')
        webresponse = self.encode(response)
        query = self._redirectQuery(webresponse)
        self.assertFalse('openid.sig' in query, response.fields.toPostArgs())

    def test_assocReply(self):
        """An associate reply is a 200 with a KV-form body and no headers."""
        msg = Message(OPENID2_NS)
        msg.setArg(OPENID2_NS, 'session_type', 'no-encryption')
        request = server.AssociateRequest.fromMessage(msg)
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({'assoc_handle': "every-zig"})
        webresponse = self.encode(response)
        body = "assoc_handle:every-zig\n"
        self.assertEqual(webresponse.code, server.HTTP_OK)
        self.assertEqual(webresponse.headers, {})
        self.assertEqual(webresponse.body, body)

    def test_alreadySigned(self):
        """A response that already has a sig field raises AlreadySigned."""
        self.response.fields.setArg(OPENID_NS, 'sig', 'priorSig==')
        self.assertRaises(server.AlreadySigned, self.encode, self.response)
862 class TestCheckID(unittest.TestCase):
def setUp(self):
    """Create a server over a memory store and a default CheckIDRequest."""
    self.op_endpoint = 'http://endpoint.unittest/'
    self.store = memstore.MemoryStore()
    self.server = server.Server(self.store, self.op_endpoint)
    # Keyword arguments gathered first, then expanded into the constructor.
    request_kwargs = dict(
        identity='http://bambam.unittest/',
        trust_root='http://bar.unittest/',
        return_to='http://bar.unittest/999',
        immediate=False,
        op_endpoint=self.server.op_endpoint,
    )
    self.request = server.CheckIDRequest(**request_kwargs)
    self.request.message = Message(OPENID2_NS)
def test_trustRootInvalid(self):
    """trustRootValid() is false when return_to is not under trust_root."""
    request = self.request
    request.trust_root = "http://foo.unittest/17"
    request.return_to = "http://foo.unittest/39"
    self.assertFalse(request.trustRootValid())
def test_trustRootValid(self):
    """trustRootValid() is true when return_to falls under trust_root."""
    request = self.request
    request.trust_root = "http://foo.unittest/"
    request.return_to = "http://foo.unittest/39"
    self.assertTrue(request.trustRootValid())
def test_malformedTrustRoot(self):
    """A malformed trust_root raises MalformedTrustRoot carrying the message.

    The request's ``message`` is replaced by a sentinel object so the test
    can check that the very same object is attached to the exception as
    ``openid_message``.
    """
    self.request.trust_root = "invalid://trust*root/"
    self.request.return_to = "http://foo.unittest/39"
    sentinel = object()
    self.request.message = sentinel
    try:
        result = self.request.trustRootValid()
    except server.MalformedTrustRoot as why:
        self.assertTrue(sentinel is why.openid_message)
    else:
        self.fail('Expected MalformedTrustRoot exception. Got %r'
                  % (result,))
def test_trustRootValidNoReturnTo(self):
    """trustRootValid() is true for a request built with return_to=None."""
    request_kwargs = dict(
        identity='http://bambam.unittest/',
        trust_root='http://bar.unittest/',
        return_to=None,
        immediate=False,
        op_endpoint=self.server.op_endpoint,
    )
    request = server.CheckIDRequest(**request_kwargs)

    self.assertTrue(request.trustRootValid())
def test_returnToVerified_callsVerify(self):
    """Make sure that verifyReturnTo is calling the trustroot
    function verifyReturnTo
    """
    def withVerifyReturnTo(new_verify, func):
        """Call ``func`` with ``server.verifyReturnTo`` swapped for ``new_verify``.

        The original function is restored afterwards, even if ``func`` raises.
        """
        old_verify = server.verifyReturnTo
        try:
            server.verifyReturnTo = new_verify
            return func()
        finally:
            server.verifyReturnTo = old_verify

    # Ensure that exceptions are passed through
    sentinel = Exception()

    def vrfyExc(trust_root, return_to):
        self.assertEqual(self.request.trust_root, trust_root)
        self.assertEqual(self.request.return_to, return_to)
        raise sentinel

    try:
        withVerifyReturnTo(vrfyExc, self.request.returnToVerified)
    except Exception as e:
        self.assertTrue(e is sentinel, e)
    else:
        # Without this branch the test would pass silently if the
        # replacement verifier were never called or its exception swallowed.
        self.fail("Expected the sentinel exception to propagate")

    # Ensure that True and False are passed through unchanged
    def constVerify(val):
        def verify(trust_root, return_to):
            self.assertEqual(self.request.trust_root, trust_root)
            self.assertEqual(self.request.return_to, return_to)
            return val
        return verify

    for val in [True, False]:
        self.assertEqual(
            val,
            withVerifyReturnTo(constVerify(val),
                               self.request.returnToVerified))
def _expectAnswer(self, answer, identity=None, claimed_id=None):
    """Assert that answer is an OpenID 2 id_res response with the expected fields.

    mode, return_to and op_endpoint are always checked.  When identity
    is given, identity and claimed_id are checked as well; a missing
    claimed_id is expected to equal identity.  The response must also
    carry a response_nonce and contain no other arguments.
    """
    expected_list = [
        ('mode', 'id_res'),
        ('return_to', self.request.return_to),
        ('op_endpoint', self.op_endpoint),
    ]
    if identity:
        expected_list.append(('identity', identity))
        expected_list.append(('claimed_id', claimed_id or identity))

    fields = answer.fields
    for key, wanted in expected_list:
        found = fields.getArg(OPENID_NS, key)
        self.failUnlessEqual(
            found, wanted,
            "%s: expected %s, got %s" % (key, wanted, found))

    self.failUnless(fields.hasKey(OPENID_NS, 'response_nonce'))
    self.failUnless(fields.getOpenIDNamespace() == OPENID2_NS)

    # One for nonce, one for ns
    post_args = fields.toPostArgs()
    self.failUnlessEqual(len(post_args),
                         len(expected_list) + 2,
                         post_args)
def test_answerAllow(self):
    """Check the fields specified by "Positive Assertions"

    including mode=id_res, identity, claimed_id, op_endpoint, return_to
    """
    req = self.request
    positive = req.answer(True)
    self.failUnlessEqual(positive.request, req)
    self._expectAnswer(positive, req.identity)
def test_answerAllowDelegatedIdentity(self):
    """A claimed_id set on the request is expected in the positive answer."""
    req = self.request
    req.claimed_id = 'http://delegating.unittest/'
    positive = req.answer(True)
    self._expectAnswer(positive, req.identity, req.claimed_id)
def test_answerAllowWithoutIdentityReally(self):
    """A request whose identity is None can still be answered positively."""
    req = self.request
    req.identity = None
    positive = req.answer(True)
    self.failUnlessEqual(positive.request, req)
    # No identity/claimed_id fields are expected in this response.
    self._expectAnswer(positive)
def test_answerAllowAnonymousFail(self):
    """Supplying an identity when the request has none raises ValueError."""
    self.request.identity = None
    # XXX - Check on this, I think this behavior is legal in OpenID 2.0?
    answer_kwargs = {'identity': "=V"}
    self.failUnlessRaises(
        ValueError, self.request.answer, True, **answer_kwargs)
def test_answerAllowWithIdentity(self):
    """An IDENTIFIER_SELECT request is answered with the identity chosen by the OP."""
    selected_id = 'http://anon.unittest/9861'
    req = self.request
    req.identity = IDENTIFIER_SELECT
    positive = req.answer(True, identity=selected_id)
    self._expectAnswer(positive, selected_id)
def test_answerAllowWithDelegatedIdentityOpenID2(self):
    """Answer an IDENTIFIER_SELECT case with a delegated identifier.
    """
    # claimed_id delegates to selected_id here.
    claimed_id = 'http://monkeyhat.unittest/'
    selected_id = 'http://anon.unittest/9861'
    req = self.request
    req.identity = IDENTIFIER_SELECT
    positive = req.answer(
        True, identity=selected_id, claimed_id=claimed_id)
    self._expectAnswer(positive, selected_id, claimed_id)
def test_answerAllowWithDelegatedIdentityOpenID1(self):
    """claimed_id parameter doesn't exist in OpenID 1.
    """
    req = self.request
    req.message = Message(OPENID1_NS)
    # claimed_id delegates to selected_id here.
    req.identity = IDENTIFIER_SELECT
    delegated = {
        'identity': 'http://anon.unittest/9861',
        'claimed_id': 'http://monkeyhat.unittest/',
    }
    # Passing a claimed_id for an OpenID 1 request must be refused.
    self.failUnlessRaises(
        server.VersionError, req.answer, True, **delegated)
def test_answerAllowWithAnotherIdentity(self):
    """Answering with identity 'http://pebbles.unittest/' raises ValueError."""
    # XXX - Check on this, I think this behavior is legal in OpenID 2.0?
    other_identity = "http://pebbles.unittest/"
    self.failUnlessRaises(
        ValueError, self.request.answer, True, identity=other_identity)
def test_answerAllowNoIdentityOpenID1(self):
    """An OpenID 1 request cannot be answered positively without an identity."""
    req = self.request
    req.message = Message(OPENID1_NS)
    req.identity = None
    self.failUnlessRaises(
        ValueError, req.answer, True, identity=None)
def test_answerAllowForgotEndpoint(self):
    """answer(True) raises RuntimeError when the request has no op_endpoint."""
    req = self.request
    req.op_endpoint = None
    self.failUnlessRaises(RuntimeError, req.answer, True)
def test_checkIDWithNoIdentityOpenID1(self):
    """An OpenID 1 checkid_setup message without an identity is rejected.

    CheckIDRequest.fromMessage is expected to raise ProtocolError.
    """
    msg = Message(OPENID1_NS)
    msg.setArg(OPENID_NS, 'return_to', 'bogus')
    msg.setArg(OPENID_NS, 'trust_root', 'bogus')
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')

    # Pass the endpoint URL, as the other fromMessage tests in this
    # class do, rather than the Server object itself.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
def test_fromMessageClaimedIDWithoutIdentityOpenID2(self):
    """An OpenID 2 message with claimed_id but no identity is rejected.

    CheckIDRequest.fromMessage is expected to raise ProtocolError.
    """
    name = 'https://example.myopenid.com'

    msg = Message(OPENID2_NS)
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'return_to', 'http://invalid:8000/rt')
    msg.setArg(OPENID_NS, 'claimed_id', name)

    # Pass the endpoint URL, as the other fromMessage tests in this
    # class do, rather than the Server object itself.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
def test_fromMessageIdentityWithoutClaimedIDOpenID2(self):
    """An OpenID 2 message with identity but no claimed_id is rejected.

    CheckIDRequest.fromMessage is expected to raise ProtocolError.
    """
    name = 'https://example.myopenid.com'

    msg = Message(OPENID2_NS)
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'return_to', 'http://invalid:8000/rt')
    msg.setArg(OPENID_NS, 'identity', name)

    # Pass the endpoint URL, as the other fromMessage tests in this
    # class do, rather than the Server object itself.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
def test_trustRootOpenID1(self):
    """Ignore openid.realm in OpenID 1"""
    msg = Message(OPENID1_NS)
    for key, value in [
            ('mode', 'checkid_setup'),
            ('trust_root', 'http://real_trust_root/'),
            ('realm', 'http://fake_trust_root/'),
            ('return_to', 'http://real_trust_root/foo'),
            ('assoc_handle', 'bogus'),
            ('identity', 'george'),
            ]:
        msg.setArg(OPENID_NS, key, value)

    result = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnless(result.trust_root == 'http://real_trust_root/')
def test_trustRootOpenID2(self):
    """Ignore openid.trust_root in OpenID 2"""
    msg = Message(OPENID2_NS)
    for key, value in [
            ('mode', 'checkid_setup'),
            ('realm', 'http://real_trust_root/'),
            ('trust_root', 'http://fake_trust_root/'),
            ('return_to', 'http://real_trust_root/foo'),
            ('assoc_handle', 'bogus'),
            ('identity', 'george'),
            ('claimed_id', 'george'),
            ]:
        msg.setArg(OPENID_NS, key, value)

    result = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnless(result.trust_root == 'http://real_trust_root/')
def test_answerAllowNoTrustRoot(self):
    """A request whose trust_root is None can still be answered positively."""
    req = self.request
    req.trust_root = None
    positive = req.answer(True)
    self.failUnlessEqual(positive.request, req)
    self._expectAnswer(positive, req.identity)
def test_fromMessageWithoutTrustRoot(self):
    """Without a realm/trust_root, trust_root takes the value of return_to."""
    return_to = 'http://real_trust_root/foo'
    msg = Message(OPENID2_NS)
    for key, value in [
            ('mode', 'checkid_setup'),
            ('return_to', return_to),
            ('assoc_handle', 'bogus'),
            ('identity', 'george'),
            ('claimed_id', 'george'),
            ]:
        msg.setArg(OPENID_NS, key, value)

    result = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnlessEqual(result.trust_root, 'http://real_trust_root/foo')
def test_fromMessageWithEmptyTrustRoot(self):
    """An empty openid.trust_root makes trust_root take the value of return_to."""
    return_to = u'http://someplace.invalid/?go=thing'
    post_args = {
        u'openid.assoc_handle': u'{blah}{blah}{OZivdQ==}',
        u'openid.claimed_id': u'http://delegated.invalid/',
        u'openid.identity': u'http://op-local.example.com/',
        u'openid.mode': u'checkid_setup',
        u'openid.ns': u'http://openid.net/signon/1.0',
        u'openid.return_to': return_to,
        u'openid.trust_root': u'',
    }
    msg = Message.fromPostArgs(post_args)

    result = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnlessEqual(result.trust_root, return_to)
def test_fromMessageWithoutTrustRootOrReturnTo(self):
    """A message with neither a trust root nor a return_to is rejected."""
    msg = Message(OPENID2_NS)
    for key, value in [
            ('mode', 'checkid_setup'),
            ('assoc_handle', 'bogus'),
            ('identity', 'george'),
            ('claimed_id', 'george'),
            ]:
        msg.setArg(OPENID_NS, key, value)

    self.failUnlessRaises(
        server.ProtocolError,
        server.CheckIDRequest.fromMessage,
        msg, self.server.op_endpoint)
def test_answerAllowNoEndpointOpenID1(self):
    """Test .allow() with an OpenID 1.x Message on a CheckIDRequest
    built without an op_endpoint parameter.
    """
    identity = 'http://bambam.unittest/'
    openid_args = {
        'identity': identity,
        'trust_root': 'http://bar.unittest/',
        'return_to': 'http://bar.unittest/999',
    }
    reqmessage = Message.fromOpenIDArgs(openid_args)
    self.request = server.CheckIDRequest.fromMessage(reqmessage, None)
    answer = self.request.answer(True)

    expected_list = [
        ('mode', 'id_res'),
        ('return_to', self.request.return_to),
        ('identity', identity),
    ]

    fields = answer.fields
    for key, wanted in expected_list:
        found = fields.getArg(OPENID_NS, key)
        self.failUnlessEqual(
            wanted, found,
            "%s: expected %s, got %s" % (key, wanted, found))

    self.failUnless(fields.hasKey(OPENID_NS, 'response_nonce'))
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID1_NS)
    self.failUnless(fields.namespaces.isImplicit(OPENID1_NS))

    # One for nonce (OpenID v1 namespace is implicit)
    post_args = fields.toPostArgs()
    self.failUnlessEqual(len(post_args),
                         len(expected_list) + 1,
                         post_args)
def test_answerImmediateDenyOpenID2(self):
    """Look for mode=setup_needed in checkid_immediate negative
    response in OpenID 2 case.

    See specification Responding to Authentication Requests /
    Negative Assertions / In Response to Immediate Requests.
    """
    req = self.request
    req.mode = 'checkid_immediate'
    req.immediate = True
    server_url = "http://setup-url.unittest/"
    # server_url is only passed because answer() accepts it for the
    # setup URL.
    negative = req.answer(False, server_url=server_url)
    self.failUnlessEqual(negative.request, req)

    fields = negative.fields
    self.failUnlessEqual(len(fields.toPostArgs()), 3, fields)
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID2_NS)
    self.failUnlessEqual(fields.getArg(OPENID_NS, 'mode'),
                         'setup_needed')
    # user_setup_url no longer required.
def test_answerImmediateDenyOpenID1(self):
    """Look for user_setup_url in checkid_immediate negative
    response in OpenID 1 case."""
    req = self.request
    req.message = Message(OPENID1_NS)
    req.mode = 'checkid_immediate'
    req.immediate = True
    server_url = "http://setup-url.unittest/"
    # server_url is the base from which user_setup_url is built.
    negative = req.answer(False, server_url=server_url)
    self.failUnlessEqual(negative.request, req)

    fields = negative.fields
    self.failUnlessEqual(len(fields.toPostArgs()), 2, fields)
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID1_NS)
    self.failUnless(fields.namespaces.isImplicit(OPENID1_NS))
    self.failUnlessEqual(fields.getArg(OPENID_NS, 'mode'), 'id_res')
    setup_url = fields.getArg(OPENID_NS, 'user_setup_url', '')
    self.failUnless(setup_url.startswith(server_url))
def test_answerSetupDeny(self):
    """A negative answer to this request carries only mode=cancel."""
    negative = self.request.answer(False)
    openid_args = negative.fields.getArgs(OPENID_NS)
    self.failUnlessEqual(openid_args, {
        'mode': 'cancel',
    })
def test_encodeToURL(self):
    """Round-trip: the encoded URL decodes back into an equivalent request."""
    server_url = 'http://openid-server.unittest/'
    encoded = self.request.encodeToURL(server_url)

    # How to check?  How about a round-trip test.
    _base, query_string = encoded.split('?', 1)
    post_args = dict(cgi.parse_qsl(query_string))
    message = Message.fromPostArgs(post_args)
    rebuilt_request = server.CheckIDRequest.fromMessage(
        message, self.server.op_endpoint)
    # argh, lousy hack: give both requests the same message object so
    # that the attribute dictionaries can be compared as a whole.
    self.request.message = message
    self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
def test_getCancelURL(self):
    """The cancel URL is return_to plus openid.mode=cancel and the OpenID 2 ns."""
    cancel_url = self.request.getCancelURL()
    rt, query_string = cancel_url.split('?')
    self.failUnlessEqual(self.request.return_to, rt)

    expected_query = {
        'openid.mode':'cancel',
        'openid.ns':OPENID2_NS,
    }
    query = dict(cgi.parse_qsl(query_string))
    self.failUnlessEqual(query, expected_query)
def test_getCancelURLimmed(self):
    """getCancelURL() raises ValueError for an immediate-mode request."""
    req = self.request
    req.mode = 'checkid_immediate'
    req.immediate = True
    self.failUnlessRaises(ValueError, req.getCancelURL)
class TestCheckIDExtension(unittest.TestCase):
    """Adding arguments in another namespace leaves the OpenID arguments as set up."""

    def setUp(self):
        self.op_endpoint = 'http://endpoint.unittest/ext'
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, self.op_endpoint)
        self.request = server.CheckIDRequest(
            identity='http://bambam.unittest/',
            trust_root='http://bar.unittest/',
            return_to='http://bar.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
            )
        self.request.message = Message(OPENID2_NS)
        self.response = server.OpenIDResponse(self.request)
        # Seed the response with two OpenID-namespace arguments.
        for key, value in (('mode', 'id_res'), ('blue', 'star')):
            self.response.fields.setArg(OPENID_NS, key, value)

    def _failUnlessOpenIDArgsUnchanged(self):
        """Assert the OpenID namespace still holds exactly the setUp values."""
        self.failUnlessEqual(self.response.fields.getArgs(OPENID_NS),
                             {'blue': 'star',
                              'mode': 'id_res',
                              })

    def test_addField(self):
        """A single argument set in another namespace is stored there only."""
        namespace = 'something:'
        fields = self.response.fields
        fields.setArg(namespace, 'bright', 'potato')
        self._failUnlessOpenIDArgsUnchanged()

        self.failUnlessEqual(fields.getArgs(namespace),
                             {'bright':'potato'})

    def test_addFields(self):
        """Several arguments added at once are stored in their namespace only."""
        namespace = 'mi5:'
        args = {'tangy': 'suspenders',
                'bravo': 'inclusion'}
        fields = self.response.fields
        fields.updateArgs(namespace, args)
        self._failUnlessOpenIDArgsUnchanged()
        self.failUnlessEqual(fields.getArgs(namespace), args)
class MockSignatory(object):
    """In-memory stand-in for a signatory, used by the check-auth tests.

    Known associations are stored in self.assocs as
    (dumb, assoc_handle) tuples.  The result verify() gives for a known
    handle is controlled through the isValid attribute.
    """
    isValid = True

    def __init__(self, assoc):
        self.assocs = [assoc]

    def verify(self, assoc_handle, message):
        """Return isValid if (True, assoc_handle) is known, else False.

        The message must carry an openid.sig argument.
        """
        assert message.hasKey(OPENID_NS, "sig")
        if (True, assoc_handle) not in self.assocs:
            return False
        return self.isValid

    def getAssociation(self, assoc_handle, dumb):
        """Return True if (dumb, assoc_handle) is known, otherwise None."""
        key = (dumb, assoc_handle)
        if key not in self.assocs:
            return None
        # This isn't a valid implementation for many uses of this
        # function, mind you.
        return True

    def invalidate(self, assoc_handle, dumb):
        """Forget (dumb, assoc_handle) if it is currently known."""
        key = (dumb, assoc_handle)
        if key in self.assocs:
            self.assocs.remove(key)
class TestCheckAuth(unittest.TestCase):
    """Tests for CheckAuthRequest.answer() against a MockSignatory."""

    def setUp(self):
        self.assoc_handle = 'mooooooooo'
        post_args = {
            'openid.sig': 'signarture',
            'one': 'alpha',
            'two': 'beta',
        }
        self.message = Message.fromPostArgs(post_args)
        self.request = server.CheckAuthRequest(
            self.assoc_handle, self.message)

        self.signatory = MockSignatory((True, self.assoc_handle))

    def test_valid(self):
        """A known handle and a valid signature give is_valid=true."""
        reply = self.request.answer(self.signatory)
        self.failUnlessEqual(reply.fields.getArgs(OPENID_NS),
                             {'is_valid': 'true'})
        self.failUnlessEqual(reply.request, self.request)

    def test_invalid(self):
        """When the signatory reports an invalid signature, is_valid=false."""
        self.signatory.isValid = False
        reply = self.request.answer(self.signatory)
        self.failUnlessEqual(reply.fields.getArgs(OPENID_NS),
                             {'is_valid': 'false'})

    def test_replay(self):
        """Don't validate the same response twice.

        From "Checking the Nonce"::

            When using "check_authentication", the OP MUST ensure that an
            assertion has not yet been accepted with the same value for
            "openid.response_nonce".

        In this implementation, the assoc_handle is only valid once.  And
        nonces are a signed component of the message, so they can't be used
        with another handle without breaking the sig.
        """
        # The first answer is discarded; only the second one is checked.
        self.request.answer(self.signatory)
        replayed = self.request.answer(self.signatory)
        self.failUnlessEqual(replayed.fields.getArgs(OPENID_NS),
                             {'is_valid': 'false'})

    def test_invalidatehandle(self):
        """An invalidate_handle unknown to the signatory is echoed back."""
        self.request.invalidate_handle = "bogusHandle"
        reply = self.request.answer(self.signatory)
        expected_args = {
            'is_valid': 'true',
            'invalidate_handle': "bogusHandle",
        }
        self.failUnlessEqual(reply.fields.getArgs(OPENID_NS), expected_args)
        self.failUnlessEqual(reply.request, self.request)

    def test_invalidatehandleNo(self):
        """An invalidate_handle the signatory still knows is not echoed back."""
        assoc_handle = 'goodhandle'
        self.signatory.assocs.append((False, 'goodhandle'))
        self.request.invalidate_handle = assoc_handle
        reply = self.request.answer(self.signatory)
        self.failUnlessEqual(reply.fields.getArgs(OPENID_NS),
                             {'is_valid': 'true'})
class TestAssociate(unittest.TestCase):
    """Tests for AssociateRequest: parsing, answering and error replies."""
    # TODO: test DH with non-default values for modulus and gen.
    # (important to do because we actually had it broken for a while.)

    def setUp(self):
        empty_message = Message.fromPostArgs({})
        self.request = server.AssociateRequest.fromMessage(empty_message)
        self.store = memstore.MemoryStore()
        self.signatory = server.Signatory(self.store)

    def _checkDHAssociation(self, session_class, assoc_type, session_type,
                            hash_func):
        """Answer a Diffie-Hellman association request and check the reply.

        An association of assoc_type is created, a server session of
        session_class is built from default DH parameters, and the
        secret recovered on the consumer side using hash_func must
        equal the association's secret.
        """
        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type=assoc_type)
        from openid.dh import DiffieHellman
        consumer_dh = DiffieHellman.fromDefaults()
        cpub = consumer_dh.public
        server_dh = DiffieHellman.fromDefaults()
        session = session_class(server_dh, cpub)
        self.request = server.AssociateRequest(session, assoc_type)
        response = self.request.answer(self.assoc)

        def field(name):
            return response.fields.getArg(OPENID_NS, name)

        self.failUnlessEqual(field("assoc_type"), assoc_type)
        self.failUnlessEqual(field("assoc_handle"), self.assoc.handle)
        self.failIf(field("mac_key"))
        self.failUnlessEqual(field("session_type"), session_type)
        self.failUnless(field("enc_mac_key"))
        self.failUnless(field("dh_server_public"))

        enc_key = field("enc_mac_key").decode('base64')
        spub = cryptutil.base64ToLong(field("dh_server_public"))
        secret = consumer_dh.xorSecret(spub, enc_key, hash_func)
        self.failUnlessEqual(secret, self.assoc.secret)

    def _failUnlessAllRejected(self, bad_request_argss):
        """Assert that each set of POST args fails to parse as an associate request."""
        for request_args in bad_request_argss:
            message = Message.fromPostArgs(request_args)
            self.failUnlessRaises(server.ProtocolError,
                                  server.AssociateRequest.fromMessage,
                                  message)

    def test_dhSHA1(self):
        """DH-SHA1 session with an HMAC-SHA1 association."""
        from openid.server.server import DiffieHellmanSHA1ServerSession
        self._checkDHAssociation(
            DiffieHellmanSHA1ServerSession, 'HMAC-SHA1', 'DH-SHA1',
            cryptutil.sha1)

    # NOTE(review): indentation was lost in this copy of the file;
    # test_protoError256 is assumed to sit under the SHA256 guard with
    # test_dhSHA256 because it needs the SHA256 consumer session --
    # confirm against the original layout.
    if not cryptutil.SHA256_AVAILABLE:
        warnings.warn("Not running SHA256 tests.")
    else:
        def test_dhSHA256(self):
            """DH-SHA256 session with an HMAC-SHA256 association."""
            from openid.server.server import DiffieHellmanSHA256ServerSession
            self._checkDHAssociation(
                DiffieHellmanSHA256ServerSession, 'HMAC-SHA256',
                'DH-SHA256', cryptutil.sha256)

        def test_protoError256(self):
            """Mismatched assoc_type values are rejected for DH-SHA256."""
            from openid.consumer.consumer import \
                 DiffieHellmanSHA256ConsumerSession

            s256_session = DiffieHellmanSHA256ConsumerSession()

            invalid_s256 = {'openid.assoc_type':'HMAC-SHA1',
                            'openid.session_type':'DH-SHA256',}
            invalid_s256.update(s256_session.getRequest())

            invalid_s256_2 = {'openid.assoc_type':'MONKEY-PIRATE',
                              'openid.session_type':'DH-SHA256',}
            invalid_s256_2.update(s256_session.getRequest())

            self._failUnlessAllRejected([
                invalid_s256,
                invalid_s256_2,
                ])

    def test_protoError(self):
        """Unknown or mismatched assoc_type values are rejected."""
        from openid.consumer.consumer import DiffieHellmanSHA1ConsumerSession

        s1_session = DiffieHellmanSHA1ConsumerSession()

        invalid_s1 = {'openid.assoc_type':'HMAC-SHA256',
                      'openid.session_type':'DH-SHA1',}
        invalid_s1.update(s1_session.getRequest())

        invalid_s1_2 = {'openid.assoc_type':'ROBOT-NINJA',
                        'openid.session_type':'DH-SHA1',}
        invalid_s1_2.update(s1_session.getRequest())

        self._failUnlessAllRejected([
            {'openid.assoc_type':'Wha?'},
            invalid_s1,
            invalid_s1_2,
            ])

    def test_protoErrorFields(self):
        """contact and reference are present in ProtocolError replies."""
        contact = 'user@example.invalid'
        reference = 'Trac ticket number MAX_INT'
        error = 'poltergeist'

        openid1_args = {
            'openid.identitiy': 'invalid',
            'openid.mode': 'checkid_setup',
        }

        openid2_args = dict(openid1_args)
        openid2_args.update({'openid.ns': OPENID2_NS})

        # Check presence of optional fields in both protocol versions
        for post_args in (openid1_args, openid2_args):
            request_message = Message.fromPostArgs(post_args)
            p = server.ProtocolError(request_message, error,
                                     contact=contact, reference=reference)
            reply = p.toMessage()

            self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'),
                                 reference)
            self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'),
                                 contact)

    def failUnlessExpiresInMatches(self, msg, expected_expires_in):
        """Assert msg's expires_in is at most one second below the expected value."""
        expires_in = int(msg.getArg(OPENID_NS, 'expires_in', no_default))

        # Slop is necessary because the tests can sometimes get run
        # right on a second boundary
        slop = 1 # second

        error_message = ('"expires_in" value not within %s of expected: '
                         'expected=%s, actual=%s' %
                         (slop, expected_expires_in, expires_in))
        difference = expected_expires_in - expires_in
        self.failUnless(0 <= difference <= slop, error_message)

    def _checkPlaintextOpenID1(self, created_assoc_type):
        """Answer the default request with a new association of the given type.

        The reply must carry the secret unencrypted in mac_key and no
        session fields; assoc_type is expected to be HMAC-SHA1 in both
        callers.
        """
        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type=created_assoc_type)
        response = self.request.answer(self.assoc)

        def field(name):
            return response.fields.getArg(OPENID_NS, name)

        self.failUnlessEqual(field("assoc_type"), "HMAC-SHA1")
        self.failUnlessEqual(field("assoc_handle"), self.assoc.handle)

        self.failUnlessExpiresInMatches(
            response.fields, self.signatory.SECRET_LIFETIME)

        self.failUnlessEqual(
            field("mac_key"), oidutil.toBase64(self.assoc.secret))
        for absent in ("session_type", "enc_mac_key", "dh_server_public"):
            self.failIf(field(absent))

    def test_plaintext(self):
        """Unencrypted association using an HMAC-SHA1 association."""
        self._checkPlaintextOpenID1('HMAC-SHA1')

    def test_plaintext_v2(self):
        """Unencrypted association requested with an OpenID 2 message."""
        # The main difference between this and the v1 test is that
        # session_type is always returned in v2.
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'associate',
            'openid.assoc_type': 'HMAC-SHA1',
            'openid.session_type': 'no-encryption',
        }
        self.request = server.AssociateRequest.fromMessage(
            Message.fromPostArgs(args))

        self.failIf(self.request.message.isOpenID1())

        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type='HMAC-SHA1')
        response = self.request.answer(self.assoc)

        def field(name):
            return response.fields.getArg(OPENID_NS, name)

        self.failUnlessEqual(field("assoc_type"), "HMAC-SHA1")
        self.failUnlessEqual(field("assoc_handle"), self.assoc.handle)

        self.failUnlessExpiresInMatches(
            response.fields, self.signatory.SECRET_LIFETIME)

        self.failUnlessEqual(
            field("mac_key"), oidutil.toBase64(self.assoc.secret))

        self.failUnlessEqual(field("session_type"), "no-encryption")
        self.failIf(field("enc_mac_key"))
        self.failIf(field("dh_server_public"))

    def test_plaintext256(self):
        """Unencrypted association using an HMAC-SHA256 association."""
        self._checkPlaintextOpenID1('HMAC-SHA256')

    def test_unsupportedPrefer(self):
        """answerUnsupported() reports the preferred assoc and session types."""
        allowed_assoc = 'COLD-PET-RAT'
        allowed_sess = 'FROG-BONES'
        message = 'This is a unit test'

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        self.request.message = Message(OPENID2_NS)

        response = self.request.answerUnsupported(
            message=message,
            preferred_session_type=allowed_sess,
            preferred_association_type=allowed_assoc,
            )

        def field(name):
            return response.fields.getArg(OPENID_NS, name)

        self.failUnlessEqual(field('error_code'), 'unsupported-type')
        self.failUnlessEqual(field('assoc_type'), allowed_assoc)
        self.failUnlessEqual(field('error'), message)
        self.failUnlessEqual(field('session_type'), allowed_sess)

    def test_unsupported(self):
        """answerUnsupported() without preferences omits the type fields."""
        message = 'This is a unit test'

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        self.request.message = Message(OPENID2_NS)

        response = self.request.answerUnsupported(message)

        def field(name):
            return response.fields.getArg(OPENID_NS, name)

        self.failUnlessEqual(field('error_code'), 'unsupported-type')
        self.failUnlessEqual(field('assoc_type'), None)
        self.failUnlessEqual(field('error'), message)
        self.failUnlessEqual(field('session_type'), None)
class Counter(object):
    """Counts how many times inc() has been called."""

    def __init__(self):
        # Number of inc() calls so far.
        self.count = 0

    def inc(self):
        """Add one to count."""
        self.count = self.count + 1
class TestServer(unittest.TestCase, CatchLogs):
    """Tests for Server request dispatch and its openid_* handlers."""

    def setUp(self):
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, "http://server.unittest/endpt")
        CatchLogs.setUp(self)

    def tearDown(self):
        # unittest.TestCase comes before CatchLogs in the MRO, so its
        # tearDown is the one unittest finds and CatchLogs.tearDown
        # would never run.  Call it explicitly to restore oidutil.log,
        # which CatchLogs.setUp replaced.
        CatchLogs.tearDown(self)

    def test_dispatch(self):
        """handleRequest() calls the openid_<mode> handler exactly once."""
        monkeycalled = Counter()
        def monkeyDo(request):
            monkeycalled.inc()
            r = server.OpenIDResponse(request)
            return r
        self.server.openid_monkeymode = monkeyDo
        request = server.OpenIDRequest()
        request.mode = "monkeymode"
        request.namespace = OPENID1_NS
        self.server.handleRequest(request)
        self.failUnlessEqual(monkeycalled.count, 1)

    def test_associate(self):
        """An associate request built from empty args yields an assoc_handle."""
        request = server.AssociateRequest.fromMessage(Message.fromPostArgs({}))
        response = self.server.openid_associate(request)
        self.failUnless(response.fields.hasKey(OPENID_NS, "assoc_handle"),
                        "No assoc_handle here: %s" % (response.fields,))

    def test_associate2(self):
        """Associate when the server has no allowed association types

        Gives back an error with error_code and no fallback session or
        assoc types."""
        self.server.negotiator.setAllowedTypes([])

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.session_type': 'no-encryption',
            })

        request = server.AssociateRequest.fromMessage(msg)

        response = self.server.openid_associate(request)
        self.failUnless(response.fields.hasKey(OPENID_NS, "error"))
        self.failUnless(response.fields.hasKey(OPENID_NS, "error_code"))
        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle"))
        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_type"))
        self.failIf(response.fields.hasKey(OPENID_NS, "session_type"))

    def test_associate3(self):
        """Request an assoc type that is not supported when there are
        supported types.

        Should give back an error message with a fallback type.
        """
        self.server.negotiator.setAllowedTypes([('HMAC-SHA256', 'DH-SHA256')])

        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.session_type': 'no-encryption',
            })

        request = server.AssociateRequest.fromMessage(msg)
        response = self.server.openid_associate(request)

        self.failUnless(response.fields.hasKey(OPENID_NS, "error"))
        self.failUnless(response.fields.hasKey(OPENID_NS, "error_code"))
        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle"))
        self.failUnlessEqual(response.fields.getArg(OPENID_NS, "assoc_type"),
                             'HMAC-SHA256')
        self.failUnlessEqual(response.fields.getArg(OPENID_NS, "session_type"),
                             'DH-SHA256')

    if not cryptutil.SHA256_AVAILABLE:
        warnings.warn("Not running SHA256 tests.")
    else:
        def test_associate4(self):
            """DH-SHA256 association session"""
            self.server.negotiator.setAllowedTypes(
                [('HMAC-SHA256', 'DH-SHA256')])
            query = {
                'openid.dh_consumer_public':
                'ALZgnx8N5Lgd7pCj8K86T/DDMFjJXSss1SKoLmxE72kJTzOtG6I2PaYrHX'
                'xku4jMQWSsGfLJxwCZ6280uYjUST/9NWmuAfcrBfmDHIBc3H8xh6RBnlXJ'
                '1WxJY3jHd5k1/ZReyRZOxZTKdF/dnIqwF8ZXUwI6peV0TyS/K1fOfF/s',
                'openid.assoc_type': 'HMAC-SHA256',
                'openid.session_type': 'DH-SHA256',
                }
            message = Message.fromPostArgs(query)
            request = server.AssociateRequest.fromMessage(message)
            response = self.server.openid_associate(request)
            self.failUnless(response.fields.hasKey(OPENID_NS, "assoc_handle"))

    def test_missingSessionTypeOpenID2(self):
        """Make sure session_type is required in OpenID 2"""
        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            })

        self.assertRaises(server.ProtocolError,
                          server.AssociateRequest.fromMessage, msg)

    def test_checkAuth(self):
        """openid_check_authentication() always answers with an is_valid field."""
        request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
        response = self.server.openid_check_authentication(request)
        self.failUnless(response.fields.hasKey(OPENID_NS, "is_valid"))
1756 class TestSignatory(unittest.TestCase, CatchLogs):
def setUp(self):
    """Create a Signatory over an in-memory store and start capturing logs."""
    store = memstore.MemoryStore()
    signatory = server.Signatory(store)
    self.store = store
    self.signatory = signatory
    # Keep the signatory's two store keys at hand for the tests.
    self._dumb_key = signatory._dumb_key
    self._normal_key = signatory._normal_key
    # NOTE(review): CatchLogs.setUp replaces oidutil.log; confirm that
    # this class restores it in a tearDown.
    CatchLogs.setUp(self)
def test_sign(self):
    """Signing with a stored, unexpired association reuses its handle."""
    assoc_handle = '{assoc}{lookatme}'
    request = server.OpenIDRequest()
    stored_assoc = association.Association.fromExpiresIn(
        60, assoc_handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._normal_key, stored_assoc)
    request.assoc_handle = assoc_handle
    request.namespace = OPENID1_NS
    response = server.OpenIDResponse(request)
    response.fields = Message.fromOpenIDArgs({
        'foo': 'amsigned',
        'bar': 'notsigned',
        'azu': 'alsosigned',
        })
    sresponse = self.signatory.sign(response)

    signed_fields = sresponse.fields
    self.failUnlessEqual(
        signed_fields.getArg(OPENID_NS, 'assoc_handle'),
        assoc_handle)
    self.failUnlessEqual(signed_fields.getArg(OPENID_NS, 'signed'),
                         'assoc_handle,azu,bar,foo,signed')
    self.failUnless(signed_fields.getArg(OPENID_NS, 'sig'))
    # Nothing should have been logged.
    self.failIf(self.messages, self.messages)
def test_signDumb(self):
    """Signing without an assoc_handle creates a dumb-mode association."""
    request = server.OpenIDRequest()
    request.assoc_handle = None
    request.namespace = OPENID2_NS
    response = server.OpenIDResponse(request)
    response.fields = Message.fromOpenIDArgs({
        'foo': 'amsigned',
        'bar': 'notsigned',
        'azu': 'alsosigned',
        'ns':OPENID2_NS,
        })
    sresponse = self.signatory.sign(response)

    signed_fields = sresponse.fields
    assoc_handle = signed_fields.getArg(OPENID_NS, 'assoc_handle')
    self.failUnless(assoc_handle)
    # The new handle must be retrievable as a dumb-mode association.
    assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
    self.failUnless(assoc)
    self.failUnlessEqual(signed_fields.getArg(OPENID_NS, 'signed'),
                         'assoc_handle,azu,bar,foo,ns,signed')
    self.failUnless(signed_fields.getArg(OPENID_NS, 'sig'))
    # Nothing should have been logged.
    self.failIf(self.messages, self.messages)
def test_signExpired(self):
    """Sign a response to a request whose association handle has expired.

    The response is expected to use a new handle and to name the old
    one in ``invalidate_handle``.

    From "Verifying with an Association"::

        If an authentication request included an association handle for an
        association between the OP and the Relying party, and the OP no
        longer wishes to use that handle (because it has expired or the
        secret has been compromised, for instance), the OP will send a
        response that must be verified directly with the OP, as specified
        in Section 11.3.2.  In that instance, the OP will include the field
        "openid.invalidate_handle" set to the association handle that the
        Relying Party included with the original request.
    """
    request = server.OpenIDRequest()
    request.namespace = OPENID2_NS

    # A negative lifetime makes the stored association already expired.
    stale_handle = '{assoc}{lookatme}'
    stale_assoc = association.Association.fromExpiresIn(
        -10, stale_handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._normal_key, stale_assoc)
    self.failUnless(
        self.store.getAssociation(self._normal_key, stale_handle))

    request.assoc_handle = stale_handle
    response = server.OpenIDResponse(request)
    response.fields = Message.fromOpenIDArgs({
        'foo': 'amsigned',
        'bar': 'notsigned',
        'azu': 'alsosigned',
        })

    signed_fields = self.signatory.sign(response).fields

    # The response must be signed with a different handle ...
    fresh_handle = signed_fields.getArg(OPENID_NS, 'assoc_handle')
    self.failUnless(fresh_handle)
    self.failIfEqual(fresh_handle, stale_handle)

    # ... and must name the stale handle as invalidated.
    self.failUnlessEqual(
        signed_fields.getArg(OPENID_NS, 'invalidate_handle'),
        stale_handle)

    self.failUnlessEqual(
        signed_fields.getArg(OPENID_NS, 'signed'),
        'assoc_handle,azu,bar,foo,invalidate_handle,signed')
    self.failUnless(signed_fields.getArg(OPENID_NS, 'sig'))

    # The expired association must no longer be in the store.
    still_there = self.store.getAssociation(
        self._normal_key, stale_handle)
    self.failIf(still_there, "expired association is still retrievable.")

    # The replacement must be filed as a dumb-mode association only.
    self.failUnless(
        self.store.getAssociation(self._dumb_key, fresh_handle))
    self.failIf(
        self.store.getAssociation(self._normal_key, fresh_handle))

    # Unlike the other signing tests, this one expects a log message.
    self.failUnless(self.messages)
def test_signInvalidHandle(self):
    """Sign a response to a request naming a handle that was never stored.

    The response is expected to use a new dumb-mode handle, to name the
    unknown handle in ``invalidate_handle``, and to log nothing.
    """
    bogus_handle = '{bogus-assoc}{notvalid}'

    request = server.OpenIDRequest()
    request.namespace = OPENID2_NS
    request.assoc_handle = bogus_handle

    response = server.OpenIDResponse(request)
    response.fields = Message.fromOpenIDArgs({
        'foo': 'amsigned',
        'bar': 'notsigned',
        'azu': 'alsosigned',
        })

    signed_fields = self.signatory.sign(response).fields

    # A different handle must have been substituted.
    fresh_handle = signed_fields.getArg(OPENID_NS, 'assoc_handle')
    self.failUnless(fresh_handle)
    self.failIfEqual(fresh_handle, bogus_handle)

    # The handle from the request is reported back as invalidated.
    self.failUnlessEqual(
        signed_fields.getArg(OPENID_NS, 'invalidate_handle'),
        bogus_handle)

    self.failUnlessEqual(
        signed_fields.getArg(OPENID_NS, 'signed'),
        'assoc_handle,azu,bar,foo,invalidate_handle,signed')
    self.failUnless(signed_fields.getArg(OPENID_NS, 'sig'))

    # The replacement must be filed as a dumb-mode association only.
    self.failUnless(
        self.store.getAssociation(self._dumb_key, fresh_handle))
    self.failIf(
        self.store.getAssociation(self._normal_key, fresh_handle))
    self.failIf(self.messages, self.messages)
def test_verify(self):
    """verify() accepts a message with the expected signature, silently."""
    handle = '{vroom}{zoom}'
    dumb_assoc = association.Association.fromExpiresIn(
        60, handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._dumb_key, dumb_assoc)

    post_args = {
        'openid.foo': 'bar',
        'openid.apple': 'orange',
        'openid.assoc_handle': handle,
        'openid.signed': 'apple,assoc_handle,foo,signed',
        'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco=',
        }
    message = Message.fromPostArgs(post_args)

    is_valid = self.signatory.verify(handle, message)
    self.failIf(self.messages, self.messages)
    self.failUnless(is_valid)
def test_verifyBadSig(self):
    """verify() rejects a message whose signature was altered, silently."""
    handle = '{vroom}{zoom}'
    dumb_assoc = association.Association.fromExpiresIn(
        60, handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._dumb_key, dumb_assoc)

    # Same signature text as in test_verify, scrambled with rot13.
    scrambled_sig = 'uXoT1qm62/BB09Xbj98TQ8mlBco='.encode('rot13')
    post_args = {
        'openid.foo': 'bar',
        'openid.apple': 'orange',
        'openid.assoc_handle': handle,
        'openid.signed': 'apple,assoc_handle,foo,signed',
        'openid.sig': scrambled_sig,
        }
    message = Message.fromPostArgs(post_args)

    is_valid = self.signatory.verify(handle, message)
    self.failIf(self.messages, self.messages)
    self.failIf(is_valid)
def test_verifyBadHandle(self):
    """verify() fails and logs when no association was stored for the handle."""
    # Nothing is stored under this handle in this test.
    unknown_handle = '{vroom}{zoom}'
    post_args = {
        'foo': 'bar',
        'apple': 'orange',
        'openid.sig': "Ylu0KcIR7PvNegB/K41KpnRgJl0=",
        }
    message = Message.fromPostArgs(post_args)

    is_valid = self.signatory.verify(unknown_handle, message)
    self.failIf(is_valid)
    self.failUnless(self.messages)
def test_verifyAssocMismatch(self):
    """Attempt to validate a sign-all message with a signed-list assoc.

    Verification is expected to fail and to leave a log message.
    """
    handle = '{vroom}{zoom}'
    dumb_assoc = association.Association.fromExpiresIn(
        60, handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._dumb_key, dumb_assoc)

    # The message carries a signature but no 'openid.signed' list.
    post_args = {
        'foo': 'bar',
        'apple': 'orange',
        'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=",
        }
    message = Message.fromPostArgs(post_args)

    is_valid = self.signatory.verify(handle, message)
    self.failIf(is_valid)
    self.failUnless(self.messages)
def test_getAssoc(self):
    """A stored dumb-mode association is returned by handle, silently."""
    handle = self.makeAssoc(dumb=True)
    found = self.signatory.getAssociation(handle, True)
    self.failUnless(found)
    self.failUnlessEqual(found.handle, handle)
    self.failIf(self.messages, self.messages)
def test_getAssocExpired(self):
    """An association stored with a negative lifetime is not returned.

    The lookup is also expected to leave a log message.
    """
    handle = self.makeAssoc(dumb=True, lifetime=-10)
    found = self.signatory.getAssociation(handle, True)
    self.failIf(found, found)
    self.failUnless(self.messages)
def test_getAssocInvalid(self):
    """Looking up a handle that was never stored yields None, silently."""
    missing_handle = 'no-such-handle'
    found = self.signatory.getAssociation(missing_handle, dumb=False)
    self.failUnlessEqual(found, None)
    self.failIf(self.messages, self.messages)
def test_getAssocDumbVsNormal(self):
    """getAssociation(dumb=False) cannot get a dumb assoc."""
    handle = self.makeAssoc(dumb=True)
    found = self.signatory.getAssociation(handle, dumb=False)
    self.failUnlessEqual(found, None)
    self.failIf(self.messages, self.messages)
def test_getAssocNormalVsDumb(self):
    """getAssociation(dumb=True) cannot get a shared assoc.

    From "Verifying Directly with the OpenID Provider"::

        An OP MUST NOT verify signatures for associations that have shared
        MAC keys.
    """
    handle = self.makeAssoc(dumb=False)
    found = self.signatory.getAssociation(handle, dumb=True)
    self.failUnlessEqual(found, None)
    self.failIf(self.messages, self.messages)
def test_createAssociation(self):
    """A newly created non-dumb association can be looked up by its handle."""
    created = self.signatory.createAssociation(dumb=False)
    found = self.signatory.getAssociation(created.handle, dumb=False)
    self.failUnless(found)
    self.failIf(self.messages, self.messages)
def makeAssoc(self, dumb, lifetime=60):
    """Store an HMAC-SHA1 association in the test store; return its handle.

    dumb -- when true the association is filed under the dumb-mode
        store key, otherwise under the normal key.
    lifetime -- value handed to Association.fromExpiresIn; the tests in
        this class pass a negative number to get an expired association.

    The handle is always '{bling}'.
    """
    assoc_handle = '{bling}'
    assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
                                                  'sekrit', 'HMAC-SHA1')

    # Explicit branch rather than "dumb and X or Y": that idiom would
    # silently fall through to the normal key if the dumb key were falsy.
    if dumb:
        store_key = self._dumb_key
    else:
        store_key = self._normal_key

    self.store.storeAssociation(store_key, assoc)
    return assoc_handle
def test_invalidate(self):
    """invalidate() makes a stored dumb-mode association unretrievable."""
    handle = '-squash-'
    stored = association.Association.fromExpiresIn(
        60, handle, 'sekrit', 'HMAC-SHA1')
    self.store.storeAssociation(self._dumb_key, stored)

    # Fetch it twice; both lookups are expected to succeed.
    first_lookup = self.signatory.getAssociation(handle, dumb=True)
    self.failUnless(first_lookup)
    second_lookup = self.signatory.getAssociation(handle, dumb=True)
    self.failUnless(second_lookup)

    self.signatory.invalidate(handle, dumb=True)

    # After invalidation the same lookup must come back empty.
    after_invalidate = self.signatory.getAssociation(handle, dumb=True)
    self.failIf(after_invalidate)
    self.failIf(self.messages, self.messages)
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()