replicate: enforce expected Content-Length in http_copy
[MogileFS-Server.git] / lib / MogileFS / Worker / Replicate.pm
blob bd5c406061787168fc8bcd5c1666eeedf62a2f69
1 package MogileFS::Worker::Replicate;
2 # replicates files around
4 use strict;
5 use base 'MogileFS::Worker';
6 use fields (
7 'fidtodo', # hashref { fid => 1 }
8 );
10 use List::Util ();
11 use MogileFS::Server;
12 use MogileFS::Util qw(error every debug);
13 use MogileFS::Config;
14 use MogileFS::ReplicationRequest qw(rr_upgrade);
15 use Digest;
16 use MIME::Base64 qw(encode_base64);
# Constructor. Builds the fields-based object, lets the MogileFS::Worker
# base class initialize it with $psock (presumably the socket back to the
# parent process -- it is only passed through here), and starts with an
# empty 'fidtodo' set.
sub new {
    my $class = shift;
    my $psock = shift;

    my $worker = fields::new($class);
    $worker->SUPER::new($psock);
    $worker->{fidtodo} = {};

    return $worker;
}
26 # replicator wants
27 sub watchdog_timeout { 90; }
# Main loop of the replicate worker.
#
# Runs under every(1.0, ...). Each pass:
#   - sends the parent "worker_bored 100 replicate rebalance"
#     (presumably advertising capacity for both job types -- TODO confirm);
#   - returns early when nothing is queued or the DB handle is not valid;
#   - hands each queued 'replicate' item to replicate_using_torepl_table();
#   - for each queued 'rebalance' item, moves the copy on the given devid
#     (optionally restricted to the comma-separated target devids in 'arg')
#     and then removes the item from the rebalance queue regardless of
#     the outcome;
#   - asks every() not to sleep before the next pass.
sub work {
    my $self = shift;

    every(1.0, sub {
        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $queue_todo  = $self->queue_todo('replicate');
        my $queue_todo2 = $self->queue_todo('rebalance');
        return unless (@$queue_todo || @$queue_todo2);

        return unless $self->validate_dbh;
        my $sto = Mgd::get_store();

        while (my $todo = shift @$queue_todo) {
            $self->replicate_using_torepl_table($todo);
        }

        while (my $todo = shift @$queue_todo2) {
            $self->still_alive;
            # deserialize the arg :/
            $todo->{arg} = [split /,/, $todo->{arg}];
            my $devfid =
                MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid,
                { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue. Users may
            # manually re-run rebalance to retry.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }

        $_[0]->(0); # don't sleep.
    });
}
# Handle one entry from the file_to_replicate queue.
#
# Tries to replicate $todo->{fid} (preferring source device
# $todo->{fromdevid} when given). On success the queue entry is deleted.
# On failure the entry is skipped (lock contention), parked forever
# (no_source), or rescheduled with jittered exponential backoff. For
# too_happy, a rebalance away from one deletable copy is attempted first.
#
# Always returns 1: the entry was handled (done, skipped or rescheduled).
sub replicate_using_torepl_table {
    my $self = shift;
    my $todo = shift;

    # find some fids to replicate, prioritize based on when they should be tried
    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts;
    $opts{errref}       = \$errcode;
    $opts{no_unlock}    = 1; # to make it return an $unlock subref
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # $status is either 0 (failure, handled below), 1 (success, we actually
        # replicated this file), or "lost_race"/"nofid" (success, but no copy
        # was needed).

        # when $status eq "lost_race", this delete is unnecessary normally
        # (somebody else presumably already deleted it if they
        # also replicated it), but in the case of running with old
        # replicators from previous versions, -or- simply if the
        # other guy's delete failed, this cleans it up....
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        # Return rather than 'next': leaving a sub via 'next' only works by
        # unwinding into whatever loop the caller happens to be in.
        return 1;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:
    #
    # README: please keep this up to date if you update the replicate() function so we ensure
    # that this code always does the right thing
    #
    # -- HARMLESS --
    # failed_getting_lock => harmless. skip. somebody else probably doing.
    #
    # -- ACTIONABLE --
    # too_happy => too many copies, attempt to rebalance.
    #
    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down => only source available is observed down.
    # policy_error_doing_failed => policy plugin fucked up. it's looping.
    # policy_error_already_there => policy plugin fucked up. it's dumb.
    # policy_no_suggestions => no copy was attempted. policy is just not happy.
    #   (this is also what replicate() reports after copies were attempted
    #    and failed; the older 'copy_error' code is not emitted by it.)
    #
    # -- FATAL; DON'T TRY AGAIN --
    # no_source => it simply exists nowhere. not that something's down, but file_on is empty.

    # bail if we failed getting the lock, that means someone else probably
    # already did it, so we should just move on
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        return 1;
    }

    # logic for setting the next try time appropriately
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        my $sto = Mgd::get_store();
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
        } elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
        } else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
        }
    };

    # now let's handle any error we want to consider a total failure; do not
    # retry at any point. push this file off to the end so someone has to come
    # along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        return 1;
    }

    # try to shake off extra copies. fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # there's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        $unlock->() if $unlock;
        $unlock = undef;
        my $f = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;
        # First one we can delete from, we try to rebalance away from.
        for (@devs) {
            # skip devids with no known device object instead of calling
            # methods on undef (same guard replicate() uses)
            my $dev = Mgd::device_factory()->get_by_id($_)
                or next;
            # Not positive 'should_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->should_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        $self->rebalance_devfid($devfid) if $devfid;
    }

    # at this point, the rest of the errors require exponential backoff. define what this means
    # as far as failcount -> delay to next try.
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    # (each delay is jittered to 80%-120% of the nominal value)
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
    $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}
