2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
44 static char *dnet_cmd_strings
[] = {
45 [DNET_CMD_LOOKUP
] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP
] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN
] = "JOIN",
48 [DNET_CMD_WRITE
] = "WRITE",
49 [DNET_CMD_READ
] = "READ",
50 [DNET_CMD_LIST
] = "CHECK",
51 [DNET_CMD_EXEC
] = "EXEC",
52 [DNET_CMD_ROUTE_LIST
] = "ROUTE_LIST",
53 [DNET_CMD_STAT
] = "STAT",
54 [DNET_CMD_NOTIFY
] = "NOTIFY",
55 [DNET_CMD_DEL
] = "REMOVE",
56 [DNET_CMD_STAT_COUNT
] = "STAT_COUNT",
57 [DNET_CMD_STATUS
] = "STATUS",
58 [DNET_CMD_READ_RANGE
] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE
] = "DEL_RANGE",
60 [DNET_CMD_AUTH
] = "AUTH",
61 [DNET_CMD_BULK_READ
] = "BULK_READ",
62 [DNET_CMD_UNKNOWN
] = "UNKNOWN",
65 static char *dnet_counter_strings
[] = {
66 [DNET_CNTR_LA1
] = "DNET_CNTR_LA1",
67 [DNET_CNTR_LA5
] = "DNET_CNTR_LA5",
68 [DNET_CNTR_LA15
] = "DNET_CNTR_LA15",
69 [DNET_CNTR_BSIZE
] = "DNET_CNTR_BSIZE",
70 [DNET_CNTR_FRSIZE
] = "DNET_CNTR_FRSIZE",
71 [DNET_CNTR_BLOCKS
] = "DNET_CNTR_BLOCKS",
72 [DNET_CNTR_BFREE
] = "DNET_CNTR_BFREE",
73 [DNET_CNTR_BAVAIL
] = "DNET_CNTR_BAVAIL",
74 [DNET_CNTR_FILES
] = "DNET_CNTR_FILES",
75 [DNET_CNTR_FFREE
] = "DNET_CNTR_FFREE",
76 [DNET_CNTR_FAVAIL
] = "DNET_CNTR_FAVAIL",
77 [DNET_CNTR_FSID
] = "DNET_CNTR_FSID",
78 [DNET_CNTR_VM_ACTIVE
] = "DNET_CNTR_VM_ACTIVE",
79 [DNET_CNTR_VM_INACTIVE
] = "DNET_CNTR_VM_INACTIVE",
80 [DNET_CNTR_VM_TOTAL
] = "DNET_CNTR_VM_TOTAL",
81 [DNET_CNTR_VM_FREE
] = "DNET_CNTR_VM_FREE",
82 [DNET_CNTR_VM_CACHED
] = "DNET_CNTR_VM_CACHED",
83 [DNET_CNTR_VM_BUFFERS
] = "DNET_CNTR_VM_BUFFERS",
84 [DNET_CNTR_NODE_FILES
] = "DNET_CNTR_NODE_FILES",
85 [DNET_CNTR_NODE_LAST_MERGE
] = "DNET_CNTR_NODE_LAST_MERGE",
86 [DNET_CNTR_NODE_CHECK_COPY
] = "DNET_CNTR_NODE_CHECK_COPY",
87 [DNET_CNTR_DBR_NOREC
] = "DNET_CNTR_DBR_NOREC",
88 [DNET_CNTR_DBR_SYSTEM
] = "DNET_CNTR_DBR_SYSTEM",
89 [DNET_CNTR_DBR_ERROR
] = "DNET_CNTR_DBR_ERROR",
90 [DNET_CNTR_DBW_SYSTEM
] = "DNET_CNTR_DBW_SYSTEM",
91 [DNET_CNTR_DBW_ERROR
] = "DNET_CNTR_DBW_ERROR",
92 [DNET_CNTR_UNKNOWN
] = "UNKNOWN",
95 char *dnet_cmd_string(int cmd
)
97 if (cmd
<= 0 || cmd
>= __DNET_CMD_MAX
)
98 cmd
= DNET_CMD_UNKNOWN
;
100 return dnet_cmd_strings
[cmd
];
103 char *dnet_counter_string(int cntr
, int cmd_num
)
105 if (cntr
<= 0 || cntr
>= __DNET_CNTR_MAX
)
106 cntr
= DNET_CNTR_UNKNOWN
;
109 return dnet_cmd_string(cntr
);
111 if (cntr
>= cmd_num
&& cntr
< (cmd_num
* 2))
112 return dnet_cmd_string(cntr
- cmd_num
);
114 return dnet_counter_strings
[cntr
];
117 static int dnet_add_received_state(struct dnet_node
*n
, struct dnet_addr_attr
*a
,
118 int group_id
, struct dnet_raw_id
*ids
, int id_num
)
121 struct dnet_net_state
*nst
;
125 dnet_setup_id(&raw
, group_id
, ids
[0].id
);
127 nst
= dnet_state_search_by_addr(n
, &a
->addr
);
134 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
135 (struct sockaddr
*)&a
->addr
.addr
, a
->addr
.addr_len
, 0);
141 join
= DNET_WANT_RECONNECT
;
142 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
145 nst
= dnet_state_create(n
, group_id
, ids
, id_num
, &a
->addr
, s
, &err
, join
, dnet_state_net_process
);
149 dnet_log(n
, DNET_LOG_NOTICE
, "%d: added received state %s.\n",
150 group_id
, dnet_state_dump_addr(nst
));
160 static int dnet_process_addr_attr(struct dnet_net_state
*st
, struct dnet_addr_attr
*a
, int group_id
, int num
)
162 struct dnet_node
*n
= st
->n
;
163 struct dnet_raw_id
*ids
;
166 ids
= (struct dnet_raw_id
*)(a
+ 1);
167 for (i
=0; i
<num
; ++i
)
168 dnet_convert_raw_id(&ids
[0]);
170 err
= dnet_add_received_state(n
, a
, group_id
, ids
, num
);
171 dnet_log(n
, DNET_LOG_DEBUG
, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a
->addr
), num
, err
);
176 static int dnet_recv_route_list_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
178 struct dnet_wait
*w
= priv
;
179 struct dnet_addr_attr
*a
;
183 if (is_trans_destroyed(st
, cmd
)) {
189 dnet_wakeup(w
, w
->cond
= 1);
196 if (!cmd
->size
|| err
)
199 size
= cmd
->size
+ sizeof(struct dnet_cmd
);
200 if (size
< (signed)sizeof(struct dnet_addr_cmd
)) {
205 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
211 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
212 dnet_convert_addr_attr(a
);
214 err
= dnet_process_addr_attr(st
, a
, cmd
->id
.group_id
, num
);
220 int dnet_recv_route_list(struct dnet_net_state
*st
)
222 struct dnet_io_req req
;
223 struct dnet_node
*n
= st
->n
;
224 struct dnet_trans
*t
;
225 struct dnet_cmd
*cmd
;
229 w
= dnet_wait_alloc(0);
235 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
238 goto err_out_wait_put
;
241 t
->complete
= dnet_recv_route_list_complete
;
244 cmd
= (struct dnet_cmd
*)(t
+ 1);
246 cmd
->flags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
249 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
251 cmd
->cmd
= t
->command
= DNET_CMD_ROUTE_LIST
;
253 t
->st
= dnet_state_get(st
);
254 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
256 dnet_convert_cmd(cmd
);
258 dnet_log(n
, DNET_LOG_DEBUG
, "%s: list route request to %s.\n", dnet_dump_id(&cmd
->id
),
259 dnet_server_convert_dnet_addr(&st
->addr
));
261 memset(&req
, 0, sizeof(req
));
264 req
.hsize
= sizeof(struct dnet_cmd
);
267 err
= dnet_trans_send(t
, &req
);
269 goto err_out_destroy
;
271 err
= dnet_wait_event(w
, w
->cond
!= 0, &n
->wait_ts
);
284 static struct dnet_net_state
*dnet_add_state_socket(struct dnet_node
*n
, struct dnet_addr
*addr
, int s
, int *errp
, int join
)
286 struct dnet_net_state
*st
, dummy
;
287 char buf
[sizeof(struct dnet_addr_cmd
)];
288 struct dnet_cmd
*cmd
;
289 int err
, num
, i
, size
;
290 struct dnet_raw_id
*ids
;
292 memset(buf
, 0, sizeof(buf
));
294 cmd
= (struct dnet_cmd
*)(buf
);
296 cmd
->flags
= DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
297 cmd
->cmd
= DNET_CMD_REVERSE_LOOKUP
;
299 dnet_convert_cmd(cmd
);
302 memset(st
, 0, sizeof(struct dnet_net_state
));
304 st
->write_s
= st
->read_s
= s
;
307 err
= dnet_send_nolock(st
, buf
, sizeof(struct dnet_cmd
));
309 dnet_log(n
, DNET_LOG_ERROR
, "Failed to send reverse "
310 "lookup message to %s, err: %d.\n",
311 dnet_server_convert_dnet_addr(addr
), err
);
315 err
= dnet_recv(st
, buf
, sizeof(buf
));
317 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
318 "lookup headers from %s, err: %d.\n",
319 dnet_server_convert_dnet_addr(addr
), err
);
323 cmd
= (struct dnet_cmd
*)(buf
);
325 dnet_convert_addr_cmd((struct dnet_addr_cmd
*)buf
);
327 size
= cmd
->size
- sizeof(struct dnet_addr_attr
);
328 num
= size
/ sizeof(struct dnet_raw_id
);
330 dnet_log(n
, DNET_LOG_DEBUG
, "%s: waiting for %d ids\n", dnet_dump_id(&cmd
->id
), num
);
338 err
= dnet_recv(st
, ids
, size
);
340 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
341 "lookup body (%llu bytes) from %s, err: %d.\n",
342 (unsigned long long)cmd
->size
,
343 dnet_server_convert_dnet_addr(addr
), err
);
347 for (i
=0; i
<num
; ++i
)
348 dnet_convert_raw_id(&ids
[i
]);
350 st
= dnet_state_create(n
, cmd
->id
.group_id
, ids
, num
, addr
, s
, &err
, join
, dnet_state_net_process
);
352 /* socket is already closed */
369 int dnet_add_state(struct dnet_node
*n
, struct dnet_config
*cfg
)
371 int s
, err
, join
= DNET_WANT_RECONNECT
;
372 struct dnet_addr addr
;
373 struct dnet_net_state
*st
;
375 memset(&addr
, 0, sizeof(addr
));
377 addr
.addr_len
= sizeof(addr
.addr
);
378 s
= dnet_socket_create(n
, cfg
, &addr
, 0);
381 goto err_out_reconnect
;
384 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
387 /* will close socket on error */
388 st
= dnet_add_state_socket(n
, &addr
, s
, &err
, join
);
390 goto err_out_reconnect
;
392 if (!(cfg
->flags
& DNET_CFG_NO_ROUTE_LIST
))
393 dnet_recv_route_list(st
);
398 if ((err
== -EADDRINUSE
) || (err
== -ECONNREFUSED
) || (err
== -ECONNRESET
) ||
399 (err
== -EINPROGRESS
) || (err
== -EAGAIN
))
400 dnet_add_reconnect_state(n
, &addr
, join
);
404 struct dnet_write_completion
{
407 struct dnet_wait
*wait
;
410 static void dnet_write_complete_free(struct dnet_write_completion
*wc
)
412 if (atomic_dec_and_test(&wc
->wait
->refcnt
)) {
413 dnet_wait_destroy(wc
->wait
);
419 static int dnet_write_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
422 struct dnet_write_completion
*wc
= priv
;
423 struct dnet_wait
*w
= wc
->wait
;
425 if (is_trans_destroyed(st
, cmd
)) {
426 dnet_wakeup(w
, w
->cond
++);
427 dnet_write_complete_free(wc
);
432 if (!err
&& st
&& (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
))) {
433 int old_size
= wc
->size
;
436 wc
->size
+= cmd
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_addr
);
437 wc
->reply
= realloc(wc
->reply
, wc
->size
);
443 data
= wc
->reply
+ old_size
;
445 memcpy(data
, &st
->addr
, sizeof(struct dnet_addr
));
446 memcpy(data
+ sizeof(struct dnet_addr
), cmd
, sizeof(struct dnet_cmd
));
447 memcpy(data
+ sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
), cmd
+ 1, cmd
->size
);
451 pthread_mutex_lock(&w
->wait_lock
);
454 pthread_mutex_unlock(&w
->wait_lock
);
459 static struct dnet_trans
*dnet_io_trans_create(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int *errp
)
461 struct dnet_io_req req
;
462 struct dnet_trans
*t
= NULL
;
463 struct dnet_io_attr
*io
;
464 struct dnet_cmd
*cmd
;
465 uint64_t size
= ctl
->io
.size
;
466 uint64_t tsize
= sizeof(struct dnet_io_attr
) + sizeof(struct dnet_cmd
);
469 if (ctl
->cmd
== DNET_CMD_READ
)
472 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
)
475 t
= dnet_trans_alloc(n
, tsize
);
478 goto err_out_complete
;
480 t
->complete
= ctl
->complete
;
483 cmd
= (struct dnet_cmd
*)(t
+ 1);
484 io
= (struct dnet_io_attr
*)(cmd
+ 1);
486 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
) {
489 memcpy(data
, ctl
->data
, size
);
493 memcpy(&cmd
->id
, &ctl
->id
, sizeof(struct dnet_id
));
494 cmd
->size
= sizeof(struct dnet_io_attr
) + size
;
495 cmd
->flags
= ctl
->cflags
;
498 cmd
->cmd
= t
->command
= ctl
->cmd
;
500 memcpy(io
, &ctl
->io
, sizeof(struct dnet_io_attr
));
501 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
503 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
506 goto err_out_destroy
;
509 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
511 dnet_log(n
, DNET_LOG_INFO
, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
512 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
513 dnet_dump_id(&ctl
->id
),
514 (unsigned long long)t
->trans
,
515 dnet_cmd_string(ctl
->cmd
), (unsigned long long)cmd
->flags
,
516 (unsigned long long)ctl
->io
.size
, (unsigned long long)ctl
->io
.offset
,
518 (unsigned long long)ctl
->local_offset
,
519 dnet_server_convert_dnet_addr(&t
->st
->addr
), t
->st
->weight
, t
->st
->median_read_time
);
521 dnet_convert_cmd(cmd
);
522 dnet_convert_io_attr(io
);
525 memset(&req
, 0, sizeof(req
));
533 req
.local_offset
= ctl
->local_offset
;
535 } else if (size
>= DNET_COPY_IO_SIZE
) {
536 req
.data
= (void *)ctl
->data
;
540 err
= dnet_trans_send(t
, &req
);
542 goto err_out_destroy
;
548 ctl
->complete(NULL
, NULL
, ctl
->priv
);
558 int dnet_trans_create_send_all(struct dnet_node
*n
, struct dnet_io_control
*ctl
)
562 pthread_mutex_lock(&n
->group_lock
);
563 for (i
=0; i
<n
->group_num
; ++i
) {
564 ctl
->id
.group_id
= n
->groups
[i
];
566 dnet_io_trans_create(n
, ctl
, &err
);
569 pthread_mutex_unlock(&n
->group_lock
);
572 dnet_io_trans_create(n
, ctl
, &err
);
/*
 * Public entry point for writing an object described by @ctl.
 *
 * Delegates to dnet_trans_create_send_all() and returns its result
 * unchanged; the caller in this file stores that value as a transaction
 * count.
 */
int dnet_write_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int trans_num = dnet_trans_create_send_all(n, ctl);

	return trans_num;
}
584 static int dnet_write_file_id_raw(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
,
585 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
586 uint64_t cflags
, unsigned int ioflags
)
588 int fd
, err
, trans_num
;
591 struct dnet_io_control ctl
;
592 struct dnet_write_completion
*wc
;
594 wc
= malloc(sizeof(struct dnet_write_completion
));
599 memset(wc
, 0, sizeof(struct dnet_write_completion
));
601 w
= dnet_wait_alloc(0);
605 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting structure.\n");
611 fd
= open(file
, O_RDONLY
| O_LARGEFILE
| O_CLOEXEC
);
614 dnet_log_err(n
, "Failed to open to be written file '%s'", file
);
618 err
= fstat(fd
, &stat
);
621 dnet_log_err(n
, "Failed to stat to be written file '%s'", file
);
625 if (local_offset
>= (uint64_t)stat
.st_size
) {
630 if (!size
|| size
+ local_offset
>= (uint64_t)stat
.st_size
)
631 size
= stat
.st_size
- local_offset
;
633 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
635 atomic_set(&w
->refcnt
, INT_MAX
);
639 ctl
.local_offset
= local_offset
;
642 ctl
.complete
= dnet_write_complete
;
645 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
646 ctl
.cmd
= DNET_CMD_WRITE
;
648 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
649 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
651 ctl
.io
.flags
= ioflags
;
653 ctl
.io
.offset
= remote_offset
;
654 ctl
.io
.type
= id
->type
;
656 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
658 trans_num
= dnet_write_object(n
, &ctl
);
663 * 1 - the first reference counter we grabbed at allocation time
665 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
667 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
668 if (err
|| w
->status
) {
673 if (!err
&& !trans_num
)
677 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file
, trans_num
, err
);
681 dnet_log(n
, DNET_LOG_NOTICE
, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
682 file
, (unsigned long long)size
);
685 dnet_write_complete_free(wc
);
692 dnet_write_complete_free(wc
);
697 int dnet_write_file_id(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
, uint64_t local_offset
,
698 uint64_t remote_offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
700 int err
= dnet_write_file_id_raw(n
, file
, id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
701 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
702 err
= dnet_create_write_metadata_strings(n
, NULL
, 0, id
, NULL
, cflags
);
707 int dnet_write_file(struct dnet_node
*n
, const char *file
, const void *remote
, int remote_len
,
708 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
709 uint64_t cflags
, unsigned int ioflags
, int type
)
714 dnet_transform(n
, remote
, remote_len
, &id
);
717 err
= dnet_write_file_id_raw(n
, file
, &id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
718 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
719 err
= dnet_create_write_metadata_strings(n
, remote
, remote_len
, &id
, NULL
, cflags
);
724 static int dnet_read_file_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
728 struct dnet_io_completion
*c
= priv
;
729 struct dnet_io_attr
*io
;
732 if (is_trans_destroyed(st
, cmd
)) {
735 if (cmd
&& cmd
->status
)
738 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
);
739 dnet_wait_put(c
->wait
);
748 if (cmd
->status
!= 0 || cmd
->size
== 0) {
750 goto err_out_exit_no_log
;
753 if (cmd
->size
<= sizeof(struct dnet_io_attr
)) {
754 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
755 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
756 sizeof(struct dnet_io_attr
));
758 goto err_out_exit_no_log
;
761 io
= (struct dnet_io_attr
*)(cmd
+ 1);
764 dnet_convert_io_attr(io
);
766 fd
= open(c
->file
, O_RDWR
| O_CREAT
| O_CLOEXEC
, 0644);
769 dnet_log_err(n
, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
773 err
= pwrite(fd
, data
, io
->size
, c
->offset
);
776 dnet_log_err(n
, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
781 dnet_log(n
, DNET_LOG_NOTICE
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
782 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)c
->offset
,
783 (unsigned long long)io
->size
, cmd
->status
);
790 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
791 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)io
->offset
,
792 (unsigned long long)io
->size
, cmd
->status
, err
);
794 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
? err
: 1);
798 int dnet_read_object(struct dnet_node
*n
, struct dnet_io_control
*ctl
)
802 if (!dnet_io_trans_create(n
, ctl
, &err
))
808 static int dnet_read_file_raw_exec(struct dnet_node
*n
, const char *file
, unsigned int len
,
809 uint64_t write_offset
, uint64_t io_offset
, uint64_t io_size
,
810 struct dnet_id
*id
, struct dnet_wait
*w
)
812 struct dnet_io_control ctl
;
813 struct dnet_io_completion
*c
;
814 int err
, wait_init
= ~0;
816 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
818 ctl
.io
.size
= io_size
;
819 ctl
.io
.offset
= io_offset
;
821 ctl
.io
.type
= id
->type
;
823 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
824 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
826 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
829 ctl
.complete
= dnet_read_file_complete
;
830 ctl
.cmd
= DNET_CMD_READ
;
831 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
833 c
= malloc(sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
835 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate IO completion structure "
836 "for '%s' file reading.\n",
837 dnet_dump_id(&ctl
.id
), file
);
842 memset(c
, 0, sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
844 c
->wait
= dnet_wait_get(w
);
845 c
->offset
= write_offset
;
846 c
->file
= (char *)(c
+ 1);
848 sprintf(c
->file
, "%s", file
);
853 err
= dnet_read_object(n
, &ctl
);
857 err
= dnet_wait_event(w
, w
->cond
!= wait_init
, &n
->wait_ts
);
858 if ((err
< 0) || (w
->cond
< 0)) {
859 char id_str
[2*DNET_ID_SIZE
+ 1];
862 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s '%s' : failed to read data: %d\n",
863 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
),
874 static int dnet_read_file_raw(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
, uint64_t offset
, uint64_t size
)
876 int err
= -ENOENT
, len
= strlen(file
), i
;
880 w
= dnet_wait_alloc(~0);
883 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting.\n");
890 num
= dnet_mix_states(n
, id
, &g
);
896 for (i
=0; i
<num
; ++i
) {
899 err
= dnet_read_file_raw_exec(n
, file
, len
, 0, offset
, size
, id
, w
);
/*
 * Read an object identified by @id into the local file @file.
 *
 * Public wrapper: all arguments are forwarded unchanged to
 * dnet_read_file_raw(), whose result is returned.
 */
int dnet_read_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int err = dnet_read_file_raw(n, file, id, offset, size);

	return err;
}
918 int dnet_read_file(struct dnet_node
*n
, const char *file
, const void *remote
, int remote_size
,
919 uint64_t offset
, uint64_t size
, int type
)
923 dnet_transform(n
, remote
, remote_size
, &id
);
926 return dnet_read_file_raw(n
, file
, &id
, offset
, size
);
929 struct dnet_wait
*dnet_wait_alloc(int cond
)
934 w
= malloc(sizeof(struct dnet_wait
));
940 memset(w
, 0, sizeof(struct dnet_wait
));
942 err
= pthread_cond_init(&w
->wait
, NULL
);
946 err
= pthread_mutex_init(&w
->wait_lock
, NULL
);
948 goto err_out_destroy
;
951 atomic_init(&w
->refcnt
, 1);
956 pthread_mutex_destroy(&w
->wait_lock
);
961 void dnet_wait_destroy(struct dnet_wait
*w
)
963 pthread_mutex_destroy(&w
->wait_lock
);
964 pthread_cond_destroy(&w
->wait
);
969 static int dnet_send_cmd_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
971 struct dnet_wait
*w
= priv
;
973 if (is_trans_destroyed(st
, cmd
)) {
974 dnet_wakeup(w
, w
->cond
++);
979 w
->status
= cmd
->status
;
983 void *data
= cmd
+ 1;
985 w
->ret
= realloc(w
->ret
, w
->size
+ cmd
->size
);
990 memcpy(w
->ret
+ w
->size
, data
, cmd
->size
);
991 w
->size
+= cmd
->size
;
998 static int dnet_send_cmd_single(struct dnet_net_state
*st
, struct dnet_wait
*w
, struct sph
*e
, uint64_t cflags
)
1000 struct dnet_trans_control ctl
;
1002 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1004 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1005 ctl
.size
= sizeof(struct sph
) + e
->event_size
+ e
->data_size
+ e
->binary_size
;
1006 ctl
.cmd
= DNET_CMD_EXEC
;
1007 ctl
.complete
= dnet_send_cmd_complete
;
1009 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1011 dnet_convert_sph(e
);
1015 return dnet_trans_alloc_send_state(st
, &ctl
);
1018 static int dnet_send_cmd_raw(struct dnet_node
*n
, struct dnet_id
*id
,
1019 struct sph
*e
, void **ret
, uint64_t cflags
)
1021 struct dnet_net_state
*st
;
1022 int err
= -ENOENT
, num
= 0;
1023 struct dnet_wait
*w
;
1024 struct dnet_group
*g
;
1026 w
= dnet_wait_alloc(0);
1032 if (id
&& id
->group_id
!= 0) {
1034 st
= dnet_state_get_first(n
, id
);
1037 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1040 } else if (id
&& id
->group_id
== 0) {
1041 pthread_mutex_lock(&n
->state_lock
);
1042 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1045 id
->group_id
= g
->group_id
;
1047 st
= dnet_state_search_nolock(n
, id
);
1050 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1056 pthread_mutex_unlock(&n
->state_lock
);
1058 pthread_mutex_lock(&n
->state_lock
);
1059 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1060 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1066 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1070 pthread_mutex_unlock(&n
->state_lock
);
1073 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1077 dnet_log(n
, DNET_LOG_INFO
, "%s: return data: %p, size: %d\n", dnet_dump_id_str(e
->src
.id
), w
->ret
, w
->size
);
/*
 * Send the request described by @e with no additional command flags.
 *
 * @n, @id, @e and @ret are forwarded unchanged to dnet_send_cmd_raw(), whose
 * result is returned.
 */
int dnet_send_cmd(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t cflags = 0;

	return dnet_send_cmd_raw(n, id, e, ret, cflags);
}
1100 int dnet_send_cmd_nolock(struct dnet_node
*n
, struct dnet_id
*id
, struct sph
*e
, void **ret
)
1102 return dnet_send_cmd_raw(n
, id
, e
, ret
, DNET_FLAGS_NOLOCK
);
1105 int dnet_try_reconnect(struct dnet_node
*n
)
1107 struct dnet_addr_storage
*ast
, *tmp
;
1108 struct dnet_net_state
*st
;
1112 if (list_empty(&n
->reconnect_list
))
1115 pthread_mutex_lock(&n
->reconnect_lock
);
1116 list_for_each_entry_safe(ast
, tmp
, &n
->reconnect_list
, reconnect_entry
) {
1117 list_move(&ast
->reconnect_entry
, &list
);
1119 pthread_mutex_unlock(&n
->reconnect_lock
);
1121 list_for_each_entry_safe(ast
, tmp
, &list
, reconnect_entry
) {
1122 s
= dnet_socket_create_addr(n
, n
->sock_type
, n
->proto
, n
->family
,
1123 (struct sockaddr
*)ast
->addr
.addr
, ast
->addr
.addr_len
, 0);
1127 join
= DNET_WANT_RECONNECT
;
1128 if (ast
->__join_state
== DNET_JOIN
)
1131 st
= dnet_add_state_socket(n
, &ast
->addr
, s
, &err
, join
);
1137 if (err
== -EEXIST
|| err
== -EINVAL
)
1141 dnet_add_reconnect_state(n
, &ast
->addr
, ast
->__join_state
);
1143 list_del(&ast
->reconnect_entry
);
1150 int dnet_lookup_object(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
,
1151 int (* complete
)(struct dnet_net_state
*, struct dnet_cmd
*, void *),
1154 struct dnet_io_req req
;
1155 struct dnet_trans
*t
;
1156 struct dnet_cmd
*cmd
;
1159 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
1162 goto err_out_complete
;
1164 t
->complete
= complete
;
1167 cmd
= (struct dnet_cmd
*)(t
+ 1);
1169 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
1171 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
1173 cmd
->cmd
= t
->command
= DNET_CMD_LOOKUP
;
1174 cmd
->flags
= cflags
| DNET_FLAGS_NEED_ACK
;
1176 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
1179 goto err_out_destroy
;
1182 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
1183 dnet_convert_cmd(cmd
);
1185 dnet_log(n
, DNET_LOG_NOTICE
, "%s: lookup to %s.\n", dnet_dump_id(id
), dnet_server_convert_dnet_addr(&t
->st
->addr
));
1187 memset(&req
, 0, sizeof(req
));
1190 req
.hsize
= sizeof(struct dnet_cmd
);
1192 err
= dnet_trans_send(t
, &req
);
1194 goto err_out_destroy
;
1200 complete(NULL
, NULL
, priv
);
1208 int dnet_lookup_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1210 struct dnet_wait
*w
= priv
;
1211 struct dnet_node
*n
= NULL
;
1212 struct dnet_addr_attr
*a
;
1213 struct dnet_net_state
*other
;
1214 char addr_str
[128] = "no-address";
1217 if (is_trans_destroyed(st
, cmd
)) {
1218 dnet_wakeup(w
, w
->cond
++);
1225 if (err
|| !cmd
->size
)
1228 if (cmd
->size
< sizeof(struct dnet_addr_attr
)) {
1229 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1230 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
, sizeof(struct dnet_addr_attr
));
1235 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
1237 dnet_convert_addr_attr(a
);
1238 dnet_server_convert_dnet_addr_raw(&a
->addr
, addr_str
, sizeof(addr_str
));
1240 if (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
1241 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
1243 dnet_convert_file_info(info
);
1245 dnet_log_raw(n
, DNET_LOG_NOTICE
, "%s: lookup object: %s: "
1246 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1247 dnet_dump_id(&cmd
->id
), addr_str
,
1248 (unsigned long long)info
->offset
, (unsigned long long)info
->size
,
1249 (unsigned long long)info
->mode
, (char *)(info
+ 1));
1251 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: lookup object: %s\n",
1252 dnet_dump_id(&cmd
->id
), addr_str
);
1256 other
= dnet_state_search_by_addr(n
, &a
->addr
);
1258 dnet_state_put(other
);
1260 dnet_recv_route_list(st
);
1267 dnet_log(n
, DNET_LOG_ERROR
, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd
->id
), cmd
->status
, err
);
1272 int dnet_lookup(struct dnet_node
*n
, const char *file
)
1274 int err
, error
= 0, i
;
1275 struct dnet_wait
*w
;
1278 w
= dnet_wait_alloc(0);
1284 dnet_transform(n
, file
, strlen(file
), &raw
);
1286 pthread_mutex_lock(&n
->group_lock
);
1287 for (i
=0; i
<n
->group_num
; ++i
) {
1288 raw
.group_id
= n
->groups
[i
];
1290 err
= dnet_lookup_object(n
, &raw
, 0, dnet_lookup_complete
, dnet_wait_get(w
));
1296 err
= dnet_wait_event(w
, w
->cond
== 1, &n
->wait_ts
);
1297 if (err
|| w
->status
) {
1307 pthread_mutex_unlock(&n
->group_lock
);
1316 struct dnet_addr
*dnet_state_addr(struct dnet_net_state
*st
)
1321 static int dnet_stat_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1323 struct dnet_wait
*w
= priv
;
1325 struct dnet_stat
*st
;
1328 if (is_trans_destroyed(state
, cmd
)) {
1329 dnet_wakeup(w
, w
->cond
++);
1334 if (cmd
->cmd
== DNET_CMD_STAT
&& cmd
->size
== sizeof(struct dnet_stat
)) {
1335 st
= (struct dnet_stat
*)(cmd
+ 1);
1337 dnet_convert_stat(st
);
1339 la
[0] = (float)st
->la
[0] / 100.0;
1340 la
[1] = (float)st
->la
[1] / 100.0;
1341 la
[2] = (float)st
->la
[2] / 100.0;
1343 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: la: %.2f %.2f %.2f.\n",
1344 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1345 la
[0], la
[1], la
[2]);
1346 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: mem: "
1347 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1348 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1349 (unsigned long long)st
->vm_total
,
1350 (unsigned long long)st
->vm_free
,
1351 (unsigned long long)st
->vm_cached
);
1352 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: fs: "
1353 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1354 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1355 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
1356 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
1357 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
1359 } else if (cmd
->size
>= sizeof(struct dnet_addr_stat
) && cmd
->cmd
== DNET_CMD_STAT_COUNT
) {
1360 struct dnet_addr_stat
*as
= (struct dnet_addr_stat
*)(cmd
+ 1);
1363 dnet_convert_addr_stat(as
, 0);
1365 for (i
=0; i
<as
->num
; ++i
) {
1366 if (as
->num
> as
->cmd_num
) {
1368 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Storage commands\n",
1369 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1370 if (i
== as
->cmd_num
)
1371 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Proxy commands\n",
1372 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1373 if (i
== as
->cmd_num
* 2)
1374 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Counters\n",
1375 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1377 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1378 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1379 dnet_counter_string(i
, as
->cmd_num
),
1380 (unsigned long long)as
->count
[i
].count
, (unsigned long long)as
->count
[i
].err
);
1387 static int dnet_request_cmd_single(struct dnet_node
*n
, struct dnet_net_state
*st
, struct dnet_trans_control
*ctl
)
1390 return dnet_trans_alloc_send_state(st
, ctl
);
1392 return dnet_trans_alloc_send(n
, ctl
);
1395 int dnet_request_stat(struct dnet_node
*n
, struct dnet_id
*id
,
1396 unsigned int cmd
, uint64_t cflags
,
1397 int (* complete
)(struct dnet_net_state
*state
,
1398 struct dnet_cmd
*cmd
,
1402 struct dnet_trans_control ctl
;
1403 struct dnet_wait
*w
= NULL
;
1405 struct timeval start
, end
;
1408 gettimeofday(&start
, NULL
);
1411 w
= dnet_wait_alloc(0);
1417 complete
= dnet_stat_complete
;
1421 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1424 ctl
.complete
= complete
;
1426 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| cflags
;
1432 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1434 err
= dnet_request_cmd_single(n
, NULL
, &ctl
);
1437 struct dnet_net_state
*st
;
1438 struct dnet_group
*g
;
1441 pthread_mutex_lock(&n
->state_lock
);
1442 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1443 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1450 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1451 dnet_request_cmd_single(n
, st
, &ctl
);
1455 pthread_mutex_unlock(&n
->state_lock
);
1459 gettimeofday(&end
, NULL
);
1460 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1461 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd
), diff
, num
);
1466 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1468 gettimeofday(&end
, NULL
);
1469 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1470 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd
), diff
, err
, num
);
1485 struct dnet_request_cmd_priv
{
1486 struct dnet_wait
*w
;
1488 int (* complete
)(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
);
1492 static int dnet_request_cmd_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1494 struct dnet_request_cmd_priv
*p
= priv
;
1495 int err
= p
->complete(state
, cmd
, p
->priv
);
1497 if (is_trans_destroyed(state
, cmd
)) {
1498 struct dnet_wait
*w
= p
->w
;
1500 dnet_wakeup(w
, w
->cond
++);
1501 if (atomic_read(&w
->refcnt
) == 1)
1509 int dnet_request_cmd(struct dnet_node
*n
, struct dnet_trans_control
*ctl
)
1512 struct dnet_request_cmd_priv
*p
;
1513 struct dnet_wait
*w
;
1514 struct dnet_net_state
*st
;
1515 struct dnet_group
*g
;
1516 struct timeval start
, end
;
1519 gettimeofday(&start
, NULL
);
1521 p
= malloc(sizeof(*p
));
1527 w
= dnet_wait_alloc(0);
1534 p
->complete
= ctl
->complete
;
1535 p
->priv
= ctl
->priv
;
1537 ctl
->complete
= dnet_request_cmd_complete
;
1540 pthread_mutex_lock(&n
->state_lock
);
1541 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1542 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1548 ctl
->id
.group_id
= g
->group_id
;
1550 if (!(ctl
->cflags
& DNET_FLAGS_DIRECT
))
1551 dnet_setup_id(&ctl
->id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1552 dnet_request_cmd_single(n
, st
, ctl
);
1556 pthread_mutex_unlock(&n
->state_lock
);
1558 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1560 gettimeofday(&end
, NULL
);
1561 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1562 dnet_log(n
, DNET_LOG_NOTICE
, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl
->cmd
), diff
, err
, num
);
1567 if (atomic_read(&w
->refcnt
) == 1)
1579 struct dnet_update_status_priv
{
1580 struct dnet_wait
*w
;
1581 struct dnet_node_status status
;
1585 static int dnet_update_status_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1587 struct dnet_update_status_priv
*p
= priv
;
1589 if (is_trans_destroyed(state
, cmd
)) {
1590 dnet_wakeup(p
->w
, p
->w
->cond
++);
1591 dnet_wait_put(p
->w
);
1592 if (atomic_dec_and_test(&p
->refcnt
))
1596 if (cmd
->size
== sizeof(struct dnet_node_status
)) {
1597 memcpy(&p
->status
, cmd
+ 1, sizeof(struct dnet_node_status
));
1604 int dnet_update_status(struct dnet_node
*n
, struct dnet_addr
*addr
, struct dnet_id
*id
, struct dnet_node_status
*status
)
1607 struct dnet_update_status_priv
*priv
;
1608 struct dnet_trans_control ctl
;
1615 memset(&ctl
, 0, sizeof(ctl
));
1618 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1620 struct dnet_net_state
*st
;
1622 st
= dnet_state_search_by_addr(n
, addr
);
1628 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1632 priv
= malloc(sizeof(struct dnet_update_status_priv
));
1638 priv
->w
= dnet_wait_alloc(0);
1644 ctl
.complete
= dnet_update_status_complete
;
1646 ctl
.cmd
= DNET_CMD_STATUS
;
1647 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
1648 ctl
.size
= sizeof(struct dnet_node_status
);
1651 dnet_wait_get(priv
->w
);
1652 dnet_request_cmd_single(n
, NULL
, &ctl
);
1654 err
= dnet_wait_event(priv
->w
, priv
->w
->cond
== 1, &n
->wait_ts
);
1655 dnet_wait_put(priv
->w
);
1657 memcpy(status
, &priv
->status
, sizeof(struct dnet_node_status
));
1659 if (atomic_dec_and_test(&priv
->refcnt
))
1666 static int dnet_remove_object_raw(struct dnet_node
*n
, struct dnet_id
*id
,
1667 int (* complete
)(struct dnet_net_state
*state
,
1668 struct dnet_cmd
*cmd
,
1670 void *priv
, uint64_t cflags
, uint64_t ioflags
)
1672 struct dnet_io_control ctl
;
1674 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1676 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1678 memcpy(&ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
1679 memcpy(&ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
1680 ctl
.io
.flags
= ioflags
;
1684 ctl
.cmd
= DNET_CMD_DEL
;
1685 ctl
.complete
= complete
;
1687 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1689 return dnet_trans_create_send_all(n
, &ctl
);
1692 static int dnet_remove_complete(struct dnet_net_state
*state
,
1693 struct dnet_cmd
*cmd
,
1696 struct dnet_wait
*w
= priv
;
1698 if (is_trans_destroyed(state
, cmd
)) {
1699 dnet_wakeup(w
, w
->cond
++);
1705 w
->status
= cmd
->status
;
1709 int dnet_remove_object(struct dnet_node
*n
, struct dnet_id
*id
,
1710 int (* complete
)(struct dnet_net_state
*state
,
1711 struct dnet_cmd
*cmd
,
1714 uint64_t cflags
, uint64_t ioflags
)
1716 struct dnet_wait
*w
= NULL
;
1720 w
= dnet_wait_alloc(0);
1726 complete
= dnet_remove_complete
;
1731 err
= dnet_remove_object_raw(n
, id
, complete
, priv
, cflags
, ioflags
);
1736 err
= dnet_wait_event(w
, w
->cond
!= err
, &n
->wait_ts
);
1751 static int dnet_remove_file_raw(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1753 struct dnet_wait
*w
;
1756 w
= dnet_wait_alloc(0);
1762 atomic_add(&w
->refcnt
, 1024);
1763 err
= dnet_remove_object_raw(n
, id
, dnet_remove_complete
, w
, cflags
, ioflags
);
1765 atomic_sub(&w
->refcnt
, 1024);
1770 atomic_sub(&w
->refcnt
, 1024 - num
);
1772 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1786 int dnet_remove_object_now(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1788 return dnet_remove_file_raw(n
, id
, cflags
| DNET_FLAGS_NEED_ACK
| DNET_ATTR_DELETE_HISTORY
, ioflags
);
1791 int dnet_remove_file(struct dnet_node
*n
, char *remote
, int remote_len
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1796 dnet_transform(n
, remote
, remote_len
, &raw
);
1801 return dnet_remove_file_raw(n
, id
, cflags
, ioflags
);
1804 int dnet_request_ids(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
,
1805 int (* complete
)(struct dnet_net_state
*state
,
1806 struct dnet_cmd
*cmd
,
1810 struct dnet_trans_control ctl
;
1812 dnet_log_raw(n
, DNET_LOG_ERROR
, "Temporarily unsupported operation.\n");
1815 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1817 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1818 ctl
.cmd
= DNET_CMD_LIST
;
1819 ctl
.complete
= complete
;
1821 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1823 return dnet_trans_alloc_send(n
, &ctl
);
1826 struct dnet_node
*dnet_get_node_from_state(void *state
)
1828 struct dnet_net_state
*st
= state
;
1835 struct dnet_read_data_completion
{
1836 struct dnet_wait
*w
;
1842 static int dnet_read_data_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1844 struct dnet_read_data_completion
*c
= priv
;
1845 struct dnet_wait
*w
= c
->w
;
1848 if (is_trans_destroyed(st
, cmd
)) {
1849 dnet_wakeup(w
, w
->cond
++);
1851 if (atomic_dec_and_test(&c
->refcnt
))
1860 if (cmd
->size
>= sizeof(struct dnet_io_attr
)) {
1861 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)(cmd
+ 1);
1862 uint64_t sz
= c
->size
;
1864 dnet_convert_io_attr(io
);
1866 sz
+= io
->size
+ sizeof(struct dnet_io_attr
);
1867 c
->data
= realloc(c
->data
, sz
);
1873 memcpy(c
->data
+ c
->size
, io
, sizeof(struct dnet_io_attr
) + io
->size
);
1878 dnet_log(st
->n
, DNET_LOG_NOTICE
, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1879 dnet_dump_id(&cmd
->id
), (unsigned long long)(cmd
->trans
& ~DNET_TRANS_REPLY
),
1885 void *dnet_read_data_wait_raw(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
1886 int cmd
, uint64_t cflags
, int *errp
)
1888 struct dnet_io_control ctl
;
1889 struct dnet_wait
*w
;
1890 struct dnet_read_data_completion
*c
;
1894 w
= dnet_wait_alloc(0);
1900 c
= malloc(sizeof(*c
));
1909 /* one for completion callback, another for this function */
1910 atomic_init(&c
->refcnt
, 2);
1912 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1917 ctl
.complete
= dnet_read_data_complete
;
1920 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1922 memcpy(&ctl
.io
, io
, sizeof(struct dnet_io_attr
));
1923 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1925 ctl
.id
.type
= io
->type
;
1928 err
= dnet_read_object(n
, &ctl
);
1930 goto err_out_put_complete
;
1932 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
1933 if (err
|| w
->status
) {
1934 char id_str
[2*DNET_ID_SIZE
+ 1];
1937 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
1938 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
1939 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
1940 goto err_out_put_complete
;
1946 err_out_put_complete
:
1947 if (atomic_dec_and_test(&c
->refcnt
))
1956 void *dnet_read_data_wait_groups(struct dnet_node
*n
, struct dnet_id
*id
, int *groups
, int num
,
1957 struct dnet_io_attr
*io
, uint64_t cflags
, int *errp
)
1962 for (i
= 0; i
< num
; ++i
) {
1963 id
->group_id
= groups
[i
];
1965 data
= dnet_read_data_wait_raw(n
, id
, io
, DNET_CMD_READ
, cflags
, errp
);
1975 void *dnet_read_data_wait(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
1976 uint64_t cflags
, int *errp
)
1981 num
= dnet_mix_states(n
, id
, &g
);
1987 data
= dnet_read_data_wait_groups(n
, id
, g
, num
, io
, cflags
, &err
);
1998 int dnet_write_data_wait(struct dnet_node
*n
, struct dnet_io_control
*ctl
, void **result
)
2000 int err
, trans_num
= 0;
2001 struct dnet_wait
*w
;
2002 struct dnet_write_completion
*wc
;
2004 wc
= malloc(sizeof(struct dnet_write_completion
));
2009 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2011 w
= dnet_wait_alloc(0);
2019 w
->status
= -ENOENT
;
2021 ctl
->complete
= dnet_write_complete
;
2023 ctl
->cmd
= DNET_CMD_WRITE
;
2024 ctl
->cflags
|= DNET_FLAGS_NEED_ACK
;
2026 memcpy(ctl
->io
.id
, ctl
->id
.id
, DNET_ID_SIZE
);
2027 memcpy(ctl
->io
.parent
, ctl
->id
.id
, DNET_ID_SIZE
);
2029 atomic_set(&w
->refcnt
, INT_MAX
);
2030 trans_num
= dnet_write_object(n
, ctl
);
2035 * 1 - the first reference counter we grabbed at allocation time
2037 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
2039 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
2040 if (err
|| w
->status
) {
2043 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2044 dnet_dump_id(&ctl
->id
), err
, w
->status
);
2047 if (err
|| !trans_num
) {
2050 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
2055 dnet_log(n
, DNET_LOG_NOTICE
, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2056 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, ctl
->io
.type
, wc
->size
);
2059 *result
= wc
->reply
;
2065 dnet_write_complete_free(wc
);
2070 int dnet_lookup_addr(struct dnet_node
*n
, const void *remote
, int len
, struct dnet_id
*id
, int group_id
, char *dst
, int dlen
)
2073 struct dnet_net_state
*st
;
2077 dnet_transform(n
, remote
, len
, &raw
);
2080 id
->group_id
= group_id
;
2082 st
= dnet_state_get_first(n
, id
);
2086 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st
), dst
, dlen
);
2094 struct dnet_weight
{
2099 static int dnet_weight_compare(const void *v1
, const void *v2
)
2101 const struct dnet_weight
*w1
= v1
;
2102 const struct dnet_weight
*w2
= v2
;
2104 return w2
->weight
- w1
->weight
;
2107 static int dnet_weight_get_winner(struct dnet_weight
*w
, int num
)
2113 for (i
= 0; i
< num
; ++i
)
2116 r
= (float)rand() / (float)RAND_MAX
;
2119 for (i
= 0; i
< num
; ++i
) {
2128 int dnet_mix_states(struct dnet_node
*n
, struct dnet_id
*id
, int **groupsp
)
2130 struct dnet_weight
*weights
;
2132 int group_num
, i
, num
;
2133 struct dnet_net_state
*st
;
2138 pthread_mutex_lock(&n
->group_lock
);
2139 group_num
= n
->group_num
;
2141 weights
= alloca(n
->group_num
* sizeof(*weights
));
2142 groups
= malloc(n
->group_num
* sizeof(*groups
));
2144 memcpy(groups
, n
->groups
, n
->group_num
* sizeof(*groups
));
2145 pthread_mutex_unlock(&n
->group_lock
);
2152 if (n
->flags
& DNET_CFG_RANDOMIZE_STATES
) {
2153 for (i
= 0; i
< group_num
; ++i
) {
2154 weights
[i
].weight
= rand();
2155 weights
[i
].group_id
= groups
[i
];
2159 if (!(n
->flags
& DNET_CFG_MIX_STATES
)) {
2164 memset(weights
, 0, group_num
* sizeof(*weights
));
2166 for (i
= 0, num
= 0; i
< group_num
; ++i
) {
2167 id
->group_id
= groups
[i
];
2169 st
= dnet_state_get_first(n
, id
);
2171 weights
[num
].weight
= (int)st
->weight
;
2172 weights
[num
].group_id
= id
->group_id
;
2183 qsort(weights
, group_num
, sizeof(struct dnet_weight
), dnet_weight_compare
);
2185 for (i
= 0; i
< group_num
; ++i
) {
2186 int pos
= dnet_weight_get_winner(weights
, group_num
- i
);
2187 groups
[i
] = weights
[pos
].group_id
;
2189 if (pos
< group_num
- 1)
2190 memmove(&weights
[pos
], &weights
[pos
+ 1], (group_num
- 1 - pos
) * sizeof(struct dnet_weight
));
2194 dnet_node_set_groups(n
, groups
, group_num
);
2200 int dnet_data_map(struct dnet_map_fd
*map
)
2203 long page_size
= sysconf(_SC_PAGE_SIZE
);
2206 off
= map
->offset
& ~(page_size
- 1);
2207 map
->mapped_size
= ALIGN(map
->size
+ map
->offset
- off
, page_size
);
2209 map
->mapped_data
= mmap(NULL
, map
->mapped_size
, PROT_READ
, MAP_SHARED
, map
->fd
, off
);
2210 if (map
->mapped_data
== MAP_FAILED
) {
2215 map
->data
= map
->mapped_data
+ map
->offset
- off
;
2221 void dnet_data_unmap(struct dnet_map_fd
*map
)
2223 munmap(map
->mapped_data
, map
->mapped_size
);
2226 struct dnet_io_attr
*dnet_remove_range(struct dnet_node
*n
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *ret_num
, int *errp
)
2229 struct dnet_io_attr
*ret
, *new_ret
;
2230 struct dnet_raw_id start
, next
;
2231 struct dnet_raw_id end
;
2232 uint64_t size
= io
->size
;
2234 int err
, need_exit
= 0;
2236 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2238 dnet_setup_id(&id
, group_id
, io
->id
);
2243 while (!need_exit
) {
2244 err
= dnet_search_range(n
, &id
, &start
, &next
);
2248 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2249 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2250 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2251 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2255 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2257 char start_id
[2*len
+ 1];
2258 char next_id
[2*len
+ 1];
2259 char end_id
[2*len
+ 1];
2260 char id_str
[2*len
+ 1];
2262 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2263 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2264 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2265 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2266 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2267 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2270 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2271 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2275 data
= dnet_read_data_wait_raw(n
, &id
, io
, DNET_CMD_DEL_RANGE
, cflags
, &err
);
2276 if (io
->size
!= sizeof(struct dnet_io_attr
)) {
2282 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
2284 dnet_convert_io_attr(rep
);
2286 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2287 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2288 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2292 new_ret
= realloc(ret
, *ret_num
* sizeof(struct dnet_io_attr
));
2299 ret
[*ret_num
- 1] = *rep
;
2304 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2313 struct dnet_range_data
*dnet_read_range(struct dnet_node
*n
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *errp
)
2317 struct dnet_range_data
*ret
;
2318 struct dnet_raw_id start
, next
;
2319 struct dnet_raw_id end
;
2320 uint64_t size
= io
->size
;
2322 int err
, need_exit
= 0;
2324 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2326 dnet_setup_id(&id
, group_id
, io
->id
);
2331 while (!need_exit
) {
2332 err
= dnet_search_range(n
, &id
, &start
, &next
);
2336 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2337 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2338 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2339 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2343 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2345 char start_id
[2*len
+ 1];
2346 char next_id
[2*len
+ 1];
2347 char end_id
[2*len
+ 1];
2348 char id_str
[2*len
+ 1];
2350 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2351 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2352 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2353 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2354 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2355 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2358 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2359 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2363 data
= dnet_read_data_wait_raw(n
, &id
, io
, DNET_CMD_READ_RANGE
, cflags
, &err
);
2365 struct dnet_io_attr
*rep
= data
+ io
->size
- sizeof(struct dnet_io_attr
);
2367 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2368 if (!(io
->flags
& DNET_IO_FLAGS_NODATA
))
2369 io
->size
-= sizeof(struct dnet_io_attr
);
2370 dnet_convert_io_attr(rep
);
2372 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2373 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2374 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2376 if (io
->start
< rep
->num
) {
2377 rep
->num
-= io
->start
;
2379 io
->num
-= rep
->num
;
2381 if (!io
->size
&& !(io
->flags
& DNET_IO_FLAGS_NODATA
)) {
2384 struct dnet_range_data
*new_ret
;
2388 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2395 ret
[ret_num
- 1].data
= data
;
2396 ret
[ret_num
- 1].size
= io
->size
;
2403 io
->start
-= rep
->num
;
2407 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2419 struct dnet_read_latest_id
{
2421 struct dnet_file_info fi
;
2424 struct dnet_read_latest_ctl
{
2425 struct dnet_wait
*w
;
2427 pthread_mutex_t lock
;
2429 struct dnet_read_latest_id ids
[0];
2432 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl
*ctl
)
2434 dnet_wakeup(ctl
->w
, ctl
->w
->cond
++);
2435 if (atomic_dec_and_test(&ctl
->w
->refcnt
)) {
2436 dnet_wait_destroy(ctl
->w
);
2437 pthread_mutex_destroy(&ctl
->lock
);
2442 static int dnet_read_latest_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
2444 struct dnet_read_latest_ctl
*ctl
= priv
;
2445 struct dnet_node
*n
;
2446 struct dnet_addr_attr
*a
;
2447 struct dnet_file_info
*fi
;
2450 if (is_trans_destroyed(st
, cmd
)) {
2451 dnet_read_latest_ctl_put(ctl
);
2458 if (err
|| !cmd
->size
)
2461 if (cmd
->size
< sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
2462 dnet_log(n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2463 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
2464 sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
));
2468 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
2469 fi
= (struct dnet_file_info
*)(a
+ 1);
2471 dnet_convert_addr_attr(a
);
2472 dnet_convert_file_info(fi
);
2474 pthread_mutex_lock(&ctl
->lock
);
2476 pthread_mutex_unlock(&ctl
->lock
);
2478 /* we do not care about filename */
2479 memcpy(&ctl
->ids
[pos
].fi
, fi
, sizeof(struct dnet_file_info
));
2480 memcpy(&ctl
->ids
[pos
].id
, &cmd
->id
, sizeof(struct dnet_id
));
2486 static int dnet_file_read_latest_cmp(const void *p1
, const void *p2
)
2488 const struct dnet_read_latest_id
*id1
= p1
;
2489 const struct dnet_read_latest_id
*id2
= p2
;
2491 int ret
= (int)(id2
->fi
.mtime
.tsec
- id1
->fi
.mtime
.tsec
);
2494 ret
= (int)(id2
->fi
.mtime
.tnsec
- id1
->fi
.mtime
.tnsec
);
2499 int dnet_read_latest_prepare(struct dnet_read_latest_prepare
*pr
)
2501 struct dnet_read_latest_ctl
*ctl
;
2502 int group_id
= pr
->id
.group_id
;
2505 ctl
= malloc(sizeof(struct dnet_read_latest_ctl
) + sizeof(struct dnet_read_latest_id
) * pr
->group_num
);
2510 memset(ctl
, 0, sizeof(struct dnet_read_latest_ctl
));
2512 ctl
->w
= dnet_wait_alloc(0);
2518 err
= pthread_mutex_init(&ctl
->lock
, NULL
);
2520 goto err_out_put_wait
;
2522 ctl
->num
= pr
->group_num
;
2525 for (i
= 0; i
< pr
->group_num
; ++i
) {
2526 pr
->id
.group_id
= pr
->group
[i
];
2528 dnet_wait_get(ctl
->w
);
2529 dnet_lookup_object(pr
->n
, &pr
->id
, DNET_ATTR_META_TIMES
| pr
->cflags
, dnet_read_latest_complete
, ctl
);
2532 err
= dnet_wait_event(ctl
->w
, ctl
->w
->cond
== pr
->group_num
, &pr
->n
->wait_ts
);
2539 pr
->group_num
= ctl
->pos
;
2541 qsort(ctl
->ids
, pr
->group_num
, sizeof(struct dnet_read_latest_id
), dnet_file_read_latest_cmp
);
2543 for (i
= 0; i
< pr
->group_num
; ++i
) {
2544 pr
->group
[i
] = ctl
->ids
[i
].id
.group_id
;
2546 if (group_id
== pr
->group
[i
]) {
2547 const struct dnet_read_latest_id
*id0
= &ctl
->ids
[0];
2548 const struct dnet_read_latest_id
*id1
= &ctl
->ids
[i
];
2550 if (!dnet_file_read_latest_cmp(id0
, id1
)) {
2551 int tmp_group
= pr
->group
[0];
2552 pr
->group
[0] = pr
->group
[i
];
2553 pr
->group
[i
] = tmp_group
;
2559 dnet_read_latest_ctl_put(ctl
);
2563 dnet_wait_put(ctl
->w
);
2570 int dnet_read_latest(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
, uint64_t cflags
, void **datap
)
2572 struct dnet_read_latest_prepare pr
;
2573 int *g
, num
, err
, i
;
2575 if ((int)io
->num
> n
->group_num
) {
2580 err
= dnet_mix_states(n
, id
, &g
);
2586 if ((int)io
->num
> num
) {
2591 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
2599 err
= dnet_read_latest_prepare(&pr
);
2604 for (i
= 0; i
< pr
.group_num
; ++i
) {
2607 id
->group_id
= pr
.group
[i
];
2608 data
= dnet_read_data_wait_raw(n
, id
, io
, DNET_CMD_READ
, cflags
, &err
);
2622 int dnet_get_routes(struct dnet_node
*n
, struct dnet_id
**ids
, struct dnet_addr
**addrs
) {
2624 struct dnet_net_state
*st
;
2625 struct dnet_group
*g
;
2626 struct dnet_addr
*tmp_addrs
;
2627 struct dnet_id
*tmp_ids
;
2628 int size
= 0, count
= 0;
2634 pthread_mutex_lock(&n
->state_lock
);
2635 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
2636 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
2638 size
+= st
->idc
->id_num
;
2640 tmp_ids
= (struct dnet_id
*)realloc(*ids
, size
* sizeof(struct dnet_id
));
2647 tmp_addrs
= (struct dnet_addr
*)realloc(*addrs
, size
* sizeof(struct dnet_addr
));
2654 for (i
= 0; i
< st
->idc
->id_num
; ++i
) {
2655 dnet_setup_id(&(*ids
)[count
], g
->group_id
, st
->idc
->ids
[i
].raw
.id
);
2656 memcpy(&(*addrs
)[count
], dnet_state_addr(st
), sizeof(struct dnet_addr
));
2658 //fprintf(stderr, "%d: %s -> %s\n", g->group_id, dnet_dump_id_str(st->idc->ids[i].raw.id), dnet_state_dump_addr(st));
2662 pthread_mutex_unlock(&n
->state_lock
);
2676 void *dnet_bulk_read_wait_raw(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*ios
,
2677 uint32_t io_num
, int cmd
, uint64_t cflags
, int *errp
)
2679 struct dnet_io_control ctl
;
2680 struct dnet_io_attr io
;
2681 struct dnet_wait
*w
;
2682 struct dnet_read_data_completion
*c
;
2686 w
= dnet_wait_alloc(0);
2692 c
= malloc(sizeof(*c
));
2701 /* one for completion callback, another for this function */
2702 atomic_init(&c
->refcnt
, 2);
2704 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2709 ctl
.complete
= dnet_read_data_complete
;
2712 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
2714 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
2715 memset(&ctl
.io
, 0, sizeof(struct dnet_io_attr
));
2717 memcpy(io
.id
, id
->id
, DNET_ID_SIZE
);
2718 memcpy(io
.parent
, id
->id
, DNET_ID_SIZE
);
2720 ctl
.io
.size
= io_num
* sizeof(struct dnet_io_attr
);
2724 err
= dnet_read_object(n
, &ctl
);
2726 goto err_out_put_complete
;
2728 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
2729 if (err
|| w
->status
) {
2730 char id_str
[2*DNET_ID_SIZE
+ 1];
2733 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
2734 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
2735 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
2736 goto err_out_put_complete
;
2741 err_out_put_complete
:
2742 if (atomic_dec_and_test(&c
->refcnt
))
2752 static int dnet_io_attr_cmp(const void *d1
, const void *d2
)
2754 const struct dnet_io_attr
*io1
= d1
;
2755 const struct dnet_io_attr
*io2
= d2
;
2757 return memcmp(io1
->id
, io2
->id
, DNET_ID_SIZE
);
2760 struct dnet_range_data
*dnet_bulk_read(struct dnet_node
*n
, struct dnet_io_attr
*ios
, uint32_t io_num
, int group_id
, uint64_t cflags
, int *errp
)
2762 struct dnet_id id
, next_id
;
2764 struct dnet_range_data
*ret
;
2765 struct dnet_net_state
*cur
, *next
= NULL
;
2769 uint32_t i
, start
= -1;
2775 qsort(ios
, io_num
, sizeof(struct dnet_io_attr
), dnet_io_attr_cmp
);
2781 dnet_setup_id(&id
, group_id
, ios
[0].id
);
2782 id
.type
= ios
[0].type
;
2784 cur
= dnet_state_get_first(n
, &id
);
2786 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&id
));
2791 for (i
= 0; i
< io_num
; ++i
) {
2792 if ((i
+ 1) < io_num
) {
2793 dnet_setup_id(&next_id
, group_id
, ios
[i
+1].id
);
2794 next_id
.type
= ios
[i
+1].type
;
2796 next
= dnet_state_get_first(n
, &next_id
);
2798 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&next_id
));
2803 /* Send command only if state changes or it's a last id */
2804 if ((cur
== next
)) {
2805 dnet_state_put(next
);
2811 dnet_log(n
, DNET_LOG_NOTICE
, "start: %s: end: %s, count: %llu, addr: %s\n",
2813 dnet_dump_id(&next_id
),
2814 (unsigned long long)(i
- start
),
2815 dnet_state_dump_addr(cur
));
2817 data
= dnet_bulk_read_wait_raw(n
, &id
, ios
, i
- start
, DNET_CMD_BULK_READ
, cflags
, &err
);
2825 struct dnet_range_data
*new_ret
;
2829 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2836 ret
[ret_num
- 1].data
= data
;
2837 ret
[ret_num
- 1].size
= size
;
2843 dnet_state_put(cur
);
2846 memcpy(&id
, &next_id
, sizeof(struct dnet_id
));
2851 dnet_state_put(next
);
2852 dnet_state_put(cur
);
2862 struct dnet_range_data
dnet_bulk_write(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int ctl_num
, int *errp
)
2864 int err
, i
, trans_num
= 0, local_trans_num
;
2865 struct dnet_wait
*w
;
2866 struct dnet_write_completion
*wc
;
2867 struct dnet_range_data ret
;
2868 struct dnet_metadata_control mcl
;
2869 struct dnet_meta_container mc
;
2870 struct dnet_io_control meta_ctl
;
2875 memset(&ret
, 0, sizeof(ret
));
2877 wc
= malloc(sizeof(struct dnet_write_completion
));
2882 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2884 w
= dnet_wait_alloc(0);
2892 atomic_set(&w
->refcnt
, INT_MAX
);
2893 w
->status
= -ENOENT
;
2895 for (i
= 0; i
< ctl_num
; ++i
) {
2897 ctl
[i
].complete
= dnet_write_complete
;
2899 ctl
[i
].cmd
= DNET_CMD_WRITE
;
2900 ctl
[i
].cflags
= DNET_FLAGS_NEED_ACK
;
2902 memcpy(ctl
[i
].io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2903 memcpy(ctl
[i
].io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2905 local_trans_num
= dnet_write_object(n
, &ctl
[i
]);
2906 if (local_trans_num
< 0)
2907 local_trans_num
= 0;
2909 trans_num
+= local_trans_num
;
2911 /* Prepare and send metadata */
2912 memset(&mcl
, 0, sizeof(mcl
));
2914 pthread_mutex_lock(&n
->group_lock
);
2915 group_num
= n
->group_num
;
2916 groups
= alloca(group_num
* sizeof(int));
2918 memcpy(groups
, n
->groups
, group_num
* sizeof(int));
2919 pthread_mutex_unlock(&n
->group_lock
);
2921 mcl
.groups
= groups
;
2922 mcl
.group_num
= group_num
;
2924 mcl
.cflags
= ctl
[i
].cflags
;
2926 gettimeofday(&tv
, NULL
);
2927 mcl
.ts
.tv_sec
= tv
.tv_sec
;
2928 mcl
.ts
.tv_nsec
= tv
.tv_usec
* 1000;
2930 memset(&mc
, 0, sizeof(mc
));
2932 err
= dnet_create_metadata(n
, &mcl
, &mc
);
2933 dnet_log(n
, DNET_LOG_DEBUG
, "Creating metadata: err: %d", err
);
2935 dnet_convert_metadata(n
, mc
.data
, mc
.size
);
2937 memset(&meta_ctl
, 0, sizeof(struct dnet_io_control
));
2940 meta_ctl
.complete
= dnet_write_complete
;
2941 meta_ctl
.cmd
= DNET_CMD_WRITE
;
2944 meta_ctl
.cflags
= ctl
[i
].cflags
;
2946 memcpy(&meta_ctl
.id
, &ctl
[i
].id
, sizeof(struct dnet_id
));
2947 memcpy(meta_ctl
.io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2948 memcpy(meta_ctl
.io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2949 meta_ctl
.id
.type
= meta_ctl
.io
.type
= EBLOB_TYPE_META
;
2951 meta_ctl
.io
.flags
|= DNET_IO_FLAGS_META
;
2952 meta_ctl
.io
.offset
= 0;
2953 meta_ctl
.io
.size
= mc
.size
;
2954 meta_ctl
.data
= mc
.data
;
2956 local_trans_num
= dnet_write_object(n
, &meta_ctl
);
2957 if (local_trans_num
< 0)
2958 local_trans_num
= 0;
2960 trans_num
+= local_trans_num
;
2965 * 1 - the first reference counter we grabbed at allocation time
2967 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
2969 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
2970 if (err
|| w
->status
) {
2973 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2974 dnet_dump_id(&ctl
->id
), err
, w
->status
);
2977 if (err
|| !trans_num
) {
2980 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
2985 dnet_log(n
, DNET_LOG_NOTICE
, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
2986 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, wc
->size
);
2989 ret
.data
= wc
->reply
;
2990 ret
.size
= wc
->size
;
2995 dnet_write_complete_free(wc
);
3001 int dnet_flags(struct dnet_node
*n
)
3006 static int dnet_start_defrag_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
3008 struct dnet_wait
*w
= priv
;
3010 if (is_trans_destroyed(state
, cmd
)) {
3011 dnet_wakeup(w
, w
->cond
++);
3019 static int dnet_start_defrag_single(struct dnet_net_state
*st
, void *priv
, uint64_t cflags
)
3021 struct dnet_trans_control ctl
;
3023 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
3025 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
3026 ctl
.cmd
= DNET_CMD_DEFRAG
;
3027 ctl
.complete
= dnet_start_defrag_complete
;
3029 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
3031 return dnet_trans_alloc_send_state(st
, &ctl
);
3034 int dnet_start_defrag(struct dnet_node
*n
, uint64_t cflags
)
3036 struct dnet_net_state
*st
;
3037 struct dnet_wait
*w
;
3038 struct dnet_group
*g
;
3042 w
= dnet_wait_alloc(0);
3048 pthread_mutex_lock(&n
->state_lock
);
3049 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
3050 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
3057 dnet_start_defrag_single(st
, w
, cflags
);
3061 pthread_mutex_unlock(&n
->state_lock
);
3063 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);