1 /*****************************************************************************
2 * udp.c: UDP input for DVBlast
3 *****************************************************************************
4 * Copyright (C) 2009, 2015 VideoLAN
6 * Authors: Christophe Massiot <massiot@via.ecp.fr>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
21 *****************************************************************************/
29 #include <sys/types.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
38 #include <arpa/inet.h>
43 #include <bitstream/common.h>
44 #include <bitstream/ietf/rtp.h>
48 /*****************************************************************************
 * Local declarations: timing constants and file-scope state of the UDP input
50 *****************************************************************************/
/* Timer interval, in microseconds; divided by 1000000. before being handed to
 * ev_timer_init() for mute_watcher. */
51 #define UDP_LOCK_TIMEOUT 5000000 /* 5 s */
/* Interval, in microseconds, that udp_Read adds to i_last_print before
 * comparing with the wall clock. */
52 #define PRINT_REFRACTORY_PERIOD 1000000 /* 1 s */
/* libev I/O watcher, initialised elsewhere in this file with udp_Read as its
 * callback for EV_READ events on i_handle. */
55 static struct ev_io udp_watcher
;
/* libev timer whose callback is udp_MuteCb; udp_Read re-arms it with
 * ev_timer_again(). */
56 static struct ev_timer mute_watcher
;
/* When true, RTP_HEADER_SIZE is not subtracted from the MTU when i_block_cnt
 * is computed.  NOTE(review): presumably set by the "udp" option, but the
 * assignment is not visible in this listing - confirm. */
57 static bool b_udp
= false;
/* Number of TS_SIZE-byte blocks per datagram; sizes the iovec array and the
 * block allocation loop in udp_Read. */
58 static int i_block_cnt
;
/* SSRC bytes last copied from an RTP header by udp_Read; each new header's
 * SSRC is compared against it. */
59 static uint8_t pi_ssrc
[4] = { 0, 0, 0, 0 };
/* RTP sequence number udp_Read compares the next header against (stored as
 * the last seen value + 1). */
60 static uint16_t i_seqnum
= 0;
/* NOTE(review): no use of b_sync is visible in this listing; its meaning must
 * be confirmed against the complete file. */
61 static bool b_sync
= false;
/* Wall-clock value (from mdate()) stored by udp_Read when its refractory
 * check passes. */
62 static mtime_t i_last_print
= 0;
/* Sender address most recently recorded by udp_Read; zeroed with memset()
 * during socket set-up. */
63 static struct sockaddr_storage last_addr
;
65 /*****************************************************************************
 * Local prototypes: libev callbacks defined further down in this file
67 *****************************************************************************/
/* Read callback attached to udp_watcher. */
68 static void udp_Read(struct ev_loop
*loop
, struct ev_io
*w
, int revents
);
/* Timer callback attached to mute_watcher. */
69 static void udp_MuteCb(struct ev_loop
*loop
, struct ev_timer
*w
, int revents
);
71 /*****************************************************************************
 * Socket set-up: parses psz_udp_src, creates and binds the UDP socket,
 * optionally connects it, joins multicast groups, then installs the libev
 * watchers.
 *
 * NOTE(review): in this listing the function header, the braces and a number
 * of statements (some declarations, else branches, assignments to `i`,
 * error exits) are not present.  The comments below describe only the
 * statements that are visible.
73 *****************************************************************************/
/* Address lists returned by ParseNodeService(): optional connect address and
 * the mandatory bind address. */
77 struct addrinfo
*p_connect_ai
= NULL
, *p_bind_ai
;
/* Interface address used in the IPv4 multicast requests; overridden by the
 * "ifaddr=" option. */
79 in_addr_t i_if_addr
= INADDR_ANY
;
/* Interface name from the "ifname=" option, used with SO_BINDTODEVICE. */
81 char *psz_ifname
= NULL
;
/* Work on a private copy of the source specification because the parser
 * below writes NUL bytes into it. */
83 char *psz_bind
, *psz_string
= strdup( psz_udp_src
);
/* Remember the start of the copy while psz_string is advanced
 * (presumably so it can be freed later - the free is not visible here). */
84 char *psz_save
= psz_string
;
87 /* Parse configuration. */
/* Look for '@' in the specification. */
89 if ( (psz_bind
= strchr( psz_string
, '@' )) != NULL
)
/* The connect address is parsed from the start of the string, with no end
 * pointer and a default port of 0. */
92 p_connect_ai
= ParseNodeService( psz_string
, NULL
, 0 );
/* NOTE(review): presumably the branch taken when no '@' is present - the
 * `else` itself is not visible in this listing. */
