1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
# Rack application taken from the running Rainbows! server.
5 APP = Rainbows.server.app
# Local aliases for the CONN and KATO objects owned by Rainbows::Coolio.
# NOTE(review): elsewhere in this class KATO is indexed by the client and
# assigned Time.now, so it presumably tracks keepalive timeouts -- confirm.
6 CONN = Rainbows::Coolio::CONN
7 KATO = Rainbows::Coolio::KATO
# Default Coolio event loop; response pipes are attached to it.
8 LOOP = Coolio::Loop.default
18 enable unless enabled?
23 close if nil == @deferred && @_write_buffer.empty?
26 # override the Coolio::IO#write method to try writing directly to the
27 # kernel socket buffers to avoid an extra userspace copy if
30 if @_write_buffer.empty?
32 case rv = @_io.kgio_trywrite(buf)
34 return enable_write_watcher
36 break # fall through to super(buf)
38 buf = rv # retry, skb could grow or been drained
41 return handle_error(e)
48 buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
56 rescue Errno::ECONNRESET
60 # allows enabling of write watcher even when read watcher is disabled
68 enable_write_watcher # trigger on_write_complete
72 if nil == @deferred && @_write_buffer.empty?
80 # used for streaming sockets and pipes
#
# Creates a response pipe for +io+ and attaches it to LOOP.  The pipe is a
# Rainbows::Coolio::ResponseChunkPipe when +chunk+ is truthy and a
# Rainbows::Coolio::ResponsePipe otherwise; either way it is constructed
# with +io+, this client and +body+.
81 def stream_response_body(body, io, chunk)
82 # we only want to attach to the Coolio::Loop belonging to the
83 # main thread in Ruby 1.9
84 (chunk ? Rainbows::Coolio::ResponseChunkPipe :
85 Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
# Writes a response for a +body+ backed by an IO, dispatching on what kind
# of file +st+ describes:
# * defer_file for the first condition (not visible in this view),
# * stream_response_headers followed by stream_response_body for sockets
#   and pipes,
# * write_response for anything else.
# Returns +hijacked+ when stream_response_headers returns nil.
# NOTE(review): +io+ and +st+ are assigned on lines outside this view; +st+
# responds to socket? and pipe?, so it is presumably a File::Stat -- confirm.
95 def write_response_path(status, headers, body, alive)
100 defer_file(status, headers, body, alive, io, st)
101 elsif st.socket? || st.pipe?
102 chunk = stream_response_headers(status, headers, alive, body)
103 return hijacked if nil == chunk
104 stream_response_body(body, io, chunk)
106 # char or block device (unexpected here): fall back to write_response
107 write_response(status, headers, body, alive)
# Writes the response for one request.  A +body+ that responds to +to_path+
# is handed to write_response_path; the other visible branch hands it to
# write_response.  Whichever writer ran, its return value replaces +body+.
# Returns +hijacked+ when that value is falsy, and +quit+ unless +alive+ is
# truthy and @state is not :close.
# NOTE(review): the else/end lines and whatever follows the +quit+ guard
# are outside this view.
111 def ev_write_response(status, headers, body, alive)
112 if body.respond_to?(:to_path)
113 body = write_response_path(status, headers, body, alive)
115 body = write_response(status, headers, body, alive)
117 return hijacked unless body
118 return quit unless alive && :close != @state
125 @env[RACK_INPUT] = input
126 @env[REMOTE_ADDR] = @_io.kgio_addr
127 @env[ASYNC_CALLBACK] = method(:write_async_response)
128 @hp.hijack_setup(@env, @_io)
129 status, headers, body = catch(:async) {
130 APP.call(@env.merge!(RACK_DEFAULTS))
132 return hijacked if @hp.hijacked?
134 (nil == status || -1 == status) ? @deferred = true :
135 ev_write_response(status, headers, body, @hp.next?)
# Write-completion hook (the write watcher is enabled elsewhere in this
# class specifically to trigger it).
# Visible branches of the case: +true+ returns immediately, +nil+ falls
# through, and otherwise stream_file_chunk(@deferred) is attempted; when it
# returns a falsy value (EOF) close_deferred is called.
# NOTE(review): the case subject is on a line outside this view, presumably
# @deferred -- confirm.  The code between close_deferred and the final
# +close+ is also not visible here.
138 def on_write_complete
140 when true then return # #next! will clear this bit
141 when nil # fall through
143 return if stream_file_chunk(@deferred)
144 close_deferred # EOF, fall through
# only close the connection once nothing is left in the write buffer
149 close if @_write_buffer.empty?
152 buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF) or return close
153 String === buf and return on_read(buf)
154 # buf == :wait_readable
157 KATO[self] = Time.now
169 if msg = Rainbows::Error.response(e)
170 @_io.kgio_trywrite(msg) rescue nil
180 @deferred.close if @deferred.respond_to?(:close)
182 Unicorn.log_error(Rainbows.server.logger,
183 "closing deferred=#{@deferred.inspect}", e)
195 if IO.method_defined?(:trysendfile)
# defer_file variant defined when IO.method_defined?(:trysendfile) is true.
# Writes the headers, then schedules the file body via defer_file_stream.
# If sendfile_range returns a result, its status, headers and range replace
# the originals and range[0], range[1] are passed as offset and count
# (nothing is deferred when range is nil/false).  In the other visible
# branch the whole file is deferred, from offset 0 with count st.size.
# Returns +hijacked+ whenever write_headers returns a falsy value.
# NOTE(review): the else/end lines separating the two branches are outside
# this view.
196 def defer_file(status, headers, body, alive, io, st)
197 if r = sendfile_range(status, headers)
198 status, headers, range = r
199 body = write_headers(status, headers, alive, body) or return hijacked
200 range and defer_file_stream(range[0], range[1], io, body)
202 write_headers(status, headers, alive, body) or return hijacked
203 defer_file_stream(0, st.size, io, body)
# Sends the next piece of +sf+ over the socket with trysendfile, passing
# sf.offset and sf.count.  In one visible branch sf.count is reduced by the
# result +n+ and the method returns nil once it reaches zero; in another it
# returns the result of enable_write_watcher.
# NOTE(review): the +when+ clauses selecting these branches are outside
# this view -- confirm which trysendfile results they match.
208 def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
209 case n = @_io.trysendfile(sf, sf.offset, sf.count)
212 return if 0 == (sf.count -= n)
214 return enable_write_watcher
# Second defer_file definition, with no sendfile_range handling: writes the
# headers and defers the whole file (offset 0, count st.size) via
# defer_file_stream.  Returns +hijacked+ when write_headers returns a falsy
# value.
# NOTE(review): presumably the fallback used when IO lacks trysendfile; the
# else line of that check is outside this view -- confirm.
220 def defer_file(status, headers, body, alive, io, st)
221 write_headers(status, headers, alive, body) or return hijacked
222 defer_file_stream(0, st.size, io, body)
# Second stream_file_chunk definition: reads up to 0x4000 (16384) bytes from
# body.to_io and, if anything was read, passes the data to write.
# NOTE(review): any lines following the read are outside this view.
226 def stream_file_chunk(body)
227 buf = body.to_io.read(0x4000) and write(buf)
# Records the pending file transfer by storing a new Rainbows::StreamFile,
# built from +offset+, +count+, +io+ and +body+, in @deferred.
# NOTE(review): anything after the assignment is outside this view.
231 def defer_file_stream(offset, count, io, body)
232 @deferred = Rainbows::StreamFile.new(offset, count, io, body)