Correctly set python extension dir when prefix is nonstandard
[elliptics.git] / library / dnet_common.c
blob084b0006732813d01151305bf1fe810956983bf6
/*
 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
44 static char *dnet_cmd_strings[] = {
45 [DNET_CMD_LOOKUP] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN] = "JOIN",
48 [DNET_CMD_WRITE] = "WRITE",
49 [DNET_CMD_READ] = "READ",
50 [DNET_CMD_LIST] = "CHECK",
51 [DNET_CMD_EXEC] = "EXEC",
52 [DNET_CMD_ROUTE_LIST] = "ROUTE_LIST",
53 [DNET_CMD_STAT] = "STAT",
54 [DNET_CMD_NOTIFY] = "NOTIFY",
55 [DNET_CMD_DEL] = "REMOVE",
56 [DNET_CMD_STAT_COUNT] = "STAT_COUNT",
57 [DNET_CMD_STATUS] = "STATUS",
58 [DNET_CMD_READ_RANGE] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE] = "DEL_RANGE",
60 [DNET_CMD_AUTH] = "AUTH",
61 [DNET_CMD_BULK_READ] = "BULK_READ",
62 [DNET_CMD_UNKNOWN] = "UNKNOWN",
65 static char *dnet_counter_strings[] = {
66 [DNET_CNTR_LA1] = "DNET_CNTR_LA1",
67 [DNET_CNTR_LA5] = "DNET_CNTR_LA5",
68 [DNET_CNTR_LA15] = "DNET_CNTR_LA15",
69 [DNET_CNTR_BSIZE] = "DNET_CNTR_BSIZE",
70 [DNET_CNTR_FRSIZE] = "DNET_CNTR_FRSIZE",
71 [DNET_CNTR_BLOCKS] = "DNET_CNTR_BLOCKS",
72 [DNET_CNTR_BFREE] = "DNET_CNTR_BFREE",
73 [DNET_CNTR_BAVAIL] = "DNET_CNTR_BAVAIL",
74 [DNET_CNTR_FILES] = "DNET_CNTR_FILES",
75 [DNET_CNTR_FFREE] = "DNET_CNTR_FFREE",
76 [DNET_CNTR_FAVAIL] = "DNET_CNTR_FAVAIL",
77 [DNET_CNTR_FSID] = "DNET_CNTR_FSID",
78 [DNET_CNTR_VM_ACTIVE] = "DNET_CNTR_VM_ACTIVE",
79 [DNET_CNTR_VM_INACTIVE] = "DNET_CNTR_VM_INACTIVE",
80 [DNET_CNTR_VM_TOTAL] = "DNET_CNTR_VM_TOTAL",
81 [DNET_CNTR_VM_FREE] = "DNET_CNTR_VM_FREE",
82 [DNET_CNTR_VM_CACHED] = "DNET_CNTR_VM_CACHED",
83 [DNET_CNTR_VM_BUFFERS] = "DNET_CNTR_VM_BUFFERS",
84 [DNET_CNTR_NODE_FILES] = "DNET_CNTR_NODE_FILES",
85 [DNET_CNTR_NODE_LAST_MERGE] = "DNET_CNTR_NODE_LAST_MERGE",
86 [DNET_CNTR_NODE_CHECK_COPY] = "DNET_CNTR_NODE_CHECK_COPY",
87 [DNET_CNTR_DBR_NOREC] = "DNET_CNTR_DBR_NOREC",
88 [DNET_CNTR_DBR_SYSTEM] = "DNET_CNTR_DBR_SYSTEM",
89 [DNET_CNTR_DBR_ERROR] = "DNET_CNTR_DBR_ERROR",
90 [DNET_CNTR_DBW_SYSTEM] = "DNET_CNTR_DBW_SYSTEM",
91 [DNET_CNTR_DBW_ERROR] = "DNET_CNTR_DBW_ERROR",
92 [DNET_CNTR_UNKNOWN] = "UNKNOWN",
95 char *dnet_cmd_string(int cmd)
97 if (cmd <= 0 || cmd >= __DNET_CMD_MAX)
98 cmd = DNET_CMD_UNKNOWN;
100 return dnet_cmd_strings[cmd];
103 char *dnet_counter_string(int cntr, int cmd_num)
105 if (cntr <= 0 || cntr >= __DNET_CNTR_MAX)
106 cntr = DNET_CNTR_UNKNOWN;
108 if (cntr < cmd_num)
109 return dnet_cmd_string(cntr);
111 if (cntr >= cmd_num && cntr < (cmd_num * 2))
112 return dnet_cmd_string(cntr - cmd_num);
114 return dnet_counter_strings[cntr];
117 static int dnet_add_received_state(struct dnet_node *n, struct dnet_addr_attr *a,
118 int group_id, struct dnet_raw_id *ids, int id_num)
120 int s, err = 0;
121 struct dnet_net_state *nst;
122 struct dnet_id raw;
123 int join;
125 dnet_setup_id(&raw, group_id, ids[0].id);
127 nst = dnet_state_search_by_addr(n, &a->addr);
128 if (nst) {
129 err = -EEXIST;
130 dnet_state_put(nst);
131 goto err_out_exit;
134 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
135 (struct sockaddr *)&a->addr.addr, a->addr.addr_len, 0);
136 if (s < 0) {
137 err = s;
138 goto err_out_exit;
141 join = DNET_WANT_RECONNECT;
142 if (n->flags & DNET_CFG_JOIN_NETWORK)
143 join = DNET_JOIN;
145 nst = dnet_state_create(n, group_id, ids, id_num, &a->addr, s, &err, join, dnet_state_net_process);
146 if (!nst)
147 goto err_out_close;
149 dnet_log(n, DNET_LOG_NOTICE, "%d: added received state %s.\n",
150 group_id, dnet_state_dump_addr(nst));
152 return 0;
154 err_out_close:
155 dnet_sock_close(s);
156 err_out_exit:
157 return err;
160 static int dnet_process_addr_attr(struct dnet_net_state *st, struct dnet_addr_attr *a, int group_id, int num)
162 struct dnet_node *n = st->n;
163 struct dnet_raw_id *ids;
164 int i, err;
166 ids = (struct dnet_raw_id *)(a + 1);
167 for (i=0; i<num; ++i)
168 dnet_convert_raw_id(&ids[0]);
170 err = dnet_add_received_state(n, a, group_id, ids, num);
171 dnet_log(n, DNET_LOG_DEBUG, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a->addr), num, err);
173 return err;
176 static int dnet_recv_route_list_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
178 struct dnet_wait *w = priv;
179 struct dnet_addr_attr *a;
180 long size;
181 int err, num;
183 if (is_trans_destroyed(st, cmd)) {
184 err = -EINVAL;
185 if (cmd)
186 err = cmd->status;
188 w->status = err;
189 dnet_wakeup(w, w->cond = 1);
190 dnet_wait_put(w);
191 goto err_out_exit;
195 err = cmd->status;
196 if (!cmd->size || err)
197 goto err_out_exit;
199 size = cmd->size + sizeof(struct dnet_cmd);
200 if (size < (signed)sizeof(struct dnet_addr_cmd)) {
201 err = -EINVAL;
202 goto err_out_exit;
205 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
206 if (!num) {
207 err = -EINVAL;
208 goto err_out_exit;
211 a = (struct dnet_addr_attr *)(cmd + 1);
212 dnet_convert_addr_attr(a);
214 err = dnet_process_addr_attr(st, a, cmd->id.group_id, num);
216 err_out_exit:
217 return err;
220 int dnet_recv_route_list(struct dnet_net_state *st)
222 struct dnet_io_req req;
223 struct dnet_node *n = st->n;
224 struct dnet_trans *t;
225 struct dnet_cmd *cmd;
226 struct dnet_wait *w;
227 int err;
229 w = dnet_wait_alloc(0);
230 if (!w) {
231 err = -ENOMEM;
232 goto err_out_exit;
235 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
236 if (!t) {
237 err = -ENOMEM;
238 goto err_out_wait_put;
241 t->complete = dnet_recv_route_list_complete;
242 t->priv = w;
244 cmd = (struct dnet_cmd *)(t + 1);
246 cmd->flags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
247 cmd->status = 0;
249 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
251 cmd->cmd = t->command = DNET_CMD_ROUTE_LIST;
253 t->st = dnet_state_get(st);
254 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
256 dnet_convert_cmd(cmd);
258 dnet_log(n, DNET_LOG_DEBUG, "%s: list route request to %s.\n", dnet_dump_id(&cmd->id),
259 dnet_server_convert_dnet_addr(&st->addr));
261 memset(&req, 0, sizeof(req));
262 req.st = st;
263 req.header = cmd;
264 req.hsize = sizeof(struct dnet_cmd);
266 dnet_wait_get(w);
267 err = dnet_trans_send(t, &req);
268 if (err)
269 goto err_out_destroy;
271 err = dnet_wait_event(w, w->cond != 0, &n->wait_ts);
272 dnet_wait_put(w);
274 return 0;
276 err_out_destroy:
277 dnet_trans_put(t);
278 err_out_wait_put:
279 dnet_wait_put(w);
280 err_out_exit:
281 return err;
284 static struct dnet_net_state *dnet_add_state_socket(struct dnet_node *n, struct dnet_addr *addr, int s, int *errp, int join)
286 struct dnet_net_state *st, dummy;
287 char buf[sizeof(struct dnet_addr_cmd)];
288 struct dnet_cmd *cmd;
289 int err, num, i, size;
290 struct dnet_raw_id *ids;
292 memset(buf, 0, sizeof(buf));
294 cmd = (struct dnet_cmd *)(buf);
296 cmd->flags = DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
297 cmd->cmd = DNET_CMD_REVERSE_LOOKUP;
299 dnet_convert_cmd(cmd);
301 st = &dummy;
302 memset(st, 0, sizeof(struct dnet_net_state));
304 st->write_s = st->read_s = s;
305 st->n = n;
307 err = dnet_send_nolock(st, buf, sizeof(struct dnet_cmd));
308 if (err) {
309 dnet_log(n, DNET_LOG_ERROR, "Failed to send reverse "
310 "lookup message to %s, err: %d.\n",
311 dnet_server_convert_dnet_addr(addr), err);
312 goto err_out_exit;
315 err = dnet_recv(st, buf, sizeof(buf));
316 if (err) {
317 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
318 "lookup headers from %s, err: %d.\n",
319 dnet_server_convert_dnet_addr(addr), err);
320 goto err_out_exit;
323 cmd = (struct dnet_cmd *)(buf);
325 dnet_convert_addr_cmd((struct dnet_addr_cmd *)buf);
327 size = cmd->size - sizeof(struct dnet_addr_attr);
328 num = size / sizeof(struct dnet_raw_id);
330 dnet_log(n, DNET_LOG_DEBUG, "%s: waiting for %d ids\n", dnet_dump_id(&cmd->id), num);
332 ids = malloc(size);
333 if (!ids) {
334 err = -ENOMEM;
335 goto err_out_exit;
338 err = dnet_recv(st, ids, size);
339 if (err) {
340 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
341 "lookup body (%llu bytes) from %s, err: %d.\n",
342 (unsigned long long)cmd->size,
343 dnet_server_convert_dnet_addr(addr), err);
344 goto err_out_exit;
347 for (i=0; i<num; ++i)
348 dnet_convert_raw_id(&ids[i]);
350 st = dnet_state_create(n, cmd->id.group_id, ids, num, addr, s, &err, join, dnet_state_net_process);
351 if (!st) {
352 /* socket is already closed */
353 s = -1;
354 goto err_out_free;
356 free(ids);
358 return st;
360 err_out_free:
361 free(ids);
362 err_out_exit:
363 *errp = err;
364 if (s >= 0)
365 dnet_sock_close(s);
366 return NULL;
369 int dnet_add_state(struct dnet_node *n, struct dnet_config *cfg)
371 int s, err, join = DNET_WANT_RECONNECT;
372 struct dnet_addr addr;
373 struct dnet_net_state *st;
375 memset(&addr, 0, sizeof(addr));
377 addr.addr_len = sizeof(addr.addr);
378 s = dnet_socket_create(n, cfg, &addr, 0);
379 if (s < 0) {
380 err = s;
381 goto err_out_reconnect;
384 if (n->flags & DNET_CFG_JOIN_NETWORK)
385 join = DNET_JOIN;
387 /* will close socket on error */
388 st = dnet_add_state_socket(n, &addr, s, &err, join);
389 if (!st)
390 goto err_out_reconnect;
392 if (!(cfg->flags & DNET_CFG_NO_ROUTE_LIST))
393 dnet_recv_route_list(st);
395 return 0;
397 err_out_reconnect:
398 if ((err == -EADDRINUSE) || (err == -ECONNREFUSED) || (err == -ECONNRESET) ||
399 (err == -EINPROGRESS) || (err == -EAGAIN))
400 dnet_add_reconnect_state(n, &addr, join);
401 return err;
/*
 * Shared context of one file write: replies from all transactions are
 * accumulated in @reply, and @wait carries the reference counter that
 * controls the lifetime of the whole structure.
 */
struct dnet_write_completion {
	void			*reply;	/* concatenated replies, grown with realloc() */
	int			size;	/* number of bytes stored in @reply */
	struct dnet_wait	*wait;	/* waiter woken as transactions are destroyed */
};
410 static void dnet_write_complete_free(struct dnet_write_completion *wc)
412 if (atomic_dec_and_test(&wc->wait->refcnt)) {
413 dnet_wait_destroy(wc->wait);
414 free(wc->reply);
415 free(wc);
419 static int dnet_write_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
421 int err = -EINVAL;
422 struct dnet_write_completion *wc = priv;
423 struct dnet_wait *w = wc->wait;
425 if (is_trans_destroyed(st, cmd)) {
426 dnet_wakeup(w, w->cond++);
427 dnet_write_complete_free(wc);
428 return 0;
431 err = cmd->status;
432 if (!err && st && (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info))) {
433 int old_size = wc->size;
434 void *data;
436 wc->size += cmd->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_addr);
437 wc->reply = realloc(wc->reply, wc->size);
438 if (!wc->reply) {
439 err = -ENOMEM;
440 goto err_out_exit;
443 data = wc->reply + old_size;
445 memcpy(data, &st->addr, sizeof(struct dnet_addr));
446 memcpy(data + sizeof(struct dnet_addr), cmd, sizeof(struct dnet_cmd));
447 memcpy(data + sizeof(struct dnet_addr) + sizeof(struct dnet_cmd), cmd + 1, cmd->size);
450 err_out_exit:
451 pthread_mutex_lock(&w->wait_lock);
452 if (w->status < 0)
453 w->status = err;
454 pthread_mutex_unlock(&w->wait_lock);
456 return 0;
459 static struct dnet_trans *dnet_io_trans_create(struct dnet_node *n, struct dnet_io_control *ctl, int *errp)
461 struct dnet_io_req req;
462 struct dnet_trans *t = NULL;
463 struct dnet_io_attr *io;
464 struct dnet_cmd *cmd;
465 uint64_t size = ctl->io.size;
466 uint64_t tsize = sizeof(struct dnet_io_attr) + sizeof(struct dnet_cmd);
467 int err;
469 if (ctl->cmd == DNET_CMD_READ)
470 size = 0;
472 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE)
473 tsize += size;
475 t = dnet_trans_alloc(n, tsize);
476 if (!t) {
477 err = -ENOMEM;
478 goto err_out_complete;
480 t->complete = ctl->complete;
481 t->priv = ctl->priv;
483 cmd = (struct dnet_cmd *)(t + 1);
484 io = (struct dnet_io_attr *)(cmd + 1);
486 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE) {
487 if (size) {
488 void *data = io + 1;
489 memcpy(data, ctl->data, size);
493 memcpy(&cmd->id, &ctl->id, sizeof(struct dnet_id));
494 cmd->size = sizeof(struct dnet_io_attr) + size;
495 cmd->flags = ctl->cflags;
496 cmd->status = 0;
498 cmd->cmd = t->command = ctl->cmd;
500 memcpy(io, &ctl->io, sizeof(struct dnet_io_attr));
501 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
503 t->st = dnet_state_get_first(n, &cmd->id);
504 if (!t->st) {
505 err = -ENOENT;
506 goto err_out_destroy;
509 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
511 dnet_log(n, DNET_LOG_INFO, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
512 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
513 dnet_dump_id(&ctl->id),
514 (unsigned long long)t->trans,
515 dnet_cmd_string(ctl->cmd), (unsigned long long)cmd->flags,
516 (unsigned long long)ctl->io.size, (unsigned long long)ctl->io.offset,
517 ctl->fd,
518 (unsigned long long)ctl->local_offset,
519 dnet_server_convert_dnet_addr(&t->st->addr), t->st->weight, t->st->median_read_time);
521 dnet_convert_cmd(cmd);
522 dnet_convert_io_attr(io);
525 memset(&req, 0, sizeof(req));
526 req.st = t->st;
527 req.header = cmd;
528 req.hsize = tsize;
530 req.fd = ctl->fd;
532 if (ctl->fd >= 0) {
533 req.local_offset = ctl->local_offset;
534 req.fsize = size;
535 } else if (size >= DNET_COPY_IO_SIZE) {
536 req.data = (void *)ctl->data;
537 req.dsize = size;
540 err = dnet_trans_send(t, &req);
541 if (err)
542 goto err_out_destroy;
544 return t;
546 err_out_complete:
547 if (ctl->complete)
548 ctl->complete(NULL, NULL, ctl->priv);
549 *errp = err;
550 return NULL;
552 err_out_destroy:
553 dnet_trans_put(t);
554 *errp = err;
555 return NULL;
558 int dnet_trans_create_send_all(struct dnet_node *n, struct dnet_io_control *ctl)
560 int num = 0, i, err;
562 pthread_mutex_lock(&n->group_lock);
563 for (i=0; i<n->group_num; ++i) {
564 ctl->id.group_id = n->groups[i];
566 dnet_io_trans_create(n, ctl, &err);
567 num++;
569 pthread_mutex_unlock(&n->group_lock);
571 if (!num) {
572 dnet_io_trans_create(n, ctl, &err);
573 num++;
576 return num;
/*
 * Write the object described by @ctl: thin wrapper that forwards to
 * dnet_trans_create_send_all() and returns its transaction count.
 */
int dnet_write_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int trans_num = dnet_trans_create_send_all(n, ctl);

	return trans_num;
}
584 static int dnet_write_file_id_raw(struct dnet_node *n, const char *file, struct dnet_id *id,
585 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
586 uint64_t cflags, unsigned int ioflags)
588 int fd, err, trans_num;
589 struct stat stat;
590 struct dnet_wait *w;
591 struct dnet_io_control ctl;
592 struct dnet_write_completion *wc;
594 wc = malloc(sizeof(struct dnet_write_completion));
595 if (!wc) {
596 err = -ENOMEM;
597 goto err_out_exit;
599 memset(wc, 0, sizeof(struct dnet_write_completion));
601 w = dnet_wait_alloc(0);
602 if (!w) {
603 free(wc);
604 err = -ENOMEM;
605 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting structure.\n");
606 goto err_out_exit;
609 wc->wait = w;
611 fd = open(file, O_RDONLY | O_LARGEFILE | O_CLOEXEC);
612 if (fd < 0) {
613 err = -errno;
614 dnet_log_err(n, "Failed to open to be written file '%s'", file);
615 goto err_out_put;
618 err = fstat(fd, &stat);
619 if (err) {
620 err = -errno;
621 dnet_log_err(n, "Failed to stat to be written file '%s'", file);
622 goto err_out_close;
625 if (local_offset >= (uint64_t)stat.st_size) {
626 err = 0;
627 goto err_out_close;
630 if (!size || size + local_offset >= (uint64_t)stat.st_size)
631 size = stat.st_size - local_offset;
633 memset(&ctl, 0, sizeof(struct dnet_io_control));
635 atomic_set(&w->refcnt, INT_MAX);
637 ctl.data = NULL;
638 ctl.fd = fd;
639 ctl.local_offset = local_offset;
641 w->status = -ENOENT;
642 ctl.complete = dnet_write_complete;
643 ctl.priv = wc;
645 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
646 ctl.cmd = DNET_CMD_WRITE;
648 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
649 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
651 ctl.io.flags = ioflags;
652 ctl.io.size = size;
653 ctl.io.offset = remote_offset;
654 ctl.io.type = id->type;
656 memcpy(&ctl.id, id, sizeof(struct dnet_id));
658 trans_num = dnet_write_object(n, &ctl);
659 if (trans_num < 0)
660 trans_num = 0;
663 * 1 - the first reference counter we grabbed at allocation time
665 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
667 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
668 if (err || w->status) {
669 if (!err)
670 err = w->status;
673 if (!err && !trans_num)
674 err = -EINVAL;
676 if (err) {
677 dnet_log(n, DNET_LOG_ERROR, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file, trans_num, err);
678 goto err_out_close;
681 dnet_log(n, DNET_LOG_NOTICE, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
682 file, (unsigned long long)size);
684 close(fd);
685 dnet_write_complete_free(wc);
687 return 0;
689 err_out_close:
690 close(fd);
691 err_out_put:
692 dnet_write_complete_free(wc);
693 err_out_exit:
694 return err;
697 int dnet_write_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t local_offset,
698 uint64_t remote_offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
700 int err = dnet_write_file_id_raw(n, file, id, local_offset, remote_offset, size, cflags, ioflags);
701 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
702 err = dnet_create_write_metadata_strings(n, NULL, 0, id, NULL, cflags);
704 return err;
707 int dnet_write_file(struct dnet_node *n, const char *file, const void *remote, int remote_len,
708 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
709 uint64_t cflags, unsigned int ioflags, int type)
711 int err;
712 struct dnet_id id;
714 dnet_transform(n, remote, remote_len, &id);
715 id.type = type;
717 err = dnet_write_file_id_raw(n, file, &id, local_offset, remote_offset, size, cflags, ioflags);
718 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
719 err = dnet_create_write_metadata_strings(n, remote, remote_len, &id, NULL, cflags);
721 return err;
724 static int dnet_read_file_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
726 int fd, err;
727 struct dnet_node *n;
728 struct dnet_io_completion *c = priv;
729 struct dnet_io_attr *io;
730 void *data;
732 if (is_trans_destroyed(st, cmd)) {
733 if (c->wait) {
734 int err = 1;
735 if (cmd && cmd->status)
736 err = cmd->status;
738 dnet_wakeup(c->wait, c->wait->cond = err);
739 dnet_wait_put(c->wait);
742 free(c);
743 return 0;
746 n = st->n;
748 if (cmd->status != 0 || cmd->size == 0) {
749 err = cmd->status;
750 goto err_out_exit_no_log;
753 if (cmd->size <= sizeof(struct dnet_io_attr)) {
754 dnet_log(n, DNET_LOG_ERROR, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
755 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
756 sizeof(struct dnet_io_attr));
757 err = -EINVAL;
758 goto err_out_exit_no_log;
761 io = (struct dnet_io_attr *)(cmd + 1);
762 data = io + 1;
764 dnet_convert_io_attr(io);
766 fd = open(c->file, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
767 if (fd < 0) {
768 err = -errno;
769 dnet_log_err(n, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd->id), c->file);
770 goto err_out_exit;
773 err = pwrite(fd, data, io->size, c->offset);
774 if (err <= 0) {
775 err = -errno;
776 dnet_log_err(n, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd->id), c->file);
777 goto err_out_close;
780 close(fd);
781 dnet_log(n, DNET_LOG_NOTICE, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
782 dnet_dump_id(&cmd->id), c->file, (unsigned long long)c->offset,
783 (unsigned long long)io->size, cmd->status);
785 return cmd->status;
787 err_out_close:
788 close(fd);
789 err_out_exit:
790 dnet_log(n, DNET_LOG_ERROR, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
791 dnet_dump_id(&cmd->id), c->file, (unsigned long long)io->offset,
792 (unsigned long long)io->size, cmd->status, err);
793 err_out_exit_no_log:
794 dnet_wakeup(c->wait, c->wait->cond = err ? err : 1);
795 return err;
/*
 * Submit the read transaction described by @ctl.
 * Returns 0 when the transaction was created and sent, otherwise the
 * error reported by dnet_io_trans_create().
 */
int dnet_read_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int err;
	struct dnet_trans *t = dnet_io_trans_create(n, ctl, &err);

	return t ? 0 : err;
}
808 static int dnet_read_file_raw_exec(struct dnet_node *n, const char *file, unsigned int len,
809 uint64_t write_offset, uint64_t io_offset, uint64_t io_size,
810 struct dnet_id *id, struct dnet_wait *w)
812 struct dnet_io_control ctl;
813 struct dnet_io_completion *c;
814 int err, wait_init = ~0;
816 memset(&ctl, 0, sizeof(struct dnet_io_control));
818 ctl.io.size = io_size;
819 ctl.io.offset = io_offset;
821 ctl.io.type = id->type;
823 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
824 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
826 memcpy(&ctl.id, id, sizeof(struct dnet_id));
828 ctl.fd = -1;
829 ctl.complete = dnet_read_file_complete;
830 ctl.cmd = DNET_CMD_READ;
831 ctl.cflags = DNET_FLAGS_NEED_ACK;
833 c = malloc(sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
834 if (!c) {
835 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate IO completion structure "
836 "for '%s' file reading.\n",
837 dnet_dump_id(&ctl.id), file);
838 err = -ENOMEM;
839 goto err_out_exit;
842 memset(c, 0, sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
844 c->wait = dnet_wait_get(w);
845 c->offset = write_offset;
846 c->file = (char *)(c + 1);
848 sprintf(c->file, "%s", file);
850 ctl.priv = c;
852 w->cond = wait_init;
853 err = dnet_read_object(n, &ctl);
854 if (err)
855 goto err_out_exit;
857 err = dnet_wait_event(w, w->cond != wait_init, &n->wait_ts);
858 if ((err < 0) || (w->cond < 0)) {
859 char id_str[2*DNET_ID_SIZE + 1];
860 if (!err)
861 err = w->cond;
862 dnet_log(n, DNET_LOG_ERROR, "%d:%s '%s' : failed to read data: %d\n",
863 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str),
864 file, err);
865 goto err_out_exit;
868 return 0;
870 err_out_exit:
871 return err;
874 static int dnet_read_file_raw(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
876 int err = -ENOENT, len = strlen(file), i;
877 struct dnet_wait *w;
878 int *g, num;
880 w = dnet_wait_alloc(~0);
881 if (!w) {
882 err = -ENOMEM;
883 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting.\n");
884 goto err_out_exit;
887 if (!size)
888 size = ~0ULL;
890 num = dnet_mix_states(n, id, &g);
891 if (num < 0) {
892 err = num;
893 goto err_out_exit;
896 for (i=0; i<num; ++i) {
897 id->group_id = g[i];
899 err = dnet_read_file_raw_exec(n, file, len, 0, offset, size, id, w);
900 if (err)
901 continue;
903 break;
906 dnet_wait_put(w);
907 free(g);
909 err_out_exit:
910 return err;
/*
 * Read object @id into local file @file; public wrapper around
 * dnet_read_file_raw() with the same return value.
 */
int dnet_read_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(n, file, id, offset, size);

	return err;
}
918 int dnet_read_file(struct dnet_node *n, const char *file, const void *remote, int remote_size,
919 uint64_t offset, uint64_t size, int type)
921 struct dnet_id id;
923 dnet_transform(n, remote, remote_size, &id);
924 id.type = type;
926 return dnet_read_file_raw(n, file, &id, offset, size);
929 struct dnet_wait *dnet_wait_alloc(int cond)
931 int err;
932 struct dnet_wait *w;
934 w = malloc(sizeof(struct dnet_wait));
935 if (!w) {
936 err = -ENOMEM;
937 goto err_out_exit;
940 memset(w, 0, sizeof(struct dnet_wait));
942 err = pthread_cond_init(&w->wait, NULL);
943 if (err)
944 goto err_out_exit;
946 err = pthread_mutex_init(&w->wait_lock, NULL);
947 if (err)
948 goto err_out_destroy;
950 w->cond = cond;
951 atomic_init(&w->refcnt, 1);
953 return w;
955 err_out_destroy:
956 pthread_mutex_destroy(&w->wait_lock);
957 err_out_exit:
958 return NULL;
961 void dnet_wait_destroy(struct dnet_wait *w)
963 pthread_mutex_destroy(&w->wait_lock);
964 pthread_cond_destroy(&w->wait);
965 free(w->ret);
966 free(w);
969 static int dnet_send_cmd_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
971 struct dnet_wait *w = priv;
973 if (is_trans_destroyed(st, cmd)) {
974 dnet_wakeup(w, w->cond++);
975 dnet_wait_put(w);
976 return 0;
979 w->status = cmd->status;
981 if (cmd->size) {
982 void *old = w->ret;
983 void *data = cmd + 1;
985 w->ret = realloc(w->ret, w->size + cmd->size);
986 if (!w->ret) {
987 w->ret = old;
988 w->status = -ENOMEM;
989 } else {
990 memcpy(w->ret + w->size, data, cmd->size);
991 w->size += cmd->size;
995 return w->status;
998 static int dnet_send_cmd_single(struct dnet_net_state *st, struct dnet_wait *w, struct sph *e, uint64_t cflags)
1000 struct dnet_trans_control ctl;
1002 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1004 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1005 ctl.size = sizeof(struct sph) + e->event_size + e->data_size + e->binary_size;
1006 ctl.cmd = DNET_CMD_EXEC;
1007 ctl.complete = dnet_send_cmd_complete;
1008 ctl.priv = w;
1009 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1011 dnet_convert_sph(e);
1013 ctl.data = e;
1015 return dnet_trans_alloc_send_state(st, &ctl);
1018 static int dnet_send_cmd_raw(struct dnet_node *n, struct dnet_id *id,
1019 struct sph *e, void **ret, uint64_t cflags)
1021 struct dnet_net_state *st;
1022 int err = -ENOENT, num = 0;
1023 struct dnet_wait *w;
1024 struct dnet_group *g;
1026 w = dnet_wait_alloc(0);
1027 if (!w) {
1028 err = -ENOMEM;
1029 goto err_out_exit;
1032 if (id && id->group_id != 0) {
1033 dnet_wait_get(w);
1034 st = dnet_state_get_first(n, id);
1035 if (!st)
1036 goto err_out_put;
1037 err = dnet_send_cmd_single(st, w, e, cflags);
1038 dnet_state_put(st);
1039 num = 1;
1040 } else if (id && id->group_id == 0) {
1041 pthread_mutex_lock(&n->state_lock);
1042 list_for_each_entry(g, &n->group_list, group_entry) {
1043 dnet_wait_get(w);
1045 id->group_id = g->group_id;
1047 st = dnet_state_search_nolock(n, id);
1048 if (st) {
1049 if (st != n->st) {
1050 err = dnet_send_cmd_single(st, w, e, cflags);
1051 num++;
1053 dnet_state_put(st);
1056 pthread_mutex_unlock(&n->state_lock);
1057 } else {
1058 pthread_mutex_lock(&n->state_lock);
1059 list_for_each_entry(g, &n->group_list, group_entry) {
1060 list_for_each_entry(st, &g->state_list, state_entry) {
1061 if (st == n->st)
1062 continue;
1064 dnet_wait_get(w);
1066 err = dnet_send_cmd_single(st, w, e, cflags);
1067 num++;
1070 pthread_mutex_unlock(&n->state_lock);
1073 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1074 if (err)
1075 goto err_out_put;
1077 dnet_log(n, DNET_LOG_INFO, "%s: return data: %p, size: %d\n", dnet_dump_id_str(e->src.id), w->ret, w->size);
1078 if (w->ret) {
1079 *ret = w->ret;
1080 w->ret = NULL;
1082 err = w->size;
1085 dnet_wait_put(w);
1087 return err;
1089 err_out_put:
1090 dnet_wait_put(w);
1091 err_out_exit:
1092 return err;
/*
 * Send exec request @e with no extra command flags; see
 * dnet_send_cmd_raw() for target selection and the return value.
 */
int dnet_send_cmd(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t cflags = 0;

	return dnet_send_cmd_raw(n, id, e, ret, cflags);
}
1100 int dnet_send_cmd_nolock(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
1102 return dnet_send_cmd_raw(n, id, e, ret, DNET_FLAGS_NOLOCK);
1105 int dnet_try_reconnect(struct dnet_node *n)
1107 struct dnet_addr_storage *ast, *tmp;
1108 struct dnet_net_state *st;
1109 LIST_HEAD(list);
1110 int s, err, join;
1112 if (list_empty(&n->reconnect_list))
1113 return 0;
1115 pthread_mutex_lock(&n->reconnect_lock);
1116 list_for_each_entry_safe(ast, tmp, &n->reconnect_list, reconnect_entry) {
1117 list_move(&ast->reconnect_entry, &list);
1119 pthread_mutex_unlock(&n->reconnect_lock);
1121 list_for_each_entry_safe(ast, tmp, &list, reconnect_entry) {
1122 s = dnet_socket_create_addr(n, n->sock_type, n->proto, n->family,
1123 (struct sockaddr *)ast->addr.addr, ast->addr.addr_len, 0);
1124 if (s < 0)
1125 goto out_add;
1127 join = DNET_WANT_RECONNECT;
1128 if (ast->__join_state == DNET_JOIN)
1129 join = DNET_JOIN;
1131 st = dnet_add_state_socket(n, &ast->addr, s, &err, join);
1132 if (st)
1133 goto out_remove;
1135 dnet_sock_close(s);
1137 if (err == -EEXIST || err == -EINVAL)
1138 goto out_remove;
1140 out_add:
1141 dnet_add_reconnect_state(n, &ast->addr, ast->__join_state);
1142 out_remove:
1143 list_del(&ast->reconnect_entry);
1144 free(ast);
1147 return 0;
1150 int dnet_lookup_object(struct dnet_node *n, struct dnet_id *id, uint64_t cflags,
1151 int (* complete)(struct dnet_net_state *, struct dnet_cmd *, void *),
1152 void *priv)
1154 struct dnet_io_req req;
1155 struct dnet_trans *t;
1156 struct dnet_cmd *cmd;
1157 int err;
1159 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
1160 if (!t) {
1161 err = -ENOMEM;
1162 goto err_out_complete;
1164 t->complete = complete;
1165 t->priv = priv;
1167 cmd = (struct dnet_cmd *)(t + 1);
1169 memcpy(&cmd->id, id, sizeof(struct dnet_id));
1171 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
1173 cmd->cmd = t->command = DNET_CMD_LOOKUP;
1174 cmd->flags = cflags | DNET_FLAGS_NEED_ACK;
1176 t->st = dnet_state_get_first(n, &cmd->id);
1177 if (!t->st) {
1178 err = -ENOENT;
1179 goto err_out_destroy;
1182 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
1183 dnet_convert_cmd(cmd);
1185 dnet_log(n, DNET_LOG_NOTICE, "%s: lookup to %s.\n", dnet_dump_id(id), dnet_server_convert_dnet_addr(&t->st->addr));
1187 memset(&req, 0, sizeof(req));
1188 req.st = t->st;
1189 req.header = cmd;
1190 req.hsize = sizeof(struct dnet_cmd);
1192 err = dnet_trans_send(t, &req);
1193 if (err)
1194 goto err_out_destroy;
1196 return 0;
1198 err_out_complete:
1199 if (complete)
1200 complete(NULL, NULL, priv);
1201 return err;
1203 err_out_destroy:
1204 dnet_trans_put(t);
1205 return err;
1208 int dnet_lookup_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1210 struct dnet_wait *w = priv;
1211 struct dnet_node *n = NULL;
1212 struct dnet_addr_attr *a;
1213 struct dnet_net_state *other;
1214 char addr_str[128] = "no-address";
1215 int err;
1217 if (is_trans_destroyed(st, cmd)) {
1218 dnet_wakeup(w, w->cond++);
1219 dnet_wait_put(w);
1220 return 0;
1222 n = st->n;
1224 err = cmd->status;
1225 if (err || !cmd->size)
1226 goto err_out_exit;
1228 if (cmd->size < sizeof(struct dnet_addr_attr)) {
1229 dnet_log(st->n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1230 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size, sizeof(struct dnet_addr_attr));
1231 err = -EINVAL;
1232 goto err_out_exit;
1235 a = (struct dnet_addr_attr *)(cmd + 1);
1237 dnet_convert_addr_attr(a);
1238 dnet_server_convert_dnet_addr_raw(&a->addr, addr_str, sizeof(addr_str));
1240 if (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
1241 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
1243 dnet_convert_file_info(info);
1245 dnet_log_raw(n, DNET_LOG_NOTICE, "%s: lookup object: %s: "
1246 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1247 dnet_dump_id(&cmd->id), addr_str,
1248 (unsigned long long)info->offset, (unsigned long long)info->size,
1249 (unsigned long long)info->mode, (char *)(info + 1));
1250 } else {
1251 dnet_log_raw(n, DNET_LOG_INFO, "%s: lookup object: %s\n",
1252 dnet_dump_id(&cmd->id), addr_str);
1256 other = dnet_state_search_by_addr(n, &a->addr);
1257 if (other) {
1258 dnet_state_put(other);
1259 } else {
1260 dnet_recv_route_list(st);
1263 return 0;
1265 err_out_exit:
1266 if (n)
1267 dnet_log(n, DNET_LOG_ERROR, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd->id), cmd->status, err);
1269 return err;
1272 int dnet_lookup(struct dnet_node *n, const char *file)
1274 int err, error = 0, i;
1275 struct dnet_wait *w;
1276 struct dnet_id raw;
1278 w = dnet_wait_alloc(0);
1279 if (!w) {
1280 err = -ENOMEM;
1281 goto err_out_exit;
1284 dnet_transform(n, file, strlen(file), &raw);
1286 pthread_mutex_lock(&n->group_lock);
1287 for (i=0; i<n->group_num; ++i) {
1288 raw.group_id = n->groups[i];
1290 err = dnet_lookup_object(n, &raw, 0, dnet_lookup_complete, dnet_wait_get(w));
1291 if (err) {
1292 error = err;
1293 continue;
1296 err = dnet_wait_event(w, w->cond == 1, &n->wait_ts);
1297 if (err || w->status) {
1298 if (!err)
1299 err = w->status;
1300 error = err;
1301 continue;
1304 error = 0;
1305 break;
1307 pthread_mutex_unlock(&n->group_lock);
1309 dnet_wait_put(w);
1310 return error;
1312 err_out_exit:
1313 return err;
1316 struct dnet_addr *dnet_state_addr(struct dnet_net_state *st)
1318 return &st->addr;
1321 static int dnet_stat_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1323 struct dnet_wait *w = priv;
1324 float la[3];
1325 struct dnet_stat *st;
1326 int err = -EINVAL;
1328 if (is_trans_destroyed(state, cmd)) {
1329 dnet_wakeup(w, w->cond++);
1330 dnet_wait_put(w);
1331 return 0;
1334 if (cmd->cmd == DNET_CMD_STAT && cmd->size == sizeof(struct dnet_stat)) {
1335 st = (struct dnet_stat *)(cmd + 1);
1337 dnet_convert_stat(st);
1339 la[0] = (float)st->la[0] / 100.0;
1340 la[1] = (float)st->la[1] / 100.0;
1341 la[2] = (float)st->la[2] / 100.0;
1343 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: la: %.2f %.2f %.2f.\n",
1344 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1345 la[0], la[1], la[2]);
1346 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: mem: "
1347 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1348 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1349 (unsigned long long)st->vm_total,
1350 (unsigned long long)st->vm_free,
1351 (unsigned long long)st->vm_cached);
1352 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: fs: "
1353 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1354 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1355 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
1356 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
1357 (unsigned long long)st->files, (unsigned long long)st->fsid);
1358 err = 0;
1359 } else if (cmd->size >= sizeof(struct dnet_addr_stat) && cmd->cmd == DNET_CMD_STAT_COUNT) {
1360 struct dnet_addr_stat *as = (struct dnet_addr_stat *)(cmd + 1);
1361 int i;
1363 dnet_convert_addr_stat(as, 0);
1365 for (i=0; i<as->num; ++i) {
1366 if (as->num > as->cmd_num) {
1367 if (i == 0)
1368 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Storage commands\n",
1369 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1370 if (i == as->cmd_num)
1371 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Proxy commands\n",
1372 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1373 if (i == as->cmd_num * 2)
1374 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Counters\n",
1375 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1377 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1378 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1379 dnet_counter_string(i, as->cmd_num),
1380 (unsigned long long)as->count[i].count, (unsigned long long)as->count[i].err);
1384 return err;
/*
 * Sends the transaction described by @ctl: through @st when a state is given,
 * otherwise through dnet_trans_alloc_send() on node @n.
 */
static int dnet_request_cmd_single(struct dnet_node *n, struct dnet_net_state *st, struct dnet_trans_control *ctl)
{
	if (!st)
		return dnet_trans_alloc_send(n, ctl);

	return dnet_trans_alloc_send_state(st, ctl);
}
1395 int dnet_request_stat(struct dnet_node *n, struct dnet_id *id,
1396 unsigned int cmd, uint64_t cflags,
1397 int (* complete)(struct dnet_net_state *state,
1398 struct dnet_cmd *cmd,
1399 void *priv),
1400 void *priv)
1402 struct dnet_trans_control ctl;
1403 struct dnet_wait *w = NULL;
1404 int err, num = 0;
1405 struct timeval start, end;
1406 long diff;
1408 gettimeofday(&start, NULL);
1410 if (!complete) {
1411 w = dnet_wait_alloc(0);
1412 if (!w) {
1413 err = -ENOMEM;
1414 goto err_out_exit;
1417 complete = dnet_stat_complete;
1418 priv = w;
1421 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1423 ctl.cmd = cmd;
1424 ctl.complete = complete;
1425 ctl.priv = priv;
1426 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | cflags;
1428 if (id) {
1429 if (w)
1430 dnet_wait_get(w);
1432 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1434 err = dnet_request_cmd_single(n, NULL, &ctl);
1435 num = 1;
1436 } else {
1437 struct dnet_net_state *st;
1438 struct dnet_group *g;
1441 pthread_mutex_lock(&n->state_lock);
1442 list_for_each_entry(g, &n->group_list, group_entry) {
1443 list_for_each_entry(st, &g->state_list, state_entry) {
1444 if (st == n->st)
1445 continue;
1447 if (w)
1448 dnet_wait_get(w);
1450 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1451 dnet_request_cmd_single(n, st, &ctl);
1452 num++;
1455 pthread_mutex_unlock(&n->state_lock);
1458 if (!w) {
1459 gettimeofday(&end, NULL);
1460 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1461 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd), diff, num);
1463 return num;
1466 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1468 gettimeofday(&end, NULL);
1469 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1470 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd), diff, err, num);
1472 if (err)
1473 goto err_out_put;
1475 dnet_wait_put(w);
1477 return num;
1479 err_out_put:
1480 dnet_wait_put(w);
1481 err_out_exit:
1482 return err;
/*
 * Private data of dnet_request_cmd(): wraps the caller's completion callback
 * so that the requester can additionally be woken up through @w.
 */
struct dnet_request_cmd_priv {
	/* wait structure the requester sleeps on */
	struct dnet_wait *w;

	/* caller's original completion callback and its private data */
	int (* complete)(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv);
	void *priv;
};
1492 static int dnet_request_cmd_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1494 struct dnet_request_cmd_priv *p = priv;
1495 int err = p->complete(state, cmd, p->priv);
1497 if (is_trans_destroyed(state, cmd)) {
1498 struct dnet_wait *w = p->w;
1500 dnet_wakeup(w, w->cond++);
1501 if (atomic_read(&w->refcnt) == 1)
1502 free(p);
1503 dnet_wait_put(w);
1506 return err;
1509 int dnet_request_cmd(struct dnet_node *n, struct dnet_trans_control *ctl)
1511 int err, num = 0;
1512 struct dnet_request_cmd_priv *p;
1513 struct dnet_wait *w;
1514 struct dnet_net_state *st;
1515 struct dnet_group *g;
1516 struct timeval start, end;
1517 long diff;
1519 gettimeofday(&start, NULL);
1521 p = malloc(sizeof(*p));
1522 if (!p) {
1523 err = -ENOMEM;
1524 goto err_out_exit;
1527 w = dnet_wait_alloc(0);
1528 if (!w) {
1529 err = -ENOMEM;
1530 goto err_out_free;
1533 p->w = w;
1534 p->complete = ctl->complete;
1535 p->priv = ctl->priv;
1537 ctl->complete = dnet_request_cmd_complete;
1538 ctl->priv = p;
1540 pthread_mutex_lock(&n->state_lock);
1541 list_for_each_entry(g, &n->group_list, group_entry) {
1542 list_for_each_entry(st, &g->state_list, state_entry) {
1543 if (st == n->st)
1544 continue;
1546 dnet_wait_get(w);
1548 ctl->id.group_id = g->group_id;
1550 if (!(ctl->cflags & DNET_FLAGS_DIRECT))
1551 dnet_setup_id(&ctl->id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1552 dnet_request_cmd_single(n, st, ctl);
1553 num++;
1556 pthread_mutex_unlock(&n->state_lock);
1558 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1560 gettimeofday(&end, NULL);
1561 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1562 dnet_log(n, DNET_LOG_NOTICE, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl->cmd), diff, err, num);
1564 if (!err)
1565 err = num;
1567 if (atomic_read(&w->refcnt) == 1)
1568 free(p);
1569 dnet_wait_put(w);
1571 return err;
1573 err_out_free:
1574 free(p);
1575 err_out_exit:
1576 return err;
1579 struct dnet_update_status_priv {
1580 struct dnet_wait *w;
1581 struct dnet_node_status status;
1582 atomic_t refcnt;
1585 static int dnet_update_status_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1587 struct dnet_update_status_priv *p = priv;
1589 if (is_trans_destroyed(state, cmd)) {
1590 dnet_wakeup(p->w, p->w->cond++);
1591 dnet_wait_put(p->w);
1592 if (atomic_dec_and_test(&p->refcnt))
1593 free(p);
1596 if (cmd->size == sizeof(struct dnet_node_status)) {
1597 memcpy(&p->status, cmd + 1, sizeof(struct dnet_node_status));
1598 return 0;
1601 return -ENOENT;
1604 int dnet_update_status(struct dnet_node *n, struct dnet_addr *addr, struct dnet_id *id, struct dnet_node_status *status)
1606 int err;
1607 struct dnet_update_status_priv *priv;
1608 struct dnet_trans_control ctl;
1610 if (!id && !addr) {
1611 err = -EINVAL;
1612 goto err_out_exit;
1615 memset(&ctl, 0, sizeof(ctl));
1617 if (id) {
1618 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1619 } else {
1620 struct dnet_net_state *st;
1622 st = dnet_state_search_by_addr(n, addr);
1623 if (!st) {
1624 err = -ENOENT;
1625 goto err_out_exit;
1628 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1629 dnet_state_put(st);
1632 priv = malloc(sizeof(struct dnet_update_status_priv));
1633 if (!priv) {
1634 err = -ENOMEM;
1635 goto err_out_exit;
1638 priv->w = dnet_wait_alloc(0);
1639 if (!priv->w) {
1640 err = -ENOMEM;
1641 goto err_out_exit;
1644 ctl.complete = dnet_update_status_complete;
1645 ctl.priv = priv;
1646 ctl.cmd = DNET_CMD_STATUS;
1647 ctl.cflags = DNET_FLAGS_NEED_ACK;
1648 ctl.size = sizeof(struct dnet_node_status);
1649 ctl.data = status;
1651 dnet_wait_get(priv->w);
1652 dnet_request_cmd_single(n, NULL, &ctl);
1654 err = dnet_wait_event(priv->w, priv->w->cond == 1, &n->wait_ts);
1655 dnet_wait_put(priv->w);
1656 if (!err && priv) {
1657 memcpy(status, &priv->status, sizeof(struct dnet_node_status));
1659 if (atomic_dec_and_test(&priv->refcnt))
1660 free(priv);
1662 err_out_exit:
1663 return err;
1666 static int dnet_remove_object_raw(struct dnet_node *n, struct dnet_id *id,
1667 int (* complete)(struct dnet_net_state *state,
1668 struct dnet_cmd *cmd,
1669 void *priv),
1670 void *priv, uint64_t cflags, uint64_t ioflags)
1672 struct dnet_io_control ctl;
1674 memset(&ctl, 0, sizeof(struct dnet_io_control));
1676 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1678 memcpy(&ctl.io.id, id->id, DNET_ID_SIZE);
1679 memcpy(&ctl.io.parent, id->id, DNET_ID_SIZE);
1680 ctl.io.flags = ioflags;
1682 ctl.fd = -1;
1684 ctl.cmd = DNET_CMD_DEL;
1685 ctl.complete = complete;
1686 ctl.priv = priv;
1687 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1689 return dnet_trans_create_send_all(n, &ctl);
1692 static int dnet_remove_complete(struct dnet_net_state *state,
1693 struct dnet_cmd *cmd,
1694 void *priv)
1696 struct dnet_wait *w = priv;
1698 if (is_trans_destroyed(state, cmd)) {
1699 dnet_wakeup(w, w->cond++);
1700 dnet_wait_put(w);
1701 return 0;
1704 if (cmd->status)
1705 w->status = cmd->status;
1706 return cmd->status;
1709 int dnet_remove_object(struct dnet_node *n, struct dnet_id *id,
1710 int (* complete)(struct dnet_net_state *state,
1711 struct dnet_cmd *cmd,
1712 void *priv),
1713 void *priv,
1714 uint64_t cflags, uint64_t ioflags)
1716 struct dnet_wait *w = NULL;
1717 int err;
1719 if (!complete) {
1720 w = dnet_wait_alloc(0);
1721 if (!w) {
1722 err = -ENOMEM;
1723 goto err_out_exit;
1726 complete = dnet_remove_complete;
1727 priv = w;
1728 dnet_wait_get(w);
1731 err = dnet_remove_object_raw(n, id, complete, priv, cflags, ioflags);
1732 if (err < 0)
1733 goto err_out_put;
1735 if (w) {
1736 err = dnet_wait_event(w, w->cond != err, &n->wait_ts);
1737 if (err)
1738 goto err_out_put;
1740 dnet_wait_put(w);
1742 return 0;
1744 err_out_put:
1745 if (w)
1746 dnet_wait_put(w);
1747 err_out_exit:
1748 return err;
1751 static int dnet_remove_file_raw(struct dnet_node *n, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1753 struct dnet_wait *w;
1754 int err, num;
1756 w = dnet_wait_alloc(0);
1757 if (!w) {
1758 err = -ENOMEM;
1759 goto err_out_exit;
1762 atomic_add(&w->refcnt, 1024);
1763 err = dnet_remove_object_raw(n, id, dnet_remove_complete, w, cflags, ioflags);
1764 if (err < 0) {
1765 atomic_sub(&w->refcnt, 1024);
1766 goto err_out_put;
1769 num = err;
1770 atomic_sub(&w->refcnt, 1024 - num);
1772 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1773 if (err)
1774 goto err_out_put;
1776 dnet_wait_put(w);
1778 return 0;
1780 err_out_put:
1781 dnet_wait_put(w);
1782 err_out_exit:
1783 return err;
1786 int dnet_remove_object_now(struct dnet_node *n, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1788 return dnet_remove_file_raw(n, id, cflags | DNET_FLAGS_NEED_ACK | DNET_ATTR_DELETE_HISTORY, ioflags);
1791 int dnet_remove_file(struct dnet_node *n, char *remote, int remote_len, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1793 struct dnet_id raw;
1795 if (!id) {
1796 dnet_transform(n, remote, remote_len, &raw);
1797 raw.group_id = 0;
1798 id = &raw;
1801 return dnet_remove_file_raw(n, id, cflags, ioflags);
1804 int dnet_request_ids(struct dnet_node *n, struct dnet_id *id, uint64_t cflags,
1805 int (* complete)(struct dnet_net_state *state,
1806 struct dnet_cmd *cmd,
1807 void *priv),
1808 void *priv)
1810 struct dnet_trans_control ctl;
1812 dnet_log_raw(n, DNET_LOG_ERROR, "Temporarily unsupported operation.\n");
1813 exit(-1);
1815 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1817 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1818 ctl.cmd = DNET_CMD_LIST;
1819 ctl.complete = complete;
1820 ctl.priv = priv;
1821 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1823 return dnet_trans_alloc_send(n, &ctl);
1826 struct dnet_node *dnet_get_node_from_state(void *state)
1828 struct dnet_net_state *st = state;
1830 if (!st)
1831 return NULL;
1832 return st->n;
1835 struct dnet_read_data_completion {
1836 struct dnet_wait *w;
1837 void *data;
1838 uint64_t size;
1839 atomic_t refcnt;
1842 static int dnet_read_data_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1844 struct dnet_read_data_completion *c = priv;
1845 struct dnet_wait *w = c->w;
1846 int err = -EINVAL;
1848 if (is_trans_destroyed(st, cmd)) {
1849 dnet_wakeup(w, w->cond++);
1850 dnet_wait_put(w);
1851 if (atomic_dec_and_test(&c->refcnt))
1852 free(c);
1853 return err;
1856 err = cmd->status;
1857 if (err)
1858 w->status = err;
1860 if (cmd->size >= sizeof(struct dnet_io_attr)) {
1861 struct dnet_io_attr *io = (struct dnet_io_attr *)(cmd + 1);
1862 uint64_t sz = c->size;
1864 dnet_convert_io_attr(io);
1866 sz += io->size + sizeof(struct dnet_io_attr);
1867 c->data = realloc(c->data, sz);
1868 if (!c->data) {
1869 err = -ENOMEM;
1870 goto err_out_exit;
1873 memcpy(c->data + c->size, io, sizeof(struct dnet_io_attr) + io->size);
1874 c->size = sz;
1877 err_out_exit:
1878 dnet_log(st->n, DNET_LOG_NOTICE, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1879 dnet_dump_id(&cmd->id), (unsigned long long)(cmd->trans & ~DNET_TRANS_REPLY),
1880 cmd->status, err);
1882 return err;
1885 void *dnet_read_data_wait_raw(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io,
1886 int cmd, uint64_t cflags, int *errp)
1888 struct dnet_io_control ctl;
1889 struct dnet_wait *w;
1890 struct dnet_read_data_completion *c;
1891 void *data = NULL;
1892 int err;
1894 w = dnet_wait_alloc(0);
1895 if (!w) {
1896 err = -ENOMEM;
1897 goto err_out_exit;
1900 c = malloc(sizeof(*c));
1901 if (!c) {
1902 err = -ENOMEM;
1903 goto err_out_put;
1906 c->w = w;
1907 c->size = 0;
1908 c->data = NULL;
1909 /* one for completion callback, another for this function */
1910 atomic_init(&c->refcnt, 2);
1912 memset(&ctl, 0, sizeof(struct dnet_io_control));
1914 ctl.fd = -1;
1916 ctl.priv = c;
1917 ctl.complete = dnet_read_data_complete;
1919 ctl.cmd = cmd;
1920 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1922 memcpy(&ctl.io, io, sizeof(struct dnet_io_attr));
1923 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1925 ctl.id.type = io->type;
1927 dnet_wait_get(w);
1928 err = dnet_read_object(n, &ctl);
1929 if (err)
1930 goto err_out_put_complete;
1932 err = dnet_wait_event(w, w->cond, &n->wait_ts);
1933 if (err || w->status) {
1934 char id_str[2*DNET_ID_SIZE + 1];
1935 if (!err)
1936 err = w->status;
1937 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
1938 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
1939 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
1940 goto err_out_put_complete;
1942 io->size = c->size;
1943 data = c->data;
1944 err = 0;
1946 err_out_put_complete:
1947 if (atomic_dec_and_test(&c->refcnt))
1948 free(c);
1949 err_out_put:
1950 dnet_wait_put(w);
1951 err_out_exit:
1952 *errp = err;
1953 return data;
1956 void *dnet_read_data_wait_groups(struct dnet_node *n, struct dnet_id *id, int *groups, int num,
1957 struct dnet_io_attr *io, uint64_t cflags, int *errp)
1959 int i;
1960 void *data;
1962 for (i = 0; i < num; ++i) {
1963 id->group_id = groups[i];
1965 data = dnet_read_data_wait_raw(n, id, io, DNET_CMD_READ, cflags, errp);
1966 if (data) {
1967 *errp = 0;
1968 return data;
1972 return NULL;
/*
 * Reads object @id from the groups returned by dnet_mix_states(), trying them
 * in the returned order until one of them delivers data.
 *
 * Returns the data buffer or NULL; *@errp receives 0 on success, otherwise
 * the dnet_mix_states() error, the error of the last attempted group, or
 * -ENOENT when dnet_mix_states() produced no group to try.
 */
void *dnet_read_data_wait(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io,
		uint64_t cflags, int *errp)
{
	/* Pre-set so that an empty group list cannot leave the error uninitialized. */
	int num, *g, err = -ENOENT;
	void *data = NULL;

	num = dnet_mix_states(n, id, &g);
	if (num < 0) {
		err = num;
		goto err_out_exit;
	}

	data = dnet_read_data_wait_groups(n, id, g, num, io, cflags, &err);

	free(g);
err_out_exit:
	*errp = err;
	return data;
}
1998 int dnet_write_data_wait(struct dnet_node *n, struct dnet_io_control *ctl, void **result)
2000 int err, trans_num = 0;
2001 struct dnet_wait *w;
2002 struct dnet_write_completion *wc;
2004 wc = malloc(sizeof(struct dnet_write_completion));
2005 if (!wc) {
2006 err = -ENOMEM;
2007 goto err_out_exit;
2009 memset(wc, 0, sizeof(struct dnet_write_completion));
2011 w = dnet_wait_alloc(0);
2012 if (!w) {
2013 err = -ENOMEM;
2014 free(wc);
2015 goto err_out_exit;
2017 wc->wait = w;
2019 w->status = -ENOENT;
2020 ctl->priv = wc;
2021 ctl->complete = dnet_write_complete;
2023 ctl->cmd = DNET_CMD_WRITE;
2024 ctl->cflags |= DNET_FLAGS_NEED_ACK;
2026 memcpy(ctl->io.id, ctl->id.id, DNET_ID_SIZE);
2027 memcpy(ctl->io.parent, ctl->id.id, DNET_ID_SIZE);
2029 atomic_set(&w->refcnt, INT_MAX);
2030 trans_num = dnet_write_object(n, ctl);
2031 if (trans_num < 0)
2032 trans_num = 0;
2035 * 1 - the first reference counter we grabbed at allocation time
2037 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
2039 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
2040 if (err || w->status) {
2041 if (!err)
2042 err = w->status;
2043 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2044 dnet_dump_id(&ctl->id), err, w->status);
2047 if (err || !trans_num) {
2048 if (!err)
2049 err = -EINVAL;
2050 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
2051 goto err_out_put;
2054 if (trans_num)
2055 dnet_log(n, DNET_LOG_NOTICE, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2056 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, ctl->io.type, wc->size);
2057 err = trans_num;
2059 *result = wc->reply;
2060 err = wc->size;
2062 wc->reply = NULL;
2064 err_out_put:
2065 dnet_write_complete_free(wc);
2066 err_out_exit:
2067 return err;
2070 int dnet_lookup_addr(struct dnet_node *n, const void *remote, int len, struct dnet_id *id, int group_id, char *dst, int dlen)
2072 struct dnet_id raw;
2073 struct dnet_net_state *st;
2074 int err = -ENOENT;
2076 if (!id) {
2077 dnet_transform(n, remote, len, &raw);
2078 id = &raw;
2080 id->group_id = group_id;
2082 st = dnet_state_get_first(n, id);
2083 if (!st)
2084 goto err_out_exit;
2086 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st), dst, dlen);
2087 dnet_state_put(st);
2088 err = 0;
2090 err_out_exit:
2091 return err;
/* A group together with the weight used to order groups in dnet_mix_states(). */
struct dnet_weight {
	int weight;
	int group_id;
};

/*
 * qsort() comparator ordering struct dnet_weight entries by descending
 * weight: negative if @v1 is heavier than @v2, positive if it is lighter,
 * zero if the weights are equal.
 *
 * The weights are compared instead of subtracted, because a subtraction of
 * two ints may overflow, which is undefined behaviour.
 */
static int dnet_weight_compare(const void *v1, const void *v2)
{
	const struct dnet_weight *w1 = v1;
	const struct dnet_weight *w2 = v2;

	return (w2->weight > w1->weight) - (w2->weight < w1->weight);
}
2107 static int dnet_weight_get_winner(struct dnet_weight *w, int num)
2109 long sum = 0, pos;
2110 float r;
2111 int i;
2113 for (i = 0; i < num; ++i)
2114 sum += w[i].weight;
2116 r = (float)rand() / (float)RAND_MAX;
2117 pos = r * sum;
2119 for (i = 0; i < num; ++i) {
2120 pos -= w[i].weight;
2121 if (pos <= 0)
2122 return i;
2125 return num - 1;
2128 int dnet_mix_states(struct dnet_node *n, struct dnet_id *id, int **groupsp)
2130 struct dnet_weight *weights;
2131 int *groups;
2132 int group_num, i, num;
2133 struct dnet_net_state *st;
2135 if (!n->group_num)
2136 return -ENOENT;
2138 pthread_mutex_lock(&n->group_lock);
2139 group_num = n->group_num;
2141 weights = alloca(n->group_num * sizeof(*weights));
2142 groups = malloc(n->group_num * sizeof(*groups));
2143 if (groups)
2144 memcpy(groups, n->groups, n->group_num * sizeof(*groups));
2145 pthread_mutex_unlock(&n->group_lock);
2147 if (!groups) {
2148 *groupsp = NULL;
2149 return -ENOMEM;
2152 if (n->flags & DNET_CFG_RANDOMIZE_STATES) {
2153 for (i = 0; i < group_num; ++i) {
2154 weights[i].weight = rand();
2155 weights[i].group_id = groups[i];
2157 num = group_num;
2158 } else {
2159 if (!(n->flags & DNET_CFG_MIX_STATES)) {
2160 *groupsp = groups;
2161 return group_num;
2164 memset(weights, 0, group_num * sizeof(*weights));
2166 for (i = 0, num = 0; i < group_num; ++i) {
2167 id->group_id = groups[i];
2169 st = dnet_state_get_first(n, id);
2170 if (st) {
2171 weights[num].weight = (int)st->weight;
2172 weights[num].group_id = id->group_id;
2174 dnet_state_put(st);
2176 num++;
2181 group_num = num;
2182 if (group_num) {
2183 qsort(weights, group_num, sizeof(struct dnet_weight), dnet_weight_compare);
2185 for (i = 0; i < group_num; ++i) {
2186 int pos = dnet_weight_get_winner(weights, group_num - i);
2187 groups[i] = weights[pos].group_id;
2189 if (pos < group_num - 1)
2190 memmove(&weights[pos], &weights[pos + 1], (group_num - 1 - pos) * sizeof(struct dnet_weight));
2194 dnet_node_set_groups(n, groups, group_num);
2196 *groupsp = groups;
2197 return group_num;
2200 int dnet_data_map(struct dnet_map_fd *map)
2202 uint64_t off;
2203 long page_size = sysconf(_SC_PAGE_SIZE);
2204 int err = 0;
2206 off = map->offset & ~(page_size - 1);
2207 map->mapped_size = ALIGN(map->size + map->offset - off, page_size);
2209 map->mapped_data = mmap(NULL, map->mapped_size, PROT_READ, MAP_SHARED, map->fd, off);
2210 if (map->mapped_data == MAP_FAILED) {
2211 err = -errno;
2212 goto err_out_exit;
2215 map->data = map->mapped_data + map->offset - off;
2217 err_out_exit:
2218 return err;
2221 void dnet_data_unmap(struct dnet_map_fd *map)
2223 munmap(map->mapped_data, map->mapped_size);
2226 struct dnet_io_attr *dnet_remove_range(struct dnet_node *n, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *ret_num, int *errp)
2228 struct dnet_id id;
2229 struct dnet_io_attr *ret, *new_ret;
2230 struct dnet_raw_id start, next;
2231 struct dnet_raw_id end;
2232 uint64_t size = io->size;
2233 void *data;
2234 int err, need_exit = 0;
2236 memcpy(end.id, io->parent, DNET_ID_SIZE);
2238 dnet_setup_id(&id, group_id, io->id);
2239 id.type = io->type;
2241 ret = NULL;
2242 *ret_num = 0;
2243 while (!need_exit) {
2244 err = dnet_search_range(n, &id, &start, &next);
2245 if (err)
2246 goto err_out_exit;
2248 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2249 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2250 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2251 memcpy(next.id, end.id, DNET_ID_SIZE);
2252 need_exit = 1;
2255 if (n->log->log_level > DNET_LOG_NOTICE) {
2256 int len = 6;
2257 char start_id[2*len + 1];
2258 char next_id[2*len + 1];
2259 char end_id[2*len + 1];
2260 char id_str[2*len + 1];
2262 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2263 dnet_dump_id_len_raw(id.id, len, id_str),
2264 dnet_dump_id_len_raw(start.id, len, start_id),
2265 dnet_dump_id_len_raw(next.id, len, next_id),
2266 dnet_dump_id_len_raw(end.id, len, end_id),
2267 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2270 memcpy(io->id, id.id, DNET_ID_SIZE);
2271 memcpy(io->parent, next.id, DNET_ID_SIZE);
2273 io->size = size;
2275 data = dnet_read_data_wait_raw(n, &id, io, DNET_CMD_DEL_RANGE, cflags, &err);
2276 if (io->size != sizeof(struct dnet_io_attr)) {
2277 err = -ENOENT;
2278 goto err_out_exit;
2281 if (data) {
2282 struct dnet_io_attr *rep = (struct dnet_io_attr*)data;
2284 dnet_convert_io_attr(rep);
2286 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2287 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2288 (unsigned long long)io->num, (unsigned long long)io->size);
2290 (*ret_num)++;
2292 new_ret = realloc(ret, *ret_num * sizeof(struct dnet_io_attr));
2293 if (!new_ret) {
2294 err = -ENOMEM;
2295 goto err_out_exit;
2298 ret = new_ret;
2299 ret[*ret_num - 1] = *rep;
2301 free(data);
2304 memcpy(id.id, next.id, DNET_ID_SIZE);
2307 err_out_exit:
2308 *errp = err;
2310 return ret;
2313 struct dnet_range_data *dnet_read_range(struct dnet_node *n, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *errp)
2315 struct dnet_id id;
2316 int ret_num;
2317 struct dnet_range_data *ret;
2318 struct dnet_raw_id start, next;
2319 struct dnet_raw_id end;
2320 uint64_t size = io->size;
2321 void *data;
2322 int err, need_exit = 0;
2324 memcpy(end.id, io->parent, DNET_ID_SIZE);
2326 dnet_setup_id(&id, group_id, io->id);
2327 id.type = io->type;
2329 ret = NULL;
2330 ret_num = 0;
2331 while (!need_exit) {
2332 err = dnet_search_range(n, &id, &start, &next);
2333 if (err)
2334 goto err_out_exit;
2336 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2337 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2338 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2339 memcpy(next.id, end.id, DNET_ID_SIZE);
2340 need_exit = 1;
2343 if (n->log->log_level > DNET_LOG_NOTICE) {
2344 int len = 6;
2345 char start_id[2*len + 1];
2346 char next_id[2*len + 1];
2347 char end_id[2*len + 1];
2348 char id_str[2*len + 1];
2350 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2351 dnet_dump_id_len_raw(id.id, len, id_str),
2352 dnet_dump_id_len_raw(start.id, len, start_id),
2353 dnet_dump_id_len_raw(next.id, len, next_id),
2354 dnet_dump_id_len_raw(end.id, len, end_id),
2355 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2358 memcpy(io->id, id.id, DNET_ID_SIZE);
2359 memcpy(io->parent, next.id, DNET_ID_SIZE);
2361 io->size = size;
2363 data = dnet_read_data_wait_raw(n, &id, io, DNET_CMD_READ_RANGE, cflags, &err);
2364 if (data) {
2365 struct dnet_io_attr *rep = data + io->size - sizeof(struct dnet_io_attr);
2367 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2368 if (!(io->flags & DNET_IO_FLAGS_NODATA))
2369 io->size -= sizeof(struct dnet_io_attr);
2370 dnet_convert_io_attr(rep);
2372 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2373 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2374 (unsigned long long)io->num, (unsigned long long)io->size);
2376 if (io->start < rep->num) {
2377 rep->num -= io->start;
2378 io->start = 0;
2379 io->num -= rep->num;
2381 if (!io->size && !(io->flags & DNET_IO_FLAGS_NODATA)) {
2382 free(data);
2383 } else {
2384 struct dnet_range_data *new_ret;
2386 ret_num++;
2388 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2389 if (!new_ret) {
2390 goto err_out_exit;
2393 ret = new_ret;
2395 ret[ret_num - 1].data = data;
2396 ret[ret_num - 1].size = io->size;
2399 err = 0;
2400 if (!io->num)
2401 break;
2402 } else {
2403 io->start -= rep->num;
2407 memcpy(id.id, next.id, DNET_ID_SIZE);
2410 err_out_exit:
2411 if (ret) {
2412 *errp = ret_num;
2413 } else {
2414 *errp = err;
2416 return ret;
2419 struct dnet_read_latest_id {
2420 struct dnet_id id;
2421 struct dnet_file_info fi;
2424 struct dnet_read_latest_ctl {
2425 struct dnet_wait *w;
2426 int num, pos;
2427 pthread_mutex_t lock;
2429 struct dnet_read_latest_id ids[0];
2432 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl *ctl)
2434 dnet_wakeup(ctl->w, ctl->w->cond++);
2435 if (atomic_dec_and_test(&ctl->w->refcnt)) {
2436 dnet_wait_destroy(ctl->w);
2437 pthread_mutex_destroy(&ctl->lock);
2438 free(ctl);
2442 static int dnet_read_latest_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
2444 struct dnet_read_latest_ctl *ctl = priv;
2445 struct dnet_node *n;
2446 struct dnet_addr_attr *a;
2447 struct dnet_file_info *fi;
2448 int pos, err;
2450 if (is_trans_destroyed(st, cmd)) {
2451 dnet_read_latest_ctl_put(ctl);
2452 return 0;
2455 n = st->n;
2457 err = cmd->status;
2458 if (err || !cmd->size)
2459 goto err_out_exit;
2461 if (cmd->size < sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
2462 dnet_log(n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2463 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
2464 sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info));
2465 err = -EINVAL;
2466 goto err_out_exit;
2468 a = (struct dnet_addr_attr *)(cmd + 1);
2469 fi = (struct dnet_file_info *)(a + 1);
2471 dnet_convert_addr_attr(a);
2472 dnet_convert_file_info(fi);
2474 pthread_mutex_lock(&ctl->lock);
2475 pos = ctl->pos++;
2476 pthread_mutex_unlock(&ctl->lock);
2478 /* we do not care about filename */
2479 memcpy(&ctl->ids[pos].fi, fi, sizeof(struct dnet_file_info));
2480 memcpy(&ctl->ids[pos].id, &cmd->id, sizeof(struct dnet_id));
2482 err_out_exit:
2483 return err;
2486 static int dnet_file_read_latest_cmp(const void *p1, const void *p2)
2488 const struct dnet_read_latest_id *id1 = p1;
2489 const struct dnet_read_latest_id *id2 = p2;
2491 int ret = (int)(id2->fi.mtime.tsec - id1->fi.mtime.tsec);
2493 if (!ret)
2494 ret = (int)(id2->fi.mtime.tnsec - id1->fi.mtime.tnsec);
2496 return ret;
2499 int dnet_read_latest_prepare(struct dnet_read_latest_prepare *pr)
2501 struct dnet_read_latest_ctl *ctl;
2502 int group_id = pr->id.group_id;
2503 int err, i;
2505 ctl = malloc(sizeof(struct dnet_read_latest_ctl) + sizeof(struct dnet_read_latest_id) * pr->group_num);
2506 if (!ctl) {
2507 err = -ENOMEM;
2508 goto err_out_exit;
2510 memset(ctl, 0, sizeof(struct dnet_read_latest_ctl));
2512 ctl->w = dnet_wait_alloc(0);
2513 if (!ctl->w) {
2514 err = -ENOMEM;
2515 goto err_out_free;
2518 err = pthread_mutex_init(&ctl->lock, NULL);
2519 if (err)
2520 goto err_out_put_wait;
2522 ctl->num = pr->group_num;
2523 ctl->pos = 0;
2525 for (i = 0; i < pr->group_num; ++i) {
2526 pr->id.group_id = pr->group[i];
2528 dnet_wait_get(ctl->w);
2529 dnet_lookup_object(pr->n, &pr->id, DNET_ATTR_META_TIMES | pr->cflags, dnet_read_latest_complete, ctl);
2532 err = dnet_wait_event(ctl->w, ctl->w->cond == pr->group_num, &pr->n->wait_ts);
2533 if (err)
2534 goto err_out_put;
2536 if (ctl->pos == 0)
2537 goto err_out_put;
2539 pr->group_num = ctl->pos;
2541 qsort(ctl->ids, pr->group_num, sizeof(struct dnet_read_latest_id), dnet_file_read_latest_cmp);
2543 for (i = 0; i < pr->group_num; ++i) {
2544 pr->group[i] = ctl->ids[i].id.group_id;
2546 if (group_id == pr->group[i]) {
2547 const struct dnet_read_latest_id *id0 = &ctl->ids[0];
2548 const struct dnet_read_latest_id *id1 = &ctl->ids[i];
2550 if (!dnet_file_read_latest_cmp(id0, id1)) {
2551 int tmp_group = pr->group[0];
2552 pr->group[0] = pr->group[i];
2553 pr->group[i] = tmp_group;
2558 err_out_put:
2559 dnet_read_latest_ctl_put(ctl);
2560 goto err_out_exit;
2562 err_out_put_wait:
2563 dnet_wait_put(ctl->w);
2564 err_out_free:
2565 free(ctl);
2566 err_out_exit:
2567 return err;
2570 int dnet_read_latest(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io, uint64_t cflags, void **datap)
2572 struct dnet_read_latest_prepare pr;
2573 int *g, num, err, i;
2575 if ((int)io->num > n->group_num) {
2576 err = -E2BIG;
2577 goto err_out_exit;
2580 err = dnet_mix_states(n, id, &g);
2581 if (err < 0)
2582 goto err_out_exit;
2584 num = err;
2586 if ((int)io->num > num) {
2587 err = -E2BIG;
2588 goto err_out_free;
2591 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
2593 pr.n = n;
2594 pr.id = *id;
2595 pr.group = g;
2596 pr.group_num = num;
2597 pr.cflags = cflags;
2599 err = dnet_read_latest_prepare(&pr);
2600 if (err)
2601 goto err_out_free;
2603 err = -ENODATA;
2604 for (i = 0; i < pr.group_num; ++i) {
2605 void *data;
2607 id->group_id = pr.group[i];
2608 data = dnet_read_data_wait_raw(n, id, io, DNET_CMD_READ, cflags, &err);
2609 if (data) {
2610 *datap = data;
2611 err = 0;
2612 break;
2616 err_out_free:
2617 free(g);
2618 err_out_exit:
2619 return err;
2622 int dnet_get_routes(struct dnet_node *n, struct dnet_id **ids, struct dnet_addr **addrs) {
2624 struct dnet_net_state *st;
2625 struct dnet_group *g;
2626 struct dnet_addr *tmp_addrs;
2627 struct dnet_id *tmp_ids;
2628 int size = 0, count = 0;
2629 int i;
2631 *ids = NULL;
2632 *addrs = NULL;
2634 pthread_mutex_lock(&n->state_lock);
2635 list_for_each_entry(g, &n->group_list, group_entry) {
2636 list_for_each_entry(st, &g->state_list, state_entry) {
2638 size += st->idc->id_num;
2640 tmp_ids = (struct dnet_id *)realloc(*ids, size * sizeof(struct dnet_id));
2641 if (!tmp_ids) {
2642 count = -ENOMEM;
2643 goto err_out_free;
2645 *ids = tmp_ids;
2647 tmp_addrs = (struct dnet_addr *)realloc(*addrs, size * sizeof(struct dnet_addr));
2648 if (!tmp_addrs) {
2649 count = -ENOMEM;
2650 goto err_out_free;
2652 *addrs = tmp_addrs;
2654 for (i = 0; i < st->idc->id_num; ++i) {
2655 dnet_setup_id(&(*ids)[count], g->group_id, st->idc->ids[i].raw.id);
2656 memcpy(&(*addrs)[count], dnet_state_addr(st), sizeof(struct dnet_addr));
2657 count++;
2658 //fprintf(stderr, "%d: %s -> %s\n", g->group_id, dnet_dump_id_str(st->idc->ids[i].raw.id), dnet_state_dump_addr(st));
2662 pthread_mutex_unlock(&n->state_lock);
2664 return count;
2666 err_out_free:
2667 if (ids)
2668 free(*ids);
2669 if (addrs)
2670 free(*addrs);
2672 return count;
2676 void *dnet_bulk_read_wait_raw(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *ios,
2677 uint32_t io_num, int cmd, uint64_t cflags, int *errp)
2679 struct dnet_io_control ctl;
2680 struct dnet_io_attr io;
2681 struct dnet_wait *w;
2682 struct dnet_read_data_completion *c;
2683 void *data = NULL;
2684 int err;
2686 w = dnet_wait_alloc(0);
2687 if (!w) {
2688 err = -ENOMEM;
2689 goto err_out_exit;
2692 c = malloc(sizeof(*c));
2693 if (!c) {
2694 err = -ENOMEM;
2695 goto err_out_put;
2698 c->w = w;
2699 c->size = 0;
2700 c->data = NULL;
2701 /* one for completion callback, another for this function */
2702 atomic_init(&c->refcnt, 2);
2704 memset(&ctl, 0, sizeof(struct dnet_io_control));
2706 ctl.fd = -1;
2708 ctl.priv = c;
2709 ctl.complete = dnet_read_data_complete;
2711 ctl.cmd = cmd;
2712 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
2714 memcpy(&ctl.id, id, sizeof(struct dnet_id));
2715 memset(&ctl.io, 0, sizeof(struct dnet_io_attr));
2717 memcpy(io.id, id->id, DNET_ID_SIZE);
2718 memcpy(io.parent, id->id, DNET_ID_SIZE);
2720 ctl.io.size = io_num * sizeof(struct dnet_io_attr);
2721 ctl.data = ios;
2723 dnet_wait_get(w);
2724 err = dnet_read_object(n, &ctl);
2725 if (err)
2726 goto err_out_put_complete;
2728 err = dnet_wait_event(w, w->cond, &n->wait_ts);
2729 if (err || w->status) {
2730 char id_str[2*DNET_ID_SIZE + 1];
2731 if (!err)
2732 err = w->status;
2733 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
2734 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
2735 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
2736 goto err_out_put_complete;
2738 err = c->size;
2739 data = c->data;
2741 err_out_put_complete:
2742 if (atomic_dec_and_test(&c->refcnt))
2743 free(c);
2744 err_out_put:
2745 dnet_wait_put(w);
2746 err_out_exit:
2747 *errp = err;
2748 return data;
2752 static int dnet_io_attr_cmp(const void *d1, const void *d2)
2754 const struct dnet_io_attr *io1 = d1;
2755 const struct dnet_io_attr *io2 = d2;
2757 return memcmp(io1->id, io2->id, DNET_ID_SIZE);
2760 struct dnet_range_data *dnet_bulk_read(struct dnet_node *n, struct dnet_io_attr *ios, uint32_t io_num, int group_id, uint64_t cflags, int *errp)
2762 struct dnet_id id, next_id;
2763 int ret_num;
2764 struct dnet_range_data *ret;
2765 struct dnet_net_state *cur, *next = NULL;
2766 uint64_t size = 0;
2767 void *data;
2768 int err;
2769 uint32_t i, start = -1;
2771 if (io_num <= 0) {
2772 return 0;
2775 qsort(ios, io_num, sizeof(struct dnet_io_attr), dnet_io_attr_cmp);
2777 ret = NULL;
2778 ret_num = 0;
2779 size = 0;
2781 dnet_setup_id(&id, group_id, ios[0].id);
2782 id.type = ios[0].type;
2784 cur = dnet_state_get_first(n, &id);
2785 if (!cur) {
2786 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&id));
2787 err = -ENOENT;
2788 goto err_out_exit;
2791 for (i = 0; i < io_num; ++i) {
2792 if ((i + 1) < io_num) {
2793 dnet_setup_id(&next_id, group_id, ios[i+1].id);
2794 next_id.type = ios[i+1].type;
2796 next = dnet_state_get_first(n, &next_id);
2797 if (!next) {
2798 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&next_id));
2799 err = -ENOENT;
2800 goto err_out_put;
2803 /* Send command only if state changes or it's a last id */
2804 if ((cur == next)) {
2805 dnet_state_put(next);
2806 next = NULL;
2807 continue;
2811 dnet_log(n, DNET_LOG_NOTICE, "start: %s: end: %s, count: %llu, addr: %s\n",
2812 dnet_dump_id(&id),
2813 dnet_dump_id(&next_id),
2814 (unsigned long long)(i - start),
2815 dnet_state_dump_addr(cur));
2817 data = dnet_bulk_read_wait_raw(n, &id, ios, i - start, DNET_CMD_BULK_READ, cflags, &err);
2818 if (data) {
2819 size = err;
2820 err = 0;
2822 if (!size) {
2823 free(data);
2824 } else {
2825 struct dnet_range_data *new_ret;
2827 ret_num++;
2829 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2830 if (!new_ret) {
2831 goto err_out_put;
2834 ret = new_ret;
2836 ret[ret_num - 1].data = data;
2837 ret[ret_num - 1].size = size;
2840 err = 0;
2843 dnet_state_put(cur);
2844 cur = next;
2845 next = NULL;
2846 memcpy(&id, &next_id, sizeof(struct dnet_id));
2849 err_out_put:
2850 if (next)
2851 dnet_state_put(next);
2852 dnet_state_put(cur);
2853 err_out_exit:
2854 if (ret) {
2855 *errp = ret_num;
2856 } else {
2857 *errp = err;
2859 return ret;
2862 struct dnet_range_data dnet_bulk_write(struct dnet_node *n, struct dnet_io_control *ctl, int ctl_num, int *errp)
2864 int err, i, trans_num = 0, local_trans_num;
2865 struct dnet_wait *w;
2866 struct dnet_write_completion *wc;
2867 struct dnet_range_data ret;
2868 struct dnet_metadata_control mcl;
2869 struct dnet_meta_container mc;
2870 struct dnet_io_control meta_ctl;
2871 struct timeval tv;
2872 int *groups = NULL;
2873 int group_num = 0;
2875 memset(&ret, 0, sizeof(ret));
2877 wc = malloc(sizeof(struct dnet_write_completion));
2878 if (!wc) {
2879 err = -ENOMEM;
2880 goto err_out_exit;
2882 memset(wc, 0, sizeof(struct dnet_write_completion));
2884 w = dnet_wait_alloc(0);
2885 if (!w) {
2886 err = -ENOMEM;
2887 free(wc);
2888 goto err_out_exit;
2890 wc->wait = w;
2892 atomic_set(&w->refcnt, INT_MAX);
2893 w->status = -ENOENT;
2895 for (i = 0; i < ctl_num; ++i) {
2896 ctl[i].priv = wc;
2897 ctl[i].complete = dnet_write_complete;
2899 ctl[i].cmd = DNET_CMD_WRITE;
2900 ctl[i].cflags = DNET_FLAGS_NEED_ACK;
2902 memcpy(ctl[i].io.id, ctl[i].id.id, DNET_ID_SIZE);
2903 memcpy(ctl[i].io.parent, ctl[i].id.id, DNET_ID_SIZE);
2905 local_trans_num = dnet_write_object(n, &ctl[i]);
2906 if (local_trans_num < 0)
2907 local_trans_num = 0;
2909 trans_num += local_trans_num;
2911 /* Prepare and send metadata */
2912 memset(&mcl, 0, sizeof(mcl));
2914 pthread_mutex_lock(&n->group_lock);
2915 group_num = n->group_num;
2916 groups = alloca(group_num * sizeof(int));
2918 memcpy(groups, n->groups, group_num * sizeof(int));
2919 pthread_mutex_unlock(&n->group_lock);
2921 mcl.groups = groups;
2922 mcl.group_num = group_num;
2923 mcl.id = ctl[i].id;
2924 mcl.cflags = ctl[i].cflags;
2926 gettimeofday(&tv, NULL);
2927 mcl.ts.tv_sec = tv.tv_sec;
2928 mcl.ts.tv_nsec = tv.tv_usec * 1000;
2930 memset(&mc, 0, sizeof(mc));
2932 err = dnet_create_metadata(n, &mcl, &mc);
2933 dnet_log(n, DNET_LOG_DEBUG, "Creating metadata: err: %d", err);
2934 if (!err) {
2935 dnet_convert_metadata(n, mc.data, mc.size);
2937 memset(&meta_ctl, 0, sizeof(struct dnet_io_control));
2939 meta_ctl.priv = wc;
2940 meta_ctl.complete = dnet_write_complete;
2941 meta_ctl.cmd = DNET_CMD_WRITE;
2942 meta_ctl.fd = -1;
2944 meta_ctl.cflags = ctl[i].cflags;
2946 memcpy(&meta_ctl.id, &ctl[i].id, sizeof(struct dnet_id));
2947 memcpy(meta_ctl.io.id, ctl[i].id.id, DNET_ID_SIZE);
2948 memcpy(meta_ctl.io.parent, ctl[i].id.id, DNET_ID_SIZE);
2949 meta_ctl.id.type = meta_ctl.io.type = EBLOB_TYPE_META;
2951 meta_ctl.io.flags |= DNET_IO_FLAGS_META;
2952 meta_ctl.io.offset = 0;
2953 meta_ctl.io.size = mc.size;
2954 meta_ctl.data = mc.data;
2956 local_trans_num = dnet_write_object(n, &meta_ctl);
2957 if (local_trans_num < 0)
2958 local_trans_num = 0;
2960 trans_num += local_trans_num;
2965 * 1 - the first reference counter we grabbed at allocation time
2967 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
2969 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
2970 if (err || w->status) {
2971 if (!err)
2972 err = w->status;
2973 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2974 dnet_dump_id(&ctl->id), err, w->status);
2977 if (err || !trans_num) {
2978 if (!err)
2979 err = -EINVAL;
2980 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
2981 goto err_out_put;
2984 if (trans_num)
2985 dnet_log(n, DNET_LOG_NOTICE, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
2986 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, wc->size);
2987 err = trans_num;
2989 ret.data = wc->reply;
2990 ret.size = wc->size;
2992 wc->reply = NULL;
2994 err_out_put:
2995 dnet_write_complete_free(wc);
2996 err_out_exit:
2997 *errp = err;
2998 return ret;
3001 int dnet_flags(struct dnet_node *n)
3003 return n->flags;
3006 static int dnet_start_defrag_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
3008 struct dnet_wait *w = priv;
3010 if (is_trans_destroyed(state, cmd)) {
3011 dnet_wakeup(w, w->cond++);
3012 dnet_wait_put(w);
3013 return 0;
3016 return 0;
3019 static int dnet_start_defrag_single(struct dnet_net_state *st, void *priv, uint64_t cflags)
3021 struct dnet_trans_control ctl;
3023 memset(&ctl, 0, sizeof(struct dnet_trans_control));
3025 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
3026 ctl.cmd = DNET_CMD_DEFRAG;
3027 ctl.complete = dnet_start_defrag_complete;
3028 ctl.priv = priv;
3029 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
3031 return dnet_trans_alloc_send_state(st, &ctl);
3034 int dnet_start_defrag(struct dnet_node *n, uint64_t cflags)
3036 struct dnet_net_state *st;
3037 struct dnet_wait *w;
3038 struct dnet_group *g;
3039 int num = 0;
3040 int err;
3042 w = dnet_wait_alloc(0);
3043 if (!w) {
3044 err = -ENOMEM;
3045 goto err_out_exit;
3048 pthread_mutex_lock(&n->state_lock);
3049 list_for_each_entry(g, &n->group_list, group_entry) {
3050 list_for_each_entry(st, &g->state_list, state_entry) {
3051 if (st == n->st)
3052 continue;
3054 if (w)
3055 dnet_wait_get(w);
3057 dnet_start_defrag_single(st, w, cflags);
3058 num++;
3061 pthread_mutex_unlock(&n->state_lock);
3063 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
3064 dnet_wait_put(w);
3066 err_out_exit:
3067 return err;