2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
/*
 * dnet_transform - run the node's transform callback over a data buffer.
 *
 * Takes the transform embedded in the node (n->transform) and calls its
 * ->transform() hook with the transform's private pointer, @src, @size,
 * id->id, a pointer to csize and a trailing 0.  csize starts out as
 * sizeof(id->id).  The hook's return value is returned unchanged.
 *
 * NOTE(review): id->id is presumably the output buffer and csize its
 * in/out length - the callback contract is not visible here, confirm.
 */
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
/*
 * dnet_stat_local - issue a local DNET_CMD_READ for @id through the backend
 * command handler.
 *
 * Visible steps:
 *  - allocates one buffer holding a struct dnet_cmd, a struct dnet_io_attr
 *    and `size` extra bytes, logging an error when malloc() fails;
 *  - zeroes it and places the io attribute right after the command header;
 *  - copies @id into cmd->id, sets cmd->size to everything after the header,
 *    sets DNET_FLAGS_NOLOCK and DNET_CMD_READ;
 *  - sets io->size to the payload size, io->flags to
 *    DNET_IO_FLAGS_SKIP_SENDING, and copies id->id into both io->parent and
 *    io->id before dnet_convert_io_attr();
 *  - calls n->cb->command_handler(st, n->cb->command_private, cmd, io) and
 *    logs io->size together with the handler's result.
 *
 * NOTE(review): this listing omits lines (local declarations, where `size`
 * comes from, the failure branch, freeing of cmd and the return statement);
 * check them against the real file before relying on this summary.
 */
43 int dnet_stat_local(struct dnet_net_state
*st
, struct dnet_id
*id
)
45 struct dnet_node
*n
= st
->n
;
48 struct dnet_io_attr
*io
;
52 cmd_size
= size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
54 cmd
= malloc(cmd_size
);
56 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate %d bytes for local stat.\n",
57 dnet_dump_id(id
), cmd_size
);
62 memset(cmd
, 0, cmd_size
);
64 io
= (struct dnet_io_attr
*)(cmd
+ 1);
66 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
67 cmd
->size
= cmd_size
- sizeof(struct dnet_cmd
);
68 cmd
->flags
= DNET_FLAGS_NOLOCK
;
69 cmd
->cmd
= DNET_CMD_READ
;
71 io
->size
= cmd
->size
- sizeof(struct dnet_io_attr
);
73 io
->flags
= DNET_IO_FLAGS_SKIP_SENDING
;
75 memcpy(io
->parent
, id
->id
, DNET_ID_SIZE
);
76 memcpy(io
->id
, id
->id
, DNET_ID_SIZE
);
78 dnet_convert_io_attr(io
);
80 err
= n
->cb
->command_handler(st
, n
->cb
->command_private
, cmd
, io
);
81 dnet_log(n
, DNET_LOG_INFO
, "%s: local stat: io_size: %llu, err: %d.\n", dnet_dump_id(&cmd
->id
), (unsigned long long)io
->size
, err
);
/*
 * dnet_remove_local - issue a local DNET_CMD_DEL for @id through the backend
 * command handler.
 *
 * Visible steps:
 *  - allocates and zeroes a buffer of sizeof(struct dnet_cmd) +
 *    sizeof(struct dnet_io_attr), logging an error when malloc() fails;
 *  - sets cmd->size to the io attribute size, DNET_FLAGS_NOLOCK and
 *    DNET_CMD_DEL;
 *  - sets io->flags to DNET_IO_FLAGS_SKIP_SENDING and copies id->id into
 *    io->parent and io->id, then calls dnet_convert_io_attr();
 *  - calls n->cb->command_handler() with the node's own state (n->st) and
 *    logs the result at NOTICE level.
 *
 * NOTE(review): no visible line stores @id into cmd->id, yet cmd->id is what
 * the final log prints.  That assignment is probably among the lines this
 * listing drops (declarations, failure branch, free, return) - confirm.
 */
89 int dnet_remove_local(struct dnet_node
*n
, struct dnet_id
*id
)
93 struct dnet_io_attr
*io
;
96 cmd_size
= sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
98 cmd
= malloc(cmd_size
);
100 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate %d bytes for local remove.\n",
101 dnet_dump_id(id
), cmd_size
);
106 memset(cmd
, 0, cmd_size
);
108 io
= (struct dnet_io_attr
*)(cmd
+ 1);
111 cmd
->size
= cmd_size
- sizeof(struct dnet_cmd
);
112 cmd
->flags
= DNET_FLAGS_NOLOCK
;
113 cmd
->cmd
= DNET_CMD_DEL
;
115 io
->flags
= DNET_IO_FLAGS_SKIP_SENDING
;
117 memcpy(io
->parent
, id
->id
, DNET_ID_SIZE
);
118 memcpy(io
->id
, id
->id
, DNET_ID_SIZE
);
120 dnet_convert_io_attr(io
);
122 err
= n
->cb
->command_handler(n
->st
, n
->cb
->command_private
, cmd
, io
);
123 dnet_log(n
, DNET_LOG_NOTICE
, "%s: local remove: err: %d.\n", dnet_dump_id(&cmd
->id
), err
);
/*
 * dnet_send_idc_fill - serialise a state's address and ID list into @buf.
 *
 * Layout written into @buf: struct dnet_cmd, then struct dnet_addr_attr,
 * then st->idc->id_num entries of struct dnet_raw_id.
 *
 *  - zeroes the command and address part of @buf;
 *  - copies @id into cmd->id and sets cmd->size to @size minus the command
 *    header;
 *  - sets DNET_FLAGS_NOLOCK and, under conditions not shown in this listing,
 *    ORs in DNET_FLAGS_MORE, DNET_FLAGS_DIRECT and DNET_TRANS_REPLY
 *    (presumably driven by @more, @direct and @reply - confirm);
 *  - fills the address attribute from the node (sock_type, family, proto)
 *    and from st->addr;
 *  - copies every raw ID of the state and converts it with
 *    dnet_convert_raw_id(), then calls dnet_convert_addr_cmd() on @buf.
 *
 * @buf must be at least @size bytes; the caller computes @size.
 *
 * NOTE(review): `cmd` is used without a visible assignment, and the stores
 * of @trans and @command into the header are not visible either - those
 * lines are presumably dropped from this listing.
 */
