Tune nonblocking pool growing policy
[elliptics.git] / library / dnet_common.c
blob0575f9eafe3a3838eb98b72ef10e1a61fe819e15
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
44 static char *dnet_cmd_strings[] = {
45 [DNET_CMD_LOOKUP] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN] = "JOIN",
48 [DNET_CMD_WRITE] = "WRITE",
49 [DNET_CMD_READ] = "READ",
50 [DNET_CMD_LIST] = "CHECK",
51 [DNET_CMD_EXEC] = "EXEC",
52 [DNET_CMD_ROUTE_LIST] = "ROUTE_LIST",
53 [DNET_CMD_STAT] = "STAT",
54 [DNET_CMD_NOTIFY] = "NOTIFY",
55 [DNET_CMD_DEL] = "REMOVE",
56 [DNET_CMD_STAT_COUNT] = "STAT_COUNT",
57 [DNET_CMD_STATUS] = "STATUS",
58 [DNET_CMD_READ_RANGE] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE] = "DEL_RANGE",
60 [DNET_CMD_AUTH] = "AUTH",
61 [DNET_CMD_BULK_READ] = "BULK_READ",
62 [DNET_CMD_DEFRAG] = "DEFRAG",
63 [DNET_CMD_UNKNOWN] = "UNKNOWN",
66 static char *dnet_counter_strings[] = {
67 [DNET_CNTR_LA1] = "DNET_CNTR_LA1",
68 [DNET_CNTR_LA5] = "DNET_CNTR_LA5",
69 [DNET_CNTR_LA15] = "DNET_CNTR_LA15",
70 [DNET_CNTR_BSIZE] = "DNET_CNTR_BSIZE",
71 [DNET_CNTR_FRSIZE] = "DNET_CNTR_FRSIZE",
72 [DNET_CNTR_BLOCKS] = "DNET_CNTR_BLOCKS",
73 [DNET_CNTR_BFREE] = "DNET_CNTR_BFREE",
74 [DNET_CNTR_BAVAIL] = "DNET_CNTR_BAVAIL",
75 [DNET_CNTR_FILES] = "DNET_CNTR_FILES",
76 [DNET_CNTR_FFREE] = "DNET_CNTR_FFREE",
77 [DNET_CNTR_FAVAIL] = "DNET_CNTR_FAVAIL",
78 [DNET_CNTR_FSID] = "DNET_CNTR_FSID",
79 [DNET_CNTR_VM_ACTIVE] = "DNET_CNTR_VM_ACTIVE",
80 [DNET_CNTR_VM_INACTIVE] = "DNET_CNTR_VM_INACTIVE",
81 [DNET_CNTR_VM_TOTAL] = "DNET_CNTR_VM_TOTAL",
82 [DNET_CNTR_VM_FREE] = "DNET_CNTR_VM_FREE",
83 [DNET_CNTR_VM_CACHED] = "DNET_CNTR_VM_CACHED",
84 [DNET_CNTR_VM_BUFFERS] = "DNET_CNTR_VM_BUFFERS",
85 [DNET_CNTR_NODE_FILES] = "DNET_CNTR_NODE_FILES",
86 [DNET_CNTR_NODE_LAST_MERGE] = "DNET_CNTR_NODE_LAST_MERGE",
87 [DNET_CNTR_NODE_CHECK_COPY] = "DNET_CNTR_NODE_CHECK_COPY",
88 [DNET_CNTR_DBR_NOREC] = "DNET_CNTR_DBR_NOREC",
89 [DNET_CNTR_DBR_SYSTEM] = "DNET_CNTR_DBR_SYSTEM",
90 [DNET_CNTR_DBR_ERROR] = "DNET_CNTR_DBR_ERROR",
91 [DNET_CNTR_DBW_SYSTEM] = "DNET_CNTR_DBW_SYSTEM",
92 [DNET_CNTR_DBW_ERROR] = "DNET_CNTR_DBW_ERROR",
93 [DNET_CNTR_UNKNOWN] = "UNKNOWN",
96 char *dnet_cmd_string(int cmd)
98 if (cmd <= 0 || cmd >= __DNET_CMD_MAX || cmd >= DNET_CMD_UNKNOWN)
99 cmd = DNET_CMD_UNKNOWN;
101 return dnet_cmd_strings[cmd];
104 char *dnet_counter_string(int cntr, int cmd_num)
106 if (cntr <= 0 || cntr >= __DNET_CNTR_MAX || cntr >= DNET_CNTR_UNKNOWN)
107 cntr = DNET_CNTR_UNKNOWN;
109 if (cntr < cmd_num)
110 return dnet_cmd_string(cntr);
112 if (cntr >= cmd_num && cntr < (cmd_num * 2))
113 return dnet_cmd_string(cntr - cmd_num);
115 cntr += DNET_CNTR_LA1 - cmd_num * 2;
116 return dnet_counter_strings[cntr];
119 static int dnet_add_received_state(struct dnet_node *n, struct dnet_addr_attr *a,
120 int group_id, struct dnet_raw_id *ids, int id_num)
122 int s, err = 0;
123 struct dnet_net_state *nst;
124 struct dnet_id raw;
125 int join;
127 dnet_setup_id(&raw, group_id, ids[0].id);
129 nst = dnet_state_search_by_addr(n, &a->addr);
130 if (nst) {
131 err = -EEXIST;
132 dnet_state_put(nst);
133 goto err_out_exit;
136 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
137 (struct sockaddr *)&a->addr.addr, a->addr.addr_len, 0);
138 if (s < 0) {
139 err = s;
140 goto err_out_exit;
143 join = DNET_WANT_RECONNECT;
144 if (n->flags & DNET_CFG_JOIN_NETWORK)
145 join = DNET_JOIN;
147 nst = dnet_state_create(n, group_id, ids, id_num, &a->addr, s, &err, join, dnet_state_net_process);
148 if (!nst)
149 goto err_out_close;
151 dnet_log(n, DNET_LOG_NOTICE, "%d: added received state %s.\n",
152 group_id, dnet_state_dump_addr(nst));
154 return 0;
156 err_out_close:
157 dnet_sock_close(s);
158 err_out_exit:
159 return err;
162 static int dnet_process_addr_attr(struct dnet_net_state *st, struct dnet_addr_attr *a, int group_id, int num)
164 struct dnet_node *n = st->n;
165 struct dnet_raw_id *ids;
166 int i, err;
168 ids = (struct dnet_raw_id *)(a + 1);
169 for (i=0; i<num; ++i)
170 dnet_convert_raw_id(&ids[0]);
172 err = dnet_add_received_state(n, a, group_id, ids, num);
173 dnet_log(n, DNET_LOG_DEBUG, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a->addr), num, err);
175 return err;
178 static int dnet_recv_route_list_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
180 struct dnet_wait *w = priv;
181 struct dnet_addr_attr *a;
182 long size;
183 int err, num;
185 if (is_trans_destroyed(st, cmd)) {
186 err = -EINVAL;
187 if (cmd)
188 err = cmd->status;
190 w->status = err;
191 dnet_wakeup(w, w->cond = 1);
192 dnet_wait_put(w);
193 goto err_out_exit;
197 err = cmd->status;
198 if (!cmd->size || err)
199 goto err_out_exit;
201 size = cmd->size + sizeof(struct dnet_cmd);
202 if (size < (signed)sizeof(struct dnet_addr_cmd)) {
203 err = -EINVAL;
204 goto err_out_exit;
207 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
208 if (!num) {
209 err = -EINVAL;
210 goto err_out_exit;
213 a = (struct dnet_addr_attr *)(cmd + 1);
214 dnet_convert_addr_attr(a);
216 err = dnet_process_addr_attr(st, a, cmd->id.group_id, num);
218 err_out_exit:
219 return err;
222 int dnet_recv_route_list(struct dnet_net_state *st)
224 struct dnet_io_req req;
225 struct dnet_node *n = st->n;
226 struct dnet_trans *t;
227 struct dnet_cmd *cmd;
228 struct dnet_wait *w;
229 int err;
231 w = dnet_wait_alloc(0);
232 if (!w) {
233 err = -ENOMEM;
234 goto err_out_exit;
237 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
238 if (!t) {
239 err = -ENOMEM;
240 goto err_out_wait_put;
243 t->complete = dnet_recv_route_list_complete;
244 t->priv = w;
246 cmd = (struct dnet_cmd *)(t + 1);
248 cmd->flags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
249 cmd->status = 0;
251 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
253 cmd->cmd = t->command = DNET_CMD_ROUTE_LIST;
255 t->st = dnet_state_get(st);
256 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
258 dnet_convert_cmd(cmd);
260 dnet_log(n, DNET_LOG_DEBUG, "%s: list route request to %s.\n", dnet_dump_id(&cmd->id),
261 dnet_server_convert_dnet_addr(&st->addr));
263 memset(&req, 0, sizeof(req));
264 req.st = st;
265 req.header = cmd;
266 req.hsize = sizeof(struct dnet_cmd);
268 dnet_wait_get(w);
269 err = dnet_trans_send(t, &req);
270 if (err)
271 goto err_out_destroy;
273 err = dnet_wait_event(w, w->cond != 0, &n->wait_ts);
274 dnet_wait_put(w);
276 return 0;
278 err_out_destroy:
279 dnet_trans_put(t);
280 err_out_wait_put:
281 dnet_wait_put(w);
282 err_out_exit:
283 return err;
286 static struct dnet_net_state *dnet_add_state_socket(struct dnet_node *n, struct dnet_addr *addr, int s, int *errp, int join)
288 struct dnet_net_state *st, dummy;
289 char buf[sizeof(struct dnet_addr_cmd)];
290 struct dnet_cmd *cmd;
291 int err, num, i, size;
292 struct dnet_raw_id *ids;
294 memset(buf, 0, sizeof(buf));
296 cmd = (struct dnet_cmd *)(buf);
298 cmd->flags = DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
299 cmd->cmd = DNET_CMD_REVERSE_LOOKUP;
301 dnet_convert_cmd(cmd);
303 st = &dummy;
304 memset(st, 0, sizeof(struct dnet_net_state));
306 st->write_s = st->read_s = s;
307 st->n = n;
309 err = dnet_send_nolock(st, buf, sizeof(struct dnet_cmd));
310 if (err) {
311 dnet_log(n, DNET_LOG_ERROR, "Failed to send reverse "
312 "lookup message to %s, err: %d.\n",
313 dnet_server_convert_dnet_addr(addr), err);
314 goto err_out_exit;
317 err = dnet_recv(st, buf, sizeof(buf));
318 if (err) {
319 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
320 "lookup headers from %s, err: %d.\n",
321 dnet_server_convert_dnet_addr(addr), err);
322 goto err_out_exit;
325 cmd = (struct dnet_cmd *)(buf);
327 dnet_convert_addr_cmd((struct dnet_addr_cmd *)buf);
329 size = cmd->size - sizeof(struct dnet_addr_attr);
330 num = size / sizeof(struct dnet_raw_id);
332 dnet_log(n, DNET_LOG_DEBUG, "%s: waiting for %d ids\n", dnet_dump_id(&cmd->id), num);
334 ids = malloc(size);
335 if (!ids) {
336 err = -ENOMEM;
337 goto err_out_exit;
340 err = dnet_recv(st, ids, size);
341 if (err) {
342 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
343 "lookup body (%llu bytes) from %s, err: %d.\n",
344 (unsigned long long)cmd->size,
345 dnet_server_convert_dnet_addr(addr), err);
346 goto err_out_exit;
349 for (i=0; i<num; ++i)
350 dnet_convert_raw_id(&ids[i]);
352 st = dnet_state_create(n, cmd->id.group_id, ids, num, addr, s, &err, join, dnet_state_net_process);
353 if (!st) {
354 /* socket is already closed */
355 s = -1;
356 goto err_out_free;
358 free(ids);
360 return st;
362 err_out_free:
363 free(ids);
364 err_out_exit:
365 *errp = err;
366 if (s >= 0)
367 dnet_sock_close(s);
368 return NULL;
371 int dnet_add_state(struct dnet_node *n, struct dnet_config *cfg)
373 int s, err, join = DNET_WANT_RECONNECT;
374 struct dnet_addr addr;
375 struct dnet_net_state *st;
377 memset(&addr, 0, sizeof(addr));
379 addr.addr_len = sizeof(addr.addr);
380 s = dnet_socket_create(n, cfg, &addr, 0);
381 if (s < 0) {
382 err = s;
383 goto err_out_reconnect;
386 if (n->flags & DNET_CFG_JOIN_NETWORK)
387 join = DNET_JOIN;
389 /* will close socket on error */
390 st = dnet_add_state_socket(n, &addr, s, &err, join);
391 if (!st)
392 goto err_out_reconnect;
394 if (!(cfg->flags & DNET_CFG_NO_ROUTE_LIST))
395 dnet_recv_route_list(st);
397 return 0;
399 err_out_reconnect:
400 /* if state is already exist, it should not be an error */
401 if (err == -EEXIST)
402 err = 0;
404 if ((err == -EADDRINUSE) || (err == -ECONNREFUSED) || (err == -ECONNRESET) ||
405 (err == -EINPROGRESS) || (err == -EAGAIN))
406 dnet_add_reconnect_state(n, &addr, join);
407 return err;
/*
 * State shared by all per-group transactions of one dnet_write_file_id_raw()
 * call; each of them completes into the same instance.
 */
struct dnet_write_completion {
	/* Accumulated replies, each stored as struct dnet_addr, struct dnet_cmd, payload. */
	void			*reply;
	/* Number of bytes currently used in @reply. */
	int			size;
	/* Wait object; when its last reference is dropped, this structure is freed as well. */
	struct dnet_wait	*wait;
};
416 static void dnet_write_complete_free(struct dnet_write_completion *wc)
418 if (atomic_dec_and_test(&wc->wait->refcnt)) {
419 dnet_wait_destroy(wc->wait);
420 free(wc->reply);
421 free(wc);
425 static int dnet_write_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
427 int err = -EINVAL;
428 struct dnet_write_completion *wc = priv;
429 struct dnet_wait *w = wc->wait;
431 if (is_trans_destroyed(st, cmd)) {
432 dnet_wakeup(w, w->cond++);
433 dnet_write_complete_free(wc);
434 return 0;
437 err = cmd->status;
438 if (!err && st && (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info))) {
439 int old_size = wc->size;
440 void *data;
442 wc->size += cmd->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_addr);
443 wc->reply = realloc(wc->reply, wc->size);
444 if (!wc->reply) {
445 err = -ENOMEM;
446 goto err_out_exit;
449 data = wc->reply + old_size;
451 memcpy(data, &st->addr, sizeof(struct dnet_addr));
452 memcpy(data + sizeof(struct dnet_addr), cmd, sizeof(struct dnet_cmd));
453 memcpy(data + sizeof(struct dnet_addr) + sizeof(struct dnet_cmd), cmd + 1, cmd->size);
456 err_out_exit:
457 pthread_mutex_lock(&w->wait_lock);
458 if (w->status < 0)
459 w->status = err;
460 pthread_mutex_unlock(&w->wait_lock);
462 return 0;
465 static struct dnet_trans *dnet_io_trans_create(struct dnet_node *n, struct dnet_io_control *ctl, int *errp)
467 struct dnet_io_req req;
468 struct dnet_trans *t = NULL;
469 struct dnet_io_attr *io;
470 struct dnet_cmd *cmd;
471 uint64_t size = ctl->io.size;
472 uint64_t tsize = sizeof(struct dnet_io_attr) + sizeof(struct dnet_cmd);
473 int err;
475 if (ctl->cmd == DNET_CMD_READ)
476 size = 0;
478 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE)
479 tsize += size;
481 t = dnet_trans_alloc(n, tsize);
482 if (!t) {
483 err = -ENOMEM;
484 goto err_out_complete;
486 t->complete = ctl->complete;
487 t->priv = ctl->priv;
489 cmd = (struct dnet_cmd *)(t + 1);
490 io = (struct dnet_io_attr *)(cmd + 1);
492 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE) {
493 if (size) {
494 void *data = io + 1;
495 memcpy(data, ctl->data, size);
499 memcpy(&cmd->id, &ctl->id, sizeof(struct dnet_id));
500 cmd->size = sizeof(struct dnet_io_attr) + size;
501 cmd->flags = ctl->cflags;
502 cmd->status = 0;
504 cmd->cmd = t->command = ctl->cmd;
506 memcpy(io, &ctl->io, sizeof(struct dnet_io_attr));
507 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
509 t->st = dnet_state_get_first(n, &cmd->id);
510 if (!t->st) {
511 err = -ENOENT;
512 goto err_out_destroy;
515 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
517 dnet_log(n, DNET_LOG_INFO, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
518 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
519 dnet_dump_id(&ctl->id),
520 (unsigned long long)t->trans,
521 dnet_cmd_string(ctl->cmd), (unsigned long long)cmd->flags,
522 (unsigned long long)ctl->io.size, (unsigned long long)ctl->io.offset,
523 ctl->fd,
524 (unsigned long long)ctl->local_offset,
525 dnet_server_convert_dnet_addr(&t->st->addr), t->st->weight, t->st->median_read_time);
527 dnet_convert_cmd(cmd);
528 dnet_convert_io_attr(io);
531 memset(&req, 0, sizeof(req));
532 req.st = t->st;
533 req.header = cmd;
534 req.hsize = tsize;
536 req.fd = ctl->fd;
538 if (ctl->fd >= 0) {
539 req.local_offset = ctl->local_offset;
540 req.fsize = size;
541 } else if (size >= DNET_COPY_IO_SIZE) {
542 req.data = (void *)ctl->data;
543 req.dsize = size;
546 err = dnet_trans_send(t, &req);
547 if (err)
548 goto err_out_destroy;
550 return t;
552 err_out_complete:
553 if (ctl->complete)
554 ctl->complete(NULL, NULL, ctl->priv);
555 *errp = err;
556 return NULL;
558 err_out_destroy:
559 dnet_trans_put(t);
560 *errp = err;
561 return NULL;
564 int dnet_trans_create_send_all(struct dnet_session *s, struct dnet_io_control *ctl)
566 struct dnet_node *n = s->node;
567 int num = 0, i, err;
569 for (i=0; i<s->group_num; ++i) {
570 ctl->id.group_id = s->groups[i];
572 dnet_io_trans_create(n, ctl, &err);
573 num++;
576 if (!num) {
577 dnet_io_trans_create(n, ctl, &err);
578 num++;
581 return num;
/* Write @ctl to every group of @s; returns the number of transactions attempted. */
int dnet_write_object(struct dnet_session *s, struct dnet_io_control *ctl)
{
	int attempted = dnet_trans_create_send_all(s, ctl);

	return attempted;
}
589 static int dnet_write_file_id_raw(struct dnet_session *s, const char *file, struct dnet_id *id,
590 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
591 uint64_t cflags, unsigned int ioflags)
593 struct dnet_node *n = s->node;
594 int fd, err, trans_num;
595 struct stat stat;
596 struct dnet_wait *w;
597 struct dnet_io_control ctl;
598 struct dnet_write_completion *wc;
600 wc = malloc(sizeof(struct dnet_write_completion));
601 if (!wc) {
602 err = -ENOMEM;
603 goto err_out_exit;
605 memset(wc, 0, sizeof(struct dnet_write_completion));
607 w = dnet_wait_alloc(0);
608 if (!w) {
609 free(wc);
610 err = -ENOMEM;
611 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting structure.\n");
612 goto err_out_exit;
615 wc->wait = w;
617 fd = open(file, O_RDONLY | O_LARGEFILE | O_CLOEXEC);
618 if (fd < 0) {
619 err = -errno;
620 dnet_log_err(n, "Failed to open to be written file '%s'", file);
621 goto err_out_put;
624 err = fstat(fd, &stat);
625 if (err) {
626 err = -errno;
627 dnet_log_err(n, "Failed to stat to be written file '%s'", file);
628 goto err_out_close;
631 if (local_offset >= (uint64_t)stat.st_size) {
632 err = 0;
633 goto err_out_close;
636 if (!size || size + local_offset >= (uint64_t)stat.st_size)
637 size = stat.st_size - local_offset;
639 memset(&ctl, 0, sizeof(struct dnet_io_control));
641 atomic_set(&w->refcnt, INT_MAX);
643 ctl.data = NULL;
644 ctl.fd = fd;
645 ctl.local_offset = local_offset;
647 w->status = -ENOENT;
648 ctl.complete = dnet_write_complete;
649 ctl.priv = wc;
651 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
652 ctl.cmd = DNET_CMD_WRITE;
654 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
655 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
657 ctl.io.flags = ioflags;
658 ctl.io.size = size;
659 ctl.io.offset = remote_offset;
660 ctl.io.type = id->type;
662 memcpy(&ctl.id, id, sizeof(struct dnet_id));
664 trans_num = dnet_write_object(s, &ctl);
665 if (trans_num < 0)
666 trans_num = 0;
669 * 1 - the first reference counter we grabbed at allocation time
671 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
673 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
674 if (err || w->status) {
675 if (!err)
676 err = w->status;
679 if (!err && !trans_num)
680 err = -EINVAL;
682 if (err) {
683 dnet_log(n, DNET_LOG_ERROR, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file, trans_num, err);
684 goto err_out_close;
687 dnet_log(n, DNET_LOG_NOTICE, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
688 file, (unsigned long long)size);
690 close(fd);
691 dnet_write_complete_free(wc);
693 return 0;
695 err_out_close:
696 close(fd);
697 err_out_put:
698 dnet_write_complete_free(wc);
699 err_out_exit:
700 return err;
703 int dnet_write_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t local_offset,
704 uint64_t remote_offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
706 int err = dnet_write_file_id_raw(s, file, id, local_offset, remote_offset, size, cflags, ioflags);
707 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
708 err = dnet_create_write_metadata_strings(s, NULL, 0, id, NULL, cflags);
710 return err;
713 int dnet_write_file(struct dnet_session *s, const char *file, const void *remote, int remote_len,
714 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
715 uint64_t cflags, unsigned int ioflags, int type)
717 int err;
718 struct dnet_id id;
720 dnet_transform(s->node, remote, remote_len, &id);
721 id.type = type;
723 err = dnet_write_file_id_raw(s, file, &id, local_offset, remote_offset, size, cflags, ioflags);
724 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
725 err = dnet_create_write_metadata_strings(s, remote, remote_len, &id, NULL, cflags);
727 return err;
730 static int dnet_read_file_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
732 int fd, err;
733 struct dnet_node *n;
734 struct dnet_io_completion *c = priv;
735 struct dnet_io_attr *io;
736 void *data;
738 if (is_trans_destroyed(st, cmd)) {
739 if (c->wait) {
740 int err = 1;
741 if (cmd && cmd->status)
742 err = cmd->status;
744 dnet_wakeup(c->wait, c->wait->cond = err);
745 dnet_wait_put(c->wait);
748 free(c);
749 return 0;
752 n = st->n;
754 if (cmd->status != 0 || cmd->size == 0) {
755 err = cmd->status;
756 goto err_out_exit_no_log;
759 if (cmd->size <= sizeof(struct dnet_io_attr)) {
760 dnet_log(n, DNET_LOG_ERROR, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
761 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
762 sizeof(struct dnet_io_attr));
763 err = -EINVAL;
764 goto err_out_exit_no_log;
767 io = (struct dnet_io_attr *)(cmd + 1);
768 data = io + 1;
770 dnet_convert_io_attr(io);
772 fd = open(c->file, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
773 if (fd < 0) {
774 err = -errno;
775 dnet_log_err(n, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd->id), c->file);
776 goto err_out_exit;
779 err = pwrite(fd, data, io->size, c->offset);
780 if (err <= 0) {
781 err = -errno;
782 dnet_log_err(n, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd->id), c->file);
783 goto err_out_close;
786 close(fd);
787 dnet_log(n, DNET_LOG_NOTICE, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
788 dnet_dump_id(&cmd->id), c->file, (unsigned long long)c->offset,
789 (unsigned long long)io->size, cmd->status);
791 return cmd->status;
793 err_out_close:
794 close(fd);
795 err_out_exit:
796 dnet_log(n, DNET_LOG_ERROR, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
797 dnet_dump_id(&cmd->id), c->file, (unsigned long long)io->offset,
798 (unsigned long long)io->size, cmd->status, err);
799 err_out_exit_no_log:
800 dnet_wakeup(c->wait, c->wait->cond = err ? err : 1);
801 return err;
804 int dnet_read_object(struct dnet_session *s, struct dnet_io_control *ctl)
806 int err;
808 if (!dnet_io_trans_create(s->node, ctl, &err))
809 return err;
811 return 0;
814 static int dnet_read_file_raw_exec(struct dnet_session *s, const char *file, unsigned int len,
815 uint64_t write_offset, uint64_t io_offset, uint64_t io_size,
816 struct dnet_id *id, struct dnet_wait *w)
818 struct dnet_node *n = s->node;
819 struct dnet_io_control ctl;
820 struct dnet_io_completion *c;
821 int err, wait_init = ~0;
823 memset(&ctl, 0, sizeof(struct dnet_io_control));
825 ctl.io.size = io_size;
826 ctl.io.offset = io_offset;
828 ctl.io.type = id->type;
830 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
831 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
833 memcpy(&ctl.id, id, sizeof(struct dnet_id));
835 ctl.fd = -1;
836 ctl.complete = dnet_read_file_complete;
837 ctl.cmd = DNET_CMD_READ;
838 ctl.cflags = DNET_FLAGS_NEED_ACK;
840 c = malloc(sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
841 if (!c) {
842 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate IO completion structure "
843 "for '%s' file reading.\n",
844 dnet_dump_id(&ctl.id), file);
845 err = -ENOMEM;
846 goto err_out_exit;
849 memset(c, 0, sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
851 c->wait = dnet_wait_get(w);
852 c->offset = write_offset;
853 c->file = (char *)(c + 1);
855 sprintf(c->file, "%s", file);
857 ctl.priv = c;
859 w->cond = wait_init;
860 err = dnet_read_object(s, &ctl);
861 if (err)
862 goto err_out_exit;
864 err = dnet_wait_event(w, w->cond != wait_init, &n->wait_ts);
865 if ((err < 0) || (w->cond < 0)) {
866 char id_str[2*DNET_ID_SIZE + 1];
867 if (!err)
868 err = w->cond;
869 dnet_log(n, DNET_LOG_ERROR, "%d:%s '%s' : failed to read data: %d\n",
870 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str),
871 file, err);
872 goto err_out_exit;
875 return 0;
877 err_out_exit:
878 return err;
881 static int dnet_read_file_raw(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
883 struct dnet_node *n = s->node;
884 int err = -ENOENT, len = strlen(file), i;
885 struct dnet_wait *w;
886 int *g, num;
888 w = dnet_wait_alloc(~0);
889 if (!w) {
890 err = -ENOMEM;
891 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting.\n");
892 goto err_out_exit;
895 if (!size)
896 size = ~0ULL;
898 num = dnet_mix_states(s, id, &g);
899 if (num < 0) {
900 err = num;
901 goto err_out_exit;
904 for (i=0; i<num; ++i) {
905 id->group_id = g[i];
907 err = dnet_read_file_raw_exec(s, file, len, 0, offset, size, id, w);
908 if (err)
909 continue;
911 break;
914 dnet_wait_put(w);
915 free(g);
917 err_out_exit:
918 return err;
/* Read object @id (@size bytes from @offset, 0 = everything) into local @file. */
int dnet_read_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(s, file, id, offset, size);

	return err;
}
926 int dnet_read_file(struct dnet_session *s, const char *file, const void *remote, int remote_size,
927 uint64_t offset, uint64_t size, int type)
929 struct dnet_id id;
931 dnet_transform(s->node, remote, remote_size, &id);
932 id.type = type;
934 return dnet_read_file_raw(s, file, &id, offset, size);
937 struct dnet_wait *dnet_wait_alloc(int cond)
939 int err;
940 struct dnet_wait *w;
942 w = malloc(sizeof(struct dnet_wait));
943 if (!w) {
944 err = -ENOMEM;
945 goto err_out_exit;
948 memset(w, 0, sizeof(struct dnet_wait));
950 err = pthread_cond_init(&w->wait, NULL);
951 if (err)
952 goto err_out_exit;
954 err = pthread_mutex_init(&w->wait_lock, NULL);
955 if (err)
956 goto err_out_destroy;
958 w->cond = cond;
959 atomic_init(&w->refcnt, 1);
961 return w;
963 err_out_destroy:
964 pthread_mutex_destroy(&w->wait_lock);
965 err_out_exit:
966 return NULL;
969 void dnet_wait_destroy(struct dnet_wait *w)
971 pthread_mutex_destroy(&w->wait_lock);
972 pthread_cond_destroy(&w->wait);
973 free(w->ret);
974 free(w);
977 static int dnet_send_cmd_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
979 struct dnet_wait *w = priv;
981 if (is_trans_destroyed(st, cmd)) {
982 dnet_wakeup(w, w->cond++);
983 dnet_wait_put(w);
984 return 0;
987 w->status = cmd->status;
989 if (cmd->size) {
990 void *old = w->ret;
991 void *data = cmd + 1;
993 w->ret = realloc(w->ret, w->size + cmd->size);
994 if (!w->ret) {
995 w->ret = old;
996 w->status = -ENOMEM;
997 } else {
998 memcpy(w->ret + w->size, data, cmd->size);
999 w->size += cmd->size;
1003 return w->status;
1006 static int dnet_send_cmd_single(struct dnet_net_state *st, struct dnet_wait *w, struct sph *e, uint64_t cflags)
1008 struct dnet_trans_control ctl;
1010 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1012 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1013 ctl.size = sizeof(struct sph) + e->event_size + e->data_size + e->binary_size;
1014 ctl.cmd = DNET_CMD_EXEC;
1015 ctl.complete = dnet_send_cmd_complete;
1016 ctl.priv = w;
1017 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1019 dnet_convert_sph(e);
1021 ctl.data = e;
1023 return dnet_trans_alloc_send_state(st, &ctl);
1026 static int dnet_send_cmd_raw(struct dnet_session *s, struct dnet_id *id,
1027 struct sph *e, void **ret, uint64_t cflags)
1029 struct dnet_node *n = s->node;
1030 struct dnet_net_state *st;
1031 int err = -ENOENT, num = 0;
1032 struct dnet_wait *w;
1033 struct dnet_group *g;
1035 w = dnet_wait_alloc(0);
1036 if (!w) {
1037 err = -ENOMEM;
1038 goto err_out_exit;
1041 if (id && id->group_id != 0) {
1042 dnet_wait_get(w);
1043 st = dnet_state_get_first(n, id);
1044 if (!st)
1045 goto err_out_put;
1046 err = dnet_send_cmd_single(st, w, e, cflags);
1047 dnet_state_put(st);
1048 num = 1;
1049 } else if (id && id->group_id == 0) {
1050 pthread_mutex_lock(&n->state_lock);
1051 list_for_each_entry(g, &n->group_list, group_entry) {
1052 dnet_wait_get(w);
1054 id->group_id = g->group_id;
1056 st = dnet_state_search_nolock(n, id);
1057 if (st) {
1058 if (st != n->st) {
1059 err = dnet_send_cmd_single(st, w, e, cflags);
1060 num++;
1062 dnet_state_put(st);
1065 pthread_mutex_unlock(&n->state_lock);
1066 } else {
1067 pthread_mutex_lock(&n->state_lock);
1068 list_for_each_entry(g, &n->group_list, group_entry) {
1069 list_for_each_entry(st, &g->state_list, state_entry) {
1070 if (st == n->st)
1071 continue;
1073 dnet_wait_get(w);
1075 err = dnet_send_cmd_single(st, w, e, cflags);
1076 num++;
1079 pthread_mutex_unlock(&n->state_lock);
1082 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1083 if (err)
1084 goto err_out_put;
1086 if (w->ret) {
1087 *ret = w->ret;
1088 w->ret = NULL;
1090 err = w->size;
1093 dnet_wait_put(w);
1095 return err;
1097 err_out_put:
1098 dnet_wait_put(w);
1099 err_out_exit:
1100 return err;
/* Send exec request @e with no extra command flags; see dnet_send_cmd_raw(). */
int dnet_send_cmd(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t extra_flags = 0;

	return dnet_send_cmd_raw(s, id, e, ret, extra_flags);
}
1108 int dnet_send_cmd_nolock(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
1110 return dnet_send_cmd_raw(s, id, e, ret, DNET_FLAGS_NOLOCK);
1113 int dnet_try_reconnect(struct dnet_node *n)
1115 struct dnet_addr_storage *ast, *tmp;
1116 struct dnet_net_state *st;
1117 LIST_HEAD(list);
1118 int s, err, join;
1120 if (list_empty(&n->reconnect_list))
1121 return 0;
1123 pthread_mutex_lock(&n->reconnect_lock);
1124 list_for_each_entry_safe(ast, tmp, &n->reconnect_list, reconnect_entry) {
1125 list_move(&ast->reconnect_entry, &list);
1127 pthread_mutex_unlock(&n->reconnect_lock);
1129 list_for_each_entry_safe(ast, tmp, &list, reconnect_entry) {
1130 s = dnet_socket_create_addr(n, n->sock_type, n->proto, n->family,
1131 (struct sockaddr *)ast->addr.addr, ast->addr.addr_len, 0);
1132 if (s < 0)
1133 goto out_add;
1135 join = DNET_WANT_RECONNECT;
1136 if (ast->__join_state == DNET_JOIN)
1137 join = DNET_JOIN;
1139 st = dnet_add_state_socket(n, &ast->addr, s, &err, join);
1140 if (st)
1141 goto out_remove;
1143 dnet_sock_close(s);
1145 if (err == -EEXIST || err == -EINVAL)
1146 goto out_remove;
1148 out_add:
1149 dnet_add_reconnect_state(n, &ast->addr, ast->__join_state);
1150 out_remove:
1151 list_del(&ast->reconnect_entry);
1152 free(ast);
1155 return 0;
1158 int dnet_lookup_object(struct dnet_session *s, struct dnet_id *id, uint64_t cflags,
1159 int (* complete)(struct dnet_net_state *, struct dnet_cmd *, void *),
1160 void *priv)
1162 struct dnet_node *n = s->node;
1163 struct dnet_io_req req;
1164 struct dnet_trans *t;
1165 struct dnet_cmd *cmd;
1166 int err;
1168 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
1169 if (!t) {
1170 err = -ENOMEM;
1171 goto err_out_complete;
1173 t->complete = complete;
1174 t->priv = priv;
1176 cmd = (struct dnet_cmd *)(t + 1);
1178 memcpy(&cmd->id, id, sizeof(struct dnet_id));
1180 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
1182 cmd->cmd = t->command = DNET_CMD_LOOKUP;
1183 cmd->flags = cflags | DNET_FLAGS_NEED_ACK;
1185 t->st = dnet_state_get_first(n, &cmd->id);
1186 if (!t->st) {
1187 err = -ENOENT;
1188 goto err_out_destroy;
1191 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
1192 dnet_convert_cmd(cmd);
1194 dnet_log(n, DNET_LOG_NOTICE, "%s: lookup to %s.\n", dnet_dump_id(id), dnet_server_convert_dnet_addr(&t->st->addr));
1196 memset(&req, 0, sizeof(req));
1197 req.st = t->st;
1198 req.header = cmd;
1199 req.hsize = sizeof(struct dnet_cmd);
1201 err = dnet_trans_send(t, &req);
1202 if (err)
1203 goto err_out_destroy;
1205 return 0;
1207 err_out_complete:
1208 if (complete)
1209 complete(NULL, NULL, priv);
1210 return err;
1212 err_out_destroy:
1213 dnet_trans_put(t);
1214 return err;
1217 int dnet_lookup_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1219 struct dnet_wait *w = priv;
1220 struct dnet_node *n = NULL;
1221 struct dnet_addr_attr *a;
1222 struct dnet_net_state *other;
1223 char addr_str[128] = "no-address";
1224 int err;
1226 if (is_trans_destroyed(st, cmd)) {
1227 dnet_wakeup(w, w->cond++);
1228 dnet_wait_put(w);
1229 return 0;
1231 n = st->n;
1233 err = cmd->status;
1234 if (err || !cmd->size)
1235 goto err_out_exit;
1237 if (cmd->size < sizeof(struct dnet_addr_attr)) {
1238 dnet_log(st->n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1239 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size, sizeof(struct dnet_addr_attr));
1240 err = -EINVAL;
1241 goto err_out_exit;
1244 a = (struct dnet_addr_attr *)(cmd + 1);
1246 dnet_convert_addr_attr(a);
1247 dnet_server_convert_dnet_addr_raw(&a->addr, addr_str, sizeof(addr_str));
1249 if (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
1250 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
1252 dnet_convert_file_info(info);
1254 dnet_log_raw(n, DNET_LOG_NOTICE, "%s: lookup object: %s: "
1255 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1256 dnet_dump_id(&cmd->id), addr_str,
1257 (unsigned long long)info->offset, (unsigned long long)info->size,
1258 (unsigned long long)info->mode, (char *)(info + 1));
1259 } else {
1260 dnet_log_raw(n, DNET_LOG_INFO, "%s: lookup object: %s\n",
1261 dnet_dump_id(&cmd->id), addr_str);
1265 other = dnet_state_search_by_addr(n, &a->addr);
1266 if (other) {
1267 dnet_state_put(other);
1268 } else {
1269 dnet_recv_route_list(st);
1272 return 0;
1274 err_out_exit:
1275 if (n)
1276 dnet_log(n, DNET_LOG_ERROR, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd->id), cmd->status, err);
1278 return err;
1281 int dnet_lookup(struct dnet_session *s, const char *file)
1283 struct dnet_node *n = s->node;
1284 int err, error = 0, i;
1285 struct dnet_wait *w;
1286 struct dnet_id raw;
1288 w = dnet_wait_alloc(0);
1289 if (!w) {
1290 err = -ENOMEM;
1291 goto err_out_exit;
1294 dnet_transform(n, file, strlen(file), &raw);
1296 for (i=0; i<s->group_num; ++i) {
1297 raw.group_id = s->groups[i];
1299 err = dnet_lookup_object(s, &raw, 0, dnet_lookup_complete, dnet_wait_get(w));
1300 if (err) {
1301 error = err;
1302 continue;
1305 err = dnet_wait_event(w, w->cond == 1, &n->wait_ts);
1306 if (err || w->status) {
1307 if (!err)
1308 err = w->status;
1309 error = err;
1310 continue;
1313 error = 0;
1314 break;
1317 dnet_wait_put(w);
1318 return error;
1320 err_out_exit:
1321 return err;
1324 struct dnet_addr *dnet_state_addr(struct dnet_net_state *st)
1326 return &st->addr;
1329 static int dnet_stat_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1331 struct dnet_wait *w = priv;
1332 float la[3];
1333 struct dnet_stat *st;
1334 int err = -EINVAL;
1336 if (is_trans_destroyed(state, cmd)) {
1337 dnet_wakeup(w, w->cond++);
1338 dnet_wait_put(w);
1339 return 0;
1342 if (cmd->cmd == DNET_CMD_STAT && cmd->size == sizeof(struct dnet_stat)) {
1343 st = (struct dnet_stat *)(cmd + 1);
1345 dnet_convert_stat(st);
1347 la[0] = (float)st->la[0] / 100.0;
1348 la[1] = (float)st->la[1] / 100.0;
1349 la[2] = (float)st->la[2] / 100.0;
1351 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: la: %.2f %.2f %.2f.\n",
1352 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1353 la[0], la[1], la[2]);
1354 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: mem: "
1355 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1356 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1357 (unsigned long long)st->vm_total,
1358 (unsigned long long)st->vm_free,
1359 (unsigned long long)st->vm_cached);
1360 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: fs: "
1361 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1362 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1363 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
1364 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
1365 (unsigned long long)st->files, (unsigned long long)st->fsid);
1366 err = 0;
1367 } else if (cmd->size >= sizeof(struct dnet_addr_stat) && cmd->cmd == DNET_CMD_STAT_COUNT) {
1368 struct dnet_addr_stat *as = (struct dnet_addr_stat *)(cmd + 1);
1369 int i;
1371 dnet_convert_addr_stat(as, 0);
1373 for (i=0; i<as->num; ++i) {
1374 if (as->num > as->cmd_num) {
1375 if (i == 0)
1376 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Storage commands\n",
1377 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1378 if (i == as->cmd_num)
1379 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Proxy commands\n",
1380 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1381 if (i == as->cmd_num * 2)
1382 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Counters\n",
1383 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1385 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1386 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1387 dnet_counter_string(i, as->cmd_num),
1388 (unsigned long long)as->count[i].count, (unsigned long long)as->count[i].err);
1392 return err;
/*
 * Send one transaction described by @ctl: directly to @st when given,
 * otherwise through dnet_trans_alloc_send(), presumably routed by ctl->id.
 */
static int dnet_request_cmd_single(struct dnet_session *s, struct dnet_net_state *st, struct dnet_trans_control *ctl)
{
	return st ? dnet_trans_alloc_send_state(st, ctl) : dnet_trans_alloc_send(s, ctl);
}
1403 int dnet_request_stat(struct dnet_session *s, struct dnet_id *id,
1404 unsigned int cmd, uint64_t cflags,
1405 int (* complete)(struct dnet_net_state *state,
1406 struct dnet_cmd *cmd,
1407 void *priv),
1408 void *priv)
1410 struct dnet_node *n = s->node;
1411 struct dnet_trans_control ctl;
1412 struct dnet_wait *w = NULL;
1413 int err, num = 0;
1414 struct timeval start, end;
1415 long diff;
1417 gettimeofday(&start, NULL);
1419 if (!complete) {
1420 w = dnet_wait_alloc(0);
1421 if (!w) {
1422 err = -ENOMEM;
1423 goto err_out_exit;
1426 complete = dnet_stat_complete;
1427 priv = w;
1430 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1432 ctl.cmd = cmd;
1433 ctl.complete = complete;
1434 ctl.priv = priv;
1435 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | cflags;
1437 if (id) {
1438 if (w)
1439 dnet_wait_get(w);
1441 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1443 err = dnet_request_cmd_single(s, NULL, &ctl);
1444 num = 1;
1445 } else {
1446 struct dnet_net_state *st;
1447 struct dnet_group *g;
1450 pthread_mutex_lock(&n->state_lock);
1451 list_for_each_entry(g, &n->group_list, group_entry) {
1452 list_for_each_entry(st, &g->state_list, state_entry) {
1453 if (st == n->st)
1454 continue;
1456 if (w)
1457 dnet_wait_get(w);
1459 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1460 dnet_request_cmd_single(s, st, &ctl);
1461 num++;
1464 pthread_mutex_unlock(&n->state_lock);
1467 if (!w) {
1468 gettimeofday(&end, NULL);
1469 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1470 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd), diff, num);
1472 return num;
1475 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1477 gettimeofday(&end, NULL);
1478 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1479 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd), diff, err, num);
1481 if (err)
1482 goto err_out_put;
1484 dnet_wait_put(w);
1486 return num;
1488 err_out_put:
1489 dnet_wait_put(w);
1490 err_out_exit:
1491 return err;
1494 struct dnet_request_cmd_priv {
1495 struct dnet_wait *w;
1497 int (* complete)(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv);
1498 void *priv;
1501 static int dnet_request_cmd_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1503 struct dnet_request_cmd_priv *p = priv;
1504 int err = p->complete(state, cmd, p->priv);
1506 if (is_trans_destroyed(state, cmd)) {
1507 struct dnet_wait *w = p->w;
1509 dnet_wakeup(w, w->cond++);
1510 if (atomic_read(&w->refcnt) == 1)
1511 free(p);
1512 dnet_wait_put(w);
1515 return err;
1518 int dnet_request_cmd(struct dnet_session *s, struct dnet_trans_control *ctl)
1520 struct dnet_node *n = s->node;
1521 int err, num = 0;
1522 struct dnet_request_cmd_priv *p;
1523 struct dnet_wait *w;
1524 struct dnet_net_state *st;
1525 struct dnet_group *g;
1526 struct timeval start, end;
1527 long diff;
1529 gettimeofday(&start, NULL);
1531 p = malloc(sizeof(*p));
1532 if (!p) {
1533 err = -ENOMEM;
1534 goto err_out_exit;
1537 w = dnet_wait_alloc(0);
1538 if (!w) {
1539 err = -ENOMEM;
1540 goto err_out_free;
1543 p->w = w;
1544 p->complete = ctl->complete;
1545 p->priv = ctl->priv;
1547 ctl->complete = dnet_request_cmd_complete;
1548 ctl->priv = p;
1550 pthread_mutex_lock(&n->state_lock);
1551 list_for_each_entry(g, &n->group_list, group_entry) {
1552 list_for_each_entry(st, &g->state_list, state_entry) {
1553 if (st == n->st)
1554 continue;
1556 dnet_wait_get(w);
1558 ctl->id.group_id = g->group_id;
1560 if (!(ctl->cflags & DNET_FLAGS_DIRECT))
1561 dnet_setup_id(&ctl->id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1562 dnet_request_cmd_single(s, st, ctl);
1563 num++;
1566 pthread_mutex_unlock(&n->state_lock);
1568 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1570 gettimeofday(&end, NULL);
1571 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1572 dnet_log(n, DNET_LOG_NOTICE, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl->cmd), diff, err, num);
1574 if (!err)
1575 err = num;
1577 if (atomic_read(&w->refcnt) == 1)
1578 free(p);
1579 dnet_wait_put(w);
1581 return err;
1583 err_out_free:
1584 free(p);
1585 err_out_exit:
1586 return err;
1589 struct dnet_update_status_priv {
1590 struct dnet_wait *w;
1591 struct dnet_node_status status;
1592 atomic_t refcnt;
1595 static int dnet_update_status_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1597 struct dnet_update_status_priv *p = priv;
1599 if (is_trans_destroyed(state, cmd)) {
1600 dnet_wakeup(p->w, p->w->cond++);
1601 dnet_wait_put(p->w);
1602 if (atomic_dec_and_test(&p->refcnt))
1603 free(p);
1606 if (cmd->size == sizeof(struct dnet_node_status)) {
1607 memcpy(&p->status, cmd + 1, sizeof(struct dnet_node_status));
1608 return 0;
1611 return -ENOENT;
1614 int dnet_update_status(struct dnet_session *s, struct dnet_addr *addr, struct dnet_id *id, struct dnet_node_status *status)
1616 int err;
1617 struct dnet_update_status_priv *priv;
1618 struct dnet_trans_control ctl;
1620 if (!id && !addr) {
1621 err = -EINVAL;
1622 goto err_out_exit;
1625 memset(&ctl, 0, sizeof(ctl));
1627 if (id) {
1628 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1629 } else {
1630 struct dnet_net_state *st;
1632 st = dnet_state_search_by_addr(s->node, addr);
1633 if (!st) {
1634 err = -ENOENT;
1635 goto err_out_exit;
1638 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1639 dnet_state_put(st);
1642 priv = malloc(sizeof(struct dnet_update_status_priv));
1643 if (!priv) {
1644 err = -ENOMEM;
1645 goto err_out_exit;
1648 priv->w = dnet_wait_alloc(0);
1649 if (!priv->w) {
1650 err = -ENOMEM;
1651 goto err_out_exit;
1654 ctl.complete = dnet_update_status_complete;
1655 ctl.priv = priv;
1656 ctl.cmd = DNET_CMD_STATUS;
1657 ctl.cflags = DNET_FLAGS_NEED_ACK;
1658 ctl.size = sizeof(struct dnet_node_status);
1659 ctl.data = status;
1661 dnet_wait_get(priv->w);
1662 dnet_request_cmd_single(s, NULL, &ctl);
1664 err = dnet_wait_event(priv->w, priv->w->cond == 1, &s->node->wait_ts);
1665 dnet_wait_put(priv->w);
1666 if (!err && priv) {
1667 memcpy(status, &priv->status, sizeof(struct dnet_node_status));
1669 if (atomic_dec_and_test(&priv->refcnt))
1670 free(priv);
1672 err_out_exit:
1673 return err;
1676 static int dnet_remove_object_raw(struct dnet_session *s, struct dnet_id *id,
1677 int (* complete)(struct dnet_net_state *state,
1678 struct dnet_cmd *cmd,
1679 void *priv),
1680 void *priv, uint64_t cflags, uint64_t ioflags)
1682 struct dnet_io_control ctl;
1683 int err;
1685 memset(&ctl, 0, sizeof(struct dnet_io_control));
1687 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1689 memcpy(&ctl.io.id, id->id, DNET_ID_SIZE);
1690 memcpy(&ctl.io.parent, id->id, DNET_ID_SIZE);
1691 ctl.io.flags = ioflags;
1693 ctl.fd = -1;
1695 ctl.cmd = DNET_CMD_DEL;
1696 ctl.complete = complete;
1697 ctl.priv = priv;
1698 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1700 err = dnet_trans_create_send_all(s, &ctl);
1701 if (err == 0)
1702 err = -ECONNRESET;
1703 if (err > 0)
1704 err = 0;
1706 return err;
1709 static int dnet_remove_complete(struct dnet_net_state *state,
1710 struct dnet_cmd *cmd,
1711 void *priv)
1713 struct dnet_wait *w = priv;
1715 if (is_trans_destroyed(state, cmd)) {
1716 dnet_wakeup(w, w->cond++);
1717 dnet_wait_put(w);
1718 return 0;
1721 if (cmd->status)
1722 w->status = cmd->status;
1723 return cmd->status;
1726 int dnet_remove_object(struct dnet_session *s, struct dnet_id *id,
1727 int (* complete)(struct dnet_net_state *state,
1728 struct dnet_cmd *cmd,
1729 void *priv),
1730 void *priv,
1731 uint64_t cflags, uint64_t ioflags)
1733 struct dnet_wait *w = NULL;
1734 int err;
1736 if (!complete) {
1737 w = dnet_wait_alloc(0);
1738 if (!w) {
1739 err = -ENOMEM;
1740 goto err_out_exit;
1743 complete = dnet_remove_complete;
1744 priv = w;
1745 dnet_wait_get(w);
1748 err = dnet_remove_object_raw(s, id, complete, priv, cflags, ioflags);
1749 if (err < 0)
1750 goto err_out_put;
1752 if (w) {
1753 err = dnet_wait_event(w, w->cond != err, &s->node->wait_ts);
1754 if (err)
1755 goto err_out_put;
1757 if (w->status < 0) {
1758 err = w->status;
1759 goto err_out_put;
1762 dnet_wait_put(w);
1764 return 0;
1766 err_out_put:
1767 if (w)
1768 dnet_wait_put(w);
1769 err_out_exit:
1770 return err;
1773 static int dnet_remove_file_raw(struct dnet_session *s, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1775 struct dnet_wait *w;
1776 int err, num;
1778 w = dnet_wait_alloc(0);
1779 if (!w) {
1780 err = -ENOMEM;
1781 goto err_out_exit;
1784 atomic_add(&w->refcnt, 1024);
1785 err = dnet_remove_object_raw(s, id, dnet_remove_complete, w, cflags, ioflags);
1786 if (err < 0) {
1787 atomic_sub(&w->refcnt, 1024);
1788 goto err_out_put;
1791 num = err;
1792 atomic_sub(&w->refcnt, 1024 - num);
1794 err = dnet_wait_event(w, w->cond == num, &s->node->wait_ts);
1795 if (err)
1796 goto err_out_put;
1798 if (w->status < 0) {
1799 err = w->status;
1800 goto err_out_put;
1804 dnet_wait_put(w);
1806 return 0;
1808 err_out_put:
1809 dnet_wait_put(w);
1810 err_out_exit:
1811 return err;
1814 int dnet_remove_object_now(struct dnet_session *s, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1816 return dnet_remove_file_raw(s, id, cflags | DNET_FLAGS_NEED_ACK | DNET_ATTR_DELETE_HISTORY, ioflags);
1819 int dnet_remove_file(struct dnet_session *s, char *remote, int remote_len, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1821 struct dnet_id raw;
1823 if (!id) {
1824 dnet_transform(s->node, remote, remote_len, &raw);
1825 raw.group_id = 0;
1826 id = &raw;
1829 return dnet_remove_file_raw(s, id, cflags, ioflags);
1832 int dnet_request_ids(struct dnet_session *s, struct dnet_id *id, uint64_t cflags,
1833 int (* complete)(struct dnet_net_state *state,
1834 struct dnet_cmd *cmd,
1835 void *priv),
1836 void *priv)
1838 struct dnet_trans_control ctl;
1840 dnet_log_raw(s->node, DNET_LOG_ERROR, "Temporarily unsupported operation.\n");
1841 exit(-1);
1843 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1845 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1846 ctl.cmd = DNET_CMD_LIST;
1847 ctl.complete = complete;
1848 ctl.priv = priv;
1849 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1851 return dnet_trans_alloc_send(s, &ctl);
1854 struct dnet_node *dnet_get_node_from_state(void *state)
1856 struct dnet_net_state *st = state;
1858 if (!st)
1859 return NULL;
1860 return st->n;
1863 struct dnet_read_data_completion {
1864 struct dnet_wait *w;
1865 void *data;
1866 uint64_t size;
1867 atomic_t refcnt;
1870 static int dnet_read_data_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1872 struct dnet_read_data_completion *c = priv;
1873 struct dnet_wait *w = c->w;
1874 int err = -EINVAL;
1876 if (is_trans_destroyed(st, cmd)) {
1877 dnet_wakeup(w, w->cond++);
1878 dnet_wait_put(w);
1879 if (atomic_dec_and_test(&c->refcnt))
1880 free(c);
1881 return err;
1884 err = cmd->status;
1885 if (err)
1886 w->status = err;
1888 if (cmd->size >= sizeof(struct dnet_io_attr)) {
1889 struct dnet_io_attr *io = (struct dnet_io_attr *)(cmd + 1);
1890 uint64_t sz = c->size;
1892 dnet_convert_io_attr(io);
1894 sz += io->size + sizeof(struct dnet_io_attr);
1895 c->data = realloc(c->data, sz);
1896 if (!c->data) {
1897 err = -ENOMEM;
1898 goto err_out_exit;
1901 memcpy(c->data + c->size, io, sizeof(struct dnet_io_attr) + io->size);
1902 c->size = sz;
1905 err_out_exit:
1906 dnet_log(st->n, DNET_LOG_NOTICE, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1907 dnet_dump_id(&cmd->id), (unsigned long long)(cmd->trans & ~DNET_TRANS_REPLY),
1908 cmd->status, err);
1910 return err;
1913 void *dnet_read_data_wait_raw(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io,
1914 int cmd, uint64_t cflags, int *errp)
1916 struct dnet_node *n = s->node;
1917 struct dnet_io_control ctl;
1918 struct dnet_wait *w;
1919 struct dnet_read_data_completion *c;
1920 void *data = NULL;
1921 int err;
1923 w = dnet_wait_alloc(0);
1924 if (!w) {
1925 err = -ENOMEM;
1926 goto err_out_exit;
1929 c = malloc(sizeof(*c));
1930 if (!c) {
1931 err = -ENOMEM;
1932 goto err_out_put;
1935 c->w = w;
1936 c->size = 0;
1937 c->data = NULL;
1938 /* one for completion callback, another for this function */
1939 atomic_init(&c->refcnt, 2);
1941 memset(&ctl, 0, sizeof(struct dnet_io_control));
1943 ctl.fd = -1;
1945 ctl.priv = c;
1946 ctl.complete = dnet_read_data_complete;
1948 ctl.cmd = cmd;
1949 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1951 memcpy(&ctl.io, io, sizeof(struct dnet_io_attr));
1952 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1954 ctl.id.type = io->type;
1956 dnet_wait_get(w);
1957 err = dnet_read_object(s, &ctl);
1958 if (err)
1959 goto err_out_put_complete;
1961 err = dnet_wait_event(w, w->cond, &n->wait_ts);
1962 if (err || w->status) {
1963 char id_str[2*DNET_ID_SIZE + 1];
1964 if (!err)
1965 err = w->status;
1966 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
1967 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
1968 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
1969 goto err_out_put_complete;
1971 io->size = c->size;
1972 data = c->data;
1973 err = 0;
1975 err_out_put_complete:
1976 if (atomic_dec_and_test(&c->refcnt))
1977 free(c);
1978 err_out_put:
1979 dnet_wait_put(w);
1980 err_out_exit:
1981 *errp = err;
1982 return data;
1985 static int dnet_read_recover(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io, void *data, uint64_t cflags)
1987 struct dnet_node *n = s->node;
1988 struct dnet_meta_container mc;
1989 struct dnet_io_control ctl;
1990 void *result;
1991 int err;
1993 err = dnet_read_meta(s, &mc, NULL, 0, id);
1994 if (err) {
1995 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id), err);
1996 goto err_out_exit;
1999 memset(&ctl, 0, sizeof(struct dnet_io_control));
2001 ctl.id = *id;
2002 ctl.io = *io;
2004 ctl.data = data + sizeof(struct dnet_io_attr);
2005 ctl.io.size -= sizeof(struct dnet_io_attr);
2007 ctl.fd = -1;
2008 ctl.cmd = DNET_CMD_WRITE;
2009 ctl.cflags = cflags;
2011 err = dnet_write_data_wait(s, &ctl, &result);
2012 if (err < 0) {
2013 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id), err);
2014 goto err_out_free_meta;
2017 err = dnet_write_metadata(s, &mc, 0, cflags);
2018 if (err < 0)
2019 goto err_out_free_result;
2021 err_out_free_result:
2022 free(result);
2023 err_out_free_meta:
2024 free(mc.data);
2025 err_out_exit:
2026 return err;
2029 void *dnet_read_data_wait_groups(struct dnet_session *s, struct dnet_id *id, int *groups, int num,
2030 struct dnet_io_attr *io, uint64_t cflags, int *errp)
2032 int i;
2033 void *data;
2035 for (i = 0; i < num; ++i) {
2036 id->group_id = groups[i];
2038 data = dnet_read_data_wait_raw(s, id, io, DNET_CMD_READ, cflags, errp);
2039 if (data) {
2040 if ((i != 0) && (io->type == 0) && (io->offset == 0) && (io->size > sizeof(struct dnet_io_attr))) {
2041 dnet_read_recover(s, id, io, data, cflags);
2044 *errp = 0;
2045 return data;
2049 return NULL;
/*
 * Read @id from the session's groups, ordered by dnet_mix_states().
 *
 * Returns the data (caller frees) with *errp = 0, or NULL with a negative
 * error in *errp (-ENOENT when no group is available).
 */
void *dnet_read_data_wait(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io,
		uint64_t cflags, int *errp)
{
	int num, *g, err;
	void *data = NULL;

	num = dnet_mix_states(s, id, &g);
	if (num < 0) {
		err = num;
		goto err_out_exit;
	}

	/* zero groups left after mixing: report it instead of an uninitialized err */
	if (num == 0) {
		err = -ENOENT;
		goto err_out_free;
	}

	data = dnet_read_data_wait_groups(s, id, g, num, io, cflags, &err);

err_out_free:
	free(g);
err_out_exit:
	*errp = err;
	return data;
}
2075 int dnet_write_data_wait(struct dnet_session *s, struct dnet_io_control *ctl, void **result)
2077 struct dnet_node *n = s->node;
2078 int err, trans_num = 0;
2079 struct dnet_wait *w;
2080 struct dnet_write_completion *wc;
2082 wc = malloc(sizeof(struct dnet_write_completion));
2083 if (!wc) {
2084 err = -ENOMEM;
2085 goto err_out_exit;
2087 memset(wc, 0, sizeof(struct dnet_write_completion));
2089 w = dnet_wait_alloc(0);
2090 if (!w) {
2091 err = -ENOMEM;
2092 free(wc);
2093 goto err_out_exit;
2095 wc->wait = w;
2097 w->status = -ENOENT;
2098 ctl->priv = wc;
2099 ctl->complete = dnet_write_complete;
2101 ctl->cmd = DNET_CMD_WRITE;
2102 ctl->cflags |= DNET_FLAGS_NEED_ACK;
2104 memcpy(ctl->io.id, ctl->id.id, DNET_ID_SIZE);
2106 atomic_set(&w->refcnt, INT_MAX);
2107 trans_num = dnet_write_object(s, ctl);
2108 if (trans_num < 0)
2109 trans_num = 0;
2112 * 1 - the first reference counter we grabbed at allocation time
2114 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
2116 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
2117 if (err || w->status) {
2118 if (!err)
2119 err = w->status;
2120 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2121 dnet_dump_id(&ctl->id), err, w->status);
2124 if (err || !trans_num) {
2125 if (!err)
2126 err = -EINVAL;
2127 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
2128 goto err_out_put;
2131 if (trans_num)
2132 dnet_log(n, DNET_LOG_NOTICE, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2133 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, ctl->io.type, wc->size);
2134 err = trans_num;
2136 *result = wc->reply;
2137 err = wc->size;
2139 wc->reply = NULL;
2141 err_out_put:
2142 dnet_write_complete_free(wc);
2143 err_out_exit:
2144 return err;
2147 int dnet_lookup_addr(struct dnet_session *s, const void *remote, int len, struct dnet_id *id, int group_id, char *dst, int dlen)
2149 struct dnet_node *n = s->node;
2150 struct dnet_id raw;
2151 struct dnet_net_state *st;
2152 int err = -ENOENT;
2154 if (!id) {
2155 dnet_transform(n, remote, len, &raw);
2156 id = &raw;
2158 id->group_id = group_id;
2160 st = dnet_state_get_first(n, id);
2161 if (!st)
2162 goto err_out_exit;
2164 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st), dst, dlen);
2165 dnet_state_put(st);
2166 err = 0;
2168 err_out_exit:
2169 return err;
/* A candidate group together with its selection weight. */
struct dnet_weight {
	int weight;
	int group_id;
};

/*
 * qsort() comparator ordering entries from the largest weight to the
 * smallest. Compares instead of subtracting, since the difference of two
 * arbitrary ints can overflow (undefined behaviour).
 */
static int dnet_weight_compare(const void *v1, const void *v2)
{
	const struct dnet_weight *w1 = v1;
	const struct dnet_weight *w2 = v2;

	return (w1->weight < w2->weight) - (w1->weight > w2->weight);
}

/*
 * Pick a random index in [0, @num), each entry being chosen roughly in
 * proportion to its weight (weights are assumed to be non-negative).
 * Falls back to the last entry if rounding leaves the cursor positive.
 */
static int dnet_weight_get_winner(struct dnet_weight *w, int num)
{
	long long sum = 0, pos;
	double r;
	int i;

	/* long long: a long may be 32-bit and several INT_MAX weights overflow it */
	for (i = 0; i < num; ++i)
		sum += w[i].weight;

	/* double keeps enough precision for large sums, float does not */
	r = (double)rand() / (double)RAND_MAX;
	pos = (long long)(r * (double)sum);

	for (i = 0; i < num; ++i) {
		pos -= w[i].weight;
		if (pos <= 0)
			return i;
	}

	return num - 1;
}
2206 int dnet_mix_states(struct dnet_session *s, struct dnet_id *id, int **groupsp)
2208 struct dnet_node *n = s->node;
2209 struct dnet_weight *weights;
2210 int *groups;
2211 int group_num, i, num;
2212 struct dnet_net_state *st;
2214 if (!s->group_num)
2215 return -ENOENT;
2217 group_num = s->group_num;
2219 weights = alloca(s->group_num * sizeof(*weights));
2220 groups = malloc(s->group_num * sizeof(*groups));
2221 if (groups)
2222 memcpy(groups, s->groups, s->group_num * sizeof(*groups));
2224 if (!groups) {
2225 *groupsp = NULL;
2226 return -ENOMEM;
2229 if (n->flags & DNET_CFG_RANDOMIZE_STATES) {
2230 for (i = 0; i < group_num; ++i) {
2231 weights[i].weight = rand();
2232 weights[i].group_id = groups[i];
2234 num = group_num;
2235 } else {
2236 if (!(n->flags & DNET_CFG_MIX_STATES)) {
2237 *groupsp = groups;
2238 return group_num;
2241 memset(weights, 0, group_num * sizeof(*weights));
2243 for (i = 0, num = 0; i < group_num; ++i) {
2244 id->group_id = groups[i];
2246 st = dnet_state_get_first(n, id);
2247 if (st) {
2248 weights[num].weight = (int)st->weight;
2249 weights[num].group_id = id->group_id;
2251 dnet_state_put(st);
2253 num++;
2258 group_num = num;
2259 if (group_num) {
2260 qsort(weights, group_num, sizeof(struct dnet_weight), dnet_weight_compare);
2262 for (i = 0; i < group_num; ++i) {
2263 int pos = dnet_weight_get_winner(weights, group_num - i);
2264 groups[i] = weights[pos].group_id;
2266 if (pos < group_num - 1)
2267 memmove(&weights[pos], &weights[pos + 1], (group_num - 1 - pos) * sizeof(struct dnet_weight));
2271 dnet_session_set_groups(s, groups, group_num);
2273 *groupsp = groups;
2274 return group_num;
2277 int dnet_data_map(struct dnet_map_fd *map)
2279 uint64_t off;
2280 long page_size = sysconf(_SC_PAGE_SIZE);
2281 int err = 0;
2283 off = map->offset & ~(page_size - 1);
2284 map->mapped_size = ALIGN(map->size + map->offset - off, page_size);
2286 map->mapped_data = mmap(NULL, map->mapped_size, PROT_READ, MAP_SHARED, map->fd, off);
2287 if (map->mapped_data == MAP_FAILED) {
2288 err = -errno;
2289 goto err_out_exit;
2292 map->data = map->mapped_data + map->offset - off;
2294 err_out_exit:
2295 return err;
2298 void dnet_data_unmap(struct dnet_map_fd *map)
2300 munmap(map->mapped_data, map->mapped_size);
2303 struct dnet_io_attr *dnet_remove_range(struct dnet_session *s, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *ret_num, int *errp)
2305 struct dnet_node *n = s->node;
2306 struct dnet_id id;
2307 struct dnet_io_attr *ret, *new_ret;
2308 struct dnet_raw_id start, next;
2309 struct dnet_raw_id end;
2310 uint64_t size = io->size;
2311 void *data;
2312 int err, need_exit = 0;
2314 memcpy(end.id, io->parent, DNET_ID_SIZE);
2316 dnet_setup_id(&id, group_id, io->id);
2317 id.type = io->type;
2319 ret = NULL;
2320 *ret_num = 0;
2321 while (!need_exit) {
2322 err = dnet_search_range(n, &id, &start, &next);
2323 if (err)
2324 goto err_out_exit;
2326 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2327 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2328 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2329 memcpy(next.id, end.id, DNET_ID_SIZE);
2330 need_exit = 1;
2333 if (n->log->log_level > DNET_LOG_NOTICE) {
2334 int len = 6;
2335 char start_id[2*len + 1];
2336 char next_id[2*len + 1];
2337 char end_id[2*len + 1];
2338 char id_str[2*len + 1];
2340 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2341 dnet_dump_id_len_raw(id.id, len, id_str),
2342 dnet_dump_id_len_raw(start.id, len, start_id),
2343 dnet_dump_id_len_raw(next.id, len, next_id),
2344 dnet_dump_id_len_raw(end.id, len, end_id),
2345 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2348 memcpy(io->id, id.id, DNET_ID_SIZE);
2349 memcpy(io->parent, next.id, DNET_ID_SIZE);
2351 io->size = size;
2353 data = dnet_read_data_wait_raw(s, &id, io, DNET_CMD_DEL_RANGE, cflags, &err);
2354 if (io->size != sizeof(struct dnet_io_attr)) {
2355 err = -ENOENT;
2356 goto err_out_exit;
2359 if (data) {
2360 struct dnet_io_attr *rep = (struct dnet_io_attr*)data;
2362 dnet_convert_io_attr(rep);
2364 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2365 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2366 (unsigned long long)io->num, (unsigned long long)io->size);
2368 (*ret_num)++;
2370 new_ret = realloc(ret, *ret_num * sizeof(struct dnet_io_attr));
2371 if (!new_ret) {
2372 err = -ENOMEM;
2373 goto err_out_exit;
2376 ret = new_ret;
2377 ret[*ret_num - 1] = *rep;
2379 free(data);
2382 memcpy(id.id, next.id, DNET_ID_SIZE);
2385 err_out_exit:
2386 *errp = err;
2388 return ret;
2391 struct dnet_range_data *dnet_read_range(struct dnet_session *s, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *errp)
2393 struct dnet_node *n = s->node;
2394 struct dnet_id id;
2395 int ret_num;
2396 struct dnet_range_data *ret;
2397 struct dnet_raw_id start, next;
2398 struct dnet_raw_id end;
2399 uint64_t size = io->size;
2400 void *data;
2401 int err, need_exit = 0;
2403 memcpy(end.id, io->parent, DNET_ID_SIZE);
2405 dnet_setup_id(&id, group_id, io->id);
2406 id.type = io->type;
2408 ret = NULL;
2409 ret_num = 0;
2410 while (!need_exit) {
2411 err = dnet_search_range(n, &id, &start, &next);
2412 if (err)
2413 goto err_out_exit;
2415 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2416 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2417 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2418 memcpy(next.id, end.id, DNET_ID_SIZE);
2419 need_exit = 1;
2422 if (n->log->log_level > DNET_LOG_NOTICE) {
2423 int len = 6;
2424 char start_id[2*len + 1];
2425 char next_id[2*len + 1];
2426 char end_id[2*len + 1];
2427 char id_str[2*len + 1];
2429 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2430 dnet_dump_id_len_raw(id.id, len, id_str),
2431 dnet_dump_id_len_raw(start.id, len, start_id),
2432 dnet_dump_id_len_raw(next.id, len, next_id),
2433 dnet_dump_id_len_raw(end.id, len, end_id),
2434 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2437 memcpy(io->id, id.id, DNET_ID_SIZE);
2438 memcpy(io->parent, next.id, DNET_ID_SIZE);
2440 io->size = size;
2442 data = dnet_read_data_wait_raw(s, &id, io, DNET_CMD_READ_RANGE, cflags, &err);
2443 if (data) {
2444 struct dnet_io_attr *rep = data + io->size - sizeof(struct dnet_io_attr);
2446 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2447 if (!(io->flags & DNET_IO_FLAGS_NODATA))
2448 io->size -= sizeof(struct dnet_io_attr);
2449 dnet_convert_io_attr(rep);
2451 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2452 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2453 (unsigned long long)io->num, (unsigned long long)io->size);
2455 if (io->start < rep->num) {
2456 rep->num -= io->start;
2457 io->start = 0;
2458 io->num -= rep->num;
2460 if (!io->size && !(io->flags & DNET_IO_FLAGS_NODATA)) {
2461 free(data);
2462 } else {
2463 struct dnet_range_data *new_ret;
2465 ret_num++;
2467 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2468 if (!new_ret) {
2469 goto err_out_exit;
2472 ret = new_ret;
2474 ret[ret_num - 1].data = data;
2475 ret[ret_num - 1].size = io->size;
2478 err = 0;
2479 if (!io->num)
2480 break;
2481 } else {
2482 io->start -= rep->num;
2486 memcpy(id.id, next.id, DNET_ID_SIZE);
2489 err_out_exit:
2490 if (ret) {
2491 *errp = ret_num;
2492 } else {
2493 *errp = err;
2495 return ret;
2498 struct dnet_read_latest_id {
2499 struct dnet_id id;
2500 struct dnet_file_info fi;
2503 struct dnet_read_latest_ctl {
2504 struct dnet_wait *w;
2505 int num, pos;
2506 pthread_mutex_t lock;
2508 struct dnet_read_latest_id ids[0];
2511 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl *ctl)
2513 dnet_wakeup(ctl->w, ctl->w->cond++);
2514 if (atomic_dec_and_test(&ctl->w->refcnt)) {
2515 dnet_wait_destroy(ctl->w);
2516 pthread_mutex_destroy(&ctl->lock);
2517 free(ctl);
2521 static int dnet_read_latest_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
2523 struct dnet_read_latest_ctl *ctl = priv;
2524 struct dnet_node *n;
2525 struct dnet_addr_attr *a;
2526 struct dnet_file_info *fi;
2527 int pos, err;
2529 if (is_trans_destroyed(st, cmd)) {
2530 dnet_read_latest_ctl_put(ctl);
2531 return 0;
2534 n = st->n;
2536 err = cmd->status;
2537 if (err || !cmd->size)
2538 goto err_out_exit;
2540 if (cmd->size < sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
2541 dnet_log(n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2542 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
2543 sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info));
2544 err = -EINVAL;
2545 goto err_out_exit;
2547 a = (struct dnet_addr_attr *)(cmd + 1);
2548 fi = (struct dnet_file_info *)(a + 1);
2550 dnet_convert_addr_attr(a);
2551 dnet_convert_file_info(fi);
2553 pthread_mutex_lock(&ctl->lock);
2554 pos = ctl->pos++;
2555 pthread_mutex_unlock(&ctl->lock);
2557 /* we do not care about filename */
2558 memcpy(&ctl->ids[pos].fi, fi, sizeof(struct dnet_file_info));
2559 memcpy(&ctl->ids[pos].id, &cmd->id, sizeof(struct dnet_id));
2561 err_out_exit:
2562 return err;
2565 static int dnet_file_read_latest_cmp(const void *p1, const void *p2)
2567 const struct dnet_read_latest_id *id1 = p1;
2568 const struct dnet_read_latest_id *id2 = p2;
2570 int ret = (int)(id2->fi.mtime.tsec - id1->fi.mtime.tsec);
2572 if (!ret)
2573 ret = (int)(id2->fi.mtime.tnsec - id1->fi.mtime.tnsec);
2575 return ret;
2578 int dnet_read_latest_prepare(struct dnet_read_latest_prepare *pr)
2580 struct dnet_read_latest_ctl *ctl;
2581 int group_id = pr->id.group_id;
2582 int err, i;
2584 ctl = malloc(sizeof(struct dnet_read_latest_ctl) + sizeof(struct dnet_read_latest_id) * pr->group_num);
2585 if (!ctl) {
2586 err = -ENOMEM;
2587 goto err_out_exit;
2589 memset(ctl, 0, sizeof(struct dnet_read_latest_ctl));
2591 ctl->w = dnet_wait_alloc(0);
2592 if (!ctl->w) {
2593 err = -ENOMEM;
2594 goto err_out_free;
2597 err = pthread_mutex_init(&ctl->lock, NULL);
2598 if (err)
2599 goto err_out_put_wait;
2601 ctl->num = pr->group_num;
2602 ctl->pos = 0;
2604 for (i = 0; i < pr->group_num; ++i) {
2605 pr->id.group_id = pr->group[i];
2607 dnet_wait_get(ctl->w);
2608 dnet_lookup_object(pr->s, &pr->id, DNET_ATTR_META_TIMES | pr->cflags, dnet_read_latest_complete, ctl);
2611 err = dnet_wait_event(ctl->w, ctl->w->cond == pr->group_num, &pr->s->node->wait_ts);
2612 if (err)
2613 goto err_out_put;
2615 if (ctl->pos == 0)
2616 goto err_out_put;
2618 pr->group_num = ctl->pos;
2620 qsort(ctl->ids, pr->group_num, sizeof(struct dnet_read_latest_id), dnet_file_read_latest_cmp);
2622 for (i = 0; i < pr->group_num; ++i) {
2623 pr->group[i] = ctl->ids[i].id.group_id;
2625 if (group_id == pr->group[i]) {
2626 const struct dnet_read_latest_id *id0 = &ctl->ids[0];
2627 const struct dnet_read_latest_id *id1 = &ctl->ids[i];
2629 if (!dnet_file_read_latest_cmp(id0, id1)) {
2630 int tmp_group = pr->group[0];
2631 pr->group[0] = pr->group[i];
2632 pr->group[i] = tmp_group;
2637 err_out_put:
2638 dnet_read_latest_ctl_put(ctl);
2639 goto err_out_exit;
2641 err_out_put_wait:
2642 dnet_wait_put(ctl->w);
2643 err_out_free:
2644 free(ctl);
2645 err_out_exit:
2646 return err;
2649 int dnet_read_latest(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io, uint64_t cflags, void **datap)
2651 struct dnet_read_latest_prepare pr;
2652 int *g, num, err, i;
2654 if ((int)io->num > s->group_num) {
2655 err = -E2BIG;
2656 goto err_out_exit;
2659 err = dnet_mix_states(s, id, &g);
2660 if (err < 0)
2661 goto err_out_exit;
2663 num = err;
2665 if ((int)io->num > num) {
2666 err = -E2BIG;
2667 goto err_out_free;
2670 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
2672 pr.s = s;
2673 pr.id = *id;
2674 pr.group = g;
2675 pr.group_num = num;
2676 pr.cflags = cflags;
2678 err = dnet_read_latest_prepare(&pr);
2679 if (err)
2680 goto err_out_free;
2682 err = -ENODATA;
2683 for (i = 0; i < pr.group_num; ++i) {
2684 void *data;
2686 id->group_id = pr.group[i];
2687 data = dnet_read_data_wait_raw(s, id, io, DNET_CMD_READ, cflags, &err);
2688 if (data) {
2689 if ((pr.group_num != num) || ((i != 0) && (io->type == 0) && (io->offset == 0))) {
2690 dnet_read_recover(s, id, io, data, cflags);
2693 *datap = data;
2694 err = 0;
2695 break;
2699 err_out_free:
2700 free(g);
2701 err_out_exit:
2702 return err;
2705 int dnet_get_routes(struct dnet_session *s, struct dnet_id **ids, struct dnet_addr **addrs) {
2707 struct dnet_node *n = s->node;
2708 struct dnet_net_state *st;
2709 struct dnet_group *g;
2710 struct dnet_addr *tmp_addrs;
2711 struct dnet_id *tmp_ids;
2712 int size = 0, count = 0;
2713 int i;
2715 *ids = NULL;
2716 *addrs = NULL;
2718 pthread_mutex_lock(&n->state_lock);
2719 list_for_each_entry(g, &n->group_list, group_entry) {
2720 list_for_each_entry(st, &g->state_list, state_entry) {
2722 size += st->idc->id_num;
2724 tmp_ids = (struct dnet_id *)realloc(*ids, size * sizeof(struct dnet_id));
2725 if (!tmp_ids) {
2726 count = -ENOMEM;
2727 goto err_out_free;
2729 *ids = tmp_ids;
2731 tmp_addrs = (struct dnet_addr *)realloc(*addrs, size * sizeof(struct dnet_addr));
2732 if (!tmp_addrs) {
2733 count = -ENOMEM;
2734 goto err_out_free;
2736 *addrs = tmp_addrs;
2738 for (i = 0; i < st->idc->id_num; ++i) {
2739 dnet_setup_id(&(*ids)[count], g->group_id, st->idc->ids[i].raw.id);
2740 memcpy(&(*addrs)[count], dnet_state_addr(st), sizeof(struct dnet_addr));
2741 count++;
2745 pthread_mutex_unlock(&n->state_lock);
2747 return count;
2749 err_out_free:
2750 if (ids)
2751 free(*ids);
2752 if (addrs)
2753 free(*addrs);
2755 return count;
2759 void *dnet_bulk_read_wait_raw(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *ios,
2760 uint32_t io_num, int cmd, uint64_t cflags, int *errp)
2762 struct dnet_node *n = s->node;
2763 struct dnet_io_control ctl;
2764 struct dnet_io_attr io;
2765 struct dnet_wait *w;
2766 struct dnet_read_data_completion *c;
2767 void *data = NULL;
2768 int err;
2770 w = dnet_wait_alloc(0);
2771 if (!w) {
2772 err = -ENOMEM;
2773 goto err_out_exit;
2776 c = malloc(sizeof(*c));
2777 if (!c) {
2778 err = -ENOMEM;
2779 goto err_out_put;
2782 c->w = w;
2783 c->size = 0;
2784 c->data = NULL;
2785 /* one for completion callback, another for this function */
2786 atomic_init(&c->refcnt, 2);
2788 memset(&ctl, 0, sizeof(struct dnet_io_control));
2790 ctl.fd = -1;
2792 ctl.priv = c;
2793 ctl.complete = dnet_read_data_complete;
2795 ctl.cmd = cmd;
2796 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
2798 memcpy(&ctl.id, id, sizeof(struct dnet_id));
2799 memset(&ctl.io, 0, sizeof(struct dnet_io_attr));
2801 memcpy(io.id, id->id, DNET_ID_SIZE);
2802 memcpy(io.parent, id->id, DNET_ID_SIZE);
2804 ctl.io.size = io_num * sizeof(struct dnet_io_attr);
2805 ctl.data = ios;
2807 dnet_wait_get(w);
2808 err = dnet_read_object(s, &ctl);
2809 if (err)
2810 goto err_out_put_complete;
2812 err = dnet_wait_event(w, w->cond, &n->wait_ts);
2813 if (err || w->status) {
2814 char id_str[2*DNET_ID_SIZE + 1];
2815 if (!err)
2816 err = w->status;
2817 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
2818 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
2819 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
2820 goto err_out_put_complete;
2822 err = c->size;
2823 data = c->data;
2825 err_out_put_complete:
2826 if (atomic_dec_and_test(&c->refcnt))
2827 free(c);
2828 err_out_put:
2829 dnet_wait_put(w);
2830 err_out_exit:
2831 *errp = err;
2832 return data;
2836 static int dnet_io_attr_cmp(const void *d1, const void *d2)
2838 const struct dnet_io_attr *io1 = d1;
2839 const struct dnet_io_attr *io2 = d2;
2841 return memcmp(io1->id, io2->id, DNET_ID_SIZE);
2844 struct dnet_range_data *dnet_bulk_read(struct dnet_session *s, struct dnet_io_attr *ios, uint32_t io_num, int group_id, uint64_t cflags, int *errp)
2846 struct dnet_node *n = s->node;
2847 struct dnet_id id, next_id;
2848 int ret_num;
2849 struct dnet_range_data *ret;
2850 struct dnet_net_state *cur, *next = NULL;
2851 uint64_t size = 0;
2852 void *data;
2853 int err;
2854 uint32_t i, start = -1;
2856 if (io_num <= 0) {
2857 return 0;
2860 qsort(ios, io_num, sizeof(struct dnet_io_attr), dnet_io_attr_cmp);
2862 ret = NULL;
2863 ret_num = 0;
2864 size = 0;
2866 dnet_setup_id(&id, group_id, ios[0].id);
2867 id.type = ios[0].type;
2869 cur = dnet_state_get_first(n, &id);
2870 if (!cur) {
2871 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&id));
2872 err = -ENOENT;
2873 goto err_out_exit;
2876 for (i = 0; i < io_num; ++i) {
2877 if ((i + 1) < io_num) {
2878 dnet_setup_id(&next_id, group_id, ios[i+1].id);
2879 next_id.type = ios[i+1].type;
2881 next = dnet_state_get_first(n, &next_id);
2882 if (!next) {
2883 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&next_id));
2884 err = -ENOENT;
2885 goto err_out_put;
2888 /* Send command only if state changes or it's a last id */
2889 if ((cur == next)) {
2890 dnet_state_put(next);
2891 next = NULL;
2892 continue;
2896 dnet_log(n, DNET_LOG_NOTICE, "start: %s: end: %s, count: %llu, addr: %s\n",
2897 dnet_dump_id(&id),
2898 dnet_dump_id(&next_id),
2899 (unsigned long long)(i - start),
2900 dnet_state_dump_addr(cur));
2902 data = dnet_bulk_read_wait_raw(s, &id, ios, i - start, DNET_CMD_BULK_READ, cflags, &err);
2903 if (data) {
2904 size = err;
2905 err = 0;
2907 if (!size) {
2908 free(data);
2909 } else {
2910 struct dnet_range_data *new_ret;
2912 ret_num++;
2914 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2915 if (!new_ret) {
2916 goto err_out_put;
2919 ret = new_ret;
2921 ret[ret_num - 1].data = data;
2922 ret[ret_num - 1].size = size;
2925 err = 0;
2928 dnet_state_put(cur);
2929 cur = next;
2930 next = NULL;
2931 memcpy(&id, &next_id, sizeof(struct dnet_id));
2934 err_out_put:
2935 if (next)
2936 dnet_state_put(next);
2937 dnet_state_put(cur);
2938 err_out_exit:
2939 if (ret) {
2940 *errp = ret_num;
2941 } else {
2942 *errp = err;
2944 return ret;
2947 struct dnet_range_data dnet_bulk_write(struct dnet_session *s, struct dnet_io_control *ctl, int ctl_num, int *errp)
2949 struct dnet_node *n = s->node;
2950 int err, i, trans_num = 0, local_trans_num;
2951 struct dnet_wait *w;
2952 struct dnet_write_completion *wc;
2953 struct dnet_range_data ret;
2954 struct dnet_metadata_control mcl;
2955 struct dnet_meta_container mc;
2956 struct dnet_io_control meta_ctl;
2957 struct timeval tv;
2958 int *groups = NULL;
2959 int group_num = 0;
2961 memset(&ret, 0, sizeof(ret));
2963 wc = malloc(sizeof(struct dnet_write_completion));
2964 if (!wc) {
2965 err = -ENOMEM;
2966 goto err_out_exit;
2968 memset(wc, 0, sizeof(struct dnet_write_completion));
2970 w = dnet_wait_alloc(0);
2971 if (!w) {
2972 err = -ENOMEM;
2973 free(wc);
2974 goto err_out_exit;
2976 wc->wait = w;
2978 atomic_set(&w->refcnt, INT_MAX);
2979 w->status = -ENOENT;
2981 for (i = 0; i < ctl_num; ++i) {
2982 ctl[i].priv = wc;
2983 ctl[i].complete = dnet_write_complete;
2985 ctl[i].cmd = DNET_CMD_WRITE;
2986 ctl[i].cflags = DNET_FLAGS_NEED_ACK;
2988 memcpy(ctl[i].io.id, ctl[i].id.id, DNET_ID_SIZE);
2989 memcpy(ctl[i].io.parent, ctl[i].id.id, DNET_ID_SIZE);
2991 local_trans_num = dnet_write_object(s, &ctl[i]);
2992 if (local_trans_num < 0)
2993 local_trans_num = 0;
2995 trans_num += local_trans_num;
2997 /* Prepare and send metadata */
2998 memset(&mcl, 0, sizeof(mcl));
3000 group_num = s->group_num;
3001 groups = alloca(group_num * sizeof(int));
3002 memcpy(groups, s->groups, group_num * sizeof(int));
3004 mcl.groups = groups;
3005 mcl.group_num = group_num;
3006 mcl.id = ctl[i].id;
3007 mcl.cflags = ctl[i].cflags;
3009 gettimeofday(&tv, NULL);
3010 mcl.ts.tv_sec = tv.tv_sec;
3011 mcl.ts.tv_nsec = tv.tv_usec * 1000;
3013 memset(&mc, 0, sizeof(mc));
3015 err = dnet_create_metadata(s, &mcl, &mc);
3016 dnet_log(n, DNET_LOG_DEBUG, "Creating metadata: err: %d", err);
3017 if (!err) {
3018 dnet_convert_metadata(n, mc.data, mc.size);
3020 memset(&meta_ctl, 0, sizeof(struct dnet_io_control));
3022 meta_ctl.priv = wc;
3023 meta_ctl.complete = dnet_write_complete;
3024 meta_ctl.cmd = DNET_CMD_WRITE;
3025 meta_ctl.fd = -1;
3027 meta_ctl.cflags = ctl[i].cflags;
3029 memcpy(&meta_ctl.id, &ctl[i].id, sizeof(struct dnet_id));
3030 memcpy(meta_ctl.io.id, ctl[i].id.id, DNET_ID_SIZE);
3031 memcpy(meta_ctl.io.parent, ctl[i].id.id, DNET_ID_SIZE);
3032 meta_ctl.id.type = meta_ctl.io.type = EBLOB_TYPE_META;
3034 meta_ctl.io.flags |= DNET_IO_FLAGS_META;
3035 meta_ctl.io.offset = 0;
3036 meta_ctl.io.size = mc.size;
3037 meta_ctl.data = mc.data;
3039 local_trans_num = dnet_write_object(s, &meta_ctl);
3040 if (local_trans_num < 0)
3041 local_trans_num = 0;
3043 trans_num += local_trans_num;
3048 * 1 - the first reference counter we grabbed at allocation time
3050 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
3052 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
3053 if (err || w->status) {
3054 if (!err)
3055 err = w->status;
3056 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3057 dnet_dump_id(&ctl->id), err, w->status);
3060 if (err || !trans_num) {
3061 if (!err)
3062 err = -EINVAL;
3063 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
3064 goto err_out_put;
3067 if (trans_num)
3068 dnet_log(n, DNET_LOG_NOTICE, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3069 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, wc->size);
3070 err = trans_num;
3072 ret.data = wc->reply;
3073 ret.size = wc->size;
3075 wc->reply = NULL;
3077 err_out_put:
3078 dnet_write_complete_free(wc);
3079 err_out_exit:
3080 *errp = err;
3081 return ret;
3084 int dnet_flags(struct dnet_node *n)
3086 return n->flags;
3089 static int dnet_start_defrag_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
3091 struct dnet_wait *w = priv;
3093 if (is_trans_destroyed(state, cmd)) {
3094 dnet_wakeup(w, w->cond++);
3095 dnet_wait_put(w);
3096 return 0;
3099 return 0;
3102 static int dnet_start_defrag_single(struct dnet_net_state *st, void *priv, uint64_t cflags)
3104 struct dnet_trans_control ctl;
3106 memset(&ctl, 0, sizeof(struct dnet_trans_control));
3108 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
3109 ctl.cmd = DNET_CMD_DEFRAG;
3110 ctl.complete = dnet_start_defrag_complete;
3111 ctl.priv = priv;
3112 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
3114 return dnet_trans_alloc_send_state(st, &ctl);
3117 int dnet_start_defrag(struct dnet_session *s, uint64_t cflags)
3119 struct dnet_node *n = s->node;
3120 struct dnet_net_state *st;
3121 struct dnet_wait *w;
3122 struct dnet_group *g;
3123 int num = 0;
3124 int err;
3126 w = dnet_wait_alloc(0);
3127 if (!w) {
3128 err = -ENOMEM;
3129 goto err_out_exit;
3132 pthread_mutex_lock(&n->state_lock);
3133 list_for_each_entry(g, &n->group_list, group_entry) {
3134 list_for_each_entry(st, &g->state_list, state_entry) {
3135 if (st == n->st)
3136 continue;
3138 if (w)
3139 dnet_wait_get(w);
3141 dnet_start_defrag_single(st, w, cflags);
3142 num++;
3145 pthread_mutex_unlock(&n->state_lock);
3147 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
3148 dnet_wait_put(w);
3150 err_out_exit:
3151 return err;