2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
27 #include "elliptics.h"
28 #include "elliptics/interface.h"
/*
 * dnet_db_read_raw() - read the metadata (EBLOB_TYPE_META) record of @id
 * from blob backend @b.
 *
 * The key is built from the first DNET_ID_SIZE bytes of id->id.
 * eblob_read() resolves it to a file descriptor, offset and size, and the
 * record is then read with pread() into a buffer handed back through @datap.
 * NOTE(review): the allocation of that buffer and the error/return paths are
 * not visible in this excerpt. dnet_update_ts_metadata() later realloc()s the
 * buffer it receives, so it presumably comes from malloc() and is owned by
 * the caller. Confirm this.
 */
30 ssize_t
dnet_db_read_raw(struct eblob_backend
*b
, struct dnet_raw_id
*id
, void **datap
)
34 uint64_t offset
, size
;
37 memcpy(key
.id
, id
->id
, DNET_ID_SIZE
);
39 err
= eblob_read(b
, &key
, &fd
, &offset
, &size
, EBLOB_TYPE_META
);
/*
 * NOTE(review): size is a uint64_t, but the short-read check below casts it
 * to int. Records larger than INT_MAX would therefore be compared against a
 * truncated value. Comparing as ssize_t (or uint64_t) would be safer; confirm
 * err's declared type, which is not visible here.
 */
50 err
= pread(fd
, data
, size
, offset
);
51 if (err
!= (int)size
) {
/*
 * dnet_db_write_raw() - store @size bytes at @data as the metadata
 * (EBLOB_TYPE_META) record of @id in blob backend @b.
 *
 * The visible code writes the buffer with eblob_write() using
 * BLOB_DISK_CTL_NOCSUM. It also issues eblob_write_commit() with a zeroed
 * struct eblob_write_control whose flags/type are set to the same values.
 * NOTE(review): the lines between the two calls are not visible, so it cannot
 * be confirmed here under which condition the commit path runs. Verify this
 * before changing either call.
 */
66 int dnet_db_write_raw(struct eblob_backend
*b
, struct dnet_raw_id
*id
, void *data
, unsigned int size
)
68 struct eblob_write_control wc
;
72 memset(&wc
, 0, sizeof(struct eblob_write_control
));
73 memcpy(key
.id
, id
->id
, DNET_ID_SIZE
);
74 err
= eblob_write(b
, &key
, data
, 0, size
, BLOB_DISK_CTL_NOCSUM
, EBLOB_TYPE_META
);
80 wc
.flags
= BLOB_DISK_CTL_NOCSUM
;
81 wc
.type
= EBLOB_TYPE_META
;
82 err
= eblob_write_commit(b
, &key
, NULL
, 0, &wc
);
/*
 * dnet_db_remove_direct() - remove the metadata (EBLOB_TYPE_META) record of
 * @id from blob backend @b via eblob_remove(), returning its result.
 *
 * NOTE(review): this copies EBLOB_ID_SIZE bytes of the id, while the read and
 * write helpers in this file copy DNET_ID_SIZE. The two are presumably equal;
 * confirm, or use one constant consistently.
 */
88 static int dnet_db_remove_direct(struct eblob_backend
*b
, struct dnet_raw_id
*id
)
92 memcpy(key
.id
, id
->id
, EBLOB_ID_SIZE
);
93 return eblob_remove(b
, &key
, EBLOB_TYPE_META
);
/*
 * dnet_db_remove_raw() - remove the metadata record of @id, or mark it as
 * removed.
 *
 * The visible code calls dnet_db_remove_direct(). The guarding lines, which
 * presumably test @real_del, are not shown. On the other visible path it
 * returns dnet_update_ts_metadata() with DNET_IO_FLAGS_REMOVED set, so the
 * record is kept but flagged as removed.
 * NOTE(review): the return value of dnet_db_remove_direct() is discarded in
 * the visible lines. Confirm that ignoring a failed removal is intended.
 */
96 int dnet_db_remove_raw(struct eblob_backend
*b
, struct dnet_raw_id
*id
, int real_del
)
99 dnet_db_remove_direct(b
, id
);
103 return dnet_update_ts_metadata(b
, id
, DNET_IO_FLAGS_REMOVED
, 0);
/*
 * dnet_update_ts_metadata() - apply an update entry built by
 * dnet_create_meta_update() (with @flags_set / @flags_clear) to the metadata
 * record of @id, then write the record back with dnet_db_write_raw().
 *
 * Visible flow:
 * - The existing record is read with dnet_db_read_raw().
 * - In one branch, a fresh buffer holding one struct dnet_meta plus one
 *   struct dnet_meta_update is allocated.
 * - Otherwise the existing entries are updated with
 *   dnet_update_ts_metadata_raw(), and its result is compared with -ENOENT
 *   (the source comment calls that case "broken metadata, rewrite it").
 * - The buffer is then grown with realloc() and a new update entry is
 *   appended at its end.
 * NOTE(review): several branch conditions are not visible. Confirm the exact
 * flow before relying on this summary.
 */
106 int dnet_update_ts_metadata(struct eblob_backend
*b
, struct dnet_raw_id
*id
, uint64_t flags_set
, uint64_t flags_clear
)
109 struct dnet_meta_container mc
;
112 memset(&mc
, 0, sizeof(struct dnet_meta_container
));
114 err
= dnet_db_read_raw(b
, id
, &mc
.data
);
116 m
= malloc(sizeof(struct dnet_meta
) + sizeof(struct dnet_meta_update
));
121 dnet_create_meta_update(m
, NULL
, flags_set
, flags_clear
);
124 mc
.size
= sizeof(struct dnet_meta_update
) + sizeof(struct dnet_meta
);
126 err
= dnet_update_ts_metadata_raw(&mc
, flags_set
, flags_clear
);
128 /* broken metadata, rewrite it */
129 if (err
!= -ENOENT
) {
/*
 * NOTE(review): assigning realloc()'s result straight back to mc.data loses
 * the original buffer if realloc() fails, which leaks it. Keep the result in
 * a temporary and only replace mc.data on success.
 */
136 mc
.data
= realloc(mc
.data
, mc
.size
+ sizeof(struct dnet_meta
) + sizeof(struct dnet_meta_update
));
/*
 * Append the new update entry right after the existing contents.
 * NOTE(review): if mc.data is a void * (its type is not visible here), this
 * pointer arithmetic relies on the GNU void-pointer-arithmetic extension.
 */
142 m
= mc
.data
+ mc
.size
;
143 mc
.size
+= sizeof(struct dnet_meta
) + sizeof(struct dnet_meta_update
);
145 dnet_create_meta_update(m
, NULL
, flags_set
, flags_clear
);
149 err
= dnet_db_write_raw(b
, id
, mc
.data
, mc
.size
);
/*
 * dnet_process_meta() - dispatch a command that targets the metadata store
 * of node st->n through the backend callbacks in n->cb.
 *
 * DNET_CMD_READ / DNET_CMD_WRITE:
 * - The command size must be at least sizeof(struct dnet_io_attr), and the
 *   key is taken from io->id.
 * - Reads go through cb->meta_read(), and the result is sent with
 *   dnet_send_read_data().
 * - Writes store io->size bytes through cb->meta_write(). The visible code
 *   also tests DNET_CFG_NO_META around that path.
 *
 * Remaining path (its guarding condition is not visible):
 * - The key is taken from cmd->id.id.
 * - cb->meta_remove() is called with whether DNET_ATTR_DELETE_HISTORY is set
 *   in cmd->flags.
 * - The command is then passed on to cb->command_handler().
 */
160 int dnet_process_meta(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
)
162 struct dnet_node
*n
= st
->n
;
163 struct dnet_raw_id id
;
167 if (cmd
->cmd
== DNET_CMD_READ
|| cmd
->cmd
== DNET_CMD_WRITE
) {
168 if (cmd
->size
< sizeof(struct dnet_io_attr
)) {
169 dnet_log(n
, DNET_LOG_ERROR
,
170 "%s: wrong read attribute, size does not match "
171 "IO attribute size: size: %llu, must be: %zu.\n",
172 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
,
173 sizeof(struct dnet_io_attr
));
178 memcpy(id
.id
, io
->id
, DNET_ID_SIZE
);
183 err
= n
->cb
->meta_read(n
->cb
->command_private
, &id
, &data
);
186 err
= dnet_send_read_data(st
, cmd
, io
, data
, -1, io
->offset
, 0);
191 if (n
->flags
& DNET_CFG_NO_META
) {
/*
 * NOTE(review): io->size comes from the request. The visible code only checks
 * cmd->size >= sizeof(struct dnet_io_attr); it does not check that io->size
 * fits in the payload that follows the attribute. Confirm the hidden lines
 * validate it, otherwise meta_write() may read past the received buffer.
 */
198 err
= n
->cb
->meta_write(n
->cb
->command_private
, &id
, data
, io
->size
);
201 memcpy(id
.id
, cmd
->id
.id
, DNET_ID_SIZE
);
202 n
->cb
->meta_remove(n
->cb
->command_private
, &id
, !!(cmd
->flags
& DNET_ATTR_DELETE_HISTORY
));
203 err
= n
->cb
->command_handler(st
, n
->cb
->command_private
, cmd
, io
);
/*
 * Shared state of one CHECK run, passed as the private pointer to the
 * metadata iterator callbacks. It holds:
 * - the requesting state and command, used to send progress replies;
 * - the decoded check request;
 * - the parameters handed to dnet_check() / dnet_request_bulk_check().
 * NOTE(review): members referenced elsewhere (n, total, errors, completed)
 * are not visible in this excerpt.
 */
214 struct dnet_db_list_control
{
216 struct dnet_net_state
*st
;
217 struct dnet_cmd
*cmd
;
218 struct dnet_check_request
*req
;
219 struct dnet_check_params params
;
/*
 * dnet_meta_get_ts() - return, in seconds, the last-check timestamp stored in
 * the DNET_META_CHECK_STATUS entry of @mc, as located by dnet_meta_search().
 *
 * NOTE(review): dnet_convert_meta_check_status() is applied to the entry in
 * place. If it is a byte-order swap, calling this twice on the same container
 * would undo the conversion; confirm it is called only once per container.
 * The value returned when the entry is missing is not visible here.
 */
226 static long long dnet_meta_get_ts(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
229 struct dnet_meta_check_status
*c
;
231 m
= dnet_meta_search(n
, mc
, DNET_META_CHECK_STATUS
);
235 c
= (struct dnet_meta_check_status
*)m
->data
;
236 dnet_convert_meta_check_status(c
);
238 return (long long)c
->tm
.tsec
;
241 static int dnet_db_send_check_reply(struct dnet_db_list_control
*ctl
)
243 struct dnet_check_reply reply
;
245 memset(&reply
, 0, sizeof(reply
));
247 reply
.total
= atomic_read(&ctl
->total
);
248 reply
.errors
= atomic_read(&ctl
->errors
);
249 reply
.completed
= atomic_read(&ctl
->completed
);
251 dnet_convert_check_reply(&reply
);
252 return dnet_send_reply(ctl
->st
, ctl
->cmd
, &reply
, sizeof(reply
), 1);
/*
 * dnet_check_temp_db_alloc() - allocate a struct dnet_check_temp_db and open
 * a temporary eblob at "<path>/tmp_meta".
 *
 * The blob logs through the node's logger at EBLOB_LOG_NOTICE. The
 * reference count starts at 1; dnet_db_list() drops it with
 * dnet_check_temp_db_put().
 *
 * NOTE(review): temp_meta_path is a function-static buffer, so every call
 * reuses the same storage and ecfg.file points into it. If eblob_init() keeps
 * that pointer, a later call would change the path seen by an earlier blob,
 * and concurrent calls would race.
 * NOTE(review): the snprintf() result is not checked, so an over-long @path
 * is silently truncated.
 */
255 struct dnet_check_temp_db
* dnet_check_temp_db_alloc(struct dnet_node
*n
, char *path
)
257 static char temp_meta_path
[310];
258 struct eblob_config ecfg
;
259 struct dnet_check_temp_db
*db
;
261 db
= (struct dnet_check_temp_db
*)malloc(sizeof(struct dnet_check_temp_db
));
263 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate memory for temp meta eblob config\n");
267 snprintf(temp_meta_path
, sizeof(temp_meta_path
), "%s/tmp_meta", path
);
269 memset(&ecfg
, 0, sizeof(struct eblob_config
));
271 ecfg
.file
= temp_meta_path
;
273 db
->log
.log
= n
->log
->log
;
274 db
->log
.log_private
= n
->log
->log_private
;
275 db
->log
.log_level
= EBLOB_LOG_NOTICE
;
278 db
->b
= eblob_init(&ecfg
);
280 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialize temp meta eblob\n");
284 atomic_init(&db
->refcnt
, 1);
293 int dnet_db_iterate(struct eblob_backend
*b
, struct dnet_iterate_ctl
*dctl
)
295 struct eblob_iterate_control ctl
;
297 memset(&ctl
, 0, sizeof(ctl
));
299 ctl
.flags
= dctl
->flags
| EBLOB_ITERATE_FLAGS_ALL
;
300 ctl
.priv
= dctl
->callback_private
;
301 ctl
.iterator_cb
= dctl
->iterate_cb
;
302 ctl
.start_type
= ctl
.max_type
= EBLOB_TYPE_META
;
303 ctl
.blob_start
= dctl
->blob_start
;
304 ctl
.blob_num
= dctl
->blob_num
;
306 return eblob_iterate(b
, &ctl
);
/*
 * dnet_db_list_iter_init() - per-thread iterator setup for a CHECK run.
 *
 * Builds a struct dnet_bulk_array that holds one struct dnet_bulk_state per
 * network state. Each state gets its address, a mutex, a zero count and a
 * DNET_BULK_IDS_SIZE-entry id buffer.
 * - States are found by walking n->group_list under n->state_lock.
 * - The array grows in DNET_BULK_STATES_ALLOC_STEP steps.
 * - It is then sorted with dnet_compare_bulk_state and stored in
 *   *thread_priv.
 * - Each group is first compared with this node's own group. The action
 *   taken on a match is not visible; presumably that group is skipped.
 * - only_merge is computed from DNET_CHECK_MERGE; only its logging is
 *   visible here.
 * NOTE(review): bulk_array comes from malloc(), and bulk_array->num is read
 * before any visible initialisation. Confirm the hidden lines set it to 0.
 */
309 static int dnet_db_list_iter_init(struct eblob_iterate_control
*iter_ctl
, void **thread_priv
)
311 struct dnet_db_list_control
*ctl
= iter_ctl
->priv
;
312 struct dnet_node
*n
= ctl
->n
;
313 struct dnet_bulk_array
*bulk_array
= NULL
;
314 struct dnet_net_state
*st
;
315 struct dnet_group
*g
;
316 int only_merge
= !!(ctl
->req
->flags
& DNET_CHECK_MERGE
);
317 int bulk_array_tmp_num
;
320 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: only_merge=%d\n", only_merge
);
322 bulk_array
= malloc(sizeof(struct dnet_bulk_array
));
327 atomic_init(&bulk_array
->refcnt
, 0);
329 bulk_array_tmp_num
= DNET_BULK_STATES_ALLOC_STEP
;
331 bulk_array
->states
= NULL
;
332 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: allocating space for arrays, num=%d\n", bulk_array_tmp_num
);
334 bulk_array
->states
= (struct dnet_bulk_state
*)malloc(sizeof(struct dnet_bulk_state
) * bulk_array_tmp_num
);
335 if (!bulk_array
->states
) {
337 dnet_log(n
, DNET_LOG_ERROR
, "BULK: Failed to allocate buffer for bulk states array.\n");
341 pthread_mutex_lock(&n
->state_lock
);
342 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
343 if (g
->group_id
== n
->st
->idc
->group
->group_id
)
346 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
350 if (bulk_array
->num
== bulk_array_tmp_num
) {
351 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: reallocating space for arrays, num=%d\n", bulk_array_tmp_num
);
352 bulk_array_tmp_num
+= DNET_BULK_STATES_ALLOC_STEP
;
/*
 * NOTE(review): writing realloc()'s result straight into bulk_array->states
 * leaks the old array when realloc() fails. That includes every per-state
 * ids buffer and mutex already initialised in it. Use a temporary pointer.
 */
353 bulk_array
->states
= realloc(bulk_array
->states
, sizeof(struct dnet_bulk_state
) * bulk_array_tmp_num
);
354 if (!bulk_array
->states
) {
/*
 * NOTE(review): n->state_lock is held here. An unlock is visible only on the
 * ids-allocation failure path below; confirm the hidden lines release the
 * lock on this path too.
 */
356 dnet_log(n
, DNET_LOG_ERROR
, "BULK: Failed to reallocate buffer for bulk states array.\n");
361 memcpy(&bulk_array
->states
[bulk_array
->num
].addr
, &st
->addr
, sizeof(struct dnet_addr
));
362 pthread_mutex_init(&bulk_array
->states
[bulk_array
->num
].state_lock
, NULL
);
363 bulk_array
->states
[bulk_array
->num
].num
= 0;
364 bulk_array
->states
[bulk_array
->num
].ids
= NULL
;
366 bulk_array
->states
[bulk_array
->num
].ids
= malloc(sizeof(struct dnet_bulk_id
) * DNET_BULK_IDS_SIZE
);
367 if (!bulk_array
->states
[bulk_array
->num
].ids
) {
/* NOTE(review): the message below mentions the states array, but this path is the per-state ids allocation. */
369 dnet_log(n
, DNET_LOG_ERROR
, "BULK: Failed to reallocate buffer for bulk states array.\n");
370 pthread_mutex_unlock(&n
->state_lock
);
374 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: added state %s (%s)\n",
375 dnet_dump_id_str(st
->idc
->ids
[0].raw
.id
),
376 dnet_server_convert_dnet_addr(&st
->addr
));
380 pthread_mutex_unlock(&n
->state_lock
);
382 qsort(bulk_array
->states
, bulk_array
->num
, sizeof(struct dnet_bulk_state
), dnet_compare_bulk_state
);
385 *thread_priv
= bulk_array
;
/*
 * dnet_db_list_iter_free() - per-thread iterator teardown for a CHECK run.
 *
 * - Waits while bulk_array->refcnt is positive (the loop body is not
 *   visible).
 * - For every state with queued ids, sends them with
 *   dnet_request_bulk_check() and logs any failure.
 * - Frees each state's id buffer, and finally the states array.
 * NOTE(review): dnet_db_list_iter() tolerates a NULL thread_priv in
 * DNET_CHECK_MERGE mode, but the visible code here dereferences bulk_array
 * unconditionally. Confirm the hidden lines guard against NULL. Freeing
 * bulk_array itself and destroying the per-state mutexes are not visible
 * either.
 */
392 static int dnet_db_list_iter_free(struct eblob_iterate_control
*iter_ctl
, void **thread_priv
)
394 struct dnet_db_list_control
*ctl
= iter_ctl
->priv
;
395 struct dnet_node
*n
= ctl
->n
;
396 struct dnet_bulk_array
*bulk_array
= *thread_priv
;
401 while(atomic_read(&bulk_array
->refcnt
) > 0)
404 for (i
= 0; i
< bulk_array
->num
; ++i
) {
405 dnet_log(n
, DNET_LOG_DEBUG
, "CHECK: free: processing state %d %s: %d ids in this state\n",
406 i
, dnet_server_convert_dnet_addr(&bulk_array
->states
[i
].addr
), bulk_array
->states
[i
].num
);
408 if (bulk_array
->states
[i
].num
> 0) {
409 err
= dnet_request_bulk_check(n
, &bulk_array
->states
[i
], &ctl
->params
);
411 dnet_log(n
, DNET_LOG_ERROR
, "CHECK: dnet_request_bulk_check failed, state %s, err %d\n",
412 dnet_server_convert_dnet_addr(&bulk_array
->states
[i
].addr
), err
);
415 free(bulk_array
->states
[i
].ids
);
418 free(bulk_array
->states
);
/*
 * dnet_db_list_iter() - per-record callback of a CHECK run.
 *
 * For the key in dc->key (in group n->id.group_id) it decides whether the
 * record should be checked:
 * - It is skipped when a check-edge timestamp is set and the record was
 *   last checked after it.
 * - Per the source comments, it is also skipped when its create/update time
 *   lies outside [updatestamp_start, updatestamp_stop].
 *
 * should_be_merged is true when dnet_state_get_first() returns a state, i.e.
 * the key belongs to another node. Unless DNET_CHECK_DRY_RUN is set, the key
 * is passed to dnet_check() together with the per-thread bulk_array. The
 * completed/errors/total counters in @p are updated. Every 30000 records a
 * progress reply is sent and the counters are logged.
 * NOTE(review): the "updated between" fields of the NOTICE log are printed
 * with %lld but passed as unsigned long long.
 */
427 static int dnet_db_list_iter(struct eblob_disk_control
*dc
, struct eblob_ram_control
*rc
,
428 void *data
, void *p
, void *thread_priv
)
430 struct dnet_db_list_control
*ctl
= p
;
431 struct dnet_node
*n
= ctl
->n
;
432 struct dnet_meta_container mc
;
433 struct dnet_net_state
*tmp
;
434 struct dnet_bulk_array
*bulk_array
;
435 long long check_ts
, check_edge_ts
= ctl
->req
->timestamp
, update_ts
;
436 char check_time
[64], check_edge_time
[64], update_start
[64], update_stop
[64], update_time
[64];
438 int will_check
, should_be_merged
;
439 int send_check_reply
= 1;
442 bulk_array
= thread_priv
;
443 if (!bulk_array
&& !(ctl
->req
->flags
& DNET_CHECK_MERGE
)) {
444 dnet_log(n
, DNET_LOG_ERROR
, "CHECK: bulk_array is not initialized and check type is not MERGE_ONLY\n");
/*
 * NOTE(review): check_edge_ts (and check_ts / update_ts below) are long long,
 * and the request's update stamps are printed as unsigned long long. Passing
 * their address as time_t * is only valid where time_t has the same size and
 * representation, and it violates strict aliasing. Copying the value into a
 * time_t local before calling localtime_r() would be portable.
 */
452 localtime_r((time_t *)&check_edge_ts
, &tm
);
453 strftime(check_edge_time
, sizeof(check_edge_time
), "%F %R:%S %Z", &tm
);
455 snprintf(check_edge_time
, sizeof(check_edge_time
), "no-check-edge");
458 if (ctl
->req
->updatestamp_start
) {
459 localtime_r((time_t *)&ctl
->req
->updatestamp_start
, &tm
);
460 strftime(update_start
, sizeof(update_start
), "%F %R:%S %Z", &tm
);
462 snprintf(update_start
, sizeof(update_start
), "all");
/*
 * NOTE(review): this writes into the shared request from an iterator
 * callback. dnet_db_list() runs the iterator with req.thread_num threads
 * sharing the same control structure, so several threads may store here
 * concurrently (a data race). Setting the default once before iteration
 * starts would avoid it.
 */
464 if (!ctl
->req
->updatestamp_stop
)
465 ctl
->req
->updatestamp_stop
= time(NULL
);
466 localtime_r((time_t *)&ctl
->req
->updatestamp_stop
, &tm
);
467 strftime(update_stop
, sizeof(update_stop
), "%F %R:%S %Z", &tm
);
470 dnet_setup_id(&mc
.id
, n
->id
.group_id
, dc
->key
.id
);
473 * Use group ID field to specify whether we should check number of copies
474 * or merge transaction with other history log in the storage
476 * tmp == NULL means this key belongs to given node and we should check
477 * number of its copies in the storage. If state is not NULL then given
478 * key must be moved to another machine and potentially merged with data
481 tmp
= dnet_state_get_first(n
, &mc
.id
);
482 should_be_merged
= (tmp
!= NULL
);
486 * If timestamp is specified check should be performed only to files
487 * that was not checked since that timestamp
489 check_ts
= dnet_meta_get_ts(n
, &mc
);
490 will_check
= !(check_edge_ts
&& (check_ts
> check_edge_ts
));
493 * If start/stop update stamp is specified check should be performed only to files
494 * that were created in that interval (inclusive)
498 struct dnet_meta_update mu
;
500 /* only try to check creation/update timestamp if it is really present in database */
501 if (dnet_get_meta_update(n
, &mc
, &mu
)) {
502 update_ts
= mu
.tm
.tsec
;
505 if ((mu
.tm
.tsec
>= ctl
->req
->updatestamp_start
) && (mu
.tm
.tsec
<= ctl
->req
->updatestamp_stop
))
510 if (will_check
&& !should_be_merged
&& (ctl
->req
->flags
& DNET_CHECK_MERGE
)) {
514 if (n
->log
->log_level
> DNET_LOG_NOTICE
) {
515 localtime_r((time_t *)&check_ts
, &tm
);
516 strftime(check_time
, sizeof(check_time
), "%F %R:%S %Z", &tm
);
/*
 * NOTE(review): in the visible code, update_ts is assigned only when
 * dnet_get_meta_update() succeeds. Confirm it is initialised on the other
 * path before it is formatted here.
 */
518 localtime_r((time_t *)&update_ts
, &tm
);
519 strftime(update_time
, sizeof(update_time
), "%F %R:%S %Z", &tm
);
521 dnet_log_raw(n
, DNET_LOG_NOTICE
, "CHECK: start key: %s, "
522 "last check: %lld [%s], "
523 "last check before: %lld [%s], "
524 "created/updated: %lld [%s], "
525 "updated between: %lld [%s] - %lld [%s], "
526 "will check: %d, should_be_merged: %d, dry: %d, flags: %x, size: %u.\n",
527 dnet_dump_id(&mc
.id
),
528 check_ts
, check_time
,
529 check_edge_ts
, check_edge_time
,
530 update_ts
, update_time
,
531 (unsigned long long)ctl
->req
->updatestamp_start
, update_start
,
532 (unsigned long long)ctl
->req
->updatestamp_stop
, update_stop
,
533 will_check
, should_be_merged
,
534 !!(ctl
->req
->flags
& DNET_CHECK_DRY_RUN
), ctl
->req
->flags
, mc
.size
);
539 if (!(ctl
->req
->flags
& DNET_CHECK_DRY_RUN
)) {
540 err
= dnet_check(n
, &mc
, bulk_array
, should_be_merged
, &ctl
->params
);
542 dnet_log_raw(n
, DNET_LOG_NOTICE
, "CHECK: complete key: %s, merge: %d, err: %d\n",
543 dnet_dump_id(&mc
.id
), should_be_merged
, err
);
547 atomic_inc(&ctl
->completed
);
549 atomic_inc(&ctl
->errors
);
/*
 * Every 30000 records, send a progress reply and log the counters.
 * NOTE(review): send_check_reply is a local initialised to 1 on every call,
 * so clearing it after a failed reply has no effect on later records. If the
 * intent is to stop sending progress replies after the first failure, the
 * flag has to live in struct dnet_db_list_control.
 */
553 if ((atomic_inc(&ctl
->total
) % 30000) == 0) {
554 if (send_check_reply
) {
555 if (dnet_db_send_check_reply(ctl
))
556 send_check_reply
= 0;
559 dnet_log(n
, DNET_LOG_INFO
, "CHECK: total: %d, completed: %d, errors: %d\n",
560 atomic_read(&ctl
->total
), atomic_read(&ctl
->completed
), atomic_read(&ctl
->errors
));
566 int dnet_db_list(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
)
568 struct dnet_node
*n
= st
->n
;
569 struct dnet_db_list_control ctl
;
570 struct dnet_check_request
*r
, req
;
575 if (n
->check_in_progress
)
578 if (cmd
->size
< sizeof(struct dnet_check_request
)) {
579 dnet_log(n
, DNET_LOG_ERROR
, "%s: CHECK: invalid check request size %llu, must be %zu\n",
580 dnet_dump_id(&cmd
->id
), (unsigned long long)cmd
->size
, sizeof(struct dnet_check_request
));
584 r
= (struct dnet_check_request
*)(cmd
+ 1);
585 dnet_convert_check_request(r
);
587 n
->check_in_progress
= 1;
592 memcpy(&req
, r
, sizeof(req
));
594 memset(&ctl
, 0, sizeof(struct dnet_db_list_control
));
596 atomic_init(&ctl
.completed
, 0);
597 atomic_init(&ctl
.errors
, 0);
598 atomic_init(&ctl
.total
, 0);
604 ctl
.params
.db
= dnet_check_temp_db_alloc(n
, n
->temp_meta_env
);
605 if (!ctl
.params
.db
) {
611 localtime_r((time_t *)&req
.timestamp
, &tm
);
612 strftime(ctl_time
, sizeof(ctl_time
), "%F %R:%S %Z", &tm
);
614 snprintf(ctl_time
, sizeof(ctl_time
), "all records");
620 dnet_log(n
, DNET_LOG_INFO
, "CHECK: Started %u checking threads, recovering %llu transactions, "
621 "which started before %s: merge: %d, full: %d, dry: %d.\n",
622 req
.thread_num
, (unsigned long long)req
.obj_num
, ctl_time
,
623 !!(req
.flags
& DNET_CHECK_MERGE
), !!(req
.flags
& DNET_CHECK_FULL
),
624 !!(req
.flags
& DNET_CHECK_DRY_RUN
));
628 char str
[req
.group_num
*36+1], *ptr
;
632 groups
= (int *)((char *)(r
) + sizeof(struct dnet_check_request
) + r
->obj_num
* sizeof(struct dnet_id
));
636 for (i
= 0; i
< req
.group_num
; ++i
) {
637 err
= snprintf(ptr
, rest
, "%d:", groups
[i
]);
647 dnet_log(n
, DNET_LOG_INFO
, "CHECK: groups will be overrided with: %s\n", str
);
649 ctl
.params
.group_num
= req
.group_num
;
650 ctl
.params
.groups
= groups
;
653 dnet_ioprio_set(dnet_get_id(), n
->bg_ionice_class
, n
->bg_ionice_prio
);
655 if (req
.obj_num
> 0) {
656 struct dnet_id
*ids
= (struct dnet_id
*)(r
+ 1);
657 struct eblob_iterate_control iter_ctl
;
658 struct eblob_disk_control dc
;
659 struct eblob_ram_control rc
;
660 struct dnet_raw_id id
;
666 memset(&dc
, 0, sizeof(struct eblob_disk_control
));
667 memset(&rc
, 0, sizeof(struct eblob_ram_control
));
669 iter_ctl
.thread_num
= 1;
670 iter_ctl
.priv
= &ctl
;
671 dnet_db_list_iter_init(&iter_ctl
, &priv
);
673 for (i
= 0; i
< req
.obj_num
; ++i
) {
674 memcpy(&id
.id
, &ids
[i
].id
, DNET_ID_SIZE
);
675 err
= n
->cb
->meta_read(n
->cb
->command_private
, &id
, &data
);
678 memcpy(&dc
.key
.id
, &ids
[i
].id
, DNET_ID_SIZE
);
679 err
= dnet_db_list_iter(&dc
, &rc
, data
, &ctl
, priv
);
682 dnet_db_list_iter_free(&iter_ctl
, &priv
);
685 struct dnet_iterate_ctl dctl
;
687 memset(&dctl
, 0, sizeof(struct dnet_iterate_ctl
));
689 dctl
.iterate_private
= n
->cb
->command_private
;
691 dctl
.blob_start
= req
.blob_start
;
692 dctl
.blob_num
= req
.blob_num
;
693 dctl
.callback_private
= &ctl
;
695 dctl
.iterate_cb
.iterator
= dnet_db_list_iter
;
696 dctl
.iterate_cb
.iterator_init
= dnet_db_list_iter_init
;
697 dctl
.iterate_cb
.iterator_free
= dnet_db_list_iter_free
;
698 dctl
.iterate_cb
.thread_num
= req
.thread_num
;
700 err
= n
->cb
->meta_iterate(&dctl
);
703 if(r
->flags
& DNET_CHECK_MERGE
) {
704 dnet_counter_set(n
, DNET_CNTR_NODE_LAST_MERGE
, 0, atomic_read(&ctl
.completed
));
705 dnet_counter_set(n
, DNET_CNTR_NODE_LAST_MERGE
, 1, atomic_read(&ctl
.errors
));
708 dnet_db_send_check_reply(&ctl
);
710 dnet_check_temp_db_put(ctl
.params
.db
);
713 n
->check_in_progress
= 0;