add C lib package
[couchdbimport.git] / CouchProjects / CouchDb / couch_stream.erl
blob147931715da7074066b29e2e44f13ce606ea45de
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
%% Stream of binary data stored in segments of a couch_file. Writes go
%% through a gen_server process; reads go straight to the file descriptor.
-module(couch_stream).
-behaviour(gen_server).

%% Ad-hoc self test.
-export([test/1]).

%% Stream API. close/1 is exported so that code outside this module can stop
%% the writer process that open/1,2 starts with start_link.
-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1]).

%% gen_server callbacks.
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2, code_change/3, handle_info/2]).

-include("couch_db.hrl").
%% Width of a file position as stored in a segment link.
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, (?FILE_POINTER_BYTES) * 8).

%% Width of a segment size in a segment link, and of the length prefix
%% written in front of every term.
-define(STREAM_OFFSET_BYTES, 4).
-define(STREAM_OFFSET_BITS, (?STREAM_OFFSET_BYTES) * 8).
%% State kept by the writer gen_server.
%%   fd               - handle passed to couch_file:expand/2 and pwrite/3
%%   current_pos      - file position at which the next byte is written
%%   bytes_remaining  - bytes still free in the current segment
%%   alloc_sizer_func - maps a requested size to the size to allocate
-record(write_stream, {
    fd = 0,
    current_pos = 0,
    bytes_remaining = 0,
    % by default a segment is exactly as large as the request
    alloc_sizer_func = fun(Requested) -> Requested end
}).
%% Handle returned by open/1,2.
%%   pid - writer gen_server process
%%   fd  - file handle the stream was opened on (used directly for reads)
-record(stream, {
    pid,
    fd
}).
49 %%% Interface functions %%%
%% open(Fd) -> {ok, #stream{}}
%% Opens a stream on Fd starting from the null state {0, 0}.
open(Fd) ->
    open({0, 0}, Fd).

%% open(State, Fd) -> {ok, #stream{}}
%% Starts a linked writer process for Fd. State is nil (treated as {0, 0})
%% or a {Pos, BytesRemaining} pair such as the one returned by get_state/1.
open(State, Fd) ->
    InitialState =
        case State of
            nil -> {0, 0};
            _ -> State
        end,
    {ok, WriterPid} = gen_server:start_link(?MODULE, {InitialState, Fd}, []),
    {ok, #stream{fd = Fd, pid = WriterPid}}.
%% close(Stream) -> {ok, {Pos, BytesRemaining}}
%% Asks the writer process to stop and returns its final write position.
close(#stream{pid = WriterPid}) ->
    gen_server:call(WriterPid, close).
%% get_state(Stream) -> {Pos, BytesRemaining}
%% Returns the writer's current position and the space left in its segment.
get_state(#stream{pid = WriterPid}) ->
    gen_server:call(WriterPid, get_state).
%% read(StreamOrFd, Sp, Num) -> {ok, Binary, NextSp}
%% Reads Num bytes starting at stream pointer Sp = {Pos, BytesLeftInSegment}.
%% Accepts either a #stream{} handle or a bare file handle; reading does not
%% involve the writer process.
read(#stream{fd = Fd}, Sp, Num) ->
    read(Fd, Sp, Num);
read(Fd, Sp, Num) ->
    {ok, ReversedChunks, NextSp} = read_data(Fd, Sp, Num, []),
    % chunks are accumulated newest-first, so restore file order before joining
    Joined = iolist_to_binary(lists:reverse(ReversedChunks)),
    {ok, Joined, NextSp}.
%% read_term(StreamOrFd, Sp) -> {ok, Term}
%% Reads a term written by write_term/2: a length prefix of
%% ?STREAM_OFFSET_BYTES bytes followed by the external term format.
%% The stream pointer after the term is not returned.
read_term(#stream{fd = Fd}, Sp) ->
    read_term(Fd, Sp);
read_term(Fd, Sp) ->
    {ok, <<EncodedLen:(?STREAM_OFFSET_BITS)>>, BodySp} =
        read(Fd, Sp, ?STREAM_OFFSET_BYTES),
    {ok, Encoded, _AfterSp} = read(Fd, BodySp, EncodedLen),
    % NOTE(review): decodes whatever is on disk; assumes the file is trusted
    {ok, binary_to_term(Encoded)}.
%% write_term(Stream, Term) -> {ok, Sp}
%% Serialises Term, prefixes it with its encoded length and appends it to the
%% stream. Sp is the pointer to pass to read_term/2 later.
write_term(Stream, Term) ->
    Encoded = term_to_binary(Term),
    Framed = <<(byte_size(Encoded)):(?STREAM_OFFSET_BITS), Encoded/binary>>,
    write(Stream, Framed).
%% write(Stream, Bin) -> {ok, Sp}
%% Hands Bin to the writer process; Sp points at the first byte written.
write(#stream{pid = WriterPid}, Data) ->
    gen_server:call(WriterPid, {write, Data}).
%% gen_server callback: builds the writer state from the {Pos, BytesRemaining}
%% pair and file handle supplied by open/2.
init({{StartPos, SpaceLeft}, Fd}) ->
    State = #write_stream{
        fd = Fd,
        current_pos = StartPos,
        bytes_remaining = SpaceLeft
    },
    {ok, State}.
%% gen_server callback: nothing to clean up; the file handle is not closed here.
terminate(_Why, _State) ->
    ok.
%% gen_server callback handling the three synchronous requests:
%%   get_state    -> {Pos, BytesRemaining}
%%   {write, Bin} -> {ok, Sp}, where Sp points at the first byte of Bin
%%   close        -> {ok, {Pos, BytesRemaining}}, then the server stops normally
handle_call(get_state, _From, #write_stream{current_pos = Pos, bytes_remaining = Left} = State) ->
    {reply, {Pos, Left}, State};
handle_call({write, Bin}, _From, #write_stream{alloc_sizer_func = SizeFun} = State0) ->
    % ensure_init must run first so we get a pointer to the beginning of the binary
    {ok, StartSp, State1} = ensure_init(State0, SizeFun(size(Bin))),
    {ok, State2} = write_data(State1, Bin),
    {reply, {ok, StartSp}, State2};
handle_call(close, _From, #write_stream{current_pos = Pos, bytes_remaining = Left} = State) ->
    {stop, normal, {ok, {Pos, Left}}, State}.
%% gen_server callback: casts are not part of the protocol and are ignored.
handle_cast(_Request, Unchanged) ->
    {noreply, Unchanged}.
%% gen_server callback: the state needs no conversion on code upgrade.
code_change(_FromVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
%% gen_server callback: stray messages are dropped without changing the state.
handle_info(_Message, Unchanged) ->
    {noreply, Unchanged}.
%%% Internal functions %%%
%% read_data(Fd, Sp, Num, Acc) -> {ok, ReversedChunks, NextSp}
%% Collects Num bytes starting at Sp = {Pos, BytesLeftInSegment}, following
%% segment links when a segment runs out. Chunks are prepended to Acc, so the
%% result is in reverse file order.
read_data(_Fd, Sp, 0, Acc) ->
    {ok, Acc, Sp};
read_data(Fd, {LinkPos, 0}, Num, Acc) ->
    % the segment is exhausted: the link to the next segment is stored here
    LinkSize = ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES,
    {ok, <<NextPos:(?FILE_POINTER_BITS), NextLen:(?STREAM_OFFSET_BITS)>>} =
        couch_file:pread(Fd, LinkPos, LinkSize),
    read_data(Fd, {NextPos, NextLen}, Num, Acc);
read_data(Fd, {Pos, Available}, Num, Acc) ->
    % never read past the end of the current segment
    Take = erlang:min(Num, Available),
    {ok, Chunk} = couch_file:pread(Fd, Pos, Take),
    read_data(Fd, {Pos + Take, Available - Take}, Num - Take, [Chunk | Acc]).
%% write_data(Stream, Bin) -> {ok, NewStream}
%% Writes Bin at the stream's current position, allocating and linking new
%% segments whenever the current one is full.
write_data(Stream, <<>>) ->
    {ok, Stream};
write_data(#write_stream{fd = Fd, current_pos = Pos, bytes_remaining = 0,
                         alloc_sizer_func = AllocSizer} = Stream, Bin) ->
    % no space in the current segment, must alloc a new segment
    NewSize = AllocSizer(size(Bin)),
    % the extra bytes reserve room for the link to the segment after this one
    {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
    % link the exhausted segment to the new one
    ok = couch_file:pwrite(Fd, Pos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>),
    Stream2 = Stream#write_stream{
        current_pos = NewPos,
        bytes_remaining = NewSize
    },
    write_data(Stream2, Bin);
write_data(#write_stream{fd = Fd, current_pos = Pos, bytes_remaining = BytesRemaining} = Stream, Bin) ->
    % write as much as fits; the rest goes to the next segment on recursion
    BytesToWrite = erlang:min(size(Bin), BytesRemaining),
    {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
    ok = couch_file:pwrite(Fd, Pos, WriteBin),
    Stream2 = Stream#write_stream{
        bytes_remaining = BytesRemaining - BytesToWrite,
        current_pos = Pos + BytesToWrite
    },
    write_data(Stream2, Rest).
%% ensure_init(Stream, InitialSize) -> {ok, Sp, NewStream}
%% Makes sure the stream has a segment with free space and returns the stream
%% pointer Sp = {Pos, BytesRemaining} at which the next write will start.
ensure_init(#write_stream{current_pos = 0, bytes_remaining = 0, fd = Fd} = Stream, InitialSize) ->
    % null stream: allocate the first segment (there is nothing to link from)
    {ok, NewPos} = couch_file:expand(Fd, InitialSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
    {ok, {NewPos, InitialSize},
     Stream#write_stream{current_pos = NewPos, bytes_remaining = InitialSize}};
ensure_init(#write_stream{current_pos = Pos, bytes_remaining = 0,
                          alloc_sizer_func = AllocSizer, fd = Fd} = Stream, InitialSize) ->
    % no space in the current segment, must alloc a new segment
    NewSize = AllocSizer(InitialSize),
    % Allocate NewSize, the same size recorded in the link and in
    % bytes_remaining; allocating only InitialSize would let writes run past
    % the end of the segment whenever the sizer returns a larger value.
    % NOTE(review): handle_call/3 already applies the sizer before calling
    % this function, so it is applied twice on this path; confirm intent.
    {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
    % link the exhausted segment to the new one
    ok = couch_file:pwrite(Fd, Pos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>),
    {ok, {NewPos, NewSize},
     Stream#write_stream{current_pos = NewPos, bytes_remaining = NewSize}};
ensure_init(#write_stream{current_pos = Pos, bytes_remaining = NextOffset} = Stream, _InitialSize) ->
    % the current segment still has room
    {ok, {Pos, NextOffset}, Stream}.
174 %%% Tests %%%
%% test(Term) -> result of couch_file:close/1
%% Manual round-trip check against the file "foo": writes two terms, reopens
%% the file, reads them back and prints them, then runs the deep write/read
%% test on a second stream.
test(Term) ->
    % first pass: write two terms and remember where each one starts
    {ok, WriteFd} = couch_file:open("foo", [write]),
    {ok, Writer} = open({0, 0}, WriteFd),
    {ok, FirstSp} = write_term(Writer, Term),
    {ok, SecondSp} = write_term(Writer, {Term, Term}),
    close(Writer),
    couch_file:close(WriteFd),

    % second pass: reopen the file and read both terms back
    {ok, ReadFd} = couch_file:open("foo", [read, write]),
    {ok, Writer2} = open({0, 0}, ReadFd),
    {ok, FirstTerm} = read_term(ReadFd, FirstSp),
    io:format("Term1: ~w ~n", [FirstTerm]),
    {ok, SecondTerm} = read_term(ReadFd, SecondSp),
    io:format("Term2: ~w ~n", [SecondTerm]),

    % many writes of varying size, then read every one of them back
    {ok, Pointers} = deep_write_test(Writer2, Term, 1000, []),
    deep_read_test(ReadFd, Pointers),
    close(Writer2),
    couch_file:close(ReadFd).
%% deep_read_test(Fd, PointerList) -> ok
%% Reads the term at every pointer in PointerList, crashing with badmatch on
%% the first one that cannot be read. Returns ok for an empty list.
deep_read_test(Fd, PointerList) ->
    lists:foreach(
        fun(Pointer) ->
            {ok, _Term} = read_term(Fd, Pointer)
        end,
        PointerList
    ).
%% deep_write_test(Stream, Term, N, Acc) -> {ok, PointerList}
%% Performs N writes; each write is a list holding a random number (between 1
%% and the remaining count) of copies of Term. The pointers of all writes are
%% prepended to Acc, newest first.
deep_write_test(_Stream, _Term, 0, Pointers) ->
    {ok, Pointers};
deep_write_test(Stream, Term, Remaining, Pointers) ->
    CopyCount = random:uniform(Remaining),
    {ok, Pointer} = write_term(Stream, lists:duplicate(CopyCount, Term)),
    deep_write_test(Stream, Term, Remaining - 1, [Pointer | Pointers]).