fix EIT schedule signaling in SDT
[dvblast.git] / udp.c
blob7945fca5acba50e54cfa5dfa6eef775be894417c
1 /*****************************************************************************
2 * udp.c: UDP input for DVBlast
3 *****************************************************************************
4 * Copyright (C) 2009, 2015 VideoLAN
6 * Authors: Christophe Massiot <massiot@via.ecp.fr>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
21 *****************************************************************************/
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <unistd.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <sys/uio.h>
33 #include <sys/poll.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <net/if.h>
38 #include <arpa/inet.h>
39 #include <errno.h>
41 #include <ev.h>
43 #include <bitstream/common.h>
44 #include <bitstream/ietf/rtp.h>
46 #include "dvblast.h"
48 /*****************************************************************************
49 * Local declarations
50 *****************************************************************************/
51 #define UDP_LOCK_TIMEOUT 5000000 /* 5 s */
52 #define PRINT_REFRACTORY_PERIOD 1000000 /* 1 s */
54 static int i_handle;
55 static struct ev_io udp_watcher;
56 static struct ev_timer mute_watcher;
57 static bool b_udp = false;
58 static int i_block_cnt;
59 static uint8_t pi_ssrc[4] = { 0, 0, 0, 0 };
60 static uint16_t i_seqnum = 0;
61 static bool b_sync = false;
62 static mtime_t i_last_print = 0;
63 static struct sockaddr_storage last_addr;
65 /*****************************************************************************
66 * Local prototypes
67 *****************************************************************************/
68 static void udp_Read(struct ev_loop *loop, struct ev_io *w, int revents);
69 static void udp_MuteCb(struct ev_loop *loop, struct ev_timer *w, int revents);
71 /*****************************************************************************
72 * udp_Open
73 *****************************************************************************/
74 void udp_Open( void )
76 int i_family;
77 struct addrinfo *p_connect_ai = NULL, *p_bind_ai;
78 int i_if_index = 0;
79 in_addr_t i_if_addr = INADDR_ANY;
80 int i_mtu = 0;
81 char *psz_ifname = NULL;
83 char *psz_bind, *psz_string = strdup( psz_udp_src );
84 char *psz_save = psz_string;
85 int i = 1;
87 /* Parse configuration. */
89 if ( (psz_bind = strchr( psz_string, '@' )) != NULL )
91 *psz_bind++ = '\0';
92 p_connect_ai = ParseNodeService( psz_string, NULL, 0 );
94 else
95 psz_bind = psz_string;
97 p_bind_ai = ParseNodeService( psz_bind, &psz_string, DEFAULT_PORT );
98 if ( p_bind_ai == NULL )
100 msg_Err( NULL, "couldn't parse %s", psz_bind );
101 exit(EXIT_FAILURE);
103 i_family = p_bind_ai->ai_family;
105 if ( p_connect_ai != NULL && p_connect_ai->ai_family != i_family )
107 msg_Warn( NULL, "invalid connect address" );
108 freeaddrinfo( p_connect_ai );
109 p_connect_ai = NULL;
112 while ( (psz_string = strchr( psz_string, '/' )) != NULL )
114 *psz_string++ = '\0';
116 #define IS_OPTION( option ) (!strncasecmp( psz_string, option, strlen(option) ))
117 #define ARG_OPTION( option ) (psz_string + strlen(option))
119 if ( IS_OPTION("udp") )
120 b_udp = true;
121 else if ( IS_OPTION("mtu=") )
122 i_mtu = strtol( ARG_OPTION("mtu="), NULL, 0 );
123 else if ( IS_OPTION("ifindex=") )
124 i_if_index = strtol( ARG_OPTION("ifindex="), NULL, 0 );
125 else if ( IS_OPTION("ifaddr=") ) {
126 char *option = config_stropt( ARG_OPTION("ifaddr=") );
127 i_if_addr = inet_addr( option );
128 free( option );
130 else if ( IS_OPTION("ifname=") )
132 psz_ifname = config_stropt( ARG_OPTION("ifname=") );
133 if (strlen(psz_ifname) >= IFNAMSIZ) {
134 psz_ifname[IFNAMSIZ-1] = '\0';
136 } else
137 msg_Warn( NULL, "unrecognized option %s", psz_string );
139 #undef IS_OPTION
140 #undef ARG_OPTION
143 if ( !i_mtu )
144 i_mtu = i_family == AF_INET6 ? DEFAULT_IPV6_MTU : DEFAULT_IPV4_MTU;
145 i_block_cnt = (i_mtu - (b_udp ? 0 : RTP_HEADER_SIZE)) / TS_SIZE;
148 /* Do stuff. */
150 if ( (i_handle = socket( i_family, SOCK_DGRAM, IPPROTO_UDP )) < 0 )
152 msg_Err( NULL, "couldn't create socket (%s)", strerror(errno) );
153 exit(EXIT_FAILURE);
156 setsockopt( i_handle, SOL_SOCKET, SO_REUSEADDR, (void *) &i, sizeof( i ) );
158 /* Increase the receive buffer size to 1/2MB (8Mb/s during 1/2s) to avoid
159 * packet loss caused by scheduling problems */
160 i = 0x80000;
162 setsockopt( i_handle, SOL_SOCKET, SO_RCVBUF, (void *) &i, sizeof( i ) );
164 if ( bind( i_handle, p_bind_ai->ai_addr, p_bind_ai->ai_addrlen ) < 0 )
166 msg_Err( NULL, "couldn't bind (%s)", strerror(errno) );
167 close( i_handle );
168 exit(EXIT_FAILURE);
171 if ( p_connect_ai != NULL )
173 uint16_t i_port;
174 if ( i_family == AF_INET6 )
175 i_port = ((struct sockaddr_in6 *)p_connect_ai->ai_addr)->sin6_port;
176 else
177 i_port = ((struct sockaddr_in *)p_connect_ai->ai_addr)->sin_port;
179 if ( i_port != 0 && connect( i_handle, p_connect_ai->ai_addr,
180 p_connect_ai->ai_addrlen ) < 0 )
181 msg_Warn( NULL, "couldn't connect socket (%s)", strerror(errno) );
184 /* Join the multicast group if the socket is a multicast address */
185 if ( i_family == AF_INET6 )
187 struct sockaddr_in6 *p_addr =
188 (struct sockaddr_in6 *)p_bind_ai->ai_addr;
189 if ( IN6_IS_ADDR_MULTICAST( &p_addr->sin6_addr ) )
191 struct ipv6_mreq imr;
192 imr.ipv6mr_multiaddr = p_addr->sin6_addr;
193 imr.ipv6mr_interface = i_if_index;
194 if ( i_if_addr != INADDR_ANY )
195 msg_Warn( NULL, "ignoring ifaddr option in IPv6" );
197 if ( setsockopt( i_handle, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
198 (char *)&imr, sizeof(struct ipv6_mreq) ) < 0 )
199 msg_Warn( NULL, "couldn't join multicast group (%s)",
200 strerror(errno) );
203 else
205 struct sockaddr_in *p_addr =
206 (struct sockaddr_in *)p_bind_ai->ai_addr;
207 if ( IN_MULTICAST( ntohl(p_addr->sin_addr.s_addr)) )
209 if ( p_connect_ai != NULL )
211 #ifndef IP_ADD_SOURCE_MEMBERSHIP
212 msg_Err( NULL, "IP_ADD_SOURCE_MEMBERSHIP is unsupported." );
213 #else
214 /* Source-specific multicast */
215 struct sockaddr *p_src = p_connect_ai->ai_addr;
216 struct ip_mreq_source imr;
217 imr.imr_multiaddr = p_addr->sin_addr;
218 imr.imr_interface.s_addr = i_if_addr;
219 imr.imr_sourceaddr = ((struct sockaddr_in *)p_src)->sin_addr;
220 if ( i_if_index )
221 msg_Warn( NULL, "ignoring ifindex option in SSM" );
223 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP,
224 (char *)&imr, sizeof(struct ip_mreq_source) ) < 0 )
225 msg_Warn( NULL, "couldn't join multicast group (%s)",
226 strerror(errno) );
227 #endif
229 else if ( i_if_index )
231 /* Linux-specific interface-bound multicast */
232 struct ip_mreqn imr;
233 imr.imr_multiaddr = p_addr->sin_addr;
234 #if defined(__linux__)
235 imr.imr_address.s_addr = i_if_addr;
236 imr.imr_ifindex = i_if_index;
237 #endif
239 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_MEMBERSHIP,
240 (char *)&imr, sizeof(struct ip_mreqn) ) < 0 )
241 msg_Warn( NULL, "couldn't join multicast group (%s)",
242 strerror(errno) );
244 else
246 /* Regular multicast */
247 struct ip_mreq imr;
248 imr.imr_multiaddr = p_addr->sin_addr;
249 imr.imr_interface.s_addr = i_if_addr;
251 if ( setsockopt( i_handle, IPPROTO_IP, IP_ADD_MEMBERSHIP,
252 (char *)&imr, sizeof(struct ip_mreq) ) == -1 )
253 msg_Warn( NULL, "couldn't join multicast group (%s)",
254 strerror(errno) );
256 #ifdef SO_BINDTODEVICE
257 if (psz_ifname) {
258 if ( setsockopt( i_handle, SOL_SOCKET, SO_BINDTODEVICE,
259 psz_ifname, strlen(psz_ifname)+1 ) < 0 ) {
260 msg_Err( NULL, "couldn't bind to device %s (%s)",
261 psz_ifname, strerror(errno) );
263 free(psz_ifname);
264 psz_ifname = NULL;
266 #endif
270 freeaddrinfo( p_bind_ai );
271 if ( p_connect_ai != NULL )
272 freeaddrinfo( p_connect_ai );
273 free( psz_save );
275 msg_Dbg( NULL, "binding socket to %s", psz_udp_src );
277 ev_io_init(&udp_watcher, udp_Read, i_handle, EV_READ);
278 ev_io_start(event_loop, &udp_watcher);
280 ev_timer_init(&mute_watcher, udp_MuteCb,
281 UDP_LOCK_TIMEOUT / 1000000., UDP_LOCK_TIMEOUT / 1000000.);
282 memset(&last_addr, 0, sizeof(last_addr));
285 /*****************************************************************************
286 * UDP events
287 *****************************************************************************/
288 static void udp_Read(struct ev_loop *loop, struct ev_io *w, int revents)
290 i_wallclock = mdate();
291 if ( i_last_print + PRINT_REFRACTORY_PERIOD < i_wallclock )
293 i_last_print = i_wallclock;
295 struct sockaddr_storage addr;
296 struct msghdr mh = {
297 .msg_name = &addr,
298 .msg_namelen = sizeof(addr),
299 .msg_iov = NULL,
300 .msg_iovlen = 0,
301 .msg_control = NULL,
302 .msg_controllen = 0,
303 .msg_flags = 0
305 if ( recvmsg( i_handle, &mh, MSG_DONTWAIT | MSG_PEEK ) != -1 &&
306 mh.msg_namelen >= sizeof(struct sockaddr) )
308 char psz_addr[256], psz_port[42];
309 if ( memcmp( &addr, &last_addr, mh.msg_namelen ) &&
310 getnameinfo( (const struct sockaddr *)&addr, mh.msg_namelen,
311 psz_addr, sizeof(psz_addr), psz_port, sizeof(psz_port),
312 NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV ) == 0 )
314 memcpy( &last_addr, &addr, mh.msg_namelen );
316 msg_Info( NULL, "source: %s:%s", psz_addr, psz_port );
317 switch (i_print_type) {
318 case PRINT_XML:
319 fprintf(print_fh, "<STATUS type=\"source\" address=\"%s\" port=\"%s\"/>\n", psz_addr, psz_port);
320 break;
321 case PRINT_TEXT:
322 fprintf(print_fh, "source status: %s:%s\n", psz_addr, psz_port);
323 break;
324 default:
325 break;
331 struct iovec p_iov[i_block_cnt + 1];
332 block_t *p_ts, **pp_current = &p_ts;
333 int i_iov, i_block;
334 ssize_t i_len;
335 uint8_t p_rtp_hdr[RTP_HEADER_SIZE];
337 if ( !b_udp )
339 /* FIXME : this is wrong if RTP header > 12 bytes */
340 p_iov[0].iov_base = p_rtp_hdr;
341 p_iov[0].iov_len = RTP_HEADER_SIZE;
342 i_iov = 1;
344 else
345 i_iov = 0;
347 for ( i_block = 0; i_block < i_block_cnt; i_block++ )
349 *pp_current = block_New();
350 p_iov[i_iov].iov_base = (*pp_current)->p_ts;
351 p_iov[i_iov].iov_len = TS_SIZE;
352 pp_current = &(*pp_current)->p_next;
353 i_iov++;
355 pp_current = &p_ts;
357 if ( (i_len = readv( i_handle, p_iov, i_iov )) < 0 )
359 msg_Err( NULL, "couldn't read from network (%s)", strerror(errno) );
360 goto err;
363 if ( !b_udp )
365 uint8_t pi_new_ssrc[4];
367 if ( !rtp_check_hdr(p_rtp_hdr) )
368 msg_Warn( NULL, "invalid RTP packet received" );
369 if ( rtp_get_type(p_rtp_hdr) != RTP_TYPE_TS )
370 msg_Warn( NULL, "non-TS RTP packet received" );
371 rtp_get_ssrc(p_rtp_hdr, pi_new_ssrc);
372 if ( !memcmp( pi_ssrc, pi_new_ssrc, 4 * sizeof(uint8_t) ) )
374 if ( rtp_get_seqnum(p_rtp_hdr) != i_seqnum )
375 msg_Warn( NULL, "RTP discontinuity" );
377 else
379 struct in_addr addr;
380 memcpy( &addr.s_addr, pi_new_ssrc, 4 * sizeof(uint8_t) );
381 msg_Dbg( NULL, "new RTP source: %s", inet_ntoa( addr ) );
382 memcpy( pi_ssrc, pi_new_ssrc, 4 * sizeof(uint8_t) );
383 switch (i_print_type) {
384 case PRINT_XML:
385 fprintf(print_fh,
386 "<STATUS type=\"rtpsource\" source=\"%s\"/>\n",
387 inet_ntoa( addr ));
388 break;
389 case PRINT_TEXT:
390 fprintf(print_fh, "rtpsource: %s\n", inet_ntoa( addr ) );
391 break;
392 default:
393 break;
396 i_seqnum = rtp_get_seqnum(p_rtp_hdr) + 1;
398 i_len -= RTP_HEADER_SIZE;
401 i_len /= TS_SIZE;
403 if ( i_len )
405 if ( !b_sync )
407 msg_Info( NULL, "frontend has acquired lock" );
408 switch (i_print_type) {
409 case PRINT_XML:
410 fprintf(print_fh, "<STATUS type=\"lock\" status=\"1\"/>\n");
411 break;
412 case PRINT_TEXT:
413 fprintf(print_fh, "lock status: 1\n");
414 break;
415 default:
416 break;
419 b_sync = true;
422 ev_timer_again(loop, &mute_watcher);
425 while ( i_len && *pp_current )
427 pp_current = &(*pp_current)->p_next;
428 i_len--;
431 err:
432 block_DeleteChain( *pp_current );
433 *pp_current = NULL;
435 demux_Run( p_ts );
438 static void udp_MuteCb(struct ev_loop *loop, struct ev_timer *w, int revents)
440 msg_Warn( NULL, "frontend has lost lock" );
441 ev_timer_stop(loop, w);
443 switch (i_print_type) {
444 case PRINT_XML:
445 fprintf(print_fh, "<STATUS type=\"lock\" status=\"0\"/>\n");
446 break;
447 case PRINT_TEXT:
448 fprintf(print_fh, "lock status: 0\n" );
449 break;
450 default:
451 break;
455 /* From now on these are just stubs */
457 /*****************************************************************************
458 * udp_SetFilter
459 *****************************************************************************/
460 int udp_SetFilter( uint16_t i_pid )
462 return -1;
465 /*****************************************************************************
466 * udp_UnsetFilter: normally never called
467 *****************************************************************************/
468 void udp_UnsetFilter( int i_fd, uint16_t i_pid )
472 /*****************************************************************************
473 * udp_Reset:
474 *****************************************************************************/
475 void udp_Reset( void )