Convert to new Device objects.
[MogileFS-Server.git] / lib / MogileFS / Worker / Monitor.pm
blobf76d4935ecb8b95f22e3d107c09d51af5b5773b4
package MogileFS::Worker::Monitor;
use strict;
use warnings;

use base 'MogileFS::Worker';
use fields (
    'last_db_update',   # devid -> time. update db less often than poll interval.
    'last_test_write',  # devid -> time. time we last tried writing to a device.
    'skip_host',        # hostid -> 1 if already noted dead (reset every loop)
    'seen_hosts',       # IP -> 1 (reset every loop)
    'ua',               # LWP::UserAgent for checking usage files
    'iow',              # MogileFS::IOStatWatcher object
    'prev_data',        # DB data from previous run
    'devutil',          # Running tally of device utilization
    'events',           # Queue of state events
);

use Danga::Socket 1.56;
# Used directly by ua(), check_device() and check_device2(); load them
# here rather than relying on some other module having done so.
use HTTP::Request;
use LWP::UserAgent;
use Time::HiRes ();    # empty import list: keep the builtin integer time()
use MogileFS::Config;
use MogileFS::Util qw(error debug encode_url_args);
use MogileFS::IOStatWatcher;

# Minimum seconds between per-device usage updates written to the DB
# (check_device also uses it to throttle test writes).
use constant UPDATE_DB_EVERY => 15;
# Builds the monitor worker object. $psock is passed straight through to
# the MogileFS::Worker constructor; every bookkeeping field starts empty.
sub new {
    my ($class, $psock) = @_;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    # Per-device timestamps used for throttling.
    $self->{$_} = {} for qw(last_db_update last_test_write);

    $self->{iow}       = MogileFS::IOStatWatcher->new;
    # Snapshot of the previous DB poll: type => { id => row }.
    $self->{prev_data} = { map { $_ => {} } qw(domain class host device) };
    # Device utilization seen in the current and the previous pass.
    $self->{devutil}   = { cur => {}, prev => {} };
    # Pending [mode, type, args] events for send_events_to_parent.
    $self->{events}    = [];

    return $self;
}
# Watchdog timeout for this worker; presumably seconds without a
# parent_ping before the worker is considered hung (see MogileFS::Worker).
sub watchdog_timeout {
    return 30;
}
# Main loop of the monitor worker. Schedules three self-rescheduling
# Danga::Socket timers, then enters the event loop:
#
#   main_monitor - legacy per-device checks via check_device (every 2.5s)
#   db_monitor   - diffs domain/class/host/device rows against the previous
#                  poll and ships set/remove events to the parent (every 4s)
#   new_monitor  - event-based device checks via check_device2, plus
#                  utilization change events (every 2.5s)
#
# IOStatWatcher callbacks record per-device utilization into
# $self->{devutil}{cur}; new_monitor compares it with the previous pass.
sub work {
    my $self = shift;

    # we just forked from our parent process, also using Danga::Socket,
    # so we need to lose all that state and start afresh.
    Danga::Socket->Reset;

    my $iow = $self->{iow};
    $iow->on_stats(sub {
        my ($hostname, $stats) = @_;

        while (my ($devid, $util) = each %$stats) {
            # Lets not propagate devices that we accidentally find.
            # This does hit the DB every time a device does not exist, so
            # perhaps should add negative caching in the future.
            $self->{devutil}->{cur}->{$devid} = $util;
            my $dev = Mgd::device_factory()->get_by_id($devid);
            next unless $dev;
            $dev->set_observed_utilization($util);
        }
    });

    my $main_monitor;
    $main_monitor = sub {
        $self->parent_ping;

        # get db and note we're starting a run
        debug("Monitor running; scanning usage files");
        $self->validate_dbh;

        $self->{skip_host}  = {}; # hostid -> 1 if already noted dead.
        $self->{seen_hosts} = {}; # IP -> 1

        # now iterate over devices
        MogileFS::Device->invalidate_cache;
        MogileFS::Host->invalidate_cache;

        foreach my $dev (MogileFS::Device->devices) {
            next unless $dev->dstate->should_monitor;
            next if $self->{skip_host}{$dev->hostid};
            $self->check_device($dev);
        }

        $iow->set_hosts(keys %{$self->{seen_hosts}});
        #$self->send_to_parent(":monitor_just_ran");

        # Make sure we sleep for at least 2.5 seconds before running again.
        # If there's a die above, the monitor will be restarted.
        Danga::Socket->AddTimer(2.5, $main_monitor);
    };

    $main_monitor->();

    my $db_monitor;
    $db_monitor = sub {
        $self->parent_ping;
        debug("New monitor for db data running");
        $self->validate_dbh;

        my $new_data  = {};
        my $prev_data = $self->{prev_data};
        my $db_data   = $self->grab_all_data;

        # Stack this up to ship back later.
        my @events = ();
        $self->diff_data($db_data, $prev_data, $new_data, \@events);

        $self->{prev_data} = $new_data;
        $self->send_events_to_parent;
        Danga::Socket->AddTimer(4, $db_monitor);
        debug("New monitor for db finished");
    };

    $db_monitor->();
    # FIXME: Add a "read_from_parent" to ensure we pick up the response for
    # populating the factories?
    #$self->read_from_parent;

    my $new_monitor;
    $new_monitor = sub {
        $self->parent_ping;
        debug("New monitor running");
        $self->validate_dbh;

        # check_device2 records hosts that timed out in skip_host; start
        # every pass with a clean slate so such hosts are retried next time.
        $self->{skip_host} = {};

        my $dev_factory = MogileFS::Factory::Device->get_factory();

        my $cur_iow = {};
        my @events = ();
        # Run check_devices2 to test host/devs. diff against old values.
        for my $dev ($dev_factory->get_all) {
            # is_iow_diff returns undef when nothing changed, otherwise the
            # new value -- which can legitimately be 0, so test definedness
            # rather than truth.
            my $state = $self->is_iow_diff($dev);
            if (defined $state) {
                $self->state_event('device', $dev->id, {utilization => $state});
            }
            $cur_iow->{$dev->id} = $self->{devutil}->{cur}->{$dev->id};

            # Don't wait out another connect timeout for every remaining
            # device on a host that already timed out during this pass.
            next if $self->{skip_host}{$dev->hostid};
            $self->check_device2($dev, \@events);
        }

        $self->{devutil}->{prev} = $cur_iow;
        # Set the IOWatcher hosts (once old monitor code has been disabled)

        $self->send_events_to_parent;

        $self->send_to_parent(":monitor_just_ran");
        Danga::Socket->AddTimer(2.5, $new_monitor);
        debug("New monitor finished");
    };

    $new_monitor->();
    Danga::Socket->EventLoop;
}
155 # --------------------------------------------------------------------------
# Flattens the queued events and ships them to the parent as a single
# ":monitor_events" message. Can be huge on startup, when every DB row is
# reported as new.
#
# Queued events (see add_event) are [mode, type, args] triples:
#   set      type  foo=bar&baz=quux     (args carry ev_id)
#   remove   type  id                   (args are just { ev_id => id })
#   setstate type  id foo=bar&baz=quux  (args carry ev_id)
# Each one is URL-encoded with ev_mode/ev_type folded into its args, e.g.
#   ev_mode=set&ev_type=device&foo=bar
#   ev_mode=setstate&ev_type=device&ev_id=1&foo=bar
# When the queue is empty nothing is sent.
sub send_events_to_parent {
    my $self = shift;

    my @flat;
    for my $ev (@{ $self->{events} }) {
        my ($mode, $type, $args) = @$ev;
        # Encode a copy so the queued args hash itself is left untouched.
        push @flat, encode_url_args({ %$args, ev_mode => $mode, ev_type => $type });
    }
    return unless @flat;

    $self->{events} = [];
    my $msg = join(' ', ':monitor_events', @flat);
    # Debug-level only: this can be every DB row, and it is sent often.
    debug("SENDING STATE CHANGES $msg");
    $self->send_to_parent($msg);
}
# Appends one [mode, type, args] event to the queue flushed by
# send_events_to_parent. Returns the new queue length.
sub add_event {
    my ($self, $event) = @_;
    push @{ $self->{events} }, $event;
}
# Queues a 'set' event for object $id of $type. The caller's $args
# hashref is tagged in place with ev_id => $id and queued as-is.
sub set_event {
    my ($self, $type, $id, $args) = @_;
    $args->{ev_id} = $id;
    return $self->add_event(['set', $type, $args]);
}
# Queues a 'remove' event whose args carry only the vanished object's id.
sub remove_event {
    my ($self, $type, $id) = @_;
    return $self->add_event(['remove', $type, { ev_id => $id }]);
}
# Queues a 'setstate' event for object $id of $type. Like set_event, the
# caller's $args hashref is tagged in place with ev_id => $id.
sub state_event {
    my ($self, $type, $id, $args) = @_;
    $args->{ev_id} = $id;
    return $self->add_event(['setstate', $type, $args]);
}
# Compares $dev's utilization from this pass ($self->{devutil}{cur}) with
# the previous pass ($self->{devutil}{prev}).
#
# Returns the current value when it is new or differs (string compare)
# from the previous one, otherwise undef. A device with no current
# reading always yields undef. The returned value may be 0, so callers
# should check it with defined() rather than for truth.
sub is_iow_diff {
    my ($self, $dev) = @_;
    my $devid = $dev->id;

    my $cur = $self->{devutil}->{cur}->{$devid};
    # Nothing to report, and avoids an "uninitialized value" warning.
    return unless defined $cur;

    my $prev = $self->{devutil}->{prev}->{$devid};
    return $cur if !defined $prev || $prev ne $cur;
    return;
}
# Diffs freshly grabbed DB rows against the previous snapshot.
#
#   $db_data   - type => [ row hashref, ... ]   (from grab_all_data)
#   $prev_data - type => { id => row }          (consumed: matched ids are
#                deleted from it)
#   $new_data  - filled with type => { id => row } for the next call
#   $ev        - accepted but unused; events are queued via set_event and
#                remove_event instead.
#
# Queues a 'set' event (with a copy of the row) for each new or changed
# row, and a 'remove' event for each previous id no longer present.
sub diff_data {
    my ($self, $db_data, $prev_data, $new_data, $ev) = @_;

    # How the rows of each type are keyed.
    my %id_for = (
        domain => sub { $_[0]->{dmid} },
        class  => sub { $_[0]->{dmid} . '-' . $_[0]->{classid} },
        host   => sub { $_[0]->{hostid} },
        device => sub { $_[0]->{devid} },
    );

    foreach my $type (keys %$db_data) {
        my $prev_rows = $prev_data->{$type};
        my %snapshot;

        foreach my $row (@{ $db_data->{$type} }) {
            my $id_of = $id_for{$type} or die "Unknown type";
            my $id = $id_of->($row);
            my $before = delete $prev_rows->{$id};
            # Special case: for devices, we don't care if mb_asof changes.
            # FIXME: Change the grab routine (or filter there?).
            delete $row->{mb_asof} if $type eq 'device';
            $self->set_event($type, $id, { %$row })
                if !$before || $self->diff_hash($before, $row);
            $snapshot{$id} = $row;
        }

        # Anything left unmatched has disappeared from the DB.
        $self->remove_event($type, $_) for keys %$prev_rows;

        $new_data->{$type} = \%snapshot;
    }
}
# Returns 1 if two flat hashes differ, 0 if they are the same.
#
# The hashes are the same when they have exactly the same keys and, for
# every key, the values are either both undef or string-equal (eq). An
# undef value (a NULL column) never matches a defined one, not even ''.
sub diff_hash {
    my ($self, $old, $new) = @_;

    my %keys = map { $_ => 1 } keys %$old, keys %$new;
    for my $k (keys %keys) {
        # A key present on only one side is a difference.
        return 1 unless exists $old->{$k} && exists $new->{$k};

        my ($was, $now) = ($old->{$k}, $new->{$k});
        next if !defined $was && !defined $now;
        # NULL <-> any value (including '') is a real change.
        return 1 if !defined $was || !defined $now;
        return 1 if $was ne $now;
    }
    return 0;
}
# Reads every domain, class, host and device from the store. Returns a
# hashref of type => [ row hashref, ... ] for domain, class, host and
# device, which is the shape diff_data expects.
sub grab_all_data {
    my $self  = shift;
    my $store = Mgd::get_store();

    # Normalize the domain data to the rest to simplify the differ:
    # get_all_domains yields name => dmid pairs, so turn each pair into a
    # row hash like the other types use.
    # FIXME: Once new objects are swapped in, fix the original
    my %dmid_of = $store->get_all_domains;
    my @domains = map { +{ namespace => $_, dmid => $dmid_of{$_} } } keys %dmid_of;

    return {
        domain => \@domains,
        class  => [ $store->get_all_classes ],
        host   => [ $store->get_all_hosts ],
        device => [ $store->get_all_devices ],
    };
}
# Returns the LWP::UserAgent used for usage fetches and test writes,
# creating and caching it on first use. Its timeout is taken from the
# conn_timeout config (default 2) at creation time; keep_alive is 20.
sub ua {
    my $self = shift;
    unless ($self->{ua}) {
        my $timeout = MogileFS::Config->config('conn_timeout') || 2;
        $self->{ua} = LWP::UserAgent->new(
            timeout    => $timeout,
            keep_alive => 20,
        );
    }
    return $self->{ua};
}
# Event-based variant of check_device, called from new_monitor.
#
# Fetches $dev's usage file. On failure it queues an 'unreachable' state
# event for the device (failure in under 0.5s) or for its host (slower
# failure; the host is also recorded in skip_host). On success it queues
# 'reachable' for the host, writes usage to the DB at most every
# UPDATE_DB_EVERY seconds, then PUTs a small test file and GETs it back,
# queueing 'writeable' or 'readable' for the device. Each state event is
# only queued when the object's observed_* flag doesn't already match.
# $ev is accepted but unused; events go through state_event.
sub check_device2 {
    my ($self, $dev, $ev) = @_;

    my $devid    = $dev->id;
    my $host     = $dev->host;
    my $port     = $host->http_port;
    my $get_port = $host->http_get_port; # || $port;
    my $hostip   = $host->ip;
    my $url      = $dev->usage_url;

    $self->{seen_hosts}{$hostip} = 1;

    # Fetch the usage file, timing how long the attempt took.
    my $timeout    = MogileFS::Config->config('conn_timeout') || 2;
    my $started_at = Time::HiRes::time();
    my $ua         = $self->ua;
    my $usage      = $ua->get($url);
    my $elapsed    = Time::HiRes::time() - $started_at;

    # Placeholders so the log messages below never interpolate undef.
    $_ ||= 'unknown' for $hostip, $get_port, $devid, $timeout, $url;

    if (!$usage->is_success) {
        if ($elapsed < 0.5) {
            # A quick failure is blamed on the device alone.
            $self->state_event('device', $dev->id, {observed_state => 'unreachable'})
                unless $dev->observed_unreachable;
            error("Port $get_port not listening on $hostip ($url)? Error was: " . $usage->status_line);
        }
        else {
            # A slow failure is blamed on the whole host.
            my $took = sprintf("%.02f", $elapsed);
            $self->state_event('host', $dev->hostid, {observed_state => 'unreachable'})
                unless $host->observed_unreachable;
            $self->{skip_host}{$dev->hostid} = 1;
            error("Timeout contacting $hostip dev $devid ($url): took $took seconds out of $timeout allowed");
        }
        return;
    }

    # The usage file came back, so the host is reachable.
    $self->state_event('host', $dev->hostid, {observed_state => 'reachable'})
        unless $host->observed_reachable;
    $self->{iow}->restart_monitoring_if_needed($hostip);

    # "key: value" lines; later duplicates win.
    my $data  = $usage->content;
    my %stats = map { /^(\w+)\s*:\s*(.+)$/ ? ($1 => $2) : () } split(/\r?\n/, $data);

    my ($used, $total) = @stats{qw(used total)};
    if (!($used && $total)) {
        $used  = "<undef>" unless defined $used;
        $total = "<undef>" unless defined $total;
        my $clen = length($data || "");
        error("dev$devid reports used = $used, total = $total, content-length: $clen, error?");
        return;
    }

    # Write usage to the DB no more than once per UPDATE_DB_EVERY seconds.
    my $last_update = $self->{last_db_update}{$dev->id} || 0;
    my $now = time();
    if ($now >= $last_update + UPDATE_DB_EVERY) {
        Mgd::get_store()->update_device_usage(
            mb_total => int($total / 1024),
            mb_used  => int($used / 1024),
            devid    => $devid,
        );
        $self->{last_db_update}{$devid} = $now;
    }

    # FIXME: Uncomment this to throttle test writes again.
    #return if ($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now;
    $self->{last_test_write}{$devid} = $now;

    # Writeability probe. create_directory caches what it has already
    # created, so after the first time this returns immediately.
    $dev->create_directory("/dev$devid/test-write");

    # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files
    my $num     = int(rand 100);
    my $content = "time=$now rand=$num";
    my $put     = HTTP::Request->new(PUT => "http://$hostip:$port/dev$devid/test-write/test-write-$num");
    $put->content($content);

    # TODO: guard against race-conditions with double-check on failure
    if ($ua->request($put)->is_success) {
        # Read it back through get_port so that path is exercised as well.
        my $readback = $ua->get("http://$hostip:$get_port/dev$devid/test-write/test-write-$num");
        if ($readback->is_success && $readback->content eq $content) {
            $self->state_event('device', $devid, {observed_state => 'writeable'})
                unless $dev->observed_writeable;
            debug("dev$devid: used = $used, total = $total, writeable = 1");
            return;
        }
    }

    # Reachable (the usage file was served) but the write round-trip
    # failed, so the device is only readable.
    $self->state_event('device', $devid, {observed_state => 'readable'})
        unless $dev->observed_readable;
    debug("dev$devid: used = $used, total = $total, writeable = 0");
}
# Legacy per-device check, called from main_monitor.
#
# Fetches $dev's usage file. On failure it broadcasts the device as
# unreachable (failure in under 0.5s) or its host as unreachable (slower
# failure; the host is also recorded in skip_host so main_monitor skips
# its other devices this pass). On success it broadcasts the host as
# reachable and writes usage to the DB at most every UPDATE_DB_EVERY
# seconds. At most that often, it also PUTs a small test file, GETs it
# back, and broadcasts the device as writeable or readable.
sub check_device {
    my ($self, $dev) = @_;

    my $devid    = $dev->id;
    my $host     = $dev->host;

    my $port     = $host->http_port;
    my $get_port = $host->http_get_port; # || $port;
    my $hostip   = $host->ip;
    my $url      = $dev->usage_url;

    $self->{seen_hosts}{$hostip} = 1;

    # now try to get the data with a short timeout (the UA itself is
    # built with the same conn_timeout setting, see ua())
    my $timeout    = MogileFS::Config->config('conn_timeout') || 2;
    my $start_time = Time::HiRes::time();

    my $ua       = $self->ua;
    my $response = $ua->get($url);
    my $res_time = Time::HiRes::time();

    # Placeholders so the log messages below never interpolate undef.
    $hostip   ||= 'unknown';
    $get_port ||= 'unknown';
    $devid    ||= 'unknown';
    $timeout  ||= 'unknown';
    $url      ||= 'unknown';

    unless ($response->is_success) {
        my $failed_after = $res_time - $start_time;
        if ($failed_after < 0.5) {
            # A quick failure is blamed on the device alone.
            $self->broadcast_device_unreachable($dev->id);
            error("Port $get_port not listening on $hostip ($url)? Error was: " . $response->status_line);
        }
        else {
            # A slow failure is blamed on the whole host; skip its other
            # devices for the rest of this pass.
            $failed_after = sprintf("%.02f", $failed_after);
            $self->broadcast_host_unreachable($dev->hostid);
            $self->{skip_host}{$dev->hostid} = 1;
            error("Timeout contacting $hostip dev $devid ($url): took $failed_after seconds out of $timeout allowed");
        }
        return;
    }

    # at this point we can reach the host
    $self->broadcast_host_reachable($dev->hostid);
    $self->{iow}->restart_monitoring_if_needed($hostip);

    # "key: value" lines; later duplicates win.
    my %stats;
    my $data = $response->content;
    foreach my $line (split(/\r?\n/, $data)) {
        next unless $line =~ /^(\w+)\s*:\s*(.+)$/;
        $stats{$1} = $2;
    }

    my ($used, $total) = ($stats{used}, $stats{total});
    unless ($used && $total) {
        $used  = "<undef>" unless defined $used;
        $total = "<undef>" unless defined $total;
        my $clen = length($data || "");
        error("dev$devid reports used = $used, total = $total, content-length: $clen, error?");
        return;
    }

    # only update database every ~15 seconds per device. The throttle
    # maps are read and written with the same key ($devid).
    my $last_update = $self->{last_db_update}{$devid} || 0;
    my $next_update = $last_update + UPDATE_DB_EVERY;
    my $now = time();
    if ($now >= $next_update) {
        Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
                                              mb_used  => int($used / 1024),
                                              devid    => $devid);
        $self->{last_db_update}{$devid} = $now;
    }

    # skip the test write if one was done within the last UPDATE_DB_EVERY seconds
    return if ($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now;
    $self->{last_test_write}{$devid} = $now;

    # now we want to check if this device is writeable

    # first, create the test-write directory. this will return
    # immediately after the first time, as the 'create_directory'
    # function caches what it's already created.
    $dev->create_directory("/dev$devid/test-write");

    my $num = int(rand 100); # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files
    my $puturl = "http://$hostip:$port/dev$devid/test-write/test-write-$num";
    my $content = "time=$now rand=$num";
    my $req = HTTP::Request->new(PUT => $puturl);
    $req->content($content);

    # TODO: guard against race-conditions with double-check on failure

    # now, depending on what happens
    my $resp = $ua->request($req);
    if ($resp->is_success) {
        # now let's get it back to verify; note we use the get_port to verify that
        # the distinction works (if we have one)
        my $geturl = "http://$hostip:$get_port/dev$devid/test-write/test-write-$num";
        my $testwrite = $ua->get($geturl);

        # if success and the content matches, mark it writeable
        if ($testwrite->is_success && $testwrite->content eq $content) {
            $self->broadcast_device_writeable($devid);
            debug("dev$devid: used = $used, total = $total, writeable = 1");
            return;
        }
    }

    # if we fall through to here, then we know that something is not so good, so mark it readable
    # which is guaranteed given we even tested writeability
    $self->broadcast_device_readable($devid);
    debug("dev$devid: used = $used, total = $total, writeable = 0");
}

# A module must end by returning a true value for use/require to succeed.
1;
508 # Local Variables:
509 # mode: perl
510 # c-basic-indent: 4
511 # indent-tabs-mode: nil
512 # End: