# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import operator
import re
import netaddr
from oslo_log import log
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import base
from ironic.common.inspection_rules import utils
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Registry of condition operators: maps the operator name used in a rule
# to the name of the class in this module that implements it.
OPERATORS = {
    # Generic comparisons
    "eq": "EqOperator",
    "lt": "LtOperator",
    "gt": "GtOperator",
    # Value inspection
    "is-empty": "EmptyOperator",
    "in-net": "NetOperator",
    # Regular-expression checks
    "matches": "MatchesOperator",
    "contains": "ContainsOperator",
    # Membership and truthiness
    "one-of": "OneOfOperator",
    "is-none": "IsNoneOperator",
    "is-true": "IsTrueOperator",
    "is-false": "IsFalseOperator",
}
[docs]
def get_operator(op_name):
    """Look up the operator class registered under ``op_name``.

    :param op_name: a key of the ``OPERATORS`` mapping, e.g. ``"eq"``
    :raises KeyError: if ``op_name`` is not a key of ``OPERATORS`` or the
        mapped class name is not defined at module level
    :returns: the operator class itself (not an instance)
    """
    # OPERATORS stores class names as strings, so resolve the name through
    # this module's global namespace.
    return globals()[OPERATORS[op_name]]
[docs]
def coerce(value, expected):
    """Convert ``value`` to the numeric/string kind described by ``expected``.

    ``expected`` may be either an example value or a type object:

    * a ``float`` instance, or a ``float`` subclass -> ``float(value)``
    * an ``int`` instance (this includes ``bool``), or an ``int``
      subclass -> ``int(value)``
    * the ``str`` type (or a subclass of it) -> ``str(value)``
    * anything else -> ``value`` is returned unchanged

    A ``str`` *instance* passed as ``expected`` leaves ``value`` unchanged.

    :param value: the value to convert
    :param expected: example value or type describing the target kind
    :raises ValueError: if ``value`` cannot be converted by ``int``/``float``
    :raises TypeError: if ``value`` is of a type ``int``/``float`` reject
    :returns: the converted value, or ``value`` itself
    """
    if isinstance(expected, type):
        # A type object was passed (e.g. ``coerce(v, str)``). Without this
        # branch such a call silently returned ``value`` unconverted.
        for target in (float, int, str):
            if issubclass(expected, target):
                return target(value)
        return value

    if isinstance(expected, float):
        return float(value)
    if isinstance(expected, int):
        return int(value)
    return value
[docs]
class OperatorBase(base.Base, metaclass=abc.ABCMeta):
    """Abstract base class for rule condition plugins."""

    @abc.abstractmethod
    def __call__(self, task, *args, **kwargs):
        """Checks if condition holds for a given field."""

    def check_with_loop(self, task, condition, inventory, plugin_data):
        """Evaluate ``condition`` once per entry of its ``loop`` value.

        For every entry a shallow copy of ``condition`` is made whose
        ``args`` key is replaced by that entry (for a dict ``loop`` the
        entries are its keys), and the copy is passed to
        :meth:`check_condition`. The ``multiple`` key (default ``'any'``)
        selects how the per-entry results are combined:

        * ``'any'``: true if at least one result is truthy
        * ``'all'``: true if every result is truthy
        * ``'first'``: returns ``True`` as soon as one result is truthy;
          otherwise falls through to the fallback below
        * ``'last'``: only the most recent result is kept
        * fallback for anything else: the first kept result, or ``False``
          when there were no entries

        If ``loop`` is neither a list nor a dict, ``condition`` is checked
        once, unchanged.

        :param task: passed through to :meth:`check_condition`
        :param condition: mapping describing the condition
        :param inventory: passed through to :meth:`check_condition`
        :param plugin_data: passed through to :meth:`check_condition`
        :returns: the combined result as described above
        """
        loop_items = condition.get('loop', [])
        multiple = condition.get('multiple', 'any')

        # Nothing to iterate over: evaluate the condition as it stands.
        if not isinstance(loop_items, (list, dict)):
            return self.check_condition(task, condition, inventory,
                                        plugin_data)

        outcomes = []
        for entry in loop_items:
            per_entry = condition.copy()
            per_entry['args'] = entry
            outcome = self.check_condition(task, per_entry, inventory,
                                           plugin_data)
            outcomes.append(outcome)

            if multiple == 'first' and outcome:
                return True
            if multiple == 'last':
                # Discard everything seen so far except this outcome.
                outcomes = [outcome]

        if multiple == 'any':
            return any(outcomes)
        if multiple == 'all':
            return all(outcomes)
        return outcomes[0] if outcomes else False

    def check_condition(self, task, condition, inventory, plugin_data):
        """Process condition arguments and apply the check logic.

        :param task: TaskManager instance
        :param condition: condition to check; its ``op`` entry is parsed to
            find out whether the result must be inverted
        :param inventory: forwarded to ``_process_args``
        :param plugin_data: forwarded to ``_process_args``
        :raises InspectionRuleExecutionFailure: on unacceptable field value
        :returns: the result of calling this operator with the processed
            arguments, negated if the operator is inverted
        """
        # Only the inversion flag is needed here; the operator name itself
        # is not used.
        _op, inverted = utils.parse_inverted_operator(condition['op'])
        call_kwargs = self._process_args(task, condition, inventory,
                                         plugin_data)
        outcome = self(task, **call_kwargs)
        if inverted:
            return not outcome
        return outcome
