device: reuse HTTP connections for MKCOL
[MogileFS-Server.git] / lib / MogileFS / Worker.pm
blobf3a74dff33761e5272e11e3021773c6d370b9896
1 package MogileFS::Worker;
2 use strict;
3 use fields ('psock', # socket for parent/child communications
4 'last_bcast_state', # "{device|host}-$devid" => [$time, {alive|dead}]
5 'readbuf', # unparsed data from parent
6 'monitor_has_run', # true once we've heard of the monitor job being alive
7 'last_ping', # time we last said we're alive
8 'woken_up', # bool: if we've been woken up
9 'last_wake', # hashref: { $class -> time() } when we last woke up a certain job class
10 'queue_depth', # depth of a queue we queried
11 'queue_todo', # aref of hrefs of work sent from parent
14 use MogileFS::Util qw(error eurl decode_url_args apply_state_events);
15 use MogileFS::Server;
17 use vars (
18 '$got_live_vs_die', # local'ized scalarref flag for whether we've
19 # gotten a live-vs-die instruction from parent
# Build a worker that talks to its parent over $psock.
# Called as a class method, the object is created with fields::new().
# Called with an existing object, that object is initialized in place.
# Per-worker state starts empty, and monitor_has_run is seeded from
# MogileFS::ProcManager->is_monitor_good.
# $psock is switched to non-blocking mode.
sub new {
    my ($self, $psock) = @_;
    unless (ref $self) {
        $self = fields::new($self);
    }

    # scalar() keeps is_monitor_good in scalar context, exactly as a plain
    # "$self->{monitor_has_run} = ..." assignment would.
    @{$self}{qw(psock readbuf last_bcast_state monitor_has_run
                last_ping last_wake queue_depth queue_todo)}
        = ($psock, '', {}, scalar(MogileFS::ProcManager->is_monitor_good),
           0, {}, {}, {});

    IO::Handle::blocking($psock, 0);
    return $self;
}
# File descriptor number of the parent/child socket.
sub psock_fd {
    my ($self) = @_;
    my $sock = $self->{psock};
    return fileno $sock;
}
# Accessor for the socket used to talk to the parent process.
sub psock {
    my ($self) = @_;
    return $self->{psock};
}
# Thin wrapper over Mgd::validate_dbh().
# No arguments are forwarded, and the result is passed straight back.
sub validate_dbh {
    Mgd::validate_dbh();
}
# Boolean (strictly 1 or 0): has this worker heard that the monitor job ran?
sub monitor_has_run {
    my ($self) = @_;
    return 0 unless $self->{monitor_has_run};
    return 1;
}
# Clear the monitor-has-run flag.
# monitor_has_run() then reports 0 until the parent sends ":monitor_has_run" again.
sub forget_that_monitor_has_run {
    my ($self) = @_;
    $self->{monitor_has_run} = 0;
}
# Block until monitor_has_run() is true.
# Each pass reads commands from the parent (read timeout 1) and calls
# still_alive(), so the worker keeps pinging while it waits.
sub wait_for_monitor {
    my $self = shift;
    until ($self->monitor_has_run) {
        $self->read_from_parent(1);
        $self->still_alive;
    }
}
# Workers call this to show the parent they are alive, so an idle or slow
# worker isn't killed by the watchdog.
# It sends at most one ":still_alive" line per quarter of watchdog_timeout()
# (compared against time()).
# Returns the current time(), so a caller's loop can skip its own time() call.
sub still_alive {
    my $self = shift;
    my $now  = time();

    my $ping_due_after = $self->{last_ping} + ($self->watchdog_timeout / 4);
    return $now unless $now > $ping_due_after;

    $self->send_to_parent(":still_alive"); # a no-op, just for the watchdog
    $self->{last_ping} = $now;
    return $now;
}
# Write one "\r\n"-terminated line ($_[0] after the invocant) to the parent
# over psock.
#
# Can also be called as a package method (MogileFS::Worker->send_to_parent).
# In that case the worker object comes from MogileFS::ProcManager->is_child,
# and the call silently returns (empty) if there is none.
#
# psock is non-blocking (new() turns blocking off), so short writes and
# EAGAIN are handled: the loop waits via wait_for_writeability(..., 30)
# and resumes writing at the right offset.
#
# Returns 1 once the whole line has been written.
# Dies on a real write error, or if the parent is not writable within the
# 30-second wait.
sub send_to_parent {
    my $self = shift;

    # can be called as package method: MogileFS::Worker->send_to_parent...
    unless (ref $self) {
        $self = MogileFS::ProcManager->is_child
            or return;
    }

    my $write    = "$_[0]\r\n";
    my $totallen = length $write;
    my $rv       = syswrite($self->{psock}, $write);
    return 1 if defined $rv && $rv == $totallen;

    # $! is only meaningful right after a failed call.  A short (partial)
    # write succeeds and leaves whatever stale errno was set earlier, so
    # only look at $! when syswrite actually returned undef.
    die "Error writing to parent process: $!"
        if !defined $rv && !$!{EAGAIN};

    $rv ||= 0; # could've been undef, if EAGAIN immediately.
    my $remain = $totallen - $rv;
    my $offset = $rv;
    while ($remain > 0) {
        MogileFS::Util::wait_for_writeability(fileno($self->{psock}), 30)
            or die "Parent not writable in 30 seconds";

        $rv = syswrite($self->{psock}, $write, $remain, $offset);
        if (!defined $rv) {
            next if $!{EAGAIN};    # not writable after all; wait again
            die "Error writing to parent process (in loop): $!";
        }
        $remain -= $rv;
        $offset += $rv;
    }
    die "remain is negative: $remain" if $remain < 0;
    return 1;
}
# Watchdog timeout in seconds.
# still_alive() compares against time() and pings every quarter of this value.
# Override in children.
sub watchdog_timeout { 10 }
# Hook for worker-specific directives from the parent.
# Subclasses override it: called with ($self, $lineref), return 1 for a
# command they recognize, 0 otherwise.
# read_from_parent() reports lines that come back 0 as unrecognized.
# This base version recognizes nothing.
sub process_line {
    return 0;
}
# Read whatever the parent has sent on psock and dispatch it line by line.
#
# $timeout (default 0) is passed to wait_for_readability() only for the
# first wait.  After that, reading continues only while data is
# immediately available.
# Each complete line goes first to process_generic_command(), then to
# process_line().  Lines neither recognizes are reported via error().
# Empty lines are ignored.
# An incomplete trailing line stays in $self->{readbuf} for the next call.
#
# Dies if the parent closed the pipe (EOF), or on a read error.
sub read_from_parent {
    my $self = shift;
    my $timeout = shift || 0;
    my $psock = $self->{psock};

    # while things are immediately available,
    # (or optionally sleep a bit)
    while (MogileFS::Util::wait_for_readability(fileno($psock), $timeout)) {
        $timeout = 0; # only wait on the timeout for the first read.
        my $buf;
        my $rv = sysread($psock, $buf, Mgd::UNIX_RCVBUF_SIZE());
        if (!$rv) {
            if (defined $rv) {
                die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n";
            } else {
                die "Error reading pipe from parent: $!\n";
            }
        }

        if ($Mgd::POST_SLEEP_DEBUG) {
            my $out = $buf;
            $out =~ s/\s+$//;
            warn "proc ${self}[$$] read: [$out]\n";
        }

        $self->{readbuf} .= $buf;

        # Use (.*?) rather than (.+?) so an empty line ("\n" or "\r\n") is
        # consumed too.  With (.+?), a bare "\n" at the head of the buffer
        # never matches, and every command queued behind it would be stuck
        # in readbuf forever.
        while ($self->{readbuf} =~ s/^(.*?)\r?\n//) {
            my $line = $1;
            next unless length $line;    # ignore blank lines

            next if $self->process_generic_command(\$line);
            my $ok = $self->process_line(\$line);
            unless ($ok) {
                error("Unrecognized command from parent: $line");
            }
        }
    }
}
# Send ":ping" to the parent and wait for a live-vs-die reply.
#
# While waiting, $got_live_vs_die is localized to point at a local flag.
# process_generic_command() sets that flag on ":stay_alive" (then we
# return) or ":shutdown" (which exits the process).
# The parent is polled via read_from_parent() every 0.2 seconds.
# Every poll after the 5th warns; once more than 20 polls go unanswered
# we die.
# Dies immediately if called while another parent_ping() is already waiting.
sub parent_ping {
    my $self = shift;
    $self->send_to_parent(':ping');

    my $got_reply = 0;
    die "recursive parent_ping!" if $got_live_vs_die;
    local $got_live_vs_die = \$got_reply;

    my $loops = 0;

    while (!$got_reply) {
        $self->read_from_parent;
        return if $got_reply;

        $loops++;
        select undef, undef, undef, 0.20;
        if ($loops > 5) {
            warn "No simple reply from parent to child $self [$$] in $loops 0.2second loops.\n";
            die "No answer in 4 seconds from parent to child $self [$$], dying" if $loops > 20;
        }
    }
}
# tries to parse generic (not job-specific) commands sent from parent
# to child. returns 1 on success, or 0 if command given isn't generic,
# and child should parse.
# lineref doesn't have \r\n at end.
sub process_generic_command {
    my ($self, $lineref) = @_;
    return 0 unless $$lineref =~ /^:/; # all generic commands start with colon

    # Live-vs-die replies to parent_ping(): set the flag parent_ping()
    # localized into $got_live_vs_die, if one is currently waiting.
    if ($$lineref =~ /^:shutdown/) {
        $$got_live_vs_die = 1 if $got_live_vs_die;
        exit 0;
    }

    if ($$lineref =~ /^:stay_alive/) {
        $$got_live_vs_die = 1 if $got_live_vs_die;
        return 1;
    }

    if ($$lineref =~ /^:monitor_events/) {
        apply_state_events($lineref);
        return 1;
    }

    if ($$lineref =~ /^:monitor_has_run/) {
        $self->{monitor_has_run} = 1;
        return 1;
    }

    if ($$lineref =~ /^:wake_up/) {
        $self->{woken_up} = 1;
        return 1;
    }

    if ($$lineref =~ /^:set_config_from_parent (\S+) (.+)/) {
        # the 'no_broadcast' API keeps us from looping forever.
        MogileFS::Config->set_config_no_broadcast($1, $2);
        return 1;
    }

    # queue_name depth
    if ($$lineref =~ /^:queue_depth (\w+) (\d+)/) {
        $self->queue_depth($1, $2);
        return 1;
    }

    # queue_name encoded_item
    if ($$lineref =~ /^:queue_todo (\w+) (.+)/) {
        # Copy the captures before calling out.  Then go through the
        # queue_todo() accessor, just as :queue_depth uses queue_depth().
        my ($queue, $encoded) = ($1, $2);
        $self->queue_todo($queue, decode_url_args(\$encoded));
        return 1;
    }

    # TODO: warn on unknown commands?

    return 0;
}
# Get or set the last-reported depth for queue $type.
# A never-seen (or false) depth is normalized to 0 first.
# With a third argument the depth is set to it.
# Returns the (possibly new) depth either way.
sub queue_depth {
    my MogileFS::Worker $self = shift;
    my ($type, @new_value) = @_;

    $self->{queue_depth}{$type} ||= 0;
    if (@new_value) {
        $self->{queue_depth}{$type} = $new_value[0];
    }
    return $self->{queue_depth}{$type};
}
# Return the arrayref of pending work items for queue $type, creating an
# empty one on first use.
# Any extra arguments are appended to it first.
sub queue_todo {
    my MogileFS::Worker $self = shift;
    my ($type, @items) = @_;

    my $pending = $self->{queue_todo}{$type} ||= [];
    push @$pending, @items;
    return $pending;
}
# The woken_up flag: set by a ":wake_up" command from the parent and
# cleared by forget_woken_up().
# Undef if never set.
sub was_woken_up {
    my MogileFS::Worker $self = $_[0];
    my $woken = $self->{woken_up};
    return $woken;
}
# Clear the woken_up flag, which the ":wake_up" command from the parent sets.
sub forget_woken_up {
    my MogileFS::Worker $self = $_[0];
    $self->{woken_up} = 0;
}
# Ask the parent to wake a worker of job class $class (":wake_a $class").
# Throttled to at most one request per class per time() second, because
# waking more often is unnecessary.
# Returns send_to_parent()'s result, or empty when throttled.
sub wake_a {
    my ($self, $class) = @_;
    my $now = time();

    my $previous = $self->{last_wake}{$class} || 0;
    return if $previous == $now;

    $self->{last_wake}{$class} = $now;
    return $self->send_to_parent(":wake_a $class");
}
285 # Local Variables:
286 # mode: perl
287 # c-basic-indent: 4
288 # indent-tabs-mode: nil
289 # End: