Fix unicode bug from python3 conversion
[pgweb/local.git] / tools / communityauth / sample / django / auth.py
blob79dac96cbe9d22513ee69d5bf4e2d8508ba42631
2 # Django module to support postgresql.org community authentication 2.0
4 # The main location for this module is the pgweb git repository hosted
5 # on git.postgresql.org - look there for updates.
7 # To integrate with django, you need the following:
8 # * Make sure the view "login" from this module is used for login
9 # * Map an url somwehere (typically /auth_receive/) to the auth_receive
10 # view.
11 # * In settings.py, set AUTHENTICATION_BACKENDS to point to the class
12 # AuthBackend in this module.
13 # * (And of course, register for a crypto key with the main authentication
14 # provider website)
15 # * If the application uses the django admin interface, the login screen
16 # has to be replaced with something similar to login.html in this
17 # directory (adjust urls, and name it admin/login.html in any template
18 # directory that's processed before the default django.contrib.admin)
21 from django.http import HttpResponse, HttpResponseRedirect
22 from django.contrib.auth.models import User
23 from django.contrib.auth.backends import ModelBackend
24 from django.contrib.auth import login as django_login
25 from django.contrib.auth import logout as django_logout
26 from django.conf import settings
28 import base64
29 import json
30 import socket
31 from urllib.parse import urlparse, urlencode, parse_qs
32 import requests
33 from Cryptodome.Cipher import AES
34 from Cryptodome.Hash import SHA
35 from Cryptodome import Random
36 import time
39 class AuthBackend(ModelBackend):
40 # We declare a fake backend that always fails direct authentication -
41 # since we should never be using direct authentication in the first place!
42 def authenticate(self, username=None, password=None):
43 raise Exception("Direct authentication not supported")
46 ####
47 # Two regular django views to interact with the login system
48 ####
50 # Handle login requests by sending them off to the main site
51 def login(request):
52 if 'next' in request.GET:
53 # Put together an url-encoded dict of parameters we're getting back,
54 # including a small nonce at the beginning to make sure it doesn't
55 # encrypt the same way every time.
56 s = "t=%s&%s" % (int(time.time()), urlencode({'r': request.GET['next']}))
57 # Now encrypt it
58 r = Random.new()
59 iv = r.read(16)
60 encryptor = AES.new(SHA.new(settings.SECRET_KEY.encode('ascii')).digest()[:16], AES.MODE_CBC, iv)
61 cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # pad to 16 bytes
63 return HttpResponseRedirect("%s?d=%s$%s" % (
64 settings.PGAUTH_REDIRECT,
65 base64.b64encode(iv, b"-_").decode('utf8'),
66 base64.b64encode(cipher, b"-_").decode('utf8'),
68 else:
69 return HttpResponseRedirect(settings.PGAUTH_REDIRECT)
72 # Handle logout requests by logging out of this site and then
73 # redirecting to log out from the main site as well.
74 def logout(request):
75 if request.user.is_authenticated():
76 django_logout(request)
77 return HttpResponseRedirect("%slogout/" % settings.PGAUTH_REDIRECT)
80 # Receive an authentication response from the main website and try
81 # to log the user in.
82 def auth_receive(request):
83 if 's' in request.GET and request.GET['s'] == "logout":
84 # This was a logout request
85 return HttpResponseRedirect('/')
87 if 'i' not in request.GET:
88 return HttpResponse("Missing IV in url!", status=400)
89 if 'd' not in request.GET:
90 return HttpResponse("Missing data in url!", status=400)
92 # Set up an AES object and decrypt the data we received
93 decryptor = AES.new(base64.b64decode(settings.PGAUTH_KEY),
94 AES.MODE_CBC,
95 base64.b64decode(str(request.GET['i']), "-_"))
96 s = decryptor.decrypt(base64.b64decode(str(request.GET['d']), "-_")).rstrip(b' ').decode('utf8')
98 # Now un-urlencode it
99 try:
100 data = parse_qs(s, strict_parsing=True)
101 except ValueError:
102 return HttpResponse("Invalid encrypted data received.", status=400)
104 # Check the timestamp in the authentication
105 if (int(data['t'][0]) < time.time() - 10):
106 return HttpResponse("Authentication token too old.", status=400)
108 # Update the user record (if any)
109 try:
110 user = User.objects.get(username=data['u'][0])
111 # User found, let's see if any important fields have changed
112 changed = False
113 if user.first_name != data['f'][0]:
114 user.first_name = data['f'][0]
115 changed = True
116 if user.last_name != data['l'][0]:
117 user.last_name = data['l'][0]
118 changed = True
119 if user.email != data['e'][0]:
120 user.email = data['e'][0]
121 changed = True
122 if changed:
123 user.save()
124 except User.DoesNotExist:
125 # User not found, create it!
127 # NOTE! We have some legacy users where there is a user in
128 # the database with a different userid. Instead of trying to
129 # somehow fix that live, give a proper error message and
130 # have somebody look at it manually.
131 if User.objects.filter(email=data['e'][0]).exists():
132 return HttpResponse("""A user with email %s already exists, but with
133 a different username than %s.
135 This is almost certainly caused by some legacy data in our database.
136 Please send an email to webmaster@postgresql.eu, indicating the username
137 and email address from above, and we'll manually merge the two accounts
138 for you.
140 We apologize for the inconvenience.
141 """ % (data['e'][0], data['u'][0]), content_type='text/plain')
143 user = User(username=data['u'][0],
144 first_name=data['f'][0],
145 last_name=data['l'][0],
146 email=data['e'][0],
147 password='setbypluginnotasha1',
149 user.save()
151 # Ok, we have a proper user record. Now tell django that
152 # we're authenticated so it persists it in the session. Before
153 # we do that, we have to annotate it with the backend information.
154 user.backend = "%s.%s" % (AuthBackend.__module__, AuthBackend.__name__)
155 django_login(request, user)
157 # Finally, check of we have a data package that tells us where to
158 # redirect the user.
159 if 'd' in data:
160 (ivs, datas) = data['d'][0].split('$')
161 decryptor = AES.new(SHA.new(settings.SECRET_KEY.encode('ascii')).digest()[:16],
162 AES.MODE_CBC,
163 base64.b64decode(ivs, b"-_"))
164 s = decryptor.decrypt(base64.b64decode(datas, "-_")).rstrip(b' ').decode('utf8')
165 try:
166 rdata = parse_qs(s, strict_parsing=True)
167 except ValueError:
168 return HttpResponse("Invalid encrypted data received.", status=400)
169 if 'r' in rdata:
170 # Redirect address
171 return HttpResponseRedirect(rdata['r'][0])
172 # No redirect specified, see if we have it in our settings
173 if hasattr(settings, 'PGAUTH_REDIRECT_SUCCESS'):
174 return HttpResponseRedirect(settings.PGAUTH_REDIRECT_SUCCESS)
175 return HttpResponse("Authentication successful, but don't know where to redirect!", status=500)
178 # Perform a search in the central system. Note that the results are returned as an
179 # array of dicts, and *not* as User objects. To be able to for example reference the
180 # user through a ForeignKey, a User object must be materialized locally. We don't do
181 # that here, as this search might potentially return a lot of unrelated users since
182 # it's a wildcard match.
183 # Unlike the authentication, searching does not involve the browser - we just make
184 # a direct http call.
185 def user_search(searchterm=None, userid=None):
186 # If upsteam isn't responding quickly, it's not going to respond at all, and
187 # 10 seconds is already quite long.
188 socket.setdefaulttimeout(10)
189 if userid:
190 q = {'u': userid}
191 else:
192 q = {'s': searchterm}
194 r = requests.get(
195 '{0}search/'.format(settings.PGAUTH_REDIRECT),
196 params=q,
198 if r.status_code != 200:
199 return []
201 (ivs, datas) = r.text.encode('utf8').split(b'&')
203 # Decryption time
204 decryptor = AES.new(base64.b64decode(settings.PGAUTH_KEY),
205 AES.MODE_CBC,
206 base64.b64decode(ivs, "-_"))
207 s = decryptor.decrypt(base64.b64decode(datas, "-_")).rstrip(b' ').decode('utf8')
208 j = json.loads(s)
210 return j
213 # Import a user into the local authentication system. Will initially
214 # make a search for it, and if anything other than one entry is returned
215 # the import will fail.
216 # Import is only supported based on userid - so a search should normally
217 # be done first. This will result in multiple calls to the upstream
218 # server, but they are cheap...
219 # The call to this function should normally be wrapped in a transaction,
220 # and this function itself will make no attempt to do anything about that.
221 def user_import(uid):
222 u = user_search(userid=uid)
223 if len(u) != 1:
224 raise Exception("Internal error, duplicate or no user found")
226 u = u[0]
228 if User.objects.filter(username=u['u']).exists():
229 raise Exception("User already exists")
231 User(username=u['u'],
232 first_name=u['f'],
233 last_name=u['l'],
234 email=u['e'],
235 password='setbypluginnotsha1',
236 ).save()