Merge pull request #11270 from haslinghuis/rename_attr
[betaflight.git] / lib / main / dyad / dyad.c
blobbb7f2cbbdc385730ad62f9265f52327bbfa0e7ca
1 /**
2 * Copyright (c) 2016 rxi
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the MIT license. See LICENSE for details.
6 */
8 #if __GNUC__ > 6
9 #define FALLTHROUGH __attribute__ ((fallthrough))
10 #else
11 #define FALLTHROUGH do {} while(0)
12 #endif
14 #ifdef _WIN32
15 #define _WIN32_WINNT 0x501
16 #ifndef _CRT_SECURE_NO_WARNINGS
17 #define _CRT_SECURE_NO_WARNINGS
18 #endif
19 #include <winsock2.h>
20 #include <ws2tcpip.h>
21 #include <windows.h>
22 #else
23 #define _POSIX_C_SOURCE 200809L
24 #ifdef __APPLE__
25 #define _DARWIN_UNLIMITED_SELECT
26 #endif
27 #include <unistd.h>
28 #include <netdb.h>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/time.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <arpa/inet.h>
36 #endif
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <stdarg.h>
41 #include <signal.h>
42 #include <errno.h>
43 #include <limits.h>
45 #include "dyad.h"
47 #define DYAD_VERSION "0.2.1"
50 #ifdef _WIN32
51 #define close(a) closesocket(a)
52 #define getsockopt(a,b,c,d,e) getsockopt((a),(b),(c),(char*)(d),(e))
53 #define setsockopt(a,b,c,d,e) setsockopt((a),(b),(c),(char*)(d),(e))
54 #define select(a,b,c,d,e) select((int)(a),(b),(c),(d),(e))
55 #define bind(a,b,c) bind((a),(b),(int)(c))
56 #define connect(a,b,c) connect((a),(b),(int)(c))
58 #undef errno
59 #define errno WSAGetLastError()
61 #undef EWOULDBLOCK
62 #define EWOULDBLOCK WSAEWOULDBLOCK
64 const char *inet_ntop(int af, const void *src, char *dst, socklen_t size) {
65 union { struct sockaddr sa; struct sockaddr_in sai;
66 struct sockaddr_in6 sai6; } addr;
67 int res;
68 memset(&addr, 0, sizeof(addr));
69 addr.sa.sa_family = af;
70 if (af == AF_INET6) {
71 memcpy(&addr.sai6.sin6_addr, src, sizeof(addr.sai6.sin6_addr));
72 } else {
73 memcpy(&addr.sai.sin_addr, src, sizeof(addr.sai.sin_addr));
75 res = WSAAddressToStringA(&addr.sa, sizeof(addr), 0, dst, (LPDWORD) &size);
76 if (res != 0) return NULL;
77 return dst;
79 #endif
81 #ifndef INVALID_SOCKET
82 #define INVALID_SOCKET -1
83 #endif
86 /*===========================================================================*/
87 /* Memory */
88 /*===========================================================================*/
90 static void panic(const char *fmt, ...);
92 static void *dyad_realloc(void *ptr, int n) {
93 ptr = realloc(ptr, n);
94 if (!ptr && n != 0) {
95 panic("out of memory");
97 return ptr;
101 static void dyad_free(void *ptr) {
102 free(ptr);
106 /*===========================================================================*/
107 /* Vec (dynamic array) */
108 /*===========================================================================*/
110 static void vec_expand(char **data, int *length, int *capacity, int memsz) {
111 if (*length + 1 > *capacity) {
112 if (*capacity == 0) {
113 *capacity = 1;
114 } else {
115 *capacity <<= 1;
117 *data = dyad_realloc(*data, *capacity * memsz);
121 static void vec_splice(
122 char **data, int *length, int *capacity, int memsz, int start, int count
124 (void) capacity;
125 memmove(*data + start * memsz,
126 *data + (start + count) * memsz,
127 (*length - start - count) * memsz);
131 #define Vec(T)\
132 struct { T *data; int length, capacity; }
135 #define vec_unpack(v)\
136 (char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)
139 #define vec_init(v)\
140 memset((v), 0, sizeof(*(v)))
143 #define vec_deinit(v)\
144 dyad_free((v)->data)
147 #define vec_clear(v)\
148 ((v)->length = 0)
151 #define vec_push(v, val)\
152 ( vec_expand(vec_unpack(v)),\
153 (v)->data[(v)->length++] = (val) )
156 #define vec_splice(v, start, count)\
157 ( vec_splice(vec_unpack(v), start, count),\
158 (v)->length -= (count) )
162 /*===========================================================================*/
163 /* SelectSet */
164 /*===========================================================================*/
166 /* A wrapper around the three fd_sets used for select(). The fd_sets' allocated
167 * memory is automatically expanded to accommodate fds as they are added.
169 * On Windows fd_sets are implemented as arrays; the FD_xxx macros are not used
170 * by the wrapper and instead the fd_set struct is manipulated directly. The
171 * wrapper should perform better than the normal FD_xxx macros, given that we
172 * don't bother with the linear search which FD_SET would perform to check for
173 * duplicates.
175 * On non-Windows platforms the sets are assumed to be bit arrays. The FD_xxx
176 * macros are not used in case their implementation attempts to do bounds
177 * checking; instead we manipulate the fd_sets' bits directly.
180 enum {
181 SELECT_READ,
182 SELECT_WRITE,
183 SELECT_EXCEPT,
184 SELECT_MAX
187 typedef struct {
188 int capacity;
189 dyad_Socket maxfd;
190 fd_set *fds[SELECT_MAX];
191 } SelectSet;
193 #define DYAD_UNSIGNED_BIT (sizeof(unsigned) * CHAR_BIT)
196 static void select_deinit(SelectSet *s) {
197 int i;
198 for (i = 0; i < SELECT_MAX; i++) {
199 dyad_free(s->fds[i]);
200 s->fds[i] = NULL;
202 s->capacity = 0;
206 static void select_grow(SelectSet *s) {
207 int i;
208 int oldCapacity = s->capacity;
209 s->capacity = s->capacity ? s->capacity << 1 : 1;
210 for (i = 0; i < SELECT_MAX; i++) {
211 s->fds[i] = dyad_realloc(s->fds[i], s->capacity * sizeof(fd_set));
212 memset(s->fds[i] + oldCapacity, 0,
213 (s->capacity - oldCapacity) * sizeof(fd_set));
218 static void select_zero(SelectSet *s) {
219 int i;
220 if (s->capacity == 0) return;
221 s->maxfd = 0;
222 for (i = 0; i < SELECT_MAX; i++) {
223 #if _WIN32
224 s->fds[i]->fd_count = 0;
225 #else
226 memset(s->fds[i], 0, s->capacity * sizeof(fd_set));
227 #endif
232 static void select_add(SelectSet *s, int set, dyad_Socket fd) {
233 #ifdef _WIN32
234 fd_set *f;
235 if (s->capacity == 0) select_grow(s);
236 while ((unsigned) (s->capacity * FD_SETSIZE) < s->fds[set]->fd_count + 1) {
237 select_grow(s);
239 f = s->fds[set];
240 f->fd_array[f->fd_count++] = fd;
241 #else
242 unsigned *p;
243 while (s->capacity * FD_SETSIZE < fd) {
244 select_grow(s);
246 p = (unsigned*) s->fds[set];
247 p[fd / DYAD_UNSIGNED_BIT] |= 1 << (fd % DYAD_UNSIGNED_BIT);
248 if (fd > s->maxfd) s->maxfd = fd;
249 #endif
253 static int select_has(SelectSet *s, int set, dyad_Socket fd) {
254 #ifdef _WIN32
255 unsigned i;
256 fd_set *f;
257 if (s->capacity == 0) return 0;
258 f = s->fds[set];
259 for (i = 0; i < f->fd_count; i++) {
260 if (f->fd_array[i] == fd) {
261 return 1;
264 return 0;
265 #else
266 unsigned *p;
267 if (s->maxfd < fd) return 0;
268 p = (unsigned*) s->fds[set];
269 return p[fd / DYAD_UNSIGNED_BIT] & (1 << (fd % DYAD_UNSIGNED_BIT));
270 #endif
274 /*===========================================================================*/
275 /* Core */
276 /*===========================================================================*/
278 typedef struct {
279 int event;
280 dyad_Callback callback;
281 void *udata;
282 } Listener;
285 struct dyad_Stream {
286 int state, flags;
287 dyad_Socket sockfd;
288 char *address;
289 int port;
290 int bytesSent, bytesReceived;
291 double lastActivity, timeout;
292 Vec(Listener) listeners;
293 Vec(char) lineBuffer;
294 Vec(char) writeBuffer;
295 dyad_Stream *next;
298 #define DYAD_FLAG_READY (1 << 0)
299 #define DYAD_FLAG_WRITTEN (1 << 1)
302 static dyad_Stream *dyad_streams;
303 static int dyad_streamCount;
304 static char dyad_panicMsgBuffer[128];
305 static dyad_PanicCallback panicCallback;
306 static SelectSet dyad_selectSet;
307 static double dyad_updateTimeout = 1;
308 static double dyad_tickInterval = 1;
309 static double dyad_lastTick = 0;
312 static void panic(const char *fmt, ...) {
313 va_list args;
314 va_start(args, fmt);
315 vsprintf(dyad_panicMsgBuffer, fmt, args);
316 va_end(args);
317 if (panicCallback) {
318 panicCallback(dyad_panicMsgBuffer);
319 } else {
320 printf("dyad panic: %s\n", dyad_panicMsgBuffer);
322 exit(EXIT_FAILURE);
326 static dyad_Event createEvent(int type) {
327 dyad_Event e;
328 memset(&e, 0, sizeof(e));
329 e.type = type;
330 return e;
334 static void stream_destroy(dyad_Stream *stream);
336 static void destroyClosedStreams(void) {
337 dyad_Stream *stream = dyad_streams;
338 while (stream) {
339 if (stream->state == DYAD_STATE_CLOSED) {
340 dyad_Stream *next = stream->next;
341 stream_destroy(stream);
342 stream = next;
343 } else {
344 stream = stream->next;
350 static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e);
352 static void updateTickTimer(void) {
353 /* Update tick timer */
354 if (dyad_lastTick == 0) {
355 dyad_lastTick = dyad_getTime();
357 while (dyad_lastTick < dyad_getTime()) {
358 /* Emit event on all streams */
359 dyad_Stream *stream;
360 dyad_Event e = createEvent(DYAD_EVENT_TICK);
361 e.msg = "a tick has occured";
362 stream = dyad_streams;
363 while (stream) {
364 stream_emitEvent(stream, &e);
365 stream = stream->next;
367 dyad_lastTick += dyad_tickInterval;
372 static void updateStreamTimeouts(void) {
373 double currentTime = dyad_getTime();
374 dyad_Stream *stream;
375 dyad_Event e = createEvent(DYAD_EVENT_TIMEOUT);
376 e.msg = "stream timed out";
377 stream = dyad_streams;
378 while (stream) {
379 if (stream->timeout) {
380 if (currentTime - stream->lastActivity > stream->timeout) {
381 stream_emitEvent(stream, &e);
382 dyad_close(stream);
385 stream = stream->next;
391 /*===========================================================================*/
392 /* Stream */
393 /*===========================================================================*/
395 static void stream_destroy(dyad_Stream *stream) {
396 dyad_Event e;
397 dyad_Stream **next;
398 /* Close socket */
399 if (stream->sockfd != INVALID_SOCKET) {
400 close(stream->sockfd);
402 /* Emit destroy event */
403 e = createEvent(DYAD_EVENT_DESTROY);
404 e.msg = "the stream has been destroyed";
405 stream_emitEvent(stream, &e);
406 /* Remove from list and decrement count */
407 next = &dyad_streams;
408 while (*next != stream) {
409 next = &(*next)->next;
411 *next = stream->next;
412 dyad_streamCount--;
413 /* Destroy and free */
414 vec_deinit(&stream->listeners);
415 vec_deinit(&stream->lineBuffer);
416 vec_deinit(&stream->writeBuffer);
417 dyad_free(stream->address);
418 dyad_free(stream);
422 static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e) {
423 int i;
424 e->stream = stream;
425 for (i = 0; i < stream->listeners.length; i++) {
426 Listener *listener = &stream->listeners.data[i];
427 if (listener->event == e->type) {
428 e->udata = listener->udata;
429 listener->callback(e);
431 /* Check to see if this listener was removed: If it was we decrement `i`
432 * since the next listener will now be in this ones place */
433 if (listener != &stream->listeners.data[i]) {
434 i--;
440 static void stream_error(dyad_Stream *stream, const char *msg, int err) {
441 char buf[256];
442 dyad_Event e = createEvent(DYAD_EVENT_ERROR);
443 if (err) {
444 sprintf(buf, "%.160s (%.80s)", msg, strerror(err));
445 e.msg = buf;
446 } else {
447 e.msg = msg;
449 stream_emitEvent(stream, &e);
450 dyad_close(stream);
454 static void stream_initAddress(dyad_Stream *stream) {
455 union { struct sockaddr sa; struct sockaddr_storage sas;
456 struct sockaddr_in sai; struct sockaddr_in6 sai6; } addr;
457 socklen_t size;
458 memset(&addr, 0, sizeof(addr));
459 size = sizeof(addr);
460 dyad_free(stream->address);
461 stream->address = NULL;
462 if (getpeername(stream->sockfd, &addr.sa, &size) == -1) {
463 if (getsockname(stream->sockfd, &addr.sa, &size) == -1) {
464 return;
467 if (addr.sas.ss_family == AF_INET6) {
468 stream->address = dyad_realloc(NULL, INET6_ADDRSTRLEN);
469 inet_ntop(AF_INET6, &addr.sai6.sin6_addr, stream->address,
470 INET6_ADDRSTRLEN);
471 stream->port = ntohs(addr.sai6.sin6_port);
472 } else {
473 stream->address = dyad_realloc(NULL, INET_ADDRSTRLEN);
474 inet_ntop(AF_INET, &addr.sai.sin_addr, stream->address, INET_ADDRSTRLEN);
475 stream->port = ntohs(addr.sai.sin_port);
480 static void stream_setSocketNonBlocking(dyad_Stream *stream, int opt) {
481 #ifdef _WIN32
482 u_long mode = opt;
483 ioctlsocket(stream->sockfd, FIONBIO, &mode);
484 #else
485 int flags = fcntl(stream->sockfd, F_GETFL);
486 fcntl(stream->sockfd, F_SETFL,
487 opt ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK));
488 #endif
492 static void stream_setSocket(dyad_Stream *stream, dyad_Socket sockfd) {
493 stream->sockfd = sockfd;
494 stream_setSocketNonBlocking(stream, 1);
495 stream_initAddress(stream);
499 static int stream_initSocket(
500 dyad_Stream *stream, int domain, int type, int protocol
502 stream->sockfd = socket(domain, type, protocol);
503 if (stream->sockfd == INVALID_SOCKET) {
504 stream_error(stream, "could not create socket", errno);
505 return -1;
507 stream_setSocket(stream, stream->sockfd);
508 return 0;
512 static int stream_hasListenerForEvent(dyad_Stream *stream, int event) {
513 int i;
514 for (i = 0; i < stream->listeners.length; i++) {
515 Listener *listener = &stream->listeners.data[i];
516 if (listener->event == event) {
517 return 1;
520 return 0;
524 static void stream_handleReceivedData(dyad_Stream *stream) {
525 for (;;) {
526 /* Receive data */
527 dyad_Event e;
528 char data[8192];
529 int size = recv(stream->sockfd, data, sizeof(data) - 1, 0);
530 if (size <= 0) {
531 if (size == 0 || errno != EWOULDBLOCK) {
532 /* Handle disconnect */
533 dyad_close(stream);
534 return;
535 } else {
536 /* No more data */
537 return;
540 data[size] = 0;
541 /* Update status */
542 stream->bytesReceived += size;
543 stream->lastActivity = dyad_getTime();
544 /* Emit data event */
545 e = createEvent(DYAD_EVENT_DATA);
546 e.msg = "received data";
547 e.data = data;
548 e.size = size;
549 stream_emitEvent(stream, &e);
550 /* Check stream state in case it was closed during one of the data event
551 * handlers. */
552 if (stream->state != DYAD_STATE_CONNECTED) {
553 return;
556 /* Handle line event */
557 if (stream_hasListenerForEvent(stream, DYAD_EVENT_LINE)) {
558 int i, start;
559 char *buf;
560 for (i = 0; i < size; i++) {
561 vec_push(&stream->lineBuffer, data[i]);
563 start = 0;
564 buf = stream->lineBuffer.data;
565 for (i = 0; i < stream->lineBuffer.length; i++) {
566 if (buf[i] == '\n') {
567 dyad_Event e;
568 buf[i] = '\0';
569 e = createEvent(DYAD_EVENT_LINE);
570 e.msg = "received line";
571 e.data = &buf[start];
572 e.size = i - start;
573 /* Check and strip carriage return */
574 if (e.size > 0 && e.data[e.size - 1] == '\r') {
575 e.data[--e.size] = '\0';
577 stream_emitEvent(stream, &e);
578 start = i + 1;
579 /* Check stream state in case it was closed during one of the line
580 * event handlers. */
581 if (stream->state != DYAD_STATE_CONNECTED) {
582 return;
586 if (start == stream->lineBuffer.length) {
587 vec_clear(&stream->lineBuffer);
588 } else {
589 vec_splice(&stream->lineBuffer, 0, start);
596 static void stream_acceptPendingConnections(dyad_Stream *stream) {
597 for (;;) {
598 dyad_Stream *remote;
599 dyad_Event e;
600 int err = 0;
601 dyad_Socket sockfd = accept(stream->sockfd, NULL, NULL);
602 if (sockfd == INVALID_SOCKET) {
603 err = errno;
604 if (err == EWOULDBLOCK) {
605 /* No more waiting sockets */
606 return;
609 /* Create client stream */
610 remote = dyad_newStream();
611 remote->state = DYAD_STATE_CONNECTED;
612 /* Set stream's socket */
613 stream_setSocket(remote, sockfd);
614 /* Emit accept event */
615 e = createEvent(DYAD_EVENT_ACCEPT);
616 e.msg = "accepted connection";
617 e.remote = remote;
618 stream_emitEvent(stream, &e);
619 /* Handle invalid socket -- the stream is still made and the ACCEPT event
620 * is still emitted, but its shut immediately with an error */
621 if (remote->sockfd == INVALID_SOCKET) {
622 stream_error(remote, "failed to create socket on accept", err);
623 return;
629 static int stream_flushWriteBuffer(dyad_Stream *stream) {
630 stream->flags &= ~DYAD_FLAG_WRITTEN;
631 if (stream->writeBuffer.length > 0) {
632 /* Send data */
633 int size = send(stream->sockfd, stream->writeBuffer.data,
634 stream->writeBuffer.length, 0);
635 if (size <= 0) {
636 if (errno == EWOULDBLOCK) {
637 /* No more data can be written */
638 return 0;
639 } else {
640 /* Handle disconnect */
641 dyad_close(stream);
642 return 0;
645 if (size == stream->writeBuffer.length) {
646 vec_clear(&stream->writeBuffer);
647 } else {
648 vec_splice(&stream->writeBuffer, 0, size);
650 /* Update status */
651 stream->bytesSent += size;
652 stream->lastActivity = dyad_getTime();
655 if (stream->writeBuffer.length == 0) {
656 dyad_Event e;
657 /* If this is a 'closing' stream we can properly close it now */
658 if (stream->state == DYAD_STATE_CLOSING) {
659 dyad_close(stream);
660 return 0;
662 /* Set ready flag and emit 'ready for data' event */
663 stream->flags |= DYAD_FLAG_READY;
664 e = createEvent(DYAD_EVENT_READY);
665 e.msg = "stream is ready for more data";
666 stream_emitEvent(stream, &e);
668 /* Return 1 to indicate that more data can immediately be written to the
669 * stream's socket */
670 return 1;
675 /*===========================================================================*/
676 /* API */
677 /*===========================================================================*/
679 /*---------------------------------------------------------------------------*/
680 /* Core */
681 /*---------------------------------------------------------------------------*/
683 void dyad_update(void) {
684 dyad_Stream *stream;
685 struct timeval tv;
687 destroyClosedStreams();
688 updateTickTimer();
689 updateStreamTimeouts();
691 /* Create fd sets for select() */
692 select_zero(&dyad_selectSet);
694 stream = dyad_streams;
695 while (stream) {
696 switch (stream->state) {
697 case DYAD_STATE_CONNECTED:
698 select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
699 if (!(stream->flags & DYAD_FLAG_READY) ||
700 stream->writeBuffer.length != 0
702 select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
704 break;
705 case DYAD_STATE_CLOSING:
706 select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
707 break;
708 case DYAD_STATE_CONNECTING:
709 select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
710 select_add(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd);
711 break;
712 case DYAD_STATE_LISTENING:
713 select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
714 break;
716 stream = stream->next;
719 /* Init timeout value and do select */
720 #ifdef _MSC_VER
721 #pragma warning(push)
722 /* Disable double to long implicit conversion warning,
723 * because the type of timeval's fields don't agree across platforms */
724 #pragma warning(disable: 4244)
725 #endif
726 tv.tv_sec = dyad_updateTimeout;
727 tv.tv_usec = (dyad_updateTimeout - tv.tv_sec) * 1e6;
728 #ifdef _MSC_VER
729 #pragma warning(pop)
730 #endif
732 select(dyad_selectSet.maxfd + 1,
733 dyad_selectSet.fds[SELECT_READ],
734 dyad_selectSet.fds[SELECT_WRITE],
735 dyad_selectSet.fds[SELECT_EXCEPT],
736 &tv);
738 /* Handle streams */
739 stream = dyad_streams;
740 while (stream) {
741 switch (stream->state) {
743 case DYAD_STATE_CONNECTED:
744 if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
745 stream_handleReceivedData(stream);
746 if (stream->state == DYAD_STATE_CLOSED) {
747 break;
751 FALLTHROUGH;
752 case DYAD_STATE_CLOSING:
753 if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
754 stream_flushWriteBuffer(stream);
756 break;
758 case DYAD_STATE_CONNECTING:
759 if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
760 /* Check socket for error */
761 int optval = 0;
762 socklen_t optlen = sizeof(optval);
763 dyad_Event e;
764 getsockopt(stream->sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
765 if (optval != 0) goto connectFailed;
766 /* Handle succeselful connection */
767 stream->state = DYAD_STATE_CONNECTED;
768 stream->lastActivity = dyad_getTime();
769 stream_initAddress(stream);
770 /* Emit connect event */
771 e = createEvent(DYAD_EVENT_CONNECT);
772 e.msg = "connected to server";
773 stream_emitEvent(stream, &e);
774 } else if (
775 select_has(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd)
777 /* Handle failed connection */
778 connectFailed:
779 stream_error(stream, "could not connect to server", 0);
781 break;
783 case DYAD_STATE_LISTENING:
784 if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
785 stream_acceptPendingConnections(stream);
787 break;
790 /* If data was just now written to the stream we should immediately try to
791 * send it */
792 if (
793 stream->flags & DYAD_FLAG_WRITTEN &&
794 stream->state != DYAD_STATE_CLOSED
796 stream_flushWriteBuffer(stream);
799 stream = stream->next;
804 void dyad_init(void) {
805 #ifdef _WIN32
806 WSADATA dat;
807 int err = WSAStartup(MAKEWORD(2, 2), &dat);
808 if (err != 0) {
809 panic("WSAStartup failed (%d)", err);
811 #else
812 /* Stops the SIGPIPE signal being raised when writing to a closed socket */
813 signal(SIGPIPE, SIG_IGN);
814 #endif
818 void dyad_shutdown(void) {
819 /* Close and destroy all the streams */
820 while (dyad_streams) {
821 dyad_close(dyad_streams);
822 stream_destroy(dyad_streams);
824 /* Clear up everything */
825 select_deinit(&dyad_selectSet);
826 #ifdef _WIN32
827 WSACleanup();
828 #endif
832 const char *dyad_getVersion(void) {
833 return DYAD_VERSION;
837 double dyad_getTime(void) {
838 #ifdef _WIN32
839 FILETIME ft;
840 GetSystemTimeAsFileTime(&ft);
841 return (ft.dwHighDateTime * 4294967296.0 / 1e7) + ft.dwLowDateTime / 1e7;
842 #else
843 struct timeval tv;
844 gettimeofday(&tv, NULL);
845 return tv.tv_sec + tv.tv_usec / 1e6;
846 #endif
850 int dyad_getStreamCount(void) {
851 return dyad_streamCount;
855 void dyad_setTickInterval(double seconds) {
856 dyad_tickInterval = seconds;
860 void dyad_setUpdateTimeout(double seconds) {
861 dyad_updateTimeout = seconds;
865 dyad_PanicCallback dyad_atPanic(dyad_PanicCallback func) {
866 dyad_PanicCallback old = panicCallback;
867 panicCallback = func;
868 return old;
872 /*---------------------------------------------------------------------------*/
873 /* Stream */
874 /*---------------------------------------------------------------------------*/
876 dyad_Stream *dyad_newStream(void) {
877 dyad_Stream *stream = dyad_realloc(NULL, sizeof(*stream));
878 memset(stream, 0, sizeof(*stream));
879 stream->state = DYAD_STATE_CLOSED;
880 stream->sockfd = INVALID_SOCKET;
881 stream->lastActivity = dyad_getTime();
882 /* Add to list and increment count */
883 stream->next = dyad_streams;
884 dyad_streams = stream;
885 dyad_streamCount++;
886 return stream;
890 void dyad_addListener(
891 dyad_Stream *stream, int event, dyad_Callback callback, void *udata
893 Listener listener;
894 listener.event = event;
895 listener.callback = callback;
896 listener.udata = udata;
897 vec_push(&stream->listeners, listener);
901 void dyad_removeListener(
902 dyad_Stream *stream, int event, dyad_Callback callback, void *udata
904 int i = stream->listeners.length;
905 while (i--) {
906 Listener *x = &stream->listeners.data[i];
907 if (x->event == event && x->callback == callback && x->udata == udata) {
908 vec_splice(&stream->listeners, i, 1);
914 void dyad_removeAllListeners(dyad_Stream *stream, int event) {
915 if (event == DYAD_EVENT_NULL) {
916 vec_clear(&stream->listeners);
917 } else {
918 int i = stream->listeners.length;
919 while (i--) {
920 if (stream->listeners.data[i].event == event) {
921 vec_splice(&stream->listeners, i, 1);
928 void dyad_close(dyad_Stream *stream) {
929 dyad_Event e;
930 if (stream->state == DYAD_STATE_CLOSED) return;
931 stream->state = DYAD_STATE_CLOSED;
932 /* Close socket */
933 if (stream->sockfd != INVALID_SOCKET) {
934 close(stream->sockfd);
935 stream->sockfd = INVALID_SOCKET;
937 /* Emit event */
938 e = createEvent(DYAD_EVENT_CLOSE);
939 e.msg = "stream closed";
940 stream_emitEvent(stream, &e);
941 /* Clear buffers */
942 vec_clear(&stream->lineBuffer);
943 vec_clear(&stream->writeBuffer);
947 void dyad_end(dyad_Stream *stream) {
948 if (stream->state == DYAD_STATE_CLOSED) return;
949 if (stream->writeBuffer.length > 0) {
950 stream->state = DYAD_STATE_CLOSING;
951 } else {
952 dyad_close(stream);
957 int dyad_listenEx(
958 dyad_Stream *stream, const char *host, int port, int backlog
960 struct addrinfo hints, *ai = NULL;
961 int err, optval;
962 char buf[64];
963 dyad_Event e;
965 /* Get addrinfo */
966 memset(&hints, 0, sizeof(hints));
967 hints.ai_family = AF_UNSPEC;
968 hints.ai_socktype = SOCK_STREAM;
969 hints.ai_flags = AI_PASSIVE;
970 sprintf(buf, "%d", port);
971 err = getaddrinfo(host, buf, &hints, &ai);
972 if (err) {
973 stream_error(stream, "could not get addrinfo", errno);
974 goto fail;
976 /* Init socket */
977 err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
978 ai->ai_protocol);
979 if (err) goto fail;
980 /* Set SO_REUSEADDR so that the socket can be immediately bound without
981 * having to wait for any closed socket on the same port to timeout */
982 optval = 1;
983 setsockopt(stream->sockfd, SOL_SOCKET, SO_REUSEADDR,
984 &optval, sizeof(optval));
985 /* Bind and listen */
986 err = bind(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
987 if (err) {
988 stream_error(stream, "could not bind socket", errno);
989 goto fail;
991 err = listen(stream->sockfd, backlog);
992 if (err) {
993 stream_error(stream, "socket failed on listen", errno);
994 goto fail;
996 stream->state = DYAD_STATE_LISTENING;
997 stream->port = port;
998 stream_initAddress(stream);
999 /* Emit listening event */
1000 e = createEvent(DYAD_EVENT_LISTEN);
1001 e.msg = "socket is listening";
1002 stream_emitEvent(stream, &e);
1003 freeaddrinfo(ai);
1004 return 0;
1005 fail:
1006 if (ai) freeaddrinfo(ai);
1007 return -1;
1011 int dyad_listen(dyad_Stream *stream, int port) {
1012 return dyad_listenEx(stream, NULL, port, 511);
1016 int dyad_connect(dyad_Stream *stream, const char *host, int port) {
1017 struct addrinfo hints, *ai = NULL;
1018 int err;
1019 char buf[64];
1021 /* Resolve host */
1022 memset(&hints, 0, sizeof(hints));
1023 hints.ai_family = AF_UNSPEC;
1024 hints.ai_socktype = SOCK_STREAM;
1025 sprintf(buf, "%d", port);
1026 err = getaddrinfo(host, buf, &hints, &ai);
1027 if (err) {
1028 stream_error(stream, "could not resolve host", 0);
1029 goto fail;
1031 /* Start connecting */
1032 err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
1033 ai->ai_protocol);
1034 if (err) goto fail;
1035 connect(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
1036 stream->state = DYAD_STATE_CONNECTING;
1037 freeaddrinfo(ai);
1038 return 0;
1039 fail:
1040 if (ai) freeaddrinfo(ai);
1041 return -1;
1045 void dyad_write(dyad_Stream *stream, const void *data, int size) {
1046 const char *p = data;
1047 while (size--) {
1048 vec_push(&stream->writeBuffer, *p++);
1050 stream->flags |= DYAD_FLAG_WRITTEN;
1054 void dyad_vwritef(dyad_Stream *stream, const char *fmt, va_list args) {
1055 char buf[512];
1056 char *str;
1057 char f[] = "%_";
1058 FILE *fp;
1059 int c;
1060 while (*fmt) {
1061 if (*fmt == '%') {
1062 fmt++;
1063 switch (*fmt) {
1064 case 'r':
1065 fp = va_arg(args, FILE*);
1066 if (fp == NULL) {
1067 str = "(null)";
1068 goto writeStr;
1070 while ((c = fgetc(fp)) != EOF) {
1071 vec_push(&stream->writeBuffer, c);
1073 break;
1074 case 'c':
1075 vec_push(&stream->writeBuffer, va_arg(args, int));
1076 break;
1077 case 's':
1078 str = va_arg(args, char*);
1079 if (str == NULL) str = "(null)";
1080 writeStr:
1081 while (*str) {
1082 vec_push(&stream->writeBuffer, *str++);
1084 break;
1085 case 'b':
1086 str = va_arg(args, char*);
1087 c = va_arg(args, int);
1088 while (c--) {
1089 vec_push(&stream->writeBuffer, *str++);
1091 break;
1092 default:
1093 f[1] = *fmt;
1094 switch (*fmt) {
1095 case 'f':
1096 case 'g': sprintf(buf, f, va_arg(args, double)); break;
1097 case 'd':
1098 case 'i': sprintf(buf, f, va_arg(args, int)); break;
1099 case 'x':
1100 case 'X': sprintf(buf, f, va_arg(args, unsigned)); break;
1101 case 'p': sprintf(buf, f, va_arg(args, void*)); break;
1102 default : buf[0] = *fmt; buf[1] = '\0';
1104 str = buf;
1105 goto writeStr;
1107 } else {
1108 vec_push(&stream->writeBuffer, *fmt);
1110 fmt++;
1112 stream->flags |= DYAD_FLAG_WRITTEN;
1116 void dyad_writef(dyad_Stream *stream, const char *fmt, ...) {
1117 va_list args;
1118 va_start(args, fmt);
1119 dyad_vwritef(stream, fmt, args);
1120 va_end(args);
1124 void dyad_setTimeout(dyad_Stream *stream, double seconds) {
1125 stream->timeout = seconds;
1129 void dyad_setNoDelay(dyad_Stream *stream, int opt) {
1130 opt = !!opt;
1131 setsockopt(stream->sockfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
1135 int dyad_getState(dyad_Stream *stream) {
1136 return stream->state;
1140 const char *dyad_getAddress(dyad_Stream *stream) {
1141 return stream->address ? stream->address : "";
1145 int dyad_getPort(dyad_Stream *stream) {
1146 return stream->port;
1150 int dyad_getBytesSent(dyad_Stream *stream) {
1151 return stream->bytesSent;
1155 int dyad_getBytesReceived(dyad_Stream *stream) {
1156 return stream->bytesReceived;
1160 dyad_Socket dyad_getSocket(dyad_Stream *stream) {
1161 return stream->sockfd;