engine: update
[transsip.git] / src / engine.c
blobfbbd0739b8d61feab42f775cca538fbcd1f2a97b
1 /*
2 * transsip - the telephony network
3 * By Daniel Borkmann <daniel@transsip.org>
4 * Copyright 2011, 2012 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
5 * Swiss federal institute of technology (ETH Zurich)
6 * Subject to the GPL, version 2.
7 */
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <fcntl.h>
12 #include <pthread.h>
13 #include <string.h>
14 #include <signal.h>
15 #include <unistd.h>
16 #include <celt/celt.h>
17 #include <speex/speex_jitter.h>
18 #include <speex/speex_echo.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #include <netdb.h>
22 #include <assert.h>
24 #include "built-in.h"
25 #include "alsa.h"
26 #include "die.h"
27 #include "xmalloc.h"
28 #include "xutils.h"
29 #include "call-notifier.h"
31 #define SAMPLING_RATE 48000
32 #define FRAME_SIZE 256
33 #define PACKETSIZE 43
34 #define CHANNELS 1
35 #define MAX_MSG 1500
36 #define PATH_MAX 512
38 struct transsip_hdr {
39 uint32_t seq;
40 __extension__ uint8_t est:1,
41 psh:1,
42 bsy:1,
43 fin:1,
44 res1:4;
45 } __attribute__((packed));
47 enum engine_state_num {
48 ENGINE_STATE_IDLE = CALL_STATE_MACHINE_IDLE,
49 ENGINE_STATE_CALLOUT = CALL_STATE_MACHINE_CALLOUT,
50 ENGINE_STATE_CALLIN = CALL_STATE_MACHINE_CALLIN,
51 ENGINE_STATE_SPEAKING = CALL_STATE_MACHINE_SPEAKING,
52 __ENGINE_STATE_MAX,
55 enum engine_sound_type {
56 ENGINE_SOUND_DIAL = 0,
57 ENGINE_SOUND_RING,
58 ENGINE_SOUND_BUSY,
61 struct engine_state {
62 volatile enum engine_state_num state;
63 enum engine_state_num (*process)(int, int *, int, int,
64 struct alsa_dev *);
67 #define STATE_MAP_SET(s, f) { \
68 .state = (s), \
69 .process = (f) \
72 extern sig_atomic_t quit;
74 sig_atomic_t stun_done = 0;
76 static char *alsadev = "plughw:0,0"; //XXX
77 static char *port = "30111"; //XXX
79 struct engine_curr {
80 int active;
81 int sock;
82 struct sockaddr addr;
83 socklen_t addrlen;
86 static struct engine_curr ecurr;
88 static void engine_play_file(struct alsa_dev *dev, enum engine_sound_type type)
90 int fd, nfds;
91 char path[PATH_MAX];
92 short pcm[FRAME_SIZE * CHANNELS];
93 struct pollfd *pfds = NULL;
95 memset(path, 0, sizeof(path));
96 switch (type) {
97 case ENGINE_SOUND_DIAL:
98 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_DIAL);
99 break;
100 case ENGINE_SOUND_BUSY:
101 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_BUSY);
102 break;
103 case ENGINE_SOUND_RING:
104 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_RING);
105 break;
108 fd = open(path, O_RDONLY);
109 if (fd < 0)
110 panic("Cannot open ring file!\n");
112 alsa_start(dev);
114 nfds = alsa_nfds(dev);
115 pfds = xmalloc(sizeof(*pfds) * nfds);
117 alsa_getfds(dev, pfds, nfds);
119 memset(pcm, 0, sizeof(pcm));
120 while (read(fd, pcm, sizeof(pcm)) > 0) {
121 poll(pfds, nfds, -1);
123 if (alsa_play_ready(dev, pfds, nfds))
124 alsa_write(dev, pcm, FRAME_SIZE);
125 memset(pcm, 0, sizeof(pcm));
127 if (alsa_cap_ready(dev, pfds, nfds))
128 alsa_read(dev, pcm, FRAME_SIZE);
129 memset(pcm, 0, sizeof(pcm));
132 alsa_stop(dev);
133 close(fd);
134 xfree(pfds);
137 static inline void engine_play_ring(struct alsa_dev *dev)
139 engine_play_file(dev, ENGINE_SOUND_RING);
142 static inline void engine_play_busy(struct alsa_dev *dev)
144 engine_play_file(dev, ENGINE_SOUND_BUSY);
147 static inline void engine_play_dial(struct alsa_dev *dev)
149 engine_play_file(dev, ENGINE_SOUND_DIAL);
152 static void engine_decode_packet(uint8_t *pkt, size_t len)
154 struct transsip_hdr *hdr;
156 if (len < sizeof(*hdr)) {
157 whine("[dbg] pkt too small!\n");
158 return;
161 hdr = (struct transsip_hdr *) pkt;
162 whine("[dbg] packet:\n");
163 whine("[dbg] seq: %d\n", hdr->seq);
164 whine("[dbg] est: %d\n", hdr->est);
165 whine("[dbg] psh: %d\n", hdr->psh);
166 whine("[dbg] bsy: %d\n", hdr->bsy);
167 whine("[dbg] fin: %d\n", hdr->fin);
168 whine("[dbg] res1: %d\n", hdr->res1);
171 static enum engine_state_num engine_do_callout(int ssock, int *csock, int usocki,
172 int usocko, struct alsa_dev *dev)
174 int one, mtu, tries, i;
175 ssize_t ret;
176 struct cli_pkt cpkt;
177 struct addrinfo hints, *ahead, *ai;
178 char msg[MAX_MSG];
179 struct pollfd fds[2];
180 struct sockaddr raddr;
181 socklen_t raddrlen;
182 struct transsip_hdr *thdr;
184 assert(ecurr.active == 0);
186 memset(&cpkt, 0, sizeof(cpkt));
187 ret = read(usocki, &cpkt, sizeof(cpkt));
188 if (ret != sizeof(cpkt)) {
189 whine("Read error from cli!\n");
190 return ENGINE_STATE_IDLE;
192 if (cpkt.ring == 0) {
193 whine("Read no ring flag from cli!\n");
194 return ENGINE_STATE_IDLE;
197 memset(&hints, 0, sizeof(hints));
198 hints.ai_family = PF_UNSPEC;
199 hints.ai_socktype = SOCK_DGRAM;
200 hints.ai_protocol = IPPROTO_UDP;
201 hints.ai_flags = AI_NUMERICSERV;
203 ret = getaddrinfo(cpkt.address, cpkt.port, &hints, &ahead);
204 if (ret < 0) {
205 whine("Cannot get address info for %s:%s!\n",
206 cpkt.address, cpkt.port);
207 return ENGINE_STATE_IDLE;
210 *csock = -1;
212 for (ai = ahead; ai != NULL && *csock < 0; ai = ai->ai_next) {
213 *csock = socket(ai->ai_family, ai->ai_socktype,
214 ai->ai_protocol);
215 if (*csock < 0)
216 continue;
218 ret = connect(*csock, ai->ai_addr, ai->ai_addrlen);
219 if (ret < 0) {
220 whine("Cannot connect to remote!\n");
221 close(*csock);
222 *csock = -1;
223 continue;
226 one = 1;
227 setsockopt(*csock, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
229 mtu = IP_PMTUDISC_DONT;
230 setsockopt(*csock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));
232 memcpy(&ecurr.addr, ai->ai_addr, ai->ai_addrlen);
233 ecurr.addrlen = ai->ai_addrlen;
234 ecurr.sock = *csock;
235 ecurr.active = 0;
237 freeaddrinfo(ahead);
239 if (*csock < 0) {
240 whine("Cannot connect to server!\n");
241 return ENGINE_STATE_IDLE;
244 tries = 0;
246 memset(msg, 0, sizeof(msg));
247 thdr = (struct transsip_hdr *) msg;
248 thdr->est = 1;
250 ret = sendto(*csock, msg, sizeof(*thdr), 0, &ecurr.addr,
251 ecurr.addrlen);
252 if (ret <= 0) {
253 whine("Cannot send ring probe to server!\n");
254 goto out_err;
257 fds[0].fd = *csock;
258 fds[0].events = POLLIN;
259 fds[1].fd = usocki;
260 fds[1].events = POLLIN;
262 while (!quit && tries++ < 100) {
263 poll(fds, array_size(fds), 1500);
265 for (i = 0; i < array_size(fds); ++i) {
266 if ((fds[i].revents & POLLERR) == POLLERR) {
267 printf("Destination unreachable?\n");
268 goto out_err;
270 if ((fds[i].revents & POLLIN) != POLLIN)
271 continue;
273 if (fds[i].fd == usocki) {
274 ret = read(usocki, &cpkt, sizeof(cpkt));
275 if (ret <= 0) {
276 whine("Read error from cli!\n");
277 continue;
279 if (cpkt.fin) {
280 memset(&msg, 0, sizeof(msg));
281 thdr = (struct transsip_hdr *) msg;
282 thdr->bsy = 1;
283 thdr->fin = 1;
285 sendto(ssock, msg, sizeof(*thdr), 0,
286 &ecurr.addr, ecurr.addrlen);
288 whine("You aborted call!\n");
289 goto out_err;
293 if (fds[i].fd == *csock) {
294 memset(msg, 0, sizeof(msg));
295 raddrlen = sizeof(raddr);
296 ret = recvfrom(*csock, msg, sizeof(msg), 0,
297 &raddr, &raddrlen);
298 if (ret <= 0)
299 continue;
301 engine_decode_packet((uint8_t *) msg, ret);
303 if (raddrlen != ecurr.addrlen)
304 continue;
305 if (memcmp(&raddr, &ecurr.addr, raddrlen))
306 continue;
308 thdr = (struct transsip_hdr *) msg;
309 if (thdr->est == 1 && thdr->psh == 1) {
310 ecurr.active = 1;
311 whine("Call established!\n");
312 return ENGINE_STATE_SPEAKING;
314 if (thdr->bsy == 1 || thdr->fin == 1) {
315 whine("Remote end busy!\n");
316 engine_play_busy(dev);
317 engine_play_busy(dev);
318 goto out_err;
323 engine_play_dial(dev);
326 out_err:
327 close(*csock);
328 *csock = 0;
329 return ENGINE_STATE_IDLE;
332 static enum engine_state_num engine_do_callin(int ssock, int *csock, int usocki,
333 int usocko, struct alsa_dev *dev)
335 int i;
336 ssize_t ret;
337 char msg[MAX_MSG];
338 struct sockaddr raddr;
339 socklen_t raddrlen;
340 struct transsip_hdr *thdr;
341 char hbuff[256], sbuff[256];
342 struct pollfd fds[2];
343 struct cli_pkt cpkt;
345 assert(ecurr.active == 0);
347 memset(&msg, 0, sizeof(msg));
348 raddrlen = sizeof(raddr);
349 ret = recvfrom(ssock, msg, sizeof(msg), 0, &raddr, &raddrlen);
350 if (ret <= 0) {
351 whine("Receive error %s!\n", strerror(errno));
352 return ENGINE_STATE_IDLE;
355 thdr = (struct transsip_hdr *) msg;
356 if (thdr->est != 1)
357 return ENGINE_STATE_IDLE;
359 memcpy(&ecurr.addr, &raddr, raddrlen);
360 ecurr.addrlen = raddrlen;
361 ecurr.sock = ssock;
362 ecurr.active = 0;
364 memset(hbuff, 0, sizeof(hbuff));
365 memset(sbuff, 0, sizeof(sbuff));
366 getnameinfo((struct sockaddr *) &raddr, raddrlen, hbuff, sizeof(hbuff),
367 sbuff, sizeof(sbuff), NI_NUMERICHOST | NI_NUMERICSERV);
369 printf("New incoming connection from %s:%s!\n", hbuff, sbuff);
370 printf("Answer it with: take\n");
371 printf("Reject it with: hangup\n");
372 fflush(stdout);
374 fds[0].fd = ssock;
375 fds[0].events = POLLIN;
376 fds[1].fd = usocki;
377 fds[1].events = POLLIN;
379 while (likely(!quit)) {
380 poll(fds, array_size(fds), 1500);
382 for (i = 0; i < array_size(fds); ++i) {
383 if ((fds[i].revents & POLLIN) != POLLIN)
384 continue;
386 if (fds[i].fd == ssock) {
387 memset(msg, 0, sizeof(msg));
388 raddrlen = sizeof(raddr);
389 ret = recvfrom(ssock, msg, sizeof(msg), 0,
390 &raddr, &raddrlen);
391 if (ret <= 0)
392 continue;
393 if (raddrlen != ecurr.addrlen)
394 continue;
395 if (memcmp(&raddr, &ecurr.addr, raddrlen))
396 continue;
398 if (thdr->fin == 1 || thdr->bsy == 1) {
399 whine("Remote end hung up!\n");
400 engine_play_busy(dev);
401 engine_play_busy(dev);
402 goto out_err;
407 if (fds[i].fd == usocki) {
408 ret = read(usocki, &cpkt, sizeof(cpkt));
409 if (ret <= 0) {
410 whine("Error reading from cli!\n");
411 continue;
413 if (cpkt.fin) {
414 memset(&msg, 0, sizeof(msg));
415 thdr = (struct transsip_hdr *) msg;
416 thdr->bsy = 1;
417 thdr->fin = 1;
419 sendto(ssock, msg, sizeof(*thdr), 0,
420 &ecurr.addr, ecurr.addrlen);
422 whine("You aborted call!\n");
423 goto out_err;
425 if (cpkt.take) {
426 memset(&msg, 0, sizeof(msg));
427 thdr = (struct transsip_hdr *) msg;
428 thdr->est = 1;
429 thdr->psh = 1;
431 ret = sendto(ssock, msg, sizeof(*thdr), 0,
432 &ecurr.addr, ecurr.addrlen);
433 if (ret <= 0) {
434 whine("Error sending ack!\n");
435 goto out_err;
438 ecurr.active = 1;
439 whine("Call established!\n");
440 return ENGINE_STATE_SPEAKING;
445 engine_play_ring(dev);
448 out_err:
449 return ENGINE_STATE_IDLE;
452 static enum engine_state_num engine_do_speaking(int ssock, int *csock,
453 int usocki, int usocko,
454 struct alsa_dev *dev)
456 ssize_t ret;
457 int recv_started = 0, nfds = 0, tmp, i;
458 struct pollfd *pfds = NULL;
459 char msg[MAX_MSG];
460 uint32_t send_seq = 0;
461 CELTMode *mode;
462 CELTEncoder *encoder;
463 CELTDecoder *decoder;
464 JitterBuffer *jitter;
465 struct sockaddr raddr;
466 struct transsip_hdr *thdr;
467 socklen_t raddrlen;
468 struct cli_pkt cpkt;
470 assert(ecurr.active == 1);
472 mode = celt_mode_create(SAMPLING_RATE, FRAME_SIZE, NULL);
473 encoder = celt_encoder_create(mode, CHANNELS, NULL);
474 decoder = celt_decoder_create(mode, CHANNELS, NULL);
476 jitter = jitter_buffer_init(FRAME_SIZE);
477 tmp = FRAME_SIZE;
478 jitter_buffer_ctl(jitter, JITTER_BUFFER_SET_MARGIN, &tmp);
480 nfds = alsa_nfds(dev);
481 pfds = xmalloc(sizeof(*pfds) * (nfds + 2));
483 alsa_getfds(dev, pfds, nfds);
485 pfds[nfds].fd = ecurr.sock;
486 pfds[nfds].events = POLLIN;
487 pfds[nfds + 1].fd = usocki;
488 pfds[nfds + 1].events = POLLIN;
490 alsa_start(dev);
492 while (likely(!quit)) {
493 poll(pfds, nfds + 2, -1);
495 if (pfds[nfds + 1].revents & POLLIN) {
496 ret = read(usocki, &cpkt, sizeof(cpkt));
497 if (ret <= 0) {
498 whine("Read error from cli!\n");
499 continue;
501 if (cpkt.fin) {
502 whine("You aborted call!\n");
503 goto out_err;
507 if (pfds[nfds].revents & POLLIN) {
508 JitterBufferPacket packet;
510 memset(msg, 0, sizeof(msg));
511 raddrlen = sizeof(raddr);
512 ret = recvfrom(ecurr.sock, msg, sizeof(msg), 0,
513 &raddr, &raddrlen);
514 if (unlikely(ret <= 0))
515 continue;
517 if (raddrlen != ecurr.addrlen ||
518 memcmp(&raddr, &ecurr.addr, raddrlen)) {
519 memset(msg, 0, sizeof(msg));
521 thdr = (struct transsip_hdr *) msg;
522 thdr->bsy = 1;
524 sendto(ecurr.sock, msg, sizeof(*thdr), 0,
525 &raddr, raddrlen);
527 goto out_alsa;
530 thdr = (struct transsip_hdr *) msg;
531 if (thdr->fin == 1) {
532 whine("Remote end hung up!\n");
533 goto out_err;
536 packet.data = msg + sizeof(*thdr);
537 packet.len = ret - sizeof(*thdr);
538 packet.timestamp = ntohl(thdr->seq);
539 packet.span = FRAME_SIZE;
540 packet.sequence = 0;
542 jitter_buffer_put(jitter, &packet);
543 recv_started = 1;
545 out_alsa:
546 if (alsa_play_ready(dev, pfds, nfds)) {
547 short pcm[FRAME_SIZE * CHANNELS];
549 if (recv_started) {
550 JitterBufferPacket packet;
552 memset(msg, 0, sizeof(msg));
553 packet.data = msg;
554 packet.len = MAX_MSG;
556 jitter_buffer_tick(jitter);
557 jitter_buffer_get(jitter, &packet,
558 FRAME_SIZE, NULL);
559 if (packet.len == 0)
560 packet.data = NULL;
562 celt_decode(decoder, (const unsigned char *)
563 packet.data, packet.len, pcm);
564 } else {
565 for (i = 0; i < FRAME_SIZE * CHANNELS; ++i)
566 pcm[i] = 0;
569 alsa_write(dev, pcm, FRAME_SIZE);
572 if (alsa_cap_ready(dev, pfds, nfds)) {
573 short pcm[FRAME_SIZE * CHANNELS];
575 alsa_read(dev, pcm, FRAME_SIZE);
577 memset(msg, 0, sizeof(msg));
578 thdr = (struct transsip_hdr *) msg;
580 celt_encode(encoder, pcm, NULL, (unsigned char *)
581 (msg + sizeof(*thdr)), PACKETSIZE);
583 thdr->psh = 1;
584 thdr->est = 1;
585 thdr->seq = htonl(send_seq);
586 send_seq += FRAME_SIZE;
588 ret = sendto(ecurr.sock, msg,
589 PACKETSIZE + sizeof(*thdr), 0,
590 &ecurr.addr, ecurr.addrlen);
591 if (ret <= 0) {
592 whine("Send datagram failed!\n");
593 goto out_err;
598 out_err:
599 alsa_stop(dev);
601 memset(msg, 0, sizeof(msg));
602 thdr = (struct transsip_hdr *) msg;
603 thdr->fin = 1;
605 sendto(ecurr.sock, msg, sizeof(*thdr), 0, &ecurr.addr,
606 ecurr.addrlen);
608 if (ecurr.sock == *csock) {
609 close(*csock);
610 *csock = 0;
613 celt_encoder_destroy(encoder);
614 celt_decoder_destroy(decoder);
615 celt_mode_destroy(mode);
617 jitter_buffer_destroy(jitter);
619 xfree(pfds);
621 ecurr.active = 0;
622 return ENGINE_STATE_IDLE;
625 static inline void engine_drop_from_queue(int sock)
627 char msg[MAX_MSG];
628 recv(sock, msg, sizeof(msg), 0);
631 static enum engine_state_num engine_do_idle(int ssock, int *csock, int usocki,
632 int usocko, struct alsa_dev *dev)
634 int i;
635 ssize_t ret;
636 struct pollfd fds[2];
637 char msg[MAX_MSG];
638 struct transsip_hdr *thdr;
639 struct cli_pkt cpkt;
641 assert(ecurr.active == 0);
643 fds[0].fd = ssock;
644 fds[0].events = POLLIN;
645 fds[1].fd = usocki;
646 fds[1].events = POLLIN;
648 while (likely(!quit)) {
649 memset(msg, 0, sizeof(msg));
651 poll(fds, array_size(fds), -1);
652 for (i = 0; i < array_size(fds); ++i) {
653 if ((fds[i].revents & POLLIN) != POLLIN)
654 continue;
656 if (fds[i].fd == ssock) {
657 ret = recv(ssock, msg, sizeof(msg), MSG_PEEK);
658 if (ret < 0)
659 continue;
660 if (ret < sizeof(struct transsip_hdr))
661 engine_drop_from_queue(ssock);
663 thdr = (struct transsip_hdr *) msg;
664 if (thdr->est == 1 && thdr->psh == 0)
665 return ENGINE_STATE_CALLIN;
666 else
667 engine_drop_from_queue(ssock);
670 if (fds[i].fd == usocki) {
671 ret = read(usocki, &cpkt, sizeof(cpkt));
672 if (ret <= 0) {
673 whine("Error reading from cli!\n");
674 continue;
676 if (cpkt.ring)
677 return ENGINE_STATE_CALLOUT;
682 return ENGINE_STATE_IDLE;
685 struct engine_state state_machine[__ENGINE_STATE_MAX] __read_mostly = {
686 STATE_MAP_SET(ENGINE_STATE_IDLE, engine_do_idle),
687 STATE_MAP_SET(ENGINE_STATE_CALLOUT, engine_do_callout),
688 STATE_MAP_SET(ENGINE_STATE_CALLIN, engine_do_callin),
689 STATE_MAP_SET(ENGINE_STATE_SPEAKING, engine_do_speaking),
692 void *engine_main(void *arg)
694 int ssock = -1, csock, ret, mtu, usocki, usocko;
695 enum engine_state_num state;
696 struct addrinfo hints, *ahead, *ai;
697 struct alsa_dev *dev = NULL;
698 struct pipepair *pp = arg;
700 init_call_notifier();
702 usocki = pp->i;
703 usocko = pp->o;
705 while (!stun_done)
706 sleep(0);
708 memset(&hints, 0, sizeof(hints));
709 hints.ai_family = PF_UNSPEC;
710 hints.ai_socktype = SOCK_DGRAM;
711 hints.ai_protocol = IPPROTO_UDP;
712 hints.ai_flags = AI_PASSIVE;
714 ret = getaddrinfo(NULL, port, &hints, &ahead);
715 if (ret < 0)
716 panic("Cannot get address info!\n");
718 for (ai = ahead; ai != NULL && ssock < 0; ai = ai->ai_next) {
719 ssock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
720 if (ssock < 0)
721 continue;
722 if (ai->ai_family == AF_INET6) {
723 int one = 1;
724 #ifdef IPV6_V6ONLY
725 ret = setsockopt(ssock, IPPROTO_IPV6, IPV6_V6ONLY,
726 &one, sizeof(one));
727 if (ret < 0) {
728 close(ssock);
729 ssock = -1;
730 continue;
732 #else
733 close(ssock);
734 ssock = -1;
735 continue;
736 #endif /* IPV6_V6ONLY */
739 mtu = IP_PMTUDISC_DONT;
740 setsockopt(ssock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));
742 ret = bind(ssock, ai->ai_addr, ai->ai_addrlen);
743 if (ret < 0) {
744 close(ssock);
745 ssock = -1;
746 continue;
750 freeaddrinfo(ahead);
751 if (ssock < 0)
752 panic("Cannot open socket!\n");
754 dev = alsa_open(alsadev, SAMPLING_RATE, CHANNELS, FRAME_SIZE);
755 if (!dev)
756 panic("Cannot open ALSA device %s!\n", alsadev);
758 state = ENGINE_STATE_IDLE;
759 while (likely(!quit)) {
760 int arg = state;
761 call_notifier_exec(CALL_STATE_MACHINE_CHANGED, &arg);
762 state = state_machine[state].process(ssock, &csock,
763 usocki, usocko,
764 dev);
767 alsa_close(dev);
768 close(ssock);
770 pthread_exit(0);