2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
44 static char *dnet_cmd_strings
[] = {
45 [DNET_CMD_LOOKUP
] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP
] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN
] = "JOIN",
48 [DNET_CMD_WRITE
] = "WRITE",
49 [DNET_CMD_READ
] = "READ",
50 [DNET_CMD_LIST
] = "CHECK",
51 [DNET_CMD_EXEC
] = "EXEC",
52 [DNET_CMD_ROUTE_LIST
] = "ROUTE_LIST",
53 [DNET_CMD_STAT
] = "STAT",
54 [DNET_CMD_NOTIFY
] = "NOTIFY",
55 [DNET_CMD_DEL
] = "REMOVE",
56 [DNET_CMD_STAT_COUNT
] = "STAT_COUNT",
57 [DNET_CMD_STATUS
] = "STATUS",
58 [DNET_CMD_READ_RANGE
] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE
] = "DEL_RANGE",
60 [DNET_CMD_AUTH
] = "AUTH",
61 [DNET_CMD_BULK_READ
] = "BULK_READ",
62 [DNET_CMD_DEFRAG
] = "DEFRAG",
63 [DNET_CMD_UNKNOWN
] = "UNKNOWN",
66 static char *dnet_counter_strings
[] = {
67 [DNET_CNTR_LA1
] = "DNET_CNTR_LA1",
68 [DNET_CNTR_LA5
] = "DNET_CNTR_LA5",
69 [DNET_CNTR_LA15
] = "DNET_CNTR_LA15",
70 [DNET_CNTR_BSIZE
] = "DNET_CNTR_BSIZE",
71 [DNET_CNTR_FRSIZE
] = "DNET_CNTR_FRSIZE",
72 [DNET_CNTR_BLOCKS
] = "DNET_CNTR_BLOCKS",
73 [DNET_CNTR_BFREE
] = "DNET_CNTR_BFREE",
74 [DNET_CNTR_BAVAIL
] = "DNET_CNTR_BAVAIL",
75 [DNET_CNTR_FILES
] = "DNET_CNTR_FILES",
76 [DNET_CNTR_FFREE
] = "DNET_CNTR_FFREE",
77 [DNET_CNTR_FAVAIL
] = "DNET_CNTR_FAVAIL",
78 [DNET_CNTR_FSID
] = "DNET_CNTR_FSID",
79 [DNET_CNTR_VM_ACTIVE
] = "DNET_CNTR_VM_ACTIVE",
80 [DNET_CNTR_VM_INACTIVE
] = "DNET_CNTR_VM_INACTIVE",
81 [DNET_CNTR_VM_TOTAL
] = "DNET_CNTR_VM_TOTAL",
82 [DNET_CNTR_VM_FREE
] = "DNET_CNTR_VM_FREE",
83 [DNET_CNTR_VM_CACHED
] = "DNET_CNTR_VM_CACHED",
84 [DNET_CNTR_VM_BUFFERS
] = "DNET_CNTR_VM_BUFFERS",
85 [DNET_CNTR_NODE_FILES
] = "DNET_CNTR_NODE_FILES",
86 [DNET_CNTR_NODE_LAST_MERGE
] = "DNET_CNTR_NODE_LAST_MERGE",
87 [DNET_CNTR_NODE_CHECK_COPY
] = "DNET_CNTR_NODE_CHECK_COPY",
88 [DNET_CNTR_DBR_NOREC
] = "DNET_CNTR_DBR_NOREC",
89 [DNET_CNTR_DBR_SYSTEM
] = "DNET_CNTR_DBR_SYSTEM",
90 [DNET_CNTR_DBR_ERROR
] = "DNET_CNTR_DBR_ERROR",
91 [DNET_CNTR_DBW_SYSTEM
] = "DNET_CNTR_DBW_SYSTEM",
92 [DNET_CNTR_DBW_ERROR
] = "DNET_CNTR_DBW_ERROR",
93 [DNET_CNTR_UNKNOWN
] = "UNKNOWN",
96 char *dnet_cmd_string(int cmd
)
98 if (cmd
<= 0 || cmd
>= __DNET_CMD_MAX
|| cmd
>= DNET_CMD_UNKNOWN
)
99 cmd
= DNET_CMD_UNKNOWN
;
101 return dnet_cmd_strings
[cmd
];
104 char *dnet_counter_string(int cntr
, int cmd_num
)
106 if (cntr
<= 0 || cntr
>= __DNET_CNTR_MAX
|| cntr
>= DNET_CNTR_UNKNOWN
)
107 cntr
= DNET_CNTR_UNKNOWN
;
110 return dnet_cmd_string(cntr
);
112 if (cntr
>= cmd_num
&& cntr
< (cmd_num
* 2))
113 return dnet_cmd_string(cntr
- cmd_num
);
115 cntr
+= DNET_CNTR_LA1
- cmd_num
* 2;
116 return dnet_counter_strings
[cntr
];
/*
 * Register a peer address received in a route list reply.
 *
 * First looks for an existing state for a->addr (dnet_state_search_by_addr()).
 * To create a new one:
 *   - opens a socket to a->addr using a->sock_type/proto/family;
 *   - calls dnet_state_create() with the @id_num ids in @ids for @group_id,
 *     passing dnet_state_net_process as the processing callback.
 * The join mode starts as DNET_WANT_RECONNECT. It is adjusted when
 * DNET_CFG_JOIN_NETWORK is set in n->flags.
 *
 * NOTE(review): the handling of an already-existing state, of socket
 * creation failure and of a failed dnet_state_create() is not shown in this
 * view. Confirm the return convention against the full function.
 */
119 static int dnet_add_received_state(struct dnet_node
*n
, struct dnet_addr_attr
*a
,
120 int group_id
, struct dnet_raw_id
*ids
, int id_num
)
123 struct dnet_net_state
*nst
;
127 dnet_setup_id(&raw
, group_id
, ids
[0].id
);
/* reuse an existing connection to this address if one is already known */
129 nst
= dnet_state_search_by_addr(n
, &a
->addr
);
136 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
137 (struct sockaddr
*)&a
->addr
.addr
, a
->addr
.addr_len
, 0);
143 join
= DNET_WANT_RECONNECT
;
144 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
147 nst
= dnet_state_create(n
, group_id
, ids
, id_num
, &a
->addr
, s
, &err
, join
, dnet_state_net_process
);
151 dnet_log(n
, DNET_LOG_NOTICE
, "%d: added received state %s.\n",
152 group_id
, dnet_state_dump_addr(nst
));
162 static int dnet_process_addr_attr(struct dnet_net_state
*st
, struct dnet_addr_attr
*a
, int group_id
, int num
)
164 struct dnet_node
*n
= st
->n
;
165 struct dnet_raw_id
*ids
;
168 ids
= (struct dnet_raw_id
*)(a
+ 1);
169 for (i
=0; i
<num
; ++i
)
170 dnet_convert_raw_id(&ids
[0]);
172 err
= dnet_add_received_state(n
, a
, group_id
, ids
, num
);
173 dnet_log(n
, DNET_LOG_DEBUG
, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a
->addr
), num
, err
);
/*
 * Completion callback for the DNET_CMD_ROUTE_LIST request sent by
 * dnet_recv_route_list().
 *
 * When the transaction is destroyed, sets w->cond to 1 and wakes the waiter.
 *
 * For a data reply, the payload is a struct dnet_addr_attr followed by raw
 * ids. The id count is derived from cmd->size. The payload is handed to
 * dnet_process_addr_attr() for cmd->id.group_id.
 *
 * NOTE(review): num is computed as
 * (cmd->size - sizeof(struct dnet_addr_attr)) / sizeof(struct dnet_raw_id).
 * The visible guard compares cmd->size + sizeof(struct dnet_cmd) against
 * sizeof(struct dnet_addr_cmd). Confirm that this guarantees
 * cmd->size >= sizeof(struct dnet_addr_attr), since cmd->size comes from the
 * peer.
 */