132 static void dnet_send_idc_fill(struct dnet_net_state
*st
, void *buf
, int size
,
133 struct dnet_id
*id
, uint64_t trans
, unsigned int command
, int reply
, int direct
, int more
)
135 struct dnet_node
*n
= st
->n
;
136 struct dnet_cmd
*cmd
;
137 struct dnet_raw_id
*sid
;
138 struct dnet_addr_attr
*addr
;
141 memset(buf
, 0, sizeof(*cmd
) + sizeof(*addr
));
144 addr
= (struct dnet_addr_attr
*)(cmd
+ 1);
145 sid
= (struct dnet_raw_id
*)(addr
+ 1);
147 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
148 cmd
->size
= size
- sizeof(struct dnet_cmd
);
151 cmd
->flags
= DNET_FLAGS_NOLOCK
;
153 cmd
->flags
|= DNET_FLAGS_MORE
;
155 cmd
->flags
|= DNET_FLAGS_DIRECT
;
157 cmd
->trans
|= DNET_TRANS_REPLY
;
161 addr
->sock_type
= n
->sock_type
;
162 addr
->family
= n
->family
;
163 addr
->proto
= n
->proto
;
164 memcpy(&addr
->addr
, &st
->addr
, sizeof(struct dnet_addr
));
166 for (i
=0; i
<st
->idc
->id_num
; ++i
) {
167 memcpy(&sid
[i
], &st
->idc
->ids
[i
].raw
, sizeof(struct dnet_raw_id
));
168 dnet_convert_raw_id(&sid
[i
]);
171 dnet_convert_addr_cmd(buf
);
/*
 * dnet_send_idc - send the address and ID list of @orig over state @send.
 *
 * The message size is sizeof(struct dnet_addr_cmd) plus one
 * struct dnet_raw_id per ID held by @orig.  The buffer header is zeroed,
 * filled by dnet_send_idc_fill() and pushed out with dnet_send().  The time
 * spent building the message is measured with gettimeofday() and logged in
 * microseconds at INFO level.
 *
 * NOTE(review): allocation of `buf`, its error path and release, and the
 * return statement are not visible in this listing.
 */
174 static int dnet_send_idc(struct dnet_net_state
*orig
, struct dnet_net_state
*send
, struct dnet_id
*id
, uint64_t trans
,
175 unsigned int command
, int reply
, int direct
, int more
)
177 struct dnet_node
*n
= orig
->n
;
178 int size
= sizeof(struct dnet_addr_cmd
) + orig
->idc
->id_num
* sizeof(struct dnet_raw_id
);
181 struct timeval start
, end
;
184 gettimeofday(&start
, NULL
);
191 memset(buf
, 0, sizeof(struct dnet_addr_cmd
));
193 dnet_send_idc_fill(orig
, buf
, size
, id
, trans
, command
, reply
, direct
, more
);
195 gettimeofday(&end
, NULL
);
196 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
197 dnet_log(n
, DNET_LOG_INFO
, "%s: sending address %s: %ld\n", dnet_dump_id(id
), dnet_state_dump_addr(orig
), diff
);
199 err
= dnet_send(send
, buf
, size
);
/*
 * dnet_cmd_reverse_lookup - answer a reverse lookup with this node's address
 * and IDs.
 *
 * Overwrites cmd->id.group_id with the node's own group, takes the node's
 * state via dnet_node_state(), sends its ID container to the requester @st
 * as a DNET_CMD_REVERSE_LOOKUP reply (reply=1, direct=0, more=0) and drops
 * the state reference with dnet_state_put().
 *
 * NOTE(review): any NULL check on `base` and the return statement are not
 * visible in this listing.
 */
207 static int dnet_cmd_reverse_lookup(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data __unused
)
209 struct dnet_node
*n
= st
->n
;
210 struct dnet_net_state
*base
;
213 cmd
->id
.group_id
= n
->id
.group_id
;
214 base
= dnet_node_state(n
);
216 err
= dnet_send_idc(base
, st
, &cmd
->id
, cmd
->trans
, DNET_CMD_REVERSE_LOOKUP
, 1, 0, 0);
217 dnet_state_put(base
);
/*
 * dnet_check_connection - probe the address described by @a.
 *
 * Calls dnet_socket_create_addr() with the socket type, protocol, family
 * and raw sockaddr taken from @a, plus a trailing 0 argument.
 *
 * NOTE(review): what happens with the resulting socket `s` (error check,
 * close, return value) is not visible in this listing.
 */
223 static int dnet_check_connection(struct dnet_node
*n
, struct dnet_addr_attr
*a
)
227 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
228 (struct sockaddr
*)a
->addr
.addr
, a
->addr
.addr_len
, 0);
/*
 * dnet_cmd_join_client - handle a join request from a remote node.
 *
 * @data carries a struct dnet_addr_attr followed by an array of
 * struct dnet_raw_id.  Visible steps:
 *  - converts the address attribute and logs the joining client;
 *  - checks that the advertised address is reachable with
 *    dnet_check_connection() and logs an error about it;
 *  - derives the number of IDs from cmd->size and runs
 *    dnet_convert_raw_id() in a loop;
 *  - under n->state_lock unlinks the state from state_entry and
 *    storage_state_entry;
 *  - stores the advertised address in st->addr and creates the ID container
 *    with dnet_idc_create() for cmd->id.group_id, then logs the result.
 *
 * NOTE(review): the conversion loop passes &ids[0] on every iteration, so
 * only the first ID is converted, `num` times over; &ids[i] looks intended.
 * Harmless only if dnet_convert_raw_id() is a no-op - confirm and fix.
 *
 * NOTE(review): `num` is computed as cmd->size - sizeof(struct
 * dnet_addr_attr) with no visible lower-bound check on cmd->size; a short
 * packet would wrap.  The check may be in lines this listing drops.
 */
236 static int dnet_cmd_join_client(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
238 struct dnet_node
*n
= st
->n
;
239 struct dnet_addr_attr
*a
= data
;
240 struct dnet_raw_id
*ids
;
243 dnet_convert_addr_attr(a
);
245 dnet_log(n
, DNET_LOG_DEBUG
, "%s: accepted joining client (%s), requesting statistics.\n",
246 dnet_dump_id(&cmd
->id
), dnet_server_convert_dnet_addr(&a
->addr
));
247 err
= dnet_check_connection(n
, a
);
249 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to request statistics from joining client (%s), dropping connection.\n",
250 dnet_dump_id(&cmd
->id
), dnet_server_convert_dnet_addr(&a
->addr
));
254 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
255 ids
= (struct dnet_raw_id
*)(a
+ 1);
256 for (i
=0; i
<num
; ++i
)
257 dnet_convert_raw_id(&ids
[0]);
259 pthread_mutex_lock(&n
->state_lock
);
260 list_del_init(&st
->state_entry
);
261 list_del_init(&st
->storage_state_entry
);
262 pthread_mutex_unlock(&n
->state_lock
);
264 memcpy(&st
->addr
, &a
->addr
, sizeof(struct dnet_addr
));
265 err
= dnet_idc_create(st
, cmd
->id
.group_id
, ids
, num
);
267 dnet_log(n
, DNET_LOG_INFO
, "%s: accepted join request from state %s: %d.\n", dnet_dump_id(&cmd
->id
),
268 dnet_server_convert_dnet_addr(&a
->addr
), err
);
/*
 * dnet_cmd_route_list - send the requester the address and ID list of the
 * states known to this node.
 *
 * Works in two passes over n->group_list and each group's state_list, both
 * under n->state_lock:
 *  1. sum up, per state, id_num * sizeof(struct dnet_raw_id) +
 *     sizeof(struct dnet_addr_cmd) to size the reply buffer;
 *  2. after malloc(), walk the lists again and serialise each state with
 *     dnet_send_idc_fill() as a DNET_CMD_ROUTE_LIST reply with `more` set,
 *     stamping cmd->id.group_id with the group being walked.
 * Both passes compare st->addr with orig->addr; the branch taken on a match
 * is not shown (presumably the requester's own state is skipped - confirm).
 * The assembled buffer is sent with dnet_send(orig, orig_buf, send_size).
 *
 * NOTE(review): the lock is released between the two passes, so the state
 * lists can change after the buffer has been sized.  Whether the fill pass
 * bounds its writes against `size` is not visible in this listing - verify.
 */
