2 * transsip - the telephony network
3 * By Daniel Borkmann <daniel@transsip.org>
4 * Copyright 2011, 2012 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
5 * Swiss federal institute of technology (ETH Zurich)
6 * Subject to the GPL, version 2.
16 #include <celt/celt.h>
17 #include <speex/speex_jitter.h>
18 #include <speex/speex_echo.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
29 #include "call-notifier.h"
31 #define SAMPLING_RATE 48000
32 #define FRAME_SIZE 256
40 __extension__
uint8_t est
:1,
45 } __attribute__((packed
));
47 enum engine_state_num
{
48 ENGINE_STATE_IDLE
= CALL_STATE_MACHINE_IDLE
,
49 ENGINE_STATE_CALLOUT
= CALL_STATE_MACHINE_CALLOUT
,
50 ENGINE_STATE_CALLIN
= CALL_STATE_MACHINE_CALLIN
,
51 ENGINE_STATE_SPEAKING
= CALL_STATE_MACHINE_SPEAKING
,
55 enum engine_sound_type
{
56 ENGINE_SOUND_DIAL
= 0,
62 volatile enum engine_state_num state
;
63 enum engine_state_num (*process
)(int, int *, int, int,
67 #define STATE_MAP_SET(s, f) { \
72 extern sig_atomic_t quit
;
74 sig_atomic_t stun_done
= 0;
76 static char *alsadev
= "plughw:0,0"; //XXX
77 static char *port
= "30111"; //XXX
86 static struct engine_curr ecurr
;
88 static void engine_play_file(struct alsa_dev
*dev
, enum engine_sound_type type
)
92 short pcm
[FRAME_SIZE
* CHANNELS
];
93 struct pollfd
*pfds
= NULL
;
95 memset(path
, 0, sizeof(path
));
97 case ENGINE_SOUND_DIAL
:
98 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_DIAL
);
100 case ENGINE_SOUND_BUSY
:
101 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_BUSY
);
103 case ENGINE_SOUND_RING
:
104 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_RING
);
108 fd
= open(path
, O_RDONLY
);
110 panic("Cannot open ring file!\n");
114 nfds
= alsa_nfds(dev
);
115 pfds
= xmalloc(sizeof(*pfds
) * nfds
);
117 alsa_getfds(dev
, pfds
, nfds
);
119 memset(pcm
, 0, sizeof(pcm
));
120 while (read(fd
, pcm
, sizeof(pcm
)) > 0) {
121 poll(pfds
, nfds
, -1);
123 if (alsa_play_ready(dev
, pfds
, nfds
))
124 alsa_write(dev
, pcm
, FRAME_SIZE
);
125 memset(pcm
, 0, sizeof(pcm
));
127 if (alsa_cap_ready(dev
, pfds
, nfds
))
128 alsa_read(dev
, pcm
, FRAME_SIZE
);
129 memset(pcm
, 0, sizeof(pcm
));
137 static inline void engine_play_ring(struct alsa_dev
*dev
)
139 engine_play_file(dev
, ENGINE_SOUND_RING
);
142 static inline void engine_play_busy(struct alsa_dev
*dev
)
144 engine_play_file(dev
, ENGINE_SOUND_BUSY
);
147 static inline void engine_play_dial(struct alsa_dev
*dev
)
149 engine_play_file(dev
, ENGINE_SOUND_DIAL
);
152 static void engine_decode_packet(uint8_t *pkt
, size_t len
)
154 struct transsip_hdr
*hdr
;
156 if (len
< sizeof(*hdr
)) {
157 whine("[dbg] pkt too small!\n");
161 hdr
= (struct transsip_hdr
*) pkt
;
162 whine("[dbg] packet:\n");
163 whine("[dbg] seq: %d\n", hdr
->seq
);
164 whine("[dbg] est: %d\n", hdr
->est
);
165 whine("[dbg] psh: %d\n", hdr
->psh
);
166 whine("[dbg] bsy: %d\n", hdr
->bsy
);
167 whine("[dbg] fin: %d\n", hdr
->fin
);
168 whine("[dbg] res1: %d\n", hdr
->res1
);
171 static enum engine_state_num
engine_do_callout(int ssock
, int *csock
, int usocki
,
172 int usocko
, struct alsa_dev
*dev
)
174 int one
, mtu
, tries
, i
;
177 struct addrinfo hints
, *ahead
, *ai
;
179 struct pollfd fds
[2];
180 struct sockaddr raddr
;
182 struct transsip_hdr
*thdr
;
184 assert(ecurr
.active
== 0);
186 memset(&cpkt
, 0, sizeof(cpkt
));
187 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
188 if (ret
!= sizeof(cpkt
)) {
189 whine("Read error from cli!\n");
190 return ENGINE_STATE_IDLE
;
192 if (cpkt
.ring
== 0) {
193 whine("Read no ring flag from cli!\n");
194 return ENGINE_STATE_IDLE
;
197 memset(&hints
, 0, sizeof(hints
));
198 hints
.ai_family
= PF_UNSPEC
;
199 hints
.ai_socktype
= SOCK_DGRAM
;
200 hints
.ai_protocol
= IPPROTO_UDP
;
201 hints
.ai_flags
= AI_NUMERICSERV
;
203 ret
= getaddrinfo(cpkt
.address
, cpkt
.port
, &hints
, &ahead
);
205 whine("Cannot get address info for %s:%s!\n",
206 cpkt
.address
, cpkt
.port
);
207 return ENGINE_STATE_IDLE
;
212 for (ai
= ahead
; ai
!= NULL
&& *csock
< 0; ai
= ai
->ai_next
) {
213 *csock
= socket(ai
->ai_family
, ai
->ai_socktype
,
218 ret
= connect(*csock
, ai
->ai_addr
, ai
->ai_addrlen
);
220 whine("Cannot connect to remote!\n");
227 setsockopt(*csock
, SOL_SOCKET
, SO_KEEPALIVE
, &one
, sizeof(one
));
229 mtu
= IP_PMTUDISC_DONT
;
230 setsockopt(*csock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
232 memcpy(&ecurr
.addr
, ai
->ai_addr
, ai
->ai_addrlen
);
233 ecurr
.addrlen
= ai
->ai_addrlen
;
240 whine("Cannot connect to server!\n");
241 return ENGINE_STATE_IDLE
;
246 memset(msg
, 0, sizeof(msg
));
247 thdr
= (struct transsip_hdr
*) msg
;
250 ret
= sendto(*csock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
253 whine("Cannot send ring probe to server!\n");
258 fds
[0].events
= POLLIN
;
260 fds
[1].events
= POLLIN
;
262 while (!quit
&& tries
++ < 100) {
263 poll(fds
, array_size(fds
), 1500);
265 for (i
= 0; i
< array_size(fds
); ++i
) {
266 if ((fds
[i
].revents
& POLLERR
) == POLLERR
) {
267 printf("Destination unreachable?\n");
270 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
273 if (fds
[i
].fd
== usocki
) {
274 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
276 whine("Read error from cli!\n");
280 memset(&msg
, 0, sizeof(msg
));
281 thdr
= (struct transsip_hdr
*) msg
;
285 sendto(ssock
, msg
, sizeof(*thdr
), 0,
286 &ecurr
.addr
, ecurr
.addrlen
);
288 whine("You aborted call!\n");
293 if (fds
[i
].fd
== *csock
) {
294 memset(msg
, 0, sizeof(msg
));
295 raddrlen
= sizeof(raddr
);
296 ret
= recvfrom(*csock
, msg
, sizeof(msg
), 0,
301 engine_decode_packet((uint8_t *) msg
, ret
);
303 if (raddrlen
!= ecurr
.addrlen
)
305 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
308 thdr
= (struct transsip_hdr
*) msg
;
309 if (thdr
->est
== 1 && thdr
->psh
== 1) {
311 whine("Call established!\n");
312 return ENGINE_STATE_SPEAKING
;
314 if (thdr
->bsy
== 1 || thdr
->fin
== 1) {
315 whine("Remote end busy!\n");
316 engine_play_busy(dev
);
317 engine_play_busy(dev
);
323 engine_play_dial(dev
);
329 return ENGINE_STATE_IDLE
;
332 static enum engine_state_num
engine_do_callin(int ssock
, int *csock
, int usocki
,
333 int usocko
, struct alsa_dev
*dev
)
338 struct sockaddr raddr
;
340 struct transsip_hdr
*thdr
;
341 char hbuff
[256], sbuff
[256];
342 struct pollfd fds
[2];
345 assert(ecurr
.active
== 0);
347 memset(&msg
, 0, sizeof(msg
));
348 raddrlen
= sizeof(raddr
);
349 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0, &raddr
, &raddrlen
);
351 whine("Receive error %s!\n", strerror(errno
));
352 return ENGINE_STATE_IDLE
;
355 thdr
= (struct transsip_hdr
*) msg
;
357 return ENGINE_STATE_IDLE
;
359 memcpy(&ecurr
.addr
, &raddr
, raddrlen
);
360 ecurr
.addrlen
= raddrlen
;
364 memset(hbuff
, 0, sizeof(hbuff
));
365 memset(sbuff
, 0, sizeof(sbuff
));
366 getnameinfo((struct sockaddr
*) &raddr
, raddrlen
, hbuff
, sizeof(hbuff
),
367 sbuff
, sizeof(sbuff
), NI_NUMERICHOST
| NI_NUMERICSERV
);
369 printf("New incoming connection from %s:%s!\n", hbuff
, sbuff
);
370 printf("Answer it with: take\n");
371 printf("Reject it with: hangup\n");
375 fds
[0].events
= POLLIN
;
377 fds
[1].events
= POLLIN
;
379 while (likely(!quit
)) {
380 poll(fds
, array_size(fds
), 1500);
382 for (i
= 0; i
< array_size(fds
); ++i
) {
383 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
386 if (fds
[i
].fd
== ssock
) {
387 memset(msg
, 0, sizeof(msg
));
388 raddrlen
= sizeof(raddr
);
389 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0,
393 if (raddrlen
!= ecurr
.addrlen
)
395 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
398 if (thdr
->fin
== 1 || thdr
->bsy
== 1) {
399 whine("Remote end hung up!\n");
400 engine_play_busy(dev
);
401 engine_play_busy(dev
);
407 if (fds
[i
].fd
== usocki
) {
408 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
410 whine("Error reading from cli!\n");
414 memset(&msg
, 0, sizeof(msg
));
415 thdr
= (struct transsip_hdr
*) msg
;
419 sendto(ssock
, msg
, sizeof(*thdr
), 0,
420 &ecurr
.addr
, ecurr
.addrlen
);
422 whine("You aborted call!\n");
426 memset(&msg
, 0, sizeof(msg
));
427 thdr
= (struct transsip_hdr
*) msg
;
431 ret
= sendto(ssock
, msg
, sizeof(*thdr
), 0,
432 &ecurr
.addr
, ecurr
.addrlen
);
434 whine("Error sending ack!\n");
439 whine("Call established!\n");
440 return ENGINE_STATE_SPEAKING
;
445 engine_play_ring(dev
);
449 return ENGINE_STATE_IDLE
;
452 static enum engine_state_num
engine_do_speaking(int ssock
, int *csock
,
453 int usocki
, int usocko
,
454 struct alsa_dev
*dev
)
457 int recv_started
= 0, nfds
= 0, tmp
, i
;
458 struct pollfd
*pfds
= NULL
;
460 uint32_t send_seq
= 0;
462 CELTEncoder
*encoder
;
463 CELTDecoder
*decoder
;
464 JitterBuffer
*jitter
;
465 struct sockaddr raddr
;
466 struct transsip_hdr
*thdr
;
470 assert(ecurr
.active
== 1);
472 mode
= celt_mode_create(SAMPLING_RATE
, FRAME_SIZE
, NULL
);
473 encoder
= celt_encoder_create(mode
, CHANNELS
, NULL
);
474 decoder
= celt_decoder_create(mode
, CHANNELS
, NULL
);
476 jitter
= jitter_buffer_init(FRAME_SIZE
);
478 jitter_buffer_ctl(jitter
, JITTER_BUFFER_SET_MARGIN
, &tmp
);
480 nfds
= alsa_nfds(dev
);
481 pfds
= xmalloc(sizeof(*pfds
) * (nfds
+ 2));
483 alsa_getfds(dev
, pfds
, nfds
);
485 pfds
[nfds
].fd
= ecurr
.sock
;
486 pfds
[nfds
].events
= POLLIN
;
487 pfds
[nfds
+ 1].fd
= usocki
;
488 pfds
[nfds
+ 1].events
= POLLIN
;
492 while (likely(!quit
)) {
493 poll(pfds
, nfds
+ 2, -1);
495 if (pfds
[nfds
+ 1].revents
& POLLIN
) {
496 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
498 whine("Read error from cli!\n");
502 whine("You aborted call!\n");
507 if (pfds
[nfds
].revents
& POLLIN
) {
508 JitterBufferPacket packet
;
510 memset(msg
, 0, sizeof(msg
));
511 raddrlen
= sizeof(raddr
);
512 ret
= recvfrom(ecurr
.sock
, msg
, sizeof(msg
), 0,
514 if (unlikely(ret
<= 0))
517 if (raddrlen
!= ecurr
.addrlen
||
518 memcmp(&raddr
, &ecurr
.addr
, raddrlen
)) {
519 memset(msg
, 0, sizeof(msg
));
521 thdr
= (struct transsip_hdr
*) msg
;
524 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0,
530 thdr
= (struct transsip_hdr
*) msg
;
531 if (thdr
->fin
== 1) {
532 whine("Remote end hung up!\n");
536 packet
.data
= msg
+ sizeof(*thdr
);
537 packet
.len
= ret
- sizeof(*thdr
);
538 packet
.timestamp
= ntohl(thdr
->seq
);
539 packet
.span
= FRAME_SIZE
;
542 jitter_buffer_put(jitter
, &packet
);
546 if (alsa_play_ready(dev
, pfds
, nfds
)) {
547 short pcm
[FRAME_SIZE
* CHANNELS
];
550 JitterBufferPacket packet
;
552 memset(msg
, 0, sizeof(msg
));
554 packet
.len
= MAX_MSG
;
556 jitter_buffer_tick(jitter
);
557 jitter_buffer_get(jitter
, &packet
,
562 celt_decode(decoder
, (const unsigned char *)
563 packet
.data
, packet
.len
, pcm
);
565 for (i
= 0; i
< FRAME_SIZE
* CHANNELS
; ++i
)
569 alsa_write(dev
, pcm
, FRAME_SIZE
);
572 if (alsa_cap_ready(dev
, pfds
, nfds
)) {
573 short pcm
[FRAME_SIZE
* CHANNELS
];
575 alsa_read(dev
, pcm
, FRAME_SIZE
);
577 memset(msg
, 0, sizeof(msg
));
578 thdr
= (struct transsip_hdr
*) msg
;
580 celt_encode(encoder
, pcm
, NULL
, (unsigned char *)
581 (msg
+ sizeof(*thdr
)), PACKETSIZE
);
585 thdr
->seq
= htonl(send_seq
);
586 send_seq
+= FRAME_SIZE
;
588 ret
= sendto(ecurr
.sock
, msg
,
589 PACKETSIZE
+ sizeof(*thdr
), 0,
590 &ecurr
.addr
, ecurr
.addrlen
);
592 whine("Send datagram failed!\n");
601 memset(msg
, 0, sizeof(msg
));
602 thdr
= (struct transsip_hdr
*) msg
;
605 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
608 if (ecurr
.sock
== *csock
) {
613 celt_encoder_destroy(encoder
);
614 celt_decoder_destroy(decoder
);
615 celt_mode_destroy(mode
);
617 jitter_buffer_destroy(jitter
);
622 return ENGINE_STATE_IDLE
;
625 static inline void engine_drop_from_queue(int sock
)
628 recv(sock
, msg
, sizeof(msg
), 0);
631 static enum engine_state_num
engine_do_idle(int ssock
, int *csock
, int usocki
,
632 int usocko
, struct alsa_dev
*dev
)
636 struct pollfd fds
[2];
638 struct transsip_hdr
*thdr
;
641 assert(ecurr
.active
== 0);
644 fds
[0].events
= POLLIN
;
646 fds
[1].events
= POLLIN
;
648 while (likely(!quit
)) {
649 memset(msg
, 0, sizeof(msg
));
651 poll(fds
, array_size(fds
), -1);
652 for (i
= 0; i
< array_size(fds
); ++i
) {
653 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
656 if (fds
[i
].fd
== ssock
) {
657 ret
= recv(ssock
, msg
, sizeof(msg
), MSG_PEEK
);
660 if (ret
< sizeof(struct transsip_hdr
))
661 engine_drop_from_queue(ssock
);
663 thdr
= (struct transsip_hdr
*) msg
;
664 if (thdr
->est
== 1 && thdr
->psh
== 0)
665 return ENGINE_STATE_CALLIN
;
667 engine_drop_from_queue(ssock
);
670 if (fds
[i
].fd
== usocki
) {
671 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
673 whine("Error reading from cli!\n");
677 return ENGINE_STATE_CALLOUT
;
682 return ENGINE_STATE_IDLE
;
685 struct engine_state state_machine
[__ENGINE_STATE_MAX
] __read_mostly
= {
686 STATE_MAP_SET(ENGINE_STATE_IDLE
, engine_do_idle
),
687 STATE_MAP_SET(ENGINE_STATE_CALLOUT
, engine_do_callout
),
688 STATE_MAP_SET(ENGINE_STATE_CALLIN
, engine_do_callin
),
689 STATE_MAP_SET(ENGINE_STATE_SPEAKING
, engine_do_speaking
),
692 void *engine_main(void *arg
)
694 int ssock
= -1, csock
, ret
, mtu
, usocki
, usocko
;
695 enum engine_state_num state
;
696 struct addrinfo hints
, *ahead
, *ai
;
697 struct alsa_dev
*dev
= NULL
;
698 struct pipepair
*pp
= arg
;
700 init_call_notifier();
708 memset(&hints
, 0, sizeof(hints
));
709 hints
.ai_family
= PF_UNSPEC
;
710 hints
.ai_socktype
= SOCK_DGRAM
;
711 hints
.ai_protocol
= IPPROTO_UDP
;
712 hints
.ai_flags
= AI_PASSIVE
;
714 ret
= getaddrinfo(NULL
, port
, &hints
, &ahead
);
716 panic("Cannot get address info!\n");
718 for (ai
= ahead
; ai
!= NULL
&& ssock
< 0; ai
= ai
->ai_next
) {
719 ssock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
722 if (ai
->ai_family
== AF_INET6
) {
725 ret
= setsockopt(ssock
, IPPROTO_IPV6
, IPV6_V6ONLY
,
736 #endif /* IPV6_V6ONLY */
739 mtu
= IP_PMTUDISC_DONT
;
740 setsockopt(ssock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
742 ret
= bind(ssock
, ai
->ai_addr
, ai
->ai_addrlen
);
752 panic("Cannot open socket!\n");
754 dev
= alsa_open(alsadev
, SAMPLING_RATE
, CHANNELS
, FRAME_SIZE
);
756 panic("Cannot open ALSA device %s!\n", alsadev
);
758 state
= ENGINE_STATE_IDLE
;
759 while (likely(!quit
)) {
761 call_notifier_exec(CALL_STATE_MACHINE_CHANGED
, &arg
);
762 state
= state_machine
[state
].process(ssock
, &csock
,