178 static int dnet_recv_route_list_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
180 struct dnet_wait
*w
= priv
;
181 struct dnet_addr_attr
*a
;
185 if (is_trans_destroyed(st
, cmd
)) {
191 dnet_wakeup(w
, w
->cond
= 1);
198 if (!cmd
->size
|| err
)
201 size
= cmd
->size
+ sizeof(struct dnet_cmd
);
202 if (size
< (signed)sizeof(struct dnet_addr_cmd
)) {
207 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
213 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
214 dnet_convert_addr_attr(a
);
216 err
= dnet_process_addr_attr(st
, a
, cmd
->id
.group_id
, num
);
/*
 * Ask peer @st for its route list and wait for the answer.
 *
 * Steps:
 *   - allocate a wait object and a transaction that carries only a
 *     struct dnet_cmd header;
 *   - set DNET_FLAGS_NEED_ACK | DNET_FLAGS_DIRECT | DNET_FLAGS_NOLOCK and
 *     command DNET_CMD_ROUTE_LIST;
 *   - send the transaction to @st with dnet_trans_send();
 *   - block in dnet_wait_event() until dnet_recv_route_list_complete() sets
 *     w->cond, bounded by n->wait_ts.
 *
 * NOTE(review): t->cmd is filled by memcpy() from cmd before cmd->cmd and
 * cmd->trans are assigned below. Confirm that t->cmd is not expected to
 * carry those fields.
 */
222 int dnet_recv_route_list(struct dnet_net_state
*st
)
224 struct dnet_io_req req
;
225 struct dnet_node
*n
= st
->n
;
226 struct dnet_trans
*t
;
227 struct dnet_cmd
*cmd
;
231 w
= dnet_wait_alloc(0);
237 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
240 goto err_out_wait_put
;
243 t
->complete
= dnet_recv_route_list_complete
;
/* the command header lives right after the transaction structure */
246 cmd
= (struct dnet_cmd
*)(t
+ 1);
248 cmd
->flags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
251 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
253 cmd
->cmd
= t
->command
= DNET_CMD_ROUTE_LIST
;
255 t
->st
= dnet_state_get(st
);
256 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
258 dnet_convert_cmd(cmd
);
260 dnet_log(n
, DNET_LOG_DEBUG
, "%s: list route request to %s.\n", dnet_dump_id(&cmd
->id
),
261 dnet_server_convert_dnet_addr(&st
->addr
));
263 memset(&req
, 0, sizeof(req
));
266 req
.hsize
= sizeof(struct dnet_cmd
);
269 err
= dnet_trans_send(t
, &req
);
271 goto err_out_destroy
;
273 err
= dnet_wait_event(w
, w
->cond
!= 0, &n
->wait_ts
);
/*
 * Perform a DNET_CMD_REVERSE_LOOKUP handshake over the connected socket @s
 * and create a network state from the reply.
 *
 * A temporary on-stack state (dummy) wraps @s so the helpers can do I/O.
 * The function then:
 *   - sends a bare struct dnet_cmd;
 *   - reads back a struct dnet_addr_cmd header;
 *   - reads the peer's id list, whose size is derived from cmd->size;
 *   - converts each id and passes the list to dnet_state_create() for
 *     cmd->id.group_id.
 * @join is forwarded to dnet_state_create(). Errors are presumably reported
 * through @errp; the assignment is not shown here.
 *
 * NOTE(review): size = cmd->size - sizeof(struct dnet_addr_attr) is
 * peer-controlled. Confirm that the surrounding code rejects
 * cmd->size < sizeof(struct dnet_addr_attr) and bounds the ids allocation
 * before dnet_recv(st, ids, size).
 */
286 static struct dnet_net_state
*dnet_add_state_socket(struct dnet_node
*n
, struct dnet_addr
*addr
, int s
, int *errp
, int join
)
288 struct dnet_net_state
*st
, dummy
;
289 char buf
[sizeof(struct dnet_addr_cmd
)];
290 struct dnet_cmd
*cmd
;
291 int err
, num
, i
, size
;
292 struct dnet_raw_id
*ids
;
294 memset(buf
, 0, sizeof(buf
));
296 cmd
= (struct dnet_cmd
*)(buf
);
298 cmd
->flags
= DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
299 cmd
->cmd
= DNET_CMD_REVERSE_LOOKUP
;
301 dnet_convert_cmd(cmd
);
/* st points at a zeroed temporary state that only carries the socket */
304 memset(st
, 0, sizeof(struct dnet_net_state
));
306 st
->write_s
= st
->read_s
= s
;
309 err
= dnet_send_nolock(st
, buf
, sizeof(struct dnet_cmd
));
311 dnet_log(n
, DNET_LOG_ERROR
, "Failed to send reverse "
312 "lookup message to %s, err: %d.\n",
313 dnet_server_convert_dnet_addr(addr
), err
);
317 err
= dnet_recv(st
, buf
, sizeof(buf
));
319 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
320 "lookup headers from %s, err: %d.\n",
321 dnet_server_convert_dnet_addr(addr
), err
);
325 cmd
= (struct dnet_cmd
*)(buf
);
327 dnet_convert_addr_cmd((struct dnet_addr_cmd
*)buf
);
/* everything after the address attribute is the peer's id list */
329 size
= cmd
->size
- sizeof(struct dnet_addr_attr
);
330 num
= size
/ sizeof(struct dnet_raw_id
);
332 dnet_log(n
, DNET_LOG_DEBUG
, "%s: waiting for %d ids\n", dnet_dump_id(&cmd
->id
), num
);
340 err
= dnet_recv(st
, ids
, size
);
342 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
343 "lookup body (%llu bytes) from %s, err: %d.\n",
344 (unsigned long long)cmd
->size
,
345 dnet_server_convert_dnet_addr(addr
), err
);
349 for (i
=0; i
<num
; ++i
)
350 dnet_convert_raw_id(&ids
[i
]);
352 st
= dnet_state_create(n
, cmd
->id
.group_id
, ids
, num
, addr
, s
, &err
, join
, dnet_state_net_process
);
354 /* socket is already closed */
/*
 * Connect to the remote node described by @cfg and add it as a network state.
 *
 * Steps:
 *   - create a socket with dnet_socket_create(); the peer address is written
 *     into addr;
 *   - perform the reverse lookup with dnet_add_state_socket();
 *   - unless DNET_CFG_NO_ROUTE_LIST is set in cfg->flags, fetch the peer's
 *     route list with dnet_recv_route_list().
 *
 * On -EADDRINUSE, -ECONNREFUSED, -ECONNRESET, -EINPROGRESS or -EAGAIN the
 * address is queued for a later retry with dnet_add_reconnect_state().
 */
371 int dnet_add_state(struct dnet_node
*n
, struct dnet_config
*cfg
)
373 int s
, err
, join
= DNET_WANT_RECONNECT
;
374 struct dnet_addr addr
;
375 struct dnet_net_state
*st
;
377 memset(&addr
, 0, sizeof(addr
));
379 addr
.addr_len
= sizeof(addr
.addr
);
380 s
= dnet_socket_create(n
, cfg
, &addr
, 0);
383 goto err_out_reconnect
;
386 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
389 /* will close socket on error */
390 st
= dnet_add_state_socket(n
, &addr
, s
, &err
, join
);
392 goto err_out_reconnect
;
394 if (!(cfg
->flags
& DNET_CFG_NO_ROUTE_LIST
))
395 dnet_recv_route_list(st
);
400 /* if the state already exists, it should not be treated as an error */
/* transient connection errors: retry later via the reconnect list */
404 if ((err
== -EADDRINUSE
) || (err
== -ECONNREFUSED
) || (err
== -ECONNRESET
) ||
405 (err
== -EINPROGRESS
) || (err
== -EAGAIN
))
406 dnet_add_reconnect_state(n
, &addr
, join
);
/*
 * Shared private data for a multi-group file write.
 *
 * Every per-group write transaction signals the wait object, and
 * dnet_write_complete() appends replies into the reply/size buffer.
 * NOTE(review): the reply/size fields are referenced by
 * dnet_write_complete(); their declarations are not shown here.
 */
