/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
5 #include "cmogstored.h"
9 * On FreeBSD, this disables TCP_NOPUSH momentarily to flush out corked data.
10 * This immediately recorks again if we're handling persistent connections,
11 * otherwise we leave it uncorked if we're going to close the socket anyways
13 static void tcp_push(struct mog_fd
*mfd
, bool recork
)
15 socklen_t len
= (socklen_t
)sizeof(int);
19 if (MOG_TCP_NOPUSH
== 0)
22 rv
= setsockopt(mfd
->fd
, IPPROTO_TCP
, MOG_TCP_NOPUSH
, &val
, len
);
24 if (rv
== 0 && recork
) {
26 setsockopt(mfd
->fd
, IPPROTO_TCP
, MOG_TCP_NOPUSH
, &val
, len
);
28 /* deal with errors elsewhere */
31 /* stash any pipelined data for the next round */
33 http_defer_rbuf(struct mog_http
*http
, struct mog_rbuf
*rbuf
, size_t buf_len
)
35 struct mog_rbuf
*old
= http
->rbuf
;
36 size_t defer_bytes
= buf_len
- http
->_p
.buf_off
;
37 char *src
= rbuf
->rptr
+ http
->_p
.buf_off
;
39 if (http
->_p
.skip_rbuf_defer
) {
40 http
->_p
.skip_rbuf_defer
= 0;
44 assert(http
->_p
.buf_off
>= 0 && "http->_p.buf_off negative");
45 assert(defer_bytes
<= MOG_RBUF_MAX_SIZE
&& "defer bytes overflow");
47 if (defer_bytes
== 0) {
48 mog_rbuf_reattach_and_null(&http
->rbuf
);
49 } else if (old
) { /* no allocation needed, reuse existing */
50 assert(old
== rbuf
&& "http->rbuf not reused properly");
51 memmove(old
->rptr
, src
, defer_bytes
);
52 old
->rsize
= defer_bytes
;
54 http
->rbuf
= mog_rbuf_new(defer_bytes
);
55 memcpy(http
->rbuf
->rptr
, src
, defer_bytes
);
56 http
->rbuf
->rsize
= defer_bytes
;
62 http_process_client(struct mog_fd
*mfd
, struct mog_rbuf
*rbuf
,
63 char *buf
, size_t buf_len
)
65 struct mog_http
*http
= &mfd
->as
.http
;
68 assert(http
->wbuf
== NULL
&&
69 "processing client with buffered data");
71 dev
= mog_dev_for(http
->svc
, http
->_p
.mog_devid
, false);
72 if (dev
&& !mog_ioq_ready(&dev
->ioq
, mfd
)) {
74 http
->rbuf
= mog_rbuf_detach(rbuf
);
75 http
->rbuf
->rsize
= buf_len
;
79 switch (http
->_p
.http_method
) {
80 case MOG_HTTP_METHOD_NONE
: assert(0 && "BUG: unset HTTP method");
81 case MOG_HTTP_METHOD_GET
: mog_http_get_open(mfd
, buf
); break;
82 case MOG_HTTP_METHOD_HEAD
: mog_http_get_open(mfd
, buf
); break;
83 case MOG_HTTP_METHOD_DELETE
: mog_http_delete(mfd
, buf
); break;
84 case MOG_HTTP_METHOD_MKCOL
: mog_http_mkcol(mfd
, buf
); break;
85 case MOG_HTTP_METHOD_PUT
: mog_http_put(mfd
, buf
, buf_len
); break;
91 MOG_NOINLINE
static void http_close(struct mog_fd
*mfd
)
93 struct mog_http
*http
= &mfd
->as
.http
;
95 mog_rbuf_free(http
->rbuf
);
96 assert((http
->wbuf
== NULL
|| http
->wbuf
== MOG_WR_ERROR
) &&
97 "would leak http->wbuf on close");
100 * uncork to avoid ECONNCRESET if we have unread data
101 * (but already wrote a response). This can help get
102 * the proper error sent to the client if the client is
103 * writing a request that's too big to read and we reset
104 * their connection to save ourselves bandwidth/cycles
106 tcp_push(mfd
, false);
107 mog_packaddr_free(&http
->mpa
);
112 void mog_http_unlink_ftmp(struct mog_http
*http
)
114 struct mog_file
*file
= &http
->forward
->as
.file
;
119 if (mog_unlink(http
->svc
, file
->tmppath
) != 0)
120 syslog(LOG_ERR
, "Failed to unlink %s (in %s): %m",
121 file
->tmppath
, http
->svc
->docroot
);
125 * called only if epoll/kevent is out-of-space
126 * Note: it's impossible for this to be called while this mfd is
127 * inside an ioq SIMPLEQ, however mfd->ioq_blocked /may/ be true when
128 * this function is called. We could add validation code to ensure
129 * this remains true, but that would increase the size of "struct mog_fd",
130 * so we will rely on this comment instead.
132 void mog_http_drop(struct mog_fd
*mfd
)
134 struct mog_http
*http
= &mfd
->as
.http
;
136 assert(http
->forward
!= MOG_IOSTAT
);
138 mog_http_unlink_ftmp(http
);
139 mog_file_close(http
->forward
);
144 /* returns true if we can continue queue step, false if not */
145 static enum mog_next
http_wbuf_in_progress(struct mog_fd
*mfd
)
147 struct mog_http
*http
= &mfd
->as
.http
;
149 assert(http
->wbuf
!= MOG_WR_ERROR
&& "still active after write error");
150 switch (mog_tryflush(mfd
->fd
, &http
->wbuf
)) {
151 case MOG_WRSTATE_ERR
:
152 return MOG_NEXT_CLOSE
;
153 case MOG_WRSTATE_DONE
:
154 if (!http
->_p
.persistent
) return MOG_NEXT_CLOSE
;
155 if (http
->forward
== NULL
)
157 assert(http
->_p
.buf_off
== 0 && "bad offset");
158 return MOG_NEXT_ACTIVE
;
159 case MOG_WRSTATE_BUSY
:
160 /* unlikely, we never put anything big in wbuf */
161 return MOG_NEXT_WAIT_WR
;
163 assert(0 && "compiler bug?");
164 return MOG_NEXT_CLOSE
;
167 static enum mog_next
http_forward_in_progress(struct mog_fd
*mfd
, bool needq
)
169 struct mog_http
*http
= &mfd
->as
.http
;
170 enum mog_http_method method
= http
->_p
.http_method
;
173 struct mog_file
*file
= &http
->forward
->as
.file
;
175 if (file
->ioq
&& !mog_ioq_ready(file
->ioq
, mfd
))
176 /* no need to setup/stash rbuf, it's been done */
177 return MOG_NEXT_IGNORE
;
180 if (method
== MOG_HTTP_METHOD_GET
)
181 return mog_http_get_in_progress(mfd
);
183 assert(method
== MOG_HTTP_METHOD_PUT
&& "bad http_method for forward");
185 return mog_http_put_in_progress(mfd
);
188 static enum mog_next
http_run(struct mog_fd
*mfd
, struct mog_rbuf
*rbuf
,
189 char *buf
, size_t buf_len
)
191 struct mog_http
*http
= &mfd
->as
.http
;
193 if (!http_process_client(mfd
, rbuf
, buf
, buf_len
))
194 return MOG_NEXT_IGNORE
; /* in ioq */
195 if (http
->wbuf
== MOG_WR_ERROR
)
196 return MOG_NEXT_CLOSE
;
198 http_defer_rbuf(http
, rbuf
, buf_len
);
199 return MOG_NEXT_WAIT_WR
;
200 } else if (http
->forward
) {
201 http_defer_rbuf(http
, rbuf
, buf_len
);
202 return http_forward_in_progress(mfd
, false);
203 } else if (!http
->_p
.persistent
) {
204 return MOG_NEXT_CLOSE
;
206 /* pipelined request */
208 TRACE(CMOGSTORED_HTTP_REQ_BEGIN(mfd
->fd
, true));
210 http_defer_rbuf(http
, rbuf
, buf_len
);
213 return MOG_NEXT_ACTIVE
;
216 static MOG_NOINLINE
enum mog_next
217 http_client_died(struct mog_fd
*mfd
, size_t buf_len
, int save_err
)
221 /* TODO: support nameinfo */
222 TRACE(CMOGSTORED_HTTP_RDERR(mfd
->fd
, buf_len
, save_err
));
227 return MOG_NEXT_CLOSE
;
228 /* these errors are too common to log, normally */
231 mog_nameinfo(&mfd
->as
.http
.mpa
, &ni
);
233 syslog(LOG_NOTICE
, "http client died: %m (%s%s)",
234 ni
.ni_host
, ni
.ni_serv
);
235 return MOG_NEXT_CLOSE
;
239 http_rbuf_grow(struct mog_fd
*mfd
, struct mog_rbuf
**rbuf
, size_t buf_len
)
241 struct mog_http
*http
= &mfd
->as
.http
;
243 TRACE(CMOGSTORED_HTTP_RBUF_GROW(mfd
->fd
, buf_len
));
244 (*rbuf
)->rsize
= buf_len
;
245 http
->rbuf
= *rbuf
= mog_rbuf_grow(*rbuf
);
246 return *rbuf
? (*rbuf
)->rptr
: NULL
;
249 MOG_NOINLINE
static bool
250 http_parse_continue(struct mog_fd
*mfd
, struct mog_rbuf
**rbuf
,
251 char **buf
, size_t buf_len
, uint32_t *off
)
253 struct mog_http
*http
= &mfd
->as
.http
;
255 TRACE(CMOGSTORED_HTTP_PARSE_CONTINUE(mfd
->fd
, buf_len
));
256 assert(http
->wbuf
== NULL
&&
257 "tried to write (and failed) with partial req");
258 if (http
->_p
.buf_off
>= (*rbuf
)->rcapa
) {
259 *buf
= http_rbuf_grow(mfd
, rbuf
, buf_len
);
264 *off
= http
->_p
.buf_off
;
268 static enum mog_next
__http_queue_step(struct mog_fd
*mfd
)
270 struct mog_http
*http
= &mfd
->as
.http
;
271 struct mog_rbuf
*rbuf
;
276 enum mog_parser_state state
;
278 assert(mfd
->fd
>= 0 && "http fd is invalid");
280 if (http
->wbuf
) return http_wbuf_in_progress(mfd
);
281 if (http
->forward
) return http_forward_in_progress(mfd
, true);
283 /* we may have pipelined data in http->rbuf */
284 rbuf
= http
->rbuf
? http
->rbuf
: mog_rbuf_get(MOG_RBUF_BASE_SIZE
);
286 off
= http
->_p
.buf_off
;
287 assert(off
>= 0 && "offset is negative");
288 assert(off
<= rbuf
->rcapa
&& "offset is too big");
291 buf_len
= http
->rbuf
->rsize
;
292 if (mog_ioq_unblock(mfd
))
293 return http_run(mfd
, rbuf
, buf
, buf_len
);
294 /* request got pipelined, resuming now */
295 assert(off
< rbuf
->rcapa
&& "offset is too big");
296 assert(http
->_p
.buf_off
<= buf_len
297 && "bad offset from pipelining");
298 assert(buf_len
<= http
->rbuf
->rcapa
&& "bad rsize stashed");
299 if (http
->_p
.buf_off
< buf_len
)
302 assert(off
< rbuf
->rcapa
&& "offset is too big");
304 r
= read(mfd
->fd
, buf
+ off
, rbuf
->rcapa
- off
);
307 TRACE(CMOGSTORED_HTTP_REQ_BEGIN(mfd
->fd
, false));
311 state
= mog_http_parse(http
, buf
, buf_len
);
314 case MOG_PARSER_ERROR
:
316 case MOG_PARSER_CONTINUE
:
317 if (http_parse_continue(mfd
, &rbuf
, &buf
, buf_len
,
321 case MOG_PARSER_DONE
:
322 return http_run(mfd
, rbuf
, buf
, buf_len
);
324 } else if (r
== 0) { /* client shut down */
325 TRACE(CMOGSTORED_HTTP_CLIENT_CLOSE(mfd
->fd
, buf_len
));
326 return MOG_NEXT_CLOSE
;
331 if (http
->rbuf
== NULL
)
332 http
->rbuf
= mog_rbuf_detach(rbuf
);
333 http
->rbuf
->rsize
= buf_len
;
335 return MOG_NEXT_WAIT_RD
;
336 case EINTR
: goto reread
;
338 return http_client_died(mfd
, buf_len
, errno
);
342 assert(0 && "compiler bug?");
345 if (errno
== ERANGE
) {
346 mog_http_resp(mfd
, "507 Insufficient Storage", false);
349 mog_http_resp(mfd
, "400 Bad Request", false);
351 return MOG_NEXT_CLOSE
;
354 static enum mog_next
http_queue_step(struct mog_fd
*mfd
)
356 enum mog_next next
= __http_queue_step(mfd
);
358 /* enqueue any pending waiters before we become enqueued ourselves */
364 enum mog_next
mog_http_queue_step(struct mog_fd
*mfd
)
366 enum mog_next rv
= http_queue_step(mfd
);
368 if (rv
== MOG_NEXT_CLOSE
)
373 /* called during graceful shutdown instead of mog_http_queue_step */
374 void mog_http_quit_step(struct mog_fd
*mfd
)
376 struct mog_http
*http
= &mfd
->as
.http
;
377 struct mog_queue
*q
= http
->svc
->queue
;
379 /* centralize all queue transitions here: */
380 switch (http_queue_step(mfd
)) {
381 case MOG_NEXT_WAIT_RD
:
382 if (http
->forward
|| http
->rbuf
) {
383 mog_idleq_push(q
, mfd
, MOG_QEV_RD
);
388 mog_nr_active_at_quit
--;
391 case MOG_NEXT_ACTIVE
: mog_activeq_push(q
, mfd
); return;
392 case MOG_NEXT_WAIT_WR
: mog_idleq_push(q
, mfd
, MOG_QEV_WR
); return;
393 case MOG_NEXT_IGNORE
:
398 /* stringify the address for tracers */
399 static MOG_NOINLINE
void
400 trace_http_accepted(struct mog_fd
*mfd
, const char *listen_addr
)
402 #ifdef HAVE_SYSTEMTAP
403 struct mog_packaddr
*mpa
= &mfd
->as
.http
.mpa
;
406 mog_nameinfo(mpa
, &ni
);
407 TRACE(CMOGSTORED_HTTP_ACCEPTED(mfd
->fd
, ni
.ni_host
, ni
.ni_serv
,
409 #endif /* !HAVE_SYSTEMTAP */
412 static void http_post_accept_common(struct mog_fd
*mfd
, struct mog_accept
*ac
,
413 union mog_sockaddr
*msa
, socklen_t salen
)
415 struct mog_http
*http
= &mfd
->as
.http
;
417 mog_http_init(http
, ac
->svc
);
418 mog_packaddr_init(&http
->mpa
, msa
, salen
);
420 if (TRACE_ENABLED(CMOGSTORED_HTTP_ACCEPTED
))
421 trace_http_accepted(mfd
, ac
->addrinfo
->orig
);
423 mog_idleq_add(ac
->svc
->queue
, mfd
, MOG_QEV_RD
);
426 /* called immediately after accept(), this initializes the mfd (once) */
427 void mog_http_post_accept(int fd
, struct mog_accept
*ac
,
428 union mog_sockaddr
*msa
, socklen_t salen
)
430 struct mog_fd
*mfd
= mog_fd_init(fd
, MOG_FD_TYPE_HTTP
);
432 http_post_accept_common(mfd
, ac
, msa
, salen
);
435 /* called immediately after accept(), this initializes the mfd (once) */
436 void mog_httpget_post_accept(int fd
, struct mog_accept
*ac
,
437 union mog_sockaddr
*msa
, socklen_t salen
)
439 struct mog_fd
*mfd
= mog_fd_init(fd
, MOG_FD_TYPE_HTTPGET
);
441 http_post_accept_common(mfd
, ac
, msa
, salen
);
445 * returns a NUL-terminated HTTP path from the rbuf pointer
446 * returns NUL if we got a bad path.
448 char *mog_http_path(struct mog_http
*http
, char *buf
)
450 if (http
->_p
.usage_txt
) {
454 char *path
= buf
+ http
->_p
.path_tip
;
455 size_t len
= http
->_p
.path_end
- http
->_p
.path_tip
;
457 assert(http
->_p
.path_end
> http
->_p
.path_tip
458 && "bad HTTP path from parser");
460 if (! mog_valid_path(path
, len
))
463 if (http
->_p
.http_method
== MOG_HTTP_METHOD_PUT
) {
464 if (!mog_valid_put_path(path
, len
)) {
476 /* TODO: see if the iovec overheads of writev() is even worth it... */
477 void mog_http_resp0(struct mog_fd
*mfd
, struct iovec
*status
, bool alive
)
481 char *dst
= iov
.iov_base
= mog_fsbuf_get(&iov
.iov_len
);
482 struct mog_http
*http
= &mfd
->as
.http
;
484 assert(status
->iov_len
* 2 + 1024 < iov
.iov_len
&& "fsbuf too small");
485 assert(status
->iov_len
> 3 && "HTTP response status too short");
487 #define CPY(str) mempcpy(dst, (str),(sizeof(str)-1))
488 dst
= CPY("HTTP/1.1 ");
489 dst
= mempcpy(dst
, status
->iov_base
, status
->iov_len
);
492 * putting this here avoids systemtap faults, as status->iov_base
493 * is already hot in cache from the above mempcpy
495 TRACE(CMOGSTORED_HTTP_RES_START(mfd
->fd
, status
->iov_base
));
497 dst
= CPY("\r\nDate: ");
499 dst
= mempcpy(dst
, now
->httpdate
, sizeof(now
->httpdate
)-1);
500 if (alive
&& http
->_p
.persistent
) {
501 dst
= CPY("\r\nContent-Length: 0"
502 "\r\nContent-Type: text/plain"
503 "\r\nConnection: keep-alive\r\n\r\n");
505 http
->_p
.persistent
= 0;
506 dst
= CPY("\r\nContent-Length: 0"
507 "\r\nContent-Type: text/plain"
508 "\r\nConnection: close\r\n\r\n");
510 iov
.iov_len
= dst
- (char *)iov
.iov_base
;
511 assert(http
->wbuf
== NULL
&& "tried to write with wbuf");
513 http
->wbuf
= mog_trywritev(mfd
->fd
, &iov
, 1);
516 /* call whenever we're ready to read the next HTTP request */
517 void mog_http_reset(struct mog_fd
*mfd
)
519 TRACE(CMOGSTORED_HTTP_RES_DONE(mfd
->fd
));
521 mog_http_reset_parser(&mfd
->as
.http
);