2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
35 int dnet_transform(struct dnet_node
*n
, const void *src
, uint64_t size
, struct dnet_id
*id
)
37 struct dnet_transform
*t
= &n
->transform
;
38 unsigned int csize
= sizeof(id
->id
);
40 return t
->transform(t
->priv
, src
, size
, id
->id
, &csize
, 0);
44 static char *dnet_cmd_strings
[] = {
45 [DNET_CMD_LOOKUP
] = "LOOKUP",
46 [DNET_CMD_REVERSE_LOOKUP
] = "REVERSE_LOOKUP",
47 [DNET_CMD_JOIN
] = "JOIN",
48 [DNET_CMD_WRITE
] = "WRITE",
49 [DNET_CMD_READ
] = "READ",
50 [DNET_CMD_LIST
] = "CHECK",
51 [DNET_CMD_EXEC
] = "EXEC",
52 [DNET_CMD_ROUTE_LIST
] = "ROUTE_LIST",
53 [DNET_CMD_STAT
] = "STAT",
54 [DNET_CMD_NOTIFY
] = "NOTIFY",
55 [DNET_CMD_DEL
] = "REMOVE",
56 [DNET_CMD_STAT_COUNT
] = "STAT_COUNT",
57 [DNET_CMD_STATUS
] = "STATUS",
58 [DNET_CMD_READ_RANGE
] = "READ_RANGE",
59 [DNET_CMD_DEL_RANGE
] = "DEL_RANGE",
60 [DNET_CMD_AUTH
] = "AUTH",
61 [DNET_CMD_BULK_READ
] = "BULK_READ",
62 [DNET_CMD_UNKNOWN
] = "UNKNOWN",
65 static char *dnet_counter_strings
[] = {
66 [DNET_CNTR_LA1
] = "DNET_CNTR_LA1",
67 [DNET_CNTR_LA5
] = "DNET_CNTR_LA5",
68 [DNET_CNTR_LA15
] = "DNET_CNTR_LA15",
69 [DNET_CNTR_BSIZE
] = "DNET_CNTR_BSIZE",
70 [DNET_CNTR_FRSIZE
] = "DNET_CNTR_FRSIZE",
71 [DNET_CNTR_BLOCKS
] = "DNET_CNTR_BLOCKS",
72 [DNET_CNTR_BFREE
] = "DNET_CNTR_BFREE",
73 [DNET_CNTR_BAVAIL
] = "DNET_CNTR_BAVAIL",
74 [DNET_CNTR_FILES
] = "DNET_CNTR_FILES",
75 [DNET_CNTR_FFREE
] = "DNET_CNTR_FFREE",
76 [DNET_CNTR_FAVAIL
] = "DNET_CNTR_FAVAIL",
77 [DNET_CNTR_FSID
] = "DNET_CNTR_FSID",
78 [DNET_CNTR_VM_ACTIVE
] = "DNET_CNTR_VM_ACTIVE",
79 [DNET_CNTR_VM_INACTIVE
] = "DNET_CNTR_VM_INACTIVE",
80 [DNET_CNTR_VM_TOTAL
] = "DNET_CNTR_VM_TOTAL",
81 [DNET_CNTR_VM_FREE
] = "DNET_CNTR_VM_FREE",
82 [DNET_CNTR_VM_CACHED
] = "DNET_CNTR_VM_CACHED",
83 [DNET_CNTR_VM_BUFFERS
] = "DNET_CNTR_VM_BUFFERS",
84 [DNET_CNTR_NODE_FILES
] = "DNET_CNTR_NODE_FILES",
85 [DNET_CNTR_NODE_LAST_MERGE
] = "DNET_CNTR_NODE_LAST_MERGE",
86 [DNET_CNTR_NODE_CHECK_COPY
] = "DNET_CNTR_NODE_CHECK_COPY",
87 [DNET_CNTR_DBR_NOREC
] = "DNET_CNTR_DBR_NOREC",
88 [DNET_CNTR_DBR_SYSTEM
] = "DNET_CNTR_DBR_SYSTEM",
89 [DNET_CNTR_DBR_ERROR
] = "DNET_CNTR_DBR_ERROR",
90 [DNET_CNTR_DBW_SYSTEM
] = "DNET_CNTR_DBW_SYSTEM",
91 [DNET_CNTR_DBW_ERROR
] = "DNET_CNTR_DBW_ERROR",
92 [DNET_CNTR_UNKNOWN
] = "UNKNOWN",
95 char *dnet_cmd_string(int cmd
)
97 if (cmd
<= 0 || cmd
>= __DNET_CMD_MAX
)
98 cmd
= DNET_CMD_UNKNOWN
;
100 return dnet_cmd_strings
[cmd
];
/*
 * Map a statistics slot index @cntr to a printable name.
 *
 * Visible logic:
 *   - indices outside (0, __DNET_CNTR_MAX) are folded to DNET_CNTR_UNKNOWN;
 *   - indices in [cmd_num, 2*cmd_num) are named after command
 *     (cntr - cmd_num) via dnet_cmd_string();
 *   - the final fallback indexes dnet_counter_strings directly with cntr.
 *
 * NOTE(review): in this excerpt the first "return dnet_cmd_string(cntr)" has
 * no visible guard (original line 108 is missing), which as shown would make
 * the remaining code unreachable. Presumably it is "if (cntr < cmd_num)";
 * confirm against the full source.
 * NOTE(review): dnet_counter_strings is indexed directly by cntr. This relies
 * on DNET_CNTR_* values being valid indices into that table; confirm in the
 * protocol header.
 */
103 char *dnet_counter_string(int cntr
, int cmd_num
)
105 if (cntr
<= 0 || cntr
>= __DNET_CNTR_MAX
)
106 cntr
= DNET_CNTR_UNKNOWN
;
109 return dnet_cmd_string(cntr
);
111 if (cntr
>= cmd_num
&& cntr
< (cmd_num
* 2))
112 return dnet_cmd_string(cntr
- cmd_num
);
114 return dnet_counter_strings
[cntr
];
/*
 * Register a remote node learned from a route list as a network state.
 *
 * Visible steps:
 *   - fill "raw" via dnet_setup_id() from @group_id and ids[0].id;
 *   - look up an existing state for a->addr with dnet_state_search_by_addr();
 *   - open a socket to a->addr using the attribute's sock_type, proto and
 *     family (dnet_socket_create_addr());
 *   - create the state with dnet_state_create(), passing @ids/@id_num,
 *     dnet_state_net_process as the processing callback, and a join mode that
 *     starts as DNET_WANT_RECONNECT.
 *
 * NOTE(review): this excerpt omits several original lines: local declarations,
 * handling of an already-existing state, socket/state error checks, the
 * statement guarded by DNET_CFG_JOIN_NETWORK (presumably changing "join") and
 * the return. The comments describe only the visible statements.
 */
117 static int dnet_add_received_state(struct dnet_node
*n
, struct dnet_addr_attr
*a
,
118 int group_id
, struct dnet_raw_id
*ids
, int id_num
)
121 struct dnet_net_state
*nst
;
125 dnet_setup_id(&raw
, group_id
, ids
[0].id
);
127 nst
= dnet_state_search_by_addr(n
, &a
->addr
);
134 s
= dnet_socket_create_addr(n
, a
->sock_type
, a
->proto
, a
->family
,
135 (struct sockaddr
*)&a
->addr
.addr
, a
->addr
.addr_len
, 0);
141 join
= DNET_WANT_RECONNECT
;
142 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
145 nst
= dnet_state_create(n
, group_id
, ids
, id_num
, &a
->addr
, s
, &err
, join
, dnet_state_net_process
);
149 dnet_log(n
, DNET_LOG_NOTICE
, "%d: added received state %s.\n",
150 group_id
, dnet_state_dump_addr(nst
));
160 static int dnet_process_addr_attr(struct dnet_net_state
*st
, struct dnet_addr_attr
*a
, int group_id
, int num
)
162 struct dnet_node
*n
= st
->n
;
163 struct dnet_raw_id
*ids
;
166 ids
= (struct dnet_raw_id
*)(a
+ 1);
167 for (i
=0; i
<num
; ++i
)
168 dnet_convert_raw_id(&ids
[0]);
170 err
= dnet_add_received_state(n
, a
, group_id
, ids
, num
);
171 dnet_log(n
, DNET_LOG_DEBUG
, "%s: route list: %d entries: %d.\n", dnet_server_convert_dnet_addr(&a
->addr
), num
, err
);
/*
 * Completion callback for a ROUTE_LIST request.
 *
 * On transaction teardown (is_trans_destroyed()) it wakes the waiter with
 * w->cond = 1. Otherwise, for a reply with data:
 *   - it checks the total size (header + body) against
 *     sizeof(struct dnet_addr_cmd);
 *   - it derives the number of raw IDs following the dnet_addr_attr;
 *   - it converts the attribute in place and passes it to
 *     dnet_process_addr_attr() for the command's group.
 *
 * NOTE(review): lines between the visible statements are missing from this
 * excerpt: declarations, the assignment of err, early returns, error logging
 * and the return.
 */
176 static int dnet_recv_route_list_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
178 struct dnet_wait
*w
= priv
;
179 struct dnet_addr_attr
*a
;
183 if (is_trans_destroyed(st
, cmd
)) {
189 dnet_wakeup(w
, w
->cond
= 1);
196 if (!cmd
->size
|| err
)
199 size
= cmd
->size
+ sizeof(struct dnet_cmd
);
200 if (size
< (signed)sizeof(struct dnet_addr_cmd
)) {
205 num
= (cmd
->size
- sizeof(struct dnet_addr_attr
)) / sizeof(struct dnet_raw_id
);
211 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
212 dnet_convert_addr_attr(a
);
214 err
= dnet_process_addr_attr(st
, a
, cmd
->id
.group_id
, num
);
/*
 * Ask state @st for its route list and wait for the reply.
 *
 * Steps:
 *   - allocate a waiter and a transaction carrying a bare dnet_cmd
 *     (DNET_CMD_ROUTE_LIST, flags NEED_ACK | DIRECT | NOLOCK);
 *   - install dnet_recv_route_list_complete as the completion handler;
 *   - assign a fresh transaction number from n->trans;
 *   - send with dnet_trans_send();
 *   - block in dnet_wait_event() until w->cond != 0 or n->wait_ts elapses.
 *
 * NOTE(review): t->cmd is copied from cmd after cmd->flags is set but before
 * cmd->cmd and cmd->trans are filled in, so the saved copy lacks those two
 * fields (t->command and t->trans are set separately).
 * NOTE(review): several original lines are not visible here: declarations,
 * the allocation-failure check, req.header/priv setup, and cleanup/return.
 */
220 int dnet_recv_route_list(struct dnet_net_state
*st
)
222 struct dnet_io_req req
;
223 struct dnet_node
*n
= st
->n
;
224 struct dnet_trans
*t
;
225 struct dnet_cmd
*cmd
;
229 w
= dnet_wait_alloc(0);
235 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
238 goto err_out_wait_put
;
241 t
->complete
= dnet_recv_route_list_complete
;
244 cmd
= (struct dnet_cmd
*)(t
+ 1);
246 cmd
->flags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
249 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
251 cmd
->cmd
= t
->command
= DNET_CMD_ROUTE_LIST
;
253 t
->st
= dnet_state_get(st
);
254 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
256 dnet_convert_cmd(cmd
);
258 dnet_log(n
, DNET_LOG_DEBUG
, "%s: list route request to %s.\n", dnet_dump_id(&cmd
->id
),
259 dnet_server_convert_dnet_addr(&st
->addr
));
261 memset(&req
, 0, sizeof(req
));
264 req
.hsize
= sizeof(struct dnet_cmd
);
267 err
= dnet_trans_send(t
, &req
);
269 goto err_out_destroy
;
271 err
= dnet_wait_event(w
, w
->cond
!= 0, &n
->wait_ts
);
/*
 * Perform the reverse-lookup handshake on socket @s and create a network
 * state for @addr from the reply.
 *
 * Visible steps:
 *   - send a DNET_CMD_REVERSE_LOOKUP header (DIRECT | NOLOCK) through a
 *     zeroed temporary state whose read/write sockets are @s;
 *   - receive a dnet_addr_cmd reply;
 *   - receive (cmd->size - sizeof(struct dnet_addr_attr)) bytes of raw IDs
 *     and byte-convert each one;
 *   - call dnet_state_create() for the reply's group with those IDs, @addr,
 *     @s, @join and dnet_state_net_process.
 *
 * NOTE(review): "st" is memset before any visible assignment. Presumably it
 * points at the local "dummy" (original line 301 is missing); confirm.
 * NOTE(review): also not visible in this excerpt: the allocation of "ids",
 * how @errp is filled, and the error/cleanup paths.
 */
284 static struct dnet_net_state
*dnet_add_state_socket(struct dnet_node
*n
, struct dnet_addr
*addr
, int s
, int *errp
, int join
)
286 struct dnet_net_state
*st
, dummy
;
287 char buf
[sizeof(struct dnet_addr_cmd
)];
288 struct dnet_cmd
*cmd
;
289 int err
, num
, i
, size
;
290 struct dnet_raw_id
*ids
;
292 memset(buf
, 0, sizeof(buf
));
294 cmd
= (struct dnet_cmd
*)(buf
);
296 cmd
->flags
= DNET_FLAGS_DIRECT
| DNET_FLAGS_NOLOCK
;
297 cmd
->cmd
= DNET_CMD_REVERSE_LOOKUP
;
299 dnet_convert_cmd(cmd
);
302 memset(st
, 0, sizeof(struct dnet_net_state
));
304 st
->write_s
= st
->read_s
= s
;
307 err
= dnet_send_nolock(st
, buf
, sizeof(struct dnet_cmd
));
309 dnet_log(n
, DNET_LOG_ERROR
, "Failed to send reverse "
310 "lookup message to %s, err: %d.\n",
311 dnet_server_convert_dnet_addr(addr
), err
);
315 err
= dnet_recv(st
, buf
, sizeof(buf
));
317 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
318 "lookup headers from %s, err: %d.\n",
319 dnet_server_convert_dnet_addr(addr
), err
);
323 cmd
= (struct dnet_cmd
*)(buf
);
325 dnet_convert_addr_cmd((struct dnet_addr_cmd
*)buf
);
327 size
= cmd
->size
- sizeof(struct dnet_addr_attr
);
328 num
= size
/ sizeof(struct dnet_raw_id
);
330 dnet_log(n
, DNET_LOG_DEBUG
, "%s: waiting for %d ids\n", dnet_dump_id(&cmd
->id
), num
);
338 err
= dnet_recv(st
, ids
, size
);
340 dnet_log(n
, DNET_LOG_ERROR
, "Failed to receive reverse "
341 "lookup body (%llu bytes) from %s, err: %d.\n",
342 (unsigned long long)cmd
->size
,
343 dnet_server_convert_dnet_addr(addr
), err
);
347 for (i
=0; i
<num
; ++i
)
348 dnet_convert_raw_id(&ids
[i
]);
350 st
= dnet_state_create(n
, cmd
->id
.group_id
, ids
, num
, addr
, s
, &err
, join
, dnet_state_net_process
);
352 /* socket is already closed */
/*
 * Connect to the node described by @cfg and add it as a network state.
 *
 * Visible steps:
 *   - create a socket with dnet_socket_create();
 *   - run the reverse-lookup handshake via dnet_add_state_socket(), which
 *     closes the socket itself on error;
 *   - unless DNET_CFG_NO_ROUTE_LIST is set in cfg->flags, request the remote
 *     route list with dnet_recv_route_list().
 * On the reconnect path, if err is -EADDRINUSE, -ECONNREFUSED, -ECONNRESET,
 * -EINPROGRESS or -EAGAIN, the address is queued with
 * dnet_add_reconnect_state().
 *
 * NOTE(review): not visible in this excerpt: the error checks around the
 * gotos, the statement guarded by DNET_CFG_JOIN_NETWORK, and the return paths.
 */
369 int dnet_add_state(struct dnet_node
*n
, struct dnet_config
*cfg
)
371 int s
, err
, join
= DNET_WANT_RECONNECT
;
372 struct dnet_addr addr
;
373 struct dnet_net_state
*st
;
375 memset(&addr
, 0, sizeof(addr
));
377 addr
.addr_len
= sizeof(addr
.addr
);
378 s
= dnet_socket_create(n
, cfg
, &addr
, 0);
381 goto err_out_reconnect
;
384 if (n
->flags
& DNET_CFG_JOIN_NETWORK
)
387 /* will close socket on error */
388 st
= dnet_add_state_socket(n
, &addr
, s
, &err
, join
);
390 goto err_out_reconnect
;
392 if (!(cfg
->flags
& DNET_CFG_NO_ROUTE_LIST
))
393 dnet_recv_route_list(st
);
398 /* if state is already exist, it should not be an error */
402 if ((err
== -EADDRINUSE
) || (err
== -ECONNREFUSED
) || (err
== -ECONNRESET
) ||
403 (err
== -EINPROGRESS
) || (err
== -EAGAIN
))
404 dnet_add_reconnect_state(n
, &addr
, join
);
/*
 * Per-write bookkeeping shared by the transactions of one file write.
 * Visible member: the waiter the issuing thread blocks on. The members used
 * as wc->size and wc->reply by the write completion are on lines not visible
 * in this excerpt.
 */
408 struct dnet_write_completion
{
411 struct dnet_wait
*wait
;
/*
 * Drop one reference on wc->wait; the last reference destroys the waiter.
 * NOTE(review): the rest of this branch (presumably releasing wc->reply and wc
 * itself) is not visible in this excerpt.
 */
414 static void dnet_write_complete_free(struct dnet_write_completion
*wc
)
416 if (atomic_dec_and_test(&wc
->wait
->refcnt
)) {
417 dnet_wait_destroy(wc
->wait
);
/*
 * Completion callback for each write transaction of a file write.
 *
 * On teardown it increments w->cond, wakes the waiter and drops a reference
 * through dnet_write_complete_free().
 *
 * For a successful reply larger than dnet_addr_attr + dnet_file_info, it
 * grows wc->reply and appends three records:
 *   [struct dnet_addr of st][struct dnet_cmd header][cmd->size payload bytes]
 * w->wait_lock is then taken; the statements it protects are not visible
 * here.
 *
 * NOTE(review): wc->reply is assigned realloc()'s result directly, so a
 * failed realloc() loses the previous buffer. The check that follows
 * (original lines 442-446) is missing from this excerpt; confirm it handles
 * NULL without leaking.
 * NOTE(review): where "err" and "data" are declared/assigned is not visible.
 */
423 static int dnet_write_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
426 struct dnet_write_completion
*wc
= priv
;
427 struct dnet_wait
*w
= wc
->wait
;
429 if (is_trans_destroyed(st
, cmd
)) {
430 dnet_wakeup(w
, w
->cond
++);
431 dnet_write_complete_free(wc
);
436 if (!err
&& st
&& (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
))) {
437 int old_size
= wc
->size
;
440 wc
->size
+= cmd
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_addr
);
441 wc
->reply
= realloc(wc
->reply
, wc
->size
);
447 data
= wc
->reply
+ old_size
;
449 memcpy(data
, &st
->addr
, sizeof(struct dnet_addr
));
450 memcpy(data
+ sizeof(struct dnet_addr
), cmd
, sizeof(struct dnet_cmd
));
451 memcpy(data
+ sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
), cmd
+ 1, cmd
->size
);
455 pthread_mutex_lock(&w
->wait_lock
);
458 pthread_mutex_unlock(&w
->wait_lock
);
/*
 * Build and send one I/O transaction described by @ctl.
 *
 * Allocation:
 *   - the transaction has room for dnet_cmd + dnet_io_attr (tsize);
 *   - the READ and "small inline write" (ctl->fd < 0 && size <
 *     DNET_COPY_IO_SIZE) cases adjust the sizes on lines not visible here.
 *
 * Header setup:
 *   - the command header is filled from @ctl: id, cflags, cmd, and
 *     size = sizeof(dnet_io_attr) + io size;
 *   - ctl->io is copied after it;
 *   - the destination comes from dnet_state_get_first();
 *   - a fresh transaction number is assigned from n->trans.
 *
 * Sending:
 *   - the request is logged, the headers are byte-converted, and it is sent
 *     with dnet_trans_send();
 *   - small inline writes copy ctl->data into the transaction;
 *   - requests of at least DNET_COPY_IO_SIZE bytes pass ctl->data by pointer;
 *   - req.local_offset is taken from ctl->local_offset.
 *
 * On the err_out_complete path ctl->complete(NULL, NULL, ctl->priv) is
 * invoked, so the caller's completion still runs once.
 *
 * NOTE(review): not visible in this excerpt: how @errp is filled, where
 * "data" is declared, and the branch conditions around req setup.
 */
463 static struct dnet_trans
*dnet_io_trans_create(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int *errp
)
465 struct dnet_io_req req
;
466 struct dnet_trans
*t
= NULL
;
467 struct dnet_io_attr
*io
;
468 struct dnet_cmd
*cmd
;
469 uint64_t size
= ctl
->io
.size
;
470 uint64_t tsize
= sizeof(struct dnet_io_attr
) + sizeof(struct dnet_cmd
);
473 if (ctl
->cmd
== DNET_CMD_READ
)
476 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
)
479 t
= dnet_trans_alloc(n
, tsize
);
482 goto err_out_complete
;
484 t
->complete
= ctl
->complete
;
487 cmd
= (struct dnet_cmd
*)(t
+ 1);
488 io
= (struct dnet_io_attr
*)(cmd
+ 1);
490 if (ctl
->fd
< 0 && size
< DNET_COPY_IO_SIZE
) {
493 memcpy(data
, ctl
->data
, size
);
497 memcpy(&cmd
->id
, &ctl
->id
, sizeof(struct dnet_id
));
498 cmd
->size
= sizeof(struct dnet_io_attr
) + size
;
499 cmd
->flags
= ctl
->cflags
;
502 cmd
->cmd
= t
->command
= ctl
->cmd
;
504 memcpy(io
, &ctl
->io
, sizeof(struct dnet_io_attr
));
505 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
507 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
510 goto err_out_destroy
;
513 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
515 dnet_log(n
, DNET_LOG_INFO
, "%s: created trans: %llu, cmd: %s, cflags: %llx, size: %llu, offset: %llu, "
516 "fd: %d, local_offset: %llu -> %s weight: %f, mrt: %ld.\n",
517 dnet_dump_id(&ctl
->id
),
518 (unsigned long long)t
->trans
,
519 dnet_cmd_string(ctl
->cmd
), (unsigned long long)cmd
->flags
,
520 (unsigned long long)ctl
->io
.size
, (unsigned long long)ctl
->io
.offset
,
522 (unsigned long long)ctl
->local_offset
,
523 dnet_server_convert_dnet_addr(&t
->st
->addr
), t
->st
->weight
, t
->st
->median_read_time
);
525 dnet_convert_cmd(cmd
);
526 dnet_convert_io_attr(io
);
529 memset(&req
, 0, sizeof(req
));
537 req
.local_offset
= ctl
->local_offset
;
539 } else if (size
>= DNET_COPY_IO_SIZE
) {
540 req
.data
= (void *)ctl
->data
;
544 err
= dnet_trans_send(t
, &req
);
546 goto err_out_destroy
;
552 ctl
->complete(NULL
, NULL
, ctl
->priv
);
/*
 * Send the I/O described by @ctl to every group listed in n->groups.
 * For each group it overwrites ctl->id.group_id and calls
 * dnet_io_trans_create(); n->group_lock is held across the loop.
 *
 * NOTE(review): a second dnet_io_trans_create() call follows the loop. Its
 * guarding condition (presumably "no groups configured") and the
 * counting/return logic are not visible in this excerpt.
 */
562 int dnet_trans_create_send_all(struct dnet_node
*n
, struct dnet_io_control
*ctl
)
566 pthread_mutex_lock(&n
->group_lock
);
567 for (i
=0; i
<n
->group_num
; ++i
) {
568 ctl
->id
.group_id
= n
->groups
[i
];
570 dnet_io_trans_create(n
, ctl
, &err
);
573 pthread_mutex_unlock(&n
->group_lock
);
576 dnet_io_trans_create(n
, ctl
, &err
);
/*
 * dnet_write_object - send the write described by @ctl.
 *
 * Thin wrapper: the work (one transaction per configured group) is done by
 * dnet_trans_create_send_all(), whose result is returned unchanged.
 */
int dnet_write_object(struct dnet_node *n, struct dnet_io_control *ctl)
{
	int sent;

	sent = dnet_trans_create_send_all(n, ctl);
	return sent;
}
/*
 * Write part of local @file to object @id in every configured group and wait
 * for all replies.
 *
 * Steps:
 *   - open and fstat() the file;
 *   - reject local_offset at or beyond EOF;
 *   - when size is 0 or would reach EOF, clamp it to the rest of the file;
 *   - issue DNET_CMD_WRITE with cflags NEED_ACK | @cflags, io.flags =
 *     @ioflags, io.offset = @remote_offset, and completion
 *     dnet_write_complete, via dnet_write_object();
 *   - wait until w->cond equals the number of transactions sent.
 *
 * Reference counting: w->refcnt is set to INT_MAX before sending, so
 * completions cannot destroy the waiter while the transaction count is still
 * unknown. Afterwards it is reduced by (INT_MAX - trans_num - 1), leaving one
 * reference per transaction plus the one taken at allocation time.
 *
 * NOTE(review): "size + local_offset" is uint64_t arithmetic and can wrap
 * for very large @size; size is then not clamped. Confirm callers bound it.
 * NOTE(review): declarations, error gotos and cleanup lines are missing
 * from this excerpt. The line starting "667 *" is a fragment of a
 * multi-line comment whose delimiters are not visible.
 */
588 static int dnet_write_file_id_raw(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
,
589 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
590 uint64_t cflags
, unsigned int ioflags
)
592 int fd
, err
, trans_num
;
595 struct dnet_io_control ctl
;
596 struct dnet_write_completion
*wc
;
598 wc
= malloc(sizeof(struct dnet_write_completion
));
603 memset(wc
, 0, sizeof(struct dnet_write_completion
));
605 w
= dnet_wait_alloc(0);
609 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting structure.\n");
615 fd
= open(file
, O_RDONLY
| O_LARGEFILE
| O_CLOEXEC
);
618 dnet_log_err(n
, "Failed to open to be written file '%s'", file
);
622 err
= fstat(fd
, &stat
);
625 dnet_log_err(n
, "Failed to stat to be written file '%s'", file
);
629 if (local_offset
>= (uint64_t)stat
.st_size
) {
634 if (!size
|| size
+ local_offset
>= (uint64_t)stat
.st_size
)
635 size
= stat
.st_size
- local_offset
;
637 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
639 atomic_set(&w
->refcnt
, INT_MAX
);
643 ctl
.local_offset
= local_offset
;
646 ctl
.complete
= dnet_write_complete
;
649 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
650 ctl
.cmd
= DNET_CMD_WRITE
;
652 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
653 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
655 ctl
.io
.flags
= ioflags
;
657 ctl
.io
.offset
= remote_offset
;
658 ctl
.io
.type
= id
->type
;
660 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
662 trans_num
= dnet_write_object(n
, &ctl
);
667 * 1 - the first reference counter we grabbed at allocation time
669 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
671 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
672 if (err
|| w
->status
) {
677 if (!err
&& !trans_num
)
681 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write file '%s' into the storage, transactions: %d, err: %d.\n", file
, trans_num
, err
);
685 dnet_log(n
, DNET_LOG_NOTICE
, "Successfully wrote file: '%s' into the storage, size: %llu.\n",
686 file
, (unsigned long long)size
);
689 dnet_write_complete_free(wc
);
696 dnet_write_complete_free(wc
);
/*
 * Write local @file (from @local_offset, @size bytes) to object @id at
 * @remote_offset. If the write succeeds and DNET_IO_FLAGS_CACHE_ONLY is not
 * set in @ioflags, write metadata is created afterwards with
 * dnet_create_write_metadata_strings() (no remote name).
 * NOTE(review): the return statement is not visible in this excerpt.
 */
701 int dnet_write_file_id(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
, uint64_t local_offset
,
702 uint64_t remote_offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
704 int err
= dnet_write_file_id_raw(n
, file
, id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
705 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
706 err
= dnet_create_write_metadata_strings(n
, NULL
, 0, id
, NULL
, cflags
);
/*
 * Write local @file to the object named by @remote (@remote_len bytes).
 * The object id is derived with dnet_transform(). On success, and unless
 * DNET_IO_FLAGS_CACHE_ONLY is set, metadata is created with the remote name.
 * NOTE(review): the declaration of "id", how @type is applied, and the return
 * are on lines not visible in this excerpt.
 */
711 int dnet_write_file(struct dnet_node
*n
, const char *file
, const void *remote
, int remote_len
,
712 uint64_t local_offset
, uint64_t remote_offset
, uint64_t size
,
713 uint64_t cflags
, unsigned int ioflags
, int type
)
718 dnet_transform(n
, remote
, remote_len
, &id
);
721 err
= dnet_write_file_id_raw(n
, file
, &id
, local_offset
, remote_offset
, size
, cflags
, ioflags
);
722 if (!err
&& !(ioflags
& DNET_IO_FLAGS_CACHE_ONLY
))
723 err
= dnet_create_write_metadata_strings(n
, remote
, remote_len
, &id
, NULL
, cflags
);
/*
 * Completion callback for file reads: stores received data into c->file at
 * c->offset.
 *
 * On teardown it wakes the waiter with c->wait->cond = err and drops its
 * reference with dnet_wait_put().
 *
 * A reply is not written when:
 *   - its status is non-zero;
 *   - its body is empty;
 *   - its body is not larger than dnet_io_attr (that last case is logged).
 *
 * Otherwise:
 *   - the io attribute is converted;
 *   - the file is opened O_RDWR | O_CREAT (mode 0644);
 *   - io->size bytes are pwrite()n at c->offset;
 *   - the waiter is woken with cond = err ? err : 1.
 *
 * NOTE(review): the error log prints io->offset while the success log prints
 * c->offset (the file offset actually written to); this looks inconsistent.
 * NOTE(review): declarations (n, fd, data, err) and the close of fd are not
 * visible in this excerpt.
 */
728 static int dnet_read_file_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
732 struct dnet_io_completion
*c
= priv
;
733 struct dnet_io_attr
*io
;
736 if (is_trans_destroyed(st
, cmd
)) {
739 if (cmd
&& cmd
->status
)
742 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
);
743 dnet_wait_put(c
->wait
);
752 if (cmd
->status
!= 0 || cmd
->size
== 0) {
754 goto err_out_exit_no_log
;
757 if (cmd
->size
<= sizeof(struct dnet_io_attr
)) {
758 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completion error: wrong size: cmd_size: %llu, must be more than %zu.\n",
759 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
760 sizeof(struct dnet_io_attr
));
762 goto err_out_exit_no_log
;
765 io
= (struct dnet_io_attr
*)(cmd
+ 1);
768 dnet_convert_io_attr(io
);
770 fd
= open(c
->file
, O_RDWR
| O_CREAT
| O_CLOEXEC
, 0644);
773 dnet_log_err(n
, "%s: failed to open read completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
777 err
= pwrite(fd
, data
, io
->size
, c
->offset
);
780 dnet_log_err(n
, "%s: failed to write data into completion file '%s'", dnet_dump_id(&cmd
->id
), c
->file
);
785 dnet_log(n
, DNET_LOG_NOTICE
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d.\n",
786 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)c
->offset
,
787 (unsigned long long)io
->size
, cmd
->status
);
794 dnet_log(n
, DNET_LOG_ERROR
, "%s: read completed: file: '%s', offset: %llu, size: %llu, status: %d, err: %d.\n",
795 dnet_dump_id(&cmd
->id
), c
->file
, (unsigned long long)io
->offset
,
796 (unsigned long long)io
->size
, cmd
->status
, err
);
798 dnet_wakeup(c
->wait
, c
->wait
->cond
= err
? err
: 1);
/*
 * Create and send a single read transaction for @ctl via
 * dnet_io_trans_create().
 * NOTE(review): the handling of a NULL result and the return value are on
 * lines not visible in this excerpt.
 */
802 int dnet_read_object(struct dnet_node
*n
, struct dnet_io_control
*ctl
)
806 if (!dnet_io_trans_create(n
, ctl
, &err
))
/*
 * Issue one DNET_CMD_READ (NEED_ACK) for @id covering @io_size bytes at
 * @io_offset, then wait on @w until w->cond differs from its initial value
 * (~0).
 *
 * The reply is stored by dnet_read_file_complete() into @file at
 * @write_offset. The completion structure:
 *   - is allocated with room for the file name (@len + 1) plus
 *     DNET_HISTORY_SUFFIX;
 *   - stores a copy of @file;
 *   - holds a reference on @w (dnet_wait_get()).
 * A negative wait result or a negative w->cond is logged as a failure.
 *
 * NOTE(review): this excerpt ends mid-statement (the failure log's remaining
 * arguments and the rest of the function are on missing lines).
 */
812 static int dnet_read_file_raw_exec(struct dnet_node
*n
, const char *file
, unsigned int len
,
813 uint64_t write_offset
, uint64_t io_offset
, uint64_t io_size
,
814 struct dnet_id
*id
, struct dnet_wait
*w
)
816 struct dnet_io_control ctl
;
817 struct dnet_io_completion
*c
;
818 int err
, wait_init
= ~0;
820 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
822 ctl
.io
.size
= io_size
;
823 ctl
.io
.offset
= io_offset
;
825 ctl
.io
.type
= id
->type
;
827 memcpy(ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
828 memcpy(ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
830 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
833 ctl
.complete
= dnet_read_file_complete
;
834 ctl
.cmd
= DNET_CMD_READ
;
835 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
837 c
= malloc(sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
839 dnet_log(n
, DNET_LOG_ERROR
, "%s: failed to allocate IO completion structure "
840 "for '%s' file reading.\n",
841 dnet_dump_id(&ctl
.id
), file
);
846 memset(c
, 0, sizeof(struct dnet_io_completion
) + len
+ 1 + sizeof(DNET_HISTORY_SUFFIX
));
848 c
->wait
= dnet_wait_get(w
);
849 c
->offset
= write_offset
;
850 c
->file
= (char *)(c
+ 1);
852 sprintf(c
->file
, "%s", file
);
857 err
= dnet_read_object(n
, &ctl
);
861 err
= dnet_wait_event(w
, w
->cond
!= wait_init
, &n
->wait_ts
);
862 if ((err
< 0) || (w
->cond
< 0)) {
863 char id_str
[2*DNET_ID_SIZE
+ 1];
866 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s '%s' : failed to read data: %d\n",
867 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
),
/*
 * Read object @id (@size bytes from @offset) into local @file. The waiter
 * starts with cond = ~0, and the read is attempted for each of the @num
 * groups returned by dnet_mix_states() via dnet_read_file_raw_exec().
 * NOTE(review): per-group id setup, the stop-on-success logic (presumably a
 * break), cleanup and the return are not visible in this excerpt.
 */
878 static int dnet_read_file_raw(struct dnet_node
*n
, const char *file
, struct dnet_id
*id
, uint64_t offset
, uint64_t size
)
880 int err
= -ENOENT
, len
= strlen(file
), i
;
884 w
= dnet_wait_alloc(~0);
887 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate read waiting.\n");
894 num
= dnet_mix_states(n
, id
, &g
);
900 for (i
=0; i
<num
; ++i
) {
903 err
= dnet_read_file_raw_exec(n
, file
, len
, 0, offset
, size
, id
, w
);
/*
 * dnet_read_file_id - read object @id (@size bytes starting at @offset) into
 * local file @file.
 *
 * Thin wrapper that returns dnet_read_file_raw()'s result unchanged.
 */
int dnet_read_file_id(struct dnet_node *n, const char *file, struct dnet_id *id, uint64_t offset, uint64_t size)
{
	int ret;

	ret = dnet_read_file_raw(n, file, id, offset, size);
	return ret;
}
/*
 * Read the object named by @remote (@remote_size bytes, hashed with
 * dnet_transform()) into local @file, starting at @offset for @size bytes.
 * NOTE(review): the declaration of "id" and how @type is applied are on lines
 * not visible in this excerpt.
 */
922 int dnet_read_file(struct dnet_node
*n
, const char *file
, const void *remote
, int remote_size
,
923 uint64_t offset
, uint64_t size
, int type
)
927 dnet_transform(n
, remote
, remote_size
, &id
);
930 return dnet_read_file_raw(n
, file
, &id
, offset
, size
);
/*
 * Allocate a waiter:
 *   - zero it;
 *   - initialise its condition variable and mutex;
 *   - set its reference count to 1.
 * NOTE(review): the @cond argument presumably seeds w->cond, but that
 * assignment is not visible. The error paths are only partially visible
 * (pthread_mutex_destroy() appears on a cleanup line), and the return
 * statements are not visible in this excerpt.
 */
933 struct dnet_wait
*dnet_wait_alloc(int cond
)
938 w
= malloc(sizeof(struct dnet_wait
));
944 memset(w
, 0, sizeof(struct dnet_wait
));
946 err
= pthread_cond_init(&w
->wait
, NULL
);
950 err
= pthread_mutex_init(&w
->wait_lock
, NULL
);
952 goto err_out_destroy
;
955 atomic_init(&w
->refcnt
, 1);
960 pthread_mutex_destroy(&w
->wait_lock
);
/*
 * Tear down a waiter: destroys its mutex and condition variable.
 * NOTE(review): releasing @w itself is not visible in this excerpt.
 */
965 void dnet_wait_destroy(struct dnet_wait
*w
)
967 pthread_mutex_destroy(&w
->wait_lock
);
968 pthread_cond_destroy(&w
->wait
);
/*
 * Completion callback for exec commands.
 *
 * On teardown it increments w->cond and wakes the waiter. Otherwise it
 * records cmd->status in w->status and appends the reply payload
 * (cmd->size bytes after the header) to the w->ret buffer, advancing
 * w->size.
 *
 * NOTE(review): w->ret is assigned realloc()'s result directly, so a failed
 * realloc() loses the previous buffer. The check on the following (missing)
 * lines should be confirmed.
 */
973 static int dnet_send_cmd_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
975 struct dnet_wait
*w
= priv
;
977 if (is_trans_destroyed(st
, cmd
)) {
978 dnet_wakeup(w
, w
->cond
++);
983 w
->status
= cmd
->status
;
987 void *data
= cmd
+ 1;
989 w
->ret
= realloc(w
->ret
, w
->size
+ cmd
->size
);
994 memcpy(w
->ret
+ w
->size
, data
, cmd
->size
);
995 w
->size
+= cmd
->size
;
/*
 * Send one DNET_CMD_EXEC transaction with sph event @e directly to state @st.
 *
 * The transaction:
 *   - is addressed with st's group id and first raw id;
 *   - uses dnet_send_cmd_complete as completion and NEED_ACK | @cflags as
 *     flags;
 *   - has size sizeof(struct sph) + event_size + data_size + binary_size,
 *     computed before @e is byte-converted in place.
 *
 * NOTE(review): how ctl.data and ctl.priv are set is on lines not visible in
 * this excerpt.
 */
1002 static int dnet_send_cmd_single(struct dnet_net_state
*st
, struct dnet_wait
*w
, struct sph
*e
, uint64_t cflags
)
1004 struct dnet_trans_control ctl
;
1006 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1008 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1009 ctl
.size
= sizeof(struct sph
) + e
->event_size
+ e
->data_size
+ e
->binary_size
;
1010 ctl
.cmd
= DNET_CMD_EXEC
;
1011 ctl
.complete
= dnet_send_cmd_complete
;
1013 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1015 dnet_convert_sph(e
);
1019 return dnet_trans_alloc_send_state(st
, &ctl
);
/*
 * Send exec event @e and wait for the replies. Routing:
 *   - @id with non-zero group_id: the first state for @id;
 *   - @id with group_id == 0: the state responsible for @id in every group
 *     (id->group_id is overwritten while iterating);
 *   - otherwise (presumably no @id): every state of every group.
 * Sends in the last two cases happen under n->state_lock. It then waits until
 * w->cond equals num.
 *
 * NOTE(review): the increments of num, the NULL-state checks, the restore of
 * id->group_id (if any) and the handling of @ret are not visible in this
 * excerpt.
 */
1022 static int dnet_send_cmd_raw(struct dnet_node
*n
, struct dnet_id
*id
,
1023 struct sph
*e
, void **ret
, uint64_t cflags
)
1025 struct dnet_net_state
*st
;
1026 int err
= -ENOENT
, num
= 0;
1027 struct dnet_wait
*w
;
1028 struct dnet_group
*g
;
1030 w
= dnet_wait_alloc(0);
1036 if (id
&& id
->group_id
!= 0) {
1038 st
= dnet_state_get_first(n
, id
);
1041 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1044 } else if (id
&& id
->group_id
== 0) {
1045 pthread_mutex_lock(&n
->state_lock
);
1046 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1049 id
->group_id
= g
->group_id
;
1051 st
= dnet_state_search_nolock(n
, id
);
1054 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1060 pthread_mutex_unlock(&n
->state_lock
);
1062 pthread_mutex_lock(&n
->state_lock
);
1063 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1064 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1070 err
= dnet_send_cmd_single(st
, w
, e
, cflags
);
1074 pthread_mutex_unlock(&n
->state_lock
);
1077 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
/*
 * dnet_send_cmd - send exec event @e via dnet_send_cmd_raw() without any extra
 * command flags. Returns dnet_send_cmd_raw()'s result.
 */
int dnet_send_cmd(struct dnet_node *n, struct dnet_id *id, struct sph *e, void **ret)
{
	const uint64_t no_extra_flags = 0;

	return dnet_send_cmd_raw(n, id, e, ret, no_extra_flags);
}
1103 int dnet_send_cmd_nolock(struct dnet_node
*n
, struct dnet_id
*id
, struct sph
*e
, void **ret
)
1105 return dnet_send_cmd_raw(n
, id
, e
, ret
, DNET_FLAGS_NOLOCK
);
/*
 * Retry the connections queued on n->reconnect_list.
 *
 * Steps:
 *   - move the pending entries to a local list while holding
 *     n->reconnect_lock, so connection attempts run without the lock;
 *   - for each entry, create a socket and attempt dnet_add_state_socket()
 *     (join mode starts as DNET_WANT_RECONNECT);
 *   - entries whose error is not -EEXIST or -EINVAL appear to be re-queued
 *     with their original join state via dnet_add_reconnect_state();
 *   - each processed entry is unlinked from the local list.
 *
 * NOTE(review): the early return for an empty list, the socket-failure path,
 * the DNET_JOIN branch body and the freeing of "ast" are not visible in this
 * excerpt.
 */
1108 int dnet_try_reconnect(struct dnet_node
*n
)
1110 struct dnet_addr_storage
*ast
, *tmp
;
1111 struct dnet_net_state
*st
;
1115 if (list_empty(&n
->reconnect_list
))
1118 pthread_mutex_lock(&n
->reconnect_lock
);
1119 list_for_each_entry_safe(ast
, tmp
, &n
->reconnect_list
, reconnect_entry
) {
1120 list_move(&ast
->reconnect_entry
, &list
);
1122 pthread_mutex_unlock(&n
->reconnect_lock
);
1124 list_for_each_entry_safe(ast
, tmp
, &list
, reconnect_entry
) {
1125 s
= dnet_socket_create_addr(n
, n
->sock_type
, n
->proto
, n
->family
,
1126 (struct sockaddr
*)ast
->addr
.addr
, ast
->addr
.addr_len
, 0);
1130 join
= DNET_WANT_RECONNECT
;
1131 if (ast
->__join_state
== DNET_JOIN
)
1134 st
= dnet_add_state_socket(n
, &ast
->addr
, s
, &err
, join
);
1140 if (err
== -EEXIST
|| err
== -EINVAL
)
1144 dnet_add_reconnect_state(n
, &ast
->addr
, ast
->__join_state
);
1146 list_del(&ast
->reconnect_entry
);
/*
 * Send a DNET_CMD_LOOKUP for @id (flags @cflags | NEED_ACK) to the first state
 * responsible for it.
 *
 * @complete is installed as the transaction completion. On the
 * err_out_complete path it is invoked as complete(NULL, NULL, priv), so the
 * caller still receives its callback.
 *
 * NOTE(review): t->cmd is copied from cmd before cmd->cmd, cmd->flags and
 * cmd->trans are filled in.
 * NOTE(review): the trailing "void *priv" parameter line, error checks and
 * the return are not visible in this excerpt.
 */
1153 int dnet_lookup_object(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
,
1154 int (* complete
)(struct dnet_net_state
*, struct dnet_cmd
*, void *),
1157 struct dnet_io_req req
;
1158 struct dnet_trans
*t
;
1159 struct dnet_cmd
*cmd
;
1162 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
));
1165 goto err_out_complete
;
1167 t
->complete
= complete
;
1170 cmd
= (struct dnet_cmd
*)(t
+ 1);
1172 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
1174 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
1176 cmd
->cmd
= t
->command
= DNET_CMD_LOOKUP
;
1177 cmd
->flags
= cflags
| DNET_FLAGS_NEED_ACK
;
1179 t
->st
= dnet_state_get_first(n
, &cmd
->id
);
1182 goto err_out_destroy
;
1185 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
1186 dnet_convert_cmd(cmd
);
1188 dnet_log(n
, DNET_LOG_NOTICE
, "%s: lookup to %s.\n", dnet_dump_id(id
), dnet_server_convert_dnet_addr(&t
->st
->addr
));
1190 memset(&req
, 0, sizeof(req
));
1193 req
.hsize
= sizeof(struct dnet_cmd
);
1195 err
= dnet_trans_send(t
, &req
);
1197 goto err_out_destroy
;
1203 complete(NULL
, NULL
, priv
);
/*
 * Completion callback for lookups.
 *
 * On teardown it increments w->cond and wakes the waiter. For a reply that
 * carries at least a dnet_addr_attr:
 *   - it converts the attribute and formats its address;
 *   - if a dnet_file_info follows, it logs offset, size, mode and path;
 *     otherwise it logs just the address;
 *   - it then looks up a state for that address (dropping the reference
 *     obtained) and calls dnet_recv_route_list(st). The conditions around
 *     these calls are not visible.
 *
 * NOTE(review): n is initialised to NULL and passed to dnet_log_raw()/
 * dnet_state_search_by_addr(); its assignment is not visible in this
 * excerpt. Confirm it is set (presumably from st->n) before use.
 */
1211 int dnet_lookup_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1213 struct dnet_wait
*w
= priv
;
1214 struct dnet_node
*n
= NULL
;
1215 struct dnet_addr_attr
*a
;
1216 struct dnet_net_state
*other
;
1217 char addr_str
[128] = "no-address";
1220 if (is_trans_destroyed(st
, cmd
)) {
1221 dnet_wakeup(w
, w
->cond
++);
1228 if (err
|| !cmd
->size
)
1231 if (cmd
->size
< sizeof(struct dnet_addr_attr
)) {
1232 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
1233 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
, sizeof(struct dnet_addr_attr
));
1238 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
1240 dnet_convert_addr_attr(a
);
1241 dnet_server_convert_dnet_addr_raw(&a
->addr
, addr_str
, sizeof(addr_str
));
1243 if (cmd
->size
> sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
1244 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
1246 dnet_convert_file_info(info
);
1248 dnet_log_raw(n
, DNET_LOG_NOTICE
, "%s: lookup object: %s: "
1249 "offset: %llu, size: %llu, mode: %llo, path: %s\n",
1250 dnet_dump_id(&cmd
->id
), addr_str
,
1251 (unsigned long long)info
->offset
, (unsigned long long)info
->size
,
1252 (unsigned long long)info
->mode
, (char *)(info
+ 1));
1254 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: lookup object: %s\n",
1255 dnet_dump_id(&cmd
->id
), addr_str
);
1259 other
= dnet_state_search_by_addr(n
, &a
->addr
);
1261 dnet_state_put(other
);
1263 dnet_recv_route_list(st
);
1270 dnet_log(n
, DNET_LOG_ERROR
, "%s: lookup completion status: %d, err: %d.\n", dnet_dump_id(&cmd
->id
), cmd
->status
, err
);
/*
 * Look up @file in every configured group while holding n->group_lock.
 * For each group, the hashed id gets that group, a lookup is sent with
 * dnet_lookup_complete (taking a waiter reference), and the function waits
 * for w->cond == 1.
 *
 * NOTE(review): the wait condition is w->cond == 1 on every iteration. A reset
 * of w->cond between groups, the error accumulation into "error" and the
 * return are not visible in this excerpt; confirm.
 */
1275 int dnet_lookup(struct dnet_node
*n
, const char *file
)
1277 int err
, error
= 0, i
;
1278 struct dnet_wait
*w
;
1281 w
= dnet_wait_alloc(0);
1287 dnet_transform(n
, file
, strlen(file
), &raw
);
1289 pthread_mutex_lock(&n
->group_lock
);
1290 for (i
=0; i
<n
->group_num
; ++i
) {
1291 raw
.group_id
= n
->groups
[i
];
1293 err
= dnet_lookup_object(n
, &raw
, 0, dnet_lookup_complete
, dnet_wait_get(w
));
1299 err
= dnet_wait_event(w
, w
->cond
== 1, &n
->wait_ts
);
1300 if (err
|| w
->status
) {
1310 pthread_mutex_unlock(&n
->group_lock
);
/*
 * Presumably returns the network address of state @st; the body is not
 * visible in this excerpt.
 */
1319 struct dnet_addr
*dnet_state_addr(struct dnet_net_state
*st
)
/*
 * Stat completion that logs the reply at DNET_LOG_DATA. On teardown it
 * increments the waiter's cond and wakes it.
 *
 * DNET_CMD_STAT replies of exactly sizeof(struct dnet_stat) are logged as:
 *   - load averages (transmitted multiplied by 100);
 *   - memory totals in kB;
 *   - filesystem size/availability in MB, file count and fsid.
 *
 * DNET_CMD_STAT_COUNT replies are logged as per-slot count/err pairs named by
 * dnet_counter_string(), with section headers printed when i == cmd_num
 * ("Proxy commands") and i == 2*cmd_num ("Counters").
 *
 * NOTE(review): the condition that prints "Storage commands" (inside the
 * as->num > as->cmd_num check) is on a missing line.
 */
1324 static int dnet_stat_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1326 struct dnet_wait
*w
= priv
;
1328 struct dnet_stat
*st
;
1331 if (is_trans_destroyed(state
, cmd
)) {
1332 dnet_wakeup(w
, w
->cond
++);
1337 if (cmd
->cmd
== DNET_CMD_STAT
&& cmd
->size
== sizeof(struct dnet_stat
)) {
1338 st
= (struct dnet_stat
*)(cmd
+ 1);
1340 dnet_convert_stat(st
);
1342 la
[0] = (float)st
->la
[0] / 100.0;
1343 la
[1] = (float)st
->la
[1] / 100.0;
1344 la
[2] = (float)st
->la
[2] / 100.0;
1346 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: la: %.2f %.2f %.2f.\n",
1347 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1348 la
[0], la
[1], la
[2]);
1349 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: mem: "
1350 "total: %llu kB, free: %llu kB, cache: %llu kB.\n",
1351 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1352 (unsigned long long)st
->vm_total
,
1353 (unsigned long long)st
->vm_free
,
1354 (unsigned long long)st
->vm_cached
);
1355 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: fs: "
1356 "total: %llu mB, avail: %llu mB, files: %llu, fsid: %llx.\n",
1357 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1358 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
1359 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
1360 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
1362 } else if (cmd
->size
>= sizeof(struct dnet_addr_stat
) && cmd
->cmd
== DNET_CMD_STAT_COUNT
) {
1363 struct dnet_addr_stat
*as
= (struct dnet_addr_stat
*)(cmd
+ 1);
1366 dnet_convert_addr_stat(as
, 0);
1368 for (i
=0; i
<as
->num
; ++i
) {
1369 if (as
->num
> as
->cmd_num
) {
1371 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Storage commands\n",
1372 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1373 if (i
== as
->cmd_num
)
1374 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Proxy commands\n",
1375 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1376 if (i
== as
->cmd_num
* 2)
1377 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: Counters\n",
1378 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
));
1380 dnet_log(state
->n
, DNET_LOG_DATA
, "%s: %s: cmd: %s, count: %llu, err: %llu\n",
1381 dnet_dump_id(&cmd
->id
), dnet_state_dump_addr(state
),
1382 dnet_counter_string(i
, as
->cmd_num
),
1383 (unsigned long long)as
->count
[i
].count
, (unsigned long long)as
->count
[i
].err
);
/*
 * Send @ctl either directly to state @st (dnet_trans_alloc_send_state()) or,
 * presumably when @st is NULL, routed by ctl->id (dnet_trans_alloc_send()).
 * The selecting condition is on lines not visible in this excerpt.
 */
1390 static int dnet_request_cmd_single(struct dnet_node
*n
, struct dnet_net_state
*st
, struct dnet_trans_control
*ctl
)
1393 return dnet_trans_alloc_send_state(st
, ctl
);
1395 return dnet_trans_alloc_send(n
, ctl
);
/*
 * Send stat-type command @cmd (flags NEED_ACK | NOLOCK | @cflags), then wait
 * until w->cond == num and log the elapsed time in microseconds.
 *
 * Targets:
 *   - for an @id, the request is routed by that id;
 *   - otherwise (presumably), it goes to every state of every group, each
 *     addressed with the state's first id, under n->state_lock.
 *
 * dnet_stat_complete is assigned as the completion on a path whose condition
 * is not visible (presumably when @complete is NULL).
 *
 * NOTE(review): not visible in this excerpt: the remaining parameter line
 * (presumably the private pointer), the counting of num, ctl.cmd/ctl.priv
 * setup and the return.
 */
1398 int dnet_request_stat(struct dnet_node
*n
, struct dnet_id
*id
,
1399 unsigned int cmd
, uint64_t cflags
,
1400 int (* complete
)(struct dnet_net_state
*state
,
1401 struct dnet_cmd
*cmd
,
1405 struct dnet_trans_control ctl
;
1406 struct dnet_wait
*w
= NULL
;
1408 struct timeval start
, end
;
1411 gettimeofday(&start
, NULL
);
1414 w
= dnet_wait_alloc(0);
1420 complete
= dnet_stat_complete
;
1424 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1427 ctl
.complete
= complete
;
1429 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| cflags
;
1435 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1437 err
= dnet_request_cmd_single(n
, NULL
, &ctl
);
1440 struct dnet_net_state
*st
;
1441 struct dnet_group
*g
;
1444 pthread_mutex_lock(&n
->state_lock
);
1445 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1446 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1453 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1454 dnet_request_cmd_single(n
, st
, &ctl
);
1458 pthread_mutex_unlock(&n
->state_lock
);
1462 gettimeofday(&end
, NULL
);
1463 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1464 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, num: %d.\n", dnet_cmd_string(cmd
), diff
, num
);
1469 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1471 gettimeofday(&end
, NULL
);
1472 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1473 dnet_log(n
, DNET_LOG_NOTICE
, "stat cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(cmd
), diff
, err
, num
);
/*
 * Private data for broadcast requests: the shared waiter and the caller's
 * original completion. The member used as p->priv is declared on a line not
 * visible in this excerpt.
 */
1488 struct dnet_request_cmd_priv
{
1489 struct dnet_wait
*w
;
1491 int (* complete
)(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
);
/*
 * Wrapper completion for broadcast requests. Every callback is first
 * forwarded to the caller's completion (p->complete with p->priv). On
 * teardown it then increments the shared waiter's cond and wakes it. The
 * refcnt == 1 test guards cleanup on lines not visible in this excerpt.
 */
1495 static int dnet_request_cmd_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1497 struct dnet_request_cmd_priv
*p
= priv
;
1498 int err
= p
->complete(state
, cmd
, p
->priv
);
1500 if (is_trans_destroyed(state
, cmd
)) {
1501 struct dnet_wait
*w
= p
->w
;
1503 dnet_wakeup(w
, w
->cond
++);
1504 if (atomic_read(&w
->refcnt
) == 1)
/*
 * Broadcast @ctl to every state of every group and wait until w->cond == num,
 * then log the elapsed time in microseconds.
 *
 * Setup:
 *   - the caller's completion and priv are saved in a heap-allocated
 *     dnet_request_cmd_priv;
 *   - ctl->complete is replaced by dnet_request_cmd_complete;
 *   - for each state, ctl->id.group_id is set to the state's group and,
 *     unless DNET_FLAGS_DIRECT is set, ctl->id is rewritten to the state's
 *     first id.
 *
 * NOTE(review): ctl->complete is overwritten in the caller's structure and is
 * not visibly restored. The counting of num, ctl->priv setup and cleanup are
 * on lines not visible in this excerpt.
 */
1512 int dnet_request_cmd(struct dnet_node
*n
, struct dnet_trans_control
*ctl
)
1515 struct dnet_request_cmd_priv
*p
;
1516 struct dnet_wait
*w
;
1517 struct dnet_net_state
*st
;
1518 struct dnet_group
*g
;
1519 struct timeval start
, end
;
1522 gettimeofday(&start
, NULL
);
1524 p
= malloc(sizeof(*p
));
1530 w
= dnet_wait_alloc(0);
1537 p
->complete
= ctl
->complete
;
1538 p
->priv
= ctl
->priv
;
1540 ctl
->complete
= dnet_request_cmd_complete
;
1543 pthread_mutex_lock(&n
->state_lock
);
1544 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1545 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1551 ctl
->id
.group_id
= g
->group_id
;
1553 if (!(ctl
->cflags
& DNET_FLAGS_DIRECT
))
1554 dnet_setup_id(&ctl
->id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1555 dnet_request_cmd_single(n
, st
, ctl
);
1559 pthread_mutex_unlock(&n
->state_lock
);
1561 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1563 gettimeofday(&end
, NULL
);
1564 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
1565 dnet_log(n
, DNET_LOG_NOTICE
, "request cmd: %s: %ld usecs, wait_error: %d, num: %d.\n", dnet_cmd_string(ctl
->cmd
), diff
, err
, num
);
1570 if (atomic_read(&w
->refcnt
) == 1)
1582 struct dnet_update_status_priv
{
1583 struct dnet_wait
*w
;
1584 struct dnet_node_status status
;
1588 static int dnet_update_status_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
1590 struct dnet_update_status_priv
*p
= priv
;
1592 if (is_trans_destroyed(state
, cmd
)) {
1593 dnet_wakeup(p
->w
, p
->w
->cond
++);
1594 dnet_wait_put(p
->w
);
1595 if (atomic_dec_and_test(&p
->refcnt
))
1599 if (cmd
->size
== sizeof(struct dnet_node_status
)) {
1600 memcpy(&p
->status
, cmd
+ 1, sizeof(struct dnet_node_status
));
1607 int dnet_update_status(struct dnet_node
*n
, struct dnet_addr
*addr
, struct dnet_id
*id
, struct dnet_node_status
*status
)
1610 struct dnet_update_status_priv
*priv
;
1611 struct dnet_trans_control ctl
;
1618 memset(&ctl
, 0, sizeof(ctl
));
1621 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1623 struct dnet_net_state
*st
;
1625 st
= dnet_state_search_by_addr(n
, addr
);
1631 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1635 priv
= malloc(sizeof(struct dnet_update_status_priv
));
1641 priv
->w
= dnet_wait_alloc(0);
1647 ctl
.complete
= dnet_update_status_complete
;
1649 ctl
.cmd
= DNET_CMD_STATUS
;
1650 ctl
.cflags
= DNET_FLAGS_NEED_ACK
;
1651 ctl
.size
= sizeof(struct dnet_node_status
);
1654 dnet_wait_get(priv
->w
);
1655 dnet_request_cmd_single(n
, NULL
, &ctl
);
1657 err
= dnet_wait_event(priv
->w
, priv
->w
->cond
== 1, &n
->wait_ts
);
1658 dnet_wait_put(priv
->w
);
1660 memcpy(status
, &priv
->status
, sizeof(struct dnet_node_status
));
1662 if (atomic_dec_and_test(&priv
->refcnt
))
1669 static int dnet_remove_object_raw(struct dnet_node
*n
, struct dnet_id
*id
,
1670 int (* complete
)(struct dnet_net_state
*state
,
1671 struct dnet_cmd
*cmd
,
1673 void *priv
, uint64_t cflags
, uint64_t ioflags
)
1675 struct dnet_io_control ctl
;
1677 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1679 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1681 memcpy(&ctl
.io
.id
, id
->id
, DNET_ID_SIZE
);
1682 memcpy(&ctl
.io
.parent
, id
->id
, DNET_ID_SIZE
);
1683 ctl
.io
.flags
= ioflags
;
1687 ctl
.cmd
= DNET_CMD_DEL
;
1688 ctl
.complete
= complete
;
1690 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1692 return dnet_trans_create_send_all(n
, &ctl
);
1695 static int dnet_remove_complete(struct dnet_net_state
*state
,
1696 struct dnet_cmd
*cmd
,
1699 struct dnet_wait
*w
= priv
;
1701 if (is_trans_destroyed(state
, cmd
)) {
1702 dnet_wakeup(w
, w
->cond
++);
1708 w
->status
= cmd
->status
;
1712 int dnet_remove_object(struct dnet_node
*n
, struct dnet_id
*id
,
1713 int (* complete
)(struct dnet_net_state
*state
,
1714 struct dnet_cmd
*cmd
,
1717 uint64_t cflags
, uint64_t ioflags
)
1719 struct dnet_wait
*w
= NULL
;
1723 w
= dnet_wait_alloc(0);
1729 complete
= dnet_remove_complete
;
1734 err
= dnet_remove_object_raw(n
, id
, complete
, priv
, cflags
, ioflags
);
1739 err
= dnet_wait_event(w
, w
->cond
!= err
, &n
->wait_ts
);
1754 static int dnet_remove_file_raw(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1756 struct dnet_wait
*w
;
1759 w
= dnet_wait_alloc(0);
1765 atomic_add(&w
->refcnt
, 1024);
1766 err
= dnet_remove_object_raw(n
, id
, dnet_remove_complete
, w
, cflags
, ioflags
);
1768 atomic_sub(&w
->refcnt
, 1024);
1773 atomic_sub(&w
->refcnt
, 1024 - num
);
1775 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1789 int dnet_remove_object_now(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1791 return dnet_remove_file_raw(n
, id
, cflags
| DNET_FLAGS_NEED_ACK
| DNET_ATTR_DELETE_HISTORY
, ioflags
);
1794 int dnet_remove_file(struct dnet_node
*n
, char *remote
, int remote_len
, struct dnet_id
*id
, uint64_t cflags
, uint64_t ioflags
)
1799 dnet_transform(n
, remote
, remote_len
, &raw
);
1804 return dnet_remove_file_raw(n
, id
, cflags
, ioflags
);
1807 int dnet_request_ids(struct dnet_node
*n
, struct dnet_id
*id
, uint64_t cflags
,
1808 int (* complete
)(struct dnet_net_state
*state
,
1809 struct dnet_cmd
*cmd
,
1813 struct dnet_trans_control ctl
;
1815 dnet_log_raw(n
, DNET_LOG_ERROR
, "Temporarily unsupported operation.\n");
1818 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
1820 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1821 ctl
.cmd
= DNET_CMD_LIST
;
1822 ctl
.complete
= complete
;
1824 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1826 return dnet_trans_alloc_send(n
, &ctl
);
1829 struct dnet_node
*dnet_get_node_from_state(void *state
)
1831 struct dnet_net_state
*st
= state
;
1838 struct dnet_read_data_completion
{
1839 struct dnet_wait
*w
;
1845 static int dnet_read_data_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
1847 struct dnet_read_data_completion
*c
= priv
;
1848 struct dnet_wait
*w
= c
->w
;
1851 if (is_trans_destroyed(st
, cmd
)) {
1852 dnet_wakeup(w
, w
->cond
++);
1854 if (atomic_dec_and_test(&c
->refcnt
))
1863 if (cmd
->size
>= sizeof(struct dnet_io_attr
)) {
1864 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)(cmd
+ 1);
1865 uint64_t sz
= c
->size
;
1867 dnet_convert_io_attr(io
);
1869 sz
+= io
->size
+ sizeof(struct dnet_io_attr
);
1870 c
->data
= realloc(c
->data
, sz
);
1876 memcpy(c
->data
+ c
->size
, io
, sizeof(struct dnet_io_attr
) + io
->size
);
1881 dnet_log(st
->n
, DNET_LOG_NOTICE
, "%s: object read completed: trans: %llu, status: %d, err: %d.\n",
1882 dnet_dump_id(&cmd
->id
), (unsigned long long)(cmd
->trans
& ~DNET_TRANS_REPLY
),
1888 void *dnet_read_data_wait_raw(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
1889 int cmd
, uint64_t cflags
, int *errp
)
1891 struct dnet_io_control ctl
;
1892 struct dnet_wait
*w
;
1893 struct dnet_read_data_completion
*c
;
1897 w
= dnet_wait_alloc(0);
1903 c
= malloc(sizeof(*c
));
1912 /* one for completion callback, another for this function */
1913 atomic_init(&c
->refcnt
, 2);
1915 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1920 ctl
.complete
= dnet_read_data_complete
;
1923 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
1925 memcpy(&ctl
.io
, io
, sizeof(struct dnet_io_attr
));
1926 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
1928 ctl
.id
.type
= io
->type
;
1931 err
= dnet_read_object(n
, &ctl
);
1933 goto err_out_put_complete
;
1935 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
1936 if (err
|| w
->status
) {
1937 char id_str
[2*DNET_ID_SIZE
+ 1];
1940 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
1941 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
1942 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
1943 goto err_out_put_complete
;
1949 err_out_put_complete
:
1950 if (atomic_dec_and_test(&c
->refcnt
))
1959 static int dnet_read_recover(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
, void *data
, uint64_t cflags
)
1961 struct dnet_meta_container mc
;
1962 struct dnet_io_control ctl
;
1966 err
= dnet_read_meta(n
, &mc
, NULL
, 0, id
);
1968 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could read metadata: %d\n", dnet_dump_id(id
), err
);
1972 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
1977 ctl
.data
= data
+ sizeof(struct dnet_io_attr
);
1978 ctl
.io
.size
-= sizeof(struct dnet_io_attr
);
1981 ctl
.cmd
= DNET_CMD_WRITE
;
1982 ctl
.cflags
= cflags
;
1984 err
= dnet_write_data_wait(n
, &ctl
, &result
);
1986 dnet_log(n
, DNET_LOG_ERROR
, "%s: read-recovery: could not write data: %d\n", dnet_dump_id(id
), err
);
1987 goto err_out_free_meta
;
1990 err
= dnet_write_metadata(n
, &mc
, 0, cflags
);
1992 goto err_out_free_result
;
1994 err_out_free_result
:
2002 void *dnet_read_data_wait_groups(struct dnet_node
*n
, struct dnet_id
*id
, int *groups
, int num
,
2003 struct dnet_io_attr
*io
, uint64_t cflags
, int *errp
)
2008 for (i
= 0; i
< num
; ++i
) {
2009 id
->group_id
= groups
[i
];
2011 data
= dnet_read_data_wait_raw(n
, id
, io
, DNET_CMD_READ
, cflags
, errp
);
2013 if ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0) && (io
->size
> sizeof(struct dnet_io_attr
))) {
2014 dnet_read_recover(n
, id
, io
, data
, cflags
);
2025 void *dnet_read_data_wait(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
,
2026 uint64_t cflags
, int *errp
)
2031 num
= dnet_mix_states(n
, id
, &g
);
2037 data
= dnet_read_data_wait_groups(n
, id
, g
, num
, io
, cflags
, &err
);
2048 int dnet_write_data_wait(struct dnet_node
*n
, struct dnet_io_control
*ctl
, void **result
)
2050 int err
, trans_num
= 0;
2051 struct dnet_wait
*w
;
2052 struct dnet_write_completion
*wc
;
2054 wc
= malloc(sizeof(struct dnet_write_completion
));
2059 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2061 w
= dnet_wait_alloc(0);
2069 w
->status
= -ENOENT
;
2071 ctl
->complete
= dnet_write_complete
;
2073 ctl
->cmd
= DNET_CMD_WRITE
;
2074 ctl
->cflags
|= DNET_FLAGS_NEED_ACK
;
2076 memcpy(ctl
->io
.id
, ctl
->id
.id
, DNET_ID_SIZE
);
2077 memcpy(ctl
->io
.parent
, ctl
->id
.id
, DNET_ID_SIZE
);
2079 atomic_set(&w
->refcnt
, INT_MAX
);
2080 trans_num
= dnet_write_object(n
, ctl
);
2085 * 1 - the first reference counter we grabbed at allocation time
2087 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
2089 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
2090 if (err
|| w
->status
) {
2093 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
2094 dnet_dump_id(&ctl
->id
), err
, w
->status
);
2097 if (err
|| !trans_num
) {
2100 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
2105 dnet_log(n
, DNET_LOG_NOTICE
, "%s: wrote: %llu bytes, type: %d, reply size: %d.\n",
2106 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, ctl
->io
.type
, wc
->size
);
2109 *result
= wc
->reply
;
2115 dnet_write_complete_free(wc
);
2120 int dnet_lookup_addr(struct dnet_node
*n
, const void *remote
, int len
, struct dnet_id
*id
, int group_id
, char *dst
, int dlen
)
2123 struct dnet_net_state
*st
;
2127 dnet_transform(n
, remote
, len
, &raw
);
2130 id
->group_id
= group_id
;
2132 st
= dnet_state_get_first(n
, id
);
2136 dnet_server_convert_dnet_addr_raw(dnet_state_addr(st
), dst
, dlen
);
2144 struct dnet_weight
{
2149 static int dnet_weight_compare(const void *v1
, const void *v2
)
2151 const struct dnet_weight
*w1
= v1
;
2152 const struct dnet_weight
*w2
= v2
;
2154 return w2
->weight
- w1
->weight
;
2157 static int dnet_weight_get_winner(struct dnet_weight
*w
, int num
)
2163 for (i
= 0; i
< num
; ++i
)
2166 r
= (float)rand() / (float)RAND_MAX
;
2169 for (i
= 0; i
< num
; ++i
) {
2178 int dnet_mix_states(struct dnet_node
*n
, struct dnet_id
*id
, int **groupsp
)
2180 struct dnet_weight
*weights
;
2182 int group_num
, i
, num
;
2183 struct dnet_net_state
*st
;
2188 pthread_mutex_lock(&n
->group_lock
);
2189 group_num
= n
->group_num
;
2191 weights
= alloca(n
->group_num
* sizeof(*weights
));
2192 groups
= malloc(n
->group_num
* sizeof(*groups
));
2194 memcpy(groups
, n
->groups
, n
->group_num
* sizeof(*groups
));
2195 pthread_mutex_unlock(&n
->group_lock
);
2202 if (n
->flags
& DNET_CFG_RANDOMIZE_STATES
) {
2203 for (i
= 0; i
< group_num
; ++i
) {
2204 weights
[i
].weight
= rand();
2205 weights
[i
].group_id
= groups
[i
];
2209 if (!(n
->flags
& DNET_CFG_MIX_STATES
)) {
2214 memset(weights
, 0, group_num
* sizeof(*weights
));
2216 for (i
= 0, num
= 0; i
< group_num
; ++i
) {
2217 id
->group_id
= groups
[i
];
2219 st
= dnet_state_get_first(n
, id
);
2221 weights
[num
].weight
= (int)st
->weight
;
2222 weights
[num
].group_id
= id
->group_id
;
2233 qsort(weights
, group_num
, sizeof(struct dnet_weight
), dnet_weight_compare
);
2235 for (i
= 0; i
< group_num
; ++i
) {
2236 int pos
= dnet_weight_get_winner(weights
, group_num
- i
);
2237 groups
[i
] = weights
[pos
].group_id
;
2239 if (pos
< group_num
- 1)
2240 memmove(&weights
[pos
], &weights
[pos
+ 1], (group_num
- 1 - pos
) * sizeof(struct dnet_weight
));
2244 dnet_node_set_groups(n
, groups
, group_num
);
2250 int dnet_data_map(struct dnet_map_fd
*map
)
2253 long page_size
= sysconf(_SC_PAGE_SIZE
);
2256 off
= map
->offset
& ~(page_size
- 1);
2257 map
->mapped_size
= ALIGN(map
->size
+ map
->offset
- off
, page_size
);
2259 map
->mapped_data
= mmap(NULL
, map
->mapped_size
, PROT_READ
, MAP_SHARED
, map
->fd
, off
);
2260 if (map
->mapped_data
== MAP_FAILED
) {
2265 map
->data
= map
->mapped_data
+ map
->offset
- off
;
2271 void dnet_data_unmap(struct dnet_map_fd
*map
)
2273 munmap(map
->mapped_data
, map
->mapped_size
);
2276 struct dnet_io_attr
*dnet_remove_range(struct dnet_node
*n
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *ret_num
, int *errp
)
2279 struct dnet_io_attr
*ret
, *new_ret
;
2280 struct dnet_raw_id start
, next
;
2281 struct dnet_raw_id end
;
2282 uint64_t size
= io
->size
;
2284 int err
, need_exit
= 0;
2286 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2288 dnet_setup_id(&id
, group_id
, io
->id
);
2293 while (!need_exit
) {
2294 err
= dnet_search_range(n
, &id
, &start
, &next
);
2298 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2299 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2300 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2301 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2305 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2307 char start_id
[2*len
+ 1];
2308 char next_id
[2*len
+ 1];
2309 char end_id
[2*len
+ 1];
2310 char id_str
[2*len
+ 1];
2312 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2313 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2314 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2315 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2316 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2317 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2320 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2321 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2325 data
= dnet_read_data_wait_raw(n
, &id
, io
, DNET_CMD_DEL_RANGE
, cflags
, &err
);
2326 if (io
->size
!= sizeof(struct dnet_io_attr
)) {
2332 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
2334 dnet_convert_io_attr(rep
);
2336 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2337 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2338 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2342 new_ret
= realloc(ret
, *ret_num
* sizeof(struct dnet_io_attr
));
2349 ret
[*ret_num
- 1] = *rep
;
2354 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2363 struct dnet_range_data
*dnet_read_range(struct dnet_node
*n
, struct dnet_io_attr
*io
, int group_id
, uint64_t cflags
, int *errp
)
2367 struct dnet_range_data
*ret
;
2368 struct dnet_raw_id start
, next
;
2369 struct dnet_raw_id end
;
2370 uint64_t size
= io
->size
;
2372 int err
, need_exit
= 0;
2374 memcpy(end
.id
, io
->parent
, DNET_ID_SIZE
);
2376 dnet_setup_id(&id
, group_id
, io
->id
);
2381 while (!need_exit
) {
2382 err
= dnet_search_range(n
, &id
, &start
, &next
);
2386 if ((dnet_id_cmp_str(id
.id
, next
.id
) > 0) ||
2387 !memcmp(start
.id
, next
.id
, DNET_ID_SIZE
) ||
2388 (dnet_id_cmp_str(next
.id
, end
.id
) > 0)) {
2389 memcpy(next
.id
, end
.id
, DNET_ID_SIZE
);
2393 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
2395 char start_id
[2*len
+ 1];
2396 char next_id
[2*len
+ 1];
2397 char end_id
[2*len
+ 1];
2398 char id_str
[2*len
+ 1];
2400 dnet_log(n
, DNET_LOG_NOTICE
, "id: %s, start: %s: next: %s, end: %s, size: %llu, cmp: %d\n",
2401 dnet_dump_id_len_raw(id
.id
, len
, id_str
),
2402 dnet_dump_id_len_raw(start
.id
, len
, start_id
),
2403 dnet_dump_id_len_raw(next
.id
, len
, next_id
),
2404 dnet_dump_id_len_raw(end
.id
, len
, end_id
),
2405 (unsigned long long)size
, dnet_id_cmp_str(next
.id
, end
.id
));
2408 memcpy(io
->id
, id
.id
, DNET_ID_SIZE
);
2409 memcpy(io
->parent
, next
.id
, DNET_ID_SIZE
);
2413 data
= dnet_read_data_wait_raw(n
, &id
, io
, DNET_CMD_READ_RANGE
, cflags
, &err
);
2415 struct dnet_io_attr
*rep
= data
+ io
->size
- sizeof(struct dnet_io_attr
);
2417 /* If DNET_IO_FLAGS_NODATA is set do not decrement size as 'rep' is the only structure in output */
2418 if (!(io
->flags
& DNET_IO_FLAGS_NODATA
))
2419 io
->size
-= sizeof(struct dnet_io_attr
);
2420 dnet_convert_io_attr(rep
);
2422 dnet_log(n
, DNET_LOG_NOTICE
, "%s: rep_num: %llu, io_start: %llu, io_num: %llu, io_size: %llu\n",
2423 dnet_dump_id(&id
), (unsigned long long)rep
->num
, (unsigned long long)io
->start
,
2424 (unsigned long long)io
->num
, (unsigned long long)io
->size
);
2426 if (io
->start
< rep
->num
) {
2427 rep
->num
-= io
->start
;
2429 io
->num
-= rep
->num
;
2431 if (!io
->size
&& !(io
->flags
& DNET_IO_FLAGS_NODATA
)) {
2434 struct dnet_range_data
*new_ret
;
2438 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2445 ret
[ret_num
- 1].data
= data
;
2446 ret
[ret_num
- 1].size
= io
->size
;
2453 io
->start
-= rep
->num
;
2457 memcpy(id
.id
, next
.id
, DNET_ID_SIZE
);
2469 struct dnet_read_latest_id
{
2471 struct dnet_file_info fi
;
2474 struct dnet_read_latest_ctl
{
2475 struct dnet_wait
*w
;
2477 pthread_mutex_t lock
;
2479 struct dnet_read_latest_id ids
[0];
2482 static void dnet_read_latest_ctl_put(struct dnet_read_latest_ctl
*ctl
)
2484 dnet_wakeup(ctl
->w
, ctl
->w
->cond
++);
2485 if (atomic_dec_and_test(&ctl
->w
->refcnt
)) {
2486 dnet_wait_destroy(ctl
->w
);
2487 pthread_mutex_destroy(&ctl
->lock
);
2492 static int dnet_read_latest_complete(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *priv
)
2494 struct dnet_read_latest_ctl
*ctl
= priv
;
2495 struct dnet_node
*n
;
2496 struct dnet_addr_attr
*a
;
2497 struct dnet_file_info
*fi
;
2500 if (is_trans_destroyed(st
, cmd
)) {
2501 dnet_read_latest_ctl_put(ctl
);
2508 if (err
|| !cmd
->size
)
2511 if (cmd
->size
< sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
)) {
2512 dnet_log(n
, DNET_LOG_ERROR
, "%s: wrong dnet_addr attribute size %llu, must be at least %zu.\n",
2513 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
2514 sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
));
2518 a
= (struct dnet_addr_attr
*)(cmd
+ 1);
2519 fi
= (struct dnet_file_info
*)(a
+ 1);
2521 dnet_convert_addr_attr(a
);
2522 dnet_convert_file_info(fi
);
2524 pthread_mutex_lock(&ctl
->lock
);
2526 pthread_mutex_unlock(&ctl
->lock
);
2528 /* we do not care about filename */
2529 memcpy(&ctl
->ids
[pos
].fi
, fi
, sizeof(struct dnet_file_info
));
2530 memcpy(&ctl
->ids
[pos
].id
, &cmd
->id
, sizeof(struct dnet_id
));
2536 static int dnet_file_read_latest_cmp(const void *p1
, const void *p2
)
2538 const struct dnet_read_latest_id
*id1
= p1
;
2539 const struct dnet_read_latest_id
*id2
= p2
;
2541 int ret
= (int)(id2
->fi
.mtime
.tsec
- id1
->fi
.mtime
.tsec
);
2544 ret
= (int)(id2
->fi
.mtime
.tnsec
- id1
->fi
.mtime
.tnsec
);
2549 int dnet_read_latest_prepare(struct dnet_read_latest_prepare
*pr
)
2551 struct dnet_read_latest_ctl
*ctl
;
2552 int group_id
= pr
->id
.group_id
;
2555 ctl
= malloc(sizeof(struct dnet_read_latest_ctl
) + sizeof(struct dnet_read_latest_id
) * pr
->group_num
);
2560 memset(ctl
, 0, sizeof(struct dnet_read_latest_ctl
));
2562 ctl
->w
= dnet_wait_alloc(0);
2568 err
= pthread_mutex_init(&ctl
->lock
, NULL
);
2570 goto err_out_put_wait
;
2572 ctl
->num
= pr
->group_num
;
2575 for (i
= 0; i
< pr
->group_num
; ++i
) {
2576 pr
->id
.group_id
= pr
->group
[i
];
2578 dnet_wait_get(ctl
->w
);
2579 dnet_lookup_object(pr
->n
, &pr
->id
, DNET_ATTR_META_TIMES
| pr
->cflags
, dnet_read_latest_complete
, ctl
);
2582 err
= dnet_wait_event(ctl
->w
, ctl
->w
->cond
== pr
->group_num
, &pr
->n
->wait_ts
);
2589 pr
->group_num
= ctl
->pos
;
2591 qsort(ctl
->ids
, pr
->group_num
, sizeof(struct dnet_read_latest_id
), dnet_file_read_latest_cmp
);
2593 for (i
= 0; i
< pr
->group_num
; ++i
) {
2594 pr
->group
[i
] = ctl
->ids
[i
].id
.group_id
;
2596 if (group_id
== pr
->group
[i
]) {
2597 const struct dnet_read_latest_id
*id0
= &ctl
->ids
[0];
2598 const struct dnet_read_latest_id
*id1
= &ctl
->ids
[i
];
2600 if (!dnet_file_read_latest_cmp(id0
, id1
)) {
2601 int tmp_group
= pr
->group
[0];
2602 pr
->group
[0] = pr
->group
[i
];
2603 pr
->group
[i
] = tmp_group
;
2609 dnet_read_latest_ctl_put(ctl
);
2613 dnet_wait_put(ctl
->w
);
2620 int dnet_read_latest(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*io
, uint64_t cflags
, void **datap
)
2622 struct dnet_read_latest_prepare pr
;
2623 int *g
, num
, err
, i
;
2625 if ((int)io
->num
> n
->group_num
) {
2630 err
= dnet_mix_states(n
, id
, &g
);
2636 if ((int)io
->num
> num
) {
2641 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
2649 err
= dnet_read_latest_prepare(&pr
);
2654 for (i
= 0; i
< pr
.group_num
; ++i
) {
2657 id
->group_id
= pr
.group
[i
];
2658 data
= dnet_read_data_wait_raw(n
, id
, io
, DNET_CMD_READ
, cflags
, &err
);
2660 if ((pr
.group_num
!= num
) || ((i
!= 0) && (io
->type
== 0) && (io
->offset
== 0))) {
2661 dnet_read_recover(n
, id
, io
, data
, cflags
);
2676 int dnet_get_routes(struct dnet_node
*n
, struct dnet_id
**ids
, struct dnet_addr
**addrs
) {
2678 struct dnet_net_state
*st
;
2679 struct dnet_group
*g
;
2680 struct dnet_addr
*tmp_addrs
;
2681 struct dnet_id
*tmp_ids
;
2682 int size
= 0, count
= 0;
2688 pthread_mutex_lock(&n
->state_lock
);
2689 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
2690 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
2692 size
+= st
->idc
->id_num
;
2694 tmp_ids
= (struct dnet_id
*)realloc(*ids
, size
* sizeof(struct dnet_id
));
2701 tmp_addrs
= (struct dnet_addr
*)realloc(*addrs
, size
* sizeof(struct dnet_addr
));
2708 for (i
= 0; i
< st
->idc
->id_num
; ++i
) {
2709 dnet_setup_id(&(*ids
)[count
], g
->group_id
, st
->idc
->ids
[i
].raw
.id
);
2710 memcpy(&(*addrs
)[count
], dnet_state_addr(st
), sizeof(struct dnet_addr
));
2712 //fprintf(stderr, "%d: %s -> %s\n", g->group_id, dnet_dump_id_str(st->idc->ids[i].raw.id), dnet_state_dump_addr(st));
2716 pthread_mutex_unlock(&n
->state_lock
);
2730 void *dnet_bulk_read_wait_raw(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_io_attr
*ios
,
2731 uint32_t io_num
, int cmd
, uint64_t cflags
, int *errp
)
2733 struct dnet_io_control ctl
;
2734 struct dnet_io_attr io
;
2735 struct dnet_wait
*w
;
2736 struct dnet_read_data_completion
*c
;
2740 w
= dnet_wait_alloc(0);
2746 c
= malloc(sizeof(*c
));
2755 /* one for completion callback, another for this function */
2756 atomic_init(&c
->refcnt
, 2);
2758 memset(&ctl
, 0, sizeof(struct dnet_io_control
));
2763 ctl
.complete
= dnet_read_data_complete
;
2766 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
2768 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
2769 memset(&ctl
.io
, 0, sizeof(struct dnet_io_attr
));
2771 memcpy(io
.id
, id
->id
, DNET_ID_SIZE
);
2772 memcpy(io
.parent
, id
->id
, DNET_ID_SIZE
);
2774 ctl
.io
.size
= io_num
* sizeof(struct dnet_io_attr
);
2778 err
= dnet_read_object(n
, &ctl
);
2780 goto err_out_put_complete
;
2782 err
= dnet_wait_event(w
, w
->cond
, &n
->wait_ts
);
2783 if (err
|| w
->status
) {
2784 char id_str
[2*DNET_ID_SIZE
+ 1];
2787 if ((cmd
!= DNET_CMD_READ_RANGE
) || (err
!= -ENOENT
))
2788 dnet_log(n
, DNET_LOG_ERROR
, "%d:%s : failed to read data: %d\n",
2789 ctl
.id
.group_id
, dnet_dump_id_len_raw(ctl
.id
.id
, DNET_ID_SIZE
, id_str
), err
);
2790 goto err_out_put_complete
;
2795 err_out_put_complete
:
2796 if (atomic_dec_and_test(&c
->refcnt
))
2806 static int dnet_io_attr_cmp(const void *d1
, const void *d2
)
2808 const struct dnet_io_attr
*io1
= d1
;
2809 const struct dnet_io_attr
*io2
= d2
;
2811 return memcmp(io1
->id
, io2
->id
, DNET_ID_SIZE
);
2814 struct dnet_range_data
*dnet_bulk_read(struct dnet_node
*n
, struct dnet_io_attr
*ios
, uint32_t io_num
, int group_id
, uint64_t cflags
, int *errp
)
2816 struct dnet_id id
, next_id
;
2818 struct dnet_range_data
*ret
;
2819 struct dnet_net_state
*cur
, *next
= NULL
;
2823 uint32_t i
, start
= -1;
2829 qsort(ios
, io_num
, sizeof(struct dnet_io_attr
), dnet_io_attr_cmp
);
2835 dnet_setup_id(&id
, group_id
, ios
[0].id
);
2836 id
.type
= ios
[0].type
;
2838 cur
= dnet_state_get_first(n
, &id
);
2840 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&id
));
2845 for (i
= 0; i
< io_num
; ++i
) {
2846 if ((i
+ 1) < io_num
) {
2847 dnet_setup_id(&next_id
, group_id
, ios
[i
+1].id
);
2848 next_id
.type
= ios
[i
+1].type
;
2850 next
= dnet_state_get_first(n
, &next_id
);
2852 dnet_log(n
, DNET_LOG_ERROR
, "%s: Can't get state for id\n", dnet_dump_id(&next_id
));
2857 /* Send command only if state changes or it's a last id */
2858 if ((cur
== next
)) {
2859 dnet_state_put(next
);
2865 dnet_log(n
, DNET_LOG_NOTICE
, "start: %s: end: %s, count: %llu, addr: %s\n",
2867 dnet_dump_id(&next_id
),
2868 (unsigned long long)(i
- start
),
2869 dnet_state_dump_addr(cur
));
2871 data
= dnet_bulk_read_wait_raw(n
, &id
, ios
, i
- start
, DNET_CMD_BULK_READ
, cflags
, &err
);
2879 struct dnet_range_data
*new_ret
;
2883 new_ret
= realloc(ret
, ret_num
* sizeof(struct dnet_range_data
));
2890 ret
[ret_num
- 1].data
= data
;
2891 ret
[ret_num
- 1].size
= size
;
2897 dnet_state_put(cur
);
2900 memcpy(&id
, &next_id
, sizeof(struct dnet_id
));
2905 dnet_state_put(next
);
2906 dnet_state_put(cur
);
2916 struct dnet_range_data
dnet_bulk_write(struct dnet_node
*n
, struct dnet_io_control
*ctl
, int ctl_num
, int *errp
)
2918 int err
, i
, trans_num
= 0, local_trans_num
;
2919 struct dnet_wait
*w
;
2920 struct dnet_write_completion
*wc
;
2921 struct dnet_range_data ret
;
2922 struct dnet_metadata_control mcl
;
2923 struct dnet_meta_container mc
;
2924 struct dnet_io_control meta_ctl
;
2929 memset(&ret
, 0, sizeof(ret
));
2931 wc
= malloc(sizeof(struct dnet_write_completion
));
2936 memset(wc
, 0, sizeof(struct dnet_write_completion
));
2938 w
= dnet_wait_alloc(0);
2946 atomic_set(&w
->refcnt
, INT_MAX
);
2947 w
->status
= -ENOENT
;
2949 for (i
= 0; i
< ctl_num
; ++i
) {
2951 ctl
[i
].complete
= dnet_write_complete
;
2953 ctl
[i
].cmd
= DNET_CMD_WRITE
;
2954 ctl
[i
].cflags
= DNET_FLAGS_NEED_ACK
;
2956 memcpy(ctl
[i
].io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2957 memcpy(ctl
[i
].io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
2959 local_trans_num
= dnet_write_object(n
, &ctl
[i
]);
2960 if (local_trans_num
< 0)
2961 local_trans_num
= 0;
2963 trans_num
+= local_trans_num
;
2965 /* Prepare and send metadata */
2966 memset(&mcl
, 0, sizeof(mcl
));
2968 pthread_mutex_lock(&n
->group_lock
);
2969 group_num
= n
->group_num
;
2970 groups
= alloca(group_num
* sizeof(int));
2972 memcpy(groups
, n
->groups
, group_num
* sizeof(int));
2973 pthread_mutex_unlock(&n
->group_lock
);
2975 mcl
.groups
= groups
;
2976 mcl
.group_num
= group_num
;
2978 mcl
.cflags
= ctl
[i
].cflags
;
2980 gettimeofday(&tv
, NULL
);
2981 mcl
.ts
.tv_sec
= tv
.tv_sec
;
2982 mcl
.ts
.tv_nsec
= tv
.tv_usec
* 1000;
2984 memset(&mc
, 0, sizeof(mc
));
2986 err
= dnet_create_metadata(n
, &mcl
, &mc
);
2987 dnet_log(n
, DNET_LOG_DEBUG
, "Creating metadata: err: %d", err
);
2989 dnet_convert_metadata(n
, mc
.data
, mc
.size
);
2991 memset(&meta_ctl
, 0, sizeof(struct dnet_io_control
));
2994 meta_ctl
.complete
= dnet_write_complete
;
2995 meta_ctl
.cmd
= DNET_CMD_WRITE
;
2998 meta_ctl
.cflags
= ctl
[i
].cflags
;
3000 memcpy(&meta_ctl
.id
, &ctl
[i
].id
, sizeof(struct dnet_id
));
3001 memcpy(meta_ctl
.io
.id
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3002 memcpy(meta_ctl
.io
.parent
, ctl
[i
].id
.id
, DNET_ID_SIZE
);
3003 meta_ctl
.id
.type
= meta_ctl
.io
.type
= EBLOB_TYPE_META
;
3005 meta_ctl
.io
.flags
|= DNET_IO_FLAGS_META
;
3006 meta_ctl
.io
.offset
= 0;
3007 meta_ctl
.io
.size
= mc
.size
;
3008 meta_ctl
.data
= mc
.data
;
3010 local_trans_num
= dnet_write_object(n
, &meta_ctl
);
3011 if (local_trans_num
< 0)
3012 local_trans_num
= 0;
3014 trans_num
+= local_trans_num
;
3019 * 1 - the first reference counter we grabbed at allocation time
3021 atomic_sub(&w
->refcnt
, INT_MAX
- trans_num
- 1);
3023 err
= dnet_wait_event(w
, w
->cond
== trans_num
, &n
->wait_ts
);
3024 if (err
|| w
->status
) {
3027 dnet_log(n
, DNET_LOG_NOTICE
, "%s: failed to wait for IO write completion, err: %d, status: %d.\n",
3028 dnet_dump_id(&ctl
->id
), err
, w
->status
);
3031 if (err
|| !trans_num
) {
3034 dnet_log(n
, DNET_LOG_ERROR
, "Failed to write data into the storage, err: %d, trans_num: %d.\n", err
, trans_num
);
3039 dnet_log(n
, DNET_LOG_NOTICE
, "%s: successfully wrote %llu bytes into the storage, reply size: %d.\n",
3040 dnet_dump_id(&ctl
->id
), (unsigned long long)ctl
->io
.size
, wc
->size
);
3043 ret
.data
= wc
->reply
;
3044 ret
.size
= wc
->size
;
3049 dnet_write_complete_free(wc
);
3055 int dnet_flags(struct dnet_node
*n
)
3060 static int dnet_start_defrag_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
3062 struct dnet_wait
*w
= priv
;
3064 if (is_trans_destroyed(state
, cmd
)) {
3065 dnet_wakeup(w
, w
->cond
++);
3073 static int dnet_start_defrag_single(struct dnet_net_state
*st
, void *priv
, uint64_t cflags
)
3075 struct dnet_trans_control ctl
;
3077 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
3079 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
3080 ctl
.cmd
= DNET_CMD_DEFRAG
;
3081 ctl
.complete
= dnet_start_defrag_complete
;
3083 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| cflags
;
3085 return dnet_trans_alloc_send_state(st
, &ctl
);
3088 int dnet_start_defrag(struct dnet_node
*n
, uint64_t cflags
)
3090 struct dnet_net_state
*st
;
3091 struct dnet_wait
*w
;
3092 struct dnet_group
*g
;
3096 w
= dnet_wait_alloc(0);
3102 pthread_mutex_lock(&n
->state_lock
);
3103 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
3104 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
3111 dnet_start_defrag_single(st
, w
, cflags
);
3115 pthread_mutex_unlock(&n
->state_lock
);
3117 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);