1 package MogileFS
::IOStatWatcher
;
3 use Sys
::Syscall
0.22; # We use it indirectly, and trigger bugs in earlier versions.
9 =head2 $iow = MogileFS::IOStatWatcher->new()
11 Returns a new IOStatWatcher object.
20 $self->on_stats; # set an empty handler.
24 =head2 $iow->set_hosts( host1 [, host2 [, ...] ] )
26 Sets the list of hosts to connect to for collecting IOStat information. This call can block if you
27 pass it hostnames instead of ip addresses.
29 Upon successful connection, the on_stats callback will be called each time the statistics are
30 collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and
31 disconnects will trigger an immediate reconnect.
36 my ($self, @ips) = @_;
37 my $old_hosts = $self->{hosts
};
39 foreach my $host (@ips) {
40 $new_hosts->{$host} = (delete $old_hosts->{$host}) || MogileFS
::IOStatWatch
::Client
->new($host, $self);
42 # TODO: close hosts that were removed (things in %$old_hosts)
43 $self->{hosts
} = $new_hosts;
46 =head2 $iow->on_stats( coderef )
48 Sets the coderef called for the C<on_stats> callback.
55 unless (ref $cb eq 'CODE') {
59 $self->{on_stats
} = $cb;
64 =head2 on_stats->( host, stats )
66 Called each time device use statistics are collected. The C<host>
67 argument is the value passed in to the C<set_hosts> method. The
68 C<stats> object is a hashref of mogile device numbers (without leading
69 "dev") to their corresponding utilization percentages.
73 # Everything beyond here is internal.
76 my ($self, $host, $stats) = @_;
77 $self->{on_stats
}->($host, $stats);
80 sub restart_monitoring_if_needed
{
81 my ($self, $host) = @_;
82 return unless $self->{hosts
}->{$host} && $self->{hosts
}->{$host}->{closed
};
83 $self->{hosts
}->{$host} = MogileFS
::IOStatWatch
::Client
->new($host, $self);
87 my ($self, $host) = @_;
88 Danga
::Socket
->AddTimer(60, sub {
89 $self->restart_monitoring_if_needed($host);
94 my ($self, $host) = @_;
95 $self->{hosts
}->{$host} = MogileFS
::IOStatWatch
::Client
->new($host, $self);
98 # Support class that does the communication with individual hosts.
99 package MogileFS
::IOStatWatch
::Client
;
104 use base
'Danga::Socket';
105 use fields
qw(host watcher buffer active);
108 my MogileFS
::IOStatWatch
::Client
$self = shift;
109 my $hostspec = shift;
112 my $sock = IO
::Socket
::INET
->new(
113 PeerAddr
=> $hostspec,
114 PeerPort
=> MogileFS
->config("mogstored_stream_port"),
120 $self = fields
::new
($self) unless ref $self;
121 $self->SUPER::new
($sock);
122 $self->watch_write(1);
123 $self->watch_read(1);
125 $self->{watcher
} = $watcher;
126 $self->{buffer
} = '';
127 $self->{host
} = $hostspec;
133 my MogileFS
::IOStatWatch
::Client
$self = shift;
135 $self->write("watch\n");
136 $self->watch_write(0); # I hope I can safely assume that 6 characters will write properly.
140 my MogileFS
::IOStatWatch
::Client
$self = shift;
142 my $bref = $self->read(10240);
143 return $self->close unless defined $bref;
145 $self->{buffer
} .= $$bref;
147 if ($self->{buffer
} =~ m/^ERR\s+(.*?)\s* $ /x) {
148 # There was an error on the way to watching this machine, close it and stay quiet.
152 # If we can yank off lines till there is one by itself with a . on it, we've gotten a full set of stats.
153 while ($self->{buffer
} =~ s/^(.*?\n)?\.\n//s) {
155 foreach my $line (split /\n+/, $1) {
157 my ($devnum, $util) = split /\s+/, $line;
158 $stats{$devnum} = $util;
160 $self->{watcher
}->got_stats($self->{host
}, \
%stats);
165 my MogileFS
::IOStatWatch
::Client
$self = shift;
166 $self->{watcher
}->got_error($self->{host
});
170 my MogileFS
::IOStatWatch
::Client
$self = shift;
171 $self->{watcher
}->got_error($self->{host
});
175 my MogileFS
::IOStatWatch
::Client
$self = shift;
176 if ($self->{active
}) {
177 $self->{watcher
}->got_disconnect($self->{host
});
179 $self->{watcher
}->got_error($self->{host
});
181 $self->SUPER::close(@_);