410 struct dnet_write_completion
{
413 struct dnet_wait
*wait
;
/*
 * Drop one reference on wc->wait. When the refcount reaches zero, destroy
 * the wait object with dnet_wait_destroy().
 * NOTE(review): the release of wc itself and of its reply buffer is not
 * shown here. Confirm both are freed on the same last-reference path.
 */
416 static void dnet_write_complete_free(struct dnet_write_completion
*wc
)
418 if (atomic_dec_and_test(&wc
->wait
->refcnt
)) {
419 dnet_wait_destroy(wc
->wait
);
/*
 * Completion callback for each per-group write transaction.
 *
 * When the transaction is destroyed: increments w->cond, wakes the waiter
 * and drops one wc reference via dnet_write_complete_free().
 *
 * For a successful reply whose payload is larger than
 * sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info), grows
 * wc->reply and appends a record of the form:
 *   [struct dnet_addr of st][struct dnet_cmd][cmd->size payload bytes]
 *
 * NOTE(review): wc->reply = realloc(wc->reply, ...) overwrites the only
 * pointer to the old buffer. If realloc() fails, that buffer leaks and
 * wc->size is already enlarged. Confirm the surrounding code checks for NULL
 * and restores wc->size.
 */
425 static int dnet_write_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
428 struct dnet_write_completion
*wc
= priv
;
429 struct dnet_wait
*w
= wc
->wait
;
431 if (is_trans_destroyed(st
, cmd
)) {
432 dnet_wakeup(w
, w
->cond
++);
433 dnet_write_complete_free(wc
);
438 if (!err
&& st
&& (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
))) {
439 int old_size
= wc
->size
;
442 wc
->size
+= cmd
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_addr
);
443 wc
->reply
= realloc(wc
->reply
, wc
->size
);
449 data
= wc
->reply
+ old_size
;
451 memcpy(data
, &st
->addr
, sizeof(struct dnet_addr
));
452 memcpy(data
+ sizeof(struct dnet_addr
), cmd
, sizeof(struct dnet_cmd
));
453 memcpy(data
+ sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
), cmd
+ 1, cmd
->size
);
457 pthread_mutex_lock(&w
->wait_lock
);
460 pthread_mutex_unlock(&w
->wait_lock
);
/*
 * Build and send a single I/O transaction described by @ctl.
 *
 * Steps:
 *   - allocate a transaction sized for struct dnet_cmd + struct
 *     dnet_io_attr, plus inline room for the data when there is no file
 *     descriptor (ctl->fd < 0) and size < DNET_COPY_IO_SIZE;
 *   - fill the command header from ctl: id, cflags, cmd, and
 *     size = io_attr + data;
 *   - choose the destination with dnet_state_get_first();
 *   - convert the headers with dnet_convert_cmd()/dnet_convert_io_attr();
 *   - hand the request to dnet_trans_send().
 *
 * Larger payloads are referenced from ctl->data instead of being copied
 * (req.data). A file descriptor is used together with ctl->local_offset.
 *
 * On failure, ctl->complete(NULL, NULL, ctl->priv) is invoked so the caller's
 * completion still runs. The error code is reported through @errp.
 */
465 static struct dnet_trans
*dnet_io_trans_create(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int *errp
)
467 struct dnet_io_req req
;
468 struct dnet_trans
*t
= NULL
;
469 struct dnet_io_attr
*io
;
470 struct dnet_cmd
*cmd
;
471 uint64_t size
= ctl
->io
.size
;
472 uint64_t tsize
= sizeof(struct dnet_io_attr
) + sizeof(struct dnet_cmd
);
475 if (ctl
->cmd
== DNET_CMD_READ
)
478 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
)
481 t
= dnet_trans_alloc(n
, tsize
);
484 goto err_out_complete
;
486 t
->complete
= ctl
->complete
;
/* layout after the transaction: [dnet_cmd][dnet_io_attr][inline data] */
489 cmd
= (struct dnet_cmd
*)(t
+ 1);
490 io
= (struct dnet_io_attr
*)(cmd
+ 1);
492 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
) {
495 memcpy(data
, ctl
->data
, size
);
499 memcpy(&cmd
->id
, &ctl
->id
, sizeof(struct dnet_id
));
500 cmd
->size
= sizeof(struct dnet_io_attr
) + size
;
501 cmd
->flags
= ctl
->cflags
;
504 cmd
->cmd
= t
->command
= ctl
->cmd
;
506 memcpy(io
, &ctl
->io
, sizeof(struct dnet_io_attr
));
507 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
509 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
512 goto err_out_destroy
;
515 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
517 dnet_log(n
, DNET_LOG_INFO
, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
518 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
519 dnet_dump_id(&ctl
->id
),
520 (unsigned long long)t
->trans
,
521 dnet_cmd_string(ctl
->cmd
), (unsigned long long)cmd
->flags
,
522 (unsigned long long)ctl
->io
.size
, (unsigned long long)ctl
->io
.offset
,
524 (unsigned long long)ctl
->local_offset
,
525 dnet_server_convert_dnet_addr(&t
->st
->addr
), t
->st
->weight
, t
->st
->median_read_time
);
527 dnet_convert_cmd(cmd
);
528 dnet_convert_io_attr(io
);
531 memset(&req
, 0, sizeof(req
));
539 req
.local_offset
= ctl
->local_offset
;
541 } else if (size
>= DNET_COPY_IO_SIZE
) {
542 req
.data
= (void *)ctl
->data
;
546 err
= dnet_trans_send(t
, &req
);
548 goto err_out_destroy
;
/* error path: let the caller's completion run with no state/cmd */
554 ctl
->complete(NULL
, NULL
, ctl
->priv
);
/*
 * Send the I/O request @ctl to every group configured in session @s.
 * ctl->id.group_id is overwritten with each s->groups[i] in turn, so @ctl is
 * modified. A further dnet_io_trans_create() call follows the loop.
 * NOTE(review): the condition guarding that extra call, and how the returned
 * transaction count is computed, are not shown here.
 */
564 int dnet_trans_create_send_all(struct dnet_session
*s
, struct dnet_io_control
*ctl
)
566 struct dnet_node
*n
= s
->node
;
569 for (i
=0; i
<s
->group_num
; ++i
) {
570 ctl
->id
.group_id
= s
->groups
[i
];
572 dnet_io_trans_create(n
, ctl
, &err
);
577 dnet_io_trans_create(n
, ctl
, &err
);
/*
 * Write the object described by @ctl to all session groups.
 * Delegates to dnet_trans_create_send_all() and returns its result unchanged.
 */
int dnet_write_object(struct dnet_session *s, struct dnet_io_control *ctl)
{
	int ret = dnet_trans_create_send_all(s, ctl);

	return ret;
}
/*
 * Write a byte range of local @file to object @id in every session group,
 * then wait for all per-group transactions to finish.
 *
 * The range starts at @local_offset in the file and at @remote_offset in the
 * object. A @size of 0, or a range that reaches past EOF, is clamped to
 * "until end of file".
 *
 * The wait refcount is set to INT_MAX before any transaction is sent. Once
 * dnet_write_object() reports trans_num, INT_MAX - trans_num - 1 is
 * subtracted. This lets completions drop references before the final count
 * is known.
 *
 * The function then waits until w->cond == trans_num, bounded by
 * n->wait_ts. Failure is logged when the wait fails or w->status is set.
 */
589 static int dnet_write_file_id_raw(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
,
590 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
591 uint64_t cflags
, unsigned int ioflags
)
593 struct dnet_node
*n
= s
->node
;
594 int fd
, err
, trans_num
;
597 struct dnet_io_control ctl
;
598 struct dnet_write_completion
*wc
;
600 wc
= malloc(sizeof(struct dnet_write_completion
));
605 memset(wc
, 0, sizeof(struct dnet_write_completion
));
607 w
= dnet_wait_alloc(0);
611 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting structure.\n");
617 fd
= open(file
, O_RDONLY
| O_LARGEFILE
| O_CLOEXEC
);
620 dnet_log_err(n
, "Failed to open to be written file '%s'", file
);
624 err
= fstat(fd
, &stat
);
627 dnet_log_err(n
, "Failed to stat to be written file '%s'", file
);
631 if (local_offset
>= (uint64_t)stat
.st_size
) {
/* size 0 or a range past EOF means "write to the end of the file" */
636 if (!size
|| size
+ local_offset
>= (uint64_t)stat
.st_size
)
637 size
= stat
.st_size
- local_offset
;
639 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
/* hold a huge temporary refcount until the transaction count is known */
641 atomic_set(&w
->refcnt
, INT_MAX
);
645 ctl
.local_offset
= local_offset
;
648 ctl
.complete
= dnet_write_complete
;
651 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
652 ctl
.cmd
= DNET_CMD_WRITE
;
654 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
655 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
657 ctl
.io
.flags
= ioflags
;
659 ctl
.io
.offset
= remote_offset
;
660 ctl
.io
.type
= id
->type
;
662 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
664 trans_num
= dnet_write_object(s
, &ctl
);
669 * 1 - the first reference counter we grabbed at allocation time
671 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
673 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
674 if (err
|| w
->status
) {
679 if (!err
&& !trans_num
)
683 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file
, trans_num
, err
);
687 dnet_log(n
, DNET_LOG_NOTICE
, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
688 file
, (unsigned long long)size
);
691 dnet_write_complete_free(wc
);
698 dnet_write_complete_free(wc
);
703 int dnet_write_file_id(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
, uint64_t local_offset
,
704 uint64_t remote_offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
706 int err
= dnet_write_file_id_raw(s
, file
, id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
707 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
708 err
= dnet_create_write_metadata_strings(s
, NULL
, 0, id
, NULL
, cflags
);
/*
 * Write local @file to the object named by @remote.
 *
 * The id is computed by hashing the first @remote_len bytes of @remote with
 * dnet_transform(). After a successful write, unless
 * DNET_IO_FLAGS_CACHE_ONLY is set, metadata is recorded with the remote name.
 * NOTE(review): how @type is applied to the id is not shown here.
 */
713 int dnet_write_file(struct dnet_session
*s
, const char *file
, const void *remote
, int remote_len
,
714 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
715 uint64_t cflags
, unsigned int ioflags
, int type
)
720 dnet_transform(s
->node
, remote
, remote_len
, &id
);
723 err
= dnet_write_file_id_raw(s
, file
, &id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
724 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
725 err
= dnet_create_write_metadata_strings(s
, remote
, remote_len
, &id
, NULL
, cflags
);
/*
 * Completion callback for a file read issued by dnet_read_file_raw_exec().
 *
 * A data reply is a struct dnet_io_attr followed by the data. The data is
 * written with pwrite() into the local file c->file at offset c->offset;
 * the file is created with mode 0644 if needed.
 *
 * When the transaction is destroyed, c->wait->cond is set to err, the waiter
 * is woken and the wait reference is released. After processing a reply,
 * cond is set to err, or to 1 on success.
 *
 * NOTE(review): io->size comes from the peer. The visible size check only
 * ensures cmd->size > sizeof(struct dnet_io_attr). Confirm that io->size is
 * bounded by cmd->size - sizeof(struct dnet_io_attr) before pwrite() reads
 * io->size bytes from the reply.
 */
730 static int dnet_read_file_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
734 struct dnet_io_completion
*c
= priv
;
735 struct dnet_io_attr
*io
;
738 if (is_trans_destroyed(st
, cmd
)) {
741 if (cmd
&& cmd
->status
)
744 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
);
745 dnet_wait_put(c
->wait
);
754 if (cmd
->status
!= 0 || cmd
->size
== 0) {
756 goto err_out_exit_no_log
;
759 if (cmd
->size
<= sizeof(struct dnet_io_attr
)) {
760 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
761 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
762 sizeof(struct dnet_io_attr
));
764 goto err_out_exit_no_log
;
767 io
= (struct dnet_io_attr
*)(cmd
+ 1);
770 dnet_convert_io_attr(io
);
772 fd
= open(c
->file
, O_RDWR
| O_CREAT
| O_CLOEXEC
, 0644);
775 dnet_log_err(n
, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
779 err
= pwrite(fd
, data
, io
->size
, c
->offset
);
782 dnet_log_err(n
, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
787 dnet_log(n
, DNET_LOG_NOTICE
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
788 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)c
->offset
,
789 (unsigned long long)io
->size
, cmd
->status
);
796 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
797 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)io
->offset
,
798 (unsigned long long)io
->size
, cmd
->status
, err
);
800 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
? err
: 1);
/*
 * Start a read transaction for @ctl on the session's node.
 * When dnet_io_trans_create() fails to create the transaction, the error
 * code it stored in err is returned.
 */
804 int dnet_read_object(struct dnet_session
*s
, struct dnet_io_control
*ctl
)
808 if (!dnet_io_trans_create(s
->node
, ctl
, &err
))
/*
 * Issue one read of [io_offset, io_offset + io_size) of object @id. The
 * result is stored into local @file at @write_offset.
 *
 * The completion structure c is allocated with room for the file name right
 * behind it (c->file = c + 1). The buffer is sized len + 1 plus
 * sizeof(DNET_HISTORY_SUFFIX). @len is expected to be strlen(@file), which is
 * what dnet_read_file_raw() passes, so the sprintf() below fits.
 *
 * After dnet_read_object(), the function waits on @w until w->cond differs
 * from its initial value (~0), bounded by n->wait_ts. A negative wait result
 * or a negative cond is logged as a read failure.
 */
814 static int dnet_read_file_raw_exec(struct dnet_session
*s
, const char *file
, unsigned int len
,
815 uint64_t write_offset
, uint64_t io_offset
, uint64_t io_size
,
816 struct dnet_id
*id
, struct dnet_wait
*w
)
818 struct dnet_node
*n
= s
->node
;
819 struct dnet_io_control ctl
;
820 struct dnet_io_completion
*c
;
821 int err
, wait_init
= ~0;
823 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
825 ctl
.io
.size
= io_size
;
826 ctl
.io
.offset
= io_offset
;
828 ctl
.io
.type
= id
->type
;
830 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
831 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
833 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
836 ctl
.complete
= dnet_read_file_complete
;
837 ctl
.cmd
= DNET_CMD_READ
;
838 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
840 c
= malloc(sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
842 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate IO completion structure "
843 "for '%s' file reading.\n",
844 dnet_dump_id(&ctl
.id
), file
);
849 memset(c
, 0, sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
/* the completion holds its own reference on the shared wait object */
851 c
->wait
= dnet_wait_get(w
);
852 c
->offset
= write_offset
;
853 c
->file
= (char *)(c
+ 1);
855 sprintf(c
->file
, "%s", file
);
860 err
= dnet_read_object(s
, &ctl
);
864 err
= dnet_wait_event(w
, w
->cond
!= wait_init
, &n
->wait_ts
);
865 if ((err
< 0) || (w
->cond
< 0)) {
866 char id_str
[2*DNET_ID_SIZE
+ 1];
869 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s '%s' : failed to read data: %d\n",
870 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
),
/*
 * Read object @id into local @file, trying the groups returned by
 * dnet_mix_states() one at a time.
 *
 * The wait object is created with an initial cond of ~0, which
 * dnet_read_file_raw_exec() waits to see change. err starts as -ENOENT, so
 * that value is returned when there is nothing to try.
 * NOTE(review): how the loop stops after a successful group and how g is
 * released are not shown here.
 */
881 static int dnet_read_file_raw(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
, uint64_t offset
, uint64_t size
)
883 struct dnet_node
*n
= s
->node
;
884 int err
= -ENOENT
, len
= strlen(file
), i
;
888 w
= dnet_wait_alloc(~0);
891 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting.\n");
898 num
= dnet_mix_states(s
, id
, &g
);
904 for (i
=0; i
<num
; ++i
) {
907 err
= dnet_read_file_raw_exec(s
, file
, len
, 0, offset
, size
, id
, w
);
/*
 * Read the object identified by the already-computed @id into local @file.
 * Thin wrapper around dnet_read_file_raw(); its result is returned as is.
 */
int dnet_read_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(s, file, id, offset, size);

	return err;
}
/*
 * Read the object named by @remote into local @file.
 *
 * The id is computed by hashing the first @remote_size bytes of @remote with
 * dnet_transform(). The read itself is done by dnet_read_file_raw().
 * NOTE(review): how @type is applied to the id is not shown here.
 */
926 int dnet_read_file(struct dnet_session
*s
, const char *file
, const void *remote
, int remote_size
,
927 uint64_t offset
, uint64_t size
, int type
)
931 dnet_transform(s
->node
, remote
, remote_size
, &id
);
934 return dnet_read_file_raw(s
, file
, &id
, offset
, size
);
/*
 * Allocate and initialise a wait object.
 *
 * The structure is zeroed, its condition variable and mutex are initialised,
 * and its refcount starts at 1. If pthread_mutex_init() fails, the
 * already-initialised condition variable is torn down.
 * NOTE(review): @cond is presumably stored as the initial w->cond; the
 * assignment is not shown here. dnet_read_file_raw() passes ~0 and the
 * reader waits for cond != ~0.
 */
937 struct dnet_wait
*dnet_wait_alloc(int cond
)
942 w
= malloc(sizeof(struct dnet_wait
));
948 memset(w
, 0, sizeof(struct dnet_wait
));
950 err
= pthread_cond_init(&w
->wait
, NULL
);
954 err
= pthread_mutex_init(&w
->wait_lock
, NULL
);
956 goto err_out_destroy
;
959 atomic_init(&w
->refcnt
, 1);
964 pthread_mutex_destroy(&w
->wait_lock
);
/*
 * Tear down a wait object: destroy its mutex and condition variable.
 * NOTE(review): release of w->ret and of w itself is expected to follow;
 * confirm.
 */
969 void dnet_wait_destroy(struct dnet_wait
*w
)
971 pthread_mutex_destroy(&w
->wait_lock
);
972 pthread_cond_destroy(&w
->wait
);
/*
 * Completion callback for exec commands sent by dnet_send_cmd_single().
 *
 * On each reply:
 *   - cmd->status is recorded in w->status;
 *   - the reply payload is appended to w->ret, which is grown with
 *     realloc(); the total length is kept in w->size.
 * When the transaction is destroyed, w->cond is incremented and the waiter
 * is woken.
 *
 * NOTE(review): w->ret = realloc(w->ret, ...) overwrites the only pointer to
 * the previous buffer, so it leaks if realloc() fails. Confirm the
 * surrounding code handles NULL. Replies may also arrive from several
 * states; confirm whether w->wait_lock is held here.
 */
977 static int dnet_send_cmd_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
979 struct dnet_wait
*w
= priv
;
981 if (is_trans_destroyed(st
, cmd
)) {
982 dnet_wakeup(w
, w
->cond
++);
987 w
->status
= cmd
->status
;
991 void *data
= cmd
+ 1;
993 w
->ret
= realloc(w
->ret
, w
->size
+ cmd
->size
);
998 memcpy(w
->ret
+ w
->size
, data
, cmd
->size
);
999 w
->size
+= cmd
->size
;
/*
 * Send exec command @e to state @st.
 *
 * The command is addressed to the state's group and first id. Its payload
 * size is sizeof(struct sph) + event_size + data_size + binary_size. Replies
 * are delivered to dnet_send_cmd_complete().
 *
 * NOTE(review): dnet_convert_sph(e) converts the caller's @e in place on
 * every call, and dnet_send_cmd_raw() calls this function once per state
 * with the same @e. If the conversion is not idempotent (e.g. a byte swap on
 * big-endian hosts), the second and later sends would compute ctl.size from,
 * and transmit, an already-converted header. Confirm.
 */
1006 static int dnet_send_cmd_single(struct dnet_net_state
*st
, struct dnet_wait
*w
, struct sph
*e
, uint64_t cflags
)
1008 struct dnet_trans_control ctl
;
1010 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1012 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1013 ctl
.size
= sizeof(struct sph
) + e
->event_size
+ e
->data_size
+ e
->binary_size
;
1014 ctl
.cmd
= DNET_CMD_EXEC
;
1015 ctl
.complete
= dnet_send_cmd_complete
;
1017 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1019 dnet_convert_sph(e
);
1023 return dnet_trans_alloc_send_state(st
, &ctl
);
/*
 * Send exec command @e and wait for the replies.
 *
 * Targets:
 *   - id->group_id != 0: the first state for @id (dnet_state_get_first());
 *   - id->group_id == 0: one state per known group. id->group_id is
 *     overwritten with each group's id, under n->state_lock;
 *   - id == NULL: every state of every group, under n->state_lock.
 *
 * Then waits until w->cond reaches num, bounded by n->wait_ts.
 * NOTE(review): num is presumably incremented per successful send; the
 * increments, and how the collected reply is passed back through @ret, are
 * not shown here.
 */
1026 static int dnet_send_cmd_raw(struct dnet_session
*s
, struct dnet_id
*id
,
1027 struct sph
*e
, void **ret
, uint64_t cflags
)
1029 struct dnet_node
*n
= s
->node
;
1030 struct dnet_net_state
*st
;
1031 int err
= -ENOENT
, num
= 0;
1032 struct dnet_wait
*w
;
1033 struct dnet_group
*g
;
1035 w
= dnet_wait_alloc(0);
1041 if (id
&& id
->group_id
!= 0) {
1043 st
= dnet_state_get_first(n
, id
);
1046 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1049 } else if (id
&& id
->group_id
== 0) {
1050 pthread_mutex_lock(&n
->state_lock
);
1051 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1054 id
->group_id
= g
->group_id
;
1056 st
= dnet_state_search_nolock(n
, id
);
1059 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1065 pthread_mutex_unlock(&n
->state_lock
);
1067 pthread_mutex_lock(&n
->state_lock
);
1068 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1069 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1075 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1079 pthread_mutex_unlock(&n
->state_lock
);
1082 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
/*
 * Send exec command @e to the states selected by @id with no extra command
 * flags. @ret is passed through to dnet_send_cmd_raw().
 */
int dnet_send_cmd(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t extra_flags = 0;

	return dnet_send_cmd_raw(s, id, e, ret, extra_flags);
}
1108 int dnet_send_cmd_nolock(struct dnet_session
*s
, struct dnet_id
*id
, struct sph
*e
, void **ret
)
1110 return dnet_send_cmd_raw(s
, id
, e
, ret
, DNET_FLAGS_NOLOCK
);
/*
 * Retry connections queued on n->reconnect_list.
 *
 * All queued entries are moved onto a local list while holding
 * n->reconnect_lock. The lock is then dropped, and for each address the
 * function:
 *   - opens a socket;
 *   - calls dnet_add_state_socket(), joining only when the entry was queued
 *     with DNET_JOIN;
 *   - on some failures, re-queues the address with
 *     dnet_add_reconnect_state();
 *   - removes the entry from the local list.
 *
 * NOTE(review): the -EEXIST/-EINVAL branch and the entry release are only
 * partly shown here. list_empty() is checked before reconnect_lock is taken.
 */
1113 int dnet_try_reconnect(struct dnet_node
*n
)
1115 struct dnet_addr_storage
*ast
, *tmp
;
1116 struct dnet_net_state
*st
;
1120 if (list_empty(&n
->reconnect_list
))
1123 pthread_mutex_lock(&n
->reconnect_lock
);
1124 list_for_each_entry_safe(ast
, tmp
, &n
->reconnect_list
, reconnect_entry
) {
1125 list_move(&ast
->reconnect_entry
, &list
);
1127 pthread_mutex_unlock(&n
->reconnect_lock
);
1129 list_for_each_entry_safe(ast
, tmp
, &list
, reconnect_entry
) {
1130 s
= dnet_socket_create_addr(n
, n
->sock_type
, n
->proto
, n
->family
,
1131 (struct sockaddr
*)ast
->addr
.addr
, ast
->addr
.addr_len
, 0);
1135 join
= DNET_WANT_RECONNECT
;
1136 if (ast
->__join_state
== DNET_JOIN
)
1139 st
= dnet_add_state_socket(n
, &ast
->addr
, s
, &err
, join
);
1145 if (err
== -EEXIST
|| err
== -EINVAL
)
1149 dnet_add_reconnect_state(n
, &ast
->addr
, ast
->__join_state
);
1151 list_del(&ast
->reconnect_entry
);
/*
 * Send a DNET_CMD_LOOKUP for @id to the state chosen by
 * dnet_state_get_first().
 *
 * The command carries @cflags | DNET_FLAGS_NEED_ACK, and replies go to
 * @complete. On failure, @complete is called with NULL state and NULL cmd so
 * the caller can clean up @priv.
 *
 * NOTE(review): t->cmd is filled by memcpy() from cmd before cmd->cmd,
 * cmd->flags and cmd->trans are assigned. Confirm t->cmd is not expected to
 * carry them.
 */
1158 int dnet_lookup_object(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
,
1159 int (* complete
)(struct dnet_net_state
*, struct dnet_cmd
*, void *),
1162 struct dnet_node
*n
= s
->node
;
1163 struct dnet_io_req req
;
1164 struct dnet_trans
*t
;
1165 struct dnet_cmd
*cmd
;
1168 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
1171 goto err_out_complete
;
1173 t
->complete
= complete
;
1176 cmd
= (struct dnet_cmd
*)(t
+ 1);
1178 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
1180 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
1182 cmd
->cmd
= t
->command
= DNET_CMD_LOOKUP
;
1183 cmd
->flags
= cflags
| DNET_FLAGS_NEED_ACK
;
1185 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
1188 goto err_out_destroy
;
1191 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
1192 dnet_convert_cmd(cmd
);
1194 dnet_log(n
, DNET_LOG_NOTICE
, "%s: lookup to %s.\n", dnet_dump_id(id
), dnet_server_convert_dnet_addr(&t
->st
->addr
));
1196 memset(&req
, 0, sizeof(req
));
1199 req
.hsize
= sizeof(struct dnet_cmd
);
1201 err
= dnet_trans_send(t
, &req
);
1203 goto err_out_destroy
;
1209 complete(NULL
, NULL
, priv
);
/*
 * Completion callback for dnet_lookup().
 *
 * The reply is a struct dnet_addr_attr, optionally followed by a struct
 * dnet_file_info and a path. The callback:
 *   - converts the address and logs it;
 *   - logs offset/size/mode/path when file info is present;
 *   - looks up a state for the returned address with
 *     dnet_state_search_by_addr(); in a branch whose condition is not shown
 *     here, dnet_recv_route_list(st) is called.
 * When the transaction is destroyed, w->cond is incremented and the waiter
 * is woken.
 *
 * NOTE(review): n starts as NULL and is passed to dnet_log_raw()/dnet_log().
 * Confirm it is assigned (presumably n = st->n) before those calls.
 */
1217 int dnet_lookup_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1219 struct dnet_wait
*w
= priv
;
1220 struct dnet_node
*n
= NULL
;
1221 struct dnet_addr_attr
*a
;
1222 struct dnet_net_state
*other
;
1223 char addr_str
[128] = "no-address";
1226 if (is_trans_destroyed(st
, cmd
)) {
1227 dnet_wakeup(w
, w
->cond
++);
1234 if (err
|| !cmd
->size
)
1237 if (cmd
->size
< sizeof(struct dnet_addr_attr
)) {
1238 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1239 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
, sizeof(struct dnet_addr_attr
));
1244 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
1246 dnet_convert_addr_attr(a
);
1247 dnet_server_convert_dnet_addr_raw(&a
->addr
, addr_str
, sizeof(addr_str
));
1249 if (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
1250 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
1252 dnet_convert_file_info(info
);
1254 dnet_log_raw(n
, DNET_LOG_NOTICE
, "%s: lookup object: %s: "
1255 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1256 dnet_dump_id(&cmd
->id
), addr_str
,
1257 (unsigned long long)info
->offset
, (unsigned long long)info
->size
,
1258 (unsigned long long)info
->mode
, (char *)(info
+ 1));
1260 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: lookup object: %s\n",
1261 dnet_dump_id(&cmd
->id
), addr_str
);
1265 other
= dnet_state_search_by_addr(n
, &a
->addr
);
1267 dnet_state_put(other
);
1269 dnet_recv_route_list(st
);
1276 dnet_log(n
, DNET_LOG_ERROR
, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd
->id
), cmd
->status
, err
);
/*
 * Look up the object named @file in every group of the session.
 *
 * The id is the dnet_transform() hash of the file name. Each per-group lookup
 * takes its own reference on w (dnet_wait_get()) and completes through
 * dnet_lookup_complete(). The function then waits until w->cond == 1,
 * bounded by n->wait_ts.
 * NOTE(review): the wait condition is w->cond == 1 even though up to
 * s->group_num lookups are issued. Confirm that returning after the first
 * completion is intended.
 */
1281 int dnet_lookup(struct dnet_session
*s
, const char *file
)
1283 struct dnet_node
*n
= s
->node
;
1284 int err
, error
= 0, i
;
1285 struct dnet_wait
*w
;
1288 w
= dnet_wait_alloc(0);
1294 dnet_transform(n
, file
, strlen(file
), &raw
);
1296 for (i
=0; i
<s
->group_num
; ++i
) {
1297 raw
.group_id
= s
->groups
[i
];
1299 err
= dnet_lookup_object(s
, &raw
, 0, dnet_lookup_complete
, dnet_wait_get(w
));
1305 err
= dnet_wait_event(w
, w
->cond
== 1, &n
->wait_ts
);
1306 if (err
|| w
->status
) {
/* Accessor for the peer address of state @st (presumably &st->addr). */
1324 struct dnet_addr
*dnet_state_addr(struct dnet_net_state
*st
)
/*
 * Default completion callback for statistics requests.
 *
 * Reply handling:
 *   - DNET_CMD_STAT with exactly sizeof(struct dnet_stat) bytes: logs load
 *     average (stored x100), memory (kB) and filesystem (MB) figures;
 *   - DNET_CMD_STAT_COUNT: logs every entry of as->count[]. The layout is
 *     as->cmd_num storage-command slots, then as->cmd_num proxy-command
 *     slots, then counters; names come from dnet_counter_string().
 * When the transaction is destroyed, w->cond is incremented and the waiter
 * is woken.
 *
 * NOTE(review): as->num comes from the peer and indexes as->count[i], while
 * the visible check only ensures cmd->size >= sizeof(struct dnet_addr_stat).
 * Confirm that the count[] accesses are bounded by cmd->size.
 */
1329 static int dnet_stat_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1331 struct dnet_wait
*w
= priv
;
1333 struct dnet_stat
*st
;
1336 if (is_trans_destroyed(state
, cmd
)) {
1337 dnet_wakeup(w
, w
->cond
++);
1342 if (cmd
->cmd
== DNET_CMD_STAT
&& cmd
->size
== sizeof(struct dnet_stat
)) {
1343 st
= (struct dnet_stat
*)(cmd
+ 1);
1345 dnet_convert_stat(st
);
1347 la
[0] = (float)st
->la
[0] / 100.0;
1348 la
[1] = (float)st
->la
[1] / 100.0;
1349 la
[2] = (float)st
->la
[2] / 100.0;
1351 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: la: %.2f %.2f %.2f.\n",
1352 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1353 la
[0], la
[1], la
[2]);
1354 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: mem: "
1355 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1356 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1357 (unsigned long long)st
->vm_total
,
1358 (unsigned long long)st
->vm_free
,
1359 (unsigned long long)st
->vm_cached
);
1360 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: fs: "
1361 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1362 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1363 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
1364 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
1365 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
1367 } else if (cmd
->size
>= sizeof(struct dnet_addr_stat
) && cmd
->cmd
== DNET_CMD_STAT_COUNT
) {
1368 struct dnet_addr_stat
*as
= (struct dnet_addr_stat
*)(cmd
+ 1);
1371 dnet_convert_addr_stat(as
, 0);
1373 for (i
=0; i
<as
->num
; ++i
) {
1374 if (as
->num
> as
->cmd_num
) {
1376 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Storage commands\n",
1377 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1378 if (i
== as
->cmd_num
)
1379 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Proxy commands\n",
1380 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1381 if (i
== as
->cmd_num
* 2)
1382 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Counters\n",
1383 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1385 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1386 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1387 dnet_counter_string(i
, as
->cmd_num
),
1388 (unsigned long long)as
->count
[i
].count
, (unsigned long long)as
->count
[i
].err
);
/*
 * Send transaction @ctl. When a specific state @st is given, it is sent
 * straight to that state with dnet_trans_alloc_send_state(). When @st is
 * NULL, it goes through the session-level dnet_trans_alloc_send().
 */
static int dnet_request_cmd_single(struct dnet_session *s, struct dnet_net_state *st, struct dnet_trans_control *ctl)
{
	return st ? dnet_trans_alloc_send_state(st, ctl) : dnet_trans_alloc_send(s, ctl);
}
/*
 * Send statistics command @cmd and wait for the replies.
 *
 * Targets:
 *   - when @id is given: a single destination, via
 *     dnet_request_cmd_single(s, NULL, ...);
 *   - otherwise: every state of every group, under n->state_lock, each
 *     addressed by its group and first id.
 *
 * The transaction uses NEED_ACK | NOLOCK | @cflags. dnet_stat_complete is
 * substituted as the callback (presumably when @complete is NULL; the
 * condition is not shown here). The function waits until w->cond == num,
 * bounded by n->wait_ts, and logs the elapsed time both after sending and
 * after waiting.
 */
1403 int dnet_request_stat(struct dnet_session
*s
, struct dnet_id
*id
,
1404 unsigned int cmd
, uint64_t cflags
,
1405 int (* complete
)(struct dnet_net_state
*state
,
1406 struct dnet_cmd
*cmd
,
1410 struct dnet_node
*n
= s
->node
;
1411 struct dnet_trans_control ctl
;
1412 struct dnet_wait
*w
= NULL
;
1414 struct timeval start
, end
;
1417 gettimeofday(&start
, NULL
);
1420 w
= dnet_wait_alloc(0);
1426 complete
= dnet_stat_complete
;
1430 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1433 ctl
.complete
= complete
;
1435 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| cflags
;
1441 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1443 err
= dnet_request_cmd_single(s
, NULL
, &ctl
);
1446 struct dnet_net_state
*st
;
1447 struct dnet_group
*g
;
1450 pthread_mutex_lock(&n
->state_lock
);
1451 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1452 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1459 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1460 dnet_request_cmd_single(s
, st
, &ctl
);
1464 pthread_mutex_unlock(&n
->state_lock
);
1468 gettimeofday(&end
, NULL
);
1469 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1470 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd
), diff
, num
);
1475 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1477 gettimeofday(&end
, NULL
);
1478 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1479 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd
), diff
, err
, num
);
/*
 * Private data that dnet_request_cmd() installs in place of the caller's
 * completion.
 *
 * It holds the shared wait object and the caller's original callback.
 * dnet_request_cmd() also stores the caller's priv in p->priv; that
 * declaration is not shown here.
 */
1494 struct dnet_request_cmd_priv
{
1495 struct dnet_wait
*w
;
1497 int (* complete
)(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
);
/*
 * Completion wrapper used by dnet_request_cmd().
 *
 * Every callback is first forwarded to the caller's
 * p->complete(state, cmd, p->priv). When the transaction is destroyed,
 * w->cond is then incremented and the waiter is woken. A further step runs
 * when w's refcount is 1; it is not shown here (presumably releasing w
 * and p).
 */
1501 static int dnet_request_cmd_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1503 struct dnet_request_cmd_priv
*p
= priv
;
1504 int err
= p
->complete(state
, cmd
, p
->priv
);
1506 if (is_trans_destroyed(state
, cmd
)) {
1507 struct dnet_wait
*w
= p
->w
;
1509 dnet_wakeup(w
, w
->cond
++);
1510 if (atomic_read(&w
->refcnt
) == 1)
1518 int dnet_request_cmd(struct dnet_session
*s
, struct dnet_trans_control
*ctl
)
1520 struct dnet_node
*n
= s
->node
;
1522 struct dnet_request_cmd_priv
*p
;
1523 struct dnet_wait
*w
;
1524 struct dnet_net_state
*st
;
1525 struct dnet_group
*g
;
1526 struct timeval start
, end
;
1529 gettimeofday(&start
, NULL
);
1531 p
= malloc(sizeof(*p
));
1537 w
= dnet_wait_alloc(0);
1544 p
->complete
= ctl
->complete
;
1545 p
->priv
= ctl
->priv
;
1547 ctl
->complete
= dnet_request_cmd_complete
;
1550 pthread_mutex_lock(&n
->state_lock
);
1551 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1552 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1558 ctl
->id
.group_id
= g
->group_id
;
1560 if (!(ctl
->cflags
& DNET_FLAGS_DIRECT
))
1561 dnet_setup_id(&ctl
->id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1562 dnet_request_cmd_single(s
, st
, ctl
);
1566 pthread_mutex_unlock(&n
->state_lock
);
1568 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1570 gettimeofday(&end
, NULL
);
1571 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1572 dnet_log(n
, DNET_LOG_NOTICE
, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl
->cmd
), diff
, err
, num
);
1577 if (atomic_read(&w
->refcnt
) == 1)
1589 struct dnet_update_status_priv
{
1590 struct dnet_wait
*w
;
1591 struct dnet_node_status status
;
1595 static int dnet_update_status_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1597 struct dnet_update_status_priv
*p
= priv
;
1599 if (is_trans_destroyed(state
, cmd
)) {
1600 dnet_wakeup(p
->w
, p
->w
->cond
++);
1601 dnet_wait_put(p
->w
);
1602 if (atomic_dec_and_test(&p
->refcnt
))
1606 if (cmd
->size
== sizeof(struct dnet_node_status
)) {
1607 memcpy(&p
->status
, cmd
+ 1, sizeof(struct dnet_node_status
));
1614 int dnet_update_status(struct dnet_session
*s
, struct dnet_addr
*addr
, struct dnet_id
*id
, struct dnet_node_status
*status
)
1617 struct dnet_update_status_priv
*priv
;
1618 struct dnet_trans_control ctl
;
1625 memset(&ctl
, 0, sizeof(ctl
));
1628 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1630 struct dnet_net_state
*st
;
1632 st
= dnet_state_search_by_addr(s
->node
, addr
);
1638 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1642 priv
= malloc(sizeof(struct dnet_update_status_priv
));
1648 priv
->w
= dnet_wait_alloc(0);
1654 ctl
.complete
= dnet_update_status_complete
;
1656 ctl
.cmd
= DNET_CMD_STATUS
;
1657 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
1658 ctl
.size
= sizeof(struct dnet_node_status
);
1661 dnet_wait_get(priv
->w
);
1662 dnet_request_cmd_single(s
, NULL
, &ctl
);
1664 err
= dnet_wait_event(priv
->w
, priv
->w
->cond
== 1, &s
->node
->wait_ts
);
1665 dnet_wait_put(priv
->w
);
1667 memcpy(status
, &priv
->status
, sizeof(struct dnet_node_status
));
1669 if (atomic_dec_and_test(&priv
->refcnt
))
1676 static int dnet_remove_object_raw(struct dnet_session
*s
, struct dnet_id
*id
,
1677 int (* complete
)(struct dnet_net_state
*state
,
1678 struct dnet_cmd
*cmd
,
1680 void *priv
, uint64_t cflags
, uint64_t ioflags
)
1682 struct dnet_io_control ctl
;
1685 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1687 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1689 memcpy(&ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
1690 memcpy(&ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
1691 ctl
.io
.flags
= ioflags
;
1695 ctl
.cmd
= DNET_CMD_DEL
;
1696 ctl
.complete
= complete
;
1698 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1700 err
= dnet_trans_create_send_all(s
, &ctl
);
1707 static int dnet_remove_complete(struct dnet_net_state
*state
,
1708 struct dnet_cmd
*cmd
,
1711 struct dnet_wait
*w
= priv
;
1713 if (is_trans_destroyed(state
, cmd
)) {
1714 dnet_wakeup(w
, w
->cond
++);
1720 w
->status
= cmd
->status
;
1724 int dnet_remove_object(struct dnet_session
*s
, struct dnet_id
*id
,
1725 int (* complete
)(struct dnet_net_state
*state
,
1726 struct dnet_cmd
*cmd
,
1729 uint64_t cflags
, uint64_t ioflags
)
1731 struct dnet_wait
*w
= NULL
;
1735 w
= dnet_wait_alloc(0);
1741 complete
= dnet_remove_complete
;
1746 err
= dnet_remove_object_raw(s
, id
, complete
, priv
, cflags
, ioflags
);
1751 err
= dnet_wait_event(w
, w
->cond
!= err
, &s
->node
->wait_ts
);
1755 if (w
->status
< 0) {
1771 static int dnet_remove_file_raw(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1773 struct dnet_wait
*w
;
1776 w
= dnet_wait_alloc(0);
1782 atomic_add(&w
->refcnt
, 1024);
1783 err
= dnet_remove_object_raw(s
, id
, dnet_remove_complete
, w
, cflags
, ioflags
);
1785 atomic_sub(&w
->refcnt
, 1024);
1790 atomic_sub(&w
->refcnt
, 1024 - num
);
1792 err
= dnet_wait_event(w
, w
->cond
== num
, &s
->node
->wait_ts
);
1796 if (w
->status
< 0) {
1812 int dnet_remove_object_now(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1814 return dnet_remove_file_raw(s
, id
, cflags
| DNET_FLAGS_NEED_ACK
| DNET_ATTR_DELETE_HISTORY
, ioflags
);
1817 int dnet_remove_file(struct dnet_session
*s
, char *remote
, int remote_len
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1822 dnet_transform(s
->node
, remote
, remote_len
, &raw
);
1827 return dnet_remove_file_raw(s
, id
, cflags
, ioflags
);
1830 int dnet_request_ids(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
,
1831 int (* complete
)(struct dnet_net_state
*state
,
1832 struct dnet_cmd
*cmd
,
1836 struct dnet_trans_control ctl
;
1838 dnet_log_raw(s
->node
, DNET_LOG_ERROR
, "Temporarily unsupported operation.\n");
1841 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1843 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1844 ctl
.cmd
= DNET_CMD_LIST
;
1845 ctl
.complete
= complete
;
1847 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1849 return dnet_trans_alloc_send(s
, &ctl
);
1852 struct dnet_node
*dnet_get_node_from_state(void *state
)
1854 struct dnet_net_state
*st
= state
;
1861 struct dnet_read_data_completion
{
1862 struct dnet_wait
*w
;
1868 static int dnet_read_data_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1870 struct dnet_read_data_completion
*c
= priv
;
1871 struct dnet_wait
*w
= c
->w
;
1874 if (is_trans_destroyed(st
, cmd
)) {
1875 dnet_wakeup(w
, w
->cond
++);
1877 if (atomic_dec_and_test(&c
->refcnt
))
1886 if (cmd
->size
>= sizeof(struct dnet_io_attr
)) {
1887 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)(cmd
+ 1);
1888 uint64_t sz
= c
->size
;
1890 dnet_convert_io_attr(io
);
1892 sz
+= io
->size
+ sizeof(struct dnet_io_attr
);
1893 c
->data
= realloc(c
->data
, sz
);
1899 memcpy(c
->data
+ c
->size
, io
, sizeof(struct dnet_io_attr
) + io
->size
);
1904 dnet_log(st
->n
, DNET_LOG_NOTICE
, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1905 dnet_dump_id(&cmd
->id
), (unsigned long long)(cmd
->trans
& ~DNET_TRANS_REPLY
),
1911 void *dnet_read_data_wait_raw(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
1912 int cmd
, uint64_t cflags
, int *errp
)
1914 struct dnet_node
*n
= s
->node
;
1915 struct dnet_io_control ctl
;
1916 struct dnet_wait
*w
;
1917 struct dnet_read_data_completion
*c
;
1921 w
= dnet_wait_alloc(0);
1927 c
= malloc(sizeof(*c
));
1936 /* one for completion callback, another for this function */
1937 atomic_init(&c
->refcnt
, 2);
1939 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1944 ctl
.complete
= dnet_read_data_complete
;
1947 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1949 memcpy(&ctl
.io
, io
, sizeof(struct dnet_io_attr
));
1950 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1952 ctl
.id
.type
= io
->type
;
1955 err
= dnet_read_object(s
, &ctl
);
1957 goto err_out_put_complete
;
1959 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
1960 if (err
|| w
->status
) {
1961 char id_str
[2*DNET_ID_SIZE
+ 1];
1964 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
1965 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
1966 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
1967 goto err_out_put_complete
;
1973 err_out_put_complete
:
1974 if (atomic_dec_and_test(&c
->refcnt
))
1983 static int dnet_read_recover(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
, void *data
, uint64_t cflags
)
1985 struct dnet_node
*n
= s
->node
;
1986 struct dnet_meta_container mc
;
1987 struct dnet_io_control ctl
;
1991 err
= dnet_read_meta(s
, &mc
, NULL
, 0, id
);
1993 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id
), err
);
1997 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2002 ctl
.data
= data
+ sizeof(struct dnet_io_attr
);
2003 ctl
.io
.size
-= sizeof(struct dnet_io_attr
);
2006 ctl
.cmd
= DNET_CMD_WRITE
;
2007 ctl
.cflags
= cflags
;
2009 err
= dnet_write_data_wait(s
, &ctl
, &result
);
2011 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id
), err
);
2012 goto err_out_free_meta
;
2015 err
= dnet_write_metadata(s
, &mc
, 0, cflags
);
2017 goto err_out_free_result
;
2019 err_out_free_result
:
2027 void *dnet_read_data_wait_groups(struct dnet_session
*s
, struct dnet_id
*id
, int *groups
, int num
,
2028 struct dnet_io_attr
*io
, uint64_t cflags
, int *errp
)
2033 for (i
= 0; i
< num
; ++i
) {
2034 id
->group_id
= groups
[i
];
2036 data
= dnet_read_data_wait_raw(s
, id
, io
, DNET_CMD_READ
, cflags
, errp
);
2038 if ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0) && (io
->size
> sizeof(struct dnet_io_attr
))) {
2039 dnet_read_recover(s
, id
, io
, data
, cflags
);
2050 void *dnet_read_data_wait(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
2051 uint64_t cflags
, int *errp
)
2056 num
= dnet_mix_states(s
, id
, &g
);
2062 data
= dnet_read_data_wait_groups(s
, id
, g
, num
, io
, cflags
, &err
);
2073 int dnet_write_data_wait(struct dnet_session
*s
, struct dnet_io_control
*ctl
, void **result
)
2075 struct dnet_node
*n
= s
->node
;
2076 int err
, trans_num
= 0;
2077 struct dnet_wait
*w
;
2078 struct dnet_write_completion
*wc
;
2080 wc
= malloc(sizeof(struct dnet_write_completion
));
2085 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2087 w
= dnet_wait_alloc(0);
2095 w
->status
= -ENOENT
;
2097 ctl
->complete
= dnet_write_complete
;
2099 ctl
->cmd
= DNET_CMD_WRITE
;
2100 ctl
->cflags
|= DNET_FLAGS_NEED_ACK
;
2102 memcpy(ctl
->io
.id
, ctl
->id
.id
, DNET_ID_SIZE
);
2104 atomic_set(&w
->refcnt
, INT_MAX
);
2105 trans_num
= dnet_write_object(s
, ctl
);
2110 * 1 - the first reference counter we grabbed at allocation time
2112 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
2114 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
2115 if (err
|| w
->status
) {
2118 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2119 dnet_dump_id(&ctl
->id
), err
, w
->status
);
2122 if (err
|| !trans_num
) {
2125 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
2130 dnet_log(n
, DNET_LOG_NOTICE
, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2131 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, ctl
->io
.type
, wc
->size
);
2134 *result
= wc
->reply
;
2140 dnet_write_complete_free(wc
);
2145 int dnet_lookup_addr(struct dnet_session
*s
, const void *remote
, int len
, struct dnet_id
*id
, int group_id
, char *dst
, int dlen
)
2147 struct dnet_node
*n
= s
->node
;
2149 struct dnet_net_state
*st
;
2153 dnet_transform(n
, remote
, len
, &raw
);
2156 id
->group_id
= group_id
;
2158 st
= dnet_state_get_first(n
, id
);
2162 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st
), dst
, dlen
);
2170 struct dnet_weight
{
2175 static int dnet_weight_compare(const void *v1
, const void *v2
)
2177 const struct dnet_weight
*w1
= v1
;
2178 const struct dnet_weight
*w2
= v2
;
2180 return w2
->weight
- w1
->weight
;
2183 static int dnet_weight_get_winner(struct dnet_weight
*w
, int num
)
2189 for (i
= 0; i
< num
; ++i
)
2192 r
= (float)rand() / (float)RAND_MAX
;
2195 for (i
= 0; i
< num
; ++i
) {
2204 int dnet_mix_states(struct dnet_session
*s
, struct dnet_id
*id
, int **groupsp
)
2206 struct dnet_node
*n
= s
->node
;
2207 struct dnet_weight
*weights
;
2209 int group_num
, i
, num
;
2210 struct dnet_net_state
*st
;
2215 group_num
= s
->group_num
;
2217 weights
= alloca(s
->group_num
* sizeof(*weights
));
2218 groups
= malloc(s
->group_num
* sizeof(*groups
));
2220 memcpy(groups
, s
->groups
, s
->group_num
* sizeof(*groups
));
2227 if ((n
->flags
& DNET_CFG_RANDOMIZE_STATES
) || !id
) {
2228 for (i
= 0; i
< group_num
; ++i
) {
2229 weights
[i
].weight
= rand();
2230 weights
[i
].group_id
= groups
[i
];
2234 if (!(n
->flags
& DNET_CFG_MIX_STATES
)) {
2239 memset(weights
, 0, group_num
* sizeof(*weights
));
2241 for (i
= 0, num
= 0; i
< group_num
; ++i
) {
2242 id
->group_id
= groups
[i
];
2244 st
= dnet_state_get_first(n
, id
);
2246 weights
[num
].weight
= (int)st
->weight
;
2247 weights
[num
].group_id
= id
->group_id
;
2258 qsort(weights
, group_num
, sizeof(struct dnet_weight
), dnet_weight_compare
);
2260 for (i
= 0; i
< group_num
; ++i
) {
2261 int pos
= dnet_weight_get_winner(weights
, group_num
- i
);
2262 groups
[i
] = weights
[pos
].group_id
;
2264 if (pos
< group_num
- 1)
2265 memmove(&weights
[pos
], &weights
[pos
+ 1], (group_num
- 1 - pos
) * sizeof(struct dnet_weight
));
2269 dnet_session_set_groups(s
, groups
, group_num
);
2275 int dnet_data_map(struct dnet_map_fd
*map
)
2278 long page_size
= sysconf(_SC_PAGE_SIZE
);
2281 off
= map
->offset
& ~(page_size
- 1);
2282 map
->mapped_size
= ALIGN(map
->size
+ map
->offset
- off
, page_size
);
2284 map
->mapped_data
= mmap(NULL
, map
->mapped_size
, PROT_READ
, MAP_SHARED
, map
->fd
, off
);
2285 if (map
->mapped_data
== MAP_FAILED
) {
2290 map
->data
= map
->mapped_data
+ map
->offset
- off
;
2296 void dnet_data_unmap(struct dnet_map_fd
*map
)
2298 munmap(map
->mapped_data
, map
->mapped_size
);
2301 struct dnet_io_attr
*dnet_remove_range(struct dnet_session
*s
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *ret_num
, int *errp
)
2303 struct dnet_node
*n
= s
->node
;
2305 struct dnet_io_attr
*ret
, *new_ret
;
2306 struct dnet_raw_id start
, next
;
2307 struct dnet_raw_id end
;
2308 uint64_t size
= io
->size
;
2310 int err
, need_exit
= 0;
2312 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2314 dnet_setup_id(&id
, group_id
, io
->id
);
2319 while (!need_exit
) {
2320 err
= dnet_search_range(n
, &id
, &start
, &next
);
2324 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2325 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2326 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2327 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2331 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2333 char start_id
[2*len
+ 1];
2334 char next_id
[2*len
+ 1];
2335 char end_id
[2*len
+ 1];
2336 char id_str
[2*len
+ 1];
2338 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2339 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2340 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2341 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2342 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2343 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2346 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2347 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2351 data
= dnet_read_data_wait_raw(s
, &id
, io
, DNET_CMD_DEL_RANGE
, cflags
, &err
);
2352 if (io
->size
!= sizeof(struct dnet_io_attr
)) {
2358 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
2360 dnet_convert_io_attr(rep
);
2362 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2363 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2364 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2368 new_ret
= realloc(ret
, *ret_num
* sizeof(struct dnet_io_attr
));
2375 ret
[*ret_num
- 1] = *rep
;
2380 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2389 struct dnet_range_data
*dnet_read_range(struct dnet_session
*s
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *errp
)
2391 struct dnet_node
*n
= s
->node
;
2394 struct dnet_range_data
*ret
;
2395 struct dnet_raw_id start
, next
;
2396 struct dnet_raw_id end
;
2397 uint64_t size
= io
->size
;
2399 int err
, need_exit
= 0;
2401 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2403 dnet_setup_id(&id
, group_id
, io
->id
);
2408 while (!need_exit
) {
2409 err
= dnet_search_range(n
, &id
, &start
, &next
);
2413 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2414 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2415 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2416 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2420 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2422 char start_id
[2*len
+ 1];
2423 char next_id
[2*len
+ 1];
2424 char end_id
[2*len
+ 1];
2425 char id_str
[2*len
+ 1];
2427 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2428 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2429 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2430 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2431 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2432 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2435 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2436 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2440 data
= dnet_read_data_wait_raw(s
, &id
, io
, DNET_CMD_READ_RANGE
, cflags
, &err
);
2442 struct dnet_io_attr
*rep
= data
+ io
->size
- sizeof(struct dnet_io_attr
);
2444 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2445 if (!(io
->flags
& DNET_IO_FLAGS_NODATA
))
2446 io
->size
-= sizeof(struct dnet_io_attr
);
2447 dnet_convert_io_attr(rep
);
2449 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2450 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2451 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2453 if (io
->start
< rep
->num
) {
2454 rep
->num
-= io
->start
;
2456 io
->num
-= rep
->num
;
2458 if (!io
->size
&& !(io
->flags
& DNET_IO_FLAGS_NODATA
)) {
2461 struct dnet_range_data
*new_ret
;
2465 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2472 ret
[ret_num
- 1].data
= data
;
2473 ret
[ret_num
- 1].size
= io
->size
;
2480 io
->start
-= rep
->num
;
2484 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2496 struct dnet_read_latest_id
{
2498 struct dnet_file_info fi
;
2501 struct dnet_read_latest_ctl
{
2502 struct dnet_wait
*w
;
2504 pthread_mutex_t lock
;
2506 struct dnet_read_latest_id ids
[0];
2509 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl
*ctl
)
2511 dnet_wakeup(ctl
->w
, ctl
->w
->cond
++);
2512 if (atomic_dec_and_test(&ctl
->w
->refcnt
)) {
2513 dnet_wait_destroy(ctl
->w
);
2514 pthread_mutex_destroy(&ctl
->lock
);
2519 static int dnet_read_latest_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
2521 struct dnet_read_latest_ctl
*ctl
= priv
;
2522 struct dnet_node
*n
;
2523 struct dnet_addr_attr
*a
;
2524 struct dnet_file_info
*fi
;
2527 if (is_trans_destroyed(st
, cmd
)) {
2528 dnet_read_latest_ctl_put(ctl
);
2535 if (err
|| !cmd
->size
)
2538 if (cmd
->size
< sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
2539 dnet_log(n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2540 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
2541 sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
));
2545 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
2546 fi
= (struct dnet_file_info
*)(a
+ 1);
2548 dnet_convert_addr_attr(a
);
2549 dnet_convert_file_info(fi
);
2551 pthread_mutex_lock(&ctl
->lock
);
2553 pthread_mutex_unlock(&ctl
->lock
);
2555 /* we do not care about filename */
2556 memcpy(&ctl
->ids
[pos
].fi
, fi
, sizeof(struct dnet_file_info
));
2557 memcpy(&ctl
->ids
[pos
].id
, &cmd
->id
, sizeof(struct dnet_id
));
2563 static int dnet_file_read_latest_cmp(const void *p1
, const void *p2
)
2565 const struct dnet_read_latest_id
*id1
= p1
;
2566 const struct dnet_read_latest_id
*id2
= p2
;
2568 int ret
= (int)(id2
->fi
.mtime
.tsec
- id1
->fi
.mtime
.tsec
);
2571 ret
= (int)(id2
->fi
.mtime
.tnsec
- id1
->fi
.mtime
.tnsec
);
2576 int dnet_read_latest_prepare(struct dnet_read_latest_prepare
*pr
)
2578 struct dnet_read_latest_ctl
*ctl
;
2579 int group_id
= pr
->id
.group_id
;
2582 ctl
= malloc(sizeof(struct dnet_read_latest_ctl
) + sizeof(struct dnet_read_latest_id
) * pr
->group_num
);
2587 memset(ctl
, 0, sizeof(struct dnet_read_latest_ctl
));
2589 ctl
->w
= dnet_wait_alloc(0);
2595 err
= pthread_mutex_init(&ctl
->lock
, NULL
);
2597 goto err_out_put_wait
;
2599 ctl
->num
= pr
->group_num
;
2602 for (i
= 0; i
< pr
->group_num
; ++i
) {
2603 pr
->id
.group_id
= pr
->group
[i
];
2605 dnet_wait_get(ctl
->w
);
2606 dnet_lookup_object(pr
->s
, &pr
->id
, DNET_ATTR_META_TIMES
| pr
->cflags
, dnet_read_latest_complete
, ctl
);
2609 err
= dnet_wait_event(ctl
->w
, ctl
->w
->cond
== pr
->group_num
, &pr
->s
->node
->wait_ts
);
2616 pr
->group_num
= ctl
->pos
;
2618 qsort(ctl
->ids
, pr
->group_num
, sizeof(struct dnet_read_latest_id
), dnet_file_read_latest_cmp
);
2620 for (i
= 0; i
< pr
->group_num
; ++i
) {
2621 pr
->group
[i
] = ctl
->ids
[i
].id
.group_id
;
2623 if (group_id
== pr
->group
[i
]) {
2624 const struct dnet_read_latest_id
*id0
= &ctl
->ids
[0];
2625 const struct dnet_read_latest_id
*id1
= &ctl
->ids
[i
];
2627 if (!dnet_file_read_latest_cmp(id0
, id1
)) {
2628 int tmp_group
= pr
->group
[0];
2629 pr
->group
[0] = pr
->group
[i
];
2630 pr
->group
[i
] = tmp_group
;
2636 dnet_read_latest_ctl_put(ctl
);
2640 dnet_wait_put(ctl
->w
);
2647 int dnet_read_latest(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
, uint64_t cflags
, void **datap
)
2649 struct dnet_read_latest_prepare pr
;
2650 int *g
, num
, err
, i
;
2652 if ((int)io
->num
> s
->group_num
) {
2657 err
= dnet_mix_states(s
, id
, &g
);
2663 if ((int)io
->num
> num
) {
2668 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
2676 err
= dnet_read_latest_prepare(&pr
);
2681 for (i
= 0; i
< pr
.group_num
; ++i
) {
2684 id
->group_id
= pr
.group
[i
];
2685 data
= dnet_read_data_wait_raw(s
, id
, io
, DNET_CMD_READ
, cflags
, &err
);
2687 if ((pr
.group_num
!= num
) || ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0))) {
2688 dnet_read_recover(s
, id
, io
, data
, cflags
);
2703 int dnet_get_routes(struct dnet_session
*s
, struct dnet_id
**ids
, struct dnet_addr
**addrs
) {
2705 struct dnet_node
*n
= s
->node
;
2706 struct dnet_net_state
*st
;
2707 struct dnet_group
*g
;
2708 struct dnet_addr
*tmp_addrs
;
2709 struct dnet_id
*tmp_ids
;
2710 int size
= 0, count
= 0;
2716 pthread_mutex_lock(&n
->state_lock
);
2717 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
2718 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
2720 size
+= st
->idc
->id_num
;
2722 tmp_ids
= (struct dnet_id
*)realloc(*ids
, size
* sizeof(struct dnet_id
));
2729 tmp_addrs
= (struct dnet_addr
*)realloc(*addrs
, size
* sizeof(struct dnet_addr
));
2736 for (i
= 0; i
< st
->idc
->id_num
; ++i
) {
2737 dnet_setup_id(&(*ids
)[count
], g
->group_id
, st
->idc
->ids
[i
].raw
.id
);
2738 memcpy(&(*addrs
)[count
], dnet_state_addr(st
), sizeof(struct dnet_addr
));
2743 pthread_mutex_unlock(&n
->state_lock
);
2757 void *dnet_bulk_read_wait_raw(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*ios
,
2758 uint32_t io_num
, int cmd
, uint64_t cflags
, int *errp
)
2760 struct dnet_node
*n
= s
->node
;
2761 struct dnet_io_control ctl
;
2762 struct dnet_io_attr io
;
2763 struct dnet_wait
*w
;
2764 struct dnet_read_data_completion
*c
;
2768 w
= dnet_wait_alloc(0);
2774 c
= malloc(sizeof(*c
));
2783 /* one for completion callback, another for this function */
2784 atomic_init(&c
->refcnt
, 2);
2786 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2791 ctl
.complete
= dnet_read_data_complete
;
2794 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
2796 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
2797 memset(&ctl
.io
, 0, sizeof(struct dnet_io_attr
));
2799 memcpy(io
.id
, id
->id
, DNET_ID_SIZE
);
2800 memcpy(io
.parent
, id
->id
, DNET_ID_SIZE
);
2802 ctl
.io
.size
= io_num
* sizeof(struct dnet_io_attr
);
2806 err
= dnet_read_object(s
, &ctl
);
2808 goto err_out_put_complete
;
2810 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
2811 if (err
|| w
->status
) {
2812 char id_str
[2*DNET_ID_SIZE
+ 1];
2815 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
2816 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
2817 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
2818 goto err_out_put_complete
;
2823 err_out_put_complete
:
2824 if (atomic_dec_and_test(&c
->refcnt
))
2834 static int dnet_io_attr_cmp(const void *d1
, const void *d2
)
2836 const struct dnet_io_attr
*io1
= d1
;
2837 const struct dnet_io_attr
*io2
= d2
;
2839 return memcmp(io1
->id
, io2
->id
, DNET_ID_SIZE
);
2842 struct dnet_range_data
*dnet_bulk_read(struct dnet_session
*s
, struct dnet_io_attr
*ios
, uint32_t io_num
, int group_id
, uint64_t cflags
, int *errp
)
2844 struct dnet_node
*n
= s
->node
;
2845 struct dnet_id id
, next_id
;
2847 struct dnet_range_data
*ret
;
2848 struct dnet_net_state
*cur
, *next
= NULL
;
2852 uint32_t i
, start
= -1;
2858 qsort(ios
, io_num
, sizeof(struct dnet_io_attr
), dnet_io_attr_cmp
);
2864 dnet_setup_id(&id
, group_id
, ios
[0].id
);
2865 id
.type
= ios
[0].type
;
2867 cur
= dnet_state_get_first(n
, &id
);
2869 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&id
));
2874 for (i
= 0; i
< io_num
; ++i
) {
2875 if ((i
+ 1) < io_num
) {
2876 dnet_setup_id(&next_id
, group_id
, ios
[i
+1].id
);
2877 next_id
.type
= ios
[i
+1].type
;
2879 next
= dnet_state_get_first(n
, &next_id
);
2881 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&next_id
));
2886 /* Send command only if state changes or it's a last id */
2887 if ((cur
== next
)) {
2888 dnet_state_put(next
);
2894 dnet_log(n
, DNET_LOG_NOTICE
, "start: %s: end: %s, count: %llu, addr: %s\n",
2896 dnet_dump_id(&next_id
),
2897 (unsigned long long)(i
- start
),
2898 dnet_state_dump_addr(cur
));
2900 data
= dnet_bulk_read_wait_raw(s
, &id
, ios
, i
- start
, DNET_CMD_BULK_READ
, cflags
, &err
);
2908 struct dnet_range_data
*new_ret
;
2912 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2919 ret
[ret_num
- 1].data
= data
;
2920 ret
[ret_num
- 1].size
= size
;
2926 dnet_state_put(cur
);
2929 memcpy(&id
, &next_id
, sizeof(struct dnet_id
));
2934 dnet_state_put(next
);
2935 dnet_state_put(cur
);
2945 struct dnet_range_data
dnet_bulk_write(struct dnet_session
*s
, struct dnet_io_control
*ctl
, int ctl_num
, int *errp
)
2947 struct dnet_node
*n
= s
->node
;
2948 int err
, i
, trans_num
= 0, local_trans_num
;
2949 struct dnet_wait
*w
;
2950 struct dnet_write_completion
*wc
;
2951 struct dnet_range_data ret
;
2952 struct dnet_metadata_control mcl
;
2953 struct dnet_meta_container mc
;
2954 struct dnet_io_control meta_ctl
;
2959 memset(&ret
, 0, sizeof(ret
));
2961 wc
= malloc(sizeof(struct dnet_write_completion
));
2966 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2968 w
= dnet_wait_alloc(0);
2976 atomic_set(&w
->refcnt
, INT_MAX
);
2977 w
->status
= -ENOENT
;
2979 for (i
= 0; i
< ctl_num
; ++i
) {
2981 ctl
[i
].complete
= dnet_write_complete
;
2983 ctl
[i
].cmd
= DNET_CMD_WRITE
;
2984 ctl
[i
].cflags
= DNET_FLAGS_NEED_ACK
;
2986 memcpy(ctl
[i
].io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2987 memcpy(ctl
[i
].io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2989 local_trans_num
= dnet_write_object(s
, &ctl
[i
]);
2990 if (local_trans_num
< 0)
2991 local_trans_num
= 0;
2993 trans_num
+= local_trans_num
;
2995 /* Prepare and send metadata */
2996 memset(&mcl
, 0, sizeof(mcl
));
2998 group_num
= s
->group_num
;
2999 groups
= alloca(group_num
* sizeof(int));
3000 memcpy(groups
, s
->groups
, group_num
* sizeof(int));
3002 mcl
.groups
= groups
;
3003 mcl
.group_num
= group_num
;
3005 mcl
.cflags
= ctl
[i
].cflags
;
3007 gettimeofday(&tv
, NULL
);
3008 mcl
.ts
.tv_sec
= tv
.tv_sec
;
3009 mcl
.ts
.tv_nsec
= tv
.tv_usec
* 1000;
3011 memset(&mc
, 0, sizeof(mc
));
3013 err
= dnet_create_metadata(s
, &mcl
, &mc
);
3014 dnet_log(n
, DNET_LOG_DEBUG
, "Creating metadata: err: %d", err
);
3016 dnet_convert_metadata(n
, mc
.data
, mc
.size
);
3018 memset(&meta_ctl
, 0, sizeof(struct dnet_io_control
));
3021 meta_ctl
.complete
= dnet_write_complete
;
3022 meta_ctl
.cmd
= DNET_CMD_WRITE
;
3025 meta_ctl
.cflags
= ctl
[i
].cflags
;
3027 memcpy(&meta_ctl
.id
, &ctl
[i
].id
, sizeof(struct dnet_id
));
3028 memcpy(meta_ctl
.io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3029 memcpy(meta_ctl
.io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3030 meta_ctl
.id
.type
= meta_ctl
.io
.type
= EBLOB_TYPE_META
;
3032 meta_ctl
.io
.flags
|= DNET_IO_FLAGS_META
;
3033 meta_ctl
.io
.offset
= 0;
3034 meta_ctl
.io
.size
= mc
.size
;
3035 meta_ctl
.data
= mc
.data
;
3037 local_trans_num
= dnet_write_object(s
, &meta_ctl
);
3038 if (local_trans_num
< 0)
3039 local_trans_num
= 0;
3041 trans_num
+= local_trans_num
;
3046 * 1 - the first reference counter we grabbed at allocation time
3048 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
3050 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
3051 if (err
|| w
->status
) {
3054 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3055 dnet_dump_id(&ctl
->id
), err
, w
->status
);
3058 if (err
|| !trans_num
) {
3061 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
3066 dnet_log(n
, DNET_LOG_NOTICE
, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3067 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, wc
->size
);
3070 ret
.data
= wc
->reply
;
3071 ret
.size
= wc
->size
;
3076 dnet_write_complete_free(wc
);
3082 int dnet_flags(struct dnet_node
*n
)
3087 static int dnet_start_defrag_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
3089 struct dnet_wait
*w
= priv
;
3091 if (is_trans_destroyed(state
, cmd
)) {
3092 dnet_wakeup(w
, w
->cond
++);
3100 static int dnet_start_defrag_single(struct dnet_net_state
*st
, void *priv
, uint64_t cflags
)
3102 struct dnet_trans_control ctl
;
3104 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
3106 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
3107 ctl
.cmd
= DNET_CMD_DEFRAG
;
3108 ctl
.complete
= dnet_start_defrag_complete
;
3110 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
3112 return dnet_trans_alloc_send_state(st
, &ctl
);
3115 int dnet_start_defrag(struct dnet_session
*s
, uint64_t cflags
)
3117 struct dnet_node
*n
= s
->node
;
3118 struct dnet_net_state
*st
;
3119 struct dnet_wait
*w
;
3120 struct dnet_group
*g
;
3124 w
= dnet_wait_alloc(0);
3130 pthread_mutex_lock(&n
->state_lock
);
3131 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
3132 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
3139 dnet_start_defrag_single(st
, w
, cflags
);
3143 pthread_mutex_unlock(&n
->state_lock
);
3145 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);