1 /* network submodule for rawv
2 * Copyright (C) 2010,2011,2012 Kirill Smelkov <kirr@navytux.spb.ru>
3 * Copyright (C) 2011 Marine Bridge and Navigation Systems (http://mns.spb.ru/)
5 * This library is free software: you can Use, Study, Modify and Redistribute
6 * it under the terms of the GNU Lesser General Public License version 2.1, or
7 * any later version. This library is distributed WITHOUT ANY WARRANTY. See
8 * COPYING.LIB file for full License terms.
24 /******** NetTx ********/
26 /** whether an `addr` is IPv4 multicast */
27 int in_multicast(const struct sockaddr_in
*addr
)
29 /* class D 224.0.0.0 - 239.255.255.255 */
30 return ((ntohl(addr
->sin_addr
.s_addr
) & 0xf0000000) == 0xe0000000);
33 /** whether an `addr` is IPv4 broadcast
35 * http://en.wikipedia.org/wiki/IPv4_subnetting_reference
37 int in_broadcast(const struct sockaddr_in
*addr
)
39 unsigned long ip
= ntohl(addr
->sin_addr
.s_addr
);
41 /* class A 0.0.0.0 - 127.255.255.255 /8 */
42 if ((ip
& 0x80000000) == 0x00000000)
43 return (ip
& 0x00ffffff) == 0x00ffffff;
45 /* class B 128.0.0.0 - 191.255.255.255 /16 */
46 if ((ip
& 0xc0000000) == 0x80000000)
47 return (ip
& 0x0000ffff) == 0x0000ffff;
49 /* class C 192.0.0.0 - 223.255.255.255 /24 */
50 if ((ip
& 0xe0000000) == 0xc0000000)
51 return (ip
& 0x000000ff) == 0x000000ff;
57 NetTx::NetTx(const char *dest
, int port
, int mtu
)
59 int one
=1; /* XXX has to be uint8_t on win32 */
61 sk
= socket(PF_INET
, SOCK_DGRAM
, 0);
65 tx_addr
.sin_family
= AF_INET
;
66 tx_addr
.sin_port
= htons(port
);
67 tx_addr
.sin_addr
.s_addr
= inet_addr(dest
);
68 if (tx_addr
.sin_addr
.s_addr
== INADDR_NONE
)
69 die("E: inet_addr(%s) fail", dest
);
71 if (in_broadcast(&tx_addr
)) {
72 fprintf(stderr
, "tx: broadcasting...\n");
74 if (setsockopt(sk
, SOL_SOCKET
, SO_BROADCAST
, &one
, sizeof(one
)) == -1)
75 die_errno("setsockopt(SO_BROADCAST)");
78 if (in_multicast(&tx_addr
)) {
79 fprintf(stderr
, "tx: multicasting...\n");
81 if (setsockopt(sk
, IPPROTO_IP
, IP_MULTICAST_TTL
, &one
, sizeof(one
)) == -1)
82 die_errno("setsockopt(IP_MULTICAST_TTL)");
84 if (setsockopt(sk
, IPPROTO_IP
, IP_MULTICAST_LOOP
, &one
, sizeof(one
)) == -1)
85 die_errno("setsockopt(IP_MULTICAST_LOOP)");
98 bool NetTx::v_query_framebuf(Frame
*f
)
100 /* always use source's buffer - we'll transmit frames directly from it */
105 /* upon receiving new frame tx it to network */
106 void NetTx::v_on_frame(const Frame
*f
)
108 #define RAWV_MAX_FRAG_NLINES 16 /* there is no limit, it's just me being lazy */
109 struct iovec iov
[1+RAWV_MAX_FRAG_NLINES
]; /* header + payload */
113 int pixsize
, frag_nlines
, nfragment
, line
, i
;
114 int fheight
= f
->height
;
115 int bt
= f
->interlace_tb_swapped
;
117 struct rawv_header h
;
119 switch (f
->pixfmt_4cc
) {
120 case MKTAG32('Y','U','Y','V'):
121 case MKTAG32('Y','U','Y','2'):
128 die("tx_frame: don't know pixfmt 0x%08x", f
->pixfmt_4cc
);
131 frag_nlines
= (mtu
- sizeof(struct rawv_header
)) / (f
->width
* pixsize
);
134 die("TODO bt=-1 in NetTx");
136 /* make frag_nlines and fheight even, if lines are swapped in captured frame */
141 if (frag_nlines
> RAWV_MAX_FRAG_NLINES
)
142 die("FIXME frag_nlines > RAWV_MAX_FRAG_NLINES (%i > %i)", frag_nlines
, RAWV_MAX_FRAG_NLINES
);
146 for (line
=0; line
<fheight
; line
+= frag_nlines
, nfragment
++) {
147 h
.magic
= MKTAG32('R','A','W','V');
149 h
.__reserved_for_flags
= 0x00;
150 h
.nframe
= htons(f
->sequence
);
151 h
.nfragment
= htons(nfragment
);
152 h
.fragments_total
= htons((fheight
+ frag_nlines
-1) / frag_nlines
);
153 h
.width
= htons(f
->width
);
154 h
.height
= htons(fheight
);
155 h
.pixfmt
= f
->pixfmt_4cc
;
157 h
.frag_startline
= htons(line
);
158 h
.frag_nlines
= htons(min(frag_nlines
, fheight
- line
));
160 /* temp: helps debugging bitstream on the wire */
161 h
.__reserved
[0] = 0xaaaaaaaa;
162 h
.__reserved
[1] = 0xffffffff;
164 iov
[0].iov_base
= &h
;
165 iov
[0].iov_len
= sizeof(h
);
168 if ((f
->width
* pixsize
== f
->bytesperline
) &&
170 /* fastpath for stride=width, progressive */
171 iov
[iovlen
].iov_base
= f
->start
+ line
*f
->bytesperline
;
172 iov
[iovlen
].iov_len
= frag_nlines
*f
->bytesperline
;
177 /* strided and/or top/bottom swapped frame */
178 for (i
=0; i
<frag_nlines
; ++i
) {
179 iov
[iovlen
].iov_base
= f
->start
+ (line
+i
+bt
)*f
->bytesperline
;
180 iov
[iovlen
].iov_len
= f
->width
* pixsize
;
187 msg
.msg_name
= &tx_addr
;
188 msg
.msg_namelen
= sizeof(tx_addr
);
191 msg
.msg_iovlen
= iovlen
;
193 msg
.msg_control
= NULL
;
194 msg
.msg_controllen
= 0;
197 /* XXX what to do if not whole pkt sent? */
198 if (-1 == sendmsg(sk
, &msg
, 0 /*flags*/))
199 die_errno("sendmsg");
205 /******** NetRx ********/
207 /* recommended minimum for socket rx buffer */
208 #define SK_RCVBUF_MIN (1024*1024)
210 NetRx::NetRx(const char *mcast_group
, int port
, int mtu
)
212 int one
=1; /* XXX has to be uint8_t on win32 */
214 socklen_t rcvbuf_len
;
215 struct ip_mreqn mreq
;
217 sk
= socket(PF_INET
, SOCK_DGRAM
, 0);
221 /* ensure rx buffer is big enough */
222 rcvbuf_len
= sizeof(rcvbuf
);
223 if (-1 == getsockopt(sk
, SOL_SOCKET
, SO_RCVBUF
, &rcvbuf
, &rcvbuf_len
))
224 die_errno("getsockopt(SO_RCVBUF)");
226 if (rcvbuf
< SK_RCVBUF_MIN
) {
227 rcvbuf
= SK_RCVBUF_MIN
;
229 if (-1 == setsockopt(sk
, SOL_SOCKET
, SO_RCVBUF
, &rcvbuf
, sizeof(rcvbuf
)))
230 warn("W: can't increase sk rx buffer to %i bytes\n", rcvbuf
);
232 rcvbuf_len
= sizeof(rcvbuf
);
233 if (-1 == getsockopt(sk
, SOL_SOCKET
, SO_RCVBUF
, &rcvbuf
, &rcvbuf_len
))
234 die_errno("getsockopt(SO_RCVBUF)");
236 if (rcvbuf
< SK_RCVBUF_MIN
)
237 warn("W: can't increase sk rx buffer to %i bytes (got only %i)\n",
238 SK_RCVBUF_MIN
, rcvbuf
);
242 /* allow multiple receivers */
243 if (-1 == setsockopt(sk
, SOL_SOCKET
, SO_REUSEADDR
, &one
, sizeof(one
)))
244 die_errno("setsockopt(SO_REUSEADDR)");
246 rx_addr
.sin_family
= AF_INET
;
247 rx_addr
.sin_port
= htons(port
);
248 rx_addr
.sin_addr
.s_addr
= INADDR_ANY
;
250 if (-1 == bind(sk
, (sockaddr
*)&rx_addr
, sizeof(rx_addr
)))
253 // XXX is_broadcast ?
254 if (mcast_group
) { // XXX or is_multicast(mcast_group) ?
255 mreq
.imr_multiaddr
.s_addr
= inet_addr(mcast_group
);
256 mreq
.imr_address
.s_addr
= INADDR_ANY
;
257 mreq
.imr_ifindex
= 0; /* any */
259 if (mreq
.imr_multiaddr
.s_addr
== INADDR_NONE
)
260 die("E: inet_addr(%s) fail", mcast_group
);
262 if (-1 == setsockopt(sk
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, &mreq
, sizeof(mreq
)))
263 die_errno("setsockopt(IP_ADD_MEMBERSHIP)");
267 /* set socket non-blocking, so that we could avoid tons of select and just
268 * loop on read until EAGAIN
270 * FIXME does not work as expected - read comments below near recv()
272 if (-1 == fcntl(sk
, F_SETFL
, O_NONBLOCK
))
273 die_errno("fcntl(F_SETFL, O_NONBLOCK)");
280 f
.width
= f
.height
= f
.bytesperline
= 0;
283 f
.interlace_tb_swapped
= 0; /* always progressive on the wire */
285 nframe
= 0; // XXX better -1 ?
289 frag_dropped_total
= 0;
295 close(sk
); /* this will also leave multicast group */
299 void NetRx::v_start_capture()
301 die("TODO NetRx::v_start_capture()");
305 void NetRx::v_stop_capture()
307 die("TODO NetRx::v_stop_capture()");
311 int NetRx::handle_recv()
316 /* loop till we get EAGAIN. This way we avoid lots of unneeded select
319 * FIXME does not work as expected: Under load we seldomly fetch more than
320 * a couple of packets in one burst, and so go back to select, back to
321 * here, back to select, back here, etc...
324 err
= recv(sk
, &fragbuf
[0], fragbuf
.size(), /*flags*/ 0);
328 //fprintf(stderr, "Looped %i\n", count);
341 #define NFRAME_RESYNC 100
342 #define NFRAME_MAX (1 << (sizeof ( ((struct rawv_header *)0) ->nframe) *8))
344 void NetRx::__handle_recv(unsigned len
)
346 struct rawv_header
*h
;
347 unsigned pixsize
, fragsize
, framesize
, i
;
351 /* TODO die is not appropriate on RX path */
354 die("recv: len(pkt) < header (%i < %i)", len
, sizeof(h
));
356 h
= (struct rawv_header
*)&fragbuf
[0];
357 #if __BYTE_ORDER != __BIG_ENDIAN
358 /* decode header coming from the wire
360 * we access header fields more than once, so it makes sense to first
361 * decode them all in one go and then use the result.
363 struct rawv_header __h
;
365 # define ASSERT(cond) do { (void) sizeof(char [1 - 2*!(cond)]); } while (0)
366 # define H8(field) do { ASSERT(sizeof(h->field)==1); __h.field = h->field; } while (0)
367 # define H16(field) do { ASSERT(sizeof(h->field)==2); __h.field = ntohs(h->field); } while (0)
368 # define _32(field) do { ASSERT(sizeof(h->field)==4); __h.field = (h->field); } while (0)
370 _32 (magic
); /* no need to change endiannes - characters go in network-order always */
372 H8 (__reserved_for_flags
);
375 H16 (fragments_total
);
378 _32 (pixfmt
); /* no need to change endiannes - characters go in network-order always */
379 H16 (frag_startline
);
381 /* NOTE: don't forget padding __reserved[2] */
388 #endif /* __BYTE_ORDER != __BIG_ENDIAN */
391 if (h
->magic
!= MKTAG32('R','A','W','V'))
392 die("wrong magic (0x%08x)", h
->magic
);
395 die("unsupported version (%i)", h
->version
);
397 /* ignoring flags for now */
399 // fprintf(stderr, "nframe: %i, frag: %i / %i\r", h->nframe, h->nfragment, h->fragments_total);
401 if (h
->nframe
!= nframe
) {
402 if (abs(h
->nframe
- nframe
) > NFRAME_RESYNC
) {
403 /* too big nframe jump means we have to re-sync this stream */
404 fprintf(stderr
, "Resync...\n");
407 if ((h
->nframe
- nframe
) % NFRAME_MAX
< NFRAME_MAX
/2)
410 /* old-frame fragment. Too late, sorry */
411 fprintf(stderr
, "old (nframe=%i frag: %i / %i)\n",
412 h
->nframe
, h
->nfragment
, h
->fragments_total
);
416 /* start of new frame. We have to flush current one first */
420 //fprintf(stderr, "\n F-%i", h->nframe);
423 //fprintf(stderr, " %i", h->nfragment);
426 case MKTAG32('Y','U','Y','V'):
427 case MKTAG32('Y','U','Y','2'):
431 case MKTAG32('Y','8','0','0'):
432 case MKTAG32('Y','8',' ',' '):
433 case MKTAG32('G','R','E','Y'):
438 pixsize
= 0; // make gcc happy
439 die("recv: W: unsupported pixfmt 0x%08x", h
->pixfmt
);
442 /* ensure we have enough data in payload */
443 fragsize
= pixsize
* h
->width
* h
->frag_nlines
;
444 if (len
< sizeof(h
) + fragsize
)
445 die("recv: len(pkt) < header + fragsize (%i < %i + %i)",
446 len
, sizeof(h
), fragsize
);
448 /* ensure header is self-consistent */
449 if (h
->frag_startline
+ h
->frag_nlines
> h
->height
)
450 die("recv: frag_startline + frag_nlines > height (%i + %i > %i)",
451 h
->frag_startline
, h
->frag_nlines
, h
->height
);
454 /* first fragment of a new frame */
455 f
.pixfmt_4cc
= h
->pixfmt
;
457 f
.height
= h
->height
;
460 nfragment
= h
->nfragment
;
461 fragments_total
= h
->fragments_total
;
466 * upon receiving first fragment, setup frame buffer memory, either in
467 * one of subscribers or in our ->framebuf[].
473 ok
= query_sink_framebuf(&f
);
476 * no one from subscriber provided frame buffer - we will use our own
480 /* reallocate framebuf if needed */
481 framesize
= pixsize
* h
->width
* h
->height
;
482 if (framebuf
.size() < framesize
)
483 framebuf
.resize(framesize
);
485 f
.bytesperline
= h
->width
* pixsize
;
487 f
.start
= &framebuf
[0];
488 f
.length
= f
.height
* f
.bytesperline
;
492 /* check that frame parameters has not changed */
493 if ( !(fragments_total
== h
->fragments_total
&&
494 f
.pixfmt_4cc
== h
->pixfmt
&&
495 f
.width
== h
->width
&&
496 f
.height
== h
->height
)
498 die("recv: frame params changed inside one frame: "
499 "(%iF 0x%08x %ix%i) and (%iF 0x%08x %ix%i)",
500 fragments_total
, f
.pixfmt_4cc
, f
.width
, f
.height
,
501 h
->fragments_total
, h
->pixfmt
, h
->width
, h
->height
);
504 /* this fragment ok */
508 /* put fragment video data into frame */
509 fragstride
= h
->width
* pixsize
;
510 if (f
.bytesperline
== fragstride
) {
511 /* special case -- framebuf is packed, so we can use one large memcpy */
512 memcpy(f
.start
+ h
->frag_startline
* f
.bytesperline
,
513 &fragbuf
[0] + sizeof(*h
),
517 for (i
=0; i
<h
->frag_nlines
; ++i
)
518 memcpy(f
.start
+ (h
->frag_startline
+ i
) * f
.bytesperline
,
519 &fragbuf
[0] + sizeof(*h
) + i
*fragstride
,
525 /* whole-frame received detection.
527 * TODO this can be reworked so we also support intra-frame fragments *
528 * reordering on the wire.
530 if (h
->frag_startline
+ h
->frag_nlines
== h
->height
)
535 void NetRx::flush_frame()
539 /* flush currently-accumulated frame to subscribers */
540 notify_v_subscribers(&f
);
542 /* update loss statistic */
543 dropped
= (fragments_total
- frag_received
);
544 frag_dropped_total
+= dropped
;
546 fprintf(stderr
, "Dropped %u fragments\n", dropped
);
548 /* bump sequence number */
554 /* and clear current frame */
555 f
.start
= NULL
; /* XXX ok ? */
557 f
.width
= f
.height
= f
.bytesperline
= 0;