1 package MogileFS
::Worker
::Query
;
2 # responds to queries from Mogile clients
7 use base
'MogileFS::Worker';
8 use fields
qw(querystarttime reqid callid);
9 use MogileFS
::Util
qw(error error_code first weighted_list
10 device_state eurl decode_url_args);
11 use MogileFS
::HTTPFile
;
12 use MogileFS
::Rebalance
;
17 my ($class, $psock) = @_;
18 my $self = fields
::new
($class);
19 $self->SUPER::new
($psock);
21 $self->{querystarttime
} = undef;
22 $self->{reqid
} = undef;
23 $self->{callid
} = undef;
27 # no query should take 30 seconds, and we check in every 5 seconds.
# Watchdog limit for this worker type: a constant 30.
sub watchdog_timeout {
    return 30;
}
30 # called by plugins to register a command in the namespace
31 sub register_command
{
34 # validate the command, then convert it to the actual thing the user
36 return 0 unless $cmd =~ /^[\w\d]+$/;
39 # register in namespace with 'cmd_' which we will automatically find
49 my $psock = $self->{psock
};
51 vec($rin, fileno($psock), 1) = 1;
56 unless (select($rout=$rin, undef, undef, 5.0)) {
62 my $rv = sysread($psock, $newread, 1024);
65 die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n";
67 die "Error reading pipe from parent: $!\n";
72 while ($buf =~ s/^(.+?)\r?\n//) {
74 if ($self->process_generic_command(\
$line)) {
75 $self->still_alive; # no-op for watchdog
78 $self->process_line(\
$line);
85 my MogileFS
::Worker
::Query
$self = shift;
88 # see what kind of command this is
89 return $self->err_line('unknown_command')
90 unless $$lineref =~ /^(\d+-\d+)?\s*(\S+)\s*(.*)/;
92 $self->{reqid
} = $1 || undef;
93 my ($client_ip, $line) = ($2, $3);
95 # set global variables for zone determination
96 local $MogileFS::REQ_client_ip
= $client_ip;
98 # Use as array here, otherwise we get a string which breaks usage of
99 # Time::HiRes::tv_interval further on.
100 $self->{querystarttime
} = [ Time
::HiRes
::gettimeofday
() ];
102 # fallback to normal command handling
103 if ($line =~ /^(\w+)\s*(.*)/) {
104 my ($cmd, $orig_args) = ($1, $2);
108 my $cmd_handler = *{"cmd_$cmd"}{CODE
};
109 my $args = decode_url_args
(\
$orig_args);
110 $self->{callid
} = $args->{callid
};
112 local $MogileFS::REQ_altzone
= ($args->{zone
} && $args->{zone
} eq 'alt');
114 $cmd_handler->($self, $args);
117 my $errc = error_code
($@
);
118 if ($errc eq "dup") {
119 return $self->err_line("dup");
122 error
("Error running command '$cmd': $@");
123 return $self->err_line("failure");
130 return $self->err_line('unknown_command');
133 # this is a half-finished command. in particular, errors tend to
134 # crash the parent or child or something. it's a quick hack for a quick
135 # ops task that needs done. note in particular how it reaches across
136 # package boundaries into an API that the Replicator probably doesn't
139 my MogileFS
::Worker
::Query
$self = shift;
141 my $sdevid = $args->{sdevid
};
142 my $ddevid = $args->{ddevid
};
143 my $fid = $args->{fid
};
146 my $rv = MogileFS
::Worker
::Replicate
::http_copy
(sdevid
=> $sdevid,
151 my $dfid = MogileFS
::DevFID
->new($ddevid, $fid);
153 or return $self->err_line("copy_err", "failed to add link to database");
154 return $self->ok_line;
156 return $self->err_line("copy_err", $err);
160 # returns 0 on error, or dmid of domain
162 my MogileFS
::Worker
::Query
$self = shift;
165 my $domain = $args->{domain
};
167 return $self->err_line("no_domain") unless defined $domain && length $domain;
170 my $dmid = eval { Mgd
::domain_factory
()->get_by_name($domain)->id } or
171 return $self->err_line("unreg_domain");
177 my MogileFS
::Worker
::Query
$self = shift;
179 sleep($args->{duration
} || 10);
180 return $self->ok_line;
184 my MogileFS
::Worker
::Query
$self = shift;
186 die "Crashed on purpose" if $args->{crash
};
187 return $self->ok_line;
# Forces a monitor refresh before replying.
#
# Calls, in order: forget_that_monitor_has_run, send_to_parent with the
# ":refresh_monitor" message, then wait_for_monitor.  Whatever arguments
# the caller supplied after the invocant are handed on to ok_line, whose
# result is returned.
sub cmd_clear_cache {
    my ($self, @reply_args) = @_;

    $self->forget_that_monitor_has_run();
    $self->send_to_parent(":refresh_monitor");
    $self->wait_for_monitor();

    return $self->ok_line(@reply_args);
}
200 sub cmd_create_open
{
201 my MogileFS
::Worker
::Query
$self = shift;
204 # has to be filled out for some plugins
205 $args->{dmid
} = $self->check_domain($args) or return;
207 # first, pass this to a hook to do any manipulations needed
208 eval {MogileFS
::run_global_hook
('cmd_create_open', $args)};
210 return $self->err_line("plugin_aborted", "$@")
213 # validate parameters
214 my $dmid = $args->{dmid
};
215 my $key = $args->{key
} || "";
216 my $multi = $args->{multi_dest
} ?
1 : 0;
217 my $size = $args->{size
} || undef; # Size is optional at create time,
218 # but used to grep devices if available
220 # optional profiling of stages, if $args->{debug_profile}
221 my @profpoints; # array of [point,hires-starttime]
222 my $profstart = sub {
224 push @profpoints, [$pt, Time
::HiRes
::time()];
226 $profstart = sub {} unless $args->{debug_profile
};
227 $profstart->("begin");
229 # we want it to be undef if not explicit, else force to numeric
230 my $exp_fidid = $args->{fid
} ?
int($args->{fid
}) : undef;
233 my $sto = Mgd
::get_store
();
235 # figure out what classid this file is for
236 my $class = $args->{class} || "";
238 if (length($class)) {
239 $classid = eval { Mgd
::class_factory
()->get_by_name($dmid, $class)->id }
240 or return $self->err_line("unreg_class");
243 # if we haven't heard from the monitoring job yet, we need to chill a bit
244 # to prevent a race where we tell a user that we can't create a file when
245 # in fact we've just not heard from the monitor
246 $profstart->("wait_monitor");
247 $self->wait_for_monitor;
249 $profstart->("find_deviceid");
251 my @devices = Mgd
::device_factory
()->get_all;
253 @devices = grep { ($_->mb_free * 1024*1024) > $size } @devices;
256 unless (MogileFS
::run_global_hook
('cmd_create_open_order_devices', [ @devices ], \
@devices)) {
257 @devices = sort_devs_by_freespace
(@devices);
260 # find suitable device(s) to put this file on.
261 my @dests; # MogileFS::Device objects which are suitable
263 while (scalar(@dests) < ($multi ?
3 : 1)) {
264 my $ddev = shift @devices;
267 next unless $ddev->not_on_hosts(map { $_->host } @dests);
271 return $self->err_line("no_devices") unless @dests;
274 $sto->register_tempfile(
275 fid
=> $exp_fidid, # may be undef/NULL to mean auto-increment
279 devids
=> join(',', map { $_->id } @dests),
283 my $errc = error_code
($@
);
284 return $self->err_line("fid_in_use") if $errc eq "dup";
285 warn "Error registering tempfile: $@\n";
286 return $self->err_line("db");
289 # make sure directories exist for client to be able to PUT into
290 foreach my $dev (@dests) {
291 $profstart->("vivify_dir_on_dev" . $dev->id);
292 my $dfid = MogileFS
::DevFID
->new($dev, $fidid);
293 $dfid->vivify_directories;
298 # common reply variables
305 $res->{profpoints
} = 0;
306 for (my $i=0; $i<$#profpoints; $i++) {
307 my $ptnum = ++$res->{profpoints
};
308 $res->{"prof_${ptnum}_name"} = $profpoints[$i]->[0];
309 $res->{"prof_${ptnum}_time"} =
311 $profpoints[$i+1]->[1] - $profpoints[$i]->[1]);
318 foreach my $dev (@dests) {
320 $res->{"devid_$ct"} = $dev->id;
321 $res->{"path_$ct"} = MogileFS
::DevFID
->new($dev, $fidid)->url;
323 $res->{dev_count
} = $ct;
325 $res->{devid
} = $dests[0]->id;
326 $res->{path
} = MogileFS
::DevFID
->new($dests[0], $fidid)->url;
329 return $self->ok_line($res);
332 sub sort_devs_by_freespace
{
333 my @devices_with_weights = map {
334 [$_, 100 * $_->percent_free]
336 $b->percent_free <=> $a->percent_free;
338 $_->should_get_new_files;
342 MogileFS
::Util
::weighted_list
(splice(@devices_with_weights, 0, 20));
350 return defined($key) && length($key);
353 sub cmd_create_close
{
354 my MogileFS
::Worker
::Query
$self = shift;
357 # has to be filled out for some plugins
358 $args->{dmid
} = $self->check_domain($args) or return;
360 # call out to a hook that might modify the arguments for us
361 MogileFS
::run_global_hook
('cmd_create_close', $args);
363 # late validation of parameters
364 my $dmid = $args->{dmid
};
365 my $key = $args->{key
};
366 my $fidid = $args->{fid
} or return $self->err_line("no_fid");
367 my $devid = $args->{devid
} or return $self->err_line("no_devid");
368 my $path = $args->{path
} or return $self->err_line("no_path");
369 my $checksum = $args->{checksum
};
372 $checksum = eval { MogileFS
::Checksum
->from_string($fidid, $checksum) };
373 return $self->err_line("invalid_checksum_format") if $@
;
376 my $fid = MogileFS
::FID
->new($fidid);
377 my $dfid = MogileFS
::DevFID
->new($devid, $fid);
379 # is the provided path what we'd expect for this fid/devid?
380 return $self->err_line("bogus_args")
381 unless $path eq $dfid->url;
383 my $sto = Mgd
::get_store
();
385 # find the temp file we're closing and making real. If another worker
386 # already has it, bail out---the client closed it twice.
387 # this is racy, but the only expected use case is a client retrying.
388 # should still be fixed better once more scalable locking is available.
389 my $trow = $sto->delete_and_return_tempfile_row($fidid) or
390 return $self->err_line("no_temp_file");
392 # Protect against leaving orphaned uploads.
398 unless ($trow->{devids
} =~ m/\b$devid\b/) {
400 return $self->err_line("invalid_destdev", "File uploaded to invalid dest $devid. Valid devices were: " . $trow->{devids
});
403 # if a temp file is closed without a provided-key, that means to
405 unless (valid_key
($key)) {
407 return $self->ok_line;
410 # get size of file and verify that it matches what we were given, if anything
411 my $httpfile = MogileFS
::HTTPFile
->at($path);
412 my $size = $httpfile->size;
414 # size check is optional? Needs to support zero byte files.
415 $args->{size
} = -1 unless $args->{size
};
416 if (!defined($size) || $size == MogileFS
::HTTPFile
::FILE_MISSING
) {
417 # storage node is unreachable or the file is missing
418 my $type = defined $size ?
"missing" : "cantreach";
419 my $lasterr = MogileFS
::Util
::last_error
();
421 return $self->err_line("size_verify_error", "Expected: $args->{size}; actual: 0 ($type); path: $path; error: $lasterr")
424 if ($args->{size
} > -1 && ($args->{size
} != $size)) {
426 return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path")
429 # checksum validation is optional as it can be very expensive
430 # However, we /always/ verify it if the client wants us to, even
431 # if the class does not enforce or store it.
432 if ($checksum && $args->{checksumverify
}) {
433 my $alg = $checksum->hashname;
434 my $actual = $httpfile->digest($alg, sub { $self->still_alive });
435 if ($actual ne $checksum->{checksum
}) {
437 $actual = "$alg:" . unpack("H*", $actual);
438 return $self->err_line("checksum_mismatch",
439 "Expected: $checksum; actual: $actual; path: $path");
443 # see if we have a fid for this key already
444 my $old_fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $key);
446 # Fail if a file already exists for this fid. Should never
447 # happen, as it should not be possible to close a file twice.
448 return $self->err_line("fid_exists")
449 unless $old_fid->{fidid
} != $fidid;
454 # TODO: check for EIO?
459 $checksum->maybe_save($dmid, $trow->{classid
}) if $checksum;
461 $sto->replace_into_file(
466 classid
=> $trow->{classid
},
470 # mark it as needing replicating:
471 $fid->enqueue_for_replication();
473 # call the hook - if this fails, we need to back the file out
474 my $rv = MogileFS
::run_global_hook
('file_stored', $args);
475 if (defined $rv && ! $rv) { # undef = no hooks, 1 = success, 0 = failure
477 return $self->err_line("plugin_aborted");
480 # all went well, we would've hit condthrow on DB errors
481 return $self->ok_line;
# Moves the file stored under a key to a different class of its domain.
#
# Keys read from $args: key, class, and whatever check_domain needs; dmid
# is written into $args before the 'cmd_updateclass' global hook runs.
#
# Returns nothing when check_domain yields a false value.  Otherwise
# returns an err_line for the first failing check (plugin_aborted, no_key,
# no_class, class_not_found, invalid_key, no_devices) or ok_line.
sub cmd_updateclass {
    my ($self, $args) = @_;

    my $resolved_dmid = $self->check_domain($args);
    $args->{dmid} = $resolved_dmid;
    return unless $resolved_dmid;

    # A defined-but-false hook result is a veto; undef lets us carry on.
    my $hook_result = MogileFS::run_global_hook('cmd_updateclass', $args);
    if (defined $hook_result && !$hook_result) {
        return $self->err_line('plugin_aborted');
    }

    # Read the arguments only now, after the hook has seen $args.
    my ($dmid, $key, $class) = @{$args}{qw(dmid key class)};
    return $self->err_line("no_key")   unless valid_key($key);
    return $self->err_line("no_class") unless $class;

    my $classobj = Mgd::class_factory()->get_by_name($dmid, $class);
    return $self->err_line('class_not_found') unless $classobj;
    my $classid = $classobj->id;

    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    return $self->err_line('invalid_key') unless $fid;

    my @holding_devids = $fid->devids;
    return $self->err_line("no_devices") if !@holding_devids;

    # Nothing to change when the file is already in the requested class.
    unless ($fid->classid == $classid) {
        $fid->update_class(classid => $classid);
        $fid->enqueue_for_replication();
    }

    return $self->ok_line;
}
518 my MogileFS
::Worker
::Query
$self = shift;
521 # validate domain for plugins
522 $args->{dmid
} = $self->check_domain($args) or return;
524 # now invoke the plugin, abort if it tells us to
525 my $rv = MogileFS
::run_global_hook
('cmd_delete', $args);
526 return $self->err_line('plugin_aborted')
527 if defined $rv && ! $rv;
529 # validate parameters
530 my $dmid = $args->{dmid
};
531 my $key = $args->{key
};
533 valid_key
($key) or return $self->err_line("no_key");
535 # is this fid still owned by this key?
536 my $fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $key)
537 or return $self->err_line("unknown_key");
541 return $self->ok_line;
544 # Takes either domain/dkey or fid and tries to return as much as possible.
546 my MogileFS
::Worker
::Query
$self = shift;
548 # Talk to the master since this is "debug mode"
549 my $sto = Mgd
::get_store
();
552 # If a FID is provided, just use that.
556 $fidid = $args->{fid
}+0;
557 # It's not fatal if we don't find the row here.
558 $fid = $sto->file_row_from_fidid($args->{fid
}+0);
560 # If not, require dmid/dkey and pick up the fid from there.
561 $args->{dmid
} = $self->check_domain($args) or return;
562 return $self->err_line("no_key") unless valid_key
($args->{key
});
564 # now invoke the plugin, abort if it tells us to
565 my $rv = MogileFS
::run_global_hook
('cmd_file_debug', $args);
566 return $self->err_line('plugin_aborted')
567 if defined $rv && ! $rv;
569 $fid = $sto->file_row_from_dmid_key($args->{dmid
}, $args->{key
});
570 return $self->err_line("unknown_key") unless $fid;
571 $fidid = $fid->{fid
};
575 $fid->{domain
} = Mgd
::domain_factory
()->get_by_id($fid->{dmid
})->name;
576 $fid->{class} = Mgd
::class_factory
()->get_by_id($fid->{dmid
},
577 $fid->{classid
})->name;
580 # Fetch all of the queue data.
581 my $tfile = $sto->tempfile_row_from_fid($fidid);
582 my $repl = $sto->find_fid_from_file_to_replicate($fidid);
583 my $del = $sto->find_fid_from_file_to_delete2($fidid);
584 my $reb = $sto->find_fid_from_file_to_queue($fidid, REBAL_QUEUE
);
585 my $fsck = $sto->find_fid_from_file_to_queue($fidid, FSCK_QUEUE
);
587 # Fetch file_on rows, and turn into paths.
588 my @devids = $sto->fid_devids($fidid);
589 for my $devid (@devids) {
590 # Won't matter if we can't make the path (dev is dead/deleted/etc)
592 my $dfid = MogileFS
::DevFID
->new($devid, $fidid);
593 my $path = $dfid->get_url;
594 $ret->{'devpath_' . $devid} = $path;
597 $ret->{devids
} = join(',', @devids) if @devids;
599 # Always look for a checksum
600 my $checksum = Mgd
::get_store
()->get_checksum($fidid);
602 $checksum = MogileFS
::Checksum
->new($checksum);
603 $ret->{checksum
} = $checksum->info;
605 $ret->{checksum
} = 'NONE';
608 # Return file row (if found) and all other data.
609 my %toret = (fid
=> $fid, tempfile
=> $tfile, replqueue
=> $repl,
610 delqueue
=> $del, rebqueue
=> $reb, fsckqueue
=> $fsck);
611 while (my ($key, $hash) = each %toret) {
612 while (my ($name, $val) = each %$hash) {
613 $ret->{$key . '_' . $name} = $val;
617 return $self->err_line("unknown_fid") unless keys %$ret;
618 return $self->ok_line($ret);
622 my MogileFS
::Worker
::Query
$self = shift;
625 # validate domain for plugins
626 $args->{dmid
} = $self->check_domain($args) or return;
628 # now invoke the plugin, abort if it tells us to
629 my $rv = MogileFS
::run_global_hook
('cmd_file_info', $args);
630 return $self->err_line('plugin_aborted')
631 if defined $rv && ! $rv;
633 # validate parameters
634 my $dmid = $args->{dmid
};
635 my $key = $args->{key
};
637 valid_key
($key) or return $self->err_line("no_key");
640 Mgd
::get_store
()->slaves_ok(sub {
641 $fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $key);
643 $fid or return $self->err_line("unknown_key");
646 $ret->{fid
} = $fid->id;
647 $ret->{domain
} = Mgd
::domain_factory
()->get_by_id($fid->dmid)->name;
648 my $class = Mgd
::class_factory
()->get_by_id($fid->dmid, $fid->classid);
649 $ret->{class} = $class->name;
650 if ($class->{hashtype
}) {
651 my $checksum = Mgd
::get_store
()->get_checksum($fid->id);
653 $checksum = MogileFS
::Checksum
->new($checksum);
654 $ret->{checksum
} = $checksum->info;
656 $ret->{checksum
} = "MISSING";
660 $ret->{'length'} = $fid->length;
661 $ret->{devcount
} = $fid->devcount;
662 # Only if requested, also return the raw devids.
663 # Caller should use get_paths if they intend to fetch the file.
664 if ($args->{devices
}) {
665 $ret->{devids
} = join(',', $fid->devids);
668 return $self->ok_line($ret);
672 my MogileFS
::Worker
::Query
$self = shift;
675 # validate parameters
676 my $fromfid = ($args->{from
} || 0)+0;
677 my $count = ($args->{to
} || 0)+0;
679 $count = 500 if $count > 500 || $count < 0;
681 my $rows = Mgd
::get_store
()->file_row_from_fidid_range($fromfid, $count);
682 return $self->err_line('failure') unless $rows;
683 return $self->ok_line({ fid_count
=> 0 }) unless @
$rows;
685 # setup temporary storage of class/host
686 my (%domains, %classes);
688 # now iterate over our data rows and construct result
691 foreach my $r (@
$rows) {
694 $ret->{"fid_${ct}_fid"} = $fid;
695 $ret->{"fid_${ct}_domain"} = ($domains{$r->{dmid
}} ||=
696 Mgd
::domain_factory
()->get_by_id($r->{dmid
})->name);
697 $ret->{"fid_${ct}_class"} = ($classes{$r->{dmid
}}{$r->{classid
}} ||=
698 Mgd
::class_factory
()->get_by_id($r->{dmid
}, $r->{classid
})->name);
699 $ret->{"fid_${ct}_key"} = $r->{dkey
};
700 $ret->{"fid_${ct}_length"} = $r->{length};
701 $ret->{"fid_${ct}_devcount"} = $r->{devcount
};
703 $ret->{fid_count
} = $ct;
704 return $self->ok_line($ret);
708 my MogileFS
::Worker
::Query
$self = shift;
711 # validate parameters
712 my $dmid = $self->check_domain($args) or return;
713 my ($prefix, $after, $limit) = ($args->{prefix
}, $args->{after
}, $args->{limit
});
715 if (defined $prefix and $prefix ne '') {
716 # now validate that after matches prefix
717 return $self->err_line('after_mismatch')
718 if $after && $after !~ /^$prefix/;
723 $limit = 1000 if $limit > 1000;
725 my $keys = Mgd
::get_store
()->get_keys_like($dmid, $prefix, $after, $limit);
727 # if we got nothing, say so
728 return $self->err_line('none_match') unless $keys && @
$keys;
730 # construct the output and send
731 my $ret = { key_count
=> 0, next_after
=> '' };
732 foreach my $key (@
$keys) {
734 $ret->{next_after
} = $key
735 if $key gt $ret->{next_after
};
736 $ret->{"key_$ret->{key_count}"} = $key;
738 return $self->ok_line($ret);
742 my MogileFS
::Worker
::Query
$self = shift;
745 # validate parameters
746 my $dmid = $self->check_domain($args) or return;
747 my ($fkey, $tkey) = ($args->{from_key
}, $args->{to_key
});
748 unless (valid_key
($fkey) && valid_key
($tkey)) {
749 return $self->err_line("no_key");
752 my $fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $fkey)
753 or return $self->err_line("unknown_key");
755 $fid->rename($tkey) or
756 $self->err_line("key_exists");
758 return $self->ok_line;
762 my MogileFS
::Worker
::Query
$self = shift;
765 my $ret = { hosts
=> 0 };
766 for my $host (Mgd
::host_factory
()->get_all) {
767 next if defined $args->{hostid
} && $host->id != $args->{hostid
};
768 my $n = ++$ret->{hosts
};
769 my $fields = $host->fields(qw(hostid status hostname hostip http_port
770 http_get_port altip altmask));
771 while (my ($key, $val) = each %$fields) {
772 # must be regular data so copy it in
773 $ret->{"host${n}_$key"} = $val;
777 return $self->ok_line($ret);
# Reports the fields of every device known to the device factory.
#
# When $args->{devid} is defined, only devices whose id equals it
# (numerically) are reported.  The reply hash handed to ok_line contains
# "devices" (how many devices were reported) and, for the Nth reported
# device, one "devN_<field>" entry per key of that device's fields hash.
sub cmd_get_devices {
    my ($self, $args) = @_;

    my %reply      = (devices => 0);
    my $only_devid = $args->{devid};

    DEVICE:
    foreach my $device (Mgd::device_factory()->get_all) {
        next DEVICE if defined $only_devid && $device->id != $only_devid;

        my $position = ++$reply{devices};
        my $fields   = $device->fields;
        foreach my $name (keys %$fields) {
            $reply{"dev${position}_$name"} = $fields->{$name};
        }
    }

    return $self->ok_line(\%reply);
}
798 sub cmd_create_device
{
799 my MogileFS
::Worker
::Query
$self = shift;
802 my $status = $args->{state} || "alive";
803 return $self->err_line("invalid_state") unless
804 device_state
($status);
806 my $devid = $args->{devid
};
807 return $self->err_line("invalid_devid") unless $devid && $devid =~ /^\d+$/;
811 my $sto = Mgd
::get_store
();
812 if ($args->{hostid
} && $args->{hostid
} =~ /^\d+$/) {
813 $hostid = $sto->get_hostid_by_id($args->{hostid
});
814 return $self->err_line("unknown_hostid") unless $hostid;
815 } elsif (my $hname = $args->{hostname
}) {
816 $hostid = $sto->get_hostid_by_name($hname);
817 return $self->err_line("unknown_host") unless $hostid;
819 return $self->err_line("bad_args", "No hostid/hostname parameter");
822 if (eval { $sto->create_device($devid, $hostid, $status) }) {
823 return $self->cmd_clear_cache;
826 my $errc = error_code
($@
);
827 return $self->err_line("existing_devid") if $errc;
831 sub cmd_create_domain
{
832 my MogileFS
::Worker
::Query
$self = shift;
835 my $domain = $args->{domain
} or
836 return $self->err_line('no_domain');
838 my $dom = eval { Mgd
::get_store
()->create_domain($domain); };
840 if (error_code
($@
) eq "dup") {
841 return $self->err_line('domain_exists');
843 return $self->err_line('failure', "$@");
846 return $self->cmd_clear_cache({ domain
=> $domain });
# Deletes the domain named by $args->{domain}.
#
# Error lines: 'no_domain' when the name is missing/false,
# 'domain_not_found' when the store has no id for it, and, when the delete
# itself does not return true, 'domain_has_files' / 'domain_has_classes'
# if error_code($@) says so, else "failure".  On success the reply is
# whatever cmd_clear_cache returns for { domain => NAME }.
sub cmd_delete_domain {
    my ($self, $args) = @_;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless $domain;

    my $sto  = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain);
    return $self->err_line('domain_not_found') unless $dmid;

    my $deleted = eval { $sto->delete_domain($dmid) };
    return $self->cmd_clear_cache({ domain => $domain }) if $deleted;

    # Translate the store's error code into the reply the client sees.
    my %reply_for = (
        has_files   => 'domain_has_files',
        has_classes => 'domain_has_classes',
    );
    my $code = error_code($@);
    return $self->err_line(exists $reply_for{$code} ? $reply_for{$code} : "failure");
}
870 sub cmd_create_class
{
871 my MogileFS
::Worker
::Query
$self = shift;
874 my $domain = $args->{domain
};
875 return $self->err_line('no_domain') unless length $domain;
877 my $class = $args->{class};
878 return $self->err_line('no_class') unless length $class;
880 my $mindevcount = $args->{mindevcount
}+0;
881 return $self->err_line('invalid_mindevcount') unless $mindevcount > 0;
883 my $replpolicy = $args->{replpolicy
} || '';
886 MogileFS
::ReplicationPolicy
->new_from_policy_string($replpolicy);
888 return $self->err_line('invalid_replpolicy', $@
) if $@
;
891 my $hashtype = $args->{hashtype
};
892 if ($hashtype && $hashtype ne 'NONE') {
893 my $tmp = $MogileFS::Checksum
::NAME2TYPE
{$hashtype};
894 return $self->err_line('invalid_hashtype') unless $tmp;
898 my $sto = Mgd
::get_store
();
899 my $dmid = $sto->get_domainid_by_name($domain) or
900 return $self->err_line('domain_not_found');
902 my $clsid = $sto->get_classid_by_name($dmid, $class);
903 if (!defined $clsid && $args->{update
} && $class eq 'default') {
906 if ($args->{update
}) {
907 return $self->err_line('class_not_found') if ! defined $clsid;
908 $sto->update_class_name(dmid
=> $dmid, classid
=> $clsid,
909 classname
=> $class);
911 $clsid = eval { $sto->create_class($dmid, $class); };
913 if (error_code
($@
) eq "dup") {
914 return $self->err_line('class_exists');
916 return $self->err_line('failure', "$@");
919 $sto->update_class_mindevcount(dmid
=> $dmid, classid
=> $clsid,
920 mindevcount
=> $mindevcount);
921 # don't erase an existing replpolicy if we're not setting a new one.
922 $sto->update_class_replpolicy(dmid
=> $dmid, classid
=> $clsid,
923 replpolicy
=> $replpolicy) if $replpolicy;
925 $sto->update_class_hashtype(dmid
=> $dmid, classid
=> $clsid,
926 hashtype
=> $hashtype eq 'NONE' ?
undef : $hashtype);
930 return $self->cmd_clear_cache({ class => $class, mindevcount
=> $mindevcount, domain
=> $domain });
# Update variant of cmd_create_class: hands a copy of $args, with
# update => 1 added (or overridden), to cmd_create_class and returns its
# result.  The caller's hash is left as it was.
sub cmd_update_class {
    my ($self, $args) = @_;

    my %update_args = (%$args, update => 1);
    return $self->cmd_create_class(\%update_args);
}
# Deletes a class from a domain.
#
# Keys read from $args:
#   domain - name of the domain owning the class (required)
#   class  - name of the class to delete (required; 'default' is refused)
#
# Error lines: 'no_domain' / 'no_class' for a missing or empty name,
# 'nodel_default_class' for the default class, 'domain_not_found' /
# 'class_not_found' when the store has no id for the name, and, when the
# delete itself does not return true, 'class_has_files' if error_code($@)
# is "has_files", else 'failure'.  On success the reply is whatever
# cmd_clear_cache returns for { domain => ..., class => ... }.
sub cmd_delete_class {
    my ($self, $args) = @_;

    my $domain = $args->{domain};
    return $self->err_line('no_domain')
        unless defined $domain && length $domain;

    # An absent or empty class name is rejected before any store lookup.
    my $class = $args->{class};
    return $self->err_line('no_class')
        unless defined $class && length $class;

    return $self->err_line('nodel_default_class') if $class eq 'default';

    my $sto  = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain)
        or return $self->err_line('domain_not_found');
    my $clsid = $sto->get_classid_by_name($dmid, $class);
    # A class id may legitimately be 0, hence defined-ness, not truth.
    return $self->err_line('class_not_found') unless defined $clsid;

    if (eval { Mgd::get_store()->delete_class($dmid, $clsid) }) {
        return $self->cmd_clear_cache({ domain => $domain, class => $class });
    }

    my $errc = error_code($@);
    return $self->err_line('class_has_files') if $errc eq "has_files";
    return $self->err_line('failure');
}
967 sub cmd_create_host
{
968 my MogileFS
::Worker
::Query
$self = shift;
971 my $hostname = $args->{host
} or
972 return $self->err_line('no_host');
974 my $sto = Mgd
::get_store
();
975 my $hostid = $sto->get_hostid_by_name($hostname);
977 # if we're creating a new host, require ip/port, and default to
978 # host being down if client didn't specify
979 if ($args->{update
}) {
980 return $self->err_line('host_not_found') unless $hostid;
982 return $self->err_line('host_exists') if $hostid;
983 return $self->err_line('no_ip') unless $args->{ip
};
984 return $self->err_line('no_port') unless $args->{port
};
985 $args->{status
} ||= 'down';
988 if ($args->{status
}) {
989 return $self->err_line('unknown_state')
990 unless MogileFS
::Host
->valid_state($args->{status
});
993 # arguments all good, let's do it.
995 $hostid ||= $sto->create_host($hostname, $args->{ip
});
997 # Protocol mismatch data fixup.
998 $args->{hostip
} = delete $args->{ip
} if exists $args->{ip
};
999 $args->{http_port
} = delete $args->{port
} if exists $args->{port
};
1000 $args->{http_get_port
} = delete $args->{getport
} if exists $args->{getport
};
1001 my @toupdate = grep { exists $args->{$_} } qw(status hostip http_port
1002 http_get_port altip altmask);
1003 $sto->update_host($hostid, { map { $_ => $args->{$_} } @toupdate });
1006 return $self->cmd_clear_cache({ hostid
=> $hostid, hostname
=> $hostname });
# Update variant of cmd_create_host: hands a copy of $args, with
# update => 1 added (or overridden), to cmd_create_host and returns its
# result.  The caller's hash is left as it was.
sub cmd_update_host {
    my ($self, $args) = @_;

    my %update_args = (%$args, update => 1);
    return $self->cmd_create_host(\%update_args);
}
# Deletes the host named by $args->{host}.
#
# Replies 'unknown_host' when the store has no id for that name and
# 'host_not_empty' when any device row still carries the host's id.
# Otherwise the host is deleted and the reply is whatever cmd_clear_cache
# returns.
sub cmd_delete_host {
    my ($self, $args) = @_;

    my $sto    = Mgd::get_store();
    my $hostid = $sto->get_hostid_by_name($args->{host});
    return $self->err_line('unknown_host') unless $hostid;

    # TODO: $sto->delete_host should have a "has_devices" test internally
    my @device_rows = $sto->get_all_devices;
    foreach my $row (@device_rows) {
        next if $row->{hostid} != $hostid;
        return $self->err_line('host_not_empty');
    }

    $sto->delete_host($hostid);

    return $self->cmd_clear_cache;
}
1036 sub cmd_get_domains
{
1037 my MogileFS
::Worker
::Query
$self = shift;
1042 for my $dom (Mgd
::domain_factory
()->get_all) {
1044 $ret->{"domain${dm_n}"} = $dom->name;
1046 foreach my $cl ($dom->classes) {
1048 $ret->{"domain${dm_n}class${cl_n}name"} = $cl->name;
1049 $ret->{"domain${dm_n}class${cl_n}mindevcount"} = $cl->mindevcount;
1050 $ret->{"domain${dm_n}class${cl_n}replpolicy"} =
1051 $cl->repl_policy_string;
1052 $ret->{"domain${dm_n}class${cl_n}hashtype"} = $cl->hashtype_string;
1054 $ret->{"domain${dm_n}classes"} = $cl_n;
1056 $ret->{"domains"} = $dm_n;
1058 return $self->ok_line($ret);
1062 my MogileFS
::Worker
::Query
$self = shift;
1065 # memcache mappings are as follows:
1066 # mogfid:<dmid>:<dkey> -> fidid
1067 # mogdevids:<fidid> -> \@devids (and TODO: invalidate when deletion is run!)
1069 # if you specify 'noverify', that means a correct answer isn't needed and memcache can
1071 my $memc = MogileFS
::Config
->memcache_client;
1072 my $get_from_memc = $memc && $args->{noverify
};
1073 my $memcache_ttl = MogileFS
::Config
->server_setting_cached("memcache_ttl") || 3600;
1075 # validate domain for plugins
1076 $args->{dmid
} = $self->check_domain($args) or return;
1078 # now invoke the plugin, abort if it tells us to
1079 my $rv = MogileFS
::run_global_hook
('cmd_get_paths', $args);
1080 return $self->err_line('plugin_aborted')
1081 if defined $rv && ! $rv;
1083 # validate parameters
1084 my $dmid = $args->{dmid
};
1085 my $key = $args->{key
};
1087 valid_key
($key) or return $self->err_line("no_key");
1089 # We default to returning two possible paths.
1090 # but the client may ask for more if they want.
1091 my $pathcount = $args->{pathcount
} || 2;
1092 $pathcount = 2 if $pathcount < 2;
1096 my $need_fid_in_memcache = 0;
1097 my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
1098 if ($get_from_memc) {
1099 if (my $fidid = $memc->get($mogfid_memkey)) {
1100 $fid = MogileFS
::FID
->new($fidid);
1102 $need_fid_in_memcache = 1;
1106 Mgd
::get_store
()->slaves_ok(sub {
1107 $fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $key);
1109 $fid or return $self->err_line("unknown_key");
1112 # add to memcache, if needed. for an hour.
1113 $memc->set($mogfid_memkey, $fid->id, $memcache_ttl ) if $need_fid_in_memcache || ($memc && !$get_from_memc);
1115 my $dmap = Mgd
::device_factory
()->map_by_id;
1121 # find devids that FID is on in memcache or db.
1123 my $need_devids_in_memcache = 0;
1124 my $devid_memkey = "mogdevids:" . $fid->id;
1125 if ($get_from_memc) {
1126 if (my $list = $memc->get($devid_memkey)) {
1127 @fid_devids = @
$list;
1129 $need_devids_in_memcache = 1;
1132 unless (@fid_devids) {
1133 Mgd
::get_store
()->slaves_ok(sub {
1134 @fid_devids = $fid->devids;
1136 $memc->set($devid_memkey, \
@fid_devids, $memcache_ttl ) if $need_devids_in_memcache || ($memc && !$get_from_memc);
1139 my @devices = map { $dmap->{$_} } @fid_devids;
1142 unless (MogileFS
::run_global_hook
('cmd_get_paths_order_devices', \
@devices, \
@sorted_devs)) {
1143 @sorted_devs = sort_devs_by_utilization
(@devices);
1146 # keep one partially-bogus path around just in case we have nothing else to send.
1149 # files on devices set for drain may disappear soon.
1152 # construct result paths
1153 foreach my $dev (@sorted_devs) {
1154 next unless $dev && $dev->host;
1156 my $dfid = MogileFS
::DevFID
->new($dev, $fid);
1157 my $path = $dfid->get_url;
1158 my $currently_up = $dev->should_read_from;
1160 if (! $currently_up) {
1161 $backup_path = $path;
1165 # only verify size one first one, and never verify if they've asked not to
1168 $args->{noverify
} ||
1169 $dfid->size_matches;
1171 if ($dev->dstate->should_drain) {
1172 push @drain_paths, $path;
1176 my $n = ++$ret->{paths
};
1177 $ret->{"path$n"} = $path;
1178 last if $n == $pathcount; # one verified, one likely seems enough for now. time will tell.
1181 # deprioritize devices set for drain, they could disappear soon...
1182 # Clients /should/ try to use lower-numbered paths first to avoid this.
1183 if ($ret->{paths
} < $pathcount && @drain_paths) {
1184 foreach my $path (@drain_paths) {
1185 my $n = ++$ret->{paths
};
1186 $ret->{"path$n"} = $path;
1187 last if $n == $pathcount;
1191 # use our backup path if all else fails
1192 if ($backup_path && ! $ret->{paths
}) {
1194 $ret->{path1
} = $backup_path;
1197 return $self->ok_line($ret);
# Pair every device object passed in @_ with a weight and hand the
# [device, weight] pairs to MogileFS::Util::weighted_list.
# NOTE(review): the embedded numbering skips several short lines in this sub
# (the $weight declaration, the else/closing braces, possibly an adjustment
# after each weight assignment, and the final return) -- confirm against the
# complete file before changing any logic here.
1200 sub sort_devs_by_utilization
{
1201 my @devices_with_weights;
1203 # is this fid still owned by this key?
# NOTE(review): the comment above does not describe this loop, which only
# derives a weight per device; it looks carried over from other code.
1204 foreach my $dev (@_) {
1206 my $util = $dev->observed_utilization;
# A utilization made up purely of digits becomes weight 102 - util, so a less
# busy device gets a larger weight.
1208 if (defined($util) and $util =~ /\A\d+\Z/) {
1209 $weight = 102 - $util;
# presumably the else branch: fall back to the device's own weight
1212 $weight = $dev->weight;
1215 push @devices_with_weights, [$dev, $weight];
1218 # randomly weight the devices
1219 my @list = MogileFS
::Util
::weighted_list
(@devices_with_weights);
1224 # ------------------------------------------------------------
1226 # NOTE: cmd_edit_file is EXPERIMENTAL. Please see the documentation
1227 # for edit_file in L<MogileFS::Client>.
1228 # It is not recommended to use cmd_edit_file on production systems.
1230 # cmd_edit_file is similar to cmd_get_paths, except we:
1231 # - take the device of the first path we would have returned
1232 # - get a tempfile with a new fid (pointing to nothing) on the same device
1233 # the tempfile has the same key, so will replace the old contents on
1235 # - detach the old fid from that device (leaving the file in place)
1236 # - attach the new fid to that device
1237 # - returns only the first path to the old fid and a path to new fid
1238 # (the client then DAV-renames the old path to the new path)
1240 # TODO - what to do about situations where we would be reducing the
1241 # replica count to zero?
1242 # TODO - what to do about pending replications where we remove the source?
1243 # TODO - the current implementation of cmd_edit_file is based on a copy
1244 # of cmd_get_paths. Once proven mature, consider factoring out common
1245 # code from the two functions.
1246 # ------------------------------------------------------------
# Body of the experimental edit_file command described in the banner above.
# Visible flow: resolve domain and key to a FID (memcache first, then the
# store), collect the devids holding it, pick one readable device at weighted
# random, register a tempfile for the same key, move the device mapping from
# the old fid to the new one, and reply with oldpath/newpath/fid/devid/class.
# NOTE(review): the sub header, the $args/$fid/@fid_devids/$weight/$newfid/
# @paths/$ret declarations and the eval/grep/map scaffolding are not visible
# in this view (the embedded numbering skips them) -- confirm against the
# complete file.
1248 my MogileFS
::Worker
::Query
$self = shift;
# NOTE(review): $memc is used below without a definedness check -- confirm
# memcache_client can never return undef on this path.
1251 my $memc = MogileFS
::Config
->memcache_client;
1253 # validate domain for plugins
1254 $args->{dmid
} = $self->check_domain($args) or return;
1256 # now invoke the plugin, abort if it tells us to
# NOTE(review): the hook fired here is 'cmd_get_paths', not an edit_file
# specific hook -- confirm that is intended.
1257 my $rv = MogileFS
::run_global_hook
('cmd_get_paths', $args);
# only a defined-but-false hook result aborts; undef lets the request go on
1258 return $self->err_line('plugin_aborted')
1259 if defined $rv && ! $rv;
1261 # validate parameters
1262 my $dmid = $args->{dmid
};
1263 my $key = $args->{key
};
1265 valid_key
($key) or return $self->err_line("no_key");
# FID lookup: a memcache hit builds the FID from the cached id; presumably the
# flag assignment below sits in the else branch and marks a miss.
1269 my $need_fid_in_memcache = 0;
1270 my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
1271 if (my $fidid = $memc->get($mogfid_memkey)) {
1272 $fid = MogileFS
::FID
->new($fidid);
1274 $need_fid_in_memcache = 1;
# database lookup by (dmid, key), run through slaves_ok
1277 Mgd
::get_store
()->slaves_ok(sub {
1278 $fid = MogileFS
::FID
->new_from_dmid_and_key($dmid, $key);
1280 $fid or return $self->err_line("unknown_key");
1283 # add to memcache, if needed. for an hour.
1284 $memc->add($mogfid_memkey, $fid->id, 3600) if $need_fid_in_memcache;
1286 my $dmap = Mgd
::device_factory
()->map_by_id;
1288 my @devices_with_weights;
1290 # find devids that FID is on in memcache or db.
1292 my $need_devids_in_memcache = 0;
1293 my $devid_memkey = "mogdevids:" . $fid->id;
1294 if (my $list = $memc->get($devid_memkey)) {
1295 @fid_devids = @
$list;
1297 $need_devids_in_memcache = 1;
# nothing usable from memcache: ask the store for the devids
1299 unless (@fid_devids) {
1300 Mgd
::get_store
()->slaves_ok(sub {
1301 @fid_devids = $fid->devids;
1303 $memc->add($devid_memkey, \
@fid_devids, 3600) if $need_devids_in_memcache;
1306 # is this fid still owned by this key?
# The loop below pairs each devid with a weight: 102 - utilization when the
# observed utilization is all digits, else (presumably) the device's weight.
1307 foreach my $devid (@fid_devids) {
# NOTE(review): $dev is dereferenced on the next line without checking that
# the devid exists in $dmap; the later filter does test "$dev &&".
1309 my $dev = $dmap->{$devid};
1310 my $util = $dev->observed_utilization;
1312 if (defined($util) and $util =~ /\A\d+\Z/) {
1313 $weight = 102 - $util;
1316 $weight = $dev->weight;
1319 push @devices_with_weights, [$devid, $weight];
1322 # randomly weight the devices
1323 # TODO - should we reverse the order, to leave the best
1324 # one there for get_paths?
1325 my @list = MogileFS
::Util
::weighted_list
(@devices_with_weights);
1327 # Filter out bad devs
# keeps only devids whose device exists and reports should_read_from
1330 my $dev = $dmap->{$devid};
1332 $dev && $dev->should_read_from;
1335 # Take first remaining device from list
# NOTE(review): no visible check that @list is non-empty, so $devid may be
# undef here -- confirm.
1336 my $devid = $list[0];
1338 my $classid = $fid->classid;
# Register the replacement tempfile.  Some arguments of this call are not
# visible in this view.
1340 Mgd
::get_store
()->register_tempfile(
1341 fid
=> undef, # undef => let the store pick a fid
1343 key
=> $key, # This tempfile will ultimately become this key
1344 classid
=> $classid,
# failure handling for the registration: a "dup" error code is reported as
# fid_in_use, anything else is logged and reported as db
1349 my $errc = error_code
($@
);
1350 return $self->err_line("fid_in_use") if $errc eq "dup";
1351 warn "Error registering tempfile: $@\n";
1352 return $self->err_line("db");
# detach the old fid from the chosen device
1354 unless (Mgd
::get_store
()->remove_fidid_from_devid($fid->id, $devid)) {
1355 warn "Error removing fidid from devid";
1356 return $self->err_line("db");
# attach the new fid to the same device
1358 unless (Mgd
::get_store
()->add_fidid_to_devid($newfid, $devid)) {
# NOTE(review): this warning says "removing" although it guards
# add_fidid_to_devid -- looks like a copy/paste slip in the message.
1359 warn "Error removing fidid from devid";
1360 return $self->err_line("db");
# build one URL per fid on the chosen device ($_ is the fid being mapped)
1364 my $dfid = MogileFS
::DevFID
->new($devid, $_);
1365 my $path = $dfid->get_url;
# reply: first path is the old fid's, second the new fid's
1368 $ret->{oldpath
} = $paths[0];
1369 $ret->{newpath
} = $paths[1];
1370 $ret->{fid
} = $newfid;
1371 $ret->{devid
} = $devid;
1372 $ret->{class} = $classid;
1373 return $self->ok_line($ret);
# Set the weight of one device.
#
# Args: host (hostname the device must belong to), device (numeric device
# id), weight (number, must not be negative).
# Replies bad_params, no_device or host_mismatch on failure; otherwise stores
# the weight and returns whatever cmd_clear_cache returns.
sub cmd_set_weight {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # pull out the request fields, forcing the numeric ones to numbers
    my $want_host  = $args->{host};
    my $device_id  = $args->{device} + 0;
    my $new_weight = $args->{weight} + 0;

    unless ($want_host && $device_id && $new_weight >= 0) {
        return $self->err_line('bad_params');
    }

    my $device = Mgd::device_factory()->get_by_id($device_id);
    unless ($device) {
        return $self->err_line('no_device');
    }

    # the caller has to name the host that actually owns the device
    if ($device->host->hostname ne $want_host) {
        return $self->err_line('host_mismatch');
    }

    Mgd::get_store()->set_device_weight($device->id, $new_weight);

    return $self->cmd_clear_cache;
}
# Body of the command that changes a device's state (the sub header and the
# $args declaration are not visible in this view; presumably cmd_set_state).
# Args read: host, device (forced numeric), state.
# Replies bad_params, no_device, host_mismatch or state_too_high on failure;
# on success stores the state and returns the result of cmd_clear_cache.
1396 my MogileFS
::Worker
::Query
$self = shift;
1399 # figure out what they want to do
1400 my ($hostname, $devid, $state) = ($args->{host
}, $args->{device
}+0, $args->{state});
# device_state() is used as the validity check for the state name: a false
# result is treated as a bad parameter below
1402 my $dstate = device_state
($state);
1403 return $self->err_line('bad_params')
1404 unless $hostname && $devid && $dstate;
1406 my $dev = Mgd
::device_factory
()->get_by_id($devid);
1407 return $self->err_line('no_device') unless $dev;
# the device must belong to the host named in the request
1408 return $self->err_line('host_mismatch')
1409 unless $dev->host->hostname eq $hostname;
1411 # make sure the destination state isn't too high
1412 return $self->err_line('state_too_high')
1413 unless $dev->can_change_to_state($state);
1415 Mgd
::get_store
()->set_device_state($dev->id, $state);
1416 return $self->cmd_clear_cache;
# Body of a command that does no work and simply answers OK (the sub header
# is not visible in this view; presumably a no-op/ping command).
1420 my MogileFS
::Worker
::Query
$self = shift;
1422 return $self->ok_line;
# Ask the store to replicate now and report the integer result as "count".
sub cmd_replicate_now {
    my MogileFS::Worker::Query $self = shift;

    my $store = Mgd::get_store();
    my $count = int($store->replicate_now);

    return $self->ok_line({ count => $count });
}
# Validate and store a single server setting.
#
# Args: key (required), value.
# Replies bad_params when no key is given, not_writable when the setting has
# no writability checker, and invalid_format (with the checker's error text)
# when the checker dies on the supplied value.
sub cmd_set_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $key = $args->{key};
    return $self->err_line("bad_params") unless $key;

    # A writable setting yields a code ref; the value it returns is what gets
    # stored, and an exception from it rejects the request.
    my $checker = MogileFS::Config->server_setting_is_writable($key);
    return $self->err_line("not_writable") unless $checker;

    my $raw_value   = $args->{value};
    my $clean_value = eval { $checker->($raw_value); };
    if ($@) {
        return $self->err_line("invalid_format", $@);
    }

    MogileFS::Config->set_server_setting($key, $clean_value);

    # GROSS HACK: slave settings are managed directly by MogileFS::Client,
    # but a version key has to be bumped alongside them, so that is injected
    # here.
    # FIXME: Move this when slave keys are managed by query worker commands!
    Mgd::get_store()->incr_server_setting('slave_version', 1)
        if $key =~ /^slave_/;

    return $self->ok_line;
}
# Look up one server setting.
#
# Args: key (required; bad_params is returned when it is missing or false).
# Reply fields: key, value.
sub cmd_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $name = $args->{key};
    unless ($name) {
        return $self->err_line("bad_params");
    }

    my $current = MogileFS::Config->server_setting($name);
    my %reply   = (key => $name, value => $current);

    return $self->ok_line(\%reply);
}
# Report every server setting that clients are allowed to read.
#
# Reply fields: key_count (number of settings returned) plus key_N / value_N
# for N = 1..key_count.  Settings for which
# MogileFS::Config->server_setting_is_readable is false are left out.
sub cmd_server_settings {
    my MogileFS::Worker::Query $self = shift;
    my $ss = Mgd::get_store()->server_settings;

    my $ret = {};
    my $n   = 0;
    while (my ($k, $v) = each %$ss) {
        next unless MogileFS::Config->server_setting_is_readable($k);
        $n++;
        $ret->{"key_$n"}   = $k;
        $ret->{"value_$n"} = $v;
    }

    # Always report the count -- including 0 when nothing is readable -- so
    # the reply never lacks key_count.
    $ret->{"key_count"} = $n;

    return $self->ok_line($ret);
}
# Force a fresh monitor round: drop the record that the monitor has already
# run, wait until it has run again, then answer OK.
sub cmd_do_monitor_round {
    my MogileFS::Worker::Query $self = shift;

    $self->forget_that_monitor_has_run();
    $self->wait_for_monitor();

    return $self->ok_line();
}
# Start an fsck run on this host.
#
# Refuses with fsck_running when fsck_host is already set and with
# rebal_running when a rebalance holds rebal_host.  When the previous run
# finished (or none was ever recorded) the fsck bookkeeping is reset first.
# Afterwards the run statistics are initialised, this host is recorded as
# fsck_host and an fsck worker is woken.
sub cmd_fsck_start {
    my MogileFS::Worker::Query $self = shift;
    my $store = Mgd::get_store();

    my $fsck_owner  = MogileFS::Config->server_setting("fsck_host");
    my $rebal_owner = MogileFS::Config->server_setting("rebal_host");

    if ($fsck_owner) {
        return $self->err_line("fsck_running", "fsck is already running");
    }
    if ($rebal_owner) {
        return $self->err_line("rebal_running", "rebalance running; cannot run fsck at same time");
    }

    # reset position, if a previous fsck was already completed.
    my $setting_or_zero = sub { MogileFS::Config->server_setting($_[0]) || 0 };
    my $checked = $setting_or_zero->("fsck_highest_fid_checked");
    my $final   = $setting_or_zero->("fsck_fid_at_end");

    my $previous_run_done = $checked && $final && $checked >= $final;
    my $never_started     = !$final && !$checked;
    if ($previous_run_done || $never_started) {
        $self->_do_fsck_reset or return $self->err_line("db");
    }

    # set params for stats:
    $store->set_server_setting("fsck_start_time", $store->get_db_unixtime);
    $store->set_server_setting("fsck_stop_time", undef);
    $store->set_server_setting("fsck_fids_checked", 0);

    # re-read the position: the reset above may have changed it
    my $start_fid =
        MogileFS::Config->server_setting('fsck_highest_fid_checked') || 0;
    $store->set_server_setting("fsck_start_fid", $start_fid);

    # claim the run for this host and wake a worker
    $store->set_server_setting("fsck_host", MogileFS::Config->hostname);
    MogileFS::ProcManager->wake_a("fsck");

    return $self->ok_line;
}
# Body of the command that stops a running fsck (the sub header is not
# visible in this view; presumably cmd_fsck_stop).
# Clears fsck_host and records the database's current unix time as
# fsck_stop_time, then answers OK.
1523 my MogileFS
::Worker
::Query
$self = shift;
1524 my $sto = Mgd
::get_store
();
# fsck_host is the setting cmd_fsck_start tests to decide whether a run is
# already in progress, so clearing it marks fsck as not running
1525 $sto->set_server_setting("fsck_host", undef);
1526 $sto->set_server_setting("fsck_stop_time", $sto->get_db_unixtime);
1527 return $self->ok_line;
# Reset fsck bookkeeping so the next run starts over.
#
# Args: policy_only (true stores "1" in fsck_opt_policy_only, otherwise the
# setting is cleared), startpos (fid to resume from; "0" when absent or
# false).
# Replies db when the underlying reset fails.
sub cmd_fsck_reset {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $store = Mgd::get_store();

    my $policy_flag = $args->{policy_only} ? "1" : undef;
    $store->set_server_setting("fsck_opt_policy_only", $policy_flag);

    my $resume_from = $args->{startpos} || "0";
    $store->set_server_setting("fsck_highest_fid_checked", $resume_from);

    unless ($self->_do_fsck_reset) {
        return $self->err_line("db");
    }
    return $self->ok_line;
}
# Clear the stored fsck progress and statistics.
#
# Resets the start/stop times and checked-fid counter, records the current
# maximum fid as fsck_fid_at_end, removes every fsck_sum_evcount_* summary
# and points both log markers at the current maximum fsck log id.
#
# Returns 1 on success.  Any exception raised by the store is logged through
# error() and turned into a 0 return, which callers report as a "db" error.
sub _do_fsck_reset {
    my MogileFS::Worker::Query $self = shift;

    my $ok = eval {
        my $sto = Mgd::get_store();
        $sto->set_server_setting("fsck_start_time",   undef);
        $sto->set_server_setting("fsck_stop_time",    undef);
        $sto->set_server_setting("fsck_fids_checked", 0);
        $sto->set_server_setting("fsck_fid_at_end",   $sto->max_fidid);

        # clear existing event counts summaries.
        my $ss = $sto->server_settings;
        foreach my $k (keys %$ss) {
            next unless $k =~ /^fsck_sum_evcount_/;
            $sto->set_server_setting($k, undef);
        }

        my $logid = $sto->max_fsck_logid;
        $sto->set_server_setting("fsck_start_maxlogid",  $logid);
        $sto->set_server_setting("fsck_logid_processed", $logid);

        1;    # distinguishes normal completion from an exception
    };

    unless ($ok) {
        error("DB error in _do_fsck_reset: $@");
        return 0;
    }

    return 1;
}
# Have the store clear the fsck log, then answer OK.
sub cmd_fsck_clearlog {
    my MogileFS::Worker::Query $self = shift;

    my $store = Mgd::get_store();
    $store->clear_fsck_log();

    return $self->ok_line();
}
# Return a page of fsck log rows.
#
# Args: after_logid (passed straight to the store as the starting point).
# At most 100 rows are requested per call.
# Reply fields: row_count, plus row_N_<column> for every defined column of
# row N, with N counting from 1.
sub cmd_fsck_getlog {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $sto  = Mgd::get_store();
    my @rows = $sto->fsck_log_rows($args->{after_logid}, 100);

    my $ret = {};
    my $n   = 0;
    foreach my $row (@rows) {
        # give each row its own number so rows do not overwrite each other
        $n++;
        foreach my $k (keys %$row) {
            $ret->{"row_${n}_$k"} = $row->{$k} if defined $row->{$k};
        }
    }

    $ret->{row_count} = $n;
    return $self->ok_line($ret);
}
# Report the current fsck state: whether a run is in progress, its position,
# timing and log markers, plus one num_<event> counter per stored
# fsck_sum_evcount_<event> setting.
# NOTE(review): the opening and closing lines of the reply hash ($ret) are
# not visible in this view, and the embedded numbering skips one line inside
# it -- an entry may be missing here; confirm against the complete file.
1595 sub cmd_fsck_status
{
1596 my MogileFS
::Worker
::Query
$self = shift;
1598 my $sto = Mgd
::get_store
();
1599 # Kick up the summary before we read the values
1600 $sto->fsck_log_summarize;
1601 my $fsck_host = MogileFS
::Config
->server_setting('fsck_host');
# helper: fetch a server setting, substituting 0 for any false/unset value
1602 my $intss = sub { MogileFS
::Config
->server_setting($_[0]) || 0 };
# a set fsck_host means a run is in progress
1604 running
=> ($fsck_host ?
1 : 0),
1606 max_fid_checked
=> $intss->('fsck_highest_fid_checked'),
1607 policy_only
=> $intss->('fsck_opt_policy_only'),
1608 end_fid
=> $intss->('fsck_fid_at_end'),
1609 start_time
=> $intss->('fsck_start_time'),
1610 stop_time
=> $intss->('fsck_stop_time'),
# taken from the database's clock, like the start/stop times written elsewhere
1611 current_time
=> $sto->get_db_unixtime,
1612 max_logid
=> $sto->max_fsck_logid,
1615 # throw some stats in.
1616 my $ss = $sto->server_settings;
1617 foreach my $k (keys %$ss) {
# the part of the setting name after the prefix becomes the reply key suffix
1618 next unless $k =~ /^fsck_sum_evcount_(.+)/;
1619 $ret->{"num_$1"} += $ss->{$k};
1622 return $self->ok_line($ret);
# Return the saved rebalance state, or no_rebal_state when none is stored.
sub cmd_rebalance_status {
    my MogileFS::Worker::Query $self = shift;

    # the store handle is fetched as before even though nothing below uses it
    my $store = Mgd::get_store();

    my $state = MogileFS::Config->server_setting('rebal_state');
    if ($state) {
        return $self->ok_line({ state => $state });
    }
    return $self->err_line('no_rebal_state');
}
# Start a rebalance on this host.
#
# Refuses with rebal_running when rebal_host is already set and with
# fsck_running while an fsck holds fsck_host.  When no saved rebalance state
# exists, one is built from the stored policy (no_rebal_policy if there is
# none) and all known devices, then saved.  Finally this host is recorded as
# rebal_host and the state is returned.
sub cmd_rebalance_start {
    my MogileFS::Worker::Query $self = shift;

    my $rebal_owner = MogileFS::Config->server_setting("rebal_host");
    my $fsck_owner  = MogileFS::Config->server_setting("fsck_host");

    if ($rebal_owner) {
        return $self->err_line("rebal_running", "rebalance is already running");
    }
    if ($fsck_owner) {
        return $self->err_line("fsck_running", "fsck running; cannot run rebalance at same time");
    }

    my $state = MogileFS::Config->server_setting('rebal_state');
    if (!$state) {
        my $policy = MogileFS::Config->server_setting('rebal_policy');
        return $self->err_line('no_rebal_policy') unless $policy;

        my $rebal = MogileFS::Rebalance->new;
        $rebal->policy($policy);

        my @all_devs = Mgd::device_factory()->get_all;
        $rebal->init(\@all_devs);

        # result is not used below; the call itself is kept unchanged
        my $source_devs = $rebal->source_devices;

        $state = $rebal->save_state;
        MogileFS::Config->set_server_setting('rebal_state', $state);
    }

    # TODO: register start time somewhere.
    MogileFS::Config->set_server_setting('rebal_host', MogileFS::Config->hostname);

    return $self->ok_line({ state => $state });
}
# Dry-run the stored rebalance policy against all known devices.
#
# Replies no_rebal_policy when no policy is stored.
# Reply fields: sdevs and ddevs, the comma-joined results of the policy's
# source and destination device filters.
sub cmd_rebalance_test {
    my MogileFS::Worker::Query $self = shift;

    my $policy = MogileFS::Config->server_setting('rebal_policy');
    # fetched as before; nothing below consults it
    my $saved_state = MogileFS::Config->server_setting('rebal_state');

    unless ($policy) {
        return $self->err_line('no_rebal_policy');
    }

    my $rebal    = MogileFS::Rebalance->new;
    my @all_devs = Mgd::device_factory()->get_all;
    $rebal->policy($policy);
    $rebal->init(\@all_devs);

    # client should display list of source, destination devices.
    # FIXME: can probably avoid calling this twice by pulling state?
    # *or* not running init.
    my $sources = $rebal->filter_source_devices(\@all_devs);
    my $dests   = $rebal->filter_dest_devices(\@all_devs);

    my %reply = (
        sdevs => join(',', @$sources),
        ddevs => join(',', @$dests),
    );

    return $self->ok_line(\%reply);
}
# Discard the saved rebalance state.  Refused with rebal_running while a
# host is recorded as running the rebalance.
sub cmd_rebalance_reset {
    my MogileFS::Worker::Query $self = shift;

    my $owner = MogileFS::Config->server_setting('rebal_host');
    if ($owner) {
        return $self->err_line("rebal_running", "rebalance is running");
    }

    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
# Ask a running rebalance to stop.
#
# Replies rebal_not_started when no host is recorded in rebal_host.
# Otherwise sets the rebal_signal setting to 'stop' and answers OK.
sub cmd_rebalance_stop {
    my MogileFS::Worker::Query $self = shift;

    my $host = MogileFS::Config->server_setting('rebal_host');

    # only refuse when nothing is running; an unconditional return here would
    # make the stop signal below unreachable
    unless ($host) {
        return $self->err_line('rebal_not_started');
    }

    MogileFS::Config->set_server_setting('rebal_signal', 'stop');
    return $self->ok_line;
}
# Store a new rebalance policy.
#
# Args: policy (the policy string).
# Replies no_set_rebal while a rebalance is running, and bad_rebal_pol (with
# the parser's error text) when MogileFS::Rebalance rejects the policy.
# On success the policy is saved and any saved rebalance state is cleared,
# since it was built from the previous policy.
sub cmd_rebalance_set_policy {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $rebal_host = MogileFS::Config->server_setting("rebal_host");
    return $self->err_line("no_set_rebal", "cannot change rebalance policy while rebalance is running") if $rebal_host;

    # load policy object, test policy, set policy.
    my $rebal = MogileFS::Rebalance->new;

    # The policy setter is expected to die on input it cannot accept; only
    # that case is an error.  The trailing 1 separates success from failure.
    my $accepted = eval { $rebal->policy($args->{policy}); 1 };
    unless ($accepted) {
        return $self->err_line("bad_rebal_pol", $@);
    }

    MogileFS::Config->set_server_setting('rebal_policy', $args->{policy});
    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
# Body of the success-reply helper (the sub header, the $delay declaration
# and the final return are not visible in this view; presumably ok_line).
# Sends "<reqid> <elapsed> OK <url-encoded args>" to the parent; the optional
# first argument is a hashref of reply fields.
1728 my MogileFS
::Worker
::Query
$self = shift;
# If a start time was recorded for this query, report the elapsed interval
# with four decimals and clear the timestamp so it is reported only once.
1731 if ($self->{querystarttime
}) {
1732 $delay = sprintf("%.4f ", Time
::HiRes
::tv_interval
( $self->{querystarttime
} ));
1733 $self->{querystarttime
} = undef;
# the request id, when present, is echoed as a prefix
1736 my $id = defined $self->{reqid
} ?
"$self->{reqid} " : '';
1738 my $args = shift || {};
# NOTE: this writes callid into the caller's hashref when one was passed in
1739 $args->{callid
} = $self->{callid
} if defined $self->{callid
};
# key=value pairs, each side URL-encoded with eurl, joined with '&'; the pair
# order follows keys() and is therefore unspecified
1740 my $argline = join('&', map { eurl
($_) . "=" . eurl
($args->{$_}) } keys %$args);
1741 $self->send_to_parent("${id}${delay}OK $argline");
# Body of the error-reply helper (the sub header, the $delay declaration and
# the guard around the redundant-call branch are not visible in this view;
# presumably err_line).
# Sends "<reqid> <elapsed> ERR <code> <url-encoded text> <callid>" to the
# parent.
1745 # first argument: error code.
1746 # second argument: optional error text. text will be taken from code if no text provided.
1748 my MogileFS
::Worker
::Query
$self = shift;
1750 my $err_code = shift;
# Default texts per error code.  A code without an entry here falls back to
# the code string itself (see the "|| $err_code" after the table).
1751 my $err_text = shift || {
1752 'dup' => "Duplicate name/number used.",
1753 'after_mismatch' => "Pattern does not match the after-value?",
1754 'bad_params' => "Invalid parameters to command; please see documentation",
1755 'class_exists' => "That class already exists in that domain",
1756 'class_has_files' => "Class still has files, unable to delete",
1757 'class_not_found' => "Class not found",
1758 'db' => "Database error",
1759 'domain_has_files' => "Domain still has files, unable to delete",
1760 'domain_exists' => "That domain already exists",
1761 'domain_not_empty' => "Domain still has classes, unable to delete",
1762 'domain_not_found' => "Domain not found",
1763 'failure' => "Operation failed",
1764 'host_exists' => "That host already exists",
1765 'host_mismatch' => "The device specified doesn't belong to the host specified",
1766 'host_not_empty' => "Unable to delete host; it contains devices still",
1767 'host_not_found' => "Host not found",
1768 'invalid_checker_level' => "Checker level invalid. Please see documentation on this command.",
1769 'invalid_mindevcount' => "The mindevcount must be at least 1",
1770 'key_exists' => "Target key name already exists; can't overwrite.",
1771 'no_class' => "No class provided",
1772 'no_devices' => "No devices found to store file",
1773 'no_device' => "Device not found",
1774 'no_domain' => "No domain provided",
1775 'no_host' => "No host provided",
1776 'no_ip' => "IP required to create host",
1777 'no_port' => "Port required to create host",
1778 'no_temp_file' => "No tempfile or file already closed",
1779 'none_match' => "No keys match that pattern and after-value (if any).",
1780 'plugin_aborted' => "Action aborted by plugin",
1781 'state_too_high' => "Status cannot go from dead to alive; must use down",
1782 'unknown_command' => "Unknown server command",
1783 'unknown_host' => "Host not found",
1784 'unknown_state' => "Invalid/unknown state",
1785 'unreg_domain' => "Domain name invalid/not found",
1786 'rebal_not_started' => "Rebalance not running",
1787 'no_rebal_state' => "No available rebalance status",
1788 'no_rebal_policy' => "No rebalance policy available",
1789 'nodel_default_class' => "Cannot delete the default class",
1790 }->{$err_code} || $err_code;
# Elapsed-time prefix, computed the same way as in the OK reply: four
# decimals, and the timestamp is cleared once it has been reported.
1793 if ($self->{querystarttime
}) {
1794 $delay = sprintf("%.4f ", Time
::HiRes
::tv_interval
($self->{querystarttime
}));
1795 $self->{querystarttime
} = undef;
1797 # don't send another ERR line if we already sent one
# NOTE(review): the condition guarding this log call is not visible here;
# presumably it is the branch taken when no query start time is pending.
1798 error
("err_line called redundantly with $err_code ( " . eurl
($err_text) . ")");
1802 my $id = defined $self->{reqid
} ?
"$self->{reqid} " : '';
# unlike the OK reply, the callid is appended as a trailing URL-encoded token
1803 my $callid = defined $self->{callid
} ?
' ' . eurl
($self->{callid
}) : '';
1805 $self->send_to_parent("${id}${delay}ERR $err_code " . eurl
($err_text) . $callid);
1814 # indent-tabs-mode: nil
1821 MogileFS::Worker::Query -- implements the MogileFS client protocol