I've been studying kafkajs and socket.io I'm am very new to it and i cant seem to understand some things. I have created a chat application that basically by opening a browser(client) you can type messages and they get displayed in a chat-window. I found a tutorial that makes kafka print "this message + i". I want to instead of sending to the topic and printing message+i to print what people type in chat and I'm not sure how I'm supposed to do that.
This is my consumer.js:
const { Kafka } = require("kafkajs")
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"
const kafka = new Kafka({ clientId, brokers })
// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })
const consume = async () => {
// first, we wait for the client to connect and subscribe to the given topic
await consumer.connect()
await consumer.subscribe({ topic })
await consumer.run({
// this function is called every time the consumer gets a new message
eachMessage: ({ message }) => {
// here, we just log the message to the standard output
console.log(`received message: ${message.value}`)
module.exports = consume
This is my producer.js:
// import the `Kafka` instance from the kafkajs library
const { Kafka } = require("kafkajs")
// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"
// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()
// we define an async function that writes a new message each second
const produce = async () => {
await producer.connect()
let i = 0
// after the produce has connected, we start an interval timer
setInterval(async () => {
try {
// send a message to the configured topic with
// the key and value formed from the current value of `i`
await producer.send({
messages: [
key: String(i),
value: "this is message " + i,
// if the message is written successfully, log it and increment `i`
console.log("writes: ", i)
} catch (err) {
console.error("could not write message " + err)
}, 1000)
module.exports = produce
I know I'm supposed to somehow connect the topics brokers and clients with the socket.io but I'm not sure how.
Here is my chat.js:
/* Kane connection sto server opos prin
exw tin ikanotita na xrhsimopoihsw to io logo tou library pou phra apo to documentation*/
var socket = io.connect('http://localhost:8000');
// linking Variables toy indexhtml
var message = document.getElementById('message');
var username = document.getElementById('username');
var btn = document.getElementById('send');
var output = document.getElementById('output');
var feedback = document.getElementById('feedback');
// Stelnw events pou ginonte apo ton xristi kai stelnonte ston server
btn.addEventListener('click', function(){
socket.emit('chat', {
message: message.value,
username: username.value
message.value = "";
message.addEventListener('keypress', function(){
socket.emit('typing', username.value);
// Events wste na perimenw to data apo ton server
socket.on('chat', function(data){
feedback.innerHTML = '';
output.innerHTML += '<p><strong>' + data.username + ': </strong>' + data.message + '</p>';
socket.on('typing', function(data){
feedback.innerHTML = '<p><em>' + data + ' is typing a message...</em></p>';
You'll need a socket.io server.
const consume = require('consumer.js');
const produce = require('producer.js');
const { Server } = require('socket.io');
const io = new Server();
consume(({ from, to, message }) => {
io.sockets.emit('newMessage', { from, to, message });
io.on('connection', function(socket) {
socket.emit('Hi!', { message: 'Chat connected', id: socket.id });
socket.on('sendMessage', ({ message, to }) => {
produce({ from: socket.id, to, message });
You also need to modify your consumer & producer to accept parameters and callbacks.
Consumer Example:
const consume = async cb => {
// first, we wait for the client to connect and subscribe to the given topic
await consumer.connect()
await consumer.subscribe({ topic })
await consumer.run({
// this function is called every time the consumer gets a new message
eachMessage: ({ from, to, message }) => {
cb({ from, to, message });
Producer Example:
const produce = async ({ from, to, message }) => {
producer.send(topic, { from, to, message });
Don't forget to modify your chat.js on the client side
All of this can be optimized and is just a brief example