2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 -behaviour(gen_server
).
21 -export([open
/2,create
/2,create
/3,save_doc
/2,save_doc
/3,get_doc_info
/2, save_doc_revs
/3]).
22 -export([save_docs
/2, save_docs
/3]).
23 -export([delete_doc
/3,open_doc
/2,open_doc
/3,close
/1,enum_docs_since
/4,enum_docs_since
/5]).
24 -export([enum_docs
/4,enum_docs
/5, open_doc_revs
/4, get_missing_revs
/2]).
25 -export([update_view_group_sync
/2,update_view_group
/2,fold_view
/6,fold_view
/7,get_info
/1]).
26 -export([update_temp_view_group_sync
/2, fold_temp_view
/5,fold_temp_view
/6]).
27 -export([update_loop
/2]).
28 -export([init
/1,terminate
/2,handle_call
/3,handle_cast
/2,code_change
/3,handle_info
/2]).
29 -export([revision_list_to_trees
/2 , merge_rev_trees
/2 ]).
31 -include("couch_db.hrl").
33 -define(HEADER_SIZE
, 2048). % size of each segment of the doubly written header
35 -define(DB_FILE_SIG
, <<$g
, $m
, $k
, 0>>). % fixed signature at the beginning of each file header
37 -define(FILE_VERSION
, 1). % the current file version this code base uses.
38 % In the future CouchDb will have the ability
39 % to use/convert different file versions.
45 summary_stream_state
= nil
,
46 docinfo_by_uuid_btree_state
= nil
,
47 docinfo_by_seq_btree_state
= nil
,
48 view_group_btree_state
= nil
,
49 local_docs_btree_state
= nil
,
56 header
= #db_header
{},
57 uncommitted_writes
= false
,
59 docinfo_by_uuid_btree
,
76 start_link(DbName
, Filepath
, Options
) ->
77 {ok
, Super
} = couch_db_sup:start_link(),
78 FdResult
= supervisor:start_child(Super
,
80 {couch_file
, open
, [Filepath
, Options
]},
87 {ok
, _Db
} = supervisor:start_child(Super
,
89 %%%%%% below calls gen_server:start_link(couch_db, {Fd, Super, Options}, []).
90 {gen_server
, start_link
, [couch_db
, {DbName
, Fd
, Super
, Options
}, []]},
96 {error
, {enoent
, _ChildInfo
}} ->
100 {error
, {Error
, _ChildInfo
}} ->
108 %%% Interface functions %%%
%% Create a new database file, using the file path itself as the
%% database name. Convenience wrapper around create/3.
create(Filepath, Options) ->
    create(Filepath, Filepath, Options).
%% Create a new database file under an explicit name. Prepends the
%% 'create' flag to the caller's option list and starts the db server.
create(DbName, Filepath, Options) when is_list(Options) ->
    start_link(DbName, Filepath, [create | Options]).
%% Open an existing database file (no creation options).
open(DbName, Filepath) ->
    start_link(DbName, Filepath, []).
119 delete_doc(SupPid
, Uuid
, Revisions
) ->
120 case open_doc_revs(SupPid
, Uuid
, Revisions
, []) of
122 DeletedDocs
= [Doc#doc
{deleted
=true
} || {ok
, #doc
{deleted
=false
}=Doc
} <- DocResults
],
123 save_doc_revs(SupPid
, DeletedDocs
, [new_edits
]);
%% Open a document by uuid or by #doc_info{}. A doc_info already known
%% to be deleted short-circuits without touching the db process.
open_doc(_SupPid, #doc_info{deleted=true}) ->
    {not_found, deleted};
open_doc(SupPid, UuidOrDocInfo) ->
    open_doc(SupPid, UuidOrDocInfo, []).
%% Open a document with explicit options, against a snapshot of the
%% current db state fetched from the db server process.
open_doc(SupPid, Uuid, Options) ->
    open_doc_int(get_db(db_pid(SupPid)), Uuid, Options).
%% Open specific revisions of a document, against a snapshot of the
%% current db state fetched from the db server process.
open_doc_revs(SupPid, Uuid, Revs, Options) ->
    open_doc_revs_int(get_db(db_pid(SupPid)), Uuid, Revs, Options).
139 get_missing_revs(SupPid
, UuidRevsList
) ->
140 Uuids
= [Uuid1
|| {Uuid1
, _Revs
} <- UuidRevsList
],
141 {ok
, FullDocInfoResults
} = get_full_doc_infos(SupPid
, Uuids
),
143 lists:zipwith( fun({Uuid
, Revs
}, FullDocInfoResult
) ->
144 case FullDocInfoResult
of
145 {ok
, {_
, RevisionTrees
}} ->
146 {Uuid
, find_missing_revisions(Revs
, RevisionTrees
)};
150 end, UuidRevsList
, FullDocInfoResults
),
153 get_doc_info(Db
, Uuid
) ->
154 case get_full_doc_info(Db
, Uuid
) of
155 {ok
, {DocInfo
, _RevisionTrees
}} ->
161 get_full_doc_info(Db
, Uuid
) ->
162 case get_full_doc_infos(Db
, [Uuid
]) of
163 {ok
, [{ok
, DocInfo
}]} ->
165 {ok
, [{not_found
, Uuid
}]} ->
169 get_full_doc_infos(SupPid
, Uuids
) when is_pid(SupPid
) ->
170 get_full_doc_infos(get_db(db_pid(SupPid
)), Uuids
);
171 get_full_doc_infos(#db
{}=Db
, Uuids
) ->
172 {ok
, LookupResults
} = couch_btree:lookup(Db#db
.docinfo_by_uuid_btree
, Uuids
),
175 fun({ok
, {Uuid
, {UpdateSeq
, Rev
, SummaryPointer
, RevisionTrees
}}}) ->
176 {Conflicts
, DeletedConflicts
} = get_conflict_revs(RevisionTrees
),
183 update_seq
=UpdateSeq
,
184 summary_pointer
=SummaryPointer
,
185 deleted
=(summary_ptr_type(SummaryPointer
)==deletion
),
186 conflict_revs
=Conflicts
,
187 deleted_conflict_revs
=DeletedConflicts
192 ({not_found
, {Uuid
, _
}}) ->
199 gen_server:call(db_pid(SupPid
), get_info
).
%% Save a single document with default options.
save_doc(SupPid, Doc) ->
    save_doc(SupPid, Doc, []).
204 save_doc(SupPid
, Doc
, Options
) ->
205 {ok
, [[Result
]]} = save_docs(SupPid
, [{[Doc
], [new_edits
| Options
]}], Options
),
208 save_doc_revs(SupPid
, Docs
, Options
) ->
209 {ok
, [RetValue
]} = save_docs(SupPid
, [{Docs
, Options
}], Options
),
%% Save a batch of {Docs, Options} groups with default transaction options.
save_docs(SupPid, DocAndOptions) ->
    save_docs(SupPid, DocAndOptions, []).
%% Save a batch of {Docs, Options} groups, flushing any unwritten
%% attachment binaries to the db file first, then handing the prepared
%% batch to the db server process as an update_docs call.
%%
%% Fix: db_pid(SupPid) was previously evaluated twice (once for the
%% binary flush, once for the gen_server call); db_pid performs a
%% supervisor:start_child round-trip, so the pid is now looked up once
%% and reused.
save_docs(SupPid, DocsAndOptions, TransactionOptions) ->
    DbPid = db_pid(SupPid),
    % flush unwritten binaries to disk.
    Db = get_db(DbPid),
    DocsAndOptions2 = [{[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Docs], Options}
            || {Docs, Options} <- DocsAndOptions],
    {ok, _RetValues} = gen_server:call(DbPid, {update_docs, DocsAndOptions2, TransactionOptions}).
222 doc_flush_binaries(Doc
, Fd
) ->
223 % calc size of binaries to write out
224 Bins
= Doc#doc
.attachments
,
227 fun(BinValue
, SizeAcc
) ->
229 {_Key
, {_Type
, {Fd0
, _StreamPointer
, _Len
}}} when Fd0
== Fd
->
230 % already written to our file, nothing to write
232 {_Key
, {_Type
, {_OtherFd
, _StreamPointer
, Len
}}} ->
233 % written to a different file
235 {_Key
, {_Type
, Bin
}} when is_binary(Bin
) ->
241 {ok
, OutputStream
} = couch_stream:open(Fd
),
242 ok
= couch_stream:ensure_buffer(OutputStream
, PreAllocSize
),
245 fun({Key
, {Type
, BinValue
}}) ->
248 {Fd0
, StreamPointer
, Len
} when Fd0
== Fd
->
249 % already written to our file, nothing to write
250 {Fd
, StreamPointer
, Len
};
251 {OtherFd
, StreamPointer
, Len
} ->
252 % written to a different file (or a closed file
253 % instance, which will cause an error)
254 {ok
, {NewStreamPointer
, Len
}, _EndSp
} =
255 couch_stream:foldl(OtherFd
, StreamPointer
, Len
,
256 fun(Bin
, {BeginPointer
, SizeAcc
}) ->
257 {ok
, Pointer
} = couch_stream:write(OutputStream
, Bin
),
259 0 -> % this was the first write, record the pointer
260 {ok
, {Pointer
, size(Bin
)}};
262 {ok
, {BeginPointer
, SizeAcc
+ size(Bin
)}}
266 {Fd
, NewStreamPointer
, Len
};
267 Bin
when is_binary(Bin
), size(Bin
) > 0 ->
268 {ok
, StreamPointer
} = couch_stream:write(OutputStream
, Bin
),
269 {Fd
, StreamPointer
, size(Bin
)}
271 {Key
, {Type
, NewBinValue
}}
274 {ok
, _FinalPos
} = couch_stream:close(OutputStream
),
276 Doc#doc
{attachments
= NewBins
}.
278 enum_docs_since(SupPid
, SinceSeq
, Direction
, InFun
, Ctx
) ->
279 Db
= get_db(db_pid(SupPid
)),
280 EnumFun
= fun({UpdateSeq
, {Uuid
, Rev
, SummaryPointer
, ConflictRevs
, DeletedConflictRevs
}}, EnumCtx
) ->
284 update_seq
= UpdateSeq
,
285 summary_pointer
= SummaryPointer
,
286 conflict_revs
= ConflictRevs
,
287 deleted_conflict_revs
= DeletedConflictRevs
,
288 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
)
290 InFun(DocInfo
, EnumCtx
)
293 couch_btree:fold(Db#db
.docinfo_by_seq_btree
, SinceSeq
+ 1, Direction
, EnumFun
, Ctx
).
%% Enumerate documents by update sequence, defaulting to forward order.
enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
    enum_docs_since(SupPid, SinceSeq, fwd, InFun, Ctx).
298 enum_docs(SupPid
, StartUuid
, Direction
, InFun
, Ctx
) ->
299 Db
= get_db(db_pid(SupPid
)),
300 EnumFun
= fun({Uuid
, {UpdateSeq
, Rev
, SummaryPointer
, RevTrees
}}, EnumCtx
) ->
301 {ConflictRevs
, DeletedConflictRevs
} = get_conflict_revs(RevTrees
),
305 update_seq
= UpdateSeq
,
306 summary_pointer
= SummaryPointer
,
307 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
),
308 conflict_revs
= ConflictRevs
,
309 deleted_conflict_revs
= DeletedConflictRevs
311 InFun(DocInfo
, EnumCtx
)
313 couch_btree:fold(Db#db
.docinfo_by_uuid_btree
, StartUuid
, Direction
, EnumFun
, Ctx
).
%% Enumerate documents by uuid, defaulting to forward order.
enum_docs(SupPid, StartUuid, InFun, Ctx) ->
    enum_docs(SupPid, StartUuid, fwd, InFun, Ctx).
%% Kick off an asynchronous view group update. The no-op notification
%% fun means the caller is not told about update progress.
update_view_group(SupPid, ViewGroupDocUuid) ->
    IgnoreStatusFun = fun(_Whatever) -> ok end,
    gen_server:call(db_pid(SupPid), {update_view_group, ViewGroupDocUuid, IgnoreStatusFun}).
%% Partial update notification: the caller wants to block until the
%% update is complete, so hand back a fun that re-enters this function
%% with the next status instead of notifying now.
sync_update_notify(Pid, Ref, partial) ->
    fun(Status) -> sync_update_notify(Pid, Ref, Status) end;
327 sync_update_notify(Pid
, Ref
, complete
) ->
329 sync_update_notify(Pid
, Ref
, Else
) ->
%% Synchronously update a (persistent) view group identified by the
%% uuid of its design document.
update_view_group_sync(SupPid, ViewGroupDocUuid) ->
    update_view_group_sync0(SupPid, update_view_group, ViewGroupDocUuid).
%% Synchronously update a temporary view group identified by its map
%% function source.
update_temp_view_group_sync(SupPid, MapFunSrc) ->
    update_view_group_sync0(SupPid, update_temp_view_group, MapFunSrc).
339 update_view_group_sync0(SupPid
, Type
, Id
) ->
342 UpdateFun
= fun(Status
)-> sync_update_notify(Pid
, Ref
, Status
) end,
343 case gen_server:call(db_pid(SupPid
), {Type
, Id
, UpdateFun
}, infinity
) of
353 fold_view(SupPid
, ViewGroupDocUuid
, ViewName
, Dir
, Fun
, Acc
) ->
354 case gen_server:call(db_pid(SupPid
), {get_view_group
, ViewGroupDocUuid
}) of
356 couch_view_group:fold(ViewGroup
, ViewName
, Dir
, Fun
, Acc
);
361 fold_view(SupPid
, ViewGroupDocUuid
, ViewName
, StartKey
, Dir
, Fun
, Acc
) ->
362 case gen_server:call(db_pid(SupPid
), {get_view_group
, ViewGroupDocUuid
}) of
364 couch_view_group:fold(ViewGroup
, ViewName
, StartKey
, Dir
, Fun
, Acc
);
369 fold_temp_view(SupPid
, Src
, Dir
, Fun
, Acc
) ->
370 case gen_server:call(db_pid(SupPid
), {get_temp_view_group
, Src
}) of
372 couch_view_group:fold(ViewGroup
, Src
, Dir
, Fun
, Acc
);
377 fold_temp_view(SupPid
, Src
, StartKey
, Dir
, Fun
, Acc
) ->
378 case gen_server:call(db_pid(SupPid
), {get_temp_view_group
, Src
}) of
380 couch_view_group:fold(ViewGroup
, Src
, StartKey
, Dir
, Fun
, Acc
);
386 Ref
= erlang:monitor(process, SupPid
),
388 exit(SupPid
, normal
),
390 {'DOWN', Ref
, process, SupPid
, _Reason
} ->
398 init({Filepath
, Fd
, Supervisor
, Options
}) ->
399 case lists:member(create
, Options
) of
401 init_main(Filepath
, Fd
, Supervisor
, nil
);
403 {ok
, Header
} = read_header(Filepath
, Fd
),
404 init_main(Filepath
, Fd
, Supervisor
, Header
)
408 init_main(Filepath
, Fd
, Supervisor
, nil
) ->
409 % creates a new header and writes it to the file
410 {ok
, _
} = couch_file:expand(Fd
, 2*(?HEADER_SIZE
)),
411 Header
= #db_header
{},
412 {ok
, Header2
} = write_header(Fd
, Header
),
413 ok
= couch_file:sync(Fd
),
414 init_main(Filepath
, Fd
, Supervisor
, Header2
);
415 init_main(Filepath
, Fd
, Supervisor
, Header
) ->
416 {ok
, SummaryStream
} = couch_stream:open(Header#db_header
.summary_stream_state
, Fd
),
417 ok
= couch_stream:set_min_buffer(SummaryStream
, 10000),
418 {ok
, UuidBtree
} = couch_btree:open(Header#db_header
.docinfo_by_uuid_btree_state
, Fd
),
419 {ok
, SeqBtree
} = couch_btree:open(Header#db_header
.docinfo_by_seq_btree_state
, Fd
),
420 {ok
, LocalDocsBtree
} = couch_btree:open(Header#db_header
.local_docs_btree_state
, Fd
),
421 {ok
, ViewGroupBtree
} = couch_btree:open(Header#db_header
.view_group_btree_state
, Fd
),
425 supervisor
=Supervisor
,
427 summary_stream
= SummaryStream
,
428 docinfo_by_uuid_btree
= UuidBtree
,
429 docinfo_by_seq_btree
= SeqBtree
,
430 local_docs_btree
= LocalDocsBtree
,
431 last_update_seq
= Header#db_header
.last_update_seq
,
432 view_group_btree
= ViewGroupBtree
,
433 doc_count
= Header#db_header
.doc_count
,
437 UpdatePid
= spawn_link(couch_db
, update_loop
, [self(), Db
]),
441 GetViewGroupInfoFun
= fun(GroupKey
) ->
442 get_view_group_info(get_db(Pid
), GroupKey
)
445 GetTempViewGroupInfoFun
= fun(GroupKey
) ->
446 % for temp views, the groupkey is the source. and we never persist info
447 Type
= lists:takewhile(fun($
|) -> false
; (_
) -> true
end, GroupKey
),
448 [$
| | Function
] = lists:dropwhile(fun($
|) -> false
; (_
) -> true
end, GroupKey
),
449 {ok
, {{Type
, [{GroupKey
, Function
}]}, nil
}}
452 UpdateViewGroupInfoFun
= fun(GroupKey
, UpdateStatus
, GroupInfo
) ->
453 % send the updated view group info to the update process
454 UpdatePid
! {view_group_updated
, GroupKey
, UpdateStatus
, GroupInfo
},
458 UpdateTempViewGroupInfoFun
= fun(_GroupKey
, _UpdateStatus
, _GroupInfo
) ->
461 {ok
, TempFd
} = couch_file:open(Filepath
++ ".temp", [create
,overwrite
]),
462 {ok
, ViewMgr
} = couch_view_group:start_manager(Supervisor
, Fd
, GetViewGroupInfoFun
, UpdateViewGroupInfoFun
),
463 {ok
, TempViewMgr
} = couch_view_group:start_manager(Supervisor
, TempFd
, GetTempViewGroupInfoFun
, UpdateTempViewGroupInfoFun
),
465 UpdatePid
! {set_view_group_mgr
, ViewMgr
},
467 {ok
, #main
{db
=Db
, update_pid
=UpdatePid
, view_group_mgr
=ViewMgr
, temp_view_group_mgr
=TempViewMgr
}}.
%% gen_server terminate callback: tell the updater process to shut
%% down, stop both view group managers, then close the db file.
terminate(_Reason, #main{db=Db, update_pid=UpdatePid,
        view_group_mgr=ViewMgr, temp_view_group_mgr=TempViewMgr}) ->
    UpdatePid ! close,
    couch_view_group:stop(ViewMgr),
    couch_view_group:stop(TempViewMgr),
    couch_file:close(Db#db.fd).
475 handle_call({get_view_group
, ViewGroupDocUuid
}, From
, #main
{db
=Db
}=Main
) ->
476 case get_doc_info(Db
, ViewGroupDocUuid
) of
477 {ok
, #doc_info
{deleted
=true
}} ->
478 {reply
, {not_found
, deleted
}, Main
};
480 ok
= couch_view_group:get_group_async(Main#main
.view_group_mgr
, DocInfo
, From
),
483 {reply
, {not_found
, missing
}, Main
}
485 handle_call({get_temp_view_group
, MapFunSrc
}, From
, Main
) ->
486 ok
= couch_view_group:get_group_async(Main#main
.temp_view_group_mgr
, MapFunSrc
, From
),
488 handle_call({update_docs
, DocActions
, Options
}, From
, Main
) ->
489 Main#main
.update_pid
! {From
, update_docs
, DocActions
, Options
},
%% Return the current immutable db state record to the caller.
handle_call(get_db, _From, #main{db=Db}=Main) ->
    {reply, {ok, Db}, Main};
493 handle_call(get_info
, _From
, #main
{db
=Db
}=Main
) ->
495 {doc_count
, Db#db
.doc_count
},
496 {last_update_seq
, Db#db
.last_update_seq
}
498 {reply
, {ok
, InfoList
}, Main
};
499 handle_call({update_view_group
, Uuid
, UpdateNotifFun
}, _From
, #main
{db
=Db
}=Main
) ->
500 case get_doc_info(Db
, Uuid
) of
502 ok
= couch_view_group:update_group(Main#main
.view_group_mgr
, DocInfo
, UpdateNotifFun
),
%% Forward a temp view group update to its manager; reply with the
%% manager's result directly.
handle_call({update_temp_view_group, Src, UpdateNotifFun}, _From, Main) ->
    Reply = couch_view_group:update_group(Main#main.temp_view_group_mgr, Src, UpdateNotifFun),
    {reply, Reply, Main};
%% The updater process produced a new db state; swap it into our state.
handle_call({db_updated, NewDb}, _From, Main) ->
    {reply, ok, Main#main{db=NewDb}}.
513 handle_cast(foo
, Main
) ->
516 %%% Internal function %%%
520 {error
, {already_started
, DbPid
}} = supervisor:start_child(SupPid
,
522 {couch_db
, sup_start_link
, []},
530 update_loop(MainPid
, Db
) ->
532 {set_view_group_mgr
, ViewMgr
} ->
533 update_loop(MainPid
, Db#db
{view_group_mgr
=ViewMgr
});
534 {OrigFrom
, update_docs
, DocActions
, Options
} ->
535 {ok
, DocResults
, Db2
} = update_docs_int(Db
, DocActions
, Options
),
536 ok
= gen_server:call(MainPid
, {db_updated
, Db2
}),
537 couch_db_update_notifier:notify_all(Db2#db
.name
),
538 gen_server:reply(OrigFrom
, {ok
, DocResults
}),
539 update_loop(MainPid
, Db2
);
540 {view_group_updated
, #doc_info
{uuid
=Uuid
}=GroupDocInfo
, _UpdateStatus
, ViewGroupInfo
} ->
541 case get_doc_info(Db
, GroupDocInfo#doc_info
.uuid
) of
542 {ok
, GroupDocInfo
} ->
543 % revision on disk matches the revision of the view group being updated
544 % so we save the info to disk
545 {ok
, GroupBtree2
} = couch_btree:add_remove(Db#db
.view_group_btree
, [{Uuid
, ViewGroupInfo
}], []),
546 Db2
= Db#db
{view_group_btree
=GroupBtree2
, uncommitted_writes
=true
},
547 {ok
, Db3
} = commit_outstanding(Db2
),
548 ok
= gen_server:call(MainPid
, {db_updated
, Db3
}),
549 update_loop(MainPid
, Db3
);
551 % doesn't match, don't save in btree
552 update_loop(MainPid
, Db
)
559 get_view_group_info(#db
{}=Db
, #doc_info
{uuid
=Uuid
}=DocInfo
) ->
560 case couch_btree:lookup_single(Db#db
.view_group_btree
, Uuid
) of
561 {ok
, {ViewQueries
, ViewGroupState
}} ->
562 {ok
, {ViewQueries
, ViewGroupState
}};
564 {ok
, Doc
} = open_doc_int(Db
, DocInfo
, []),
565 case couch_doc:get_view_functions(Doc
) of
567 {not_found
, no_views_found
};
574 {ok
, Db
} = gen_server:call(DbPid
, get_db
),
577 open_doc_revs_int(Db
, Uuid
, Revs
, Options
) ->
578 case get_full_doc_info(Db
, Uuid
) of
579 {ok
, {_DocInfo
, RevisionTree
}} ->
580 {FoundRevs
, MissingRevs
} =
583 {get_all_leafs(RevisionTree
, []), []};
585 case lists:member(latest
, Options
) of
587 get_rev_leafs(RevisionTree
, Revs
, []);
589 get_revs(RevisionTree
, Revs
, [])
593 lists:map(fun({Rev
, FoundSummaryPtr
, FoundRevPath
}) ->
594 case summary_ptr_type(FoundSummaryPtr
) of
596 % we have the rev in our but know nothing about it
597 {{not_found
, missing
}, Rev
};
599 {ok
, make_doc(Db
, Uuid
, FoundSummaryPtr
, FoundRevPath
)};
601 {ok
, make_doc(Db
, Uuid
, FoundSummaryPtr
, FoundRevPath
)}
604 Results
= FoundResults
++ [{{not_found
, missing
}, MissingRev
} || MissingRev
<- MissingRevs
],
606 not_found
when Revs
== all
->
609 {ok
, [{{not_found
, missing
}, Rev
} || Rev
<- Revs
]}
612 open_doc_int(Db
, ?NON_REP_DOC_PREFIX
++ Uuid
, _Options
) ->
613 case couch_btree:lookup_single(Db#db
.local_docs_btree
, Uuid
) of
615 {ok
, #doc
{uuid
=?NON_REP_DOC_PREFIX
++ Uuid
, body
=BodyData
}};
619 open_doc_int(Db
, #doc_info
{revision
=Rev
,summary_pointer
=SummaryPointer
}=DocInfo
, Options
) ->
620 open_doc_int(Db
, {DocInfo
, [{Rev
, SummaryPointer
, []}]}, Options
);
621 open_doc_int(Db
, {#doc_info
{uuid
=Uuid
,revision
=Rev
,summary_pointer
=SummaryPointer
,deleted
=Deleted
}, RevisionTree
}, Options
) ->
622 case (not Deleted
) orelse
lists:member(allow_stub
, Options
) of
624 {[{_
,_
, RevPath
}], []} = get_revs(RevisionTree
, [Rev
], []),
625 {ok
, make_doc(Db
, Uuid
, SummaryPointer
, RevPath
)};
629 open_doc_int(Db
, Uuid
, Options
) ->
630 case get_full_doc_info(Db
, Uuid
) of
631 {ok
, {DocInfo
, RevisionTree
}} ->
632 open_doc_int(Db
, {DocInfo
, RevisionTree
}, Options
);
637 % revision tree functions
639 merge_rev_trees([], B
) ->
641 merge_rev_trees(A
, []) ->
643 merge_rev_trees([ATree
| ANextTree
], [BTree
| BNextTree
]) ->
644 {ARev
, ADoc
, ASubTrees
} = ATree
,
645 {BRev
, _BDoc
, BSubTrees
} = BTree
,
649 {MergedSubTrees
, SubTreesConflicts
} = merge_rev_trees(ASubTrees
, BSubTrees
),
650 {MergedNextTrees
, NextConflicts
} = merge_rev_trees(ANextTree
, BNextTree
),
651 {[{ARev
, ADoc
, MergedSubTrees
} | MergedNextTrees
], SubTreesConflicts
+ NextConflicts
};
653 {Merged
, Conflicts
} = merge_rev_trees(ANextTree
, [BTree
| BNextTree
]),
654 {[ATree
| Merged
], Conflicts
+ 1};
656 {Merged
, Conflicts
} = merge_rev_trees([ATree
| ANextTree
], BNextTree
),
657 {[BTree
| Merged
], Conflicts
+ 1}
660 find_missing_revisions([], _Trees
) ->
662 find_missing_revisions(SrcRevs
, []) ->
664 find_missing_revisions(SrcRevs
, [{RevId
, _
, SubTrees
} | RestTrees
]) ->
665 SrcRevs2
= lists:delete(RevId
, SrcRevs
),
666 SrcRevs3
= find_missing_revisions(SrcRevs2
, SubTrees
),
667 find_missing_revisions(SrcRevs3
, RestTrees
).
669 % get the latest leaf revisions for the found revision.
670 % Often these are the same revision.
671 get_rev_leafs(_Trees
, [], _RevPathAcc
) ->
673 get_rev_leafs([], RevsToGet
, _RevPathAcc
) ->
675 get_rev_leafs([{RevId
, _SummaryPtr
, SubTrees
}=Tree
| RestTrees
], RevsToGet
, RevPathAcc
) ->
676 case lists:member(RevId
, RevsToGet
) of
678 LeafsFound
= get_all_leafs([Tree
], RevPathAcc
),
679 LeafRevsFound
= [LeafRevFound
|| {LeafRevFound
, _
, _
} <- LeafsFound
],
680 RevsToGet2
= RevsToGet
-- LeafRevsFound
,
681 {RestLeafsFound
, RevsRemaining
} = get_rev_leafs(RestTrees
, RevsToGet2
, RevPathAcc
),
682 {LeafsFound
++ RestLeafsFound
, RevsRemaining
};
684 {LeafsFound
, RevsToGet2
} = get_rev_leafs(SubTrees
, RevsToGet
, [RevId
| RevPathAcc
]),
685 {RestLeafsFound
, RevsRemaining
} = get_rev_leafs(RestTrees
, RevsToGet2
, RevPathAcc
),
686 {LeafsFound
++ RestLeafsFound
, RevsRemaining
}
689 get_revs([], RevsToGet
, _RevPathAcc
) ->
691 get_revs([{RevId
, SummaryPtr
, SubTrees
} | RestTrees
], RevsToGet
, RevPathAcc
) ->
692 RevsToGet2
= RevsToGet
-- [RevId
],
694 case RevsToGet2
== RevsToGet
of
696 % not in the rev list.
699 % this node is the rev list. return it
700 [{RevId
, SummaryPtr
, [RevId
| RevPathAcc
]}]
702 {RevsGotten
, RevsRemaining
} = get_revs(SubTrees
, RevsToGet2
, [RevId
| RevPathAcc
]),
703 {RevsGotten2
, RevsRemaining2
} = get_revs(RestTrees
, RevsRemaining
, RevPathAcc
),
704 {CurrentNodeResult
++ RevsGotten
++ RevsGotten2
, RevsRemaining2
}.
707 get_all_leafs([], _RevPathAcc
) ->
709 get_all_leafs([{RevId
, SummaryPtr
, []} | RestTrees
], RevPathAcc
) ->
710 [{RevId
, SummaryPtr
, [RevId
| RevPathAcc
]} | get_all_leafs(RestTrees
, RevPathAcc
)];
711 get_all_leafs([{RevId
, _SummaryPtr
, SubTrees
} | RestTrees
], RevPathAcc
) ->
712 get_all_leafs(SubTrees
, [RevId
| RevPathAcc
]) ++ get_all_leafs(RestTrees
, RevPathAcc
).
%% Convert a revision id list (newest first) into the nested tree
%% representation; reverse first so the oldest revision becomes the root.
revision_list_to_trees(Doc, RevIds) ->
    revision_list_to_trees2(Doc, lists:reverse(RevIds)).
717 revision_list_to_trees2(Doc
, [RevId
]) ->
719 revision_list_to_trees2(Doc
, [RevId
| Rest
]) ->
720 [{RevId
, type_to_summary_ptr(missing
), revision_list_to_trees2(Doc
, Rest
)}] .
722 winning_revision(Trees
) ->
723 LeafRevs
= get_all_leafs(Trees
, []),
725 lists:sort(fun({RevIdA
, SummaryPointerA
, PathA
}, {RevIdB
, SummaryPointerB
, PathB
}) ->
726 % sort descending by {not deleted, then Depth, then RevisionId}
727 ANotDeleted
= summary_ptr_type(SummaryPointerA
) /= deletion
,
728 BNotDeleted
= summary_ptr_type(SummaryPointerB
) /= deletion
,
729 A
= {ANotDeleted
, length(PathA
), RevIdA
},
730 B
= {BNotDeleted
, length(PathB
), RevIdB
},
735 [{RevId
, SummaryPointer
, _
} | Rest
] = SortedLeafRevs
,
737 {ConflictRevTuples
, DeletedConflictRevTuples
} =
738 lists:splitwith(fun({_ConflictRevId
, SummaryPointer1
, _
}) ->
739 summary_ptr_type(SummaryPointer1
) /= deletion
742 ConflictRevs
= [RevId1
|| {RevId1
, _
, _
} <- ConflictRevTuples
],
743 DeletedConflictRevs
= [RevId2
|| {RevId2
, _
, _
} <- DeletedConflictRevTuples
],
745 {RevId
, SummaryPointer
, ConflictRevs
, DeletedConflictRevs
}.
747 get_conflict_revs([]) ->
749 get_conflict_revs(Trees
) ->
750 {_
, _
, ConflictRevs
, DeletedConflictRevs
} = winning_revision(Trees
),
751 {ConflictRevs
, DeletedConflictRevs
}.
753 % Flushes to disk any outstanding revisions (document records where summary pointers should be)
754 % and replaces the documents with their SummaryPointers in the returned trees.
756 flush_revision_trees(_Db
, []) ->
758 flush_revision_trees(Db
, [{RevId
, #doc
{deleted
=true
}, SubTrees
} | RestTrees
]) ->
759 [{RevId
, type_to_summary_ptr(deletion
), flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
760 flush_revision_trees(Db
, [{RevId
, #doc
{}=Doc
, SubTrees
} | RestTrees
]) ->
761 % all bins must be flushed stream pointers with the same Fd as this db
762 Bins
= [{BinName
, {BinType
, BinSp
, BinLen
}} || {BinName
, {BinType
, {_Fd
, BinSp
, BinLen
}}} <- Doc#doc
.attachments
],
763 {ok
, SummaryPointer
} = couch_stream:write_term(Db#db
.summary_stream
, {Doc#doc
.body
, Bins
}),
764 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
765 flush_revision_trees(Db
, [{RevId
, SummaryPointer
, SubTrees
} | RestTrees
]) ->
766 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)].
770 make_doc(Db
, Uuid
, SummaryPointer
, RevisionPath
) ->
771 {BodyData
, BinValues
} =
772 case summary_ptr_type(SummaryPointer
) == disk
of
774 {ok
, {BodyData0
, BinValues0
}} = couch_stream:read_term(Db#db
.summary_stream
, SummaryPointer
),
775 {BodyData0
, [{Name
, {Type
, {Db#db
.fd
, Sp
, Len
}}} || {Name
, {Type
, Sp
, Len
}} <- BinValues0
]};
781 revisions
= RevisionPath
,
783 attachments
= BinValues
,
784 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
)
%% Encode the two non-disk summary pointer sentinel values: 0 marks a
%% revision we know about but have no body for, 1 marks a deletion.
%% Any other value is a real on-disk pointer (see summary_ptr_type/1).
type_to_summary_ptr(missing) -> 0;
type_to_summary_ptr(deletion) -> 1.
%% Decode a summary pointer: the sentinels 0 and 1 mean missing and
%% deletion respectively; every other value is an actual disk pointer.
%% Inverse of type_to_summary_ptr/1 for the sentinel values.
summary_ptr_type(0) -> missing;
summary_ptr_type(1) -> deletion;
summary_ptr_type(_Pointer) -> disk.
794 write_summaries(Db
, [], InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
795 {ok
, lists:reverse(DocResultOut
), lists:reverse(InfoByUuidOut
), lists:reverse(RemoveSeqOut
),
796 lists:reverse(InfoBySeqOut
), Db
};
798 [{Uuid
, {Docs
, Options
}, {DiskUpdateSeq
, _DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}} | Rest
],
799 InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
800 NewEdits
= lists:member(new_edits
, Options
),
801 {InputRevTrees
, OutputRevs
} =
802 lists:foldl(fun(#doc
{revisions
=Revisions
}=Doc
, {AccTrees
, AccRevs
}) ->
803 Revisions2
= case NewEdits
of
804 true
-> [couch_util:rand32() | Revisions
]; % add new revision
807 DocRevTree
= revision_list_to_trees(Doc
, Revisions2
),
808 {NewRevTrees
, _ConflictCount
} = merge_rev_trees(AccTrees
, DocRevTree
),
809 {NewRevTrees
, [lists:nth(1, Revisions2
) | AccRevs
]}
810 end, {[], []}, Docs
),
811 {NewRevTrees
, ConflictCount
} = merge_rev_trees(DiskRevTrees
, InputRevTrees
),
814 andalso ConflictCount
> 0
815 andalso (summary_ptr_type(DiskSummaryPointer
) /= deletion
816 orelse FirstDoc#doc
.deleted
== true
) of
818 DocResultOut2
= [[conflict
|| _Doc
<- Docs
] | DocResultOut
],
819 write_summaries(Db
, Rest
, InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut2
);
821 FlushedTrees
= flush_revision_trees(Db
, NewRevTrees
),
822 {WinningRevision
, WinningSummaryPointer
, ConflictRevs
, DeletedConflictRevs
} = winning_revision(FlushedTrees
),
824 OldDiskDocuments
= case summary_ptr_type(DiskSummaryPointer
) == disk
of true
-> 1; false
-> 0 end,
825 NewDiskDocuments
= case summary_ptr_type(WinningSummaryPointer
) == disk
of true
-> 1; false
-> 0 end,
827 NewDocCount
= Db#db
.doc_count
+ NewDiskDocuments
- OldDiskDocuments
,
829 UpdateSeq
= Db#db
.last_update_seq
+ 1,
832 case DiskUpdateSeq
of
834 _
-> [DiskUpdateSeq
| RemoveSeqOut
]
837 InfoBySeqOut2
= [{UpdateSeq
, {Uuid
, WinningRevision
, WinningSummaryPointer
, ConflictRevs
, DeletedConflictRevs
}} | InfoBySeqOut
],
838 InfoByUuidOut2
= [{Uuid
, {UpdateSeq
, WinningRevision
, WinningSummaryPointer
, FlushedTrees
}} | InfoByUuidOut
],
839 % output an ok and the revid for each successful save
840 DocResultOut2
= [[{ok
, OutputRev
} || OutputRev
<- OutputRevs
] | DocResultOut
],
841 Db2
= Db#db
{last_update_seq
= UpdateSeq
, uncommitted_writes
=true
, doc_count
=NewDocCount
},
842 write_summaries(Db2
, Rest
, InfoBySeqOut2
, RemoveSeqOut2
, InfoByUuidOut2
, DocResultOut2
)
845 update_docs_int(Db
, DocsOptionsList
, Options
) ->
847 docinfo_by_uuid_btree
= DocInfoByUuidBTree
,
848 docinfo_by_seq_btree
= DocInfoBySeqBTree
,
849 view_group_btree
= ViewGroupBTree
,
850 local_docs_btree
= LocalDocsBtree
,
851 view_group_mgr
= ViewGroupMgr
854 % seperate out the NonRep documents from the rest of the documents
855 {Uuids
, DocsOptionsList2
, NonRepDocs
} =
856 lists:foldl(fun({[#doc
{uuid
=Uuid
}=Doc
| Rest
], _Options
}=DocOptions
, {UuidsAcc
, DocsOptionsAcc
, NonRepDocsAcc
}) ->
858 ?NON_REP_DOC_PREFIX
++ _
when Rest
==[] ->
859 % when saving NR (non rep) documents, you can only save a single revision
860 {UuidsAcc
, DocsOptionsAcc
, [Doc
| NonRepDocsAcc
]};
862 {[Uuid
| UuidsAcc
], [DocOptions
| DocsOptionsAcc
], NonRepDocsAcc
}
864 end, {[], [], []}, DocsOptionsList
),
866 {ok
, OldDocInfoResults
} = couch_btree:lookup(DocInfoByUuidBTree
, Uuids
),
868 % create a list of {{Docs, UpdateOptions}, RevisionTree} tuples.
870 lists:zipwith3(fun(DocUuid
, DocsOptions
, OldDocInfoLookupResult
) ->
871 case OldDocInfoLookupResult
of
872 {ok
, {_Uuid
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}}} ->
873 {DocUuid
, DocsOptions
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}};
875 {DocUuid
, DocsOptions
, {0, 0, 0, []}}
878 Uuids
, DocsOptionsList2
, OldDocInfoResults
),
880 % now write out the documents
881 {ok
, DocResults
, InfoByUuid
, RemoveSeqList
, InfoBySeqList
, Db2
} =
882 write_summaries(Db
, DocsAndOldDocInfo
, [], [], [], []),
884 % and the indexes to the documents
885 {ok
, DocInfoBySeqBTree2
} = couch_btree:add_remove(DocInfoBySeqBTree
, InfoBySeqList
, RemoveSeqList
),
886 {ok
, DocInfoByUuidBTree2
} = couch_btree:add_remove(DocInfoByUuidBTree
, InfoByUuid
, []),
888 % clear the computed view cache
889 UpdatedUuids
= [UpdatedUuid
|| {UpdatedUuid
, _DocInfo
} <- InfoByUuid
],
890 {ok
, ViewGroupBTree2
} = couch_btree:add_remove(ViewGroupBTree
, [], UpdatedUuids
),
892 % now notify the view group manager to discard any of the view groups it has in memory
894 OldDocInfos
= lists:map(
895 fun({OldUuid
, _Docs
, {OldUpdateSeq
, OldRev
, OldSummaryPointer
, OldRevTrees
}}) ->
896 {ConflictRevs
, DeletedConflictRevs
} = get_conflict_revs(OldRevTrees
),
897 #doc_info
{uuid
=OldUuid
, update_seq
=OldUpdateSeq
, revision
=OldRev
,
898 summary_pointer
=OldSummaryPointer
, conflict_revs
=ConflictRevs
, deleted_conflict_revs
=DeletedConflictRevs
}
902 ok
= couch_view_group:free_groups(ViewGroupMgr
, OldDocInfos
),
905 [{NRUuid
, NRBody
} || #doc
{uuid
=?NON_REP_DOC_PREFIX
++ NRUuid
, body
=NRBody
, deleted
=false
} <- NonRepDocs
],
908 [NRUuid
|| #doc
{uuid
=?NON_REP_DOC_PREFIX
++ NRUuid
, deleted
=true
} <- NonRepDocs
],
910 {ok
, LocalDocsBtree2
} = couch_btree:add_remove(LocalDocsBtree
, NRUuidsSummaries
, NRUuidsDelete
),
912 NRDocResults
= [[{ok
, 0}] || _Doc
<- NonRepDocs
],
915 docinfo_by_uuid_btree
= DocInfoByUuidBTree2
,
916 docinfo_by_seq_btree
= DocInfoBySeqBTree2
,
917 view_group_btree
= ViewGroupBTree2
,
918 local_docs_btree
= LocalDocsBtree2
,
919 uncommitted_writes
= true
921 case lists:member(delay_commit
, Options
) of
925 {ok
, Db4
} = commit_outstanding(Db3
)
927 {ok
, DocResults
++ NRDocResults
, Db4
}.
932 commit_outstanding(#db
{fd
=Fd
, uncommitted_writes
=true
, header
=Header
} = Db
) ->
933 ok
= couch_file:sync(Fd
), % commit outstanding data
934 Header2
= Header#db_header
{
935 last_update_seq
= Db#db
.last_update_seq
,
936 summary_stream_state
= couch_stream:get_state(Db#db
.summary_stream
),
937 docinfo_by_seq_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_seq_btree
),
938 docinfo_by_uuid_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_uuid_btree
),
939 view_group_btree_state
= couch_btree:get_state(Db#db
.view_group_btree
),
940 local_docs_btree_state
= couch_btree:get_state(Db#db
.local_docs_btree
),
941 doc_count
= Db#db
.doc_count
943 {ok
, Header3
} = write_header(Fd
, Header2
),
944 ok
= couch_file:sync(Fd
), % commit header to disk
946 uncommitted_writes
= false
,
950 commit_outstanding(Db
) ->
953 write_header(Fd
, Header
) ->
954 H2
= Header#db_header
{write_version
= Header#db_header
.write_version
+ 1},
955 % The leading bytes in every db file, the sig and the file version:
956 HeaderPrefix
= ?DB_FILE_SIG
,
957 FileVersion
= <<(?FILE_VERSION
):16>>,
958 %the actual header data
959 TermBin
= term_to_binary(H2
),
960 % the size of all the bytes written to the header, including the md5 signature (16 bytes)
961 FilledSize
= size(HeaderPrefix
) + size(FileVersion
) + size(TermBin
) + 16,
962 case FilledSize
> ?HEADER_SIZE
of
965 {error
, error_header_too_large
};
967 % pad out the header with zeros, then take the md5 hash
968 PadZeros
= <<0:(8*(?HEADER_SIZE
- FilledSize
))>>,
969 Sig
= erlang:md5([TermBin
, PadZeros
]),
970 % now we assemble the final header binary and write to disk
971 WriteBin
= <<HeaderPrefix
/binary, FileVersion
/binary, TermBin
/binary, PadZeros
/binary, Sig
/binary>>,
972 ?HEADER_SIZE
= size(WriteBin
), % sanity check
973 DblWriteBin
= [WriteBin
, WriteBin
],
974 ok
= couch_file:pwrite(Fd
, 0, DblWriteBin
),
979 read_header(FilePath
, Fd
) ->
980 {ok
, Bin
} = couch_file:pread(Fd
, 0, 2*(?HEADER_SIZE
)),
981 <<Bin1:(?HEADER_SIZE
)/binary, Bin2:(?HEADER_SIZE
)/binary>> = Bin
,
982 % read the first header
983 case extract_header(Bin1
) of
985 case extract_header(Bin2
) of
987 case Header1
== Header2
of
989 % Everything is completely normal!
992 % To get here we must have two different header versions with signatures intact.
993 % It's weird but possible (a commit failure right at the 2k boundary). Log it.
994 couch_log:info("Header version differences on database open (~s).~nPrimary Header: ~p~nSecondary Header: ~p", [FilePath
, Header1
, Header2
]),
995 case Header1#db_header
.write_version
> Header2#db_header
.write_version
of
996 true
-> {ok
, Header1
};
997 false
-> {ok
, Header2
}
1001 % error reading second header. It's ok, but log it.
1002 couch_log:info("Secondary header corruption on database open (~s)(error: ~p). Using primary header instead.", [FilePath
, Error
]),
1006 % error reading primary header
1007 case extract_header(Bin2
) of
1009 % log corrupt primary header. It's ok since the secondary is still good.
1010 couch_log:info("Primary header corruption on database open (~s)(error: ~p). Using secondary header instead.", [FilePath
, Error
]),
1013 % error reading secondary header too
1014 % return the error, no need to log anything as the caller will be responsible for dealing with the error.
1020 extract_header(Bin
) ->
1021 SizeOfPrefix
= size(?DB_FILE_SIG
),
1022 SizeOfTermBin
= ?HEADER_SIZE
-
1027 <<HeaderPrefix:SizeOfPrefix
/binary, FileVersion:16, TermBin:SizeOfTermBin
/binary, Sig:16/binary>> = Bin
,
1029 % check the header prefix
1030 case HeaderPrefix
of
1032 % check the file version
1035 % check the integrity signature
1036 case erlang:md5(TermBin
) == Sig
of
1038 Header
= binary_to_term(TermBin
),
1039 #db_header
{} = Header
, % make sure we decoded to the right record type
1042 {error
, header_corrupt
}
1045 {error
, {incompatible_file_version
, FileVersion
}}
1048 {error
, unknown_file_type
}
1052 code_change(_OldVsn
, State
, _Extra
) ->
1055 handle_info(_Info
, State
) ->