# $Id: glideinFrontendPlugins.py,v 1.15 2011/02/10 21:35:31 parag Exp $
#
# This module implements plugins for the VO frontend
#
# Author: Igor Sfiligoi (since Mar 31st 2009)
import os
import os.path
import pickle
import time

import glideinFrontendLib
################################################################################
#
####    Proxy plugins                                                       ####
#
# All plugins implement the following interface:
#   __init__(config_dir,proxy_list)
#     Constructor; config_dir may be used for internal config/cache files
#   get_required_job_attributes()
#     Return the list of required condor_q attributes
#   get_required_classad_attributes()
#     Return the list of required condor_status attributes
#   get_proxies(condorq_dict,condorq_dict_types,status_dict,status_dict_types)
#     Return a list of proxies that match the input criteria
#     Each element is an (index, value) pair
#     If called multiple times, it is guaranteed that
#       if the index is the same, the proxy is (logically) the same
#
################################################################################
############################################
#
# This plugin always returns the first proxy
# Useful when there is only one proxy
#
class ProxyFirst:
    """Proxy plugin that always selects the first configured proxy."""

    def __init__(self, config_dir, proxy_list):
        # config_dir is not needed by this plugin
        # proxies are stored as (index, value) pairs, see list2ilist
        self.proxy_list = list2ilist(proxy_list)

    # what job attributes are used by this plugin
    def get_required_job_attributes(self):
        return ()

    # what glidein attributes are used by this plugin
    def get_required_classad_attributes(self):
        return ()

    # get the proxies, given the condor_q and condor_status data
    # (the input data is ignored; returns a one-element list, or an
    # empty list if no proxies were configured)
    def get_proxies(self, condorq_dict, condorq_dict_types,
                    status_dict, status_dict_types):
        # slicing avoids an IndexError when proxy_list is empty
        return self.proxy_list[:1]
############################################
#
# This plugin returns all the proxies
# This can be a very useful default policy
#
class ProxyAll:
    """Proxy plugin that selects every configured proxy."""

    def __init__(self, config_dir, proxy_list):
        # config_dir is not needed by this plugin
        # proxies are stored as (index, value) pairs, see list2ilist
        self.proxy_list = list2ilist(proxy_list)

    # what job attributes are used by this plugin
    def get_required_job_attributes(self):
        return ()

    # what glidein attributes are used by this plugin
    def get_required_classad_attributes(self):
        return ()

    # get the proxies, given the condor_q and condor_status data
    # (the input data is ignored; all (index, value) pairs are returned)
    def get_proxies(self, condorq_dict, condorq_dict_types,
                    status_dict, status_dict_types):
        return self.proxy_list
##########################################################
#
# This plugin uses the first N proxies
# where N is the number of users currently in the system
#
# This is useful if the first proxies are higher priority
#
# Also good for testing
#
class ProxyUserCardinality:
    """Select as many proxies (from the front of the list) as there are users."""

    def __init__(self, config_dir, proxy_list):
        # config_dir is not needed by this plugin
        # proxies are stored as (index, value) pairs, see list2ilist
        self.proxy_list = list2ilist(proxy_list)

    # what job attributes are used by this plugin
    def get_required_job_attributes(self):
        return (('User', 's'),)

    # what glidein attributes are used by this plugin
    def get_required_classad_attributes(self):
        return ()

    # get the proxies, given the condor_q and condor_status data
    def get_proxies(self, condorq_dict, condorq_dict_types,
                    status_dict, status_dict_types):
        users_set = glideinFrontendLib.getCondorQUsers(condorq_dict)
        return self.get_proxies_from_cardinality(len(users_set))

    #############################
    # INTERNAL
    #############################

    # return the proxies based on data held by the class:
    # the first nr_requested_proxies entries of proxy_list, or all of them
    # if fewer are configured. Always returns a new list, so callers
    # cannot accidentally modify self.proxy_list.
    def get_proxies_from_cardinality(self, nr_requested_proxies):
        # clamp at 0: a negative count must select nothing, not slice from the end
        return self.proxy_list[:max(nr_requested_proxies, 0)]
