Source code for ironic.api.validation

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""API request/response validating middleware."""

import functools
import inspect
import typing as ty

from oslo_serialization import jsonutils
from webob import exc as webob_exc

from ironic import api
from ironic.api.validation import validators
from ironic.common.i18n import _


def api_version(
    min_version: ty.Optional[int],
    max_version: ty.Optional[int] = None,
    message: ty.Optional[str] = None,
    exception_class: ty.Type[
        webob_exc.HTTPException
    ] = webob_exc.HTTPNotFound,
):
    """Decorator for marking lower and upper bounds on API methods.

    The bounds are compared against ``api.request.version.minor`` each time
    the decorated method is called. A bound that is falsy (``None`` or ``0``)
    is not enforced.

    The bounds are also exposed on the returned wrapper as the
    ``min_version`` and ``max_version`` attributes.

    :param min_version: An integer representing the minimum API version that
        the API is available under.
    :param max_version: An integer representing the maximum API version that
        the API is available under.
    :param message: A message to return if the API is not supported.
    :param exception_class: The exception class to raise if the API version
        is not supported (default is HTTPNotFound).
    :returns: A decorator that wraps the API method with the version check.
    :raises: ``AssertionError`` at decoration time if ``exception_class`` is
        not a subclass of ``webob_exc.HTTPException``.
    """
    # Ensure the provided exception class really is an HTTPException
    # subclass. The check has to be issubclass(): testing
    # isinstance(cls, type(HTTPException)) only compares against the
    # metaclass and therefore accepts every class.
    assert (
        isinstance(exception_class, type)
        and issubclass(exception_class, webob_exc.HTTPException)
    ), (
        "Invalid exception class provided, must be a "
        "subclass of webob_exc.HTTPException."
    )

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Version checks
            if (
                min_version and not api.request.version.minor >= min_version
            ) or (
                max_version and not api.request.version.minor <= max_version
            ):
                # Raise provided exception with localized message
                raise exception_class(
                    detail=_(
                        message or 'The API is not supported for this version'
                    )
                )
            return func(*args, **kwargs)

        # Expose the bounds on the wrapper as attributes.
        wrapper.min_version = min_version
        wrapper.max_version = max_version
        return wrapper

    return add_validator
class Schemas:
    """A microversion-aware schema container.

    Allow definition and retrieval of schemas on a microversion-aware basis.
    Schemas are kept in registration order; calling the container returns
    the first one whose version range matches the current request.
    """

    def __init__(self) -> None:
        # Registered (schema, min_version, max_version) triples, in the
        # order they were added.
        self._schemas: list[
            tuple[dict[str, object], ty.Optional[int], ty.Optional[int]]
        ] = []

    def add_schema(
        self,
        schema: dict[str, object],
        min_version: ty.Optional[int],
        max_version: ty.Optional[int],
    ) -> None:
        """Register ``schema`` for the given version range.

        :param schema: The JSON Schema schema to register.
        :param min_version: The minimum API version ``schema`` applies
            against; a falsy value means no lower bound.
        :param max_version: The maximum API version ``schema`` applies
            against; a falsy value means no upper bound.
        """
        self._schemas.append((schema, min_version, max_version))

    def __call__(self) -> ty.Optional[dict[str, object]]:
        """Return the first registered schema matching the request version.

        The request version is read from ``api.request.version.minor``, and
        only when a schema actually declares a bound.

        :returns: The matching schema, or ``None`` if no registered schema
            applies to the request version.
        """
        for schema, min_version, max_version in self._schemas:
            if min_version and api.request.version.minor < min_version:
                # request is older than this schema's range
                continue
            if max_version and api.request.version.minor > max_version:
                # request is newer than this schema's range
                continue
            return schema
        return None
def _schema_validator( schema: ty.Dict[str, ty.Any], target: ty.Dict[str, ty.Any], min_version: ty.Optional[int], max_version: ty.Optional[int], is_body: bool = True, ): """A helper method to execute JSON Schema Validation. This method checks the request version whether matches the specified ``max_version`` and ``min_version``. If the version range matches the request, we validate ``schema`` against ``target``. A failure will result in ``ValidationError`` being raised. :param schema: The JSON Schema schema used to validate the target. :param target: The target to be validated by the schema. :param min_version: An integer indicating the minimum API version ``schema`` applies against. :param max_version: An integer indicating the maximum API version ``schema`` applies against. :param args: Positional arguments which passed into original method. :param kwargs: Keyword arguments which passed into original method. :param is_body: Whether ``target`` is a HTTP request body or not. :returns: None. :raises: ``ValidationError`` if validation fails. """ # Only validate against the schema if it lies within # the version range specified. Note that if both min # and max are not specified the validator will always # be run. if ( (min_version and api.request.version.minor < min_version) or (max_version and api.request.version.minor > max_version) ): return schema_validator = validators.SchemaValidator(schema, is_body=is_body) schema_validator.validate(target) def _extract_parameters(function): sig = inspect.signature(function) params = [] for param in sig.parameters.values(): if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: if param.name == 'self': # skip validating self continue params.append(param) return params
def request_parameter_schema(
    schema: ty.Dict[str, ty.Any],
    min_version: ty.Optional[int] = None,
    max_version: ty.Optional[int] = None,
):
    """Decorator for registering a request parameter schema on API methods.

    ``schema`` is checked against the request parameters immediately before
    the decorated API method runs. The schema is also recorded on the
    wrapper's ``request_parameter_schemas`` attribute.

    :param schema: The JSON Schema schema used to validate the target.
    :param min_version: An integer indicating the minimum API version
        ``schema`` applies against.
    :param max_version: An integer indicating the maximum API version
        ``schema`` applies against.
    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a name -> value mapping from the positional arguments so
            # the schema has named properties to validate.
            candidates = _extract_parameters(func)
            if func.__name__ in ('patch', 'post'):
                # create/update methods take the request body as their last
                # parameter; it is not a request parameter, so drop it
                candidates = candidates[:-1]

            named_values = {}
            for position, param in enumerate(candidates):
                # skip placeholders and anything that has a default
                if param.name == '_' or param.default is not param.empty:
                    continue
                # args[0] is skipped: _extract_parameters omits 'self'
                named_values[param.name] = args[position + 1]

            _schema_validator(
                schema,
                named_values,
                min_version,
                max_version,
                is_body=True,
            )
            return func(*args, **kwargs)

        # functools.wraps copies the wrapped function's attributes, so a
        # registry may already exist when this decorator is stacked.
        try:
            registry = wrapper.request_parameter_schemas
        except AttributeError:
            registry = wrapper.request_parameter_schemas = Schemas()
        registry.add_schema(schema, min_version, max_version)

        return wrapper

    return add_validator
