Tune nonblocking pool growing policy
[elliptics.git] / library / dnet.c
blobedcec2569b3ee19339b602304d7690b7f033c79f
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
43 int dnet_stat_local(struct dnet_net_state *st, struct dnet_id *id)
45 struct dnet_node *n = st->n;
46 int size, cmd_size;
47 struct dnet_cmd *cmd;
48 struct dnet_io_attr *io;
49 int err;
51 size = 1;
52 cmd_size = size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
54 cmd = malloc(cmd_size);
55 if (!cmd) {
56 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate %d bytes for local stat.\n",
57 dnet_dump_id(id), cmd_size);
58 err = -ENOMEM;
59 goto err_out_exit;
62 memset(cmd, 0, cmd_size);
64 io = (struct dnet_io_attr *)(cmd + 1);
66 memcpy(&cmd->id, id, sizeof(struct dnet_id));
67 cmd->size = cmd_size - sizeof(struct dnet_cmd);
68 cmd->flags = DNET_FLAGS_NOLOCK;
69 cmd->cmd = DNET_CMD_READ;
71 io->size = cmd->size - sizeof(struct dnet_io_attr);
72 io->offset = 0;
73 io->flags = DNET_IO_FLAGS_SKIP_SENDING;
75 memcpy(io->parent, id->id, DNET_ID_SIZE);
76 memcpy(io->id, id->id, DNET_ID_SIZE);
78 dnet_convert_io_attr(io);
80 err = n->cb->command_handler(st, n->cb->command_private, cmd, io);
81 dnet_log(n, DNET_LOG_INFO, "%s: local stat: io_size: %llu, err: %d.\n", dnet_dump_id(&cmd->id), (unsigned long long)io->size, err);
83 free(cmd);
85 err_out_exit:
86 return err;
89 int dnet_remove_local(struct dnet_node *n, struct dnet_id *id)
91 int cmd_size;
92 struct dnet_cmd *cmd;
93 struct dnet_io_attr *io;
94 int err;
96 cmd_size = sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
98 cmd = malloc(cmd_size);
99 if (!cmd) {
100 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate %d bytes for local remove.\n",
101 dnet_dump_id(id), cmd_size);
102 err = -ENOMEM;
103 goto err_out_exit;
106 memset(cmd, 0, cmd_size);
108 io = (struct dnet_io_attr *)(cmd + 1);
110 cmd->id = *id;
111 cmd->size = cmd_size - sizeof(struct dnet_cmd);
112 cmd->flags = DNET_FLAGS_NOLOCK;
113 cmd->cmd = DNET_CMD_DEL;
115 io->flags = DNET_IO_FLAGS_SKIP_SENDING;
117 memcpy(io->parent, id->id, DNET_ID_SIZE);
118 memcpy(io->id, id->id, DNET_ID_SIZE);
120 dnet_convert_io_attr(io);
122 err = n->cb->command_handler(n->st, n->cb->command_private, cmd, io);
123 dnet_log(n, DNET_LOG_NOTICE, "%s: local remove: err: %d.\n", dnet_dump_id(&cmd->id), err);
125 free(cmd);
127 err_out_exit:
128 return err;
132 static void dnet_send_idc_fill(struct dnet_net_state *st, void *buf, int size,
133 struct dnet_id *id, uint64_t trans, unsigned int command, int reply, int direct, int more)
135 struct dnet_node *n = st->n;
136 struct dnet_cmd *cmd;
137 struct dnet_raw_id *sid;
138 struct dnet_addr_attr *addr;
139 int i;
141 memset(buf, 0, sizeof(*cmd) + sizeof(*addr));
143 cmd = buf;
144 addr = (struct dnet_addr_attr *)(cmd + 1);
145 sid = (struct dnet_raw_id *)(addr + 1);
147 memcpy(&cmd->id, id, sizeof(struct dnet_id));
148 cmd->size = size - sizeof(struct dnet_cmd);
149 cmd->trans = trans;
151 cmd->flags = DNET_FLAGS_NOLOCK;
152 if (more)
153 cmd->flags |= DNET_FLAGS_MORE;
154 if (direct)
155 cmd->flags |= DNET_FLAGS_DIRECT;
156 if (reply)
157 cmd->trans |= DNET_TRANS_REPLY;
159 cmd->cmd = command;
161 addr->sock_type = n->sock_type;
162 addr->family = n->family;
163 addr->proto = n->proto;
164 memcpy(&addr->addr, &st->addr, sizeof(struct dnet_addr));
166 for (i=0; i<st->idc->id_num; ++i) {
167 memcpy(&sid[i], &st->idc->ids[i].raw, sizeof(struct dnet_raw_id));
168 dnet_convert_raw_id(&sid[i]);
171 dnet_convert_addr_cmd(buf);
174 static int dnet_send_idc(struct dnet_net_state *orig, struct dnet_net_state *send, struct dnet_id *id, uint64_t trans,
175 unsigned int command, int reply, int direct, int more)
177 struct dnet_node *n = orig->n;
178 int size = sizeof(struct dnet_addr_cmd) + orig->idc->id_num * sizeof(struct dnet_raw_id);
179 void *buf;
180 int err;
181 struct timeval start, end;
182 long diff;
184 gettimeofday(&start, NULL);
186 buf = malloc(size);
187 if (!buf) {
188 err = -ENOMEM;
189 goto err_out_exit;
191 memset(buf, 0, sizeof(struct dnet_addr_cmd));
193 dnet_send_idc_fill(orig, buf, size, id, trans, command, reply, direct, more);
195 gettimeofday(&end, NULL);
196 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
197 dnet_log(n, DNET_LOG_INFO, "%s: sending address %s: %ld\n", dnet_dump_id(id), dnet_state_dump_addr(orig), diff);
199 err = dnet_send(send, buf, size);
201 free(buf);
203 err_out_exit:
204 return err;
207 static int dnet_cmd_reverse_lookup(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data __unused)
209 struct dnet_node *n = st->n;
210 struct dnet_net_state *base;
211 int err = -ENOENT;
213 cmd->id.group_id = n->id.group_id;
214 base = dnet_node_state(n);
215 if (base) {
216 err = dnet_send_idc(base, st, &cmd->id, cmd->trans, DNET_CMD_REVERSE_LOOKUP, 1, 0, 0);
217 dnet_state_put(base);
220 return err;
223 static int dnet_check_connection(struct dnet_node *n, struct dnet_addr_attr *a)
225 int s;
227 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
228 (struct sockaddr *)a->addr.addr, a->addr.addr_len, 0);
229 if (s < 0)
230 return s;
232 dnet_sock_close(s);
233 return 0;
236 static int dnet_cmd_join_client(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
238 struct dnet_node *n = st->n;
239 struct dnet_addr_attr *a = data;
240 struct dnet_raw_id *ids;
241 int num, i, err;
243 dnet_convert_addr_attr(a);
245 dnet_log(n, DNET_LOG_DEBUG, "%s: accepted joining client (%s), requesting statistics.\n",
246 dnet_dump_id(&cmd->id), dnet_server_convert_dnet_addr(&a->addr));
247 err = dnet_check_connection(n, a);
248 if (err) {
249 dnet_log(n, DNET_LOG_ERROR, "%s: failed to request statistics from joining client (%s), dropping connection.\n",
250 dnet_dump_id(&cmd->id), dnet_server_convert_dnet_addr(&a->addr));
251 return err;
254 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
255 ids = (struct dnet_raw_id *)(a + 1);
256 for (i=0; i<num; ++i)
257 dnet_convert_raw_id(&ids[0]);
259 pthread_mutex_lock(&n->state_lock);
260 list_del_init(&st->state_entry);
261 list_del_init(&st->storage_state_entry);
262 pthread_mutex_unlock(&n->state_lock);
264 memcpy(&st->addr, &a->addr, sizeof(struct dnet_addr));
265 err = dnet_idc_create(st, cmd->id.group_id, ids, num);
267 dnet_log(n, DNET_LOG_INFO, "%s: accepted join request from state %s: %d.\n", dnet_dump_id(&cmd->id),
268 dnet_server_convert_dnet_addr(&a->addr), err);
270 return err;
273 static int dnet_cmd_route_list(struct dnet_net_state *orig, struct dnet_cmd *cmd)
275 struct dnet_node *n = orig->n;
276 struct dnet_net_state *st;
277 struct dnet_group *g;
278 void *buf, *orig_buf;
279 size_t size = 0, send_size = 0, sz;
280 int err;
282 pthread_mutex_lock(&n->state_lock);
283 list_for_each_entry(g, &n->group_list, group_entry) {
284 list_for_each_entry(st, &g->state_list, state_entry) {
285 if (!memcmp(&st->addr, &orig->addr, sizeof(struct dnet_addr)))
286 continue;
288 size += st->idc->id_num * sizeof(struct dnet_raw_id) + sizeof(struct dnet_addr_cmd);
291 pthread_mutex_unlock(&n->state_lock);
293 orig_buf = buf = malloc(size);
294 if (!buf) {
295 err = -ENOMEM;
296 goto err_out_exit;
299 pthread_mutex_lock(&n->state_lock);
300 list_for_each_entry(g, &n->group_list, group_entry) {
301 list_for_each_entry(st, &g->state_list, state_entry) {
302 if (!memcmp(&st->addr, &orig->addr, sizeof(struct dnet_addr)))
303 continue;
305 sz = st->idc->id_num * sizeof(struct dnet_raw_id) + sizeof(struct dnet_addr_cmd);
306 if (sz <= size) {
307 cmd->id.group_id = g->group_id;
308 dnet_send_idc_fill(st, buf, sz, &cmd->id, cmd->trans, DNET_CMD_ROUTE_LIST, 1, 0, 1);
310 size -= sz;
311 buf += sz;
313 send_size += sz;
317 pthread_mutex_unlock(&n->state_lock);
319 err = dnet_send(orig, orig_buf, send_size);
320 if (err)
321 goto err_out_free;
323 err_out_free:
324 free(orig_buf);
325 err_out_exit:
326 return err;
329 static int dnet_cmd_exec(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
331 struct dnet_node *n = st->n;
332 struct sph *e = data;
333 int err = -ENOTSUP;
335 data += sizeof(struct sph);
337 dnet_convert_sph(e);
339 if (e->event_size + e->data_size + e->binary_size + sizeof(struct sph) != cmd->size) {
340 err = -E2BIG;
341 dnet_log(n, DNET_LOG_ERROR, "%s: invalid size: event-size %d, data-size %llu, binary-size %llu must be: %llu\n",
342 dnet_dump_id(&cmd->id),
343 e->event_size,
344 (unsigned long long)e->data_size,
345 (unsigned long long)e->binary_size,
346 (unsigned long long)cmd->size);
347 goto err_out_exit;
350 err = dnet_cmd_exec_raw(st, cmd, e, data);
352 err_out_exit:
353 return err;
356 static int dnet_cmd_stat_count_single(struct dnet_net_state *orig, struct dnet_cmd *cmd, struct dnet_net_state *st, struct dnet_addr_stat *as)
358 int i;
360 cmd->cmd = DNET_CMD_STAT_COUNT;
362 memcpy(&as->addr, &st->addr, sizeof(struct dnet_addr));
363 as->num = __DNET_CMD_MAX;
364 as->cmd_num = __DNET_CMD_MAX;
366 for (i=0; i<as->num; ++i) {
367 as->count[i] = st->stat[i];
370 dnet_convert_addr_stat(as, as->num);
372 return dnet_send_reply(orig, cmd, as, sizeof(struct dnet_addr_stat) + __DNET_CMD_MAX * sizeof(struct dnet_stat_count), 1);
375 static int dnet_cmd_stat_count_global(struct dnet_net_state *orig, struct dnet_cmd *cmd,
376 struct dnet_node *n, struct dnet_addr_stat *as)
378 struct dnet_stat st;
379 int err = 0;
381 cmd->cmd = DNET_CMD_STAT_COUNT;
383 memcpy(&as->addr, &n->addr, sizeof(struct dnet_addr));
384 as->num = __DNET_CNTR_MAX;
385 as->cmd_num = __DNET_CMD_MAX;
387 memcpy(as->count, n->counters, sizeof(struct dnet_stat_count) * __DNET_CNTR_MAX);
389 if (n->cb->storage_stat) {
390 err = n->cb->storage_stat(n->cb->command_private, &st);
391 if (err)
392 return err;
394 as->count[DNET_CNTR_LA1].count = st.la[0];
395 as->count[DNET_CNTR_LA5].count = st.la[1];
396 as->count[DNET_CNTR_LA15].count = st.la[2];
397 as->count[DNET_CNTR_BSIZE].count = st.bsize;
398 as->count[DNET_CNTR_FRSIZE].count = st.frsize;
399 as->count[DNET_CNTR_BLOCKS].count = st.blocks;
400 as->count[DNET_CNTR_BFREE].count = st.bfree;
401 as->count[DNET_CNTR_BAVAIL].count = st.bavail;
402 as->count[DNET_CNTR_FILES].count = st.files;
403 as->count[DNET_CNTR_FFREE].count = st.ffree;
404 as->count[DNET_CNTR_FAVAIL].count = st.favail;
405 as->count[DNET_CNTR_FSID].count = st.fsid;
406 as->count[DNET_CNTR_VM_ACTIVE].count = st.vm_active;
407 as->count[DNET_CNTR_VM_INACTIVE].count = st.vm_inactive;
408 as->count[DNET_CNTR_VM_TOTAL].count = st.vm_total;
409 as->count[DNET_CNTR_VM_FREE].count = st.vm_free;
410 as->count[DNET_CNTR_VM_CACHED].count = st.vm_cached;
411 as->count[DNET_CNTR_VM_BUFFERS].count = st.vm_buffers;
413 as->count[DNET_CNTR_NODE_FILES].count = n->cb->meta_total_elements(n->cb->command_private);
415 dnet_convert_addr_stat(as, as->num);
417 return dnet_send_reply(orig, cmd, as, sizeof(struct dnet_addr_stat) + __DNET_CNTR_MAX * sizeof(struct dnet_stat_count), 1);
420 static int dnet_cmd_stat_count(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data __unused)
422 struct dnet_node *n = orig->n;
423 struct dnet_net_state *st;
424 struct dnet_addr_stat *as;
425 int err = 0;
427 as = alloca(sizeof(struct dnet_addr_stat) + __DNET_CNTR_MAX * sizeof(struct dnet_stat_count));
428 if (!as) {
429 err = -ENOMEM;
430 goto err_out_exit;
433 if (cmd->flags & DNET_ATTR_CNTR_GLOBAL) {
434 err = dnet_cmd_stat_count_global(orig, cmd, orig->n, as);
435 } else {
436 pthread_mutex_lock(&n->state_lock);
437 #if 0
438 list_for_each_entry(st, &n->state_list, state_entry) {
439 err = dnet_cmd_stat_count_single(orig, cmd, st, as);
440 if (err)
441 goto err_out_unlock;
443 #endif
444 list_for_each_entry(st, &n->empty_state_list, state_entry) {
445 err = dnet_cmd_stat_count_single(orig, cmd, st, as);
446 if (err)
447 goto err_out_unlock;
449 err_out_unlock:
450 pthread_mutex_unlock(&n->state_lock);
453 err_out_exit:
454 return err;
457 static int dnet_cmd_status(struct dnet_net_state *orig, struct dnet_cmd *cmd __unused, void *data)
459 struct dnet_node *n = orig->n;
460 struct dnet_node_status *st = data;
462 dnet_convert_node_status(st);
464 dnet_log(n, DNET_LOG_INFO, "%s: status-change: nflags: %x->%x, log_level: %d->%d, "
465 "status_flags: EXIT: %d, RO: %d\n",
466 dnet_dump_id(&cmd->id), n->flags, st->nflags, n->log->log_level, st->log_level,
467 !!(st->status_flags & DNET_STATUS_EXIT), !!(st->status_flags & DNET_STATUS_RO));
469 if (st->status_flags != -1) {
470 if (st->status_flags & DNET_STATUS_EXIT) {
471 dnet_set_need_exit(n);
474 if (st->status_flags & DNET_STATUS_RO) {
475 n->ro = 1;
476 } else {
477 n->ro = 0;
481 if (st->nflags != -1)
482 n->flags = st->nflags;
484 if (st->log_level != ~0U)
485 n->log->log_level = st->log_level;
487 st->nflags = n->flags;
488 st->log_level = n->log->log_level;
489 st->status_flags = 0;
491 if (n->need_exit)
492 st->status_flags |= DNET_STATUS_EXIT;
494 if (n->ro)
495 st->status_flags |= DNET_STATUS_RO;
497 dnet_convert_node_status(st);
499 return dnet_send_reply(orig, cmd, st, sizeof(struct dnet_node_status), 1);
502 static int dnet_cmd_auth(struct dnet_net_state *orig, struct dnet_cmd *cmd __unused, void *data)
504 struct dnet_node *n = orig->n;
505 struct dnet_auth *a = data;
506 int err = 0;
508 if (cmd->size != sizeof(struct dnet_auth)) {
509 err = -EINVAL;
510 goto err_out_exit;
513 dnet_convert_auth(a);
514 if (memcmp(n->cookie, a->cookie, DNET_AUTH_COOKIE_SIZE)) {
515 err = -EPERM;
516 dnet_log(n, DNET_LOG_ERROR, "%s: auth cookies do not match\n", dnet_state_dump_addr(orig));
517 } else {
518 dnet_log(n, DNET_LOG_INFO, "%s: authentication succeeded\n", dnet_state_dump_addr(orig));
521 err_out_exit:
522 return err;
525 int dnet_process_cmd_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
527 int err = 0;
528 unsigned long long size = cmd->size;
529 struct dnet_node *n = st->n;
530 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
531 struct dnet_io_attr *io;
532 struct timeval start, end;
533 long diff;
535 if (!(cmd->flags & DNET_FLAGS_NOLOCK)) {
536 dnet_oplock(n, &cmd->id);
539 gettimeofday(&start, NULL);
541 switch (cmd->cmd) {
542 case DNET_CMD_AUTH:
543 err = dnet_cmd_auth(st, cmd, data);
544 break;
545 case DNET_CMD_STATUS:
546 err = dnet_cmd_status(st, cmd, data);
547 break;
548 case DNET_CMD_REVERSE_LOOKUP:
549 err = dnet_cmd_reverse_lookup(st, cmd, data);
550 break;
551 case DNET_CMD_JOIN:
552 err = dnet_cmd_join_client(st, cmd, data);
553 break;
554 case DNET_CMD_ROUTE_LIST:
555 err = dnet_cmd_route_list(st, cmd);
556 break;
557 case DNET_CMD_EXEC:
558 err = dnet_cmd_exec(st, cmd, data);
559 break;
560 case DNET_CMD_STAT_COUNT:
561 err = dnet_cmd_stat_count(st, cmd, data);
562 break;
563 case DNET_CMD_NOTIFY:
564 if (!(cmd->flags & DNET_ATTR_DROP_NOTIFICATION)) {
565 err = dnet_notify_add(st, cmd);
567 * We drop 'need ack' flag, since notification
568 * transaction is a long-living one, since
569 * every notification will be sent as transaction
570 * completion.
572 * Transaction acknowledge will be sent when
573 * notification is removed.
575 if (!err)
576 cmd->flags &= ~DNET_FLAGS_NEED_ACK;
577 } else
578 err = dnet_notify_remove(st, cmd);
579 break;
580 case DNET_CMD_LIST:
581 if (n->ro) {
582 err = -EROFS;
583 } else {
584 if (cmd->flags & DNET_ATTR_BULK_CHECK)
585 err = dnet_cmd_bulk_check(st, cmd, data);
586 else
587 err = dnet_db_list(st, cmd);
589 break;
590 case DNET_CMD_READ:
591 case DNET_CMD_WRITE:
592 case DNET_CMD_DEL:
593 if (n->ro && ((cmd->cmd == DNET_CMD_DEL) || (cmd->cmd == DNET_CMD_WRITE))) {
594 err = -EROFS;
595 break;
598 io = NULL;
599 if (size < sizeof(struct dnet_io_attr)) {
600 dnet_log(st->n, DNET_LOG_ERROR, "%s: invalid size: cmd: %u, rest_size: %llu\n",
601 dnet_dump_id(&cmd->id), cmd->cmd, size);
602 err = -EINVAL;
603 break;
605 io = data;
606 dnet_convert_io_attr(io);
608 dnet_log(n, DNET_LOG_INFO, "%s: %s io command, offset: %llu, size: %llu, ioflags: %x, cflags: %llx, "
609 "node-flags: %x, type: %d\n",
610 dnet_dump_id_str(io->id), dnet_cmd_string(cmd->cmd),
611 (unsigned long long)io->offset, (unsigned long long)io->size,
612 io->flags, (unsigned long long)cmd->flags,
613 n->flags, io->type);
615 if (n->flags & DNET_CFG_NO_CSUM)
616 io->flags |= DNET_IO_FLAGS_NOCSUM;
618 /* do not write metadata for cache-only writes */
619 if ((io->flags & DNET_IO_FLAGS_CACHE_ONLY) && (io->type == EBLOB_TYPE_META)) {
620 err = -EINVAL;
621 break;
625 * Only allow cache for column 0
626 * In the next life (2012 I really expect) there will be no columns at all
628 if (io->type == 0) {
630 * Always check cache when reading!
632 if ((io->flags & DNET_IO_FLAGS_CACHE) || (cmd->cmd != DNET_CMD_WRITE)) {
633 err = dnet_cmd_cache_io(st, cmd, io, data + sizeof(struct dnet_io_attr));
635 if (io->flags & DNET_IO_FLAGS_CACHE_ONLY)
636 break;
639 * We successfully read data from cache, do not sink to disk for it
641 if ((cmd->cmd == DNET_CMD_READ) && !err)
642 break;
646 if (io->flags & DNET_IO_FLAGS_COMPARE_AND_SWAP) {
647 char csum[DNET_ID_SIZE];
648 int csize = DNET_ID_SIZE;
650 err = n->cb->checksum(n, n->cb->command_private, &cmd->id, csum, &csize);
651 if (err < 0) {
652 dnet_log(n, DNET_LOG_ERROR, "%s: cas: checksum operation failed\n", dnet_dump_id(&cmd->id));
653 err = 0;
654 } else {
655 if (memcmp(csum, io->parent, DNET_ID_SIZE)) {
656 dnet_log(n, DNET_LOG_ERROR, "%s: cas: checksum mismatch\n", dnet_dump_id(&cmd->id));
657 err = -EINVAL;
658 break;
663 if ((cmd->cmd == DNET_CMD_DEL) || (io->flags & DNET_IO_FLAGS_META)) {
664 err = dnet_process_meta(st, cmd, data);
665 break;
668 dnet_convert_io_attr(io);
669 default:
670 /* Remove DNET_FLAGS_NEED_ACK flags for WRITE command
671 to eliminate double reply packets
672 (the first one with dnet_file_info structure,
673 the second to destroy transaction on client side) */
674 if ((cmd->cmd == DNET_CMD_WRITE) || (cmd->cmd == DNET_CMD_READ)) {
675 cmd->flags &= ~DNET_FLAGS_NEED_ACK;
677 err = n->cb->command_handler(st, n->cb->command_private, cmd, data);
679 /* If there was error in WRITE command - send empty reply
680 to notify client with error code and destroy transaction */
681 if (err && ((cmd->cmd == DNET_CMD_WRITE) || (cmd->cmd == DNET_CMD_READ))) {
682 cmd->flags |= DNET_FLAGS_NEED_ACK;
684 #if 0
685 if (!err && (cmd->cmd == DNET_CMD_WRITE)) {
686 dnet_update_notify(st, cmd, a, data);
688 #endif
689 break;
692 dnet_stat_inc(st->stat, cmd->cmd, err);
693 if (st->__join_state == DNET_JOIN)
694 dnet_counter_inc(n, cmd->cmd, err);
695 else
696 dnet_counter_inc(n, cmd->cmd + __DNET_CMD_MAX, err);
698 gettimeofday(&end, NULL);
700 diff = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
701 dnet_log(n, DNET_LOG_INFO, "%s: %s: trans: %llu, cflags: %llx, time: %ld usecs, err: %d.\n",
702 dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), tid,
703 (unsigned long long)cmd->flags, diff, err);
705 if (cmd->flags & DNET_FLAGS_NEED_ACK) {
706 struct dnet_cmd ack;
708 memcpy(&ack.id, &cmd->id, sizeof(struct dnet_id));
709 ack.cmd = cmd->cmd;
710 ack.trans = cmd->trans | DNET_TRANS_REPLY;
711 ack.size = 0;
712 ack.flags = cmd->flags & ~(DNET_FLAGS_NEED_ACK | DNET_FLAGS_MORE);
713 ack.status = err;
715 dnet_log(n, DNET_LOG_DEBUG, "%s: ack trans: %llu, flags: %llx, status: %d.\n",
716 dnet_dump_id(&cmd->id), tid, (unsigned long long)ack.flags, err);
718 dnet_convert_cmd(&ack);
719 err = dnet_send(st, &ack, sizeof(struct dnet_cmd));
722 if (!(cmd->flags & DNET_FLAGS_NOLOCK))
723 dnet_opunlock(n, &cmd->id);
725 return err;
728 int dnet_state_join_nolock(struct dnet_net_state *st)
730 int err;
731 struct dnet_node *n = st->n;
732 struct dnet_net_state *base;
733 struct dnet_id id;
735 base = dnet_state_search_nolock(n, &n->id);
736 if (!base) {
737 err = -ENOENT;
738 goto err_out_exit;
741 /* we do not care about group_id actually, since use direct send */
742 memcpy(&id, &n->id, sizeof(id));
744 err = dnet_send_idc(base, st, &id, 0, DNET_CMD_JOIN, 0, 1, 0);
745 if (err) {
746 dnet_log(n, DNET_LOG_ERROR, "%s: failed to send join request to %s.\n",
747 dnet_dump_id(&id), dnet_server_convert_dnet_addr(&st->addr));
748 goto err_out_put;
751 st->__join_state = DNET_JOIN;
752 dnet_log(n, DNET_LOG_INFO, "%s: successfully joined network, group %d.\n", dnet_dump_id(&id), id.group_id);
754 err_out_put:
755 /* this is dangerous, since base can go away and we will destroy it here,
756 * which in turn will call dnet_state_remove(), which will deadlock with n->state_lock already being held
758 * FIXME
760 dnet_state_put(base);
761 err_out_exit:
762 return err;
766 int64_t dnet_get_param(struct dnet_node *n, struct dnet_id *id, enum id_params param)
768 struct dnet_net_state *st;
769 int64_t ret = 1;
771 st = dnet_state_get_first(n, id);
772 if (!st)
773 return -ENOENT;
775 switch (param) {
776 case DNET_ID_PARAM_LA:
777 ret = st->la;
778 break;
779 case DNET_ID_PARAM_FREE_SPACE:
780 ret = st->free;
781 break;
782 default:
783 break;
785 dnet_state_put(st);
787 return ret;
790 static int dnet_compare_by_param(const void *id1, const void *id2)
792 const struct dnet_id_param *l1 = id1;
793 const struct dnet_id_param *l2 = id2;
795 if (l1->param == l2->param)
796 return l1->param_reserved - l2->param_reserved;
798 return l1->param - l2->param;
801 /* TODO: remove this function
802 int dnet_generate_ids_by_param(struct dnet_node *n, struct dnet_id *id, enum id_params param, struct dnet_id_param **dst)
804 int i, err = 0, group_num = 0;
805 struct dnet_id_param *ids;
806 struct dnet_group *g;
808 if (n->group_num) {
809 pthread_mutex_lock(&n->group_lock);
810 if (n->group_num) {
811 group_num = n->group_num;
813 ids = malloc(group_num * sizeof(struct dnet_id_param));
814 if (!ids) {
815 err = -ENOMEM;
816 goto err_out_unlock_group;
818 for (i=0; i<group_num; ++i)
819 ids[i].group_id = n->groups[i];
821 err_out_unlock_group:
822 pthread_mutex_unlock(&n->group_lock);
823 if (err)
824 goto err_out_exit;
827 if (!group_num) {
828 int pos = 0;
830 pthread_mutex_lock(&n->state_lock);
831 list_for_each_entry(g, &n->group_list, group_entry)
832 group_num++;
834 ids = malloc(group_num * sizeof(struct dnet_id_param));
835 if (!ids) {
836 err = -ENOMEM;
837 goto err_out_unlock_state;
840 list_for_each_entry(g, &n->group_list, group_entry) {
841 ids[pos].group_id = g->group_id;
842 pos++;
844 err_out_unlock_state:
845 pthread_mutex_unlock(&n->state_lock);
846 if (err)
847 goto err_out_exit;
850 for (i=0; i<group_num; ++i) {
851 id->group_id = ids[i].group_id;
852 ids[i].param = dnet_get_param(n, id, param);
855 qsort(ids, group_num, sizeof(struct dnet_id_param), dnet_compare_by_param);
856 *dst = ids;
858 for (i=0; i<group_num; ++i) {
859 id->group_id = ids[i].group_id;
862 err = group_num;
864 err_out_exit:
865 return err;
869 static int dnet_populate_cache(struct dnet_node *n, struct dnet_cmd *cmd, struct dnet_io_attr *io,
870 void *data, int fd, size_t fd_offset, size_t size)
872 void *orig_data = data;
873 ssize_t err;
875 if (!data && fd >= 0) {
876 ssize_t tmp_size = size;
878 if (size >= n->cache_size)
879 return -ENOMEM;
881 orig_data = data = malloc(size);
882 if (!data)
883 return -ENOMEM;
885 while (tmp_size > 0) {
886 err = pread(fd, data, tmp_size, fd_offset);
887 if (err <= 0) {
888 dnet_log_err(n, "%s: failed to populate cache: pread: offset: %zd, size: %zd",
889 dnet_dump_id(&cmd->id), fd_offset, size);
890 goto err_out_free;
893 data += err;
894 tmp_size -= err;
895 fd_offset += err;
899 cmd->cmd = DNET_CMD_WRITE;
900 err = dnet_cmd_cache_io(n->st, cmd, io, orig_data);
901 cmd->cmd = DNET_CMD_READ;
903 err_out_free:
904 if (data != orig_data)
905 free(orig_data);
907 return err;
910 int dnet_send_read_data(void *state, struct dnet_cmd *cmd, struct dnet_io_attr *io, void *data,
911 int fd, uint64_t offset, int close_on_exit)
913 struct dnet_net_state *st = state;
914 struct dnet_cmd *c;
915 struct dnet_io_attr *rio;
916 int hsize = sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
917 int err;
920 * A simple hack to forbid read reply sending.
921 * It is used in local stat - we do not want to send stat data
922 * back to parental client, instead server will wrap data into
923 * proper transaction reply next to this obscure packet.
925 if (io->flags & DNET_IO_FLAGS_SKIP_SENDING)
926 return 0;
928 c = malloc(hsize);
929 if (!c) {
930 err = -ENOMEM;
931 goto err_out_exit;
934 memset(c, 0, hsize);
936 rio = (struct dnet_io_attr *)(c + 1);
938 dnet_setup_id(&c->id, cmd->id.group_id, io->id);
940 c->flags = cmd->flags & ~(DNET_FLAGS_NEED_ACK | DNET_FLAGS_MORE);
941 if (cmd->flags & DNET_FLAGS_NEED_ACK)
942 c->flags |= DNET_FLAGS_MORE;
944 c->size = sizeof(struct dnet_io_attr) + io->size;
945 c->trans = cmd->trans | DNET_TRANS_REPLY;
946 c->cmd = DNET_CMD_READ;
948 memcpy(rio, io, sizeof(struct dnet_io_attr));
950 dnet_log_raw(st->n, DNET_LOG_NOTICE, "%s: %s: reply: offset: %llu, size: %llu.\n",
951 dnet_dump_id(&c->id), dnet_cmd_string(c->cmd),
952 (unsigned long long)io->offset, (unsigned long long)io->size);
954 /* only populate data which has zero offset and from column 0 */
955 if ((io->flags & DNET_IO_FLAGS_CACHE) && !io->offset && (io->type == 0)) {
956 err = dnet_populate_cache(st->n, c, rio, data, fd, offset, io->size);
959 dnet_convert_cmd(c);
960 dnet_convert_io_attr(rio);
962 if (data)
963 err = dnet_send_data(st, c, hsize, data, io->size);
964 else
965 err = dnet_send_fd(st, c, hsize, fd, offset, io->size, close_on_exit);
967 free(c);
969 err_out_exit:
970 return err;
973 void dnet_fill_addr_attr(struct dnet_node *n, struct dnet_addr_attr *attr)
975 memcpy(&attr->addr, &n->addr, sizeof(struct dnet_addr));
977 attr->sock_type = n->sock_type;
978 attr->family = n->family;
979 attr->proto = n->proto;
982 int dnet_read_file_info(struct dnet_node *n, struct dnet_id *id, struct dnet_file_info *info)
984 struct dnet_meta *m;
985 struct dnet_meta_update *mu;
986 struct dnet_meta_container mc;
987 struct dnet_raw_id raw;
988 int err;
990 memcpy(raw.id, id->id, DNET_ID_SIZE);
992 err = n->cb->meta_read(n->cb->command_private, &raw, &mc.data);
993 if (err < 0) {
994 goto err_out_exit;
996 mc.size = err;
998 m = dnet_meta_search(n, &mc, DNET_META_UPDATE);
999 if (!m) {
1000 dnet_log(n, DNET_LOG_ERROR, "%s: dnet_read_file_info_verify_csum: no DNET_META_UPDATE tag in metadata\n",
1001 dnet_dump_id(id));
1002 err = -ENODATA;
1003 goto err_out_free;
1006 mu = (struct dnet_meta_update *)m->data;
1007 dnet_convert_meta_update(mu);
1009 info->ctime = info->mtime = mu->tm;
1010 err = 0;
1012 err_out_free:
1013 free(mc.data);
1014 err_out_exit:
1015 return err;
/*
 * Resolve the path behind an open descriptor by reading the
 * /proc/self/fd/<fd> symlink.
 *
 * On success a newly allocated, NUL-terminated path is stored in @datap (the
 * caller frees it) and its length INCLUDING the terminating NUL byte is
 * returned. On failure nothing is stored and a negative errno value is
 * returned. Paths longer than the internal buffer are truncated.
 */
static int dnet_fd_readlink(int fd, char **datap)
{
	enum { DNET_FD_PATH_MAX = 4096 };
	char src[64];
	char *dst;
	ssize_t len;

	snprintf(src, sizeof(src), "/proc/self/fd/%d", fd);

	dst = malloc(DNET_FD_PATH_MAX);
	if (!dst)
		return -ENOMEM;

	/*
	 * readlink() does not terminate the string and may fill the whole
	 * buffer, so one byte is kept back for the NUL written below.
	 */
	len = readlink(src, dst, DNET_FD_PATH_MAX - 1);
	if (len < 0) {
		int err = -errno;

		free(dst);
		return err;
	}

	dst[len] = '\0';
	*datap = dst;

	return len + 1; /* including 0-byte */
}
1047 int dnet_send_file_info(void *state, struct dnet_cmd *cmd, int fd, uint64_t offset, int64_t size)
1049 struct dnet_node *n = dnet_get_node_from_state(state);
1050 struct dnet_file_info *info;
1051 struct dnet_addr_attr *a;
1052 int flen, err;
1053 char *file;
1054 struct stat st;
1056 err = dnet_fd_readlink(fd, &file);
1057 if (err < 0)
1058 goto err_out_exit;
1060 flen = err;
1062 a = malloc(sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info) + flen);
1063 if (!a) {
1064 err = -ENOMEM;
1065 goto err_out_free_file;
1067 info = (struct dnet_file_info *)(a + 1);
1069 dnet_fill_addr_attr(n, a);
1071 err = fstat(fd, &st);
1072 if (err) {
1073 err = -errno;
1074 dnet_log(n, DNET_LOG_ERROR, "%s: EBLOB: %s: info-stat: %d: %s.\n",
1075 dnet_dump_id(&cmd->id), file, err, strerror(-err));
1076 goto err_out_free;
1079 dnet_info_from_stat(info, &st);
1080 /* this is not valid data from raw blob file stat */
1081 info->ctime.tsec = info->mtime.tsec = 0;
1083 if (cmd->flags & DNET_ATTR_META_TIMES) {
1084 err = dnet_read_file_info(n, &cmd->id, info);
1085 if ((err == -ENOENT) && (cmd->flags & DNET_ATTR_META_TIMES))
1086 err = 0;
1087 if (err)
1088 goto err_out_free;
1091 if (size >= 0)
1092 info->size = size;
1093 if (offset)
1094 info->offset = offset;
1096 if (info->size == 0) {
1097 err = -ENOENT;
1098 dnet_log(n, DNET_LOG_NOTICE, "%s: EBLOB: %s: info-stat: ZERO-FILE-SIZE, fd: %d.\n",
1099 dnet_dump_id(&cmd->id), file, fd);
1100 goto err_out_free;
1103 info->flen = flen;
1104 memcpy(info + 1, file, flen);
1106 dnet_convert_file_info(info);
1108 err = dnet_send_reply(state, cmd, a, sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info) + flen, 0);
1110 err_out_free:
1111 free(a);
1112 err_out_free_file:
1113 free(file);
1114 err_out_exit:
1115 return err;
1118 int dnet_checksum_data(struct dnet_node *n, void *csum, int *csize, const void *data, uint64_t size)
1120 struct dnet_transform *t = &n->transform;
1122 return t->transform(t->priv, data, size, csum, (unsigned int *)csize, 0);
/*
 * Open @file read-only and checksum @size bytes from @offset through
 * dnet_checksum_fd(). Returns -errno when the file cannot be opened,
 * otherwise the dnet_checksum_fd() result.
 */
int dnet_checksum_file(struct dnet_node *n, void *csum, int *csize, const char *file, uint64_t offset, uint64_t size)
{
	int err;
	int fd = open(file, O_RDONLY);

	if (fd < 0) {
		err = -errno;
		dnet_log_err(n, "failed to open to be csummed file '%s'", file);
		return err;
	}

	err = dnet_checksum_fd(n, csum, csize, fd, offset, size);
	close(fd);

	return err;
}
1144 int dnet_checksum_fd(struct dnet_node *n, void *csum, int *csize, int fd, uint64_t offset, uint64_t size)
1146 int err;
1147 struct dnet_map_fd m;
1149 if (!size) {
1150 struct stat st;
1152 err = fstat(fd, &st);
1153 if (err < 0) {
1154 err = -errno;
1155 dnet_log_err(n, "CSUM: fd: %d", fd);
1156 goto err_out_exit;
1159 size = st.st_size;
1162 m.fd = fd;
1163 m.size = size;
1164 m.offset = offset;
1166 err = dnet_data_map(&m);
1167 if (err)
1168 goto err_out_exit;
1170 err = dnet_checksum_data(n, csum, csize, m.data, size);
1171 dnet_data_unmap(&m);
1173 err_out_exit:
1174 return err;