# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This library allows export of metrics data to `Prometheus <https://prometheus.io/>`_.
Usage
-----
The **OpenTelemetry Prometheus Exporter** allows export of `OpenTelemetry`_
metrics to `Prometheus`_.
.. _Prometheus: https://prometheus.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. code:: python
from prometheus_client import start_http_server
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
# Start Prometheus client
start_http_server(port=8000, addr="localhost")
# Exporter to export metrics to Prometheus
prefix = "MyAppPrefix"
reader = PrometheusMetricReader(prefix=prefix)
# Meter is responsible for creating and recording metrics
set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = get_meter_provider().get_meter("myapp", "0.1.2")
counter = meter.create_counter(
"requests",
"requests",
"number of requests",
)
# Labels are used to identify key-values that are associated with a specific
# metric that you want to record. These are useful for pre-aggregation and can
# be used to store custom dimensions pertaining to a metric
labels = {"environment": "staging"}
counter.add(25, labels)
input("Press any key to exit...")
API
---
"""
from collections import deque
from itertools import chain
from json import dumps
from logging import getLogger
from os import environ
from typing import Deque, Dict, Iterable, Sequence, Tuple, Union
from prometheus_client import start_http_server
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
InfoMetricFamily,
)
from prometheus_client.core import Metric as PrometheusMetric
from opentelemetry.exporter.prometheus._mapping import (
map_unit,
sanitize_attribute,
sanitize_full_name,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_PROMETHEUS_HOST,
OTEL_EXPORTER_PROMETHEUS_PORT,
)
from opentelemetry.sdk.metrics import (
Counter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics import Histogram as HistogramInstrument
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Gauge,
Histogram,
HistogramDataPoint,
MetricReader,
MetricsData,
Sum,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
from opentelemetry.util.types import Attributes
# Module-level logger for this exporter.
_logger = getLogger(__name__)

# Name and help text of the Prometheus "target_info" metric that carries the
# OpenTelemetry resource attributes (emitted unless disable_target_info=True).
_TARGET_INFO_NAME = "target"
_TARGET_INFO_DESCRIPTION = "Target metadata"
def _convert_buckets(
bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[Tuple[str, int]]:
buckets = []
total_count = 0
for upper_bound, count in zip(
chain(explicit_bounds, ["+Inf"]),
bucket_counts,
):
total_count += count
buckets.append((f"{upper_bound}", total_count))
return buckets
class PrometheusMetricReader(MetricReader):
    """Prometheus metric exporter for OpenTelemetry.

    Registers a custom collector with the default ``prometheus_client``
    ``REGISTRY`` so that OpenTelemetry metrics are translated and served
    whenever the Prometheus client's HTTP endpoint is scraped.

    Args:
        disable_target_info: when True, the ``target_info`` metric carrying
            resource attributes is not emitted.
        prefix: optional string prepended (with an underscore) to every
            exported metric name.
    """

    # NOTE: stray "[docs]" lines (Sphinx HTML scrape artifacts) were removed
    # from this class; they referenced an undefined name and would raise
    # NameError at import time.

    def __init__(
        self, disable_target_info: bool = False, prefix: str = ""
    ) -> None:
        super().__init__(
            # Prometheus only supports cumulative temporality.
            preferred_temporality={
                Counter: AggregationTemporality.CUMULATIVE,
                UpDownCounter: AggregationTemporality.CUMULATIVE,
                HistogramInstrument: AggregationTemporality.CUMULATIVE,
                ObservableCounter: AggregationTemporality.CUMULATIVE,
                ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                ObservableGauge: AggregationTemporality.CUMULATIVE,
            },
            otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER,
        )
        self._collector = _CustomCollector(
            disable_target_info=disable_target_info, prefix=prefix
        )
        REGISTRY.register(self._collector)
        # Let the collector pull fresh metrics from the SDK on every scrape.
        self._collector._callback = self.collect
        self._prefix = prefix

    def _receive_metrics(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        """Queue a batch of SDK metrics for translation on the next scrape."""
        if metrics_data is None:
            return
        self._collector.add_metrics_data(metrics_data)

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Unregister the collector so future scrapes no longer include it."""
        REGISTRY.unregister(self._collector)