[docs]
class SimpleOperator(OperatorBase):
    """Base for operators that compare each value with its successor.

    Subclasses set ``op`` to a two-argument callable; the condition holds
    when ``op(a, b)`` is truthy for every adjacent pair in ``values``.
    """

    op = None

    def __call__(self, task, values, force_strings=False):
        """Check that ``op`` holds between all adjacent elements.

        :param task: unused by this operator
        :param values: list of values to compare pairwise, in order
        :param force_strings: if true, every value is converted with
            ``str()`` before comparing
        :raises RuleConditionCheckFailure: if ``values`` is not a list
        :returns: True if fewer than two values are given or every adjacent
            pair satisfies ``op``, otherwise False
        """
        if not isinstance(values, list):
            msg = _("Failed to check condition: '%(op)s' on values: "
                    "%(values)s: Expected list for 'values', got: "
                    "%(invalid_type)s") % {
                'op': self.op.__name__, 'values': values,
                "invalid_type": type(values).__name__}
            LOG.error(msg)
            raise exception.RuleConditionCheckFailure(reason=msg)

        # With zero or one value there is no pair to compare.
        if len(values) < 2:
            return True

        if force_strings:
            # Compare textual representations, so e.g. 10 and "10" are
            # treated as equal.
            values = [str(value) for value in values]

        return all(self.op(left, right)
                   for left, right in zip(values, values[1:]))
[docs]
class EqOperator(SimpleOperator):
    """Condition that holds when adjacent values compare equal."""

    op = operator.eq
[docs]
class LtOperator(SimpleOperator):
    """Condition that holds when each value is less than the next one."""

    op = operator.lt
[docs]
class GtOperator(SimpleOperator):
    """Condition that holds when each value is greater than the next one."""

    op = operator.gt
[docs]
class EmptyOperator(OperatorBase):
    """Condition that holds when a value's string form denotes emptiness."""

    FORMATTED_ARGS = ['value']

    def __call__(self, task, value):
        """Tell whether ``str(value)`` is one of the "empty" spellings.

        :param task: unused by this operator
        :param value: the value to inspect
        :returns: True if ``str(value)`` is the empty string, ``'None'``,
            ``'[]'`` or ``'{}'``; otherwise False
        """
        empty_forms = ("", 'None', '[]', '{}')
        rendered = str(value)
        return rendered in empty_forms
[docs]
class NetOperator(OperatorBase):
    """Condition that holds when an IP address belongs to a subnet."""

    FORMATTED_ARGS = ['address', 'subnet']

    def __call__(self, task, address, subnet):
        """Check whether ``address`` lies inside ``subnet``.

        :param task: unused by this operator
        :param address: IP address, in any form ``netaddr.IPAddress``
            accepts
        :param subnet: network, in any form ``netaddr.IPNetwork`` accepts
        :raises InspectionRuleExecutionFailure: if either ``subnet`` or
            ``address`` is rejected by netaddr as malformed
        :returns: True if the address is in the network, otherwise False
        """
        try:
            network = netaddr.IPNetwork(subnet)
            # Parse the address inside the same guard so that a malformed
            # address is reported the same way as a malformed subnet.
            ip_address = netaddr.IPAddress(address)
        except netaddr.AddrFormatError as exc:
            raise exception.InspectionRuleExecutionFailure(
                _('invalid value: %s') % exc) from exc
        return ip_address in network
