Only work with 0.9.4 cocaine
[elliptics.git] / library / metadb.c
blobef25f57ee8b26039f701af9ee9775ea786d5f970
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/mman.h>
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <unistd.h>
27 #include "elliptics.h"
28 #include "elliptics/interface.h"
30 ssize_t dnet_db_read_raw(struct eblob_backend *b, struct dnet_raw_id *id, void **datap)
32 struct eblob_key key;
33 void *data;
34 uint64_t offset, size;
35 int fd, err;
37 memcpy(key.id, id->id, DNET_ID_SIZE);
39 err = eblob_read(b, &key, &fd, &offset, &size, EBLOB_TYPE_META);
40 if (err) {
41 goto err_out_exit;
44 data = malloc(size);
45 if (!data) {
46 err = -ENOMEM;
47 goto err_out_exit;
50 err = pread(fd, data, size, offset);
51 if (err != (int)size) {
52 err = -errno;
53 goto err_out_free;
56 *datap = data;
58 return size;
60 err_out_free:
61 free(data);
62 err_out_exit:
63 return err;
66 int dnet_db_write_raw(struct eblob_backend *b, struct dnet_raw_id *id, void *data, unsigned int size)
68 struct eblob_write_control wc;
69 struct eblob_key key;
70 int err;
72 memset(&wc, 0, sizeof(struct eblob_write_control));
73 memcpy(key.id, id->id, DNET_ID_SIZE);
74 err = eblob_write(b, &key, data, 0, size, BLOB_DISK_CTL_NOCSUM, EBLOB_TYPE_META);
75 if (err)
76 goto err_out_exit;
78 wc.offset = 0;
79 wc.size = size;
80 wc.flags = BLOB_DISK_CTL_NOCSUM;
81 wc.type = EBLOB_TYPE_META;
82 err = eblob_write_commit(b, &key, NULL, 0, &wc);
84 err_out_exit:
85 return err;
88 static int dnet_db_remove_direct(struct eblob_backend *b, struct dnet_raw_id *id)
90 struct eblob_key key;
92 memcpy(key.id, id->id, EBLOB_ID_SIZE);
93 return eblob_remove(b, &key, EBLOB_TYPE_META);
96 int dnet_db_remove_raw(struct eblob_backend *b, struct dnet_raw_id *id, int real_del)
98 if (real_del) {
99 dnet_db_remove_direct(b, id);
100 return 1;
103 return dnet_update_ts_metadata(b, id, DNET_IO_FLAGS_REMOVED, 0);
106 int dnet_update_ts_metadata(struct eblob_backend *b, struct dnet_raw_id *id, uint64_t flags_set, uint64_t flags_clear)
108 int err = 0;
109 struct dnet_meta_container mc;
110 struct dnet_meta *m;
112 memset(&mc, 0, sizeof(struct dnet_meta_container));
114 err = dnet_db_read_raw(b, id, &mc.data);
115 if (err < 0) {
116 m = malloc(sizeof(struct dnet_meta) + sizeof(struct dnet_meta_update));
117 if (!m) {
118 err = -ENOMEM;
119 goto err_out_exit;
121 dnet_create_meta_update(m, NULL, flags_set, flags_clear);
123 mc.data = m;
124 mc.size = sizeof(struct dnet_meta_update) + sizeof(struct dnet_meta);
125 } else {
126 err = dnet_update_ts_metadata_raw(&mc, flags_set, flags_clear);
127 if (err) {
128 /* broken metadata, rewrite it */
129 if (err != -ENOENT) {
130 free(mc.data);
132 mc.data = NULL;
133 mc.size = 0;
136 mc.data = realloc(mc.data, mc.size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_update));
137 if (!mc.data) {
138 err = -ENOMEM;
139 goto err_out_exit;
142 m = mc.data + mc.size;
143 mc.size += sizeof(struct dnet_meta) + sizeof(struct dnet_meta_update);
145 dnet_create_meta_update(m, NULL, flags_set, flags_clear);
149 err = dnet_db_write_raw(b, id, mc.data, mc.size);
150 if (err) {
151 goto err_out_free;
154 err_out_free:
155 free(mc.data);
156 err_out_exit:
157 return err;
160 int dnet_process_meta(struct dnet_net_state *st, struct dnet_cmd *cmd, struct dnet_io_attr *io)
162 struct dnet_node *n = st->n;
163 struct dnet_raw_id id;
164 void *data;
165 int err;
167 if (cmd->cmd == DNET_CMD_READ || cmd->cmd == DNET_CMD_WRITE) {
168 if (cmd->size < sizeof(struct dnet_io_attr)) {
169 dnet_log(n, DNET_LOG_ERROR,
170 "%s: wrong read attribute, size does not match "
171 "IO attribute size: size: %llu, must be: %zu.\n",
172 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size,
173 sizeof(struct dnet_io_attr));
174 err = -EINVAL;
175 goto err_out_exit;
178 memcpy(id.id, io->id, DNET_ID_SIZE);
181 switch (cmd->cmd) {
182 case DNET_CMD_READ:
183 err = n->cb->meta_read(n->cb->command_private, &id, &data);
184 if (err > 0) {
185 io->size = err;
186 err = dnet_send_read_data(st, cmd, io, data, -1, io->offset, 0);
187 free(data);
189 break;
190 case DNET_CMD_WRITE:
191 if (n->flags & DNET_CFG_NO_META) {
192 err = 0;
193 break;
196 data = io + 1;
198 err = n->cb->meta_write(n->cb->command_private, &id, data, io->size);
199 break;
200 case DNET_CMD_DEL:
201 memcpy(id.id, cmd->id.id, DNET_ID_SIZE);
202 n->cb->meta_remove(n->cb->command_private, &id, !!(cmd->flags & DNET_ATTR_DELETE_HISTORY));
203 err = n->cb->command_handler(st, n->cb->command_private, cmd, io);
204 break;
205 default:
206 err = -EINVAL;
207 break;
210 err_out_exit:
211 return err;
214 struct dnet_db_list_control {
215 struct dnet_node *n;
216 struct dnet_net_state *st;
217 struct dnet_cmd *cmd;
218 struct dnet_check_request *req;
219 struct dnet_check_params params;
221 atomic_t completed;
222 atomic_t errors;
223 atomic_t total;
226 static long long dnet_meta_get_ts(struct dnet_node *n, struct dnet_meta_container *mc)
228 struct dnet_meta *m;
229 struct dnet_meta_check_status *c;
231 m = dnet_meta_search(n, mc, DNET_META_CHECK_STATUS);
232 if (!m)
233 return -ENOENT;
235 c = (struct dnet_meta_check_status *)m->data;
236 dnet_convert_meta_check_status(c);
238 return (long long)c->tm.tsec;
241 static int dnet_db_send_check_reply(struct dnet_db_list_control *ctl)
243 struct dnet_check_reply reply;
245 memset(&reply, 0, sizeof(reply));
247 reply.total = atomic_read(&ctl->total);
248 reply.errors = atomic_read(&ctl->errors);
249 reply.completed = atomic_read(&ctl->completed);
251 dnet_convert_check_reply(&reply);
252 return dnet_send_reply(ctl->st, ctl->cmd, &reply, sizeof(reply), 1);
255 struct dnet_check_temp_db * dnet_check_temp_db_alloc(struct dnet_node *n, char *path)
257 static char temp_meta_path[310];
258 struct eblob_config ecfg;
259 struct dnet_check_temp_db *db;
261 db = (struct dnet_check_temp_db *)malloc(sizeof(struct dnet_check_temp_db));
262 if (!db) {
263 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate memory for temp meta eblob config\n");
264 return NULL;
267 snprintf(temp_meta_path, sizeof(temp_meta_path), "%s/tmp_meta", path);
269 memset(&ecfg, 0, sizeof(struct eblob_config));
271 ecfg.file = temp_meta_path;
273 db->log.log = n->log->log;
274 db->log.log_private = n->log->log_private;
275 db->log.log_level = EBLOB_LOG_NOTICE;
276 ecfg.log = &db->log;
278 db->b = eblob_init(&ecfg);
279 if (!db->b) {
280 dnet_log(n, DNET_LOG_ERROR, "Failed to initialize temp meta eblob\n");
281 goto err_out_free;
284 atomic_init(&db->refcnt, 1);
286 return db;
288 err_out_free:
289 free(db);
290 return NULL;
293 int dnet_db_iterate(struct eblob_backend *b, struct dnet_iterate_ctl *dctl)
295 struct eblob_iterate_control ctl;
297 memset(&ctl, 0, sizeof(ctl));
299 ctl.flags = dctl->flags | EBLOB_ITERATE_FLAGS_ALL;
300 ctl.priv = dctl->callback_private;
301 ctl.iterator_cb = dctl->iterate_cb;
302 ctl.start_type = ctl.max_type = EBLOB_TYPE_META;
303 ctl.blob_start = dctl->blob_start;
304 ctl.blob_num = dctl->blob_num;
306 return eblob_iterate(b, &ctl);
309 static int dnet_db_list_iter_init(struct eblob_iterate_control *iter_ctl, void **thread_priv)
311 struct dnet_db_list_control *ctl = iter_ctl->priv;
312 struct dnet_node *n = ctl->n;
313 struct dnet_bulk_array *bulk_array = NULL;
314 struct dnet_net_state *st;
315 struct dnet_group *g;
316 int only_merge = !!(ctl->req->flags & DNET_CHECK_MERGE);
317 int bulk_array_tmp_num;
318 int err = 0;
320 dnet_log(n, DNET_LOG_DEBUG, "BULK: only_merge=%d\n", only_merge);
321 if (!only_merge) {
322 bulk_array = malloc(sizeof(struct dnet_bulk_array));
323 if (!bulk_array) {
324 err = -ENOMEM;
325 goto err_out_exit;
327 atomic_init(&bulk_array->refcnt, 0);
329 bulk_array_tmp_num = DNET_BULK_STATES_ALLOC_STEP;
330 bulk_array->num = 0;
331 bulk_array->states = NULL;
332 dnet_log(n, DNET_LOG_DEBUG, "BULK: allocating space for arrays, num=%d\n", bulk_array_tmp_num);
334 bulk_array->states = (struct dnet_bulk_state *)malloc(sizeof(struct dnet_bulk_state) * bulk_array_tmp_num);
335 if (!bulk_array->states) {
336 err = -ENOMEM;
337 dnet_log(n, DNET_LOG_ERROR, "BULK: Failed to allocate buffer for bulk states array.\n");
338 goto err_out_exit;
341 pthread_mutex_lock(&n->state_lock);
342 list_for_each_entry(g, &n->group_list, group_entry) {
343 if (g->group_id == n->st->idc->group->group_id)
344 continue;
346 list_for_each_entry(st, &g->state_list, state_entry) {
347 if (st == n->st)
348 continue;
350 if (bulk_array->num == bulk_array_tmp_num) {
351 dnet_log(n, DNET_LOG_DEBUG, "BULK: reallocating space for arrays, num=%d\n", bulk_array_tmp_num);
352 bulk_array_tmp_num += DNET_BULK_STATES_ALLOC_STEP;
353 bulk_array->states = realloc(bulk_array->states, sizeof(struct dnet_bulk_state) * bulk_array_tmp_num);
354 if (!bulk_array->states) {
355 err = -ENOMEM;
356 dnet_log(n, DNET_LOG_ERROR, "BULK: Failed to reallocate buffer for bulk states array.\n");
357 goto err_out_exit;
361 memcpy(&bulk_array->states[bulk_array->num].addr, &st->addr, sizeof(struct dnet_addr));
362 pthread_mutex_init(&bulk_array->states[bulk_array->num].state_lock, NULL);
363 bulk_array->states[bulk_array->num].num = 0;
364 bulk_array->states[bulk_array->num].ids = NULL;
366 bulk_array->states[bulk_array->num].ids = malloc(sizeof(struct dnet_bulk_id) * DNET_BULK_IDS_SIZE);
367 if (!bulk_array->states[bulk_array->num].ids) {
368 err = -ENOMEM;
369 dnet_log(n, DNET_LOG_ERROR, "BULK: Failed to reallocate buffer for bulk states array.\n");
370 pthread_mutex_unlock(&n->state_lock);
371 goto err_out_exit;
374 dnet_log(n, DNET_LOG_DEBUG, "BULK: added state %s (%s)\n",
375 dnet_dump_id_str(st->idc->ids[0].raw.id),
376 dnet_server_convert_dnet_addr(&st->addr));
377 bulk_array->num++;
380 pthread_mutex_unlock(&n->state_lock);
382 qsort(bulk_array->states, bulk_array->num, sizeof(struct dnet_bulk_state), dnet_compare_bulk_state);
385 *thread_priv = bulk_array;
386 return 0;
388 err_out_exit:
389 return err;
392 static int dnet_db_list_iter_free(struct eblob_iterate_control *iter_ctl, void **thread_priv)
394 struct dnet_db_list_control *ctl = iter_ctl->priv;
395 struct dnet_node *n = ctl->n;
396 struct dnet_bulk_array *bulk_array = *thread_priv;
397 int err;
398 int i;
400 if (bulk_array) {
401 while(atomic_read(&bulk_array->refcnt) > 0)
402 sleep(1);
404 for (i = 0; i < bulk_array->num; ++i) {
405 dnet_log(n, DNET_LOG_DEBUG, "CHECK: free: processing state %d %s: %d ids in this state\n",
406 i, dnet_server_convert_dnet_addr(&bulk_array->states[i].addr), bulk_array->states[i].num);
408 if (bulk_array->states[i].num > 0) {
409 err = dnet_request_bulk_check(n, &bulk_array->states[i], &ctl->params);
410 if (err) {
411 dnet_log(n, DNET_LOG_ERROR, "CHECK: dnet_request_bulk_check failed, state %s, err %d\n",
412 dnet_server_convert_dnet_addr(&bulk_array->states[i].addr), err);
415 free(bulk_array->states[i].ids);
418 free(bulk_array->states);
419 free(bulk_array);
422 *thread_priv = NULL;
424 return 0;
427 static int dnet_db_list_iter(struct eblob_disk_control *dc, struct eblob_ram_control *rc,
428 void *data, void *p, void *thread_priv)
430 struct dnet_db_list_control *ctl = p;
431 struct dnet_node *n = ctl->n;
432 struct dnet_meta_container mc;
433 struct dnet_net_state *tmp;
434 struct dnet_bulk_array *bulk_array;
435 long long check_ts, check_edge_ts = ctl->req->timestamp, update_ts;
436 char check_time[64], check_edge_time[64], update_start[64], update_stop[64], update_time[64];
437 struct tm tm;
438 int will_check, should_be_merged;
439 int send_check_reply = 1;
440 int err = 0;
442 bulk_array = thread_priv;
443 if (!bulk_array && !(ctl->req->flags & DNET_CHECK_MERGE)) {
444 dnet_log(n, DNET_LOG_ERROR, "CHECK: bulk_array is not initialized and check type is not MERGE_ONLY\n");
445 return -ENOMEM;
448 mc.data = data;
449 mc.size = rc->size;
451 if (check_edge_ts) {
452 localtime_r((time_t *)&check_edge_ts, &tm);
453 strftime(check_edge_time, sizeof(check_edge_time), "%F %R:%S %Z", &tm);
454 } else {
455 snprintf(check_edge_time, sizeof(check_edge_time), "no-check-edge");
458 if (ctl->req->updatestamp_start) {
459 localtime_r((time_t *)&ctl->req->updatestamp_start, &tm);
460 strftime(update_start, sizeof(update_start), "%F %R:%S %Z", &tm);
461 } else {
462 snprintf(update_start, sizeof(update_start), "all");
464 if (!ctl->req->updatestamp_stop)
465 ctl->req->updatestamp_stop = time(NULL);
466 localtime_r((time_t *)&ctl->req->updatestamp_stop, &tm);
467 strftime(update_stop, sizeof(update_stop), "%F %R:%S %Z", &tm);
470 dnet_setup_id(&mc.id, n->id.group_id, dc->key.id);
473 * Use group ID field to specify whether we should check number of copies
474 * or merge transaction with other history log in the storage
476 * tmp == NULL means this key belongs to given node and we should check
477 * number of its copies in the storage. If state is not NULL then given
478 * key must be moved to another machine and potentially merged with data
479 * present there
481 tmp = dnet_state_get_first(n, &mc.id);
482 should_be_merged = (tmp != NULL);
483 dnet_state_put(tmp);
486 * If timestamp is specified check should be performed only to files
487 * that was not checked since that timestamp
489 check_ts = dnet_meta_get_ts(n, &mc);
490 will_check = !(check_edge_ts && (check_ts > check_edge_ts));
493 * If start/stop update stamp is specified check should be performed only to files
494 * that were created in that interval (inclusive)
496 update_ts = 0;
497 if (will_check) {
498 struct dnet_meta_update mu;
500 /* only try to check creation/update timestamp if it is really present in database */
501 if (dnet_get_meta_update(n, &mc, &mu)) {
502 update_ts = mu.tm.tsec;
504 will_check = 0;
505 if ((mu.tm.tsec >= ctl->req->updatestamp_start) && (mu.tm.tsec <= ctl->req->updatestamp_stop))
506 will_check = 1;
510 if (will_check && !should_be_merged && (ctl->req->flags & DNET_CHECK_MERGE)) {
511 will_check = 0;
514 if (n->log->log_level > DNET_LOG_NOTICE) {
515 localtime_r((time_t *)&check_ts, &tm);
516 strftime(check_time, sizeof(check_time), "%F %R:%S %Z", &tm);
518 localtime_r((time_t *)&update_ts, &tm);
519 strftime(update_time, sizeof(update_time), "%F %R:%S %Z", &tm);
521 dnet_log_raw(n, DNET_LOG_NOTICE, "CHECK: start key: %s, "
522 "last check: %lld [%s], "
523 "last check before: %lld [%s], "
524 "created/updated: %lld [%s], "
525 "updated between: %lld [%s] - %lld [%s], "
526 "will check: %d, should_be_merged: %d, dry: %d, flags: %x, size: %u.\n",
527 dnet_dump_id(&mc.id),
528 check_ts, check_time,
529 check_edge_ts, check_edge_time,
530 update_ts, update_time,
531 (unsigned long long)ctl->req->updatestamp_start, update_start,
532 (unsigned long long)ctl->req->updatestamp_stop, update_stop,
533 will_check, should_be_merged,
534 !!(ctl->req->flags & DNET_CHECK_DRY_RUN), ctl->req->flags, mc.size);
537 if (will_check) {
538 err = 0;
539 if (!(ctl->req->flags & DNET_CHECK_DRY_RUN)) {
540 err = dnet_check(n, &mc, bulk_array, should_be_merged, &ctl->params);
542 dnet_log_raw(n, DNET_LOG_NOTICE, "CHECK: complete key: %s, merge: %d, err: %d\n",
543 dnet_dump_id(&mc.id), should_be_merged, err);
546 if (!err) {
547 atomic_inc(&ctl->completed);
548 } else {
549 atomic_inc(&ctl->errors);
553 if ((atomic_inc(&ctl->total) % 30000) == 0) {
554 if (send_check_reply) {
555 if (dnet_db_send_check_reply(ctl))
556 send_check_reply = 0;
559 dnet_log(n, DNET_LOG_INFO, "CHECK: total: %d, completed: %d, errors: %d\n",
560 atomic_read(&ctl->total), atomic_read(&ctl->completed), atomic_read(&ctl->errors));
563 return err;
566 int dnet_db_list(struct dnet_net_state *st, struct dnet_cmd *cmd)
568 struct dnet_node *n = st->n;
569 struct dnet_db_list_control ctl;
570 struct dnet_check_request *r, req;
571 char ctl_time[64];
572 struct tm tm;
573 int err = 0;
575 if (n->check_in_progress)
576 return -EINPROGRESS;
578 if (cmd->size < sizeof(struct dnet_check_request)) {
579 dnet_log(n, DNET_LOG_ERROR, "%s: CHECK: invalid check request size %llu, must be %zu\n",
580 dnet_dump_id(&cmd->id), (unsigned long long)cmd->size, sizeof(struct dnet_check_request));
581 return -EINVAL;
584 r = (struct dnet_check_request *)(cmd + 1);
585 dnet_convert_check_request(r);
587 n->check_in_progress = 1;
589 if (!r->thread_num)
590 r->thread_num = 50;
592 memcpy(&req, r, sizeof(req));
594 memset(&ctl, 0, sizeof(struct dnet_db_list_control));
596 atomic_init(&ctl.completed, 0);
597 atomic_init(&ctl.errors, 0);
598 atomic_init(&ctl.total, 0);
600 ctl.n = n;
601 ctl.st = st;
602 ctl.cmd = cmd;
603 ctl.req = &req;
604 ctl.params.db = dnet_check_temp_db_alloc(n, n->temp_meta_env);
605 if (!ctl.params.db) {
606 err = -ENOMEM;
607 goto err_out_exit;
610 if (req.timestamp) {
611 localtime_r((time_t *)&req.timestamp, &tm);
612 strftime(ctl_time, sizeof(ctl_time), "%F %R:%S %Z", &tm);
613 } else {
614 snprintf(ctl_time, sizeof(ctl_time), "all records");
617 if (req.obj_num > 0)
618 req.thread_num = 1;
620 dnet_log(n, DNET_LOG_INFO, "CHECK: Started %u checking threads, recovering %llu transactions, "
621 "which started before %s: merge: %d, full: %d, dry: %d.\n",
622 req.thread_num, (unsigned long long)req.obj_num, ctl_time,
623 !!(req.flags & DNET_CHECK_MERGE), !!(req.flags & DNET_CHECK_FULL),
624 !!(req.flags & DNET_CHECK_DRY_RUN));
626 if (req.group_num) {
627 int *groups;
628 char str[req.group_num*36+1], *ptr;
629 int rest;
630 uint32_t i;
632 groups = (int *)((char *)(r) + sizeof(struct dnet_check_request) + r->obj_num * sizeof(struct dnet_id));
634 ptr = str;
635 rest = sizeof(ptr);
636 for (i = 0; i < req.group_num; ++i) {
637 err = snprintf(ptr, rest, "%d:", groups[i]);
638 if (err > rest)
639 break;
641 rest -= err;
642 ptr += err;
645 *(--ptr) = '\0';
647 dnet_log(n, DNET_LOG_INFO, "CHECK: groups will be overrided with: %s\n", str);
649 ctl.params.group_num = req.group_num;
650 ctl.params.groups = groups;
653 dnet_ioprio_set(dnet_get_id(), n->bg_ionice_class, n->bg_ionice_prio);
655 if (req.obj_num > 0) {
656 struct dnet_id *ids = (struct dnet_id *)(r + 1);
657 struct eblob_iterate_control iter_ctl;
658 struct eblob_disk_control dc;
659 struct eblob_ram_control rc;
660 struct dnet_raw_id id;
661 void *data;
662 void *priv = NULL;
663 int err;
664 uint32_t i;
666 memset(&dc, 0, sizeof(struct eblob_disk_control));
667 memset(&rc, 0, sizeof(struct eblob_ram_control));
669 iter_ctl.thread_num = 1;
670 iter_ctl.priv = &ctl;
671 dnet_db_list_iter_init(&iter_ctl, &priv);
673 for (i = 0; i < req.obj_num; ++i) {
674 memcpy(&id.id, &ids[i].id, DNET_ID_SIZE);
675 err = n->cb->meta_read(n->cb->command_private, &id, &data);
676 if (err > 0) {
677 rc.size = err;
678 memcpy(&dc.key.id, &ids[i].id, DNET_ID_SIZE);
679 err = dnet_db_list_iter(&dc, &rc, data, &ctl, priv);
682 dnet_db_list_iter_free(&iter_ctl, &priv);
684 } else {
685 struct dnet_iterate_ctl dctl;
687 memset(&dctl, 0, sizeof(struct dnet_iterate_ctl));
689 dctl.iterate_private = n->cb->command_private;
690 dctl.flags = 0;
691 dctl.blob_start = req.blob_start;
692 dctl.blob_num = req.blob_num;
693 dctl.callback_private = &ctl;
695 dctl.iterate_cb.iterator = dnet_db_list_iter;
696 dctl.iterate_cb.iterator_init = dnet_db_list_iter_init;
697 dctl.iterate_cb.iterator_free = dnet_db_list_iter_free;
698 dctl.iterate_cb.thread_num = req.thread_num;
700 err = n->cb->meta_iterate(&dctl);
703 if(r->flags & DNET_CHECK_MERGE) {
704 dnet_counter_set(n, DNET_CNTR_NODE_LAST_MERGE, 0, atomic_read(&ctl.completed));
705 dnet_counter_set(n, DNET_CNTR_NODE_LAST_MERGE, 1, atomic_read(&ctl.errors));
708 dnet_db_send_check_reply(&ctl);
710 dnet_check_temp_db_put(ctl.params.db);
712 err_out_exit:
713 n->check_in_progress = 0;
714 return err;