/* Shoggoth - Distributed File System
 *
 * Copyright (C) 2012 Pawel Dziepak
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
16 #include <arpa/inet.h>
18 #include <netinet/in.h>
22 #include <sys/socket.h>
/* Idle time, in seconds, after which the elders collector may close a
 * connection (compared against time(NULL) - last_used). */
#define NET_IS_ELDER 5
/* The elders collector backs off while conn_count is below this value. */
#define NET_MAX_ELDERS 0

/* Seconds the guardian thread sleeps between iterations. */
#define NET_GUARD_RES 5

/* Requested socket buffer size and per-call send()/recv() chunk limit, in
 * bytes. Parenthesised so the expansion stays a single operand when the
 * macro is used inside a larger expression (e.g. x % NET_MAX_DATA_SIZE). */
#define NET_MAX_DATA_SIZE (1024 * 64)
/* Largest datagram, header included, exchanged over the UDP socket. */
#define NET_MAX_UDP_SIZE 1440

/* Idle-thread threshold checked by net_client_listening(). */
#define NET_TCP_THREAD_POOL 8
/* Number of net_udp_serve_client() threads created by the start-up loop. */
#define NET_UDP_THREAD_POOL 4

/* Seconds to wait for a reply before giving up. */
#define NET_REPLY_TIMEOUT 2
/* Members of a structure whose opening line is not visible in this part of
 * the file; presumably struct node_address, since addr->sock_lock and
 * addr->elder_prev/elder_next are used on that type below (TODO confirm). */
/* Locked by net_acquire_connection(), unlocked by net_release_connection(). */
48 pthread_mutex_t sock_lock
;
/* Neighbours of this node in the elder list (elder_first .. elder_last). */
51 struct node_address
*elder_prev
, *elder_next
;
/* Describes where a reply has to be sent. Besides addr, this file also
 * accesses sock, is_udp and tag through pointers to this structure. */
