Cosmetics
[rawv.git] / net.cpp
blob2268af244e286221c668217bcc15a0c60b346ea3
1 /* network submodule for rawv
2 * Copyright (C) 2010,2011,2012 Kirill Smelkov <kirr@navytux.spb.ru>
3 * Copyright (C) 2011 Marine Bridge and Navigation Systems (http://mns.spb.ru/)
5 * This library is free software: you can Use, Study, Modify and Redistribute
6 * it under the terms of the GNU Lesser General Public License version 2.1, or
7 * any later version. This library is distributed WITHOUT ANY WARRANTY. See
8 * COPYING.LIB file for full License terms.
9 */
11 #include "rawv.h"
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <string.h>
20 using std::min;
22 namespace rawv {
24 /******** NetTx ********/
/** Tell whether `addr` belongs to the IPv4 multicast range.
 *
 * @param addr  IPv4 socket address; sin_addr is expected in network byte order.
 * @return      1 if the address is class D (224.0.0.0 - 239.255.255.255), else 0.
 */
int in_multicast(const struct sockaddr_in *addr)
{
    const unsigned long host_ip = ntohl(addr->sin_addr.s_addr);

    /* class D addresses are exactly those whose top nibble is 1110 */
    return (host_ip >> 28) == 0xe;
}
/** Tell whether `addr` is an IPv4 broadcast address.
 *
 * An address counts as broadcast when it is either
 *
 *   - the limited broadcast address 255.255.255.255, or
 *   - a classful directed broadcast, i.e. the host part of its default
 *     class A (/8), class B (/16) or class C (/24) network is all ones.
 *
 * Subnet masks other than the classful defaults are not known here, so
 * directed broadcasts of such subnets are not detected.
 *
 * http://en.wikipedia.org/wiki/IPv4_subnetting_reference
 *
 * @param addr  IPv4 socket address; sin_addr is expected in network byte order.
 * @return      1 if the address is broadcast, else 0.
 */
int in_broadcast(const struct sockaddr_in *addr)
{
    unsigned long ip = ntohl(addr->sin_addr.s_addr);

    /* limited broadcast - lives in class E range, so the classful tests
     * below would never see it */
    if (ip == 0xffffffffUL)
        return 1;

    /* class A      0.0.0.0 - 127.255.255.255   /8 */
    if ((ip & 0x80000000UL) == 0x00000000UL)
        return (ip & 0x00ffffffUL) == 0x00ffffffUL;

    /* class B    128.0.0.0 - 191.255.255.255   /16 */
    if ((ip & 0xc0000000UL) == 0x80000000UL)
        return (ip & 0x0000ffffUL) == 0x0000ffffUL;

    /* class C    192.0.0.0 - 223.255.255.255   /24 */
    if ((ip & 0xe0000000UL) == 0xc0000000UL)
        return (ip & 0x000000ffUL) == 0x000000ffUL;

    /* class D (multicast) and the rest of class E */
    return 0;
}
57 NetTx::NetTx(const char *dest, int port, int mtu)
59 int one=1; /* XXX has to be uint8_t on win32 */
61 sk = socket(PF_INET, SOCK_DGRAM, 0);
62 if (sk == -1)
63 die_errno("socket");
65 tx_addr.sin_family = AF_INET;
66 tx_addr.sin_port = htons(port);
67 tx_addr.sin_addr.s_addr = inet_addr(dest);
68 if (tx_addr.sin_addr.s_addr == INADDR_NONE)
69 die("E: inet_addr(%s) fail", dest);
71 if (in_broadcast(&tx_addr)) {
72 fprintf(stderr, "tx: broadcasting...\n");
74 if (setsockopt(sk, SOL_SOCKET, SO_BROADCAST, &one, sizeof(one)) == -1)
75 die_errno("setsockopt(SO_BROADCAST)");
78 if (in_multicast(&tx_addr)) {
79 fprintf(stderr, "tx: multicasting...\n");
81 if (setsockopt(sk, IPPROTO_IP, IP_MULTICAST_TTL, &one, sizeof(one)) == -1)
82 die_errno("setsockopt(IP_MULTICAST_TTL)");
84 if (setsockopt(sk, IPPROTO_IP, IP_MULTICAST_LOOP, &one, sizeof(one)) == -1)
85 die_errno("setsockopt(IP_MULTICAST_LOOP)");
88 this->mtu = mtu;
92 NetTx::~NetTx()
94 close(sk);
98 bool NetTx::v_query_framebuf(Frame *f)
100 /* always use source's buffer - we'll transmit frames directly from it */
101 return false;
105 /* upon receiving new frame tx it to network */
106 void NetTx::v_on_frame(const Frame *f)
108 #define RAWV_MAX_FRAG_NLINES 16 /* there is no limit, it's just me being lazy */
109 struct iovec iov[1+RAWV_MAX_FRAG_NLINES]; /* header + payload */
110 int iovlen;
111 struct msghdr msg;
113 int pixsize, frag_nlines, nfragment, line, i;
114 int fheight = f->height;
115 int bt = f->interlace_tb_swapped;
117 struct rawv_header h;
119 switch (f->pixfmt_4cc) {
120 case MKTAG32('Y','U','Y','V'):
121 case MKTAG32('Y','U','Y','2'):
122 pixsize = 2;
123 break;
125 // TODO GREY
127 default:
128 die("tx_frame: don't know pixfmt 0x%08x", f->pixfmt_4cc);
131 frag_nlines = (mtu - sizeof(struct rawv_header)) / (f->width * pixsize);
132 if (bt) {
133 if (bt < 0)
134 die("TODO bt=-1 in NetTx");
136 /* make frag_nlines and fheight even, if lines are swapped in captured frame */
137 frag_nlines &= ~1U;
138 fheight &= ~1U;
141 if (frag_nlines > RAWV_MAX_FRAG_NLINES)
142 die("FIXME frag_nlines > RAWV_MAX_FRAG_NLINES (%i > %i)", frag_nlines, RAWV_MAX_FRAG_NLINES);
144 nfragment = 0;
146 for (line=0; line<fheight; line+= frag_nlines, nfragment++) {
147 h.magic = MKTAG32('R','A','W','V');
148 h.version = 1;
149 h.__reserved_for_flags = 0x00;
150 h.nframe = htons(f->sequence);
151 h.nfragment = htons(nfragment);
152 h.fragments_total = htons((fheight + frag_nlines-1) / frag_nlines);
153 h.width = htons(f->width);
154 h.height = htons(fheight);
155 h.pixfmt = f->pixfmt_4cc;
157 h.frag_startline = htons(line);
158 h.frag_nlines = htons(min(frag_nlines, fheight - line));
160 /* temp: helps debugging bitstream on the wire */
161 h.__reserved[0] = 0xaaaaaaaa;
162 h.__reserved[1] = 0xffffffff;
164 iov[0].iov_base = &h;
165 iov[0].iov_len = sizeof(h);
166 iovlen = 1;
168 if ((f->width * pixsize == f->bytesperline) &&
169 !bt) {
170 /* fastpath for stride=width, progressive */
171 iov[iovlen].iov_base = f->start + line*f->bytesperline;
172 iov[iovlen].iov_len = frag_nlines*f->bytesperline;
174 ++iovlen;
176 else {
177 /* strided and/or top/bottom swapped frame */
178 for (i=0; i<frag_nlines; ++i) {
179 iov[iovlen].iov_base = f->start + (line+i+bt)*f->bytesperline;
180 iov[iovlen].iov_len = f->width * pixsize;
182 ++iovlen;
183 bt = -bt;
187 msg.msg_name = &tx_addr;
188 msg.msg_namelen = sizeof(tx_addr);
190 msg.msg_iov = iov;
191 msg.msg_iovlen = iovlen;
193 msg.msg_control = NULL;
194 msg.msg_controllen = 0;
195 msg.msg_flags = 0;
197 /* XXX what to do if not whole pkt sent? */
198 if (-1 == sendmsg(sk, &msg, 0 /*flags*/))
199 die_errno("sendmsg");
205 /******** NetRx ********/
207 /* recommended minimum for socket rx buffer */
208 #define SK_RCVBUF_MIN (1024*1024)
210 NetRx::NetRx(const char *mcast_group, int port, int mtu)
212 int one=1; /* XXX has to be uint8_t on win32 */
213 int rcvbuf;
214 socklen_t rcvbuf_len;
215 struct ip_mreqn mreq;
217 sk = socket(PF_INET, SOCK_DGRAM, 0);
218 if (sk == -1)
219 die_errno("socket");
221 /* ensure rx buffer is big enough */
222 rcvbuf_len = sizeof(rcvbuf);
223 if (-1 == getsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &rcvbuf_len))
224 die_errno("getsockopt(SO_RCVBUF)");
226 if (rcvbuf < SK_RCVBUF_MIN) {
227 rcvbuf = SK_RCVBUF_MIN;
229 if (-1 == setsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)))
230 warn("W: can't increase sk rx buffer to %i bytes\n", rcvbuf);
232 rcvbuf_len = sizeof(rcvbuf);
233 if (-1 == getsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &rcvbuf_len))
234 die_errno("getsockopt(SO_RCVBUF)");
236 if (rcvbuf < SK_RCVBUF_MIN)
237 warn("W: can't increase sk rx buffer to %i bytes (got only %i)\n",
238 SK_RCVBUF_MIN, rcvbuf);
242 /* allow multiple receivers */
243 if (-1 == setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
244 die_errno("setsockopt(SO_REUSEADDR)");
246 rx_addr.sin_family = AF_INET;
247 rx_addr.sin_port = htons(port);
248 rx_addr.sin_addr.s_addr = INADDR_ANY;
250 if (-1 == bind(sk, (sockaddr *)&rx_addr, sizeof(rx_addr)))
251 die_errno("bind");
253 // XXX is_broadcast ?
254 if (mcast_group) { // XXX or is_multicast(mcast_group) ?
255 mreq.imr_multiaddr.s_addr = inet_addr(mcast_group);
256 mreq.imr_address.s_addr = INADDR_ANY;
257 mreq.imr_ifindex = 0; /* any */
259 if (mreq.imr_multiaddr.s_addr == INADDR_NONE)
260 die("E: inet_addr(%s) fail", mcast_group);
262 if (-1 == setsockopt(sk, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)))
263 die_errno("setsockopt(IP_ADD_MEMBERSHIP)");
267 /* set socket non-blocking, so that we could avoid tons of select and just
268 * loop on read until EAGAIN
270 * FIXME does not work as expected - read comments below near recv()
272 if (-1 == fcntl(sk, F_SETFL, O_NONBLOCK))
273 die_errno("fcntl(F_SETFL, O_NONBLOCK)");
276 fragbuf.resize(mtu);
278 f.start = NULL;
279 f.length = 0;
280 f.width = f.height = f.bytesperline = 0;
281 f.pixfmt_4cc = 0;
282 f.sequence = 0;
283 f.interlace_tb_swapped = 0; /* always progressive on the wire */
285 nframe = 0; // XXX better -1 ?
286 nfragment = 0;
287 fragments_total = 0;
288 frag_received = 0;
289 frag_dropped_total = 0;
293 NetRx::~NetRx()
295 close(sk); /* this will also leave multicast group */
299 void NetRx::v_start_capture()
301 die("TODO NetRx::v_start_capture()");
305 void NetRx::v_stop_capture()
307 die("TODO NetRx::v_stop_capture()");
311 int NetRx::handle_recv()
313 int err;
314 int count = 0;
316 /* loop till we get EAGAIN. This way we avoid lots of unneeded select
317 * calls
319 * FIXME does not work as expected: Under load we seldomly fetch more than
320 * a couple of packets in one burst, and so go back to select, back to
321 * here, back to select, back here, etc...
323 while (1) {
324 err = recv(sk, &fragbuf[0], fragbuf.size(), /*flags*/ 0);
325 if (err == -1)
326 switch (errno) {
327 case EAGAIN:
328 //fprintf(stderr, "Looped %i\n", count);
329 return 0;
331 default:
332 die_errno("recv");
335 ++count;
336 __handle_recv(err);
341 #define NFRAME_RESYNC 100
342 #define NFRAME_MAX (1 << (sizeof ( ((struct rawv_header *)0) ->nframe) *8))
344 void NetRx::__handle_recv(unsigned len)
346 struct rawv_header *h;
347 unsigned pixsize, fragsize, framesize, i;
348 int fragstride;
349 bool ok;
351 /* TODO die is not appropriate on RX path */
353 if (len < sizeof(h))
354 die("recv: len(pkt) < header (%i < %i)", len, sizeof(h));
356 h = (struct rawv_header *)&fragbuf[0];
357 #if __BYTE_ORDER != __BIG_ENDIAN
358 /* decode header coming from the wire
360 * we access header fields more than once, so it makes sense to first
361 * decode them all in one go and then use the result.
363 struct rawv_header __h;
365 # define ASSERT(cond) do { (void) sizeof(char [1 - 2*!(cond)]); } while (0)
366 # define H8(field) do { ASSERT(sizeof(h->field)==1); __h.field = h->field; } while (0)
367 # define H16(field) do { ASSERT(sizeof(h->field)==2); __h.field = ntohs(h->field); } while (0)
368 # define _32(field) do { ASSERT(sizeof(h->field)==4); __h.field = (h->field); } while (0)
370 _32 (magic); /* no need to change endiannes - characters go in network-order always */
371 H8 (version);
372 H8 (__reserved_for_flags);
373 H16 (nframe);
374 H16 (nfragment);
375 H16 (fragments_total);
376 H16 (width);
377 H16 (height);
378 _32 (pixfmt); /* no need to change endiannes - characters go in network-order always */
379 H16 (frag_startline);
380 H16 (frag_nlines);
381 /* NOTE: don't forget padding __reserved[2] */
383 #undef H8
384 #undef H16
385 #undef _32
387 h = &__h;
388 #endif /* __BYTE_ORDER != __BIG_ENDIAN */
391 if (h->magic != MKTAG32('R','A','W','V'))
392 die("wrong magic (0x%08x)", h->magic);
394 if (h->version != 1)
395 die("unsupported version (%i)", h->version);
397 /* ignoring flags for now */
399 // fprintf(stderr, "nframe: %i, frag: %i / %i\r", h->nframe, h->nfragment, h->fragments_total);
401 if (h->nframe != nframe) {
402 if (abs(h->nframe - nframe) > NFRAME_RESYNC) {
403 /* too big nframe jump means we have to re-sync this stream */
404 fprintf(stderr, "Resync...\n");
406 else {
407 if ((h->nframe - nframe) % NFRAME_MAX < NFRAME_MAX/2)
408 /* newer frame */;
409 else {
410 /* old-frame fragment. Too late, sorry */
411 fprintf(stderr, "old (nframe=%i frag: %i / %i)\n",
412 h->nframe, h->nfragment, h->fragments_total);
413 return;
416 /* start of new frame. We have to flush current one first */
417 if (f.pixfmt_4cc)
418 flush_frame();
420 //fprintf(stderr, "\n F-%i", h->nframe);
423 //fprintf(stderr, " %i", h->nfragment);
425 switch (h->pixfmt) {
426 case MKTAG32('Y','U','Y','V'):
427 case MKTAG32('Y','U','Y','2'):
428 pixsize = 2;
429 break;
431 case MKTAG32('Y','8','0','0'):
432 case MKTAG32('Y','8',' ',' '):
433 case MKTAG32('G','R','E','Y'):
434 pixsize = 1;
435 break;
437 default:
438 pixsize = 0; // make gcc happy
439 die("recv: W: unsupported pixfmt 0x%08x", h->pixfmt);
442 /* ensure we have enough data in payload */
443 fragsize = pixsize * h->width * h->frag_nlines;
444 if (len < sizeof(h) + fragsize)
445 die("recv: len(pkt) < header + fragsize (%i < %i + %i)",
446 len, sizeof(h), fragsize);
448 /* ensure header is self-consistent */
449 if (h->frag_startline + h->frag_nlines > h->height)
450 die("recv: frag_startline + frag_nlines > height (%i + %i > %i)",
451 h->frag_startline, h->frag_nlines, h->height);
453 if (!f.pixfmt_4cc) {
454 /* first fragment of a new frame */
455 f.pixfmt_4cc = h->pixfmt;
456 f.width = h->width;
457 f.height = h->height;
459 nframe = h->nframe;
460 nfragment = h->nfragment;
461 fragments_total = h->fragments_total;
463 f.sequence = nframe;
466 * upon receiving first fragment, setup frame buffer memory, either in
467 * one of subscribers or in our ->framebuf[].
469 f.start = NULL;
470 f.length = 0;
471 f.bytesperline = 0;
473 ok = query_sink_framebuf(&f);
476 * no one from subscriber provided frame buffer - we will use our own
477 * ->framebuf[]
479 if (!ok) {
480 /* reallocate framebuf if needed */
481 framesize = pixsize * h->width * h->height;
482 if (framebuf.size() < framesize)
483 framebuf.resize(framesize);
485 f.bytesperline = h->width * pixsize;
487 f.start = &framebuf[0];
488 f.length = f.height * f.bytesperline;
491 else {
492 /* check that frame parameters has not changed */
493 if ( !(fragments_total == h->fragments_total &&
494 f.pixfmt_4cc == h->pixfmt &&
495 f.width == h->width &&
496 f.height == h->height)
498 die("recv: frame params changed inside one frame: "
499 "(%iF 0x%08x %ix%i) and (%iF 0x%08x %ix%i)",
500 fragments_total, f.pixfmt_4cc, f.width, f.height,
501 h->fragments_total, h->pixfmt, h->width, h->height);
504 /* this fragment ok */
505 frag_received += 1;
508 /* put fragment video data into frame */
509 fragstride = h->width * pixsize;
510 if (f.bytesperline == fragstride) {
511 /* special case -- framebuf is packed, so we can use one large memcpy */
512 memcpy(f.start + h->frag_startline * f.bytesperline,
513 &fragbuf[0] + sizeof(*h),
514 fragsize);
516 else {
517 for (i=0; i<h->frag_nlines; ++i)
518 memcpy(f.start + (h->frag_startline + i) * f.bytesperline,
519 &fragbuf[0] + sizeof(*h) + i*fragstride,
520 fragstride);
525 /* whole-frame received detection.
527 * TODO this can be reworked so we also support intra-frame fragments *
528 * reordering on the wire.
530 if (h->frag_startline + h->frag_nlines == h->height)
531 flush_frame();
535 void NetRx::flush_frame()
537 unsigned dropped;
539 /* flush currently-accumulated frame to subscribers */
540 notify_v_subscribers(&f);
542 /* update loss statistic */
543 dropped = (fragments_total - frag_received);
544 frag_dropped_total += dropped;
545 if (dropped)
546 fprintf(stderr, "Dropped %u fragments\n", dropped);
548 /* bump sequence number */
549 nframe += 1;
550 nfragment = 0;
551 fragments_total = 0;
552 frag_received = 0;
554 /* and clear current frame */
555 f.start = NULL; /* XXX ok ? */
556 f.length = 0;
557 f.width = f.height = f.bytesperline = 0;
558 f.pixfmt_4cc = 0;
559 f.sequence = 0;
562 } // rawv::