Minor cleanups
[elliptics.git] / library / check.c
blob514eceeeba5fb270bb3d6095a0f794c9e458eaaf
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/mman.h>
20 #include <dirent.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
28 #include "elliptics.h"
29 #include "elliptics/interface.h"
/* Directory in which dnet_dump_meta_container() creates its "<id>.meta" dump files. */
31 static char dnet_check_tmp_dir[] = "/dev/shm";
33 static int dnet_merge_remove_local(struct dnet_node *n, struct dnet_id *id, int full_process)
35 char buf[sizeof(struct dnet_cmd)];
36 struct dnet_cmd *cmd;
37 struct dnet_net_state *base;
38 int err = -ENOENT;
40 memset(buf, 0, sizeof(buf));
42 cmd = (struct dnet_cmd *)buf;
44 memcpy(&cmd->id, id, sizeof(struct dnet_id));
45 cmd->size = 0;
47 cmd->cmd = DNET_CMD_DEL;
48 if (!full_process)
49 cmd->flags = DNET_ATTR_DELETE_HISTORY;
51 base = dnet_node_state(n);
52 if (base) {
53 err = dnet_process_meta(base, cmd, NULL);
54 dnet_state_put(base);
57 return err;
60 static int dnet_dump_meta_container(struct dnet_node *n, struct dnet_meta_container *mc)
62 int fd, err;
63 char file[256];
64 char id_str[DNET_ID_SIZE*2+1];
66 snprintf(file, sizeof(file), "%s/%s.meta", dnet_check_tmp_dir, dnet_dump_id_len_raw(mc->id.id, DNET_ID_SIZE, id_str));
68 fd = open(file, O_RDWR | O_TRUNC | O_CREAT | O_CLOEXEC, 0644);
69 if (fd < 0) {
70 err = -errno;
71 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to open meta container file '%s': %s\n",
72 file, strerror(errno));
73 goto err_out_exit;
76 err = write(fd, mc->data, mc->size);
77 if (err != (int)mc->size) {
78 err = -errno;
79 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to write meta container into '%s': %s\n",
80 file, strerror(errno));
81 goto err_out_close;
83 err = 0;
85 err_out_close:
86 close(fd);
87 err_out_exit:
88 return err;
91 static int dnet_check_find_groups(struct dnet_node *n, struct dnet_meta_container *mc, int **groupsp)
93 int err, i, num;
94 struct dnet_meta *m;
95 int *groups;
97 m = dnet_meta_search(n, mc, DNET_META_GROUPS);
98 if (!m) {
99 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc->id));
100 err = -ENOENT;
101 goto err_out_exit;
104 groups = malloc(m->size);
105 if (!groups) {
106 err = -ENOMEM;
107 goto err_out_exit;
109 memcpy(groups, m->data, m->size);
111 num = m->size / sizeof(int32_t);
113 for (i=0; i<num; ++i) {
114 dnet_log_raw(n, DNET_LOG_DEBUG, "%s: group: %d\n", dnet_dump_id(&mc->id), groups[i]);
117 *groupsp = groups;
119 return num;
121 err_out_exit:
122 dnet_dump_meta_container(n, mc);
123 return err;
126 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
127 struct dnet_meta_container *mc, int final)
129 int err = 0;
130 int64_t rec_processed;
131 KCREC recs[DNET_BULK_META_UPD_SIZE];
132 int rec_iter = 0;
134 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
135 err = -EINVAL;
136 goto err_out_exit;
139 if (!final) {
140 if (!mc) {
141 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
142 return -1;
144 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
145 mc_array[*rec_num].size = mc->size;
146 // Allocate extra memory for potential META_CHECK_STATUS structure
147 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
148 if (!mc_array[*rec_num].data) {
149 err = -ENOMEM;
150 goto err_out_exit;
152 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
153 (*rec_num)++;
156 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
157 err = kcdbbegintran(n->meta, 0);
158 if (!err) {
159 err = -kcdbecode(n->meta);
160 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
161 "meta", err, kcecodename(-err));
162 goto err_out_exit;
164 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
165 err = dnet_db_check_update(n, &mc_array[rec_iter]);
166 recs[rec_iter].key.size = DNET_ID_SIZE;
167 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
168 recs[rec_iter].value.size = mc_array[rec_iter].size;
169 recs[rec_iter].value.buf = mc_array[rec_iter].data;
171 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
172 if ((int)rec_processed != *rec_num) {
173 err = -kcdbecode(n->meta);
174 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
175 (int)rec_processed, err, kcecodename(-err));
176 kcdbendtran(n->meta, 0);
177 goto err_out_exit;
180 kcdbendtran(n->meta, 1);
181 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
182 free(mc_array[rec_iter].data);
184 *rec_num = 0;
187 err_out_exit:
188 return err;
/*
 * Handler for a bulk check request.
 *
 * @data is treated as an array of struct dnet_bulk_id occupying cmd->size
 * bytes. For each entry the local metadata is read through the meta_read
 * backend callback and ids[i].last_update is rewritten in place:
 *  - no local metadata: tm.tsec = 1, flags = 0;
 *  - local metadata with a META_UPDATE record: a copy of that record
 *    (with tm.tsec = 1, flags = 0 substituted when dnet_stat_local() fails);
 *  - local metadata without a META_UPDATE record: left as received.
 * The same buffer is then sent back with dnet_send_reply().
 *
 * Returns the result of the final dnet_send_reply(), or -1 when cmd->size is
 * not a multiple of sizeof(struct dnet_bulk_id).
 *
 * NOTE(review): values assigned to err inside the loop are never returned.
 */
192 int dnet_cmd_bulk_check(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data)
194 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)data;
195 struct dnet_meta_container mc;
196 struct dnet_meta_update mu;
197 struct dnet_id raw;
198 int i;
199 int err = 0;
200 int num;
/* The payload must be a whole number of struct dnet_bulk_id records. */
202 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
203 num = cmd->size / sizeof(struct dnet_bulk_id);
205 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: received %d entries\n", num);
207 for (i = 0; i < num; ++i) {
209 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
210 if (i % DNET_BULK_CHECK_PING == 0 && i > 0) {
211 dnet_send_reply(orig, cmd, NULL, 0, 1);
214 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: processing ID %s\n", dnet_dump_id_str(ids[i].id.id));
215 mc.data = NULL;
216 dnet_setup_id(&mc.id, 0, ids[i].id.id);
/* When the callback sets mc.data, its return value is taken as the metadata size. */
217 err = orig->n->cb->meta_read(orig->n->cb->command_private, &ids[i].id, &mc.data);
218 if (mc.data) {
219 mc.size = err;
220 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
221 mc.size, orig->n->st->idc->group->group_id);
222 if (dnet_get_meta_update(orig->n, &mc, &mu))
/* Convert the peer-supplied record before comparing (presumably wire to host byte order - TODO confirm). */
224 dnet_convert_meta_update(&ids[i].last_update);
225 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
226 (unsigned long)mu.tm.tsec, (unsigned long)mu.tm.tnsec, (unsigned long)mu.flags);
227 dnet_log(orig->n, DNET_LOG_DEBUG,
228 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
229 (unsigned long)ids[i].last_update.tm.tsec, (unsigned long)ids[i].last_update.tm.tnsec,
230 (unsigned long)ids[i].last_update.flags);
/* Local record is flagged removed or is strictly older than the peer's: nothing more is checked locally. */
232 if ((mu.flags & DNET_IO_FLAGS_REMOVED) || (mu.tm.tsec < ids[i].last_update.tm.tsec) ||
233 ((mu.tm.tnsec < ids[i].last_update.tm.tnsec) && (mu.tm.tsec == ids[i].last_update.tm.tsec))) {
234 err = 0;
235 } else {
236 /* File is not needed to be updated */
237 dnet_setup_id(&raw, orig->n->id.group_id, ids[i].id.id);
238 err = dnet_stat_local(orig, &raw);
239 if (err) {
240 /* File was not found in the storage */
/* Same "very old" marker (tsec = 1, flags = 0) that is used below when no metadata exists. */
241 mu.tm.tsec = 1;
242 mu.flags = 0;
243 } else {
244 err = dnet_meta_update_check_status(orig->n, &mc);
245 if (err) {
246 dnet_log(orig->n, DNET_LOG_ERROR,
247 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
248 dnet_dump_id_str(ids[i].id.id), err);
/* Both branches above end here: the reply carries the local record, converted back. */
253 memcpy(&ids[i].last_update, &mu, sizeof(struct dnet_meta_update));
254 dnet_convert_meta_update(&ids[i].last_update);
256 free(mc.data);
257 } else {
258 /* Meta is not present - set timestamp to very old one */
259 dnet_convert_meta_update(&ids[i].last_update);
260 ids[i].last_update.tm.tsec = 1;
261 ids[i].last_update.flags = 0;
262 dnet_convert_meta_update(&ids[i].last_update);
265 } else {
266 dnet_log(orig->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
267 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
268 err = -1;
269 goto err_out_exit;
/* Reply with the request buffer, whose last_update fields were rewritten above. */
272 return dnet_send_reply(orig, cmd, data, sizeof(struct dnet_bulk_id) * num, 0);
274 err_out_exit:
275 return err;
/*
 * Context shared between dnet_request_bulk_check() and its transaction
 * completion callback dnet_bulk_check_complete().
 */
278 struct dnet_bulk_check_priv {
/* Wait object the requester blocks on; the callback wakes it and stores the reply status in it. */
279 struct dnet_wait *w;
/* Temporary database; its `b` member is handed to the dnet_db_*_raw() helpers. */
280 struct dnet_check_temp_db *db;
/* Set to 2 by the requester; requester and callback each drop one, whoever reaches zero frees the struct. */
281 atomic_t refcnt;
/* Number of entries in groups; when zero the per-ID check takes the group list from object metadata instead. */
282 int group_num;
/* Heap copy of params->groups made by the requester. */
283 int *groups;
/*
 * Processes one entry of a bulk-check reply that came from @remote_group.
 *
 * Per-ID state is kept in the temporary database p->db->b as an array of
 * struct dnet_meta_update with group_num slots; when nothing is stored yet,
 * slot 0 is seeded with the local group's record. Each call stores the remote
 * group's timestamp/flags in that array. While unused slots remain the array
 * is written back and the function returns. Once every slot is filled the
 * newest record is selected and, depending on its flags, the object is
 * removed or re-sent (data + metadata) to the groups holding an older copy.
 *
 * Returns `error`: the first negative result seen in the per-group loop,
 * otherwise 0. Failures that leave through the err_out_* labels before that
 * loop do not touch `error`, so they are reported as 0.
 *
 * NOTE(review): the session created in the initialiser of `s` is not
 * released anywhere in this function, and its result is not NULL-checked.
 */
286 static int dnet_bulk_check_complete_single(struct dnet_net_state *state, struct dnet_bulk_id *ids,
287 int remote_group, struct dnet_bulk_check_priv *p)
289 struct dnet_session *s = dnet_session_create(state->n);
290 struct dnet_meta_container mc;
291 struct dnet_meta_container temp_mc;
292 struct dnet_meta_update *mu;
293 struct dnet_meta *mg;
294 struct dnet_id id;
295 char *tmpdata = NULL;
296 int *groups, group_num = 1;
297 int err = -EINVAL, error = 0;
298 int i = 0;
299 int my_group, lastest_group = -1;
300 struct dnet_meta_update lastest_mu, my_mu;
301 struct timeval current_ts;
302 int removed_in_all = 1, updated = 0;
303 int lastest = 0;
304 uint64_t cflags = 0;
305 uint64_t ioflags = 0;
308 my_group = state->n->id.group_id;
310 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: checking ID %s\n", dnet_dump_id_str(ids->id.id));
311 err = -ENOENT;
312 error = 0;
314 dnet_setup_id(&mc.id, my_group, ids->id.id);
/* Read the local metadata; a positive return value is used as its size. */
316 err = state->n->cb->meta_read(state->n->cb->command_private, &ids->id, &mc.data);
317 if (err <= 0) {
318 if (err == 0)
319 err = -ENOENT;
320 goto err_out_continue;
322 mc.size = err;
324 /* Set current group meta_update as lastest_mu */
325 if (!dnet_get_meta_update(state->n, &mc, &my_mu)) {
326 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: meta_update structure doesn't exist for group %d\n",
327 dnet_dump_id_str(ids->id.id), my_group);
328 err = -ENOENT;
329 goto err_out_kcfree;
331 dnet_convert_meta_update(&my_mu);
332 memcpy(&lastest_mu, &my_mu, sizeof(struct dnet_meta_update));
333 lastest_group = my_group;
335 /* Get group list */
336 if (p->group_num) {
337 /* groups came from dnet_check utility */
338 groups = p->groups;
339 group_num = p->group_num;
340 } else {
/* Otherwise use the DNET_META_GROUPS record; groups then points into mc.data rather than at a copy. */
341 mg = dnet_meta_search(state->n, &mc, DNET_META_GROUPS);
342 if (!mg) {
343 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids->id.id));
344 err = -ENOENT;
345 goto err_out_kcfree;
347 dnet_convert_meta(mg);
348 if (mg->size % sizeof(int)) {
349 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids->id.id));
350 err = -1;
351 goto err_out_kcfree;
353 group_num = mg->size / sizeof(int);
354 groups = (int *)mg->data;
355 dnet_convert_meta(mg);
358 /* Read temporary meta */
/* Zero-filled array with one struct dnet_meta_update slot per group; group_id == 0 marks an unused slot. */
359 temp_mc.data = malloc(sizeof(struct dnet_meta_update) * group_num);
360 if (!temp_mc.data) {
361 err = -ENOMEM;
362 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids->id.id));
363 goto err_out_kcfree;
365 memset(temp_mc.data, 0, sizeof(struct dnet_meta_update) * group_num);
366 temp_mc.size = sizeof(struct dnet_meta_update) * group_num;
368 err = dnet_db_read_raw(p->db->b, &ids->id, (void **)&tmpdata);
369 if (err <= 0) {
/* NOTE(review): -2 is presumably -ENOENT, i.e. "no record stored yet" - confirm. */
370 if (err < 0 && err != -2)
371 goto err_out_free;
372 /* No data in temp meta was stored. Placing local meta_update at the beginning */
373 mu = temp_mc.data;
374 mu[0].group_id = my_group;
375 mu[0].tm = my_mu.tm;
376 mu[0].flags = my_mu.flags;
378 } else {
379 if (err > (int)(sizeof(struct dnet_meta_update) * group_num)) {
380 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids->id.id));
381 err = -ENOMEM;
382 goto err_out_free;
384 memcpy(temp_mc.data, tmpdata, err);
387 /* Update temp meta with received group */
388 mu = temp_mc.data;
389 updated = 0;
390 lastest = 0;
/*
 * Walk the used slots: refresh the slot of remote_group if it already
 * exists, clear removed_in_all when any slot lacks the REMOVED flag, and
 * track the index of the newest timestamp in `lastest`. The loop stops at
 * the first unused slot, leaving its index in i.
 */
391 for (i = 0; i < group_num; ++i) {
392 if (mu[i].group_id == remote_group) {
393 mu[i].tm = ids->last_update.tm;
394 mu[i].flags = ids->last_update.flags;
395 updated = 1;
398 if (mu[i].group_id == 0)
399 break;
401 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED))
402 removed_in_all = 0;
404 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
405 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
406 && i != lastest) {
407 lastest = i;
/* NOTE(review): the group is taken from groups[i] although slot i records its own group in mu[i].group_id; these agree only if both arrays share the same order - confirm. */
408 lastest_group = groups[i];
412 if (!updated && i == group_num) {
413 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids->id.id));
414 err = -ENOMEM;
415 goto err_out_free;
/* First reply from remote_group: store it in the first unused slot. */
417 if (!updated) {
418 mu[i].group_id = remote_group;
419 mu[i].tm = ids->last_update.tm;
420 mu[i].flags = ids->last_update.flags;
/* NOTE(review): unlike the loop above, removed_in_all is not re-evaluated for this newly filled slot. */
422 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
423 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
424 && i != lastest) {
425 lastest = i;
426 lastest_group = groups[i];
429 ++i;
432 /* Not all groups processed yet */
433 if (i < group_num) {
/* The assignment on the next line is overwritten immediately by the write result. */
434 err = 0;
435 err = dnet_db_write_raw(p->db->b, &ids->id, temp_mc.data, temp_mc.size);
436 if (err) {
437 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids->id.id), err);
439 goto err_out_free;
442 /* Check if removal_delay second has gone since object was marked as REMOVED */
/* removal_delay is multiplied by 3600 * 24 before being compared with a difference of seconds, i.e. it is treated as a number of days. */
443 if (removed_in_all) {
444 gettimeofday(&current_ts, NULL);
445 if (((uint64_t)current_ts.tv_sec < mu[lastest].tm.tsec)
446 || ((uint64_t)current_ts.tv_sec - mu[lastest].tm.tsec) < (uint64_t)(state->n->removal_delay * 3600 * 24))
447 removed_in_all = 0;
450 /* TODO: receive newer files from remote groups
452 * Yep, we should read it locally and send it to other groups too
454 if ((lastest_group != my_group) && !(mu[lastest].flags & DNET_IO_FLAGS_REMOVED)) {
455 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: %s: File on remote group %d is newer, skipping this file\n",
456 dnet_dump_id_str(ids->id.id), lastest_group);
457 err = 0;
458 goto err_out_free;
/*
 * Per-group action for every slot except the local group's: when the
 * newest record is REMOVED, remove the object there; otherwise push the
 * local object and its metadata to groups whose record is older.
 */
461 for (i = 0; i < group_num; ++i) {
462 err = 0;
463 if (mu[i].group_id == my_group)
464 continue;
466 dnet_session_set_groups(s, (int *)&mu[i].group_id, 1);
467 dnet_setup_id(&id, mu[i].group_id, ids->id.id);
468 id.type = -1;
470 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
471 if (removed_in_all) {
/* NOTE(review): this message is emitted before the call, so the err it prints is the 0 assigned at the top of the iteration. */
472 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
473 dnet_dump_id(&id), mu[i].group_id, err);
474 err = dnet_remove_object_now(s, &id, cflags, ioflags);
475 } else {
476 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED)) {
477 err = dnet_remove_object(s, &id, NULL, NULL, cflags, ioflags);
478 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_remove_object %s in group %d err=%d\n",
479 dnet_dump_id(&id), mu[i].group_id, err);
482 if (err < 0)
483 goto err_out_cont2;
484 } else {
485 if ((mu[i].tm.tsec < mu[lastest].tm.tsec) || ((mu[i].tm.tsec == mu[lastest].tm.tsec) &&
486 ((mu[i].tm.tnsec < mu[lastest].tm.tnsec)))) {
487 err = state->n->cb->send(state, state->n->cb->command_private, &id);
489 if (err)
490 goto err_out_cont2;
492 err = dnet_meta_update_check_status_raw(state->n, &mc);
493 if (err)
494 goto err_out_cont2;
/* NOTE(review): mc.id is overwritten with the remote group's ID here and is not restored before mc.id is used again after the loop - confirm this is intended. */
496 memcpy(&mc.id, &id, sizeof(struct dnet_id));
497 err = dnet_write_metadata(s, &mc, 1, cflags);
498 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
499 dnet_dump_id(&id), my_group, err);
501 if (err < 0)
502 goto err_out_cont2;
/* End of one iteration: log a failure and remember only the first negative result in `error`. */
505 err_out_cont2:
506 if (err < 0)
507 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
508 dnet_dump_id_str(ids->id.id), groups[i], err);
509 if (!error && err < 0)
510 error = err;
/* Local removal: full_process is 0 when the object is removed in every group, 1 when only the local copy still lacks the REMOVED flag. */
513 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
514 if (removed_in_all) {
515 err = dnet_merge_remove_local(state->n, &mc.id, 0);
516 } else if (!(my_mu.flags & DNET_IO_FLAGS_REMOVED)) {
517 err = dnet_merge_remove_local(state->n, &mc.id, 1);
521 if (!(mu[lastest].flags & DNET_IO_FLAGS_REMOVED) && !error) {
522 err = dnet_meta_update_check_status(state->n, &mc);
523 if (err) {
524 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids->id.id));
/* The temporary record is only removed when more than two groups are involved. */
528 if (group_num > 2) {
529 err = dnet_db_remove_raw(p->db->b, &ids->id, 1);
/* NOTE(review): the failure message is logged when err == 0; confirm the return convention of dnet_db_remove_raw(). */
530 if (!err) {
531 dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
532 dnet_dump_id(&mc.id), err);
/* `error` is only ever assigned negative values above, so this reset looks purely defensive. */
536 if (error > 0)
537 error = 0;
538 err_out_free:
539 free(temp_mc.data);
540 err_out_kcfree:
541 free(mc.data);
542 if (tmpdata)
543 free(tmpdata);
544 err_out_continue:
545 if (error < 0) {
546 dnet_log(state->n, DNET_LOG_ERROR, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids->id.id),
547 dnet_state_dump_addr(state), error);
/* The copy counter is only updated when i equals group_num at this point. */
549 if (i == group_num) {
550 dnet_counter_inc(state->n, DNET_CNTR_NODE_CHECK_COPY, error);
553 return error;
556 static int dnet_bulk_check_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
558 struct dnet_bulk_check_priv *p = priv;
559 int err = 0, i;
561 if (is_trans_destroyed(state, cmd)) {
562 dnet_wakeup(p->w, p->w->cond++);
563 dnet_wait_put(p->w);
564 dnet_check_temp_db_put(p->db);
565 if (atomic_dec_and_test(&p->refcnt)) {
566 free(p->groups);
567 free(p);
569 return 0;
572 /* Empty reply that prevents timeout */
573 if (cmd->size == 0) {
574 return 0;
577 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
578 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)(cmd + 1);
579 int num = cmd->size / sizeof(struct dnet_bulk_id);
581 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: received %d entries\n", num);
583 //dnet_db_ptr_get(&state->n->temp_meta);
584 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
585 //if (!ret) {
586 // err = -kcdbecode(state->n->temp_meta.db);
587 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
588 // err, kcecodename(-err));
589 // return err;
592 for (i = 0; i < num && !state->n->need_exit; ++i) {
593 err = dnet_bulk_check_complete_single(state, &ids[i], cmd->id.group_id, p);
596 //kcdbendtran(state->n->temp_meta.db, 1);
597 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
599 if (err) {
600 dnet_log(state->n, DNET_LOG_ERROR, "BULK: couldn't update meta CHECK_STATUS\n");
602 } else {
603 dnet_log(state->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
604 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
607 p->w->status = cmd->status;
608 return err;
611 int dnet_request_bulk_check(struct dnet_node *n, struct dnet_bulk_state *state, struct dnet_check_params *params)
613 struct dnet_trans_control ctl;
614 struct dnet_net_state *st;
615 struct dnet_bulk_check_priv *p;
616 struct timespec wait_ts;
617 int err;
619 p = (struct dnet_bulk_check_priv *)malloc(sizeof(struct dnet_bulk_check_priv));
620 if (!p) {
621 err = -ENOMEM;
622 goto err_out_exit;
624 atomic_init(&p->refcnt, 2);
625 p->db = params->db;
626 dnet_check_temp_db_get(p->db);
628 p->w = dnet_wait_alloc(0);
629 if (!p->w) {
630 err = -ENOMEM;
631 goto err_out_free;
634 p->group_num = params->group_num;
635 p->groups = (int *)malloc(sizeof(int) * params->group_num);
636 if (!p->groups) {
637 err = -ENOMEM;
638 goto err_out_put;
640 memcpy(p->groups, params->groups, sizeof(int) * params->group_num);
642 memset(&ctl, 0, sizeof(struct dnet_trans_control));
644 ctl.cmd = DNET_CMD_LIST;
645 ctl.complete = dnet_bulk_check_complete;
646 ctl.priv = p;
647 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | DNET_ATTR_BULK_CHECK;
649 ctl.data = state->ids;
650 ctl.size = sizeof(struct dnet_bulk_id) * state->num;
652 st = dnet_state_search_by_addr(n, &state->addr);
653 if (!st) {
654 err = -ENOENT;
655 goto err_out_put;
657 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
658 dnet_wait_get(p->w);
660 dnet_log(n, DNET_LOG_DEBUG, "BULK: sending %u bytes of data to %s (%s)\n",
661 ctl.size, dnet_dump_id(&ctl.id), dnet_server_convert_dnet_addr(&state->addr));
662 err = dnet_trans_alloc_send_state(st, &ctl);
663 dnet_state_put(st);
665 wait_ts = n->wait_ts;
666 wait_ts.tv_sec *= DNET_BULK_IDS_SIZE;
667 err = dnet_wait_event(p->w, p->w->cond != 0, &wait_ts);
668 if (err)
669 goto err_out_put;
671 if (p->w->status) {
672 err = p->w->status;
673 goto err_out_put;
676 dnet_wait_put(p->w);
678 if (atomic_dec_and_test(&p->refcnt)) {
679 free(p->groups);
680 free(p);
683 return 0;
685 err_out_put:
686 dnet_wait_put(p->w);
688 err_out_free:
689 if (atomic_dec_and_test(&p->refcnt)) {
690 free(p->groups);
691 free(p);
694 err_out_exit:
695 dnet_log(n, DNET_LOG_ERROR, "Bulk check exited with status %d\n", err);
696 return err;
699 static int dnet_bulk_add_id(struct dnet_node *n, struct dnet_bulk_array *bulk_array, struct dnet_id *id,
700 struct dnet_meta_container *mc, struct dnet_check_params *params)
702 int err = 0;
703 struct dnet_bulk_state tmp;
704 struct dnet_bulk_state *state = NULL;
705 struct dnet_net_state *st = dnet_state_get_first(n, id);
706 struct dnet_bulk_id *bulk_id;
707 struct dnet_meta_update mu;
709 dnet_log(n, DNET_LOG_DEBUG, "BULK: adding ID %s to array\n", dnet_dump_id(id));
710 if (!st)
711 return -1;
713 memcpy(&tmp.addr, &st->addr, sizeof(struct dnet_addr));
714 dnet_state_put(st);
716 dnet_log(n, DNET_LOG_DEBUG, "BULK: Searching state in states array\n");
717 state = bsearch(&tmp, bulk_array->states, bulk_array->num, sizeof(struct dnet_bulk_state), dnet_compare_bulk_state);
718 if (!state)
719 return -1;
721 if (!dnet_get_meta_update(n, mc, &mu))
722 return -ENOENT;
724 dnet_log(n, DNET_LOG_DEBUG, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
725 //pthread_mutex_lock(&state->state_lock);
726 if (state->num >= DNET_BULK_IDS_SIZE || state->num < 0)
727 goto err_out_unlock;
729 bulk_id = &state->ids[state->num];
730 memset(bulk_id, 0, sizeof(struct dnet_bulk_id));
732 memcpy(&bulk_id->id, &id->id, DNET_ID_SIZE);
734 dnet_log(n, DNET_LOG_DEBUG, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
735 dnet_dump_id_str(bulk_id->id.id), (unsigned long long)mu.tm.tsec, (unsigned long long)mu.tm.tnsec,
736 (unsigned long long)mu.flags);
738 dnet_convert_meta_update(&mu);
740 memcpy(&bulk_id->last_update, &mu, sizeof(struct dnet_meta_update));
742 state->num++;
744 dnet_log(n, DNET_LOG_DEBUG, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
745 if (state->num == DNET_BULK_IDS_SIZE) {
746 err = dnet_request_bulk_check(n, state, params);
747 state->num = 0;
748 if (err)
749 goto err_out_unlock;
752 //pthread_mutex_unlock(&state->state_lock);
754 return 0;
756 err_out_unlock:
757 //pthread_mutex_unlock(&state->state_lock);
758 return -2;
761 static int dnet_check_number_of_copies(struct dnet_node *n, struct dnet_meta_container *mc, int *groups, int group_num,
762 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
764 struct dnet_id raw;
765 int group_id = mc->id.group_id;
766 int err = 0, i, error = 0;
768 for (i=0; i<group_num; ++i) {
769 if (groups[i] == group_id)
770 continue;
772 dnet_setup_id(&raw, groups[i], mc->id.id);
774 err = dnet_bulk_add_id(n, bulk_array, &raw, mc, params);
775 if (err)
776 dnet_log(n, DNET_LOG_ERROR, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw), err);
778 if (!err)
779 error = 0;
780 else if (!error)
781 error = err;
784 return error;
787 static int dnet_check_copies(struct dnet_node *n, struct dnet_meta_container *mc,
788 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
790 int err;
791 int *groups = NULL;
793 if (params->group_num) {
794 groups = params->groups;
795 err = params->group_num;
796 } else {
797 err = dnet_check_find_groups(n, mc, &groups);
798 if (err <= 0)
799 return -ENOENT;
802 err = dnet_check_number_of_copies(n, mc, groups, err, bulk_array, params);
804 if (!params->group_num)
805 free(groups);
807 return err;
810 static int dnet_merge_direct(struct dnet_session *s, struct dnet_meta_container *mc)
812 struct dnet_node *n = s->node;
813 struct dnet_net_state *base;
814 int cflags = 0;
815 int err;
817 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct mc->size = %u\n", mc->size);
818 base = dnet_node_state(n);
819 if (!base) {
820 err = -ENOENT;
821 goto err_out_exit;
824 err = n->cb->send(base, n->cb->command_private, &mc->id);
825 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct after n->cb->send err = %d\n\n", err);
826 if (err < 0)
827 goto err_out_put;
829 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct2 mc->size = %u\n", mc->size);
830 err = dnet_write_metadata(s, mc, 0, cflags);
831 if (err <= 0)
832 goto err_out_put;
834 err = 0;
836 err_out_put:
837 dnet_state_put(base);
838 err_out_exit:
839 return err;
// Sends the object identified by mc->id through the backend send callback on
// the node's own state and then writes its metadata.
//
// Returns -ENOENT when dnet_node_state() yields no state, the non-zero result
// of the send callback, or whatever dnet_write_metadata() returned.
//
// NOTE(review): nothing in the visible part of this file calls this function,
// and it passes three arguments (node, container, 0) to dnet_write_metadata()
// while the other call sites here pass four with a session first. This looks
// like disabled or stale code - confirm before relying on it.
843 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
845 struct dnet_net_state *base;
846 int err = 0;
848 base = dnet_node_state(n);
849 if (!base) {
850 err = -ENOENT;
851 goto err_out_exit;
854 err = n->cb->send(base, n->cb->command_private, &mc->id);
855 if (err)
856 goto err_out_put;
858 err = dnet_write_metadata(n, mc, 0);
// Both outcomes of the check below continue at err_out_put, so err is returned unchanged.
859 if (err <= 0)
860 goto err_out_put;
862 err_out_put:
863 dnet_state_put(base);
864 err_out_exit:
865 return err;
869 static int dnet_merge_common(struct dnet_session *s, struct dnet_meta_container *remote_meta, struct dnet_meta_container *mc)
871 struct dnet_node *n = s->node;
872 int err = 0;
873 struct dnet_meta_update local, remote;
874 uint64_t cflags = 0;
875 uint64_t ioflags = 0;
877 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_common mc->size = %d\n", mc->size);
878 if (!dnet_get_meta_update(n, mc, &local)) {
879 err = -ENOENT;
880 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc->id));
881 goto err_out_exit;
884 if (!dnet_get_meta_update(n, remote_meta, &remote)) {
885 err = -ENOENT;
886 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc->id));
887 err = dnet_merge_direct(s, mc);
888 goto err_out_exit;
891 if ((local.tm.tsec > remote.tm.tsec) || (local.tm.tsec == remote.tm.tsec && local.tm.tnsec > remote.tm.tnsec)) {
892 if (local.flags & DNET_IO_FLAGS_REMOVED) {
893 err = dnet_remove_object_now(s, &mc->id, cflags, ioflags);
894 } else {
895 err = dnet_merge_direct(s, mc);
899 err_out_exit:
900 return err;
904 static int dnet_check_merge(struct dnet_session *s, struct dnet_meta_container *mc)
906 struct dnet_node *n = s->node;
907 int err;
908 struct dnet_meta_container remote_mc;
910 dnet_log(n, DNET_LOG_DEBUG, "in dnet_check_merge mc->size = %d\n", mc->size);
911 memset(&remote_mc, 0, sizeof(struct dnet_meta_container));
913 err = dnet_read_meta(s, &remote_mc, NULL, 0, &mc->id);
914 if (err) {
915 if (err != -ENOENT) {
916 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to download object to be merged from storage: %d.\n",
917 dnet_dump_id(&mc->id), err);
918 goto err_out_exit;
921 dnet_log_raw(n, DNET_LOG_INFO, "%s: there is no meta in the storage to merge with, "
922 "doing direct merge (plain upload).\n", dnet_dump_id(&mc->id));
923 err = dnet_merge_direct(s, mc);
924 } else {
926 err = dnet_merge_common(s, &remote_mc, mc);
929 if (err)
930 goto err_out_exit;
932 err_out_exit:
933 if (remote_mc.data)
934 free(remote_mc.data);
935 return err;
938 int dnet_check(struct dnet_node *n, struct dnet_meta_container *mc, struct dnet_bulk_array *bulk_array,
939 int need_merge, struct dnet_check_params *params)
941 int err = 0;
942 struct dnet_session *s = dnet_session_create(n);
943 dnet_session_set_groups(s, (int *)&n->id.group_id, 1);
945 dnet_log(n, DNET_LOG_DEBUG, "need_merge = %d, mc.size = %d\n", need_merge, mc->size);
946 if (need_merge) {
947 err = dnet_check_merge(s, mc);
948 dnet_log(n, DNET_LOG_DEBUG, "err=%d\n", err);
949 if (!err)
950 dnet_merge_remove_local(n, &mc->id, 0);
951 } else {
952 err = dnet_check_copies(n, mc, bulk_array, params);
954 return err;