Only works with cocaine 0.9.4
[elliptics.git] / library / dnet.c
blob7cd9501d6ba7d1cb52fb4fc895b7fa113d076764
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node *n, const void *src, uint64_t size, struct dnet_id *id)
37 struct dnet_transform *t = &n->transform;
38 unsigned int csize = sizeof(id->id);
40 return t->transform(t->priv, src, size, id->id, &csize, 0);
43 int dnet_stat_local(struct dnet_net_state *st, struct dnet_id *id)
45 struct dnet_node *n = st->n;
46 int size, cmd_size;
47 struct dnet_cmd *cmd;
48 struct dnet_io_attr *io;
49 int err;
51 size = 1;
52 cmd_size = size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
54 cmd = malloc(cmd_size);
55 if (!cmd) {
56 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate %d bytes for local stat.\n",
57 dnet_dump_id(id), cmd_size);
58 err = -ENOMEM;
59 goto err_out_exit;
62 memset(cmd, 0, cmd_size);
64 io = (struct dnet_io_attr *)(cmd + 1);
66 memcpy(&cmd->id, id, sizeof(struct dnet_id));
67 cmd->size = cmd_size - sizeof(struct dnet_cmd);
68 cmd->flags = DNET_FLAGS_NOLOCK;
69 cmd->cmd = DNET_CMD_READ;
71 io->size = cmd->size - sizeof(struct dnet_io_attr);
72 io->offset = 0;
73 io->flags = DNET_IO_FLAGS_SKIP_SENDING;
75 memcpy(io->parent, id->id, DNET_ID_SIZE);
76 memcpy(io->id, id->id, DNET_ID_SIZE);
78 dnet_convert_io_attr(io);
80 err = n->cb->command_handler(st, n->cb->command_private, cmd, io);
81 dnet_log(n, DNET_LOG_INFO, "%s: local stat: io_size: %llu, err: %d.\n", dnet_dump_id(&cmd->id), (unsigned long long)io->size, err);
83 free(cmd);
85 err_out_exit:
86 return err;
89 int dnet_remove_local(struct dnet_node *n, struct dnet_id *id)
91 int cmd_size;
92 struct dnet_cmd *cmd;
93 struct dnet_io_attr *io;
94 int err;
96 cmd_size = sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
98 cmd = malloc(cmd_size);
99 if (!cmd) {
100 dnet_log(n, DNET_LOG_ERROR, "%s: failed to allocate %d bytes for local remove.\n",
101 dnet_dump_id(id), cmd_size);
102 err = -ENOMEM;
103 goto err_out_exit;
106 memset(cmd, 0, cmd_size);
108 io = (struct dnet_io_attr *)(cmd + 1);
110 cmd->id = *id;
111 cmd->size = cmd_size - sizeof(struct dnet_cmd);
112 cmd->flags = DNET_FLAGS_NOLOCK;
113 cmd->cmd = DNET_CMD_DEL;
115 io->flags = DNET_IO_FLAGS_SKIP_SENDING;
117 memcpy(io->parent, id->id, DNET_ID_SIZE);
118 memcpy(io->id, id->id, DNET_ID_SIZE);
120 dnet_convert_io_attr(io);
122 err = n->cb->command_handler(n->st, n->cb->command_private, cmd, io);
123 dnet_log(n, DNET_LOG_NOTICE, "%s: local remove: err: %d.\n", dnet_dump_id(&cmd->id), err);
125 free(cmd);
127 err_out_exit:
128 return err;
132 static void dnet_send_idc_fill(struct dnet_net_state *st, void *buf, int size,
133 struct dnet_id *id, uint64_t trans, unsigned int command, int reply, int direct, int more)
135 struct dnet_node *n = st->n;
136 struct dnet_cmd *cmd;
137 struct dnet_raw_id *sid;
138 struct dnet_addr_attr *addr;
139 int i;
141 memset(buf, 0, sizeof(*cmd) + sizeof(*addr));
143 cmd = buf;
144 addr = (struct dnet_addr_attr *)(cmd + 1);
145 sid = (struct dnet_raw_id *)(addr + 1);
147 memcpy(&cmd->id, id, sizeof(struct dnet_id));
148 cmd->size = size - sizeof(struct dnet_cmd);
149 cmd->trans = trans;
151 cmd->flags = DNET_FLAGS_NOLOCK;
152 if (more)
153 cmd->flags |= DNET_FLAGS_MORE;
154 if (direct)
155 cmd->flags |= DNET_FLAGS_DIRECT;
156 if (reply)
157 cmd->trans |= DNET_TRANS_REPLY;
159 cmd->cmd = command;
161 addr->sock_type = n->sock_type;
162 addr->family = n->family;
163 addr->proto = n->proto;
164 memcpy(&addr->addr, &st->addr, sizeof(struct dnet_addr));
166 for (i=0; i<st->idc->id_num; ++i) {
167 memcpy(&sid[i], &st->idc->ids[i].raw, sizeof(struct dnet_raw_id));
168 dnet_convert_raw_id(&sid[i]);
171 dnet_convert_addr_cmd(buf);
174 static int dnet_send_idc(struct dnet_net_state *orig, struct dnet_net_state *send, struct dnet_id *id, uint64_t trans,
175 unsigned int command, int reply, int direct, int more)
177 struct dnet_node *n = orig->n;
178 int size = sizeof(struct dnet_addr_cmd) + orig->idc->id_num * sizeof(struct dnet_raw_id);
179 void *buf;
180 int err;
181 struct timeval start, end;
182 long diff;
184 gettimeofday(&start, NULL);
186 buf = malloc(size);
187 if (!buf) {
188 err = -ENOMEM;
189 goto err_out_exit;
191 memset(buf, 0, sizeof(struct dnet_addr_cmd));
193 dnet_send_idc_fill(orig, buf, size, id, trans, command, reply, direct, more);
195 gettimeofday(&end, NULL);
196 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
197 dnet_log(n, DNET_LOG_INFO, "%s: sending address %s: %ld\n", dnet_dump_id(id), dnet_state_dump_addr(orig), diff);
199 err = dnet_send(send, buf, size);
201 free(buf);
203 err_out_exit:
204 return err;
207 static int dnet_cmd_reverse_lookup(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data __unused)
209 struct dnet_node *n = st->n;
210 struct dnet_net_state *base;
211 int err = -ENOENT;
213 cmd->id.group_id = n->id.group_id;
214 base = dnet_node_state(n);
215 if (base) {
216 err = dnet_send_idc(base, st, &cmd->id, cmd->trans, DNET_CMD_REVERSE_LOOKUP, 1, 0, 0);
217 dnet_state_put(base);
220 return err;
223 static int dnet_check_connection(struct dnet_node *n, struct dnet_addr_attr *a)
225 int s;
227 s = dnet_socket_create_addr(n, a->sock_type, a->proto, a->family,
228 (struct sockaddr *)a->addr.addr, a->addr.addr_len, 0);
229 if (s < 0)
230 return s;
232 dnet_sock_close(s);
233 return 0;
236 static int dnet_cmd_join_client(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
238 struct dnet_node *n = st->n;
239 struct dnet_addr_attr *a = data;
240 struct dnet_raw_id *ids;
241 int num, i, err;
243 dnet_convert_addr_attr(a);
245 dnet_log(n, DNET_LOG_DEBUG, "%s: accepted joining client (%s), requesting statistics.\n",
246 dnet_dump_id(&cmd->id), dnet_server_convert_dnet_addr(&a->addr));
247 err = dnet_check_connection(n, a);
248 if (err) {
249 dnet_log(n, DNET_LOG_ERROR, "%s: failed to request statistics from joining client (%s), dropping connection.\n",
250 dnet_dump_id(&cmd->id), dnet_server_convert_dnet_addr(&a->addr));
251 return err;
254 num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id);
255 ids = (struct dnet_raw_id *)(a + 1);
256 for (i=0; i<num; ++i)
257 dnet_convert_raw_id(&ids[0]);
259 pthread_mutex_lock(&n->state_lock);
260 list_del_init(&st->state_entry);
261 list_del_init(&st->storage_state_entry);
262 pthread_mutex_unlock(&n->state_lock);
264 memcpy(&st->addr, &a->addr, sizeof(struct dnet_addr));
265 err = dnet_idc_create(st, cmd->id.group_id, ids, num);
267 dnet_log(n, DNET_LOG_INFO, "%s: accepted join request from state %s: %d.\n", dnet_dump_id(&cmd->id),
268 dnet_server_convert_dnet_addr(&a->addr), err);
270 return err;
273 static int dnet_cmd_route_list(struct dnet_net_state *orig, struct dnet_cmd *cmd)
275 struct dnet_node *n = orig->n;
276 struct dnet_net_state *st;
277 struct dnet_group *g;
278 void *buf, *orig_buf;
279 size_t size = 0, send_size = 0, sz;
280 int err;
282 pthread_mutex_lock(&n->state_lock);
283 list_for_each_entry(g, &n->group_list, group_entry) {
284 list_for_each_entry(st, &g->state_list, state_entry) {
285 if (!memcmp(&st->addr, &orig->addr, sizeof(struct dnet_addr)))
286 continue;
288 size += st->idc->id_num * sizeof(struct dnet_raw_id) + sizeof(struct dnet_addr_cmd);
291 pthread_mutex_unlock(&n->state_lock);
293 orig_buf = buf = malloc(size);
294 if (!buf) {
295 err = -ENOMEM;
296 goto err_out_exit;
299 pthread_mutex_lock(&n->state_lock);
300 list_for_each_entry(g, &n->group_list, group_entry) {
301 list_for_each_entry(st, &g->state_list, state_entry) {
302 if (!memcmp(&st->addr, &orig->addr, sizeof(struct dnet_addr)))
303 continue;
305 sz = st->idc->id_num * sizeof(struct dnet_raw_id) + sizeof(struct dnet_addr_cmd);
306 if (sz <= size) {
307 cmd->id.group_id = g->group_id;
308 dnet_send_idc_fill(st, buf, sz, &cmd->id, cmd->trans, DNET_CMD_ROUTE_LIST, 1, 0, 1);
310 size -= sz;
311 buf += sz;
313 send_size += sz;
317 pthread_mutex_unlock(&n->state_lock);
319 err = dnet_send(orig, orig_buf, send_size);
320 if (err)
321 goto err_out_free;
323 err_out_free:
324 free(orig_buf);
325 err_out_exit:
326 return err;
329 static int dnet_cmd_exec(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
331 struct dnet_node *n = st->n;
332 struct sph *e = data;
333 int err = -ENOTSUP;
335 data += sizeof(struct sph);
337 dnet_convert_sph(e);
339 if (e->event_size + e->data_size + e->binary_size + sizeof(struct sph) != cmd->size) {
340 err = -E2BIG;
341 dnet_log(n, DNET_LOG_ERROR, "%s: invalid size: event-size %d, data-size %llu, binary-size %llu must be: %llu\n",
342 dnet_dump_id(&cmd->id),
343 e->event_size,
344 (unsigned long long)e->data_size,
345 (unsigned long long)e->binary_size,
346 (unsigned long long)cmd->size);
347 goto err_out_exit;
350 err = dnet_cmd_exec_raw(st, cmd, e, data);
352 err_out_exit:
353 return err;
356 static int dnet_cmd_stat_count_single(struct dnet_net_state *orig, struct dnet_cmd *cmd, struct dnet_net_state *st, struct dnet_addr_stat *as)
358 int i;
360 cmd->cmd = DNET_CMD_STAT_COUNT;
362 memcpy(&as->addr, &st->addr, sizeof(struct dnet_addr));
363 as->num = __DNET_CMD_MAX;
364 as->cmd_num = __DNET_CMD_MAX;
366 for (i=0; i<as->num; ++i) {
367 as->count[i] = st->stat[i];
370 dnet_convert_addr_stat(as, as->num);
372 return dnet_send_reply(orig, cmd, as, sizeof(struct dnet_addr_stat) + __DNET_CMD_MAX * sizeof(struct dnet_stat_count), 1);
375 static int dnet_cmd_stat_count_global(struct dnet_net_state *orig, struct dnet_cmd *cmd,
376 struct dnet_node *n, struct dnet_addr_stat *as)
378 struct dnet_stat st;
379 int err = 0;
381 cmd->cmd = DNET_CMD_STAT_COUNT;
383 memcpy(&as->addr, &n->addr, sizeof(struct dnet_addr));
384 as->num = __DNET_CNTR_MAX;
385 as->cmd_num = __DNET_CMD_MAX;
387 memcpy(as->count, n->counters, sizeof(struct dnet_stat_count) * __DNET_CNTR_MAX);
389 if (n->cb->storage_stat) {
390 err = n->cb->storage_stat(n->cb->command_private, &st);
391 if (err)
392 return err;
394 as->count[DNET_CNTR_LA1].count = st.la[0];
395 as->count[DNET_CNTR_LA5].count = st.la[1];
396 as->count[DNET_CNTR_LA15].count = st.la[2];
397 as->count[DNET_CNTR_BSIZE].count = st.bsize;
398 as->count[DNET_CNTR_FRSIZE].count = st.frsize;
399 as->count[DNET_CNTR_BLOCKS].count = st.blocks;
400 as->count[DNET_CNTR_BFREE].count = st.bfree;
401 as->count[DNET_CNTR_BAVAIL].count = st.bavail;
402 as->count[DNET_CNTR_FILES].count = st.files;
403 as->count[DNET_CNTR_FFREE].count = st.ffree;
404 as->count[DNET_CNTR_FAVAIL].count = st.favail;
405 as->count[DNET_CNTR_FSID].count = st.fsid;
406 as->count[DNET_CNTR_VM_ACTIVE].count = st.vm_active;
407 as->count[DNET_CNTR_VM_INACTIVE].count = st.vm_inactive;
408 as->count[DNET_CNTR_VM_TOTAL].count = st.vm_total;
409 as->count[DNET_CNTR_VM_FREE].count = st.vm_free;
410 as->count[DNET_CNTR_VM_CACHED].count = st.vm_cached;
411 as->count[DNET_CNTR_VM_BUFFERS].count = st.vm_buffers;
413 as->count[DNET_CNTR_NODE_FILES].count = n->cb->meta_total_elements(n->cb->command_private);
415 dnet_convert_addr_stat(as, as->num);
417 return dnet_send_reply(orig, cmd, as, sizeof(struct dnet_addr_stat) + __DNET_CNTR_MAX * sizeof(struct dnet_stat_count), 1);
420 static int dnet_cmd_stat_count(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data __unused)
422 struct dnet_node *n = orig->n;
423 struct dnet_net_state *st;
424 struct dnet_addr_stat *as;
425 int err = 0;
427 as = alloca(sizeof(struct dnet_addr_stat) + __DNET_CNTR_MAX * sizeof(struct dnet_stat_count));
428 if (!as) {
429 err = -ENOMEM;
430 goto err_out_exit;
433 if (cmd->flags & DNET_ATTR_CNTR_GLOBAL) {
434 err = dnet_cmd_stat_count_global(orig, cmd, orig->n, as);
435 } else {
436 pthread_mutex_lock(&n->state_lock);
437 #if 0
438 list_for_each_entry(st, &n->state_list, state_entry) {
439 err = dnet_cmd_stat_count_single(orig, cmd, st, as);
440 if (err)
441 goto err_out_unlock;
443 #endif
444 list_for_each_entry(st, &n->empty_state_list, state_entry) {
445 err = dnet_cmd_stat_count_single(orig, cmd, st, as);
446 if (err)
447 goto err_out_unlock;
449 err_out_unlock:
450 pthread_mutex_unlock(&n->state_lock);
453 err_out_exit:
454 return err;
457 static int dnet_cmd_status(struct dnet_net_state *orig, struct dnet_cmd *cmd __unused, void *data)
459 struct dnet_node *n = orig->n;
460 struct dnet_node_status *st = data;
462 dnet_convert_node_status(st);
464 dnet_log(n, DNET_LOG_INFO, "%s: status-change: nflags: %x->%x, log_level: %d->%d, "
465 "status_flags: EXIT: %d, RO: %d\n",
466 dnet_dump_id(&cmd->id), n->flags, st->nflags, n->log->log_level, st->log_level,
467 !!(st->status_flags & DNET_STATUS_EXIT), !!(st->status_flags & DNET_STATUS_RO));
469 if (st->status_flags != -1) {
470 if (st->status_flags & DNET_STATUS_EXIT) {
471 dnet_set_need_exit(n);
474 if (st->status_flags & DNET_STATUS_RO) {
475 n->ro = 1;
476 } else {
477 n->ro = 0;
481 if (st->nflags != -1)
482 n->flags = st->nflags;
484 if (st->log_level != ~0U)
485 n->log->log_level = st->log_level;
487 st->nflags = n->flags;
488 st->log_level = n->log->log_level;
489 st->status_flags = 0;
491 if (n->need_exit)
492 st->status_flags |= DNET_STATUS_EXIT;
494 if (n->ro)
495 st->status_flags |= DNET_STATUS_RO;
497 dnet_convert_node_status(st);
499 return dnet_send_reply(orig, cmd, st, sizeof(struct dnet_node_status), 1);
502 static int dnet_cmd_auth(struct dnet_net_state *orig, struct dnet_cmd *cmd __unused, void *data)
504 struct dnet_node *n = orig->n;
505 struct dnet_auth *a = data;
506 int err = 0;
508 if (cmd->size != sizeof(struct dnet_auth)) {
509 err = -EINVAL;
510 goto err_out_exit;
513 dnet_convert_auth(a);
514 if (memcmp(n->cookie, a->cookie, DNET_AUTH_COOKIE_SIZE)) {
515 err = -EPERM;
516 dnet_log(n, DNET_LOG_ERROR, "%s: auth cookies do not match\n", dnet_state_dump_addr(orig));
517 } else {
518 dnet_log(n, DNET_LOG_INFO, "%s: authentication succeeded\n", dnet_state_dump_addr(orig));
521 err_out_exit:
522 return err;
525 int dnet_process_cmd_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data)
527 int err = 0;
528 unsigned long long size = cmd->size;
529 struct dnet_node *n = st->n;
530 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
531 struct dnet_io_attr *io;
532 struct timeval start, end;
533 long diff;
535 if (!(cmd->flags & DNET_FLAGS_NOLOCK)) {
536 dnet_oplock(n, &cmd->id);
539 gettimeofday(&start, NULL);
541 switch (cmd->cmd) {
542 case DNET_CMD_AUTH:
543 err = dnet_cmd_auth(st, cmd, data);
544 break;
545 case DNET_CMD_STATUS:
546 err = dnet_cmd_status(st, cmd, data);
547 break;
548 case DNET_CMD_REVERSE_LOOKUP:
549 err = dnet_cmd_reverse_lookup(st, cmd, data);
550 break;
551 case DNET_CMD_JOIN:
552 err = dnet_cmd_join_client(st, cmd, data);
553 break;
554 case DNET_CMD_ROUTE_LIST:
555 err = dnet_cmd_route_list(st, cmd);
556 break;
557 case DNET_CMD_EXEC:
558 err = dnet_cmd_exec(st, cmd, data);
559 break;
560 case DNET_CMD_STAT_COUNT:
561 err = dnet_cmd_stat_count(st, cmd, data);
562 break;
563 case DNET_CMD_NOTIFY:
564 if (!(cmd->flags & DNET_ATTR_DROP_NOTIFICATION)) {
565 err = dnet_notify_add(st, cmd);
567 * We drop 'need ack' flag, since notification
568 * transaction is a long-living one, since
569 * every notification will be sent as transaction
570 * completion.
572 * Transaction acknowledge will be sent when
573 * notification is removed.
575 if (!err)
576 cmd->flags &= ~DNET_FLAGS_NEED_ACK;
577 } else
578 err = dnet_notify_remove(st, cmd);
579 break;
580 case DNET_CMD_LIST:
581 if (n->ro) {
582 err = -EROFS;
583 } else {
584 if (cmd->flags & DNET_ATTR_BULK_CHECK)
585 err = dnet_cmd_bulk_check(st, cmd, data);
586 else
587 err = dnet_db_list(st, cmd);
589 break;
590 case DNET_CMD_READ:
591 case DNET_CMD_WRITE:
592 case DNET_CMD_DEL:
593 if (n->ro && ((cmd->cmd == DNET_CMD_DEL) || (cmd->cmd == DNET_CMD_WRITE))) {
594 err = -EROFS;
595 break;
598 io = NULL;
599 if (size < sizeof(struct dnet_io_attr)) {
600 dnet_log(st->n, DNET_LOG_ERROR, "%s: invalid size: cmd: %u, rest_size: %llu\n",
601 dnet_dump_id(&cmd->id), cmd->cmd, size);
602 err = -EINVAL;
603 break;
605 io = data;
606 dnet_convert_io_attr(io);
608 dnet_log(n, DNET_LOG_INFO, "%s: %s io command, offset: %llu, size: %llu, ioflags: %x, cflags: %llx, "
609 "node-flags: %x, type: %d\n",
610 dnet_dump_id_str(io->id), dnet_cmd_string(cmd->cmd),
611 (unsigned long long)io->offset, (unsigned long long)io->size,
612 io->flags, (unsigned long long)cmd->flags,
613 n->flags, io->type);
615 if (n->flags & DNET_CFG_NO_CSUM)
616 io->flags |= DNET_IO_FLAGS_NOCSUM;
618 /* do not write metadata for cache-only writes */
619 if ((io->flags & DNET_IO_FLAGS_CACHE_ONLY) && (io->type == EBLOB_TYPE_META)) {
620 err = -EINVAL;
621 break;
625 * Only allow cache for column 0
626 * In the next life (2012 I really expect) there will be no columns at all
628 if (io->type == 0) {
630 * Always check cache when reading!
632 if ((io->flags & DNET_IO_FLAGS_CACHE) || (cmd->cmd != DNET_CMD_WRITE)) {
633 err = dnet_cmd_cache_io(st, cmd, io, data + sizeof(struct dnet_io_attr));
635 if (io->flags & DNET_IO_FLAGS_CACHE_ONLY)
636 break;
639 * We successfully read data from cache, do not sink to disk for it
641 if ((cmd->cmd == DNET_CMD_READ) && !err)
642 break;
646 if ((cmd->cmd == DNET_CMD_DEL) || (io->flags & DNET_IO_FLAGS_META)) {
647 err = dnet_process_meta(st, cmd, data);
648 break;
651 dnet_convert_io_attr(io);
652 default:
653 /* Remove DNET_FLAGS_NEED_ACK flags for WRITE command
654 to eliminate double reply packets
655 (the first one with dnet_file_info structure,
656 the second to destroy transaction on client side) */
657 if ((cmd->cmd == DNET_CMD_WRITE) || (cmd->cmd == DNET_CMD_READ)) {
658 cmd->flags &= ~DNET_FLAGS_NEED_ACK;
660 err = n->cb->command_handler(st, n->cb->command_private, cmd, data);
662 /* If there was error in WRITE command - send empty reply
663 to notify client with error code and destroy transaction */
664 if (err && ((cmd->cmd == DNET_CMD_WRITE) || (cmd->cmd == DNET_CMD_READ))) {
665 cmd->flags |= DNET_FLAGS_NEED_ACK;
667 #if 0
668 if (!err && (cmd->cmd == DNET_CMD_WRITE)) {
669 dnet_update_notify(st, cmd, a, data);
671 #endif
672 break;
675 dnet_stat_inc(st->stat, cmd->cmd, err);
676 if (st->__join_state == DNET_JOIN)
677 dnet_counter_inc(n, cmd->cmd, err);
678 else
679 dnet_counter_inc(n, cmd->cmd + __DNET_CMD_MAX, err);
681 gettimeofday(&end, NULL);
683 diff = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
684 dnet_log(n, DNET_LOG_INFO, "%s: %s: trans: %llu, cflags: %llx, time: %ld usecs, err: %d.\n",
685 dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), tid,
686 (unsigned long long)cmd->flags, diff, err);
688 if (cmd->flags & DNET_FLAGS_NEED_ACK) {
689 struct dnet_cmd ack;
691 memcpy(&ack.id, &cmd->id, sizeof(struct dnet_id));
692 ack.cmd = cmd->cmd;
693 ack.trans = cmd->trans | DNET_TRANS_REPLY;
694 ack.size = 0;
695 ack.flags = cmd->flags & ~(DNET_FLAGS_NEED_ACK | DNET_FLAGS_MORE);
696 ack.status = err;
698 dnet_log(n, DNET_LOG_DEBUG, "%s: ack trans: %llu, flags: %llx, status: %d.\n",
699 dnet_dump_id(&cmd->id), tid, (unsigned long long)ack.flags, err);
701 dnet_convert_cmd(&ack);
702 err = dnet_send(st, &ack, sizeof(struct dnet_cmd));
705 if (!(cmd->flags & DNET_FLAGS_NOLOCK))
706 dnet_opunlock(n, &cmd->id);
708 return err;
711 int dnet_state_join_nolock(struct dnet_net_state *st)
713 int err;
714 struct dnet_node *n = st->n;
715 struct dnet_net_state *base;
716 struct dnet_id id;
718 base = dnet_state_search_nolock(n, &n->id);
719 if (!base) {
720 err = -ENOENT;
721 goto err_out_exit;
724 /* we do not care about group_id actually, since use direct send */
725 memcpy(&id, &n->id, sizeof(id));
727 err = dnet_send_idc(base, st, &id, 0, DNET_CMD_JOIN, 0, 1, 0);
728 if (err) {
729 dnet_log(n, DNET_LOG_ERROR, "%s: failed to send join request to %s.\n",
730 dnet_dump_id(&id), dnet_server_convert_dnet_addr(&st->addr));
731 goto err_out_put;
734 st->__join_state = DNET_JOIN;
735 dnet_log(n, DNET_LOG_INFO, "%s: successfully joined network, group %d.\n", dnet_dump_id(&id), id.group_id);
737 err_out_put:
738 /* this is dangerous, since base can go away and we will destroy it here,
739 * which in turn will call dnet_state_remove(), which will deadlock with n->state_lock already being held
741 * FIXME
743 dnet_state_put(base);
744 err_out_exit:
745 return err;
748 int64_t dnet_get_param(struct dnet_node *n, struct dnet_id *id, enum id_params param)
750 struct dnet_net_state *st;
751 int64_t ret = 1;
753 st = dnet_state_get_first(n, id);
754 if (!st)
755 return -ENOENT;
757 switch (param) {
758 case DNET_ID_PARAM_LA:
759 ret = st->la;
760 break;
761 case DNET_ID_PARAM_FREE_SPACE:
762 ret = st->free;
763 break;
764 default:
765 break;
767 dnet_state_put(st);
769 return ret;
772 static int dnet_compare_by_param(const void *id1, const void *id2)
774 const struct dnet_id_param *l1 = id1;
775 const struct dnet_id_param *l2 = id2;
777 if (l1->param == l2->param)
778 return l1->param_reserved - l2->param_reserved;
780 return l1->param - l2->param;
783 int dnet_generate_ids_by_param(struct dnet_node *n, struct dnet_id *id, enum id_params param, struct dnet_id_param **dst)
785 int i, err = 0, group_num = 0;
786 struct dnet_id_param *ids;
787 struct dnet_group *g;
789 if (n->group_num) {
790 pthread_mutex_lock(&n->group_lock);
791 if (n->group_num) {
792 group_num = n->group_num;
794 ids = malloc(group_num * sizeof(struct dnet_id_param));
795 if (!ids) {
796 err = -ENOMEM;
797 goto err_out_unlock_group;
799 for (i=0; i<group_num; ++i)
800 ids[i].group_id = n->groups[i];
802 err_out_unlock_group:
803 pthread_mutex_unlock(&n->group_lock);
804 if (err)
805 goto err_out_exit;
808 if (!group_num) {
809 int pos = 0;
811 pthread_mutex_lock(&n->state_lock);
812 list_for_each_entry(g, &n->group_list, group_entry)
813 group_num++;
815 ids = malloc(group_num * sizeof(struct dnet_id_param));
816 if (!ids) {
817 err = -ENOMEM;
818 goto err_out_unlock_state;
821 list_for_each_entry(g, &n->group_list, group_entry) {
822 ids[pos].group_id = g->group_id;
823 pos++;
825 err_out_unlock_state:
826 pthread_mutex_unlock(&n->state_lock);
827 if (err)
828 goto err_out_exit;
831 for (i=0; i<group_num; ++i) {
832 id->group_id = ids[i].group_id;
833 ids[i].param = dnet_get_param(n, id, param);
836 qsort(ids, group_num, sizeof(struct dnet_id_param), dnet_compare_by_param);
837 *dst = ids;
839 for (i=0; i<group_num; ++i) {
840 id->group_id = ids[i].group_id;
843 err = group_num;
845 err_out_exit:
846 return err;
849 static int dnet_populate_cache(struct dnet_node *n, struct dnet_cmd *cmd, struct dnet_io_attr *io,
850 void *data, int fd, size_t fd_offset, size_t size)
852 void *orig_data = data;
853 ssize_t err;
855 if (!data && fd >= 0) {
856 ssize_t tmp_size = size;
858 if (size >= n->cache_size)
859 return -ENOMEM;
861 orig_data = data = malloc(size);
862 if (!data)
863 return -ENOMEM;
865 while (tmp_size > 0) {
866 err = pread(fd, data, tmp_size, fd_offset);
867 if (err <= 0) {
868 dnet_log_err(n, "%s: failed to populate cache: pread: offset: %zd, size: %zd",
869 dnet_dump_id(&cmd->id), fd_offset, size);
870 goto err_out_free;
873 data += err;
874 tmp_size -= err;
875 fd_offset += err;
879 cmd->cmd = DNET_CMD_WRITE;
880 err = dnet_cmd_cache_io(n->st, cmd, io, orig_data);
881 cmd->cmd = DNET_CMD_READ;
883 err_out_free:
884 if (data != orig_data)
885 free(orig_data);
887 return err;
890 int dnet_send_read_data(void *state, struct dnet_cmd *cmd, struct dnet_io_attr *io, void *data,
891 int fd, uint64_t offset, int close_on_exit)
893 struct dnet_net_state *st = state;
894 struct dnet_cmd *c;
895 struct dnet_io_attr *rio;
896 int hsize = sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr);
897 int err;
900 * A simple hack to forbid read reply sending.
901 * It is used in local stat - we do not want to send stat data
902 * back to parental client, instead server will wrap data into
903 * proper transaction reply next to this obscure packet.
905 if (io->flags & DNET_IO_FLAGS_SKIP_SENDING)
906 return 0;
908 c = malloc(hsize);
909 if (!c) {
910 err = -ENOMEM;
911 goto err_out_exit;
914 memset(c, 0, hsize);
916 rio = (struct dnet_io_attr *)(c + 1);
918 dnet_setup_id(&c->id, cmd->id.group_id, io->id);
920 c->flags = cmd->flags & ~(DNET_FLAGS_NEED_ACK | DNET_FLAGS_MORE);
921 if (cmd->flags & DNET_FLAGS_NEED_ACK)
922 c->flags |= DNET_FLAGS_MORE;
924 c->size = sizeof(struct dnet_io_attr) + io->size;
925 c->trans = cmd->trans | DNET_TRANS_REPLY;
926 c->cmd = DNET_CMD_READ;
928 memcpy(rio, io, sizeof(struct dnet_io_attr));
930 dnet_log_raw(st->n, DNET_LOG_NOTICE, "%s: %s: reply: offset: %llu, size: %llu.\n",
931 dnet_dump_id(&c->id), dnet_cmd_string(c->cmd),
932 (unsigned long long)io->offset, (unsigned long long)io->size);
934 /* only populate data which has zero offset and from column 0 */
935 if ((io->flags & DNET_IO_FLAGS_CACHE) && !io->offset && (io->type == 0)) {
936 err = dnet_populate_cache(st->n, c, rio, data, fd, offset, io->size);
939 dnet_convert_cmd(c);
940 dnet_convert_io_attr(rio);
942 if (data)
943 err = dnet_send_data(st, c, hsize, data, io->size);
944 else
945 err = dnet_send_fd(st, c, hsize, fd, offset, io->size, close_on_exit);
947 free(c);
949 err_out_exit:
950 return err;
953 void dnet_fill_addr_attr(struct dnet_node *n, struct dnet_addr_attr *attr)
955 memcpy(&attr->addr, &n->addr, sizeof(struct dnet_addr));
957 attr->sock_type = n->sock_type;
958 attr->family = n->family;
959 attr->proto = n->proto;
962 int dnet_read_file_info(struct dnet_node *n, struct dnet_id *id, struct dnet_file_info *info)
964 struct dnet_meta *m;
965 struct dnet_meta_update *mu;
966 struct dnet_meta_container mc;
967 struct dnet_raw_id raw;
968 int err;
970 memcpy(raw.id, id->id, DNET_ID_SIZE);
972 err = n->cb->meta_read(n->cb->command_private, &raw, &mc.data);
973 if (err < 0) {
974 goto err_out_exit;
976 mc.size = err;
978 m = dnet_meta_search(n, &mc, DNET_META_UPDATE);
979 if (!m) {
980 dnet_log(n, DNET_LOG_ERROR, "%s: dnet_read_file_info_verify_csum: no DNET_META_UPDATE tag in metadata\n",
981 dnet_dump_id(id));
982 err = -ENODATA;
983 goto err_out_free;
986 mu = (struct dnet_meta_update *)m->data;
987 dnet_convert_meta_update(mu);
989 info->ctime = info->mtime = mu->tm;
990 err = 0;
992 err_out_free:
993 free(mc.data);
994 err_out_exit:
995 return err;
/*
 * Resolve the path @fd refers to via /proc/self/fd/<fd>.
 *
 * On success, *datap receives a malloc()ed NUL-terminated string that the
 * caller must free, and the return value is its length including the NUL.
 * On failure, *datap is untouched and a negative errno is returned
 * (-ENAMETOOLONG if the target may not fit the 4096-byte buffer).
 */
static int dnet_fd_readlink(int fd, char **datap)
{
	char *dst, src[64];
	int dsize = 4096;
	ssize_t len;
	int err;

	snprintf(src, sizeof(src), "/proc/self/fd/%d", fd);

	dst = malloc(dsize);
	if (!dst)
		return -ENOMEM;

	/*
	 * readlink() does not NUL-terminate, and it returns the full buffer size
	 * when the target is truncated. Reserve one byte for the terminator so
	 * dst[len] can never be written out of bounds.
	 */
	len = readlink(src, dst, dsize - 1);
	if (len < 0) {
		err = -errno;
		goto err_out_free;
	}
	if (len >= dsize - 1) {
		err = -ENAMETOOLONG;
		goto err_out_free;
	}

	dst[len] = '\0';
	*datap = dst;

	return (int)len + 1; /* including 0-byte */

err_out_free:
	free(dst);
	return err;
}
1027 int dnet_send_file_info(void *state, struct dnet_cmd *cmd, int fd, uint64_t offset, int64_t size)
1029 struct dnet_node *n = dnet_get_node_from_state(state);
1030 struct dnet_file_info *info;
1031 struct dnet_addr_attr *a;
1032 int flen, err;
1033 char *file;
1034 struct stat st;
1036 err = dnet_fd_readlink(fd, &file);
1037 if (err < 0)
1038 goto err_out_exit;
1040 flen = err;
1042 a = malloc(sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info) + flen);
1043 if (!a) {
1044 err = -ENOMEM;
1045 goto err_out_free_file;
1047 info = (struct dnet_file_info *)(a + 1);
1049 dnet_fill_addr_attr(n, a);
1051 err = fstat(fd, &st);
1052 if (err) {
1053 err = -errno;
1054 dnet_log(n, DNET_LOG_ERROR, "%s: EBLOB: %s: info-stat: %d: %s.\n",
1055 dnet_dump_id(&cmd->id), file, err, strerror(-err));
1056 goto err_out_free;
1059 dnet_info_from_stat(info, &st);
1060 /* this is not valid data from raw blob file stat */
1061 info->ctime.tsec = info->mtime.tsec = 0;
1063 if (cmd->flags & DNET_ATTR_META_TIMES) {
1064 err = dnet_read_file_info(n, &cmd->id, info);
1065 if ((err == -ENOENT) && (cmd->flags & DNET_ATTR_META_TIMES))
1066 err = 0;
1067 if (err)
1068 goto err_out_free;
1071 if (size >= 0)
1072 info->size = size;
1073 if (offset)
1074 info->offset = offset;
1076 if (info->size == 0) {
1077 err = -ENOENT;
1078 dnet_log(n, DNET_LOG_NOTICE, "%s: EBLOB: %s: info-stat: ZERO-FILE-SIZE, fd: %d.\n",
1079 dnet_dump_id(&cmd->id), file, fd);
1080 goto err_out_free;
1083 info->flen = flen;
1084 memcpy(info + 1, file, flen);
1086 dnet_convert_file_info(info);
1088 err = dnet_send_reply(state, cmd, a, sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info) + flen, 0);
1090 err_out_free:
1091 free(a);
1092 err_out_free_file:
1093 free(file);
1094 err_out_exit:
1095 return err;