python312Packages.kneaddata: init at 0.7.7-alpha (#340230)
[NixPkgs.git] / nixos / tests / google-oslogin / server.py
blob622cd86b261959f96dee465d7d174307c09fe82e
1 #!/usr/bin/env python3
2 import json
3 import sys
4 import time
5 import os
6 import hashlib
7 import base64
9 from http.server import BaseHTTPRequestHandler, HTTPServer
10 from urllib.parse import urlparse, parse_qs
11 from typing import Dict
# Public key handed out for every mock account; it must be provided through
# the environment, otherwise the import fails with KeyError.
SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY']

# Usernames of the two accounts this mock server knows about.
MOCKUSER = "mockuser_nixos_org"
MOCKADMIN = "mockadmin_nixos_org"
def w(msg: bytes):
    """Write ``msg`` as a single line to stderr and flush immediately.

    ``bytes`` (the usual case: an encoded JSON response body) are decoded as
    UTF-8 so the log shows the payload itself rather than its ``b'...'``
    repr. Undecodable bytes are replaced instead of raising, because this is
    only a debugging aid. Any other object is logged via ``str()``.
    """
    if isinstance(msg, (bytes, bytearray)):
        text = msg.decode("utf-8", errors="replace")
    else:
        text = str(msg)
    sys.stderr.write(f"{text}\n")
    # Flush right away so output interleaves correctly with other test logs.
    sys.stderr.flush()
def gen_fingerprint(pubkey: str):
    """Return the hex SHA-256 digest of the key material in an SSH public key line.

    ``pubkey`` is split on whitespace and the second field is taken as the
    base64-encoded key blob. Raises ``IndexError`` when there is no second
    field and ``UnicodeEncodeError`` when the line is not pure ASCII.
    """
    fields = pubkey.encode("ascii").split()
    key_blob = base64.b64decode(fields[1])
    digest = hashlib.sha256(key_blob)
    return digest.hexdigest()
def gen_email(username: str):
    """Derive a reproducible numeric identifier string from ``username``.

    The real value seems to be a 21-character string of digits, so this
    mimics it: the SHA-256 digest of the username is read as one big integer
    and the first 21 decimal digits are returned.
    """
    digest = hashlib.sha256(username.encode()).digest()
    as_decimal = str(int.from_bytes(digest, "big"))
    return as_decimal[:21]
def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict:
    """Build the JSON-serializable login profile document for one mock user.

    Args:
        username: POSIX account name.
        uid: User id, passed through unchanged.
        gid: Primary group id, passed through unchanged.
        home_directory: Home directory path for the account.
        snakeoil_pubkey: SSH public key line to attach to the profile.

    Returns:
        A dict with a single entry under ``"loginProfiles"`` holding one
        POSIX account and one SSH public key keyed by its fingerprint.
    """
    snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey)
    # The profile name seems to be a 21 characters long number string, so
    # mimic that in a reproducible way.
    email = gen_email(username)
    # Key expires 10 minutes in the future. Truncate to an integer before
    # stringifying: str() of the float product would yield something like
    # "1726000600123456.8", which is not a whole number of microseconds.
    expiration_usec = str(int((time.time() + 600) * 1000000))
    return {
        "loginProfiles": [
            {
                "name": email,
                "posixAccounts": [
                    {
                        "primary": True,
                        "username": username,
                        "uid": uid,
                        "gid": gid,
                        "homeDirectory": home_directory,
                        "operatingSystemType": "LINUX",
                    }
                ],
                "sshPublicKeys": {
                    snakeoil_pubkey_fingerprint: {
                        "key": snakeoil_pubkey,
                        "expirationTimeUsec": expiration_usec,
                        "fingerprint": snakeoil_pubkey_fingerprint,
                    }
                },
            }
        ]
    }
class ReqHandler(BaseHTTPRequestHandler):
    """HTTP handler serving mock ``/computeMetadata/v1/oslogin/*`` endpoints.

    Only GET is implemented. Three paths are answered (``users``, ``groups``
    and ``authorize``); everything else gets an empty 404.
    """

    def _send_json_ok(self, data: dict):
        """Send a 200 response whose body is ``data`` encoded as JSON.

        The encoded body is also echoed to stderr via ``w`` for debugging.
        """
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        out = json.dumps(data).encode()
        w(out)
        self.wfile.write(out)

    def _send_json_success(self, success=True):
        """Send a 200 response with the body ``{"success": <success>}``."""
        self._send_json_ok({"success": success})

    def _send_404(self):
        """Send a 404 response with no body."""
        self.send_response(404)
        self.end_headers()

    def _handle_users(self, params):
        """Answer the users endpoint: return the profile of a known mock user.

        A user can be selected either by ``username`` or by ``uid``; unknown
        users get a 404.
        """
        # mockuser and mockadmin are allowed to login, both use the same
        # snakeoil public key
        if params.get('username') == [MOCKUSER] or params.get('uid') == ["1009719690"]:
            username, uid = MOCKUSER, "1009719690"
        elif params.get('username') == [MOCKADMIN] or params.get('uid') == ["1009719691"]:
            username, uid = MOCKADMIN, "1009719691"
        else:
            self._send_404()
            return

        self._send_json_ok(gen_mockuser(
            username=username,
            uid=uid,
            gid=uid,
            home_directory=f"/home/{username}",
            snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY,
        ))

    def _handle_groups(self):
        """Answer the groups endpoint with a single fixed group."""
        # we need to provide something at the groups endpoint.
        # the nss module does segfault if we don't.
        self._send_json_ok({
            "posixGroups": [
                {"name": "demo", "gid": 4294967295}
            ]
        })

    def _handle_authorize(self, params):
        """Answer the authorize endpoint for the ``login``/``adminLogin`` policies.

        Other policies get a 404.
        """
        policy = params.get("policy")
        # Use .get() so a request without an ``email`` parameter is answered
        # with ``success: false`` instead of raising KeyError in the handler.
        email = params.get("email")

        if policy == ["login"]:
            # is user allowed to login? mockuser and mockadmin are.
            allowed = email == [gen_email(MOCKUSER)] or email == [gen_email(MOCKADMIN)]
            self._send_json_success(allowed)
        elif policy == ["adminLogin"]:
            # is user allowed to become root? only mockadmin is.
            self._send_json_success(email == [gen_email(MOCKADMIN)])
        else:
            self._send_404()

    def do_GET(self):
        """Dispatch a GET request to the handler for its URL path."""
        raw_path = str(self.path)
        url = urlparse(raw_path)
        # parse_qs maps each parameter name to a *list* of values, which is
        # why the handlers compare against single-element lists.
        params = parse_qs(url.query)

        if url.path == "/computeMetadata/v1/oslogin/users":
            self._handle_users(params)
        elif url.path == "/computeMetadata/v1/oslogin/groups":
            self._handle_groups()
        elif url.path == "/computeMetadata/v1/oslogin/authorize":
            self._handle_authorize(params)
        else:
            sys.stderr.write(f"Unhandled path: {raw_path}\n")
            sys.stderr.flush()
            self._send_404()
if __name__ == '__main__':
    # Listen on all interfaces, port 80. Using the server as a context
    # manager guarantees the listening socket is closed when serve_forever()
    # exits, e.g. on KeyboardInterrupt.
    with HTTPServer(('0.0.0.0', 80), ReqHandler) as server:
        server.serve_forever()