1 # A client is a user connection for sending requests to us. Requests
2 # can either be normal user requests to be sent to a QueryWorker
3 # or management requests that start with a !.
5 package MogileFS
::Connection
::Client
;
9 use base
qw{Danga
::Socket
};
11 use Time
::HiRes
qw(clock_gettime CLOCK_MONOTONIC);
12 use MogileFS
::Util
qw(error);
14 use fields
qw{read_buf pipelined
};
16 my %SLOW_WRITERS = ();
26 my $dmap = Danga
::Socket
->DescriptorMap;
27 my $old = clock_gettime
(CLOCK_MONOTONIC
) - $EXPTIME;
28 foreach my $fd (keys %SLOW_WRITERS) {
29 my $last_write = $SLOW_WRITERS{$fd};
30 next if $last_write > $old;
32 if (my $ds = $dmap->{$fd}) {
33 error
('write timeout expired: '.$ds->as_string);
36 error
("fd=$fd not known to Danga::Socket(!?), ignoring");
38 delete $SLOW_WRITERS{$fd};
42 sub ProcessPipelined
{
45 foreach my MogileFS
::Connection
::Client
$clref (@
$run) {
46 $clref->{pipelined
} = undef;
47 $clref->process_request or $clref->watch_read(1);
53 $self = fields
::new
($self) unless ref $self;
54 $self->SUPER::new
( @_ );
55 IO
::Handle
::blocking
($self->{sock
}, 0);
56 delete $SLOW_WRITERS{$self->{fd
}};
64 my MogileFS
::Connection
::Client
$self = shift;
66 while ($self->{read_buf
} =~ s/^(.*?)\r?\n//) {
67 next unless length $1;
68 $self->handle_request($1);
75 my MogileFS
::Connection
::Client
$self = shift;
77 my $bref = $self->read(1024);
78 return $self->close unless defined $bref;
79 $self->{read_buf
} .= $$bref;
80 $self->process_request;
84 my MogileFS
::Connection
::Client
$self = shift;
85 my $done = $self->SUPER::write(@_);
89 delete $SLOW_WRITERS{$fd};
90 unless ($self->{pipelined
}) {
91 $self->{pipelined
} = 1;
92 push @
$PIPELINE, $self;
96 # stop reading if we can't write, otherwise we'll OOM
98 $SLOW_WRITERS{$fd} = clock_gettime
(CLOCK_MONOTONIC
);
106 my ($self, $line) = @_;
108 # if it's just 'help', 'h', '?', or something, do that
109 #if ((substr($line, 0, 1) eq '?') || ($line eq 'help')) {
110 # MogileFS::ProcManager->SendHelp($_[1]);
114 if ($line =~ /^!(\S+)(?:\s+(.+))?$/) {
115 my ($cmd, $args) = ($1, $2);
116 return $self->handle_admin_command($cmd, $args);
119 $self->watch_read(0);
120 MogileFS
::ProcManager
->EnqueueCommandRequest($line, $self);
123 sub handle_admin_command
{
124 my ($self, $cmd, $args) = @_;
127 if ($cmd =~ /^stats$/) {
128 # print out some stats on the queues
129 my $uptime = time() - MogileFS
::ProcManager
->server_starttime;
130 my $ccount = MogileFS
::ProcManager
->PendingQueryCount;
131 my $wcount = MogileFS
::ProcManager
->BoredQueryWorkerCount;
132 my $ipcount = MogileFS
::ProcManager
->QueriesInProgressCount;
133 my $stats = MogileFS
::ProcManager
->StatsHash;
134 push @out, "uptime $uptime",
135 "pending_queries $ccount",
136 "processing_queries $ipcount",
137 "bored_queryworkers $wcount",
138 map { "$_ $stats->{$_}" } sort keys %$stats;
140 } elsif ($cmd =~ /^shutdown/) {
141 print "User requested shutdown: $args\n";
142 kill 15, $$; # kill us, that kills our kids
144 } elsif ($cmd =~ /^jobs/) {
145 # dump out a list of running jobs and pids
146 MogileFS
::ProcManager
->foreach_job(sub {
147 my ($job, $ct, $desired, $pidlist) = @_;
148 push @out, "$job count $ct";
149 push @out, "$job desired $desired";
150 push @out, "$job pids " . join(' ', @
$pidlist);
153 } elsif ($cmd =~ /^want/) {
154 # !want <count> <jobclass>
155 # set the new desired staffing level for a class
156 if ($args =~ /^(\d+)\s+(\S+)/) {
157 my ($count, $job) = ($1, $2);
159 $count = 500 if $count > 500;
161 # now make sure it's a real job
162 if (MogileFS
::ProcManager
->is_monitor_good) {
163 if (MogileFS
::ProcManager
->is_valid_job($job)) {
164 MogileFS
::ProcManager
->request_job_process($job, $count);
165 push @out, "Now desiring $count children doing '$job'.";
167 my $classes = join(", ", MogileFS
::ProcManager
->valid_jobs);
168 push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
171 push @out, "ERROR: Monitor has not completed initial run yet\n";
174 push @out, "ERROR: usage: !want <count> <jobclass>";
177 } elsif ($cmd =~ /^to/) {
178 # !to <jobclass> <message>
179 # sends <message> to all children of <jobclass>
180 if ($args =~ /^(\S+)\s+(.+)/) {
181 my $ct = MogileFS
::ProcManager
->ImmediateSendToChildrenByJob($1, $2);
182 push @out, "Message sent to $ct children.";
185 push @out, "ERROR: usage: !to <jobclass> <message>";
188 } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
189 MogileFS
::ProcManager
->foreach_pending_query(sub {
190 my ($client, $query) = @_;
194 } elsif ($cmd =~ /^watch/) {
195 if (MogileFS
::ProcManager
->RemoveErrorWatcher($self)) {
196 push @out, "Removed you from watcher list.";
198 MogileFS
::ProcManager
->AddErrorWatcher($self);
199 push @out, "Added you to watcher list.";
202 } elsif ($cmd =~ /^recent/) {
203 # show the most recent N queries
204 push @out, MogileFS
::ProcManager
->RecentQueries;
206 } elsif ($cmd =~ /^version/) {
207 # show the most recent N queries
208 push @out, $MogileFS::Server
::VERSION
;
211 MogileFS
::ProcManager
->SendHelp($self, $args);
214 $self->write(join("\r\n", @out) . "\r\n") if @out;
215 $self->write(".\r\n");
220 sub event_err
{ my $self = shift; $self->close; }
221 sub event_hup
{ my $self = shift; $self->close; }
223 # just note that we've died
225 # mark us as being dead
227 MogileFS
::ProcManager
->NoteDeadClient($self);
228 $self->SUPER::close(@_);
236 # indent-tabs-mode: nil