273 static int dnet_cmd_route_list(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
)
275 struct dnet_node
*n
= orig
->n
;
276 struct dnet_net_state
*st
;
277 struct dnet_group
*g
;
278 void *buf
, *orig_buf
;
279 size_t size
= 0, send_size
= 0, sz
;
282 pthread_mutex_lock(&n
->state_lock
);
283 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
284 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
285 if (!memcmp(&st
->addr
, &orig
->addr
, sizeof(struct dnet_addr
)))
288 size
+= st
->idc
->id_num
* sizeof(struct dnet_raw_id
) + sizeof(struct dnet_addr_cmd
);
291 pthread_mutex_unlock(&n
->state_lock
);
293 orig_buf
= buf
= malloc(size
);
299 pthread_mutex_lock(&n
->state_lock
);
300 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
301 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
302 if (!memcmp(&st
->addr
, &orig
->addr
, sizeof(struct dnet_addr
)))
305 sz
= st
->idc
->id_num
* sizeof(struct dnet_raw_id
) + sizeof(struct dnet_addr_cmd
);
307 cmd
->id
.group_id
= g
->group_id
;
308 dnet_send_idc_fill(st
, buf
, sz
, &cmd
->id
, cmd
->trans
, DNET_CMD_ROUTE_LIST
, 1, 0, 1);
317 pthread_mutex_unlock(&n
->state_lock
);
319 err
= dnet_send(orig
, orig_buf
, send_size
);
/*
 * dnet_cmd_exec - validate and dispatch an exec request.
 *
 * @data starts with a struct sph header followed by its payload.  The
 * function advances @data past the header, checks that
 * event_size + data_size + binary_size + sizeof(struct sph) equals
 * cmd->size (logging an error otherwise) and hands the header and payload
 * to dnet_cmd_exec_raw().
 *
 * NOTE(review): `data += sizeof(struct sph)` is arithmetic on a void
 * pointer, which is a GNU extension rather than standard C.
 *
 * NOTE(review): header conversion, the error code set on a size mismatch
 * and the return statement are not visible in this listing.
 */
