2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
/*
 * dnet_stat_local() - issue a READ for @id to the local backend through
 * n->cb->command_handler() and log the resulting io size and error code.
 *
 * DNET_IO_FLAGS_SKIP_SENDING is set so the read path does not send the
 * data anywhere (see the SKIP_SENDING check in dnet_send_read_data()).
 * The command lives in a malloc()ed buffer laid out as
 * [struct dnet_cmd][struct dnet_io_attr][size extra bytes].
 *
 * NOTE(review): this view omits several lines of the function: the
 * declarations of err/cmd/cmd_size/size, the allocation-failure branch,
 * the release of cmd and the return statement.  Consult the full source
 * before relying on its exact error handling.
 */
43 int dnet_stat_local(struct dnet_net_state
*st
, struct dnet_id
*id
)
45 struct dnet_node
*n
= st
->n
;
48 struct dnet_io_attr
*io
;
/* Room for the command header, the io attribute and `size` payload bytes. */
52 cmd_size
= size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
54 cmd
= malloc(cmd_size
);
/*
 * Allocation-failure log.  NOTE(review): "%d" must match the declared
 * type of cmd_size (not visible here); use "%zu" if it is a size_t.
 */
56 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate %d bytes for local stat.\n",
57 dnet_dump_id(id
), cmd_size
);
/* Zero the whole buffer before filling selected fields. */
62 memset(cmd
, 0, cmd_size
);
/* The io attribute immediately follows the command header. */
64 io
= (struct dnet_io_attr
*)(cmd
+ 1);
66 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
67 cmd
->size
= cmd_size
- sizeof(struct dnet_cmd
);
/* dnet_process_cmd_raw() skips dnet_oplock() for commands carrying DNET_FLAGS_NOLOCK. */
68 cmd
->flags
= DNET_FLAGS_NOLOCK
;
69 cmd
->cmd
= DNET_CMD_READ
;
/* Payload length seen by the io layer: everything after the io attribute. */
71 io
->size
= cmd
->size
- sizeof(struct dnet_io_attr
);
/* Suppress the read reply; the caller only wants the handler's result. */
73 io
->flags
= DNET_IO_FLAGS_SKIP_SENDING
;
/* Both the parent and the object id are the requested id. */
75 memcpy(io
->parent
, id
->id
, DNET_ID_SIZE
);
76 memcpy(io
->id
, id
->id
, DNET_ID_SIZE
);
78 dnet_convert_io_attr(io
);
/*
 * Call the backend directly with this state rather than going through the
 * network.  io->size is logged after the call, so it reflects any update
 * the handler made to it (presumably the stored object size; verify).
 * NOTE(review): io was passed through dnet_convert_io_attr() above, so on
 * hosts where that conversion is not a no-op the logged value may be in
 * converted byte order.
 */
80 err
= n
->cb
->command_handler(st
, n
->cb
->command_private
, cmd
, io
);
81 dnet_log(n
, DNET_LOG_INFO
, "%s: local stat: io_size: %llu, err: %d.\n", dnet_dump_id(&cmd
->id
), (unsigned long long)io
->size
, err
);
/*
 * dnet_remove_local() - issue a DEL for @id to the local backend through
 * n->cb->command_handler(), using the node's own state n->st, and log the
 * result.
 *
 * The command is a malloc()ed [struct dnet_cmd][struct dnet_io_attr]
 * buffer with no payload.  It is marked DNET_FLAGS_NOLOCK and
 * DNET_IO_FLAGS_SKIP_SENDING.
 *
 * NOTE(review): lines not visible in this view include the declarations,
 * the allocation-failure branch, cleanup/return, and the lines between the
 * io pointer setup and cmd->size.  Unlike dnet_stat_local(), no copy of
 * @id into cmd->id is visible here, even though the final log prints
 * cmd->id.  Confirm that one of the hidden lines sets it.
 */
89 int dnet_remove_local(struct dnet_node
*n
, struct dnet_id
*id
)
93 struct dnet_io_attr
*io
;
/* Header-only command: no payload follows the io attribute. */
96 cmd_size
= sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
98 cmd
= malloc(cmd_size
);
/* NOTE(review): "%d" must match the (not visible) type of cmd_size. */
100 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate %d bytes for local remove.\n",
101 dnet_dump_id(id
), cmd_size
);
/* Zero the whole buffer before filling selected fields. */
106 memset(cmd
, 0, cmd_size
);
108 io
= (struct dnet_io_attr
*)(cmd
+ 1);
111 cmd
->size
= cmd_size
- sizeof(struct dnet_cmd
);
112 cmd
->flags
= DNET_FLAGS_NOLOCK
;
113 cmd
->cmd
= DNET_CMD_DEL
;
115 io
->flags
= DNET_IO_FLAGS_SKIP_SENDING
;
117 memcpy(io
->parent
, id
->id
, DNET_ID_SIZE
);
118 memcpy(io
->id
, id
->id
, DNET_ID_SIZE
);
120 dnet_convert_io_attr(io
);
/* Runs on the node's own state (n->st), not on a peer connection. */
122 err
= n
->cb
->command_handler(n
->st
, n
->cb
->command_private
, cmd
, io
);
123 dnet_log(n
, DNET_LOG_NOTICE
, "%s: local remove: err: %d.\n", dnet_dump_id(&cmd
->id
), err
);
/*
 * dnet_send_idc_fill() - serialize the address and id list of @st into @buf.
 *
 * Layout written into @buf, which must hold at least @size bytes:
 *   struct dnet_cmd        - header; cmd->id = *@id,
 *                            cmd->size = size - sizeof(struct dnet_cmd)
 *   struct dnet_addr_attr  - st->addr plus the node's sock_type/family/proto
 *   struct dnet_raw_id[N]  - N = st->idc->id_num ids copied from st->idc->ids
 * The whole buffer is then passed to dnet_convert_addr_cmd().
 *
 * Callers size @buf as sizeof(struct dnet_addr_cmd) +
 * id_num * sizeof(struct dnet_raw_id) (see dnet_send_idc() and
 * dnet_cmd_route_list()).
 *
 * NOTE(review): this view omits the line that points `cmd` at @buf, the
 * assignments of cmd->trans/cmd->cmd, the declaration of `i`, and the
 * conditions guarding the MORE/DIRECT/REPLY updates.  Those conditions are
 * presumably the @more/@direct/@reply arguments; verify.
 */
