ProcManager: only log times_out_of_qworkers for new queries
[MogileFS-Server.git] / lib / MogileFS / ProcManager.pm
blob 7b3036505d6f916266c85e32d7bd9a47f809b638
package MogileFS::ProcManager;
use strict;
use warnings;
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
use Symbol;
use Socket;
use MogileFS::Connection::Client;
use MogileFS::Connection::Worker;
use MogileFS::Util qw(apply_state_events);
# This class handles keeping lists of workers and clients and
# assigning them to each other when things happen.  You don't actually
# instantiate a procmanager; the class itself holds all state.

# Mappings: fd => [ clientref, jobstring, starttime ]
# queues are just lists of Client class objects
# ChildrenByJob: job => { pid => $client }
# ErrorsTo: fd => Client
# RecentQueries: [ string, string, string, ... ]
# Stats: element => number
our ($IsChild, @RecentQueries,
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
our $starttime = time(); # time we got going
sub server_starttime { return $starttime }

my @IdleQueryWorkers;  # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
my @PendingQueries;    # [ MogileFS::Connection::Client, "$ip $query" ]

my %idle_workers = (); # 'job' -> {href of idle workers}
my %pending_work = (); # 'job' -> [aref of pending work]
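# Note: besides real job names, %idle_workers also carries a special
# '_bored' pseudo-key, used by process_worker_queues() and the
# "worker_bored" handler below to track which workers have reported in
# as idle and may be handed queued work.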
$IsChild = 0;  # either false if we're the parent, or a MogileFS::Worker object

# keep track of what all child pids are doing, and what jobs are being
# satisfied.
my %child = ();    # pid -> MogileFS::Connection::Worker
my %todie = ();    # pid -> 1 (lists pids that we've asked to die)
my %jobs  = ();    # jobname -> [ min, current ]

our $allkidsup = 0;  # if true, all our kids are running. set to 0 when a kid dies.

my @prefork_cleanup;  # subrefs to run to clean stuff up before we make a new child

*error = \&Mgd::error;

my $monitor_good = 0; # ticked after monitor executes once after startup

my $nowish; # updated approximately once per second

sub push_pre_fork_cleanup {
    my ($class, $code) = @_;
    push @prefork_cleanup, $code;
}
sub RecentQueries {
    return @RecentQueries;
}

sub write_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return 1;
    my $fh;
    unless (open($fh, ">$pidfile")) {
        Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
        return 0;
    }
    unless ((print $fh "$$\n") && close($fh)) {
        Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
        remove_pidfile();
        return 0;
    }
    return 1;
}

sub remove_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return;
    unlink $pidfile;
    return 1;
}
sub set_min_workers {
    my ($class, $job, $min) = @_;
    $jobs{$job} ||= [undef, 0];   # [min, current]
    $jobs{$job}->[0] = $min;

    # TODO: set allkidsup false, so spawner re-checks?
}
sub job_to_class_suffix {
    my ($class, $job) = @_;
    return {
        fsck        => "Fsck",
        queryworker => "Query",
        delete      => "Delete",
        replicate   => "Replicate",
        reaper      => "Reaper",
        monitor     => "Monitor",
        job_master  => "JobMaster",
    }->{$job};
}

sub job_to_class {
    my ($class, $job) = @_;
    my $suffix = $class->job_to_class_suffix($job) or return "";
    return "MogileFS::Worker::$suffix";
}

sub child_pids {
    return keys %child;
}

sub WatchDog {
    foreach my $pid (keys %child) {
        my MogileFS::Connection::Worker $child = $child{$pid};
        my $healthy = $child->watchdog_check;
        next if $healthy;

        # special $todie level of 2 means the watchdog tried to kill it.
        # TODO: Should be a CONSTANT?
        next if $todie{$pid} && $todie{$pid} == 2;
        note_pending_death($child->job, $pid, 2);

        error("Watchdog killing worker $pid (" . $child->job . ")");
        kill 9, $pid;
    }
}
# returns a sub that Danga::Socket calls after each event loop round.
# the sub must return 1 for the program to continue running.
sub PostEventLoopChecker {
    my $lastspawntime = 0; # time we last ran spawn_children sub

    return sub {
        # run only once per second
        $nowish = time();
        return 1 unless $nowish > $lastspawntime;
        $lastspawntime = $nowish;

        MogileFS::ProcManager->WatchDog;

        # see if anybody has died, but don't hang up on doing so
        while (my $pid = waitpid -1, WNOHANG) {
            last unless $pid > 0;
            $allkidsup = 0; # know something died

            # when a child dies, figure out what it was doing
            # and note that job has one less worker
            my $jobconn;
            if (($jobconn = delete $child{$pid})) {
                my $job = $jobconn->job;
                my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
                error("Child $pid ($job) died: $? ($extra)");
                MogileFS::ProcManager->NoteDeadChild($pid);
                $jobconn->close;

                if (my $jobstat = $jobs{$job}) {
                    # if the pid is in %todie, then we have asked it to shut down
                    # and have already decremented the jobstat counter and don't
                    # want to do it again
                    unless (my $true = delete $todie{$pid}) {
                        # decrement the count of currently running jobs
                        $jobstat->[1]--;
                    }
                }
            }
        }

        return 1 if $allkidsup;

        # foreach job, fork enough children
        while (my ($job, $jobstat) = each %jobs) {
            my $need = $jobstat->[0] - $jobstat->[1];
            if ($need > 0) {
                error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
                for (1..$need) {
                    my $jobconn = make_new_child($job)
                        or return 1;  # basically bail: true value keeps event loop running
                    $child{$jobconn->pid} = $jobconn;

                    # now increase the count of processes currently doing this job
                    $jobstat->[1]++;
                }
            }
        }

        # if we got this far, all jobs have been re-created.  note that
        # so we avoid more CPU usage in this post-event-loop callback later
        $allkidsup = 1;

        # true value keeps us running:
        return 1;
    };
}
sub make_new_child {
    my $job = shift;

    my $pid;
    my $sigset;

    # Ensure our dbh is closed before we fork anything.
    # Causes problems on some platforms (Solaris+Postgres)
    Mgd::close_store();

    # block signal for fork
    $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT for fork: $!");

    socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die("Sockpair failed");

    return error("fork failed creating $job: $!")
        unless defined ($pid = fork);

    # enable auto-flush, so it's not pipe-buffered between parent/child
    select((select($parents_ipc), $|++)[0]);
    select((select($childs_ipc), $|++)[0]);

    # if i'm the parent
    if ($pid) {
        sigprocmask(SIG_UNBLOCK, $sigset)
            or return error("Can't unblock SIGINT for fork: $!");

        close($childs_ipc);  # unnecessary but explicit
        IO::Handle::blocking($parents_ipc, 0);

        my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
        $worker_conn->pid($pid);
        $worker_conn->job($job);
        MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
        return $worker_conn;
    }

    # let children have different random number seeds
    srand();

    # as a child, we want to close these and ignore them
    $_->() foreach @prefork_cleanup;
    close($parents_ipc);
    undef $parents_ipc;

    $SIG{INT}  = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals
    sigprocmask(SIG_UNBLOCK, $sigset)
        or return error("Can't unblock SIGINT for fork: $!");

    # now call our job function
    my $class = MogileFS::ProcManager->job_to_class($job)
        or die "No worker class defined for job '$job'\n";
    my $worker = $class->new($childs_ipc);

    # set our frontend into child mode
    MogileFS::ProcManager->SetAsChild($worker);

    $worker->work;
    exit 0;
}
sub PendingQueryCount {
    return scalar @PendingQueries;
}

sub BoredQueryWorkerCount {
    return scalar @IdleQueryWorkers;
}

sub QueriesInProgressCount {
    return scalar keys %Mappings;
}

# Toss in any queue depths.
sub StatsHash {
    for my $job (keys %pending_work) {
        $Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
    }
    return \%Stats;
}
sub foreach_job {
    my ($class, $cb) = @_;
    foreach my $job (sort keys %ChildrenByJob) {
        my $ct = scalar(keys %{$ChildrenByJob{$job}});
        $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
    }
}

sub foreach_pending_query {
    my ($class, $cb) = @_;
    foreach my $clq (@PendingQueries) {
        $cb->($clq->[0], # client object,
              $clq->[1], # "$ip $query"
              );
    }
}

sub is_monitor_good {
    return $monitor_good;
}

sub is_valid_job {
    my ($class, $job) = @_;
    return defined $jobs{$job};
}

sub valid_jobs {
    return sort keys %jobs;
}
sub request_job_process {
    my ($class, $job, $n) = @_;
    return 0 unless $class->is_valid_job($job);
    return 0 if ($job =~ /^(?:job_master|monitor)$/i && $n > 1); # ghetto special case

    $jobs{$job}->[0] = $n;
    $allkidsup = 0;

    # try to clean out the queryworkers (if that's what we're doing?)
    MogileFS::ProcManager->CullQueryWorkers
        if $job eq 'queryworker';

    # other workers listening off of a queue should be pinging parent
    # frequently. shouldn't explicitly kill them.
}
# when a child is spawned, they'll have copies of all the data from the
# parent, but they don't need it.  this method is called when you want
# to indicate that this procmanager is running on a child and should clean.
sub SetAsChild {
    my ($class, $worker) = @_;

    @IdleQueryWorkers = ();
    @PendingQueries = ();
    %Mappings = ();
    $IsChild = $worker;
    %ErrorsTo = ();
    %idle_workers = ();
    %pending_work = ();

    # and now kill off our event loop so that we don't waste time
    Danga::Socket->SetPostLoopCallback(sub { return 0; });
}
# called when a child has died.  a child is someone doing a job for us,
# but it might be a queryworker or any other type of job.  we just want
# to remove them from our list of children.  they're actually respawned
# by the make_new_child function elsewhere in Mgd.
sub NoteDeadChild {
    my $pid = $_[1];
    foreach my $job (keys %ChildrenByJob) {
        return if # bail out if we actually delete one
            delete $ChildrenByJob{$job}->{$pid};
    }
}

# called when a client dies.  clients are users, management or non.
# we just want to remove them from the error reporting interface, if
# they happen to be part of it.
sub NoteDeadClient {
    my $client = $_[1];
    delete $ErrorsTo{$client->{fd}};
}

# called when the error function in Mgd is called and we're in the parent,
# so it's pretty simple that basically we just spit it out to folks listening
# to errors
sub NoteError {
    return unless %ErrorsTo;

    my $msg = ":: ${$_[1]}\r\n";
    foreach my $client (values %ErrorsTo) {
        $client->write(\$msg);
    }
}

sub RemoveErrorWatcher {
    my ($class, $client) = @_;
    return delete $ErrorsTo{$client->{fd}};
}

sub AddErrorWatcher {
    my ($class, $client) = @_;
    $ErrorsTo{$client->{fd}} = $client;
}
# one-time initialization of a new worker connection
sub RegisterWorkerConn {
    my MogileFS::Connection::Worker $worker = $_[1];
    $worker->watch_read(1);

    #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);

    # now do any special case startup
    if ($worker->job eq 'queryworker') {
        MogileFS::ProcManager->NoteIdleQueryWorker($worker);
    }

    # add to normal list
    $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
}

sub EnqueueCommandRequest {
    my ($class, $line, $client) = @_;
    push @PendingQueries, [
        $client,
        ($client->peer_ip_string || '0.0.0.0') . " $line"
    ];
    MogileFS::ProcManager->ProcessQueues;
    if (@PendingQueries) {
        # Don't like the name.  Feel free to change if you find better.
        $Stats{times_out_of_qworkers}++;
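        # This stat counts how often a newly arrived query could not be handed
        # to an idle queryworker right away and had to wait in @PendingQueries
        # (i.e. we were out of queryworkers), which is what the commit subject
        # above refers to.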
    }
}

# puts a worker back in the queue, deleting any outstanding jobs in
# the mapping list for this fd.
sub NoteIdleQueryWorker {
    # first arg is class, second is worker
    my MogileFS::Connection::Worker $worker = $_[1];
    delete $Mappings{$worker->{fd}};

    # see if we need to kill off some workers
    if (job_needs_reduction('queryworker')) {
        Mgd::error("Reducing queryworker headcount by 1.");
        MogileFS::ProcManager->AskWorkerToDie($worker);
        return;
    }

    # must be okay, so put it in the queue
    push @IdleQueryWorkers, $worker;
    MogileFS::ProcManager->ProcessQueues;
}
# if we need to kill off a worker, this function takes in the WorkerConn
# object, tells it to die, marks us as having requested its death, and decrements
# the count of running jobs.
sub AskWorkerToDie {
    my MogileFS::Connection::Worker $worker = $_[1];
    note_pending_death($worker->job, $worker->pid);
    $worker->write(":shutdown\r\n");
}

# kill bored query workers so we can get down to the level requested.  this
# continues killing until we run out of folks to kill.
sub CullQueryWorkers {
    while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
        MogileFS::ProcManager->AskWorkerToDie($worker);
    }
}
# called when we get a response from a worker.  this reenqueues the
# worker so it can handle another response as well as passes the answer
# back on to the client.
sub HandleQueryWorkerResponse {
    # got a response from a worker
    my MogileFS::Connection::Worker $worker;
    my $line;
    (undef, $worker, $line) = @_;

    return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
    return unless $worker && $Mappings{$worker->{fd}};

    # get the client we're working with (if any)
    my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };

    # if we have no client, then we just got a standard message from
    # the queryworker and need to pass it up the line
    return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;

    # at this point it was a command response, but if the client has gone
    # away, just reenqueue this query worker
    return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};

    # <numeric id> [client-side time to complete] <response>
    my ($time, $id, $res);
    if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
        # save time and response for use later
        # Note the optional negative sign in the regexp.  Somebody
        # on the mailing list was getting a time of -0.0000, causing
        # broken connections.
        ($id, $time, $res) = ($1, $2, $3);
    }

    # now, if it doesn't match
    unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
        $id   = "<undef>" unless defined $id;
        $line = "<undef>" unless defined $line;
        $line =~ s/\n/\\n/g;
        $line =~ s/\r/\\r/g;
        Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
        $client->close('worker_mismatch');
        return MogileFS::ProcManager->AskWorkerToDie($worker);
    }

    # now time this interval and add to @RecentQueries
    my $tinterval = Time::HiRes::time() - $starttime;
    push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # send text to client, put worker back in queue
    $client->write("$res\r\n");
    MogileFS::ProcManager->NoteIdleQueryWorker($worker);
}
# new per-worker magic internal queue runner.
# TODO: Since this fires only when a master asks or a worker reports
# in bored, it should just operate on that *one* queue?

# new change: if worker in $job, but not in _bored, do not send work.
# if work is received, only delete from _bored
sub process_worker_queues {
    return if $IsChild;

    JOB: while (my ($job, $queue) = each %pending_work) {
        next JOB unless @$queue;
        next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
        WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
            my MogileFS::Connection::Worker $worker =
                delete $idle_workers{_bored}->{$worker_key};
            if (!defined $worker || $worker->{closed}) {
                delete $idle_workers{$job}->{$worker_key};
                next WORKER;
            }

            # allow workers to grab a linear range of work.
            while (@$queue && $worker->wants_todo($job)) {
                $worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
                $Stats{'work_sent_to_' . $job}++;
            }
            next JOB unless @$queue;
        }
    }
}
# called from various spots to empty the queues of available pairs.
sub ProcessQueues {
    return if $IsChild;

    # try to match up a client with a worker
    while (@IdleQueryWorkers && @PendingQueries) {
        # get client that isn't closed
        my $clref;
        while (!$clref && @PendingQueries) {
            $clref = shift @PendingQueries
                or next;
            if ($clref->[0]->{closed}) {
                $clref = undef;
                next;
            }
        }
        next unless $clref;

        # get worker and make sure it's not closed already
        my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers;
        if (!defined $worker || $worker->{closed}) {
            unshift @PendingQueries, $clref;
            next;
        }

        # put in mapping and send data to worker
        push @$clref, Time::HiRes::time();
        $Mappings{$worker->{fd}} = $clref;
        $Stats{queries}++;

        # increment our counter so we know what request counter this is going out
        $worker->{reqid}++;
        # so we're writing a string of the form:
        #     123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
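        # (The "$pid-$reqid" prefix is echoed back by the queryworker and is
        # what HandleQueryWorkerResponse() checks in order to pair a response
        # with the request dispatched here.)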
    }
}

# send short descriptions of commands we support to the user
sub SendHelp {
    my $client = $_[1];

    # send general purpose help
    $client->write(<<HELP);
Mogilefsd admin commands:

    !version    Server version
    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we\'re up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   Immediately kill all of mogilefsd.

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

HELP
}
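# For reference, these admin commands are typed over a plain TCP connection
# to the tracker's management port.  A rough usage sketch (assuming mogilefsd
# is listening for management connections on the default port, 7001):
#
#   $ telnet 127.0.0.1 7001
#   !jobs
#   ... per-job current counts, desired levels, and pids ...
#   !want 20 queryworker
#   ... the spawner then brings the queryworker count up to 20 ...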
# a child has contacted us with some command/status/something.
sub HandleChildRequest {
    if ($IsChild) {
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
    }

    # if they have no job set, then their first line is what job they are
    # and not a command.  they also specify their pid, just so we know what
    # connection goes with what pid, in case it's ever useful information.
    my MogileFS::Connection::Worker $child = $_[1];
    my $cmd = $_[2];

    die "Child $child with no pid?" unless $child->job;

    # at this point we've got a command of some sort
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^debug (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^queue_depth (\w+)/) {
        my $job = $1;
        if ($job eq 'all') {
            for my $qname (keys %pending_work) {
                my $depth = @{$pending_work{$qname}};
                $child->write(":queue_depth $qname $depth\r\n");
            }
        } else {
            my $depth = 0;
            if ($pending_work{$job}) {
                $depth = @{$pending_work{$job}};
            }
            $child->write(":queue_depth $job $depth\r\n");
        }
        MogileFS::ProcManager->process_worker_queues;
    } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
        my $job = $1;
        $pending_work{$job} ||= [];
        push(@{$pending_work{$job}}, $2);
        # Don't process queues immediately, to allow batch processing.
    } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
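        # A worker announces "worker_bored <batch> <queue types...>": it can
        # accept up to <batch> more queued items for each of the listed queue
        # types.  Register it as idle (and in the special '_bored' set), then
        # try to hand it work right away.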
        my $batch = $1;
        my $types = $2;
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            unless (exists $idle_workers{$child->job}) {
                $idle_workers{$child->job} = {};
            }
            $idle_workers{_bored} ||= {};
            $idle_workers{_bored}->{$child} = $child;
            for my $type (split(/\s+/, $types)) {
                $idle_workers{$type} ||= {};
                $idle_workers{$type}->{$child}++;
                $child->wants_todo($type, $batch);
            }
            MogileFS::ProcManager->process_worker_queues;
        }
    } elsif ($cmd eq ":ping") {

        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());

        # this command expects a reply, either to die or stay alive.  beginning of worker's loops
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            $child->write(":stay_alive\r\n");
        }

    } elsif ($cmd eq ":still_alive") {
        # a no-op

    } elsif ($cmd =~ /^:monitor_events/) {
        # Apply the state locally, so when we fork children they have a
        # pre-parsed factory.
        # Also replay the event back where it came, so the same mechanism
        # applies and uses local changes.
        apply_state_events(\$cmd);
        MogileFS::ProcManager->send_to_all_children($cmd);

    } elsif ($cmd eq ":monitor_just_ran") {
        send_monitor_has_run($child);

    } elsif ($cmd =~ /^:wake_a (\w+)$/) {
        MogileFS::ProcManager->wake_a($1, $child);
    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
        # and this will rebroadcast it to all other children
        # (including the one that just set it to us, but eh)
        MogileFS::Config->set_config($1, $2);
    } elsif ($cmd =~ /^:refresh_monitor$/) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
    } else {
        # unknown command
        my $show = $cmd;
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
    }
}
# Class method.
#   ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
# given a job class, and a message, send it to all children of that job.  returns
# the number of children the message was sent to.
#
# if child is specified, the message will be sent to members of the job class that
# aren't that child.  so you can exclude the one that originated the message.
#
# doesn't add to queue of things child gets on next interactive command: writes immediately
# (won't get in middle of partial write, though, as danga::socket queues things up)
#
# if $just_one is specified, only a single process is notified, then we stop.
sub ImmediateSendToChildrenByJob {
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;

    my $childref = $ChildrenByJob{$class};
    return 0 unless defined $childref && %$childref;

    foreach my $child (values %$childref) {
        # ignore the child specified as the third arg if one is sent
        next if $exclude_child && $exclude_child == $child;

        # send the message to this child
        $child->write("$msg\r\n");
        return 1 if $just_one;
    }
    return scalar(keys %$childref);
}
# called when we notice that a worker has bit it.  we might have to restart a
# job that they had been working on.
sub NoteDeadWorkerConn {
    return if $IsChild;

    # get parms and error check
    my MogileFS::Connection::Worker $worker = $_[1];
    return unless $worker;

    my $fd = $worker->{fd};
    return unless defined($fd);

    # if there's a mapping for this worker's fd, they had a job that didn't get done
    if ($Mappings{$fd}) {
        # unshift, since this one already went through the queue once
        unshift @PendingQueries, $Mappings{$worker->{fd}};
        delete $Mappings{$worker->{fd}};

        # now try to get it processing again
        MogileFS::ProcManager->ProcessQueues;
    }
}
# given (job, pid), record that this worker is about to die
# $level is so we can tell if watchdog requested the death.
sub note_pending_death {
    my ($job, $pid, $level) = @_;

    die "$job not defined in call to note_pending_death.\n"
        unless defined $jobs{$job};

    $level ||= 1;
    # don't double decrement.
    $jobs{$job}->[1]-- unless $todie{$pid};
    $todie{$pid} = $level;
}

# see if we should reduce the number of active children
sub job_needs_reduction {
    my $job = shift;
    return $jobs{$job}->[0] < $jobs{$job}->[1];
}

sub is_child {
    return $IsChild;
}
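# Ask a worker of class $class to wake up and look for work.  Inside a worker
# process this is forwarded through the worker object's own wake_a(); in the
# parent it pokes exactly one child of that class with ":wake_up" (excluding
# the child that asked, if any).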
sub wake_a {
    my ($pkg, $class, $fromchild) = @_;  # from arg is optional (which child sent it)
    my $child = MogileFS::ProcManager->is_child;
    if ($child) {
        $child->wake_a($class);
    } else {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
    }
}

sub send_to_all_children {
    my ($pkg, $msg, $exclude) = @_;
    foreach my $child (values %child) {
        next if $exclude && $child == $exclude;
        $child->write($msg . "\r\n");
    }
}
sub send_monitor_has_run {
    my $child = shift;
    # Gas up other workers if monitor's completed for the first time.
    if (! $monitor_good) {
        MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
        MogileFS::ProcManager->set_min_workers('delete'      => MogileFS->config('delete_jobs'));
        MogileFS::ProcManager->set_min_workers('replicate'   => MogileFS->config('replicate_jobs'));
        MogileFS::ProcManager->set_min_workers('reaper'      => MogileFS->config('reaper_jobs'));
        MogileFS::ProcManager->set_min_workers('fsck'        => MogileFS->config('fsck_jobs'));
        MogileFS::ProcManager->set_min_workers('job_master'  => 1);
        $monitor_good = 1;
        $allkidsup = 0;
    }
    for my $type (qw(queryworker)) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
    }
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: