Nodejs Kafka 客户端

来自百合仙子's Wiki
跳转到导航 跳转到搜索

no-kafka

一个不好用的 Kafka 客户端,文档三言两语、语焉不详。没弄明白怎么继续从上次位置处理,如果没有上次位置则使用最新的位置。

kafka-node

consumer 的用法

#!/usr/bin/env node

'use strict'

const util = require('util')

const Kafka = require('kafka-node')

async function main() {
  const consumerStream = new Kafka.ConsumerGroupStream({
    kafkaHost: 'IP1:PORT,IP2:PORT',
    groupId: 'node-test',
  }, 'topic or topiclist')

  consumerStream.on('data', function(msg) {
    console.log(msg.value)
  })

  consumerStream.resume()
  const p = new Promise(function(resolve, reject) {
    consumerStream.on('close', function() {
      resolve()
    })
    consumerStream.on('error', function(e) {
      reject(e)
    })
  })
  await p
}

process.on('unhandledRejection', function(e) {
  console.error('unhandled rejection %s: %s', util.inspect(e), e.stack)
  process.exit(1)
})

main().then(() => console.log('done'))