1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from six
.moves
.urllib
.parse
import parse_qs
, urlparse
21 from oauthlib
.oauth1
import Client
23 from mediagoblin
import mg_globals
24 from mediagoblin
.tools
import template
, pluginapi
25 from mediagoblin
.tests
.tools
import fixture_add_user
class TestOAuth(object):
    """Functional tests for OAuth client registration and request tokens.

    Each test talks to the application through the ``test_app`` fixture and
    checks the outcome against rows read from ``mg_globals.database``.
    """

    MIME_FORM = "application/x-www-form-urlencoded"
    MIME_JSON = "application/json"

    @pytest.fixture(autouse=True)
    def setup(self, test_app):
        """Store the test app and database handle, then create a user."""
        self.test_app = test_app
        self.db = mg_globals.database
        self.pman = pluginapi.PluginManager()

        self.user_password = "AUserPassword123"
        self.user = fixture_add_user("OAuthy", self.user_password)

        self.login()

    def login(self):
        """Post the fixture user's credentials to the test app."""
        # NOTE(review): only the username/password payload of this call is
        # visible in the reviewed source; the method header, the endpoint
        # and the call from setup() are reconstructed -- confirm them.
        credentials = {
            "username": self.user.username,
            "password": self.user_password,
        }
        self.test_app.post("/auth/login/", credentials)

    def _get_client(self, client_id):
        """Return the first ``Client`` row whose id is ``client_id``."""
        return self.db.Client.query.filter_by(id=client_id).first()

    def register_client(self, **kwargs):
        """Register a client with the API and return the response.

        ``application_type`` defaults to ``"native"`` when the caller does
        not supply one; every other keyword is posted unchanged.
        """
        # NOTE(review): "type" is overwritten unconditionally, so a caller
        # that passes type="client_update" still posts "client_associate".
        # Confirm whether that is intended before relying on update tests.
        kwargs["type"] = "client_associate"
        kwargs.setdefault("application_type", "native")
        return self.test_app.post("/api/client/register", kwargs)

    def test_client_client_register_limited_info(self):
        """A client can be registered with only the default information."""
        response = self.register_client()
        client = self._get_client(response.json["client_id"])

        assert response.status_int == 200
        assert client is not None

    def test_client_register_full_info(self):
        """Register a client supplying every supported field."""
        query = {
            "application_name": "Testificate MD",
            "application_type": "web",
            "contacts": "someone@someplace.com tuteo@tsengeo.lu",
            "logo_uri": "http://ayrel.com/utral.png",
            "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
        }

        response = self.register_client(**query)
        client_info = response.json
        client = self._get_client(client_info["client_id"])

        assert client is not None
        assert client.secret == client_info["client_secret"]
        assert client.application_type == query["application_type"]
        # Space-separated request values are compared as lists.
        assert client.redirect_uri == query["redirect_uris"].split()
        assert client.logo_url == query["logo_uri"]
        assert client.contacts == query["contacts"].split()

    def test_client_update(self):
        """Post update fields for a client and check the stored values."""
        # First we need to register a client.
        response = self.register_client()
        assert self._get_client(response.json["client_id"]) is not None

        # NOTE(review): register_client() replaces "type" with
        # "client_associate", so the "client_update" value below is not
        # what gets posted -- confirm this exercises the update path.
        update_query = {
            "type": "client_update",
            "application_name": "neytiri",
            "contacts": "someone@someplace.com abc@cba.com",
            "logo_uri": "http://place.com/picture.png",
            "application_type": "web",
            "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
        }

        update_response = self.register_client(**update_query)
        assert update_response.status_int == 200

        client_info = update_response.json
        client = self._get_client(client_info["client_id"])

        assert client.secret == client_info["client_secret"]
        assert client.application_type == update_query["application_type"]
        assert client.application_name == update_query["application_name"]
        assert client.contacts == update_query["contacts"].split()
        assert client.logo_url == update_query["logo_uri"]
        assert client.redirect_uri == update_query["redirect_uris"].split()

    def to_authorize_headers(self, data):
        """Build an ``Authorization`` header dict from ``data``.

        Every item is rendered as ``key="value"`` and the pairs are joined
        with commas behind an ``OAuth `` prefix. Values are inserted as-is
        (no escaping). An empty mapping yields ``"OAuth "``.
        """
        pairs = ",".join(
            '{0}="{1}"'.format(key, value) for key, value in data.items())
        return {"Authorization": "OAuth " + pairs}

    def test_request_token(self):
        """Request a request token and check the stored token row."""
        response = self.register_client()
        client_id = response.json["client_id"]

        endpoint = "/oauth/request_token"
        request_query = {
            "oauth_consumer_key": client_id,
            "oauth_nonce": "abcdefghij",
            "oauth_timestamp": 123456789.0,
            "oauth_callback": "https://some.url/callback",
        }

        headers = self.to_authorize_headers(request_query)
        headers["Content-Type"] = self.MIME_FORM

        token_response = self.test_app.post(endpoint, headers=headers)
        parsed = parse_qs(token_response.body.decode())

        # parse_qs maps each key to a list; keep the first value of each.
        token_data = dict(
            (key, values[0]) for key, values in parsed.items())

        request_token = self.db.RequestToken.query.filter_by(
            token=token_data["oauth_token"]).first()
        client = self._get_client(client_id)

        assert request_token is not None
        assert request_token.secret == token_data["oauth_token_secret"]
        assert request_token.client == client.id
        # Explicit comparison kept: a value of None must not pass here.
        assert request_token.used == False
        assert request_token.callback == request_query["oauth_callback"]