These are chat archives for arnaud-lb/php-rdkafka

10th
Sep 2018
sonalithakkar
@sonalithakkar
Sep 10 2018 04:46
Please help me set a different group ID for each consumer.
karavzeka
@karavzeka
Sep 10 2018 12:51
Your code is very hard to read, and part of it relates to the DB.
I wrote an example from scratch.
<?php
declare(strict_types=1);

$brokerList = 'brokerAddr:port';
$topicName = 'any_topic';

$conf = new \RdKafka\Conf();
$producer = new \RdKafka\Producer($conf);
$producer->addBrokers($brokerList);

$topic = $producer->newTopic($topicName);

$counter = 0;
for ($i = 0; $i < 50; $i++) {
    $counter++;

    $msg = 'msg #' . $counter;
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $msg);
    $producer->poll(0);

    echo "Sent: {$msg}\n";
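}

// Wait until every queued message has been delivered before consuming.
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}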

// Consumer 1
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup_1');
$conf->set('metadata.broker.list', $brokerList);

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$topicName]);

echo "\nI'm consumer 1\n";
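// Up to 50 consume() calls; EOF and timeout events also use up an iteration.
for ($i = 0; $i < 50; $i++) {
    $message = $consumer->consume(120*1000); // timeout in milliseconds (2 minutes)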
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}

// Consumer 2
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup_2');
$conf->set('metadata.broker.list', $brokerList);

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$topicName]);

echo "\nI'm consumer 2\n";
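// Up to 50 consume() calls; EOF and timeout events also use up an iteration.
for ($i = 0; $i < 50; $i++) {
    $message = $consumer->consume(120*1000); // timeout in milliseconds (2 minutes)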
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
If you run it, you will see that the second consumer reads the same 50 messages as the first one, because each group.id tracks its own offsets.
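To see why without a broker, here is a rough toy model in plain PHP. It is not the php-rdkafka API: `ToyTopic` and its methods are made up for illustration, and it assumes a new group starts from the beginning of the log, as with `auto.offset.reset = smallest`. The only point is that the read position is stored per group ID.

```php
<?php
declare(strict_types=1);

// Toy in-memory model (hypothetical, not php-rdkafka): one log, one offset per group.
final class ToyTopic
{
    /** @var string[] messages in the order they were produced */
    private $log = [];

    /** @var array<string, int> next offset to read, keyed by group ID */
    private $offsets = [];

    public function produce(string $msg): void
    {
        $this->log[] = $msg;
    }

    /** Returns the next unread message for the group, or null at the end of the log. */
    public function consume(string $groupId): ?string
    {
        // A group that has never read anything starts at offset 0.
        $offset = $this->offsets[$groupId] ?? 0;
        if (!isset($this->log[$offset])) {
            return null;
        }
        $this->offsets[$groupId] = $offset + 1;

        return $this->log[$offset];
    }
}

$topic = new ToyTopic();
for ($i = 1; $i <= 3; $i++) {
    $topic->produce('msg #' . $i);
}

// Reads everything that is still unread for the given group.
$readAll = static function (ToyTopic $topic, string $groupId): array {
    $out = [];
    while (($msg = $topic->consume($groupId)) !== null) {
        $out[] = $msg;
    }

    return $out;
};

$group1 = $readAll($topic, 'myConsumerGroup_1');
$group2 = $readAll($topic, 'myConsumerGroup_2');
$group1Again = $readAll($topic, 'myConsumerGroup_1');

var_dump($group1 === $group2);  // do both groups get the full set of messages?
var_dump($group1Again === []);  // is anything left for a group that already read them?
```

A second consumer with the same group ID would continue from that group's stored offset instead of starting over, which is why the example above uses two different group IDs.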
karavzeka
@karavzeka
Sep 10 2018 13:06
This is an alternative example which you can run in different consoles at the same time.
https://gist.github.com/karavzeka/0fe6e7214ef47d11989ea8dc151b2272
sonalithakkar
@sonalithakkar
Sep 10 2018 14:01
Thanks @karavzeka