Checking in changes prior to tagging of version 2.59.
[MogileFS-Server.git] / lib / Mogstored / SideChannelClient.pm
blob4397a2a9b4a5e2e405c07c53f74aa5f7b4240dbd
1 ### simple package for handling the stream request port
2 package Mogstored::SideChannelClient;
4 use strict;
5 use base qw{Perlbal::Socket};
6 use fields (
7 'count', # how many requests we've serviced
8 'read_buf', # unprocessed read buffer
9 'mogsvc', # the mogstored Perlbal::Service object
# We are pretending to be a Perlbal::Socket, so this hook has to exist.
# Returning 0 means this connection should never idle out.
sub max_idle_time {
    return 0;
}
# Constructor.  May be invoked on a class name (a fresh fields-based object
# is allocated) or on an already-allocated object (it is reused).  All
# remaining arguments are handed to the parent class constructor.
# Returns the initialised object.
sub new {
    my Mogstored::SideChannelClient $self = shift;

    # allocate only when we were called as a class method
    if (!ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new(@_);

    # per-connection state
    $self->{count}    = 0;     # no requests serviced yet
    $self->{read_buf} = '';    # nothing buffered yet
    $self->{mogsvc}   = Perlbal->service('mogstored');

    return $self;
}
# Read-event handler: pull up to 1024 bytes off the socket, append them to
# the pending buffer, and execute every complete line found there.
#
# Commands handled:
#   size <uri>  - asynchronously stat docroot . uri and reply "<uri> <size>"
#                 (size is -1 when the file does not exist); a uri that
#                 contains ".." is rejected with an ERROR line
#   watch       - stop watching this client for reads and subscribe it to
#                 iostat output ("ERR iostat unavailable" if it is not
#                 available)
#   otherwise   - hand the line to Perlbal's management command interface
#                 and write back the lines it produced
#
# The connection is closed when read() returns undef.
sub event_read {
    my Mogstored::SideChannelClient $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;
    $self->{read_buf} .= $$bref;

    my $path = $self->{mogsvc}->{docroot};

    # Peel complete lines off the front of the buffer.  The leading \n*
    # discards bare empty lines: without it a buffer that begins with "\n"
    # can never match (^ anchors only at the start of the string and . does
    # not match a newline), so no later command would ever be parsed and the
    # buffer would keep growing.
    while ($self->{read_buf} =~ s/^\n*(.+?)\r?\n//) {
        my $cmd = $1;
        if ($cmd =~ /^size (\S+)$/) {
            # increase our count
            $self->{count}++;

            # validate uri
            my $uri = $1;
            if ($uri =~ /\.\./) {
                $self->write("ERROR: uri invalid (contains ..)\r\n");
                # Skip just this command; further complete commands may
                # already be buffered (same as the watch error path below).
                next;
            }

            # now stat the file to get the size and such
            Perlbal::AIO::aio_stat("$path$uri", sub {
                # the client may have gone away while the stat was in flight
                return if $self->{closed};
                # "_" reuses the result of the stat that just completed
                my $size = -e _ ? -s _ : -1;
                $self->write("$uri $size\r\n");
            });
        } elsif ($cmd =~ /^watch$/i) {
            unless (Mogstored->iostat_available) {
                $self->write("ERR iostat unavailable\r\n");
                next;
            }
            $self->watch_read(0);
            Mogstored->iostat_subscribe($self);
        } else {
            # we don't understand this so pass it on to manage command interface
            my @out;
            Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; });
            $self->write(join("\r\n", @out) . "\r\n");
        }
    }
}
# Write-event handler.  write(undef) kicks off any pending writes; when it
# reports true we have nothing else to send, so stop watching the socket
# for writeability.
sub event_write {
    my ($self) = @_;
    $self->write(undef) and $self->watch_write(0);
}
# Danga::Socket's own error and hangup handlers die; override both so the
# connection is simply closed instead.
sub event_err {
    my ($self) = @_;
    $self->close;
}

sub event_hup {
    my ($self) = @_;
    $self->close;
}
# as_string handler: returns the parent class's as_string text with this
# client's size-request counter appended.
sub as_string {
    my Mogstored::SideChannelClient $self = shift;

    my $summary = $self->SUPER::as_string();
    $summary .= "; size_requests=$self->{count}";

    return $summary;
}
# Close the connection: first drop this client from the iostat subscriber
# list, then let the parent class close the socket.  Returns whatever the
# parent's close returns.
sub close {
    my Mogstored::SideChannelClient $self = shift;

    Mogstored->iostat_unsubscribe($self);

    return $self->SUPER::close;
}
# Graceful-shutdown hook: delegates to
# Mogstored->on_sidechannel_die_gracefully and returns its result.
sub die_gracefully {
    return Mogstored->on_sidechannel_die_gracefully();
}