1 /*****************************************************************************
2 * udp.c: UDP input for DVBlast
3 *****************************************************************************
4 * Copyright (C) 2009, 2015, 2020 VideoLAN
6 * Authors: Christophe Massiot <massiot@via.ecp.fr>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
21 *****************************************************************************/
29 #include <sys/types.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
38 #include <arpa/inet.h>
43 #include <bitstream/common.h>
44 #include <bitstream/ietf/rtp.h>
48 /*****************************************************************************
50 *****************************************************************************/
51 #define PRINT_REFRACTORY_PERIOD 1000000 /* 1 s */
54 static struct ev_io udp_watcher
;
55 static struct ev_timer mute_watcher
;
56 static bool b_udp
= false;
57 static int i_block_cnt
;
58 static uint8_t pi_ssrc
[4] = { 0, 0, 0, 0 };
59 static uint16_t i_seqnum
= 0;
60 static bool b_sync
= false;
61 static mtime_t i_last_print
= 0;
62 static struct sockaddr_storage last_addr
;
64 /*****************************************************************************
66 *****************************************************************************/
67 static void udp_Read(struct ev_loop
*loop
, struct ev_io
*w
, int revents
);
68 static void udp_MuteCb(struct ev_loop
*loop
, struct ev_timer
*w
, int revents
);
70 /*****************************************************************************
72 *****************************************************************************/
76 struct addrinfo
*p_connect_ai
= NULL
, *p_bind_ai
;
78 in_addr_t i_if_addr
= INADDR_ANY
;
80 char *psz_ifname
= NULL
;
82 char *psz_bind
, *psz_string
= strdup( psz_udp_src
);
83 char *psz_save
= psz_string
;
86 /* Parse configuration. */
88 if ( (psz_bind
= strchr( psz_string
, '@' )) != NULL
)
91 p_connect_ai
= ParseNodeService( psz_string
, NULL
, 0 );
94 psz_bind
= psz_string
;
96 p_bind_ai
= ParseNodeService( psz_bind
, &psz_string
, DEFAULT_PORT
);
97 if ( p_bind_ai
== NULL
)
99 msg_Err( NULL
, "couldn't parse %s", psz_bind
);
102 i_family
= p_bind_ai
->ai_family
;
104 if ( p_connect_ai
!= NULL
&& p_connect_ai
->ai_family
!= i_family
)
106 msg_Warn( NULL
, "invalid connect address" );
107 freeaddrinfo( p_connect_ai
);
111 while ( (psz_string
= strchr( psz_string
, '/' )) != NULL
)
113 *psz_string
++ = '\0';
115 #define IS_OPTION( option ) (!strncasecmp( psz_string, option, strlen(option) ))
116 #define ARG_OPTION( option ) (psz_string + strlen(option))
118 if ( IS_OPTION("udp") )
120 else if ( IS_OPTION("mtu=") )
121 i_mtu
= strtol( ARG_OPTION("mtu="), NULL
, 0 );
122 else if ( IS_OPTION("ifindex=") )
123 i_if_index
= strtol( ARG_OPTION("ifindex="), NULL
, 0 );
124 else if ( IS_OPTION("ifaddr=") ) {
125 char *option
= config_stropt( ARG_OPTION("ifaddr=") );
126 i_if_addr
= inet_addr( option
);
129 else if ( IS_OPTION("ifname=") )
131 psz_ifname
= config_stropt( ARG_OPTION("ifname=") );
132 if (strlen(psz_ifname
) >= IFNAMSIZ
) {
133 psz_ifname
[IFNAMSIZ
-1] = '\0';
136 msg_Warn( NULL
, "unrecognized option %s", psz_string
);
143 i_mtu
= i_family
== AF_INET6
? DEFAULT_IPV6_MTU
: DEFAULT_IPV4_MTU
;
144 i_block_cnt
= (i_mtu
- (b_udp
? 0 : RTP_HEADER_SIZE
)) / TS_SIZE
;
149 if ( (i_handle
= socket( i_family
, SOCK_DGRAM
, IPPROTO_UDP
)) < 0 )
151 msg_Err( NULL
, "couldn't create socket (%s)", strerror(errno
) );
155 setsockopt( i_handle
, SOL_SOCKET
, SO_REUSEADDR
, (void *) &i
, sizeof( i
) );
157 /* Increase the receive buffer size to 1/2MB (8Mb/s during 1/2s) to avoid
158 * packet loss caused by scheduling problems */
161 setsockopt( i_handle
, SOL_SOCKET
, SO_RCVBUF
, (void *) &i
, sizeof( i
) );
163 if ( bind( i_handle
, p_bind_ai
->ai_addr
, p_bind_ai
->ai_addrlen
) < 0 )
165 msg_Err( NULL
, "couldn't bind (%s)", strerror(errno
) );
170 if ( p_connect_ai
!= NULL
)
173 if ( i_family
== AF_INET6
)
174 i_port
= ((struct sockaddr_in6
*)p_connect_ai
->ai_addr
)->sin6_port
;
176 i_port
= ((struct sockaddr_in
*)p_connect_ai
->ai_addr
)->sin_port
;
178 if ( i_port
!= 0 && connect( i_handle
, p_connect_ai
->ai_addr
,
179 p_connect_ai
->ai_addrlen
) < 0 )
180 msg_Warn( NULL
, "couldn't connect socket (%s)", strerror(errno
) );
183 /* Join the multicast group if the socket is a multicast address */
184 if ( i_family
== AF_INET6
)
186 struct sockaddr_in6
*p_addr
=
187 (struct sockaddr_in6
*)p_bind_ai
->ai_addr
;
188 if ( IN6_IS_ADDR_MULTICAST( &p_addr
->sin6_addr
) )
190 struct ipv6_mreq imr
;
191 imr
.ipv6mr_multiaddr
= p_addr
->sin6_addr
;
192 imr
.ipv6mr_interface
= i_if_index
;
193 if ( i_if_addr
!= INADDR_ANY
)
194 msg_Warn( NULL
, "ignoring ifaddr option in IPv6" );
196 if ( setsockopt( i_handle
, IPPROTO_IPV6
, IPV6_ADD_MEMBERSHIP
,
197 (char *)&imr
, sizeof(struct ipv6_mreq
) ) < 0 )
198 msg_Warn( NULL
, "couldn't join multicast group (%s)",
204 struct sockaddr_in
*p_addr
=
205 (struct sockaddr_in
*)p_bind_ai
->ai_addr
;
206 if ( IN_MULTICAST( ntohl(p_addr
->sin_addr
.s_addr
)) )
208 if ( p_connect_ai
!= NULL
)
210 #ifndef IP_ADD_SOURCE_MEMBERSHIP
211 msg_Err( NULL
, "IP_ADD_SOURCE_MEMBERSHIP is unsupported." );
213 /* Source-specific multicast */
214 struct sockaddr
*p_src
= p_connect_ai
->ai_addr
;
215 struct ip_mreq_source imr
;
216 imr
.imr_multiaddr
= p_addr
->sin_addr
;
217 imr
.imr_interface
.s_addr
= i_if_addr
;
218 imr
.imr_sourceaddr
= ((struct sockaddr_in
*)p_src
)->sin_addr
;
220 msg_Warn( NULL
, "ignoring ifindex option in SSM" );
222 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_SOURCE_MEMBERSHIP
,
223 (char *)&imr
, sizeof(struct ip_mreq_source
) ) < 0 )
224 msg_Warn( NULL
, "couldn't join multicast group (%s)",
228 else if ( i_if_index
)
230 /* Linux-specific interface-bound multicast */
232 imr
.imr_multiaddr
= p_addr
->sin_addr
;
233 #if defined(__linux__)
234 imr
.imr_address
.s_addr
= i_if_addr
;
235 imr
.imr_ifindex
= i_if_index
;
238 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
,
239 (char *)&imr
, sizeof(struct ip_mreqn
) ) < 0 )
240 msg_Warn( NULL
, "couldn't join multicast group (%s)",
245 /* Regular multicast */
247 imr
.imr_multiaddr
= p_addr
->sin_addr
;
248 imr
.imr_interface
.s_addr
= i_if_addr
;
250 if ( setsockopt( i_handle
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
,
251 (char *)&imr
, sizeof(struct ip_mreq
) ) == -1 )
252 msg_Warn( NULL
, "couldn't join multicast group (%s)",
255 #ifdef SO_BINDTODEVICE
257 if ( setsockopt( i_handle
, SOL_SOCKET
, SO_BINDTODEVICE
,
258 psz_ifname
, strlen(psz_ifname
)+1 ) < 0 ) {
259 msg_Err( NULL
, "couldn't bind to device %s (%s)",
260 psz_ifname
, strerror(errno
) );
269 freeaddrinfo( p_bind_ai
);
270 if ( p_connect_ai
!= NULL
)
271 freeaddrinfo( p_connect_ai
);
274 msg_Dbg( NULL
, "binding socket to %s", psz_udp_src
);
276 ev_io_init(&udp_watcher
, udp_Read
, i_handle
, EV_READ
);
277 ev_io_start(event_loop
, &udp_watcher
);
279 ev_timer_init(&mute_watcher
, udp_MuteCb
,
280 i_udp_lock_timeout
/ 1000000., i_udp_lock_timeout
/ 1000000.);
281 memset(&last_addr
, 0, sizeof(last_addr
));
284 /*****************************************************************************
286 *****************************************************************************/
287 static void udp_Read(struct ev_loop
*loop
, struct ev_io
*w
, int revents
)
289 i_wallclock
= mdate();
290 if ( i_last_print
+ PRINT_REFRACTORY_PERIOD
< i_wallclock
)
292 i_last_print
= i_wallclock
;
294 struct sockaddr_storage addr
;
297 .msg_namelen
= sizeof(addr
),
304 if ( recvmsg( i_handle
, &mh
, MSG_DONTWAIT
| MSG_PEEK
) != -1 &&
305 mh
.msg_namelen
>= sizeof(struct sockaddr
) )
307 char psz_addr
[256], psz_port
[42];
308 if ( memcmp( &addr
, &last_addr
, mh
.msg_namelen
) &&
309 getnameinfo( (const struct sockaddr
*)&addr
, mh
.msg_namelen
,
310 psz_addr
, sizeof(psz_addr
), psz_port
, sizeof(psz_port
),
311 NI_DGRAM
| NI_NUMERICHOST
| NI_NUMERICSERV
) == 0 )
313 memcpy( &last_addr
, &addr
, mh
.msg_namelen
);
315 msg_Info( NULL
, "source: %s:%s", psz_addr
, psz_port
);
316 switch (i_print_type
) {
318 fprintf(print_fh
, "<STATUS type=\"source\" address=\"%s\" port=\"%s\"/>\n", psz_addr
, psz_port
);
321 fprintf(print_fh
, "source status: %s:%s\n", psz_addr
, psz_port
);
330 struct iovec p_iov
[i_block_cnt
+ 1];
331 block_t
*p_ts
, **pp_current
= &p_ts
;
334 uint8_t p_rtp_hdr
[RTP_HEADER_SIZE
];
338 /* FIXME : this is wrong if RTP header > 12 bytes */
339 p_iov
[0].iov_base
= p_rtp_hdr
;
340 p_iov
[0].iov_len
= RTP_HEADER_SIZE
;
346 for ( i_block
= 0; i_block
< i_block_cnt
; i_block
++ )
348 *pp_current
= block_New();
349 p_iov
[i_iov
].iov_base
= (*pp_current
)->p_ts
;
350 p_iov
[i_iov
].iov_len
= TS_SIZE
;
351 pp_current
= &(*pp_current
)->p_next
;
356 if ( (i_len
= readv( i_handle
, p_iov
, i_iov
)) < 0 )
358 msg_Err( NULL
, "couldn't read from network (%s)", strerror(errno
) );
364 uint8_t pi_new_ssrc
[4];
366 if ( !rtp_check_hdr(p_rtp_hdr
) )
367 msg_Warn( NULL
, "invalid RTP packet received" );
368 if ( rtp_get_type(p_rtp_hdr
) != RTP_TYPE_TS
)
369 msg_Warn( NULL
, "non-TS RTP packet received" );
370 rtp_get_ssrc(p_rtp_hdr
, pi_new_ssrc
);
371 if ( !memcmp( pi_ssrc
, pi_new_ssrc
, 4 * sizeof(uint8_t) ) )
373 if ( rtp_get_seqnum(p_rtp_hdr
) != i_seqnum
)
374 msg_Warn( NULL
, "RTP discontinuity" );
379 memcpy( &addr
.s_addr
, pi_new_ssrc
, 4 * sizeof(uint8_t) );
380 msg_Dbg( NULL
, "new RTP source: %s", inet_ntoa( addr
) );
381 memcpy( pi_ssrc
, pi_new_ssrc
, 4 * sizeof(uint8_t) );
382 switch (i_print_type
) {
385 "<STATUS type=\"rtpsource\" source=\"%s\"/>\n",
389 fprintf(print_fh
, "rtpsource: %s\n", inet_ntoa( addr
) );
395 i_seqnum
= rtp_get_seqnum(p_rtp_hdr
) + 1;
397 i_len
-= RTP_HEADER_SIZE
;
406 msg_Info( NULL
, "frontend has acquired lock" );
407 switch (i_print_type
) {
409 fprintf(print_fh
, "<STATUS type=\"lock\" status=\"1\"/>\n");
412 fprintf(print_fh
, "lock status: 1\n");
421 ev_timer_again(loop
, &mute_watcher
);
424 while ( i_len
&& *pp_current
)
426 pp_current
= &(*pp_current
)->p_next
;
431 block_DeleteChain( *pp_current
);
437 static void udp_MuteCb(struct ev_loop
*loop
, struct ev_timer
*w
, int revents
)
439 msg_Warn( NULL
, "frontend has lost lock" );
440 ev_timer_stop(loop
, w
);
442 switch (i_print_type
) {
444 fprintf(print_fh
, "<STATUS type=\"lock\" status=\"0\"/>\n");
447 fprintf(print_fh
, "lock status: 0\n" );
456 /* From now on these are just stubs */
458 /*****************************************************************************
460 *****************************************************************************/
461 int udp_SetFilter( uint16_t i_pid
)
466 /*****************************************************************************
467 * udp_UnsetFilter: normally never called
468 *****************************************************************************/
469 void udp_UnsetFilter( int i_fd
, uint16_t i_pid
)
473 /*****************************************************************************
475 *****************************************************************************/
476 void udp_Reset( void )