54 struct response_address
{
/* Peer address; used as the sendto() destination in net_udp_send_reply(). */
56 struct sockaddr_in addr
;
/* Header sent ahead of the payload on stream sockets. Its size member is
 * converted with to_be32()/from_be32() by the code in this file. Declared
 * packed so the compiler inserts no padding. */
62 struct packet_header
{
64 } __attribute__((packed
));
/* Header placed at the start of every datagram. This file accesses its size
 * (through to_be16()/from_be16()), tag and flag members. Declared packed so
 * the compiler inserts no padding. */
71 struct udp_packet_header
{
75 } __attribute__((packed
));
/* One outstanding UDP request waiting for its reply; entries are linked into
 * the udp_rlist list. Code in this file also uses the tag, event, buffer and
 * bsize members of this structure. */
77 struct udp_request_entry
{
/* Address the request was sent to; compared with the sender of a reply. */
78 struct sockaddr_in
*addr
;
/* Mutex passed to pthread_cond_timedwait() together with event. */
83 pthread_mutex_t event_lock
;
/* Doubly linked list pointers. */
88 struct udp_request_entry
*next
, *prev
;
/* Head of the list of pending UDP requests and the mutex locked around the
 * accesses to it in this file. */
91 static struct udp_request_entry
*udp_rlist
= NULL
;
92 static pthread_mutex_t udp_rlist_lock
= PTHREAD_MUTEX_INITIALIZER
;
/* First and last node of the elder list, and the mutex held around calls to
 * net_elder_list_add()/net_elder_list_rem(). */
94 static struct node_address
*elder_first
= NULL
, *elder_last
= NULL
;
95 static pthread_mutex_t elders_lock
= PTHREAD_MUTEX_INITIALIZER
;
/* Connection counter read by net_elders_collector(). conn_lock is taken in
 * net_connect() and net_disconnect(), presumably to guard conn_count
 * (TODO confirm). */
97 static int conn_count
= 0;
98 static pthread_mutex_t conn_lock
= PTHREAD_MUTEX_INITIALIZER
;
/* Datagram socket created in net_udp_start_listening(); also used by the
 * sendto() calls in this file. */
100 static int udp_socket
;
/* Thread created by net_set_guard() to run net_guardian(). */
102 static pthread_t guardian
;
/* Node compared with elder_first by the collector and re-added to the elder
 * list by net_set_guard(); guard_lock is taken by net_guardian() and
 * net_set_guard(). */
103 static struct node_address
*guarded
= NULL
;
104 static pthread_mutex_t guard_lock
= PTHREAD_MUTEX_INITIALIZER
;
/* Request handler installed by net_start_listening(). */
106 static net_process_request process_request
;
/* Queue element handing an accepted connection (addr) to a pool thread;
 * elements are chained through next starting at thpool_rlist. */
108 struct thread_pool_request
{
109 struct response_address
*addr
;
111 struct thread_pool_request
*next
;
/* Counters of the stream-connection thread pool; their difference is used as
 * the number of idle threads by net_got_connection() and
 * net_client_listening(). */
113 static int thpool_count
= 0, thpool_busy
= 0;
/* Pending connections; net_got_connection() pushes at the head and
 * net_client_listening() pops from the head. */
114 static struct thread_pool_request
*thpool_rlist
= NULL
;
/* Mutex taken around the pool state above, and the condition the pool
 * threads wait on while thpool_rlist is empty. */
115 static pthread_mutex_t thpool_lock
= PTHREAD_MUTEX_INITIALIZER
;
116 static pthread_cond_t thpool_event
= PTHREAD_COND_INITIALIZER
;
/* Queue element describing one received UDP request. Code in this file also
 * uses a buffer member pointing at the received datagram. */
118 struct udp_thread_pool_request
{
/* Where the reply has to be sent. */
119 struct response_address
*addr
;
122 struct udp_thread_pool_request
*next
;
/* Head and tail of the queue of received UDP requests:
 * net_udp_request_received() appends at the tail and
 * net_udp_serve_client() removes from the head. */
124 static struct udp_thread_pool_request
*udp_thpool_rlist
= NULL
;
125 static struct udp_thread_pool_request
*udp_thpool_rlist_end
= NULL
;
/* Mutex taken around the queue above, and the condition the UDP worker
 * threads wait on while the queue is empty. */
126 static pthread_mutex_t udp_thpool_lock
= PTHREAD_MUTEX_INITIALIZER
;
127 static pthread_cond_t udp_thpool_event
= PTHREAD_COND_INITIALIZER
;
/* Unlinks addr from the doubly linked elder list by rewiring its neighbours
 * and the elder_first/elder_last ends. The callers in this file invoke it
 * with elders_lock held. addr's own elder_prev/elder_next are not reset by
 * the statements visible here. */
129 void net_elder_list_rem(struct node_address
*addr
) {
/* Predecessor side: bypass addr, or move the list head. */
130 if (addr
->elder_prev
)
131 addr
->elder_prev
->elder_next
= addr
->elder_next
;
133 elder_first
= addr
->elder_next
;
/* Successor side: bypass addr, or move the list tail. */
135 if (addr
->elder_next
)
136 addr
->elder_next
->elder_prev
= addr
->elder_prev
;
138 elder_last
= addr
->elder_prev
;
/* Links addr into the elder list behind the current elder_last node. The
 * callers in this file invoke it with elders_lock held. */
141 void net_elder_list_add(struct node_address
*addr
) {
142 addr
->elder_next
= NULL
;
144 elder_last
->elder_next
= addr
;
147 addr
->elder_prev
= elder_last
;
/* Requests NET_MAX_DATA_SIZE as the send and the receive buffer size of
 * sock. After each setsockopt() the value is read back with getsockopt()
 * and compared with NET_MAX_DATA_SIZE.
 *
 * NOTE(review): POSIX specifies an int option value for SO_SNDBUF and
 * SO_RCVBUF, while buffsize is a size_t and sizeof(buffsize) is passed as
 * the option length; confirm this is intended. */
151 int net_set_buffer_size(int sock
) {
152 size_t buffsize
= NET_MAX_DATA_SIZE
;
153 socklen_t len
= sizeof(buffsize
);
/* Send buffer. */
155 if (setsockopt(sock
, SOL_SOCKET
, SO_SNDBUF
, &buffsize
,
156 sizeof(buffsize
)) < 0)
159 if (getsockopt(sock
, SOL_SOCKET
, SO_SNDBUF
, &buffsize
, &len
) < 0)
162 if (buffsize
< NET_MAX_DATA_SIZE
)
/* Receive buffer; buffsize still holds the value read back above. */
165 if (setsockopt(sock
, SOL_SOCKET
, SO_RCVBUF
, &buffsize
,
166 sizeof(buffsize
)) < 0)
169 if (getsockopt(sock
, SOL_SOCKET
, SO_RCVBUF
, &buffsize
, &len
) < 0)
172 if (buffsize
< NET_MAX_DATA_SIZE
)
/* Returns non-zero when addr currently has a socket, i.e. addr->sock is
 * not -1. The field is read without taking addr->sock_lock. */
178 int net_connection_exist(struct node_address
*addr
) {
179 return addr
->sock
!= -1;
/* Opens a stream socket, applies net_set_buffer_size() to it and connects
 * it to the address stored in addr. conn_lock is taken afterwards. */
182 int net_connect(struct node_address
*addr
) {
183 struct sockaddr_in serv_addr
;
186 sock
= socket(AF_INET
, SOCK_STREAM
, 0);
190 err
= net_set_buffer_size(sock
);
/* ipv4 and port are copied unchanged; net_create_node_address() stores the
 * port through htons() and the address through inet_pton(). */
194 serv_addr
.sin_family
= AF_INET
;
195 serv_addr
.sin_addr
.s_addr
= addr
->ipv4
;
196 serv_addr
.sin_port
= addr
->port
;
197 if (connect(sock
, (struct sockaddr
*)&serv_addr
, sizeof(serv_addr
)) < 0)
202 pthread_mutex_lock(&conn_lock
);
204 pthread_mutex_unlock(&conn_lock
);
/* Takes and releases conn_lock, then closes addr->sock and checks the
 * result of close(). */
209 int net_disconnect(struct node_address
*addr
) {
210 pthread_mutex_lock(&conn_lock
);
212 pthread_mutex_unlock(&conn_lock
);
214 if (close(addr
->sock
) < 0)
/* Locks addr->sock_lock for exclusive use of the connection. When addr has
 * no socket yet the result of net_connect() is returned directly;
 * otherwise addr is taken off the elder list under elders_lock.
 * net_release_connection() is the function that unlocks sock_lock. */
222 int net_acquire_connection(struct node_address
*addr
) {
225 err
= pthread_mutex_lock(&addr
->sock_lock
);
229 if (addr
->sock
== -1)
230 return net_connect(addr
);
232 pthread_mutex_lock(&elders_lock
);
234 net_elder_list_rem(addr
);
235 pthread_mutex_unlock(&elders_lock
);
/* Counterpart of net_acquire_connection(): records the current time in
 * addr->last_used, puts addr on the elder list under elders_lock and
 * unlocks addr->sock_lock. */
240 void net_release_connection(struct node_address
*addr
) {
241 addr
->last_used
= time(NULL
);
243 pthread_mutex_lock(&elders_lock
);
245 net_elder_list_add(addr
);
246 pthread_mutex_unlock(&elders_lock
);
248 pthread_mutex_unlock(&addr
->sock_lock
);
/* Thread routine that disconnects idle nodes taken from the head of the
 * elder list. wait_time starts at NET_IS_ELDER, is doubled when there is
 * nothing to collect and halved (kept between 1 and NET_IS_ELDER) after a
 * node has been disconnected.
 *
 * NOTE(review): conn_count, elder_first and guarded are read below before
 * elders_lock is taken; confirm this is acceptable. */
251 void *net_elders_collector(void *args
) {
252 struct node_address
*addr
;
253 int wait_time
= NET_IS_ELDER
;
259 if (conn_count
< NET_MAX_ELDERS
|| !elder_first
260 || guarded
== elder_first
) {
261 wait_time
= min(max(wait_time
* 2, NET_IS_ELDER
),
266 pthread_mutex_lock(&elders_lock
);
/* Do not block on a connection that is currently in use. */
267 if (pthread_mutex_trylock(&elder_first
->sock_lock
) == EBUSY
) {
268 pthread_mutex_unlock(&elders_lock
);
/* Head of the list was used less than NET_IS_ELDER seconds ago. */
272 if (elder_first
->last_used
> time(NULL
) - NET_IS_ELDER
) {
/* NOTE(review): elder_first is dereferenced after elders_lock has been
 * released; verify it cannot change in between. */
273 pthread_mutex_unlock(&elders_lock
);
274 pthread_mutex_unlock(&elder_first
->sock_lock
);
/* Detach the head node from the list before closing its connection. */
279 elder_first
= addr
->elder_next
;
280 if (addr
->elder_next
)
281 addr
->elder_next
->elder_prev
= NULL
;
282 if (elder_last
== addr
)
284 pthread_mutex_unlock(&elders_lock
);
286 net_disconnect(addr
);
287 pthread_mutex_unlock(&addr
->sock_lock
);
289 wait_time
= max(min(wait_time
/ 2, NET_IS_ELDER
), 1);
/* Allocates a node_address in *addr, fills ipv4 from the textual address
 * with inet_pton(), stores the port through htons(), clears the elder list
 * links and initialises sock_lock.
 *
 * NOTE(review): in the order visible here inet_pton() writes through *addr
 * before the malloc() below assigns *addr; verify against the complete
 * function. */
293 int net_create_node_address(struct node_address
**addr
, const char *ipv4
,
296 err
= inet_pton(AF_INET
, ipv4
, &(*addr
)->ipv4
);
302 *addr
= (struct node_address
*)malloc(sizeof(struct node_address
));
306 (*addr
)->port
= htons(port
);
308 (*addr
)->elder_next
= NULL
;
309 (*addr
)->elder_prev
= NULL
;
310 pthread_mutex_init(&(*addr
)->sock_lock
, NULL
);
/* Presumably the counterpart of net_create_node_address() that releases
 * addr; only the signature is visible here (TODO confirm). */
315 int net_free_node_address(struct node_address
*addr
) {
/* Handles a received UDP reply: looks in udp_rlist, under udp_rlist_lock,
 * for the pending request matching the reply's tag and sender, unlinks it,
 * copies the payload following the header into the request's buffer
 * (at most bsize bytes) and broadcasts the request's event condition. */
320 void net_udp_reply_received(struct udp_packet_header
*uph
,
321 struct sockaddr_in
*addr
) {
322 struct udp_request_entry
*current
, *previous
= NULL
, *found
= NULL
;
324 pthread_mutex_lock(&udp_rlist_lock
);
327 if (current
->tag
== uph
->tag
&&
328 addr
->sin_port
== current
->addr
->sin_port
&&
/* NOTE(review): this compares addr->sin_addr.s_addr with itself, so it is
 * always true; current->addr->sin_addr.s_addr was presumably intended on
 * the right-hand side. */
329 addr
->sin_addr
.s_addr
== addr
->sin_addr
.s_addr
) {
/* Unlink the matching entry. */
334 previous
->next
= current
->next
;
336 udp_rlist
= current
->next
;
338 current
->next
->prev
= previous
;
344 current
= current
->next
;
346 pthread_mutex_unlock(&udp_rlist_lock
);
/* uph + 1 is the first byte after the header. */
351 memcpy(found
->buffer
, uph
+ 1, min(from_be16(uph
->size
), found
->bsize
));
352 pthread_cond_broadcast(&found
->event
);
/* Queues a received UDP request for the worker threads. A
 * udp_thread_pool_request and a response_address describing the sender
 * (sock = -1, is_udp = 1, tag taken from the header) are allocated, the
 * request is appended to the tail of the queue under udp_thpool_lock and
 * udp_thpool_event is signalled.
 *
 * NOTE(review): the request stores the uph pointer itself rather than a
 * copy of the datagram; verify that the caller does not reuse that buffer
 * before the request has been processed. */
355 void net_udp_request_received(struct udp_packet_header
*uph
,
356 struct sockaddr_in
*addr
) {
357 struct udp_thread_pool_request
*uthp_req
;
360 uthp_req
= malloc(sizeof(*uthp_req
));
364 uthp_req
->buffer
= (void*)uph
;
365 uthp_req
->addr
= malloc(sizeof(struct response_address
));
366 if (!uthp_req
->addr
) {
371 uthp_req
->addr
->sock
= -1;
372 memcpy(&uthp_req
->addr
->addr
, addr
, sizeof(*addr
));
373 uthp_req
->addr
->is_udp
= 1;
374 uthp_req
->addr
->tag
= uph
->tag
;
375 uthp_req
->next
= NULL
;
377 pthread_mutex_lock(&udp_thpool_lock
);
/* Non-empty queue: append behind the current tail. */
378 if (udp_thpool_rlist_end
) {
379 udp_thpool_rlist_end
->next
= uthp_req
;
380 udp_thpool_rlist_end
= uthp_req
;
/* Empty queue: the request becomes both head and tail. */
382 udp_thpool_rlist
= uthp_req
;
383 udp_thpool_rlist_end
= uthp_req
;
384 pthread_cond_signal(&udp_thpool_event
);
386 pthread_mutex_unlock(&udp_thpool_lock
);
/* Thread routine of a UDP worker: waits on udp_thpool_event until the
 * request queue is non-empty, removes the head request and passes its
 * payload (the bytes following the udp_packet_header, length taken from
 * the header's size field) to process_request(). */
389 void *net_udp_serve_client(void *args
) {
390 struct udp_thread_pool_request
*uthp_req
;
391 struct udp_packet_header
*uph
;
395 pthread_mutex_lock(&udp_thpool_lock
);
396 while (!udp_thpool_rlist
)
397 pthread_cond_wait(&udp_thpool_event
, &udp_thpool_lock
);
/* Pop the head; reset the tail pointer when the queue becomes empty. */
399 uthp_req
= udp_thpool_rlist
;
400 udp_thpool_rlist
= uthp_req
->next
;
401 if (!udp_thpool_rlist
)
402 udp_thpool_rlist_end
= NULL
;
403 pthread_mutex_unlock(&udp_thpool_lock
);
405 uph
= (struct udp_packet_header
*)uthp_req
->buffer
;
406 process_request(uthp_req
->addr
,
407 (char*)uthp_req
->buffer
+ sizeof(*uph
),
408 from_be16(uph
->size
));
410 free(uthp_req
->addr
);
/* Thread routine of the UDP receiver: creates the datagram socket, binds
 * it to INADDR_ANY starting at NET_PORT_NO, then receives datagrams of up
 * to NET_MAX_UDP_SIZE bytes and hands them to net_udp_request_received()
 * or net_udp_reply_received(). */
415 void *net_udp_start_listening(void *args
) {
416 struct sockaddr_in serv_addr
, cli_addr
;
417 socklen_t clilen
= sizeof(cli_addr
);
418 void *buffer
= malloc(NET_MAX_UDP_SIZE
);
419 struct udp_packet_header
*uph
= (struct udp_packet_header
*)buffer
;
425 udp_socket
= socket(AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
);
429 serv_addr
.sin_family
= AF_INET
;
430 serv_addr
.sin_addr
.s_addr
= INADDR_ANY
;
431 serv_addr
.sin_port
= htons(NET_PORT_NO
);
/* NOTE(review): sin_port holds the htons() result, so ++ increments the
 * network-byte-order value; on little-endian hosts this does not select
 * the next port number. Confirm the intent. */
433 while (bind(udp_socket
, (struct sockaddr
*)&serv_addr
,
434 sizeof(serv_addr
)) < 0)
435 serv_addr
.sin_port
++;
438 recvfrom(udp_socket
, buffer
, NET_MAX_UDP_SIZE
, 0,
439 (struct sockaddr
*)&cli_addr
, &clilen
);
443 net_udp_request_received(uph
, &cli_addr
);
446 net_udp_reply_received(uph
, &cli_addr
);
/* Serves one request on a stream connection: receives the packet_header,
 * allocates a buffer of the announced size, receives the payload in chunks
 * of at most NET_MAX_DATA_SIZE bytes and passes it to process_request().
 *
 * NOTE(review): size is taken from the peer's header and handed to
 * malloc(); no upper bound is visible here, verify one exists. */
454 void net_serve_client(struct response_address
*addr
) {
455 struct packet_header ph
;
462 err
= recv(addr
->sock
, &ph
, sizeof(ph
), 0);
466 size
= from_be32(ph
.size
);
467 buffer
= malloc(size
);
472 err
= recv(addr
->sock
, (char*)buffer
+ ptr
,
473 min(size
- ptr
, NET_MAX_DATA_SIZE
),
476 } while (err
> 0 && ptr
< size
);
482 process_request(addr
, buffer
, size
);
/* Thread routine of a stream-connection worker. It first serves the
 * connection passed in pargs, then takes further connections from
 * thpool_rlist, waiting on thpool_event while the list is empty. The
 * number of idle threads (thpool_count - thpool_busy) is compared with
 * NET_TCP_THREAD_POOL under thpool_lock. */
490 void *net_client_listening(void *pargs
) {
491 struct response_address
*addr
= (struct response_address
*)pargs
;
492 struct thread_pool_request
*thp_req
;
494 net_serve_client(addr
);
499 pthread_mutex_lock(&thpool_lock
);
501 if (thpool_count
- thpool_busy
> NET_TCP_THREAD_POOL
) {
503 pthread_mutex_unlock(&thpool_lock
);
507 while (!thpool_rlist
)
508 pthread_cond_wait(&thpool_event
, &thpool_lock
);
/* Pop the head of the pending list. */
510 thp_req
= thpool_rlist
;
511 addr
= thp_req
->addr
;
512 thpool_rlist
= thp_req
->next
;
513 pthread_mutex_unlock(&thpool_lock
);
515 net_serve_client(addr
);
/* Dispatches a freshly accepted connection. *addr is copied to the heap
 * because the caller passes a pointer to its own variable. When no idle
 * pool thread exists and a new net_client_listening() thread can be
 * created, that thread receives the copy; otherwise the copy is pushed at
 * the head of thpool_rlist and thpool_event is signalled. */
523 void net_got_connection(struct response_address
*addr
) {
524 struct thread_pool_request
*thp_req
;
525 struct response_address
*raddr
= malloc(sizeof(*addr
));
531 memcpy(raddr
, addr
, sizeof(*addr
));
532 pthread_mutex_lock(&thpool_lock
);
/* pthread_create() returns 0 on success, hence the negation. */
533 if (thpool_count
- thpool_busy
< 1 &&
534 !pthread_create(&thread
, NULL
, net_client_listening
, raddr
)) {
537 pthread_mutex_unlock(&thpool_lock
);
539 thp_req
= malloc(sizeof(*thp_req
));
542 pthread_mutex_unlock(&thpool_lock
);
546 thp_req
->addr
= raddr
;
547 thp_req
->next
= thpool_rlist
;
549 thpool_rlist
= thp_req
;
550 pthread_mutex_unlock(&thpool_lock
);
552 pthread_cond_signal(&thpool_event
);
/* Installs preq as the request handler, creates a stream socket bound to
 * INADDR_ANY:NET_PORT_NO, starts the net_udp_start_listening() thread and
 * passes every accepted connection to net_got_connection().
 *
 * NOTE(review): clilen is initialised once at its declaration and accept()
 * updates it; no re-initialisation before accept() is visible here. */
556 int net_start_listening(net_process_request preq
) {
557 int sock
, cli_sock
, err
;
558 struct response_address resp
;
559 struct sockaddr_in serv_addr
, cli_addr
;
560 socklen_t clilen
= sizeof(cli_addr
);
563 process_request
= preq
;
565 sock
= socket(AF_INET
, SOCK_STREAM
, 0);
569 err
= net_set_buffer_size(sock
);
573 serv_addr
.sin_family
= AF_INET
;
574 serv_addr
.sin_addr
.s_addr
= INADDR_ANY
;
575 serv_addr
.sin_port
= htons(NET_PORT_NO
);
576 if (bind(sock
, (struct sockaddr
*)&serv_addr
, sizeof(serv_addr
)) < 0)
581 pthread_create(&thread
, NULL
, net_udp_start_listening
, NULL
);
584 cli_sock
= accept(sock
, (struct sockaddr
*)&cli_addr
, &clilen
);
/* resp lives on this stack frame; net_got_connection() copies it. */
588 resp
.sock
= cli_sock
;
590 net_got_connection(&resp
);
/* Argument block handed to the net_guardian() thread by net_set_guard(). */
594 struct net_guradian_args
{
/* Callback the guardian thread reads from its argument. */
595 net_guard_check check
;
/* Thread routine started by net_set_guard(). It reads the check callback
 * from pargs, takes guard_lock around its work and sleeps NET_GUARD_RES
 * seconds. */
598 void *net_guardian(void *pargs
) {
599 net_guard_check check
= ((struct net_guradian_args
*)pargs
)->check
;
603 pthread_mutex_lock(&guard_lock
);
605 pthread_mutex_unlock(&guard_lock
);
607 sleep(NET_GUARD_RES
);
/* Starts a guardian thread for addr. With guard_lock and elders_lock held,
 * a previous guardian thread is cancelled and its node is put back on the
 * elder list, addr is taken off the elder list when it has a socket, and a
 * new net_guardian() thread is created with a heap-allocated argument
 * block. */
613 int net_set_guard(struct node_address
*addr
, net_guard_check check
) {
615 struct net_guradian_args
*args
;
620 args
= malloc(sizeof(*args
));
/* NOTE(review): the result of the first lock call is overwritten by the
 * second before any check visible here. */
625 res
= pthread_mutex_lock(&guard_lock
);
626 res
= pthread_mutex_lock(&elders_lock
);
629 pthread_cancel(guardian
);
630 net_elder_list_add(guarded
);
634 if (addr
->sock
!= -1)
635 net_elder_list_rem(addr
);
636 res
= pthread_create(&guardian
, NULL
, net_guardian
, args
);
638 pthread_mutex_unlock(&elders_lock
);
639 pthread_mutex_unlock(&guard_lock
);
/* Sends a request of size bytes to addr as a single datagram and waits up
 * to NET_REPLY_TIMEOUT seconds for the reply. A stack-allocated
 * udp_request_entry is linked into udp_rlist so that
 * net_udp_reply_received() can find it by tag and address and copy the
 * reply into reply (at most rsize bytes). On timeout the entry is unlinked
 * again. */
644 static int net_udp_send_request(struct node_address
*addr
, const void *data
,
645 uint32_t size
, void *reply
, uint32_t rsize
) {
646 struct udp_request_entry uentry
;
647 struct sockaddr_in serv_addr
;
648 struct timespec timeout
;
649 struct udp_packet_header
*uph
;
653 serv_addr
.sin_family
= AF_INET
;
654 serv_addr
.sin_addr
.s_addr
= addr
->ipv4
;
655 serv_addr
.sin_port
= addr
->port
;
/* The tag is an 8-bit value taken from rand(). */
657 uentry
.addr
= &serv_addr
;
658 uentry
.tag
= (uint8_t)rand();
661 pthread_cond_init(&uentry
.event
, NULL
);
662 pthread_mutex_init(&uentry
.event_lock
, NULL
);
664 uentry
.buffer
= reply
;
665 uentry
.bsize
= rsize
;
/* Register the entry at the head of the pending list before sending. */
668 pthread_mutex_lock(&udp_rlist_lock
);
669 uentry
.next
= udp_rlist
;
671 uentry
.next
->prev
= &uentry
;
673 pthread_mutex_unlock(&udp_rlist_lock
);
/* Build header + payload in one buffer. */
675 buffer
= malloc(size
+ sizeof(struct udp_packet_header
));
679 uph
= (struct udp_packet_header
*)buffer
;
680 uph
->size
= to_be16(size
);
681 uph
->tag
= uentry
.tag
;
682 uph
->flag
= UDP_REQUEST
;
683 memcpy((char*)buffer
+ sizeof(*uph
), data
, size
);
684 res
= sendto(udp_socket
, buffer
, size
+ sizeof(*uph
), 0,
685 (struct sockaddr
*)&serv_addr
, sizeof(serv_addr
));
/* Absolute deadline on CLOCK_REALTIME for the timed wait. */
689 clock_gettime(CLOCK_REALTIME
, &timeout
);
690 timeout
.tv_sec
+= NET_REPLY_TIMEOUT
;
692 pthread_mutex_lock(&uentry
.event_lock
);
693 res
= pthread_cond_timedwait(&uentry
.event
, &uentry
.event_lock
,
/* No reply arrived: remove the entry from the pending list. */
695 if (res
== ETIMEDOUT
) {
696 pthread_mutex_lock(&udp_rlist_lock
);
698 uentry
.prev
->next
= uentry
.next
;
700 udp_rlist
= uentry
.next
;
702 uentry
.next
->prev
= uentry
.prev
;
703 pthread_mutex_unlock(&udp_rlist_lock
);
707 pthread_mutex_unlock(&uentry
.event_lock
);
709 pthread_mutex_destroy(&uentry
.event_lock
);
710 pthread_cond_destroy(&uentry
.event
);
/* Sends a request of size bytes to addr and receives the reply into reply
 * (at most rsize bytes).
 *
 * When request and reply both fit into NET_MAX_UDP_SIZE together with a
 * udp_packet_header and addr has no open connection, the exchange is
 * delegated to net_udp_send_request(). Otherwise the connection is
 * acquired, a packet_header carrying the big-endian size is sent followed
 * by the payload in chunks of at most NET_MAX_DATA_SIZE bytes, select()
 * waits up to NET_REPLY_TIMEOUT seconds for the reply, and the reply
 * header and payload are received. Every exit path visible here calls
 * net_release_connection(). */
715 int net_send_request(struct node_address
*addr
, const void *data
, uint32_t size
,
716 void *reply
, uint32_t rsize
) {
720 struct timeval timeout
;
721 struct packet_header ph
;
723 if (size
+ sizeof(struct udp_packet_header
) <= NET_MAX_UDP_SIZE
&&
724 rsize
+ sizeof(struct udp_packet_header
) <= NET_MAX_UDP_SIZE
&&
725 !net_connection_exist(addr
))
726 return net_udp_send_request(addr
, data
, size
, reply
, rsize
);
728 res
= net_acquire_connection(addr
);
730 net_release_connection(addr
);
/* Header first; MSG_MORE announces that the payload follows. */
734 ph
.size
= to_be32(size
);
735 res
= send(addr
->sock
, &ph
, sizeof(ph
), MSG_NOSIGNAL
| MSG_MORE
);
737 net_release_connection(addr
);
743 res
= send(addr
->sock
, (char*)data
+ ptr
, min(size
- ptr
,
744 NET_MAX_DATA_SIZE
), MSG_NOSIGNAL
);
746 } while (res
> 0 && ptr
< size
);
748 net_release_connection(addr
);
/* Wait for the socket to become readable, bounded by the timeout. */
754 FD_SET(addr
->sock
, &fds
);
755 timeout
.tv_sec
= NET_REPLY_TIMEOUT
;
757 res
= select(addr
->sock
+ 1, &fds
, NULL
, NULL
, &timeout
);
759 net_release_connection(addr
);
761 } else if (res
== 0) {
762 net_release_connection(addr
);
/* Reply header. */
766 res
= recv(addr
->sock
, &ph
, sizeof(ph
), 0);
768 net_release_connection(addr
);
770 } else if (res
== 0) {
771 net_release_connection(addr
);
775 size
= from_be32(ph
.size
);
777 net_release_connection(addr
);
/* Never read more than the caller's buffer can hold. */
780 rsize
= rsize
> size
? size
: rsize
;
783 res
= recv(addr
->sock
, (char*)reply
+ ptr
,
784 min(rsize
- ptr
, NET_MAX_DATA_SIZE
), 0);
786 } while (res
> 0 && ptr
< rsize
);
789 net_release_connection(addr
);
791 } else if (res
== 0) {
792 net_release_connection(addr
);
/* Sends a reply as a single datagram to the address stored in addr. The
 * payload size plus the udp_packet_header is checked against
 * NET_MAX_UDP_SIZE first. The header carries the big-endian size, the tag
 * of the request being answered and the UDP_REPLY flag. */
799 static int net_udp_send_reply(struct response_address
*addr
, const void *data
,
803 struct udp_packet_header
*uph
;
804 socklen_t len
= sizeof(addr
->addr
);
806 if (size
+ sizeof(struct udp_packet_header
) > NET_MAX_UDP_SIZE
)
809 buffer
= malloc(size
+ sizeof(struct udp_packet_header
));
813 uph
= (struct udp_packet_header
*)buffer
;
814 uph
->size
= to_be16(size
);
815 uph
->tag
= addr
->tag
;
816 uph
->flag
= UDP_REPLY
;
817 memcpy((char*)buffer
+ sizeof(*uph
), data
, size
);
819 res
= sendto(udp_socket
, buffer
, size
+ sizeof(*uph
), 0,
820 (struct sockaddr
*)&addr
->addr
, len
);
/* Sends a reply of size bytes to the peer described by addr. UDP peers are
 * handled by net_udp_send_reply() (presumably selected through
 * addr->is_udp; TODO confirm). For stream connections a packet_header
 * with the big-endian size is sent, followed by the payload in chunks of
 * at most NET_MAX_DATA_SIZE bytes. */
827 int net_send_reply(struct response_address
*addr
, const void *data
,
831 struct packet_header ph
;
834 return net_udp_send_reply(addr
, data
, size
);
/* Header first; MSG_MORE announces that the payload follows. */
836 ph
.size
= to_be32(size
);
837 res
= send(addr
->sock
, &ph
, sizeof(ph
), MSG_NOSIGNAL
| MSG_MORE
);
842 res
= send(addr
->sock
, (char*)data
+ ptr
, min(size
- ptr
,
843 NET_MAX_DATA_SIZE
), MSG_NOSIGNAL
);
845 } while (res
> 0 && ptr
< size
);
857 for (i
= 0; i
< NET_UDP_THREAD_POOL
; i
++) {
858 res
= pthread_create(&thread
, NULL
, net_udp_serve_client
, NULL
);
863 return -pthread_create(&thread
, NULL
, net_elders_collector
, NULL
);