9 from http
.server
import BaseHTTPRequestHandler
, HTTPServer
10 from urllib
.parse
import urlparse
, parse_qs
11 from typing
import Dict
# Public key handed to gen_mockuser for every mock account. It is read from
# the environment at import time, so a missing variable raises KeyError.
SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY']
# Username that is allowed to log in.
MOCKUSER="mockuser_nixos_org"
# Username that is allowed to log in and additionally passes the
# "adminLogin" policy check.
MOCKADMIN="mockadmin_nixos_org"
19 sys
.stderr
.write(f
"{msg}\n")
def gen_fingerprint(pubkey: str):
    """Return the hex SHA-256 digest of the key material inside *pubkey*.

    The string is split on whitespace and its second field is base64-decoded
    before hashing (presumably the OpenSSH "<type> <base64> [comment]"
    layout -- confirm against the caller).

    Raises IndexError when *pubkey* has fewer than two whitespace-separated
    fields, and whatever ``str.encode("ascii")`` / ``base64.b64decode`` raise
    for non-ASCII or malformed input.
    """
    fields = pubkey.encode("ascii").split()
    key_blob = base64.b64decode(fields[1])
    digest = hashlib.sha256(key_blob)
    return digest.hexdigest()
def gen_email(username: str):
    """Derive a reproducible numeric string from *username*.

    The real value seems to be a 21 characters long number string, so this
    mimics it: the SHA-256 digest of the username is read as one big integer
    and the first 21 characters of its decimal representation are returned.
    """
    digest = hashlib.sha256(username.encode()).digest()
    # Big-endian interpretation of the raw digest equals int(hexdigest, 16).
    as_decimal = str(int.from_bytes(digest, "big"))
    return as_decimal[:21]
33 def gen_mockuser(username
: str, uid
: str, gid
: str, home_directory
: str, snakeoil_pubkey
: str) -> Dict
:
34 snakeoil_pubkey_fingerprint
= gen_fingerprint(snakeoil_pubkey
)
35 # seems to be a 21 characters long numberstring, so mimic that in a reproducible way
36 email
= gen_email(username
)
47 "homeDirectory": home_directory
,
48 "operatingSystemType": "LINUX"
52 snakeoil_pubkey_fingerprint
: {
53 "key": snakeoil_pubkey
,
54 "expirationTimeUsec": str((time
.time() + 600) * 1000000), # 10 minutes in the future
55 "fingerprint": snakeoil_pubkey_fingerprint
63 class ReqHandler(BaseHTTPRequestHandler
):
65 def _send_json_ok(self
, data
: dict):
66 self
.send_response(200)
67 self
.send_header('Content-type', 'application/json')
69 out
= json
.dumps(data
).encode()
73 def _send_json_success(self
, success
=True):
74 self
.send_response(200)
75 self
.send_header('Content-type', 'application/json')
77 out
= json
.dumps({"success": success
}).encode()
82 self
.send_response(404)
88 params
= parse_qs(pu
.query
)
91 if pu
.path
== "/computeMetadata/v1/oslogin/users":
92 # mockuser and mockadmin are allowed to login, both use the same snakeoil public key
93 if params
.get('username') == [MOCKUSER
] or params
.get('uid') == ["1009719690"]:
96 elif params
.get('username') == [MOCKADMIN
] or params
.get('uid') == ["1009719691"]:
103 self
._send
_json
_ok
(gen_mockuser(username
=username
, uid
=uid
, gid
=uid
, home_directory
=f
"/home/{username}", snakeoil_pubkey
=SNAKEOIL_PUBLIC_KEY
))
106 # we need to provide something at the groups endpoint.
107 # the nss module does segfault if we don't.
108 elif pu
.path
== "/computeMetadata/v1/oslogin/groups":
111 {"name" : "demo", "gid" : 4294967295}
117 elif pu
.path
== "/computeMetadata/v1/oslogin/authorize":
118 # is user allowed to login?
119 if params
.get("policy") == ["login"]:
120 # mockuser and mockadmin are allowed to login
121 if params
.get('email') == [gen_email(MOCKUSER
)] or params
.get('email') == [gen_email(MOCKADMIN
)]:
122 self
._send
_json
_success
()
124 self
._send
_json
_success
(False)
126 # is user allowed to become root?
127 elif params
.get("policy") == ["adminLogin"]:
128 # only mockadmin is allowed to become admin
129 self
._send
_json
_success
((params
['email'] == [gen_email(MOCKADMIN
)]))
131 # send 404 for other policies
136 sys
.stderr
.write(f
"Unhandled path: {p}\n")
138 self
.send_response(404)
140 self
.wfile
.write(b
'')
143 if __name__
== '__main__':
144 s
= HTTPServer(('0.0.0.0', 80), ReqHandler
)