Source code for heat.common.policy
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
# Based on glance/api/policy.py
"""Policy Engine For Heat"""
import json
import os.path
from oslo.config import cfg
from heat.common import exception
import heat.openstack.common.log as logging
from heat.openstack.common import policy
logger = logging.getLogger(__name__)
policy_opts = [
    cfg.StrOpt('policy_file', default='policy.json'),
    cfg.StrOpt('policy_default_rule', default='default'),
]
CONF = cfg.CONF
CONF.register_opts(policy_opts)
DEFAULT_RULES = {
    'default': policy.TrueCheck(),
}
[docs]class Enforcer(object):
    """Responsible for loading and enforcing rules"""
    def __init__(self, scope='heat', exc=exception.Forbidden):
        self.scope = scope
        self.exc = exc
        self.default_rule = CONF.policy_default_rule
        self.policy_path = self._find_policy_file()
        self.policy_file_mtime = None
        self.policy_file_contents = None
[docs]    def set_rules(self, rules):
        """Create a new Rules object based on the provided dict of rules"""
        rules_obj = policy.Rules(rules, self.default_rule)
        policy.set_rules(rules_obj)
 
[docs]    def load_rules(self):
        """Set the rules found in the json file on disk"""
        if self.policy_path:
            rules = self._read_policy_file()
            rule_type = ""
        else:
            rules = DEFAULT_RULES
            rule_type = "default "
        text_rules = dict((k, str(v)) for k, v in rules.items())
        self.set_rules(rules)
 
    @staticmethod
    def _find_policy_file():
        """Locate the policy json data file"""
        policy_file = CONF.find_file(CONF.policy_file)
        if policy_file:
            return policy_file
        else:
            logger.warn(_('Unable to find policy file'))
            return None
    def _read_policy_file(self):
        """Read contents of the policy file
        This re-caches policy data if the file has been changed.
        """
        mtime = os.path.getmtime(self.policy_path)
        if not self.policy_file_contents or mtime != self.policy_file_mtime:
            logger.debug(_("Loading policy from %s") % self.policy_path)
            with open(self.policy_path) as fap:
                raw_contents = fap.read()
                rules_dict = json.loads(raw_contents)
                self.policy_file_contents = dict(
                    (k, policy.parse_rule(v))
                    for k, v in rules_dict.items())
            self.policy_file_mtime = mtime
        return self.policy_file_contents
    def _check(self, context, rule, target, *args, **kwargs):
        """Verifies that the action is valid on the target in this context.
           :param context: Heat request context
           :param rule: String representing the action to be checked
           :param object: Dictionary representing the object of the action.
           :raises: self.exc (defaults to heat.common.exception.Forbidden)
           :returns: A non-False value if access is allowed.
        """
        self.load_rules()
        credentials = {
            'roles': context.roles,
            'user': context.username,
            'tenant': context.tenant,
        }
        return policy.check(rule, target, credentials, *args, **kwargs)
[docs]    def enforce(self, context, action, target):
        """Verifies that the action is valid on the target in this context.
           :param context: Heat request context
           :param action: String representing the action to be checked
           :param object: Dictionary representing the object of the action.
           :raises: self.exc (defaults to heat.common.exception.Forbidden)
           :returns: A non-False value if access is allowed.
        """
        _action = '%s:%s' % (self.scope, action)
        return self._check(context, _action, target, self.exc, action=action)
 
[docs]    def check(self, context, action, target):
        """Verifies that the action is valid on the target in this context.
           :param context: Heat request context
           :param action: String representing the action to be checked
           :param object: Dictionary representing the object of the action.
           :returns: A non-False value if access is allowed.
        """
        return self._check(context, action, target)