package MogileFS::Worker::Replicate;
# replicates files around

use strict;
use base 'MogileFS::Worker';
use fields (
            'fidtodo',   # hashref { fid => 1 }
            );

use List::Util ();
use MogileFS::Util qw(error every debug);
use MogileFS::Config;
use MogileFS::Class;
use MogileFS::ReplicationRequest qw(rr_upgrade);

# setup the value used in a 'nexttry' field to indicate that this item will never
# actually be tried again and require some sort of manual intervention.
use constant ENDOFTIME => 2147483647;

sub end_of_time { ENDOFTIME; }
sub new {
    my ($class, $psock) = @_;
    my $self = fields::new($class);
    $self->SUPER::new($psock);
    $self->{fidtodo} = {};
    return $self;
}

# replicator wants
sub watchdog_timeout { 90; }
sub work {
    my $self = shift;

    # give the monitor job 15 seconds to give us an update
    my $warn_after = time() + 15;

    every(1.0, sub {
        # replication doesn't go well if the monitor job hasn't actively started
        # marking things as being available
        unless ($self->monitor_has_run) {
            error("waiting for monitor job to complete a cycle before beginning replication")
                if time() > $warn_after;
            return;
        }

        $self->validate_dbh;
        my $dbh = $self->get_dbh or return 0;
        my $sto = Mgd::get_store();
        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $queue_todo  = $self->queue_todo('replicate');
        my $queue_todo2 = $self->queue_todo('rebalance');
        unless (@$queue_todo || @$queue_todo2) {
            return;
        }

        while (my $todo = shift @$queue_todo) {
            my $fid = $todo->{fid};
            $self->replicate_using_torepl_table($todo);
        }
        while (my $todo = shift @$queue_todo2) {
            $self->still_alive;
            # deserialize the arg :/
            $todo->{arg} = [split /,/, $todo->{arg}];
            my $devfid =
                MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid,
                { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue. Users may
            # manually re-run rebalance to retry.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }
        $_[0]->(0); # don't sleep.
    });
}
# return 1 if we did something (or tried to do something), return 0 if
# there was nothing to be done.
sub replicate_using_torepl_table {
    my $self = shift;
    my $todo = shift;

    # find some fids to replicate, prioritize based on when they should be tried
    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts;
    $opts{errref}       = \$errcode;
    $opts{no_unlock}    = 1; # to make it return an $unlock subref
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # $status is either 0 (failure, handled below), 1 (success, we actually
        # replicated this file), or 2 (success, but someone else replicated it).

        # when $status eq "lost_race", this delete is unnecessary normally
        # (somebody else presumably already deleted it if they
        # also replicated it), but in the case of running with old
        # replicators from previous versions, -or- simply if the
        # other guy's delete failed, this cleans it up....
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        next;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:

    # README: please keep this up to date if you update the replicate() function so we ensure
    # that this code always does the right thing

    # -- HARMLESS --
    # failed_getting_lock => harmless.  skip.  somebody else probably doing.

    # -- ACTIONABLE --
    # too_happy => too many copies, attempt to rebalance.

    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down => only source available is observed down.
    # policy_error_doing_failed => policy plugin fucked up.  it's looping.
    # policy_error_already_there => policy plugin fucked up.  it's dumb.
    # policy_no_suggestions => no copy was attempted.  policy is just not happy.
    # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions.

    # -- FATAL; DON'T TRY AGAIN --
    # no_source => it simply exists nowhere.  not that something's down, but file_on is empty.

    # bail if we failed getting the lock, that means someone else probably
    # already did it, so we should just move on
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        next;
    }

    # logic for setting the next try time appropriately
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        my $sto = Mgd::get_store();
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, ENDOFTIME);
        } elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
        } else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
        }
    };

    # now let's handle any error we want to consider a total failure; do not
    # retry at any point.  push this file off to the end so someone has to come
    # along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        next;
    }

    # try to shake off extra copies. fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # there's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        $unlock->() if $unlock;
        $unlock = undef;
        my $f = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;
        # First one we can delete from, we try to rebalance away from.
        for (@devs) {
            my $dev = MogileFS::Device->of_devid($_);
            # Not positive 'can_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->can_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        $self->rebalance_devfid($devfid) if $devfid;
    }

    # at this point, the rest of the errors require exponential backoff.  define what this means
    # as far as failcount -> delay to next try.
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
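    # For example (an illustrative note on the computation below): with
    # failcount == 2 the base delay is $backoff[2] == 300s, and the
    # (rand(0.4) + 0.8) jitter factor spreads that to roughly 240-360s;
    # any failcount past the end of @backoff falls back to the 86400s (24h) cap.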
    $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}
# Return 1 on success, 0 on failure.
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but whatever... it
    # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
    # dev machine...
    return 1 if ! $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate($fid,
                                   mask_devids   => { $devfid->devid => 1 },
                                   no_unlock     => 1,
                                   target_devids => $opts->{target_devids},
                                   errref        => \$errcode,
                                   );

    my $fail = sub {
        my $error = shift;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    unless ($ret || $errcode eq "too_happy") {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($errcode eq "too_happy" || $ret eq "lost_race") {
        # for some reason, we did no work.  that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're over-replicated, else we just lost the race.
        if ($devfid->exists) {
            # over-replicated

            # see if some copy, besides this one we want
            # to delete, is currently alive & of right size..
            # just as extra paranoid check before we delete it
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason = "over_replicated";
                    last;
                }
            }
        } else {
            # lost race
            $should_delete = 0; # no-op
        }
    } elsif ($ret eq "would_worsen") {
        # replication has indicated we would be ruining this fid's day
        # if we delete an existing copy, so let's not do that.
        # this indicates a condition where there are no suitable devices to
        # copy new data onto, so let's be loud about it.
        return $fail->("no suitable destination devices available");
    } else {
        $should_delete = 1;
        $del_reason = "did_rebalance;ret=$ret";
    }

    my %destroy_opts;

    $destroy_opts{ignore_missing} = 1
        if MogileFS::Config->config("rebalance_ignore_missing");

    if ($should_delete) {
        eval { $devfid->destroy(%destroy_opts) };
        if ($@) {
            return $fail->("HTTP delete (due to '$del_reason') failed: $@");
        }
    }

    $unlock->();
    return 1;
}
# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# returns either:
#   $rv
#   ($rv, $unlock_sub) -- when 'no_unlock' %opt is used.  subref to release lock.
# $rv is one of:
#   0 = failure (failure written to ${$opts{errref}})
#   1 = success
#   "lost_race" = skipping, we did no work and policy was already met.
#   "nofid" => fid no longer exists.  skip replication.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref        = delete $opts{'errref'};
    my $no_unlock     = delete $opts{'no_unlock'};
    my $sdevid        = delete $opts{'source_devid'};
    my $mask_devids   = delete $opts{'mask_devids'}  || {};
    my $avoid_devids  = delete $opts{'avoid_devids'} || {};
    my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    # bool: if source was explicitly requested by caller
    my $fixed_source = $sdevid ? 1 : 0;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure.  not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = MogileFS::Device->map
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);
    # if the fid doesn't even exist, consider our job done!  no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $polobj = $cls->repl_policy_obj;

    # learn what devices this file is already on
    my @on_devs;         # all devices fid is on, reachable or not.
    my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
    my @on_up_devid;     # subset of @on_devs: just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = MogileFS::Device->of_devid($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->dstate->can_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;

    # if they requested a specific source, that source must be up.
    if ($sdevid && ! grep { $_ == $sdevid } @on_up_devid) {
        return $retunlock->(0, "source_down", "Requested replication source device $sdevid not available");
    }

    my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;  # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere
    my $copy_err;

    my $dest_devs = $devs;
    if (@$target_devids) {
        $dest_devs = {map { $_ => $devs->{$_} } @$target_devids};
    }

    my $rr;  # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($polobj->replicate_to(
                                               fid      => $fidid,
                                               on_devs  => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
                                               all_devs => $dest_devs,
                                               failed   => \%dest_failed,
                                               min      => $cls->mindevcount,
                                               ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                             ! $avoid_devids->{$_}
                                           }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #   h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # this used to return "lost_race" as a lie, but rebalance was
                # happily deleting the masked fid if at least one other fid
                # existed... because it assumed it was over replicated.
                # now we tell rebalance that touching this fid would be
                # stupid.
                return $retunlock->("would_worsen");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.  that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        unless ($fixed_source) {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            @choices = List::Util::shuffle(@choices);
            MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
            $sdevid = shift @choices;
        }

        my $worker = MogileFS::ProcManager->is_child or die;
        my $rv = http_copy(
                           sdevid       => $sdevid,
                           ddevid       => $ddevid,
                           fid          => $fidid,
                           rfid         => $fid,
                           expected_len => undef, # FIXME: get this info to pass along
                           errref       => \$copy_err,
                           callback     => sub { $worker->still_alive; },
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source) {
                    # there can't be any more retries, as this source
                    # is busted and is the only one we wanted.
                    return $retunlock->(0, "copy_error", "error copying fid $fidid from devid $sdevid during replication");
                }
            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
        push @on_up_devid, $ddevid;
    }

    # We are over replicated. Let caller decide if it should rebalance.
    if ($rr->too_happy) {
        return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race"); # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}

# copies a file from one Perlbal to another utilizing HTTP
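# (argument summary, inferred from the call site in replicate() above and
# the code below:)
#   sdevid/ddevid => source and destination device ids
#   fid           => numeric fid to copy
#   rfid          => MogileFS::FID object, passed to the
#                    'replicate_alternate_source' hook
#   expected_len  => optional content-length sanity check
#   callback      => invoked between copied chunks (used to feed the watchdog)
#   errref        => scalar ref, set to "src_error" or "dest_error" on failure
# returns 1 on success, 0 on failure.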
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $rfid, $expected_clen, $intercopy_cb, $errref) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    rfid
                                    expected_len
                                    callback
                                    errref
                                    );
    die if %opts;

    $intercopy_cb ||= sub {};

    # handles setting unreachable magic; $error->(reachability, "message")
    my $error_unreachable = sub {
        $$errref = "src_error" if $errref;
        return error("Fid $fid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        $$errref = "dest_error" if $errref;
        error($_[0]);
        return 0;
    };

    my $src_error = sub {
        $$errref = "src_error" if $errref;
        error($_[0]);
        return 0;
    };

    # get some information we'll need
    my $sdev = MogileFS::Device->of_devid($sdevid);
    my $ddev = MogileFS::Device->of_devid($ddevid);

    return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid")
        unless $sdev && $ddev && $sdev->exists && $ddev->exists;

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host     } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    if (MogileFS::Config->config("repl_use_get_port")) {
        $sport = $shost->http_get_port;
    }
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
        error("  http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
        return 0;
    }

    # needed by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # call a hook for odd cases needing completely different source data
    # for specific files.
    my $shttphost;
    MogileFS::run_global_hook('replicate_alternate_source',
                              $rfid, \$shostip, \$sport, \$spath, \$shttphost);

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shostip, PeerPort => $sport, Timeout => 2)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    unless ($shttphost) {
        $sock->write("GET $spath HTTP/1.0\r\n\r\n");
    } else {
        # plugin set a custom host.
        $sock->write("GET $spath HTTP/1.0\r\nHost: $shttphost\r\n\r\n");
    }
    return error("Pipe closed retrieving $spath from $shostip:$sport")
        if $pipe_closed;

    # we just want a content length
    my $clen;
    # FIXME: this can block.  needs to timeout.
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
                unless $1 >= 200 && $1 <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }
    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if defined $expected_clen && $clen != $expected_clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhostip, PeerPort => $dport, Timeout => 2)
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
        or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
    return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
        if $pipe_closed;

    # now read data and print while we're reading.
    my ($data, $written, $remain) = ('', 0, $clen);
    my $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    my $finished_read = 0;

    if ($bytes_to_read) {
        while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
            # now we've read in $bytes bytes
            $remain -= $bytes;
            $bytes_to_read = $remain if $remain < $bytes_to_read;

            my $wbytes = $dsock->send($data);
            $written += $wbytes;
            return $dest_error->("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
                unless $wbytes == $bytes;
            $intercopy_cb->();

            die if $bytes_to_read < 0;
            next if $bytes_to_read;
            $finished_read = 1;
            last;
        }
    } else {
        # 0 byte file copy.
        $finished_read = 1;
    }
    return $dest_error->("closed pipe writing to destination") if $pipe_closed;
    return $src_error->("error reading midway through source: $!") unless $finished_read;

    # now read in the response line (should be first line)
    my $line = <$dsock>;
    if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        return 1 if $1 >= 200 && $1 <= 299;
        return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
    } else {
        return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
    }
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

__END__

=head1 NAME

MogileFS::Worker::Replicate -- replicates files

=head1 OVERVIEW

This process replicates files enqueued in the B<file_to_replicate> table.

The replication policy (which devices to replicate to) is pluggable,
but only one policy comes with the server.  See
L<MogileFS::ReplicationPolicy::MultipleHosts>.

=head1 SEE ALSO

L<MogileFS::Worker>

L<MogileFS::ReplicationPolicy>

L<MogileFS::ReplicationPolicy::MultipleHosts>