1 package MogileFS
::Worker
::Replicate
;
2 # replicates files around
5 use base
'MogileFS::Worker';
7 'fidtodo', # hashref { fid => 1 }
11 use MogileFS
::Util
qw(error every debug);
14 use MogileFS
::ReplicationRequest
qw(rr_upgrade);
16 # set up the value used in a 'nexttry' field to indicate that this item will never
17 # actually be tried again and will require some sort of manual intervention.
18 use constant ENDOFTIME
=> 2147483647;
# Accessor for the ENDOFTIME constant declared in this package, i.e. the
# 'nexttry' sentinel marking an item that will never be retried.
sub end_of_time {
    return ENDOFTIME;
}
23 my ($class, $psock) = @_;
24 my $self = fields
::new
($class);
25 $self->SUPER::new
($psock);
26 $self->{fidtodo
} = {};
# Watchdog timeout for this worker class. Always returns 90.
# NOTE(review): unit is presumably seconds -- confirm against the caller.
sub watchdog_timeout {
    return 90;
}
36 # give the monitor job 15 seconds to give us an update
37 my $warn_after = time() + 15;
40 # replication doesn't go well if the monitor job hasn't actively started
41 # marking things as being available
42 unless ($self->monitor_has_run) {
43 error
("waiting for monitor job to complete a cycle before beginning replication")
44 if time() > $warn_after;
49 my $dbh = $self->get_dbh or return 0;
50 my $sto = Mgd
::get_store
();
51 $self->send_to_parent("worker_bored 100 replicate rebalance");
53 my $queue_todo = $self->queue_todo('replicate');
54 my $queue_todo2 = $self->queue_todo('rebalance');
55 unless (@
$queue_todo || @
$queue_todo2) {
59 while (my $todo = shift @
$queue_todo) {
60 my $fid = $todo->{fid
};
61 $self->replicate_using_torepl_table($todo);
63 while (my $todo = shift @
$queue_todo2) {
65 # deserialize the arg :/
66 $todo->{arg
} = [split /,/, $todo->{arg
}];
68 MogileFS
::DevFID
->new($todo->{devid
}, $todo->{fid
});
69 $self->rebalance_devfid($devfid,
70 { target_devids
=> $todo->{arg
} });
72 # If files error out, we want to send the error up to syslog
73 # and make a real effort to chew through the queue. Users may
74 # manually re-run rebalance to retry.
75 $sto->delete_fid_from_file_to_queue($todo->{fid
}, REBAL_QUEUE
);
77 $_[0]->(0); # don't sleep.
81 # return 1 if we did something (or tried to do something), return 0 if
82 # there was nothing to be done.
83 sub replicate_using_torepl_table
{
87 # find some fids to replicate, prioritize based on when they should be tried
88 my $sto = Mgd
::get_store
();
90 my $fid = $todo->{fid
};
96 $opts{errref
} = \
$errcode;
97 $opts{no_unlock
} = 1; # to make it return an $unlock subref
98 $opts{source_devid
} = $todo->{fromdevid
} if $todo->{fromdevid
};
100 my ($status, $unlock) = replicate
($fid, %opts);
103 # $status is either 0 (failure, handled below), 1 (success, we actually
104 # replicated this file), or "lost_race" (success, but someone else replicated it).
106 # when $status eq "lost_race", this delete is unnecessary normally
107 # (somebody else presumably already deleted it if they
108 # also replicated it), but in the case of running with old
109 # replicators from previous versions, -or- simply if the
110 # other guy's delete failed, this cleans it up....
111 $sto->delete_fid_from_file_to_replicate($fid);
112 $unlock->() if $unlock;
116 debug
("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG
>= 2;
120 # README: please keep this up to date if you update the replicate() function so we ensure
121 # that this code always does the right thing
124 # failed_getting_lock => harmless. skip. somebody else probably doing.
127 # too_happy => too many copies, attempt to rebalance.
129 # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
130 # source_down => only source available is observed down.
131 # policy_error_doing_failed => policy plugin fucked up. it's looping.
132 # policy_error_already_there => policy plugin fucked up. it's dumb.
133 # policy_no_suggestions => no copy was attempted. policy is just not happy.
134 # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions.
136 # -- FATAL; DON'T TRY AGAIN --
137 # no_source => it simply exists nowhere. not that something's down, but file_on is empty.
139 # bail if we failed getting the lock, that means someone else probably
140 # already did it, so we should just move on
141 if ($errcode eq 'failed_getting_lock') {
142 $unlock->() if $unlock;
146 # logic for setting the next try time appropriately
147 my $update_nexttry = sub {
148 my ($type, $delay) = @_;
149 my $sto = Mgd
::get_store
();
150 if ($type eq 'end_of_time') {
151 # special; update to a time that won't happen again,
152 # as we've encountered a scenario in which case we're
154 $sto->reschedule_file_to_replicate_absolute($fid, ENDOFTIME
);
155 } elsif ($type eq "offset") {
156 $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
158 $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
162 # now let's handle any error we want to consider a total failure; do not
163 # retry at any point. push this file off to the end so someone has to come
164 # along and figure out what went wrong.
165 if ($errcode eq 'no_source') {
166 $update_nexttry->( end_of_time
=> 1 );
167 $unlock->() if $unlock;
171 # try to shake off extra copies. fall through to the backoff logic
172 # so we don't flood if it's impossible to properly weaken the fid.
173 # there's a race where the fid could be checked again, but the
174 # exclusive locking prevents replication clobbering.
175 if ($errcode eq 'too_happy') {
176 $unlock->() if $unlock;
178 my $f = MogileFS
::FID
->new($fid);
179 my @devs = List
::Util
::shuffle
($f->devids);
181 # First one we can delete from, we try to rebalance away from.
183 my $dev = MogileFS
::Device
->of_devid($_);
184 # Not positive 'can_read_from' needs to be here.
185 # We must be able to delete off of this dev so the fid can
187 if ($dev->can_delete_from && $dev->can_read_from) {
188 $devfid = MogileFS
::DevFID
->new($dev, $f);
192 $self->rebalance_devfid($devfid) if $devfid;
195 # at this point, the rest of the errors require exponential backoff. define what this means
196 # as far as failcount -> delay to next try.
197 # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
198 my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
199 $update_nexttry->( offset
=> int(($backoff[$todo->{failcount
}] || 86400) * (rand(0.4) + 0.8)) );
200 $unlock->() if $unlock;
204 # Return 1 on success, 0 on failure.
205 sub rebalance_devfid
{
206 my ($self, $devfid, $opts) = @_;
208 MogileFS
::Util
::okay_args
($opts, qw(avoid_devids target_devids));
210 my $fid = $devfid->fid;
212 # bail out early if this FID is no longer in the namespace (weird
213 # case where file is in file_on because not yet deleted, but
214 # has been replaced/deleted in 'file' table...). not too harmful
215 # (just noisy) if this line didn't exist, but whatever... it
216 # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
218 return 1 if ! $fid->exists;
221 my ($ret, $unlock) = replicate
($fid,
222 mask_devids
=> { $devfid->devid => 1 },
224 target_devids
=> $opts->{target_devids
},
231 error
("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
235 unless ($ret || $errcode eq "too_happy") {
236 return $fail->("Replication failed");
239 my $should_delete = 0;
242 if ($errcode eq "too_happy" || $ret eq "lost_race") {
243 # for some reason, we did no work. that could be because
244 # either 1) we lost the race, as the error code implies,
245 # and some other process rebalanced this first, or 2)
246 # the file is over-replicated, and everybody just thinks they
247 # lost the race because the replication policy said there's
248 # nothing to do, even with this devfid masked away.
249 # so let's figure it out... if this devfid still exists,
250 # we're over-replicated, else we just lost the race.
251 if ($devfid->exists) {
254 # see if some copy, besides this one we want
255 # to delete, is currently alive & of right size..
256 # just as extra paranoid check before we delete it
257 foreach my $test_df ($fid->devfids) {
258 next if $test_df->devid == $devfid->devid;
259 if ($test_df->size_matches) {
261 $del_reason = "over_replicated";
267 $should_delete = 0; # no-op
269 } elsif ($ret eq "would_worsen") {
270 # replication has indicated we would be ruining this fid's day
271 # if we delete an existing copy, so let's not do that.
272 # this indicates a condition where there are no suitable devices to
273 # copy new data onto, so let's be loud about it.
274 return $fail->("no suitable destination devices available");
277 $del_reason = "did_rebalance;ret=$ret";
282 $destroy_opts{ignore_missing
} = 1
283 if MogileFS
::Config
->config("rebalance_ignore_missing");
285 if ($should_delete) {
286 eval { $devfid->destroy(%destroy_opts) };
288 return $fail->("HTTP delete (due to '$del_reason') failed: $@");
296 # replicates $fid to make sure it meets its class' replicate policy.
298 # README: if you update this sub to return a new error code, please update the
299 # appropriate callers to know how to deal with the errors returned.
303 # ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock.
305 # 0 = failure (failure written to ${$opts{errref}})
307 # "lost_race" = skipping, we did no work and policy was already met.
308 # "nofid" => fid no longer exists. skip replication.
310 my ($fid, %opts) = @_;
311 $fid = MogileFS
::FID
->new($fid) unless ref $fid;
312 my $fidid = $fid->id;
314 debug
("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG
>= 2;
316 my $errref = delete $opts{'errref'};
317 my $no_unlock = delete $opts{'no_unlock'};
318 my $sdevid = delete $opts{'source_devid'};
319 my $mask_devids = delete $opts{'mask_devids'} || {};
320 my $avoid_devids = delete $opts{'avoid_devids'} || {};
321 my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
322 die "unknown_opts" if %opts;
323 die unless ref $mask_devids eq "HASH";
325 # bool: if source was explicitly requested by caller
326 my $fixed_source = $sdevid ?
1 : 0;
328 my $sto = Mgd
::get_store
();
330 $sto->note_done_replicating($fidid);
333 my $retunlock = sub {
335 my ($errmsg, $errcode);
337 ($errcode, $errmsg) = @_;
338 $errmsg = "$errcode: $errmsg"; # include code with message
342 $$errref = $errcode if $errref;
345 if ($errcode && $errcode eq "failed_getting_lock") {
346 # don't emit a warning with error() on lock failure. not
347 # a big deal, don't scare people.
350 $ret = $rv ?
$rv : error
($errmsg);
353 die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
354 return ($ret, $unlock);
356 die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
362 # hashref of devid -> MogileFS::Device
363 my $devs = MogileFS
::Device
->map
364 or die "No device map";
366 return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
367 unless $sto->should_begin_replicating_fidid($fidid);
369 # if the fid doesn't even exist, consider our job done! no point
370 # replicating file contents of a file no longer in the namespace.
371 return $retunlock->("nofid") unless $fid->exists;
373 my $cls = $fid->class;
374 my $polobj = $cls->repl_policy_obj;
376 # learn what devices this file is already on
377 my @on_devs; # all devices fid is on, reachable or not.
378 my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
379 my @on_up_devid; # subset of @on_devs: just devs that are readable
381 foreach my $devid ($fid->devids) {
382 my $d = MogileFS
::Device
->of_devid($devid)
385 if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
386 push @on_devs_tellpol, $d;
388 if ($d->dstate->can_read_from) {
389 push @on_up_devid, $devid;
393 return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
394 return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;
396 # if they requested a specific source, that source must be up.
397 if ($sdevid && ! grep { $_ == $sdevid} @on_up_devid) {
398 return $retunlock->(0, "source_down", "Requested replication source device $sdevid not available");
401 my %dest_failed; # devid -> 1 for each devid we were asked to copy to, but failed.
402 my %source_failed; # devid -> 1 for each devid we had problems reading from.
403 my $got_copy_request = 0; # true once replication policy asks us to move something somewhere
406 my $dest_devs = $devs;
407 if (@
$target_devids) {
408 $dest_devs = {map { $_ => $devs->{$_} } @
$target_devids};
411 my $rr; # MogileFS::ReplicationRequest
413 $rr = rr_upgrade
($polobj->replicate_to(
415 on_devs
=> \
@on_devs_tellpol, # all device objects fid is on, dead or otherwise
416 all_devs
=> $dest_devs,
417 failed
=> \
%dest_failed,
418 min
=> $cls->mindevcount,
421 last if $rr->is_happy;
423 my @ddevs; # dest devs, in order of preference
424 my $ddevid; # dest devid we've chosen to copy to
425 if (@ddevs = $rr->copy_to_one_of_ideally) {
426 if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
427 ! $avoid_devids->{$_}
429 map { $_->id } @ddevs)) {
430 $ddevid = $not_masked_ids[0];
432 # once we masked devids away, there were no
433 # ideal suggestions. this is the case of rebalancing,
434 # which without this check could 'worsen' the state
435 # of the world. consider the case:
436 # h1[ d1 d2 ] h2[ d3 ]
437 # and files are on d1 & d3, an ideal layout.
438 # if d3 is being rebalanced, and masked away, the
439 # replication policy could presumably say to put
440 # the file on d2, even though d3 isn't dead.
441 # so instead, when masking is in effect, we don't
442 # use non-ideal placement, just bailing out.
444 # this used to return "lost_race" as a lie, but rebalance was
445 # happily deleting the masked fid if at least one other fid
446 # existed... because it assumed it was over replicated.
447 # now we tell rebalance that touching this fid would be
449 return $retunlock->("would_worsen");
451 } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
452 # TODO: reschedule a replication for 'n' minutes in future, or
453 # when new hosts/devices become available or change state
454 $ddevid = $ddevs[0]->id;
459 $got_copy_request = 1;
461 # replication policy shouldn't tell us to put a file on a device
462 # we've already told it that we've failed at. so if we get that response,
463 # the policy plugin is broken and we should terminate now.
464 if ($dest_failed{$ddevid}) {
465 return $retunlock->(0, "policy_error_doing_failed",
466 "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
469 # replication policy shouldn't tell us to put a file on a
470 # device that it's already on. that's just stupid.
471 if (grep { $_->id == $ddevid } @on_devs) {
472 return $retunlock->(0, "policy_error_already_there",
473 "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
476 # find where we're replicating from
477 unless ($fixed_source) {
478 # TODO: use an observed good device+host as source to start.
479 my @choices = grep { ! $source_failed{$_} } @on_up_devid;
480 return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
481 @choices = List
::Util
::shuffle
(@choices);
482 MogileFS
::run_global_hook
('replicate_order_final_choices', $devs, \
@choices);
483 $sdevid = shift @choices;
486 my $worker = MogileFS
::ProcManager
->is_child or die;
492 expected_len
=> undef, # FIXME: get this info to pass along
493 errref
=> \
$copy_err,
494 callback
=> sub { $worker->still_alive; },
496 die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;
499 error
("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
500 if ($copy_err eq "src_error") {
501 $source_failed{$sdevid} = 1;
504 # there can't be any more retries, as this source
505 # is busted and is the only one we wanted.
506 return $retunlock->(0, "copy_error", "error copying fid $fidid from devid $sdevid during replication");
510 $dest_failed{$ddevid} = 1;
515 my $dfid = MogileFS
::DevFID
->new($ddevid, $fid);
518 push @on_devs, $devs->{$ddevid};
519 push @on_devs_tellpol, $devs->{$ddevid};
520 push @on_up_devid, $ddevid;
523 # We are over replicated. Let caller decide if it should rebalance.
524 if ($rr->too_happy) {
525 return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
529 return $retunlock->(1) if $got_copy_request;
530 return $retunlock->("lost_race"); # some other process got to it first. policy was happy immediately.
533 return $retunlock->(0, "policy_no_suggestions",
534 "replication policy ran out of suggestions for us replicating fid $fidid");
537 # copies a file from one Perlbal to another utilizing HTTP
540 my ($sdevid, $ddevid, $fid, $rfid, $expected_clen, $intercopy_cb, $errref) =
541 map { delete $opts{$_} } qw(sdevid
552 $intercopy_cb ||= sub {};
554 # handles setting unreachable magic; $error_unreachable->("message")
555 my $error_unreachable = sub {
556 $$errref = "src_error" if $errref;
557 return error
("Fid $fid unreachable while replicating: $_[0]");
560 my $dest_error = sub {
561 $$errref = "dest_error" if $errref;
566 my $src_error = sub {
567 $$errref = "src_error" if $errref;
572 # get some information we'll need
573 my $sdev = MogileFS
::Device
->of_devid($sdevid);
574 my $ddev = MogileFS
::Device
->of_devid($ddevid);
576 return error
("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid")
577 unless $sdev && $ddev && $sdev->exists && $ddev->exists;
579 my $s_dfid = MogileFS
::DevFID
->new($sdev, $fid);
580 my $d_dfid = MogileFS
::DevFID
->new($ddev, $fid);
582 my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
583 my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));
585 my ($shostip, $sport) = ($shost->ip, $shost->http_port);
586 if (MogileFS
::Config
->config("repl_use_get_port")) {
587 $sport = $shost->http_get_port;
589 my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
590 unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
591 # show detailed information to find out what's not configured right
592 error
("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
593 error
(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
597 # needed by webdav servers, like lighttpd...
598 $ddev->vivify_directories($d_dfid->url);
600 # setup our pipe error handler, in case we get closed on
602 local $SIG{PIPE
} = sub { $pipe_closed = 1; };
604 # call a hook for odd casing completely different source data
605 # for specific files.
607 MogileFS
::run_global_hook
('replicate_alternate_source',
608 $rfid, \
$shostip, \
$sport, \
$spath, \
$shttphost);
610 # okay, now get the file
611 my $sock = IO
::Socket
::INET
->new(PeerAddr
=> $shostip, PeerPort
=> $sport, Timeout
=> 2)
612 or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
613 unless ($shttphost) {
614 $sock->write("GET $spath HTTP/1.0\r\n\r\n");
616 # plugin set a custom host.
617 $sock->write("GET $spath HTTP/1.0\r\nHost: $shttphost\r\n\r\n");
619 return error
("Pipe closed retrieving $spath from $shostip:$sport")
622 # we just want a content length
624 # FIXME: this can block. needs to timeout.
625 while (defined (my $line = <$sock>)) {
626 $line =~ s/[\s\r\n]+$//;
627 last unless length $line;
628 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
629 # make sure we get a good response
630 return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
631 unless $1 >= 200 && $1 <= 299;
633 next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
636 return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
637 if defined $expected_clen && $clen != $expected_clen;
639 # open target for put
640 my $dsock = IO
::Socket
::INET
->new(PeerAddr
=> $dhostip, PeerPort
=> $dport, Timeout
=> 2)
641 or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
642 $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
643 or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
644 return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
647 # now read data and print while we're reading.
648 my ($data, $written, $remain) = ('', 0, $clen);
649 my $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
650 $bytes_to_read = $remain if $remain < $bytes_to_read;
651 my $finished_read = 0;
653 if ($bytes_to_read) {
654 while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
655 # now we've read in $bytes bytes
657 $bytes_to_read = $remain if $remain < $bytes_to_read;
659 my $wbytes = $dsock->send($data);
661 return $dest_error->("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
662 unless $wbytes == $bytes;
665 die if $bytes_to_read < 0;
666 next if $bytes_to_read;
674 return $dest_error->("closed pipe writing to destination") if $pipe_closed;
675 return $src_error->("error reading midway through source: $!") unless $finished_read;
677 # now read in the response line (should be first line)
679 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
680 return 1 if $1 >= 200 && $1 <= 299;
681 return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
683 return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
692 # indent-tabs-mode: nil
699 MogileFS::Worker::Replicate -- replicates files
703 This process replicates files enqueued in the B<file_to_replicate> table.
705 The replication policy (which devices to replicate to) is pluggable,
706 but only one policy comes with the server. See
707 L<MogileFS::ReplicationPolicy::MultipleHosts>
713 L<MogileFS::ReplicationPolicy>
715 L<MogileFS::ReplicationPolicy::MultipleHosts>