1 # -*- coding: utf-8 -*-
4 ## This file is part of CDS Indico.
5 ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
7 ## CDS Indico is free software; you can redistribute it and/or
8 ## modify it under the terms of the GNU General Public License as
9 ## published by the Free Software Foundation; either version 2 of the
10 ## License, or (at your option) any later version.
12 ## CDS Indico is distributed in the hope that it will be useful, but
13 ## WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 ## General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
22 Module containing the persistent classes that will be stored in the DB
24 # standard lib imports
29 from persistent
import Persistent
, mapping
31 # indico extpoint imports
32 from indico
.core
.extpoint
import Component
33 from indico
.util
.fossilize
import IFossil
, fossilizes
, Fossilizable
, conversion
36 from indico
.ext
.livesync
.struct
import SetMultiPointerTrack
37 from indico
.ext
.livesync
.util
import getPluginType
38 from indico
.ext
.livesync
.struct
import EmptyTrackException
39 from indico
.ext
.livesync
.base
import ILiveSyncAgentProvider
, MPT_GRANULARITY
40 from indico
.ext
.livesync
.db
import updateDBStructures
42 # legacy indico imports
43 from MaKaC
import conference
44 from MaKaC
.common
import DBMgr
class QueryException(Exception):
    """Error raised when an AgentManager query runs into a problem."""
class AgentExecutionException(Exception):
    """Error raised when the execution of an agent runs into a problem."""
58 class IAgentFossil(IFossil
):
69 def getDescription(self
):
77 getLastDT
.convert
= conversion
.Conversion
.datetime
79 def getExtraOptions(self
):
81 getExtraOptions
.name
= 'specific'
84 class SyncAgent(Fossilizable
, Persistent
):
86 Represents an "agent" (service)
89 fossilizes(IAgentFossil
)
93 # TODO: Subclass into PushSyncAgent(task)/PullSyncAgent?
95 def __init__(self
, aid
, name
, description
, updateTime
, access
=None):
98 self
._description
= description
99 self
._updateTime
= updateTime
102 self
._recording
= False
103 self
._access
= access
105 def record_str(self
, (obj
, objId
, status
)):
107 Translates the objects/states to an easy to read textual representation
def setManager(self, manager):
    """
    Binds this agent to a manager.

    :param manager: the object to be stored as the agent's manager; other
                    methods of the agent reach the track and the
                    granularity through it
    """
    # plain attribute store - no validation is done on the value
    self._manager = manager
def isRecording(self):
    """
    Tells whether the agent's recording flag is set.

    :return: the current value of ``_recording``
    """
    recording = self._recording
    return recording
119 def setActive(self
, value
):
def preActivate(self, ts):
    """
    Moves this agent's pointer in the manager's track to the slot right
    before ``ts`` and switches the recording flag on.

    :param ts: timestamp; it is divided by the manager's granularity (and
               decremented by one) to obtain the pointer position
    """
    manager = self._manager
    track = manager.getTrack()

    try:
        track.movePointer(self._id,
                          ts / manager.getGranularity() - 1)
    except EmptyTrackException:
        # an empty track has nothing to point at, so the pointer is
        # simply left where it is
        pass

    # from now on, whatever lies between the pointer and the present
    # will be taken into account by the next update
    self._recording = True
140 return self
._manager
.getTrack().getPointerTimestamp(self
._id
)
143 ts
= self
.getLastTS()
144 return datetime
.datetime
.utcfromtimestamp(ts
* \
145 self
._manager
.getGranularity()) if ts
else None
def getDescription(self):
    """
    Gives back the description text of the agent.

    :return: the current value of ``_description``
    """
    description = self._description
    return description
def getExtraOptions(self):
    """
    Collects the current values of all the agent's extra options.

    :return: a dictionary that maps every name found in ``_extraOptions``
             to the value ``getExtraOption`` returns for it
    """
    values = {}
    for optionName in self._extraOptions:
        values[optionName] = self.getExtraOption(optionName)
    return values
