2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 -behaviour(gen_server
).
21 -export([test
/0, test
/1]).
22 -export([open
/1,create
/1,create
/2,save_doc
/2,save_doc
/3,get_doc_info
/2]).
23 -export([delete_doc
/3,save_docs
/3,open_doc
/2,open_doc
/3,close
/1,enum_docs_since
/4,enum_docs_since
/5]).
24 -export([enum_docs
/4,enum_docs
/5]).
25 -export([update_table_group_sync
/2,update_table_group
/2,fold_table
/6,fold_table
/7,get_info
/1]).
26 -export([update_loop
/2]).
27 -export([init
/1,terminate
/2,handle_call
/3,handle_cast
/2,code_change
/3,handle_info
/2]).
28 -export([revision_list_to_trees
/2 , merge_rev_trees
/2 ]).
30 -include("couch_db.hrl").
32 -define(HEADER_SIZE
, 2048). % size of each segment of the doubly written header
37 summary_stream_state
= nil
,
38 docinfo_by_uuid_btree_state
= nil
,
39 docinfo_by_seq_btree_state
= nil
,
40 table_group_btree_state
= nil
,
47 header
= #db_header
{},
48 uncommitted_writes
= false
,
50 docinfo_by_uuid_btree
,
65 start_link(Filepath
, Options
) ->
66 {ok
, Super
} = couch_db_sup:start_link(),
67 FdResult
= supervisor:start_child(Super
,
69 {couch_file
, open
, [Filepath
, Options
]},
76 {ok
, _Db
} = supervisor:start_child(Super
,
78 %%%%%% below calls gen_server:start_link(couch_db, {Fd, Super, Options}, []).
79 {gen_server
, start_link
, [couch_db
, {Filepath
, Fd
, Super
, Options
}, []]},
85 {error
, {enoent
, _ChildInfo
}} ->
89 {error
, {Error
, _ChildInfo
}} ->
97 %%% Interface functions %%%
100 create(Filename
, []).
%% Creates a database: prepends the atom create to the caller's option list
%% and delegates to start_link/2 (init/1 in this module tests for that atom
%% with lists:member/2). The guard only accepts a list for Options.
%% Returns whatever start_link/2 returns.
create(Filename, Options) when is_list(Options) ->
    CreateOptions = [create | Options],
    start_link(Filename, CreateOptions).
106 start_link(Filename
, []).
108 delete_doc(SupPid
, Uuid
, any
) ->
109 case open_doc(SupPid
, Uuid
) of
111 delete_doc(SupPid
, Uuid
, Doc#doc
.revisions
);
115 delete_doc(SupPid
, Uuid
, Revisions
) ->
116 Doc
= #doc
{uuid
=Uuid
, revisions
=Revisions
, deleted
=true
},
117 {ok
, [Result
]} = gen_server:call(db_pid(SupPid
), {update_docs
, [{[Doc
], [new_edits
]}], []}),
%% Opens a document given either its uuid or a #doc_info{} record.
%%
%% A #doc_info{} whose deleted field is true is answered immediately with
%% {not_found, deleted}; the database is not consulted. For anything else the
%% database process is resolved with db_pid/1, its current #db{} state is
%% fetched with get_db/1, and the lookup is handed to open_doc_int/3 with an
%% empty option list.
open_doc(_SupPid, #doc_info{deleted = true}) ->
    {not_found, deleted};
open_doc(SupPid, UuidOrDocInfo) ->
    DbPid = db_pid(SupPid),
    Db = get_db(DbPid),
    open_doc_int(Db, UuidOrDocInfo, []).
%% Opens a document with caller-supplied Options.
%%
%% Resolves the database process with db_pid/1, fetches its current #db{}
%% state with get_db/1 and passes Uuid and Options unchanged to
%% open_doc_int/3 (which, in this module, checks Options for allow_stub).
open_doc(SupPid, Uuid, Options) ->
    DbPid = db_pid(SupPid),
    Db = get_db(DbPid),
    open_doc_int(Db, Uuid, Options).
132 get_doc_info(Db
, Uuid
) ->
133 case get_full_doc_info(Db
, Uuid
) of
134 {ok
, {DocInfo
, _RevisionTrees
}} ->
140 get_full_doc_info(SupPid
, Uuid
) when is_pid(SupPid
) ->
141 get_doc_info(get_db(db_pid(SupPid
)), Uuid
);
142 get_full_doc_info(Db
, Uuid
) ->
143 case couch_btree:lookup_single(Db#db
.docinfo_by_uuid_btree
, Uuid
) of
144 {ok
, {UpdateSeq
, Rev
, SummaryPointer
, Conflicts
, RevisionTrees
}} ->
145 {ok
, {#doc_info
{uuid
=Uuid
, revision
=Rev
, update_seq
=UpdateSeq
, summary_pointer
=SummaryPointer
, deleted
=(SummaryPointer
==0), conflicts
=Conflicts
}, RevisionTrees
}};
151 gen_server:call(db_pid(SupPid
), get_info
).
%% Saves a single document with no extra options; shorthand for
%% save_doc(SupPid, Doc, []).
save_doc(SupPid, Doc) ->
    NoOptions = [],
    save_doc(SupPid, Doc, NoOptions).
156 save_doc(SupPid
, Doc
, Options
) ->
157 {ok
, [RetValue
]} = save_docs(SupPid
, [{[Doc
], [new_edits
| Options
]}], []),
%% Submits a batch of document updates to the database process.
%%
%% DocsOptions is a list of {Docs, DocOptions} tuples (save_doc/3 builds
%% [{[Doc], [new_edits | Options]}]); Options applies to the whole batch.
%% The request {update_docs, DocsOptions, Options} is sent as a synchronous
%% gen_server call to the process returned by db_pid/1, using the default
%% gen_server:call/2 timeout. The call's reply is returned unchanged.
save_docs(SupPid, DocsOptions, Options) ->
    DbPid = db_pid(SupPid),
    Request = {update_docs, DocsOptions, Options},
    gen_server:call(DbPid, Request).
163 enum_docs_since(SupPid
, SinceSeq
, Direction
, InFun
, Ctx
) ->
164 Db
= get_db(db_pid(SupPid
)),
165 EnumFun
= fun({UpdateSeq
, {Uuid
, Rev
, SummaryPointer
, Conflicts
}}, EnumCtx
) ->
169 update_seq
= UpdateSeq
,
170 summary_pointer
= SummaryPointer
,
171 conflicts
= Conflicts
,
172 deleted
= SummaryPointer
== 0
174 InFun(DocInfo
, EnumCtx
)
176 couch_btree:fold(Db#db
.docinfo_by_seq_btree
, SinceSeq
+ 1, Direction
, EnumFun
, Ctx
).
%% Enumerates documents updated after SinceSeq in forward order; shorthand
%% for enum_docs_since/5 with the direction fixed to fwd.
enum_docs_since(SupPid, SinceSeq, InFun, Ctx) ->
    Direction = fwd,
    enum_docs_since(SupPid, SinceSeq, Direction, InFun, Ctx).
181 enum_docs(SupPid
, StartUuid
, Direction
, InFun
, Ctx
) ->
182 Db
= get_db(db_pid(SupPid
)),
183 EnumFun
= fun({Uuid
, {UpdateSeq
, Rev
, SummaryPointer
, Conflicts
, _RevTrees
}}, EnumCtx
) ->
187 update_seq
= UpdateSeq
,
188 summary_pointer
= SummaryPointer
,
189 deleted
= SummaryPointer
== 0,
190 conflicts
= Conflicts
192 InFun(DocInfo
, EnumCtx
)
194 couch_btree:fold(Db#db
.docinfo_by_uuid_btree
, StartUuid
, Direction
, EnumFun
, Ctx
).
%% Enumerates documents starting at StartUuid in forward order; shorthand
%% for enum_docs/5 with the direction fixed to fwd.
enum_docs(SupPid, StartUuid, InFun, Ctx) ->
    Direction = fwd,
    enum_docs(SupPid, StartUuid, Direction, InFun, Ctx).
%% Asks the database process to update the table group defined by the
%% document TableGroupDocUuid.
%%
%% Sends {update_table_group, TableGroupDocUuid, NotifyFun} as a synchronous
%% gen_server call. The notification fun supplied here ignores its argument
%% and returns ok, so this caller receives no progress callbacks (contrast
%% update_table_group_sync/2, which passes a fun built on
%% sync_update_notify/3). The call's reply is returned unchanged.
update_table_group(SupPid, TableGroupDocUuid) ->
    IgnoreStatus = fun(_Status) -> ok end,
    Request = {update_table_group, TableGroupDocUuid, IgnoreStatus},
    gen_server:call(db_pid(SupPid), Request).
204 sync_update_notify(Pid
, Ref
, partial
) ->
205 % We want to wait until complete
206 % so return a fun that calls ourself
207 fun(Status
)-> sync_update_notify(Pid
, Ref
, Status
) end;
208 sync_update_notify(Pid
, Ref
, complete
) ->
210 sync_update_notify(Pid
, Ref
, Else
) ->
214 update_table_group_sync(SupPid
, TableGroupDocUuid
) ->
217 UpdateFun
= fun(Status
)-> sync_update_notify(Pid
, Ref
, Status
) end,
218 case gen_server:call(db_pid(SupPid
), {update_table_group
, TableGroupDocUuid
, UpdateFun
}, infinity
) of
228 fold_table(SupPid
, TableGroupDocUuid
, TableName
, Dir
, Fun
, Acc
) ->
229 case gen_server:call(db_pid(SupPid
), {get_table_group
, TableGroupDocUuid
}) of
231 couch_table_group:fold(TableGroup
, TableName
, Dir
, Fun
, Acc
);
236 fold_table(SupPid
, TableGroupDocUuid
, TableName
, StartKey
, Dir
, Fun
, Acc
) ->
237 case gen_server:call(db_pid(SupPid
), {get_table_group
, TableGroupDocUuid
}) of
239 couch_table_group:fold(TableGroup
, TableName
, StartKey
, Dir
, Fun
, Acc
);
245 Ref
= erlang:monitor(process, SupPid
),
247 exit(SupPid
, normal
),
249 {'DOWN', Ref
, process, SupPid
, _Reason
} ->
257 init({Filepath
, Fd
, Supervisor
, Options
}) ->
258 case lists:member(create
, Options
) of
260 init_main(Filepath
, Fd
, Supervisor
, nil
);
262 {ok
, Header
} = read_header(Fd
),
263 init_main(Filepath
, Fd
, Supervisor
, Header
)
267 init_main(Filepath
, Fd
, Supervisor
, nil
) ->
268 % creates a new header and writes it to the file
269 {ok
, _
} = couch_file:expand(Fd
, 2*(?HEADER_SIZE
)),
270 Header
= #db_header
{},
271 {ok
, Header2
} = write_header(Fd
, Header
),
272 ok
= couch_file:sync(Fd
),
273 init_main(Filepath
, Fd
, Supervisor
, Header2
);
274 init_main(Filepath
, Fd
, Supervisor
, Header
) ->
275 {ok
, SummaryStream
} = couch_stream:open(Header#db_header
.summary_stream_state
, Fd
),
276 {ok
, UuidBtree
} = couch_btree:open(Header#db_header
.docinfo_by_uuid_btree_state
, Fd
),
277 {ok
, SeqBtree
} = couch_btree:open(Header#db_header
.docinfo_by_seq_btree_state
, Fd
),
278 {ok
, TableGroupBtree
} = couch_btree:open(Header#db_header
.table_group_btree_state
, Fd
),
282 supervisor
=Supervisor
,
284 summary_stream
= SummaryStream
,
285 docinfo_by_uuid_btree
= UuidBtree
,
286 docinfo_by_seq_btree
= SeqBtree
,
287 last_update_seq
= Header#db_header
.last_update_seq
,
288 table_group_btree
= TableGroupBtree
,
289 doc_count
= Header#db_header
.doc_count
,
293 UpdatePid
= spawn_link(couch_db
, update_loop
, [self(), Db
]),
297 GetTableGroupInfoFun
= fun(GroupKey
) ->
298 get_table_group_info(get_db(Pid
), GroupKey
)
301 UpdateTableGroupInfoFun
= fun(GroupKey
, UpdateStatus
, GroupInfo
) ->
302 UpdatePid
! {table_group_updated
, GroupKey
, UpdateStatus
, GroupInfo
},
305 {ok
, TableMgr
} = couch_table_group:start_manager(Supervisor
, Fd
, GetTableGroupInfoFun
, UpdateTableGroupInfoFun
),
307 UpdatePid
! {set_table_group_mgr
, TableMgr
},
309 {ok
, #main
{db
=Db
, update_pid
=UpdatePid
, table_group_mgr
=TableMgr
}}.
%% gen_server terminate callback. Shuts down, in this order:
%%   1. sends the atom close to the update process,
%%   2. stops the table group manager via couch_table_group:stop/1,
%%   3. closes the database file via couch_file:close/1.
%% The termination reason is ignored; the value of the final close call is
%% the callback's result.
terminate(_Reason, #main{db = Db, update_pid = UpdatePid, table_group_mgr = TableMgr}) ->
    UpdatePid ! close,
    couch_table_group:stop(TableMgr),
    couch_file:close(Db#db.fd).
316 handle_call({get_table_group
, TableGroupDocUuid
}, From
, #main
{db
=Db
}=Main
) ->
317 case get_doc_info(Db
, TableGroupDocUuid
) of
318 {ok
, #doc_info
{deleted
=true
}} ->
319 {reply
, {not_found
, deleted
}, Main
};
321 ok
= couch_table_group:get_group_async(Main#main
.table_group_mgr
, DocInfo
, From
),
324 {reply
, {not_found
, missing
}, Main
}
326 handle_call({update_docs
, DocActions
, Options
}, From
, Main
) ->
327 Main#main
.update_pid
! {From
, update_docs
, DocActions
, Options
},
329 handle_call(get_db
, _From
, #main
{db
=Db
}=Main
) ->
330 {reply
, {ok
, Db
}, Main
};
331 handle_call(get_info
, _From
, #main
{db
=Db
}=Main
) ->
333 {doc_count
, Db#db
.doc_count
},
334 {last_update_seq
, Db#db
.last_update_seq
}
336 {reply
, {ok
, InfoList
}, Main
};
337 handle_call({update_table_group
, Uuid
, UpdateNotifFun
}, _From
, #main
{db
=Db
}=Main
) ->
338 case get_doc_info(Db
, Uuid
) of
340 ok
= couch_table_group:update_group(Main#main
.table_group_mgr
, DocInfo
, UpdateNotifFun
),
%% gen_server cast handler. update_loop/2 casts {db_updated, NewDb} after it
%% has produced a new #db{} value; the server swaps that value into its
%% #main{} state so later get_db / get_info calls see it.
handle_cast({db_updated, NewDb}, Main) ->
    NewMain = Main#main{db = NewDb},
    {noreply, NewMain}.
350 %%% Internal function %%%
354 {error
, {already_started
, DbPid
}} = supervisor:start_child(SupPid
,
356 {couch_db
, sup_start_link
, []},
364 update_loop(MainPid
, Db
) ->
366 {set_table_group_mgr
, TableMgr
} ->
367 update_loop(MainPid
, Db#db
{table_group_mgr
=TableMgr
});
368 {OrigFrom
, update_docs
, DocActions
, Options
} ->
369 {ok
, DocResults
, Db2
} = update_docs_int(Db
, DocActions
, Options
),
370 ok
= gen_server:cast(MainPid
, {db_updated
, Db2
}),
371 gen_server:reply(OrigFrom
, {ok
, DocResults
}),
372 update_loop(MainPid
, Db2
);
373 {table_group_updated
, #doc_info
{uuid
=Uuid
}=GroupDocInfo
, _UpdateStatus
, TableGroupInfo
} ->
374 case get_doc_info(Db
, GroupDocInfo#doc_info
.uuid
) of
375 {ok
, GroupDocInfo
} ->
376 % revision on disk matches the revision of the table group being updated
377 % so we save the info to disk
378 {ok
, GroupBtree2
} = couch_btree:add_remove(Db#db
.table_group_btree
, [{Uuid
, TableGroupInfo
}], []),
379 Db2
= Db#db
{table_group_btree
=GroupBtree2
, uncommitted_writes
=true
},
380 {ok
, Db3
} = commit_outstanding(Db2
),
381 ok
= gen_server:cast(MainPid
, {db_updated
, Db3
}),
382 update_loop(MainPid
, Db3
);
384 % doesn't match, don't save in btree
385 update_loop(MainPid
, Db
)
392 get_table_group_info(#db
{}=Db
, #doc_info
{uuid
=Uuid
}=DocInfo
) ->
393 case couch_btree:lookup_single(Db#db
.table_group_btree
, Uuid
) of
394 {ok
, {TableQueries
, TableGroupState
}} ->
395 {ok
, {TableQueries
, TableGroupState
}};
397 {ok
, Doc
} = open_doc_int(Db
, DocInfo
, []),
398 case couch_doc:get_table_queries(Doc
) of
400 {not_found
, no_tables_found
};
402 {ok
, {TableQueries
, nil
}}
407 {ok
, Db
} = gen_server:call(DbPid
, get_db
),
411 open_doc_int(Db
, #doc_info
{uuid
=Uuid
,revision
=Rev
,summary_pointer
=SummaryPointer
,deleted
=Deleted
}, Options
) ->
412 case (not Deleted
) orelse
lists:member(allow_stub
, Options
) of
414 {ok
, SummaryFields
} = couch_stream:read_term(Db#db
.summary_stream
, SummaryPointer
),
418 summary_fields
= dict:from_list(SummaryFields
),
425 open_doc_int(Db
, {#doc_info
{uuid
=Uuid
,revision
=Rev
,summary_pointer
=SummaryPointer
,deleted
=Deleted
}, RevisionTree
}, Options
) ->
426 case (not Deleted
) orelse
lists:member(allow_stub
, Options
) of
428 {ok
, SummaryFields
} = couch_stream:read_term(Db#db
.summary_stream
, SummaryPointer
),
429 io:format("RevisionTree:~p~n", [RevisionTree
]),
430 io:format("RevisionPath:~p~n", [get_rev_path(Rev
, RevisionTree
)]),
433 revisions
= get_rev_path(Rev
, RevisionTree
),
434 summary_fields
= dict:from_list(SummaryFields
),
441 open_doc_int(Db
, Uuid
, Options
) ->
442 case get_full_doc_info(Db
, Uuid
) of
443 {ok
, {DocInfo
, RevisionTree
}} ->
444 open_doc_int(Db
, {DocInfo
, RevisionTree
}, Options
);
449 % revision tree functions
451 append_to_leaf_node([], _FindLeafRev
, _NewRev
) ->
453 append_to_leaf_node([{LeafRev
, LeafDoc
, []} | Rest
], FindLeafRev
, {NewRev
, NewDoc
}) when LeafRev
== FindLeafRev
->
454 [{LeafRev
, LeafDoc
, [{NewRev
, NewDoc
, []}]} | Rest
];
455 append_to_leaf_node([Node
| Rest
], FindLeafRev
, {NewRev
, NewDoc
}) ->
456 [Node
| append_to_leaf_node(Rest
, FindLeafRev
, {NewRev
, NewDoc
})].
458 merge_rev_trees([], B
) ->
460 merge_rev_trees(A
, []) ->
462 merge_rev_trees([ATree
| ANextTree
], [BTree
| BNextTree
]) ->
463 {ARev
, ADoc
, ASubTrees
} = ATree
,
464 {BRev
, _BDoc
, BSubTrees
} = BTree
,
468 {MergedSubTrees
, SubTreesConflicts
} = merge_rev_trees(ASubTrees
, BSubTrees
),
469 {MergedNextTrees
, NextConflicts
} = merge_rev_trees(ANextTree
, BNextTree
),
470 {[{ARev
, ADoc
, MergedSubTrees
} | MergedNextTrees
], SubTreesConflicts
+ NextConflicts
};
472 {Merged
, Conflicts
} = merge_rev_trees(ANextTree
, [BTree
| BNextTree
]),
473 {[ATree
| Merged
], Conflicts
+ 1};
475 {Merged
, Conflicts
} = merge_rev_trees([ATree
| ANextTree
], BNextTree
),
476 {[BTree
| Merged
], Conflicts
+ 1}
479 get_leaf_revs([], _
, Acc
) ->
481 get_leaf_revs([{RevId
, DocOrPtr
, []} | RestTrees
], Depth
, Acc
) ->
482 [{RevId
, DocOrPtr
, Depth
} | get_leaf_revs(RestTrees
, Depth
, Acc
)];
483 get_leaf_revs([{_RevId
, _DocOrPtr
, SubTrees
} | RestTrees
], Depth
, Acc
) ->
484 get_leaf_revs(SubTrees
, Depth
+ 1, get_leaf_revs(RestTrees
, Depth
, Acc
)).
% returns a list of revids that is the path to the first revision
% match in the trees. If none is found, returns the empty list []
488 get_rev_path(_Rev
, []) ->
490 get_rev_path(Rev
, [{RevId
, _
, _SubTrees
} | _RestTrees
]) when Rev
== RevId
->
492 get_rev_path(Rev
, [{RevId
, _
, SubTrees
} | RestTrees
]) ->
493 case get_rev_path(Rev
, SubTrees
) of
495 get_rev_path(Rev
, RestTrees
);
501 revision_list_to_trees(Doc
, [RevId
]) ->
503 revision_list_to_trees(Doc
, [RevId
| Rest
]) ->
504 [{RevId
, 0, revision_list_to_trees(Doc
, Rest
)}] .
506 % counts the number of conflicts in a list sorted by priority
507 count_non_deletes([], Count
) ->
509 count_non_deletes([{_RevId
, 0, _Depth
} | _Rest
], Count
) ->
510 Count
; % first deletion, all done
511 count_non_deletes([{_RevId
, _SummaryPointer
, _Depth
} | Rest
], Count
) ->
512 count_non_deletes(Rest
, Count
+ 1).
514 winning_revision(Trees
) ->
515 LeafRevs
= get_leaf_revs(Trees
, 0, []),
517 lists:sort(fun({RevIdA
, SummaryPointerA
, DepthA
}, {RevIdB
, SummaryPointerB
, DepthB
}) ->
518 % sort descending by {not deleted, then Depth, then RevisionId}
519 ANotDeleted
= case SummaryPointerA
of 0 -> false
; _
-> true
end,
520 BNotDeleted
= case SummaryPointerB
of 0 -> false
; _
-> true
end,
521 A
= {ANotDeleted
, DepthA
, RevIdA
},
522 B
= {BNotDeleted
, DepthB
, RevIdB
},
526 [{RevId
, SummaryPointer
, _Depth
} | Rest
] = SortedLeafRevs
,
527 {RevId
, SummaryPointer
, count_non_deletes(Rest
, 0)}.
529 flush_revision_trees(_Db
, []) ->
531 flush_revision_trees(Db
, [{RevId
, #doc
{deleted
=true
}, SubTrees
} | RestTrees
]) ->
532 [{RevId
, 0, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
533 flush_revision_trees(Db
, [{RevId
, #doc
{summary_fields
=Fields
}, SubTrees
} | RestTrees
]) ->
534 {ok
, SummaryPointer
} = couch_stream:write_term(Db#db
.summary_stream
, dict:to_list(Fields
)),
535 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)];
536 flush_revision_trees(Db
, [{RevId
, SummaryPointer
, SubTrees
} | RestTrees
]) ->
537 [{RevId
, SummaryPointer
, flush_revision_trees(Db
, SubTrees
)} | flush_revision_trees(Db
, RestTrees
)].
541 write_summaries(Db
, [], InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
542 {ok
, lists:reverse(DocResultOut
), lists:reverse(InfoByUuidOut
), lists:reverse(RemoveSeqOut
),
543 lists:reverse(InfoBySeqOut
), Db
};
544 write_summaries(Db
, [{Uuid
, {Docs
, Options
}, {DiskUpdateSeq
, _DiskRevision
, DiskSummaryPointer
, _DiskConflicts
, DiskRevTrees
}} | Rest
],
545 InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut
) ->
546 NewEdits
= lists:member(new_edits
, Options
),
548 lists:foldl(fun(#doc
{revisions
=Revisions
}=Doc
, AccTrees
) ->
549 Revisions2
= case NewEdits
of
550 true
-> Revisions
++ [couch_util:rand32()]; % add new revision
553 DocRevTree
= revision_list_to_trees(Doc
, Revisions2
),
554 {NewRevTrees
, _Conflicts
} = merge_rev_trees(AccTrees
, DocRevTree
),
558 {NewRevTrees
, Conflicts
} = merge_rev_trees(DiskRevTrees
, InputRevTrees
),
560 case NewEdits andalso Conflicts
> 0 of
562 io:format("DiskRevTrees:~p~nInputRevTrees:~p~n", [DiskRevTrees
, InputRevTrees
]),
564 DocResultOut2
= [conflict
| DocResultOut
],
565 write_summaries(Db
, Rest
, InfoBySeqOut
, RemoveSeqOut
, InfoByUuidOut
, DocResultOut2
);
567 FlushedTrees
= flush_revision_trees(Db
, NewRevTrees
),
568 {WinningRevision
, WinningSummaryPointer
, NumConflicts
} = winning_revision(FlushedTrees
),
570 WasDiskDocument
= DiskSummaryPointer
/= 0,
571 IsDiskDocument
= WinningSummaryPointer
/= 0,
574 case {WasDiskDocument
, IsDiskDocument
} of
575 {true
, true
} -> % existing document, addition
577 {false
, false
} -> % no document, deletion
579 {false
, true
} -> % no document, addition
581 {true
, false
} -> % existing document, deletion
585 UpdateSeq
= Db#db
.last_update_seq
+ 1,
587 case DiskUpdateSeq
of
589 RemoveSeqOut2
= RemoveSeqOut
;
591 RemoveSeqOut2
= [DiskUpdateSeq
| RemoveSeqOut
]
594 InfoBySeqOut2
= [{UpdateSeq
, {Uuid
, WinningRevision
, WinningSummaryPointer
, NumConflicts
}} | InfoBySeqOut
],
595 InfoByUuidOut2
= [{Uuid
, {UpdateSeq
, WinningRevision
, WinningSummaryPointer
, NumConflicts
, FlushedTrees
}} | InfoByUuidOut
],
596 DocResultOut2
= [{ok
, WinningRevision
} | DocResultOut
],
597 Db2
= Db#db
{last_update_seq
= UpdateSeq
, uncommitted_writes
=true
, doc_count
=NewDocCount
},
598 write_summaries(Db2
, Rest
, InfoBySeqOut2
, RemoveSeqOut2
, InfoByUuidOut2
, DocResultOut2
)
601 update_docs_int(Db
, DocsOptionsList
, Options
) ->
603 docinfo_by_uuid_btree
= DocInfoByUuidBTree
,
604 docinfo_by_seq_btree
= DocInfoBySeqBTree
,
605 table_group_btree
= TableGroupBTree
,
606 table_group_mgr
= TableGroupMgr
608 Uuids
= [Uuid
|| {[#doc
{uuid
=Uuid
} | _Rest
], _Options
} <- DocsOptionsList
],
609 {ok
, OldDocInfoResults
} = couch_btree:lookup(DocInfoByUuidBTree
, Uuids
),
611 % create a list of {{Docs, UpdateOptions}, RevisionTree} tuples.
613 lists:zipwith3(fun(DocUuid
, DocsOptions
, OldDocInfoLookupResult
) ->
614 case OldDocInfoLookupResult
of
615 {ok
, {_Uuid
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskConflicts
, DiskRevTrees
}}} ->
616 {DocUuid
, DocsOptions
, {DiskUpdateSeq
, DiskRevision
, DiskSummaryPointer
, DiskConflicts
, DiskRevTrees
}};
618 {DocUuid
, DocsOptions
, {0, 0, 0, 0, []}}
621 Uuids
, DocsOptionsList
, OldDocInfoResults
),
623 % now write out the documents
624 {ok
, DocResults
, InfoByUuid
, RemoveSeqList
, InfoBySeqList
, Db2
} =
625 write_summaries(Db
, DocsAndOldDocInfo
, [], [], [], []),
627 % and the indexes to the documents
628 {ok
, DocInfoBySeqBTree2
} = couch_btree:add_remove(DocInfoBySeqBTree
, InfoBySeqList
, RemoveSeqList
),
629 {ok
, DocInfoByUuidBTree2
} = couch_btree:add_remove(DocInfoByUuidBTree
, InfoByUuid
, []),
631 % clear the computed table cache
632 UpdatedUuids
= [UpdatedUuid
|| {UpdatedUuid
, _DocInfo
} <- InfoByUuid
],
633 {ok
, TableGroupBTree2
} = couch_btree:add_remove(TableGroupBTree
, [], UpdatedUuids
),
635 % now notify the table group manager to discard any of the table groups it has in memory
638 lists:map(fun({OldUuid
, _Docs
, {OldUpdateSeq
, OldRev
, OldSummaryPointer
, OldConflicts
, _OldRevTrees
}}) ->
639 #doc_info
{uuid
=OldUuid
, update_seq
=OldUpdateSeq
, revision
=OldRev
, summary_pointer
=OldSummaryPointer
, conflicts
=OldConflicts
}
640 end, DocsAndOldDocInfo
),
642 ok
= couch_table_group:free_groups(TableGroupMgr
, OldDocInfos
),
645 docinfo_by_uuid_btree
= DocInfoByUuidBTree2
,
646 docinfo_by_seq_btree
= DocInfoBySeqBTree2
,
647 table_group_btree
= TableGroupBTree2
,
648 uncommitted_writes
= true
650 case lists:member(delay_commit
, Options
) of
654 {ok
, Db4
} = commit_outstanding(Db3
)
656 {ok
, DocResults
, Db4
}.
661 commit_outstanding(#db
{fd
=Fd
, uncommitted_writes
=true
, header
=Header
} = Db
) ->
662 ok
= couch_file:sync(Fd
), % commit outstanding data
663 Header2
= Header#db_header
{
664 last_update_seq
= Db#db
.last_update_seq
,
665 summary_stream_state
= couch_stream:get_state(Db#db
.summary_stream
),
666 docinfo_by_seq_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_seq_btree
),
667 docinfo_by_uuid_btree_state
= couch_btree:get_state(Db#db
.docinfo_by_uuid_btree
),
668 table_group_btree_state
= couch_btree:get_state(Db#db
.table_group_btree
),
669 doc_count
= Db#db
.doc_count
671 {ok
, Header3
} = write_header(Fd
, Header2
),
672 ok
= couch_file:sync(Fd
), % commit header to disk
674 uncommitted_writes
= false
,
678 commit_outstanding(#db
{uncommitted_writes
=false
} = Db
) ->
681 write_header(Fd
, Header
) ->
682 H2
= Header#db_header
{write_version
= Header#db_header
.write_version
+ 1},
683 Bin
= term_to_binary(H2
),
684 case size(Bin
) > ?HEADER_SIZE
of
687 {error
, error_header_too_large
};
689 ok
= couch_file:pwrite(Fd
,0,Bin
),
690 ok
= couch_file:pwrite(Fd
,?HEADER_SIZE
,Bin
),
696 case read_header(Fd
,0) of
700 read_header(Fd
,?HEADER_SIZE
)
703 read_header(Fd
,Offset
) ->
704 {ok
, Bin
} = couch_file:pread(Fd
,Offset
,?HEADER_SIZE
),
705 case binary_to_term(Bin
) of
706 Header
when is_record(Header
, db_header
) ->
709 {error
, {unknown_header_type
, Else
}}
713 code_change(_OldVsn
, State
, _Extra
) ->
716 handle_info(_Info
, State
) ->
722 couch_server:start().
728 couch_server:start(),
730 {ok
, Db
} = couch_server:create("foo", [overwrite
]),
732 TableGroupDoc0
= couch_doc:new(),
733 TableGroupDoc1
= TableGroupDoc0#doc
{uuid
= "TestTable"},
734 TableGroupDoc2
= couch_doc:set_field(TableGroupDoc1
, "$table_foo", ["SELECT Foo != \"\"; COLUMN Foo; COLUMN BAR"]),
735 TableGroupDoc3
= couch_doc:set_field(TableGroupDoc2
, "$table_bar", ["SELECT *; COLUMN Bar; COLUMN foo"]),
736 TableGroupDoc4
= couch_doc:set_field(TableGroupDoc3
, "bar", ["BAR!!!"]),
737 {ok
, NewRevId
} = save_doc(Db
, TableGroupDoc4
),
738 {ok
, _NewRevId2
} = save_doc(Db
, couch_doc:add_rev(TableGroupDoc4
, NewRevId
)),
740 ok
= update_table_group_sync(Db
, TableGroupDoc1#doc
.uuid
),
741 DumpTableFun
= fun({_Uuid
, _Rev
}, Keys
, ColumnValues
, _Offset
, _TotalTableCount
, Count
) -> io:format("Keys:~p, ColumnValues:~p~n", [Keys
,ColumnValues
]), {ok
, Count
+ 1} end,
742 {ok
, OfficalCount
, TotalCount
} = fold_table(Db
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
743 OfficalCount
= TotalCount
,
744 io:format("Total rows dumped: ~w~n", [TotalCount
]),
748 {ok
, Db2
} = couch_server:open("foo"),
749 {ok
, Doc5
} = open_doc(Db2
, TableGroupDoc1#doc
.uuid
),
750 {ok
, FooValue2
} = couch_doc:get_field(Doc5
, "bar"),
751 io:format("bar: ~p~n", [FooValue2
]),
753 {ok
, UuidsAndValues
} = test_write_docs(N
, Db2
, []),
755 {ok
, OfficalCount2
, TotalCount2
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
756 OfficalCount2
= TotalCount2
,
757 io:format("Total rows dumped: ~w~n", [TotalCount2
]),
760 ok
= update_table_group_sync(Db2
, TableGroupDoc1#doc
.uuid
)
764 io:format("database info: ~p~n", [get_info(Db2
)]),
766 {ok
, OfficalCount3
, TotalCount3
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
767 OfficalCount3
= TotalCount3
,
768 io:format("Total rows dumped: ~w~n", [TotalCount3
]),
770 UuidsAndValues2
= test_delete_docs(Db2
, UuidsAndValues
, []),
772 ok
= update_table_group_sync(Db2
, TableGroupDoc1#doc
.uuid
),
774 {ok
, OfficalCount4
, TotalCount4
} = fold_table(Db2
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
775 OfficalCount4
= TotalCount4
,
776 io:format("Total rows dumped: ~w~n", [TotalCount4
]),
779 check_delete_docs(Db2
, UuidsAndValues
),
783 {ok
, Db3
} = couch_server:open("foo"),
784 test_update_docs(Db3
, UuidsAndValues2
),
786 ok
= update_table_group_sync(Db3
, TableGroupDoc1#doc
.uuid
),
788 {ok
, OfficalCount5
, TotalCount5
} = fold_table(Db3
, TableGroupDoc1#doc
.uuid
, "foo", nil
, fwd
, DumpTableFun
, 0),
789 OfficalCount5
= TotalCount5
,
790 io:format("Total rows dumped: ~w~n", [TotalCount5
]),
795 BadResult
= fold_table(Db3
, "TableGroupDoc1#doc.uuid", "foot", nil
, fwd
, DumpTableFun
, 0),
796 io:format("BadResult: ~p~n", [BadResult
]),
798 BadResult2
= update_table_group_sync(Db3
, "foo"),
799 io:format("BadResult2: ~p~n", [BadResult2
]),
802 _UuidsAndValues3
= test_delete_docs(Db3
, UuidsAndValues2
, []),
804 ok
= update_table_group_sync(Db3
, TableGroupDoc1#doc
.uuid
),
806 {ok
, _OfficalCount6
, TotalCount6
} = fold_table(Db3
, TableGroupDoc1#doc
.uuid
, "foo", {"1", nil
}, fwd
, DumpTableFun
, 0),
807 io:format("Total rows dumped: ~w~n", [TotalCount6
]),
809 io:format("database info: ~p~n", [get_info(Db3
)]).
813 test_write_docs(0, _Db
, Output
) ->
814 {ok
, lists:reverse(Output
)};
815 test_write_docs(N
, Db
, Output
) ->
816 Doc
= couch_doc:new(),
817 Doc2
= couch_doc:set_field(Doc
, "foo", [integer_to_list(N
)]),
818 Doc3
= couch_doc:set_field(Doc2
, "num", [N
]),
819 Doc4
= couch_doc:set_field(Doc3
, "bar", ["blah"]),
821 test_write_docs(N
-1, Db
, [{Doc3#doc
.uuid
, N
} | Output
]).
824 test_delete_docs(_Db
, [], Acc
) ->
826 test_delete_docs(Db
, [{Uuid
, _Value
} | Rest
], Acc
) ->
827 {ok
, _RevId
} = delete_doc(Db
, Uuid
, any
),
828 test_delete_docs2(Db
, Rest
, Acc
).
831 test_delete_docs2(_Db
, [], Acc
) ->
833 test_delete_docs2(Db
, [{Uuid
, Value
} | Rest
], Acc
) ->
834 test_delete_docs(Db
, Rest
, [{Uuid
, Value
} | Acc
]).
837 check_delete_docs(_Db
, []) ->
839 check_delete_docs(Db
, [{Uuid
, _Value
} | Rest
]) ->
840 {not_found
, deleted
} = open_doc(Db
, Uuid
),
841 check_delete_docs2(Db
, Rest
).
844 check_delete_docs2(_Db
, []) ->
846 check_delete_docs2(Db
, [{Uuid
, _Value
} | Rest
]) ->
847 {ok
, _Doc
} = open_doc(Db
, Uuid
),
848 check_delete_docs(Db
, Rest
).
850 test_update_docs(_Db
, []) ->
852 test_update_docs(Db
, [{Uuid
,_Value
}| Rest
]) ->
853 {ok
, Doc
} = open_doc(Db
, Uuid
),
854 {ok
, [Num
]} = couch_doc:get_field(Doc
, "num"),
855 Doc2
= couch_doc:set_field(Doc
, "num", [Num
, Num
+ 5]),
857 test_update_docs(Db
, Rest
).
863 Diff
= timer:now_diff(EndTime
, StartTime
),
864 io:format("Action took ~f secs!~n", [Diff
/1000000.0]),