2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_table_group
).
19 -behaviour(gen_server
).
21 -export([start_manager
/4,stop
/1,
22 open
/4,open
/5,fold
/5,fold
/6,
23 update_group
/3,update_table_proc
/5,free_groups
/2,
24 update_table_proc
/7,get_group_async
/3,
26 -export([init
/1,terminate
/2,handle_call
/3,handle_cast
/2,handle_info
/2,code_change
/3]).
28 -include("couch_db.hrl").
30 % arbitrarily chosen amount of memory to use before flushing to disk
31 -define(FLUSH_MAX_MEM
, 1000000).
40 compiled_doc_map
= nil
54 update_state_by_uuid
= dict:new(),
55 uuids_by_pid
= dict:new(),
57 update_group_info_fun
,
58 cached_groups
=dict:new()
% Starts the table group manager as a linked gen_server.
% The four arguments are packed into one tuple and handed to init/1.
% Returns whatever gen_server:start_link/3 returns.
start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) ->
    InitArgs = {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun},
    gen_server:start_link(couch_table_group, InitArgs, []).
66 gen_server:cast(MgrPid
, stop
).
% gen_server callback: builds the initial manager state from the tuple
% passed by start_manager/4.
% Exits are trapped so that the death of a linked process is delivered as
% an 'EXIT' message to handle_info/2 instead of taking the manager down.
init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) ->
    process_flag(trap_exit, true),
    InitialState = #mgr{
        db = Db,
        fd = Fd,
        get_group_info_fun = GetGroupInfoFun,
        update_group_info_fun = UpdateGroupInfoFun
    },
    {ok, InitialState}.
72 terminate(_Reason
, _Mgr
) ->
% Asynchronously asks the manager for the group identified by GroupId.
% The request is a cast, so this returns immediately; the manager delivers
% its answer to OrigFrom with gen_server:reply/2.
get_group_async(MgrPid, GroupId, OrigFrom) ->
    Request = {get_group_async, GroupId, OrigFrom},
    gen_server:cast(MgrPid, Request).
% Asynchronously asks the manager to update the group identified by GroupId.
% UpdateNotifFun is handed to the manager together with the request so it
% can be notified about the update; the cast itself returns immediately.
update_group(MgrPid, GroupId, UpdateNotifFun) ->
    Request = {update_group, GroupId, UpdateNotifFun},
    gen_server:cast(MgrPid, Request).
% Stops any processing of the given table groups and frees any cached
% values held for them.
% This is a synchronous call: it returns the manager's reply to the
% {free_groups, GroupIds} request.
free_groups(MgrPid, GroupIds) ->
    Request = {free_groups, GroupIds},
    gen_server:call(MgrPid, Request).
87 % called from the update process
88 handle_call({group_cache_update
, GroupId
, Group
}, {FromPid
, _FromRef
}, Mgr
) ->
89 Group2
= Group#table_group
{compiled_doc_map
=nil
},
90 % the process may have been killed by the free groups call
91 % so check to make sure it's alive.
92 case is_process_alive(FromPid
) of
94 #mgr
{cached_groups
=CachedGroups
} = Mgr
,
95 CachedGroups2
= dict:store(GroupId
, Group2
, CachedGroups
),
96 {reply
, ok
, Mgr#mgr
{cached_groups
=CachedGroups2
}};
100 handle_call({free_groups
, GroupIds
}, _From
, Mgr
) ->
102 lists:foldl(fun(GroupId
, MgrAcc
) ->
103 #mgr
{cached_groups
=CachedGroups
, update_state_by_uuid
=ProcsDict
} = MgrAcc
,
104 CachedGroups2
= dict:erase(GroupId
, CachedGroups
),
105 case dict:find(GroupId
, ProcsDict
) of
106 {ok
, {processing_request
, _NotifyFuns
, Pid
}} ->
108 {ok
, {processing_and_pending_request
, _NotifyFuns
, _PendingUpdateNotifFuns
, Pid
}} ->
113 MgrAcc#mgr
{cached_groups
=CachedGroups2
}
118 handle_cast({get_group_async
, GroupId
, OrigFrom
}, Mgr
) ->
122 get_group_info_fun
=GetGroupInfoFun
,
123 cached_groups
=CachedGroups
125 case dict:find(GroupId
, CachedGroups
) of
127 gen_server:reply(OrigFrom
, {ok
, CachedGroup
}),
131 case GetGroupInfoFun(GroupId
) of
132 {ok
, {NamedQueries
, GroupState
}} ->
133 {ok
, Group
} = open(Db
, Fd
, NamedQueries
, GroupState
),
134 NewMgr
= Mgr#mgr
{cached_groups
=dict:store(GroupId
, Group
, CachedGroups
)},
135 {NewMgr
, {ok
, Group
}};
139 gen_server:reply(OrigFrom
, Reply
),
142 handle_cast({update_group
, GroupId
, UpdateNotifFun
}, Mgr
) ->
144 update_state_by_uuid
=ProcsDict
,
145 uuids_by_pid
=GroupIdDict
,
148 get_group_info_fun
=GetGroupInfoFun
,
149 update_group_info_fun
=UpdateGroupInfoFun
,
150 cached_groups
=CachedGroups
152 case dict:find(GroupId
, ProcsDict
) of
153 {ok
, {processing_request
, NotifyFuns
, Pid
}} ->
154 ProcsDict2
= dict:store(
156 {processing_and_pending_request
, NotifyFuns
, [UpdateNotifFun
], Pid
},
159 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
}};
160 {ok
, {processing_and_pending_request
, NotifyFuns
, PendingUpdateNotifFuns
, Pid
}} ->
161 ProcsDict2
= dict:store(
163 {processing_and_pending_request
, NotifyFuns
, [UpdateNotifFun
| PendingUpdateNotifFuns
], Pid
},
166 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
}};
168 case dict:find(GroupId
, CachedGroups
) of
170 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Group
, GroupId
, UpdateGroupInfoFun
, [UpdateNotifFun
]]);
172 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, [UpdateNotifFun
]])
174 ProcsDict2
= dict:store(GroupId
, {processing_request
, [UpdateNotifFun
], Pid
}, ProcsDict
),
175 GroupIdDict2
= dict:store(Pid
, GroupId
, GroupIdDict
),
176 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
, uuids_by_pid
=GroupIdDict2
}}
handle_cast(stop, State) ->
    % returning a stop tuple causes terminate to be called
    {stop, normal, State}.
181 handle_info({'EXIT', FromPid
, Reason
}, Mgr
) ->
183 update_state_by_uuid
=ProcsDict
,
184 uuids_by_pid
=GroupIdDict
,
187 get_group_info_fun
=GetGroupInfoFun
,
188 update_group_info_fun
=UpdateGroupInfoFun
,
189 cached_groups
=CachedGroups
191 case dict:find(FromPid
, GroupIdDict
) of
193 case dict:find(GroupId
, ProcsDict
) of
194 {ok
, {processing_request
, NotifyFuns
, _Pid
}} ->
195 GroupIdDict2
= dict:erase(FromPid
, GroupIdDict
),
196 ProcsDict2
= dict:erase(GroupId
, ProcsDict
);
197 {ok
, {processing_and_pending_request
, NotifyFuns
, NextNotifyFuns
, _Pid
}} ->
198 case dict:find(GroupId
, CachedGroups
) of
200 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Group
, GroupId
, UpdateGroupInfoFun
, NextNotifyFuns
]);
202 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, NextNotifyFuns
])
204 GroupIdDict2
= dict:store(Pid
, GroupId
, dict:erase(FromPid
, GroupIdDict
)),
205 ProcsDict2
= dict:store(GroupId
, {processing_request
, NextNotifyFuns
, Pid
}, ProcsDict
)
210 {{nocatch
, Error
}, _Trace
} ->
211 % process returned abnormally, notify any waiting listeners
212 [catch NotifyFun(Error
) || NotifyFun
<- NotifyFuns
];
214 % process returned abnormally, notify any waiting listeners
215 [catch NotifyFun(Reason
) || NotifyFun
<- NotifyFuns
]
217 Mgr2
= Mgr#mgr
{update_state_by_uuid
=ProcsDict2
, uuids_by_pid
=GroupIdDict2
},
220 % a linked process must have died, we propagate the error
224 code_change(_OldVsn
, State
, _Extra
) ->
% Opens a table group without an update notification callback.
% Delegates to open/5 with a fun that ignores its argument and returns ok.
open(Db, Fd, NamedQueries, GroupState) ->
    IgnoreNotif = fun(_Table) -> ok end,
    open(Db, Fd, NamedQueries, GroupState, IgnoreNotif).
231 open(Db
, Fd
, NamedQueries
, GroupState
, UpdateNotifFun
) ->
232 {TableBtreeStates
, CurrentSeq
, GroupIdBtreeState
} =
234 nil
-> % new table group, init GroupState to nils
235 {[ nil
|| _Query
<- NamedQueries
], 0, nil
};
240 lists:mapfoldl(fun({{Name
, QueryString
}, BtreeState
}, Count
) ->
241 {ok
, Btree
} = couch_btree:open(BtreeState
, Fd
, fun less_json
/2),
242 {#table
{name
=Name
, id_num
=Count
, btree
=Btree
, query_string
=QueryString
}, Count
+1}
244 0, lists:zip(NamedQueries
, TableBtreeStates
)),
245 {ok
, GroupIdBtree
} = couch_btree:open(GroupIdBtreeState
, Fd
),
246 TableGroup
= #table_group
{db
=Db
,
248 current_seq
=CurrentSeq
,
249 uuid_btree
=GroupIdBtree
,
250 update_notif_fun
=UpdateNotifFun
,
251 named_queries
=NamedQueries
},
% Extracts the group info of a table group.
% Returns {NamedQueries, {TableBtreeStates, CurrentSeq, GroupIdBtreeState}},
% where the btree states are obtained with couch_btree:get_state/1 for each
% table's btree (in table order) and for the group's uuid btree.
get_info(#table_group{} = TableGroup) ->
    #table_group{
        named_queries = NamedQueries,
        tables = Tables,
        current_seq = CurrentSeq,
        uuid_btree = GroupIdBtree
    } = TableGroup,
    TableBtreeStates = [couch_btree:get_state(T#table.btree) || T <- Tables],
    GroupIdBtreeState = couch_btree:get_state(GroupIdBtree),
    GroupState = {TableBtreeStates, CurrentSeq, GroupIdBtreeState},
    {NamedQueries, GroupState}.
259 fold(TableGroup
, TableName
, Dir
, Fun
, Acc
) ->
260 Result
= fold_int(TableGroup#table_group
.tables
, TableName
, Dir
, Fun
, Acc
),
263 fold_int([], _TableName
, _Dir
, _Fun
, _Acc
) ->
264 {not_found
, missing_named_table
};
265 fold_int([Table
| _RestTables
], TableName
, Dir
, Fun
, Acc
) when Table#table
.name
== TableName
->
266 TotalRowCount
= couch_btree:row_count(Table#table
.btree
),
267 WrapperFun
= fun({{Key
, DocId
}, {Rev
, Value
}}, Offset
, WrapperAcc
) ->
268 Fun({DocId
,Rev
}, Key
, Value
, Offset
, TotalRowCount
, WrapperAcc
)
270 {ok
, AccResult
} = couch_btree:fold(Table#table
.btree
, Dir
, WrapperFun
, Acc
),
271 {ok
, TotalRowCount
, AccResult
};
272 fold_int([_Table
| RestTables
], TableName
, Dir
, Fun
, Acc
) ->
273 fold_int(RestTables
, TableName
, Dir
, Fun
, Acc
).
276 fold(TableGroup
, TableName
, StartKey
, Dir
, Fun
, Acc
) ->
277 Result
= fold_int(TableGroup#table_group
.tables
, TableName
, StartKey
, Dir
, Fun
, Acc
),
280 fold_int([], _TableName
, _StartKey
, _Dir
, _Fun
, _Acc
) ->
281 {not_found
, missing_named_table
};
282 fold_int([Table
| _RestTables
], TableName
, StartKey
, Dir
, Fun
, Acc
) when Table#table
.name
== TableName
->
283 TotalRowCount
= couch_btree:row_count(Table#table
.btree
),
284 WrapperFun
= fun({{Key
, DocId
}, {Rev
, Value
}}, Offset
, WrapperAcc
) ->
285 Fun({DocId
, Rev
}, Key
, Value
, Offset
, TotalRowCount
, WrapperAcc
)
287 {ok
, AccResult
} = couch_btree:fold(Table#table
.btree
, StartKey
, Dir
, WrapperFun
, Acc
),
288 {ok
, TotalRowCount
, AccResult
};
289 fold_int([_Table
| RestTables
], TableName
, StartKey
, Dir
, Fun
, Acc
) ->
290 fold_int(RestTables
, TableName
, StartKey
, Dir
, Fun
, Acc
).
293 TypeA
= type_sort(A
),
294 TypeB
= type_sort(B
),
% Returns the integer rank of a value's type, used to compare values of
% different types. Ranks: atom 0, number 1, list 2, tuple 3, {obj, _} 4,
% binary 5. Any other term fails with function_clause.
type_sort(Value) when is_atom(Value)   -> 0;
type_sort(Value) when is_number(Value) -> 1; % integers and floats share a rank
type_sort(Value) when is_list(Value)   -> 2;
type_sort({obj, _Props})               -> 4; % must come before the generic tuple clause
type_sort(Value) when is_tuple(Value)  -> 3;
type_sort(Value) when is_binary(Value) -> 5.
% Rank of the three atoms that can be compared with each other:
% null sorts first, then false, then true.
% Any other atom fails with function_clause.
atom_sort(true)  -> 3;
atom_sort(false) -> 2;
atom_sort(null)  -> 1.
315 less_same_type(A
,B
) when is_atom(A
) ->
316 atom_sort(A
) < atom_sort(B
);
317 less_same_type(A
,B
) when is_list(A
) ->
318 couch_util:collate(A
, B
) < 0;
319 less_same_type({obj
, AProps
}, {obj
, BProps
}) ->
320 less_props(AProps
, BProps
);
321 less_same_type(A
, B
) when is_tuple(A
) ->
322 less_list(tuple_to_list(A
),tuple_to_list(B
));
323 less_same_type(A
, B
) ->
% Normalises a key to a list: atoms are converted with atom_to_list/1,
% lists are returned unchanged. Other terms fail with function_clause.
ensure_list(Key) when is_atom(Key) -> atom_to_list(Key);
ensure_list(Key) when is_list(Key) -> Key.
329 less_props([], [_
|_
]) ->
333 less_props([{AKey
, AValue
}|RestA
], [{BKey
, BValue
}|RestB
]) ->
334 case couch_util:collate(ensure_list(AKey
), ensure_list(BKey
)) of
338 case less_json(AValue
, BValue
) of
341 case less_json(BValue
, AValue
) of
344 less_props(RestA
, RestB
)
349 less_list([], [_
|_
]) ->
353 less_list([A
|RestA
], [B
|RestB
]) ->
354 case less_json(A
,B
) of
357 case less_json(B
,A
) of
360 less_list(RestA
, RestB
)
365 notify(MgrPid
, UpdateStatus
, TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
366 GroupInfo
= get_info(TableGroup
),
367 ok
= gen_server:call(MgrPid
, {group_cache_update
, GroupId
, TableGroup
}),
368 ok
= UpdateGroupInfoFun(GroupId
, UpdateStatus
, GroupInfo
),
370 StatusNotifyFuns2
= lists:foldl(fun(NotifyFun
, AccFuns
) ->
371 case (catch NotifyFun(UpdateStatus
)) of
372 NewNotifyFun
when is_function(NewNotifyFun
) ->
373 [NewNotifyFun
| AccFuns
];
378 [], StatusNotifyFuns
),
379 fun(UpdateStatus2
, TableGroup2
) ->
380 notify(MgrPid
, UpdateStatus2
, TableGroup2
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns2
)
383 update_table_proc(MgrPid
, #table_group
{} = TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
385 fun(UpdateStatus
, TableGroup2
) ->
386 notify(MgrPid
, UpdateStatus
, TableGroup2
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
)
388 update_int(TableGroup
, NotifyFun
).
390 update_table_proc(MgrPid
, Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
391 case GetGroupInfoFun(GroupId
) of
392 {ok
, {NamedQueries
, GroupState
}} ->
393 {ok
, TableGroup
} = open(Db
, Fd
, NamedQueries
, GroupState
),
394 update_table_proc(MgrPid
, TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
);
399 update_int(TableGroup
, NotifyFun
) ->
403 current_seq
=CurrentSeq
405 TableEmptyKVs
= [{Table
, []} || Table
<- Tables
],
407 % compute on all docs modified since we last computed.
408 {ok
, {UncomputedDocs
, TableGroup2
, TableKVsToAdd
, DocIdTableIdKeys
, NewSeq
, NotifyFun2
}}
409 = couch_db:enum_docs_since(
412 fun(DocInfo
, Acc
) -> process_doc(Db
, DocInfo
, Acc
) end,
413 {[], TableGroup
, TableEmptyKVs
, [], CurrentSeq
, NotifyFun
}
416 {TableGroup3
, Results
} = table_compute(TableGroup2
, UncomputedDocs
),
417 {TableKVsToAdd2
, DocIdTableIdKeys2
} = table_insert_query_results(UncomputedDocs
, Results
, TableKVsToAdd
, DocIdTableIdKeys
),
418 couch_js:stop_doc_map(TableGroup3#table_group
.compiled_doc_map
),
419 {ok
, TableGroup4
} = write_changes(TableGroup3
, TableKVsToAdd2
, DocIdTableIdKeys2
, NewSeq
),
420 NotifyFun2(complete
, TableGroup4
),
424 process_doc(Db
, DocInfo
, {Docs
, TableGroup
, TableKVs
, DocIdTableIdKeys
, _LastSeq
, NotifyFun
}) ->
425 % This fun computes once for each document
426 #doc_info
{uuid
=DocId
,
427 update_seq
=Seq
} = DocInfo
,
428 case couch_doc:is_special_doc(DocId
) of
431 {ok
, {Docs
, TableGroup
, TableKVs
, DocIdTableIdKeys
, _LastSeq
, NotifyFun
}};
433 {Docs2
, DocIdTableIdKeys2
} =
434 case couch_db:open_doc(Db
, DocInfo
) of
436 {[Doc
| Docs
], DocIdTableIdKeys
};
437 {not_found
, deleted
} ->
438 {Docs
, [{DocId
, []} | DocIdTableIdKeys
]}
440 case process_info(self(), memory
) of
441 {memory
, Mem
} when Mem
> ?FLUSH_MAX_MEM
->
442 {TableGroup1
, Results
} = table_compute(TableGroup
, Docs2
),
443 {TableKVs3
, DocIdTableIdKeys3
} = table_insert_query_results(Docs2
, Results
, TableKVs
, DocIdTableIdKeys2
),
444 {ok
, TableGroup2
} = write_changes(TableGroup1
, TableKVs3
, DocIdTableIdKeys3
, Seq
),
446 NotifyFun2
= NotifyFun(partial
, TableGroup2
),
447 TableEmptyKeyValues
= [{Table
, []} || Table
<- TableGroup2#table_group
.tables
],
448 {ok
, {[], TableGroup2
, TableEmptyKeyValues
, [], Seq
, NotifyFun2
}};
450 {ok
, {Docs2
, TableGroup
, TableKVs
, DocIdTableIdKeys2
, Seq
, NotifyFun
}}
% Walks the documents and their query results in lockstep.
% For every document the per-table results are merged into TableKVs, and a
% {DocUuid, TableIdKeys} entry is pushed onto DocIdTableIdKeysAcc.
% Returns {TableKVs, DocIdTableIdKeysAcc} once both lists are exhausted;
% lists of different length fail with function_clause.
table_insert_query_results([], [], TableKVs, DocIdTableIdKeysAcc) ->
    {TableKVs, DocIdTableIdKeysAcc};
table_insert_query_results([Doc | MoreDocs], [DocResults | MoreResults],
        TableKVs, DocIdTableIdKeysAcc) ->
    {TableKVs2, TableIdKeys} =
        table_insert_doc_query_results(Doc, DocResults, TableKVs, [], []),
    DocEntry = {Doc#doc.uuid, TableIdKeys},
    table_insert_query_results(MoreDocs, MoreResults, TableKVs2,
        [DocEntry | DocIdTableIdKeysAcc]).
% Merges one document's query results into the per-table key/value lists.
% The result lists and the {Table, KVs} list are consumed in lockstep, one
% element per table. For each table:
%   - every {Key, Value} result becomes {{Key, DocId}, {Rev, Value}} and is
%     put in front of that table's existing KVs;
%   - every key is recorded as {TableIdNum, Key}.
% Returns {TableKVs, TableIdKeys}; both accumulators are reversed at the
% end, so TableKVs comes back in the original table order.
table_insert_doc_query_results(_Doc, [], [], TableKVsAcc, TableIdKeysAcc) ->
    {lists:reverse(TableKVsAcc), lists:reverse(TableIdKeysAcc)};
table_insert_doc_query_results(#doc{uuid = DocId, revisions = [Rev | _]} = Doc,
        [ResultKVs | RestResults], [{Table, KVs} | RestTableKVs],
        TableKVsAcc, TableIdKeysAcc) ->
    % build the btree entries and the id/key pairs in a single pass
    {FreshKVs, FreshIdKeys} = lists:unzip(
        [{{{Key, DocId}, {Rev, Value}}, {Table#table.id_num, Key}}
            || {Key, Value} <- ResultKVs]),
    table_insert_doc_query_results(Doc, RestResults, RestTableKVs,
        [{Table, FreshKVs ++ KVs} | TableKVsAcc],
        FreshIdKeys ++ TableIdKeysAcc).
471 table_compute(TableGroup
, []) ->
473 table_compute(#table_group
{compiled_doc_map
=DocMap
}=TableGroup
, Docs
) ->
476 nil
-> % doc map not started
477 {ok
, DocMap0
} = couch_js:start_doc_map(queries(TableGroup
)),
482 {ok
, Results
} = couch_js:map_docs(DocMap1
, Docs
),
483 {TableGroup#table_group
{compiled_doc_map
=DocMap1
}, Results
}.
% Returns the query string of every table in the group, in table order.
queries(TableGroup) ->
    Tables = TableGroup#table_group.tables,
    [T#table.query_string || T <- Tables].
489 dict_find(Key
, DefaultValue
, Dict
) ->
490 case dict:find(Key
, Dict
) of
497 write_changes(TableGroup
, TableKeyValuesToAdd
, DocIdTableIdKeys
, NewSeq
) ->
498 #table_group
{uuid_btree
=GroupIdBtree
} = TableGroup
,
500 AddDocIdTableIdKeys
= [{DocId
, TableIdKeys
} || {DocId
, TableIdKeys
} <- DocIdTableIdKeys
, TableIdKeys
/= []],
501 RemoveDocIds
= [DocId
|| {DocId
, TableIdKeys
} <- DocIdTableIdKeys
, TableIdKeys
== []],
502 LookupDocIds
= [DocId
|| {DocId
, _TableIdKeys
} <- DocIdTableIdKeys
],
504 {ok
, LookupResults
, GroupIdBtree2
}
505 = couch_btree:query_modify(GroupIdBtree
, LookupDocIds
, AddDocIdTableIdKeys
, RemoveDocIds
),
506 KeysToRemoveByTable
= lists:foldl(
507 fun(LookupResult
, KeysToRemoveByTableAcc
) ->
509 {ok
, {DocId
, TableIdKeys
}} ->
511 fun({TableId
, Key
}, KeysToRemoveByTableAcc2
) ->
512 dict:append(TableId
, {Key
, DocId
}, KeysToRemoveByTableAcc2
)
514 KeysToRemoveByTableAcc
, TableIdKeys
);
516 KeysToRemoveByTableAcc
519 dict:new(), LookupResults
),
523 KeysToRemove
= dict_find(Table#table
.id_num
, [], KeysToRemoveByTable
),
524 {ok
, TableBtree2
} = couch_btree:add_remove(Table#table
.btree
, AddKeyValues
, KeysToRemove
),
525 Table#table
{btree
= TableBtree2
}
528 {Table
, AddKeyValues
} <- TableKeyValuesToAdd
530 TableGroup2
= TableGroup#table_group
{tables
=Tables2
, current_seq
=NewSeq
, uuid_btree
=GroupIdBtree2
},