add C lib package
[couchdbimport.git] / CouchProjects / CouchDb / couch_db.erl
blob12eee4448ee9b971b310537814db85b3586edcfb
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_db).
19 -behaviour(gen_server).
21 -export([test/0, test/1]).
22 -export([open/1,create/1,create/2,save_doc/2,save_doc/3,get_doc_info/2, save_doc_revs/3]).
23 -export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
24 -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
25 -export([update_table_group_sync/2,update_table_group/2,fold_table/6,fold_table/7,get_info/1]).
26 -export([update_loop/2]).
27 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
28 -export([revision_list_to_trees/2 , merge_rev_trees/2 ]).
30 -include("couch_db.hrl").
32 -define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
%% On-disk database header. write_header/2 serializes it with
%% term_to_binary/1 and read_header/2 validates it with is_record/2, so
%% adding, removing or reordering fields changes the file format.
-record(db_header, {
    write_version = 0,                   % incremented by every write_header/2
    last_update_seq = 0,
    summary_stream_state = nil,
    docinfo_by_uuid_btree_state = nil,
    docinfo_by_seq_btree_state = nil,
    table_group_btree_state = nil,
    local_docs_btree_state = nil,
    doc_count = 0
}).

%% In-memory handle for an open database: the file, the opened stream and
%% btrees, and the header they were last committed to.
-record(db, {
    fd = 0,
    supervisor = 0,
    header = #db_header{},
    uncommitted_writes = false,          % true once data was written but no header yet
    summary_stream,
    docinfo_by_uuid_btree,
    docinfo_by_seq_btree,
    local_docs_btree,
    last_update_seq,
    table_group_btree,
    doc_count,
    filepath,
    table_group_mgr
}).

%% gen_server state of the couch_db process.
-record(main, {
    db,
    update_pid,
    table_group_mgr
}).
%% Starts a per-database supervisor with two children: a couch_file process
%% for Filepath and the couch_db gen_server that uses it.
%%
%% Returns {ok, SupervisorPid} on success. If the couch_file child cannot be
%% started, the supervisor is killed and the result is one of:
%%   - {error, not_found} when the child failed with enoent;
%%   - {error, Reason} otherwise.
%% A failure to start the couch_db child crashes the caller via the {ok, _} match.
start_link(Filepath, Options) ->
    {ok, Super} = couch_db_sup:start_link(),
    FdResult = supervisor:start_child(Super,
        {couch_file,
            {couch_file, open, [Filepath, Options]},
            permanent,
            brutal_kill,
            worker,
            [couch_file]}),
    case FdResult of
    {ok, Fd} ->
        {ok, _Db} = supervisor:start_child(Super,
            {couch_db,
                % calls gen_server:start_link(couch_db, {Filepath, Fd, Super, Options}, []),
                % i.e. init/1 of this module receives the 4-tuple
                {gen_server, start_link, [couch_db, {Filepath, Fd, Super, Options}, []]},
                permanent,
                10,
                worker,
                [couch_db]}),
        {ok, Super};
    {error, {enoent, _ChildInfo}} ->
        % couldn't find file
        exit(Super, kill),
        {error, not_found};
    {error, {Error, _ChildInfo}} ->
        exit(Super, kill),
        {error, Error};
    Else ->
        exit(Super, kill),
        {error, Else}
    end.
100 %%% Interface functions %%%
%% Creates a new database at Filename. Options are passed to start_link/2
%% with the `create` flag prepended.
create(Filename) ->
    create(Filename, []).

create(Filename, Options) when is_list(Options) ->
    CreateOptions = [create | Options],
    start_link(Filename, CreateOptions).
%% Opens an existing database at Filename (no `create` flag).
open(Filename) ->
    NoOptions = [],
    start_link(Filename, NoOptions).
