fixed a problem with deleted docs and the all_docss view
[couchdbimport.git] / src / CouchDb / couch_table_group.erl
blob3aeb538629939375c0cbcee7fc6a0de7e7d9cdfc
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_table_group).
19 -behaviour(gen_server).
21 -export([start_manager/4,stop/1,
22 open/4,open/5,fold/5,fold/6,
23 update_group/3,update_table_proc/5,free_groups/2,
24 update_table_proc/7,get_group_async/3,
25 less_json/2]).
26 -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
28 -include("couch_db.hrl").
30 % arbitrarily chosen amount of memory to use before flushing to disk
31 -define(FLUSH_MAX_MEM, 1000000).
33 -record(table_group,
34 {db,
35 named_queries,
36 tables,
37 current_seq,
38 uuid_btree,
39 update_notif_fun,
40 compiled_doc_map = nil
41 }).
43 -record(table,
44 {id_num,
45 name,
46 btree,
47 columns,
48 query_string
49 }).
51 -record(mgr,
52 {db,
53 fd,
54 update_state_by_uuid = dict:new(),
55 uuids_by_pid = dict:new(),
56 get_group_info_fun,
57 update_group_info_fun,
58 cached_groups=dict:new()
59 }).
62 start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) ->
63 gen_server:start_link(couch_table_group, {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}, []).
65 stop(MgrPid) ->
66 gen_server:cast(MgrPid, stop).
68 init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) ->
69 process_flag(trap_exit, true),
70 {ok, #mgr{db=Db, fd=Fd, get_group_info_fun=GetGroupInfoFun, update_group_info_fun=UpdateGroupInfoFun}}.
72 terminate(_Reason, _Mgr) ->
73 ok.
75 get_group_async(MgrPid, GroupId, OrigFrom) ->
76 gen_server:cast(MgrPid, {get_group_async, GroupId, OrigFrom}).
79 update_group(MgrPid, GroupId, UpdateNotifFun) ->
80 gen_server:cast(MgrPid, {update_group, GroupId, UpdateNotifFun}).
82 % stops any processing of the table and frees and cached values
83 free_groups(MgrPid, GroupIds) ->
84 gen_server:call(MgrPid, {free_groups, GroupIds}).
87 % called from the update process
88 handle_call({group_cache_update, GroupId, Group}, {FromPid, _FromRef}, Mgr) ->
89 Group2 = Group#table_group{compiled_doc_map=nil},
90 % the process may have been killed by the free groups call
91 % so check to make sure its alive.
92 case is_process_alive(FromPid) of
93 true ->
94 #mgr{cached_groups=CachedGroups} = Mgr,
95 CachedGroups2 = dict:store(GroupId, Group2, CachedGroups),
96 {reply, ok, Mgr#mgr{cached_groups=CachedGroups2}};
97 false ->
98 {reply, ok, Mgr}
99 end;
100 handle_call({free_groups, GroupIds}, _From, Mgr) ->
101 Mgr2 =
102 lists:foldl(fun(GroupId, MgrAcc) ->
103 #mgr{cached_groups=CachedGroups, update_state_by_uuid=ProcsDict} = MgrAcc,
104 CachedGroups2 = dict:erase(GroupId, CachedGroups),
105 case dict:find(GroupId, ProcsDict) of
106 {ok, {processing_request, _NotifyFuns, Pid}} ->
107 exit(Pid, freed);
108 {ok, {processing_and_pending_request, _NotifyFuns, _PendingUpdateNotifFuns, Pid}} ->
109 exit(Pid, freed);
110 _Else ->
112 end,
113 MgrAcc#mgr{cached_groups=CachedGroups2}
114 end,
115 Mgr, GroupIds),
116 {reply, ok, Mgr2}.
118 handle_cast({get_group_async, GroupId, OrigFrom}, Mgr) ->
119 #mgr{
120 db=Db,
121 fd=Fd,
122 get_group_info_fun=GetGroupInfoFun,
123 cached_groups=CachedGroups
124 } = Mgr,
125 case dict:find(GroupId, CachedGroups) of
126 {ok, CachedGroup} ->
127 gen_server:reply(OrigFrom, {ok, CachedGroup}),
128 {noreply, Mgr};
129 error ->
130 {Mgr2, Reply} =
131 case GetGroupInfoFun(GroupId) of
132 {ok, {NamedQueries, GroupState}} ->
133 {ok, Group} = open(Db, Fd, NamedQueries, GroupState),
134 NewMgr = Mgr#mgr{cached_groups=dict:store(GroupId, Group, CachedGroups)},
135 {NewMgr, {ok, Group}};
136 Else ->
137 {Mgr, Else}
138 end,
139 gen_server:reply(OrigFrom, Reply),
140 {noreply, Mgr2}
141 end;
142 handle_cast({update_group, GroupId, UpdateNotifFun}, Mgr) ->
143 #mgr{
144 update_state_by_uuid=ProcsDict,
145 uuids_by_pid=GroupIdDict,
146 db=Db,
147 fd=Fd,
148 get_group_info_fun=GetGroupInfoFun,
149 update_group_info_fun=UpdateGroupInfoFun,
150 cached_groups=CachedGroups
151 } = Mgr,
152 case dict:find(GroupId, ProcsDict) of
153 {ok, {processing_request, NotifyFuns, Pid}} ->
154 ProcsDict2 = dict:store(
155 GroupId,
156 {processing_and_pending_request, NotifyFuns, [UpdateNotifFun], Pid},
157 ProcsDict
159 {noreply, Mgr#mgr{update_state_by_uuid=ProcsDict2}};
160 {ok, {processing_and_pending_request, NotifyFuns, PendingUpdateNotifFuns, Pid}} ->
161 ProcsDict2 = dict:store(
162 GroupId,
163 {processing_and_pending_request, NotifyFuns, [UpdateNotifFun | PendingUpdateNotifFuns], Pid},
164 ProcsDict
166 {noreply, Mgr#mgr{update_state_by_uuid=ProcsDict2}};
167 error ->
168 case dict:find(GroupId, CachedGroups) of
169 {ok, Group} ->
170 Pid = spawn_link(couch_table_group, update_table_proc, [self(), Group, GroupId, UpdateGroupInfoFun, [UpdateNotifFun]]);
171 error ->
172 Pid = spawn_link(couch_table_group, update_table_proc, [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, [UpdateNotifFun]])
173 end,
174 ProcsDict2 = dict:store(GroupId, {processing_request, [UpdateNotifFun], Pid}, ProcsDict),
175 GroupIdDict2 = dict:store(Pid, GroupId, GroupIdDict),
176 {noreply, Mgr#mgr{update_state_by_uuid=ProcsDict2, uuids_by_pid=GroupIdDict2}}
177 end;
178 handle_cast(stop, Mgr) ->
179 {stop, normal, Mgr}. % causes terminate to be called
181 handle_info({'EXIT', FromPid, Reason}, Mgr) ->
182 #mgr{
183 update_state_by_uuid=ProcsDict,
184 uuids_by_pid=GroupIdDict,
185 db=Db,
186 fd=Fd,
187 get_group_info_fun=GetGroupInfoFun,
188 update_group_info_fun=UpdateGroupInfoFun,
189 cached_groups=CachedGroups
190 } = Mgr,
191 case dict:find(FromPid, GroupIdDict) of
192 {ok, GroupId} ->
193 case dict:find(GroupId, ProcsDict) of
194 {ok, {processing_request, NotifyFuns, _Pid}} ->
195 GroupIdDict2 = dict:erase(FromPid, GroupIdDict),
196 ProcsDict2 = dict:erase(GroupId, ProcsDict);
197 {ok, {processing_and_pending_request, NotifyFuns, NextNotifyFuns, _Pid}} ->
198 case dict:find(GroupId, CachedGroups) of
199 {ok, Group} ->
200 Pid = spawn_link(couch_table_group, update_table_proc, [self(), Group, GroupId, UpdateGroupInfoFun, NextNotifyFuns]);
201 error ->
202 Pid = spawn_link(couch_table_group, update_table_proc, [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, NextNotifyFuns])
203 end,
204 GroupIdDict2 = dict:store(Pid, GroupId, dict:erase(FromPid, GroupIdDict)),
205 ProcsDict2 = dict:store(GroupId, {processing_request, NextNotifyFuns, Pid}, ProcsDict)
206 end,
207 case Reason of
208 normal ->
210 {{nocatch, Error}, _Trace} ->
211 % process returned abnormally, notify any waiting listeners
212 [catch NotifyFun(Error) || NotifyFun <- NotifyFuns];
213 _Else ->
214 % process returned abnormally, notify any waiting listeners
215 [catch NotifyFun(Reason) || NotifyFun <- NotifyFuns]
216 end,
217 Mgr2 = Mgr#mgr{update_state_by_uuid=ProcsDict2, uuids_by_pid=GroupIdDict2},
218 {noreply, Mgr2};
219 error ->
220 % a linked process must have died, we propagate the error
221 exit(Reason)
222 end.
224 code_change(_OldVsn, State, _Extra) ->
225 {ok, State}.
228 open(Db, Fd, NamedQueries, GroupState) ->
229 open(Db, Fd, NamedQueries, GroupState, fun(_Table) -> ok end).
231 open(Db, Fd, NamedQueries, GroupState, UpdateNotifFun) ->
232 {TableBtreeStates, CurrentSeq, GroupIdBtreeState} =
233 case GroupState of
234 nil -> % new table group, init GroupState to nils
235 {[ nil || _Query <- NamedQueries], 0, nil};
236 GroupState ->
237 GroupState
238 end,
239 {Tables, _Count} =
240 lists:mapfoldl(fun({{Name, QueryString}, BtreeState}, Count) ->
241 {ok, Btree} = couch_btree:open(BtreeState, Fd, fun less_json/2),
242 {#table{name=Name, id_num=Count, btree=Btree, query_string=QueryString}, Count+1}
243 end,
244 0, lists:zip(NamedQueries, TableBtreeStates)),
245 {ok, GroupIdBtree} = couch_btree:open(GroupIdBtreeState, Fd),
246 TableGroup = #table_group{db=Db,
247 tables=Tables,
248 current_seq=CurrentSeq,
249 uuid_btree=GroupIdBtree,
250 update_notif_fun=UpdateNotifFun,
251 named_queries=NamedQueries},
252 {ok, TableGroup}.
255 get_info(#table_group{named_queries=NamedQueries, tables=Tables, current_seq=CurrentSeq, uuid_btree=GroupIdBtree} = _TableGroup) ->
256 TableBtreeStates = [couch_btree:get_state(Table#table.btree) || Table <- Tables],
257 {NamedQueries, {TableBtreeStates, CurrentSeq, couch_btree:get_state(GroupIdBtree)}}.
259 fold(TableGroup, TableName, Dir, Fun, Acc) ->
260 Result = fold_int(TableGroup#table_group.tables, TableName, Dir, Fun, Acc),
261 Result.
263 fold_int([], _TableName, _Dir, _Fun, _Acc) ->
264 {not_found, missing_named_table};
265 fold_int([Table | _RestTables], TableName, Dir, Fun, Acc) when Table#table.name == TableName ->
266 TotalRowCount = couch_btree:row_count(Table#table.btree),
267 WrapperFun = fun({{Key, DocId}, {Rev, Value}}, Offset, WrapperAcc) ->
268 Fun({DocId,Rev}, Key, Value, Offset, TotalRowCount, WrapperAcc)
269 end,
270 {ok, AccResult} = couch_btree:fold(Table#table.btree, Dir, WrapperFun, Acc),
271 {ok, TotalRowCount, AccResult};
272 fold_int([_Table | RestTables], TableName, Dir, Fun, Acc) ->
273 fold_int(RestTables, TableName, Dir, Fun, Acc).
276 fold(TableGroup, TableName, StartKey, Dir, Fun, Acc) ->
277 Result = fold_int(TableGroup#table_group.tables, TableName, StartKey, Dir, Fun, Acc),
278 Result.
280 fold_int([], _TableName, _StartKey, _Dir, _Fun, _Acc) ->
281 {not_found, missing_named_table};
282 fold_int([Table | _RestTables], TableName, StartKey, Dir, Fun, Acc) when Table#table.name == TableName ->
283 TotalRowCount = couch_btree:row_count(Table#table.btree),
284 WrapperFun = fun({{Key, DocId}, {Rev, Value}}, Offset, WrapperAcc) ->
285 Fun({DocId, Rev}, Key, Value, Offset, TotalRowCount, WrapperAcc)
286 end,
287 {ok, AccResult} = couch_btree:fold(Table#table.btree, StartKey, Dir, WrapperFun, Acc),
288 {ok, TotalRowCount, AccResult};
289 fold_int([_Table | RestTables], TableName, StartKey, Dir, Fun, Acc) ->
290 fold_int(RestTables, TableName, StartKey, Dir, Fun, Acc).
292 less_json(A, B) ->
293 TypeA = type_sort(A),
294 TypeB = type_sort(B),
296 TypeA == TypeB ->
297 less_same_type(A,B);
298 true ->
299 TypeA < TypeB
300 end.
302 type_sort(V) when is_atom(V) -> 0;
303 type_sort(V) when is_integer(V) -> 1;
304 type_sort(V) when is_float(V) -> 1;
305 type_sort(V) when is_list(V) -> 2;
306 type_sort({obj, _}) -> 4; % must come before tuple test below
307 type_sort(V) when is_tuple(V) -> 3;
308 type_sort(V) when is_binary(V) -> 5.
310 atom_sort(nil) -> 0;
311 atom_sort(null) -> 1;
312 atom_sort(false) -> 2;
313 atom_sort(true) -> 3.
315 less_same_type(A,B) when is_atom(A) ->
316 atom_sort(A) < atom_sort(B);
317 less_same_type(A,B) when is_list(A) ->
318 couch_util:collate(A, B) < 0;
319 less_same_type({obj, AProps}, {obj, BProps}) ->
320 less_props(AProps, BProps);
321 less_same_type(A, B) when is_tuple(A) ->
322 less_list(tuple_to_list(A),tuple_to_list(B));
323 less_same_type(A, B) ->
324 A < B.
326 ensure_list(V) when is_list(V) -> V;
327 ensure_list(V) when is_atom(V) -> atom_to_list(V).
329 less_props([], [_|_]) ->
330 true;
331 less_props(_, []) ->
332 false;
333 less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
334 case couch_util:collate(ensure_list(AKey), ensure_list(BKey)) of
335 -1 -> true;
336 1 -> false;
337 0 ->
338 case less_json(AValue, BValue) of
339 true -> true;
340 false ->
341 case less_json(BValue, AValue) of
342 true -> false;
343 false ->
344 less_props(RestA, RestB)
347 end.
349 less_list([], [_|_]) ->
350 true;
351 less_list(_, []) ->
352 false;
353 less_list([A|RestA], [B|RestB]) ->
354 case less_json(A,B) of
355 true -> true;
356 false ->
357 case less_json(B,A) of
358 true -> false;
359 false ->
360 less_list(RestA, RestB)
362 end.
365 notify(MgrPid, UpdateStatus, TableGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
366 GroupInfo = get_info(TableGroup),
367 ok = gen_server:call(MgrPid, {group_cache_update, GroupId, TableGroup}),
368 ok = UpdateGroupInfoFun(GroupId, UpdateStatus, GroupInfo),
370 StatusNotifyFuns2 = lists:foldl(fun(NotifyFun, AccFuns) ->
371 case (catch NotifyFun(UpdateStatus)) of
372 NewNotifyFun when is_function(NewNotifyFun) ->
373 [NewNotifyFun | AccFuns];
374 _Else ->
375 AccFuns
377 end,
378 [], StatusNotifyFuns),
379 fun(UpdateStatus2, TableGroup2) ->
380 notify(MgrPid, UpdateStatus2, TableGroup2, GroupId, UpdateGroupInfoFun, StatusNotifyFuns2)
381 end.
383 update_table_proc(MgrPid, #table_group{} = TableGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
384 NotifyFun =
385 fun(UpdateStatus, TableGroup2) ->
386 notify(MgrPid, UpdateStatus, TableGroup2, GroupId, UpdateGroupInfoFun, StatusNotifyFuns)
387 end,
388 update_int(TableGroup, NotifyFun).
390 update_table_proc(MgrPid, Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, StatusNotifyFuns) ->
391 case GetGroupInfoFun(GroupId) of
392 {ok, {NamedQueries, GroupState}} ->
393 {ok, TableGroup} = open(Db, Fd, NamedQueries, GroupState),
394 update_table_proc(MgrPid, TableGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns);
395 Error ->
396 exit(Error)
397 end.
399 update_int(TableGroup, NotifyFun) ->
400 #table_group{
401 db=Db,
402 tables=Tables,
403 current_seq=CurrentSeq
404 } = TableGroup,
405 TableEmptyKVs = [{Table, []} || Table <- Tables],
407 % compute on all docs modified since we last computed.
408 {ok, {UncomputedDocs, TableGroup2, TableKVsToAdd, DocIdTableIdKeys, NewSeq, NotifyFun2}}
409 = couch_db:enum_docs_since(
411 CurrentSeq,
412 fun(DocInfo, Acc) -> process_doc(Db, DocInfo, Acc) end,
413 {[], TableGroup, TableEmptyKVs, [], CurrentSeq, NotifyFun}
416 {TableGroup3, Results} = table_compute(TableGroup2, UncomputedDocs),
417 {TableKVsToAdd2, DocIdTableIdKeys2} = table_insert_query_results(UncomputedDocs, Results, TableKVsToAdd, DocIdTableIdKeys),
418 couch_js:stop_doc_map(TableGroup3#table_group.compiled_doc_map),
419 {ok, TableGroup4} = write_changes(TableGroup3, TableKVsToAdd2, DocIdTableIdKeys2, NewSeq),
420 NotifyFun2(complete, TableGroup4),
421 {ok, TableGroup4}.
424 process_doc(Db, DocInfo, {Docs, TableGroup, TableKVs, DocIdTableIdKeys, _LastSeq, NotifyFun}) ->
425 % This fun computes once for each document
426 #doc_info{uuid=DocId,
427 update_seq=Seq} = DocInfo,
428 case couch_doc:is_special_doc(DocId) of
429 true ->
430 % skip this doc
431 {ok, {Docs, TableGroup, TableKVs, DocIdTableIdKeys, _LastSeq, NotifyFun}};
432 false ->
433 {Docs2, DocIdTableIdKeys2} =
434 case couch_db:open_doc(Db, DocInfo) of
435 {ok, Doc} ->
436 {[Doc | Docs], DocIdTableIdKeys};
437 {not_found, deleted} ->
438 {Docs, [{DocId, []} | DocIdTableIdKeys]}
439 end,
440 case process_info(self(), memory) of
441 {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
442 {TableGroup1, Results} = table_compute(TableGroup, Docs2),
443 {TableKVs3, DocIdTableIdKeys3} = table_insert_query_results(Docs2, Results, TableKVs, DocIdTableIdKeys2),
444 {ok, TableGroup2} = write_changes(TableGroup1, TableKVs3, DocIdTableIdKeys3, Seq),
445 garbage_collect(),
446 NotifyFun2 = NotifyFun(partial, TableGroup2),
447 TableEmptyKeyValues = [{Table, []} || Table <- TableGroup2#table_group.tables],
448 {ok, {[], TableGroup2, TableEmptyKeyValues, [], Seq, NotifyFun2}};
449 _Else ->
450 {ok, {Docs2, TableGroup, TableKVs, DocIdTableIdKeys2, Seq, NotifyFun}}
452 end.
454 table_insert_query_results([], [], TableKVs, DocIdTableIdKeysAcc) ->
455 {TableKVs, DocIdTableIdKeysAcc};
456 table_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], TableKVs, DocIdTableIdKeysAcc) ->
457 {NewTableKVs, NewTableIdKeys} = table_insert_doc_query_results(Doc, QueryResults, TableKVs, [], []),
458 NewDocIdTableIdKeys = [{Doc#doc.uuid, NewTableIdKeys} | DocIdTableIdKeysAcc],
459 table_insert_query_results(RestDocs, RestResults, NewTableKVs, NewDocIdTableIdKeys).
462 table_insert_doc_query_results(_Doc, [], [], TableKVsAcc, TableIdKeysAcc) ->
463 {lists:reverse(TableKVsAcc), lists:reverse(TableIdKeysAcc)};
464 table_insert_doc_query_results(#doc{uuid=DocId, revisions=[Rev|_]}=Doc, [ResultKVs|RestResults], [{Table, KVs}|RestTableKVs], TableKVsAcc, TableIdKeysAcc) ->
465 NewKVs = [{{Key, DocId}, {Rev, Value}} || {Key, Value} <- ResultKVs],
466 NewTableIdKeys = [{Table#table.id_num, Key} || {Key, _Value} <- ResultKVs],
467 NewTableKVsAcc = [{Table, NewKVs ++ KVs} | TableKVsAcc],
468 NewTableIdKeysAcc = NewTableIdKeys ++ TableIdKeysAcc,
469 table_insert_doc_query_results(Doc, RestResults, RestTableKVs, NewTableKVsAcc, NewTableIdKeysAcc).
471 table_compute(TableGroup, []) ->
472 {TableGroup, []};
473 table_compute(#table_group{compiled_doc_map=DocMap}=TableGroup, Docs) ->
474 DocMap1 =
475 case DocMap of
476 nil -> % doc map not started
477 {ok, DocMap0} = couch_js:start_doc_map(queries(TableGroup)),
478 DocMap0;
479 _ ->
480 DocMap
481 end,
482 {ok, Results} = couch_js:map_docs(DocMap1, Docs),
483 {TableGroup#table_group{compiled_doc_map=DocMap1}, Results}.
485 queries(TableGroup) ->
486 [Table#table.query_string || Table <- TableGroup#table_group.tables].
489 dict_find(Key, DefaultValue, Dict) ->
490 case dict:find(Key, Dict) of
491 {ok, Value} ->
492 Value;
493 error ->
494 DefaultValue
495 end.
497 write_changes(TableGroup, TableKeyValuesToAdd, DocIdTableIdKeys, NewSeq) ->
498 #table_group{uuid_btree=GroupIdBtree} = TableGroup,
500 AddDocIdTableIdKeys = [{DocId, TableIdKeys} || {DocId, TableIdKeys} <- DocIdTableIdKeys, TableIdKeys /= []],
501 RemoveDocIds = [DocId || {DocId, TableIdKeys} <- DocIdTableIdKeys, TableIdKeys == []],
502 LookupDocIds = [DocId || {DocId, _TableIdKeys} <- DocIdTableIdKeys],
504 {ok, LookupResults, GroupIdBtree2}
505 = couch_btree:query_modify(GroupIdBtree, LookupDocIds, AddDocIdTableIdKeys, RemoveDocIds),
506 KeysToRemoveByTable = lists:foldl(
507 fun(LookupResult, KeysToRemoveByTableAcc) ->
508 case LookupResult of
509 {ok, {DocId, TableIdKeys}} ->
510 lists:foldl(
511 fun({TableId, Key}, KeysToRemoveByTableAcc2) ->
512 dict:append(TableId, {Key, DocId}, KeysToRemoveByTableAcc2)
513 end,
514 KeysToRemoveByTableAcc, TableIdKeys);
515 {not_found, _} ->
516 KeysToRemoveByTableAcc
518 end,
519 dict:new(), LookupResults),
521 Tables2 = [
522 begin
523 KeysToRemove = dict_find(Table#table.id_num, [], KeysToRemoveByTable),
524 {ok, TableBtree2} = couch_btree:add_remove(Table#table.btree, AddKeyValues, KeysToRemove),
525 Table#table{btree = TableBtree2}
528 {Table, AddKeyValues} <- TableKeyValuesToAdd
530 TableGroup2 = TableGroup#table_group{tables=Tables2, current_seq=NewSeq, uuid_btree=GroupIdBtree2},
531 {ok, TableGroup2}.