    Amna Mazhar
    @amnamazhar
    @steveb_gitlab Thanks for considering it!
    Paweł Niedzielski
    @steveb_gitlab
    @amnamazhar I've taken a look at how it should work, but it will take some time, especially since I'm not really a specialist in C programming ;)
    Amna Mazhar
    @amnamazhar
    @steveb_gitlab thank you for looking into that! Really appreciate it! If you find anything useful let me know!
    I can use the Kafka REST proxy API to get that information, but the client doesn't want that, so I'll probably write my own script for it or check out the librdkafka source code and enhance it myself
    Paweł Niedzielski
    @steveb_gitlab
    @amnamazhar librdkafka has all the capabilities needed. It's only code in phprdkafka that is missing. There are examples I could point you to in librdkafka if you want to :) maybe you can give it a shot yourself :)
    In phprdkafka, queues functions are already present in queue.c
    Amna Mazhar
    @amnamazhar

    @amnamazhar librdkafka has all the capabilities needed. It's only code in phprdkafka that is missing. There are examples I could point you to in librdkafka if you want to :) maybe you can give it a shot yourself :)

    @steveb_gitlab I will check out the test sample you mentioned. If you can point me to any other examples, that would be great!

    Brendan
    @balexander
    I have a high-level consumer that just prints out the id of some test events I produced. I'm running that script twice (i.e., one consumer running in one terminal window and the other in a separate window), but only one consumes events at a time. Both consumers are in the same consumer group, and I have verified that the topic I'm consuming has 2 partitions, both of which have data. Am I wrong in thinking that both should be processing events concurrently? I haven't tried with a low-level consumer yet. All configuration options are default except for the group id and some timeouts.
    Paweł Niedzielski
    @steveb_gitlab
    @balexander both should be consuming concurrently, but only after Kafka rebalances the consumers
    the first consumer that starts is assigned both partitions
    only after a consumer rebalance (which usually occurs within a couple of seconds) is the second consumer assigned the other partition and starts consuming
    until that happens, it waits for assignments
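The rebalance behaviour described above can be observed directly with the high-level consumer. A minimal sketch follows; the broker address, group id, and topic name are placeholders, and a running broker plus the rdkafka extension are assumed. Running two copies with the same `group.id` should show the first instance receiving both partitions, then each instance one partition after the rebalance.

```php
<?php
// Sketch: high-level consumer that logs partition assignments, so the
// rebalance between two concurrently running instances becomes visible.
// 'demo-group', 'test', and the broker address are placeholders.

$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1:9092');
$conf->set('group.id', 'demo-group');
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo 'Assigned ' . count($partitions) . " partition(s)\n";
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Partitions revoked\n";
            $kafka->assign(null);
            break;
    }
});

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(1000); // 1 s poll timeout
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo $message->payload . PHP_EOL;
    }
    // RD_KAFKA_RESP_ERR__TIMED_OUT and __PARTITION_EOF are normal here.
}
```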
    Brendan
    @balexander
    Yeah, that's the behavior I was expecting. Weird.
    Brendan
    @balexander
    FYI, the consumer problem I was having resolved itself. Not sure if it was some intermittent issue with my Kafka setup, but creating a new topic with 2 partitions and running my consumers against that gave the expected results
    aand18
    @aand18
    Hi! Is php-rdkafka compatible with PHP 7.2 and 7.3? I can't find DLL builds for the newer PHP versions. Thanks
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 while phprdkafka is compatible with PHP 7.2/7.3 (there were no major internal API changes in PHP that would cause issues), Windows builds might not be created and published automatically.
    aand18
    @aand18
    @steveb_gitlab any clue as to why only 7.1 builds are created? There are only empty 'logs' for 7.2 and 7.3: https://windows.php.net/downloads/pecl/releases/rdkafka/3.1.0/logs/
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 probably the person who uploaded the builds stopped doing so :) It might not have been automatic
    aand18
    @aand18
    @steveb_gitlab do you know of a guide for compiling your own DLLs? thanks
    Paweł Niedzielski
    @steveb_gitlab
    I don't have a Windows machine at hand to compile on, so I can't do that for you, and my net connection is bad (on mobile in a black hole :) )
    I did compile phprdkafka and other extensions on Linux, but never on Windows
    I'll try to dig a little
    @aand18 try it and see if it works - as I said, I can't check it for you since I don't have a Windows machine atm
    aand18
    @aand18
    @steveb_gitlab thanks, that's really kind of you. I'll take a shot at it. I hope it doesn't turn into a week of work; these kinds of activities have a bad habit of doing so ...
    Paweł Niedzielski
    @steveb_gitlab
    yep, they do :D
    I don't really know if there was an automated push for Windows builds
    hopefully Kafka will match your use-case and you'll stick with it ;)
    aand18
    @aand18
    I've been using version 3.0.5 for a while, though only for consuming. There's a bug where, when I tell the BAT script to close, the host console exits but the PHP process stays. So it's difficult to control through scheduled tasks. I was hoping version 3.1.0 fixes that.
    Paweł Niedzielski
    @steveb_gitlab
    @aand18 I'm not sure it's related to the phprdkafka extension
    I mean, there is one issue that might cause librdkafka threads to stick around
    and that's if signals are not propagated properly
    It might be what you're looking for
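One way to make sure signals actually reach the consumer loop, on POSIX systems, is a pcntl handler that triggers a clean shutdown instead of killing the process mid-poll. A sketch, assuming the pcntl extension is available (it is not on Windows) and `$consumer` is an already configured `\RdKafka\KafkaConsumer`:

```php
<?php
// Sketch: clean shutdown on SIGINT/SIGTERM so librdkafka threads are torn
// down properly. POSIX-only (requires pcntl); $consumer is assumed to be a
// configured and subscribed \RdKafka\KafkaConsumer.

$running = true;
$handler = function (int $signo) use (&$running) {
    $running = false; // let the loop exit on its own
};
pcntl_signal(SIGINT, $handler);
pcntl_signal(SIGTERM, $handler);

while ($running) {
    $message = $consumer->consume(1000);
    pcntl_signal_dispatch(); // deliver any pending signals
    // ... handle $message ...
}

$consumer->close(); // leaves the group and releases librdkafka resources
```

Calling `close()` at the end lets the group coordinator reassign the partitions immediately instead of waiting for a session timeout.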
    aand18
    @aand18
    @steveb_gitlab I don't think that works on Windows - signals in general, I mean
    Paweł Niedzielski
    @steveb_gitlab
    I'm not sure either
    technically they should work
    Are you sure the process is still present?
    Not just that Kafka isn't reassigning partitions to a new consumer?
    Paweł Niedzielski
    @steveb_gitlab
    because it's possible that the Kafka broker does not realize for a while that a consumer is dead, which would prevent consumption in the new process
    aand18
    @aand18
    @steveb_gitlab the PHP process stays alive, even though the parent console process gets closed... pretty strange. It happens even when I press Ctrl+C; I have to do it twice. It only happens when rdkafka is connected to the broker.
    Vitor Ruschoni
    @ruschoni02

    Hi guys. Good afternoon!

    Can anyone help me with this error:
    {"message":"Local: No offset stored","context":{"exception":{"class":"RdKafka\Exception","message":"Local: No offset stored","code":-168,"file":"/application/vendor/arquivei/php-kafka-consumer/src/Consumer.php:128"}},"level":400,"level_name":"ERROR","channel":"local","datetime":"2019-06-24T14:51:16-03:00","extra":[]}

    I'm using this config to instantiate my consumer

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');

    $conf = new \RdKafka\Conf();
    $conf->set('enable.auto.commit', 'false');
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000');
    $conf->set('group.id', $this->config->getGroupId());
    $conf->set('bootstrap.servers', $this->config->getBroker());
    $conf->set('security.protocol', $this->config->getSecurityProtocol());
    $conf->setDefaultTopicConf($topicConf);

    if ($this->config->isPlainText()) {
        $conf->set('sasl.username', $this->config->getSasl()->getUsername());
        $conf->set('sasl.password', $this->config->getSasl()->getPassword());
        $conf->set('sasl.mechanisms', $this->config->getSasl()->getMechanisms());
    }

    Paweł Niedzielski
    @steveb_gitlab
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    
    $conf = new \RdKafka\Conf();
    $conf->set('enable.auto.commit', 'false');
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000');
    $conf->set('group.id', $this->config->getGroupId());
    $conf->set('bootstrap.servers', $this->config->getBroker());
    $conf->set('security.protocol', $this->config->getSecurityProtocol());
    $conf->setDefaultTopicConf($topicConf);
    
    if ($this->config->isPlainText()) {
        $conf->set('sasl.username', $this->config->getSasl()->getUsername());
        $conf->set('sasl.password', $this->config->getSasl()->getPassword());
        $conf->set('sasl.mechanisms', $this->config->getSasl()->getMechanisms());
    }
    ?
    {"message":"Local: No offset stored","context":{"exception":{"class":"RdKafka\Exception","message":"Local: No offset stored","code":-168,"file":"/application/vendor/arquivei/php-kafka-consumer/src/Consumer.php:128"}},"level":400,"level_name":"ERROR","channel":"local","datetime":"2019-06-24T14:51:16-03:00","extra":[]}
    This message was deleted
    Paweł Niedzielski
    @steveb_gitlab

    Initial search pointed to a couple of issues in the Python library (which uses librdkafka, just like phprdkafka does).

    https://www.google.com/search?q=Local%3A+No+offset+stored+rdkafka&oq=Local%3A+No+offset+stored+rdkafka

    Could you please provide an excerpt of your code near the commit call, and check whether the issue originates around that location, @ruschoni02?

    Also, please check which versions of phprdkafka & librdkafka you're using - from phpinfo() or php -i | grep -C 1 kafka
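For reference, "Local: No offset stored" (code -168, `RD_KAFKA_RESP_ERR__NO_OFFSET`) is librdkafka reporting that a commit was requested while no offset had been stored yet for the assigned partitions - typically a `commit()` issued before any message was actually consumed. With `enable.auto.commit` set to `false`, one common pattern is to commit each message explicitly only after processing it. A sketch, assuming `$consumer` is a configured `\RdKafka\KafkaConsumer`:

```php
<?php
// Sketch: with enable.auto.commit=false, commit only after a message has
// actually been consumed. Committing when nothing has been stored raises
// RD_KAFKA_RESP_ERR__NO_OFFSET ("Local: No offset stored", code -168).
// $consumer is assumed to be a configured \RdKafka\KafkaConsumer.

while (true) {
    $message = $consumer->consume(1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // ... process $message->payload ...
            $consumer->commit($message); // commit this message's offset
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            break; // nothing consumed, so nothing to commit
        default:
            throw new \RuntimeException($message->errstr(), $message->err);
    }
}
```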
    Антон Колинько
    @AntKolinko_twitter

    Hey guys, I'd like to get some recommendations from you. I'm new to Kafka, so I don't yet know some of the best practices. We have producers written in different languages, already running in prod. On the PHP side I need to write consumers that will run long-term via supervisor in GCP.

    What I want to ask is: what are the best settings for running a Kafka PHP consumer long-term? Which settings would you recommend changing, and to what values?

    Currently I ended up with the following:
    General config:

    $conf = new \RdKafka\Conf();
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000'); <- is that one ok?

    Topic config:

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.enable', 'false');
    $topicConf->set('auto.commit.interval.ms', 5000); <- what value is best here?
    $topicConf->set('offset.store.method', 'broker'); <- is it good to use, or should "file" be used?
    $topicConf->set('auto.offset.reset', 'latest');

    This is how I consume messages (simplified):

    $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
    
    while (true) {
        $message = $topic->consume($partition, 1000); <- what time it's better to set here?
    
        $messageErr = is_object($message) ? $message->err : RD_KAFKA_RESP_ERR_UNKNOWN;
        switch ($messageErr) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload . PHP_EOL;
                break;
            ...
        }
    
        // After successfully consuming the message, schedule offset store.
        // Offset is actually committed after 'auto.commit.interval.ms' milliseconds.
        $topic->offsetStore($message->partition, $message->offset);
    }

    I highlighted the important settings with an arrow <-. Can someone please advise?
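As a reference point for the questions above, the same config can be annotated with librdkafka's documented defaults (these come from librdkafka's CONFIGURATION.md; exact values may vary by version, so treat this as a sketch rather than a recommendation):

```php
<?php
// Sketch of a long-running consumer config, annotated with librdkafka
// defaults as a reference point (verify against CONFIGURATION.md for
// the librdkafka version you actually run).

$conf = new \RdKafka\Conf();
$conf->set('compression.codec', 'gzip');
// max.poll.interval.ms defaults to 300000 (5 min). 86400000 (24 h) means a
// hung consumer keeps its partitions for up to a day before being evicted
// from the group; a value just above your worst-case per-message processing
// time is usually safer.
$conf->set('max.poll.interval.ms', '300000');

$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.enable', 'false');
// auto.commit.interval.ms controls how often offsets scheduled via
// offsetStore() are actually committed; 5000 ms is a common middle ground
// between commit traffic and how much gets re-consumed after a crash.
$topicConf->set('auto.commit.interval.ms', '5000');
// 'broker' is the modern choice; 'file' offset storage is deprecated.
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'latest');
```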