device: reuse HTTP connections for MKCOL
[MogileFS-Server.git] / lib / MogileFS / IOStatWatcher.pm
blobb6502aa8246fb069b01ba414629c870819ac5d7b
1 package MogileFS::IOStatWatcher;
2 use strict;
3 use Sys::Syscall 0.22; # We use it indirectly, and trigger bugs in earlier versions.
4 use Danga::Socket;
5 use IO::Socket::INET;
7 =head1 Methods
9 =head2 $iow = MogileFS::IOStatWatcher->new()
11 Returns a new IOStatWatcher object.
13 =cut
# Constructor.  The returned watcher starts with an empty host table and a
# do-nothing on_stats handler, so it is safe to use before either has been
# configured.
sub new {
    my $class = shift;

    my %state   = (hosts => {});
    my $watcher = bless \%state, $class;

    # Calling on_stats without a coderef installs the empty default handler.
    $watcher->on_stats;

    return $watcher;
}
24 =head2 $iow->set_hosts( host1 [, host2 [, ...] ] )
26 Sets the list of hosts to connect to for collecting IOStat information. This call can block if you
27 pass it hostnames instead of ip addresses.
29 Upon successful connection, the on_stats callback will be called each time the statistics are
30 collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and
31 disconnects will trigger an immediate reconnect.
33 =cut
# Replace the watched host list with @ips.  A host that is already being
# watched keeps its existing client object; any other host gets a freshly
# created client.  Returns the new host table (a hashref keyed by host).
sub set_hosts {
    my $self = shift;
    my @ips  = @_;

    my $previous = $self->{hosts};
    my %current;

    for my $host (@ips) {
        # Pull the client out of the old table so that whatever remains in
        # $previous afterwards is exactly the set of hosts that were dropped.
        my $client = delete $previous->{$host};
        $client ||= MogileFS::IOStatWatch::Client->new($host, $self);
        $current{$host} = $client;
    }

    # TODO: close hosts that were removed (things left in %$previous)
    $self->{hosts} = \%current;
}
46 =head2 $iow->on_stats( coderef )
48 Sets the coderef called for the C<on_stats> callback.
50 =cut
# Install the on_stats callback.  Anything that is not a plain CODE reference
# (including no argument at all) is replaced with an empty sub so that
# got_stats can always invoke the handler unconditionally.
sub on_stats {
    my $self    = shift;
    my $handler = shift;

    my $is_code = ref($handler) eq 'CODE';
    $self->{on_stats} = $is_code ? $handler : sub {};
}
62 =head1 Callbacks
64 =head2 on_stats->( host, stats )
66 Called each time device use statistics are collected. The C<host>
67 argument is the value passed in to the C<set_hosts> method. The
68 C<stats> object is a hashref of mogile device numbers (without leading
69 "dev") to their corresponding utilization percentages.
71 =cut
73 # Everything beyond here is internal.
# Internal: called by a client when a complete set of statistics has been
# parsed.  Forwards the host and the stats hashref to the user's on_stats
# handler and returns whatever that handler returns.
sub got_stats {
    my $self = shift;
    my ($host, $stats) = @_;

    my $handler = $self->{on_stats};
    return $handler->($host, $stats);
}
# Internal: create a fresh client for $host, but only when the host is still
# in the watch list and its current client has its "closed" flag set.  In any
# other situation this is a no-op.
sub restart_monitoring_if_needed {
    my $self = shift;
    my $host = shift;

    my $client = $self->{hosts}{$host};
    return unless $client;
    return unless $client->{closed};

    $self->{hosts}{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
}
# Internal: called by a client when something went wrong with its connection.
# Registers a Danga::Socket timer with a delay of 60 which, when it fires,
# restarts monitoring of $host if that is still needed.
sub got_error {
    my ($self, $host) = @_;

    my $retry = sub {
        $self->restart_monitoring_if_needed($host);
    };

    Danga::Socket->AddTimer(60, $retry);
}
# Internal: called by a client whose established connection went away.
# Reconnects immediately by replacing the host's client with a new one.
#
# The reconnect only happens while $host is still part of the watch list.
# Without this guard, a host that set_hosts() had dropped would be put back
# into $self->{hosts} as soon as its old connection closed, so a removed host
# could never actually be removed.  This mirrors the check in
# restart_monitoring_if_needed.
#
# Returns the new client, or nothing when the host is no longer watched.
sub got_disconnect {
    my ($self, $host) = @_;

    return unless exists $self->{hosts}->{$host};

    $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
}
98 # Support class that does the communication with individual hosts.
99 package MogileFS::IOStatWatch::Client;
101 use strict;
102 use warnings;
104 use base 'Danga::Socket';
105 use fields qw(host watcher buffer active);
# Constructor for the per-host client.
#
#   $hostspec - address handed to IO::Socket::INET as PeerAddr; also kept as
#               the "host" field and reported back to the watcher.
#   $watcher  - object that receives got_stats / got_error / got_disconnect.
#
# Opens a non-blocking TCP socket to the port configured as
# "mogstored_stream_port".  Returns nothing if the socket could not be
# created; otherwise returns the new client with both read and write
# watching switched on.
sub new {
    my MogileFS::IOStatWatch::Client $self = shift;
    my ($hostspec, $watcher) = @_;

    my %peer = (
        PeerAddr => $hostspec,
        PeerPort => MogileFS->config("mogstored_stream_port"),
        Proto    => 'tcp',
        Blocking => 0,
    );
    my $sock = IO::Socket::INET->new(%peer);
    return unless $sock;

    # Allow being called either on the class name or on an existing object.
    ref $self or $self = fields::new($self);
    $self->SUPER::new($sock);

    # Writability tells us the connect finished (see event_write);
    # readability delivers the statistics stream (see event_read).
    $self->watch_write(1);
    $self->watch_read(1);

    $self->{host}    = $hostspec;
    $self->{watcher} = $watcher;
    $self->{buffer}  = '';

    return $self;
}
# Write-ready callback.  Marks the connection as active, sends the "watch"
# command, and then stops asking for write events.
sub event_write {
    my MogileFS::IOStatWatch::Client $self = shift;

    my $request = "watch\n";

    $self->{active} = 1;
    $self->write($request);

    # The request is only six characters long, so it is assumed to go out in
    # a single write; no further write notifications are wanted.
    $self->watch_write(0);
}
# Read-ready callback.  Appends up to 10240 newly read bytes to the buffer,
# then hands every complete set of statistics to the watcher.
#
# A set of statistics is a run of "<devnum> <util>" lines terminated by a
# line containing only ".".  Each complete set becomes a hashref mapping
# devnum to util and is passed to the watcher's got_stats together with this
# client's host.  Incomplete trailing data stays in the buffer for the next
# call.
#
# The connection is closed (and nothing is parsed) when the read yields
# undef, or when the buffer holds an "ERR ..." reply.
sub event_read {
    my MogileFS::IOStatWatch::Client $self = shift;

    my $bref = $self->read(10240);
    return $self->close unless defined $bref;

    $self->{buffer} .= $$bref;

    if ($self->{buffer} =~ m/^ERR\s+(.*?)\s* $ /x) {
        # There was an error on the way to watching this machine; close it
        # and stay quiet.  Return right away so the error text is never fed
        # to the statistics parser below on an already-closed socket.
        return $self->close;
    }

    # If we can yank off lines till there is one by itself with a . on it,
    # we've gotten a full set of stats.
    while ($self->{buffer} =~ s/^(.*?\n)?\.\n//s) {
        # When the terminator is the very first line the optional group does
        # not take part in the match and $1 is undefined; treat that as an
        # empty set instead of triggering an "uninitialized value" warning.
        # Copy it immediately so later regex use cannot clobber it.
        my $block = defined $1 ? $1 : '';

        my %stats;
        foreach my $line (split /\n+/, $block) {
            next unless $line;
            my ($devnum, $util) = split /\s+/, $line;
            $stats{$devnum} = $util;
        }

        $self->{watcher}->got_stats($self->{host}, \%stats);
    }

    return;
}
# Error callback.  Reports the problem to the watcher via got_error, passing
# this client's host.
sub event_err {
    my MogileFS::IOStatWatch::Client $self = shift;

    my $watcher = $self->{watcher};
    my $host    = $self->{host};

    return $watcher->got_error($host);
}
# Hang-up callback.  Handled the same way as an error: the watcher's
# got_error is invoked with this client's host.
sub event_hup {
    my MogileFS::IOStatWatch::Client $self = shift;

    my $watcher = $self->{watcher};
    my $host    = $self->{host};

    return $watcher->got_error($host);
}
# Close the connection, telling the watcher first.
#
# A client that had become active (see event_write) reports got_disconnect;
# one that never became active reports got_error instead.  Any arguments are
# passed through to the parent class's close, whose result is returned.
sub close {
    my MogileFS::IOStatWatch::Client $self = shift;

    my $notification = $self->{active} ? 'got_disconnect' : 'got_error';
    $self->{watcher}->$notification($self->{host});

    return $self->SUPER::close(@_);
}