2 * Copyright (c) 2016 rxi
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the MIT license. See LICENSE for details.
9 #define FALLTHROUGH __attribute__ ((fallthrough))
11 #define FALLTHROUGH do {} while(0)
15 #define _WIN32_WINNT 0x501
16 #ifndef _CRT_SECURE_NO_WARNINGS
17 #define _CRT_SECURE_NO_WARNINGS
23 #define _POSIX_C_SOURCE 200809L
25 #define _DARWIN_UNLIMITED_SELECT
30 #include <sys/types.h>
31 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <arpa/inet.h>
47 #define DYAD_VERSION "0.2.1"
51 #define close(a) closesocket(a)
52 #define getsockopt(a,b,c,d,e) getsockopt((a),(b),(c),(char*)(d),(e))
53 #define setsockopt(a,b,c,d,e) setsockopt((a),(b),(c),(char*)(d),(e))
54 #define select(a,b,c,d,e) select((int)(a),(b),(c),(d),(e))
55 #define bind(a,b,c) bind((a),(b),(int)(c))
56 #define connect(a,b,c) connect((a),(b),(int)(c))
59 #define errno WSAGetLastError()
62 #define EWOULDBLOCK WSAEWOULDBLOCK
64 const char *inet_ntop(int af
, const void *src
, char *dst
, socklen_t size
) {
65 union { struct sockaddr sa
; struct sockaddr_in sai
;
66 struct sockaddr_in6 sai6
; } addr
;
68 memset(&addr
, 0, sizeof(addr
));
69 addr
.sa
.sa_family
= af
;
71 memcpy(&addr
.sai6
.sin6_addr
, src
, sizeof(addr
.sai6
.sin6_addr
));
73 memcpy(&addr
.sai
.sin_addr
, src
, sizeof(addr
.sai
.sin_addr
));
75 res
= WSAAddressToStringA(&addr
.sa
, sizeof(addr
), 0, dst
, (LPDWORD
) &size
);
76 if (res
!= 0) return NULL
;
81 #ifndef INVALID_SOCKET
82 #define INVALID_SOCKET -1
86 /*===========================================================================*/
88 /*===========================================================================*/
90 static void panic(const char *fmt
, ...);
92 static void *dyad_realloc(void *ptr
, int n
) {
93 ptr
= realloc(ptr
, n
);
95 panic("out of memory");
101 static void dyad_free(void *ptr
) {
106 /*===========================================================================*/
107 /* Vec (dynamic array) */
108 /*===========================================================================*/
110 static void vec_expand(char **data
, int *length
, int *capacity
, int memsz
) {
111 if (*length
+ 1 > *capacity
) {
112 if (*capacity
== 0) {
117 *data
= dyad_realloc(*data
, *capacity
* memsz
);
121 static void vec_splice(
122 char **data
, int *length
, int *capacity
, int memsz
, int start
, int count
125 memmove(*data
+ start
* memsz
,
126 *data
+ (start
+ count
) * memsz
,
127 (*length
- start
- count
) * memsz
);
132 struct { T *data; int length, capacity; }
135 #define vec_unpack(v)\
136 (char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)
140 memset((v), 0, sizeof(*(v)))
143 #define vec_deinit(v)\
147 #define vec_clear(v)\
151 #define vec_push(v, val)\
152 ( vec_expand(vec_unpack(v)),\
153 (v)->data[(v)->length++] = (val) )
156 #define vec_splice(v, start, count)\
157 ( vec_splice(vec_unpack(v), start, count),\
158 (v)->length -= (count) )
162 /*===========================================================================*/
164 /*===========================================================================*/
166 /* A wrapper around the three fd_sets used for select(). The fd_sets' allocated
167 * memory is automatically expanded to accommodate fds as they are added.
169 * On Windows fd_sets are implemented as arrays; the FD_xxx macros are not used
170 * by the wrapper and instead the fd_set struct is manipulated directly. The
171 * wrapper should perform better than the normal FD_xxx macros, given that we
172 * don't bother with the linear search which FD_SET would perform to check for
175 * On non-Windows platforms the sets are assumed to be bit arrays. The FD_xxx
176 * macros are not used in case their implementation attempts to do bounds
177 * checking; instead we manipulate the fd_sets' bits directly.
190 fd_set
*fds
[SELECT_MAX
];
193 #define DYAD_UNSIGNED_BIT (sizeof(unsigned) * CHAR_BIT)
196 static void select_deinit(SelectSet
*s
) {
198 for (i
= 0; i
< SELECT_MAX
; i
++) {
199 dyad_free(s
->fds
[i
]);
206 static void select_grow(SelectSet
*s
) {
208 int oldCapacity
= s
->capacity
;
209 s
->capacity
= s
->capacity
? s
->capacity
<< 1 : 1;
210 for (i
= 0; i
< SELECT_MAX
; i
++) {
211 s
->fds
[i
] = dyad_realloc(s
->fds
[i
], s
->capacity
* sizeof(fd_set
));
212 memset(s
->fds
[i
] + oldCapacity
, 0,
213 (s
->capacity
- oldCapacity
) * sizeof(fd_set
));
218 static void select_zero(SelectSet
*s
) {
220 if (s
->capacity
== 0) return;
222 for (i
= 0; i
< SELECT_MAX
; i
++) {
224 s
->fds
[i
]->fd_count
= 0;
226 memset(s
->fds
[i
], 0, s
->capacity
* sizeof(fd_set
));
232 static void select_add(SelectSet
*s
, int set
, dyad_Socket fd
) {
235 if (s
->capacity
== 0) select_grow(s
);
236 while ((unsigned) (s
->capacity
* FD_SETSIZE
) < s
->fds
[set
]->fd_count
+ 1) {
240 f
->fd_array
[f
->fd_count
++] = fd
;
243 while (s
->capacity
* FD_SETSIZE
< fd
) {
246 p
= (unsigned*) s
->fds
[set
];
247 p
[fd
/ DYAD_UNSIGNED_BIT
] |= 1 << (fd
% DYAD_UNSIGNED_BIT
);
248 if (fd
> s
->maxfd
) s
->maxfd
= fd
;
253 static int select_has(SelectSet
*s
, int set
, dyad_Socket fd
) {
257 if (s
->capacity
== 0) return 0;
259 for (i
= 0; i
< f
->fd_count
; i
++) {
260 if (f
->fd_array
[i
] == fd
) {
267 if (s
->maxfd
< fd
) return 0;
268 p
= (unsigned*) s
->fds
[set
];
269 return p
[fd
/ DYAD_UNSIGNED_BIT
] & (1 << (fd
% DYAD_UNSIGNED_BIT
));
274 /*===========================================================================*/
276 /*===========================================================================*/
280 dyad_Callback callback
;
290 int bytesSent
, bytesReceived
;
291 double lastActivity
, timeout
;
292 Vec(Listener
) listeners
;
293 Vec(char) lineBuffer
;
294 Vec(char) writeBuffer
;
298 #define DYAD_FLAG_READY (1 << 0)
299 #define DYAD_FLAG_WRITTEN (1 << 1)
302 static dyad_Stream
*dyad_streams
;
303 static int dyad_streamCount
;
304 static char dyad_panicMsgBuffer
[128];
305 static dyad_PanicCallback panicCallback
;
306 static SelectSet dyad_selectSet
;
307 static double dyad_updateTimeout
= 1;
308 static double dyad_tickInterval
= 1;
309 static double dyad_lastTick
= 0;
312 static void panic(const char *fmt
, ...) {
315 vsprintf(dyad_panicMsgBuffer
, fmt
, args
);
318 panicCallback(dyad_panicMsgBuffer
);
320 printf("dyad panic: %s\n", dyad_panicMsgBuffer
);
326 static dyad_Event
createEvent(int type
) {
328 memset(&e
, 0, sizeof(e
));
334 static void stream_destroy(dyad_Stream
*stream
);
336 static void destroyClosedStreams(void) {
337 dyad_Stream
*stream
= dyad_streams
;
339 if (stream
->state
== DYAD_STATE_CLOSED
) {
340 dyad_Stream
*next
= stream
->next
;
341 stream_destroy(stream
);
344 stream
= stream
->next
;
350 static void stream_emitEvent(dyad_Stream
*stream
, dyad_Event
*e
);
352 static void updateTickTimer(void) {
353 /* Update tick timer */
354 if (dyad_lastTick
== 0) {
355 dyad_lastTick
= dyad_getTime();
357 while (dyad_lastTick
< dyad_getTime()) {
358 /* Emit event on all streams */
360 dyad_Event e
= createEvent(DYAD_EVENT_TICK
);
361 e
.msg
= "a tick has occured";
362 stream
= dyad_streams
;
364 stream_emitEvent(stream
, &e
);
365 stream
= stream
->next
;
367 dyad_lastTick
+= dyad_tickInterval
;
372 static void updateStreamTimeouts(void) {
373 double currentTime
= dyad_getTime();
375 dyad_Event e
= createEvent(DYAD_EVENT_TIMEOUT
);
376 e
.msg
= "stream timed out";
377 stream
= dyad_streams
;
379 if (stream
->timeout
) {
380 if (currentTime
- stream
->lastActivity
> stream
->timeout
) {
381 stream_emitEvent(stream
, &e
);
385 stream
= stream
->next
;
391 /*===========================================================================*/
393 /*===========================================================================*/
395 static void stream_destroy(dyad_Stream
*stream
) {
399 if (stream
->sockfd
!= INVALID_SOCKET
) {
400 close(stream
->sockfd
);
402 /* Emit destroy event */
403 e
= createEvent(DYAD_EVENT_DESTROY
);
404 e
.msg
= "the stream has been destroyed";
405 stream_emitEvent(stream
, &e
);
406 /* Remove from list and decrement count */
407 next
= &dyad_streams
;
408 while (*next
!= stream
) {
409 next
= &(*next
)->next
;
411 *next
= stream
->next
;
413 /* Destroy and free */
414 vec_deinit(&stream
->listeners
);
415 vec_deinit(&stream
->lineBuffer
);
416 vec_deinit(&stream
->writeBuffer
);
417 dyad_free(stream
->address
);
422 static void stream_emitEvent(dyad_Stream
*stream
, dyad_Event
*e
) {
425 for (i
= 0; i
< stream
->listeners
.length
; i
++) {
426 Listener
*listener
= &stream
->listeners
.data
[i
];
427 if (listener
->event
== e
->type
) {
428 e
->udata
= listener
->udata
;
429 listener
->callback(e
);
431 /* Check to see if this listener was removed: If it was we decrement `i`
432 * since the next listener will now be in this ones place */
433 if (listener
!= &stream
->listeners
.data
[i
]) {
440 static void stream_error(dyad_Stream
*stream
, const char *msg
, int err
) {
442 dyad_Event e
= createEvent(DYAD_EVENT_ERROR
);
444 sprintf(buf
, "%.160s (%.80s)", msg
, strerror(err
));
449 stream_emitEvent(stream
, &e
);
454 static void stream_initAddress(dyad_Stream
*stream
) {
455 union { struct sockaddr sa
; struct sockaddr_storage sas
;
456 struct sockaddr_in sai
; struct sockaddr_in6 sai6
; } addr
;
458 memset(&addr
, 0, sizeof(addr
));
460 dyad_free(stream
->address
);
461 stream
->address
= NULL
;
462 if (getpeername(stream
->sockfd
, &addr
.sa
, &size
) == -1) {
463 if (getsockname(stream
->sockfd
, &addr
.sa
, &size
) == -1) {
467 if (addr
.sas
.ss_family
== AF_INET6
) {
468 stream
->address
= dyad_realloc(NULL
, INET6_ADDRSTRLEN
);
469 inet_ntop(AF_INET6
, &addr
.sai6
.sin6_addr
, stream
->address
,
471 stream
->port
= ntohs(addr
.sai6
.sin6_port
);
473 stream
->address
= dyad_realloc(NULL
, INET_ADDRSTRLEN
);
474 inet_ntop(AF_INET
, &addr
.sai
.sin_addr
, stream
->address
, INET_ADDRSTRLEN
);
475 stream
->port
= ntohs(addr
.sai
.sin_port
);
480 static void stream_setSocketNonBlocking(dyad_Stream
*stream
, int opt
) {
483 ioctlsocket(stream
->sockfd
, FIONBIO
, &mode
);
485 int flags
= fcntl(stream
->sockfd
, F_GETFL
);
486 fcntl(stream
->sockfd
, F_SETFL
,
487 opt
? (flags
| O_NONBLOCK
) : (flags
& ~O_NONBLOCK
));
492 static void stream_setSocket(dyad_Stream
*stream
, dyad_Socket sockfd
) {
493 stream
->sockfd
= sockfd
;
494 stream_setSocketNonBlocking(stream
, 1);
495 stream_initAddress(stream
);
499 static int stream_initSocket(
500 dyad_Stream
*stream
, int domain
, int type
, int protocol
502 stream
->sockfd
= socket(domain
, type
, protocol
);
503 if (stream
->sockfd
== INVALID_SOCKET
) {
504 stream_error(stream
, "could not create socket", errno
);
507 stream_setSocket(stream
, stream
->sockfd
);
512 static int stream_hasListenerForEvent(dyad_Stream
*stream
, int event
) {
514 for (i
= 0; i
< stream
->listeners
.length
; i
++) {
515 Listener
*listener
= &stream
->listeners
.data
[i
];
516 if (listener
->event
== event
) {
524 static void stream_handleReceivedData(dyad_Stream
*stream
) {
529 int size
= recv(stream
->sockfd
, data
, sizeof(data
) - 1, 0);
531 if (size
== 0 || errno
!= EWOULDBLOCK
) {
532 /* Handle disconnect */
542 stream
->bytesReceived
+= size
;
543 stream
->lastActivity
= dyad_getTime();
544 /* Emit data event */
545 e
= createEvent(DYAD_EVENT_DATA
);
546 e
.msg
= "received data";
549 stream_emitEvent(stream
, &e
);
550 /* Check stream state in case it was closed during one of the data event
552 if (stream
->state
!= DYAD_STATE_CONNECTED
) {
556 /* Handle line event */
557 if (stream_hasListenerForEvent(stream
, DYAD_EVENT_LINE
)) {
560 for (i
= 0; i
< size
; i
++) {
561 vec_push(&stream
->lineBuffer
, data
[i
]);
564 buf
= stream
->lineBuffer
.data
;
565 for (i
= 0; i
< stream
->lineBuffer
.length
; i
++) {
566 if (buf
[i
] == '\n') {
569 e
= createEvent(DYAD_EVENT_LINE
);
570 e
.msg
= "received line";
571 e
.data
= &buf
[start
];
573 /* Check and strip carriage return */
574 if (e
.size
> 0 && e
.data
[e
.size
- 1] == '\r') {
575 e
.data
[--e
.size
] = '\0';
577 stream_emitEvent(stream
, &e
);
579 /* Check stream state in case it was closed during one of the line
581 if (stream
->state
!= DYAD_STATE_CONNECTED
) {
586 if (start
== stream
->lineBuffer
.length
) {
587 vec_clear(&stream
->lineBuffer
);
589 vec_splice(&stream
->lineBuffer
, 0, start
);
596 static void stream_acceptPendingConnections(dyad_Stream
*stream
) {
601 dyad_Socket sockfd
= accept(stream
->sockfd
, NULL
, NULL
);
602 if (sockfd
== INVALID_SOCKET
) {
604 if (err
== EWOULDBLOCK
) {
605 /* No more waiting sockets */
609 /* Create client stream */
610 remote
= dyad_newStream();
611 remote
->state
= DYAD_STATE_CONNECTED
;
612 /* Set stream's socket */
613 stream_setSocket(remote
, sockfd
);
614 /* Emit accept event */
615 e
= createEvent(DYAD_EVENT_ACCEPT
);
616 e
.msg
= "accepted connection";
618 stream_emitEvent(stream
, &e
);
619 /* Handle invalid socket -- the stream is still made and the ACCEPT event
620 * is still emitted, but its shut immediately with an error */
621 if (remote
->sockfd
== INVALID_SOCKET
) {
622 stream_error(remote
, "failed to create socket on accept", err
);
629 static int stream_flushWriteBuffer(dyad_Stream
*stream
) {
630 stream
->flags
&= ~DYAD_FLAG_WRITTEN
;
631 if (stream
->writeBuffer
.length
> 0) {
633 int size
= send(stream
->sockfd
, stream
->writeBuffer
.data
,
634 stream
->writeBuffer
.length
, 0);
636 if (errno
== EWOULDBLOCK
) {
637 /* No more data can be written */
640 /* Handle disconnect */
645 if (size
== stream
->writeBuffer
.length
) {
646 vec_clear(&stream
->writeBuffer
);
648 vec_splice(&stream
->writeBuffer
, 0, size
);
651 stream
->bytesSent
+= size
;
652 stream
->lastActivity
= dyad_getTime();
655 if (stream
->writeBuffer
.length
== 0) {
657 /* If this is a 'closing' stream we can properly close it now */
658 if (stream
->state
== DYAD_STATE_CLOSING
) {
662 /* Set ready flag and emit 'ready for data' event */
663 stream
->flags
|= DYAD_FLAG_READY
;
664 e
= createEvent(DYAD_EVENT_READY
);
665 e
.msg
= "stream is ready for more data";
666 stream_emitEvent(stream
, &e
);
668 /* Return 1 to indicate that more data can immediately be written to the
675 /*===========================================================================*/
677 /*===========================================================================*/
679 /*---------------------------------------------------------------------------*/
681 /*---------------------------------------------------------------------------*/
683 void dyad_update(void) {
687 destroyClosedStreams();
689 updateStreamTimeouts();
691 /* Create fd sets for select() */
692 select_zero(&dyad_selectSet
);
694 stream
= dyad_streams
;
696 switch (stream
->state
) {
697 case DYAD_STATE_CONNECTED
:
698 select_add(&dyad_selectSet
, SELECT_READ
, stream
->sockfd
);
699 if (!(stream
->flags
& DYAD_FLAG_READY
) ||
700 stream
->writeBuffer
.length
!= 0
702 select_add(&dyad_selectSet
, SELECT_WRITE
, stream
->sockfd
);
705 case DYAD_STATE_CLOSING
:
706 select_add(&dyad_selectSet
, SELECT_WRITE
, stream
->sockfd
);
708 case DYAD_STATE_CONNECTING
:
709 select_add(&dyad_selectSet
, SELECT_WRITE
, stream
->sockfd
);
710 select_add(&dyad_selectSet
, SELECT_EXCEPT
, stream
->sockfd
);
712 case DYAD_STATE_LISTENING
:
713 select_add(&dyad_selectSet
, SELECT_READ
, stream
->sockfd
);
716 stream
= stream
->next
;
719 /* Init timeout value and do select */
721 #pragma warning(push)
722 /* Disable double to long implicit conversion warning,
723 * because the type of timeval's fields don't agree across platforms */
724 #pragma warning(disable: 4244)
726 tv
.tv_sec
= dyad_updateTimeout
;
727 tv
.tv_usec
= (dyad_updateTimeout
- tv
.tv_sec
) * 1e6
;
732 select(dyad_selectSet
.maxfd
+ 1,
733 dyad_selectSet
.fds
[SELECT_READ
],
734 dyad_selectSet
.fds
[SELECT_WRITE
],
735 dyad_selectSet
.fds
[SELECT_EXCEPT
],
739 stream
= dyad_streams
;
741 switch (stream
->state
) {
743 case DYAD_STATE_CONNECTED
:
744 if (select_has(&dyad_selectSet
, SELECT_READ
, stream
->sockfd
)) {
745 stream_handleReceivedData(stream
);
746 if (stream
->state
== DYAD_STATE_CLOSED
) {
752 case DYAD_STATE_CLOSING
:
753 if (select_has(&dyad_selectSet
, SELECT_WRITE
, stream
->sockfd
)) {
754 stream_flushWriteBuffer(stream
);
758 case DYAD_STATE_CONNECTING
:
759 if (select_has(&dyad_selectSet
, SELECT_WRITE
, stream
->sockfd
)) {
760 /* Check socket for error */
762 socklen_t optlen
= sizeof(optval
);
764 getsockopt(stream
->sockfd
, SOL_SOCKET
, SO_ERROR
, &optval
, &optlen
);
765 if (optval
!= 0) goto connectFailed
;
766 /* Handle succeselful connection */
767 stream
->state
= DYAD_STATE_CONNECTED
;
768 stream
->lastActivity
= dyad_getTime();
769 stream_initAddress(stream
);
770 /* Emit connect event */
771 e
= createEvent(DYAD_EVENT_CONNECT
);
772 e
.msg
= "connected to server";
773 stream_emitEvent(stream
, &e
);
775 select_has(&dyad_selectSet
, SELECT_EXCEPT
, stream
->sockfd
)
777 /* Handle failed connection */
779 stream_error(stream
, "could not connect to server", 0);
783 case DYAD_STATE_LISTENING
:
784 if (select_has(&dyad_selectSet
, SELECT_READ
, stream
->sockfd
)) {
785 stream_acceptPendingConnections(stream
);
790 /* If data was just now written to the stream we should immediately try to
793 stream
->flags
& DYAD_FLAG_WRITTEN
&&
794 stream
->state
!= DYAD_STATE_CLOSED
796 stream_flushWriteBuffer(stream
);
799 stream
= stream
->next
;
804 void dyad_init(void) {
807 int err
= WSAStartup(MAKEWORD(2, 2), &dat
);
809 panic("WSAStartup failed (%d)", err
);
812 /* Stops the SIGPIPE signal being raised when writing to a closed socket */
813 signal(SIGPIPE
, SIG_IGN
);
818 void dyad_shutdown(void) {
819 /* Close and destroy all the streams */
820 while (dyad_streams
) {
821 dyad_close(dyad_streams
);
822 stream_destroy(dyad_streams
);
824 /* Clear up everything */
825 select_deinit(&dyad_selectSet
);
832 const char *dyad_getVersion(void) {
837 double dyad_getTime(void) {
840 GetSystemTimeAsFileTime(&ft
);
841 return (ft
.dwHighDateTime
* 4294967296.0 / 1e7
) + ft
.dwLowDateTime
/ 1e7
;
844 gettimeofday(&tv
, NULL
);
845 return tv
.tv_sec
+ tv
.tv_usec
/ 1e6
;
850 int dyad_getStreamCount(void) {
851 return dyad_streamCount
;
855 void dyad_setTickInterval(double seconds
) {
856 dyad_tickInterval
= seconds
;
860 void dyad_setUpdateTimeout(double seconds
) {
861 dyad_updateTimeout
= seconds
;
865 dyad_PanicCallback
dyad_atPanic(dyad_PanicCallback func
) {
866 dyad_PanicCallback old
= panicCallback
;
867 panicCallback
= func
;
872 /*---------------------------------------------------------------------------*/
874 /*---------------------------------------------------------------------------*/
876 dyad_Stream
*dyad_newStream(void) {
877 dyad_Stream
*stream
= dyad_realloc(NULL
, sizeof(*stream
));
878 memset(stream
, 0, sizeof(*stream
));
879 stream
->state
= DYAD_STATE_CLOSED
;
880 stream
->sockfd
= INVALID_SOCKET
;
881 stream
->lastActivity
= dyad_getTime();
882 /* Add to list and increment count */
883 stream
->next
= dyad_streams
;
884 dyad_streams
= stream
;
890 void dyad_addListener(
891 dyad_Stream
*stream
, int event
, dyad_Callback callback
, void *udata
894 listener
.event
= event
;
895 listener
.callback
= callback
;
896 listener
.udata
= udata
;
897 vec_push(&stream
->listeners
, listener
);
901 void dyad_removeListener(
902 dyad_Stream
*stream
, int event
, dyad_Callback callback
, void *udata
904 int i
= stream
->listeners
.length
;
906 Listener
*x
= &stream
->listeners
.data
[i
];
907 if (x
->event
== event
&& x
->callback
== callback
&& x
->udata
== udata
) {
908 vec_splice(&stream
->listeners
, i
, 1);
914 void dyad_removeAllListeners(dyad_Stream
*stream
, int event
) {
915 if (event
== DYAD_EVENT_NULL
) {
916 vec_clear(&stream
->listeners
);
918 int i
= stream
->listeners
.length
;
920 if (stream
->listeners
.data
[i
].event
== event
) {
921 vec_splice(&stream
->listeners
, i
, 1);
928 void dyad_close(dyad_Stream
*stream
) {
930 if (stream
->state
== DYAD_STATE_CLOSED
) return;
931 stream
->state
= DYAD_STATE_CLOSED
;
933 if (stream
->sockfd
!= INVALID_SOCKET
) {
934 close(stream
->sockfd
);
935 stream
->sockfd
= INVALID_SOCKET
;
938 e
= createEvent(DYAD_EVENT_CLOSE
);
939 e
.msg
= "stream closed";
940 stream_emitEvent(stream
, &e
);
942 vec_clear(&stream
->lineBuffer
);
943 vec_clear(&stream
->writeBuffer
);
947 void dyad_end(dyad_Stream
*stream
) {
948 if (stream
->state
== DYAD_STATE_CLOSED
) return;
949 if (stream
->writeBuffer
.length
> 0) {
950 stream
->state
= DYAD_STATE_CLOSING
;
958 dyad_Stream
*stream
, const char *host
, int port
, int backlog
960 struct addrinfo hints
, *ai
= NULL
;
966 memset(&hints
, 0, sizeof(hints
));
967 hints
.ai_family
= AF_UNSPEC
;
968 hints
.ai_socktype
= SOCK_STREAM
;
969 hints
.ai_flags
= AI_PASSIVE
;
970 sprintf(buf
, "%d", port
);
971 err
= getaddrinfo(host
, buf
, &hints
, &ai
);
973 stream_error(stream
, "could not get addrinfo", errno
);
977 err
= stream_initSocket(stream
, ai
->ai_family
, ai
->ai_socktype
,
980 /* Set SO_REUSEADDR so that the socket can be immediately bound without
981 * having to wait for any closed socket on the same port to timeout */
983 setsockopt(stream
->sockfd
, SOL_SOCKET
, SO_REUSEADDR
,
984 &optval
, sizeof(optval
));
985 /* Bind and listen */
986 err
= bind(stream
->sockfd
, ai
->ai_addr
, ai
->ai_addrlen
);
988 stream_error(stream
, "could not bind socket", errno
);
991 err
= listen(stream
->sockfd
, backlog
);
993 stream_error(stream
, "socket failed on listen", errno
);
996 stream
->state
= DYAD_STATE_LISTENING
;
998 stream_initAddress(stream
);
999 /* Emit listening event */
1000 e
= createEvent(DYAD_EVENT_LISTEN
);
1001 e
.msg
= "socket is listening";
1002 stream_emitEvent(stream
, &e
);
1006 if (ai
) freeaddrinfo(ai
);
1011 int dyad_listen(dyad_Stream
*stream
, int port
) {
1012 return dyad_listenEx(stream
, NULL
, port
, 511);
1016 int dyad_connect(dyad_Stream
*stream
, const char *host
, int port
) {
1017 struct addrinfo hints
, *ai
= NULL
;
1022 memset(&hints
, 0, sizeof(hints
));
1023 hints
.ai_family
= AF_UNSPEC
;
1024 hints
.ai_socktype
= SOCK_STREAM
;
1025 sprintf(buf
, "%d", port
);
1026 err
= getaddrinfo(host
, buf
, &hints
, &ai
);
1028 stream_error(stream
, "could not resolve host", 0);
1031 /* Start connecting */
1032 err
= stream_initSocket(stream
, ai
->ai_family
, ai
->ai_socktype
,
1035 connect(stream
->sockfd
, ai
->ai_addr
, ai
->ai_addrlen
);
1036 stream
->state
= DYAD_STATE_CONNECTING
;
1040 if (ai
) freeaddrinfo(ai
);
1045 void dyad_write(dyad_Stream
*stream
, const void *data
, int size
) {
1046 const char *p
= data
;
1048 vec_push(&stream
->writeBuffer
, *p
++);
1050 stream
->flags
|= DYAD_FLAG_WRITTEN
;
1054 void dyad_vwritef(dyad_Stream
*stream
, const char *fmt
, va_list args
) {
1065 fp
= va_arg(args
, FILE*);
1070 while ((c
= fgetc(fp
)) != EOF
) {
1071 vec_push(&stream
->writeBuffer
, c
);
1075 vec_push(&stream
->writeBuffer
, va_arg(args
, int));
1078 str
= va_arg(args
, char*);
1079 if (str
== NULL
) str
= "(null)";
1082 vec_push(&stream
->writeBuffer
, *str
++);
1086 str
= va_arg(args
, char*);
1087 c
= va_arg(args
, int);
1089 vec_push(&stream
->writeBuffer
, *str
++);
1096 case 'g': sprintf(buf
, f
, va_arg(args
, double)); break;
1098 case 'i': sprintf(buf
, f
, va_arg(args
, int)); break;
1100 case 'X': sprintf(buf
, f
, va_arg(args
, unsigned)); break;
1101 case 'p': sprintf(buf
, f
, va_arg(args
, void*)); break;
1102 default : buf
[0] = *fmt
; buf
[1] = '\0';
1108 vec_push(&stream
->writeBuffer
, *fmt
);
1112 stream
->flags
|= DYAD_FLAG_WRITTEN
;
1116 void dyad_writef(dyad_Stream
*stream
, const char *fmt
, ...) {
1118 va_start(args
, fmt
);
1119 dyad_vwritef(stream
, fmt
, args
);
1124 void dyad_setTimeout(dyad_Stream
*stream
, double seconds
) {
1125 stream
->timeout
= seconds
;
1129 void dyad_setNoDelay(dyad_Stream
*stream
, int opt
) {
1131 setsockopt(stream
->sockfd
, IPPROTO_TCP
, TCP_NODELAY
, &opt
, sizeof(opt
));
1135 int dyad_getState(dyad_Stream
*stream
) {
1136 return stream
->state
;
1140 const char *dyad_getAddress(dyad_Stream
*stream
) {
1141 return stream
->address
? stream
->address
: "";
1145 int dyad_getPort(dyad_Stream
*stream
) {
1146 return stream
->port
;
1150 int dyad_getBytesSent(dyad_Stream
*stream
) {
1151 return stream
->bytesSent
;
1155 int dyad_getBytesReceived(dyad_Stream
*stream
) {
1156 return stream
->bytesReceived
;
1160 dyad_Socket
dyad_getSocket(dyad_Stream
*stream
) {
1161 return stream
->sockfd
;