2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
44 static char *dnet_cmd_strings
[] = {
45 [DNET_CMD_LOOKUP
] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP
] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN
] = "JOIN",
48 [DNET_CMD_WRITE
] = "WRITE",
49 [DNET_CMD_READ
] = "READ",
50 [DNET_CMD_LIST
] = "CHECK",
51 [DNET_CMD_EXEC
] = "EXEC",
52 [DNET_CMD_ROUTE_LIST
] = "ROUTE_LIST",
53 [DNET_CMD_STAT
] = "STAT",
54 [DNET_CMD_NOTIFY
] = "NOTIFY",
55 [DNET_CMD_DEL
] = "REMOVE",
56 [DNET_CMD_STAT_COUNT
] = "STAT_COUNT",
57 [DNET_CMD_STATUS
] = "STATUS",
58 [DNET_CMD_READ_RANGE
] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE
] = "DEL_RANGE",
60 [DNET_CMD_AUTH
] = "AUTH",
61 [DNET_CMD_BULK_READ
] = "BULK_READ",
62 [DNET_CMD_DEFRAG
] = "DEFRAG",
63 [DNET_CMD_UNKNOWN
] = "UNKNOWN",
66 static char *dnet_counter_strings
[] = {
67 [DNET_CNTR_LA1
] = "DNET_CNTR_LA1",
68 [DNET_CNTR_LA5
] = "DNET_CNTR_LA5",
69 [DNET_CNTR_LA15
] = "DNET_CNTR_LA15",
70 [DNET_CNTR_BSIZE
] = "DNET_CNTR_BSIZE",
71 [DNET_CNTR_FRSIZE
] = "DNET_CNTR_FRSIZE",
72 [DNET_CNTR_BLOCKS
] = "DNET_CNTR_BLOCKS",
73 [DNET_CNTR_BFREE
] = "DNET_CNTR_BFREE",
74 [DNET_CNTR_BAVAIL
] = "DNET_CNTR_BAVAIL",
75 [DNET_CNTR_FILES
] = "DNET_CNTR_FILES",
76 [DNET_CNTR_FFREE
] = "DNET_CNTR_FFREE",
77 [DNET_CNTR_FAVAIL
] = "DNET_CNTR_FAVAIL",
78 [DNET_CNTR_FSID
] = "DNET_CNTR_FSID",
79 [DNET_CNTR_VM_ACTIVE
] = "DNET_CNTR_VM_ACTIVE",
80 [DNET_CNTR_VM_INACTIVE
] = "DNET_CNTR_VM_INACTIVE",
81 [DNET_CNTR_VM_TOTAL
] = "DNET_CNTR_VM_TOTAL",
82 [DNET_CNTR_VM_FREE
] = "DNET_CNTR_VM_FREE",
83 [DNET_CNTR_VM_CACHED
] = "DNET_CNTR_VM_CACHED",
84 [DNET_CNTR_VM_BUFFERS
] = "DNET_CNTR_VM_BUFFERS",
85 [DNET_CNTR_NODE_FILES
] = "DNET_CNTR_NODE_FILES",
86 [DNET_CNTR_NODE_LAST_MERGE
] = "DNET_CNTR_NODE_LAST_MERGE",
87 [DNET_CNTR_NODE_CHECK_COPY
] = "DNET_CNTR_NODE_CHECK_COPY",
88 [DNET_CNTR_DBR_NOREC
] = "DNET_CNTR_DBR_NOREC",
89 [DNET_CNTR_DBR_SYSTEM
] = "DNET_CNTR_DBR_SYSTEM",
90 [DNET_CNTR_DBR_ERROR
] = "DNET_CNTR_DBR_ERROR",
91 [DNET_CNTR_DBW_SYSTEM
] = "DNET_CNTR_DBW_SYSTEM",
92 [DNET_CNTR_DBW_ERROR
] = "DNET_CNTR_DBW_ERROR",
93 [DNET_CNTR_UNKNOWN
] = "UNKNOWN",
96 char *dnet_cmd_string(int cmd
)
98 if (cmd
<= 0 || cmd
>= __DNET_CMD_MAX
|| cmd
>= DNET_CMD_UNKNOWN
)
99 cmd
= DNET_CMD_UNKNOWN
;
101 return dnet_cmd_strings
[cmd
];
104 char *dnet_counter_string(int cntr
, int cmd_num
)
106 if (cntr
<= 0 || cntr
>= __DNET_CNTR_MAX
|| cntr
>= DNET_CNTR_UNKNOWN
)
107 cntr
= DNET_CNTR_UNKNOWN
;
110 return dnet_cmd_string(cntr
);
112 if (cntr
>= cmd_num
&& cntr
< (cmd_num
* 2))
113 return dnet_cmd_string(cntr
- cmd_num
);
115 cntr
+= DNET_CNTR_LA1
- cmd_num
* 2;
116 return dnet_counter_strings
[cntr
];
119 static int dnet_add_received_state(struct dnet_node
*n
, struct dnet_addr_attr
*a
,
120 int group_id
, struct dnet_raw_id
*ids
, int id_num
)
123 struct dnet_net_state
*nst
;
127 dnet_setup_id(&raw
, group_id
, ids
[0].id
);
129 nst
= dnet_state_search_by_addr(n
, &a
->addr
);
136 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
137 (struct sockaddr
*)&a
->addr
.addr
, a
->addr
.addr_len
, 0);
143 join
= DNET_WANT_RECONNECT
;
144 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
147 nst
= dnet_state_create(n
, group_id
, ids
, id_num
, &a
->addr
, s
, &err
, join
, dnet_state_net_process
);
151 dnet_log(n
, DNET_LOG_NOTICE
, "%d: added received state %s.\n",
152 group_id
, dnet_state_dump_addr(nst
));
162 static int dnet_process_addr_attr(struct dnet_net_state
*st
, struct dnet_addr_attr
*a
, int group_id
, int num
)
164 struct dnet_node
*n
= st
->n
;
165 struct dnet_raw_id
*ids
;
168 ids
= (struct dnet_raw_id
*)(a
+ 1);
169 for (i
=0; i
<num
; ++i
)
170 dnet_convert_raw_id(&ids
[0]);
172 err
= dnet_add_received_state(n
, a
, group_id
, ids
, num
);
173 dnet_log(n
, DNET_LOG_DEBUG
, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a
->addr
), num
, err
);
178 static int dnet_recv_route_list_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
180 struct dnet_wait
*w
= priv
;
181 struct dnet_addr_attr
*a
;
185 if (is_trans_destroyed(st
, cmd
)) {
191 dnet_wakeup(w
, w
->cond
= 1);
198 if (!cmd
->size
|| err
)
201 size
= cmd
->size
+ sizeof(struct dnet_cmd
);
202 if (size
< (signed)sizeof(struct dnet_addr_cmd
)) {
207 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
213 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
214 dnet_convert_addr_attr(a
);
216 err
= dnet_process_addr_attr(st
, a
, cmd
->id
.group_id
, num
);
222 int dnet_recv_route_list(struct dnet_net_state
*st
)
224 struct dnet_io_req req
;
225 struct dnet_node
*n
= st
->n
;
226 struct dnet_trans
*t
;
227 struct dnet_cmd
*cmd
;
231 w
= dnet_wait_alloc(0);
237 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
240 goto err_out_wait_put
;
243 t
->complete
= dnet_recv_route_list_complete
;
246 cmd
= (struct dnet_cmd
*)(t
+ 1);
248 cmd
->flags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
251 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
253 cmd
->cmd
= t
->command
= DNET_CMD_ROUTE_LIST
;
255 t
->st
= dnet_state_get(st
);
256 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
258 dnet_convert_cmd(cmd
);
260 dnet_log(n
, DNET_LOG_DEBUG
, "%s: list route request to %s.\n", dnet_dump_id(&cmd
->id
),
261 dnet_server_convert_dnet_addr(&st
->addr
));
263 memset(&req
, 0, sizeof(req
));
266 req
.hsize
= sizeof(struct dnet_cmd
);
269 err
= dnet_trans_send(t
, &req
);
271 goto err_out_destroy
;
273 err
= dnet_wait_event(w
, w
->cond
!= 0, &n
->wait_ts
);
286 static struct dnet_net_state
*dnet_add_state_socket(struct dnet_node
*n
, struct dnet_addr
*addr
, int s
, int *errp
, int join
)
288 struct dnet_net_state
*st
, dummy
;
289 char buf
[sizeof(struct dnet_addr_cmd
)];
290 struct dnet_cmd
*cmd
;
291 int err
, num
, i
, size
;
292 struct dnet_raw_id
*ids
;
294 memset(buf
, 0, sizeof(buf
));
296 cmd
= (struct dnet_cmd
*)(buf
);
298 cmd
->flags
= DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
299 cmd
->cmd
= DNET_CMD_REVERSE_LOOKUP
;
301 dnet_convert_cmd(cmd
);
304 memset(st
, 0, sizeof(struct dnet_net_state
));
306 st
->write_s
= st
->read_s
= s
;
309 err
= dnet_send_nolock(st
, buf
, sizeof(struct dnet_cmd
));
311 dnet_log(n
, DNET_LOG_ERROR
, "Failed to send reverse "
312 "lookup message to %s, err: %d.\n",
313 dnet_server_convert_dnet_addr(addr
), err
);
317 err
= dnet_recv(st
, buf
, sizeof(buf
));
319 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
320 "lookup headers from %s, err: %d.\n",
321 dnet_server_convert_dnet_addr(addr
), err
);
325 cmd
= (struct dnet_cmd
*)(buf
);
327 dnet_convert_addr_cmd((struct dnet_addr_cmd
*)buf
);
329 size
= cmd
->size
- sizeof(struct dnet_addr_attr
);
330 num
= size
/ sizeof(struct dnet_raw_id
);
332 dnet_log(n
, DNET_LOG_DEBUG
, "%s: waiting for %d ids\n", dnet_dump_id(&cmd
->id
), num
);
340 err
= dnet_recv(st
, ids
, size
);
342 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
343 "lookup body (%llu bytes) from %s, err: %d.\n",
344 (unsigned long long)cmd
->size
,
345 dnet_server_convert_dnet_addr(addr
), err
);
349 for (i
=0; i
<num
; ++i
)
350 dnet_convert_raw_id(&ids
[i
]);
352 st
= dnet_state_create(n
, cmd
->id
.group_id
, ids
, num
, addr
, s
, &err
, join
, dnet_state_net_process
);
354 /* socket is already closed */
371 int dnet_add_state(struct dnet_node
*n
, struct dnet_config
*cfg
)
373 int s
, err
, join
= DNET_WANT_RECONNECT
;
374 struct dnet_addr addr
;
375 struct dnet_net_state
*st
;
377 memset(&addr
, 0, sizeof(addr
));
379 addr
.addr_len
= sizeof(addr
.addr
);
380 s
= dnet_socket_create(n
, cfg
, &addr
, 0);
383 goto err_out_reconnect
;
386 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
389 /* will close socket on error */
390 st
= dnet_add_state_socket(n
, &addr
, s
, &err
, join
);
392 goto err_out_reconnect
;
394 if (!(cfg
->flags
& DNET_CFG_NO_ROUTE_LIST
))
395 dnet_recv_route_list(st
);
400 /* if the state already exists, it should not be an error */
404 if ((err
== -EADDRINUSE
) || (err
== -ECONNREFUSED
) || (err
== -ECONNRESET
) ||
405 (err
== -EINPROGRESS
) || (err
== -EAGAIN
))
406 dnet_add_reconnect_state(n
, &addr
, join
);
410 struct dnet_write_completion
{
413 struct dnet_wait
*wait
;
416 static void dnet_write_complete_free(struct dnet_write_completion
*wc
)
418 if (atomic_dec_and_test(&wc
->wait
->refcnt
)) {
419 dnet_wait_destroy(wc
->wait
);
425 static int dnet_write_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
428 struct dnet_write_completion
*wc
= priv
;
429 struct dnet_wait
*w
= wc
->wait
;
431 if (is_trans_destroyed(st
, cmd
)) {
432 dnet_wakeup(w
, w
->cond
++);
433 dnet_write_complete_free(wc
);
438 if (!err
&& st
&& (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
))) {
439 int old_size
= wc
->size
;
442 wc
->size
+= cmd
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_addr
);
443 wc
->reply
= realloc(wc
->reply
, wc
->size
);
449 data
= wc
->reply
+ old_size
;
451 memcpy(data
, &st
->addr
, sizeof(struct dnet_addr
));
452 memcpy(data
+ sizeof(struct dnet_addr
), cmd
, sizeof(struct dnet_cmd
));
453 memcpy(data
+ sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
), cmd
+ 1, cmd
->size
);
457 pthread_mutex_lock(&w
->wait_lock
);
460 pthread_mutex_unlock(&w
->wait_lock
);
465 static struct dnet_trans
*dnet_io_trans_create(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int *errp
)
467 struct dnet_io_req req
;
468 struct dnet_trans
*t
= NULL
;
469 struct dnet_io_attr
*io
;
470 struct dnet_cmd
*cmd
;
471 uint64_t size
= ctl
->io
.size
;
472 uint64_t tsize
= sizeof(struct dnet_io_attr
) + sizeof(struct dnet_cmd
);
475 if (ctl
->cmd
== DNET_CMD_READ
)
478 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
)
481 t
= dnet_trans_alloc(n
, tsize
);
484 goto err_out_complete
;
486 t
->complete
= ctl
->complete
;
489 cmd
= (struct dnet_cmd
*)(t
+ 1);
490 io
= (struct dnet_io_attr
*)(cmd
+ 1);
492 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
) {
495 memcpy(data
, ctl
->data
, size
);
499 memcpy(&cmd
->id
, &ctl
->id
, sizeof(struct dnet_id
));
500 cmd
->size
= sizeof(struct dnet_io_attr
) + size
;
501 cmd
->flags
= ctl
->cflags
;
504 cmd
->cmd
= t
->command
= ctl
->cmd
;
506 memcpy(io
, &ctl
->io
, sizeof(struct dnet_io_attr
));
507 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
509 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
512 goto err_out_destroy
;
515 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
517 dnet_log(n
, DNET_LOG_INFO
, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
518 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
519 dnet_dump_id(&ctl
->id
),
520 (unsigned long long)t
->trans
,
521 dnet_cmd_string(ctl
->cmd
), (unsigned long long)cmd
->flags
,
522 (unsigned long long)ctl
->io
.size
, (unsigned long long)ctl
->io
.offset
,
524 (unsigned long long)ctl
->local_offset
,
525 dnet_server_convert_dnet_addr(&t
->st
->addr
), t
->st
->weight
, t
->st
->median_read_time
);
527 dnet_convert_cmd(cmd
);
528 dnet_convert_io_attr(io
);
531 memset(&req
, 0, sizeof(req
));
539 req
.local_offset
= ctl
->local_offset
;
541 } else if (size
>= DNET_COPY_IO_SIZE
) {
542 req
.data
= (void *)ctl
->data
;
546 err
= dnet_trans_send(t
, &req
);
548 goto err_out_destroy
;
554 ctl
->complete(NULL
, NULL
, ctl
->priv
);
564 int dnet_trans_create_send_all(struct dnet_session
*s
, struct dnet_io_control
*ctl
)
566 struct dnet_node
*n
= s
->node
;
569 for (i
=0; i
<s
->group_num
; ++i
) {
570 ctl
->id
.group_id
= s
->groups
[i
];
572 dnet_io_trans_create(n
, ctl
, &err
);
577 dnet_io_trans_create(n
, ctl
, &err
);
/*
 * Write entry point: forwards @s and @ctl to dnet_trans_create_send_all()
 * and returns its result unchanged.  The caller in this file stores that
 * result as its transaction count.
 */
int dnet_write_object(struct dnet_session *s, struct dnet_io_control *ctl)
{
	int sent = dnet_trans_create_send_all(s, ctl);

	return sent;
}
589 static int dnet_write_file_id_raw(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
,
590 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
591 uint64_t cflags
, unsigned int ioflags
)
593 struct dnet_node
*n
= s
->node
;
594 int fd
, err
, trans_num
;
597 struct dnet_io_control ctl
;
598 struct dnet_write_completion
*wc
;
600 wc
= malloc(sizeof(struct dnet_write_completion
));
605 memset(wc
, 0, sizeof(struct dnet_write_completion
));
607 w
= dnet_wait_alloc(0);
611 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting structure.\n");
617 fd
= open(file
, O_RDONLY
| O_LARGEFILE
| O_CLOEXEC
);
620 dnet_log_err(n
, "Failed to open to be written file '%s'", file
);
624 err
= fstat(fd
, &stat
);
627 dnet_log_err(n
, "Failed to stat to be written file '%s'", file
);
631 if (local_offset
>= (uint64_t)stat
.st_size
) {
636 if (!size
|| size
+ local_offset
>= (uint64_t)stat
.st_size
)
637 size
= stat
.st_size
- local_offset
;
639 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
641 atomic_set(&w
->refcnt
, INT_MAX
);
645 ctl
.local_offset
= local_offset
;
648 ctl
.complete
= dnet_write_complete
;
651 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
652 ctl
.cmd
= DNET_CMD_WRITE
;
654 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
655 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
657 ctl
.io
.flags
= ioflags
;
659 ctl
.io
.offset
= remote_offset
;
660 ctl
.io
.type
= id
->type
;
662 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
664 trans_num
= dnet_write_object(s
, &ctl
);
669 * 1 - the first reference counter we grabbed at allocation time
671 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
673 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
674 if (err
|| w
->status
) {
679 if (!err
&& !trans_num
)
683 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file
, trans_num
, err
);
687 dnet_log(n
, DNET_LOG_NOTICE
, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
688 file
, (unsigned long long)size
);
691 dnet_write_complete_free(wc
);
698 dnet_write_complete_free(wc
);
703 int dnet_write_file_id(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
, uint64_t local_offset
,
704 uint64_t remote_offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
706 int err
= dnet_write_file_id_raw(s
, file
, id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
707 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
708 err
= dnet_create_write_metadata_strings(s
, NULL
, 0, id
, NULL
, cflags
);
713 int dnet_write_file(struct dnet_session
*s
, const char *file
, const void *remote
, int remote_len
,
714 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
715 uint64_t cflags
, unsigned int ioflags
, int type
)
720 dnet_transform(s
->node
, remote
, remote_len
, &id
);
723 err
= dnet_write_file_id_raw(s
, file
, &id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
724 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
725 err
= dnet_create_write_metadata_strings(s
, remote
, remote_len
, &id
, NULL
, cflags
);
730 static int dnet_read_file_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
734 struct dnet_io_completion
*c
= priv
;
735 struct dnet_io_attr
*io
;
738 if (is_trans_destroyed(st
, cmd
)) {
741 if (cmd
&& cmd
->status
)
744 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
);
745 dnet_wait_put(c
->wait
);
754 if (cmd
->status
!= 0 || cmd
->size
== 0) {
756 goto err_out_exit_no_log
;
759 if (cmd
->size
<= sizeof(struct dnet_io_attr
)) {
760 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
761 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
762 sizeof(struct dnet_io_attr
));
764 goto err_out_exit_no_log
;
767 io
= (struct dnet_io_attr
*)(cmd
+ 1);
770 dnet_convert_io_attr(io
);
772 fd
= open(c
->file
, O_RDWR
| O_CREAT
| O_CLOEXEC
, 0644);
775 dnet_log_err(n
, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
779 err
= pwrite(fd
, data
, io
->size
, c
->offset
);
782 dnet_log_err(n
, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
787 dnet_log(n
, DNET_LOG_NOTICE
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
788 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)c
->offset
,
789 (unsigned long long)io
->size
, cmd
->status
);
796 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
797 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)io
->offset
,
798 (unsigned long long)io
->size
, cmd
->status
, err
);
800 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
? err
: 1);
804 int dnet_read_object(struct dnet_session
*s
, struct dnet_io_control
*ctl
)
808 if (!dnet_io_trans_create(s
->node
, ctl
, &err
))
814 static int dnet_read_file_raw_exec(struct dnet_session
*s
, const char *file
, unsigned int len
,
815 uint64_t write_offset
, uint64_t io_offset
, uint64_t io_size
,
816 struct dnet_id
*id
, struct dnet_wait
*w
)
818 struct dnet_node
*n
= s
->node
;
819 struct dnet_io_control ctl
;
820 struct dnet_io_completion
*c
;
821 int err
, wait_init
= ~0;
823 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
825 ctl
.io
.size
= io_size
;
826 ctl
.io
.offset
= io_offset
;
828 ctl
.io
.type
= id
->type
;
830 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
831 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
833 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
836 ctl
.complete
= dnet_read_file_complete
;
837 ctl
.cmd
= DNET_CMD_READ
;
838 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
840 c
= malloc(sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
842 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate IO completion structure "
843 "for '%s' file reading.\n",
844 dnet_dump_id(&ctl
.id
), file
);
849 memset(c
, 0, sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
851 c
->wait
= dnet_wait_get(w
);
852 c
->offset
= write_offset
;
853 c
->file
= (char *)(c
+ 1);
855 sprintf(c
->file
, "%s", file
);
860 err
= dnet_read_object(s
, &ctl
);
864 err
= dnet_wait_event(w
, w
->cond
!= wait_init
, &n
->wait_ts
);
865 if ((err
< 0) || (w
->cond
< 0)) {
866 char id_str
[2*DNET_ID_SIZE
+ 1];
869 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s '%s' : failed to read data: %d\n",
870 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
),
881 static int dnet_read_file_raw(struct dnet_session
*s
, const char *file
, struct dnet_id
*id
, uint64_t offset
, uint64_t size
)
883 struct dnet_node
*n
= s
->node
;
884 int err
= -ENOENT
, len
= strlen(file
), i
;
888 w
= dnet_wait_alloc(~0);
891 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting.\n");
898 num
= dnet_mix_states(s
, id
, &g
);
904 for (i
=0; i
<num
; ++i
) {
907 err
= dnet_read_file_raw_exec(s
, file
, len
, 0, offset
, size
, id
, w
);
/*
 * Read by an already-built ID: forwards every argument to
 * dnet_read_file_raw() and returns its result unchanged.
 *
 * @s:      session to read through
 * @file:   local file name handed to the raw reader
 * @id:     ID of the object to read
 * @offset: offset forwarded to the raw reader
 * @size:   size forwarded to the raw reader
 */
int dnet_read_file_id(struct dnet_session *s, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(s, file, id, offset, size);

	return err;
}
926 int dnet_read_file(struct dnet_session
*s
, const char *file
, const void *remote
, int remote_size
,
927 uint64_t offset
, uint64_t size
, int type
)
931 dnet_transform(s
->node
, remote
, remote_size
, &id
);
934 return dnet_read_file_raw(s
, file
, &id
, offset
, size
);
937 struct dnet_wait
*dnet_wait_alloc(int cond
)
942 w
= malloc(sizeof(struct dnet_wait
));
948 memset(w
, 0, sizeof(struct dnet_wait
));
950 err
= pthread_cond_init(&w
->wait
, NULL
);
954 err
= pthread_mutex_init(&w
->wait_lock
, NULL
);
956 goto err_out_destroy
;
959 atomic_init(&w
->refcnt
, 1);
964 pthread_mutex_destroy(&w
->wait_lock
);
969 void dnet_wait_destroy(struct dnet_wait
*w
)
971 pthread_mutex_destroy(&w
->wait_lock
);
972 pthread_cond_destroy(&w
->wait
);
977 static int dnet_send_cmd_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
979 struct dnet_wait
*w
= priv
;
981 if (is_trans_destroyed(st
, cmd
)) {
982 dnet_wakeup(w
, w
->cond
++);
987 w
->status
= cmd
->status
;
991 void *data
= cmd
+ 1;
993 w
->ret
= realloc(w
->ret
, w
->size
+ cmd
->size
);
998 memcpy(w
->ret
+ w
->size
, data
, cmd
->size
);
999 w
->size
+= cmd
->size
;
1006 static int dnet_send_cmd_single(struct dnet_net_state
*st
, struct dnet_wait
*w
, struct sph
*e
, uint64_t cflags
)
1008 struct dnet_trans_control ctl
;
1010 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1012 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1013 ctl
.size
= sizeof(struct sph
) + e
->event_size
+ e
->data_size
+ e
->binary_size
;
1014 ctl
.cmd
= DNET_CMD_EXEC
;
1015 ctl
.complete
= dnet_send_cmd_complete
;
1017 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1019 dnet_convert_sph(e
);
1023 return dnet_trans_alloc_send_state(st
, &ctl
);
1026 static int dnet_send_cmd_raw(struct dnet_session
*s
, struct dnet_id
*id
,
1027 struct sph
*e
, void **ret
, uint64_t cflags
)
1029 struct dnet_node
*n
= s
->node
;
1030 struct dnet_net_state
*st
;
1031 int err
= -ENOENT
, num
= 0;
1032 struct dnet_wait
*w
;
1033 struct dnet_group
*g
;
1035 w
= dnet_wait_alloc(0);
1041 if (id
&& id
->group_id
!= 0) {
1043 st
= dnet_state_get_first(n
, id
);
1046 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1049 } else if (id
&& id
->group_id
== 0) {
1050 pthread_mutex_lock(&n
->state_lock
);
1051 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1054 id
->group_id
= g
->group_id
;
1056 st
= dnet_state_search_nolock(n
, id
);
1059 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1065 pthread_mutex_unlock(&n
->state_lock
);
1067 pthread_mutex_lock(&n
->state_lock
);
1068 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1069 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1075 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1079 pthread_mutex_unlock(&n
->state_lock
);
1082 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
/*
 * Send the command described by @e with no extra command flags.
 * Forwards to dnet_send_cmd_raw() with cflags == 0 and returns its
 * result unchanged.
 */
int dnet_send_cmd(struct dnet_session *s, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t cflags = 0;

	return dnet_send_cmd_raw(s, id, e, ret, cflags);
}
1108 int dnet_send_cmd_nolock(struct dnet_session
*s
, struct dnet_id
*id
, struct sph
*e
, void **ret
)
1110 return dnet_send_cmd_raw(s
, id
, e
, ret
, DNET_FLAGS_NOLOCK
);
1113 int dnet_try_reconnect(struct dnet_node
*n
)
1115 struct dnet_addr_storage
*ast
, *tmp
;
1116 struct dnet_net_state
*st
;
1120 if (list_empty(&n
->reconnect_list
))
1123 pthread_mutex_lock(&n
->reconnect_lock
);
1124 list_for_each_entry_safe(ast
, tmp
, &n
->reconnect_list
, reconnect_entry
) {
1125 list_move(&ast
->reconnect_entry
, &list
);
1127 pthread_mutex_unlock(&n
->reconnect_lock
);
1129 list_for_each_entry_safe(ast
, tmp
, &list
, reconnect_entry
) {
1130 s
= dnet_socket_create_addr(n
, n
->sock_type
, n
->proto
, n
->family
,
1131 (struct sockaddr
*)ast
->addr
.addr
, ast
->addr
.addr_len
, 0);
1135 join
= DNET_WANT_RECONNECT
;
1136 if (ast
->__join_state
== DNET_JOIN
)
1139 st
= dnet_add_state_socket(n
, &ast
->addr
, s
, &err
, join
);
1145 if (err
== -EEXIST
|| err
== -EINVAL
)
1149 dnet_add_reconnect_state(n
, &ast
->addr
, ast
->__join_state
);
1151 list_del(&ast
->reconnect_entry
);
1158 int dnet_lookup_object(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
,
1159 int (* complete
)(struct dnet_net_state
*, struct dnet_cmd
*, void *),
1162 struct dnet_node
*n
= s
->node
;
1163 struct dnet_io_req req
;
1164 struct dnet_trans
*t
;
1165 struct dnet_cmd
*cmd
;
1168 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
1171 goto err_out_complete
;
1173 t
->complete
= complete
;
1176 cmd
= (struct dnet_cmd
*)(t
+ 1);
1178 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
1180 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
1182 cmd
->cmd
= t
->command
= DNET_CMD_LOOKUP
;
1183 cmd
->flags
= cflags
| DNET_FLAGS_NEED_ACK
;
1185 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
1188 goto err_out_destroy
;
1191 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
1192 dnet_convert_cmd(cmd
);
1194 dnet_log(n
, DNET_LOG_NOTICE
, "%s: lookup to %s.\n", dnet_dump_id(id
), dnet_server_convert_dnet_addr(&t
->st
->addr
));
1196 memset(&req
, 0, sizeof(req
));
1199 req
.hsize
= sizeof(struct dnet_cmd
);
1201 err
= dnet_trans_send(t
, &req
);
1203 goto err_out_destroy
;
1209 complete(NULL
, NULL
, priv
);
1217 int dnet_lookup_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1219 struct dnet_wait
*w
= priv
;
1220 struct dnet_node
*n
= NULL
;
1221 struct dnet_addr_attr
*a
;
1222 struct dnet_net_state
*other
;
1223 char addr_str
[128] = "no-address";
1226 if (is_trans_destroyed(st
, cmd
)) {
1227 dnet_wakeup(w
, w
->cond
++);
1234 if (err
|| !cmd
->size
)
1237 if (cmd
->size
< sizeof(struct dnet_addr_attr
)) {
1238 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1239 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
, sizeof(struct dnet_addr_attr
));
1244 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
1246 dnet_convert_addr_attr(a
);
1247 dnet_server_convert_dnet_addr_raw(&a
->addr
, addr_str
, sizeof(addr_str
));
1249 if (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
1250 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
1252 dnet_convert_file_info(info
);
1254 dnet_log_raw(n
, DNET_LOG_NOTICE
, "%s: lookup object: %s: "
1255 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1256 dnet_dump_id(&cmd
->id
), addr_str
,
1257 (unsigned long long)info
->offset
, (unsigned long long)info
->size
,
1258 (unsigned long long)info
->mode
, (char *)(info
+ 1));
1260 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: lookup object: %s\n",
1261 dnet_dump_id(&cmd
->id
), addr_str
);
1265 other
= dnet_state_search_by_addr(n
, &a
->addr
);
1267 dnet_state_put(other
);
1269 dnet_recv_route_list(st
);
1276 dnet_log(n
, DNET_LOG_ERROR
, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd
->id
), cmd
->status
, err
);
1281 int dnet_lookup(struct dnet_session
*s
, const char *file
)
1283 struct dnet_node
*n
= s
->node
;
1284 int err
, error
= 0, i
;
1285 struct dnet_wait
*w
;
1288 w
= dnet_wait_alloc(0);
1294 dnet_transform(n
, file
, strlen(file
), &raw
);
1296 for (i
=0; i
<s
->group_num
; ++i
) {
1297 raw
.group_id
= s
->groups
[i
];
1299 err
= dnet_lookup_object(s
, &raw
, 0, dnet_lookup_complete
, dnet_wait_get(w
));
1305 err
= dnet_wait_event(w
, w
->cond
== 1, &n
->wait_ts
);
1306 if (err
|| w
->status
) {
1324 struct dnet_addr
*dnet_state_addr(struct dnet_net_state
*st
)
1329 static int dnet_stat_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1331 struct dnet_wait
*w
= priv
;
1333 struct dnet_stat
*st
;
1336 if (is_trans_destroyed(state
, cmd
)) {
1337 dnet_wakeup(w
, w
->cond
++);
1342 if (cmd
->cmd
== DNET_CMD_STAT
&& cmd
->size
== sizeof(struct dnet_stat
)) {
1343 st
= (struct dnet_stat
*)(cmd
+ 1);
1345 dnet_convert_stat(st
);
1347 la
[0] = (float)st
->la
[0] / 100.0;
1348 la
[1] = (float)st
->la
[1] / 100.0;
1349 la
[2] = (float)st
->la
[2] / 100.0;
1351 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: la: %.2f %.2f %.2f.\n",
1352 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1353 la
[0], la
[1], la
[2]);
1354 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: mem: "
1355 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1356 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1357 (unsigned long long)st
->vm_total
,
1358 (unsigned long long)st
->vm_free
,
1359 (unsigned long long)st
->vm_cached
);
1360 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: fs: "
1361 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1362 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1363 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
1364 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
1365 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
1367 } else if (cmd
->size
>= sizeof(struct dnet_addr_stat
) && cmd
->cmd
== DNET_CMD_STAT_COUNT
) {
1368 struct dnet_addr_stat
*as
= (struct dnet_addr_stat
*)(cmd
+ 1);
1371 dnet_convert_addr_stat(as
, 0);
1373 for (i
=0; i
<as
->num
; ++i
) {
1374 if (as
->num
> as
->cmd_num
) {
1376 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Storage commands\n",
1377 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1378 if (i
== as
->cmd_num
)
1379 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Proxy commands\n",
1380 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1381 if (i
== as
->cmd_num
* 2)
1382 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Counters\n",
1383 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1385 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1386 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1387 dnet_counter_string(i
, as
->cmd_num
),
1388 (unsigned long long)as
->count
[i
].count
, (unsigned long long)as
->count
[i
].err
);
1395 static int dnet_request_cmd_single(struct dnet_session
*s
, struct dnet_net_state
*st
, struct dnet_trans_control
*ctl
)
1398 return dnet_trans_alloc_send_state(st
, ctl
);
1400 return dnet_trans_alloc_send(s
, ctl
);
1403 int dnet_request_stat(struct dnet_session
*s
, struct dnet_id
*id
,
1404 unsigned int cmd
, uint64_t cflags
,
1405 int (* complete
)(struct dnet_net_state
*state
,
1406 struct dnet_cmd
*cmd
,
1410 struct dnet_node
*n
= s
->node
;
1411 struct dnet_trans_control ctl
;
1412 struct dnet_wait
*w
= NULL
;
1414 struct timeval start
, end
;
1417 gettimeofday(&start
, NULL
);
1420 w
= dnet_wait_alloc(0);
1426 complete
= dnet_stat_complete
;
1430 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1433 ctl
.complete
= complete
;
1435 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| cflags
;
1441 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1443 err
= dnet_request_cmd_single(s
, NULL
, &ctl
);
1446 struct dnet_net_state
*st
;
1447 struct dnet_group
*g
;
1450 pthread_mutex_lock(&n
->state_lock
);
1451 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1452 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1459 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1460 dnet_request_cmd_single(s
, st
, &ctl
);
1464 pthread_mutex_unlock(&n
->state_lock
);
1468 gettimeofday(&end
, NULL
);
1469 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1470 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd
), diff
, num
);
1475 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1477 gettimeofday(&end
, NULL
);
1478 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1479 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd
), diff
, err
, num
);
1494 struct dnet_request_cmd_priv
{
1495 struct dnet_wait
*w
;
1497 int (* complete
)(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
);
1501 static int dnet_request_cmd_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1503 struct dnet_request_cmd_priv
*p
= priv
;
1504 int err
= p
->complete(state
, cmd
, p
->priv
);
1506 if (is_trans_destroyed(state
, cmd
)) {
1507 struct dnet_wait
*w
= p
->w
;
1509 dnet_wakeup(w
, w
->cond
++);
1510 if (atomic_read(&w
->refcnt
) == 1)
1518 int dnet_request_cmd(struct dnet_session
*s
, struct dnet_trans_control
*ctl
)
1520 struct dnet_node
*n
= s
->node
;
1522 struct dnet_request_cmd_priv
*p
;
1523 struct dnet_wait
*w
;
1524 struct dnet_net_state
*st
;
1525 struct dnet_group
*g
;
1526 struct timeval start
, end
;
1529 gettimeofday(&start
, NULL
);
1531 p
= malloc(sizeof(*p
));
1537 w
= dnet_wait_alloc(0);
1544 p
->complete
= ctl
->complete
;
1545 p
->priv
= ctl
->priv
;
1547 ctl
->complete
= dnet_request_cmd_complete
;
1550 pthread_mutex_lock(&n
->state_lock
);
1551 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1552 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1558 ctl
->id
.group_id
= g
->group_id
;
1560 if (!(ctl
->cflags
& DNET_FLAGS_DIRECT
))
1561 dnet_setup_id(&ctl
->id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1562 dnet_request_cmd_single(s
, st
, ctl
);
1566 pthread_mutex_unlock(&n
->state_lock
);
1568 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1570 gettimeofday(&end
, NULL
);
1571 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1572 dnet_log(n
, DNET_LOG_NOTICE
, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl
->cmd
), diff
, err
, num
);
1577 if (atomic_read(&w
->refcnt
) == 1)
1589 struct dnet_update_status_priv
{
1590 struct dnet_wait
*w
;
1591 struct dnet_node_status status
;
1595 static int dnet_update_status_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1597 struct dnet_update_status_priv
*p
= priv
;
1599 if (is_trans_destroyed(state
, cmd
)) {
1600 dnet_wakeup(p
->w
, p
->w
->cond
++);
1601 dnet_wait_put(p
->w
);
1602 if (atomic_dec_and_test(&p
->refcnt
))
1606 if (cmd
->size
== sizeof(struct dnet_node_status
)) {
1607 memcpy(&p
->status
, cmd
+ 1, sizeof(struct dnet_node_status
));
1614 int dnet_update_status(struct dnet_session
*s
, struct dnet_addr
*addr
, struct dnet_id
*id
, struct dnet_node_status
*status
)
1617 struct dnet_update_status_priv
*priv
;
1618 struct dnet_trans_control ctl
;
1625 memset(&ctl
, 0, sizeof(ctl
));
1628 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1630 struct dnet_net_state
*st
;
1632 st
= dnet_state_search_by_addr(s
->node
, addr
);
1638 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1642 priv
= malloc(sizeof(struct dnet_update_status_priv
));
1648 priv
->w
= dnet_wait_alloc(0);
1654 ctl
.complete
= dnet_update_status_complete
;
1656 ctl
.cmd
= DNET_CMD_STATUS
;
1657 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
1658 ctl
.size
= sizeof(struct dnet_node_status
);
1661 dnet_wait_get(priv
->w
);
1662 dnet_request_cmd_single(s
, NULL
, &ctl
);
1664 err
= dnet_wait_event(priv
->w
, priv
->w
->cond
== 1, &s
->node
->wait_ts
);
1665 dnet_wait_put(priv
->w
);
1667 memcpy(status
, &priv
->status
, sizeof(struct dnet_node_status
));
1669 if (atomic_dec_and_test(&priv
->refcnt
))
1676 static int dnet_remove_object_raw(struct dnet_session
*s
, struct dnet_id
*id
,
1677 int (* complete
)(struct dnet_net_state
*state
,
1678 struct dnet_cmd
*cmd
,
1680 void *priv
, uint64_t cflags
, uint64_t ioflags
)
1682 struct dnet_io_control ctl
;
1685 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1687 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1689 memcpy(&ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
1690 memcpy(&ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
1691 ctl
.io
.flags
= ioflags
;
1695 ctl
.cmd
= DNET_CMD_DEL
;
1696 ctl
.complete
= complete
;
1698 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1700 err
= dnet_trans_create_send_all(s
, &ctl
);
1709 static int dnet_remove_complete(struct dnet_net_state
*state
,
1710 struct dnet_cmd
*cmd
,
1713 struct dnet_wait
*w
= priv
;
1715 if (is_trans_destroyed(state
, cmd
)) {
1716 dnet_wakeup(w
, w
->cond
++);
1722 w
->status
= cmd
->status
;
1726 int dnet_remove_object(struct dnet_session
*s
, struct dnet_id
*id
,
1727 int (* complete
)(struct dnet_net_state
*state
,
1728 struct dnet_cmd
*cmd
,
1731 uint64_t cflags
, uint64_t ioflags
)
1733 struct dnet_wait
*w
= NULL
;
1737 w
= dnet_wait_alloc(0);
1743 complete
= dnet_remove_complete
;
1748 err
= dnet_remove_object_raw(s
, id
, complete
, priv
, cflags
, ioflags
);
1753 err
= dnet_wait_event(w
, w
->cond
!= err
, &s
->node
->wait_ts
);
1757 if (w
->status
< 0) {
1773 static int dnet_remove_file_raw(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1775 struct dnet_wait
*w
;
1778 w
= dnet_wait_alloc(0);
1784 atomic_add(&w
->refcnt
, 1024);
1785 err
= dnet_remove_object_raw(s
, id
, dnet_remove_complete
, w
, cflags
, ioflags
);
1787 atomic_sub(&w
->refcnt
, 1024);
1792 atomic_sub(&w
->refcnt
, 1024 - num
);
1794 err
= dnet_wait_event(w
, w
->cond
== num
, &s
->node
->wait_ts
);
1798 if (w
->status
< 0) {
1814 int dnet_remove_object_now(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1816 return dnet_remove_file_raw(s
, id
, cflags
| DNET_FLAGS_NEED_ACK
| DNET_ATTR_DELETE_HISTORY
, ioflags
);
1819 int dnet_remove_file(struct dnet_session
*s
, char *remote
, int remote_len
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1824 dnet_transform(s
->node
, remote
, remote_len
, &raw
);
1829 return dnet_remove_file_raw(s
, id
, cflags
, ioflags
);
1832 int dnet_request_ids(struct dnet_session
*s
, struct dnet_id
*id
, uint64_t cflags
,
1833 int (* complete
)(struct dnet_net_state
*state
,
1834 struct dnet_cmd
*cmd
,
1838 struct dnet_trans_control ctl
;
1840 dnet_log_raw(s
->node
, DNET_LOG_ERROR
, "Temporarily unsupported operation.\n");
1843 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1845 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1846 ctl
.cmd
= DNET_CMD_LIST
;
1847 ctl
.complete
= complete
;
1849 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1851 return dnet_trans_alloc_send(s
, &ctl
);
1854 struct dnet_node
*dnet_get_node_from_state(void *state
)
1856 struct dnet_net_state
*st
= state
;
1863 struct dnet_read_data_completion
{
1864 struct dnet_wait
*w
;
1870 static int dnet_read_data_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1872 struct dnet_read_data_completion
*c
= priv
;
1873 struct dnet_wait
*w
= c
->w
;
1876 if (is_trans_destroyed(st
, cmd
)) {
1877 dnet_wakeup(w
, w
->cond
++);
1879 if (atomic_dec_and_test(&c
->refcnt
))
1888 if (cmd
->size
>= sizeof(struct dnet_io_attr
)) {
1889 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)(cmd
+ 1);
1890 uint64_t sz
= c
->size
;
1892 dnet_convert_io_attr(io
);
1894 sz
+= io
->size
+ sizeof(struct dnet_io_attr
);
1895 c
->data
= realloc(c
->data
, sz
);
1901 memcpy(c
->data
+ c
->size
, io
, sizeof(struct dnet_io_attr
) + io
->size
);
1906 dnet_log(st
->n
, DNET_LOG_NOTICE
, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1907 dnet_dump_id(&cmd
->id
), (unsigned long long)(cmd
->trans
& ~DNET_TRANS_REPLY
),
1913 void *dnet_read_data_wait_raw(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
1914 int cmd
, uint64_t cflags
, int *errp
)
1916 struct dnet_node
*n
= s
->node
;
1917 struct dnet_io_control ctl
;
1918 struct dnet_wait
*w
;
1919 struct dnet_read_data_completion
*c
;
1923 w
= dnet_wait_alloc(0);
1929 c
= malloc(sizeof(*c
));
1938 /* one for completion callback, another for this function */
1939 atomic_init(&c
->refcnt
, 2);
1941 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1946 ctl
.complete
= dnet_read_data_complete
;
1949 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1951 memcpy(&ctl
.io
, io
, sizeof(struct dnet_io_attr
));
1952 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1954 ctl
.id
.type
= io
->type
;
1957 err
= dnet_read_object(s
, &ctl
);
1959 goto err_out_put_complete
;
1961 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
1962 if (err
|| w
->status
) {
1963 char id_str
[2*DNET_ID_SIZE
+ 1];
1966 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
1967 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
1968 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
1969 goto err_out_put_complete
;
1975 err_out_put_complete
:
1976 if (atomic_dec_and_test(&c
->refcnt
))
1985 static int dnet_read_recover(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
, void *data
, uint64_t cflags
)
1987 struct dnet_node
*n
= s
->node
;
1988 struct dnet_meta_container mc
;
1989 struct dnet_io_control ctl
;
1993 err
= dnet_read_meta(s
, &mc
, NULL
, 0, id
);
1995 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id
), err
);
1999 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2004 ctl
.data
= data
+ sizeof(struct dnet_io_attr
);
2005 ctl
.io
.size
-= sizeof(struct dnet_io_attr
);
2008 ctl
.cmd
= DNET_CMD_WRITE
;
2009 ctl
.cflags
= cflags
;
2011 err
= dnet_write_data_wait(s
, &ctl
, &result
);
2013 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id
), err
);
2014 goto err_out_free_meta
;
2017 err
= dnet_write_metadata(s
, &mc
, 0, cflags
);
2019 goto err_out_free_result
;
2021 err_out_free_result
:
2029 void *dnet_read_data_wait_groups(struct dnet_session
*s
, struct dnet_id
*id
, int *groups
, int num
,
2030 struct dnet_io_attr
*io
, uint64_t cflags
, int *errp
)
2035 for (i
= 0; i
< num
; ++i
) {
2036 id
->group_id
= groups
[i
];
2038 data
= dnet_read_data_wait_raw(s
, id
, io
, DNET_CMD_READ
, cflags
, errp
);
2040 if ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0) && (io
->size
> sizeof(struct dnet_io_attr
))) {
2041 dnet_read_recover(s
, id
, io
, data
, cflags
);
2052 void *dnet_read_data_wait(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
2053 uint64_t cflags
, int *errp
)
2058 num
= dnet_mix_states(s
, id
, &g
);
2064 data
= dnet_read_data_wait_groups(s
, id
, g
, num
, io
, cflags
, &err
);
2075 int dnet_write_data_wait(struct dnet_session
*s
, struct dnet_io_control
*ctl
, void **result
)
2077 struct dnet_node
*n
= s
->node
;
2078 int err
, trans_num
= 0;
2079 struct dnet_wait
*w
;
2080 struct dnet_write_completion
*wc
;
2082 wc
= malloc(sizeof(struct dnet_write_completion
));
2087 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2089 w
= dnet_wait_alloc(0);
2097 w
->status
= -ENOENT
;
2099 ctl
->complete
= dnet_write_complete
;
2101 ctl
->cmd
= DNET_CMD_WRITE
;
2102 ctl
->cflags
|= DNET_FLAGS_NEED_ACK
;
2104 memcpy(ctl
->io
.id
, ctl
->id
.id
, DNET_ID_SIZE
);
2106 atomic_set(&w
->refcnt
, INT_MAX
);
2107 trans_num
= dnet_write_object(s
, ctl
);
2112 * 1 - the first reference counter we grabbed at allocation time
2114 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
2116 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
2117 if (err
|| w
->status
) {
2120 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2121 dnet_dump_id(&ctl
->id
), err
, w
->status
);
2124 if (err
|| !trans_num
) {
2127 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
2132 dnet_log(n
, DNET_LOG_NOTICE
, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2133 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, ctl
->io
.type
, wc
->size
);
2136 *result
= wc
->reply
;
2142 dnet_write_complete_free(wc
);
2147 int dnet_lookup_addr(struct dnet_session
*s
, const void *remote
, int len
, struct dnet_id
*id
, int group_id
, char *dst
, int dlen
)
2149 struct dnet_node
*n
= s
->node
;
2151 struct dnet_net_state
*st
;
2155 dnet_transform(n
, remote
, len
, &raw
);
2158 id
->group_id
= group_id
;
2160 st
= dnet_state_get_first(n
, id
);
2164 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st
), dst
, dlen
);
2172 struct dnet_weight
{
2177 static int dnet_weight_compare(const void *v1
, const void *v2
)
2179 const struct dnet_weight
*w1
= v1
;
2180 const struct dnet_weight
*w2
= v2
;
2182 return w2
->weight
- w1
->weight
;
2185 static int dnet_weight_get_winner(struct dnet_weight
*w
, int num
)
2191 for (i
= 0; i
< num
; ++i
)
2194 r
= (float)rand() / (float)RAND_MAX
;
2197 for (i
= 0; i
< num
; ++i
) {
2206 int dnet_mix_states(struct dnet_session
*s
, struct dnet_id
*id
, int **groupsp
)
2208 struct dnet_node
*n
= s
->node
;
2209 struct dnet_weight
*weights
;
2211 int group_num
, i
, num
;
2212 struct dnet_net_state
*st
;
2217 group_num
= s
->group_num
;
2219 weights
= alloca(s
->group_num
* sizeof(*weights
));
2220 groups
= malloc(s
->group_num
* sizeof(*groups
));
2222 memcpy(groups
, s
->groups
, s
->group_num
* sizeof(*groups
));
2229 if (n
->flags
& DNET_CFG_RANDOMIZE_STATES
) {
2230 for (i
= 0; i
< group_num
; ++i
) {
2231 weights
[i
].weight
= rand();
2232 weights
[i
].group_id
= groups
[i
];
2236 if (!(n
->flags
& DNET_CFG_MIX_STATES
)) {
2241 memset(weights
, 0, group_num
* sizeof(*weights
));
2243 for (i
= 0, num
= 0; i
< group_num
; ++i
) {
2244 id
->group_id
= groups
[i
];
2246 st
= dnet_state_get_first(n
, id
);
2248 weights
[num
].weight
= (int)st
->weight
;
2249 weights
[num
].group_id
= id
->group_id
;
2260 qsort(weights
, group_num
, sizeof(struct dnet_weight
), dnet_weight_compare
);
2262 for (i
= 0; i
< group_num
; ++i
) {
2263 int pos
= dnet_weight_get_winner(weights
, group_num
- i
);
2264 groups
[i
] = weights
[pos
].group_id
;
2266 if (pos
< group_num
- 1)
2267 memmove(&weights
[pos
], &weights
[pos
+ 1], (group_num
- 1 - pos
) * sizeof(struct dnet_weight
));
2271 dnet_session_set_groups(s
, groups
, group_num
);
2277 int dnet_data_map(struct dnet_map_fd
*map
)
2280 long page_size
= sysconf(_SC_PAGE_SIZE
);
2283 off
= map
->offset
& ~(page_size
- 1);
2284 map
->mapped_size
= ALIGN(map
->size
+ map
->offset
- off
, page_size
);
2286 map
->mapped_data
= mmap(NULL
, map
->mapped_size
, PROT_READ
, MAP_SHARED
, map
->fd
, off
);
2287 if (map
->mapped_data
== MAP_FAILED
) {
2292 map
->data
= map
->mapped_data
+ map
->offset
- off
;
2298 void dnet_data_unmap(struct dnet_map_fd
*map
)
2300 munmap(map
->mapped_data
, map
->mapped_size
);
2303 struct dnet_io_attr
*dnet_remove_range(struct dnet_session
*s
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *ret_num
, int *errp
)
2305 struct dnet_node
*n
= s
->node
;
2307 struct dnet_io_attr
*ret
, *new_ret
;
2308 struct dnet_raw_id start
, next
;
2309 struct dnet_raw_id end
;
2310 uint64_t size
= io
->size
;
2312 int err
, need_exit
= 0;
2314 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2316 dnet_setup_id(&id
, group_id
, io
->id
);
2321 while (!need_exit
) {
2322 err
= dnet_search_range(n
, &id
, &start
, &next
);
2326 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2327 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2328 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2329 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2333 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2335 char start_id
[2*len
+ 1];
2336 char next_id
[2*len
+ 1];
2337 char end_id
[2*len
+ 1];
2338 char id_str
[2*len
+ 1];
2340 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2341 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2342 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2343 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2344 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2345 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2348 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2349 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2353 data
= dnet_read_data_wait_raw(s
, &id
, io
, DNET_CMD_DEL_RANGE
, cflags
, &err
);
2354 if (io
->size
!= sizeof(struct dnet_io_attr
)) {
2360 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
2362 dnet_convert_io_attr(rep
);
2364 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2365 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2366 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2370 new_ret
= realloc(ret
, *ret_num
* sizeof(struct dnet_io_attr
));
2377 ret
[*ret_num
- 1] = *rep
;
2382 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2391 struct dnet_range_data
*dnet_read_range(struct dnet_session
*s
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *errp
)
2393 struct dnet_node
*n
= s
->node
;
2396 struct dnet_range_data
*ret
;
2397 struct dnet_raw_id start
, next
;
2398 struct dnet_raw_id end
;
2399 uint64_t size
= io
->size
;
2401 int err
, need_exit
= 0;
2403 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2405 dnet_setup_id(&id
, group_id
, io
->id
);
2410 while (!need_exit
) {
2411 err
= dnet_search_range(n
, &id
, &start
, &next
);
2415 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2416 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2417 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2418 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2422 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2424 char start_id
[2*len
+ 1];
2425 char next_id
[2*len
+ 1];
2426 char end_id
[2*len
+ 1];
2427 char id_str
[2*len
+ 1];
2429 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2430 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2431 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2432 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2433 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2434 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2437 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2438 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2442 data
= dnet_read_data_wait_raw(s
, &id
, io
, DNET_CMD_READ_RANGE
, cflags
, &err
);
2444 struct dnet_io_attr
*rep
= data
+ io
->size
- sizeof(struct dnet_io_attr
);
2446 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2447 if (!(io
->flags
& DNET_IO_FLAGS_NODATA
))
2448 io
->size
-= sizeof(struct dnet_io_attr
);
2449 dnet_convert_io_attr(rep
);
2451 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2452 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2453 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2455 if (io
->start
< rep
->num
) {
2456 rep
->num
-= io
->start
;
2458 io
->num
-= rep
->num
;
2460 if (!io
->size
&& !(io
->flags
& DNET_IO_FLAGS_NODATA
)) {
2463 struct dnet_range_data
*new_ret
;
2467 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2474 ret
[ret_num
- 1].data
= data
;
2475 ret
[ret_num
- 1].size
= io
->size
;
2482 io
->start
-= rep
->num
;
2486 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2498 struct dnet_read_latest_id
{
2500 struct dnet_file_info fi
;
2503 struct dnet_read_latest_ctl
{
2504 struct dnet_wait
*w
;
2506 pthread_mutex_t lock
;
2508 struct dnet_read_latest_id ids
[0];
2511 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl
*ctl
)
2513 dnet_wakeup(ctl
->w
, ctl
->w
->cond
++);
2514 if (atomic_dec_and_test(&ctl
->w
->refcnt
)) {
2515 dnet_wait_destroy(ctl
->w
);
2516 pthread_mutex_destroy(&ctl
->lock
);
2521 static int dnet_read_latest_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
2523 struct dnet_read_latest_ctl
*ctl
= priv
;
2524 struct dnet_node
*n
;
2525 struct dnet_addr_attr
*a
;
2526 struct dnet_file_info
*fi
;
2529 if (is_trans_destroyed(st
, cmd
)) {
2530 dnet_read_latest_ctl_put(ctl
);
2537 if (err
|| !cmd
->size
)
2540 if (cmd
->size
< sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
2541 dnet_log(n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2542 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
2543 sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
));
2547 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
2548 fi
= (struct dnet_file_info
*)(a
+ 1);
2550 dnet_convert_addr_attr(a
);
2551 dnet_convert_file_info(fi
);
2553 pthread_mutex_lock(&ctl
->lock
);
2555 pthread_mutex_unlock(&ctl
->lock
);
2557 /* we do not care about filename */
2558 memcpy(&ctl
->ids
[pos
].fi
, fi
, sizeof(struct dnet_file_info
));
2559 memcpy(&ctl
->ids
[pos
].id
, &cmd
->id
, sizeof(struct dnet_id
));
2565 static int dnet_file_read_latest_cmp(const void *p1
, const void *p2
)
2567 const struct dnet_read_latest_id
*id1
= p1
;
2568 const struct dnet_read_latest_id
*id2
= p2
;
2570 int ret
= (int)(id2
->fi
.mtime
.tsec
- id1
->fi
.mtime
.tsec
);
2573 ret
= (int)(id2
->fi
.mtime
.tnsec
- id1
->fi
.mtime
.tnsec
);
2578 int dnet_read_latest_prepare(struct dnet_read_latest_prepare
*pr
)
2580 struct dnet_read_latest_ctl
*ctl
;
2581 int group_id
= pr
->id
.group_id
;
2584 ctl
= malloc(sizeof(struct dnet_read_latest_ctl
) + sizeof(struct dnet_read_latest_id
) * pr
->group_num
);
2589 memset(ctl
, 0, sizeof(struct dnet_read_latest_ctl
));
2591 ctl
->w
= dnet_wait_alloc(0);
2597 err
= pthread_mutex_init(&ctl
->lock
, NULL
);
2599 goto err_out_put_wait
;
2601 ctl
->num
= pr
->group_num
;
2604 for (i
= 0; i
< pr
->group_num
; ++i
) {
2605 pr
->id
.group_id
= pr
->group
[i
];
2607 dnet_wait_get(ctl
->w
);
2608 dnet_lookup_object(pr
->s
, &pr
->id
, DNET_ATTR_META_TIMES
| pr
->cflags
, dnet_read_latest_complete
, ctl
);
2611 err
= dnet_wait_event(ctl
->w
, ctl
->w
->cond
== pr
->group_num
, &pr
->s
->node
->wait_ts
);
2618 pr
->group_num
= ctl
->pos
;
2620 qsort(ctl
->ids
, pr
->group_num
, sizeof(struct dnet_read_latest_id
), dnet_file_read_latest_cmp
);
2622 for (i
= 0; i
< pr
->group_num
; ++i
) {
2623 pr
->group
[i
] = ctl
->ids
[i
].id
.group_id
;
2625 if (group_id
== pr
->group
[i
]) {
2626 const struct dnet_read_latest_id
*id0
= &ctl
->ids
[0];
2627 const struct dnet_read_latest_id
*id1
= &ctl
->ids
[i
];
2629 if (!dnet_file_read_latest_cmp(id0
, id1
)) {
2630 int tmp_group
= pr
->group
[0];
2631 pr
->group
[0] = pr
->group
[i
];
2632 pr
->group
[i
] = tmp_group
;
2638 dnet_read_latest_ctl_put(ctl
);
2642 dnet_wait_put(ctl
->w
);
2649 int dnet_read_latest(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
, uint64_t cflags
, void **datap
)
2651 struct dnet_read_latest_prepare pr
;
2652 int *g
, num
, err
, i
;
2654 if ((int)io
->num
> s
->group_num
) {
2659 err
= dnet_mix_states(s
, id
, &g
);
2665 if ((int)io
->num
> num
) {
2670 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
2678 err
= dnet_read_latest_prepare(&pr
);
2683 for (i
= 0; i
< pr
.group_num
; ++i
) {
2686 id
->group_id
= pr
.group
[i
];
2687 data
= dnet_read_data_wait_raw(s
, id
, io
, DNET_CMD_READ
, cflags
, &err
);
2689 if ((pr
.group_num
!= num
) || ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0))) {
2690 dnet_read_recover(s
, id
, io
, data
, cflags
);
2705 int dnet_get_routes(struct dnet_session
*s
, struct dnet_id
**ids
, struct dnet_addr
**addrs
) {
2707 struct dnet_node
*n
= s
->node
;
2708 struct dnet_net_state
*st
;
2709 struct dnet_group
*g
;
2710 struct dnet_addr
*tmp_addrs
;
2711 struct dnet_id
*tmp_ids
;
2712 int size
= 0, count
= 0;
2718 pthread_mutex_lock(&n
->state_lock
);
2719 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
2720 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
2722 size
+= st
->idc
->id_num
;
2724 tmp_ids
= (struct dnet_id
*)realloc(*ids
, size
* sizeof(struct dnet_id
));
2731 tmp_addrs
= (struct dnet_addr
*)realloc(*addrs
, size
* sizeof(struct dnet_addr
));
2738 for (i
= 0; i
< st
->idc
->id_num
; ++i
) {
2739 dnet_setup_id(&(*ids
)[count
], g
->group_id
, st
->idc
->ids
[i
].raw
.id
);
2740 memcpy(&(*addrs
)[count
], dnet_state_addr(st
), sizeof(struct dnet_addr
));
2745 pthread_mutex_unlock(&n
->state_lock
);
2759 void *dnet_bulk_read_wait_raw(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*ios
,
2760 uint32_t io_num
, int cmd
, uint64_t cflags
, int *errp
)
2762 struct dnet_node
*n
= s
->node
;
2763 struct dnet_io_control ctl
;
2764 struct dnet_io_attr io
;
2765 struct dnet_wait
*w
;
2766 struct dnet_read_data_completion
*c
;
2770 w
= dnet_wait_alloc(0);
2776 c
= malloc(sizeof(*c
));
2785 /* one for completion callback, another for this function */
2786 atomic_init(&c
->refcnt
, 2);
2788 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2793 ctl
.complete
= dnet_read_data_complete
;
2796 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
2798 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
2799 memset(&ctl
.io
, 0, sizeof(struct dnet_io_attr
));
2801 memcpy(io
.id
, id
->id
, DNET_ID_SIZE
);
2802 memcpy(io
.parent
, id
->id
, DNET_ID_SIZE
);
2804 ctl
.io
.size
= io_num
* sizeof(struct dnet_io_attr
);
2808 err
= dnet_read_object(s
, &ctl
);
2810 goto err_out_put_complete
;
2812 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
2813 if (err
|| w
->status
) {
2814 char id_str
[2*DNET_ID_SIZE
+ 1];
2817 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
2818 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
2819 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
2820 goto err_out_put_complete
;
2825 err_out_put_complete
:
2826 if (atomic_dec_and_test(&c
->refcnt
))
2836 static int dnet_io_attr_cmp(const void *d1
, const void *d2
)
2838 const struct dnet_io_attr
*io1
= d1
;
2839 const struct dnet_io_attr
*io2
= d2
;
2841 return memcmp(io1
->id
, io2
->id
, DNET_ID_SIZE
);
2844 struct dnet_range_data
*dnet_bulk_read(struct dnet_session
*s
, struct dnet_io_attr
*ios
, uint32_t io_num
, int group_id
, uint64_t cflags
, int *errp
)
2846 struct dnet_node
*n
= s
->node
;
2847 struct dnet_id id
, next_id
;
2849 struct dnet_range_data
*ret
;
2850 struct dnet_net_state
*cur
, *next
= NULL
;
2854 uint32_t i
, start
= -1;
2860 qsort(ios
, io_num
, sizeof(struct dnet_io_attr
), dnet_io_attr_cmp
);
2866 dnet_setup_id(&id
, group_id
, ios
[0].id
);
2867 id
.type
= ios
[0].type
;
2869 cur
= dnet_state_get_first(n
, &id
);
2871 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&id
));
2876 for (i
= 0; i
< io_num
; ++i
) {
2877 if ((i
+ 1) < io_num
) {
2878 dnet_setup_id(&next_id
, group_id
, ios
[i
+1].id
);
2879 next_id
.type
= ios
[i
+1].type
;
2881 next
= dnet_state_get_first(n
, &next_id
);
2883 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&next_id
));
2888 /* Send command only if state changes or it's a last id */
2889 if ((cur
== next
)) {
2890 dnet_state_put(next
);
2896 dnet_log(n
, DNET_LOG_NOTICE
, "start: %s: end: %s, count: %llu, addr: %s\n",
2898 dnet_dump_id(&next_id
),
2899 (unsigned long long)(i
- start
),
2900 dnet_state_dump_addr(cur
));
2902 data
= dnet_bulk_read_wait_raw(s
, &id
, ios
, i
- start
, DNET_CMD_BULK_READ
, cflags
, &err
);
2910 struct dnet_range_data
*new_ret
;
2914 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2921 ret
[ret_num
- 1].data
= data
;
2922 ret
[ret_num
- 1].size
= size
;
2928 dnet_state_put(cur
);
2931 memcpy(&id
, &next_id
, sizeof(struct dnet_id
));
2936 dnet_state_put(next
);
2937 dnet_state_put(cur
);
2947 struct dnet_range_data
dnet_bulk_write(struct dnet_session
*s
, struct dnet_io_control
*ctl
, int ctl_num
, int *errp
)
2949 struct dnet_node
*n
= s
->node
;
2950 int err
, i
, trans_num
= 0, local_trans_num
;
2951 struct dnet_wait
*w
;
2952 struct dnet_write_completion
*wc
;
2953 struct dnet_range_data ret
;
2954 struct dnet_metadata_control mcl
;
2955 struct dnet_meta_container mc
;
2956 struct dnet_io_control meta_ctl
;
2961 memset(&ret
, 0, sizeof(ret
));
2963 wc
= malloc(sizeof(struct dnet_write_completion
));
2968 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2970 w
= dnet_wait_alloc(0);
2978 atomic_set(&w
->refcnt
, INT_MAX
);
2979 w
->status
= -ENOENT
;
2981 for (i
= 0; i
< ctl_num
; ++i
) {
2983 ctl
[i
].complete
= dnet_write_complete
;
2985 ctl
[i
].cmd
= DNET_CMD_WRITE
;
2986 ctl
[i
].cflags
= DNET_FLAGS_NEED_ACK
;
2988 memcpy(ctl
[i
].io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2989 memcpy(ctl
[i
].io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2991 local_trans_num
= dnet_write_object(s
, &ctl
[i
]);
2992 if (local_trans_num
< 0)
2993 local_trans_num
= 0;
2995 trans_num
+= local_trans_num
;
2997 /* Prepare and send metadata */
2998 memset(&mcl
, 0, sizeof(mcl
));
3000 group_num
= s
->group_num
;
3001 groups
= alloca(group_num
* sizeof(int));
3002 memcpy(groups
, s
->groups
, group_num
* sizeof(int));
3004 mcl
.groups
= groups
;
3005 mcl
.group_num
= group_num
;
3007 mcl
.cflags
= ctl
[i
].cflags
;
3009 gettimeofday(&tv
, NULL
);
3010 mcl
.ts
.tv_sec
= tv
.tv_sec
;
3011 mcl
.ts
.tv_nsec
= tv
.tv_usec
* 1000;
3013 memset(&mc
, 0, sizeof(mc
));
3015 err
= dnet_create_metadata(s
, &mcl
, &mc
);
3016 dnet_log(n
, DNET_LOG_DEBUG
, "Creating metadata: err: %d", err
);
3018 dnet_convert_metadata(n
, mc
.data
, mc
.size
);
3020 memset(&meta_ctl
, 0, sizeof(struct dnet_io_control
));
3023 meta_ctl
.complete
= dnet_write_complete
;
3024 meta_ctl
.cmd
= DNET_CMD_WRITE
;
3027 meta_ctl
.cflags
= ctl
[i
].cflags
;
3029 memcpy(&meta_ctl
.id
, &ctl
[i
].id
, sizeof(struct dnet_id
));
3030 memcpy(meta_ctl
.io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3031 memcpy(meta_ctl
.io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3032 meta_ctl
.id
.type
= meta_ctl
.io
.type
= EBLOB_TYPE_META
;
3034 meta_ctl
.io
.flags
|= DNET_IO_FLAGS_META
;
3035 meta_ctl
.io
.offset
= 0;
3036 meta_ctl
.io
.size
= mc
.size
;
3037 meta_ctl
.data
= mc
.data
;
3039 local_trans_num
= dnet_write_object(s
, &meta_ctl
);
3040 if (local_trans_num
< 0)
3041 local_trans_num
= 0;
3043 trans_num
+= local_trans_num
;
3048 * 1 - the first reference counter we grabbed at allocation time
3050 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
3052 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
3053 if (err
|| w
->status
) {
3056 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3057 dnet_dump_id(&ctl
->id
), err
, w
->status
);
3060 if (err
|| !trans_num
) {
3063 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
3068 dnet_log(n
, DNET_LOG_NOTICE
, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3069 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, wc
->size
);
3072 ret
.data
= wc
->reply
;
3073 ret
.size
= wc
->size
;
3078 dnet_write_complete_free(wc
);
3084 int dnet_flags(struct dnet_node
*n
)
3089 static int dnet_start_defrag_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
3091 struct dnet_wait
*w
= priv
;
3093 if (is_trans_destroyed(state
, cmd
)) {
3094 dnet_wakeup(w
, w
->cond
++);
3102 static int dnet_start_defrag_single(struct dnet_net_state
*st
, void *priv
, uint64_t cflags
)
3104 struct dnet_trans_control ctl
;
3106 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
3108 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
3109 ctl
.cmd
= DNET_CMD_DEFRAG
;
3110 ctl
.complete
= dnet_start_defrag_complete
;
3112 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
3114 return dnet_trans_alloc_send_state(st
, &ctl
);
3117 int dnet_start_defrag(struct dnet_session
*s
, uint64_t cflags
)
3119 struct dnet_node
*n
= s
->node
;
3120 struct dnet_net_state
*st
;
3121 struct dnet_wait
*w
;
3122 struct dnet_group
*g
;
3126 w
= dnet_wait_alloc(0);
3132 pthread_mutex_lock(&n
->state_lock
);
3133 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
3134 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
3141 dnet_start_defrag_single(st
, w
, cflags
);
3145 pthread_mutex_unlock(&n
->state_lock
);
3147 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);