######################################################################
#
# This plugin implements a user-based round-robin policy
# The same proxies are used as long as the users don't change
# (we keep a disk-based memory for this purpose)
# Once any user leaves, the most used proxy is returned to the pool
# If a new user joins, the least used proxy is obtained from the pool
#
class ProxyUserRR:
    """Round-robin proxy selection driven by the set of condor_q users.

    State is pickled in <config_dir>/proxy_user_rr.dat as a dict with:
      'users_set'        -- users seen on the last call
      'proxies_range'    -- {'min': m, 'max': M}; the active proxies are the
                            list positions m..M-1, taken modulo len(proxy_list)
      'proxy_indexes'    -- proxy -> stable integer index (used in "urr_%i")
      'first_free_index' -- next unused stable index
    """

    def __init__(self, config_dir, proxy_list):
        # proxy_list holds plain proxy values; stable indexes are kept in
        # the persistent state instead of being derived from list position
        self.proxy_list = proxy_list
        self.config_dir = config_dir
        self.config_fname = "%s/proxy_user_rr.dat" % self.config_dir
        self.load()

    # what job attributes are used by this plugin
    def get_required_job_attributes(self):
        return (('User', 's'),)

    # what glidein attributes are used by this plugin
    def get_required_classad_attributes(self):
        return ()

    # get the proxies, given the condor_q and condor_status data
    def get_proxies(self, condorq_dict, condorq_dict_types,
                    status_dict, status_dict_types):
        # normalize to builtin sets, so the comparison and differences below
        # are between like types even if old state holds sets.Set objects
        new_users_set = set(glideinFrontendLib.getCondorQUsers(condorq_dict))
        old_users_set = set(self.config_data['users_set'])
        if old_users_set == new_users_set:
            return self.get_proxies_from_data()

        # users changed: drop one proxy per departed user,
        # then pull in one proxy per newly arrived user
        removed_users = old_users_set - new_users_set
        added_users = new_users_set - old_users_set

        if removed_users:
            self.shrink_proxies(len(removed_users))
        if added_users:
            self.expand_proxies(len(added_users))

        self.config_data['users_set'] = new_users_set
        self.save()

        return self.get_proxies_from_data()

    #############################
    # INTERNAL
    #############################

    # load from self.config_fname into self.config_data
    # if the file does not exist, create a new config_data
    def load(self):
        if not os.path.isfile(self.config_fname):
            # no state yet: index proxies by their position in the list
            proxy_indexes = {}
            for idx, proxy in enumerate(self.proxy_list):
                proxy_indexes[proxy] = idx
            self.config_data = {'users_set': set(),
                                'proxies_range': {'min': 0, 'max': 0},
                                'proxy_indexes': proxy_indexes,
                                'first_free_index': len(self.proxy_list)}
            return

        # NOTE: pickle must only be used on trusted files; this one is
        # expected to be written by save() into config_dir
        fd = open(self.config_fname, "rb")
        try:
            self.config_data = pickle.load(fd)
        finally:
            fd.close()

        # proxies may have changed... make sure you have them all indexed
        proxy_indexes = self.config_data['proxy_indexes']
        for proxy in self.proxy_list:
            if proxy not in proxy_indexes:
                idx = self.config_data['first_free_index']
                proxy_indexes[proxy] = idx
                self.config_data['first_free_index'] = idx + 1

    # save self.config_data into self.config_fname
    def save(self):
        # first save in a tmpfile ("wb" truncates any stale leftover)
        tmpname = "%s~" % self.config_fname
        fd = open(tmpname, "wb")
        try:
            pickle.dump(self.config_data, fd, 0)  # use ASCII version of protocol
        finally:
            fd.close()

        # then atomically move it in place
        os.rename(tmpname, self.config_fname)

    # remove a number of proxies from the internal data
    # (advances the low end of the range; the range only ever moves forward)
    def shrink_proxies(self, nr):
        proxies_range = self.config_data['proxies_range']
        min_proxy_range = proxies_range['min'] + nr
        max_proxy_range = proxies_range['max']

        if min_proxy_range > max_proxy_range:
            raise RuntimeError("Cannot shrink so much: %i requested, %i available" % (nr, max_proxy_range - proxies_range['min']))

        proxies_range['min'] = min_proxy_range

    # add a number of proxies to the internal data
    # (advances the high end of the range)
    def expand_proxies(self, nr):
        proxies_range = self.config_data['proxies_range']
        min_proxy_range = proxies_range['min']
        max_proxy_range = proxies_range['max'] + nr

        if min_proxy_range > max_proxy_range:
            raise RuntimeError("Did we hit wraparound after the requested exansion of %i? min %i> max %i" % (nr, min_proxy_range, max_proxy_range))

        proxies_range['max'] = max_proxy_range

    # return the proxies based on data held by the class,
    # as a list of ("urr_<stable index>", proxy) pairs
    def get_proxies_from_data(self):
        nr_proxies = len(self.proxy_list)

        proxies_range = self.config_data['proxies_range']
        min_proxy_range = proxies_range['min']
        max_proxy_range = proxies_range['max']
        nr_requested_proxies = max_proxy_range - min_proxy_range

        if nr_requested_proxies >= nr_proxies:
            # wants all of them, no need to select
            index_range = range(nr_proxies)
        else:
            index_range = range(min_proxy_range, max_proxy_range)

        proxy_indexes = self.config_data['proxy_indexes']
        out_proxies = []
        for i in index_range:
            # the range keeps sliding forward, so wrap it around the list
            proxy = self.proxy_list[i % nr_proxies]
            out_proxies.append(("urr_%i" % proxy_indexes[proxy], proxy))
        return out_proxies
