2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 -behaviour(gen_server
).
21 -export([test
/0, test
/1]).
22 -export([open
/1,create
/1,create
/2,save_doc
/2,save_doc
/3,get_doc_info
/2, save_doc_revs
/3]).
23 -export([delete_doc
/3,open_doc
/2,open_doc
/3,close
/1,enum_docs_since
/4,enum_docs_since
/5]).
24 -export([enum_docs
/4,enum_docs
/5, open_doc_revs
/4, get_missing_revs
/2]).
25 -export([update_table_group_sync
/2,update_table_group
/2,fold_table
/6,fold_table
/7,get_info
/1]).
26 -export([update_loop
/2]).
27 -export([init
/1,terminate
/2,handle_call
/3,handle_cast
/2,code_change
/3,handle_info
/2]).
28 -export([revision_list_to_trees
/2 , merge_rev_trees
/2 ]).
30 -include("couch_db.hrl").
32 -define(HEADER_SIZE
, 2048). % size of each segment of the doubly written header
38 summary_stream_state
= nil
,
39 docinfo_by_uuid_btree_state
= nil
,
40 docinfo_by_seq_btree_state
= nil
,
41 table_group_btree_state
= nil
,
42 local_docs_btree_state
= nil
,
49 header
= #db_header
{},
50 uncommitted_writes
= false
,
52 docinfo_by_uuid_btree
,
68 start_link(Filepath
, Options
) ->
69 {ok
, Super
} = couch_db_sup:start_link(),
70 FdResult
= supervisor:start_child(Super
,
72 {couch_file
, open
, [Filepath
, Options
]},
79 {ok
, _Db
} = supervisor:start_child(Super
,
81 %%%%%% below calls gen_server:start_link(couch_db, {Filepath, Fd, Super, Options}, []).
82 {gen_server
, start_link
, [couch_db
, {Filepath
, Fd
, Super
, Options
}, []]},
88 {error
, {enoent
, _ChildInfo
}} ->
92 {error
, {Error
, _ChildInfo
}} ->
100 %%% Interface functions %%%
103 create(Filename
, []).
%% Starts a database server for Filename with the `create` flag prepended
%% to the caller's options. init/1 looks for that flag in the option list.
%% The guard restricts this clause to list-valued Options.
create(Filename, Options) when is_list(Options) ->
    CreateOptions = [create | Options],
    start_link(Filename, CreateOptions).
109 start_link(Filename
, []).
111 delete_doc(SupPid
, Uuid
, Revisions
) ->
112 case open_doc_revs(SupPid
, Uuid
, Revisions
, []) of
114 DeletedDocs
= [Doc#doc
{deleted
=true
} || {ok
, #doc
{deleted
=false
}=Doc
} <- DocResults
],
115 save_doc_revs(SupPid
, DeletedDocs
, [new_edits
]);
%% Opens a document given either its uuid or a #doc_info{} record.
%%
%% A #doc_info{} flagged as deleted is answered immediately with
%% {not_found, deleted}; the database is not consulted. Anything else is
%% forwarded to open_doc/3 with an empty option list.
open_doc(_SupPid, #doc_info{deleted = true}) ->
    {not_found, deleted};
open_doc(SupPid, UuidOrDocInfo) ->
    NoOptions = [],
    open_doc(SupPid, UuidOrDocInfo, NoOptions).
%% Opens a document with explicit options.
%%
%% Resolves the database server pid from SupPid, fetches its current #db{}
%% record, and performs the lookup against that record via open_doc_int/3.
open_doc(SupPid, Uuid, Options) ->
    DbPid = db_pid(SupPid),
    Db = get_db(DbPid),
    open_doc_int(Db, Uuid, Options).
%% Opens specific revisions of the document identified by Uuid.
%%
%% Resolves the database server pid from SupPid, fetches its current #db{}
%% record, and delegates to open_doc_revs_int/4. Revs and Options are
%% passed through unchanged.
open_doc_revs(SupPid, Uuid, Revs, Options) ->
    DbPid = db_pid(SupPid),
    Db = get_db(DbPid),
    open_doc_revs_int(Db, Uuid, Revs, Options).
135 get_missing_revs(SupPid
, UuidRevsList
) ->
136 Uuids
= [Uuid1
|| {Uuid1
, _Revs
} <- UuidRevsList
],
137 {ok
, FullDocInfoResults
} = get_full_doc_infos(SupPid
, Uuids
),
139 lists:zipwith( fun({Uuid
, Revs
}, FullDocInfoResult
) ->
140 case FullDocInfoResult
of
141 {ok
, {_
, RevisionTrees
}} ->
142 {Uuid
, find_missing_revisions(Revs
, RevisionTrees
)};
146 end, UuidRevsList
, FullDocInfoResults
),
149 get_doc_info(Db
, Uuid
) ->
150 case get_full_doc_info(Db
, Uuid
) of
151 {ok
, {DocInfo
, _RevisionTrees
}} ->
157 get_full_doc_info(Db
, Uuid
) ->
158 case get_full_doc_infos(Db
, [Uuid
]) of
159 {ok
, [{ok
, DocInfo
}]} ->
161 {ok
, [{not_found
, Uuid
}]} ->
165 get_full_doc_infos(SupPid
, Uuids
) when is_pid(SupPid
) ->
166 get_full_doc_infos(get_db(db_pid(SupPid
)), Uuids
);
167 get_full_doc_infos(#db
{}=Db
, Uuids
) ->
168 {ok
, LookupResults
} = couch_btree:lookup(Db#db
.docinfo_by_uuid_btree
, Uuids
),
171 fun({ok
, {Uuid
, {UpdateSeq
, Rev
, SummaryPointer
, RevisionTrees
}}}) ->
172 {Conflicts
, DeletedConflicts
} = get_conflict_revs(RevisionTrees
),
179 update_seq
=UpdateSeq
,
180 summary_pointer
=SummaryPointer
,
181 deleted
=(summary_ptr_type(SummaryPointer
)==deletion
),
182 conflict_revs
=Conflicts
,
183 deleted_conflict_revs
=DeletedConflicts
188 ({not_found
, {Uuid
, _
}}) ->
195 gen_server:call(db_pid(SupPid
), get_info
).
%% Saves a single document using no extra options.
%% Convenience wrapper around save_doc/3.
save_doc(SupPid, Doc) ->
    NoOptions = [],
    save_doc(SupPid, Doc, NoOptions).
200 save_doc(SupPid
, Doc
, Options
) ->
201 {ok
, [Result
]} = save_doc_revs(SupPid
, [Doc
], [new_edits
| Options
]),
209 save_doc_revs(SupPid
, Docs
, Options
) ->
210 {ok
, [RetValue
]} = gen_server:call(db_pid(SupPid
), {update_docs
, [{Docs
, Options
}], []}),
213 enum_docs_since(SupPid
, SinceSeq
, Direction
, InFun
, Ctx
) ->
214 Db
= get_db(db_pid(SupPid
)),
215 EnumFun
= fun({UpdateSeq
, {Uuid
, Rev
, SummaryPointer
, ConflictRevs
, DeletedConflictRevs
}}, EnumCtx
) ->
219 update_seq
= UpdateSeq
,
220 summary_pointer
= SummaryPointer
,
221 conflict_revs
= ConflictRevs
,
222 deleted_conflict_revs
= DeletedConflictRevs
,
223 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
)
225 InFun(DocInfo
, EnumCtx
)
228 couch_btree:fold(Db#db
.docinfo_by_seq_btree
, SinceSeq
+ 1, Direction
, EnumFun
, Ctx
).
%% Enumerates documents updated after SinceSeq in forward order.
%% Convenience wrapper around enum_docs_since/5 with the direction fixed
%% to `fwd`. InFun and Ctx are passed through unchanged.
enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
    Direction = fwd,
    enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx).
233 enum_docs(SupPid
, StartUuid
, Direction
, InFun
, Ctx
) ->
234 Db
= get_db(db_pid(SupPid
)),
235 EnumFun
= fun({Uuid
, {UpdateSeq
, Rev
, SummaryPointer
, RevTrees
}}, EnumCtx
) ->
236 {ConflictRevs
, DeletedConflictRevs
} = get_conflict_revs(RevTrees
),
240 update_seq
= UpdateSeq
,
241 summary_pointer
= SummaryPointer
,
242 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
),
243 conflict_revs
= ConflictRevs
,
244 deleted_conflict_revs
= DeletedConflictRevs
246 InFun(DocInfo
, EnumCtx
)
248 couch_btree:fold(Db#db
.docinfo_by_uuid_btree
, StartUuid
, Direction
, EnumFun
, Ctx
).
%% Enumerates documents by uuid starting at StartUuid in forward order.
%% Convenience wrapper around enum_docs/5 with the direction fixed to
%% `fwd`. InFun and Ctx are passed through unchanged.
enum_docs(SupPid, StartUuid, InFun, Ctx) ->
    Direction = fwd,
    enum_docs(SupPid, StartUuid, Direction, InFun, Ctx).