def request_query_schema(
    schema: ty.Dict[str, ty.Any],
    min_version: ty.Optional[int] = None,
    max_version: ty.Optional[int] = None,
):
    """Decorator for registering a request query string schema on API methods.

    ``schema`` is checked against the keyword arguments of the call
    immediately before the decorated API method runs. The schema is also
    recorded on the wrapper's ``request_query_schemas`` attribute.

    :param schema: The JSON Schema schema used to validate the target.
    :param min_version: An integer indicating the minimum API version
        ``schema`` applies against.
    :param max_version: An integer indicating the maximum API version
        ``schema`` applies against.
    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # the keyword arguments are what gets validated here
            _schema_validator(
                schema, kwargs, min_version, max_version, is_body=True
            )
            return func(*args, **kwargs)

        # functools.wraps copies the wrapped function's attributes, so a
        # registry may already exist when this decorator is stacked.
        try:
            registry = wrapper.request_query_schemas
        except AttributeError:
            registry = wrapper.request_query_schemas = Schemas()
        registry.add_schema(schema, min_version, max_version)

        return wrapper

    return add_validator
def request_body_schema(
    schema: ty.Dict[str, ty.Any],
    min_version: ty.Optional[int] = None,
    max_version: ty.Optional[int] = None,
):
    """Decorator for registering a request body schema on API methods.

    ``schema`` will be used for validating the request body just before the
    API method is executed. The body is taken from the keyword argument
    named after the last parameter of the decorated method. The schema is
    also recorded on the wrapper's ``request_body_schemas`` attribute.

    :param schema: The JSON Schema schema used to validate the target.
    :param min_version: An integer indicating the minimum API version
        ``schema`` applies against.
    :param max_version: An integer indicating the maximum API version
        ``schema`` applies against.
    :raises: ``RuntimeError`` when the decorated method is called and it
        exposes no parameters to take the body from.
    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            parameters = _extract_parameters(func)
            if not parameters:
                # TODO(stephenfin): this would be a better check if we
                # distinguished between 'create' operations (which should
                # have at least one parameter, the body) and "update"
                # operations (which should have at least two, the IDs and
                # the body)
                raise RuntimeError(
                    'The ironic.api.method.body decorator must be placed '
                    'outside the validation helpers to ensure it runs first.'
                )

            _schema_validator(
                schema,
                # The body argument will always be the last one
                kwargs[parameters[-1].name],
                min_version,
                max_version,
                is_body=True,
            )
            return func(*args, **kwargs)

        # functools.wraps copies the wrapped function's attributes, so a
        # registry may already exist when this decorator is stacked.
        if not hasattr(wrapper, 'request_body_schemas'):
            wrapper.request_body_schemas = Schemas()

        wrapper.request_body_schemas.add_schema(
            schema, min_version, max_version
        )

        return wrapper

    return add_validator
def response_body_schema(
    schema: ty.Dict[str, ty.Any],
    min_version: ty.Optional[int] = None,
    max_version: ty.Optional[int] = None,
):
    """Decorator for registering a response body schema on API methods.

    ``schema`` will be used for validating the response body just after the
    API method is executed. The value returned by the API method is passed
    through unchanged; only a JSON round-tripped copy is validated. The
    schema is also recorded on the wrapper's ``response_body_schemas``
    attribute.

    :param schema: The JSON Schema schema used to validate the target.
    :param min_version: An integer indicating the minimum API version
        ``schema`` applies against.
    :param max_version: An integer indicating the maximum API version
        ``schema`` applies against.
    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            response = func(*args, **kwargs)

            # FIXME(stephenfin): How is ironic/pecan doing jsonification? The
            # below will fail on e.g. date-time fields

            # NOTE(stephenfin): If our response is an object, we need to
            # serialize and deserialize to convert e.g. date-time to strings
            serialized = jsonutils.dumps(response)

            # Treat an empty payload as "no body". The emptiness test works
            # for both text and bytes; comparing the serialized text against
            # b'' could never match.
            body = jsonutils.loads(serialized) if serialized else None

            _schema_validator(
                schema,
                body,
                min_version,
                max_version,
                is_body=True,
            )
            return response

        # functools.wraps copies the wrapped function's attributes, so a
        # registry may already exist when this decorator is stacked.
        if not hasattr(wrapper, 'response_body_schemas'):
            wrapper.response_body_schemas = Schemas()

        wrapper.response_body_schemas.add_schema(
            schema, min_version, max_version
        )

        return wrapper

    return add_validator