_temp_view support for alternate query languages, using content-type
[couchdbimport.git] / src / CouchDb / couch_db.erl
blob 58bff32c8df93b3051f840f832c66320c749ed5b
%% CouchDb
%% Copyright (C) 2006 Damien Katz
%%
%% This program is free software; you can redistribute it and/or
%% modify it under the terms of the GNU General Public License
%% as published by the Free Software Foundation; either version 2
%% of the License, or (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with this program; if not, write to the Free Software
%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-module(couch_db).
-behaviour(gen_server).

-export([open/2,create/2,create/3,save_doc/2,save_doc/3,get_doc_info/2,save_doc_revs/3]).
-export([save_docs/2, save_docs/3]).
-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
-export([update_view_group_sync/2,update_view_group/2,fold_view/6,fold_view/7,get_info/1]).
-export([update_temp_view_group_sync/2, fold_temp_view/5,fold_temp_view/6]).
-export([update_loop/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([revision_list_to_trees/2, merge_rev_trees/2]).

-include("couch_db.hrl").

-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header

-define(DB_FILE_SIG, <<$g, $m, $k, 0>>). % fixed signature at the beginning of each file header

-define(FILE_VERSION, 1). % the current file version this code base uses.
                          % In the future CouchDb will have the ability
                          % to use/convert different file versions.
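
% On-disk header layout (see write_header/2 and extract_header/1 below). Each
% ?HEADER_SIZE segment is laid out as follows and is written twice, back to
% back, at offset 0 of the file, so a torn header write can be detected and the
% surviving copy used:
%
%   <<?DB_FILE_SIG/binary,      4 bytes, file signature
%     (?FILE_VERSION):16,       2 bytes, file version
%     TermBin/binary,           term_to_binary(#db_header{})
%     PadZeros/binary,          zero padding out to ?HEADER_SIZE - 16
%     Md5Sig:16/binary>>        md5 over [TermBin, PadZeros]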
-record(db_header,
    {write_version = 0,
    last_update_seq = 0,
    summary_stream_state = nil,
    docinfo_by_uuid_btree_state = nil,
    docinfo_by_seq_btree_state = nil,
    view_group_btree_state = nil,
    local_docs_btree_state = nil,
    doc_count = 0
    }).
-record(db,
    {fd = 0,
    supervisor = 0,
    header = #db_header{},
    uncommitted_writes = false,
    summary_stream,
    docinfo_by_uuid_btree,
    docinfo_by_seq_btree,
    local_docs_btree,
    last_update_seq,
    view_group_btree,
    doc_count,
    name,
    view_group_mgr
    }).
-record(main,
    {db,
    update_pid,
    view_group_mgr,
    temp_view_group_mgr
    }).
start_link(DbName, Filepath, Options) ->
    {ok, Super} = couch_db_sup:start_link(),
    FdResult = supervisor:start_child(Super,
        {couch_file,
            {couch_file, open, [Filepath, Options]},
            permanent,
            brutal_kill,
            worker,
            [couch_file]}),
    case FdResult of
    {ok, Fd} ->
        {ok, _Db} = supervisor:start_child(Super,
            {couch_db,
                % below calls gen_server:start_link(couch_db, {DbName, Fd, Super, Options}, []).
                {gen_server, start_link, [couch_db, {DbName, Fd, Super, Options}, []]},
                permanent,
                10,
                worker,
                [couch_db]}),
        {ok, Super};
    {error, {enoent, _ChildInfo}} ->
        % couldn't find the file
        exit(Super, kill),
        {error, not_found};
    {error, {Error, _ChildInfo}} ->
        exit(Super, kill),
        {error, Error};
    Else ->
        exit(Super, kill),
        {error, Else}
    end.
%%% Interface functions %%%

create(Filepath, Options) ->
    create(Filepath, Filepath, Options).

create(DbName, Filepath, Options) when is_list(Options) ->
    start_link(DbName, Filepath, [create | Options]).

open(DbName, Filepath) ->
    start_link(DbName, Filepath, []).
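
% A minimal usage sketch (illustrative only; the file name, uuid and the use of
% default #doc fields are made up for the example). create/2 and open/2 return
% {ok, SupPid}; that supervisor pid is the handle the rest of this module's API
% takes as its first argument:
%
%   {ok, Db} = couch_db:create("test.couch", []),
%   {ok, _Rev} = couch_db:save_doc(Db, #doc{uuid="doc0"}),
%   {ok, _Doc} = couch_db:open_doc(Db, "doc0"),
%   couch_db:close(Db).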
delete_doc(SupPid, Uuid, Revisions) ->
    case open_doc_revs(SupPid, Uuid, Revisions, []) of
    {ok, DocResults} ->
        DeletedDocs = [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- DocResults],
        save_doc_revs(SupPid, DeletedDocs, [new_edits]);
    Error ->
        Error
    end.
open_doc(_SupPid, #doc_info{deleted=true}) ->
    {not_found, deleted};
open_doc(SupPid, UuidOrDocInfo) ->
    open_doc(SupPid, UuidOrDocInfo, []).

open_doc(SupPid, Uuid, Options) ->
    open_doc_int(get_db(db_pid(SupPid)), Uuid, Options).

open_doc_revs(SupPid, Uuid, Revs, Options) ->
    open_doc_revs_int(get_db(db_pid(SupPid)), Uuid, Revs, Options).
get_missing_revs(SupPid, UuidRevsList) ->
    Uuids = [Uuid1 || {Uuid1, _Revs} <- UuidRevsList],
    {ok, FullDocInfoResults} = get_full_doc_infos(SupPid, Uuids),
    Results =
        lists:zipwith(fun({Uuid, Revs}, FullDocInfoResult) ->
            case FullDocInfoResult of
            {ok, {_, RevisionTrees}} ->
                {Uuid, find_missing_revisions(Revs, RevisionTrees)};
            {not_found, _} ->
                {Uuid, Revs}
            end
        end, UuidRevsList, FullDocInfoResults),
    {ok, Results}.
get_doc_info(Db, Uuid) ->
    case get_full_doc_info(Db, Uuid) of
    {ok, {DocInfo, _RevisionTrees}} ->
        {ok, DocInfo};
    Else ->
        Else
    end.

get_full_doc_info(Db, Uuid) ->
    case get_full_doc_infos(Db, [Uuid]) of
    {ok, [{ok, DocInfo}]} ->
        {ok, DocInfo};
    {ok, [{not_found, Uuid}]} ->
        not_found
    end.
get_full_doc_infos(SupPid, Uuids) when is_pid(SupPid) ->
    get_full_doc_infos(get_db(db_pid(SupPid)), Uuids);
get_full_doc_infos(#db{}=Db, Uuids) ->
    {ok, LookupResults} = couch_btree:lookup(Db#db.docinfo_by_uuid_btree, Uuids),
    FinalResults =
        lists:map(
            fun({ok, {Uuid, {UpdateSeq, Rev, SummaryPointer, RevisionTrees}}}) ->
                {Conflicts, DeletedConflicts} = get_conflict_revs(RevisionTrees),
                {ok,
                    {#doc_info{
                        uuid=Uuid,
                        revision=Rev,
                        update_seq=UpdateSeq,
                        summary_pointer=SummaryPointer,
                        deleted=(summary_ptr_type(SummaryPointer)==deletion),
                        conflict_revs=Conflicts,
                        deleted_conflict_revs=DeletedConflicts
                        },
                    RevisionTrees}};
            ({not_found, {Uuid, _}}) ->
                {not_found, Uuid}
            end, LookupResults),
    {ok, FinalResults}.
get_info(SupPid) ->
    gen_server:call(db_pid(SupPid), get_info).

save_doc(SupPid, Doc) ->
    save_doc(SupPid, Doc, []).

save_doc(SupPid, Doc, Options) ->
    {ok, [[Result]]} = save_docs(SupPid, [{[Doc], [new_edits | Options]}], Options),
    Result.

save_doc_revs(SupPid, Docs, Options) ->
    {ok, [RetValue]} = save_docs(SupPid, [{Docs, Options}], Options),
    {ok, RetValue}.

save_docs(SupPid, DocAndOptions) ->
    save_docs(SupPid, DocAndOptions, []).

save_docs(SupPid, DocsAndOptions, TransactionOptions) ->
    % flush unwritten binaries to disk.
    Db = get_db(db_pid(SupPid)),
    DocsAndOptions2 = [{[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Docs], Options}
            || {Docs, Options} <- DocsAndOptions],
    {ok, _RetValues} = gen_server:call(db_pid(SupPid), {update_docs, DocsAndOptions2, TransactionOptions}).
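
% doc_flush_binaries/2 normalizes #doc.attachments before the update is sent to
% the db process. Each attachment is a {Key, {Type, Value}} pair where Value is
% one of the forms handled in the clauses below:
%
%   {Fd, StreamPointer, Len}      - already streamed into this db file; left as is
%   {OtherFd, StreamPointer, Len} - streamed into some other file; copied over
%   Bin when is_binary(Bin)       - in-memory data; written out to a new stream
%
% On return every attachment value is a {Fd, StreamPointer, Len} for this db's Fd.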
doc_flush_binaries(Doc, Fd) ->
    % calc size of binaries to write out
    Bins = Doc#doc.attachments,
    PreAllocSize =
        lists:foldl(
            fun(BinValue, SizeAcc) ->
                case BinValue of
                {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
                    % already written to our file, nothing to write
                    SizeAcc;
                {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
                    % written to a different file
                    SizeAcc + Len;
                {_Key, {_Type, Bin}} when is_binary(Bin) ->
                    SizeAcc + size(Bin)
                end
            end,
            0, Bins),

    {ok, OutputStream} = couch_stream:open(Fd),
    ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),

    NewBins = lists:map(
        fun({Key, {Type, BinValue}}) ->
            NewBinValue =
                case BinValue of
                {Fd0, StreamPointer, Len} when Fd0 == Fd ->
                    % already written to our file, nothing to write
                    {Fd, StreamPointer, Len};
                {OtherFd, StreamPointer, Len} ->
                    % written to a different file (or a closed file
                    % instance, which will cause an error)
                    {ok, {NewStreamPointer, Len}, _EndSp} =
                        couch_stream:foldl(OtherFd, StreamPointer, Len,
                            fun(Bin, {BeginPointer, SizeAcc}) ->
                                {ok, Pointer} = couch_stream:write(OutputStream, Bin),
                                case SizeAcc of
                                0 -> % this was the first write, record the pointer
                                    {ok, {Pointer, size(Bin)}};
                                _ ->
                                    {ok, {BeginPointer, SizeAcc + size(Bin)}}
                                end
                            end,
                            {{0,0}, 0}),
                    {Fd, NewStreamPointer, Len};
                Bin when is_binary(Bin), size(Bin) > 0 ->
                    {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
                    {Fd, StreamPointer, size(Bin)}
                end,
            {Key, {Type, NewBinValue}}
        end, Bins),

    {ok, _FinalPos} = couch_stream:close(OutputStream),

    Doc#doc{attachments = NewBins}.
enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx) ->
    Db = get_db(db_pid(SupPid)),
    EnumFun = fun({UpdateSeq, {Uuid, Rev, SummaryPointer, ConflictRevs, DeletedConflictRevs}}, EnumCtx) ->
        DocInfo = #doc_info{
            uuid = Uuid,
            revision = Rev,
            update_seq = UpdateSeq,
            summary_pointer = SummaryPointer,
            conflict_revs = ConflictRevs,
            deleted_conflict_revs = DeletedConflictRevs,
            deleted = (summary_ptr_type(SummaryPointer) == deletion)
            },
        InFun(DocInfo, EnumCtx)
    end,
    couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, EnumFun, Ctx).

enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
    enum_docs_since(SupPid, SinceSeq, fwd, InFun, Ctx).
enum_docs(SupPid, StartUuid, Direction, InFun, Ctx) ->
    Db = get_db(db_pid(SupPid)),
    EnumFun = fun({Uuid, {UpdateSeq, Rev, SummaryPointer, RevTrees}}, EnumCtx) ->
        {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(RevTrees),
        DocInfo = #doc_info{
            uuid = Uuid,
            revision = Rev,
            update_seq = UpdateSeq,
            summary_pointer = SummaryPointer,
            deleted = (summary_ptr_type(SummaryPointer) == deletion),
            conflict_revs = ConflictRevs,
            deleted_conflict_revs = DeletedConflictRevs
            },
        InFun(DocInfo, EnumCtx)
    end,
    couch_btree:fold(Db#db.docinfo_by_uuid_btree, StartUuid, Direction, EnumFun, Ctx).

enum_docs(SupPid, StartUuid, InFun, Ctx) ->
    enum_docs(SupPid, StartUuid, fwd, InFun, Ctx).
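
% Illustrative sketch only: collect the uuids of all non-deleted documents in
% uuid order. The InFun return value and the overall result follow whatever
% accumulator convention couch_btree:fold/5 uses; {ok, Acc} to continue is
% assumed here, not guaranteed by this module:
%
%   CollectUuids =
%       fun(#doc_info{uuid=Uuid, deleted=false}, Acc) -> {ok, [Uuid | Acc]};
%          (_DocInfo, Acc) -> {ok, Acc}
%       end,
%   Result = couch_db:enum_docs(Db, StartUuid, fwd, CollectUuids, []).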
update_view_group(SupPid, ViewGroupDocUuid) ->
    gen_server:call(db_pid(SupPid), {update_view_group, ViewGroupDocUuid, fun(_Whatever) -> ok end}).

sync_update_notify(Pid, Ref, partial) ->
    % We want to wait until complete,
    % so return a fun that calls ourself
    fun(Status) -> sync_update_notify(Pid, Ref, Status) end;
sync_update_notify(Pid, Ref, complete) ->
    Pid ! {Ref, ok};
sync_update_notify(Pid, Ref, Else) ->
    Pid ! {Ref, Else}.

update_view_group_sync(SupPid, ViewGroupDocUuid) ->
    update_view_group_sync0(SupPid, update_view_group, ViewGroupDocUuid).

update_temp_view_group_sync(SupPid, MapFunSrc) ->
    update_view_group_sync0(SupPid, update_temp_view_group, MapFunSrc).

update_view_group_sync0(SupPid, Type, Id) ->
    Pid = self(),
    Ref = make_ref(),
    UpdateFun = fun(Status) -> sync_update_notify(Pid, Ref, Status) end,
    case gen_server:call(db_pid(SupPid), {Type, Id, UpdateFun}, infinity) of
    ok ->
        receive
        {Ref, Result} ->
            Result
        end;
    Else ->
        Else
    end.
fold_view(SupPid, ViewGroupDocUuid, ViewName, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_view_group, ViewGroupDocUuid}) of
    {ok, ViewGroup} ->
        couch_view_group:fold(ViewGroup, ViewName, Dir, Fun, Acc);
    Else ->
        Else
    end.

fold_view(SupPid, ViewGroupDocUuid, ViewName, StartKey, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_view_group, ViewGroupDocUuid}) of
    {ok, ViewGroup} ->
        couch_view_group:fold(ViewGroup, ViewName, StartKey, Dir, Fun, Acc);
    Else ->
        Else
    end.

fold_temp_view(SupPid, Src, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_temp_view_group, Src}) of
    {ok, ViewGroup} ->
        couch_view_group:fold(ViewGroup, Src, Dir, Fun, Acc);
    Else ->
        Else
    end.

fold_temp_view(SupPid, Src, StartKey, Dir, Fun, Acc) ->
    case gen_server:call(db_pid(SupPid), {get_temp_view_group, Src}) of
    {ok, ViewGroup} ->
        couch_view_group:fold(ViewGroup, Src, StartKey, Dir, Fun, Acc);
    Else ->
        Else
    end.
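
% Temp view note: Src is the raw view source prefixed with a language tag (a
% content type, per the commit this file comes from) and separated from the
% function body by the first '|' character; init_main/4's temp view group info
% fun splits it on that character. An illustrative call, assuming a JavaScript
% map function:
%
%   Src = "text/javascript|function(doc) { return doc; }",
%   couch_db:update_temp_view_group_sync(Db, Src),
%   couch_db:fold_temp_view(Db, Src, fwd, Fun, Acc).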
close(SupPid) ->
    Ref = erlang:monitor(process, SupPid),
    unlink(SupPid),
    exit(SupPid, normal),
    receive
    {'DOWN', Ref, process, SupPid, _Reason} ->
        ok
    end.
% server functions

init({Filepath, Fd, Supervisor, Options}) ->
    case lists:member(create, Options) of
    true ->
        init_main(Filepath, Fd, Supervisor, nil);
    false ->
        {ok, Header} = read_header(Filepath, Fd),
        init_main(Filepath, Fd, Supervisor, Header)
    end.
init_main(Filepath, Fd, Supervisor, nil) ->
    % creates a new header and writes it to the file
    {ok, _} = couch_file:expand(Fd, 2*(?HEADER_SIZE)),
    Header = #db_header{},
    {ok, Header2} = write_header(Fd, Header),
    ok = couch_file:sync(Fd),
    init_main(Filepath, Fd, Supervisor, Header2);
init_main(Filepath, Fd, Supervisor, Header) ->
    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
    {ok, UuidBtree} = couch_btree:open(Header#db_header.docinfo_by_uuid_btree_state, Fd),
    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd),
    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
    {ok, ViewGroupBtree} = couch_btree:open(Header#db_header.view_group_btree_state, Fd),

    Db = #db{
        fd=Fd,
        supervisor=Supervisor,
        header=Header,
        summary_stream = SummaryStream,
        docinfo_by_uuid_btree = UuidBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        last_update_seq = Header#db_header.last_update_seq,
        view_group_btree = ViewGroupBtree,
        doc_count = Header#db_header.doc_count,
        name = Filepath
        },

    UpdatePid = spawn_link(couch_db, update_loop, [self(), Db]),

    Pid = self(),

    GetViewGroupInfoFun = fun(GroupKey) ->
        get_view_group_info(get_db(Pid), GroupKey)
    end,

    GetTempViewGroupInfoFun = fun(GroupKey) ->
        % for temp views, the group key is the source, and we never persist info
        Type = lists:takewhile(fun($|) -> false; (_) -> true end, GroupKey),
        [$| | Function] = lists:dropwhile(fun($|) -> false; (_) -> true end, GroupKey),
        {ok, {{Type, [{GroupKey, Function}]}, nil}}
    end,

    UpdateViewGroupInfoFun = fun(GroupKey, UpdateStatus, GroupInfo) ->
        % send the updated view group info to the update process
        UpdatePid ! {view_group_updated, GroupKey, UpdateStatus, GroupInfo},
        ok
    end,

    UpdateTempViewGroupInfoFun = fun(_GroupKey, _UpdateStatus, _GroupInfo) ->
        ok % do nothing
    end,
    {ok, TempFd} = couch_file:open(Filepath ++ ".temp", [create, overwrite]),
    {ok, ViewMgr} = couch_view_group:start_manager(Supervisor, Fd, GetViewGroupInfoFun, UpdateViewGroupInfoFun),
    {ok, TempViewMgr} = couch_view_group:start_manager(Supervisor, TempFd, GetTempViewGroupInfoFun, UpdateTempViewGroupInfoFun),

    UpdatePid ! {set_view_group_mgr, ViewMgr},

    {ok, #main{db=Db, update_pid=UpdatePid, view_group_mgr=ViewMgr, temp_view_group_mgr=TempViewMgr}}.
terminate(_Reason, #main{db=Db} = Main) ->
    Main#main.update_pid ! close,
    couch_view_group:stop(Main#main.view_group_mgr),
    couch_view_group:stop(Main#main.temp_view_group_mgr),
    couch_file:close(Db#db.fd).

handle_call({get_view_group, ViewGroupDocUuid}, From, #main{db=Db}=Main) ->
    case get_doc_info(Db, ViewGroupDocUuid) of
    {ok, #doc_info{deleted=true}} ->
        {reply, {not_found, deleted}, Main};
    {ok, DocInfo} ->
        ok = couch_view_group:get_group_async(Main#main.view_group_mgr, DocInfo, From),
        {noreply, Main};
    not_found ->
        {reply, {not_found, missing}, Main}
    end;
handle_call({get_temp_view_group, MapFunSrc}, From, Main) ->
    ok = couch_view_group:get_group_async(Main#main.temp_view_group_mgr, MapFunSrc, From),
    {noreply, Main};
handle_call({update_docs, DocActions, Options}, From, Main) ->
    Main#main.update_pid ! {From, update_docs, DocActions, Options},
    {noreply, Main};
handle_call(get_db, _From, #main{db=Db}=Main) ->
    {reply, {ok, Db}, Main};
handle_call(get_info, _From, #main{db=Db}=Main) ->
    InfoList = [
        {doc_count, Db#db.doc_count},
        {last_update_seq, Db#db.last_update_seq}
        ],
    {reply, {ok, InfoList}, Main};
handle_call({update_view_group, Uuid, UpdateNotifFun}, _From, #main{db=Db}=Main) ->
    case get_doc_info(Db, Uuid) of
    {ok, DocInfo} ->
        ok = couch_view_group:update_group(Main#main.view_group_mgr, DocInfo, UpdateNotifFun),
        {reply, ok, Main};
    Error ->
        {reply, Error, Main}
    end;
handle_call({update_temp_view_group, Src, UpdateNotifFun}, _From, Main) ->
    {reply, couch_view_group:update_group(Main#main.temp_view_group_mgr, Src, UpdateNotifFun), Main};
handle_call({db_updated, NewDb}, _From, Main) ->
    {reply, ok, Main#main{db=NewDb}}.

handle_cast(foo, Main) ->
    {noreply, Main}.
%%% Internal functions %%%

db_pid(SupPid) ->
    {error, {already_started, DbPid}} = supervisor:start_child(SupPid,
        {couch_db,
            {couch_db, sup_start_link, []},
            permanent,
            brutal_kill,
            worker,
            [couch_db]}),
    DbPid.
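
% update_loop/2 runs in a process spawned by init_main/4 and performs the
% writes against the db file. It holds the current #db{} state and handles the
% messages matched in the receive below:
%
%   {set_view_group_mgr, Mgr}               - late binding of the view group manager
%   {From, update_docs, DocActions, Opts}   - write docs, push the new #db{} to the
%                                             gen_server, then gen_server:reply/2
%   {view_group_updated, DocInfo, _, Info}  - persist view group state, but only if
%                                             the design doc revision is unchanged
%   close                                   - exit the loop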
update_loop(MainPid, Db) ->
    receive
    {set_view_group_mgr, ViewMgr} ->
        update_loop(MainPid, Db#db{view_group_mgr=ViewMgr});
    {OrigFrom, update_docs, DocActions, Options} ->
        {ok, DocResults, Db2} = update_docs_int(Db, DocActions, Options),
        ok = gen_server:call(MainPid, {db_updated, Db2}),
        couch_db_update_notifier:notify_all(Db2#db.name),
        gen_server:reply(OrigFrom, {ok, DocResults}),
        update_loop(MainPid, Db2);
    {view_group_updated, #doc_info{uuid=Uuid}=GroupDocInfo, _UpdateStatus, ViewGroupInfo} ->
        case get_doc_info(Db, GroupDocInfo#doc_info.uuid) of
        {ok, GroupDocInfo} ->
            % the revision on disk matches the revision of the view group being
            % updated, so we save the info to disk
            {ok, GroupBtree2} = couch_btree:add_remove(Db#db.view_group_btree, [{Uuid, ViewGroupInfo}], []),
            Db2 = Db#db{view_group_btree=GroupBtree2, uncommitted_writes=true},
            {ok, Db3} = commit_outstanding(Db2),
            ok = gen_server:call(MainPid, {db_updated, Db3}),
            update_loop(MainPid, Db3);
        _Else ->
            % doesn't match, don't save in the btree
            update_loop(MainPid, Db)
        end;
    close ->
        % terminate the loop
        ok
    end.
get_view_group_info(#db{}=Db, #doc_info{uuid=Uuid}=DocInfo) ->
    case couch_btree:lookup_single(Db#db.view_group_btree, Uuid) of
    {ok, {ViewQueries, ViewGroupState}} ->
        {ok, {ViewQueries, ViewGroupState}};
    not_found ->
        {ok, Doc} = open_doc_int(Db, DocInfo, []),
        case couch_doc:get_view_functions(Doc) of
        none ->
            {not_found, no_views_found};
        Queries ->
            {ok, {Queries, nil}}
        end
    end.
get_db(DbPid) ->
    {ok, Db} = gen_server:call(DbPid, get_db),
    Db.
open_doc_revs_int(Db, Uuid, Revs, Options) ->
    case get_full_doc_info(Db, Uuid) of
    {ok, {_DocInfo, RevisionTree}} ->
        {FoundRevs, MissingRevs} =
            case Revs of
            all ->
                {get_all_leafs(RevisionTree, []), []};
            _ ->
                case lists:member(latest, Options) of
                true ->
                    get_rev_leafs(RevisionTree, Revs, []);
                false ->
                    get_revs(RevisionTree, Revs, [])
                end
            end,
        FoundResults =
            lists:map(fun({Rev, FoundSummaryPtr, FoundRevPath}) ->
                case summary_ptr_type(FoundSummaryPtr) of
                missing ->
                    % we have the rev in our tree, but know nothing about it
                    {{not_found, missing}, Rev};
                deletion ->
                    {ok, make_doc(Db, Uuid, FoundSummaryPtr, FoundRevPath)};
                disk ->
                    {ok, make_doc(Db, Uuid, FoundSummaryPtr, FoundRevPath)}
                end
            end, FoundRevs),
        Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
        {ok, Results};
    not_found when Revs == all ->
        {ok, []};
    not_found ->
        {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
    end.
open_doc_int(Db, ?NON_REP_DOC_PREFIX ++ Uuid, _Options) ->
    case couch_btree:lookup_single(Db#db.local_docs_btree, Uuid) of
    {ok, BodyData} ->
        {ok, #doc{uuid=?NON_REP_DOC_PREFIX ++ Uuid, body=BodyData}};
    not_found ->
        {not_found, missing}
    end;
open_doc_int(Db, #doc_info{revision=Rev,summary_pointer=SummaryPointer}=DocInfo, Options) ->
    open_doc_int(Db, {DocInfo, [{Rev, SummaryPointer, []}]}, Options);
open_doc_int(Db, {#doc_info{uuid=Uuid,revision=Rev,summary_pointer=SummaryPointer,deleted=Deleted}, RevisionTree}, Options) ->
    case (not Deleted) orelse lists:member(allow_stub, Options) of
    true ->
        {[{_, _, RevPath}], []} = get_revs(RevisionTree, [Rev], []),
        {ok, make_doc(Db, Uuid, SummaryPointer, RevPath)};
    false ->
        {not_found, deleted}
    end;
open_doc_int(Db, Uuid, Options) ->
    case get_full_doc_info(Db, Uuid) of
    {ok, {DocInfo, RevisionTree}} ->
        open_doc_int(Db, {DocInfo, RevisionTree}, Options);
    not_found ->
        {not_found, missing}
    end.
% revision tree functions

merge_rev_trees([], B) ->
    {B, 0};
merge_rev_trees(A, []) ->
    {A, 0};
merge_rev_trees([ATree | ANextTree], [BTree | BNextTree]) ->
    {ARev, ADoc, ASubTrees} = ATree,
    {BRev, _BDoc, BSubTrees} = BTree,
    if
    ARev == BRev ->
        % same rev
        {MergedSubTrees, SubTreesConflicts} = merge_rev_trees(ASubTrees, BSubTrees),
        {MergedNextTrees, NextConflicts} = merge_rev_trees(ANextTree, BNextTree),
        {[{ARev, ADoc, MergedSubTrees} | MergedNextTrees], SubTreesConflicts + NextConflicts};
    ARev < BRev ->
        {Merged, Conflicts} = merge_rev_trees(ANextTree, [BTree | BNextTree]),
        {[ATree | Merged], Conflicts + 1};
    true ->
        {Merged, Conflicts} = merge_rev_trees([ATree | ANextTree], BNextTree),
        {[BTree | Merged], Conflicts + 1}
    end.
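
% A revision tree is a list of sibling nodes, each {RevId, DocOrSummaryPointer,
% SubTrees}, with siblings kept ordered by RevId (the ARev < BRev clause above).
% The conflict count goes up each time the two inputs diverge at a level. A
% small illustrative merge, using made-up integer rev ids:
%
%   merge_rev_trees([{1, doc_a, [{2, doc_b, []}]}],
%                   [{1, doc_a, [{3, doc_c, []}]}])
%   %% -> {[{1, doc_a, [{2, doc_b, []}, {3, doc_c, []}]}], 1}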
find_missing_revisions([], _Trees) ->
    [];
find_missing_revisions(SrcRevs, []) ->
    SrcRevs;
find_missing_revisions(SrcRevs, [{RevId, _, SubTrees} | RestTrees]) ->
    SrcRevs2 = lists:delete(RevId, SrcRevs),
    SrcRevs3 = find_missing_revisions(SrcRevs2, SubTrees),
    find_missing_revisions(SrcRevs3, RestTrees).
% get the leaf revisions descending from each found revision.
% Often the found revision is itself the leaf.
get_rev_leafs(_Trees, [], _RevPathAcc) ->
    {[], []};
get_rev_leafs([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_rev_leafs([{RevId, _SummaryPtr, SubTrees}=Tree | RestTrees], RevsToGet, RevPathAcc) ->
    case lists:member(RevId, RevsToGet) of
    true -> % found it
        LeafsFound = get_all_leafs([Tree], RevPathAcc),
        LeafRevsFound = [LeafRevFound || {LeafRevFound, _, _} <- LeafsFound],
        RevsToGet2 = RevsToGet -- LeafRevsFound,
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining};
    false ->
        {LeafsFound, RevsToGet2} = get_rev_leafs(SubTrees, RevsToGet, [RevId | RevPathAcc]),
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining}
    end.
get_revs([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_revs([{RevId, SummaryPtr, SubTrees} | RestTrees], RevsToGet, RevPathAcc) ->
    RevsToGet2 = RevsToGet -- [RevId],
    CurrentNodeResult =
        case RevsToGet2 == RevsToGet of
        true ->
            % this node is not in the rev list
            [];
        false ->
            % this node is in the rev list, return it
            [{RevId, SummaryPtr, [RevId | RevPathAcc]}]
        end,
    {RevsGotten, RevsRemaining} = get_revs(SubTrees, RevsToGet2, [RevId | RevPathAcc]),
    {RevsGotten2, RevsRemaining2} = get_revs(RestTrees, RevsRemaining, RevPathAcc),
    {CurrentNodeResult ++ RevsGotten ++ RevsGotten2, RevsRemaining2}.
get_all_leafs([], _RevPathAcc) ->
    [];
get_all_leafs([{RevId, SummaryPtr, []} | RestTrees], RevPathAcc) ->
    [{RevId, SummaryPtr, [RevId | RevPathAcc]} | get_all_leafs(RestTrees, RevPathAcc)];
get_all_leafs([{RevId, _SummaryPtr, SubTrees} | RestTrees], RevPathAcc) ->
    get_all_leafs(SubTrees, [RevId | RevPathAcc]) ++ get_all_leafs(RestTrees, RevPathAcc).

revision_list_to_trees(Doc, RevIds) ->
    revision_list_to_trees2(Doc, lists:reverse(RevIds)).

revision_list_to_trees2(Doc, [RevId]) ->
    [{RevId, Doc, []}];
revision_list_to_trees2(Doc, [RevId | Rest]) ->
    [{RevId, type_to_summary_ptr(missing), revision_list_to_trees2(Doc, Rest)}].
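
% revision_list_to_trees/2 turns a newest-first revision id list (the order used
% in #doc.revisions by write_summaries/6) into a single linear tree rooted at the
% oldest revision; only the newest revision carries the document, ancestors get
% the "missing" summary pointer. For example, with made-up integer rev ids:
%
%   revision_list_to_trees(Doc, [3, 2, 1])
%   %% -> [{1, 0, [{2, 0, [{3, Doc, []}]}]}]    (0 == type_to_summary_ptr(missing))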
winning_revision(Trees) ->
    LeafRevs = get_all_leafs(Trees, []),
    SortedLeafRevs =
        lists:sort(fun({RevIdA, SummaryPointerA, PathA}, {RevIdB, SummaryPointerB, PathB}) ->
                % sort descending by {not deleted, then depth, then revision id}
                ANotDeleted = summary_ptr_type(SummaryPointerA) /= deletion,
                BNotDeleted = summary_ptr_type(SummaryPointerB) /= deletion,
                A = {ANotDeleted, length(PathA), RevIdA},
                B = {BNotDeleted, length(PathB), RevIdB},
                A > B
            end,
            LeafRevs),

    [{RevId, SummaryPointer, _} | Rest] = SortedLeafRevs,

    {ConflictRevTuples, DeletedConflictRevTuples} =
        lists:splitwith(fun({_ConflictRevId, SummaryPointer1, _}) ->
            summary_ptr_type(SummaryPointer1) /= deletion
        end, Rest),

    ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples],
    DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples],

    {RevId, SummaryPointer, ConflictRevs, DeletedConflictRevs}.

get_conflict_revs([]) ->
    {[], []};
get_conflict_revs(Trees) ->
    {_, _, ConflictRevs, DeletedConflictRevs} = winning_revision(Trees),
    {ConflictRevs, DeletedConflictRevs}.
% Flushes to disk any outstanding revisions (document records where summary pointers
% should be) and replaces the documents with their summary pointers in the returned trees.
flush_revision_trees(_Db, []) ->
    [];
flush_revision_trees(Db, [{RevId, #doc{deleted=true}, SubTrees} | RestTrees]) ->
    [{RevId, type_to_summary_ptr(deletion), flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)];
flush_revision_trees(Db, [{RevId, #doc{}=Doc, SubTrees} | RestTrees]) ->
    % all bins must already be flushed stream pointers with the same Fd as this db
    Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Doc#doc.attachments],
    {ok, SummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
    [{RevId, SummaryPointer, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)];
flush_revision_trees(Db, [{RevId, SummaryPointer, SubTrees} | RestTrees]) ->
    [{RevId, SummaryPointer, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)].
make_doc(Db, Uuid, SummaryPointer, RevisionPath) ->
    {BodyData, BinValues} =
        case summary_ptr_type(SummaryPointer) == disk of
        true ->
            {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
            {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]};
        false ->
            {[], []}
        end,
    #doc{
        uuid = Uuid,
        revisions = RevisionPath,
        body = BodyData,
        attachments = BinValues,
        deleted = (summary_ptr_type(SummaryPointer) == deletion)
        }.

type_to_summary_ptr(missing) -> 0;
type_to_summary_ptr(deletion) -> 1.

summary_ptr_type(0) -> missing;
summary_ptr_type(1) -> deletion;
summary_ptr_type(_Pointer) -> disk.
write_summaries(Db, [], InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
    {ok, lists:reverse(DocResultOut), lists:reverse(InfoByUuidOut), lists:reverse(RemoveSeqOut),
        lists:reverse(InfoBySeqOut), Db};
write_summaries(Db,
        [{Uuid, {Docs, Options}, {DiskUpdateSeq, _DiskRevision, DiskSummaryPointer, DiskRevTrees}} | Rest],
        InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
    NewEdits = lists:member(new_edits, Options),
    {InputRevTrees, OutputRevs} =
        lists:foldl(fun(#doc{revisions=Revisions}=Doc, {AccTrees, AccRevs}) ->
            Revisions2 =
                case NewEdits of
                true -> [couch_util:rand32() | Revisions]; % add a new revision
                false -> Revisions
                end,
            DocRevTree = revision_list_to_trees(Doc, Revisions2),
            {NewRevTrees, _ConflictCount} = merge_rev_trees(AccTrees, DocRevTree),
            {NewRevTrees, [lists:nth(1, Revisions2) | AccRevs]}
        end, {[], []}, Docs),
    {NewRevTrees, ConflictCount} = merge_rev_trees(DiskRevTrees, InputRevTrees),
    [FirstDoc | _] = Docs,
    case NewEdits
            andalso ConflictCount > 0
            andalso (summary_ptr_type(DiskSummaryPointer) /= deletion
                orelse FirstDoc#doc.deleted == true) of
    true ->
        DocResultOut2 = [[conflict || _Doc <- Docs] | DocResultOut],
        write_summaries(Db, Rest, InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut2);
    false ->
        FlushedTrees = flush_revision_trees(Db, NewRevTrees),
        {WinningRevision, WinningSummaryPointer, ConflictRevs, DeletedConflictRevs} = winning_revision(FlushedTrees),

        OldDiskDocuments = case summary_ptr_type(DiskSummaryPointer) == disk of true -> 1; false -> 0 end,
        NewDiskDocuments = case summary_ptr_type(WinningSummaryPointer) == disk of true -> 1; false -> 0 end,

        NewDocCount = Db#db.doc_count + NewDiskDocuments - OldDiskDocuments,

        UpdateSeq = Db#db.last_update_seq + 1,

        RemoveSeqOut2 =
            case DiskUpdateSeq of
            0 -> RemoveSeqOut;
            _ -> [DiskUpdateSeq | RemoveSeqOut]
            end,

        InfoBySeqOut2 = [{UpdateSeq, {Uuid, WinningRevision, WinningSummaryPointer, ConflictRevs, DeletedConflictRevs}} | InfoBySeqOut],
        InfoByUuidOut2 = [{Uuid, {UpdateSeq, WinningRevision, WinningSummaryPointer, FlushedTrees}} | InfoByUuidOut],
        % output an ok and the rev id for each successful save
        DocResultOut2 = [[{ok, OutputRev} || OutputRev <- OutputRevs] | DocResultOut],
        Db2 = Db#db{last_update_seq = UpdateSeq, uncommitted_writes=true, doc_count=NewDocCount},
        write_summaries(Db2, Rest, InfoBySeqOut2, RemoveSeqOut2, InfoByUuidOut2, DocResultOut2)
    end.
update_docs_int(Db, DocsOptionsList, Options) ->
    #db{
        docinfo_by_uuid_btree = DocInfoByUuidBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        view_group_btree = ViewGroupBTree,
        local_docs_btree = LocalDocsBtree,
        view_group_mgr = ViewGroupMgr
        } = Db,

    % separate out the NonRep documents from the rest of the documents
    {Uuids, DocsOptionsList2, NonRepDocs} =
        lists:foldl(fun({[#doc{uuid=Uuid}=Doc | Rest], _Options}=DocOptions, {UuidsAcc, DocsOptionsAcc, NonRepDocsAcc}) ->
            case Uuid of
            ?NON_REP_DOC_PREFIX ++ _ when Rest == [] ->
                % when saving NR (non rep) documents, you can only save a single revision
                {UuidsAcc, DocsOptionsAcc, [Doc | NonRepDocsAcc]};
            Uuid ->
                {[Uuid | UuidsAcc], [DocOptions | DocsOptionsAcc], NonRepDocsAcc}
            end
        end, {[], [], []}, DocsOptionsList),

    {ok, OldDocInfoResults} = couch_btree:lookup(DocInfoByUuidBTree, Uuids),

    % create a list of {Uuid, {Docs, UpdateOptions}, OldDocInfo} tuples
    DocsAndOldDocInfo =
        lists:zipwith3(fun(DocUuid, DocsOptions, OldDocInfoLookupResult) ->
            case OldDocInfoLookupResult of
            {ok, {_Uuid, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}}} ->
                {DocUuid, DocsOptions, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}};
            {not_found, _} ->
                {DocUuid, DocsOptions, {0, 0, 0, []}}
            end
        end,
        Uuids, DocsOptionsList2, OldDocInfoResults),
    % now write out the documents
    {ok, DocResults, InfoByUuid, RemoveSeqList, InfoBySeqList, Db2} =
        write_summaries(Db, DocsAndOldDocInfo, [], [], [], []),

    % and the indexes to the documents
    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeqList, RemoveSeqList),
    {ok, DocInfoByUuidBTree2} = couch_btree:add_remove(DocInfoByUuidBTree, InfoByUuid, []),

    % clear the computed view cache
    UpdatedUuids = [UpdatedUuid || {UpdatedUuid, _DocInfo} <- InfoByUuid],
    {ok, ViewGroupBTree2} = couch_btree:add_remove(ViewGroupBTree, [], UpdatedUuids),

    % now notify the view group manager to discard any of the view groups it has in memory
    OldDocInfos = lists:map(
        fun({OldUuid, _Docs, {OldUpdateSeq, OldRev, OldSummaryPointer, OldRevTrees}}) ->
            {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(OldRevTrees),
            #doc_info{uuid=OldUuid, update_seq=OldUpdateSeq, revision=OldRev,
                summary_pointer=OldSummaryPointer, conflict_revs=ConflictRevs,
                deleted_conflict_revs=DeletedConflictRevs}
        end,
        DocsAndOldDocInfo),

    ok = couch_view_group:free_groups(ViewGroupMgr, OldDocInfos),

    NRUuidsSummaries =
        [{NRUuid, NRBody} || #doc{uuid=?NON_REP_DOC_PREFIX ++ NRUuid, body=NRBody, deleted=false} <- NonRepDocs],

    NRUuidsDelete =
        [NRUuid || #doc{uuid=?NON_REP_DOC_PREFIX ++ NRUuid, deleted=true} <- NonRepDocs],

    {ok, LocalDocsBtree2} = couch_btree:add_remove(LocalDocsBtree, NRUuidsSummaries, NRUuidsDelete),

    NRDocResults = [[{ok, 0}] || _Doc <- NonRepDocs],
    Db3 = Db2#db{
        docinfo_by_uuid_btree = DocInfoByUuidBTree2,
        docinfo_by_seq_btree = DocInfoBySeqBTree2,
        view_group_btree = ViewGroupBTree2,
        local_docs_btree = LocalDocsBtree2,
        uncommitted_writes = true
        },
    case lists:member(delay_commit, Options) of
    true ->
        Db4 = Db3;
    false ->
        {ok, Db4} = commit_outstanding(Db3)
    end,
    {ok, DocResults ++ NRDocResults, Db4}.
commit_outstanding(#db{fd=Fd, uncommitted_writes=true, header=Header} = Db) ->
    ok = couch_file:sync(Fd), % commit outstanding data
    Header2 = Header#db_header{
        last_update_seq = Db#db.last_update_seq,
        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
        docinfo_by_uuid_btree_state = couch_btree:get_state(Db#db.docinfo_by_uuid_btree),
        view_group_btree_state = couch_btree:get_state(Db#db.view_group_btree),
        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
        doc_count = Db#db.doc_count
        },
    {ok, Header3} = write_header(Fd, Header2),
    ok = couch_file:sync(Fd), % commit header to disk
    Db2 = Db#db{
        uncommitted_writes = false,
        header = Header3
        },
    {ok, Db2};
commit_outstanding(Db) ->
    {ok, Db}.
write_header(Fd, Header) ->
    H2 = Header#db_header{write_version = Header#db_header.write_version + 1},
    % the leading bytes in every db file, the sig and the file version:
    HeaderPrefix = ?DB_FILE_SIG,
    FileVersion = <<(?FILE_VERSION):16>>,
    % the actual header data
    TermBin = term_to_binary(H2),
    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
    FilledSize = size(HeaderPrefix) + size(FileVersion) + size(TermBin) + 16,
    case FilledSize > ?HEADER_SIZE of
    true ->
        % too big!
        {error, error_header_too_large};
    false ->
        % pad out the header with zeros, then take the md5 hash
        PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
        Sig = erlang:md5([TermBin, PadZeros]),
        % now we assemble the final header binary and write to disk
        WriteBin = <<HeaderPrefix/binary, FileVersion/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
        ?HEADER_SIZE = size(WriteBin), % sanity check
        DblWriteBin = [WriteBin, WriteBin],
        ok = couch_file:pwrite(Fd, 0, DblWriteBin),
        {ok, H2}
    end.
read_header(FilePath, Fd) ->
    {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
    <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
    % read the first header
    case extract_header(Bin1) of
    {ok, Header1} ->
        case extract_header(Bin2) of
        {ok, Header2} ->
            case Header1 == Header2 of
            true ->
                % everything is completely normal
                {ok, Header1};
            false ->
                % To get here we must have two different header versions with signatures intact.
                % It's weird but possible (a commit failure right at the 2k boundary). Log it.
                couch_log:info("Header version differences on database open (~s).~nPrimary Header: ~p~nSecondary Header: ~p", [FilePath, Header1, Header2]),
                case Header1#db_header.write_version > Header2#db_header.write_version of
                true -> {ok, Header1};
                false -> {ok, Header2}
                end
            end;
        {error, Error} ->
            % error reading the second header. It's ok, but log it.
            couch_log:info("Secondary header corruption on database open (~s)(error: ~p). Using primary header instead.", [FilePath, Error]),
            {ok, Header1}
        end;
    {error, Error} ->
        % error reading the primary header
        case extract_header(Bin2) of
        {ok, Header2} ->
            % log the corrupt primary header. It's ok since the secondary is still good.
            couch_log:info("Primary header corruption on database open (~s)(error: ~p). Using secondary header instead.", [FilePath, Error]),
            {ok, Header2};
        _ ->
            % error reading the secondary header too.
            % Return the error; no need to log anything as the caller is
            % responsible for dealing with it.
            {error, Error}
        end
    end.
extract_header(Bin) ->
    SizeOfPrefix = size(?DB_FILE_SIG),
    SizeOfTermBin = ?HEADER_SIZE -
        SizeOfPrefix -
        2 -  % file version
        16,  % md5 sig

    <<HeaderPrefix:SizeOfPrefix/binary, FileVersion:16, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,

    % check the header prefix
    case HeaderPrefix of
    ?DB_FILE_SIG ->
        % check the file version
        case FileVersion of
        ?FILE_VERSION ->
            % check the integrity signature
            case erlang:md5(TermBin) == Sig of
            true ->
                Header = binary_to_term(TermBin),
                #db_header{} = Header, % make sure we decoded to the right record type
                {ok, Header};
            false ->
                {error, header_corrupt}
            end;
        _ ->
            {error, {incompatible_file_version, FileVersion}}
        end;
    _ ->
        {error, unknown_file_type}
    end.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

handle_info(_Info, State) ->
    {noreply, State}.