Network module
[shoggoth.git] / shared / net.c
blobfa2490fe0ef69213108f08eb9a84baf9facd4752
1 /* Shoggoth - Distributed File System
2 * Network module
4 * Copyright (C) 2012 Pawel Dziepak
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 3 of the License.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <arpa/inet.h>
17 #include <errno.h>
18 #include <netinet/in.h>
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/socket.h>
23 #include <time.h>
24 #include <unistd.h>
26 #include "common.h"
27 #include "net.h"
29 #define NET_IS_ELDER 5
30 #define NET_MAX_ELDERS 0
32 #define NET_GUARD_RES 5
34 #define NET_MAX_DATA_SIZE 1024 * 64
35 #define NET_MAX_UDP_SIZE 1440
37 #define NET_TCP_THREAD_POOL 8
38 #define NET_UDP_THREAD_POOL 4
40 #define NET_REPLY_TIMEOUT 2
42 struct node_address {
43 uint32_t ipv4;
44 uint16_t port;
46 int sock;
47 uint8_t udp_tag;
48 pthread_mutex_t sock_lock;
49 time_t last_used;
51 struct node_address *elder_prev, *elder_next;
54 struct response_address {
55 int sock;
56 struct sockaddr_in addr;
58 uint8_t is_udp;
59 uint8_t tag;
62 struct packet_header {
63 uint32_t size;
64 } __attribute__((packed));
66 enum {
67 UDP_REQUEST = 1,
68 UDP_REPLY = 2
71 struct udp_packet_header {
72 uint16_t size;
73 uint8_t flag;
74 uint8_t tag;
75 } __attribute__((packed));
77 struct udp_request_entry {
78 struct sockaddr_in *addr;
79 uint8_t tag;
80 int valid;
82 pthread_cond_t event;
83 pthread_mutex_t event_lock;
85 void *buffer;
86 uint32_t bsize;
88 struct udp_request_entry *next, *prev;
91 static struct udp_request_entry *udp_rlist = NULL;
92 static pthread_mutex_t udp_rlist_lock = PTHREAD_MUTEX_INITIALIZER;
94 static struct node_address *elder_first = NULL, *elder_last = NULL;
95 static pthread_mutex_t elders_lock = PTHREAD_MUTEX_INITIALIZER;
97 static int conn_count = 0;
98 static pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
100 static int udp_socket;
102 static pthread_t guardian;
103 static struct node_address *guarded = NULL;
104 static pthread_mutex_t guard_lock = PTHREAD_MUTEX_INITIALIZER;
106 static net_process_request process_request;
108 struct thread_pool_request {
109 struct response_address *addr;
111 struct thread_pool_request *next;
113 static int thpool_count = 0, thpool_busy = 0;
114 static struct thread_pool_request *thpool_rlist = NULL;
115 static pthread_mutex_t thpool_lock = PTHREAD_MUTEX_INITIALIZER;
116 static pthread_cond_t thpool_event = PTHREAD_COND_INITIALIZER;
118 struct udp_thread_pool_request {
119 struct response_address *addr;
120 void *buffer;
122 struct udp_thread_pool_request *next;
124 static struct udp_thread_pool_request *udp_thpool_rlist = NULL;
125 static struct udp_thread_pool_request *udp_thpool_rlist_end = NULL;
126 static pthread_mutex_t udp_thpool_lock = PTHREAD_MUTEX_INITIALIZER;
127 static pthread_cond_t udp_thpool_event = PTHREAD_COND_INITIALIZER;
129 void net_elder_list_rem(struct node_address *addr) {
130 if (addr->elder_prev)
131 addr->elder_prev->elder_next = addr->elder_next;
132 else
133 elder_first = addr->elder_next;
135 if (addr->elder_next)
136 addr->elder_next->elder_prev = addr->elder_prev;
137 else
138 elder_last = addr->elder_prev;
141 void net_elder_list_add(struct node_address *addr) {
142 addr->elder_next = NULL;
143 if (elder_last)
144 elder_last->elder_next = addr;
145 if (!elder_first)
146 elder_first = addr;
147 addr->elder_prev = elder_last;
148 elder_last = addr;
151 int net_set_buffer_size(int sock) {
152 size_t buffsize = NET_MAX_DATA_SIZE;
153 socklen_t len = sizeof(buffsize);
155 if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buffsize,
156 sizeof(buffsize)) < 0)
157 return -errno;
159 if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buffsize, &len) < 0)
160 return -errno;
162 if (buffsize < NET_MAX_DATA_SIZE)
163 return -EMSGSIZE;
165 if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &buffsize,
166 sizeof(buffsize)) < 0)
167 return -errno;
169 if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &buffsize, &len) < 0)
170 return -errno;
172 if (buffsize < NET_MAX_DATA_SIZE)
173 return -EMSGSIZE;
175 return 0;
178 int net_connection_exist(struct node_address *addr) {
179 return addr->sock != -1;
182 int net_connect(struct node_address *addr) {
183 struct sockaddr_in serv_addr;
184 int err, sock;
186 sock = socket(AF_INET, SOCK_STREAM, 0);
187 if (sock < 0)
188 return -errno;
190 err = net_set_buffer_size(sock);
191 if (err < 0)
192 return err;
194 serv_addr.sin_family = AF_INET;
195 serv_addr.sin_addr.s_addr = addr->ipv4;
196 serv_addr.sin_port = addr->port;
197 if (connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0)
198 return -errno;
200 addr->sock = sock;
202 pthread_mutex_lock(&conn_lock);
203 conn_count++;
204 pthread_mutex_unlock(&conn_lock);
206 return 0;
209 int net_disconnect(struct node_address *addr) {
210 pthread_mutex_lock(&conn_lock);
211 conn_count--;
212 pthread_mutex_unlock(&conn_lock);
214 if (close(addr->sock) < 0)
215 return -errno;
217 addr->sock = -1;
219 return 0;
222 int net_acquire_connection(struct node_address *addr) {
223 int err;
225 err = pthread_mutex_lock(&addr->sock_lock);
226 if (err != 0)
227 return -err;
229 if (addr->sock == -1)
230 return net_connect(addr);
232 pthread_mutex_lock(&elders_lock);
233 if (addr != guarded)
234 net_elder_list_rem(addr);
235 pthread_mutex_unlock(&elders_lock);
237 return 0;
240 void net_release_connection(struct node_address *addr) {
241 addr->last_used = time(NULL);
243 pthread_mutex_lock(&elders_lock);
244 if (addr != guarded)
245 net_elder_list_add(addr);
246 pthread_mutex_unlock(&elders_lock);
248 pthread_mutex_unlock(&addr->sock_lock);
251 void *net_elders_collector(void *args) {
252 struct node_address *addr;
253 int wait_time = NET_IS_ELDER;
254 (void)args;
256 while (1) {
257 sleep(wait_time);
259 if (conn_count < NET_MAX_ELDERS || !elder_first
260 || guarded == elder_first) {
261 wait_time = min(max(wait_time * 2, NET_IS_ELDER),
262 NET_IS_ELDER * 16);
263 continue;
266 pthread_mutex_lock(&elders_lock);
267 if (pthread_mutex_trylock(&elder_first->sock_lock) == EBUSY) {
268 pthread_mutex_unlock(&elders_lock);
269 continue;
272 if (elder_first->last_used > time(NULL) - NET_IS_ELDER) {
273 pthread_mutex_unlock(&elders_lock);
274 pthread_mutex_unlock(&elder_first->sock_lock);
275 continue;
278 addr = elder_first;
279 elder_first = addr->elder_next;
280 if (addr->elder_next)
281 addr->elder_next->elder_prev = NULL;
282 if (elder_last == addr)
283 elder_last = NULL;
284 pthread_mutex_unlock(&elders_lock);
286 net_disconnect(addr);
287 pthread_mutex_unlock(&addr->sock_lock);
289 wait_time = max(min(wait_time / 2, NET_IS_ELDER), 1);
293 int net_create_node_address(struct node_address **addr, const char *ipv4,
294 uint16_t port) {
295 int err;
296 err = inet_pton(AF_INET, ipv4, &(*addr)->ipv4);
297 if (err < 0)
298 return -errno;
299 else if (err == 0)
300 return -EINVAL;
302 *addr = (struct node_address *)malloc(sizeof(struct node_address));
303 if (!*addr)
304 return -ENOMEM;
306 (*addr)->port = htons(port);
307 (*addr)->sock = -1;
308 (*addr)->elder_next = NULL;
309 (*addr)->elder_prev = NULL;
310 pthread_mutex_init(&(*addr)->sock_lock, NULL);
312 return 0;
315 int net_free_node_address(struct node_address *addr) {
316 free(addr);
317 return 0;
320 void net_udp_reply_received(struct udp_packet_header *uph,
321 struct sockaddr_in *addr) {
322 struct udp_request_entry *current, *previous = NULL, *found = NULL;
324 pthread_mutex_lock(&udp_rlist_lock);
325 current = udp_rlist;
326 while (current) {
327 if (current->tag == uph->tag &&
328 addr->sin_port == current->addr->sin_port &&
329 addr->sin_addr.s_addr == addr->sin_addr.s_addr) {
331 found = current;
333 if (previous)
334 previous->next = current->next;
335 else
336 udp_rlist = current->next;
337 if (current->next)
338 current->next->prev = previous;
340 break;
343 previous = current;
344 current = current->next;
346 pthread_mutex_unlock(&udp_rlist_lock);
348 if (!found)
349 return;
351 memcpy(found->buffer, uph + 1, min(from_be16(uph->size), found->bsize));
352 pthread_cond_broadcast(&found->event);
355 void net_udp_request_received(struct udp_packet_header *uph,
356 struct sockaddr_in *addr) {
357 struct udp_thread_pool_request *uthp_req;
358 (void)addr;
360 uthp_req = malloc(sizeof(*uthp_req));
361 if (!uthp_req)
362 return;
364 uthp_req->buffer = (void*)uph;
365 uthp_req->addr = malloc(sizeof(struct response_address));
366 if (!uthp_req->addr) {
367 free(uthp_req);
368 return;
371 uthp_req->addr->sock = -1;
372 memcpy(&uthp_req->addr->addr, addr, sizeof(*addr));
373 uthp_req->addr->is_udp = 1;
374 uthp_req->addr->tag = uph->tag;
375 uthp_req->next = NULL;
377 pthread_mutex_lock(&udp_thpool_lock);
378 if (udp_thpool_rlist_end) {
379 udp_thpool_rlist_end->next = uthp_req;
380 udp_thpool_rlist_end = uthp_req;
381 } else {
382 udp_thpool_rlist = uthp_req;
383 udp_thpool_rlist_end = uthp_req;
384 pthread_cond_signal(&udp_thpool_event);
386 pthread_mutex_unlock(&udp_thpool_lock);
389 void *net_udp_serve_client(void *args) {
390 struct udp_thread_pool_request *uthp_req;
391 struct udp_packet_header *uph;
392 (void)args;
394 while (1) {
395 pthread_mutex_lock(&udp_thpool_lock);
396 while (!udp_thpool_rlist)
397 pthread_cond_wait(&udp_thpool_event, &udp_thpool_lock);
399 uthp_req = udp_thpool_rlist;
400 udp_thpool_rlist = uthp_req->next;
401 if (!udp_thpool_rlist)
402 udp_thpool_rlist_end = NULL;
403 pthread_mutex_unlock(&udp_thpool_lock);
405 uph = (struct udp_packet_header *)uthp_req->buffer;
406 process_request(uthp_req->addr,
407 (char*)uthp_req->buffer + sizeof(*uph),
408 from_be16(uph->size));
410 free(uthp_req->addr);
411 free(uthp_req);
415 void *net_udp_start_listening(void *args) {
416 struct sockaddr_in serv_addr, cli_addr;
417 socklen_t clilen = sizeof(cli_addr);
418 void *buffer = malloc(NET_MAX_UDP_SIZE);
419 struct udp_packet_header *uph = (struct udp_packet_header *)buffer;
420 (void)args;
422 if (!buffer)
423 return NULL;
425 udp_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
426 if (udp_socket < 0)
427 return NULL;
429 serv_addr.sin_family = AF_INET;
430 serv_addr.sin_addr.s_addr = INADDR_ANY;
431 serv_addr.sin_port = htons(NET_PORT_NO);
433 while (bind(udp_socket, (struct sockaddr*)&serv_addr,
434 sizeof(serv_addr)) < 0)
435 serv_addr.sin_port++;
437 while (1) {
438 recvfrom(udp_socket, buffer, NET_MAX_UDP_SIZE, 0,
439 (struct sockaddr*)&cli_addr, &clilen);
441 switch (uph->flag) {
442 case UDP_REQUEST:
443 net_udp_request_received(uph, &cli_addr);
444 break;
445 case UDP_REPLY:
446 net_udp_reply_received(uph, &cli_addr);
447 break;
451 free(buffer);
454 void net_serve_client(struct response_address *addr) {
455 struct packet_header ph;
456 void *buffer;
457 int err;
458 uint32_t ptr, size;
460 while (1) {
461 ptr = 0;
462 err = recv(addr->sock, &ph, sizeof(ph), 0);
463 if (err <= 0)
464 break;
466 size = from_be32(ph.size);
467 buffer = malloc(size);
468 if (buffer == NULL)
469 break;
471 do {
472 err = recv(addr->sock, (char*)buffer + ptr,
473 min(size - ptr, NET_MAX_DATA_SIZE),
475 ptr += err;
476 } while (err > 0 && ptr < size);
477 if (err <= 0) {
478 free(buffer);
479 break;
482 process_request(addr, buffer, size);
484 free(buffer);
487 close(addr->sock);
490 void *net_client_listening(void *pargs) {
491 struct response_address *addr = (struct response_address *)pargs;
492 struct thread_pool_request *thp_req;
494 net_serve_client(addr);
495 free(pargs);
497 while (1) {
499 pthread_mutex_lock(&thpool_lock);
500 thpool_busy--;
501 if (thpool_count - thpool_busy > NET_TCP_THREAD_POOL) {
502 thpool_count--;
503 pthread_mutex_unlock(&thpool_lock);
504 return NULL;
507 while (!thpool_rlist)
508 pthread_cond_wait(&thpool_event, &thpool_lock);
510 thp_req = thpool_rlist;
511 addr = thp_req->addr;
512 thpool_rlist = thp_req->next;
513 pthread_mutex_unlock(&thpool_lock);
515 net_serve_client(addr);
517 free(thp_req->addr);
518 free(thp_req);
520 return NULL;
523 void net_got_connection(struct response_address *addr) {
524 struct thread_pool_request *thp_req;
525 struct response_address *raddr = malloc(sizeof(*addr));
526 pthread_t thread;
528 if (!raddr)
529 return;
531 memcpy(raddr, addr, sizeof(*addr));
532 pthread_mutex_lock(&thpool_lock);
533 if (thpool_count - thpool_busy < 1 &&
534 !pthread_create(&thread, NULL, net_client_listening, raddr)) {
535 thpool_count++;
536 thpool_busy++;
537 pthread_mutex_unlock(&thpool_lock);
538 } else {
539 thp_req = malloc(sizeof(*thp_req));
540 if (!thp_req) {
541 free(raddr);
542 pthread_mutex_unlock(&thpool_lock);
543 return;
546 thp_req->addr = raddr;
547 thp_req->next = thpool_rlist;
549 thpool_rlist = thp_req;
550 pthread_mutex_unlock(&thpool_lock);
552 pthread_cond_signal(&thpool_event);
556 int net_start_listening(net_process_request preq) {
557 int sock, cli_sock, err;
558 struct response_address resp;
559 struct sockaddr_in serv_addr, cli_addr;
560 socklen_t clilen = sizeof(cli_addr);
561 pthread_t thread;
563 process_request = preq;
565 sock = socket(AF_INET, SOCK_STREAM, 0);
566 if (sock < 0)
567 return -errno;
569 err = net_set_buffer_size(sock);
570 if (err < 0)
571 return err;
573 serv_addr.sin_family = AF_INET;
574 serv_addr.sin_addr.s_addr = INADDR_ANY;
575 serv_addr.sin_port = htons(NET_PORT_NO);
576 if (bind(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
577 return -errno;
579 listen(sock, 5);
581 pthread_create(&thread, NULL, net_udp_start_listening, NULL);
583 while (1) {
584 cli_sock = accept(sock, (struct sockaddr *)&cli_addr, &clilen);
585 if (cli_sock < 0)
586 return -errno;
588 resp.sock = cli_sock;
589 resp.is_udp = 0;
590 net_got_connection(&resp);
594 struct net_guradian_args {
595 net_guard_check check;
598 void *net_guardian(void *pargs) {
599 net_guard_check check = ((struct net_guradian_args*)pargs)->check;
600 free(pargs);
602 while (1) {
603 pthread_mutex_lock(&guard_lock);
604 check(guarded);
605 pthread_mutex_unlock(&guard_lock);
607 sleep(NET_GUARD_RES);
610 return NULL;
613 int net_set_guard(struct node_address *addr, net_guard_check check) {
614 int res;
615 struct net_guradian_args *args;
617 if (guarded == addr)
618 return 0;
620 args = malloc(sizeof(*args));
621 if (!args)
622 return -ENOMEM;
623 args->check = check;
625 res = pthread_mutex_lock(&guard_lock);
626 res = pthread_mutex_lock(&elders_lock);
628 if (guarded) {
629 pthread_cancel(guardian);
630 net_elder_list_add(guarded);
633 guarded = addr;
634 if (addr->sock != -1)
635 net_elder_list_rem(addr);
636 res = pthread_create(&guardian, NULL, net_guardian, args);
638 pthread_mutex_unlock(&elders_lock);
639 pthread_mutex_unlock(&guard_lock);
641 return -res;
644 static int net_udp_send_request(struct node_address *addr, const void *data,
645 uint32_t size, void *reply, uint32_t rsize) {
646 struct udp_request_entry uentry;
647 struct sockaddr_in serv_addr;
648 struct timespec timeout;
649 struct udp_packet_header *uph;
650 int res;
651 void *buffer;
653 serv_addr.sin_family = AF_INET;
654 serv_addr.sin_addr.s_addr = addr->ipv4;
655 serv_addr.sin_port = addr->port;
657 uentry.addr = &serv_addr;
658 uentry.tag = (uint8_t)rand();
659 uentry.valid = 1;
661 pthread_cond_init(&uentry.event, NULL);
662 pthread_mutex_init(&uentry.event_lock, NULL);
664 uentry.buffer = reply;
665 uentry.bsize = rsize;
666 uentry.prev = NULL;
668 pthread_mutex_lock(&udp_rlist_lock);
669 uentry.next = udp_rlist;
670 if (uentry.next)
671 uentry.next->prev = &uentry;
672 udp_rlist = &uentry;
673 pthread_mutex_unlock(&udp_rlist_lock);
675 buffer = malloc(size + sizeof(struct udp_packet_header));
676 if (!buffer)
677 return -ENOMEM;
679 uph = (struct udp_packet_header*)buffer;
680 uph->size = to_be16(size);
681 uph->tag = uentry.tag;
682 uph->flag = UDP_REQUEST;
683 memcpy((char*)buffer + sizeof(*uph), data, size);
684 res = sendto(udp_socket, buffer, size + sizeof(*uph), 0,
685 (struct sockaddr*)&serv_addr, sizeof(serv_addr));
686 if (res < 0)
687 return -errno;
689 clock_gettime(CLOCK_REALTIME, &timeout);
690 timeout.tv_sec += NET_REPLY_TIMEOUT;
692 pthread_mutex_lock(&uentry.event_lock);
693 res = pthread_cond_timedwait(&uentry.event, &uentry.event_lock,
694 &timeout);
695 if (res == ETIMEDOUT) {
696 pthread_mutex_lock(&udp_rlist_lock);
697 if (uentry.prev)
698 uentry.prev->next = uentry.next;
699 else
700 udp_rlist = uentry.next;
701 if (uentry.next)
702 uentry.next->prev = uentry.prev;
703 pthread_mutex_unlock(&udp_rlist_lock);
705 res = -ETIMEDOUT;
707 pthread_mutex_unlock(&uentry.event_lock);
709 pthread_mutex_destroy(&uentry.event_lock);
710 pthread_cond_destroy(&uentry.event);
712 return res;
715 int net_send_request(struct node_address *addr, const void *data, uint32_t size,
716 void *reply, uint32_t rsize) {
717 int res;
718 fd_set fds;
719 uint32_t ptr;
720 struct timeval timeout;
721 struct packet_header ph;
723 if (size + sizeof(struct udp_packet_header) <= NET_MAX_UDP_SIZE &&
724 rsize + sizeof(struct udp_packet_header) <= NET_MAX_UDP_SIZE &&
725 !net_connection_exist(addr))
726 return net_udp_send_request(addr, data, size, reply, rsize);
728 res = net_acquire_connection(addr);
729 if (res < 0) {
730 net_release_connection(addr);
731 return res;
734 ph.size = to_be32(size);
735 res = send(addr->sock, &ph, sizeof(ph), MSG_NOSIGNAL | MSG_MORE);
736 if (res < 0) {
737 net_release_connection(addr);
738 return -errno;
741 ptr = 0;
742 do {
743 res = send(addr->sock, (char*)data + ptr, min(size - ptr,
744 NET_MAX_DATA_SIZE), MSG_NOSIGNAL);
745 ptr += res;
746 } while (res > 0 && ptr < size);
747 if (res < 0) {
748 net_release_connection(addr);
749 return -errno;
752 ptr = 0;
753 FD_ZERO(&fds);
754 FD_SET(addr->sock, &fds);
755 timeout.tv_sec = NET_REPLY_TIMEOUT;
756 timeout.tv_usec = 0;
757 res = select(addr->sock + 1, &fds, NULL, NULL, &timeout);
758 if (res < 0) {
759 net_release_connection(addr);
760 return -errno;
761 } else if (res == 0) {
762 net_release_connection(addr);
763 return -ETIMEDOUT;
766 res = recv(addr->sock, &ph, sizeof(ph), 0);
767 if (res < 0) {
768 net_release_connection(addr);
769 return -errno;
770 } else if (res == 0) {
771 net_release_connection(addr);
772 return -ECONNRESET;
775 size = from_be32(ph.size);
776 if (size > rsize) {
777 net_release_connection(addr);
778 return -EMSGSIZE;
780 rsize = rsize > size ? size : rsize;
782 do {
783 res = recv(addr->sock, (char*)reply + ptr,
784 min(rsize - ptr, NET_MAX_DATA_SIZE), 0);
785 ptr += res;
786 } while (res > 0 && ptr < rsize);
788 if (res < 0) {
789 net_release_connection(addr);
790 return -errno;
791 } else if (res == 0) {
792 net_release_connection(addr);
793 return -ECONNRESET;
796 return 0;
799 static int net_udp_send_reply(struct response_address *addr, const void *data,
800 uint32_t size) {
801 void *buffer;
802 int res;
803 struct udp_packet_header *uph;
804 socklen_t len = sizeof(addr->addr);
806 if (size + sizeof(struct udp_packet_header) > NET_MAX_UDP_SIZE)
807 return -EMSGSIZE;
809 buffer = malloc(size + sizeof(struct udp_packet_header));
810 if (!buffer)
811 return -ENOMEM;
813 uph = (struct udp_packet_header*)buffer;
814 uph->size = to_be16(size);
815 uph->tag = addr->tag;
816 uph->flag = UDP_REPLY;
817 memcpy((char*)buffer + sizeof(*uph), data, size);
819 res = sendto(udp_socket, buffer, size + sizeof(*uph), 0,
820 (struct sockaddr*)&addr->addr, len);
821 if (res < 0)
822 return -errno;
824 return 0;
827 int net_send_reply(struct response_address *addr, const void *data,
828 uint32_t size) {
829 int res;
830 uint32_t ptr = 0;
831 struct packet_header ph;
833 if (addr->is_udp)
834 return net_udp_send_reply(addr, data, size);
836 ph.size = to_be32(size);
837 res = send(addr->sock, &ph, sizeof(ph), MSG_NOSIGNAL | MSG_MORE);
838 if (res < 0)
839 return -errno;
841 do {
842 res = send(addr->sock, (char*)data + ptr, min(size - ptr,
843 NET_MAX_DATA_SIZE), MSG_NOSIGNAL);
844 ptr += res;
845 } while (res > 0 && ptr < size);
847 if (res < 0)
848 return -errno;
850 return 0;
853 int net_init(void) {
854 pthread_t thread;
855 int i, res;
857 for (i = 0; i < NET_UDP_THREAD_POOL; i++) {
858 res = pthread_create(&thread, NULL, net_udp_serve_client, NULL);
859 if (res != 0)
860 return -res;
863 return -pthread_create(&thread, NULL, net_elders_collector, NULL);