"""Permission policy that grants actions based on an authz-style rules file.

The rules file is an INI document. An optional ``[groups]`` section maps
group names to comma separated member lists; every other section must be
named ``type:identifier`` and maps subjects (or ``@group`` references) to
comma separated permission lists.
"""

from trac.core import *
from trac.util import sorted, reversed
from trac.perm import IPermissionPolicy, PermissionSystem, IPermissionGroupProvider
from ConfigParser import ConfigParser
from fnmatch import fnmatch
import os


def _split_csv(value):
    """Split a comma separated string into stripped, non-empty items."""
    stripped = [part.strip() for part in value.split(',')]
    return [item for item in stripped if item]


class Authz(object):
    """Rules parsed from an authz policy file.

    `rules` is a nested mapping of the form
    ``{type: {identifier pattern: {subject: [permission, ...]}}}``.
    """

    def __init__(self, env, file=None):
        """Parse `file` into `self.rules`.

        :param env: stored on the instance as `self.env`; not otherwise
                    used while parsing.
        :param file: path of the rules file. A false value leaves the
                     rule set empty.
        :raises TracError: if a section name is not of the form
                           `type:identifier`, or if a rule refers to a
                           group that is not listed in `[groups]`.
        """
        self.env = env
        self.rules = {}
        if not file:
            return

        # NOTE(review): ConfigParser's default `optionxform` lower-cases
        # option names, so subjects and group names read from the file are
        # stored lower-cased -- confirm this is intended for user names.
        config = ConfigParser()
        config.read(file)

        # Expand groups. Members are stripped *before* empty entries are
        # dropped, so an entry such as `a, ,b` does not yield a '' member.
        groups = {}
        if config.has_section('groups'):
            for group, members in config.items('groups'):
                groups[group] = _split_csv(members)

        for section in config.sections():
            if section in ('DEFAULT', 'groups'):
                continue
            if ':' not in section:
                raise TracError('AuthzPolicy authz section names must be in the'
                                ' form `type:identifier`. eg. `wiki:SandBox`')
            module, identifier = section.split(':', 1)
            rule = self.rules.setdefault(module, {}).setdefault(identifier, {})
            for subject, permissions in config.items(section):
                permissions = _split_csv(permissions)
                if not subject.startswith('@'):
                    rule[subject] = permissions
                    continue
                # Expand group: every member receives the group's permissions.
                group = subject[1:]
                if group not in groups:
                    raise TracError('AuthzPolicy authz section `%s` refers to'
                                    ' the undefined group `@%s`'
                                    % (section, group))
                for member in groups[group]:
                    rule[member] = permissions

    def access(self, authname, rtype, ridentifier):
        """Return the permission list configured for a subject on a resource.

        Rules for `rtype` are consulted before rules for the wildcard type
        `*`. Within a type, identifier patterns are tried longest first and
        matched against `ridentifier` with `fnmatch`. For a matching pattern,
        an entry for `authname` is preferred over an entry for `*`; the first
        entry found is returned. If a matching pattern has neither entry, the
        search continues with the next pattern.

        :return: the list of permission names of the first entry found, or
                 an empty list if nothing matches.
        """
        for realm in (rtype, '*'):
            if realm not in self.rules:
                continue
            candidates = sorted(self.rules[realm].iteritems(),
                                key=lambda item: -len(item[0]))
            for pattern, subjects in candidates:
                if not fnmatch(ridentifier, pattern):
                    continue
                for user in (authname, '*'):
                    if user in subjects:
                        return subjects[user]
        return []


class AuthzPolicy(Component):
    """Permission policy component backed by an `Authz` rules file.

    The file path is read from the `authz_file` option of the
    `[authz_policy]` configuration section. The rules are reloaded whenever
    the modification time of the file changes.
    """

    implements(IPermissionPolicy)

    group_providers = ExtensionPoint(IPermissionGroupProvider)

    # Internal methods

    def _load_policy(self):
        """(Re)read the configured rules file and remember its mtime."""
        self.file = self.env.config.get('authz_policy', 'authz_file')
        self.env.log.debug('Loading authz policy from %s' % self.file)
        self.authz = Authz(self.env, self.file or None)
        # Stays false when no file is configured, so no reload is attempted.
        self.mtime = self.file and os.path.getmtime(self.file)

    def __init__(self):
        self._load_policy()

    # IPermissionPolicy methods

    def check_permission(self, username, action, context):
        """Return whether the rules file grants `action` to `username`.

        The permissions of `username` and of every group reported by the
        registered group providers are collected for the resource described
        by `context`, expanded with `PermissionSystem.expand_actions`, and
        `action` is looked up in the result.
        """
        lookup = self.compose_lookup(context)
        realm = lookup[0]
        identifier = lookup[1]

        # Pick up edits to the rules file without restarting.
        if self.file and os.path.getmtime(self.file) != self.mtime:
            self._load_policy()

        # Resource identifiers end up as : separated string. eg.
        # attachment:wiki:WikiStart:some_attachment.jpg
        if isinstance(identifier, (tuple, list)):
            identifier = ':'.join([unicode(part).strip()
                                   for part in identifier])
        else:
            identifier = unicode(identifier)

        # Expand users: the user itself plus every group it belongs to.
        subjects = [username]
        for provider in self.group_providers:
            subjects.extend(provider.get_permission_groups(username))

        permissions = set()
        for subject in subjects:
            permissions.update(self.authz.access(subject, realm, identifier))
        permissions = PermissionSystem(self.env).expand_actions(permissions)
        return action in permissions

    # Internal methods

    def compose_lookup(self, context):
        """Flatten a context chain into a `(realm, identifier)` pair.

        The chain is walked from `context` up through `.parent`. Missing
        ids and realms are replaced by `*`. The realm of the outermost
        context is returned separately; the remaining ids and realms are
        joined with `:` in outermost-to-innermost order.

        NOTE(review): a false `context` leaves nothing to pop and raises
        `IndexError` -- confirm callers always pass a context.
        """
        components = []
        while context:
            components.append(context.id or '*')
            components.append(context.realm or '*')
            context = context.parent
        realm = components.pop()
        return realm, ':'.join(reversed(components))