avoid unnecessary #to_io calls
[raindrops.git] / ext / raindrops / linux_inet_diag.c
blobe1ae62a7655f625ed9f35c794f4ce2aab4007724
1 #include <ruby.h>
2 #include <stdarg.h>
3 #include <ruby/st.h>
4 #include "my_fileno.h"
5 #ifdef __linux__
7 #ifdef HAVE_RB_THREAD_IO_BLOCKING_REGION
8 /* Ruby 1.9.3 and 2.0.0 */
9 VALUE rb_thread_io_blocking_region(rb_blocking_function_t *, void *, int);
10 # define rd_fd_region(fn,data,fd) \
11 rb_thread_io_blocking_region((fn),(data),(fd))
12 #elif defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && \
13 defined(HAVE_RUBY_THREAD_H) && HAVE_RUBY_THREAD_H
14 /* in case Ruby 2.0+ ever drops rb_thread_io_blocking_region: */
15 # include <ruby/thread.h>
16 # define COMPAT_FN (void *(*)(void *))
17 # define rd_fd_region(fn,data,fd) \
18 rb_thread_call_without_gvl(COMPAT_FN(fn),(data),RUBY_UBF_IO,NULL)
19 #else
20 # error Ruby <= 1.8 not supported
21 #endif
23 #include <assert.h>
24 #include <errno.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <netdb.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <string.h>
31 #include <asm/types.h>
32 #include <netinet/in.h>
33 #include <arpa/inet.h>
34 #include <netinet/tcp.h>
35 #include <linux/netlink.h>
36 #include <linux/rtnetlink.h>
37 #include <linux/inet_diag.h>
/*
 * One socket address viewed as generic storage, generic sockaddr,
 * IPv4 or IPv6; ss.ss_family tells which view is valid.
 */
union any_addr {
	struct sockaddr_storage ss;
	struct sockaddr sa;
	struct sockaddr_in in;
	struct sockaddr_in6 in6;
};
46 static size_t page_size;
47 static unsigned g_seq;
48 static VALUE cListenStats, cIDSock;
49 static ID id_new;
/* per-address counters accumulated from inet_diag replies */
struct listen_stats {
	uint32_t active;	/* count of TCP_ESTABLISHED sockets seen */
	uint32_t queued;	/* sum of idiag_rqueue over non-established replies */
	uint32_t listener_p;	/* set to 1 once a listener reply was recorded */
};
/* byte length of the single-operation inet_diag bytecode program */
#define OPLEN (sizeof(struct inet_diag_bc_op) + \
	       sizeof(struct inet_diag_hostcond) + \
	       sizeof(struct sockaddr_storage))
