2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_stream
).
19 -behaviour(gen_server
).
22 -export([open
/1, open
/2, read
/3, read_term
/2, write
/2, write_term
/2, get_state
/1]).
23 -export([init
/1, terminate
/2, handle_call
/3]).
24 -export([handle_cast
/2,code_change
/3,handle_info
/2]).
26 -include("couch_db.hrl").
28 -define(FILE_POINTER_BYTES
, 8).
29 -define(FILE_POINTER_BITS
, 8*(?FILE_POINTER_BYTES
)).
31 -define(STREAM_OFFSET_BYTES
, 4).
32 -define(STREAM_OFFSET_BITS
, 8*(?STREAM_OFFSET_BYTES
)).
39 alloc_sizer_func
= fun(NeededSize
) -> NeededSize
end % default is to return Needed size unchanged
49 %%% Interface functions %%%
57 {ok
, Pid
} = gen_server:start_link(couch_stream
, {State
, Fd
}, []),
58 {ok
, #stream
{pid = Pid
, fd
= Fd
}}.
60 close(#stream
{pid = Pid
, fd
= _Fd
}) ->
61 gen_server:call(Pid
, close
).
63 get_state(#stream
{pid = Pid
, fd
= _Fd
}) ->
64 gen_server:call(Pid
, get_state
).
66 read(#stream
{pid = _Pid
, fd
= Fd
}, Sp
, Num
) ->
69 {ok
, RevBinList
, Sp2
} = read_data(Fd
, Sp
, Num
, []),
70 Bin
= list_to_binary(lists:reverse(RevBinList
)),
73 read_term(#stream
{pid = _Pid
, fd
= Fd
}, Sp
) ->
76 {ok
, <<TermLen:(?STREAM_OFFSET_BITS
)>>, Sp2
}
77 = read(Fd
, Sp
, ?STREAM_OFFSET_BYTES
),
78 {ok
, Bin
, _Sp3
} = read(Fd
, Sp2
, TermLen
),
79 {ok
, binary_to_term(Bin
)}.
81 write_term(Stream
, Term
) ->
82 Bin
= term_to_binary(Term
),
84 Bin2
= <<Size:(?STREAM_OFFSET_BITS
), Bin
/binary>>,
87 write(#stream
{pid = Pid
}, Bin
) ->
88 gen_server:call(Pid
, {write
, Bin
}).
91 init({{Pos
, BytesRemaining
}, Fd
}) ->
95 bytes_remaining
= BytesRemaining
98 terminate(_Reason
, _Stream
) ->
101 handle_call(get_state
, _From
, Stream
) ->
102 #write_stream
{current_pos
=Pos
, bytes_remaining
= BytesRemaining
} = Stream
,
103 {reply
, {Pos
, BytesRemaining
}, Stream
};
104 handle_call({write
, Bin
}, _From
, Stream
) ->
105 AllocSizer
= Stream#write_stream
.alloc_sizer_func
,
106 % ensure init is called first so we can get a pointer to the begining of the binary
107 {ok
, Sp
, Stream2
} = ensure_init(Stream
, AllocSizer(size(Bin
))),
108 {ok
, Stream3
} = write_data(Stream2
, Bin
),
109 {reply
, {ok
, Sp
}, Stream3
};
110 handle_call(close
, _From
, Stream
) ->
111 #write_stream
{current_pos
=Pos
, bytes_remaining
= BytesRemaining
} = Stream
,
112 {stop
, normal
, {ok
, {Pos
, BytesRemaining
}}, Stream
}.
114 handle_cast(_Msg
, State
) ->
117 code_change(_OldVsn
, State
, _Extra
) ->
120 handle_info(_Info
, State
) ->
123 %%% Internal function %%%
125 read_data(_Fd
, Sp
, 0, RevBinList
) ->
126 {ok
, RevBinList
, Sp
};
127 read_data(Fd
, {Pos
, 0}, Num
, RevBinList
) ->
128 {ok
, <<NextPos:(?FILE_POINTER_BITS
), NextOffset:(?STREAM_OFFSET_BITS
)>>}
129 = couch_file:pread(Fd
, Pos
, ?FILE_POINTER_BYTES
+ ?STREAM_OFFSET_BYTES
),
130 Sp
= {NextPos
, NextOffset
},
131 read_data(Fd
, Sp
, Num
, RevBinList
);
132 read_data(Fd
, {Pos
, Offset
}, Num
, RevBinList
) ->
133 ReadAmount
= lists:min([Num
, Offset
]),
134 {ok
, Bin
} = couch_file:pread(Fd
, Pos
, ReadAmount
),
135 Sp
= {Pos
+ ReadAmount
, Offset
- ReadAmount
},
136 read_data(Fd
, Sp
, Num
- ReadAmount
, [Bin
| RevBinList
]).
138 write_data(Stream
, <<>>) ->
140 write_data(#write_stream
{fd
=Fd
, current_pos
=Pos
, bytes_remaining
=0, alloc_sizer_func
=AllocSizer
} = Stream
, Bin
) ->
141 % no space in the current segment, must alloc a new segment
142 NewSize
= AllocSizer(size(Bin
)),
143 {ok
, NewPos
} = couch_file:expand(Fd
, NewSize
+ ?FILE_POINTER_BYTES
+ ?STREAM_OFFSET_BYTES
),
144 ok
= couch_file:pwrite(Fd
, Pos
, <<NewPos:(?FILE_POINTER_BITS
), NewSize:(?STREAM_OFFSET_BITS
)>>),
145 Stream2
= Stream#write_stream
{
147 bytes_remaining
=NewSize
},
148 write_data(Stream2
, Bin
);
149 write_data(#write_stream
{fd
=Fd
, current_pos
=Pos
, bytes_remaining
=BytesRemaining
} = Stream
, Bin
) ->
150 BytesToWrite
= lists:min([size(Bin
), BytesRemaining
]),
151 {WriteBin
, Rest
} = split_binary(Bin
, BytesToWrite
),
152 ok
= couch_file:pwrite(Fd
, Pos
, WriteBin
),
153 Stream2
= Stream#write_stream
{
154 bytes_remaining
=BytesRemaining
- BytesToWrite
,
155 current_pos
=Pos
+ BytesToWrite
157 write_data(Stream2
, Rest
).
159 ensure_init(#write_stream
{current_pos
=0, bytes_remaining
=0}=Stream
, InitialSize
) ->
160 % null stream initialize
161 {ok
, NewPos
} = couch_file:expand(Stream#write_stream
.fd
, InitialSize
+ ?FILE_POINTER_BYTES
+ ?STREAM_OFFSET_BYTES
),
162 {ok
, {NewPos
, InitialSize
}, Stream#write_stream
{current_pos
= NewPos
, bytes_remaining
= InitialSize
}};
163 ensure_init(#write_stream
{current_pos
=Pos
, bytes_remaining
=0, alloc_sizer_func
=AllocSizer
, fd
=Fd
}=Stream
, InitialSize
) ->
164 % no space in the current segment, must alloc a new segment
165 NewSize
= AllocSizer(InitialSize
),
166 {ok
, NewPos
} = couch_file:expand(Fd
, InitialSize
+ ?FILE_POINTER_BYTES
+ ?STREAM_OFFSET_BYTES
),
167 ok
= couch_file:pwrite(Fd
, Pos
, <<NewPos:(?FILE_POINTER_BITS
), NewSize:(?STREAM_OFFSET_BITS
)>>),
168 {ok
, {NewPos
, NewSize
}, Stream#write_stream
{current_pos
= NewPos
, bytes_remaining
= NewSize
}};
169 ensure_init(#write_stream
{current_pos
=Pos
, bytes_remaining
=NextOffset
}=Stream
, _InitialSize
) ->
170 {ok
, {Pos
, NextOffset
}, Stream
}.
178 {ok
, Fd
} = couch_file:open("foo", [write
]),
179 {ok
, Stream
} = open({0,0}, Fd
),
180 {ok
, Pos
} = write_term(Stream
, Term
),
181 {ok
, Pos2
} = write_term(Stream
, {Term
, Term
}),
183 couch_file:close(Fd
),
184 {ok
, Fd2
} = couch_file:open("foo", [read
, write
]),
185 {ok
, Stream2
} = open({0,0}, Fd2
),
186 {ok
, Term1
} = read_term(Fd2
, Pos
),
187 io:format("Term1: ~w ~n",[Term1
]),
188 {ok
, Term2
} = read_term(Fd2
, Pos2
),
189 io:format("Term2: ~w ~n",[Term2
]),
190 {ok
, PointerList
} = deep_write_test(Stream2
, Term
, 1000, []),
191 deep_read_test(Fd2
, PointerList
),
193 couch_file:close(Fd2
).
195 deep_read_test(_Fd
, []) ->
197 deep_read_test(Fd
, [Pointer
| RestPointerList
]) ->
198 {ok
, _Term
} = read_term(Fd
, Pointer
),
199 deep_read_test(Fd
, RestPointerList
).
201 deep_write_test(_Stream
, _Term
, 0, PointerList
) ->
203 deep_write_test(Stream
, Term
, N
, PointerList
) ->
204 WriteList
= lists:duplicate(random:uniform(N
), Term
),
205 {ok
, Pointer
} = write_term(Stream
, WriteList
),
206 deep_write_test(Stream
, Term
, N
-1, [Pointer
| PointerList
]).