如何使用python查询Prometheus监控数据

一.环境准备

软件库包

版本

python

3.8

prometheus-api-client

0.5.7

二.安装步骤

代码语言:python代码运行次数:0运行复制
pip3 install prometheus-api-client

默认安装最新版本的prometheus-api-client

三.开发调试

在该代码中,或许30min内的时间序列监控数据,步长为1m。

代码语言:python代码运行次数:0运行复制
import json
from datetime import datetime, timedelta
from prometheus_api_client import PrometheusConnect
import requests
        
#query中填写Prometheus的查询语句       
QUERY = ''
PROMETHEUS_URL = "http://127.0.0.1:9090"
        
def get_prometheus_data(query):
    # 设置时间范围(最近 30 分钟)
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=30)

    prom = PrometheusConnect(PROMETHEUS_URL, disable_ssl=True)
    # 获取时间序列数据
    result = prom.custom_query_range(
        query=query,
        start_time=start_time,
        end_time=end_time,
        step="1m"
    )
    
    #打印获取到的监控数据
    print(result)
    
    latest_by_mountpoint = {}
    
    #解析返回的监控数据
    for series in result:
        metric = series.get("metric", {})
        mountpoint = metric.get("mountpoint", "unknown")
        values = series.get("values", [])
        if not values:
            continue
        # 找出时间最新的一条数据
        # x = [timestamp, value]
        latest_point = max(values, key=lambda x: float(x[0]))  

        latest_by_mountpoint[mountpoint] = {
            "timestamp": datetime.utcfromtimestamp(float(latest_point[0])),
            "usage_percent": float(latest_point[1]),
            "instance": metric.get("instance", "unknown")
        }
        
        # todo 告警解析后结合自身业务逻辑进行后续处理......

四.问题处理

FAQ:运行程序报错AttributeError: np.float_ was removed in the NumPy 2.0 release. Use np.float64 instead.

原因:python环境使用的是numpy 2.0及以上版本,在prometheus-api-client库中使用的是1.x版本。导致程序运行报错。

解决办法:降级numpy到1.x版本。

代码语言:python代码运行次数:0运行复制
#手动安装低于2.0版本的numpy
pip install "numpy<2.0"
#安装指定版本的numpy
pip install numpy==1.26.4
#安装完成后,可以验证已安装的版本
python3 -c "import numpy; print(numpy.__version__)"