在Python中使用InfluxDB
在寻找监控基础设施或第三方应用程序时,Telegraph的内置插件成为了一个很好的选择。或者我们正在搜索系统资源,如磁盘和网络利用率,或者MySQL数据库的性能。
但是,如果我们正在创建一个应用程序,希望将用户的数据存储在时间序列数据库中呢?也许我们可以将其视为物联网(IoT)或基于智能家居的应用程序,每个用户都需要访问其智能牙刷的读数。我们希望存储每次刷牙的时间和持续时间,可以发送提醒孩子刷牙的警报,并追踪诸如电池健康状况和当前刷头使用时间的事物。
无论是为面向用户的应用程序还是为Telegraf插件尚未覆盖的基础设施需求收集自定义数据,都可能需要编写一段新的代码。
让我们以智能牙刷为例,假设我们有一个运行嵌入式Linux并与蓝牙连接的基站。我们已经编写了监听传入数据的代码块,并且似乎运行良好;现在,我们想将其存入InfluxDB中。
一种方法是与应用程序一起运行Telegraf,并通过UDP、Unix或TCP套接字发送数据,让Telegraf处理与InfluxDB的连接以及批处理和写入数据点。
如果我们只需要进行数据收集,那么这种方法是完全可以的;然而,如果我们想查询并获取用户的数据,则可能需要利用InfluxDB在应用程序中提供的不同语言库之一来处理与InfluxDB的交互。
已经存在多种语言的InfluxDB库,其中许多由其社区管理。我们将了解使用influxdb-python库的用法。
因此,让我们开始吧。
了解InfluxDB Python客户端库
InfluxDB 是一个开源的时间序列数据库或TSDB,由名为 InfluxData 的公司设计和开发。它使用Go编程语言编写,用于存储和检索包括操作监控、物联网传感器数据、应用程序指标和实时分析在内的时间序列数据。它还提供了对Graphite的数据处理支持。
Influxdb-python 库作为Python客户端与InfluxDB进行交互。该库托管在InfluxDB的GitHub帐户上,并由三个社区志愿者进行维护。
安装库
我们可以使用pip安装程序来安装Python库的最简单方式。安装influxdb库的语法如下所示:
语法:
$ pip install influxdb
一旦安装完成,我们可以通过创建一个新的Python程序文件并输入以下代码片段来验证。
语法:
from influxdb import InfluxDBClient
现在,让我们保存文件并尝试执行它。如果没有发生错误,则表示库已正确安装。然而,如果出现任何异常,请尝试重新安装或参考官方文档。
创建连接
在下一步中,我们将使用要访问的服务器的相关信息创建一个新的InfluxDBClient实例。我们可以使用以下代码片段,将host和port的值替换为InfluxDB主机的适当URL/IP地址和端口。在下面的示例中,我们在默认端口上本地运行:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(host = 'localhost', port = 8086)
说明:
在上面的代码片段中,我们从influxdb库中导入了InfluxDBClient模块。然后我们使用InfluxDBClient()函数为名为my_Client的变量定义了主机和端口,其中我们分别定义了host和port参数的值。
InfluxDBClient构造函数还可用于设置其他参数,包括用户名和密码、要连接的数据库、是否使用SSL、超时和UDP参数等。
如果我们需要连接到远程主机somedomain.com的8086端口,使用用户名(比如anonymous)和密码(比如somepass),并启用SSL,我们可以使用以下代码片段来启用SSL和SSL验证,使用两个其他参数ssl=True和ssl_verify=True:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining different entities
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
说明:
在上述代码段中,我们从influxdb库中导入了InfluxDBClient模块。然后使用该模块定义了host、port、username、password、ssl和verify_ssl,并将这些值存储到my_Client变量中。
现在,让我们创建一个名为mydatabase的新数据库,以便按如下所示存储数据:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
说明:
在上面的代码片段中,我们使用 my_Client 的 create_database 方法创建了一个名为 mydatabase 的新数据库。
我们可以使用 my_Client 的 get_list_database() 函数来检查数据库是否已创建,如下所示:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
# verifying if the database is created or not
my_Client.get_list_database()
输出:
[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'mydatabase'}]
说明:
在上述的代码片段中,我们使用了 get_list_database() 函数来验证数据库是否已创建。结果是,我们可以看到名为 mydatabase 的数据库存在,以及我们安装中的 telegraf 和 _internal 数据库。
最后,我们可以使用以下代码片段将客户端设置为使用此数据库:
示例:
# importing the required module
from influxdb import InfluxDBClient
# defining the host and port
my_Client = InfluxDBClient(
host = 'somedomain.com',
port = 8086,
username = 'anonymous',
password = "somepass",
ssl = True,
verify_ssl = True
)
# creating a database
my_Client.create_database('mydatabase')
# setting client to use specified database
my_Client.switch_database('mydatabase')
说明:
在上面的代码片段中,我们使用了 switch_database 来设置客户端使用指定的数据库,即 mydatabase。
插入数据
现在我们有了一个要写入数据的数据库并且配置了客户端,现在是时候添加一些数据了。我们将使用客户端的 write_points() 方法来做这件事。该方法接受一个点列表和一些其他参数,包括 “批量大小”(batch size), 这使我们能够批量插入数据,而不是一次性插入。我们可以使用这个方法插入大量数据。
write_points() 方法有一个名为 my_points 的参数,它是一个包含需要写入数据库的点的字典列表。现在让我们创建一些示例数据并将其插入。首先,让我们将三个点以JSON格式插入到一个名为 json_body 的变量中,如下面的代码片段所示:
示例:
json_body = [
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-04T8:01:00Z",
"fields": {
"duration": 147
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-4xx90d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-05T8:04:00Z",
"fields": {
"duration": 131
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Derek",
"brushId": "6a89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2021-08-06T8:02:00Z",
"fields": {
"duration": 124
}
}
]
解释:
上述代码片段表示智能牙刷的 “刷牙事件” ,每个事件都发生在早上八点左右,标有使用牙刷的用户名和牙刷本身的ID(帮助我们跟踪使用同一刷头的时间),并包含用户使用牙刷的时长(以秒为单位)的字段。
既然我们已经设置了数据库,并且 write_points() 的默认输入为JSON,我们可以通过使用 json_body 变量作为唯一参数来调用该方法,如下所示:
示例:
my_Client.write_points(json_body)
输出:
True
解释:
在上面的代码片段中,我们使用了 write_points() 函数,并且将 json_body 变量作为参数传递进去。作为结果,如果写入操作成功,函数返回一个布尔值 true 。如果我们创建一个应用程序,我们需要自动收集这些数据,每当用户尝试与牙刷进行交互时,将数据插入到数据库中。
查询数据
一旦数据存储在数据库中,我们可以尝试使用一些查询来获取它。我们将使用与写入数据时相同的client对象,只是这次我们会在InfluxDB上执行一个查询,并使用client的 query() 函数获取结果。
示例:
my_Client.query('SELECT "duration" FROM "mydatabase"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')
解释:
在上面的代码片段中,我们使用了返回包含输出数据以及一些方便方法的 ResultSet 对象的 query() 函数。我们的查询请求的是 mydatabase 数据库中所有用户的测量数据,按用户分组。我们可以使用参数 .raw 来访问InfluxDB返回的原始JSON响应。
示例:
results.raw
输出:
{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Derek'}, 'columns': ['time', 'duration'], 'values': [['2021-08-04T08:01:00Z', 147], ['2021-08-05T08:04:00Z', 131], ['2018-08-06T08:02:00Z', 124]]}]}
说明:
在上面的代码片段中,我们使用了raw参数来访问从InfluxDB返回的原始JSON响应。在大多数情况下,我们不需要直接访问JSON。相反,我们可以利用ResultSet的get_points()方法来从请求中获取测量结果,通过标签或字段进行过滤。如果我们想遍历Derek的所有刷牙会话,我们可以使用以下命令获取所有在标签”user”下值为”Derek”的点集合:
示例:
my_points = results.get_points(tags = {'user':'Derek'})
说明:
在上面的示例中, my_points 变量是一个Python生成器,它是一个类似迭代器的函数;我们可以使用 for x in y 循环对其进行迭代,如下所示:
示例:
my_points = results.get_points(tags = {'user': 'Derek'})
for my_point in my_points:
print("Time: %s, Duration: %i" % (my_point['time'], my_point['duration']))
输出:
Time: 2021-08-04T08:01:00Z, Duration: 147
Time: 2021-08-05T08:04:00Z, Duration: 131
Time: 2021-08-06T08:02:00Z, Duration: 124
解释:
在上面的代码片段中,我们使用for循环打印了用户每次刷牙时间和持续时间。根据应用程序的不同,我们可以迭代这些点来计算用户的平均刷牙时间,或者仅仅验证每天是否发生了 X 次刷牙事件。