1 # -*- encoding: binary -*-
3 class Rainbows::EventMachine::Client < EM::Connection
4 include Rainbows::EvCore
5 Rainbows.config!(self, :keepalive_timeout)
15 def receive_data(data)
16 # To avoid clobbering the current streaming response
17 # (often a static file), we do not attempt to process another
18 # request on the same connection until the first is complete
22 @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
24 EM.next_tick { receive_data(nil) } unless @buf.empty?
26 on_read(data || ''.freeze) if (@buf.size > 0) || data
32 close_connection_after_writing if nil == @deferred
36 set_comm_inactivity_timeout 0
37 @env['rack.input'] = input
38 @env['REMOTE_ADDR'] = @_io.kgio_addr
39 @env['async.callback'] = method(:write_async_response)
40 @env['async.close'] = EM::DefaultDeferrable.new
41 @hp.hijack_setup(@_io)
42 status, headers, body = catch(:async) {
43 APP.call(@env.merge!(RACK_DEFAULTS))
45 return hijacked if @hp.hijacked?
47 if (nil == status || -1 == status)
50 ev_write_response(status, headers, body, @hp.next?)
54 def deferred_errback(orig_body)
56 orig_body.close if orig_body.respond_to?(:close)
62 def deferred_callback(orig_body, alive)
64 orig_body.close if orig_body.respond_to?(:close)
66 alive ? receive_data(nil) : quit
70 def ev_write_response(status, headers, body, alive)
71 @state = :headers if alive
72 if body.respond_to?(:errback) && body.respond_to?(:callback)
73 write_headers(status, headers, alive, body) or return hijacked
76 deferred_errback(body)
77 deferred_callback(body, alive)
79 elsif body.respond_to?(:to_path)
80 st = File.stat(path = body.to_path)
83 write_headers(status, headers, alive, body) or return hijacked
84 @deferred = stream_file_data(path)
85 deferred_errback(body)
86 deferred_callback(body, alive)
88 elsif st.socket? || st.pipe?
89 chunk = stream_response_headers(status, headers, alive, body)
90 return hijacked if nil == chunk
91 io = body_to_io(@deferred = body)
92 m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
93 Rainbows::EventMachine::ResponsePipe
94 return EM.watch(io, m, self).notify_readable = true
96 # char or block device... WTF? fall through to body.each
98 write_response(status, headers, body, alive) or return hijacked
102 set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
104 EM.next_tick { receive_data(nil) }
108 quit unless @deferred
113 @deferred.close if @deferred.respond_to?(:close)
115 @hp.keepalive? ? receive_data(nil) : quit
119 return if @hp.hijacked?
120 async_close = @env['async.close'] and async_close.succeed
121 @deferred.respond_to?(:fail) and @deferred.fail
125 # EventMachine's EventableDescriptor::Close() may close
126 # the underlying file descriptor without invalidating the
127 # associated IO object on errors, so @_io.closed? isn't