KafkaJS: the simplest way to use Kafka with Node.js

I am writing this based on my experience of using Confluent Kafka with Node.js. Let's first take a look at Kafka.

What is Apache Kafka?

Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real time. Streaming data is data that is continuously generated by thousands of data sources, which typically send the data records simultaneously. A streaming platform needs to handle this constant influx of data, and process the data sequentially and incrementally.

Kafka provides three main functions to its users:

  • Publish and subscribe to streams of records
  • Effectively store streams of records in the order in which records were generated
  • Process streams of records in real-time

Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storage and analysis of both historical and real-time data.

Kafka messages are persisted on disk and replicated within the cluster to prevent data loss. Kafka is built on top of the ZooKeeper synchronization service. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.

In this blog, we will talk about how to connect to an existing Kafka cluster and build your service on top of it.

![](https://miro.medium.com/max/1400/1*9P0g-p6CIzHiBUM_B7T3cA.png)

It's a simple picture: we have a Kafka platform ready from https://confluent.cloud/, a managed Kafka provider from which we can buy this service and start using it, much like the managed solutions provided by AWS.

Now we can send or stream messages to Kafka, where consumers can consume and react to those messages. So what do we need to do this? A client library and the Kafka connection details from https://confluent.cloud/.

Let's check the different options.

node-rdkafka (https://www.npmjs.com/package/node-rdkafka)

KafkaJS, a modern Apache Kafka client for Node.js (https://kafka.js.org/)

I started with node-rdkafka and later moved to KafkaJS. But why?

  • Node.js version compatibility can cause problems with node-rdkafka, so use a Node.js version that node-rdkafka supports.
  • You have to either use the OS installation of librdkafka or build it from source.
  • If you use node-rdkafka, you are bound to encounter compatibility issues as you upgrade the library or versions of Node.js. I recommend you use a system installation of librdkafka and the BUILD_LIBRDKAFKA=0 flag to prevent the recompilation of the library on npm install. Configuring Kafka can be complicated (https://rclayton.silvrback.com/thoughts-on-node-rdkafka-development).
  • A possible error when librdkafka was built without OpenSSL (a minimal sketch of the setup that triggers it follows after the commands below):
UnhandledPromiseRejectionWarning: Error: Unsupported value "sasl_ssl" for configuration property "security.protocol": OpenSSL not available at build time  
at Producer.Client (/Users/node_modules/node-rdkafka/lib/client.js:54:18)  
at new Producer (/Users/node_modules/node-rdkafka/lib/producer.js:75:10)
  • You can try to fix this by linking OpenSSL properly and rebuilding:
brew link openssl --force   
export LDFLAGS="-L/usr/local/opt/openssl@1.1/lib"   
export CPPFLAGS="-I/usr/local/opt/openssl@1.1/include"   
echo 'export PATH="/usr/local/opt/openssl@1.1/bin:$PATH"' >> ~/.zshrc   
npm rebuild node-rdkafka
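
For context, here is a minimal sketch of the kind of node-rdkafka producer setup that runs into the error above. This is not code from this project: the broker address and credentials are placeholders for a Confluent Cloud cluster, and the sasl_ssl protocol only works when librdkafka was built with OpenSSL available.

// Install against a system librdkafka to avoid recompiling it on every npm install:
//   BUILD_LIBRDKAFKA=0 npm install node-rdkafka
const Kafka = require('node-rdkafka')

// Placeholder broker address and credentials for a Confluent Cloud cluster
const producer = new Kafka.Producer({
  'metadata.broker.list': 'xxxxxxxxx.confluent.cloud:9092',
  'security.protocol': 'sasl_ssl', // fails if librdkafka was built without OpenSSL
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': 'xxxxxxxxxxx',
  'sasl.password': 'xxxxxxxxxx'
})

producer.connect()

producer.on('ready', () => {
  // produce(topic, partition (null lets librdkafka choose), message, key)
  producer.produce('supplier-ratings', null, Buffer.from('hello from node-rdkafka'), null)
})

producer.on('event.error', (err) => {
  console.error('node-rdkafka error:', err)
})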

So the conclusion is that I am not a big fan of node-rdkafka.

Here is the better solution: KafkaJS.

Getting Started · KafkaJS

Install KafkaJS using yarn or npm:

yarn add kafkajs
npm install kafkajs

Let's start by instantiating the KafkaJS client:

KafkaJS is a pure JavaScript library with no native Node.js bindings, so there are no compatibility issues and no build-time or runtime errors of that kind.

So with KafkaJS, most of the integration problems developers run into simply go away.

const { Kafka } = require('kafkajs')

// This creates a client instance configured to connect to your Kafka broker
// (here, a Confluent Cloud cluster; the broker address and credentials are placeholders)
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: ['xxxxxxxxx.confluent.cloud:9092'],
  ssl: true,
  logLevel: 2,
  sasl: {
    mechanism: 'plain',
    username: 'xxxxxxxxxxx',
    password: 'xxxxxxxxxx'
  }
})


const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'qa-group' }) // a consumer group id is required; the name here is just an example

producer.on('producer.connect', () => {
  console.log(`KafkaProvider: connected`);
});
producer.on('producer.disconnect', () => {
  console.log(`KafkaProvider: disconnected`);
});
producer.on('producer.network.request_timeout', (payload) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});

// Placeholder values for the example payload; replace them with your own data
const user_uuid = '5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e'
const supplier_id = 'xxxxxxxxxx'
const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'supplier-ratings',
    messages: [
      {
        value: Buffer.from(JSON.stringify(
          {
            "event_name": "QA",
            "external_id": user_uuiD,
            "payload": {
              "supplier_id": i.supplier_id,
              "assessment": {
                "performance": 7,
                "quality": 7,
                "communication": 7,
                "flexibility": 7,
                "cost": 7,
                "delivery": 6
              }
            },
            "metadata": {
              "user_uuid": "5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e"
            }
          }
        ))
      },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'supplier-ratings', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)
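
If you would rather not hard-code the broker and credentials, you can read them from environment variables instead. This is just a sketch; the variable names KAFKA_BOOTSTRAP_SERVER, KAFKA_USERNAME, and KAFKA_PASSWORD are example names I picked, not something KafkaJS requires.

const { Kafka } = require('kafkajs')

// Connection details come from the environment instead of being hard-coded
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
  ssl: true,
  logLevel: 2,
  sasl: {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD
  }
})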
