node.js apache-kafka kafkajs

Consumer gives "This server does not host this topic-partition" error


I'm using Kafka in KRaft mode with 3 brokers and 3 controllers. While my FastAPI services work without any problem, my Node.js services are giving me headaches. When the service starts, I first try to create the topics (with their partitions) and then start the consumer. However, topic creation fails with the error below, and sometimes the consumer does not subscribe to the topics, so the service loses messages until it is restarted.

This is the error I get when createTopicsAndPartitions is invoked:

Trace: KafkaJSProtocolError: This server does not host this topic-partition
    at createErrorFromCode (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/protocol/error.js:581:10)
    at Object.parse (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
    at Connection.send (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/network/connection.js:433:35)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async [private:Broker:sendRequest] (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/broker/index.js:904:14)
    at async Broker.metadata (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/broker/index.js:177:12)
    at async retryOnLeaderNotAvailable.delay (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/admin/index.js:175:55)
    at async callback (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/admin/index.js:36:14)
    at async checkCondition (/home/furkan/pms/backend/service_rbac/node_modules/kafkajs/src/utils/waitFor.js:21:22) {
  retriable: true,
  helpUrl: undefined,
  type: 'UNKNOWN_TOPIC_OR_PARTITION',
  code: 3,
  [cause]: undefined
}

This is my consumer config.

// Shared KafkaJS client. The broker list is read from a comma-separated
// config value (whitespace around each entry is trimmed) and the connection
// authenticates with SASL/PLAIN credentials taken from config.
const kafka = new Kafka({
  clientId: "wins_rbac",
  brokers: config.KAFKA_HOST.split(',').map((host) => host.trim()),
  sasl: {
    mechanism: 'plain',
    username: config.KAFKA_USER,
    password: config.KAFKA_PWD,
  },
});

// Consumer instance for this service's consumer group.
const consumer = kafka.consumer({ groupId: "wins_rbac-business-consumer-cloud-1" });
/**
 * Creates the required topics, starts the Kafka consumer and registers the
 * process-level shutdown / restart handlers.
 *
 * The process handlers are registered exactly once per call. A restart only
 * re-runs the topic-creation / connect / subscribe / run sequence, so
 * repeated restarts no longer stack duplicate SIGINT / SIGTERM /
 * RESTART_CONSUMER listeners.
 *
 * @returns {Promise<void>} Resolves once the consumer is running and the
 *   handlers are registered; rejects if topic creation, connect, subscribe
 *   or run rejects.
 */
const startConsumer = async () => {
  // One full start-up pass; shared by the initial start and by `restart`.
  const launch = async () => {
    let topicList = ["not_important"];
    if (config.IS_CLOUD) {
      topicList = [...topicList, "sem.initialize.database"];
    }

    // Topics are created (and awaited) before the consumer subscribes.
    await createTopicsAndPartitions(topicList, kafka);

    await consumer.connect();
    await consumer.subscribe({
      topics: topicList,
      fromBeginning: false,
    });

    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        try {
          switch (topic) {
            case "sem.initialize.database":
              return await initializeDatabase(message);
            default:
              // Messages from any other topic are intentionally ignored.
              return undefined;
          }
        } catch (error) {
          // Handler failures are logged and not rethrown.
          console.trace(error);
          return undefined;
        }
      },
    });
  };

  await launch();

  const restart = async () => {
    try {
      await consumer.stop();
      await launch();
      logger.info("Consumer has been restarted.");
    } catch (error) {
      // console.trace returns undefined, so the error itself is passed to the logger.
      console.trace(error);
      logger.error(error);
    }
  };

  const shutdown = async () => {
    logger.info("Shutting down the Kafka consumer...");
    try {
      await consumer.disconnect();
      logger.info("Kafka consumer disconnected");
      process.exit(0);
    } catch (error) {
      console.error("Error while disconnecting Kafka consumer:", error);
      process.exit(1);
    }
  };

  // Register signal handlers for SIGINT (Ctrl+C) and SIGTERM (termination) events
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);
  // NOTE(review): RESTART_CONSUMER is not a built-in process event; presumably
  // it is emitted elsewhere via process.emit — confirm.
  process.on("RESTART_CONSUMER", restart);
};

This is the function which tries to create topics.

