split out common read/write code
[metropolis.git] / lib / metropolis / tc / hdb.rb
blob4c58ca8ebe3b8e718667f93407259e65e855e277
1 # -*- encoding: binary -*-
3 # this module is NOT thread-safe, all performance is dependent on the
4 # local machine so there is never anything that needs yielding to threads.
5 module Metropolis::TC::HDB
6   autoload :RO, 'metropolis/tc/hdb/ro'
8   TCHDB = TokyoCabinet::HDB # :nodoc
9   include Metropolis::Common
11   def setup(opts)
12     @headers = { 'Content-Type' => 'application/octet-stream' }
13     @headers.merge!(opts[:response_headers] || {})
14     @nr_slots = opts[:nr_slots] || 3
15     path_pattern = opts[:path_pattern]
16     path_pattern.scan(/%\d*x/).size == 1 or
17       raise ArgumentError, "only one '/%\d*x/' may appear in #{path_pattern}"
18     @optimize = nil
19     if query = opts[:query]
20       flags = 0
21       @optimize = %w(bnum apow fpow).map do |x|
22         v = query[x]
23         v ? v.to_i : nil
24       end
25       case large = query['large']
26       when 'false', nil
27       when 'true'
28         flags |= TCHDB::TLARGE
29       else
30         raise ArgumentError, "invalid 'large' value: #{large}"
31       end
32       case compress = query['compress']
33       when nil
34       when 'deflate', 'bzip', 'tcbs'
35         flags |= TCHDB.const_get("T#{compress.upcase}")
36       else
37         raise ArgumentError, "invalid 'compress' value: #{compress}"
38       end
39       @optimize << flags
40     end
41     @dbv = (0...@nr_slots).to_a.map do |slot|
42       path = sprintf(path_pattern, slot)
43       hdb = TCHDB.new
44       unless opts[:read_only]
45         hdb.open(path, TCHDB::OWRITER | TCHDB::OCREAT) or ex!(:open, hdb)
46         if @optimize
47           hdb.optimize(*@optimize) or ex!(:optimize, hdb)
48         end
49         hdb.close or ex!(:close, hdb)
50       end
51       [ hdb, path ]
52     end
53     @rd_flags = TCHDB::OREADER
54     @wr_flags = TCHDB::OWRITER
55     if opts[:read_only]
56       extend(RO)
57     end
58   end
61   def ex!(msg, hdb)
62     raise "#{msg}: #{hdb.errmsg(hdb.ecode)}"
63   end
65   def writer(key, &block)
66     hdb, path = @dbv[key.hash % @nr_slots]
67     hdb.open(path, @wr_flags) or ex!(:open, hdb)
68     yield hdb
69     ensure
70       hdb.close or ex!(:close, hdb)
71   end
73   def reader(key)
74     hdb, path = @dbv[key.hash % @nr_slots]
75     hdb.open(path, @rd_flags) or ex!(:open, hdb)
76     yield hdb
77     ensure
78       hdb.close or ex!(:close, hdb)
79   end
81   def put(key, env)
82     value = env["rack.input"].read
83     writer(key) do |hdb|
84       case env['HTTP_X_TT_PDMODE']
85       when '1'
86         unless hdb.putkeep(key, value)
87           TCHDB::EKEEP == hdb.ecode and return r(409)
88           ex!(:putkeep, hdb)
89         end
90       when '2'
91         hdb.putcat(key, value) or ex!(:putcat, hdb)
92       else
93         # ttserver does not care for other PDMODE values, so we don't, either
94         hdb.put(key, value) or ex!(:put, hdb)
95       end
96     end
97     r(201)
98   end
100   def delete(key)
101     writer(key) do |hdb|
102       unless hdb.delete(key)
103         TCHDB::ENOREC == hdb.ecode and return r(404)
104         ex!(:delete, hdb)
105       end
106     end
107     r(200)
108   end
110   def head(key)
111     size = reader(key) { |hdb| hdb.vsiz(key) or ex!(:vsiz, hdb) }
112     0 > size and return r(404)
113     [ 200, {
114         'Content-Length' => size.to_s,
115       }.merge!(@headers), [] ]
116   end
118   def get(key)
119     value = nil
120     reader(key) do |hdb|
121       unless value = hdb.get(key)
122         TCHDB::ENOREC == hdb.ecode and return r(404)
123         ex!(:get, hdb)
124       end
125     end
126     [ 200, {
127         'Content-Length' => value.size.to_s,
128       }.merge!(@headers), [ value ] ]
129   end
131   def close!
132     @dbv.each { |(hdb,_)| hdb.close }
133   end