1 ### simple package for handling the stream request port
2 package Mogstored
::SideChannelClient
;
5 use base
qw{Perlbal
::Socket
};
7 'count', # how many requests we've serviced
8 'read_buf', # unprocessed read buffer
9 'mogsvc', # the mogstored Perlbal::Service object
12 use POSIX
qw(O_RDONLY);
13 use Mogstored
::TaskQueue
;
16 eval { require IO
::AIO
; };
19 # TODO: interface to make this tunable
22 # needed since we're pretending to be a Perlbal::Socket... never idle out
23 sub max_idle_time
{ return 0; }
26 my Mogstored
::SideChannelClient
$self = shift;
27 $self = fields
::new
($self) unless ref $self;
28 $self->SUPER::new
(@_);
30 $self->{read_buf
} = '';
31 $self->{mogsvc
} = Perlbal
->service('mogstored');
36 my ($self, $uri) = @_;
38 $self->write("ERROR: uri invalid (contains ..)\r\n");
45 my Mogstored
::SideChannelClient
$self = shift;
47 my $bref = $self->read(1024);
48 return $self->close unless defined $bref;
49 $self->{read_buf
} .= $$bref;
50 $self->read_buf_consume;
53 sub read_buf_consume
{
55 my $path = $self->{mogsvc
}->{docroot
};
57 while ($self->{read_buf
} =~ s/^(.+?)\r?\n//) {
59 if ($cmd =~ /^size (\S+)$/) {
63 my $uri = $self->validate_uri($1);
64 return unless defined($uri);
66 # now stat the file to get the size and such
67 Perlbal
::AIO
::aio_stat
("$path$uri", sub {
68 return if $self->{closed
};
69 my $size = -e _ ?
-s _
: -1;
70 $self->write("$uri $size\r\n");
72 } elsif ($cmd =~ /^watch$/i) {
73 unless (Mogstored
->iostat_available) {
74 $self->write("ERR iostat unavailable\r\n");
78 Mogstored
->iostat_subscribe($self);
79 } elsif ($cmd =~ /^(MD5|SHA-1) (\S+)(?: (\w+))?$/) {
80 # we can easily enable other hash algorithms with the above
81 # regexp, but we won't for now (see MogileFS::Checksum)
83 my $uri = $self->validate_uri($2);
85 return unless defined($uri);
87 return $self->digest($alg, $path, $uri, $reason);
89 # we don't understand this so pass it on to manage command interface
91 Perlbal
::run_manage_command
($cmd, sub { push @out, $_[0]; });
92 $self->write(join("\r\n", @out) . "\r\n");
97 # stop watching writeability if we've nothing else to
98 # write to them. else just kick off more writes.
101 $self->watch_write(0) if $self->write(undef);
104 # override Danga::Socket's event handlers which die
105 sub event_err
{ $_[0]->close; }
106 sub event_hup
{ $_[0]->close; }
110 my Mogstored
::SideChannelClient
$self = shift;
112 my $ret = $self->SUPER::as_string
;
113 $ret .= "; size_requests=$self->{count}";
119 my Mogstored
::SideChannelClient
$self = shift;
120 Mogstored
->iostat_unsubscribe($self);
125 Mogstored
->on_sidechannel_die_gracefully;
129 my ($self, $alg, $path, $uri, $reason) = @_;
131 $self->watch_read(0);
133 Perlbal
::AIO
::aio_open
("$path$uri", O_RDONLY
, 0, sub {
136 IO
::AIO
::fadvise
(fileno($fh), 0, 0, IO
::AIO
::FADV_SEQUENTIAL
());
139 if ($self->{closed
}) {
140 CORE
::close($fh) if $fh;
146 if ($reason && $reason eq "fsck") {
147 # fstat(2) should return immediately, no AIO needed
148 my $devid = (stat($fh))[0];
149 $queue = $digest_queues{$devid} ||= Mogstored
::TaskQueue
->new;
150 $queue->run(sub { $self->digest_fh($alg, $fh, $uri, $queue) });
152 $self->digest_fh($alg, $fh, $uri);
155 $self->write("$uri $alg=-1\r\n");
156 $self->after_long_request;
162 my ($self, $alg, $fh, $uri, $queue) = @_;
165 my $digest = Digest
->new($alg);
171 my $bytes = length($data);
174 Perlbal
::AIO
::aio_read
($fh, $offset, 0x100000, $data, $cb);
175 } elsif ($retval == 0) { # EOF
178 $digest = $digest->hexdigest;
179 $self->write("$uri $alg=$digest\r\n");
180 $queue->task_done if $queue;
181 $self->after_long_request;
185 $self->write("ERR read $uri at $offset failed\r\n");
186 $queue->task_done if $queue;
187 $self->after_long_request; # should we try to continue?
190 Perlbal
::AIO
::aio_read
($fh, $offset, 0x100000, $data, $cb);
193 sub after_long_request
{
196 if ($self->{read_buf
} =~ /^(.+?)\r?\n/) {
197 $self->read_buf_consume;
199 $self->watch_read(1);