Practical examples and guidance for using Apache Kafka from Node.js and TypeScript via the kafkajs library. Covers client setup, producer, consumer, consumer groups, partitions, transactions, admin client, and error handling. Use when the user asks about Kafka integration with Node.js or TypeScript.
This skill documents idiomatic use of Kafka in the Node.js ecosystem. It
focuses on the kafkajs client, which is well-maintained, fully typed and
simple to use.
npm install kafkajs
# or yarn add kafkajs
The package ships with its own TypeScript definitions so no extra @types
package is needed.
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
You can provide SSL options, SASL auth, and other configuration documented in
the library. Keep a single Kafka instance per application and derive
producers/consumers from it.
async function produce(topic: string, messages: Array<{ key?: string; value: string }>) {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic,
messages
});
await producer.disconnect();
}
// usage
await produce('orders', [{ key: 'order-1', value: JSON.stringify(order) }]);
send returns record metadata describing the partition and offset of what was written. Wrap send in a try/catch and optionally enable idempotence in the producer config (kafka.producer({ idempotent: true })).
async function consume(topic: string, groupId: string) {
const consumer = kafka.consumer({ groupId });
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key?.toString(),
value: message.value?.toString(),
headers: message.headers
});
}
});
}
// usage
await consume('orders', 'order-service');
fromBeginning controls whether a group with no committed offset starts at the earliest or latest offset. eachBatch is available if you need batch processing.
kafkajs exports types such as Kafka, Producer, Consumer, Message, and EachMessagePayload. You can annotate your wrappers accordingly:
import { Producer, Message } from 'kafkajs';
async function sendMessage(producer: Producer, topic: string, msg: Message) {
await producer.send({ topic, messages: [msg] });
}
Consumer groups rebalance when members join or leave; listen for consumer.on('consumer.group_join', ...) events to detect when partitions move, and ensure processing state is checkpointed before a rebalance.
Avoid calling await producer.send in a tight loop without backpressure; accumulate messages per send call, or use producer.sendBatch to write to several topics at once.
Listen for consumer.on('consumer.crash', ...) to log fatal consumer errors; producer failures surface as rejected send promises.
For atomic multi-topic writes, create the producer with a transactionalId, then const tx = await producer.transaction(); and use tx.send, tx.commit(), and tx.abort().
const admin = kafka.admin(); await admin.connect(); allows topic creation, metadata queries, and ACL management.
Set the compression option per send call to CompressionTypes.GZIP; CompressionTypes.Snappy and CompressionTypes.LZ4 require installing an extra codec package.
Use consumer.run({ eachBatch }) for lower overhead when processing many messages at once.
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka:9093'],
ssl: true,
sasl: {
mechanism: 'plain', // 'scram-sha-256' | 'scram-sha-512' | 'aws'
username: process.env.KAFKA_USER!,
password: process.env.KAFKA_PASSWORD!,
},
});
For mTLS pass ssl: { ca, cert, key } instead of SASL.
KAFKA_BROKERS=kafka1:9092,kafka2:9092
KAFKA_CLIENT_ID=my-service
KAFKA_GROUP_ID=my-service-group
KAFKA_USER=myuser
KAFKA_PASSWORD=secret
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'app',
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
});
Always disconnect producers and consumers on process signals to flush in-flight messages and release partition assignments:
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];
errorTypes.forEach(type => process.on(type, async () => {
await consumer.disconnect();
process.exit(0);
}));
signalTraps.forEach(type => process.once(type, async () => {
await consumer.disconnect();
process.kill(process.pid, type);
}));
(async () => {
const kafka = new Kafka({ clientId: 'app', brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();
await producer.send({ topic: 'test', messages: [{ value: 'hello kafka' }] });
await producer.disconnect();
const consumer = kafka.consumer({ groupId: 'test-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'test', fromBeginning: true });
await consumer.run({ eachMessage: async ({ message }) => {
console.log(message.value?.toString());
} });
})();