2 * transsip - the telephony toolkit
3 * By Daniel Borkmann <daniel@transsip.org>
4 * Copyright 2011, 2012 Daniel Borkmann <dborkma@tik.ee.ethz.ch>
5 * Subject to the GPL, version 2.
15 #include <celt/celt.h>
16 #include <speex/speex_jitter.h>
17 #include <speex/speex_echo.h>
18 #include <speex/speex_preprocess.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
29 #include "call_notifier.h"
31 #define SAMPLING_RATE 48000
32 #define FRAME_SIZE 256
39 __extension__
uint8_t est
:1,
44 } __attribute__((packed
));
46 enum engine_state_num
{
47 ENGINE_STATE_IDLE
= CALL_STATE_MACHINE_IDLE
,
48 ENGINE_STATE_CALLOUT
= CALL_STATE_MACHINE_CALLOUT
,
49 ENGINE_STATE_CALLIN
= CALL_STATE_MACHINE_CALLIN
,
50 ENGINE_STATE_SPEAKING
= CALL_STATE_MACHINE_SPEAKING
,
54 enum engine_sound_type
{
55 ENGINE_SOUND_DIAL
= 0,
61 volatile enum engine_state_num state
;
62 enum engine_state_num (*process
)(int, int *, int, int,
66 #define STATE_MAP_SET(s, f) { \
71 extern volatile sig_atomic_t quit
;
73 extern void *engine_main(void *arg
);
75 volatile sig_atomic_t stun_done
= 0;
77 static char *alsadev
= "plughw:0,0"; //XXX
78 static char *port
= "30111"; //XXX
87 static struct engine_curr ecurr
;
89 static void engine_play_file(struct alsa_dev
*dev
, enum engine_sound_type type
)
93 short pcm
[FRAME_SIZE
];
94 struct pollfd
*pfds
= NULL
;
96 memset(path
, 0, sizeof(path
));
98 case ENGINE_SOUND_DIAL
:
99 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_DIAL
);
101 case ENGINE_SOUND_BUSY
:
102 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_BUSY
);
104 case ENGINE_SOUND_RING
:
105 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_RING
);
109 fd
= open(path
, O_RDONLY
);
111 panic("Cannot open ring file!\n");
115 nfds
= alsa_nfds(dev
);
116 pfds
= xmalloc(sizeof(*pfds
) * nfds
);
118 alsa_getfds(dev
, pfds
, nfds
);
120 memset(pcm
, 0, sizeof(pcm
));
121 while (read(fd
, pcm
, sizeof(pcm
)) > 0) {
122 poll(pfds
, nfds
, -1);
124 alsa_write(dev
, pcm
, FRAME_SIZE
);
125 memset(pcm
, 0, sizeof(pcm
));
127 alsa_read(dev
, pcm
, FRAME_SIZE
);
128 memset(pcm
, 0, sizeof(pcm
));
136 static inline void engine_play_ring(struct alsa_dev
*dev
)
138 engine_play_file(dev
, ENGINE_SOUND_RING
);
141 static inline void engine_play_busy(struct alsa_dev
*dev
)
143 engine_play_file(dev
, ENGINE_SOUND_BUSY
);
146 static inline void engine_play_dial(struct alsa_dev
*dev
)
148 engine_play_file(dev
, ENGINE_SOUND_DIAL
);
151 static void engine_decode_packet(uint8_t *pkt
, size_t len
)
153 struct transsip_hdr
*hdr
;
155 if (len
< sizeof(*hdr
)) {
156 whine("[dbg] pkt too small!\n");
160 hdr
= (struct transsip_hdr
*) pkt
;
161 whine("[dbg] packet:\n");
162 whine("[dbg] seq: %d\n", hdr
->seq
);
163 whine("[dbg] est: %d\n", hdr
->est
);
164 whine("[dbg] psh: %d\n", hdr
->psh
);
165 whine("[dbg] bsy: %d\n", hdr
->bsy
);
166 whine("[dbg] fin: %d\n", hdr
->fin
);
167 whine("[dbg] res1: %d\n", hdr
->res1
);
170 static enum engine_state_num
engine_do_callout(int ssock
, int *csock
, int usocki
,
171 int usocko
, struct alsa_dev
*dev
)
173 int one
, mtu
, tries
, i
;
176 struct addrinfo hints
, *ahead
, *ai
;
178 struct pollfd fds
[2];
179 struct sockaddr raddr
;
181 struct transsip_hdr
*thdr
;
183 assert(ecurr
.active
== 0);
185 memset(&cpkt
, 0, sizeof(cpkt
));
186 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
187 if (ret
!= sizeof(cpkt
)) {
188 whine("Read error from cli!\n");
189 return ENGINE_STATE_IDLE
;
191 if (cpkt
.ring
== 0) {
192 whine("Read no ring flag from cli!\n");
193 return ENGINE_STATE_IDLE
;
196 memset(&hints
, 0, sizeof(hints
));
197 hints
.ai_family
= PF_UNSPEC
;
198 hints
.ai_socktype
= SOCK_DGRAM
;
199 hints
.ai_protocol
= IPPROTO_UDP
;
200 hints
.ai_flags
= AI_NUMERICSERV
;
202 ret
= getaddrinfo(cpkt
.address
, cpkt
.port
, &hints
, &ahead
);
204 whine("Cannot get address info for %s:%s!\n",
205 cpkt
.address
, cpkt
.port
);
206 return ENGINE_STATE_IDLE
;
211 for (ai
= ahead
; ai
!= NULL
&& *csock
< 0; ai
= ai
->ai_next
) {
212 *csock
= socket(ai
->ai_family
, ai
->ai_socktype
,
217 ret
= connect(*csock
, ai
->ai_addr
, ai
->ai_addrlen
);
219 whine("Cannot connect to remote!\n");
226 setsockopt(*csock
, SOL_SOCKET
, SO_KEEPALIVE
, &one
, sizeof(one
));
228 mtu
= IP_PMTUDISC_DONT
;
229 setsockopt(*csock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
231 memcpy(&ecurr
.addr
, ai
->ai_addr
, ai
->ai_addrlen
);
232 ecurr
.addrlen
= ai
->ai_addrlen
;
239 whine("Cannot connect to server!\n");
240 return ENGINE_STATE_IDLE
;
245 memset(msg
, 0, sizeof(msg
));
246 thdr
= (struct transsip_hdr
*) msg
;
249 ret
= sendto(*csock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
252 whine("Cannot send ring probe to server!\n");
257 fds
[0].events
= POLLIN
;
259 fds
[1].events
= POLLIN
;
261 while (!quit
&& tries
++ < 100) {
262 poll(fds
, array_size(fds
), 1500);
264 for (i
= 0; i
< array_size(fds
); ++i
) {
265 if ((fds
[i
].revents
& POLLERR
) == POLLERR
) {
266 printf("Destination unreachable?\n");
269 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
272 if (fds
[i
].fd
== usocki
) {
273 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
275 whine("Read error from cli!\n");
279 memset(&msg
, 0, sizeof(msg
));
280 thdr
= (struct transsip_hdr
*) msg
;
284 sendto(*csock
, msg
, sizeof(*thdr
), 0,
285 &ecurr
.addr
, ecurr
.addrlen
);
287 whine("You aborted call!\n");
292 if (fds
[i
].fd
== *csock
) {
293 memset(msg
, 0, sizeof(msg
));
294 raddrlen
= sizeof(raddr
);
295 ret
= recvfrom(*csock
, msg
, sizeof(msg
), 0,
300 if (raddrlen
!= ecurr
.addrlen
)
302 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
305 thdr
= (struct transsip_hdr
*) msg
;
306 if (thdr
->est
== 1 && thdr
->psh
== 1) {
308 whine("Call established!\n");
309 return ENGINE_STATE_SPEAKING
;
311 if (thdr
->bsy
== 1 || thdr
->fin
== 1) {
312 whine("Remote end busy!\n");
313 engine_play_busy(dev
);
314 engine_play_busy(dev
);
320 engine_play_dial(dev
);
326 return ENGINE_STATE_IDLE
;
329 static enum engine_state_num
engine_do_callin(int ssock
, int *csock
, int usocki
,
330 int usocko
, struct alsa_dev
*dev
)
335 struct sockaddr raddr
;
337 struct transsip_hdr
*thdr
;
338 char hbuff
[256], sbuff
[256];
339 struct pollfd fds
[2];
342 assert(ecurr
.active
== 0);
344 memset(&msg
, 0, sizeof(msg
));
345 raddrlen
= sizeof(raddr
);
346 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0, &raddr
, &raddrlen
);
348 whine("Receive error %s!\n", strerror(errno
));
349 return ENGINE_STATE_IDLE
;
352 thdr
= (struct transsip_hdr
*) msg
;
354 return ENGINE_STATE_IDLE
;
356 memcpy(&ecurr
.addr
, &raddr
, raddrlen
);
357 ecurr
.addrlen
= raddrlen
;
361 memset(hbuff
, 0, sizeof(hbuff
));
362 memset(sbuff
, 0, sizeof(sbuff
));
363 getnameinfo((struct sockaddr
*) &raddr
, raddrlen
, hbuff
, sizeof(hbuff
),
364 sbuff
, sizeof(sbuff
), NI_NUMERICHOST
| NI_NUMERICSERV
);
366 printf("New incoming connection from %s:%s!\n", hbuff
, sbuff
);
367 printf("Answer it with: take\n");
368 printf("Reject it with: hangup\n");
372 fds
[0].events
= POLLIN
;
374 fds
[1].events
= POLLIN
;
376 while (likely(!quit
)) {
377 poll(fds
, array_size(fds
), 1500);
379 for (i
= 0; i
< array_size(fds
); ++i
) {
380 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
383 if (fds
[i
].fd
== ssock
) {
384 memset(msg
, 0, sizeof(msg
));
385 raddrlen
= sizeof(raddr
);
386 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0,
390 if (raddrlen
!= ecurr
.addrlen
)
392 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
395 if (thdr
->fin
== 1 || thdr
->bsy
== 1) {
396 whine("Remote end hung up!\n");
397 engine_play_busy(dev
);
398 engine_play_busy(dev
);
404 if (fds
[i
].fd
== usocki
) {
405 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
407 whine("Error reading from cli!\n");
411 memset(&msg
, 0, sizeof(msg
));
412 thdr
= (struct transsip_hdr
*) msg
;
416 sendto(ssock
, msg
, sizeof(*thdr
), 0,
417 &ecurr
.addr
, ecurr
.addrlen
);
419 whine("You aborted call!\n");
423 memset(&msg
, 0, sizeof(msg
));
424 thdr
= (struct transsip_hdr
*) msg
;
428 ret
= sendto(ssock
, msg
, sizeof(*thdr
), 0,
429 &ecurr
.addr
, ecurr
.addrlen
);
431 whine("Error sending ack!\n");
436 whine("Call established!\n");
437 return ENGINE_STATE_SPEAKING
;
442 engine_play_ring(dev
);
446 return ENGINE_STATE_IDLE
;
449 static enum engine_state_num
engine_do_speaking(int ssock
, int *csock
,
450 int usocki
, int usocko
,
451 struct alsa_dev
*dev
)
454 int recv_started
= 0, nfds
= 0, tmp
, i
, one
;
455 struct pollfd
*pfds
= NULL
;
457 uint32_t send_seq
= 0;
459 CELTEncoder
*encoder
;
460 CELTDecoder
*decoder
;
461 JitterBuffer
*jitter
;
462 SpeexPreprocessState
*preprocess
;
463 SpeexEchoState
*echo_state
;
464 struct sockaddr raddr
;
465 struct transsip_hdr
*thdr
;
469 assert(ecurr
.active
== 1);
471 mode
= celt_mode_create(SAMPLING_RATE
, FRAME_SIZE
, NULL
);
472 encoder
= celt_encoder_create(mode
, 1, NULL
);
473 decoder
= celt_decoder_create(mode
, 1, NULL
);
475 jitter
= jitter_buffer_init(FRAME_SIZE
);
477 jitter_buffer_ctl(jitter
, JITTER_BUFFER_SET_MARGIN
, &tmp
);
479 echo_state
= speex_echo_state_init(FRAME_SIZE
, 10 * FRAME_SIZE
);
481 speex_echo_ctl(echo_state
, SPEEX_ECHO_SET_SAMPLING_RATE
, &tmp
);
484 preprocess
= speex_preprocess_state_init(FRAME_SIZE
, SAMPLING_RATE
);
485 speex_preprocess_ctl(preprocess
, SPEEX_PREPROCESS_SET_DENOISE
, &one
);
486 speex_preprocess_ctl(preprocess
, SPEEX_PREPROCESS_SET_AGC
, &one
);
487 speex_preprocess_ctl(preprocess
, SPEEX_PREPROCESS_SET_DEREVERB
, &one
);
488 speex_preprocess_ctl(preprocess
, SPEEX_PREPROCESS_SET_ECHO_STATE
,
491 nfds
= alsa_nfds(dev
);
492 pfds
= xmalloc(sizeof(*pfds
) * (nfds
+ 2));
494 alsa_getfds(dev
, pfds
, nfds
);
496 pfds
[nfds
].fd
= ecurr
.sock
;
497 pfds
[nfds
].events
= POLLIN
;
498 pfds
[nfds
+ 1].fd
= usocki
;
499 pfds
[nfds
+ 1].events
= POLLIN
;
503 while (likely(!quit
)) {
504 poll(pfds
, nfds
+ 2, -1);
506 if (pfds
[nfds
+ 1].revents
& POLLIN
) {
507 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
509 whine("Read error from cli!\n");
513 whine("You aborted call!\n");
518 if (pfds
[nfds
].revents
& POLLIN
) {
519 JitterBufferPacket packet
;
521 memset(msg
, 0, sizeof(msg
));
522 raddrlen
= sizeof(raddr
);
523 ret
= recvfrom(ecurr
.sock
, msg
, sizeof(msg
), 0,
525 if (unlikely(ret
<= 0))
528 if (raddrlen
!= ecurr
.addrlen
||
529 memcmp(&raddr
, &ecurr
.addr
, raddrlen
)) {
530 memset(msg
, 0, sizeof(msg
));
532 thdr
= (struct transsip_hdr
*) msg
;
536 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0,
542 thdr
= (struct transsip_hdr
*) msg
;
543 if (thdr
->fin
== 1) {
544 whine("Remote end hung up!\n");
548 packet
.data
= msg
+ sizeof(*thdr
);
549 packet
.len
= ret
- sizeof(*thdr
);
550 packet
.timestamp
= ntohl(thdr
->seq
);
551 packet
.span
= FRAME_SIZE
;
554 jitter_buffer_put(jitter
, &packet
);
558 if (alsa_play_ready(dev
, pfds
, nfds
)) {
559 short pcm
[FRAME_SIZE
];
562 JitterBufferPacket packet
;
564 memset(msg
, 0, sizeof(msg
));
566 packet
.len
= MAX_MSG
;
568 jitter_buffer_tick(jitter
);
569 jitter_buffer_get(jitter
, &packet
,
574 celt_decode(decoder
, (const unsigned char *)
575 packet
.data
, packet
.len
, pcm
);
577 for (i
= 0; i
< FRAME_SIZE
; ++i
)
581 if (alsa_write(dev
, pcm
, FRAME_SIZE
))
582 speex_echo_state_reset(echo_state
);
583 speex_echo_playback(echo_state
, pcm
);
586 if (alsa_cap_ready(dev
, pfds
, nfds
)) {
587 short pcm
[FRAME_SIZE
];
588 short pcm2
[FRAME_SIZE
];
590 memset(msg
, 0, sizeof(msg
));
592 alsa_read(dev
, pcm
, FRAME_SIZE
);
594 speex_echo_capture(echo_state
, pcm
, pcm2
);
595 for (i
= 0; i
< FRAME_SIZE
; ++i
)
598 speex_preprocess_run(preprocess
, pcm
);
600 celt_encode(encoder
, pcm
, NULL
, (unsigned char *)
601 (msg
+ sizeof(*thdr
)), PACKETSIZE
);
603 thdr
= (struct transsip_hdr
*) msg
;
606 thdr
->seq
= htonl(send_seq
);
607 send_seq
+= FRAME_SIZE
;
609 ret
= sendto(ecurr
.sock
, msg
,
610 PACKETSIZE
+ sizeof(*thdr
), 0,
611 &ecurr
.addr
, ecurr
.addrlen
);
613 whine("Send datagram failed!\n");
622 memset(msg
, 0, sizeof(msg
));
623 thdr
= (struct transsip_hdr
*) msg
;
626 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
629 if (ecurr
.sock
== *csock
) {
634 celt_encoder_destroy(encoder
);
635 celt_decoder_destroy(decoder
);
636 celt_mode_destroy(mode
);
638 speex_preprocess_state_destroy(preprocess
);
639 jitter_buffer_destroy(jitter
);
644 return ENGINE_STATE_IDLE
;
647 static inline void engine_drop_from_queue(int sock
)
650 recv(sock
, msg
, sizeof(msg
), 0);
653 static enum engine_state_num
engine_do_idle(int ssock
, int *csock
, int usocki
,
654 int usocko
, struct alsa_dev
*dev
)
658 struct pollfd fds
[2];
660 struct transsip_hdr
*thdr
;
663 assert(ecurr
.active
== 0);
666 fds
[0].events
= POLLIN
;
668 fds
[1].events
= POLLIN
;
670 while (likely(!quit
)) {
671 memset(msg
, 0, sizeof(msg
));
673 poll(fds
, array_size(fds
), 1000);
675 for (i
= 0; i
< array_size(fds
); ++i
) {
676 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
679 if (fds
[i
].fd
== ssock
) {
680 ret
= recv(ssock
, msg
, sizeof(msg
), MSG_PEEK
);
683 if (ret
< sizeof(struct transsip_hdr
))
684 engine_drop_from_queue(ssock
);
686 thdr
= (struct transsip_hdr
*) msg
;
687 if (thdr
->est
== 1 && thdr
->psh
== 0)
688 return ENGINE_STATE_CALLIN
;
690 engine_drop_from_queue(ssock
);
693 if (fds
[i
].fd
== usocki
) {
694 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
696 whine("Error reading from cli!\n");
700 return ENGINE_STATE_CALLOUT
;
705 return ENGINE_STATE_IDLE
;
708 static struct engine_state state_machine
[__ENGINE_STATE_MAX
] __read_mostly
= {
709 STATE_MAP_SET(ENGINE_STATE_IDLE
, engine_do_idle
),
710 STATE_MAP_SET(ENGINE_STATE_CALLOUT
, engine_do_callout
),
711 STATE_MAP_SET(ENGINE_STATE_CALLIN
, engine_do_callin
),
712 STATE_MAP_SET(ENGINE_STATE_SPEAKING
, engine_do_speaking
),
715 void *engine_main(void *arg
)
717 int ssock
= -1, csock
, ret
, mtu
, usocki
, usocko
;
718 enum engine_state_num state
;
719 struct addrinfo hints
, *ahead
, *ai
;
720 struct alsa_dev
*dev
= NULL
;
721 struct pipepair
*pp
= arg
;
723 init_call_notifier();
731 memset(&hints
, 0, sizeof(hints
));
732 hints
.ai_family
= PF_UNSPEC
;
733 hints
.ai_socktype
= SOCK_DGRAM
;
734 hints
.ai_protocol
= IPPROTO_UDP
;
735 hints
.ai_flags
= AI_PASSIVE
;
737 ret
= getaddrinfo(NULL
, port
, &hints
, &ahead
);
739 panic("Cannot get address info!\n");
741 for (ai
= ahead
; ai
!= NULL
&& ssock
< 0; ai
= ai
->ai_next
) {
742 ssock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
745 if (ai
->ai_family
== AF_INET6
) {
748 ret
= setsockopt(ssock
, IPPROTO_IPV6
, IPV6_V6ONLY
,
759 #endif /* IPV6_V6ONLY */
762 mtu
= IP_PMTUDISC_DONT
;
763 setsockopt(ssock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
765 ret
= bind(ssock
, ai
->ai_addr
, ai
->ai_addrlen
);
775 panic("Cannot open socket!\n");
777 dev
= alsa_open(alsadev
, SAMPLING_RATE
, 1, FRAME_SIZE
);
779 panic("Cannot open ALSA device %s!\n", alsadev
);
781 state
= ENGINE_STATE_IDLE
;
782 while (likely(!quit
)) {
784 call_notifier_exec(CALL_STATE_MACHINE_CHANGED
, &arg
);
785 state
= state_machine
[state
].process(ssock
, &csock
,