# Move the copy of a fid that lives on $devfid's device somewhere else.
#
# Replicates the fid with $devfid's device masked out (optionally limited to
# $opts->{target_devids}), then deletes the copy on $devfid when that is
# safe. Accepted option keys: avoid_devids, target_devids.
#
# Return 1 on success, 0 on failure.
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # A fid can still be listed in file_on after it was replaced/deleted in
    # the 'file' table; there is nothing to rebalance then. Skipping it is
    # mostly about keeping logs quiet.
    return 1 unless $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate(
        $fid,
        mask_devids   => { $devfid->devid => 1 },
        no_unlock     => 1,
        target_devids => $opts->{target_devids},
        errref        => \$errcode,
    );

    # Release the replication lock, log, and report failure.
    my $fail = sub {
        my ($why) = @_;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $why");
        return 0;
    };

    return $fail->("Replication failed")
        if !$ret && $errcode ne "too_happy";

    my ($should_delete, $del_reason) = (0, undef);

    if ($errcode eq "too_happy" || $ret eq "lost_race") {
        # No copy was made. Either another process rebalanced this fid first
        # (then $devfid is already gone), or the fid is over-replicated and
        # the policy is satisfied even with $devfid masked (then $devfid is
        # still present and may be removed).
        if ($devfid->exists) {
            # Paranoia: only delete when some *other* copy is present and
            # has the right size.
            for my $other ($fid->devfids) {
                next if $other->devid == $devfid->devid;
                next unless $other->size_matches;
                ($should_delete, $del_reason) = (1, "over_replicated");
                last;
            }
        }
        # otherwise we lost the race: nothing to delete.
    } elsif ($ret eq "would_worsen") {
        # Deleting this copy would leave the fid worse off; no suitable
        # destination exists, so complain loudly instead.
        return $fail->("no suitable destination devices available");
    } else {
        ($should_delete, $del_reason) = (1, "did_rebalance;ret=$ret");
    }

    my %destroy_opts;
    $destroy_opts{ignore_missing} = 1
        if MogileFS::Config->config("rebalance_ignore_missing");

    if ($should_delete) {
        eval { $devfid->destroy(%destroy_opts) };
        return $fail->("HTTP delete (due to '$del_reason') failed: $@") if $@;
    }

    $unlock->();
    return 1;
}
# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# Arguments: $fid (a MogileFS::FID object or a fid id), then options:
#    errref        => scalar ref; receives the error code on failure
#    no_unlock     => if true, return ($rv, $unlock_sub) and keep the lock
#    source_devid  => devid to copy from when it is readable
#    mask_devids   => hashref devid => 1; hidden from the policy's view of
#                     where the fid lives and never picked as an ideal target
#    avoid_devids  => hashref devid => 1; never picked as an ideal target
#    target_devids => arrayref of devids; only these devices are offered to
#                     the policy as destinations (unknown devids are ignored)
# Any other option dies with "unknown_opts".
#
# returns either:
#    $rv
#    ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock.
# $rv is one of:
#    0 = failure (failure written to ${$opts{errref}})
#    1 = success
#    "lost_race" = skipping, we did no work and policy was already met.
#    "nofid" => fid no longer exists. skip replication.
#    "would_worsen" => masking left no ideal destination; nothing was done.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref        = delete $opts{'errref'};
    my $no_unlock     = delete $opts{'no_unlock'};
    my $fixed_source  = delete $opts{'source_devid'};
    my $mask_devids   = delete $opts{'mask_devids'}   || {};
    my $avoid_devids  = delete $opts{'avoid_devids'}  || {};
    my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    my $sdevid;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # Record the error code, log (except for lock contention), and return
    # in the shape the caller asked for via no_unlock.
    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure. not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = Mgd::device_factory()->map_by_id
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);

    # if the fid doesn't even exist, consider our job done! no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $polobj = $cls->repl_policy_obj;

    # learn what this devices file is already on
    my @on_devs;          # all devices fid is on, reachable or not.
    my @on_devs_tellpol;  # subset of @on_devs, to tell the policy class about
    my @on_up_devid;      # subset of @on_devs: just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = Mgd::device_factory()->get_by_id($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->should_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;

    if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
        error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
    }

    my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;  # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere
    my $copy_err;

    my $dest_devs = $devs;
    if (@$target_devids) {
        # Only offer requested devids that actually exist in the device map;
        # an unknown devid would otherwise hand the replication policy an
        # undef device object.
        $dest_devs = {map { $_ => $devs->{$_} } grep { $devs->{$_} } @$target_devids};
    }

    my $rr;  # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($polobj->replicate_to(
                                               fid       => $fidid,
                                               on_devs   => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
                                               all_devs  => $dest_devs,
                                               failed    => \%dest_failed,
                                               min       => $cls->mindevcount,
                                               ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                                 ! $avoid_devids->{$_}
                                             }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions. this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world. consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # this used to return "lost_race" as a lie, but rebalance was
                # happily deleting the masked fid if at least one other fid
                # existed... because it assumed it was over replicated.
                # now we tell rebalance that touching this fid would be
                # stupid.
                return $retunlock->("would_worsen");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at. so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on. that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            if ($fixed_source && grep { $_ == $fixed_source } @choices) {
                $sdevid = $fixed_source;
            } else {
                @choices = List::Util::shuffle(@choices);
                MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
                $sdevid = shift @choices;
            }
        }

        my $worker = MogileFS::ProcManager->is_child or die;
        my $digest;
        my $fid_checksum = $fid->checksum;
        $digest = Digest->new($fid_checksum->hashname) if $fid_checksum;
        $digest ||= Digest->new($cls->hashname) if $cls->hashtype;

        my $rv = http_copy(
                           sdevid       => $sdevid,
                           ddevid       => $ddevid,
                           fid          => $fid,
                           errref       => \$copy_err,
                           callback     => sub { $worker->still_alive; },
                           digest       => $digest,
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source && $fixed_source == $sdevid) {
                    error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
                }

            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;
        if ($digest && !$fid->checksum) {
            $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
        }

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
        push @on_up_devid, $ddevid;
    }

    # We are over replicated. Let caller decide if it should rebalance.
    if ($rr->too_happy) {
        return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race");  # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}
# copies a file from one Perlbal to another utilizing HTTP
#
# Streams the fid's data with a GET from the source device and a PUT to the
# destination device. The copy fails unless:
#   - the source answers 2xx with a Content-Length equal to the fid's length;
#   - every advertised byte is read and forwarded;
#   - the copied bytes match the fid's stored checksum, when it has one;
#   - the destination answers 2xx and, when a digest is in use, the stored
#     copy re-reads to the same digest (skipped when the destination device
#     has reject_bad_md5 set and the algorithm is MD5).
#
# Options:
#   sdevid, ddevid => source / destination device ids
#   fid            => MogileFS::FID object or fid id
#   callback       => coderef called as data is written (keepalive hook)
#   errref         => scalar ref; set to "src_error" or "dest_error" on
#                     every failure path, so the caller knows which side to
#                     stop using
#   digest         => Digest object fed with the copied bytes, or undef
# Unknown options die.
#
# Returns 1 on success and a false value on failure.
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    callback
                                    errref
                                    digest
                                    );
    die if %opts;

    $fid = MogileFS::FID->new($fid) unless ref($fid);
    my $fidid = $fid->id;
    my $expected_clen = $fid->length;
    my $content_md5 = '';
    my $fid_checksum = $fid->checksum;
    if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
        # some HTTP servers may be able to verify Content-MD5 on PUT
        # and reject corrupted requests.  no HTTP server should reject
        # a request for an unrecognized header
        my $b64digest = encode_base64($fid_checksum->{checksum}, "");
        $content_md5 = "\r\nContent-MD5: $b64digest";
    }

    $intercopy_cb ||= sub {};

    # source-side failure reported via error(); $error_unreachable->("message")
    my $error_unreachable = sub {
        $$errref = "src_error" if $errref;
        return error("Fid $fidid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        $$errref = "dest_error" if $errref;
        error($_[0]);
        return 0;
    };

    my $src_error = sub {
        $$errref = "src_error" if $errref;
        error($_[0]);
        return 0;
    };

    # get some information we'll need
    my $sdev = Mgd::device_factory()->get_by_id($sdevid);
    my $ddev = Mgd::device_factory()->get_by_id($ddevid);

    unless ($sdev && $ddev) {
        # Blame whichever side is missing; returning without setting
        # $$errref makes replicate() die with "Bogus error code".
        my $fail = $sdev ? $dest_error : $src_error;
        return $fail->("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid");
    }

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    if (MogileFS::Config->config("repl_use_get_port")) {
        $sport = $shost->http_get_port;
    }
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
        error(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
        # blame the source if its half of the URL is incomplete, else the destination
        my $src_ok = defined $spath && defined $shostip && $sport;
        $$errref = ($src_ok ? "dest_error" : "src_error") if $errref;
        return 0;
    }

    # need by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # call a hook for odd casing completely different source data
    # for specific files.
    my $shttphost;
    MogileFS::run_global_hook('replicate_alternate_source',
                              $fid, \$shostip, \$sport, \$spath, \$shttphost);

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shostip, PeerPort => $sport, Timeout => 2)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    unless ($shttphost) {
        $sock->write("GET $spath HTTP/1.0\r\n\r\n");
    } else {
        # plugin set a custom host.
        $sock->write("GET $spath HTTP/1.0\r\nHost: $shttphost\r\n\r\n");
    }
    return $src_error->("Pipe closed retrieving $spath from $shostip:$sport")
        if $pipe_closed;

    # we just want a content length
    my $clen;
    # FIXME: this can block.  needs to timeout.
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
                unless $1 >= 200 && $1 <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }

    # Without a Content-Length we cannot bound the read nor build a valid
    # PUT ("undef != N" would also quietly pass for empty files).
    return $error_unreachable->("File $spath has no content-length, expected $expected_clen")
        unless defined $clen;
    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if $clen != $expected_clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhostip, PeerPort => $dport, Timeout => 2)
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen$content_md5\r\n\r\n")
        or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
    return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
        if $pipe_closed;

    # now read data and print while we're reading.
    my ($data, $written, $remain) = ('', 0, $clen);
    my $bytes_to_read = 1024*1024;  # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    my $finished_read = 0;

    if ($bytes_to_read) {
        while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
            # now we've read in $bytes bytes
            $remain -= $bytes;
            $bytes_to_read = $remain if $remain < $bytes_to_read;
            $digest->add($data) if $digest;

            # syswrite may accept only part of the buffer; keep going
            # from the offset until all $bytes are written.
            my $data_len = $bytes;
            my $data_off = 0;
            while (1) {
                my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
                unless (defined $wbytes) {
                    return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
                }
                $written += $wbytes;
                $intercopy_cb->();
                last if ($data_len == $wbytes);

                $data_len -= $wbytes;
                $data_off += $wbytes;
            }

            die if $bytes_to_read < 0;
            next if $bytes_to_read;
            $finished_read = 1;
            last;
        }
    } else {
        # 0 byte file copy.
        $finished_read = 1;
    }
    return $dest_error->("closed pipe writing to destination") if $pipe_closed;
    return $src_error->("error reading midway through source: $!") unless $finished_read;

    # caller will want this digest, too, so clone as "digest" is destructive
    $digest = $digest->clone->digest if $digest;

    if ($fid_checksum) {
        if ($digest ne $fid_checksum->{checksum}) {
            my $expect = $fid_checksum->hexdigest;
            $digest = unpack("H*", $digest);
            return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest");
        }
    }

    # now read in the response line (should be first line)
    my $line = <$dsock>;
    if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        if ($1 >= 200 && $1 <= 299) {
            if ($digest) {
                my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;

                if ($ddev->{reject_bad_md5} && ($alg eq "MD5")) {
                    # dest device would've rejected us with a error,
                    # no need to reread the file
                    return 1;
                }
                my $durl = "http://$dhostip:$dport$dpath";
                my $httpfile = MogileFS::HTTPFile->at($durl);
                my $actual = $httpfile->digest($alg, $intercopy_cb);
                if ($actual ne $digest) {
                    my $expect = unpack("H*", $digest);
                    $actual = unpack("H*", $actual);
                    # report the hex digest we actually read back
                    return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $actual");
                }
            }
            return 1;
        }
        return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
    } else {
        return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
    }
}

# module must return a true value when loaded
1;
730 # Local Variables:
731 # mode: perl
732 # c-basic-indent: 4
733 # indent-tabs-mode: nil
734 # End:
736 __END__
738 =head1 NAME
740 MogileFS::Worker::Replicate -- replicates files
742 =head1 OVERVIEW
744 This process replicates files enqueued in the B<file_to_replicate> table.
746 The replication policy (which devices to replicate to) is pluggable,
747 but only one policy comes with the server. See
748 L<MogileFS::ReplicationPolicy::MultipleHosts>
750 =head1 SEE ALSO
752 L<MogileFS::Worker>
754 L<MogileFS::ReplicationPolicy>
756 L<MogileFS::ReplicationPolicy::MultipleHosts>