    Paweł Niedzielski
    @steveb_gitlab
    In phprdkafka, queue functions are already present in queue.c
    @amnamazhar librdkafka has all the capabilities needed. It's only the code in phprdkafka that is missing. There are examples in librdkafka I could point you to if you want :) maybe you can give it a shot yourself :)
    Amna Mazhar
    @amnamazhar

    @steveb_gitlab I will check out the test sample you mentioned. If you can point me to any other examples, that would be great!

    Brendan
    @balexander
    I have a high-level consumer that just prints out the id of some test events I produced. I'm running that script twice (i.e., one consumer running in one terminal window and the other in a separate window) but only one consumes events at a time. Both consumers are in the same consumer group, and I have verified that the topic I'm consuming has 2 partitions, both of which have data. Am I wrong in thinking that both should be processing events concurrently? I haven't tried with a low-level consumer yet. All configuration options are default except for the group id and some timeouts.
    Paweł Niedzielski
    @steveb_gitlab
    @balexander both should be consuming concurrently, but only after Kafka rebalances the consumers
    the first consumer that starts is assigned both partitions
    and only after the consumer rebalance (which usually occurs within a couple of seconds) will the second consumer be assigned the other partition and start consuming
    until that happens, it will wait for an assignment
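    (For reference, a minimal sketch of the setup described above: run this same script in two terminals with the same group.id, and after the rebalance each instance should own one of the two partitions. The broker address and topic name are placeholders, not taken from the chat.)

    <?php
    $conf = new \RdKafka\Conf();
    $conf->set('bootstrap.servers', 'localhost:9092'); // placeholder broker
    $conf->set('group.id', 'test-consumer-group');     // same group.id in both terminals

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['test-topic']); // placeholder topic with 2 partitions

    // Until the rebalance completes, the first instance owns both partitions
    // and the second one just waits for an assignment.
    while (true) {
        $message = $consumer->consume(10 * 1000);
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            echo $message->payload . PHP_EOL;
        }
    }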
    Brendan
    @balexander
    Yeah, that's the behavior I was expecting. Weird.
    Brendan
    @balexander
    FYI the consumer problem I was having resolved itself. Not sure if it was some weird intermittent issue with my Kafka setup, but creating a new topic with 2 partitions and running my consumers against that gave the expected results.
    aand18
    @aand18
    Hi! Is php-rdkafka compatible with PHP 7.2 and 7.3? I can't find DLL builds for the newer PHP versions. Thanks
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 while phprdkafka is compatible with PHP 7.2/7.3 (there are no major internal API changes in PHP that would cause issues), Windows builds might not be created and published automatically.
    aand18
    @aand18
    @steveb_gitlab any clue as to why only v7.1 builds are created? There are only empty 'logs' for v7.2 and v7.3: https://windows.php.net/downloads/pecl/releases/rdkafka/3.1.0/logs/
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 probably the person that uploaded the builds no longer does so :) It might not have been automatic
    aand18
    @aand18
    @steveb_gitlab do you know of a guide for compiling your own DLLs? Thanks
    Paweł Niedzielski
    @steveb_gitlab
    I don't have a Windows machine at hand to compile on, so I can't do that for you, and my net connection is bad (on mobile in a black hole :) )
    I did compile phprdkafka and other extensions on Linux, but never on Windows
    I'll try to dig a little
    @aand18 try it and see if it works - as I said, I can't check it for you since I don't have a Windows machine atm
    aand18
    @aand18
    @steveb_gitlab thanks, that's really kind of you. I'll take a shot at it. I hope it doesn't turn into a week of work; these kinds of activities have a bad habit of doing so ...
    Paweł Niedzielski
    @steveb_gitlab
    yep, they do :D
    I don't really know if there was an automated push for Windows builds
    hopefully Kafka will match your use-case and you'll stick with it ;)
    aand18
    @aand18
    I've been using version 3.0.5 for a while, only for consuming though. There's a bug where, when the BAT script is told to close, the host console exits but the PHP process stays. That makes it difficult to control through scheduled tasks. I was hoping version 3.1.0 fixed that.
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 I'm not sure it's related to the phprdkafka extension
    I mean, there is one issue that might cause librdkafka threads to stick around
    and that's if signals are not propagated properly
    It might be what you're looking for
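    (For illustration: on Linux, propagating signals into a long-running CLI consumer is typically done with the pcntl extension, which is not available on Windows. A minimal sketch with placeholder broker/group/topic names:)

    <?php
    // Let SIGTERM/SIGINT break the consume loop so the process (and
    // librdkafka's background threads) can shut down cleanly.
    $running = true;
    pcntl_async_signals(true); // PHP >= 7.1
    pcntl_signal(SIGTERM, function () use (&$running) { $running = false; });
    pcntl_signal(SIGINT, function () use (&$running) { $running = false; });

    $conf = new \RdKafka\Conf();
    $conf->set('bootstrap.servers', 'localhost:9092'); // placeholder broker
    $conf->set('group.id', 'my-consumer-group');       // placeholder group

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['my-topic']); // placeholder topic

    while ($running) {
        $message = $consumer->consume(1000);
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            // process $message->payload ...
        }
    }

    $consumer->close(); // leave the group and let librdkafka clean up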
    aand18
    @aand18
    @steveb_gitlab I don't think that works on Windows - signals in general, I mean
    Paweł Niedzielski
    @steveb_gitlab
    I'm not sure either
    technically they should work
    Are you sure that the process is still present?
    And not just that Kafka isn't reassigning partitions to a new consumer?
    Paweł Niedzielski
    @steveb_gitlab
    because it's possible that the Kafka broker doesn't realize a consumer is dead for a while, which would prevent consumption in the new process
    aand18
    @aand18
    @steveb_gitlab the PHP process stays alive even though the parent console process gets closed... pretty strange. It happens even when I press Ctrl+C; I have to do it twice. It only happens when rdkafka is connected to the broker.
    Vitor Ruschoni
    @ruschoni02

    Hi guys. Good afternoon!

    Can anyone help me with this error:
    {"message":"Local: No offset stored","context":{"exception":{"class":"RdKafka\Exception","message":"Local: No offset stored","code":-168,"file":"/application/vendor/arquivei/php-kafka-consumer/src/Consumer.php:128"}},"level":400,"level_name":"ERROR","channel":"local","datetime":"2019-06-24T14:51:16-03:00","extra":[]}

    I'm using this config to instantiate my consumer:

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');

    $conf = new \RdKafka\Conf();
    $conf->set('enable.auto.commit', 'false');
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000');
    $conf->set('group.id', $this->config->getGroupId());
    $conf->set('bootstrap.servers', $this->config->getBroker());
    $conf->set('security.protocol', $this->config->getSecurityProtocol());
    $conf->setDefaultTopicConf($topicConf);

    if ($this->config->isPlainText()) {
        $conf->set('sasl.username', $this->config->getSasl()->getUsername());
        $conf->set('sasl.password', $this->config->getSasl()->getPassword());
        $conf->set('sasl.mechanisms', $this->config->getSasl()->getMechanisms());
    }

    Paweł Niedzielski
    @steveb_gitlab

    An initial search pointed to a couple of issues in the Python library (which uses librdkafka, just like phprdkafka does).

    https://www.google.com/search?q=Local%3A+No+offset+stored+rdkafka&oq=Local%3A+No+offset+stored+rdkafka

    Could you please provide an excerpt from your code near the commit call and check whether the issue originates around that location, @ruschoni02?

    Also, please check which versions of phprdkafka & librdkafka you're using, from phpinfo() or php -i | grep -C 1 kafka.
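    (For reference, "Local: No offset stored" (RD_KAFKA_RESP_ERR__NO_OFFSET, -168) usually means a commit was attempted while no offset had been stored yet, e.g. a bare commit() before any message was consumed. Below is a minimal sketch of committing per message; names are placeholders and this is not the actual arquivei/php-kafka-consumer code.)

    <?php
    $consumer = new \RdKafka\KafkaConsumer($conf); // $conf built as in the snippet above
    $consumer->subscribe(['my-topic']);            // placeholder topic

    while (true) {
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                // handle $message->payload ...
                $consumer->commit($message); // commit this message's offset only
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                break; // nothing consumed, so nothing to commit
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }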
    Антон Колинько
    @AntKolinko_twitter

    Hey guys, I would like to get some recommendations from you. I am new to Kafka, so I don't know the best practices yet. We have producers written in different languages already running in prod. On the PHP side I need to write consumers that will run long-term via Supervisor in GCP.

    What I want to ask is: what are the best settings for running a PHP Kafka consumer long-term? Which settings would you recommend changing, and to what values?

    Currently I ended up with the following:
    General config:

    $conf = new \RdKafka\Conf();
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000'); <- is that one ok?

    Topic config:

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.enable', 'false');
    $topicConf->set('auto.commit.interval.ms', 5000); <- what value is best here?
    $topicConf->set('offset.store.method', 'broker'); <- is it good to use or "file" should be used? 
    $topicConf->set('auto.offset.reset', 'latest');

    This is how I consume messages (simplified):

    $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
    
    while (true) {
        $message = $topic->consume($partition, 1000); <- what timeout is best here?
    
        $messageErr = is_object($message) ? $message->err : RD_KAFKA_RESP_ERR_UNKNOWN;
        switch ($messageErr) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload . PHP_EOL;
                break;
            ...
        }
    
        // After successfully consuming the message, schedule offset store.
        // Offset is actually committed after 'auto.commit.interval.ms' milliseconds.
        $topic->offsetStore($message->partition, $message->offset);
    }

    I highlighted the important settings with an arrow (<-). Can someone please advise?

    Paweł Niedzielski
    @steveb_gitlab

    @AntKolinko_twitter
    auto.commit.interval.ms - this value decides how often the client will send a (small) request to commit its offset. Your setting is OK, since afaik by default phprdkafka does that in an async manner, not blocking. You might want to disable automatic commits (EDIT: which you did, see below) if your throughput is extremely high (like 10k messages per sec) and you need to squeeze out those additional milliseconds.
    auto.commit.enable - actually, disregard the above. You've disabled it.
    offset.store.method - keep it on broker. It's the default for newer Kafka servers. The difference is that brokers store offsets in a dedicated topic (kept in sync across the cluster) instead of keeping them in a local file.
    max.poll.interval.ms should not be set that high afaik. Keeping it at the default 300000 should be OK. It shouldn't have too much of an impact on your application, unless there is more than a 5-minute (300 s) interval between your consume calls (Kafka will then assume the client is dead).

    I'm not 100% sure about max.poll.interval.ms, since I'm a bit confused about it, especially since librdkafka introduced some changes regarding it in 1.0.0, afaik.
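    (Putting the suggestions above together, a sketch with placeholder broker and group values:)

    <?php
    // Manual commits, broker-side offset storage, and max.poll.interval.ms
    // left at its default (300000 ms = 5 minutes).
    $conf = new \RdKafka\Conf();
    $conf->set('bootstrap.servers', 'localhost:9092'); // placeholder broker
    $conf->set('group.id', 'long-running-consumer');   // placeholder group id

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.enable', 'false');    // offsets stored/committed explicitly
    $topicConf->set('offset.store.method', 'broker');  // offsets kept in a Kafka topic, not a local file
    $topicConf->set('auto.offset.reset', 'latest');
    $conf->setDefaultTopicConf($topicConf);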

    Антон Колинько
    @AntKolinko_twitter

    @steveb_gitlab Thank you very much for the response! Did I get it right that, since auto.commit.enable = false, I can remove the auto.commit.interval.ms setting altogether, as commits happen manually?

    About max.poll.interval.ms, I didn't understand how that works. Does it mean that if producers send 1 message every 10 minutes and I have the value set to 300000 ms (which is 5 minutes), then my consumer will be considered dead?

    Paweł Niedzielski
    @steveb_gitlab

    max.poll.interval.ms regulates how often your producers/consumers are expected to "poll". A poll happens when a consumer calls consume or a producer calls poll (in phprdkafka this equates to waiting for a message to actually be posted to Kafka). If your process does not communicate within that interval, it is considered "dead" by Kafka.

    This usually means that the client gets disconnected and the consumer group is rebalanced.

    If I remember correctly, the producer will automatically reconnect if needed, but I'm not 100% sure
    and since you're committing manually you can safely omit auto.commit.interval.ms, as it should do nothing (unless there is some bug :P)
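    (On the producer side, a minimal sketch of what "calling poll" looks like in phprdkafka; broker and topic names are placeholders:)

    <?php
    // poll() services delivery reports and other queued events;
    // getOutQLen() reports how many messages still await delivery.
    $conf = new \RdKafka\Conf();
    $conf->set('bootstrap.servers', 'localhost:9092'); // placeholder broker

    $producer = new \RdKafka\Producer($conf);
    $topic = $producer->newTopic('my-topic'); // placeholder topic

    $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'payload');
    $producer->poll(0); // non-blocking; call regularly while producing

    // Before exiting, keep polling until the internal queue is drained.
    while ($producer->getOutQLen() > 0) {
        $producer->poll(50);
    }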