2 # some of the comments match the comments in MogileFS/Worker/Fsck.pm
3 # _exactly_ for reference purposes
8 use Time::HiRes qw(sleep);
12 find_mogclient_or_skip();
# Build the temporary store inside eval so a failure ends up in $@, which the
# skip_all message below reports (the check that guards it is not visible here).
15 my $sto = eval { temp_store(); };
17 plan skip_all => "Can't create temporary test database: $@";
# Three scratch directories; CLEANUP => 1 asks File::Temp to remove them at exit.
23 $mogroot{1} = File::Temp::tempdir( CLEANUP => 1 );
24 $mogroot{2} = File::Temp::tempdir( CLEANUP => 1 );
25 $mogroot{3} = File::Temp::tempdir( CLEANUP => 1 );
# devid => %mogroot key: dev1 is created under root 1, dev2 and dev3 under root 2.
26 my $dev2host = { 1 => 1, 2 => 2, 3 => 2 };
27 foreach (sort { $a <=> $b } keys %$dev2host) {
28 my $root = $mogroot{$dev2host->{$_}};
29 mkdir("$root/dev$_") or die "Failed to create dev$_ dir: $!";
# One mogstored per loopback address, each serving its own root.
32 my $ms1 = create_mogstored("127.0.1.1", $mogroot{1});
33 ok($ms1, "got mogstored1");
34 my $ms2 = create_mogstored("127.0.1.2", $mogroot{2});
35 ok($ms2, "got mogstored2");
# Loop until a usage file exists for all three devices.
37 while (! -e "$mogroot{1}/dev1/usage" ||
38 ! -e "$mogroot{2}/dev2/usage" ||
39 ! -e "$mogroot{2}/dev3/usage") {
40 print "Waiting on usage...\n";
# Temporary tracker, plus three handles that all point at 127.0.0.1:7001:
# a raw admin socket, a MogileFS::Admin and a MogileFS::Client.
44 my $tmptrack = create_temp_tracker($sto);
47 my $admin = IO::Socket::INET->new(PeerAddr => '127.0.0.1:7001');
48 $admin or die "failed to create admin socket: $!";
49 my $moga = MogileFS::Admin->new(hosts => [ "127.0.0.1:7001" ]);
50 my $mogc = MogileFS::Client->new(
52 hosts => [ "127.0.0.1:7001" ],
54 my $be = $mogc->{backend}; # gross, reaching inside of MogileFS::Client
56 # test some basic commands to backend
57 ok($tmptrack->mogadm("domain", "add", "testdom"), "created test domain");
58 ok($tmptrack->mogadm("class", "add", "testdom", "2copies", "--mindevcount=2"), "created 2copies class in testdom");
59 ok($tmptrack->mogadm("class", "add", "testdom", "1copy", "--mindevcount=1"), "created 1copy class in testdom");
61 ok($tmptrack->mogadm("host", "add", "hostA", "--ip=127.0.1.1", "--status=alive"), "created hostA");
62 ok($tmptrack->mogadm("host", "add", "hostB", "--ip=127.0.1.2", "--status=alive"), "created hostB");
# Only dev1 and dev2 are registered here; dev3 is added later in the file.
64 ok($tmptrack->mogadm("device", "add", "hostA", 1), "created dev1 on hostA");
65 ok($tmptrack->mogadm("device", "add", "hostB", 2), "created dev2 on hostB");
# Send two "clear_cache" requests through the backend handle and assert that
# each one succeeds; dies as soon as one of them fails.
# Callers pass the backend object (e.g. wait_for_monitor($be)).
# The backend timeout is saved on entry and put back at the end; the statement
# that changes it in between is not visible in this listing.
67 sub wait_for_monitor {
69 my $was = $be->{timeout}; # can't use local on phash :(
71 ok($be->do_request("clear_cache", {}), "waited for monitor")
72 or die "Failed to wait for monitor";
73 ok($be->do_request("clear_cache", {}), "waited for monitor")
74 or die "Failed to wait for monitor";
# restore the caller's timeout
75 $be->{timeout} = $was;
# Subscribe to the tracker's "!watch" stream, run a callback, then block until
# a watched line matches a pattern.
# Call sites pass (pattern, coderef), e.g. a qr// and a sub that starts fsck.
78 sub watcher_wait_for {
# uses its own connection, separate from the $admin socket
81 my $watcher = IO::Socket::INET->new(PeerAddr => '127.0.0.1:7001');
82 $watcher or die "failed to create watcher socket: $!";
83 $watcher->syswrite("!watch\r\n");
85 $cb->(); # usually this is to start fsck
# keep reading watched lines until one matches $re
88 $line = $watcher->getline;
89 } until ($line =~ /$re/);
# Return once the given table has no rows; die if it is still non-empty when
# the wait budget runs out.
#   $table - table name, interpolated directly into the COUNT(*) query
#   $dbh   - database handle the query is run on
# The retry loop and the values of $delay and $limit are on lines not shown
# in this listing.
94 sub wait_for_empty_queue {
95 my ($table, $dbh) = @_;
101 $count = $dbh->selectrow_array("SELECT COUNT(*) from $table");
102 return if ($count == 0);
# total time reported in the failure message
105 my $time = $delay * $limit;
106 die "$table is not empty after ${time}s!";
# NOTE(review): presumably the body of full_fsck -- its `sub` line is not
# visible here, but call sites use full_fsck($tmptrack, $dbh).
# Waits for file_to_queue to drain, then runs fsck stop / clearlog / reset /
# start through mogadm, asserting that each command succeeds.
110 my ($tmptrack, $dbh) = @_;
112 # this should help prevent race conditions:
113 wait_for_empty_queue("file_to_queue", $dbh);
115 ok($tmptrack->mogadm("fsck", "stop"), "stop fsck");
116 ok($tmptrack->mogadm("fsck", "clearlog"), "clear fsck log");
117 ok($tmptrack->mogadm("fsck", "reset"), "reset fsck");
118 ok($tmptrack->mogadm("fsck", "start"), "started fsck");
# Set nexttry on every file_to_queue row to the store's current timestamp and
# assert on the value returned by the UPDATE.
#   $sto    - store object (supplies unix_timestamp, dbh and retry_on_deadlock)
#   $expect - value the UPDATE is expected to return (presumably the number of
#             rows touched -- confirm against DBI's do())
121 sub unblock_fsck_queue {
122 my ($sto, $expect) = @_;
123 my $now = $sto->unix_timestamp;
# no WHERE clause: all queued rows are rescheduled
124 my $upd = sub { $sto->dbh->do("UPDATE file_to_queue SET nexttry = $now") };
125 is($sto->retry_on_deadlock($upd), $expect, "unblocked fsck queue");
# Ask the tracker for its job list over the admin socket and collect the pids
# reported for one worker type.
#   $admin  - connected admin socket ("!jobs" is written to it, the reply is
#             read line by line)
#   $worker - worker name as it appears in the "<name> pids ..." reply line
# Emits two assertions: the request was written, and at least one pid was found.
# Callers use the result as a list of pids.
128 sub get_worker_pids {
129 my ($admin, $worker) = @_;
131 ok($admin->syswrite("!jobs\n"), "requested jobs");
134 while (my $line = $admin->getline) {
# a line holding only "." ends the tracker's reply
135 last if $line =~ /^\.\r?\n/;
# The pid list is digits separated by whitespace. The worker name is quoted
# with \Q..\E so it is matched literally, and the character class no longer
# accepts a stray "+".
137 if ($line =~ /^\Q$worker\E pids (\d[\d\s]*)\r?\n/) {
138 @pids = split(/\s+/, $1);
141 ok(scalar(@pids), "got pid(s) of $worker");
# Send ":shutdown" to one worker type through the admin socket and wait until
# the watch stream reports "Job <worker> has only 0".
#   $admin  - connected admin socket
#   $worker - worker name, e.g. "fsck" or "job_master"
# Asserts that the tracker says the message reached exactly 1 child and that
# the reply is terminated with a "." line.
146 sub shutdown_worker {
147 my ($admin, $worker) = @_;
149 watcher_wait_for(qr/Job $worker has only 0/, sub {
150 $admin->syswrite("!to $worker :shutdown\r\n");
151 like($admin->getline, qr/^Message sent to 1 children/, "tracker sent message to child");
152 like($admin->getline, qr/^\./, "tracker ended transmission");
156 wait_for_monitor($be);
# scratch variables reused by the statements that follow
158 my ($req, $rv, %opts, @paths, @fsck_log, $info);
# HTTP client used further down to DELETE / PUT / GET replica paths directly
159 my $ua = LWP::UserAgent->new;
# Scenario: the only file_on row of a 1copy file is removed; fsck is expected
# to log NOPA, SRCH and FOND and leave the file with one device and one path.
165 # upload a file and wait for replica to appear
167 my $fh = $mogc->new_file($key, "1copy");
169 ok(close($fh), "closed file");
172 # first obvious fucked-up case: no devids even presumed to exist.
174 $info = $mogc->file_info($key);
175 is($info->{devcount}, 1, "ensure devcount is correct at start");
177 # ensure repl queue is empty before destroying file_on
178 wait_for_empty_queue("file_to_replicate", $dbh);
180 my @devids = $sto->fid_devids($info->{fid});
181 is(scalar(@devids), 1, "devids retrieved");
182 is($sto->remove_fidid_from_devid($info->{fid}, $devids[0]), 1,
183 "delete $key from file_on table");
185 full_fsck($tmptrack, $dbh);
# poll every 0.1s until fsck has written the three expected log rows
187 @fsck_log = $sto->fsck_log_rows;
188 } while (scalar(@fsck_log) < 3 && sleep(0.1));
# re-read the log once the fsck queue has drained
190 wait_for_empty_queue("file_to_queue", $dbh);
191 @fsck_log = $sto->fsck_log_rows;
193 my $nopa = $fsck_log[0];
194 is($nopa->{evcode}, "NOPA", "evcode for no paths logged");
196 # entering "desperate" mode
197 my $srch = $fsck_log[1];
198 is($srch->{evcode}, "SRCH", "evcode for start search logged");
200 # wow, we actually found it!
201 my $fond = $fsck_log[2];
# distinct description so a FOND failure is not reported under the SRCH name
202 is($fond->{evcode}, "FOND", "evcode for found file logged");
204 $info = $mogc->file_info($key);
205 is($info->{devcount}, 1, "ensure devcount is correct at fsck end");
206 @paths = $mogc->get_paths($key);
207 is(scalar(@paths), 1, "get_paths returns correctly at fsck end");
# Scenario: the file's class is switched to 2copies directly in the store;
# fsck is expected to log POVI and REPL and a second replica must appear.
210 # update class to require 2copies and have fsck fix it
212 @paths = $mogc->get_paths($key);
213 is(scalar(@paths), 1, "only one path exists before fsck");
215 # _NOT_ using "updateclass" command since that enqueues for replication
216 my $fid = MogileFS::FID->new($info->{fid});
217 my $classid_2copies = $sto->get_classid_by_name($fid->dmid, "2copies");
218 is($fid->update_class(classid => $classid_2copies), 1, "classid updated");
220 full_fsck($tmptrack, $dbh);
# poll every 0.1s until a second path shows up
223 @paths = $mogc->get_paths($key);
224 } while (scalar(@paths) == 1 and sleep(0.1));
225 is(scalar(@paths), 2, "replicated from fsck");
227 $info = $mogc->file_info($key);
228 is($info->{devcount}, 2, "ensure devcount is updated by replicate");
# both rows 0 and 1 are asserted on below, so wait for two rows, not one
231 @fsck_log = $sto->fsck_log_rows;
232 } while (scalar(@fsck_log) < 2 and sleep(0.1));
234 my $povi = $fsck_log[0];
235 is($povi->{evcode}, "POVI", "policy violation logged by fsck");
237 my $repl = $fsck_log[1];
238 is($repl->{evcode}, "REPL", "replication request logged by fsck");
# Scenario: file.devcount is overwritten with a bogus value (too high, zero,
# too low); each time fsck is expected to put it back to 2 and log BCNT.
241 # wrong devcount in file column, but otherwise everything is OK
243 foreach my $wrong_devcount (13, 0, 1) {
# body of the $upd callback that is run under retry_on_deadlock below
245 $dbh->do("UPDATE file SET devcount = ? WHERE fid = ?",
246 undef, $wrong_devcount, $info->{fid});
248 is($sto->retry_on_deadlock($upd), 1, "set improper devcount");
# confirm the client now sees the bogus value before fsck runs
250 $info = $mogc->file_info($key);
251 is($info->{devcount}, $wrong_devcount, "devcount is set to $wrong_devcount");
253 full_fsck($tmptrack, $dbh);
# poll every 0.1s until devcount changes away from the bogus value
256 $info = $mogc->file_info($key);
257 } while ($info->{devcount} == $wrong_devcount && sleep(0.1));
258 is($info->{devcount}, 2, "devcount is corrected by fsck");
260 wait_for_empty_queue("file_to_queue", $dbh);
261 @fsck_log = $sto->fsck_log_rows;
262 is($fsck_log[0]->{evcode}, "BCNT", "bad count logged");
# Scenario: one replica is removed over HTTP behind the tracker's back; fsck
# is expected to log MISS and REPL, after which both paths serve the content.
266 # nuke a file from disk but keep the file_on row
268 @paths = $mogc->get_paths($key);
269 is(scalar(@paths), 2, "two paths returned from get_paths");
# delete only the first replica, directly on the storage node
270 $req = HTTP::Request->new(DELETE => $paths[0]);
271 $rv = $ua->request($req);
272 ok($rv->is_success, "DELETE successful");
274 full_fsck($tmptrack, $dbh);
# poll every 0.1s until fsck has logged two rows
276 @fsck_log = $sto->fsck_log_rows;
277 } while (scalar(@fsck_log) < 2 && sleep(0.1));
279 my $miss = $fsck_log[0];
280 is($miss->{evcode}, "MISS", "missing file logged by fsck");
282 my $repl = $fsck_log[1];
283 is($repl->{evcode}, "REPL", "replication request logged by fsck");
# let the replication requested by fsck finish before re-checking the paths
285 wait_for_empty_queue("file_to_replicate", $dbh);
287 @paths = $mogc->get_paths($key);
288 is(scalar(@paths), 2, "two paths returned from get_paths");
289 foreach my $path (@paths) {
290 $rv = $ua->get($path);
291 is($rv->content, "hello\n", "GET successful on restored path");
# Scenario: one replica is overwritten over HTTP with content of a different
# length; fsck is expected to log BLEN and REPL, after which both paths serve
# the original content again.
295 # change the length of a file from disk and have fsck correct it
297 @paths = $mogc->get_paths($key);
298 is(scalar(@paths), 2, "two paths returned from get_paths");
# "hello\r\n" is one byte longer than the "hello\n" asserted on at the end
299 $req = HTTP::Request->new(PUT => $paths[0]);
300 $req->content("hello\r\n");
301 $rv = $ua->request($req);
302 ok($rv->is_success, "PUT successful");
304 full_fsck($tmptrack, $dbh);
# poll every 0.1s until fsck has logged two rows
306 @fsck_log = $sto->fsck_log_rows;
307 } while (scalar(@fsck_log) < 2 && sleep(0.1));
309 my $blen = $fsck_log[0];
# description names the event actually checked (it used to say "missing file")
310 is($blen->{evcode}, "BLEN", "bad length logged by fsck");
312 my $repl = $fsck_log[1];
313 is($repl->{evcode}, "REPL", "replication request logged by fsck");
# let the replication requested by fsck finish before re-checking the paths
315 wait_for_empty_queue("file_to_replicate", $dbh);
317 @paths = $mogc->get_paths($key);
318 is(scalar(@paths), 2, "two paths returned from get_paths");
319 foreach my $path (@paths) {
320 $rv = $ua->get($path);
321 is($rv->content, "hello\n", "GET successful on restored path");
# Scenario: every replica is deleted over HTTP; fsck is expected to log
# MISS, MISS, SRCH, GONE and leave the file with devcount 0 and no paths.
325 # nuke a file completely and irreparably
327 @paths = $mogc->get_paths($key);
328 is(scalar(@paths), 2, "two paths returned from get_paths");
329 foreach my $path (@paths) {
330 $req = HTTP::Request->new(DELETE => $path);
331 $rv = $ua->request($req);
332 ok($rv->is_success, "DELETE successful");
335 full_fsck($tmptrack, $dbh);
# poll every 0.1s until fsck has logged four rows
337 @fsck_log = $sto->fsck_log_rows;
338 } while (scalar(@fsck_log) < 4 && sleep(0.1));
340 is($fsck_log[0]->{evcode}, "MISS", "missing file logged for first path");
341 is($fsck_log[1]->{evcode}, "MISS", "missing file logged for second path");
342 is($fsck_log[2]->{evcode}, "SRCH", "desperate search attempt logged");
343 is($fsck_log[3]->{evcode}, "GONE", "inability to fix FID logged");
# check the client-visible state only after the fsck queue has drained
345 wait_for_empty_queue("file_to_queue", $dbh);
346 $info = $mogc->file_info($key);
348 is($info->{devcount}, 0, "devcount updated to zero");
349 @paths = $mogc->get_paths($key);
350 is(scalar(@paths), 0, "get_paths returns nothing");
353 # upload a file and wait for replica to appear
355 my $fh = $mogc->new_file($key, "2copies");
357 ok(close($fh), "closed file");
# Poll file_info until both replicas are counted. The 0.1s pause (fractional
# sleep comes from Time::HiRes) keeps this from spinning on the tracker and
# matches the other polling loops in this file.
360 $info = $mogc->file_info($key);
361 } while ($info->{devcount} < 2 && sleep(0.1));
362 is($info->{devcount}, 2, "ensure devcount is correct at start");
# Scenario: mogstored1 is frozen with SIGSTOP so fsck reports a connectivity
# problem and one row stays in the fsck queue; after SIGCONT and a forced
# wake-up the queue must drain.
365 # stall fsck with a non-responsive host
367 is(kill("STOP", $ms1->pid), 1, "send SIGSTOP to mogstored1");
368 wait_for_monitor($be) foreach (1..3);
370 shutdown_worker($admin, "job_master");
371 shutdown_worker($admin, "fsck");
# wipe file_to_queue before fsck is started again
373 $sto->retry_on_deadlock(sub { $sto->dbh->do("DELETE FROM file_to_queue") });
# start fsck and block until an fsck worker logs the connectivity problem
374 watcher_wait_for(qr/\[fsck\(\d+\)] Connectivity problem reaching device/, sub {
375 full_fsck($tmptrack, $dbh);
377 is($sto->file_queue_length(MogileFS::Config::FSCK_QUEUE), 1, "fsck queue still blocked");
380 # resume fsck when host is responsive again
382 is(kill("CONT", $ms1->pid), 1, "send SIGCONT to mogstored1");
383 wait_for_monitor($be);
385 shutdown_worker($admin, "fsck");
387 # force fsck to wakeup and do work again
388 unblock_fsck_queue($sto, 1);
390 ok($admin->syswrite("!to fsck :wake_up\n"), "force fsck to wake up");
# the tracker answers with two lines; both are consumed here
391 ok($admin->getline, "got wakeup response 1");
392 ok($admin->getline, "got wakeup response 2");
# up to 30 checks for the queue to drain (the wait between checks is on a
# line not shown in this listing)
394 foreach my $i (1..30) {
395 last if $sto->file_queue_length(MogileFS::Config::FSCK_QUEUE) == 0;
400 is($sto->file_queue_length(MogileFS::Config::FSCK_QUEUE), 0, "fsck queue emptied");
403 # upload a file and wait for replica to appear
405 my $fh = $mogc->new_file($key, "2copies");
407 ok(close($fh), "closed file");
# Poll file_info until both replicas are counted. The 0.1s pause (fractional
# sleep comes from Time::HiRes) keeps this from spinning on the tracker and
# matches the other polling loops in this file.
410 $info = $mogc->file_info($key);
411 } while ($info->{devcount} < 2 && sleep(0.1));
412 is($info->{devcount}, 2, "ensure devcount is correct at start");
# Same stall/resume sequence as before, but without shutting down the
# job_master or fsck workers in between.
415 # stall fsck with a non-responsive host
416 # resume fsck when host is responsive again
418 is(kill("STOP", $ms1->pid), 1, "send SIGSTOP to mogstored1 to stall");
419 wait_for_monitor($be);
# start fsck and block until an fsck worker logs the connectivity problem
421 watcher_wait_for(qr/\[fsck\(\d+\)] Connectivity problem reaching device/, sub {
422 full_fsck($tmptrack, $dbh);
424 is($sto->file_queue_length(MogileFS::Config::FSCK_QUEUE), 1, "fsck queue still blocked");
426 is(kill("CONT", $ms1->pid), 1, "send SIGCONT to mogstored1 to resume");
427 wait_for_monitor($be);
429 # force fsck to wakeup and do work again
430 unblock_fsck_queue($sto, 1);
431 ok($admin->syswrite("!to fsck :wake_up\n"), "force fsck to wake up");
# the tracker answers with two lines; both are consumed here
432 ok($admin->getline, "got wakeup response 1");
433 ok($admin->getline, "got wakeup response 2");
# up to 30 checks for the queue to drain (the wait between checks is on a
# line not shown in this listing)
435 foreach my $i (1..30) {
436 last if $sto->file_queue_length(MogileFS::Config::FSCK_QUEUE) == 0;
441 is($sto->file_queue_length(MogileFS::Config::FSCK_QUEUE), 0, "fsck queue emptied");
# Scenario: a file with two replicas is switched to the 1copy class directly
# in the store; fsck is expected to log POVI and REPL and devcount must
# drop to 1.
444 # cleanup over-replicated file
446 $info = $mogc->file_info($key);
447 is($info->{devcount}, 2, "ensure devcount is correct at start");
449 # _NOT_ using "updateclass" command since that enqueues for replication
450 my $fid = MogileFS::FID->new($info->{fid});
451 my $classid_1copy = $sto->get_classid_by_name($fid->dmid, "1copy");
452 is($fid->update_class(classid => $classid_1copy), 1, "classid updated");
454 full_fsck($tmptrack, $dbh);
# check once per second until devcount is no longer 2
456 sleep(1) while $mogc->file_info($key)->{devcount} == 2;
457 is($mogc->file_info($key)->{devcount}, 1, "too-happy file cleaned up");
459 @fsck_log = $sto->fsck_log_rows;
460 is($fsck_log[0]->{evcode}, "POVI", "policy violation logged");
462 # replicator is requested to delete too-happy file
463 is($fsck_log[1]->{evcode}, "REPL", "replication request logged");
# Scenario: with the reaper frozen, dev2 is marked dead and dev3 is added;
# fsck alone is expected to notice and log a single REPL row.
466 # kill a device and replace it with another, without help from reaper
467 # this test may become impossible if monitor + reaper are merged...
469 ok($mogc->update_class($key, "2copies"), "request 2 replicas again");
# Poll file_info until both replicas are counted. The 0.1s pause (fractional
# sleep comes from Time::HiRes) keeps this from spinning on the tracker and
# matches the other polling loops in this file.
471 $info = $mogc->file_info($key);
472 } while ($info->{devcount} < 2 && sleep(0.1));
473 is($info->{devcount}, 2, "ensure devcount is correct at start");
474 wait_for_empty_queue("file_to_replicate", $dbh);
476 my (@reaper_pids) = get_worker_pids($admin, "reaper");
477 is(scalar(@reaper_pids), 1, "only one reaper pid");
478 my $reaper_pid = $reaper_pids[0];
479 ok($reaper_pid, "got pid of reaper");
481 # the reaper's watchdog timeout is 240s, and it wakes up in 5s intervals right now
482 # so we should be safe from watchdog for now...
483 ok(kill("STOP", $reaper_pid), "stopped reaper from running");
485 ok($tmptrack->mogadm("device", "mark", "hostB", 2, "dead"), "mark dev2 as dead");
486 ok($tmptrack->mogadm("device", "add", "hostB", 3), "created dev3 on hostB");
487 wait_for_monitor($be);
489 full_fsck($tmptrack, $dbh);
# check once per second until the fsck_host server setting is no longer set
491 sleep 1 while MogileFS::Config->server_setting("fsck_host");
# up to 30 checks for the fsck queue to drain (the wait between checks is on
# a line not shown in this listing)
493 foreach my $i (1..30) {
494 last if $sto->file_queue_length(MogileFS::Config::FSCK_QUEUE) == 0;
497 is($sto->file_queue_length(MogileFS::Config::FSCK_QUEUE), 0, "fsck queue empty");
499 # fsck should've corrected what reaper missed
500 @fsck_log = $sto->fsck_log_rows;
501 is(scalar(@fsck_log), 1, "fsck log populated");
502 is($fsck_log[0]->{evcode}, "REPL", "replication enqueued");
# unfreeze the reaper again
504 ok(kill("CONT", $reaper_pid), "resumed reaper");
# Scenario: fsck is throttled (queue rate and queue size of 1) over ten fresh
# files, and its progress settings -- fsck_host, fsck_fid_at_end and
# fsck_highest_fid_checked -- are checked across a stop and a restart.
508 ok($tmptrack->mogadm("fsck", "stop"), "stop fsck");
510 foreach my $i (1..10) {
511 my $fh = $mogc->new_file("k$i", "1copy");
513 ok(close($fh), "closed file ($i)");
# k10 is the last file uploaded; its fid is compared with fsck_fid_at_end below
515 $info = $mogc->file_info("k10");
517 ok($tmptrack->mogadm("settings", "set", "queue_rate_for_fsck", 1), "set queue_rate_for_fsck to 1");
518 ok($tmptrack->mogadm("settings", "set", "queue_size_for_fsck", 1), "set queue_size_for_fsck to 1");
520 wait_for_monitor($be) foreach (1..3);
522 shutdown_worker($admin, "job_master");
523 shutdown_worker($admin, "fsck");
525 ok($tmptrack->mogadm("fsck", "clearlog"), "clear fsck log");
526 ok($tmptrack->mogadm("fsck", "reset"), "reset fsck");
# wipe file_to_queue before fsck is started again
527 $sto->retry_on_deadlock(sub { $sto->dbh->do("DELETE FROM file_to_queue") });
528 ok($tmptrack->mogadm("fsck", "start"), "start fsck with slow queue rate");
530 ok(MogileFS::Config->server_setting("fsck_host"), "fsck_host set");
531 is(MogileFS::Config->server_setting("fsck_fid_at_end"), $info->{fid}, "fsck_fid_at_end matches");
# up to 100 checks for fsck_highest_fid_checked to become defined
534 foreach my $i (1..100) {
535 $highest = MogileFS::Config->server_setting("fsck_highest_fid_checked");
536 last if defined $highest;
539 ok(defined($highest), "fsck_highest_fid_checked is set");
540 like($highest, qr/\A\d+\z/, "fsck_highest_fid_checked is a digit");
# with the throttle in place fsck must not already be at the last fid
541 isnt($highest, $info->{fid}, "fsck is not running too fast");
543 # wait for something to get fscked
544 foreach my $i (1..100) {
545 last if MogileFS::Config->server_setting("fsck_highest_fid_checked") != $highest;
549 my $old_highest = $highest;
550 $highest = MogileFS::Config->server_setting("fsck_highest_fid_checked");
551 isnt($highest, $old_highest, "moved to next FID");
# stopping must clear fsck_host but keep fsck_fid_at_end
553 ok($tmptrack->mogadm("fsck", "stop"), "stop fsck");
554 ok(! MogileFS::Config->server_setting("fsck_host"), "fsck_host unset");
555 is(MogileFS::Config->server_setting("fsck_fid_at_end"), $info->{fid}, "fsck_fid_at_end matches");
# restarting without a reset must resume from the recorded position
558 ok($tmptrack->mogadm("fsck", "start"), "restart fsck");
559 $highest = MogileFS::Config->server_setting("fsck_highest_fid_checked");
560 cmp_ok($highest, '>=', $old_highest, "fsck resumed without resetting fsck_highest_fid_checked");
562 # wait for something to get fscked
563 foreach my $i (1..200) {
564 last if MogileFS::Config->server_setting("fsck_highest_fid_checked") != $highest;
568 $highest = MogileFS::Config->server_setting("fsck_highest_fid_checked");
569 cmp_ok($highest, '>', $old_highest, "fsck continued to higher FID");
# Scenario: files uploaded while fsck is running must not be checked; the
# highest fid fsck reports must equal the highest fid recorded beforehand.
572 # upload new files, but ensure fsck does NOT reach them
# highest fid in the store before the new uploads
574 my $last_fid = $sto->max_fidid;
576 foreach my $i (1..10) {
577 my $fh = $mogc->new_file("z$i", "1copy");
579 ok(close($fh), "closed file (z$i)");
582 # crank up fsck speed again
583 ok($tmptrack->mogadm("settings", "set", "queue_rate_for_fsck", 100), "set queue_rate_for_fsck to 100");
584 ok($tmptrack->mogadm("settings", "set", "queue_size_for_fsck", 100), "set queue_size_for_fsck to 100");
# check every 0.1s until the fsck_host server setting is no longer set
586 sleep 0.1 while MogileFS::Config->server_setting("fsck_host");
588 my $highest = MogileFS::Config->server_setting("fsck_highest_fid_checked");
589 is($highest, $last_fid, "fsck didn't advance beyond what we started with");