在Python中使用InfluxDB

在Python中使用InfluxDB

在寻找监控基础设施或第三方应用程序时,Telegraph的内置插件成为了一个很好的选择。或者我们正在搜索系统资源,如磁盘和网络利用率,或者MySQL数据库的性能。

但是,如果我们正在创建一个应用程序,希望将用户的数据存储在时间序列数据库中呢?也许我们可以将其视为物联网(IoT)或基于智能家居的应用程序,每个用户都需要访问其智能牙刷的读数。我们希望存储每次刷牙的时间和持续时间,可以发送提醒孩子刷牙的警报,并追踪诸如电池健康状况和当前刷头使用时间的事物。

无论是为面向用户的应用程序还是为Telegraf插件尚未覆盖的基础设施需求收集自定义数据,都可能需要编写一段新的代码。

让我们以智能牙刷为例,假设我们有一个运行嵌入式Linux并与蓝牙连接的基站。我们已经编写了监听传入数据的代码块,并且似乎运行良好;现在,我们想将其存入InfluxDB中。

一种方法是与应用程序一起运行Telegraf,并通过UDP、Unix或TCP套接字发送数据,让Telegraf处理与InfluxDB的连接以及批处理和写入数据点。

如果我们只需要进行数据收集,那么这种方法是完全可以的;然而,如果我们想查询并获取用户的数据,则可能需要利用InfluxDB在应用程序中提供的不同语言库之一来处理与InfluxDB的交互。

已经存在多种语言的InfluxDB库,其中许多由其社区管理。我们将了解使用influxdb-python库的用法。

因此,让我们开始吧。

了解InfluxDB Python客户端库

InfluxDB 是一个开源的时间序列数据库或TSDB,由名为 InfluxData 的公司设计和开发。它使用Go编程语言编写,用于存储和检索包括操作监控、物联网传感器数据、应用程序指标和实时分析在内的时间序列数据。它还提供了对Graphite的数据处理支持。

Influxdb-python 库作为Python客户端与InfluxDB进行交互。该库托管在InfluxDB的GitHub帐户上,并由三个社区志愿者进行维护。

安装库

我们可以使用pip安装程序来安装Python库的最简单方式。安装influxdb库的语法如下所示:

语法:

$ pip install influxdb

一旦安装完成,我们可以通过创建一个新的Python程序文件并输入以下代码片段来验证。

语法:

from influxdb import InfluxDBClient

现在,让我们保存文件并尝试执行它。如果没有发生错误,则表示库已正确安装。然而,如果出现任何异常,请尝试重新安装或参考官方文档。

创建连接

在下一步中,我们将使用要访问的服务器的相关信息创建一个新的InfluxDBClient实例。我们可以使用以下代码片段,将host和port的值替换为InfluxDB主机的适当URL/IP地址和端口。在下面的示例中,我们在默认端口上本地运行:

示例:

# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(host = 'localhost', port = 8086)

说明:

在上面的代码片段中,我们从influxdb库中导入了InfluxDBClient模块。然后我们使用InfluxDBClient()函数为名为my_Client的变量定义了主机和端口,其中我们分别定义了host和port参数的值。

InfluxDBClient构造函数还可用于设置其他参数,包括用户名和密码、要连接的数据库、是否使用SSL、超时和UDP参数等。

如果我们需要连接到远程主机somedomain.com的8086端口,使用用户名(比如anonymous)和密码(比如somepass),并启用SSL,我们可以使用以下代码片段来启用SSL和SSL验证,使用两个其他参数ssl=True和ssl_verify=True:

示例:

# importing the required module
from influxdb import InfluxDBClient
# defining different entities
my_Client = InfluxDBClient(
    host = 'somedomain.com',
    port = 8086,
    username = 'anonymous',
    password = "somepass",
    ssl = True,
    verify_ssl = True
    )

说明:

在上述代码段中,我们从influxdb库中导入了InfluxDBClient模块。然后使用该模块定义了host、port、username、password、ssl和verify_ssl,并将这些值存储到my_Client变量中。

现在,让我们创建一个名为mydatabase的新数据库,以便按如下所示存储数据:

示例:

# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
    host = 'somedomain.com',
    port = 8086,
    username = 'anonymous',
    password = "somepass",
    ssl = True,
    verify_ssl = True
    )
# creating a database
my_Client.create_database('mydatabase')

说明:

在上面的代码片段中,我们使用 my_Clientcreate_database 方法创建了一个名为 mydatabase 的新数据库。

我们可以使用 my_Clientget_list_database() 函数来检查数据库是否已创建,如下所示:

示例:

# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
    host = 'somedomain.com',
    port = 8086,
    username = 'anonymous',
    password = "somepass",
    ssl = True,
    verify_ssl = True
    )
# creating a database
my_Client.create_database('mydatabase')
# verifying if the database is created or not
my_Client.get_list_database()

输出:

