Confluent Kafka Python生产者介绍

Confluent Kafka Python生产者介绍

如今,数据是数字生态系统的重要组成部分,每个现代应用程序都依赖于其有效的管理和处理。针对这个数据驱动的时代,Apache Kafka是一个功能强大的事件流技术,提供了高吞吐量的解决方案。可以使用Confluent的Python客户端将这些强大的功能无缝集成到您的Python应用程序中。本文详细介绍了Confluent Kafka Python生产者,并提供了有用的示例供您开始使用。

Confluent Kafka Python生产者是什么

Confluent Kafka Python生产者是Confluent的Kafka Python客户端库的一个组件,提供了一种Pythonic的接口,用于访问Apache Kafka强大的数据流功能。结合Kafka消费者,它使Python程序能够完全参与基于Kafka的分布式系统,从而能够向Kafka主题中生产数据。

开始使用Confluent Kafka Python生产者

可以使用Python的软件包安装工具Pip来安装Confluent Kafka Python生产者。要安装,请执行以下命令−

pip install confluent-kafka

在安装了Kafka Producer之后,您可以在Python脚本中导入它

from confluent_kafka import Producer

将Confluent Kafka Python Producer投入使用

现在让我们探索如何使用Confluent Kafka Python Producer将消息发送到Kafka。

示例1: 生产一个简单的消息

如何创建对Kafka主题的直接响应如下所示−

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', 'Hello, Kafka!')

p.flush()

这个脚本通过创建一个Kafka Producer来与本地的Kafka broker建立连接。为了确保消息已发送,它首先将消息”Hello, Kafka!”发送到主题”mytopic”,然后再刷新Producer的消息队列。

示例2: 处理消息传递报告

另外,Confluent Kafka Producer可以报告消息传递到他们的主题的成功情况。

from confluent_kafka import Producer

def delivery_report(err, msg):
   if err is not None:
      print(f'Message delivery failed: {err}')
   else:
      print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', 'Hello, Kafka!', callback=delivery_report)

p.flush()

在这里,当调用回调函数 delivery_report 时,给定了消息,这是 produce 方法的一部分。

示例3: 生产键-值消息

Kafka 消息经常同时包含键和值。创建键-值消息的方法如下:

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', key='mykey', value='myvalue')

p.flush()

这个脚本使用键”mykey”和”myvalue”为主题”mytopic”生成一条消息。

示例4: 生成Avro消息

通过数据序列化技术Avro的帮助,您可以加密消息的模式。这在创建将由各种消费者消耗的主题的通信时特别有帮助,每个消费者可能需要不同的格式。要创建Avro消息,请按照以下步骤进行−

from confluent_kafka import avro, Producer
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('value_schema.avsc')
key_schema = avro.load('key_schema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({
   'bootstrap.servers': 'localhost:9092', 
   'schema.registry.url': 'http://127.0.0.1:8081'
   }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

此脚本使用符合提供的Avro模式的密钥和值创建了一个名为”my_topic”的主题的消息。

示例5: 配置消息压缩

为了节省带宽,您可以将Kafka生产者设置为在发送消息之前压缩它们。以下是一个示例 –

from confluent_kafka import Producer

p = Producer({
   'bootstrap.servers': 'localhost:9092',
   'compression.type': 'gzip',
})

p.produce('mytopic', 'Hello, Kafka!')
p.flush()

这个脚本创建一个使用gzip压缩消息的Kafka生产者,然后将其发送到主题。

结论

Confluent的Kafka Python生产者是一个强大且高度适应的解决方案,可以使Python应用程序充分利用Kafka强大的数据流功能。无论是构建复杂的分布式系统还是只需可靠的数据流,它都是必不可少的工具。

本文详细介绍了从安装到在Python应用程序中实际使用的所有内容。已经详细介绍了五个示例:构建简单消息、管理传递报告、生产键值消息、构建Avro消息和自定义消息压缩。

但请记住,Confluent的Kafka Python生产者提供的功能远不止本书所涵盖的。我们建议参阅官方的Confluent文档,并继续进行高级用法的实验,例如与Kafka Streams集成或开发自定义序列化程序。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程