    aand18
    @aand18
    @steveb_gitlab I don't think it works on Windows - signals in general I mean
    Paweł Niedzielski
    @steveb_gitlab
    I'm not sure either
    technically they should work
    You're sure that the process is still present?
    Not just that Kafka does not reassign partitions to a new consumer?
    Paweł Niedzielski
    @steveb_gitlab
    because it's possible that Kafka broker does not realize that a consumer is dead for a while, which would prevent consumption in new process
    aand18
    @aand18
    @steveb_gitlab the PHP process stays alive, even though the parent console process gets closed... pretty strange. It happens even when I press Ctrl+C, I have to do it twice. It only happens when rdkafka is connected to the broker.
    Vitor Ruschoni
    @ruschoni02

    Hi guys. Good afternoon!

    Can anyone help me with this error:
    {"message":"Local: No offset stored","context":{"exception":{"class":"RdKafka\Exception","message":"Local: No offset stored","code":-168,"file":"/application/vendor/arquivei/php-kafka-consumer/src/Consumer.php:128"}},"level":400,"level_name":"ERROR","channel":"local","datetime":"2019-06-24T14:51:16-03:00","extra":[]}

    I'm using this config to instantiate my consumer:

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');

    $conf = new \RdKafka\Conf();
    $conf->set('enable.auto.commit', 'false');
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000');
    $conf->set('group.id', $this->config->getGroupId());
    $conf->set('bootstrap.servers', $this->config->getBroker());
    $conf->set('security.protocol', $this->config->getSecurityProtocol());
    $conf->setDefaultTopicConf($topicConf);

    if ($this->config->isPlainText()) {
        $conf->set('sasl.username', $this->config->getSasl()->getUsername());
        $conf->set('sasl.password', $this->config->getSasl()->getPassword());
        $conf->set('sasl.mechanisms', $this->config->getSasl()->getMechanisms());
    }

    Paweł Niedzielski
    @steveb_gitlab
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    
    $conf = new \RdKafka\Conf();
    $conf->set('enable.auto.commit', 'false');
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000');
    $conf->set('group.id', $this->config->getGroupId());
    $conf->set('bootstrap.servers', $this->config->getBroker());
    $conf->set('security.protocol', $this->config->getSecurityProtocol());
    $conf->setDefaultTopicConf($topicConf);
    
    if ($this->config->isPlainText()) {
        $conf->set('sasl.username', $this->config->getSasl()->getUsername());
        $conf->set('sasl.password', $this->config->getSasl()->getPassword());
        $conf->set('sasl.mechanisms', $this->config->getSasl()->getMechanisms());
    }
    ?
    {"message":"Local: No offset stored","context":{"exception":{"class":"RdKafka\Exception","message":"Local: No offset stored","code":-168,"file":"/application/vendor/arquivei/php-kafka-consumer/src/Consumer.php:128"}},"level":400,"level_name":"ERROR","channel":"local","datetime":"2019-06-24T14:51:16-03:00","extra":[]}
    Paweł Niedzielski
    @steveb_gitlab

    An initial search pointed to a couple of issues in the Python library (which uses librdkafka, just like phprdkafka does).

    https://www.google.com/search?q=Local%3A+No+offset+stored+rdkafka&oq=Local%3A+No+offset+stored+rdkafka

    @ruschoni02 could you please provide an excerpt from your code near the commit call, and check whether the issue originates around that location?

    Also please check which versions of phprdkafka & librdkafka you're using, from phpinfo() or php -i | grep -C 1 kafka
    Антон Колинько
    @AntKolinko_twitter

    Hey guys, I'd like to get some recommendations from you. I'm new to Kafka, so I don't know the best practices yet. We have producers written in different languages that are already running in prod. On the PHP side I need to write consumers that will run long term via supervisor in GCP.

    What I want to ask is: what are the best settings for running a Kafka PHP consumer long term? Which settings would you recommend changing, and to what values?

    Currently I ended up with the following:
    General config:

    $conf = new \RdKafka\Conf();
    $conf->set('compression.codec', 'gzip');
    $conf->set('max.poll.interval.ms', '86400000'); <- is that one ok?

    Topic config:

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.enable', 'false');
    $topicConf->set('auto.commit.interval.ms', 5000); <- what value is best here?
    $topicConf->set('offset.store.method', 'broker'); <- is it good to use or "file" should be used? 
    $topicConf->set('auto.offset.reset', 'latest');

    This is how I consume messages (simplified):

    $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
    
    while (true) {
        $message = $topic->consume($partition, 1000); <- what time it's better to set here?
    
        $messageErr = is_object($message) ? $message->err : RD_KAFKA_RESP_ERR_UNKNOWN;
        switch ($messageErr) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload . PHP_EOL;
                break;
            ...
        }
    
        // After successfully consuming the message, schedule offset store.
        // Offset is actually committed after 'auto.commit.interval.ms' milliseconds.
        $topic->offsetStore($message->partition, $message->offset);
    }

    I've marked the important settings with an arrow (<-). Can someone please advise?

    Paweł Niedzielski
    @steveb_gitlab

    @AntKolinko_twitter
    auto.commit.interval.ms - this value decides how often the client sends a (small) request to commit its offset. Your setting is ok, since afaik phprdkafka does that asynchronously by default, without blocking. You might want to disable automatic commit (EDIT: which you did, see below) if your throughput is extremely high (like 10k messages per sec) and you need to squeeze out those additional ms.
    auto.commit.enable - actually, disregard the above. You've disabled it.
    offset.store.method - keep it on broker. It's the default for newer Kafka servers. The difference is that brokers store offsets in a dedicated topic (and sync it) instead of the client keeping them in a file.
    max.poll.interval.ms should not be set that high afaik. Keeping it at the default of 300000 (5 minutes) should be ok. It shouldn't have much of an impact on your application, unless there is more than a 300s interval between your consume calls (Kafka will assume the client is dead).

    I'm not 100% sure about max.poll.interval.ms, since I'm a bit confused about it. Especially since librdkafka introduced some changes in 1.0.0 regarding it afaik.

    Антон Колинько
    @AntKolinko_twitter

    @steveb_gitlab Thank you very much for the response! Did I get it right that since auto.commit.enable = false, I can remove the auto.commit.interval.ms setting entirely, as commits happen manually?

    About max.poll.interval.ms: I didn't understand how it works. Does it mean that if producers send 1 message every 10 minutes and I have the value set to 300000ms (which is 5 minutes), then my consumer will be considered dead?

    Paweł Niedzielski
    @steveb_gitlab

    max.poll.interval.ms regulates how often your producers/consumers are expected to "poll". A poll happens when a consumer calls consume or a producer calls poll (in phprdkafka this equals waiting for a message to actually be posted to Kafka). If your process does not communicate in time, it is considered "dead" by Kafka.

    This usually means that the producer gets disconnected, and consumer groups are rebalanced.

    If I remember correctly, the producer will automatically reconnect if needed, but I'm not 100% sure
    and since you're committing manually you can safely omit auto.commit.interval.ms, as it should do nothing (unless there is some bug :P)
    @AntKolinko_twitter
    see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md (since most options are passed directly to the underlying librdkafka library)

    Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set

    Actually max.poll.interval.ms should only affect consumers
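
To illustrate the point about polling, here is a minimal sketch of a high-level consumer loop. It assumes the rdkafka extension is loaded and a librdkafka version that knows max.poll.interval.ms; the broker address, group id, and topic name are placeholders, not values from this conversation. Each consume() call counts as a poll even when it times out without a message, so a quiet topic on its own should not get the consumer marked as failed.

```php
<?php
// Sketch only: broker, group id and topic name below are hypothetical.
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'example-group');
$conf->set('enable.auto.commit', 'false');
// 300000 ms is the documented librdkafka default; set here only to make it visible.
$conf->set('max.poll.interval.ms', '300000');

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['example-topic']);

while (true) {
    // Waits up to 10 s; returning on timeout still counts as a poll.
    $message = $consumer->consume(10000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . PHP_EOL;
            // Commit manually once the message has been handled.
            $consumer->commit($message);
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // Nothing to process right now; loop and poll again.
            break;
        default:
            throw new \RuntimeException($message->errstr(), $message->err);
    }
}
```

The interval only becomes a problem when handling a single message takes longer than max.poll.interval.ms, because no consume() call happens in the meantime.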

    Антон Колинько
    @AntKolinko_twitter

    ok, thanks. I'll remove max.poll.interval.ms so the default is used, and also remove auto.commit.interval.ms.

    What do you think about $message = $topic->consume($partition, 1000); <- what time it's better to set here?

    Paweł Niedzielski
    @steveb_gitlab
    @AntKolinko_twitter doesn't matter too much. This timeout is the maximum amount of time that this one particular call can wait. You should be able to safely increase it to something like 10 seconds. Its use case is for applications that run with an event loop, so they can transfer control back to the loop. Since you're in a while(true) loop it doesn't matter that much.
    You're never really exiting the loop.
    abdo1
    @abdo1
    How to run kafka on different machines ?
    When i stop any one server then another one is able to send the details .
    Paweł Niedzielski
    @steveb_gitlab
    @abdo1 I don't really understand the question. Are you trying to run multiple Phprdkafka clients connected to the same topic?
    Wesley Willians
    @wesleywillians
    Hello guys, I'm working with Kafka and PHP and I need to make a sync request. I'm trying to use the Request/Reply pattern, but I'm facing a problem. My consumer always takes a while to connect, so by then my producer has already sent the message, and it can't be read unless I read all the data from the topic. Any insight? The idea is a kind of PubSub.
    btw, how can I get a current offset of a topic?
    Kamil Hurajt
    @kamilhurajt
    Hi guys, I'm trying to use Kafka as a FIFO queue for the first time, since that's how it's implemented in our company. But I'm facing an issue: with high-level consuming, the service waits 40 seconds for a message!!! What am I doing wrong?
    Amy Luo
    @amyluo
    Is there a way I can synchronously produce messages with the php-rdkafka library?
    Paweł Niedzielski
    @steveb_gitlab
    @amyluo just flush it immediately :D
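
A minimal sketch of the "flush immediately" suggestion, assuming the rdkafka extension and a php-rdkafka version that provides RdKafka\Producer::flush(); the broker address, topic name, and 10 second timeout are placeholders chosen for illustration.

```php
<?php
// Sketch only: broker and topic name below are hypothetical.
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');

$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic('example-topic');

// produce() only enqueues the message in the local librdkafka queue.
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'example payload');
// Serve delivery callbacks, if any are registered.
$producer->poll(0);

// Block until the queue is drained or 10 s have passed.
$result = $producer->flush(10000);
if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
    throw new \RuntimeException('Flush failed: ' . rd_kafka_err2str($result));
}
```

Flushing after every message gives synchronous behaviour at the cost of throughput, since batching no longer has a chance to kick in.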
    Aleksandar Aramov
    @alex687
    Hello guys, the docs for RdKafka\Conf::setDefaultTopicConf say "This function has been DEPRECATED". Where should I put the topic config? Inside RdKafka\Conf::set, or should I use the RdKafka\TopicConf class? Thanks in advance.
    Paweł Niedzielski
    @steveb_gitlab
    @alex687 practically all configuration settings that were available in TopicConf can be set on the Conf object directly. Those that cannot can still be passed as a TopicConf object to the classes that accept it as an argument (which in practice is every place that previously used it)
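
As a rough sketch of both options, assuming the rdkafka extension is loaded (broker address, group id, and topic name are placeholders): a topic-level property such as auto.offset.reset is set straight on Conf for the consumer, while the producer passes a TopicConf to newTopic().

```php
<?php
// Sketch only: broker, group id and topic name below are hypothetical.

// Option 1: set the topic-level property on the Conf object itself.
$consumerConf = new \RdKafka\Conf();
$consumerConf->set('bootstrap.servers', 'localhost:9092');
$consumerConf->set('group.id', 'example-group');
$consumerConf->set('auto.offset.reset', 'earliest'); // topic-level property
$consumer = new \RdKafka\KafkaConsumer($consumerConf);

// Option 2: pass a TopicConf where the API accepts one.
$producerConf = new \RdKafka\Conf();
$producerConf->set('bootstrap.servers', 'localhost:9092');
$producer = new \RdKafka\Producer($producerConf);

$topicConf = new \RdKafka\TopicConf();
$topicConf->set('message.timeout.ms', '30000'); // topic-level property
$topic = $producer->newTopic('example-topic', $topicConf);
```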
    Aleksandar Aramov
    @alex687
    @steveb_gitlab Thanks, I was wondering because when I dump the config (config->dump()) the topic configs are not there.
    Paweł Niedzielski
    @steveb_gitlab
    @alex687 actually, there is an issue for something like that
    I'm not sure it applies
    Nick
    @nick-zh
    late to the party, but yeah, @steveb_gitlab is right, I opened this issue because of this :)
    Zakhar Morozov
    @fortael
    Hello. Is there any way to use a truststore with SASL_SSL? When I use ssl.truststore.location I get the error "Java TrustStores are not supported, use 'ssl.ca.location' and a certificate file instead."
    Paweł Niedzielski
    @steveb_gitlab
    @fortael according to this comment from 2017 it's not https://github.com/edenhill/librdkafka/issues/1412#issuecomment-327403461
    and it's not really an issue with phprdkafka (or librdkafka actually)
    Zakhar Morozov
    @fortael
    I don't quite understand. Our Java team uses it in that configuration, so I expected the PHP extension to be able to do the same. I tried to use a .pem file, but then I got "Java JAAS configuration is not supported" too.
    Paweł Niedzielski
    @steveb_gitlab
    @fortael the Java client is not the same as librdkafka (the C Kafka client library used by clients in other languages - node, python, or in this case php)
    librdkafka itself does not support Java TrustStores, as the exception is telling you
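
What the error message suggests could look roughly like the sketch below. It assumes the CA certificate has already been exported from the truststore to a PEM file, that the SASL mechanism is PLAIN, and that the JAAS error came from a Java-style sasl.jaas.config entry; the host, path, and credentials are placeholders.

```php
<?php
// Sketch only: host, certificate path and credentials below are hypothetical.
$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'broker.example.com:9093');
$conf->set('security.protocol', 'SASL_SSL');
// PEM file with the CA certificate, instead of ssl.truststore.location.
$conf->set('ssl.ca.location', '/etc/kafka/ca.pem');
// Plain librdkafka properties, instead of a JAAS configuration string.
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', 'example-user');
$conf->set('sasl.password', 'example-password');
```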
    Zakhar Morozov
    @fortael
    Got it
    Paweł Niedzielski
    @steveb_gitlab
    Judging from the fact that the librdkafka community didn't implement it, it's either proprietary and cannot be replicated in librdkafka, or it's complicated for some other reason
    but I'd say the former
    Paweł Niedzielski
    @steveb_gitlab
    btw @fortael