1 # private base class for poolable HTTP/Mogstored sidechannel connections
# This is currently only used by HTTP, but is intended for Mogstored
# sidechannel connections as well
4 package MogileFS
::Connection
::Poolable
;
8 use base
qw(Danga::Socket);
10 'mfs_pool', # owner of the connection (MogileFS::ConnectionPool)
11 'mfs_hostport', # [ ip, port ]
12 'mfs_expire', # Danga::Socket::Timer object
13 'mfs_expire_cb', # Danga::Socket::Timer callback
14 'mfs_requests', # number of requests made on this object
15 'mfs_err', # used to propagate an error to start()
17 use Socket
qw(SO_KEEPALIVE);
20 # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub
22 my ($self, $sock, $ip, $port) = @_;
23 $self->SUPER::new
($sock); # Danga::Socket->new
25 # connection may not be established, yet
26 # so Danga::Socket->peer_addr_string can't be used here
27 $self->{mfs_hostport
} = [ $ip, $port ];
28 $self->{mfs_requests
} = 0;
# Builds the pool lookup key for this connection by joining the elements
# of mfs_hostport ([ ip, port ]) with ':'.
# ConnectionPool uses it for tracking per-hostport connection counts.
sub key {
    my ($self) = @_;
    return join(':', @{ $self->{mfs_hostport} });
}
# Backwards-compatible alias: returns whatever the key() method returns.
# Dispatches through the object so a subclass override of key() is honoured.
sub host_port {
    my ($self) = @_;
    return $self->key;
}
# Returns the contents of mfs_hostport as a list: (ip, port).
# In scalar context this yields the number of stored elements.
sub ip_port {
    my ($self) = @_;
    my $hostport = $self->{mfs_hostport};
    return @$hostport;
}
# Returns the file descriptor number of the underlying socket,
# as reported by fileno() on the handle returned from ->sock.
sub fd {
    my ($self) = @_;
    my $sock = $self->sock;
    return fileno($sock);
}
43 # marks a connection as idle, call this before putting it in a connection
44 # pool for eventual reuse.
50 # set the keepalive flag the first time we're idle
51 $self->sock->sockopt(SO_KEEPALIVE
, 1) if $self->{mfs_requests
} == 0;
53 $self->{mfs_requests
}++;
56 # the request running on this connection is retryable if this socket
57 # has ever been marked idle. The connection pool can never be 100%
58 # reliable for detecting dead sockets, and all HTTP requests made by
59 # MogileFS are idempotent.
61 my ($self, $reason) = @_;
62 return ($reason !~ /timeout/ && $self->{mfs_requests
} > 0);
65 # Sets (or updates) the timeout of the connection
66 # timeout_key is "node_timeout" or "conn_timeout"
67 # clears the current timeout if timeout_key is undef
69 my ($self, $timeout_key) = @_;
70 my $mfs_pool = $self->{mfs_pool
};
75 if ($timeout_key =~ /[a-z_]/) {
76 $timeout = MogileFS
->config($timeout_key) || 2;
78 $timeout = $timeout_key;
79 $timeout_key = "timeout";
82 my $t0 = Time
::HiRes
::time();
83 $self->{mfs_expire
} = $t0 + $timeout;
84 $self->{mfs_expire_cb
} = sub {
86 my $elapsed = $now - $t0;
88 # for HTTP, this will fake an HTTP error response like LWP does
89 $self->err("$timeout_key: $timeout (elapsed: $elapsed)");
91 $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;
93 $self->{mfs_expire
} = $self->{mfs_expire_cb
} = undef;
94 $mfs_pool->register_timeout($self, undef) if $mfs_pool;
# Returns the expiry time of the connection, i.e. the current value of
# the mfs_expire field (undef when no timeout is set).
sub expiry {
    my ($self) = @_;
    return $self->{mfs_expire};
}
101 # runs expiry callback and returns true if time is up,
102 # returns false if there is time remaining
104 my ($self, $now) = @_;
105 my $expire = $self->{mfs_expire
} or return 0;
106 $now ||= Time
::HiRes
::time();
108 if ($now >= $expire) {
109 my $expire_cb = delete $self->{mfs_expire_cb
};
110 if ($expire_cb && $self->sock) {
# may be overridden in subclass, called only on errors
119 # The HTTP version of this will fake an HTTP response for LWP compatibility
121 my ($self, $close_reason) = @_;
123 $self->inflight_expire; # ensure we don't call new_err on eventual close()
125 if ($close_reason =~ /\A:event_(?:hup|err)\z/) {
126 # there's a chance this can be invoked while inflight,
127 # conn_drop will handle this case appropriately
128 $self->{mfs_pool
}->conn_drop($self, $close_reason) if $self->{mfs_pool
};
130 $self->close($close_reason);
134 # sets the pool this connection belongs to, only call from ConnectionPool
136 my ($self, $pool) = @_;
138 $self->{mfs_pool
} = $pool;
141 # closes a connection, and may reschedule the inflight callback if
142 # close_reason is ":retry"
144 my ($self, $close_reason) = @_;
146 delete $self->{mfs_expire_cb
}; # avoid circular ref
148 my $mfs_pool = delete $self->{mfs_pool
}; # avoid circular ref
152 $mfs_pool->schedule_queued;
153 $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);
155 $self->SUPER::close($close_reason); # Danga::Socket->close
157 if ($inflight_cb && $close_reason) {
158 if ($close_reason eq ":retry") {
159 my ($ip, $port) = $self->ip_port;
161 $mfs_pool->enqueue($ip, $port, $inflight_cb);
163 # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,
164 # EPIPE, or "write_error" after an initial (non-blocking)
166 $mfs_pool->on_next_tick(sub {
167 ref($self)->new_err($close_reason || "error", $inflight_cb);
# Marks a connection as no-longer inflight.  Calling this prevents retries.
#
# Hands the connection to the owning pool's inflight_cb_expire() and
# returns whatever that call returns.
# Dies with a "BUG: ..." message if the connection has no pool attached
# (mfs_pool is unset), since there is nothing to expire against.
sub inflight_expire {
    # bind the invocant explicitly; the body below depends on it
    my ($self) = @_;

    my $mfs_pool = $self->{mfs_pool};
    die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;

    return $mfs_pool->inflight_cb_expire($self);
}
181 # Danga::Socket callbacks
# Danga::Socket hang-up callback: forwards to ->err with the
# reason string ':event_hup'.
sub event_hup {
    my ($self) = @_;
    return $self->err(':event_hup');
}
# Danga::Socket error callback: forwards to ->err with the
# reason string ':event_err'.
sub event_err {
    my ($self) = @_;
    return $self->err(':event_err');
}
# called when we couldn't create a socket, but need to create an object
# anyway for errors (creating fake, LWP-style error responses)
188 my ($class, $err, $start_cb) = @_;
189 my $self = fields
::new
($class);
190 $self->{mfs_err
} = $err;
195 # returns this connection back to its associated pool.
196 # Returns false if not successful (pool is full)
199 my $mfs_pool = $self->{mfs_pool
};
201 return $mfs_pool ?
$mfs_pool->conn_persist($self) : 0;