10 from typing
import Any
, IO
, Tuple
# Complete, body-less HTTP 200 response. The request handler writes it to the
# client before it processes the payload, and the client reads back
# len(_END_MESSAGE) bytes once it has finished sending.
_END_MESSAGE = (
    b'HTTP/1.1 200 OK\r\n'
    b'Content-Length: 0\r\n'
    b'Connection: close\r\n'
    b'\r\n'
)
15 class _ServerRequestHandler(http
.server
.BaseHTTPRequestHandler
):
def __init__(self, request, client_address, server):
    """Build the handler by passing all three arguments straight through
    to ``http.server.BaseHTTPRequestHandler``; no extra state is added."""
    super().__init__(request, client_address, server)
# Request-handling body: acknowledge the sender right away by writing
# _END_MESSAGE, print the request's Content-Type header, parse the request
# body from rfile with cgi.FieldStorage, unpickle the 'pickle' form field and
# append the resulting object to the owning server's `results`.
19 # noinspection PyPep8Naming
22 # finish this up as soon as possible; the sender is possibly an interactive shell
23 self
.wfile
.write(_END_MESSAGE
)
25 print(self
.headers
.get('Content-Type'))
27 # Hopefully everything is streamed.
# NOTE(review): the cgi module is deprecated since Python 3.11 and removed in
# 3.13 (PEP 594) — confirm which interpreter versions this has to run on.
28 fs
= cgi
.FieldStorage(fp
=self
.rfile
,
31 'REQUEST_METHOD': 'POST'
# NOTE(review): pickle.load() on bytes received over the socket can execute
# arbitrary code; this is only safe if every sender is trusted — confirm.
34 obj
= pickle
.load(fs
['pickle'].file)
36 # Well, a twisted hack. We are running in a single process so it doesn't hurt.
# The handler hands the object back by mutating the server instance directly;
# the two asserts check that the server is a _CustomTCPServer and that it was
# created in this very process.
# NOTE(review): asserts are stripped under `python -O`.
37 assert isinstance(self
.server
, _CustomTCPServer
)
38 assert os
.getpid() == self
.server
.pid
40 self
.server
.results
.append(obj
)
class _CustomTCPServer(socketserver.TCPServer):
    """``TCPServer`` that records its creating process and owns a result queue.

    Attributes set by the constructor:
        pid: id of the process that constructed the server.
        results: initially empty ``collections.deque``.
    """

    def __init__(self, server_address, request_handler_class):
        super().__init__(server_address, request_handler_class)
        # Fresh, empty queue plus the id of the process that owns it.
        self.results = collections.deque()
        self.pid = os.getpid()

    def handle_timeout(self) -> None:
        """Defer to the stock ``TCPServer`` timeout handling."""
        super().handle_timeout()
56 In case a None is accidentally transmitted.
58 We don't use a bare object() as the sentinel, so that we get a prettier, more meaningful __repr__ without monkey patching. Using __eq__
59 is more foolproof as well.
62 def __eq__(self
, other
):
69 listen_addr
: str = '127.0.0.1',
70 listen_port
: int = 8848):
71 self
._listen
_addr
= listen_addr
72 self
._listen
_port
= listen_port
# Serve a single HTTP request on (self._listen_addr, self._listen_port) with
# _ServerRequestHandler and return the one object the handler stored in the
# server's `results`. The KeyboardInterrupt handler returns a Server.NoData()
# instance instead.
74 def recv(self
) -> Any
:
76 with
_CustomTCPServer((self
._listen
_addr
, self
._listen
_port
), _ServerRequestHandler
) as httpd
:
77 httpd
.handle_request()
79 # We have only one request running.
# NOTE(review): if handle_request() returns without the handler having
# appended anything, this assert fails; under `python -O` it is stripped and
# the indexing below raises IndexError instead. Consider an explicit check.
80 assert len(httpd
.results
) == 1
81 return httpd
.results
[0]
82 except KeyboardInterrupt:
85 return Server
.NoData()
89 _USER_AGENT
= 'MaLiang/0.0.1'
93 server_addr
: str = '127.0.0.1',
94 server_port
: int = 8848):
95 self
._server
_addr
= server_addr
96 self
._server
_port
= server_port
def _socket_send_bytes(self, s: socket.socket, buffer: bytes):
    """Send every byte of ``buffer`` over the connected socket ``s``.

    ``socket.send`` may accept only part of the data it is given, so the
    unsent remainder is re-submitted until nothing is left.

    :param s: connected socket to write to.
    :param buffer: payload; must not be longer than ``self._BUFFER_SIZE``.
    :raises RuntimeError: if the socket accepts zero bytes, which is treated
        as the connection having been closed.
    """
    assert len(buffer) <= self._BUFFER_SIZE

    # A memoryview steps through the unsent tail without copying it. The view
    # is advanced by the size of the *latest* send only (never by a running
    # total), so the offset stays correct however many partial sends occur.
    remaining = memoryview(buffer)
    while remaining:
        curr_sent = s.send(remaining)
        if curr_sent == 0:
            raise RuntimeError('Connection closed')
        remaining = remaining[curr_sent:]
