status: add error? method
[upr.git] / lib / upr / input_wrapper.rb
blob2070d381e659a0554f1ccbaf438912ea3a058ce4
1 # -*- encoding: binary -*-
3 module Upr
5   # the underlying middlware for for wrapping env["rack.input"],
6   # this should typically be installed before other middlewares
7   # that may wrap env["rack.input"] in the middleware chain.
8   class InputWrapper < Struct.new(:app, :path_info, :frequency, :backend,
9                                   :input, :pos, :seen, :content_length,
10                                   :upload_id, :mtime)
12     def initialize(app, options = {})
13       super(app,
14             Array(options[:path_info] || nil),
15             options[:frequency] || 3,
16             options[:backend])
18       # support :drb for compatibility with mongrel_upload_progress
19       if options[:drb]
20         backend and raise ArgumentError, ":backend and :drb are incompatible"
21         require 'drb'
22         DRb.start_service
23         self.backend = DRbObject.new(nil, options[:drb])
24       elsif String === backend
25         # allow people to use strings in case their backend gets
26         # lazy-loaded (like an ActiveRecord model)
27         self.backend = eval(backend)
28       else
29         self.backend ||= Upr::Monitor.new
30       end
31     end
33     def call(env)
34       if path_info.empty? || path_info.include?(env["PATH_INFO"])
35         # benefit curl users...
36         /\A100-continue\z/i =~ env['HTTP_EXPECT'] and return [ 100, {}, [] ]
38         length = env["CONTENT_LENGTH"] and length = length.to_i
39         env["TRANSFER_ENCODING"] =~ %r{\Achunked\z}i and length = nil
40         if length.nil? || length > 0
41           req = Rack::Request.new(env)
43           # can't blindly parse params here since we don't want to read
44           # the POST body if there is one, so only parse stuff in the
45           # query string...
46           if uid = req.GET["upload_id"]
47             return dup._call(env, uid, length)
48           end
49         end
50       end
51       app.call(env)
52     end
54     def _call(env, uid, length)
55       self.upload_id = env["upr.upload_id"] = uid
56       self.mtime = self.pos = self.seen = 0
57       self.input = env["rack.input"]
58       env["rack.input"] = self
59       self.content_length = length
60       backend.start(upload_id, length)
62       app.call(env)
63     end
65     def _incr(nr)
66       self.pos += nr
67       _finish if content_length && pos >= content_length
68       if (nr = pos - seen) > 0 && mtime <= (Time.now.to_i - frequency)
69         backend.incr(upload_id, nr)
70         self.seen = pos
71         self.mtime = Time.now.to_i
72       end
73     end
75     def _finish
76       self.seen = backend.finish(upload_id).seen
77       self.content_length ||= self.seen
78     end
80     def size
81       rv = input.size
83       # we had an unknown length and just had to read in everything to get it
84       if content_length.nil?
85         _incr(rv - seen)
86         _finish
87       end
88       rv
89     end
91     def rewind
92       self.pos = 0
93       input.rewind
94     end
96     def gets
97       rv = input.gets
98       rv.nil? ? _finish : _incr(rv.size)
99       rv
100     end
102     def read(*args)
103       rv = input.read(*args)
104       rv.nil? || rv.size == 0 ? _finish : _incr(rv.size)
105       rv
106     end
108     def each(&block)
109       input.each do |chunk| # usually just a line
110         _incr(chunk.size)
111         yield chunk
112       end
113       _finish
114     end
116   end