%% Deletes the given revisions of document Uuid.
%%
%% Revisions is handed straight to open_doc_revs/4. Only revisions that open
%% as non-deleted docs are re-saved with deleted=true, as new edits.
%% Returns the save_doc_revs/3 result, or the open_doc_revs/4 error unchanged.
delete_doc(SupPid, Uuid, Revisions) ->
    case open_doc_revs(SupPid, Uuid, Revisions, []) of
    {ok, DocResults} ->
        LiveDocs = [Doc || {ok, #doc{deleted=false}=Doc} <- DocResults],
        Tombstones = [LiveDoc#doc{deleted=true} || LiveDoc <- LiveDocs],
        save_doc_revs(SupPid, Tombstones, [new_edits]);
    Error ->
        Error
    end.
%% Opens the current revision of a document.
%%
%% UuidOrDocInfo is a document uuid or a #doc_info{}. A #doc_info{} already
%% marked deleted returns {not_found, deleted} without contacting the db process.
open_doc(SupPid, UuidOrDocInfo) ->
    case UuidOrDocInfo of
    #doc_info{deleted=true} ->
        {not_found, deleted};
    _ ->
        open_doc(SupPid, UuidOrDocInfo, [])
    end.

%% Opens a document against the current db snapshot. Options are passed to
%% open_doc_int/3 (e.g. allow_stub).
open_doc(SupPid, Uuid, Options) ->
    Db = get_db(db_pid(SupPid)),
    open_doc_int(Db, Uuid, Options).
%% Opens specific revisions of document Uuid (Revs is a list of rev ids or
%% `all`). See open_doc_revs_int/4 for the result shape.
open_doc_revs(SupPid, Uuid, Revs, Options) ->
    Db = get_db(db_pid(SupPid)),
    open_doc_revs_int(Db, Uuid, Revs, Options).
%% For each {Uuid, Revs} pair, returns the revs that are not in the doc's
%% revision tree. All revs are missing when the doc itself is unknown.
%% Result: {ok, [{Uuid, MissingRevs}]} in input order.
get_missing_revs(SupPid, UuidRevsList) ->
    Uuids = [Uuid || {Uuid, _Revs} <- UuidRevsList],
    {ok, FullDocInfoResults} = get_full_doc_infos(SupPid, Uuids),
    Results = lists:map(
        fun({{Uuid, Revs}, {ok, {_DocInfo, RevisionTrees}}}) ->
                {Uuid, find_missing_revisions(Revs, RevisionTrees)};
           ({{Uuid, Revs}, {not_found, _}}) ->
                {Uuid, Revs}
        end,
        lists:zip(UuidRevsList, FullDocInfoResults)),
    {ok, Results}.
%% Returns {ok, #doc_info{}} for Uuid, or whatever else get_full_doc_info/2
%% returned (not_found). Db may be a #db{} or a database supervisor pid.
get_doc_info(Db, Uuid) ->
    Lookup = get_full_doc_info(Db, Uuid),
    case Lookup of
    {ok, {DocInfo, _RevisionTrees}} -> {ok, DocInfo};
    _ -> Lookup
    end.
%% Single-uuid form of get_full_doc_infos/2.
%% Returns {ok, {#doc_info{}, RevisionTrees}} or not_found.
get_full_doc_info(Db, Uuid) ->
    Found = get_full_doc_infos(Db, [Uuid]),
    case Found of
    {ok, [{not_found, Uuid}]} -> not_found;
    {ok, [{ok, FullDocInfo}]} -> {ok, FullDocInfo}
    end.
%% Looks up Uuids in the by-uuid btree.
%%
%% Returns {ok, Results} with one entry per uuid, in lookup order:
%%   - {ok, {#doc_info{}, RevisionTrees}} when the doc exists;
%%   - {not_found, Uuid} otherwise.
%% The doc_info's conflict lists come from the revision trees.
%% First argument may be a database supervisor pid or a #db{}.
get_full_doc_infos(SupPid, Uuids) when is_pid(SupPid) ->
    get_full_doc_infos(get_db(db_pid(SupPid)), Uuids);
get_full_doc_infos(#db{docinfo_by_uuid_btree=UuidBtree}, Uuids) ->
    {ok, LookupResults} = couch_btree:lookup(UuidBtree, Uuids),
    ToFullDocInfo =
        fun({ok, {Uuid, {UpdateSeq, Rev, SummaryPointer, RevisionTrees}}}) ->
                {Conflicts, DeletedConflicts} = get_conflict_revs(RevisionTrees),
                DocInfo = #doc_info{
                    uuid = Uuid,
                    revision = Rev,
                    update_seq = UpdateSeq,
                    summary_pointer = SummaryPointer,
                    deleted = (summary_ptr_type(SummaryPointer) == deletion),
                    conflict_revs = Conflicts,
                    deleted_conflict_revs = DeletedConflicts
                },
                {ok, {DocInfo, RevisionTrees}};
           ({not_found, {Uuid, _}}) ->
                {not_found, Uuid}
        end,
    {ok, [ToFullDocInfo(LookupResult) || LookupResult <- LookupResults]}.
%% Returns {ok, [{doc_count, N}, {last_update_seq, Seq}]} from the db process.
get_info(SupPid) ->
    DbPid = db_pid(SupPid),
    gen_server:call(DbPid, get_info).
%% Saves a single document as a new edit.
%% Returns {ok, NewRev} on success, or the atom `conflict`.
save_doc(SupPid, Doc) ->
    save_doc(SupPid, Doc, []).

save_doc(SupPid, Doc, Options) ->
    {ok, [Result]} = save_doc_revs(SupPid, [Doc], [new_edits | Options]),
    case Result of
    {conflict, _} -> conflict;
    {ok, _NewRev} -> Result
    end.
%% Saves Docs as one update group.
%% Returns {ok, PerDocResults}: the result list of that single group.
save_doc_revs(SupPid, Docs, Options) ->
    Request = {update_docs, [{Docs, Options}], []},
    {ok, [GroupResults]} = gen_server:call(db_pid(SupPid), Request),
    {ok, GroupResults}.
%% Folds InFun(#doc_info{}, Ctx) over the by-sequence btree.
%% The fold starts at key SinceSeq + 1. Direction is passed to
%% couch_btree:fold/5; the /4 variant uses fwd.
enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx) ->
    #db{docinfo_by_seq_btree=SeqBtree} = get_db(db_pid(SupPid)),
    EnumFun =
        fun({UpdateSeq, {Uuid, Rev, SummaryPointer, ConflictRevs, DeletedConflictRevs}}, EnumCtx) ->
            DocInfo = #doc_info{
                uuid = Uuid,
                revision = Rev,
                update_seq = UpdateSeq,
                summary_pointer = SummaryPointer,
                conflict_revs = ConflictRevs,
                deleted_conflict_revs = DeletedConflictRevs,
                deleted = (summary_ptr_type(SummaryPointer) == deletion)
            },
            InFun(DocInfo, EnumCtx)
        end,
    StartSeq = SinceSeq + 1,
    couch_btree:fold(SeqBtree, StartSeq, Direction, EnumFun, Ctx).

enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
    enum_docs_since(SupPid, SinceSeq, fwd, InFun, Ctx).
%% Folds InFun(#doc_info{}, Ctx) over the by-uuid btree, starting at StartUuid.
%% Conflict lists are computed from each entry's revision trees.
%% The /4 variant uses fwd.
enum_docs(SupPid, StartUuid, Direction, InFun, Ctx) ->
    #db{docinfo_by_uuid_btree=UuidBtree} = get_db(db_pid(SupPid)),
    EnumFun =
        fun({Uuid, {UpdateSeq, Rev, SummaryPointer, RevTrees}}, EnumCtx) ->
            {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(RevTrees),
            InFun(#doc_info{
                    uuid = Uuid,
                    revision = Rev,
                    update_seq = UpdateSeq,
                    summary_pointer = SummaryPointer,
                    deleted = (summary_ptr_type(SummaryPointer) == deletion),
                    conflict_revs = ConflictRevs,
                    deleted_conflict_revs = DeletedConflictRevs},
                EnumCtx)
        end,
    couch_btree:fold(UuidBtree, StartUuid, Direction, EnumFun, Ctx).

enum_docs(SupPid, StartUuid, InFun, Ctx) ->
    enum_docs(SupPid, StartUuid, fwd, InFun, Ctx).
%% Requests an update of the table group stored in TableGroupDocUuid.
%% Status notifications are ignored. The call returns once the db process
%% has handed the request to the table group manager.
update_table_group(SupPid, TableGroupDocUuid) ->
    IgnoreStatus = fun(_Whatever) -> ok end,
    gen_server:call(db_pid(SupPid), {update_table_group, TableGroupDocUuid, IgnoreStatus}).
%% Status callback used by update_table_group_sync/2.
%%   - partial:   returns a fun that keeps waiting for the next status;
%%   - complete:  sends {Ref, ok} to Pid;
%%   - any other: forwards it to Pid as {Ref, Status}.
sync_update_notify(Pid, Ref, Status) ->
    case Status of
    partial ->
        fun(NextStatus) -> sync_update_notify(Pid, Ref, NextStatus) end;
    complete ->
        Pid ! {Ref, ok};
    _ ->
        Pid ! {Ref, Status}
    end.
%% Updates the table group and blocks until the manager reports a final
%% status through sync_update_notify/3.
%% Returns ok on completion, the reported status otherwise, or the error
%% returned by the db process when the request was refused.
update_table_group_sync(SupPid, TableGroupDocUuid) ->
    Self = self(),
    Ref = make_ref(),
    Notify = fun(Status) -> sync_update_notify(Self, Ref, Status) end,
    Request = {update_table_group, TableGroupDocUuid, Notify},
    case gen_server:call(db_pid(SupPid), Request, infinity) of
    ok ->
        receive
            {Ref, Result} -> Result
        end;
    Error ->
        Error
    end.
%% Folds Fun over table TableName of the table group stored in
%% TableGroupDocUuid, optionally from StartKey. Errors from fetching the
%% group (e.g. {not_found, missing}) are returned unchanged.
fold_table(SupPid, TableGroupDocUuid, TableName, Dir, Fun, Acc) ->
    with_table_group(SupPid, TableGroupDocUuid,
        fun(TableGroup) ->
            couch_table_group:fold(TableGroup, TableName, Dir, Fun, Acc)
        end).

fold_table(SupPid, TableGroupDocUuid, TableName, StartKey, Dir, Fun, Acc) ->
    with_table_group(SupPid, TableGroupDocUuid,
        fun(TableGroup) ->
            couch_table_group:fold(TableGroup, TableName, StartKey, Dir, Fun, Acc)
        end).

%% Fetches the table group from the db process and applies UseFun to it;
%% any non-{ok, _} reply is passed through.
with_table_group(SupPid, TableGroupDocUuid, UseFun) ->
    case gen_server:call(db_pid(SupPid), {get_table_group, TableGroupDocUuid}) of
    {ok, TableGroup} -> UseFun(TableGroup);
    Else -> Else
    end.
%% Stops the database supervisor SupPid and waits until it has exited.
%% NOTE(review): the exit signal is `normal`, which a supervisor only acts on
%% when it comes from its parent; calling this from a process other than the
%% one that started the db may block forever — confirm.
close(SupPid) ->
    MonRef = erlang:monitor(process, SupPid),
    unlink(SupPid),
    exit(SupPid, normal),
    receive
        {'DOWN', MonRef, process, SupPid, _Reason} ->
            ok
    end.
309 % server functions
%% gen_server init. With the `create` option a fresh header is written
%% (init_main/4 with nil); otherwise the existing header is read from Fd.
init({Filepath, Fd, Supervisor, Options}) ->
    Header =
        case lists:member(create, Options) of
        true ->
            nil;
        false ->
            {ok, ExistingHeader} = read_header(Fd),
            ExistingHeader
        end,
    init_main(Filepath, Fd, Supervisor, Header).
%% Builds the #db{} from Header and starts its helper processes.
%%
%% With Header == nil the file is new:
%%   - it is expanded to hold both header copies;
%%   - a default header is written and synced;
%%   - the function then recurses with that header.
%% Otherwise the summary stream and the four btrees are opened from the
%% header state, and two helpers are started:
%%   - the linked update_loop/2 process, which owns all writes;
%%   - the table group manager.
init_main(Filepath, Fd, Supervisor, nil) ->
    {ok, _} = couch_file:expand(Fd, 2 * (?HEADER_SIZE)),
    {ok, FreshHeader} = write_header(Fd, #db_header{}),
    ok = couch_file:sync(Fd),
    init_main(Filepath, Fd, Supervisor, FreshHeader);
init_main(Filepath, Fd, Supervisor, Header) ->
    OpenBtree = fun(State) -> {ok, Btree} = couch_btree:open(State, Fd), Btree end,
    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
    UuidBtree = OpenBtree(Header#db_header.docinfo_by_uuid_btree_state),
    SeqBtree = OpenBtree(Header#db_header.docinfo_by_seq_btree_state),
    LocalDocsBtree = OpenBtree(Header#db_header.local_docs_btree_state),
    TableGroupBtree = OpenBtree(Header#db_header.table_group_btree_state),

    Db = #db{
        fd = Fd,
        supervisor = Supervisor,
        header = Header,
        summary_stream = SummaryStream,
        docinfo_by_uuid_btree = UuidBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        last_update_seq = Header#db_header.last_update_seq,
        table_group_btree = TableGroupBtree,
        doc_count = Header#db_header.doc_count,
        filepath = Filepath
    },

    MainPid = self(),
    UpdatePid = spawn_link(couch_db, update_loop, [MainPid, Db]),

    % the manager reads group info from the latest db snapshot and reports
    % finished groups back to the updater, which persists them
    GetTableGroupInfoFun =
        fun(GroupKey) -> get_table_group_info(get_db(MainPid), GroupKey) end,
    UpdateTableGroupInfoFun =
        fun(GroupKey, UpdateStatus, GroupInfo) ->
            UpdatePid ! {table_group_updated, GroupKey, UpdateStatus, GroupInfo},
            ok
        end,
    {ok, TableMgr} = couch_table_group:start_manager(Supervisor, Fd,
        GetTableGroupInfoFun, UpdateTableGroupInfoFun),

    UpdatePid ! {set_table_group_mgr, TableMgr},
    {ok, #main{db = Db, update_pid = UpdatePid, table_group_mgr = TableMgr}}.
%% gen_server terminate, in this order:
%%   1. tell the update loop to stop;
%%   2. stop the table group manager;
%%   3. close the file.
terminate(_Reason, #main{db=#db{fd=Fd}, update_pid=UpdatePid, table_group_mgr=TableMgr}) ->
    UpdatePid ! close,
    couch_table_group:stop(TableMgr),
    couch_file:close(Fd).
%% gen_server calls:
%%   {get_table_group, Uuid}        -> reply deferred to the table group manager
%%   {update_docs, Actions, Opts}   -> reply deferred to the update loop
%%   get_db                         -> {ok, #db{}}
%%   get_info                       -> {ok, [{doc_count, _}, {last_update_seq, _}]}
%%   {update_table_group, Uuid, F}  -> ok | not_found
handle_call({get_table_group, TableGroupDocUuid}, From, #main{db=Db}=Main) ->
    case get_doc_info(Db, TableGroupDocUuid) of
    not_found ->
        {reply, {not_found, missing}, Main};
    {ok, #doc_info{deleted=true}} ->
        {reply, {not_found, deleted}, Main};
    {ok, DocInfo} ->
        % From is passed to the manager, which is expected to reply to it
        % later; hence noreply here
        ok = couch_table_group:get_group_async(Main#main.table_group_mgr, DocInfo, From),
        {noreply, Main}
    end;
handle_call({update_docs, DocActions, Options}, From, #main{update_pid=UpdatePid}=Main) ->
    % update_loop/2 calls gen_server:reply(From, ...) once the write is done
    UpdatePid ! {From, update_docs, DocActions, Options},
    {noreply, Main};
handle_call(get_db, _From, #main{db=Db}=Main) ->
    {reply, {ok, Db}, Main};
handle_call(get_info, _From, #main{db=#db{doc_count=DocCount, last_update_seq=LastSeq}}=Main) ->
    {reply, {ok, [{doc_count, DocCount}, {last_update_seq, LastSeq}]}, Main};
handle_call({update_table_group, Uuid, UpdateNotifFun}, _From, #main{db=Db, table_group_mgr=TableMgr}=Main) ->
    Reply =
        case get_doc_info(Db, Uuid) of
        {ok, DocInfo} ->
            ok = couch_table_group:update_group(TableMgr, DocInfo, UpdateNotifFun);
        Error ->
            Error
        end,
    {reply, Reply, Main}.
%% The update loop casts its newest #db{} here so that later get_db calls
%% see it.
handle_cast({db_updated, NewDb}, Main) ->
    UpdatedMain = Main#main{db = NewDb},
    {noreply, UpdatedMain}.
406 %%% Internal function %%%
%% Returns the pid of the couch_db gen_server child running under the
%% per-database supervisor SupPid.
%%
%% Uses supervisor:which_children/1 instead of the previous trick: calling
%% start_child/2 with a dummy spec and matching {already_started, Pid}. That
%% spec named couch_db:sup_start_link/0, which is not defined in this module,
%% so the lookup relied on the start attempt always failing.
%% Crashes (badmatch) if the child is missing or not currently running.
db_pid(SupPid) ->
    Children = supervisor:which_children(SupPid),
    {couch_db, DbPid, worker, _Modules} = lists:keyfind(couch_db, 1, Children),
    true = is_pid(DbPid),
    DbPid.
%% Writer process loop. It serializes all database writes and pushes every
%% new #db{} snapshot to MainPid with a db_updated cast.
%%
%% Messages:
%%   close                               -> stop
%%   {set_table_group_mgr, Mgr}          -> remember the manager pid
%%   {From, update_docs, Actions, Opts}  -> write, publish, reply to From
%%   {table_group_updated, DocInfo, _, Info}
%%       -> persist Info only if DocInfo still matches what is on disk
update_loop(MainPid, Db) ->
    receive
    close ->
        % terminate loop
        ok;
    {set_table_group_mgr, TableMgr} ->
        update_loop(MainPid, Db#db{table_group_mgr = TableMgr});
    {OrigFrom, update_docs, DocActions, Options} ->
        {ok, DocResults, UpdatedDb} = update_docs_int(Db, DocActions, Options),
        ok = gen_server:cast(MainPid, {db_updated, UpdatedDb}),
        gen_server:reply(OrigFrom, {ok, DocResults}),
        update_loop(MainPid, UpdatedDb);
    {table_group_updated, #doc_info{uuid=Uuid}=GroupDocInfo, _UpdateStatus, TableGroupInfo} ->
        case get_doc_info(Db, Uuid) of
        {ok, GroupDocInfo} ->
            % the doc on disk is the one the group was built from: save it
            {ok, GroupBtree2} = couch_btree:add_remove(Db#db.table_group_btree,
                [{Uuid, TableGroupInfo}], []),
            {ok, CommittedDb} = commit_outstanding(
                Db#db{table_group_btree = GroupBtree2, uncommitted_writes = true}),
            ok = gen_server:cast(MainPid, {db_updated, CommittedDb}),
            update_loop(MainPid, CommittedDb);
        _Stale ->
            % doesn't match, don't save in btree
            update_loop(MainPid, Db)
        end
    end.
%% Returns the info for the table group stored in document DocInfo:
%%   - {ok, {TableQueries, GroupState}} from the table group btree when saved;
%%   - otherwise {ok, {TableQueries, nil}} read from the document itself;
%%   - {not_found, no_tables_found} when the document defines no tables.
get_table_group_info(#db{table_group_btree=GroupBtree}=Db, #doc_info{uuid=Uuid}=DocInfo) ->
    case couch_btree:lookup_single(GroupBtree, Uuid) of
    not_found ->
        {ok, Doc} = open_doc_int(Db, DocInfo, []),
        case couch_doc:get_table_queries(Doc) of
        [] -> {not_found, no_tables_found};
        TableQueries -> {ok, {TableQueries, nil}}
        end;
    {ok, {_TableQueries, _TableGroupState}=SavedInfo} ->
        {ok, SavedInfo}
    end.
%% Fetches the current #db{} snapshot held by the couch_db process DbPid.
get_db(DbPid) ->
    Reply = gen_server:call(DbPid, get_db),
    {ok, Db} = Reply,
    Db.
%% Opens the requested revisions of document Uuid.
%%
%% Revs selects what to open:
%%   - `all`: every leaf revision;
%%   - a list of rev ids: the revs themselves;
%%   - a list with the `latest` option: the leafs below those revs.
%% Returns {ok, Results}, where each element is one of:
%%   - {ok, #doc{}} for an opened revision;
%%   - {{not_found, missing}, Rev} for an unknown or body-less revision.
open_doc_revs_int(Db, Uuid, Revs, Options) ->
    case get_full_doc_info(Db, Uuid) of
    not_found when Revs == all ->
        {ok, []};
    not_found ->
        {ok, [{{not_found, missing}, Rev} || Rev <- Revs]};
    {ok, {_DocInfo, RevisionTree}} ->
        {FoundRevs, MissingRevs} =
            if
            Revs == all ->
                {get_all_leafs(RevisionTree, []), []};
            true ->
                Lookup =
                    case lists:member(latest, Options) of
                    true -> fun get_rev_leafs/3;
                    false -> fun get_revs/3
                    end,
                Lookup(RevisionTree, Revs, [])
            end,
        ToResult =
            fun({FoundRev, SummaryPtr, RevPath}) ->
                case summary_ptr_type(SummaryPtr) of
                missing ->
                    % the rev is in our tree but we know nothing else about it
                    {{not_found, missing}, FoundRev};
                PtrType when PtrType == deletion; PtrType == disk ->
                    {ok, make_doc(Db, Uuid, SummaryPtr, RevPath)}
                end
            end,
        FoundResults = lists:map(ToResult, FoundRevs),
        {ok, FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs]}
    end.
%% Opens the current revision of a document from a #db{} snapshot.
%%
%% Accepted inputs:
%%   - a uuid with the non-replicated prefix: read from the local docs btree;
%%   - a #doc_info{}: treated as a one-node revision tree;
%%   - {#doc_info{}, RevisionTree}: opened along the winning rev's path;
%%   - any other uuid: looked up via get_full_doc_info/2.
%% Deleted docs yield {not_found, deleted} unless allow_stub is given.
open_doc_int(Db, ?NON_REP_DOC_PREFIX ++ LocalId, _Options) ->
    case couch_btree:lookup_single(Db#db.local_docs_btree, LocalId) of
    not_found ->
        {not_found, missing};
    {ok, SummaryFields} ->
        {ok, #doc{uuid = ?NON_REP_DOC_PREFIX ++ LocalId,
                  summary_fields = dict:from_list(SummaryFields)}}
    end;
open_doc_int(Db, #doc_info{revision=Rev, summary_pointer=SummaryPointer}=DocInfo, Options) ->
    SingleNodeTree = [{Rev, SummaryPointer, []}],
    open_doc_int(Db, {DocInfo, SingleNodeTree}, Options);
open_doc_int(Db, {#doc_info{uuid=Uuid, revision=Rev, summary_pointer=SummaryPointer, deleted=Deleted}, RevisionTree}, Options) ->
    case (not Deleted) orelse lists:member(allow_stub, Options) of
    false ->
        {not_found, deleted};
    true ->
        {[{_, _, RevPath}], []} = get_revs(RevisionTree, [Rev], []),
        {ok, make_doc(Db, Uuid, SummaryPointer, RevPath)}
    end;
open_doc_int(Db, Uuid, Options) ->
    case get_full_doc_info(Db, Uuid) of
    not_found ->
        {not_found, missing};
    {ok, {_DocInfo, _RevisionTree}=FullDocInfo} ->
        open_doc_int(Db, FullDocInfo, Options)
    end.
526 % revision tree functions
%% Merges revision tree list B into revision tree list A.
%%
%% Nodes are {RevId, DocOrSummaryPtr, SubTrees}, and sibling lists are kept
%% sorted by RevId. When both sides hold the same RevId, A's value is kept
%% and the subtrees are merged recursively.
%%
%% Returns {MergedTrees, ConflictCount}. ConflictCount is the number of B
%% branches added beside existing A siblings; extending an A leaf, or filling
%% an empty A, is not a conflict.
%%
%% Previously a B branch that sorted after every remaining A sibling fell
%% into the `([], B)` clause and counted 0 conflicts, while the same branch
%% sorting before an A sibling counted 1. Since new RevIds are random,
%% conflict detection was effectively a coin flip.
merge_rev_trees([], B) ->
    % nothing on the A side at this level: B creates or extends the tree
    {B, 0};
merge_rev_trees(A, []) ->
    {A, 0};
merge_rev_trees(A, B) ->
    merge_sibling_trees(A, B).

%% Merges two sorted sibling lists. B trees still left once A runs out sit
%% next to existing A siblings, so each one counts as a conflict.
merge_sibling_trees([], BTrees) ->
    {BTrees, length(BTrees)};
merge_sibling_trees(ATrees, []) ->
    {ATrees, 0};
merge_sibling_trees([{ARev, ADoc, ASubTrees}=ATree | ANextTrees],
        [{BRev, _BDoc, BSubTrees}=BTree | BNextTrees]) ->
    if
    ARev == BRev ->
        % same rev
        {MergedSubTrees, SubTreesConflicts} = merge_rev_trees(ASubTrees, BSubTrees),
        {MergedNextTrees, NextConflicts} = merge_sibling_trees(ANextTrees, BNextTrees),
        {[{ARev, ADoc, MergedSubTrees} | MergedNextTrees], SubTreesConflicts + NextConflicts};
    ARev < BRev ->
        {Merged, Conflicts} = merge_sibling_trees(ANextTrees, [BTree | BNextTrees]),
        {[ATree | Merged], Conflicts};
    true ->
        {Merged, Conflicts} = merge_sibling_trees([ATree | ANextTrees], BNextTrees),
        {[BTree | Merged], Conflicts + 1}
    end.
%% Returns the revs of SrcRevs that appear nowhere in Trees.
%% Trees is walked depth-first; each tree node removes at most one matching
%% entry (lists:delete/2).
find_missing_revisions([], _Trees) ->
    [];
find_missing_revisions(SrcRevs, Trees) ->
    lists:foldl(
        fun({RevId, _Value, SubTrees}, Remaining) ->
            find_missing_revisions(lists:delete(RevId, Remaining), SubTrees)
        end,
        SrcRevs, Trees).
%% For each requested rev found in Trees, returns the leaf revisions below
%% it (the rev itself when it is a leaf).
%%
%% Returns {Leafs, RevsNotFound}. Leafs are {RevId, SummaryPtr, RevPath}
%% with RevPath newest-first.
%%
%% Fixes:
%%   - The previous version recursed into get_revs/3 instead of itself, so a
%%     requested rev below the top level came back as the rev, not its leafs.
%%   - It removed only the leaf revs from the pending list, so a matched
%%     non-leaf rev (and any requested rev under it) was also reported as
%%     not found.
get_rev_leafs(_Trees, [], _RevPathAcc) ->
    {[], []};
get_rev_leafs([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_rev_leafs([{RevId, _SummaryPtr, SubTrees}=Tree | RestTrees], RevsToGet, RevPathAcc) ->
    case lists:member(RevId, RevsToGet) of
    true -> % found it
        LeafsFound = get_all_leafs([Tree], RevPathAcc),
        % every requested rev inside this subtree is answered by these leafs
        {_FoundInTree, RevsToGet2} = get_revs([Tree], RevsToGet, RevPathAcc),
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining};
    false ->
        {LeafsFound, RevsToGet2} = get_rev_leafs(SubTrees, RevsToGet, [RevId | RevPathAcc]),
        {RestLeafsFound, RevsRemaining} = get_rev_leafs(RestTrees, RevsToGet2, RevPathAcc),
        {LeafsFound ++ RestLeafsFound, RevsRemaining}
    end.

%% Finds the requested revs themselves in Trees.
%% Returns {[{RevId, SummaryPtr, RevPath}], RevsNotFound}, RevPath newest-first.
get_revs([], RevsToGet, _RevPathAcc) ->
    {[], RevsToGet};
get_revs([{RevId, SummaryPtr, SubTrees} | RestTrees], RevsToGet, RevPathAcc) ->
    RevsToGet2 = RevsToGet -- [RevId],
    CurrentNodeResult =
        case RevsToGet2 == RevsToGet of
        true ->
            % not in the rev list.
            [];
        false ->
            % this node is the rev list. return it
            [{RevId, SummaryPtr, [RevId | RevPathAcc]}]
        end,
    {RevsGotten, RevsRemaining} = get_revs(SubTrees, RevsToGet2, [RevId | RevPathAcc]),
    {RevsGotten2, RevsRemaining2} = get_revs(RestTrees, RevsRemaining, RevPathAcc),
    {CurrentNodeResult ++ RevsGotten ++ RevsGotten2, RevsRemaining2}.

%% Returns every leaf of Trees as {RevId, SummaryPtr, RevPath}.
%% RevPath is newest-first and extends RevPathAcc.
get_all_leafs([], _RevPathAcc) ->
    [];
get_all_leafs([{RevId, SummaryPtr, []} | RestTrees], RevPathAcc) ->
    [{RevId, SummaryPtr, [RevId | RevPathAcc]} | get_all_leafs(RestTrees, RevPathAcc)];
get_all_leafs([{RevId, _SummaryPtr, SubTrees} | RestTrees], RevPathAcc) ->
    get_all_leafs(SubTrees, [RevId | RevPathAcc]) ++ get_all_leafs(RestTrees, RevPathAcc).
%% Turns a linear revision history into a single-branch revision tree.
%% RevIds is newest-first. The result is rooted at the oldest rev; every
%% ancestor carries the `missing` summary pointer, and Doc sits on the leaf
%% (the newest rev).
revision_list_to_trees(Doc, [NewestRev | OlderRevs]) ->
    lists:foldl(
        fun(AncestorRev, ChildTrees) ->
            [{AncestorRev, type_to_summary_ptr(missing), ChildTrees}]
        end,
        [{NewestRev, Doc, []}],
        OlderRevs).
%% Picks the winning leaf of Trees. Leafs are ranked by, in order:
%%   1. non-deleted before deleted;
%%   2. longer path;
%%   3. larger RevId.
%% Returns {WinningRevId, WinningSummaryPtr, ConflictRevs, DeletedConflictRevs}.
%% The two lists hold the remaining live and deleted leafs, in ranking order.
%% Crashes on an empty Trees (there is no winner).
winning_revision(Trees) ->
    RankKey =
        fun({RevId, SummaryPointer, Path}) ->
            {summary_ptr_type(SummaryPointer) /= deletion, length(Path), RevId}
        end,
    Ranked = lists:sort(fun(A, B) -> RankKey(A) > RankKey(B) end, get_all_leafs(Trees, [])),
    [{RevId, SummaryPointer, _WinnerPath} | Losers] = Ranked,
    IsLive = fun({_LoserRev, LoserPtr, _LoserPath}) -> summary_ptr_type(LoserPtr) /= deletion end,
    {LiveLosers, DeletedLosers} = lists:splitwith(IsLive, Losers),
    {RevId, SummaryPointer,
        [LiveRev || {LiveRev, _, _} <- LiveLosers],
        [DeadRev || {DeadRev, _, _} <- DeletedLosers]}.
%% Returns {ConflictRevs, DeletedConflictRevs} for Trees: every leaf except
%% the winner, split into live and deleted ones. An empty tree has none.
get_conflict_revs(Trees) ->
    case Trees of
    [] ->
        {[], []};
    _ ->
        {_WinningRev, _WinningPtr, ConflictRevs, DeletedConflictRevs} = winning_revision(Trees),
        {ConflictRevs, DeletedConflictRevs}
    end.
%% Writes any in-memory #doc{} values in the revision trees to the summary
%% stream and returns the trees with summary pointers in their place:
%%   - deleted docs become the deletion pointer;
%%   - other docs have their summary fields written to the stream;
%%   - values that are already pointers are kept as-is.
flush_revision_trees(_Db, []) ->
    [];
flush_revision_trees(Db, [{RevId, Value, SubTrees} | RestTrees]) ->
    FlushedValue = flush_rev_value(Db, Value),
    [{RevId, FlushedValue, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)].

%% Converts a single tree node value into a summary pointer (see above).
flush_rev_value(_Db, #doc{deleted=true}) ->
    type_to_summary_ptr(deletion);
flush_rev_value(Db, #doc{summary_fields=Fields}) ->
    {ok, SummaryPointer} = couch_stream:write_term(Db#db.summary_stream, dict:to_list(Fields)),
    SummaryPointer;
flush_rev_value(_Db, SummaryPointer) ->
    SummaryPointer.
%% Builds a #doc{} for one revision. Summary fields are read from the
%% stream only when the pointer refers to disk; missing and deleted revs get
%% an empty field set.
make_doc(Db, Uuid, SummaryPointer, RevisionPath) ->
    PtrType = summary_ptr_type(SummaryPointer),
    SummaryFields =
        if
        PtrType == disk ->
            {ok, StoredFields} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
            StoredFields;
        true ->
            []
        end,
    #doc{
        uuid = Uuid,
        revisions = RevisionPath,
        summary_fields = dict:from_list(SummaryFields),
        deleted = (PtrType == deletion)
    }.
%% Sentinel summary pointers:
%%   0            - a revision we know of but have no body for (missing);
%%   1            - a deletion;
%%   any other    - a real stream pointer.
type_to_summary_ptr(missing) ->
    0;
type_to_summary_ptr(deletion) ->
    1.

%% Classifies a summary pointer as missing, deletion or disk.
summary_ptr_type(Pointer) ->
    case Pointer of
    0 -> missing;
    1 -> deletion;
    _ -> disk
    end.
%% Merges each update group's incoming revisions into its on-disk revision
%% tree and writes the new summaries.
%%
%% Input element:
%%   {Uuid, {Docs, Options}, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}}
%% The disk part is {0, 0, 0, []} for a new document.
%% With the new_edits option:
%%   - each doc gets a fresh random revision prepended;
%%   - the whole group is rejected if the merge reports a conflict.
%%
%% Returns {ok, DocResults, InfoByUuid, RemoveSeqs, InfoBySeq, Db2}.
%% DocResults holds one list per input group, in input order. Each list has
%% one {ok, Rev} or {conflict, Rev} per doc, in the order of Docs.
%%
%% Fixes:
%%   - A rejected group used to *replace* the accumulated results with a flat
%%     list, losing earlier groups and breaking the one-list-per-group shape.
%%   - Per-doc revs were returned in reverse order for groups of several docs.
%%   - Docs with an empty revision list were dropped from the conflict
%%     results; they are now reported as {conflict, nil}.
write_summaries(Db, [], InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
    {ok, lists:reverse(DocResultOut), lists:reverse(InfoByUuidOut), lists:reverse(RemoveSeqOut),
        lists:reverse(InfoBySeqOut), Db};
write_summaries(Db, [{Uuid, {Docs, Options}, {DiskUpdateSeq, _DiskRevision, DiskSummaryPointer, DiskRevTrees}} | Rest],
        InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
    NewEdits = lists:member(new_edits, Options),
    {InputRevTrees, ReversedOutputRevs} =
        lists:foldl(fun(#doc{revisions=Revisions}=Doc, {AccTrees, AccRevs}) ->
                Revisions2 =
                    case NewEdits of
                    true -> [couch_util:rand32() | Revisions]; % add new revision
                    false -> Revisions
                    end,
                DocRevTree = revision_list_to_trees(Doc, Revisions2),
                {MergedTrees, _ConflictCount} = merge_rev_trees(AccTrees, DocRevTree),
                {MergedTrees, [hd(Revisions2) | AccRevs]}
            end, {[], []}, Docs),
    % the fold prepends, so restore the order of Docs
    OutputRevs = lists:reverse(ReversedOutputRevs),

    {NewRevTrees, ConflictCount} = merge_rev_trees(DiskRevTrees, InputRevTrees),

    case NewEdits andalso ConflictCount > 0 of
    true ->
        % don't save: report a conflict for every doc of this group, keeping
        % earlier groups' results
        ConflictResults =
            [{conflict, case DocRevs of [Rev | _] -> Rev; [] -> nil end}
                || #doc{revisions=DocRevs} <- Docs],
        write_summaries(Db, Rest, InfoBySeqOut, RemoveSeqOut, InfoByUuidOut,
            [ConflictResults | DocResultOut]);
    false ->
        FlushedTrees = flush_revision_trees(Db, NewRevTrees),
        {WinningRevision, WinningSummaryPointer, ConflictRevs, DeletedConflictRevs} =
            winning_revision(FlushedTrees),

        % doc_count counts docs whose winning revision has a body on disk
        OldDiskDocuments = case summary_ptr_type(DiskSummaryPointer) == disk of true -> 1; false -> 0 end,
        NewDiskDocuments = case summary_ptr_type(WinningSummaryPointer) == disk of true -> 1; false -> 0 end,
        NewDocCount = Db#db.doc_count + NewDiskDocuments - OldDiskDocuments,

        UpdateSeq = Db#db.last_update_seq + 1,

        % drop the doc's previous by-seq entry, if it had one
        RemoveSeqOut2 =
            case DiskUpdateSeq of
            0 -> RemoveSeqOut;
            _ -> [DiskUpdateSeq | RemoveSeqOut]
            end,

        InfoBySeqOut2 = [{UpdateSeq, {Uuid, WinningRevision, WinningSummaryPointer, ConflictRevs, DeletedConflictRevs}} | InfoBySeqOut],
        InfoByUuidOut2 = [{Uuid, {UpdateSeq, WinningRevision, WinningSummaryPointer, FlushedTrees}} | InfoByUuidOut],
        % output an ok and the revid for each successful save
        DocResultOut2 = [[{ok, OutputRev} || OutputRev <- OutputRevs] | DocResultOut],
        Db2 = Db#db{last_update_seq = UpdateSeq, uncommitted_writes = true, doc_count = NewDocCount},
        write_summaries(Db2, Rest, InfoBySeqOut2, RemoveSeqOut2, InfoByUuidOut2, DocResultOut2)
    end.
%% Applies a batch of update groups ({Docs, Options} tuples) to Db.
%%
%% Single-doc groups whose uuid has the non-replicated prefix go to the
%% local docs btree. All other groups are merged and written by
%% write_summaries/6.
%%
%% Returns {ok, Results, Db2}:
%%   - one result list per replicated group, in input order;
%%   - followed by one [{ok, 0}] per local doc.
%% Commits unless delay_commit is in Options.
%%
%% Fix: the separating foldl prepends, and its output was used unreversed.
%% Replicated groups were therefore processed, and their results returned,
%% in reverse input order.
update_docs_int(Db, DocsOptionsList, Options) ->
    #db{
        docinfo_by_uuid_btree = DocInfoByUuidBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        table_group_btree = TableGroupBTree,
        local_docs_btree = LocalDocsBtree,
        table_group_mgr = TableGroupMgr
    } = Db,

    % separate out the NonRep documents from the rest of the documents
    {RevUuids, RevDocsOptions, RevNonRepDocs} =
        lists:foldl(fun({[#doc{uuid=Uuid}=Doc | Rest], _Options}=DocOptions, {UuidsAcc, DocsOptionsAcc, NonRepDocsAcc}) ->
                case Uuid of
                ?NON_REP_DOC_PREFIX ++ _ when Rest == [] ->
                    {UuidsAcc, DocsOptionsAcc, [Doc | NonRepDocsAcc]};
                _ ->
                    {[Uuid | UuidsAcc], [DocOptions | DocsOptionsAcc], NonRepDocsAcc}
                end
            end, {[], [], []}, DocsOptionsList),
    % restore input order so results line up with the caller's groups
    Uuids = lists:reverse(RevUuids),
    DocsOptionsList2 = lists:reverse(RevDocsOptions),
    NonRepDocs = lists:reverse(RevNonRepDocs),

    {ok, OldDocInfoResults} = couch_btree:lookup(DocInfoByUuidBTree, Uuids),

    % create a list of {Uuid, {Docs, UpdateOptions}, DiskInfo} tuples
    DocsAndOldDocInfo =
        lists:zipwith3(fun(DocUuid, DocsOptions, OldDocInfoLookupResult) ->
                case OldDocInfoLookupResult of
                {ok, {_Uuid, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}}} ->
                    {DocUuid, DocsOptions, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskRevTrees}};
                {not_found, _} ->
                    % new document: no seq, revision, summary or tree yet
                    {DocUuid, DocsOptions, {0, 0, 0, []}}
                end
            end,
            Uuids, DocsOptionsList2, OldDocInfoResults),

    % now write out the documents
    {ok, DocResults, InfoByUuid, RemoveSeqList, InfoBySeqList, Db2} =
        write_summaries(Db, DocsAndOldDocInfo, [], [], [], []),

    % and the indexes to the documents
    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeqList, RemoveSeqList),
    {ok, DocInfoByUuidBTree2} = couch_btree:add_remove(DocInfoByUuidBTree, InfoByUuid, []),

    % clear the computed table cache
    UpdatedUuids = [UpdatedUuid || {UpdatedUuid, _DocInfo} <- InfoByUuid],
    {ok, TableGroupBTree2} = couch_btree:add_remove(TableGroupBTree, [], UpdatedUuids),

    % now notify the table group manager to discard any of the table groups it has in memory
    OldDocInfos =
        lists:map(fun({OldUuid, _Docs, {OldUpdateSeq, OldRev, OldSummaryPointer, OldRevTrees}}) ->
                {ConflictRevs, DeletedConflictRevs} = get_conflict_revs(OldRevTrees),
                #doc_info{uuid=OldUuid, update_seq=OldUpdateSeq, revision=OldRev,
                    summary_pointer=OldSummaryPointer, conflict_revs=ConflictRevs,
                    deleted_conflict_revs=DeletedConflictRevs}
            end, DocsAndOldDocInfo),
    ok = couch_table_group:free_groups(TableGroupMgr, OldDocInfos),

    NRUuidsSummaries =
        [{NRUuid, dict:to_list(NRFields)} || #doc{uuid=?NON_REP_DOC_PREFIX ++ NRUuid, summary_fields=NRFields, deleted=false} <- NonRepDocs],
    NRUuidsDelete =
        [NRUuid || #doc{uuid=?NON_REP_DOC_PREFIX ++ NRUuid, deleted=true} <- NonRepDocs],
    {ok, LocalDocsBtree2} = couch_btree:add_remove(LocalDocsBtree, NRUuidsSummaries, NRUuidsDelete),

    NRDocResults = [[{ok, 0}] || _Doc <- NonRepDocs],

    Db3 = Db2#db{
        docinfo_by_uuid_btree = DocInfoByUuidBTree2,
        docinfo_by_seq_btree = DocInfoBySeqBTree2,
        table_group_btree = TableGroupBTree2,
        local_docs_btree = LocalDocsBtree2,
        uncommitted_writes = true
    },
    Db4 =
        case lists:member(delay_commit, Options) of
        true ->
            Db3;
        false ->
            {ok, CommittedDb} = commit_outstanding(Db3),
            CommittedDb
        end,
    {ok, DocResults ++ NRDocResults, Db4}.
%% Makes outstanding writes durable. The steps are:
%%   1. sync the data;
%%   2. write a header describing the current stream/btree states;
%%   3. sync again.
%% A db with no uncommitted writes is returned unchanged.
commit_outstanding(#db{uncommitted_writes=false} = Db) ->
    {ok, Db};
commit_outstanding(#db{fd=Fd, uncommitted_writes=true, header=Header} = Db) ->
    % data first, so the header never points at unsynced data
    ok = couch_file:sync(Fd),
    NewHeader = Header#db_header{
        last_update_seq = Db#db.last_update_seq,
        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
        docinfo_by_uuid_btree_state = couch_btree:get_state(Db#db.docinfo_by_uuid_btree),
        table_group_btree_state = couch_btree:get_state(Db#db.table_group_btree),
        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
        doc_count = Db#db.doc_count
    },
    {ok, WrittenHeader} = write_header(Fd, NewHeader),
    % commit header to disk
    ok = couch_file:sync(Fd),
    {ok, Db#db{uncommitted_writes = false, header = WrittenHeader}}.
%% Bumps write_version and writes the encoded header twice: at offset 0 and
%% at ?HEADER_SIZE. Does not sync.
%% Returns {ok, WrittenHeader}, or {error, error_header_too_large} when the
%% encoding does not fit in one ?HEADER_SIZE slot (nothing is written then).
write_header(Fd, Header) ->
    NextHeader = Header#db_header{write_version = Header#db_header.write_version + 1},
    Bin = term_to_binary(NextHeader),
    if
    byte_size(Bin) > ?HEADER_SIZE ->
        {error, error_header_too_large};
    true ->
        ok = couch_file:pwrite(Fd, 0, Bin),
        ok = couch_file:pwrite(Fd, ?HEADER_SIZE, Bin),
        {ok, NextHeader}
    end.
%% Reads the database header. The primary copy at offset 0 is tried first;
%% on any error the copy at ?HEADER_SIZE is used instead.
read_header(Fd) ->
    case read_header(Fd, 0) of
    {ok, Header} ->
        {ok, Header};
    _Error ->
        read_header(Fd, ?HEADER_SIZE)
    end.

%% Reads and decodes one header copy.
%% Returns {ok, #db_header{}} or one of:
%%   - {error, {unknown_header_type, Term}};
%%   - {error, {corrupt_header, Offset}} when the bytes are not a valid
%%     external term (e.g. a copy torn by a crash mid-write, or never
%%     written).
%% binary_to_term/1 raises badarg on such input. Previously that exception
%% escaped, so read_header/1 never reached its fallback copy.
read_header(Fd, Offset) ->
    {ok, Bin} = couch_file:pread(Fd, Offset, ?HEADER_SIZE),
    try binary_to_term(Bin) of
        Header when is_record(Header, db_header) ->
            {ok, Header};
        Else ->
            {error, {unknown_header_type, Else}}
    catch
        error:badarg ->
            {error, {corrupt_header, Offset}}
    end.
%% Hot code upgrade: the state needs no conversion.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
%% Stray messages are ignored.
handle_info(_Info, State) ->
    Unchanged = State,
    {noreply, Unchanged}.
%% Test helper: restarts couch_server with a short pause between stop and start.
test_restart() ->
    couch_server:stop(),
    PauseMs = 100,
    timer:sleep(PauseMs),
    couch_server:start().
%% End-to-end smoke test against a database named "foo".
%% It exercises table groups, N documents, deletes, restarts and updates,
%% printing results as it goes. test/0 uses N = 100.
test() ->
    test(100).

test(N) ->
    couch_server:start(),
    io:format("foo~n"),
    {ok, Db} = couch_server:create("foo", [overwrite]),

    GroupUuid = "TestTable",
    BaseDoc = couch_doc:new(),
    GroupDoc = lists:foldl(
        fun({FieldName, FieldValue}, DocAcc) -> couch_doc:set_field(DocAcc, FieldName, FieldValue) end,
        BaseDoc#doc{uuid = GroupUuid},
        [{"$table_foo", ["SELECT Foo != \"\"; COLUMN Foo; COLUMN BAR"]},
         {"$table_bar", ["SELECT *; COLUMN Bar; COLUMN foo"]},
         {"bar", ["BAR!!!"]}]),
    {ok, NewRevId} = save_doc(Db, GroupDoc),
    {ok, _NewRevId2} = save_doc(Db, couch_doc:add_rev(GroupDoc, NewRevId)),

    ok = update_table_group_sync(Db, GroupUuid),
    DumpTableFun =
        fun({_Uuid, _Rev}, Keys, ColumnValues, _Offset, _TotalTableCount, Count) ->
            io:format("Keys:~p, ColumnValues:~p~n", [Keys, ColumnValues]),
            {ok, Count + 1}
        end,
    % dumps table "foo" and checks that both counts returned by the fold agree
    DumpAll =
        fun(TheDb) ->
            {ok, OfficialCount, TotalCount} = fold_table(TheDb, GroupUuid, "foo", nil, fwd, DumpTableFun, 0),
            OfficialCount = TotalCount,
            io:format("Total rows dumped: ~w~n", [TotalCount])
        end,
    DumpAll(Db),

    test_restart(),

    {ok, Db2} = couch_server:open("foo"),
    {ok, Doc5} = open_doc(Db2, GroupUuid),
    {ok, FooValue2} = couch_doc:get_field(Doc5, "bar"),
    io:format("bar: ~p~n", [FooValue2]),

    {ok, UuidsAndValues} = test_write_docs(N, Db2, []),
    DumpAll(Db2),

    time_action(fun() ->
        ok = update_table_group_sync(Db2, GroupUuid)
    end),

    io:format("database info: ~p~n", [get_info(Db2)]),
    DumpAll(Db2),

    UuidsAndValues2 = test_delete_docs(Db2, UuidsAndValues, []),
    ok = update_table_group_sync(Db2, GroupUuid),
    DumpAll(Db2),

    check_delete_docs(Db2, UuidsAndValues),

    test_restart(),

    {ok, Db3} = couch_server:open("foo"),
    test_update_docs(Db3, UuidsAndValues2),
    ok = update_table_group_sync(Db3, GroupUuid),
    DumpAll(Db3),

    % deliberately bad group uuid / table name: prints the error results
    BadResult = fold_table(Db3, "TableGroupDoc1#doc.uuid", "foot", nil, fwd, DumpTableFun, 0),
    io:format("BadResult: ~p~n", [BadResult]),
    BadResult2 = update_table_group_sync(Db3, "foo"),
    io:format("BadResult2: ~p~n", [BadResult2]),

    _UuidsAndValues3 = test_delete_docs(Db3, UuidsAndValues2, []),
    ok = update_table_group_sync(Db3, GroupUuid),

    {ok, _OfficalCount6, TotalCount6} = fold_table(Db3, GroupUuid, "foo", {"1", nil}, fwd, DumpTableFun, 0),
    io:format("Total rows dumped: ~w~n", [TotalCount6]),

    io:format("database info: ~p~n", [get_info(Db3)]).
%% Test helper: saves N docs with fields foo/num/bar.
%% Returns {ok, [{Uuid, Number}]}, where Number counts down from N to 1.
test_write_docs(0, _Db, Output) ->
    {ok, lists:reverse(Output)};
test_write_docs(N, Db, Output) ->
    NumberedDoc = couch_doc:set_field(
        couch_doc:set_field(couch_doc:new(), "foo", [integer_to_list(N)]),
        "num", [N]),
    save_doc(Db, couch_doc:set_field(NumberedDoc, "bar", ["blah"])),
    test_write_docs(N - 1, Db, [{NumberedDoc#doc.uuid, N} | Output]).
%% Test helper: deletes every other document, starting with the first.
%% Returns the {Uuid, Value} pairs of the documents left alive.
%%
%% Passes `all` so every leaf revision is deleted. The previous `any` is not
%% a selector open_doc_revs/4 understands (it accepts `all` or a rev list);
%% it reached get_revs/3 and crashed with badarg on `--`.
test_delete_docs(_Db, [], Acc) ->
    lists:reverse(Acc);
test_delete_docs(Db, [{Uuid, _Value} | Rest], Acc) ->
    {ok, _RevId} = delete_doc(Db, Uuid, all),
    test_delete_docs2(Db, Rest, Acc).

%% Keeps one document (records it in Acc), then resumes deleting.
test_delete_docs2(_Db, [], Acc) ->
    lists:reverse(Acc);
test_delete_docs2(Db, [{Uuid, Value} | Rest], Acc) ->
    test_delete_docs(Db, Rest, [{Uuid, Value} | Acc]).
%% Test helper: checks the alternation produced by test_delete_docs/3.
%% The first, third, ... docs must open as {not_found, deleted}; the others
%% must still open.
check_delete_docs(_Db, []) ->
    ok;
check_delete_docs(Db, [{DeletedUuid, _Value} | Rest]) ->
    {not_found, deleted} = open_doc(Db, DeletedUuid),
    check_delete_docs2(Db, Rest).

check_delete_docs2(_Db, []) ->
    ok;
check_delete_docs2(Db, [{LiveUuid, _Value} | Rest]) ->
    {ok, _Doc} = open_doc(Db, LiveUuid),
    check_delete_docs(Db, Rest).
%% Test helper: for each doc, rewrites "num" from [Num] to [Num, Num + 5]
%% and saves it.
test_update_docs(Db, UuidsAndValues) ->
    lists:foreach(
        fun({Uuid, _Value}) ->
            {ok, Doc} = open_doc(Db, Uuid),
            {ok, [Num]} = couch_doc:get_field(Doc, "num"),
            save_doc(Db, couch_doc:set_field(Doc, "num", [Num, Num + 5]))
        end,
        UuidsAndValues).
%% Runs Fun, prints the elapsed wall time in seconds, and returns Fun's result.
time_action(Fun) ->
    Started = now(),
    Result = Fun(),
    ElapsedMicros = timer:now_diff(now(), Started),
    io:format("Action took ~f secs!~n", [ElapsedMicros / 1000000.0]),
    Result.