From d8cd470b66ba015b0e55706a3d3ec5fbddc1c6e0 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Tue, 4 Sep 2012 23:21:40 +0000
Subject: [PATCH] monitor: refactor/rewrite to use new async API

In order to migrate to the upcoming Danga::Socket-based HTTP API,
we'll first refactor monitor to use the new API (but preserve LWP
usage behind-the-scenes).  A standalone sketch of the callback
convention appears after the patch.

DEBUG=1 users will see the elapsed time for all device refreshes
each time monitor runs.

While we're at it, also guard against race conditions on the PUT/GET
test by double-checking on failure (a long-standing TODO item).

also squashed the following commit:

use conn_timeout in monitor, node_timeout in other workers

This matches the behavior in MogileFS::Server 2.65.

It makes sense to use a different, lower timeout in monitor so that
overloaded nodes are detected quickly and are not advertised as alive
for a whole monitoring period.

It also makes sense to use a higher value for node_timeout in other
workers, since their actions are less fault-tolerant.  For example, a
timed-out size check in create_close may cause a client to eventually
reupload the file, creating even more load on the cluster.
---
 MANIFEST                          |   1 +
 lib/MogileFS/Connection/Parent.pm |  35 +++
 lib/MogileFS/Host.pm              |  51 ++++
 lib/MogileFS/Worker.pm            |   5 +
 lib/MogileFS/Worker/Monitor.pm    | 462 ++++++++++++++++++++++++++++++++++++------------
 5 files changed, 413 insertions(+), 141 deletions(-)
 create mode 100644 lib/MogileFS/Connection/Parent.pm

diff --git a/MANIFEST b/MANIFEST
index 84c27a6..e7ba10b 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -12,6 +12,7 @@ lib/MogileFS/Class.pm
 lib/MogileFS/Config.pm
 lib/MogileFS/Connection/Client.pm
 lib/MogileFS/Connection/Mogstored.pm
+lib/MogileFS/Connection/Parent.pm
 lib/MogileFS/Connection/Worker.pm
 lib/MogileFS/DevFID.pm
 lib/MogileFS/Device.pm
diff --git a/lib/MogileFS/Connection/Parent.pm b/lib/MogileFS/Connection/Parent.pm
new file mode 100644
index 0000000..d28ffa6
--- /dev/null
+++ b/lib/MogileFS/Connection/Parent.pm
@@ -0,0 +1,35 @@
+package MogileFS::Connection::Parent;
+# maintains a connection in a worker process to the parent ProcManager process
+# Only used by workers that use the Danga::Socket->EventLoop internally
+# (currently only Monitor)
+use warnings;
+use strict;
+use Danga::Socket ();
+use base qw{Danga::Socket};
+use fields qw(worker);
+
+sub new {
+    my ($self, $worker) = @_;
+    $self = fields::new($self) unless ref $self;
+    $self->SUPER::new($worker->psock);
+    $self->{worker} = $worker;
+
+    return $self;
+}
+
+sub ping {
+    my ($self) = @_;
+
+    $self->write(":ping\r\n");
+}
+
+sub event_read {
+    my ($self) = @_;
+
+    $self->{worker}->read_from_parent;
+}
+
+sub event_hup { $_[0]->close }
+sub event_err { $_[0]->close }
+
+1;
diff --git a/lib/MogileFS/Host.pm b/lib/MogileFS/Host.pm
index 49964bc..6b0c560 100644
--- a/lib/MogileFS/Host.pm
+++ b/lib/MogileFS/Host.pm
@@ -6,6 +6,18 @@ use Net::Netmask;
 use Carp qw(croak);
 use MogileFS::Connection::Mogstored;
 
+# temporary...
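+# (the synchronous LWP::UserAgent below is a stopgap: _http_req runs it
+# from zero-second Danga::Socket timers, so callers already program
+# against the asynchronous callback API until a real async client lands)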
+use LWP::UserAgent;
+sub ua {
+    LWP::UserAgent->new(
+        timeout => MogileFS::Config->config('conn_timeout') || 2,
+        keep_alive => 20,
+    );
+}
+
 =head1
 
 MogileFS::Host - host class
 
@@ -100,4 +109,43 @@ sub sidechannel_port {
     MogileFS->config("mogstored_stream_port");
 }
 
+sub _http_req {
+    my ($self, $method, $uri, $opts, $cb) = @_;
+
+    $opts ||= {};
+    my $h = $opts->{headers} || {};
+    my $req = HTTP::Request->new($method, $uri, [ %$h ]);
+    $req->content($opts->{content}) if exists $opts->{content};
+    Danga::Socket->AddTimer(0, sub {
+        my $response = $self->ua->request($req);
+        Danga::Socket->AddTimer(0, sub { $cb->($response) });
+    });
+}
+
+# FIXME: make async
+sub http_get {
+    my ($self, $method, $uri, $opts, $cb) = @_;
+
+    if ($method !~ /\A(?:GET|HEAD)\z/) {
+        die "Bad method for HTTP get port: $method";
+    }
+
+    # convert path-only URL to full URL
+    if ($uri =~ m{\A/}) {
+        $uri = 'http://' . $self->ip . ':' . $self->http_get_port . $uri;
+    }
+    $self->_http_req($method, $uri, $opts, $cb);
+}
+
+# FIXME: make async
+sub http {
+    my ($self, $method, $uri, $opts, $cb) = @_;
+
+    # convert path-only URL to full URL
+    if ($uri =~ m{\A/}) {
+        $uri = 'http://' . $self->ip . ':' . $self->http_port . $uri;
+    }
+    $self->_http_req($method, $uri, $opts, $cb);
+}
+
 1;
diff --git a/lib/MogileFS/Worker.pm b/lib/MogileFS/Worker.pm
index f32ff60..f3a74df 100644
--- a/lib/MogileFS/Worker.pm
+++ b/lib/MogileFS/Worker.pm
@@ -41,6 +41,11 @@ sub psock_fd {
     return fileno($self->{psock});
 }
 
+sub psock {
+    my $self = shift;
+    return $self->{psock};
+}
+
 sub validate_dbh {
     return Mgd::validate_dbh();
 }
diff --git a/lib/MogileFS/Worker/Monitor.pm b/lib/MogileFS/Worker/Monitor.pm
index beb18f6..4653d88 100644
--- a/lib/MogileFS/Worker/Monitor.pm
+++ b/lib/MogileFS/Worker/Monitor.pm
@@ -5,14 +5,21 @@ use warnings;
 use base 'MogileFS::Worker';
 use fields (
     'last_test_write', # devid -> time. time we last tried writing to a device.
+    'monitor_start',   # main monitor start time
     'skip_host',       # hostid -> 1 if already noted dead (reset every loop)
     'seen_hosts',      # IP -> 1 (reset every loop)
-    'ua',              # LWP::UserAgent for checking usage files
     'iow',             # MogileFS::IOStatWatcher object
     'prev_data',       # DB data from previous run
     'devutil',         # Running tally of device utilization
     'events',          # Queue of state events
+    'refresh_state',   # devid -> { used, total, callbacks }; temporary data for each refresh run
     'have_masterdb',   # Hint flag for if the master DB is available
+    'updateable_devices', # devid -> Device; cached rows to avoid excessive device table updates
+    'parent',          # socketpair to parent process
+    'refresh_pending', # set (to a timestamp) if a refresh was manually requested
+    'db_monitor_ran',  # We announce "monitor_just_ran" every time the
+                       # device checks are run, but only if the DB has
+                       # been checked in between.
 );
 
 use Danga::Socket 1.56;
@@ -20,6 +27,7 @@ use MogileFS::Config;
 use MogileFS::Util qw(error debug encode_url_args apply_state_events_list);
 use MogileFS::IOStatWatcher;
 use MogileFS::Server;
+use MogileFS::Connection::Parent;
 use Digest::MD5 qw(md5_base64);
 
 use constant UPDATE_DB_EVERY => 15;
@@ -33,7 +41,7 @@ sub new {
     $self->{iow} = MogileFS::IOStatWatcher->new;
     $self->{prev_data} = { domain => {}, class => {}, host => {}, device => {} };
-    $self->{devutil} = { cur => {}, prev => {} };
+    $self->{devutil} = { cur => {}, prev => {}, tmp => {} };
     $self->{events} = [];
     $self->{have_masterdb} = 0;
     return $self;
@@ -43,9 +51,16 @@ sub watchdog_timeout {
     30;
 }
 
+# returns 1 if a DB update was attempted
+# returns 0 immediately if the (device) monitor is already running
 sub cache_refresh {
     my $self = shift;
 
+    if ($self->{refresh_state}) {
+        debug("Monitor run in progress, will not check for DB updates");
+        return 0;
+    }
+
     debug("Monitor running; checking DB for updates");
     # "Fix" our local cache of this flag, so we always check the master DB.
     MogileFS::Config->cache_server_setting('_master_db_alive', 1);
@@ -67,56 +82,113 @@ sub cache_refresh {
     }
 
     $self->send_events_to_parent;
+    $self->{db_monitor_ran} = 1;
+
+    return 1;
 }
 
 sub usage_refresh {
-    my $self = shift;
+    my ($self) = @_;
+
+    # prevent concurrent refresh
+    return if $self->{refresh_state};
+
     debug("Monitor running; scanning usage files");
+
+    $self->{refresh_state} = {}; # devid -> ...
+    $self->{monitor_start} = Time::HiRes::time();
+
     my $have_dbh = $self->validate_dbh;
-    my $updateable_devices;
 
     # See if we should be allowed to update the device table rows.
     if ($have_dbh && Mgd::get_store()->get_lock('mgfs:device_update', 0)) {
         # Fetch the freshest list of entries, to avoid excessive writes.
-        $updateable_devices = { map { $_->{devid} => $_ }
+        $self->{updateable_devices} = { map { $_->{devid} => $_ }
             Mgd::get_store()->get_all_devices };
+    } else {
+        $self->{updateable_devices} = undef;
     }
 
     $self->{skip_host}  = {}; # hostid -> 1 if already noted dead.
     $self->{seen_hosts} = {}; # IP -> 1
 
     my $dev_factory = MogileFS::Factory::Device->get_factory();
+    my $devutil = $self->{devutil};
 
-    my $cur_iow   = {};
-    # Run check_devices to test host/devs. diff against old values.
+    $devutil->{tmp} = {};
+    # kick off check_device_begin to test host/devs. diff against old values.
     for my $dev ($dev_factory->get_all) {
         if (my $state = $self->is_iow_diff($dev)) {
             $self->state_event('device', $dev->id, {utilization => $state});
         }
-        $cur_iow->{$dev->id} = $self->{devutil}->{cur}->{$dev->id};
-        next if $self->{skip_host}{$dev->hostid};
-        $self->check_device($dev, $have_dbh, $updateable_devices)
-            if $dev->can_read_from;
-        $self->still_alive; # Ping parent if needed so we don't time out
-                            # given lots of devices.
+        $devutil->{tmp}->{$dev->id} = $devutil->{cur}->{$dev->id};
+
+        $dev->can_read_from or next;
+        $self->check_device_begin($dev);
     }
+    # we're done if we didn't schedule any work
+    $self->usage_refresh_done unless keys %{$self->{refresh_state}};
+}
+
+sub usage_refresh_done {
+    my ($self) = @_;
 
-    if ($have_dbh && $updateable_devices) {
+    if ($self->{updateable_devices}) {
         Mgd::get_store()->release_lock('mgfs:device_update');
+        $self->{updateable_devices} = undef;
     }
 
-    $self->{devutil}->{prev} = $cur_iow;
+    $self->{devutil}->{prev} = $self->{devutil}->{tmp};
     # Set the IOWatcher hosts (once old monitor code has been disabled)
 
     $self->send_events_to_parent;
 
     $self->{iow}->set_hosts(keys %{$self->{seen_hosts}});
+
+    foreach my $devid (keys %{$self->{refresh_state}}) {
+        error("device check incomplete for dev$devid");
+    }
+
+    my $start = delete $self->{monitor_start};
+    my $elapsed = Time::HiRes::time() - $start;
+    debug("device refresh finished after $elapsed");
+
+    $self->{refresh_state} = undef;
+    my $pending_since = $self->{refresh_pending};
+
+    # schedule another usage_refresh immediately if somebody requested it.
+    # Don't announce :monitor_just_ran if somebody requested a refresh
+    # while we were running, since we could've been refreshing on a stale DB
+    if ($pending_since && $pending_since > $start) {
+        # use AddTimer to schedule the refresh and avoid stack overflow,
+        # since usage_refresh can call usage_refresh_done directly if
+        # there are no devices
+        Danga::Socket->AddTimer(0, sub {
+            $self->cache_refresh;
+            $self->usage_refresh;
+        });
+    }
+
+    # announce we're done if we ran on schedule, or we had a
+    # forced refresh that was requested before we started.
+    if (!$pending_since || $pending_since <= $start) {
+        # totally done refreshing, accept manual refresh requests again
+        $self->{parent}->watch_read(1);
+        delete $self->{refresh_pending};
+        if (delete $self->{db_monitor_ran} || $pending_since) {
+            $self->send_to_parent(":monitor_just_ran");
+        }
+    }
 }
 
 sub work {
     my $self = shift;
 
+    # It makes sense to have monitor use a shorter timeout
+    # (conn_timeout) across the board to skip slow hosts.  Other workers
+    # are less tolerant, and may use a higher value in node_timeout.
+    MogileFS::Config->set_config_no_broadcast("node_timeout", MogileFS::Config->config("conn_timeout"));
+
     my $iow = $self->{iow};
     $iow->on_stats(sub {
        my ($hostname, $stats) = @_;
@@ -129,15 +201,17 @@ sub work {
        }
     });
 
-    # We announce "monitor_just_ran" every time the device checks are run, but
-    # only if the DB has been checked inbetween.
-    my $db_monitor_ran = 0;
-
     my $db_monitor;
     $db_monitor = sub {
-        $self->parent_ping;
-        $self->cache_refresh;
-        $db_monitor_ran++;
+        $self->still_alive;
+
+        # reschedule immediately if we were blocked by main_monitor.
+        # setting refresh_pending makes usage_refresh_done re-run cache_refresh
+        if (!$self->cache_refresh) {
+            $self->{refresh_pending} ||= Time::HiRes::time();
+        }
+
+        # always reschedule in 4 seconds, regardless
         Danga::Socket->AddTimer(4, $db_monitor);
     };
 
@@ -146,17 +220,14 @@ sub work {
     $db_monitor->();
 
     my $main_monitor;
     $main_monitor = sub {
-        $self->parent_ping;
+        $self->{parent}->ping;
         $self->usage_refresh;
-        if ($db_monitor_ran) {
-            $self->send_to_parent(":monitor_just_ran");
-            $db_monitor_ran = 0;
-        }
         Danga::Socket->AddTimer(2.5, $main_monitor);
     };
 
-    $main_monitor->();
-    Danga::Socket->AddOtherFds($self->psock_fd, sub{ $self->read_from_parent });
+    $self->parent_ping; # ensure we get the initial DB state back
+    $self->{parent} = MogileFS::Connection::Parent->new($self);
+    Danga::Socket->AddTimer(0, $main_monitor);
     Danga::Socket->EventLoop;
 }
 
@@ -164,9 +235,14 @@ sub process_line {
     my MogileFS::Worker::Monitor $self = shift;
     my $lineref = shift;
     if ($$lineref =~ /^:refresh_monitor$/) {
-        $self->cache_refresh;
-        $self->usage_refresh;
-        $self->send_to_parent(":monitor_just_ran");
+        if ($self->cache_refresh) {
+            $self->usage_refresh;
+        } else {
+            $self->{refresh_pending} ||= Time::HiRes::time();
+        }
+        # try to stop processing further refresh_monitor requests
+        # if we're acting on a manual refresh
+        $self->{parent}->watch_read(0);
         return 1;
     }
     return 0;
@@ -306,55 +382,10 @@ sub grab_all_data {
     return \%ret;
 }
 
-sub ua {
-    my $self = shift;
-    return $self->{ua} ||= LWP::UserAgent->new(
-        timeout => MogileFS::Config->config('conn_timeout') || 2,
-        keep_alive => 20,
-    );
-}
-
-sub check_device {
-    my ($self, $dev, $have_dbh, $updateable_devices) = @_;
-
-    my $devid = $dev->id;
-    my $host = $dev->host;
-
-    my $port = $host->http_port;
-    my $get_port = $host->http_get_port; # || $port;
-    my $hostip = $host->ip;
-    my $url = $dev->usage_url;
-
-    $self->{seen_hosts}{$hostip} = 1;
-
-    # now try to get the data with a short timeout
-    my $timeout = MogileFS::Config->config('conn_timeout') || 2;
-    my $start_time = Time::HiRes::time();
-
-    my $ua = $self->ua;
-    my $response = $ua->get($url);
-    my $res_time = Time::HiRes::time();
-
-    unless ($response->is_success) {
-        my $failed_after = $res_time - $start_time;
-        if ($failed_after < 0.5) {
-            $self->state_event('device', $dev->id, {observed_state => 'unreachable'})
-                if (!$dev->observed_unreachable);
-            error("Port $get_port not listening on $hostip ($url)? Error was: " . $response->status_line);
-        } else {
-            $failed_after = sprintf("%.02f", $failed_after);
-            $self->state_event('host', $dev->hostid, {observed_state => 'unreachable'})
-                if (!$host->observed_unreachable);
-            $self->{skip_host}{$dev->hostid} = 1;
-            error("Timeout contacting $hostip dev $devid ($url): took $failed_after seconds out of $timeout allowed");
-        }
-        return;
-    }
-
-    # at this point we can reach the host
-    $self->state_event('host', $dev->hostid, {observed_state => 'reachable'})
-        if (!$host->observed_reachable);
-    $self->{iow}->restart_monitoring_if_needed($hostip);
+# returns true on success, false on failure
+sub check_usage_response {
+    my ($self, $dev, $response) = @_;
+    my $devid = $dev->id;
 
     my %stats;
     my $data = $response->content;
@@ -369,93 +400,242 @@ sub check_device {
         $total = "" unless defined $total;
         my $clen = length($data || "");
         error("dev$devid reports used = $used, total = $total, content-length: $clen, error?");
-        return;
+        return 0;
     }
 
+    my $rstate = $self->{refresh_state}->{$devid};
+    ($rstate->{used}, $rstate->{total}) = ($used, $total);
+
     # only update database every ~15 seconds per device
-    my $now = time();
-    if ($have_dbh && $updateable_devices) {
-        my $devrow = $updateable_devices->{$devid};
+    if ($self->{updateable_devices}) {
+        my $devrow = $self->{updateable_devices}->{$devid};
         my $last = ($devrow && $devrow->{mb_asof}) ? $devrow->{mb_asof} : 0;
-        if ($last + UPDATE_DB_EVERY < $now) {
+        if ($last + UPDATE_DB_EVERY < time()) {
             Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
                                                   mb_used  => int($used / 1024),
                                                   devid    => $devid);
         }
     }
+    return 1;
+}
+
+sub dev_debug {
+    my ($self, $dev, $writable) = @_;
+    return unless $Mgd::DEBUG >= 1;
+    my $devid = $dev->id;
+    my $rstate = $self->{refresh_state}->{$devid};
+    my ($used, $total) = ($rstate->{used}, $rstate->{total});
+
+    debug("dev$devid: used = $used, total = $total, writeable = $writable");
+}
+
+sub check_write {
+    my ($self, $dev) = @_;
+    my $rstate = $self->{refresh_state}->{$dev->id};
+    my $test_write = $rstate->{test_write};
+
+    if (!$test_write || $test_write->{tries} > 0) {
+        # this was "$$-$now" before, but we don't yet have a cleaner in
+        # mogstored for these files
+        my $num = int(rand 100);
+        $test_write = $rstate->{test_write} ||= {};
+        $test_write->{path} = "/dev${\$dev->id}/test-write/test-write-$num";
+        $test_write->{content} = "time=" . time . " rand=$num";
+        $test_write->{tries} ||= 2;
+    }
+    $test_write->{tries}--;
+
+    my $opts = { content => $test_write->{content} };
+    $dev->host->http("PUT", $test_write->{path}, $opts, sub {
+        my ($response) = @_;
+        $self->on_check_write_response($dev, $response);
+    });
+}
+
+# starts the lengthy device check process
+sub check_device_begin {
+    my ($self, $dev) = @_;
+    $self->{refresh_state}->{$dev->id} = {};
 
-    # next if we're not going to try this now
-    return if ($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now;
-    $self->{last_test_write}{$devid} = $now;
+    $self->check_device($dev);
+}
+
+# the lengthy device check process
+sub check_device {
+    my ($self, $dev) = @_;
+    return $self->check_device_done($dev) if $self->{skip_host}{$dev->hostid};
+
+    my $devid = $dev->id;
+    my $url = $dev->usage_url;
+    my $host = $dev->host;
+
+    $self->{seen_hosts}{$host->ip} = 1;
+
+    # now try to get the data with a short timeout
+    my $start_time = Time::HiRes::time();
+    $host->http_get("GET", $url, undef, sub {
+        my ($response) = @_;
+        if (!$self->on_usage_response($dev, $response, $start_time)) {
+            return $self->check_device_done($dev);
+        }
+
+        # return if we're not going to try a test write this time
+        my $now = time();
+        if (($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now) {
+            return $self->check_device_done($dev);
+        }
+        $self->{last_test_write}{$devid} = $now;
+
+        unless ($dev->can_delete_from) {
+            # we should not try to write to readonly devices, as they
+            # may be mounted read-only.
+            return $self->dev_observed_readonly($dev);
+        }
+
+        # now we want to check if this device is writeable
-    unless ($dev->can_delete_from) {
-        # we should not try to write on readonly devices because it can be
-        # mounted as RO.
-        $self->state_event('device', $devid, {observed_state => 'readable'})
-            if (!$dev->observed_readable);
-        debug("dev$devid: used = $used, total = $total, writeable = 0");
-        return;
-    }
+
+        # first, create the test-write directory. this will return
+        # immediately after the first time, as the 'create_directory'
+        # function caches what it's already created.
+        $dev->create_directory("/dev$devid/test-write"); # XXX synchronous
+
+        return $self->check_write($dev);
+    });
+}
+
+# called on a successful PUT, ensure the data we get back is what we uploaded
+sub check_reread {
+    my ($self, $dev) = @_;
+    # now let's get it back to verify; note we use the get_port to
+    # verify that the distinction works (if we have one)
+    my $test_write = $self->{refresh_state}->{$dev->id}->{test_write};
+    $dev->host->http_get("GET", $test_write->{path}, undef, sub {
+        my ($response) = @_;
+        $self->on_check_reread_response($dev, $response);
+    });
+}
+
+sub on_check_reread_response {
+    my ($self, $dev, $response) = @_;
+    my $test_write = $self->{refresh_state}->{$dev->id}->{test_write};
+
+    # if success and the content matches, mark it writeable
+    if ($response->is_success) {
+        if ($response->content eq $test_write->{content}) {
+            if (!$dev->observed_writeable) {
+                my $event = { observed_state => 'writeable' };
+                $self->state_event('device', $dev->id, $event);
+            }
+            $self->dev_debug($dev, 1);
+
+            return $self->check_bogus_md5($dev); # onto the final check...
+        }
+
+        # content didn't match, possibly due to a race; retry and hope we're lucky
+        return $self->check_write($dev) if ($test_write->{tries} > 0);
     }
 
-    # now we want to check if this device is writeable
-
-    # first, create the test-write directory. this will return
-    # immediately after the first time, as the 'create_directory'
-    # function caches what it's already created.
-    $dev->create_directory("/dev$devid/test-write");
-
-    my $num = int(rand 100);  # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files
-    my $puturl = "http://$hostip:$port/dev$devid/test-write/test-write-$num";
-    my $content = "time=$now rand=$num";
-    my $req = HTTP::Request->new(PUT => $puturl);
-    $req->content($content);
-
-    # TODO: guard against race-conditions with double-check on failure
-
-    # now, depending on what happens
-    my $resp = $ua->request($req);
-    if ($resp->is_success) {
-        # now let's get it back to verify; note we use the get_port to verify that
-        # the distinction works (if we have one)
-        my $geturl = "http://$hostip:$get_port/dev$devid/test-write/test-write-$num";
-        my $testwrite = $ua->get($geturl);
-
-        # if success and the content matches, mark it writeable
-        if ($testwrite->is_success && $testwrite->content eq $content) {
-            $self->check_bogus_md5($dev);
-            $self->state_event('device', $devid, {observed_state => 'writeable'})
-                if (!$dev->observed_writeable);
-            debug("dev$devid: used = $used, total = $total, writeable = 1");
-            return;
+
+    return $self->dev_observed_readonly($dev); # it's read-only at least
+}
+
+sub on_check_write_response {
+    my ($self, $dev, $response) = @_;
+    return $self->check_reread($dev) if $response->is_success;
+    return $self->dev_observed_readonly($dev);
+}
+
+# returns true on success, false on failure
+sub on_usage_response {
+    my ($self, $dev, $response, $start_time) = @_;
+    my $host = $dev->host;
+    my $hostip = $host->ip;
+
+    if ($response->is_success) {
+        # at this point we can reach the host
+        if (!$host->observed_reachable) {
+            my $event = { observed_state => 'reachable' };
+            $self->state_event('host', $dev->hostid, $event);
         }
+        $self->{iow}->restart_monitoring_if_needed($hostip);
+
+        return $self->check_usage_response($dev, $response);
     }
 
-    # if we fall through to here, then we know that something is not so good, so mark it readable
-    # which is guaranteed given we even tested writeability
-    $self->state_event('device', $devid, {observed_state => 'readable'})
-        if (!$dev->observed_readable);
-    debug("dev$devid: used = $used, total = $total, writeable = 0");
+    my $url = $dev->usage_url;
+    my $failed_after = Time::HiRes::time() - $start_time;
+    if ($failed_after < 0.5) {
+        if (!$dev->observed_unreachable) {
+            my $event = { observed_state => 'unreachable' };
+            $self->state_event('device', $dev->id, $event);
+        }
+        my $get_port = $host->http_get_port;
+        error("Port $get_port not listening on $hostip ($url)? Error was: " . $response->status_line);
+    } else {
+        $failed_after = sprintf("%.02f", $failed_after);
+        if (!$host->observed_unreachable) {
+            my $event = { observed_state => 'unreachable' };
+            $self->state_event('host', $dev->hostid, $event);
+        }
+        $self->{skip_host}{$dev->hostid} = 1;
+        my $timeout = MogileFS->config("node_timeout");
+        my $devid = $dev->id;
+        error("Timeout contacting $hostip dev $devid ($url): took $failed_after seconds out of $timeout allowed");
+    }
+    return 0; # failure
 }
 
 sub check_bogus_md5 {
     my ($self, $dev) = @_;
-    my $host = $dev->host;
-    my $hostip = $host->ip;
-    my $port = $host->http_port;
-    my $devid = $dev->id;
-    my $puturl = "http://$hostip:$port/dev$devid/test-write/test-md5";
-    my $req = HTTP::Request->new(PUT => $puturl);
-    $req->header("Content-MD5", md5_base64("!") . "==");
-    $req->content(".");
+    my $put_path = "/dev${\$dev->id}/test-write/test-md5";
+    my $opts = {
+        headers => { "Content-MD5" => md5_base64("!") . "==", },
+        content => '.',
+    };
 
     # success is bad here, it means the server doesn't understand how to
     # verify and reject corrupt bodies from Content-MD5 headers.
     # most servers /will/ succeed here :<
-    my $resp = $self->ua->request($req);
-    my $rej = $resp->is_success ? 0 : 1;
+    $dev->host->http("PUT", $put_path, $opts, sub {
+        my ($response) = @_;
+        $self->on_bogus_md5_response($dev, $response);
+    });
+}
+
+sub on_bogus_md5_response {
+    my ($self, $dev, $response) = @_;
+    my $rej = $response->is_success ? 0 : 1;
     my $prev = $dev->reject_bad_md5;
     if (!defined($prev) || $prev != $rej) {
-        debug("dev$devid: reject_bad_md5 = $rej");
-        $self->state_event('device', $devid, { reject_bad_md5 => $rej });
+        debug("dev${\$dev->id}: reject_bad_md5 = $rej");
+        $self->state_event('device', $dev->id, { reject_bad_md5 => $rej });
+    }
+    return $self->check_device_done($dev);
+}
+
+# if we fall through to here, then we know that something is not so
+# good, so mark it readable, which is guaranteed given that we even
+# tested writeability
+sub dev_observed_readonly {
+    my ($self, $dev) = @_;
+
+    if (!$dev->observed_readable) {
+        my $event = { observed_state => 'readable' };
+        $self->state_event('device', $dev->id, $event);
     }
+    $self->dev_debug($dev, 0);
+    return $self->check_device_done($dev);
+}
+
+# called when all checks are done for a particular device
+sub check_device_done {
+    my ($self, $dev) = @_;
+
+    $self->still_alive; # Ping parent if needed so we don't time out
+                        # given lots of devices.
+    delete $self->{refresh_state}->{$dev->id};
+
+    # if refresh_state is totally empty, we're done
+    if ((scalar keys %{$self->{refresh_state}}) == 0) {
+        $self->usage_refresh_done;
+    }
 }
-- 
2.11.4.GIT