class _CustomCollector:
    """_CustomCollector represents the Prometheus Collector object
    See more:
    https://github.com/prometheus/client_python#custom-collectors
    """

    def __init__(self, disable_target_info: bool = False, prefix: str = ""):
        # Set by PrometheusMetricReader to its collect() method; invoked on
        # every scrape so the SDK pushes fresh metrics before translation.
        self._callback = None
        # FIFO of MetricsData batches waiting to be translated on the next
        # scrape (filled by add_metrics_data, drained by collect).
        self._metrics_datas: Deque[MetricsData] = deque()
        self._disable_target_info = disable_target_info
        # Lazily-built, cached "target_info" metric carrying the resource
        # attributes of the first observed batch.
        self._target_info = None
        # Optional prefix applied to every exported metric name.
        self._prefix = prefix

    def add_metrics_data(self, metrics_data: MetricsData) -> None:
        """Add metrics to Prometheus data"""
        self._metrics_datas.append(metrics_data)

    def collect(self) -> Iterable[PrometheusMetric]:
        """Collect fetches the metrics from OpenTelemetry
        and delivers them as Prometheus Metrics.
        Collect is invoked every time a ``prometheus.Gatherer`` is run
        for example when the HTTP endpoint is invoked by Prometheus.
        """
        # Ask the SDK to push the latest metrics into our deque first.
        if self._callback is not None:
            self._callback()

        # Maps a family id (name|description|unit|family-class) to the
        # Prometheus metric family being accumulated for this scrape.
        metric_family_id_metric_family = {}

        if len(self._metrics_datas):
            if not self._disable_target_info:
                if self._target_info is None:
                    # Merge resource attributes from the first batch; later
                    # resources overwrite earlier keys on conflict.
                    attributes: Attributes = {}
                    for res in self._metrics_datas[0].resource_metrics:
                        attributes = {**attributes, **res.resource.attributes}
                    self._target_info = self._create_info_metric(
                        _TARGET_INFO_NAME, _TARGET_INFO_DESCRIPTION, attributes
                    )
                metric_family_id_metric_family[_TARGET_INFO_NAME] = (
                    self._target_info
                )

        # Drain all queued batches, merging data points into the shared
        # family map, and yield the accumulated families.
        while self._metrics_datas:
            self._translate_to_prometheus(
                self._metrics_datas.popleft(), metric_family_id_metric_family
            )

            if metric_family_id_metric_family:
                yield from metric_family_id_metric_family.values()

    # pylint: disable=too-many-locals,too-many-branches
    def _translate_to_prometheus(
        self,
        metrics_data: MetricsData,
        metric_family_id_metric_family: Dict[str, PrometheusMetric],
    ):
        """Translate one MetricsData batch into Prometheus metric families,
        merging data points into ``metric_family_id_metric_family`` in place.
        """
        # Flatten resource -> scope -> metric into a single list.
        metrics = []
        for resource_metrics in metrics_data.resource_metrics:
            for scope_metrics in resource_metrics.scope_metrics:
                for metric in scope_metrics.metrics:
                    metrics.append(metric)

        for metric in metrics:
            label_values_data_points = []
            values = []

            metric_name = metric.name
            if self._prefix:
                metric_name = self._prefix + "_" + metric_name
            # Sanitize the (possibly prefixed) name per Prometheus rules.
            metric_name = sanitize_full_name(metric_name)
            metric_description = metric.description or ""
            metric_unit = map_unit(metric.unit)

            # First pass: collect all unique label keys across all data points
            all_label_keys_set = set()
            data_point_attributes = []
            for number_data_point in metric.data.data_points:
                attrs = {}
                for key, value in number_data_point.attributes.items():
                    sanitized_key = sanitize_attribute(key)
                    all_label_keys_set.add(sanitized_key)
                    attrs[sanitized_key] = self._check_value(value)
                data_point_attributes.append(attrs)
                if isinstance(number_data_point, HistogramDataPoint):
                    # Histogram points carry buckets/bounds/sum as a dict so
                    # they can be converted when the family is populated.
                    values.append(
                        {
                            "bucket_counts": number_data_point.bucket_counts,
                            "explicit_bounds": (
                                number_data_point.explicit_bounds
                            ),
                            "sum": number_data_point.sum,
                        }
                    )
                else:
                    values.append(number_data_point.value)

            # Sort label keys for consistent ordering
            all_label_keys = sorted(all_label_keys_set)

            # Second pass: build label values with empty strings for missing labels
            for attrs in data_point_attributes:
                label_values = []
                for key in all_label_keys:
                    label_values.append(attrs.get(key, ""))
                label_values_data_points.append(label_values)

            # Create metric family ID without label keys
            per_metric_family_id = "|".join(
                [
                    metric_name,
                    metric_description,
                    metric_unit,
                ]
            )

            is_non_monotonic_sum = (
                isinstance(metric.data, Sum)
                and metric.data.is_monotonic is False
            )
            is_cumulative = (
                isinstance(metric.data, Sum)
                and metric.data.aggregation_temporality
                == AggregationTemporality.CUMULATIVE
            )

            # The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge.
            should_convert_sum_to_gauge = (
                is_non_monotonic_sum and is_cumulative
            )

            if (
                isinstance(metric.data, Sum)
                and not should_convert_sum_to_gauge
            ):
                # Monotonic (or non-cumulative) sums map to counters.
                metric_family_id = "|".join(
                    [per_metric_family_id, CounterMetricFamily.__name__]
                )
                if metric_family_id not in metric_family_id_metric_family:
                    metric_family_id_metric_family[metric_family_id] = (
                        CounterMetricFamily(
                            name=metric_name,
                            documentation=metric_description,
                            labels=all_label_keys,
                            unit=metric_unit,
                        )
                    )
                for label_values, value in zip(
                    label_values_data_points, values
                ):
                    metric_family_id_metric_family[
                        metric_family_id
                    ].add_metric(labels=label_values, value=value)
            elif isinstance(metric.data, Gauge) or should_convert_sum_to_gauge:
                metric_family_id = "|".join(
                    [per_metric_family_id, GaugeMetricFamily.__name__]
                )
                if (
                    metric_family_id
                    not in metric_family_id_metric_family.keys()
                ):
                    metric_family_id_metric_family[metric_family_id] = (
                        GaugeMetricFamily(
                            name=metric_name,
                            documentation=metric_description,
                            labels=all_label_keys,
                            unit=metric_unit,
                        )
                    )
                for label_values, value in zip(
                    label_values_data_points, values
                ):
                    metric_family_id_metric_family[
                        metric_family_id
                    ].add_metric(labels=label_values, value=value)
            elif isinstance(metric.data, Histogram):
                metric_family_id = "|".join(
                    [per_metric_family_id, HistogramMetricFamily.__name__]
                )
                if (
                    metric_family_id
                    not in metric_family_id_metric_family.keys()
                ):
                    metric_family_id_metric_family[metric_family_id] = (
                        HistogramMetricFamily(
                            name=metric_name,
                            documentation=metric_description,
                            labels=all_label_keys,
                            unit=metric_unit,
                        )
                    )
                for label_values, value in zip(
                    label_values_data_points, values
                ):
                    metric_family_id_metric_family[
                        metric_family_id
                    ].add_metric(
                        labels=label_values,
                        # Convert OTLP bucket counts to cumulative
                        # Prometheus buckets (with a trailing "+Inf").
                        buckets=_convert_buckets(
                            value["bucket_counts"], value["explicit_bounds"]
                        ),
                        sum_value=value["sum"],
                    )
            else:
                # Exponential histograms and any future data types are not
                # handled; log and drop rather than fail the scrape.
                _logger.warning(
                    "Unsupported metric data. %s", type(metric.data)
                )

    # pylint: disable=no-self-use
    def _check_value(self, value: Union[int, float, str, Sequence]) -> str:
        """Check the label value and return its appropriate representation"""
        # Non-string values (numbers, sequences, ...) are JSON-encoded;
        # default=str stringifies anything JSON cannot handle natively.
        if not isinstance(value, str):
            return dumps(value, default=str)
        return str(value)

    def _create_info_metric(
        self, name: str, description: str, attributes: Dict[str, str]
    ) -> InfoMetricFamily:
        """Create an Info Metric Family with list of attributes"""
        # sanitize the attribute names according to Prometheus rule
        attributes = {
            sanitize_attribute(key): self._check_value(value)
            for key, value in attributes.items()
        }
        info = InfoMetricFamily(name, description, labels=attributes)
        info.add_metric(labels=list(attributes.keys()), value=attributes)
        return info
class _AutoPrometheusMetricReader(PrometheusMetricReader):
    """Thin wrapper around PrometheusMetricReader used for the
    opentelemetry_metrics_exporter entry point.

    Lets users enable the Prometheus exporter through
    ``opentelemetry-instrument``: on construction it also starts the
    Prometheus HTTP server on the configured host and port.
    """

    def __init__(self) -> None:
        super().__init__()
        # Default values are specified in
        # https://github.com/open-telemetry/opentelemetry-specification/blob/v1.24.0/specification/configuration/sdk-environment-variables.md#prometheus-exporter
        host = environ.get(OTEL_EXPORTER_PROMETHEUS_HOST, "localhost")
        port = environ.get(OTEL_EXPORTER_PROMETHEUS_PORT, "9464")
        start_http_server(port=int(port), addr=host)