Checking in changes prior to tagging of version 2.73.
[MogileFS-Server.git] / lib / MogileFS / Worker / Query.pm
blobbde7f987f6ca0953f421d2d13c63489bff4d5acc
1 package MogileFS::Worker::Query;
2 # responds to queries from Mogile clients
4 use strict;
5 use warnings;
7 use base 'MogileFS::Worker';
8 use fields qw(querystarttime reqid callid);
9 use MogileFS::Util qw(error error_code first weighted_list
10 device_state eurl decode_url_args);
11 use MogileFS::HTTPFile;
12 use MogileFS::Rebalance;
13 use MogileFS::Config;
14 use MogileFS::Server;
# Constructor: builds the restricted (fields) object, lets the base Worker
# class set itself up with the parent socket, and clears per-request state.
sub new {
    my ($class, $psock) = @_;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    # per-request bookkeeping, filled in by process_line for each query
    $self->{$_} = undef for qw(querystarttime reqid callid);

    return $self;
}
# Watchdog limit for this worker, in seconds.  No query should take this
# long, and work() checks in with still_alive at least every 5 seconds.
sub watchdog_timeout {
    return 30;
}
# Called by plugins to register a command in this package's namespace.
# A command "foo" is installed as the sub "cmd_plugin_foo", which
# process_line() finds when a client sends "plugin_foo".
# Returns 1 on success, 0 if $cmd contains anything but word characters.
sub register_command {
    my ($cmd, $sub) = @_;

    # only plain word characters are accepted as command names
    return 0 if $cmd !~ /^\w+$/;

    my $handler_name = "cmd_plugin_$cmd";
    {
        no strict 'refs';
        *{$handler_name} = $sub;
    }

    return 1;
}
# Main loop of a query worker.  Reads newline-terminated lines from the
# parent over $self->{psock} and dispatches each one: generic worker
# commands via process_generic_command, everything else via process_line.
# Never returns; dies when the parent socket reports EOF or a read error.
sub work {
    my $self  = shift;
    my $psock = $self->{psock};

    my $read_set = '';
    vec($read_set, fileno($psock), 1) = 1;

    my $pending;    # bytes read from the parent but not yet split into lines

    while (1) {
        # wake up at least every 5 seconds so the watchdog sees us alive
        my $ready;
        if (!select($ready = $read_set, undef, undef, 5.0)) {
            $self->still_alive;
            next;
        }

        my $chunk;
        my $nread = sysread($psock, $chunk, Mgd::UNIX_RCVBUF_SIZE());
        die "Error reading pipe from parent: $!\n"
            unless defined $nread;
        die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n"
            unless $nread;
        $pending .= $chunk;

        # consume every complete line; a trailing partial line stays buffered
        while ($pending =~ s/^(.+?)\r?\n//) {
            my $line = $1;
            if ($self->process_generic_command(\$line)) {
                $self->still_alive;    # no-op for watchdog
            }
            else {
                $self->validate_dbh;
                $self->process_line(\$line);
            }
        }
    }
}
# Parses one request line and dispatches it to the matching cmd_* sub.
# Line format: "[<reqid>] <client_ip> <command> <args>", where reqid looks
# like "123-456" and args is handed to decode_url_args.  A handler exception
# with error code "dup" becomes a "dup" error reply; any other exception is
# logged and becomes a "failure" reply.  Unknown or unparsable commands get
# "unknown_command".
sub process_line {
    my MogileFS::Worker::Query $self = shift;
    my $lineref = shift;

    # split into optional request id, client ip and the remainder
    my ($reqid, $client_ip, $line) = ($$lineref =~ /^(\d+-\d+)?\s*(\S+)\s*(.*)/)
        or return $self->err_line('unknown_command');

    $self->{reqid} = $reqid || undef;

    # global consulted for zone determination while this request runs
    local $MogileFS::REQ_client_ip = $client_ip;

    # Keep this as an array ref; a plain string would break later
    # Time::HiRes::tv_interval() calls.
    $self->{querystarttime} = [ Time::HiRes::gettimeofday() ];

    my ($cmd, $orig_args) = ($line =~ /^(\w+)\s*(.*)/)
        or return $self->err_line('unknown_command');
    $cmd = lc $cmd;

    my $args = decode_url_args(\$orig_args);
    $self->{callid} = $args->{callid};

    # handlers are plain subs named cmd_<command> in this package
    my $cmd_handler = do { no strict 'refs'; *{"cmd_$cmd"}{CODE} };
    return $self->err_line('unknown_command') unless $cmd_handler;

    local $MogileFS::REQ_altzone = ($args->{zone} && $args->{zone} eq 'alt');

    eval { $cmd_handler->($self, $args); };
    return unless $@;

    my $errc = error_code($@);
    return $self->err_line("dup") if $errc eq "dup";

    warn "Error: $@\n";
    error("Error running command '$cmd': $@");
    return $self->err_line("failure");
}
# Half-finished admin command: copies fid $args->{fid} from device sdevid
# to device ddevid over HTTP, then records the new copy in the database.
# It was a quick hack for an ops task, and errors in it tend to crash the
# parent or child.  Note that it reaches across package boundaries into the
# Replicator's http_copy(), an API that package probably doesn't want
# exposed.
sub cmd_httpcopy {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my ($sdevid, $ddevid, $fid) = @{$args}{qw(sdevid ddevid fid)};

    my $err;
    my $copied = MogileFS::Worker::Replicate::http_copy(
        sdevid => $sdevid,
        ddevid => $ddevid,
        fid    => $fid,
        errref => \$err,
    );
    return $self->err_line("copy_err", $err) unless $copied;

    MogileFS::DevFID->new($ddevid, $fid)->add_to_db
        or return $self->err_line("copy_err", "failed to add link to database");

    return $self->ok_line;
}
# Resolves $args->{domain} to its numeric domain id and returns it.
# On failure the error reply ("no_domain" or "unreg_domain") has already been
# sent, and err_line's return value is passed back.  That value is documented
# as 0, and callers rely on it being false via "... or return".
sub check_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    unless (defined $domain && length $domain) {
        return $self->err_line("no_domain");
    }

    # a missing domain makes ->id die inside the eval, yielding undef
    my $dmid = eval { Mgd::domain_factory()->get_by_name($domain)->id };
    return $dmid if $dmid;

    return $self->err_line("unreg_domain");
}
# Debug command: blocks this worker for $args->{duration} seconds
# (10 when unset or zero), then replies OK.
sub cmd_sleep {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $seconds = $args->{duration} || 10;
    sleep $seconds;

    return $self->ok_line;
}
# Debug command: replies OK, or dies when the "crash" argument is true.
# process_line's eval turns that death into a "failure" reply.
sub cmd_test {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    if ($args->{crash}) {
        die "Crashed on purpose";
    }

    return $self->ok_line;
}
# Asks the parent to refresh monitor data (":refresh_monitor") and waits
# until a fresh monitor round has been seen before replying.  Any extra
# arguments are passed straight through to ok_line as the reply payload.
sub cmd_clear_cache {
    my MogileFS::Worker::Query $self = shift;
    my @reply = @_;

    # forget the last round first so wait_for_monitor blocks for a new one
    $self->forget_that_monitor_has_run;
    $self->send_to_parent(":refresh_monitor");
    $self->wait_for_monitor;

    return $self->ok_line(@reply);
}
# Begins an upload.  The steps are:
#   - validate domain and class;
#   - pick destination device(s);
#   - register a temp file row;
#   - make sure the target directories exist;
#   - reply with the new fid and the URL(s) the client should PUT to.
#
# Request args used here:
#   domain, key, class - where the file will live once closed
#   fid                - optional explicit fid (otherwise auto-assigned)
#   multi_dest         - if true, reply with up to 3 destinations
#   size               - optional; used to skip devices without enough room
#   debug_profile      - if true, include per-stage timings in the reply
sub cmd_create_open {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # has to be filled out for some plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # first, pass this to a hook to do any manipulations needed
    eval { MogileFS::run_global_hook('cmd_create_open', $args) };
    return $self->err_line("plugin_aborted", "$@")
        if $@;

    # validate parameters
    my $dmid  = $args->{dmid};
    my $key   = $args->{key} || "";
    my $multi = $args->{multi_dest} ? 1 : 0;
    # Size is optional at create time, but used to filter devices if given.
    my $size  = $args->{size} || undef;

    # optional profiling of stages, if $args->{debug_profile}
    my @profpoints;    # array of [point, hires-starttime]
    my $profstart = sub {
        my $pt = shift;
        push @profpoints, [$pt, Time::HiRes::time()];
    };
    $profstart = sub {} unless $args->{debug_profile};
    $profstart->("begin");

    # we want it to be undef if not explicit, else force to numeric
    my $exp_fidid = $args->{fid} ? int($args->{fid}) : undef;

    # get DB handle
    my $sto = Mgd::get_store();

    # Figure out what classid this file is for.  0 is the classid used when
    # no class is named, so a lookup that yields 0 (presumably an explicit
    # "default" class) is valid.  Only undef means "unregistered": the eval
    # returns undef when get_by_name finds nothing and ->id dies.
    my $class   = $args->{class} || "";
    my $classid = 0;
    if (length($class)) {
        $classid = eval { Mgd::class_factory()->get_by_name($dmid, $class)->id };
        return $self->err_line("unreg_class") unless defined $classid;
    }

    # if we haven't heard from the monitoring job yet, we need to chill a bit
    # to prevent a race where we tell a user that we can't create a file when
    # in fact we've just not heard from the monitor
    $profstart->("wait_monitor");
    $self->wait_for_monitor;

    $profstart->("find_deviceid");

    my @devices = Mgd::device_factory()->get_all;
    if ($size) {
        # First consider only devices known to have more than $size bytes free.
        @devices = grep { length($_->mb_free) && ($_->mb_free * 1024*1024) > $size } @devices;

        # If we didn't find any, try all the devices with an unknown space free.
        # This may happen if mogstored isn't running.
        if (!@devices) {
            @devices = grep { !length($_->mb_free) } Mgd::device_factory()->get_all;
        }
    }

    # a hook may reorder @devices in place; otherwise weight by free space
    unless (MogileFS::run_global_hook('cmd_create_open_order_devices', [ @devices ], \@devices)) {
        @devices = sort_devs_by_freespace(@devices);
    }

    # Find suitable device(s) to put this file on: 3 for multi_dest, else 1.
    # A device is skipped when not_on_hosts rejects the hosts already chosen.
    my @dests;    # MogileFS::Device objects which are suitable
    while (scalar(@dests) < ($multi ? 3 : 1)) {
        my $ddev = shift @devices;

        last unless $ddev;
        next unless $ddev->not_on_hosts(map { $_->host } @dests);

        push @dests, $ddev;
    }
    return $self->err_line("no_devices") unless @dests;

    my $fidid = eval {
        $sto->register_tempfile(
            fid     => $exp_fidid, # may be undef/NULL to mean auto-increment
            dmid    => $dmid,
            key     => $key,
            classid => $classid,
            devids  => join(',', map { $_->id } @dests),
        );
    };
    unless ($fidid) {
        my $errc = error_code($@);
        return $self->err_line("fid_in_use") if $errc eq "dup";
        warn "Error registering tempfile: $@\n";
        return $self->err_line("db");
    }

    # make sure directories exist for client to be able to PUT into
    my %dir_done;    # devid => seconds until its directories were ready
    $profstart->("vivify_dir_on_all_devs");

    my $t0 = Time::HiRes::time();
    foreach my $dev (@dests) {
        my $dfid = MogileFS::DevFID->new($dev, $fidid);
        $dfid->vivify_directories(sub {
            $dir_done{$dfid->devid} = Time::HiRes::time() - $t0;
        });
    }

    # don't start the event loop if results are all cached
    if (scalar keys %dir_done != scalar @dests) {
        Danga::Socket->SetPostLoopCallback(sub { scalar keys %dir_done != scalar @dests });
        Danga::Socket->EventLoop;
    }
    $profstart->("end");

    # common reply variables
    my $res = {
        fid => $fidid,
    };

    # add profiling data: one entry per stage (time until the next stage
    # began), then one per device directory vivification
    if (@profpoints) {
        $res->{profpoints} = 0;
        for my $i (0 .. $#profpoints - 1) {
            my $ptnum = ++$res->{profpoints};
            $res->{"prof_${ptnum}_name"} = $profpoints[$i]->[0];
            $res->{"prof_${ptnum}_time"} =
                sprintf("%0.03f",
                        $profpoints[$i+1]->[1] - $profpoints[$i]->[1]);
        }
        while (my ($devid, $time) = each %dir_done) {
            my $ptnum = ++$res->{profpoints};
            $res->{"prof_${ptnum}_name"} = "vivify_dir_on_dev$devid";
            $res->{"prof_${ptnum}_time"} = sprintf("%0.03f", $time);
        }
    }

    # add path info
    if ($multi) {
        my $ct = 0;
        foreach my $dev (@dests) {
            $ct++;
            $res->{"devid_$ct"} = $dev->id;
            $res->{"path_$ct"} = MogileFS::DevFID->new($dev, $fidid)->url;
        }
        $res->{dev_count} = $ct;
    } else {
        $res->{devid} = $dests[0]->id;
        $res->{path}  = MogileFS::DevFID->new($dests[0], $fidid)->url;
    }

    return $self->ok_line($res);
}
# Orders candidate devices for a new file:
#   - keep only devices whose should_get_new_files is true;
#   - sort by percent_free, most free first;
#   - keep at most the first 20;
#   - pass them to MogileFS::Util::weighted_list as
#     [device, 100 * percent_free] pairs.
# Returns the list weighted_list produces.
sub sort_devs_by_freespace {
    my @eligible = grep { $_->should_get_new_files } @_;
    my @by_free  = sort { $b->percent_free <=> $a->percent_free } @eligible;
    my @weighted = map  { [ $_, 100 * $_->percent_free ] } @by_free;

    # only the 20 freest devices take part in the weighted pick
    my $last = $#weighted < 19 ? $#weighted : 19;
    my @ordered = MogileFS::Util::weighted_list(@weighted[0 .. $last]);

    return @ordered;
}
# True when $key is defined and non-empty.  Returns the key's length in that
# case, and a false value otherwise; "0" counts as a valid key.
sub valid_key {
    my $key = shift;
    return (defined $key and length $key);
}
# Finishes an upload that create_open started.  The steps are:
#   - check the client's fid/devid/path against the temp file row;
#   - verify the size (and optionally the checksum) on the storage node;
#   - replace the temp file with a real file row and queue it for
#     replication.
# After the temp row has been claimed, any validation failure runs $failed.
# $failed records the uploaded copy in file_on and deletes the fid, to
# protect against leaving orphaned uploads.
sub cmd_create_close {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # has to be filled out for some plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # call out to a hook that might modify the arguments for us
    MogileFS::run_global_hook('cmd_create_close', $args);

    # late validation of parameters
    my $dmid     = $args->{dmid};
    my $key      = $args->{key};
    my $fidid    = $args->{fid}   or return $self->err_line("no_fid");
    my $devid    = $args->{devid} or return $self->err_line("no_devid");
    my $path     = $args->{path}  or return $self->err_line("no_path");
    my $checksum = $args->{checksum};

    if ($checksum) {
        $checksum = eval { MogileFS::Checksum->from_string($fidid, $checksum) };
        return $self->err_line("invalid_checksum_format") if $@;
    }

    my $fid  = MogileFS::FID->new($fidid);
    my $dfid = MogileFS::DevFID->new($devid, $fid);

    # is the provided path what we'd expect for this fid/devid?
    return $self->err_line("bogus_args")
        unless $path eq $dfid->url;

    my $sto = Mgd::get_store();

    # find the temp file we're closing and making real. If another worker
    # already has it, bail out---the client closed it twice.
    # this is racy, but the only expected use case is a client retrying.
    # should still be fixed better once more scalable locking is available.
    my $trow = $sto->delete_and_return_tempfile_row($fidid) or
        return $self->err_line("no_temp_file");

    # Protect against leaving orphaned uploads.
    my $failed = sub {
        $dfid->add_to_db;
        $fid->delete;
    };

    # $devid comes straight from the client.  \Q...\E makes it match as a
    # literal device id instead of being compiled as a regex, where
    # metacharacters could match unrelated ids or break the pattern.
    unless ($trow->{devids} =~ m/\b\Q$devid\E\b/) {
        $failed->();
        return $self->err_line("invalid_destdev", "File uploaded to invalid dest $devid. Valid devices were: " . $trow->{devids});
    }

    # if a temp file is closed without a provided-key, that means to
    # delete it.
    unless (valid_key($key)) {
        $failed->();
        return $self->ok_line;
    }

    # get size of file and verify that it matches what we were given, if anything
    my $httpfile = MogileFS::HTTPFile->at($path);
    my $size = $httpfile->size;

    # The size check is optional: a missing or zero size is recorded as -1
    # and skips the comparison below.
    $args->{size} = -1 unless $args->{size};
    if (!defined($size) || $size == MogileFS::HTTPFile::FILE_MISSING) {
        # storage node is unreachable or the file is missing
        my $type = defined $size ? "missing" : "cantreach";
        my $lasterr = MogileFS::Util::last_error();
        $failed->();
        return $self->err_line("size_verify_error", "Expected: $args->{size}; actual: 0 ($type); path: $path; error: $lasterr");
    }

    if ($args->{size} > -1 && ($args->{size} != $size)) {
        $failed->();
        return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path");
    }

    # checksum validation is optional as it can be very expensive
    # However, we /always/ verify it if the client wants us to, even
    # if the class does not enforce or store it.
    if ($checksum && $args->{checksumverify}) {
        my $alg = $checksum->hashname;
        my $actual = $httpfile->digest($alg, sub { $self->still_alive });
        if ($actual ne $checksum->{checksum}) {
            $failed->();
            $actual = "$alg:" . unpack("H*", $actual);
            return $self->err_line("checksum_mismatch",
                                   "Expected: $checksum; actual: $actual; path: $path");
        }
    }

    # see if we have a fid for this key already
    my $old_fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    if ($old_fid) {
        # Fail if a file already exists for this fid. Should never
        # happen, as it should not be possible to close a file twice.
        return $self->err_line("fid_exists")
            unless $old_fid->{fidid} != $fidid;

        $old_fid->delete;
    }

    # TODO: check for EIO?

    # insert file_on row
    $dfid->add_to_db;

    $checksum->maybe_save($dmid, $trow->{classid}) if $checksum;

    $sto->replace_into_file(
        fidid    => $fidid,
        dmid     => $dmid,
        key      => $key,
        length   => $size,
        classid  => $trow->{classid},
        devcount => 1,
    );

    # mark it as needing replicating:
    $fid->enqueue_for_replication();

    # call the hook - if this fails, we need to back the file out
    my $rv = MogileFS::run_global_hook('file_stored', $args);
    if (defined $rv && ! $rv) { # undef = no hooks, 1 = success, 0 = failure
        $fid->delete;
        return $self->err_line("plugin_aborted");
    }

    # all went well, we would've hit condthrow on DB errors
    return $self->ok_line;
}
# Moves an existing file (domain + key) into another class.  When the class
# actually changes, the file is also queued for replication, presumably so
# the new class's rules get applied.  Files with no device ids are refused
# with "no_devices".
sub cmd_updateclass {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    $args->{dmid} = $self->check_domain($args) or return;

    # a hook may modify the arguments, or veto the request by returning false
    my $hook_rv = MogileFS::run_global_hook('cmd_updateclass', $args);
    if (defined $hook_rv && !$hook_rv) {
        return $self->err_line('plugin_aborted');
    }

    my $dmid = $args->{dmid};
    my $key  = $args->{key};
    return $self->err_line("no_key") unless valid_key($key);

    my $class = $args->{class} or return $self->err_line("no_class");

    my $classobj = Mgd::class_factory()->get_by_name($dmid, $class);
    return $self->err_line('class_not_found') unless $classobj;
    my $classid = $classobj->id;

    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    return $self->err_line('invalid_key') unless $fid;

    my @devids = $fid->devids;
    return $self->err_line("no_devices") unless @devids;

    # nothing to change when the file already has this class
    return $self->ok_line if $fid->classid == $classid;

    $fid->update_class(classid => $classid);
    $fid->enqueue_for_replication();
    return $self->ok_line;
}
# Deletes the file stored under domain + key.  Replies "unknown_key" when
# no such file exists, and "plugin_aborted" when the cmd_delete hook vetoes
# the request.
sub cmd_delete {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # plugins expect dmid to be filled in
    $args->{dmid} = $self->check_domain($args) or return;

    my $hook_rv = MogileFS::run_global_hook('cmd_delete', $args);
    return $self->err_line('plugin_aborted') if defined $hook_rv && !$hook_rv;

    my ($dmid, $key) = @{$args}{qw(dmid key)};
    return $self->err_line("no_key") unless valid_key($key);

    # is this fid still owned by this key?
    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    return $self->err_line("unknown_key") unless $fid;

    $fid->delete;
    return $self->ok_line;
}
# Debug dump for one file, located either by fid or by domain + key.  The
# reply includes:
#   - the file row, if any;
#   - the temp file row;
#   - the replicate, delete, rebalance and fsck queue rows;
#   - the file_on device ids and their URLs;
#   - the stored checksum ("NONE" if absent).
# Replies "unknown_fid" when none of the row, queue or device lookups found
# anything.
sub cmd_file_debug {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;
    # Talk to the master since this is "debug mode"
    my $sto = Mgd::get_store();
    my $ret = {};

    my ($fid, $fidid);
    if ($args->{fid}) {
        # If a FID is provided, just use that.
        $fidid = $args->{fid} + 0;
        # It's not fatal if we don't find the row here.
        $fid = $sto->file_row_from_fidid($fidid);
    } else {
        # If not, require dmid/dkey and pick up the fid from there.
        $args->{dmid} = $self->check_domain($args) or return;
        return $self->err_line("no_key") unless valid_key($args->{key});

        # now invoke the plugin, abort if it tells us to
        my $rv = MogileFS::run_global_hook('cmd_file_debug', $args);
        return $self->err_line('plugin_aborted')
            if defined $rv && ! $rv;

        $fid = $sto->file_row_from_dmid_key($args->{dmid}, $args->{key});
        return $self->err_line("unknown_key") unless $fid;
        $fidid = $fid->{fid};
    }

    if ($fid) {
        $fid->{domain} = Mgd::domain_factory()->get_by_id($fid->{dmid})->name;
        $fid->{class}  = Mgd::class_factory()->get_by_id($fid->{dmid},
                                                         $fid->{classid})->name;
    }

    # Fetch all of the queue data.
    my $tfile = $sto->tempfile_row_from_fid($fidid);
    my $repl  = $sto->find_fid_from_file_to_replicate($fidid);
    my $del   = $sto->find_fid_from_file_to_delete2($fidid);
    my $reb   = $sto->find_fid_from_file_to_queue($fidid, REBAL_QUEUE);
    my $fsck  = $sto->find_fid_from_file_to_queue($fidid, FSCK_QUEUE);

    # Fetch file_on rows, and turn into paths.
    my @devids = $sto->fid_devids($fidid);
    for my $devid (@devids) {
        # Won't matter if we can't make the path (dev is dead/deleted/etc)
        eval {
            my $dfid = MogileFS::DevFID->new($devid, $fidid);
            $ret->{'devpath_' . $devid} = $dfid->get_url;
        };
    }
    $ret->{devids} = join(',', @devids) if @devids;

    # Return file row (if found) and all other data.
    my %toret = (fid => $fid, tempfile => $tfile, replqueue => $repl,
                 delqueue => $del, rebqueue => $reb, fsckqueue => $fsck);
    while (my ($key, $hash) = each %toret) {
        next unless $hash;    # that lookup found no row
        while (my ($name, $val) = each %$hash) {
            $ret->{$key . '_' . $name} = $val;
        }
    }

    # Decide "not found" before adding the checksum field.  That field is
    # always filled in, so checking afterwards could never report
    # unknown_fid.
    return $self->err_line("unknown_fid") unless keys %$ret;

    # Always look for a checksum
    my $checksum = Mgd::get_store()->get_checksum($fidid);
    if ($checksum) {
        $ret->{checksum} = MogileFS::Checksum->new($checksum)->info;
    } else {
        $ret->{checksum} = 'NONE';
    }

    return $self->ok_line($ret);
}
# Returns metadata for the file at domain + key:
#   - fid, domain, class, key, length and devcount;
#   - the stored checksum, but only when the class has a hashtype
#     ("MISSING" if none is stored);
#   - the raw devids, when "devices" is requested.
# The key lookup runs inside the store's slaves_ok wrapper.
sub cmd_file_info {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # plugins expect dmid to be filled in
    $args->{dmid} = $self->check_domain($args) or return;

    my $hook_rv = MogileFS::run_global_hook('cmd_file_info', $args);
    if (defined $hook_rv && !$hook_rv) {
        return $self->err_line('plugin_aborted');
    }

    my $dmid = $args->{dmid};
    my $key  = $args->{key};
    return $self->err_line("no_key") unless valid_key($key);

    my $fid;
    Mgd::get_store()->slaves_ok(sub {
        $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
    });
    return $self->err_line("unknown_key") unless $fid;

    my %info;
    $info{fid}    = $fid->id;
    $info{domain} = Mgd::domain_factory()->get_by_id($fid->dmid)->name;

    my $class_obj = Mgd::class_factory()->get_by_id($fid->dmid, $fid->classid);
    $info{class} = $class_obj->name;

    if ($class_obj->{hashtype}) {
        my $row = Mgd::get_store()->get_checksum($fid->id);
        $info{checksum} = $row ? MogileFS::Checksum->new($row)->info : "MISSING";
    }

    $info{key}      = $key;
    $info{'length'} = $fid->length;
    $info{devcount} = $fid->devcount;

    # Raw devids only on request; callers fetching the data should use get_paths.
    $info{devids} = join(',', $fid->devids) if $args->{devices};

    return $self->ok_line(\%info);
}
# Lists file rows starting at fid "from".  Despite its name, "to" is a row
# count (default 100; values above 500 or below 0 become 500).  Each row is
# reported as fid_<N>_{fid,domain,class,key,length,devcount}, and fid_count
# holds the number of rows.
sub cmd_list_fids {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $fromfid = ($args->{from} || 0) + 0;
    my $count   = ($args->{to}   || 0) + 0;
    $count ||= 100;
    $count = 500 if $count > 500 || $count < 0;

    my $rows = Mgd::get_store()->file_row_from_fidid_range($fromfid, $count);
    return $self->err_line('failure') unless $rows;
    return $self->ok_line({ fid_count => 0 }) unless @$rows;

    # memoize domain/class name lookups across rows
    my (%domain_name, %class_name);

    my %reply;
    my $n = 0;
    foreach my $row (@$rows) {
        my $prefix = "fid_" . ++$n . "_";
        my ($dmid, $classid) = ($row->{dmid}, $row->{classid});

        $reply{"${prefix}fid"}    = $row->{fid};
        $reply{"${prefix}domain"} = $domain_name{$dmid} ||=
            Mgd::domain_factory()->get_by_id($dmid)->name;
        $reply{"${prefix}class"}  = $class_name{$dmid}{$classid} ||=
            Mgd::class_factory()->get_by_id($dmid, $classid)->name;
        $reply{"${prefix}key"}      = $row->{dkey};
        $reply{"${prefix}length"}   = $row->{length};
        $reply{"${prefix}devcount"} = $row->{devcount};
    }
    $reply{fid_count} = $n;

    return $self->ok_line(\%reply);
}
# Lists up to "limit" keys (default and maximum 1000) in a domain that match
# "prefix", starting after key "after".  The reply has:
#   - key_1 .. key_N, with key_count = N;
#   - next_after, the greatest key returned, for paging.
# Replies "none_match" when nothing matches.
sub cmd_list_keys {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return;
    my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit});

    if (defined $prefix and $prefix ne '') {
        # "after" must begin with the prefix.  Compare literally: the prefix
        # is client input and may contain regex metacharacters.
        return $self->err_line('after_mismatch')
            if $after && index($after, $prefix) != 0;
    }

    # Clamp to 1..1000.  Without the lower bound, a negative or non-numeric
    # limit would go straight to the store's query.
    $limit ||= 1000;
    $limit += 0;
    $limit = 1000 if $limit > 1000 || $limit < 1;

    my $keys = Mgd::get_store()->get_keys_like($dmid, $prefix, $after, $limit);

    # if we got nothing, say so
    return $self->err_line('none_match') unless $keys && @$keys;

    # construct the output and send
    my $ret = { key_count => 0, next_after => '' };
    foreach my $key (@$keys) {
        $ret->{key_count}++;
        $ret->{next_after} = $key
            if $key gt $ret->{next_after};
        $ret->{"key_$ret->{key_count}"} = $key;
    }
    return $self->ok_line($ret);
}
# Renames a file within a domain from "from_key" to "to_key".  Replies:
#   - "no_key" when either key is missing;
#   - "unknown_key" when from_key does not exist;
#   - "key_exists" when rename() reports failure.
sub cmd_rename {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return;
    my ($fkey, $tkey) = ($args->{from_key}, $args->{to_key});
    unless (valid_key($fkey) && valid_key($tkey)) {
        return $self->err_line("no_key");
    }

    my $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $fkey)
        or return $self->err_line("unknown_key");

    # Must return here: falling through would send a second (OK) reply for
    # the same request after the error reply.
    $fid->rename($tkey)
        or return $self->err_line("key_exists");

    return $self->ok_line;
}
# Lists hosts as "host<N>_<field>" pairs (N counts from 1), limited to one
# host when "hostid" is given.  "hosts" holds the count.
sub cmd_get_hosts {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $only_hostid = $args->{hostid};
    my %reply = (hosts => 0);

    foreach my $host (Mgd::host_factory()->get_all) {
        next if defined $only_hostid && $host->id != $only_hostid;

        my $n    = ++$reply{hosts};
        my $info = $host->fields(qw(hostid status hostname hostip http_port
                                    http_get_port altip altmask));
        # must be regular data so copy it in
        @reply{ map { "host${n}_$_" } keys %$info } = values %$info;
    }

    return $self->ok_line(\%reply);
}
# Lists devices as "dev<N>_<field>" pairs (N counts from 1), limited to one
# device when "devid" is given.  "devices" holds the count.
sub cmd_get_devices {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $only_devid = $args->{devid};
    my %reply = (devices => 0);

    foreach my $dev (Mgd::device_factory()->get_all) {
        next if defined $only_devid && $dev->id != $only_devid;

        my $n    = ++$reply{devices};
        my $info = $dev->fields;
        @reply{ map { "dev${n}_$_" } keys %$info } = values %$info;
    }

    return $self->ok_line(\%reply);
}
# Creates device "devid" (a positive integer) on a host given by hostid or
# by hostname.  The initial state comes from "state" (default "alive").
# On success the monitor cache is refreshed via cmd_clear_cache.
sub cmd_create_device {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $status = $args->{state} || "alive";
    device_state($status)
        or return $self->err_line("invalid_state");

    my $devid = $args->{devid};
    unless ($devid && $devid =~ /^\d+$/) {
        return $self->err_line("invalid_devid");
    }

    my $sto = Mgd::get_store();
    my $hostid;
    if ($args->{hostid} && $args->{hostid} =~ /^\d+$/) {
        $hostid = $sto->get_hostid_by_id($args->{hostid})
            or return $self->err_line("unknown_hostid");
    }
    elsif (my $hname = $args->{hostname}) {
        $hostid = $sto->get_hostid_by_name($hname)
            or return $self->err_line("unknown_host");
    }
    else {
        return $self->err_line("bad_args", "No hostid/hostname parameter");
    }

    return $self->cmd_clear_cache
        if eval { $sto->create_device($devid, $hostid, $status) };

    # Any exception that carries an error code is reported as an existing
    # devid; anything else is rethrown to process_line.
    return $self->err_line("existing_devid") if error_code($@);
    die $@; # rethrow;
}
# Creates a new domain.  Replies "domain_exists" on a duplicate and
# "failure" (with the error text) on any other error.  On success the
# monitor cache is refreshed.
sub cmd_create_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain}
        or return $self->err_line('no_domain');

    # called in scalar context; the result itself is not needed
    my $created = eval { Mgd::get_store()->create_domain($domain); };
    if ($@) {
        return $self->err_line('domain_exists') if error_code($@) eq "dup";
        return $self->err_line('failure', "$@");
    }

    return $self->cmd_clear_cache({ domain => $domain });
}
# Deletes an existing domain.  Store errors "has_files" and "has_classes"
# map to specific replies; any other failure replies "failure".  On success
# the monitor cache is refreshed.
sub cmd_delete_domain {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain}
        or return $self->err_line('no_domain');

    my $sto = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain)
        or return $self->err_line('domain_not_found');

    return $self->cmd_clear_cache({ domain => $domain })
        if eval { $sto->delete_domain($dmid) };

    my $code = error_code($@);
    return $self->err_line('domain_has_files')   if $code eq "has_files";
    return $self->err_line('domain_has_classes') if $code eq "has_classes";
    return $self->err_line("failure");
}
# Creates a class, or with "update" set, modifies an existing one.  Updating
# the "default" class when it doesn't exist yet creates it instead.  The
# request then sets:
#   - mindevcount (always);
#   - replpolicy (only when given);
#   - hashtype (only when given; 'NONE' clears it).
# On success the monitor cache is refreshed.
sub cmd_create_class {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;

    my $class = $args->{class};
    return $self->err_line('no_class') unless length $class;

    my $mindevcount = $args->{mindevcount} + 0;
    return $self->err_line('invalid_mindevcount') unless $mindevcount > 0;

    # validate the replication policy by trying to parse it
    my $replpolicy = $args->{replpolicy} || '';
    if ($replpolicy) {
        eval { MogileFS::ReplicationPolicy->new_from_policy_string($replpolicy); };
        return $self->err_line('invalid_replpolicy', $@) if $@;
    }

    # translate a hash name into its type; 'NONE' passes through untouched
    my $hashtype = $args->{hashtype};
    if ($hashtype && $hashtype ne 'NONE') {
        $hashtype = $MogileFS::Checksum::NAME2TYPE{$hashtype}
            or return $self->err_line('invalid_hashtype');
    }

    my $sto = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain)
        or return $self->err_line('domain_not_found');

    my $clsid = $sto->get_classid_by_name($dmid, $class);

    # updating a not-yet-stored "default" class means creating it
    $args->{update} = 0
        if !defined $clsid && $args->{update} && $class eq 'default';

    if (!$args->{update}) {
        $clsid = eval { $sto->create_class($dmid, $class); };
        if ($@) {
            return $self->err_line('class_exists') if error_code($@) eq "dup";
            return $self->err_line('failure', "$@");
        }
    }
    elsif (!defined $clsid) {
        return $self->err_line('class_not_found');
    }
    else {
        $sto->update_class_name(dmid => $dmid, classid => $clsid,
                                classname => $class);
    }

    $sto->update_class_mindevcount(dmid => $dmid, classid => $clsid,
                                   mindevcount => $mindevcount);

    # don't erase an existing replpolicy if we're not setting a new one.
    if ($replpolicy) {
        $sto->update_class_replpolicy(dmid => $dmid, classid => $clsid,
                                      replpolicy => $replpolicy);
    }

    if ($hashtype) {
        my $stored_type = $hashtype eq 'NONE' ? undef : $hashtype;
        $sto->update_class_hashtype(dmid => $dmid, classid => $clsid,
                                    hashtype => $stored_type);
    }

    return $self->cmd_clear_cache({ class => $class, mindevcount => $mindevcount, domain => $domain });
}
# Alias for cmd_create_class with "update" forced on.  The caller's hash is
# copied, not modified.
sub cmd_update_class {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my %update_args = (%$args, update => 1);
    return $self->cmd_create_class(\%update_args);
}
# Deletes a class from a domain.  The "default" class can never be
# deleted.  Replies "class_has_files" when the store refuses because files
# still use the class, and "failure" on any other error.  On success the
# monitor cache is refreshed.
sub cmd_delete_class {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless defined $domain && length $domain;

    # Validate the class itself.  This previously re-tested $domain, which let
    # a missing class through to the comparisons and store lookups below.
    my $class = $args->{class};
    return $self->err_line('no_class') unless defined $class && length $class;

    return $self->err_line('nodel_default_class') if $class eq 'default';

    my $sto = Mgd::get_store();
    my $dmid = $sto->get_domainid_by_name($domain) or
        return $self->err_line('domain_not_found');
    my $clsid = $sto->get_classid_by_name($dmid, $class);
    return $self->err_line('class_not_found') unless defined $clsid;

    if (eval { Mgd::get_store()->delete_class($dmid, $clsid) }) {
        return $self->cmd_clear_cache({ domain => $domain, class => $class });
    }

    my $errc = error_code($@);
    return $self->err_line('class_has_files') if $errc eq "has_files";
    return $self->err_line('failure');
}
# Creates a host, or with "update" set, modifies an existing one.
#   - Creating requires ip and port, and defaults status to "down".
#   - Client argument names ip/port/getport are stored as the
#     hostip/http_port/http_get_port columns.
# On success the monitor cache is refreshed.
sub cmd_create_host {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $hostname = $args->{host}
        or return $self->err_line('no_host');

    my $sto = Mgd::get_store();
    my $hostid = $sto->get_hostid_by_name($hostname);

    if (!$args->{update}) {
        # creating: must be new and needs an address; status defaults to down
        return $self->err_line('host_exists') if $hostid;
        return $self->err_line('no_ip')       unless $args->{ip};
        return $self->err_line('no_port')     unless $args->{port};
        $args->{status} ||= 'down';
    }
    elsif (!$hostid) {
        return $self->err_line('host_not_found');
    }

    if ($args->{status} && !MogileFS::Host->valid_state($args->{status})) {
        return $self->err_line('unknown_state');
    }

    # arguments all good, let's do it.
    $hostid ||= $sto->create_host($hostname, $args->{ip});

    # Protocol mismatch data fixup: client names -> host table columns.
    my %column_for = (ip => 'hostip', port => 'http_port', getport => 'http_get_port');
    foreach my $client_name (qw(ip port getport)) {
        next unless exists $args->{$client_name};
        $args->{ $column_for{$client_name} } = delete $args->{$client_name};
    }

    my %changes = map  { $_ => $args->{$_} }
                  grep { exists $args->{$_} }
                  qw(status hostip http_port http_get_port altip altmask);
    $sto->update_host($hostid, \%changes);

    return $self->cmd_clear_cache({ hostid => $hostid, hostname => $hostname });
}
# Alias for cmd_create_host with "update" forced on.  The caller's hash is
# copied, not modified.
sub cmd_update_host {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my %update_args = (%$args, update => 1);
    return $self->cmd_create_host(\%update_args);
}
# Deletes a host by name.  Refuses with "host_not_empty" while any device
# still references the host.  On success the monitor cache is refreshed.
sub cmd_delete_host {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $sto = Mgd::get_store();
    my $hostid = $sto->get_hostid_by_name($args->{host})
        or return $self->err_line('unknown_host');

    # TODO: $sto->delete_host should have a "has_devices" test internally
    foreach my $device ($sto->get_all_devices) {
        next unless $device->{hostid} == $hostid;
        return $self->err_line('host_not_empty');
    }

    $sto->delete_host($hostid);

    return $self->cmd_clear_cache;
}
# Lists every domain and its classes.  Both counters start at 1:
#   domain<D>                      - domain name
#   domain<D>classes               - number of classes in that domain
#   domain<D>class<C>{name,mindevcount,replpolicy,hashtype}
#   domains                        - total number of domains
sub cmd_get_domains {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my %reply;
    my @domains = Mgd::domain_factory()->get_all;

    for my $d (1 .. @domains) {
        my $dom    = $domains[$d - 1];
        my $prefix = "domain$d";
        $reply{$prefix} = $dom->name;

        my @classes = $dom->classes;
        for my $c (1 .. @classes) {
            my $cl = $classes[$c - 1];
            $reply{"${prefix}class${c}name"}        = $cl->name;
            $reply{"${prefix}class${c}mindevcount"} = $cl->mindevcount;
            $reply{"${prefix}class${c}replpolicy"}  = $cl->repl_policy_string;
            $reply{"${prefix}class${c}hashtype"}    = $cl->hashtype_string;
        }
        $reply{"${prefix}classes"} = scalar @classes;
    }
    $reply{domains} = scalar @domains;

    return $self->ok_line(\%reply);
}
# Return up to 'pathcount' (default and minimum 2) read URLs for a key.
#
# Args: domain (validated by check_domain), key, optional pathcount,
# optional noverify.  With noverify, memcache may be consulted and the
# size check on the first path is skipped.  Paths on draining devices are
# listed after all others; if no readable path is found, one path on a
# currently-unreadable device is returned as a last resort.
sub cmd_get_paths {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # memcache mappings are as follows:
    #  mogfid:<dmid>:<dkey> -> fidid
    #  mogdevids:<fidid>    -> \@devids  (and TODO: invalidate when deletion is run!)

    # if you specify 'noverify', that means a correct answer isn't needed and memcache can
    # be used.
    my $memc          = MogileFS::Config->memcache_client;
    my $get_from_memc = $memc && $args->{noverify};
    my $memcache_ttl  = MogileFS::Config->server_setting_cached("memcache_ttl") || 3600;

    # validate domain for plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # now invoke the plugin, abort if it tells us to
    my $rv = MogileFS::run_global_hook('cmd_get_paths', $args);
    return $self->err_line('plugin_aborted')
        if defined $rv && ! $rv;

    # validate parameters
    my $dmid = $args->{dmid};
    my $key  = $args->{key};

    valid_key($key) or return $self->err_line("no_key");

    # We default to returning two possible paths,
    # but the client may ask for more if they want.
    my $pathcount = $args->{pathcount} || 2;
    $pathcount = 2 if $pathcount < 2;

    # resolve dmid+key to a FID: memcache when allowed, else the DB (slaves ok)
    my $fid;
    my $need_fid_in_memcache = 0;
    my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
    if ($get_from_memc) {
        if (my $fidid = $memc->get($mogfid_memkey)) {
            $fid = MogileFS::FID->new($fidid);
        } else {
            $need_fid_in_memcache = 1;
        }
    }
    unless ($fid) {
        Mgd::get_store()->slaves_ok(sub {
            $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
        });
        $fid or return $self->err_line("unknown_key");
    }

    # add to memcache, if needed; entries expire after memcache_ttl seconds.
    $memc->set($mogfid_memkey, $fid->id, $memcache_ttl)
        if $need_fid_in_memcache || ($memc && !$get_from_memc);

    my $dmap = Mgd::device_factory()->map_by_id;

    my $ret = {
        paths => 0,
    };

    # find devids that FID is on in memcache or db.
    my @fid_devids;
    my $need_devids_in_memcache = 0;
    my $devid_memkey = "mogdevids:" . $fid->id;
    if ($get_from_memc) {
        if (my $list = $memc->get($devid_memkey)) {
            @fid_devids = @$list;
        } else {
            $need_devids_in_memcache = 1;
        }
    }
    unless (@fid_devids) {
        Mgd::get_store()->slaves_ok(sub {
            @fid_devids = $fid->devids;
        });
        $memc->set($devid_memkey, \@fid_devids, $memcache_ttl)
            if $need_devids_in_memcache || ($memc && !$get_from_memc);
    }

    # Drop devids the device map does not know about: both the ordering
    # hook and sort_devs_by_utilization call methods on every entry and
    # would die on an undef device.
    my @devices = grep { defined } map { $dmap->{$_} } @fid_devids;

    my @sorted_devs;
    unless (MogileFS::run_global_hook('cmd_get_paths_order_devices', \@devices, \@sorted_devs)) {
        @sorted_devs = sort_devs_by_utilization(@devices);
    }

    # keep one partially-bogus path around just in case we have nothing else to send.
    my $backup_path;

    # files on devices set for drain may disappear soon.
    my @drain_paths;

    # construct result paths
    foreach my $dev (@sorted_devs) {
        next unless $dev && $dev->host;

        my $dfid = MogileFS::DevFID->new($dev, $fid);
        my $path = $dfid->get_url;
        my $currently_up = $dev->should_read_from;

        if (! $currently_up) {
            $backup_path = $path;
            next;
        }

        # only verify size on the first one, and never verify if they've asked not to
        next unless
            $ret->{paths}     ||
            $args->{noverify} ||
            $dfid->size_matches;

        if ($dev->dstate->should_drain) {
            push @drain_paths, $path;
            next;
        }

        my $n = ++$ret->{paths};
        $ret->{"path$n"} = $path;
        last if $n == $pathcount;   # one verified, one likely seems enough for now. time will tell.
    }

    # deprioritize devices set for drain, they could disappear soon...
    # Clients /should/ try to use lower-numbered paths first to avoid this.
    if ($ret->{paths} < $pathcount && @drain_paths) {
        foreach my $path (@drain_paths) {
            my $n = ++$ret->{paths};
            $ret->{"path$n"} = $path;
            last if $n == $pathcount;
        }
    }

    # use our backup path if all else fails
    if ($backup_path && ! $ret->{paths}) {
        $ret->{paths} = 1;
        $ret->{path1} = $backup_path;
    }

    return $self->ok_line($ret);
}
# Order devices for reading with a weighted-random shuffle
# (MogileFS::Util::weighted_list) that favours less busy devices.
#
# Each device's weight is 102 minus its observed utilization when that is a
# plain integer, otherwise its configured weight; a resulting weight of
# 0/undef falls back to 100.  Undefined entries (e.g. devids missing from
# the device map) are skipped instead of dying on a method call.
#
# Returns the reordered list of device objects.
sub sort_devs_by_utilization {
    my @devices_with_weights;

    foreach my $dev (@_) {
        next unless defined $dev;

        my $util = $dev->observed_utilization;
        my $weight;
        if (defined($util) and $util =~ /\A\d+\Z/) {
            $weight = 102 - $util;
        } else {
            $weight = $dev->weight;
        }
        $weight ||= 100;

        push @devices_with_weights, [$dev, $weight];
    }

    # randomly weight the devices
    my @list = MogileFS::Util::weighted_list(@devices_with_weights);

    return @list;
}
# ------------------------------------------------------------
#
# NOTE: cmd_edit_file is EXPERIMENTAL. Please see the documentation
# for edit_file in L<MogileFS::Client>.
# It is not recommended to use cmd_edit_file on production systems.
#
# cmd_edit_file is similar to cmd_get_paths, except we:
# - take the device of the first path we would have returned
# - get a tempfile with a new fid (pointing to nothing) on the same device
#     the tempfile has the same key, so will replace the old contents on
#     create_close
# - detach the old fid from that device (leaving the file in place)
# - attach the new fid to that device
# - returns only the first path to the old fid and a path to new fid
#     (the client then DAV-renames the old path to the new path)
#
# Memcache is handled as in cmd_get_paths: it is only *read* when the
# client passes 'noverify' (this command rewrites file_on rows, so it
# should not act on a stale key -> fid mapping), and it is skipped
# entirely when no memcache client is configured.  Entries written use
# the memcache_ttl server setting (default 3600s).
#
# TODO - what to do about situations where we would be reducing the
#        replica count to zero?
# TODO - what to do about pending replications where we remove the source?
# TODO - the remove/add of file_on rows below is not atomic; a failure
#        between them leaves the old fid detached from $devid.
# TODO - a cached mogdevids:<oldfid> entry is not invalidated here.
# ------------------------------------------------------------
sub cmd_edit_file {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $memc          = MogileFS::Config->memcache_client;
    my $get_from_memc = $memc && $args->{noverify};
    my $memcache_ttl  = MogileFS::Config->server_setting_cached("memcache_ttl") || 3600;

    # validate domain for plugins
    $args->{dmid} = $self->check_domain($args) or return;

    # now invoke the plugin, abort if it tells us to
    my $rv = MogileFS::run_global_hook('cmd_get_paths', $args);
    return $self->err_line('plugin_aborted')
        if defined $rv && ! $rv;

    # validate parameters
    my $dmid = $args->{dmid};
    my $key  = $args->{key};

    valid_key($key) or return $self->err_line("no_key");

    # resolve dmid+key to the current FID
    my $fid;
    my $need_fid_in_memcache = 0;
    my $mogfid_memkey = "mogfid:$args->{dmid}:$key";
    if ($get_from_memc) {
        if (my $fidid = $memc->get($mogfid_memkey)) {
            $fid = MogileFS::FID->new($fidid);
        } else {
            $need_fid_in_memcache = 1;
        }
    }
    unless ($fid) {
        Mgd::get_store()->slaves_ok(sub {
            $fid = MogileFS::FID->new_from_dmid_and_key($dmid, $key);
        });
        $fid or return $self->err_line("unknown_key");
    }

    $memc->set($mogfid_memkey, $fid->id, $memcache_ttl)
        if $need_fid_in_memcache || ($memc && !$get_from_memc);

    my $dmap = Mgd::device_factory()->map_by_id;

    # find devids that FID is on in memcache or db.
    my @fid_devids;
    my $need_devids_in_memcache = 0;
    my $devid_memkey = "mogdevids:" . $fid->id;
    if ($get_from_memc) {
        if (my $list = $memc->get($devid_memkey)) {
            @fid_devids = @$list;
        } else {
            $need_devids_in_memcache = 1;
        }
    }
    unless (@fid_devids) {
        Mgd::get_store()->slaves_ok(sub {
            @fid_devids = $fid->devids;
        });
        $memc->set($devid_memkey, \@fid_devids, $memcache_ttl)
            if $need_devids_in_memcache || ($memc && !$get_from_memc);
    }

    # Weighted-random order (same weighting as cmd_get_paths), ignoring
    # devids missing from the device map, then keep only readable devices.
    # TODO - should we reverse the order, to leave the best
    #        one there for get_paths?
    my @readable = grep { $_->should_read_from }
        sort_devs_by_utilization(grep { defined } map { $dmap->{$_} } @fid_devids);

    # Take first remaining device from list; without one there is nothing
    # to re-point, so do not touch the store at all.
    return $self->err_line("no_devices", "No readable device holds this file")
        unless @readable;
    my $devid = $readable[0]->id;

    my $classid = $fid->classid;
    my $newfid = eval {
        Mgd::get_store()->register_tempfile(
            fid     => undef,   # undef => let the store pick a fid
            dmid    => $dmid,
            key     => $key,    # This tempfile will ultimately become this key
            classid => $classid,
            devids  => $devid,
        );
    };
    unless ($newfid) {
        my $errc = error_code($@);
        return $self->err_line("fid_in_use") if $errc eq "dup";
        warn "Error registering tempfile: $@\n";
        return $self->err_line("db");
    }
    unless (Mgd::get_store()->remove_fidid_from_devid($fid->id, $devid)) {
        warn "Error removing fidid from devid";
        return $self->err_line("db");
    }
    unless (Mgd::get_store()->add_fidid_to_devid($newfid, $devid)) {
        warn "Error adding fidid to devid";
        return $self->err_line("db");
    }

    # URLs of the old file and of the (not yet written) new fid, both on $devid
    my ($oldpath, $newpath) = map {
        my $dfid = MogileFS::DevFID->new($devid, $_);
        $dfid->get_url;
    } ($fid, $newfid);

    return $self->ok_line({
        oldpath => $oldpath,
        newpath => $newpath,
        fid     => $newfid,
        devid   => $devid,
        class   => $classid,
    });
}
# Set a device's weight.
#
# Args: host (must own the device), device (positive integer id) and
# weight (non-negative integer).  A missing or non-numeric device/weight
# is rejected with bad_params.  Previously `undef + 0` turned a missing
# weight into 0 and silently zeroed the device's weight.
sub cmd_set_weight {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    # figure out what they want to do
    my ($hostname, $devid, $weight) = @{$args}{qw(host device weight)};
    return $self->err_line('bad_params')
        unless $hostname
            && defined $devid  && $devid  =~ /\A[0-9]+\z/ && $devid > 0
            && defined $weight && $weight =~ /\A[0-9]+\z/;
    $devid  += 0;
    $weight += 0;

    my $dev = Mgd::device_factory()->get_by_id($devid);
    return $self->err_line('no_device') unless $dev;
    return $self->err_line('host_mismatch')
        unless $dev->host->hostname eq $hostname;

    Mgd::get_store()->set_device_weight($dev->id, $weight);

    return $self->cmd_clear_cache;
}
# Change a device's state.  The state must be recognised by device_state(),
# the device must exist and belong to the named host, and the device must
# be allowed to move to the requested state (state_too_high otherwise).
sub cmd_set_state {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $hostname = $args->{host};
    my $devid    = $args->{device} + 0;
    my $state    = $args->{state};
    my $dstate   = device_state($state);

    return $self->err_line('bad_params')
        unless $hostname && $devid && $dstate;

    my $dev = Mgd::device_factory()->get_by_id($devid)
        or return $self->err_line('no_device');
    return $self->err_line('host_mismatch')
        if $dev->host->hostname ne $hostname;

    # make sure the destination state isn't too high
    return $self->err_line('state_too_high')
        if !$dev->can_change_to_state($state);

    Mgd::get_store()->set_device_state($dev->id, $state);
    return $self->cmd_clear_cache;
}
# No-op command: ignores its arguments and always answers a bare OK.
sub cmd_noop {
    my MogileFS::Worker::Query $self = $_[0];
    return $self->ok_line;
}
# Trigger the store's replicate_now and report its result, truncated to an
# integer, as 'count'.
sub cmd_replicate_now {
    my MogileFS::Worker::Query $self = shift;

    my $count = int(Mgd::get_store()->replicate_now);
    return $self->ok_line({ count => $count });
}
# Write a server setting.  The key must be one MogileFS::Config reports as
# writable; the value is run through that setting's checker, and any die
# from the checker is returned as invalid_format with the error text.
sub cmd_set_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $key = $args->{key};
    return $self->err_line("bad_params") unless $key;
    my $val = $args->{value};

    my $checker = MogileFS::Config->server_setting_is_writable($key);
    return $self->err_line("not_writable") unless $checker;

    my $cleanval = eval { $checker->($val) };
    return $self->err_line("invalid_format", $@) if $@;

    MogileFS::Config->set_server_setting($key, $cleanval);

    # GROSS HACK: slave settings are managed directly by MogileFS::Client, but
    # a version key is needed, so slave_version is bumped whenever a slave_*
    # key is written here.
    # FIXME: Move this when slave keys are managed by query worker commands!
    Mgd::get_store()->incr_server_setting('slave_version', 1)
        if $key =~ /^slave_/;

    return $self->ok_line;
}
# Read one server setting; the reply echoes the key alongside its value.
sub cmd_server_setting {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $key = $args->{key}
        or return $self->err_line("bad_params");

    # read in scalar context so the reply hash cannot be misaligned
    my $value = MogileFS::Config->server_setting($key);
    return $self->ok_line({ key => $key, value => $value });
}
# Dump every readable server setting as numbered key_<n>/value_<n> pairs
# plus key_count (key_count is absent when no setting is readable).
sub cmd_server_settings {
    my MogileFS::Worker::Query $self = shift;

    my $all = Mgd::get_store()->server_settings;
    my %ret;
    my $i = 0;
    foreach my $name (keys %$all) {
        next unless MogileFS::Config->server_setting_is_readable($name);
        $i++;
        $ret{"key_$i"}   = $name;
        $ret{"value_$i"} = $all->{$name};
        $ret{key_count}  = $i;
    }
    return $self->ok_line(\%ret);
}
# Force a fresh monitor round: clear the "monitor has run" marker, then call
# wait_for_monitor (presumably blocks until the next round completes) before
# answering OK.
sub cmd_do_monitor_round {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    for my $step (qw(forget_that_monitor_has_run wait_for_monitor)) {
        $self->$step;
    }

    return $self->ok_line;
}
# Start fsck on this tracker.  Refuses while fsck or a rebalance is running.
# When the previous pass finished (highest checked fid reached the recorded
# end fid), or no pass has ever run, fsck state is reset first; otherwise
# fsck resumes from fsck_highest_fid_checked.  Records start statistics,
# claims fsck_host and wakes an fsck worker.
sub cmd_fsck_start {
    my MogileFS::Worker::Query $self = shift;
    my $sto = Mgd::get_store();

    my $fsck_host  = MogileFS::Config->server_setting("fsck_host");
    my $rebal_host = MogileFS::Config->server_setting("rebal_host");

    return $self->err_line("fsck_running", "fsck is already running") if $fsck_host;
    return $self->err_line("rebal_running", "rebalance running; cannot run fsck at same time") if $rebal_host;

    # reset position, if a previous fsck was already completed.
    my $setting_or_zero = sub { MogileFS::Config->server_setting($_[0]) || 0 };
    my $checked_fid = $setting_or_zero->("fsck_highest_fid_checked");
    my $final_fid   = $setting_or_zero->("fsck_fid_at_end");
    my $finished    = $checked_fid && $final_fid && $checked_fid >= $final_fid;
    my $never_run   = !$final_fid && !$checked_fid;
    if ($finished || $never_run) {
        $self->_do_fsck_reset or return $self->err_line("db");
    }

    # set params for stats:
    $sto->set_server_setting("fsck_start_time", $sto->get_db_unixtime);
    $sto->set_server_setting("fsck_stop_time", undef);
    $sto->set_server_setting("fsck_fids_checked", 0);
    my $start_fid = $setting_or_zero->('fsck_highest_fid_checked');
    $sto->set_server_setting("fsck_start_fid", $start_fid);

    # and start it:
    $sto->set_server_setting("fsck_host", MogileFS::Config->hostname);
    MogileFS::ProcManager->wake_a("fsck");

    return $self->ok_line;
}
# Stop fsck: clear the fsck_host marker (which cmd_fsck_status reports as
# "running") and record the DB's current time as the stop time.
sub cmd_fsck_stop {
    my MogileFS::Worker::Query $self = $_[0];
    my $store = Mgd::get_store();

    $store->set_server_setting("fsck_host", undef);
    $store->set_server_setting("fsck_stop_time", $store->get_db_unixtime);

    return $self->ok_line;
}
# Reset fsck state.  Options: policy_only (true stores "1" in
# fsck_opt_policy_only, false clears it) and startpos (fid to resume
# from, default "0").  Then runs _do_fsck_reset; a failure there is
# reported as a db error.
sub cmd_fsck_reset {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my $sto = Mgd::get_store();

    my $policy_only = $args->{policy_only} ? "1" : undef;
    my $start_pos   = $args->{startpos} || "0";

    $sto->set_server_setting("fsck_opt_policy_only", $policy_only);
    $sto->set_server_setting("fsck_highest_fid_checked", $start_pos);

    return $self->err_line("db") unless $self->_do_fsck_reset;
    return $self->ok_line;
}
# Clear fsck timing, progress and fsck_sum_evcount_* summaries, and record
# the current max fid / max fsck log id as the new end and start markers.
# Returns 1 on success; on any exception it logs via error() and returns 0.
sub _do_fsck_reset {
    my MogileFS::Worker::Query $self = shift;

    eval {
        my $sto = Mgd::get_store();

        $sto->set_server_setting($_, undef) for qw(fsck_start_time fsck_stop_time);
        $sto->set_server_setting("fsck_fids_checked", 0);
        $sto->set_server_setting("fsck_fid_at_end", $sto->max_fidid);

        # clear existing event counts summaries.
        my $settings = $sto->server_settings;
        $sto->set_server_setting($_, undef)
            for grep { /^fsck_sum_evcount_/ } keys %$settings;

        my $logid = $sto->max_fsck_logid;
        $sto->set_server_setting($_, $logid)
            for qw(fsck_start_maxlogid fsck_logid_processed);
    };
    if ($@) {
        error("DB error in _do_fsck_reset: $@");
        return 0;
    }
    return 1;
}
# Clear the fsck log through the store's clear_fsck_log, then answer OK.
sub cmd_fsck_clearlog {
    my MogileFS::Worker::Query $self = shift;
    Mgd::get_store()->clear_fsck_log;
    return $self->ok_line;
}
# Fetch fsck log rows after 'after_logid' (100 is passed to fsck_log_rows
# as the row limit) and flatten them as row_<n>_<column>=value, 1-based,
# omitting undefined columns, plus row_count.
sub cmd_fsck_getlog {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    my @rows = Mgd::get_store()->fsck_log_rows($args->{after_logid}, 100);

    my %ret = (row_count => scalar @rows);
    for my $i (0 .. $#rows) {
        my $row = $rows[$i];
        my $n   = $i + 1;
        for my $col (grep { defined $row->{$_} } keys %$row) {
            $ret{"row_${n}_$col"} = $row->{$col};
        }
    }
    return $self->ok_line(\%ret);
}
# Report fsck progress: running flag and host, fid positions, the
# policy_only option, timestamps, the max fsck log id, and num_<event>
# totals summed from fsck_sum_evcount_<event> settings (after asking the
# store to summarize the log).
sub cmd_fsck_status {
    my MogileFS::Worker::Query $self = shift;

    my $sto = Mgd::get_store();
    # Kick up the summary before we read the values
    $sto->fsck_log_summarize;

    my $fsck_host   = MogileFS::Config->server_setting('fsck_host');
    my $num_or_zero = sub { MogileFS::Config->server_setting($_[0]) || 0 };

    my %status = (
        running         => ($fsck_host ? 1 : 0),
        host            => $fsck_host,
        max_fid_checked => $num_or_zero->('fsck_highest_fid_checked'),
        policy_only     => $num_or_zero->('fsck_opt_policy_only'),
        end_fid         => $num_or_zero->('fsck_fid_at_end'),
        start_time      => $num_or_zero->('fsck_start_time'),
        stop_time       => $num_or_zero->('fsck_stop_time'),
        current_time    => $sto->get_db_unixtime,
        max_logid       => $sto->max_fsck_logid,
    );

    # throw some stats in.
    my $settings = $sto->server_settings;
    for my $name (keys %$settings) {
        next unless $name =~ /^fsck_sum_evcount_(.+)/;
        $status{"num_$1"} += $settings->{$name};
    }

    return $self->ok_line(\%status);
}
# Report the saved rebalance state string, or no_rebal_state if none exists.
# (The unused store handle the original fetched has been removed.)
sub cmd_rebalance_status {
    my MogileFS::Worker::Query $self = shift;

    my $rebal_state = MogileFS::Config->server_setting('rebal_state');
    return $self->err_line('no_rebal_state') unless $rebal_state;
    return $self->ok_line({ state => $rebal_state });
}
# Start (or resume) a rebalance.  Refuses while a rebalance or fsck is
# running.  When no rebal_state is saved yet, a new one is built from the
# rebal_policy setting and all known devices and saved; then this tracker's
# hostname is stored in rebal_host and the state is returned.
sub cmd_rebalance_start {
    my MogileFS::Worker::Query $self = shift;

    my $rebal_host = MogileFS::Config->server_setting("rebal_host");
    my $fsck_host  = MogileFS::Config->server_setting("fsck_host");

    return $self->err_line("rebal_running", "rebalance is already running") if $rebal_host;
    return $self->err_line("fsck_running", "fsck running; cannot run rebalance at same time") if $fsck_host;

    my $rebal_state = MogileFS::Config->server_setting('rebal_state');
    if (!$rebal_state) {
        my $rebal_pol = MogileFS::Config->server_setting('rebal_policy')
            or return $self->err_line('no_rebal_policy');

        my $rebal = MogileFS::Rebalance->new;
        $rebal->policy($rebal_pol);
        my @devs = Mgd::device_factory()->get_all;
        $rebal->init(\@devs);
        # NOTE(review): $sdevs is unused; the call is kept (in scalar context)
        # in case source_devices affects the state saved below.
        my $sdevs = $rebal->source_devices;

        $rebal_state = $rebal->save_state;
        MogileFS::Config->set_server_setting('rebal_state', $rebal_state);
    }

    # TODO: register start time somewhere.
    MogileFS::Config->set_server_setting('rebal_host', MogileFS::Config->hostname);
    return $self->ok_line({ state => $rebal_state });
}
# Dry-run the configured rebalance policy against all known devices and
# return the would-be source (sdevs) and destination (ddevs) lists, each
# joined with commas.  Neither save_state nor any server-setting write is
# performed here.  (An unused read of rebal_state has been dropped.)
sub cmd_rebalance_test {
    my MogileFS::Worker::Query $self = shift;

    my $rebal_pol = MogileFS::Config->server_setting('rebal_policy');
    return $self->err_line('no_rebal_policy') unless $rebal_pol;

    my $rebal = MogileFS::Rebalance->new;
    my @devs = Mgd::device_factory()->get_all;
    $rebal->policy($rebal_pol);
    $rebal->init(\@devs);

    # client should display list of source, destination devices.
    # FIXME: can probably avoid calling this twice by pulling state?
    # *or* not running init.
    my $sdevs = $rebal->filter_source_devices(\@devs);
    my $ddevs = $rebal->filter_dest_devices(\@devs);

    return $self->ok_line({
        sdevs => join(',', @$sdevs),
        ddevs => join(',', @$ddevs),
    });
}
# Forget the saved rebalance state.  Refused (rebal_running) while a
# rebalance is recorded as running in rebal_host.
sub cmd_rebalance_reset {
    my MogileFS::Worker::Query $self = shift;

    my $host = MogileFS::Config->server_setting('rebal_host');
    return $self->err_line("rebal_running", "rebalance is running") if $host;

    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
# Ask a running rebalance to stop by setting rebal_signal to 'stop'.
# Fails with rebal_not_started when no rebal_host is recorded.
sub cmd_rebalance_stop {
    my MogileFS::Worker::Query $self = shift;

    return $self->err_line('rebal_not_started')
        unless MogileFS::Config->server_setting('rebal_host');

    MogileFS::Config->set_server_setting('rebal_signal', 'stop');
    return $self->ok_line;
}
# Validate and store a new rebalance policy.  Refused while a rebalance is
# running.  The policy is first loaded into a throwaway MogileFS::Rebalance
# object; any die from that is returned as bad_rebal_pol.  Storing a policy
# also clears any saved rebal_state.
sub cmd_rebalance_set_policy {
    my MogileFS::Worker::Query $self = shift;
    my $args = shift;

    if (MogileFS::Config->server_setting("rebal_host")) {
        return $self->err_line("no_set_rebal", "cannot change rebalance policy while rebalance is running");
    }

    # load policy object, test policy, set policy.
    my $rebal = MogileFS::Rebalance->new;
    eval { $rebal->policy($args->{policy}); };
    return $self->err_line("bad_rebal_pol", $@) if $@;

    MogileFS::Config->set_server_setting('rebal_policy', $args->{policy});
    MogileFS::Config->set_server_setting('rebal_state', undef);
    return $self->ok_line;
}
# Send a success reply to the parent process:
#   "[<reqid> ][<elapsed> ]OK k1=v1&k2=v2..."
# The optional hashref argument supplies the pairs (both keys and values
# are eurl-encoded); callid is added to it when set.  Elapsed time is
# included and querystarttime cleared when a start time was recorded.
# Always returns 1.
sub ok_line {
    my MogileFS::Worker::Query $self = shift;

    my $delay = '';
    if (my $started = $self->{querystarttime}) {
        $delay = sprintf("%.4f ", Time::HiRes::tv_interval($started));
        $self->{querystarttime} = undef;
    }

    my $id = defined $self->{reqid} ? "$self->{reqid} " : '';

    my $args = shift || {};
    $args->{callid} = $self->{callid} if defined $self->{callid};

    my @pairs;
    for my $k (keys %$args) {
        push @pairs, eurl($k) . "=" . eurl($args->{$k});
    }
    $self->send_to_parent($id . $delay . "OK " . join('&', @pairs));
    return 1;
}
# Default human-readable text for each error code; err_line falls back to
# the code itself when a code is not listed here.
my %default_err_text = (
    'dup' => "Duplicate name/number used.",
    'after_mismatch' => "Pattern does not match the after-value?",
    'bad_params' => "Invalid parameters to command; please see documentation",
    'class_exists' => "That class already exists in that domain",
    'class_has_files' => "Class still has files, unable to delete",
    'class_not_found' => "Class not found",
    'db' => "Database error",
    'domain_has_files' => "Domain still has files, unable to delete",
    'domain_exists' => "That domain already exists",
    'domain_not_empty' => "Domain still has classes, unable to delete",
    'domain_not_found' => "Domain not found",
    'failure' => "Operation failed",
    'host_exists' => "That host already exists",
    'host_mismatch' => "The device specified doesn't belong to the host specified",
    'host_not_empty' => "Unable to delete host; it contains devices still",
    'host_not_found' => "Host not found",
    'invalid_checker_level' => "Checker level invalid. Please see documentation on this command.",
    'invalid_mindevcount' => "The mindevcount must be at least 1",
    'key_exists' => "Target key name already exists; can't overwrite.",
    'no_class' => "No class provided",
    'no_devices' => "No devices found to store file",
    'no_device' => "Device not found",
    'no_domain' => "No domain provided",
    'no_host' => "No host provided",
    'no_ip' => "IP required to create host",
    'no_port' => "Port required to create host",
    'no_temp_file' => "No tempfile or file already closed",
    'none_match' => "No keys match that pattern and after-value (if any).",
    'plugin_aborted' => "Action aborted by plugin",
    'state_too_high' => "Status cannot go from dead to alive; must use down",
    'unknown_command' => "Unknown server command",
    'unknown_host' => "Host not found",
    'unknown_state' => "Invalid/unknown state",
    'unreg_domain' => "Domain name invalid/not found",
    'rebal_not_started' => "Rebalance not running",
    'no_rebal_state' => "No available rebalance status",
    'no_rebal_policy' => "No rebalance policy available",
    'nodel_default_class' => "Cannot delete the default class",
);

# Send an error reply to the parent process:
#   "[<reqid> ][<elapsed> ]ERR <code> <eurl(text)>[ <eurl(callid)>]"
# first argument: error code.
# second argument: optional error text. text will be taken from code if no text provided.
# If querystarttime is already cleared (a reply was already sent for this
# query), nothing is sent and the redundant call is logged via error().
# Always returns 0.
sub err_line {
    my MogileFS::Worker::Query $self = shift;

    my $err_code = shift;
    my $err_text = shift || $default_err_text{$err_code} || $err_code;

    unless ($self->{querystarttime}) {
        # don't send another ERR line if we already sent one
        error("err_line called redundantly with $err_code ( " . eurl($err_text) . ")");
        return 0;
    }
    my $delay = sprintf("%.4f ", Time::HiRes::tv_interval($self->{querystarttime}));
    $self->{querystarttime} = undef;

    my $id     = defined $self->{reqid}  ? "$self->{reqid} "            : '';
    my $callid = defined $self->{callid} ? ' ' . eurl($self->{callid}) : '';

    $self->send_to_parent("${id}${delay}ERR $err_code " . eurl($err_text) . $callid);
    return 0;
}
1833 # Local Variables:
1834 # mode: perl
1835 # c-basic-indent: 4
1836 # indent-tabs-mode: nil
1837 # End:
1839 __END__
1841 =head1 NAME
1843 MogileFS::Worker::Query -- implements the MogileFS client protocol
1845 =head1 SEE ALSO
1847 L<MogileFS::Worker>