[docs]
class IsTrueOperator(OperatorBase):
    """Condition that holds when a value is considered true."""

    FORMATTED_ARGS = ['value']

    def __call__(self, task, value):
        """Interpret ``value`` as a boolean and report whether it is true.

        :param task: unused by this operator
        :param value: bool, number, string or anything else
        :returns: for bools and numbers their truthiness; for strings True
            only for ``yes``/``true`` (case-insensitive); False for any
            other type
        """
        if isinstance(value, str):
            return value.lower() in ('yes', 'true')
        # bool is covered here as well: bool(value) is the value itself.
        if isinstance(value, (bool, int, float)):
            return bool(value)
        return False
[docs]
class IsFalseOperator(OperatorBase):
    """Condition that holds when a value is considered false."""

    FORMATTED_ARGS = ['value']

    def __call__(self, task, value):
        """Interpret ``value`` as a boolean and report whether it is false.

        :param task: unused by this operator
        :param value: bool, number, string or anything else
        :returns: for bools and numbers the negation of their truthiness;
            for strings True only for ``no``/``false`` (case-insensitive);
            for any other type True only if the value is ``None``
        """
        if isinstance(value, str):
            return value.lower() in ('no', 'false')
        # bool is covered here as well as int and float.
        if isinstance(value, (bool, int, float)):
            return not bool(value)
        return value is None
[docs]
class IsNoneOperator(OperatorBase):
    """Condition that holds when a value's string form is ``'None'``."""

    FORMATTED_ARGS = ['value']

    def __call__(self, task, value):
        """Tell whether ``str(value)`` equals ``'None'``.

        Both the ``None`` object and the literal string ``'None'`` satisfy
        this check.

        :param task: unused by this operator
        :param value: the value to inspect
        :returns: True if ``str(value) == 'None'``, otherwise False
        """
        rendered = str(value)
        return rendered == 'None'
[docs]
class OneOfOperator(OperatorBase):
    """Condition that holds when a value is a member of a collection."""

    FORMATTED_ARGS = ['value']

    # The default is an immutable empty tuple rather than a shared mutable
    # list; membership tests against it behave the same (always False).
    def __call__(self, task, value, values=()):
        """Check whether ``value`` is contained in ``values``.

        :param task: unused by this operator
        :param value: the value to look for
        :param values: container of accepted values; empty by default
        :returns: True if ``value in values``, otherwise False
        """
        return value in values
[docs]
class ReOperator(OperatorBase):
    """Shared base for conditions driven by a regular expression."""

    FORMATTED_ARGS = ['value']

    def validate_regex(self, regex):
        """Make sure ``regex`` can be compiled.

        :param regex: the regular expression pattern to validate
        :raises InspectionRuleExecutionFailure: if ``re.compile`` rejects
            the pattern with ``re.error``
        """
        try:
            re.compile(regex)
        except re.error as exc:
            message = _('invalid regular expression: %s') % exc
            raise exception.InspectionRuleExecutionFailure(message)
[docs]
class MatchesOperator(ReOperator):
    """Condition that holds when the whole value matches a regex."""

    def __call__(self, task, value, regex):
        """Check that ``str(value)`` matches ``regex`` in its entirety.

        :param task: unused by this operator
        :param value: the value to test; converted with ``str()``
        :param regex: regular expression pattern
        :raises InspectionRuleExecutionFailure: if ``regex`` does not compile
        :returns: True on a full match, otherwise False
        """
        self.validate_regex(regex)
        # re.fullmatch anchors the whole pattern at the end of the string.
        # Appending '$' by hand fails on an empty pattern (IndexError on
        # regex[-1]), binds only to the last branch of an alternation such
        # as 'a|b', and is skipped when the pattern ends in an escaped '\$'.
        return re.fullmatch(regex, str(value)) is not None
[docs]
class ContainsOperator(ReOperator):
    """Condition that holds when a regex matches somewhere in the value."""

    def __call__(self, task, value, regex):
        """Search ``str(value)`` for ``regex``.

        :param task: unused by this operator
        :param value: the value to search; converted with ``str()``
        :param regex: regular expression pattern
        :raises InspectionRuleExecutionFailure: if ``regex`` does not compile
        :returns: True if the pattern is found anywhere, otherwise False
        """
        self.validate_regex(regex)
        found = re.search(regex, str(value))
        return found is not None