[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'mydatabase'}]

说明:

在上述的代码片段中,我们使用了 get_list_database() 函数来验证数据库是否已创建。结果是,我们可以看到名为 mydatabase 的数据库存在,以及我们安装中的 telegraf_internal 数据库。

最后,我们可以使用以下代码片段将客户端设置为使用此数据库:

示例:

# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
    host = 'somedomain.com',
    port = 8086,
    username = 'anonymous',
    password = "somepass",
    ssl = True,
    verify_ssl = True
    )
# creating a database
my_Client.create_database('mydatabase')
# setting client to use specified database
my_Client.switch_database('mydatabase')

说明:

在上面的代码片段中,我们使用了 switch_database 来设置客户端使用指定的数据库,即 mydatabase。

插入数据

现在我们有了一个要写入数据的数据库并且配置了客户端,现在是时候添加一些数据了。我们将使用客户端的 write_points() 方法来做这件事。该方法接受一个点列表和一些其他参数,包括 “批量大小”(batch size), 这使我们能够批量插入数据,而不是一次性插入。我们可以使用这个方法插入大量数据。

write_points() 方法有一个名为 my_points 的参数,它是一个包含需要写入数据库的点的字典列表。现在让我们创建一些示例数据并将其插入。首先,让我们将三个点以JSON格式插入到一个名为 json_body 的变量中,如下面的代码片段所示:

示例:

json_body = [
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Derek",
            "brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2021-08-04T8:01:00Z",
        "fields": {
            "duration": 147
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Derek",
            "brushId": "6a89f539-71c6-4xx90d-a28d-6c5d84c0ee2f"
        },
        "time": "2021-08-05T8:04:00Z",
        "fields": {
            "duration": 131
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Derek",
            "brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2021-08-06T8:02:00Z",
        "fields": {
            "duration": 124
        }
    }
]

解释:

上述代码片段表示智能牙刷的 “刷牙事件” ,每个事件都发生在早上八点左右,标有使用牙刷的用户名和牙刷本身的ID(帮助我们跟踪使用同一刷头的时间),并包含用户使用牙刷的时长(以秒为单位)的字段。

既然我们已经设置了数据库,并且 write_points() 的默认输入为JSON,我们可以通过使用 json_body 变量作为唯一参数来调用该方法,如下所示:

示例:

my_Client.write_points(json_body)

输出:

True

解释:

在上面的代码片段中,我们使用了 write_points() 函数,并且将 json_body 变量作为参数传递进去。作为结果,如果写入操作成功,函数返回一个布尔值 true 。如果我们创建一个应用程序,我们需要自动收集这些数据,每当用户尝试与牙刷进行交互时,将数据插入到数据库中。

查询数据

一旦数据存储在数据库中,我们可以尝试使用一些查询来获取它。我们将使用与写入数据时相同的client对象,只是这次我们会在InfluxDB上执行一个查询,并使用client的 query() 函数获取结果。

示例:

my_Client.query('SELECT "duration" FROM "mydatabase"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')

解释:

在上面的代码片段中,我们使用了返回包含输出数据以及一些方便方法的 ResultSet 对象的 query() 函数。我们的查询请求的是 mydatabase 数据库中所有用户的测量数据,按用户分组。我们可以使用参数 .raw 来访问InfluxDB返回的原始JSON响应。

示例:

results.raw

输出:

{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Derek'}, 'columns': ['time', 'duration'], 'values': [['2021-08-04T08:01:00Z', 147], ['2021-08-05T08:04:00Z', 131], ['2018-08-06T08:02:00Z', 124]]}]}

说明:

在上面的代码片段中,我们使用了raw参数来访问从InfluxDB返回的原始JSON响应。在大多数情况下,我们不需要直接访问JSON。相反,我们可以利用ResultSet的get_points()方法来从请求中获取测量结果,通过标签或字段进行过滤。如果我们想遍历Derek的所有刷牙会话,我们可以使用以下命令获取所有在标签”user”下值为”Derek”的点集合:

示例:

my_points = results.get_points(tags = {'user':'Derek'})

说明:

在上面的示例中, my_points 变量是一个Python生成器,它是一个类似迭代器的函数;我们可以使用 for x in y 循环对其进行迭代,如下所示:

示例:

my_points = results.get_points(tags = {'user': 'Derek'})
for my_point in my_points:
    print("Time: %s, Duration: %i" % (my_point['time'], my_point['duration']))

输出:

Time: 2021-08-04T08:01:00Z, Duration: 147
Time: 2021-08-05T08:04:00Z, Duration: 131
Time: 2021-08-06T08:02:00Z, Duration: 124

解释:

在上面的代码片段中,我们使用for循环打印了用户每次刷牙时间和持续时间。根据应用程序的不同,我们可以迭代这些点来计算用户的平均刷牙时间,或者仅仅验证每天是否发生了 X 次刷牙事件。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程