Merge branch 'nto-signal-stats'
[dvblast.git] / udp.c
bloba603d1d1af0335eb6e75a084258aac3b40179aab
/*****************************************************************************
 * udp.c: UDP input for DVBlast
 *****************************************************************************
 * Copyright (C) 2009, 2015, 2020 VideoLAN
 *
 * Authors: Christophe Massiot <massiot@via.ecp.fr>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <unistd.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <sys/uio.h>
33 #include <sys/poll.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <net/if.h>
38 #include <arpa/inet.h>
39 #include <errno.h>
41 #include <ev.h>
43 #include <bitstream/common.h>
44 #include <bitstream/ietf/rtp.h>
46 #include "dvblast.h"
48 /*****************************************************************************
49 * Local declarations
50 *****************************************************************************/
51 #define PRINT_REFRACTORY_PERIOD 1000000 /* 1 s */
53 static int i_handle;
54 static struct ev_io udp_watcher;
55 static struct ev_timer mute_watcher;
56 static bool b_udp = false;
57 static int i_block_cnt;
58 static uint8_t pi_ssrc[4] = { 0, 0, 0, 0 };
59 static uint16_t i_seqnum = 0;
60 static bool b_sync = false;
61 static mtime_t i_last_print = 0;
62 static struct sockaddr_storage last_addr;
64 /*****************************************************************************
65 * Local prototypes
66 *****************************************************************************/
67 static void udp_Read(struct ev_loop *loop, struct ev_io *w, int revents);
68 static void udp_MuteCb(struct ev_loop *loop, struct ev_timer *w, int revents);
70 /*****************************************************************************
71 * udp_Open
72 *****************************************************************************/
73 void udp_Open( void )
75 int i_family;
76 struct addrinfo *p_connect_ai = NULL, *p_bind_ai;
77 int i_if_index = 0;
78 in_addr_t i_if_addr = INADDR_ANY;
79 int i_mtu = 0;
80 char *psz_ifname = NULL;
82 char *psz_bind, *psz_string = strdup( psz_udp_src );
83 char *psz_save = psz_string;
84 int i = 1;
86 /* Parse configuration. */
88 if ( (psz_bind = strchr( psz_string, '@' )) != NULL )
90 *psz_bind++ = '\0';
91 p_connect_ai = ParseNodeService( psz_string, NULL, 0 );
93 else
94 psz_bind = psz_string;
96 p_bind_ai = ParseNodeService( psz_bind, &psz_string, DEFAULT_PORT );
97 if ( p_bind_ai == NULL )
99 msg_Err( NULL, "couldn't parse %s", psz_bind );
100 exit(EXIT_FAILURE);
102 i_family = p_bind_ai->ai_family;
104 if ( p_connect_ai != NULL && p_connect_ai->ai_family != i_family )
106 msg_Warn( NULL, "invalid connect address" );
107 freeaddrinfo( p_connect_ai );
108 p_connect_ai = NULL;
111 while ( (psz_string = strchr( psz_string, '/' )) != NULL )
113 *psz_string++ = '\0';
115 #define IS_OPTION( option ) (!strncasecmp( psz_string, option, strlen(option) ))
116 #define ARG_OPTION( option ) (psz_string + strlen(option))
118 if ( IS_OPTION("udp") )
119 b_udp = true;
120 else if ( IS_OPTION("mtu=") )
121 i_mtu = strtol( ARG_OPTION("mtu="), NULL, 0 );
122 else if ( IS_OPTION("ifindex=") )
123 i_if_index = strtol( ARG_OPTION("ifindex="), NULL, 0 );
124 else if ( IS_OPTION("ifaddr=") ) {
125 char *option = config_stropt( ARG_OPTION("ifaddr=") );
126 i_if_addr = inet_addr( option );
127 free( option );
129 else if ( IS_OPTION("ifname=") )
131 psz_ifname = config_stropt( ARG_OPTION("ifname=") );
132 if (strlen(psz_ifname) >= IFNAMSIZ) {
133 psz_ifname[IFNAMSIZ-1] = '\0';
135 } else
136 msg_Warn( NULL, "unrecognized option %s", psz_string );
138 #undef IS_OPTION
139 #undef ARG_OPTION
142 if ( !i_mtu )
143 i_mtu = i_family == AF_INET6 ? DEFAULT_IPV6_MTU : DEFAULT_IPV4_MTU;
144 i_block_cnt = (i_mtu - (b_udp ? 0 : RTP_HEADER_SIZE)) / TS_SIZE;
147 /* Do stuff. */
149 if ( (i_handle = socket( i_family, SOCK_DGRAM, IPPROTO_UDP )) < 0 )
151 msg_Err( NULL, "couldn't create socket (%s)", strerror(errno) );
152 exit(EXIT_FAILURE);
155 setsockopt( i_handle, SOL_SOCKET, SO_REUSEADDR, (void *) &i, sizeof( i ) );
157 /* Increase the receive buffer size to 1/2MB (8Mb/s during 1/2s) to avoid
158 * packet loss caused by scheduling problems */
159 i = 0x80000;
161 setsockopt( i_handle, SOL_SOCKET, SO_RCVBUF, (void *) &i, sizeof( i ) );
163 if ( bind( i_handle, p_bind_ai->ai_addr, p_bind_ai->ai_addrlen ) < 0 )
165 msg_Err( NULL, "couldn't bind (%s)", strerror(errno) );
166 close( i_handle );
167 exit(EXIT_FAILURE);
170 if ( p_connect_ai != NULL )
172 uint16_t i_port;
173 if ( i_family == AF_INET6 )
174 i_port = ((struct sockaddr_in6 *)p_connect_ai->ai_addr)->sin6_port;
175 else
176 i_port = ((struct sockaddr_in *)p_connect_ai->ai_addr)->sin_port;
178 if ( i_port != 0 && connect( i_handle, p_connect_ai->ai_addr,
179 p_connect_ai->ai_addrlen ) < 0 )
180 msg_Warn( NULL, "couldn't connect socket (%s)", strerror(errno) );
183 /* Join the multicast group if the socket is a multicast address */
184 if ( i_family == AF_INET6 )
186 struct sockaddr_in6 *p_addr =
187 (struct sockaddr_in6 *)p_bind_ai->ai_addr;
188 if ( IN6_IS_ADDR_MULTICAST( &p_addr->sin6_addr ) )
190 struct ipv6_mreq imr;
191 imr.ipv6mr_multiaddr = p_addr->sin6_addr;
192 imr.ipv6mr_interface = i_if_index;
193 if ( i_if_addr != INADDR_ANY )
194 msg_Warn( NULL, "ignoring ifaddr option in IPv6" );
196 if ( setsockopt( i_handle, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
197 (char *)&imr, sizeof(struct ipv6_mreq) ) < 0 )
198 msg_Warn( NULL, "couldn't join multicast group (%s)",
199 strerror(errno) );
202 else
204 struct sockaddr_in *p_addr =
205 (struct sockaddr_in *)p_bind_ai->ai_addr;
206 if ( IN_MULTICAST( ntohl(p_addr->sin_addr.s_addr)) )
208 if ( p_connect_ai != NULL )
210 #ifndef IP_ADD_SOURCE_MEMBERSHIP
211 msg_Err( NULL, "IP_ADD_SOURCE_MEMBERSHIP is unsupported." );
212 #else
213 /* Source-specific multicast */
214 struct sockaddr *p_src = p_connect_ai->ai_addr;
215 struct ip_mreq_source imr;
216 imr.imr_multiaddr = p_addr->sin_addr;
217 imr.imr_interface.s_addr = i_if_addr;
218 imr.imr_sourceaddr = ((struct sockaddr_in *)p_src)->sin_addr;
219 if ( i_if_index )
220 msg_Warn( NULL, "ignoring ifindex option in SSM" );
222 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP,
223 (char *)&imr, sizeof(struct ip_mreq_source) ) < 0 )
224 msg_Warn( NULL, "couldn't join multicast group (%s)",
225 strerror(errno) );
226 #endif
228 else if ( i_if_index )
230 /* Linux-specific interface-bound multicast */
231 struct ip_mreqn imr;
232 imr.imr_multiaddr = p_addr->sin_addr;
233 #if defined(__linux__)
234 imr.imr_address.s_addr = i_if_addr;
235 imr.imr_ifindex = i_if_index;
236 #endif
238 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_MEMBERSHIP,
239 (char *)&imr, sizeof(struct ip_mreqn) ) < 0 )
240 msg_Warn( NULL, "couldn't join multicast group (%s)",
241 strerror(errno) );
243 else
245 /* Regular multicast */
246 struct ip_mreq imr;
247 imr.imr_multiaddr = p_addr->sin_addr;
248 imr.imr_interface.s_addr = i_if_addr;
250 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_MEMBERSHIP,
251 (char *)&imr, sizeof(struct ip_mreq) ) == -1 )
252 msg_Warn( NULL, "couldn't join multicast group (%s)",
253 strerror(errno) );
255 #ifdef SO_BINDTODEVICE
256 if (psz_ifname) {
257 if ( setsockopt( i_handle, SOL_SOCKET, SO_BINDTODEVICE,
258 psz_ifname, strlen(psz_ifname)+1 ) < 0 ) {
259 msg_Err( NULL, "couldn't bind to device %s (%s)",
260 psz_ifname, strerror(errno) );
262 free(psz_ifname);
263 psz_ifname = NULL;
265 #endif
269 freeaddrinfo( p_bind_ai );
270 if ( p_connect_ai != NULL )
271 freeaddrinfo( p_connect_ai );
272 free( psz_save );
274 msg_Dbg( NULL, "binding socket to %s", psz_udp_src );
276 ev_io_init(&udp_watcher, udp_Read, i_handle, EV_READ);
277 ev_io_start(event_loop, &udp_watcher);
279 ev_timer_init(&mute_watcher, udp_MuteCb,
280 i_udp_lock_timeout / 1000000., i_udp_lock_timeout / 1000000.);
281 memset(&last_addr, 0, sizeof(last_addr));
284 /*****************************************************************************
285 * UDP events
286 *****************************************************************************/
287 static void udp_Read(struct ev_loop *loop, struct ev_io *w, int revents)
289 i_wallclock = mdate();
290 if ( i_last_print + PRINT_REFRACTORY_PERIOD < i_wallclock )
292 i_last_print = i_wallclock;
294 struct sockaddr_storage addr;
295 struct msghdr mh = {
296 .msg_name = &addr,
297 .msg_namelen = sizeof(addr),
298 .msg_iov = NULL,
299 .msg_iovlen = 0,
300 .msg_control = NULL,
301 .msg_controllen = 0,
302 .msg_flags = 0
304 if ( recvmsg( i_handle, &mh, MSG_DONTWAIT | MSG_PEEK ) != -1 &&
305 mh.msg_namelen >= sizeof(struct sockaddr) )
307 char psz_addr[256], psz_port[42];
308 if ( memcmp( &addr, &last_addr, mh.msg_namelen ) &&
309 getnameinfo( (const struct sockaddr *)&addr, mh.msg_namelen,
310 psz_addr, sizeof(psz_addr), psz_port, sizeof(psz_port),
311 NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV ) == 0 )
313 memcpy( &last_addr, &addr, mh.msg_namelen );
315 msg_Info( NULL, "source: %s:%s", psz_addr, psz_port );
316 switch (i_print_type) {
317 case PRINT_XML:
318 fprintf(print_fh, "<STATUS type=\"source\" address=\"%s\" port=\"%s\"/>\n", psz_addr, psz_port);
319 break;
320 case PRINT_TEXT:
321 fprintf(print_fh, "source status: %s:%s\n", psz_addr, psz_port);
322 break;
323 default:
324 break;
330 struct iovec p_iov[i_block_cnt + 1];
331 block_t *p_ts, **pp_current = &p_ts;
332 int i_iov, i_block;
333 ssize_t i_len;
334 uint8_t p_rtp_hdr[RTP_HEADER_SIZE];
336 if ( !b_udp )
338 /* FIXME : this is wrong if RTP header > 12 bytes */
339 p_iov[0].iov_base = p_rtp_hdr;
340 p_iov[0].iov_len = RTP_HEADER_SIZE;
341 i_iov = 1;
343 else
344 i_iov = 0;
346 for ( i_block = 0; i_block < i_block_cnt; i_block++ )
348 *pp_current = block_New();
349 p_iov[i_iov].iov_base = (*pp_current)->p_ts;
350 p_iov[i_iov].iov_len = TS_SIZE;
351 pp_current = &(*pp_current)->p_next;
352 i_iov++;
354 pp_current = &p_ts;
356 if ( (i_len = readv( i_handle, p_iov, i_iov )) < 0 )
358 msg_Err( NULL, "couldn't read from network (%s)", strerror(errno) );
359 goto err;
362 if ( !b_udp )
364 uint8_t pi_new_ssrc[4];
366 if ( !rtp_check_hdr(p_rtp_hdr) )
367 msg_Warn( NULL, "invalid RTP packet received" );
368 if ( rtp_get_type(p_rtp_hdr) != RTP_TYPE_TS )
369 msg_Warn( NULL, "non-TS RTP packet received" );
370 rtp_get_ssrc(p_rtp_hdr, pi_new_ssrc);
371 if ( !memcmp( pi_ssrc, pi_new_ssrc, 4 * sizeof(uint8_t) ) )
373 if ( rtp_get_seqnum(p_rtp_hdr) != i_seqnum )
374 msg_Warn( NULL, "RTP discontinuity" );
376 else
378 struct in_addr addr;
379 memcpy( &addr.s_addr, pi_new_ssrc, 4 * sizeof(uint8_t) );
380 msg_Dbg( NULL, "new RTP source: %s", inet_ntoa( addr ) );
381 memcpy( pi_ssrc, pi_new_ssrc, 4 * sizeof(uint8_t) );
382 switch (i_print_type) {
383 case PRINT_XML:
384 fprintf(print_fh,
385 "<STATUS type=\"rtpsource\" source=\"%s\"/>\n",
386 inet_ntoa( addr ));
387 break;
388 case PRINT_TEXT:
389 fprintf(print_fh, "rtpsource: %s\n", inet_ntoa( addr ) );
390 break;
391 default:
392 break;
395 i_seqnum = rtp_get_seqnum(p_rtp_hdr) + 1;
397 i_len -= RTP_HEADER_SIZE;
400 i_len /= TS_SIZE;
402 if ( i_len )
404 if ( !b_sync )
406 msg_Info( NULL, "frontend has acquired lock" );
407 switch (i_print_type) {
408 case PRINT_XML:
409 fprintf(print_fh, "<STATUS type=\"lock\" status=\"1\"/>\n");
410 break;
411 case PRINT_TEXT:
412 fprintf(print_fh, "lock status: 1\n");
413 break;
414 default:
415 break;
418 b_sync = true;
421 ev_timer_again(loop, &mute_watcher);
424 while ( i_len && *pp_current )
426 pp_current = &(*pp_current)->p_next;
427 i_len--;
430 err:
431 block_DeleteChain( *pp_current );
432 *pp_current = NULL;
434 demux_Run( p_ts );
437 static void udp_MuteCb(struct ev_loop *loop, struct ev_timer *w, int revents)
439 msg_Warn( NULL, "frontend has lost lock" );
440 ev_timer_stop(loop, w);
442 switch (i_print_type) {
443 case PRINT_XML:
444 fprintf(print_fh, "<STATUS type=\"lock\" status=\"0\"/>\n");
445 break;
446 case PRINT_TEXT:
447 fprintf(print_fh, "lock status: 0\n" );
448 break;
449 default:
450 break;
453 b_sync = false;
/* From now on these are just stubs */
/*****************************************************************************
 * udp_SetFilter: stub; i_pid is ignored and -1 is always returned
 *****************************************************************************/
int udp_SetFilter( uint16_t i_pid )
{
    return -1;
}
/*****************************************************************************
 * udp_UnsetFilter: normally never called; stub that ignores both arguments
 * and does nothing
 *****************************************************************************/
void udp_UnsetFilter( int i_fd, uint16_t i_pid )
{
}
473 /*****************************************************************************
474 * udp_Reset:
475 *****************************************************************************/
476 void udp_Reset( void )