######################################################################
#
# This plugin implements a user-based mapping policy
# with possibility of recycling of accounts:
#  * when a user first enters the system, it gets mapped to a
#    pilot proxy that was not used for the longest time
#  * for existing users, just use the existing mapping
#  * if an old user comes back, it may be mapped to the old account, if not
#    yet recycled, else it is treated as a new user
#
class ProxyUserMapWRecycling:
    """Map each condor_q user to its own proxy, recycling stale mappings.

    State is pickled in <config_dir>/proxy_usermap_wr.dat as a dict with:
      'user_map'         -- key -> {'proxy': ..., 'proxy_index': int,
                                    'last_seen': time.time() value}
                            keys are user names, or ints for entries that
                            were never assigned to a user
      'first_free_index' -- next unused proxy_index / int key
    """

    def __init__(self, config_dir, proxy_list):
        self.proxy_list = proxy_list
        self.config_dir = config_dir
        self.config_fname = "%s/proxy_usermap_wr.dat" % self.config_dir
        self.load()

    # what job attributes are used by this plugin
    def get_required_job_attributes(self):
        return (('User', 's'),)

    # what glidein attributes are used by this plugin
    def get_required_classad_attributes(self):
        return ()

    # get the proxies, given the condor_q and condor_status data
    # returns a list of ("umrw_<proxy_index>", proxy) pairs
    def get_proxies(self, condorq_dict, condorq_dict_types,
                    status_dict, status_dict_types):
        users = list(glideinFrontendLib.getCondorQUsers(condorq_dict))
        user_map = self.config_data['user_map']

        # check if there are more users than proxies
        if len(users) < len(user_map):
            out_proxies = self._map_users_with_recycling(user_map, users)
        else:
            out_proxies = self._map_all_proxies(user_map, users)

        # persist the new mapping and the updated last_seen timestamps
        self.save()
        return out_proxies

    #############################
    # INTERNAL
    #############################

    # mark a cache entry as used right now and return its (index, proxy) pair
    def _use_entry(self, cel):
        cel['last_seen'] = time.time()
        return ("umrw_%i" % cel['proxy_index'], cel['proxy'])

    # fewer users than proxies: keep existing mappings and give every
    # unmapped user the least recently seen entry
    def _map_users_with_recycling(self, user_map, users):
        active_users = set(users)
        out_proxies = []
        for user in users:
            if user not in user_map:
                # user not in cache, get the oldest entry that does not belong
                # to another user currently in the queue (one always exists,
                # since there are fewer users than cache entries)
                min_key = None
                for k in user_map:
                    if k in active_users:
                        continue
                    if (min_key is None or
                            user_map[k]['last_seen'] < user_map[min_key]['last_seen']):
                        min_key = k
                # replace min_key with the current user
                user_map[user] = user_map.pop(min_key)
            # else the user is already in the cache... just use that
            out_proxies.append(self._use_entry(user_map[user]))
        return out_proxies

    # at least as many users as proxies: every proxy gets used; entries of
    # users no longer in the queue are reassigned to still-unmapped users
    def _map_all_proxies(self, user_map, users):
        active_users = set(users)
        uncovered_users = [u for u in users if u not in user_map]
        out_proxies = []
        uncovered_keys = []
        for k in list(user_map.keys()):
            if k in active_users:
                # the user in the cache is still present, use it
                out_proxies.append(self._use_entry(user_map[k]))
            else:
                # this cache entry needs to be reassigned
                uncovered_keys.append(k)
        # there are at least as many uncovered users as uncovered keys
        for k in uncovered_keys:
            user = uncovered_users.pop()
            user_map[user] = user_map.pop(k)
            out_proxies.append(self._use_entry(user_map[user]))
        return out_proxies

    # load from self.config_fname into self.config_data
    # if the file does not exist, create a new config_data
    def load(self):
        if not os.path.exists(self.config_fname):
            # no cache, create new cache structure from scratch
            user_map = {}
            for idx, proxy in enumerate(self.proxy_list):
                # use numbers for keys, so we are sure they will not match any user string
                user_map[idx] = {'proxy': proxy,
                                 'proxy_index': idx,
                                 'last_seen': 0}  # 0 is the oldest UNIX has ever seen ;)
            self.config_data = {'user_map': user_map,
                                # this will be used for future updates
                                'first_free_index': len(self.proxy_list)}
            return

        # NOTE: pickle must only be used on trusted files; this one is
        # expected to be written by save() into config_dir
        fd = open(self.config_fname, "rb")
        try:
            self.config_data = pickle.load(fd)
        finally:
            fd.close()

        # if proxies changed, remove old ones and insert the new ones
        new_proxies = set(self.proxy_list)
        cached_proxies = set()  # proxies still present in the cache
        user_map = self.config_data['user_map']

        # need to iterate, since not indexed by proxy name;
        # iterate over a copy of the keys because entries may be deleted
        for k in list(user_map.keys()):
            el_proxy = user_map[k]['proxy']
            if el_proxy not in new_proxies:
                # cached proxy not used anymore... remove from cache
                del user_map[k]
            else:
                cached_proxies.add(el_proxy)

        # now that we know what proxies have been added, put them in cache
        for proxy in self.proxy_list:
            if proxy in cached_proxies:
                continue
            idx = self.config_data['first_free_index']
            # use numbers for keys, so we are sure they will not match any user string
            user_map[idx] = {'proxy': proxy,
                             'proxy_index': idx,
                             'last_seen': 0}  # 0 is the oldest UNIX has ever seen ;)
            self.config_data['first_free_index'] = idx + 1
            cached_proxies.add(proxy)  # guard against duplicates in proxy_list

    # save self.config_data into self.config_fname
    def save(self):
        # first save in a tmpfile ("wb" truncates any stale leftover)
        tmpname = "%s~" % self.config_fname
        fd = open(tmpname, "wb")
        try:
            pickle.dump(self.config_data, fd, 0)  # use ASCII version of protocol
        finally:
            fd.close()

        # then atomically move it in place
        os.rename(tmpname, self.config_fname)
###############################################
# INTERNAL to proxy_plugins, don't use directly

# convert a list into a list of (index, value) pairs,
# where index is the position of the value in the input list
#
# NOTE: This will not work if proxy order is changed between reconfigs :(
#       (the index is the list position, so reordering the configured
#       proxies makes the same index refer to a different proxy)
def list2ilist(lst):
    return list(enumerate(lst))
###################################################################
#
# Being plugins, users are not expected to directly reference the classes
# They should go through the dictionary below to find the appropriate plugin
# (keys are the plugin names, values the plugin classes)
proxy_plugins = dict(ProxyAll=ProxyAll,
                     ProxyUserRR=ProxyUserRR,
                     ProxyFirst=ProxyFirst,
                     ProxyUserCardinality=ProxyUserCardinality,
                     ProxyUserMapWRecycling=ProxyUserMapWRecycling)