def _socket_discard_bytes(self, s: socket.socket, length: int):
    """Read exactly ``length`` bytes from ``s`` and throw them away.

    ``socket.recv`` may return fewer bytes than requested, so reading
    continues until the full count has been consumed.

    :param s: connected socket to read from.
    :param length: number of bytes to drop; must not exceed
        ``self._BUFFER_SIZE``.
    :raises RuntimeError: if the peer closes the connection before
        ``length`` bytes have arrived.
    """
    assert length <= self._BUFFER_SIZE

    remaining = length
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            # recv() returning b'' means the peer closed the connection.
            raise RuntimeError('Connection closed while discarding bytes')
        remaining -= len(chunk)
def _dump_and_close(obj: Any, file: IO[bytes]):
    """Pickle ``obj`` into ``file`` and always close ``file`` afterwards.

    Closing is what signals end-of-stream to whoever reads the other end of
    ``file`` (for example the read side of a pipe), so it is done in a
    ``finally`` block: even when pickling fails, the reader is not left
    waiting for data that will never arrive.

    :param obj: object to serialize with ``pickle.dump``.
    :param file: writable binary stream; closed on return.
    :raises Exception: whatever ``pickle.dump`` raises is propagated after
        the file has been closed.
    """
    try:
        pickle.dump(obj, file)
    finally:
        file.close()
# Write one multipart/form-data POST request to socket `s` through
# _socket_send_bytes: first the request headers, then the header lines of the
# form part, then data read from `r_file` in reads of at most _BUFFER_SIZE
# bytes, and finally the joined `suffixes`. All text is encoded as iso-8859-1.
129 def _send_streamed(self
, obj
: Any
, boundary
: str,
130 r_file
: IO
[bytes
], s
: socket
.socket
):
132 We use raw sockets since it's pretty much the only option.
133 * It does not depend on 3rd party libraries,
134 * its streaming sending interface is implemented straightforwardly, and
135 * with it we could send an HTTP request without a proper Content-Length easily.
139 mime_type
= 'application/octet-stream'
# Request line and request headers, joined and sent as a single buffer below.
142 headers
= ['POST / HTTP/1.1\r\n',
143 'Accept-Encoding: iso-8859-1\r\n',
# NOTE(review): port and address look swapped here — an HTTP Host header is
# normally "host:port", i.e. {self._server_addr}:{self._server_port}. Confirm
# and fix.
144 f
'Host: {self._server_port}:{self._server_addr}\r\n',
145 f
'User-Agent: {self._USER_AGENT}\r\n',
146 f
'Content-Type: multipart/form-data; boundary={boundary}\r\n',
# NOTE(review): Content-Length is hard-coded to 0 although a body follows; the
# text above describes sending without a proper Content-Length as intentional.
147 'Content-Length: 0\r\n', # TODO
148 'Connection: Close\r\n',
151 self
._socket
_send
_bytes
(s
, ''.join(headers
).encode('iso-8859-1'))
# Header lines of the form part; `name` is assigned outside the lines visible
# here — presumably the form field name the receiver looks up (TODO confirm).
154 prefixes
= [f
'--{boundary}\r\n',
155 f
'Content-Disposition: form-data; name="{name}"; filename="{name}"\r\n',
156 f
'Content-Type: {mime_type}\r\n',
159 self
._socket
_send
_bytes
(s
, ''.join(prefixes
).encode('iso-8859-1'))
# Reads are capped at _BUFFER_SIZE, the largest buffer _socket_send_bytes
# accepts.
163 tmp
= r_file
.read(self
._BUFFER
_SIZE
)
169 self
._socket
_send
_bytes
(s
, tmp
)
173 f
'{boundary}--\r\n\r\n'
175 self
._socket
_send
_bytes
(s
, ''.join(suffixes
).encode('iso-8859-1'))
177 def send(self
, obj
: Any
) -> None:
178 boundary
= secrets
.token_hex(8)
179 with socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
) as s
:
180 s
.connect((self
._server
_addr
, self
._server
_port
))
181 # one of the few ways to disable Nagle's algorithm
182 s
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, True)
185 with
open(rfd
, 'rb') as r_file
, open(wfd
, 'wb') as w_file
:
186 dumper
= threading
.Thread(target
=self
._dump
_and
_close
, args
=(obj
, w_file
))
188 self
._send
_streamed
(obj
, boundary
, r_file
, s
)
189 # Tell the server we've acknowledged the response.
190 s
.recv(len(_END_MESSAGE
))