95 psz_bind
= psz_string
;
/* Parse the bind address; psz_string is passed by address so the parser can
 * update it, and DEFAULT_PORT is used as the default port. */
97 p_bind_ai
= ParseNodeService( psz_bind
, &psz_string
, DEFAULT_PORT
);
98 if ( p_bind_ai
== NULL
)
100 msg_Err( NULL
, "couldn't parse %s", psz_bind
);
/* The bind address decides the socket's address family. */
103 i_family
= p_bind_ai
->ai_family
;
/* A connect address of a different family is rejected and released. */
105 if ( p_connect_ai
!= NULL
&& p_connect_ai
->ai_family
!= i_family
)
107 msg_Warn( NULL
, "invalid connect address" );
108 freeaddrinfo( p_connect_ai
);
/* Walk the '/'-separated options: each '/' is overwritten with NUL and
 * psz_string is left pointing at the option text that follows it. */
112 while ( (psz_string
= strchr( psz_string
, '/' )) != NULL
)
114 *psz_string
++ = '\0';
/* IS_OPTION: case-insensitive prefix match of the current option.
 * ARG_OPTION: pointer to the text following the option prefix. */
116 #define IS_OPTION( option ) (!strncasecmp( psz_string, option, strlen(option) ))
117 #define ARG_OPTION( option ) (psz_string + strlen(option))
/* NOTE(review): the statement executed for "udp" is not visible here. */
119 if ( IS_OPTION("udp") )
121 else if ( IS_OPTION("mtu=") )
122 i_mtu
= strtol( ARG_OPTION("mtu="), NULL
, 0 );
123 else if ( IS_OPTION("ifindex=") )
124 i_if_index
= strtol( ARG_OPTION("ifindex="), NULL
, 0 );
125 else if ( IS_OPTION("ifaddr=") ) {
/* The option value goes through config_stropt() and is then converted to a
 * binary IPv4 address. */
126 char *option
= config_stropt( ARG_OPTION("ifaddr=") );
127 i_if_addr
= inet_addr( option
);
130 else if ( IS_OPTION("ifname=") )
132 psz_ifname
= config_stropt( ARG_OPTION("ifname=") );
/* Truncate interface names that do not fit in IFNAMSIZ bytes including the
 * terminating NUL. */
133 if (strlen(psz_ifname
) >= IFNAMSIZ
) {
134 psz_ifname
[IFNAMSIZ
-1] = '\0';
137 msg_Warn( NULL
, "unrecognized option %s", psz_string
);
/* Family-dependent default MTU.
 * NOTE(review): the condition guarding this assignment is not visible. */
144 i_mtu
= i_family
== AF_INET6
? DEFAULT_IPV6_MTU
: DEFAULT_IPV4_MTU
;
/* Number of TS_SIZE blocks that fit in one datagram, after removing
 * RTP_HEADER_SIZE unless b_udp is set. */
145 i_block_cnt
= (i_mtu
- (b_udp
? 0 : RTP_HEADER_SIZE
)) / TS_SIZE
;
/* Create the datagram socket in the family of the bind address. */
150 if ( (i_handle
= socket( i_family
, SOCK_DGRAM
, IPPROTO_UDP
)) < 0 )
152 msg_Err( NULL
, "couldn't create socket (%s)", strerror(errno
) );
/* NOTE(review): the value assigned to `i` before each setsockopt() call is
 * not visible in this listing.  Return values are not checked. */
156 setsockopt( i_handle
, SOL_SOCKET
, SO_REUSEADDR
, (void *) &i
, sizeof( i
) );
158 /* Increase the receive buffer size to 1/2MB (8Mb/s during 1/2s) to avoid
159 * packet loss caused by scheduling problems */
162 setsockopt( i_handle
, SOL_SOCKET
, SO_RCVBUF
, (void *) &i
, sizeof( i
) );
/* Bind to the parsed bind address. */
164 if ( bind( i_handle
, p_bind_ai
->ai_addr
, p_bind_ai
->ai_addrlen
) < 0 )
166 msg_Err( NULL
, "couldn't bind (%s)", strerror(errno
) );
/* With a connect address, extract its port (kept in network byte order; it
 * is only compared with 0) and connect() the socket when the port is set. */
171 if ( p_connect_ai
!= NULL
)
174 if ( i_family
== AF_INET6
)
175 i_port
= ((struct sockaddr_in6
*)p_connect_ai
->ai_addr
)->sin6_port
;
177 i_port
= ((struct sockaddr_in
*)p_connect_ai
->ai_addr
)->sin_port
;
179 if ( i_port
!= 0 && connect( i_handle
, p_connect_ai
->ai_addr
,
180 p_connect_ai
->ai_addrlen
) < 0 )
181 msg_Warn( NULL
, "couldn't connect socket (%s)", strerror(errno
) );
184 /* Join the multicast group if the socket is a multicast address */
/* IPv6: join through IPV6_ADD_MEMBERSHIP using the interface index. */
185 if ( i_family
== AF_INET6
)
187 struct sockaddr_in6
*p_addr
=
188 (struct sockaddr_in6
*)p_bind_ai
->ai_addr
;
189 if ( IN6_IS_ADDR_MULTICAST( &p_addr
->sin6_addr
) )
191 struct ipv6_mreq imr
;
192 imr
.ipv6mr_multiaddr
= p_addr
->sin6_addr
;
193 imr
.ipv6mr_interface
= i_if_index
;
/* An IPv4 interface address cannot be used for the IPv6 join. */
194 if ( i_if_addr
!= INADDR_ANY
)
195 msg_Warn( NULL
, "ignoring ifaddr option in IPv6" );
197 if ( setsockopt( i_handle
, IPPROTO_IPV6
, IPV6_ADD_MEMBERSHIP
,
198 (char *)&imr
, sizeof(struct ipv6_mreq
) ) < 0 )
199 msg_Warn( NULL
, "couldn't join multicast group (%s)",
/* IPv4 path (NOTE(review): the `else` introducing it is not visible). */
205 struct sockaddr_in
*p_addr
=
206 (struct sockaddr_in
*)p_bind_ai
->ai_addr
;
207 if ( IN_MULTICAST( ntohl(p_addr
->sin_addr
.s_addr
)) )
/* A connect address together with a multicast bind address selects
 * source-specific multicast. */
209 if ( p_connect_ai
!= NULL
)
211 #ifndef IP_ADD_SOURCE_MEMBERSHIP
212 msg_Err( NULL
, "IP_ADD_SOURCE_MEMBERSHIP is unsupported." );
214 /* Source-specific multicast */
215 struct sockaddr
*p_src
= p_connect_ai
->ai_addr
;
216 struct ip_mreq_source imr
;
217 imr
.imr_multiaddr
= p_addr
->sin_addr
;
218 imr
.imr_interface
.s_addr
= i_if_addr
;
219 imr
.imr_sourceaddr
= ((struct sockaddr_in
*)p_src
)->sin_addr
;
/* NOTE(review): the condition guarding this warning is not visible. */
221 msg_Warn( NULL
, "ignoring ifindex option in SSM" );
223 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_SOURCE_MEMBERSHIP
,
224 (char *)&imr
, sizeof(struct ip_mreq_source
) ) < 0 )
225 msg_Warn( NULL
, "couldn't join multicast group (%s)",
/* An interface index was given: join with struct ip_mreqn. */
229 else if ( i_if_index
)
231 /* Linux-specific interface-bound multicast */
233 imr
.imr_multiaddr
= p_addr
->sin_addr
;
234 #if defined(__linux__)
235 imr
.imr_address
.s_addr
= i_if_addr
;
236 imr
.imr_ifindex
= i_if_index
;
239 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
,
240 (char *)&imr
, sizeof(struct ip_mreqn
) ) < 0 )
241 msg_Warn( NULL
, "couldn't join multicast group (%s)",
246 /* Regular multicast */
248 imr
.imr_multiaddr
= p_addr
->sin_addr
;
249 imr
.imr_interface
.s_addr
= i_if_addr
;
251 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
,
252 (char *)&imr
, sizeof(struct ip_mreq
) ) == -1 )
253 msg_Warn( NULL
, "couldn't join multicast group (%s)",
/* Where SO_BINDTODEVICE exists, bind the socket to the named interface; the
 * length passed includes the terminating NUL.
 * NOTE(review): the check that psz_ifname is non-NULL is not visible. */
256 #ifdef SO_BINDTODEVICE
258 if ( setsockopt( i_handle
, SOL_SOCKET
, SO_BINDTODEVICE
,
259 psz_ifname
, strlen(psz_ifname
)+1 ) < 0 ) {
260 msg_Err( NULL
, "couldn't bind to device %s (%s)",
261 psz_ifname
, strerror(errno
) );
/* The address lists are no longer needed once the socket is configured. */
270 freeaddrinfo( p_bind_ai
);
271 if ( p_connect_ai
!= NULL
)
272 freeaddrinfo( p_connect_ai
);
275 msg_Dbg( NULL
, "binding socket to %s", psz_udp_src
);
/* Watch the socket for readability; udp_Read is the callback. */
277 ev_io_init(&udp_watcher
, udp_Read
, i_handle
, EV_READ
);
278 ev_io_start(event_loop
, &udp_watcher
);
/* Prepare (but do not start here) the mute timer with UDP_LOCK_TIMEOUT,
 * converted to seconds, as both initial delay and repeat interval. */
280 ev_timer_init(&mute_watcher
, udp_MuteCb
,
281 UDP_LOCK_TIMEOUT
/ 1000000., UDP_LOCK_TIMEOUT
/ 1000000.);
/* Forget any previously recorded sender address. */
282 memset(&last_addr
, 0, sizeof(last_addr
));
285 /*****************************************************************************
 * udp_Read: libev read callback for the UDP socket.
 *
 * Visible steps: optionally report the sender address, scatter-read one
 * datagram into a chain of TS blocks, check the RTP header, signal lock,
 * re-arm the mute timer and release unused blocks.
 *
 * NOTE(review): braces, case labels, some declarations and some branches are
 * not present in this listing, so the exact nesting of the statements below
 * cannot be established from here.
287 *****************************************************************************/
288 static void udp_Read(struct ev_loop
*loop
, struct ev_io
*w
, int revents
)
/* Refresh the wall clock. */
290 i_wallclock
= mdate();
/* Refractory check: true only when more than PRINT_REFRACTORY_PERIOD has
 * elapsed since i_last_print. */
291 if ( i_last_print
+ PRINT_REFRACTORY_PERIOD
< i_wallclock
)
293 i_last_print
= i_wallclock
;
/* Buffer that receives the sender address from recvmsg(). */
295 struct sockaddr_storage addr
;
/* NOTE(review): the surrounding struct msghdr initialiser (`mh`) is only
 * partially visible. */
298 .msg_namelen
= sizeof(addr
),
/* Peek at the pending datagram without consuming it or blocking, and only
 * go on when at least a struct sockaddr worth of address was returned. */
305 if ( recvmsg( i_handle
, &mh
, MSG_DONTWAIT
| MSG_PEEK
) != -1 &&
306 mh
.msg_namelen
>= sizeof(struct sockaddr
) )
308 char psz_addr
[256], psz_port
[42];
/* Report only when the address differs from the last recorded one and it can
 * be converted to numeric host and service strings. */
309 if ( memcmp( &addr
, &last_addr
, mh
.msg_namelen
) &&
310 getnameinfo( (const struct sockaddr
*)&addr
, mh
.msg_namelen
,
311 psz_addr
, sizeof(psz_addr
), psz_port
, sizeof(psz_port
),
312 NI_DGRAM
| NI_NUMERICHOST
| NI_NUMERICSERV
) == 0 )
/* Record the new sender, then announce it in the log and on print_fh. */
314 memcpy( &last_addr
, &addr
, mh
.msg_namelen
);
316 msg_Info( NULL
, "source: %s:%s", psz_addr
, psz_port
);
/* NOTE(review): the case labels selecting between the two output formats are
 * not visible in this listing. */
317 switch (i_print_type
) {
319 fprintf(print_fh
, "<STATUS type=\"source\" address=\"%s\" port=\"%s\"/>\n", psz_addr
, psz_port
);
322 fprintf(print_fh
, "source status: %s:%s\n", psz_addr
, psz_port
);
/* Scatter list: one entry per TS block plus one extra entry (entry 0 is
 * pointed at the RTP header buffer below). */
331 struct iovec p_iov
[i_block_cnt
+ 1];
/* Head of the block chain and a pointer to the link that gets filled next. */
332 block_t
*p_ts
, **pp_current
= &p_ts
;
335 uint8_t p_rtp_hdr
[RTP_HEADER_SIZE
];
339 /* FIXME : this is wrong if RTP header > 12 bytes */
340 p_iov
[0].iov_base
= p_rtp_hdr
;
341 p_iov
[0].iov_len
= RTP_HEADER_SIZE
;
/* Allocate i_block_cnt blocks, chain them through p_next and point one iovec
 * entry at each block's TS_SIZE-byte buffer.
 * NOTE(review): the initialisation and increment of i_iov are not visible. */
347 for ( i_block
= 0; i_block
< i_block_cnt
; i_block
++ )
349 *pp_current
= block_New();
350 p_iov
[i_iov
].iov_base
= (*pp_current
)->p_ts
;
351 p_iov
[i_iov
].iov_len
= TS_SIZE
;
352 pp_current
= &(*pp_current
)->p_next
;
/* Read one datagram directly into the header buffer and the blocks. */
357 if ( (i_len
= readv( i_handle
, p_iov
, i_iov
)) < 0 )
359 msg_Err( NULL
, "couldn't read from network (%s)", strerror(errno
) );
/* RTP header checks.
 * NOTE(review): presumably skipped when b_udp is set - the guard is not
 * visible in this listing. */
365 uint8_t pi_new_ssrc
[4];
367 if ( !rtp_check_hdr(p_rtp_hdr
) )
368 msg_Warn( NULL
, "invalid RTP packet received" );
369 if ( rtp_get_type(p_rtp_hdr
) != RTP_TYPE_TS
)
370 msg_Warn( NULL
, "non-TS RTP packet received" );
371 rtp_get_ssrc(p_rtp_hdr
, pi_new_ssrc
);
/* Same SSRC as before: check that the sequence number is the expected one. */
372 if ( !memcmp( pi_ssrc
, pi_new_ssrc
, 4 * sizeof(uint8_t) ) )
374 if ( rtp_get_seqnum(p_rtp_hdr
) != i_seqnum
)
375 msg_Warn( NULL
, "RTP discontinuity" );
/* Different SSRC: its four bytes are copied into an address structure so it
 * can be printed with inet_ntoa(), then stored as the current SSRC. */
380 memcpy( &addr
.s_addr
, pi_new_ssrc
, 4 * sizeof(uint8_t) );
381 msg_Dbg( NULL
, "new RTP source: %s", inet_ntoa( addr
) );
382 memcpy( pi_ssrc
, pi_new_ssrc
, 4 * sizeof(uint8_t) );
383 switch (i_print_type
) {
386 "<STATUS type=\"rtpsource\" source=\"%s\"/>\n",
390 fprintf(print_fh
, "rtpsource: %s\n", inet_ntoa( addr
) );
/* Sequence number expected in the next packet. */
396 i_seqnum
= rtp_get_seqnum(p_rtp_hdr
) + 1;
/* The remaining length counts TS payload only. */
398 i_len
-= RTP_HEADER_SIZE
;
/* Lock notification.
 * NOTE(review): the condition under which it is emitted is not visible. */
407 msg_Info( NULL
, "frontend has acquired lock" );
408 switch (i_print_type
) {
410 fprintf(print_fh
, "<STATUS type=\"lock\" status=\"1\"/>\n");
413 fprintf(print_fh
, "lock status: 1\n");
/* Restart the mute timer. */
422 ev_timer_again(loop
, &mute_watcher
);
/* Walk along the chain while length remains and blocks are left
 * (NOTE(review): the statement that decreases i_len inside the loop is not
 * visible), then delete the chain starting at the block reached. */
425 while ( i_len
&& *pp_current
)
427 pp_current
= &(*pp_current
)->p_next
;
432 block_DeleteChain( *pp_current
);
/*
 * udp_MuteCb: libev timer callback attached to mute_watcher.
 *
 * Logs that lock has been lost, stops the timer that fired and writes a
 * lock status of 0 to print_fh in the format selected by i_print_type.
 *
 * NOTE(review): the braces and the case labels of the switch are not
 * present in this listing.
 */
438 static void udp_MuteCb(struct ev_loop
*loop
, struct ev_timer
*w
, int revents
)
440 msg_Warn( NULL
, "frontend has lost lock" );
/* Stop the timer that triggered this callback. */
441 ev_timer_stop(loop
, w
);
/* Emit the status in whichever output format is configured. */
443 switch (i_print_type
) {
445 fprintf(print_fh
, "<STATUS type=\"lock\" status=\"0\"/>\n");
448 fprintf(print_fh
, "lock status: 0\n" );
455 /* From now on these are just stubs */
457 /*****************************************************************************
 * udp_SetFilter: takes a PID and returns an int.
 * NOTE(review): the body is not present in this listing; the comment that
 * precedes this section of the file describes these functions as stubs, so
 * the return value should be confirmed against the complete file.
459 *****************************************************************************/
460 int udp_SetFilter( uint16_t i_pid
)
465 /*****************************************************************************
466 * udp_UnsetFilter: normally never called
 * Takes a descriptor (i_fd) and a PID (i_pid) and returns nothing.
 * NOTE(review): the body is not present in this listing; whether the
 * arguments are used at all should be confirmed against the complete file.
467 *****************************************************************************/
468 void udp_UnsetFilter( int i_fd
, uint16_t i_pid
)
472 /*****************************************************************************
474 *****************************************************************************/
475 void udp_Reset( void )