php, laravel, apache-kafka, librdkafka

Using multiple consumers with rdkafka in Laravel


I'm using rdkafka in Laravel to insert records into MongoDB. I started with 2 brokers and 1 consumer and got about 121,000 records per minute. I want more consumers pulling messages from the producer, so I duplicated the consumer in the code, but that doesn't seem to work. How can I add more consumers to handle more insert requests?

Here is my consumer code:

    <?php
           
        public function handle()
        {
            $queue = env('KAFKA_QUEUE');
            $conf = new \RdKafka\Conf();
            $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
                switch ($err) {
                    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                        echo "Assign: ";
                        $kafka->assign($partitions);
                        break;
    
                    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                        echo "Revoke: ";
                        $kafka->assign(NULL);
                        break;
    
                    default:
                        throw new \Exception($err);
                }
            });
            $conf->set('group.id', env('KAFKA_CONSUMER_GROUP_ID', 'laravel_queue'));
            $conf->set('metadata.broker.list', env('KAFKA_BROKERS', 'localhost:9092'));
            $conf->set('auto.offset.reset', 'largest');
            $conf->set('enable.auto.commit', 'true');
            $conf->set('auto.commit.interval.ms', '101');
    
            $consumer = new \RdKafka\KafkaConsumer($conf);
            $consumer->subscribe([$queue]);
            
            $consumer2 = new \RdKafka\KafkaConsumer($conf);
            $consumer2->subscribe([$queue]);
    
            while (true) {
                $message = $consumer->consume(120 * 1000);
                $message2 = $consumer2->consume(120 * 1000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        // helper renamed below: a second method named handle() would be a fatal redeclare
                        $this->insertRecord($message);
                        $this->insertRecord($message2);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        $this->line("'[' . date('H:i:s') . \"][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n");
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        $this->line('[' . date('H:i:s') . "] Timed out \n");
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            }
        }
    
        public function insertRecord($data)
        {
            Bill::insert(json_decode($data->payload, true));
    //        foreach ($data as $bill) {
    //            dump($bill);
    //            Bill::insert($bill);
    //        }
        }
    }

And my producer code:

    <?php

    public function handle()
    {
        // send each chunk of 20 bills as one Kafka message; creating the
        // service inside the foreach re-sent the growing $data array on
        // every iteration
        DB::table('bill')->orderBy('id', 'desc')->chunk(20, function ($bills) {
            $data = [];
            foreach ($bills as $bill) {
                $data[] = [
                    'id' => $bill->id,
                    'data1' => $bill->data1 ?? '',
                ];
            }

            $service = new KafkaService($data);
            $service->handle();
        });

        $this->info('DONE');
    }

And my docker-compose file:

version: "3.7"
services:
  app:
    build:
      args:
        user: www
        uid: 1000
      context: ./
      dockerfile: Dockerfile
    image: laravel-image
    container_name: app
    restart: unless-stopped
    working_dir: /var/www/
    privileged: true
    volumes:
      - ./:/var/www
    networks:
      - app

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    extra_hosts:
      - "moby:127.0.0.1"
    ports:
      - "12181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - app

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    extra_hosts:
      - "moby:127.0.0.1"
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    volumes:
      - ./docker/kafka-logs:/tmp/kafka-logs
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,HOST://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - app

  broker2:
    image: confluentinc/cp-kafka
    hostname: broker
    extra_hosts:
      - "moby:127.0.0.1"
    depends_on:
      - zookeeper
    ports:
      - '29092:29092'
    volumes:
      - ./docker/kafka-logs:/tmp/kafka-logs
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:39092,HOST://localhost:29092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - app

  nginx:
    image: nginx:alpine
    restart: unless-stopped
    container_name: nginx
    ports:
      - 8000:80
    volumes:
      - ./:/var/www
      - ./docker-compose/nginx:/etc/nginx/conf.d/
    networks:
      - app    

  mongo:
    image: mongo
    hostname: mongo
    container_name: mongo
    ports:
      - "27029:27017"
    volumes:
      - mongo_data:/data/db
    restart: unless-stopped
    networks:
      - app         

networks:
  app:
    driver: bridge

volumes:     
  mongo_data:
    driver: local

And I got an error when running the cron job (screenshot omitted).


Solution

  • Your compose file has two Kafka containers with hostname: broker. Your logs indicate the app is failing to connect to at least one of them, likely because it cannot tell which one is the correct one.
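
    If you do keep both containers, a minimal sketch of the fix is to give the second one a unique hostname and its own log directory (both brokers currently mount the same ./docker/kafka-logs path, which is a second collision):

        broker2:
          image: confluentinc/cp-kafka
          hostname: broker2                            # must be unique per container
          volumes:
            - ./docker/kafka-logs-2:/tmp/kafka-logs    # separate data dir per broker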

    You do not really need two brokers, so I suggest simplifying your setup, especially given that your replication factor variables are all set to 1. Also, brokers only read variables starting with KAFKA_ (but not KAFKA_CFG_, which isn't used by Confluent images at all), so it's unclear why you've added the CONNECT_ ones there. A simplified single-broker service is sketched below.
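
    As a sketch, a single broker service keeping only the variables the Confluent image actually reads (note KAFKA_AUTO_CREATE_TOPICS_ENABLE, without CFG_, is the Confluent spelling):

        broker:
          image: confluentinc/cp-kafka
          hostname: broker
          depends_on:
            - zookeeper
          ports:
            - '9092:9092'
          environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,HOST://0.0.0.0:9092
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,HOST://localhost:9092
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1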

    Then, your PHP service should connect to broker:19092 via the KAFKA_BROKERS variable, not 29092 or 39092.
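
    For example, in the Laravel .env that your env('KAFKA_BROKERS', ...) call reads:

        # "broker" resolves on the shared compose network "app"
        KAFKA_BROKERS=broker:19092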

    The above should fix your connection attempts, but consumption speed is capped by the number of partitions, not the number of brokers. You haven't mentioned how many partitions you have; see the sketch below for checking and raising the count.
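
    As a sketch (your-topic stands in for whatever KAFKA_QUEUE holds), you can inspect and raise the partition count with the kafka-topics tool shipped in the Confluent image; older Kafka versions may need --zookeeper zookeeper:12181 instead of --bootstrap-server:

        # inspect the current partition count
        docker compose exec broker kafka-topics --bootstrap-server broker:19092 --describe --topic your-topic

        # raise it so up to 4 group members can read in parallel
        docker compose exec broker kafka-topics --bootstrap-server broker:19092 --alter --topic your-topic --partitions 4

    Then, instead of creating a second KafkaConsumer inside the same process, start several copies of your existing artisan consumer command, one per partition. Because they share group.id, the rebalance callback you already have will split the partitions between the processes; consumers beyond the partition count just sit idle.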