%% Asks the database server to update the table group described by the
%% document TableGroupDocUuid.
%%
%% The notification fun handed to the server ignores its argument and
%% returns `ok`, so this caller receives no progress notifications. The
%% request goes through gen_server:call/2, i.e. with the default call
%% timeout.
update_table_group(SupPid, TableGroupDocUuid) ->
    DbPid = db_pid(SupPid),
    IgnoreStatus = fun(_Whatever) -> ok end,
    Request = {update_table_group, TableGroupDocUuid, IgnoreStatus},
    gen_server:call(DbPid, Request).
258 sync_update_notify(Pid
, Ref
, partial
) ->
259 % We want to wait until complete
260 % so return a fun that calls ourself
261 fun(Status
)-> sync_update_notify(Pid
, Ref
, Status
) end;
262 sync_update_notify(Pid
, Ref
, complete
) ->
264 sync_update_notify(Pid
, Ref
, Else
) ->
268 update_table_group_sync(SupPid
, TableGroupDocUuid
) ->
271 UpdateFun
= fun(Status
)-> sync_update_notify(Pid
, Ref
, Status
) end,
272 case gen_server:call(db_pid(SupPid
), {update_table_group
, TableGroupDocUuid
, UpdateFun
}, infinity
) of
282 fold_table(SupPid
, TableGroupDocUuid
, TableName
, Dir
, Fun
, Acc
) ->
283 case gen_server:call(db_pid(SupPid
), {get_table_group
, TableGroupDocUuid
}) of
285 couch_table_group:fold(TableGroup
, TableName
, Dir
, Fun
, Acc
);
290 fold_table(SupPid
, TableGroupDocUuid
, TableName
, StartKey
, Dir
, Fun
, Acc
) ->
291 case gen_server:call(db_pid(SupPid
), {get_table_group
, TableGroupDocUuid
}) of
293 couch_table_group:fold(TableGroup
, TableName
, StartKey
, Dir
, Fun
, Acc
);
299 Ref
= erlang:monitor(process, SupPid
),
301 exit(SupPid
, normal
),
303 {'DOWN', Ref
, process, SupPid
, _Reason
} ->
311 init({Filepath
, Fd
, Supervisor
, Options
}) ->
312 case lists:member(create
, Options
) of
314 init_main(Filepath
, Fd
, Supervisor
, nil
);
316 {ok
, Header
} = read_header(Fd
),
317 init_main(Filepath
, Fd
, Supervisor
, Header
)
321 init_main(Filepath
, Fd
, Supervisor
, nil
) ->
322 % creates a new header and writes it to the file
323 {ok
, _
} = couch_file:expand(Fd
, 2*(?HEADER_SIZE
)),
324 Header
= #db_header
{},
325 {ok
, Header2
} = write_header(Fd
, Header
),
326 ok
= couch_file:sync(Fd
),
327 init_main(Filepath
, Fd
, Supervisor
, Header2
);
328 init_main(Filepath
, Fd
, Supervisor
, Header
) ->
329 {ok
, SummaryStream
} = couch_stream:open(Header#db_header
.summary_stream_state
, Fd
),
330 {ok
, UuidBtree
} = couch_btree:open(Header#db_header
.docinfo_by_uuid_btree_state
, Fd
),
331 {ok
, SeqBtree
} = couch_btree:open(Header#db_header
.docinfo_by_seq_btree_state
, Fd
),
332 {ok
, LocalDocsBtree
} = couch_btree:open(Header#db_header
.local_docs_btree_state
, Fd
),
333 {ok
, TableGroupBtree
} = couch_btree:open(Header#db_header
.table_group_btree_state
, Fd
),
337 supervisor
=Supervisor
,
339 summary_stream
= SummaryStream
,
340 docinfo_by_uuid_btree
= UuidBtree
,
341 docinfo_by_seq_btree
= SeqBtree
,
342 local_docs_btree
= LocalDocsBtree
,
343 last_update_seq
= Header#db_header
.last_update_seq
,
344 table_group_btree
= TableGroupBtree
,
345 doc_count
= Header#db_header
.doc_count
,
349 UpdatePid
= spawn_link(couch_db
, update_loop
, [self(), Db
]),
353 GetTableGroupInfoFun
= fun(GroupKey
) ->
354 get_table_group_info(get_db(Pid
), GroupKey
)
357 UpdateTableGroupInfoFun
= fun(GroupKey
, UpdateStatus
, GroupInfo
) ->
358 UpdatePid
! {table_group_updated
, GroupKey
, UpdateStatus
, GroupInfo
},
361 {ok
, TableMgr
} = couch_table_group:start_manager(Supervisor
, Fd
, GetTableGroupInfoFun
, UpdateTableGroupInfoFun
),
363 UpdatePid
! {set_table_group_mgr
, TableMgr
},
365 {ok
, #main
{db
=Db
, update_pid
=UpdatePid
, table_group_mgr
=TableMgr
}}.
%% gen_server terminate callback.
%%
%% Shuts things down in this order: sends `close` to the update process,
%% stops the table group manager, then closes the database file. The
%% return value is whatever couch_file:close/1 returns.
terminate(_Reason, #main{db = Db} = Main) ->
    #main{update_pid = UpdatePid, table_group_mgr = TableMgr} = Main,
    UpdatePid ! close,
    couch_table_group:stop(TableMgr),
    couch_file:close(Db#db.fd).
372 handle_call({get_table_group
, TableGroupDocUuid
}, From
, #main
{db
=Db
}=Main
) ->
373 case get_doc_info(Db
, TableGroupDocUuid
) of
374 {ok
, #doc_info
{deleted
=true
}} ->
375 {reply
, {not_found
, deleted
}, Main
};
377 ok
= couch_table_group:get_group_async(Main#main
.table_group_mgr
, DocInfo
, From
),
380 {reply
, {not_found
, missing
}, Main
}
382 handle_call({update_docs
, DocActions
, Options
}, From
, Main
) ->
383 Main#main
.update_pid
! {From
, update_docs
, DocActions
, Options
},
385 handle_call(get_db
, _From
, #main
{db
=Db
}=Main
) ->
386 {reply
, {ok
, Db
}, Main
};
387 handle_call(get_info
, _From
, #main
{db
=Db
}=Main
) ->
389 {doc_count
, Db#db
.doc_count
},
390 {last_update_seq
, Db#db
.last_update_seq
}
392 {reply
, {ok
, InfoList
}, Main
};
393 handle_call({update_table_group
, Uuid
, UpdateNotifFun
}, _From
, #main
{db
=Db
}=Main
) ->
394 case get_doc_info(Db
, Uuid
) of
396 ok
= couch_table_group:update_group(Main#main
.table_group_mgr
, DocInfo
, UpdateNotifFun
),
%% gen_server handle_cast callback.
%%
%% {db_updated, NewDb} is cast by the update loop after it has produced a
%% new #db{} record; the server state is updated to hold that record.
handle_cast({db_updated, NewDb}, Main) ->
    UpdatedMain = Main#main{db = NewDb},
    {noreply, UpdatedMain}.
406 %%% Internal function %%%
410 {error
, {already_started
, DbPid
}} = supervisor:start_child(SupPid
,
412 {couch_db
, sup_start_link
, []},
420 update_loop(MainPid
, Db
) ->
422 {set_table_group_mgr
, TableMgr
} ->
423 update_loop(MainPid
, Db#db
{table_group_mgr
=TableMgr
});
424 {OrigFrom
, update_docs
, DocActions
, Options
} ->
425 {ok
, DocResults
, Db2
} = update_docs_int(Db
, DocActions
, Options
),
426 ok
= gen_server:cast(MainPid
, {db_updated
, Db2
}),
427 gen_server:reply(OrigFrom
, {ok
, DocResults
}),
428 update_loop(MainPid
, Db2
);
429 {table_group_updated
, #doc_info
{uuid
=Uuid
}=GroupDocInfo
, _UpdateStatus
, TableGroupInfo
} ->
430 case get_doc_info(Db
, GroupDocInfo#doc_info
.uuid
) of
431 {ok
, GroupDocInfo
} ->
432 % revision on disk matches the revision of the table group being updated
433 % so we save the info to disk
434 {ok
, GroupBtree2
} = couch_btree:add_remove(Db#db
.table_group_btree
, [{Uuid
, TableGroupInfo
}], []),
435 Db2
= Db#db
{table_group_btree
=GroupBtree2
, uncommitted_writes
=true
},
436 {ok
, Db3
} = commit_outstanding(Db2
),
437 ok
= gen_server:cast(MainPid
, {db_updated
, Db3
}),
438 update_loop(MainPid
, Db3
);
440 % doesn't match, don't save in btree
441 update_loop(MainPid
, Db
)
448 get_table_group_info(#db
{}=Db
, #doc_info
{uuid
=Uuid
}=DocInfo
) ->
449 case couch_btree:lookup_single(Db#db
.table_group_btree
, Uuid
) of
450 {ok
, {TableQueries
, TableGroupState
}} ->
451 {ok
, {TableQueries
, TableGroupState
}};
453 {ok
, Doc
} = open_doc_int(Db
, DocInfo
, []),
454 case couch_doc:get_table_queries(Doc
) of
456 {not_found
, no_tables_found
};
458 {ok
, {TableQueries
, nil
}}
463 {ok
, Db
} = gen_server:call(DbPid
, get_db
),
466 open_doc_revs_int(Db
, Uuid
, Revs
, Options
) ->
467 case get_full_doc_info(Db
, Uuid
) of
468 {ok
, {_DocInfo
, RevisionTree
}} ->
469 {FoundRevs
, MissingRevs
} =
472 {get_all_leafs(RevisionTree
, []), []};
474 case lists:member(latest
, Options
) of
476 get_rev_leafs(RevisionTree
, Revs
, []);
478 get_revs(RevisionTree
, Revs
, [])
482 lists:map(fun({Rev
, FoundSummaryPtr
, FoundRevPath
}) ->
483 case summary_ptr_type(FoundSummaryPtr
) of
485 % we have the rev in our tree but know nothing about it
486 {{not_found
, missing
}, Rev
};
488 {ok
, make_doc(Db
, Uuid
, FoundSummaryPtr
, FoundRevPath
)};
490 {ok
, make_doc(Db
, Uuid
, FoundSummaryPtr
, FoundRevPath
)}
493 Results
= FoundResults
++ [{{not_found
, missing
}, MissingRev
} || MissingRev
<- MissingRevs
],
495 not_found
when Revs
== all
->
498 {ok
, [{{not_found
, missing
}, Rev
} || Rev
<- Revs
]}
501 open_doc_int(Db
, ?NON_REP_DOC_PREFIX
++ Uuid
, _Options
) ->
502 case couch_btree:lookup_single(Db#db
.local_docs_btree
, Uuid
) of
503 {ok
, SummaryFields
} ->
504 {ok
, #doc
{uuid
=?NON_REP_DOC_PREFIX
++ Uuid
, summary_fields
=dict:from_list(SummaryFields
)}};
508 open_doc_int(Db
, #doc_info
{revision
=Rev
,summary_pointer
=SummaryPointer
}=DocInfo
, Options
) ->
509 open_doc_int(Db
, {DocInfo
, [{Rev
, SummaryPointer
, []}]}, Options
);
510 open_doc_int(Db
, {#doc_info
{uuid
=Uuid
,revision
=Rev
,summary_pointer
=SummaryPointer
,deleted
=Deleted
}, RevisionTree
}, Options
) ->
511 case (not Deleted
) orelse
lists:member(allow_stub
, Options
) of
513 {[{_
,_
, RevPath
}], []} = get_revs(RevisionTree
, [Rev
], []),
514 {ok
, make_doc(Db
, Uuid
, SummaryPointer
, RevPath
)};
518 open_doc_int(Db
, Uuid
, Options
) ->
519 case get_full_doc_info(Db
, Uuid
) of
520 {ok
, {DocInfo
, RevisionTree
}} ->
521 open_doc_int(Db
, {DocInfo
, RevisionTree
}, Options
);
526 % revision tree functions
528 merge_rev_trees([], B
) ->
530 merge_rev_trees(A
, []) ->
532 merge_rev_trees([ATree
| ANextTree
], [BTree
| BNextTree
]) ->
533 {ARev
, ADoc
, ASubTrees
} = ATree
,
534 {BRev
, _BDoc
, BSubTrees
} = BTree
,
538 {MergedSubTrees
, SubTreesConflicts
} = merge_rev_trees(ASubTrees
, BSubTrees
),
539 {MergedNextTrees
, NextConflicts
} = merge_rev_trees(ANextTree
, BNextTree
),
540 {[{ARev
, ADoc
, MergedSubTrees
} | MergedNextTrees
], SubTreesConflicts
+ NextConflicts
};
542 {Merged
, Conflicts
} = merge_rev_trees(ANextTree
, [BTree
| BNextTree
]),
543 {[ATree
| Merged
], Conflicts
};
545 {Merged
, Conflicts
} = merge_rev_trees([ATree
| ANextTree
], BNextTree
),
546 {[BTree
| Merged
], Conflicts
+ 1}
549 find_missing_revisions([], _Trees
) ->
551 find_missing_revisions(SrcRevs
, []) ->
553 find_missing_revisions(SrcRevs
, [{RevId
, _
, SubTrees
} | RestTrees
]) ->
554 SrcRevs2
= lists:delete(RevId
, SrcRevs
),
555 SrcRevs3
= find_missing_revisions(SrcRevs2
, SubTrees
),
556 find_missing_revisions(SrcRevs3
, RestTrees
).
558 get_rev_leafs(_Trees
, [], _RevPathAcc
) ->
560 get_rev_leafs([], RevsToGet
, _RevPathAcc
) ->
562 get_rev_leafs([{RevId
, _SummaryPtr
, SubTrees
}=Tree
| RestTrees
], RevsToGet
, RevPathAcc
) ->
563 case lists:member(RevId
, RevsToGet
) of
565 LeafsFound
= get_all_leafs([Tree
], RevPathAcc
),
566 LeafRevsFound
= [LeafRevFound
|| {LeafRevFound
, _
, _
} <- LeafsFound
],
567 RevsToGet2
= RevsToGet
-- LeafRevsFound
,
568 {RestLeafsFound
, RevsRemaining
} = get_revs(RestTrees
, RevsToGet2
, RevPathAcc
),
569 {LeafsFound
++ RestLeafsFound
, RevsRemaining
};
571 {LeafsFound
, RevsToGet2
} = get_revs(SubTrees
, RevsToGet
, [RevId
| RevPathAcc
]),
572 {RestLeafsFound
, RevsRemaining
} = get_revs(RestTrees
, RevsToGet2
, RevPathAcc
),
573 {LeafsFound
++ RestLeafsFound
, RevsRemaining
}
576 get_revs([], RevsToGet
, _RevPathAcc
) ->
578 get_revs([{RevId
, SummaryPtr
, SubTrees
} | RestTrees
], RevsToGet
, RevPathAcc
) ->
579 RevsToGet2
= RevsToGet
-- [RevId
],
581 case RevsToGet2
== RevsToGet
of
583 % not in the rev list.
586 % this node is in the rev list. return it
587 [{RevId
, SummaryPtr
, [RevId
| RevPathAcc
]}]
589 {RevsGotten
, RevsRemaining
} = get_revs(SubTrees
, RevsToGet2
, [RevId
| RevPathAcc
]),
590 {RevsGotten2
, RevsRemaining2
} = get_revs(RestTrees
, RevsRemaining
, RevPathAcc
),
591 {CurrentNodeResult
++ RevsGotten
++ RevsGotten2
, RevsRemaining2
}.
594 get_all_leafs([], _RevPathAcc
) ->
596 get_all_leafs([{RevId
, SummaryPtr
, []} | RestTrees
], RevPathAcc
) ->
597 [{RevId
, SummaryPtr
, [RevId
| RevPathAcc
]} | get_all_leafs(RestTrees
, RevPathAcc
)];
598 get_all_leafs([{RevId
, _SummaryPtr
, SubTrees
} | RestTrees
], RevPathAcc
) ->
599 get_all_leafs(SubTrees
, [RevId
| RevPathAcc
]) ++ get_all_leafs(RestTrees
, RevPathAcc
).
%% Builds a revision tree for Doc from a list of revision ids.
%% The list is reversed first and then handed to
%% revision_list_to_trees2/2, which does the actual construction.
revision_list_to_trees(Doc, RevIds) ->
    ReversedRevIds = lists:reverse(RevIds),
    revision_list_to_trees2(Doc, ReversedRevIds).
604 revision_list_to_trees2(Doc
, [RevId
]) ->
606 revision_list_to_trees2(Doc
, [RevId
| Rest
]) ->
607 [{RevId
, type_to_summary_ptr(missing
), revision_list_to_trees2(Doc
, Rest
)}] .
609 winning_revision(Trees
) ->
610 LeafRevs
= get_all_leafs(Trees
, []),
612 lists:sort(fun({RevIdA
, SummaryPointerA
, PathA
}, {RevIdB
, SummaryPointerB
, PathB
}) ->
613 % sort descending by {not deleted, then Depth, then RevisionId}
614 ANotDeleted
= summary_ptr_type(SummaryPointerA
) /= deletion
,
615 BNotDeleted
= summary_ptr_type(SummaryPointerB
) /= deletion
,
616 A
= {ANotDeleted
, length(PathA
), RevIdA
},
617 B
= {BNotDeleted
, length(PathB
), RevIdB
},
622 [{RevId
, SummaryPointer
, _
} | Rest
] = SortedLeafRevs
,
624 {ConflictRevTuples
, DeletedConflictRevTuples
} =
625 lists:splitwith(fun({_ConflictRevId
, SummaryPointer1
, _
}) ->
626 summary_ptr_type(SummaryPointer1
) /= deletion
629 ConflictRevs
= [RevId1
|| {RevId1
, _
, _
} <- ConflictRevTuples
],
630 DeletedConflictRevs
= [RevId2
|| {RevId2
, _
, _
} <- DeletedConflictRevTuples
],
632 {RevId
, SummaryPointer
, ConflictRevs
, DeletedConflictRevs
}.
634 get_conflict_revs([]) ->
636 get_conflict_revs(Trees
) ->
637 {_
, _
, ConflictRevs
, DeletedConflictRevs
} = winning_revision(Trees
),
638 {ConflictRevs
, DeletedConflictRevs
}.
640 % Flushes to disk any outstanding revisions (document records where summary pointers should be)
641 % and replaces the documents with their SummaryPointers in the returned trees.
643 flush_revision_trees(_Db
, []) ->
645 flush_revision_trees(Db
, [{RevId
, #doc
{deleted
=true
}, SubTrees
} | RestTrees
]) ->
646 [{RevId
, type_to_summary_ptr(deletion
), flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
647 flush_revision_trees(Db
, [{RevId
, #doc
{summary_fields
=Fields
}, SubTrees
} | RestTrees
]) ->
648 {ok
, SummaryPointer
} = couch_stream:write_term(Db#db
.summary_stream
, dict:to_list(Fields
)),
649 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
650 flush_revision_trees(Db
, [{RevId
, SummaryPointer
, SubTrees
} | RestTrees
]) ->
651 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)].
655 make_doc(Db
, Uuid
, SummaryPointer
, RevisionPath
) ->
657 case summary_ptr_type(SummaryPointer
) == disk
of
659 {ok
, SummaryFields2
} = couch_stream:read_term(Db#db
.summary_stream
, SummaryPointer
),
666 revisions
= RevisionPath
,
667 summary_fields
= dict:from_list(SummaryFields
),
668 deleted
= (summary_ptr_type(SummaryPointer
) == deletion
)
673 type_to_summary_ptr(missing
) ->
675 type_to_summary_ptr(deletion
) ->
678 summary_ptr_type(0) ->
680 summary_ptr_type(1) ->
682 summary_ptr_type(_Pointer
) ->
685 write_summaries(Db
, [], InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
686 {ok
, lists:reverse(DocResultOut
), lists:reverse(InfoByUuidOut
), lists:reverse(RemoveSeqOut
),
687 lists:reverse(InfoBySeqOut
), Db
};
688 write_summaries(Db
, [{Uuid
, {Docs
, Options
}, {DiskUpdateSeq
, _DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}} | Rest
],
689 InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
690 NewEdits
= lists:member(new_edits
, Options
),
691 {InputRevTrees
, OutputRevs
} =
692 lists:foldl(fun(#doc
{revisions
=Revisions
}=Doc
, {AccTrees
, AccRevs
}) ->
693 Revisions2
= case NewEdits
of
694 true
-> [couch_util:rand32() | Revisions
]; % add new revision
697 DocRevTree
= revision_list_to_trees(Doc
, Revisions2
),
698 {NewRevTrees
, _ConflictCount
} = merge_rev_trees(AccTrees
, DocRevTree
),
699 {NewRevTrees
, [lists:nth(1, Revisions2
) | AccRevs
]}
700 end, {[], []}, Docs
),
702 {NewRevTrees
, ConflictCount
} = merge_rev_trees(DiskRevTrees
, InputRevTrees
),
704 case NewEdits andalso ConflictCount
> 0 of
707 DocResultOut2
= [{conflict
, Rev
} || #doc
{revisions
=[Rev
|_
]} <- Docs
],
708 write_summaries(Db
, Rest
, InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut2
);
710 FlushedTrees
= flush_revision_trees(Db
, NewRevTrees
),
711 {WinningRevision
, WinningSummaryPointer
, ConflictRevs
, DeletedConflictRevs
} = winning_revision(FlushedTrees
),
713 OldDiskDocuments
= case summary_ptr_type(DiskSummaryPointer
) == disk
of true
-> 1; false
-> 0 end,
714 NewDiskDocuments
= case summary_ptr_type(WinningSummaryPointer
) == disk
of true
-> 1; false
-> 0 end,
716 NewDocCount
= Db#db
.doc_count
+ NewDiskDocuments
- OldDiskDocuments
,
718 UpdateSeq
= Db#db
.last_update_seq
+ 1,
721 case DiskUpdateSeq
of
723 _
-> [DiskUpdateSeq
| RemoveSeqOut
]
726 InfoBySeqOut2
= [{UpdateSeq
, {Uuid
, WinningRevision
, WinningSummaryPointer
, ConflictRevs
, DeletedConflictRevs
}} | InfoBySeqOut
],
727 InfoByUuidOut2
= [{Uuid
, {UpdateSeq
, WinningRevision
, WinningSummaryPointer
, FlushedTrees
}} | InfoByUuidOut
],
728 % output an ok and the revid for each successful save
729 DocResultOut2
= [[{ok
, OutputRev
} || OutputRev
<- OutputRevs
] | DocResultOut
],
730 Db2
= Db#db
{last_update_seq
= UpdateSeq
, uncommitted_writes
=true
, doc_count
=NewDocCount
},
731 write_summaries(Db2
, Rest
, InfoBySeqOut2
, RemoveSeqOut2
, InfoByUuidOut2
, DocResultOut2
)
734 update_docs_int(Db
, DocsOptionsList
, Options
) ->
736 docinfo_by_uuid_btree
= DocInfoByUuidBTree
,
737 docinfo_by_seq_btree
= DocInfoBySeqBTree
,
738 table_group_btree
= TableGroupBTree
,
739 local_docs_btree
= LocalDocsBtree
,
740 table_group_mgr
= TableGroupMgr
743 % separate out the NonRep documents from the rest of the documents
744 {Uuids
, DocsOptionsList2
, NonRepDocs
} =
745 lists:foldl(fun({[#doc
{uuid
=Uuid
}=Doc
| Rest
], _Options
}=DocOptions
, {UuidsAcc
, DocsOptionsAcc
, NonRepDocsAcc
}) ->
747 ?NON_REP_DOC_PREFIX
++ _
when Rest
==[] ->
748 {UuidsAcc
, DocsOptionsAcc
, [Doc
| NonRepDocsAcc
]};
750 {[Uuid
| UuidsAcc
], [DocOptions
| DocsOptionsAcc
], NonRepDocsAcc
}
752 end, {[], [], []}, DocsOptionsList
),
754 {ok
, OldDocInfoResults
} = couch_btree:lookup(DocInfoByUuidBTree
, Uuids
),
756 % create a list of {{Docs, UpdateOptions}, RevisionTree} tuples.
758 lists:zipwith3(fun(DocUuid
, DocsOptions
, OldDocInfoLookupResult
) ->
759 case OldDocInfoLookupResult
of
760 {ok
, {_Uuid
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}}} ->
761 {DocUuid
, DocsOptions
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskRevTrees
}};
763 {DocUuid
, DocsOptions
, {0, 0, 0, []}}
766 Uuids
, DocsOptionsList2
, OldDocInfoResults
),
768 % now write out the documents
769 {ok
, DocResults
, InfoByUuid
, RemoveSeqList
, InfoBySeqList
, Db2
} =
770 write_summaries(Db
, DocsAndOldDocInfo
, [], [], [], []),
772 % and the indexes to the documents
773 {ok
, DocInfoBySeqBTree2
} = couch_btree:add_remove(DocInfoBySeqBTree
, InfoBySeqList
, RemoveSeqList
),
774 {ok
, DocInfoByUuidBTree2
} = couch_btree:add_remove(DocInfoByUuidBTree
, InfoByUuid
, []),
776 % clear the computed table cache
777 UpdatedUuids
= [UpdatedUuid
|| {UpdatedUuid
, _DocInfo
} <- InfoByUuid
],
778 {ok
, TableGroupBTree2
} = couch_btree:add_remove(TableGroupBTree
, [], UpdatedUuids
),
780 % now notify the table group manager to discard any of the table groups it has in memory
783 lists:map(fun({OldUuid
, _Docs
, {OldUpdateSeq
, OldRev
, OldSummaryPointer
, OldRevTrees
}}) ->
784 {ConflictRevs
, DeletedConflictRevs
} = get_conflict_revs(OldRevTrees
),
785 #doc_info
{uuid
=OldUuid
, update_seq
=OldUpdateSeq
, revision
=OldRev
,
786 summary_pointer
=OldSummaryPointer
, conflict_revs
=ConflictRevs
, deleted_conflict_revs
=DeletedConflictRevs
}
787 end, DocsAndOldDocInfo
),
789 ok
= couch_table_group:free_groups(TableGroupMgr
, OldDocInfos
),
792 [{NRUuid
, dict:to_list(NRFields
)} || #doc
{uuid
=?NON_REP_DOC_PREFIX
++ NRUuid
, summary_fields
=NRFields
, deleted
=false
} <- NonRepDocs
],
795 [NRUuid
|| #doc
{uuid
=?NON_REP_DOC_PREFIX
++ NRUuid
, deleted
=true
} <- NonRepDocs
],
797 {ok
, LocalDocsBtree2
} = couch_btree:add_remove(LocalDocsBtree
, NRUuidsSummaries
, NRUuidsDelete
),
799 NRDocResults
= [[{ok
, 0}] || _Doc
<- NonRepDocs
],
802 docinfo_by_uuid_btree
= DocInfoByUuidBTree2
,
803 docinfo_by_seq_btree
= DocInfoBySeqBTree2
,
804 table_group_btree
= TableGroupBTree2
,
805 local_docs_btree
= LocalDocsBtree2
,
806 uncommitted_writes
= true
808 case lists:member(delay_commit
, Options
) of
812 {ok
, Db4
} = commit_outstanding(Db3
)
814 {ok
, DocResults
++ NRDocResults
, Db4
}.
819 commit_outstanding(#db
{fd
=Fd
, uncommitted_writes
=true
, header
=Header
} = Db
) ->
820 ok
= couch_file:sync(Fd
), % commit outstanding data
821 Header2
= Header#db_header
{
822 last_update_seq
= Db#db
.last_update_seq
,
823 summary_stream_state
= couch_stream:get_state(Db#db
.summary_stream
),
824 docinfo_by_seq_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_seq_btree
),
825 docinfo_by_uuid_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_uuid_btree
),
826 table_group_btree_state
= couch_btree:get_state(Db#db
.table_group_btree
),
827 local_docs_btree_state
= couch_btree:get_state(Db#db
.local_docs_btree
),
828 doc_count
= Db#db
.doc_count
830 {ok
, Header3
} = write_header(Fd
, Header2
),
831 ok
= couch_file:sync(Fd
), % commit header to disk
833 uncommitted_writes
= false
,
837 commit_outstanding(#db
{uncommitted_writes
=false
} = Db
) ->
840 write_header(Fd
, Header
) ->
841 H2
= Header#db_header
{write_version
= Header#db_header
.write_version
+ 1},
842 Bin
= term_to_binary(H2
),
843 case size(Bin
) > ?HEADER_SIZE
of
846 {error
, error_header_too_large
};
848 ok
= couch_file:pwrite(Fd
,0,Bin
),
849 ok
= couch_file:pwrite(Fd
,?HEADER_SIZE
,Bin
),
855 case read_header(Fd
,0) of
859 read_header(Fd
,?HEADER_SIZE
)
862 read_header(Fd
,Offset
) ->
863 {ok
, Bin
} = couch_file:pread(Fd
,Offset
,?HEADER_SIZE
),
864 case binary_to_term(Bin
) of
865 Header
when is_record(Header
, db_header
) ->
868 {error
, {unknown_header_type
, Else
}}
872 code_change(_OldVsn
, State
, _Extra
) ->
875 handle_info(_Info
, State
) ->
881 couch_server:start().
887 couch_server:start(),
889 {ok
, Db
} = couch_server:create("foo", [overwrite
]),
891 TableGroupDoc0
= couch_doc:new(),
892 TableGroupDoc1
= TableGroupDoc0#doc
{uuid
= "TestTable"},
893 TableGroupDoc2
= couch_doc:set_field(TableGroupDoc1
, "$table_foo", ["SELECT Foo != \"\"; COLUMN Foo; COLUMN BAR"]),
894 TableGroupDoc3
= couch_doc:set_field(TableGroupDoc2
, "$table_bar", ["SELECT *; COLUMN Bar; COLUMN foo"]),
895 TableGroupDoc4
= couch_doc:set_field(TableGroupDoc3
, "bar", ["BAR!!!"]),
896 {ok
, NewRevId
} = save_doc(Db
, TableGroupDoc4
),
897 {ok
, _NewRevId2
} = save_doc(Db
, couch_doc:add_rev(TableGroupDoc4
, NewRevId
)),
899 ok
= update_table_group_sync(Db
, TableGroupDoc1#doc
.uuid
),
900 DumpTableFun
= fun({_Uuid
, _Rev
}, Keys
, ColumnValues
, _Offset
, _TotalTableCount
, Count
) -> io:format("Keys:~p, ColumnValues:~p~n", [Keys
,ColumnValues
]), {ok
, Count
+ 1} end,
901 {ok
, OfficalCount
, TotalCount
} = fold_table(Db
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
902 OfficalCount
= TotalCount
,
903 io:format("Total rows dumped: ~w~n", [TotalCount
]),
907 {ok
, Db2
} = couch_server:open("foo"),
908 {ok
, Doc5
} = open_doc(Db2
, TableGroupDoc1#doc
.uuid
),
909 {ok
, FooValue2
} = couch_doc:get_field(Doc5
, "bar"),
910 io:format("bar: ~p~n", [FooValue2
]),
912 {ok
, UuidsAndValues
} = test_write_docs(N
, Db2
, []),
914 {ok
, OfficalCount2
, TotalCount2
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
915 OfficalCount2
= TotalCount2
,
916 io:format("Total rows dumped: ~w~n", [TotalCount2
]),
919 ok
= update_table_group_sync(Db2
, TableGroupDoc1#doc
.uuid
)
923 io:format("database info: ~p~n", [get_info(Db2
)]),
925 {ok
, OfficalCount3
, TotalCount3
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
926 OfficalCount3
= TotalCount3
,
927 io:format("Total rows dumped: ~w~n", [TotalCount3
]),
929 UuidsAndValues2
= test_delete_docs(Db2
, UuidsAndValues
, []),
931 ok
= update_table_group_sync(Db2
, TableGroupDoc1#doc
.uuid
),
933 {ok
, OfficalCount4
, TotalCount4
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
934 OfficalCount4
= TotalCount4
,
935 io:format("Total rows dumped: ~w~n", [TotalCount4
]),
938 check_delete_docs(Db2
, UuidsAndValues
),
942 {ok
, Db3
} = couch_server:open("foo"),
943 test_update_docs(Db3
, UuidsAndValues2
),
945 ok
= update_table_group_sync(Db3
, TableGroupDoc1#doc
.uuid
),
947 {ok
, OfficalCount5
, TotalCount5
} = fold_table(Db3
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
948 OfficalCount5
= TotalCount5
,
949 io:format("Total rows dumped: ~w~n", [TotalCount5
]),
954 BadResult
= fold_table(Db3
, "TableGroupDoc1#doc.uuid", "foot", nil
, fwd
, DumpTableFun
, 0),
955 io:format("BadResult: ~p~n", [BadResult
]),
957 BadResult2
= update_table_group_sync(Db3
, "foo"),
958 io:format("BadResult2: ~p~n", [BadResult2
]),
961 _UuidsAndValues3
= test_delete_docs(Db3
, UuidsAndValues2
, []),
963 ok
= update_table_group_sync(Db3
, TableGroupDoc1#doc
.uuid
),
965 {ok
, _OfficalCount6
, TotalCount6
} = fold_table(Db3
, TableGroupDoc1#doc
.uuid
, "foo", {"1", nil
}, fwd
, DumpTableFun
, 0),
966 io:format("Total rows dumped: ~w~n", [TotalCount6
]),
968 io:format("database info: ~p~n", [get_info(Db3
)]).
972 test_write_docs(0, _Db
, Output
) ->
973 {ok
, lists:reverse(Output
)};
974 test_write_docs(N
, Db
, Output
) ->
975 Doc
= couch_doc:new(),
976 Doc2
= couch_doc:set_field(Doc
, "foo", [integer_to_list(N
)]),
977 Doc3
= couch_doc:set_field(Doc2
, "num", [N
]),
978 Doc4
= couch_doc:set_field(Doc3
, "bar", ["blah"]),
980 test_write_docs(N
-1, Db
, [{Doc3#doc
.uuid
, N
} | Output
]).
983 test_delete_docs(_Db
, [], Acc
) ->
985 test_delete_docs(Db
, [{Uuid
, _Value
} | Rest
], Acc
) ->
986 {ok
, _RevId
} = delete_doc(Db
, Uuid
, any
),
987 test_delete_docs2(Db
, Rest
, Acc
).
990 test_delete_docs2(_Db
, [], Acc
) ->
992 test_delete_docs2(Db
, [{Uuid
, Value
} | Rest
], Acc
) ->
993 test_delete_docs(Db
, Rest
, [{Uuid
, Value
} | Acc
]).
996 check_delete_docs(_Db
, []) ->
998 check_delete_docs(Db
, [{Uuid
, _Value
} | Rest
]) ->
999 {not_found
, deleted
} = open_doc(Db
, Uuid
),
1000 check_delete_docs2(Db
, Rest
).
1003 check_delete_docs2(_Db
, []) ->
1005 check_delete_docs2(Db
, [{Uuid
, _Value
} | Rest
]) ->
1006 {ok
, _Doc
} = open_doc(Db
, Uuid
),
1007 check_delete_docs(Db
, Rest
).
1009 test_update_docs(_Db
, []) ->
1011 test_update_docs(Db
, [{Uuid
,_Value
}| Rest
]) ->
1012 {ok
, Doc
} = open_doc(Db
, Uuid
),
1013 {ok
, [Num
]} = couch_doc:get_field(Doc
, "num"),
1014 Doc2
= couch_doc:set_field(Doc
, "num", [Num
, Num
+ 5]),
1016 test_update_docs(Db
, Rest
).
1022 Diff
= timer:now_diff(EndTime
, StartTime
),
1023 io:format("Action took ~f secs!~n", [Diff
/1000000.0]),