1 package MogileFS
::Worker
::Replicate
;
2 # replicates files around
5 use base
'MogileFS::Worker';
7 'fidtodo', # hashref { fid => 1 }
12 use MogileFS
::Util
qw(error every debug);
14 use MogileFS
::ReplicationRequest
qw(rr_upgrade);
16 use MIME
::Base64
qw(encode_base64);
19 my ($class, $psock) = @_;
20 my $self = fields
::new
($class);
21 $self->SUPER::new
($psock);
22 $self->{fidtodo
} = {};
# Watchdog timeout for the replicate worker: a constant 90.
# NOTE(review): presumably seconds -- the unit is not visible in this
# file; confirm against the MogileFS::Worker base class.
sub watchdog_timeout {
    return 90;
}
33 $self->send_to_parent("worker_bored 100 replicate rebalance");
35 my $queue_todo = $self->queue_todo('replicate');
36 my $queue_todo2 = $self->queue_todo('rebalance');
37 return unless (@
$queue_todo || @
$queue_todo2);
39 return unless $self->validate_dbh;
40 my $sto = Mgd
::get_store
();
42 while (my $todo = shift @
$queue_todo) {
43 my $fid = $todo->{fid
};
44 $self->replicate_using_torepl_table($todo);
46 while (my $todo = shift @
$queue_todo2) {
48 # deserialize the arg :/
49 $todo->{arg
} = [split /,/, $todo->{arg
}];
51 MogileFS
::DevFID
->new($todo->{devid
}, $todo->{fid
});
52 $self->rebalance_devfid($devfid,
53 { target_devids
=> $todo->{arg
} });
55 # If files error out, we want to send the error up to syslog
56 # and make a real effort to chew through the queue. Users may
57 # manually re-run rebalance to retry.
58 $sto->delete_fid_from_file_to_queue($todo->{fid
}, REBAL_QUEUE
);
60 $_[0]->(0); # don't sleep.
64 # return 1 if we did something (or tried to do something), return 0 if
65 # there was nothing to be done.
66 sub replicate_using_torepl_table
{
70 # find some fids to replicate, prioritize based on when they should be tried
71 my $sto = Mgd
::get_store
();
73 my $fid = $todo->{fid
};
79 $opts{errref
} = \
$errcode;
80 $opts{no_unlock
} = 1; # to make it return an $unlock subref
81 $opts{source_devid
} = $todo->{fromdevid
} if $todo->{fromdevid
};
83 my ($status, $unlock) = replicate
($fid, %opts);
86 # $status is either 0 (failure, handled below), 1 (success, we actually
87 # replicated this file), or "lost_race" (success, but someone else replicated it).
89 # when $status eq "lost_race", this delete is unnecessary normally
90 # (somebody else presumably already deleted it if they
91 # also replicated it), but in the case of running with old
92 # replicators from previous versions, -or- simply if the
93 # other guy's delete failed, this cleans it up....
94 $sto->delete_fid_from_file_to_replicate($fid);
95 $unlock->() if $unlock;
99 debug
("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG
>= 2;
103 # README: please keep this up to date if you update the replicate() function so we ensure
104 # that this code always does the right thing
107 # failed_getting_lock => harmless. skip. somebody else probably doing.
110 # too_happy => too many copies, attempt to rebalance.
112 # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
113 # source_down => only source available is observed down.
114 # policy_error_doing_failed => policy plugin fucked up. it's looping.
115 # policy_error_already_there => policy plugin fucked up. it's dumb.
116 # policy_no_suggestions => no copy was attempted. policy is just not happy.
117 # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions.
119 # -- FATAL; DON'T TRY AGAIN --
120 # no_source => it simply exists nowhere. not that something's down, but file_on is empty.
122 # bail if we failed getting the lock, that means someone else probably
123 # already did it, so we should just move on
124 if ($errcode eq 'failed_getting_lock') {
125 $unlock->() if $unlock;
129 # logic for setting the next try time appropriately
130 my $update_nexttry = sub {
131 my ($type, $delay) = @_;
132 my $sto = Mgd
::get_store
();
133 if ($type eq 'end_of_time') {
134 # special; update to a time that won't happen again,
135 # as we've encountered a scenario in which case we're
137 $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
138 } elsif ($type eq "offset") {
139 $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
141 $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
145 # now let's handle any error we want to consider a total failure; do not
146 # retry at any point. push this file off to the end so someone has to come
147 # along and figure out what went wrong.
148 if ($errcode eq 'no_source') {
149 $update_nexttry->( end_of_time
=> 1 );
150 $unlock->() if $unlock;
154 # try to shake off extra copies. fall through to the backoff logic
155 # so we don't flood if it's impossible to properly weaken the fid.
156 # there's a race where the fid could be checked again, but the
157 # exclusive locking prevents replication clobbering.
158 if ($errcode eq 'too_happy') {
159 $unlock->() if $unlock;
161 my $f = MogileFS
::FID
->new($fid);
162 my @devs = List
::Util
::shuffle
($f->devids);
164 # First one we can delete from, we try to rebalance away from.
166 my $dev = Mgd
::device_factory
()->get_by_id($_);
167 # Not positive 'should_read_from' needs to be here.
168 # We must be able to delete off of this dev so the fid can
170 if ($dev->can_delete_from && $dev->should_read_from) {
171 $devfid = MogileFS
::DevFID
->new($dev, $f);
175 $self->rebalance_devfid($devfid) if $devfid;
178 # at this point, the rest of the errors require exponential backoff. define what this means
179 # as far as failcount -> delay to next try.
180 # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
181 my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
182 $update_nexttry->( offset
=> int(($backoff[$todo->{failcount
}] || 86400) * (rand(0.4) + 0.8)) );
183 $unlock->() if $unlock;
187 # Return 1 on success, 0 on failure.
188 sub rebalance_devfid
{
189 my ($self, $devfid, $opts) = @_;
191 MogileFS
::Util
::okay_args
($opts, qw(avoid_devids target_devids));
193 my $fid = $devfid->fid;
195 # bail out early if this FID is no longer in the namespace (weird
196 # case where file is in file_on because not yet deleted, but
197 # has been replaced/deleted in 'file' table...). not too harmful
198 # (just noisy) if this line didn't exist, but whatever... it
199 # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
201 return 1 if ! $fid->exists;
204 my ($ret, $unlock) = replicate
($fid,
205 mask_devids
=> { $devfid->devid => 1 },
207 target_devids
=> $opts->{target_devids
},
214 error
("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
218 unless ($ret || $errcode eq "too_happy") {
219 return $fail->("Replication failed");
222 my $should_delete = 0;
225 if ($errcode eq "too_happy" || $ret eq "lost_race") {
226 # for some reason, we did no work. that could be because
227 # either 1) we lost the race, as the error code implies,
228 # and some other process rebalanced this first, or 2)
229 # the file is over-replicated, and everybody just thinks they
230 # lost the race because the replication policy said there's
231 # nothing to do, even with this devfid masked away.
232 # so let's figure it out... if this devfid still exists,
233 # we're over-replicated, else we just lost the race.
234 if ($devfid->exists) {
237 # see if some copy, besides this one we want
238 # to delete, is currently alive & of right size..
239 # just as extra paranoid check before we delete it
240 foreach my $test_df ($fid->devfids) {
241 next if $test_df->devid == $devfid->devid;
242 if ($test_df->size_matches) {
244 $del_reason = "over_replicated";
250 $should_delete = 0; # no-op
252 } elsif ($ret eq "would_worsen") {
253 # replication has indicated we would be ruining this fid's day
254 # if we delete an existing copy, so let's not do that.
255 # this indicates a condition where there are no suitable devices to
256 # copy new data onto, so let's be loud about it.
257 return $fail->("no suitable destination devices available");
260 $del_reason = "did_rebalance;ret=$ret";
265 $destroy_opts{ignore_missing
} = 1
266 if MogileFS
::Config
->config("rebalance_ignore_missing");
268 if ($should_delete) {
269 eval { $devfid->destroy(%destroy_opts) };
271 return $fail->("HTTP delete (due to '$del_reason') failed: $@");
279 # replicates $fid to make sure it meets its class' replicate policy.
281 # README: if you update this sub to return a new error code, please update the
282 # appropriate callers to know how to deal with the errors returned.
286 # ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock.
288 # 0 = failure (failure written to ${$opts{errref}})
290 # "lost_race" = skipping, we did no work and policy was already met.
291 # "nofid" => fid no longer exists. skip replication.
293 my ($fid, %opts) = @_;
294 $fid = MogileFS
::FID
->new($fid) unless ref $fid;
295 my $fidid = $fid->id;
297 debug
("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG
>= 2;
299 my $errref = delete $opts{'errref'};
300 my $no_unlock = delete $opts{'no_unlock'};
301 my $fixed_source = delete $opts{'source_devid'};
302 my $mask_devids = delete $opts{'mask_devids'} || {};
303 my $avoid_devids = delete $opts{'avoid_devids'} || {};
304 my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
305 die "unknown_opts" if %opts;
306 die unless ref $mask_devids eq "HASH";
310 my $sto = Mgd
::get_store
();
312 $sto->note_done_replicating($fidid);
315 my $retunlock = sub {
317 my ($errmsg, $errcode);
319 ($errcode, $errmsg) = @_;
320 $errmsg = "$errcode: $errmsg"; # include code with message
324 $$errref = $errcode if $errref;
327 if ($errcode && $errcode eq "failed_getting_lock") {
328 # don't emit a warning with error() on lock failure. not
329 # a big deal, don't scare people.
332 $ret = $rv ?
$rv : error
($errmsg);
335 die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
336 return ($ret, $unlock);
338 die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
344 # hashref of devid -> MogileFS::Device
345 my $devs = Mgd
::device_factory
()->map_by_id
346 or die "No device map";
348 return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
349 unless $sto->should_begin_replicating_fidid($fidid);
351 # if the fid doesn't even exist, consider our job done! no point
352 # replicating file contents of a file no longer in the namespace.
353 return $retunlock->("nofid") unless $fid->exists;
355 my $cls = $fid->class;
356 my $polobj = $cls->repl_policy_obj;
358 # learn what devices this file is already on
359 my @on_devs; # all devices fid is on, reachable or not.
360 my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
361 my @on_up_devid; # subset of @on_devs: just devs that are readable
363 foreach my $devid ($fid->devids) {
364 my $d = Mgd
::device_factory
()->get_by_id($devid)
367 if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
368 push @on_devs_tellpol, $d;
370 if ($d->should_read_from) {
371 push @on_up_devid, $devid;
375 return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
376 return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;
378 if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
379 error
("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
382 my %dest_failed; # devid -> 1 for each devid we were asked to copy to, but failed.
383 my %source_failed; # devid -> 1 for each devid we had problems reading from.
384 my $got_copy_request = 0; # true once replication policy asks us to move something somewhere
387 my $dest_devs = $devs;
388 if (@
$target_devids) {
389 $dest_devs = {map { $_ => $devs->{$_} } @
$target_devids};
392 my $rr; # MogileFS::ReplicationRequest
394 $rr = rr_upgrade
($polobj->replicate_to(
396 on_devs
=> \
@on_devs_tellpol, # all device objects fid is on, dead or otherwise
397 all_devs
=> $dest_devs,
398 failed
=> \
%dest_failed,
399 min
=> $cls->mindevcount,
402 last if $rr->is_happy;
404 my @ddevs; # dest devs, in order of preference
405 my $ddevid; # dest devid we've chosen to copy to
406 if (@ddevs = $rr->copy_to_one_of_ideally) {
407 if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
408 ! $avoid_devids->{$_}
410 map { $_->id } @ddevs)) {
411 $ddevid = $not_masked_ids[0];
413 # once we masked devids away, there were no
414 # ideal suggestions. this is the case of rebalancing,
415 # which without this check could 'worsen' the state
416 # of the world. consider the case:
417 # h1[ d1 d2 ] h2[ d3 ]
418 # and files are on d1 & d3, an ideal layout.
419 # if d3 is being rebalanced, and masked away, the
420 # replication policy could presumably say to put
421 # the file on d2, even though d3 isn't dead.
422 # so instead, when masking is in effect, we don't
423 # use non-ideal placement, just bailing out.
425 # this used to return "lost_race" as a lie, but rebalance was
426 # happily deleting the masked fid if at least one other fid
427 # existed... because it assumed it was over replicated.
428 # now we tell rebalance that touching this fid would be
430 return $retunlock->("would_worsen");
432 } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
433 # TODO: reschedule a replication for 'n' minutes in future, or
434 # when new hosts/devices become available or change state
435 $ddevid = $ddevs[0]->id;
440 $got_copy_request = 1;
442 # replication policy shouldn't tell us to put a file on a device
443 # we've already told it that we've failed at. so if we get that response,
444 # the policy plugin is broken and we should terminate now.
445 if ($dest_failed{$ddevid}) {
446 return $retunlock->(0, "policy_error_doing_failed",
447 "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
450 # replication policy shouldn't tell us to put a file on a
451 # device that it's already on. that's just stupid.
452 if (grep { $_->id == $ddevid } @on_devs) {
453 return $retunlock->(0, "policy_error_already_there",
454 "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
457 # find where we're replicating from
459 # TODO: use an observed good device+host as source to start.
460 my @choices = grep { ! $source_failed{$_} } @on_up_devid;
461 return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
462 if ($fixed_source && grep { $_ == $fixed_source } @choices) {
463 $sdevid = $fixed_source;
465 @choices = List
::Util
::shuffle
(@choices);
466 MogileFS
::run_global_hook
('replicate_order_final_choices', $devs, \
@choices);
467 $sdevid = shift @choices;
471 my $worker = MogileFS
::ProcManager
->is_child or die;
473 my $fid_checksum = $fid->checksum;
474 $digest = Digest
->new($fid_checksum->hashname) if $fid_checksum;
475 $digest ||= Digest
->new($cls->hashname) if $cls->hashtype;
481 errref
=> \
$copy_err,
482 callback
=> sub { $worker->still_alive; },
485 die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;
488 error
("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
489 if ($copy_err eq "src_error") {
490 $source_failed{$sdevid} = 1;
492 if ($fixed_source && $fixed_source == $sdevid) {
493 error
("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
497 $dest_failed{$ddevid} = 1;
502 my $dfid = MogileFS
::DevFID
->new($ddevid, $fid);
504 if ($digest && !$fid->checksum) {
505 $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
508 push @on_devs, $devs->{$ddevid};
509 push @on_devs_tellpol, $devs->{$ddevid};
510 push @on_up_devid, $ddevid;
513 # We are over replicated. Let caller decide if it should rebalance.
514 if ($rr->too_happy) {
515 return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
519 return $retunlock->(1) if $got_copy_request;
520 return $retunlock->("lost_race"); # some other process got to it first. policy was happy immediately.
523 return $retunlock->(0, "policy_no_suggestions",
524 "replication policy ran out of suggestions for us replicating fid $fidid");
527 # copies a file from one Perlbal to another utilizing HTTP
530 my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
531 map { delete $opts{$_} } qw(sdevid
540 $fid = MogileFS
::FID
->new($fid) unless ref($fid);
541 my $fidid = $fid->id;
542 my $expected_clen = $fid->length;
543 my $content_md5 = '';
544 my $fid_checksum = $fid->checksum;
545 if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
546 # some HTTP servers may be able to verify Content-MD5 on PUT
547 # and reject corrupted requests. no HTTP server should reject
548 # a request for an unrecognized header
549 my $b64digest = encode_base64
($fid_checksum->{checksum
}, "");
550 $content_md5 = "\r\nContent-MD5: $b64digest";
553 $intercopy_cb ||= sub {};
555 # handles setting unreachable magic; $error->(reachability, "message")
556 my $error_unreachable = sub {
557 $$errref = "src_error" if $errref;
558 return error
("Fid $fidid unreachable while replicating: $_[0]");
561 my $dest_error = sub {
562 $$errref = "dest_error" if $errref;
567 my $src_error = sub {
568 $$errref = "src_error" if $errref;
573 # get some information we'll need
574 my $sdev = Mgd
::device_factory
()->get_by_id($sdevid);
575 my $ddev = Mgd
::device_factory
()->get_by_id($ddevid);
577 return error
("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid")
578 unless $sdev && $ddev;
580 my $s_dfid = MogileFS
::DevFID
->new($sdev, $fid);
581 my $d_dfid = MogileFS
::DevFID
->new($ddev, $fid);
583 my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
584 my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));
586 my ($shostip, $sport) = ($shost->ip, $shost->http_port);
587 if (MogileFS
::Config
->config("repl_use_get_port")) {
588 $sport = $shost->http_get_port;
590 my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
591 unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
592 # show detailed information to find out what's not configured right
593 error
("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
594 error
(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
598 # needed by webdav servers, like lighttpd...
599 $ddev->vivify_directories($d_dfid->url);
601 # setup our pipe error handler, in case we get closed on
603 local $SIG{PIPE
} = sub { $pipe_closed = 1; };
605 # call a hook for odd casing completely different source data
606 # for specific files.
608 MogileFS
::run_global_hook
('replicate_alternate_source',
609 $fid, \
$shostip, \
$sport, \
$spath, \
$shttphost);
611 # okay, now get the file
612 my $sock = IO
::Socket
::INET
->new(PeerAddr
=> $shostip, PeerPort
=> $sport, Timeout
=> 2)
613 or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
614 unless ($shttphost) {
615 $sock->write("GET $spath HTTP/1.0\r\n\r\n");
617 # plugin set a custom host.
618 $sock->write("GET $spath HTTP/1.0\r\nHost: $shttphost\r\n\r\n");
620 return error
("Pipe closed retrieving $spath from $shostip:$sport")
623 # we just want a content length
625 # FIXME: this can block. needs to timeout.
626 while (defined (my $line = <$sock>)) {
627 $line =~ s/[\s\r\n]+$//;
628 last unless length $line;
629 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
630 # make sure we get a good response
631 return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
632 unless $1 >= 200 && $1 <= 299;
634 next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
637 return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
638 if $clen != $expected_clen;
640 # open target for put
641 my $dsock = IO
::Socket
::INET
->new(PeerAddr
=> $dhostip, PeerPort
=> $dport, Timeout
=> 2)
642 or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
643 $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen$content_md5\r\n\r\n")
644 or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
645 return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
648 # now read data and print while we're reading.
649 my ($data, $written, $remain) = ('', 0, $clen);
650 my $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
651 $bytes_to_read = $remain if $remain < $bytes_to_read;
652 my $finished_read = 0;
654 if ($bytes_to_read) {
655 while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
656 # now we've read in $bytes bytes
658 $bytes_to_read = $remain if $remain < $bytes_to_read;
659 $digest->add($data) if $digest;
661 my $data_len = $bytes;
664 my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
665 unless (defined $wbytes) {
666 return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
670 last if ($data_len == $wbytes);
672 $data_len -= $wbytes;
673 $data_off += $wbytes;
676 die if $bytes_to_read < 0;
677 next if $bytes_to_read;
685 return $dest_error->("closed pipe writing to destination") if $pipe_closed;
686 return $src_error->("error reading midway through source: $!") unless $finished_read;
688 # caller will want this digest, too, so clone as "digest" is destructive
689 $digest = $digest->clone->digest if $digest;
692 if ($digest ne $fid_checksum->{checksum
}) {
693 my $expect = $fid_checksum->hexdigest;
694 $digest = unpack("H*", $digest);
695 return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest");
699 # now read in the response line (should be first line)
701 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
702 if ($1 >= 200 && $1 <= 299) {
704 my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;
706 if ($ddev->{reject_bad_md5
} && ($alg eq "MD5")) {
707 # dest device would've rejected us with an error,
708 # no need to reread the file
711 my $durl = "http://$dhostip:$dport$dpath";
712 my $httpfile = MogileFS
::HTTPFile
->at($durl);
713 my $actual = $httpfile->digest($alg, $intercopy_cb);
# Compare the digest re-read from the destination ($actual) with the
# digest computed while streaming from the source ($digest). Both are
# raw binary at this point, so hex-encode them before logging.
if ($actual ne $digest) {
    my $expect = unpack("H*", $digest);
    $actual = unpack("H*", $actual);
    # Report the hex form of what the destination actually returned;
    # interpolating $digest here would print the raw expected digest.
    return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $actual");
}
722 return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
724 return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
733 # indent-tabs-mode: nil
740 MogileFS::Worker::Replicate -- replicates files
744 This process replicates files enqueued in the B<file_to_replicate> table.
746 The replication policy (which devices to replicate to) is pluggable,
747 but only one policy comes with the server. See
748 L<MogileFS::ReplicationPolicy::MultipleHosts>
754 L<MogileFS::ReplicationPolicy>
756 L<MogileFS::ReplicationPolicy::MultipleHosts>