Elliptics version update: 2.18.3.2
[elliptics.git] / library / dnet_common.c
blob1f4ffd06bf2f376c17f1792eb3ee179f4a2f6f3a
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
44 static char *dnet_cmd_strings[] = {
45 [DNET_CMD_LOOKUP] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN] = "JOIN",
48 [DNET_CMD_WRITE] = "WRITE",
49 [DNET_CMD_READ] = "READ",
50 [DNET_CMD_LIST] = "CHECK",
51 [DNET_CMD_EXEC] = "EXEC",
52 [DNET_CMD_ROUTE_LIST] = "ROUTE_LIST",
53 [DNET_CMD_STAT] = "STAT",
54 [DNET_CMD_NOTIFY] = "NOTIFY",
55 [DNET_CMD_DEL] = "REMOVE",
56 [DNET_CMD_STAT_COUNT] = "STAT_COUNT",
57 [DNET_CMD_STATUS] = "STATUS",
58 [DNET_CMD_READ_RANGE] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE] = "DEL_RANGE",
60 [DNET_CMD_AUTH] = "AUTH",
61 [DNET_CMD_BULK_READ] = "BULK_READ",
62 [DNET_CMD_UNKNOWN] = "UNKNOWN",
65 static char *dnet_counter_strings[] = {
66 [DNET_CNTR_LA1] = "DNET_CNTR_LA1",
67 [DNET_CNTR_LA5] = "DNET_CNTR_LA5",
68 [DNET_CNTR_LA15] = "DNET_CNTR_LA15",
69 [DNET_CNTR_BSIZE] = "DNET_CNTR_BSIZE",
70 [DNET_CNTR_FRSIZE] = "DNET_CNTR_FRSIZE",
71 [DNET_CNTR_BLOCKS] = "DNET_CNTR_BLOCKS",
72 [DNET_CNTR_BFREE] = "DNET_CNTR_BFREE",
73 [DNET_CNTR_BAVAIL] = "DNET_CNTR_BAVAIL",
74 [DNET_CNTR_FILES] = "DNET_CNTR_FILES",
75 [DNET_CNTR_FFREE] = "DNET_CNTR_FFREE",
76 [DNET_CNTR_FAVAIL] = "DNET_CNTR_FAVAIL",
77 [DNET_CNTR_FSID] = "DNET_CNTR_FSID",
78 [DNET_CNTR_VM_ACTIVE] = "DNET_CNTR_VM_ACTIVE",
79 [DNET_CNTR_VM_INACTIVE] = "DNET_CNTR_VM_INACTIVE",
80 [DNET_CNTR_VM_TOTAL] = "DNET_CNTR_VM_TOTAL",
81 [DNET_CNTR_VM_FREE] = "DNET_CNTR_VM_FREE",
82 [DNET_CNTR_VM_CACHED] = "DNET_CNTR_VM_CACHED",
83 [DNET_CNTR_VM_BUFFERS] = "DNET_CNTR_VM_BUFFERS",
84 [DNET_CNTR_NODE_FILES] = "DNET_CNTR_NODE_FILES",
85 [DNET_CNTR_NODE_LAST_MERGE] = "DNET_CNTR_NODE_LAST_MERGE",
86 [DNET_CNTR_NODE_CHECK_COPY] = "DNET_CNTR_NODE_CHECK_COPY",
87 [DNET_CNTR_DBR_NOREC] = "DNET_CNTR_DBR_NOREC",
88 [DNET_CNTR_DBR_SYSTEM] = "DNET_CNTR_DBR_SYSTEM",
89 [DNET_CNTR_DBR_ERROR] = "DNET_CNTR_DBR_ERROR",
90 [DNET_CNTR_DBW_SYSTEM] = "DNET_CNTR_DBW_SYSTEM",
91 [DNET_CNTR_DBW_ERROR] = "DNET_CNTR_DBW_ERROR",
92 [DNET_CNTR_UNKNOWN] = "UNKNOWN",
95 char *dnet_cmd_string(int cmd)
97 if (cmd <= 0 || cmd >= __DNET_CMD_MAX)
98 cmd = DNET_CMD_UNKNOWN;
100 return dnet_cmd_strings[cmd];
103 char *dnet_counter_string(int cntr, int cmd_num)
105 if (cntr <= 0 || cntr >= __DNET_CNTR_MAX)
106 cntr = DNET_CNTR_UNKNOWN;
108 if (cntr < cmd_num)
109 return dnet_cmd_string(cntr);
111 if (cntr >= cmd_num && cntr < (cmd_num * 2))
112 return dnet_cmd_string(cntr - cmd_num);
114 return dnet_counter_strings[cntr];
117 static int dnet_add_received_state(struct dnet_node *n, struct dnet_addr_attr *a,
118 int group_id, struct dnet_raw_id *ids, int id_num)
120 int s, err = 0;
121 struct dnet_net_state *nst;
122 struct dnet_id raw;
123 int join;
125 dnet_setup_id(&raw, group_id, ids[0].id);
127 nst = dnet_state_search_by_addr(n, &a->addr);
128 if (nst) {
129 err = -EEXIST;
130 dnet_state_put(nst);
131 goto err_out_exit;
134 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
135 (struct sockaddr *)&a->addr.addr, a->addr.addr_len, 0);
136 if (s < 0) {
137 err = s;
138 goto err_out_exit;
141 join = DNET_WANT_RECONNECT;
142 if (n->flags & DNET_CFG_JOIN_NETWORK)
143 join = DNET_JOIN;
145 nst = dnet_state_create(n, group_id, ids, id_num, &a->addr, s, &err, join, dnet_state_net_process);
146 if (!nst)
147 goto err_out_close;
149 dnet_log(n, DNET_LOG_NOTICE, "%d: added received state %s.\n",
150 group_id, dnet_state_dump_addr(nst));
152 return 0;
154 err_out_close:
155 dnet_sock_close(s);
156 err_out_exit:
157 return err;
160 static int dnet_process_addr_attr(struct dnet_net_state *st, struct dnet_addr_attr *a, int group_id, int num)
162 struct dnet_node *n = st->n;
163 struct dnet_raw_id *ids;
164 int i, err;
166 ids = (struct dnet_raw_id *)(a + 1);
167 for (i=0; i<num; ++i)
168 dnet_convert_raw_id(&ids[0]);
170 err = dnet_add_received_state(n, a, group_id, ids, num);
171 dnet_log(n, DNET_LOG_DEBUG, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a->addr), num, err);
173 return err;
176 static int dnet_recv_route_list_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
178 struct dnet_wait *w = priv;
179 struct dnet_addr_attr *a;
180 long size;
181 int err, num;
183 if (is_trans_destroyed(st, cmd)) {
184 err = -EINVAL;
185 if (cmd)
186 err = cmd->status;
188 w->status = err;
189 dnet_wakeup(w, w->cond = 1);
190 dnet_wait_put(w);
191 goto err_out_exit;
195 err = cmd->status;
196 if (!cmd->size || err)
197 goto err_out_exit;
199 size = cmd->size + sizeof(struct dnet_cmd);
200 if (size < (signed)sizeof(struct dnet_addr_cmd)) {
201 err = -EINVAL;
202 goto err_out_exit;
205 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
206 if (!num) {
207 err = -EINVAL;
208 goto err_out_exit;
211 a = (struct dnet_addr_attr *)(cmd + 1);
212 dnet_convert_addr_attr(a);
214 err = dnet_process_addr_attr(st, a, cmd->id.group_id, num);
216 err_out_exit:
217 return err;
220 int dnet_recv_route_list(struct dnet_net_state *st)
222 struct dnet_io_req req;
223 struct dnet_node *n = st->n;
224 struct dnet_trans *t;
225 struct dnet_cmd *cmd;
226 struct dnet_wait *w;
227 int err;
229 w = dnet_wait_alloc(0);
230 if (!w) {
231 err = -ENOMEM;
232 goto err_out_exit;
235 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
236 if (!t) {
237 err = -ENOMEM;
238 goto err_out_wait_put;
241 t->complete = dnet_recv_route_list_complete;
242 t->priv = w;
244 cmd = (struct dnet_cmd *)(t + 1);
246 cmd->flags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
247 cmd->status = 0;
249 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
251 cmd->cmd = t->command = DNET_CMD_ROUTE_LIST;
253 t->st = dnet_state_get(st);
254 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
256 dnet_convert_cmd(cmd);
258 dnet_log(n, DNET_LOG_DEBUG, "%s: list route request to %s.\n", dnet_dump_id(&cmd->id),
259 dnet_server_convert_dnet_addr(&st->addr));
261 memset(&req, 0, sizeof(req));
262 req.st = st;
263 req.header = cmd;
264 req.hsize = sizeof(struct dnet_cmd);
266 dnet_wait_get(w);
267 err = dnet_trans_send(t, &req);
268 if (err)
269 goto err_out_destroy;
271 err = dnet_wait_event(w, w->cond != 0, &n->wait_ts);
272 dnet_wait_put(w);
274 return 0;
276 err_out_destroy:
277 dnet_trans_put(t);
278 err_out_wait_put:
279 dnet_wait_put(w);
280 err_out_exit:
281 return err;
284 static struct dnet_net_state *dnet_add_state_socket(struct dnet_node *n, struct dnet_addr *addr, int s, int *errp, int join)
286 struct dnet_net_state *st, dummy;
287 char buf[sizeof(struct dnet_addr_cmd)];
288 struct dnet_cmd *cmd;
289 int err, num, i, size;
290 struct dnet_raw_id *ids;
292 memset(buf, 0, sizeof(buf));
294 cmd = (struct dnet_cmd *)(buf);
296 cmd->flags = DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK;
297 cmd->cmd = DNET_CMD_REVERSE_LOOKUP;
299 dnet_convert_cmd(cmd);
301 st = &dummy;
302 memset(st, 0, sizeof(struct dnet_net_state));
304 st->write_s = st->read_s = s;
305 st->n = n;
307 err = dnet_send_nolock(st, buf, sizeof(struct dnet_cmd));
308 if (err) {
309 dnet_log(n, DNET_LOG_ERROR, "Failed to send reverse "
310 "lookup message to %s, err: %d.\n",
311 dnet_server_convert_dnet_addr(addr), err);
312 goto err_out_exit;
315 err = dnet_recv(st, buf, sizeof(buf));
316 if (err) {
317 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
318 "lookup headers from %s, err: %d.\n",
319 dnet_server_convert_dnet_addr(addr), err);
320 goto err_out_exit;
323 cmd = (struct dnet_cmd *)(buf);
325 dnet_convert_addr_cmd((struct dnet_addr_cmd *)buf);
327 size = cmd->size - sizeof(struct dnet_addr_attr);
328 num = size / sizeof(struct dnet_raw_id);
330 dnet_log(n, DNET_LOG_DEBUG, "%s: waiting for %d ids\n", dnet_dump_id(&cmd->id), num);
332 ids = malloc(size);
333 if (!ids) {
334 err = -ENOMEM;
335 goto err_out_exit;
338 err = dnet_recv(st, ids, size);
339 if (err) {
340 dnet_log(n, DNET_LOG_ERROR, "Failed to receive reverse "
341 "lookup body (%llu bytes) from %s, err: %d.\n",
342 (unsigned long long)cmd->size,
343 dnet_server_convert_dnet_addr(addr), err);
344 goto err_out_exit;
347 for (i=0; i<num; ++i)
348 dnet_convert_raw_id(&ids[i]);
350 st = dnet_state_create(n, cmd->id.group_id, ids, num, addr, s, &err, join, dnet_state_net_process);
351 if (!st) {
352 /* socket is already closed */
353 s = -1;
354 goto err_out_free;
356 free(ids);
358 return st;
360 err_out_free:
361 free(ids);
362 err_out_exit:
363 *errp = err;
364 if (s >= 0)
365 dnet_sock_close(s);
366 return NULL;
369 int dnet_add_state(struct dnet_node *n, struct dnet_config *cfg)
371 int s, err, join = DNET_WANT_RECONNECT;
372 struct dnet_addr addr;
373 struct dnet_net_state *st;
375 memset(&addr, 0, sizeof(addr));
377 addr.addr_len = sizeof(addr.addr);
378 s = dnet_socket_create(n, cfg, &addr, 0);
379 if (s < 0) {
380 err = s;
381 goto err_out_reconnect;
384 if (n->flags & DNET_CFG_JOIN_NETWORK)
385 join = DNET_JOIN;
387 /* will close socket on error */
388 st = dnet_add_state_socket(n, &addr, s, &err, join);
389 if (!st)
390 goto err_out_reconnect;
392 if (!(cfg->flags & DNET_CFG_NO_ROUTE_LIST))
393 dnet_recv_route_list(st);
395 return 0;
397 err_out_reconnect:
398 /* if state is already exist, it should not be an error */
399 if (err == -EEXIST)
400 err = 0;
402 if ((err == -EADDRINUSE) || (err == -ECONNREFUSED) || (err == -ECONNRESET) ||
403 (err == -EINPROGRESS) || (err == -EAGAIN))
404 dnet_add_reconnect_state(n, &addr, join);
405 return err;
/*
 * Private data shared by all write transactions started from one
 * dnet_write_file_id_raw() call.
 */
struct dnet_write_completion {
	/* buffer that dnet_write_complete() appends address/command/payload records to */
	void			*reply;
	/* number of bytes accumulated in @reply */
	int			size;
	/* wait object; its refcnt also controls the lifetime of this structure */
	struct dnet_wait	*wait;
};
414 static void dnet_write_complete_free(struct dnet_write_completion *wc)
416 if (atomic_dec_and_test(&wc->wait->refcnt)) {
417 dnet_wait_destroy(wc->wait);
418 free(wc->reply);
419 free(wc);
423 static int dnet_write_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
425 int err = -EINVAL;
426 struct dnet_write_completion *wc = priv;
427 struct dnet_wait *w = wc->wait;
429 if (is_trans_destroyed(st, cmd)) {
430 dnet_wakeup(w, w->cond++);
431 dnet_write_complete_free(wc);
432 return 0;
435 err = cmd->status;
436 if (!err && st && (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info))) {
437 int old_size = wc->size;
438 void *data;
440 wc->size += cmd->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_addr);
441 wc->reply = realloc(wc->reply, wc->size);
442 if (!wc->reply) {
443 err = -ENOMEM;
444 goto err_out_exit;
447 data = wc->reply + old_size;
449 memcpy(data, &st->addr, sizeof(struct dnet_addr));
450 memcpy(data + sizeof(struct dnet_addr), cmd, sizeof(struct dnet_cmd));
451 memcpy(data + sizeof(struct dnet_addr) + sizeof(struct dnet_cmd), cmd + 1, cmd->size);
454 err_out_exit:
455 pthread_mutex_lock(&w->wait_lock);
456 if (w->status < 0)
457 w->status = err;
458 pthread_mutex_unlock(&w->wait_lock);
460 return 0;
463 static struct dnet_trans *dnet_io_trans_create(struct dnet_node *n, struct dnet_io_control *ctl, int *errp)
465 struct dnet_io_req req;
466 struct dnet_trans *t = NULL;
467 struct dnet_io_attr *io;
468 struct dnet_cmd *cmd;
469 uint64_t size = ctl->io.size;
470 uint64_t tsize = sizeof(struct dnet_io_attr) + sizeof(struct dnet_cmd);
471 int err;
473 if (ctl->cmd == DNET_CMD_READ)
474 size = 0;
476 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE)
477 tsize += size;
479 t = dnet_trans_alloc(n, tsize);
480 if (!t) {
481 err = -ENOMEM;
482 goto err_out_complete;
484 t->complete = ctl->complete;
485 t->priv = ctl->priv;
487 cmd = (struct dnet_cmd *)(t + 1);
488 io = (struct dnet_io_attr *)(cmd + 1);
490 if (ctl->fd < 0 && size < DNET_COPY_IO_SIZE) {
491 if (size) {
492 void *data = io + 1;
493 memcpy(data, ctl->data, size);
497 memcpy(&cmd->id, &ctl->id, sizeof(struct dnet_id));
498 cmd->size = sizeof(struct dnet_io_attr) + size;
499 cmd->flags = ctl->cflags;
500 cmd->status = 0;
502 cmd->cmd = t->command = ctl->cmd;
504 memcpy(io, &ctl->io, sizeof(struct dnet_io_attr));
505 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
507 t->st = dnet_state_get_first(n, &cmd->id);
508 if (!t->st) {
509 err = -ENOENT;
510 goto err_out_destroy;
513 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
515 dnet_log(n, DNET_LOG_INFO, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
516 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
517 dnet_dump_id(&ctl->id),
518 (unsigned long long)t->trans,
519 dnet_cmd_string(ctl->cmd), (unsigned long long)cmd->flags,
520 (unsigned long long)ctl->io.size, (unsigned long long)ctl->io.offset,
521 ctl->fd,
522 (unsigned long long)ctl->local_offset,
523 dnet_server_convert_dnet_addr(&t->st->addr), t->st->weight, t->st->median_read_time);
525 dnet_convert_cmd(cmd);
526 dnet_convert_io_attr(io);
529 memset(&req, 0, sizeof(req));
530 req.st = t->st;
531 req.header = cmd;
532 req.hsize = tsize;
534 req.fd = ctl->fd;
536 if (ctl->fd >= 0) {
537 req.local_offset = ctl->local_offset;
538 req.fsize = size;
539 } else if (size >= DNET_COPY_IO_SIZE) {
540 req.data = (void *)ctl->data;
541 req.dsize = size;
544 err = dnet_trans_send(t, &req);
545 if (err)
546 goto err_out_destroy;
548 return t;
550 err_out_complete:
551 if (ctl->complete)
552 ctl->complete(NULL, NULL, ctl->priv);
553 *errp = err;
554 return NULL;
556 err_out_destroy:
557 dnet_trans_put(t);
558 *errp = err;
559 return NULL;
562 int dnet_trans_create_send_all(struct dnet_node *n, struct dnet_io_control *ctl)
564 int num = 0, i, err;
566 pthread_mutex_lock(&n->group_lock);
567 for (i=0; i<n->group_num; ++i) {
568 ctl->id.group_id = n->groups[i];
570 dnet_io_trans_create(n, ctl, &err);
571 num++;
573 pthread_mutex_unlock(&n->group_lock);
575 if (!num) {
576 dnet_io_trans_create(n, ctl, &err);
577 num++;
580 return num;
/*
 * Writes the object described by @ctl to every configured group.
 * Returns the number of transactions dnet_trans_create_send_all() attempted.
 */
int dnet_write_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int trans_num = dnet_trans_create_send_all(n, ctl);

	return trans_num;
}
588 static int dnet_write_file_id_raw(struct dnet_node *n, const char *file, struct dnet_id *id,
589 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
590 uint64_t cflags, unsigned int ioflags)
592 int fd, err, trans_num;
593 struct stat stat;
594 struct dnet_wait *w;
595 struct dnet_io_control ctl;
596 struct dnet_write_completion *wc;
598 wc = malloc(sizeof(struct dnet_write_completion));
599 if (!wc) {
600 err = -ENOMEM;
601 goto err_out_exit;
603 memset(wc, 0, sizeof(struct dnet_write_completion));
605 w = dnet_wait_alloc(0);
606 if (!w) {
607 free(wc);
608 err = -ENOMEM;
609 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting structure.\n");
610 goto err_out_exit;
613 wc->wait = w;
615 fd = open(file, O_RDONLY | O_LARGEFILE | O_CLOEXEC);
616 if (fd < 0) {
617 err = -errno;
618 dnet_log_err(n, "Failed to open to be written file '%s'", file);
619 goto err_out_put;
622 err = fstat(fd, &stat);
623 if (err) {
624 err = -errno;
625 dnet_log_err(n, "Failed to stat to be written file '%s'", file);
626 goto err_out_close;
629 if (local_offset >= (uint64_t)stat.st_size) {
630 err = 0;
631 goto err_out_close;
634 if (!size || size + local_offset >= (uint64_t)stat.st_size)
635 size = stat.st_size - local_offset;
637 memset(&ctl, 0, sizeof(struct dnet_io_control));
639 atomic_set(&w->refcnt, INT_MAX);
641 ctl.data = NULL;
642 ctl.fd = fd;
643 ctl.local_offset = local_offset;
645 w->status = -ENOENT;
646 ctl.complete = dnet_write_complete;
647 ctl.priv = wc;
649 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
650 ctl.cmd = DNET_CMD_WRITE;
652 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
653 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
655 ctl.io.flags = ioflags;
656 ctl.io.size = size;
657 ctl.io.offset = remote_offset;
658 ctl.io.type = id->type;
660 memcpy(&ctl.id, id, sizeof(struct dnet_id));
662 trans_num = dnet_write_object(n, &ctl);
663 if (trans_num < 0)
664 trans_num = 0;
667 * 1 - the first reference counter we grabbed at allocation time
669 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
671 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
672 if (err || w->status) {
673 if (!err)
674 err = w->status;
677 if (!err && !trans_num)
678 err = -EINVAL;
680 if (err) {
681 dnet_log(n, DNET_LOG_ERROR, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file, trans_num, err);
682 goto err_out_close;
685 dnet_log(n, DNET_LOG_NOTICE, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
686 file, (unsigned long long)size);
688 close(fd);
689 dnet_write_complete_free(wc);
691 return 0;
693 err_out_close:
694 close(fd);
695 err_out_put:
696 dnet_write_complete_free(wc);
697 err_out_exit:
698 return err;
701 int dnet_write_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t local_offset,
702 uint64_t remote_offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
704 int err = dnet_write_file_id_raw(n, file, id, local_offset, remote_offset, size, cflags, ioflags);
705 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
706 err = dnet_create_write_metadata_strings(n, NULL, 0, id, NULL, cflags);
708 return err;
711 int dnet_write_file(struct dnet_node *n, const char *file, const void *remote, int remote_len,
712 uint64_t local_offset, uint64_t remote_offset, uint64_t size,
713 uint64_t cflags, unsigned int ioflags, int type)
715 int err;
716 struct dnet_id id;
718 dnet_transform(n, remote, remote_len, &id);
719 id.type = type;
721 err = dnet_write_file_id_raw(n, file, &id, local_offset, remote_offset, size, cflags, ioflags);
722 if (!err && !(ioflags & DNET_IO_FLAGS_CACHE_ONLY))
723 err = dnet_create_write_metadata_strings(n, remote, remote_len, &id, NULL, cflags);
725 return err;
728 static int dnet_read_file_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
730 int fd, err;
731 struct dnet_node *n;
732 struct dnet_io_completion *c = priv;
733 struct dnet_io_attr *io;
734 void *data;
736 if (is_trans_destroyed(st, cmd)) {
737 if (c->wait) {
738 int err = 1;
739 if (cmd && cmd->status)
740 err = cmd->status;
742 dnet_wakeup(c->wait, c->wait->cond = err);
743 dnet_wait_put(c->wait);
746 free(c);
747 return 0;
750 n = st->n;
752 if (cmd->status != 0 || cmd->size == 0) {
753 err = cmd->status;
754 goto err_out_exit_no_log;
757 if (cmd->size <= sizeof(struct dnet_io_attr)) {
758 dnet_log(n, DNET_LOG_ERROR, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
759 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
760 sizeof(struct dnet_io_attr));
761 err = -EINVAL;
762 goto err_out_exit_no_log;
765 io = (struct dnet_io_attr *)(cmd + 1);
766 data = io + 1;
768 dnet_convert_io_attr(io);
770 fd = open(c->file, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
771 if (fd < 0) {
772 err = -errno;
773 dnet_log_err(n, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd->id), c->file);
774 goto err_out_exit;
777 err = pwrite(fd, data, io->size, c->offset);
778 if (err <= 0) {
779 err = -errno;
780 dnet_log_err(n, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd->id), c->file);
781 goto err_out_close;
784 close(fd);
785 dnet_log(n, DNET_LOG_NOTICE, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
786 dnet_dump_id(&cmd->id), c->file, (unsigned long long)c->offset,
787 (unsigned long long)io->size, cmd->status);
789 return cmd->status;
791 err_out_close:
792 close(fd);
793 err_out_exit:
794 dnet_log(n, DNET_LOG_ERROR, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
795 dnet_dump_id(&cmd->id), c->file, (unsigned long long)io->offset,
796 (unsigned long long)io->size, cmd->status, err);
797 err_out_exit_no_log:
798 dnet_wakeup(c->wait, c->wait->cond = err ? err : 1);
799 return err;
/*
 * Creates and sends a single IO transaction described by @ctl.
 * Returns 0 when the transaction was created, the creation error otherwise.
 */
int dnet_read_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int err;

	return dnet_io_trans_create(n, ctl, &err) ? 0 : err;
}
812 static int dnet_read_file_raw_exec(struct dnet_node *n, const char *file, unsigned int len,
813 uint64_t write_offset, uint64_t io_offset, uint64_t io_size,
814 struct dnet_id *id, struct dnet_wait *w)
816 struct dnet_io_control ctl;
817 struct dnet_io_completion *c;
818 int err, wait_init = ~0;
820 memset(&ctl, 0, sizeof(struct dnet_io_control));
822 ctl.io.size = io_size;
823 ctl.io.offset = io_offset;
825 ctl.io.type = id->type;
827 memcpy(ctl.io.parent, id->id, DNET_ID_SIZE);
828 memcpy(ctl.io.id, id->id, DNET_ID_SIZE);
830 memcpy(&ctl.id, id, sizeof(struct dnet_id));
832 ctl.fd = -1;
833 ctl.complete = dnet_read_file_complete;
834 ctl.cmd = DNET_CMD_READ;
835 ctl.cflags = DNET_FLAGS_NEED_ACK;
837 c = malloc(sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
838 if (!c) {
839 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate IO completion structure "
840 "for '%s' file reading.\n",
841 dnet_dump_id(&ctl.id), file);
842 err = -ENOMEM;
843 goto err_out_exit;
846 memset(c, 0, sizeof(struct dnet_io_completion) + len + 1 + sizeof(DNET_HISTORY_SUFFIX));
848 c->wait = dnet_wait_get(w);
849 c->offset = write_offset;
850 c->file = (char *)(c + 1);
852 sprintf(c->file, "%s", file);
854 ctl.priv = c;
856 w->cond = wait_init;
857 err = dnet_read_object(n, &ctl);
858 if (err)
859 goto err_out_exit;
861 err = dnet_wait_event(w, w->cond != wait_init, &n->wait_ts);
862 if ((err < 0) || (w->cond < 0)) {
863 char id_str[2*DNET_ID_SIZE + 1];
864 if (!err)
865 err = w->cond;
866 dnet_log(n, DNET_LOG_ERROR, "%d:%s '%s' : failed to read data: %d\n",
867 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str),
868 file, err);
869 goto err_out_exit;
872 return 0;
874 err_out_exit:
875 return err;
878 static int dnet_read_file_raw(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
880 int err = -ENOENT, len = strlen(file), i;
881 struct dnet_wait *w;
882 int *g, num;
884 w = dnet_wait_alloc(~0);
885 if (!w) {
886 err = -ENOMEM;
887 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate read waiting.\n");
888 goto err_out_exit;
891 if (!size)
892 size = ~0ULL;
894 num = dnet_mix_states(n, id, &g);
895 if (num < 0) {
896 err = num;
897 goto err_out_exit;
900 for (i=0; i<num; ++i) {
901 id->group_id = g[i];
903 err = dnet_read_file_raw_exec(n, file, len, 0, offset, size, id, w);
904 if (err)
905 continue;
907 break;
910 dnet_wait_put(w);
911 free(g);
913 err_out_exit:
914 return err;
/*
 * Reads the object with an already computed @id into @file.
 * Returns the result of dnet_read_file_raw().
 */
int dnet_read_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(n, file, id, offset, size);

	return err;
}
922 int dnet_read_file(struct dnet_node *n, const char *file, const void *remote, int remote_size,
923 uint64_t offset, uint64_t size, int type)
925 struct dnet_id id;
927 dnet_transform(n, remote, remote_size, &id);
928 id.type = type;
930 return dnet_read_file_raw(n, file, &id, offset, size);
933 struct dnet_wait *dnet_wait_alloc(int cond)
935 int err;
936 struct dnet_wait *w;
938 w = malloc(sizeof(struct dnet_wait));
939 if (!w) {
940 err = -ENOMEM;
941 goto err_out_exit;
944 memset(w, 0, sizeof(struct dnet_wait));
946 err = pthread_cond_init(&w->wait, NULL);
947 if (err)
948 goto err_out_exit;
950 err = pthread_mutex_init(&w->wait_lock, NULL);
951 if (err)
952 goto err_out_destroy;
954 w->cond = cond;
955 atomic_init(&w->refcnt, 1);
957 return w;
959 err_out_destroy:
960 pthread_mutex_destroy(&w->wait_lock);
961 err_out_exit:
962 return NULL;
965 void dnet_wait_destroy(struct dnet_wait *w)
967 pthread_mutex_destroy(&w->wait_lock);
968 pthread_cond_destroy(&w->wait);
969 free(w->ret);
970 free(w);
973 static int dnet_send_cmd_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
975 struct dnet_wait *w = priv;
977 if (is_trans_destroyed(st, cmd)) {
978 dnet_wakeup(w, w->cond++);
979 dnet_wait_put(w);
980 return 0;
983 w->status = cmd->status;
985 if (cmd->size) {
986 void *old = w->ret;
987 void *data = cmd + 1;
989 w->ret = realloc(w->ret, w->size + cmd->size);
990 if (!w->ret) {
991 w->ret = old;
992 w->status = -ENOMEM;
993 } else {
994 memcpy(w->ret + w->size, data, cmd->size);
995 w->size += cmd->size;
999 return w->status;
1002 static int dnet_send_cmd_single(struct dnet_net_state *st, struct dnet_wait *w, struct sph *e, uint64_t cflags)
1004 struct dnet_trans_control ctl;
1006 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1008 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1009 ctl.size = sizeof(struct sph) + e->event_size + e->data_size + e->binary_size;
1010 ctl.cmd = DNET_CMD_EXEC;
1011 ctl.complete = dnet_send_cmd_complete;
1012 ctl.priv = w;
1013 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1015 dnet_convert_sph(e);
1017 ctl.data = e;
1019 return dnet_trans_alloc_send_state(st, &ctl);
1022 static int dnet_send_cmd_raw(struct dnet_node *n, struct dnet_id *id,
1023 struct sph *e, void **ret, uint64_t cflags)
1025 struct dnet_net_state *st;
1026 int err = -ENOENT, num = 0;
1027 struct dnet_wait *w;
1028 struct dnet_group *g;
1030 w = dnet_wait_alloc(0);
1031 if (!w) {
1032 err = -ENOMEM;
1033 goto err_out_exit;
1036 if (id && id->group_id != 0) {
1037 dnet_wait_get(w);
1038 st = dnet_state_get_first(n, id);
1039 if (!st)
1040 goto err_out_put;
1041 err = dnet_send_cmd_single(st, w, e, cflags);
1042 dnet_state_put(st);
1043 num = 1;
1044 } else if (id && id->group_id == 0) {
1045 pthread_mutex_lock(&n->state_lock);
1046 list_for_each_entry(g, &n->group_list, group_entry) {
1047 dnet_wait_get(w);
1049 id->group_id = g->group_id;
1051 st = dnet_state_search_nolock(n, id);
1052 if (st) {
1053 if (st != n->st) {
1054 err = dnet_send_cmd_single(st, w, e, cflags);
1055 num++;
1057 dnet_state_put(st);
1060 pthread_mutex_unlock(&n->state_lock);
1061 } else {
1062 pthread_mutex_lock(&n->state_lock);
1063 list_for_each_entry(g, &n->group_list, group_entry) {
1064 list_for_each_entry(st, &g->state_list, state_entry) {
1065 if (st == n->st)
1066 continue;
1068 dnet_wait_get(w);
1070 err = dnet_send_cmd_single(st, w, e, cflags);
1071 num++;
1074 pthread_mutex_unlock(&n->state_lock);
1077 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1078 if (err)
1079 goto err_out_put;
1081 if (w->ret) {
1082 *ret = w->ret;
1083 w->ret = NULL;
1085 err = w->size;
1088 dnet_wait_put(w);
1090 return err;
1092 err_out_put:
1093 dnet_wait_put(w);
1094 err_out_exit:
1095 return err;
/*
 * Sends the exec request @e with no extra command flags.
 * See dnet_send_cmd_raw() for target selection and the return value.
 */
int dnet_send_cmd(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t cflags = 0;

	return dnet_send_cmd_raw(n, id, e, ret, cflags);
}
1103 int dnet_send_cmd_nolock(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
1105 return dnet_send_cmd_raw(n, id, e, ret, DNET_FLAGS_NOLOCK);
1108 int dnet_try_reconnect(struct dnet_node *n)
1110 struct dnet_addr_storage *ast, *tmp;
1111 struct dnet_net_state *st;
1112 LIST_HEAD(list);
1113 int s, err, join;
1115 if (list_empty(&n->reconnect_list))
1116 return 0;
1118 pthread_mutex_lock(&n->reconnect_lock);
1119 list_for_each_entry_safe(ast, tmp, &n->reconnect_list, reconnect_entry) {
1120 list_move(&ast->reconnect_entry, &list);
1122 pthread_mutex_unlock(&n->reconnect_lock);
1124 list_for_each_entry_safe(ast, tmp, &list, reconnect_entry) {
1125 s = dnet_socket_create_addr(n, n->sock_type, n->proto, n->family,
1126 (struct sockaddr *)ast->addr.addr, ast->addr.addr_len, 0);
1127 if (s < 0)
1128 goto out_add;
1130 join = DNET_WANT_RECONNECT;
1131 if (ast->__join_state == DNET_JOIN)
1132 join = DNET_JOIN;
1134 st = dnet_add_state_socket(n, &ast->addr, s, &err, join);
1135 if (st)
1136 goto out_remove;
1138 dnet_sock_close(s);
1140 if (err == -EEXIST || err == -EINVAL)
1141 goto out_remove;
1143 out_add:
1144 dnet_add_reconnect_state(n, &ast->addr, ast->__join_state);
1145 out_remove:
1146 list_del(&ast->reconnect_entry);
1147 free(ast);
1150 return 0;
1153 int dnet_lookup_object(struct dnet_node *n, struct dnet_id *id, uint64_t cflags,
1154 int (* complete)(struct dnet_net_state *, struct dnet_cmd *, void *),
1155 void *priv)
1157 struct dnet_io_req req;
1158 struct dnet_trans *t;
1159 struct dnet_cmd *cmd;
1160 int err;
1162 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd));
1163 if (!t) {
1164 err = -ENOMEM;
1165 goto err_out_complete;
1167 t->complete = complete;
1168 t->priv = priv;
1170 cmd = (struct dnet_cmd *)(t + 1);
1172 memcpy(&cmd->id, id, sizeof(struct dnet_id));
1174 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
1176 cmd->cmd = t->command = DNET_CMD_LOOKUP;
1177 cmd->flags = cflags | DNET_FLAGS_NEED_ACK;
1179 t->st = dnet_state_get_first(n, &cmd->id);
1180 if (!t->st) {
1181 err = -ENOENT;
1182 goto err_out_destroy;
1185 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
1186 dnet_convert_cmd(cmd);
1188 dnet_log(n, DNET_LOG_NOTICE, "%s: lookup to %s.\n", dnet_dump_id(id), dnet_server_convert_dnet_addr(&t->st->addr));
1190 memset(&req, 0, sizeof(req));
1191 req.st = t->st;
1192 req.header = cmd;
1193 req.hsize = sizeof(struct dnet_cmd);
1195 err = dnet_trans_send(t, &req);
1196 if (err)
1197 goto err_out_destroy;
1199 return 0;
1201 err_out_complete:
1202 if (complete)
1203 complete(NULL, NULL, priv);
1204 return err;
1206 err_out_destroy:
1207 dnet_trans_put(t);
1208 return err;
1211 int dnet_lookup_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1213 struct dnet_wait *w = priv;
1214 struct dnet_node *n = NULL;
1215 struct dnet_addr_attr *a;
1216 struct dnet_net_state *other;
1217 char addr_str[128] = "no-address";
1218 int err;
1220 if (is_trans_destroyed(st, cmd)) {
1221 dnet_wakeup(w, w->cond++);
1222 dnet_wait_put(w);
1223 return 0;
1225 n = st->n;
1227 err = cmd->status;
1228 if (err || !cmd->size)
1229 goto err_out_exit;
1231 if (cmd->size < sizeof(struct dnet_addr_attr)) {
1232 dnet_log(st->n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1233 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size, sizeof(struct dnet_addr_attr));
1234 err = -EINVAL;
1235 goto err_out_exit;
1238 a = (struct dnet_addr_attr *)(cmd + 1);
1240 dnet_convert_addr_attr(a);
1241 dnet_server_convert_dnet_addr_raw(&a->addr, addr_str, sizeof(addr_str));
1243 if (cmd->size > sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
1244 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
1246 dnet_convert_file_info(info);
1248 dnet_log_raw(n, DNET_LOG_NOTICE, "%s: lookup object: %s: "
1249 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1250 dnet_dump_id(&cmd->id), addr_str,
1251 (unsigned long long)info->offset, (unsigned long long)info->size,
1252 (unsigned long long)info->mode, (char *)(info + 1));
1253 } else {
1254 dnet_log_raw(n, DNET_LOG_INFO, "%s: lookup object: %s\n",
1255 dnet_dump_id(&cmd->id), addr_str);
1259 other = dnet_state_search_by_addr(n, &a->addr);
1260 if (other) {
1261 dnet_state_put(other);
1262 } else {
1263 dnet_recv_route_list(st);
1266 return 0;
1268 err_out_exit:
1269 if (n)
1270 dnet_log(n, DNET_LOG_ERROR, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd->id), cmd->status, err);
1272 return err;
1275 int dnet_lookup(struct dnet_node *n, const char *file)
1277 int err, error = 0, i;
1278 struct dnet_wait *w;
1279 struct dnet_id raw;
1281 w = dnet_wait_alloc(0);
1282 if (!w) {
1283 err = -ENOMEM;
1284 goto err_out_exit;
1287 dnet_transform(n, file, strlen(file), &raw);
1289 pthread_mutex_lock(&n->group_lock);
1290 for (i=0; i<n->group_num; ++i) {
1291 raw.group_id = n->groups[i];
1293 err = dnet_lookup_object(n, &raw, 0, dnet_lookup_complete, dnet_wait_get(w));
1294 if (err) {
1295 error = err;
1296 continue;
1299 err = dnet_wait_event(w, w->cond == 1, &n->wait_ts);
1300 if (err || w->status) {
1301 if (!err)
1302 err = w->status;
1303 error = err;
1304 continue;
1307 error = 0;
1308 break;
1310 pthread_mutex_unlock(&n->group_lock);
1312 dnet_wait_put(w);
1313 return error;
1315 err_out_exit:
1316 return err;
/*
 * Accessor: returns a pointer to the address stored inside @st.
 * The pointer refers to the state's own storage, nothing is copied.
 */
struct dnet_addr *dnet_state_addr(struct dnet_net_state *st)
{
	return &st->addr;
}
1324 static int dnet_stat_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1326 struct dnet_wait *w = priv;
1327 float la[3];
1328 struct dnet_stat *st;
1329 int err = -EINVAL;
1331 if (is_trans_destroyed(state, cmd)) {
1332 dnet_wakeup(w, w->cond++);
1333 dnet_wait_put(w);
1334 return 0;
1337 if (cmd->cmd == DNET_CMD_STAT && cmd->size == sizeof(struct dnet_stat)) {
1338 st = (struct dnet_stat *)(cmd + 1);
1340 dnet_convert_stat(st);
1342 la[0] = (float)st->la[0] / 100.0;
1343 la[1] = (float)st->la[1] / 100.0;
1344 la[2] = (float)st->la[2] / 100.0;
1346 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: la: %.2f %.2f %.2f.\n",
1347 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1348 la[0], la[1], la[2]);
1349 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: mem: "
1350 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1351 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1352 (unsigned long long)st->vm_total,
1353 (unsigned long long)st->vm_free,
1354 (unsigned long long)st->vm_cached);
1355 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: fs: "
1356 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1357 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1358 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
1359 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
1360 (unsigned long long)st->files, (unsigned long long)st->fsid);
1361 err = 0;
1362 } else if (cmd->size >= sizeof(struct dnet_addr_stat) && cmd->cmd == DNET_CMD_STAT_COUNT) {
1363 struct dnet_addr_stat *as = (struct dnet_addr_stat *)(cmd + 1);
1364 int i;
1366 dnet_convert_addr_stat(as, 0);
1368 for (i=0; i<as->num; ++i) {
1369 if (as->num > as->cmd_num) {
1370 if (i == 0)
1371 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Storage commands\n",
1372 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1373 if (i == as->cmd_num)
1374 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Proxy commands\n",
1375 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1376 if (i == as->cmd_num * 2)
1377 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: Counters\n",
1378 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state));
1380 dnet_log(state->n, DNET_LOG_DATA, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1381 dnet_dump_id(&cmd->id), dnet_state_dump_addr(state),
1382 dnet_counter_string(i, as->cmd_num),
1383 (unsigned long long)as->count[i].count, (unsigned long long)as->count[i].err);
1387 return err;
/*
 * Send one command described by @ctl: directly through @st when a state is
 * given, otherwise let the node @n pick the destination.
 */
static int dnet_request_cmd_single(struct dnet_node *n, struct dnet_net_state *st, struct dnet_trans_control *ctl)
{
	return st ? dnet_trans_alloc_send_state(st, ctl)
		  : dnet_trans_alloc_send(n, ctl);
}
1398 int dnet_request_stat(struct dnet_node *n, struct dnet_id *id,
1399 unsigned int cmd, uint64_t cflags,
1400 int (* complete)(struct dnet_net_state *state,
1401 struct dnet_cmd *cmd,
1402 void *priv),
1403 void *priv)
1405 struct dnet_trans_control ctl;
1406 struct dnet_wait *w = NULL;
1407 int err, num = 0;
1408 struct timeval start, end;
1409 long diff;
1411 gettimeofday(&start, NULL);
1413 if (!complete) {
1414 w = dnet_wait_alloc(0);
1415 if (!w) {
1416 err = -ENOMEM;
1417 goto err_out_exit;
1420 complete = dnet_stat_complete;
1421 priv = w;
1424 memset(&ctl, 0, sizeof(struct dnet_trans_control));
1426 ctl.cmd = cmd;
1427 ctl.complete = complete;
1428 ctl.priv = priv;
1429 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | cflags;
1431 if (id) {
1432 if (w)
1433 dnet_wait_get(w);
1435 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1437 err = dnet_request_cmd_single(n, NULL, &ctl);
1438 num = 1;
1439 } else {
1440 struct dnet_net_state *st;
1441 struct dnet_group *g;
1444 pthread_mutex_lock(&n->state_lock);
1445 list_for_each_entry(g, &n->group_list, group_entry) {
1446 list_for_each_entry(st, &g->state_list, state_entry) {
1447 if (st == n->st)
1448 continue;
1450 if (w)
1451 dnet_wait_get(w);
1453 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1454 dnet_request_cmd_single(n, st, &ctl);
1455 num++;
1458 pthread_mutex_unlock(&n->state_lock);
1461 if (!w) {
1462 gettimeofday(&end, NULL);
1463 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1464 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd), diff, num);
1466 return num;
1469 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1471 gettimeofday(&end, NULL);
1472 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
1473 dnet_log(n, DNET_LOG_NOTICE, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd), diff, err, num);
1475 if (err)
1476 goto err_out_put;
1478 dnet_wait_put(w);
1480 return num;
1482 err_out_put:
1483 dnet_wait_put(w);
1484 err_out_exit:
1485 return err;
/*
 * Private context shared by all transactions started from a single
 * dnet_request_cmd() call.
 */
struct dnet_request_cmd_priv {
	/* Wait object dnet_request_cmd() sleeps on. */
	struct dnet_wait	*w;

	/* Original user completion callback and its argument. */
	int			(* complete)(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv);
	void			*priv;
};
/*
 * Completion wrapper installed by dnet_request_cmd().
 *
 * Forwards every reply to the user callback stored in @priv, then, when the
 * transaction is being destroyed, wakes the waiter and drops one wait
 * reference. Returns whatever the user callback returned.
 */
static int dnet_request_cmd_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
{
	struct dnet_request_cmd_priv *p = priv;
	/* NOTE(review): p->complete is called unconditionally - callers are
	 * presumably required to provide a non-NULL callback; confirm. */
	int err = p->complete(state, cmd, p->priv);

	if (is_trans_destroyed(state, cmd)) {
		struct dnet_wait *w = p->w;

		dnet_wakeup(w, w->cond++);
		/*
		 * The shared context is freed by whoever observes the wait
		 * refcount at 1 right before dropping its own reference.
		 */
		if (atomic_read(&w->refcnt) == 1)
			free(p);
		dnet_wait_put(w);
	}

	return err;
}
/*
 * Send command described by @ctl to every known remote state and wait for
 * all of them to complete (or for the node wait timeout).
 *
 * The user completion callback and private pointer found in @ctl are
 * wrapped by dnet_request_cmd_complete(); note that @ctl->complete,
 * @ctl->priv and @ctl->id are overwritten and not restored.
 *
 * Returns the number of requests sent, or a negative error code.
 */
int dnet_request_cmd(struct dnet_node *n, struct dnet_trans_control *ctl)
{
	int err, num = 0;
	struct dnet_request_cmd_priv *p;
	struct dnet_wait *w;
	struct dnet_net_state *st;
	struct dnet_group *g;
	struct timeval start, end;
	long diff;

	gettimeofday(&start, NULL);

	p = malloc(sizeof(*p));
	if (!p) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	w = dnet_wait_alloc(0);
	if (!w) {
		err = -ENOMEM;
		goto err_out_free;
	}

	/* Remember the user callback and substitute our wrapper. */
	p->w = w;
	p->complete = ctl->complete;
	p->priv = ctl->priv;

	ctl->complete = dnet_request_cmd_complete;
	ctl->priv = p;

	pthread_mutex_lock(&n->state_lock);
	list_for_each_entry(g, &n->group_list, group_entry) {
		list_for_each_entry(st, &g->state_list, state_entry) {
			/* Skip the local state. */
			if (st == n->st)
				continue;

			/* One wait reference per request, dropped in the wrapper. */
			dnet_wait_get(w);

			ctl->id.group_id = g->group_id;

			if (!(ctl->cflags & DNET_FLAGS_DIRECT))
				dnet_setup_id(&ctl->id, st->idc->group->group_id, st->idc->ids[0].raw.id);
			dnet_request_cmd_single(n, st, ctl);
			num++;
		}
	}
	pthread_mutex_unlock(&n->state_lock);

	err = dnet_wait_event(w, w->cond == num, &n->wait_ts);

	gettimeofday(&end, NULL);
	diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
	dnet_log(n, DNET_LOG_NOTICE, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl->cmd), diff, err, num);

	if (!err)
		err = num;

	/*
	 * If ours is the last wait reference, all callbacks are done and the
	 * shared context can be released here; otherwise the last callback
	 * frees it (see dnet_request_cmd_complete()).
	 */
	if (atomic_read(&w->refcnt) == 1)
		free(p);
	dnet_wait_put(w);

	return err;

err_out_free:
	free(p);
err_out_exit:
	return err;
}
1582 struct dnet_update_status_priv {
1583 struct dnet_wait *w;
1584 struct dnet_node_status status;
1585 atomic_t refcnt;
1588 static int dnet_update_status_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
1590 struct dnet_update_status_priv *p = priv;
1592 if (is_trans_destroyed(state, cmd)) {
1593 dnet_wakeup(p->w, p->w->cond++);
1594 dnet_wait_put(p->w);
1595 if (atomic_dec_and_test(&p->refcnt))
1596 free(p);
1599 if (cmd->size == sizeof(struct dnet_node_status)) {
1600 memcpy(&p->status, cmd + 1, sizeof(struct dnet_node_status));
1601 return 0;
1604 return -ENOENT;
1607 int dnet_update_status(struct dnet_node *n, struct dnet_addr *addr, struct dnet_id *id, struct dnet_node_status *status)
1609 int err;
1610 struct dnet_update_status_priv *priv;
1611 struct dnet_trans_control ctl;
1613 if (!id && !addr) {
1614 err = -EINVAL;
1615 goto err_out_exit;
1618 memset(&ctl, 0, sizeof(ctl));
1620 if (id) {
1621 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1622 } else {
1623 struct dnet_net_state *st;
1625 st = dnet_state_search_by_addr(n, addr);
1626 if (!st) {
1627 err = -ENOENT;
1628 goto err_out_exit;
1631 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
1632 dnet_state_put(st);
1635 priv = malloc(sizeof(struct dnet_update_status_priv));
1636 if (!priv) {
1637 err = -ENOMEM;
1638 goto err_out_exit;
1641 priv->w = dnet_wait_alloc(0);
1642 if (!priv->w) {
1643 err = -ENOMEM;
1644 goto err_out_exit;
1647 ctl.complete = dnet_update_status_complete;
1648 ctl.priv = priv;
1649 ctl.cmd = DNET_CMD_STATUS;
1650 ctl.cflags = DNET_FLAGS_NEED_ACK;
1651 ctl.size = sizeof(struct dnet_node_status);
1652 ctl.data = status;
1654 dnet_wait_get(priv->w);
1655 dnet_request_cmd_single(n, NULL, &ctl);
1657 err = dnet_wait_event(priv->w, priv->w->cond == 1, &n->wait_ts);
1658 dnet_wait_put(priv->w);
1659 if (!err && priv) {
1660 memcpy(status, &priv->status, sizeof(struct dnet_node_status));
1662 if (atomic_dec_and_test(&priv->refcnt))
1663 free(priv);
1665 err_out_exit:
1666 return err;
1669 static int dnet_remove_object_raw(struct dnet_node *n, struct dnet_id *id,
1670 int (* complete)(struct dnet_net_state *state,
1671 struct dnet_cmd *cmd,
1672 void *priv),
1673 void *priv, uint64_t cflags, uint64_t ioflags)
1675 struct dnet_io_control ctl;
1677 memset(&ctl, 0, sizeof(struct dnet_io_control));
1679 memcpy(&ctl.id, id, sizeof(struct dnet_id));
1681 memcpy(&ctl.io.id, id->id, DNET_ID_SIZE);
1682 memcpy(&ctl.io.parent, id->id, DNET_ID_SIZE);
1683 ctl.io.flags = ioflags;
1685 ctl.fd = -1;
1687 ctl.cmd = DNET_CMD_DEL;
1688 ctl.complete = complete;
1689 ctl.priv = priv;
1690 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
1692 return dnet_trans_create_send_all(n, &ctl);
1695 static int dnet_remove_complete(struct dnet_net_state *state,
1696 struct dnet_cmd *cmd,
1697 void *priv)
1699 struct dnet_wait *w = priv;
1701 if (is_trans_destroyed(state, cmd)) {
1702 dnet_wakeup(w, w->cond++);
1703 dnet_wait_put(w);
1704 return 0;
1707 if (cmd->status)
1708 w->status = cmd->status;
1709 return cmd->status;
1712 int dnet_remove_object(struct dnet_node *n, struct dnet_id *id,
1713 int (* complete)(struct dnet_net_state *state,
1714 struct dnet_cmd *cmd,
1715 void *priv),
1716 void *priv,
1717 uint64_t cflags, uint64_t ioflags)
1719 struct dnet_wait *w = NULL;
1720 int err;
1722 if (!complete) {
1723 w = dnet_wait_alloc(0);
1724 if (!w) {
1725 err = -ENOMEM;
1726 goto err_out_exit;
1729 complete = dnet_remove_complete;
1730 priv = w;
1731 dnet_wait_get(w);
1734 err = dnet_remove_object_raw(n, id, complete, priv, cflags, ioflags);
1735 if (err < 0)
1736 goto err_out_put;
1738 if (w) {
1739 err = dnet_wait_event(w, w->cond != err, &n->wait_ts);
1740 if (err)
1741 goto err_out_put;
1743 dnet_wait_put(w);
1745 return 0;
1747 err_out_put:
1748 if (w)
1749 dnet_wait_put(w);
1750 err_out_exit:
1751 return err;
1754 static int dnet_remove_file_raw(struct dnet_node *n, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1756 struct dnet_wait *w;
1757 int err, num;
1759 w = dnet_wait_alloc(0);
1760 if (!w) {
1761 err = -ENOMEM;
1762 goto err_out_exit;
1765 atomic_add(&w->refcnt, 1024);
1766 err = dnet_remove_object_raw(n, id, dnet_remove_complete, w, cflags, ioflags);
1767 if (err < 0) {
1768 atomic_sub(&w->refcnt, 1024);
1769 goto err_out_put;
1772 num = err;
1773 atomic_sub(&w->refcnt, 1024 - num);
1775 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1776 if (err)
1777 goto err_out_put;
1779 dnet_wait_put(w);
1781 return 0;
1783 err_out_put:
1784 dnet_wait_put(w);
1785 err_out_exit:
1786 return err;
1789 int dnet_remove_object_now(struct dnet_node *n, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1791 return dnet_remove_file_raw(n, id, cflags | DNET_FLAGS_NEED_ACK | DNET_ATTR_DELETE_HISTORY, ioflags);
1794 int dnet_remove_file(struct dnet_node *n, char *remote, int remote_len, struct dnet_id *id, uint64_t cflags, uint64_t ioflags)
1796 struct dnet_id raw;
1798 if (!id) {
1799 dnet_transform(n, remote, remote_len, &raw);
1800 raw.group_id = 0;
1801 id = &raw;
1804 return dnet_remove_file_raw(n, id, cflags, ioflags);
/*
 * Request the id list (DNET_CMD_LIST) for @id.
 *
 * The operation is currently disabled: the function logs an error and
 * terminates the whole process via exit(-1). Everything after the exit()
 * call is unreachable and kept only as the skeleton of the request.
 */
int dnet_request_ids(struct dnet_node *n, struct dnet_id *id, uint64_t cflags,
	int (* complete)(struct dnet_net_state *state,
			struct dnet_cmd *cmd,
			void *priv),
	void *priv)
{
	struct dnet_trans_control ctl;

	dnet_log_raw(n, DNET_LOG_ERROR, "Temporarily unsupported operation.\n");
	exit(-1);

	/* Unreachable from here on. */
	memset(&ctl, 0, sizeof(struct dnet_trans_control));

	memcpy(&ctl.id, id, sizeof(struct dnet_id));
	ctl.cmd = DNET_CMD_LIST;
	ctl.complete = complete;
	ctl.priv = priv;
	ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;

	return dnet_trans_alloc_send(n, &ctl);
}
1829 struct dnet_node *dnet_get_node_from_state(void *state)
1831 struct dnet_net_state *st = state;
1833 if (!st)
1834 return NULL;
1835 return st->n;
/*
 * Context shared between dnet_read_data_wait_raw() and its completion
 * callback dnet_read_data_complete().
 */
struct dnet_read_data_completion {
	/* Wait object the reader sleeps on. */
	struct dnet_wait		*w;
	/* Accumulated replies: io attribute + payload, one after another. */
	void				*data;
	/* Number of bytes currently stored in @data. */
	uint64_t			size;
	/* One reference for the callback, one for the reader. */
	atomic_t			refcnt;
};
1845 static int dnet_read_data_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
1847 struct dnet_read_data_completion *c = priv;
1848 struct dnet_wait *w = c->w;
1849 int err = -EINVAL;
1851 if (is_trans_destroyed(st, cmd)) {
1852 dnet_wakeup(w, w->cond++);
1853 dnet_wait_put(w);
1854 if (atomic_dec_and_test(&c->refcnt))
1855 free(c);
1856 return err;
1859 err = cmd->status;
1860 if (err)
1861 w->status = err;
1863 if (cmd->size >= sizeof(struct dnet_io_attr)) {
1864 struct dnet_io_attr *io = (struct dnet_io_attr *)(cmd + 1);
1865 uint64_t sz = c->size;
1867 dnet_convert_io_attr(io);
1869 sz += io->size + sizeof(struct dnet_io_attr);
1870 c->data = realloc(c->data, sz);
1871 if (!c->data) {
1872 err = -ENOMEM;
1873 goto err_out_exit;
1876 memcpy(c->data + c->size, io, sizeof(struct dnet_io_attr) + io->size);
1877 c->size = sz;
1880 err_out_exit:
1881 dnet_log(st->n, DNET_LOG_NOTICE, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1882 dnet_dump_id(&cmd->id), (unsigned long long)(cmd->trans & ~DNET_TRANS_REPLY),
1883 cmd->status, err);
1885 return err;
/*
 * Issue command @cmd (a read-like IO command) for @id/@io and block until
 * the transaction completed or the node wait timeout expired.
 *
 * @errp receives 0 on success or a negative error code.
 *
 * On success returns the buffer accumulated by dnet_read_data_complete()
 * (sequence of struct dnet_io_attr + payload) and stores its total size in
 * @io->size; the caller owns the buffer and must free() it. Returns NULL on
 * failure.
 */
void *dnet_read_data_wait_raw(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io,
		int cmd, uint64_t cflags, int *errp)
{
	struct dnet_io_control ctl;
	struct dnet_wait *w;
	struct dnet_read_data_completion *c;
	void *data = NULL;
	int err;

	w = dnet_wait_alloc(0);
	if (!w) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	c = malloc(sizeof(*c));
	if (!c) {
		err = -ENOMEM;
		goto err_out_put;
	}

	c->w = w;
	c->size = 0;
	c->data = NULL;
	/* one for completion callback, another for this function */
	atomic_init(&c->refcnt, 2);

	memset(&ctl, 0, sizeof(struct dnet_io_control));

	ctl.fd = -1;

	ctl.priv = c;
	ctl.complete = dnet_read_data_complete;

	ctl.cmd = cmd;
	ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;

	memcpy(&ctl.io, io, sizeof(struct dnet_io_attr));
	memcpy(&ctl.id, id, sizeof(struct dnet_id));

	/* The column/type to read is taken from the IO attribute. */
	ctl.id.type = io->type;

	/* Wait reference owned by the completion callback. */
	dnet_wait_get(w);
	err = dnet_read_object(n, &ctl);
	if (err)
		goto err_out_put_complete;

	/* w->cond becomes non-zero once the transaction is destroyed. */
	err = dnet_wait_event(w, w->cond, &n->wait_ts);
	if (err || w->status) {
		char id_str[2*DNET_ID_SIZE + 1];
		if (!err)
			err = w->status;
		/* A missing key is not worth an error message for range reads. */
		if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
			dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
				ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
		/* NOTE(review): c->data collected so far does not appear to be
		 * freed on this path - confirm whether it leaks. */
		goto err_out_put_complete;
	}
	/* Hand the accumulated buffer over to the caller. */
	io->size = c->size;
	data = c->data;
	err = 0;

err_out_put_complete:
	if (atomic_dec_and_test(&c->refcnt))
		free(c);
err_out_put:
	dnet_wait_put(w);
err_out_exit:
	*errp = err;
	return data;
}
1959 static int dnet_read_recover(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io, void *data, uint64_t cflags)
1961 struct dnet_meta_container mc;
1962 struct dnet_io_control ctl;
1963 void *result;
1964 int err;
1966 err = dnet_read_meta(n, &mc, NULL, 0, id);
1967 if (err) {
1968 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id), err);
1969 goto err_out_exit;
1972 memset(&ctl, 0, sizeof(struct dnet_io_control));
1974 ctl.id = *id;
1975 ctl.io = *io;
1977 ctl.data = data + sizeof(struct dnet_io_attr);
1978 ctl.io.size -= sizeof(struct dnet_io_attr);
1980 ctl.fd = -1;
1981 ctl.cmd = DNET_CMD_WRITE;
1982 ctl.cflags = cflags;
1984 err = dnet_write_data_wait(n, &ctl, &result);
1985 if (err < 0) {
1986 dnet_log(n, DNET_LOG_ERROR, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id), err);
1987 goto err_out_free_meta;
1990 err = dnet_write_metadata(n, &mc, 0, cflags);
1991 if (err < 0)
1992 goto err_out_free_result;
1994 err_out_free_result:
1995 free(result);
1996 err_out_free_meta:
1997 free(mc.data);
1998 err_out_exit:
1999 return err;
2002 void *dnet_read_data_wait_groups(struct dnet_node *n, struct dnet_id *id, int *groups, int num,
2003 struct dnet_io_attr *io, uint64_t cflags, int *errp)
2005 int i;
2006 void *data;
2008 for (i = 0; i < num; ++i) {
2009 id->group_id = groups[i];
2011 data = dnet_read_data_wait_raw(n, id, io, DNET_CMD_READ, cflags, errp);
2012 if (data) {
2013 if ((i != 0) && (io->type == 0) && (io->offset == 0) && (io->size > sizeof(struct dnet_io_attr))) {
2014 dnet_read_recover(n, id, io, data, cflags);
2017 *errp = 0;
2018 return data;
2022 return NULL;
/*
 * Read object @id trying the node's groups in the order produced by
 * dnet_mix_states().
 *
 * @errp receives 0 on success or a negative error code.
 * Returns the data buffer (owned by the caller) or NULL on failure.
 */
void *dnet_read_data_wait(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io,
		uint64_t cflags, int *errp)
{
	int num, *g;
	/*
	 * dnet_mix_states() may legitimately return 0 groups; start with a
	 * defined error so that nothing uninitialized is reported then.
	 */
	int err = -ENOENT;
	void *data = NULL;

	num = dnet_mix_states(n, id, &g);
	if (num < 0) {
		err = num;
		goto err_out_exit;
	}

	data = dnet_read_data_wait_groups(n, id, g, num, io, cflags, &err);

	free(g);
err_out_exit:
	*errp = err;
	return data;
}
/*
 * Write data described by @ctl and wait until all started write
 * transactions completed (or the node wait timeout expired).
 *
 * @ctl->priv, @ctl->complete, @ctl->cmd, @ctl->io.id and @ctl->io.parent are
 * overwritten; DNET_FLAGS_NEED_ACK is added to @ctl->cflags.
 *
 * On success *@result is set to the reply buffer taken from the write
 * completion (ownership moves to the caller) and its size is returned.
 * On failure a negative error code is returned.
 */
int dnet_write_data_wait(struct dnet_node *n, struct dnet_io_control *ctl, void **result)
{
	int err, trans_num = 0;
	struct dnet_wait *w;
	struct dnet_write_completion *wc;

	wc = malloc(sizeof(struct dnet_write_completion));
	if (!wc) {
		err = -ENOMEM;
		goto err_out_exit;
	}
	memset(wc, 0, sizeof(struct dnet_write_completion));

	w = dnet_wait_alloc(0);
	if (!w) {
		err = -ENOMEM;
		free(wc);
		goto err_out_exit;
	}
	/* NOTE(review): w is presumably released by dnet_write_complete_free() - confirm. */
	wc->wait = w;

	/* Stays -ENOENT unless a completion overrides it. */
	w->status = -ENOENT;
	ctl->priv = wc;
	ctl->complete = dnet_write_complete;

	ctl->cmd = DNET_CMD_WRITE;
	ctl->cflags |= DNET_FLAGS_NEED_ACK;

	memcpy(ctl->io.id, ctl->id.id, DNET_ID_SIZE);
	memcpy(ctl->io.parent, ctl->id.id, DNET_ID_SIZE);

	/*
	 * The number of transactions is unknown before the send, so the
	 * reference counter is biased to INT_MAX first and corrected below.
	 */
	atomic_set(&w->refcnt, INT_MAX);
	trans_num = dnet_write_object(n, ctl);
	if (trans_num < 0)
		trans_num = 0;

	/*
	 * 1 - the first reference counter we grabbed at allocation time
	 */
	atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);

	err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
	if (err || w->status) {
		if (!err)
			err = w->status;
		dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
				dnet_dump_id(&ctl->id), err, w->status);
	}

	if (err || !trans_num) {
		if (!err)
			err = -EINVAL;
		dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
		goto err_out_put;
	}

	if (trans_num)
		dnet_log(n, DNET_LOG_NOTICE, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
				dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, ctl->io.type, wc->size);
	/* Overwritten right below: the reply size is what gets returned. */
	err = trans_num;

	*result = wc->reply;
	err = wc->size;

	/* The reply now belongs to the caller, keep the cleanup away from it. */
	wc->reply = NULL;

err_out_put:
	dnet_write_complete_free(wc);
err_out_exit:
	return err;
}
2120 int dnet_lookup_addr(struct dnet_node *n, const void *remote, int len, struct dnet_id *id, int group_id, char *dst, int dlen)
2122 struct dnet_id raw;
2123 struct dnet_net_state *st;
2124 int err = -ENOENT;
2126 if (!id) {
2127 dnet_transform(n, remote, len, &raw);
2128 id = &raw;
2130 id->group_id = group_id;
2132 st = dnet_state_get_first(n, id);
2133 if (!st)
2134 goto err_out_exit;
2136 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st), dst, dlen);
2137 dnet_state_put(st);
2138 err = 0;
2140 err_out_exit:
2141 return err;
/*
 * Group together with the weight used to order it in dnet_mix_states().
 */
struct dnet_weight {
	int			weight;
	int			group_id;
};

/*
 * qsort() comparator ordering struct dnet_weight by descending weight.
 *
 * Uses explicit comparisons instead of a subtraction: the difference of two
 * ints may overflow (undefined behaviour) and yield the wrong sign.
 */
static int dnet_weight_compare(const void *v1, const void *v2)
{
	const struct dnet_weight *w1 = v1;
	const struct dnet_weight *w2 = v2;

	return (w2->weight > w1->weight) - (w2->weight < w1->weight);
}
2157 static int dnet_weight_get_winner(struct dnet_weight *w, int num)
2159 long sum = 0, pos;
2160 float r;
2161 int i;
2163 for (i = 0; i < num; ++i)
2164 sum += w[i].weight;
2166 r = (float)rand() / (float)RAND_MAX;
2167 pos = r * sum;
2169 for (i = 0; i < num; ++i) {
2170 pos -= w[i].weight;
2171 if (pos <= 0)
2172 return i;
2175 return num - 1;
/*
 * Build the list of groups to try for object @id.
 *
 * Depending on the node flags the groups are returned
 *  - in random order (DNET_CFG_RANDOMIZE_STATES),
 *  - in weighted random order based on the weight of the state which
 *    serves @id in each group (DNET_CFG_MIX_STATES); groups without such a
 *    state are dropped,
 *  - or simply as a copy of the node's group list.
 *
 * In the two mixing modes @id->group_id may be modified and the resulting
 * order is also stored back into the node via dnet_node_set_groups().
 *
 * *@groupsp receives a malloc()'ed array the caller must free().
 * Returns the number of groups in the array or a negative error code
 * (-ENOENT when the node has no groups, -ENOMEM on allocation failure).
 */
int dnet_mix_states(struct dnet_node *n, struct dnet_id *id, int **groupsp)
{
	struct dnet_weight *weights;
	int *groups;
	int group_num, i, num;
	struct dnet_net_state *st;

	if (!n->group_num)
		return -ENOENT;

	/* Snapshot the group list under the lock. */
	pthread_mutex_lock(&n->group_lock);
	group_num = n->group_num;

	/* Scratch array lives on the stack and needs no cleanup. */
	weights = alloca(n->group_num * sizeof(*weights));
	groups = malloc(n->group_num * sizeof(*groups));
	if (groups)
		memcpy(groups, n->groups, n->group_num * sizeof(*groups));
	pthread_mutex_unlock(&n->group_lock);

	if (!groups) {
		*groupsp = NULL;
		return -ENOMEM;
	}

	if (n->flags & DNET_CFG_RANDOMIZE_STATES) {
		/* Random weights give a random order. */
		for (i = 0; i < group_num; ++i) {
			weights[i].weight = rand();
			weights[i].group_id = groups[i];
		}

		num = group_num;
	} else {
		if (!(n->flags & DNET_CFG_MIX_STATES)) {
			/* No mixing requested: plain copy of the node's groups. */
			*groupsp = groups;
			return group_num;
		}

		memset(weights, 0, group_num * sizeof(*weights));

		/* Collect weights of the states serving @id, skip empty groups. */
		for (i = 0, num = 0; i < group_num; ++i) {
			id->group_id = groups[i];

			st = dnet_state_get_first(n, id);
			if (st) {
				weights[num].weight = (int)st->weight;
				weights[num].group_id = id->group_id;

				dnet_state_put(st);

				num++;
			}
		}
	}

	group_num = num;
	if (group_num) {
		qsort(weights, group_num, sizeof(struct dnet_weight), dnet_weight_compare);

		/*
		 * Weighted sampling without replacement: pick a winner among
		 * the remaining entries, then close the gap it leaves.
		 */
		for (i = 0; i < group_num; ++i) {
			int pos = dnet_weight_get_winner(weights, group_num - i);
			groups[i] = weights[pos].group_id;

			if (pos < group_num - 1)
				memmove(&weights[pos], &weights[pos + 1], (group_num - 1 - pos) * sizeof(struct dnet_weight));
		}
	}

	dnet_node_set_groups(n, groups, group_num);

	*groupsp = groups;
	return group_num;
}
2250 int dnet_data_map(struct dnet_map_fd *map)
2252 uint64_t off;
2253 long page_size = sysconf(_SC_PAGE_SIZE);
2254 int err = 0;
2256 off = map->offset & ~(page_size - 1);
2257 map->mapped_size = ALIGN(map->size + map->offset - off, page_size);
2259 map->mapped_data = mmap(NULL, map->mapped_size, PROT_READ, MAP_SHARED, map->fd, off);
2260 if (map->mapped_data == MAP_FAILED) {
2261 err = -errno;
2262 goto err_out_exit;
2265 map->data = map->mapped_data + map->offset - off;
2267 err_out_exit:
2268 return err;
/*
 * Release a mapping created by dnet_data_map(): unmaps the whole
 * page-aligned region recorded in map->mapped_data/map->mapped_size.
 * The munmap() result is not checked.
 */
void dnet_data_unmap(struct dnet_map_fd *map)
{
	munmap(map->mapped_data, map->mapped_size);
}
2276 struct dnet_io_attr *dnet_remove_range(struct dnet_node *n, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *ret_num, int *errp)
2278 struct dnet_id id;
2279 struct dnet_io_attr *ret, *new_ret;
2280 struct dnet_raw_id start, next;
2281 struct dnet_raw_id end;
2282 uint64_t size = io->size;
2283 void *data;
2284 int err, need_exit = 0;
2286 memcpy(end.id, io->parent, DNET_ID_SIZE);
2288 dnet_setup_id(&id, group_id, io->id);
2289 id.type = io->type;
2291 ret = NULL;
2292 *ret_num = 0;
2293 while (!need_exit) {
2294 err = dnet_search_range(n, &id, &start, &next);
2295 if (err)
2296 goto err_out_exit;
2298 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2299 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2300 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2301 memcpy(next.id, end.id, DNET_ID_SIZE);
2302 need_exit = 1;
2305 if (n->log->log_level > DNET_LOG_NOTICE) {
2306 int len = 6;
2307 char start_id[2*len + 1];
2308 char next_id[2*len + 1];
2309 char end_id[2*len + 1];
2310 char id_str[2*len + 1];
2312 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2313 dnet_dump_id_len_raw(id.id, len, id_str),
2314 dnet_dump_id_len_raw(start.id, len, start_id),
2315 dnet_dump_id_len_raw(next.id, len, next_id),
2316 dnet_dump_id_len_raw(end.id, len, end_id),
2317 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2320 memcpy(io->id, id.id, DNET_ID_SIZE);
2321 memcpy(io->parent, next.id, DNET_ID_SIZE);
2323 io->size = size;
2325 data = dnet_read_data_wait_raw(n, &id, io, DNET_CMD_DEL_RANGE, cflags, &err);
2326 if (io->size != sizeof(struct dnet_io_attr)) {
2327 err = -ENOENT;
2328 goto err_out_exit;
2331 if (data) {
2332 struct dnet_io_attr *rep = (struct dnet_io_attr*)data;
2334 dnet_convert_io_attr(rep);
2336 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2337 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2338 (unsigned long long)io->num, (unsigned long long)io->size);
2340 (*ret_num)++;
2342 new_ret = realloc(ret, *ret_num * sizeof(struct dnet_io_attr));
2343 if (!new_ret) {
2344 err = -ENOMEM;
2345 goto err_out_exit;
2348 ret = new_ret;
2349 ret[*ret_num - 1] = *rep;
2351 free(data);
2354 memcpy(id.id, next.id, DNET_ID_SIZE);
2357 err_out_exit:
2358 *errp = err;
2360 return ret;
2363 struct dnet_range_data *dnet_read_range(struct dnet_node *n, struct dnet_io_attr *io, int group_id, uint64_t cflags, int *errp)
2365 struct dnet_id id;
2366 int ret_num;
2367 struct dnet_range_data *ret;
2368 struct dnet_raw_id start, next;
2369 struct dnet_raw_id end;
2370 uint64_t size = io->size;
2371 void *data;
2372 int err, need_exit = 0;
2374 memcpy(end.id, io->parent, DNET_ID_SIZE);
2376 dnet_setup_id(&id, group_id, io->id);
2377 id.type = io->type;
2379 ret = NULL;
2380 ret_num = 0;
2381 while (!need_exit) {
2382 err = dnet_search_range(n, &id, &start, &next);
2383 if (err)
2384 goto err_out_exit;
2386 if ((dnet_id_cmp_str(id.id, next.id) > 0) ||
2387 !memcmp(start.id, next.id, DNET_ID_SIZE) ||
2388 (dnet_id_cmp_str(next.id, end.id) > 0)) {
2389 memcpy(next.id, end.id, DNET_ID_SIZE);
2390 need_exit = 1;
2393 if (n->log->log_level > DNET_LOG_NOTICE) {
2394 int len = 6;
2395 char start_id[2*len + 1];
2396 char next_id[2*len + 1];
2397 char end_id[2*len + 1];
2398 char id_str[2*len + 1];
2400 dnet_log(n, DNET_LOG_NOTICE, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2401 dnet_dump_id_len_raw(id.id, len, id_str),
2402 dnet_dump_id_len_raw(start.id, len, start_id),
2403 dnet_dump_id_len_raw(next.id, len, next_id),
2404 dnet_dump_id_len_raw(end.id, len, end_id),
2405 (unsigned long long)size, dnet_id_cmp_str(next.id, end.id));
2408 memcpy(io->id, id.id, DNET_ID_SIZE);
2409 memcpy(io->parent, next.id, DNET_ID_SIZE);
2411 io->size = size;
2413 data = dnet_read_data_wait_raw(n, &id, io, DNET_CMD_READ_RANGE, cflags, &err);
2414 if (data) {
2415 struct dnet_io_attr *rep = data + io->size - sizeof(struct dnet_io_attr);
2417 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2418 if (!(io->flags & DNET_IO_FLAGS_NODATA))
2419 io->size -= sizeof(struct dnet_io_attr);
2420 dnet_convert_io_attr(rep);
2422 dnet_log(n, DNET_LOG_NOTICE, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2423 dnet_dump_id(&id), (unsigned long long)rep->num, (unsigned long long)io->start,
2424 (unsigned long long)io->num, (unsigned long long)io->size);
2426 if (io->start < rep->num) {
2427 rep->num -= io->start;
2428 io->start = 0;
2429 io->num -= rep->num;
2431 if (!io->size && !(io->flags & DNET_IO_FLAGS_NODATA)) {
2432 free(data);
2433 } else {
2434 struct dnet_range_data *new_ret;
2436 ret_num++;
2438 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2439 if (!new_ret) {
2440 goto err_out_exit;
2443 ret = new_ret;
2445 ret[ret_num - 1].data = data;
2446 ret[ret_num - 1].size = io->size;
2449 err = 0;
2450 if (!io->num)
2451 break;
2452 } else {
2453 io->start -= rep->num;
2457 memcpy(id.id, next.id, DNET_ID_SIZE);
2460 err_out_exit:
2461 if (ret) {
2462 *errp = ret_num;
2463 } else {
2464 *errp = err;
2466 return ret;
/*
 * One lookup reply collected by dnet_read_latest_prepare(): the id
 * (including the group) the reply came for and the file info of the object.
 */
struct dnet_read_latest_id {
	struct dnet_id			id;
	struct dnet_file_info		fi;
};
/*
 * Context shared by all lookups started from dnet_read_latest_prepare().
 */
struct dnet_read_latest_ctl {
	/* Wait object the requester sleeps on. */
	struct dnet_wait		*w;
	/* @num - number of requested groups, @pos - number of stored replies. */
	int				num, pos;
	/* Protects @pos. */
	pthread_mutex_t			lock;

	/* Storage for replies, allocated together with the structure. */
	struct dnet_read_latest_id	ids[0];
};
/*
 * Signal one finished user of @ctl and drop one reference.
 *
 * The lifetime of @ctl is tied to the reference counter of its wait
 * object: whoever drops the last reference destroys the wait object, the
 * mutex and the control structure itself.
 */
static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl *ctl)
{
	dnet_wakeup(ctl->w, ctl->w->cond++);
	if (atomic_dec_and_test(&ctl->w->refcnt)) {
		dnet_wait_destroy(ctl->w);
		pthread_mutex_destroy(&ctl->lock);
		free(ctl);
	}
}
2492 static int dnet_read_latest_complete(struct dnet_net_state *st, struct dnet_cmd *cmd, void *priv)
2494 struct dnet_read_latest_ctl *ctl = priv;
2495 struct dnet_node *n;
2496 struct dnet_addr_attr *a;
2497 struct dnet_file_info *fi;
2498 int pos, err;
2500 if (is_trans_destroyed(st, cmd)) {
2501 dnet_read_latest_ctl_put(ctl);
2502 return 0;
2505 n = st->n;
2507 err = cmd->status;
2508 if (err || !cmd->size)
2509 goto err_out_exit;
2511 if (cmd->size < sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info)) {
2512 dnet_log(n, DNET_LOG_ERROR, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2513 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
2514 sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info));
2515 err = -EINVAL;
2516 goto err_out_exit;
2518 a = (struct dnet_addr_attr *)(cmd + 1);
2519 fi = (struct dnet_file_info *)(a + 1);
2521 dnet_convert_addr_attr(a);
2522 dnet_convert_file_info(fi);
2524 pthread_mutex_lock(&ctl->lock);
2525 pos = ctl->pos++;
2526 pthread_mutex_unlock(&ctl->lock);
2528 /* we do not care about filename */
2529 memcpy(&ctl->ids[pos].fi, fi, sizeof(struct dnet_file_info));
2530 memcpy(&ctl->ids[pos].id, &cmd->id, sizeof(struct dnet_id));
2532 err_out_exit:
2533 return err;
2536 static int dnet_file_read_latest_cmp(const void *p1, const void *p2)
2538 const struct dnet_read_latest_id *id1 = p1;
2539 const struct dnet_read_latest_id *id2 = p2;
2541 int ret = (int)(id2->fi.mtime.tsec - id1->fi.mtime.tsec);
2543 if (!ret)
2544 ret = (int)(id2->fi.mtime.tnsec - id1->fi.mtime.tnsec);
2546 return ret;
/*
 * Reorder pr->group[] so that groups holding the most recently modified
 * copy of object pr->id come first.
 *
 * A lookup with DNET_ATTR_META_TIMES is sent to every group in pr->group[]
 * and the replies are sorted by modification time. On success
 * pr->group_num is reduced to the number of groups which replied. If the
 * group originally set in pr->id.group_id is as fresh as the newest copy,
 * it is moved to the front. pr->id.group_id is modified while the lookups
 * are being sent.
 *
 * Returns 0 on success (also when no group replied, in which case @pr's
 * group list is left untouched) or an error code.
 */
int dnet_read_latest_prepare(struct dnet_read_latest_prepare *pr)
{
	struct dnet_read_latest_ctl *ctl;
	int group_id = pr->id.group_id;
	int err, i;

	/* Control structure with one reply slot per group. */
	ctl = malloc(sizeof(struct dnet_read_latest_ctl) + sizeof(struct dnet_read_latest_id) * pr->group_num);
	if (!ctl) {
		err = -ENOMEM;
		goto err_out_exit;
	}
	memset(ctl, 0, sizeof(struct dnet_read_latest_ctl));

	ctl->w = dnet_wait_alloc(0);
	if (!ctl->w) {
		err = -ENOMEM;
		goto err_out_free;
	}

	err = pthread_mutex_init(&ctl->lock, NULL);
	if (err)
		goto err_out_put_wait;

	ctl->num = pr->group_num;
	ctl->pos = 0;

	for (i = 0; i < pr->group_num; ++i) {
		pr->id.group_id = pr->group[i];

		/* One reference per lookup, dropped in dnet_read_latest_ctl_put(). */
		dnet_wait_get(ctl->w);
		dnet_lookup_object(pr->n, &pr->id, DNET_ATTR_META_TIMES | pr->cflags, dnet_read_latest_complete, ctl);
	}

	err = dnet_wait_event(ctl->w, ctl->w->cond == pr->group_num, &pr->n->wait_ts);
	if (err)
		goto err_out_put;

	/* Nobody replied: leave the group list as is (err is 0 here). */
	if (ctl->pos == 0)
		goto err_out_put;

	pr->group_num = ctl->pos;

	qsort(ctl->ids, pr->group_num, sizeof(struct dnet_read_latest_id), dnet_file_read_latest_cmp);

	for (i = 0; i < pr->group_num; ++i) {
		pr->group[i] = ctl->ids[i].id.group_id;

		/*
		 * Prefer the originally requested group when its copy is as
		 * recent as the newest one.
		 */
		if (group_id == pr->group[i]) {
			const struct dnet_read_latest_id *id0 = &ctl->ids[0];
			const struct dnet_read_latest_id *id1 = &ctl->ids[i];

			if (!dnet_file_read_latest_cmp(id0, id1)) {
				int tmp_group = pr->group[0];
				pr->group[0] = pr->group[i];
				pr->group[i] = tmp_group;
			}
		}
	}

err_out_put:
	/* Drops our reference; the last holder frees ctl. */
	dnet_read_latest_ctl_put(ctl);
	goto err_out_exit;

err_out_put_wait:
	dnet_wait_put(ctl->w);
err_out_free:
	free(ctl);
err_out_exit:
	return err;
}
2620 int dnet_read_latest(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *io, uint64_t cflags, void **datap)
2622 struct dnet_read_latest_prepare pr;
2623 int *g, num, err, i;
2625 if ((int)io->num > n->group_num) {
2626 err = -E2BIG;
2627 goto err_out_exit;
2630 err = dnet_mix_states(n, id, &g);
2631 if (err < 0)
2632 goto err_out_exit;
2634 num = err;
2636 if ((int)io->num > num) {
2637 err = -E2BIG;
2638 goto err_out_free;
2641 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
2643 pr.n = n;
2644 pr.id = *id;
2645 pr.group = g;
2646 pr.group_num = num;
2647 pr.cflags = cflags;
2649 err = dnet_read_latest_prepare(&pr);
2650 if (err)
2651 goto err_out_free;
2653 err = -ENODATA;
2654 for (i = 0; i < pr.group_num; ++i) {
2655 void *data;
2657 id->group_id = pr.group[i];
2658 data = dnet_read_data_wait_raw(n, id, io, DNET_CMD_READ, cflags, &err);
2659 if (data) {
2660 if ((pr.group_num != num) || ((i != 0) && (io->type == 0) && (io->offset == 0))) {
2661 dnet_read_recover(n, id, io, data, cflags);
2664 *datap = data;
2665 err = 0;
2666 break;
2670 err_out_free:
2671 free(g);
2672 err_out_exit:
2673 return err;
2676 int dnet_get_routes(struct dnet_node *n, struct dnet_id **ids, struct dnet_addr **addrs) {
2678 struct dnet_net_state *st;
2679 struct dnet_group *g;
2680 struct dnet_addr *tmp_addrs;
2681 struct dnet_id *tmp_ids;
2682 int size = 0, count = 0;
2683 int i;
2685 *ids = NULL;
2686 *addrs = NULL;
2688 pthread_mutex_lock(&n->state_lock);
2689 list_for_each_entry(g, &n->group_list, group_entry) {
2690 list_for_each_entry(st, &g->state_list, state_entry) {
2692 size += st->idc->id_num;
2694 tmp_ids = (struct dnet_id *)realloc(*ids, size * sizeof(struct dnet_id));
2695 if (!tmp_ids) {
2696 count = -ENOMEM;
2697 goto err_out_free;
2699 *ids = tmp_ids;
2701 tmp_addrs = (struct dnet_addr *)realloc(*addrs, size * sizeof(struct dnet_addr));
2702 if (!tmp_addrs) {
2703 count = -ENOMEM;
2704 goto err_out_free;
2706 *addrs = tmp_addrs;
2708 for (i = 0; i < st->idc->id_num; ++i) {
2709 dnet_setup_id(&(*ids)[count], g->group_id, st->idc->ids[i].raw.id);
2710 memcpy(&(*addrs)[count], dnet_state_addr(st), sizeof(struct dnet_addr));
2711 count++;
2712 //fprintf(stderr, "%d: %s -> %s\n", g->group_id, dnet_dump_id_str(st->idc->ids[i].raw.id), dnet_state_dump_addr(st));
2716 pthread_mutex_unlock(&n->state_lock);
2718 return count;
2720 err_out_free:
2721 if (ids)
2722 free(*ids);
2723 if (addrs)
2724 free(*addrs);
2726 return count;
2730 void *dnet_bulk_read_wait_raw(struct dnet_node *n, struct dnet_id *id, struct dnet_io_attr *ios,
2731 uint32_t io_num, int cmd, uint64_t cflags, int *errp)
2733 struct dnet_io_control ctl;
2734 struct dnet_io_attr io;
2735 struct dnet_wait *w;
2736 struct dnet_read_data_completion *c;
2737 void *data = NULL;
2738 int err;
2740 w = dnet_wait_alloc(0);
2741 if (!w) {
2742 err = -ENOMEM;
2743 goto err_out_exit;
2746 c = malloc(sizeof(*c));
2747 if (!c) {
2748 err = -ENOMEM;
2749 goto err_out_put;
2752 c->w = w;
2753 c->size = 0;
2754 c->data = NULL;
2755 /* one for completion callback, another for this function */
2756 atomic_init(&c->refcnt, 2);
2758 memset(&ctl, 0, sizeof(struct dnet_io_control));
2760 ctl.fd = -1;
2762 ctl.priv = c;
2763 ctl.complete = dnet_read_data_complete;
2765 ctl.cmd = cmd;
2766 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
2768 memcpy(&ctl.id, id, sizeof(struct dnet_id));
2769 memset(&ctl.io, 0, sizeof(struct dnet_io_attr));
2771 memcpy(io.id, id->id, DNET_ID_SIZE);
2772 memcpy(io.parent, id->id, DNET_ID_SIZE);
2774 ctl.io.size = io_num * sizeof(struct dnet_io_attr);
2775 ctl.data = ios;
2777 dnet_wait_get(w);
2778 err = dnet_read_object(n, &ctl);
2779 if (err)
2780 goto err_out_put_complete;
2782 err = dnet_wait_event(w, w->cond, &n->wait_ts);
2783 if (err || w->status) {
2784 char id_str[2*DNET_ID_SIZE + 1];
2785 if (!err)
2786 err = w->status;
2787 if ((cmd != DNET_CMD_READ_RANGE) || (err != -ENOENT))
2788 dnet_log(n, DNET_LOG_ERROR, "%d:%s : failed to read data: %d\n",
2789 ctl.id.group_id, dnet_dump_id_len_raw(ctl.id.id, DNET_ID_SIZE, id_str), err);
2790 goto err_out_put_complete;
2792 err = c->size;
2793 data = c->data;
2795 err_out_put_complete:
2796 if (atomic_dec_and_test(&c->refcnt))
2797 free(c);
2798 err_out_put:
2799 dnet_wait_put(w);
2800 err_out_exit:
2801 *errp = err;
2802 return data;
2806 static int dnet_io_attr_cmp(const void *d1, const void *d2)
2808 const struct dnet_io_attr *io1 = d1;
2809 const struct dnet_io_attr *io2 = d2;
2811 return memcmp(io1->id, io2->id, DNET_ID_SIZE);
2814 struct dnet_range_data *dnet_bulk_read(struct dnet_node *n, struct dnet_io_attr *ios, uint32_t io_num, int group_id, uint64_t cflags, int *errp)
2816 struct dnet_id id, next_id;
2817 int ret_num;
2818 struct dnet_range_data *ret;
2819 struct dnet_net_state *cur, *next = NULL;
2820 uint64_t size = 0;
2821 void *data;
2822 int err;
2823 uint32_t i, start = -1;
2825 if (io_num <= 0) {
2826 return 0;
2829 qsort(ios, io_num, sizeof(struct dnet_io_attr), dnet_io_attr_cmp);
2831 ret = NULL;
2832 ret_num = 0;
2833 size = 0;
2835 dnet_setup_id(&id, group_id, ios[0].id);
2836 id.type = ios[0].type;
2838 cur = dnet_state_get_first(n, &id);
2839 if (!cur) {
2840 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&id));
2841 err = -ENOENT;
2842 goto err_out_exit;
2845 for (i = 0; i < io_num; ++i) {
2846 if ((i + 1) < io_num) {
2847 dnet_setup_id(&next_id, group_id, ios[i+1].id);
2848 next_id.type = ios[i+1].type;
2850 next = dnet_state_get_first(n, &next_id);
2851 if (!next) {
2852 dnet_log(n, DNET_LOG_ERROR, "%s: Can't get state for id\n", dnet_dump_id(&next_id));
2853 err = -ENOENT;
2854 goto err_out_put;
2857 /* Send command only if state changes or it's a last id */
2858 if ((cur == next)) {
2859 dnet_state_put(next);
2860 next = NULL;
2861 continue;
2865 dnet_log(n, DNET_LOG_NOTICE, "start: %s: end: %s, count: %llu, addr: %s\n",
2866 dnet_dump_id(&id),
2867 dnet_dump_id(&next_id),
2868 (unsigned long long)(i - start),
2869 dnet_state_dump_addr(cur));
2871 data = dnet_bulk_read_wait_raw(n, &id, ios, i - start, DNET_CMD_BULK_READ, cflags, &err);
2872 if (data) {
2873 size = err;
2874 err = 0;
2876 if (!size) {
2877 free(data);
2878 } else {
2879 struct dnet_range_data *new_ret;
2881 ret_num++;
2883 new_ret = realloc(ret, ret_num * sizeof(struct dnet_range_data));
2884 if (!new_ret) {
2885 goto err_out_put;
2888 ret = new_ret;
2890 ret[ret_num - 1].data = data;
2891 ret[ret_num - 1].size = size;
2894 err = 0;
2897 dnet_state_put(cur);
2898 cur = next;
2899 next = NULL;
2900 memcpy(&id, &next_id, sizeof(struct dnet_id));
2903 err_out_put:
2904 if (next)
2905 dnet_state_put(next);
2906 dnet_state_put(cur);
2907 err_out_exit:
2908 if (ret) {
2909 *errp = ret_num;
2910 } else {
2911 *errp = err;
2913 return ret;
2916 struct dnet_range_data dnet_bulk_write(struct dnet_node *n, struct dnet_io_control *ctl, int ctl_num, int *errp)
2918 int err, i, trans_num = 0, local_trans_num;
2919 struct dnet_wait *w;
2920 struct dnet_write_completion *wc;
2921 struct dnet_range_data ret;
2922 struct dnet_metadata_control mcl;
2923 struct dnet_meta_container mc;
2924 struct dnet_io_control meta_ctl;
2925 struct timeval tv;
2926 int *groups = NULL;
2927 int group_num = 0;
2929 memset(&ret, 0, sizeof(ret));
2931 wc = malloc(sizeof(struct dnet_write_completion));
2932 if (!wc) {
2933 err = -ENOMEM;
2934 goto err_out_exit;
2936 memset(wc, 0, sizeof(struct dnet_write_completion));
2938 w = dnet_wait_alloc(0);
2939 if (!w) {
2940 err = -ENOMEM;
2941 free(wc);
2942 goto err_out_exit;
2944 wc->wait = w;
2946 atomic_set(&w->refcnt, INT_MAX);
2947 w->status = -ENOENT;
2949 for (i = 0; i < ctl_num; ++i) {
2950 ctl[i].priv = wc;
2951 ctl[i].complete = dnet_write_complete;
2953 ctl[i].cmd = DNET_CMD_WRITE;
2954 ctl[i].cflags = DNET_FLAGS_NEED_ACK;
2956 memcpy(ctl[i].io.id, ctl[i].id.id, DNET_ID_SIZE);
2957 memcpy(ctl[i].io.parent, ctl[i].id.id, DNET_ID_SIZE);
2959 local_trans_num = dnet_write_object(n, &ctl[i]);
2960 if (local_trans_num < 0)
2961 local_trans_num = 0;
2963 trans_num += local_trans_num;
2965 /* Prepare and send metadata */
2966 memset(&mcl, 0, sizeof(mcl));
2968 pthread_mutex_lock(&n->group_lock);
2969 group_num = n->group_num;
2970 groups = alloca(group_num * sizeof(int));
2972 memcpy(groups, n->groups, group_num * sizeof(int));
2973 pthread_mutex_unlock(&n->group_lock);
2975 mcl.groups = groups;
2976 mcl.group_num = group_num;
2977 mcl.id = ctl[i].id;
2978 mcl.cflags = ctl[i].cflags;
2980 gettimeofday(&tv, NULL);
2981 mcl.ts.tv_sec = tv.tv_sec;
2982 mcl.ts.tv_nsec = tv.tv_usec * 1000;
2984 memset(&mc, 0, sizeof(mc));
2986 err = dnet_create_metadata(n, &mcl, &mc);
2987 dnet_log(n, DNET_LOG_DEBUG, "Creating metadata: err: %d", err);
2988 if (!err) {
2989 dnet_convert_metadata(n, mc.data, mc.size);
2991 memset(&meta_ctl, 0, sizeof(struct dnet_io_control));
2993 meta_ctl.priv = wc;
2994 meta_ctl.complete = dnet_write_complete;
2995 meta_ctl.cmd = DNET_CMD_WRITE;
2996 meta_ctl.fd = -1;
2998 meta_ctl.cflags = ctl[i].cflags;
3000 memcpy(&meta_ctl.id, &ctl[i].id, sizeof(struct dnet_id));
3001 memcpy(meta_ctl.io.id, ctl[i].id.id, DNET_ID_SIZE);
3002 memcpy(meta_ctl.io.parent, ctl[i].id.id, DNET_ID_SIZE);
3003 meta_ctl.id.type = meta_ctl.io.type = EBLOB_TYPE_META;
3005 meta_ctl.io.flags |= DNET_IO_FLAGS_META;
3006 meta_ctl.io.offset = 0;
3007 meta_ctl.io.size = mc.size;
3008 meta_ctl.data = mc.data;
3010 local_trans_num = dnet_write_object(n, &meta_ctl);
3011 if (local_trans_num < 0)
3012 local_trans_num = 0;
3014 trans_num += local_trans_num;
3019 * 1 - the first reference counter we grabbed at allocation time
3021 atomic_sub(&w->refcnt, INT_MAX - trans_num - 1);
3023 err = dnet_wait_event(w, w->cond == trans_num, &n->wait_ts);
3024 if (err || w->status) {
3025 if (!err)
3026 err = w->status;
3027 dnet_log(n, DNET_LOG_NOTICE, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3028 dnet_dump_id(&ctl->id), err, w->status);
3031 if (err || !trans_num) {
3032 if (!err)
3033 err = -EINVAL;
3034 dnet_log(n, DNET_LOG_ERROR, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err, trans_num);
3035 goto err_out_put;
3038 if (trans_num)
3039 dnet_log(n, DNET_LOG_NOTICE, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3040 dnet_dump_id(&ctl->id), (unsigned long long)ctl->io.size, wc->size);
3041 err = trans_num;
3043 ret.data = wc->reply;
3044 ret.size = wc->size;
3046 wc->reply = NULL;
3048 err_out_put:
3049 dnet_write_complete_free(wc);
3050 err_out_exit:
3051 *errp = err;
3052 return ret;
3055 int dnet_flags(struct dnet_node *n)
3057 return n->flags;
3060 static int dnet_start_defrag_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
3062 struct dnet_wait *w = priv;
3064 if (is_trans_destroyed(state, cmd)) {
3065 dnet_wakeup(w, w->cond++);
3066 dnet_wait_put(w);
3067 return 0;
3070 return 0;
3073 static int dnet_start_defrag_single(struct dnet_net_state *st, void *priv, uint64_t cflags)
3075 struct dnet_trans_control ctl;
3077 memset(&ctl, 0, sizeof(struct dnet_trans_control));
3079 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
3080 ctl.cmd = DNET_CMD_DEFRAG;
3081 ctl.complete = dnet_start_defrag_complete;
3082 ctl.priv = priv;
3083 ctl.cflags = DNET_FLAGS_NEED_ACK | cflags;
3085 return dnet_trans_alloc_send_state(st, &ctl);
3088 int dnet_start_defrag(struct dnet_node *n, uint64_t cflags)
3090 struct dnet_net_state *st;
3091 struct dnet_wait *w;
3092 struct dnet_group *g;
3093 int num = 0;
3094 int err;
3096 w = dnet_wait_alloc(0);
3097 if (!w) {
3098 err = -ENOMEM;
3099 goto err_out_exit;
3102 pthread_mutex_lock(&n->state_lock);
3103 list_for_each_entry(g, &n->group_list, group_entry) {
3104 list_for_each_entry(st, &g->state_list, state_entry) {
3105 if (st == n->st)
3106 continue;
3108 if (w)
3109 dnet_wait_get(w);
3111 dnet_start_defrag_single(st, w, cflags);
3112 num++;
3115 pthread_mutex_unlock(&n->state_lock);
3117 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
3118 dnet_wait_put(w);
3120 err_out_exit:
3121 return err;