1 package MogileFS
::Worker
::Monitor
;
5 use base
'MogileFS::Worker';
7 'last_db_update', # devid -> time. update db less often than poll interval.
8 'last_test_write', # devid -> time. time we last tried writing to a device.
9 'skip_host', # hostid -> 1 if already noted dead (reset every loop)
10 'seen_hosts', # IP -> 1 (reset every loop)
11 'ua', # LWP::UserAgent for checking usage files
12 'iow', # MogileFS::IOStatWatcher object
13 'prev_data', # DB data from previous run
14 'devutil', # Running tally of device utilization
15 'events', # Queue of state events
18 use Danga
::Socket
1.56;
20 use MogileFS
::Util
qw(error debug encode_url_args);
21 use MogileFS
::IOStatWatcher
;
23 use constant UPDATE_DB_EVERY
=> 15;
26 my ($class, $psock) = @_;
27 my $self = fields
::new
($class);
28 $self->SUPER::new
($psock);
30 $self->{last_db_update
} = {};
31 $self->{last_test_write
} = {};
32 $self->{iow
} = MogileFS
::IOStatWatcher
->new;
33 $self->{prev_data
} = { domain
=> {}, class => {}, host
=> {},
35 $self->{devutil
} = { cur
=> {}, prev
=> {} };
40 sub watchdog_timeout
{
47 # we just forked from our parent process, also using Danga::Socket,
48 # so we need to lose all that state and start afresh.
51 my $iow = $self->{iow
};
53 my ($hostname, $stats) = @_;
55 while (my ($devid, $util) = each %$stats) {
56 # Let's not propagate devices that we accidentally find.
57 # This does hit the DB every time a device does not exist, so
58 # perhaps we should add negative caching in the future.
59 $self->{devutil
}->{cur
}->{$devid} = $util;
60 my $dev = Mgd
::device_factory
()->get_by_id($devid);
62 $dev->set_observed_utilization($util);
70 # get db and note we're starting a run
71 debug
("Monitor running; scanning usage files");
74 $self->{skip_host
} = {}; # hostid -> 1 if already noted dead.
75 $self->{seen_hosts
} = {}; # IP -> 1
77 # now iterate over devices
78 MogileFS
::Device
->invalidate_cache;
79 MogileFS
::Host
->invalidate_cache;
81 foreach my $dev (MogileFS
::Device
->devices) {
82 next unless $dev->dstate->should_monitor;
83 next if $self->{skip_host
}{$dev->hostid};
84 $self->check_device($dev);
87 $iow->set_hosts(keys %{$self->{seen_hosts
}});
88 #$self->send_to_parent(":monitor_just_ran");
90 # Make sure we sleep for at least 2.5 seconds before running again.
91 # If there's a die above, the monitor will be restarted.
92 Danga
::Socket
->AddTimer(2.5, $main_monitor);
100 print STDERR
"New monitor for db data running\n";
104 my $prev_data = $self->{prev_data
};
105 my $db_data = $self->grab_all_data;
107 # Stack this up to ship back later.
109 $self->diff_data($db_data, $prev_data, $new_data, \
@events);
111 $self->{prev_data
} = $new_data;
112 $self->send_events_to_parent;
113 Danga
::Socket
->AddTimer(4, $db_monitor);
114 print STDERR
"New monitor for db finished\n";
118 # FIXME: Add a "read_from_parent" to ensure we pick up the response for
119 # populating the factories?
120 #$self->read_from_parent;
125 print STDERR
"New monitor running\n";
128 my $dev_factory = MogileFS
::Factory
::Device
->get_factory();
132 # Run check_devices2 to test host/devs. diff against old values.
133 for my $dev ($dev_factory->get_all) {
134 if (my $state = $self->is_iow_diff($dev)) {
135 $self->state_event('device', $dev->id, {utilization
=> $state});
137 $cur_iow->{$dev->id} = $self->{devutil
}->{cur
}->{$dev->id};
138 $self->check_device2($dev, \
@events);
141 $self->{devutil
}->{prev
} = $cur_iow;
142 # Set the IOWatcher hosts (once old monitor code has been disabled)
144 $self->send_events_to_parent;
146 $self->send_to_parent(":monitor_just_ran");
147 Danga
::Socket
->AddTimer(2.5, $new_monitor);
148 print STDERR
"New monitor finished\n";
152 Danga
::Socket
->EventLoop;
155 # --------------------------------------------------------------------------
157 # Flattens and flips events up to the parent. Can be huge on startup!
158 # Events: set type foo=bar&baz=quux
160 # setstate type id foo=bar&baz=quux
161 # Combined: ev_mode=set&ev_type=device&foo=bar
162 # ev_mode=setstate&ev_type=device&ev_id=1&foo=bar
163 sub send_events_to_parent
{
166 for my $ev (@
{$self->{events
}}) {
167 my ($mode, $type, $args) = @
$ev;
168 $args->{ev_mode
} = $mode;
169 $args->{ev_type
} = $type;
170 push(@flat, encode_url_args
($args));
173 $self->{events
} = [];
174 print STDERR
"SENDING STATE CHANGES ", join(' ', ':monitor_events', @flat), "\n";
175 $self->send_to_parent(join(' ', ':monitor_events', @flat));
179 push(@
{$_[0]->{events
}}, $_[1]);
183 # Allow callers to use shorthand
184 $_[3]->{ev_id
} = $_[2];
185 $_[0]->add_event(['set', $_[1], $_[3]]);
# Queue a 'remove' event for the object of kind $type identified by $id.
# The event handed to add_event is the arrayref
#   ['remove', $type, { ev_id => $id }]
# and whatever add_event returns is passed straight back to the caller.
sub remove_event {
    my ($self, $type, $id) = @_;
    my %args = (ev_id => $id);
    return $self->add_event(['remove', $type, \%args]);
}
189 $_[3]->{ev_id
} = $_[2];
190 $_[0]->add_event(['setstate', $_[1], $_[3]]);
194 my ($self, $dev) = @_;
195 my $devid = $dev->id;
196 my $p = $self->{devutil
}->{prev
}->{$devid};
197 my $c = $self->{devutil
}->{cur
}->{$devid};
198 if ( ! defined $p || $p ne $c ) {
205 my ($self, $db_data, $prev_data, $new_data, $ev) = @_;
207 for my $type (keys %{$db_data}) {
208 my $d_data = $db_data->{$type};
209 my $p_data = $prev_data->{$type};
212 for my $item (@
{$d_data}) {
213 my $id = $type eq 'domain' ?
$item->{dmid
}
214 : $type eq 'class' ?
$item->{dmid
} . '-' . $item->{classid
}
215 : $type eq 'host' ?
$item->{hostid
}
216 : $type eq 'device' ?
$item->{devid
} : die "Unknown type";
217 my $old = delete $p_data->{$id};
218 # Special case: for devices, we don't care if mb_asof changes.
219 # FIXME: Change the grab routine (or filter there?).
220 delete $item->{mb_asof
} if $type eq 'device';
221 if (!$old || $self->diff_hash($old, $item)) {
222 $self->set_event($type, $id, { %$item });
224 $n_data->{$id} = $item;
226 for my $id (keys %{$p_data}) {
227 $self->remove_event($type, $id);
230 $new_data->{$type} = $n_data;
234 # returns 1 if the hashes are different.
236 my ($self, $old, $new) = @_;
239 map { $keys{$_}++ } keys %$old, keys %$new;
240 for my $k (keys %keys) {
241 return 1 unless ((exists $old->{$k} &&
242 exists $new->{$k}) &&
243 ( (! defined $old->{$k} && ! defined $new->{$k}) ||
244 ($old->{$k} eq $new->{$k}) )
252 my $sto = Mgd
::get_store
();
254 # Normalize the domain data to the rest to simplify the differ.
255 # FIXME: Once new objects are swapped in, fix the original
256 my %dom = $sto->get_all_domains;
258 while (my ($name, $id) = each %dom) {
259 push(@fixed_dom, { namespace
=> $name, dmid
=> $id });
261 my %ret = ( domain
=> \
@fixed_dom,
262 class => [$sto->get_all_classes],
263 host
=> [$sto->get_all_hosts],
264 device
=> [$sto->get_all_devices], );
270 return $self->{ua
} ||= LWP
::UserAgent
->new(
271 timeout
=> MogileFS
::Config
->config('conn_timeout') || 2,
277 my ($self, $dev, $ev) = @_;
279 my $devid = $dev->id;
280 my $host = $dev->host;
282 my $port = $host->http_port;
283 my $get_port = $host->http_get_port; # || $port;
284 my $hostip = $host->ip;
285 my $url = $dev->usage_url;
287 $self->{seen_hosts
}{$hostip} = 1;
289 # now try to get the data with a short timeout
290 my $timeout = MogileFS
::Config
->config('conn_timeout') || 2;
291 my $start_time = Time
::HiRes
::time();
294 my $response = $ua->get($url);
295 my $res_time = Time
::HiRes
::time();
297 $hostip ||= 'unknown';
298 $get_port ||= 'unknown';
299 $devid ||= 'unknown';
300 $timeout ||= 'unknown';
302 unless ($response->is_success) {
303 my $failed_after = $res_time - $start_time;
304 if ($failed_after < 0.5) {
305 $self->state_event('device', $dev->id, {observed_state
=> 'unreachable'})
306 if (!$dev->observed_unreachable);
307 error
("Port $get_port not listening on $hostip ($url)? Error was: " . $response->status_line);
309 $failed_after = sprintf("%.02f", $failed_after);
310 $self->state_event('host', $dev->hostid, {observed_state
=> 'unreachable'})
311 if (!$host->observed_unreachable);
312 $self->{skip_host
}{$dev->hostid} = 1;
313 error
("Timeout contacting $hostip dev $devid ($url): took $failed_after seconds out of $timeout allowed");
318 # at this point we can reach the host
319 $self->state_event('host', $dev->hostid, {observed_state
=> 'reachable'})
320 if (!$host->observed_reachable);
321 $self->{iow
}->restart_monitoring_if_needed($hostip);
324 my $data = $response->content;
325 foreach (split(/\r?\n/, $data)) {
326 next unless /^(\w+)\s*:\s*(.+)$/;
330 my ($used, $total) = ($stats{used
}, $stats{total
});
331 unless ($used && $total) {
332 $used = "<undef>" unless defined $used;
333 $total = "<undef>" unless defined $total;
334 my $clen = length($data || "");
335 error
("dev$devid reports used = $used, total = $total, content-length: $clen, error?");
339 # only update database every ~15 seconds per device
340 my $last_update = $self->{last_db_update
}{$dev->id} || 0;
341 my $next_update = $last_update + UPDATE_DB_EVERY
;
343 if ($now >= $next_update) {
344 Mgd
::get_store
()->update_device_usage(mb_total
=> int($total / 1024),
345 mb_used
=> int($used / 1024),
347 $self->{last_db_update
}{$devid} = $now;
350 # next if we're not going to try this now
351 # FIXME: Uncomment this to throttle test writes again.
352 #return if ($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now;
353 $self->{last_test_write
}{$devid} = $now;
355 # now we want to check if this device is writeable
357 # first, create the test-write directory. this will return
358 # immediately after the first time, as the 'create_directory'
359 # function caches what it's already created.
360 $dev->create_directory("/dev$devid/test-write");
362 my $num = int(rand 100); # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files
363 my $puturl = "http://$hostip:$port/dev$devid/test-write/test-write-$num";
364 my $content = "time=$now rand=$num";
365 my $req = HTTP
::Request
->new(PUT
=> $puturl);
366 $req->content($content);
368 # TODO: guard against race-conditions with double-check on failure
370 # now, depending on what happens
371 my $resp = $ua->request($req);
372 if ($resp->is_success) {
373 # now let's get it back to verify; note we use the get_port to verify that
374 # the distinction works (if we have one)
375 my $geturl = "http://$hostip:$get_port/dev$devid/test-write/test-write-$num";
376 my $testwrite = $ua->get($geturl);
378 # if success and the content matches, mark it writeable
379 if ($testwrite->is_success && $testwrite->content eq $content) {
380 $self->state_event('device', $devid, {observed_state
=> 'writeable'})
381 if (!$dev->observed_writeable);
382 debug
("dev$devid: used = $used, total = $total, writeable = 1");
387 # if we fall through to here, then we know that something is not so good, so mark it readable
388 # which is guaranteed given we even tested writeability
389 $self->state_event('device', $devid, {observed_state
=> 'readable'})
390 if (!$dev->observed_readable);
391 debug
("dev$devid: used = $used, total = $total, writeable = 0");
395 my ($self, $dev) = @_;
397 my $devid = $dev->id;
398 my $host = $dev->host;
400 my $port = $host->http_port;
401 my $get_port = $host->http_get_port; # || $port;
402 my $hostip = $host->ip;
403 my $url = $dev->usage_url;
405 $self->{seen_hosts
}{$hostip} = 1;
407 # now try to get the data with a short timeout
408 my $timeout = MogileFS
::Config
->config('conn_timeout') || 2;
409 my $start_time = Time
::HiRes
::time();
412 my $response = $ua->get($url);
413 my $res_time = Time
::HiRes
::time();
415 $hostip ||= 'unknown';
416 $get_port ||= 'unknown';
417 $devid ||= 'unknown';
418 $timeout ||= 'unknown';
420 unless ($response->is_success) {
421 my $failed_after = $res_time - $start_time;
422 if ($failed_after < 0.5) {
423 $self->broadcast_device_unreachable($dev->id);
424 error
("Port $get_port not listening on $hostip ($url)? Error was: " . $response->status_line);
426 $failed_after = sprintf("%.02f", $failed_after);
427 $self->broadcast_host_unreachable($dev->hostid);
428 $self->{skip_host
}{$dev->hostid} = 1;
429 error
("Timeout contacting $hostip dev $devid ($url): took $failed_after seconds out of $timeout allowed");
434 # at this point we can reach the host
435 $self->broadcast_host_reachable($dev->hostid);
436 $self->{iow
}->restart_monitoring_if_needed($hostip);
439 my $data = $response->content;
440 foreach (split(/\r?\n/, $data)) {
441 next unless /^(\w+)\s*:\s*(.+)$/;
445 my ($used, $total) = ($stats{used
}, $stats{total
});
446 unless ($used && $total) {
447 $used = "<undef>" unless defined $used;
448 $total = "<undef>" unless defined $total;
449 my $clen = length($data || "");
450 error
("dev$devid reports used = $used, total = $total, content-length: $clen, error?");
454 # only update database every ~15 seconds per device
455 my $last_update = $self->{last_db_update
}{$dev->id} || 0;
456 my $next_update = $last_update + UPDATE_DB_EVERY
;
458 if ($now >= $next_update) {
459 Mgd
::get_store
()->update_device_usage(mb_total
=> int($total / 1024),
460 mb_used
=> int($used / 1024),
462 $self->{last_db_update
}{$devid} = $now;
465 # skip the write test if one was attempted within the last UPDATE_DB_EVERY seconds
466 return if ($self->{last_test_write
}{$devid} || 0) + UPDATE_DB_EVERY
> $now;
467 $self->{last_test_write
}{$devid} = $now;
469 # now we want to check if this device is writeable
471 # first, create the test-write directory. this will return
472 # immediately after the first time, as the 'create_directory'
473 # function caches what it's already created.
474 $dev->create_directory("/dev$devid/test-write");
476 my $num = int(rand 100); # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files
477 my $puturl = "http://$hostip:$port/dev$devid/test-write/test-write-$num";
478 my $content = "time=$now rand=$num";
479 my $req = HTTP
::Request
->new(PUT
=> $puturl);
480 $req->content($content);
482 # TODO: guard against race-conditions with double-check on failure
484 # now, depending on what happens
485 my $resp = $ua->request($req);
486 if ($resp->is_success) {
487 # now let's get it back to verify; note we use the get_port to verify that
488 # the distinction works (if we have one)
489 my $geturl = "http://$hostip:$get_port/dev$devid/test-write/test-write-$num";
490 my $testwrite = $ua->get($geturl);
492 # if success and the content matches, mark it writeable
493 if ($testwrite->is_success && $testwrite->content eq $content) {
494 $self->broadcast_device_writeable($devid);
495 debug
("dev$devid: used = $used, total = $total, writeable = 1");
500 # if we fall through to here, then we know that something is not so good, so mark it readable
501 # which is guaranteed given we even tested writeability
502 $self->broadcast_device_readable($devid);
503 debug
("dev$devid: used = $used, total = $total, writeable = 0");
511 # indent-tabs-mode: nil