def getExtraOption(self, optionName):
    """
    Reads the value of one extra option.

    :param optionName: name of the option; it has to be present in
                       ``_extraOptions``
    :return: the value of the attribute called ``_<optionName>``
    :raises Exception: if the option name is not a known one
    """
    if optionName not in self._extraOptions:
        raise Exception('unknown option!')

    # option values live in attributes named after the option
    return getattr(self, "_%s" % optionName)
def setExtraOption(self, optionName, value):
    """
    Changes the value of one extra option.

    :param optionName: name of the option; it has to be present in
                       ``_extraOptions``
    :param value: new value, stored in the attribute ``_<optionName>``
    :raises Exception: if the option name is not a known one (nothing is
                       stored in that case)
    """
    if optionName not in self._extraOptions:
        raise Exception('unknown option!')

    # option values live in attributes named after the option
    setattr(self, "_%s" % optionName, value)
169 def setParameters(self
, description
=None,
172 self
._description
= description
177 class AgentProviderComponent(Component
):
179 This class only serves the purpose of letting LiveSync know that an
183 zope
.interface
.implements(ILiveSyncAgentProvider
)
185 # ILiveSyncAgentProvider
def providesLiveSyncAgentType(self, obj, types):
    """
    Advertises the agent type handled by this component.

    :param obj: not used by this implementation
    :param types: dictionary that gets a new entry, keyed by the class
                  name of ``_agentType`` and holding the class itself
    """
    # components that declare no ``_agentType`` have nothing to announce
    if not hasattr(self, '_agentType'):
        return

    agentType = self._agentType
    types[agentType.__name__] = agentType
191 class PushSyncAgent(SyncAgent
):
193 PushSyncAgents are agents that actively send data to remote services,
194 instead of waiting to be queried.
197 # Should specify which worker will be used
200 def __init__(self
, aid
, name
, description
, updateTime
, access
=None):
203 :param name: agent name
204 :param description: a description of the agent
205 :param access: an Indico user/group that has equivalent access
207 super(PushSyncAgent
, self
).__init
__(aid
, name
, description
, updateTime
)
209 self
._access
= access
211 def _run(self
, data
, logger
=None, monitor
=None, dbi
=None):
213 Overloaded - will contain the specific agent code
215 raise Exception("Undefined method")
217 def _generateRecords(self
, data
, lastTS
, dbi
=None):
219 :param data: iterable containing data to be converted
222 Takes the raw data (i.e. "event created", etc) and transforms
223 it into a sequence of 'record/action' pairs.
225 Basically, this function reduces actions to remove server "commands"
227 i.e. ``modified 1234, deleted 1234`` becomes just ``delete 1234``
232 def run(self
, currentTS
, logger
=None, monitor
=None, dbi
=None):
234 Main method, called when agent needs to be run
237 if currentTS
== None:
240 till
= currentTS
/ self
._manager
.getGranularity() - 1
242 if not self
._manager
:
243 raise AgentExecutionException("SyncAgent '%s' has no manager!" % \
247 logger
.info("Querying agent %s for events till %s" % \
248 (self
.getId(), till
))
250 # query till currentTS - 1, for integrity reasons
251 data
= self
._manager
.query(agentId
=self
.getId(),
255 records
= self
._generateRecords
(data
, till
, dbi
=dbi
)
256 # run agent-specific cycle
257 result
= self
._run
(records
, logger
=logger
, monitor
=monitor
, dbi
=dbi
)
260 logger
.exception("Problem running agent %s" % self
.getId())
def acknowledge(self):
    """
    Signals that the information has been correctly processed
    (usually called by periodic task).

    The manager is asked to advance this agent (identified through
    ``getId()``) to the value kept in ``_lastTry``.
    """
    manager = self._manager
    manager.advance(self.getId(), self._lastTry)
277 class SyncManager(Persistent
):
279 Stores live sync configuration parameters and "agents". It is basically an
283 def __init__(self
, granularity
=MPT_GRANULARITY
):
285 :param granularity: integer, number of seconds per MPT entry
287 self
._granularity
= granularity
def getGranularity(self):
    """
    Gives back the granularity that was configured for the MPT.

    :return: the current value of ``_granularity``
    """
    granularity = self._granularity
    return granularity
297 def getDBInstance(cls
):
299 Returns the instance of SyncManager currently in the DB
301 storage
= getPluginType().getStorage()
302 if 'agent_manager' in storage
:
303 return storage
['agent_manager']
305 root
= DBMgr
.getInstance().getDBConnection()
306 updateDBStructures(root
)
308 def reset(self
, agentsOnly
=False, trackOnly
=False):
310 Resets database structures
313 This erases any agents and contents in the MPT
316 self
._agents
= mapping
.PersistentMapping()
318 self
._track
= SetMultiPointerTrack()
def registerNewAgent(self, agent):
    """
    Makes a new agent known to the manager.

    The agent is stored in the agent mapping under its id, a pointer with
    the same id is added to the track, and the manager sets itself as the
    agent's manager.

    :param agent: the agent to register; has to provide ``getId()`` and
                  ``setManager()``
    """
    agentId = agent.getId()

    # keep the agent reachable by id
    self._agents[agentId] = agent

    # the track needs a pointer for it as well
    self._track.addPointer(agentId)

    # finally, let the agent know who manages it
    agent.setManager(self)
def removeAgent(self, agent):
    """
    Forgets an agent.

    The pointer that carries the agent's id is removed from the track
    first; then the agent's entry is deleted from the agent mapping.

    :param agent: the agent to remove; has to provide ``getId()``
    """
    agentId = agent.getId()
    self._track.removePointer(agentId)
    del self._agents[agentId]
339 def query(self
, agentId
=None, till
=None):
341 Queries the agent for a given timespan
344 # TODO: Add more criteria! (for now this will do)
347 raise QueryException("No criteria specified!")
349 return self
._track
.pointerIterItems(agentId
, till
=till
)
351 def advance(self
, agentId
, newLastTS
):
353 Advances the agent "pointer" to the specified timestamp
355 self
._track
.movePointer(agentId
,
def add(self, timestamp, action):
    """
    Stores an action in the track, at the slot that corresponds to the
    given timestamp.

    :param timestamp: timestamp of the action; it is divided by the
                      manager's granularity before being passed on
    :param action: the action to store, handed to the track untouched
    """
    track = self._track
    track.add(timestamp / self._granularity, action)
370 def getAllAgents(self
):
372 Returns the agent dictionary
376 def objectExcluded(self
, obj
):
378 Decides whether a particular object should be ignored or not
380 excluded
= getPluginType().getOption('excludedCategories').getValue()
381 if isinstance(obj
, conference
.Category
):
382 return obj
.getId() in excluded
383 elif isinstance(obj
, conference
.Conference
):
384 owner
= obj
.getOwner()
386 return owner
.getId() in excluded
387 elif obj
.getParent():
388 conf
= obj
.getConference()
390 owner
= conf
.getOwner()
392 return owner
.getId() in excluded
397 class RecordUploader(object):
399 Encapsulates record uploading behavior.
402 DEFAULT_BATCH_SIZE
, DEFAULT_NUM_SLAVES
= 1000, 2
404 def __init__(self
, logger
, agent
, batchSize
=DEFAULT_BATCH_SIZE
):
405 self
._logger
= logger
407 self
._batchSize
= batchSize
409 def _uploadBatch(self
, batch
):
411 :param batch: list of records
413 To be overloaded by uploaders. Does the actual upload.
415 raise Exception("Unimplemented method!")
417 def iterateOver(self
, iterator
, dbi
=None):
419 Consumes an iterator, uploading the records that are returned
420 `dbi` can be passed, so that the cache is cleared once in a while
425 # take operations and choose which records to send
426 for record
in iterator
:
428 if len(currentBatch
) > (self
._batchSize
- 1):
429 self
._uploadBatch
(currentBatch
)
432 currentBatch
.append(record
)
438 self
._uploadBatch
(currentBatch
)