New branched document revision support in the back-end database. Replicator almost...
[couchdbimport.git] / CouchProjects / CouchDb / couch_db.erl
blobc692ddbd9d2b431e3128e9ea2301f23974c7b519
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_db).
19 -behaviour(gen_server).
21 -export([test/0, test/1]).
22 -export([open/1,create/1,create/2,save_doc/2,save_doc/3,get_doc_info/2]).
23 -export([delete_doc/3,save_docs/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
24 -export([enum_docs/4,enum_docs/5]).
25 -export([update_table_group_sync/2,update_table_group/2,fold_table/6,fold_table/7,get_info/1]).
26 -export([update_loop/2]).
27 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
28 -export([revision_list_to_trees/2 , merge_rev_trees/2 ]).
30 -include("couch_db.hrl").
32 -define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
34 -record(db_header,
35 {write_version = 0,
36 last_update_seq = 0,
37 summary_stream_state = nil,
38 docinfo_by_uuid_btree_state = nil,
39 docinfo_by_seq_btree_state = nil,
40 table_group_btree_state = nil,
41 doc_count=0
42 }).
44 -record(db,
45 {fd=0,
46 supervisor=0,
47 header = #db_header{},
48 uncommitted_writes = false,
49 summary_stream,
50 docinfo_by_uuid_btree,
51 docinfo_by_seq_btree,
52 last_update_seq,
53 table_group_btree,
54 doc_count,
55 filepath,
56 table_group_mgr
57 }).
59 -record(main,
60 {db,
61 update_pid,
62 table_group_mgr
63 }).
65 start_link(Filepath, Options) ->
66 {ok, Super} = couch_db_sup:start_link(),
67 FdResult = supervisor:start_child(Super,
68 {couch_file,
69 {couch_file, open, [Filepath, Options]},
70 permanent,
71 brutal_kill,
72 worker,
73 [couch_file]}),
74 case FdResult of
75 {ok, Fd} ->
76 {ok, _Db} = supervisor:start_child(Super,
77 {couch_db,
78 %%%%%% below calls gen_server:start_link(couch_db, {Fd, Super, Options}, []).
79 {gen_server, start_link, [couch_db, {Filepath, Fd, Super, Options}, []]},
80 permanent,
81 10,
82 worker,
83 [couch_db]}),
84 {ok, Super};
85 {error, {enoent, _ChildInfo}} ->
86 % couldn't find file
87 exit(Super,kill),
88 {error, not_found};
89 {error, {Error, _ChildInfo}} ->
90 exit(Super,kill),
91 {error, Error};
92 Else ->
93 exit(Super,kill),
94 {error, Else}
95 end.
97 %%% Interface functions %%%
99 create(Filename) ->
100 create(Filename, []).
102 create(Filename, Options) when is_list(Options) ->
103 start_link(Filename, [create | Options]).
105 open(Filename) ->
106 start_link(Filename, []).
108 delete_doc(SupPid, Uuid, any) ->
109 case open_doc(SupPid, Uuid) of
110 {ok, Doc} ->
111 delete_doc(SupPid, Uuid, Doc#doc.revisions);
112 Error ->
113 Error
114 end;
115 delete_doc(SupPid, Uuid, Revisions) ->
116 Doc = #doc{uuid=Uuid, revisions=Revisions, deleted=true},
117 {ok, [Result]} = gen_server:call(db_pid(SupPid), {update_docs, [{[Doc], [new_edits]}], []}),
118 Result.
124 open_doc(_SupPid, #doc_info{deleted=true}) ->
125 {not_found, deleted};
126 open_doc(SupPid, UuidOrDocInfo) ->
127 open_doc_int(get_db(db_pid(SupPid)), UuidOrDocInfo, []).
129 open_doc(SupPid, Uuid, Options) ->
130 open_doc_int(get_db(db_pid(SupPid)), Uuid, Options).
132 get_doc_info(Db, Uuid) ->
133 case get_full_doc_info(Db, Uuid) of
134 {ok, {DocInfo, _RevisionTrees}} ->
135 {ok, DocInfo};
136 Else ->
137 Else
138 end.
140 get_full_doc_info(SupPid, Uuid) when is_pid(SupPid) ->
141 get_doc_info(get_db(db_pid(SupPid)), Uuid);
142 get_full_doc_info(Db, Uuid) ->
143 case couch_btree:lookup_single(Db#db.docinfo_by_uuid_btree, Uuid) of
144 {ok, {UpdateSeq, Rev, SummaryPointer, Conflicts, RevisionTrees}} ->
145 {ok, {#doc_info{uuid=Uuid, revision=Rev, update_seq=UpdateSeq, summary_pointer=SummaryPointer, deleted=(SummaryPointer==0), conflicts=Conflicts}, RevisionTrees}};
146 not_found ->
147 not_found
148 end.
150 get_info(SupPid) ->
151 gen_server:call(db_pid(SupPid), get_info).
153 save_doc(SupPid, Doc) ->
154 save_doc(SupPid, Doc, []).
156 save_doc(SupPid, Doc, Options) ->
157 {ok, [RetValue]} = save_docs(SupPid, [{[Doc], [new_edits | Options]}], []),
158 RetValue.
160 save_docs(SupPid, DocsOptions, Options) ->
161 gen_server:call(db_pid(SupPid), {update_docs, DocsOptions, Options}).
163 enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx) ->
164 Db = get_db(db_pid(SupPid)),
165 EnumFun = fun({UpdateSeq, {Uuid, Rev, SummaryPointer, Conflicts}}, EnumCtx) ->
166 DocInfo = #doc_info{
167 uuid = Uuid,
168 revision = Rev,
169 update_seq = UpdateSeq,
170 summary_pointer = SummaryPointer,
171 conflicts = Conflicts,
172 deleted = SummaryPointer == 0
174 InFun(DocInfo, EnumCtx)
175 end,
176 couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, EnumFun, Ctx).
178 enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
179 enum_docs_since(SupPid, SinceSeq, fwd, InFun, Ctx).
181 enum_docs(SupPid, StartUuid, Direction, InFun, Ctx) ->
182 Db = get_db(db_pid(SupPid)),
183 EnumFun = fun({Uuid, {UpdateSeq, Rev, SummaryPointer, Conflicts, _RevTrees}}, EnumCtx) ->
184 DocInfo = #doc_info{
185 uuid = Uuid,
186 revision = Rev,
187 update_seq = UpdateSeq,
188 summary_pointer = SummaryPointer,
189 deleted = SummaryPointer == 0,
190 conflicts = Conflicts
192 InFun(DocInfo, EnumCtx)
193 end,
194 couch_btree:fold(Db#db.docinfo_by_uuid_btree, StartUuid, Direction, EnumFun, Ctx).
196 enum_docs(SupPid, StartUuid, InFun, Ctx) ->
197 enum_docs(SupPid, StartUuid, fwd, InFun, Ctx).
200 update_table_group(SupPid, TableGroupDocUuid) ->
201 gen_server:call(db_pid(SupPid), {update_table_group, TableGroupDocUuid, fun(_Whatever) -> ok end}).
204 sync_update_notify(Pid, Ref, partial) ->
205 % We want to wait until complete
206 % so return a fun that calls ourself
207 fun(Status)-> sync_update_notify(Pid, Ref, Status) end;
208 sync_update_notify(Pid, Ref, complete) ->
209 Pid ! {Ref, ok};
210 sync_update_notify(Pid, Ref, Else) ->
211 Pid ! {Ref, Else}.
214 update_table_group_sync(SupPid, TableGroupDocUuid) ->
215 Pid = self(),
216 Ref = make_ref(),
217 UpdateFun = fun(Status)-> sync_update_notify(Pid, Ref, Status) end,
218 case gen_server:call(db_pid(SupPid), {update_table_group, TableGroupDocUuid, UpdateFun}, infinity) of
219 ok ->
220 receive
221 {Ref, Result} ->
222 Result
223 end;
224 Else ->
225 Else
226 end.
228 fold_table(SupPid, TableGroupDocUuid, TableName, Dir, Fun, Acc) ->
229 case gen_server:call(db_pid(SupPid), {get_table_group, TableGroupDocUuid}) of
230 {ok, TableGroup} ->
231 couch_table_group:fold(TableGroup, TableName, Dir, Fun, Acc);
232 Else ->
233 Else
234 end.
236 fold_table(SupPid, TableGroupDocUuid, TableName, StartKey, Dir, Fun, Acc) ->
237 case gen_server:call(db_pid(SupPid), {get_table_group, TableGroupDocUuid}) of
238 {ok, TableGroup} ->
239 couch_table_group:fold(TableGroup, TableName, StartKey, Dir, Fun, Acc);
240 Else ->
241 Else
242 end.
244 close(SupPid) ->
245 Ref = erlang:monitor(process, SupPid),
246 unlink(SupPid),
247 exit(SupPid, normal),
248 receive
249 {'DOWN', Ref, process, SupPid, _Reason} ->
251 end.
255 % server functions
257 init({Filepath, Fd, Supervisor, Options}) ->
258 case lists:member(create, Options) of
259 true ->
260 init_main(Filepath, Fd, Supervisor, nil);
261 false ->
262 {ok, Header} = read_header(Fd),
263 init_main(Filepath, Fd, Supervisor, Header)
264 end.
267 init_main(Filepath, Fd, Supervisor, nil) ->
268 % creates a new header and writes it to the file
269 {ok, _} = couch_file:expand(Fd, 2*(?HEADER_SIZE)),
270 Header = #db_header{},
271 {ok, Header2} = write_header(Fd, Header),
272 ok = couch_file:sync(Fd),
273 init_main(Filepath, Fd, Supervisor, Header2);
274 init_main(Filepath, Fd, Supervisor, Header) ->
275 {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
276 {ok, UuidBtree} = couch_btree:open(Header#db_header.docinfo_by_uuid_btree_state, Fd),
277 {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd),
278 {ok, TableGroupBtree} = couch_btree:open(Header#db_header.table_group_btree_state, Fd),
280 Db = #db{
281 fd=Fd,
282 supervisor=Supervisor,
283 header=Header,
284 summary_stream = SummaryStream,
285 docinfo_by_uuid_btree = UuidBtree,
286 docinfo_by_seq_btree = SeqBtree,
287 last_update_seq = Header#db_header.last_update_seq,
288 table_group_btree = TableGroupBtree,
289 doc_count = Header#db_header.doc_count,
290 filepath = Filepath
293 UpdatePid = spawn_link(couch_db, update_loop, [self(), Db]),
295 Pid=self(),
297 GetTableGroupInfoFun = fun(GroupKey) ->
298 get_table_group_info(get_db(Pid), GroupKey)
299 end,
301 UpdateTableGroupInfoFun = fun(GroupKey, UpdateStatus, GroupInfo) ->
302 UpdatePid ! {table_group_updated, GroupKey, UpdateStatus, GroupInfo},
304 end,
305 {ok, TableMgr} = couch_table_group:start_manager(Supervisor, Fd, GetTableGroupInfoFun, UpdateTableGroupInfoFun),
307 UpdatePid ! {set_table_group_mgr, TableMgr},
309 {ok, #main{db=Db, update_pid=UpdatePid, table_group_mgr=TableMgr}}.
311 terminate(_Reason, #main{db=Db} = Main) ->
312 Main#main.update_pid ! close,
313 couch_table_group:stop(Main#main.table_group_mgr),
314 couch_file:close(Db#db.fd).
316 handle_call({get_table_group, TableGroupDocUuid}, From, #main{db=Db}=Main) ->
317 case get_doc_info(Db, TableGroupDocUuid) of
318 {ok, #doc_info{deleted=true}} ->
319 {reply, {not_found, deleted}, Main};
320 {ok, DocInfo} ->
321 ok = couch_table_group:get_group_async(Main#main.table_group_mgr, DocInfo, From),
322 {noreply, Main};
323 not_found ->
324 {reply, {not_found, missing}, Main}
325 end;
326 handle_call({update_docs, DocActions, Options}, From, Main) ->
327 Main#main.update_pid ! {From, update_docs, DocActions, Options},
328 {noreply, Main};
329 handle_call(get_db, _From, #main{db=Db}=Main) ->
330 {reply, {ok, Db}, Main};
331 handle_call(get_info, _From, #main{db=Db}=Main) ->
332 InfoList = [
333 {doc_count, Db#db.doc_count},
334 {last_update_seq, Db#db.last_update_seq}
336 {reply, {ok, InfoList}, Main};
337 handle_call({update_table_group, Uuid, UpdateNotifFun}, _From, #main{db=Db}=Main) ->
338 case get_doc_info(Db, Uuid) of
339 {ok, DocInfo} ->
340 ok = couch_table_group:update_group(Main#main.table_group_mgr, DocInfo, UpdateNotifFun),
341 {reply, ok, Main};
342 Error ->
343 {reply, Error, Main}
344 end.
347 handle_cast({db_updated, NewDb}, Main) ->
348 {noreply, Main#main{db=NewDb}}.
350 %%% Internal function %%%
353 db_pid(SupPid)->
354 {error, {already_started, DbPid}} = supervisor:start_child(SupPid,
355 {couch_db,
356 {couch_db, sup_start_link, []},
357 permanent,
358 brutal_kill,
359 worker,
360 [couch_db]}),
361 DbPid.
364 update_loop(MainPid, Db) ->
365 receive
366 {set_table_group_mgr, TableMgr} ->
367 update_loop(MainPid, Db#db{table_group_mgr=TableMgr});
368 {OrigFrom, update_docs, DocActions, Options} ->
369 {ok, DocResults, Db2} = update_docs_int(Db, DocActions, Options),
370 ok = gen_server:cast(MainPid, {db_updated, Db2}),
371 gen_server:reply(OrigFrom, {ok, DocResults}),
372 update_loop(MainPid, Db2);
373 {table_group_updated, #doc_info{uuid=Uuid}=GroupDocInfo, _UpdateStatus, TableGroupInfo} ->
374 case get_doc_info(Db, GroupDocInfo#doc_info.uuid) of
375 {ok, GroupDocInfo} ->
376 % revision on disk matches the revision of the table group being updated
377 % so we save the info to disk
378 {ok, GroupBtree2} = couch_btree:add_remove(Db#db.table_group_btree, [{Uuid, TableGroupInfo}], []),
379 Db2 = Db#db{table_group_btree=GroupBtree2, uncommitted_writes=true},
380 {ok, Db3} = commit_outstanding(Db2),
381 ok = gen_server:cast(MainPid, {db_updated, Db3}),
382 update_loop(MainPid, Db3);
383 _Else ->
384 % doesn't match, don't save in btree
385 update_loop(MainPid, Db)
386 end;
387 close ->
388 % terminate loop
390 end.
392 get_table_group_info(#db{}=Db, #doc_info{uuid=Uuid}=DocInfo) ->
393 case couch_btree:lookup_single(Db#db.table_group_btree, Uuid) of
394 {ok, {TableQueries, TableGroupState}} ->
395 {ok, {TableQueries, TableGroupState}};
396 not_found ->
397 {ok, Doc} = open_doc_int(Db, DocInfo, []),
398 case couch_doc:get_table_queries(Doc) of
399 [] ->
400 {not_found, no_tables_found};
401 TableQueries ->
402 {ok, {TableQueries, nil}}
404 end.
406 get_db(DbPid) ->
407 {ok, Db} = gen_server:call(DbPid, get_db),
411 open_doc_int(Db, #doc_info{uuid=Uuid,revision=Rev,summary_pointer=SummaryPointer,deleted=Deleted}, Options) ->
412 case (not Deleted) orelse lists:member(allow_stub, Options) of
413 true ->
414 {ok, SummaryFields} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
415 Doc = #doc{
416 uuid = Uuid,
417 revisions = [Rev],
418 summary_fields = dict:from_list(SummaryFields),
419 deleted = Deleted
421 {ok, Doc};
422 false ->
423 {not_found, deleted}
424 end;
425 open_doc_int(Db, {#doc_info{uuid=Uuid,revision=Rev,summary_pointer=SummaryPointer,deleted=Deleted}, RevisionTree}, Options) ->
426 case (not Deleted) orelse lists:member(allow_stub, Options) of
427 true ->
428 {ok, SummaryFields} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
429 io:format("RevisionTree:~p~n", [RevisionTree]),
430 io:format("RevisionPath:~p~n", [get_rev_path(Rev, RevisionTree)]),
431 Doc = #doc{
432 uuid = Uuid,
433 revisions = get_rev_path(Rev, RevisionTree),
434 summary_fields = dict:from_list(SummaryFields),
435 deleted = Deleted
437 {ok, Doc};
438 false ->
439 {not_found, deleted}
440 end;
441 open_doc_int(Db, Uuid, Options) ->
442 case get_full_doc_info(Db, Uuid) of
443 {ok, {DocInfo, RevisionTree}} ->
444 open_doc_int(Db, {DocInfo, RevisionTree}, Options);
445 not_found ->
446 {not_found, missing}
447 end.
449 % revision tree functions
451 append_to_leaf_node([], _FindLeafRev, _NewRev) ->
453 append_to_leaf_node([{LeafRev, LeafDoc, []} | Rest], FindLeafRev, {NewRev, NewDoc}) when LeafRev == FindLeafRev->
454 [{LeafRev, LeafDoc, [{NewRev, NewDoc, []}]} | Rest];
455 append_to_leaf_node([Node | Rest], FindLeafRev, {NewRev, NewDoc}) ->
456 [Node | append_to_leaf_node(Rest, FindLeafRev, {NewRev, NewDoc})].
458 merge_rev_trees([], B) ->
459 {B, 0};
460 merge_rev_trees(A, []) ->
461 {A, 0};
462 merge_rev_trees([ATree | ANextTree], [BTree | BNextTree]) ->
463 {ARev, ADoc, ASubTrees} = ATree,
464 {BRev, _BDoc, BSubTrees} = BTree,
466 ARev == BRev ->
467 %same rev
468 {MergedSubTrees, SubTreesConflicts} = merge_rev_trees(ASubTrees, BSubTrees),
469 {MergedNextTrees, NextConflicts} = merge_rev_trees(ANextTree, BNextTree),
470 {[{ARev, ADoc, MergedSubTrees} | MergedNextTrees], SubTreesConflicts + NextConflicts};
471 ARev < BRev ->
472 {Merged, Conflicts} = merge_rev_trees(ANextTree, [BTree | BNextTree]),
473 {[ATree | Merged], Conflicts + 1};
474 true ->
475 {Merged, Conflicts} = merge_rev_trees([ATree | ANextTree], BNextTree),
476 {[BTree | Merged], Conflicts + 1}
477 end.
479 get_leaf_revs([], _, Acc) ->
480 Acc;
481 get_leaf_revs([{RevId, DocOrPtr, []} | RestTrees], Depth, Acc) ->
482 [{RevId, DocOrPtr, Depth} | get_leaf_revs(RestTrees, Depth, Acc)];
483 get_leaf_revs([{_RevId, _DocOrPtr, SubTrees} | RestTrees], Depth, Acc) ->
484 get_leaf_revs(SubTrees, Depth + 1, get_leaf_revs(RestTrees, Depth, Acc)).
486 % returns a list of revids that is that path to the first revision
487 % match in the trees. If none found, returns null list []
488 get_rev_path(_Rev, []) ->
490 get_rev_path(Rev, [{RevId, _, _SubTrees} | _RestTrees]) when Rev == RevId ->
491 [RevId];
492 get_rev_path(Rev, [{RevId, _, SubTrees} | RestTrees]) ->
493 case get_rev_path(Rev, SubTrees) of
494 [] ->
495 get_rev_path(Rev, RestTrees);
496 Revs ->
497 [RevId | Revs]
498 end.
501 revision_list_to_trees(Doc, [RevId]) ->
502 [{RevId, Doc, []}];
503 revision_list_to_trees(Doc, [RevId | Rest]) ->
504 [{RevId, 0, revision_list_to_trees(Doc, Rest)}] .
506 % counts the number of conflicts in a list sorted by priority
507 count_non_deletes([], Count) ->
508 Count;
509 count_non_deletes([{_RevId, 0, _Depth} | _Rest], Count) ->
510 Count; % first deletion, all done
511 count_non_deletes([{_RevId, _SummaryPointer, _Depth} | Rest], Count) ->
512 count_non_deletes(Rest, Count + 1).
514 winning_revision(Trees) ->
515 LeafRevs = get_leaf_revs(Trees, 0, []),
516 SortedLeafRevs =
517 lists:sort(fun({RevIdA, SummaryPointerA, DepthA}, {RevIdB, SummaryPointerB, DepthB}) ->
518 % sort descending by {not deleted, then Depth, then RevisionId}
519 ANotDeleted = case SummaryPointerA of 0 -> false; _ -> true end,
520 BNotDeleted = case SummaryPointerB of 0 -> false; _ -> true end,
521 A = {ANotDeleted, DepthA, RevIdA},
522 B = {BNotDeleted, DepthB, RevIdB},
523 A > B
524 end,
525 LeafRevs),
526 [{RevId, SummaryPointer, _Depth} | Rest] = SortedLeafRevs,
527 {RevId, SummaryPointer, count_non_deletes(Rest, 0)}.
529 flush_revision_trees(_Db, []) ->
531 flush_revision_trees(Db, [{RevId, #doc{deleted=true}, SubTrees} | RestTrees]) ->
532 [{RevId, 0, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)];
533 flush_revision_trees(Db, [{RevId, #doc{summary_fields=Fields}, SubTrees} | RestTrees]) ->
534 {ok, SummaryPointer} = couch_stream:write_term(Db#db.summary_stream, dict:to_list(Fields)),
535 [{RevId, SummaryPointer, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)];
536 flush_revision_trees(Db, [{RevId, SummaryPointer, SubTrees} | RestTrees]) ->
537 [{RevId, SummaryPointer, flush_revision_trees(Db, SubTrees)} | flush_revision_trees(Db, RestTrees)].
541 write_summaries(Db, [], InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
542 {ok, lists:reverse(DocResultOut), lists:reverse(InfoByUuidOut), lists:reverse(RemoveSeqOut),
543 lists:reverse(InfoBySeqOut), Db};
544 write_summaries(Db, [{Uuid, {Docs, Options}, {DiskUpdateSeq, _DiskRevision, DiskSummaryPointer, _DiskConflicts, DiskRevTrees}} | Rest],
545 InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut) ->
546 NewEdits = lists:member(new_edits, Options),
547 InputRevTrees =
548 lists:foldl(fun(#doc{revisions=Revisions}=Doc, AccTrees) ->
549 Revisions2 = case NewEdits of
550 true -> Revisions ++ [couch_util:rand32()]; % add new revision
551 false -> Revisions
552 end,
553 DocRevTree = revision_list_to_trees(Doc, Revisions2),
554 {NewRevTrees, _Conflicts} = merge_rev_trees(AccTrees, DocRevTree),
555 NewRevTrees
556 end, [], Docs),
558 {NewRevTrees, Conflicts} = merge_rev_trees(DiskRevTrees, InputRevTrees),
560 case NewEdits andalso Conflicts > 0 of
561 true ->
562 io:format("DiskRevTrees:~p~nInputRevTrees:~p~n", [DiskRevTrees, InputRevTrees]),
563 % don't save
564 DocResultOut2 = [conflict | DocResultOut],
565 write_summaries(Db, Rest, InfoBySeqOut, RemoveSeqOut, InfoByUuidOut, DocResultOut2);
566 false ->
567 FlushedTrees = flush_revision_trees(Db, NewRevTrees),
568 {WinningRevision, WinningSummaryPointer, NumConflicts} = winning_revision(FlushedTrees),
570 WasDiskDocument = DiskSummaryPointer /= 0,
571 IsDiskDocument = WinningSummaryPointer /= 0,
573 NewDocCount =
574 case {WasDiskDocument, IsDiskDocument} of
575 {true, true} -> % existing document, addition
576 Db#db.doc_count;
577 {false, false} -> % no document, deletion
578 Db#db.doc_count;
579 {false, true} -> % no document, addition
580 Db#db.doc_count + 1;
581 {true, false} -> % existing document, deletion
582 Db#db.doc_count - 1
583 end,
585 UpdateSeq = Db#db.last_update_seq + 1,
587 case DiskUpdateSeq of
588 0 ->
589 RemoveSeqOut2 = RemoveSeqOut;
590 _ ->
591 RemoveSeqOut2 = [DiskUpdateSeq | RemoveSeqOut]
592 end,
594 InfoBySeqOut2 = [{UpdateSeq, {Uuid, WinningRevision, WinningSummaryPointer, NumConflicts}} | InfoBySeqOut],
595 InfoByUuidOut2 = [{Uuid, {UpdateSeq, WinningRevision, WinningSummaryPointer, NumConflicts, FlushedTrees}} | InfoByUuidOut],
596 DocResultOut2 = [{ok, WinningRevision} | DocResultOut],
597 Db2 = Db#db{last_update_seq = UpdateSeq, uncommitted_writes=true, doc_count=NewDocCount},
598 write_summaries(Db2, Rest, InfoBySeqOut2, RemoveSeqOut2, InfoByUuidOut2, DocResultOut2)
599 end.
601 update_docs_int(Db, DocsOptionsList, Options) ->
602 #db{
603 docinfo_by_uuid_btree = DocInfoByUuidBTree,
604 docinfo_by_seq_btree = DocInfoBySeqBTree,
605 table_group_btree = TableGroupBTree,
606 table_group_mgr = TableGroupMgr
607 } = Db,
608 Uuids = [Uuid || {[#doc{uuid=Uuid} | _Rest], _Options} <- DocsOptionsList],
609 {ok, OldDocInfoResults} = couch_btree:lookup(DocInfoByUuidBTree, Uuids),
611 % create a list of {{Docs, UpdateOptions}, RevisionTree} tuples.
612 DocsAndOldDocInfo =
613 lists:zipwith3(fun(DocUuid, DocsOptions, OldDocInfoLookupResult) ->
614 case OldDocInfoLookupResult of
615 {ok, {_Uuid, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskConflicts, DiskRevTrees}}} ->
616 {DocUuid, DocsOptions, {DiskUpdateSeq, DiskRevision, DiskSummaryPointer, DiskConflicts, DiskRevTrees}};
617 {not_found, _} ->
618 {DocUuid, DocsOptions, {0, 0, 0, 0, []}}
620 end,
621 Uuids, DocsOptionsList, OldDocInfoResults),
623 % now write out the documents
624 {ok, DocResults, InfoByUuid, RemoveSeqList, InfoBySeqList, Db2} =
625 write_summaries(Db, DocsAndOldDocInfo, [], [], [], []),
627 % and the indexes to the documents
628 {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeqList, RemoveSeqList),
629 {ok, DocInfoByUuidBTree2} = couch_btree:add_remove(DocInfoByUuidBTree, InfoByUuid, []),
631 % clear the computed table cache
632 UpdatedUuids = [UpdatedUuid || {UpdatedUuid, _DocInfo} <- InfoByUuid],
633 {ok, TableGroupBTree2} = couch_btree:add_remove(TableGroupBTree, [], UpdatedUuids),
635 % now notify the table group manager to discard any of the table groups it has in memory
637 OldDocInfos =
638 lists:map(fun({OldUuid, _Docs, {OldUpdateSeq, OldRev, OldSummaryPointer, OldConflicts, _OldRevTrees}}) ->
639 #doc_info{uuid=OldUuid, update_seq=OldUpdateSeq, revision=OldRev, summary_pointer=OldSummaryPointer, conflicts=OldConflicts}
640 end, DocsAndOldDocInfo),
642 ok = couch_table_group:free_groups(TableGroupMgr, OldDocInfos),
644 Db3 = Db2#db{
645 docinfo_by_uuid_btree = DocInfoByUuidBTree2,
646 docinfo_by_seq_btree = DocInfoBySeqBTree2,
647 table_group_btree = TableGroupBTree2,
648 uncommitted_writes = true
650 case lists:member(delay_commit, Options) of
651 true ->
652 Db4 = Db3;
653 false ->
654 {ok, Db4} = commit_outstanding(Db3)
655 end,
656 {ok, DocResults, Db4}.
661 commit_outstanding(#db{fd=Fd, uncommitted_writes=true, header=Header} = Db) ->
662 ok = couch_file:sync(Fd), % commit outstanding data
663 Header2 = Header#db_header{
664 last_update_seq = Db#db.last_update_seq,
665 summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
666 docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
667 docinfo_by_uuid_btree_state = couch_btree:get_state(Db#db.docinfo_by_uuid_btree),
668 table_group_btree_state = couch_btree:get_state(Db#db.table_group_btree),
669 doc_count = Db#db.doc_count
671 {ok, Header3} = write_header(Fd, Header2),
672 ok = couch_file:sync(Fd), % commit header to disk
673 Db2 = Db#db{
674 uncommitted_writes = false,
675 header = Header3
677 {ok, Db2};
678 commit_outstanding(#db{uncommitted_writes=false} = Db) ->
679 {ok, Db}.
681 write_header(Fd, Header) ->
682 H2 = Header#db_header{write_version = Header#db_header.write_version + 1},
683 Bin = term_to_binary(H2),
684 case size(Bin) > ?HEADER_SIZE of
685 true ->
686 % too big
687 {error, error_header_too_large};
688 false ->
689 ok = couch_file:pwrite(Fd,0,Bin),
690 ok = couch_file:pwrite(Fd,?HEADER_SIZE,Bin),
691 {ok, H2}
692 end.
695 read_header(Fd) ->
696 case read_header(Fd,0) of
697 {ok, Header} ->
698 {ok, Header};
699 _Error ->
700 read_header(Fd,?HEADER_SIZE)
701 end.
703 read_header(Fd,Offset) ->
704 {ok, Bin} = couch_file:pread(Fd,Offset,?HEADER_SIZE),
705 case binary_to_term(Bin) of
706 Header when is_record(Header, db_header) ->
707 {ok, Header};
708 Else ->
709 {error, {unknown_header_type, Else}}
710 end.
713 code_change(_OldVsn, State, _Extra) ->
714 {ok, State}.
716 handle_info(_Info, State) ->
717 {noreply, State}.
719 test_restart() ->
720 couch_server:stop(),
721 timer:sleep(100),
722 couch_server:start().
724 test() ->
725 test(100).
727 test(N) ->
728 couch_server:start(),
729 io:format("foo~n"),
730 {ok, Db} = couch_server:create("foo", [overwrite]),
732 TableGroupDoc0 = couch_doc:new(),
733 TableGroupDoc1 = TableGroupDoc0#doc{uuid = "TestTable"},
734 TableGroupDoc2 = couch_doc:set_field(TableGroupDoc1, "$table_foo", ["SELECT Foo != \"\"; COLUMN Foo; COLUMN BAR"]),
735 TableGroupDoc3 = couch_doc:set_field(TableGroupDoc2, "$table_bar", ["SELECT *; COLUMN Bar; COLUMN foo"]),
736 TableGroupDoc4 = couch_doc:set_field(TableGroupDoc3, "bar", ["BAR!!!"]),
737 {ok, NewRevId} = save_doc(Db, TableGroupDoc4),
738 {ok, _NewRevId2} = save_doc(Db, couch_doc:add_rev(TableGroupDoc4, NewRevId)),
740 ok = update_table_group_sync(Db, TableGroupDoc1#doc.uuid),
741 DumpTableFun = fun({_Uuid, _Rev}, Keys, ColumnValues, _Offset, _TotalTableCount, Count) -> io:format("Keys:~p, ColumnValues:~p~n", [Keys,ColumnValues]), {ok, Count + 1} end,
742 {ok, OfficalCount, TotalCount} = fold_table(Db, TableGroupDoc1#doc.uuid, "foo", nil, fwd, DumpTableFun, 0),
743 OfficalCount = TotalCount,
744 io:format("Total rows dumped: ~w~n", [TotalCount]),
746 test_restart(),
748 {ok, Db2} = couch_server:open("foo"),
749 {ok, Doc5} = open_doc(Db2, TableGroupDoc1#doc.uuid),
750 {ok, FooValue2} = couch_doc:get_field(Doc5, "bar"),
751 io:format("bar: ~p~n", [FooValue2]),
753 {ok, UuidsAndValues} = test_write_docs(N, Db2, []),
755 {ok, OfficalCount2, TotalCount2} = fold_table(Db2, TableGroupDoc1#doc.uuid, "foo", nil, fwd, DumpTableFun, 0),
756 OfficalCount2 = TotalCount2,
757 io:format("Total rows dumped: ~w~n", [TotalCount2]),
759 time_action(fun() ->
760 ok = update_table_group_sync(Db2, TableGroupDoc1#doc.uuid)
761 end),
764 io:format("database info: ~p~n", [get_info(Db2)]),
766 {ok, OfficalCount3, TotalCount3} = fold_table(Db2, TableGroupDoc1#doc.uuid, "foo", nil, fwd, DumpTableFun, 0),
767 OfficalCount3 = TotalCount3,
768 io:format("Total rows dumped: ~w~n", [TotalCount3]),
770 UuidsAndValues2 = test_delete_docs(Db2, UuidsAndValues, []),
772 ok = update_table_group_sync(Db2, TableGroupDoc1#doc.uuid),
774 {ok, OfficalCount4, TotalCount4} = fold_table(Db2, TableGroupDoc1#doc.uuid, "foo", nil, fwd, DumpTableFun, 0),
775 OfficalCount4 = TotalCount4,
776 io:format("Total rows dumped: ~w~n", [TotalCount4]),
779 check_delete_docs(Db2, UuidsAndValues),
781 test_restart(),
783 {ok, Db3} = couch_server:open("foo"),
784 test_update_docs(Db3, UuidsAndValues2),
786 ok = update_table_group_sync(Db3, TableGroupDoc1#doc.uuid),
788 {ok, OfficalCount5, TotalCount5} = fold_table(Db3, TableGroupDoc1#doc.uuid, "foo", nil, fwd, DumpTableFun, 0),
789 OfficalCount5 = TotalCount5,
790 io:format("Total rows dumped: ~w~n", [TotalCount5]),
795 BadResult = fold_table(Db3, "TableGroupDoc1#doc.uuid", "foot", nil, fwd, DumpTableFun, 0),
796 io:format("BadResult: ~p~n", [BadResult]),
798 BadResult2 = update_table_group_sync(Db3, "foo"),
799 io:format("BadResult2: ~p~n", [BadResult2]),
802 _UuidsAndValues3 = test_delete_docs(Db3, UuidsAndValues2, []),
804 ok = update_table_group_sync(Db3, TableGroupDoc1#doc.uuid),
806 {ok, _OfficalCount6, TotalCount6} = fold_table(Db3, TableGroupDoc1#doc.uuid, "foo", {"1", nil}, fwd, DumpTableFun, 0),
807 io:format("Total rows dumped: ~w~n", [TotalCount6]),
809 io:format("database info: ~p~n", [get_info(Db3)]).
813 test_write_docs(0, _Db, Output) ->
814 {ok, lists:reverse(Output)};
815 test_write_docs(N, Db, Output) ->
816 Doc = couch_doc:new(),
817 Doc2 = couch_doc:set_field(Doc, "foo", [integer_to_list(N)]),
818 Doc3 = couch_doc:set_field(Doc2, "num", [N]),
819 Doc4 = couch_doc:set_field(Doc3, "bar", ["blah"]),
820 save_doc(Db, Doc4),
821 test_write_docs(N-1, Db, [{Doc3#doc.uuid, N} | Output]).
824 test_delete_docs(_Db, [], Acc) ->
825 lists:reverse(Acc);
826 test_delete_docs(Db, [{Uuid, _Value} | Rest], Acc) ->
827 {ok, _RevId} = delete_doc(Db, Uuid, any),
828 test_delete_docs2(Db, Rest, Acc).
831 test_delete_docs2(_Db, [], Acc) ->
832 lists:reverse(Acc);
833 test_delete_docs2(Db, [{Uuid, Value} | Rest], Acc) ->
834 test_delete_docs(Db, Rest, [{Uuid, Value} | Acc]).
837 check_delete_docs(_Db, []) ->
839 check_delete_docs(Db, [{Uuid, _Value} | Rest]) ->
840 {not_found, deleted} = open_doc(Db, Uuid),
841 check_delete_docs2(Db, Rest).
844 check_delete_docs2(_Db, []) ->
846 check_delete_docs2(Db, [{Uuid, _Value} | Rest]) ->
847 {ok, _Doc} = open_doc(Db, Uuid),
848 check_delete_docs(Db, Rest).
850 test_update_docs(_Db, []) ->
852 test_update_docs(Db, [{Uuid,_Value}| Rest]) ->
853 {ok, Doc} = open_doc(Db, Uuid),
854 {ok, [Num]} = couch_doc:get_field(Doc, "num"),
855 Doc2 = couch_doc:set_field(Doc, "num", [Num, Num + 5]),
856 save_doc(Db, Doc2),
857 test_update_docs(Db, Rest).
859 time_action(Fun) ->
860 StartTime = now(),
861 Result = Fun(),
862 EndTime = now(),
863 Diff = timer:now_diff(EndTime, StartTime),
864 io:format("Action took ~f secs!~n", [Diff/1000000.0]),
865 Result.