132 static void dnet_send_idc_fill(struct dnet_net_state
*st
, void *buf
, int size
,
133 struct dnet_id
*id
, uint64_t trans
, unsigned int command
, int reply
, int direct
, int more
)
135 struct dnet_node
*n
= st
->n
;
136 struct dnet_cmd
*cmd
;
137 struct dnet_raw_id
*sid
;
138 struct dnet_addr_attr
*addr
;
/* Only the cmd + addr headers are cleared; the id slots are overwritten by the copy loop below. */
141 memset(buf
, 0, sizeof(*cmd
) + sizeof(*addr
));
/* The address attribute follows the command header; the raw ids follow the address attribute. */
144 addr
= (struct dnet_addr_attr
*)(cmd
+ 1);
145 sid
= (struct dnet_raw_id
*)(addr
+ 1);
147 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
148 cmd
->size
= size
- sizeof(struct dnet_cmd
);
151 cmd
->flags
= DNET_FLAGS_NOLOCK
;
153 cmd
->flags
|= DNET_FLAGS_MORE
;
155 cmd
->flags
|= DNET_FLAGS_DIRECT
;
157 cmd
->trans
|= DNET_TRANS_REPLY
;
/* Advertise st's address together with this node's socket parameters. */
161 addr
->sock_type
= n
->sock_type
;
162 addr
->family
= n
->family
;
163 addr
->proto
= n
->proto
;
164 memcpy(&addr
->addr
, &st
->addr
, sizeof(struct dnet_addr
));
/* Append every id owned by st, converting each entry in place. */
166 for (i
=0; i
<st
->idc
->id_num
; ++i
) {
167 memcpy(&sid
[i
], &st
->idc
->ids
[i
].raw
, sizeof(struct dnet_raw_id
));
168 dnet_convert_raw_id(&sid
[i
]);
171 dnet_convert_addr_cmd(buf
);
/*
 * dnet_send_idc() - send the address and id list of @orig to the peer @send.
 *
 * Computes the message size (struct dnet_addr_cmd plus orig->idc->id_num
 * raw ids), fills the buffer with dnet_send_idc_fill(), and transmits it
 * with dnet_send().  The time spent preparing the message is logged in
 * microseconds.
 *
 * NOTE(review): the buffer allocation/release, the declarations of buf,
 * err and diff, and the return statement are not visible in this view.
 * The size is computed in size_t arithmetic but stored in an int.
 */
174 static int dnet_send_idc(struct dnet_net_state
*orig
, struct dnet_net_state
*send
, struct dnet_id
*id
, uint64_t trans
,
175 unsigned int command
, int reply
, int direct
, int more
)
177 struct dnet_node
*n
= orig
->n
;
178 int size
= sizeof(struct dnet_addr_cmd
) + orig
->idc
->id_num
* sizeof(struct dnet_raw_id
);
181 struct timeval start
, end
;
184 gettimeofday(&start
, NULL
);
/* Appears redundant with the header memset inside dnet_send_idc_fill(). */
191 memset(buf
, 0, sizeof(struct dnet_addr_cmd
));
193 dnet_send_idc_fill(orig
, buf
, size
, id
, trans
, command
, reply
, direct
, more
);
195 gettimeofday(&end
, NULL
);
/* Elapsed microseconds.  NOTE(review): "%ld" below assumes diff is a long. */
196 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
197 dnet_log(n
, DNET_LOG_INFO
, "%s: sending address %s: %ld\n", dnet_dump_id(id
), dnet_state_dump_addr(orig
), diff
);
199 err
= dnet_send(send
, buf
, size
);
/*
 * dnet_cmd_reverse_lookup() - answer a REVERSE_LOOKUP request from @st.
 *
 * Stamps the node's group id into cmd->id and obtains the node's own
 * state via dnet_node_state().  It then replies to @st with that state's
 * address and id list using dnet_send_idc() (reply=1, direct=0, more=0),
 * keeping the request's transaction number.  Finally it calls
 * dnet_state_put(base).
 *
 * NOTE(review): the lines between dnet_node_state() and dnet_send_idc(),
 * and the return statement, are not visible.  Check whether a NULL result
 * from dnet_node_state() is handled before it is used.
 */
207 static int dnet_cmd_reverse_lookup(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data __unused
)
209 struct dnet_node
*n
= st
->n
;
210 struct dnet_net_state
*base
;
213 cmd
->id
.group_id
= n
->id
.group_id
;
214 base
= dnet_node_state(n
);
216 err
= dnet_send_idc(base
, st
, &cmd
->id
, cmd
->trans
, DNET_CMD_REVERSE_LOOKUP
, 1, 0, 0);
217 dnet_state_put(base
);
/*
 * dnet_check_connection() - try to open a socket to the address described
 * by @a (its sock_type, proto, family and raw sockaddr/length) using
 * dnet_socket_create_addr().
 *
 * NOTE(review): the handling of the returned descriptor `s` (error check,
 * close) and the return value are on lines not visible in this view.  The
 * meaning of the final 0 argument is not visible here either.
 */
223 static int dnet_check_connection(struct dnet_node
*n
, struct dnet_addr_attr
*a
)
227 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
228 (struct sockaddr
*)a
->addr
.addr
, a
->addr
.addr_len
, 0);
/*
 * dnet_cmd_join_client() - accept a JOIN request from the peer @st.
 *
 * @data is a struct dnet_addr_attr describing the peer's address.  It is
 * followed by the peer's struct dnet_raw_id list, which has
 * num = (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id)
 * entries.  The handler:
 *   - converts the attribute and every id;
 *   - checks that a connection to the advertised address can be opened
 *     (dnet_check_connection()).  Failure is logged and, per the log text,
 *     the connection is dropped;
 *   - unlinks @st from the node's state lists under n->state_lock;
 *   - records the advertised address in st->addr;
 *   - builds the state's id table with dnet_idc_create() for
 *     cmd->id.group_id.
 *
 * NOTE(review): no check that cmd->size >= sizeof(struct dnet_addr_attr)
 * is visible before the attribute is read and the subtraction above is
 * performed.  Verify that the size is validated on a hidden line or by
 * the caller.
 */