61 struct nogvl_args {
62 st_table *table;
63 struct iovec iov[3]; /* last iov holds inet_diag bytecode */
64 struct listen_stats stats;
65 int fd;
/*
 * my_SOCK_RAW: socket type used for the netlink socket, with
 * SOCK_CLOEXEC or-ed in when available.
 * FORCE_CLOEXEC(io): returns io; when SOCK_CLOEXEC is unavailable it
 * first sets FD_CLOEXEC via fcntl() and raises a system-call error
 * if that fails.
 */
#ifdef SOCK_CLOEXEC
#  define my_SOCK_RAW (SOCK_RAW|SOCK_CLOEXEC)
#  define FORCE_CLOEXEC(v) (v)
#else
#  define my_SOCK_RAW SOCK_RAW
static VALUE FORCE_CLOEXEC(VALUE io)
{
	int fd = my_fileno(io);
	int flags = fcntl(fd, F_SETFD, FD_CLOEXEC);

	if (flags == -1)
		rb_sys_fail("fcntl(F_SETFD, FD_CLOEXEC)");
	return io;
}
#endif
84 * call-seq:
85 * Raindrops::InetDiagSocket.new -> Socket
87 * Creates a new Socket object for the netlink inet_diag facility
89 static VALUE ids_s_new(VALUE klass)
91 VALUE argv[3];
93 argv[0] = INT2NUM(AF_NETLINK);
94 argv[1] = INT2NUM(my_SOCK_RAW);
95 argv[2] = INT2NUM(NETLINK_INET_DIAG);
97 return FORCE_CLOEXEC(rb_call_super(3, argv));
100 /* creates a Ruby ListenStats Struct based on our internal listen_stats */
101 static VALUE rb_listen_stats(struct listen_stats *stats)
103 VALUE active = UINT2NUM(stats->active);
104 VALUE queued = UINT2NUM(stats->queued);
106 return rb_struct_new(cListenStats, active, queued);
109 static int st_free_data(st_data_t key, st_data_t value, st_data_t ignored)
111 xfree((void *)key);
112 xfree((void *)value);
114 return ST_DELETE;
118 * call-seq:
119 * remove_scope_id(ip_address)
121 * Returns copy of IP address with Scope ID removed,
122 * if address has it (only IPv6 actually may have it).
124 static VALUE remove_scope_id(const char *addr)
126 VALUE rv = rb_str_new2(addr);
127 long len = RSTRING_LEN(rv);
128 char *ptr = RSTRING_PTR(rv);
129 char *pct = memchr(ptr, '%', len);
132 * remove scoped portion
133 * Ruby equivalent: rv.sub!(/%([^\]]*)\]/, "]")
135 if (pct) {
136 size_t newlen = pct - ptr;
137 char *rbracket = memchr(pct, ']', len - newlen);
139 if (rbracket) {
140 size_t move = len - (rbracket - ptr);
142 memmove(pct, rbracket, move);
143 newlen += move;
145 rb_str_set_len(rv, newlen);
146 } else {
147 rb_raise(rb_eArgError,
148 "']' not found in IPv6 addr=%s", ptr);
151 return rv;
154 static int st_to_hash(st_data_t key, st_data_t value, VALUE hash)
156 struct listen_stats *stats = (struct listen_stats *)value;
158 if (stats->listener_p) {
159 VALUE k = remove_scope_id((const char *)key);
160 VALUE v = rb_listen_stats(stats);
162 OBJ_FREEZE(k);
163 rb_hash_aset(hash, k, v);
165 return st_free_data(key, value, 0);
168 static int st_AND_hash(st_data_t key, st_data_t value, VALUE hash)
170 struct listen_stats *stats = (struct listen_stats *)value;
172 if (stats->listener_p) {
173 VALUE k = remove_scope_id((const char *)key);
175 if (rb_hash_lookup(hash, k) == Qtrue) {
176 VALUE v = rb_listen_stats(stats);
177 OBJ_FREEZE(k);
178 rb_hash_aset(hash, k, v);
181 return st_free_data(key, value, 0);
/*
 * Returns the textual wildcard address for +family+ in the form used
 * for table keys: "0.0.0.0" for AF_INET, "[::]" for AF_INET6.
 * Any other family trips the assertion.
 */
static const char *addr_any(sa_family_t family)
{
	static const char ipv4[] = "0.0.0.0";
	static const char ipv6[] = "[::]";

	if (family == AF_INET)
		return ipv4;
	assert(family == AF_INET6 && "unknown family");
	return ipv6;
}
#ifdef __GNUC__
static void bug_warn_nogvl(const char *, ...)
	__attribute__((format(printf,1,2)));
#endif
/*
 * printf-style warning written straight to stderr, followed by a
 * request to report the problem; stderr is flushed before returning.
 */
static void bug_warn_nogvl(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vfprintf(stderr, fmt, ap);
	va_end(ap);

	fprintf(stderr, "Please report how you produced this at "\
	                "raindrops-public@yhbt.net\n");
	fflush(stderr);
}
212 static struct listen_stats *stats_for(st_table *table, struct inet_diag_msg *r)
214 char *host, *key, *port, *old_key;
215 size_t alloca_len;
216 struct listen_stats *stats;
217 socklen_t hostlen;
218 socklen_t portlen = (socklen_t)sizeof("65535");
219 int n;
220 const void *src = r->id.idiag_src;
222 switch (r->idiag_family) {
223 case AF_INET: {
224 hostlen = INET_ADDRSTRLEN;
225 alloca_len = hostlen + portlen;
226 host = key = alloca(alloca_len);
227 break;
229 case AF_INET6: {
230 hostlen = INET6_ADDRSTRLEN;
231 alloca_len = 1 + hostlen + 1 + portlen;
232 key = alloca(alloca_len);
233 host = key + 1;
234 break;
236 default:
237 assert(0 && "unsupported address family, could that be IPv7?!");
239 if (!inet_ntop(r->idiag_family, src, host, hostlen)) {
240 bug_warn_nogvl("BUG: inet_ntop: %s\n", strerror(errno));
241 *key = '\0';
242 *host = '\0';
244 hostlen = (socklen_t)strlen(host);
245 switch (r->idiag_family) {
246 case AF_INET:
247 host[hostlen] = ':';
248 port = host + hostlen + 1;
249 break;
250 case AF_INET6:
251 key[0] = '[';
252 host[hostlen] = ']';
253 host[hostlen + 1] = ':';
254 port = host + hostlen + 2;
255 break;
256 default:
257 assert(0 && "unsupported address family, could that be IPv7?!");
260 n = snprintf(port, portlen, "%u", ntohs(r->id.idiag_sport));
261 if (n <= 0) {
262 bug_warn_nogvl("BUG: snprintf port: %d\n", n);
263 *key = '\0';
266 if (st_lookup(table, (st_data_t)key, (st_data_t *)&stats))
267 return stats;
269 old_key = key;
271 if (r->idiag_state == TCP_ESTABLISHED) {
272 n = snprintf(key, alloca_len, "%s:%u",
273 addr_any(r->idiag_family),
274 ntohs(r->id.idiag_sport));
275 if (n <= 0) {
276 bug_warn_nogvl("BUG: snprintf: %d\n", n);
277 *key = '\0';
279 if (st_lookup(table, (st_data_t)key, (st_data_t *)&stats))
280 return stats;
281 if (n <= 0) {
282 key = xmalloc(1);
283 *key = '\0';
284 } else {
285 old_key = key;
286 key = xmalloc(n + 1);
287 memcpy(key, old_key, n + 1);
289 } else {
290 size_t old_len = strlen(old_key) + 1;
291 key = xmalloc(old_len);
292 memcpy(key, old_key, old_len);
294 stats = xcalloc(1, sizeof(struct listen_stats));
295 st_insert(table, (st_data_t)key, (st_data_t)stats);
296 return stats;
299 static void table_incr_active(st_table *table, struct inet_diag_msg *r)
301 struct listen_stats *stats = stats_for(table, r);
302 ++stats->active;
305 static void table_set_queued(st_table *table, struct inet_diag_msg *r)
307 struct listen_stats *stats = stats_for(table, r);
308 stats->listener_p = 1;
309 stats->queued += r->idiag_rqueue;
312 /* inner loop of inet_diag, called for every socket returned by netlink */
313 static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
316 * inode == 0 means the connection is still in the listen queue
317 * and has not yet been accept()-ed by the server. The
318 * inet_diag bytecode cannot filter this for us.
320 if (r->idiag_inode == 0)
321 return;
322 if (r->idiag_state == TCP_ESTABLISHED) {
323 if (args->table)
324 table_incr_active(args->table, r);
325 else
326 args->stats.active++;
327 } else { /* if (r->idiag_state == TCP_LISTEN) */
328 if (args->table)
329 table_set_queued(args->table, r);
330 else
331 args->stats.queued += r->idiag_rqueue;
334 * we wont get anything else because of the idiag_states filter
/*
 * Error tags returned by diag().  nl_errcheck() tells them apart by
 * pointer identity and uses the text as the rb_sys_fail() message.
 */
static const char err_sendmsg[] = "sendmsg";
static const char err_recvmsg[] = "recvmsg";
static const char err_nlmsg[] = "nlmsg";
/* netlink header immediately followed by the inet_diag request */
struct diag_req {
	struct nlmsghdr nlh;
	struct inet_diag_req r;
};
347 static void prep_msghdr(
348 struct msghdr *msg,
349 struct nogvl_args *args,
350 struct sockaddr_nl *nladdr,
351 size_t iovlen)
353 memset(msg, 0, sizeof(struct msghdr));
354 msg->msg_name = (void *)nladdr;
355 msg->msg_namelen = sizeof(struct sockaddr_nl);
356 msg->msg_iov = args->iov;
357 msg->msg_iovlen = iovlen;
360 static void prep_diag_args(
361 struct nogvl_args *args,
362 struct sockaddr_nl *nladdr,
363 struct rtattr *rta,
364 struct diag_req *req,
365 struct msghdr *msg)
367 memset(req, 0, sizeof(struct diag_req));
368 memset(nladdr, 0, sizeof(struct sockaddr_nl));
370 nladdr->nl_family = AF_NETLINK;
372 req->nlh.nlmsg_len = (unsigned int)(sizeof(struct diag_req) +
373 RTA_LENGTH(args->iov[2].iov_len));
374 req->nlh.nlmsg_type = TCPDIAG_GETSOCK;
375 req->nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
376 req->nlh.nlmsg_pid = getpid();
377 req->r.idiag_states = (1<<TCP_ESTABLISHED) | (1<<TCP_LISTEN);
378 rta->rta_type = INET_DIAG_REQ_BYTECODE;
379 rta->rta_len = RTA_LENGTH(args->iov[2].iov_len);
381 args->iov[0].iov_base = req;
382 args->iov[0].iov_len = sizeof(struct diag_req);
383 args->iov[1].iov_base = rta;
384 args->iov[1].iov_len = sizeof(struct rtattr);
386 prep_msghdr(msg, args, nladdr, 3);
389 static void prep_recvmsg_buf(struct nogvl_args *args)
391 /* reuse buffer that was allocated for bytecode */
392 args->iov[0].iov_len = page_size;
393 args->iov[0].iov_base = args->iov[2].iov_base;
396 /* does the inet_diag stuff with netlink(), this is called w/o GVL */
397 static VALUE diag(void *ptr)
399 struct nogvl_args *args = ptr;
400 struct sockaddr_nl nladdr;
401 struct rtattr rta;
402 struct diag_req req;
403 struct msghdr msg;
404 const char *err = NULL;
405 unsigned seq = ++g_seq;
407 prep_diag_args(args, &nladdr, &rta, &req, &msg);
408 req.nlh.nlmsg_seq = seq;
410 if (sendmsg(args->fd, &msg, 0) < 0) {
411 err = err_sendmsg;
412 goto out;
415 prep_recvmsg_buf(args);
417 while (1) {
418 ssize_t readed;
419 size_t r;
420 struct nlmsghdr *h = (struct nlmsghdr *)args->iov[0].iov_base;
422 prep_msghdr(&msg, args, &nladdr, 1);
423 readed = recvmsg(args->fd, &msg, 0);
424 if (readed < 0) {
425 if (errno == EINTR)
426 continue;
427 err = err_recvmsg;
428 goto out;
430 if (readed == 0)
431 goto out;
432 r = (size_t)readed;
433 for ( ; NLMSG_OK(h, r); h = NLMSG_NEXT(h, r)) {
434 if (h->nlmsg_seq != seq)
435 continue;
436 if (h->nlmsg_type == NLMSG_DONE)
437 goto out;
438 if (h->nlmsg_type == NLMSG_ERROR) {
439 err = err_nlmsg;
440 goto out;
442 r_acc(args, NLMSG_DATA(h));
445 out:
446 /* prepare to raise, free memory before reacquiring GVL */
447 if (err && args->table) {
448 int save_errno = errno;
450 st_foreach(args->table, st_free_data, 0);
451 st_free_table(args->table);
452 errno = save_errno;
454 return (VALUE)err;
457 /* populates sockaddr_storage struct by parsing +addr+ */
458 static void parse_addr(union any_addr *inet, VALUE addr)
460 char *host_ptr;
461 char *check;
462 char *colon = NULL;
463 char *rbracket = NULL;
464 void *dst;
465 long host_len;
466 int af, rc;
467 uint16_t *portdst;
468 unsigned long port;
470 Check_Type(addr, T_STRING);
471 host_ptr = StringValueCStr(addr);
472 host_len = RSTRING_LEN(addr);
473 if (*host_ptr == '[') { /* ipv6 address format (rfc2732) */
474 rbracket = memchr(host_ptr + 1, ']', host_len - 1);
476 if (rbracket == NULL)
477 rb_raise(rb_eArgError, "']' not found in IPv6 addr=%s",
478 host_ptr);
479 if (rbracket[1] != ':')
480 rb_raise(rb_eArgError, "':' not found in IPv6 addr=%s",
481 host_ptr);
482 colon = rbracket + 1;
483 host_ptr++;
484 *rbracket = 0;
485 inet->ss.ss_family = af = AF_INET6;
486 dst = &inet->in6.sin6_addr;
487 portdst = &inet->in6.sin6_port;
488 } else { /* ipv4 */
489 colon = memchr(host_ptr, ':', host_len);
490 inet->ss.ss_family = af = AF_INET;
491 dst = &inet->in.sin_addr;
492 portdst = &inet->in.sin_port;
495 if (!colon)
496 rb_raise(rb_eArgError, "port not found in: `%s'", host_ptr);
497 port = strtoul(colon + 1, &check, 10);
498 *colon = 0;
499 rc = inet_pton(af, host_ptr, dst);
500 *colon = ':';
501 if (rbracket) *rbracket = ']';
502 if (*check || ((uint16_t)port != port))
503 rb_raise(rb_eArgError, "invalid port: %s", colon + 1);
504 if (rc != 1)
505 rb_raise(rb_eArgError, "inet_pton failed for: `%s' with %d",
506 host_ptr, rc);
507 *portdst = ntohs((uint16_t)port);
510 /* generates inet_diag bytecode to match all addrs */
511 static void gen_bytecode_all(struct iovec *iov)
513 struct inet_diag_bc_op *op;
514 struct inet_diag_hostcond *cond;
516 /* iov_len was already set and base allocated in a parent function */
517 assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
518 op = iov->iov_base;
519 op->code = INET_DIAG_BC_S_COND;
520 op->yes = OPLEN;
521 op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
522 cond = (struct inet_diag_hostcond *)(op + 1);
523 cond->family = AF_UNSPEC;
524 cond->port = -1;
525 cond->prefix_len = 0;
528 /* generates inet_diag bytecode to match a single addr */
529 static void gen_bytecode(struct iovec *iov, union any_addr *inet)
531 struct inet_diag_bc_op *op;
532 struct inet_diag_hostcond *cond;
534 /* iov_len was already set and base allocated in a parent function */
535 assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
536 op = iov->iov_base;
537 op->code = INET_DIAG_BC_S_COND;
538 op->yes = OPLEN;
539 op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
541 cond = (struct inet_diag_hostcond *)(op + 1);
542 cond->family = inet->ss.ss_family;
543 switch (inet->ss.ss_family) {
544 case AF_INET: {
545 cond->port = ntohs(inet->in.sin_port);
546 cond->prefix_len = inet->in.sin_addr.s_addr == 0 ? 0 :
547 sizeof(inet->in.sin_addr.s_addr) * CHAR_BIT;
548 *cond->addr = inet->in.sin_addr.s_addr;
550 break;
551 case AF_INET6: {
552 cond->port = ntohs(inet->in6.sin6_port);
553 cond->prefix_len = memcmp(&in6addr_any, &inet->in6.sin6_addr,
554 sizeof(struct in6_addr)) == 0 ?
555 0 : sizeof(inet->in6.sin6_addr) * CHAR_BIT;
556 memcpy(&cond->addr, &inet->in6.sin6_addr,
557 sizeof(struct in6_addr));
559 break;
560 default:
561 assert(0 && "unsupported address family, could that be IPv7?!");
566 * n.b. we may safely raise here because an error will cause diag()
567 * to free args->table
569 static void nl_errcheck(VALUE r)
571 const char *err = (const char *)r;
573 if (err) {
574 if (err == err_nlmsg)
575 rb_raise(rb_eRuntimeError, "NLMSG_ERROR");
576 else
577 rb_sys_fail(err);
581 static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
583 union any_addr query_addr;
585 parse_addr(&query_addr, addr);
586 gen_bytecode(&args->iov[2], &query_addr);
588 memset(&args->stats, 0, sizeof(struct listen_stats));
589 nl_errcheck(rd_fd_region(diag, args, args->fd));
591 return rb_listen_stats(&args->stats);
594 static int drop_placeholders(st_data_t k, st_data_t v, st_data_t ign)
596 if ((VALUE)v == Qtrue)
597 return ST_DELETE;
598 return ST_CONTINUE;
602 * call-seq:
603 * Raindrops::Linux.tcp_listener_stats([addrs[, sock]]) => hash
605 * If specified, +addr+ may be a string or array of strings representing
606 * listen addresses to filter for. Returns a hash with given addresses as
607 * keys and ListenStats objects as the values or a hash of all addresses.
609 * addrs = %w(0.0.0.0:80 127.0.0.1:8080)
611 * If +addr+ is nil or not specified, all (IPv4) addresses are returned.
612 * If +sock+ is specified, it should be a Raindrops::InetDiagSock object.
614 static VALUE tcp_listener_stats(int argc, VALUE *argv, VALUE self)
616 VALUE rv = rb_hash_new();
617 struct nogvl_args args;
618 VALUE addrs, sock;
620 rb_scan_args(argc, argv, "02", &addrs, &sock);
623 * allocating page_size instead of OP_LEN since we'll reuse the
624 * buffer for recvmsg() later, we already checked for
625 * OPLEN <= page_size at initialization
627 args.iov[2].iov_len = OPLEN;
628 args.iov[2].iov_base = alloca(page_size);
629 args.table = NULL;
630 sock = NIL_P(sock) ? rb_funcall(cIDSock, id_new, 0)
631 : rb_io_get_io(sock);
632 args.fd = my_fileno(sock);
634 switch (TYPE(addrs)) {
635 case T_STRING:
636 rb_hash_aset(rv, addrs, tcp_stats(&args, addrs));
637 return rv;
638 case T_ARRAY: {
639 long i;
640 long len = RARRAY_LEN(addrs);
642 if (len == 1) {
643 VALUE cur = rb_ary_entry(addrs, 0);
645 rb_hash_aset(rv, cur, tcp_stats(&args, cur));
646 return rv;
648 for (i = 0; i < len; i++) {
649 union any_addr check;
650 VALUE cur = rb_ary_entry(addrs, i);
652 parse_addr(&check, cur);
653 rb_hash_aset(rv, cur, Qtrue /* placeholder */);
655 /* fall through */
657 case T_NIL:
658 args.table = st_init_strtable();
659 gen_bytecode_all(&args.iov[2]);
660 break;
661 default:
662 rb_raise(rb_eArgError,
663 "addr must be an array of strings, a string, or nil");
666 nl_errcheck(rd_fd_region(diag, &args, args.fd));
668 st_foreach(args.table, NIL_P(addrs) ? st_to_hash : st_AND_hash, rv);
669 st_free_table(args.table);
671 if (RHASH_SIZE(rv) > 1)
672 rb_hash_foreach(rv, drop_placeholders, Qfalse);
674 /* let GC deal with corner cases */
675 if (argc < 2) rb_io_close(sock);
676 return rv;
679 void Init_raindrops_linux_inet_diag(void)
681 VALUE cRaindrops = rb_define_class("Raindrops", rb_cObject);
682 VALUE mLinux = rb_define_module_under(cRaindrops, "Linux");
683 VALUE Socket;
685 rb_require("socket");
686 Socket = rb_const_get(rb_cObject, rb_intern("Socket"));
687 id_new = rb_intern("new");
690 * Document-class: Raindrops::InetDiagSocket
692 * This is a subclass of +Socket+ specifically for talking
693 * to the inet_diag facility of Netlink.
695 cIDSock = rb_define_class_under(cRaindrops, "InetDiagSocket", Socket);
696 rb_define_singleton_method(cIDSock, "new", ids_s_new, 0);
698 cListenStats = rb_const_get(cRaindrops, rb_intern("ListenStats"));
699 rb_gc_register_mark_object(cListenStats); /* pin */
701 rb_define_module_function(mLinux, "tcp_listener_stats",
702 tcp_listener_stats, -1);
704 page_size = getpagesize();
706 assert(OPLEN <= page_size && "bytecode OPLEN is not <= PAGE_SIZE");
708 #endif /* __linux__ */