2 #include <sys/types.h> /* socket defines */
3 #include <sys/socket.h> /* socket functions */
4 #include <stdlib.h> /* malloc() */
5 #include <stdint.h> /* uint32_t and friends */
6 #include <arpa/inet.h> /* htonls() and friends */
7 #include <netinet/in.h> /* INET stuff */
8 #include <netinet/tcp.h> /* TCP stuff */
9 #include <string.h> /* memcpy() */
10 #include <unistd.h> /* fcntl() */
11 #include <fcntl.h> /* fcntl() */
12 #include <errno.h> /* errno */
14 /* Workaround for libevent 1.1a: the header assumes u_char is typedef'ed to an
15 * unsigned char, and that "struct timeval" is in scope. */
16 typedef unsigned char u_char
;
18 #include <event.h> /* libevent stuff */
22 #include "net-const.h"
28 /* TCP socket structure. Used mainly to hold buffers from incomplete
32 struct sockaddr_in clisa
;
/* Forward declarations for the file-local functions defined further down. */

/* Receive path: the libevent read callback and the message unwrapper it
 * calls. */
static void process_buf(struct tcp_socket *tcpsock, unsigned char *buf,
		size_t len);
static void tcp_recv(int fd, short event, void *arg);

/* Reply callbacks installed into each request by init_req(). */
static void tcp_reply_long(const struct req_info *req, uint32_t reply,
		unsigned char *val, size_t vsize);
static void tcp_reply_err(const struct req_info *req, uint32_t reply);
static void tcp_reply_mini(const struct req_info *req, uint32_t reply);
54 * Miscellaneous helper functions
57 static void tcp_socket_free(struct tcp_socket
*tcpsock
)
66 static void init_req(struct tcp_socket
*tcpsock
)
68 tcpsock
->req
.fd
= tcpsock
->fd
;
69 tcpsock
->req
.type
= REQTYPE_TCP
;
70 tcpsock
->req
.clisa
= (struct sockaddr
*) &tcpsock
->clisa
;
71 tcpsock
->req
.clilen
= tcpsock
->clilen
;
72 tcpsock
->req
.reply_mini
= tcp_reply_mini
;
73 tcpsock
->req
.reply_err
= tcp_reply_err
;
74 tcpsock
->req
.reply_long
= tcp_reply_long
;
/* Send an error reply for request `req` over req->fd.
 *
 * The reply is a fixed 16-byte packet assembled in minibuf from four 4-byte
 * fields copied from l, req->id, r and c, and written with a single send().
 *
 * NOTE(review): the declarations of l, r, c and rv, the assignments to r and
 * c, and the condition guarding errlog() are on lines missing from this
 * listing. Presumably r and c carry REP_ERR and `code` in network byte order
 * and errlog() runs only when send() fails -- confirm against the full file. */
77 static void rep_send_error(const struct req_info
*req
, const unsigned int code
)
81 unsigned char minibuf
[4 * 4];
86 /* Network format: length (4), ID (4), REP_ERR (4), error code (4) */
/* The length field is the total packet size (16), in network byte order. */
87 l
= htonl(4 + 4 + 4 + 4);
90 memcpy(minibuf
, &l
, 4);
/* req->id is copied as stored, with no byte-order conversion here. */
91 memcpy(minibuf
+ 4, &(req
->id
), 4);
92 memcpy(minibuf
+ 8, &r
, 4);
93 memcpy(minibuf
+ 12, &c
, 4);
95 /* If this send fails, there's nothing to be done */
96 rv
= send(req
->fd
, minibuf
, 4 * 4, 0);
99 errlog("rep_send_error() failed");
/* Send the reply held in buf to the client of request `req`, using send() on
 * req->fd.
 *
 * Visible behavior: settings.passive is checked first; the data is written
 * starting at offset c for (size - c) bytes; when send() fails with an errno
 * that is neither EAGAIN nor EWOULDBLOCK, an ERR_SEND error reply is issued
 * through rep_send_error(); a would-block condition is meant to be retried
 * (see the comment in the body).
 *
 * NOTE(review): the rest of the parameter list (which declares `size`), the
 * declarations of rv and c, the loop around send(), the statement run in
 * passive mode and every return statement are on lines missing from this
 * listing -- confirm them against the full file. */