236 static int dnet_cmd_join_client(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
238 struct dnet_node
*n
= st
->n
;
239 struct dnet_addr_attr
*a
= data
;
240 struct dnet_raw_id
*ids
;
243 dnet_convert_addr_attr(a
);
245 dnet_log(n
, DNET_LOG_DEBUG
, "%s: accepted joining client (%s), requesting statistics.\n",
246 dnet_dump_id(&cmd
->id
), dnet_server_convert_dnet_addr(&a
->addr
));
247 err
= dnet_check_connection(n
, a
);
249 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to request statistics from joining client (%s), dropping connection.\n",
250 dnet_dump_id(&cmd
->id
), dnet_server_convert_dnet_addr(&a
->addr
));
/* The raw ids follow the address attribute in the payload. */
254 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
255 ids
= (struct dnet_raw_id
*)(a
+ 1);
/* Convert each received id in place; the entry must be indexed by i, not 0. */
256 for (i
=0; i
<num
; ++i
)
257 dnet_convert_raw_id(&ids
[i]);
/* Detach the state from the node's lists before its id table is rebuilt. */
259 pthread_mutex_lock(&n
->state_lock
);
260 list_del_init(&st
->state_entry
);
261 list_del_init(&st
->storage_state_entry
);
262 pthread_mutex_unlock(&n
->state_lock
);
264 memcpy(&st
->addr
, &a
->addr
, sizeof(struct dnet_addr
));
265 err
= dnet_idc_create(st
, cmd
->id
.group_id
, ids
, num
);
267 dnet_log(n
, DNET_LOG_INFO
, "%s: accepted join request from state %s: %d.\n", dnet_dump_id(&cmd
->id
),
268 dnet_server_convert_dnet_addr(&a
->addr
), err
);
/*
 * dnet_cmd_route_list() - reply to @orig with the address and id list of
 * every known state in every group, packed into a single buffer.
 *
 * Pass 1 (under n->state_lock): sum the per-state message sizes.
 * The lock is then dropped while the buffer is malloc()ed.
 * Pass 2 (under n->state_lock again): fill one DNET_CMD_ROUTE_LIST reply
 * per state via dnet_send_idc_fill() (reply=1, direct=0, more=1).  Each
 * entry is stamped with its group's id through cmd->id.group_id, so cmd
 * is modified.
 * Finally, send_size bytes starting at orig_buf are sent to @orig.
 *
 * The memcmp() checks match states whose address equals orig->addr.  The
 * statement they guard is on a line not visible in this view; it is
 * presumably `continue`, skipping the requester.
 *
 * NOTE(review): because the lock is released between the passes, states
 * can be added before pass 2.  The bounds check against `size` and the
 * advancing of buf/send_size are on hidden lines; verify that both exist.
 * Otherwise pass 2 can overflow the buffer or send 0 bytes.  Also note
 * that sz is a size_t while dnet_send_idc_fill() takes an int size.
 */
273 static int dnet_cmd_route_list(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
)
275 struct dnet_node
*n
= orig
->n
;
276 struct dnet_net_state
*st
;
277 struct dnet_group
*g
;
278 void *buf
, *orig_buf
;
279 size_t size
= 0, send_size
= 0, sz
;
/* Pass 1: compute the total reply size. */
282 pthread_mutex_lock(&n
->state_lock
);
283 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
284 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
285 if (!memcmp(&st
->addr
, &orig
->addr
, sizeof(struct dnet_addr
)))
288 size
+= st
->idc
->id_num
* sizeof(struct dnet_raw_id
) + sizeof(struct dnet_addr_cmd
);
291 pthread_mutex_unlock(&n
->state_lock
);
/* orig_buf keeps the start of the buffer for sending/freeing. */
293 orig_buf
= buf
= malloc(size
);
/* Pass 2: serialize each state into the buffer. */
299 pthread_mutex_lock(&n
->state_lock
);
300 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
301 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
302 if (!memcmp(&st
->addr
, &orig
->addr
, sizeof(struct dnet_addr
)))
305 sz
= st
->idc
->id_num
* sizeof(struct dnet_raw_id
) + sizeof(struct dnet_addr_cmd
);
307 cmd
->id
.group_id
= g
->group_id
;
308 dnet_send_idc_fill(st
, buf
, sz
, &cmd
->id
, cmd
->trans
, DNET_CMD_ROUTE_LIST
, 1, 0, 1);
317 pthread_mutex_unlock(&n
->state_lock
);
319 err
= dnet_send(orig
, orig_buf
, send_size
);
/*
 * dnet_cmd_exec() - handle an EXEC command.
 *
 * @data starts with a struct sph header carrying event_size, data_size and
 * binary_size.  The request is rejected (the error path is not visible)
 * unless sizeof(struct sph) plus those three sizes equals cmd->size.
 * Otherwise it is handed to dnet_cmd_exec_raw() with @data advanced past
 * the header.
 *
 * NOTE(review): `data += sizeof(struct sph)` is arithmetic on a void
 * pointer, which is a GNU C extension.  The sum of the three sizes is not
 * checked for overflow before the comparison.  Any byte-order conversion
 * of *e happens on lines not visible in this view.
 */