329 static int dnet_cmd_exec(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
331 struct dnet_node
*n
= st
->n
;
332 struct sph
*e
= data
;
335 data
+= sizeof(struct sph
);
339 if (e
->event_size
+ e
->data_size
+ e
->binary_size
+ sizeof(struct sph
) != cmd
->size
) {
341 dnet_log(n
, DNET_LOG_ERROR
, "%s: invalid size: event-size %d, data-size %llu, binary-size %llu must be: %llu\n",
342 dnet_dump_id(&cmd
->id
),
344 (unsigned long long)e
->data_size
,
345 (unsigned long long)e
->binary_size
,
346 (unsigned long long)cmd
->size
);
350 err
= dnet_cmd_exec_raw(st
, cmd
, e
, data
);
/*
 * dnet_cmd_stat_count_single - reply with the per-command counters of one
 * state.
 *
 * Marks the reply as DNET_CMD_STAT_COUNT, copies st->addr into @as, sets
 * both as->num and as->cmd_num to __DNET_CMD_MAX and copies st->stat[] into
 * as->count[] entry by entry.  After dnet_convert_addr_stat() the structure
 * plus __DNET_CMD_MAX counters is sent to @orig with dnet_send_reply()
 * (last argument 1).
 *
 * @as is caller-provided scratch space and must hold at least
 * __DNET_CMD_MAX counters.
 */
356 static int dnet_cmd_stat_count_single(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, struct dnet_net_state
*st
, struct dnet_addr_stat
*as
)
360 cmd
->cmd
= DNET_CMD_STAT_COUNT
;
362 memcpy(&as
->addr
, &st
->addr
, sizeof(struct dnet_addr
));
363 as
->num
= __DNET_CMD_MAX
;
364 as
->cmd_num
= __DNET_CMD_MAX
;
366 for (i
=0; i
<as
->num
; ++i
) {
367 as
->count
[i
] = st
->stat
[i
];
370 dnet_convert_addr_stat(as
, as
->num
);
372 return dnet_send_reply(orig
, cmd
, as
, sizeof(struct dnet_addr_stat
) + __DNET_CMD_MAX
* sizeof(struct dnet_stat_count
), 1);
/*
 * dnet_cmd_stat_count_global - reply with the node-wide counters.
 *
 * Marks the reply as DNET_CMD_STAT_COUNT, copies n->addr into @as, sets
 * as->num to __DNET_CNTR_MAX and as->cmd_num to __DNET_CMD_MAX, and copies
 * all __DNET_CNTR_MAX entries of n->counters into as->count.
 *
 * When the backend provides n->cb->storage_stat, it is called to fill a
 * local stat structure `st`, whose load averages, filesystem figures and
 * VM figures overwrite the matching DNET_CNTR_* slots.
 * DNET_CNTR_NODE_FILES is taken from n->cb->meta_total_elements().
 *
 * The result is converted with dnet_convert_addr_stat() and sent to @orig
 * with dnet_send_reply() (last argument 1).
 *
 * NOTE(review): the declaration of `st`, the handling of a storage_stat
 * failure and the closing of the `if` are not visible in this listing, so
 * it is unclear which assignments are conditional.
 */
375 static int dnet_cmd_stat_count_global(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
,
376 struct dnet_node
*n
, struct dnet_addr_stat
*as
)
381 cmd
->cmd
= DNET_CMD_STAT_COUNT
;
383 memcpy(&as
->addr
, &n
->addr
, sizeof(struct dnet_addr
));
384 as
->num
= __DNET_CNTR_MAX
;
385 as
->cmd_num
= __DNET_CMD_MAX
;
387 memcpy(as
->count
, n
->counters
, sizeof(struct dnet_stat_count
) * __DNET_CNTR_MAX
);
389 if (n
->cb
->storage_stat
) {
390 err
= n
->cb
->storage_stat(n
->cb
->command_private
, &st
);
394 as
->count
[DNET_CNTR_LA1
].count
= st
.la
[0];
395 as
->count
[DNET_CNTR_LA5
].count
= st
.la
[1];
396 as
->count
[DNET_CNTR_LA15
].count
= st
.la
[2];
397 as
->count
[DNET_CNTR_BSIZE
].count
= st
.bsize
;
398 as
->count
[DNET_CNTR_FRSIZE
].count
= st
.frsize
;
399 as
->count
[DNET_CNTR_BLOCKS
].count
= st
.blocks
;
400 as
->count
[DNET_CNTR_BFREE
].count
= st
.bfree
;
401 as
->count
[DNET_CNTR_BAVAIL
].count
= st
.bavail
;
402 as
->count
[DNET_CNTR_FILES
].count
= st
.files
;
403 as
->count
[DNET_CNTR_FFREE
].count
= st
.ffree
;
404 as
->count
[DNET_CNTR_FAVAIL
].count
= st
.favail
;
405 as
->count
[DNET_CNTR_FSID
].count
= st
.fsid
;
406 as
->count
[DNET_CNTR_VM_ACTIVE
].count
= st
.vm_active
;
407 as
->count
[DNET_CNTR_VM_INACTIVE
].count
= st
.vm_inactive
;
408 as
->count
[DNET_CNTR_VM_TOTAL
].count
= st
.vm_total
;
409 as
->count
[DNET_CNTR_VM_FREE
].count
= st
.vm_free
;
410 as
->count
[DNET_CNTR_VM_CACHED
].count
= st
.vm_cached
;
411 as
->count
[DNET_CNTR_VM_BUFFERS
].count
= st
.vm_buffers
;
413 as
->count
[DNET_CNTR_NODE_FILES
].count
= n
->cb
->meta_total_elements(n
->cb
->command_private
);
415 dnet_convert_addr_stat(as
, as
->num
);
417 return dnet_send_reply(orig
, cmd
, as
, sizeof(struct dnet_addr_stat
) + __DNET_CNTR_MAX
* sizeof(struct dnet_stat_count
), 1);
/*
 * dnet_cmd_stat_count - handle DNET_CMD_STAT_COUNT.
 *
 * Allocates reply scratch space on the stack (alloca) sized for
 * __DNET_CNTR_MAX counters.  With DNET_ATTR_CNTR_GLOBAL set in cmd->flags
 * the node-wide counters are sent via dnet_cmd_stat_count_global().  The
 * visible code also walks n->state_list and n->empty_state_list under
 * n->state_lock and sends one per-state reply for each entry via
 * dnet_cmd_stat_count_single().
 *
 * NOTE(review): the same scratch buffer is reused for the per-state replies,
 * which write __DNET_CMD_MAX entries; this assumes
 * __DNET_CNTR_MAX >= __DNET_CMD_MAX - confirm.
 *
 * NOTE(review): the else branch, per-iteration error handling and the
 * return statement are not visible in this listing.
 */
420 static int dnet_cmd_stat_count(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data __unused
)
422 struct dnet_node
*n
= orig
->n
;
423 struct dnet_net_state
*st
;
424 struct dnet_addr_stat
*as
;
427 as
= alloca(sizeof(struct dnet_addr_stat
) + __DNET_CNTR_MAX
* sizeof(struct dnet_stat_count
));
433 if (cmd
->flags
& DNET_ATTR_CNTR_GLOBAL
) {
434 err
= dnet_cmd_stat_count_global(orig
, cmd
, orig
->n
, as
);
436 pthread_mutex_lock(&n
->state_lock
);
438 list_for_each_entry(st
, &n
->state_list
, state_entry
) {
439 err
= dnet_cmd_stat_count_single(orig
, cmd
, st
, as
);
444 list_for_each_entry(st
, &n
->empty_state_list
, state_entry
) {
445 err
= dnet_cmd_stat_count_single(orig
, cmd
, st
, as
);
450 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_cmd_status - apply a status change request and report the resulting
 * node status.
 *
 * @data is a struct dnet_node_status.  After conversion the request is
 * logged, then:
 *  - if status_flags is not -1, DNET_STATUS_EXIT triggers
 *    dnet_set_need_exit(); DNET_STATUS_RO is tested too, but its effect is
 *    not shown in this listing;
 *  - if nflags is not -1 it replaces n->flags;
 *  - if log_level is not ~0U it replaces n->log->log_level.
 * The structure is then refilled with the node's current flags and log
 * level, status_flags is reset and has DNET_STATUS_EXIT / DNET_STATUS_RO
 * ORed back in (under conditions not shown), converted again and returned
 * to the sender with dnet_send_reply().
 *
 * NOTE(review): @cmd is tagged __unused but is dereferenced for logging
 * and passed to dnet_send_reply(); the attribute is misleading.
 */
457 static int dnet_cmd_status(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd __unused
, void *data
)
459 struct dnet_node
*n
= orig
->n
;
460 struct dnet_node_status
*st
= data
;
462 dnet_convert_node_status(st
);
464 dnet_log(n
, DNET_LOG_INFO
, "%s: status-change: nflags: %x->%x, log_level: %d->%d, "
465 "status_flags: EXIT: %d, RO: %d\n",
466 dnet_dump_id(&cmd
->id
), n
->flags
, st
->nflags
, n
->log
->log_level
, st
->log_level
,
467 !!(st
->status_flags
& DNET_STATUS_EXIT
), !!(st
->status_flags
& DNET_STATUS_RO
));
469 if (st
->status_flags
!= -1) {
470 if (st
->status_flags
& DNET_STATUS_EXIT
) {
471 dnet_set_need_exit(n
);
474 if (st
->status_flags
& DNET_STATUS_RO
) {
481 if (st
->nflags
!= -1)
482 n
->flags
= st
->nflags
;
484 if (st
->log_level
!= ~0U)
485 n
->log
->log_level
= st
->log_level
;
487 st
->nflags
= n
->flags
;
488 st
->log_level
= n
->log
->log_level
;
489 st
->status_flags
= 0;
492 st
->status_flags
|= DNET_STATUS_EXIT
;
495 st
->status_flags
|= DNET_STATUS_RO
;
497 dnet_convert_node_status(st
);
499 return dnet_send_reply(orig
, cmd
, st
, sizeof(struct dnet_node_status
), 1);
/*
 * dnet_cmd_auth - check the authentication cookie sent by a peer.
 *
 * Requires cmd->size to equal sizeof(struct dnet_auth), converts the
 * structure and compares its cookie with the node's own over
 * DNET_AUTH_COOKIE_SIZE bytes.  A mismatch is logged at ERROR level and a
 * success message is logged at INFO level.
 *
 * NOTE(review): @cmd is tagged __unused but cmd->size is read.
 * NOTE(review): memcmp() is not a constant-time comparison; consider
 * whether that matters for this cookie check.
 * NOTE(review): the error codes and any state change on success or failure
 * are not visible in this listing.
 */
502 static int dnet_cmd_auth(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd __unused
, void *data
)
504 struct dnet_node
*n
= orig
->n
;
505 struct dnet_auth
*a
= data
;
508 if (cmd
->size
!= sizeof(struct dnet_auth
)) {
513 dnet_convert_auth(a
);
514 if (memcmp(n
->cookie
, a
->cookie
, DNET_AUTH_COOKIE_SIZE
)) {
516 dnet_log(n
, DNET_LOG_ERROR
, "%s: auth cookies do not match\n", dnet_state_dump_addr(orig
));
518 dnet_log(n
, DNET_LOG_INFO
, "%s: authentication succeeded\n", dnet_state_dump_addr(orig
));
/*
 * dnet_process_cmd_raw - central dispatcher for one incoming command.
 *
 * Visible flow:
 *  1. Unless DNET_FLAGS_NOLOCK is set, the command ID is locked with
 *     dnet_oplock() for the duration of processing and released with
 *     dnet_opunlock() at the end.
 *  2. The command is dispatched to its handler: auth, status, reverse
 *     lookup, join, route list, exec, stat count, notify add/remove and
 *     list / bulk check are all visible.
 *  3. For notifications, DNET_FLAGS_NEED_ACK is cleared because the
 *     transaction stays open until the notification is removed.
 *  4. The remaining path is the I/O path.  DEL and WRITE are tested against
 *     a read-only node (n->ro); the payload must hold at least a
 *     struct dnet_io_attr.  DNET_CFG_NO_CSUM forces DNET_IO_FLAGS_NOCSUM.
 *     The cache is consulted via dnet_cmd_cache_io() when
 *     DNET_IO_FLAGS_CACHE is set or the command is not a WRITE.
 *     Compare-and-swap checks the backend checksum against io->parent.
 *     DEL and META requests go through dnet_process_meta().  Finally the
 *     backend command_handler runs; READ and WRITE drop NEED_ACK before it
 *     and get the flag back when the handler fails.  A successful WRITE
 *     calls dnet_update_notify().
 *  5. Per-state and per-node counters are updated, the elapsed time is
 *     logged, and when DNET_FLAGS_NEED_ACK is still set an acknowledgement
 *     header is built and sent with dnet_send().
 *
 * NOTE(review): the switch/case heads, most branch bodies, the declarations
 * of `io`, `ack`, `a`, `err` and `diff`, and the return statement are not
 * visible in this listing; the branch outcomes above are read from the
 * surrounding comments and log strings only.
 *
 * NOTE(review): `err` is overwritten by the result of sending the ack, so
 * the handler's status is lost for whatever is returned afterwards -
 * confirm this is intended.
 */
525 int dnet_process_cmd_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
528 unsigned long long size
= cmd
->size
;
529 struct dnet_node
*n
= st
->n
;
530 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
531 struct dnet_io_attr
*io
;
532 struct timeval start
, end
;
535 if (!(cmd
->flags
& DNET_FLAGS_NOLOCK
)) {
536 dnet_oplock(n
, &cmd
->id
);
539 gettimeofday(&start
, NULL
);
543 err
= dnet_cmd_auth(st
, cmd
, data
);
545 case DNET_CMD_STATUS
:
546 err
= dnet_cmd_status(st
, cmd
, data
);
548 case DNET_CMD_REVERSE_LOOKUP
:
549 err
= dnet_cmd_reverse_lookup(st
, cmd
, data
);
552 err
= dnet_cmd_join_client(st
, cmd
, data
);
554 case DNET_CMD_ROUTE_LIST
:
555 err
= dnet_cmd_route_list(st
, cmd
);
558 err
= dnet_cmd_exec(st
, cmd
, data
);
560 case DNET_CMD_STAT_COUNT
:
561 err
= dnet_cmd_stat_count(st
, cmd
, data
);
563 case DNET_CMD_NOTIFY
:
564 if (!(cmd
->flags
& DNET_ATTR_DROP_NOTIFICATION
)) {
565 err
= dnet_notify_add(st
, cmd
);
567 * We drop 'need ack' flag, since notification
568 * transaction is a long-living one, since
569 * every notification will be sent as transaction
572 * Transaction acknowledge will be sent when
573 * notification is removed.
576 cmd
->flags
&= ~DNET_FLAGS_NEED_ACK
;
578 err
= dnet_notify_remove(st
, cmd
);
584 if (cmd
->flags
& DNET_ATTR_BULK_CHECK
)
585 err
= dnet_cmd_bulk_check(st
, cmd
, data
);
587 err
= dnet_db_list(st
, cmd
);
593 if (n
->ro
&& ((cmd
->cmd
== DNET_CMD_DEL
) || (cmd
->cmd
== DNET_CMD_WRITE
))) {
599 if (size
< sizeof(struct dnet_io_attr
)) {
600 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: invalid size: cmd: %u, rest_size: %llu\n",
601 dnet_dump_id(&cmd
->id
), cmd
->cmd
, size
);
606 dnet_convert_io_attr(io
);
608 dnet_log(n
, DNET_LOG_INFO
, "%s: %s io command, offset: %llu, size: %llu, ioflags: %x, cflags: %llx, "
609 "node-flags: %x, type: %d\n",
610 dnet_dump_id_str(io
->id
), dnet_cmd_string(cmd
->cmd
),
611 (unsigned long long)io
->offset
, (unsigned long long)io
->size
,
612 io
->flags
, (unsigned long long)cmd
->flags
,
615 if (n
->flags
& DNET_CFG_NO_CSUM
)
616 io
->flags
|= DNET_IO_FLAGS_NOCSUM
;
618 /* do not write metadata for cache-only writes */
619 if ((io
->flags
& DNET_IO_FLAGS_CACHE_ONLY
) && (io
->type
== EBLOB_TYPE_META
)) {
625 * Only allow cache for column 0
626 * In the next life (2012 I really expect) there will be no columns at all
630 * Always check cache when reading!
632 if ((io
->flags
& DNET_IO_FLAGS_CACHE
) || (cmd
->cmd
!= DNET_CMD_WRITE
)) {
633 err
= dnet_cmd_cache_io(st
, cmd
, io
, data
+ sizeof(struct dnet_io_attr
));
635 if (io
->flags
& DNET_IO_FLAGS_CACHE_ONLY
)
639 * We successfully read data from cache, do not sink to disk for it
641 if ((cmd
->cmd
== DNET_CMD_READ
) && !err
)
646 if (io
->flags
& DNET_IO_FLAGS_COMPARE_AND_SWAP
) {
647 char csum
[DNET_ID_SIZE
];
648 int csize
= DNET_ID_SIZE
;
650 err
= n
->cb
->checksum(n
, n
->cb
->command_private
, &cmd
->id
, csum
, &csize
);
652 dnet_log(n
, DNET_LOG_ERROR
, "%s: cas: checksum operation failed\n", dnet_dump_id(&cmd
->id
));
655 if (memcmp(csum
, io
->parent
, DNET_ID_SIZE
)) {
656 dnet_log(n
, DNET_LOG_ERROR
, "%s: cas: checksum mismatch\n", dnet_dump_id(&cmd
->id
));
663 if ((cmd
->cmd
== DNET_CMD_DEL
) || (io
->flags
& DNET_IO_FLAGS_META
)) {
664 err
= dnet_process_meta(st
, cmd
, data
);
668 dnet_convert_io_attr(io
);
670 /* Remove DNET_FLAGS_NEED_ACK flags for WRITE command
671 to eliminate double reply packets
672 (the first one with dnet_file_info structure,
673 the second to destroy transaction on client side) */
674 if ((cmd
->cmd
== DNET_CMD_WRITE
) || (cmd
->cmd
== DNET_CMD_READ
)) {
675 cmd
->flags
&= ~DNET_FLAGS_NEED_ACK
;
677 err
= n
->cb
->command_handler(st
, n
->cb
->command_private
, cmd
, data
);
679 /* If there was error in WRITE command - send empty reply
680 to notify client with error code and destroy transaction */
681 if (err
&& ((cmd
->cmd
== DNET_CMD_WRITE
) || (cmd
->cmd
== DNET_CMD_READ
))) {
682 cmd
->flags
|= DNET_FLAGS_NEED_ACK
;
685 if (!err
&& (cmd
->cmd
== DNET_CMD_WRITE
)) {
686 dnet_update_notify(st
, cmd
, a
, data
);
692 dnet_stat_inc(st
->stat
, cmd
->cmd
, err
);
693 if (st
->__join_state
== DNET_JOIN
)
694 dnet_counter_inc(n
, cmd
->cmd
, err
);
696 dnet_counter_inc(n
, cmd
->cmd
+ __DNET_CMD_MAX
, err
);
698 gettimeofday(&end
, NULL
);
700 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + (end
.tv_usec
- start
.tv_usec
);
701 dnet_log(n
, DNET_LOG_INFO
, "%s: %s: trans: %llu, cflags: %llx, time: %ld usecs, err: %d.\n",
702 dnet_dump_id(&cmd
->id
), dnet_cmd_string(cmd
->cmd
), tid
,
703 (unsigned long long)cmd
->flags
, diff
, err
);
705 if (cmd
->flags
& DNET_FLAGS_NEED_ACK
) {
708 memcpy(&ack
.id
, &cmd
->id
, sizeof(struct dnet_id
));
710 ack
.trans
= cmd
->trans
| DNET_TRANS_REPLY
;
712 ack
.flags
= cmd
->flags
& ~(DNET_FLAGS_NEED_ACK
| DNET_FLAGS_MORE
);
715 dnet_log(n
, DNET_LOG_DEBUG
, "%s: ack trans: %llu, flags: %llx, status: %d.\n",
716 dnet_dump_id(&cmd
->id
), tid
, (unsigned long long)ack
.flags
, err
);
718 dnet_convert_cmd(&ack
);
719 err
= dnet_send(st
, &ack
, sizeof(struct dnet_cmd
));
722 if (!(cmd
->flags
& DNET_FLAGS_NOLOCK
))
723 dnet_opunlock(n
, &cmd
->id
);
/*
 * dnet_state_join_nolock - send a join request for this node over @st.
 *
 * Looks up the state that owns the node's own ID with
 * dnet_state_search_nolock(), copies n->id into a local `id`, and sends that
 * state's ID container over @st as a direct DNET_CMD_JOIN (trans 0, reply 0,
 * direct 1, more 0).  A failure is logged at ERROR level; otherwise
 * st->__join_state becomes DNET_JOIN and the join is logged.  The reference
 * on the looked-up state is dropped with dnet_state_put().
 *
 * The embedded comment warns that dropping the last reference here can
 * re-enter dnet_state_remove() while n->state_lock is already held.
 *
 * NOTE(review): the NULL check on `base`, the error branch structure and
 * the return statement are not visible in this listing.
 */
728 int dnet_state_join_nolock(struct dnet_net_state
*st
)
731 struct dnet_node
*n
= st
->n
;
732 struct dnet_net_state
*base
;
735 base
= dnet_state_search_nolock(n
, &n
->id
);
741 /* we do not care about group_id actually, since use direct send */
742 memcpy(&id
, &n
->id
, sizeof(id
));
744 err
= dnet_send_idc(base
, st
, &id
, 0, DNET_CMD_JOIN
, 0, 1, 0);
746 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to send join request to %s.\n",
747 dnet_dump_id(&id
), dnet_server_convert_dnet_addr(&st
->addr
));
751 st
->__join_state
= DNET_JOIN
;
752 dnet_log(n
, DNET_LOG_INFO
, "%s: successfully joined network, group %d.\n", dnet_dump_id(&id
), id
.group_id
);
755 /* this is dangerous, since base can go away and we will destroy it here,
756 * which in turn will call dnet_state_remove(), which will deadlock with n->state_lock already being held
760 dnet_state_put(base
);
/*
 * dnet_get_param - return a numeric parameter of the state serving @id.
 *
 * Resolves the state with dnet_state_get_first() and switches on @param;
 * DNET_ID_PARAM_LA and DNET_ID_PARAM_FREE_SPACE are the visible cases.
 *
 * NOTE(review): the case bodies, the handling of a missing state and the
 * return statement are not visible in this listing.
 */
766 int64_t dnet_get_param(struct dnet_node *n, struct dnet_id *id, enum id_params param)
768 struct dnet_net_state *st;
771 st = dnet_state_get_first(n, id);
776 case DNET_ID_PARAM_LA:
779 case DNET_ID_PARAM_FREE_SPACE:
/*
 * dnet_compare_by_param - qsort() comparator for struct dnet_id_param.
 *
 * Orders by ->param and falls back to ->param_reserved when the params are
 * equal.
 *
 * NOTE(review): the result is a subtraction narrowed to int.  ->param is
 * filled from dnet_get_param(), which returns int64_t, so a large
 * difference can overflow or truncate and flip the sign.  An explicit
 * three-way comparison (a < b ? -1 : a > b) would be safe - confirm the
 * field types and fix.
 */
790 static int dnet_compare_by_param(const void *id1, const void *id2)
792 const struct dnet_id_param *l1 = id1;
793 const struct dnet_id_param *l2 = id2;
795 if (l1->param == l2->param)
796 return l1->param_reserved - l2->param_reserved;
798 return l1->param - l2->param;
/*
 * dnet_generate_ids_by_param - build a list of groups sorted by a parameter.
 *
 * Two ways of collecting group IDs are visible:
 *  - under n->group_lock, from the node's configured n->groups[] array
 *    (n->group_num entries);
 *  - under n->state_lock, by walking n->group_list.
 * For every collected group, id->group_id is set and dnet_get_param() is
 * stored in ids[i].param; the array is then sorted with qsort() using
 * dnet_compare_by_param().
 *
 * NOTE(review): the condition choosing between the two collection paths,
 * the allocation failure handling, what the last loop does, the store to
 * *dst and the return statement are not visible in this listing.
 */
801 /* TODO: remove this function
802 int dnet_generate_ids_by_param(struct dnet_node *n, struct dnet_id *id, enum id_params param, struct dnet_id_param **dst)
804 int i, err = 0, group_num = 0;
805 struct dnet_id_param *ids;
806 struct dnet_group *g;
809 pthread_mutex_lock(&n->group_lock);
811 group_num = n->group_num;
813 ids = malloc(group_num * sizeof(struct dnet_id_param));
816 goto err_out_unlock_group;
818 for (i=0; i<group_num; ++i)
819 ids[i].group_id = n->groups[i];
821 err_out_unlock_group:
822 pthread_mutex_unlock(&n->group_lock);
830 pthread_mutex_lock(&n->state_lock);
831 list_for_each_entry(g, &n->group_list, group_entry)
834 ids = malloc(group_num * sizeof(struct dnet_id_param));
837 goto err_out_unlock_state;
840 list_for_each_entry(g, &n->group_list, group_entry) {
841 ids[pos].group_id = g->group_id;
844 err_out_unlock_state:
845 pthread_mutex_unlock(&n->state_lock);
850 for (i=0; i<group_num; ++i) {
851 id->group_id = ids[i].group_id;
852 ids[i].param = dnet_get_param(n, id, param);
855 qsort(ids, group_num, sizeof(struct dnet_id_param), dnet_compare_by_param);
858 for (i=0; i<group_num; ++i) {
859 id->group_id = ids[i].group_id;
/*
 * dnet_populate_cache - push freshly read data into the cache.
 *
 * When no in-memory @data is supplied but @fd is valid, the object is
 * compared against n->cache_size, a buffer of @size bytes is allocated and
 * filled with pread() from @fd_offset in a loop, and a pread failure is
 * logged.  The command is then temporarily relabelled DNET_CMD_WRITE so
 * that dnet_cmd_cache_io() stores the data, and restored to DNET_CMD_READ.
 *
 * NOTE(review): the log format uses %zd for fd_offset and size, which are
 * both size_t; %zu is the matching conversion.
 *
 * NOTE(review): what happens when size >= n->cache_size, how the pread loop
 * advances, and the body of the final `data != orig_data` test are not
 * visible in this listing.
 */
869 static int dnet_populate_cache(struct dnet_node
*n
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
,
870 void *data
, int fd
, size_t fd_offset
, size_t size
)
872 void *orig_data
= data
;
875 if (!data
&& fd
>= 0) {
876 ssize_t tmp_size
= size
;
878 if (size
>= n
->cache_size
)
881 orig_data
= data
= malloc(size
);
885 while (tmp_size
> 0) {
886 err
= pread(fd
, data
, tmp_size
, fd_offset
);
888 dnet_log_err(n
, "%s: failed to populate cache: pread: offset: %zd, size: %zd",
889 dnet_dump_id(&cmd
->id
), fd_offset
, size
);
899 cmd
->cmd
= DNET_CMD_WRITE
;
900 err
= dnet_cmd_cache_io(n
->st
, cmd
, io
, orig_data
);
901 cmd
->cmd
= DNET_CMD_READ
;
904 if (data
!= orig_data
)
/*
 * dnet_send_read_data - send a READ reply carrying object data.
 *
 * Builds a reply header (struct dnet_cmd followed by struct dnet_io_attr):
 *  - the reply ID is made from the request's group and io->id;
 *  - NEED_ACK and MORE are stripped from the request flags, and MORE is set
 *    again when the request asked for an ack;
 *  - size covers the io attribute plus io->size bytes, trans carries
 *    DNET_TRANS_REPLY and the command is DNET_CMD_READ;
 *  - the io attribute is copied from @io.
 * When DNET_IO_FLAGS_CACHE is set, the offset is zero and io->type is 0,
 * the data is also pushed into the cache via dnet_populate_cache().
 * The payload goes out with dnet_send_data() (from @data) or with
 * dnet_send_fd() (from @fd at @offset, passing @close_on_exit on).
 *
 * DNET_IO_FLAGS_SKIP_SENDING is tested first; per the embedded comment it
 * suppresses the reply for local stat requests.
 *
 * NOTE(review): allocation and release of the header `c`, the choice
 * between the two send paths, error handling and the return statement are
 * not visible in this listing.
 */
910 int dnet_send_read_data(void *state
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
, void *data
,
911 int fd
, uint64_t offset
, int close_on_exit
)
913 struct dnet_net_state
*st
= state
;
915 struct dnet_io_attr
*rio
;
916 int hsize
= sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
920 * A simple hack to forbid read reply sending.
921 * It is used in local stat - we do not want to send stat data
922 * back to parental client, instead server will wrap data into
923 * proper transaction reply next to this obscure packet.
925 if (io
->flags
& DNET_IO_FLAGS_SKIP_SENDING
)
936 rio
= (struct dnet_io_attr
*)(c
+ 1);
938 dnet_setup_id(&c
->id
, cmd
->id
.group_id
, io
->id
);
940 c
->flags
= cmd
->flags
& ~(DNET_FLAGS_NEED_ACK
| DNET_FLAGS_MORE
);
941 if (cmd
->flags
& DNET_FLAGS_NEED_ACK
)
942 c
->flags
|= DNET_FLAGS_MORE
;
944 c
->size
= sizeof(struct dnet_io_attr
) + io
->size
;
945 c
->trans
= cmd
->trans
| DNET_TRANS_REPLY
;
946 c
->cmd
= DNET_CMD_READ
;
948 memcpy(rio
, io
, sizeof(struct dnet_io_attr
));
950 dnet_log_raw(st
->n
, DNET_LOG_NOTICE
, "%s: %s: reply: offset: %llu, size: %llu.\n",
951 dnet_dump_id(&c
->id
), dnet_cmd_string(c
->cmd
),
952 (unsigned long long)io
->offset
, (unsigned long long)io
->size
);
954 /* only populate data which has zero offset and from column 0 */
955 if ((io
->flags
& DNET_IO_FLAGS_CACHE
) && !io
->offset
&& (io
->type
== 0)) {
956 err
= dnet_populate_cache(st
->n
, c
, rio
, data
, fd
, offset
, io
->size
);
960 dnet_convert_io_attr(rio
);
963 err
= dnet_send_data(st
, c
, hsize
, data
, io
->size
);
965 err
= dnet_send_fd(st
, c
, hsize
, fd
, offset
, io
->size
, close_on_exit
);
/*
 * dnet_fill_addr_attr - describe this node's listening address.
 *
 * Copies n->addr into attr->addr and mirrors the node's sock_type, family
 * and proto into @attr.  No byte-order conversion is done in the visible
 * lines.
 */
973 void dnet_fill_addr_attr(struct dnet_node
*n
, struct dnet_addr_attr
*attr
)
975 memcpy(&attr
->addr
, &n
->addr
, sizeof(struct dnet_addr
));
977 attr
->sock_type
= n
->sock_type
;
978 attr
->family
= n
->family
;
979 attr
->proto
= n
->proto
;
/*
 * dnet_read_file_info - fill timestamps in @info from the object's metadata.
 *
 * Reads the metadata blob for @id through n->cb->meta_read() into a local
 * container, looks up the DNET_META_UPDATE record with dnet_meta_search(),
 * converts it and copies its timestamp into both info->ctime and
 * info->mtime.  A missing record is logged at ERROR level.
 *
 * NOTE(review): the log text names dnet_read_file_info_verify_csum, which
 * is not this function's name.
 * NOTE(review): error checks after meta_read(), release of mc.data and the
 * return statement are not visible in this listing.
 */
982 int dnet_read_file_info(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_file_info
*info
)
985 struct dnet_meta_update
*mu
;
986 struct dnet_meta_container mc
;
987 struct dnet_raw_id raw
;
990 memcpy(raw
.id
, id
->id
, DNET_ID_SIZE
);
992 err
= n
->cb
->meta_read(n
->cb
->command_private
, &raw
, &mc
.data
);
998 m
= dnet_meta_search(n
, &mc
, DNET_META_UPDATE
);
1000 dnet_log(n
, DNET_LOG_ERROR
, "%s: dnet_read_file_info_verify_csum: no DNET_META_UPDATE tag in metadata\n",
1006 mu
= (struct dnet_meta_update
*)m
->data
;
1007 dnet_convert_meta_update(mu
);
1009 info
->ctime
= info
->mtime
= mu
->tm
;
/*
 * dnet_fd_readlink - resolve the path behind an open descriptor.
 *
 * Formats "/proc/self/fd/<fd>", allocates a destination buffer of `dsize`
 * bytes and calls readlink() on it.  On success the returned length is the
 * link length plus one for the terminating 0-byte.
 *
 * NOTE(review): readlink() does not NUL-terminate and silently truncates at
 * the buffer size.  The termination, the truncation check, the store to
 * *datap and the error paths must be in lines this listing drops - verify.
 */
1018 static int dnet_fd_readlink(int fd
, char **datap
)
1024 snprintf(src
, sizeof(src
), "/proc/self/fd/%d", fd
);
1026 dst
= malloc(dsize
);
1032 err
= readlink(src
, dst
, dsize
);
1039 return err
+ 1; /* including 0-byte */
/*
 * dnet_send_file_info - reply with address, file info and path for an object.
 *
 * Reply layout: struct dnet_addr_attr, struct dnet_file_info, then `flen`
 * bytes of file name.
 *  - the file name comes from dnet_fd_readlink(fd);
 *  - the address part is filled by dnet_fill_addr_attr();
 *  - the info part is filled from fstat() via dnet_info_from_stat(), with
 *    ctime/mtime seconds zeroed because the raw blob stat is not meaningful
 *    for them;
 *  - with DNET_ATTR_META_TIMES set, timestamps are taken from metadata via
 *    dnet_read_file_info();
 *  - info->offset is set to @offset, and a zero info->size is logged at
 *    NOTICE level.
 * The reply is sent with dnet_send_reply() (last argument 0).
 *
 * NOTE(review): the test at original line 1085 re-checks
 * DNET_ATTR_META_TIMES inside a branch already guarded by that flag, so the
 * second half of the condition is redundant.
 * NOTE(review): the fstat failure log passes -err to strerror(), which is
 * only right if err was set to -errno in a line this listing drops.
 * NOTE(review): how @size is used, the cleanup labels and the return
 * statement are not visible in this listing.
 */
1047 int dnet_send_file_info(void *state
, struct dnet_cmd
*cmd
, int fd
, uint64_t offset
, int64_t size
)
1049 struct dnet_node
*n
= dnet_get_node_from_state(state
);
1050 struct dnet_file_info
*info
;
1051 struct dnet_addr_attr
*a
;
1056 err
= dnet_fd_readlink(fd
, &file
);
1062 a
= malloc(sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
) + flen
);
1065 goto err_out_free_file
;
1067 info
= (struct dnet_file_info
*)(a
+ 1);
1069 dnet_fill_addr_attr(n
, a
);
1071 err
= fstat(fd
, &st
);
1074 dnet_log(n
, DNET_LOG_ERROR
, "%s: EBLOB: %s: info-stat: %d: %s.\n",
1075 dnet_dump_id(&cmd
->id
), file
, err
, strerror(-err
));
1079 dnet_info_from_stat(info
, &st
);
1080 /* this is not valid data from raw blob file stat */
1081 info
->ctime
.tsec
= info
->mtime
.tsec
= 0;
1083 if (cmd
->flags
& DNET_ATTR_META_TIMES
) {
1084 err
= dnet_read_file_info(n
, &cmd
->id
, info
);
1085 if ((err
== -ENOENT
) && (cmd
->flags
& DNET_ATTR_META_TIMES
))
1094 info
->offset
= offset
;
1096 if (info
->size
== 0) {
1098 dnet_log(n
, DNET_LOG_NOTICE
, "%s: EBLOB: %s: info-stat: ZERO-FILE-SIZE, fd: %d.\n",
1099 dnet_dump_id(&cmd
->id
), file
, fd
);
1104 memcpy(info
+ 1, file
, flen
);
1106 dnet_convert_file_info(info
);
1108 err
= dnet_send_reply(state
, cmd
, a
, sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
) + flen
, 0);
/*
 * dnet_checksum_data - checksum an in-memory buffer with the node transform.
 *
 * Calls the node's transform hook with @data/@size as input, @csum as the
 * result buffer and @csize as its length, plus a trailing 0; the hook's
 * return value is returned unchanged.
 *
 * NOTE(review): @csize is an int * cast to unsigned int * for the hook;
 * callers are expected to pass a non-negative size.  Whether the hook
 * updates *csize is not visible here.
 */
1118 int dnet_checksum_data(struct dnet_node
*n
, void *csum
, int *csize
, const void *data
, uint64_t size
)
1120 struct dnet_transform
*t
= &n
->transform
;
1122 return t
->transform(t
->priv
, data
, size
, csum
, (unsigned int *)csize
, 0);
/*
 * dnet_checksum_file - checksum a byte range of a file given by path.
 *
 * Opens @file read-only, logs a failure to open it, and delegates the work
 * to dnet_checksum_fd() with the same @csum, @csize, @offset and @size.
 *
 * NOTE(review): the open() result is stored in `err` while `fd` is what is
 * passed on; the assignment between them, the close() of the descriptor
 * and the return statement are not visible in this listing.
 */
1125 int dnet_checksum_file(struct dnet_node
*n
, void *csum
, int *csize
, const char *file
, uint64_t offset
, uint64_t size
)
1129 err
= open(file
, O_RDONLY
);
1133 dnet_log_err(n
, "failed to open to be csummed file '%s'", file
);
1137 err
= dnet_checksum_fd(n
, csum
, csize
, fd
, offset
, size
);
/*
 * dnet_checksum_fd - checksum a byte range of an open descriptor.
 *
 * Stats @fd (logging a failure), maps the range with dnet_data_map() using
 * a local struct dnet_map_fd, checksums the mapped bytes with
 * dnet_checksum_data() and unmaps with dnet_data_unmap().
 *
 * NOTE(review): how the map descriptor is filled from @fd, @offset and
 * @size, how the fstat() result is used (presumably to size the range when
 * @size is 0 - confirm), the error paths and the return statement are not
 * visible in this listing.
 */
1144 int dnet_checksum_fd(struct dnet_node
*n
, void *csum
, int *csize
, int fd
, uint64_t offset
, uint64_t size
)
1147 struct dnet_map_fd m
;
1152 err
= fstat(fd
, &st
);
1155 dnet_log_err(n
, "CSUM: fd: %d", fd
);
1166 err
= dnet_data_map(&m
);
1170 err
= dnet_checksum_data(n
, csum
, csize
, m
.data
, size
);
1171 dnet_data_unmap(&m
);