tracker: client fairness, backpressure, and expiry
[MogileFS-Server.git] / lib / MogileFS / Connection / Client.pm
blob121da5b935947c4c36131c4772382c7856aeb205
1 # A client is a user connection for sending requests to us. Requests
2 # can either be normal user requests to be sent to a QueryWorker
3 # or management requests that start with a !.
5 package MogileFS::Connection::Client;
7 use strict;
8 use Danga::Socket ();
9 use base qw{Danga::Socket};
10 use IO::Handle;
11 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
12 use MogileFS::Util qw(error);
14 use fields qw{read_buf pipelined};
# File-level state shared by every client connection in this process.
#
# %SLOW_WRITERS maps a file descriptor number to the CLOCK_MONOTONIC
# timestamp recorded by write() when the parent class's write returned
# false.  WriterWatchDog scans it; write() and new() remove entries.
16 my %SLOW_WRITERS = ();
# Age in seconds (monotonic clock) after which WriterWatchDog treats an
# entry in %SLOW_WRITERS as expired and closes the descriptor.
17 my $EXPTIME = 120;
# Array ref of client objects queued by write() after a successful write;
# ProcessPipelined drains it and looks for further buffered requests.
18 my $PIPELINE = [];
# Reset
#
# Discards all connection bookkeeping kept at file level: every
# slow-writer timestamp is forgotten and the pipeline queue is replaced
# with a fresh, empty array.  Takes no arguments.
# NOTE(review): presumably invoked when a process must start from a clean
# slate (e.g. after fork) -- confirm against the callers.
sub Reset {
    undef %SLOW_WRITERS;
    $PIPELINE = [];
}
# WriterWatchDog
#
# Walks %SLOW_WRITERS and closes every descriptor whose recorded
# timestamp is at least $EXPTIME seconds old on the monotonic clock.
# Each expired entry is logged through error() and then removed from the
# table, whether or not Danga::Socket still knows the descriptor.
# Takes no arguments; the return value is not meaningful.
sub WriterWatchDog {
    my $dmap = Danga::Socket->DescriptorMap;
    my $old  = clock_gettime(CLOCK_MONOTONIC) - $EXPTIME;

    foreach my $fd (keys %SLOW_WRITERS) {
        # keys() is a snapshot taken before the loop started.  write() and
        # new() delete entries from %SLOW_WRITERS, so anything reached
        # from this loop body that triggers them can remove an fd we have
        # not visited yet.  A vanished entry would read as undef, compare
        # as "not newer than $old", and get a healthy connection closed,
        # so skip it explicitly.
        next unless exists $SLOW_WRITERS{$fd};

        my $last_write = $SLOW_WRITERS{$fd};
        next if $last_write > $old;

        if (my $ds = $dmap->{$fd}) {
            error('write timeout expired: '.$ds->as_string);
            $ds->close;
        } else {
            error("fd=$fd not known to Danga::Socket(!?), ignoring");
        }

        # Expired entries are dropped in both cases so they are reported
        # only once.
        delete $SLOW_WRITERS{$fd};
    }
}
# ProcessPipelined
#
# Drains the pipeline queue.  The current queue is detached first, so
# clients that get queued again while this runs land in a fresh queue and
# are handled on the next call.  For each detached client the "pipelined"
# flag is cleared and one buffered request is attempted; if
# process_request reports that nothing was handled, read events are
# switched back on for that client.
sub ProcessPipelined {
    my $batch;
    ($batch, $PIPELINE) = ($PIPELINE, []);

    for my MogileFS::Connection::Client $client (@{$batch}) {
        $client->{pipelined} = undef;
        next if $client->process_request;
        $client->watch_read(1);
    }
}
# new(@args)
#
# Constructor.  When called on a class name a fields-based object is
# allocated; when called on an existing object that object is reused.
# All remaining arguments are handed to the parent constructor.  The
# socket is then put into non-blocking mode, any slow-writer entry left
# under this descriptor number is discarded, and read events are enabled.
# Returns the client object.
sub new {
    my ($proto, @args) = @_;

    my $self = ref $proto ? $proto : fields::new($proto);
    $self->SUPER::new(@args);

    IO::Handle::blocking($self->{sock}, 0);

    # a new connection on this fd number starts with a clean record
    delete $SLOW_WRITERS{ $self->{fd} };

    $self->watch_read(1);
    return $self;
}
61 # Client
# process_request
#
# Pulls complete lines (terminated by LF or CRLF) off the front of the
# read buffer.  Empty lines are discarded.  The first non-empty line is
# passed to handle_request and a true value is returned; at most one
# request is handled per call.  Returns false when the buffer holds no
# complete non-empty line.
# NOTE(review): the end of this sub was not visible in the reviewed text;
# the false fall-through is reconstructed from ProcessPipelined, which
# re-enables reading when this returns false -- confirm against the
# original file.
sub process_request {
    my MogileFS::Connection::Client $self = shift;

    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;

        $self->handle_request($line);
        return 1;
    }

    return 0;
}
# event_read
#
# Read-readiness callback.  Reads up to 1024 bytes from the socket; an
# undefined result closes the connection.  Otherwise the bytes are
# appended to the read buffer and one request is attempted via
# process_request, whose result is returned.
sub event_read {
    my MogileFS::Connection::Client $self = shift;

    my $chunk = $self->read(1024);
    unless (defined $chunk) {
        return $self->close;
    }

    $self->{read_buf} .= ${$chunk};
    return $self->process_request;
}
# write(@args)
#
# Wrapper around the parent class's write that adds backpressure and
# pipelining bookkeeping.  Arguments are passed through untouched and the
# parent's result is returned unchanged.
#
#  * Parent write returned true: the descriptor's slow-writer entry is
#    cleared and, unless already queued, the client is placed on the
#    pipeline queue for ProcessPipelined.
#  * Parent write returned false: the current monotonic time is recorded
#    for WriterWatchDog and read events are disabled.
#
# Nothing is recorded when the object has no descriptor number.
sub write {
    my MogileFS::Connection::Client $self = shift;

    my $flushed = $self->SUPER::write(@_);
    my $fd      = $self->{fd};
    return $flushed unless defined $fd;

    if (!$flushed) {
        # stop reading if we can't write, otherwise we'll OOM
        $SLOW_WRITERS{$fd} = clock_gettime(CLOCK_MONOTONIC);
        $self->watch_read(0);
        return $flushed;
    }

    delete $SLOW_WRITERS{$fd};
    if (!$self->{pipelined}) {
        $self->{pipelined} = 1;
        push @{$PIPELINE}, $self;
    }

    return $flushed;
}
# handle_request($line)
#
# Dispatches one request line.  A line of the form "!command [args]" is a
# management request and goes to handle_admin_command.  Anything else is
# an ordinary request: read events are turned off for this client and the
# line is queued with the process manager.
#
# (A shortcut that answered a bare "help" or "?" via SendHelp used to sit
# at the top of this sub; it is disabled.)
sub handle_request {
    my ($self, $line) = @_;

    if (my ($cmd, $args) = $line =~ /^!(\S+)(?:\s+(.+))?$/) {
        return $self->handle_admin_command($cmd, $args);
    }

    $self->watch_read(0);
    return MogileFS::ProcManager->EnqueueCommandRequest($line, $self);
}
# handle_admin_command($cmd, $args)
#
# Executes one management ("!") command and writes the reply to this
# client.  $cmd is the command word without the leading "!"; $args is the
# rest of the line and may be undef.
#
# Apart from "stats", commands are matched by prefix, so any word that
# begins with a command name selects that command.  Recognised commands:
#
#   stats             uptime, queue counters and the stats hash
#   shutdown [why]    send signal 15 to this process
#   jobs              per-job count, desired count and pids
#   want <n> <job>    set the desired number of workers for a job
#                     (capped at 500)
#   to <job> <msg>    send <msg> to every child doing <job>
#   queue / pend      list the pending queries
#   watch             toggle this client on the error-watcher list
#   recent            recent queries
#   version           server version
#
# Anything else is passed to SendHelp.  Output lines are joined with CRLF
# and the reply always ends with a line holding a single ".".
# Returns nothing.
sub handle_admin_command {
    my ($self, $cmd, $args) = @_;

    # Arguments are optional; match and interpolate an empty string
    # instead of undef when none were given.
    my $argstr = defined $args ? $args : '';

    my @out;
    if ($cmd =~ /^stats$/) {
        # print out some stats on the queues
        my $uptime  = time() - MogileFS::ProcManager->server_starttime;
        my $ccount  = MogileFS::ProcManager->PendingQueryCount;
        my $wcount  = MogileFS::ProcManager->BoredQueryWorkerCount;
        my $ipcount = MogileFS::ProcManager->QueriesInProgressCount;
        my $stats   = MogileFS::ProcManager->StatsHash;
        push @out, "uptime $uptime",
                   "pending_queries $ccount",
                   "processing_queries $ipcount",
                   "bored_queryworkers $wcount",
                   map { "$_ $stats->{$_}" } sort keys %$stats;

    } elsif ($cmd =~ /^shutdown/) {
        print "User requested shutdown: $argstr\n";
        kill 15, $$; # kill us, that kills our kids

    } elsif ($cmd =~ /^jobs/) {
        # dump out a list of running jobs and pids
        MogileFS::ProcManager->foreach_job(sub {
            my ($job, $ct, $desired, $pidlist) = @_;
            push @out, "$job count $ct";
            push @out, "$job desired $desired";
            push @out, "$job pids " . join(' ', @$pidlist);
        });

    } elsif ($cmd =~ /^want/) {
        # !want <count> <jobclass>
        # set the new desired staffing level for a class
        if ($argstr =~ /^(\d+)\s+(\S+)/) {
            my ($count, $job) = ($1, $2);

            $count = 500 if $count > 500;

            # now make sure it's a real job
            if (MogileFS::ProcManager->is_monitor_good) {
                if (MogileFS::ProcManager->is_valid_job($job)) {
                    MogileFS::ProcManager->request_job_process($job, $count);
                    push @out, "Now desiring $count children doing '$job'.";
                } else {
                    my $classes = join(", ", MogileFS::ProcManager->valid_jobs);
                    push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
                }
            } else {
                # No trailing newline here: the lines in @out are joined
                # with CRLF below, and an embedded "\n" would put a blank
                # line into the reply.
                push @out, "ERROR: Monitor has not completed initial run yet";
            }
        } else {
            push @out, "ERROR: usage: !want <count> <jobclass>";
        }

    } elsif ($cmd =~ /^to/) {
        # !to <jobclass> <message>
        # sends <message> to all children of <jobclass>
        if ($argstr =~ /^(\S+)\s+(.+)/) {
            my $ct = MogileFS::ProcManager->ImmediateSendToChildrenByJob($1, $2);
            push @out, "Message sent to $ct children.";

        } else {
            push @out, "ERROR: usage: !to <jobclass> <message>";
        }

    } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
        # list every query still waiting for a worker
        MogileFS::ProcManager->foreach_pending_query(sub {
            my ($client, $query) = @_;
            push @out, $query;
        });

    } elsif ($cmd =~ /^watch/) {
        # toggle: a client already on the watcher list is removed from it
        if (MogileFS::ProcManager->RemoveErrorWatcher($self)) {
            push @out, "Removed you from watcher list.";
        } else {
            MogileFS::ProcManager->AddErrorWatcher($self);
            push @out, "Added you to watcher list.";
        }

    } elsif ($cmd =~ /^recent/) {
        # show the most recent N queries
        push @out, MogileFS::ProcManager->RecentQueries;

    } elsif ($cmd =~ /^version/) {
        # show the server version
        push @out, $MogileFS::Server::VERSION;

    } else {
        # unknown command: SendHelp gets the arguments exactly as received
        MogileFS::ProcManager->SendHelp($self, $args);
    }

    $self->write(join("\r\n", @out) . "\r\n") if @out;
    $self->write(".\r\n");
    return;
}
219 # Client
# event_err: the event loop reported an error on the socket; close the
# connection and return whatever close returns.
sub event_err {
    my ($self) = @_;
    return $self->close;
}
# event_hup: the event loop reported a hang-up on the socket; close the
# connection and return whatever close returns.
sub event_hup {
    my ($self) = @_;
    return $self->close;
}
223 # just note that we've died
# close(@args)
#
# Closes the connection.  The process manager is told that this client is
# dead, then the parent class's close is called with the same arguments
# and its result is returned.
#
# The descriptor's slow-writer entry is removed first.  WriterWatchDog
# closes whatever object Danga::Socket currently maps to a recorded
# descriptor number, so an entry left behind here could get an unrelated
# connection closed once the number is reused.
sub close {
    my $self = shift;

    my $fd = $self->{fd};
    delete $SLOW_WRITERS{$fd} if defined $fd;

    # mark us as being dead
    MogileFS::ProcManager->NoteDeadClient($self);
    $self->SUPER::close(@_);
}
233 # Local Variables:
234 # mode: perl
235 # c-basic-indent: 4
236 # indent-tabs-mode: nil
237 # End: