2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_table_group
).
19 -behaviour(gen_server
).
21 -export([start_manager
/4,stop
/1,
22 open
/4,open
/5,fold
/5,fold
/6,
23 update_group
/3,update_table_proc
/5,free_groups
/2,
24 update_table_proc
/7,get_group_async
/3,
26 -export([init
/1,terminate
/2,handle_call
/3,handle_cast
/2,handle_info
/2,code_change
/3]).
28 -include("couch_db.hrl").
30 % arbitrarily chosen amount of memory to use before flushing to disk
31 -define(FLUSH_MAX_MEM
, 1000000).
53 update_state_by_uuid
= dict:new(),
54 uuids_by_pid
= dict:new(),
56 update_group_info_fun
,
57 cached_groups
=dict:new()
% Starts a table-group manager server linked to the calling process.
% The four arguments are packed into one tuple, which init/1 unpacks.
start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) ->
    InitArgs = {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun},
    gen_server:start_link(couch_table_group, InitArgs, []).
65 gen_server:cast(MgrPid
, stop
).
% gen_server callback. Turns on exit trapping, so that the death of a
% spawn_link'ed update process arrives as an {'EXIT', Pid, Reason} message
% for handle_info/2 instead of killing the manager. Then it builds the
% initial #mgr{} state. Fields not set here keep their record defaults.
init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) ->
    process_flag(trap_exit, true),
    InitialState = #mgr{
        db = Db,
        fd = Fd,
        get_group_info_fun = GetGroupInfoFun,
        update_group_info_fun = UpdateGroupInfoFun
    },
    {ok, InitialState}.
71 terminate(_Reason
, _Mgr
) ->
% Asks the manager MgrPid, without blocking, for the table group GroupId.
% The manager later answers OrigFrom directly via gen_server:reply/2.
% This function itself always returns ok right away.
get_group_async(MgrPid, GroupId, OrigFrom) ->
    Request = {get_group_async, GroupId, OrigFrom},
    gen_server:cast(MgrPid, Request).
% Asks the manager MgrPid, without blocking, to bring group GroupId up to
% date. UpdateNotifFun is attached to that group's update process so it
% hears about the update's progress and outcome. Always returns ok at once.
update_group(MgrPid, GroupId, UpdateNotifFun) ->
    Request = {update_group, GroupId, UpdateNotifFun},
    gen_server:cast(MgrPid, Request).
% Synchronously stops any in-progress processing of each group in GroupIds
% and frees any cached values for those groups. Returns whatever the
% manager replies.
free_groups(MgrPid, GroupIds) ->
    Request = {free_groups, GroupIds},
    gen_server:call(MgrPid, Request).
86 % called from the update process
87 handle_call({group_cache_update
, GroupId
, Group
}, {FromPid
, _FromRef
}, Mgr
) ->
88 % the process may have been killed by the free groups call
89 % so check to make sure its alive.
90 case is_process_alive(FromPid
) of
92 #mgr
{cached_groups
=CachedGroups
} = Mgr
,
93 CachedGroups2
= dict:store(GroupId
, Group
, CachedGroups
),
94 {reply
, ok
, Mgr#mgr
{cached_groups
=CachedGroups2
}};
98 handle_call({free_groups
, GroupIds
}, _From
, Mgr
) ->
100 lists:foldl(fun(GroupId
, MgrAcc
) ->
101 #mgr
{cached_groups
=CachedGroups
, update_state_by_uuid
=ProcsDict
} = MgrAcc
,
102 CachedGroups2
= dict:erase(GroupId
, CachedGroups
),
103 case dict:find(GroupId
, ProcsDict
) of
104 {ok
, {processing_request
, _NotifyFuns
, Pid
}} ->
106 {ok
, {processing_and_pending_request
, _NotifyFuns
, _PendingUpdateNotifFuns
, Pid
}} ->
111 MgrAcc#mgr
{cached_groups
=CachedGroups2
}
116 handle_cast({get_group_async
, GroupId
, OrigFrom
}, Mgr
) ->
120 get_group_info_fun
=GetGroupInfoFun
,
121 cached_groups
=CachedGroups
123 case dict:find(GroupId
, CachedGroups
) of
125 gen_server:reply(OrigFrom
, {ok
, CachedGroup
}),
129 case GetGroupInfoFun(GroupId
) of
130 {ok
, {NamedQueries
, GroupState
}} ->
131 {ok
, Group
} = open(Db
, Fd
, NamedQueries
, GroupState
),
132 NewMgr
= Mgr#mgr
{cached_groups
=dict:store(GroupId
, Group
, CachedGroups
)},
133 {NewMgr
, {ok
, Group
}};
137 gen_server:reply(OrigFrom
, Reply
),
140 handle_cast({update_group
, GroupId
, UpdateNotifFun
}, Mgr
) ->
142 update_state_by_uuid
=ProcsDict
,
143 uuids_by_pid
=GroupIdDict
,
146 get_group_info_fun
=GetGroupInfoFun
,
147 update_group_info_fun
=UpdateGroupInfoFun
,
148 cached_groups
=CachedGroups
150 case dict:find(GroupId
, ProcsDict
) of
151 {ok
, {processing_request
, NotifyFuns
, Pid
}} ->
152 ProcsDict2
= dict:store(
154 {processing_and_pending_request
, NotifyFuns
, [UpdateNotifFun
], Pid
},
157 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
}};
158 {ok
, {processing_and_pending_request
, NotifyFuns
, PendingUpdateNotifFuns
, Pid
}} ->
159 ProcsDict2
= dict:store(
161 {processing_and_pending_request
, NotifyFuns
, [UpdateNotifFun
| PendingUpdateNotifFuns
], Pid
},
164 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
}};
166 case dict:find(GroupId
, CachedGroups
) of
168 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Group
, GroupId
, UpdateGroupInfoFun
, [UpdateNotifFun
]]);
170 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, [UpdateNotifFun
]])
172 ProcsDict2
= dict:store(GroupId
, {processing_request
, [UpdateNotifFun
], Pid
}, ProcsDict
),
173 GroupIdDict2
= dict:store(Pid
, GroupId
, GroupIdDict
),
174 {noreply
, Mgr#mgr
{update_state_by_uuid
=ProcsDict2
, uuids_by_pid
=GroupIdDict2
}}
176 handle_cast(stop
, Mgr
) ->
177 {stop
, normal
, Mgr
}. % causes terminate to be called
179 handle_info({'EXIT', FromPid
, Reason
}, Mgr
) ->
181 update_state_by_uuid
=ProcsDict
,
182 uuids_by_pid
=GroupIdDict
,
185 get_group_info_fun
=GetGroupInfoFun
,
186 update_group_info_fun
=UpdateGroupInfoFun
,
187 cached_groups
=CachedGroups
189 case dict:find(FromPid
, GroupIdDict
) of
191 case dict:find(GroupId
, ProcsDict
) of
192 {ok
, {processing_request
, NotifyFuns
, _Pid
}} ->
193 GroupIdDict2
= dict:erase(FromPid
, GroupIdDict
),
194 ProcsDict2
= dict:erase(GroupId
, ProcsDict
);
195 {ok
, {processing_and_pending_request
, NotifyFuns
, NextNotifyFuns
, _Pid
}} ->
196 case dict:find(GroupId
, CachedGroups
) of
198 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Group
, GroupId
, UpdateGroupInfoFun
, NextNotifyFuns
]);
200 Pid
= spawn_link(couch_table_group
, update_table_proc
, [self(), Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, NextNotifyFuns
])
202 GroupIdDict2
= dict:store(Pid
, GroupId
, dict:erase(FromPid
, GroupIdDict
)),
203 ProcsDict2
= dict:store(GroupId
, {processing_request
, NextNotifyFuns
, Pid
}, ProcsDict
)
209 % process returned abnormally, notify any waiting listeners
210 [catch NotifyFun(Reason
) || NotifyFun
<- NotifyFuns
]
212 Mgr2
= Mgr#mgr
{update_state_by_uuid
=ProcsDict2
, uuids_by_pid
=GroupIdDict2
},
215 % a linked process must have died, we propagate the error
219 code_change(_OldVsn
, State
, _Extra
) ->
% Opens a table group using an update-notification fun that ignores its
% argument and returns ok. Otherwise identical to open/5.
open(Db, Fd, NamedQueries, GroupState) ->
    NoopNotifFun = fun(_Table) -> ok end,
    open(Db, Fd, NamedQueries, GroupState, NoopNotifFun).
226 open(Db
, Fd
, NamedQueries
, GroupState
, UpdateNotifFun
) ->
227 {TableBtreeStates
, CurrentSeq
, GroupIdBtreeState
} =
229 nil
-> % new table group, init GroupState to nils
230 {[ nil
|| _Query
<- NamedQueries
], 0, nil
};
235 lists:mapfoldl(fun({{Name
, QueryString
}, BtreeState
}, Count
) ->
236 {ok
, Btree
} = couch_btree:open(BtreeState
, Fd
, fun less_fun
/2),
237 {#table
{name
=Name
, id
=Count
, btree
=Btree
, query_string
=QueryString
}, Count
+1}
239 0, lists:zip(NamedQueries
, TableBtreeStates
)),
240 {ok
, GroupIdBtree
} = couch_btree:open(GroupIdBtreeState
, Fd
),
241 TableGroup
= #table_group
{db
=Db
,
243 current_seq
=CurrentSeq
,
244 uuid_btree
=GroupIdBtree
,
245 update_notif_fun
=UpdateNotifFun
,
246 named_queries
=NamedQueries
},
% Snapshots a table group as {NamedQueries, GroupState}, where GroupState is
% {TableBtreeStates, CurrentSeq, UuidBtreeState}. This appears to be the same
% {NamedQueries, GroupState} shape that open/4,5 receive from
% GetGroupInfoFun; confirm against open/5's GroupState handling.
get_info(#table_group{named_queries = Queries,
                      tables = Tables,
                      current_seq = Seq,
                      uuid_btree = UuidBtree}) ->
    BtreeStates = [couch_btree:get_state(T#table.btree) || T <- Tables],
    UuidBtreeState = couch_btree:get_state(UuidBtree),
    GroupState = {BtreeStates, Seq, UuidBtreeState},
    {Queries, GroupState}.
% Folds Fun over every row of the table named TableName in TableGroup,
% walking that table's btree in direction Dir.
% Fun is called as Fun({DocId, Rev}, KeyColumn, ColumnPairs, Offset,
% TotalRowCount, Acc), where ColumnPairs is a (possibly empty) list of
% {ColumnName, Value} pairs.
% Returns {ok, TotalRowCount, FinalAcc}, or {not_found, missing_named_table}
% if the group has no table with that name.
fold(TableGroup, TableName, Dir, Fun, Acc) ->
    Tables = TableGroup#table_group.tables,
    % fold_int/5 expects (Tables, TableName, Dir, Fun, Acc). Passing Dir
    % before TableName would make the name guard compare against the fold
    % direction, so every lookup would fail.
    fold_int(Tables, TableName, Dir, Fun, Acc).
257 fold_int([], _TableName
, _Dir
, _Fun
, _Acc
) ->
258 {not_found
, missing_named_table
};
259 fold_int([Table
| _RestTables
], TableName
, Dir
, Fun
, Acc
) when Table#table
.name
== TableName
->
260 TotalRowCount
= couch_btree:row_count(Table#table
.btree
),
261 {ok
, [{ok
, ColumnNames
}]} = couch_fabric:table_columns([Table#table
.query_string
]),
262 WrapperFun
= fun({{KeyColumn
, DocId
}, [Rev
|Columns
]}, Offset
, WrapperAcc
) ->
265 Fun({DocId
,Rev
}, KeyColumn
, [], Offset
, TotalRowCount
, WrapperAcc
);
267 Fun({DocId
,Rev
}, KeyColumn
, lists:zip(ColumnNames
, [KeyColumn
| Columns
]), Offset
, TotalRowCount
, WrapperAcc
)
270 {ok
, AccResult
} = couch_btree:fold(Table#table
.btree
, Dir
, WrapperFun
, Acc
),
271 {ok
, TotalRowCount
, AccResult
};
272 fold_int([_Table
| RestTables
], TableName
, Dir
, Fun
, Acc
) ->
273 fold_int(RestTables
, TableName
, Dir
, Fun
, Acc
).
% Folds Fun over the rows of the table named TableName in TableGroup,
% starting the btree walk at StartKey and moving in direction Dir.
% Returns {ok, TotalRowCount, FinalAcc}, or {not_found, missing_named_table}
% if the group has no table with that name.
fold(TableGroup, TableName, StartKey, Dir, Fun, Acc) ->
    Tables = TableGroup#table_group.tables,
    fold_int(Tables, TableName, StartKey, Dir, Fun, Acc).
279 fold_int([], _TableName
, _StartKey
, _Dir
, _Fun
, _Acc
) ->
280 {not_found
, missing_named_table
};
281 fold_int([Table
| _RestTables
], TableName
, StartKey
, Dir
, Fun
, Acc
) when Table#table
.name
== TableName
->
282 TotalRowCount
= couch_btree:row_count(Table#table
.btree
),
283 {ok
, [{ok
, ColumnNames
}]} = couch_fabric:table_columns([Table#table
.query_string
]),
284 WrapperFun
= fun({{KeyColumn
, DocId
}, [Rev
|Columns
]}, Offset
, WrapperAcc
) ->
287 Fun({DocId
, Rev
}, KeyColumn
, [], Offset
, TotalRowCount
, WrapperAcc
);
289 Fun({DocId
, Rev
}, KeyColumn
, lists:zip(ColumnNames
, [KeyColumn
| Columns
]), Offset
, TotalRowCount
, WrapperAcc
)
292 {ok
, AccResult
} = couch_btree:fold(Table#table
.btree
, StartKey
, Dir
, WrapperFun
, Acc
),
293 {ok
, TotalRowCount
, AccResult
};
294 fold_int([_Table
| RestTables
], TableName
, StartKey
, Dir
, Fun
, Acc
) ->
295 fold_int(RestTables
, TableName
, StartKey
, Dir
, Fun
, Acc
).
300 cmp_elements(A
, B
) when is_list(A
) and
is_list(B
) ->
301 couch_util:collate(A
, B
, [nocase
]);
302 cmp_elements(A
, B
) when A
== B
->
304 cmp_elements(A
, B
) when A
< B
->
306 cmp_elements(A
, B
) when A
> B
->
308 cmp_elements(_A
, _B
) ->
313 cmp_list([A
|RestA
], [B
|RestB
]) ->
314 case cmp_elements(A
, B
) of
315 0 -> cmp_list(RestA
, RestB
);
318 cmp_list([_A
|_RestA
], []) ->
320 cmp_list([], [_B
|_RestB
]) ->
% a case-insensitive comparison function for keys in a table
% Note! Because of case insensitivity, this will not produce
% a stable sort! However, there is a way to get a stable sort
% and the desired insensitivity semantics by using the right
% collations at the right time. TODO
328 less_fun({A
, DocIdA
}, {B
, DocIdB
}) when is_list(A
) and
is_list(B
) ->
329 case cmp_list(A
, B
) of
337 less_fun({A
, DocIdA
}, {B
, DocIdB
}) ->
338 {A
, DocIdA
} < {B
, DocIdB
}.
341 notify(MgrPid
, UpdateStatus
, TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
342 GroupInfo
= get_info(TableGroup
),
343 ok
= gen_server:call(MgrPid
, {group_cache_update
, GroupId
, TableGroup
}),
344 ok
= UpdateGroupInfoFun(GroupId
, UpdateStatus
, GroupInfo
),
346 StatusNotifyFuns2
= lists:foldl(fun(NotifyFun
, AccFuns
) ->
347 case (catch NotifyFun(UpdateStatus
)) of
348 NewNotifyFun
when is_function(NewNotifyFun
) ->
349 [NewNotifyFun
| AccFuns
];
354 [], StatusNotifyFuns
),
355 fun(UpdateStatus2
, TableGroup2
) ->
356 notify(MgrPid
, UpdateStatus2
, TableGroup2
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns2
)
359 update_table_proc(MgrPid
, #table_group
{} = TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
361 fun(UpdateStatus
, TableGroup2
) ->
362 notify(MgrPid
, UpdateStatus
, TableGroup2
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
)
364 update_int(TableGroup
, NotifyFun
).
366 update_table_proc(MgrPid
, Db
, Fd
, GroupId
, GetGroupInfoFun
, UpdateGroupInfoFun
, StatusNotifyFuns
) ->
367 case GetGroupInfoFun(GroupId
) of
368 {ok
, {NamedQueries
, GroupState
}} ->
369 {ok
, TableGroup
} = open(Db
, Fd
, NamedQueries
, GroupState
),
370 update_table_proc(MgrPid
, TableGroup
, GroupId
, UpdateGroupInfoFun
, StatusNotifyFuns
);
375 update_int(TableGroup
, NotifyFun
) ->
379 current_seq
=CurrentSeq
381 TableEmptyKVs
= [{Table
, []} || Table
<- Tables
],
383 % compute on all docs modified since we last computed.
384 {ok
, {UncomputedDocs
, TableGroup2
, TableKVsToAdd
, GroupIdTableIdKeys
, NewSeq
, NotifyFun2
}}
385 = couch_db:enum_docs_since(
388 fun(DocInfo
, Acc
) -> process_doc(Db
, DocInfo
, Acc
) end,
389 {[], TableGroup
, TableEmptyKVs
, [], CurrentSeq
, NotifyFun
}
392 {TableKVsToAdd2
, GroupIdTableIdKeys2
} =
393 case UncomputedDocs
of
395 {TableKVsToAdd
, GroupIdTableIdKeys
};
397 {ok
, Results
} = couch_fabric:table_compute(UncomputedDocs
, queries(TableGroup2
)),
398 table_insert_query_results(UncomputedDocs
, Results
, TableKVsToAdd
, GroupIdTableIdKeys
)
400 {ok
, TableGroup3
} = write_changes(TableGroup2
, TableKVsToAdd2
, GroupIdTableIdKeys2
, NewSeq
),
401 NotifyFun2(complete
, TableGroup3
),
405 process_doc(Db
, DocInfo
, {Docs
, TableGroup
, TableKVs
, GroupIdTableIdKeys
, _LastSeq
, NotifyFun
}) ->
406 % This fun computes once for each document
407 #doc_info
{uuid
=GroupId
, update_seq
=Seq
} = DocInfo
,
408 {Docs2
, GroupIdTableIdKeys2
} =
409 case couch_db:open_doc(Db
, DocInfo
) of
411 {[Doc
| Docs
], GroupIdTableIdKeys
};
412 {not_found
, deleted
} ->
413 {Docs
, [{GroupId
, []} | GroupIdTableIdKeys
]}
415 case process_info(self(), memory
) of
416 {memory
, Mem
} when Mem
> ?FLUSH_MAX_MEM
->
417 {ok
, Results
} = couch_fabric:table_compute(Docs2
, queries(TableGroup
)),
418 {TableKVs3
, GroupIdTableIdKeys3
} = table_insert_query_results(Docs2
, Results
, TableKVs
, GroupIdTableIdKeys2
),
419 {ok
, TableGroup2
} = write_changes(TableGroup
, TableKVs3
, GroupIdTableIdKeys3
, Seq
),
421 NotifyFun2
= NotifyFun(partial
, TableGroup2
),
422 TableEmptyKeyValues
= [{Table
, []} || Table
<- TableGroup2#table_group
.tables
],
423 {ok
, {[], TableGroup2
, TableEmptyKeyValues
, [], Seq
, NotifyFun2
}};
425 {ok
, {Docs2
, TableGroup
, TableKVs
, GroupIdTableIdKeys2
, Seq
, NotifyFun
}}
% Merges per-document query results into the accumulated per-table KV lists.
% The document list and the result list are walked in lockstep and must
% have equal length. For each document, table_insert_doc_query_results/5
% adds its rows to the table KVs and returns the {TableId, Key} pairs it
% produced. Those pairs are prepended to the accumulator as
% {DocUuid, TableIdKeys}.
% Returns {TableKVs, GroupIdTableIdKeys}.
table_insert_query_results([Doc | Docs], [DocResults | MoreResults], KVsSoFar, IdKeysSoFar) ->
    {KVsNext, DocTableIdKeys} =
        table_insert_doc_query_results(Doc, DocResults, KVsSoFar, [], []),
    Entry = {Doc#doc.uuid, DocTableIdKeys},
    table_insert_query_results(Docs, MoreResults, KVsNext, [Entry | IdKeysSoFar]);
table_insert_query_results([], [], KVsSoFar, IdKeysSoFar) ->
    {KVsSoFar, IdKeysSoFar}.
436 table_insert_doc_query_results(_Doc
, [], [], TableKVsAcc
, TableIdKeysAcc
) ->
437 {lists:reverse(TableKVsAcc
), lists:reverse(TableIdKeysAcc
)};
438 table_insert_doc_query_results(#doc
{uuid
=DocId
, revisions
=[Rev
|_
]}=Doc
, [{ok
, Columns
}|RestResults
], [{Table
, KVs
}|RestTableKVs
], TableKVsAcc
, TableIdKeysAcc
) ->
441 NewKVs
= [{{[], DocId
}, [Rev
]}];
442 [KeyColumn
| ValueColumns
] ->
443 NewKVs
= [{{KeyColumn
, DocId
}, [Rev
|ValueColumns
]}]
445 NewTableIdKeys
= [{Table#table
.id
, Key
} || {{Key
, _GroupId
}, _Value
} <- NewKVs
],
446 NewTableKVsAcc
= [{Table
, NewKVs
++ KVs
} | TableKVsAcc
],
447 NewTableIdKeysAcc
= NewTableIdKeys
++ TableIdKeysAcc
,
448 table_insert_doc_query_results(Doc
, RestResults
, RestTableKVs
, NewTableKVsAcc
, NewTableIdKeysAcc
);
449 table_insert_doc_query_results(Doc
, [no_match
|RestResults
], [{Table
, KVs
}|RestTableKVs
], TableKVsAcc
, TableIdKeysAcc
) ->
450 NewTableKVsAcc
= [{Table
, KVs
} | TableKVsAcc
],
451 table_insert_doc_query_results(Doc
, RestResults
, RestTableKVs
, NewTableKVsAcc
, TableIdKeysAcc
).
% Returns the query string of every table in TableGroup, in table order.
queries(TableGroup) ->
    Tables = TableGroup#table_group.tables,
    lists:map(fun(Table) -> Table#table.query_string end, Tables).
458 dict_find(Key
, DefaultValue
, Dict
) ->
459 case dict:find(Key
, Dict
) of
466 write_changes(TableGroup
, TableKeyValuesToAdd
, GroupIdTableIdKeys
, NewSeq
) ->
467 #table_group
{uuid_btree
=GroupIdBtree
} = TableGroup
,
469 AddGroupIdTableIdKeys
= [{GroupId
, TableIdKeys
} || {GroupId
, TableIdKeys
} <- GroupIdTableIdKeys
, TableIdKeys
/= []],
470 RemoveGroupIds
= [GroupId
|| {GroupId
, TableIdKeys
} <- GroupIdTableIdKeys
, TableIdKeys
== []],
471 LookupGroupIds
= [GroupId
|| {GroupId
, _TableIdKeys
} <- GroupIdTableIdKeys
],
473 {ok
, LookupResults
, GroupIdBtree2
}
474 = couch_btree:query_modify(GroupIdBtree
, LookupGroupIds
, AddGroupIdTableIdKeys
, RemoveGroupIds
),
475 KeysToRemoveByTable
= lists:foldl(
476 fun(LookupResult
, KeysToRemoveByTableAcc
) ->
478 {ok
, {GroupId
, TableIdKeys
}} ->
480 fun({TableId
, Key
}, KeysToRemoveByTableAcc2
) ->
481 dict:append(TableId
, {Key
, GroupId
}, KeysToRemoveByTableAcc2
)
483 KeysToRemoveByTableAcc
, TableIdKeys
);
485 KeysToRemoveByTableAcc
488 dict:new(), LookupResults
),
492 KeysToRemove
= dict_find(Table#table
.id
, [], KeysToRemoveByTable
),
493 {ok
, TableBtree2
} = couch_btree:add_remove(Table#table
.btree
, AddKeyValues
, KeysToRemove
),
494 Table#table
{btree
= TableBtree2
}
497 {Table
, AddKeyValues
} <- TableKeyValuesToAdd
500 TableGroup2
= TableGroup#table_group
{tables
=Tables2
, current_seq
=NewSeq
, uuid_btree
=GroupIdBtree2
},