329 static int dnet_cmd_exec(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
331 struct dnet_node
*n
= st
->n
;
332 struct sph
*e
= data
;
335 data
+= sizeof(struct sph
);
/* The header plus the three declared sections must account for the whole payload. */
339 if (e
->event_size
+ e
->data_size
+ e
->binary_size
+ sizeof(struct sph
) != cmd
->size
) {
341 dnet_log(n
, DNET_LOG_ERROR
, "%s: invalid size: event-size %d, data-size %llu, binary-size %llu must be: %llu\n",
342 dnet_dump_id(&cmd
->id
),
344 (unsigned long long)e
->data_size
,
345 (unsigned long long)e
->binary_size
,
346 (unsigned long long)cmd
->size
);
350 err
= dnet_cmd_exec_raw(st
, cmd
, e
, data
);
/*
 * dnet_cmd_stat_count_single() - send @orig one DNET_CMD_STAT_COUNT reply
 * carrying the per-command statistics of state @st.
 *
 * Fills @as with st->addr and st->stat[0 .. __DNET_CMD_MAX-1], converts it
 * with dnet_convert_addr_stat() and sends it with dnet_send_reply().
 * cmd->cmd is overwritten with DNET_CMD_STAT_COUNT.
 *
 * @as must have room for __DNET_CMD_MAX struct dnet_stat_count entries.
 * Returns the result of dnet_send_reply().
 */
356 static int dnet_cmd_stat_count_single(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, struct dnet_net_state
*st
, struct dnet_addr_stat
*as
)
360 cmd
->cmd
= DNET_CMD_STAT_COUNT
;
362 memcpy(&as
->addr
, &st
->addr
, sizeof(struct dnet_addr
));
363 as
->num
= __DNET_CMD_MAX
;
364 as
->cmd_num
= __DNET_CMD_MAX
;
366 for (i
=0; i
<as
->num
; ++i
) {
367 as
->count
[i
] = st
->stat
[i
];
370 dnet_convert_addr_stat(as
, as
->num
);
372 return dnet_send_reply(orig
, cmd
, as
, sizeof(struct dnet_addr_stat
) + __DNET_CMD_MAX
* sizeof(struct dnet_stat_count
), 1);
/*
 * dnet_cmd_stat_count_global() - send @orig one DNET_CMD_STAT_COUNT reply
 * carrying the node-wide counters of @n.
 *
 * Copies n->addr and n->counters[0 .. __DNET_CNTR_MAX-1] into @as.  When
 * the backend provides a storage_stat callback, the load-average,
 * filesystem and VM figures it returns overwrite the corresponding
 * DNET_CNTR_* slots, and DNET_CNTR_NODE_FILES is taken from
 * meta_total_elements().  The result is converted and sent with
 * dnet_send_reply().  cmd->cmd is overwritten with DNET_CMD_STAT_COUNT.
 *
 * @as must have room for __DNET_CNTR_MAX entries.
 *
 * NOTE(review): the declarations of `st` and `err`, the error handling
 * after storage_stat(), and the end of the if-block are not visible.
 * Confirm that meta_total_elements() is only called when it is non-NULL.
 */
375 static int dnet_cmd_stat_count_global(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
,
376 struct dnet_node
*n
, struct dnet_addr_stat
*as
)
381 cmd
->cmd
= DNET_CMD_STAT_COUNT
;
383 memcpy(&as
->addr
, &n
->addr
, sizeof(struct dnet_addr
));
384 as
->num
= __DNET_CNTR_MAX
;
385 as
->cmd_num
= __DNET_CMD_MAX
;
387 memcpy(as
->count
, n
->counters
, sizeof(struct dnet_stat_count
) * __DNET_CNTR_MAX
);
/* Overlay backend-provided system statistics onto their dedicated counter slots. */
389 if (n
->cb
->storage_stat
) {
390 err
= n
->cb
->storage_stat(n
->cb
->command_private
, &st
);
394 as
->count
[DNET_CNTR_LA1
].count
= st
.la
[0];
395 as
->count
[DNET_CNTR_LA5
].count
= st
.la
[1];
396 as
->count
[DNET_CNTR_LA15
].count
= st
.la
[2];
397 as
->count
[DNET_CNTR_BSIZE
].count
= st
.bsize
;
398 as
->count
[DNET_CNTR_FRSIZE
].count
= st
.frsize
;
399 as
->count
[DNET_CNTR_BLOCKS
].count
= st
.blocks
;
400 as
->count
[DNET_CNTR_BFREE
].count
= st
.bfree
;
401 as
->count
[DNET_CNTR_BAVAIL
].count
= st
.bavail
;
402 as
->count
[DNET_CNTR_FILES
].count
= st
.files
;
403 as
->count
[DNET_CNTR_FFREE
].count
= st
.ffree
;
404 as
->count
[DNET_CNTR_FAVAIL
].count
= st
.favail
;
405 as
->count
[DNET_CNTR_FSID
].count
= st
.fsid
;
406 as
->count
[DNET_CNTR_VM_ACTIVE
].count
= st
.vm_active
;
407 as
->count
[DNET_CNTR_VM_INACTIVE
].count
= st
.vm_inactive
;
408 as
->count
[DNET_CNTR_VM_TOTAL
].count
= st
.vm_total
;
409 as
->count
[DNET_CNTR_VM_FREE
].count
= st
.vm_free
;
410 as
->count
[DNET_CNTR_VM_CACHED
].count
= st
.vm_cached
;
411 as
->count
[DNET_CNTR_VM_BUFFERS
].count
= st
.vm_buffers
;
413 as
->count
[DNET_CNTR_NODE_FILES
].count
= n
->cb
->meta_total_elements(n
->cb
->command_private
);
415 dnet_convert_addr_stat(as
, as
->num
);
417 return dnet_send_reply(orig
, cmd
, as
, sizeof(struct dnet_addr_stat
) + __DNET_CNTR_MAX
* sizeof(struct dnet_stat_count
), 1);
/*
 * dnet_cmd_stat_count() - handle DNET_CMD_STAT_COUNT.
 *
 * With DNET_ATTR_CNTR_GLOBAL in cmd->flags, it replies with the node-wide
 * counters (dnet_cmd_stat_count_global()).  Otherwise (presumably the
 * else branch, whose keyword is not visible), it walks n->state_list and
 * n->empty_state_list under n->state_lock and sends one per-state reply
 * (dnet_cmd_stat_count_single()) for each entry.
 *
 * The reply buffer is alloca()ed for __DNET_CNTR_MAX counters and reused
 * for every reply.  dnet_cmd_stat_count_single() writes __DNET_CMD_MAX of
 * them, so this relies on __DNET_CMD_MAX <= __DNET_CNTR_MAX.
 *
 * NOTE(review): replies are sent while n->state_lock is held.
 */
420 static int dnet_cmd_stat_count(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data __unused
)
422 struct dnet_node
*n
= orig
->n
;
423 struct dnet_net_state
*st
;
424 struct dnet_addr_stat
*as
;
427 as
= alloca(sizeof(struct dnet_addr_stat
) + __DNET_CNTR_MAX
* sizeof(struct dnet_stat_count
));
433 if (cmd
->flags
& DNET_ATTR_CNTR_GLOBAL
) {
434 err
= dnet_cmd_stat_count_global(orig
, cmd
, orig
->n
, as
);
436 pthread_mutex_lock(&n
->state_lock
);
438 list_for_each_entry(st
, &n
->state_list
, state_entry
) {
439 err
= dnet_cmd_stat_count_single(orig
, cmd
, st
, as
);
444 list_for_each_entry(st
, &n
->empty_state_list
, state_entry
) {
445 err
= dnet_cmd_stat_count_single(orig
, cmd
, st
, as
);
450 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_cmd_status() - handle DNET_CMD_STATUS: optionally change node
 * settings, then reply with the current ones.
 *
 * @data is a struct dnet_node_status, converted in place.  A field is
 * applied only if it differs from its "no change" sentinel:
 * status_flags != -1, nflags != -1, log_level != ~0U.
 * DNET_STATUS_EXIT calls dnet_set_need_exit().  The handling of
 * DNET_STATUS_RO is on lines not visible here.  The same structure is
 * then rewritten with the node's current flags, log level and EXIT/RO
 * status, and sent back with dnet_send_reply().
 *
 * NOTE(review): @cmd is annotated __unused but is used (cmd->id in the log
 * and as the reply header), so the annotation is misleading.
 */
457 static int dnet_cmd_status(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd __unused
, void *data
)
459 struct dnet_node
*n
= orig
->n
;
460 struct dnet_node_status
*st
= data
;
462 dnet_convert_node_status(st
);
/* Logs current -> requested values for nflags and log_level. */
464 dnet_log(n
, DNET_LOG_INFO
, "%s: status-change: nflags: %x->%x, log_level: %d->%d, "
465 "status_flags: EXIT: %d, RO: %d\n",
466 dnet_dump_id(&cmd
->id
), n
->flags
, st
->nflags
, n
->log
->log_level
, st
->log_level
,
467 !!(st
->status_flags
& DNET_STATUS_EXIT
), !!(st
->status_flags
& DNET_STATUS_RO
));
469 if (st
->status_flags
!= -1) {
470 if (st
->status_flags
& DNET_STATUS_EXIT
) {
471 dnet_set_need_exit(n
);
474 if (st
->status_flags
& DNET_STATUS_RO
) {
/* -1 / ~0U mean "leave unchanged". */
481 if (st
->nflags
!= -1)
482 n
->flags
= st
->nflags
;
484 if (st
->log_level
!= ~0U)
485 n
->log
->log_level
= st
->log_level
;
/* Reuse the request buffer for the reply: report the current settings. */
487 st
->nflags
= n
->flags
;
488 st
->log_level
= n
->log
->log_level
;
489 st
->status_flags
= 0;
/* The conditions for the two updates below are on hidden lines (presumably the node's exit/read-only state). */
492 st
->status_flags
|= DNET_STATUS_EXIT
;
495 st
->status_flags
|= DNET_STATUS_RO
;
497 dnet_convert_node_status(st
);
499 return dnet_send_reply(orig
, cmd
, st
, sizeof(struct dnet_node_status
), 1);
/*
 * dnet_cmd_auth() - verify the authentication cookie sent by @orig.
 *
 * @data must be exactly one struct dnet_auth; the size-mismatch error path
 * is not visible.  After dnet_convert_auth(), its cookie is compared with
 * n->cookie over DNET_AUTH_COOKIE_SIZE bytes and the outcome is logged.
 *
 * NOTE(review): memcmp() is not constant-time, so the comparison time can
 * reveal how many leading cookie bytes matched.  A constant-time compare
 * is preferable for secrets.  @cmd is annotated __unused but is used.
 */
502 static int dnet_cmd_auth(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd __unused
, void *data
)
504 struct dnet_node
*n
= orig
->n
;
505 struct dnet_auth
*a
= data
;
508 if (cmd
->size
!= sizeof(struct dnet_auth
)) {
513 dnet_convert_auth(a
);
514 if (memcmp(n
->cookie
, a
->cookie
, DNET_AUTH_COOKIE_SIZE
)) {
516 dnet_log(n
, DNET_LOG_ERROR
, "%s: auth cookies do not match\n", dnet_state_dump_addr(orig
));
518 dnet_log(n
, DNET_LOG_INFO
, "%s: authentication succeeded\n", dnet_state_dump_addr(orig
));
/*
 * dnet_process_cmd_raw() - execute one received command on state @st.
 *
 * Flow visible in this view:
 *   1. Take the per-id operation lock with dnet_oplock(), unless the
 *      command carries DNET_FLAGS_NOLOCK.
 *   2. Dispatch on the command.  AUTH, STATUS, REVERSE_LOOKUP,
 *      ROUTE_LIST, STAT_COUNT and NOTIFY have dedicated handlers, as do
 *      the commands served by dnet_cmd_join_client(), dnet_cmd_exec() and
 *      dnet_cmd_bulk_check()/dnet_db_list().  The switch header and some
 *      case labels are on lines not visible here.
 *   3. Everything else is treated as an IO command:
 *      - DEL/WRITE are refused on a read-only node (n->ro);
 *      - the payload must start with a struct dnet_io_attr;
 *      - the cache is consulted for every non-WRITE IO command, and for
 *        writes only with DNET_IO_FLAGS_CACHE;
 *      - DEL and META requests go through dnet_process_meta();
 *      - the backend command_handler is called with the raw payload.
 *   4. Update per-state statistics and node counters, log the elapsed
 *      time, send a bare ack if DNET_FLAGS_NEED_ACK is still set, and
 *      release the operation lock.
 */
525 int dnet_process_cmd_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
528 unsigned long long size
= cmd
->size
;
529 struct dnet_node
*n
= st
->n
;
530 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
531 struct dnet_io_attr
*io
;
532 struct timeval start
, end
;
/* Serialize operations on the same id unless the sender opted out. */
535 if (!(cmd
->flags
& DNET_FLAGS_NOLOCK
)) {
536 dnet_oplock(n
, &cmd
->id
);
539 gettimeofday(&start
, NULL
);
/* Command dispatch (the switch header and the first case label are not visible). */
543 err
= dnet_cmd_auth(st
, cmd
, data
);
545 case DNET_CMD_STATUS
:
546 err
= dnet_cmd_status(st
, cmd
, data
);
548 case DNET_CMD_REVERSE_LOOKUP
:
549 err
= dnet_cmd_reverse_lookup(st
, cmd
, data
);
552 err
= dnet_cmd_join_client(st
, cmd
, data
);
554 case DNET_CMD_ROUTE_LIST
:
555 err
= dnet_cmd_route_list(st
, cmd
);
558 err
= dnet_cmd_exec(st
, cmd
, data
);
560 case DNET_CMD_STAT_COUNT
:
561 err
= dnet_cmd_stat_count(st
, cmd
, data
);
563 case DNET_CMD_NOTIFY
:
564 if (!(cmd
->flags
& DNET_ATTR_DROP_NOTIFICATION
)) {
565 err
= dnet_notify_add(st
, cmd
);
567 * We drop 'need ack' flag, since notification
568 * transaction is a long-living one, since
569 * every notification will be sent as transaction
572 * Transaction acknowledge will be sent when
573 * notification is removed.
576 cmd
->flags
&= ~DNET_FLAGS_NEED_ACK
;
578 err
= dnet_notify_remove(st
, cmd
);
584 if (cmd
->flags
& DNET_ATTR_BULK_CHECK
)
585 err
= dnet_cmd_bulk_check(st
, cmd
, data
);
587 err
= dnet_db_list(st
, cmd
);
/* Read-only nodes refuse modifying IO commands. */
593 if (n
->ro
&& ((cmd
->cmd
== DNET_CMD_DEL
) || (cmd
->cmd
== DNET_CMD_WRITE
))) {
/* IO commands must carry at least a struct dnet_io_attr. */
599 if (size
< sizeof(struct dnet_io_attr
)) {
600 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: invalid size: cmd: %u, rest_size: %llu\n",
601 dnet_dump_id(&cmd
->id
), cmd
->cmd
, size
);
606 dnet_convert_io_attr(io
);
608 dnet_log(n
, DNET_LOG_INFO
, "%s: %s io command, offset: %llu, size: %llu, ioflags: %x, cflags: %llx, "
609 "node-flags: %x, type: %d\n",
610 dnet_dump_id_str(io
->id
), dnet_cmd_string(cmd
->cmd
),
611 (unsigned long long)io
->offset
, (unsigned long long)io
->size
,
612 io
->flags
, (unsigned long long)cmd
->flags
,
/* A node configured with DNET_CFG_NO_CSUM forces NOCSUM on every IO. */
615 if (n
->flags
& DNET_CFG_NO_CSUM
)
616 io
->flags
|= DNET_IO_FLAGS_NOCSUM
;
618 /* do not write metadata for cache-only writes */
619 if ((io
->flags
& DNET_IO_FLAGS_CACHE_ONLY
) && (io
->type
== EBLOB_TYPE_META
)) {
625 * Only allow cache for column 0
626 * In the next life (2012 I really expect) there will be no columns at all
630 * Always check cache when reading!
632 if ((io
->flags
& DNET_IO_FLAGS_CACHE
) || (cmd
->cmd
!= DNET_CMD_WRITE
)) {
633 err
= dnet_cmd_cache_io(st
, cmd
, io
, data
+ sizeof(struct dnet_io_attr
));
635 if (io
->flags
& DNET_IO_FLAGS_CACHE_ONLY
)
639 * We successfully read data from cache, do not sink to disk for it
641 if ((cmd
->cmd
== DNET_CMD_READ
) && !err
)
646 if ((cmd
->cmd
== DNET_CMD_DEL
) || (io
->flags
& DNET_IO_FLAGS_META
)) {
647 err
= dnet_process_meta(st
, cmd
, data
);
651 dnet_convert_io_attr(io
);
653 /* Remove DNET_FLAGS_NEED_ACK flags for WRITE and READ commands
654 to eliminate double reply packets
655 (the first one with dnet_file_info structure,
656 the second to destroy transaction on client side) */
657 if ((cmd
->cmd
== DNET_CMD_WRITE
) || (cmd
->cmd
== DNET_CMD_READ
)) {
658 cmd
->flags
&= ~DNET_FLAGS_NEED_ACK
;
660 err
= n
->cb
->command_handler(st
, n
->cb
->command_private
, cmd
, data
);
662 /* If there was error in a WRITE or READ command - send empty reply
663 to notify client with error code and destroy transaction */
664 if (err
&& ((cmd
->cmd
== DNET_CMD_WRITE
) || (cmd
->cmd
== DNET_CMD_READ
))) {
665 cmd
->flags
|= DNET_FLAGS_NEED_ACK
;
/* NOTE(review): `a` is not declared in the visible lines; confirm what is passed to dnet_update_notify(). */
668 if (!err
&& (cmd
->cmd
== DNET_CMD_WRITE
)) {
669 dnet_update_notify(st
, cmd
, a
, data
);
/*
 * Per-state stats are always updated.  The first node counter set is used
 * for joined states; the second (offset by __DNET_CMD_MAX) is presumably
 * the else branch, whose keyword is not visible.
 */
675 dnet_stat_inc(st
->stat
, cmd
->cmd
, err
);
676 if (st
->__join_state
== DNET_JOIN
)
677 dnet_counter_inc(n
, cmd
->cmd
, err
);
679 dnet_counter_inc(n
, cmd
->cmd
+ __DNET_CMD_MAX
, err
);
681 gettimeofday(&end
, NULL
);
/* Elapsed time in microseconds.  NOTE(review): "%ld" below assumes diff is a long. */
683 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + (end
.tv_usec
- start
.tv_usec
);
684 dnet_log(n
, DNET_LOG_INFO
, "%s: %s: trans: %llu, cflags: %llx, time: %ld usecs, err: %d.\n",
685 dnet_dump_id(&cmd
->id
), dnet_cmd_string(cmd
->cmd
), tid
,
686 (unsigned long long)cmd
->flags
, diff
, err
);
/* Bare reply header acknowledging the transaction; NEED_ACK and MORE are cleared so it is the final packet. */
688 if (cmd
->flags
& DNET_FLAGS_NEED_ACK
) {
691 memcpy(&ack
.id
, &cmd
->id
, sizeof(struct dnet_id
));
693 ack
.trans
= cmd
->trans
| DNET_TRANS_REPLY
;
695 ack
.flags
= cmd
->flags
& ~(DNET_FLAGS_NEED_ACK
| DNET_FLAGS_MORE
);
698 dnet_log(n
, DNET_LOG_DEBUG
, "%s: ack trans: %llu, flags: %llx, status: %d.\n",
699 dnet_dump_id(&cmd
->id
), tid
, (unsigned long long)ack
.flags
, err
);
701 dnet_convert_cmd(&ack
);
702 err
= dnet_send(st
, &ack
, sizeof(struct dnet_cmd
));
/* Release the operation lock taken at the top. */
705 if (!(cmd
->flags
& DNET_FLAGS_NOLOCK
))
706 dnet_opunlock(n
, &cmd
->id
);
/*
 * dnet_state_join_nolock() - announce this node to the peer @st.
 *
 * Looks up the state responsible for the node's own id with
 * dnet_state_search_nolock().  It then sends that state's address/id list
 * to @st as a DNET_CMD_JOIN via dnet_send_idc() (trans=0, reply=0,
 * direct=1, more=0).  A failed send is logged.  st->__join_state is set
 * to DNET_JOIN and the success is logged; presumably this happens only on
 * success, since the branching lines are not visible.  Finally
 * dnet_state_put(base) is called.
 *
 * The existing comment near dnet_state_put() states that n->state_lock is
 * already held when this runs, consistent with the _nolock suffix.
 *
 * NOTE(review): the declarations of id/err, the NULL check on `base`, and
 * the return statement are on lines not visible in this view.
 */
711 int dnet_state_join_nolock(struct dnet_net_state
*st
)
714 struct dnet_node
*n
= st
->n
;
715 struct dnet_net_state
*base
;
718 base
= dnet_state_search_nolock(n
, &n
->id
);
724 /* group_id does not matter here: the join request is sent directly (direct=1) */
725 memcpy(&id
, &n
->id
, sizeof(id
));
727 err
= dnet_send_idc(base
, st
, &id
, 0, DNET_CMD_JOIN
, 0, 1, 0);
729 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to send join request to %s.\n",
730 dnet_dump_id(&id
), dnet_server_convert_dnet_addr(&st
->addr
));
734 st
->__join_state
= DNET_JOIN
;
735 dnet_log(n
, DNET_LOG_INFO
, "%s: successfully joined network, group %d.\n", dnet_dump_id(&id
), id
.group_id
);
738 /* this is dangerous, since base can go away and we will destroy it here,
739 * which in turn will call dnet_state_remove(), which will deadlock with n->state_lock already being held
743 dnet_state_put(base
);
/*
 * dnet_get_param() - return a numeric parameter (DNET_ID_PARAM_LA or
 * DNET_ID_PARAM_FREE_SPACE) of the first state responsible for @id, as
 * found by dnet_state_get_first().
 *
 * NOTE(review): the switch bodies, the NULL-state handling, any reference
 * release and the return statement are not visible in this view.
 */
748 int64_t dnet_get_param(struct dnet_node
*n
, struct dnet_id
*id
, enum id_params param
)
750 struct dnet_net_state
*st
;
753 st
= dnet_state_get_first(n
, id
);
758 case DNET_ID_PARAM_LA
:
761 case DNET_ID_PARAM_FREE_SPACE
:
772 static int dnet_compare_by_param(const void *id1
, const void *id2
)
774 const struct dnet_id_param
*l1
= id1
;
775 const struct dnet_id_param
*l2
= id2
;
777 if (l1
->param
== l2
->param
)
778 return l1
->param_reserved
- l2
->param_reserved
;
780 return l1
->param
- l2
->param
;
/*
 * dnet_generate_ids_by_param() - build an array of struct dnet_id_param,
 * one entry per group, and sort it in ascending order of @param as
 * returned by dnet_get_param() for that group.
 *
 * Two ways of collecting the groups are visible: from n->groups under
 * n->group_lock, or from n->group_list under n->state_lock.  The condition
 * that selects between them, the result handed back through @dst, and the
 * rest of the error handling are on lines not visible in this view.
 *
 * NOTE: id->group_id is overwritten while iterating, so the caller's @id
 * is modified.
 * NOTE(review): with group_num == 0, malloc(0) may legitimately return
 * NULL; confirm that the hidden NULL check does not report that case as
 * an allocation failure.
 */
783 int dnet_generate_ids_by_param(struct dnet_node
*n
, struct dnet_id
*id
, enum id_params param
, struct dnet_id_param
**dst
)
785 int i
, err
= 0, group_num
= 0;
786 struct dnet_id_param
*ids
;
787 struct dnet_group
*g
;
/* Variant 1: groups taken from the configured n->groups array. */
790 pthread_mutex_lock(&n
->group_lock
);
792 group_num
= n
->group_num
;
794 ids
= malloc(group_num
* sizeof(struct dnet_id_param
));
797 goto err_out_unlock_group
;
799 for (i
=0; i
<group_num
; ++i
)
800 ids
[i
].group_id
= n
->groups
[i
];
802 err_out_unlock_group
:
803 pthread_mutex_unlock(&n
->group_lock
);
/* Variant 2: groups taken from the n->group_list routing list (first walk counts, second fills). */
811 pthread_mutex_lock(&n
->state_lock
);
812 list_for_each_entry(g
, &n
->group_list
, group_entry
)
815 ids
= malloc(group_num
* sizeof(struct dnet_id_param
));
818 goto err_out_unlock_state
;
821 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
822 ids
[pos
].group_id
= g
->group_id
;
825 err_out_unlock_state
:
826 pthread_mutex_unlock(&n
->state_lock
);
/* Query the parameter for each group, reusing @id with the group's id. */
831 for (i
=0; i
<group_num
; ++i
) {
832 id
->group_id
= ids
[i
].group_id
;
833 ids
[i
].param
= dnet_get_param(n
, id
, param
);
836 qsort(ids
, group_num
, sizeof(struct dnet_id_param
), dnet_compare_by_param
);
839 for (i
=0; i
<group_num
; ++i
) {
840 id
->group_id
= ids
[i
].group_id
;
/*
 * dnet_populate_cache() - store an object's data into the cache.
 *
 * If @data is NULL and @fd is valid, @size bytes starting at @fd_offset
 * are read into a temporary malloc()ed buffer.  Objects of at least
 * n->cache_size bytes are presumably skipped; the statement guarded by
 * that check is not visible.  The data is then written to the cache with
 * dnet_cmd_cache_io(), temporarily switching cmd->cmd to DNET_CMD_WRITE
 * and back to DNET_CMD_READ.
 *
 * fd_offset and size are size_t, hence %zu in the pread() failure message.
 *
 * NOTE(review): only a buffer whose pointer has advanced (data !=
 * orig_data) is released at the end.  If the hidden error path can be
 * reached before any byte was read (e.g. the first pread() fails), the
 * temporary buffer would leak.  Verify against the lines not visible here.
 */
849 static int dnet_populate_cache(struct dnet_node
*n
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
,
850 void *data
, int fd
, size_t fd_offset
, size_t size
)
852 void *orig_data
= data
;
/* No in-memory copy: read the object from the file descriptor. */
855 if (!data
&& fd
>= 0) {
856 ssize_t tmp_size
= size
;
858 if (size
>= n
->cache_size
)
861 orig_data
= data
= malloc(size
);
865 while (tmp_size
> 0) {
866 err
= pread(fd
, data
, tmp_size
, fd_offset
);
868 dnet_log_err(n
, "%s: failed to populate cache: pread: offset: %zu, size: %zu",
869 dnet_dump_id(&cmd
->id
), fd_offset
, size
);
/* Insert into the cache as a WRITE, then restore the original READ command. */
879 cmd
->cmd
= DNET_CMD_WRITE
;
880 err
= dnet_cmd_cache_io(n
->st
, cmd
, io
, orig_data
);
881 cmd
->cmd
= DNET_CMD_READ
;
884 if (data
!= orig_data
)
/*
 * dnet_send_read_data() - send the reply to a READ.
 *
 * Builds a reply header of hsize bytes (struct dnet_cmd + struct
 * dnet_io_attr).  The io attribute is copied from @io, the reply is
 * addressed by @cmd's group and io->id, and it carries the request's
 * transaction number plus DNET_TRANS_REPLY.  If the request asked for an
 * ack, the reply is marked DNET_FLAGS_MORE.
 *
 * The io->size payload bytes are sent from @data (dnet_send_data()) or
 * from @fd at @offset (dnet_send_fd(), honouring @close_on_exit).  The
 * condition choosing between them is on a hidden line.
 *
 * Requests with DNET_IO_FLAGS_SKIP_SENDING are not answered (see the
 * comment below).  Zero-offset reads of type 0 with DNET_IO_FLAGS_CACHE
 * also populate the cache via dnet_populate_cache().
 *
 * NOTE(review): the allocation of `c`, the early exit for SKIP_SENDING,
 * and the cleanup/return are not visible in this view.
 */
890 int dnet_send_read_data(void *state
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
, void *data
,
891 int fd
, uint64_t offset
, int close_on_exit
)
893 struct dnet_net_state
*st
= state
;
895 struct dnet_io_attr
*rio
;
896 int hsize
= sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
);
900 * A simple hack to forbid read reply sending.
901 * It is used in local stat - we do not want to send stat data
902 * back to parental client, instead server will wrap data into
903 * proper transaction reply next to this obscure packet.
905 if (io
->flags
& DNET_IO_FLAGS_SKIP_SENDING
)
/* The reply's io attribute directly follows its command header. */
916 rio
= (struct dnet_io_attr
*)(c
+ 1);
918 dnet_setup_id(&c
->id
, cmd
->id
.group_id
, io
->id
);
/* If the client wants an ack, mark this reply as not the last packet. */
920 c
->flags
= cmd
->flags
& ~(DNET_FLAGS_NEED_ACK
| DNET_FLAGS_MORE
);
921 if (cmd
->flags
& DNET_FLAGS_NEED_ACK
)
922 c
->flags
|= DNET_FLAGS_MORE
;
924 c
->size
= sizeof(struct dnet_io_attr
) + io
->size
;
925 c
->trans
= cmd
->trans
| DNET_TRANS_REPLY
;
926 c
->cmd
= DNET_CMD_READ
;
928 memcpy(rio
, io
, sizeof(struct dnet_io_attr
));
930 dnet_log_raw(st
->n
, DNET_LOG_NOTICE
, "%s: %s: reply: offset: %llu, size: %llu.\n",
931 dnet_dump_id(&c
->id
), dnet_cmd_string(c
->cmd
),
932 (unsigned long long)io
->offset
, (unsigned long long)io
->size
);
934 /* only populate data which has zero offset and from column 0 */
935 if ((io
->flags
& DNET_IO_FLAGS_CACHE
) && !io
->offset
&& (io
->type
== 0)) {
936 err
= dnet_populate_cache(st
->n
, c
, rio
, data
, fd
, offset
, io
->size
);
/* rio is converted only after dnet_populate_cache() has used it. */
940 dnet_convert_io_attr(rio
);
943 err
= dnet_send_data(st
, c
, hsize
, data
, io
->size
);
945 err
= dnet_send_fd(st
, c
, hsize
, fd
, offset
, io
->size
, close_on_exit
);
953 void dnet_fill_addr_attr(struct dnet_node
*n
, struct dnet_addr_attr
*attr
)
955 memcpy(&attr
->addr
, &n
->addr
, sizeof(struct dnet_addr
));
957 attr
->sock_type
= n
->sock_type
;
958 attr
->family
= n
->family
;
959 attr
->proto
= n
->proto
;
/*
 * dnet_read_file_info() - fill info->ctime and info->mtime from the
 * DNET_META_UPDATE record stored in the object's metadata.
 *
 * Reads the metadata of @id with the backend meta_read callback, looks up
 * the DNET_META_UPDATE entry with dnet_meta_search(), converts it, and
 * copies its tm into both timestamps.
 *
 * NOTE(review): the error log below names
 * "dnet_read_file_info_verify_csum" rather than this function, which is
 * misleading when searching logs.  The error handling after meta_read(),
 * the declarations of err/m, and the release of mc.data are not visible.
 */
962 int dnet_read_file_info(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_file_info
*info
)
965 struct dnet_meta_update
*mu
;
966 struct dnet_meta_container mc
;
967 struct dnet_raw_id raw
;
970 memcpy(raw
.id
, id
->id
, DNET_ID_SIZE
);
972 err
= n
->cb
->meta_read(n
->cb
->command_private
, &raw
, &mc
.data
);
978 m
= dnet_meta_search(n
, &mc
, DNET_META_UPDATE
);
980 dnet_log(n
, DNET_LOG_ERROR
, "%s: dnet_read_file_info_verify_csum: no DNET_META_UPDATE tag in metadata\n",
986 mu
= (struct dnet_meta_update
*)m
->data
;
987 dnet_convert_meta_update(mu
);
989 info
->ctime
= info
->mtime
= mu
->tm
;
/*
 * dnet_fd_readlink() - resolve the path of the open descriptor @fd by
 * reading the /proc/self/fd/<fd> symlink into a malloc()ed buffer that is
 * stored in *@datap.
 *
 * Returns the path length including the terminating 0 byte.
 *
 * NOTE(review): readlink() does not NUL-terminate its output, and a return
 * value equal to dsize means the name may have been truncated.  The lines
 * that terminate dst, handle those cases, and set *datap are not visible
 * here.  The /proc/self/fd interface relies on procfs (Linux).
 */
998 static int dnet_fd_readlink(int fd
, char **datap
)
1004 snprintf(src
, sizeof(src
), "/proc/self/fd/%d", fd
);
1006 dst
= malloc(dsize
);
1012 err
= readlink(src
, dst
, dsize
);
1019 return err
+ 1; /* including 0-byte */
/*
 * dnet_send_file_info() - reply to @cmd with this node's address, a
 * struct dnet_file_info describing @fd, and the file's path.
 *
 * The reply is [struct dnet_addr_attr][struct dnet_file_info][flen path
 * bytes]:
 *   - the address comes from dnet_fill_addr_attr();
 *   - the file info comes from fstat(), with the ctime/mtime seconds
 *     zeroed;
 *   - with DNET_ATTR_META_TIMES, the times are then taken from the
 *     object's metadata via dnet_read_file_info();
 *   - info->offset is set to @offset.
 *
 * NOTE(review): the `(cmd->flags & DNET_ATTR_META_TIMES)` part of the
 * -ENOENT test is redundant inside the enclosing META_TIMES block.  The
 * fstat() failure log uses strerror(-err); confirm that err holds -errno
 * at that point, since fstat() itself returns -1.  This view ends before
 * the function does; the declarations, the use of @size, and the cleanup
 * are not visible.
 */
1027 int dnet_send_file_info(void *state
, struct dnet_cmd
*cmd
, int fd
, uint64_t offset
, int64_t size
)
1029 struct dnet_node
*n
= dnet_get_node_from_state(state
);
1030 struct dnet_file_info
*info
;
1031 struct dnet_addr_attr
*a
;
1036 err
= dnet_fd_readlink(fd
, &file
);
/* One buffer for the address attribute, the file info and the path. */
1042 a
= malloc(sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
) + flen
);
1045 goto err_out_free_file
;
1047 info
= (struct dnet_file_info
*)(a
+ 1);
1049 dnet_fill_addr_attr(n
, a
);
1051 err
= fstat(fd
, &st
);
1054 dnet_log(n
, DNET_LOG_ERROR
, "%s: EBLOB: %s: info-stat: %d: %s.\n",
1055 dnet_dump_id(&cmd
->id
), file
, err
, strerror(-err
));
1059 dnet_info_from_stat(info
, &st
);
1060 /* this is not valid data from raw blob file stat */
1061 info
->ctime
.tsec
= info
->mtime
.tsec
= 0;
1063 if (cmd
->flags
& DNET_ATTR_META_TIMES
) {
1064 err
= dnet_read_file_info(n
, &cmd
->id
, info
);
1065 if ((err
== -ENOENT
) && (cmd
->flags
& DNET_ATTR_META_TIMES
))
1074 info
->offset
= offset
;
1076 if (info
->size
== 0) {
1078 dnet_log(n
, DNET_LOG_NOTICE
, "%s: EBLOB: %s: info-stat: ZERO-FILE-SIZE, fd: %d.\n",
1079 dnet_dump_id(&cmd
->id
), file
, fd
);
/* The path is appended right after the file info structure. */
1084 memcpy(info
+ 1, file
, flen
);
1086 dnet_convert_file_info(info
);
1088 err
= dnet_send_reply(state
, cmd
, a
, sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
) + flen
, 0);