/**
 * Creates the given topics through the KafkaJS admin client and only resolves
 * once creation has succeeded or failed with a non-retriable error.
 *
 * When creation fails with protocol error code 3 (reported as
 * UNKNOWN_TOPIC_OR_PARTITION in the trace above) the call waits
 * `retryDelayMs` and tries again with a fresh admin connection. The retry is
 * awaited, so a caller that awaits this function does not continue before
 * the topics exist. Any other error is traced and the function returns
 * without retrying (best effort, it does not throw).
 *
 * @param {string[]|null} topicList Topic names; ignored when `topics` is given.
 * @param {object} kafka KafkaJS client exposing `admin()`.
 * @param {object[]|null} [topics=null] Pre-built topic configs to use instead
 *   of deriving them from `topicList`.
 * @param {number} [retryDelayMs=5000] Delay between retries in milliseconds.
 * @returns {Promise<void>}
 */
const createTopicsAndPartitions = async (topicList, kafka, topics = null, retryDelayMs = 5000) => {
  const topicConfigs = topics || topicList.map((name) => ({
    topic: name,
    numPartitions: name === "sem.initialize.database" ? 1 : 3,
    replicationFactor: 3,
  }));

  for (;;) {
    let admin;
    try {
      admin = kafka.admin();
      await admin.connect();
      await admin.createTopics({ topics: topicConfigs });
      return;
    } catch (error) {
      console.trace(error);
      // Strict comparison: the previous `error.code = 3` was an assignment,
      // which retried on every error and overwrote the real error code.
      if (!error || error.code !== 3) return;
    } finally {
      // `admin` stays undefined when kafka.admin() itself threw.
      if (admin) {
        try {
          await admin.disconnect();
        } catch (disconnectError) {
          console.trace(disconnectError);
        }
      }
    }

    logger.warn(`Topic creation failed with UNKNOWN_TOPIC_OR_PARTITION. Retrying in ${retryDelayMs} ms...`);
    await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
  }
};

I tried to block the script right after topic creation, but I couldn't manage it. I'm guessing this happens because the consumer tries to subscribe to the topics without waiting for them to be created. Any help is much appreciated.


Solution

  • I changed createTopicsAndPartitions as shown below. It first fetches the cluster metadata to get the list of existing topics and then creates the requested topics that are missing from that list. If there's an error, it tries again.

    /**
     * Ensures every topic in `topicList` exists, creating the missing ones.
     *
     * Each try opens a fresh admin connection, reads the cluster's topic
     * metadata, creates whichever requested topics are not listed there and
     * disconnects again. Missing topics are also created when the cluster
     * reports no topics at all. On any error the call waits `retryDelayMs`
     * and tries again, up to `attempt` retries after the initial try.
     *
     * @param {string[]} topicList Names of the topics that must exist.
     * @param {object} kafka KafkaJS client exposing `admin()`.
     * @param {number} [attempt=4] Number of retries allowed after the first try.
     * @param {number} [retryDelayMs=1000] Delay between tries in milliseconds.
     * @returns {Promise<void>}
     * @throws {Error} "Topic creation failed after maximum retries..." once the
     *   retries are exhausted; the last underlying error is attached as `cause`.
     */
    const createTopicsAndPartitions = async (topicList, kafka, attempt = 4, retryDelayMs = 1000) => {
      const totalRetries = attempt;

      for (let retriesLeft = attempt; ; retriesLeft -= 1) {
        let admin;

        try {
          admin = kafka.admin();
          await admin.connect();

          const metadata = await admin.fetchTopicMetadata();
          const existingTopics = new Set(metadata.topics.map((x) => x.name));
          const missingTopics = topicList.filter((x) => !existingTopics.has(x));

          if (missingTopics.length === 0) {
            logger.info("Topics have been created already.");
            return;
          }

          await admin.createTopics({
            topics: missingTopics.map((x) => ({
              topic: x,
              numPartitions: x === 'xxx' ? 3 : 1,
              replicationFactor: 3,
            })),
          });
          logger.info("Topics have been created successfully.");
          return;
        } catch (error) {
          // Code 3 (UNKNOWN_TOPIC_OR_PARTITION) is the expected transient
          // failure here, so only other errors get a full trace.
          if (error.code !== 3) console.trace(error);

          // `<=` so that a negative `attempt` cannot retry forever.
          if (retriesLeft <= 0) {
            logger.error("Max retries reached. Unable to create topics");
            const failure = new Error("Topic creation failed after maximum retries...");
            failure.cause = error;
            throw failure;
          }

          // Derive the number from the caller's `attempt` instead of assuming 4.
          logger.error(`${error.type}. Retrying... Attempt Number: ${totalRetries - retriesLeft + 1}`);
        } finally {
          if (admin) {
            try {
              await admin.disconnect();
            } catch (disconnectError) {
              console.trace(disconnectError);
            }
          }
        }

        await new Promise((res) => setTimeout(res, retryDelayMs));
      }
    };