I have used Rdkafka in laravel to insert record to mongodb. First i'm using 2 broker and 1 consumer and i have result 121000 record per minutes. Want more consumer to consumer message from producer, by duplicate consumer in code and its seems no work. How can i add more consumer to handle more request insert?.
Here is my code consumer.
<?php
public function handle()
{
$queue = env('KAFKA_QUEUE');
$conf = new \RdKafka\Conf();
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
$kafka->assign(NULL);
break;
default:
throw new Exception($err);
}
});
$conf->set('group.id', env('KAFKA_CONSUMER_GROUP_ID', 'laravel_queue'));
$conf->set('metadata.broker.list', env('KAFKA_BROKERS', 'localhost:9092'));
$conf->set('auto.offset.reset', 'largest');
$conf->set('enable.auto.commit', 'true');
$conf->set('auto.commit.interval.ms', '101');
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$queue]);
$consumer2 = new \RdKafka\KafkaConsumer($conf);
$consumer2->subscribe([$queue]);
while (true) {
$message = $consumer->consume(120 * 1000);
$message2 = $consumer2->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$this->handle($message);
$this->handle($message2);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$this->line("'[' . date('H:i:s') . \"][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n");
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->line('[' . date('H:i:s') . "] Timed out \n");
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
}
public function handle($data)
{
Bill::insert(json_decode($data->payload, true));
// foreach ($data as $bill) {
// dump($bill);
// Bill::insert($bill);
// }
}
}
And my code producer
<?php
public function handle()
{
DB::table('bill')->orderBy('id', 'desc')->chunk(20, function ($bills) {
$data = [];
foreach ($bills as $bill) {
$data[] = [
'id' => $bill->id,
'data1' => $bill->data1 ?? '',
];
$service = new KafkaService($data);
$service->handle();
}
});
$this->info('DONE');
}
And my docker-compose
version: "3.7"
services:
app:
build:
args:
user: www
uid: 1000
context: ./
dockerfile: Dockerfile
image: laravel-image
container_name: app
restart: unless-stopped
working_dir: /var/www/
privileged: true
volumes:
- ./:/var/www
networks:
- app
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
extra_hosts:
- "moby:127.0.0.1"
ports:
- "12181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_TICK_TIME: 2000
networks:
- app
broker:
image: confluentinc/cp-kafka
hostname: broker
extra_hosts:
- "moby:127.0.0.1"
depends_on:
- zookeeper
ports:
- '9092:9092'
volumes:
- ./docker/kafka-logs:/tmp/kafka-logs
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,HOST://localhost:9092
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- app
broker2:
image: confluentinc/cp-kafka
hostname: broker
extra_hosts:
- "moby:127.0.0.1"
depends_on:
- zookeeper
ports:
- '29092:29092'
volumes:
- ./docker/kafka-logs:/tmp/kafka-logs
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:39092,HOST://localhost:29092
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- app
nginx:
image: nginx:alpine
restart: unless-stopped
container_name: nginx
ports:
- 8000:80
volumes:
- ./:/var/www
- ./docker-compose/nginx:/etc/nginx/conf.d/
networks:
- app
mongo:
image: mongo
hostname: mongo
container_name: mongo
ports:
- "27029:27017"
volumes:
- mongo_data:/data/db
restart: unless-stopped
networks:
- app
networks:
app:
driver: bridge
volumes:
mongo_data:
driver: local
Your compose file has two Kafka containers with hostname: broker
. Your logs indicate it's failing to connect to at least one of them, probably it's confused which is correct one.
You do not really need two brokers, so I suggest simplifying your setup. Especially, given that your replication factor variables are all set to 1. Also, brokers only use variables starting with KAFKA_
(not KAFKA_CFG_
, though; this isn't use by Confluent images at all), so unclear why you've added CONNECT_
ones there.
Then, your PHP service should connect to broker:19092
as KAFKA_BROKERS variable, not 29092 or 39092.
The above should solve your connection attempts, but increasing consumption speed is limited by partitions, not number of brokers. You've not mentioned how many partitions you have.