6 # $LastChangedRevision$
12 my_getopt
= getopt
.gnu_getopt
13 except AttributeError:
14 my_getopt
= getopt
.getopt
17 __author__
= "Gustavo Niemeyer <gustavo@niemeyer.net>"
class Error(Exception):
    """Error raised by this script for failures it detects itself.

    It is raised with a human-readable message for problems such as an
    unparsable or unreadable configuration file, an unknown group, a
    failed external command, or insufficient permissions.
    """
21 SECTION
= re
.compile(r
'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]')
22 OPTION
= re
.compile(r
'(\S+)\s*=\s*(.*)$')
25 def __init__(self
, filename
):
26 # Options are stored in _sections_list like this:
27 # [(sectname, [(optname, optval), ...]), ...]
28 self
._sections
_list
= []
29 self
._sections
_dict
= {}
32 def _read(self
, filename
):
33 # Use the same logic as in ConfigParser.__read()
38 for line
in file.xreadlines():
40 if line
.isspace() or line
[0] == '#':
42 if line
[0].isspace() and cursectdict
is not None and optname
:
44 cursectdict
[optname
] = "%s %s" % (cursectdict
[optname
], value
)
45 cursectlist
[-1][1] = "%s %s" % (cursectlist
[-1][1], value
)
47 m
= SECTION
.match(line
)
50 parentsectname
= m
.group(2)
51 if parentsectname
is None:
52 # No parent section defined, so start a new section
53 cursectdict
= self
._sections
_dict
.setdefault \
57 # Copy the parent section into the new section
58 parentsectdict
= self
._sections
_dict
.get \
60 cursectdict
= self
._sections
_dict
.setdefault \
61 (sectname
, parentsectdict
.copy())
62 cursectlist
= self
.walk(parentsectname
)
63 self
._sections
_list
.append((sectname
, cursectlist
))
65 elif cursectdict
is None:
66 raise Error
, "%s:%d: no section header" % \
69 m
= OPTION
.match(line
)
71 optname
, optval
= m
.groups()
72 optval
= optval
.strip()
73 cursectdict
[optname
] = optval
74 cursectlist
.append([optname
, optval
])
76 raise Error
, "%s:%d: parsing error" % \
80 return self
._sections
_dict
.keys()
def options(self, section):
    """Return the names of the options defined in `section`.

    A section that is not present is treated as empty, so an unknown
    name produces an empty result instead of raising.
    """
    section_options = self._sections_dict.get(section, {})
    return section_options.keys()
def get(self, section, option, default=None):
    """Return the value of `option` inside `section`.

    `default` is returned when either the section or the option does
    not exist.

    The section-name map holds one dict of options per section, so the
    lookup has to go through the section first.  The previous code
    looked `option` up directly in the section-name map and ignored
    `section` entirely, so it returned a whole section dict (or the
    default) instead of an option value.
    """
    return self._sections_dict.get(section, {}).get(option, default)
88 def walk(self
, section
, option
=None):
90 for sectname
, options
in self
._sections
_list
:
91 if sectname
== section
:
92 for optname
, value
in options
:
93 if not option
or optname
== option
:
94 ret
.append((optname
, value
))
def parse_groups(self, groupsiter):
    """Record group definitions from an iterable of (name, members) pairs.

    Each `members` string is split on whitespace and the resulting list
    is stored under the group name, replacing any earlier definition of
    the same group.
    """
    groups = self._group
    for groupname, members in groupsiter:
        groups[groupname] = members.split()
107 def parse_perms(self
, permsiter
):
108 for option
, value
in permsiter
:
109 # Paths never start with /, so remove it if provided
112 pattern
= re
.compile("^%s$" % option
)
113 for entry
in value
.split():
114 openpar
, closepar
= entry
.find("("), entry
.find(")")
115 groupsusers
= entry
[:openpar
].split(",")
116 perms
= entry
[openpar
+1:closepar
].split(",")
118 for groupuser
in groupsusers
:
119 if groupuser
[0] == "@":
121 users
.extend(self
._group
[groupuser
[1:]])
123 raise Error
, "group '%s' not found" % \
126 users
.append(groupuser
)
127 self
._permlist
.append((pattern
, users
, perms
))
129 def get(self
, user
, path
):
131 for pattern
, users
, perms
in self
._permlist
:
132 if pattern
.match(path
) and (user
in users
or "*" in users
):
137 def __init__(self
, repospath
, txn
=None, rev
=None):
138 self
.repospath
= repospath
142 def _execcmd(self
, *cmd
, **kwargs
):
143 cmdstr
= " ".join(cmd
)
144 status
, output
= commands
.getstatusoutput(cmdstr
)
146 sys
.stderr
.write(cmdstr
)
147 sys
.stderr
.write("\n")
148 sys
.stderr
.write(output
)
149 raise Error
, "command failed: %s\n%s" % (cmdstr
, output
)
150 return status
, output
152 def _execsvnlook(self
, cmd
, *args
, **kwargs
):
153 execcmd_args
= ["svnlook", cmd
, self
.repospath
]
154 self
._add
_txnrev
(execcmd_args
, kwargs
)
157 keywords
= ["show", "noerror"]
159 if kwargs
.has_key(key
):
160 execcmd_kwargs
[key
] = kwargs
[key
]
161 return self
._execcmd
(*execcmd_args
, **execcmd_kwargs
)
163 def _add_txnrev(self
, cmd_args
, received_kwargs
):
164 if received_kwargs
.has_key("txn"):
165 txn
= received_kwargs
.get("txn")
167 cmd_args
+= ["-t", txn
]
168 elif self
.txn
is not None:
169 cmd_args
+= ["-t", self
.txn
]
170 if received_kwargs
.has_key("rev"):
171 rev
= received_kwargs
.get("rev")
173 cmd_args
+= ["-r", rev
]
174 elif self
.rev
is not None:
175 cmd_args
+= ["-r", self
.rev
]
177 def changed(self
, **kwargs
):
178 status
, output
= self
._execsvnlook
("changed", **kwargs
)
182 for line
in output
.splitlines():
184 if not line
: continue
185 entry
= [None, None, None]
186 changedata
, changeprop
, path
= None, None, None
192 changes
.append((changedata
, changeprop
, path
))
195 def author(self
, **kwargs
):
196 status
, output
= self
._execsvnlook
("author", **kwargs
)
199 return output
.strip()
202 def check_perms(filename
, section
, repos
, txn
=None, rev
=None, author
=None):
203 svnlook
= SVNLook(repos
, txn
=txn
, rev
=rev
)
205 author
= svnlook
.author()
206 changes
= svnlook
.changed()
208 config
= Config(filename
)
210 raise Error
, "can't read config file "+filename
211 if not section
in config
.sections():
212 raise Error
, "section '%s' not found in config file" % section
214 perm
.parse_groups(config
.walk("groups"))
215 perm
.parse_groups(config
.walk(section
+" groups"))
216 perm
.parse_perms(config
.walk(section
))
218 for changedata
, changeprop
, path
in changes
:
219 pathperms
= perm
.get(author
, path
)
220 if changedata
== "A" and "add" not in pathperms
:
221 permerrors
.append("you can't add "+path
)
222 elif changedata
== "U" and "update" not in pathperms
:
223 permerrors
.append("you can't update "+path
)
224 elif changedata
== "D" and "remove" not in pathperms
:
225 permerrors
.append("you can't remove "+path
)
226 elif changeprop
== "U" and "update" not in pathperms
:
227 permerrors
.append("you can't update properties of "+path
)
229 # print "cdata=%s cprop=%s path=%s perms=%s" % \
230 # (str(changedata), str(changeprop), path, str(pathperms))
232 permerrors
.insert(0, "you don't have enough permissions for "
234 raise Error
, "\n".join(permerrors
)
240 Usage: svnperms.py OPTIONS
243 -r PATH Use repository at PATH to check transactions
244 -t TXN Query transaction TXN for commit information
245 -f PATH Use PATH as configuration file (default is repository
246 path + /conf/svnperms.conf)
247 -s NAME Use section NAME as permission section (default is
248 repository name, extracted from repository path)
249 -R REV Query revision REV for commit information (for tests)
250 -A AUTHOR Check commit as if AUTHOR had commited it (for tests)
class MissingArgumentsException(Exception):
    """Raised when required command-line options were not supplied."""
260 opts
, args
= my_getopt(sys
.argv
[1:], "f:s:r:t:R:A:h", ["help"])
261 except getopt
.GetoptError
, e
:
267 obj
.repository
= None
268 obj
.transaction
= None
271 for opt
, val
in opts
:
279 obj
.transaction
= val
284 elif opt
in ["-h", "--help"]:
285 sys
.stdout
.write(USAGE
)
288 if not obj
.repository
:
289 missingopts
.append("repository")
290 if not (obj
.transaction
or obj
.revision
):
291 missingopts
.append("either transaction or a revision")
293 raise MissingArgumentsException
, \
294 "missing required option(s): " + ", ".join(missingopts
)
295 obj
.repository
= os
.path
.abspath(obj
.repository
)
296 if obj
.filename
is None:
297 obj
.filename
= os
.path
.join(obj
.repository
, "conf", "svnperms.conf")
298 if obj
.section
is None:
299 obj
.section
= os
.path
.basename(obj
.repository
)
300 if not (os
.path
.isdir(obj
.repository
) and
301 os
.path
.isdir(os
.path
.join(obj
.repository
, "db")) and
302 os
.path
.isdir(os
.path
.join(obj
.repository
, "hooks")) and
303 os
.path
.isfile(os
.path
.join(obj
.repository
, "format"))):
304 raise Error
, "path '%s' doesn't look like a repository" % \
311 opts
= parse_options()
312 check_perms(opts
.filename
, opts
.section
,
313 opts
.repository
, opts
.transaction
, opts
.revision
,
315 except MissingArgumentsException
, e
:
316 sys
.stderr
.write("%s\n" % str(e
))
317 sys
.stderr
.write(USAGE
)
320 sys
.stderr
.write("error: %s\n" % str(e
))
323 if __name__
== "__main__":