下面是详细讲解“node连接kafka2.0实现方法示例”的完整攻略。
kafka 是由 Apache 软件基金会开发的一个分布式流处理平台。它由 Scala 和 Java 写成。Kafka 是一个强大、高吞吐量的分布式系统,它可以处理海量的消息,并且提供了很好的消息存储和查询能力。Node.js 中有多个 kafka client 库可供使用,本文主要介绍 node-rdkafka 库。
node-rdkafka 是部分 C++ 代码编写的 Node.js 模块。该模块可用于连接和操作 Kafka 的 broker。使用 node-rdkafka 库的优点在于,它使用 Kafka 的 C++ 驱动程序,因此性能非常好,支持高并发场景。
使用 node-rdkafka 库之前,首先需要安装 node-gyp
工具。
npm install -g node-gyp
接着,安装 node-rdkafka:
npm install node-rdkafka
package.json
的目录中创建一个新文件,例如 consumer.js
。将以下内容复制到文件中:const Kafka = require('kafkajs').Kafka;
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const test = async () => {
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
await producer.disconnect()
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
test();
上述代码包含以下操作:
Kafka
实例producer
对象consumer
对象消费者连接到同一主题,从 beginning 开始消费,并在每次收到数据时调用异步函数。
执行文件 consumer.js
node consumer.js
{ value: 'Hello KafkaJS user!' }
如果要生产者能够正常的发送消息,您需要创建具备写入权限的主题。如果您未创建主题,则可以在 Kafka quick start guide 中找到有关如何创建主题的指南。
本文链接:http://task.lmcjl.com/news/7907.html