dnet_remove_object_raw() must return positive number of transactions sent
[elliptics.git] / library / dnet_common.c
blobb7afd76822bf3568a280df3e205173bcd92ade4e
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
44 static char *dnet_cmd_strings[] = {
45 [DNET_CMD_LOOKUP] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN] = "JOIN",
48 [DNET_CMD_WRITE] = "WRITE",
49 [DNET_CMD_READ] = "READ",
50 [DNET_CMD_LIST] = "CHECK",
51 [DNET_CMD_EXEC] = "EXEC",
52 [DNET_CMD_ROUTE_LIST] = "ROUTE_LIST",
53 [DNET_CMD_STAT] = "STAT",
54 [DNET_CMD_NOTIFY] = "NOTIFY",
55 [DNET_CMD_DEL] = "REMOVE",
56 [DNET_CMD_STAT_COUNT] = "STAT_COUNT",
57 [DNET_CMD_STATUS] = "STATUS",
58 [DNET_CMD_READ_RANGE] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE] = "DEL_RANGE",
60 [DNET_CMD_AUTH] = "AUTH",
61 [DNET_CMD_BULK_READ] = "BULK_READ",
62 [DNET_CMD_DEFRAG] = "DEFRAG",
63 [DNET_CMD_UNKNOWN] = "UNKNOWN",
66 static char *dnet_counter_strings[] = {
67 [DNET_CNTR_LA1] = "DNET_CNTR_LA1",
68 [DNET_CNTR_LA5] = "DNET_CNTR_LA5",
69 [DNET_CNTR_LA15] = "DNET_CNTR_LA15",
70 [DNET_CNTR_BSIZE] = "DNET_CNTR_BSIZE",
71 [DNET_CNTR_FRSIZE] = "DNET_CNTR_FRSIZE",
72 [DNET_CNTR_BLOCKS] = "DNET_CNTR_BLOCKS",
73 [DNET_CNTR_BFREE] = "DNET_CNTR_BFREE",
74 [DNET_CNTR_BAVAIL] = "DNET_CNTR_BAVAIL",
75 [DNET_CNTR_FILES] = "DNET_CNTR_FILES",
76 [DNET_CNTR_FFREE] = "DNET_CNTR_FFREE",
77 [DNET_CNTR_FAVAIL] = "DNET_CNTR_FAVAIL",
78 [DNET_CNTR_FSID] = "DNET_CNTR_FSID",
79 [DNET_CNTR_VM_ACTIVE] = "DNET_CNTR_VM_ACTIVE",
80 [DNET_CNTR_VM_INACTIVE] = "DNET_CNTR_VM_INACTIVE",
81 [DNET_CNTR_VM_TOTAL] = "DNET_CNTR_VM_TOTAL",
82 [DNET_CNTR_VM_FREE] = "DNET_CNTR_VM_FREE",
83 [DNET_CNTR_VM_CACHED] = "DNET_CNTR_VM_CACHED",
84 [DNET_CNTR_VM_BUFFERS] = "DNET_CNTR_VM_BUFFERS",
85 [DNET_CNTR_NODE_FILES] = "DNET_CNTR_NODE_FILES",
86 [DNET_CNTR_NODE_LAST_MERGE] = "DNET_CNTR_NODE_LAST_MERGE",
87 [DNET_CNTR_NODE_CHECK_COPY] = "DNET_CNTR_NODE_CHECK_COPY",
88 [DNET_CNTR_DBR_NOREC] = "DNET_CNTR_DBR_NOREC",
89 [DNET_CNTR_DBR_SYSTEM] = "DNET_CNTR_DBR_SYSTEM",
90 [DNET_CNTR_DBR_ERROR] = "DNET_CNTR_DBR_ERROR",
91 [DNET_CNTR_DBW_SYSTEM] = "DNET_CNTR_DBW_SYSTEM",
92 [DNET_CNTR_DBW_ERROR] = "DNET_CNTR_DBW_ERROR",
93 [DNET_CNTR_UNKNOWN] = "UNKNOWN",
96 char *dnet_cmd_string(int cmd)
98 if (cmd <= 0 || cmd >= __DNET_CMD_MAX || cmd >= DNET_CMD_UNKNOWN)
99 cmd = DNET_CMD_UNKNOWN;
101 return dnet_cmd_strings[cmd];
104 char *dnet_counter_string(int cntr, int cmd_num)
106 if (cntr <= 0 || cntr >= __DNET_CNTR_MAX || cntr >= DNET_CNTR_UNKNOWN)
107 cntr = DNET_CNTR_UNKNOWN;
109 if (cntr < cmd_num)
110 return dnet_cmd_string(cntr);
112 if (cntr >= cmd_num && cntr < (cmd_num * 2))
113 return dnet_cmd_string(cntr - cmd_num);
115 cntr += DNET_CNTR_LA1 - cmd_num * 2;
116 return dnet_counter_strings[cntr];
119 static int dnet_add_received_state(struct dnet_node *n, struct dnet_addr_attr *a,
120 int group_id, struct dnet_raw_id *ids, int id_num)
122 int s, err = 0;
123 struct dnet_net_state *nst;
124 struct dnet_id raw;
125 int join;
127 dnet_setup_id(&raw, group_id, ids[0].id);
129 nst = dnet_state_search_by_addr(n, &a->addr);
130 if (nst) {
131 err = -EEXIST;
132 dnet_state_put(nst);
133 goto err_out_exit;
136 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
137 (struct sockaddr *)&a->addr.addr, a->addr.addr_len, 0);
138 if (s < 0) {
139 err = s;
140 goto err_out_exit;
143 join = DNET_WANT_RECONNECT;
144 if (n->flags & DNET_CFG_JOIN_NETWORK)
145 join = DNET_JOIN;
147 nst = dnet_state_create(n, group_id, ids, id_num, &a->addr, s, &err, join, dnet_state_net_process);
148 if (!nst)
149 goto err_out_close;
151 dnet_log(n, DNET_LOG_NOTICE, "%d: added received state %s.\n",
152 group_id, dnet_state_dump_addr(nst));
154 return 0;
156 err_out_close:
157 dnet_sock_close(s);
158 err_out_exit:
159 return err;
162 static int dnet_process_addr_attr(struct dnet_net_state *st, struct dnet_addr_attr *a, int group_id, int num)
164 struct dnet_node *n = st->n;
165 struct dnet_raw_id *ids;
166 int i, err;
168 ids = (struct dnet_raw_id *)(a + 1);
169 for (i=0; i<num; ++i)
170 dnet_convert_raw_id(&ids[0]);
172 err = dnet_add_received_state(n, a, group_id, ids, num);
173 dnet_log(n, DNET_LOG_DEBUG, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a->addr), num, err);
175 return err;
178 static int dnet_recv_route_list_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
180 struct dnet_wait *w = priv;
181 struct dnet_addr_attr *a;
182 long size;
183 int err, num;
185 if (is_trans_destroyed(st, cmd)) {
186 err = -EINVAL;
187 if (cmd)
188 err = cmd->status;
190 w->status = err;
191 dnet_wakeup(w, w->cond = 1);
192 dnet_wait_put(w);
193 goto err_out_exit;
197 err = cmd->status;
198 if (!cmd->size || err)
199 goto err_out_exit;
201 size = cmd->size + sizeof(struct dnet_cmd);
202 if (size < (signed)sizeof(struct dnet_addr_cmd)) {
203 err = -EINVAL;
204 goto err_out_exit;
207 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
208 if (!num) {
209 err = -EINVAL;
210 goto err_out_exit;
213 a = (struct dnet_addr_attr *)(cmd + 1);
214 dnet_convert_addr_attr(a);
216 err = dnet_process_addr_attr(st, a, cmd->id.group_id, num);
218 err_out_exit:
219 return err;
222 int dnet_recv_route_list(struct dnet_net_state *st)
224 struct dnet_io_req req;
225 struct dnet_node *n = st->n;
226 struct dnet_trans *t;
227 struct dnet_cmd *cmd;
228 struct dnet_wait *w;
229 int err;
231 w = dnet_wait_alloc(0);
232 if (!w) {
233 err = -ENOMEM;
234 goto err_out_exit;
237 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
238 if (!t) {
239 err = -ENOMEM;
240 goto err_out_wait_put;
243 t->complete = dnet_recv_route_list_complete;
244 t->priv = w;
246 cmd = (struct dnet_cmd *)(t + 1);
248 cmd->flags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
249 cmd->status = 0;
251 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
253 cmd->cmd = t->command = DNET_CMD_ROUTE_LIST;
255 t->st = dnet_state_get(st);
256 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
258 dnet_convert_cmd(cmd);
260 dnet_log(n, DNET_LOG_DEBUG, "%s: list route request to %s.\n", dnet_dump_id(&cmd->id),
261 dnet_server_convert_dnet_addr(&st->addr));
263 memset(&req, 0, sizeof(req));
264 req.st = st;
265 req.header = cmd;
266 req.hsize = sizeof(struct dnet_cmd);
268 dnet_wait_get(w);
269 err = dnet_trans_send(t, &req);
270 if (err)
271 goto err_out_destroy;
273 err = dnet_wait_event(w, w->cond != 0, &n->wait_ts);
274 dnet_wait_put(w);
276 return 0;
278 err_out_destroy:
279 dnet_trans_put(t);
280 err_out_wait_put:
281 dnet_wait_put(w);
282 err_out_exit:
283 return err;
286 static struct dnet_net_state *dnet_add_state_socket(struct dnet_node *n, struct dnet_addr *addr, int s, int *errp, int join)
288 struct dnet_net_state *st, dummy;
289 char buf[sizeof(struct dnet_addr_cmd)];
290 struct dnet_cmd *cmd;
291 int err, num, i, size;
292 struct dnet_raw_id *ids;
294 memset(buf, 0, sizeof(buf));
296 cmd = (struct dnet_cmd *)(buf);
298 cmd->flags = DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
299 cmd->cmd = DNET_CMD_REVERSE_LOOKUP;
301 dnet_convert_cmd(cmd);
303 st = &dummy;
304 memset(st, 0, sizeof(struct dnet_net_state));
306 st->write_s = st->read_s = s;
307 st->n = n;
309 err = dnet_send_nolock(st, buf, sizeof(struct dnet_cmd));
310 if (err) {
311 dnet_log(n, DNET_LOG_ERROR, "Failed to send reverse "
312 "lookup message to %s, err: %d.\n",
313 dnet_server_convert_dnet_addr(addr), err);
314 goto err_out_exit;
317 err = dnet_recv(st, buf, sizeof(buf));
318 if (err) {
319 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
320 "lookup headers from %s, err: %d.\n",
321 dnet_server_convert_dnet_addr(addr), err);
322 goto err_out_exit;
325 cmd = (struct dnet_cmd *)(buf);
327 dnet_convert_addr_cmd((struct dnet_addr_cmd *)buf);
329 size = cmd->size - sizeof(struct dnet_addr_attr);
330 num = size / sizeof(struct dnet_raw_id);
332 dnet_log(n, DNET_LOG_DEBUG, "%s: waiting for %d ids\n", dnet_dump_id(&cmd->id), num);
334 ids = malloc(size);
335 if (!ids) {
336 err = -ENOMEM;
337 goto err_out_exit;
340 err = dnet_recv(st, ids, size);
341 if (err) {
342 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
343 "lookup body (%llu bytes) from %s, err: %d.\n",
344 (unsigned long long)cmd->size,
345 dnet_server_convert_dnet_addr(addr), err);
346 goto err_out_exit;
349 for (i=0; i<num; ++i)
350 dnet_convert_raw_id(&ids[i]);
352 st = dnet_state_create(n, cmd->id.group_id, ids, num, addr, s, &err, join, dnet_state_net_process);
353 if (!st) {
354 /* socket is already closed */
355 s = -1;
356 goto err_out_free;
358 free(ids);
360 return st;
362 err_out_free:
363 free(ids);
364 err_out_exit:
365 *errp = err;
366 if (s >= 0)
367 dnet_sock_close(s);
368 return NULL;
371 int dnet_add_state(struct dnet_node *n, struct dnet_config *cfg)
373 int s, err, join = DNET_WANT_RECONNECT;
374 struct dnet_addr addr;
375 struct dnet_net_state *st;
377 memset(&addr, 0, sizeof(addr));
379 addr.addr_len = sizeof(addr.addr);
380 s = dnet_socket_create(n, cfg, &addr, 0);
381 if (s < 0) {
382 err = s;
383 goto err_out_reconnect;
386 if (n->flags & DNET_CFG_JOIN_NETWORK)
387 join = DNET_JOIN;
389 /* will close socket on error */
390 st = dnet_add_state_socket(n, &addr, s, &err, join);
391 if (!st)
392 goto err_out_reconnect;
394 if (!(cfg->flags & DNET_CFG_NO_ROUTE_LIST))
395 dnet_recv_route_list(st);
397 return 0;
399 err_out_reconnect:
400 /* if state is already exist, it should not be an error */
401 if (err == -EEXIST)
402 err = 0;
404 if ((err == -EADDRINUSE) || (err == -ECONNREFUSED) || (err == -ECONNRESET) ||
405 (err == -EINPROGRESS) || (err == -EAGAIN))
406 dnet_add_reconnect_state(n, &addr, join);
407 return err;
/*
 * Shared context of a multi-replica file write (dnet_write_file_id_raw()).
 * Its lifetime follows wait->refcnt: dnet_write_complete_free() releases it
 * together with the wait object when the last reference is dropped.
 */
struct dnet_write_completion {
	/* concatenated {struct dnet_addr, struct dnet_cmd, payload} records */
	void			*reply;
	/* number of bytes currently stored in @reply */
	int			size;
	/* completion counter/status holder whose refcount owns this struct */
	struct dnet_wait	*wait;
};
416 static void dnet_write_complete_free(struct dnet_write_completion *wc)
418 if (atomic_dec_and_test(&wc->wait->refcnt)) {
419 dnet_wait_destroy(wc->wait);
420 free(wc->reply);
421 free(wc);
425 static int dnet_write_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
427 int err = -EINVAL;
428 struct dnet_write_completion *wc = priv;
429 struct dnet_wait *w = wc->wait;
431 if (is_trans_destroyed(st, cmd)) {
432 dnet_wakeup(w, w->cond++);
433 dnet_write_complete_free(wc);
434 return 0;
437 err = cmd->status;
438 if (!err && st && (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info))) {
439 int old_size = wc->size;
440 void *data;
442 wc->size += cmd->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_addr);
443 wc->reply = realloc(wc->reply, wc->size);
444 if (!wc->reply) {
445 err = -ENOMEM;
446 goto err_out_exit;
449 data = wc->reply + old_size;
451 memcpy(data, &st->addr, sizeof(struct dnet_addr));
452 memcpy(data + sizeof(struct dnet_addr), cmd, sizeof(struct dnet_cmd));
453 memcpy(data + sizeof(struct dnet_addr) + sizeof(struct dnet_cmd), cmd + 1, cmd->size);
456 err_out_exit:
457 pthread_mutex_lock(&w->wait_lock);
458 if (w->status < 0)
459 w->status = err;
460 pthread_mutex_unlock(&w->wait_lock);
462 return 0;
465 static struct dnet_trans *dnet_io_trans_create(struct dnet_node *n, struct dnet_io_control *ctl, int *errp)
467 struct dnet_io_req req;
468 struct dnet_trans *t = NULL;
469 struct dnet_io_attr *io;
470 struct dnet_cmd *cmd;
471 uint64_t size = ctl->io.size;
472 uint64_t tsize = sizeof(struct dnet_io_attr) + sizeof(struct dnet_cmd);
473 int err;
475 if (ctl->cmd == DNET_CMD_READ)
476 size = 0;
478 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE)
479 tsize += size;
481 t = dnet_trans_alloc(n, tsize);
482 if (!t) {
483 err = -ENOMEM;
484 goto err_out_complete;
486 t->complete = ctl->complete;
487 t->priv = ctl->priv;
489 cmd = (struct dnet_cmd *)(t + 1);
490 io = (struct dnet_io_attr *)(cmd + 1);
492 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE) {
493 if (size) {
494 void *data = io + 1;
495 memcpy(data, ctl->data, size);
499 memcpy(&cmd->id, &ctl->id, sizeof(struct dnet_id));
500 cmd->size = sizeof(struct dnet_io_attr) + size;
501 cmd->flags = ctl->cflags;
502 cmd->status = 0;
504 cmd->cmd = t->command = ctl->cmd;
506 memcpy(io, &ctl->io, sizeof(struct dnet_io_attr));
507 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
509 t->st = dnet_state_get_first(n, &cmd->id);
510 if (!t->st) {
511 err = -ENOENT;
512 goto err_out_destroy;
515 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
517 dnet_log(n, DNET_LOG_INFO, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
518 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
519 dnet_dump_id(&ctl->id),
520 (unsigned long long)t->trans,
521 dnet_cmd_string(ctl->cmd), (unsigned long long)cmd->flags,
522 (unsigned long long)ctl->io.size, (unsigned long long)ctl->io.offset,
523 ctl->fd,
524 (unsigned long long)ctl->local_offset,
525 dnet_server_convert_dnet_addr(&t->st->addr), t->st->weight, t->st->median_read_time);
527 dnet_convert_cmd(cmd);
528 dnet_convert_io_attr(io);
531 memset(&req, 0, sizeof(req));
532 req.st = t->st;
533 req.header = cmd;
534 req.hsize = tsize;
536 req.fd = ctl->fd;
538 if (ctl->fd >= 0) {
539 req.local_offset = ctl->local_offset;
540 req.fsize = size;
541 } else if (size >= DNET_COPY_IO_SIZE) {
542 req.data = (void *)ctl->data;
543 req.dsize = size;
546 err = dnet_trans_send(t, &req);
547 if (err)
548 goto err_out_destroy;
550 return t;
552 err_out_complete:
553 if (ctl->complete)
554 ctl->complete(NULL, NULL, ctl->priv);
555 *errp = err;
556 return NULL;
558 err_out_destroy:
559 dnet_trans_put(t);
560 *errp = err;
561 return NULL;
564 int dnet_trans_create_send_all(struct dnet_session *s, struct dnet_io_control *ctl)
566 struct dnet_node *n = s->node;
567 int num = 0, i, err;
569 for (i=0; i<s->group_num; ++i) {
570 ctl->id.group_id = s->groups[i];
572 dnet_io_trans_create(n, ctl, &err);
573 num++;
576 if (!num) {
577 dnet_io_trans_create(n, ctl, &err);
578 num++;
581 return num;
/*
 * Write the data described by @ctl to every group of session @s.
 * Returns the number of transactions issued (see dnet_trans_create_send_all()).
 */
int dnet_write_object(struct dnet_session *s, struct dnet_io_control *ctl)
{
	int transactions = dnet_trans_create_send_all(s, ctl);

	return transactions;
}
589 static int dnet_write_file_id_raw(struct dnet_session *s, const char *file, struct dnet_id *id,
590 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
591 uint64_t cflags, unsigned int ioflags)
593 struct dnet_node *n = s->node;
594 int fd, err, trans_num;
595 struct stat stat;
596 struct dnet_wait *w;
597 struct dnet_io_control ctl;
598 struct dnet_write_completion *wc;
600 wc = malloc(sizeof(struct dnet_write_completion));
601 if (!wc) {
602 err = -ENOMEM;
603 goto err_out_exit;
605 memset(wc, 0, sizeof(struct dnet_write_completion));
607 w = dnet_wait_alloc(0);
608 if (!w) {
609 free(wc);
610 err = -ENOMEM;
611 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting structure.\n");
612 goto err_out_exit;
615 wc->wait = w;
617 fd = open(file, O_RDONLY | O_LARGEFILE | O_CLOEXEC);
618 if (fd < 0) {
619 err = -errno;
620 dnet_log_err(n, "Failed to open to be written file '%s'", file);
621 goto err_out_put;
624 err = fstat(fd, &stat);
625 if (err) {
626 err = -errno;
627 dnet_log_err(n, "Failed to stat to be written file '%s'", file);
628 goto err_out_close;
631 if (local_offset >= (uint64_t)stat.st_size) {
632 err = 0;
633 goto err_out_close;
636 if (!size || size + local_offset >= (uint64_t)stat.st_size)
637 size = stat.st_size - local_offset;
639 memset(&ctl, 0, sizeof(struct dnet_io_control));
641 atomic_set(&w->refcnt, INT_MAX);
643 ctl.data = NULL;
644 ctl.fd = fd;
645 ctl.local_offset = local_offset;
647 w->status = -ENOENT;
648 ctl.complete = dnet_write_complete;
649 ctl.priv = wc;
651 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
652 ctl.cmd = DNET_CMD_WRITE;
654 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
655 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
657 ctl.io.flags = ioflags;
658 ctl.io.size = size;
659 ctl.io.offset = remote_offset;
660 ctl.io.type = id->type;
662 memcpy(&ctl.id, id, sizeof(struct dnet_id));
664 trans_num = dnet_write_object(s, &ctl);
665 if (trans_num < 0)
666 trans_num = 0;
669 * 1 - the first reference counter we grabbed at allocation time
671 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
673 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
674 if (err || w->status) {
675 if (!err)
676 err = w->status;
679 if (!err && !trans_num)
680 err = -EINVAL;
682 if (err) {
683 dnet_log(n, DNET_LOG_ERROR, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file, trans_num, err);
684 goto err_out_close;
687 dnet_log(n, DNET_LOG_NOTICE, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
688 file, (unsigned long long)size);
690 close(fd);
691 dnet_write_complete_free(wc);
693 return 0;
695 err_out_close:
696 close(fd);
697 err_out_put:
698 dnet_write_complete_free(wc);
699 err_out_exit:
700 return err;
703 int dnet_write_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t local_offset,
704 uint64_t remote_offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
706 int err = dnet_write_file_id_raw(s, file, id, local_offset, remote_offset, size, cflags, ioflags);
707 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
708 err = dnet_create_write_metadata_strings(s, NULL, 0, id, NULL, cflags);
710 return err;
713 int dnet_write_file(struct dnet_session *s, const char *file, const void *remote, int remote_len,
714 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
715 uint64_t cflags, unsigned int ioflags, int type)
717 int err;
718 struct dnet_id id;
720 dnet_transform(s->node, remote, remote_len, &id);
721 id.type = type;
723 err = dnet_write_file_id_raw(s, file, &id, local_offset, remote_offset, size, cflags, ioflags);
724 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
725 err = dnet_create_write_metadata_strings(s, remote, remote_len, &id, NULL, cflags);
727 return err;
730 static int dnet_read_file_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
732 int fd, err;
733 struct dnet_node *n;
734 struct dnet_io_completion *c = priv;
735 struct dnet_io_attr *io;
736 void *data;
738 if (is_trans_destroyed(st, cmd)) {
739 if (c->wait) {
740 int err = 1;
741 if (cmd && cmd->status)
742 err = cmd->status;
744 dnet_wakeup(c->wait, c->wait->cond = err);
745 dnet_wait_put(c->wait);
748 free(c);
749 return 0;
752 n = st->n;
754 if (cmd->status != 0 || cmd->size == 0) {
755 err = cmd->status;
756 goto err_out_exit_no_log;
759 if (cmd->size <= sizeof(struct dnet_io_attr)) {
760 dnet_log(n, DNET_LOG_ERROR, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
761 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
762 sizeof(struct dnet_io_attr));
763 err = -EINVAL;
764 goto err_out_exit_no_log;
767 io = (struct dnet_io_attr *)(cmd + 1);
768 data = io + 1;
770 dnet_convert_io_attr(io);
772 fd = open(c->file, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
773 if (fd < 0) {
774 err = -errno;
775 dnet_log_err(n, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd->id), c->file);
776 goto err_out_exit;
779 err = pwrite(fd, data, io->size, c->offset);
780 if (err <= 0) {
781 err = -errno;
782 dnet_log_err(n, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd->id), c->file);
783 goto err_out_close;
786 close(fd);
787 dnet_log(n, DNET_LOG_NOTICE, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
788 dnet_dump_id(&cmd->id), c->file, (unsigned long long)c->offset,
789 (unsigned long long)io->size, cmd->status);
791 return cmd->status;
793 err_out_close:
794 close(fd);
795 err_out_exit:
796 dnet_log(n, DNET_LOG_ERROR, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
797 dnet_dump_id(&cmd->id), c->file, (unsigned long long)io->offset,
798 (unsigned long long)io->size, cmd->status, err);
799 err_out_exit_no_log:
800 dnet_wakeup(c->wait, c->wait->cond = err ? err : 1);
801 return err;
804 int dnet_read_object(struct dnet_session *s, struct dnet_io_control *ctl)
806 int err;
808 if (!dnet_io_trans_create(s->node, ctl, &err))
809 return err;
811 return 0;
814 static int dnet_read_file_raw_exec(struct dnet_session *s, const char *file, unsigned int len,
815 uint64_t write_offset, uint64_t io_offset, uint64_t io_size,
816 struct dnet_id *id, struct dnet_wait *w)
818 struct dnet_node *n = s->node;
819 struct dnet_io_control ctl;
820 struct dnet_io_completion *c;
821 int err, wait_init = ~0;
823 memset(&ctl, 0, sizeof(struct dnet_io_control));
825 ctl.io.size = io_size;
826 ctl.io.offset = io_offset;
828 ctl.io.type = id->type;
830 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
831 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
833 memcpy(&ctl.id, id, sizeof(struct dnet_id));
835 ctl.fd = -1;
836 ctl.complete = dnet_read_file_complete;
837 ctl.cmd = DNET_CMD_READ;
838 ctl.cflags = DNET_FLAGS_NEED_ACK;
840 c = malloc(sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
841 if (!c) {
842 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate IO completion structure "
843 "for '%s' file reading.\n",
844 dnet_dump_id(&ctl.id), file);
845 err = -ENOMEM;
846 goto err_out_exit;
849 memset(c, 0, sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
851 c->wait = dnet_wait_get(w);
852 c->offset = write_offset;
853 c->file = (char *)(c + 1);
855 sprintf(c->file, "%s", file);
857 ctl.priv = c;
859 w->cond = wait_init;
860 err = dnet_read_object(s, &ctl);
861 if (err)
862 goto err_out_exit;
864 err = dnet_wait_event(w, w->cond != wait_init, &n->wait_ts);
865 if ((err < 0) || (w->cond < 0)) {
866 char id_str[2*DNET_ID_SIZE + 1];
867 if (!err)
868 err = w->cond;
869 dnet_log(n, DNET_LOG_ERROR, "%d:%s '%s' : failed to read data: %d\n",
870 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str),
871 file, err);
872 goto err_out_exit;
875 return 0;
877 err_out_exit:
878 return err;
881 static int dnet_read_file_raw(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
883 struct dnet_node *n = s->node;
884 int err = -ENOENT, len = strlen(file), i;
885 struct dnet_wait *w;
886 int *g, num;
888 w = dnet_wait_alloc(~0);
889 if (!w) {
890 err = -ENOMEM;
891 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting.\n");
892 goto err_out_exit;
895 if (!size)
896 size = ~0ULL;
898 num = dnet_mix_states(s, id, &g);
899 if (num < 0) {
900 err = num;
901 goto err_out_exit;
904 for (i=0; i<num; ++i) {
905 id->group_id = g[i];
907 err = dnet_read_file_raw_exec(s, file, len, 0, offset, size, id, w);
908 if (err)
909 continue;
911 break;
914 dnet_wait_put(w);
915 free(g);
917 err_out_exit:
918 return err;
/* Read object @id into local file @file; see dnet_read_file_raw(). */
int dnet_read_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(s, file, id, offset, size);

	return err;
}
926 int dnet_read_file(struct dnet_session *s, const char *file, const void *remote, int remote_size,
927 uint64_t offset, uint64_t size, int type)
929 struct dnet_id id;
931 dnet_transform(s->node, remote, remote_size, &id);
932 id.type = type;
934 return dnet_read_file_raw(s, file, &id, offset, size);
937 struct dnet_wait *dnet_wait_alloc(int cond)
939 int err;
940 struct dnet_wait *w;
942 w = malloc(sizeof(struct dnet_wait));
943 if (!w) {
944 err = -ENOMEM;
945 goto err_out_exit;
948 memset(w, 0, sizeof(struct dnet_wait));
950 err = pthread_cond_init(&w->wait, NULL);
951 if (err)
952 goto err_out_exit;
954 err = pthread_mutex_init(&w->wait_lock, NULL);
955 if (err)
956 goto err_out_destroy;
958 w->cond = cond;
959 atomic_init(&w->refcnt, 1);
961 return w;
963 err_out_destroy:
964 pthread_mutex_destroy(&w->wait_lock);
965 err_out_exit:
966 return NULL;
969 void dnet_wait_destroy(struct dnet_wait *w)
971 pthread_mutex_destroy(&w->wait_lock);
972 pthread_cond_destroy(&w->wait);
973 free(w->ret);
974 free(w);
977 static int dnet_send_cmd_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
979 struct dnet_wait *w = priv;
981 if (is_trans_destroyed(st, cmd)) {
982 dnet_wakeup(w, w->cond++);
983 dnet_wait_put(w);
984 return 0;
987 w->status = cmd->status;
989 if (cmd->size) {
990 void *old = w->ret;
991 void *data = cmd + 1;
993 w->ret = realloc(w->ret, w->size + cmd->size);
994 if (!w->ret) {
995 w->ret = old;
996 w->status = -ENOMEM;
997 } else {
998 memcpy(w->ret + w->size, data, cmd->size);
999 w->size += cmd->size;
1003 return w->status;
1006 static int dnet_send_cmd_single(struct dnet_net_state *st, struct dnet_wait *w, struct sph *e, uint64_t cflags)
1008 struct dnet_trans_control ctl;
1010 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1012 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1013 ctl.size = sizeof(struct sph) + e->event_size + e->data_size + e->binary_size;
1014 ctl.cmd = DNET_CMD_EXEC;
1015 ctl.complete = dnet_send_cmd_complete;
1016 ctl.priv = w;
1017 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1019 dnet_convert_sph(e);
1021 ctl.data = e;
1023 return dnet_trans_alloc_send_state(st, &ctl);
1026 static int dnet_send_cmd_raw(struct dnet_session *s, struct dnet_id *id,
1027 struct sph *e, void **ret, uint64_t cflags)
1029 struct dnet_node *n = s->node;
1030 struct dnet_net_state *st;
1031 int err = -ENOENT, num = 0;
1032 struct dnet_wait *w;
1033 struct dnet_group *g;
1035 w = dnet_wait_alloc(0);
1036 if (!w) {
1037 err = -ENOMEM;
1038 goto err_out_exit;
1041 if (id && id->group_id != 0) {
1042 dnet_wait_get(w);
1043 st = dnet_state_get_first(n, id);
1044 if (!st)
1045 goto err_out_put;
1046 err = dnet_send_cmd_single(st, w, e, cflags);
1047 dnet_state_put(st);
1048 num = 1;
1049 } else if (id && id->group_id == 0) {
1050 pthread_mutex_lock(&n->state_lock);
1051 list_for_each_entry(g, &n->group_list, group_entry) {
1052 dnet_wait_get(w);
1054 id->group_id = g->group_id;
1056 st = dnet_state_search_nolock(n, id);
1057 if (st) {
1058 if (st != n->st) {
1059 err = dnet_send_cmd_single(st, w, e, cflags);
1060 num++;
1062 dnet_state_put(st);
1065 pthread_mutex_unlock(&n->state_lock);
1066 } else {
1067 pthread_mutex_lock(&n->state_lock);
1068 list_for_each_entry(g, &n->group_list, group_entry) {
1069 list_for_each_entry(st, &g->state_list, state_entry) {
1070 if (st == n->st)
1071 continue;
1073 dnet_wait_get(w);
1075 err = dnet_send_cmd_single(st, w, e, cflags);
1076 num++;
1079 pthread_mutex_unlock(&n->state_lock);
1082 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1083 if (err)
1084 goto err_out_put;
1086 if (w->ret) {
1087 *ret = w->ret;
1088 w->ret = NULL;
1090 err = w->size;
1093 dnet_wait_put(w);
1095 return err;
1097 err_out_put:
1098 dnet_wait_put(w);
1099 err_out_exit:
1100 return err;
/* Send exec command @e with default command flags; see dnet_send_cmd_raw(). */
int dnet_send_cmd(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t no_extra_flags = 0;

	return dnet_send_cmd_raw(s, id, e, ret, no_extra_flags);
}
1108 int dnet_send_cmd_nolock(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
1110 return dnet_send_cmd_raw(s, id, e, ret, DNET_FLAGS_NOLOCK);
1113 int dnet_try_reconnect(struct dnet_node *n)
1115 struct dnet_addr_storage *ast, *tmp;
1116 struct dnet_net_state *st;
1117 LIST_HEAD(list);
1118 int s, err, join;
1120 if (list_empty(&n->reconnect_list))
1121 return 0;
1123 pthread_mutex_lock(&n->reconnect_lock);
1124 list_for_each_entry_safe(ast, tmp, &n->reconnect_list, reconnect_entry) {
1125 list_move(&ast->reconnect_entry, &list);
1127 pthread_mutex_unlock(&n->reconnect_lock);
1129 list_for_each_entry_safe(ast, tmp, &list, reconnect_entry) {
1130 s = dnet_socket_create_addr(n, n->sock_type, n->proto, n->family,
1131 (struct sockaddr *)ast->addr.addr, ast->addr.addr_len, 0);
1132 if (s < 0)
1133 goto out_add;
1135 join = DNET_WANT_RECONNECT;
1136 if (ast->__join_state == DNET_JOIN)
1137 join = DNET_JOIN;
1139 st = dnet_add_state_socket(n, &ast->addr, s, &err, join);
1140 if (st)
1141 goto out_remove;
1143 dnet_sock_close(s);
1145 if (err == -EEXIST || err == -EINVAL)
1146 goto out_remove;
1148 out_add:
1149 dnet_add_reconnect_state(n, &ast->addr, ast->__join_state);
1150 out_remove:
1151 list_del(&ast->reconnect_entry);
1152 free(ast);
1155 return 0;
1158 int dnet_lookup_object(struct dnet_session *s, struct dnet_id *id, uint64_t cflags,
1159 int (* complete)(struct dnet_net_state *, struct dnet_cmd *, void *),
1160 void *priv)
1162 struct dnet_node *n = s->node;
1163 struct dnet_io_req req;
1164 struct dnet_trans *t;
1165 struct dnet_cmd *cmd;
1166 int err;
1168 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
1169 if (!t) {
1170 err = -ENOMEM;
1171 goto err_out_complete;
1173 t->complete = complete;
1174 t->priv = priv;
1176 cmd = (struct dnet_cmd *)(t + 1);
1178 memcpy(&cmd->id, id, sizeof(struct dnet_id));
1180 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
1182 cmd->cmd = t->command = DNET_CMD_LOOKUP;
1183 cmd->flags = cflags | DNET_FLAGS_NEED_ACK;
1185 t->st = dnet_state_get_first(n, &cmd->id);
1186 if (!t->st) {
1187 err = -ENOENT;
1188 goto err_out_destroy;
1191 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
1192 dnet_convert_cmd(cmd);
1194 dnet_log(n, DNET_LOG_NOTICE, "%s: lookup to %s.\n", dnet_dump_id(id), dnet_server_convert_dnet_addr(&t->st->addr));
1196 memset(&req, 0, sizeof(req));
1197 req.st = t->st;
1198 req.header = cmd;
1199 req.hsize = sizeof(struct dnet_cmd);
1201 err = dnet_trans_send(t, &req);
1202 if (err)
1203 goto err_out_destroy;
1205 return 0;
1207 err_out_complete:
1208 if (complete)
1209 complete(NULL, NULL, priv);
1210 return err;
1212 err_out_destroy:
1213 dnet_trans_put(t);
1214 return err;
1217 int dnet_lookup_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1219 struct dnet_wait *w = priv;
1220 struct dnet_node *n = NULL;
1221 struct dnet_addr_attr *a;
1222 struct dnet_net_state *other;
1223 char addr_str[128] = "no-address";
1224 int err;
1226 if (is_trans_destroyed(st, cmd)) {
1227 dnet_wakeup(w, w->cond++);
1228 dnet_wait_put(w);
1229 return 0;
1231 n = st->n;
1233 err = cmd->status;
1234 if (err || !cmd->size)
1235 goto err_out_exit;
1237 if (cmd->size < sizeof(struct dnet_addr_attr)) {
1238 dnet_log(st->n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1239 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size, sizeof(struct dnet_addr_attr));
1240 err = -EINVAL;
1241 goto err_out_exit;
1244 a = (struct dnet_addr_attr *)(cmd + 1);
1246 dnet_convert_addr_attr(a);
1247 dnet_server_convert_dnet_addr_raw(&a->addr, addr_str, sizeof(addr_str));
1249 if (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
1250 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
1252 dnet_convert_file_info(info);
1254 dnet_log_raw(n, DNET_LOG_NOTICE, "%s: lookup object: %s: "
1255 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1256 dnet_dump_id(&cmd->id), addr_str,
1257 (unsigned long long)info->offset, (unsigned long long)info->size,
1258 (unsigned long long)info->mode, (char *)(info + 1));
1259 } else {
1260 dnet_log_raw(n, DNET_LOG_INFO, "%s: lookup object: %s\n",
1261 dnet_dump_id(&cmd->id), addr_str);
1265 other = dnet_state_search_by_addr(n, &a->addr);
1266 if (other) {
1267 dnet_state_put(other);
1268 } else {
1269 dnet_recv_route_list(st);
1272 return 0;
1274 err_out_exit:
1275 if (n)
1276 dnet_log(n, DNET_LOG_ERROR, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd->id), cmd->status, err);
1278 return err;
1281 int dnet_lookup(struct dnet_session *s, const char *file)
1283 struct dnet_node *n = s->node;
1284 int err, error = 0, i;
1285 struct dnet_wait *w;
1286 struct dnet_id raw;
1288 w = dnet_wait_alloc(0);
1289 if (!w) {
1290 err = -ENOMEM;
1291 goto err_out_exit;
1294 dnet_transform(n, file, strlen(file), &raw);
1296 for (i=0; i<s->group_num; ++i) {
1297 raw.group_id = s->groups[i];
1299 err = dnet_lookup_object(s, &raw, 0, dnet_lookup_complete, dnet_wait_get(w));
1300 if (err) {
1301 error = err;
1302 continue;
1305 err = dnet_wait_event(w, w->cond == 1, &n->wait_ts);
1306 if (err || w->status) {
1307 if (!err)
1308 err = w->status;
1309 error = err;
1310 continue;
1313 error = 0;
1314 break;
1317 dnet_wait_put(w);
1318 return error;
1320 err_out_exit:
1321 return err;
1324 struct dnet_addr *dnet_state_addr(struct dnet_net_state *st)
1326 return &st->addr;
1329 static int dnet_stat_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1331 struct dnet_wait *w = priv;
1332 float la[3];
1333 struct dnet_stat *st;
1334 int err = -EINVAL;
1336 if (is_trans_destroyed(state, cmd)) {
1337 dnet_wakeup(w, w->cond++);
1338 dnet_wait_put(w);
1339 return 0;
1342 if (cmd->cmd == DNET_CMD_STAT && cmd->size == sizeof(struct dnet_stat)) {
1343 st = (struct dnet_stat *)(cmd + 1);
1345 dnet_convert_stat(st);
1347 la[0] = (float)st->la[0] / 100.0;
1348 la[1] = (float)st->la[1] / 100.0;
1349 la[2] = (float)st->la[2] / 100.0;
1351 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: la: %.2f %.2f %.2f.\n",
1352 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1353 la[0], la[1], la[2]);
1354 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: mem: "
1355 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1356 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1357 (unsigned long long)st->vm_total,
1358 (unsigned long long)st->vm_free,
1359 (unsigned long long)st->vm_cached);
1360 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: fs: "
1361 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1362 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1363 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
1364 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
1365 (unsigned long long)st->files, (unsigned long long)st->fsid);
1366 err = 0;
1367 } else if (cmd->size >= sizeof(struct dnet_addr_stat) && cmd->cmd == DNET_CMD_STAT_COUNT) {
1368 struct dnet_addr_stat *as = (struct dnet_addr_stat *)(cmd + 1);
1369 int i;
1371 dnet_convert_addr_stat(as, 0);
1373 for (i=0; i<as->num; ++i) {
1374 if (as->num > as->cmd_num) {
1375 if (i == 0)
1376 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Storage commands\n",
1377 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1378 if (i == as->cmd_num)
1379 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Proxy commands\n",
1380 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1381 if (i == as->cmd_num * 2)
1382 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Counters\n",
1383 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1385 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1386 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1387 dnet_counter_string(i, as->cmd_num),
1388 (unsigned long long)as->count[i].count, (unsigned long long)as->count[i].err);
1392 return err;
/*
 * Send one transaction described by @ctl: straight to @st when a state is
 * given, otherwise routed by the session according to ctl->id.
 */
static int dnet_request_cmd_single(struct dnet_session *s, struct dnet_net_state *st, struct dnet_trans_control *ctl)
{
	return st ? dnet_trans_alloc_send_state(st, ctl) : dnet_trans_alloc_send(s, ctl);
}
1403 int dnet_request_stat(struct dnet_session *s, struct dnet_id *id,
1404 unsigned int cmd, uint64_t cflags,
1405 int (* complete)(struct dnet_net_state *state,
1406 struct dnet_cmd *cmd,
1407 void *priv),
1408 void *priv)
1410 struct dnet_node *n = s->node;
1411 struct dnet_trans_control ctl;
1412 struct dnet_wait *w = NULL;
1413 int err, num = 0;
1414 struct timeval start, end;
1415 long diff;
1417 gettimeofday(&start, NULL);
1419 if (!complete) {
1420 w = dnet_wait_alloc(0);
1421 if (!w) {
1422 err = -ENOMEM;
1423 goto err_out_exit;
1426 complete = dnet_stat_complete;
1427 priv = w;
1430 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1432 ctl.cmd = cmd;
1433 ctl.complete = complete;
1434 ctl.priv = priv;
1435 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | cflags;
1437 if (id) {
1438 if (w)
1439 dnet_wait_get(w);
1441 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1443 err = dnet_request_cmd_single(s, NULL, &ctl);
1444 num = 1;
1445 } else {
1446 struct dnet_net_state *st;
1447 struct dnet_group *g;
1450 pthread_mutex_lock(&n->state_lock);
1451 list_for_each_entry(g, &n->group_list, group_entry) {
1452 list_for_each_entry(st, &g->state_list, state_entry) {
1453 if (st == n->st)
1454 continue;
1456 if (w)
1457 dnet_wait_get(w);
1459 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1460 dnet_request_cmd_single(s, st, &ctl);
1461 num++;
1464 pthread_mutex_unlock(&n->state_lock);
1467 if (!w) {
1468 gettimeofday(&end, NULL);
1469 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1470 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd), diff, num);
1472 return num;
1475 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1477 gettimeofday(&end, NULL);
1478 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1479 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd), diff, err, num);
1481 if (err)
1482 goto err_out_put;
1484 dnet_wait_put(w);
1486 return num;
1488 err_out_put:
1489 dnet_wait_put(w);
1490 err_out_exit:
1491 return err;
/*
 * Per-call context of dnet_request_cmd(). It holds the wait object the
 * caller sleeps on, plus the caller's own completion callback and its
 * argument, to which dnet_request_cmd_complete() forwards every reply.
 */
struct dnet_request_cmd_priv {
	struct dnet_wait	*w;

	int			(* complete)(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv);
	void			*priv;
};
1501 static int dnet_request_cmd_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1503 struct dnet_request_cmd_priv *p = priv;
1504 int err = p->complete(state, cmd, p->priv);
1506 if (is_trans_destroyed(state, cmd)) {
1507 struct dnet_wait *w = p->w;
1509 dnet_wakeup(w, w->cond++);
1510 if (atomic_read(&w->refcnt) == 1)
1511 free(p);
1512 dnet_wait_put(w);
1515 return err;
1518 int dnet_request_cmd(struct dnet_session *s, struct dnet_trans_control *ctl)
1520 struct dnet_node *n = s->node;
1521 int err, num = 0;
1522 struct dnet_request_cmd_priv *p;
1523 struct dnet_wait *w;
1524 struct dnet_net_state *st;
1525 struct dnet_group *g;
1526 struct timeval start, end;
1527 long diff;
1529 gettimeofday(&start, NULL);
1531 p = malloc(sizeof(*p));
1532 if (!p) {
1533 err = -ENOMEM;
1534 goto err_out_exit;
1537 w = dnet_wait_alloc(0);
1538 if (!w) {
1539 err = -ENOMEM;
1540 goto err_out_free;
1543 p->w = w;
1544 p->complete = ctl->complete;
1545 p->priv = ctl->priv;
1547 ctl->complete = dnet_request_cmd_complete;
1548 ctl->priv = p;
1550 pthread_mutex_lock(&n->state_lock);
1551 list_for_each_entry(g, &n->group_list, group_entry) {
1552 list_for_each_entry(st, &g->state_list, state_entry) {
1553 if (st == n->st)
1554 continue;
1556 dnet_wait_get(w);
1558 ctl->id.group_id = g->group_id;
1560 if (!(ctl->cflags & DNET_FLAGS_DIRECT))
1561 dnet_setup_id(&ctl->id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1562 dnet_request_cmd_single(s, st, ctl);
1563 num++;
1566 pthread_mutex_unlock(&n->state_lock);
1568 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1570 gettimeofday(&end, NULL);
1571 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1572 dnet_log(n, DNET_LOG_NOTICE, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl->cmd), diff, err, num);
1574 if (!err)
1575 err = num;
1577 if (atomic_read(&w->refcnt) == 1)
1578 free(p);
1579 dnet_wait_put(w);
1581 return err;
1583 err_out_free:
1584 free(p);
1585 err_out_exit:
1586 return err;
1589 struct dnet_update_status_priv {
1590 struct dnet_wait *w;
1591 struct dnet_node_status status;
1592 atomic_t refcnt;
1595 static int dnet_update_status_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1597 struct dnet_update_status_priv *p = priv;
1599 if (is_trans_destroyed(state, cmd)) {
1600 dnet_wakeup(p->w, p->w->cond++);
1601 dnet_wait_put(p->w);
1602 if (atomic_dec_and_test(&p->refcnt))
1603 free(p);
1606 if (cmd->size == sizeof(struct dnet_node_status)) {
1607 memcpy(&p->status, cmd + 1, sizeof(struct dnet_node_status));
1608 return 0;
1611 return -ENOENT;
1614 int dnet_update_status(struct dnet_session *s, struct dnet_addr *addr, struct dnet_id *id, struct dnet_node_status *status)
1616 int err;
1617 struct dnet_update_status_priv *priv;
1618 struct dnet_trans_control ctl;
1620 if (!id && !addr) {
1621 err = -EINVAL;
1622 goto err_out_exit;
1625 memset(&ctl, 0, sizeof(ctl));
1627 if (id) {
1628 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1629 } else {
1630 struct dnet_net_state *st;
1632 st = dnet_state_search_by_addr(s->node, addr);
1633 if (!st) {
1634 err = -ENOENT;
1635 goto err_out_exit;
1638 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1639 dnet_state_put(st);
1642 priv = malloc(sizeof(struct dnet_update_status_priv));
1643 if (!priv) {
1644 err = -ENOMEM;
1645 goto err_out_exit;
1648 priv->w = dnet_wait_alloc(0);
1649 if (!priv->w) {
1650 err = -ENOMEM;
1651 goto err_out_exit;
1654 ctl.complete = dnet_update_status_complete;
1655 ctl.priv = priv;
1656 ctl.cmd = DNET_CMD_STATUS;
1657 ctl.cflags = DNET_FLAGS_NEED_ACK;
1658 ctl.size = sizeof(struct dnet_node_status);
1659 ctl.data = status;
1661 dnet_wait_get(priv->w);
1662 dnet_request_cmd_single(s, NULL, &ctl);
1664 err = dnet_wait_event(priv->w, priv->w->cond == 1, &s->node->wait_ts);
1665 dnet_wait_put(priv->w);
1666 if (!err && priv) {
1667 memcpy(status, &priv->status, sizeof(struct dnet_node_status));
1669 if (atomic_dec_and_test(&priv->refcnt))
1670 free(priv);
1672 err_out_exit:
1673 return err;
1676 static int dnet_remove_object_raw(struct dnet_session *s, struct dnet_id *id,
1677 int (* complete)(struct dnet_net_state *state,
1678 struct dnet_cmd *cmd,
1679 void *priv),
1680 void *priv, uint64_t cflags, uint64_t ioflags)
1682 struct dnet_io_control ctl;
1683 int err;
1685 memset(&ctl, 0, sizeof(struct dnet_io_control));
1687 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1689 memcpy(&ctl.io.id, id->id, DNET_ID_SIZE);
1690 memcpy(&ctl.io.parent, id->id, DNET_ID_SIZE);
1691 ctl.io.flags = ioflags;
1693 ctl.fd = -1;
1695 ctl.cmd = DNET_CMD_DEL;
1696 ctl.complete = complete;
1697 ctl.priv = priv;
1698 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1700 err = dnet_trans_create_send_all(s, &ctl);
1701 if (err == 0)
1702 err = -ECONNRESET;
1704 return err;
1707 static int dnet_remove_complete(struct dnet_net_state *state,
1708 struct dnet_cmd *cmd,
1709 void *priv)
1711 struct dnet_wait *w = priv;
1713 if (is_trans_destroyed(state, cmd)) {
1714 dnet_wakeup(w, w->cond++);
1715 dnet_wait_put(w);
1716 return 0;
1719 if (cmd->status)
1720 w->status = cmd->status;
1721 return cmd->status;
1724 int dnet_remove_object(struct dnet_session *s, struct dnet_id *id,
1725 int (* complete)(struct dnet_net_state *state,
1726 struct dnet_cmd *cmd,
1727 void *priv),
1728 void *priv,
1729 uint64_t cflags, uint64_t ioflags)
1731 struct dnet_wait *w = NULL;
1732 int err;
1734 if (!complete) {
1735 w = dnet_wait_alloc(0);
1736 if (!w) {
1737 err = -ENOMEM;
1738 goto err_out_exit;
1741 complete = dnet_remove_complete;
1742 priv = w;
1743 dnet_wait_get(w);
1746 err = dnet_remove_object_raw(s, id, complete, priv, cflags, ioflags);
1747 if (err < 0)
1748 goto err_out_put;
1750 if (w) {
1751 err = dnet_wait_event(w, w->cond != err, &s->node->wait_ts);
1752 if (err)
1753 goto err_out_put;
1755 if (w->status < 0) {
1756 err = w->status;
1757 goto err_out_put;
1760 dnet_wait_put(w);
1762 return 0;
1764 err_out_put:
1765 if (w)
1766 dnet_wait_put(w);
1767 err_out_exit:
1768 return err;
1771 static int dnet_remove_file_raw(struct dnet_session *s, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1773 struct dnet_wait *w;
1774 int err, num;
1776 w = dnet_wait_alloc(0);
1777 if (!w) {
1778 err = -ENOMEM;
1779 goto err_out_exit;
1782 atomic_add(&w->refcnt, 1024);
1783 err = dnet_remove_object_raw(s, id, dnet_remove_complete, w, cflags, ioflags);
1784 if (err < 0) {
1785 atomic_sub(&w->refcnt, 1024);
1786 goto err_out_put;
1789 num = err;
1790 atomic_sub(&w->refcnt, 1024 - num);
1792 err = dnet_wait_event(w, w->cond == num, &s->node->wait_ts);
1793 if (err)
1794 goto err_out_put;
1796 if (w->status < 0) {
1797 err = w->status;
1798 goto err_out_put;
1802 dnet_wait_put(w);
1804 return 0;
1806 err_out_put:
1807 dnet_wait_put(w);
1808 err_out_exit:
1809 return err;
1812 int dnet_remove_object_now(struct dnet_session *s, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1814 return dnet_remove_file_raw(s, id, cflags | DNET_FLAGS_NEED_ACK | DNET_ATTR_DELETE_HISTORY, ioflags);
1817 int dnet_remove_file(struct dnet_session *s, char *remote, int remote_len, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1819 struct dnet_id raw;
1821 if (!id) {
1822 dnet_transform(s->node, remote, remote_len, &raw);
1823 raw.group_id = 0;
1824 id = &raw;
1827 return dnet_remove_file_raw(s, id, cflags, ioflags);
1830 int dnet_request_ids(struct dnet_session *s, struct dnet_id *id, uint64_t cflags,
1831 int (* complete)(struct dnet_net_state *state,
1832 struct dnet_cmd *cmd,
1833 void *priv),
1834 void *priv)
1836 struct dnet_trans_control ctl;
1838 dnet_log_raw(s->node, DNET_LOG_ERROR, "Temporarily unsupported operation.\n");
1839 exit(-1);
1841 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1843 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1844 ctl.cmd = DNET_CMD_LIST;
1845 ctl.complete = complete;
1846 ctl.priv = priv;
1847 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1849 return dnet_trans_alloc_send(s, &ctl);
1852 struct dnet_node *dnet_get_node_from_state(void *state)
1854 struct dnet_net_state *st = state;
1856 if (!st)
1857 return NULL;
1858 return st->n;
1861 struct dnet_read_data_completion {
1862 struct dnet_wait *w;
1863 void *data;
1864 uint64_t size;
1865 atomic_t refcnt;
1868 static int dnet_read_data_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1870 struct dnet_read_data_completion *c = priv;
1871 struct dnet_wait *w = c->w;
1872 int err = -EINVAL;
1874 if (is_trans_destroyed(st, cmd)) {
1875 dnet_wakeup(w, w->cond++);
1876 dnet_wait_put(w);
1877 if (atomic_dec_and_test(&c->refcnt))
1878 free(c);
1879 return err;
1882 err = cmd->status;
1883 if (err)
1884 w->status = err;
1886 if (cmd->size >= sizeof(struct dnet_io_attr)) {
1887 struct dnet_io_attr *io = (struct dnet_io_attr *)(cmd + 1);
1888 uint64_t sz = c->size;
1890 dnet_convert_io_attr(io);
1892 sz += io->size + sizeof(struct dnet_io_attr);
1893 c->data = realloc(c->data, sz);
1894 if (!c->data) {
1895 err = -ENOMEM;
1896 goto err_out_exit;
1899 memcpy(c->data + c->size, io, sizeof(struct dnet_io_attr) + io->size);
1900 c->size = sz;
1903 err_out_exit:
1904 dnet_log(st->n, DNET_LOG_NOTICE, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1905 dnet_dump_id(&cmd->id), (unsigned long long)(cmd->trans & ~DNET_TRANS_REPLY),
1906 cmd->status, err);
1908 return err;
1911 void *dnet_read_data_wait_raw(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io,
1912 int cmd, uint64_t cflags, int *errp)
1914 struct dnet_node *n = s->node;
1915 struct dnet_io_control ctl;
1916 struct dnet_wait *w;
1917 struct dnet_read_data_completion *c;
1918 void *data = NULL;
1919 int err;
1921 w = dnet_wait_alloc(0);
1922 if (!w) {
1923 err = -ENOMEM;
1924 goto err_out_exit;
1927 c = malloc(sizeof(*c));
1928 if (!c) {
1929 err = -ENOMEM;
1930 goto err_out_put;
1933 c->w = w;
1934 c->size = 0;
1935 c->data = NULL;
1936 /* one for completion callback, another for this function */
1937 atomic_init(&c->refcnt, 2);
1939 memset(&ctl, 0, sizeof(struct dnet_io_control));
1941 ctl.fd = -1;
1943 ctl.priv = c;
1944 ctl.complete = dnet_read_data_complete;
1946 ctl.cmd = cmd;
1947 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1949 memcpy(&ctl.io, io, sizeof(struct dnet_io_attr));
1950 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1952 ctl.id.type = io->type;
1954 dnet_wait_get(w);
1955 err = dnet_read_object(s, &ctl);
1956 if (err)
1957 goto err_out_put_complete;
1959 err = dnet_wait_event(w, w->cond, &n->wait_ts);
1960 if (err || w->status) {
1961 char id_str[2*DNET_ID_SIZE + 1];
1962 if (!err)
1963 err = w->status;
1964 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
1965 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
1966 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
1967 goto err_out_put_complete;
1969 io->size = c->size;
1970 data = c->data;
1971 err = 0;
1973 err_out_put_complete:
1974 if (atomic_dec_and_test(&c->refcnt))
1975 free(c);
1976 err_out_put:
1977 dnet_wait_put(w);
1978 err_out_exit:
1979 *errp = err;
1980 return data;
1983 static int dnet_read_recover(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io, void *data, uint64_t cflags)
1985 struct dnet_node *n = s->node;
1986 struct dnet_meta_container mc;
1987 struct dnet_io_control ctl;
1988 void *result;
1989 int err;
1991 err = dnet_read_meta(s, &mc, NULL, 0, id);
1992 if (err) {
1993 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id), err);
1994 goto err_out_exit;
1997 memset(&ctl, 0, sizeof(struct dnet_io_control));
1999 ctl.id = *id;
2000 ctl.io = *io;
2002 ctl.data = data + sizeof(struct dnet_io_attr);
2003 ctl.io.size -= sizeof(struct dnet_io_attr);
2005 ctl.fd = -1;
2006 ctl.cmd = DNET_CMD_WRITE;
2007 ctl.cflags = cflags;
2009 err = dnet_write_data_wait(s, &ctl, &result);
2010 if (err < 0) {
2011 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id), err);
2012 goto err_out_free_meta;
2015 err = dnet_write_metadata(s, &mc, 0, cflags);
2016 if (err < 0)
2017 goto err_out_free_result;
2019 err_out_free_result:
2020 free(result);
2021 err_out_free_meta:
2022 free(mc.data);
2023 err_out_exit:
2024 return err;
2027 void *dnet_read_data_wait_groups(struct dnet_session *s, struct dnet_id *id, int *groups, int num,
2028 struct dnet_io_attr *io, uint64_t cflags, int *errp)
2030 int i;
2031 void *data;
2033 for (i = 0; i < num; ++i) {
2034 id->group_id = groups[i];
2036 data = dnet_read_data_wait_raw(s, id, io, DNET_CMD_READ, cflags, errp);
2037 if (data) {
2038 if ((i != 0) && (io->type == 0) && (io->offset == 0) && (io->size > sizeof(struct dnet_io_attr))) {
2039 dnet_read_recover(s, id, io, data, cflags);
2042 *errp = 0;
2043 return data;
2047 return NULL;
/*
 * Read @id/@io from the session's groups in the order chosen by
 * dnet_mix_states(). Returns the reply buffer (caller frees) or NULL with
 * the error in *errp.
 */
void *dnet_read_data_wait(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io,
		uint64_t cflags, int *errp)
{
	/* default if no group can be tried; err used to be returned uninitialised then */
	int num, *g, err = -ENOENT;
	void *data = NULL;

	num = dnet_mix_states(s, id, &g);
	if (num < 0) {
		*errp = num;
		return NULL;
	}

	data = dnet_read_data_wait_groups(s, id, g, num, io, cflags, &err);
	free(g);

	*errp = err;
	return data;
}
2073 int dnet_write_data_wait(struct dnet_session *s, struct dnet_io_control *ctl, void **result)
2075 struct dnet_node *n = s->node;
2076 int err, trans_num = 0;
2077 struct dnet_wait *w;
2078 struct dnet_write_completion *wc;
2080 wc = malloc(sizeof(struct dnet_write_completion));
2081 if (!wc) {
2082 err = -ENOMEM;
2083 goto err_out_exit;
2085 memset(wc, 0, sizeof(struct dnet_write_completion));
2087 w = dnet_wait_alloc(0);
2088 if (!w) {
2089 err = -ENOMEM;
2090 free(wc);
2091 goto err_out_exit;
2093 wc->wait = w;
2095 w->status = -ENOENT;
2096 ctl->priv = wc;
2097 ctl->complete = dnet_write_complete;
2099 ctl->cmd = DNET_CMD_WRITE;
2100 ctl->cflags |= DNET_FLAGS_NEED_ACK;
2102 memcpy(ctl->io.id, ctl->id.id, DNET_ID_SIZE);
2104 atomic_set(&w->refcnt, INT_MAX);
2105 trans_num = dnet_write_object(s, ctl);
2106 if (trans_num < 0)
2107 trans_num = 0;
2110 * 1 - the first reference counter we grabbed at allocation time
2112 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
2114 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
2115 if (err || w->status) {
2116 if (!err)
2117 err = w->status;
2118 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2119 dnet_dump_id(&ctl->id), err, w->status);
2122 if (err || !trans_num) {
2123 if (!err)
2124 err = -EINVAL;
2125 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
2126 goto err_out_put;
2129 if (trans_num)
2130 dnet_log(n, DNET_LOG_NOTICE, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2131 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, ctl->io.type, wc->size);
2132 err = trans_num;
2134 *result = wc->reply;
2135 err = wc->size;
2137 wc->reply = NULL;
2139 err_out_put:
2140 dnet_write_complete_free(wc);
2141 err_out_exit:
2142 return err;
2145 int dnet_lookup_addr(struct dnet_session *s, const void *remote, int len, struct dnet_id *id, int group_id, char *dst, int dlen)
2147 struct dnet_node *n = s->node;
2148 struct dnet_id raw;
2149 struct dnet_net_state *st;
2150 int err = -ENOENT;
2152 if (!id) {
2153 dnet_transform(n, remote, len, &raw);
2154 id = &raw;
2156 id->group_id = group_id;
2158 st = dnet_state_get_first(n, id);
2159 if (!st)
2160 goto err_out_exit;
2162 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st), dst, dlen);
2163 dnet_state_put(st);
2164 err = 0;
2166 err_out_exit:
2167 return err;
/* A candidate group and its selection weight, used by dnet_mix_states(). */
struct dnet_weight {
	int			weight;
	int			group_id;
};

/*
 * qsort() comparator: orders entries by descending weight.
 * Compares instead of subtracting. The old w2 - w1 overflowed (undefined
 * behaviour) for large weights of opposite sign, e.g. (int)st->weight values.
 */
static int dnet_weight_compare(const void *v1, const void *v2)
{
	const struct dnet_weight *w1 = v1;
	const struct dnet_weight *w2 = v2;

	return (w1->weight < w2->weight) - (w1->weight > w2->weight);
}

/*
 * Weighted random pick among the first @num entries of @w. Index i is chosen
 * with probability roughly proportional to w[i].weight (zero total weight
 * always yields 0). Falls back to num - 1 if rounding leaves the draw unmatched.
 */
static int dnet_weight_get_winner(struct dnet_weight *w, int num)
{
	long sum = 0, pos;
	float r;
	int i;

	for (i = 0; i < num; ++i)
		sum += w[i].weight;

	r = (float)rand() / (float)RAND_MAX;
	pos = r * sum;

	for (i = 0; i < num; ++i) {
		pos -= w[i].weight;
		if (pos <= 0)
			return i;
	}

	return num - 1;
}
2204 int dnet_mix_states(struct dnet_session *s, struct dnet_id *id, int **groupsp)
2206 struct dnet_node *n = s->node;
2207 struct dnet_weight *weights;
2208 int *groups;
2209 int group_num, i, num;
2210 struct dnet_net_state *st;
2212 if (!s->group_num)
2213 return -ENOENT;
2215 group_num = s->group_num;
2217 weights = alloca(s->group_num * sizeof(*weights));
2218 groups = malloc(s->group_num * sizeof(*groups));
2219 if (groups)
2220 memcpy(groups, s->groups, s->group_num * sizeof(*groups));
2222 if (!groups) {
2223 *groupsp = NULL;
2224 return -ENOMEM;
2227 if ((n->flags & DNET_CFG_RANDOMIZE_STATES) || !id) {
2228 for (i = 0; i < group_num; ++i) {
2229 weights[i].weight = rand();
2230 weights[i].group_id = groups[i];
2232 num = group_num;
2233 } else {
2234 if (!(n->flags & DNET_CFG_MIX_STATES)) {
2235 *groupsp = groups;
2236 return group_num;
2239 memset(weights, 0, group_num * sizeof(*weights));
2241 for (i = 0, num = 0; i < group_num; ++i) {
2242 id->group_id = groups[i];
2244 st = dnet_state_get_first(n, id);
2245 if (st) {
2246 weights[num].weight = (int)st->weight;
2247 weights[num].group_id = id->group_id;
2249 dnet_state_put(st);
2251 num++;
2256 group_num = num;
2257 if (group_num) {
2258 qsort(weights, group_num, sizeof(struct dnet_weight), dnet_weight_compare);
2260 for (i = 0; i < group_num; ++i) {
2261 int pos = dnet_weight_get_winner(weights, group_num - i);
2262 groups[i] = weights[pos].group_id;
2264 if (pos < group_num - 1)
2265 memmove(&weights[pos], &weights[pos + 1], (group_num - 1 - pos) * sizeof(struct dnet_weight));
2269 dnet_session_set_groups(s, groups, group_num);
2271 *groupsp = groups;
2272 return group_num;
2275 int dnet_data_map(struct dnet_map_fd *map)
2277 uint64_t off;
2278 long page_size = sysconf(_SC_PAGE_SIZE);
2279 int err = 0;
2281 off = map->offset & ~(page_size - 1);
2282 map->mapped_size = ALIGN(map->size + map->offset - off, page_size);
2284 map->mapped_data = mmap(NULL, map->mapped_size, PROT_READ, MAP_SHARED, map->fd, off);
2285 if (map->mapped_data == MAP_FAILED) {
2286 err = -errno;
2287 goto err_out_exit;
2290 map->data = map->mapped_data + map->offset - off;
2292 err_out_exit:
2293 return err;
2296 void dnet_data_unmap(struct dnet_map_fd *map)
2298 munmap(map->mapped_data, map->mapped_size);
2301 struct dnet_io_attr *dnet_remove_range(struct dnet_session *s, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *ret_num, int *errp)
2303 struct dnet_node *n = s->node;
2304 struct dnet_id id;
2305 struct dnet_io_attr *ret, *new_ret;
2306 struct dnet_raw_id start, next;
2307 struct dnet_raw_id end;
2308 uint64_t size = io->size;
2309 void *data;
2310 int err, need_exit = 0;
2312 memcpy(end.id, io->parent, DNET_ID_SIZE);
2314 dnet_setup_id(&id, group_id, io->id);
2315 id.type = io->type;
2317 ret = NULL;
2318 *ret_num = 0;
2319 while (!need_exit) {
2320 err = dnet_search_range(n, &id, &start, &next);
2321 if (err)
2322 goto err_out_exit;
2324 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2325 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2326 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2327 memcpy(next.id, end.id, DNET_ID_SIZE);
2328 need_exit = 1;
2331 if (n->log->log_level > DNET_LOG_NOTICE) {
2332 int len = 6;
2333 char start_id[2*len + 1];
2334 char next_id[2*len + 1];
2335 char end_id[2*len + 1];
2336 char id_str[2*len + 1];
2338 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2339 dnet_dump_id_len_raw(id.id, len, id_str),
2340 dnet_dump_id_len_raw(start.id, len, start_id),
2341 dnet_dump_id_len_raw(next.id, len, next_id),
2342 dnet_dump_id_len_raw(end.id, len, end_id),
2343 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2346 memcpy(io->id, id.id, DNET_ID_SIZE);
2347 memcpy(io->parent, next.id, DNET_ID_SIZE);
2349 io->size = size;
2351 data = dnet_read_data_wait_raw(s, &id, io, DNET_CMD_DEL_RANGE, cflags, &err);
2352 if (io->size != sizeof(struct dnet_io_attr)) {
2353 err = -ENOENT;
2354 goto err_out_exit;
2357 if (data) {
2358 struct dnet_io_attr *rep = (struct dnet_io_attr*)data;
2360 dnet_convert_io_attr(rep);
2362 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2363 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2364 (unsigned long long)io->num, (unsigned long long)io->size);
2366 (*ret_num)++;
2368 new_ret = realloc(ret, *ret_num * sizeof(struct dnet_io_attr));
2369 if (!new_ret) {
2370 err = -ENOMEM;
2371 goto err_out_exit;
2374 ret = new_ret;
2375 ret[*ret_num - 1] = *rep;
2377 free(data);
2380 memcpy(id.id, next.id, DNET_ID_SIZE);
2383 err_out_exit:
2384 *errp = err;
2386 return ret;
2389 struct dnet_range_data *dnet_read_range(struct dnet_session *s, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *errp)
2391 struct dnet_node *n = s->node;
2392 struct dnet_id id;
2393 int ret_num;
2394 struct dnet_range_data *ret;
2395 struct dnet_raw_id start, next;
2396 struct dnet_raw_id end;
2397 uint64_t size = io->size;
2398 void *data;
2399 int err, need_exit = 0;
2401 memcpy(end.id, io->parent, DNET_ID_SIZE);
2403 dnet_setup_id(&id, group_id, io->id);
2404 id.type = io->type;
2406 ret = NULL;
2407 ret_num = 0;
2408 while (!need_exit) {
2409 err = dnet_search_range(n, &id, &start, &next);
2410 if (err)
2411 goto err_out_exit;
2413 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2414 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2415 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2416 memcpy(next.id, end.id, DNET_ID_SIZE);
2417 need_exit = 1;
2420 if (n->log->log_level > DNET_LOG_NOTICE) {
2421 int len = 6;
2422 char start_id[2*len + 1];
2423 char next_id[2*len + 1];
2424 char end_id[2*len + 1];
2425 char id_str[2*len + 1];
2427 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2428 dnet_dump_id_len_raw(id.id, len, id_str),
2429 dnet_dump_id_len_raw(start.id, len, start_id),
2430 dnet_dump_id_len_raw(next.id, len, next_id),
2431 dnet_dump_id_len_raw(end.id, len, end_id),
2432 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2435 memcpy(io->id, id.id, DNET_ID_SIZE);
2436 memcpy(io->parent, next.id, DNET_ID_SIZE);
2438 io->size = size;
2440 data = dnet_read_data_wait_raw(s, &id, io, DNET_CMD_READ_RANGE, cflags, &err);
2441 if (data) {
2442 struct dnet_io_attr *rep = data + io->size - sizeof(struct dnet_io_attr);
2444 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2445 if (!(io->flags & DNET_IO_FLAGS_NODATA))
2446 io->size -= sizeof(struct dnet_io_attr);
2447 dnet_convert_io_attr(rep);
2449 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2450 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2451 (unsigned long long)io->num, (unsigned long long)io->size);
2453 if (io->start < rep->num) {
2454 rep->num -= io->start;
2455 io->start = 0;
2456 io->num -= rep->num;
2458 if (!io->size && !(io->flags & DNET_IO_FLAGS_NODATA)) {
2459 free(data);
2460 } else {
2461 struct dnet_range_data *new_ret;
2463 ret_num++;
2465 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2466 if (!new_ret) {
2467 goto err_out_exit;
2470 ret = new_ret;
2472 ret[ret_num - 1].data = data;
2473 ret[ret_num - 1].size = io->size;
2476 err = 0;
2477 if (!io->num)
2478 break;
2479 } else {
2480 io->start -= rep->num;
2484 memcpy(id.id, next.id, DNET_ID_SIZE);
2487 err_out_exit:
2488 if (ret) {
2489 *errp = ret_num;
2490 } else {
2491 *errp = err;
2493 return ret;
2496 struct dnet_read_latest_id {
2497 struct dnet_id id;
2498 struct dnet_file_info fi;
2501 struct dnet_read_latest_ctl {
2502 struct dnet_wait *w;
2503 int num, pos;
2504 pthread_mutex_t lock;
2506 struct dnet_read_latest_id ids[0];
2509 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl *ctl)
2511 dnet_wakeup(ctl->w, ctl->w->cond++);
2512 if (atomic_dec_and_test(&ctl->w->refcnt)) {
2513 dnet_wait_destroy(ctl->w);
2514 pthread_mutex_destroy(&ctl->lock);
2515 free(ctl);
2519 static int dnet_read_latest_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
2521 struct dnet_read_latest_ctl *ctl = priv;
2522 struct dnet_node *n;
2523 struct dnet_addr_attr *a;
2524 struct dnet_file_info *fi;
2525 int pos, err;
2527 if (is_trans_destroyed(st, cmd)) {
2528 dnet_read_latest_ctl_put(ctl);
2529 return 0;
2532 n = st->n;
2534 err = cmd->status;
2535 if (err || !cmd->size)
2536 goto err_out_exit;
2538 if (cmd->size < sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
2539 dnet_log(n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2540 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
2541 sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info));
2542 err = -EINVAL;
2543 goto err_out_exit;
2545 a = (struct dnet_addr_attr *)(cmd + 1);
2546 fi = (struct dnet_file_info *)(a + 1);
2548 dnet_convert_addr_attr(a);
2549 dnet_convert_file_info(fi);
2551 pthread_mutex_lock(&ctl->lock);
2552 pos = ctl->pos++;
2553 pthread_mutex_unlock(&ctl->lock);
2555 /* we do not care about filename */
2556 memcpy(&ctl->ids[pos].fi, fi, sizeof(struct dnet_file_info));
2557 memcpy(&ctl->ids[pos].id, &cmd->id, sizeof(struct dnet_id));
2559 err_out_exit:
2560 return err;
2563 static int dnet_file_read_latest_cmp(const void *p1, const void *p2)
2565 const struct dnet_read_latest_id *id1 = p1;
2566 const struct dnet_read_latest_id *id2 = p2;
2568 int ret = (int)(id2->fi.mtime.tsec - id1->fi.mtime.tsec);
2570 if (!ret)
2571 ret = (int)(id2->fi.mtime.tnsec - id1->fi.mtime.tnsec);
2573 return ret;
2576 int dnet_read_latest_prepare(struct dnet_read_latest_prepare *pr)
2578 struct dnet_read_latest_ctl *ctl;
2579 int group_id = pr->id.group_id;
2580 int err, i;
2582 ctl = malloc(sizeof(struct dnet_read_latest_ctl) + sizeof(struct dnet_read_latest_id) * pr->group_num);
2583 if (!ctl) {
2584 err = -ENOMEM;
2585 goto err_out_exit;
2587 memset(ctl, 0, sizeof(struct dnet_read_latest_ctl));
2589 ctl->w = dnet_wait_alloc(0);
2590 if (!ctl->w) {
2591 err = -ENOMEM;
2592 goto err_out_free;
2595 err = pthread_mutex_init(&ctl->lock, NULL);
2596 if (err)
2597 goto err_out_put_wait;
2599 ctl->num = pr->group_num;
2600 ctl->pos = 0;
2602 for (i = 0; i < pr->group_num; ++i) {
2603 pr->id.group_id = pr->group[i];
2605 dnet_wait_get(ctl->w);
2606 dnet_lookup_object(pr->s, &pr->id, DNET_ATTR_META_TIMES | pr->cflags, dnet_read_latest_complete, ctl);
2609 err = dnet_wait_event(ctl->w, ctl->w->cond == pr->group_num, &pr->s->node->wait_ts);
2610 if (err)
2611 goto err_out_put;
2613 if (ctl->pos == 0)
2614 goto err_out_put;
2616 pr->group_num = ctl->pos;
2618 qsort(ctl->ids, pr->group_num, sizeof(struct dnet_read_latest_id), dnet_file_read_latest_cmp);
2620 for (i = 0; i < pr->group_num; ++i) {
2621 pr->group[i] = ctl->ids[i].id.group_id;
2623 if (group_id == pr->group[i]) {
2624 const struct dnet_read_latest_id *id0 = &ctl->ids[0];
2625 const struct dnet_read_latest_id *id1 = &ctl->ids[i];
2627 if (!dnet_file_read_latest_cmp(id0, id1)) {
2628 int tmp_group = pr->group[0];
2629 pr->group[0] = pr->group[i];
2630 pr->group[i] = tmp_group;
2635 err_out_put:
2636 dnet_read_latest_ctl_put(ctl);
2637 goto err_out_exit;
2639 err_out_put_wait:
2640 dnet_wait_put(ctl->w);
2641 err_out_free:
2642 free(ctl);
2643 err_out_exit:
2644 return err;
2647 int dnet_read_latest(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io, uint64_t cflags, void **datap)
2649 struct dnet_read_latest_prepare pr;
2650 int *g, num, err, i;
2652 if ((int)io->num > s->group_num) {
2653 err = -E2BIG;
2654 goto err_out_exit;
2657 err = dnet_mix_states(s, id, &g);
2658 if (err < 0)
2659 goto err_out_exit;
2661 num = err;
2663 if ((int)io->num > num) {
2664 err = -E2BIG;
2665 goto err_out_free;
2668 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
2670 pr.s = s;
2671 pr.id = *id;
2672 pr.group = g;
2673 pr.group_num = num;
2674 pr.cflags = cflags;
2676 err = dnet_read_latest_prepare(&pr);
2677 if (err)
2678 goto err_out_free;
2680 err = -ENODATA;
2681 for (i = 0; i < pr.group_num; ++i) {
2682 void *data;
2684 id->group_id = pr.group[i];
2685 data = dnet_read_data_wait_raw(s, id, io, DNET_CMD_READ, cflags, &err);
2686 if (data) {
2687 if ((pr.group_num != num) || ((i != 0) && (io->type == 0) && (io->offset == 0))) {
2688 dnet_read_recover(s, id, io, data, cflags);
2691 *datap = data;
2692 err = 0;
2693 break;
2697 err_out_free:
2698 free(g);
2699 err_out_exit:
2700 return err;
2703 int dnet_get_routes(struct dnet_session *s, struct dnet_id **ids, struct dnet_addr **addrs) {
2705 struct dnet_node *n = s->node;
2706 struct dnet_net_state *st;
2707 struct dnet_group *g;
2708 struct dnet_addr *tmp_addrs;
2709 struct dnet_id *tmp_ids;
2710 int size = 0, count = 0;
2711 int i;
2713 *ids = NULL;
2714 *addrs = NULL;
2716 pthread_mutex_lock(&n->state_lock);
2717 list_for_each_entry(g, &n->group_list, group_entry) {
2718 list_for_each_entry(st, &g->state_list, state_entry) {
2720 size += st->idc->id_num;
2722 tmp_ids = (struct dnet_id *)realloc(*ids, size * sizeof(struct dnet_id));
2723 if (!tmp_ids) {
2724 count = -ENOMEM;
2725 goto err_out_free;
2727 *ids = tmp_ids;
2729 tmp_addrs = (struct dnet_addr *)realloc(*addrs, size * sizeof(struct dnet_addr));
2730 if (!tmp_addrs) {
2731 count = -ENOMEM;
2732 goto err_out_free;
2734 *addrs = tmp_addrs;
2736 for (i = 0; i < st->idc->id_num; ++i) {
2737 dnet_setup_id(&(*ids)[count], g->group_id, st->idc->ids[i].raw.id);
2738 memcpy(&(*addrs)[count], dnet_state_addr(st), sizeof(struct dnet_addr));
2739 count++;
2743 pthread_mutex_unlock(&n->state_lock);
2745 return count;
2747 err_out_free:
2748 if (ids)
2749 free(*ids);
2750 if (addrs)
2751 free(*addrs);
2753 return count;
2757 void *dnet_bulk_read_wait_raw(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *ios,
2758 uint32_t io_num, int cmd, uint64_t cflags, int *errp)
2760 struct dnet_node *n = s->node;
2761 struct dnet_io_control ctl;
2762 struct dnet_io_attr io;
2763 struct dnet_wait *w;
2764 struct dnet_read_data_completion *c;
2765 void *data = NULL;
2766 int err;
2768 w = dnet_wait_alloc(0);
2769 if (!w) {
2770 err = -ENOMEM;
2771 goto err_out_exit;
2774 c = malloc(sizeof(*c));
2775 if (!c) {
2776 err = -ENOMEM;
2777 goto err_out_put;
2780 c->w = w;
2781 c->size = 0;
2782 c->data = NULL;
2783 /* one for completion callback, another for this function */
2784 atomic_init(&c->refcnt, 2);
2786 memset(&ctl, 0, sizeof(struct dnet_io_control));
2788 ctl.fd = -1;
2790 ctl.priv = c;
2791 ctl.complete = dnet_read_data_complete;
2793 ctl.cmd = cmd;
2794 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
2796 memcpy(&ctl.id, id, sizeof(struct dnet_id));
2797 memset(&ctl.io, 0, sizeof(struct dnet_io_attr));
2799 memcpy(io.id, id->id, DNET_ID_SIZE);
2800 memcpy(io.parent, id->id, DNET_ID_SIZE);
2802 ctl.io.size = io_num * sizeof(struct dnet_io_attr);
2803 ctl.data = ios;
2805 dnet_wait_get(w);
2806 err = dnet_read_object(s, &ctl);
2807 if (err)
2808 goto err_out_put_complete;
2810 err = dnet_wait_event(w, w->cond, &n->wait_ts);
2811 if (err || w->status) {
2812 char id_str[2*DNET_ID_SIZE + 1];
2813 if (!err)
2814 err = w->status;
2815 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
2816 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
2817 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
2818 goto err_out_put_complete;
2820 err = c->size;
2821 data = c->data;
2823 err_out_put_complete:
2824 if (atomic_dec_and_test(&c->refcnt))
2825 free(c);
2826 err_out_put:
2827 dnet_wait_put(w);
2828 err_out_exit:
2829 *errp = err;
2830 return data;
2834 static int dnet_io_attr_cmp(const void *d1, const void *d2)
2836 const struct dnet_io_attr *io1 = d1;
2837 const struct dnet_io_attr *io2 = d2;
2839 return memcmp(io1->id, io2->id, DNET_ID_SIZE);
2842 struct dnet_range_data *dnet_bulk_read(struct dnet_session *s, struct dnet_io_attr *ios, uint32_t io_num, int group_id, uint64_t cflags, int *errp)
2844 struct dnet_node *n = s->node;
2845 struct dnet_id id, next_id;
2846 int ret_num;
2847 struct dnet_range_data *ret;
2848 struct dnet_net_state *cur, *next = NULL;
2849 uint64_t size = 0;
2850 void *data;
2851 int err;
2852 uint32_t i, start = -1;
2854 if (io_num <= 0) {
2855 return 0;
2858 qsort(ios, io_num, sizeof(struct dnet_io_attr), dnet_io_attr_cmp);
2860 ret = NULL;
2861 ret_num = 0;
2862 size = 0;
2864 dnet_setup_id(&id, group_id, ios[0].id);
2865 id.type = ios[0].type;
2867 cur = dnet_state_get_first(n, &id);
2868 if (!cur) {
2869 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&id));
2870 err = -ENOENT;
2871 goto err_out_exit;
2874 for (i = 0; i < io_num; ++i) {
2875 if ((i + 1) < io_num) {
2876 dnet_setup_id(&next_id, group_id, ios[i+1].id);
2877 next_id.type = ios[i+1].type;
2879 next = dnet_state_get_first(n, &next_id);
2880 if (!next) {
2881 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&next_id));
2882 err = -ENOENT;
2883 goto err_out_put;
2886 /* Send command only if state changes or it's a last id */
2887 if ((cur == next)) {
2888 dnet_state_put(next);
2889 next = NULL;
2890 continue;
2894 dnet_log(n, DNET_LOG_NOTICE, "start: %s: end: %s, count: %llu, addr: %s\n",
2895 dnet_dump_id(&id),
2896 dnet_dump_id(&next_id),
2897 (unsigned long long)(i - start),
2898 dnet_state_dump_addr(cur));
2900 data = dnet_bulk_read_wait_raw(s, &id, ios, i - start, DNET_CMD_BULK_READ, cflags, &err);
2901 if (data) {
2902 size = err;
2903 err = 0;
2905 if (!size) {
2906 free(data);
2907 } else {
2908 struct dnet_range_data *new_ret;
2910 ret_num++;
2912 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2913 if (!new_ret) {
2914 goto err_out_put;
2917 ret = new_ret;
2919 ret[ret_num - 1].data = data;
2920 ret[ret_num - 1].size = size;
2923 err = 0;
2926 dnet_state_put(cur);
2927 cur = next;
2928 next = NULL;
2929 memcpy(&id, &next_id, sizeof(struct dnet_id));
2932 err_out_put:
2933 if (next)
2934 dnet_state_put(next);
2935 dnet_state_put(cur);
2936 err_out_exit:
2937 if (ret) {
2938 *errp = ret_num;
2939 } else {
2940 *errp = err;
2942 return ret;
2945 struct dnet_range_data dnet_bulk_write(struct dnet_session *s, struct dnet_io_control *ctl, int ctl_num, int *errp)
2947 struct dnet_node *n = s->node;
2948 int err, i, trans_num = 0, local_trans_num;
2949 struct dnet_wait *w;
2950 struct dnet_write_completion *wc;
2951 struct dnet_range_data ret;
2952 struct dnet_metadata_control mcl;
2953 struct dnet_meta_container mc;
2954 struct dnet_io_control meta_ctl;
2955 struct timeval tv;
2956 int *groups = NULL;
2957 int group_num = 0;
2959 memset(&ret, 0, sizeof(ret));
2961 wc = malloc(sizeof(struct dnet_write_completion));
2962 if (!wc) {
2963 err = -ENOMEM;
2964 goto err_out_exit;
2966 memset(wc, 0, sizeof(struct dnet_write_completion));
2968 w = dnet_wait_alloc(0);
2969 if (!w) {
2970 err = -ENOMEM;
2971 free(wc);
2972 goto err_out_exit;
2974 wc->wait = w;
2976 atomic_set(&w->refcnt, INT_MAX);
2977 w->status = -ENOENT;
2979 for (i = 0; i < ctl_num; ++i) {
2980 ctl[i].priv = wc;
2981 ctl[i].complete = dnet_write_complete;
2983 ctl[i].cmd = DNET_CMD_WRITE;
2984 ctl[i].cflags = DNET_FLAGS_NEED_ACK;
2986 memcpy(ctl[i].io.id, ctl[i].id.id, DNET_ID_SIZE);
2987 memcpy(ctl[i].io.parent, ctl[i].id.id, DNET_ID_SIZE);
2989 local_trans_num = dnet_write_object(s, &ctl[i]);
2990 if (local_trans_num < 0)
2991 local_trans_num = 0;
2993 trans_num += local_trans_num;
2995 /* Prepare and send metadata */
2996 memset(&mcl, 0, sizeof(mcl));
2998 group_num = s->group_num;
2999 groups = alloca(group_num * sizeof(int));
3000 memcpy(groups, s->groups, group_num * sizeof(int));
3002 mcl.groups = groups;
3003 mcl.group_num = group_num;
3004 mcl.id = ctl[i].id;
3005 mcl.cflags = ctl[i].cflags;
3007 gettimeofday(&tv, NULL);
3008 mcl.ts.tv_sec = tv.tv_sec;
3009 mcl.ts.tv_nsec = tv.tv_usec * 1000;
3011 memset(&mc, 0, sizeof(mc));
3013 err = dnet_create_metadata(s, &mcl, &mc);
3014 dnet_log(n, DNET_LOG_DEBUG, "Creating metadata: err: %d", err);
3015 if (!err) {
3016 dnet_convert_metadata(n, mc.data, mc.size);
3018 memset(&meta_ctl, 0, sizeof(struct dnet_io_control));
3020 meta_ctl.priv = wc;
3021 meta_ctl.complete = dnet_write_complete;
3022 meta_ctl.cmd = DNET_CMD_WRITE;
3023 meta_ctl.fd = -1;
3025 meta_ctl.cflags = ctl[i].cflags;
3027 memcpy(&meta_ctl.id, &ctl[i].id, sizeof(struct dnet_id));
3028 memcpy(meta_ctl.io.id, ctl[i].id.id, DNET_ID_SIZE);
3029 memcpy(meta_ctl.io.parent, ctl[i].id.id, DNET_ID_SIZE);
3030 meta_ctl.id.type = meta_ctl.io.type = EBLOB_TYPE_META;
3032 meta_ctl.io.flags |= DNET_IO_FLAGS_META;
3033 meta_ctl.io.offset = 0;
3034 meta_ctl.io.size = mc.size;
3035 meta_ctl.data = mc.data;
3037 local_trans_num = dnet_write_object(s, &meta_ctl);
3038 if (local_trans_num < 0)
3039 local_trans_num = 0;
3041 trans_num += local_trans_num;
3046 * 1 - the first reference counter we grabbed at allocation time
3048 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
3050 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
3051 if (err || w->status) {
3052 if (!err)
3053 err = w->status;
3054 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3055 dnet_dump_id(&ctl->id), err, w->status);
3058 if (err || !trans_num) {
3059 if (!err)
3060 err = -EINVAL;
3061 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
3062 goto err_out_put;
3065 if (trans_num)
3066 dnet_log(n, DNET_LOG_NOTICE, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3067 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, wc->size);
3068 err = trans_num;
3070 ret.data = wc->reply;
3071 ret.size = wc->size;
3073 wc->reply = NULL;
3075 err_out_put:
3076 dnet_write_complete_free(wc);
3077 err_out_exit:
3078 *errp = err;
3079 return ret;
3082 int dnet_flags(struct dnet_node *n)
3084 return n->flags;
3087 static int dnet_start_defrag_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
3089 struct dnet_wait *w = priv;
3091 if (is_trans_destroyed(state, cmd)) {
3092 dnet_wakeup(w, w->cond++);
3093 dnet_wait_put(w);
3094 return 0;
3097 return 0;
3100 static int dnet_start_defrag_single(struct dnet_net_state *st, void *priv, uint64_t cflags)
3102 struct dnet_trans_control ctl;
3104 memset(&ctl, 0, sizeof(struct dnet_trans_control));
3106 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
3107 ctl.cmd = DNET_CMD_DEFRAG;
3108 ctl.complete = dnet_start_defrag_complete;
3109 ctl.priv = priv;
3110 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
3112 return dnet_trans_alloc_send_state(st, &ctl);
3115 int dnet_start_defrag(struct dnet_session *s, uint64_t cflags)
3117 struct dnet_node *n = s->node;
3118 struct dnet_net_state *st;
3119 struct dnet_wait *w;
3120 struct dnet_group *g;
3121 int num = 0;
3122 int err;
3124 w = dnet_wait_alloc(0);
3125 if (!w) {
3126 err = -ENOMEM;
3127 goto err_out_exit;
3130 pthread_mutex_lock(&n->state_lock);
3131 list_for_each_entry(g, &n->group_list, group_entry) {
3132 list_for_each_entry(st, &g->state_list, state_entry) {
3133 if (st == n->st)
3134 continue;
3136 if (w)
3137 dnet_wait_get(w);
3139 dnet_start_defrag_single(st, w, cflags);
3140 num++;
3143 pthread_mutex_unlock(&n->state_lock);
3145 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
3146 dnet_wait_put(w);
3148 err_out_exit:
3149 return err;