# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from collections import defaultdict
from random import randrange
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Union,
)
from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.trace.span import INVALID_SPAN
from opentelemetry.util.types import Attributes
from .exemplar import Exemplar
class ExemplarReservoir(ABC):
    """ExemplarReservoir provides a method to offer measurements to the
    reservoir and another to collect accumulated Exemplars.

    Note:
        The constructor MUST accept ``**kwargs`` that may be set from
        aggregation parameters.

    Reference:
        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir
    """

    def __init__(self, **kwargs) -> None:
        """Accept, and deliberately ignore, aggregation parameters.

        Subclasses such as ``FixedSizeExemplarReservoirABC`` forward their
        unrecognised ``**kwargs`` here through ``super().__init__(**kwargs)``.
        Without this method those keywords would reach ``object.__init__``.
        It raises ``TypeError`` for any keyword argument, so a reservoir given
        a parameter meant for another reservoir type (e.g. ``boundaries``)
        could not be constructed.
        """
        # Intentionally not forwarded: object.__init__ rejects keyword args.
        super().__init__()

    @abstractmethod
    def offer(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> None:
        """Offers a measurement to be sampled.

        Args:
            value: Measured value
            time_unix_nano: Measurement instant
            attributes: Measurement attributes
            context: Measurement context
        """
        raise NotImplementedError("ExemplarReservoir.offer is not implemented")

    @abstractmethod
    def collect(self, point_attributes: Attributes) -> List[Exemplar]:
        """Returns accumulated Exemplars and also resets the reservoir for the
        next sampling period.

        Args:
            point_attributes: The attributes associated with metric point.

        Returns:
            a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s.
            Returned exemplars contain the attributes that were filtered out by
            the aggregator, but recorded alongside the original measurement.
        """
        raise NotImplementedError(
            "ExemplarReservoir.collect is not implemented"
        )
class ExemplarBucket:
    """Holds at most one sampled measurement for a single reservoir slot.

    Each :meth:`offer` overwrites whatever the bucket held before;
    :meth:`collect` converts the held measurement (if any) into an
    :class:`Exemplar` and clears the bucket.
    """

    def __init__(self) -> None:
        self.__value: Union[int, float] = 0
        self.__attributes: Attributes = None
        self.__time_unix_nano: int = 0
        self.__span_id: Optional[int] = None
        self.__trace_id: Optional[int] = None
        self.__offered: bool = False

    def offer(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> None:
        """Offers a measurement to be sampled, replacing any held measurement.

        Args:
            value: Measured value
            time_unix_nano: Measurement instant
            attributes: Measurement attributes
            context: Measurement context
        """
        self.__value = value
        self.__time_unix_nano = time_unix_nano
        self.__attributes = attributes
        span = trace.get_current_span(context)
        if span != INVALID_SPAN:
            span_context = span.get_span_context()
            self.__span_id = span_context.span_id
            self.__trace_id = span_context.trace_id
        else:
            # Clear IDs left by an earlier measurement in this cycle; otherwise
            # this measurement would be linked to an unrelated span.
            self.__span_id = None
            self.__trace_id = None
        self.__offered = True

    def collect(self, point_attributes: Attributes) -> Optional[Exemplar]:
        """May return an Exemplar and resets the bucket for the next sampling period.

        Args:
            point_attributes: Attributes of the metric point. Measurement
                attributes whose keys appear here are removed from the
                exemplar. ``None`` is treated as "no attributes".

        Returns:
            An Exemplar, or None if nothing was offered since the last reset.
        """
        if not self.__offered:
            return None

        # Filter out attributes from the measurement that are already included
        # in the metric data point. See the specification for more details:
        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
        filtered_attributes = None
        if self.__attributes:
            # Attributes is Optional, so point_attributes may be None.
            excluded = point_attributes or {}
            filtered_attributes = {
                key: val
                for key, val in self.__attributes.items()
                if key not in excluded
            }

        exemplar = Exemplar(
            filtered_attributes,
            self.__value,
            self.__time_unix_nano,
            self.__span_id,
            self.__trace_id,
        )
        self.__reset()
        return exemplar

    def __reset(self) -> None:
        """Reset the bucket state after a collection cycle."""
        self.__value = 0
        # Same "empty" value as __init__ uses.
        self.__attributes = None
        self.__time_unix_nano = 0
        self.__span_id = None
        self.__trace_id = None
        self.__offered = False
class BucketIndexError(ValueError):
    """Signals that a measurement maps to no bucket of the reservoir.

    Raised from ``_find_bucket_index``; the fixed-size reservoir's ``offer``
    catches it and simply does not store the measurement.
    """
class FixedSizeExemplarReservoirABC(ExemplarReservoir):
    """Abstract class for a reservoir with fixed size.

    Measurements are stored in :class:`ExemplarBucket` instances keyed by the
    index that subclasses compute in :meth:`_find_bucket_index`. Buckets are
    created on first use.
    """

    def __init__(self, size: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self._size: int = size
        # Annotated as Dict rather than the read-only Mapping: the defaultdict
        # inserts a new bucket the first time an index is used.
        self._reservoir_storage: Dict[int, ExemplarBucket] = defaultdict(
            ExemplarBucket
        )

    def collect(self, point_attributes: Attributes) -> List[Exemplar]:
        """Returns accumulated Exemplars and also resets the reservoir for the
        next sampling period.

        Args:
            point_attributes: The attributes associated with metric point.

        Returns:
            a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s.
            Returned exemplars contain the attributes that were filtered out by
            the aggregator, but recorded alongside the original measurement.
        """
        # Visit buckets in index order so the result order is deterministic;
        # each bucket resets itself while being collected.
        collected = (
            self._reservoir_storage[index].collect(point_attributes)
            for index in sorted(self._reservoir_storage)
        )
        exemplars = [exemplar for exemplar in collected if exemplar is not None]
        self._reset()
        return exemplars

    def offer(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> None:
        """Offers a measurement to be sampled.

        The measurement is dropped when :meth:`_find_bucket_index` raises
        :class:`BucketIndexError`.

        Args:
            value: Measured value
            time_unix_nano: Measurement instant
            attributes: Measurement attributes
            context: Measurement context
        """
        try:
            index = self._find_bucket_index(
                value, time_unix_nano, attributes, context
            )
        except BucketIndexError:
            # The subclass declined to sample this measurement.
            return
        self._reservoir_storage[index].offer(
            value, time_unix_nano, attributes, context
        )

    @abstractmethod
    def _find_bucket_index(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> int:
        """Determines the bucket index for the given measurement.

        It should be implemented by subclasses based on specific strategies.

        Args:
            value: Measured value
            time_unix_nano: Measurement instant
            attributes: Measurement attributes
            context: Measurement context

        Returns:
            The bucket index

        Raises:
            BucketIndexError: If no bucket index can be found.
        """

    def _reset(self) -> None:
        """Reset the reservoir by resetting any stateful logic after a collection cycle."""
class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC):
    """This reservoir uses a uniformly-weighted sampling algorithm based on the
    number of samples the reservoir has seen so far to determine if the offered
    measurements should be sampled.

    Reference:
        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
    """

    def __init__(self, size: int = 1, **kwargs) -> None:
        super().__init__(size, **kwargs)
        self._measurements_seen: int = 0

    def _reset(self) -> None:
        super()._reset()
        self._measurements_seen = 0

    def _find_bucket_index(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> int:
        """Choose a bucket by uniform reservoir sampling.

        The first ``size`` measurements of a cycle fill buckets
        ``0 .. size - 1`` in order. The ``n``-th measurement after that
        (``n`` counted from 1) replaces a uniformly chosen bucket with
        probability ``size / n``; otherwise it is dropped.

        Raises:
            BucketIndexError: When the measurement is not sampled.
        """
        self._measurements_seen += 1
        # ``<=`` (not ``<``) so the size-th measurement fills the last empty
        # bucket instead of a random, possibly occupied, one.
        if self._measurements_seen <= self._size:
            return self._measurements_seen - 1
        index = randrange(0, self._measurements_seen)
        if index < self._size:
            return index
        raise BucketIndexError("Unable to find the bucket index.")
class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC):
    """Reservoir with one bucket per histogram bucket.

    It is configured with the boundaries of a histogram. For every histogram
    bucket it keeps the most recently offered measurement that fell into it.

    Reference:
        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir
    """

    def __init__(self, boundaries: Sequence[float], **kwargs) -> None:
        # N boundaries delimit N + 1 histogram buckets.
        bucket_count = len(boundaries) + 1
        super().__init__(bucket_count, **kwargs)
        self._boundaries: Sequence[float] = boundaries

    def offer(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> None:
        """Store the measurement in the bucket matching its histogram bucket."""
        target = self._reservoir_storage[
            self._find_bucket_index(value, time_unix_nano, attributes, context)
        ]
        target.offer(value, time_unix_nano, attributes, context)

    def _find_bucket_index(
        self,
        value: Union[int, float],
        time_unix_nano: int,
        attributes: Attributes,
        context: Context,
    ) -> int:
        # Buckets are upper-inclusive: the first boundary >= value wins; values
        # above every boundary go to the trailing overflow bucket.
        return next(
            (
                position
                for position, upper in enumerate(self._boundaries)
                if value <= upper
            ),
            len(self._boundaries),
        )
# Reservoir constructors take aggregation parameters as keyword arguments
# (``size``, ``boundaries``, plus ``**kwargs``), so the builder's argument
# list is left open rather than typed as a single positional dict.
ExemplarReservoirBuilder = Callable[..., ExemplarReservoir]
ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder.
It may receive the Aggregation parameters it is bound to as keyword
arguments; e.g. the _ExplicitBucketHistogramAggregation will provide the
boundaries.
"""