    Thomas Ploch
    @tPl0ch

    @DavidGoodwin

    (While there are commit() methods, there obviously aren't rollback()-like methods... Is there any point in me calling commitAsync() in the above?)

    The manual commits are there since you can disable the auto commit interval. These are not at all related to the transactional API.
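
    For reference, a minimal sketch of what a manual commit can look like with the high-level consumer once auto commit is disabled (broker, group and topic names below are placeholders, not taken from this discussion):

    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup');
    $conf->set('metadata.broker.list', '127.0.0.1:9092'); // placeholder broker
    $conf->set('enable.auto.commit', 'false');            // no automatic offset commits
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['some_topic']);                 // placeholder topic
    
    $message = $consumer->consume(120 * 1000);
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        // Process the message, then commit its offset explicitly.
        $consumer->commit($message);          // synchronous commit
        // or: $consumer->commitAsync($message);
    }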

    Aarongoo
    @aaron8573
    I have a question: how do I update the metadata?
    Alexey
    @bluntik
    Hi all. Do you have plans to update the PECL package to the new release version in the near future?
    Benjamin Schulz
    @benschu
    Hi there. When will php-rdkafka fully support librdkafka 0.11.4 and message headers?
    Bikash Sapkota
    @bikashsapkota
    Hello, how can I check whether the rdkafka extension is loaded or not?
    David Goodwin
    @DavidGoodwin
    @bikashsapkota php -m | grep rdkafka or check phpinfo() output or do "extension_loaded('rdkafka')" ?
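
    For example, checked from PHP itself (a trivial sketch):

    // Prints whether the rdkafka extension is loaded in this PHP process.
    if (extension_loaded('rdkafka')) {
        echo 'rdkafka ' . phpversion('rdkafka') . " is loaded\n";
    } else {
        echo "rdkafka is not loaded\n";
    }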
    Sandip Mavani
    @sandipmavani
    How do I distribute Kafka events across multiple consumers?
    Paul Dragoonis
    @dragoonis
    @sandipmavani I believe the answer is Consumer Groups.
    Sandip Mavani
    @sandipmavani
    @dragoonis With that, all consumers in the group listen for events, but once one of them takes a message the second one doesn't get it.
    How is this possible?
    Paul Dragoonis
    @dragoonis
    @sandipmavani you're right - the first consumer in the group gets the message and no other consumer gets it (see the sketch just below)
    I recommend asking in the #kafka room on irc.freenode.net for better support
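
    A rough sketch of the consumer-group behaviour described above: run the same high-level consumer script in two processes sharing one group.id, and each message is delivered to only one of them (broker and topic names are placeholders):

    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'mySharedGroup');              // identical in both processes
    $conf->set('metadata.broker.list', '127.0.0.1:9092'); // placeholder broker
    
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['some_topic']);                 // placeholder topic
    
    while (true) {
        $message = $consumer->consume(120 * 1000);
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            // Each payload shows up in exactly one of the two processes.
            echo getmypid() . ' got: ' . $message->payload . "\n";
        }
    }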
    Sandip Mavani
    @sandipmavani
    @dragoonis ok thanks.
    Ton Wittenberg
    @yctn
    What is the difference between high-level and low-level consuming?
    Hamoon Mohammadian Pour
    @HaMo0n
    Hello. I have a problem enabling compression.codec. I want to set compression.codec to gzip, but I get the error: Invalid value for configuration property "compression.codec". When I set this config to snappy or lz4 it works, but I don't know why it doesn't work for gzip.
    jianzhiunique
    @jianzhiunique
    futex(0x27fddac, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 63, {1528294517, 150249000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
    Does anybody know what this means?
    karavzeka
    @karavzeka

    Hello everyone! Is there some guru here who can help me? I'll describe the logic of my app and I'd like to get some advice.

    I have a script which spawns a certain number of child processes. Every child uses php-rdkafka to consume messages from a topic and process them. A child has a certain lifetime, after which it dies. Then the parent script spawns a new child, and so on.
    When a child is born, it calls the initChild() method, where php-rdkafka is initialized.

    // Child class
    // (assumes: use RdKafka\Conf; use RdKafka\KafkaConsumer; use RdKafka\TopicConf;)
    
    protected function initChild(): void
    {
        $this->childBornTime = time();
    
        $brokerList = [];
        foreach ($this->brokerAddresses as $index => $brokerAddress) {
            $brokerList[] = $brokerAddress['host'] . ':' . $brokerAddress['port'];
        }
        if (empty($brokerList)) {
            throw new \ErrorException('No broker is assigned');
        }
    
        $topicConf = new TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');
    
        $conf = new Conf();
        $conf->set('client.id', __CLASS__);
        $conf->set('group.id', 'group_' . $this->queueName);
        $conf->set('metadata.broker.list', implode(',', $brokerList));
        $conf->setDefaultTopicConf($topicConf);
    
        $this->kafkaConsumer = new KafkaConsumer($conf);
        $this->kafkaConsumer->subscribe([$this->queueName]);
    }

    Then, in the while loop, I call the processMsg() method:

    protected function processMsg(): void
    {
        $msg = $this->kafkaConsumer->consume(self::WAIT_MSG_TIMEOUT_MS);
    
        if ($msg !== null) {
            switch ($msg->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    sleep(rand(1, 4));
                    echo '[' . date('H:i:s') . "][part {$msg->partition}] {$msg->payload} [key: '{$msg->key}' offset: {$msg->offset}]\n";
                    sleep(8);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    $this->consoleLog('Got PARTITION_EOF');
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    $this->consoleLog('Timeout');
                    break;
                default:
                    throw new \ErrorException($msg->errstr(), $msg->err);
                    break;
            }
        } else {
            $this->consoleLog('Timeout');
        }
    }

    My problem: when a child process dies and a new child process is born and connects to the Kafka topic, it gets timeouts for some period of time (30-50 seconds). I don't understand why. Can I do something to avoid the timeouts after a new connection?

    I created a topic with 2 partitions, then ran the script which spawns 2 child processes. This is a log of how they work:

    [15:15:25][26378] Process forked, child pid 26379
    [15:15:25][26378] Process forked, child pid 26384
    [15:15:27][26384] Timeout
    [15:15:27][26379] Timeout
    [15:15:30][part 0] Hello Kafka 2 [key: '' offset: 0]
    [15:15:31][part 1] Hello Kafka 1 [key: '' offset: 0]
    [15:15:41][part 0] Hello Kafka 5 [key: '' offset: 1]
    [15:15:42][part 1] Hello Kafka 3 [key: '' offset: 1]
    ...
    [15:17:04][part 0] Hello Kafka 19 [key: '' offset: 9]
    [15:17:04][part 1] Hello Kafka 22 [key: '' offset: 9]
    [15:17:13][part 1] Hello Kafka 23 [key: '' offset: 10]
    [15:17:15][part 0] Hello Kafka 20 [key: '' offset: 10]
    [15:17:23][part 1] Hello Kafka 24 [key: '' offset: 11]
    [15:17:24][part 0] Hello Kafka 21 [key: '' offset: 11]
    [15:17:31][26379] Child process die
    [15:17:31][26378] Process forked, child pid 26402
    [15:17:32][26384] Child process die
    [15:17:32][26378] Process forked, child pid 26407
    [15:17:33][26402] Timeout
    [15:17:34][26407] Timeout
    [15:17:35][26402] Timeout
    [15:17:36][26407] Timeout
    [15:17:37][26402] Timeout
    ...
    [15:17:56][26407] Timeout
    [15:17:57][26402] Timeout
    [15:17:58][26407] Timeout
    [15:17:59][26402] Timeout
    [15:18:00][26407] Timeout
    [15:18:02][part 0] Hello Kafka 26 [key: '' offset: 12]
    [15:18:02][part 1] Hello Kafka 25 [key: '' offset: 12]
    [15:18:12][part 0] Hello Kafka 27 [key: '' offset: 13]
    [15:18:14][part 1] Hello Kafka 29 [key: '' offset: 13]

    As you can see, I got timeouts from 15:17:33 to 15:18:00.

    karavzeka
    @karavzeka
    I found a solution. Perhaps it will help someone else. I changed from the high-level consumer to the low-level consumer and the timeouts disappeared.
    This is an example of the code:
    // (assumes: use RdKafka\Conf; use RdKafka\Consumer; use RdKafka\TopicConf;)
    protected function initChild(): void
    {
        $this->childBornTime = time();
    
        $brokerList = [];
        foreach ($this->brokerAddresses as $index => $brokerAddress) {
            $brokerList[] = $brokerAddress['host'] . ':' . $brokerAddress['port'];
        }
        if (empty($brokerList)) {
            throw new \ErrorException('No broker is assigned');
        }
    
        $topicConf = new TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');
    
        $conf = new Conf();
        $conf->set('client.id', __CLASS__ . ':' . $this->partitionNumber);
        $conf->set('group.id', 'group_' . $this->queueName);
    
        $kafkaConsumer = new Consumer($conf);
        $kafkaConsumer->addBrokers(implode(',', $brokerList));
    
        $this->queue = $kafkaConsumer->newQueue();
    
        $topic = $kafkaConsumer->newTopic($this->queueName, $topicConf);
        $topic->consumeQueueStart($this->partitionNumber, RD_KAFKA_OFFSET_STORED, $this->queue);
    }
    alonegrowing
    @alonegrowing
    Hi, we've run into a problem. We use an rdkafka consumer in a PHP program; we called an undefined function, but the program didn't exit and didn't print any fatal error. Has anyone seen this before?
    In short: a PHP program using the rdkafka consumer does not exit when it hits a PHP fatal error. Has anyone run into this before?
    Mathieu Pipet
    @mpipet
    Hi @levin217, I encountered this issue; the only fix I've found so far is to triple-check every failure case and properly handle every exception that could be thrown.
    On our side the issue was even worse: sometimes, when an uncaught exception occurred, the consumer stayed alive but just stopped consuming.
    alonegrowing
    @alonegrowing
    Hi @mpipet, we just hit the second case you mentioned in your last reply: "an uncaught exception occurred, the consumer stayed alive but just stopped consuming". No matter what the error level is, the consumer never exits; it just stops consuming without any exception being thrown. So what can we do if no exception is thrown and the consumer just stops?
    tambach
    @tambach

    Hi all!
    I want to send an array with just four fields; I am trying it this way:
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $result[$i]);

    But the consumer can't see it and I get this warning:
    PHP Warning: RdKafka\ProducerTopic::produce() expects parameter 3 to be string, array given in /var/www/html/new-tests/producer-new-test.php on line 28

    Is there any other way to do this?
    Or what am I doing wrong?
    Thank you in advance

    karavzeka
    @karavzeka
    $result[$i] contains an array, which means you have an array of arrays.
    You can do it this way:
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, serialize($result[$i]));
    and unserialize the message in the consumer.
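
    A minimal sketch of the round trip ($topic, $result and $message as in the snippets above; json_encode()/json_decode() would work just as well if you prefer JSON):

    // Producer side: turn the array into a string payload.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, serialize($result[$i]));
    
    // Consumer side: turn the string payload back into an array.
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        $fields = unserialize($message->payload);
        var_dump($fields);
    }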
    tambach
    @tambach
    @karavzeka thank you so much, it works!
    I used unserialize() in the consumer and everything is fine.
    Steven Lloyd
    @slloyd88
    Hi guys! New to this group and just starting to use rdkafka. So far I have found it pretty useful, but I would love $topic->consumeSeek to be added and/or a way to get the topic count. In order to get the topic count I am having to do the following:
         $topic->consumeStart(0, -1);
         $message = $topic->consume(0, 120*10000);
         $max_offset=$message->offset;
         $topic->consumeStop(0);
         $topic->consumeStart(0, 0);
    karavzeka
    @karavzeka
    Actually, you can't know the count of messages in a topic. Your code looks for the biggest offset, but that doesn't tell you how many elements are in the topic, because old elements are deleted after some time (see the log.retention.hours parameter in the Kafka configuration).
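
    (If your php-rdkafka/librdkafka version exposes queryWatermarkOffsets(), you can at least ask the broker for a partition's low and high watermarks; the difference is the number of messages currently retained in that partition, not the number ever produced. A sketch with a placeholder topic, partition 0 and a consumer/producer handle $consumer:)

    $low = 0;
    $high = 0;
    // Oldest and newest offsets of partition 0, with a 10 second timeout.
    $consumer->queryWatermarkOffsets('some_topic', 0, $low, $high, 10 * 1000);
    echo 'Messages currently retained in partition 0: ' . ($high - $low) . "\n";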
    karavzeka
    @karavzeka
    I have a task to find elements matching a certain pattern. I will have to walk through the whole topic to find the matching messages.
    sonalithakkar
    @sonalithakkar
    Hi all, I need to implement one producer and one topic, with the same message consumed by 2 consumers. I need code for this, please help me ASAP.
    karavzeka
    @karavzeka
    @sonalithakkar https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-producer.html - producer
    https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html - consumer
    Which way of consuming do you have in mind? You can consume in parallel (every message will be consumed twice) or you can consume alternately (the first consumer retrieves one message, the second consumer retrieves the next, then the first again, and so on).
    sonalithakkar
    @sonalithakkar
    The consumers consume in parallel. One message is consumed by 2 consumers in parallel.
    karavzeka
    @karavzeka

    OK. In the consumer example you can see this code:

    $conf->set('group.id', 'myConsumerGroup');

    If you set a different group.id in each of your scripts, they will consume in parallel.

    Mathieu Pipet
    @mpipet
    @sonalithakkar I strongly advise not to set up anything in production if you do not know all the Kafka base concepts:
    https://kafka.apache.org/quickstart
    and / or
    https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
    sonalithakkar
    @sonalithakkar
    @karavzeka thanks
    @mpipet thanks for your suggestion
    sonalithakkar
    @sonalithakkar
    Hi

    <?php
    echo "started----------------------<br> ";
    $conf = new \RdKafka\Conf();
    $conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
    if (function_exists('pcntl_sigprocmask')) {
        pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
        $conf->set('internal.termination.signal', SIGIO);
    } else {
        $conf->set('queue.buffering.max.ms', 1);
    }

    /////////////Producer produces data

    $producer = new \RdKafka\Producer($conf);
    $producer->setLogLevel(LOG_DEBUG);
    $producer->addBrokers("127.0.0.2");
    $topic = $producer->newTopic("dbtest");
    for ($i = 0; $i < 10; $i++) {
        $data[$i] = array(
            'name' => $i,
            'email' => "sonali" . $i
        );
        echo "produced $i----------------------\n";
        $payload = json_encode($data[$i]);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        $producer->poll(0);
    }
    while ($producer->getOutQLen() > 0) {
        $producer->poll(0);
    }
    /////////////////////////Consumer consumes data

    $conf->set('group.id', 'myConsumerGroup');

    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '127.0.0.2');

    $topicConf = new RdKafka\TopicConf();

    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');

    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);

    //////////////////First Consumer
    $consumer1 = new RdKafka\KafkaConsumer($conf);

    // Subscribe to topic 'test'
    $consumer1->subscribe(['dbtest']);

    while (true) {
        $j = 0;
        $message = $consumer1->consume(120*1000);

        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $dataEx = json_decode($message->payload, true);
                var_dump($dataEx);
                $sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";

                $servername = "localhost";
                $username = "a";
                $password = "a";
                $dbname = "test";

                // Create connection
                $conn = new mysqli($servername, $username, $password, $dbname);
                // Check connection
                if ($conn->connect_error) {
                    die("Connection failed: " . $conn->connect_error);
                }

                if ($conn->query($sql) === TRUE) {
                    echo "New record created successfully in database ".$dbname."\n";
                } else {
                    echo "Error: " . $sql . "<br>" . $conn->error;
                }
                $conn->close();

                echo "consumed $j----------------------<br> ";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
        $j++;
    }

    echo "here";
    /////////////////Second Consumer

    $conf->set('group.id', 'myConsumerGroup1');

    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '127.0.0.2');

    $topicConf = new RdKafka\TopicConf();

    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');

    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);

    $consumer2 = new RdKafka\KafkaConsumer($conf);
    echo "sonali";
    // Subscribe to topic 'test'
    $consumer2->subscribe(['dbtest']);

    while (true) {

        $message2 = $consumer2->consume(1000);
        switch ($message2->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $dataEx = json_decode($message2->payload, true);
                var_dump($dataEx);
                $sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";
                $servername1 = "localhost";
                $username1 = "a";
                $password1 = "a";
                $dbname1 = "test1";

                // Create connection
                $conn1 = new mysqli($servername1, $username1, $password1, $dbname1);
                // Ch
    This is my code with a different group id, but it's still not working.
    The first consumer consumes all the data, so the second consumer doesn't get it.
    Please help me.
    sonalithakkar
    @sonalithakkar
    It should be $message2 = $consumer2->consume(120*1000);
    the $message2 = $consumer2->consume(1000); in the code above was a mistake.
    sonalithakkar
    @sonalithakkar
    Please help me with using a different group id for each consumer.
    karavzeka
    @karavzeka
    It's very hard to read your code; part of it is related to the DB.
    I wrote an example from scratch:
    <?php
    declare(strict_types=1);
    
    $brokerList = 'brokerAddr:port';
    $topicName = 'any_topic';
    
    $conf = new \RdKafka\Conf();
    $producer = new \RdKafka\Producer($conf);
    $producer->addBrokers($brokerList);
    
    $topic = $producer->newTopic($topicName);
    
    $counter = 0;
    for ($i = 0; $i < 50; $i++) {
        $counter++;
    
        $msg = 'msg #' . $counter;
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $msg);
        $producer->poll(0);
    
        echo "Sent: {$msg}\n";
    }
    
    // Consumer 1
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup_1');
    $conf->set('metadata.broker.list', $brokerList);
    
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topicName]);
    
    echo "\nI'm consumer 1\n";
    for ($i = 0; $i < 50; $i++) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload . "\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    // Consumer 2
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup_2');
    $conf->set('metadata.broker.list', $brokerList);
    
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topicName]);
    
    echo "\nI'm consumer 2\n";
    for ($i = 0; $i < 50; $i++) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload . "\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    If you run it you will see that the second consumer reads the same 50 messages as the first consumer.