104 static int rep_send(const struct req_info
*req
, const unsigned char *buf
,
109 if (settings
.passive
)
114 rv
= send(req
->fd
, buf
+ c
, size
- c
, 0);
/* Treat the failure as fatal only when errno is neither EAGAIN nor
 * EWOULDBLOCK. The two tests must be joined with &&: with ||, on
 * platforms where the two constants have different values the condition
 * is always true, so a would-block would be reported as ERR_SEND and
 * never retried. */
119 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
120 rep_send_error(req
, ERR_SEND
);
123 /* With big packets, the receiver window might
124 * get exhausted and send() would block, but
125 * as the fd is set in non-blocking mode, it
126 * returns EAGAIN. This makes us retry when
127 * send() fails in this way.
129 * The proper way to fix this would be to add
130 * an event so we get notified when the fd is
131 * available for writing, and retry the send;
132 * but this is complex so leave it for when
133 * it's really needed. */
136 } else if (rv
== 0) {
147 /* Send small replies, consisting of only a value. */
/* Build a 12-byte reply in a stack buffer and pass it to rep_send().
 *
 * The packet is made of three 4-byte fields copied from len, req->id and
 * `reply` (the latter converted with htonl()).
 *
 * NOTE(review): the declaration and value of len, and the statement
 * controlled by `if (settings.passive)`, are on lines missing from this
 * listing; presumably len holds the packet size (12) in network byte order
 * and passive mode returns without sending -- confirm. */
148 static void tcp_reply_mini(const struct req_info
*req
, uint32_t reply
)
150 /* We use a mini buffer to speed up the small replies, to avoid the
151 * malloc() overhead. */
153 unsigned char minibuf
[12];
155 if (settings
.passive
)
/* The parameter is reused to hold the network-byte-order value. */
159 reply
= htonl(reply
);
160 memcpy(minibuf
, &len
, 4);
/* req->id is copied as stored, with no byte-order conversion here. */
161 memcpy(minibuf
+ 4, &(req
->id
), 4);
162 memcpy(minibuf
+ 8, &reply
, 4);
/* The return value of rep_send() is not checked. */
163 rep_send(req
, minibuf
, 12);
/* Error reply callback for TCP requests: forwards the error code carried in
 * `reply` to rep_send_error() for the same request. */
static void tcp_reply_err(const struct req_info *req, uint32_t reply)
{
	rep_send_error(req, reply);
}
/* Send a reply that carries a value of vsize bytes in addition to the reply
 * code.
 *
 * Visible behavior: tcp_reply_mini(req, reply) is called on one path; on the
 * other, `reply` is converted with htonl(), the packet size is computed as
 * 16 + vsize, and the packet is filled with req->id at offset 4, reply at
 * offset 8, t at offset 12 and the vsize bytes of val at offset 16 before
 * being handed to rep_send().
 *
 * NOTE(review): the condition selecting the tcp_reply_mini() path, the
 * allocation and release of buf, the write of the first 4 bytes and the
 * value of t are on lines missing from this listing; presumably the first
 * field is the total length and t is vsize in network byte order --
 * confirm against the full file. */
173 static void tcp_reply_long(const struct req_info
*req
, uint32_t reply
,
174 unsigned char *val
, size_t vsize
)
178 tcp_reply_mini(req
, reply
);
/* The parameter is reused to hold the network-byte-order value. */
184 reply
= htonl(reply
);
186 /* The reply length is:
193 bsize
= 4 + 4 + 4 + 4 + vsize
;
199 memcpy(buf
+ 4, &(req
->id
), 4);
200 memcpy(buf
+ 8, &reply
, 4);
203 memcpy(buf
+ 12, &t
, 4);
204 memcpy(buf
+ 16, val
, vsize
);
206 rep_send(req
, buf
, bsize
);
215 * Main functions for receiving and parsing
221 struct sockaddr_in srvsa
;
224 rv
= inet_pton(AF_INET
, settings
.tcp_addr
, &ia
);
228 srvsa
.sin_family
= AF_INET
;
229 srvsa
.sin_addr
.s_addr
= ia
.s_addr
;
230 srvsa
.sin_port
= htons(settings
.tcp_port
);
233 fd
= socket(AF_INET
, SOCK_STREAM
, 0);
238 if (setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &rv
, sizeof(rv
)) < 0 ) {
243 rv
= bind(fd
, (struct sockaddr
*) &srvsa
, sizeof(srvsa
));
249 rv
= listen(fd
, 1024);
255 /* Disable Nagle's algorithm: as we often handle small amounts of data,
256 * it can make I/O quite slow. */
258 if (setsockopt(fd
, IPPROTO_TCP
, TCP_NODELAY
, &rv
, sizeof(rv
)) < 0 ) {
267 void tcp_close(int fd
)
273 /* Called by libevent for each receive event on our listen fd */
/* libevent callback for the listening socket: set up the per-connection
 * state for a new client.
 *
 * Visible behavior: a struct tcp_socket and a struct event are allocated
 * with malloc() (each checked against NULL), the client address length is
 * initialised, the new descriptor newfd is switched to non-blocking mode,
 * and a persistent read event calling tcp_recv() is registered for it.
 *
 * NOTE(review): the declaration of newfd, the call that produces it (only
 * its address argument is visible; presumably accept() on fd), the bodies of
 * the error branches and the remaining field initialisations are on lines
 * missing from this listing -- confirm against the full file. */
274 void tcp_newconnection(int fd
, short event
, void *arg
)
277 struct tcp_socket
*tcpsock
;
278 struct event
*new_event
;
280 tcpsock
= malloc(sizeof(struct tcp_socket
));
281 if (tcpsock
== NULL
) {
/* Size of the address buffer handed to the call below. */
284 tcpsock
->clilen
= sizeof(tcpsock
->clisa
);
286 new_event
= malloc(sizeof(struct event
));
287 if (new_event
== NULL
) {
293 (struct sockaddr
*) &(tcpsock
->clisa
),
/* F_SETFL is given O_NONBLOCK alone, so the file status flags are
 * replaced rather than OR-ed with the current ones. */
296 if (fcntl(newfd
, F_SETFL
, O_NONBLOCK
) != 0) {
/* Keep the event in the socket structure so it can be deleted later. */
304 tcpsock
->evt
= new_event
;
306 tcpsock
->pktsize
= 0;
/* EV_PERSIST keeps the read event registered after each callback. */
310 event_set(new_event
, newfd
, EV_READ
| EV_PERSIST
, tcp_recv
,
/* NULL timeout: wait for readability indefinitely. */
312 event_add(new_event
, NULL
);
/* Shared receive buffer. In the common case an entire single message arrives
 * in one recv(), so reading into this static area avoids an allocation per
 * message. It is sized a little above the maximum message size (64kb). */
#define SBSIZE (68 * 1024)
static unsigned char static_buf[SBSIZE];
324 /* Called by libevent for each receive event */
/* libevent read callback for a client connection; `arg` is the connection's
 * struct tcp_socket.
 *
 * Visible behavior:
 *  - with no pending partial message (tcpsock->buf == NULL) up to SBSIZE
 *    bytes are read into static_buf and passed to process_buf();
 *  - otherwise only the bytes still missing from the current message
 *    (pktsize - len) are read, appended to tcpsock->buf, and the accumulated
 *    buffer is passed to process_buf();
 *  - in both cases a negative recv() result with errno == EAGAIN is handled
 *    separately from the other rv <= 0 results (shutdown or error);
 *  - the function ends with code that deletes the event and frees the socket
 *    state.
 *
 * NOTE(review): the declaration of rv, the bodies of the EAGAIN and rv <= 0
 * branches, the update of tcpsock->len after the second recv() and the way
 * control reaches the final cleanup are on lines missing from this listing
 * -- confirm against the full file. */
325 static void tcp_recv(int fd
, short event
, void *arg
)
328 struct tcp_socket
*tcpsock
;
330 tcpsock
= (struct tcp_socket
*) arg
;
332 if (tcpsock
->buf
== NULL
) {
333 /* New incoming message */
334 rv
= recv(fd
, static_buf
, SBSIZE
, 0);
335 if (rv
< 0 && errno
== EAGAIN
) {
336 /* We were awoken but have no data to read, so we do
339 } else if (rv
<= 0) {
340 /* Orderly shutdown or error; close the file
341 * descriptor in either case. */
/* rv > 0 here: hand over exactly the bytes just received. */
346 process_buf(tcpsock
, static_buf
, rv
);
349 /* We already got a partial message, complete it. */
/* Never read past the end of the message being completed. */
350 size_t maxtoread
= tcpsock
->pktsize
- tcpsock
->len
;
352 rv
= recv(fd
, tcpsock
->buf
+ tcpsock
->len
, maxtoread
, 0);
353 if (rv
< 0 && errno
== EAGAIN
) {
355 } else if (rv
<= 0) {
361 process_buf(tcpsock
,tcpsock
->buf
, tcpsock
->len
);
/* Teardown: unregister the read event, then release the socket state. */
368 event_del(tcpsock
->evt
);
369 tcp_socket_free(tcpsock
);
374 /* Main message unwrapping */
/* Unwrap the message(s) held in buf (len bytes valid) for connection
 * tcpsock.
 *
 * Visible behavior:
 *  - the first 4 bytes are a length in network byte order (totaltoget);
 *    values above 64 KiB or not greater than 8 are rejected;
 *  - if fewer than totaltoget bytes are available the data is kept for a
 *    later recv(): on the first short read a SBSIZE heap buffer is allocated
 *    into tcpsock->buf and the bytes are copied there; pktsize records the
 *    expected total;
 *  - if more than totaltoget bytes are available, the surplus count is saved
 *    in tcpsock->excess;
 *  - a complete message is passed to parse_message() without its 4-byte
 *    length prefix;
 *  - leftovers are moved to the start of buf and processed recursively;
 *  - the function ends with code that deletes the event and frees the socket
 *    state.
 *
 * NOTE(review): this listing is missing lines (braces, the length guard
 * around the header read, updates of tcpsock->len and len, the handling of
 * parse_message()'s result, and the paths leading to the final cleanup), so
 * the exact control flow between these steps must be confirmed against the
 * full file. */
375 static void process_buf(struct tcp_socket
*tcpsock
,
376 unsigned char *buf
, size_t len
)
378 uint32_t totaltoget
= 0;
/* Read the length prefix with memcpy() rather than through a cast to
 * uint32_t *: buf is a byte buffer with no alignment guarantee, and
 * accessing it through a uint32_t lvalue is undefined behavior. */
381 memcpy(&totaltoget, buf, sizeof(totaltoget));
382 totaltoget
= ntohl(totaltoget
);
383 if (totaltoget
> (64 * 1024) || totaltoget
<= 8) {
384 /* Message too big or too small, close the connection. */
389 /* If we didn't even read 4 bytes, we try to read 4 first and
390 * then care about the rest. */
394 if (totaltoget
> len
) {
395 if (tcpsock
->buf
== NULL
) {
396 /* The first incomplete recv().
397 * Create a temporary buffer and copy the contents of
398 * our current one (which is static_buf, otherwise
399 * tcpsock->buf wouldn't be NULL) to it. */
400 tcpsock
->buf
= malloc(SBSIZE
);
401 if (tcpsock
->buf
== NULL
)
404 memcpy(tcpsock
->buf
, buf
, len
);
406 tcpsock
->pktsize
= totaltoget
;
409 /* We already had an incomplete recv() and this is
410 * just another one. */
412 tcpsock
->pktsize
= totaltoget
;
417 if (totaltoget
< len
) {
418 /* Got more than one message in the same recv(); save the
419 * amount of bytes exceeding so we can process it later. */
420 tcpsock
->excess
= len
- totaltoget
;
424 /* The buffer is complete, parse it as usual. */
/* Skip the 4-byte length prefix when handing the message over. */
426 if (parse_message(&(tcpsock
->req
), buf
+ 4, len
- 4)) {
434 if (tcpsock
->excess
) {
435 /* If there are buffer leftovers (because there was more than
436 * one message on a recv()), leave the buffer, move the
437 * leftovers to the beginning, adjust the numbers and parse
439 * The buffer can be the static one or the one in tcpsock (if
440 * we had a short recv()); we don't care because we know it
441 * will be big enough to hold an entire message anyway. */
/* memmove() because source and destination lie in the same buffer. */
442 memmove(buf
, buf
+ len
, tcpsock
->excess
);
443 tcpsock
->len
= tcpsock
->excess
;
446 /* Build a new req just like when we first recv(). */
/* NOTE(review): `len` is passed here while the leftover byte count was
 * just stored in tcpsock->len; unless len is updated on a line missing
 * from this listing, the recursive call is told the wrong size --
 * confirm. */
448 process_buf(tcpsock
, buf
, len
);
454 /* We had an incomplete read somewhere along the processing of
455 * this message, and had to malloc() a temporary space. free()
456 * it and reset the associated information. */
460 tcpsock
->pktsize
= 0;
/* Teardown: unregister the read event, then release the socket state. */
468 event_del(tcpsock
->evt
);
469 tcp_socket_free(tcpsock
);