nmdb: Use the appropriate type for storing send() return value
nmdb/tcp.c
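send() returns ssize_t, not int: on LP64 platforms ssize_t is 64 bits wide while int is 32, and the return value can be -1 on error, so keeping the result in a matching ssize_t (as rep_send() and rep_send_error() below do) avoids truncation and preserves the error indication. A minimal sketch of the pattern, with illustrative names that are not part of nmdb itself:

    ssize_t rv = send(fd, buf, len, 0);
    if (rv < 0) {
            /* error; on a non-blocking fd errno may be EAGAIN/EWOULDBLOCK */
    } else if ((size_t) rv < len) {
            /* short send: retry from buf + rv with len - rv bytes left */
    }
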
#include <sys/types.h>		/* socket defines */
#include <sys/socket.h>		/* socket functions */
#include <stdlib.h>		/* malloc() */
#include <stdint.h>		/* uint32_t and friends */
#include <arpa/inet.h>		/* htonl() and friends */
#include <netinet/in.h>		/* INET stuff */
#include <netinet/tcp.h>	/* TCP stuff */
#include <string.h>		/* memcpy() */
#include <unistd.h>		/* fcntl() */
#include <fcntl.h>		/* fcntl() */
#include <errno.h>		/* errno */

/* Workaround for libevent 1.1a: the header assumes u_char is typedef'ed to an
 * unsigned char, and that "struct timeval" is in scope. */
typedef unsigned char u_char;
#include <sys/time.h>
#include <event.h>		/* libevent stuff */

#include "tcp.h"
#include "common.h"
#include "net-const.h"
#include "req.h"
#include "parse.h"
#include "log.h"

/* TCP socket structure. Used mainly to hold buffers from incomplete
 * recv()s. */
struct tcp_socket {
	int fd;
	struct sockaddr_in clisa;
	socklen_t clilen;
	struct event *evt;

	unsigned char *buf;
	size_t pktsize;
	size_t len;
	struct req_info req;
	size_t excess;
};

static void tcp_recv(int fd, short event, void *arg);
static void process_buf(struct tcp_socket *tcpsock,
		unsigned char *buf, size_t len);

static void tcp_reply_mini(const struct req_info *req, uint32_t reply);
static void tcp_reply_err(const struct req_info *req, uint32_t reply);
static void tcp_reply_long(const struct req_info *req, uint32_t reply,
		unsigned char *val, size_t vsize);

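/* The reply callbacks declared above are installed into each request's
 * req_info by init_req(), so the request-processing code can send replies
 * without knowing which transport (TCP, in this case) is in use. */
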
/*
 * Miscellaneous helper functions
 */

static void tcp_socket_free(struct tcp_socket *tcpsock)
{
	if (tcpsock->evt)
		free(tcpsock->evt);
	if (tcpsock->buf)
		free(tcpsock->buf);
	free(tcpsock);
}

static void init_req(struct tcp_socket *tcpsock)
{
	tcpsock->req.fd = tcpsock->fd;
	tcpsock->req.type = REQTYPE_TCP;
	tcpsock->req.clisa = (struct sockaddr *) &tcpsock->clisa;
	tcpsock->req.clilen = tcpsock->clilen;
	tcpsock->req.reply_mini = tcp_reply_mini;
	tcpsock->req.reply_err = tcp_reply_err;
	tcpsock->req.reply_long = tcp_reply_long;
}

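/* Send an error reply in a single send(). This is used both for explicit
 * error replies and as the failure path of rep_send(), so it makes no
 * attempt to recover if the send() itself fails; it only logs the problem. */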
static void rep_send_error(const struct req_info *req, const unsigned int code)
{
	uint32_t l, r, c;
	ssize_t rv;
	unsigned char minibuf[4 * 4];

	if (settings.passive)
		return;

	/* Network format: length (4), ID (4), REP_ERR (4), error code (4) */
	l = htonl(4 + 4 + 4 + 4);
	r = htonl(REP_ERR);
	c = htonl(code);
	memcpy(minibuf, &l, 4);
	memcpy(minibuf + 4, &(req->id), 4);
	memcpy(minibuf + 8, &r, 4);
	memcpy(minibuf + 12, &c, 4);

	/* If this send fails, there's nothing to be done */
	rv = send(req->fd, minibuf, 4 * 4, 0);

	if (rv < 0) {
		errlog("rep_send_error() failed");
	}
}

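/* Send a full reply, looping to handle short sends and EAGAIN on the
 * non-blocking fd. Returns 1 on success (or in passive mode), and 0 if
 * send() fails with an unrecoverable error. */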
static int rep_send(const struct req_info *req, const unsigned char *buf,
		const size_t size)
{
	ssize_t rv, c;

	if (settings.passive)
		return 1;

	c = 0;
	while (c < size) {
		rv = send(req->fd, buf + c, size - c, 0);

		if (rv == size) {
			return 1;
		} else if (rv < 0) {
			if (errno != EAGAIN && errno != EWOULDBLOCK) {
				rep_send_error(req, ERR_SEND);
				return 0;
			} else {
				/* With big packets, the receiver window might
				 * get exhausted and send() would block, but
				 * as the fd is set in non-blocking mode, it
				 * returns EAGAIN. This makes us retry when
				 * send() fails in this way.
				 *
				 * The proper way to fix this would be to add
				 * an event so we get notified when the fd is
				 * available for writing, and retry the send;
				 * but this is complex so leave it for when
				 * it's really needed. */
				continue;
			}
		} else if (rv == 0) {
			return 1;
		}

		c += rv;
	}

	return 1;
}

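/* All replies share the same layout, with every field in network byte
 * order:
 *   4 bytes	total length
 *   4 bytes	request id (copied back verbatim)
 *   4 bytes	reply code
 * and, for replies that carry a value (tcp_reply_long()):
 *   4 bytes	value length
 *   N bytes	value
 */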
/* Send small replies, consisting of only a value. */
static void tcp_reply_mini(const struct req_info *req, uint32_t reply)
{
	/* We use a mini buffer to speed up the small replies and avoid the
	 * malloc() overhead. */
	uint32_t len;
	unsigned char minibuf[12];

	if (settings.passive)
		return;

	len = htonl(12);
	reply = htonl(reply);
	memcpy(minibuf, &len, 4);
	memcpy(minibuf + 4, &(req->id), 4);
	memcpy(minibuf + 8, &reply, 4);
	rep_send(req, minibuf, 12);
	return;
}

static void tcp_reply_err(const struct req_info *req, uint32_t reply)
{
	rep_send_error(req, reply);
}

static void tcp_reply_long(const struct req_info *req, uint32_t reply,
		unsigned char *val, size_t vsize)
{
	if (val == NULL) {
		/* miss */
		tcp_reply_mini(req, reply);
	} else {
		unsigned char *buf;
		size_t bsize;
		uint32_t t;

		reply = htonl(reply);

		/* The reply length is:
		 * 4		total length
		 * 4		id
		 * 4		reply code
		 * 4		vsize
		 * vsize	val
		 */
		bsize = 4 + 4 + 4 + 4 + vsize;
		buf = malloc(bsize);
		if (buf == NULL)
			return;

		t = htonl(bsize);
		memcpy(buf, &t, 4);

		memcpy(buf + 4, &(req->id), 4);
		memcpy(buf + 8, &reply, 4);

		t = htonl(vsize);
		memcpy(buf + 12, &t, 4);
		memcpy(buf + 16, val, vsize);

		rep_send(req, buf, bsize);
		free(buf);
	}
	return;
}

/*
 * Main functions for receiving and parsing
 */

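/* Create the listening TCP socket: bind to settings.tcp_addr:tcp_port,
 * enable SO_REUSEADDR and TCP_NODELAY, and start listening. Returns the
 * listening fd, or -1 on error. */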
int tcp_init(void)
{
	int fd, rv;
	struct sockaddr_in srvsa;
	struct in_addr ia;

	rv = inet_pton(AF_INET, settings.tcp_addr, &ia);
	if (rv <= 0)
		return -1;

	srvsa.sin_family = AF_INET;
	srvsa.sin_addr.s_addr = ia.s_addr;
	srvsa.sin_port = htons(settings.tcp_port);

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0)
		return -1;

	rv = 1;
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rv, sizeof(rv)) < 0) {
		close(fd);
		return -1;
	}

	rv = bind(fd, (struct sockaddr *) &srvsa, sizeof(srvsa));
	if (rv < 0) {
		close(fd);
		return -1;
	}

	rv = listen(fd, 1024);
	if (rv < 0) {
		close(fd);
		return -1;
	}

	/* Disable the Nagle algorithm; as we often handle small amounts of
	 * data, it can make I/O quite slow. */
	rv = 1;
	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &rv, sizeof(rv)) < 0) {
		close(fd);
		return -1;
	}

	return fd;
}

void tcp_close(int fd)
{
	close(fd);
}

/* Called by libevent for each receive event on our listen fd */
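/* Accept the connection, make the new fd non-blocking, and register
 * tcp_recv() as its persistent read handler. The tcp_socket and event are
 * heap-allocated here and freed either on the error paths below or by
 * tcp_recv()'s error_exit path via tcp_socket_free(). */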
void tcp_newconnection(int fd, short event, void *arg)
{
	int newfd;
	struct tcp_socket *tcpsock;
	struct event *new_event;

	tcpsock = malloc(sizeof(struct tcp_socket));
	if (tcpsock == NULL) {
		return;
	}
	tcpsock->clilen = sizeof(tcpsock->clisa);

	new_event = malloc(sizeof(struct event));
	if (new_event == NULL) {
		free(tcpsock);
		return;
	}

	newfd = accept(fd,
			(struct sockaddr *) &(tcpsock->clisa),
			&(tcpsock->clilen));

	if (fcntl(newfd, F_SETFL, O_NONBLOCK) != 0) {
		close(newfd);
		free(new_event);
		free(tcpsock);
		return;
	}

	tcpsock->fd = newfd;
	tcpsock->evt = new_event;
	tcpsock->buf = NULL;
	tcpsock->pktsize = 0;
	tcpsock->len = 0;
	tcpsock->excess = 0;

	event_set(new_event, newfd, EV_READ | EV_PERSIST, tcp_recv,
			(void *) tcpsock);
	event_add(new_event, NULL);

	return;
}

/* Static common buffer to avoid unnecessary allocation in the common case
 * where we get an entire single message on each recv().
 * Allocate a little bit more than the max. message size, which is 64 KB. */
#define SBSIZE (68 * 1024)
static unsigned char static_buf[SBSIZE];

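/* Receive path: a new message is first read into static_buf. If it turns
 * out to be incomplete, process_buf() copies what we have into a temporary
 * per-connection buffer (tcpsock->buf), and subsequent reads append to it
 * until tcpsock->pktsize bytes have arrived. */
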
/* Called by libevent for each receive event */
static void tcp_recv(int fd, short event, void *arg)
{
	int rv;
	struct tcp_socket *tcpsock;

	tcpsock = (struct tcp_socket *) arg;

	if (tcpsock->buf == NULL) {
		/* New incoming message */
		rv = recv(fd, static_buf, SBSIZE, 0);
		if (rv < 0 && errno == EAGAIN) {
			/* We were awoken but have no data to read, so we do
			 * nothing */
			return;
		} else if (rv <= 0) {
			/* Orderly shutdown or error; close the file
			 * descriptor in either case. */
			goto error_exit;
		}

		init_req(tcpsock);
		process_buf(tcpsock, static_buf, rv);

	} else {
		/* We already got a partial message, complete it. */
		size_t maxtoread = tcpsock->pktsize - tcpsock->len;

		rv = recv(fd, tcpsock->buf + tcpsock->len, maxtoread, 0);
		if (rv < 0 && errno == EAGAIN) {
			return;
		} else if (rv <= 0) {
			goto error_exit;
		}

		tcpsock->len += rv;

		process_buf(tcpsock, tcpsock->buf, tcpsock->len);
	}

	return;

error_exit:
	close(fd);
	event_del(tcpsock->evt);
	tcp_socket_free(tcpsock);
	return;
}

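/* Each message starts with a 4-byte total length in network byte order; a
 * length over 64 KB or of 8 bytes or less is rejected by closing the
 * connection. Once a whole message is buffered, everything after the
 * length header is handed to parse_message(). */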
/* Main message unwrapping */
static void process_buf(struct tcp_socket *tcpsock,
		unsigned char *buf, size_t len)
{
	uint32_t totaltoget = 0;

	if (len >= 4) {
		totaltoget = * (uint32_t *) buf;
		totaltoget = ntohl(totaltoget);
		if (totaltoget > (64 * 1024) || totaltoget <= 8) {
			/* Message too big or too small, close the
			 * connection. */
			goto error_exit;
		}
	} else {
		/* If we didn't even read 4 bytes, we try to read 4 first and
		 * then care about the rest. */
		totaltoget = 4;
	}

	if (totaltoget > len) {
		if (tcpsock->buf == NULL) {
			/* The first incomplete recv().
			 * Create a temporary buffer and copy the contents of
			 * our current one (which is static_buf, otherwise
			 * tcpsock->buf wouldn't be NULL) to it. */
			tcpsock->buf = malloc(SBSIZE);
			if (tcpsock->buf == NULL)
				goto error_exit;

			memcpy(tcpsock->buf, buf, len);
			tcpsock->len = len;
			tcpsock->pktsize = totaltoget;

		} else {
			/* We already had an incomplete recv() and this is
			 * just another one. */
			tcpsock->len = len;
			tcpsock->pktsize = totaltoget;
		}
		return;
	}

	if (totaltoget < len) {
		/* Got more than one message in the same recv(); save the
		 * number of excess bytes so we can process them later. */
		tcpsock->excess = len - totaltoget;
		len = totaltoget;
	}

	/* The buffer is complete, parse it as usual. */
	stats.msg_tcp++;
	if (parse_message(&(tcpsock->req), buf + 4, len - 4)) {
		goto exit;
	} else {
		goto error_exit;
	}

exit:
	if (tcpsock->excess) {
		/* If there are buffer leftovers (because there was more than
		 * one message in a recv()), keep the buffer, move the
		 * leftovers to the beginning, adjust the numbers and parse
		 * recursively.
		 * The buffer can be the static one or the one in tcpsock (if
		 * we had a short recv()); we don't care because we know it
		 * will be big enough to hold an entire message anyway. */
		memmove(buf, buf + len, tcpsock->excess);
		tcpsock->len = tcpsock->excess;
		tcpsock->excess = 0;

		/* Build a new req just like when we first recv(). */
		init_req(tcpsock);
		process_buf(tcpsock, buf, tcpsock->len);
		return;
	}

	if (tcpsock->buf) {
		/* We had an incomplete read somewhere along the processing of
		 * this message, and had to malloc() a temporary space. free()
		 * it and reset the associated information. */
		free(tcpsock->buf);
		tcpsock->buf = NULL;
		tcpsock->len = 0;
		tcpsock->pktsize = 0;
		tcpsock->excess = 0;
	}

	return;

error_exit:
	close(tcpsock->fd);
	event_del(tcpsock->evt);
	tcp_socket_free(tcpsock);
	return;
}