在Python中的Kafka教程
在下面的教程中,我们将讨论Apache Kafka以及它在Python编程语言中的使用。
了解Apache Kafka
Apache Kafka是一个开源的流式平台,最初由LinkedIn设计。后来,它移交给了Apache Foundation并在2011年开源。
根据维基百科的定义:
Apache Kafka是由Apache Software Foundation开发的开源平台,用于处理流式数据。它使用Java和Scala编写。该项目的目标是提供一个高吞吐量、统一、低延迟的平台,以处理实时数据流。Apache Kafka的存储层实质上是一个”以分布式事务日志为基础的大规模可扩展的发布/订阅消息队列”,使其在处理流式数据的企业基础设施中非常有价值。此外,Kafka通过Kafka Connect连接到外部系统(用于导入和导出数据),并提供了一个用于Java流处理的Kafka Streams库。
我们可以把它看作一个巨大的提交日志,可以按照发生的顺序存储数据。该日志的使用者可以根据需要访问和利用它。
Apache Kafka的一些用例
我们可以在不同的地方使用Apache Kafka。让我们考虑一些Kafka的用例,这可以帮助我们了解它的使用情况:
- 活动监控: 我们可以使用Kafka来监控活动。这些活动可以来自物理传感器和设备,也可以来自网站。生产者可以发布来自数据源的原始数据,以便之后可以利用这些数据来发现趋势和模式。
- 消息传递: 我们还可以将Kafka用作服务之间的消息代理。如果我们正在实施微服务架构,我们可以有一个微服务作为生产者,另一个微服务作为消费者。例如,我们有一个负责创建新账户和向用户发送与账户创建相关的电子邮件的微服务。
- 日志聚合: 我们还可以利用Kafka从不同系统收集日志,并将其存储在一个集中式系统中进行进一步处理。
- ETL: Kafka提供了几乎实时流处理的特性,因此我们可以根据需求开发ETL。
- 数据库: 根据之前提到的内容,我们可以说Kafka也可以充当数据库。它不是一个具有按需求进行数据查询功能的典型数据库,但Kafka可以持久存储数据,直到我们需要使用它。
理解Kafka的概念
让我们讨论一下Kafka的核心概念。
- 主题: 在系统中输入的每条消息都必须属于某个主题。主题是一系列记录的流。消息以键值对的格式存储。每条消息都被指定一个称为偏移量的序列号。一条消息的结果可以作为另一条消息的输入进行进一步处理。
- 生产者: 生产者是负责将数据发布到Kafka系统的应用程序。它们在自己选择的主题上发布数据。
- 消费者: 有一些消费者应用程序使用发布到主题中的消息。消费者订阅它偏好的主题并消费数据。
- 代理: 代理是Kafka的一个实例,负责消息交换。我们可以将Kafka用作集群的一部分或者独立的机器。
现在,让我们考虑一个简单的示例,有一个餐厅的仓库,所有的原材料都存放在里面,比如蔬菜、大米、面粉等等。
这家餐厅提供各种类型的菜肴,比如印度菜、意大利菜、中国菜等等。每种菜系的厨师可以参考仓库来选择所需的物品并制作菜肴。不同菜系的厨师可能使用由原材料制成的相同的食材。这可能是用于各种菜肴的任何秘密配料。在这种情况下,仓库充当代理,商品的商家是生产者,商品和厨师制作的秘密配料是主题,厨师是消费者。
如何在Python中访问Kafka
Python编程语言中有各种库可用于使用Kafka。以下是其中一些库的描述:
序号 | 库 | 描述 |
---|---|---|
1 | Kafka-Python | 这是一个由Python社区设计的开源库。 |
2 | PyKafka | 这个库由Parsly维护,声称是一个Python风格的API。然而,与Kafka-Python不同,我们无法在这个库中创建动态主题。 |
3 | Confluent Python Kafka | 这个库由Confluent提供,作为librdkafka的薄包装。因此,它比上述两个库性能更好。 |
安装依赖项
我们将使用Kafka-Python进行此项目。因此,我们可以使用pip安装程序手动安装它,如下所示:
语法:
$ pip install kafka-python
现在,让我们开始构建这个项目。
项目代码
在下面的示例中,我们将创建一个生产者来生成从1到500的数字,并将它们发送到Kafka代理。然后,一个消费者将从代理中读取这些数据,并将它们保存在一个MongoDB集合中。
使用Kafka的一个好处是,如果一个消费者发生故障,另一个或修复后的消费者将继续从之前停止的地方读取。这是一种确保所有数据都被送入数据库而没有丢失数据或重复数据的好方法。
在下面的示例中,让我们创建一个名为produce.py的新的Python程序文件,并开始导入一些必需的库和模块。
文件:produce.py
# importing the required libraries
from time import sleep
from json import dumps
from kafka import KafkaProducer
解释:
在上面的代码片段中,我们导入了所需的库和模块。现在,让我们初始化一个新的Kafka生产者。注意以下参数:
- bootstrap_servers = [‘localhost: 9092’]: 此参数设置主机和端口,以便联系生产者以引导初始集群元数据。在这里设置此参数是不强制的,因为默认主机和端口是localhost: 9092。
- value_serializer = lambda x: dumps(x).encode(‘utf-8’): 此参数用于在将数据发送到代理之前对数据进行序列化。在这里,我们将数据转换为JSON文件并将其编码为UTF-8。
让我们考虑以下相同的代码片段。
文件:produce.py
# initializing the Kafka producer
my_producer = KafkaProducer(
bootstrap_servers = ['localhost:9092'],
value_serializer = lambda x:dumps(x).encode('utf-8')
)
Explanation:
在以上的代码片段中,我们使用 KafkaProducer() 函数初始化了Kafka生产者,其中我们使用了上面描述的参数。
现在,我们需要生成从1到500的数字。我们可以使用 for 循环来执行此操作,在循环中将每个数字作为字典的一个值,字典只有一个键:num。这个键仅仅作为数据的键,不是主题的键。在同一个循环中,我们还会将数据发送到代理。
我们可以通过调用生产者的发送方法并详细说明主题和数据来执行此操作。
注意: 值序列化器会自动转换和编码数据。
我们可以休息五秒钟来结束迭代。如果我们需要确认代理是否收到了消息,建议包含回调函数。
文件: produce.py
# generating the numbers ranging from 1 to 500
for n in range(500):
my_data = {'num' : n}
my_producer.send('testnum', value = my_data)
sleep(5)
解释:
在上面的代码片段中,我们使用 for 循环来迭代从一到500的数字。我们还在每次迭代之间增加了五秒的间隔。
如果有人想要测试这段代码,建议创建一个新的主题并将数据发送到该新生成的主题。这种方法将避免重复值和在测试生产者和消费者时可能出现的混淆。
消费数据
在开始编写消费者的代码之前,让我们创建一个新的Python程序文件,并将其命名为consume.py。我们将导入一些模块,如 json.loads,MongoClient 和 KafkaConsumer .由于 PyMongo 超出了本教程的范围,我们不会更深入地挖掘它的代码。
此外,根据需要,也可以将mongo代码替换为任何其他代码。我们可以编写代码以将数据输入到另一个数据库中,编写代码以处理数据,或者任何其他可以想到的事情。
让我们考虑以下的代码片段开始。
文件:consume.py
# importing the required modules
from json import loads
from kafka import KafkaConsumer
from pymongo import MongoClient
解释:
在以上代码片段中,我们从各自的库中导入了所需的模块。
让我们创建Kafka消费者。我们将使用 KafkaConsumer() 函数来完成此工作;所以让我们仔细看一下这个函数中使用的参数。
- 主题: KafkaConsumer() 函数的第一个参数是主题。在以下情况下,它是 testnum 。
- bootstrap_servers = [‘localhost: 9092’]: 这个参数和生产者是一样的。
- auto_offset_reset = ‘earliest’: 这个参数是在其他重要参数中之一。它处理消费者在关闭或中断后重新启动阅读的位置,我们可以将其设置为最新或最早的。每当我们将其设置为最早,消费者就开始从最新的提交偏移量处阅读。每当我们将其设置为最新时,消费者就开始从日志的末尾读取。这正是我们在这里需要的。
- enable_auto_commit = True: 这个参数确认消费者是否在每个间隔中提交其读取的偏移量。
- auto_commit_interval_ms = 1000ms: 此参数用于设置两次提交之间的时间间隔。由于每隔五秒钟就会有一条消息传入,每秒提交一次看起来是公平的。
- group_id = ‘counters’: 这个参数是消费者所属的消费者组。请注意,消费者必须是消费者组的一部分,才能使它们自动提交工作。
- 值 deserializer 用于将数据反序列化为一般的JSON格式,与值序列化器的工作相反。
让我们考虑下面的代码片段。
文件:consume.py
# generating the Kafka Consumer
my_consumer = KafkaConsumer(
'testnum',
bootstrap_servers = ['localhost : 9092'],
auto_offset_reset = 'earliest',
enable_auto_commit = True,
group_id = 'my-group',
value_deserializer = lambda x : loads(x.decode('utf-8'))
)
解释:
在上面的代码片段中,我们使用了 KafkaConsumer() 函数来生成Kafka消费者。我们还在函数中添加了我们之前学习的参数。
现在,让我们来看一下以下代码片段,以连接到MongoDB数据库的 testnum 集合(此集合类似于关系数据库中的表)。
文件:consume.py
my_client = MongoClient('localhost : 27017')
my_collection = my_client.testnum.testnum
说明:
在上述代码片段中,我们定义了一个变量 my_client ,它使用了 MongoClient() 函数,并且指定了主机和端口。然后,我们又定义了另一个变量 my_collection ,它使用了 my_client 变量来访问 testnum 主题中的数据。
可以通过循环来提取消费者中的数据(在这里,消费者可以被视为一个可迭代对象)。消费者会一直监听,直到代理不再回应。我们可以使用value属性来访问消息值。在这里,我们用消息值覆盖了消息。
下一行将数据插入到数据库集合中。最后一行将打印确认消息已添加到我们的集合中。
注意:可以将回调函数插入到此循环中的所有操作。
文件:consume.py
for message in my_consumer:
message = message.value
collection.insert_one(message)
print(message + " added to " + my_collection)
解释:
在上述代码片段中,我们使用了 for 循环来遍历消费者以提取数据。现在为了测试代码,可以先执行 produce.py 文件,然后再执行 consume.py 。